Microsoft Agent Framework Python — Recipes and Code Patterns


Practical, runnable patterns for the real agent_framework package. Every recipe targets the public surface of the latest release; nothing here relies on private modules.

  • Package: agent-framework (the umbrella distribution that pulls in agent-framework-core plus every official provider). Imports root at agent_framework.
  • Pinned version: agent-framework==1.2.2. Check the latest with pip index versions agent-framework.
  • Python: 3.10+ — the entire package uses from __future__ import annotations, the | union syntax, and modern asyncio.
  • Verified APIs: Agent, AgentSession, FileHistoryProvider, FileCheckpointStorage, MCPStdioTool, MCPStreamableHTTPTool, AgentMiddleware, FunctionMiddleware, SkillsProvider, Skill, LocalEvaluator, WorkflowBuilder.
Terminal window
pip install agent-framework
# Or pin explicitly:
pip install 'agent-framework==1.2.2'

Recipe 1 — Hello agent (single-turn, multi-turn)


The minimal agent. The same Agent class handles both stateless one-shot calls and stateful conversations — the difference is whether you pass a session=.

hello_agent.py
import asyncio

from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    agent = Agent(
        client=OpenAIChatClient(),  # reads OPENAI_API_KEY
        instructions="You are a friendly assistant. Keep responses concise.",
    )

    # Stateless — no history.
    one_shot = await agent.run("In one sentence: what is asyncio?")
    print(one_shot.text)

    # Stateful — pass a session to keep the thread alive across turns.
    session = agent.create_session()
    await agent.run("My favourite colour is teal.", session=session)
    follow_up = await agent.run("What did I just tell you?", session=session)
    print(follow_up.text)  # mentions teal


if __name__ == "__main__":
    asyncio.run(main())

Swap OpenAIChatClient for AzureOpenAIChatClient, FoundryChatClient, AnthropicClient, OllamaChatClient, or BedrockChatClient to switch providers — every chat client implements the same SupportsChatGetResponse protocol so the rest of your code is identical.
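A provider swap is a one-line change at construction time. The sketch below assumes the Azure client is importable from agent_framework.azure and reads its endpoint and deployment from environment variables; adjust both assumptions to your install.

from agent_framework import Agent
from agent_framework.azure import AzureOpenAIChatClient  # assumed import path

agent = Agent(
    client=AzureOpenAIChatClient(),  # endpoint / deployment read from env (assumption)
    instructions="You are a friendly assistant. Keep responses concise.",
)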

Recipe 2 — Streaming responses

agent.run(stream=True) returns a ResponseStream[AgentResponseUpdate, AgentResponse]. There is no agent.run_stream(...).

stream_agent.py
import asyncio

from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    agent = Agent(client=OpenAIChatClient(), instructions="You are a tutor.")

    stream = agent.run("Explain backpressure in 3 paragraphs.", stream=True)
    async for update in stream:
        if update.text:
            print(update.text, end="", flush=True)
    print()

    # The assembled final response — finish_reason, full text, tool calls.
    final = await stream.get_final_response()
    print(f"\nfinish_reason={final.finish_reasons}")


if __name__ == "__main__":
    asyncio.run(main())

stream.get_final_response() waits until consumption finishes and returns the joined AgentResponse. For HITL approvals mid-stream use await stream.send_response(approval) instead of starting a new run.

Recipe 3 — A tool with typed arguments and error handling


@tool builds a JSON schema from your signature. Raise on bad inputs — the exception message becomes the model’s tool-call result so it can recover.

typed_tool.py
import asyncio
from typing import Annotated

from agent_framework import Agent, tool
from agent_framework.openai import OpenAIChatClient


@tool(description="Divide two numbers.")
def divide(
    numerator: Annotated[float, "The dividend"],
    denominator: Annotated[float, "The divisor — must not be zero"],
) -> str:
    if denominator == 0:
        raise ValueError("denominator must be non-zero")
    return f"{numerator / denominator:.4f}"


async def main() -> None:
    agent = Agent(
        client=OpenAIChatClient(),
        instructions="You are a math assistant. Use tools when arithmetic is needed.",
        tools=[divide],
    )
    print((await agent.run("What is 22 over 7?")).text)
    print((await agent.run("What is 10 divided by 0?")).text)


if __name__ == "__main__":
    asyncio.run(main())

The model sees the Annotated strings as parameter descriptions in the JSON schema. For richer constraints use pydantic.Field(description=..., ge=..., le=...) instead — the decorator picks up either form.
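For illustration, a sketch of a numeric tool using pydantic.Field inside Annotated. How strictly ge= / gt= end up enforced in the generated schema depends on the decorator, so treat the constraints as documentation plus validation rather than a guarantee.

from typing import Annotated

from pydantic import Field

from agent_framework import tool


@tool(description="Compute what percentage `part` is of `whole`.")
def percentage(
    part: Annotated[float, Field(description="The part value", ge=0)],
    whole: Annotated[float, Field(description="The whole value", gt=0)],
) -> str:
    return f"{part / whole * 100:.1f}%"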

Recipe 4 — Structured output with Pydantic


Force a schema-conforming response. response.value is the validated Pydantic instance.

structured.py
import asyncio

from pydantic import BaseModel, Field

from agent_framework import Agent, ChatOptions
from agent_framework.openai import OpenAIChatClient


class Profile(BaseModel):
    name: str = Field(description="Full name")
    age: int = Field(ge=0, le=130)
    interests: list[str]


async def main() -> None:
    agent = Agent(client=OpenAIChatClient(), instructions="Extract structured data.")
    response = await agent.run(
        "I'm Jane Doe, 28, and I love hiking and Python.",
        options=ChatOptions(response_format=Profile),
    )
    profile: Profile = response.value
    print(profile.model_dump_json(indent=2))


if __name__ == "__main__":
    asyncio.run(main())

Pass response_format= via ChatOptions (or set it as a default on the chat client). For streaming structured output, the same option works against agent.run(..., stream=True) — the framework lazily validates final.value on the final update.
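A minimal streaming sketch of the same extraction, assuming (as described above) that the validated final.value only becomes available once the stream has been fully consumed. Run it inside an async function with agent and Profile defined as in structured.py.

# Inside an async function, with `agent` and `Profile` defined as above.
stream = agent.run(
    "I'm Jane Doe, 28, and I love hiking and Python.",
    options=ChatOptions(response_format=Profile),
    stream=True,
)
async for update in stream:
    if update.text:
        print(update.text, end="", flush=True)

final = await stream.get_final_response()
profile: Profile = final.value  # validated Profile instance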


Recipe 5 — Persistent sessions with FileHistoryProvider

Default sessions live in memory. FileHistoryProvider writes one JSONL file per session_id and survives restarts.

persistent_sessions.py
import asyncio

from agent_framework import Agent, FileHistoryProvider
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    history = FileHistoryProvider(
        storage_path="./conversations",  # directory created on demand
        skip_excluded=True,              # honour CompactionProvider exclusions
    )
    agent = Agent(
        client=OpenAIChatClient(),
        instructions="You are a customer-service agent.",
        context_providers=[history],
    )

    # Stable id — same conversation across process restarts.
    session = agent.create_session(session_id="customer-9281")
    await agent.run("My order #4421 hasn't arrived.", session=session)

    # Run later (different process) — history is reloaded from JSONL.
    follow_up = await agent.run("Any update on that?", session=session)
    print(follow_up.text)


if __name__ == "__main__":
    asyncio.run(main())

FileHistoryProvider sanitises session_ids so they cannot escape the storage root: traversal attempts like ../etc/passwd and Windows reserved stems are rewritten or rejected. For multi-host deployments use the Redis or Cosmos DB providers — the JSONL implementation only locks within a single process.

Recipe 6 — Encrypt session history at rest


Inject your own JSON serialisers via dumps= / loads= to add envelope encryption. The provider validates that the output is a single line — Fernet tokens already are.

encrypted_sessions.py
import json
import os

from cryptography.fernet import Fernet

from agent_framework import Agent, FileHistoryProvider
from agent_framework.openai import OpenAIChatClient

cipher = Fernet(os.environ["AGENT_HISTORY_FERNET_KEY"])


def encrypt_dumps(payload: dict) -> str:
    plaintext = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    return cipher.encrypt(plaintext).decode("ascii")


def decrypt_loads(line: str | bytes) -> dict:
    if isinstance(line, str):
        line = line.encode("ascii")
    return json.loads(cipher.decrypt(line).decode("utf-8"))


history = FileHistoryProvider(
    storage_path="./encrypted-conversations",
    dumps=encrypt_dumps,
    loads=decrypt_loads,
)

agent = Agent(client=OpenAIChatClient(), context_providers=[history])

Test with decrypt_loads(encrypt_dumps(x)) == x for a representative payload before deploying — both callables must round-trip cleanly.
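A minimal self-test sketch; the payload shape here is illustrative, not the provider's actual record format.

sample = {"role": "user", "text": "héllo\nworld"}   # illustrative payload
encoded = encrypt_dumps(sample)
assert "\n" not in encoded                          # provider requires single-line records
assert decrypt_loads(encoded) == sample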

Recipe 7 — Sub-agents as tools (delegation)

Convert an agent into a tool that other agents can delegate to. propagate_session=True shares the parent’s session so the sub-agent sees the same conversation history.

delegating_agents.py
import asyncio

from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    client = OpenAIChatClient()

    researcher = Agent(
        client=client,
        name="researcher",
        description="Performs deep research on technical topics.",
        instructions="Produce thorough, citation-backed research on the requested topic.",
    )
    summariser = Agent(
        client=client,
        name="summariser",
        description="Condenses long passages into bullet points.",
        instructions="Reduce input to <= 5 bullet points.",
    )
    coordinator = Agent(
        client=client,
        instructions=(
            "You are a project coordinator. Use the `researcher` tool to gather "
            "information, then the `summariser` tool to produce final bullets."
        ),
        tools=[
            researcher.as_tool(),
            summariser.as_tool(propagate_session=True),  # share the session
        ],
    )

    response = await coordinator.run("Brief me on quantum-resistant cryptography.")
    print(response.text)


if __name__ == "__main__":
    asyncio.run(main())

Three knobs on as_tool() worth knowing (a combined sketch follows this list):

  • name= / description= override the auto-generated tool name and description.
  • approval_mode="always_require" puts a HITL gate on the delegation itself — useful when a sub-agent has access to expensive or sensitive tools.
  • stream_callback= surfaces the sub-agent’s streaming updates to your UI as it works.
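A sketch combining all three knobs. The exact update type passed to stream_callback isn’t specified in these recipes, so the handler below relies only on a text attribute.

def on_update(update) -> None:
    if getattr(update, "text", None):
        print(f"[researcher] {update.text}", end="", flush=True)


research_tool = researcher.as_tool(
    name="deep_research",            # override the auto-generated tool name
    description="Run a thorough literature scan on a topic.",
    approval_mode="always_require",  # HITL gate on every delegation
    stream_callback=on_update,       # surface sub-agent progress as it streams
)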

Recipe 8 — Router pattern (classify + dispatch)


Pure-Python routing using a stateless classifier agent and per-domain specialists.

router_agent.py
import asyncio

from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient


class SupportRouter:
    def __init__(self, client: OpenAIChatClient) -> None:
        self.router = Agent(
            client=client,
            instructions=(
                "Classify the user query as one of: BILLING, TECHNICAL, ACCOUNT. "
                "Reply with exactly that single word."
            ),
        )
        self.specialists = {
            "BILLING": Agent(client=client, instructions="You are a billing specialist."),
            "TECHNICAL": Agent(client=client, instructions="You are a tech-support engineer."),
            "ACCOUNT": Agent(client=client, instructions="You are an account-management agent."),
        }

    async def handle(self, query: str) -> str:
        route = (await self.router.run(query)).text.strip().upper()
        agent = self.specialists.get(route, self.specialists["TECHNICAL"])
        return (await agent.run(query)).text


async def main() -> None:
    router = SupportRouter(OpenAIChatClient())
    print(await router.handle("My invoice for April is wrong."))


if __name__ == "__main__":
    asyncio.run(main())

For a graph-based variant with checkpointing, fan-in / fan-out edges, and event streaming, swap the dispatcher for WorkflowBuilder (Recipe 13).

Recipe 9 — Evaluation smoke tests with LocalEvaluator

LocalEvaluator is dependency-free — every check is a Python function. Combine the built-in checks (keyword_check, tool_called_check, tool_calls_present, tool_call_args_match) with your own @evaluator callables.

eval_smoke.py
import asyncio

from agent_framework import (
    Agent,
    CheckResult,
    LocalEvaluator,
    evaluate_agent,
    evaluator,
    keyword_check,
    tool,
    tool_called_check,
)
from agent_framework.openai import OpenAIChatClient


@tool
def get_weather(location: str) -> str:
    """Return the current weather for a city."""
    return f"{location}: 22°C, clear"


@evaluator
def mentions_celsius(response: str) -> CheckResult:
    return CheckResult(
        passed="°C" in response,
        reason="celsius unit present" if "°C" in response else "missing °C",
        check_name="mentions_celsius",
    )


async def main() -> None:
    agent = Agent(
        client=OpenAIChatClient(),
        instructions="You are a weather assistant; always cite °C.",
        tools=[get_weather],
    )
    [results] = await evaluate_agent(
        agent=agent,
        queries=["What's the weather in Amsterdam?", "Is it warm in Cairo?"],
        evaluators=LocalEvaluator(
            keyword_check("weather"),
            tool_called_check("get_weather"),
            mentions_celsius,
        ),
    )
    counts = results.result_counts
    print(f"passed={counts['passed']} failed={counts['failed']}")
    assert counts["failed"] == 0, results.error


if __name__ == "__main__":
    asyncio.run(main())

For LLM-as-judge or weighted scoring patterns see the evaluation guide.

Recipe 10 — Skills for progressive-disclosure knowledge


A Skill is advertised by name + description. Only when the model decides to use it is the full body or a resource fetched — keeping context lean for rarely-used domain knowledge.

pricing_skill.py
import asyncio

from agent_framework import Agent, Skill, SkillsProvider
from agent_framework.openai import OpenAIChatClient

pricing = Skill(
    name="tenant-pricing",
    description="Look up the per-SKU pricing matrix for the active tenant.",
    content=(
        "Use `read_skill_resource('tenant-pricing', 'matrix')` to fetch the current "
        "matrix. Quote prices in USD unless the user specifies otherwise."
    ),
)


@pricing.resource
async def matrix(**kwargs) -> str:
    """Pricing matrix for the active tenant.

    The agent invokes this with no arguments; we receive runtime context via
    **kwargs forwarded from agent.run(..., function_invocation_kwargs=...).
    """
    tenant_id = kwargs.get("tenant_id", "default")
    # Replace with a real DB / API call.
    return f"tenant={tenant_id}\npro: $99/mo\nteam: $299/mo\nenterprise: contact sales"


async def main() -> None:
    agent = Agent(
        client=OpenAIChatClient(),
        instructions="You answer pricing questions for the active tenant.",
        context_providers=[SkillsProvider(skills=[pricing])],
    )
    response = await agent.run(
        "What does the team plan cost for ACME?",
        function_invocation_kwargs={"tenant_id": "acme"},
    )
    print(response.text)


if __name__ == "__main__":
    asyncio.run(main())

**kwargs in the resource signature opts the function in to runtime data the model never sees (tenant id, request id, user id). Without **kwargs the framework calls the resource with no args. See the skills guide for file-based skills, executable scripts, and approval gates.


Recipe 11 — MCP stdio + HTTP composition


Combine a local MCP server (filesystem) with a remote one (Microsoft Learn). Each tool’s exposed functions are namespaced by its name=.

multi_mcp.py
import asyncio

from agent_framework import Agent, MCPStdioTool, MCPStreamableHTTPTool
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    async with (
        MCPStdioTool(
            name="fs",
            command="npx",
            args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
            description="Local filesystem under /tmp",
        ) as fs,
        MCPStreamableHTTPTool(
            name="learn",
            url="https://learn.microsoft.com/api/mcp",
            description="Microsoft Learn documentation search",
            request_timeout=30,
        ) as learn,
    ):
        agent = Agent(
            client=OpenAIChatClient(),
            instructions=(
                "You can read and write files in /tmp via `fs` and search Microsoft "
                "Learn via `learn`. Cite documentation URLs when relevant."
            ),
            tools=[fs, learn],
        )
        response = await agent.run(
            "List files in /tmp and find the latest azure-identity guidance."
        )
        print(response.text)


if __name__ == "__main__":
    asyncio.run(main())

MCPStdioTool spawns a subprocess; async with cleans it up deterministically. For long-running servers, manage connect() / close() explicitly so the same MCP session survives across many agent runs (see the MCP guide).
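A sketch of the long-running pattern, assuming connect() and close() are awaitable as the MCP guide describes; wire them to your application’s startup and shutdown hooks.

# Module-level tool reused across many agent runs (sketch).
learn = MCPStreamableHTTPTool(
    name="learn",
    url="https://learn.microsoft.com/api/mcp",
)


async def startup() -> None:
    await learn.connect()   # open one MCP session for the process lifetime


async def shutdown() -> None:
    await learn.close()     # tear it down on process exit

# Between the two, any number of Agent(..., tools=[learn]) runs share the session.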

Recipe 12 — Per-tenant MCP via header_provider


Forward auth tokens to a remote MCP server without building a fresh httpx.AsyncClient per tenant. The header_provider callable receives function_invocation_kwargs from the outer agent.run(...).

tenant_mcp.py
import asyncio

from agent_framework import Agent, MCPStreamableHTTPTool
from agent_framework.openai import OpenAIChatClient

# Per-tool approval policy: write ops require approval, reads don't.
# (`MCPSpecificApproval` is a TypedDict in agent_framework._mcp; the dict
# below is structurally typed so the import isn't required at runtime.)
billing_approval = {
    "always_require_approval": ["billing.refund", "billing.void"],
    "never_require_approval": ["billing.list_invoices", "billing.get_invoice"],
}


async def serve_tenant(tenant_token: str, query: str) -> str:
    mcp = MCPStreamableHTTPTool(
        name="billing",
        url="https://mcp.example.com",
        approval_mode=billing_approval,
        header_provider=lambda kwargs: {
            "Authorization": f"Bearer {kwargs['token']}",
            "X-Tenant-Id": kwargs.get("tenant_id", "default"),
        },
    )
    async with mcp:
        agent = Agent(client=OpenAIChatClient(), tools=mcp)
        response = await agent.run(
            query,
            function_invocation_kwargs={"token": tenant_token, "tenant_id": "acme"},
        )
        return response.text


asyncio.run(serve_tenant("tk_live_abc", "List my open invoices."))

MCPSpecificApproval is a typed dict — IDEs autocomplete the two valid keys. Tools listed in both lists require approval (the safe default).

Recipe 13 — Checkpointed workflows with WorkflowBuilder

Long-running multi-step pipelines benefit from FileCheckpointStorage: every superstep is durable, and a process restart can resume from the latest checkpoint.

checkpointed_workflow.py
import asyncio

from agent_framework import Agent, FileCheckpointStorage, WorkflowBuilder
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    client = OpenAIChatClient()
    researcher = Agent(client=client, name="researcher",
                       instructions="Produce 5 bullet research findings.")
    analyst = Agent(client=client, name="analyst",
                    instructions="Identify the most important finding.")
    writer = Agent(client=client, name="writer",
                   instructions="Turn the chosen finding into a 3-paragraph brief.")

    storage = FileCheckpointStorage(
        storage_path="./checkpoints",
        # Allow your domain types if you persist them in workflow state.
        # allowed_checkpoint_types=["my_app.models:ResearchState"],
    )

    workflow = (
        WorkflowBuilder(
            start_executor=researcher,
            checkpoint_storage=storage,
            name="research-brief",
            max_iterations=10,
        )
        .add_edge(researcher, analyst)
        .add_edge(analyst, writer)
        .build()
    )

    result = await workflow.run("Topic: post-quantum cryptography migration")
    print(result.get_outputs()[-1])

    # Inspect what was saved.
    latest = await storage.get_latest(workflow_name="research-brief")
    if latest is not None:
        print(f"latest checkpoint: id={latest.checkpoint_id} step={latest.iteration_count}")


if __name__ == "__main__":
    asyncio.run(main())

Checkpoints are keyed on the workflow definition, not the instance — any process running the same WorkflowBuilder output can resume. For Cosmos DB, install agent-framework-azure-cosmos; for Redis, agent-framework-redis.

Recipe 14 — Function middleware: cache + retry


Stack two FunctionMiddleware instances. The cache short-circuits repeat calls; the retry handles transient failures.

function_middleware.py
import asyncio
import json
from typing import Any

from agent_framework import (
    Agent,
    FunctionInvocationContext,
    FunctionMiddleware,
    MiddlewareTermination,
    tool,
)
from agent_framework.openai import OpenAIChatClient


class IdempotentCache(FunctionMiddleware):
    """Memoise calls to tools tagged kind='readonly'."""

    def __init__(self) -> None:
        self._cache: dict[str, Any] = {}

    @staticmethod
    def _key(name: str, arguments: dict[str, Any]) -> str:
        return f"{name}::{json.dumps(arguments, sort_keys=True, default=str)}"

    async def process(self, context: FunctionInvocationContext, call_next) -> None:
        if context.function.kind != "readonly":
            await call_next()
            return
        key = self._key(context.function.name, dict(context.arguments or {}))
        if key in self._cache:
            raise MiddlewareTermination("cache hit", result=self._cache[key])
        await call_next()
        if context.result is not None:
            self._cache[key] = context.result


class RetryOnError(FunctionMiddleware):
    """Exponential backoff for any exception raised by the wrapped tool."""

    def __init__(self, attempts: int = 3, backoff: float = 0.5) -> None:
        self.attempts = attempts
        self.backoff = backoff

    async def process(self, context: FunctionInvocationContext, call_next) -> None:
        last_exc: Exception | None = None
        for i in range(self.attempts):
            try:
                await call_next()
                return
            except Exception as exc:
                last_exc = exc
                await asyncio.sleep(self.backoff * (2 ** i))
        assert last_exc is not None
        raise last_exc


@tool(kind="readonly")
def lookup_sku(sku: str) -> str:
    """Return the canonical name for a SKU."""
    # Replace with a real DB / HTTP call.
    return f"SKU {sku}: Wireless Charger"


async def main() -> None:
    agent = Agent(
        client=OpenAIChatClient(),
        instructions="Answer using the lookup_sku tool when needed.",
        tools=[lookup_sku],
        middleware=[IdempotentCache(), RetryOnError()],
    )
    print((await agent.run("What is SKU WCH-42?")).text)
    print((await agent.run("And what is SKU WCH-42 again?")).text)  # cache hit


if __name__ == "__main__":
    asyncio.run(main())

Outer-to-inner: IdempotentCache wraps RetryOnError wraps the tool. The cache short-circuits via MiddlewareTermination so the retry layer doesn’t even fire on a hit.

Recipe 15 — Agent middleware: budgets and policy gates


AgentMiddleware wraps the entire agent.run(...) call. Use it for global concerns: budgets, profanity filters, system-prompt injection, run-level retries.

agent_middleware.py
import time

from agent_framework import (
    Agent,
    AgentContext,
    AgentMiddleware,
    AgentResponse,
    Content,
    Message,
    MiddlewareTermination,
)
from agent_framework.openai import OpenAIChatClient


class BudgetGuard(AgentMiddleware):
    """Reject runs once a tenant has burned through its quota."""

    def __init__(self, quotas: dict[str, int]) -> None:
        self.quotas = dict(quotas)

    async def process(self, context: AgentContext, call_next) -> None:
        tenant = context.kwargs.get("tenant_id", "default")
        if self.quotas.get(tenant, 0) <= 0:
            raise MiddlewareTermination(
                f"tenant={tenant} over quota",
                result=AgentResponse(
                    messages=[Message(role="assistant",
                                      contents=[Content.from_text("Quota exhausted.")])],
                ),
            )
        self.quotas[tenant] -= 1
        await call_next()


class LatencyTimer(AgentMiddleware):
    """Stamp wall-clock latency onto context.metadata so downstream code can read it."""

    async def process(self, context: AgentContext, call_next) -> None:
        t0 = time.monotonic()
        await call_next()
        context.metadata["wall_ms"] = (time.monotonic() - t0) * 1000


agent = Agent(
    client=OpenAIChatClient(),
    instructions="",
    middleware=[
        LatencyTimer(),
        BudgetGuard({"acme": 100, "globex": 50}),
    ],
)

MiddlewareTermination(result=...) lets you return a fully-formed synthetic response without invoking the model. See the middleware guide for streaming hooks, ChatMiddleware, and observability spans.

Recipe 16 — Expose an agent as an MCP server

agent.as_mcp_server() returns a low-level mcp.server.lowlevel.Server that hosts the agent as a single tool. Drop it into stdio for inter-process composition or hook it up to the streamable-HTTP host of your choice.

mcp_server.py
import anyio
from mcp.server.stdio import stdio_server

from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    agent = Agent(
        client=OpenAIChatClient(),
        name="docs_agent",
        description="Answers questions about our internal documentation.",
        instructions="Only answer using the indexed docs.",
    )
    server = agent.as_mcp_server(
        server_name="docs-mcp",
        version="1.0.0",
        instructions="Use docs_agent for any question about internal docs.",
    )
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, server.create_initialization_options())


anyio.run(main)

Other agents (or Claude Desktop, an LLM IDE, …) can now consume your agent via MCPStdioTool(name="docs", command="python", args=["mcp_server.py"]). Image and audio outputs are dropped with a warning — only text content is forwarded over the MCP server path.
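For completeness, a consumer-side sketch that wires the stdio server above into another agent; everything here reuses APIs already shown in Recipe 11.

mcp_client.py (sketch)
import asyncio

from agent_framework import Agent, MCPStdioTool
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    async with MCPStdioTool(
        name="docs",
        command="python",
        args=["mcp_server.py"],
        description="Internal documentation agent exposed over MCP",
    ) as docs:
        caller = Agent(
            client=OpenAIChatClient(),
            instructions="Delegate documentation questions to the `docs` tool.",
            tools=[docs],
        )
        print((await caller.run("How do we rotate API keys internally?")).text)


if __name__ == "__main__":
    asyncio.run(main())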


Recipe 17 — FastAPI service (modern lifespan)


Wire an agent into a FastAPI app using the modern lifespan context manager (@app.on_event("startup") is deprecated in FastAPI 0.93+).

api.py
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from agent_framework import Agent, AgentSession, FileHistoryProvider
from agent_framework.openai import OpenAIChatClient


class State:
    agent: Agent
    history: FileHistoryProvider


state = State()


@asynccontextmanager
async def lifespan(app: FastAPI):
    state.history = FileHistoryProvider(storage_path="./conversations")
    state.agent = Agent(
        client=OpenAIChatClient(),
        instructions="You are a helpful API assistant.",
        context_providers=[state.history],
    )
    yield
    # Add explicit cleanup here if you need it (close clients, flush metrics).


app = FastAPI(lifespan=lifespan)


class ChatRequest(BaseModel):
    message: str
    session_id: str


@app.post("/chat")
async def chat(payload: ChatRequest) -> dict[str, str]:
    if not payload.message.strip():
        raise HTTPException(status_code=400, detail="empty message")
    session = state.agent.create_session(session_id=payload.session_id)
    response = await state.agent.run(payload.message, session=session)
    return {"reply": response.text}


# Run with: uvicorn api:app --reload

Authorise the session_id against the authenticated user before passing it to create_session(). FileHistoryProvider only protects you from filesystem traversal, not authorisation.
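One way to enforce that, sketched as a variant of the /chat handler above: get_current_user is a hypothetical auth dependency, and user.id is an assumed attribute; substitute your real authentication scheme.

from fastapi import Depends


@app.post("/chat")
async def chat(payload: ChatRequest, user=Depends(get_current_user)) -> dict[str, str]:
    # Namespace sessions per user so one caller can never open another's thread.
    if not payload.session_id.startswith(f"{user.id}:"):
        raise HTTPException(status_code=403, detail="session does not belong to caller")
    session = state.agent.create_session(session_id=payload.session_id)
    response = await state.agent.run(payload.message, session=session)
    return {"reply": response.text}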

Recipe 18 — Azure Functions queue trigger

Section titled “Recipe 18 — Azure Functions queue trigger”

Reuse the agent across invocations (warm-start) so Functions container reuse pays off.

function_app.py
import logging

import azure.functions as func

from agent_framework import Agent
from agent_framework.foundry import FoundryChatClient

app = func.FunctionApp()

# Module-level construction → warm-start reuse across invocations.
_client = FoundryChatClient()
_agent = Agent(
    client=_client,
    instructions="You process queued tasks. Reply with the action plan only.",
)


@app.queue_trigger(arg_name="msg", queue_name="agent-tasks", connection="AzureWebJobsStorage")
async def process(msg: func.QueueMessage) -> None:
    body = msg.get_body().decode("utf-8")
    logging.info("received task: %s", body)
    response = await _agent.run(body)
    logging.info("plan: %s", response.text)

For long-running jobs that should survive a redeploy, persist intermediate state via FileCheckpointStorage (Recipe 13) on a mounted Azure Files share, or move to a queue-driven Workflow with Cosmos checkpointing.

Recipe 19 — Server-Sent Events streaming endpoint


Stream the agent’s tokens to a browser via SSE. Reuse the ResponseStream directly — no extra buffering needed.

sse_api.py
import json
from contextlib import asynccontextmanager
from typing import AsyncIterator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient


class State:
    agent: Agent


state = State()


@asynccontextmanager
async def lifespan(app: FastAPI):
    state.agent = Agent(client=OpenAIChatClient(), instructions="Be concise.")
    yield


app = FastAPI(lifespan=lifespan)


class ChatRequest(BaseModel):
    message: str


async def sse(stream) -> AsyncIterator[bytes]:
    async for update in stream:
        if update.text:
            payload = json.dumps({"text": update.text}).encode()
            yield b"data: " + payload + b"\n\n"
    final = await stream.get_final_response()
    yield b"data: " + json.dumps(
        {"done": True, "finish_reasons": list(final.finish_reasons or [])}
    ).encode() + b"\n\n"


@app.post("/chat/stream")
async def chat_stream(req: ChatRequest) -> StreamingResponse:
    stream = state.agent.run(req.message, stream=True)
    return StreamingResponse(sse(stream), media_type="text/event-stream")

stream.get_final_response() after iteration completes is the canonical place to access the joined response, finish reasons, and any HITL approval requests. For approval mid-stream, drop into await stream.send_response(...) before the loop returns.


Recipe 20 — Debug streaming with tool-call visibility


Inspect tool calls inline as the model emits them — useful when an agent picks the wrong tool or emits empty arguments.

debug_stream.py
import asyncio

from agent_framework import Agent, tool
from agent_framework.openai import OpenAIChatClient


@tool
def get_time() -> str:
    """Return the current UTC time."""
    from datetime import datetime, timezone

    return datetime.now(timezone.utc).isoformat()


async def main() -> None:
    agent = Agent(
        client=OpenAIChatClient(),
        instructions="Use get_time when asked about time.",
        tools=[get_time],
    )
    stream = agent.run("What's the current UTC time? Then tell me a joke.", stream=True)

    async for update in stream:
        # Text chunks
        if update.text:
            print(f"\033[92m{update.text}\033[0m", end="", flush=True)
        # Tool calls and results live inside `update.contents` as Content items.
        for content in update.contents or []:
            ctype = getattr(content, "type", None)
            if ctype == "function_call":
                print(f"\n\033[90m[call {content.name}({content.arguments})]\033[0m")
            elif ctype == "function_result":
                print(f"\n\033[90m[result {content.call_id}: {content.result}]\033[0m")
    print()

    final = await stream.get_final_response()
    print(f"\nfinish_reasons={final.finish_reasons}")


if __name__ == "__main__":
    asyncio.run(main())

Each AgentResponseUpdate carries contents — a list of unified Content items. Filter by type to pick out function calls, function results, text, images, and HITL requests. There’s no separate tool_calls attribute on the update.

Recipe 21 — Replay a session blob across processes


Stateless workers (Lambda, Cloud Run) can round-trip the entire session through to_dict() / from_dict(). The blob carries every message plus any custom state you stored.

session_replay.py
import asyncio
import json

from agent_framework import Agent, AgentSession, InMemoryHistoryProvider
from agent_framework.openai import OpenAIChatClient


async def turn(blob: str | None, message: str) -> tuple[str, str]:
    agent = Agent(
        client=OpenAIChatClient(),
        instructions="You are a customer-service agent.",
        context_providers=[InMemoryHistoryProvider()],
    )
    session = AgentSession.from_dict(json.loads(blob)) if blob else AgentSession()
    response = await agent.run(message, session=session)
    return response.text, json.dumps(session.to_dict())


async def main() -> None:
    reply, blob = await turn(None, "My order #4421 is missing.")
    print(reply)

    # A different worker process picks up the same conversation:
    reply, blob = await turn(blob, "What did I just ask about?")
    print(reply)


if __name__ == "__main__":
    asyncio.run(main())

For custom types in session.state, register them once at module import via register_state_type(...) so a cold-started worker can deserialise blobs that contain those types. See the sessions guide for the full contract.
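A registration sketch: the exact register_state_type signature isn’t reproduced in this recipe set, so treat the call below as the shape described in the sessions guide rather than a verified API.

from dataclasses import dataclass

from agent_framework import register_state_type  # named in the note above


@dataclass
class OrderContext:
    order_id: str
    status: str


register_state_type(OrderContext)  # runs at import time, before any blob is loaded

# Later, inside a turn: session.state["order"] = OrderContext("4421", "missing")
# to_dict() / from_dict() can then restore OrderContext on a cold-started worker.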


Quick reference

Need → Class / function

  • Build an agent → Agent(client=, instructions=, tools=, context_providers=, middleware=)
  • One-shot vs threaded → agent.run(prompt) vs agent.run(prompt, session=session)
  • Stream tokens → agent.run(prompt, stream=True) returns a ResponseStream
  • Tool from a function → @tool from agent_framework
  • Sub-agent as tool → agent.as_tool(propagate_session=True)
  • Expose agent over MCP → agent.as_mcp_server(server_name=..., version=...)
  • Persistent sessions → FileHistoryProvider(storage_path=...)
  • Encrypted sessions → FileHistoryProvider(dumps=..., loads=...)
  • MCP local subprocess → MCPStdioTool(name=, command=, args=)
  • MCP remote HTTP → MCPStreamableHTTPTool(name=, url=, header_provider=)
  • Per-tool approval → approval_mode="always_require" or a per-tool dict ({"always_require_approval": [...], "never_require_approval": [...]})
  • Tool retries → FunctionMiddleware subclass, await call_next() in a loop
  • Run-level guards → AgentMiddleware subclass, raise MiddlewareTermination(result=...)
  • Workflow checkpoints → FileCheckpointStorage(storage_path=...) + WorkflowBuilder(checkpoint_storage=)
  • Skills → Skill(...) + SkillsProvider(skills=[...])
  • Eval gates → LocalEvaluator(*checks) + evaluate_agent(...)

For deep dives see the framework’s other Python guides: tools, sessions, middleware, MCP, skills, evaluation, checkpointing, and orchestration.