Skip to content

Microsoft Agent Framework (Python) — Class Deep Dives

Microsoft Agent Framework Python — Class Deep Dives

Section titled “Microsoft Agent Framework Python — Class Deep Dives”

Verified against agent-framework-core 1.6.0 (released May 2026). Each section is derived from the installed source in agent_framework/ — constructor signatures, docstrings, and live behaviour.

Ten classes are covered here. They were picked to span the full breadth of the SDK: from the entry-point Agent down through tools, sessions, middleware, graph workflows, functional workflows, skills, and MCP integration.


Source: agent_framework/_agents.pyAgent(AgentMiddlewareLayer, AgentTelemetryLayer, RawAgent)

Agent is the recommended class for almost every use-case. It layers OpenTelemetry telemetry and agent middleware on top of RawAgent. Prefer RawAgent only for latency-critical hot paths.

Agent(
client: SupportsChatGetResponse[OptionsCoT],
instructions: str | None = None,
*,
id: str | None = None,
name: str | None = None,
description: str | None = None,
tools: ToolTypes | Callable | Sequence[ToolTypes | Callable] | None = None,
default_options: OptionsCoT | None = None,
context_providers: Sequence[ContextProvider] | None = None,
middleware: Sequence[MiddlewareTypes] | None = None,
require_per_service_call_history_persistence: bool = False,
compaction_strategy: CompactionStrategy | None = None,
tokenizer: TokenizerProtocol | None = None,
additional_properties: MutableMapping[str, Any] | None = None,
)

All keyword args are optional except client.

import asyncio
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient
async def main() -> None:
agent = Agent(
client=OpenAIChatClient(),
instructions="You are a concise assistant.",
)
response = await agent.run("Explain asyncio in one sentence.")
print(response.text)
asyncio.run(main())

OptionsCoT is a TypedDict that flows from the chat client. Use it to get IDE completions for provider-specific options like reasoning_effort.

from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient, OpenAIChatOptions
client = OpenAIChatClient(model="gpt-4o")
agent: Agent[OpenAIChatOptions] = Agent(
client=client,
name="reasoning-agent",
instructions="Solve step by step.",
default_options={
"temperature": 0.2,
"max_tokens": 2048,
"reasoning_effort": "high", # OpenAI-specific; IDE will autocomplete
},
)
# Override options at call time — merged with default_options
response = await agent.run(
"What is 137 × 89?",
options={"temperature": 0.0},
)
print(response.text)
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient
async def stream_example() -> None:
agent = Agent(client=OpenAIChatClient(), instructions="You are a storyteller.")
stream = agent.run("Tell me a one-paragraph story.", stream=True)
async for update in stream:
print(update.text, end="", flush=True)
final = await stream.get_final_response()
print(f"\n\nFinished. Total chars: {len(final.text)}")

as_tool() — delegate an agent as a sub-agent tool

Section titled “as_tool() — delegate an agent as a sub-agent tool”
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient
client = OpenAIChatClient()
research_agent = Agent(
client=client,
name="research",
description="Searches and summarises information on any topic.",
instructions="You are a research specialist. Be thorough and cite sources.",
)
coordinator = Agent(
client=client,
name="coordinator",
instructions="You are a coordinator. Delegate research to the research tool.",
tools=[
research_agent.as_tool(
arg_name="topic",
arg_description="The topic to research in depth.",
propagate_session=False, # default: independent session per call
)
],
)
result = await coordinator.run("Research the history of the Python language.")
print(result.text)

as_tool() parameters:

ParameterDefaultDescription
nameagent’s nameTool name exposed to the calling model
descriptionagent’s descriptionTool description
arg_name"task"Name of the single string argument
arg_description"Task for {name}"Argument description
approval_mode"never_require"Set to "always_require" to gate execution
stream_callbackNoneAsync callback receiving AgentResponseUpdate
propagate_sessionFalseShare parent session with sub-agent

as_mcp_server() — expose the agent as an MCP server

Section titled “as_mcp_server() — expose the agent as an MCP server”
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient
import asyncio
agent = Agent(
client=OpenAIChatClient(),
name="data-analyst",
description="Analyses tabular data and produces summaries.",
instructions="You are a data analyst.",
)
# Runs a stdio-based MCP server — other agents and tools can connect to it
if __name__ == "__main__":
asyncio.run(agent.as_mcp_server())

Compaction — staying inside the context window

Section titled “Compaction — staying inside the context window”

Pass any CompactionStrategy implementation to automatically compress long conversations. Use the built-in strategies rather than rolling your own — they handle group-level atomicity so a tool call and its result are never split:

import asyncio
from agent_framework import (
Agent,
CharacterEstimatorTokenizer,
SlidingWindowStrategy,
SummarizationStrategy,
TokenBudgetComposedStrategy,
)
from agent_framework.openai import OpenAIChatClient
async def main() -> None:
client = OpenAIChatClient()
# Option 1: Sliding window — deterministic, no extra LLM calls.
# Keeps the last 20 non-system groups; system messages are preserved by default.
agent_window = Agent(
client=client,
instructions="You are a long-running assistant.",
compaction_strategy=SlidingWindowStrategy(keep_last_groups=20),
)
# Option 2: Summarise older turns into a single message.
# target_count=8 means "try to keep 8 groups; trigger when count reaches 10".
agent_summary = Agent(
client=client,
instructions="You are a long-running assistant.",
compaction_strategy=SummarizationStrategy(
client=client, # uses the same client to produce the summary
target_count=8,
threshold=2,
),
)
# Option 3: Token-budget composition — run strategies in order until
# the total token count falls under 8,000.
tokenizer = CharacterEstimatorTokenizer() # 4 chars ≈ 1 token; no dependencies
agent_budget = Agent(
client=client,
instructions="You are a long-running assistant.",
compaction_strategy=TokenBudgetComposedStrategy(
token_budget=8_000,
tokenizer=tokenizer,
strategies=[
SlidingWindowStrategy(keep_last_groups=30),
SummarizationStrategy(client=client, target_count=10),
],
early_stop=True, # stop once budget is satisfied; default True
),
tokenizer=tokenizer,
)
asyncio.run(main())

