
Workflows (graph orchestration)

Verified against google-adk==2.0.0b1 (google/adk/workflow/).

Workflow is the graph-based orchestrator that replaces SequentialAgent, ParallelAgent, and LoopAgent in ADK 2.x. It is a BaseNode (not a BaseAgent) — wire it to a Runner via App(root_agent=workflow).

google.adk.workflow exports (workflow/__init__.py):

| Name | Purpose |
| --- | --- |
| `Workflow` | Graph orchestrator. Takes `edges=[...]` and runs the DAG. |
| `BaseNode` | Pydantic base for every node. |
| `Node` | Subclass-friendly base — implement `run_node_impl`. |
| `node` | Decorator / function to wrap a callable, agent, or tool as a `BaseNode`. |
| `FunctionNode` | What `@node` produces for a function. |
| `JoinNode` | Fan-in — waits for all predecessors before emitting. |
| `Edge` | Explicit `from_node → to_node`, optional `route`. |
| `RetryConfig` | Per-node retry policy. |
| `NodeTimeoutError` | Raised when a node exceeds its timeout. |
| `START` | Sentinel for the graph entry point. |
| `DEFAULT_ROUTE` | Matches routes with no explicit mapping. |
```python
import asyncio

from google.adk.agents import LlmAgent
from google.adk.apps import App
from google.adk.runners import InMemoryRunner
from google.adk.workflow import Workflow, node, START

drafter = LlmAgent(
    name="drafter",
    model="gemini-2.5-flash",
    instruction="Write a tight 3-sentence summary of the input.",
    mode="single_turn",
)

polisher = LlmAgent(
    name="polisher",
    model="gemini-2.5-flash",
    instruction="Shorten and sharpen the input. Return only the final text.",
    mode="single_turn",
)

@node
def trim(node_input: str) -> str:
    return node_input.strip()

pipeline = Workflow(
    name="summarize_pipeline",
    edges=[(START, trim, drafter, polisher)],
)

async def main():
    app = App(name="demo", root_agent=pipeline)
    runner = InMemoryRunner(app=app)
    await runner.session_service.create_session(
        app_name="demo", user_id="u1", session_id="s1"
    )
    events = await runner.run_debug(
        "Electric cars sold well in Q1.", user_id="u1", session_id="s1"
    )

asyncio.run(main())
```

A tuple in edges= is a chain — each adjacent pair becomes an Edge. START is the sentinel that receives new_message on invocation.
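The chain expansion is easy to picture in plain Python. This is an illustrative sketch, not the ADK implementation — `START` is modeled here as a plain string sentinel:

```python
# Sketch: how a tuple chain could expand into adjacent-pair edges.
START = "__start__"  # stand-in for the ADK sentinel

def expand_chain(chain):
    """Turn (START, a, b, c) into [(START, a), (a, b), (b, c)]."""
    return list(zip(chain, chain[1:]))

edges = expand_chain((START, "trim", "drafter", "polisher"))
# [("__start__", "trim"), ("trim", "drafter"), ("drafter", "polisher")]
```

Each adjacent pair is one directed edge, which is why a single tuple reads like a `SequentialAgent` pipeline.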

Four ways to declare edges, all verified in workflow/_workflow_graph.py:

```python
edges = [(START, a, b, c)]           # START→a, a→b, b→c
edges = [(START, (a, b, c), join)]   # START fans out to a/b/c, all feed join
edges = [
    (START, classifier, {"billing": billing_agent, "support": support_agent}),
]
```

Edges carry a route value. The source node picks one by setting ctx.route = "billing" (in a FunctionNode) or by the router’s output matching a key. Use DEFAULT_ROUTE for the fallback.

```python
from google.adk.workflow import Edge

edges = [Edge(from_node=a, to_node=b, route="yes")]
```
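Route matching behaves like a dict lookup with a fallback key. A stdlib sketch of that lookup — `DEFAULT_ROUTE` is modeled as a local sentinel, and the agent names are placeholders, not ADK objects:

```python
DEFAULT_ROUTE = object()  # stand-in for the ADK sentinel

routing = {
    "billing": "billing_agent",
    "support": "support_agent",
    DEFAULT_ROUTE: "fallback_agent",
}

def resolve(route_map, route):
    """Pick the successor for a route, falling back to DEFAULT_ROUTE."""
    if route in route_map:
        return route_map[route]
    if DEFAULT_ROUTE in route_map:
        return route_map[DEFAULT_ROUTE]
    raise KeyError(f"no edge for route {route!r}")

primary = resolve(routing, "billing")    # "billing_agent"
fallback = resolve(routing, "refunds")   # falls back to "fallback_agent"
```

Without a `DEFAULT_ROUTE` entry, an unmatched route has nowhere to go — keep the fallback in any map fed by an LLM classifier.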

Mix them freely. BaseAgent, BaseTool, and plain callables are auto-wrapped via build_node() when they appear in an edge.

```python
from google.adk.workflow import node, RetryConfig

@node
async def fetch(node_input: str, ctx) -> dict:
    ctx.state["last_query"] = node_input
    return {"query": node_input, "results": [...]}

@node(
    name="safe_fetch",
    retry_config=RetryConfig(max_attempts=3),
    timeout=10.0,
    rerun_on_resume=True,
)
async def safe_fetch(node_input: str, ctx): ...
```

Signatures recognised by FunctionNode:

  • node_input — the incoming value (the predecessor’s output, or the user’s new_message for START successors).
  • ctx — the Context (state, run_node, route, interrupt, artifact helpers).
  • Any other parameter must be declared in the enclosing Workflow.state_schema — the framework injects its current value.

Return types are honoured: returning a value sets ctx.output; yielding values from an async generator lets you stream partials.
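The by-name injection above can be approximated with `inspect.signature`. This is an illustrative sketch of the dispatch rule, not the `FunctionNode` source — `call_with_injection` and `greet` are made up for the example:

```python
import inspect

def call_with_injection(fn, node_input, ctx, state):
    """Bind arguments by name: node_input, ctx, else look them up in state."""
    kwargs = {}
    for name in inspect.signature(fn).parameters:
        if name == "node_input":
            kwargs[name] = node_input
        elif name == "ctx":
            kwargs[name] = ctx
        else:
            kwargs[name] = state[name]  # must be declared in the state schema
    return fn(**kwargs)

def greet(node_input, user_name):
    return f"{node_input}, {user_name}"

result = call_with_injection(greet, "hello", ctx=None, state={"user_name": "Ada"})
# "hello, Ada"
```

A parameter name that is neither `node_input` nor `ctx` nor a state key raises, which is the sketch's analogue of the schema requirement.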

Subclass Node when you need class-level state or parallel_worker=True:

```python
from collections.abc import AsyncGenerator
from typing import Any

from google.adk.workflow import Node

class DedupeNode(Node):
    name: str = "dedupe"
    seen: set[str] = set()

    async def run_node_impl(
        self, *, ctx, node_input: list[str]
    ) -> AsyncGenerator[Any, None]:
        fresh = [x for x in node_input if x not in self.seen]
        self.seen.update(fresh)
        yield fresh
```

Setting parallel_worker=True lets the node be invoked concurrently per trigger without sharing state with other invocations.
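One way to picture that isolation: each trigger runs against its own copy of the node, so mutable fields like `seen` above are never shared between concurrent invocations. A stdlib sketch, with a plain class standing in for a `Node`:

```python
import copy

class Dedupe:
    """Toy stand-in for a node with per-instance mutable state."""
    def __init__(self):
        self.seen = set()

    def run(self, items):
        fresh = [x for x in items if x not in self.seen]
        self.seen.update(fresh)
        return fresh

template = Dedupe()
# parallel_worker=True is sketched here as a deep copy per invocation
w1, w2 = copy.deepcopy(template), copy.deepcopy(template)
first = w1.run(["a", "b"])   # ["a", "b"]
second = w2.run(["a", "b"])  # ["a", "b"]: w2 never saw w1's items
```

Without the copy, the second call would return an empty list because the `seen` set would be shared.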

| Field | Type | Default | Notes |
| --- | --- | --- | --- |
| `name` | `str` | required | Must be a Python identifier |
| `edges` | `list[EdgeItem]` | `[]` | Tuples or `Edge` objects |
| `max_concurrency` | `int \| None` | `None` | Caps concurrent graph-scheduled nodes |
| `state_schema` | `type[BaseModel]` | `None` | Validates `ctx.state` mutations |
| `rerun_on_resume` | `bool` | `True` | Workflow-level resume behaviour |
| `input_schema` / `output_schema` | `SchemaType` | `None` | Validates workflow-level in/out |

All fields from BaseNode (retry_config, timeout, wait_for_output, …) apply too.

A node can steer the graph by setting ctx.route:

```python
from google.adk.workflow import DEFAULT_ROUTE, Workflow, node, START

@node
async def classify(node_input: str, ctx):
    intent = "billing" if "invoice" in node_input.lower() else "support"
    ctx.route = intent
    return node_input

workflow = Workflow(
    name="triage",
    edges=[
        (START, classify, {
            "billing": billing_agent,
            "support": support_agent,
            DEFAULT_ROUTE: fallback_agent,
        }),
    ],
)
```

Replace LoopAgent with a routing map that either loops back to the same node or flows to a terminal node (any node with no outgoing edges). There is no END_NODE sentinel — terminality is structural.

```python
from google.adk.workflow import Workflow, node, START

@node(rerun_on_resume=True)
async def critic(draft: str, ctx) -> str:
    if len(draft) < 500:
        ctx.route = "done"
        return draft
    ctx.route = "continue"
    return draft[:500]  # trimmed draft fed back in

@node
def publish(draft: str) -> str:
    return draft  # terminal: no outgoing edge

loop = Workflow(
    name="refine",
    edges=[
        (START, critic, {"continue": critic, "done": publish}),
    ],
)
```

publish is the terminal node — the workflow finishes when routing lands there. Persist iteration count in ctx.state if you need max_iterations semantics, and set max_concurrency=1 to keep the loop single-threaded.
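The `max_iterations` pattern can be sketched as a plain routing loop with a counter kept in state. Illustrative only — this collapses the critic/publish graph above into one function and ignores scheduling:

```python
def run_loop(draft, state, max_iterations=5):
    """Route 'continue' back to the critic until 'done' or the cap is hit."""
    while True:
        state["iterations"] = state.get("iterations", 0) + 1
        if len(draft) < 500 or state["iterations"] >= max_iterations:
            return draft          # 'done' route: lands on the terminal node
        draft = draft[:500]       # 'continue' route: trimmed draft fed back

state = {}
trimmed = run_loop("x" * 1000, state)
# len(trimmed) == 500, state["iterations"] == 5 (cap reached)
```

The counter lives in `state` rather than a local so it would survive a pause/resume, mirroring the persistence advice above.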

Fan-in that waits for all declared predecessors. Useful after a (START, (a, b, c), join) fan-out.

```python
from google.adk.agents import LlmAgent
from google.adk.workflow import JoinNode, Workflow, node, START

a = LlmAgent(name="a", model="gemini-2.5-flash", instruction="Reply 'A'.", mode="single_turn")
b = LlmAgent(name="b", model="gemini-2.5-flash", instruction="Reply 'B'.", mode="single_turn")
join = JoinNode(name="merge")

@node
def finalize(node_input: dict) -> str:
    # node_input is a dict keyed by predecessor name
    return f"A={node_input['a']} B={node_input['b']}"

wf = Workflow(
    name="fanin",
    edges=[(START, (a, b), join, finalize)],
)
```

JoinNode receives a dict of {predecessor_name: output} as its node_input.
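That fan-in contract can be sketched with `asyncio.gather`: run every predecessor, then key the outputs by name. Illustrative only — `fan_in` and the toy coroutines are not ADK code:

```python
import asyncio

async def fan_in(predecessors, node_input):
    """Run all predecessors on the same input; key results by node name."""
    names = list(predecessors)
    outputs = await asyncio.gather(
        *(fn(node_input) for fn in predecessors.values())
    )
    return dict(zip(names, outputs))

async def a(x):
    return x.upper()

async def b(x):
    return x[::-1]

async def main():
    merged = await fan_in({"a": a, "b": b}, "adk")
    return f"A={merged['a']} B={merged['b']}"

result = asyncio.run(main())  # "A=ADK B=kda"
```

The downstream node only fires once the whole dict is assembled, which is what "waits for all predecessors" means in practice.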

Call another node from inside a node — the result is awaited in-place. The caller must have rerun_on_resume=True or run_node raises ValueError (agents/context.py:399-405).

```python
@node(rerun_on_resume=True)
async def supervisor(q: str, ctx):
    research = await ctx.run_node(research_agent, q)
    answer = await ctx.run_node(writer_agent, research)
    return answer
```

Yield a RequestInput from a node to pause the workflow until Runner.run_async is called again with a new_message carrying a matching function-response. Pair with auth_config= on the @node to gate on an OAuth flow.

```python
from google.adk.events.request_input import RequestInput

@node(rerun_on_resume=True)
async def approve(draft: str, ctx):
    decision = yield RequestInput(
        id="approval",
        hint="Approve the draft? yes/no",
    )
    if decision == "yes":
        yield draft  # 'return draft' is a SyntaxError in an async generator
        return
    ctx.route = "rewrite"
```

Combine with ResumabilityConfig(is_resumable=True) on the App so state is persisted across the pause.
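The yield-and-resume handshake mirrors Python's generator `send` protocol. A stdlib-only sketch of the pause, with `RequestInput` modeled as a plain dict:

```python
def approve(draft):
    """Pause on a request; the runner resumes us with the user's reply."""
    decision = yield {"id": "approval", "hint": "Approve the draft? yes/no"}
    yield draft if decision == "yes" else None

gen = approve("final copy")
request = next(gen)       # workflow pauses here; the request event is emitted
result = gen.send("yes")  # next user turn resumes the generator with the reply
# result == "final copy"
```

The framework's job is exactly the part outside the generator: persist `request`, wait for the matching function-response, and `send` it back in.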

```python
from google.adk.workflow import node, RetryConfig

@node(retry_config=RetryConfig(max_attempts=3, backoff_base=2.0), timeout=30.0)
async def flaky(q: str, ctx): ...
```

A node that exceeds timeout is cancelled and raises NodeTimeoutError. If retries are configured, the node is restarted.
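A stdlib sketch of that cancel-then-retry behaviour, using `asyncio.wait_for` plus exponential backoff. The knob names mirror `RetryConfig` but the loop itself is an illustration, not the ADK implementation:

```python
import asyncio

async def run_with_retries(fn, *, max_attempts=3, backoff_base=2.0, timeout=1.0):
    """Cancel the attempt on timeout; back off and retry until attempts run out."""
    for attempt in range(max_attempts):
        try:
            return await asyncio.wait_for(fn(), timeout=timeout)
        except asyncio.TimeoutError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the timeout
            await asyncio.sleep(backoff_base ** attempt * 0.01)  # scaled for the demo

calls = 0

async def flaky():
    global calls
    calls += 1
    if calls < 3:
        await asyncio.sleep(10)  # exceeds the timeout and gets cancelled
    return "ok"

result = asyncio.run(run_with_retries(flaky, timeout=0.05))
# "ok" on the third attempt
```

Note that `wait_for` cancels the in-flight coroutine before the retry starts, so a retried node must be safe to restart from scratch.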

edges=[(START, step1, step2, step3)] — direct replacement for SequentialAgent.

edges=[(START, split, (worker1, worker2, worker3), join, summarize)] — fan-out with JoinNode.

classify node sets ctx.route to a string; routing map fans to the matching agent; optional DEFAULT_ROUTE catches unknowns.

Wrap the scraper with retry_config=RetryConfig(max_attempts=5) and timeout=20. Log retries via LoggingPlugin on the App.

Insert a @node(rerun_on_resume=True, auth_config=...) that yields RequestInput between producer and publisher. The workflow pauses, the event is persisted, Runner.run_async resumes on the next user turn.

  • Nodes are Pydantic models — if you subclass Node, annotate fields or they won’t serialise.
  • Workflow is not a BaseAgent. Runner(agent=workflow) fails. Use App(root_agent=workflow) and Runner(app=app, session_service=...).
  • ctx.run_node(callable) requires rerun_on_resume=True on the calling node.
  • wait_for_output=True means a node must yield output/route before it’s marked complete. Forget that and the workflow deadlocks.
  • When a chain tuple holds only one node after START (e.g. (START, single_node)), you still get a single edge — not sugar for fan-out. Fan-out needs a nested tuple: (START, (a, b)).
  • Setting nodes= explicitly on Workflow raises — nodes are inferred from edges.