Source code for pantheon.team.swarm
from pantheon.agent import Agent, AgentInput, AgentResponse, AgentTransfer, RemoteAgent
from pantheon.internal.memory import Memory
from pantheon.utils.log import logger
from pantheon.utils.misc import run_func
from .base import Team
[docs]
class SwarmTeam(Team):
"""Team that run agents in handoff & routines patterns like
OpenAI's [Swarm framework](https://github.com/openai/swarm).
"""
[docs]
def __init__(self, agents: list[Agent | RemoteAgent]):
super().__init__(agents)
[docs]
def get_active_agent(self, memory: Memory) -> Agent | RemoteAgent:
active_agent_name = memory.extra_data.get("active_agent")
if (active_agent_name is None) or (active_agent_name not in self.agents):
active_agent_name = list(self.agents.keys())[0]
logger.warning(
f"Active agent not found in memory, setting to {active_agent_name}"
)
memory.set_metadata("active_agent", active_agent_name)
active_agent = self.agents[active_agent_name]
return active_agent
[docs]
def set_active_agent(self, memory: Memory, agent_name: str):
memory.set_metadata("active_agent", agent_name)
[docs]
async def run(self, msg: AgentInput, memory: Memory | None = None, **kwargs):
if memory is None:
memory = Memory(name="swarm-team")
while True:
active_agent = self.get_active_agent(memory)
resp = await active_agent.run(msg, memory=memory, **kwargs)
if isinstance(resp, AgentTransfer):
self.set_active_agent(memory, resp.to_agent)
msg = resp
else:
return resp
[docs]
class SwarmCenterTeam(SwarmTeam):
"""Swarm team that has a central triage agent that decides which agent to handoff to."""
[docs]
def __init__(self, triage: Agent, agents: list[Agent | RemoteAgent]):
super().__init__([triage])
self.triage = triage
self._agents_to_add = agents
[docs]
async def add_agent(self, agent: Agent | RemoteAgent):
if isinstance(agent, RemoteAgent):
await agent.fetch_info()
assert isinstance(agent.name, str), "Agent name must be a string"
agent_func_name = agent.name.replace(" ", "_").lower()
func_name = f"transfer_to_{agent_func_name}"
# Create transfer function using closure (no exec needed)
# Note: No return type annotation to avoid Pydantic serialization issues
def transfer_func(target_name: str = agent.name):
"""Transfer to the target agent."""
return self.agents[target_name]
transfer_func.__name__ = func_name
transfer_func.__doc__ = f"Transfer to {agent.name}."
await run_func(self.triage.tool, transfer_func)
# Transfer back to triage
# Note: No return type annotation to avoid Pydantic serialization issues
def transfer_back_to_triage():
"""Transfer back to the triage agent."""
return self.triage
await run_func(agent.tool, transfer_back_to_triage)
self.agents[agent.name] = agent
[docs]
async def remove_agent(self, agent: Agent | RemoteAgent):
assert isinstance(agent.name, str), "Agent name must be a string"
del self.agents[agent.name]
self.triage.functions.pop(f"transfer_to_{agent.name.replace(' ', '_').lower()}")
[docs]
async def async_setup(self):
while self._agents_to_add:
agent = self._agents_to_add.pop(0)
await self.add_agent(agent)
[docs]
async def run(self, msg: AgentInput, **kwargs):
await self.async_setup()
return await super().run(msg, **kwargs)