Experimental. FunctionalWorkflow, @workflow, @step, and RunContext are marked ExperimentalFeature in agent-framework-core==1.5.0. The API is stable enough to build on but may change between minor releases.
Functional workflows let you write a workflow as a plain async Python function — no executor classes, no graph wiring, no edge objects. Control flow is ordinary Python: if/else, for, asyncio.gather. The framework tracks step results, emits events, handles HITL pauses, and persists checkpoints automatically.
Verified against agent-framework-core==1.5.0 (agent_framework._workflows._functional).
Declare a RunContext parameter (by type annotation or by the name ctx) anywhere in the @workflow function or inside @step functions to access workflow-only features:
from agent_framework import RunContext, workflow
| Method | Purpose |
| --- | --- |
| await ctx.request_info(data, response_type) | Pause the workflow and ask for external input (human-in-the-loop) |
await ctx.request_info(data, response_type) suspends the workflow on first call — the framework never exposes the internal WorkflowInterrupted signal to your code.
WorkflowRunResult.get_request_info_events() returns a list of pending WorkflowEvent objects; each has a request_id and data.
Resume by calling run(responses={request_id: value}) — the same @workflow function re-executes and request_info returns value directly on the second pass.
Pass request_id= explicitly to request_info(...) when you want a stable ID (e.g. one tied to a database row) rather than a generated UUID.
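The suspend-and-replay behaviour described above can be sketched with the standard library alone. This is a toy model, not the framework's implementation: ToyContext, ToyInterrupt, publish, and run are hypothetical names, and the real framework hides its internal interrupt signal from user code.

```python
import asyncio


class ToyInterrupt(Exception):
    """Hypothetical stand-in for the framework's internal pause signal."""

    def __init__(self, request_id, data):
        super().__init__(request_id)
        self.request_id = request_id
        self.data = data


class ToyContext:
    """Hypothetical context: answers from `responses`, otherwise pauses."""

    def __init__(self, responses=None):
        self.responses = responses or {}

    async def request_info(self, data, response_type, request_id):
        if request_id in self.responses:
            # Second pass: the supplied value is returned directly.
            return response_type(self.responses[request_id])
        # First pass: stop the function and surface a pending request.
        raise ToyInterrupt(request_id, data)


async def publish(ctx, draft):
    decision = await ctx.request_info({"draft": draft}, str, request_id="review")
    return f"{draft} [{decision}]"


async def run(draft, responses=None):
    """Re-executes the same function on every call, as the text describes."""
    try:
        output = await publish(ToyContext(responses), draft)
        return {"status": "done", "output": output}
    except ToyInterrupt as stop:
        return {"status": "pending", "request_id": stop.request_id, "data": stop.data}


first = asyncio.run(run("Q3 report"))
second = asyncio.run(run("Q3 report", responses={first["request_id"]: "approve"}))
print(first["status"], second["output"])
```

Passing a fixed request_id ("review" here) is what lets the second call match its answer to the same request, which is the reason the text recommends a stable ID when the response arrives from another system.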
State survives checkpoints — get_state / set_state values are persisted when a checkpoint is taken.
Reserved key prefix. Keys that start with _ are reserved for framework bookkeeping (e.g. _step_cache, _original_message). Passing one to set_state raises ValueError immediately:
ctx.set_state("_my_key", value) # ValueError: State key '_my_key' starts with '_' ...
ctx.set_state("my_key", value) # ✓ correct
Values stored under user keys must be JSON-serialisable when checkpoint storage is configured.
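The two state rules above (reserved `_` prefix, JSON-serialisable values under checkpointing) can be illustrated with a small stand-in. ToyStateStore is a hypothetical class written for this sketch; in particular, checking serialisability eagerly inside set_state is an assumption made here, and the framework may instead fail when the checkpoint is written.

```python
import json


class ToyStateStore:
    """Illustrative only; not the framework's RunContext."""

    def __init__(self, checkpointing: bool = False) -> None:
        self._data = {}
        self._checkpointing = checkpointing

    def set_state(self, key, value):
        if key.startswith("_"):
            # Mirrors the documented rule: '_' keys are reserved.
            raise ValueError(f"State key {key!r} starts with '_' (reserved)")
        if self._checkpointing:
            # json.dumps raises TypeError for non-serialisable values.
            json.dumps(value)
        self._data[key] = value

    def get_state(self, key, default=None):
        return self._data.get(key, default)


store = ToyStateStore(checkpointing=True)
store.set_state("spend", 12.5)

errors = []
for key, value in [("_my_key", 1), ("handle", object())]:
    try:
        store.set_state(key, value)
    except (ValueError, TypeError) as exc:
        errors.append(type(exc).__name__)

print(store.get_state("spend"), errors)
```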
get_run_context() — accessing RunContext from nested helpers
When a utility function deep in the call stack needs to emit events or read state, pass ctx explicitly or call get_run_context(). The RunContext is stored in a ContextVar for the duration of the @workflow call, so get_run_context() retrieves it from any depth without threading the parameter through every signature:
import asyncio

from agent_framework import (
    Agent,
    WorkflowEvent,
    get_run_context,
    step,
    workflow,
)
from agent_framework.openai import OpenAIChatClient

client = OpenAIChatClient()
extractor = Agent(client=client, name="extractor", instructions="Extract five key facts as a numbered list.")


async def _emit_progress(message: str) -> None:
    """Utility helper; no RunContext parameter needed at the call site."""
    # Body reconstructed from the surrounding text; awaiting add_event is an assumption.
    ctx = get_run_context()
    if ctx is not None:  # None when called outside a running workflow
        await ctx.add_event(WorkflowEvent(type="progress", data=message))


# Decorator and signature reconstructed from the call in analysis_pipeline.
@step
async def extract_facts(topic: str) -> list[str]:
    await _emit_progress(f"Extracting facts about '{topic}' …")
    result = await extractor.run(f"Give me five facts about: {topic}")
    await _emit_progress("Extraction complete.")
    return [line for line in result.text.split("\n") if line.strip()]


@workflow
async def analysis_pipeline(topic: str) -> str:
    facts = await extract_facts(topic)
    return "\n".join(f"• {f}" for f in facts)


result = asyncio.run(analysis_pipeline.run("quantum entanglement"))
print(result.get_outputs()[-1])
get_run_context() returns None when called outside a running workflow, which makes helpers reusable from both workflow and non-workflow callsites. The guard if ctx is not None is the idiomatic pattern.
Thread safety note. ContextVar propagation follows Python's standard rules: the value is inherited by tasks created with asyncio.create_task() and asyncio.gather(), and is also propagated to threads spawned with asyncio.to_thread() (Python 3.9+). However, it is not inherited by raw threading.Thread instances. If you use custom threading, pass ctx explicitly rather than relying on get_run_context().
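The propagation rules in the note can be checked with the standard library alone. The sketch below uses a plain ContextVar named current_ctx as a stand-in for the one the framework keeps internally (the name and the "run-1" value are illustrative), and shows which execution styles see the value on a standard CPython build.

```python
import asyncio
import contextvars
import threading

# Stand-in for the framework's internal ContextVar (hypothetical name).
current_ctx = contextvars.ContextVar("current_ctx", default=None)


def read_ctx():
    return current_ctx.get()


async def read_in_task():
    return current_ctx.get()


async def main() -> dict:
    current_ctx.set("run-1")
    seen = {}
    # gather wraps each coroutine in a task that copies the current context.
    seen["gather"] = await asyncio.gather(read_in_task(), read_in_task())
    seen["create_task"] = await asyncio.create_task(read_in_task())
    # to_thread copies the current context into the worker thread.
    seen["to_thread"] = await asyncio.to_thread(read_ctx)
    # A raw thread starts with a fresh context, so it sees the default.
    box = []
    worker = threading.Thread(target=lambda: box.append(read_ctx()))
    worker.start()
    worker.join()
    seen["raw_thread"] = box[0]
    return seen


seen = asyncio.run(main())
print(seen)
```

Only the raw thread loses the value, which is why passing ctx explicitly is the safer choice whenever custom threading is involved.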
    angles = await asyncio.gather(*[get_angle(topic, p) for p in perspectives])
    return "\n".join(angles)
result = asyncio.run(multi_angle_research.run("AI Act compliance"))
print(result.get_outputs()[-1])
@step result caching is keyed by (step_name, invocation_index) — each call to get_angle in the gather gets its own cache slot, so resume-from-checkpoint correctly re-populates only the steps that didn’t complete.
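To make the (step_name, invocation_index) keying concrete, here is a toy cache built with the standard library. ToyStepCache and toy_step are hypothetical names for this sketch and do not reflect the framework's internals; the point is only that each call in a gather claims its own slot, so a replay re-runs just the slots that are missing.

```python
import asyncio
from collections import defaultdict
from functools import wraps


class ToyStepCache:
    """Hypothetical cache keyed by (step name, per-run invocation index)."""

    def __init__(self):
        self.results = {}
        self.executions = 0
        self._counters = defaultdict(int)

    def begin_run(self):
        self._counters.clear()  # indices restart on every replay

    def next_key(self, name):
        index = self._counters[name]
        self._counters[name] += 1
        return (name, index)


cache = ToyStepCache()


def toy_step(fn):
    @wraps(fn)
    def wrapper(*args):
        key = cache.next_key(fn.__name__)  # slot claimed at call time

        async def run():
            if key not in cache.results:
                cache.executions += 1
                cache.results[key] = await fn(*args)
            return cache.results[key]

        return run()

    return wrapper


@toy_step
async def get_angle(topic, perspective):
    await asyncio.sleep(0)
    return f"{perspective} view of {topic}"


async def research(topic, perspectives):
    cache.begin_run()
    return await asyncio.gather(*[get_angle(topic, p) for p in perspectives])


views = ["legal", "technical", "economic"]
first = asyncio.run(research("AI Act", views))
del cache.results[("get_angle", 2)]  # simulate a step that never completed
second = asyncio.run(research("AI Act", views))
print(cache.executions)
```

The second run executes only the step whose slot was removed; the other two calls are served from the cache.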
result = asyncio.run(long_pipeline.run(checkpoint_id=latest.checkpoint_id))
print(result.get_outputs()[-1])
Pass checkpoint_storage= either in @workflow(checkpoint_storage=...) (per-workflow default) or as a run() override — the run() argument takes precedence.
Restoring a checkpoint with a HITL response in one call
Combine checkpoint_id= and responses= to restore a saved checkpoint and inject a human response in the same call. This is the standard pattern for workflows that were suspended at request_info, checkpointed to durable storage, and later resumed from a webhook or queue consumer in a different process:
import asyncio
from agent_framework import FileCheckpointStorage, RunContext, step, workflow
responses={"review": decision},  # supply the human decision
)
print(result.get_outputs()[-1])
# Process 1
cp_id = asyncio.run(run_first_pass())
# Process 2 — completely separate; shares only the checkpoint_id
asyncio.run(run_resume(cp_id, decision="approve"))
message is mutually exclusive with checkpoint_id. Combine responses= with either message (fresh run with pre-loaded answers) or checkpoint_id (restore + respond), but never both message and checkpoint_id together.
Pass include_status_events=True to include framework-generated type='status' events alongside your custom events in result.events. Status events mark lifecycle transitions such as step_started and step_completed for every @step function. They are excluded by default to keep the event list clean.
result = await my_workflow.run("input", include_status_events=True)
status_events = [e for e in result.events if e.type == "status"]
for e in status_events:
    print(e.data)  # e.g. {"phase": "step_started", "step": "research", "index": 0}
include_status_events: bool = False,  # include step_started / step_completed events
**kwargs,  # extra kwargs stored in RunContext._run_kwargs
)
message and checkpoint_id are mutually exclusive. responses may be combined with either. At least one of message, responses, or checkpoint_id must be supplied.
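The argument rules above amount to a small decision table. The function below is a hypothetical helper written for illustration (classify_run_call is not part of the framework, and the mode labels are invented); it simply encodes the three stated constraints so the valid combinations are easy to see.

```python
from typing import Any, Optional


def classify_run_call(
    message: Optional[Any] = None,
    *,
    responses: Optional[dict] = None,
    checkpoint_id: Optional[str] = None,
) -> str:
    """Return a label for a run() argument combination, or raise if invalid."""
    if message is not None and checkpoint_id is not None:
        raise ValueError("message and checkpoint_id are mutually exclusive")
    if message is None and responses is None and checkpoint_id is None:
        raise ValueError("supply at least one of message, responses, or checkpoint_id")
    if checkpoint_id is not None:
        return "restore + respond" if responses is not None else "restore"
    if message is not None:
        return "fresh run with pre-loaded answers" if responses is not None else "fresh run"
    return "respond"


modes = [
    classify_run_call("input"),
    classify_run_call("input", responses={"review": "approve"}),
    classify_run_call(checkpoint_id="cp-1"),
    classify_run_call(checkpoint_id="cp-1", responses={"review": "approve"}),
    classify_run_call(responses={"review": "approve"}),
]
print(modes)
```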
result = asyncio.run(pipeline.run("... article text ..."))
print(result.get_outputs()[-1])
as_agent() returns a FunctionalWorkflowAgent — it has the same run() signature as Agent and plugs into WorkflowBuilder, SequentialBuilder, and the as_tool() helper:
# Wrap the functional workflow agent as a tool for a supervisor agent
tool = fact_checker_agent.as_tool(
description="Check factual claims in an article and return credibility scores.",
)
supervisor = Agent(client=client, instructions="You coordinate research and fact-checking.", tools=[tool])
Research pipeline with reviewer. Use @step for the main stages (research, draft, review) and request_info in the final step so a human can approve before the document is published.
Dynamic fan-out. Use asyncio.gather over a list built at runtime — the step cache handles variable-length parallelism gracefully, where a static graph with hard-coded fan-out wouldn’t.
Multi-stage HITL. Call request_info multiple times in the same workflow function; each call suspends independently. Respond to all pending events in a single run(responses={...}) call to resume them together.
Cost guardrail. Track spending in ctx.set_state("spend", ...) across steps and call request_info when a budget is exceeded — the workflow pauses until a manager approves continuation.
Incremental streaming UI. ctx.add_event(WorkflowEvent(type="progress", data=...)) inside a step gives a streaming caller real-time progress updates without polling.
Testing without a model. Since @step functions are callable as regular async functions, mock the underlying agents and call steps directly in pytest; reserve the real model for the full @workflow integration test.
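As a minimal sketch of the "testing without a model" pattern, the example below exercises step logic against a mocked agent using only the standard library. The extract_facts function is a plain async function standing in for a @step (taking the agent as a parameter is a choice made here to keep the sketch free of framework imports), and fake_agent is a hypothetical double whose run() returns an object with a text attribute.

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock


async def extract_facts(agent, topic: str) -> list:
    """Stand-in for a @step body: call the agent, keep non-empty lines."""
    result = await agent.run(f"Give me five facts about: {topic}")
    return [line for line in result.text.split("\n") if line.strip()]


# Fake agent: run() is awaitable and returns a canned response object.
fake_agent = SimpleNamespace(
    run=AsyncMock(return_value=SimpleNamespace(text="1. a\n\n2. b\n"))
)

facts = asyncio.run(extract_facts(fake_agent, "quantum entanglement"))
fake_agent.run.assert_awaited_once_with("Give me five facts about: quantum entanglement")
print(facts)
```

In a pytest suite the same body would sit inside a test function, with the assertions replacing the print.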