Microsoft Agent Framework (Python) — MCP Integration

Model Context Protocol (MCP) servers are first-class tool sources in agent-framework. Three transports ship in agent_framework:

| Class | Transport | Typical use |
| --- | --- | --- |
| MCPStdioTool | Subprocess over stdio | Local tools: filesystem, git, npm-hosted servers |
| MCPStreamableHTTPTool | Streamable HTTP (SSE) | Remote / hosted MCP services |
| MCPWebsocketTool | WebSocket | Bidirectional streaming services |

All three are async context managers that connect lazily, discover tools and prompts from the server, and register them as FunctionTool instances on the agent.

Verified against agent-framework-core==1.6.0 and mcp==1.27.

The mcp package is required for any MCP tool:

pip install agent-framework  # pulls mcp transitively
# or, for pruned installs:
pip install agent-framework-core mcp
pip install 'mcp[ws]'  # only if you need MCPWebsocketTool

A minimal stdio example that hands a local filesystem server to an agent:

import asyncio

from agent_framework import Agent, MCPStdioTool
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    async with MCPStdioTool(
        name="filesystem",
        command="npx",
        args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
        description="Read and write files under /tmp",
    ) as fs:
        agent = Agent(
            client=OpenAIChatClient(),
            instructions="You help the user manage files in /tmp.",
            tools=fs,
        )
        response = await agent.run("List the files in /tmp and summarise their names.")
        print(response.text)


asyncio.run(main())

Notes:

  • name is a tool group name — it becomes the prefix for the tools exposed to the model (e.g. filesystem_read_file). Override with tool_name_prefix="fs" to pick a shorter prefix.
  • command + args + env are forwarded to mcp.client.stdio.StdioServerParameters.
  • Use async with so the subprocess is cleaned up when the agent finishes.
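
To predict the names the model will see (for example when filling in approval lists), the prefix rule in the notes above can be sketched as a small pure function. This is only an illustration of the naming convention, not the framework's implementation: exposed_tool_name is a hypothetical helper, and the underscore separator is inferred from the filesystem_read_file example.

```python
from typing import Optional


def exposed_tool_name(server_tool: str, name: str, tool_name_prefix: Optional[str] = None) -> str:
    """Return the prefixed tool name, assuming the '<prefix>_<tool>' pattern."""
    # An explicit tool_name_prefix takes precedence over the group name.
    prefix = tool_name_prefix if tool_name_prefix is not None else name
    return f"{prefix}_{server_tool}"


print(exposed_tool_name("read_file", name="filesystem"))
print(exposed_tool_name("read_file", name="filesystem", tool_name_prefix="fs"))
```

Printing mcp.functions after connecting remains the reliable way to confirm the real names for a given server.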

Passing environment + encoding to a stdio child process

MCPStdioTool forwards env, args, and encoding straight through to StdioServerParameters. Anything else — buffering knobs, custom cwd, etc. — comes through **kwargs (the constructor merges it with the explicit args before constructing StdioServerParameters). Use it to ship secrets to a child server or to pin the wire encoding when running on Windows:

import os

from agent_framework import MCPStdioTool

postgres_mcp = MCPStdioTool(
    name="pg",
    command="uvx",
    args=["mcp-server-postgres"],
    env={
        # Passed to the MCP child as environment variables. The agent process
        # still reads them here, so treat them as sensitive in logs/traces;
        # the model won't see them unless you explicitly forward them.
        "DATABASE_URL": os.environ["DATABASE_URL"],
        "PGPASSWORD": os.environ["DB_PASSWORD"],
    },
    encoding="utf-8",  # Avoid Windows cp1252 mojibake on logs / SQL output.
    request_timeout=15,  # Per-MCP-call timeout (seconds), independent of the child.
)

Treat the spawned process like any other dependency — log its stderr through your normal subprocess plumbing if the MCP server doesn’t already forward useful diagnostics over the JSON-RPC channel.
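
When several secrets are forwarded through env, it can help to fail fast with one clear message instead of a bare KeyError from os.environ. The following is a minimal sketch of that idea; build_child_env is a hypothetical helper, not part of agent_framework, and it copies variables under their existing names without renaming.

```python
import os
from typing import Iterable, Mapping, Optional


def build_child_env(
    required: Iterable[str],
    source: Optional[Mapping[str, str]] = None,
) -> dict[str, str]:
    """Copy only the named variables from source, failing fast if any is missing."""
    source = os.environ if source is None else source
    names = list(required)
    missing = [n for n in names if n not in source]
    if missing:
        # Report every missing name at once, never the values.
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return {n: source[n] for n in names}
```

The result could then be passed as env=build_child_env(["DATABASE_URL"]). Variables that need a different name in the child (such as DB_PASSWORD to PGPASSWORD above) would still be mapped by hand.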

import asyncio

from agent_framework import Agent, MCPStreamableHTTPTool
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    async with MCPStreamableHTTPTool(
        name="learn",
        url="https://learn.microsoft.com/api/mcp",
        description="Search official Microsoft Learn documentation.",
        request_timeout=30,
    ) as learn:
        agent = Agent(
            client=OpenAIChatClient(),
            instructions="Use the learn tool to answer Microsoft documentation questions.",
            tools=learn,
        )
        response = await agent.run("How do I configure FoundryChatClient with Entra?")
        print(response.text)


asyncio.run(main())

Per-request headers (auth tokens, tenant IDs)

Use header_provider to inject a header derived from function_invocation_kwargs on the outer agent.run(...) call. This avoids building a new httpx.AsyncClient per tenant.

from agent_framework import Agent, MCPStreamableHTTPTool
from agent_framework.openai import OpenAIChatClient

mcp = MCPStreamableHTTPTool(
    name="billing-api",
    url="https://mcp.example.com",
    header_provider=lambda kwargs: {"Authorization": f"Bearer {kwargs['token']}"},
)

# Run inside an async function; user_token is the caller's bearer token.
async with mcp:
    agent = Agent(client=OpenAIChatClient(), tools=mcp)
    await agent.run(
        "What's my balance?",
        function_invocation_kwargs={"token": user_token},
    )
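
A named function is often easier to test and to harden than an inline lambda. The sketch below assumes, as the lambda above does, that header_provider receives the invocation kwargs as a mapping. The tenant_id key and the X-Tenant-Id header are hypothetical names chosen for illustration.

```python
from typing import Any, Mapping


def bearer_headers(kwargs: Mapping[str, Any]) -> dict[str, str]:
    """Build per-request headers from the invocation kwargs."""
    token = kwargs.get("token")
    if not token:
        # Raise a descriptive error instead of a bare KeyError from kwargs["token"].
        raise ValueError("function_invocation_kwargs must include a non-empty 'token'")
    headers = {"Authorization": f"Bearer {token}"}
    tenant = kwargs.get("tenant_id")  # hypothetical optional key
    if tenant:
        headers["X-Tenant-Id"] = str(tenant)  # hypothetical header name
    return headers
```

It would be wired in as header_provider=bearer_headers, and it can be unit-tested without opening a connection.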

For custom TLS, retries, or observability pass an httpx.AsyncClient:

import httpx
client = httpx.AsyncClient(timeout=30, verify="/etc/ssl/corp-ca.pem")
mcp = MCPStreamableHTTPTool(name="internal", url="https://mcp.corp/api", http_client=client)

MCPWebsocketTool opens a persistent bidirectional WebSocket connection to an MCP server. Requires pip install 'mcp[ws]'.

import asyncio

from agent_framework import Agent, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    async with MCPWebsocketTool(
        name="realtime",
        url="wss://service.example.com/mcp",
        description="Real-time event streaming service",
        tool_name_prefix="rt",  # prefix exposed tool names, e.g. rt_subscribe
        approval_mode="never_require",  # or "always_require"
        request_timeout=30,  # seconds per MCP call, default 30
    ) as rt:
        agent = Agent(client=OpenAIChatClient(), tools=rt)
        response = await agent.run("Subscribe to order events for account acct-123")
        print(response.text)


asyncio.run(main())

MCPWebsocketTool forwards extra keyword arguments to the underlying WebSocket client. Pass additional_headers (or any kwarg your WebSocket library accepts) to authenticate:

import os

from agent_framework import MCPWebsocketTool

ws_tool = MCPWebsocketTool(
    name="events",
    url="wss://events.example.com/mcp",
    description="Authenticated event stream",
    request_timeout=60,
    additional_headers={
        "Authorization": f"Bearer {os.environ['EVENTS_TOKEN']}",
        "X-Client-Id": "agent-service",
    },
)

Per-tool approval with MCPSpecificApproval

Use MCPSpecificApproval to gate only the write/action tools while letting read tools run freely:

from agent_framework import Agent, MCPSpecificApproval, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient

ws_approval: MCPSpecificApproval = {
    "always_require_approval": ["rt_publish_event", "rt_delete_subscription"],
    "never_require_approval": ["rt_subscribe", "rt_list_subscriptions"],
}

# Run inside an async function.
async with MCPWebsocketTool(
    name="realtime",
    url="wss://service.example.com/mcp",
    approval_mode=ws_approval,
) as rt:
    agent = Agent(client=OpenAIChatClient(), tools=rt)

When a tool requires approval, the workflow emits a function_approval_request event. Respond with event.data.to_function_approval_response(approved=True) and re-run — see the HITL page for the full loop.

Note: MCPWebsocketTool keeps a single persistent connection open for the lifetime of the async with block. If the server drops the connection, call await rt.connect(reset=True) to re-establish it. WebSocket servers that rotate tokens will require you to rebuild the tool instance with the new credentials.

The approval_mode parameter accepts four shapes: two string sentinels, one typed dict, and None:

| Value | Effect |
| --- | --- |
| "always_require" | Every tool invocation emits a function_approval_request event. |
| "never_require" | Bypass approval entirely; the tool runs as soon as the model calls it. |
| MCPSpecificApproval (dict) | Per-tool whitelist / blacklist. |
| None (default) | Inherit the server’s default, usually "never_require". |

For per-tool control, use the typed dict — it gives you static type checking and the exact key names the framework expects:

from agent_framework import MCPStdioTool, MCPSpecificApproval

# Use the TypedDict for the dict literal; the IDE autocompletes the keys.
git_approval: MCPSpecificApproval = {
    "always_require_approval": ["git_push", "git_reset", "git_force_push"],
    "never_require_approval": ["git_status", "git_diff", "git_log"],
}

mcp = MCPStdioTool(
    name="git",
    command="uvx",
    args=["mcp-server-git"],
    approval_mode=git_approval,
)

Tools listed in both lists require approval (the safe default). Tools not in either list inherit the server-side default. When approval is required the workflow emits a function_approval_request event; respond with event.data.to_function_approval_response(approved=True) and re-run with that response. See the Human-in-the-loop page for the full loop.
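
The precedence rules in the paragraph above can be written down as a small decision function, which is handy for unit-testing an approval policy before shipping it. This is a sketch of the rules as described here, not the framework's code: requires_approval and its server_default parameter are hypothetical.

```python
from typing import Mapping, Sequence, Union

ApprovalMode = Union[str, Mapping[str, Sequence[str]], None]


def requires_approval(tool: str, approval_mode: ApprovalMode, server_default: bool = False) -> bool:
    """Decide whether a tool call needs approval under the rules described above."""
    if approval_mode is None:
        return server_default  # inherit the server-side default
    if isinstance(approval_mode, str):
        if approval_mode == "always_require":
            return True
        if approval_mode == "never_require":
            return False
        raise ValueError(f"unknown approval_mode: {approval_mode!r}")
    # Typed-dict form: the always list wins when a tool appears in both lists.
    if tool in approval_mode.get("always_require_approval", ()):
        return True
    if tool in approval_mode.get("never_require_approval", ()):
        return False
    return server_default  # in neither list
```

For example, a policy listing git_push under both keys still yields True for git_push, matching the safe default.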

Pair the per-tool whitelist with a FunctionMiddleware that logs every approval-required call so a security team can review history asynchronously:

import json
import logging

from agent_framework import Agent, FunctionInvocationContext, FunctionMiddleware
from agent_framework.openai import OpenAIChatClient

audit_log = logging.getLogger("agent.audit")


class ApprovalAudit(FunctionMiddleware):
    """Log structured records for every tool that requires approval, before it runs."""

    async def process(self, context: FunctionInvocationContext, call_next) -> None:
        if context.function.approval_mode == "always_require":
            audit_log.info(
                "approval_required",
                extra={
                    "tool": context.function.name,
                    # "args" is a reserved LogRecord attribute and would raise
                    # KeyError in logging, so the key is named tool_args.
                    "tool_args": json.dumps(context.arguments or {}, default=str),
                    "session_id": (context.session.session_id if context.session else None),
                },
            )
        await call_next()


# mcp is the MCPStdioTool configured above.
agent = Agent(
    client=OpenAIChatClient(),
    middleware=[ApprovalAudit()],
    tools=[mcp],
)

Combine with structured logging (JSON output → SIEM) and you have an auditable record of every privileged MCP invocation.
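
The standard library's default formatter drops anything passed through extra, so JSON output needs a formatter that copies those fields explicitly. A minimal stdlib-only sketch follows. The field names (tool, tool_args, session_id) are assumptions for illustration; note that logging reserves args on LogRecord, so an extra key must not use that name.

```python
import json
import logging

AUDIT_FIELDS = ("tool", "tool_args", "session_id")  # assumed extra keys


class JsonAuditFormatter(logging.Formatter):
    """Render a log record, plus selected extra fields, as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "event": record.getMessage(),
        }
        for field in AUDIT_FIELDS:
            # Extras become attributes on the record; copy those that are present.
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        return json.dumps(payload, default=str)
```

Attach it with handler.setFormatter(JsonAuditFormatter()) on a handler registered for the agent.audit logger, then ship that handler's output to your log pipeline.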

MCP servers sometimes expose hundreds of tools. Restrict what the model sees:

mcp = MCPStreamableHTTPTool(
    name="github",
    url="https://mcp.github.com",
    allowed_tools=["list_issues", "create_issue", "get_pr"],
)

Or disable MCP prompts entirely when you only want tools:

MCPStdioTool(name="fs", command="...", load_prompts=False)

After entering the async with block the tool populates functions (and optionally prompts). Use this to log what’s been loaded or to build a runtime tool picker:

async with MCPStdioTool(
    name="github",
    command="npx",
    args=["-y", "@modelcontextprotocol/server-github"],
) as mcp:
    for fn in mcp.functions:
        print(f"{fn.name}: {(fn.description or '')[:60]}")
    # Prompts advertised by the server (if load_prompts was True):
    for prompt in getattr(mcp, "prompts", []):
        print(f"prompt {prompt.name}: {prompt.description}")
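
One way to build the runtime tool picker mentioned above is a plain keyword match over each function's name and description. The sketch below only assumes that the objects expose name and description attributes, as the loop above does. pick_tools and the demo tool names are hypothetical.

```python
from types import SimpleNamespace
from typing import Any, Iterable


def pick_tools(functions: Iterable[Any], query: str, limit: int = 5) -> list[str]:
    """Rank tools by how many query words appear in their name or description."""
    words = query.lower().split()
    scored = []
    for fn in functions:
        haystack = f"{fn.name} {fn.description or ''}".lower()
        score = sum(1 for w in words if w in haystack)
        if score:
            scored.append((score, fn.name))
    # Highest score first; ties are broken alphabetically for stable output.
    scored.sort(key=lambda pair: (-pair[0], pair[1]))
    return [name for _, name in scored[:limit]]


# Stand-ins for discovered tools; real objects would come from mcp.functions.
demo = [
    SimpleNamespace(name="github_list_issues", description="List issues in a repository"),
    SimpleNamespace(name="github_create_issue", description="Open a new issue"),
    SimpleNamespace(name="github_get_pr", description=None),
]
print(pick_tools(demo, "list issues"))
```

Substring matching is deliberately crude; an embedding-based ranker could replace the scoring step without changing the function's shape.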

The default parser coerces MCP CallToolResult into a string for the model. Override it when the server returns structured data you want to surface as multi-part content (images plus alt text, JSON plus a summary, etc.):

from mcp import types

from agent_framework import Content, MCPStreamableHTTPTool


def parse_image_result(result: types.CallToolResult) -> list[Content]:
    out: list[Content] = []
    for c in result.content:
        if isinstance(c, types.TextContent):
            out.append(Content.from_text(c.text))
        elif isinstance(c, types.ImageContent):
            # MCP images come as base64; preserve them as a data URI.
            data_uri = f"data:{c.mimeType};base64,{c.data}"
            out.append(Content.from_uri(uri=data_uri, media_type=c.mimeType))
        elif isinstance(c, types.EmbeddedResource):
            res = c.resource
            text = getattr(res, "text", None)
            if text:
                out.append(Content.from_text(text))
    if result.isError:
        out.append(Content.from_error(message="MCP server returned an error"))
    return out


mcp = MCPStreamableHTTPTool(
    name="diagrammer",
    url="https://diagrammer.example.com/mcp",
    parse_tool_results=parse_image_result,
)

Per-tool overrides are also possible — connect once, inspect mcp.functions, then set result_parser on the individual FunctionTool instances you care about:

async with MCPStreamableHTTPTool(name="analytics", url="https://analytics.example.com/mcp") as mcp:
    # Find the specific tool and override only its parser.
    query = next(f for f in mcp.functions if f.name == "analytics_run_query")
    query.result_parser = lambda r: Content.from_text(f"rows: {len(r.content)}")
    agent = Agent(client=OpenAIChatClient(), tools=mcp)

You don’t need an agent to invoke MCP tools — call_tool() and get_prompt() let you drive the MCP server directly from your own code. Useful for testing, for orchestrators that don’t use the agent tool loop, and for inspecting what a tool returns before wiring it to an agent:

import asyncio

from agent_framework import MCPStdioTool, MCPStreamableHTTPTool


async def direct_invocation() -> None:
    # Stdio example: query a filesystem server directly
    async with MCPStdioTool(
        name="fs",
        command="npx",
        args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
    ) as fs:
        # List what tools are available after connection
        for fn in fs.functions:
            print(fn.name, "-", (fn.description or "")[:60])
        # Call a tool by name; kwargs match the tool's JSON schema parameters
        result = await fs.call_tool("fs_list_directory", path="/tmp")
        print("directory listing:", result)
        # If the server exposes prompts, fetch one
        if getattr(fs, "prompts", []):
            prompt_text = await fs.get_prompt(
                fs.prompts[0].name,
                path="/tmp",
            )
            print("prompt:", prompt_text[:200])


async def http_direct() -> None:
    # HTTP example: call a remote tool without an agent
    async with MCPStreamableHTTPTool(
        name="learn",
        url="https://learn.microsoft.com/api/mcp",
        allowed_tools=["search"],
    ) as learn:
        # Verify the server is connected
        assert learn.is_connected
        result = await learn.call_tool("search", query="FoundryChatClient authentication")
        print(result)


asyncio.run(direct_invocation())
asyncio.run(http_direct())

call_tool returns either a str (when the default parser is used) or list[Content] (when parse_tool_results= was set on the tool). The tool must be in allowed_tools (if that param is set) and the session must be connected — otherwise the call raises.
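
Because the return type depends on how the tool was configured, calling code that only wants text may prefer to normalise both shapes. The sketch below is duck-typed and rests on one assumption: that text-bearing content parts expose a text attribute. result_to_text is a hypothetical helper.

```python
from types import SimpleNamespace
from typing import Any


def result_to_text(result: Any) -> str:
    """Flatten a call_tool result (a str or a list of content parts) into plain text."""
    if isinstance(result, str):
        return result
    parts = []
    for item in result:
        text = getattr(item, "text", None)  # assumption: text parts expose .text
        if text:
            parts.append(text)
    return "\n".join(parts)


# Stand-ins for content parts; non-text parts are skipped.
demo_parts = [SimpleNamespace(text="row 1"), SimpleNamespace(uri="data:..."), SimpleNamespace(text="row 2")]
print(result_to_text(demo_parts))
```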

get_prompt returns the rendered prompt text as a str. Prompts are only loaded when load_prompts=True (the default); disable with MCPStdioTool(..., load_prompts=False) to skip the prompts handshake entirely on servers that don’t use them.

mcp.is_connected is True after a successful connect() or __aenter__ and False after close() or __aexit__, so you can check it before deciding whether to reconnect after a transport error:

async with MCPStreamableHTTPTool(name="api", url="https://api.example.com/mcp") as mcp:
    print(mcp.is_connected)  # True
    print(len(mcp.functions))  # number of tools loaded from the server

# Outside the context manager:
print(mcp.is_connected)  # False

Some MCP servers call back into the client to perform model sampling on the client’s behalf — for example, a planning server might ask the client to generate sub-task descriptions. Pass a client= parameter so the framework can satisfy those server-initiated sampling requests:

from agent_framework import Agent, MCPStreamableHTTPTool
from agent_framework.openai import OpenAIChatClient

# The client= parameter is used when the MCP server sends a sampling request
# back to your process. The framework routes it through OpenAIChatClient,
# which means the same model/deployment handles both agent calls and server
# callbacks, keeping billing and logging in one place.
mcp = MCPStreamableHTTPTool(
    name="planner",
    url="https://planner.example.com/mcp",
    client=OpenAIChatClient(model="gpt-4o"),
)

async with mcp:
    agent = Agent(client=OpenAIChatClient(), tools=mcp)
    response = await agent.run("Build a week-long study plan for distributed systems.")
    print(response.text)

When no client= is passed, server-side sampling callbacks raise NotImplementedError — leaving it unset is safe for servers that don’t use sampling.

Hosted MCP — the SupportsMCPTool protocol

The three MCP classes above run the MCP client in your process. Some providers (notably OpenAI’s Responses API via certain deployments, and the Foundry model garden) can run the MCP client server-side — you tell the provider the MCP URL, the provider opens the connection, discovers tools, and calls them for you. No subprocess or HTTP client in your Python process.

Chat clients that support this implement SupportsMCPTool. Feature-detect at runtime before calling get_mcp_tool(...):

from agent_framework import Agent, SupportsMCPTool
from agent_framework.openai import OpenAIChatClient

client = OpenAIChatClient(model="gpt-5")

if isinstance(client, SupportsMCPTool):
    mcp_tool = client.get_mcp_tool(
        name="learn",
        url="https://learn.microsoft.com/api/mcp",
    )
    agent = Agent(
        client=client,
        instructions="Answer Microsoft documentation questions.",
        tools=[mcp_tool],
    )
    response = await agent.run("How does DefaultAzureCredential pick a credential?")
else:
    # Fall back to in-process MCP.
    from agent_framework import MCPStreamableHTTPTool

    async with MCPStreamableHTTPTool(name="learn", url="https://learn.microsoft.com/api/mcp") as mcp_tool:
        agent = Agent(client=client, tools=[mcp_tool])
        response = await agent.run("...")

When to prefer hosted MCP:

  • The MCP server and the model provider already have a trust relationship — no need to re-mint auth tokens in your code.
  • You don’t want to manage a long-running HTTP client or handle reconnects.
  • Latency matters — the provider can often keep a warm connection open across requests.

When to stick with in-process (MCPStdioTool / MCPStreamableHTTPTool / MCPWebsocketTool):

  • You need header_provider= for multi-tenant auth — hosted MCP typically doesn’t support per-request headers.
  • You want approval_mode gating every individual tool — this is an in-process feature.
  • You need to parse CallToolResult with a custom parse_tool_results= callback.

The SupportsMCPTool protocol is runtime_checkable, so the isinstance(...) guard is a normal runtime check — no stub subclassing needed. See the Advanced → Capability protocols section for the full set of Supports* protocols (file search, web search, code interpreter, image generation).
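
For readers unfamiliar with runtime-checkable protocols, here is how the mechanism works in plain Python, independent of the framework. SupportsGreeting and both classes are made-up names for illustration; only typing.Protocol and typing.runtime_checkable are real.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SupportsGreeting(Protocol):
    def get_greeting(self, name: str) -> Any: ...


class WithGreeting:
    # No inheritance from SupportsGreeting: structural typing is enough.
    def get_greeting(self, name: str) -> str:
        return f"hello {name}"


class WithoutGreeting:
    pass


print(isinstance(WithGreeting(), SupportsGreeting))     # True
print(isinstance(WithoutGreeting(), SupportsGreeting))  # False
```

The isinstance check only verifies that the method exists, not that its signature or return type matches, so it is a capability hint rather than a full contract.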

The async with mcp_tool: idiom handles connect + close for you. For long-running servers where you want the same tool instance to survive across many agent invocations, manage the lifecycle explicitly:

from agent_framework import Agent, MCPStreamableHTTPTool
from agent_framework.openai import OpenAIChatClient

mcp = MCPStreamableHTTPTool(name="learn", url="https://learn.microsoft.com/api/mcp")
client = OpenAIChatClient()  # reuse one HTTP pool

await mcp.connect()  # open once
try:
    for _ in range(100):
        agent = Agent(client=client, tools=mcp)  # reuses the open session
        await agent.run("")
finally:
    await mcp.close()  # close once

If the remote server drops the connection or rotates a token, call connect(reset=True) to tear down the stale session and open a fresh one without replacing the tool instance:

try:
    response = await agent.run("Query the MCP server")
except ConnectionError:
    await mcp.connect(reset=True)  # reset + reconnect in one call
    response = await agent.run("Query the MCP server")

Pair reset=True with your own backoff logic when you want a long-lived MCP tool that self-heals across transient failures — the default async with scope can’t cover that because it wants a clean open/close.
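
A minimal sketch of such backoff logic, using only asyncio, is shown here. run_with_reconnect is a hypothetical helper and its defaults are arbitrary. It assumes transport failures surface as ConnectionError, as in the snippet above, and takes the reconnect step as a callable so it stays independent of any particular tool class.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def run_with_reconnect(
    operation: Callable[[], Awaitable[T]],
    reconnect: Callable[[], Awaitable[None]],
    *,
    attempts: int = 4,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
) -> T:
    """Retry operation on ConnectionError, reconnecting with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return await operation()
        except ConnectionError:
            if attempt == attempts - 1:
                raise  # out of retries: surface the original error
            # Exponential backoff capped at max_delay, plus jitter to avoid stampedes.
            delay = min(max_delay, base_delay * (2 ** attempt))
            await asyncio.sleep(delay + random.uniform(0, delay / 2))
            await reconnect()
    raise RuntimeError("unreachable")
```

With the objects from the snippet above, a call might look like await run_with_reconnect(lambda: agent.run("Query the MCP server"), lambda: mcp.connect(reset=True)).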

terminate_on_close=True (the default for HTTP transports) closes the underlying httpx connection when close() runs. Set it to False when your http_client= is shared with other callers and you don’t want the tool to terminate the pool:

import httpx

from agent_framework import MCPStreamableHTTPTool

shared = httpx.AsyncClient(timeout=30)  # used by other parts of your app

mcp = MCPStreamableHTTPTool(
    name="internal",
    url="https://mcp.corp/api",
    http_client=shared,
    terminate_on_close=False,  # don't kill the shared client
    request_timeout=15,  # per-request timeout (seconds)
)

request_timeout applies to every MCP call (tool invocation, prompt fetch, server ping) — set it independently of the httpx client’s global timeout when you want tighter per-call bounds.

Flip the direction — let other agents consume yours over MCP. Every Agent (and concrete BaseAgent subclass) ships an as_mcp_server() helper that wraps the agent in an mcp.server.lowlevel.Server exposing one tool. Drop it into any MCP runner — stdio, the official mcp dev harness, or a hosted streamable-HTTP service:

import anyio
from mcp.server.stdio import stdio_server

from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    agent = Agent(
        client=OpenAIChatClient(),
        name="docs_agent",
        description="Answers questions about our internal documentation.",
        instructions="Only answer using the indexed docs.",
    )
    server = agent.as_mcp_server(
        server_name="docs-mcp",
        version="1.0.0",
        instructions="Use docs_agent for any question about internal docs.",
    )
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, server.create_initialization_options())


anyio.run(main)

What the wrapper does for you:

  • Calls agent.as_tool() internally to produce the single advertised tool.
  • Wires list_tools, call_tool, and set_logging_level handlers on the MCP server.
  • Forwards the agent’s name / description to the MCP tool surface.
  • Maps the agent’s text output to MCP TextContent. Image/audio outputs are dropped with a warning — MCP server-side rich content forwarding isn’t implemented yet.

Now any MCP client — Claude Desktop, an LLM IDE, or another Agent configured with MCPStdioTool / MCPStreamableHTTPTool — can drive the agent through the standard tool surface. This is the cleanest way to publish a single-purpose agent for cross-language consumption.

as_mcp_server() plus stdio plus another agent’s MCPStdioTool lets you compose two agents over MCP locally without any HTTP plumbing. Useful for pipeline-style architectures where each step is a small, replaceable agent:

# child_agent.py (packaged as an executable script)
import anyio
from mcp.server.stdio import stdio_server

from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient


async def main() -> None:
    agent = Agent(client=OpenAIChatClient(), name="summariser", instructions="Summarise.")
    server = agent.as_mcp_server(server_name="summariser")
    async with stdio_server() as (r, w):
        await server.run(r, w, server.create_initialization_options())


anyio.run(main)

# supervisor.py (run inside an async function)
from agent_framework import Agent, MCPStdioTool
from agent_framework.openai import OpenAIChatClient

async with MCPStdioTool(
    name="summariser",
    command="python",
    args=["child_agent.py"],
) as child:
    supervisor = Agent(client=OpenAIChatClient(), name="supervisor", tools=child)
    response = await supervisor.run("Summarise the attached doc.")

The two processes talk over MCP — kill or restart the child without touching the supervisor.

For HTTP/SSE deployments, the agent_framework.devui and agent_framework_chatkit hosting packages turn as_mcp_server() output into a streamable-HTTP endpoint with auth, multi-session routing, and OpenTelemetry tracing — see those sub-packages for production recipes.

Multi-MCP agent. Pass a list — every MCP tool’s public functions are aggregated under its own prefix:

async with (
    MCPStdioTool(name="fs", command="npx", args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]) as fs,
    MCPStreamableHTTPTool(name="learn", url="https://learn.microsoft.com/api/mcp") as learn,
):
    agent = Agent(client=OpenAIChatClient(), tools=[fs, learn])

Tool + MCP mix. MCP tools combine with plain @tool-decorated functions:

from agent_framework import tool


@tool
def summarise(text: str) -> str:
    return " ".join(text.split()[:50])


async with MCPStdioTool(name="fs", command="...") as fs:
    agent = Agent(client=OpenAIChatClient(), tools=[fs, summarise])

Quarantine risky servers. Combine approval_mode="always_require" with function middleware that logs every invocation before approval.