# Microsoft Agent Framework Python — Recipes and Code Patterns

Practical, runnable patterns for the real `agent_framework` package. Every recipe targets the public surface of the latest release; nothing here relies on private modules.
- **Package:** `agent-framework` (the umbrella distribution that pulls in `agent-framework-core` plus every official provider). Imports root at `agent_framework`.
- **Pinned version:** `agent-framework==1.2.2`. Check the latest with `pip index versions agent-framework`.
- **Python:** 3.10+ — the entire package uses `from __future__ import annotations`, the `|` union syntax, and modern asyncio.
- **Verified APIs:** `Agent`, `AgentSession`, `FileHistoryProvider`, `FileCheckpointStorage`, `MCPStdioTool`, `MCPStreamableHTTPTool`, `AgentMiddleware`, `FunctionMiddleware`, `SkillsProvider`, `Skill`, `LocalEvaluator`, `WorkflowBuilder`.
```bash
pip install agent-framework
# Or pin explicitly:
pip install 'agent-framework==1.2.2'
```

## Beginner

### Recipe 1 — Hello agent (single-turn, multi-turn)

The minimal agent. The same `Agent` class handles both stateless one-shot calls and stateful conversations — the difference is whether you pass a `session=`.
```python
import asyncio

from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    agent = Agent(
        client=OpenAIChatClient(),  # reads OPENAI_API_KEY
        instructions="You are a friendly assistant. Keep responses concise.",
    )

    # Stateless — no history.
    one_shot = await agent.run("In one sentence: what is asyncio?")
    print(one_shot.text)

    # Stateful — pass a session to keep the thread alive across turns.
    session = agent.create_session()
    await agent.run("My favourite colour is teal.", session=session)
    follow_up = await agent.run("What did I just tell you?", session=session)
    print(follow_up.text)  # mentions teal


if __name__ == "__main__":
    asyncio.run(main())
```

Swap `OpenAIChatClient` for `AzureOpenAIChatClient`, `FoundryChatClient`, `AnthropicClient`, `OllamaChatClient`, or `BedrockChatClient` to switch providers — every chat client implements the same `SupportsChatGetResponse` protocol so the rest of your code is identical.
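A minimal sketch of the swap, assuming the other providers follow the same `agent_framework.<provider>` import convention as `agent_framework.openai` (the exact module path below is an assumption — verify it against your installed extras):

```python
from agent_framework import Agent
from agent_framework.ollama import OllamaChatClient  # assumed module path

# Same call sites as Recipe 1 — the client satisfies the same
# SupportsChatGetResponse protocol, so nothing else changes.
agent = Agent(
    client=OllamaChatClient(),
    instructions="You are a friendly assistant. Keep responses concise.",
)
```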
### Recipe 2 — Streaming responses

`agent.run(stream=True)` returns a `ResponseStream[AgentResponseUpdate, AgentResponse]`. There is no `agent.run_stream(...)`.
```python
import asyncio

from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    agent = Agent(client=OpenAIChatClient(), instructions="You are a tutor.")

    stream = agent.run("Explain backpressure in 3 paragraphs.", stream=True)
    async for update in stream:
        if update.text:
            print(update.text, end="", flush=True)
    print()

    # The assembled final response — finish_reason, full text, tool calls.
    final = await stream.get_final_response()
    print(f"\nfinish_reason={final.finish_reasons}")


if __name__ == "__main__":
    asyncio.run(main())
```

`stream.get_final_response()` waits until consumption finishes and returns the joined `AgentResponse`. For HITL approvals mid-stream use `await stream.send_response(approval)` instead of starting a new run.
### Recipe 3 — A tool with typed arguments and error handling

`@tool` builds a JSON schema from your signature. Raise on bad inputs — the exception message becomes the model's tool-call result so it can recover.
```python
import asyncio
from typing import Annotated

from agent_framework import Agent, tool
from agent_framework.openai import OpenAIChatClient


@tool(description="Divide two numbers.")
def divide(
    numerator: Annotated[float, "The dividend"],
    denominator: Annotated[float, "The divisor — must not be zero"],
) -> str:
    if denominator == 0:
        raise ValueError("denominator must be non-zero")
    return f"{numerator / denominator:.4f}"


async def main() -> None:
    agent = Agent(
        client=OpenAIChatClient(),
        instructions="You are a math assistant. Use tools when arithmetic is needed.",
        tools=[divide],
    )
    print((await agent.run("What is 22 over 7?")).text)
    print((await agent.run("What is 10 divided by 0?")).text)


if __name__ == "__main__":
    asyncio.run(main())
```

The model sees the `Annotated` strings as parameter descriptions in the JSON schema. For richer constraints use `pydantic.Field(description=..., ge=..., le=...)` instead — the decorator picks up either form.
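A sketch of the `Field` form, assuming the decorator reads `Field` metadata from `Annotated` the same way it reads the plain strings above:

```python
from typing import Annotated

from pydantic import Field

from agent_framework import tool


@tool(description="Compute what percentage `part` is of `whole`.")
def percentage(
    part: Annotated[float, Field(description="The part value", ge=0)],
    whole: Annotated[float, Field(description="The whole value", gt=0)],
) -> str:
    # gt=0 keeps the division safe; the constraint is also advertised
    # to the model in the generated JSON schema.
    return f"{part / whole * 100:.1f}%"
```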
### Recipe 4 — Structured output with Pydantic

Force a schema-conforming response. `response.value` is the validated Pydantic instance.
```python
import asyncio

from pydantic import BaseModel, Field

from agent_framework import Agent, ChatOptions
from agent_framework.openai import OpenAIChatClient


class Profile(BaseModel):
    name: str = Field(description="Full name")
    age: int = Field(ge=0, le=130)
    interests: list[str]


async def main() -> None:
    agent = Agent(client=OpenAIChatClient(), instructions="Extract structured data.")

    response = await agent.run(
        "I'm Jane Doe, 28, and I love hiking and Python.",
        options=ChatOptions(response_format=Profile),
    )
    profile: Profile = response.value
    print(profile.model_dump_json(indent=2))


if __name__ == "__main__":
    asyncio.run(main())
```

Pass `response_format=` via `ChatOptions` (or set it as a default on the chat client). For streaming structured output, the same option works against `agent.run(..., stream=True)` — the framework lazily validates `final.value` on the final update.
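A sketch of the streaming variant, reusing `agent`, `Profile`, and `ChatOptions` from the recipe above. That `final.value` validates on the assembled response comes from the note; the rest is illustrative:

```python
async def extract_streaming() -> None:
    stream = agent.run(
        "I'm Jane Doe, 28, and I love hiking and Python.",
        options=ChatOptions(response_format=Profile),
        stream=True,
    )
    async for update in stream:
        if update.text:
            print(update.text, end="", flush=True)  # raw JSON as it streams

    final = await stream.get_final_response()
    profile: Profile = final.value  # validated against Profile
    print("\n", profile.model_dump())
```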
## Intermediate

### Recipe 5 — Persistent sessions on disk

Default sessions live in memory. `FileHistoryProvider` writes one JSONL file per `session_id` and survives restarts.
```python
import asyncio

from agent_framework import Agent, FileHistoryProvider
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    history = FileHistoryProvider(
        storage_path="./conversations",  # directory created on demand
        skip_excluded=True,  # honour CompactionProvider exclusions
    )

    agent = Agent(
        client=OpenAIChatClient(),
        instructions="You are a customer-service agent.",
        context_providers=[history],
    )

    # Stable id — same conversation across process restarts.
    session = agent.create_session(session_id="customer-9281")
    await agent.run("My order #4421 hasn't arrived.", session=session)

    # Run later (different process) — history is reloaded from JSONL.
    follow_up = await agent.run("Any update on that?", session=session)
    print(follow_up.text)


if __name__ == "__main__":
    asyncio.run(main())
```

`FileHistoryProvider` rejects `session_id`s that escape the storage root (`../etc/passwd` and Windows reserved stems are rewritten or rejected). For multi-host deployments use the Redis or Cosmos DB providers — the JSONL implementation only locks within a single process.
### Recipe 6 — Encrypt session history at rest

Inject your own JSON serialisers via `dumps=` / `loads=` to add envelope encryption. The provider validates that the output is a single line — Fernet tokens already are.
```python
import json
import os

from cryptography.fernet import Fernet

from agent_framework import Agent, FileHistoryProvider
from agent_framework.openai import OpenAIChatClient

cipher = Fernet(os.environ["AGENT_HISTORY_FERNET_KEY"])


def encrypt_dumps(payload: dict) -> str:
    plaintext = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    return cipher.encrypt(plaintext).decode("ascii")


def decrypt_loads(line: str | bytes) -> dict:
    if isinstance(line, str):
        line = line.encode("ascii")
    return json.loads(cipher.decrypt(line).decode("utf-8"))


history = FileHistoryProvider(
    storage_path="./encrypted-conversations",
    dumps=encrypt_dumps,
    loads=decrypt_loads,
)

agent = Agent(client=OpenAIChatClient(), context_providers=[history])
```

Test with `decrypt_loads(encrypt_dumps(x)) == x` for a representative payload before deploying — both callables must round-trip cleanly.
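A minimal round-trip check, pure Python with no framework imports (the payload shape here is illustrative, not the provider's actual record schema):

```python
def test_history_round_trip() -> None:
    payload = {"role": "user", "text": "héllo\nworld"}  # illustrative shape
    line = encrypt_dumps(payload)
    assert "\n" not in line  # the provider requires single-line output
    assert decrypt_loads(line) == payload
```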
### Recipe 7 — Sub-agents with as_tool

Convert an agent into a tool that other agents can delegate to. `propagate_session=True` shares the parent's session so the sub-agent sees the same conversation history.
```python
import asyncio

from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    client = OpenAIChatClient()

    researcher = Agent(
        client=client,
        name="researcher",
        description="Performs deep research on technical topics.",
        instructions="Produce thorough, citation-backed research on the requested topic.",
    )

    summariser = Agent(
        client=client,
        name="summariser",
        description="Condenses long passages into bullet points.",
        instructions="Reduce input to <= 5 bullet points.",
    )

    coordinator = Agent(
        client=client,
        instructions=(
            "You are a project coordinator. Use the `researcher` tool to gather "
            "information, then the `summariser` tool to produce final bullets."
        ),
        tools=[
            researcher.as_tool(),
            summariser.as_tool(propagate_session=True),  # share the session
        ],
    )

    response = await coordinator.run("Brief me on quantum-resistant cryptography.")
    print(response.text)


if __name__ == "__main__":
    asyncio.run(main())
```

Three knobs on `as_tool()` worth knowing (see the sketch after this list):

- `name=` / `description=` override the auto-generated tool name and description.
- `approval_mode="always_require"` puts a HITL gate on the delegation itself — useful when a sub-agent has access to expensive or sensitive tools.
- `stream_callback=` surfaces the sub-agent's streaming updates to your UI as it works.
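A sketch combining all three knobs. The callback signature is an assumption — it presumes `stream_callback` receives the same update objects a normal stream yields:

```python
research_tool = researcher.as_tool(
    name="deep_research",                  # override the auto-generated name
    description="Delegate in-depth, citation-backed research.",
    approval_mode="always_require",        # HITL gate on the delegation itself
    # Assumed: the callback receives the sub-agent's streaming updates.
    stream_callback=lambda update: print(update.text or "", end="", flush=True),
)
```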
### Recipe 8 — Router pattern (classify + dispatch)

Pure-Python routing using a stateless classifier agent and per-domain specialists.
```python
import asyncio

from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient


class SupportRouter:
    def __init__(self, client: OpenAIChatClient) -> None:
        self.router = Agent(
            client=client,
            instructions=(
                "Classify the user query as one of: BILLING, TECHNICAL, ACCOUNT. "
                "Reply with exactly that single word."
            ),
        )
        self.specialists = {
            "BILLING": Agent(client=client, instructions="You are a billing specialist."),
            "TECHNICAL": Agent(client=client, instructions="You are a tech-support engineer."),
            "ACCOUNT": Agent(client=client, instructions="You are an account-management agent."),
        }

    async def handle(self, query: str) -> str:
        route = (await self.router.run(query)).text.strip().upper()
        agent = self.specialists.get(route, self.specialists["TECHNICAL"])
        return (await agent.run(query)).text


async def main() -> None:
    router = SupportRouter(OpenAIChatClient())
    print(await router.handle("My invoice for April is wrong."))


if __name__ == "__main__":
    asyncio.run(main())
```

For a graph-based variant with checkpointing, fan-in / fan-out edges, and event streaming, swap the dispatcher for `WorkflowBuilder` (Recipe 13).
### Recipe 9 — Lightweight evaluation in CI

`LocalEvaluator` is dependency-free — every check is a Python function. Combine the built-in checks (`keyword_check`, `tool_called_check`, `tool_calls_present`, `tool_call_args_match`) with your own `@evaluator` callables.
```python
import asyncio

from agent_framework import (
    Agent,
    LocalEvaluator,
    evaluate_agent,
    keyword_check,
    tool,
    tool_called_check,
    evaluator,
    CheckResult,
)
from agent_framework.openai import OpenAIChatClient


@tool
def get_weather(location: str) -> str:
    """Return the current weather for a city."""
    return f"{location}: 22°C, clear"


@evaluator
def mentions_celsius(response: str) -> CheckResult:
    return CheckResult(
        passed="°C" in response,
        reason="celsius unit present" if "°C" in response else "missing °C",
        check_name="mentions_celsius",
    )


async def main() -> None:
    agent = Agent(
        client=OpenAIChatClient(),
        instructions="You are a weather assistant; always cite °C.",
        tools=[get_weather],
    )

    [results] = await evaluate_agent(
        agent=agent,
        queries=["What's the weather in Amsterdam?", "Is it warm in Cairo?"],
        evaluators=LocalEvaluator(
            keyword_check("weather"),
            tool_called_check("get_weather"),
            mentions_celsius,
        ),
    )

    counts = results.result_counts
    print(f"passed={counts['passed']} failed={counts['failed']}")
    assert counts["failed"] == 0, results.error


if __name__ == "__main__":
    asyncio.run(main())
```

For LLM-as-judge or weighted scoring patterns see the evaluation guide.
### Recipe 10 — Skills for progressive-disclosure knowledge

A `Skill` is advertised by name + description. Only when the model decides to use it is the full body or a resource fetched — keeping context lean for rarely used domain knowledge.
```python
import asyncio

from agent_framework import Agent, Skill, SkillsProvider
from agent_framework.openai import OpenAIChatClient

pricing = Skill(
    name="tenant-pricing",
    description="Look up the per-SKU pricing matrix for the active tenant.",
    content=(
        "Use `read_skill_resource('tenant-pricing', 'matrix')` to fetch the current "
        "matrix. Quote prices in USD unless the user specifies otherwise."
    ),
)


@pricing.resource
async def matrix(**kwargs) -> str:
    """Pricing matrix for the active tenant.

    The agent invokes this with no arguments; we receive runtime context via
    **kwargs forwarded from agent.run(..., function_invocation_kwargs=...).
    """
    tenant_id = kwargs.get("tenant_id", "default")
    # Replace with a real DB / API call.
    return f"tenant={tenant_id}\npro: $99/mo\nteam: $299/mo\nenterprise: contact sales"


async def main() -> None:
    agent = Agent(
        client=OpenAIChatClient(),
        instructions="You answer pricing questions for the active tenant.",
        context_providers=[SkillsProvider(skills=[pricing])],
    )

    response = await agent.run(
        "What does the team plan cost for ACME?",
        function_invocation_kwargs={"tenant_id": "acme"},
    )
    print(response.text)


if __name__ == "__main__":
    asyncio.run(main())
```

`**kwargs` in the resource signature opts the function in to runtime data the model never sees (tenant id, request id, user id). Without `**kwargs` the framework calls the resource with no args. See the skills guide for file-based skills, executable scripts, and approval gates.
## Advanced

### Recipe 11 — MCP stdio + HTTP composition

Combine a local MCP server (filesystem) with a remote one (Microsoft Learn). Each tool's exposed functions are namespaced by its `name=`.
```python
import asyncio

from agent_framework import Agent, MCPStdioTool, MCPStreamableHTTPTool
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    async with (
        MCPStdioTool(
            name="fs",
            command="npx",
            args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
            description="Local filesystem under /tmp",
        ) as fs,
        MCPStreamableHTTPTool(
            name="learn",
            url="https://learn.microsoft.com/api/mcp",
            description="Microsoft Learn documentation search",
            request_timeout=30,
        ) as learn,
    ):
        agent = Agent(
            client=OpenAIChatClient(),
            instructions=(
                "You can read and write files in /tmp via `fs` and search Microsoft "
                "Learn via `learn`. Cite documentation URLs when relevant."
            ),
            tools=[fs, learn],
        )

        response = await agent.run(
            "List files in /tmp and find the latest azure-identity guidance."
        )
        print(response.text)


if __name__ == "__main__":
    asyncio.run(main())
```

`MCPStdioTool` spawns a subprocess; `async with` cleans it up deterministically. For long-running servers, manage `connect()` / `close()` explicitly so the same MCP session survives across many agent runs (see the MCP guide).
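A sketch of that explicit lifecycle — the `connect()` / `close()` names come from the note above; verify the exact signatures in the MCP guide:

```python
async def run_many_tasks(tasks: list[str]) -> None:
    # One subprocess, one MCP session, many agent runs.
    fs = MCPStdioTool(
        name="fs",
        command="npx",
        args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
    )
    await fs.connect()  # method names per the note above — assumption
    try:
        agent = Agent(client=OpenAIChatClient(), tools=[fs])
        for task in tasks:
            print((await agent.run(task)).text)
    finally:
        await fs.close()
```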
### Recipe 12 — Per-tenant MCP via header_provider

Forward auth tokens to a remote MCP server without building a fresh `httpx.AsyncClient` per tenant. The `header_provider` callable receives `function_invocation_kwargs` from the outer `agent.run(...)`.
```python
import asyncio

from agent_framework import Agent, MCPStreamableHTTPTool
from agent_framework.openai import OpenAIChatClient

# Per-tool approval policy: write ops require approval, reads don't.
# (`MCPSpecificApproval` is a TypedDict in agent_framework._mcp; the dict
# below is structurally typed so the import isn't required at runtime.)
billing_approval = {
    "always_require_approval": ["billing.refund", "billing.void"],
    "never_require_approval": ["billing.list_invoices", "billing.get_invoice"],
}


async def serve_tenant(tenant_token: str, query: str) -> str:
    mcp = MCPStreamableHTTPTool(
        name="billing",
        url="https://mcp.example.com",
        approval_mode=billing_approval,
        header_provider=lambda kwargs: {
            "Authorization": f"Bearer {kwargs['token']}",
            "X-Tenant-Id": kwargs.get("tenant_id", "default"),
        },
    )

    async with mcp:
        agent = Agent(client=OpenAIChatClient(), tools=mcp)
        response = await agent.run(
            query,
            function_invocation_kwargs={"token": tenant_token, "tenant_id": "acme"},
        )
        return response.text


asyncio.run(serve_tenant("tk_live_abc", "List my open invoices."))
```

`MCPSpecificApproval` is a typed dict — IDEs autocomplete the two valid keys. Tools listed in both lists require approval (the safe default).
### Recipe 13 — Workflow with checkpointing

Long-running multi-step pipelines benefit from `FileCheckpointStorage`: every superstep is durable, and a process restart can resume from the latest checkpoint.
```python
import asyncio

from agent_framework import Agent, FileCheckpointStorage, WorkflowBuilder
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    client = OpenAIChatClient()

    researcher = Agent(client=client, name="researcher", instructions="Produce 5 bullet research findings.")
    analyst = Agent(client=client, name="analyst", instructions="Identify the most important finding.")
    writer = Agent(client=client, name="writer", instructions="Turn the chosen finding into a 3-paragraph brief.")

    storage = FileCheckpointStorage(
        storage_path="./checkpoints",
        # Allow your domain types if you persist them in workflow state.
        # allowed_checkpoint_types=["my_app.models:ResearchState"],
    )

    workflow = (
        WorkflowBuilder(
            start_executor=researcher,
            checkpoint_storage=storage,
            name="research-brief",
            max_iterations=10,
        )
        .add_edge(researcher, analyst)
        .add_edge(analyst, writer)
        .build()
    )

    result = await workflow.run("Topic: post-quantum cryptography migration")
    print(result.get_outputs()[-1])

    # Inspect what was saved.
    latest = await storage.get_latest(workflow_name="research-brief")
    if latest is not None:
        print(f"latest checkpoint: id={latest.checkpoint_id} step={latest.iteration_count}")


if __name__ == "__main__":
    asyncio.run(main())
```

Checkpoints are keyed on the workflow definition, not the instance — any process running the same `WorkflowBuilder` output can resume. For Cosmos DB, install `agent-framework-azure-cosmos`; for Redis, `agent-framework-redis`.
### Recipe 14 — Function middleware: cache + retry

Stack two `FunctionMiddleware` instances. The cache short-circuits repeat calls; the retry handles transient failures.
```python
import asyncio
import json
from typing import Any

from agent_framework import (
    Agent,
    FunctionMiddleware,
    FunctionInvocationContext,
    MiddlewareTermination,
    tool,
)
from agent_framework.openai import OpenAIChatClient


class IdempotentCache(FunctionMiddleware):
    """Memoise calls to tools tagged kind='readonly'."""

    def __init__(self) -> None:
        self._cache: dict[str, Any] = {}

    @staticmethod
    def _key(name: str, arguments: dict[str, Any]) -> str:
        return f"{name}::{json.dumps(arguments, sort_keys=True, default=str)}"

    async def process(self, context: FunctionInvocationContext, call_next) -> None:
        if context.function.kind != "readonly":
            await call_next()
            return
        key = self._key(context.function.name, dict(context.arguments or {}))
        if key in self._cache:
            raise MiddlewareTermination("cache hit", result=self._cache[key])
        await call_next()
        if context.result is not None:
            self._cache[key] = context.result


class RetryOnError(FunctionMiddleware):
    """Exponential backoff for any exception raised by the wrapped tool."""

    def __init__(self, attempts: int = 3, backoff: float = 0.5) -> None:
        self.attempts = attempts
        self.backoff = backoff

    async def process(self, context: FunctionInvocationContext, call_next) -> None:
        last_exc: Exception | None = None
        for i in range(self.attempts):
            try:
                await call_next()
                return
            except Exception as exc:
                last_exc = exc
                await asyncio.sleep(self.backoff * (2 ** i))
        assert last_exc is not None
        raise last_exc


@tool(kind="readonly")
def lookup_sku(sku: str) -> str:
    """Return the canonical name for a SKU."""
    # Replace with a real DB / HTTP call.
    return f"SKU {sku}: Wireless Charger"


async def main() -> None:
    agent = Agent(
        client=OpenAIChatClient(),
        instructions="Answer using the lookup_sku tool when needed.",
        tools=[lookup_sku],
        middleware=[IdempotentCache(), RetryOnError()],
    )
    print((await agent.run("What is SKU WCH-42?")).text)
    print((await agent.run("And what is SKU WCH-42 again?")).text)  # cache hit


if __name__ == "__main__":
    asyncio.run(main())
```

Outer-to-inner: `IdempotentCache` wraps `RetryOnError` wraps the tool. The cache short-circuits via `MiddlewareTermination` so the retry layer doesn't even fire on a hit.
### Recipe 15 — Agent middleware: budgets and policy gates

`AgentMiddleware` wraps the entire `agent.run(...)` call. Use it for global concerns: budgets, profanity filters, system-prompt injection, run-level retries.
```python
import time

from agent_framework import (
    Agent,
    AgentContext,
    AgentMiddleware,
    AgentResponse,
    Content,
    Message,
    MiddlewareTermination,
)
from agent_framework.openai import OpenAIChatClient


class BudgetGuard(AgentMiddleware):
    """Reject runs once a tenant has burned through its quota."""

    def __init__(self, quotas: dict[str, int]) -> None:
        self.quotas = dict(quotas)

    async def process(self, context: AgentContext, call_next) -> None:
        tenant = context.kwargs.get("tenant_id", "default")
        if self.quotas.get(tenant, 0) <= 0:
            raise MiddlewareTermination(
                f"tenant={tenant} over quota",
                result=AgentResponse(
                    messages=[Message(role="assistant", contents=[Content.from_text("Quota exhausted.")])],
                ),
            )
        self.quotas[tenant] -= 1
        await call_next()


class LatencyTimer(AgentMiddleware):
    """Stamp wall-clock latency onto context.metadata so downstream code can read it."""

    async def process(self, context: AgentContext, call_next) -> None:
        t0 = time.monotonic()
        await call_next()
        context.metadata["wall_ms"] = (time.monotonic() - t0) * 1000


agent = Agent(
    client=OpenAIChatClient(),
    instructions="…",
    middleware=[
        LatencyTimer(),
        BudgetGuard({"acme": 100, "globex": 50}),
    ],
)
```

`MiddlewareTermination(result=...)` lets you return a fully-formed synthetic response without invoking the model. See the middleware guide for streaming hooks, `ChatMiddleware`, and observability spans.
### Recipe 16 — Expose an agent over MCP

`agent.as_mcp_server()` returns a low-level `mcp.server.lowlevel.Server` that hosts the agent as a single tool. Drop it into stdio for inter-process composition or hook it up to the streamable-HTTP host of your choice.
```python
import anyio
from mcp.server.stdio import stdio_server

from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    agent = Agent(
        client=OpenAIChatClient(),
        name="docs_agent",
        description="Answers questions about our internal documentation.",
        instructions="Only answer using the indexed docs.",
    )

    server = agent.as_mcp_server(
        server_name="docs-mcp",
        version="1.0.0",
        instructions="Use docs_agent for any question about internal docs.",
    )

    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, server.create_initialization_options())


anyio.run(main)
```

Other agents (or Claude Desktop, an LLM IDE, …) can now consume your agent via `MCPStdioTool(name="docs", command="python", args=["mcp_server.py"])`. Image and audio outputs are dropped with a warning — only text content is forwarded over the MCP server path.
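The consuming side, expanded from the exact call in the note above — `mcp_server.py` is assumed to be the file holding the Recipe 16 script:

```python
import asyncio

from agent_framework import Agent, MCPStdioTool
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    async with MCPStdioTool(
        name="docs",
        command="python",
        args=["mcp_server.py"],  # the Recipe 16 script, spawned as a subprocess
    ) as docs:
        consumer = Agent(
            client=OpenAIChatClient(),
            instructions="Delegate documentation questions to the `docs` tool.",
            tools=[docs],
        )
        print((await consumer.run("How do we rotate internal API keys?")).text)


if __name__ == "__main__":
    asyncio.run(main())
```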
## Integration

### Recipe 17 — FastAPI service (modern lifespan)

Wire an agent into a FastAPI app using the modern lifespan context manager (`@app.on_event("startup")` is deprecated in FastAPI 0.93+).
```python
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from agent_framework import Agent, AgentSession, FileHistoryProvider
from agent_framework.openai import OpenAIChatClient


class State:
    agent: Agent
    history: FileHistoryProvider


state = State()


@asynccontextmanager
async def lifespan(app: FastAPI):
    state.history = FileHistoryProvider(storage_path="./conversations")
    state.agent = Agent(
        client=OpenAIChatClient(),
        instructions="You are a helpful API assistant.",
        context_providers=[state.history],
    )
    yield
    # Add explicit cleanup here if you need it (close clients, flush metrics).


app = FastAPI(lifespan=lifespan)


class ChatRequest(BaseModel):
    message: str
    session_id: str


@app.post("/chat")
async def chat(payload: ChatRequest) -> dict[str, str]:
    if not payload.message.strip():
        raise HTTPException(status_code=400, detail="empty message")

    session = state.agent.create_session(session_id=payload.session_id)
    response = await state.agent.run(payload.message, session=session)
    return {"reply": response.text}


# Run with: uvicorn api:app --reload
```

Authorise the `session_id` against the authenticated user before passing it to `create_session()` — `FileHistoryProvider` only protects you from filesystem traversal, not authorisation.
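One hedged way to do that: derive the effective session id from the authenticated principal so a caller can never open someone else's thread. `get_current_user` is a hypothetical auth dependency — substitute your real JWT or session auth:

```python
from fastapi import Depends


async def get_current_user() -> str:
    """Hypothetical stand-in — replace with your real auth dependency."""
    return "user-123"


@app.post("/chat/secure")
async def chat_secure(
    payload: ChatRequest,
    user_id: str = Depends(get_current_user),
) -> dict[str, str]:
    # Namespace the session under the authenticated user: even if a client
    # replays someone else's session_id, it maps to a different file on disk.
    session = state.agent.create_session(session_id=f"{user_id}--{payload.session_id}")
    response = await state.agent.run(payload.message, session=session)
    return {"reply": response.text}
```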
### Recipe 18 — Azure Functions queue trigger

Reuse the agent across invocations (warm start) so Functions container reuse pays off.
```python
import logging

import azure.functions as func

from agent_framework import Agent
from agent_framework.foundry import FoundryChatClient

app = func.FunctionApp()

# Module-level construction → warm-start reuse across invocations.
_client = FoundryChatClient()
_agent = Agent(
    client=_client,
    instructions="You process queued tasks. Reply with the action plan only.",
)


@app.queue_trigger(arg_name="msg", queue_name="agent-tasks", connection="AzureWebJobsStorage")
async def process(msg: func.QueueMessage) -> None:
    body = msg.get_body().decode("utf-8")
    logging.info("received task: %s", body)

    response = await _agent.run(body)
    logging.info("plan: %s", response.text)
```

For long-running jobs that should survive a redeploy, persist intermediate state via `FileCheckpointStorage` (Recipe 13) on a mounted Azure Files share, or move to a queue-driven Workflow with Cosmos checkpointing.
### Recipe 19 — Server-Sent Events streaming endpoint

Stream the agent's tokens to a browser via SSE. Reuse the `ResponseStream` directly — no extra buffering needed.
```python
import json
from contextlib import asynccontextmanager
from typing import AsyncIterator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient


class State:
    agent: Agent


state = State()


@asynccontextmanager
async def lifespan(app: FastAPI):
    state.agent = Agent(client=OpenAIChatClient(), instructions="Be concise.")
    yield


app = FastAPI(lifespan=lifespan)


class ChatRequest(BaseModel):
    message: str


async def sse(stream) -> AsyncIterator[bytes]:
    async for update in stream:
        if update.text:
            payload = json.dumps({"text": update.text}).encode()
            yield b"data: " + payload + b"\n\n"
    final = await stream.get_final_response()
    yield b"data: " + json.dumps({"done": True, "finish_reasons": list(final.finish_reasons or [])}).encode() + b"\n\n"


@app.post("/chat/stream")
async def chat_stream(req: ChatRequest) -> StreamingResponse:
    stream = state.agent.run(req.message, stream=True)
    return StreamingResponse(sse(stream), media_type="text/event-stream")
```

`stream.get_final_response()` after iteration completes is the canonical place to access the joined response, finish reasons, and any HITL approval requests. For approval mid-stream, drop into `await stream.send_response(...)` before the loop returns.
## Troubleshooting

### Recipe 20 — Debug streaming with tool-call visibility

Inspect tool calls inline as the model emits them — useful when an agent picks the wrong tool or emits empty arguments.
```python
import asyncio

from agent_framework import Agent, tool
from agent_framework.openai import OpenAIChatClient


@tool
def get_time() -> str:
    """Return the current UTC time."""
    from datetime import datetime, timezone

    return datetime.now(timezone.utc).isoformat()


async def main() -> None:
    agent = Agent(
        client=OpenAIChatClient(),
        instructions="Use get_time when asked about time.",
        tools=[get_time],
    )

    stream = agent.run("What's the current UTC time? Then tell me a joke.", stream=True)
    async for update in stream:
        # Text chunks
        if update.text:
            print(f"\033[92m{update.text}\033[0m", end="", flush=True)

        # Tool calls and results live inside `update.contents` as Content items.
        for content in update.contents or []:
            ctype = getattr(content, "type", None)
            if ctype == "function_call":
                print(f"\n\033[90m[call {content.name}({content.arguments})]\033[0m")
            elif ctype == "function_result":
                print(f"\n\033[90m[result {content.call_id}: {content.result}]\033[0m")

    print()
    final = await stream.get_final_response()
    print(f"\nfinish_reasons={final.finish_reasons}")


if __name__ == "__main__":
    asyncio.run(main())
```

Each `AgentResponseUpdate` carries `contents` — a list of unified `Content` items. Filter by type to pick out function calls, function results, text, images, and HITL requests. There's no separate `tool_calls` attribute on the update.
### Recipe 21 — Replay a session blob across processes

Stateless workers (Lambda, Cloud Run) can round-trip the entire session through `to_dict()` / `from_dict()`. The blob carries every message plus any custom state you stored.
```python
import asyncio
import json

from agent_framework import Agent, AgentSession, InMemoryHistoryProvider
from agent_framework.openai import OpenAIChatClient


async def turn(blob: str | None, message: str) -> tuple[str, str]:
    agent = Agent(
        client=OpenAIChatClient(),
        instructions="You are a customer-service agent.",
        context_providers=[InMemoryHistoryProvider()],
    )
    session = AgentSession.from_dict(json.loads(blob)) if blob else AgentSession()
    response = await agent.run(message, session=session)
    return response.text, json.dumps(session.to_dict())


async def main() -> None:
    reply, blob = await turn(None, "My order #4421 is missing.")
    print(reply)

    # A different worker process picks up the same conversation:
    reply, blob = await turn(blob, "What did I just ask about?")
    print(reply)


if __name__ == "__main__":
    asyncio.run(main())
```

For custom types in `session.state`, register them once at module import via `register_state_type(...)` so a cold-started worker can deserialise the blob before its first turn. See the sessions guide for the full contract.
## Quick reference

| Need | Class / function |
|---|---|
| Build an agent | `Agent(client=, instructions=, tools=, context_providers=, middleware=)` |
| One-shot vs threaded | `agent.run(prompt)` vs `agent.run(prompt, session=session)` |
| Stream tokens | `agent.run(prompt, stream=True)` returns `ResponseStream` |
| Tool from a function | `@tool` from `agent_framework` |
| Sub-agent as tool | `agent.as_tool(propagate_session=True)` |
| Expose agent over MCP | `agent.as_mcp_server(server_name=..., version=...)` |
| Persistent sessions | `FileHistoryProvider(storage_path=...)` |
| Encrypted sessions | `FileHistoryProvider(dumps=..., loads=...)` |
| MCP local subprocess | `MCPStdioTool(name=, command=, args=)` |
| MCP remote HTTP | `MCPStreamableHTTPTool(name=, url=, header_provider=)` |
| Per-tool approval | `approval_mode="always_require"` or per-tool dict (`{"always_require_approval": [...], "never_require_approval": [...]}`) |
| Tool retries | `FunctionMiddleware` subclass, `await call_next()` in a loop |
| Run-level guards | `AgentMiddleware` subclass, raise `MiddlewareTermination(result=...)` |
| Workflow checkpoints | `FileCheckpointStorage(storage_path=...)` + `WorkflowBuilder(checkpoint_storage=)` |
| Skills | `Skill(...)` + `SkillsProvider(skills=[...])` |
| Eval gates | `LocalEvaluator(*checks)` + `evaluate_agent(...)` |
For deep dives see the framework’s other Python guides: tools, sessions, middleware, MCP, skills, evaluation, checkpointing, and orchestration.