Skip to content

Microsoft Agent Framework Python - Recipes and Code Patterns

Microsoft Agent Framework Python — Recipes and Code Patterns

Section titled “Microsoft Agent Framework Python — Recipes and Code Patterns”

Practical, runnable patterns for the real agent_framework package. Every recipe targets the public surface of the latest release; nothing here relies on private modules.

  • Package: agent-framework (the umbrella distribution that pulls in agent-framework-core plus every official provider). Imports root at agent_framework.
  • Pinned version: agent-framework==1.6.0. Check the latest with pip index versions agent-framework.
  • Python: 3.10+ — the entire package uses from __future__ import annotations, the | union syntax, and modern asyncio.
  • Verified APIs: Agent, RawAgent, AgentSession, FileHistoryProvider, FileCheckpointStorage, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool, AgentMiddleware, ChatMiddleware, FunctionMiddleware, SkillsProvider, InlineSkill, ClassSkill, FileSkillsSource, LocalEvaluator, WorkflowBuilder, CompactionProvider, ToolResultCompactionStrategy, SelectiveToolCallCompactionStrategy, TokenBudgetComposedStrategy, TodoProvider, TodoFileStore, MemoryContextProvider, MemoryFileStore.
Terminal window
pip install agent-framework
# Or pin explicitly:
pip install 'agent-framework==1.6.0'

Recipe 1 — Hello agent (single-turn, multi-turn)

Section titled “Recipe 1 — Hello agent (single-turn, multi-turn)”

The minimal agent. The same Agent class handles both stateless one-shot calls and stateful conversations — the difference is whether you pass a session=.

hello_agent.py
import asyncio
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient
async def main() -> None:
agent = Agent(
client=OpenAIChatClient(), # reads OPENAI_API_KEY
instructions="You are a friendly assistant. Keep responses concise.",
)
# Stateless — no history.
one_shot = await agent.run("In one sentence: what is asyncio?")
print(one_shot.text)
# Stateful — pass a session to keep the thread alive across turns.
session = agent.create_session()
await agent.run("My favourite colour is teal.", session=session)
follow_up = await agent.run("What did I just tell you?", session=session)
print(follow_up.text) # mentions teal
if __name__ == "__main__":
asyncio.run(main())

Swap OpenAIChatClient for AzureOpenAIChatClient, FoundryChatClient, AnthropicClient, OllamaChatClient, or BedrockChatClient to switch providers — every chat client implements the same SupportsChatGetResponse protocol so the rest of your code is identical.

agent.run(stream=True) returns a ResponseStream[AgentResponseUpdate, AgentResponse]. There is no agent.run_stream(...).

stream_agent.py
import asyncio
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient
async def main() -> None:
agent = Agent(client=OpenAIChatClient(), instructions="You are a tutor.")
stream = agent.run("Explain backpressure in 3 paragraphs.", stream=True)
async for update in stream:
if update.text:
print(update.text, end="", flush=True)
print()
# The assembled final response — finish_reason, full text, tool calls.
final = await stream.get_final_response()
print(f"\nfinish_reason={final.finish_reasons}")
if __name__ == "__main__":
asyncio.run(main())

stream.get_final_response() waits until consumption finishes and returns the joined AgentResponse. For HITL approvals mid-stream use await stream.send_response(approval) instead of starting a new run.

Recipe 3 — A tool with typed arguments and error handling

Section titled “Recipe 3 — A tool with typed arguments and error handling”

@tool builds a JSON schema from your signature. Raise on bad inputs — the exception message becomes the model’s tool-call result so it can recover.

typed_tool.py
import asyncio
from typing import Annotated
from agent_framework import Agent, tool
from agent_framework.openai import OpenAIChatClient
@tool(description="Divide two numbers.")
def divide(
numerator: Annotated[float, "The dividend"],
denominator: Annotated[float, "The divisor — must not be zero"],
) -> str:
if denominator == 0:
raise ValueError("denominator must be non-zero")
return f"{numerator / denominator:.4f}"
async def main() -> None:
agent = Agent(
client=OpenAIChatClient(),
instructions="You are a math assistant. Use tools when arithmetic is needed.",
tools=[divide],
)
print((await agent.run("What is 22 over 7?")).text)
print((await agent.run("What is 10 divided by 0?")).text)
if __name__ == "__main__":
asyncio.run(main())

The model sees the Annotated strings as parameter descriptions in the JSON schema. For richer constraints use pydantic.Field(description=..., ge=..., le=...) instead — the decorator picks up either form.

Recipe 4 — Structured output with Pydantic

Section titled “Recipe 4 — Structured output with Pydantic”

Force a schema-conforming response. response.value is the validated Pydantic instance.

structured.py
import asyncio
from pydantic import BaseModel, Field
from agent_framework import Agent, ChatOptions
from agent_framework.openai import OpenAIChatClient
class Profile(BaseModel):
name: str = Field(description="Full name")
age: int = Field(ge=0, le=130)
interests: list[str]
async def main() -> None:
agent = Agent(client=OpenAIChatClient(), instructions="Extract structured data.")
response = await agent.run(
"I'm Jane Doe, 28, and I love hiking and Python.",
options=ChatOptions(response_format=Profile),
)
profile: Profile = response.value
print(profile.model_dump_json(indent=2))
if __name__ == "__main__":
asyncio.run(main())

Pass response_format= via ChatOptions (or set it as a default on the chat client). For streaming structured output, the same option works against agent.run(..., stream=True) — the framework lazily validates final.value on the final update.


Default sessions live in memory. FileHistoryProvider writes one JSONL file per session_id and survives restarts.

persistent_sessions.py
import asyncio
from agent_framework import Agent, FileHistoryProvider
from agent_framework.openai import OpenAIChatClient
async def main() -> None:
history = FileHistoryProvider(
storage_path="./conversations", # directory created on demand
skip_excluded=True, # honour CompactionProvider exclusions
)
agent = Agent(
client=OpenAIChatClient(),
instructions="You are a customer-service agent.",
context_providers=[history],
)
# Stable id — same conversation across process restarts.
session = agent.create_session(session_id="customer-9281")
await agent.run("My order #4421 hasn't arrived.", session=session)
# Run later (different process) — history is reloaded from JSONL.
follow_up = await agent.run("Any update on that?", session=session)
print(follow_up.text)
if __name__ == "__main__":
asyncio.run(main())

