Class deep dives — volume 22 (credential exchange pipeline, workflow errors, replay sequencing, conformance testing, tool-connection analysis & GoogleSearchAgentTool)
Source-verified against google-adk==2.3.0 (installed from PyPI, June 2026). Every field name, signature, constant, and code example is drawn from the installed package source at /usr/local/lib/python3.11/dist-packages/google/adk/.
| # | Class / group | Module | Status |
|---|---|---|---|
| 1 | GoogleSearchAgentTool + create_google_search_agent | google.adk.tools.google_search_agent_tool | Stable (internal workaround) |
| 2 | BaseCredentialExchanger + ExchangeResult + CredentialExchangeError | google.adk.auth.exchanger.base_credential_exchanger | @experimental |
| 3 | CredentialExchangerRegistry | google.adk.auth.exchanger.credential_exchanger_registry | @experimental |
| 4 | OAuth2CredentialExchanger | google.adk.auth.exchanger.oauth2_credential_exchanger | @experimental |
| 5 | SessionStateCredentialService | google.adk.auth.credential_service.session_state_credential_service | @experimental |
| 6 | NodeInterruptedError + NodeTimeoutError + DynamicNodeFailError | google.adk.workflow._errors | Stable (internal) |
| 7 | ReplaySequenceBarrier | google.adk.workflow.utils._replay_sequence_barrier | Stable (internal) |
| 8 | TestSpec + UserMessage + TestCase | google.adk.cli.conformance.test_case | Stable |
| 9 | AdkWebServerClient | google.adk.cli.conformance.adk_web_server_client | Stable |
| 10 | ToolConnectionAnalyzer + ToolConnectionMap + StatefulParameter | google.adk.tools.environment_simulation.tool_connection_analyzer + .tool_connection_map | @experimental(ENVIRONMENT_SIMULATION) |
1 · GoogleSearchAgentTool + create_google_search_agent — google_search workaround for multi-tool agents
Source: google/adk/tools/google_search_agent_tool.py
GoogleSearchAgentTool is a concrete AgentTool subclass that solves a specific Gemini API constraint: the google_search built-in tool cannot be combined with other tools in a single LlmAgent tool list because Gemini rejects requests that mix native built-in tools with custom function calls. The workaround wraps the search capability in a dedicated sub-agent that the parent agent delegates to via AgentTool, sidestepping the restriction entirely.
create_google_search_agent(model) is the companion factory function that builds the correctly-named and instructed sub-agent. Both the function and the class are intended to be used together: call the factory, then pass the returned LlmAgent to GoogleSearchAgentTool.__init__. GoogleSearchAgentTool calls super().__init__(agent=self.agent, propagate_grounding_metadata=True) — the propagate_grounding_metadata=True flag ensures that Gemini’s grounding metadata (search attribution, rendered content, web sources) bubbles up from the sub-agent’s response into the parent agent’s event stream.
The class carries a source-level TODO(b/448114567) marker noting it should be removed once the API restriction is lifted.
Key behaviours
| Behaviour | Detail |
|---|---|
| Fixed sub-agent name | create_google_search_agent always creates an agent named 'google_search_agent' — do not rename it or sub-agent routing will break. |
| Grounding metadata propagation | propagate_grounding_metadata=True is hardcoded in GoogleSearchAgentTool.__init__, ensuring search attribution reaches the parent event stream. |
| Model delegation | The model for the sub-agent is supplied by the caller; the parent can use a different model (e.g., cheaper model for search, more expensive for reasoning). |
| Parallel search requests | Because this is an AgentTool, the parent agent can call it in parallel with other tools — multiple search intents in one LLM turn are each dispatched to a fresh sub-invocation of the search agent. |
| Tool list constraint | The parent LlmAgent must not also include google_search directly — only GoogleSearchAgentTool should appear. Including both causes the original API error. |
Example 1 — combining google_search with a custom function tool
```python
import asyncio

from google.adk.agents import LlmAgent
from google.adk.runners import InMemoryRunner
from google.adk.tools.google_search_agent_tool import (
    GoogleSearchAgentTool,
    create_google_search_agent,
)
from google.genai import types

# 1. Build the dedicated search sub-agent.
search_agent = create_google_search_agent(model="gemini-2.5-flash")
search_tool = GoogleSearchAgentTool(agent=search_agent)


# 2. Define a custom tool that cannot share a tool list with google_search.
def get_weather(city: str) -> dict:
    """Return current weather for a city."""
    return {"city": city, "temp_c": 22, "condition": "sunny"}


# 3. Build the parent agent; the search tool and the custom tool coexist here.
agent = LlmAgent(
    name="research_assistant",
    model="gemini-2.5-flash",
    instruction=(
        "You can search the web via the google_search_agent tool and also "
        "check weather. Combine both when the user asks about travel."
    ),
    tools=[search_tool, get_weather],
)


async def main():
    runner = InMemoryRunner(agent=agent, app_name="multi_tool_demo")
    session = await runner.session_service.create_session(
        app_name="multi_tool_demo", user_id="u1"
    )
    # new_message must be types.Content; use UserContent for convenience.
    async for event in runner.run_async(
        user_id="u1",
        session_id=session.id,
        new_message=types.UserContent(
            "What's the weather in Paris and what are the top tourist spots there?"
        ),
    ):
        if event.is_final_response():
            print(event.content.parts[0].text)


asyncio.run(main())
```

Example 2 — using a lighter model for the search sub-agent
```python
import asyncio

from google.adk.agents import LlmAgent
from google.adk.runners import InMemoryRunner
from google.adk.tools.google_search_agent_tool import (
    GoogleSearchAgentTool,
    create_google_search_agent,
)
from google.genai import types

# Use a lightweight model for the search sub-agent to minimise cost.
# The parent uses a more capable model for synthesis.
search_agent = create_google_search_agent(model="gemini-2.0-flash")
search_tool = GoogleSearchAgentTool(agent=search_agent)


def save_report(title: str, content: str) -> dict:
    """Persist a research report."""
    return {"saved": True, "title": title, "length": len(content)}


parent = LlmAgent(
    name="research_writer",
    model="gemini-2.5-pro",
    instruction="Research topics via google_search_agent and then save reports.",
    tools=[search_tool, save_report],
)


async def main():
    runner = InMemoryRunner(agent=parent, app_name="research_writer")
    session = await runner.session_service.create_session(
        app_name="research_writer", user_id="u1"
    )
    async for event in runner.run_async(
        user_id="u1",
        session_id=session.id,
        new_message=types.UserContent(
            "Research the history of the internet and save a report titled 'Internet History'"
        ),
    ):
        if event.is_final_response():
            print(event.content.parts[0].text)


asyncio.run(main())
```

Example 3 — inspecting grounding metadata from a search sub-agent
```python
import asyncio

from google.adk.agents import LlmAgent
from google.adk.runners import InMemoryRunner
from google.adk.tools.google_search_agent_tool import (
    GoogleSearchAgentTool,
    create_google_search_agent,
)
from google.genai import types

search_tool = GoogleSearchAgentTool(agent=create_google_search_agent("gemini-2.5-flash"))

agent = LlmAgent(
    name="grounding_demo",
    model="gemini-2.5-flash",
    instruction="Answer questions by searching the web.",
    tools=[search_tool],
)


async def main():
    runner = InMemoryRunner(agent=agent, app_name="grounding_demo")
    session = await runner.session_service.create_session(
        app_name="grounding_demo", user_id="u1"
    )
    async for event in runner.run_async(
        user_id="u1",
        session_id=session.id,
        new_message=types.UserContent("What is the current version of Python?"),
    ):
        # Grounding metadata is surfaced via event.grounding_metadata
        # because propagate_grounding_metadata=True is set on the tool.
        if event.grounding_metadata:
            print("Search grounding sources:")
            for chunk in event.grounding_metadata.grounding_chunks or []:
                if chunk.web:
                    print(f"  - {chunk.web.title}: {chunk.web.uri}")
        if event.is_final_response():
            print("\nFinal answer:", event.content.parts[0].text)


asyncio.run(main())
```

2 · BaseCredentialExchanger + ExchangeResult + CredentialExchangeError — credential exchange interface
Source: google/adk/auth/exchanger/base_credential_exchanger.py
This module defines the three building blocks of ADK’s credential exchange pipeline, introduced in 2.3.0 as @experimental:
- `CredentialExchangeError`: base exception for all exchange failures; raise it from custom exchangers to signal unrecoverable errors (as opposed to returning the original credential unchanged).
- `ExchangeResult`: a `NamedTuple` with two fields: `credential: AuthCredential` (the resulting credential, possibly modified) and `was_exchanged: bool` (whether the exchange actually occurred). Callers inspect `was_exchanged` to detect no-ops.
- `BaseCredentialExchanger`: abstract base class with a single `@abstractmethod`: `async def exchange(auth_credential, auth_scheme=None) -> ExchangeResult`. Implementations may return the original credential with `was_exchanged=False` when exchange is not applicable or not possible.
Key behaviours
| Behaviour | Detail |
|---|---|
| `ExchangeResult` is a `NamedTuple` | Immutable; unpacks as (credential, was_exchanged), so it is safe to destructure in callers. |
| No-op return pattern | Return ExchangeResult(original_credential, False) rather than raising when exchange simply isn’t needed (e.g., token already present). |
| `auth_scheme` is optional | Some exchangers (e.g., API key rotators) don’t need the scheme; others (e.g., OAuth2CredentialExchanger) require it and raise CredentialExchangeError if absent. |
| `@experimental` decorator | Emits a UserWarning on first import; may change without notice. |
Example 1 — implementing a custom API key rotating exchanger
```python
from typing import Optional

from google.adk.auth.auth_credential import AuthCredential
from google.adk.auth.auth_schemes import AuthScheme
from google.adk.auth.exchanger.base_credential_exchanger import (
    BaseCredentialExchanger,
    CredentialExchangeError,
    ExchangeResult,
)


class ApiKeyRotatingExchanger(BaseCredentialExchanger):
    """Exchanges a placeholder API key for a fresh rotated key."""

    def __init__(self, key_vault_url: str):
        self._vault_url = key_vault_url

    async def exchange(
        self,
        auth_credential: AuthCredential,
        auth_scheme: Optional[AuthScheme] = None,
    ) -> ExchangeResult:
        if auth_credential.api_key is None:
            return ExchangeResult(auth_credential, False)

        # Fetch a fresh key from the vault (simplified).
        fresh_key = await self._fetch_key_from_vault(auth_credential.api_key)
        if fresh_key is None:
            raise CredentialExchangeError(
                f"Vault returned no key for {self._vault_url}"
            )
        updated = auth_credential.model_copy(update={"api_key": fresh_key})
        return ExchangeResult(updated, True)

    async def _fetch_key_from_vault(self, current_key: str) -> Optional[str]:
        # In production: call Secret Manager or Vault API.
        return "rotated-" + current_key
```

Example 2 — inspecting ExchangeResult
```python
import asyncio

from google.adk.auth.auth_credential import (
    AuthCredential,
    AuthCredentialTypes,
    HttpAuth,
    HttpCredentials,
)
from google.adk.auth.exchanger.base_credential_exchanger import ExchangeResult


async def demonstrate_exchange_result():
    # ExchangeResult is a NamedTuple: unpack it or use named attributes.
    # auth_type is required; http must wrap HttpCredentials in HttpAuth.
    fake_cred = AuthCredential(
        auth_type=AuthCredentialTypes.HTTP,
        http=HttpAuth(scheme="bearer", credentials=HttpCredentials(token="old-token")),
    )
    result = ExchangeResult(credential=fake_cred, was_exchanged=False)

    credential, was_exchanged = result  # NamedTuple unpacking
    print(f"was_exchanged={was_exchanged}")  # False

    if not result.was_exchanged:
        print("Credential was returned unchanged; skip saving to store.")
    else:
        print("Credential was exchanged; persist updated credential.")


asyncio.run(demonstrate_exchange_result())
```

Example 3 — error handling patterns for custom exchangers
```python
import asyncio
from typing import Optional

from google.adk.auth.auth_credential import (
    AuthCredential,
    AuthCredentialTypes,
    HttpAuth,
    HttpCredentials,
)
from google.adk.auth.auth_schemes import AuthScheme
from google.adk.auth.exchanger.base_credential_exchanger import (
    BaseCredentialExchanger,
    CredentialExchangeError,
    ExchangeResult,
)


class ServiceAccountTokenExchanger(BaseCredentialExchanger):
    """Exchanges a service account JSON key for a short-lived bearer token."""

    async def exchange(
        self,
        auth_credential: AuthCredential,
        auth_scheme: Optional[AuthScheme] = None,
    ) -> ExchangeResult:
        if not auth_credential.service_account:
            # Not applicable to this credential type; signal a no-op.
            return ExchangeResult(auth_credential, False)

        try:
            token = await self._mint_token(auth_credential.service_account)
        except ConnectionError as exc:
            # Re-raise domain-specific errors as CredentialExchangeError
            # so callers can catch a single exception type.
            raise CredentialExchangeError(
                f"Failed to mint service account token: {exc}"
            ) from exc

        # http expects HttpAuth, not a raw HttpCredentials or dict.
        # Also update auth_type: model_copy preserves the original value
        # (SERVICE_ACCOUNT), which would misdirect downstream dispatchers.
        updated = auth_credential.model_copy(
            update={
                "auth_type": AuthCredentialTypes.HTTP,
                "http": HttpAuth(scheme="bearer", credentials=HttpCredentials(token=token)),
                "service_account": None,
            }
        )
        return ExchangeResult(updated, True)

    async def _mint_token(self, service_account) -> str:
        return "ya29.minted-token"


async def main():
    exchanger = ServiceAccountTokenExchanger()
    # auth_type is required on AuthCredential.
    cred = AuthCredential(auth_type=AuthCredentialTypes.API_KEY, api_key="placeholder")
    try:
        result = await exchanger.exchange(cred)
        print("Exchanged:", result.was_exchanged)
    except CredentialExchangeError as e:
        print("Exchange failed:", e)


asyncio.run(main())
```

3 · CredentialExchangerRegistry — type-keyed exchanger registry
Source: google/adk/auth/exchanger/credential_exchanger_registry.py
CredentialExchangerRegistry is an @experimental registry that maps AuthCredentialTypes enum values to BaseCredentialExchanger instances. Its internal store is a plain dict[AuthCredentialTypes, BaseCredentialExchanger] (_exchangers). The two public methods are register(credential_type, exchanger_instance) and get_exchanger(credential_type) -> Optional[BaseCredentialExchanger].
The registry pattern decouples exchange logic selection from the auth pipeline: the pipeline asks the registry for an exchanger by credential type and, if one is found, delegates to it — without needing to know which concrete exchanger handles each type. CredentialManager builds its own internal registry; register custom exchangers on a manager instance via manager.register_credential_exchanger(credential_type, exchanger_instance) — there is no public API to inject an externally built registry.
Key behaviours
| Behaviour | Detail |
|---|---|
| Dict-backed | _exchangers is a plain dict — last register() call for a given type wins; no duplicate protection. |
| Returns `None` for missing types | get_exchanger() returns None rather than raising for unregistered types; callers should check before calling .exchange(). |
| Instance-based | One exchanger instance per type — the instance may carry per-type configuration (e.g., token endpoint URL). |
| `@experimental` | Emits a UserWarning on import; API may change. |
Example 1 — building and populating a registry
```python
import warnings

warnings.filterwarnings("ignore", category=UserWarning)

from google.adk.auth.auth_credential import AuthCredentialTypes
from google.adk.auth.exchanger.credential_exchanger_registry import CredentialExchangerRegistry
from google.adk.auth.exchanger.oauth2_credential_exchanger import OAuth2CredentialExchanger

registry = CredentialExchangerRegistry()

# Register the built-in OAuth2 exchanger for OAuth2 credential types.
oauth2_exchanger = OAuth2CredentialExchanger()
registry.register(AuthCredentialTypes.OAUTH2, oauth2_exchanger)

# Register for OIDC as well.
registry.register(AuthCredentialTypes.OPEN_ID_CONNECT, oauth2_exchanger)

# Look up by type.
exchanger = registry.get_exchanger(AuthCredentialTypes.OAUTH2)
if exchanger:
    print("Found exchanger:", type(exchanger).__name__)  # OAuth2CredentialExchanger
else:
    print("No exchanger registered for OAuth2")
```

Example 2 — custom exchanger in a registry
```python
import warnings

warnings.filterwarnings("ignore", category=UserWarning)

import asyncio
from typing import Optional

from google.adk.auth.auth_credential import AuthCredential, AuthCredentialTypes
from google.adk.auth.auth_schemes import AuthScheme
from google.adk.auth.exchanger.base_credential_exchanger import (
    BaseCredentialExchanger,
    ExchangeResult,
)
from google.adk.auth.exchanger.credential_exchanger_registry import CredentialExchangerRegistry


class ServiceTokenExchanger(BaseCredentialExchanger):
    def __init__(self, service_url: str):
        self._url = service_url

    async def exchange(
        self, auth_credential: AuthCredential, auth_scheme: Optional[AuthScheme] = None
    ) -> ExchangeResult:
        # Exchange service account for bearer token (simplified).
        return ExchangeResult(auth_credential, False)


registry = CredentialExchangerRegistry()
registry.register(
    AuthCredentialTypes.SERVICE_ACCOUNT,
    ServiceTokenExchanger(service_url="https://iam.googleapis.com/token"),
)


async def run_exchange(cred: AuthCredential, cred_type: AuthCredentialTypes):
    exchanger = registry.get_exchanger(cred_type)
    if not exchanger:
        print(f"No exchanger for {cred_type}")
        return
    result = await exchanger.exchange(cred)
    print("was_exchanged:", result.was_exchanged)


# auth_type is required on AuthCredential.
asyncio.run(
    run_exchange(
        AuthCredential(auth_type=AuthCredentialTypes.API_KEY, api_key="key"),
        AuthCredentialTypes.SERVICE_ACCOUNT,
    )
)
```

Example 3 — overriding a registered exchanger at runtime
```python
import warnings

warnings.filterwarnings("ignore", category=UserWarning)

from google.adk.auth.auth_credential import AuthCredentialTypes
from google.adk.auth.exchanger.credential_exchanger_registry import CredentialExchangerRegistry
from google.adk.auth.exchanger.oauth2_credential_exchanger import OAuth2CredentialExchanger


class MockOAuth2Exchanger(OAuth2CredentialExchanger):
    """Test double that always reports exchange as skipped."""

    async def exchange(self, auth_credential, auth_scheme=None):
        result = await super().exchange(auth_credential, auth_scheme)
        return result._replace(was_exchanged=False)


registry = CredentialExchangerRegistry()
registry.register(AuthCredentialTypes.OAUTH2, OAuth2CredentialExchanger())

# In tests: swap in a mock without modifying the pipeline.
registry.register(AuthCredentialTypes.OAUTH2, MockOAuth2Exchanger())

exchanger = registry.get_exchanger(AuthCredentialTypes.OAUTH2)
print(type(exchanger).__name__)  # MockOAuth2Exchanger: the latest register() wins
```

4 · OAuth2CredentialExchanger — OAuth2 credential exchange (client credentials, auth code, PKCE)
Source: google/adk/auth/exchanger/oauth2_credential_exchanger.py
OAuth2CredentialExchanger is the built-in @experimental implementation of BaseCredentialExchanger for OAuth2 and OIDC credentials. It handles two grant types — client_credentials and authorization_code — and requires authlib to be installed (gracefully skips with a warning if not available).
The exchange() method performs these steps in order:
- Scheme check: raises `CredentialExchangeError` immediately if `auth_scheme` is `None`; a scheme is always required.
- authlib guard: if `authlib` is not installed, returns `ExchangeResult(credential, False)` with a warning; never raises.
- No-op check: if `auth_credential.oauth2.access_token` is already set, returns immediately with `was_exchanged=False`.
- Grant type detection: inspects `auth_scheme.flows` (for `OAuth2` schemes) or `auth_scheme.grant_types_supported` (for `OpenIdConnectWithConfig`); defaults OIDC to `authorization_code` unless `client_credentials` is explicitly supported.
- Client credentials exchange: calls `client.fetch_token(token_endpoint, grant_type=OAuthGrantType.CLIENT_CREDENTIALS)` via `authlib`.
- Auth code exchange: calls `client.fetch_token(token_endpoint, authorization_response=..., code=..., grant_type=...)` with optional `code_verifier` for PKCE; strips a trailing `#` from `auth_response_uri` before passing it to `authlib` (which can add extraneous trailing hashes).
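The ordering of the guard checks can be sketched without ADK or authlib installed. This is a simplified stand-in, not the library's implementation: `SketchExchangeError`, `SketchResult`, `FakeOAuth2Credential`, `normalize_auth_uri`, `exchange_sketch`, and the injected `fetch_token` callable are all hypothetical names, and grant-type detection is omitted. Returning the original credential when the fetch fails is assumed from the error-handling behaviour documented for this class.

```python
from dataclasses import dataclass, replace
from typing import Callable, NamedTuple, Optional


class SketchExchangeError(Exception):
    """Hypothetical stand-in for CredentialExchangeError."""


@dataclass(frozen=True)
class FakeOAuth2Credential:
    """Hypothetical stand-in for the OAuth2 fields of an AuthCredential."""

    client_id: str
    access_token: Optional[str] = None
    auth_response_uri: Optional[str] = None


class SketchResult(NamedTuple):
    """Hypothetical stand-in for ExchangeResult."""

    credential: FakeOAuth2Credential
    was_exchanged: bool


def normalize_auth_uri(uri: Optional[str]) -> Optional[str]:
    """Drop one trailing '#', mirroring the described normalisation step."""
    if uri and uri.endswith("#"):
        return uri[:-1]
    return uri


def exchange_sketch(
    credential: FakeOAuth2Credential,
    scheme: Optional[object],
    fetch_token: Callable[[Optional[str]], str],
    authlib_available: bool = True,
) -> SketchResult:
    # 1. Scheme check: a missing scheme is the only case that raises.
    if scheme is None:
        raise SketchExchangeError("auth_scheme is required")
    # 2. authlib guard: skip quietly when the dependency is unavailable.
    if not authlib_available:
        return SketchResult(credential, False)
    # 3. No-op check: an existing access token short-circuits the exchange.
    if credential.access_token:
        return SketchResult(credential, False)
    # 4-6. Token fetch (grant-type detection is omitted in this sketch).
    try:
        token = fetch_token(normalize_auth_uri(credential.auth_response_uri))
    except Exception:
        # Assumed behaviour: on failure, hand back the original credential.
        return SketchResult(credential, False)
    return SketchResult(replace(credential, access_token=token), True)


seen_uris = []


def fake_fetch(uri: Optional[str]) -> str:
    """Record the URI it was given and return a fixed token."""
    seen_uris.append(uri)
    return "new-token"


fresh = FakeOAuth2Credential("client", auth_response_uri="https://app.example.com/cb?code=x#")
cached = FakeOAuth2Credential("client", access_token="already-valid-token")

print(exchange_sketch(fresh, "scheme", fake_fetch).was_exchanged)   # True
print(exchange_sketch(cached, "scheme", fake_fetch).was_exchanged)  # False
print(seen_uris)  # ['https://app.example.com/cb?code=x']
```

Because the no-op check runs before any fetch, the second call never reaches `fake_fetch`, which is why only one URI is recorded.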
Key behaviours
| Behaviour | Detail |
|---|---|
| Scheme required | auth_scheme must not be None — the method raises CredentialExchangeError before any other check. |
| authlib guard | If authlib is not installed, returns ExchangeResult(credential, False) after a warning — checked after scheme validation. |
| Short-circuit on existing token | Checks auth_credential.oauth2.access_token presence after scheme/authlib checks — avoids redundant token fetches on repeat invocations. |
| PKCE via `code_verifier` | If auth_credential.oauth2.code_verifier is set, it is forwarded to fetch_token() as code_verifier= keyword arg. |
| Trailing `#` strip | _normalize_auth_uri() drops a trailing # from auth_response_uri, working around an authlib edge case. |
| Error handling | Logs errors but returns ExchangeResult(credential, False) on exchange failure — callers can still use the original credential. |
Example 1 — client credentials grant exchange
```python
import asyncio
import warnings

warnings.filterwarnings("ignore", category=UserWarning)

from fastapi.openapi.models import OAuth2, OAuthFlows, OAuthFlowClientCredentials
from google.adk.auth.auth_credential import AuthCredential, AuthCredentialTypes, OAuth2Auth
from google.adk.auth.exchanger.oauth2_credential_exchanger import OAuth2CredentialExchanger


async def demo_client_credentials():
    # Build an OAuth2 scheme with a clientCredentials flow.
    scheme = OAuth2(
        flows=OAuthFlows(
            clientCredentials=OAuthFlowClientCredentials(
                tokenUrl="https://auth.example.com/oauth2/token",
                scopes={"read:data": "Read access"},
            )
        )
    )

    # Credential carries client_id and client_secret.
    # auth_type is required on AuthCredential.
    cred = AuthCredential(
        auth_type=AuthCredentialTypes.OAUTH2,
        oauth2=OAuth2Auth(
            client_id="my-service-client",
            client_secret="secret",
        ),
    )

    exchanger = OAuth2CredentialExchanger()
    result = await exchanger.exchange(cred, auth_scheme=scheme)
    print("Exchanged:", result.was_exchanged)
    if result.was_exchanged:
        print("Access token:", result.credential.oauth2.access_token)


asyncio.run(demo_client_credentials())
```

Example 2 — authorization code grant with PKCE
```python
import asyncio
import warnings

warnings.filterwarnings("ignore", category=UserWarning)

from fastapi.openapi.models import OAuth2, OAuthFlows, OAuthFlowAuthorizationCode
from google.adk.auth.auth_credential import AuthCredential, AuthCredentialTypes, OAuth2Auth
from google.adk.auth.exchanger.oauth2_credential_exchanger import OAuth2CredentialExchanger


async def demo_auth_code_with_pkce():
    scheme = OAuth2(
        flows=OAuthFlows(
            authorizationCode=OAuthFlowAuthorizationCode(
                authorizationUrl="https://auth.example.com/authorize",
                tokenUrl="https://auth.example.com/oauth2/token",
                scopes={"openid": "OpenID", "profile": "Profile"},
            )
        )
    )

    # Credential carries the auth code returned from the redirect callback
    # and the PKCE code_verifier generated before the authorization redirect.
    # auth_type is required on AuthCredential.
    cred = AuthCredential(
        auth_type=AuthCredentialTypes.OAUTH2,
        oauth2=OAuth2Auth(
            client_id="my-app-client",
            client_secret="secret",
            auth_code="AUTH_CODE_FROM_REDIRECT",
            auth_response_uri="https://app.example.com/callback?code=AUTH_CODE_FROM_REDIRECT#",
            code_verifier="random-verifier-string-64-chars",
        ),
    )

    exchanger = OAuth2CredentialExchanger()
    result = await exchanger.exchange(cred, auth_scheme=scheme)
    print("Exchanged:", result.was_exchanged)


asyncio.run(demo_auth_code_with_pkce())
```

Example 3 — short-circuit on already-valid token
```python
import asyncio
import warnings

warnings.filterwarnings("ignore", category=UserWarning)

from fastapi.openapi.models import OAuth2, OAuthFlows, OAuthFlowClientCredentials
from google.adk.auth.auth_credential import AuthCredential, AuthCredentialTypes, OAuth2Auth
from google.adk.auth.exchanger.oauth2_credential_exchanger import OAuth2CredentialExchanger


async def demo_skip_exchange_when_token_present():
    scheme = OAuth2(
        flows=OAuthFlows(
            clientCredentials=OAuthFlowClientCredentials(
                tokenUrl="https://auth.example.com/oauth2/token",
                scopes={},
            )
        )
    )

    # Credential already has an access_token, so the exchange should be skipped.
    # auth_type is required on AuthCredential.
    cred = AuthCredential(
        auth_type=AuthCredentialTypes.OAUTH2,
        oauth2=OAuth2Auth(
            client_id="client",
            client_secret="secret",
            access_token="already-valid-token",
        ),
    )

    exchanger = OAuth2CredentialExchanger()
    result = await exchanger.exchange(cred, auth_scheme=scheme)
    print("Exchanged:", result.was_exchanged)  # False: no network call made
    print("Token unchanged:", result.credential.oauth2.access_token)  # already-valid-token


asyncio.run(demo_skip_exchange_when_token_present())
```

5 · SessionStateCredentialService — session-state credential persistence
Source: google/adk/auth/credential_service/session_state_credential_service.py
SessionStateCredentialService is an @experimental BaseCredentialService that stores and retrieves AuthCredential objects directly in the session’s mutable state dictionary. It is the simplest possible persistence backend — no external storage, no encryption — suitable for development and low-sensitivity workloads.
load_credential(auth_config, callback_context) performs a callback_context.state.get(auth_config.credential_key) and returns the value (or None if absent). save_credential(auth_config, callback_context) writes auth_config.exchanged_auth_credential to callback_context.state[auth_config.credential_key].
The source carries a prominent docstring caveat: storing credentials in session state may not be secure — use at your own risk. For production, prefer InMemoryCredentialService (volatile, in-process) or a custom BaseCredentialService backed by a secrets manager.
Key behaviours
| Behaviour | Detail |
|---|---|
| State key | auth_config.credential_key is used as the dict key — derives from the tool’s auth scheme configuration. |
| Scope tied to session | Credentials expire when the session is deleted or garbage-collected; there is no cross-session sharing. |
| No serialisation | Stores the AuthCredential object directly in session state — relies on the session service’s serialisation for persistence across restarts. |
| `@experimental` | Emits a UserWarning on import. |
Example 1 — registering SessionStateCredentialService with an agent
```python
import warnings

warnings.filterwarnings("ignore", category=UserWarning)

from fastapi.openapi.models import OAuth2, OAuthFlows, OAuthFlowAuthorizationCode
from google.adk.agents import LlmAgent
from google.adk.apps.app import App
from google.adk.auth.auth_credential import AuthCredential, AuthCredentialTypes, OAuth2Auth
from google.adk.auth.auth_tool import AuthConfig
from google.adk.auth.credential_service.session_state_credential_service import (
    SessionStateCredentialService,
)
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.authenticated_function_tool import AuthenticatedFunctionTool


def get_calendar_events(credential: AuthCredential) -> list:
    """Fetch calendar events using the injected OAuth2 credential."""
    # ADK injects the exchanged credential as the 'credential' argument.
    token = credential.oauth2.access_token if credential.oauth2 else None
    return [{"title": "Team meeting", "time": "10:00", "token_used": token}]


# Use OAuth2 so CredentialManager actually consults the credential service.
# HTTP/API_KEY credentials are treated as "ready" immediately and bypass
# load_credential/save_credential entirely.
oauth2_scheme = OAuth2(
    flows=OAuthFlows(
        authorizationCode=OAuthFlowAuthorizationCode(
            authorizationUrl="https://accounts.google.com/o/oauth2/auth",
            tokenUrl="https://oauth2.googleapis.com/token",
        )
    )
)

calendar_tool = AuthenticatedFunctionTool(
    func=get_calendar_events,
    auth_config=AuthConfig(
        auth_scheme=oauth2_scheme,
        raw_auth_credential=AuthCredential(
            auth_type=AuthCredentialTypes.OAUTH2,
            oauth2=OAuth2Auth(
                client_id="YOUR_CLIENT_ID",
                client_secret="YOUR_CLIENT_SECRET",
            ),
        ),
    ),
)

agent = LlmAgent(
    name="calendar_agent",
    model="gemini-2.5-flash",
    instruction="Help manage calendar events.",
    tools=[calendar_tool],
)

session_service = InMemorySessionService()
# App uses root_agent (extra="forbid": no agent/credential_service fields).
# credential_service belongs on Runner, not App.
app = App(
    name="calendar_app",
    root_agent=agent,
)
runner = Runner(
    app=app,
    session_service=session_service,
    credential_service=SessionStateCredentialService(),
)
print("App configured with SessionStateCredentialService and OAuth2 AuthenticatedFunctionTool")
```

Example 2 — manually testing load/save via a mock CallbackContext
```python
import warnings

warnings.filterwarnings("ignore", category=UserWarning)

import asyncio
from unittest.mock import MagicMock

from google.adk.auth.auth_credential import AuthCredential, AuthCredentialTypes, OAuth2Auth
from google.adk.auth.credential_service.session_state_credential_service import (
    SessionStateCredentialService,
)


async def test_session_state_round_trip():
    service = SessionStateCredentialService()

    # Build a mock auth_config whose credential_key is "oauth2_creds".
    # auth_type is required on AuthCredential.
    auth_config = MagicMock()
    auth_config.credential_key = "oauth2_creds"
    auth_config.exchanged_auth_credential = AuthCredential(
        auth_type=AuthCredentialTypes.OAUTH2,
        oauth2=OAuth2Auth(access_token="test-token-123"),
    )

    # Build a mock callback_context with a real mutable state dict.
    state = {}
    callback_context = MagicMock()
    callback_context.state = state

    # Test save.
    await service.save_credential(auth_config, callback_context)
    print("State keys after save:", list(state.keys()))  # ['oauth2_creds']

    # Test load.
    loaded = await service.load_credential(auth_config, callback_context)
    print("Loaded access_token:", loaded.oauth2.access_token)  # test-token-123

    # Test missing credential.
    auth_config.credential_key = "nonexistent"
    missing = await service.load_credential(auth_config, callback_context)
    print("Missing credential:", missing)  # None


asyncio.run(test_session_state_round_trip())
```

Example 3 — custom credential service for comparison (Redis-backed)
```python
import warnings

warnings.filterwarnings("ignore", category=UserWarning)

import asyncio
import json
from typing import Optional

from google.adk.auth.auth_credential import AuthCredential
from google.adk.auth.auth_tool import AuthConfig
from google.adk.auth.credential_service.base_credential_service import BaseCredentialService


class RedisCredentialService(BaseCredentialService):
    """Production credential service backed by Redis for cross-session sharing.

    Contrast with SessionStateCredentialService: credentials survive session
    deletion and are accessible across multiple sessions for the same user.
    """

    def __init__(self, redis_client, ttl_seconds: int = 3600):
        self._redis = redis_client
        self._ttl = ttl_seconds

    async def load_credential(
        self, auth_config: AuthConfig, callback_context
    ) -> Optional[AuthCredential]:
        key = f"creds:{auth_config.credential_key}:{callback_context.user_id}"
        raw = await self._redis.get(key)
        if raw is None:
            return None
        return AuthCredential.model_validate(json.loads(raw))

    async def save_credential(
        self, auth_config: AuthConfig, callback_context
    ) -> None:
        key = f"creds:{auth_config.credential_key}:{callback_context.user_id}"
        cred = auth_config.exchanged_auth_credential
        await self._redis.setex(key, self._ttl, cred.model_dump_json())


# Usage:
# import redis.asyncio as aioredis
# redis_client = aioredis.from_url("redis://localhost:6379")
# service = RedisCredentialService(redis_client, ttl_seconds=7200)
print("RedisCredentialService defined; use as a drop-in for SessionStateCredentialService")
```

6 · NodeInterruptedError + NodeTimeoutError + DynamicNodeFailError — workflow error taxonomy
Source: google/adk/workflow/_errors.py
Three distinct exception types form the workflow framework’s internal error taxonomy, each with a specific base class that governs how it interacts with user code and retry logic:
- NodeInterruptedError(BaseException): raised exclusively by ctx.run_node() when a dynamic child node has unresolved HITL interrupt IDs. It extends BaseException deliberately, so except Exception will not catch it; the interrupt propagates up to the framework’s NodeRunner rather than being accidentally swallowed by user-level error handling.
- NodeTimeoutError(Exception): raised when a node exceeds its configured timeout (in seconds). It extends Exception (not BaseException), so retry_config can catch it and a timed-out node can be retried automatically. The constructor takes keyword-only arguments: NodeTimeoutError(node_name="X", timeout=5.0) produces the message "Node 'X' timed out after 5.0 seconds.".
- DynamicNodeFailError(Exception): raised when a dynamic node (one invoked via ctx.run_node()) fails. It wraps the underlying exception and carries error: Exception and error_node_path: str attributes. It is not raised for static Workflow graph nodes, which propagate the original exception directly.
Key behaviours
| Exception | Base class | except Exception catches it? | Retryable? | Detail |
|---|---|---|---|---|
| NodeInterruptedError | BaseException | No | No | HITL signal; caught only by the framework’s NodeRunner |
| NodeTimeoutError | Exception | Yes | Yes | Works with retry_config; carries node_name and timeout attrs |
| DynamicNodeFailError | Exception | Yes | No | Carries error (original exception) and error_node_path attrs |
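To see why the base class decides what a retry loop can and cannot absorb, here is a minimal sketch that does not import ADK. InterruptSignal, TimeoutFailure and run_with_retry are hypothetical stand-ins invented for illustration; the sketch only demonstrates the Python exception-hierarchy behaviour the table relies on and is not how ADK implements retry_config.

```python
class InterruptSignal(BaseException):
    """Hypothetical stand-in for a BaseException-derived interrupt."""


class TimeoutFailure(Exception):
    """Hypothetical stand-in for an Exception-derived timeout."""


def run_with_retry(step, max_attempts=3):
    """Call step(attempt) until it succeeds or attempts run out."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return step(attempt)
        except Exception as exc:  # BaseException-only subclasses pass straight through
            last_error = exc
    raise last_error


calls = []


def flaky(attempt):
    # Times out twice, then succeeds on the third attempt.
    calls.append(("flaky", attempt))
    if attempt < 3:
        raise TimeoutFailure(f"attempt {attempt} timed out")
    return "ok"


def interrupted(attempt):
    # Raises an interrupt that the retry loop must not swallow.
    calls.append(("interrupted", attempt))
    raise InterruptSignal("waiting for human input")


result = run_with_retry(flaky)
try:
    run_with_retry(interrupted)
    propagated = False
except InterruptSignal:
    propagated = True

print(result, propagated, calls)
```

The timeout stand-in is retried until it succeeds, while the interrupt stand-in escapes after a single attempt because except Exception never matches it.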
Example 1 — NodeInterruptedError is not caught by except Exception

    from google.adk.workflow._errors import NodeInterruptedError

    # NodeInterruptedError extends BaseException, NOT Exception.
    # The 'except Exception' clause below will NOT intercept it.
    try:
        raise NodeInterruptedError("HITL interrupt occurred")
    except Exception as e:
        print("Caught by 'except Exception': this line NEVER runs")
    except BaseException as e:
        # Prints NodeInterruptedError; in a real workflow the framework's
        # NodeRunner is what catches it at this level.
        print(f"Caught by 'except BaseException': {type(e).__name__}")

Example 2 — NodeTimeoutError in a retryable workflow node
    import asyncio

    from google.adk.workflow._node import node
    from google.adk.workflow._retry_config import RetryConfig
    from google.adk.workflow._workflow import Workflow


    @node(
        timeout=5.0,  # Node must complete within 5 seconds.
        retry_config=RetryConfig(max_attempts=3, initial_delay=1.0, backoff_factor=2.0),
    )
    async def slow_api_call(ctx):
        """Calls an external API that may time out."""
        # Simulate a slow API. In production, a real HTTP call goes here.
        await asyncio.sleep(10)  # Will trigger NodeTimeoutError after 5s.
        return {"result": "ok"}


    async def main():
        workflow = Workflow(
            name="timeout_demo",
            # Workflow uses edges, not a nodes list; use ("START", node) tuple syntax.
            edges=[("START", slow_api_call)],
        )
        # When the workflow runs, NodeTimeoutError is raised internally and
        # caught by retry_config. After 3 failed attempts it propagates directly
        # to the caller: DynamicNodeFailError only wraps failures from
        # ctx.run_node() dynamic children, not from static Workflow graph edges.
        print("Workflow defined: NodeTimeoutError triggers retry_config automatically")


    asyncio.run(main())

Example 3 — inspecting DynamicNodeFailError in error handling
    from google.adk.workflow._errors import DynamicNodeFailError

    # DynamicNodeFailError carries the original error and the node path.
    original_error = ValueError("API returned 500")
    fail_error = DynamicNodeFailError(
        message="Dynamic node failed at /root/fetch_data",
        error=original_error,
        error_node_path="/root/fetch_data",
    )

    print("error_node_path:", fail_error.error_node_path)  # /root/fetch_data
    print("wrapped error:", repr(fail_error.error))  # ValueError('API returned 500')
    print("str(fail_error):", str(fail_error))  # Dynamic node failed at /root/fetch_data

    # Pattern: catch DynamicNodeFailError to identify which child failed.
    try:
        raise fail_error
    except DynamicNodeFailError as e:
        print(f"Node '{e.error_node_path}' failed with: {type(e.error).__name__}: {e.error}")

7 · ReplaySequenceBarrier — asyncio chronological replay ordering
Source: google/adk/workflow/utils/_replay_sequence_barrier.py
ReplaySequenceBarrier enforces deterministic replay order across concurrently-replaying workflow nodes. When a workflow is rehydrated from session history, multiple nodes may resume simultaneously but their side effects must reproduce in the original chronological order. The barrier solves this using a pre-built dictionary of asyncio.Event objects — one per sequence key — with a chain-unlock mechanism.
Construction: ReplaySequenceBarrier(sequence: list[str], timeout_sec: float = 15.0) — the sequence list defines the intended execution order. An asyncio.Event is created for each key; the first key’s event is pre-set (.set()) so the first node can proceed immediately.
wait(key) — if key is in the sequence, awaits its event with timeout_sec seconds. If key is not in the sequence (“silent” nodes that only wrote state but produced no output), it returns immediately (fast-forward). A timeout raises RuntimeError("Replay divergence detected: Timed out waiting for sequence key '...' to be unblocked.").
check_and_advance(key) — called after a node completes. If key matches sequence[current_index], increments current_index and sets the next event, unblocking the next waiting node.
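The chain-unlock mechanism described above can be sketched in a few lines. This is an illustrative re-implementation written from the description, not the ADK source: the class name SketchBarrier and its private attributes are assumptions, and it assumes the keys in the sequence are unique.

```python
import asyncio


class SketchBarrier:
    """Illustrative chain-unlock barrier; not the ADK implementation."""

    def __init__(self, sequence, timeout_sec=15.0):
        self._sequence = list(sequence)
        self._timeout_sec = timeout_sec
        self._index = 0
        # One event per key; only the first key starts unblocked.
        self._events = {key: asyncio.Event() for key in self._sequence}
        if self._sequence:
            self._events[self._sequence[0]].set()

    async def wait(self, key):
        event = self._events.get(key)
        if event is None:
            return  # "Silent" key: fast-forward without waiting.
        try:
            await asyncio.wait_for(event.wait(), timeout=self._timeout_sec)
        except asyncio.TimeoutError:
            raise RuntimeError(
                "Replay divergence detected: Timed out waiting for "
                f"sequence key '{key}' to be unblocked."
            ) from None

    def check_and_advance(self, key):
        # Only the key at the current position may unlock its successor.
        if self._index < len(self._sequence) and self._sequence[self._index] == key:
            self._index += 1
            if self._index < len(self._sequence):
                self._events[self._sequence[self._index]].set()


async def demo():
    barrier = SketchBarrier(["a", "b", "c"], timeout_sec=1.0)
    order = []

    async def replay(name):
        await barrier.wait(name)
        order.append(name)
        barrier.check_and_advance(name)

    # Started out of order on purpose; "silent" is not in the sequence.
    await asyncio.gather(replay("c"), replay("silent"), replay("b"), replay("a"))
    return order


order = asyncio.run(demo())
print(order)
```

Keeping the events in a dictionary makes the silent-key check a single lookup, and advancing an index rather than popping from the list keeps check_and_advance a no-op for any key that is not currently at the head of the chain.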
Key behaviours
| Behaviour | Detail |
|---|---|
| First key pre-set | sequence[0]’s event is set() in __init__, so the first node does not wait. |
| “Silent” node fast-forward | For keys not in sequence, wait() returns immediately, so those nodes execute without blocking the chain. |
| 15-second default timeout | asyncio.wait_for(event.wait(), timeout=timeout_sec), with timeout_sec defaulting to 15.0; raises RuntimeError with a “Replay divergence detected” message if exceeded. |
| Chain-unlock pattern | Each matching check_and_advance() call unlocks exactly one subsequent event, so unblocking is strictly serial. |
| asyncio-native | Uses asyncio.Event, so it must be created and used within the same event loop. |
Example 1 — basic barrier for three sequential nodes
    import asyncio

    from google.adk.workflow.utils._replay_sequence_barrier import ReplaySequenceBarrier


    async def demo_sequence_barrier():
        # Three nodes in chronological order from session history.
        sequence = ["fetch_data", "transform_data", "save_result"]
        barrier = ReplaySequenceBarrier(sequence, timeout_sec=5.0)

        async def replay_node(name: str, delay: float = 0):
            """Simulates a replaying node."""
            await asyncio.sleep(delay)  # Simulate concurrent replay startup.
            await barrier.wait(name)  # Block until it's this node's turn.
            print(f"Node '{name}' replaying...")
            await asyncio.sleep(0.05)  # Simulate replay work.
            barrier.check_and_advance(name)  # Unlock the next node.

        # All three nodes start concurrently but replay in chronological order.
        await asyncio.gather(
            replay_node("save_result", delay=0),  # Starts first but waits.
            replay_node("transform_data", delay=0),  # Waits for fetch_data.
            replay_node("fetch_data", delay=0),  # Proceeds immediately.
        )
        print("All nodes replayed in correct order: fetch → transform → save")


    asyncio.run(demo_sequence_barrier())

Example 2 — silent node fast-forward
    import asyncio

    from google.adk.workflow.utils._replay_sequence_barrier import ReplaySequenceBarrier


    async def demo_silent_node():
        # 'log_event' produced only state updates, so it is not in the sequence.
        sequence = ["fetch_data", "save_result"]
        barrier = ReplaySequenceBarrier(sequence, timeout_sec=5.0)

        async def replay_with_silent(name: str):
            await barrier.wait(name)  # Returns immediately for 'log_event'.
            print(f"Node '{name}' replaying")
            barrier.check_and_advance(name)

        await asyncio.gather(
            replay_with_silent("fetch_data"),
            replay_with_silent("save_result"),
            replay_with_silent("log_event"),  # Not in sequence; executes immediately.
        )
        print("Done: log_event ran without blocking the sequence chain")


    asyncio.run(demo_silent_node())

Example 3 — timeout detection for diverged replay
    import asyncio

    from google.adk.workflow.utils._replay_sequence_barrier import ReplaySequenceBarrier


    async def demo_timeout():
        # Sequence expects 'node_a' before 'node_b', but 'node_a' never advances.
        barrier = ReplaySequenceBarrier(["node_a", "node_b"], timeout_sec=0.5)

        # 'node_a' is unblocked from the start, but check_and_advance("node_a")
        # is never called, so 'node_b' times out while waiting.
        try:
            await barrier.wait("node_b")
        except RuntimeError as e:
            # RuntimeError: Replay divergence detected: Timed out waiting for
            # sequence key 'node_b' to be unblocked.
            assert "Replay divergence detected" in str(e)
            print("Caught expected divergence error:", e)


    asyncio.run(demo_timeout())

8 · TestSpec + UserMessage + TestCase — conformance test DSL
Source: google/adk/cli/conformance/test_case.py
ADK 2.3.0 adds a YAML-driven conformance test DSL for exercising agents through the adk web server. The three types form the data layer of the adk conformance CLI command.
- UserMessage(BaseModel): represents one turn of user input. Exactly one of text: Optional[str] or content: Optional[types.UserContent] should be set (oneof semantics, not enforced by Pydantic). An optional state_delta: Optional[dict[str, Any]] applies state changes before the turn is sent.
- TestSpec(BaseModel): the human-authored test specification loaded from spec.yaml. It configures extra="forbid" so unknown YAML keys are rejected. Fields: description: str, agent: str (ADK agent name), initial_state: dict[str, Any] (session creation state), user_messages: list[UserMessage].
- TestCase: a @dataclass (not Pydantic) composed after filesystem discovery. Fields: category: str (parent folder name), name: str (test folder name), dir: Path (absolute path to the test folder), test_spec: TestSpec.
Key behaviours
| Behaviour | Detail |
|---|---|
| extra="forbid" on TestSpec | Unknown YAML keys cause ValidationError at load time, so typos in spec files are caught early. |
| TestCase is a @dataclass | Discovered at runtime by ConformanceTestRunner._discover_test_cases(); not loaded from YAML directly. |
| state_delta per message | Each UserMessage can carry a state_delta that is applied to the session before sending the message, which enables stateful multi-turn test scenarios. |
| Category/name from path | category = test_case_dir.parent.name, name = test_case_dir.name; the folder structure is tests/{category}/{name}/spec.yaml. |
| Recordings files | Replay mode looks for generated-recordings.yaml (non-streaming) or generated-recordings-sse.yaml (SSE). |
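The category/name derivation from the folder layout can be illustrated with the standard library alone. The sketch below is not ConformanceTestRunner._discover_test_cases(): DiscoveredCase and discover_cases are hypothetical names, the glob pattern is an assumption based on the tests/{category}/{name}/spec.yaml layout, and the spec file is located but not parsed.

```python
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class DiscoveredCase:
    """Hypothetical stand-in for TestCase, without the parsed spec."""

    category: str
    name: str
    dir: Path


def discover_cases(root: Path) -> list[DiscoveredCase]:
    """Find every {category}/{name}/spec.yaml under root."""
    cases = []
    for spec_path in sorted(root.glob("*/*/spec.yaml")):
        case_dir = spec_path.parent
        cases.append(
            DiscoveredCase(
                category=case_dir.parent.name,  # parent folder name
                name=case_dir.name,  # test folder name
                dir=case_dir.resolve(),  # absolute path to the test folder
            )
        )
    return cases


# Build a throwaway tree with two test folders and scan it.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "tests"
    for rel in ("search/basic_search", "auth/oauth_flow"):
        (root / rel).mkdir(parents=True)
        (root / rel / "spec.yaml").write_text("description: demo\n")
    found = discover_cases(root)
    summary = [(case.category, case.name) for case in found]

print(summary)
```

Sorting the glob results keeps discovery order stable across filesystems, which matters when test output is compared between runs.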
Example 1 — writing a spec.yaml and loading it in Python
    from pathlib import Path

    import yaml

    from google.adk.cli.conformance.test_case import TestCase, TestSpec

    # Same content as the file at tests/search/basic_search/spec.yaml.
    spec_yaml = """
    description: "Verify the agent uses google_search for factual questions"
    agent: research_agent
    initial_state:
      user_tier: premium
    user_messages:
      - text: "What is the capital of France?"
      - text: "And what is its population?"
        state_delta:
          follow_up: true
    """

    spec_dict = yaml.safe_load(spec_yaml)
    test_spec = TestSpec.model_validate(spec_dict)
    print("Agent under test:", test_spec.agent)  # research_agent
    print("Initial state:", test_spec.initial_state)  # {'user_tier': 'premium'}
    print("Turns:", len(test_spec.user_messages))  # 2
    print("Turn 2 state_delta:", test_spec.user_messages[1].state_delta)  # {'follow_up': True}

    # Build a TestCase (normally done by ConformanceTestRunner).
    test_case = TestCase(
        category="search",
        name="basic_search",
        dir=Path("tests/search/basic_search"),
        test_spec=test_spec,
    )
    print("TestCase path:", test_case.dir)

Example 2 — unknown keys are rejected by TestSpec
    import yaml
    from pydantic import ValidationError

    from google.adk.cli.conformance.test_case import TestSpec

    bad_spec_yaml = """
    description: "Test with unknown key"
    agent: my_agent
    unknown_field: this_should_fail
    user_messages: []
    """

    try:
        TestSpec.model_validate(yaml.safe_load(bad_spec_yaml))
    except ValidationError as e:
        # extra='forbid' catches the typo immediately.
        print("Validation error:", e.error_count(), "error(s)")
        for error in e.errors():
            print(f"  - {error['loc']}: {error['msg']}")

Example 3 — UserMessage with typed content (not just text)
    from google.genai import types

    from google.adk.cli.conformance.test_case import UserMessage

    # Text-based user message (most common).
    text_msg = UserMessage(text="Show me a recipe for pasta.")
    print("text_msg.text:", text_msg.text)

    # Content-based message with an image part.
    image_part = types.Part(
        inline_data=types.Blob(mime_type="image/jpeg", data=b"\xff\xd8\xff")  # minimal JPEG header
    )
    content_msg = UserMessage(
        content=types.UserContent(parts=[image_part]),
        state_delta={"has_image": True},
    )
    print("content_msg.content type:", type(content_msg.content).__name__)
    print("content_msg.state_delta:", content_msg.state_delta)

9 · AdkWebServerClient — conformance test HTTP client with SSE streaming
Source: google/adk/cli/conformance/adk_web_server_client.py
AdkWebServerClient is an async HTTP client purpose-built for ADK conformance testing. It wraps httpx.AsyncClient and communicates with the adk web FastAPI server (/apps/..., /run_sse, /version). The client supports both manual lifecycle management and async context manager usage.
Key methods:
- create_session(app_name, user_id, state=None) → Session
- run_agent(request, mode=None, test_case_dir=None, user_message_index=None) → AsyncGenerator[Event, None]: streams SSE events; injects _adk_recordings_config or _adk_replay_config into request.state_delta when mode is "record" or "replay".
- update_session(app_name, user_id, session_id, state_delta) → Session: applies state changes without running the agent (via PATCH).
- get_artifact_version_metadata(...) and list_artifact_versions_metadata(...): artifact introspection.
The client has a single internal httpx.AsyncClient instance (_client) created lazily on first _get_client() call and reused across calls. close() / __aexit__ dispose it via aclose().
Key behaviours
| Behaviour | Detail |
|---|---|
| Default base_url | http://127.0.0.1:8000, which matches the default port of adk web. |
| Default timeout | 30.0 seconds, wrapped in httpx.Timeout(...) internally. |
| _adk_replay_config injection | Sets streaming_mode: "sse" or "none" based on request.streaming; it must match the mode used during recording. |
| SSE parse | Reads lines starting with "data:", strips the prefix, parses JSON, validates as Event. JSON payloads containing an "error" key raise RuntimeError. |
| state_delta mutation | run_agent() mutates request.state_delta in place when mode is set, initialising the dict if it is None. |
| Context manager lifecycle | async with AdkWebServerClient() as client: ...; aclose() is called on __aexit__. |
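The SSE parse row can be made concrete with a small standalone parser. This is a sketch of the described behaviour rather than the client's code: parse_sse_lines is a hypothetical helper, it yields plain dicts instead of validating each payload as an Event, and the sample stream is invented for the demonstration.

```python
import json
from typing import Any, Iterable, Iterator


def parse_sse_lines(lines: Iterable[str]) -> Iterator[dict[str, Any]]:
    """Yield the JSON payload of each 'data:' line; fail on error payloads."""
    for line in lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue  # Skip blank lines, comments, and other SSE fields.
        payload = json.loads(line[len("data:"):].strip())
        if isinstance(payload, dict) and "error" in payload:
            raise RuntimeError(f"Server reported an error: {payload['error']}")
        yield payload


# Invented sample stream: two events separated by blank lines and a comment.
good_stream = [
    'data: {"author": "agent", "text": "Paris"}',
    "",
    ": keep-alive comment",
    'data: {"author": "agent", "text": "done"}',
]
events = list(parse_sse_lines(good_stream))
print(len(events), events[0]["text"])

# An error payload stops the stream with RuntimeError.
try:
    list(parse_sse_lines(['data: {"error": "agent not found"}']))
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)
print(error_message)
```

Raising on the error payload instead of yielding it means a failed run surfaces as an exception in the async for loop of the caller, rather than as a malformed event that has to be checked for on every iteration.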
Example 1 — running an agent and streaming events
    import asyncio

    from google.adk.cli.adk_web_server import RunAgentRequest
    from google.adk.cli.conformance.adk_web_server_client import AdkWebServerClient
    from google.genai import types


    async def run_conformance_replay(test_case_dir: str, user_message_index: int):
        """Run a single conformance replay turn and print collected events."""
        async with AdkWebServerClient(base_url="http://127.0.0.1:8000") as client:
            # Create a session for the test.
            session = await client.create_session(
                app_name="research_app",
                user_id="conformance_tester",
                state={"user_tier": "premium"},
            )
            print("Created session:", session.id)

            # new_message is typed as types.Content, not a raw string.
            request = RunAgentRequest(
                app_name="research_app",
                user_id="conformance_tester",
                session_id=session.id,
                new_message=types.Content(
                    role="user",
                    parts=[types.Part(text="What is the capital of France?")],
                ),
                streaming=False,
            )

            events = []
            async for event in client.run_agent(
                request,
                mode="replay",
                test_case_dir=test_case_dir,
                user_message_index=user_message_index,
            ):
                events.append(event)
                # Guard against final events that carry no content parts.
                if event.is_final_response() and event.content and event.content.parts:
                    print("Final response:", event.content.parts[0].text)

            print(f"Total events received: {len(events)}")

            # Cleanup.
            await client.delete_session(
                app_name="research_app",
                user_id="conformance_tester",
                session_id=session.id,
            )


    # asyncio.run(run_conformance_replay("tests/search/basic_search", user_message_index=0))
    print("AdkWebServerClient example defined: start 'adk web' before running")

Example 2 — record mode to generate a golden YAML fixture
    import asyncio

    from google.adk.cli.adk_web_server import RunAgentRequest
    from google.adk.cli.conformance.adk_web_server_client import AdkWebServerClient
    from google.genai import types


    async def record_golden_run(output_dir: str):
        """Record agent responses as a golden fixture for future conformance tests."""
        async with AdkWebServerClient() as client:
            session = await client.create_session(
                app_name="research_app",
                user_id="golden_recorder",
            )

            user_messages = [
                "What is the capital of France?",
                "What is its population?",
            ]

            for index, message in enumerate(user_messages):
                # new_message is types.Content, so wrap each string turn.
                request = RunAgentRequest(
                    app_name="research_app",
                    user_id="golden_recorder",
                    session_id=session.id,
                    new_message=types.Content(
                        role="user", parts=[types.Part(text=message)]
                    ),
                    streaming=False,
                )
                # mode="record" injects _adk_recordings_config into state_delta.
                async for event in client.run_agent(
                    request,
                    mode="record",
                    test_case_dir=output_dir,
                    user_message_index=index,
                ):
                    if event.is_final_response():
                        print(f"Turn {index}: recorded response")

            print(f"Golden recording saved to {output_dir}/generated-recordings.yaml")


    # asyncio.run(record_golden_run("tests/search/basic_search"))
    print("Record example defined: requires a running 'adk web' server")

Example 3 — applying state_delta between turns via update_session
    import asyncio

    from google.adk.cli.adk_web_server import RunAgentRequest
    from google.adk.cli.conformance.adk_web_server_client import AdkWebServerClient
    from google.genai import types


    async def multi_turn_with_state_update():
        """Demonstrate mid-session state mutation without running the agent."""
        async with AdkWebServerClient() as client:
            session = await client.create_session(
                app_name="stateful_app",
                user_id="test_user",
                state={"phase": "onboarding"},
            )
            print("Initial phase:", session.state.get("phase"))  # onboarding

            # Mutate state between turns without invoking the agent.
            updated_session = await client.update_session(
                app_name="stateful_app",
                user_id="test_user",
                session_id=session.id,
                state_delta={"phase": "active", "credits": 100},
            )
            print("Updated phase:", updated_session.state.get("phase"))  # active

            request = RunAgentRequest(
                app_name="stateful_app",
                user_id="test_user",
                session_id=session.id,
                new_message=types.Content(
                    role="user", parts=[types.Part(text="Check my account status")]
                ),
                streaming=False,
            )
            async for event in client.run_agent(request):
                if event.is_final_response():
                    print("Agent response received")


    # asyncio.run(multi_turn_with_state_update())
    print("Multi-turn state update example defined")

10 · ToolConnectionAnalyzer + ToolConnectionMap + StatefulParameter — LLM-driven tool dependency analysis
Source: google/adk/tools/environment_simulation/tool_connection_analyzer.py and tool_connection_map.py
These three classes form the analysis layer of ADK’s @experimental(FeatureName.ENVIRONMENT_SIMULATION) environment simulation feature. Rather than requiring developers to manually specify which tools create or consume stateful identifiers (like ticket_id or order_id), ToolConnectionAnalyzer delegates this analysis to an LLM.
StatefulParameter(BaseModel) — represents a single stateful parameter and its tool associations: parameter_name: str, creating_tools: List[str], consuming_tools: List[str].
ToolConnectionMap(BaseModel) — a Pydantic model with a single field stateful_parameters: List[StatefulParameter]. Created via ToolConnectionMap.model_validate(response_json) from the LLM’s JSON output.
ToolConnectionAnalyzer — takes llm_name: str and llm_config: GenerateContentConfig in its constructor. Its analyze(tools: List[BaseTool]) -> ToolConnectionMap method builds tool schema JSON, sends it to the LLM with a structured prompt, and parses the JSON response with re.sub stripping markdown fences before json.loads. The analysis result is then passed to ToolSpecMockStrategy to generate realistic mock responses.
Key behaviours
| Behaviour | Detail |
|---|---|
| LLM-driven analysis | analyze() calls self._llm.generate_content_async(request) with response_mime_type="application/json", which forces JSON-only output. |
| Tool schema extraction | Calls tool._get_declaration().model_dump(exclude_none=True) per tool; any tool without a declaration is silently skipped. |
| Markdown fence stripping | re.sub(r"^```[a-zA-Z]*\n", "", text) + re.sub(r"\n```$", "", text), which handles LLMs that wrap JSON in code blocks. |
| Graceful degradation | On json.JSONDecodeError, the handler intends to log a warning and return ToolConnectionMap(stateful_parameters=[]). Bug in ADK 2.3.0: the except clause omits as e, so the log call references an unbound name and a NameError escapes instead. Treat JSON parse failures as fatal until this is patched upstream. |
| Creating vs consuming | A “creating” tool generates or modifies a resource; a “consuming” tool only reads it. ToolSpecMockStrategy uses this to update state_store only after creating tool calls. |
| Feature flag guard | @experimental(FeatureName.ENVIRONMENT_SIMULATION) emits UserWarning on import. |
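The fence stripping and the intended graceful degradation can be sketched together without calling an LLM. parse_connection_json is a hypothetical helper that reuses the two regular expressions from the table and returns a plain dict instead of a ToolConnectionMap; binding the exception with as e is what the intended handler needs in order to log it. The fence string is built from single backticks only so that this example nests cleanly inside a fenced block.

```python
import json
import logging
import re

logger = logging.getLogger(__name__)

FENCE = "`" * 3  # Three backticks, assembled to keep this listing fence-safe.


def parse_connection_json(text: str) -> dict:
    """Strip an optional markdown fence, then parse; fall back to an empty map."""
    text = text.strip()
    text = re.sub(r"^" + FENCE + r"[a-zA-Z]*\n", "", text)  # opening fence
    text = re.sub(r"\n" + FENCE + r"$", "", text)  # closing fence
    try:
        return json.loads(text)
    except json.JSONDecodeError as e:  # 'as e' binds the name used in the log call
        logger.warning("Could not parse tool connection JSON: %s", e)
        return {"stateful_parameters": []}


body = '{"stateful_parameters": [{"parameter_name": "ticket_id"}]}'
fenced = FENCE + "json\n" + body + "\n" + FENCE

parsed_fenced = parse_connection_json(fenced)
parsed_plain = parse_connection_json(body)
parsed_bad = parse_connection_json("not json at all")

print(parsed_fenced == parsed_plain, parsed_bad)
```

The dict returned here has the same shape that ToolConnectionMap.model_validate() accepts, so in real code the last step would be a model validation rather than a bare dict.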
Example 1 — analyzing a customer support toolset
    import warnings

    warnings.filterwarnings("ignore", category=UserWarning)

    from google.adk.tools.environment_simulation.tool_connection_analyzer import ToolConnectionAnalyzer
    from google.adk.tools.function_tool import FunctionTool
    from google.genai import types as genai_types


    def create_ticket(user_id: str, subject: str, priority: str) -> dict:
        """Create a new support ticket for a user."""
        return {"ticket_id": "TKT-001", "status": "open"}


    def get_ticket(ticket_id: str) -> dict:
        """Retrieve details for a specific support ticket."""
        return {"ticket_id": ticket_id, "status": "open", "subject": "..."}


    def cancel_ticket(ticket_id: str) -> dict:
        """Cancel an existing support ticket."""
        return {"ticket_id": ticket_id, "status": "cancelled"}


    def list_user_tickets(user_id: str) -> list:
        """List all tickets for a given user."""
        return []


    tools = [
        FunctionTool(func=create_ticket),
        FunctionTool(func=get_ticket),
        FunctionTool(func=cancel_ticket),
        FunctionTool(func=list_user_tickets),
    ]

    # In production the analyzer calls the LLM to detect stateful parameters.
    # cancel_ticket mutates ticket state, so a well-tuned LLM should classify
    # it as a creating tool (writes back to state_store), not consuming.
    # Run inside an async function; the outputs shown are what a well-tuned
    # LLM is expected to return and may vary between runs.
    # analyzer = ToolConnectionAnalyzer(
    #     llm_name="gemini-2.5-flash",
    #     llm_config=genai_types.GenerateContentConfig(temperature=0),
    # )
    # connection_map = await analyzer.analyze(tools)
    # print(connection_map.stateful_parameters[0].parameter_name)  # ticket_id
    # print(connection_map.stateful_parameters[0].creating_tools)  # ['create_ticket', 'cancel_ticket']
    # print(connection_map.stateful_parameters[0].consuming_tools)  # ['get_ticket', 'list_user_tickets']

    print("ToolConnectionAnalyzer example defined: requires a Gemini API key to run")
    print("Tools:", [t.name for t in tools])

Example 2 — manually building a ToolConnectionMap for testing
    import warnings

    warnings.filterwarnings("ignore", category=UserWarning)

    from google.adk.tools.environment_simulation.tool_connection_map import (
        StatefulParameter,
        ToolConnectionMap,
    )

    # Build a ToolConnectionMap directly for use with ToolSpecMockStrategy
    # without calling the LLM (e.g., in unit tests or when the schema is known).
    connection_map = ToolConnectionMap(
        stateful_parameters=[
            StatefulParameter(
                parameter_name="order_id",
                # Mutating operations (cancel changes order state) belong in creating_tools
                # so ToolSpecMockStrategy writes the updated state back to state_store.
                creating_tools=["create_order", "clone_order", "cancel_order"],
                consuming_tools=["get_order", "list_orders"],
            ),
            StatefulParameter(
                parameter_name="user_id",
                # delete_user mutates state, so it must be a creating tool, not consuming.
                creating_tools=["create_user", "delete_user"],
                consuming_tools=["get_user", "list_user_orders"],
            ),
        ]
    )

    print("Stateful parameters:", len(connection_map.stateful_parameters))
    for param in connection_map.stateful_parameters:
        print(f"  {param.parameter_name}:")
        print(f"    creating: {param.creating_tools}")
        print(f"    consuming: {param.consuming_tools}")

    # Serialise for inspection.
    print("\nJSON dump:")
    print(connection_map.model_dump_json(indent=2))

Example 3 — integrating ToolConnectionMap with EnvironmentSimulationConfig
    import warnings

    warnings.filterwarnings("ignore", category=UserWarning)

    from google.adk.tools.environment_simulation.environment_simulation_config import (
        EnvironmentSimulationConfig,
        MockStrategy,
        ToolSimulationConfig,
    )
    from google.adk.tools.environment_simulation.tool_connection_map import (
        StatefulParameter,
        ToolConnectionMap,
    )

    # Pre-build the connection map (produced by ToolConnectionAnalyzer, or hand-coded).
    # The map is consumed internally by ToolSpecMockStrategy; it is NOT a field on
    # EnvironmentSimulationConfig itself.
    connection_map = ToolConnectionMap(
        stateful_parameters=[
            StatefulParameter(
                parameter_name="reservation_id",
                # cancel_reservation mutates reservation state, so it is listed
                # as a creating tool, consistent with the convention above.
                creating_tools=["make_reservation", "cancel_reservation"],
                consuming_tools=["get_reservation"],
            )
        ]
    )

    # EnvironmentSimulationConfig requires tool_simulation_configs (non-empty list).
    # Each ToolSimulationConfig maps a tool name to a mock strategy.
    simulation_config = EnvironmentSimulationConfig(
        simulation_model="gemini-2.5-flash",
        tool_simulation_configs=[
            ToolSimulationConfig(
                tool_name="make_reservation",
                mock_strategy_type=MockStrategy.MOCK_STRATEGY_TOOL_SPEC,
            ),
            ToolSimulationConfig(
                tool_name="get_reservation",
                mock_strategy_type=MockStrategy.MOCK_STRATEGY_TOOL_SPEC,
            ),
            ToolSimulationConfig(
                tool_name="cancel_reservation",
                mock_strategy_type=MockStrategy.MOCK_STRATEGY_TOOL_SPEC,
            ),
        ],
    )

    print("Simulation config built successfully")
    print("Creating tools for reservation_id:", connection_map.stateful_parameters[0].creating_tools)
    # NOTE: EnvironmentSimulationEngine.__init__ only accepts EnvironmentSimulationConfig.
    # There is no public argument to inject a prebuilt ToolConnectionMap; the engine
    # builds its own map internally by running ToolConnectionAnalyzer on first use.
    # The connection_map above illustrates the data structure and can be used for
    # unit testing ToolConnectionAnalyzer outputs, but cannot be wired into the engine.