2. RawAgent — no middleware, no telemetry

Section titled “2. RawAgent — no middleware, no telemetry”

Source: agent_framework/_agents.pyRawAgent(BaseAgent, Generic[OptionsCoT])

RawAgent is identical to Agent but skips the middleware and OpenTelemetry layers. Use it when you need the absolute lowest overhead — e.g. a tight inner loop, a micro-service where you add your own instrumentation, or testing in isolation.

from agent_framework import RawAgent
from agent_framework.openai import OpenAIChatClient
async def raw_example() -> None:
agent = RawAgent(
client=OpenAIChatClient(),
instructions="Answer only with a number.",
)
r = await agent.run("What is 7 * 6?")
print(r.text) # "42"

RawAgent exposes the identical run() signature as Agent — streaming, sessions, per-call tools, options — so you can swap them without changing call sites.

AgentRawAgent
OpenTelemetry spans✅ automatic❌ none
Agent middleware (@agent_middleware)
Chat & function middleware✅ (via middleware= param on run())
LatencySmall overheadMinimal
Recommended forMost production codeHot paths, testing

3. FunctionTool + @tool — wrapping callables for models

Section titled “3. FunctionTool + @tool — wrapping callables for models”

Source: agent_framework/_tools.py

FunctionTool is the object a model sees. @tool is the most ergonomic way to create one; FunctionTool(...) constructor is used for advanced cases (no implementation, explicit schema, runtime construction).

from typing import Annotated
from agent_framework import tool, Agent
from agent_framework.openai import OpenAIChatClient
@tool
def get_weather(
city: Annotated[str, "City name, e.g. 'Amsterdam'"],
unit: Annotated[str, "Temperature unit: 'celsius' or 'fahrenheit'"] = "celsius",
) -> str:
"""Return current weather conditions for a city."""
return f"{city}: 22°{unit[0].upper()}, partly cloudy"
agent = Agent(
client=OpenAIChatClient(),
instructions="You are a weather assistant.",
tools=[get_weather],
)
result = await agent.run("What is the weather in Paris?")

The decorator reads the function name, docstring, and Annotated hints automatically. Both sync and async functions work identically.

from pydantic import BaseModel, Field
from typing import Annotated
from agent_framework import tool
class CityInput(BaseModel):
city: Annotated[str, Field(description="City name")]
unit: str = "celsius"
@tool(
name="weather_lookup", # override function name
description="Fetch live weather.", # override docstring
schema=CityInput, # explicit Pydantic schema
approval_mode="always_require", # pause before executing
max_invocations=10, # lifetime cap on this tool instance
max_invocation_exceptions=3, # stop after 3 errors
)
def get_weather(city: str, unit: str = "celsius") -> str:
return f"{city}: 22°{unit[0].upper()}"

Use the constructor directly when you need:

  • A declaration-only tool (no Python implementation — the model plans around it but you handle execution client-side).
  • A raw JSON schema instead of a Pydantic model.
  • A custom result parser.
from pydantic import BaseModel, Field
from typing import Annotated
from agent_framework import FunctionTool
# 1. Declaration-only — no func argument
search_tool = FunctionTool(
name="web_search",
description="Search the web for current information.",
func=None, # makes declaration_only=True
input_model={ # raw JSON schema
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"num_results": {"type": "integer", "description": "Number of results", "default": 5},
},
"required": ["query"],
},
)
# 2. Custom result parser — control exactly what the model sees
import json
def _parse_db_result(raw: list[dict]) -> str:
"""Format raw DB rows as a markdown table."""
if not raw:
return "No results found."
headers = list(raw[0].keys())
rows = [[str(r[h]) for h in headers] for r in raw]
header_row = " | ".join(headers)
sep = " | ".join(["---"] * len(headers))
data_rows = "\n".join(" | ".join(row) for row in rows)
return f"| {header_row} |\n| {sep} |\n" + "\n".join(f"| {r} |" for r in data_rows.split("\n"))
async def query_database(sql: str) -> list[dict]:
... # execute SQL, return rows
db_tool = FunctionTool(
name="query_db",
description="Execute a read-only SQL query.",
func=query_database,
result_parser=_parse_db_result,
max_invocations=20,
)

FunctionInvocationContext — injecting runtime context into tools

Section titled “FunctionInvocationContext — injecting runtime context into tools”

If a tool declares a FunctionInvocationContext parameter, the framework injects it automatically:

from agent_framework import tool, FunctionInvocationContext
from typing import Annotated
@tool
async def authenticated_fetch(
ctx: FunctionInvocationContext, # auto-injected — don't list in schema
url: Annotated[str, "URL to fetch"],
) -> str:
"""Fetch a URL using the caller's auth token."""
token = ctx.kwargs.get("auth_token", "")
# Use token to make authenticated request
return f"Fetched {url} with token {token[:8]}..."

4. InMemoryHistoryProvider — zero-config conversation memory

Section titled “4. InMemoryHistoryProvider — zero-config conversation memory”

Source: agent_framework/_sessions.py

InMemoryHistoryProvider stores messages inside the AgentSession.state dict — no external database required. It is the default provider the framework attaches automatically when no providers are configured and service-side history is not active.

InMemoryHistoryProvider(
source_id: str | None = None, # defaults to "in_memory"
*,
load_messages: bool = True,
store_inputs: bool = True,
store_context_messages: bool = False,
store_context_from: set[str] | None = None,
store_outputs: bool = True,
skip_excluded: bool = False,
)

Multi-turn conversation with explicit provider

Section titled “Multi-turn conversation with explicit provider”
import asyncio
from agent_framework import Agent, InMemoryHistoryProvider
from agent_framework.openai import OpenAIChatClient
async def main() -> None:
provider = InMemoryHistoryProvider(source_id="chat")
agent = Agent(
client=OpenAIChatClient(),
instructions="You are a helpful assistant. Remember what the user tells you.",
context_providers=[provider],
)
session = agent.create_session(session_id="user-42")
r1 = await agent.run("My name is Alice and I like jazz.", session=session)
r2 = await agent.run("What do you know about me?", session=session)
print(r2.text) # → "Your name is Alice and you like jazz."
# Inspect stored messages
messages = session.state.get("messages", [])
print(f"Stored {len(messages)} messages.")
asyncio.run(main())