FileHistoryProvider rejects session_ids that escape the storage root (../etc/passwd and Windows reserved stems are rewritten or rejected). For multi-host deployments use the Redis or Cosmos DB providers — the JSONL implementation only locks within a single process.

Recipe 6 — Encrypt session history at rest

Section titled “Recipe 6 — Encrypt session history at rest”

Inject your own JSON serialisers via dumps= / loads= to add envelope encryption. The provider validates that the output is a single line — Fernet tokens already are.

encrypted_sessions.py
import json
import os
from cryptography.fernet import Fernet
from agent_framework import Agent, FileHistoryProvider
from agent_framework.openai import OpenAIChatClient
cipher = Fernet(os.environ["AGENT_HISTORY_FERNET_KEY"])
def encrypt_dumps(payload: dict) -> str:
plaintext = json.dumps(payload, ensure_ascii=False).encode("utf-8")
return cipher.encrypt(plaintext).decode("ascii")
def decrypt_loads(line: str | bytes) -> dict:
if isinstance(line, str):
line = line.encode("ascii")
return json.loads(cipher.decrypt(line).decode("utf-8"))
history = FileHistoryProvider(
storage_path="./encrypted-conversations",
dumps=encrypt_dumps,
loads=decrypt_loads,
)
agent = Agent(client=OpenAIChatClient(), context_providers=[history])

Test with decrypt_loads(encrypt_dumps(x)) == x for a representative payload before deploying — both callables must round-trip cleanly.

Convert an agent into a tool that other agents can delegate to. propagate_session=True shares the parent’s session so the sub-agent sees the same conversation history.

delegating_agents.py
import asyncio
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient
async def main() -> None:
client = OpenAIChatClient()
researcher = Agent(
client=client,
name="researcher",
description="Performs deep research on technical topics.",
instructions="Produce thorough, citation-backed research on the requested topic.",
)
summariser = Agent(
client=client,
name="summariser",
description="Condenses long passages into bullet points.",
instructions="Reduce input to <= 5 bullet points.",
)
coordinator = Agent(
client=client,
instructions=(
"You are a project coordinator. Use the `researcher` tool to gather "
"information, then the `summariser` tool to produce final bullets."
),
tools=[
researcher.as_tool(),
summariser.as_tool(propagate_session=True), # share the session
],
)
response = await coordinator.run("Brief me on quantum-resistant cryptography.")
print(response.text)
if __name__ == "__main__":
asyncio.run(main())

Three knobs on as_tool() worth knowing:

  • name= / description= override the auto-generated tool name and description.
  • approval_mode="always_require" puts a HITL gate on the delegation itself — useful when a sub-agent has access to expensive or sensitive tools.
  • stream_callback= surfaces the sub-agent’s streaming updates to your UI as it works.

Recipe 8 — Router pattern (classify + dispatch)

Section titled “Recipe 8 — Router pattern (classify + dispatch)”

Pure-Python routing using a stateless classifier agent and per-domain specialists.

router_agent.py
import asyncio
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient
class SupportRouter:
def __init__(self, client: OpenAIChatClient) -> None:
self.router = Agent(
client=client,
instructions=(
"Classify the user query as one of: BILLING, TECHNICAL, ACCOUNT. "
"Reply with exactly that single word."
),
)
self.specialists = {
"BILLING": Agent(client=client, instructions="You are a billing specialist."),
"TECHNICAL": Agent(client=client, instructions="You are a tech-support engineer."),
"ACCOUNT": Agent(client=client, instructions="You are an account-management agent."),
}
async def handle(self, query: str) -> str:
route = (await self.router.run(query)).text.strip().upper()
agent = self.specialists.get(route, self.specialists["TECHNICAL"])
return (await agent.run(query)).text
async def main() -> None:
router = SupportRouter(OpenAIChatClient())
print(await router.handle("My invoice for April is wrong."))
if __name__ == "__main__":
asyncio.run(main())

For a graph-based variant with checkpointing, fan-in / fan-out edges, and event streaming, swap the dispatcher for WorkflowBuilder (Recipe 13).

LocalEvaluator is dependency-free — every check is a Python function. Combine the built-in checks (keyword_check, tool_called_check, tool_calls_present, tool_call_args_match) with your own @evaluator callables.

eval_smoke.py
import asyncio
from agent_framework import (
Agent,
LocalEvaluator,
evaluate_agent,
keyword_check,
tool,
tool_called_check,
evaluator,
CheckResult,
)
from agent_framework.openai import OpenAIChatClient
@tool
def get_weather(location: str) -> str:
"""Return the current weather for a city."""
return f"{location}: 22°C, clear"
@evaluator
def mentions_celsius(response: str) -> CheckResult:
return CheckResult(
passed="°C" in response,
reason="celsius unit present" if "°C" in response else "missing °C",
check_name="mentions_celsius",
)
async def main() -> None:
agent = Agent(
client=OpenAIChatClient(),
instructions="You are a weather assistant; always cite °C.",
tools=[get_weather],
)
[results] = await evaluate_agent(
agent=agent,
queries=["What's the weather in Amsterdam?", "Is it warm in Cairo?"],
evaluators=LocalEvaluator(
keyword_check("weather"),
tool_called_check("get_weather"),
mentions_celsius,
),
)
counts = results.result_counts
print(f"passed={counts['passed']} failed={counts['failed']}")
assert counts["failed"] == 0, results.error
if __name__ == "__main__":
asyncio.run(main())

For LLM-as-judge or weighted scoring patterns see the evaluation guide.

Recipe 10 — Skills for progressive-disclosure knowledge

Section titled “Recipe 10 — Skills for progressive-disclosure knowledge”

