Google ADK Advanced (Python)
Verified against google-adk==2.3.0 (google/adk/planners/, google/adk/tools/, google/adk/auth/).
Planners
Section titled “Planners”Planners modify how an LlmAgent reasons before it acts. Two implementations ship with ADK; you can also subclass BasePlanner.
BuiltInPlanner — native model thinking
Section titled “BuiltInPlanner — native model thinking”BuiltInPlanner wraps Gemini’s built-in chain-of-thought thinking feature. It injects thinking_config into every LlmRequest emitted by the agent. No extra prompt engineering needed — the model reasons internally.
from google.adk.agents import LlmAgentfrom google.adk.planners import BuiltInPlannerfrom google.genai import types
# Thinking with a token budgetagent = LlmAgent( name="analyst", model="gemini-2.5-pro", # thinking is a Gemini 2.5 feature instruction="Analyse the dataset step-by-step and produce a concise summary.", planner=BuiltInPlanner( thinking_config=types.ThinkingConfig( include_thoughts=True, # thoughts appear as Part(thought=True) in events thinking_budget=8192, # max tokens dedicated to reasoning ) ),)When include_thoughts=True is set, events will contain Part objects where part.thought is True. Filter them out when rendering to users:
async for event in runner.run_async(...): if event.content: visible_text = "".join( p.text or "" for p in event.content.parts or [] if p.text and not p.thought ) if visible_text: print(visible_text)BuiltInPlanner source (planners/built_in_planner.py):
build_planning_instructionreturnsNone— no extra system prompt is injected.process_planning_responsereturnsNone— response parts are not post-processed.apply_thinking_configis the actual workhorse — it setsllm_request.config.thinking_config.
Precedence: If you also set thinking_config inside generate_content_config on the agent, the planner’s config wins (a UserWarning is emitted).
PlanReActPlanner — textual Plan-then-Act
Section titled “PlanReActPlanner — textual Plan-then-Act”PlanReActPlanner does not require model-native thinking. Instead, it appends a system instruction that asks the model to output its reasoning wrapped in special tags (/*PLANNING*/, /*REASONING*/, /*ACTION*/, /*FINAL_ANSWER*/). The planner’s process_planning_response strips these into thought=True parts before returning.
from google.adk.planners import PlanReActPlannerfrom google.adk.agents import LlmAgent
agent = LlmAgent( name="planner_bot", model="gemini-2.5-flash", # works with any model — no native thinking needed instruction="You are a research assistant.", tools=[my_search_tool, my_db_tool], planner=PlanReActPlanner(),)The model will produce text like:
/*PLANNING*/I need to search for X, then look up Y in the database./*PLANNING*/
/*ACTION*/[function call: search(query="X")]ADK classifies /*PLANNING*/ / /*REASONING*/ / /*ACTION*/ blocks as thoughts and /*FINAL_ANSWER*/ as the user-visible response.
When to choose PlanReActPlanner over BuiltInPlanner:
- You are using a non-Gemini model that doesn’t support built-in thinking.
- You want the plan to be visible in the conversation history (it becomes a
thought=Truepart). - You want model-independent, reproducible reasoning traces.
Custom BasePlanner
Section titled “Custom BasePlanner”Implement build_planning_instruction to inject custom system text and process_planning_response to post-process parts.
from typing import List, Optionalfrom google.genai import typesfrom google.adk.planners.base_planner import BasePlannerfrom google.adk.agents.callback_context import CallbackContextfrom google.adk.agents.readonly_context import ReadonlyContextfrom google.adk.models.llm_request import LlmRequest
class BudgetPlanner(BasePlanner): """Inject a token-budget hint into the system instruction."""
def __init__(self, max_steps: int = 5): self.max_steps = max_steps
def build_planning_instruction( self, readonly_context: ReadonlyContext, llm_request: LlmRequest, ) -> Optional[str]: return ( f"You have at most {self.max_steps} tool calls to answer. " "Plan carefully before acting." )
def process_planning_response( self, callback_context: CallbackContext, response_parts: List[types.Part], ) -> Optional[List[types.Part]]: return None # no post-processing needed
agent = LlmAgent( name="budget_agent", model="gemini-2.5-flash", planner=BudgetPlanner(max_steps=3), tools=[...],)Custom BaseTool
Section titled “Custom BaseTool”FunctionTool covers most use cases. Subclass BaseTool when you need full control over the FunctionDeclaration sent to the model or when the tool must mutate the LlmRequest before it leaves (e.g. inject inline data, set response_modalities).
from typing import Any, Optionalfrom google.genai import typesfrom google.adk.tools.base_tool import BaseToolfrom google.adk.tools.tool_context import ToolContext
class DatabaseLookupTool(BaseTool): """Queries an internal product database by SKU."""
def __init__(self, db_pool): super().__init__( name="lookup_product", description="Retrieve product details by SKU from the internal database.", ) self._db = db_pool
def _get_declaration(self) -> types.FunctionDeclaration: return types.FunctionDeclaration( name=self.name, description=self.description, parameters=types.Schema( type=types.Type.OBJECT, properties={ "sku": types.Schema( type=types.Type.STRING, description="The SKU identifier (e.g. 'WIDGET-42').", ), }, required=["sku"], ), )
async def run_async( self, *, args: dict[str, Any], tool_context: ToolContext ) -> dict: sku = args.get("sku") if not sku: return {"error": "sku is required"} row = await self._db.fetchrow( "SELECT name, price, stock FROM products WHERE sku = $1", sku ) if row is None: return {"error": f"SKU {sku!r} not found"} tool_context.state["last_sku"] = sku return {"name": row["name"], "price": row["price"], "stock": row["stock"]}
# Usagedb_tool = DatabaseLookupTool(db_pool=my_pool)agent = LlmAgent( name="catalog", model="gemini-2.5-flash", instruction="Use lookup_product to answer product questions.", tools=[db_tool],)Overriding process_llm_request
Section titled “Overriding process_llm_request”Use this when you need to inject tool-related data into the request before it reaches the model — not the same as a before_model callback:
from google.adk.models.llm_request import LlmRequest
class ComputerUseTool(BaseTool): """Injects a computer-use tool declaration into the LlmRequest."""
def __init__(self): super().__init__( name="computer_use", description="Control the browser/desktop.", )
def _get_declaration(self) -> Optional[types.FunctionDeclaration]: return None # built-in; not declared in config.tools
async def process_llm_request( self, *, tool_context: ToolContext, llm_request: LlmRequest ) -> None: # Inject a built-in computer-use block llm_request.config = llm_request.config or types.GenerateContentConfig() llm_request.config.tools = llm_request.config.tools or [] llm_request.config.tools.append( types.Tool(computer_use=types.ToolComputerUse(...)) )
async def run_async( self, *, args: dict[str, Any], tool_context: ToolContext ) -> dict: ...Custom BaseToolset
Section titled “Custom BaseToolset”BaseToolset exposes a dynamic list of tools per invocation — useful when the available tools vary by user, tenant, or session state.
from typing import Optionalfrom google.adk.tools.base_toolset import BaseToolsetfrom google.adk.tools.base_tool import BaseToolfrom google.adk.tools.function_tool import FunctionToolfrom google.adk.agents.readonly_context import ReadonlyContext
class TenantToolset(BaseToolset): """Exposes only the tools the current tenant has permission to use."""
def __init__(self, permission_service): super().__init__() self._permissions = permission_service
async def get_tools( self, readonly_context: Optional[ReadonlyContext] = None ) -> list[BaseTool]: if readonly_context is None: return []
tenant_id = readonly_context.state.get("tenant_id") allowed = await self._permissions.get_allowed_tools(tenant_id)
tools = [] if "search" in allowed: tools.append(FunctionTool(func=self._search)) if "write" in allowed: tools.append(FunctionTool(func=self._write_record)) return tools
async def _search(self, query: str) -> dict: """Search tenant data.
Args: query: The search query. Returns: A dict with key `results`. """ ...
async def _write_record(self, record: dict) -> dict: """Write a record.
Args: record: The record to write. Returns: A dict with key `id` of the created record. """ ...
async def close(self) -> None: # release any held connections await self._permissions.close()
# Usage — toolset is resolved fresh on every invocationagent = LlmAgent( name="tenant_agent", model="gemini-2.5-flash", tools=[TenantToolset(permission_service=svc)],)Built-in toolset caching
Section titled “Built-in toolset caching”BaseToolset.get_tools_with_prefix (called by the framework) caches the tool list per invocation ID. If your toolset state doesn’t change within a turn, this is transparent. To disable the cache (for toolsets that change tool availability mid-turn):
class VolatileToolset(BaseToolset): def __init__(self): super().__init__() self._use_invocation_cache = False # force re-eval on every LLM call
async def get_tools(self, readonly_context=None) -> list[BaseTool]: ...Tool name prefixing
Section titled “Tool name prefixing”tool_name_prefix prevents collisions when the same toolset is registered multiple times (e.g. per-tenant filesystem toolsets):
fs_a = TenantToolset(permission_service=svc_a)fs_a.tool_name_prefix = "tenant_a"
fs_b = TenantToolset(permission_service=svc_b)fs_b.tool_name_prefix = "tenant_b"
# Model sees: tenant_a_search, tenant_b_search, etc.agent = LlmAgent(name="root", tools=[fs_a, fs_b])Authenticated tools
Section titled “Authenticated tools”API-key tools
Section titled “API-key tools”import httpxfrom google.adk.auth.auth_tool import AuthConfigfrom google.adk.auth.auth_schemes import APIKeySchemefrom google.adk.tools.function_tool import FunctionToolfrom google.adk.tools.tool_context import ToolContext
async def call_external_api(endpoint: str, tool_context: ToolContext) -> dict: """Call the external analytics API.
Args: endpoint: The API endpoint path. Returns: A dict with the API response data. """ auth_config = AuthConfig(auth_scheme=APIKeyScheme(name="X-Api-Key")) cred = tool_context.get_auth_response(auth_config) if cred is None: # First call — trigger the credential collection flow tool_context.request_credential(auth_config) return {"status": "auth_required"} api_key = cred.api_key.value async with httpx.AsyncClient() as client: resp = await client.get( f"https://analytics.example.com{endpoint}", headers={"X-Api-Key": api_key}, ) return resp.json()
tool = FunctionTool(func=call_external_api)OAuth2 / OIDC tools
Section titled “OAuth2 / OIDC tools”import httpxfrom google.adk.auth.auth_tool import AuthConfigfrom google.adk.auth.auth_schemes import OpenIdConnectWithConfigfrom google.adk.auth.auth_credential import AuthCredential, OAuth2Authfrom google.adk.tools.tool_context import ToolContext
GITHUB_OIDC = OpenIdConnectWithConfig( authorization_endpoint="https://github.com/login/oauth/authorize", token_endpoint="https://github.com/login/oauth/access_token", scopes=["read:user", "repo"],)
GITHUB_AUTH_CONFIG = AuthConfig( auth_scheme=GITHUB_OIDC, raw_auth_credential=AuthCredential( auth_type="oauth2", oauth2=OAuth2Auth( client_id="YOUR_CLIENT_ID", client_secret="YOUR_CLIENT_SECRET", ), ),)
async def list_github_repos(tool_context: ToolContext) -> dict: """List the authenticated user's GitHub repositories.
Returns: A dict with key `repos` containing a list of repo names. """ cred = tool_context.get_auth_response(GITHUB_AUTH_CONFIG) if cred is None: # First call — ADK will trigger the OAuth flow tool_context.request_credential(GITHUB_AUTH_CONFIG) return {"status": "auth_required"} token = cred.oauth2.access_token async with httpx.AsyncClient() as client: resp = await client.get( "https://api.github.com/user/repos", headers={"Authorization": f"Bearer {token}"}, ) repos = [r["name"] for r in resp.json()] return {"repos": repos}Multi-agent coordination
Section titled “Multi-agent coordination”Fan-out with AgentTool
Section titled “Fan-out with AgentTool”Calling multiple specialists concurrently from a coordinator:
import asynciofrom google.adk.agents import LlmAgentfrom google.adk.tools import AgentToolfrom google.adk.runners import InMemoryRunnerfrom pydantic import BaseModel
class ResearchQuery(BaseModel): query: str
class CodeQuery(BaseModel): question: str
researcher = LlmAgent( name="researcher", model="gemini-2.5-flash", instruction="Research the query and cite sources.", input_schema=ResearchQuery, tools=[google_search],)
coder = LlmAgent( name="coder", model="gemini-2.5-pro", instruction="Answer the coding question with a minimal Python example.", input_schema=CodeQuery,)
coordinator = LlmAgent( name="coordinator", model="gemini-2.5-flash", instruction=( "For factual questions use `researcher`. " "For coding questions use `coder`. " "For questions mixing both, call both in parallel then synthesise." ), tools=[ AgentTool(agent=researcher), AgentTool(agent=coder), ],)Chaining via output_key and {state} templates
Section titled “Chaining via output_key and {state} templates”drafter = LlmAgent( name="drafter", model="gemini-2.5-flash", instruction="Write a 3-sentence press release about {topic}.", output_key="draft", # writes the reply to session.state["draft"])
editor = LlmAgent( name="editor", model="gemini-2.5-pro", instruction="Polish the following draft:\n\n{draft}\n\nReturn only the final text.", include_contents="none", # ignore history; read only from state)Run both in a Workflow sequential pipeline; {draft} in the editor’s instruction resolves to session.state["draft"] at runtime.
Human-in-the-loop (HITL)
Section titled “Human-in-the-loop (HITL)”Long-running tool
Section titled “Long-running tool”from google.adk.tools import LongRunningFunctionToolfrom google.adk.tools.tool_context import ToolContextimport asyncio
async def request_approval( amount: float, description: str, tool_context: ToolContext,) -> dict: """Request manager approval for an expense.
Args: amount: Expense amount in USD. description: What the expense is for. Returns: A dict with `status` ('pending', 'approved', or 'rejected'). """ ticket_id = f"EXP-{int(asyncio.get_event_loop().time())}" tool_context.state["approval_ticket"] = ticket_id # Persist the function call id so the webhook handler can reference it tool_context.state["approval_call_id"] = tool_context.function_call_id # Fire-and-forget: notify the approver via Slack / email await notify_approver(ticket_id, amount, description) return { "status": "pending", "ticket_id": ticket_id, "message": "Approval request sent. The manager will respond within 24 hours.", }
approval_tool = LongRunningFunctionTool(func=request_approval)
agent = LlmAgent( name="expense_bot", model="gemini-2.5-flash", instruction="Help the user submit expense reports.", tools=[approval_tool],)When the manager responds (via webhook or another invocation), retrieve the stored call ID from the session and resume via runner.run_async:
from google.genai import types
# Fetch the session to retrieve the persisted function call idsession = await runner.session_service.get_session( app_name="expense_app", user_id="u1", session_id="s1")call_id = session.state.get("approval_call_id")
async for event in runner.run_async( user_id="u1", session_id="s1", new_message=types.Content( role="user", parts=[ types.Part( function_response=types.FunctionResponse( name="request_approval", id=call_id, # retrieved from session state saved by the tool response={"status": "approved"}, ) ) ], ),): if event.is_final_response() and event.content: print("Agent:", "".join(p.text or "" for p in event.content.parts or []))Deployment — Cloud Run (FastAPI)
Section titled “Deployment — Cloud Run (FastAPI)”# main.py — FastAPI wrapper for Cloud Runimport osfrom contextlib import asynccontextmanagerfrom fastapi import FastAPI, HTTPExceptionfrom google.adk.agents import LlmAgentfrom google.adk.apps import Appfrom google.adk.runners import Runnerfrom google.adk.sessions import DatabaseSessionServicefrom google.adk.plugins import LoggingPluginfrom google.genai import types
SESSION_DB = os.environ["SESSION_DB_URL"]
agent = LlmAgent( name="assistant", model="gemini-2.5-flash", instruction="You are a helpful assistant.",)
app_container = App( name="assistant_app", root_agent=agent, plugins=[LoggingPlugin()],)
runner: Runner | None = None
@asynccontextmanagerasync def lifespan(app: FastAPI): global runner runner = Runner( app=app_container, session_service=DatabaseSessionService(db_url=SESSION_DB), ) yield await runner.close()
web_app = FastAPI(lifespan=lifespan)
@web_app.post("/chat/{session_id}")async def chat(session_id: str, body: dict): if runner is None: raise HTTPException(status_code=503, detail="Service not ready") user_id = body.get("user_id", "anonymous") text = body.get("message", "") if not text: raise HTTPException(status_code=400, detail="message is required")
# Ensure session exists svc = runner.session_service if not await svc.get_session(app_name="assistant_app", user_id=user_id, session_id=session_id): await svc.create_session( app_name="assistant_app", user_id=user_id, session_id=session_id )
response_text = "" async for event in runner.run_async( user_id=user_id, session_id=session_id, new_message=types.Content(role="user", parts=[types.Part(text=text)]), ): if event.is_final_response() and event.content: response_text = "".join(p.text or "" for p in event.content.parts or []) return {"reply": response_text}Cloud Run Dockerfile:
FROM python:3.12-slimWORKDIR /appCOPY requirements.txt .RUN pip install --no-cache-dir -r requirements.txtCOPY . .EXPOSE 8080CMD ["uvicorn", "main:web_app", "--host", "0.0.0.0", "--port", "8080"]Deploy:
gcloud run deploy adk-assistant \ --source . \ --region us-central1 \ --set-env-vars SESSION_DB_URL=postgresql+asyncpg://user:pass@host/db \ --set-secrets GOOGLE_API_KEY=adk-api-key:latest \ --allow-unauthenticatedObservability — OpenTelemetry
Section titled “Observability — OpenTelemetry”ADK emits OpenTelemetry traces, metrics, and logs via google.adk.telemetry. Every run_async call creates a root span named adk.invocation; tool calls and agent-to-agent transfers create child spans.
Using the ADK-native maybe_set_otel_providers
Section titled “Using the ADK-native maybe_set_otel_providers”The recommended way to wire exporters is maybe_set_otel_providers from google.adk.telemetry.setup. It accepts OTelHooks bundles — dataclasses that carry span processors, metric readers, and log processors — and respects providers that were already set (it will not override an existing provider).
from opentelemetry.sdk.trace.export import BatchSpanProcessorfrom opentelemetry.sdk.metrics.export import PeriodicExportingMetricReaderfrom opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from google.adk.telemetry.setup import OTelHooks, maybe_set_otel_providers
# ── OTLP collector (Jaeger, Tempo, Grafana, etc.) ─────────────────────────────from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporterfrom opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporterfrom opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
hooks = OTelHooks( span_processors=[BatchSpanProcessor(OTLPSpanExporter(endpoint="http://otel-collector:4317"))], metric_readers=[PeriodicExportingMetricReader(OTLPMetricExporter(endpoint="http://otel-collector:4317"))], log_record_processors=[BatchLogRecordProcessor(OTLPLogExporter(endpoint="http://otel-collector:4317"))],)
maybe_set_otel_providers([hooks])Set OTEL_EXPORTER_OTLP_ENDPOINT in the environment and maybe_set_otel_providers([]) will pick it up automatically via the standard OTel SDK env var — no exporter objects needed.
Google Cloud Trace, Monitoring, and Logging
Section titled “Google Cloud Trace, Monitoring, and Logging”from google.adk.telemetry.google_cloud import get_gcp_exportersfrom google.adk.telemetry.setup import maybe_set_otel_providers
gcp_hooks = get_gcp_exporters( enable_cloud_tracing=True, enable_cloud_metrics=True, enable_cloud_logging=True, # google_auth=(credentials, project_id) # optional; defaults to google.auth.default())
maybe_set_otel_providers([gcp_hooks])get_gcp_exporters wires Cloud Trace, Cloud Monitoring, and Cloud Logging via the standard GCP OTel exporters. Install them first:
pip install opentelemetry-exporter-gcp-trace opentelemetry-exporter-gcp-monitoring opentelemetry-exporter-gcp-loggingWiring telemetry in a FastAPI app
Section titled “Wiring telemetry in a FastAPI app”Call maybe_set_otel_providers inside lifespan so it runs before the first ADK invocation:
from contextlib import asynccontextmanagerfrom fastapi import FastAPIfrom google.adk.telemetry.google_cloud import get_gcp_exportersfrom google.adk.telemetry.setup import maybe_set_otel_providers
@asynccontextmanagerasync def lifespan(app: FastAPI): maybe_set_otel_providers([get_gcp_exporters(enable_cloud_tracing=True)]) yield
web_app = FastAPI(lifespan=lifespan)What ADK instruments (span names emitted by google.adk.telemetry):
| Span | Key attributes |
|---|---|
adk.invocation | invocation_id, session_id, user_id, app_name |
adk.agent.<name> | agent_name, agent_class |
adk.tool.<name> | tool_name, tool_type |
adk.model.<model> | model, prompt_tokens, output_tokens |
Alternative model providers
Section titled “Alternative model providers”LiteLlm — any OpenAI-compatible endpoint
Section titled “LiteLlm — any OpenAI-compatible endpoint”LiteLlm routes through LiteLLM, giving access to OpenAI, Anthropic, Cohere, Together.ai, Ollama, and hundreds of other providers. Requires pip install google-adk[extensions].
from google.adk.models.lite_llm import LiteLlmfrom google.adk.agents import LlmAgentimport os
# ── OpenAI GPT-4o ─────────────────────────────────────────────────────────────os.environ["OPENAI_API_KEY"] = "sk-..."
agent = LlmAgent( name="gpt_agent", model=LiteLlm(model="openai/gpt-4o"), instruction="You are a helpful assistant.",)
# ── Anthropic Claude (via LiteLLM) ─────────────────────────────────────────────os.environ["ANTHROPIC_API_KEY"] = "sk-ant-..."
agent = LlmAgent( name="claude_litellm", model=LiteLlm(model="anthropic/claude-opus-4-5"), instruction="You are a thoughtful assistant.",)
# ── Azure OpenAI ──────────────────────────────────────────────────────────────os.environ["AZURE_API_KEY"] = "..."os.environ["AZURE_API_BASE"] = "https://my-resource.openai.azure.com"os.environ["AZURE_API_VERSION"] = "2024-02-01"
agent = LlmAgent( name="azure_agent", model=LiteLlm(model="azure/gpt-4o"), instruction="Answer questions concisely.",)
# ── Local Ollama ───────────────────────────────────────────────────────────────agent = LlmAgent( name="local_agent", model=LiteLlm(model="ollama/llama3.2", api_base="http://localhost:11434"), instruction="Run locally.",)LiteLlm constructor args: model (LiteLLM model string), api_base (custom endpoint), plus any LiteLLM kwarg (temperature, timeout, num_retries, etc.).
Anthropic Claude via AnthropicLlm (direct)
Section titled “Anthropic Claude via AnthropicLlm (direct)”AnthropicLlm is a direct Anthropic SDK integration (no LiteLLM proxy). Requires pip install google-adk[extensions].
from google.adk.models.anthropic_llm import Claudefrom google.adk.agents import LlmAgentimport os
os.environ["ANTHROPIC_API_KEY"] = "sk-ant-..."
agent = LlmAgent( name="claude_direct", model=Claude(model="claude-opus-4-5"), # or "claude-sonnet-4-6" / "claude-haiku-4-5-20251001" instruction="You are a helpful assistant.",)The "claude-" prefix is registered in LLMRegistry, so a bare string also works:
agent = LlmAgent(name="a", model="claude-opus-4-5")Mixing models in a multi-agent system
Section titled “Mixing models in a multi-agent system”Each agent can use a different model — route expensive reasoning to a powerful model and simple triage to a cheap one:
from google.adk.agents import LlmAgentfrom google.adk.models.lite_llm import LiteLlmfrom google.adk.models.anthropic_llm import Claude
triage = LlmAgent( name="triage", model="gemini-2.5-flash", # fast and cheap instruction="Classify the request as 'technical' or 'billing'.",)
technical = LlmAgent( name="technical", model=Claude(model="claude-opus-4-5"), # deep reasoning description="Handles complex technical questions.", instruction="Provide a detailed technical answer with code examples.",)
billing = LlmAgent( name="billing", model=LiteLlm(model="openai/gpt-4o-mini"), # cost-optimised description="Handles billing enquiries.", instruction="Look up the user's account and answer billing questions.",)
root = LlmAgent( name="router", model="gemini-2.5-flash", instruction="Route requests to the right specialist.", sub_agents=[technical, billing],)
## Security best practices
| Practice | Implementation ||---|---|| API keys / secrets | `google.cloud.secretmanager`; inject as env vars at deploy time || Least-privilege IAM | Use per-agent service accounts; `roles/aiplatform.user` for Vertex; `roles/storage.objectViewer` for GCS artifacts || Input sanitisation | Use a `before_model_callback` plugin to strip PII before it reaches the model || Tool guardrails | `before_tool_callback` plugin that returns `{"error": "..."}` for blocked operations || Egress control | VPC Service Controls or a proxy allowlist for tool HTTP calls || Credential rotation | Store credentials in Secret Manager with version rotation; reference via `AuthConfig.credential_key` |
## Gotchas
- `BuiltInPlanner.thinking_config` takes precedence over `generate_content_config.thinking_config` on the same agent — a `UserWarning` is emitted if both are set.- `PlanReActPlanner` requires the model to follow tag conventions (`/*PLANNING*/` etc.). Smaller or non-instruction-tuned models may not comply reliably.- `BaseTool._get_declaration` returning `None` means no function schema is added to `LlmRequest.config.tools`. This is intentional for truly built-in (Gemini-side) tools — call `process_llm_request` instead to inject them.- `BaseToolset.get_tools_with_prefix` is `@final` — override `get_tools` only.- Authenticated tools that call `tool_context.request_credential` **pause** the current invocation. ADK stores the credential request in the event stream; the next user turn should carry the credential response.- `AuthConfig.credential_key` defaults to a SHA-256 digest of the auth scheme + raw credential. Two toolsets with identical configs share the same stored credential automatically.