How to Build a Model Context Protocol (MCP) Tool Registry From Scratch: Versioning, Discoverability, and Safe Hot-Swapping in Production
The Model Context Protocol has quietly become the backbone of how production AI agents discover and invoke external tools. Since Anthropic introduced it in late 2024, MCP has matured from a clever idea into an industry-wide standard: nearly every serious agentic framework in 2026 speaks MCP natively. But here's the problem nobody talks about at conferences.
Most teams treat their MCP tool servers like static config files. They deploy a list of tools, wire them into their agent, and then pray nothing breaks when they need to update one. When they inevitably do need to update, they take downtime. They break running sessions. They introduce subtle version mismatches that cause agents to call a tool with a schema that no longer exists on the server.
This guide is about doing it properly. We're going to build a production-grade MCP Tool Registry from scratch: one that supports semantic versioning, dynamic discoverability, and zero-downtime hot-swapping of tool implementations. By the end, you'll have a system where you can deploy a new version of a tool, gradually shift agent traffic to it, and roll back in seconds if something goes wrong. No restarts. No dropped sessions. No 3 AM incidents.
This is a backend engineer's guide. We'll write real code, make real architectural decisions, and talk about the tradeoffs honestly.
Understanding the Problem: Why a Naive MCP Setup Breaks in Production
Before we build the solution, let's be precise about the failure modes we're solving for. A typical naive MCP setup looks like this:
- One MCP server process exposes a fixed list of tools via `tools/list`
- The agent client connects at startup and caches the tool manifest
- Tool implementations live directly in the server process
- Updates require a full server restart and client reconnection
This works fine for a weekend prototype. In production, it creates four distinct failure categories:
- Schema drift: The agent holds a cached tool schema that diverges from what the server now expects. The agent calls `search_documents` with a `query` field; the new server expects `search_query`. The call fails, the agent halts, the user is confused.
- Deployment downtime: Any update to any tool forces a restart of the entire MCP server, breaking every active agent session simultaneously.
- No rollback path: If a new tool implementation has a bug, you have no clean way to revert without redeploying the old binary and restarting again.
- Zero discoverability: There's no way for a new agent instance to ask "what tools are available right now, and which version of each should I use?"
The solution is to treat your MCP tools the same way mature microservices teams treat APIs: with a registry, versioning contracts, and traffic management. Let's build it.
Architecture Overview: The Three-Layer Registry Model
Our registry is composed of three layers that work together:
- Layer 1: The Registry Store. A persistent, consistent data store that holds tool metadata, schemas, and version history. This is the source of truth.
- Layer 2: The Registry API. An HTTP service that exposes endpoints for registering tools, querying the catalog, and managing version routing rules. This is the control plane.
- Layer 3: The MCP Gateway. A long-running MCP server that delegates tool calls to the appropriate backend implementation based on registry routing rules. This is the data plane.
The key insight is the separation of the control plane (the Registry API) from the data plane (the MCP Gateway). You can update routing rules in the control plane without touching the gateway process. The gateway polls or subscribes to routing changes and updates its internal dispatch table in memory, with no restart required. That's how hot-swapping works.
Step 1: Design the Tool Registry Schema
Start with your data model. Every registered tool needs the following fields. We'll use PostgreSQL for the registry store because we need ACID guarantees on version state transitions.
-- tools: the canonical identity of a tool
CREATE TABLE tools (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name TEXT NOT NULL UNIQUE,  -- e.g. "search_documents"
    description TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT now()
);
-- tool_versions: each versioned implementation of a tool
CREATE TABLE tool_versions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tool_id UUID NOT NULL REFERENCES tools(id),
    version TEXT NOT NULL,        -- semver: "1.2.0"
    input_schema JSONB NOT NULL,  -- JSON Schema for input validation
    output_schema JSONB NOT NULL, -- JSON Schema for output validation
    endpoint_url TEXT NOT NULL,   -- where to dispatch calls
    status TEXT NOT NULL          -- 'active', 'deprecated', 'retired'
        CHECK (status IN ('active', 'deprecated', 'retired')),
    deployed_at TIMESTAMPTZ DEFAULT now(),
    deprecated_at TIMESTAMPTZ,
    UNIQUE (tool_id, version)
);
-- routing_rules: controls which version receives traffic
CREATE TABLE routing_rules (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tool_id UUID NOT NULL REFERENCES tools(id),
    primary_version TEXT NOT NULL, -- receives 100% or (100 - canary_pct)% of traffic
    canary_version TEXT,           -- optional canary target
    canary_pct INT DEFAULT 0       -- 0-100
        CHECK (canary_pct BETWEEN 0 AND 100),
    updated_at TIMESTAMPTZ DEFAULT now(),
    UNIQUE (tool_id) -- one rule per tool; required for ON CONFLICT (tool_id) upserts
);

A few important design decisions here. The routing_rules table is separate from tool_versions because routing is an operational concern, not a versioning concern. You can change which version gets traffic without touching the version record itself. The canary_pct column gives you a built-in mechanism for gradual traffic shifting, which is the foundation of safe hot-swapping. Note the UNIQUE constraint on tool_id: each tool has exactly one routing rule, which is what allows the Registry API to upsert routing changes with ON CONFLICT.
Step 2: Build the Registry API
The Registry API is a standard REST service. Here's a minimal but complete implementation in Python using FastAPI. We'll keep it focused on the endpoints that matter most for the hot-swap workflow.
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import asyncpg, json, os
app = FastAPI(title="MCP Tool Registry")
DB_URL = os.environ["DATABASE_URL"]
class ToolVersionCreate(BaseModel):
    tool_name: str
    description: str
    version: str
    input_schema: dict
    output_schema: dict
    endpoint_url: str

class RoutingUpdate(BaseModel):
    primary_version: str
    canary_version: Optional[str] = None
    canary_pct: int = 0
async def init_connection(conn):
    # Encode/decode JSONB columns as Python dicts. Without this codec,
    # asyncpg expects JSONB parameters to be pre-serialized strings and
    # the schema inserts below would fail.
    await conn.set_type_codec(
        "jsonb", encoder=json.dumps, decoder=json.loads, schema="pg_catalog"
    )

@app.on_event("startup")
async def startup():
    app.state.db = await asyncpg.create_pool(DB_URL, init=init_connection)
# Register a new tool version
@app.post("/tools/{tool_name}/versions")
async def register_version(tool_name: str, payload: ToolVersionCreate):
    async with app.state.db.acquire() as conn:
        # Upsert the tool identity
        tool = await conn.fetchrow(
            """
            INSERT INTO tools (name, description)
            VALUES ($1, $2)
            ON CONFLICT (name) DO UPDATE SET description = EXCLUDED.description
            RETURNING id
            """,
            tool_name, payload.description
        )
        # Insert the new version; reject duplicate registrations cleanly
        try:
            await conn.execute(
                """
                INSERT INTO tool_versions
                    (tool_id, version, input_schema, output_schema, endpoint_url, status)
                VALUES ($1, $2, $3, $4, $5, 'active')
                """,
                tool["id"], payload.version,
                payload.input_schema, payload.output_schema,
                payload.endpoint_url
            )
        except asyncpg.UniqueViolationError:
            raise HTTPException(status_code=409, detail="Version already registered")
    return {"status": "registered", "tool": tool_name, "version": payload.version}
# Update routing rules (the hot-swap trigger)
@app.put("/tools/{tool_name}/routing")
async def update_routing(tool_name: str, payload: RoutingUpdate):
    async with app.state.db.acquire() as conn:
        tool = await conn.fetchrow(
            "SELECT id FROM tools WHERE name = $1", tool_name
        )
        if not tool:
            raise HTTPException(status_code=404, detail="Tool not found")
        await conn.execute(
            """
            INSERT INTO routing_rules
                (tool_id, primary_version, canary_version, canary_pct)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT (tool_id) DO UPDATE
            SET primary_version = EXCLUDED.primary_version,
                canary_version = EXCLUDED.canary_version,
                canary_pct = EXCLUDED.canary_pct,
                updated_at = now()
            """,
            tool["id"], payload.primary_version,
            payload.canary_version, payload.canary_pct
        )
    return {"status": "routing updated", "tool": tool_name}
# The catalog endpoint: what the MCP Gateway polls
@app.get("/catalog")
async def get_catalog():
    async with app.state.db.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT
                t.name, t.description,
                tv.version, tv.input_schema, tv.output_schema,
                tv.endpoint_url, tv.status,
                rr.primary_version, rr.canary_version, rr.canary_pct
            FROM tools t
            JOIN tool_versions tv ON tv.tool_id = t.id
            LEFT JOIN routing_rules rr ON rr.tool_id = t.id
            WHERE tv.status = 'active'
            ORDER BY t.name, tv.version
            """
        )
    return [dict(r) for r in rows]

The GET /catalog endpoint is the most important one. It's what the MCP Gateway will poll every few seconds to keep its internal dispatch table current. Notice it joins routing rules so the gateway always knows which version is primary and whether a canary is active.
Step 3: Build the MCP Gateway with Hot-Swap Dispatch
The MCP Gateway is where the real engineering happens. It's a long-running MCP server that:
- Periodically fetches the catalog from the Registry API
- Maintains an in-memory dispatch table mapping tool names to versioned HTTP backends
- Handles incoming `tools/list` and `tools/call` requests from agent clients
- Routes calls to the correct backend based on current routing rules, including canary splitting
import asyncio, random, httpx, json

class DispatchTable:
    """Thread-safe, hot-swappable routing table for MCP tools."""

    def __init__(self):
        self._table: dict = {}    # tool_name -> routing config
        self._schemas: dict = {}  # tool_name -> input/output schemas
        self._lock = asyncio.Lock()

    async def refresh(self, catalog: list[dict]):
        """Atomically replace the dispatch table from a fresh catalog."""
        new_table = {}
        new_schemas = {}
        for entry in catalog:
            name = entry["name"]
            if name not in new_table:
                new_table[name] = {
                    "primary_version": entry.get("primary_version"),
                    "canary_version": entry.get("canary_version"),
                    # LEFT JOIN rows with no routing rule yield None, not 0
                    "canary_pct": entry.get("canary_pct") or 0,
                    "versions": {}
                }
                new_schemas[name] = {
                    "description": entry["description"],
                    "input_schema": entry["input_schema"]
                }
            new_table[name]["versions"][entry["version"]] = entry["endpoint_url"]
        async with self._lock:
            self._table = new_table
            self._schemas = new_schemas
    def resolve_endpoint(self, tool_name: str) -> str | None:
        """Pick the correct backend URL using canary routing logic."""
        config = self._table.get(tool_name)
        if not config:
            return None
        canary_pct = config.get("canary_pct", 0)
        canary_version = config.get("canary_version")
        # Canary routing: send X% of traffic to the canary version
        if canary_version and canary_pct > 0:
            if random.randint(1, 100) <= canary_pct:
                url = config["versions"].get(canary_version)
                if url:
                    return url
        # Default: route to the primary version
        primary = config.get("primary_version")
        return config["versions"].get(primary) if primary else None

    def list_tools(self) -> list[dict]:
        """Return the MCP-compatible tool manifest for tools/list."""
        tools = []
        for name, schema in self._schemas.items():
            tools.append({
                "name": name,
                "description": schema["description"],
                "inputSchema": schema["input_schema"]
            })
        return tools
class MCPGateway:
    def __init__(self, registry_url: str, poll_interval: int = 5):
        self.registry_url = registry_url
        self.poll_interval = poll_interval
        self.dispatch = DispatchTable()
        self._http = httpx.AsyncClient(timeout=10.0)

    async def start_polling(self):
        """Background task: keep the dispatch table fresh."""
        while True:
            try:
                resp = await self._http.get(f"{self.registry_url}/catalog")
                resp.raise_for_status()
                catalog = resp.json()
                await self.dispatch.refresh(catalog)
            except Exception as e:
                print(f"[registry-poll] error: {e}")
            await asyncio.sleep(self.poll_interval)

    async def handle_tools_list(self) -> dict:
        """Respond to MCP tools/list requests."""
        return {"tools": self.dispatch.list_tools()}

    async def handle_tools_call(self, tool_name: str, arguments: dict) -> dict:
        """Dispatch a tool call to the correct versioned backend."""
        endpoint = self.dispatch.resolve_endpoint(tool_name)
        if not endpoint:
            return {
                "isError": True,
                "content": [{"type": "text", "text": f"Tool '{tool_name}' not found in registry"}]
            }
        try:
            resp = await self._http.post(
                endpoint,
                json={"tool": tool_name, "arguments": arguments},
                headers={"Content-Type": "application/json"}
            )
            resp.raise_for_status()
            result = resp.json()
            return {"content": [{"type": "text", "text": json.dumps(result)}]}
        except httpx.HTTPStatusError as e:
            return {
                "isError": True,
                "content": [{"type": "text", "text": f"Backend error: {e.response.status_code}"}]
            }
        except Exception as e:
            return {
                "isError": True,
                "content": [{"type": "text", "text": f"Dispatch error: {str(e)}"}]
            }

The critical design in DispatchTable.refresh() is the atomic swap. We build the entire new table in local variables first, then replace both _table and _schemas under a single lock acquisition. An agent calling tools/list will never see a partially updated state. It either sees the old table or the new one, never a mix.
Step 4: Implement the Safe Hot-Swap Workflow
Now let's put it all together into a concrete operational workflow. Here's the exact sequence of steps to deploy a new tool version with zero downtime.
Phase 1: Register the New Version
Deploy your new tool implementation as a separate HTTP service (a sidecar, a new container, a Lambda function, whatever fits your infrastructure). Then register it in the registry without touching the routing rules.
curl -X POST https://registry.internal/tools/search_documents/versions \
-H "Content-Type: application/json" \
-d '{
"tool_name": "search_documents",
"description": "Search the document corpus using semantic similarity",
"version": "2.1.0",
"input_schema": {
"type": "object",
"properties": {
"search_query": {"type": "string"},
"top_k": {"type": "integer", "default": 5}
},
"required": ["search_query"]
},
"output_schema": {
"type": "object",
"properties": {
"results": {"type": "array"}
}
},
"endpoint_url": "https://tools-v2.internal/search_documents"
}'

At this point, version 2.1.0 is registered but receives zero traffic. The routing rule still points all traffic to 1.3.0. Nothing is broken. Nothing has changed from the agent's perspective.
Phase 2: Start the Canary
Send 10% of traffic to the new version to validate it in production.
curl -X PUT https://registry.internal/tools/search_documents/routing \
-H "Content-Type: application/json" \
-d '{
"primary_version": "1.3.0",
"canary_version": "2.1.0",
"canary_pct": 10
}'

Within 5 seconds (one poll cycle), every MCP Gateway instance in your fleet picks up this change. 10% of `search_documents` calls now go to the new backend. Monitor your error rates, latency p99, and output quality metrics. If anything looks wrong, roll back instantly.
Phase 3: Progressive Rollout
Gradually increase the canary percentage. A typical schedule might be: 10% for 30 minutes, 25% for 30 minutes, 50% for 1 hour, then 100%.
# Shift to 100% on new version
curl -X PUT https://registry.internal/tools/search_documents/routing \
-H "Content-Type: application/json" \
-d '{
"primary_version": "2.1.0",
"canary_version": null,
"canary_pct": 0
}'

Phase 4: Rollback (When You Need It)
If something goes wrong at any phase, rollback is a single API call that takes effect within one poll cycle.
# Emergency rollback: all traffic back to 1.3.0
curl -X PUT https://registry.internal/tools/search_documents/routing \
-H "Content-Type: application/json" \
-d '{
"primary_version": "1.3.0",
"canary_version": null,
"canary_pct": 0
}'No restarts. No redeployments. No broken sessions. The gateway picks up the new routing rule on its next poll and the old backend handles all traffic again within seconds.
Step 5: Add Schema Validation to Prevent Drift
Hot-swapping is only safe if you validate that tool call arguments match the schema of the version being invoked. Add this validation layer inside the gateway's dispatch logic.
from jsonschema import validate, ValidationError

# Added as a method on MCPGateway
async def handle_tools_call_validated(
    self, tool_name: str, arguments: dict
) -> dict:
    # Fetch the current schema for this tool
    schema_info = self.dispatch._schemas.get(tool_name)
    if not schema_info:
        return {"isError": True, "content": [
            {"type": "text", "text": f"Tool '{tool_name}' not found"}
        ]}
    # Validate arguments against the registered input schema
    try:
        validate(instance=arguments, schema=schema_info["input_schema"])
    except ValidationError as e:
        return {"isError": True, "content": [
            {"type": "text", "text": f"Schema validation failed: {e.message}"}
        ]}
    # Proceed with dispatch
    return await self.handle_tools_call(tool_name, arguments)

This is especially important during canary rollouts where two different schema versions may be active simultaneously. If an agent was initialized with the v1.3.0 schema (which uses `query` as the field name) but gets routed to the v2.1.0 backend (which expects `search_query`), the validator catches the mismatch before it reaches the backend and returns a clean error instead of a cryptic 422 or 500.
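To see the guard in action without standing up the full gateway, here's a dependency-free sketch of the required-field portion of that check (`check_required` is a deliberately simplified stand-in for the jsonschema call, covering only the `required` keyword):

```python
def check_required(arguments: dict, schema: dict) -> list[str]:
    """Simplified stand-in for jsonschema validation: report missing required fields."""
    return [field for field in schema.get("required", []) if field not in arguments]

# The v2.1.0 schema renamed "query" to "search_query"
v2_schema = {
    "type": "object",
    "properties": {"search_query": {"type": "string"}},
    "required": ["search_query"],
}

# An agent still holding the v1.3.0 schema sends the old field name
missing = check_required({"query": "quarterly report"}, v2_schema)
print(missing)  # ['search_query'] -> rejected before it ever reaches the backend
```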
Step 6: Implement Tool Discoverability for New Agent Instances
Discoverability is about more than just tools/list. A sophisticated agent system needs to be able to ask richer questions: "What tools are available for working with documents?", "Which tools are stable versus experimental?", "What changed between version 1.x and 2.x of this tool?"
Add a few more endpoints to the Registry API to support this.
# Search the catalog, filtered by lifecycle status
# (capability-tag search would additionally require a tags table to join in)
@app.get("/tools/search")
async def search_tools(status: str = "active"):
    async with app.state.db.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT t.name, t.description, tv.version, tv.status
            FROM tools t
            JOIN tool_versions tv ON tv.tool_id = t.id
            WHERE tv.status = $1
            ORDER BY t.name
            """,
            status
        )
    return [dict(r) for r in rows]
# Get the full version history of a tool
@app.get("/tools/{tool_name}/versions")
async def list_versions(tool_name: str):
    async with app.state.db.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT tv.version, tv.status, tv.deployed_at, tv.deprecated_at,
                   tv.input_schema, tv.output_schema
            FROM tool_versions tv
            JOIN tools t ON t.id = tv.tool_id
            WHERE t.name = $1
            ORDER BY tv.deployed_at DESC
            """,
            tool_name
        )
    if not rows:
        raise HTTPException(status_code=404, detail="Tool not found")
    return [dict(r) for r in rows]
# Get the changelog between two versions (requires a changelogs table)
@app.get("/tools/{tool_name}/changelog")
async def get_changelog(tool_name: str, from_version: str, to_version: str):
    async with app.state.db.acquire() as conn:
        row = await conn.fetchrow(
            """
            SELECT cl.summary, cl.breaking_changes, cl.migration_notes
            FROM changelogs cl
            JOIN tools t ON t.id = cl.tool_id
            WHERE t.name = $1
              AND cl.from_version = $2
              AND cl.to_version = $3
            """,
            tool_name, from_version, to_version
        )
    if not row:
        raise HTTPException(status_code=404, detail="Changelog not found")
    return dict(row)

The /changelog endpoint is particularly powerful when you have agents that need to adapt their behavior based on tool changes. An orchestration layer can fetch the changelog and include it in the agent's system prompt when a tool version changes, so the agent knows the field names have changed without requiring a full re-initialization.
Operational Considerations and Production Hardening
Poll Interval vs. Webhook Push
The 5-second polling interval in our gateway works well for most cases, but it creates a 5-second window where a rollback isn't fully in effect. For critical systems, replace polling with a webhook push model: the Registry API publishes routing change events to a message bus (Kafka, Redis Streams, or NATS), and each gateway instance subscribes and updates its dispatch table immediately. This reduces the propagation delay to under 100ms in practice.
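The shape of that push model can be sketched with an in-process queue standing in for the message bus. In production you'd replace `asyncio.Queue` with a Kafka, Redis Streams, or NATS client; all names here are illustrative:

```python
import asyncio

async def publish_routing_change(bus: asyncio.Queue, event: dict) -> None:
    """Registry-side stand-in: publish a routing-change event to the bus."""
    await bus.put(event)

async def apply_routing_change(bus: asyncio.Queue, dispatch: dict) -> None:
    """Gateway-side stand-in: block until an event arrives, then swap immediately."""
    event = await bus.get()
    dispatch[event["tool"]] = event["primary_version"]  # atomic reference update

async def demo() -> dict:
    bus: asyncio.Queue = asyncio.Queue()
    dispatch: dict = {}
    await publish_routing_change(
        bus, {"tool": "search_documents", "primary_version": "1.3.0"}
    )
    await apply_routing_change(bus, dispatch)
    return dispatch

dispatch = asyncio.run(demo())
print(dispatch)
```

The key difference from polling is that the gateway blocks on the bus rather than sleeping on a timer, so the propagation delay is the bus's delivery latency rather than the poll interval.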
Circuit Breaking on Tool Backends
Wrap each backend HTTP call in a circuit breaker. If the canary backend starts returning errors above a threshold, the circuit opens and the gateway automatically falls back to the primary version for affected calls. Libraries like circuitbreaker for Python or opossum for Node.js make this straightforward to add.
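If you'd rather not pull in a library, the core state machine is small. Here's a minimal consecutive-failure breaker sketch (class and function names are illustrative; a production breaker would also add a half-open state with a recovery timer):

```python
class CircuitBreaker:
    """Minimal sketch: open the circuit after `threshold` consecutive failures."""

    def __init__(self, threshold: int = 5):
        self.threshold = threshold
        self.failures = 0

    @property
    def is_open(self) -> bool:
        return self.failures >= self.threshold

    def record_success(self) -> None:
        self.failures = 0  # any success closes the circuit again

    def record_failure(self) -> None:
        self.failures += 1

def route_with_fallback(breaker: CircuitBreaker, canary_url: str, primary_url: str) -> str:
    """If the canary's breaker is open, fall back to the primary backend."""
    return primary_url if breaker.is_open else canary_url

breaker = CircuitBreaker(threshold=3)
for _ in range(3):
    breaker.record_failure()  # three straight canary errors trip the breaker
print(route_with_fallback(breaker, "https://tools-v2.internal", "https://tools-v1.internal"))
```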
Versioning Strategy: When to Use Major vs. Minor Versions
Adopt a strict convention and enforce it in your CI pipeline:
- Patch (1.0.x): Bug fixes only. No schema changes. Safe for immediate full rollout.
- Minor (1.x.0): Additive schema changes (new optional fields). Backward compatible. Use a canary rollout.
- Major (x.0.0): Breaking schema changes (renamed fields, removed fields, changed types). Always requires a canary rollout and a migration period where both versions are simultaneously active.
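This convention is mechanically checkable. Here's a sketch of a CI gate that compares two input schemas and reports the minimum bump they require (`required_bump` is a hypothetical helper; it covers removed fields, type changes, and newly required fields, not every JSON Schema construct):

```python
def required_bump(old_schema: dict, new_schema: dict) -> str:
    """Decide the minimum semver bump an input-schema change requires."""
    old_props = old_schema.get("properties", {})
    new_props = new_schema.get("properties", {})
    old_req = set(old_schema.get("required", []))
    new_req = set(new_schema.get("required", []))

    removed = set(old_props) - set(new_props)
    retyped = {k for k in set(old_props) & set(new_props)
               if old_props[k].get("type") != new_props[k].get("type")}
    newly_required = new_req - old_req

    if removed or retyped or newly_required:
        return "major"  # breaking: removed/renamed fields, type changes, new required fields
    if set(new_props) - set(old_props):
        return "minor"  # additive: new optional fields only
    return "patch"      # no schema change

v1 = {"properties": {"query": {"type": "string"}}, "required": ["query"]}
v2 = {"properties": {"search_query": {"type": "string"}}, "required": ["search_query"]}
print(required_bump(v1, v2))  # major: "query" was removed/renamed
```

A CI job can run this against the previously registered schema (fetched from the registry) and fail the build if the declared version number bumps less than the schema diff demands.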
Observability: What to Instrument
Instrument the gateway to emit the following metrics per tool call:
- `mcp_tool_calls_total` labeled by `tool_name`, `version`, and `outcome` (success/error)
- `mcp_tool_latency_seconds` labeled by `tool_name` and `version` (histogram)
- `mcp_schema_validation_failures_total` labeled by `tool_name`
- `mcp_registry_poll_lag_seconds` (how stale is the dispatch table?)
These metrics let you build dashboards that show, in real time, whether the canary version is performing better or worse than the primary. Set an alert on error rate delta between versions: if the canary error rate exceeds the primary by more than 5%, page someone and roll back.
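The rollback alert itself reduces to a small decision function. A sketch, assuming you can pull error counts per version from your metrics backend (`should_roll_back` is an illustrative name, not part of any monitoring library):

```python
def should_roll_back(primary_errors: int, primary_total: int,
                     canary_errors: int, canary_total: int,
                     max_delta: float = 0.05) -> bool:
    """Roll back if the canary error rate exceeds the primary's by more than max_delta."""
    if canary_total == 0:
        return False  # no canary traffic yet, nothing to judge
    primary_rate = primary_errors / primary_total if primary_total else 0.0
    canary_rate = canary_errors / canary_total
    return (canary_rate - primary_rate) > max_delta

# Primary: 2% errors. Canary: 9% errors. Delta is 7% > 5%, so roll back.
print(should_roll_back(20, 1000, 9, 100))  # True
```

In practice you'd also require a minimum sample size on the canary side before acting, so a single early error on low traffic doesn't trigger a false rollback.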
Putting It All Together: The Complete Deployment Pipeline
Here's what a fully automated pipeline looks like when integrated with a CI/CD system like GitHub Actions or ArgoCD:
- Developer merges a PR that changes `search_documents` to use a new field name.
- CI builds and pushes a new container image tagged `search_documents:2.1.0`.
- CD deploys the new container alongside the existing one (both are running).
- A post-deploy script calls `POST /tools/search_documents/versions` to register v2.1.0 in the registry.
- Another script calls `PUT /tools/search_documents/routing` with `canary_pct: 10`.
- An automated canary analysis job monitors metrics for 30 minutes.
- If metrics pass, the job progressively increases the canary percentage to 100%.
- If metrics fail, the job calls the routing API to set `canary_pct` back to 0 and fires an alert.
- Once at 100%, the old container is scaled down and v1.3.0 is marked deprecated in the registry.
This entire process happens without a single agent session being interrupted and without a single human needing to manually SSH into a server.
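The pass/fail loop at the heart of that pipeline can be sketched as a small driver function. Here, `set_canary_pct` and `metrics_healthy` are hypothetical callables that would wrap the routing API and your canary analysis respectively:

```python
import time
from typing import Callable, Sequence

def progressive_rollout(
    set_canary_pct: Callable[[int], None],  # would wrap PUT /tools/{name}/routing
    metrics_healthy: Callable[[], bool],    # would wrap the canary analysis check
    schedule: Sequence[int] = (10, 25, 50, 100),
    soak_seconds: float = 0,                # e.g. 1800 for 30-minute soaks
) -> bool:
    """Walk the canary through the schedule; roll back on the first bad signal."""
    for pct in schedule:
        set_canary_pct(pct)
        time.sleep(soak_seconds)
        if not metrics_healthy():
            set_canary_pct(0)  # instant rollback: one control-plane call
            return False
    return True

# Dry run with stubs: metrics always healthy, so the rollout completes
history: list[int] = []
ok = progressive_rollout(history.append, lambda: True)
print(ok, history)  # True [10, 25, 50, 100]
```

The final step of a real job would follow a successful run by promoting the canary to `primary_version` and marking the old version deprecated, as described above.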
Conclusion: Treat Your MCP Tools Like First-Class APIs
The teams that will win with agentic AI systems in 2026 are not the ones with the most capable models. They're the ones who build the infrastructure to iterate on their tools safely and rapidly. A model that can call 50 well-maintained, versioned, observable tools will outperform a model that can call 200 fragile ones.
The registry pattern we've built here gives you four superpowers that a naive MCP setup simply cannot match: atomic hot-swapping without session disruption, progressive canary rollouts with instant rollback, schema validation that prevents silent mismatch failures, and rich discoverability that lets agents and orchestrators reason about what tools exist and how they've changed.
Start simple. Build the registry store and the catalog endpoint first. Wire up the gateway to poll it. Add canary routing. Then layer in observability and automation. Each step makes your production system more resilient than the last. Your future self, woken up at 3 AM by a pager, will be grateful you did.