A Skill is advertised by name + description. Only when the model decides to use it is the full body or a resource fetched — keeping context lean for rarely-used domain knowledge.

pricing_skill.py
import asyncio
from agent_framework import Agent, Skill, SkillsProvider
from agent_framework.openai import OpenAIChatClient
pricing = Skill(
name="tenant-pricing",
description="Look up the per-SKU pricing matrix for the active tenant.",
content=(
"Use `read_skill_resource('tenant-pricing', 'matrix')` to fetch the current "
"matrix. Quote prices in USD unless the user specifies otherwise."
),
)
@pricing.resource
async def matrix(**kwargs) -> str:
"""Pricing matrix for the active tenant.
The agent invokes this with no arguments; we receive runtime context via
**kwargs forwarded from agent.run(..., function_invocation_kwargs=...).
"""
tenant_id = kwargs.get("tenant_id", "default")
# Replace with a real DB / API call.
return f"tenant={tenant_id}\npro: $99/mo\nteam: $299/mo\nenterprise: contact sales"
async def main() -> None:
agent = Agent(
client=OpenAIChatClient(),
instructions="You answer pricing questions for the active tenant.",
context_providers=[SkillsProvider(skills=[pricing])],
)
response = await agent.run(
"What does the team plan cost for ACME?",
function_invocation_kwargs={"tenant_id": "acme"},
)
print(response.text)
if __name__ == "__main__":
asyncio.run(main())

**kwargs in the resource signature opts the function in to runtime data the model never sees (tenant id, request id, user id). Without **kwargs the framework calls the resource with no args. See the skills guide for file-based skills, executable scripts, and approval gates.


Recipe 11 — MCP stdio + HTTP composition

Section titled “Recipe 11 — MCP stdio + HTTP composition”

Combine a local MCP server (filesystem) with a remote one (Microsoft Learn). Each tool’s exposed functions are namespaced by its name=.

multi_mcp.py
import asyncio
from agent_framework import Agent, MCPStdioTool, MCPStreamableHTTPTool
from agent_framework.openai import OpenAIChatClient
async def main() -> None:
async with (
MCPStdioTool(
name="fs",
command="npx",
args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
description="Local filesystem under /tmp",
) as fs,
MCPStreamableHTTPTool(
name="learn",
url="https://learn.microsoft.com/api/mcp",
description="Microsoft Learn documentation search",
request_timeout=30,
) as learn,
):
agent = Agent(
client=OpenAIChatClient(),
instructions=(
"You can read and write files in /tmp via `fs` and search Microsoft "
"Learn via `learn`. Cite documentation URLs when relevant."
),
tools=[fs, learn],
)
response = await agent.run(
"List files in /tmp and find the latest azure-identity guidance."
)
print(response.text)
if __name__ == "__main__":
asyncio.run(main())

MCPStdioTool spawns a subprocess; async with cleans it up deterministically. For long-running servers, manage connect() / close() explicitly so the same MCP session survives across many agent runs (see the MCP guide).

Recipe 12 — Per-tenant MCP via header_provider

Section titled “Recipe 12 — Per-tenant MCP via header_provider”

Forward auth tokens to a remote MCP server without building a fresh httpx.AsyncClient per tenant. The header_provider callable receives function_invocation_kwargs from the outer agent.run(...).

tenant_mcp.py
import asyncio
from agent_framework import Agent, MCPStreamableHTTPTool
from agent_framework.openai import OpenAIChatClient
# Per-tool approval policy: write ops require approval, reads don't.
# (`MCPSpecificApproval` is a TypedDict in agent_framework._mcp; the dict
# below is structurally typed so the import isn't required at runtime.)
billing_approval = {
"always_require_approval": ["billing.refund", "billing.void"],
"never_require_approval": ["billing.list_invoices", "billing.get_invoice"],
}
async def serve_tenant(tenant_token: str, query: str) -> str:
mcp = MCPStreamableHTTPTool(
name="billing",
url="https://mcp.example.com",
approval_mode=billing_approval,
header_provider=lambda kwargs: {
"Authorization": f"Bearer {kwargs['token']}",
"X-Tenant-Id": kwargs.get("tenant_id", "default"),
},
)
async with mcp:
agent = Agent(client=OpenAIChatClient(), tools=mcp)
response = await agent.run(
query,
function_invocation_kwargs={"token": tenant_token, "tenant_id": "acme"},
)
return response.text
asyncio.run(serve_tenant("tk_live_abc", "List my open invoices."))

MCPSpecificApproval is a typed dict — IDEs autocomplete the two valid keys. Tools listed in both lists require approval (the safe default).

Long-running multi-step pipelines benefit from FileCheckpointStorage: every superstep is durable, and a process restart can resume from the latest checkpoint.

checkpointed_workflow.py
import asyncio
from agent_framework import Agent, FileCheckpointStorage, WorkflowBuilder
from agent_framework.openai import OpenAIChatClient
async def main() -> None:
client = OpenAIChatClient()
researcher = Agent(client=client, name="researcher",
instructions="Produce 5 bullet research findings.")
analyst = Agent(client=client, name="analyst",
instructions="Identify the most important finding.")
writer = Agent(client=client, name="writer",
instructions="Turn the chosen finding into a 3-paragraph brief.")
storage = FileCheckpointStorage(
storage_path="./checkpoints",
# Allow your domain types if you persist them in workflow state.
# allowed_checkpoint_types=["my_app.models:ResearchState"],
)
workflow = (
WorkflowBuilder(
start_executor=researcher,
checkpoint_storage=storage,
name="research-brief",
max_iterations=10,
)
.add_edge(researcher, analyst)
.add_edge(analyst, writer)
.build()
)
result = await workflow.run("Topic: post-quantum cryptography migration")
print(result.get_outputs()[-1])
# Inspect what was saved.
latest = await storage.get_latest(workflow_name="research-brief")
if latest is not None:
print(f"latest checkpoint: id={latest.checkpoint_id} step={latest.iteration_count}")
if __name__ == "__main__":
asyncio.run(main())

