import asyncio
import copy
import inspect
import json
import random
import re
import sys
import time
from abc import ABC, abstractmethod
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Callable, Optional, Union
from uuid import uuid4
from funcdesc import parse_func
from pydantic import BaseModel, create_model
if TYPE_CHECKING:
from .internal.memory import Memory
from .remote import (
RemoteBackendFactory,
RemoteConfig,
RemoteWorker,
)
from .toolset import ToolSet
from .utils.llm import (
TimingTracker,
process_messages_for_hook_func,
process_messages_for_model,
count_tokens_in_messages,
format_token_visualization,
process_tool_result,
)
from .utils.llm_providers import (
ProviderType,
call_llm_provider,
create_enhanced_process_chunk,
detect_provider,
get_openai_effective_config,
)
from .utils.log import logger
from .utils.misc import desc_to_openai_dict, run_func
from .utils.tool_pairing import ensure_tool_result_pairing_with_stats
if TYPE_CHECKING:
from .utils.vision import VisionInput
def _get_default_model() -> list[str]:
"""Get default model fallback chain from ModelSelector.
Uses environment API keys to detect available providers and
returns appropriate default models.
Returns:
List of models as fallback chain
"""
try:
from .utils.model_selector import get_default_model
return get_default_model()
except Exception as e:
from .utils.model_selector import ULTIMATE_FALLBACK
logger.warning(f"Failed to get default model from selector: {e}")
return [ULTIMATE_FALLBACK]
def _parse_thinking_suffix(model_str: str) -> tuple[str, str | None]:
"""Strip ``+think[:level]`` suffix from a model string.
Args:
model_str: Model name or tag, optionally ending with ``+think`` or
``+think:low``/``+think:medium``/``+think:high``.
Returns:
(clean_model_str, thinking_level) where *thinking_level* is
``"low"``, ``"medium"``, ``"high"``, or ``None``.
"""
import re
match = re.search(r"\+think(?::(\w+))?$", model_str)
if not match:
return model_str, None
level = match.group(1) or "high"
if level not in ("low", "medium", "high"):
return model_str, None
return model_str[: match.start()], level
def _is_model_tag(model_str: str) -> bool:
"""Check if a string is a model tag vs a model name.
Model tags are quality/capability identifiers like "high", "normal,vision".
Model names are actual model identifiers like "openai/gpt-4o", "gpt-4o-mini".
A ``+think[:level]`` suffix is allowed on any tag string.
Args:
model_str: The string to check
Returns:
True if the string is a tag, False if it's a model name
"""
if not model_str or not model_str.strip():
return False
# Strip +think suffix before checking
clean, _ = _parse_thinking_suffix(model_str)
# Model names typically contain "/" (provider/model format)
if "/" in clean:
return False
# Check if all parts are known tags
try:
from .utils.model_selector import QUALITY_TAGS, CAPABILITY_MAP
parts = [p.strip().lower() for p in clean.split(",")]
all_known_tags = QUALITY_TAGS | set(CAPABILITY_MAP.keys())
return all(part in all_known_tags for part in parts if part)
except ImportError:
return False
def _resolve_model_tag(tag: str) -> list[str]:
"""Resolve a tag string to a model list.
A ``+think[:level]`` suffix is stripped before resolution (it only
affects ``model_params``, not model selection).
Args:
tag: Tag string like "high", "normal,vision", "high+think"
Returns:
List of models as fallback chain
"""
from .utils.model_selector import get_model_selector
clean, _ = _parse_thinking_suffix(tag)
return get_model_selector().resolve_model(clean)
def _normalize_model_spec(
model: str | list[str] | None,
) -> str | list[str] | None:
"""Treat empty string model specs as unspecified."""
if isinstance(model, str) and not model.strip():
return None
return model
def _resolve_model_spec_with_current_provider(
model: str | list[str] | None,
current_model: str | None = None,
) -> str | list[str] | None:
"""Resolve model tags while preferring the current dialog provider.
Example: if the active dialog runs on ``codex/gpt-5.4-mini`` and an
internal helper asks for ``low``, keep the helper on the Codex provider
instead of letting global auto-selection jump to OpenAI.
"""
model = _normalize_model_spec(model)
if not isinstance(model, str) or not _is_model_tag(model):
return model
# Strip +think suffix — it doesn't affect model selection
clean, _ = _parse_thinking_suffix(model)
if current_model and "/" in current_model:
provider = current_model.split("/", 1)[0].strip().lower()
if provider:
from .utils.model_selector import get_model_selector
return get_model_selector().resolve_model_for_provider(clean, provider)
return model
# ===== Execution Context =====
@dataclass
class ExecutionContext:
"""Prepared execution context for agent run."""
conversation_history: list[dict]
context_variables: dict
should_use_memory: bool
memory_instance: "Memory | None"
execution_context_id: str | None = None
input_messages: list[dict] | None = None
@dataclass
class AgentRunContext:
"""Runtime context information for the currently executing Agent.run call."""
agent: "Agent"
memory: "Memory | None"
execution_context_id: str | None = None
process_step_message: Callable | None = None
process_chunk: Callable | None = None
cache_safe_runtime_params: Any | None = None
cache_safe_prompt_messages: list[dict] | None = None
cache_safe_tool_definitions: list[dict] | None = None
context_collapse_manager: Any | None = None
current_model: str | None = None
# Maps a call_agent tool_call_id → the child's execution_context_id so the
# eventual tool_message can be stamped with it. Lets the UI link a
# call_agent response back to the exact sub-agent invocation, even when
# the parent launches multiple parallel calls to the same agent name.
sub_agent_exec_ids: dict[str, str] = field(default_factory=dict)
_RUN_CONTEXT: ContextVar[AgentRunContext | None] = ContextVar(
"agent_run_context", default=None
)
def get_current_run_context() -> AgentRunContext | None:
"""Get the runtime context for the current Agent.run invocation."""
return _RUN_CONTEXT.get()
def get_current_run_model() -> str | None:
"""Get the currently executing model for the active agent run, if any."""
run_context = get_current_run_context()
if run_context is None:
return None
if run_context.current_model:
return run_context.current_model
cache_params = getattr(run_context, "cache_safe_runtime_params", None)
return getattr(cache_params, "model", None)
# ===== Tool Provider Base Class =====
class ToolInfo(BaseModel):
"""Information about a tool"""
name: str
description: str
inputSchema: Optional[dict] = None # JSON Schema format (OpenAI function schema)
class ToolProvider(ABC):
"""Abstract base class for tool providers (MCP, ToolSet, etc.)
Tool providers abstract away the source of tools, allowing agents to
work uniformly with tools from different sources.
"""
@abstractmethod
async def list_tools(self) -> list[ToolInfo]:
"""List all available tools from this provider
Returns:
List of ToolInfo objects describing available tools
"""
pass
@abstractmethod
async def call_tool(self, name: str, args: dict) -> Any:
"""Call a tool by name with the given arguments
Args:
name: Name of the tool to call
args: Arguments to pass to the tool
Returns:
Result from the tool call
"""
pass
async def initialize(self):
"""Optional: Initialize the provider (e.g., connect to remote server)
Called once when the provider is added to an agent.
"""
pass
async def shutdown(self):
"""Optional: Clean up provider resources
Called when the agent is being shut down.
"""
pass
_CTX_VARS_NAME = "context_variables"
_SKIP_PARAMS = [_CTX_VARS_NAME]
_CLIENT_ID_NAME = "client_id"
class AgentService:
def __init__(self, agent: "Agent", **kwargs):
self.agent = agent
from .remote import RemoteBackendFactory, RemoteWorker
self.backend = RemoteBackendFactory.create_backend()
self.worker: RemoteWorker = self.backend.create_worker(**kwargs)
self.setup_worker()
async def response(self, msg, **kwargs):
resp = await self.agent.run(msg, **kwargs)
return resp
async def get_info(self):
return {
"name": self.agent.name,
"instructions": self.agent.instructions,
"models": self.agent.models,
"tool_names": list(
self.agent.functions.keys()
), # Simplified: return tool names
}
async def get_message_queue(self):
return await self.agent.events_queue.get()
async def check_message_queue(self):
# check if there is a message in the queue
return not self.agent.events_queue.empty()
async def add_tool(self, func: Callable):
self.agent.tool(func)
return {"success": True}
def setup_worker(self):
"""Register methods with the worker"""
self.worker.register(self.response)
self.worker.register(self.get_info)
self.worker.register(self.get_message_queue)
self.worker.register(self.check_message_queue)
self.worker.register(self.add_tool)
async def run(self, log_level: str = "INFO"):
from loguru import logger
logger.remove()
logger.add(sys.stderr, level=log_level)
return await self.worker.run()
class RemoteAgent:
def __init__(
self,
service_id_or_name: str,
backend_config: Optional["RemoteConfig"] = None,
**remote_kwargs,
):
self.service_id_or_name = service_id_or_name
from .remote import RemoteBackendFactory
self.backend = RemoteBackendFactory.create_backend(backend_config)
self.remote_kwargs = remote_kwargs
self.name = None
self.instructions = None
self.model = None
self.models = [] # Initialize empty list to avoid AttributeError
self.events_queue = RemoteAgentMessageQueue(self)
async def _connect(self):
return await self.backend.connect(self.service_id_or_name, **self.remote_kwargs)
async def fetch_info(self):
service = await self._connect()
info = await service.invoke("get_info")
self.name = info["name"]
self.instructions = info["instructions"]
self.models = info["models"]
self.tool_names = info.get("tool_names", []) # New format
# Backward compatibility
if "functions_names" in info:
self.tool_names = info["functions_names"]
await service.close()
return info
async def run(self, msg: "AgentInput", **kwargs):
await self.fetch_info()
service = await self._connect()
try:
return await service.invoke("response", {"msg": msg, **kwargs})
finally:
await service.close()
async def tool(self, func: Callable):
service = await self._connect()
try:
func_arg = {"func": func}
await service.invoke("add_tool", func_arg)
finally:
await service.close()
async def chat(self, message: str | dict | None = None):
"""Chat with the agent with a REPL interface."""
await self.fetch_info()
from .repl.core import Repl
repl = Repl(self)
await repl.run(message)
class RemoteAgentMessageQueue:
def __init__(self, agent: "RemoteAgent"):
self.agent = agent
async def get(self, interval: float = 0.2):
service = await self.agent._connect()
try:
while True:
res = await service.invoke("check_message_queue")
if res:
return await service.invoke("get_message_queue")
await asyncio.sleep(interval)
finally:
await service.close()
class ResponseDetails(BaseModel):
"""
The ResponseDetails class is used to store the details of the agent response.
"""
messages: list[dict]
context_variables: dict
execution_context_id: str | None = None # NEW: Context ID for message filtering
[docs]
class AgentResponse(BaseModel):
"""
The AgentResponse class is used to store the agent response.
"""
agent_name: str
content: Any
details: ResponseDetails | None
interrupt: bool = False
[docs]
class AgentTransfer(BaseModel):
"""
The AgentTransfer class is used to transfer the agent response to another agent.
"""
from_agent: str
to_agent: str
history: list[dict]
context_variables: dict
init_message_length: int
tool_call_id: str | None = None
AgentInput = Union[
str,
BaseModel,
AgentResponse,
list[str | BaseModel | dict],
AgentTransfer,
"VisionInput",
]
class StopRunning(Exception):
"""Raised to interrupt a running agent.
Optionally carries a partial assistant message (dict) that was being
streamed when the stop was requested, so callers can persist it.
"""
def __init__(self, partial_message: dict | None = None):
super().__init__()
self.partial_message = partial_message
def _is_retryable_error(error: Exception) -> bool:
"""Determine if an LLM API error is transient and worth retrying."""
from pantheon.utils.adapters.base import (
ServiceUnavailableError,
InternalServerError,
RateLimitError,
APIConnectionError,
)
if isinstance(error, (ServiceUnavailableError, InternalServerError,
RateLimitError, APIConnectionError)):
return True
# Fallback: string matching for common transient error indicators
error_str = str(error).lower()
return any(kw in error_str for kw in (
"overloaded", "rate", "capacity", "temporarily", "503", "502", "429",
))
def _get_message_text(message: dict) -> str | None:
"""Extract text content from a message (handles string or multimodal list)."""
# Use _llm_content if available (already modified), otherwise use content
user_input = message.get("_llm_content")
if user_input is None:
user_input = message.get("content")
if not user_input:
return None
if isinstance(user_input, str):
return user_input
if isinstance(user_input, list):
# Extract text from content array
text_parts = [
part.get("text", "")
for part in user_input
if isinstance(part, dict) and part.get("type") == "text"
]
return " ".join(text_parts)
return None
def _apply_injections(message: dict, injections: list[dict]) -> None:
"""Apply collected injections to the message."""
if not injections:
return
# Initialize _llm_content if needed
if "_llm_content" not in message or message["_llm_content"] is None:
message["_llm_content"] = message.get("content")
# Append all injections
for injection in injections:
content_str = injection["content"]
source = injection["source"]
if isinstance(message["_llm_content"], str):
message["_llm_content"] += f"\n\n{content_str}"
elif isinstance(message["_llm_content"], list):
# For content arrays, append as text part
message["_llm_content"].append({
"type": "text",
"text": f"\n\n{content_str}"
})
logger.debug(
f"Injected {len(content_str)} chars from {source}"
)
[docs]
class Agent:
"""
The Agent class is the core component of Pantheon,
providing a flexible interface for creating AI-powered agents with tools,
memory, and collaboration capabilities.
Args:
name: The name of the agent.
instructions: The instructions for the agent.
The instructions are the system instructions that the agent will follow.
All prompt composition (work strategy, delegation, skills, etc.) should be
done at template parsing time, not at runtime.
model: The model to use for the agent.
Can be a single model or list of fallback models.
model_params: The additional parameters for the model(LLM).
icon: The icon to use for the agent.
tools: The tools to use for the agent.
response_format: The response format to use for the agent.
It can be a Pydantic model or a function that returns a Pydantic model.
use_memory: Whether to use memory for the agent. (default: True)
memory: The memory to use for the agent.
If not provided, a new memory will be created.
tool_timeout: The timeout for the tool. (default: from settings.endpoint.local_toolset_timeout, or 3600s)
relaxed_schema: Use relaxed (non-strict) tool schema mode. (default: False)
max_tool_content_length: The maximum length of the tool content. (default: 100000)
description: The description of the agent. (default: None)
"""
[docs]
def __init__(
self,
name: str,
instructions: str,
model: str | list[str] | None = None,
model_params: dict | None = None,
icon: str = "🤖",
tools: list[Callable] | None = None,
response_format: Any | None = None,
use_memory: bool = True,
memory: "Memory | None" = None,
tool_timeout: int | None = None,
relaxed_schema: bool = False,
max_tool_content_length: int | None = None,
description: str | None = None,
):
# Parse +think suffix before any processing
thinking_level: str | None = None
if isinstance(model, str):
model, thinking_level = _parse_thinking_suffix(model)
model = _normalize_model_spec(model)
self.id = uuid4()
self.name = name
self.instructions = instructions
self.description = description
self._model_was_explicit = model is not None
# Smart model selection: use ModelSelector when no model specified
if model is None:
# Get default model fallback chain from ModelSelector
self.models = _get_default_model()
elif isinstance(model, str):
# Check if it's a tag string (e.g., "high", "normal,vision")
if _is_model_tag(model):
# Resolve tag to model fallback chain
self.models = _resolve_model_tag(model)
else:
# Regular model name - wrap in list
self.models = [model]
else:
# Already a list of models (fallback chain)
self.models = list(model)
self.model_params = model_params or {}
if thinking_level:
self.model_params.setdefault("thinking", thinking_level)
# Tool storage (simplified - unified handling)
self._base_functions: dict[
str, Callable
] = {} # All tools (local + remote wrappers)
if tools:
for func in tools:
self.tool(func)
self.response_format = response_format
self.use_memory = use_memory
if memory is None:
from .internal.memory import Memory
memory = Memory(str(uuid4()))
self.memory = memory
# Store user-specified overrides (if provided, these take priority)
self._tool_timeout_override = tool_timeout
self._max_tool_content_length_override = max_tool_content_length
self.events_queue: asyncio.Queue = asyncio.Queue()
# Input queue for run_loop() — messages/notifications enter here
self.input_queue: asyncio.Queue = asyncio.Queue()
self._loop_running: bool = False
self.relaxed_schema = relaxed_schema
self.icon = icon
# Provider management (MCP, ToolSet, etc.)
self.providers: dict[str, ToolProvider] = {} # name -> ToolProvider instance
self.not_loaded_toolsets: list[str] = [] # Track which toolsets failed to load
# Context injectors for dynamic content injection
self.context_injectors: list = [] # List of ContextInjector instances
# Background task support
from .background import BackgroundTaskManager
self._bg_manager = BackgroundTaskManager()
self._tool_output_buffers: dict[str, list[str]] = {}
self._register_bg_tools()
# Callable hooks registered by plugins via get_toolsets() (set during PantheonTeam.async_setup)
# Each hook is a plain async callable — agent has no knowledge of plugins.
# Signature: async (history: list[dict], context_variables: dict) -> list[dict]
self._ephemeral_hooks: list = []
# Signature: async (tool_calls: list[dict], tool_messages: list[dict], context_variables: dict) -> None
self._tool_tracking_hooks: list = []
def _register_bg_tools(self) -> None:
"""Register background task management tools."""
bg_manager = self._bg_manager
async def background_task(
action: str = "list",
task_id: str = "",
timeout: int = 10,
) -> dict:
"""Manage and monitor background tasks.
Args:
action: Operation to perform:
- "list": List all background tasks (default when no task_id)
- "status": Get status, progress and result of a specific task (requires task_id)
- "wait": Wait for a task to complete, up to timeout seconds (requires task_id).
Use this instead of polling with repeated status calls.
- "cancel": Cancel a running task (requires task_id)
- "remove": Remove a task from the list, cancels if still running (requires task_id)
task_id: ID of the task (e.g. 'bg_1'). Required for status/wait/cancel/remove.
timeout: Max seconds to wait for the "wait" action (default 10). If the task
hasn't completed by then, returns current status so you can decide
whether to wait again or do something else.
Returns:
dict with task details or task list
Examples:
background_task() # List all tasks
background_task(action="status", task_id="bg_1") # Check once
background_task(action="wait", task_id="bg_1") # Wait up to 10s
background_task(action="wait", task_id="bg_1", timeout=30) # Wait up to 30s
background_task(action="cancel", task_id="bg_1") # Cancel task
background_task(action="remove", task_id="bg_1") # Remove task
"""
if action == "list":
return {
"tasks": [
bg_manager.to_summary(t) for t in bg_manager.list_tasks()
]
}
if action == "status":
if not task_id:
return {"error": "task_id is required for status action"}
task = bg_manager.get(task_id)
if task is None:
return {"error": f"Task '{task_id}' not found."}
return bg_manager.to_summary(task)
elif action == "wait":
if not task_id:
return {"error": "task_id is required for wait action"}
task = bg_manager.get(task_id)
if task is None:
return {"error": f"Task '{task_id}' not found."}
# Already finished
if task.status != "running":
return bg_manager.to_summary(task)
# Wait with polling
timeout = max(1, min(timeout, 300)) # clamp 1-300s
poll_interval = 1.0
elapsed = 0.0
while elapsed < timeout:
await asyncio.sleep(poll_interval)
elapsed += poll_interval
if task.status != "running":
return bg_manager.to_summary(task)
# Timed out — return current status
summary = bg_manager.to_summary(task)
summary["timed_out"] = True
summary["waited_seconds"] = round(elapsed, 1)
return summary
elif action == "cancel":
if not task_id:
return {"error": "task_id is required for cancel action"}
if bg_manager.cancel(task_id):
return {"task_id": task_id, "status": "cancelling"}
task = bg_manager.get(task_id)
if task is None:
return {"error": f"Task '{task_id}' not found."}
return {"error": f"Task '{task_id}' is already {task.status}."}
elif action == "remove":
if not task_id:
return {"error": "task_id is required for remove action"}
if bg_manager.remove(task_id):
return {"task_id": task_id, "status": "removed"}
return {"error": f"Task '{task_id}' not found."}
else:
return {
"error": f"Unknown action '{action}'. Must be one of: list, status, wait, cancel, remove"
}
self._base_functions["background_task"] = background_task
def _get_tool_timeout(self) -> int:
"""Get tool timeout with priority: user override > settings."""
if self._tool_timeout_override is not None:
return self._tool_timeout_override
try:
from .settings import get_settings
return get_settings().tool_timeout
except Exception:
return 3600
def _get_max_tool_content_length(self) -> int:
"""Get max tool content length with priority: user override > settings."""
if self._max_tool_content_length_override is not None:
return self._max_tool_content_length_override
try:
from .settings import get_settings
return get_settings().max_tool_content_length
except Exception:
return 10000
@property
def tool_timeout(self) -> int:
"""Public API property for backward compatibility and dynamic access."""
return self._get_tool_timeout()
@property
def max_tool_content_length(self) -> int:
"""Public API property for backward compatibility and dynamic access."""
return self._get_max_tool_content_length()
@staticmethod
def _filter_messages_by_execution_context(
messages: list[dict], execution_context_id: str | None
) -> list[dict]:
if not messages:
return []
if execution_context_id is None:
return [msg for msg in messages if msg.get("execution_context_id") is None]
return [
msg
for msg in messages
if msg.get("execution_context_id") == execution_context_id
]
@staticmethod
def _sanitize_messages(messages: list[dict]) -> list[dict]:
"""Sanitize messages before sending to the LLM.
1. Drop messages missing a role.
2. Canonically repair tool-call / tool-result pairing.
"""
if not messages:
return []
# Pass 1: drop messages without a role
with_role: list[dict] = []
for msg in messages:
role = msg.get("role")
if role is None or (isinstance(role, str) and not role.strip()):
logger.warning("Dropping message without role: {}", msg)
continue
with_role.append(msg)
cleaned, stats = ensure_tool_result_pairing_with_stats(with_role)
if stats.dropped_orphan_tool_messages:
logger.warning(
"Dropped {} orphaned tool message(s) without matching tool_calls",
stats.dropped_orphan_tool_messages,
)
if stats.dropped_duplicate_tool_calls:
logger.warning(
"Dropped {} duplicate assistant tool_call(s)",
stats.dropped_duplicate_tool_calls,
)
if stats.dropped_duplicate_tool_messages:
logger.warning(
"Dropped {} duplicate tool response message(s)",
stats.dropped_duplicate_tool_messages,
)
if stats.inserted_placeholder_tool_messages:
logger.warning(
"Inserted {} placeholder tool response message(s) for missing tool outputs",
stats.inserted_placeholder_tool_messages,
)
if stats.dropped_empty_assistant_messages:
logger.warning(
"Dropped {} empty assistant message(s) after tool_call cleanup",
stats.dropped_empty_assistant_messages,
)
return cleaned
@property
def functions(self) -> dict[str, Callable]:
"""Get current tools (returns a copy for safety)."""
return self._base_functions
# ===== New Provider-Based Tool Management =====
[docs]
async def mcp(self, name: str, provider: "ToolProvider") -> "Agent":
"""Add a MCP provider to the agent (one at a time)
Dynamic routing approach: Provider tools are retrieved on-demand via get_tools_for_llm()
and called via call_tool() with prefix routing. No pre-wrapping.
Args:
name: Name/identifier for this provider (e.g., 'biomcp')
provider: ToolProvider instance (MCPProvider recommended, already initialized)
Returns:
The agent instance
"""
# Store the provider for dynamic routing
self.providers[name] = provider
logger.debug(
f"Agent '{self.name}': Added MCP provider '{name}' (dynamic routing)"
)
return self
def _should_inject_context_variables(self, prefixed_name: str) -> bool:
"""Determine if context_variables should be injected for a tool.
Returns True for:
1. ToolSet base functions (has _is_tool attribute)
2. ToolSetProvider calls
3. Functions that explicitly declare context_variables parameter
Args:
prefixed_name: Tool name (possibly with prefix)
Returns:
bool: Whether to inject context_variables
"""
# Check 1: Is it a ToolSet base function with _is_tool attribute?
if prefixed_name in self._base_functions:
func = self._base_functions[prefixed_name]
if hasattr(func, "_is_tool"):
return True
else:
# Check 3: Does the function explicitly declare context_variables parameter?
try:
sig = inspect.signature(func)
if "context_variables" in sig.parameters:
return True
except (ValueError, TypeError):
pass
return False
# Check 2: Is it a ToolSetProvider call (prefix routing)?
if "__" in prefixed_name:
provider_name = prefixed_name.split("__", 1)[0]
if provider_name in self.providers:
provider = self.providers[provider_name]
# Check if it's a ToolSetProvider
from .providers import ToolSetProvider, LocalProvider
if isinstance(provider, (ToolSetProvider, LocalProvider)):
return True
return False
def _prepare_context_variables(
self,
prefixed_name: str,
args: dict,
context_variables: dict | None,
tool_call_id: str | None,
) -> dict:
"""Inject context_variables into args if needed."""
should_inject = self._should_inject_context_variables(prefixed_name)
if not should_inject or context_variables is None:
return args
# Create _call_agent callback
async def _call_agent_wrap(
messages: list,
system_prompt: str | None = None,
model: str | None = None,
use_memory: bool = False,
) -> dict:
memory = self.memory[:-1] if use_memory else None
preferred_model = get_current_run_model() or (self.models[0] if self.models else None)
return await _call_agent(
messages=messages,
system_prompt=system_prompt,
model=_resolve_model_spec_with_current_provider(
model,
current_model=preferred_model,
),
memory=memory,
)
inherited_model = get_current_run_model()
caller_models = list(self.models)
if inherited_model:
caller_models = [inherited_model, *[
candidate for candidate in caller_models
if candidate != inherited_model
]]
# Build complete context_variables with execution metadata
full_context = context_variables.copy()
full_context["agent_name"] = self.name
if tool_call_id is not None:
full_context["tool_call_id"] = tool_call_id
full_context["_call_agent"] = _call_agent_wrap
full_context["caller_models"] = caller_models # For scfm_router LLM calls
# Pre-inject output buffer for background task adoption on timeout
if tool_call_id is not None:
output_buffer: list[str] = []
self._tool_output_buffers[tool_call_id] = output_buffer
full_context["_report_output"] = lambda line, buf=output_buffer: buf.append(str(line))
# Merge with existing context_variables in args
if _CTX_VARS_NAME in args:
existing = args[_CTX_VARS_NAME]
if isinstance(existing, dict):
merged = dict(existing)
merged.update(full_context)
args[_CTX_VARS_NAME] = merged
else:
args[_CTX_VARS_NAME] = full_context
else:
args[_CTX_VARS_NAME] = full_context
return args
# ===== Legacy MCP method (deprecated, kept for backward compatibility) =====
def _convert_functions(
self, relaxed_schema: bool, allow_transfer: bool
) -> list[dict]:
"""Convert function to the format that the model can understand."""
functions = []
for func in self.functions.values():
desc = parse_func(func, update_by_docstring=False)
assert isinstance(desc.name, str), "Function name must be a string"
# Filter transfer functions if not allowed
if not allow_transfer:
if desc.name.startswith("transfer_to_") or desc.name.startswith(
"call_agent_"
):
# NOTE: transfer function should start with `transfer_to_`
continue
skip_params = list(_SKIP_PARAMS)
func_dict = desc_to_openai_dict(
desc,
skip_params=skip_params,
relaxed_schema=relaxed_schema,
)
functions.append(func_dict)
return functions
async def _handle_tool_calls(
self,
tool_calls: list,
context_variables: dict,
timeout: float,
time_delta: float = 0.5,
check_stop: Callable | None = None,
) -> list[dict]:
tool_messages = []
async def _run_single_tool_call(call: dict) -> dict:
func_name = call["function"]["name"]
tool_call_id = call["id"]
start_time = time.time()
# Try to parse arguments
try:
args_str = call["function"]["arguments"]
if not args_str.endswith("}"):
args_str = args_str + "}"
params = json.loads(call["function"]["arguments"]) or {}
parse_error = None
except Exception as e:
logger.warning(f"Failed to parse arguments for tool '{func_name}': {e}")
params = {}
parse_error = e
allow_timeout = func_name != "call_agent"
# Pop _background flag before passing params to tool
run_in_bg = params.pop("_background", False)
if func_name == "call_agent" and run_in_bg:
logger.warning(
"Ignoring _background=True for call_agent; delegation must remain synchronous"
)
run_in_bg = False
from .background import _bg_output_buffer
# Handle parse error or execute tool
if parse_error:
# Treat as execution failure
truncated = call["function"]["arguments"][:200]
if len(call["function"]["arguments"]) > 200:
truncated += "..."
result = (
f"Error: Failed to parse tool arguments.\n"
f"JSON error: {parse_error}\n"
f"Raw arguments: {truncated}"
)
elif run_in_bg:
# Explicit background execution via _background=True
bg_tool_call_id = f"bg_call_{uuid4()}"
_stdout_buffer: list[str] = []
_token = _bg_output_buffer.set(_stdout_buffer)
try:
# Snapshot args before call_tool injects context_variables
# (which contains non-serializable functions like _call_agent)
bg_args = dict(params)
coro = self.call_tool(
func_name, params, context_variables,
tool_call_id=bg_tool_call_id,
)
bg_task = self._bg_manager.start(
tool_name=func_name,
tool_call_id=bg_tool_call_id,
args=bg_args,
coro=coro,
source="explicit",
)
finally:
_bg_output_buffer.reset(_token)
result = {
"task_id": bg_task.task_id,
"status": "running",
"tool_name": func_name,
"message": (
f"Tool launched in background. Use "
f"background_task(action='status', task_id='{bg_task.task_id}') "
f"to check progress."
),
}
else:
# Normal execution with timeout adoption
_stdout_buffer: list[str] = []
_token = _bg_output_buffer.set(_stdout_buffer)
call_task = asyncio.create_task(
self.call_tool(
func_name, params, context_variables, tool_call_id=tool_call_id
)
)
adopted_to_bg = False
try:
result: Any
while True:
done, _ = await asyncio.wait(
{call_task},
timeout=time_delta,
return_when=asyncio.FIRST_COMPLETED,
)
if call_task in done:
result = call_task.result()
break
logger.debug("Check stop when tool calling")
elapsed = time.time() - start_time
if allow_timeout and timeout is not None and elapsed >= timeout:
# Adopt into background instead of cancelling.
# Merge _report_output items INTO _stdout_buffer so
# post-adoption prints keep accumulating in the same list.
report_buf = self._tool_output_buffers.pop(tool_call_id, [])
_stdout_buffer.extend(report_buf)
bg_task = self._bg_manager.adopt(
tool_name=func_name,
tool_call_id=tool_call_id,
args=params,
existing_task=call_task,
output_buffer=_stdout_buffer, # same list object
)
adopted_to_bg = True
result = (
f"Tool '{func_name}' exceeded timeout ({timeout}s) and was moved to "
f"background execution. task_id='{bg_task.task_id}'. "
f"Use background_task(action='status', task_id='{bg_task.task_id}') to check progress and results."
)
break
if check_stop is not None and check_stop(elapsed):
call_task.cancel()
raise StopRunning()
if not adopted_to_bg:
self._tool_output_buffers.pop(tool_call_id, None)
except StopRunning:
self._tool_output_buffers.pop(tool_call_id, None)
raise
except SystemExit as e:
if not call_task.done():
call_task.cancel()
result = f"SystemExit: {e}"
self._tool_output_buffers.pop(tool_call_id, None)
except Exception as e:
if not call_task.done():
call_task.cancel()
result = repr(e)
self._tool_output_buffers.pop(tool_call_id, None)
finally:
_bg_output_buffer.reset(_token)
# Critical Fix: Ensure child task is cancelled if WE are cancelled
# But skip if it was adopted to background
if not call_task.done() and not self._bg_manager._is_adopted(call_task):
logger.warning(f"Cancelling orphaned tool task for {func_name}")
call_task.cancel()
end_timestamp = time.time()
execution_duration = end_timestamp - start_time
# P1: Move timestamp fields to _metadata (except top-level timestamp for compatibility)
tool_message = {
"role": "tool",
"tool_name": func_name,
"name": func_name,
"id": str(uuid4()),
"tool_call_id": tool_call_id,
"timestamp": end_timestamp, # Keep for backward compatibility
"_metadata": {
"start_timestamp": start_time,
"end_timestamp": end_timestamp,
"execution_duration": execution_duration,
},
}
# Stamp the child's execution_context_id if this was a call_agent
# invocation. The PantheonTeam.call_agent closure records the
# (tool_call_id → child_exec_id) mapping on run_context when it
# spawns the sub-agent.
_run_ctx = _RUN_CONTEXT.get()
if _run_ctx is not None and tool_call_id:
_child_exec_id = _run_ctx.sub_agent_exec_ids.pop(tool_call_id, None)
if _child_exec_id:
tool_message["execution_context_id"] = _child_exec_id
if isinstance(result, (Agent, RemoteAgent)):
tool_message.update(
{
"transfer": True,
"content": result.name,
}
)
else:
# Extract and merge nested metadata from tool result if available
# Tools that use _call_agent internally will return _metadata with cost info
if isinstance(result, dict) and "_metadata" in result:
tool_metadata = result.pop("_metadata", {})
# Merge instead of overwrite to preserve all metadata fields
tool_message["_metadata"].update(tool_metadata)
# Native multimodal content opt-in (content_blocks):
# When a tool returns `result["content_blocks"]` as a list of
# OpenAI-style content blocks containing at least one image,
# the framework merges structured JSON data + the blocks into
# a single multimodal tool_result. This lets the adapter layer
# carry images natively to providers that support it.
#
# Design:
# - Tool keeps its normal dict fields (cell_id, success, ...).
# - Tool sets `content_blocks` ONLY when it wants native
# images AND has verified provider capability.
# - Framework strips `content_blocks` from the dict, turns
# the remainder into a text summary via the usual
# process_tool_result path (preserving hidden_to_model
# filtering), and prepends that text to the blocks.
native_blocks = None
if isinstance(result, dict):
maybe_blocks = result.get("content_blocks")
if isinstance(maybe_blocks, list) and any(
isinstance(b, dict) and b.get("type") == "image_url"
for b in maybe_blocks
):
native_blocks = maybe_blocks
if native_blocks is not None:
# Peel content_blocks out so the JSON summary doesn't
# also contain the image payload in text form.
structured_result = {
k: v for k, v in result.items() if k != "content_blocks"
}
text_summary = process_tool_result(
structured_result,
max_length=self.max_tool_content_length,
tool_name=func_name,
)
merged_blocks: list[dict] = []
if text_summary:
merged_blocks.append({
"type": "text",
"text": (
text_summary
if isinstance(text_summary, str)
else str(text_summary)
),
})
merged_blocks.extend(native_blocks)
# Run the merged content through ImageStore so base64
# data URIs are persisted to disk and replaced with
# file:// references (same memory-efficiency trick used
# for user input images).
try:
from .utils.vision import get_image_store
image_store = get_image_store()
chat_id = self.memory.id if self.memory else "default"
tmp = {"content": merged_blocks}
image_store.process_message_images(tmp, chat_id)
merged_blocks = tmp["content"]
except Exception as e:
logger.debug(f"ImageStore processing failed: {e}")
tool_message.update({
"raw_content": result,
"content": merged_blocks,
})
else:
# Process and truncate tool result in one step
content = process_tool_result(
result,
max_length=self.max_tool_content_length,
tool_name=func_name,
)
tool_message.update({
"raw_content": result, # Now without _metadata
"content": content,
})
return tool_message
tasks = [
asyncio.create_task(_run_single_tool_call(call)) for call in tool_calls
]
try:
tool_messages = await asyncio.gather(*tasks)
except Exception:
for task in tasks:
if not task.done():
task.cancel()
raise
return tool_messages
async def _acompletion(
self,
messages: list[dict],
model: str,
tool_use: bool = True,
response_format: Any | None = None,
process_chunk: Callable | None = None,
allow_transfer: bool = True,
context_variables: dict | None = None,
) -> dict:
"""Get LLM completion for messages (simplified and optimized).
This method orchestrates the LLM completion workflow:
1. Process messages for the model
2. Detect provider and load configuration
3. Convert functions to tools
4. Create enhanced chunk processor
5. Call LLM provider
6. Add metadata to response
Args:
messages: Chat message history
model: Model identifier (e.g., "gpt-4", "zhipu/glm-4")
tool_use: Whether to include tools
response_format: Response format specification
process_chunk: Optional callback for streaming chunks
allow_transfer: Whether to allow agent transfers
Returns:
Message dictionary with id, timestamps, and generation_duration
"""
# Initialize timing tracker
tracker = TimingTracker()
tracker.start("total")
request_start_time = time.time()
logger.info(f"🚀 [Agent:{self.name}] Starting LLM request for model: {model}")
# Step 1: Process messages for the model
async with tracker.measure("message_processing"):
from pantheon.utils.token_optimization import (
_estimate_message_tokens,
build_llm_view_async,
inject_cache_control_markers,
supports_explicit_cache_control,
)
run_context = get_current_run_context()
if run_context is not None:
run_context.current_model = model
optimization_memory = run_context.memory if run_context else None
is_main_thread = (
run_context.execution_context_id is None if run_context else True
)
messages = await build_llm_view_async(
messages,
memory=optimization_memory,
is_main_thread=is_main_thread,
autocompact_model=model,
context_window_model=model,
)
logger.info(
"[resume] prompt_view agent={} model={} messages={} est_tokens={}",
self.name,
model,
len(messages),
sum(_estimate_message_tokens(message) for message in messages),
)
messages = process_messages_for_model(messages, model)
# Token optimization can drop earlier assistant tool-call messages
# while leaving later tool results. Re-sanitize right before the
# provider call so Responses API inputs never contain orphaned
# function_call_output items.
messages = self._sanitize_messages(messages)
# Inject prompt-cache markers for providers that support
# explicit cache_control (Anthropic, Qwen).
# OpenAI/DeepSeek/Gemini use automatic prefix caching —
# stabilize_tool_definitions() ensures stable prefixes for them.
if supports_explicit_cache_control(model):
messages = inject_cache_control_markers(messages)
if run_context is not None:
# Selective copy: shallow for messages with string content,
# deepcopy only for messages with list content (Anthropic blocks
# from inject_cache_control_markers) to avoid mutation issues.
cached = []
for m in messages:
if isinstance(m.get("content"), list):
cached.append(copy.deepcopy(m))
else:
cached.append({**m})
run_context.cache_safe_prompt_messages = cached
# Step 2: Detect provider and get configuration
provider_config = detect_provider(model, self.relaxed_schema)
# Step 3: Seed OpenAI-routed config from settings when available.
# Native provider-specific Base URL / API key resolution is finalized
# inside pantheon.utils.llm.acompletion() using the provider registry.
if provider_config.provider_type == ProviderType.OPENAI:
effective_base, effective_key = get_openai_effective_config()
if not provider_config.base_url and effective_base:
provider_config.base_url = effective_base
if not provider_config.api_key and effective_key:
provider_config.api_key = effective_key
# Step 4: Get unified tools (base functions + provider tools)
tools = None
if tool_use:
async with tracker.measure("tools_conversion"):
# Use get_tools_for_llm() for unified tool access
# This includes both base_functions and provider tools
tools = await self.get_tools_for_llm() or None
if run_context is not None and tools is not None:
run_context.cache_safe_tool_definitions = copy.deepcopy(tools)
# For non-OpenAI providers or OpenAI-compatible providers, adjust tool format
# OpenAI-compatible providers (e.g. minimax) have api_key set in config
is_compat_provider = provider_config.api_key is not None
if (provider_config.provider_type.value != "openai" or is_compat_provider) and tools:
for tool in tools:
if "function" in tool:
func = tool["function"]
# Remove strict mode (not supported by non-OpenAI APIs)
func.pop("strict", None)
# Ensure description is non-empty
if not func.get("description"):
func["description"] = func.get("name", "tool")
# Ensure parameters is present (required by some providers e.g. MiniMax)
if "parameters" not in func:
func["parameters"] = {
"type": "object",
"properties": {},
"required": [],
}
else:
# Remove unsupported additionalProperties
func["parameters"].pop(
"additionalProperties", None
)
# Step 5: Create message ID and enhanced chunk processor
message_id = str(uuid4())
enhanced_process_chunk = create_enhanced_process_chunk(
process_chunk, message_id, agent_name=self.name
)
# Step 6: Send begin chunk
if enhanced_process_chunk:
async with tracker.measure("begin_chunk"):
await enhanced_process_chunk({"begin": True})
# Step 7: Merge model_params from context_variables if provided
model_params = self.model_params
if context_variables and "model_params" in context_variables:
# Runtime overrides instance defaults
model_params = {**self.model_params, **context_variables["model_params"]}
if run_context is not None:
from pantheon.utils.token_optimization import build_cache_safe_runtime_params
run_context.cache_safe_runtime_params = build_cache_safe_runtime_params(
model=model,
model_params=model_params,
response_format=response_format,
)
# Step 8: Call LLM provider (unified interface)
# logger.info(f"Raw messages: {messages}")
async with tracker.measure("llm_api"):
message = await call_llm_provider(
config=provider_config,
messages=messages,
tools=tools,
response_format=response_format,
process_chunk=enhanced_process_chunk,
model_params=model_params,
)
if message is None:
message = {"role": "assistant", "content": "Error: Empty response from model."}
# Step 8: Add metadata to message
end_timestamp = time.time()
total_time = tracker.end("total")
message["id"] = message_id
message["timestamp"] = end_timestamp # Keep backward compatibility
message.setdefault("_metadata", {}).update({
"start_timestamp": request_start_time,
"end_timestamp": end_timestamp,
})
# Step 9: Collect stats and log timing
# ✅ Use lightweight collection for Write mode (O(1))
from pantheon.utils.llm import collect_message_stats_lightweight
collect_message_stats_lightweight(
message=message,
messages=messages,
model=model,
)
# ✅ Simplified logging using only required metadata fields
timings = tracker.get_all()
meta = message.get("_metadata", {})
# Extract required fields
total_tokens = meta.get("total_tokens", 0)
max_tokens = meta.get("max_tokens", 200000)
current_cost = meta.get("current_cost", 0)
# Calculate usage percentage
usage_pct = (total_tokens / max_tokens * 100) if max_tokens > 0 else 0
# Determine if this is a free OAuth channel (shadow cost)
_oauth_prefixes = ("codex/", "gemini-cli/")
_is_shadow = isinstance(model, str) and any(model.startswith(p) for p in _oauth_prefixes)
_cost_label = f"~${current_cost:.4f} (shadow)" if _is_shadow else f"${current_cost:.4f}"
# Format log message
timing_log = (
f"📊 [Agent:{self.name}] "
f"⏳ Timing: Total: {total_time:.3f}s | "
f"Message: {timings.get('message_processing', 0):.3f}s | "
f"Begin: {timings.get('begin_chunk', 0):.3f}s | "
f"LLM: {timings['llm_api']:.3f}s | "
f"Tool: {timings.get('tools_conversion', 0):.3f}s for {len(tools or [])} tools "
f"💬 Tokens: {total_tokens:,} | "
f"Usage: {usage_pct:.1f}% | "
f"Cost: {_cost_label}"
)
# Add warning if usage is high
if usage_pct >= 95:
timing_log += "\n⚠️ WARNING: Context usage ≥95%"
elif usage_pct >= 90:
timing_log += "\n⚠️ Warning: Context usage ≥90%"
logger.info(timing_log)
return message
async def _acompletion_with_models(
self,
history,
tool_use,
response_format,
process_chunk,
allow_transfer,
model: str | list[str] | None = None,
context_variables: dict | None = None,
):
"""Try multiple models with fallback and exponential backoff retry.
For each model, transient errors (overloaded, rate-limit, 5xx) are
retried with exponential backoff. Non-transient errors skip directly
to the next model. The adapter's ``num_retries`` still handles initial
connection-level retries; this layer covers mid-stream failures that
the adapter cannot retry on its own.
"""
# --- Read retry settings (with sensible defaults) ---
from .settings import get_settings
retry_cfg = get_settings().get("llm_retry", {})
if not isinstance(retry_cfg, dict):
retry_cfg = {}
max_retries: int = int(retry_cfg.get("max_retries", 3))
base_delay: float = float(retry_cfg.get("base_delay", 1.0))
max_delay: float = float(retry_cfg.get("max_delay", 30.0))
jitter: float = float(retry_cfg.get("jitter", 0.5))
# --- Prepare model list ---
if model is None:
models = self.models
else:
if isinstance(model, str):
models = [model] + self.models
else:
models = model + self.models
if not models:
raise RuntimeError(f"No model is available. models: {models}")
model_error_count = 0
last_error = None
for model_name in models:
if model_error_count > 0:
logger.warning(
f"Trying {model_name} due to previous model failure"
)
# max_retries *additional* attempts after the first try
for attempt in range(max_retries + 1):
try:
message = await self._acompletion(
history,
model=model_name,
tool_use=tool_use,
response_format=response_format,
process_chunk=process_chunk,
allow_transfer=allow_transfer,
context_variables=context_variables,
)
return message
except StopRunning:
raise
except Exception as e:
last_error = e
import traceback
logger.error(f"[Agent:{self.name}] Full traceback:\n{traceback.format_exc()}")
if _is_retryable_error(e) and attempt < max_retries:
delay = min(base_delay * (2 ** attempt), max_delay)
delay *= random.uniform(1 - jitter, 1 + jitter)
delay = max(delay, 0)
logger.warning(
f"[Agent:{self.name}] Transient error on {model_name} "
f"(attempt {attempt + 1}/{max_retries + 1}): "
f"{type(e).__name__}: {e}. Retrying in {delay:.1f}s..."
)
await asyncio.sleep(delay)
continue
# Non-retryable or retries exhausted — log and move to next model
error_type = type(e).__name__
if "RateLimitError" in error_type or "rate" in str(e).lower():
logger.error(
f"Rate limit exceeded for {model_name} after retries: {e}"
)
else:
logger.error(f"Error completing with model {model_name}: {e}")
model_error_count += 1
break # break retry loop, continue to next model
# All models failed
raise RuntimeError(
f"All {len(models)} model(s) failed after {model_error_count} attempts. "
f"Models tried: {models}. Last error: {last_error}"
)
def _render_system_prompt(self, prompt: str, context_variables: dict) -> str:
"""Render system prompt — delegates to pantheon.internal.system_prompt."""
from pantheon.internal.system_prompt import render_system_prompt
return render_system_prompt(prompt, context_variables)
async def _run_stream(
self,
messages: list[dict],
process_chunk: Callable | None = None,
process_step_message: Callable | None = None,
check_stop: Callable | None = None,
max_turns: int | float = float("inf"),
context_variables: dict | None = None,
response_format: Any | None = None,
tool_use: bool = True,
tool_timeout: int | None = None,
model: str | list[str] | None = None,
allow_transfer: bool = True,
execution_context_id: str
| None = None, # NEW: Context ID for message filtering
) -> ResponseDetails | AgentTransfer:
response_format = response_format or self.response_format
history = copy.deepcopy(messages)
# Use _llm_content for LLM if present (assembled user messages)
for msg in history:
if "_llm_content" in msg:
msg["content"] = msg["_llm_content"]
# Expand file:// image references to Base64 for LLM API call
from .utils.vision import expand_image_references_for_llm
history = expand_image_references_for_llm(history)
tool_timeout = tool_timeout or self.tool_timeout
# Use instructions directly - all prompt composition happens at template parsing time
# Render system prompt: substitutes ${{}} variables and appends standard context blocks
system_prompt = self._render_system_prompt(
self.instructions, context_variables or {}
)
current_timestamp = time.time()
if (len(history) > 0) and (history[0]["role"] == "system"):
history[0]["content"] = system_prompt
history[0]["timestamp"] = current_timestamp
if "id" not in history[0]:
history[0]["id"] = str(uuid4())
# Mark system message with execution_context_id (not done in _process_step_message)
if execution_context_id is not None:
history[0]["execution_context_id"] = execution_context_id
else:
system_msg = {
"role": "system",
"content": system_prompt,
"timestamp": current_timestamp,
"id": str(uuid4()),
}
# Mark system message with execution_context_id (not done in _process_step_message)
if execution_context_id is not None:
system_msg["execution_context_id"] = execution_context_id
history.insert(0, system_msg)
init_len = len(history)
context_variables = context_variables or {}
if response_format:
Response = create_model("Response", result=(response_format, ...))
else:
Response = None
async def _process_chunk(chunk: dict):
if process_chunk is not None:
await run_func(process_chunk, chunk)
if check_stop is not None and check_stop(chunk):
raise StopRunning()
while len(history) - init_len < max_turns:
# Build history for LLM with ephemeral messages from hooks (not persisted)
history_for_llm = list(history)
for hook in self._ephemeral_hooks:
plugin_msgs = await hook(history, context_variables or {})
history_for_llm.extend(plugin_msgs)
# Inject background task completion notifications (ephemeral)
bg_notifs = self._bg_manager.drain_notifications()
if bg_notifs:
lines = []
for bg_task in bg_notifs:
summary = self._bg_manager.to_summary(bg_task)
result_preview = str(summary.get("result", ""))[:500]
lines.append(
f"- task_id='{bg_task.task_id}', tool='{bg_task.tool_name}', "
f"status='{bg_task.status}', result: {result_preview}"
)
notif_msg = {
"role": "user",
"content": (
"<EPHEMERAL_MESSAGE>\n"
"[Background Task Notification] The following background tasks have completed:\n"
+ "\n".join(lines)
+ "\nPlease inform the user about these results."
+ "\n</EPHEMERAL_MESSAGE>"
),
}
history_for_llm = list(history_for_llm) + [notif_msg]
message = await self._acompletion_with_models(
history_for_llm,
tool_use,
Response,
_process_chunk,
allow_transfer,
model=model,
context_variables=context_variables,
)
if Response is not None:
content = message.get("content")
if content:
parsed = Response.model_validate_json(content)
message["parsed"] = parsed.result
message["agent_name"] = self.name
history.append(message)
self.events_queue.put_nowait(message)
if process_step_message:
await run_func(process_step_message, message)
# If no tool calls, stop conversation
# BUT: If we have reasoning content WITHOUT actual content, continue to let the model output its conclusion/tool calls
# This handles models that output reasoning in one turn and content/tools in the next
# However, if the model outputs both reasoning AND content in the same turn (e.g., Gemini),
# we should stop to avoid an unnecessary extra loop that returns empty content
has_reasoning = message.get("reasoning_content") and str(message.get("reasoning_content")).strip()
has_content = message.get("content") and str(message.get("content")).strip()
# Stop if:
# 1. No tool calls AND no reasoning (normal completion)
# 2. No tool calls AND has reasoning BUT also has content (reasoning + content in same turn)
# Continue only if: has reasoning but NO content (waiting for model to output conclusion)
if not message.get("tool_calls"):
if not has_reasoning or has_content:
break
tool_messages = await self._handle_tool_calls(
message.get("tool_calls") or [],
context_variables=context_variables,
timeout=tool_timeout,
check_stop=check_stop,
)
# Process tool calls for state tracking via hooks
for hook in self._tool_tracking_hooks:
await hook(
message.get("tool_calls") or [],
tool_messages,
context_variables or {},
)
# Filter out all transfer messages - they will be handled in _prepare_execution_context()
# Both call_agent and transfer_to_* transfers should skip history addition here
non_transfer_messages = []
transfer_message = None
interrupt_message = None
for msg in tool_messages:
if msg.get("transfer"):
# Skip all transfer messages
transfer_message = msg
else:
# Check for notify_user interrupt
raw_content = msg.get("raw_content", {})
if isinstance(raw_content, dict) and raw_content.get("interrupt"):
interrupt_message = msg
# Keep regular tool messages
non_transfer_messages.append(msg)
history.extend(non_transfer_messages)
for msg in non_transfer_messages:
self.events_queue.put_nowait(msg)
if process_step_message:
for msg in process_messages_for_hook_func(non_transfer_messages):
msg["agent_name"] = self.name # Add agent_name to tool messages
await run_func(process_step_message, msg)
if transfer_message:
# Add the transfer tool result to history so the next agent sees it
# This ensures every tool_call has a corresponding tool response
# Without this, OpenAI API will reject: "tool_calls must be followed by tool messages"
transfer_tool_result = {
"role": "tool",
"tool_call_id": transfer_message.get("tool_call_id"),
"content": f"Transferred to {transfer_message['content']}",
}
history.append(transfer_tool_result)
transfer = AgentTransfer(
from_agent=self.name,
to_agent=transfer_message["content"],
history=history,
context_variables=context_variables,
init_message_length=init_len,
tool_call_id=transfer_message.get("tool_call_id"),
)
return transfer
# Handle notify_user interrupt - break loop to return control to user
if interrupt_message:
break
return ResponseDetails(
messages=history[init_len:],
context_variables=context_variables,
execution_context_id=execution_context_id, # NEW: Pass context ID through response
)
async def _input_to_openai_messages(
self,
msg: AgentInput,
) -> list[dict]:
"""Convert input to OpenAI message format.
Handles conversion of user input types to standardized message dicts:
- VisionInput: Converts vision input to messages
- BaseModel: Converts to JSON user message
- str: Wraps in user message dict
- list: Processes each item and accumulates messages
Note: AgentTransfer is NOT handled here - it's processed directly in
_prepare_execution_context() before calling this method.
Args:
msg: The input message (user input types, not AgentTransfer)
Returns:
List of message dicts in OpenAI format
"""
# Lazy import to avoid loading PIL/numpy at module load time
from .utils.vision import VisionInput, vision_to_openai
assert isinstance(
msg, (list, str, BaseModel, AgentResponse, VisionInput, dict)
), (
"Message must be a list, string, BaseModel, AgentResponse, VisionInput, or dict"
)
if isinstance(msg, AgentResponse):
# For accepting the result of previous run or other agent
msg = msg.content
# Convert message to OpenAI message format based on type
if isinstance(msg, dict):
# Handle dict messages (e.g., tool_message from team.run() loop)
# These are already in OpenAI message format
if msg.get("role") == "tool":
# Tool message from sub-agent completion
converted_messages = [msg]
else:
# Regular dict message, wrap as user message
converted_messages = [_create_user_message(msg)]
elif isinstance(msg, VisionInput):
# Vision input: use specialized converter
converted_messages = vision_to_openai(msg)
elif isinstance(msg, BaseModel):
# BaseModel input: convert to JSON user message
converted_messages = [_create_user_message(msg.model_dump_json())]
elif isinstance(msg, str):
# String input: wrap in user message
converted_messages = [_create_user_message(msg)]
elif isinstance(msg, list):
# List input: process each item and accumulate
converted_messages = []
for item in msg:
if isinstance(item, str):
# String item: create user message
converted_messages.append(_create_user_message(item))
elif isinstance(item, VisionInput):
# Vision input: extend with converted vision messages
converted_messages.extend(vision_to_openai(item))
elif isinstance(item, BaseModel):
# BaseModel item: convert to JSON user message
converted_messages.append(
_create_user_message(item.model_dump_json())
)
else:
# Dict item: use as-is
assert isinstance(item, dict), (
"Message must be a string, BaseModel or dict"
)
converted_messages.append(item)
return converted_messages
async def _collect_injections(self, input_text: str, injector_context: dict) -> list[dict]:
"""Collect injections from all registered context injectors."""
pending_injections = []
for injector in self.context_injectors:
try:
injected_content = await injector.inject(input_text, injector_context)
if injected_content:
pending_injections.append({
"content": injected_content,
"source": type(injector).__name__
})
except Exception as e:
logger.error(f"Context injection failed: {e}", exc_info=True)
return pending_injections
async def _inject_context_to_messages(
self,
messages: list[dict],
context_variables: dict,
) -> None:
"""
Inject dynamic context into user messages via context injectors.
Modifies messages in-place by appending injected content to _llm_content field.
Only injects into user messages (role="user").
"""
if not self.context_injectors:
return
# Create _call_agent callback definition
async def _call_agent_wrap(
messages: list,
system_prompt: str | None = None,
model: str | None = None,
use_memory: bool = False,
) -> dict:
memory = self.memory[:-1] if use_memory else None # Exclude current message
preferred_model = get_current_run_model() or (self.models[0] if self.models else None)
return await _call_agent(
messages=messages,
system_prompt=system_prompt,
model=_resolve_model_spec_with_current_provider(
model,
current_model=preferred_model,
),
memory=memory,
)
inherited_model = get_current_run_model()
caller_models = list(self.models)
if inherited_model:
caller_models = [inherited_model, *[
candidate for candidate in caller_models
if candidate != inherited_model
]]
# Build context for injectors
injector_context = {
"agent_name": self.name,
"_call_agent": _call_agent_wrap,
"caller_models": caller_models,
**context_variables,
}
for message in messages:
# Only inject into user messages
if message.get("role") != "user":
continue
input_text = _get_message_text(message)
if not input_text:
continue
injections = await self._collect_injections(input_text, injector_context)
if injections:
_apply_injections(message, injections)
async def _prepare_execution_context(
self,
msg: AgentInput,
execution_context_id: str | None = None,
context_variables: dict | None = None,
memory: "Memory | None" = None,
use_memory: bool | None = None,
) -> ExecutionContext:
"""Prepare execution context based on input type.
Handles two paths:
1. AgentTransfer (delegated from another agent): Uses transfer's history and context
2. Normal user input: Converts input to messages and optionally merges with memory
"""
# Determine whether to use memory
should_use_memory = use_memory if use_memory is not None else self.use_memory
memory_instance = memory or self.memory
working_context_variables = (context_variables or {}).copy()
fork_context_messages = working_context_variables.pop(
"_cache_safe_fork_context_messages",
None,
)
input_messages = None # Only set for normal user input, not AgentTransfer
if isinstance(msg, AgentTransfer):
# Delegation path: use transfer's history and context
conversation_history = self._filter_messages_by_execution_context(
msg.history, execution_context_id
)
conversation_history = self._sanitize_messages(conversation_history)
context_variables = msg.context_variables
else:
# User input path: convert input to messages
input_messages = await self._input_to_openai_messages(msg)
# Process images: convert Base64 to file:// paths for efficient storage
from .utils.vision import get_image_store
image_store = get_image_store()
chat_id = memory_instance.id if memory_instance else "default"
for m in input_messages:
image_store.process_message_images(m, chat_id)
logger.debug(
f"Input messages: {input_messages} , memory_length: {len(memory_instance.get_messages(execution_context_id=execution_context_id, for_llm=False))} "
f"raw memory_length: {len(memory_instance.get_messages(for_llm=False))} memory_id: {memory_instance.id}"
)
conversation_history = []
if should_use_memory and memory_instance:
from pantheon.repl.conversationRecovery import loadConversationForResume
from pantheon.repl.sessionRestore import processResumedConversation
raw_history = memory_instance.get_messages(
execution_context_id=execution_context_id,
for_llm=False,
)
resume_result = loadConversationForResume(
memory_instance,
execution_context_id=execution_context_id,
)
if resume_result is not None:
processed_resume = await processResumedConversation(
resume_result,
{"forkSession": False},
{"memory": memory_instance, "initialState": {}},
)
conversation_history = processed_resume.get("messages", [])
logger.info(
"[resume] agent={} execution_context_id={} raw_history={} resumed_history={}",
self.name,
execution_context_id,
len(raw_history),
len(conversation_history),
)
else:
conversation_history = raw_history
if isinstance(fork_context_messages, list) and fork_context_messages:
conversation_history = [
*copy.deepcopy(fork_context_messages),
*conversation_history,
]
conversation_history += input_messages
conversation_history = self._sanitize_messages(conversation_history)
# preserve execution_context_id if tool need
context_variables = working_context_variables
# Inject global context variables from settings
from .settings import get_settings
context_variables.update(get_settings().get_context_variables())
if execution_context_id is not None:
context_variables["execution_context_id"] = execution_context_id
# preserve client_id from context_variables or memory
context_variables[_CLIENT_ID_NAME] = context_variables.get(
_CLIENT_ID_NAME,
memory_instance.id,
)
# Apply context injectors to user's NEW input messages only
# (not the entire conversation_history to avoid re-injecting into memory)
if self.context_injectors and input_messages:
await self._inject_context_to_messages(
input_messages,
context_variables,
)
return ExecutionContext(
conversation_history=conversation_history,
context_variables=context_variables or {},
execution_context_id=execution_context_id,
should_use_memory=should_use_memory,
memory_instance=memory_instance,
input_messages=input_messages, # NEW: Pass converted messages for memory update
)
[docs]
async def run(
self,
msg: AgentInput,
response_format: Any | None = None,
tool_use: bool = True,
context_variables: dict | None = None,
process_chunk: Callable | None = None,
process_step_message: Callable | None = None,
check_stop: Callable | None = None,
memory: "Memory | None" = None,
use_memory: bool | None = None,
update_memory: bool = True,
tool_timeout: int | None = None,
model: str | list[str] | None = None,
allow_transfer: bool = True,
execution_context_id: str | None = None,
) -> AgentResponse | AgentTransfer:
"""Run the agent.
Args:
msg: The input message to the agent.
response_format: The response format to use.
tool_use: Whether to use tools.
context_variables: Runtime variables available to tools during execution.
These are stored in ExecutionContext.runtime_context and injected
into tools that request them via the 'context_variables' parameter.
Example: {'user_id': '123', 'session_id': 'abc'}
process_chunk: The function to process the chunk.
process_step_message: The function to process the step message.
check_stop: The function to check if the agent should stop.
memory: The memory to use.
use_memory: Whether to use short term memory.
update_memory: Whether to update the short term memory.
tool_timeout: The timeout for the tool.
model: The model to use in this run.
Could be a list of models for fallback.
If not provided, the model will be selected from the agent's models.
allow_transfer: Whether to allow transfer to another agent.
execution_context_id: Unique identifier for sub-agent delegation contexts.
Used for message filtering in _run_stream to isolate messages by
delegation context. None for primary/team agents.
Returns:
The agent response. Either an AgentResponse or an AgentTransfer.
If the agent is interrupted, the AgentResponse will have the interrupt flag set to True.
If the agent is transferring to another agent, the AgentTransfer will be returned.
"""
# Prepare execution context based on input type
# This handles both AgentTransfer delegation and normal user input paths
exec_context = await self._prepare_execution_context(
msg,
execution_context_id=execution_context_id,
context_variables=context_variables,
memory=memory,
use_memory=use_memory,
)
# Prepare response format
response_format_to_use = response_format or self.response_format
# ============ Unified message processing: Always detect attachments ============
async def _process_step_message(step_message: dict):
# Validate message before processing (prevent empty messages in memory)
role = step_message.get("role")
if role is None or (isinstance(role, str) and not role.strip()):
logger.warning(
f"Dropping invalid message without role: {step_message}"
)
return # Skip invalid message
# Get execution_context_id from ExecutionContext
if (
exec_context.execution_context_id is not None
and "execution_context_id" not in step_message
):
step_message["execution_context_id"] = exec_context.execution_context_id
await _detect_attachments(step_message)
if update_memory and exec_context.memory_instance:
exec_context.memory_instance.add_messages([step_message])
if process_step_message is not None:
try:
await run_func(process_step_message, step_message)
except Exception as e:
logger.error(f"Error in process_step_message: {e}")
async def _process_chunk(chunk: dict):
if (
exec_context.execution_context_id is not None
and "execution_context_id" not in chunk
):
chunk["execution_context_id"] = exec_context.execution_context_id
if process_chunk is not None:
try:
await run_func(process_chunk, chunk)
except Exception as e:
logger.error(f"Error in process_chunk: {e}")
# now user input also goes through the pipeline
# all messages: (user input, sub agent mock user input, sub agent transfer tool response)
for msg in exec_context.input_messages or []:
await _process_step_message(msg)
try:
run_context = AgentRunContext(
agent=self,
memory=exec_context.memory_instance,
execution_context_id=exec_context.execution_context_id,
process_step_message=_process_step_message,
process_chunk=_process_chunk,
)
token = _RUN_CONTEXT.set(run_context)
run_result = await self._run_stream(
messages=exec_context.conversation_history,
response_format=response_format_to_use,
tool_use=tool_use,
context_variables=exec_context.context_variables,
process_chunk=_process_chunk,
process_step_message=_process_step_message,
check_stop=check_stop,
tool_timeout=tool_timeout,
model=model,
allow_transfer=allow_transfer,
execution_context_id=exec_context.execution_context_id,
)
except StopRunning as e:
logger.info("StopRunning")
partial_content = ""
if update_memory and exec_context.memory_instance:
# Save partial assistant message if the LLM was mid-stream
partial = e.partial_message
if partial and isinstance(partial, dict):
content = partial.get("content")
if content and str(content).strip():
partial.setdefault("role", "assistant")
partial["agent_name"] = self.name
exec_context.memory_instance.add_messages([partial])
partial_content = str(content)
exec_context.memory_instance.cleanup()
return AgentResponse(
agent_name=self.name,
content=partial_content,
details=None,
interrupt=True,
)
finally:
if "token" in locals():
_RUN_CONTEXT.reset(token)
# Handle response - could be AgentTransfer or ResponseDetails
if isinstance(run_result, AgentTransfer):
return run_result
else:
# Extract content from the last message
last_message = run_result.messages[-1]
if response_format_to_use:
content = last_message.get("parsed")
else:
content = last_message.get("content")
return AgentResponse(
agent_name=self.name,
content=content,
details=run_result,
)
# ===== Event-Driven Loop =====
[docs]
async def run_loop(
self,
process_chunk: Callable | None = None,
process_step_message: Callable | None = None,
check_stop: Callable | None = None,
context_variables: dict | None = None,
memory: "Memory | None" = None,
tool_timeout: int | None = None,
model: str | list[str] | None = None,
on_response: Callable | None = None,
on_error: Callable | None = None,
) -> None:
"""Persistent event-driven loop consuming messages from input_queue.
Blocks until stop_loop() is called or the task is cancelled.
Each message dequeued triggers a full agent.run() cycle.
Background task completions auto-enqueue notifications, so the
agent is automatically triggered when bg tasks finish.
Args:
process_chunk: Streaming chunk callback (forwarded to run()).
process_step_message: Step message callback (forwarded to run()).
check_stop: Stop check callback (forwarded to run()).
context_variables: Shared context variables for all runs.
memory: Memory instance to use.
tool_timeout: Tool timeout (forwarded to run()).
model: Model override (forwarded to run()).
on_response: Callback(AgentResponse) after each successful run.
on_error: Callback(Exception) when a run fails.
"""
self._loop_running = True
self._bg_manager.on_complete = self._on_bg_task_complete_for_loop
try:
while self._loop_running:
msg = await self.input_queue.get()
if msg is None: # Sentinel from stop_loop()
self.input_queue.task_done()
break
try:
response = await self.run(
msg=msg,
process_chunk=process_chunk,
process_step_message=process_step_message,
check_stop=check_stop,
context_variables=context_variables,
memory=memory,
tool_timeout=tool_timeout,
model=model,
)
if on_response:
await run_func(on_response, response)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"run_loop error: {e}")
if on_error:
try:
await run_func(on_error, e)
except Exception:
pass
finally:
self.input_queue.task_done()
finally:
self._loop_running = False
self._bg_manager.on_complete = None
[docs]
def stop_loop(self):
"""Signal run_loop() to exit after current iteration."""
self._loop_running = False
self.input_queue.put_nowait(None) # Sentinel to unblock .get()
def _on_bg_task_complete_for_loop(self, bg_task):
"""Push lightweight trigger to input_queue on bg task completion.
The real task data comes from drain_notifications() in _run_stream
(ephemeral message injection, already implemented).
"""
try:
self.input_queue.put_nowait(
f"[Background task '{bg_task.task_id}' ({bg_task.tool_name}) "
f"completed with status: {bg_task.status}]"
)
except Exception:
pass
[docs]
def setup_bg_notify_queue(self, queue: asyncio.Queue):
"""Wire bg task completion to an external asyncio.Queue.
Alternative to run_loop() for frontends that manage their own
event loops (e.g. REPL with prompt_toolkit, ChatRoom server).
Args:
queue: The asyncio.Queue to push notification strings into.
"""
def _on_complete(bg_task):
status = bg_task.status
result_preview = ""
if bg_task.result is not None:
result_preview = str(bg_task.result)[:200]
elif bg_task.error:
result_preview = bg_task.error[:200]
notif = (
f"[Background task '{bg_task.task_id}' ({bg_task.tool_name}) "
f"{status}. Result: {result_preview}]"
)
try:
queue.put_nowait(notif)
except Exception:
pass
self._bg_manager.on_complete = _on_complete
# FIX: agent should not call REPL, REPL call agent instead
[docs]
async def chat(self, message: str | dict | None = None):
"""Chat with the agent with a REPL interface."""
from .repl.core import Repl
repl = Repl(self)
await repl.run(message)
[docs]
async def serve(self, **kwargs):
"""Serve the agent to a remote server."""
service = AgentService(self, **kwargs)
return await service.run()
# ===== Utility Functions =====
def _create_user_message(content: str) -> dict:
"""Create a standard user message dict."""
return {
"role": "user",
"content": content,
"timestamp": time.time(),
"id": str(uuid4()),
}
async def _detect_attachments(step_message: dict) -> None:
"""Helper: Detect attachments in a message (independent of memory saving)."""
# Attachment detection is currently disabled
return
async def _call_agent(
messages: list,
system_prompt: Optional[str],
model: Optional[str | list[str]] = None,
memory: "Memory | None" = None,
) -> dict:
"""call agent callback to let toolset use llm agent to sample response
Returns:
dict with keys:
- success: bool
- response: str (if success)
- error: str (if not success)
- _metadata: dict with cost info (if success)
- current_cost: float - the cost of this nested LLM call
"""
from .background import _bg_report, _bg_output_buffer
current_run_model = get_current_run_model()
inherited_model = _resolve_model_spec_with_current_provider(
model or current_run_model,
current_model=current_run_model,
)
# Progress callback for background context: reports each sub-agent message
progress_cb = None
if _bg_output_buffer.get() is not None:
async def _on_step_message(msg):
role = msg.get("role", "")
content = str(msg.get("content", "") or "")
if role == "assistant" and content:
preview = content[:300]
if len(content) > 300:
preview += "..."
_bg_report(f"[agent] {preview}")
elif role == "tool":
tool_name = msg.get("name", msg.get("tool_call_id", "tool"))
_bg_report(f"[agent] Tool result from {tool_name}")
progress_cb = _on_step_message
# not tested with remote mode, should work naturally with reverse call support
try:
# Create temporary Agent
agent = Agent(
name="sampler",
model=inherited_model,
instructions=system_prompt or "You are a helpful assistant.",
memory=memory,
)
_bg_report(f"[agent] Sub-agent starting (model={inherited_model or 'default'})")
# Run Agent with the user query
result = await agent.run(
messages,
use_memory=False,
update_memory=False,
process_step_message=progress_cb,
)
_bg_report("[agent] Sub-agent completed")
# Extract cost from the agent response
nested_cost = 0.0
if result and result.details and result.details.messages:
# Find the last assistant message which contains cost info
for msg in reversed(result.details.messages):
if msg.get("role") == "assistant" and "_metadata" in msg:
nested_cost = msg.get("_metadata", {}).get("current_cost", 0.0)
break
return {
"success": True,
"response": result.content,
"_metadata": {
"current_cost": nested_cost,
},
}
except Exception as e:
_bg_report(f"[agent] Sub-agent failed: {e}")
# log stack trace
logger.opt(exception=True).info("Error in agent sampling: {}", e)
return {
"success": False,
"error": str(e),
}