Creating Custom Teams#
This guide explains how to create custom team types by extending the base Team class to implement specialized collaboration patterns.
Overview#
Custom teams allow you to:
Define unique agent coordination strategies
Implement domain-specific collaboration patterns
Control message flow between agents
Add custom logic for task distribution
Base Class#
All teams inherit from the Team base class:
from pantheon.team.base import Team
from pantheon.agent import Agent, AgentInput
class Team(ABC):
def __init__(self, agents: list[Agent]):
self.agents = {} # Dict[name, agent]
self.events_queue = asyncio.Queue()
async def async_setup(self):
"""Optional setup before running."""
pass
async def run(self, msg: AgentInput, **kwargs):
"""Execute the team's collaboration pattern."""
pass
async def chat(self, message: str | dict | None = None):
"""Start interactive REPL session."""
pass
Creating a Custom Team#
Basic Structure#
from pantheon.team.base import Team
from pantheon.agent import Agent, AgentInput
class MyCustomTeam(Team):
def __init__(self, agents: list[Agent], my_param: str = "default"):
super().__init__(agents)
self.my_param = my_param
async def run(self, msg: AgentInput, **kwargs):
# Implement your collaboration logic
pass
Example: Round-Robin Team#
A team where agents take turns responding until a condition is met:
from pantheon.team.base import Team
from pantheon.agent import Agent, AgentInput
class RoundRobinTeam(Team):
"""Team where agents take turns responding."""
def __init__(
self,
agents: list[Agent],
max_rounds: int = 3,
):
super().__init__(agents)
self.agent_order = list(self.agents.keys())
self.max_rounds = max_rounds
async def run(self, msg: AgentInput, **kwargs):
# Convert input to message history
first_agent = self.agents[self.agent_order[0]]
history = first_agent.input_to_openai_messages(msg, False)
result = None
for round_num in range(self.max_rounds):
for agent_name in self.agent_order:
agent = self.agents[agent_name]
# Run agent with current history
result = await agent.run(history, **kwargs)
# Add agent's response to history
history.append({
"role": "assistant",
"content": f"[{agent_name}]: {result.content}"
})
# Check if task is complete
if self._is_complete(result):
return result
return result
def _is_complete(self, result) -> bool:
"""Override to define completion criteria."""
return "COMPLETE" in result.content.upper()
Example: Voting Team#
A team where multiple agents vote on a decision:
from collections import Counter
from pantheon.team.base import Team
from pantheon.agent import Agent, AgentInput
class VotingTeam(Team):
"""Team where agents vote on decisions."""
def __init__(
self,
agents: list[Agent],
vote_prompt: str = "Vote YES or NO:",
):
super().__init__(agents)
self.vote_prompt = vote_prompt
async def run(self, msg: AgentInput, **kwargs):
import asyncio
# Create voting prompt
first_agent = list(self.agents.values())[0]
history = first_agent.input_to_openai_messages(msg, False)
history.append({"role": "user", "content": self.vote_prompt})
# Collect votes from all agents in parallel
async def get_vote(agent):
result = await agent.run(history, **kwargs)
return self._parse_vote(result.content)
tasks = [get_vote(agent) for agent in self.agents.values()]
votes = await asyncio.gather(*tasks)
# Tally votes
vote_counts = Counter(votes)
winner = vote_counts.most_common(1)[0]
# Create result
from pantheon.agent import AgentOutput
return AgentOutput(
content=f"Vote result: {winner[0]} ({winner[1]}/{len(votes)} votes)",
details={"votes": dict(vote_counts)}
)
def _parse_vote(self, content: str) -> str:
content_upper = content.upper()
if "YES" in content_upper:
return "YES"
elif "NO" in content_upper:
return "NO"
return "ABSTAIN"
Example: Hierarchical Team#
A team with manager and worker agents:
from pantheon.team.base import Team
from pantheon.agent import Agent, AgentInput
class HierarchicalTeam(Team):
"""Team with a manager who delegates to workers."""
def __init__(
self,
manager: Agent,
workers: list[Agent],
):
all_agents = [manager] + workers
super().__init__(all_agents)
self.manager = manager
self.workers = {w.name: w for w in workers}
async def async_setup(self):
# Add delegation tool to manager
@self.manager.tool
async def delegate(worker_name: str, task: str) -> str:
"""Delegate a task to a worker agent.
Args:
worker_name: Name of the worker to delegate to
task: The task description
"""
if worker_name not in self.workers:
return f"Unknown worker: {worker_name}"
worker = self.workers[worker_name]
result = await worker.run(task)
return f"[{worker_name}]: {result.content}"
@self.manager.tool
def list_workers() -> str:
"""List available worker agents."""
return "\n".join([
f"- {name}: {w.instructions[:100]}..."
for name, w in self.workers.items()
])
async def run(self, msg: AgentInput, **kwargs):
# Manager handles the request, delegating as needed
return await self.manager.run(msg, **kwargs)
Key Methods to Implement#
run()#
The main entry point for team execution:
async def run(self, msg: AgentInput, **kwargs):
"""
Execute the team's collaboration pattern.
Args:
msg: User input (str, dict, or list of messages)
**kwargs: Additional arguments passed to agents
Returns:
AgentOutput: The final response from the team
"""
# Your collaboration logic here
pass
async_setup()#
Optional initialization before first run:
async def async_setup(self):
"""Called once before the team starts processing."""
# Register tools, initialize resources, etc.
pass
gather_events()#
For advanced event handling (streaming, progress updates):
async def gather_events(self):
"""Collect events from all agents into team's event queue."""
async def _gather_agent_events(agent):
while True:
event = await agent.events_queue.get()
self.events_queue.put_nowait({
"agent_name": agent.name,
"event": event,
})
tasks = [_gather_agent_events(a) for a in self.agents.values()]
await asyncio.gather(*tasks)
Working with Messages#
Convert input to message history:
# Convert AgentInput to OpenAI message format
agent = list(self.agents.values())[0]
history = agent.input_to_openai_messages(msg, False)
# Add messages to history
history.append({"role": "user", "content": "Next step"})
history.append({"role": "assistant", "content": response})
# Pass history between agents
result = await next_agent.run(history)
history.extend(result.details.messages)
Best Practices#
Call super().__init__(): Always initialize the base class with agents list
Use agent names: Access agents via
self.agents[name]dictionaryHandle errors gracefully: Wrap agent calls in try/except for robustness
Support streaming: Use
events_queuefor real-time updatesDocument behavior: Add clear docstrings explaining the collaboration pattern
Test edge cases: Handle scenarios with 1 agent, failed agents, etc.
Using Custom Teams#
import asyncio
from pantheon.agent import Agent
async def main():
# Create agents
agent1 = Agent(name="analyst", instructions="Analyze data.")
agent2 = Agent(name="writer", instructions="Write reports.")
agent3 = Agent(name="reviewer", instructions="Review content.")
# Create custom team
team = RoundRobinTeam(
agents=[agent1, agent2, agent3],
max_rounds=2
)
# Run task
result = await team.run("Analyze Q4 sales data and write a report")
print(result.content)
# Or use interactive chat
await team.chat()
asyncio.run(main())