Checkpoints are keyed on the workflow definition, not the instance — any process running the same WorkflowBuilder output can resume. For Cosmos DB, install agent-framework-azure-cosmos; for Redis, agent-framework-redis.

Recipe 14 — Function middleware: cache + retry

Section titled “Recipe 14 — Function middleware: cache + retry”

Stack two FunctionMiddleware instances. The cache short-circuits repeat calls; the retry handles transient failures.

function_middleware.py
import asyncio
import json
from typing import Any
from agent_framework import (
Agent,
FunctionMiddleware,
FunctionInvocationContext,
MiddlewareTermination,
tool,
)
from agent_framework.openai import OpenAIChatClient
class IdempotentCache(FunctionMiddleware):
"""Memoise calls to tools tagged kind='readonly'."""
def __init__(self) -> None:
self._cache: dict[str, Any] = {}
@staticmethod
def _key(name: str, arguments: dict[str, Any]) -> str:
return f"{name}::{json.dumps(arguments, sort_keys=True, default=str)}"
async def process(self, context: FunctionInvocationContext, call_next) -> None:
if context.function.kind != "readonly":
await call_next()
return
key = self._key(context.function.name, dict(context.arguments or {}))
if key in self._cache:
raise MiddlewareTermination("cache hit", result=self._cache[key])
await call_next()
if context.result is not None:
self._cache[key] = context.result
class RetryOnError(FunctionMiddleware):
"""Exponential backoff for any exception raised by the wrapped tool."""
def __init__(self, attempts: int = 3, backoff: float = 0.5) -> None:
self.attempts = attempts
self.backoff = backoff
async def process(self, context: FunctionInvocationContext, call_next) -> None:
last_exc: Exception | None = None
for i in range(self.attempts):
try:
await call_next()
return
except Exception as exc:
last_exc = exc
await asyncio.sleep(self.backoff * (2 ** i))
assert last_exc is not None
raise last_exc
@tool(kind="readonly")
def lookup_sku(sku: str) -> str:
"""Return the canonical name for a SKU."""
# Replace with a real DB / HTTP call.
return f"SKU {sku}: Wireless Charger"
async def main() -> None:
agent = Agent(
client=OpenAIChatClient(),
instructions="Answer using the lookup_sku tool when needed.",
tools=[lookup_sku],
middleware=[IdempotentCache(), RetryOnError()],
)
print((await agent.run("What is SKU WCH-42?")).text)
print((await agent.run("And what is SKU WCH-42 again?")).text) # cache hit
if __name__ == "__main__":
asyncio.run(main())

Outer-to-inner: IdempotentCache wraps RetryOnError wraps the tool. The cache short-circuits via MiddlewareTermination so the retry layer doesn’t even fire on a hit.

Recipe 15 — Agent middleware: budgets and policy gates

Section titled “Recipe 15 — Agent middleware: budgets and policy gates”

AgentMiddleware wraps the entire agent.run(...) call. Use it for global concerns: budgets, profanity filters, system-prompt injection, run-level retries.

agent_middleware.py
import time
from agent_framework import (
Agent,
AgentContext,
AgentMiddleware,
AgentResponse,
Content,
Message,
MiddlewareTermination,
)
from agent_framework.openai import OpenAIChatClient
class BudgetGuard(AgentMiddleware):
"""Reject runs once a tenant has burned through its quota."""
def __init__(self, quotas: dict[str, int]) -> None:
self.quotas = dict(quotas)
async def process(self, context: AgentContext, call_next) -> None:
tenant = context.kwargs.get("tenant_id", "default")
if self.quotas.get(tenant, 0) <= 0:
raise MiddlewareTermination(
f"tenant={tenant} over quota",
result=AgentResponse(
messages=[Message(role="assistant",
contents=[Content.from_text("Quota exhausted.")])],
),
)
self.quotas[tenant] -= 1
await call_next()
class LatencyTimer(AgentMiddleware):
"""Stamp wall-clock latency onto context.metadata so downstream code can read it."""
async def process(self, context: AgentContext, call_next) -> None:
t0 = time.monotonic()
await call_next()
context.metadata["wall_ms"] = (time.monotonic() - t0) * 1000
agent = Agent(
client=OpenAIChatClient(),
instructions="",
middleware=[
LatencyTimer(),
BudgetGuard({"acme": 100, "globex": 50}),
],
)

MiddlewareTermination(result=...) lets you return a fully-formed synthetic response without invoking the model. See the middleware guide for streaming hooks, ChatMiddleware, and observability spans.

agent.as_mcp_server() returns a low-level mcp.server.lowlevel.Server that hosts the agent as a single tool. Drop it into stdio for inter-process composition or hook it up to the streamable-HTTP host of your choice.

mcp_server.py
import anyio
from mcp.server.stdio import stdio_server
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient
async def main() -> None:
agent = Agent(
client=OpenAIChatClient(),
name="docs_agent",
description="Answers questions about our internal documentation.",
instructions="Only answer using the indexed docs.",
)
server = agent.as_mcp_server(
server_name="docs-mcp",
version="1.0.0",
instructions="Use docs_agent for any question about internal docs.",
)
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, server.create_initialization_options())
anyio.run(main)

Other agents (or Claude Desktop, an LLM IDE, …) can now consume your agent via MCPStdioTool(name="docs", command="python", args=["mcp_server.py"]). Image and audio outputs are dropped with a warning — only text content is forwarded over the MCP server path.


Recipe 17 — FastAPI service (modern lifespan)

Section titled “Recipe 17 — FastAPI service (modern lifespan)”

Wire an agent into a FastAPI app using the modern lifespan context manager (@app.on_event("startup") is deprecated in FastAPI 0.93+).

