Microsoft Agent Framework (Python) — Class Deep Dives
Microsoft Agent Framework Python — Class Deep Dives
Section titled “Microsoft Agent Framework Python — Class Deep Dives”Verified against agent-framework-core 1.6.0 (released May 2026). Each section is derived from the installed source in agent_framework/ — constructor signatures, docstrings, and live behaviour.
Ten classes are covered here. They were picked to span the full breadth of the SDK: from the entry-point Agent down through tools, sessions, middleware, graph workflows, functional workflows, skills, and MCP integration.
1. Agent — the recommended entry point
Section titled “1. Agent — the recommended entry point”Source: agent_framework/_agents.py — Agent(AgentMiddlewareLayer, AgentTelemetryLayer, RawAgent)
Agent is the recommended class for almost every use-case. It layers OpenTelemetry telemetry and agent middleware on top of RawAgent. Prefer RawAgent only for latency-critical hot paths.
Constructor
Section titled “Constructor”Agent( client: SupportsChatGetResponse[OptionsCoT], instructions: str | None = None, *, id: str | None = None, name: str | None = None, description: str | None = None, tools: ToolTypes | Callable | Sequence[ToolTypes | Callable] | None = None, default_options: OptionsCoT | None = None, context_providers: Sequence[ContextProvider] | None = None, middleware: Sequence[MiddlewareTypes] | None = None, require_per_service_call_history_persistence: bool = False, compaction_strategy: CompactionStrategy | None = None, tokenizer: TokenizerProtocol | None = None, additional_properties: MutableMapping[str, Any] | None = None,)All keyword args are optional except client.
Minimal agent
Section titled “Minimal agent”import asynciofrom agent_framework import Agentfrom agent_framework.openai import OpenAIChatClient
async def main() -> None: agent = Agent( client=OpenAIChatClient(), instructions="You are a concise assistant.", ) response = await agent.run("Explain asyncio in one sentence.") print(response.text)
asyncio.run(main())Typed options for IDE autocomplete
Section titled “Typed options for IDE autocomplete”OptionsCoT is a TypedDict that flows from the chat client. Use it to get IDE completions for provider-specific options like reasoning_effort.
from agent_framework import Agentfrom agent_framework.openai import OpenAIChatClient, OpenAIChatOptions
client = OpenAIChatClient(model="gpt-4o")
agent: Agent[OpenAIChatOptions] = Agent( client=client, name="reasoning-agent", instructions="Solve step by step.", default_options={ "temperature": 0.2, "max_tokens": 2048, "reasoning_effort": "high", # OpenAI-specific; IDE will autocomplete },)
# Override options at call time — merged with default_optionsresponse = await agent.run( "What is 137 × 89?", options={"temperature": 0.0},)print(response.text)Streaming
Section titled “Streaming”from agent_framework import Agentfrom agent_framework.openai import OpenAIChatClient
async def stream_example() -> None: agent = Agent(client=OpenAIChatClient(), instructions="You are a storyteller.")
stream = agent.run("Tell me a one-paragraph story.", stream=True)
async for update in stream: print(update.text, end="", flush=True)
final = await stream.get_final_response() print(f"\n\nFinished. Total chars: {len(final.text)}")as_tool() — delegate an agent as a sub-agent tool
Section titled “as_tool() — delegate an agent as a sub-agent tool”from agent_framework import Agentfrom agent_framework.openai import OpenAIChatClient
client = OpenAIChatClient()
research_agent = Agent( client=client, name="research", description="Searches and summarises information on any topic.", instructions="You are a research specialist. Be thorough and cite sources.",)
coordinator = Agent( client=client, name="coordinator", instructions="You are a coordinator. Delegate research to the research tool.", tools=[ research_agent.as_tool( arg_name="topic", arg_description="The topic to research in depth.", propagate_session=False, # default: independent session per call ) ],)
result = await coordinator.run("Research the history of the Python language.")print(result.text)as_tool() parameters:
| Parameter | Default | Description |
|---|---|---|
name | agent’s name | Tool name exposed to the calling model |
description | agent’s description | Tool description |
arg_name | "task" | Name of the single string argument |
arg_description | "Task for {name}" | Argument description |
approval_mode | "never_require" | Set to "always_require" to gate execution |
stream_callback | None | Async callback receiving AgentResponseUpdate |
propagate_session | False | Share parent session with sub-agent |
as_mcp_server() — expose the agent as an MCP server
Section titled “as_mcp_server() — expose the agent as an MCP server”from agent_framework import Agentfrom agent_framework.openai import OpenAIChatClientimport asyncio
agent = Agent( client=OpenAIChatClient(), name="data-analyst", description="Analyses tabular data and produces summaries.", instructions="You are a data analyst.",)
# Runs a stdio-based MCP server — other agents and tools can connect to itif __name__ == "__main__": asyncio.run(agent.as_mcp_server())Compaction — staying inside the context window
Section titled “Compaction — staying inside the context window”Pass any CompactionStrategy implementation to automatically compress long conversations. Use the built-in strategies rather than rolling your own — they handle group-level atomicity so a tool call and its result are never split:
import asynciofrom agent_framework import ( Agent, CharacterEstimatorTokenizer, SlidingWindowStrategy, SummarizationStrategy, TokenBudgetComposedStrategy,)from agent_framework.openai import OpenAIChatClient
async def main() -> None: client = OpenAIChatClient()
# Option 1: Sliding window — deterministic, no extra LLM calls. # Keeps the last 20 non-system groups; system messages are preserved by default. agent_window = Agent( client=client, instructions="You are a long-running assistant.", compaction_strategy=SlidingWindowStrategy(keep_last_groups=20), )
# Option 2: Summarise older turns into a single message. # target_count=8 means "try to keep 8 groups; trigger when count reaches 10". agent_summary = Agent( client=client, instructions="You are a long-running assistant.", compaction_strategy=SummarizationStrategy( client=client, # uses the same client to produce the summary target_count=8, threshold=2, ), )
# Option 3: Token-budget composition — run strategies in order until # the total token count falls under 8,000. tokenizer = CharacterEstimatorTokenizer() # 4 chars ≈ 1 token; no dependencies agent_budget = Agent( client=client, instructions="You are a long-running assistant.", compaction_strategy=TokenBudgetComposedStrategy( token_budget=8_000, tokenizer=tokenizer, strategies=[ SlidingWindowStrategy(keep_last_groups=30), SummarizationStrategy(client=client, target_count=10), ], early_stop=True, # stop once budget is satisfied; default True ), tokenizer=tokenizer, )
asyncio.run(main())2. RawAgent — no middleware, no telemetry
Section titled “2. RawAgent — no middleware, no telemetry”Source: agent_framework/_agents.py — RawAgent(BaseAgent, Generic[OptionsCoT])
RawAgent is identical to Agent but skips the middleware and OpenTelemetry layers. Use it when you need the absolute lowest overhead — e.g. a tight inner loop, a micro-service where you add your own instrumentation, or testing in isolation.
from agent_framework import RawAgentfrom agent_framework.openai import OpenAIChatClient
async def raw_example() -> None: agent = RawAgent( client=OpenAIChatClient(), instructions="Answer only with a number.", ) r = await agent.run("What is 7 * 6?") print(r.text) # "42"RawAgent exposes the identical run() signature as Agent — streaming, sessions, per-call tools, options — so you can swap them without changing call sites.
When to choose RawAgent vs Agent
Section titled “When to choose RawAgent vs Agent”Agent | RawAgent | |
|---|---|---|
| OpenTelemetry spans | ✅ automatic | ❌ none |
Agent middleware (@agent_middleware) | ✅ | ❌ |
| Chat & function middleware | ✅ | ✅ (via middleware= param on run()) |
| Latency | Small overhead | Minimal |
| Recommended for | Most production code | Hot paths, testing |
3. FunctionTool + @tool — wrapping callables for models
Section titled “3. FunctionTool + @tool — wrapping callables for models”Source: agent_framework/_tools.py
FunctionTool is the object a model sees. @tool is the most ergonomic way to create one; FunctionTool(...) constructor is used for advanced cases (no implementation, explicit schema, runtime construction).
@tool — the fast path
Section titled “@tool — the fast path”from typing import Annotatedfrom agent_framework import tool, Agentfrom agent_framework.openai import OpenAIChatClient
@tooldef get_weather( city: Annotated[str, "City name, e.g. 'Amsterdam'"], unit: Annotated[str, "Temperature unit: 'celsius' or 'fahrenheit'"] = "celsius",) -> str: """Return current weather conditions for a city.""" return f"{city}: 22°{unit[0].upper()}, partly cloudy"
agent = Agent( client=OpenAIChatClient(), instructions="You are a weather assistant.", tools=[get_weather],)
result = await agent.run("What is the weather in Paris?")The decorator reads the function name, docstring, and Annotated hints automatically. Both sync and async functions work identically.
Full @tool options
Section titled “Full @tool options”from pydantic import BaseModel, Fieldfrom typing import Annotatedfrom agent_framework import tool
class CityInput(BaseModel): city: Annotated[str, Field(description="City name")] unit: str = "celsius"
@tool( name="weather_lookup", # override function name description="Fetch live weather.", # override docstring schema=CityInput, # explicit Pydantic schema approval_mode="always_require", # pause before executing max_invocations=10, # lifetime cap on this tool instance max_invocation_exceptions=3, # stop after 3 errors)def get_weather(city: str, unit: str = "celsius") -> str: return f"{city}: 22°{unit[0].upper()}"FunctionTool constructor — advanced use
Section titled “FunctionTool constructor — advanced use”Use the constructor directly when you need:
- A declaration-only tool (no Python implementation — the model plans around it but you handle execution client-side).
- A raw JSON schema instead of a Pydantic model.
- A custom result parser.
from pydantic import BaseModel, Fieldfrom typing import Annotatedfrom agent_framework import FunctionTool
# 1. Declaration-only — no func argumentsearch_tool = FunctionTool( name="web_search", description="Search the web for current information.", func=None, # makes declaration_only=True input_model={ # raw JSON schema "type": "object", "properties": { "query": {"type": "string", "description": "Search query"}, "num_results": {"type": "integer", "description": "Number of results", "default": 5}, }, "required": ["query"], },)
# 2. Custom result parser — control exactly what the model seesimport json
def _parse_db_result(raw: list[dict]) -> str: """Format raw DB rows as a markdown table.""" if not raw: return "No results found." headers = list(raw[0].keys()) rows = [[str(r[h]) for h in headers] for r in raw] header_row = " | ".join(headers) sep = " | ".join(["---"] * len(headers)) data_rows = "\n".join(" | ".join(row) for row in rows) return f"| {header_row} |\n| {sep} |\n" + "\n".join(f"| {r} |" for r in data_rows.split("\n"))
async def query_database(sql: str) -> list[dict]: ... # execute SQL, return rows
db_tool = FunctionTool( name="query_db", description="Execute a read-only SQL query.", func=query_database, result_parser=_parse_db_result, max_invocations=20,)FunctionInvocationContext — injecting runtime context into tools
Section titled “FunctionInvocationContext — injecting runtime context into tools”If a tool declares a FunctionInvocationContext parameter, the framework injects it automatically:
from agent_framework import tool, FunctionInvocationContextfrom typing import Annotated
@toolasync def authenticated_fetch( ctx: FunctionInvocationContext, # auto-injected — don't list in schema url: Annotated[str, "URL to fetch"],) -> str: """Fetch a URL using the caller's auth token.""" token = ctx.kwargs.get("auth_token", "") # Use token to make authenticated request return f"Fetched {url} with token {token[:8]}..."4. InMemoryHistoryProvider — zero-config conversation memory
Section titled “4. InMemoryHistoryProvider — zero-config conversation memory”Source: agent_framework/_sessions.py
InMemoryHistoryProvider stores messages inside the AgentSession.state dict — no external database required. It is the default provider the framework attaches automatically when no providers are configured and service-side history is not active.
Constructor
Section titled “Constructor”InMemoryHistoryProvider( source_id: str | None = None, # defaults to "in_memory" *, load_messages: bool = True, store_inputs: bool = True, store_context_messages: bool = False, store_context_from: set[str] | None = None, store_outputs: bool = True, skip_excluded: bool = False,)Multi-turn conversation with explicit provider
Section titled “Multi-turn conversation with explicit provider”import asynciofrom agent_framework import Agent, InMemoryHistoryProviderfrom agent_framework.openai import OpenAIChatClient
async def main() -> None: provider = InMemoryHistoryProvider(source_id="chat")
agent = Agent( client=OpenAIChatClient(), instructions="You are a helpful assistant. Remember what the user tells you.", context_providers=[provider], )
session = agent.create_session(session_id="user-42")
r1 = await agent.run("My name is Alice and I like jazz.", session=session) r2 = await agent.run("What do you know about me?", session=session) print(r2.text) # → "Your name is Alice and you like jazz."
# Inspect stored messages messages = session.state.get("messages", []) print(f"Stored {len(messages)} messages.")
asyncio.run(main())skip_excluded — respecting compaction markers
Section titled “skip_excluded — respecting compaction markers”When a CompactionStrategy marks old messages as excluded, set skip_excluded=True so the provider omits them when loading history:
from agent_framework import Agent, InMemoryHistoryProviderfrom agent_framework.openai import OpenAIChatClient
provider = InMemoryHistoryProvider( skip_excluded=True, # honours _excluded flags set by compaction)
agent = Agent( client=OpenAIChatClient(), instructions="Long-running assistant.", context_providers=[provider],)Persisting a session across processes
Section titled “Persisting a session across processes”AgentSession is JSON-serialisable. Dump it before a process ends, restore it on next startup:
import jsonfrom agent_framework import AgentSession
# Savesession = agent.create_session(session_id="user-42")# ... run some turns ...data = session.to_dict()with open("session.json", "w") as f: json.dump(data, f)
# Restorewith open("session.json") as f: data = json.load(f)restored = AgentSession.from_dict(data)# resume with the same agentresult = await agent.run("Continue where we left off.", session=restored)5. WorkflowBuilder — graph-based multi-agent orchestration
Section titled “5. WorkflowBuilder — graph-based multi-agent orchestration”Source: agent_framework/_workflows/_workflow_builder.py
WorkflowBuilder constructs a directed, typed workflow graph from Executor nodes. Call .build() to get an immutable Workflow you can run repeatedly.
Key edge methods
Section titled “Key edge methods”| Method | Topology | Use when |
|---|---|---|
add_edge(A, B) | A → B (one-to-one) | Sequential pipeline steps |
add_chain([A, B, C]) | A → B → C | Linear sequence shorthand |
add_fan_out_edges(A, [B, C]) | A → B, A → C (broadcast) | Parallel processing |
add_fan_in_edges([A, B], C) | A, B → C (aggregate) | Merge parallel results |
add_switch_case_edge_group(A, {B: cond, C: cond}) | A → B or C based on output | Conditional routing |
add_multi_selection_edge_group(A, [B, C], selector) | A → subset of [B, C] | Dynamic fan-out |
Linear pipeline with add_chain
Section titled “Linear pipeline with add_chain”from typing_extensions import Neverfrom agent_framework import ( Agent, Executor, WorkflowBuilder, WorkflowContext, handler,)from agent_framework.openai import OpenAIChatClient
class FetchExecutor(Executor): @handler async def process(self, topic: str, ctx: WorkflowContext[str]) -> None: # Pretend to fetch data await ctx.send_message(f"Raw data about '{topic}': [lots of text]")
class SummariseExecutor(Executor): def __init__(self): super().__init__(id="summarise") self._agent = Agent( client=OpenAIChatClient(), instructions="Summarise the provided text in one paragraph.", )
@handler async def process(self, raw: str, ctx: WorkflowContext[Never, str]) -> None: result = await self._agent.run(raw) await ctx.yield_output(result.text)
fetch = FetchExecutor(id="fetch")summarise = SummariseExecutor()
workflow = ( WorkflowBuilder(start_executor=fetch, output_from=[summarise]) .add_chain([fetch, summarise]) .build())
result = await workflow.run("the Python programming language")print(result.get_outputs()) # ['One-paragraph summary …']Fan-out + fan-in (parallel validation)
Section titled “Fan-out + fan-in (parallel validation)”from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handlerfrom typing_extensions import Never
class DataSource(Executor): @handler async def generate(self, seed: str, ctx: WorkflowContext[str]) -> None: for item in seed.split(","): await ctx.send_message(item.strip())
class ValidatorA(Executor): @handler async def validate(self, item: str, ctx: WorkflowContext) -> None: print(f"[A] validated: {item}")
class ValidatorB(Executor): @handler async def validate(self, item: str, ctx: WorkflowContext) -> None: print(f"[B] validated: {item}")
class Aggregator(Executor): @handler async def aggregate(self, _results: list, ctx: WorkflowContext[Never, str]) -> None: await ctx.yield_output("All validations passed.")
source = DataSource(id="source")val_a = ValidatorA(id="val_a")val_b = ValidatorB(id="val_b")agg = Aggregator(id="agg")
workflow = ( WorkflowBuilder(start_executor=source, output_from=[agg]) .add_fan_out_edges(source, [val_a, val_b]) .add_fan_in_edges([val_a, val_b], agg) .build())
result = await workflow.run("apple, banana, cherry")print(result.get_outputs())Conditional routing with add_switch_case_edge_group
Section titled “Conditional routing with add_switch_case_edge_group”from dataclasses import dataclassfrom agent_framework import Executor, WorkflowBuilder, WorkflowContext, handlerfrom typing_extensions import Never
@dataclassclass Scored: score: float text: str
class Scorer(Executor): @handler async def score(self, text: str, ctx: WorkflowContext[Scored]) -> None: score = len(text) / 100.0 # toy scoring await ctx.send_message(Scored(score=score, text=text))
class HighPathHandler(Executor): @handler async def handle(self, scored: Scored, ctx: WorkflowContext[Never, str]) -> None: await ctx.yield_output(f"HIGH: {scored.text}")
class LowPathHandler(Executor): @handler async def handle(self, scored: Scored, ctx: WorkflowContext[Never, str]) -> None: await ctx.yield_output(f"LOW: {scored.text}")
scorer = Scorer(id="scorer")high = HighPathHandler(id="high")low = LowPathHandler(id="low")
workflow = ( WorkflowBuilder(start_executor=scorer, output_from=[high, low]) .add_switch_case_edge_group( scorer, { high: lambda result: result.score >= 0.5, low: lambda result: result.score < 0.5, }, ) .build())
result = await workflow.run("A moderately long piece of text here")print(result.get_outputs())Wrapping agents directly in workflows
Section titled “Wrapping agents directly in workflows”WorkflowBuilder accepts Agent wherever it accepts Executor — it wraps it in an AgentExecutor automatically:
from agent_framework import Agent, WorkflowBuilderfrom agent_framework.openai import OpenAIChatClient
client = OpenAIChatClient()
extractor = Agent( client=client, name="extractor", instructions="Extract the key facts from the text.",)formatter = Agent( client=client, name="formatter", instructions="Format the facts as a bullet list.",)
workflow = ( WorkflowBuilder(start_executor=extractor, output_from=[formatter]) .add_edge(extractor, formatter) .build())
result = await workflow.run("The Eiffel Tower was built in 1889 and stands 330 m tall.")print(result.get_outputs())6. WorkflowContext — sending messages and yielding output from executors
Section titled “6. WorkflowContext — sending messages and yielding output from executors”Source: agent_framework/_workflows/_workflow_context.py — WorkflowContext[OutT, W_OutT]
WorkflowContext is the second parameter of every @handler method. The two generic parameters declare what types the executor sends to downstream executors (OutT) and what it yields as workflow output (W_OutT).
WorkflowContext # accepts anything, yields nothingWorkflowContext[str] # sends str to downstream, yields nothingWorkflowContext[str, int] # sends str, yields int as outputWorkflowContext[Never, str] # yields str only, sends nothingCore API
Section titled “Core API”| Method / property | Description |
|---|---|
await ctx.send_message(value, target_id=None) | Send a typed message to the next executor(s) |
await ctx.yield_output(value) | Emit a workflow output event |
await ctx.add_event(event) | Emit a custom WorkflowEvent |
await ctx.request_info(data, response_type, request_id=None) | HITL pause — suspends until a response is supplied |
ctx.get_state(key, default=None) | Read workflow-scoped key/value state |
ctx.set_state(key, value) | Write workflow-scoped key/value state |
ctx.request_id | ID of the incoming message (for tracing) |
ctx.source_executor_ids | List of upstream executor IDs that sent messages |
ctx.is_streaming | True if the workflow was started with stream=True |
Emitting custom events
Section titled “Emitting custom events”from agent_framework import Executor, WorkflowContext, WorkflowEvent, handlerfrom typing_extensions import Never
class ProgressExecutor(Executor): @handler async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: await ctx.add_event(WorkflowEvent(type="progress", data={"step": "start"}))
# ... do work ... processed = text.upper()
await ctx.add_event(WorkflowEvent(type="progress", data={"step": "done"})) await ctx.yield_output(processed)Using state across handlers
Section titled “Using state across handlers”from agent_framework import Executor, WorkflowContext, handler
class CountingExecutor(Executor): @handler async def count(self, item: str, ctx: WorkflowContext[str]) -> None: seen = ctx.get_state("seen_count", default=0) ctx.set_state("seen_count", seen + 1) await ctx.send_message(f"[{seen + 1}] {item}")7. FunctionalWorkflow / @workflow — Python-native pipelines
Section titled “7. FunctionalWorkflow / @workflow — Python-native pipelines”Source: agent_framework/_workflows/_functional.py
@workflow turns a plain async function into a FunctionalWorkflow that supports step caching, HITL, checkpointing, and streaming — without any graph wiring.
Experimental in 1.6.0. Import triggers an
ExperimentalWarning.
Basic workflow
Section titled “Basic workflow”import asynciofrom agent_framework import workflow, step, Agentfrom agent_framework.openai import OpenAIChatClient
@stepasync def fetch_context(topic: str) -> str: """Simulate fetching background context.""" return f"Background on {topic}: [fetched data]"
@stepasync def summarise(context: str) -> str: agent = Agent( client=OpenAIChatClient(), instructions="Summarise the following in one sentence.", ) result = await agent.run(context) return result.text
@workflowasync def research_pipeline(topic: str) -> str: context = await fetch_context(topic) return await summarise(context)
async def main() -> None: result = await research_pipeline.run("quantum computing") outputs = result.get_outputs() print(outputs[0]) # one-sentence summary
asyncio.run(main())Parallel steps with asyncio.gather
Section titled “Parallel steps with asyncio.gather”import asynciofrom agent_framework import workflow, step
@stepasync def fetch_news(topic: str) -> list[str]: return [f"News item 1 about {topic}", f"News item 2 about {topic}"]
@stepasync def fetch_wiki(topic: str) -> str: return f"Wikipedia summary of {topic}"
@workflowasync def parallel_research(topic: str) -> dict: news, wiki = await asyncio.gather( fetch_news(topic), fetch_wiki(topic), ) return {"news": news, "wiki": wiki}
result = await parallel_research.run("Mars exploration")print(result.get_outputs()[0]) # {"news": [...], "wiki": "..."}Streaming events
Section titled “Streaming events”from agent_framework import workflow, step
@stepasync def stage_one(text: str) -> str: return text.upper()
@stepasync def stage_two(text: str) -> str: return text[::-1]
@workflowasync def two_stage(text: str) -> str: a = await stage_one(text) return await stage_two(a)
stream = two_stage.run("hello", stream=True)async for event in stream: print(f"[{event.type}] {event.data}")
final = await stream.get_final_response()print(final.get_outputs()) # ['OLLEH']as_agent() — use a functional workflow as an agent
Section titled “as_agent() — use a functional workflow as an agent”from agent_framework import workflow, step, Agentfrom agent_framework.openai import OpenAIChatClient
@stepasync def process(text: str) -> str: return text.lower()
@workflowasync def normalise(text: str) -> str: return await process(text)
workflow_agent = normalise.as_agent( name="normaliser", description="Converts text to lowercase.",)
coordinator = Agent( client=OpenAIChatClient(), instructions="Use the normaliser for all text.", tools=[workflow_agent.as_tool()],)8. RunContext — HITL and state inside @workflow functions
Section titled “8. RunContext — HITL and state inside @workflow functions”Source: agent_framework/_workflows/_functional.py — RunContext
RunContext is injected automatically into any @workflow or @step function that declares it (by type annotation : RunContext or by parameter name ctx). It gives a workflow access to human-in-the-loop pauses, custom events, and per-run key/value state.
Declare it
Section titled “Declare it”from agent_framework import workflow, step, RunContext
@workflowasync def my_pipeline(data: str, ctx: RunContext) -> str: ... # ctx injected automaticallyHuman-in-the-loop with request_info
Section titled “Human-in-the-loop with request_info”import asynciofrom agent_framework import workflow, step, RunContext, WorkflowRunResult
@stepasync def draft_document(topic: str) -> str: return f"Draft document about {topic}. [Needs review]"
@workflowasync def review_pipeline(topic: str, ctx: RunContext) -> str: draft = await draft_document(topic)
# Pause execution; resume by calling .run(responses={request_id: value}) approved_text: str = await ctx.request_info( request_data={"draft": draft}, response_type=str, request_id="human-review", # stable ID for resumption ) return approved_text
async def hitl_demo() -> None: # First call: runs until request_info, then suspends result1 = await review_pipeline.run("climate change") # result1.get_request_info_events() contains the pending request
# Second call: supply the human response result2 = await review_pipeline.run( responses={"human-review": "Approved text: Climate change is real."} ) print(result2.get_outputs()[0])
asyncio.run(hitl_demo())Workflow-scoped state
Section titled “Workflow-scoped state”from agent_framework import workflow, step, RunContext
@stepasync def compute_chunk(chunk: str, ctx: RunContext) -> str: total = ctx.get_state("char_count", 0) ctx.set_state("char_count", total + len(chunk)) return chunk.upper()
@workflowasync def batch_processor(text: str, ctx: RunContext) -> str: chunks = text.split(". ") results = [await compute_chunk(c, ctx) for c in chunks] final_count = ctx.get_state("char_count", 0) return f"Processed {final_count} chars: {'. '.join(results)}"ctx.add_event — progress reporting
Section titled “ctx.add_event — progress reporting”from agent_framework import workflow, step, RunContext, WorkflowEvent
@workflowasync def tracked_pipeline(text: str, ctx: RunContext) -> str: await ctx.add_event(WorkflowEvent(type="status", data={"phase": "start"})) result = text.upper() await ctx.add_event(WorkflowEvent(type="status", data={"phase": "done", "length": len(result)})) return resultget_run_context() — access from nested utilities
Section titled “get_run_context() — access from nested utilities”If a utility function runs inside a workflow but can’t declare RunContext directly, use get_run_context():
from agent_framework import get_run_context, WorkflowEvent
async def log_progress(message: str) -> None: ctx = get_run_context() if ctx is not None: await ctx.add_event(WorkflowEvent(type="log", data={"msg": message})) else: print(message) # fallback when called outside a workflow9. InlineSkill — embedding structured knowledge into agents
Section titled “9. InlineSkill — embedding structured knowledge into agents”Source: agent_framework/_skills.py
Skills are structured knowledge blobs injected into the agent’s system context. An InlineSkill defines everything in code: instructions, resources (read-only data), and scripts (executable helpers).
Experimental in 1.6.0.
Minimal skill
Section titled “Minimal skill”from agent_framework import InlineSkill, SkillFrontmatter
sql_skill = InlineSkill( frontmatter=SkillFrontmatter( name="sql-query-helper", description="Guides correct SQL query construction.", ), instructions="""Use parameterised queries. Never interpolate user input into SQL strings.Always include a LIMIT clause on SELECT queries unless explicitly told not to.""",)Adding resources with @skill.resource
Section titled “Adding resources with @skill.resource”Resources expose read-only data that the agent can consult (schemas, reference tables, config):
from agent_framework import InlineSkill, SkillFrontmatter, SkillsProvider
db_skill = InlineSkill( frontmatter=SkillFrontmatter( name="database-expert", description="Expert in the company's database schema.", ), instructions="Use the attached schema to write accurate SQL queries.",)
@db_skill.resourcedef get_schema() -> str: """The current production database schema.""" return """ CREATE TABLE orders ( id SERIAL PRIMARY KEY, customer_id INT NOT NULL, total_amount DECIMAL(10,2), status VARCHAR(20) DEFAULT 'pending', created_at TIMESTAMP DEFAULT NOW() ); CREATE TABLE customers ( id SERIAL PRIMARY KEY, email VARCHAR(255) UNIQUE NOT NULL, name VARCHAR(100) ); """
@db_skill.resource(name="status_values", description="Valid order status values")def get_status_values() -> str: return "pending, processing, shipped, delivered, cancelled"Adding scripts with @skill.script
Section titled “Adding scripts with @skill.script”Scripts are callable utilities the agent can invoke (e.g. run an actual query, validate a schema):
from agent_framework import InlineSkill, SkillFrontmatterfrom typing import Annotated
analytics_skill = InlineSkill( frontmatter=SkillFrontmatter( name="analytics", description="Data analytics helper with live query support.", ), instructions="Use the run_query script to execute read-only SQL.",)
@analytics_skill.scriptasync def run_query(sql: Annotated[str, "Read-only SQL query to execute"]) -> str: """Execute a read-only SQL query and return results as CSV.""" # In practice, call your database here return "id,name,total\n1,Alice,500.00\n2,Bob,750.00"Attaching skills to an agent via SkillsProvider
Section titled “Attaching skills to an agent via SkillsProvider”import asynciofrom agent_framework import Agent, InlineSkill, SkillFrontmatter, SkillsProviderfrom agent_framework.openai import OpenAIChatClient
my_skill = InlineSkill( frontmatter=SkillFrontmatter(name="greeting-style", description="Defines greeting style."), instructions="Always greet users warmly and use their first name.",)
skills_provider = SkillsProvider(my_skill)
agent = Agent( client=OpenAIChatClient(), instructions="You are a customer support agent.", context_providers=[skills_provider],)
async def main(): session = agent.create_session() r = await agent.run("Hi, I'm Alex!", session=session) print(r.text) # → "Hi Alex! How can I help you today?"
asyncio.run(main())10. MCPStdioTool — connecting to stdio MCP servers
Section titled “10. MCPStdioTool — connecting to stdio MCP servers”Source: agent_framework/_mcp.py — MCPStdioTool(MCPTool)
The Agent Framework ships three MCP connectors. MCPStdioTool is the most portable — it launches a local process and communicates over stdio.
Constructor (key parameters)
Section titled “Constructor (key parameters)”MCPStdioTool( name: str, command: str, *, args: list[str] | None = None, env: dict[str, str] | None = None, encoding: str | None = None, description: str | None = None, approval_mode: "always_require" | "never_require" | MCPSpecificApproval | None = None, allowed_tools: Collection[str] | None = None, tool_name_prefix: str | None = None, load_tools: bool = True, load_prompts: bool = True, request_timeout: int | None = None, parse_tool_results: Callable | None = None,)Connect to the official filesystem server
Section titled “Connect to the official filesystem server”import asynciofrom agent_framework import Agent, MCPStdioToolfrom agent_framework.openai import OpenAIChatClient
async def main() -> None: mcp_tool = MCPStdioTool( name="filesystem", command="npx", args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"], description="Read and write files in /tmp.", # Require approval before any destructive operation approval_mode={"always_require_approval": ["write_file", "delete_file"]}, )
async with mcp_tool: # connects, loads tools, disconnects on exit agent = Agent( client=OpenAIChatClient(), instructions="You are a file management assistant.", tools=[mcp_tool], ) result = await agent.run("List the files in /tmp and summarise their names.") print(result.text)
asyncio.run(main())Fine-grained approval with MCPSpecificApproval
Section titled “Fine-grained approval with MCPSpecificApproval”from agent_framework import MCPStdioTool
mcp = MCPStdioTool( name="code-runner", command="python", args=["-m", "my_mcp_server"], approval_mode={ "never_require_approval": ["read_file", "list_directory"], "always_require_approval": ["run_code", "write_file", "delete_file"], },)Filtering exposed tools with allowed_tools
Section titled “Filtering exposed tools with allowed_tools”from agent_framework import MCPStdioTool
# Only expose read operations from a broad MCP servermcp = MCPStdioTool( name="safe-fs", command="npx", args=["-y", "@modelcontextprotocol/server-filesystem", "/data"], allowed_tools=["read_file", "list_directory", "get_file_info"],)Custom result parser
Section titled “Custom result parser”import jsonfrom mcp import typesfrom agent_framework import MCPStdioTool
def _parse_result(result: types.CallToolResult) -> str: """Convert MCP result to a compact JSON string.""" content_strs = [] for block in result.content: if hasattr(block, "text"): content_strs.append(block.text) return json.dumps({"result": content_strs, "is_error": result.isError})
mcp = MCPStdioTool( name="custom-server", command="my_mcp_server", parse_tool_results=_parse_result,)MCPStreamableHTTPTool — HTTP-based MCP servers
Section titled “MCPStreamableHTTPTool — HTTP-based MCP servers”For remote MCP servers over HTTP/SSE:
from agent_framework import MCPStreamableHTTPTool, Agentfrom agent_framework.openai import OpenAIChatClientimport httpx
async def main() -> None: # With a custom httpx client (for auth headers, timeouts, proxies) http_client = httpx.AsyncClient( headers={"Authorization": "Bearer my-token"}, timeout=30.0, )
mcp_tool = MCPStreamableHTTPTool( name="remote-api", url="https://mcp.example.com/v1", description="Remote API MCP server.", http_client=http_client, approval_mode="never_require", tool_name_prefix="api", # exposed as api__tool_name )
async with mcp_tool: agent = Agent( client=OpenAIChatClient(), instructions="Use the remote API to fetch data.", tools=[mcp_tool], ) result = await agent.run("Fetch the latest metrics from the API.") print(result.text)Using header_provider for per-request auth
Section titled “Using header_provider for per-request auth”from agent_framework import MCPStreamableHTTPTool
def _inject_auth(kwargs: dict) -> dict[str, str]: """Forward the auth token set by middleware into MCP requests.""" token = kwargs.get("auth_token", "") return {"Authorization": f"Bearer {token}"} if token else {}
mcp = MCPStreamableHTTPTool( name="authed-api", url="https://api.example.com/mcp", header_provider=_inject_auth,)11. MemoryContextProvider + MemoryFileStore — cross-session long-term memory
Section titled “11. MemoryContextProvider + MemoryFileStore — cross-session long-term memory”Source: agent_framework/_harness/_memory.py
MemoryContextProvider gives an agent a structured long-term memory that persists across sessions and processes. After every turn it automatically extracts noteworthy facts into topic-based Markdown files (topics/), maintains a MEMORY.md index of all topics, and archives full conversation transcripts so older turns remain searchable via semantic queries.
Experimental in 1.6.0.
Architecture
Section titled “Architecture”MEMORY.md ← topic index (one pointer per topic, injected every turn)topics/ preferences.md ← user preferences projects.md ← ongoing work items ...transcripts/ 2026-01-01T10:00:00.jsonl ← full turn history, searchablemaintenance-state.jsonConstructor (key parameters)
Section titled “Constructor (key parameters)”MemoryContextProvider( recent_turns: int = 0, # inject N most-recent transcript turns each call load_tool_turns: bool = True, *, store: MemoryStore, # REQUIRED — MemoryFileStore or custom source_id: str = "memory", index_line_limit: int = 100, # topic pointers shown in MEMORY.md selection_limit: int = 10, # max topics auto-loaded per turn max_extractions: int = 5, # max facts extracted per turn consolidation_interval: timedelta = ...,# how often to consolidate topics consolidation_min_sessions: int = 3, consolidation_client: ..., # cheaper model for consolidation pass)MemoryFileStore is the file-backed implementation. It requires owner_state_key — the session-state key that holds the user/owner identifier so memory is correctly partitioned per user:
MemoryFileStore( base_path: str | Path, *, owner_state_key: str, # REQUIRED — e.g. "user_id" kind: str = "memory", owner_prefix: str = "", index_file_name: str = "MEMORY.md", topics_directory_name: str = "topics", transcripts_directory_name: str = "transcripts", state_file_name: str = "maintenance-state.json",)Minimal example — per-user persistent memory
Section titled “Minimal example — per-user persistent memory”import asynciofrom agent_framework import Agent, MemoryContextProvider, MemoryFileStorefrom agent_framework.openai import OpenAIChatClient
async def main() -> None: client = OpenAIChatClient()
# One store shared across all users; routing is by owner_state_key. store = MemoryFileStore( base_path="./memory-store", owner_state_key="user_id", # resolved from session.state["user_id"] )
provider = MemoryContextProvider( store=store, recent_turns=3, # inject last 3 transcript turns for recency max_extractions=5, # extract up to 5 facts per turn )
agent = Agent( client=client, instructions=( "You are a personal assistant with long-term memory. " "Use MEMORY.md to recall facts about the user." ), context_providers=[provider], )
# Set user_id in session state — the store uses this to route memory files. session = agent.create_session(session_id="alice-session-1") session.state["user_id"] = "alice"
# Turn 1 — agent will extract "prefers Python" into memory await agent.run("I prefer Python over JavaScript for backend work.", session=session)
# Turn 2 in a new session — memory is loaded from disk session2 = agent.create_session(session_id="alice-session-2") session2.state["user_id"] = "alice" r = await agent.run("What language should I use for a new backend service?", session=session2) print(r.text) # agent recalls the preference stored in the previous session
asyncio.run(main())Reading memory from application code
Section titled “Reading memory from application code”from agent_framework import AgentSession, MemoryFileStore
store = MemoryFileStore(base_path="./memory-store", owner_state_key="user_id")
# Create a minimal session to query memory for a specific usersession = AgentSession(session_id="query-session")session.state["user_id"] = "alice"
# List all topic files (synchronous — no await needed)topics = store.list_topics(session, source_id="memory")for topic in topics: print(f"{topic.topic}: {topic.summary}")
# Read a specific topic (synchronous)rec = store.get_topic(session, source_id="memory", topic="preferences")print(rec.content)Write a custom topic directly
Section titled “Write a custom topic directly”import datetimefrom agent_framework import AgentSession, MemoryFileStore, MemoryTopicRecord
store = MemoryFileStore(base_path="./memory-store", owner_state_key="user_id")session = AgentSession(session_id="setup")session.state["user_id"] = "alice"
# write_topic is synchronous — no awaitstore.write_topic( session, MemoryTopicRecord( topic="onboarding", slug="onboarding", summary="Alice joined on 2026-01-15 via the enterprise plan.", updated_at=datetime.datetime.now(tz=datetime.timezone.utc).isoformat(), content="# Onboarding\n\nAlice joined 2026-01-15 via Enterprise plan. Primary contact: bob@corp.com.", ), source_id="memory",)12. TodoProvider + TodoStore — structured task tracking
Section titled “12. TodoProvider + TodoStore — structured task tracking”Source: agent_framework/_harness/_todo.py
TodoProvider is a ContextProvider that gives an agent a persistent to-do list. It injects instructions and five tools (add_todos, complete_todos, remove_todos, get_remaining_todos, get_all_todos) automatically — the agent plans its work by decomposing tasks into todos and marks them complete as it executes.
Experimental in 1.6.0.
TodoItem data model
Section titled “TodoItem data model”TodoItem is a plain class (not a dataclass) — it uses SerializationMixin with __slots__ for efficient serialisation:
class TodoItem: id: int title: str description: str | None # optional detail is_complete: bool # False by default
def __init__(self, id: int, title: str, description: str | None = None, is_complete: bool = False) -> None: ... def to_dict(self, *, exclude: set[str] | None = None, exclude_none: bool = True) -> dict: ... @classmethod def from_dict(cls, raw_item: dict, ...) -> "TodoItem": ...TodoStore implementations
Section titled “TodoStore implementations”| Store | Persistence | When to use |
|---|---|---|
TodoSessionStore (default) | AgentSession.state — lives as long as the session object | Single-process, ephemeral tasks |
TodoFileStore | JSON files under base_path/<session_id>/todos.json | Durable, cross-process, multi-session |
Custom TodoStore subclass | Cosmos DB, Redis, SQL, … | Multi-host production deployments |
TodoProvider constructor
Section titled “TodoProvider constructor”TodoProvider( source_id: str = "todo", # unique ID within the agent's context providers *, instructions: str | None = None, # override default todo instructions store: TodoStore | None = None, # defaults to TodoSessionStore())In-memory (session-only) todos
Section titled “In-memory (session-only) todos”import asynciofrom agent_framework import Agent, TodoProviderfrom agent_framework.openai import OpenAIChatClient
async def main() -> None: agent = Agent( client=OpenAIChatClient(), instructions="You are a project manager. Always use todos to track work.", context_providers=[TodoProvider()], # uses TodoSessionStore by default )
session = agent.create_session() r = await agent.run( "Plan a three-step migration from PostgreSQL to CockroachDB.", session=session, ) print(r.text) # agent breaks the migration into tracked todo items
asyncio.run(main())File-backed todos (durable across restarts)
Section titled “File-backed todos (durable across restarts)”import asynciofrom agent_framework import Agent, TodoFileStore, TodoProviderfrom agent_framework.openai import OpenAIChatClient
async def main() -> None: store = TodoFileStore(base_path="./todos")
agent = Agent( client=OpenAIChatClient(), instructions="You are a project-management assistant.", context_providers=[TodoProvider(store=store)], )
# Reuse the same session_id to accumulate and resume todos across restarts session = agent.create_session(session_id="sprint-42")
await agent.run( "Plan the three phases of our Q3 product launch: marketing, engineering, support.", session=session, )
# Second turn — mark engineering as done await agent.run("Engineering phase is complete. Mark it done.", session=session)
# Inspect todo state directly from application code items, _ = await store.load_state(session, source_id="todo") pending = [i for i in items if not i.is_complete] print(f"{len(pending)} task(s) still open:") for item in pending: print(f" [{item.id}] {item.title}" + (f" — {item.description}" if item.description else ""))
asyncio.run(main())Custom TodoStore — Cosmos DB example
Section titled “Custom TodoStore — Cosmos DB example”from agent_framework import AgentSession, TodoItem, TodoStore
class CosmosDbTodoStore(TodoStore): """Example custom store backed by Azure Cosmos DB."""
def __init__(self, container): self._container = container
async def load_state(self, session: AgentSession, *, source_id: str) -> tuple[list[TodoItem], int]: doc_id = f"{session.session_id}:{source_id}" try: item = self._container.read_item(item=doc_id, partition_key=doc_id) items = [TodoItem.from_dict(i) for i in item.get("items", [])] return items, item.get("next_id", 1) except Exception: return [], 1
async def save_state(self, session: AgentSession, items: list[TodoItem], *, next_id: int, source_id: str) -> None: doc_id = f"{session.session_id}:{source_id}" self._container.upsert_item({ "id": doc_id, "items": [i.to_dict() for i in items], "next_id": next_id, })13. AgentMiddleware + ChatMiddleware — extensible processing pipelines
Section titled “13. AgentMiddleware + ChatMiddleware — extensible processing pipelines”Source: agent_framework/_middleware.py
The framework has three middleware layers, each intercepting at a different granularity:
| Layer | Class | Wraps | Use for |
|---|---|---|---|
| Agent | AgentMiddleware | The full agent.run() call | Logging, auth, spend limits, rate limiting |
| Chat | ChatMiddleware | Each client.get_response() call to the LLM | Prompt mutation, response filtering, cost tracking |
| Function | FunctionMiddleware | Each tool invocation | Approval gates, parameter masking, audit logs |
All three follow the same process(context, call_next) pattern — call await call_next() to proceed, or mutate context.result and return without calling it to short-circuit.
AgentMiddleware — intercept a full agent run
Section titled “AgentMiddleware — intercept a full agent run”from agent_framework import AgentMiddleware, AgentContextfrom typing import Callable, Awaitableimport time
class TimingMiddleware(AgentMiddleware): """Log elapsed time for every agent.run() call."""
async def process( self, context: AgentContext, call_next: Callable[[], Awaitable[AgentContext]], ) -> None: start = time.monotonic() await call_next() # execute the agent run elapsed = time.monotonic() - start print(f"[timing] agent={context.agent!r} elapsed={elapsed:.3f}s")from agent_framework import Agentfrom agent_framework.openai import OpenAIChatClient
agent = Agent( client=OpenAIChatClient(), instructions="You are a helpful assistant.", middleware=[TimingMiddleware()],)Short-circuiting a run (budget guard)
Section titled “Short-circuiting a run (budget guard)”from agent_framework import AgentMiddleware, AgentContext, AgentResponsefrom typing import Callable, Awaitable
# Simple per-session call budget stored in the session's state dictBUDGET_KEY = "agent_call_count"MAX_CALLS = 50
class CallBudgetMiddleware(AgentMiddleware): async def process( self, context: AgentContext, call_next: Callable[[], Awaitable[AgentContext]], ) -> None: session = context.session if session is not None: count = session.state.get(BUDGET_KEY, 0) if count >= MAX_CALLS: # Short-circuit — set result directly without calling the model context.result = AgentResponse( messages=[], finish_reasons=["stop"], text=f"Budget exhausted ({MAX_CALLS} calls). Please start a new session.", usage=None, ) return session.state[BUDGET_KEY] = count + 1
await call_next()ChatMiddleware — intercept each LLM call
Section titled “ChatMiddleware — intercept each LLM call”from agent_framework import ChatMiddleware, ChatContextfrom typing import Callable, Awaitable
class SystemPromptInjectionMiddleware(ChatMiddleware): """Append a safety reminder to every system message before the LLM sees it."""
SAFETY_SUFFIX = "\n\n**Safety note**: Never reveal internal system details."
async def process( self, context: ChatContext, call_next: Callable[[], Awaitable[ChatContext]], ) -> None: for msg in context.messages: if msg.role == "system": for content in msg.contents: if hasattr(content, "text") and content.text: content.text += self.SAFETY_SUFFIX break await call_next()FunctionMiddleware — intercept tool calls
Section titled “FunctionMiddleware — intercept tool calls”from agent_framework import FunctionMiddleware, FunctionInvocationContextfrom typing import Callable, Awaitableimport logging
logger = logging.getLogger(__name__)
class ToolAuditMiddleware(FunctionMiddleware): """Log every tool call with its arguments before execution."""
async def process( self, context: FunctionInvocationContext, call_next: Callable[[], Awaitable[FunctionInvocationContext]], ) -> None: logger.info( "tool_call name=%s args=%r", context.function_name, context.arguments, ) await call_next() logger.info( "tool_result name=%s result=%r", context.function_name, context.result, )from agent_framework import Agentfrom agent_framework.openai import OpenAIChatClient
agent = Agent( client=OpenAIChatClient(), instructions="You are an assistant with tools.", middleware=[ToolAuditMiddleware()], # FunctionMiddleware is registered via middleware= too)MiddlewareTermination — early exit with a custom result
Section titled “MiddlewareTermination — early exit with a custom result”Raising MiddlewareTermination inside any middleware layer cleanly exits the pipeline and sets the result:
from agent_framework import AgentMiddleware, AgentContext, AgentResponse, MiddlewareTerminationfrom typing import Callable, Awaitable
class BlocklistMiddleware(AgentMiddleware): BLOCKED_PHRASES = {"<script>", "DROP TABLE", "rm -rf"}
async def process( self, context: AgentContext, call_next: Callable[[], Awaitable[AgentContext]], ) -> None: prompt = context.messages[-1].text if context.messages else "" for phrase in self.BLOCKED_PHRASES: if phrase.lower() in prompt.lower(): raise MiddlewareTermination( "Blocked phrase detected.", result=AgentResponse( messages=[], finish_reasons=["stop"], text="I can't help with that request.", usage=None, ), ) await call_next()Summary table
Section titled “Summary table”| Class | Module | Stable? | Key takeaway |
|---|---|---|---|
Agent | _agents | ✅ 1.6.0 | Use this for all production agents; typed options for IDE completions |
RawAgent | _agents | ✅ 1.6.0 | Same interface, zero middleware/telemetry overhead |
FunctionTool / @tool | _tools | ✅ 1.6.0 | @tool for fast path; FunctionTool for declaration-only, custom schemas |
InMemoryHistoryProvider | _sessions | ✅ 1.6.0 | Zero-config multi-turn memory; serialise via AgentSession.to_dict() |
WorkflowBuilder | _workflows/_workflow_builder | ✅ 1.6.0 | Fluent API: chain, fan-out, fan-in, switch-case, multi-select |
WorkflowContext | _workflows/_workflow_context | ✅ 1.6.0 | Typed send/yield inside Executor handlers; HITL via request_info |
FunctionalWorkflow / @workflow | _workflows/_functional | ⚠️ Experimental | Pure Python pipelines — no graph wiring needed |
RunContext | _workflows/_functional | ⚠️ Experimental | HITL, state, and events inside @workflow / @step functions |
InlineSkill | _skills | ⚠️ Experimental | Embed schema, resources, and scripts as structured agent knowledge |
MCPStdioTool | _mcp | ✅ 1.6.0 | Launch and connect to any stdio MCP server; fine-grained approval |
MemoryContextProvider | _harness/_memory | ⚠️ Experimental | Cross-session long-term memory via topic files + transcript archive |
TodoProvider | _harness/_todo | ⚠️ Experimental | Structured task tracking with 5 auto-injected tools |
AgentMiddleware | _middleware | ✅ 1.6.0 | Intercept agent.run() — logging, auth, budgets, short-circuit |
ChatMiddleware | _middleware | ✅ 1.6.0 | Intercept each LLM call — prompt mutation, cost tracking |
FunctionMiddleware | _middleware | ✅ 1.6.0 | Intercept tool invocations — approval gates, audit logs |
All examples verified against agent-framework-core==1.6.0 (May 2026). Experimental APIs emit ExperimentalWarning on import and may change in patch releases.