Microsoft Agent Framework Python - Recipes and Code Patterns
Microsoft Agent Framework Python — Recipes and Code Patterns
Section titled “Microsoft Agent Framework Python — Recipes and Code Patterns”Practical, runnable patterns for the real agent_framework package. Every recipe targets the public surface of the latest release; nothing here relies on private modules.
- Package:
agent-framework(the umbrella distribution that pulls inagent-framework-coreplus every official provider). Imports root atagent_framework. - Pinned version:
agent-framework==1.6.0. Check the latest withpip index versions agent-framework. - Python: 3.10+ — the entire package uses
from __future__ import annotations, the|union syntax, and modern asyncio. - Verified APIs:
Agent,RawAgent,AgentSession,FileHistoryProvider,FileCheckpointStorage,MCPStdioTool,MCPStreamableHTTPTool,MCPWebsocketTool,AgentMiddleware,ChatMiddleware,FunctionMiddleware,SkillsProvider,InlineSkill,ClassSkill,FileSkillsSource,LocalEvaluator,WorkflowBuilder,CompactionProvider,ToolResultCompactionStrategy,SelectiveToolCallCompactionStrategy,TokenBudgetComposedStrategy,TodoProvider,TodoFileStore,MemoryContextProvider,MemoryFileStore.
pip install agent-framework# Or pin explicitly:pip install 'agent-framework==1.6.0'Beginner
Section titled “Beginner”Recipe 1 — Hello agent (single-turn, multi-turn)
Section titled “Recipe 1 — Hello agent (single-turn, multi-turn)”The minimal agent. The same Agent class handles both stateless one-shot calls and stateful conversations — the difference is whether you pass a session=.
import asynciofrom agent_framework import Agentfrom agent_framework.openai import OpenAIChatClient
async def main() -> None: agent = Agent( client=OpenAIChatClient(), # reads OPENAI_API_KEY instructions="You are a friendly assistant. Keep responses concise.", )
# Stateless — no history. one_shot = await agent.run("In one sentence: what is asyncio?") print(one_shot.text)
# Stateful — pass a session to keep the thread alive across turns. session = agent.create_session() await agent.run("My favourite colour is teal.", session=session) follow_up = await agent.run("What did I just tell you?", session=session) print(follow_up.text) # mentions teal
if __name__ == "__main__": asyncio.run(main())Swap OpenAIChatClient for AzureOpenAIChatClient, FoundryChatClient, AnthropicClient, OllamaChatClient, or BedrockChatClient to switch providers — every chat client implements the same SupportsChatGetResponse protocol so the rest of your code is identical.
Recipe 2 — Streaming responses
Section titled “Recipe 2 — Streaming responses”agent.run(stream=True) returns a ResponseStream[AgentResponseUpdate, AgentResponse]. There is no agent.run_stream(...).
import asynciofrom agent_framework import Agentfrom agent_framework.openai import OpenAIChatClient
async def main() -> None: agent = Agent(client=OpenAIChatClient(), instructions="You are a tutor.")
stream = agent.run("Explain backpressure in 3 paragraphs.", stream=True) async for update in stream: if update.text: print(update.text, end="", flush=True) print()
# The assembled final response — finish_reason, full text, tool calls. final = await stream.get_final_response() print(f"\nfinish_reason={final.finish_reasons}")
if __name__ == "__main__": asyncio.run(main())stream.get_final_response() waits until consumption finishes and returns the joined AgentResponse. For HITL approvals mid-stream use await stream.send_response(approval) instead of starting a new run.
Recipe 3 — A tool with typed arguments and error handling
Section titled “Recipe 3 — A tool with typed arguments and error handling”@tool builds a JSON schema from your signature. Raise on bad inputs — the exception message becomes the model’s tool-call result so it can recover.
import asynciofrom typing import Annotatedfrom agent_framework import Agent, toolfrom agent_framework.openai import OpenAIChatClient
@tool(description="Divide two numbers.")def divide( numerator: Annotated[float, "The dividend"], denominator: Annotated[float, "The divisor — must not be zero"],) -> str: if denominator == 0: raise ValueError("denominator must be non-zero") return f"{numerator / denominator:.4f}"
async def main() -> None: agent = Agent( client=OpenAIChatClient(), instructions="You are a math assistant. Use tools when arithmetic is needed.", tools=[divide], ) print((await agent.run("What is 22 over 7?")).text) print((await agent.run("What is 10 divided by 0?")).text)
if __name__ == "__main__": asyncio.run(main())The model sees the Annotated strings as parameter descriptions in the JSON schema. For richer constraints use pydantic.Field(description=..., ge=..., le=...) instead — the decorator picks up either form.
Recipe 4 — Structured output with Pydantic
Section titled “Recipe 4 — Structured output with Pydantic”Force a schema-conforming response. response.value is the validated Pydantic instance.
import asynciofrom pydantic import BaseModel, Fieldfrom agent_framework import Agent, ChatOptionsfrom agent_framework.openai import OpenAIChatClient
class Profile(BaseModel): name: str = Field(description="Full name") age: int = Field(ge=0, le=130) interests: list[str]
async def main() -> None: agent = Agent(client=OpenAIChatClient(), instructions="Extract structured data.")
response = await agent.run( "I'm Jane Doe, 28, and I love hiking and Python.", options=ChatOptions(response_format=Profile), ) profile: Profile = response.value print(profile.model_dump_json(indent=2))
if __name__ == "__main__": asyncio.run(main())Pass response_format= via ChatOptions (or set it as a default on the chat client). For streaming structured output, the same option works against agent.run(..., stream=True) — the framework lazily validates final.value on the final update.
Intermediate
Section titled “Intermediate”Recipe 5 — Persistent sessions on disk
Section titled “Recipe 5 — Persistent sessions on disk”Default sessions live in memory. FileHistoryProvider writes one JSONL file per session_id and survives restarts.
import asynciofrom agent_framework import Agent, FileHistoryProviderfrom agent_framework.openai import OpenAIChatClient
async def main() -> None: history = FileHistoryProvider( storage_path="./conversations", # directory created on demand skip_excluded=True, # honour CompactionProvider exclusions )
agent = Agent( client=OpenAIChatClient(), instructions="You are a customer-service agent.", context_providers=[history], )
# Stable id — same conversation across process restarts. session = agent.create_session(session_id="customer-9281") await agent.run("My order #4421 hasn't arrived.", session=session)
# Run later (different process) — history is reloaded from JSONL. follow_up = await agent.run("Any update on that?", session=session) print(follow_up.text)
if __name__ == "__main__": asyncio.run(main())FileHistoryProvider rejects session_ids that escape the storage root (../etc/passwd and Windows reserved stems are rewritten or rejected). For multi-host deployments use the Redis or Cosmos DB providers — the JSONL implementation only locks within a single process.
Recipe 6 — Encrypt session history at rest
Section titled “Recipe 6 — Encrypt session history at rest”Inject your own JSON serialisers via dumps= / loads= to add envelope encryption. The provider validates that the output is a single line — Fernet tokens already are.
import jsonimport osfrom cryptography.fernet import Fernetfrom agent_framework import Agent, FileHistoryProviderfrom agent_framework.openai import OpenAIChatClient
cipher = Fernet(os.environ["AGENT_HISTORY_FERNET_KEY"])
def encrypt_dumps(payload: dict) -> str: plaintext = json.dumps(payload, ensure_ascii=False).encode("utf-8") return cipher.encrypt(plaintext).decode("ascii")
def decrypt_loads(line: str | bytes) -> dict: if isinstance(line, str): line = line.encode("ascii") return json.loads(cipher.decrypt(line).decode("utf-8"))
history = FileHistoryProvider( storage_path="./encrypted-conversations", dumps=encrypt_dumps, loads=decrypt_loads,)
agent = Agent(client=OpenAIChatClient(), context_providers=[history])Test with decrypt_loads(encrypt_dumps(x)) == x for a representative payload before deploying — both callables must round-trip cleanly.
Recipe 7 — Sub-agents with as_tool
Section titled “Recipe 7 — Sub-agents with as_tool”Convert an agent into a tool that other agents can delegate to. propagate_session=True shares the parent’s session so the sub-agent sees the same conversation history.
import asynciofrom agent_framework import Agentfrom agent_framework.openai import OpenAIChatClient
async def main() -> None: client = OpenAIChatClient()
researcher = Agent( client=client, name="researcher", description="Performs deep research on technical topics.", instructions="Produce thorough, citation-backed research on the requested topic.", )
summariser = Agent( client=client, name="summariser", description="Condenses long passages into bullet points.", instructions="Reduce input to <= 5 bullet points.", )
coordinator = Agent( client=client, instructions=( "You are a project coordinator. Use the `researcher` tool to gather " "information, then the `summariser` tool to produce final bullets." ), tools=[ researcher.as_tool(), summariser.as_tool(propagate_session=True), # share the session ], )
response = await coordinator.run("Brief me on quantum-resistant cryptography.") print(response.text)
if __name__ == "__main__": asyncio.run(main())Three knobs on as_tool() worth knowing:
name=/description=override the auto-generated tool name and description.approval_mode="always_require"puts a HITL gate on the delegation itself — useful when a sub-agent has access to expensive or sensitive tools.stream_callback=surfaces the sub-agent’s streaming updates to your UI as it works.
Recipe 8 — Router pattern (classify + dispatch)
Section titled “Recipe 8 — Router pattern (classify + dispatch)”Pure-Python routing using a stateless classifier agent and per-domain specialists.
import asynciofrom agent_framework import Agentfrom agent_framework.openai import OpenAIChatClient
class SupportRouter: def __init__(self, client: OpenAIChatClient) -> None: self.router = Agent( client=client, instructions=( "Classify the user query as one of: BILLING, TECHNICAL, ACCOUNT. " "Reply with exactly that single word." ), ) self.specialists = { "BILLING": Agent(client=client, instructions="You are a billing specialist."), "TECHNICAL": Agent(client=client, instructions="You are a tech-support engineer."), "ACCOUNT": Agent(client=client, instructions="You are an account-management agent."), }
async def handle(self, query: str) -> str: route = (await self.router.run(query)).text.strip().upper() agent = self.specialists.get(route, self.specialists["TECHNICAL"]) return (await agent.run(query)).text
async def main() -> None: router = SupportRouter(OpenAIChatClient()) print(await router.handle("My invoice for April is wrong."))
if __name__ == "__main__": asyncio.run(main())For a graph-based variant with checkpointing, fan-in / fan-out edges, and event streaming, swap the dispatcher for WorkflowBuilder (Recipe 13).
Recipe 9 — Lightweight evaluation in CI
Section titled “Recipe 9 — Lightweight evaluation in CI”LocalEvaluator is dependency-free — every check is a Python function. Combine the built-in checks (keyword_check, tool_called_check, tool_calls_present, tool_call_args_match) with your own @evaluator callables.
import asynciofrom agent_framework import ( Agent, LocalEvaluator, evaluate_agent, keyword_check, tool, tool_called_check, evaluator, CheckResult,)from agent_framework.openai import OpenAIChatClient
@tooldef get_weather(location: str) -> str: """Return the current weather for a city.""" return f"{location}: 22°C, clear"
@evaluatordef mentions_celsius(response: str) -> CheckResult: return CheckResult( passed="°C" in response, reason="celsius unit present" if "°C" in response else "missing °C", check_name="mentions_celsius", )
async def main() -> None: agent = Agent( client=OpenAIChatClient(), instructions="You are a weather assistant; always cite °C.", tools=[get_weather], )
[results] = await evaluate_agent( agent=agent, queries=["What's the weather in Amsterdam?", "Is it warm in Cairo?"], evaluators=LocalEvaluator( keyword_check("weather"), tool_called_check("get_weather"), mentions_celsius, ), )
counts = results.result_counts print(f"passed={counts['passed']} failed={counts['failed']}") assert counts["failed"] == 0, results.error
if __name__ == "__main__": asyncio.run(main())For LLM-as-judge or weighted scoring patterns see the evaluation guide.
Recipe 10 — Skills for progressive-disclosure knowledge
Section titled “Recipe 10 — Skills for progressive-disclosure knowledge”A Skill is advertised by name + description. Only when the model decides to use it is the full body or a resource fetched — keeping context lean for rarely-used domain knowledge.
import asynciofrom agent_framework import Agent, Skill, SkillsProviderfrom agent_framework.openai import OpenAIChatClient
pricing = Skill( name="tenant-pricing", description="Look up the per-SKU pricing matrix for the active tenant.", content=( "Use `read_skill_resource('tenant-pricing', 'matrix')` to fetch the current " "matrix. Quote prices in USD unless the user specifies otherwise." ),)
@pricing.resourceasync def matrix(**kwargs) -> str: """Pricing matrix for the active tenant.
The agent invokes this with no arguments; we receive runtime context via **kwargs forwarded from agent.run(..., function_invocation_kwargs=...). """ tenant_id = kwargs.get("tenant_id", "default") # Replace with a real DB / API call. return f"tenant={tenant_id}\npro: $99/mo\nteam: $299/mo\nenterprise: contact sales"
async def main() -> None: agent = Agent( client=OpenAIChatClient(), instructions="You answer pricing questions for the active tenant.", context_providers=[SkillsProvider(skills=[pricing])], )
response = await agent.run( "What does the team plan cost for ACME?", function_invocation_kwargs={"tenant_id": "acme"}, ) print(response.text)
if __name__ == "__main__": asyncio.run(main())**kwargs in the resource signature opts the function in to runtime data the model never sees (tenant id, request id, user id). Without **kwargs the framework calls the resource with no args. See the skills guide for file-based skills, executable scripts, and approval gates.
Advanced
Section titled “Advanced”Recipe 11 — MCP stdio + HTTP composition
Section titled “Recipe 11 — MCP stdio + HTTP composition”Combine a local MCP server (filesystem) with a remote one (Microsoft Learn). Each tool’s exposed functions are namespaced by its name=.
import asynciofrom agent_framework import Agent, MCPStdioTool, MCPStreamableHTTPToolfrom agent_framework.openai import OpenAIChatClient
async def main() -> None: async with ( MCPStdioTool( name="fs", command="npx", args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"], description="Local filesystem under /tmp", ) as fs, MCPStreamableHTTPTool( name="learn", url="https://learn.microsoft.com/api/mcp", description="Microsoft Learn documentation search", request_timeout=30, ) as learn, ): agent = Agent( client=OpenAIChatClient(), instructions=( "You can read and write files in /tmp via `fs` and search Microsoft " "Learn via `learn`. Cite documentation URLs when relevant." ), tools=[fs, learn], )
response = await agent.run( "List files in /tmp and find the latest azure-identity guidance." ) print(response.text)
if __name__ == "__main__": asyncio.run(main())MCPStdioTool spawns a subprocess; async with cleans it up deterministically. For long-running servers, manage connect() / close() explicitly so the same MCP session survives across many agent runs (see the MCP guide).
Recipe 12 — Per-tenant MCP via header_provider
Section titled “Recipe 12 — Per-tenant MCP via header_provider”Forward auth tokens to a remote MCP server without building a fresh httpx.AsyncClient per tenant. The header_provider callable receives function_invocation_kwargs from the outer agent.run(...).
import asynciofrom agent_framework import Agent, MCPStreamableHTTPToolfrom agent_framework.openai import OpenAIChatClient
# Per-tool approval policy: write ops require approval, reads don't.# (`MCPSpecificApproval` is a TypedDict in agent_framework._mcp; the dict# below is structurally typed so the import isn't required at runtime.)billing_approval = { "always_require_approval": ["billing.refund", "billing.void"], "never_require_approval": ["billing.list_invoices", "billing.get_invoice"],}
async def serve_tenant(tenant_token: str, query: str) -> str: mcp = MCPStreamableHTTPTool( name="billing", url="https://mcp.example.com", approval_mode=billing_approval, header_provider=lambda kwargs: { "Authorization": f"Bearer {kwargs['token']}", "X-Tenant-Id": kwargs.get("tenant_id", "default"), }, )
async with mcp: agent = Agent(client=OpenAIChatClient(), tools=mcp) response = await agent.run( query, function_invocation_kwargs={"token": tenant_token, "tenant_id": "acme"}, ) return response.text
asyncio.run(serve_tenant("tk_live_abc", "List my open invoices."))MCPSpecificApproval is a typed dict — IDEs autocomplete the two valid keys. Tools listed in both lists require approval (the safe default).
Recipe 13 — Workflow with checkpointing
Section titled “Recipe 13 — Workflow with checkpointing”Long-running multi-step pipelines benefit from FileCheckpointStorage: every superstep is durable, and a process restart can resume from the latest checkpoint.
import asynciofrom agent_framework import Agent, FileCheckpointStorage, WorkflowBuilderfrom agent_framework.openai import OpenAIChatClient
async def main() -> None: client = OpenAIChatClient()
researcher = Agent(client=client, name="researcher", instructions="Produce 5 bullet research findings.") analyst = Agent(client=client, name="analyst", instructions="Identify the most important finding.") writer = Agent(client=client, name="writer", instructions="Turn the chosen finding into a 3-paragraph brief.")
storage = FileCheckpointStorage( storage_path="./checkpoints", # Allow your domain types if you persist them in workflow state. # allowed_checkpoint_types=["my_app.models:ResearchState"], )
workflow = ( WorkflowBuilder( start_executor=researcher, checkpoint_storage=storage, name="research-brief", max_iterations=10, ) .add_edge(researcher, analyst) .add_edge(analyst, writer) .build() )
result = await workflow.run("Topic: post-quantum cryptography migration") print(result.get_outputs()[-1])
# Inspect what was saved. latest = await storage.get_latest(workflow_name="research-brief") if latest is not None: print(f"latest checkpoint: id={latest.checkpoint_id} step={latest.iteration_count}")
if __name__ == "__main__": asyncio.run(main())Checkpoints are keyed on the workflow definition, not the instance — any process running the same WorkflowBuilder output can resume. For Cosmos DB, install agent-framework-azure-cosmos; for Redis, agent-framework-redis.
Recipe 14 — Function middleware: cache + retry
Section titled “Recipe 14 — Function middleware: cache + retry”Stack two FunctionMiddleware instances. The cache short-circuits repeat calls; the retry handles transient failures.
import asyncioimport jsonfrom typing import Anyfrom agent_framework import ( Agent, FunctionMiddleware, FunctionInvocationContext, MiddlewareTermination, tool,)from agent_framework.openai import OpenAIChatClient
class IdempotentCache(FunctionMiddleware): """Memoise calls to tools tagged kind='readonly'."""
def __init__(self) -> None: self._cache: dict[str, Any] = {}
@staticmethod def _key(name: str, arguments: dict[str, Any]) -> str: return f"{name}::{json.dumps(arguments, sort_keys=True, default=str)}"
async def process(self, context: FunctionInvocationContext, call_next) -> None: if context.function.kind != "readonly": await call_next() return key = self._key(context.function.name, dict(context.arguments or {})) if key in self._cache: raise MiddlewareTermination("cache hit", result=self._cache[key]) await call_next() if context.result is not None: self._cache[key] = context.result
class RetryOnError(FunctionMiddleware): """Exponential backoff for any exception raised by the wrapped tool."""
def __init__(self, attempts: int = 3, backoff: float = 0.5) -> None: self.attempts = attempts self.backoff = backoff
async def process(self, context: FunctionInvocationContext, call_next) -> None: last_exc: Exception | None = None for i in range(self.attempts): try: await call_next() return except Exception as exc: last_exc = exc await asyncio.sleep(self.backoff * (2 ** i)) assert last_exc is not None raise last_exc
@tool(kind="readonly")def lookup_sku(sku: str) -> str: """Return the canonical name for a SKU.""" # Replace with a real DB / HTTP call. return f"SKU {sku}: Wireless Charger"
async def main() -> None: agent = Agent( client=OpenAIChatClient(), instructions="Answer using the lookup_sku tool when needed.", tools=[lookup_sku], middleware=[IdempotentCache(), RetryOnError()], ) print((await agent.run("What is SKU WCH-42?")).text) print((await agent.run("And what is SKU WCH-42 again?")).text) # cache hit
if __name__ == "__main__": asyncio.run(main())Outer-to-inner: IdempotentCache wraps RetryOnError wraps the tool. The cache short-circuits via MiddlewareTermination so the retry layer doesn’t even fire on a hit.
Recipe 15 — Agent middleware: budgets and policy gates
Section titled “Recipe 15 — Agent middleware: budgets and policy gates”AgentMiddleware wraps the entire agent.run(...) call. Use it for global concerns: budgets, profanity filters, system-prompt injection, run-level retries.
import timefrom agent_framework import ( Agent, AgentContext, AgentMiddleware, AgentResponse, Content, Message, MiddlewareTermination,)from agent_framework.openai import OpenAIChatClient
class BudgetGuard(AgentMiddleware): """Reject runs once a tenant has burned through its quota."""
def __init__(self, quotas: dict[str, int]) -> None: self.quotas = dict(quotas)
async def process(self, context: AgentContext, call_next) -> None: tenant = context.kwargs.get("tenant_id", "default") if self.quotas.get(tenant, 0) <= 0: raise MiddlewareTermination( f"tenant={tenant} over quota", result=AgentResponse( messages=[Message(role="assistant", contents=[Content.from_text("Quota exhausted.")])], ), ) self.quotas[tenant] -= 1 await call_next()
class LatencyTimer(AgentMiddleware): """Stamp wall-clock latency onto context.metadata so downstream code can read it."""
async def process(self, context: AgentContext, call_next) -> None: t0 = time.monotonic() await call_next() context.metadata["wall_ms"] = (time.monotonic() - t0) * 1000
agent = Agent( client=OpenAIChatClient(), instructions="…", middleware=[ LatencyTimer(), BudgetGuard({"acme": 100, "globex": 50}), ],)MiddlewareTermination(result=...) lets you return a fully-formed synthetic response without invoking the model. See the middleware guide for streaming hooks, ChatMiddleware, and observability spans.
Recipe 16 — Expose an agent over MCP
Section titled “Recipe 16 — Expose an agent over MCP”agent.as_mcp_server() returns a low-level mcp.server.lowlevel.Server that hosts the agent as a single tool. Drop it into stdio for inter-process composition or hook it up to the streamable-HTTP host of your choice.
import anyiofrom mcp.server.stdio import stdio_serverfrom agent_framework import Agentfrom agent_framework.openai import OpenAIChatClient
async def main() -> None: agent = Agent( client=OpenAIChatClient(), name="docs_agent", description="Answers questions about our internal documentation.", instructions="Only answer using the indexed docs.", )
server = agent.as_mcp_server( server_name="docs-mcp", version="1.0.0", instructions="Use docs_agent for any question about internal docs.", )
async with stdio_server() as (read_stream, write_stream): await server.run(read_stream, write_stream, server.create_initialization_options())
anyio.run(main)Other agents (or Claude Desktop, an LLM IDE, …) can now consume your agent via MCPStdioTool(name="docs", command="python", args=["mcp_server.py"]). Image and audio outputs are dropped with a warning — only text content is forwarded over the MCP server path.
Integration
Section titled “Integration”Recipe 17 — FastAPI service (modern lifespan)
Section titled “Recipe 17 — FastAPI service (modern lifespan)”Wire an agent into a FastAPI app using the modern lifespan context manager (@app.on_event("startup") is deprecated in FastAPI 0.93+).
from contextlib import asynccontextmanagerfrom fastapi import FastAPI, HTTPExceptionfrom pydantic import BaseModel
from agent_framework import Agent, AgentSession, FileHistoryProviderfrom agent_framework.openai import OpenAIChatClient
class State: agent: Agent history: FileHistoryProvider
state = State()
@asynccontextmanagerasync def lifespan(app: FastAPI): state.history = FileHistoryProvider(storage_path="./conversations") state.agent = Agent( client=OpenAIChatClient(), instructions="You are a helpful API assistant.", context_providers=[state.history], ) yield # Add explicit cleanup here if you need it (close clients, flush metrics).
app = FastAPI(lifespan=lifespan)
class ChatRequest(BaseModel): message: str session_id: str
@app.post("/chat")async def chat(payload: ChatRequest) -> dict[str, str]: if not payload.message.strip(): raise HTTPException(status_code=400, detail="empty message")
session = state.agent.create_session(session_id=payload.session_id) response = await state.agent.run(payload.message, session=session) return {"reply": response.text}
# Run with: uvicorn api:app --reloadAuthorise the session_id against the authenticated user before passing it to create_session() — FileHistoryProvider only protects you from filesystem traversal, not authorisation.
Recipe 18 — Azure Functions queue trigger
Section titled “Recipe 18 — Azure Functions queue trigger”Reuse the agent across invocations (warm-start) so Functions container reuse pays off.
import loggingimport azure.functions as funcfrom agent_framework import Agentfrom agent_framework.foundry import FoundryChatClient
app = func.FunctionApp()
# Module-level construction → warm-start reuse across invocations._client = FoundryChatClient()_agent = Agent( client=_client, instructions="You process queued tasks. Reply with the action plan only.",)
@app.queue_trigger(arg_name="msg", queue_name="agent-tasks", connection="AzureWebJobsStorage")async def process(msg: func.QueueMessage) -> None: body = msg.get_body().decode("utf-8") logging.info("received task: %s", body)
response = await _agent.run(body) logging.info("plan: %s", response.text)For long-running jobs that should survive a redeploy, persist intermediate state via FileCheckpointStorage (Recipe 13) on a mounted Azure Files share, or move to a queue-driven Workflow with Cosmos checkpointing.
Recipe 19 — Server-Sent Events streaming endpoint
Section titled “Recipe 19 — Server-Sent Events streaming endpoint”Stream the agent’s tokens to a browser via SSE. Reuse the ResponseStream directly — no extra buffering needed.
import jsonfrom contextlib import asynccontextmanagerfrom typing import AsyncIterator
from fastapi import FastAPIfrom fastapi.responses import StreamingResponsefrom pydantic import BaseModel
from agent_framework import Agentfrom agent_framework.openai import OpenAIChatClient
class State: agent: Agent
state = State()
@asynccontextmanagerasync def lifespan(app: FastAPI): state.agent = Agent(client=OpenAIChatClient(), instructions="Be concise.") yield
app = FastAPI(lifespan=lifespan)
class ChatRequest(BaseModel): message: str
async def sse(stream) -> AsyncIterator[bytes]: async for update in stream: if update.text: payload = json.dumps({"text": update.text}).encode() yield b"data: " + payload + b"\n\n" final = await stream.get_final_response() yield b"data: " + json.dumps({"done": True, "finish_reasons": list(final.finish_reasons or [])}).encode() + b"\n\n"
@app.post("/chat/stream")async def chat_stream(req: ChatRequest) -> StreamingResponse: stream = state.agent.run(req.message, stream=True) return StreamingResponse(sse(stream), media_type="text/event-stream")stream.get_final_response() after iteration completes is the canonical place to access the joined response, finish reasons, and any HITL approval requests. For approval mid-stream, drop into await stream.send_response(...) before the loop returns.
Troubleshooting
Section titled “Troubleshooting”Recipe 20 — Debug streaming with tool-call visibility
Section titled “Recipe 20 — Debug streaming with tool-call visibility”Inspect tool calls inline as the model emits them — useful when an agent picks the wrong tool or emits empty arguments.
import asynciofrom agent_framework import Agent, toolfrom agent_framework.openai import OpenAIChatClient
@tooldef get_time() -> str: """Return the current UTC time.""" from datetime import datetime, timezone return datetime.now(timezone.utc).isoformat()
async def main() -> None: agent = Agent( client=OpenAIChatClient(), instructions="Use get_time when asked about time.", tools=[get_time], )
stream = agent.run("What's the current UTC time? Then tell me a joke.", stream=True) async for update in stream: # Text chunks if update.text: print(f"\033[92m{update.text}\033[0m", end="", flush=True)
# Tool calls and results live inside `update.contents` as Content items. for content in update.contents or []: ctype = getattr(content, "type", None) if ctype == "function_call": print(f"\n\033[90m[call {content.name}({content.arguments})]\033[0m") elif ctype == "function_result": print(f"\n\033[90m[result {content.call_id}: {content.result}]\033[0m")
print() final = await stream.get_final_response() print(f"\nfinish_reasons={final.finish_reasons}")
if __name__ == "__main__": asyncio.run(main())Each AgentResponseUpdate carries contents — a list of unified Content items. Filter by type to pick out function calls, function results, text, images, and HITL requests. There’s no separate tool_calls attribute on the update.
Recipe 21 — Replay a session blob across processes
Section titled “Recipe 21 — Replay a session blob across processes”Stateless workers (Lambda, Cloud Run) can round-trip the entire session through to_dict() / from_dict(). The blob carries every message plus any custom state you stored.
import asyncioimport jsonfrom agent_framework import Agent, AgentSession, InMemoryHistoryProviderfrom agent_framework.openai import OpenAIChatClient
async def turn(blob: str | None, message: str) -> tuple[str, str]: agent = Agent( client=OpenAIChatClient(), instructions="You are a customer-service agent.", context_providers=[InMemoryHistoryProvider()], ) session = AgentSession.from_dict(json.loads(blob)) if blob else AgentSession() response = await agent.run(message, session=session) return response.text, json.dumps(session.to_dict())
async def main() -> None: reply, blob = await turn(None, "My order #4421 is missing.") print(reply)
# A different worker process picks up the same conversation: reply, blob = await turn(blob, "What did I just ask about?") print(reply)
if __name__ == "__main__": asyncio.run(main())For custom types in session.state, register them once at module import via register_state_type(...) so cold-starts work even before the model is serialised. See the sessions guide for the full contract.
Recipe 22 — Agent todo list with TodoProvider (Experimental)
Section titled “Recipe 22 — Agent todo list with TodoProvider (Experimental)”Give the agent a persistent task list. The provider exposes add_todos, complete_todos, get_remaining_todos, and remove_todos tools and injects usage instructions into the system prompt automatically.
import asynciofrom agent_framework import Agent, TodoFileStore, TodoProviderfrom agent_framework.openai import OpenAIChatClient
async def main() -> None: # TodoFileStore persists todos under ./todos/<session_id>/todos.json store = TodoFileStore(base_path="./todos")
agent = Agent( client=OpenAIChatClient(), instructions=( "You are a project-management assistant. " "Break every task into trackable todo items." ), context_providers=[TodoProvider(store=store)], )
# Reuse the same session_id to accumulate todos across runs session = agent.create_session(session_id="sprint-42")
r = await agent.run( "Plan the three phases of our Q3 product launch: " "marketing, engineering, and customer-support prep.", session=session, ) print(r.text)
# Second turn — mark phase 1 complete after the team signs off r2 = await agent.run( "Engineering phase is done. Mark it complete.", session=session, ) print(r2.text)
# Inspect remaining todos from application code items, _ = await store.load_state(session, source_id="todo") pending = [i for i in items if not i.is_complete] print(f"\n{len(pending)} task(s) remaining:") for item in pending: print(f" - {item.title}")
if __name__ == "__main__": asyncio.run(main())Use TodoProvider() (no store=) for in-session todos that vanish at process end. Swap in TodoFileStore (or a custom TodoStore subclass backed by Cosmos DB / Redis) for durable, cross-session tracking.
Recipe 23 — Plan / execute mode with AgentModeProvider (Experimental)
Section titled “Recipe 23 — Plan / execute mode with AgentModeProvider (Experimental)”AgentModeProvider lets the agent switch between named operating modes. The default modes are plan (interactive, ask questions) and execute (autonomous, minimise interruptions). You can define any modes you need.
import asynciofrom agent_framework import Agent, AgentModeProvider, set_agent_modefrom agent_framework.openai import OpenAIChatClient
async def main() -> None: agent = Agent( client=OpenAIChatClient(), instructions="You are a senior software engineer.", context_providers=[ AgentModeProvider( default_mode="plan", mode_descriptions={ "plan": "Analyse the requirements and ask clarifying questions. Do not write code yet.", "execute": "Implement the approved plan autonomously. Do not ask questions mid-task.", "review": "Review the completed implementation for correctness and style.", }, ) ], )
session = agent.create_session(session_id="feature-auth-v2")
# Phase 1: planning — agent should ask questions, not write code r1 = await agent.run( "We need to add OAuth2 PKCE login to our FastAPI app.", session=session, ) print("=== PLAN PHASE ===") print(r1.text)
# Switch to execute mode programmatically after the human approves the plan set_agent_mode( session, "execute", available_modes=["plan", "execute", "review"], )
r2 = await agent.run( "Plan looks good. Go ahead and implement it.", session=session, ) print("\n=== EXECUTE PHASE ===") print(r2.text)
# Switch to review mode set_agent_mode(session, "review", available_modes=["plan", "execute", "review"])
r3 = await agent.run( "Review what you just built.", session=session, ) print("\n=== REVIEW PHASE ===") print(r3.text)
if __name__ == "__main__": asyncio.run(main())The agent can also switch modes itself by calling set_mode — useful when the agent decides it has enough information to start executing without waiting for the user. Combine with TodoProvider (Recipe 22) so the agent tracks its own tasks across the mode transitions.
Recipe 24 — Prompt injection defense with SecureAgentConfig (Experimental)
Section titled “Recipe 24 — Prompt injection defense with SecureAgentConfig (Experimental)”SecureAgentConfig is a ContextProvider that defends against prompt injection using information-flow control. It labels tool results as TRUSTED or UNTRUSTED, blocks untrusted content from reaching privileged tools, and optionally logs policy violations. Import it from agent_framework.security — a separate sub-module from the main namespace.
import asynciofrom agent_framework import Agent, toolfrom agent_framework.openai import OpenAIChatClientfrom agent_framework.security import SecureAgentConfig, IntegrityLabel, ConfidentialityLabel
@toolasync def fetch_news(query: str) -> str: """Fetch news headlines — untrusted external content.""" return f"[external] Top story about {query}: ..."
@toolasync def send_email(to: str, body: str) -> str: """Send an email — privileged, must not be reachable from untrusted data.""" return f"sent to {to}"
@toolasync def log_search(query: str) -> str: """Log the search query for auditing — allowed even in untrusted context.""" return f"logged: {query}"
# allow_untrusted_tools lets log_search run even when context is tainted.# approval_on_violation=True asks a human instead of hard-blocking on policy# violations (e.g. untrusted data attempting to call send_email).security = SecureAgentConfig( auto_hide_untrusted=True, # hide UNTRUSTED results from the model default_integrity=IntegrityLabel.UNTRUSTED, # all tool results default to untrusted default_confidentiality=ConfidentialityLabel.PUBLIC, allow_untrusted_tools={"log_search"}, # these tools may run in untrusted context block_on_violation=False, approval_on_violation=True, # route violation to human approval enable_audit_log=True, enable_policy_enforcement=True,)
# SecureAgentConfig is a ContextProvider — pass via context_providers=agent = Agent( client=OpenAIChatClient(), instructions=( "You are a research assistant. " "Do not send emails based on external news content." ), tools=[fetch_news, send_email, log_search], context_providers=[security],)
async def main() -> None: # A prompt injection attempt — external news content tries to trigger send_email. # The policy enforcer will either block or route the send_email call for approval. response = await agent.run( "Find news about AI and send a summary to boss@example.com." ) print(response.text)
if __name__ == "__main__": asyncio.run(main())Key points:
SecureAgentConfigautomatically injectsLabelTrackingFunctionMiddlewareand (whenenable_policy_enforcement=True)PolicyEnforcementFunctionMiddlewareas function middleware.- It also registers two built-in security tools:
quarantined_llm(runs a sub-prompt through an isolated model without privileged tool access) andinspect_variable(lets the agent examine labelled variables before acting). - This feature is
ExperimentalFeature.FIDES— the API is functional but may change between minor releases.
See the comprehensive guide’s SecureAgentConfig section for the full constructor reference table.
Recipe 25 — FunctionExecutor data pipeline
Section titled “Recipe 25 — FunctionExecutor data pipeline”Chain multiple @executor-decorated functions into a multi-step processing workflow using WorkflowBuilder. Sync functions run automatically in a thread pool via asyncio.to_thread.
import asynciofrom agent_framework import FunctionExecutor, WorkflowBuilder, WorkflowViz, executor, WorkflowContext
# Step 1 — normalise whitespace and capitalise@executor(id="normalise")async def normalise(text: str) -> None: return " ".join(text.split()).capitalize()
# Step 2 — split into sentences and emit each one downstream@executor(id="splitter")async def splitter(text: str, ctx: WorkflowContext[str]) -> None: for sentence in text.split("."): stripped = sentence.strip() if stripped: await ctx.send_message(stripped)
# Step 3 — count words per sentence (sync function runs in asyncio.to_thread automatically)def word_count(sentence: str) -> str: count = len(sentence.split()) return f"{count} word(s): {sentence}"
counter = FunctionExecutor(word_count, id="word_counter")
# Wire the three steps into a linear pipelineworkflow = ( WorkflowBuilder(start_executor=normalise, name="text-pipeline") .add_edge(normalise, splitter) .add_edge(splitter, counter) .build())
async def main() -> None: # Visualise the graph before running viz = WorkflowViz(workflow) print("--- Mermaid diagram ---") print(viz.to_mermaid()) print()
result = await workflow.run( "hello world. this is a test. agent framework is great." )
print("--- Outputs ---") for output in result.get_outputs(): print(output)
if __name__ == "__main__": asyncio.run(main())Expected output:
--- Mermaid diagram ---graph LR normalise --> splitter splitter --> word_counter
--- Outputs ---3 word(s): Hello world5 word(s): This is a test4 word(s): Agent framework is greatTip: Use
WorkflowViz(workflow).export(format="svg", filename="pipeline.svg")(requirespip install graphvizand thedotbinary) to produce a shareable diagram for documentation.
Recipe 26 — CompactionProvider + ToolResultCompactionStrategy + TokenBudgetComposedStrategy
Section titled “Recipe 26 — CompactionProvider + ToolResultCompactionStrategy + TokenBudgetComposedStrategy”Long-running agents accumulate tool chatter fast. This recipe uses the CompactionProvider context provider to apply a two-phase compaction pipeline:
- before: trim what the model sees each turn using a token-budget strategy that collapses old tool-call groups first, then shrinks the sliding window.
- after: compact the stored history so the next session starts leaner.
import asynciofrom agent_framework import ( Agent, CharacterEstimatorTokenizer, CompactionProvider, InMemoryHistoryProvider, SlidingWindowStrategy, ToolResultCompactionStrategy, TokenBudgetComposedStrategy,)from agent_framework.openai import OpenAIChatClient
async def main() -> None: client = OpenAIChatClient() tokenizer = CharacterEstimatorTokenizer()
history = InMemoryHistoryProvider()
# before_strategy: try collapsing old tool calls first; fall back to the # sliding window only if the token budget is still exceeded. before_strategy = TokenBudgetComposedStrategy( token_budget=6_000, tokenizer=tokenizer, strategies=[ ToolResultCompactionStrategy(keep_last_tool_call_groups=3), SlidingWindowStrategy(keep_last_groups=20), ], early_stop=True, )
# after_strategy: after each reply, collapse all but 1 tool-call group so # the persisted history stays compact for future sessions. after_strategy = ToolResultCompactionStrategy(keep_last_tool_call_groups=1)
compaction = CompactionProvider( before_strategy=before_strategy, after_strategy=after_strategy, tokenizer=tokenizer, history_source_id=history.source_id, # "in_memory" by default )
agent = Agent( client=client, instructions="You are a research agent that calls web search many times.", context_providers=[history, compaction], # history must come before compaction )
session = agent.create_session() for i in range(20): response = await agent.run( f"Turn {i}: search for Python async patterns and summarise.", session=session, ) print(f"[{i}] {response.text[:80]}...")
if __name__ == "__main__": asyncio.run(main())Key points:
CompactionProvidermust be listed after the history provider so it can access the stored messages insession.state.history_source_iddefaults to"in_memory"— change it if yourInMemoryHistoryProviderhas a customsource_id.TokenBudgetComposedStrategyruns its sub-strategies in order, stopping as soon astoken_budgetis satisfied. Place cheap strategies (no extra LLM calls) first.ToolResultCompactionStrategypreserves readability by replacing dropped tool-call groups with a one-line summary like[Tool results: web_search: 3 results].
Recipe 27 — Long-term memory with MemoryContextProvider (Experimental)
Section titled “Recipe 27 — Long-term memory with MemoryContextProvider (Experimental)”MemoryContextProvider gives an agent cross-session memory. After every turn it automatically extracts facts from the conversation into per-topic Markdown files, maintains a MEMORY.md index, and archives transcripts for semantic search. On subsequent turns the index is injected into context so the agent recalls earlier sessions without exceeding the context window.
import asynciofrom agent_framework import Agent, MemoryContextProvider, MemoryFileStorefrom agent_framework.openai import OpenAIChatClient
async def main() -> None: client = OpenAIChatClient()
# MemoryFileStore creates: # ./memory/<source_id>/<owner_id>/memory/ # MEMORY.md ← topic index injected every turn # topics/*.md ← one file per extracted topic # transcripts/*.jsonl ← full turn history store = MemoryFileStore( base_path="./memory", owner_state_key="user_id", # resolved from session.state["user_id"] )
provider = MemoryContextProvider( store=store, recent_turns=3, # also inject 3 most-recent transcript turns each call max_extractions=5, # extract up to 5 facts per turn into topic files )
agent = Agent( client=client, instructions=( "You are a personal assistant with long-term memory. " "Proactively recall relevant facts about the user from MEMORY.md." ), context_providers=[provider], )
# --- Session 1 --- session1 = agent.create_session(session_id="alice-jan") session1.state["user_id"] = "alice" # routes memory to ./memory/.../alice/
await agent.run("I'm Alice. I prefer Python for backends and React for frontends.", session=session1) await agent.run("My current project is a FastAPI service with a React dashboard.", session=session1)
print("Session 1 complete — facts extracted to disk.")
# --- Session 2 (simulates a new day / process restart) --- session2 = agent.create_session(session_id="alice-feb") session2.state["user_id"] = "alice" # same user_id → same memory root
r = await agent.run( "What stack should I use for a new backend microservice?", session=session2, ) # Agent recalls Alice's Python preference stored in the previous session. print(r.text)
# Inspect extracted topics from application code (list_topics is synchronous) topics = store.list_topics(session2, source_id="memory") print(f"\nExtracted {len(topics)} memory topic(s):") for t in topics: print(f" • {t.topic}: {t.summary}")
if __name__ == "__main__": asyncio.run(main())Key parameters:
| Parameter | Default | Purpose |
|---|---|---|
store | required | MemoryFileStore or custom MemoryStore subclass |
recent_turns | 0 | Inject N most-recent transcript turns alongside MEMORY.md |
max_extractions | 5 | Maximum facts extracted from a single turn |
selection_limit | 10 | Maximum topic files auto-loaded per turn |
consolidation_interval | 7 days | How often topic files are consolidated |
consolidation_client | None (uses agent client) | Cheaper model for the consolidation pass |
MemoryFileStore constructor reference:
MemoryFileStore( base_path="./memory", # root directory owner_state_key="user_id", # session.state key that holds the user/owner ID kind="memory", # bucket name (subdirectory) owner_prefix="", # optional prefix for owner directory names)The store enforces path isolation — owner IDs with .. or absolute path characters raise ValueError. Use store.list_topics(session, source_id=...), store.get_topic(...), and store.write_topic(...) from application code to read and seed memory without running the agent.
Quick reference
Section titled “Quick reference”| Need | Class / function |
|---|---|
| Build an agent | Agent(client=, instructions=, tools=, context_providers=, middleware=) |
| One-shot vs threaded | agent.run(prompt) vs agent.run(prompt, session=session) |
| Stream tokens | agent.run(prompt, stream=True) returns ResponseStream |
| Tool from a function | @tool from agent_framework |
| Sub-agent as tool | agent.as_tool(propagate_session=True) |
| Expose agent over MCP | agent.as_mcp_server(server_name=..., version=...) |
| Persistent sessions | FileHistoryProvider(storage_path=...) |
| Encrypted sessions | FileHistoryProvider(dumps=..., loads=...) |
| MCP local subprocess | MCPStdioTool(name=, command=, args=) |
| MCP remote HTTP | MCPStreamableHTTPTool(name=, url=, header_provider=) |
| Per-tool approval | approval_mode="always_require" or per-tool dict ({"always_require_approval": [...], "never_require_approval": [...]}) |
| Tool retries | FunctionMiddleware subclass, await call_next() in a loop |
| Run-level guards | AgentMiddleware subclass, raise MiddlewareTermination(result=...) |
| Workflow checkpoints | FileCheckpointStorage(storage_path=...) + WorkflowBuilder(checkpoint_storage=) |
| Skills | SkillsProvider(my_skill) or SkillsProvider([skill_a, skill_b]) — takes a positional source (single Skill, list of Skill, or a SkillsSource) |
| Eval gates | LocalEvaluator(*checks) + evaluate_agent(...) |
| Agent todo list (exp.) | TodoProvider(store=TodoFileStore(...)) |
| Cross-session memory (exp.) | MemoryContextProvider(store=MemoryFileStore(..., owner_state_key=...)) |
| Compact context via provider | CompactionProvider(before_strategy=..., after_strategy=..., history_source_id=...) |
| Drop old tool-call groups | SelectiveToolCallCompactionStrategy(keep_last_tool_call_groups=N) |
| Summarise old tool results | ToolResultCompactionStrategy(keep_last_tool_call_groups=N) |
| Token-budget composition | TokenBudgetComposedStrategy(token_budget=N, tokenizer=..., strategies=[...]) |
| Plan / execute modes (exp.) | AgentModeProvider(mode_descriptions={...}) |
| Prompt injection defense (exp.) | SecureAgentConfig(...) from agent_framework.security |
| Function-based workflow nodes | @executor / FunctionExecutor(func, id=) |
| Visualise a workflow | WorkflowViz(workflow).to_mermaid() / .to_digraph() / .export(format="svg") |
| In-memory skill source | InMemorySkillsSource([skill_a, skill_b]) |
| Custom composable skill source | Subclass DelegatingSkillsSource(inner_source) |
| WebSocket MCP server | MCPWebsocketTool(name=, url="wss://...", request_timeout=) |
| Intercept agent.run() | Subclass AgentMiddleware, override async process(context, call_next) |
| Intercept LLM calls | Subclass ChatMiddleware, override async process(context, call_next) |
| Intercept tool calls | Subclass FunctionMiddleware, override async process(context, call_next) |
| Early-exit from middleware | raise MiddlewareTermination(result=AgentResponse(...)) |
For deep dives see the framework’s other Python guides: tools, sessions, middleware, MCP, skills, evaluation, checkpointing, and orchestration.