api.py
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from agent_framework import Agent, AgentSession, FileHistoryProvider
from agent_framework.openai import OpenAIChatClient
class State:
agent: Agent
history: FileHistoryProvider
state = State()
@asynccontextmanager
async def lifespan(app: FastAPI):
state.history = FileHistoryProvider(storage_path="./conversations")
state.agent = Agent(
client=OpenAIChatClient(),
instructions="You are a helpful API assistant.",
context_providers=[state.history],
)
yield
# Add explicit cleanup here if you need it (close clients, flush metrics).
app = FastAPI(lifespan=lifespan)
class ChatRequest(BaseModel):
message: str
session_id: str
@app.post("/chat")
async def chat(payload: ChatRequest) -> dict[str, str]:
if not payload.message.strip():
raise HTTPException(status_code=400, detail="empty message")
session = state.agent.create_session(session_id=payload.session_id)
response = await state.agent.run(payload.message, session=session)
return {"reply": response.text}
# Run with: uvicorn api:app --reload

Authorise the session_id against the authenticated user before passing it to create_session()FileHistoryProvider only protects you from filesystem traversal, not authorisation.

Recipe 18 — Azure Functions queue trigger

Section titled “Recipe 18 — Azure Functions queue trigger”

Reuse the agent across invocations (warm-start) so Functions container reuse pays off.

function_app.py
import logging
import azure.functions as func
from agent_framework import Agent
from agent_framework.foundry import FoundryChatClient
app = func.FunctionApp()
# Module-level construction → warm-start reuse across invocations.
_client = FoundryChatClient()
_agent = Agent(
client=_client,
instructions="You process queued tasks. Reply with the action plan only.",
)
@app.queue_trigger(arg_name="msg", queue_name="agent-tasks", connection="AzureWebJobsStorage")
async def process(msg: func.QueueMessage) -> None:
body = msg.get_body().decode("utf-8")
logging.info("received task: %s", body)
response = await _agent.run(body)
logging.info("plan: %s", response.text)

For long-running jobs that should survive a redeploy, persist intermediate state via FileCheckpointStorage (Recipe 13) on a mounted Azure Files share, or move to a queue-driven Workflow with Cosmos checkpointing.

Recipe 19 — Server-Sent Events streaming endpoint

Section titled “Recipe 19 — Server-Sent Events streaming endpoint”

Stream the agent’s tokens to a browser via SSE. Reuse the ResponseStream directly — no extra buffering needed.

sse_api.py
import json
from contextlib import asynccontextmanager
from typing import AsyncIterator
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient
class State:
agent: Agent
state = State()
@asynccontextmanager
async def lifespan(app: FastAPI):
state.agent = Agent(client=OpenAIChatClient(), instructions="Be concise.")
yield
app = FastAPI(lifespan=lifespan)
class ChatRequest(BaseModel):
message: str
async def sse(stream) -> AsyncIterator[bytes]:
async for update in stream:
if update.text:
payload = json.dumps({"text": update.text}).encode()
yield b"data: " + payload + b"\n\n"
final = await stream.get_final_response()
yield b"data: " + json.dumps({"done": True, "finish_reasons": list(final.finish_reasons or [])}).encode() + b"\n\n"
@app.post("/chat/stream")
async def chat_stream(req: ChatRequest) -> StreamingResponse:
stream = state.agent.run(req.message, stream=True)
return StreamingResponse(sse(stream), media_type="text/event-stream")

stream.get_final_response() after iteration completes is the canonical place to access the joined response, finish reasons, and any HITL approval requests. For approval mid-stream, drop into await stream.send_response(...) before the loop returns.


Recipe 20 — Debug streaming with tool-call visibility

Section titled “Recipe 20 — Debug streaming with tool-call visibility”

Inspect tool calls inline as the model emits them — useful when an agent picks the wrong tool or emits empty arguments.

debug_stream.py
import asyncio
from agent_framework import Agent, tool
from agent_framework.openai import OpenAIChatClient
@tool
def get_time() -> str:
"""Return the current UTC time."""
from datetime import datetime, timezone
return datetime.now(timezone.utc).isoformat()
async def main() -> None:
agent = Agent(
client=OpenAIChatClient(),
instructions="Use get_time when asked about time.",
tools=[get_time],
)
stream = agent.run("What's the current UTC time? Then tell me a joke.", stream=True)
async for update in stream:
# Text chunks
if update.text:
print(f"\033[92m{update.text}\033[0m", end="", flush=True)
# Tool calls and results live inside `update.contents` as Content items.
for content in update.contents or []:
ctype = getattr(content, "type", None)
if ctype == "function_call":
print(f"\n\033[90m[call {content.name}({content.arguments})]\033[0m")
elif ctype == "function_result":
print(f"\n\033[90m[result {content.call_id}: {content.result}]\033[0m")
print()
final = await stream.get_final_response()
print(f"\nfinish_reasons={final.finish_reasons}")
if __name__ == "__main__":
asyncio.run(main())

Each AgentResponseUpdate carries contents — a list of unified Content items. Filter by type to pick out function calls, function results, text, images, and HITL requests. There’s no separate tool_calls attribute on the update.

Recipe 21 — Replay a session blob across processes

Section titled “Recipe 21 — Replay a session blob across processes”

Stateless workers (Lambda, Cloud Run) can round-trip the entire session through to_dict() / from_dict(). The blob carries every message plus any custom state you stored.

session_replay.py
import asyncio
import json
from agent_framework import Agent, AgentSession, InMemoryHistoryProvider
from agent_framework.openai import OpenAIChatClient
async def turn(blob: str | None, message: str) -> tuple[str, str]:
agent = Agent(
client=OpenAIChatClient(),
instructions="You are a customer-service agent.",
context_providers=[InMemoryHistoryProvider()],
)
session = AgentSession.from_dict(json.loads(blob)) if blob else AgentSession()
response = await agent.run(message, session=session)
return response.text, json.dumps(session.to_dict())
async def main() -> None:
reply, blob = await turn(None, "My order #4421 is missing.")
print(reply)
# A different worker process picks up the same conversation:
reply, blob = await turn(blob, "What did I just ask about?")
print(reply)
if __name__ == "__main__":
asyncio.run(main())