skip_excluded — respecting compaction markers

Section titled “skip_excluded — respecting compaction markers”

When a CompactionStrategy marks old messages as excluded, set skip_excluded=True so the provider omits them when loading history:

from agent_framework import Agent, InMemoryHistoryProvider
from agent_framework.openai import OpenAIChatClient
provider = InMemoryHistoryProvider(
skip_excluded=True, # honours _excluded flags set by compaction
)
agent = Agent(
client=OpenAIChatClient(),
instructions="Long-running assistant.",
context_providers=[provider],
)

AgentSession is JSON-serialisable. Dump it before a process ends, restore it on next startup:

import json
from agent_framework import AgentSession
# Save
session = agent.create_session(session_id="user-42")
# ... run some turns ...
data = session.to_dict()
with open("session.json", "w") as f:
json.dump(data, f)
# Restore
with open("session.json") as f:
data = json.load(f)
restored = AgentSession.from_dict(data)
# resume with the same agent
result = await agent.run("Continue where we left off.", session=restored)

5. WorkflowBuilder — graph-based multi-agent orchestration

Section titled “5. WorkflowBuilder — graph-based multi-agent orchestration”

Source: agent_framework/_workflows/_workflow_builder.py

WorkflowBuilder constructs a directed, typed workflow graph from Executor nodes. Call .build() to get an immutable Workflow you can run repeatedly.

