Microsoft AutoGen Handbook
A production-ready reference for building enterprise multi-agent systems with AutoGen 0.7 — covering models, agents, teams, memory, handoffs, MCP, and Studio integration.
Installation
AutoGen is distributed as three complementary packages. Install them together for the full experience.
```shell
# Core + AgentChat + Extensions (with OpenAI and Azure support)
pip install -U "autogen-agentchat" "autogen-ext[openai,azure]"

# Add Anthropic model support
pip install -U "autogen-ext[anthropic]"

# Add Ollama (local models) support
pip install -U "autogen-ext[ollama]"

# Add Docker-based code execution
pip install -U "autogen-ext[docker]"

# All-in-one (kitchen sink)
pip install -U "autogen-agentchat" "autogen-ext[openai,azure,anthropic,docker]"
```
AutoGen's API is fully async/await. In scripts, wrap your entry point with asyncio.run(main()). In Jupyter, await main() works directly.

Package Architecture
| Package | Purpose | Key Imports |
|---|---|---|
| autogen-agentchat | High-level agents, teams, termination conditions, and UI utilities | AssistantAgent, RoundRobinGroupChat, Console |
| autogen-ext | Extensions: model clients, code executors, auth providers, MCP bridges | OpenAIChatCompletionClient, DockerCommandLineCodeExecutor |
| autogen-core | Low-level runtime, event system, message types, logging constants | EVENT_LOGGER_NAME, CancellationToken, UserMessage |
Architecture Overview
AutoGen’s modern architecture (v0.4+) cleanly separates concerns into three layers. This handbook targets the stable 0.7.x release line.
- autogen-core: the model protocol (ChatCompletionClient), event logger, and cancellation tokens. The foundation everything builds on.
- autogen-agentchat: AssistantAgent, team orchestrators (RoundRobinGroupChat, SelectorGroupChat, Swarm), termination conditions, and streaming UI.
- autogen-ext: concrete implementations of the core protocols: model clients, code executors, and MCP bridges.

All imports come from autogen_agentchat, autogen_ext, and autogen_core. Do not mix with old import autogen patterns.

Module 1: Core Models & Enterprise Integration
AutoGen decouples model access from agent logic through the ChatCompletionClient protocol. Extensions provide concrete implementations for every major provider. This module shows how to initialize each client and wire up production-grade logging.
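The value of this decoupling is easiest to see in miniature. The sketch below uses plain Python with no AutoGen imports; ChatClient, EchoClient, and UppercaseClient are invented stand-ins that mimic the shape of the real protocol. Code written against the protocol never cares which concrete client it receives:

```python
import asyncio
from dataclasses import dataclass
from typing import Protocol


@dataclass
class FakeResult:
    content: str


class ChatClient(Protocol):
    """Illustrative stand-in for the ChatCompletionClient protocol."""

    async def create(self, messages: list[str]) -> FakeResult: ...


class EchoClient:
    async def create(self, messages: list[str]) -> FakeResult:
        return FakeResult(content=f"echo: {messages[-1]}")


class UppercaseClient:
    async def create(self, messages: list[str]) -> FakeResult:
        return FakeResult(content=messages[-1].upper())


async def ask(client: ChatClient, prompt: str) -> str:
    # Agent-side code depends only on the protocol, not a concrete client.
    result = await client.create([prompt])
    return result.content


# Swapping providers means swapping one constructor call.
print(asyncio.run(ask(EchoClient(), "hello")))       # echo: hello
print(asyncio.run(ask(UppercaseClient(), "hello")))  # HELLO
```

This is the same structural trick the real clients use: OpenAIChatCompletionClient, AnthropicChatCompletionClient, and OllamaChatCompletionClient all satisfy one protocol, so agents accept any of them.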
1.1 — OpenAI Chat Completion Client
The OpenAIChatCompletionClient from autogen_ext.models.openai connects to OpenAI’s API. It supports GPT-4o, GPT-4 Turbo, o1, and any model available on the OpenAI platform.
```python
import asyncio

from autogen_core.models import UserMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    # Initialize the OpenAI client.
    # If OPENAI_API_KEY is set as an env var, api_key can be omitted.
    model_client = OpenAIChatCompletionClient(
        model="gpt-4o",
        # api_key="sk-...",  # Or set OPENAI_API_KEY env var
    )

    # Send a single message and inspect the result.
    result = await model_client.create(
        [UserMessage(content="Explain multi-agent systems in one sentence.", source="user")]
    )

    # Result contains content, finish_reason, and token usage.
    print(result.content)
    print(f"Prompt tokens: {result.usage.prompt_tokens}")
    print(f"Completion tokens: {result.usage.completion_tokens}")

    # Always close the client when done to release connections.
    await model_client.close()


asyncio.run(main())
```
Tip: You can point OpenAIChatCompletionClient at any OpenAI-compatible API (vLLM, LiteLLM, Gemini) by passing base_url and a model_info dict describing capabilities.

Gemini via OpenAI-Compatible Endpoint
```python
import asyncio

from autogen_core.models import ModelInfo, UserMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    # Use Gemini through its OpenAI-compatible API.
    gemini_client = OpenAIChatCompletionClient(
        model="gemini-2.0-flash-lite",
        # api_key="GEMINI_API_KEY",
        model_info=ModelInfo(
            vision=True,
            function_calling=True,
            json_output=True,
            family="unknown",
            structured_output=True,
        ),
    )

    result = await gemini_client.create(
        [UserMessage(content="What is AutoGen?", source="user")]
    )
    print(result.content)
    await gemini_client.close()


asyncio.run(main())
```
1.2 — Azure OpenAI Chat Completion Client
For enterprise workloads, AzureOpenAIChatCompletionClient offers Azure-native authentication via AAD tokens or API keys, plus compliance with Azure’s data-residency guarantees.
Option A: Azure AD (Entra ID) Authentication
```python
import asyncio

from autogen_core.models import UserMessage
from autogen_ext.auth.azure import AzureTokenProvider
from autogen_ext.models.openai import AzureOpenAIChatCompletionClient
from azure.identity import DefaultAzureCredential


async def main() -> None:
    # Create an AAD token provider scoped to Cognitive Services.
    # The identity must have the "Cognitive Services OpenAI User" role.
    token_provider = AzureTokenProvider(
        DefaultAzureCredential(),
        "https://cognitiveservices.azure.com/.default",
    )

    # Initialize the Azure OpenAI client with your deployment details.
    az_client = AzureOpenAIChatCompletionClient(
        azure_deployment="my-gpt4o-deployment",
        model="gpt-4o",
        api_version="2024-06-01",
        azure_endpoint="https://my-resource.openai.azure.com/",
        azure_ad_token_provider=token_provider,
    )

    result = await az_client.create(
        [UserMessage(content="Summarize Azure OpenAI compliance features.", source="user")]
    )
    print(result.content)
    await az_client.close()


asyncio.run(main())
```
Option B: API Key Authentication
```python
import asyncio

from autogen_core.models import UserMessage
from autogen_ext.models.openai import AzureOpenAIChatCompletionClient


async def main() -> None:
    # API-key auth is simpler, but less secure than AAD for production.
    az_client = AzureOpenAIChatCompletionClient(
        azure_deployment="my-gpt4o-deployment",
        model="gpt-4o",
        api_version="2024-06-01",
        azure_endpoint="https://my-resource.openai.azure.com/",
        api_key="your-azure-api-key",
    )

    result = await az_client.create(
        [UserMessage(content="What are the benefits of Azure OpenAI?", source="user")]
    )
    print(result.content)
    await az_client.close()


asyncio.run(main())
```
Tip: Prefer DefaultAzureCredential, which supports managed identities, workload federation, and interactive browser auth as fallback.

1.3 — Anthropic, Ollama & Other Providers
AutoGen supports alternative model providers through dedicated extension clients. Each follows the same ChatCompletionClient protocol, making them interchangeable with any agent.
Anthropic (Claude)
```python
# pip install "autogen-ext[anthropic]"
import asyncio

from autogen_core.models import UserMessage
from autogen_ext.models.anthropic import AnthropicChatCompletionClient


async def main() -> None:
    # Initialize the Anthropic client.
    # Requires ANTHROPIC_API_KEY env var or explicit api_key param.
    anthropic_client = AnthropicChatCompletionClient(
        model="claude-3-7-sonnet-20250219",
    )

    result = await anthropic_client.create(
        [UserMessage(content="What is the capital of France?", source="user")]
    )
    print(result.content)
    print(f"Usage: {result.usage}")
    await anthropic_client.close()


asyncio.run(main())
```
Ollama (Local Models)
```python
# pip install "autogen-ext[ollama]"
# Ensure ollama is running: ollama serve
import asyncio

from autogen_core.models import UserMessage
from autogen_ext.models.ollama import OllamaChatCompletionClient


async def main() -> None:
    # Connect to a locally running Ollama server on default port 11434.
    ollama_client = OllamaChatCompletionClient(
        model="llama3.2",
    )

    result = await ollama_client.create(
        [UserMessage(content="Explain transformers briefly.", source="user")]
    )
    print(result.content)
    await ollama_client.close()


asyncio.run(main())
```
Azure AI Foundry (GitHub Models, Phi-4, etc.)
```python
# pip install "autogen-ext[azure]"
import asyncio
import os

from autogen_core.models import UserMessage
from autogen_ext.models.azure import AzureAIChatCompletionClient
from azure.core.credentials import AzureKeyCredential


async def main() -> None:
    # Access models hosted on Azure AI Foundry or GitHub Models.
    client = AzureAIChatCompletionClient(
        model="Phi-4",
        endpoint="https://models.github.ai/inference",
        credential=AzureKeyCredential(os.environ["GITHUB_TOKEN"]),
        model_info={
            "json_output": False,
            "function_calling": False,
            "vision": False,
            "family": "unknown",
            "structured_output": False,
        },
    )

    result = await client.create(
        [UserMessage(content="What is the capital of France?", source="user")]
    )
    print(result.content)
    await client.close()


asyncio.run(main())
```
Every provider client implements the same ChatCompletionClient protocol. You can inject any client into any agent, switching from OpenAI to Anthropic to Ollama by changing one line.

Provider Quick-Reference
| Provider | Install Extra | Client Class | Module |
|---|---|---|---|
| OpenAI | openai | OpenAIChatCompletionClient | autogen_ext.models.openai |
| Azure OpenAI | openai,azure | AzureOpenAIChatCompletionClient | autogen_ext.models.openai |
| Azure AI Foundry | azure | AzureAIChatCompletionClient | autogen_ext.models.azure |
| Anthropic | anthropic | AnthropicChatCompletionClient | autogen_ext.models.anthropic |
| Ollama | ollama | OllamaChatCompletionClient | autogen_ext.models.ollama |
| Gemini / vLLM / LiteLLM | openai | OpenAIChatCompletionClient + model_info | autogen_ext.models.openai |
1.4 — Logging & Telemetry
AutoGen uses Python’s standard logging module with a dedicated event logger. Subscribing to EVENT_LOGGER_NAME from autogen_core gives you visibility into every model call, token count, and agent action — no third-party APM required.
```python
import asyncio
import logging

from autogen_core import EVENT_LOGGER_NAME
from autogen_core.models import UserMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    # --- Configure the AutoGen event logger ---
    # Set WARNING on the root logger to suppress noise, then
    # enable INFO specifically on AutoGen's event logger.
    logging.basicConfig(level=logging.WARNING)
    logger = logging.getLogger(EVENT_LOGGER_NAME)
    logger.setLevel(logging.INFO)

    # Add a console handler to see LLMCall events in real time.
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter("%(asctime)s [%(name)s] %(levelname)s: %(message)s")
    )
    logger.addHandler(handler)

    # --- Use a model client (logging is automatic) ---
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    result = await model_client.create(
        [UserMessage(content="What is 2 + 2?", source="user")]
    )

    print(f"\nAnswer: {result.content}")
    print(f"Prompt tokens: {result.usage.prompt_tokens}")
    print(f"Completion tokens: {result.usage.completion_tokens}")
    await model_client.close()


asyncio.run(main())
```
The event logger emits LLMCall events containing the model name, messages sent, response content, and RequestUsage (prompt + completion token counts). You can route these to files, cloud telemetry, or structured JSON sinks.

Structured File Logging for Production
```python
import asyncio
import json
import logging

from autogen_core import EVENT_LOGGER_NAME
from autogen_core.models import UserMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient


class JsonEventHandler(logging.Handler):
    """Custom handler that writes AutoGen events as structured JSON lines."""

    def __init__(self, filepath: str) -> None:
        super().__init__()
        self._file = open(filepath, "a", encoding="utf-8")

    def emit(self, record: logging.LogRecord) -> None:
        # Each record.msg is a structured event object.
        entry = {
            "timestamp": record.created,
            "level": record.levelname,
            "event": str(record.msg),
        }
        self._file.write(json.dumps(entry) + "\n")
        self._file.flush()

    def close(self) -> None:
        self._file.close()
        super().close()


async def main() -> None:
    # Attach the JSON handler to the AutoGen event logger.
    logging.basicConfig(level=logging.WARNING)
    logger = logging.getLogger(EVENT_LOGGER_NAME)
    logger.setLevel(logging.INFO)
    logger.addHandler(JsonEventHandler("autogen_events.jsonl"))

    # Any model call is now automatically captured to the JSONL file.
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    result = await model_client.create(
        [UserMessage(content="Explain event-driven architectures.", source="user")]
    )
    print(result.content)
    await model_client.close()
    print("\nEvents written to autogen_events.jsonl")


asyncio.run(main())
```
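A JSONL event sink like this can feed a simple cost estimator. The sketch below assumes a hypothetical line format with explicit prompt_tokens and completion_tokens fields (your handler would need to extract those from the event object), and the per-token rates are invented placeholders:

```python
import io
import json

# Hypothetical pricing (USD per 1K tokens) -- substitute your model's real rates.
PROMPT_RATE = 0.005
COMPLETION_RATE = 0.015


def estimate_cost(jsonl_stream) -> float:
    """Sum token counts from a JSONL event stream into a dollar estimate."""
    total = 0.0
    for line in jsonl_stream:
        event = json.loads(line)
        total += event.get("prompt_tokens", 0) / 1000 * PROMPT_RATE
        total += event.get("completion_tokens", 0) / 1000 * COMPLETION_RATE
    return total


# Simulated event file with two logged model calls.
sample = io.StringIO(
    '{"prompt_tokens": 1000, "completion_tokens": 2000}\n'
    '{"prompt_tokens": 500, "completion_tokens": 0}\n'
)
print(round(estimate_cost(sample), 4))  # 0.0375
```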
Each LLMCall event includes token counts, letting you build real-time cost dashboards.

Module 2: Agents & Custom Tools
Agents are the building blocks of every AutoGen application. This module covers the built-in AssistantAgent, creating custom tools (Python functions, FunctionTool, MCP), executing generated code safely, and building fully custom agents by subclassing BaseChatAgent.
Tools come in several flavors: FunctionTool, BaseTool subclasses, MCP server integration, and agent-as-tool patterns. For generated code, pair CodeExecutorAgent with Docker or local sandboxes for safe LLM-generated code execution.

2.1 — AssistantAgent: The All-Purpose Agent
AssistantAgent is the primary built-in agent for prototyping and production. It wraps a model client and supports tools, handoffs, structured output, streaming, and reflection — all through constructor arguments.
Basic Agent with System Message
```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import TextMessage
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        name="travel_assistant",
        model_client=model_client,
        system_message="You are a helpful travel assistant. Provide concise answers.",
    )

    # on_messages() accepts a list of ChatMessage objects.
    response = await agent.on_messages(
        [TextMessage(content="What is the capital of France?", source="user")],
        cancellation_token=CancellationToken(),
    )
    print(response.chat_message.content)  # "The capital of France is Paris."

    await model_client.close()


asyncio.run(main())
```
Single-Agent run() / run_stream() Shorthand
For simple single-agent tasks, use the run() or run_stream() methods instead of calling on_messages() directly.
```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        name="assistant",
        model_client=model_client,
        system_message="You are a helpful AI assistant.",
    )

    # run_stream() yields events; Console renders them nicely.
    await Console(agent.run_stream(task="Explain async/await in Python."))
    await model_client.close()


asyncio.run(main())
```
Structured Output with Pydantic
Set output_content_type to a Pydantic model to force the agent to output validated structured data.
```python
import asyncio

from pydantic import BaseModel

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import StructuredMessage, TextMessage
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


class AgentResponse(BaseModel):
    thoughts: str
    answer: str


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        name="structured_agent",
        model_client=model_client,
        system_message="Think step by step then provide a final answer.",
        output_content_type=AgentResponse,
    )

    response = await agent.on_messages(
        [TextMessage(content="What is 37 * 42?", source="user")],
        cancellation_token=CancellationToken(),
    )

    # response.chat_message is a StructuredMessage[AgentResponse]
    structured: StructuredMessage[AgentResponse] = response.chat_message
    print(structured.content.thoughts)  # "37 * 42 = 1554"
    print(structured.content.answer)    # "1554"

    await model_client.close()


asyncio.run(main())
```
Token Streaming
Enable token-by-token streaming to display partial responses in real time.
```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import ModelClientStreamingChunkEvent, TextMessage
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        name="streaming_agent",
        model_client=model_client,
        system_message="You are a helpful assistant.",
        model_client_stream=True,  # Enable token streaming
    )

    # on_messages_stream() yields events including streaming chunks.
    async for message in agent.on_messages_stream(
        [TextMessage(content="Write a haiku about programming.", source="user")],
        cancellation_token=CancellationToken(),
    ):
        if isinstance(message, ModelClientStreamingChunkEvent):
            print(message.content, end="", flush=True)

    await model_client.close()


asyncio.run(main())
```
Model Context (Limiting History)
For long conversations, limit the context window sent to the model using BufferedChatCompletionContext or TokenLimitedChatCompletionContext.
```python
from autogen_agentchat.agents import AssistantAgent
from autogen_core.model_context import BufferedChatCompletionContext
from autogen_ext.models.openai import OpenAIChatCompletionClient

model_client = OpenAIChatCompletionClient(model="gpt-4o")

# Only send the last 5 messages to the model.
agent = AssistantAgent(
    name="short_memory_agent",
    model_client=model_client,
    system_message="You are a concise assistant.",
    model_context=BufferedChatCompletionContext(buffer_size=5),
)
```
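The buffering behavior itself is simple to picture. This stdlib-only sketch (BufferedContext is an invented class, not AutoGen's implementation) shows the last-N semantics a buffer_size=5 context applies to conversation history:

```python
from collections import deque


class BufferedContext:
    """Minimal sketch of a last-N message buffer, analogous in spirit to
    BufferedChatCompletionContext (not the real implementation)."""

    def __init__(self, buffer_size: int) -> None:
        self._messages: deque[str] = deque(maxlen=buffer_size)

    def add_message(self, message: str) -> None:
        # Appending past capacity silently evicts the oldest message.
        self._messages.append(message)

    def get_messages(self) -> list[str]:
        return list(self._messages)


ctx = BufferedContext(buffer_size=5)
for i in range(8):
    ctx.add_message(f"msg-{i}")

print(ctx.get_messages())  # ['msg-3', 'msg-4', 'msg-5', 'msg-6', 'msg-7']
```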
BufferedChatCompletionContext(buffer_size=N) keeps the last N messages. TokenLimitedChatCompletionContext(max_tokens=N) trims to a token budget. Both are imported from autogen_core.model_context.

AssistantAgent Key Parameters Reference
| Parameter | Type | Description |
|---|---|---|
| name | str | Unique identifier used for speaker selection in teams |
| model_client | ChatCompletionClient | The LLM client (OpenAI, Azure, Anthropic, etc.) |
| system_message | str | System prompt prepended to every model call |
| description | str | Used by teams (SelectorGroupChat) for speaker selection |
| tools | list | Python callables or Tool objects the agent can invoke |
| handoffs | list | Agents this agent can hand off to (for Swarm teams) |
| reflect_on_tool_use | bool | If True, agent summarizes tool output in natural language |
| parallel_tool_calls | bool | If False, disables parallel tool calling (required for AgentTool) |
| max_tool_iterations | int | Max loops of tool call → result → next call (default: 1) |
| output_content_type | type[BaseModel] | Pydantic model for structured output |
| model_client_stream | bool | Enable token-by-token streaming |
| model_context | ChatCompletionContext | Buffered or token-limited context window |
2.2 — Custom Tools
AutoGen agents use tools to interact with the external world. Any Python function can become a tool — AutoGen automatically wraps it as a FunctionTool using the function’s name, docstring, and type hints.
Python Function as Tool (Auto-Wrapped)
```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient


# Any Python function with type hints and a docstring becomes a tool.
def get_weather(city: str) -> str:
    """Get the current weather for a given city."""
    return f"The weather in {city} is 72°F and sunny."


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        name="weather_agent",
        model_client=model_client,
        tools=[get_weather],  # Auto-wrapped as FunctionTool
        system_message="Use the get_weather tool to answer weather questions.",
    )

    await Console(agent.run_stream(task="What's the weather in New York?"))
    await model_client.close()


asyncio.run(main())
```
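To see why the type hints and docstring matter, here is a rough sketch of how a function can be introspected into a tool schema. This is illustrative only (tool_schema is an invented helper, not AutoGen's actual wrapping code), but the ingredients are the same: name from __name__, description from the docstring, parameters from the signature:

```python
import inspect


def get_weather(city: str) -> str:
    """Get the current weather for a given city."""
    return f"The weather in {city} is 72°F and sunny."


def tool_schema(func) -> dict:
    """Build a minimal tool schema from a function's metadata.
    (Illustrative sketch, not AutoGen's actual implementation.)"""
    sig = inspect.signature(func)
    params = {
        name: param.annotation.__name__
        for name, param in sig.parameters.items()
    }
    return {
        "name": func.__name__,
        "description": inspect.getdoc(func),
        "parameters": params,
    }


schema = tool_schema(get_weather)
print(schema["name"])        # get_weather
print(schema["parameters"])  # {'city': 'str'}
```

This is also why untyped or undocumented functions make poor tools: the model only sees what the schema carries.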
Explicit FunctionTool with Custom Name
```python
from autogen_core.tools import FunctionTool


def calculate_percentage(start: float, end: float) -> float:
    """Calculate percentage change between two values."""
    return ((end - start) / start) * 100


# Explicit wrapping gives you control over name and description.
pct_tool = FunctionTool(
    func=calculate_percentage,
    name="percentage_change",
    description="Compute the percentage change from a start value to an end value.",
)
```
Reflect on Tool Use
By default, AssistantAgent returns the raw tool output as its response. Set reflect_on_tool_use=True to have the agent summarize tool results in natural language.
```python
agent = AssistantAgent(
    name="reflective_agent",
    model_client=model_client,
    tools=[get_weather],
    reflect_on_tool_use=True,  # Agent summarizes tool output
    system_message="Use tools to find answers, then explain the result.",
)
```
Controlling Tool Call Behavior
```python
# Disable parallel tool calls (call tools one at a time).
agent = AssistantAgent(
    name="sequential_tools",
    model_client=model_client,
    tools=[tool_a, tool_b],
    parallel_tool_calls=False,  # Required when using AgentTool
)

# Limit the number of tool call iterations.
agent = AssistantAgent(
    name="bounded_agent",
    model_client=model_client,
    tools=[search_tool],
    max_tool_iterations=10,  # Stop after 10 rounds of tool calls
)
```
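What max_tool_iterations bounds is the inner call-tool / observe-result loop. The stdlib sketch below (run_tool_loop and the scripted "model" are invented stand-ins; AutoGen runs this loop inside AssistantAgent) shows the shape of that loop and what happens when the budget is exhausted:

```python
def run_tool_loop(call_model, execute_tool, max_tool_iterations: int) -> str:
    """Sketch of an agent's tool-call loop: the model proposes a tool
    call, the result is fed back as an observation, and the loop repeats
    until the model answers directly or the iteration budget runs out.
    (Illustrative only, not AutoGen's implementation.)"""
    observation = None
    for _ in range(max_tool_iterations):
        action = call_model(observation)
        if action["type"] == "final":
            return action["content"]
        observation = execute_tool(action["name"], action["args"])
    return f"Stopped after {max_tool_iterations} tool iterations."


# A scripted "model" that calls a tool twice, then answers.
script = iter([
    {"type": "tool", "name": "search", "args": "autogen"},
    {"type": "tool", "name": "search", "args": "agentchat"},
    {"type": "final", "content": "done"},
])

result = run_tool_loop(
    call_model=lambda obs: next(script),
    execute_tool=lambda name, args: f"{name}({args}) -> results",
    max_tool_iterations=10,
)
print(result)  # done
```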
MCP Server Tools via McpWorkbench
The Model Context Protocol (MCP) lets agents use tools hosted on external MCP servers. Use McpWorkbench with StdioServerParams to connect.
```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.tools.mcp import McpWorkbench, StdioServerParams


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Connect to an MCP server running as a subprocess.
    params = StdioServerParams(
        command="npx",
        args=["-y", "@modelcontextprotocol/server-filesystem", "."],
    )

    async with McpWorkbench(server_params=params) as workbench:
        agent = AssistantAgent(
            name="file_agent",
            model_client=model_client,
            workbench=workbench,  # Provides MCP tools to the agent
            system_message="Use MCP tools to interact with the filesystem.",
        )
        await Console(agent.run_stream(task="List all Python files in the current directory."))

    await model_client.close()


asyncio.run(main())
```
Always use McpWorkbench as an async context manager (async with) to properly start and stop the MCP server subprocess.

AgentTool: Wrapping an Agent as a Tool
Use AgentTool to let one agent call another as if it were a tool. The inner agent runs a full conversation and returns its result.
```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.tools import AgentTool
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Inner agent: a specialist at writing poetry.
    poet_agent = AssistantAgent(
        name="poet",
        model_client=model_client,
        system_message="You are a poet. Write poems on the given topic.",
    )

    # Wrap the poet as a tool.
    poet_tool = AgentTool(agent=poet_agent)

    # Outer agent: uses the poet tool when it needs a poem.
    orchestrator = AssistantAgent(
        name="orchestrator",
        model_client=model_client,
        tools=[poet_tool],
        parallel_tool_calls=False,  # Required for AgentTool
        system_message="Help the user. When they want a poem, use the poet tool.",
    )

    await Console(orchestrator.run_stream(task="Write me a haiku about the ocean."))
    await model_client.close()


asyncio.run(main())
```
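Conceptually, agent-as-tool is just giving a whole agent the interface of a tool: a name, a description, and a callable that runs to completion and returns a final message. The stdlib sketch below (SimpleTool and poet_agent are invented stand-ins) captures that shape:

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class SimpleTool:
    """Minimal stand-in for a tool entry: name, description, callable."""
    name: str
    description: str
    run: Callable[[str], str]


def poet_agent(task: str) -> str:
    # Stand-in for a full inner-agent conversation; the final
    # message of that conversation becomes the tool result.
    return f"A short poem about {task}."


# Wrapping the "agent" as a tool -- conceptually what AgentTool does.
poet_tool = SimpleTool(
    name="poet",
    description="Writes poems on a given topic.",
    run=poet_agent,
)

print(poet_tool.run("the ocean"))  # A short poem about the ocean.
```

Because the wrapped agent runs an entire conversation per invocation, two concurrent invocations would interleave its internal state, which is why parallel tool calls must be disabled.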
Set parallel_tool_calls=False on any agent whose tools include an AgentTool or TeamTool. These tools run full agent/team conversations and must not be called in parallel.

2.3 — Code Execution with CodeExecutorAgent
CodeExecutorAgent extracts code blocks from incoming messages and executes them using a configured executor. This enables LLM-generated code to run in a sandboxed environment.
Preset Agents Overview
AutoGen provides several preset agent types beyond AssistantAgent:
| Agent | Import | Purpose |
|---|---|---|
| AssistantAgent | autogen_agentchat.agents | General-purpose LLM agent with tools, handoffs, streaming |
| CodeExecutorAgent | autogen_agentchat.agents | Extracts & executes code from messages |
| UserProxyAgent | autogen_agentchat.agents | Prompts the user for input (human-in-the-loop) |
| OpenAIAssistantAgent | autogen_ext.agents.openai | Wraps the OpenAI Assistants API |
| MultimodalWebSurfer | autogen_ext.agents.web_surfer | Uses Playwright to browse the web |
| FileSurfer | autogen_ext.agents.file_surfer | Reads and navigates local files |
Docker-Based Code Execution (Recommended)
```python
import asyncio

from autogen_agentchat.agents import AssistantAgent, CodeExecutorAgent
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    async with DockerCommandLineCodeExecutor(work_dir="coding_output") as executor:
        # The coding agent writes code.
        coder = AssistantAgent(
            name="coder",
            model_client=model_client,
            system_message="""Write Python code to solve tasks.
            Put all code in a single fenced Python code block.
            When finished, reply with TERMINATE.""",
        )

        # The executor agent runs the code in Docker.
        executor_agent = CodeExecutorAgent(
            name="executor",
            code_executor=executor,
        )

        termination = TextMentionTermination("TERMINATE")
        team = RoundRobinGroupChat(
            [coder, executor_agent],
            termination_condition=termination,
        )

        await Console(team.run_stream(
            task="Calculate the first 20 Fibonacci numbers and print them."
        ))

    await model_client.close()


asyncio.run(main())
```
Always prefer DockerCommandLineCodeExecutor in production. It runs code in an isolated container, preventing LLM-generated code from accessing your host filesystem or network. LocalCommandLineCodeExecutor runs code directly on the host; use it only in trusted development environments.

Local Code Execution (Development Only)
```python
from autogen_agentchat.agents import CodeExecutorAgent
from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor

# WARNING: Runs code directly on your machine. Use Docker in production.
local_executor = LocalCommandLineCodeExecutor(work_dir="./output")

executor_agent = CodeExecutorAgent(
    name="local_executor",
    code_executor=local_executor,
)
```
Building a Custom Agent with BaseChatAgent
Need full control? Subclass BaseChatAgent to implement custom message handling, external API calls, or non-LLM logic.
```python
import asyncio
from typing import Sequence

from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import (
    BaseAgentEvent,
    BaseChatMessage,
    TextMessage,
)
from autogen_core import CancellationToken


class CountdownAgent(BaseChatAgent):
    """A custom agent that counts down from a given number."""

    def __init__(self, name: str, count: int = 3):
        super().__init__(name=name, description="Counts down from N to 0.")
        self._count = count

    @property
    def produced_message_types(self) -> list[type]:
        return [TextMessage]

    async def on_messages(
        self,
        messages: Sequence[BaseChatMessage],
        cancellation_token: CancellationToken | None = None,
    ) -> Response:
        lines = [str(i) for i in range(self._count, 0, -1)]
        lines.append("Liftoff! 🚀")
        return Response(
            chat_message=TextMessage(
                content="\n".join(lines),
                source=self.name,
            )
        )

    async def on_reset(self, cancellation_token: CancellationToken | None = None) -> None:
        pass  # No state to reset


async def main() -> None:
    agent = CountdownAgent(name="countdown", count=5)
    response = await agent.on_messages(
        [TextMessage(content="Start countdown!", source="user")],
        cancellation_token=None,
    )
    print(response.chat_message.content)


asyncio.run(main())
```
To build a custom agent, subclass BaseChatAgent and implement three things: the produced_message_types property (list of message types this agent can produce), on_messages() (handle incoming messages and return a Response), and on_reset() (clear any internal state).

Module 3: Multi-Agent Teams & Orchestration
Teams are the orchestration layer that coordinates multiple agents. AutoGen provides three built-in team types — RoundRobinGroupChat, SelectorGroupChat, and Swarm — plus composable termination conditions to control when teams stop.
Swarm passes control between agents via HandoffMessage. Ideal for pipeline-style workflows and customer support bots.

3.1 — RoundRobinGroupChat
Agents take turns in the order they are listed. After the last agent speaks, the cycle repeats from the first.
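The rotation itself is the simplest possible scheduling policy. This stdlib sketch (round_robin_order is an invented helper, not AutoGen code) shows the speaker order a two-agent round-robin team produces:

```python
from itertools import cycle


def round_robin_order(agents: list[str], num_turns: int) -> list[str]:
    """Speaker order under round-robin scheduling: agents speak in the
    order they were listed, cycling back after the last one."""
    rotation = cycle(agents)
    return [next(rotation) for _ in range(num_turns)]


print(round_robin_order(["coder", "reviewer"], 5))
# ['coder', 'reviewer', 'coder', 'reviewer', 'coder']
```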
Basic Round-Robin: Coder + Reviewer
```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    coder = AssistantAgent(
        name="coder",
        model_client=model_client,
        system_message="""You are a Python developer. Write clean, efficient code.
        When the reviewer approves, respond with APPROVE.""",
    )

    reviewer = AssistantAgent(
        name="reviewer",
        model_client=model_client,
        system_message="""You are a code reviewer. Review the code for correctness,
        style, and edge cases. If it looks good, say APPROVE.""",
    )

    termination = TextMentionTermination("APPROVE")
    team = RoundRobinGroupChat(
        [coder, reviewer],
        termination_condition=termination,
    )

    # run_stream() drives the team's conversation and yields events.
    await Console(team.run_stream(
        task="Write a Python function to find the longest palindromic substring."
    ))

    await model_client.close()


asyncio.run(main())
```
Resuming a Team Conversation
After a team run completes, you can resume the same conversation by calling run() or run_stream() again with a new task. To start fresh, call reset() first.
```python
# Resume the same conversation with a follow-up.
await Console(team.run_stream(
    task="Now add type hints to the function."
))

# OR: Reset and start a completely new conversation.
await team.reset()
await Console(team.run_stream(
    task="Write a function to merge two sorted arrays."
))
```
3.2 — SelectorGroupChat
SelectorGroupChat uses a model to dynamically select the next speaker based on the conversation history. This is ideal for multi-expert teams where different agents have different specializations.
How Speaker Selection Works

After each turn, SelectorGroupChat sends the conversation history plus each agent's name and description to its model_client, which replies with the name of the next speaker.
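The mechanics reduce to prompt assembly. The sketch below is a simplified approximation (SELECTOR_TEMPLATE and build_selector_prompt are invented; the real SelectorGroupChat prompt differs in details) showing how agent descriptions and history become a selection prompt:

```python
SELECTOR_TEMPLATE = """Available agents:
{roles}

Conversation so far:
{history}

Read the conversation and pick the agent best suited to speak next.
Reply with ONLY the agent name."""


def build_selector_prompt(descriptions: dict[str, str], history: list[str]) -> str:
    """Assemble a speaker-selection prompt from agent descriptions
    and conversation history. (Simplified sketch, not AutoGen's code.)"""
    roles = "\n".join(f"{name}: {desc}" for name, desc in descriptions.items())
    return SELECTOR_TEMPLATE.format(roles=roles, history="\n".join(history))


prompt = build_selector_prompt(
    {"planner": "Breaks tasks into steps.", "analyst": "Computes metrics."},
    ["user: Compare two datasets.", "planner: Step 1 is gathering data."],
)
print("planner: Breaks tasks into steps." in prompt)  # True
```

This also makes clear why agent descriptions matter so much: they are the only signal the selector model has about each agent's specialty.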
Multi-Expert Research Team
```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    planner = AssistantAgent(
        name="planner",
        model_client=model_client,
        description="A planning agent that breaks tasks into steps and delegates.",
        system_message="""You are a planning agent. Break down complex tasks into steps.
        Delegate research to the web_searcher and data analysis to the analyst.
        Summarize final results. When the task is complete, respond with TERMINATE.""",
    )

    web_searcher = AssistantAgent(
        name="web_searcher",
        model_client=model_client,
        description="A web search agent that finds information online.",
        system_message="You search the web and return factual information.",
    )

    analyst = AssistantAgent(
        name="analyst",
        model_client=model_client,
        description="A data analyst that processes data and creates insights.",
        system_message="You analyze data, compute metrics, and provide insights.",
    )

    termination = TextMentionTermination("TERMINATE") | MaxMessageTermination(15)

    team = SelectorGroupChat(
        [planner, web_searcher, analyst],
        model_client=model_client,  # Model used for speaker selection
        termination_condition=termination,
    )

    await Console(team.run_stream(
        task="Compare the market caps of Apple and Microsoft over the last 5 years."
    ))

    await model_client.close()


asyncio.run(main())
```
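The `|` in the termination expression combines conditions so the team stops when either one fires. The stdlib sketch below (Termination, TextMention, and MaxMessages are invented stand-ins that mirror the combining behavior, not AutoGen's classes) shows how that composition works:

```python
class Termination:
    """Sketch of composable termination conditions: `a | b` stops
    when either condition fires."""

    def is_met(self, messages: list[str]) -> bool:
        raise NotImplementedError

    def __or__(self, other: "Termination") -> "Termination":
        a, b = self, other

        class _Or(Termination):
            def is_met(self, messages: list[str]) -> bool:
                return a.is_met(messages) or b.is_met(messages)

        return _Or()


class TextMention(Termination):
    def __init__(self, text: str) -> None:
        self._text = text

    def is_met(self, messages: list[str]) -> bool:
        return any(self._text in m for m in messages)


class MaxMessages(Termination):
    def __init__(self, n: int) -> None:
        self._n = n

    def is_met(self, messages: list[str]) -> bool:
        return len(messages) >= self._n


combined = TextMention("TERMINATE") | MaxMessages(15)
print(combined.is_met(["working...", "all done. TERMINATE"]))  # True
print(combined.is_met(["still working"]))                      # False
```

Pairing a content-based condition with a message cap like this is a good default: the cap guarantees the team cannot loop forever if the magic word never appears.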
The selector model reads each agent's description parameter to decide who speaks next. Write clear, specific descriptions that explain what each agent is best at.

Custom selector_prompt
Override the default selection prompt to give the selector model specific instructions. Use the template variables {participants}, {roles}, and {history}.
```python
team = SelectorGroupChat(
    [planner, web_searcher, analyst],
    model_client=model_client,
    termination_condition=termination,
    selector_prompt="""You are the team coordinator. Below are the available agents:
{roles}

Based on the conversation so far:
{history}

Which agent should speak next? Rules:
- The planner should always go first and last.
- The web_searcher should handle any factual lookups.
- The analyst should handle computations.
Reply with ONLY the agent name.""",
)
```
Custom selector_func (Override Model Selection)
Provide a selector_func to apply hard-coded routing rules. Return an agent name to force that speaker, or None to fall back to model-based selection.
def my_selector(messages) -> str | None:
    """Always start with the planner. Otherwise, let the model decide."""
    if not messages:
        return "planner"
    # If the last speaker was the planner, route to web_searcher.
    if messages[-1].source == "planner":
        return "web_searcher"
    return None  # Fall back to model-based selection

team = SelectorGroupChat(
    [planner, web_searcher, analyst],
    model_client=model_client,
    termination_condition=termination,
    selector_func=my_selector,
)
Filtering Candidates with candidate_func
Use candidate_func to narrow the pool of agents the selector model can choose from, based on conversation state.
def filter_candidates(messages) -> list[str]:
    """Only allow the planner to wrap up after 10+ messages."""
    if len(messages) > 10:
        return ["planner"]
    return ["planner", "web_searcher", "analyst"]

team = SelectorGroupChat(
    [planner, web_searcher, analyst],
    model_client=model_client,
    termination_condition=termination,
    candidate_func=filter_candidates,
)
allow_repeated_speaker
By default (allow_repeated_speaker=False), the previous speaker is excluded from selection, so a different agent must speak each turn. Set allow_repeated_speaker=True to let the same agent speak multiple times in a row.
team = SelectorGroupChat(
    [planner, web_searcher, analyst],
    model_client=model_client,
    termination_condition=termination,
    allow_repeated_speaker=True,  # Same agent may speak consecutively
)
SelectorGroupChat Key Parameters
| Parameter | Type | Description |
|---|---|---|
| participants | list[ChatAgent] | List of agents in the team |
| model_client | ChatCompletionClient | Model used for speaker selection (separate from agent models) |
| termination_condition | TerminationCondition | When to stop the team conversation |
| selector_prompt | str | Custom prompt with {participants}, {roles}, {history} |
| selector_func | Callable | Hard-coded routing; return an agent name, or None for model fallback |
| candidate_func | Callable | Filter candidate agents per turn |
| allow_repeated_speaker | bool | Allow same agent to speak consecutively (default: False) |
3.3 — Swarm Orchestration
In a Swarm, agents explicitly hand off to one another using HandoffMessage. Each agent declares who it can hand off to via the handoffs parameter. AutoGen automatically generates transfer_to_<agent> tool calls behind the scenes.
How Swarm Routing Works
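Each name in an agent's handoffs list becomes an auto-generated transfer_to_&lt;name&gt; tool; when the model calls that tool, control passes to the named agent. The following is a toy pure-Python sketch of that control flow (illustrative only — the functions and the handoffs dict here are stand-ins, not AutoGen's actual implementation):

```python
def tool_names(handoffs: list[str]) -> list[str]:
    """Mirror AutoGen's convention: one transfer tool per handoff target."""
    return [f"transfer_to_{target}" for target in handoffs]

def route(current: str, tool_call: str, handoffs: dict[str, list[str]]) -> str:
    """Validate a transfer tool call and return the next speaker."""
    target = tool_call.removeprefix("transfer_to_")
    if target not in handoffs.get(current, []):
        raise ValueError(f"{current} may not hand off to {target}")
    return target

# Declared handoffs, as in the customer-support example below.
handoffs = {
    "triage_agent": ["flights_refunder", "general_support"],
    "flights_refunder": ["triage_agent", "user"],
}

print(tool_names(handoffs["triage_agent"]))
# A refund request: triage hands off to the specialist, which hands off to the user.
speaker = route("triage_agent", "transfer_to_flights_refunder", handoffs)
speaker = route(speaker, "transfer_to_user", handoffs)
print(speaker)
```

The key property to notice: an agent can only transfer to targets it explicitly declared, which is what makes Swarm routing predictable and auditable.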
Customer Support Bot
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import HandoffTermination, TextMentionTermination
from autogen_agentchat.teams import Swarm
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def refund_flight(flight_id: str) -> str:
    """Process a flight refund for the given flight ID."""
    return f"Refund processed for flight {flight_id}."

async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Triage agent: routes to the right specialist.
    triage_agent = AssistantAgent(
        name="triage_agent",
        model_client=model_client,
        handoffs=["flights_refunder", "general_support"],
        system_message="""You are a customer support triage agent.
Route flight refund requests to 'flights_refunder'.
Route all other questions to 'general_support'.
Transfer immediately — do not answer questions yourself.""",
    )

    # Specialist: handles flight refunds.
    flights_refunder = AssistantAgent(
        name="flights_refunder",
        model_client=model_client,
        tools=[refund_flight],
        handoffs=["triage_agent", "user"],
        system_message="""You handle flight refunds.
Use the refund_flight tool to process refunds.
If you need more information, ask the user by handing off to 'user'.
When done, hand off back to 'triage_agent'.""",
    )

    # Specialist: handles general questions.
    general_support = AssistantAgent(
        name="general_support",
        model_client=model_client,
        handoffs=["triage_agent", "user"],
        system_message="""You handle general customer support questions.
Answer questions directly or hand off to 'user' if you need input.
When done, hand off back to 'triage_agent'.""",
    )

    # HandoffTermination stops when an agent hands off to "user".
    termination = HandoffTermination(target="user") | TextMentionTermination("TERMINATE")

    team = Swarm(
        [triage_agent, flights_refunder, general_support],
        termination_condition=termination,
    )

    await Console(team.run_stream(
        task="I want a refund for flight FL-1234."
    ))
    await model_client.close()

asyncio.run(main())
"user". This is the standard pattern for human-in-the-loop workflows in Swarm teams. The application can then collect user input and resume with run_stream(task=user_input).Multi-Step Swarm with Chained Handoffs
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import Swarm
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Pipeline: planner -> researcher -> writer -> planner (summarizes)
    planner = AssistantAgent(
        name="planner",
        model_client=model_client,
        handoffs=["researcher"],
        system_message="""You create research plans.
Break the topic into key questions, then hand off to 'researcher'.
When the writer delivers the final report, summarize it and say TERMINATE.""",
    )

    researcher = AssistantAgent(
        name="researcher",
        model_client=model_client,
        handoffs=["writer"],
        system_message="""You research the questions from the planner.
Provide detailed findings, then hand off to 'writer'.""",
    )

    writer = AssistantAgent(
        name="writer",
        model_client=model_client,
        handoffs=["planner"],
        system_message="""You write polished reports from research findings.
Write the report and hand off to 'planner' for final review.""",
    )

    termination = TextMentionTermination("TERMINATE")
    team = Swarm([planner, researcher, writer], termination_condition=termination)

    await Console(team.run_stream(
        task="Write a report on the current state of quantum computing."
    ))
    await model_client.close()

asyncio.run(main())
3.4 — Termination Conditions
Termination conditions control when a team stops. AutoGen provides several built-in conditions that can be composed with | (OR) and & (AND) operators.
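The composition semantics can be modeled with two tiny classes — a toy stand-in for TerminationCondition, not AutoGen's implementation: an OR combination trips as soon as any child condition is met, while an AND combination trips only when all children are met at the same time.

```python
class Cond:
    """Toy termination condition: trips once a message count is reached."""

    def __init__(self, max_messages: int) -> None:
        self.max_messages = max_messages

    def is_met(self, message_count: int) -> bool:
        return message_count >= self.max_messages

    def __or__(self, other):
        return _Combined([self, other], any)   # OR: any child met

    def __and__(self, other):
        return _Combined([self, other], all)   # AND: all children met

class _Combined:
    """Composite condition built by | or &."""

    def __init__(self, children, mode) -> None:
        self.children, self.mode = children, mode

    def is_met(self, message_count: int) -> bool:
        return self.mode(c.is_met(message_count) for c in self.children)

    __or__ = Cond.__or__
    __and__ = Cond.__and__

either = Cond(5) | Cond(10)  # stops at 5 messages
both = Cond(5) & Cond(10)    # stops only once both thresholds pass, i.e. at 10

print(either.is_met(5), both.is_met(5), both.is_met(10))
```

AutoGen's real conditions are async and stateful (they must be reset between runs), but the |/& operator semantics are the same.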
Built-in Termination Conditions
| Condition | Import | Stops when… |
|---|---|---|
| MaxMessageTermination | autogen_agentchat.conditions | Total message count reaches the limit |
| TextMentionTermination | autogen_agentchat.conditions | A message contains the specified text (e.g., "TERMINATE") |
| TokenUsageTermination | autogen_agentchat.conditions | Cumulative token usage exceeds the budget |
| TimeoutTermination | autogen_agentchat.conditions | Wall-clock time exceeds the limit (in seconds) |
| HandoffTermination | autogen_agentchat.conditions | An agent hands off to the specified target |
| ExternalTermination | autogen_agentchat.conditions | Externally triggered by calling .set() from another task |
Composing Conditions with | (OR) and & (AND)
from autogen_agentchat.conditions import (
    MaxMessageTermination,
    TextMentionTermination,
    TokenUsageTermination,
    TimeoutTermination,
)

# OR: Stop when ANY condition is met.
termination = (
    TextMentionTermination("TERMINATE")
    | MaxMessageTermination(20)
    | TimeoutTermination(120)  # 2-minute timeout
)

# AND: Stop only when ALL conditions are met simultaneously.
termination = (
    TextMentionTermination("DONE")
    & TokenUsageTermination(max_total_token=5000)
)
Always combine a content-based condition (TextMentionTermination) with a safety limit (MaxMessageTermination or TimeoutTermination) using |. This prevents runaway conversations if the agent never says the stop word.

ExternalTermination for Async Control
ExternalTermination lets external code (e.g., a web server or UI) stop a running team.
import asyncio

from autogen_agentchat.conditions import ExternalTermination, MaxMessageTermination
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        name="chatbot",
        model_client=model_client,
        system_message="You are a helpful assistant.",
    )

    external = ExternalTermination()
    termination = external | MaxMessageTermination(50)

    team = RoundRobinGroupChat([agent], termination_condition=termination)

    # In a separate task, stop the team after 5 seconds.
    async def stop_after_delay():
        await asyncio.sleep(5)
        external.set()  # Triggers termination

    asyncio.create_task(stop_after_delay())
    await Console(team.run_stream(task="Count to infinity."))
    await model_client.close()

asyncio.run(main())
Accessing the TaskResult
Every run() call returns a TaskResult object with the full message history and stop reason.
result = await team.run(task="Solve the traveling salesman problem.")

print(result.stop_reason)  # e.g., "Maximum number of messages 20 reached"
for msg in result.messages:
    print(f"[{msg.source}]", msg.content[:100])
Module 4: Memory & State Management
AutoGen agents are stateless by default — each run() starts with a blank context window. The Memory protocol injects persistent knowledge before the LLM is called, while save_state / load_state lets you serialize and restore full conversation history across sessions.
4.1 — Memory Protocol & Memory Stores
The Memory protocol (from autogen_core.memory) defines a unified interface for all memory back-ends. Memory is queried automatically at the start of each agent run(), and matching entries are injected as a SystemMessage into the model context.
Memory Protocol Methods
| Method | Signature | Description |
|---|---|---|
| add | async add(content: MemoryContent, cancellation_token) | Store a new memory entry |
| query | async query(query: str \| MemoryContent, cancellation_token) → list[MemoryContent] | Retrieve matching memories |
| update_context | async update_context(model_context: ChatCompletionContext) | Inject retrieved memories into the LLM context (called internally) |
| clear | async clear() | Remove all entries from the store |
| close | async close() | Release resources (DB connections, etc.) |
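To make the protocol shape concrete, here is a minimal in-process stand-in with the same async method surface. This is a toy sketch only: the real autogen_core.memory.Memory protocol uses MemoryContent objects, cancellation tokens, and an update_context() that writes into a ChatCompletionContext; the FakeMemoryContent class and keyword matching below are invented for illustration.

```python
import asyncio
from dataclasses import dataclass

@dataclass
class FakeMemoryContent:
    """Stand-in for autogen_core.memory.MemoryContent (text-only)."""
    content: str

class KeywordMemory:
    """Toy memory store: query() returns entries sharing a word with the query."""

    def __init__(self) -> None:
        self._entries: list[FakeMemoryContent] = []

    async def add(self, content: FakeMemoryContent) -> None:
        self._entries.append(content)

    async def query(self, query: str) -> list[FakeMemoryContent]:
        words = set(query.lower().split())
        return [e for e in self._entries if words & set(e.content.lower().split())]

    async def clear(self) -> None:
        self._entries.clear()

    async def close(self) -> None:
        pass  # No external resources to release.

async def demo() -> list[str]:
    mem = KeywordMemory()
    await mem.add(FakeMemoryContent("User prefers dark-roast coffee"))
    await mem.add(FakeMemoryContent("User's birthday is March 15"))
    hits = await mem.query("what coffee does the user like")
    return [h.content for h in hits]

print(asyncio.run(demo()))  # ['User prefers dark-roast coffee']
```

The real back-ends below differ mainly in how query() ranks entries: chronological order (ListMemory), vector similarity (ChromaDB, Redis), or an AI-managed pipeline (Mem0).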
ListMemory — Simple Chronological Memory
ListMemory is the simplest built-in memory: it stores entries in a plain list and injects the stored entries, in insertion order, into the model context. No vector DB or embeddings required.
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.ui import Console
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_core.memory import ListMemory, MemoryContent, MemoryMimeType
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Create a ListMemory and seed it with user preferences.
    user_memory = ListMemory()
    await user_memory.add(MemoryContent(
        content="User prefers dark-roast coffee and avoids dairy.",
        mime_type=MemoryMimeType.TEXT,
        metadata={"category": "preferences"},
    ))
    await user_memory.add(MemoryContent(
        content="User's birthday is March 15.",
        mime_type=MemoryMimeType.TEXT,
        metadata={"category": "personal"},
    ))

    # Pass the memory to the agent.
    agent = AssistantAgent(
        name="barista_bot",
        model_client=model_client,
        system_message="You are a helpful barista assistant.",
        memory=[user_memory],  # List of Memory objects
    )

    team = RoundRobinGroupChat(
        [agent],
        termination_condition=MaxMessageTermination(2),
    )

    # The agent sees the stored memories as a SystemMessage.
    await Console(team.run_stream(
        task="Recommend me a drink for this morning."
    ))
    await model_client.close()

asyncio.run(main())
Under the hood, the agent calls memory.update_context(), which formats all matching entries into a SystemMessage: "Relevant memory content (in chronological order): 1. User prefers dark-roast coffee…". A MemoryQueryEvent is emitted so you can observe what was retrieved.

ChromaDB Vector Memory
ChromaDBVectorMemory uses vector embeddings for semantic retrieval — ideal when you have many memories and need the most relevant ones, not just the most recent.
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.ui import Console
from autogen_core.memory import MemoryContent, MemoryMimeType
from autogen_ext.memory.chromadb import (
    ChromaDBVectorMemory,
    PersistentChromaDBVectorMemoryConfig,
    SentenceTransformerEmbeddingFunctionConfig,
)
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Configure persistent ChromaDB with sentence-transformer embeddings.
    chroma_memory = ChromaDBVectorMemory(
        config=PersistentChromaDBVectorMemoryConfig(
            collection_name="project_memory",
            persistence_path="./chroma_store",
            k=3,  # Return top 3 matches
            score_threshold=0.4,  # Minimum similarity score
            embedding_function_config=SentenceTransformerEmbeddingFunctionConfig(
                model_name="all-MiniLM-L6-v2",
            ),
        )
    )

    # Add a library of knowledge entries.
    await chroma_memory.add(MemoryContent(
        content="AutoGen agents support tool calling via Python functions.",
        mime_type=MemoryMimeType.TEXT,
        metadata={"topic": "tools"},
    ))
    await chroma_memory.add(MemoryContent(
        content="SelectorGroupChat uses an LLM to pick the next speaker.",
        mime_type=MemoryMimeType.TEXT,
        metadata={"topic": "teams"},
    ))

    agent = AssistantAgent(
        name="docs_assistant",
        model_client=model_client,
        system_message="You answer AutoGen questions using your memory.",
        memory=[chroma_memory],
    )

    team = RoundRobinGroupChat(
        [agent],
        termination_condition=MaxMessageTermination(2),
    )

    # The query will semantically match the most relevant memories.
    await Console(team.run_stream(
        task="How do agents use tools in AutoGen?"
    ))
    await chroma_memory.close()
    await model_client.close()

asyncio.run(main())
ChromaDB memory requires an extra dependency: pip install "autogen-ext[chromadb]". For Redis: pip install "autogen-ext[redis]". For Mem0: pip install "autogen-ext[mem0]".

Redis Vector Memory
RedisMemory provides a production-ready vector store backed by Redis. Requires a running Redis instance with the RediSearch module.
from autogen_ext.memory.redis import RedisMemory, RedisMemoryConfig

redis_memory = RedisMemory(
    config=RedisMemoryConfig(
        redis_url="redis://localhost:6379",
        index_name="agent_memory",
        prefix="autogen:",
    )
)

# Use exactly like ChromaDBVectorMemory:
agent = AssistantAgent(
    name="redis_agent",
    model_client=model_client,
    memory=[redis_memory],
)
Mem0 Cloud Memory
Mem0Memory integrates with Mem0.ai for managed memory with automatic extraction and retrieval.
from autogen_ext.memory.mem0 import Mem0Memory

# Cloud-hosted Mem0 (requires MEM0_API_KEY env variable).
mem0_memory = Mem0Memory(
    is_cloud=True,
    limit=5,  # Max entries to retrieve per query
)

agent = AssistantAgent(
    name="mem0_agent",
    model_client=model_client,
    memory=[mem0_memory],
)
Memory Back-end Comparison
| Back-end | Import | Retrieval | Persistence | Best For |
|---|---|---|---|---|
| ListMemory | autogen_core.memory | All entries (chronological) | In-process only | Prototyping, small context |
| ChromaDBVectorMemory | autogen_ext.memory.chromadb | Semantic similarity | Local file or server | RAG, large knowledge bases |
| RedisMemory | autogen_ext.memory.redis | Semantic similarity | Redis server | Production, shared state |
| Mem0Memory | autogen_ext.memory.mem0 | AI-managed | Mem0 cloud / local | Managed, auto-extraction |
RAG Pattern with ChromaDB Memory
For full Retrieval-Augmented Generation, chunk documents and store them in a vector memory. The agent automatically retrieves the most relevant chunks at query time.
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.ui import Console
from autogen_core.memory import MemoryContent, MemoryMimeType
from autogen_ext.memory.chromadb import (
    ChromaDBVectorMemory,
    PersistentChromaDBVectorMemoryConfig,
    SentenceTransformerEmbeddingFunctionConfig,
)
from autogen_ext.models.openai import OpenAIChatCompletionClient

def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split text into overlapping chunks."""
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        start = end - overlap
    return chunks

async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    chroma_memory = ChromaDBVectorMemory(
        config=PersistentChromaDBVectorMemoryConfig(
            collection_name="docs_rag",
            persistence_path="./rag_store",
            k=5,
            score_threshold=0.3,
            embedding_function_config=SentenceTransformerEmbeddingFunctionConfig(
                model_name="all-MiniLM-L6-v2",
            ),
        )
    )

    # Ingest a document as chunked memory entries.
    document = "Your long document text goes here..."
    for i, chunk in enumerate(chunk_text(document)):
        await chroma_memory.add(MemoryContent(
            content=chunk,
            mime_type=MemoryMimeType.TEXT,
            metadata={"source": "docs", "chunk_index": i},
        ))

    # Agent retrieves relevant chunks at query time.
    rag_agent = AssistantAgent(
        name="rag_agent",
        model_client=model_client,
        system_message="Answer questions using the provided memory context.",
        memory=[chroma_memory],
    )

    team = RoundRobinGroupChat(
        [rag_agent],
        termination_condition=MaxMessageTermination(2),
    )

    await Console(team.run_stream(
        task="Summarize the key points from the document."
    ))
    await chroma_memory.close()
    await model_client.close()

asyncio.run(main())
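The chunking helper is plain Python and easy to sanity-check in isolation: with an overlap, the tail of each chunk reappears at the head of the next, so text spanning a chunk boundary is never lost. The helper is reproduced here (same logic as in the RAG example above) so the check is self-contained:

```python
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split text into overlapping chunks."""
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        start = end - overlap
    return chunks

doc = "abcdefghij" * 10  # 100 characters
chunks = chunk_text(doc, chunk_size=40, overlap=10)

print(len(chunks))  # 4 chunks: starts at 0, 30, 60, 90
# Each chunk's last `overlap` characters equal the next chunk's first characters.
print(all(chunks[i][-10:] == chunks[i + 1][:10] for i in range(len(chunks) - 1)))
```

Tune chunk_size to your embedding model's effective context and overlap to roughly one sentence, so a sentence cut at a boundary still appears whole in the neighboring chunk.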
Serializing Memory Configuration
All memory back-ends support component serialization for persistence and portability:
import json

# Serialize memory configuration to JSON.
config_json = chroma_memory.dump_component().model_dump_json()
print(config_json)

# Reconstruct from JSON later.
restored_memory = ChromaDBVectorMemory.load_component(json.loads(config_json))
4.2 — State Management
While memory provides persistent knowledge, state management preserves the full conversation history and internal data of agents and teams. Use save_state() and load_state() to serialize an entire session and resume it later.
Agent State: Save & Restore
save_state() returns a dictionary containing the agent’s LLM message history. Load it into a new agent to continue the conversation.
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        name="assistant",
        model_client=model_client,
        system_message="You are a helpful assistant.",
    )
    team = RoundRobinGroupChat(
        [agent],
        termination_condition=MaxMessageTermination(2),
    )

    # Run a conversation.
    await team.run(task="What is the capital of France?")

    # Save the agent's state (contains LLM message history).
    agent_state = await agent.save_state()
    print(agent_state)
    # {'type': 'AssistantAgentState', 'version': '1.0.0', 'llm_messages': [...]}

    # Later: create a new agent and restore the state.
    new_agent = AssistantAgent(
        name="assistant",
        model_client=model_client,
        system_message="You are a helpful assistant.",
    )
    await new_agent.load_state(agent_state)
    # new_agent now has the full conversation history.

    await model_client.close()

asyncio.run(main())
Team State: Save & Restore All Agents
team.save_state() captures the state of every agent in the team, plus the team’s own orchestration state.
import asyncio
import json

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    coder = AssistantAgent(
        name="coder",
        model_client=model_client,
        system_message="You write Python code.",
    )
    reviewer = AssistantAgent(
        name="reviewer",
        model_client=model_client,
        system_message="You review code for bugs and improvements.",
    )
    team = RoundRobinGroupChat(
        [coder, reviewer],
        termination_condition=MaxMessageTermination(4),
    )

    await team.run(task="Write a Fibonacci function and review it.")

    # Save the full team state (all agents + team orchestration).
    team_state = await team.save_state()
    # {'type': 'TeamState', 'version': '1.0.0', 'agent_states': {...}}

    # Persist to disk as JSON.
    with open("team_state.json", "w") as f:
        json.dump(team_state, f, indent=2)

    # --- Later: restore the team from JSON ---
    with open("team_state.json", "r") as f:
        loaded_state = json.load(f)

    # Rebuild the team with the same agents.
    new_coder = AssistantAgent(
        name="coder",
        model_client=model_client,
        system_message="You write Python code.",
    )
    new_reviewer = AssistantAgent(
        name="reviewer",
        model_client=model_client,
        system_message="You review code for bugs and improvements.",
    )
    new_team = RoundRobinGroupChat(
        [new_coder, new_reviewer],
        termination_condition=MaxMessageTermination(4),
    )

    # Restore all agent states and team state.
    await new_team.load_state(loaded_state)

    # Continue the conversation from where it left off.
    await new_team.run(task="Now optimize the Fibonacci function.")
    await model_client.close()

asyncio.run(main())
State Management Quick Reference
| Operation | Code | Returns / Effect |
|---|---|---|
| Save agent | await agent.save_state() | {'type': 'AssistantAgentState', 'llm_messages': [...]} |
| Load agent | await agent.load_state(state) | Restores conversation history into the agent |
| Save team | await team.save_state() | {'type': 'TeamState', 'agent_states': {…}} |
| Load team | await team.load_state(state) | Restores all agent states + team orchestration |
| Reset team | await team.reset() | Clears all state (conversation history lost) |
| Persist to disk | json.dump(state, f) | State dicts are JSON-serializable |
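Because state dicts contain only JSON-serializable values, a disk round-trip is exact. A quick self-contained check, using a stand-in dict shaped like the AssistantAgentState output shown earlier (field names taken from that example, not a full schema):

```python
import json
import tempfile
from pathlib import Path

# Stand-in for the dict returned by agent.save_state().
agent_state = {
    "type": "AssistantAgentState",
    "version": "1.0.0",
    "llm_messages": [
        {"source": "user", "content": "What is the capital of France?"},
        {"source": "assistant", "content": "Paris."},
    ],
}

# Write to disk and read back.
path = Path(tempfile.mkdtemp()) / "agent_state.json"
path.write_text(json.dumps(agent_state, indent=2))
restored = json.loads(path.read_text())

print(restored == agent_state)  # True: the round-trip preserves the state exactly
```

The same round-trip works for team states, which nest one such dict per agent under agent_states, so a single JSON file can capture an entire session.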
Custom agents can override save_state() and load_state() to persist their own internal data.

Module 5: Human-in-the-Loop & Handoffs
Fully autonomous agents aren’t always desirable. AutoGen provides two patterns for human involvement: inline feedback during a run (via UserProxyAgent) and feedback between runs (via max_turns or HandoffTermination). Choose the approach that matches your application’s needs.
5.1 — Human-in-the-Loop Patterns
- Inline feedback (during a run): UserProxyAgent blocks execution and collects input inline. Simple for scripts, but it puts the team in an unstable state that cannot be saved.
- Feedback between runs: max_turns or HandoffTermination stops the team cleanly. The app collects input, then starts a new run(). Supports state save/restore.

UserProxyAgent — Inline Feedback
UserProxyAgent participates in the team like any other agent but blocks to collect human input. Pass an input_func to customize how input is gathered (console, WebSocket, etc.).
import asyncio

from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    assistant = AssistantAgent(
        name="assistant",
        model_client=model_client,
        system_message="You are a helpful assistant. Say APPROVE when the user approves.",
    )

    # UserProxyAgent blocks to get user input via the console.
    user_proxy = UserProxyAgent(
        name="user_proxy",
        input_func=input,  # Built-in console input
    )

    team = RoundRobinGroupChat(
        [assistant, user_proxy],
        termination_condition=TextMentionTermination("APPROVE"),
    )

    await Console(team.run_stream(
        task="Draft a project proposal for a REST API."
    ))
    await model_client.close()

asyncio.run(main())
While UserProxyAgent is waiting for input, the team is in an unstable state and cannot be persisted with save_state(). Use this pattern only for short, interactive console sessions. For web apps and production systems, prefer the max_turns or HandoffTermination patterns below.

Custom input_func for Web Applications
Replace the default input with an async function that awaits user input from a WebSocket, message queue, or other async source:
from autogen_core import CancellationToken

async def web_input_func(prompt: str, cancellation_token: CancellationToken | None = None) -> str:
    """Await user input from a WebSocket connection."""
    # `websocket` is assumed to be an active connection provided by your web framework.
    await websocket.send_json({"type": "input_request", "prompt": prompt})
    response = await websocket.receive_text()
    return response

user_proxy = UserProxyAgent(
    name="user_proxy",
    input_func=web_input_func,
)
max_turns Pattern — Turn-Based Feedback Loop
Set max_turns on any team to stop after a fixed number of agent turns. The application collects feedback and starts a new run(). Agent state is preserved across runs, but the turn count resets.
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        name="assistant",
        model_client=model_client,
        system_message="You are a helpful assistant.",
    )

    # max_turns=1 means the team stops after ONE agent response.
    team = RoundRobinGroupChat(
        [agent],
        max_turns=1,
    )

    # Interactive loop: agent responds, user gives feedback.
    task = "Write a haiku about coding."
    while True:
        await Console(team.run_stream(task=task))
        task = input("Your feedback (type 'exit' to quit): ")
        if task.strip().lower() == "exit":
            break

    await model_client.close()

asyncio.run(main())
RoundRobinGroupChat, SelectorGroupChat, and Swarm all support the max_turns parameter. The turn count resets after each run() call, but agent state (conversation history) is preserved.

5.2 — Agent Handoffs
The HandoffTermination pattern gives agents the ability to explicitly transfer control to a human (or to another external entity). This is more flexible than max_turns — the agent decides when to involve the human.
Handoff to User Pattern
Add a Handoff object targeting "user" to the agent’s handoffs list. When the agent determines it needs human input, it generates a HandoffMessage. The HandoffTermination condition detects this and stops the team.
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import Handoff
from autogen_agentchat.conditions import HandoffTermination, TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        name="travel_agent",
        model_client=model_client,
        handoffs=[Handoff(target="user", message="Transfer to user for clarification.")],
        system_message="""You are a travel planning assistant.
If you need more information from the user, use the handoff to transfer to 'user'.
When the trip plan is complete, say TERMINATE.""",
    )

    termination = HandoffTermination(target="user") | TextMentionTermination("TERMINATE")

    team = RoundRobinGroupChat(
        [agent],
        termination_condition=termination,
    )

    # Interactive loop driven by agent handoffs.
    task = "Plan a 5-day trip to Japan."
    while True:
        result = await Console(team.run_stream(task=task))
        # Check if the team stopped due to a handoff to "user".
        if result.stop_reason and "handoff" in result.stop_reason.lower():
            task = input("Your response: ")
            if task.strip().lower() == "exit":
                break
        else:
            # Agent said TERMINATE or stopped for another reason.
            break

    await model_client.close()

asyncio.run(main())
Handoff in Swarm Teams
In a Swarm, agents hand off to each other by name. When an agent hands off to "user", the team stops. To resume, you must send a HandoffMessage targeting the agent that should continue:
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import HandoffTermination, TextMentionTermination
from autogen_agentchat.messages import HandoffMessage
from autogen_agentchat.teams import Swarm
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    booking_agent = AssistantAgent(
        name="booking_agent",
        model_client=model_client,
        handoffs=["user"],  # Shorthand for Handoff(target="user")
        system_message="""You handle hotel bookings.
Ask the user for dates and preferences by handing off to 'user'.
When booking is confirmed, say TERMINATE.""",
    )

    termination = HandoffTermination(target="user") | TextMentionTermination("TERMINATE")
    team = Swarm([booking_agent], termination_condition=termination)

    # First run.
    result = await Console(team.run_stream(
        task="I need a hotel in Tokyo."
    ))

    # Resume after handoff in a Swarm: use HandoffMessage.
    while "handoff" in (result.stop_reason or "").lower():
        user_input = input("Your response: ")
        if user_input.strip().lower() == "exit":
            break
        # Target the agent that should receive the user's response.
        result = await Console(team.run_stream(
            task=HandoffMessage(
                source="user",
                target="booking_agent",
                content=user_input,
            )
        ))

    await model_client.close()

asyncio.run(main())
To resume a Swarm after a HandoffTermination, wrap the user's response in a HandoffMessage(source="user", target="agent_name", content=...). For RoundRobinGroupChat and SelectorGroupChat, a plain string task=user_input is sufficient.

HITL Pattern Comparison
| Pattern | Feedback Timing | State Saveable? | Best For |
|---|---|---|---|
| UserProxyAgent | During run (blocking) | ❌ No | Quick console scripts |
| max_turns | Between runs (automatic) | ✅ Yes | Iterative refinement loops |
| HandoffTermination | Between runs (agent-initiated) | ✅ Yes | Agent decides when to ask the human |
Web Framework Integration
AutoGen provides sample integrations for building production HITL applications with popular Python web frameworks:
- FastAPI: HandoffTermination + custom input_func for real-time feedback over WebSockets.
- Chainlit: a cl.on_message handler calling team.run_stream().
- Streamlit: collect input with st.chat_input(); use the max_turns=1 pattern for natural turn-taking.

In production, prefer the HandoffTermination or max_turns pattern instead of UserProxyAgent. These patterns allow clean state serialization between runs, support async I/O natively, and don't block the event loop. Combine with save_state() / load_state() from Module 4 for fully resumable sessions.

Module 6: Advanced Tooling & MCP
Compose agents as callable tools and integrate external MCP servers
6.1 — AgentTool: Wrap an Agent as a Tool
AgentTool lets you wrap any BaseChatAgent so it can be called as a tool by another agent. The outer agent’s model decides when to invoke the inner agent, passing a task string and receiving the result — enabling dynamic, model-driven multi-agent workflows without a fixed team structure.
When using AgentTool (or TeamTool), you must set parallel_tool_calls=False on the model client. Agents maintain internal state that conflicts with parallel execution.

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.tools import AgentTool
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Inner agent — a specialist writer
    writer = AssistantAgent(
        name="writer",
        description="A writer agent for generating polished text.",
        model_client=model_client,
        system_message="Write concise, well-structured content.",
    )

    # Wrap the writer as a callable tool
    writer_tool = AgentTool(agent=writer)

    # Outer agent — must disable parallel tool calls
    main_model = OpenAIChatCompletionClient(
        model="gpt-4o",
        parallel_tool_calls=False,  # Required for AgentTool
    )
    orchestrator = AssistantAgent(
        name="orchestrator",
        model_client=main_model,
        tools=[writer_tool],
        system_message="Delegate writing tasks to the writer tool.",
    )

    await Console(orchestrator.run_stream(task="Write a haiku about distributed systems."))
    await model_client.close()
    await main_model.close()  # Close the second client as well.

asyncio.run(main())
AgentTool returns all messages from the inner agent’s run, prefixed by source. Set return_value_as_last_message=True to return only the final message — ideal when you want a clean, single-string result.
6.2 — TeamTool: Wrap an Entire Team as a Tool
TeamTool takes the concept further — wrapping a full BaseGroupChat team (with its own agents, termination conditions, and workflow) as a single callable tool. The outer agent triggers the entire team run and receives the aggregated result.
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import SourceMatchTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.tools import TeamTool
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Build a review team: writer → reviewer → summarizer
    writer = AssistantAgent(
        name="writer",
        model_client=model_client,
        system_message="Draft clear, engaging content.",
    )
    reviewer = AssistantAgent(
        name="reviewer",
        model_client=model_client,
        system_message="Critically review the draft and suggest improvements.",
    )
    summarizer = AssistantAgent(
        name="summarizer",
        model_client=model_client,
        system_message="Combine feedback and produce a final revised version.",
    )
    team = RoundRobinGroupChat(
        [writer, reviewer, summarizer],
        termination_condition=SourceMatchTermination(sources=["summarizer"]),
    )

    # Wrap the entire team as a single tool
    writing_team_tool = TeamTool(
        team=team,
        name="writing_team",
        description="A collaborative writing team for high-quality content.",
        return_value_as_last_message=True,
    )

    # Outer orchestrator agent
    main_model = OpenAIChatCompletionClient(
        model="gpt-4o",
        parallel_tool_calls=False,  # Required for TeamTool
    )
    orchestrator = AssistantAgent(
        name="orchestrator",
        model_client=main_model,
        tools=[writing_team_tool],
        system_message="Use the writing team tool to produce polished content.",
    )

    await Console(
        orchestrator.run_stream(task="Write a blog post about AI agents in 2025.")
    )
    await model_client.close()
    await main_model.close()


asyncio.run(main())
AgentTool vs TeamTool — Comparison
| Feature | AgentTool | TeamTool |
|---|---|---|
| Wraps | Single BaseChatAgent | Full BaseGroupChat team |
| Name/Description | Auto-derived from agent | Explicitly provided |
| Requires parallel_tool_calls=False | Yes | Yes |
| Inner termination | Single agent run completes | Team termination condition |
| Use case | Specialist delegation | Complex multi-step workflows |
| Serializable | Yes (dump_component()) | Yes (dump_component()) |
A TeamTool team can contain agents that themselves use AgentTool, creating deep hierarchies. The outer model dynamically decides which sub-agent or sub-team to invoke based on the task.
6.3 — Model Context Protocol (MCP) Integration
The Model Context Protocol is an open standard that lets LLM applications connect to external tool servers. AutoGen provides first-class MCP support through autogen_ext.tools.mcp, allowing agents to use any MCP-compatible server’s tools.
pip install -U "autogen-ext[mcp]"
Transport Types
StdioServerParams
Launch a local process (e.g., npx, uvx) communicating over stdin/stdout. Best for local CLI tools.
SseServerParams
Connect to a remote MCP server via HTTP Server-Sent Events. Ideal for cloud-hosted tool services.
StreamableHttpServerParams
Connect via Streamable HTTP with bidirectional streaming. Modern alternative to SSE for remote servers.
Approach 1: McpWorkbench (Recommended)
McpWorkbench is the preferred way to connect agents to MCP servers. It manages the session lifecycle, supports tool listing, tool calling, resources, prompts, and more — all as an async context manager.
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.tools.mcp import McpWorkbench, StdioServerParams


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Connect to the mcp-server-fetch via stdio
    server_params = StdioServerParams(
        command="uvx",
        args=["mcp-server-fetch"],
        read_timeout_seconds=60,
    )
    async with McpWorkbench(server_params) as workbench:
        agent = AssistantAgent(
            name="fetcher",
            model_client=model_client,
            workbench=workbench,
            reflect_on_tool_use=True,
        )
        result = await agent.run(
            task="Summarize the content of https://en.wikipedia.org/wiki/Seattle"
        )
        print(result.messages[-1].content)

    await model_client.close()


asyncio.run(main())
McpWorkbench exposes more than just tools: list_resources(), read_resource(), list_prompts(), get_prompt(), and list_resource_templates() are all available on the workbench session.
Approach 2: mcp_server_tools() Factory
For simpler use cases, the mcp_server_tools() factory creates a list of tool adapters that can be passed directly to an agent’s tools= parameter.
import asyncio
from pathlib import Path

from autogen_agentchat.agents import AssistantAgent
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.tools.mcp import StdioServerParams, mcp_server_tools


async def main() -> None:
    # Local filesystem MCP server
    desktop = str(Path.home() / "Desktop")
    server_params = StdioServerParams(
        command="npx.cmd",  # Windows; use "npx" on macOS/Linux
        args=["-y", "@modelcontextprotocol/server-filesystem", desktop],
    )

    # Get all tools from the server
    tools = await mcp_server_tools(server_params)

    agent = AssistantAgent(
        name="file_manager",
        model_client=OpenAIChatCompletionClient(model="gpt-4o"),
        tools=tools,
    )
    await agent.run(
        task="Create a file called test.txt with some content",
        cancellation_token=CancellationToken(),
    )


asyncio.run(main())
Shared MCP Sessions
When the MCP server maintains session state (e.g., a browser via Playwright), you can share a single session across multiple tool calls using create_mcp_server_session():
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.tools.mcp import (
    StdioServerParams,
    create_mcp_server_session,
    mcp_server_tools,
)


async def main() -> None:
    model_client = OpenAIChatCompletionClient(
        model="gpt-4o",
        parallel_tool_calls=False,
    )
    params = StdioServerParams(
        command="npx",
        args=["@playwright/mcp@latest"],
        read_timeout_seconds=60,
    )

    # Shared session — browser state preserved across calls
    async with create_mcp_server_session(params) as session:
        await session.initialize()
        tools = await mcp_server_tools(server_params=params, session=session)

        agent = AssistantAgent(
            name="browser_agent",
            model_client=model_client,
            tools=tools,
        )
        termination = TextMentionTermination("TERMINATE")
        team = RoundRobinGroupChat([agent], termination_condition=termination)
        await Console(
            team.run_stream(task="Go to https://github.com and describe the page.")
        )

    await model_client.close()


asyncio.run(main())
Remote SSE & Streamable HTTP
For remote MCP servers, use SseServerParams or StreamableHttpServerParams:
from autogen_ext.tools.mcp import McpWorkbench, SseServerParams

# SSE-based remote server
sse_params = SseServerParams(
    url="https://api.example.com/mcp",
    headers={"Authorization": "Bearer your-api-key"},
    timeout=30,
)

from autogen_ext.tools.mcp import StreamableHttpServerParams

# Streamable HTTP-based remote server
http_params = StreamableHttpServerParams(
    url="https://api.example.com/mcp",
    headers={"Authorization": "Bearer your-api-key"},
    timeout=30.0,
    sse_read_timeout=300.0,
    terminate_on_close=True,
)
MCP Approach Comparison
| Feature | McpWorkbench | mcp_server_tools() |
|---|---|---|
| Session management | Built-in (context manager) | Manual / auto per-call |
| Passed via | workbench= parameter | tools= parameter |
| Resources & Prompts | Yes | No (tools only) |
| Tool overrides | Yes (tool_overrides=) | No |
| Shared sessions | Built-in | Via create_mcp_server_session() |
| Serializable | Yes (dump_component()) | Yes (per adapter) |
StdioServerParams executes commands in your local environment. Always validate server origins and never pass untrusted input directly to MCP tool arguments.
Module 7: Serializing Components & AutoGen Studio
Export agents, teams, and tools to declarative JSON configs for sharing and no-code Studio import
7.1 — The Component Protocol
Every major AutoGen building block — agents, teams, model clients, termination conditions, tools, workbenches — implements the Component protocol. This gives each object two key methods:
| Method | Direction | Purpose |
|---|---|---|
| dump_component() | Object → Config | Serialize to a ComponentModel (dict/JSON) |
| load_component(config) | Config → Object | Reconstruct a live object from config |
| _to_config() | Object → Config | Internal: produces the typed config dataclass |
| _from_config(config) | Config → Object | Internal: class method to rebuild from config |
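The round-trip contract in the table above can be made concrete with a stdlib-only sketch. The class and provider string below are made up for illustration; the real protocol lives in autogen-core and also tracks component_type and version.

```python
from dataclasses import dataclass
from typing import Any, Dict


# Schematic mock of the Component protocol: dump_component() emits a
# provider + config dict, load_component() rebuilds an equivalent object.
@dataclass
class MockTermination:
    max_messages: int

    def dump_component(self) -> Dict[str, Any]:
        return {
            "provider": "mock.MockTermination",  # illustrative provider path
            "component_type": "termination",
            "version": 1,
            "config": {"max_messages": self.max_messages},
        }

    @classmethod
    def load_component(cls, config: Dict[str, Any]) -> "MockTermination":
        return cls(**config["config"])


original = MockTermination(max_messages=5)
restored = MockTermination.load_component(original.dump_component())
print(restored == original)  # True — the round trip preserves the config
```

The same dump-then-load shape applies to every real component type listed later in this module.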
7.2 — Serializing Termination Conditions
Termination conditions are fully serializable, including composed conditions built with | (OR) and & (AND).
from autogen_agentchat.conditions import (
MaxMessageTermination,
StopMessageTermination,
)
max_term = MaxMessageTermination(5)
stop_term = StopMessageTermination()
# Compose with OR
combined = max_term | stop_term
# Serialize to JSON
config = combined.dump_component()
print(config.model_dump_json(indent=2))
# Deserialize back to a live object
restored = combined.load_component(config)
print(type(restored))
# <class 'autogen_agentchat.base.OrTerminationCondition'>
The JSON output captures the full tree:
{
  "provider": "autogen_agentchat.base.OrTerminationCondition",
  "component_type": "termination",
  "version": 1,
  "config": {
    "conditions": [
      {
        "provider": "autogen_agentchat.conditions.MaxMessageTermination",
        "component_type": "termination",
        "config": { "max_messages": 5 }
      },
      {
        "provider": "autogen_agentchat.conditions.StopMessageTermination",
        "component_type": "termination",
        "config": {}
      }
    ]
  }
}
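Because the config is plain JSON, it can be inspected with the standard library alone. A small sketch (stdlib only) that walks the tree above and collects every provider, confirming the composed condition nests its children:

```python
import json
from typing import Any, List

# The composed-termination config shown above, as emitted by dump_component()
config_json = """
{
  "provider": "autogen_agentchat.base.OrTerminationCondition",
  "component_type": "termination",
  "version": 1,
  "config": {
    "conditions": [
      {
        "provider": "autogen_agentchat.conditions.MaxMessageTermination",
        "component_type": "termination",
        "config": {"max_messages": 5}
      },
      {
        "provider": "autogen_agentchat.conditions.StopMessageTermination",
        "component_type": "termination",
        "config": {}
      }
    ]
  }
}
"""


def collect_providers(node: Any) -> List[str]:
    # Recursively gather every "provider" entry in a component config tree.
    found: List[str] = []
    if isinstance(node, dict):
        if "provider" in node:
            found.append(node["provider"])
        for value in node.values():
            found.extend(collect_providers(value))
    elif isinstance(node, list):
        for item in node:
            found.extend(collect_providers(item))
    return found


for provider in collect_providers(json.loads(config_json)):
    print(provider)
# autogen_agentchat.base.OrTerminationCondition
# autogen_agentchat.conditions.MaxMessageTermination
# autogen_agentchat.conditions.StopMessageTermination
```

The same walker works on any dump_component() output, since agents and teams use the identical provider/config nesting.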
7.3 — Serializing Agents
Agents serialize their full configuration — model client, system message, handoffs, model context, and more.
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient

model_client = OpenAIChatCompletionClient(model="gpt-4o")

agent = AssistantAgent(
    name="assistant",
    model_client=model_client,
    handoffs=["flights_refunder", "user"],
    system_message="Use tools to solve tasks.",
)

# Serialize
agent_config = agent.dump_component()
print(agent_config.model_dump_json(indent=2))

# Deserialize
restored_agent = AssistantAgent.load_component(agent_config)
print(restored_agent.name)  # "assistant"
The serialized agent config includes:
- Model client — provider, model name, config (API keys are included if set directly)
- System message — full text
- Handoffs — target names, descriptions, transfer messages
- Model context — context type and settings (e.g., buffer size)
- Tools — serializable tools only; plain Python function tools are not yet supported
7.4 — Serializing Teams
Teams serialize their entire participant graph, termination conditions, and configuration — producing a single JSON that can recreate the full multi-agent system.
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient

model_client = OpenAIChatCompletionClient(model="gpt-4o")

agent = AssistantAgent(
    name="assistant",
    model_client=model_client,
    system_message="You are a helpful assistant.",
)

team = RoundRobinGroupChat(
    participants=[agent],
    termination_condition=MaxMessageTermination(2),
)

# Serialize the entire team to JSON
team_config = team.dump_component()
print(team_config.model_dump_json(indent=2))

# Rebuild the team from JSON
restored_team = RoundRobinGroupChat.load_component(team_config)
Team Config Structure
The resulting JSON nests every component recursively:
{
  "provider": "autogen_agentchat.teams.RoundRobinGroupChat",
  "component_type": "team",
  "version": 1,
  "config": {
    "participants": [
      {
        "provider": "autogen_agentchat.agents.AssistantAgent",
        "component_type": "agent",
        "config": {
          "name": "assistant",
          "model_client": {
            "provider": "autogen_ext.models.openai.OpenAIChatCompletionClient",
            "component_type": "model",
            "config": { "model": "gpt-4o" }
          },
          "system_message": "You are a helpful assistant."
        }
      }
    ],
    "termination_condition": {
      "provider": "autogen_agentchat.conditions.MaxMessageTermination",
      "component_type": "termination",
      "config": { "max_messages": 2 }
    }
  }
}
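As a quick check of that recursive nesting, a stdlib-only snippet can drill from the team config down to the innermost model config. The access paths assume the exact JSON shown above:

```python
import json

# The team config shown above, as emitted by dump_component()
team_json = """
{
  "provider": "autogen_agentchat.teams.RoundRobinGroupChat",
  "component_type": "team",
  "version": 1,
  "config": {
    "participants": [
      {
        "provider": "autogen_agentchat.agents.AssistantAgent",
        "component_type": "agent",
        "config": {
          "name": "assistant",
          "model_client": {
            "provider": "autogen_ext.models.openai.OpenAIChatCompletionClient",
            "component_type": "model",
            "config": {"model": "gpt-4o"}
          },
          "system_message": "You are a helpful assistant."
        }
      }
    ],
    "termination_condition": {
      "provider": "autogen_agentchat.conditions.MaxMessageTermination",
      "component_type": "termination",
      "config": {"max_messages": 2}
    }
  }
}
"""

team_cfg = json.loads(team_json)
# Drill down: team -> first participant (the agent) -> its model client
agent_cfg = team_cfg["config"]["participants"][0]
model_cfg = agent_cfg["config"]["model_client"]
print(agent_cfg["provider"])         # autogen_agentchat.agents.AssistantAgent
print(model_cfg["config"]["model"])  # gpt-4o
```

Each level of the hierarchy is itself a complete ComponentModel, which is what lets load_component() rebuild any subtree independently.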
7.5 — Save to File & Load from File
A practical pattern for persisting configs to disk:
import json
from pathlib import Path

from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_core import ComponentModel


# Save to file
def save_team_config(team, filepath: str) -> None:
    config = team.dump_component()
    Path(filepath).write_text(
        config.model_dump_json(indent=2), encoding="utf-8"
    )


# Load from file
def load_team_config(filepath: str) -> RoundRobinGroupChat:
    raw = json.loads(Path(filepath).read_text(encoding="utf-8"))
    config = ComponentModel(**raw)
    return RoundRobinGroupChat.load_component(config)


# Usage
save_team_config(team, "my_team.json")
loaded_team = load_team_config("my_team.json")
7.6 — AutoGen Studio Integration
AutoGen Studio is the no-code/low-code UI for building and running AutoGen workflows. The JSON configs produced by dump_component() are the native format that Studio understands — meaning you can build in code, export to JSON, and import directly into Studio (and vice versa).
Code → Studio
- Build your team in Python
- Call team.dump_component()
- Open AutoGen Studio → Import config
- Run, test, and iterate visually
Studio → Code
- Design your team in Studio UI
- Export the JSON config
- Call Team.load_component(config)
- Deploy to production
Serializable Component Types
| Component | component_type | Example Provider |
|---|---|---|
| Model Client | "model" | autogen_ext.models.openai.OpenAIChatCompletionClient |
| Agent | "agent" | autogen_agentchat.agents.AssistantAgent |
| Team | "team" | autogen_agentchat.teams.RoundRobinGroupChat |
| Termination | "termination" | autogen_agentchat.conditions.MaxMessageTermination |
| Tool (AgentTool) | "tool" | autogen_agentchat.tools.AgentTool |
| Tool (TeamTool) | "tool" | autogen_agentchat.tools.TeamTool |
| MCP Workbench | "workbench" | autogen_ext.tools.mcp.McpWorkbench |
| Model Context | "chat_completion_context" | autogen_core.model_context.BufferedChatCompletionContext |
- Plain Python function tools are not yet serializable — use AgentTool/TeamTool or MCP tools for serializable workflows
- selector_func in SelectorGroupChat is ignored during serialization
- API keys embedded in configs are stored in plain text — use environment variables in production
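Given the plain-text API-key caveat, one defensive option before sharing a config is to strip secret fields recursively. A minimal stdlib-only sketch; the set of secret key names is an assumption, so extend it to match the fields your configs actually contain:

```python
from typing import Any

SECRET_KEYS = {"api_key"}  # assumed field name; add others your configs use


def redact(node: Any) -> Any:
    # Return a copy of a config tree with secret-bearing fields removed.
    if isinstance(node, dict):
        return {k: redact(v) for k, v in node.items() if k not in SECRET_KEYS}
    if isinstance(node, list):
        return [redact(item) for item in node]
    return node


config = {
    "provider": "autogen_ext.models.openai.OpenAIChatCompletionClient",
    "component_type": "model",
    "config": {"model": "gpt-4o", "api_key": "sk-secret"},
}
clean = redact(config)
print(clean["config"])  # {'model': 'gpt-4o'}
```

Run this over the dict from config.model_dump() before writing a shared file; at load time, supply the key via an environment variable instead.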