For custom types in session.state, register them once at module import via register_state_type(...) so cold-starts work even before the model is serialised. See the sessions guide for the full contract.

Recipe 22 — Agent todo list with TodoProvider (Experimental)

Section titled “Recipe 22 — Agent todo list with TodoProvider (Experimental)”

Give the agent a persistent task list. The provider exposes add_todos, complete_todos, get_remaining_todos, and remove_todos tools and injects usage instructions into the system prompt automatically.

todo_agent.py
import asyncio
from agent_framework import Agent, TodoFileStore, TodoProvider
from agent_framework.openai import OpenAIChatClient
async def main() -> None:
# TodoFileStore persists todos under ./todos/<session_id>/todos.json
store = TodoFileStore(base_path="./todos")
agent = Agent(
client=OpenAIChatClient(),
instructions=(
"You are a project-management assistant. "
"Break every task into trackable todo items."
),
context_providers=[TodoProvider(store=store)],
)
# Reuse the same session_id to accumulate todos across runs
session = agent.create_session(session_id="sprint-42")
r = await agent.run(
"Plan the three phases of our Q3 product launch: "
"marketing, engineering, and customer-support prep.",
session=session,
)
print(r.text)
# Second turn — mark phase 1 complete after the team signs off
r2 = await agent.run(
"Engineering phase is done. Mark it complete.",
session=session,
)
print(r2.text)
# Inspect remaining todos from application code
items, _ = await store.load_state(session, source_id="todo")
pending = [i for i in items if not i.is_complete]
print(f"\n{len(pending)} task(s) remaining:")
for item in pending:
print(f" - {item.title}")
if __name__ == "__main__":
asyncio.run(main())

Use TodoProvider() (no store=) for in-session todos that vanish at process end. Swap in TodoFileStore (or a custom TodoStore subclass backed by Cosmos DB / Redis) for durable, cross-session tracking.


Recipe 23 — Plan / execute mode with AgentModeProvider (Experimental)

Section titled “Recipe 23 — Plan / execute mode with AgentModeProvider (Experimental)”

AgentModeProvider lets the agent switch between named operating modes. The default modes are plan (interactive, ask questions) and execute (autonomous, minimise interruptions). You can define any modes you need.

mode_agent.py
import asyncio
from agent_framework import Agent, AgentModeProvider, set_agent_mode
from agent_framework.openai import OpenAIChatClient
async def main() -> None:
agent = Agent(
client=OpenAIChatClient(),
instructions="You are a senior software engineer.",
context_providers=[
AgentModeProvider(
default_mode="plan",
mode_descriptions={
"plan": "Analyse the requirements and ask clarifying questions. Do not write code yet.",
"execute": "Implement the approved plan autonomously. Do not ask questions mid-task.",
"review": "Review the completed implementation for correctness and style.",
},
)
],
)
session = agent.create_session(session_id="feature-auth-v2")
# Phase 1: planning — agent should ask questions, not write code
r1 = await agent.run(
"We need to add OAuth2 PKCE login to our FastAPI app.",
session=session,
)
print("=== PLAN PHASE ===")
print(r1.text)
# Switch to execute mode programmatically after the human approves the plan
set_agent_mode(
session,
"execute",
available_modes=["plan", "execute", "review"],
)
r2 = await agent.run(
"Plan looks good. Go ahead and implement it.",
session=session,
)
print("\n=== EXECUTE PHASE ===")
print(r2.text)
# Switch to review mode
set_agent_mode(session, "review", available_modes=["plan", "execute", "review"])
r3 = await agent.run(
"Review what you just built.",
session=session,
)
print("\n=== REVIEW PHASE ===")
print(r3.text)
if __name__ == "__main__":
asyncio.run(main())

The agent can also switch modes itself by calling set_mode — useful when the agent decides it has enough information to start executing without waiting for the user. Combine with TodoProvider (Recipe 22) so the agent tracks its own tasks across the mode transitions.

Recipe 24 — Prompt injection defense with SecureAgentConfig (Experimental)

Section titled “Recipe 24 — Prompt injection defense with SecureAgentConfig (Experimental)”

SecureAgentConfig is a ContextProvider that defends against prompt injection using information-flow control. It labels tool results as TRUSTED or UNTRUSTED, blocks untrusted content from reaching privileged tools, and optionally logs policy violations. Import it from agent_framework.security — a separate sub-module from the main namespace.

secure_agent.py
import asyncio
from agent_framework import Agent, tool
from agent_framework.openai import OpenAIChatClient
from agent_framework.security import SecureAgentConfig, IntegrityLabel, ConfidentialityLabel
@tool
async def fetch_news(query: str) -> str:
"""Fetch news headlines — untrusted external content."""
return f"[external] Top story about {query}: ..."
@tool
async def send_email(to: str, body: str) -> str:
"""Send an email — privileged, must not be reachable from untrusted data."""
return f"sent to {to}"
@tool
async def log_search(query: str) -> str:
"""Log the search query for auditing — allowed even in untrusted context."""
return f"logged: {query}"
# allow_untrusted_tools lets log_search run even when context is tainted.
# approval_on_violation=True asks a human instead of hard-blocking on policy
# violations (e.g. untrusted data attempting to call send_email).
security = SecureAgentConfig(
auto_hide_untrusted=True, # hide UNTRUSTED results from the model
default_integrity=IntegrityLabel.UNTRUSTED, # all tool results default to untrusted
default_confidentiality=ConfidentialityLabel.PUBLIC,
allow_untrusted_tools={"log_search"}, # these tools may run in untrusted context
block_on_violation=False,
approval_on_violation=True, # route violation to human approval
enable_audit_log=True,
enable_policy_enforcement=True,
)
# SecureAgentConfig is a ContextProvider — pass via context_providers=
agent = Agent(
client=OpenAIChatClient(),
instructions=(
"You are a research assistant. "
"Do not send emails based on external news content."
),
tools=[fetch_news, send_email, log_search],
context_providers=[security],
)
async def main() -> None:
# A prompt injection attempt — external news content tries to trigger send_email.
# The policy enforcer will either block or route the send_email call for approval.
response = await agent.run(
"Find news about AI and send a summary to boss@example.com."
)
print(response.text)
if __name__ == "__main__":
asyncio.run(main())