MethodTopologyUse when
add_edge(A, B)A → B (one-to-one)Sequential pipeline steps
add_chain([A, B, C])A → B → CLinear sequence shorthand
add_fan_out_edges(A, [B, C])A → B, A → C (broadcast)Parallel processing
add_fan_in_edges([A, B], C)A, B → C (aggregate)Merge parallel results
add_switch_case_edge_group(A, {B: cond, C: cond})A → B or C based on outputConditional routing
add_multi_selection_edge_group(A, [B, C], selector)A → subset of [B, C]Dynamic fan-out
from typing_extensions import Never
from agent_framework import (
Agent, Executor, WorkflowBuilder, WorkflowContext, handler,
)
from agent_framework.openai import OpenAIChatClient
class FetchExecutor(Executor):
@handler
async def process(self, topic: str, ctx: WorkflowContext[str]) -> None:
# Pretend to fetch data
await ctx.send_message(f"Raw data about '{topic}': [lots of text]")
class SummariseExecutor(Executor):
def __init__(self):
super().__init__(id="summarise")
self._agent = Agent(
client=OpenAIChatClient(),
instructions="Summarise the provided text in one paragraph.",
)
@handler
async def process(self, raw: str, ctx: WorkflowContext[Never, str]) -> None:
result = await self._agent.run(raw)
await ctx.yield_output(result.text)
fetch = FetchExecutor(id="fetch")
summarise = SummariseExecutor()
workflow = (
WorkflowBuilder(start_executor=fetch, output_from=[summarise])
.add_chain([fetch, summarise])
.build()
)
result = await workflow.run("the Python programming language")
print(result.get_outputs()) # ['One-paragraph summary …']
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
from typing_extensions import Never
class DataSource(Executor):
@handler
async def generate(self, seed: str, ctx: WorkflowContext[str]) -> None:
for item in seed.split(","):
await ctx.send_message(item.strip())
class ValidatorA(Executor):
@handler
async def validate(self, item: str, ctx: WorkflowContext) -> None:
print(f"[A] validated: {item}")
class ValidatorB(Executor):
@handler
async def validate(self, item: str, ctx: WorkflowContext) -> None:
print(f"[B] validated: {item}")
class Aggregator(Executor):
@handler
async def aggregate(self, _results: list, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output("All validations passed.")
source = DataSource(id="source")
val_a = ValidatorA(id="val_a")
val_b = ValidatorB(id="val_b")
agg = Aggregator(id="agg")
workflow = (
WorkflowBuilder(start_executor=source, output_from=[agg])
.add_fan_out_edges(source, [val_a, val_b])
.add_fan_in_edges([val_a, val_b], agg)
.build()
)
result = await workflow.run("apple, banana, cherry")
print(result.get_outputs())

Conditional routing with add_switch_case_edge_group

Section titled “Conditional routing with add_switch_case_edge_group”
from dataclasses import dataclass
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
from typing_extensions import Never
@dataclass
class Scored:
score: float
text: str
class Scorer(Executor):
@handler
async def score(self, text: str, ctx: WorkflowContext[Scored]) -> None:
score = len(text) / 100.0 # toy scoring
await ctx.send_message(Scored(score=score, text=text))
class HighPathHandler(Executor):
@handler
async def handle(self, scored: Scored, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"HIGH: {scored.text}")
class LowPathHandler(Executor):
@handler
async def handle(self, scored: Scored, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"LOW: {scored.text}")
scorer = Scorer(id="scorer")
high = HighPathHandler(id="high")
low = LowPathHandler(id="low")
workflow = (
WorkflowBuilder(start_executor=scorer, output_from=[high, low])
.add_switch_case_edge_group(
scorer,
{
high: lambda result: result.score >= 0.5,
low: lambda result: result.score < 0.5,
},
)
.build()
)
result = await workflow.run("A moderately long piece of text here")
print(result.get_outputs())

WorkflowBuilder accepts Agent wherever it accepts Executor — it wraps it in an AgentExecutor automatically:

from agent_framework import Agent, WorkflowBuilder
from agent_framework.openai import OpenAIChatClient
client = OpenAIChatClient()
extractor = Agent(
client=client,
name="extractor",
instructions="Extract the key facts from the text.",
)
formatter = Agent(
client=client,
name="formatter",
instructions="Format the facts as a bullet list.",
)
workflow = (
WorkflowBuilder(start_executor=extractor, output_from=[formatter])
.add_edge(extractor, formatter)
.build()
)
result = await workflow.run("The Eiffel Tower was built in 1889 and stands 330 m tall.")
print(result.get_outputs())

6. WorkflowContext — sending messages and yielding output from executors

Section titled “6. WorkflowContext — sending messages and yielding output from executors”

Source: agent_framework/_workflows/_workflow_context.pyWorkflowContext[OutT, W_OutT]

WorkflowContext is the second parameter of every @handler method. The two generic parameters declare what types the executor sends to downstream executors (OutT) and what it yields as workflow output (W_OutT).

WorkflowContext # accepts anything, yields nothing
WorkflowContext[str] # sends str to downstream, yields nothing
WorkflowContext[str, int] # sends str, yields int as output
WorkflowContext[Never, str] # yields str only, sends nothing
Method / propertyDescription
await ctx.send_message(value, target_id=None)Send a typed message to the next executor(s)
await ctx.yield_output(value)Emit a workflow output event
await ctx.add_event(event)Emit a custom WorkflowEvent
await ctx.request_info(data, response_type, request_id=None)HITL pause — suspends until a response is supplied
ctx.get_state(key, default=None)Read workflow-scoped key/value state
ctx.set_state(key, value)Write workflow-scoped key/value state
ctx.request_idID of the incoming message (for tracing)
ctx.source_executor_idsList of upstream executor IDs that sent messages
ctx.is_streamingTrue if the workflow was started with stream=True
from agent_framework import Executor, WorkflowContext, WorkflowEvent, handler
from typing_extensions import Never
class ProgressExecutor(Executor):
@handler
async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.add_event(WorkflowEvent(type="progress", data={"step": "start"}))
# ... do work ...
processed = text.upper()
await ctx.add_event(WorkflowEvent(type="progress", data={"step": "done"}))
await ctx.yield_output(processed)
from agent_framework import Executor, WorkflowContext, handler
class CountingExecutor(Executor):
@handler
async def count(self, item: str, ctx: WorkflowContext[str]) -> None:
seen = ctx.get_state("seen_count", default=0)
ctx.set_state("seen_count", seen + 1)
await ctx.send_message(f"[{seen + 1}] {item}")

7. FunctionalWorkflow / @workflow — Python-native pipelines

Section titled “7. FunctionalWorkflow / @workflow — Python-native pipelines”

Source: agent_framework/_workflows/_functional.py

@workflow turns a plain async function into a FunctionalWorkflow that supports step caching, HITL, checkpointing, and streaming — without any graph wiring.

Experimental in 1.6.0. Import triggers an ExperimentalWarning.

import asyncio
from agent_framework import workflow, step, Agent
from agent_framework.openai import OpenAIChatClient
@step
async def fetch_context(topic: str) -> str:
"""Simulate fetching background context."""
return f"Background on {topic}: [fetched data]"
@step
async def summarise(context: str) -> str:
agent = Agent(
client=OpenAIChatClient(),
instructions="Summarise the following in one sentence.",
)
result = await agent.run(context)
return result.text
@workflow
async def research_pipeline(topic: str) -> str:
context = await fetch_context(topic)
return await summarise(context)
async def main() -> None:
result = await research_pipeline.run("quantum computing")
outputs = result.get_outputs()
print(outputs[0]) # one-sentence summary
asyncio.run(main())
import asyncio
from agent_framework import workflow, step
@step
async def fetch_news(topic: str) -> list[str]:
return [f"News item 1 about {topic}", f"News item 2 about {topic}"]
@step
async def fetch_wiki(topic: str) -> str:
return f"Wikipedia summary of {topic}"
@workflow
async def parallel_research(topic: str) -> dict:
news, wiki = await asyncio.gather(
fetch_news(topic),
fetch_wiki(topic),
)
return {"news": news, "wiki": wiki}
result = await parallel_research.run("Mars exploration")
print(result.get_outputs()[0]) # {"news": [...], "wiki": "..."}
from agent_framework import workflow, step
@step
async def stage_one(text: str) -> str:
return text.upper()
@step
async def stage_two(text: str) -> str:
return text[::-1]
@workflow
async def two_stage(text: str) -> str:
a = await stage_one(text)
return await stage_two(a)
stream = two_stage.run("hello", stream=True)
async for event in stream:
print(f"[{event.type}] {event.data}")
final = await stream.get_final_response()
print(final.get_outputs()) # ['OLLEH']

as_agent() — use a functional workflow as an agent

Section titled “as_agent() — use a functional workflow as an agent”
from agent_framework import workflow, step, Agent
from agent_framework.openai import OpenAIChatClient
@step
async def process(text: str) -> str:
return text.lower()
@workflow
async def normalise(text: str) -> str:
return await process(text)
workflow_agent = normalise.as_agent(
name="normaliser",
description="Converts text to lowercase.",
)
coordinator = Agent(
client=OpenAIChatClient(),
instructions="Use the normaliser for all text.",
tools=[workflow_agent.as_tool()],
)

8. RunContext — HITL and state inside @workflow functions

Section titled “8. RunContext — HITL and state inside @workflow functions”

Source: agent_framework/_workflows/_functional.pyRunContext

RunContext is injected automatically into any @workflow or @step function that declares it (by type annotation : RunContext or by parameter name ctx). It gives a workflow access to human-in-the-loop pauses, custom events, and per-run key/value state.

from agent_framework import workflow, step, RunContext
@workflow
async def my_pipeline(data: str, ctx: RunContext) -> str:
... # ctx injected automatically
import asyncio
from agent_framework import workflow, step, RunContext, WorkflowRunResult
@step
async def draft_document(topic: str) -> str:
return f"Draft document about {topic}. [Needs review]"
@workflow
async def review_pipeline(topic: str, ctx: RunContext) -> str:
draft = await draft_document(topic)
# Pause execution; resume by calling .run(responses={request_id: value})
approved_text: str = await ctx.request_info(
request_data={"draft": draft},
response_type=str,
request_id="human-review", # stable ID for resumption
)
return approved_text
async def hitl_demo() -> None:
# First call: runs until request_info, then suspends
result1 = await review_pipeline.run("climate change")
# result1.get_request_info_events() contains the pending request
# Second call: supply the human response
result2 = await review_pipeline.run(
responses={"human-review": "Approved text: Climate change is real."}
)
print(result2.get_outputs()[0])
asyncio.run(hitl_demo())
from agent_framework import workflow, step, RunContext
@step
async def compute_chunk(chunk: str, ctx: RunContext) -> str:
total = ctx.get_state("char_count", 0)
ctx.set_state("char_count", total + len(chunk))
return chunk.upper()
@workflow
async def batch_processor(text: str, ctx: RunContext) -> str:
chunks = text.split(". ")
results = [await compute_chunk(c, ctx) for c in chunks]
final_count = ctx.get_state("char_count", 0)
return f"Processed {final_count} chars: {'. '.join(results)}"
from agent_framework import workflow, step, RunContext, WorkflowEvent
@workflow
async def tracked_pipeline(text: str, ctx: RunContext) -> str:
await ctx.add_event(WorkflowEvent(type="status", data={"phase": "start"}))
result = text.upper()
await ctx.add_event(WorkflowEvent(type="status", data={"phase": "done", "length": len(result)}))
return result

get_run_context() — access from nested utilities

Section titled “get_run_context() — access from nested utilities”

If a utility function runs inside a workflow but can’t declare RunContext directly, use get_run_context():

from agent_framework import get_run_context, WorkflowEvent
async def log_progress(message: str) -> None:
ctx = get_run_context()
if ctx is not None:
await ctx.add_event(WorkflowEvent(type="log", data={"msg": message}))
else:
print(message) # fallback when called outside a workflow

9. InlineSkill — embedding structured knowledge into agents

Section titled “9. InlineSkill — embedding structured knowledge into agents”

Source: agent_framework/_skills.py

Skills are structured knowledge blobs injected into the agent’s system context. An InlineSkill defines everything in code: instructions, resources (read-only data), and scripts (executable helpers).

Experimental in 1.6.0.

from agent_framework import InlineSkill, SkillFrontmatter
sql_skill = InlineSkill(
frontmatter=SkillFrontmatter(
name="sql-query-helper",
description="Guides correct SQL query construction.",
),
instructions="""
Use parameterised queries. Never interpolate user input into SQL strings.
Always include a LIMIT clause on SELECT queries unless explicitly told not to.
""",
)

Resources expose read-only data that the agent can consult (schemas, reference tables, config):

from agent_framework import InlineSkill, SkillFrontmatter, SkillsProvider
db_skill = InlineSkill(
frontmatter=SkillFrontmatter(
name="database-expert",
description="Expert in the company's database schema.",
),
instructions="Use the attached schema to write accurate SQL queries.",
)
@db_skill.resource
def get_schema() -> str:
"""The current production database schema."""
return """
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
customer_id INT NOT NULL,
total_amount DECIMAL(10,2),
status VARCHAR(20) DEFAULT 'pending',
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE customers (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
name VARCHAR(100)
);
"""
@db_skill.resource(name="status_values", description="Valid order status values")
def get_status_values() -> str:
return "pending, processing, shipped, delivered, cancelled"

Scripts are callable utilities the agent can invoke (e.g. run an actual query, validate a schema):

from agent_framework import InlineSkill, SkillFrontmatter
from typing import Annotated
analytics_skill = InlineSkill(
frontmatter=SkillFrontmatter(
name="analytics",
description="Data analytics helper with live query support.",
),
instructions="Use the run_query script to execute read-only SQL.",
)
@analytics_skill.script
async def run_query(sql: Annotated[str, "Read-only SQL query to execute"]) -> str:
"""Execute a read-only SQL query and return results as CSV."""
# In practice, call your database here
return "id,name,total\n1,Alice,500.00\n2,Bob,750.00"

Attaching skills to an agent via SkillsProvider

Section titled “Attaching skills to an agent via SkillsProvider”
import asyncio
from agent_framework import Agent, InlineSkill, SkillFrontmatter, SkillsProvider
from agent_framework.openai import OpenAIChatClient
my_skill = InlineSkill(
frontmatter=SkillFrontmatter(name="greeting-style", description="Defines greeting style."),
instructions="Always greet users warmly and use their first name.",
)
skills_provider = SkillsProvider(my_skill)
agent = Agent(
client=OpenAIChatClient(),
instructions="You are a customer support agent.",
context_providers=[skills_provider],
)
async def main():
session = agent.create_session()
r = await agent.run("Hi, I'm Alex!", session=session)
print(r.text) # → "Hi Alex! How can I help you today?"
asyncio.run(main())

10. MCPStdioTool — connecting to stdio MCP servers

Section titled “10. MCPStdioTool — connecting to stdio MCP servers”

Source: agent_framework/_mcp.pyMCPStdioTool(MCPTool)

The Agent Framework ships three MCP connectors. MCPStdioTool is the most portable — it launches a local process and communicates over stdio.

MCPStdioTool(
name: str,
command: str,
*,
args: list[str] | None = None,
env: dict[str, str] | None = None,
encoding: str | None = None,
description: str | None = None,
approval_mode: "always_require" | "never_require" | MCPSpecificApproval | None = None,
allowed_tools: Collection[str] | None = None,
tool_name_prefix: str | None = None,
load_tools: bool = True,
load_prompts: bool = True,
request_timeout: int | None = None,
parse_tool_results: Callable | None = None,
)
import asyncio
from agent_framework import Agent, MCPStdioTool
from agent_framework.openai import OpenAIChatClient
async def main() -> None:
mcp_tool = MCPStdioTool(
name="filesystem",
command="npx",
args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
description="Read and write files in /tmp.",
# Require approval before any destructive operation
approval_mode={"always_require_approval": ["write_file", "delete_file"]},
)
async with mcp_tool: # connects, loads tools, disconnects on exit
agent = Agent(
client=OpenAIChatClient(),
instructions="You are a file management assistant.",
tools=[mcp_tool],
)
result = await agent.run("List the files in /tmp and summarise their names.")
print(result.text)
asyncio.run(main())

Fine-grained approval with MCPSpecificApproval

Section titled “Fine-grained approval with MCPSpecificApproval”
from agent_framework import MCPStdioTool
mcp = MCPStdioTool(
name="code-runner",
command="python",
args=["-m", "my_mcp_server"],
approval_mode={
"never_require_approval": ["read_file", "list_directory"],
"always_require_approval": ["run_code", "write_file", "delete_file"],
},
)

Filtering exposed tools with allowed_tools

Section titled “Filtering exposed tools with allowed_tools”
from agent_framework import MCPStdioTool
# Only expose read operations from a broad MCP server
mcp = MCPStdioTool(
name="safe-fs",
command="npx",
args=["-y", "@modelcontextprotocol/server-filesystem", "/data"],
allowed_tools=["read_file", "list_directory", "get_file_info"],
)
import json
from mcp import types
from agent_framework import MCPStdioTool
def _parse_result(result: types.CallToolResult) -> str:
"""Convert MCP result to a compact JSON string."""
content_strs = []
for block in result.content:
if hasattr(block, "text"):
content_strs.append(block.text)
return json.dumps({"result": content_strs, "is_error": result.isError})
mcp = MCPStdioTool(
name="custom-server",
command="my_mcp_server",
parse_tool_results=_parse_result,
)

MCPStreamableHTTPTool — HTTP-based MCP servers

Section titled “MCPStreamableHTTPTool — HTTP-based MCP servers”

For remote MCP servers over HTTP/SSE:

from agent_framework import MCPStreamableHTTPTool, Agent
from agent_framework.openai import OpenAIChatClient
import httpx
async def main() -> None:
# With a custom httpx client (for auth headers, timeouts, proxies)
http_client = httpx.AsyncClient(
headers={"Authorization": "Bearer my-token"},
timeout=30.0,
)
mcp_tool = MCPStreamableHTTPTool(
name="remote-api",
url="https://mcp.example.com/v1",
description="Remote API MCP server.",
http_client=http_client,
approval_mode="never_require",
tool_name_prefix="api", # exposed as api__tool_name
)
async with mcp_tool:
agent = Agent(
client=OpenAIChatClient(),
instructions="Use the remote API to fetch data.",
tools=[mcp_tool],
)
result = await agent.run("Fetch the latest metrics from the API.")
print(result.text)

Using header_provider for per-request auth

Section titled “Using header_provider for per-request auth”
from agent_framework import MCPStreamableHTTPTool
def _inject_auth(kwargs: dict) -> dict[str, str]:
"""Forward the auth token set by middleware into MCP requests."""
token = kwargs.get("auth_token", "")
return {"Authorization": f"Bearer {token}"} if token else {}
mcp = MCPStreamableHTTPTool(
name="authed-api",
url="https://api.example.com/mcp",
header_provider=_inject_auth,
)

11. MemoryContextProvider + MemoryFileStore — cross-session long-term memory

Section titled “11. MemoryContextProvider + MemoryFileStore — cross-session long-term memory”

Source: agent_framework/_harness/_memory.py

MemoryContextProvider gives an agent a structured long-term memory that persists across sessions and processes. After every turn it automatically extracts noteworthy facts into topic-based Markdown files (topics/), maintains a MEMORY.md index of all topics, and archives full conversation transcripts so older turns remain searchable via semantic queries.

Experimental in 1.6.0.

MEMORY.md ← topic index (one pointer per topic, injected every turn)
topics/
preferences.md ← user preferences
projects.md ← ongoing work items
...
transcripts/
2026-01-01T10:00:00.jsonl ← full turn history, searchable
maintenance-state.json
MemoryContextProvider(
recent_turns: int = 0, # inject N most-recent transcript turns each call
load_tool_turns: bool = True,
*,
store: MemoryStore, # REQUIRED — MemoryFileStore or custom
source_id: str = "memory",
index_line_limit: int = 100, # topic pointers shown in MEMORY.md
selection_limit: int = 10, # max topics auto-loaded per turn
max_extractions: int = 5, # max facts extracted per turn
consolidation_interval: timedelta = ...,# how often to consolidate topics
consolidation_min_sessions: int = 3,
consolidation_client: ..., # cheaper model for consolidation pass
)

MemoryFileStore is the file-backed implementation. It requires owner_state_key — the session-state key that holds the user/owner identifier so memory is correctly partitioned per user:

MemoryFileStore(
base_path: str | Path,
*,
owner_state_key: str, # REQUIRED — e.g. "user_id"
kind: str = "memory",
owner_prefix: str = "",
index_file_name: str = "MEMORY.md",
topics_directory_name: str = "topics",
transcripts_directory_name: str = "transcripts",
state_file_name: str = "maintenance-state.json",
)

Minimal example — per-user persistent memory

Section titled “Minimal example — per-user persistent memory”
import asyncio
from agent_framework import Agent, MemoryContextProvider, MemoryFileStore
from agent_framework.openai import OpenAIChatClient
async def main() -> None:
client = OpenAIChatClient()
# One store shared across all users; routing is by owner_state_key.
store = MemoryFileStore(
base_path="./memory-store",
owner_state_key="user_id", # resolved from session.state["user_id"]
)
provider = MemoryContextProvider(
store=store,
recent_turns=3, # inject last 3 transcript turns for recency
max_extractions=5, # extract up to 5 facts per turn
)
agent = Agent(
client=client,
instructions=(
"You are a personal assistant with long-term memory. "
"Use MEMORY.md to recall facts about the user."
),
context_providers=[provider],
)
# Set user_id in session state — the store uses this to route memory files.
session = agent.create_session(session_id="alice-session-1")
session.state["user_id"] = "alice"
# Turn 1 — agent will extract "prefers Python" into memory
await agent.run("I prefer Python over JavaScript for backend work.", session=session)
# Turn 2 in a new session — memory is loaded from disk
session2 = agent.create_session(session_id="alice-session-2")
session2.state["user_id"] = "alice"
r = await agent.run("What language should I use for a new backend service?", session=session2)
print(r.text) # agent recalls the preference stored in the previous session
asyncio.run(main())
from agent_framework import AgentSession, MemoryFileStore
store = MemoryFileStore(base_path="./memory-store", owner_state_key="user_id")
# Create a minimal session to query memory for a specific user
session = AgentSession(session_id="query-session")
session.state["user_id"] = "alice"
# List all topic files (synchronous — no await needed)
topics = store.list_topics(session, source_id="memory")
for topic in topics:
print(f"{topic.topic}: {topic.summary}")
# Read a specific topic (synchronous)
rec = store.get_topic(session, source_id="memory", topic="preferences")
print(rec.content)
import datetime
from agent_framework import AgentSession, MemoryFileStore, MemoryTopicRecord
store = MemoryFileStore(base_path="./memory-store", owner_state_key="user_id")
session = AgentSession(session_id="setup")
session.state["user_id"] = "alice"
# write_topic is synchronous — no await
store.write_topic(
session,
MemoryTopicRecord(
topic="onboarding",
slug="onboarding",
summary="Alice joined on 2026-01-15 via the enterprise plan.",
updated_at=datetime.datetime.now(tz=datetime.timezone.utc).isoformat(),
content="# Onboarding\n\nAlice joined 2026-01-15 via Enterprise plan. Primary contact: bob@corp.com.",
),
source_id="memory",
)

12. TodoProvider + TodoStore — structured task tracking

Section titled “12. TodoProvider + TodoStore — structured task tracking”

Source: agent_framework/_harness/_todo.py

TodoProvider is a ContextProvider that gives an agent a persistent to-do list. It injects instructions and five tools (add_todos, complete_todos, remove_todos, get_remaining_todos, get_all_todos) automatically — the agent plans its work by decomposing tasks into todos and marks them complete as it executes.

Experimental in 1.6.0.

TodoItem is a plain class (not a dataclass) — it uses SerializationMixin with __slots__ for efficient serialisation:

class TodoItem:
id: int
title: str
description: str | None # optional detail
is_complete: bool # False by default
def __init__(self, id: int, title: str, description: str | None = None, is_complete: bool = False) -> None: ...
def to_dict(self, *, exclude: set[str] | None = None, exclude_none: bool = True) -> dict: ...
@classmethod
def from_dict(cls, raw_item: dict, ...) -> "TodoItem": ...
StorePersistenceWhen to use
TodoSessionStore (default)AgentSession.state — lives as long as the session objectSingle-process, ephemeral tasks
TodoFileStoreJSON files under base_path/<session_id>/todos.jsonDurable, cross-process, multi-session
Custom TodoStore subclassCosmos DB, Redis, SQL, …Multi-host production deployments
TodoProvider(
source_id: str = "todo", # unique ID within the agent's context providers
*,
instructions: str | None = None, # override default todo instructions
store: TodoStore | None = None, # defaults to TodoSessionStore()
)
import asyncio
from agent_framework import Agent, TodoProvider
from agent_framework.openai import OpenAIChatClient
async def main() -> None:
agent = Agent(
client=OpenAIChatClient(),
instructions="You are a project manager. Always use todos to track work.",
context_providers=[TodoProvider()], # uses TodoSessionStore by default
)
session = agent.create_session()
r = await agent.run(
"Plan a three-step migration from PostgreSQL to CockroachDB.",
session=session,
)
print(r.text) # agent breaks the migration into tracked todo items
asyncio.run(main())

File-backed todos (durable across restarts)

Section titled “File-backed todos (durable across restarts)”
import asyncio
from agent_framework import Agent, TodoFileStore, TodoProvider
from agent_framework.openai import OpenAIChatClient
async def main() -> None:
store = TodoFileStore(base_path="./todos")
agent = Agent(
client=OpenAIChatClient(),
instructions="You are a project-management assistant.",
context_providers=[TodoProvider(store=store)],
)
# Reuse the same session_id to accumulate and resume todos across restarts
session = agent.create_session(session_id="sprint-42")
await agent.run(
"Plan the three phases of our Q3 product launch: marketing, engineering, support.",
session=session,
)
# Second turn — mark engineering as done
await agent.run("Engineering phase is complete. Mark it done.", session=session)
# Inspect todo state directly from application code
items, _ = await store.load_state(session, source_id="todo")
pending = [i for i in items if not i.is_complete]
print(f"{len(pending)} task(s) still open:")
for item in pending:
print(f" [{item.id}] {item.title}" + (f" — {item.description}" if item.description else ""))
asyncio.run(main())
from agent_framework import AgentSession, TodoItem, TodoStore
class CosmosDbTodoStore(TodoStore):
"""Example custom store backed by Azure Cosmos DB."""
def __init__(self, container):
self._container = container
async def load_state(self, session: AgentSession, *, source_id: str) -> tuple[list[TodoItem], int]:
doc_id = f"{session.session_id}:{source_id}"
try:
item = self._container.read_item(item=doc_id, partition_key=doc_id)
items = [TodoItem.from_dict(i) for i in item.get("items", [])]
return items, item.get("next_id", 1)
except Exception:
return [], 1
async def save_state(self, session: AgentSession, items: list[TodoItem], *, next_id: int, source_id: str) -> None:
doc_id = f"{session.session_id}:{source_id}"
self._container.upsert_item({
"id": doc_id,
"items": [i.to_dict() for i in items],
"next_id": next_id,
})

13. AgentMiddleware + ChatMiddleware — extensible processing pipelines

Section titled “13. AgentMiddleware + ChatMiddleware — extensible processing pipelines”

Source: agent_framework/_middleware.py

The framework has three middleware layers, each intercepting at a different granularity:

LayerClassWrapsUse for
AgentAgentMiddlewareThe full agent.run() callLogging, auth, spend limits, rate limiting
ChatChatMiddlewareEach client.get_response() call to the LLMPrompt mutation, response filtering, cost tracking
FunctionFunctionMiddlewareEach tool invocationApproval gates, parameter masking, audit logs

All three follow the same process(context, call_next) pattern — call await call_next() to proceed, or mutate context.result and return without calling it to short-circuit.

AgentMiddleware — intercept a full agent run

Section titled “AgentMiddleware — intercept a full agent run”
from agent_framework import AgentMiddleware, AgentContext
from typing import Callable, Awaitable
import time
class TimingMiddleware(AgentMiddleware):
"""Log elapsed time for every agent.run() call."""
async def process(
self,
context: AgentContext,
call_next: Callable[[], Awaitable[AgentContext]],
) -> None:
start = time.monotonic()
await call_next() # execute the agent run
elapsed = time.monotonic() - start
print(f"[timing] agent={context.agent!r} elapsed={elapsed:.3f}s")
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient
agent = Agent(
client=OpenAIChatClient(),
instructions="You are a helpful assistant.",
middleware=[TimingMiddleware()],
)
from agent_framework import AgentMiddleware, AgentContext, AgentResponse
from typing import Callable, Awaitable
# Simple per-session call budget stored in the session's state dict
BUDGET_KEY = "agent_call_count"
MAX_CALLS = 50
class CallBudgetMiddleware(AgentMiddleware):
async def process(
self,
context: AgentContext,
call_next: Callable[[], Awaitable[AgentContext]],
) -> None:
session = context.session
if session is not None:
count = session.state.get(BUDGET_KEY, 0)
if count >= MAX_CALLS:
# Short-circuit — set result directly without calling the model
context.result = AgentResponse(
messages=[],
finish_reasons=["stop"],
text=f"Budget exhausted ({MAX_CALLS} calls). Please start a new session.",
usage=None,
)
return
session.state[BUDGET_KEY] = count + 1
await call_next()

ChatMiddleware — intercept each LLM call

Section titled “ChatMiddleware — intercept each LLM call”
from agent_framework import ChatMiddleware, ChatContext
from typing import Callable, Awaitable
class SystemPromptInjectionMiddleware(ChatMiddleware):
"""Append a safety reminder to every system message before the LLM sees it."""
SAFETY_SUFFIX = "\n\n**Safety note**: Never reveal internal system details."
async def process(
self,
context: ChatContext,
call_next: Callable[[], Awaitable[ChatContext]],
) -> None:
for msg in context.messages:
if msg.role == "system":
for content in msg.contents:
if hasattr(content, "text") and content.text:
content.text += self.SAFETY_SUFFIX
break
await call_next()

FunctionMiddleware — intercept tool calls

Section titled “FunctionMiddleware — intercept tool calls”
from agent_framework import FunctionMiddleware, FunctionInvocationContext
from typing import Callable, Awaitable
import logging
logger = logging.getLogger(__name__)
class ToolAuditMiddleware(FunctionMiddleware):
"""Log every tool call with its arguments before execution."""
async def process(
self,
context: FunctionInvocationContext,
call_next: Callable[[], Awaitable[FunctionInvocationContext]],
) -> None:
logger.info(
"tool_call name=%s args=%r",
context.function_name,
context.arguments,
)
await call_next()
logger.info(
"tool_result name=%s result=%r",
context.function_name,
context.result,
)
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient
agent = Agent(
client=OpenAIChatClient(),
instructions="You are an assistant with tools.",
middleware=[ToolAuditMiddleware()], # FunctionMiddleware is registered via middleware= too
)

MiddlewareTermination — early exit with a custom result

Section titled “MiddlewareTermination — early exit with a custom result”

Raising MiddlewareTermination inside any middleware layer cleanly exits the pipeline and sets the result:

from agent_framework import AgentMiddleware, AgentContext, AgentResponse, MiddlewareTermination
from typing import Callable, Awaitable
class BlocklistMiddleware(AgentMiddleware):
BLOCKED_PHRASES = {"<script>", "DROP TABLE", "rm -rf"}
async def process(
self,
context: AgentContext,
call_next: Callable[[], Awaitable[AgentContext]],
) -> None:
prompt = context.messages[-1].text if context.messages else ""
for phrase in self.BLOCKED_PHRASES:
if phrase.lower() in prompt.lower():
raise MiddlewareTermination(
"Blocked phrase detected.",
result=AgentResponse(
messages=[],
finish_reasons=["stop"],
text="I can't help with that request.",
usage=None,
),
)
await call_next()

ClassModuleStable?Key takeaway
Agent_agents✅ 1.6.0Use this for all production agents; typed options for IDE completions
RawAgent_agents✅ 1.6.0Same interface, zero middleware/telemetry overhead
FunctionTool / @tool_tools✅ 1.6.0@tool for fast path; FunctionTool for declaration-only, custom schemas
InMemoryHistoryProvider_sessions✅ 1.6.0Zero-config multi-turn memory; serialise via AgentSession.to_dict()
WorkflowBuilder_workflows/_workflow_builder✅ 1.6.0Fluent API: chain, fan-out, fan-in, switch-case, multi-select
WorkflowContext_workflows/_workflow_context✅ 1.6.0Typed send/yield inside Executor handlers; HITL via request_info
FunctionalWorkflow / @workflow_workflows/_functional⚠️ ExperimentalPure Python pipelines — no graph wiring needed
RunContext_workflows/_functional⚠️ ExperimentalHITL, state, and events inside @workflow / @step functions
InlineSkill_skills⚠️ ExperimentalEmbed schema, resources, and scripts as structured agent knowledge
MCPStdioTool_mcp✅ 1.6.0Launch and connect to any stdio MCP server; fine-grained approval
MemoryContextProvider_harness/_memory⚠️ ExperimentalCross-session long-term memory via topic files + transcript archive
TodoProvider_harness/_todo⚠️ ExperimentalStructured task tracking with 5 auto-injected tools
AgentMiddleware_middleware✅ 1.6.0Intercept agent.run() — logging, auth, budgets, short-circuit
ChatMiddleware_middleware✅ 1.6.0Intercept each LLM call — prompt mutation, cost tracking
FunctionMiddleware_middleware✅ 1.6.0Intercept tool invocations — approval gates, audit logs

All examples verified against agent-framework-core==1.6.0 (May 2026). Experimental APIs emit ExperimentalWarning on import and may change in patch releases.