Microsoft Agent Framework (Python) — Class Deep Dives Vol. 12
Microsoft Agent Framework Python — Class Deep Dives Vol. 12
Section titled “Microsoft Agent Framework Python — Class Deep Dives Vol. 12”Verified against agent-framework 1.8.1 (installed June 2026). Every constructor
signature, parameter description, and code example was derived from the installed package
source at /tmp/af_install/agent_framework/. No API name has been guessed or inferred
from documentation alone.
Previous volumes:
- Vol. 1 —
Agent,RawAgent,FunctionTool,WorkflowBuilder,RunContext,InlineSkill,MCPStdioTool - Vol. 2 —
FileHistoryProvider, middleware ABCs,CompactionProvider, strategy classes,FileCheckpointStorage,LocalEvaluator,WorkflowRunResult - Vol. 3 — harness providers, compaction strategies,
WorkflowViz, MCP transports - Vol. 4 — message/chat types,
ResponseStream,AgentContext, functional workflows,SkillsSourcecomposition, eval data model, tokenizer,ConversationSplit - Vol. 5 —
Executor,AgentExecutor, edge groups,Runner,SessionContext,AgentSession,BaseChatClient,SecretString,WorkflowCheckpoint, exception hierarchy - Vol. 6 — feature staging,
WorkflowRunState,WorkflowExecutor,AgentResponse, embedding clients,FunctionInvocationConfiguration,ClassSkill,Annotation, capability protocols, middleware layers - Vol. 7 —
ContextProvider,BackgroundTaskInfo, orchestration builders,AgentFactory,SecureAgentConfig,ObservabilitySettings - Vol. 8 — file store hierarchy,
FileAccessProvider,MCPSkill,ToolMode, eval helpers,ChatContext,WorkflowAgent, compaction, history providers, skills composition - Vol. 9 —
OllamaChatClient,PurviewPolicyMiddleware,DurableAIAgent,GitHubCopilotAgent,HyperlightExecuteCodeTool,Mem0ContextProvider, Redis providers, Magentic internals,FileSkillsSource - Vol. 10 —
Workflow,InProcRunnerContext,FunctionExecutor,FunctionInvocationLayer, memory harness, todo harness,DeduplicatingSkillsSource,SkillsProvider,MCPTaskOptions,InMemoryCheckpointStorage,BaseAgent - Vol. 11 — telemetry layers,
Edge+EdgeGroupprimitives,Case+Default,EdgeRunnerhierarchy,ExecutionContext,WorkflowGraphValidator,MCPTool, serialization mixin,Evaluator,PerServiceCallHistoryPersistingMiddleware
This volume covers ten class groups that were not documented in earlier volumes — focusing on the Skills type-system ABCs, the code-defined and file-backed skill script primitives, the structural agent protocol, the workflow runner-context protocol, the serialisable edge-routing descriptors, the workflow validation error taxonomy, the A2A protocol integration, and the remaining specialised exception classes:
| # | Class / group | Module |
|---|---|---|
| 1 | Skill + SkillResource + SkillScript | agent_framework._skills |
| 2 | FileSkill | agent_framework._skills |
| 3 | InlineSkillResource + InlineSkillScript | agent_framework._skills |
| 4 | FileSkillScript + SkillScriptRunner | agent_framework._skills |
| 5 | SupportsAgentRun | agent_framework._agents |
| 6 | RunnerContext | agent_framework._workflows._runner_context |
| 7 | SwitchCaseEdgeGroupCase + SwitchCaseEdgeGroupDefault | agent_framework._workflows._edge |
| 8 | ValidationTypeEnum + WorkflowValidationError + subclasses | agent_framework._workflows._validation |
| 9 | A2AAgent + A2AAgentSession + A2AExecutor | agent_framework.a2a |
| 10 | WorkflowRunnerException + WorkflowCheckpointException + MiddlewareException | agent_framework.exceptions |
Experimental notice: All Skills classes carry the
@experimental(feature_id=ExperimentalFeature.SKILLS)decorator. They emitExperimentalWarningon import and must not be used in production without accepting API-stability risk. Suppress the warning withwarnings.filterwarnings("ignore", category=ExperimentalWarning)in tests.
1 · Skill + SkillResource + SkillScript
Section titled “1 · Skill + SkillResource + SkillScript”Module: agent_framework._skills
These three abstract base classes form the Skills type system. Every concrete skill
class (FileSkill, InlineSkill, ClassSkill) extends Skill; every resource
implementation extends SkillResource; every script implementation extends SkillScript.
Understanding these ABCs is the prerequisite for writing custom skill backends.
Class signatures
Section titled “Class signatures”class Skill(ABC): @property @abstractmethod def frontmatter(self) -> SkillFrontmatter: ...
@abstractmethod async def get_content(self) -> str: ...
async def get_resource(self, name: str) -> SkillResource | None: return None # concrete classes override
async def get_script(self, name: str) -> SkillScript | None: return None # concrete classes override
class SkillResource(ABC): def __init__( self, *, name: str, # required; raises ValueError if empty or whitespace-only description: str | None = None, ) -> None: ...
@abstractmethod async def read(self, **kwargs: Any) -> Any: ...
class SkillScript(ABC): def __init__( self, *, name: str, # required; raises ValueError if empty or whitespace-only description: str | None = None, ) -> None: ...
@property def parameters_schema(self) -> dict[str, Any] | None: return None # concrete classes may override
@abstractmethod async def run( self, skill: Skill, args: dict[str, Any] | list[str] | None = None, **kwargs: Any, ) -> Any: ...Key points
Section titled “Key points”Skill.get_content()is the mandatory method — it returns the full skill description (SKILL.md text for file skills, synthesised XML for code-defined skills).SkillsProvidercalls this to inject the skill into the system prompt.Skill.get_resource()andSkill.get_script()returnNoneby default. Override them only in concrete classes that support resources or scripts.SkillResource.read()receives**kwargs— these are the runtime keyword arguments forwarded from the agent invocation. A resource backed by a function that does not declare**kwargswill still work; the framework inspects the signature and omits unsupported kwargs.SkillScript.parameters_schemadrives the JSON Schema that the LLM sees when deciding how to call the script. ReturnNone(the default) to expose no parameter information; override to return a valid JSON Schema dict.SkillScript.run()receivesargsthat may be adict(for inline/function-backed scripts) or alist[str](for file-backed, CLI-style scripts). Implementations must handle or reject the list form explicitly.
Example 1 — Custom Skill backed by a database row
Section titled “Example 1 — Custom Skill backed by a database row”import asyncioimport warningsfrom agent_framework import SkillResource, SkillScript, SkillFrontmatterfrom agent_framework._skills import Skillfrom agent_framework._feature_stage import ExperimentalWarning
warnings.filterwarnings("ignore", category=ExperimentalWarning)
class DBSkill(Skill): """A skill whose content is fetched from a database on demand."""
def __init__(self, row: dict): self._row = row self._fm = SkillFrontmatter( name=row["name"], description=row.get("description"), )
@property def frontmatter(self) -> SkillFrontmatter: return self._fm
async def get_content(self) -> str: # In production this might hit an async DB driver return self._row["content"]
async def main(): row = { "name": "sql-assistant", "description": "Knows your database schema", "content": "# SQL Assistant\nYou know the schema: users(id, email), orders(id, user_id, amount).", } skill = DBSkill(row) print(skill.frontmatter.name) # sql-assistant print(await skill.get_content()) # # SQL Assistant\n... print(await skill.get_resource("x")) # None — default
asyncio.run(main())Example 2 — Custom SkillResource with dynamic content
Section titled “Example 2 — Custom SkillResource with dynamic content”import asyncioimport warningsfrom agent_framework._skills import SkillResourcefrom agent_framework._feature_stage import ExperimentalWarning
warnings.filterwarnings("ignore", category=ExperimentalWarning)
class LiveSchemaResource(SkillResource): """Returns the current database schema on every read."""
async def read(self, **kwargs) -> str: # Simulate async DB introspection return "CREATE TABLE users (id INT, email TEXT);"
async def main(): res = LiveSchemaResource(name="schema", description="Live DB schema") print(res.name) # schema print(await res.read()) # CREATE TABLE users ...
asyncio.run(main())Example 3 — Custom SkillScript with typed args
Section titled “Example 3 — Custom SkillScript with typed args”import asyncioimport warningsfrom typing import Anyfrom agent_framework._skills import SkillScript, Skillfrom agent_framework._feature_stage import ExperimentalWarning
warnings.filterwarnings("ignore", category=ExperimentalWarning)
class SummaryScript(SkillScript): @property def parameters_schema(self) -> dict[str, Any]: return { "type": "object", "properties": { "max_words": {"type": "integer", "description": "Maximum words in summary"} }, "required": ["max_words"], }
async def run(self, skill: Skill, args: dict | list | None = None, **kwargs) -> str: if isinstance(args, list): raise TypeError("SummaryScript requires keyword args, not a list.") max_words = (args or {}).get("max_words", 100) content = await skill.get_content() words = content.split()[:max_words] return " ".join(words)2 · FileSkill
Section titled “2 · FileSkill”Module: agent_framework._skills
FileSkill is the concrete Skill implementation backed by a SKILL.md file on disk.
FileSkillsSource creates FileSkill instances by scanning directories; you can also
construct one manually for testing or custom discovery scenarios.
Class signature
Section titled “Class signature”class FileSkill(Skill): def __init__( self, *, frontmatter: SkillFrontmatter, content: str, # raw SKILL.md text (including YAML frontmatter) path: str, # absolute path to the skill directory resources: Sequence[SkillResource] | None = None, scripts: Sequence[SkillScript] | None = None, ) -> None: ...
@property def frontmatter(self) -> SkillFrontmatter: ...
async def get_content(self) -> str: # Returns content with an appended <scripts> XML block when scripts exist. # Result is cached after the first call. ...
async def get_resource(self, name: str) -> SkillResource | None: # Case-insensitive lookup across self._resources ...
async def get_script(self, name: str) -> SkillScript | None: # Case-insensitive lookup across self._scripts ...Key points
Section titled “Key points”get_content()appends a<scripts>XML block to the raw content whenscriptsis non-empty. This makes script names and parameter schemas discoverable by the LLM without manual documentation. The synthesised block looks like:<scripts><script name="analyze"><description>Run analysis</description><parameters_schema>{"type": "object", ...}</parameters_schema></script></scripts>- The result of
get_content()is cached after the first call. If you mutate_scriptsafter construction the cache will not reflect the change. get_resource()andget_script()do case-insensitive name matching, so"Schema"and"schema"are the same resource.pathis an informational attribute only —FileSkillitself does not read from disk.FileSkillsSourcereads the SKILL.md file before constructing theFileSkill.
Example 1 — Constructing a FileSkill manually (testing)
Section titled “Example 1 — Constructing a FileSkill manually (testing)”import asyncioimport warningsfrom agent_framework import FileSkill, SkillFrontmatterfrom agent_framework._feature_stage import ExperimentalWarning
warnings.filterwarnings("ignore", category=ExperimentalWarning)
async def main(): fm = SkillFrontmatter(name="test-skill", description="A test skill") skill = FileSkill( frontmatter=fm, content="# Test Skill\nDo things.", path="/skills/test-skill", )
content = await skill.get_content() print(content) # "# Test Skill\nDo things."
# Second call returns cached value content2 = await skill.get_content() assert content is content2 # same object — cached
asyncio.run(main())Example 2 — FileSkill with scripts produces XML block
Section titled “Example 2 — FileSkill with scripts produces XML block”import asyncioimport warningsfrom agent_framework import FileSkill, SkillFrontmatter, InlineSkillScriptfrom agent_framework._feature_stage import ExperimentalWarning
warnings.filterwarnings("ignore", category=ExperimentalWarning)
async def run_analysis(query: str) -> str: return f"Analysis result for: {query}"
async def main(): fm = SkillFrontmatter(name="analytics-skill") script = InlineSkillScript(name="analyze", description="Analyse data", function=run_analysis) skill = FileSkill( frontmatter=fm, content="# Analytics\nYou help analyse data.", path="/skills/analytics", scripts=[script], )
content = await skill.get_content() print(content) # Includes appended: # <scripts> # <script name="analyze"> # <description>Analyse data</description> # <parameters_schema>{"type":"object","properties":{"query":{...}},...}</parameters_schema> # </script> # </scripts>
asyncio.run(main())Example 3 — Case-insensitive resource lookup
Section titled “Example 3 — Case-insensitive resource lookup”import asyncioimport warningsfrom agent_framework import FileSkill, SkillFrontmatter, InlineSkillResourcefrom agent_framework._feature_stage import ExperimentalWarning
warnings.filterwarnings("ignore", category=ExperimentalWarning)
async def main(): fm = SkillFrontmatter(name="my-skill") res = InlineSkillResource(name="Schema", content="id INT, name TEXT") skill = FileSkill( frontmatter=fm, content="# My Skill", path="/skills/my-skill", resources=[res], )
found = await skill.get_resource("schema") # lowercase — still found assert found is res missing = await skill.get_resource("other") assert missing is None
asyncio.run(main())3 · InlineSkillResource + InlineSkillScript
Section titled “3 · InlineSkillResource + InlineSkillScript”Module: agent_framework._skills
These two classes are the code-defined counterparts to FileSkillScript. Use them when
you want a skill resource or script backed by a Python callable rather than a file on disk.
Class signatures
Section titled “Class signatures”class InlineSkillResource(SkillResource): def __init__( self, *, name: str, description: str | None = None, content: str | None = None, # static string content function: Callable[..., Any] | None = None, # dynamic callable # Exactly one of content or function must be provided ) -> None: ...
async def read(self, **kwargs: Any) -> Any: # Returns content directly, or calls function (awaiting if async) ...
class InlineSkillScript(SkillScript): def __init__( self, *, name: str, description: str | None = None, function: Callable[..., Any], # required ) -> None: ...
@property def parameters_schema(self) -> dict[str, Any] | None: # Lazily generated from function's signature via FunctionTool introspection # Returns None for functions with no introspectable parameters ...
async def run( self, skill: Skill, args: dict[str, Any] | list[str] | None = None, **kwargs: Any, ) -> Any: # Raises TypeError if args is a list (list form only for file-based scripts) ...Key points
Section titled “Key points”InlineSkillResource:
- Mutual exclusivity — exactly one of
contentorfunctionmust be provided. Providing neither or both raisesValueError. - The
_accepts_kwargsflag is precomputed at construction time by inspecting the function signature. This avoids repeatedinspect.signature()calls on everyread(). functionmay be sync or async — the framework awaits it automatically.
InlineSkillScript:
parameters_schemais lazily generated on first access by creating a temporaryFunctionTooland calling.parameters(). If the function has no parameters or none with introspectable types it returnsNone.argsmust be adictorNone. Passing alist[str]raisesTypeErrorbecause keyword-style binding is the only supported mode for inline scripts.- Like resources,
functionmay be sync or async.
Example 1 — Static vs callable InlineSkillResource
Section titled “Example 1 — Static vs callable InlineSkillResource”import asyncioimport warningsfrom agent_framework import InlineSkillResourcefrom agent_framework._feature_stage import ExperimentalWarning
warnings.filterwarnings("ignore", category=ExperimentalWarning)
async def fetch_config() -> str: return '{"model": "gpt-4.1", "max_tokens": 4096}'
async def main(): # Static resource static_res = InlineSkillResource(name="readme", content="This skill does X.") print(await static_res.read()) # "This skill does X."
# Callable resource (async) dynamic_res = InlineSkillResource(name="config", function=fetch_config) print(await dynamic_res.read()) # '{"model": "gpt-4.1", "max_tokens": 4096}'
# Callable with kwargs forwarding def schema_for_table(table: str = "users") -> str: schemas = {"users": "id INT, email TEXT", "orders": "id INT, amount DECIMAL"} return schemas.get(table, "unknown")
schema_res = InlineSkillResource(name="table-schema", function=schema_for_table) # kwargs are only forwarded if the function declares **kwargs print(await schema_res.read()) # "id INT, email TEXT" (uses default)
asyncio.run(main())Example 2 — InlineSkillScript with auto-generated schema
Section titled “Example 2 — InlineSkillScript with auto-generated schema”import asyncioimport warningsfrom agent_framework import InlineSkillScript, SkillFrontmatter, InlineSkillfrom agent_framework._feature_stage import ExperimentalWarning
warnings.filterwarnings("ignore", category=ExperimentalWarning)
async def summarise(text: str, max_words: int = 50) -> str: words = text.split()[:max_words] return " ".join(words) + ("..." if len(text.split()) > max_words else "")
async def main(): script = InlineSkillScript(name="summarise", description="Summarise text", function=summarise)
# Schema is generated lazily from the function signature schema = script.parameters_schema print(schema) # {"type": "object", "properties": {"text": {...}, "max_words": {...}}, "required": ["text"]}
# Run with keyword args skill_content = "Skill instructions here." fm = SkillFrontmatter(name="demo") skill = InlineSkill(frontmatter=fm, instructions=skill_content) result = await script.run(skill, args={"text": "Hello world this is a test sentence with many words."}) print(result) # "Hello world this is a test sentence with many words."
asyncio.run(main())Example 3 — Composing a skill with resources and scripts
Section titled “Example 3 — Composing a skill with resources and scripts”import asyncioimport warningsfrom agent_framework import InlineSkill, InlineSkillResource, InlineSkillScript, SkillFrontmatterfrom agent_framework._feature_stage import ExperimentalWarning
warnings.filterwarnings("ignore", category=ExperimentalWarning)
DATABASE_SCHEMA = "users(id INT, email TEXT), orders(id INT, user_id INT, amount DECIMAL)"
async def query_db(sql: str) -> str: # Simulated query execution return f"Result of: {sql}"
async def main(): fm = SkillFrontmatter(name="db-assistant", description="Database query skill") skill = InlineSkill( frontmatter=fm, instructions="You help users query the database. Use the schema resource before writing SQL.", resources=[ InlineSkillResource(name="schema", content=DATABASE_SCHEMA), ], scripts=[ InlineSkillScript(name="query", description="Execute a SQL query", function=query_db), ], )
print(skill.frontmatter.name) # db-assistant resource = await skill.get_resource("schema") print(await resource.read()) # users(id INT, ... script = await skill.get_script("query") result = await script.run(skill, args={"sql": "SELECT * FROM users LIMIT 10"}) print(result) # Result of: SELECT...
asyncio.run(main())4 · FileSkillScript + SkillScriptRunner
Section titled “4 · FileSkillScript + SkillScriptRunner”Module: agent_framework._skills
FileSkillScript is the file-path-backed counterpart to InlineSkillScript. Where inline
scripts run a Python callable in-process, file scripts delegate execution to a
SkillScriptRunner — a @runtime_checkable protocol that can be any callable matching
the required signature.
Class signatures
Section titled “Class signatures”class FileSkillScript(SkillScript): def __init__( self, *, name: str, description: str | None = None, full_path: str, # absolute path; raises ValueError if relative or empty runner: SkillScriptRunner | None = None, # required for execution ) -> None: ...
@property def parameters_schema(self) -> dict[str, Any] | None: # Always returns {"type": "array", "items": {"type": "string"}} # Signals to the LLM to pass CLI-style positional arguments as a JSON string array ...
async def run( self, skill: Skill, args: dict[str, Any] | list[str] | None = None, **kwargs: Any, ) -> Any: # Raises TypeError if skill is not a FileSkill # Raises ValueError if runner is None # Delegates to runner(skill, self, args); awaits if runner returns a coroutine ...
@runtime_checkableclass SkillScriptRunner(Protocol): def __call__( self, skill: FileSkill, script: FileSkillScript, args: dict[str, Any] | list[str] | None = None, ) -> Any: ...Key points
Section titled “Key points”full_pathmust be an absolute path. Providing a relative path or empty string raisesValueErrorat construction time — this prevents silent path-resolution bugs.parameters_schemaalways returns the fixed CLI array schema:{"type": "array", "items": {"type": "string"}}. This tells the LLM that file scripts accept positional string arguments like command-line tools, not named keyword arguments.run()performs two type checks before delegating: it verifies theskillis aFileSkilland that arunnerwas provided. Both checks produce descriptiveTypeError/ValueErrormessages.SkillScriptRunneris a@runtime_checkableProtocol. Any sync or async callable with the matching signature satisfies it — you canisinstance(fn, SkillScriptRunner)to verify.- Since the runner can return a coroutine,
run()awaits it wheninspect.isawaitable(result)isTrue.
Example 1 — Simple subprocess runner
Section titled “Example 1 — Simple subprocess runner”import asyncioimport subprocessimport warningsfrom agent_framework import FileSkill, FileSkillScript, SkillFrontmatterfrom agent_framework._feature_stage import ExperimentalWarning
warnings.filterwarnings("ignore", category=ExperimentalWarning)
def subprocess_runner(skill: FileSkill, script: FileSkillScript, args=None) -> str: """Run the script file as a subprocess and return stdout.""" cmd = ["python3", script.full_path] + (args or []) result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) if result.returncode != 0: raise RuntimeError(f"Script failed: {result.stderr}") return result.stdout.strip()
async def main(): fm = SkillFrontmatter(name="data-pipeline") script = FileSkillScript( name="transform.py", description="Transform data file", full_path="/skills/data-pipeline/scripts/transform.py", runner=subprocess_runner, )
# Check parameters schema print(script.parameters_schema) # {"type": "array", "items": {"type": "string"}} # The LLM will pass: ["--input", "data.csv", "--output", "out.json"]
asyncio.run(main())Example 2 — Async runner with sandboxed execution
Section titled “Example 2 — Async runner with sandboxed execution”import asyncioimport warningsfrom agent_framework import FileSkill, FileSkillScript, SkillFrontmatterfrom agent_framework._feature_stage import ExperimentalWarning
warnings.filterwarnings("ignore", category=ExperimentalWarning)
async def sandboxed_runner(skill: FileSkill, script: FileSkillScript, args=None) -> str: """Async runner that executes in a sandboxed environment.""" # Simulate async sandboxed execution await asyncio.sleep(0) # yield to event loop script_name = script.name skill_name = skill.frontmatter.name return f"[SANDBOX] Executed {script_name} from {skill_name} with args: {args}"
async def main(): fm = SkillFrontmatter(name="secure-skill") script = FileSkillScript( name="run.py", full_path="/skills/secure-skill/scripts/run.py", runner=sandboxed_runner, # async runner — awaited automatically )
fm2 = SkillFrontmatter(name="secure-skill") skill = FileSkill( frontmatter=fm2, content="# Secure Skill", path="/skills/secure-skill", scripts=[script], )
result = await script.run(skill, args=["--mode", "strict", "--input", "file.txt"]) print(result) # [SANDBOX] Executed run.py from secure-skill with args: ['--mode', 'strict', '--input', 'file.txt']
asyncio.run(main())Example 3 — Verifying a callable satisfies SkillScriptRunner
Section titled “Example 3 — Verifying a callable satisfies SkillScriptRunner”import warningsfrom agent_framework import SkillScriptRunnerfrom agent_framework._feature_stage import ExperimentalWarning
warnings.filterwarnings("ignore", category=ExperimentalWarning)
def my_runner(skill, script, args=None): return "done"
class CallableRunner: def __call__(self, skill, script, args=None): return "done"
print(isinstance(my_runner, SkillScriptRunner)) # True — plain functionprint(isinstance(CallableRunner(), SkillScriptRunner)) # True — callable class5 · SupportsAgentRun
Section titled “5 · SupportsAgentRun”Module: agent_framework._agents
SupportsAgentRun is a @runtime_checkable structural protocol that defines the minimal
interface any agent must expose to participate in workflows, orchestration, and A2A. You
never need to inherit from it — any class with the right attributes and methods satisfies
it automatically.
Interface summary
Section titled “Interface summary”@runtime_checkableclass SupportsAgentRun(Protocol): id: str name: str | None description: str | None
@overload def run( self, messages: AgentRunInputs | None = None, *, stream: Literal[False] = ..., session: AgentSession | None = None, function_invocation_kwargs: Mapping[str, Any] | None = None, client_kwargs: Mapping[str, Any] | None = None, ) -> Awaitable[AgentResponse[Any]]: ...
@overload def run( self, messages: AgentRunInputs | None = None, *, stream: Literal[True], session: AgentSession | None = None, function_invocation_kwargs: Mapping[str, Any] | None = None, client_kwargs: Mapping[str, Any] | None = None, ) -> ResponseStream[AgentResponseUpdate, AgentResponse[Any]]: ...
def create_session(self, *, session_id: str | None = None) -> AgentSession: ... def get_session(self, service_session_id: str, *, session_id: str | None = None) -> AgentSession: ...Key points
Section titled “Key points”WorkflowBuilder.add_edge(),add_chain(),add_fan_out_edges()etc. all acceptSupportsAgentRunalongsideExecutor. Any conforming agent can be dropped into a workflow graph.- The protocol is
@runtime_checkable— you can guard withisinstance(obj, SupportsAgentRun)before adding an agent to a workflow or passing it toA2AExecutor. - All three built-in agent classes (
Agent,RawAgent,WorkflowAgent) satisfy this protocol. Third-party agents from other frameworks can also participate if they expose these attributes and methods. function_invocation_kwargsallows per-call overrides of tool invocation configuration without modifying the agent constructor.client_kwargsare forwarded verbatim to the underlying chat client (e.g.{"max_tokens": 1000}for token capping).
Example 1 — Minimal custom agent without inheriting framework classes
Section titled “Example 1 — Minimal custom agent without inheriting framework classes”import asynciofrom agent_framework import AgentResponse, AgentResponseUpdate, AgentSession, SupportsAgentRun
class EchoAgent: """A fully custom agent that echoes input — no framework inheritance needed."""
def __init__(self, name: str): self.id = f"echo-{name}" self.name = name self.description = "Echoes input back"
async def run(self, messages=None, *, stream=False, session=None, **kwargs): text = str(messages) if messages else "echo" if stream: async def _stream(): yield AgentResponseUpdate(content=text) return _stream() return AgentResponse(messages=[], text=text)
def create_session(self, *, session_id=None): return AgentSession(session_id=session_id)
def get_session(self, service_session_id, *, session_id=None): return AgentSession(service_session_id=service_session_id, session_id=session_id)
async def main(): agent = EchoAgent("my-echo")
# Verify protocol compliance assert isinstance(agent, SupportsAgentRun)
response = await agent.run("Hello!") print(response.text) # "Hello!"
asyncio.run(main())Example 2 — Using a custom agent in a workflow
Section titled “Example 2 — Using a custom agent in a workflow”import asynciofrom agent_framework import WorkflowBuilder, Agent, AgentResponse, AgentSession, SupportsAgentRunfrom agent_framework.openai import OpenAIChatClient
class SentimentAgent: id = "sentiment" name = "Sentiment" description = "Classifies sentiment"
async def run(self, messages=None, *, stream=False, session=None, **kwargs): text = str(messages or "") sentiment = "positive" if "good" in text.lower() else "negative" return AgentResponse(messages=[], text=sentiment)
def create_session(self, *, session_id=None): return AgentSession(session_id=session_id)
def get_session(self, sid, *, session_id=None): return AgentSession(service_session_id=sid, session_id=session_id)
async def main(): llm_agent = Agent(client=OpenAIChatClient(), instructions="Summarize the text.") sentiment = SentimentAgent()
assert isinstance(sentiment, SupportsAgentRun)
# Both can be used in the same workflow graph workflow = ( WorkflowBuilder() .add_chain([llm_agent, sentiment]) .build() ) print("Workflow built with custom agent")
asyncio.run(main())Example 3 — Protocol guard before workflow registration
Section titled “Example 3 — Protocol guard before workflow registration”from agent_framework import SupportsAgentRun, WorkflowBuilder
def safe_add_to_workflow(builder: WorkflowBuilder, agents: list) -> WorkflowBuilder: """Only add agents that satisfy SupportsAgentRun; log others.""" for agent in agents: if isinstance(agent, SupportsAgentRun): builder.add_chain([agent]) else: print(f"Skipping {agent}: does not satisfy SupportsAgentRun") return builder6 · RunnerContext
Section titled “6 · RunnerContext”Module: agent_framework._workflows._runner_context
RunnerContext is the @runtime_checkable protocol that the workflow execution engine
(Runner) uses to communicate with its execution context. InProcRunnerContext (covered in
Vol. 10)
is the built-in implementation. Custom implementations can be used for testing, tracing, or
embedding workflows in non-standard environments.
Interface summary (key methods)
Section titled “Interface summary (key methods)”@runtime_checkableclass RunnerContext(Protocol): # Messaging async def send_message(self, message: WorkflowMessage) -> None: ... async def drain_messages(self) -> dict[str, list[WorkflowMessage]]: ... async def has_messages(self) -> bool: ...
# Events async def add_event(self, event: WorkflowEvent) -> None: ... async def drain_events(self) -> list[WorkflowEvent]: ... async def has_events(self) -> bool: ... async def next_event(self) -> WorkflowEvent: ...
# Streaming mode def set_streaming(self, streaming: bool) -> None: ... def is_streaming(self) -> bool: ...
# Checkpointing (optional — has_checkpointing() gates access) def has_checkpointing(self) -> bool: ... def set_runtime_checkpoint_storage(self, storage: CheckpointStorage) -> None: ... def clear_runtime_checkpoint_storage(self) -> None: ... async def create_checkpoint(self, workflow_name, graph_signature_hash, state, previous_checkpoint_id, iteration_count, metadata=None) -> CheckpointID: ... async def load_checkpoint(self, checkpoint_id: CheckpointID) -> WorkflowCheckpoint | None: ... async def apply_checkpoint(self, checkpoint: WorkflowCheckpoint) -> None: ...
# HITL async def add_request_info_event(self, event: WorkflowEvent[Any]) -> None: ... async def send_request_info_response(self, request_id: str, response: Any) -> None: ... async def get_pending_request_info_events(self) -> dict[str, WorkflowEvent[Any]]: ...
# Lifecycle def reset_for_new_run(self) -> None: ...
# Yield output classification def set_yield_output_classifier(self, classifier: YieldOutputClassifier) -> None: ... def classify_yielded_output(self, executor_id: str) -> YieldOutputEventType | None: ...Key points
Section titled “Key points”- Messaging — the runner routes
WorkflowMessageobjects between executors viasend_message()/drain_messages(). The dict returned bydrain_messages()maps executor IDs to message lists; the runner delivers each batch to the appropriate executor. - Events — workflow events (including
request_infoHITL events and output events) flow throughadd_event()/drain_events(). The caller iteratesnext_event()to consume a workflow’s output stream. - Checkpointing is guarded — always call
has_checkpointing()beforecreate_checkpoint()orload_checkpoint(). Calling checkpoint methods without storage configured raisesWorkflowCheckpointException. - HITL —
add_request_info_event()registers a pending human input request;send_request_info_response()resolves it;get_pending_request_info_events()returns all unresolved requests. The HITL mechanism is the same across allRunnerContextimplementations. - Streaming mode —
set_streaming(True)switches agents to streaming mode before the workflow run starts.Workflow.run()calls this based on thestream=Trueargument you pass.
Example 1 — Minimal test RunnerContext stub
Section titled “Example 1 — Minimal test RunnerContext stub”import asynciofrom collections import defaultdictfrom agent_framework import WorkflowEvent, WorkflowMessage
class TestRunnerContext: """Minimal in-memory RunnerContext for unit testing workflow executors."""
def __init__(self): self._messages: dict[str, list] = defaultdict(list) self._events: list = [] self._streaming = False
async def send_message(self, message: WorkflowMessage) -> None: self._messages[message.target_executor_id].append(message)
async def drain_messages(self) -> dict: result = dict(self._messages) self._messages.clear() return result
async def has_messages(self) -> bool: return bool(self._messages)
async def add_event(self, event: WorkflowEvent) -> None: self._events.append(event)
async def drain_events(self) -> list: events = list(self._events) self._events.clear() return events
async def has_events(self) -> bool: return bool(self._events)
async def next_event(self) -> WorkflowEvent: while not self._events: await asyncio.sleep(0.01) return self._events.pop(0)
def has_checkpointing(self) -> bool: return False
def set_runtime_checkpoint_storage(self, storage) -> None: pass
def clear_runtime_checkpoint_storage(self) -> None: pass
def reset_for_new_run(self) -> None: self._messages.clear() self._events.clear()
def set_streaming(self, streaming: bool) -> None: self._streaming = streaming
def is_streaming(self) -> bool: return self._streaming
async def add_request_info_event(self, event) -> None: self._events.append(event)
async def send_request_info_response(self, request_id: str, response) -> None: pass
async def get_pending_request_info_events(self) -> dict: return {}
def set_yield_output_classifier(self, classifier) -> None: pass
def classify_yielded_output(self, executor_id: str): return NoneExample 2 — Checking checkpointing before creating a checkpoint
Section titled “Example 2 — Checking checkpointing before creating a checkpoint”import asynciofrom agent_framework import InMemoryCheckpointStorage, InProcRunnerContext
async def main(): ctx = InProcRunnerContext() print(ctx.has_checkpointing()) # False — no storage configured
# Set storage at runtime (overrides build-time config for this run only) storage = InMemoryCheckpointStorage() ctx.set_runtime_checkpoint_storage(storage) print(ctx.has_checkpointing()) # True
# After this run, clear the override ctx.clear_runtime_checkpoint_storage() print(ctx.has_checkpointing()) # False again
asyncio.run(main())7 · SwitchCaseEdgeGroupCase + SwitchCaseEdgeGroupDefault
Section titled “7 · SwitchCaseEdgeGroupCase + SwitchCaseEdgeGroupDefault”Module: agent_framework._workflows._edge
These two classes are the serialisable counterparts to the runtime Case and Default
objects covered in Vol. 11.
Case/Default carry live callables; SwitchCaseEdgeGroupCase/SwitchCaseEdgeGroupDefault
carry only metadata — a target executor ID and a human-readable condition name — and can be
safely persisted to disk or sent over the wire.
Class signatures
Section titled “Class signatures”@dataclass(init=False)class SwitchCaseEdgeGroupCase(DictConvertible): target_id: str condition_name: str | None type: str # always "Case"
def __init__( self, condition: Callable[[Any], bool] | None, # None → missing-callable placeholder target_id: str, # required; raises ValueError if empty *, condition_name: str | None = None, ) -> None: ...
@property def condition(self) -> Callable[[Any], bool]: # Returns the live callable, or a placeholder that raises RuntimeError when invoked ...
def to_dict(self) -> dict[str, Any]: # {"target_id": ..., "type": "Case", "condition_name": ...} ...
@classmethod def from_dict(cls, data: dict[str, Any]) -> SwitchCaseEdgeGroupCase: # Restores from dict; condition is set to a missing-callable placeholder ...
@dataclass(init=False)class SwitchCaseEdgeGroupDefault(DictConvertible): target_id: str type: str # always "Default"
def __init__(self, target_id: str) -> None: ... # raises ValueError if empty
def to_dict(self) -> dict[str, Any]: # {"target_id": ..., "type": "Default"} ...
@classmethod def from_dict(cls, data: dict[str, Any]) -> SwitchCaseEdgeGroupDefault: ...Key points
Section titled “Key points”- Vs
Case/Default—CaseandDefault(runtime objects) hold live Python callables that cannot be serialised.SwitchCaseEdgeGroupCaseandSwitchCaseEdgeGroupDefaultstrip the callable and keep only the target ID and name. Use the serialisable variants for:- Storing workflow topology in a database or config file
- Inspecting what routing branches exist without running the workflow
- Checkpoint metadata (the
WorkflowCheckpointstores the serialised graph)
- Missing callable placeholder —
SwitchCaseEdgeGroupCase.from_dict()reconstructs a case withcondition=None. Accessing.conditionreturns a proxy that raisesRuntimeErroron invocation. This makes missing registrations immediately visible rather than silently routing to the wrong branch. condition_nameis auto-extracted from the callable’s__name__attribute when a real callable is provided. Lambda functions get"<lambda>"; named functions get their actual name.
Example 1 — Creating and serialising routing cases
Section titled “Example 1 — Creating and serialising routing cases”from agent_framework import SwitchCaseEdgeGroupCase, SwitchCaseEdgeGroupDefault
def is_error(payload) -> bool: return isinstance(payload, dict) and payload.get("status") == "error"
# Create case with live callablecase = SwitchCaseEdgeGroupCase(is_error, target_id="error_handler")print(case.condition_name) # "is_error"print(case.target_id) # "error_handler"print(case.type) # "Case"
# Serialisedata = case.to_dict()print(data)# {"target_id": "error_handler", "type": "Case", "condition_name": "is_error"}
# Deserialise — callable is replaced with a placeholderrestored = SwitchCaseEdgeGroupCase.from_dict(data)print(restored.target_id) # "error_handler"print(restored.condition_name) # "is_error"
# The condition property returns a placeholder — do not invoke without re-registeringtry: restored.condition({"status": "error"})except RuntimeError as e: print(f"Expected: {e}")
# Default branchdefault = SwitchCaseEdgeGroupDefault(target_id="default_handler")print(default.to_dict())# {"target_id": "default_handler", "type": "Default"}Example 2 — Inspecting serialised workflow topology
Section titled “Example 2 — Inspecting serialised workflow topology”import jsonfrom agent_framework import SwitchCaseEdgeGroupCase, SwitchCaseEdgeGroupDefault
# Imagine this dict was loaded from a database or YAML configstored_topology = { "switch_cases": [ {"target_id": "csv_handler", "type": "Case", "condition_name": "is_csv"}, {"target_id": "json_handler", "type": "Case", "condition_name": "is_json"}, {"target_id": "fallback", "type": "Default"}, ]}
def load_routing(topology: dict): for entry in topology["switch_cases"]: if entry["type"] == "Case": case = SwitchCaseEdgeGroupCase.from_dict(entry) print(f"Case: {case.condition_name!r} → {case.target_id!r}") elif entry["type"] == "Default": default = SwitchCaseEdgeGroupDefault.from_dict(entry) print(f"Default → {default.target_id!r}")
load_routing(stored_topology)# Case: 'is_csv' → 'csv_handler'# Case: 'is_json' → 'json_handler'# Default → 'fallback'Example 3 — Re-registering conditions after deserialization
Section titled “Example 3 — Re-registering conditions after deserialization”from agent_framework import SwitchCaseEdgeGroupCase, Case, Default, WorkflowBuilder, Agentfrom agent_framework.openai import OpenAIChatClient
def is_csv(payload) -> bool: return str(payload).strip().startswith("id,")
def is_json(payload) -> bool: return str(payload).strip().startswith("{")
def reload_condition(serialised_case: SwitchCaseEdgeGroupCase, registry: dict) -> Case: """Restore a serialised case with its live condition callable.""" condition_fn = registry.get(serialised_case.condition_name) if condition_fn is None: raise RuntimeError(f"Condition {serialised_case.condition_name!r} not found in registry.") return Case(condition_fn, target_id=serialised_case.target_id)
registry = {"is_csv": is_csv, "is_json": is_json}stored = {"target_id": "csv_handler", "type": "Case", "condition_name": "is_csv"}case_meta = SwitchCaseEdgeGroupCase.from_dict(stored)live_case = reload_condition(case_meta, registry)print(live_case.condition({"type": "not-csv"})) # False8 · ValidationTypeEnum + WorkflowValidationError + subclasses
Section titled “8 · ValidationTypeEnum + WorkflowValidationError + subclasses”Module: agent_framework._workflows._validation
The workflow graph validator raises structured validation errors before WorkflowBuilder.build() completes. Each error carries a ValidationTypeEnum tag that identifies the type of constraint that was violated.
Enum values
Section titled “Enum values”class ValidationTypeEnum(Enum): EDGE_DUPLICATION = "EDGE_DUPLICATION" EXECUTOR_DUPLICATION = "EXECUTOR_DUPLICATION" TYPE_COMPATIBILITY = "TYPE_COMPATIBILITY" GRAPH_CONNECTIVITY = "GRAPH_CONNECTIVITY" HANDLER_OUTPUT_ANNOTATION = "HANDLER_OUTPUT_ANNOTATION" OUTPUT_VALIDATION = "OUTPUT_VALIDATION"Exception hierarchy
Section titled “Exception hierarchy”class WorkflowValidationError(WorkflowException): def __init__(self, message: str, validation_type: ValidationTypeEnum) -> None: ... # __str__: "[VALIDATION_TYPE_VALUE] message"
class EdgeDuplicationError(WorkflowValidationError): # validation_type = ValidationTypeEnum.EDGE_DUPLICATION def __init__(self, edge_id: str) -> None: ... # message: "Duplicate edge detected: {edge_id}. Each edge in the workflow must be unique."
class TypeCompatibilityError(WorkflowValidationError): # validation_type = ValidationTypeEnum.TYPE_COMPATIBILITY def __init__( self, source_executor_id: str, target_executor_id: str, source_types: list[type], target_types: list[type], ) -> None: ... # message: "Type incompatibility between executors '...' -> '...'. Source outputs ... but target handles ..."
class GraphConnectivityError(WorkflowValidationError): # validation_type = ValidationTypeEnum.GRAPH_CONNECTIVITY def __init__(self, message: str) -> None: ...Key points
Section titled “Key points”WorkflowValidationError.__str__()prefixes the message with the enum value in brackets, e.g."[TYPE_COMPATIBILITY] Type incompatibility ...". This makes log output self-documenting.TypeCompatibilityErrorcarries the full source and target type lists as attributes (source_types,target_types,source_executor_id,target_executor_id) — useful for programmatic error reporting.EdgeDuplicationErrorcarriesedge_id— the string key of the duplicate edge.- Catching
WorkflowValidationErrorcatches all validation subtypes. Catching specific subclasses allows targeted recovery logic. HANDLER_OUTPUT_ANNOTATIONandOUTPUT_VALIDATIONcurrently map toWorkflowValidationError(no dedicated subclass) — they use the base class with the appropriatevalidation_type.
Example 1 — Catching and inspecting validation errors
Section titled “Example 1 — Catching and inspecting validation errors”import asynciofrom agent_framework import WorkflowBuilder, Agent, WorkflowValidationError, TypeCompatibilityErrorfrom agent_framework._workflows._validation import ValidationTypeEnumfrom agent_framework.openai import OpenAIChatClient
async def main(): agent_a = Agent(client=OpenAIChatClient(), instructions="Step A") agent_b = Agent(client=OpenAIChatClient(), instructions="Step B")
try: # This will fail validation if type constraints are violated workflow = ( WorkflowBuilder() .add_edge(agent_a, agent_b) .build() ) except TypeCompatibilityError as e: print(f"Type error: {e}") print(f"Source executor: {e.source_executor_id}") print(f"Target executor: {e.target_executor_id}") print(f"Validation type: {e.validation_type}") # ValidationTypeEnum.TYPE_COMPATIBILITY except WorkflowValidationError as e: print(f"Validation failed [{e.validation_type.value}]: {e.message}")
asyncio.run(main())Example 2 — Matching validation type in error handling
Section titled “Example 2 — Matching validation type in error handling”from agent_framework import WorkflowValidationErrorfrom agent_framework._workflows._validation import ValidationTypeEnum
def handle_build_error(exc: Exception) -> None: if not isinstance(exc, WorkflowValidationError): raise exc
match exc.validation_type: case ValidationTypeEnum.EDGE_DUPLICATION: print("Fix: remove the duplicate edge before building") case ValidationTypeEnum.EXECUTOR_DUPLICATION: print("Fix: each agent/executor must appear only once") case ValidationTypeEnum.TYPE_COMPATIBILITY: print("Fix: check output type of source matches input type of target") case ValidationTypeEnum.GRAPH_CONNECTIVITY: print("Fix: ensure all nodes are reachable from the start node") case ValidationTypeEnum.HANDLER_OUTPUT_ANNOTATION: print("Fix: add @handler return type annotation") case ValidationTypeEnum.OUTPUT_VALIDATION: print("Fix: verify output_from references a reachable executor") case _: print(f"Unknown validation type: {exc.validation_type}")Example 3 — Custom validation reporting
Section titled “Example 3 — Custom validation reporting”from dataclasses import dataclassfrom agent_framework import WorkflowValidationErrorfrom agent_framework._workflows._validation import ValidationTypeEnum, TypeCompatibilityError, EdgeDuplicationError
@dataclassclass ValidationReport: errors: list[WorkflowValidationError]
def summary(self) -> str: lines = [f"Workflow validation failed with {len(self.errors)} error(s):"] for err in self.errors: lines.append(f" [{err.validation_type.value}] {err.message}") return "\n".join(lines)
# Collect errors from a build attempterrors = []try: from agent_framework import WorkflowBuilder wf = WorkflowBuilder().build() # empty workflow — likely fails connectivity checkexcept WorkflowValidationError as e: errors.append(e)
if errors: report = ValidationReport(errors=errors) print(report.summary())9 · A2AAgent + A2AAgentSession + A2AExecutor
Section titled “9 · A2AAgent + A2AAgentSession + A2AExecutor”Module: agent_framework.a2a
The A2A (Agent-to-Agent) protocol integration lets the framework call remote agents over
HTTP/JSON-RPC (A2AAgent) and expose local agents as A2A servers (A2AExecutor).
A2AAgentSession extends AgentSession with A2A-specific conversation state.
Class signatures
Section titled “Class signatures”class A2AAgent(AgentTelemetryLayer, BaseAgent): AGENT_PROVIDER_NAME: Final[str] = "A2A"
def __init__( self, *, name: str | None = None, id: str | None = None, description: str | None = None, agent_card: AgentCard | None = None, # from a2a.types url: str | None = None, # alternative to agent_card client: Client | None = None, # pre-built A2A client http_client: httpx.AsyncClient | None = None, auth_interceptor: AuthInterceptor | None = None, timeout: float | httpx.Timeout | None = None, supported_protocol_bindings: list[str] | None = None, # default: ["JSONRPC"] **kwargs: Any, ) -> None: ... # Raises ValueError if neither agent_card nor url is provided and client is None
class A2AAgentSession(AgentSession): _CONTEXT_ID_KEY = "a2a_context_id" _TASK_ID_KEY = "a2a_task_id" _TASK_STATE_KEY = "a2a_task_state"
def __init__( self, *, context_id: str | None = None, # A2A conversation context ID task_id: str | None = None, # most recent A2A task ID task_state: TaskState | None = None, # from a2a.types (protobuf enum) ) -> None: ...
def to_dict(self) -> dict[str, Any]: ... @classmethod def from_dict(cls, data: dict[str, Any]) -> A2AAgentSession: ...
class A2AExecutor(AgentExecutor): def __init__( self, agent: SupportsAgentRun, stream: bool = False, run_kwargs: Mapping[str, Any] | None = None, ) -> None: ...Key points
Section titled “Key points”A2AAgent:
- Accepts exactly one of:
url(creates a minimalAgentCardinternally),agent_card(full card for transport negotiation), orclient(pre-builta2a.client.Client). - Creates two clients internally: a streaming client (SSE transport, for
stream=True) and a non-streaming client (single request/response, forstream=False). auth_interceptoris applied to both clients. Use it for API key, OAuth Bearer, or custom auth schemes.timeoutmay be afloat(applied to all four httpx timeout components) or anhttpx.Timeoutobject. Default is10sconnect,60sread,10swrite,5spool.supported_protocol_bindingsdefaults to["JSONRPC"]. The A2A spec treats binding identifiers as open strings — you can add custom values for non-standard transports.
A2AAgentSession:
- Extends
AgentSessionwith three extra state fields persisted via private class-level key constants. task_stateis a protobufTaskStateenum. Distinguishinginput-requiredfromcompletedstates lets callers decide whether to send a follow-up or start a fresh task.to_dict()/from_dict()are fully round-trip safe — the A2A state fields are added to (and restored from) the baseAgentSession.to_dict()output.
A2AExecutor:
- Bridges a local
SupportsAgentRunagent with the A2A server stack from thea2a-sdk. run_kwargsare forwarded verbatim toagent.run(...)— use them forclient_kwargs(e.g.{"max_tokens": 500}) orfunction_invocation_kwargs.stream=Trueswitches the agent to streaming mode; the executor wraps streaming updates as A2A SSE events.
Example 1 — Calling a remote A2A agent by URL
Section titled “Example 1 — Calling a remote A2A agent by URL”import asynciofrom agent_framework.a2a import A2AAgent
async def main(): # Connect to any A2A-compliant agent by URL remote = A2AAgent( name="Remote Analyst", url="http://analytics-service:9000/", timeout=30.0, )
response = await remote.run("Analyse Q1 sales data and summarise key trends.") print(response.text)
# Streaming stream = remote.run("Describe the top 5 products.", stream=True) async for update in stream: print(update.content, end="", flush=True) final = await stream.get_final_response() print(f"\nTokens used: {final.usage}")
asyncio.run(main())Example 2 — Multi-turn conversation with A2AAgentSession
Section titled “Example 2 — Multi-turn conversation with A2AAgentSession”import asynciofrom agent_framework.a2a import A2AAgent, A2AAgentSession
async def main(): agent = A2AAgent(url="http://remote-agent:9000/")
# Start a new session — context_id will be populated after the first response session = A2AAgentSession()
response1 = await agent.run("What is the weather in London?", session=session) print(response1.text) print(f"Context ID: {session.context_id}") # populated by the agent print(f"Task ID: {session.task_id}")
# Continue the same conversation using the session response2 = await agent.run("And in Paris?", session=session) print(response2.text)
# Persist and restore session stored = session.to_dict() # ... save stored somewhere ... restored = A2AAgentSession.from_dict(stored) print(restored.context_id) # same as session.context_id
asyncio.run(main())Example 3 — Exposing a local agent as an A2A server
Section titled “Example 3 — Exposing a local agent as an A2A server”import asynciofrom a2a.server.request_handlers import DefaultRequestHandlerfrom a2a.server.routes import create_jsonrpc_routes, create_agent_card_routesfrom a2a.server.tasks import InMemoryTaskStorefrom a2a.types import AgentCapabilities, AgentCard, AgentInterfacefrom starlette.applications import Starlettefrom agent_framework import Agentfrom agent_framework.openai import OpenAIChatClientfrom agent_framework.a2a import A2AExecutor
async def main(): # Local agent to expose local_agent = Agent( client=OpenAIChatClient(), instructions="You are a helpful assistant that answers questions concisely.", )
agent_card = AgentCard( name="Helpful Assistant", description="Answers questions via the A2A protocol.", version="1.0.0", default_input_modes=["text"], default_output_modes=["text"], capabilities=AgentCapabilities(streaming=True), supported_interfaces=[ AgentInterface(url="http://0.0.0.0:9000/", protocol_binding="JSONRPC"), ], skills=[], )
executor = A2AExecutor( agent=local_agent, stream=True, run_kwargs={"client_kwargs": {"max_tokens": 2000}}, )
request_handler = DefaultRequestHandler( agent_executor=executor, task_store=InMemoryTaskStore(), agent_card=agent_card, )
app = Starlette( routes=[ *create_agent_card_routes(agent_card), *create_jsonrpc_routes(request_handler, "/"), ], )
# Run with: uvicorn script:app --port 9000 print("A2A server ready. Run: uvicorn <module>:app --port 9000")
asyncio.run(main())Example 4 — Authenticating a remote agent call
Section titled “Example 4 — Authenticating a remote agent call”import asyncioimport httpxfrom agent_framework.a2a import A2AAgent
class BearerTokenInterceptor: """Simple Bearer token auth interceptor."""
def __init__(self, token: str): self._token = token
async def __call__(self, request: httpx.Request) -> httpx.Request: request.headers["Authorization"] = f"Bearer {self._token}" return request
async def main(): interceptor = BearerTokenInterceptor(token="my-api-key-12345") agent = A2AAgent( url="https://secure-agent.example.com/", auth_interceptor=interceptor, timeout=httpx.Timeout(connect=5.0, read=60.0, write=10.0, pool=5.0), )
response = await agent.run("What is 2 + 2?") print(response.text)
asyncio.run(main())10 · WorkflowRunnerException + WorkflowCheckpointException + MiddlewareException
Section titled “10 · WorkflowRunnerException + WorkflowCheckpointException + MiddlewareException”Module: agent_framework.exceptions
These three exception classes fill specific positions in the framework’s exception hierarchy. The main exception tree (covered in Vol. 5) provides the base classes; this volume covers the three leaf-level classes that production code is most likely to encounter in specialised error paths.
Complete exception hierarchy
Section titled “Complete exception hierarchy”AgentFrameworkException├── MiddlewareException ← raised during middleware pipeline execution├── UserInputRequiredException ← HITL: agent paused waiting for user input└── WorkflowException ├── WorkflowValidationError ← graph validation (see § 8) ├── WorkflowConvergenceException ← workflow did not converge in max iterations ├── WorkflowCheckpointException ← NOT covered yet ← checkpoint read/write failure └── WorkflowRunnerException ← NOT covered yet ← runner-level execution error └── WorkflowCheckpointException ← ALSO a WorkflowRunnerException subtypeClass signatures
Section titled “Class signatures”class MiddlewareException(AgentFrameworkException): """Raised when an error occurs during middleware pipeline execution.""" # No additional attributes beyond AgentFrameworkException. # constructor: (message, inner_exception=None, log_level=logging.DEBUG)
class WorkflowRunnerException(WorkflowException): """Base exception for workflow runner errors.""" # No additional attributes. # constructor: (message, inner_exception=None, log_level=logging.DEBUG)
class WorkflowCheckpointException(WorkflowRunnerException): """Raised for errors related to workflow checkpoints.""" # Subclass of WorkflowRunnerException — catch either for checkpoint failures. # constructor: (message, inner_exception=None, log_level=logging.DEBUG)Key points
Section titled “Key points”MiddlewareException:
- Raised when an exception propagates out of a middleware’s
on_invoke()(agent middleware) or corresponding chat/function middleware hook, and the framework wraps it inMiddlewareException. - Distinct from
MiddlewareTermination(a sentinel, not an exception) —MiddlewareTerminationis raised intentionally to short-circuit the pipeline;MiddlewareExceptionsignals an unexpected error. - Catch it to add logging or retry logic at the agent-call boundary without catching broader
AgentFrameworkExceptiontypes.
WorkflowRunnerException:
- The umbrella for errors that occur in the
Runner(the Pregel superstep engine) rather than in the workflow graph validator. - Typical causes: message delivery failure, executor crash during a superstep, or internal runner-state corruption.
- Catching
WorkflowRunnerExceptionalso catchesWorkflowCheckpointExceptionsince the latter subclasses it.
WorkflowCheckpointException:
- Raised when checkpoint I/O fails — a write cannot complete (disk full, permission denied), a read fails (storage unavailable), or a checkpoint with an incompatible topology hash is applied.
- Inherits from both
WorkflowRunnerExceptionandWorkflowException(via the chain), so it is caught by handlers for either parent class. - The framework raises this from
RunnerContext.create_checkpoint(),load_checkpoint(), andapply_checkpoint(). Ifhas_checkpointing()returnsFalse, accessing these methods without storage configured raisesWorkflowCheckpointException.
Example 1 — Catching checkpoint failures during a workflow run
Section titled “Example 1 — Catching checkpoint failures during a workflow run”import asynciofrom agent_framework import Workflow, WorkflowBuilder, Agentfrom agent_framework.exceptions import WorkflowCheckpointException, WorkflowRunnerExceptionfrom agent_framework.openai import OpenAIChatClientfrom agent_framework import FileCheckpointStorage
async def run_workflow_with_checkpointing(workflow: Workflow, prompt: str): try: result = await workflow.run( prompt, checkpoint_storage=FileCheckpointStorage("/tmp/checkpoints"), ) return result except WorkflowCheckpointException as e: # Checkpoint-specific failure — log and continue without checkpointing print(f"Checkpoint failed: {e}. Running without persistence.") return await workflow.run(prompt) except WorkflowRunnerException as e: # Broader runner failure — re-raise print(f"Runner error: {e}") raise
async def main(): agent = Agent(client=OpenAIChatClient(), instructions="Help the user.") workflow = WorkflowBuilder().add_chain([agent]).build() result = await run_workflow_with_checkpointing(workflow, "Hello!") print(result.output)
asyncio.run(main())Example 2 — Middleware error isolation
Section titled “Example 2 — Middleware error isolation”import asynciofrom agent_framework import Agent, AgentMiddleware, AgentContextfrom agent_framework.exceptions import MiddlewareExceptionfrom agent_framework.openai import OpenAIChatClient
class LoggingMiddleware(AgentMiddleware): async def on_invoke(self, context: AgentContext, next_func): print(f"[LOG] Invoking agent: {context.agent_id}") try: return await next_func(context) except MiddlewareException as e: print(f"[LOG] Middleware error: {e}") raise except Exception as e: # Wrap unexpected errors so they can be distinguished raise MiddlewareException( f"Unexpected error in {type(self).__name__}: {e}", inner_exception=e, ) from e
async def main(): agent = Agent( client=OpenAIChatClient(), instructions="You are a helpful assistant.", middleware=[LoggingMiddleware()], ) response = await agent.run("What is the capital of France?") print(response.text)
asyncio.run(main())Example 3 — Distinguishing exception types in a workflow supervisor
Section titled “Example 3 — Distinguishing exception types in a workflow supervisor”import asynciofrom agent_framework.exceptions import ( AgentFrameworkException, MiddlewareException, WorkflowCheckpointException, WorkflowRunnerException, WorkflowValidationError, WorkflowConvergenceException, UserInputRequiredException,)
async def supervised_run(coro): """Run a workflow coroutine and classify any framework exceptions.""" try: return await coro except UserInputRequiredException as e: print(f"[HITL] Paused — awaiting user input: {e}") except WorkflowCheckpointException as e: print(f"[CHECKPOINT] Storage failure: {e}. Retry without checkpointing.") except WorkflowConvergenceException as e: print(f"[CONVERGENCE] Workflow did not converge: {e}. Increase max_iterations.") except WorkflowValidationError as e: print(f"[VALIDATION] Build-time error [{e.validation_type.value}]: {e.message}") except WorkflowRunnerException as e: print(f"[RUNNER] Execution error: {e}") except MiddlewareException as e: print(f"[MIDDLEWARE] Pipeline error: {e}") except AgentFrameworkException as e: print(f"[FRAMEWORK] Unclassified error: {e}") return NoneCross-reference table
Section titled “Cross-reference table”Classes NOT covered in this volume but closely related:
| Class | Covered in |
|---|---|
InlineSkill | Vol. 1 |
ClassSkill + SkillFrontmatter | Vol. 6 |
FileSkillsSource + SkillsProvider | Vol. 6 (also Vol. 9) |
MCPSkill + MCPSkillsSource | Vol. 8 |
Case + Default (runtime) | Vol. 11 |
SwitchCaseEdgeGroup (builder) | Vol. 5 |
WorkflowGraphValidator | Vol. 11 |
AgentExecutor + AgentExecutorRequest/Response | Vol. 5 |
InProcRunnerContext | Vol. 10 |
AgentSession + ContextProvider | Vol. 5 |
BaseAgent | Vol. 10 |
| Full exception hierarchy | Vol. 5 |
MiddlewareTermination (sentinel) | Vol. 5 |