Key points:

  • SecureAgentConfig automatically injects LabelTrackingFunctionMiddleware and (when enable_policy_enforcement=True) PolicyEnforcementFunctionMiddleware as function middleware.
  • It also registers two built-in security tools: quarantined_llm (runs a sub-prompt through an isolated model without privileged tool access) and inspect_variable (lets the agent examine labelled variables before acting).
  • This feature is ExperimentalFeature.FIDES — the API is functional but may change between minor releases.

See the comprehensive guide’s SecureAgentConfig section for the full constructor reference table.

Recipe 25 — FunctionExecutor data pipeline

Section titled “Recipe 25 — FunctionExecutor data pipeline”

Chain multiple @executor-decorated functions into a multi-step processing workflow using WorkflowBuilder. Sync functions run automatically in a thread pool via asyncio.to_thread.

executor_pipeline.py
import asyncio
from agent_framework import FunctionExecutor, WorkflowBuilder, WorkflowViz, executor, WorkflowContext
# Step 1 — normalise whitespace and capitalise
@executor(id="normalise")
async def normalise(text: str) -> None:
return " ".join(text.split()).capitalize()
# Step 2 — split into sentences and emit each one downstream
@executor(id="splitter")
async def splitter(text: str, ctx: WorkflowContext[str]) -> None:
for sentence in text.split("."):
stripped = sentence.strip()
if stripped:
await ctx.send_message(stripped)
# Step 3 — count words per sentence (sync function runs in asyncio.to_thread automatically)
def word_count(sentence: str) -> str:
count = len(sentence.split())
return f"{count} word(s): {sentence}"
counter = FunctionExecutor(word_count, id="word_counter")
# Wire the three steps into a linear pipeline
workflow = (
WorkflowBuilder(start_executor=normalise, name="text-pipeline")
.add_edge(normalise, splitter)
.add_edge(splitter, counter)
.build()
)
async def main() -> None:
# Visualise the graph before running
viz = WorkflowViz(workflow)
print("--- Mermaid diagram ---")
print(viz.to_mermaid())
print()
result = await workflow.run(
"hello world. this is a test. agent framework is great."
)
print("--- Outputs ---")
for output in result.get_outputs():
print(output)
if __name__ == "__main__":
asyncio.run(main())

Expected output:

--- Mermaid diagram ---
graph LR
normalise --> splitter
splitter --> word_counter
--- Outputs ---
3 word(s): Hello world
5 word(s): This is a test
4 word(s): Agent framework is great

Tip: Use WorkflowViz(workflow).export(format="svg", filename="pipeline.svg") (requires pip install graphviz and the dot binary) to produce a shareable diagram for documentation.


Recipe 26 — CompactionProvider + ToolResultCompactionStrategy + TokenBudgetComposedStrategy

Section titled “Recipe 26 — CompactionProvider + ToolResultCompactionStrategy + TokenBudgetComposedStrategy”

Long-running agents accumulate tool chatter fast. This recipe uses the CompactionProvider context provider to apply a two-phase compaction pipeline:

  • before: trim what the model sees each turn using a token-budget strategy that collapses old tool-call groups first, then shrinks the sliding window.
  • after: compact the stored history so the next session starts leaner.
compaction_provider.py
import asyncio
from agent_framework import (
Agent,
CharacterEstimatorTokenizer,
CompactionProvider,
InMemoryHistoryProvider,
SlidingWindowStrategy,
ToolResultCompactionStrategy,
TokenBudgetComposedStrategy,
)
from agent_framework.openai import OpenAIChatClient
async def main() -> None:
client = OpenAIChatClient()
tokenizer = CharacterEstimatorTokenizer()
history = InMemoryHistoryProvider()
# before_strategy: try collapsing old tool calls first; fall back to the
# sliding window only if the token budget is still exceeded.
before_strategy = TokenBudgetComposedStrategy(
token_budget=6_000,
tokenizer=tokenizer,
strategies=[
ToolResultCompactionStrategy(keep_last_tool_call_groups=3),
SlidingWindowStrategy(keep_last_groups=20),
],
early_stop=True,
)
# after_strategy: after each reply, collapse all but 1 tool-call group so
# the persisted history stays compact for future sessions.
after_strategy = ToolResultCompactionStrategy(keep_last_tool_call_groups=1)
compaction = CompactionProvider(
before_strategy=before_strategy,
after_strategy=after_strategy,
tokenizer=tokenizer,
history_source_id=history.source_id, # "in_memory" by default
)
agent = Agent(
client=client,
instructions="You are a research agent that calls web search many times.",
context_providers=[history, compaction], # history must come before compaction
)
session = agent.create_session()
for i in range(20):
response = await agent.run(
f"Turn {i}: search for Python async patterns and summarise.",
session=session,
)
print(f"[{i}] {response.text[:80]}...")
if __name__ == "__main__":
asyncio.run(main())

Key points:

  • CompactionProvider must be listed after the history provider so it can access the stored messages in session.state.
  • history_source_id defaults to "in_memory" — change it if your InMemoryHistoryProvider has a custom source_id.
  • TokenBudgetComposedStrategy runs its sub-strategies in order, stopping as soon as token_budget is satisfied. Place cheap strategies (no extra LLM calls) first.
  • ToolResultCompactionStrategy preserves readability by replacing dropped tool-call groups with a one-line summary like [Tool results: web_search: 3 results].

Recipe 27 — Long-term memory with MemoryContextProvider (Experimental)

Section titled “Recipe 27 — Long-term memory with MemoryContextProvider (Experimental)”

MemoryContextProvider gives an agent cross-session memory. After every turn it automatically extracts facts from the conversation into per-topic Markdown files, maintains a MEMORY.md index, and archives transcripts for semantic search. On subsequent turns the index is injected into context so the agent recalls earlier sessions without exceeding the context window.

memory_agent.py
import asyncio
from agent_framework import Agent, MemoryContextProvider, MemoryFileStore
from agent_framework.openai import OpenAIChatClient
async def main() -> None:
client = OpenAIChatClient()
# MemoryFileStore creates:
# ./memory/<source_id>/<owner_id>/memory/
# MEMORY.md ← topic index injected every turn
# topics/*.md ← one file per extracted topic
# transcripts/*.jsonl ← full turn history
store = MemoryFileStore(
base_path="./memory",
owner_state_key="user_id", # resolved from session.state["user_id"]
)
provider = MemoryContextProvider(
store=store,
recent_turns=3, # also inject 3 most-recent transcript turns each call
max_extractions=5, # extract up to 5 facts per turn into topic files
)
agent = Agent(
client=client,
instructions=(
"You are a personal assistant with long-term memory. "
"Proactively recall relevant facts about the user from MEMORY.md."
),
context_providers=[provider],
)
# --- Session 1 ---
session1 = agent.create_session(session_id="alice-jan")
session1.state["user_id"] = "alice" # routes memory to ./memory/.../alice/
await agent.run("I'm Alice. I prefer Python for backends and React for frontends.", session=session1)
await agent.run("My current project is a FastAPI service with a React dashboard.", session=session1)
print("Session 1 complete — facts extracted to disk.")
# --- Session 2 (simulates a new day / process restart) ---
session2 = agent.create_session(session_id="alice-feb")
session2.state["user_id"] = "alice" # same user_id → same memory root
r = await agent.run(
"What stack should I use for a new backend microservice?",
session=session2,
)
# Agent recalls Alice's Python preference stored in the previous session.
print(r.text)
# Inspect extracted topics from application code (list_topics is synchronous)
topics = store.list_topics(session2, source_id="memory")
print(f"\nExtracted {len(topics)} memory topic(s):")
for t in topics:
print(f" • {t.topic}: {t.summary}")
if __name__ == "__main__":
asyncio.run(main())

Key parameters:

ParameterDefaultPurpose
storerequiredMemoryFileStore or custom MemoryStore subclass
recent_turns0Inject N most-recent transcript turns alongside MEMORY.md
max_extractions5Maximum facts extracted from a single turn
selection_limit10Maximum topic files auto-loaded per turn
consolidation_interval7 daysHow often topic files are consolidated
consolidation_clientNone (uses agent client)Cheaper model for the consolidation pass

MemoryFileStore constructor reference:

MemoryFileStore(
base_path="./memory", # root directory
owner_state_key="user_id", # session.state key that holds the user/owner ID
kind="memory", # bucket name (subdirectory)
owner_prefix="", # optional prefix for owner directory names
)

The store enforces path isolation — owner IDs with .. or absolute path characters raise ValueError. Use store.list_topics(session, source_id=...), store.get_topic(...), and store.write_topic(...) from application code to read and seed memory without running the agent.


NeedClass / function
Build an agentAgent(client=, instructions=, tools=, context_providers=, middleware=)
One-shot vs threadedagent.run(prompt) vs agent.run(prompt, session=session)
Stream tokensagent.run(prompt, stream=True) returns ResponseStream
Tool from a function@tool from agent_framework
Sub-agent as toolagent.as_tool(propagate_session=True)
Expose agent over MCPagent.as_mcp_server(server_name=..., version=...)
Persistent sessionsFileHistoryProvider(storage_path=...)
Encrypted sessionsFileHistoryProvider(dumps=..., loads=...)
MCP local subprocessMCPStdioTool(name=, command=, args=)
MCP remote HTTPMCPStreamableHTTPTool(name=, url=, header_provider=)
Per-tool approvalapproval_mode="always_require" or per-tool dict ({"always_require_approval": [...], "never_require_approval": [...]})
Tool retriesFunctionMiddleware subclass, await call_next() in a loop
Run-level guardsAgentMiddleware subclass, raise MiddlewareTermination(result=...)
Workflow checkpointsFileCheckpointStorage(storage_path=...) + WorkflowBuilder(checkpoint_storage=)
SkillsSkillsProvider(my_skill) or SkillsProvider([skill_a, skill_b]) — takes a positional source (single Skill, list of Skill, or a SkillsSource)
Eval gatesLocalEvaluator(*checks) + evaluate_agent(...)
Agent todo list (exp.)TodoProvider(store=TodoFileStore(...))
Cross-session memory (exp.)MemoryContextProvider(store=MemoryFileStore(..., owner_state_key=...))
Compact context via providerCompactionProvider(before_strategy=..., after_strategy=..., history_source_id=...)
Drop old tool-call groupsSelectiveToolCallCompactionStrategy(keep_last_tool_call_groups=N)
Summarise old tool resultsToolResultCompactionStrategy(keep_last_tool_call_groups=N)
Token-budget compositionTokenBudgetComposedStrategy(token_budget=N, tokenizer=..., strategies=[...])
Plan / execute modes (exp.)AgentModeProvider(mode_descriptions={...})
Prompt injection defense (exp.)SecureAgentConfig(...) from agent_framework.security
Function-based workflow nodes@executor / FunctionExecutor(func, id=)
Visualise a workflowWorkflowViz(workflow).to_mermaid() / .to_digraph() / .export(format="svg")
In-memory skill sourceInMemorySkillsSource([skill_a, skill_b])
Custom composable skill sourceSubclass DelegatingSkillsSource(inner_source)
WebSocket MCP serverMCPWebsocketTool(name=, url="wss://...", request_timeout=)
Intercept agent.run()Subclass AgentMiddleware, override async process(context, call_next)
Intercept LLM callsSubclass ChatMiddleware, override async process(context, call_next)
Intercept tool callsSubclass FunctionMiddleware, override async process(context, call_next)
Early-exit from middlewareraise MiddlewareTermination(result=AgentResponse(...))

For deep dives see the framework’s other Python guides: tools, sessions, middleware, MCP, skills, evaluation, checkpointing, and orchestration.