Detailed Guide to Building MCP Server in Production: Complete Technical Deep Dive
A comprehensive technical guide to designing, building, testing, deploying, and operating MCP (Model Context Protocol) servers in production environments. Covers architecture design, security hardening, performance optimization, observability, disaster recovery, and real-world patterns for integrating AI capabilities with existing business systems. Includes complete code examples, deployment strategies, and lessons from production deployments.
Introduction: The Era of AI-Native Infrastructure
The release of the Model Context Protocol (MCP) marks a fundamental shift in how applications integrate with language models. Unlike earlier approaches that treated AI as a simple API call, MCP enables bidirectional communication between LLMs and application infrastructure, creating a new class of AI-native systems.
What is MCP and Why It Matters
The Model Context Protocol is an open-source standard developed by Anthropic that defines how AI models (particularly Claude) interact with external tools, data sources, and services. It's a protocol layer between your application and language models.
Traditional LLM integration:
Application
    ↓
Prompt crafting
    ↓
LLM API (OpenAI/Anthropic)
    ↓
Parse response
    ↓
Action/Response
MCP-based integration:
Application
    ↓
MCP Server (Your service)
    ├─ Tools (function definitions)
    ├─ Resources (data sources)
    ├─ Prompts (system instructions)
    └─ State (persistent context)
    ↓
LLM (Claude)
    ↓
Requests tools/resources from MCP Server
    ↓
MCP Server executes, returns results
    ↓
Claude reasons over results
    ↓
Provides intelligent response
Why this matters for production systems:
- Structured integration: Define exactly what tools and data LLMs can access
- Security and governance: Control which operations LLMs can perform
- Efficiency: LLMs use tools intelligently instead of making wild API guesses
- Auditability: Every tool call is logged and traceable
- Scalability: One MCP server can serve multiple AI applications
The Challenge: MCP in Production
While MCP is powerful, running production MCP servers presents unique challenges:
- Reliability: MCP calls from Claude need <5 second response times, or requests time out
- Concurrency: Handling hundreds of simultaneous tool calls from different Claude sessions
- Data consistency: Ensuring tool results reflect current state when Claude makes multiple calls
- Security: Preventing models from accessing unauthorized resources or performing unintended actions
- Cost: Optimizing token usage and API calls to minimize Claude API costs
- Observability: Understanding what Claude is trying to do when things go wrong
- Versioning: Managing protocol and API changes without breaking production Claude deployments
This article covers all aspects of building production MCP servers: architecture, development, testing, deployment, and operations.
1: Understanding MCP Architecture
Before building, you must understand MCP at a fundamental level.
Core MCP Concepts
Tools
A tool is a function that Claude can call to perform actions. Unlike function calling in other APIs, MCP tools are strongly typed with full schema definitions.
# Example: A tool that fetches customer information
{
"name": "get_customer",
"description": "Retrieve customer details by ID",
"inputSchema": {
"type": "object",
"properties": {
"customer_id": {
"type": "string",
"description": "The unique customer identifier"
},
"include_orders": {
"type": "boolean",
"description": "Whether to include order history",
"default": false
}
},
"required": ["customer_id"]
}
}
Why strong typing matters:
- Claude understands exactly what parameters are needed
- No ambiguity in what the tool does
- Easier validation and error handling
- Better documentation for Claude
Resources
A resource is data that Claude can request. Unlike tools (which execute actions), resources are read-only data sources that Claude queries for context.
# Example: A resource that provides customer database schema
{
"uri": "database://customers/schema",
"name": "customers_schema",
"description": "Schema of the customers table",
"mimeType": "text/plain",
"contents": "customers table: id (UUID), name (text), email (email), created_at (timestamp), tier (enum: free|pro|enterprise)"
}
Resources vs Tools:
| Aspect | Tools | Resources |
|---|---|---|
| Purpose | Execute actions, modify state | Provide context, read-only data |
| Invocation | Explicit function call | Requested when needed |
| Return type | Structured result | Text, HTML, or binary |
| Idempotency | Not required | Should be idempotent |
| Performance | Can be slow (user waits) | Should be fast (blocks Claude thinking) |
Prompts
A prompt is a reusable instruction template that Claude can invoke. Useful for standardized workflows or complex reasoning patterns.
{
"name": "analyze_customer_churn",
"description": "Analyze why a customer might be churning",
"arguments": [
{
"name": "customer_id",
"description": "Customer to analyze",
"required": true
}
]
}
When Claude invokes this prompt, your MCP server returns a full system prompt with context pre-loaded:
You are an expert customer success analyst. You have been given access to the following customer data:
[Customer details fetched by MCP server]
[Order history]
[Support tickets]
[Product usage metrics]
Your task: Analyze this customer's data and provide 3 specific, actionable recommendations to prevent churn.
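Server-side, a prompt handler simply assembles that template with fresh context. A minimal sketch, where the fetch_* helpers are hypothetical stand-ins for your data-access layer:

# Minimal sketch of assembling the churn-analysis prompt server-side.
# The fetch_* helpers are hypothetical stubs, not part of any MCP API.
async def fetch_customer_summary(customer_id: str) -> str:
    return f"<customer {customer_id} details>"  # stub for illustration

async def fetch_order_history(customer_id: str) -> str:
    return "<order history>"  # stub for illustration

async def build_churn_prompt(customer_id: str) -> str:
    customer = await fetch_customer_summary(customer_id)
    orders = await fetch_order_history(customer_id)
    return (
        "You are an expert customer success analyst. "
        "You have been given access to the following customer data:\n"
        f"{customer}\n{orders}\n"
        "Your task: Analyze this customer's data and provide 3 specific, "
        "actionable recommendations to prevent churn."
    )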
MCP Protocol Flow
Understanding the protocol flow is essential for debugging and optimization.
Complete MCP interaction sequence:
1. Client Initialization
   ├─ Client sends: {"jsonrpc": "2.0", "method": "initialize", ...}
   └─ Server responds: {"jsonrpc": "2.0", "result": {"serverInfo": {...}}}
2. List Available Capabilities
   ├─ Client: {"method": "resources/list"}
   ├─ Server: [{"uri": "...", "name": "...", ...}]
   ├─ Client: {"method": "tools/list"}
   └─ Server: [{"name": "...", "inputSchema": {...}}]
3. Claude Requests Data/Invokes Tool
   ├─ Client (Claude): {"method": "resources/read", "params": {"uri": "..."}}
   ├─ OR: {"method": "tools/call", "params": {"name": "...", "arguments": {...}}}
   └─ Server processes request
4. MCP Server Responds
   ├─ For resources: {"contents": [...], "mimeType": "text/plain"}
   └─ For tools: {"content": [{"type": "text", "text": "Result..."}]}
5. Repeat steps 3-4 as Claude reasons
   └─ Claude may call multiple tools sequentially
6. Claude Generates Response
   └─ Client: {"method": "completion", "params": {...}}
Timing considerations:
Total time available for Claude to complete request: ~60 seconds
├─ Time spent thinking: variable
├─ Time spent calling MCP tools: must be < 5 seconds per call
├─ Time spent processing results: variable
└─ If any MCP call exceeds 5s, the tool call times out
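To make the flow concrete, here is a minimal client-side sketch. It assumes the HTTP transport and /mcp endpoint built later in this guide; a stdio-based MCP client would look different.

# Sketch: exercising steps 2-4 of the flow against an HTTP-transport MCP server.
import asyncio
import httpx

async def demo_flow(base_url: str = "http://localhost:8000") -> None:
    async with httpx.AsyncClient(base_url=base_url, timeout=5.0) as client:
        # Step 2: discover tools
        resp = await client.post("/mcp", json={
            "jsonrpc": "2.0", "method": "tools/list", "id": 1
        })
        print(resp.json()["result"])

        # Steps 3-4: invoke a tool and read the result
        resp = await client.post("/mcp", json={
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {"name": "get_customer",
                       "arguments": {"customer_id": "cust_123"}},
            "id": 2,
        })
        print(resp.json()["result"])

asyncio.run(demo_flow())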
MCP Architecture Patterns
Different patterns suit different use cases.
Pattern 1: Monolithic MCP Server
All tools, resources, and prompts in a single service.
┌───────────────────────────────────────────┐
│        MCP Server (Single Process)        │
├───────────────────────────────────────────┤
│                                           │
│  ├─ Customer tools                        │
│  ├─ Order tools                           │
│  ├─ Product tools                         │
│  ├─ Analytics tools                       │
│  └─ Payment tools                         │
│                                           │
│  ├─ Customer resources                    │
│  ├─ Product resources                     │
│  └─ Schema resources                      │
│                                           │
│  ├─ Database connection pool              │
│  ├─ Cache layer (Redis)                   │
│  └─ Service dependencies                  │
│                                           │
└───────────────────────────────────────────┘
                     ↓
                 Claude API
When to use:
- Small to medium applications (<20 tools)
- Co-located team (single service owner)
- Simple dependencies (one database, one cache)
- <100 concurrent Claude sessions
Trade-offs:
- ✅ Simple deployment
- ✅ Easy debugging
- ❌ Single point of failure
- ❌ Hard to scale specific tools
Pattern 2: Federated MCP Servers
Multiple specialized MCP servers, each handling a domain.
                         Claude API
                              │
      ┌───────────────┬───────┴───────┬───────────────┐
      │               │               │               │
┌────────────┐  ┌────────────┐  ┌────────────┐  ┌────────────┐
│  Customer  │  │   Order    │  │  Product   │  │  Payment   │
│ MCP Server │  │ MCP Server │  │ MCP Server │  │ MCP Server │
└────────────┘  └────────────┘  └────────────┘  └────────────┘
      │               │               │               │
[Customer DB]    [Order DB]     [Product DB]    [Payment API]
When to use:
- Large applications (50+ tools)
- Multiple team ownership
- Different scaling requirements per domain
- >1000 concurrent Claude sessions
Trade-offs:
- ✅ Independent scaling
- ✅ Clear team boundaries
- ✅ Easy to deploy changes per domain
- ❌ Complex inter-server communication
- ❌ Distributed tracing complexity
Pattern 3: Proxy/Gateway MCP Server
Single MCP server that proxies to backend services.
         Claude API
             │
 ┌───────────────────────┐
 │   MCP Gateway Server  │
 │  (Proxy + Dispatcher) │
 └───┬────────┬──────┬───┘
     │        │      │
[Service 1] [Service 2] [Service 3]
When to use:
- Existing microservices architecture
- Need to gradually migrate to MCP
- Want centralized tool authorization
- Legacy systems integration
Trade-offs:
- ✅ Minimal changes to existing services
- ✅ Centralized security/auth
- ❌ Extra network hop
- ❌ Gateway becomes bottleneck
Comparison: Which Pattern for Production?
| Requirement | Monolithic | Federated | Gateway |
|---|---|---|---|
| Simplicity | ⭐⭐⭐ | ⭐ | ⭐⭐ |
| Scalability | ⭐ | ⭐⭐⭐ | ⭐ |
| Observability | ⭐⭐ | ⚠️ | ⭐⭐ |
| Team autonomy | ⭐ | ⭐⭐⭐ | ⚠️ |
| Deployment speed | ⭐⭐⭐ | ⚠️ | ⭐ |
| Failure isolation | ⭐ | ⭐⭐⭐ | ⚠️ |
| Development complexity | ⭐ | ⭐⭐ | ⚠️ |
Recommendation for production: Start with monolithic (simplicity, fast time-to-value), migrate to federated at scale (>50 tools, multiple teams).
2: Building Your First MCP Server
Now let's build a production-grade MCP server from scratch.
Project Setup and Dependencies
Create project structure:
mkdir mcp-production-server && cd mcp-production-server
# Python project structure
mkdir -p src/{mcp,tools,resources,database,config}
mkdir -p tests/{unit,integration,load}
mkdir -p scripts
mkdir -p deploy/{docker,kubernetes,terraform}
# Files
touch requirements.txt setup.py Dockerfile docker-compose.yml
touch pytest.ini .env.example
touch README.md DEVELOPMENT.md
# Git
git init
echo "venv/" > .gitignore
echo "*.pyc" >> .gitignore
echo ".env" >> .gitignore
Core dependencies (requirements.txt):
# MCP
mcp==0.1.0
# Web framework
fastapi==0.104.1
uvicorn[standard]==0.24.0
httpx==0.25.2
# Database
sqlalchemy==2.0.23
alembic==1.12.1
asyncpg==0.29.0  # async driver used by create_async_engine
psycopg2-binary==2.9.9  # sync driver for scripts and migrations
# Caching
redis==5.0.1
# Data validation
pydantic==2.5.0
pydantic-settings==2.1.0
# Async
anyio==3.7.1
asyncio-contextmanager==1.0.0
# Logging/Observability
python-json-logger==2.0.7
prometheus-client==0.19.0
# Testing
pytest==7.4.3
pytest-asyncio==0.21.1
pytest-cov==4.1.0
pytest-mock==3.12.0
httpx[testing]==0.25.2
# Load testing
locust==2.17.0
# Utils
python-dotenv==1.0.0
tenacity==8.2.3
Python version and virtual environment:
# Require Python 3.11+
python3.11 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install --upgrade pip
pip install -r requirements.txt
Basic MCP Server Implementation
Create the main server file (src/mcp/server.py):
from mcp.server import Server
from mcp.server.models import InitializationOptions
from pydantic import BaseModel
import logging
from typing import Any
logger = logging.getLogger(__name__)
class MCPServer:
"""Production-grade MCP Server implementation"""
def __init__(self, name: str = "ProductionMCP", version: str = "1.0.0"):
self.name = name
self.version = version
self.server = Server(name)
# Register handlers
self._setup_handlers()
def _setup_handlers(self):
"""Register all MCP message handlers"""
@self.server.list_tools()
async def list_tools() -> list[dict]:
"""Return list of available tools"""
return [
{
"name": "get_customer",
"description": "Fetch customer information by ID",
"inputSchema": {
"type": "object",
"properties": {
"customer_id": {
"type": "string",
"description": "Unique customer identifier"
}
},
"required": ["customer_id"]
}
},
{
"name": "create_order",
"description": "Create a new order for a customer",
"inputSchema": {
"type": "object",
"properties": {
"customer_id": {
"type": "string",
"description": "Customer placing the order"
},
"items": {
"type": "array",
"description": "Items in the order",
"items": {
"type": "object",
"properties": {
"product_id": {"type": "string"},
"quantity": {"type": "integer"}
},
"required": ["product_id", "quantity"]
}
}
},
"required": ["customer_id", "items"]
}
}
]
@self.server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[dict]:
"""Execute a tool and return result"""
try:
if name == "get_customer":
return await self._handle_get_customer(arguments)
elif name == "create_order":
return await self._handle_create_order(arguments)
else:
return [{"type": "text", "text": f"Unknown tool: {name}"}]
except Exception as e:
logger.error(f"Error calling tool {name}: {str(e)}", exc_info=True)
return [{
"type": "text",
"text": f"Error: {str(e)}"
}]
@self.server.list_resources()
async def list_resources() -> list[dict]:
"""Return list of available resources"""
return [
{
"uri": "database://schema/customers",
"name": "Customer Schema",
"description": "Database schema for customers table",
"mimeType": "text/plain"
}
]
@self.server.read_resource()
async def read_resource(uri: str) -> str:
"""Read resource content"""
if uri == "database://schema/customers":
return """customers table schema:
- id: UUID (primary key)
- name: VARCHAR(255)
- email: VARCHAR(255)
- phone: VARCHAR(20)
- created_at: TIMESTAMP
- tier: ENUM(free, pro, enterprise)
- status: ENUM(active, inactive, suspended)
"""
else:
return f"Unknown resource: {uri}"
async def _handle_get_customer(self, arguments: dict) -> list[dict]:
"""Handle get_customer tool call"""
customer_id = arguments.get("customer_id")
# TODO: Query database
customer_data = {
"id": customer_id,
"name": "John Doe",
"email": "john@example.com",
"tier": "pro",
"orders": 15
}
return [{
"type": "text",
"text": f"Customer: {customer_data}"
}]
async def _handle_create_order(self, arguments: dict) -> list[dict]:
"""Handle create_order tool call"""
customer_id = arguments.get("customer_id")
items = arguments.get("items")
# TODO: Create order in database
order_id = "ord_123456"
return [{
"type": "text",
"text": f"Order {order_id} created with {len(items)} items"
}]
async def start(self):
"""Start the MCP server"""
logger.info(f"Starting {self.name} v{self.version}")
await self.server.arun()
Create FastAPI wrapper (src/api/server.py):
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from mcp.server import Server
import logging
import json
from typing import Any, Dict
app = FastAPI(
title="MCP Server",
description="Production-grade Model Context Protocol Server",
version="1.0.0"
)
logger = logging.getLogger(__name__)
# Initialize MCP server
mcp = Server("ProductionMCP")
@app.on_event("startup")
async def startup():
"""Initialize MCP server on startup"""
logger.info("MCP server starting up")
@app.on_event("shutdown")
async def shutdown():
"""Cleanup on shutdown"""
logger.info("MCP server shutting down")
@app.post("/mcp")
async def handle_mcp_message(data: Dict[str, Any]):
"""
Handle MCP messages
This endpoint receives JSON-RPC 2.0 messages from Claude or MCP clients
and routes them to appropriate handlers.
"""
try:
# Validate JSON-RPC format
if not isinstance(data, dict):
raise HTTPException(status_code=400, detail="Invalid request format")
jsonrpc = data.get("jsonrpc", "2.0")
method = data.get("method")
params = data.get("params", {})
request_id = data.get("id")
logger.info(f"MCP request: {method}")
# Route to appropriate handler
if method == "tools/list":
result = await handle_list_tools()
elif method == "tools/call":
result = await handle_call_tool(params)
elif method == "resources/list":
result = await handle_list_resources()
elif method == "resources/read":
result = await handle_read_resource(params)
else:
raise HTTPException(status_code=400, detail=f"Unknown method: {method}")
# Format JSON-RPC response
response = {
"jsonrpc": jsonrpc,
"result": result,
"id": request_id
}
return JSONResponse(response)
except Exception as e:
logger.error(f"Error handling MCP request: {str(e)}", exc_info=True)
return JSONResponse({
"jsonrpc": "2.0",
"error": {
"code": -32603,
"message": str(e)
},
"id": data.get("id")
}, status_code=500)
@app.get("/health")
async def health():
"""Health check endpoint"""
return {"status": "ok", "service": "mcp-server"}
@app.get("/metrics")
async def metrics():
"""Prometheus metrics endpoint"""
# TODO: Return Prometheus metrics
return {"message": "Metrics endpoint"}
async def handle_list_tools() -> list[dict]:
"""List available tools"""
return [
{
"name": "get_customer",
"description": "Fetch customer by ID",
"inputSchema": {
"type": "object",
"properties": {
"customer_id": {"type": "string"}
},
"required": ["customer_id"]
}
}
]
async def handle_call_tool(params: dict) -> dict:
"""Execute a tool"""
tool_name = params.get("name")
arguments = params.get("arguments", {})
if tool_name == "get_customer":
return {"success": True, "data": {"id": "cust_123"}}
else:
raise HTTPException(status_code=400, detail=f"Unknown tool: {tool_name}")
async def handle_list_resources() -> list[dict]:
"""List available resources"""
return []
async def handle_read_resource(params: dict) -> str:
"""Read a resource"""
return "resource content"
Create entry point (main.py):
import uvicorn
import logging
from src.api.server import app
from src.config import settings
# Setup logging
logging.basicConfig(
level=settings.LOG_LEVEL,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
if __name__ == "__main__":
uvicorn.run(
app,
host="0.0.0.0",
port=settings.PORT,
workers=settings.WORKERS,
log_level=settings.LOG_LEVEL.lower()
)
Configuration Management
Create configuration (src/config/__init__.py):
from pydantic_settings import BaseSettings
from typing import Optional
import os
class Settings(BaseSettings):
"""Application configuration"""
# Server
APP_NAME: str = "MCP Production Server"
VERSION: str = "1.0.0"
PORT: int = 8000
HOST: str = "0.0.0.0"
WORKERS: int = 4
# Environment
ENVIRONMENT: str = "development" # development, staging, production
DEBUG: bool = False
LOG_LEVEL: str = "INFO"
# Database
    DATABASE_URL: str = "postgresql+asyncpg://user:password@localhost/mcp_db"  # async driver required by create_async_engine
DATABASE_POOL_SIZE: int = 20
DATABASE_MAX_OVERFLOW: int = 10
DATABASE_POOL_TIMEOUT: int = 30
# Redis
REDIS_URL: str = "redis://localhost:6379/0"
REDIS_CACHE_TTL: int = 3600
# Claude API
CLAUDE_API_KEY: str
CLAUDE_API_TIMEOUT: int = 60
# Security
API_KEY: Optional[str] = None
ALLOWED_ORIGINS: list = ["*"]
# Observability
PROMETHEUS_ENABLED: bool = True
JAEGER_ENABLED: bool = False
JAEGER_AGENT_HOST: str = "localhost"
JAEGER_AGENT_PORT: int = 6831
# Performance
REQUEST_TIMEOUT: int = 5
MAX_CONCURRENT_TOOLS: int = 100
TOOL_CALL_TIMEOUT: int = 5
# Rate limiting
RATE_LIMIT_ENABLED: bool = True
RATE_LIMIT_REQUESTS_PER_MINUTE: int = 1000
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
case_sensitive = True
# Load settings
settings = Settings()
Create .env.example:
# Server
APP_NAME="MCP Production Server"
PORT=8000
ENVIRONMENT=development
DEBUG=true
LOG_LEVEL=DEBUG
# Database
DATABASE_URL=postgresql+asyncpg://postgres:password@localhost:5432/mcp_db
DATABASE_POOL_SIZE=20
# Redis
REDIS_URL=redis://localhost:6379/0
# Claude API
CLAUDE_API_KEY=sk-ant-xxx
# Security
API_KEY=your-secret-api-key
# Observability
PROMETHEUS_ENABLED=true
Database Integration
Create database layer (src/database/connection.py):
from sqlalchemy import create_engine, event
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from src.config import settings
import logging
logger = logging.getLogger(__name__)
# Database setup
Base = declarative_base()
# Async engine for async/await
engine = create_async_engine(
settings.DATABASE_URL,
echo=settings.DEBUG,
pool_size=settings.DATABASE_POOL_SIZE,
max_overflow=settings.DATABASE_MAX_OVERFLOW,
pool_timeout=settings.DATABASE_POOL_TIMEOUT,
pool_pre_ping=True, # Test connections before using
)
# Session factory
AsyncSessionLocal = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)
async def get_db_session():
"""Get database session (for dependency injection)"""
async with AsyncSessionLocal() as session:
yield session
async def init_db():
"""Initialize database (create tables)"""
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
logger.info("Database initialized")
async def close_db():
"""Close database connections"""
await engine.dispose()
logger.info("Database connections closed")
Create models (src/database/models.py):
from sqlalchemy import Column, String, DateTime, Enum, Integer, Float, JSON
from sqlalchemy.dialects.postgresql import UUID
from datetime import datetime
import uuid
from src.database.connection import Base
class Customer(Base):
"""Customer model"""
__tablename__ = "customers"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name = Column(String(255), nullable=False)
email = Column(String(255), unique=True, nullable=False)
phone = Column(String(20))
tier = Column(String(50), default="free")
status = Column(String(50), default="active")
    extra_data = Column("metadata", JSON, default=dict)  # "metadata" is reserved by SQLAlchemy's Declarative API
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class Order(Base):
"""Order model"""
__tablename__ = "orders"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
customer_id = Column(UUID(as_uuid=True), nullable=False)
status = Column(String(50), default="pending")
total_amount = Column(Float, nullable=False)
item_count = Column(Integer, default=0)
    extra_data = Column("metadata", JSON, default=dict)  # "metadata" is reserved by SQLAlchemy's Declarative API
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class ToolCall(Base):
"""Log of all tool calls (for audit/observability)"""
__tablename__ = "tool_calls"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
tool_name = Column(String(255), nullable=False)
arguments = Column(JSON, nullable=False)
result = Column(JSON)
error = Column(String(1000))
duration_ms = Column(Integer)
claude_session_id = Column(String(255))
created_at = Column(DateTime, default=datetime.utcnow)
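With these models in place, the TODO in _handle_get_customer can be replaced with a real query. A hedged sketch using SQLAlchemy 2.0's select API and the session factory from connection.py:

# Sketch: async customer lookup against the Customer model above.
import uuid
from sqlalchemy import select
from src.database.connection import AsyncSessionLocal
from src.database.models import Customer

async def fetch_customer(customer_id: str) -> dict | None:
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(Customer).where(Customer.id == uuid.UUID(customer_id))
        )
        customer = result.scalar_one_or_none()
        if customer is None:
            return None
        return {
            "id": str(customer.id),
            "name": customer.name,
            "email": customer.email,
            "tier": customer.tier,
        }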
3: Advanced MCP Patterns for Production
Error Handling and Resilience
Production MCP servers must handle errors gracefully. Claude enforces strict timeouts, so error paths must return as quickly as success paths.
Create error handler (src/errors/handler.py):
from enum import Enum
from typing import Optional, Any
import logging
from src.config import settings  # used for DEBUG-gated error details below
logger = logging.getLogger(__name__)
class ErrorCode(Enum):
"""Standard MCP error codes"""
TOOL_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603
SERVER_ERROR = -32000
TIMEOUT = -32001
RESOURCE_NOT_FOUND = -32002
UNAUTHORIZED = -32003
class MCPError(Exception):
"""Base MCP error"""
def __init__(self, code: ErrorCode, message: str, data: Optional[Any] = None):
self.code = code
self.message = message
self.data = data
super().__init__(message)
def to_json_rpc(self):
"""Convert to JSON-RPC error format"""
return {
"code": self.code.value,
"message": self.message,
"data": self.data
}
class ToolNotFoundError(MCPError):
def __init__(self, tool_name: str):
super().__init__(
ErrorCode.TOOL_NOT_FOUND,
f"Tool '{tool_name}' not found"
)
class InvalidParamsError(MCPError):
def __init__(self, message: str, details: Optional[str] = None):
super().__init__(
ErrorCode.INVALID_PARAMS,
message,
{"details": details}
)
class ToolTimeoutError(MCPError):  # named to avoid shadowing the builtin TimeoutError
def __init__(self, tool_name: str, timeout_seconds: int):
super().__init__(
ErrorCode.TIMEOUT,
f"Tool '{tool_name}' exceeded {timeout_seconds}s timeout"
)
class UnauthorizedError(MCPError):
def __init__(self, resource: str):
super().__init__(
ErrorCode.UNAUTHORIZED,
f"Unauthorized access to {resource}"
)
class ErrorHandler:
"""Centralized error handling"""
@staticmethod
def handle_tool_error(tool_name: str, error: Exception) -> dict:
"""Handle tool execution error"""
if isinstance(error, MCPError):
logger.warning(f"MCP error in {tool_name}: {error.message}")
return error.to_json_rpc()
elif isinstance(error, ValueError):
logger.warning(f"Validation error in {tool_name}: {str(error)}")
return {
"code": ErrorCode.INVALID_PARAMS.value,
"message": f"Validation error: {str(error)}"
}
        elif isinstance(error, TimeoutError):  # builtin TimeoutError (covers asyncio timeouts on 3.11+)
            logger.error(f"Timeout in {tool_name}")
            return {
                "code": ErrorCode.TIMEOUT.value,
                "message": "Tool execution timeout"
            }
else:
logger.error(f"Unexpected error in {tool_name}: {str(error)}", exc_info=True)
return {
"code": ErrorCode.INTERNAL_ERROR.value,
"message": "Internal server error",
"data": {"tool": tool_name} if not settings.DEBUG else {"tool": tool_name, "error": str(error)}
}
@staticmethod
def validate_tool_arguments(arguments: dict, schema: dict) -> tuple[bool, Optional[str]]:
"""Validate tool arguments against schema"""
# Check required fields
required_fields = schema.get("required", [])
for field in required_fields:
if field not in arguments:
return False, f"Missing required field: {field}"
# Check types
properties = schema.get("properties", {})
for field, value in arguments.items():
if field in properties:
expected_type = properties[field].get("type")
if expected_type and not ErrorHandler._check_type(value, expected_type):
return False, f"Field '{field}' has wrong type"
return True, None
@staticmethod
def _check_type(value: Any, expected_type: str) -> bool:
"""Check if value matches expected type"""
type_mapping = {
"string": str,
"integer": int,
"number": (int, float),
"boolean": bool,
"array": list,
"object": dict
}
expected = type_mapping.get(expected_type)
return isinstance(value, expected) if expected else True
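A hedged example of wiring validate_tool_arguments into the dispatcher from Part 2. TOOL_SCHEMAS is an assumed registry mapping tool names to their inputSchema; in practice you would derive it from the same definitions returned by list_tools():

# Sketch: validate arguments before routing a tools/call request.
from src.errors.handler import ErrorHandler, InvalidParamsError

TOOL_SCHEMAS = {  # assumed registry for illustration
    "get_customer": {
        "type": "object",
        "properties": {"customer_id": {"type": "string"}},
        "required": ["customer_id"],
    },
}

async def validate_and_dispatch(name: str, arguments: dict):
    schema = TOOL_SCHEMAS.get(name, {})
    ok, error = ErrorHandler.validate_tool_arguments(arguments, schema)
    if not ok:
        raise InvalidParamsError("Invalid tool arguments", details=error)
    # ... route to the actual tool handler here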
Create middleware for error handling:
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
import time
import logging
from src.errors.handler import ErrorHandler, MCPError
logger = logging.getLogger(__name__)
class ErrorHandlingMiddleware(BaseHTTPMiddleware):
"""Global error handling middleware"""
async def dispatch(self, request: Request, call_next) -> Response:
"""Process request and handle errors"""
start_time = time.time()
try:
response = await call_next(request)
# Log successful requests
duration = time.time() - start_time
logger.info(
f"{request.method} {request.url.path} - {response.status_code} ({duration:.2f}s)"
)
return response
except MCPError as e:
# Known MCP error
logger.warning(f"MCP error: {e.message}")
return JSONResponse(
{
"jsonrpc": "2.0",
"error": e.to_json_rpc(),
"id": None
},
status_code=400
)
except Exception as e:
# Unexpected error
logger.error(f"Unhandled error: {str(e)}", exc_info=True)
return JSONResponse(
{
"jsonrpc": "2.0",
"error": {
"code": -32603,
"message": "Internal server error"
},
"id": None
},
status_code=500
)
Caching for Performance
MCP tools must respond in <5 seconds. Caching frequently-accessed data is crucial.
Create cache layer (src/cache/redis.py):
import redis.asyncio as redis
import json
from typing import Any, Optional
import logging
from src.config import settings
logger = logging.getLogger(__name__)
class RedisCache:
"""Redis-based cache for MCP results"""
def __init__(self):
self.redis_client: Optional[redis.Redis] = None
self.ttl = settings.REDIS_CACHE_TTL
async def connect(self):
"""Connect to Redis"""
try:
            self.redis_client = redis.from_url(  # from_url is synchronous; connections are created lazily
                settings.REDIS_URL,
                decode_responses=True
            )
# Test connection
await self.redis_client.ping()
logger.info("Connected to Redis")
except Exception as e:
logger.error(f"Failed to connect to Redis: {str(e)}")
self.redis_client = None
async def disconnect(self):
"""Disconnect from Redis"""
if self.redis_client:
await self.redis_client.close()
logger.info("Disconnected from Redis")
async def get(self, key: str) -> Optional[Any]:
"""Get value from cache"""
if not self.redis_client:
return None
try:
value = await self.redis_client.get(key)
if value:
logger.debug(f"Cache hit: {key}")
return json.loads(value)
except Exception as e:
logger.error(f"Cache get error: {str(e)}")
return None
async def set(self, key: str, value: Any, ttl: Optional[int] = None):
"""Set value in cache"""
if not self.redis_client:
return
try:
await self.redis_client.setex(
key,
ttl or self.ttl,
json.dumps(value)
)
logger.debug(f"Cache set: {key}")
except Exception as e:
logger.error(f"Cache set error: {str(e)}")
async def delete(self, key: str):
"""Delete value from cache"""
if not self.redis_client:
return
try:
await self.redis_client.delete(key)
logger.debug(f"Cache delete: {key}")
except Exception as e:
logger.error(f"Cache delete error: {str(e)}")
async def invalidate_pattern(self, pattern: str):
"""Invalidate all keys matching pattern"""
if not self.redis_client:
return
try:
keys = await self.redis_client.keys(pattern)
if keys:
await self.redis_client.delete(*keys)
logger.debug(f"Cache invalidated {len(keys)} keys matching {pattern}")
except Exception as e:
logger.error(f"Cache invalidation error: {str(e)}")
# Global cache instance
cache = RedisCache()
class CacheDecorator:
"""Decorator for caching tool results"""
def __init__(self, ttl: Optional[int] = None):
self.ttl = ttl
def __call__(self, func):
async def wrapper(*args, **kwargs):
# Generate cache key from function name and arguments
cache_key = f"{func.__name__}:{json.dumps(kwargs, sort_keys=True, default=str)}"
# Try cache
cached = await cache.get(cache_key)
if cached is not None:
return cached
# Execute function
result = await func(*args, **kwargs)
# Cache result
await cache.set(cache_key, result, self.ttl)
return result
return wrapper
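A hypothetical usage of the decorator on a tool handler. Note the cache key is built from kwargs only, so cached functions should be called with keyword arguments (positional arguments would collide under one key):

# Hypothetical usage: cache customer lookups for five minutes.
@CacheDecorator(ttl=300)
async def get_customer_cached(*, customer_id: str) -> dict:
    # fetch_customer is the database lookup sketched in Part 2 (assumed)
    return await fetch_customer(customer_id)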
Request/Response Serialization
Claude sends and receives specific formats. Proper serialization is critical.
Create serialization (src/serialization/__init__.py):
from typing import Any, Dict, List
from datetime import datetime
import json
class MCPSerializer:
"""Serialize/deserialize MCP data"""
@staticmethod
def serialize_tool_result(result: Any, mime_type: str = "text/plain") -> List[Dict]:
"""
Serialize tool result to MCP format
MCP requires results in specific format:
[
{"type": "text", "text": "..."},
{"type": "image", "data": "base64_data", "mimeType": "image/png"},
{"type": "resource", "resource": ...}
]
"""
if isinstance(result, str):
return [{"type": "text", "text": result}]
elif isinstance(result, dict):
return [{"type": "text", "text": json.dumps(result, indent=2)}]
elif isinstance(result, list):
return [{"type": "text", "text": json.dumps(result, indent=2)}]
elif isinstance(result, bytes):
import base64
return [{
"type": "image",
"data": base64.b64encode(result).decode(),
"mimeType": mime_type
}]
else:
return [{"type": "text", "text": str(result)}]
@staticmethod
def deserialize_tool_arguments(arguments: Dict) -> Dict:
"""Deserialize tool arguments from MCP format"""
return arguments # Usually already in correct format
@staticmethod
def format_error_response(error: str, code: int = -32603) -> Dict:
"""Format error response"""
return {
"error": {
"code": code,
"message": error
}
}
Timeout Management
Claude has strict timeout requirements. Implementing proper timeout handling is essential.
Create timeout handler (src/timeout.py):
import asyncio
from functools import wraps
import logging
logger = logging.getLogger(__name__)
class TimeoutExceededError(Exception):
"""Raised when operation exceeds timeout"""
pass
def async_timeout(seconds: int):
"""Decorator for async function timeout"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
try:
return await asyncio.wait_for(
func(*args, **kwargs),
timeout=seconds
)
except asyncio.TimeoutError:
tool_name = kwargs.get('tool_name') or 'unknown'
logger.error(f"Tool {tool_name} exceeded {seconds}s timeout")
raise TimeoutExceededError(
f"Tool execution exceeded {seconds}s timeout"
)
return wrapper
return decorator
class TimeoutManager:
"""Manage timeouts for tool execution"""
TOOL_TIMEOUT = 5 # MCP requirement: <5s for tool calls
REQUEST_TIMEOUT = 60 # Total request timeout
@staticmethod
async def execute_with_timeout(
coro,
timeout_seconds: int = TOOL_TIMEOUT,
tool_name: str = "unknown"
):
"""Execute coroutine with timeout"""
try:
return await asyncio.wait_for(coro, timeout=timeout_seconds)
except asyncio.TimeoutError:
logger.error(f"Timeout executing {tool_name}")
raise TimeoutExceededError(
f"{tool_name} exceeded {timeout_seconds}s timeout"
)
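Applying the decorator keeps every handler inside the 5-second tool budget. A small sketch, where fetch_customer is the assumed handler body from earlier:

# Sketch: enforcing the tool budget on a handler with the decorator above.
@async_timeout(seconds=TimeoutManager.TOOL_TIMEOUT)
async def get_customer_tool(customer_id: str) -> dict:
    return await fetch_customer(customer_id)  # assumed handler body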
4: Testing MCP Servers
Production code requires comprehensive testing. Testing MCP servers has unique challenges due to their async nature and Claude integration.
Unit Testing
Create unit tests (tests/unit/test_tools.py):
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
from src.tools.customer import CustomerTool
from src.errors.handler import InvalidParamsError
class TestCustomerTool:
"""Test customer management tool"""
    @pytest.fixture
    def tool(self):
        """Create tool instance (plain fixture; construction needs no event loop)"""
        return CustomerTool()
@pytest.mark.asyncio
async def test_get_customer_success(self, tool):
"""Test successful customer retrieval"""
# Mock database
with patch.object(tool, 'db', new_callable=AsyncMock) as mock_db:
mock_db.get_customer.return_value = {
"id": "cust_123",
"name": "John Doe",
"email": "john@example.com"
}
result = await tool.get_customer("cust_123")
assert result["id"] == "cust_123"
assert result["name"] == "John Doe"
@pytest.mark.asyncio
async def test_get_customer_not_found(self, tool):
"""Test customer not found"""
with patch.object(tool, 'db', new_callable=AsyncMock) as mock_db:
mock_db.get_customer.return_value = None
with pytest.raises(Exception):
await tool.get_customer("nonexistent")
@pytest.mark.asyncio
async def test_create_customer_validation(self, tool):
"""Test input validation"""
# Missing required field
with pytest.raises(InvalidParamsError):
await tool.create_customer({"name": "John"}) # Missing email
@pytest.mark.asyncio
async def test_create_customer_success(self, tool):
"""Test successful customer creation"""
with patch.object(tool, 'db', new_callable=AsyncMock) as mock_db:
mock_db.create_customer.return_value = {
"id": "cust_new",
"name": "Jane Doe",
"email": "jane@example.com"
}
result = await tool.create_customer({
"name": "Jane Doe",
"email": "jane@example.com"
})
assert result["id"] == "cust_new"
Integration Testing
Create integration tests (tests/integration/test_mcp_flow.py):
import pytest
import httpx
from fastapi.testclient import TestClient
from src.api.server import app
class TestMCPFlow:
"""Test complete MCP interaction flow"""
@pytest.fixture
def client(self):
"""Create test client"""
return TestClient(app)
def test_list_tools(self, client):
"""Test listing available tools"""
response = client.post("/mcp", json={
"jsonrpc": "2.0",
"method": "tools/list",
"id": 1
})
assert response.status_code == 200
data = response.json()
assert "result" in data
assert isinstance(data["result"], list)
assert len(data["result"]) > 0
# Verify tool structure
tool = data["result"][0]
assert "name" in tool
assert "description" in tool
assert "inputSchema" in tool
def test_call_tool_success(self, client):
"""Test successful tool call"""
response = client.post("/mcp", json={
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "get_customer",
"arguments": {"customer_id": "cust_123"}
},
"id": 2
})
assert response.status_code == 200
data = response.json()
assert "result" in data
def test_call_unknown_tool(self, client):
"""Test calling unknown tool"""
response = client.post("/mcp", json={
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "unknown_tool",
"arguments": {}
},
"id": 3
})
assert response.status_code in [200, 400]
data = response.json()
assert "error" in data
def test_list_resources(self, client):
"""Test listing resources"""
response = client.post("/mcp", json={
"jsonrpc": "2.0",
"method": "resources/list",
"id": 4
})
assert response.status_code == 200
data = response.json()
assert "result" in data
assert isinstance(data["result"], list)
def test_read_resource(self, client):
"""Test reading resource"""
response = client.post("/mcp", json={
"jsonrpc": "2.0",
"method": "resources/read",
"params": {"uri": "database://schema/customers"},
"id": 5
})
assert response.status_code == 200
data = response.json()
assert "result" in data
Load Testing
Create load test (tests/load/locustfile.py):
from locust import HttpUser, task, between
import json
import random
class MCPServerUser(HttpUser):
"""Simulate MCP client load"""
wait_time = between(1, 3) # Wait 1-3 seconds between requests
@task(3)
def list_tools(self):
"""List tools (high frequency)"""
self.client.post("/mcp", json={
"jsonrpc": "2.0",
"method": "tools/list",
"id": 1
})
@task(2)
def call_get_customer(self):
"""Call get_customer tool"""
customer_id = f"cust_{random.randint(1, 1000)}"
self.client.post("/mcp", json={
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "get_customer",
"arguments": {"customer_id": customer_id}
},
"id": 2
})
@task(1)
def call_create_order(self):
"""Call create_order tool"""
self.client.post("/mcp", json={
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "create_order",
"arguments": {
"customer_id": f"cust_{random.randint(1, 1000)}",
"items": [
{
"product_id": f"prod_{random.randint(1, 100)}",
"quantity": random.randint(1, 5)
}
]
}
},
"id": 3
})
# Run with: locust -f tests/load/locustfile.py --host=http://localhost:8000
5: Production Deployment
Deploying MCP servers to production requires careful planning.
Docker Containerization
Create Dockerfile:
# Build stage
FROM python:3.11-slim as builder
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
postgresql-client \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements
COPY requirements.txt .
# Create virtual environment
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
RUN pip install --no-cache-dir -r requirements.txt
# Production stage
FROM python:3.11-slim
WORKDIR /app
# Install runtime dependencies only
RUN apt-get update && apt-get install -y \
postgresql-client \
curl \
&& rm -rf /var/lib/apt/lists/*
# Copy virtual environment from builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Copy application code
COPY . .
# Create non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Run application
CMD ["python", "-m", "uvicorn", "src.api.server:app", "--host", "0.0.0.0", "--port", "8000"]
Create docker-compose.yml for local development:
version: '3.9'
services:
# MCP Server
mcp-server:
build:
context: .
dockerfile: Dockerfile
ports:
- "8000:8000"
environment:
      DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/mcp_db
REDIS_URL: redis://redis:6379/0
CLAUDE_API_KEY: ${CLAUDE_API_KEY}
ENVIRONMENT: development
DEBUG: "true"
depends_on:
- db
- redis
volumes:
- .:/app
command: uvicorn src.api.server:app --host 0.0.0.0 --reload
# PostgreSQL Database
db:
image: postgres:15-alpine
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: mcp_db
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
# Redis Cache
redis:
image: redis:7-alpine
ports:
- "6379:6379"
# Prometheus (metrics)
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
# Grafana (visualizations)
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_PASSWORD: admin
volumes:
- grafana_data:/var/lib/grafana
volumes:
postgres_data:
prometheus_data:
grafana_data:
Kubernetes Deployment
Create Kubernetes manifests (deploy/kubernetes/mcp-server.yaml):
apiVersion: v1
kind: ConfigMap
metadata:
name: mcp-config
namespace: production
data:
LOG_LEVEL: "INFO"
ENVIRONMENT: "production"
---
apiVersion: v1
kind: Secret
metadata:
name: mcp-secrets
namespace: production
type: Opaque
stringData:
CLAUDE_API_KEY: "sk-ant-xxx"
  DATABASE_URL: "postgresql+asyncpg://user:pass@db:5432/mcp"
REDIS_URL: "redis://redis:6379/0"
API_KEY: "secret-api-key"
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: mcp-server
namespace: production
labels:
app: mcp-server
spec:
replicas: 3 # High availability
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1
maxUnavailable: 0 # Zero-downtime deployment
selector:
matchLabels:
app: mcp-server
template:
metadata:
labels:
app: mcp-server
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "8000"
prometheus.io/path: "/metrics"
spec:
# Anti-affinity for high availability
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app
operator: In
values:
- mcp-server
topologyKey: kubernetes.io/hostname
      # (A PodDisruptionBudget for this Deployment is defined below)
containers:
- name: mcp-server
image: mcp-server:v1.0.0
imagePullPolicy: IfNotPresent
ports:
- name: http
containerPort: 8000
protocol: TCP
- name: metrics
containerPort: 9090
protocol: TCP
# Environment from ConfigMap and Secrets
envFrom:
- configMapRef:
name: mcp-config
- secretRef:
name: mcp-secrets
# Resource requests (for scheduling)
resources:
requests:
cpu: "500m"
memory: "512Mi"
limits:
cpu: "1000m"
memory: "1Gi"
# Liveness probe (restart if unhealthy)
livenessProbe:
httpGet:
path: /health
port: http
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 3
# Readiness probe (remove from load balancer if not ready)
readinessProbe:
httpGet:
path: /health
port: http
initialDelaySeconds: 10
periodSeconds: 5
timeoutSeconds: 3
failureThreshold: 2
# Startup probe (for slow-starting apps)
startupProbe:
httpGet:
path: /health
port: http
initialDelaySeconds: 0
periodSeconds: 5
failureThreshold: 30
# Graceful shutdown
lifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 15"]
terminationGracePeriodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:
name: mcp-server
namespace: production
labels:
app: mcp-server
spec:
type: ClusterIP
selector:
app: mcp-server
ports:
- name: http
port: 80
targetPort: 8000
- name: metrics
port: 9090
targetPort: 9090
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: mcp-server-pdb
namespace: production
spec:
minAvailable: 1
selector:
matchLabels:
app: mcp-server
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: mcp-server-hpa
namespace: production
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: mcp-server
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
behavior:
scaleUp:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 30
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 50
periodSeconds: 60
6: Observability and Monitoring
Production MCP servers need comprehensive observability.
Metrics and Prometheus
Create metrics exporter (src/metrics.py):
from prometheus_client import Counter, Histogram, Gauge
import time
# Request metrics
tool_calls_total = Counter(
'mcp_tool_calls_total',
'Total tool calls',
['tool_name', 'status']
)
tool_call_duration_seconds = Histogram(
'mcp_tool_call_duration_seconds',
'Tool call duration in seconds',
['tool_name'],
buckets=(0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0)
)
cache_hits = Counter(
'mcp_cache_hits_total',
'Total cache hits',
['cache_type']
)
cache_misses = Counter(
'mcp_cache_misses_total',
'Total cache misses',
['cache_type']
)
# Error metrics
tool_errors = Counter(
'mcp_tool_errors_total',
'Total tool errors',
['tool_name', 'error_type']
)
timeouts = Counter(
'mcp_timeouts_total',
'Total tool timeouts',
['tool_name']
)
# Database metrics
db_connections_active = Gauge(
'mcp_db_connections_active',
'Active database connections'
)
db_query_duration_seconds = Histogram(
'mcp_db_query_duration_seconds',
'Database query duration',
['operation'],
buckets=(0.001, 0.01, 0.05, 0.1, 0.5, 1.0)
)
# Queue metrics
redis_operations = Counter(
'mcp_redis_operations_total',
'Total Redis operations',
['operation', 'status']
)
redis_latency_seconds = Histogram(
'mcp_redis_latency_seconds',
'Redis operation latency',
['operation'],
buckets=(0.001, 0.005, 0.01, 0.05, 0.1)
)
class MetricsMiddleware:
"""Middleware to collect metrics"""
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
"""Record metrics for request"""
if scope["type"] != "http":
await self.app(scope, receive, send)
return
start_time = time.time()
# Wrap send to capture response
        async def send_with_metrics(message):
            if message["type"] == "http.response.start":
                # Record request duration and status once headers are ready
                duration = time.time() - start_time
                status_code = message["status"]
                # TODO: record duration/status_code in the histograms above
            await send(message)  # forward every ASGI message, including the body
        await self.app(scope, receive, send_with_metrics)
# Endpoint to expose metrics
from fastapi import APIRouter
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
router = APIRouter()
@router.get("/metrics")
async def metrics():
"""Prometheus metrics endpoint"""
from fastapi import Response
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
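Recording these metrics per tool call can live in one small wrapper around the dispatcher. A sketch:

# Sketch: wrapping a tool handler to record calls, errors, and latency.
import time
from src.metrics import (
    tool_calls_total, tool_call_duration_seconds, tool_errors
)

async def timed_tool_call(name: str, handler, arguments: dict):
    start = time.time()
    try:
        result = await handler(arguments)
        tool_calls_total.labels(tool_name=name, status="success").inc()
        return result
    except Exception as e:
        tool_calls_total.labels(tool_name=name, status="error").inc()
        tool_errors.labels(tool_name=name, error_type=type(e).__name__).inc()
        raise
    finally:
        tool_call_duration_seconds.labels(tool_name=name).observe(
            time.time() - start
        )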
Prometheus configuration (prometheus.yml):
global:
scrape_interval: 15s
evaluation_interval: 15s
external_labels:
service: 'mcp-server'
scrape_configs:
- job_name: 'mcp-server'
static_configs:
- targets: ['localhost:8000']
metrics_path: '/metrics'
rule_files:
- 'alert-rules.yml'
alerting:
alertmanagers:
- static_configs:
- targets: ['localhost:9093']
Alert rules (alert-rules.yml):
groups:
- name: mcp-alerts
interval: 30s
rules:
# High error rate
- alert: HighToolErrorRate
expr: |
(
sum(rate(mcp_tool_errors_total[5m])) by (tool_name)
/
sum(rate(mcp_tool_calls_total[5m])) by (tool_name)
) > 0.05
for: 5m
      labels:
        severity: critical
annotations:
summary: "Tool {{ $labels.tool_name }} has high error rate"
# Tool timeout
- alert: ToolTimeout
expr: rate(mcp_timeouts_total[5m]) > 0
for: 1m
      labels:
        severity: warning
annotations:
summary: "Tool {{ $labels.tool_name }} is timing out"
# Slow tool
- alert: SlowTool
expr: |
histogram_quantile(0.95, rate(mcp_tool_call_duration_seconds_bucket[5m])) > 2.0
for: 5m
      labels:
        severity: warning
annotations:
summary: "Tool {{ $labels.tool_name }} p95 latency > 2s"
# Cache miss rate
- alert: HighCacheMissRate
expr: |
(
sum(rate(mcp_cache_misses_total[5m]))
/
(sum(rate(mcp_cache_hits_total[5m])) + sum(rate(mcp_cache_misses_total[5m])))
) > 0.8
for: 10m
      labels:
        severity: warning
annotations:
summary: "Cache miss rate > 80%"
# Database connection issues
- alert: DatabaseConnectionPoolAlmostFull
expr: mcp_db_connections_active > 18 # Out of 20
for: 5m
      labels:
        severity: critical
annotations:
summary: "Database connection pool almost full"
Logging and Tracing
Structured logging (src/logging_config.py):
import logging
import json
import sys
from pythonjsonlogger import jsonlogger
from src.config import settings
def setup_logging():
"""Configure structured JSON logging"""
# Root logger
logger = logging.getLogger()
logger.setLevel(settings.LOG_LEVEL)
# JSON formatter
handler = logging.StreamHandler(sys.stdout)
    formatter = jsonlogger.JsonFormatter(
        fmt='%(levelname)s %(name)s %(message)s %(request_id)s %(tool_name)s %(duration_ms)s',
        timestamp=True  # adds an ISO-8601 "timestamp" field to each record
    )
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
# Create logger
logger = setup_logging()
class StructuredLogger:
"""Wrapper for structured logging"""
def __init__(self, name: str):
self.logger = logging.getLogger(name)
def tool_call(self, tool_name: str, arguments: dict, duration_ms: float, error: str = None):
"""Log tool call"""
self.logger.info(
"Tool called",
extra={
"tool_name": tool_name,
"arguments": json.dumps(arguments),
"duration_ms": duration_ms,
"error": error
}
)
def database_query(self, operation: str, duration_ms: float, rows_affected: int = None):
"""Log database operation"""
self.logger.debug(
"Database query",
extra={
"operation": operation,
"duration_ms": duration_ms,
"rows_affected": rows_affected
}
)
7: Security Hardening
MCP servers handle sensitive operations. Security is paramount.
Authentication and Authorization
Create auth layer (src/security/auth.py):
from fastapi import HTTPException, Depends, Header
from typing import Optional
import logging
from src.config import settings
logger = logging.getLogger(__name__)
class AuthManager:
"""Handle API authentication and authorization"""
@staticmethod
def verify_api_key(x_api_key: Optional[str] = Header(None)) -> str:
"""Verify API key from header"""
if not settings.API_KEY:
# API key not configured, skip validation
return "anonymous"
if not x_api_key:
logger.warning("Missing API key")
raise HTTPException(status_code=401, detail="Missing API key")
if x_api_key != settings.API_KEY:
logger.warning(f"Invalid API key: {x_api_key[:5]}...")
raise HTTPException(status_code=403, detail="Invalid API key")
return "authenticated"
@staticmethod
def verify_claude_origin(origin: Optional[str] = Header(None)) -> bool:
"""Verify request comes from Claude"""
# TODO: Implement proper Claude request verification
# This would involve verifying JWT tokens or signatures
return True
@staticmethod
def require_auth():
"""Dependency for requiring authentication"""
def verify(auth_status: str = Depends(AuthManager.verify_api_key)):
if auth_status == "anonymous":
raise HTTPException(status_code=401, detail="Authentication required")
return verify
class ToolAuthorization:
"""Authorization for specific tools"""
TOOL_PERMISSIONS = {
"get_customer": ["read:customers"],
"create_order": ["write:orders"],
"delete_order": ["admin", "delete:orders"],
"create_customer": ["admin", "write:customers"],
}
@staticmethod
def can_access_tool(tool_name: str, user_roles: list) -> bool:
"""Check if user can access tool"""
required_roles = ToolAuthorization.TOOL_PERMISSIONS.get(tool_name, [])
# If no restrictions, allow access
if not required_roles:
return True
# Check if user has any required role
return any(role in user_roles for role in required_roles)
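Enforcing these permissions belongs in the dispatcher, before any handler runs. A sketch, assuming user roles come from your auth layer (a decoded API key or JWT claims):

# Sketch: tool-level authorization check ahead of dispatch.
from src.security.auth import ToolAuthorization
from src.errors.handler import UnauthorizedError

async def authorize_tool_call(tool_name: str, user_roles: list[str]) -> None:
    if not ToolAuthorization.can_access_tool(tool_name, user_roles):
        raise UnauthorizedError(tool_name)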
Input Validation and Sanitization
Create validator (src/security/validation.py):
from pydantic import BaseModel, field_validator
import re
class CustomerToolInput(BaseModel):
"""Validated input for customer tool"""
customer_id: str
@field_validator('customer_id')
@classmethod
def validate_customer_id(cls, v):
"""Validate customer ID format"""
if not re.match(r'^cust_[a-z0-9]{20}$', v):
raise ValueError('Invalid customer ID format')
return v
class OrderToolInput(BaseModel):
"""Validated input for order tool"""
customer_id: str
items: list
@field_validator('customer_id')
@classmethod
def validate_customer_id(cls, v):
if not re.match(r'^cust_[a-z0-9]{20}$', v):
raise ValueError('Invalid customer ID format')
return v
@field_validator('items')
@classmethod
def validate_items(cls, v):
if not isinstance(v, list) or len(v) == 0:
raise ValueError('Items list cannot be empty')
for item in v:
if not isinstance(item, dict):
raise ValueError('Item must be object')
if 'product_id' not in item or 'quantity' not in item:
raise ValueError('Item missing required fields')
if not isinstance(item['quantity'], int) or item['quantity'] < 1:
raise ValueError('Quantity must be positive integer')
return v
class Sanitizer:
"""Sanitize tool arguments"""
@staticmethod
def sanitize_string(value: str, max_length: int = 1000) -> str:
"""Remove potentially harmful characters"""
# Truncate if too long
value = value[:max_length]
# Remove null bytes
value = value.replace('\x00', '')
# Remove control characters
value = ''.join(ch for ch in value if ord(ch) >= 32)
return value
@staticmethod
def sanitize_sql(value: str) -> str:
"""Additional SQL injection protection"""
# This is basic; use ORM for real protection
dangerous_keywords = ['DROP', 'DELETE', 'TRUNCATE', 'INSERT', 'UPDATE']
for keyword in dangerous_keywords:
if keyword in value.upper():
raise ValueError(f"Potentially dangerous SQL: {keyword}")
return value
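In the dispatcher, these models replace hand-rolled checks. A hedged sketch of parsing raw tool arguments:

# Sketch: parsing raw tool arguments through the Pydantic models above.
from pydantic import ValidationError

def parse_order_input(arguments: dict) -> OrderToolInput:
    try:
        return OrderToolInput(**arguments)
    except ValidationError as e:
        # Surface as the invalid-params error used elsewhere in this guide
        raise ValueError(f"Invalid order input: {e}") from e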
Rate Limiting
Create rate limiter (src/security/rate_limit.py):
from fastapi import HTTPException
from redis.asyncio import Redis
import time
from src.config import settings
class RateLimiter:
"""Rate limiting for MCP server"""
def __init__(self, redis_client: Redis):
self.redis = redis_client
self.requests_per_minute = settings.RATE_LIMIT_REQUESTS_PER_MINUTE
async def check_rate_limit(self, client_id: str) -> bool:
"""Check if client has exceeded rate limit"""
if not settings.RATE_LIMIT_ENABLED:
return True
key = f"rate_limit:{client_id}"
current = await self.redis.incr(key)
# Set expiration on first request
if current == 1:
await self.redis.expire(key, 60)
if current > self.requests_per_minute:
raise HTTPException(
status_code=429,
detail=f"Rate limit exceeded: {self.requests_per_minute}/minute"
)
return True
async def get_remaining(self, client_id: str) -> int:
"""Get remaining requests for client"""
key = f"rate_limit:{client_id}"
current = await self.redis.get(key)
current_count = int(current) if current else 0
return max(0, self.requests_per_minute - current_count)
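One way to wire this in is as a FastAPI dependency on the /mcp endpoint from Part 2. A sketch, assuming a shared async Redis client created at startup and that the API key identifies the caller:

# Sketch: rate limiting as a FastAPI dependency.
from fastapi import Depends, Request

limiter = RateLimiter(redis_client)  # redis_client: created at startup (assumed)

async def enforce_rate_limit(request: Request) -> None:
    client_id = request.headers.get("X-API-Key") or request.client.host
    await limiter.check_rate_limit(client_id)

# Attach to the endpoint:
# @app.post("/mcp", dependencies=[Depends(enforce_rate_limit)])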
8: Advanced Production Patterns
Circuit Breaking and Fallbacks
Create circuit breaker (src/resilience/circuit_breaker.py):
from enum import Enum
import asyncio
import time
from typing import Callable, Any
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing if recovered
class CircuitBreaker:
"""Circuit breaker for external service calls"""
def __init__(
self,
failure_threshold: int = 5,
timeout_seconds: int = 60,
success_threshold: int = 2
):
self.failure_threshold = failure_threshold
self.timeout_seconds = timeout_seconds
self.success_threshold = success_threshold
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = None
async def call(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function with circuit breaker protection"""
if self.state == CircuitState.OPEN:
# Check if timeout expired
if time.time() - self.last_failure_time > self.timeout_seconds:
self.state = CircuitState.HALF_OPEN
self.success_count = 0
else:
raise Exception("Circuit breaker is OPEN")
try:
result = await func(*args, **kwargs)
# Success
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
# Circuit recovered
self.state = CircuitState.CLOSED
self.failure_count = 0
else:
self.failure_count = 0
return result
except Exception as e:
# Failure
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
raise
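Usage is one breaker instance per downstream dependency. A sketch protecting a payment-service call, where call_payment_api is an assumed async client function:

# Sketch: one breaker per external dependency.
payment_breaker = CircuitBreaker(failure_threshold=5, timeout_seconds=60)

async def charge_customer(customer_id: str, amount: float):
    # call_payment_api: assumed async client function for the payment service
    return await payment_breaker.call(call_payment_api, customer_id, amount)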
Retry Logic with Exponential Backoff
Create retry handler (src/resilience/retry.py):
import asyncio
import logging
from typing import Callable, Any, Optional
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
before_log,
after_log
)
logger = logging.getLogger(__name__)
def async_retry_with_backoff(
max_attempts: int = 3,
base_wait: float = 0.1,
max_wait: float = 10.0,
exceptions: tuple = (Exception,)
):
"""Decorator for async retry with exponential backoff"""
return retry(
stop=stop_after_attempt(max_attempts),
wait=wait_exponential(multiplier=1, min=base_wait, max=max_wait),
retry=retry_if_exception_type(exceptions),
before=before_log(logger, logging.DEBUG),
after=after_log(logger, logging.DEBUG),
reraise=True
)
class RetryManager:
"""Manage retries for tool calls"""
@staticmethod
async def execute_with_retry(
func: Callable,
max_attempts: int = 3,
*args,
**kwargs
) -> Any:
"""Execute function with retries"""
last_exception = None
for attempt in range(max_attempts):
try:
return await func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt < max_attempts - 1:
# Exponential backoff
wait_time = 2 ** attempt
logger.warning(
f"Attempt {attempt + 1} failed, retrying in {wait_time}s: {str(e)}"
)
await asyncio.sleep(wait_time)
else:
logger.error(f"All {max_attempts} attempts failed: {str(e)}")
raise last_exception
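A hedged usage example of the tenacity-based decorator, retrying only on connection errors so that validation failures fail fast (inventory_client is an assumed async client):

# Sketch: retry a flaky downstream lookup with exponential backoff.
@async_retry_with_backoff(max_attempts=3, exceptions=(ConnectionError,))
async def fetch_inventory(product_id: str) -> dict:
    return await inventory_client.get(product_id)  # assumed async client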
Request Context and Tracing
Create request context (src/context.py):
import contextvars
import uuid
from typing import Optional
# Context variables
request_id: contextvars.ContextVar[str] = contextvars.ContextVar('request_id', default="")  # default avoids LookupError before middleware runs
user_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar('user_id', default=None)
claude_session_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar('claude_session_id', default=None)
class RequestContext:
"""Manage request context"""
@staticmethod
def get_request_id() -> str:
"""Get current request ID"""
return request_id.get()
@staticmethod
def set_request_id(req_id: Optional[str] = None):
"""Set request ID"""
req_id = req_id or str(uuid.uuid4())
request_id.set(req_id)
return req_id
@staticmethod
def set_claude_session(session_id: str):
"""Set Claude session ID"""
claude_session_id.set(session_id)
@staticmethod
def get_all() -> dict:
"""Get all context variables"""
return {
"request_id": request_id.get(),
"user_id": user_id.get(),
"claude_session_id": claude_session_id.get()
}
# Middleware to inject context
from fastapi import Request
async def context_middleware(request: Request, call_next):
"""Inject request context"""
# Generate or get request ID
req_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
RequestContext.set_request_id(req_id)
# Extract Claude session if available
claude_session = request.headers.get("X-Claude-Session-ID")
if claude_session:
RequestContext.set_claude_session(claude_session)
response = await call_next(request)
# Add request ID to response
response.headers["X-Request-ID"] = req_id
return response
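Wiring the middleware into the application is a single registration (a sketch; assumes the FastAPI app object created elsewhere in the codebase):
from fastapi import FastAPI
from src.context import context_middleware

app = FastAPI()
# Every request now carries an X-Request-ID, generated if the caller didn't send one
app.middleware("http")(context_middleware)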
9: Disaster Recovery and Backups
Production systems must have backup and recovery strategies.
Database Backups
Create backup script (scripts/backup.py):
#!/usr/bin/env python3
"""
Database backup script
Run daily via cron: 0 2 * * * /path/to/backup.py
"""
import asyncio
import os
import subprocess
import datetime
import logging
import boto3
from pathlib import Path
from src.config import settings
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class BackupManager:
"""Manage database backups"""
def __init__(self):
self.s3 = boto3.client('s3')
self.backup_bucket = "mcp-backups"
async def backup_database(self) -> str:
"""Create database backup"""
# Parse database URL
from urllib.parse import urlparse
parsed = urlparse(settings.DATABASE_URL)
host = parsed.hostname
port = parsed.port or 5432
db_name = parsed.path.lstrip('/')
user = parsed.username
password = parsed.password
# Timestamp for backup filename (strftime avoids the ':' of isoformat, which is awkward in filenames and S3 keys)
timestamp = datetime.datetime.now().strftime("%Y%m%dT%H%M%S")
filename = f"backup-{timestamp}.sql"
filepath = Path("/tmp") / filename
try:
# Run pg_dump
cmd = [
"pg_dump",
f"--host={host}",
f"--port={port}",
f"--username={user}",
f"--file={filepath}",
db_name
]
env = {"PGPASSWORD": password}
result = subprocess.run(
cmd,
capture_output=True,
env=env,
timeout=3600
)
if result.returncode != 0:
raise Exception(f"pg_dump failed: {result.stderr.decode()}")
logger.info(f"Database backup created: {filepath}")
# Upload to S3
self._upload_to_s3(filepath, filename)
# Cleanup local backup
filepath.unlink()
logger.info(f"Backup uploaded to S3: {filename}")
return filename
except Exception as e:
logger.error(f"Backup failed: {str(e)}")
raise
def _upload_to_s3(self, filepath: Path, filename: str):
"""Upload backup to S3"""
self.s3.upload_file(
str(filepath),
self.backup_bucket,
f"postgres/{filename}"
)
async def restore_from_backup(self, backup_filename: str):
"""Restore database from backup"""
logger.info(f"Restoring from backup: {backup_filename}")
# Download from S3
filepath = Path("/tmp") / backup_filename
self.s3.download_file(
self.backup_bucket,
f"postgres/{backup_filename}",
str(filepath)
)
# Parse database URL
from urllib.parse import urlparse
parsed = urlparse(settings.DATABASE_URL)
host = parsed.hostname
port = parsed.port or 5432
db_name = parsed.path.lstrip('/')
user = parsed.username
password = parsed.password
try:
# Drop and recreate database
cmd_drop = [
"psql",
f"--host={host}",
f"--port={port}",
f"--username={user}",
"--dbname=postgres",  # connect to a maintenance DB; you can't drop the DB you're connected to
f"--command=DROP DATABASE IF EXISTS {db_name};"
]
env = {**os.environ, "PGPASSWORD": password}
subprocess.run(cmd_drop, capture_output=True, env=env, check=True)
cmd_create = [
"psql",
f"--host={host}",
f"--port={port}",
f"--username={user}",
"--dbname=postgres",
f"--command=CREATE DATABASE {db_name};"
]
subprocess.run(cmd_create, capture_output=True, env=env, check=True)
# Restore
cmd_restore = [
"psql",
f"--host={host}",
f"--port={port}",
f"--username={user}",
f"--file={filepath}",
db_name
]
result = subprocess.run(
cmd_restore,
capture_output=True,
env=env,
timeout=3600
)
if result.returncode != 0:
raise Exception(f"psql restore failed: {result.stderr.decode()}")
logger.info("Database restored successfully")
# Cleanup
filepath.unlink()
except Exception as e:
logger.error(f"Restore failed: {str(e)}")
raise
async def main():
"""Main backup routine"""
manager = BackupManager()
await manager.backup_database()
if __name__ == "__main__":
asyncio.run(main())
Disaster Recovery Plan
Create recovery runbook (docs/DISASTER_RECOVERY.md):
# Disaster Recovery Plan
## Recovery Time Objective (RTO): 1 hour
## Recovery Point Objective (RPO): 1 day
### Scenarios
#### Scenario 1: Service Crashed
**Detection**: Health checks fail, Kubernetes restarts pod
**MTTR**: <1 minute (automatic restart)
**Recovery**: Kubernetes handles automatically
#### Scenario 2: Database Corruption
**Detection**: Query errors, data integrity checks fail
**MTTR**: <30 minutes
**Recovery**:
1. Alert fires, on-call engineer notified
2. Stop all MCP server connections
3. Restore database from most recent clean backup
4. Run integrity checks
5. Resume MCP server
#### Scenario 3: Complete Data Loss
**Detection**: Database completely unavailable
**MTTR**: 1-4 hours
**Recovery**:
1. Provision new database instance
2. Restore from S3 backup
3. Perform verification
4. Redirect MCP servers to new database
#### Scenario 4: Corrupted Redis Cache
**Detection**: Cache operations failing
**MTTR**: <5 minutes
**Recovery**:
1. Flush Redis (safe: the cache is not the source of truth)
2. Cache rebuilds automatically on subsequent requests
### Regular Testing
- Monthly backup restore test
- Quarterly full disaster recovery drill
- Runbook update after each incident
### Backup Schedule
- Database: Daily at 2 AM UTC
- Configuration: On each deployment
- Retention: 30 days
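The 30-day retention can be enforced on the bucket itself rather than by a cleanup job. A minimal boto3 sketch (assumes the mcp-backups bucket and postgres/ prefix used by the backup script):
import boto3

s3 = boto3.client("s3")
# Expire backup objects 30 days after creation
s3.put_bucket_lifecycle_configuration(
    Bucket="mcp-backups",
    LifecycleConfiguration={
        "Rules": [{
            "ID": "expire-old-backups",
            "Filter": {"Prefix": "postgres/"},
            "Status": "Enabled",
            "Expiration": {"Days": 30},
        }]
    },
)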
10: Performance Optimization
Making MCP servers fast is essential for Claude integration.
Query Optimization
Create query optimizer (src/database/optimizer.py):
from sqlalchemy import select, text
from sqlalchemy.orm import joinedload
import logging
from src.database.models import Customer, Order
logger = logging.getLogger(__name__)
class QueryOptimizer:
"""Database query optimization patterns"""
@staticmethod
def get_customer_with_orders(db_session, customer_id: str):
"""
โ BAD: N+1 query problem
- First query: SELECT * FROM customers WHERE id = ?
- Second query (for each customer): SELECT * FROM orders WHERE customer_id = ?
"""
customer = db_session.query(Customer).filter_by(id=customer_id).first()
# Accessing customer.orders triggers another query
orders = customer.orders # N+1 query!
@staticmethod
def get_customer_with_orders_optimized(db_session, customer_id: str):
"""
โ
GOOD: Single query with join
"""
query = (
select(Customer)
.where(Customer.id == customer_id)
.options(
# Eager load orders
joinedload(Customer.orders)
)
)
customer = db_session.execute(query).scalar_one_or_none()
# customer.orders is already loaded; no additional query
return customer
@staticmethod
def get_customers_batch(db_session, customer_ids: list):
"""
โ
GOOD: Batch query instead of loop
"""
query = select(Customer).where(Customer.id.in_(customer_ids))
customers = db_session.execute(query).scalars().all()
# Single query for all customers
return customers
# Database indexes
DATABASE_INDEXES = """
-- Essential indexes for performance
CREATE INDEX idx_customers_id ON customers(id);
CREATE INDEX idx_orders_customer_id ON orders(customer_id);
CREATE INDEX idx_orders_status ON orders(status);
CREATE INDEX idx_tool_calls_tool_name ON tool_calls(tool_name);
CREATE INDEX idx_tool_calls_created_at ON tool_calls(created_at);
-- Composite indexes for common queries
CREATE INDEX idx_orders_customer_status ON orders(customer_id, status);
CREATE INDEX idx_tool_calls_tool_created ON tool_calls(tool_name, created_at);
"""
Connection Pooling
Connection pool configuration:
# src/database/connection.py (updated)
from sqlalchemy.ext.asyncio import create_async_engine
from src.config import settings
# Optimal pool sizing
"""
Pool size formula: (number of workers ร 2) + spare connections
For typical 4-worker setup:
- Pool size: (4 ร 2) + 5 = 13
- Max overflow: 5
- Total possible: 18 connections
"""
engine = create_async_engine(
settings.DATABASE_URL,
# Pool configuration
pool_size=13, # Base pool size
max_overflow=5, # Additional overflow connections
pool_timeout=30, # Timeout waiting for connection
pool_pre_ping=True, # Test connections before using
pool_recycle=3600, # Recycle connections every hour
# Performance tuning
connect_args={
"timeout": 10,
"command_timeout": 30,
"server_settings": {
"application_name": "mcp_server",
"jit": "off" # Disable JIT for faster queries
}
}
)
Caching Strategy
Multi-layer caching:
Request
↓
L1: Local in-memory cache (<1 ms)
↓ miss
L2: Redis cache (~5 ms)
↓ miss
L3: Database (50-200 ms)
└── Populate L2 and L1 on the way back
from functools import lru_cache
class CacheStrategy:
"""Multi-layer caching (assumes an async cache client and DB layer injected here)"""
def __init__(self, cache, db):
self.cache = cache # async cache client, e.g. aiocache or redis.asyncio
self.db = db
# L1: Local memory cache
# (lru_cache on a method holds a reference to self; acceptable for a long-lived singleton)
@lru_cache(maxsize=1000)
def get_product_schema(self, product_type: str):
"""Cache product schemas in memory"""
return self._load_schema(product_type)
# L2: Redis cache (shared across instances)
async def get_customer_cached(self, customer_id: str):
"""Cache customer data in Redis"""
cache_key = f"customer:{customer_id}"
# Try cache first
cached = await self.cache.get(cache_key)
if cached:
return cached
# Fetch from database
customer = await self.db.get_customer(customer_id)
# Cache for 1 hour
await self.cache.set(cache_key, customer, ttl=3600)
return customer
async def invalidate_customer(self, customer_id: str):
"""Invalidate cached customer"""
await self.cache.delete(f"customer:{customer_id}")
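The invalidation hook only helps if every write path calls it. A sketch of the corresponding write path (update_customer on the DB layer is an assumed method):
async def update_customer(self, customer_id: str, fields: dict):
    # Write-then-invalidate: the next read repopulates both cache layers
    await self.db.update_customer(customer_id, fields)
    await self.invalidate_customer(customer_id)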
Batch Operations
from sqlalchemy import select, insert
from src.database.models import Customer, Order
class BatchOptimizer:
"""Optimize operations with batching"""
async def get_customers_batch(self, customer_ids: list):
"""
โ
Better: Single query for all customers
Instead of: for customer_id in customer_ids: get_customer(...)
"""
query = select(Customer).where(Customer.id.in_(customer_ids))
return await self.db.execute(query)
async def create_orders_batch(self, orders: list):
"""
โ
Better: Single INSERT with multiple values
"""
query = insert(Order).values(orders)
await self.db.execute(query)
11: Production Operations
Day-to-day running of production MCP servers.
Deployment Strategy
Blue-green deployment with zero downtime:
#!/bin/bash
# deploy.sh
set -e
# Configuration
NAMESPACE="production"
SERVICE_NAME="mcp-server"
NEW_VERSION="v1.2.3"
IMAGE="mcp-server:$NEW_VERSION"
echo "Starting blue-green deployment..."
# Step 1: Sanity-check that the image tag exists in the registry
docker pull $IMAGE
# Step 2: Deploy green (new version)
kubectl set image deployment/mcp-server-green \
mcp-server=$IMAGE \
-n $NAMESPACE
# Step 3: Wait for green to be ready
kubectl rollout status deployment/mcp-server-green \
-n $NAMESPACE \
--timeout=5m
# Step 4: Run health checks on green
echo "Running health checks on green..."
GREEN_POD=$(kubectl get pod -l deployment=mcp-server-green -n $NAMESPACE -o jsonpath='{.items[0].metadata.name}')
kubectl exec $GREEN_POD -n $NAMESPACE -- curl -f http://localhost:8000/health
# Step 5: Switch traffic from blue to green
kubectl patch service $SERVICE_NAME \
-n $NAMESPACE \
-p '{"spec":{"selector":{"deployment":"mcp-server-green"}}}'
echo "Switched traffic to green deployment"
# Step 6: Monitor for issues (5 minutes)
sleep 300
# Step 7: If successful, update blue and mark as primary
kubectl set image deployment/mcp-server-blue \
mcp-server=$IMAGE \
-n $NAMESPACE
echo "Blue-green deployment complete"
Scaling Strategy
Horizontal scaling based on metrics:
# Kubernetes HPA configuration
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: mcp-server-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: mcp-server
minReplicas: 3 # Minimum availability
maxReplicas: 20 # Maximum cost control
metrics:
# Scale on CPU utilization
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70 # Scale up at 70% CPU
# Scale on memory utilization
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80 # Scale up at 80% memory
# Scale on custom metric (tool call rate)
- type: Pods
pods:
metric:
name: mcp_tool_calls_per_second
target:
type: AverageValue
averageValue: "100" # 100 calls/sec per pod
behavior:
# Fast scale up
scaleUp:
stabilizationWindowSeconds: 0 # Immediate
policies:
- type: Percent
value: 100 # Double the pods
periodSeconds: 30
# Slow scale down (prevent thrashing)
scaleDown:
stabilizationWindowSeconds: 300 # Wait 5 minutes
policies:
- type: Percent
value: 50 # Reduce by 50%
periodSeconds: 60
Monitoring Dashboards
Grafana dashboard queries:
# Tool call rate (calls/sec, averaged over the last minute)
sum(rate(mcp_tool_calls_total[1m]))
# Error rate by tool
sum(rate(mcp_tool_errors_total[5m])) by (tool_name) / sum(rate(mcp_tool_calls_total[5m])) by (tool_name)
# Tool latency (p95) - aggregate buckets first, keeping le and tool_name
histogram_quantile(0.95, sum(rate(mcp_tool_call_duration_seconds_bucket[5m])) by (le, tool_name))
# Cache hit rate
sum(rate(mcp_cache_hits_total[5m])) / (sum(rate(mcp_cache_hits_total[5m])) + sum(rate(mcp_cache_misses_total[5m])))
# Database connections
mcp_db_connections_active
# Request latency
histogram_quantile(0.99, rate(http_request_duration_seconds_bucket[5m]))
12: Advanced Use Cases and Patterns
Real-time Data Synchronization
For MCP tools that depend on constantly-changing data:
import json
import time
class RealtimeDataSync:
"""Keep local cache in sync with external data source"""
def __init__(self, redis_client):
self.redis = redis_client
self.last_sync = None
async def sync_customer_data(self):
"""
Sync customer data from external service periodically
Run every 60 seconds
"""
# Get last sync timestamp
last_sync = await self.redis.get("customer_sync:last")
# Fetch incremental changes since last sync
if last_sync:
changes = await self._fetch_changes(last_sync)
else:
changes = await self._fetch_all_customers()
# Update local cache
for customer_id, customer_data in changes.items():
cache_key = f"customer:{customer_id}"
await self.redis.setex(cache_key, 3600, json.dumps(customer_data))
# Update sync timestamp
await self.redis.set("customer_sync:last", time.time())
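Driving the 60-second cadence can be a plain background task started at application startup (a sketch; logger and redis_client as configured earlier):
async def run_sync_loop(sync: RealtimeDataSync):
    # Keep the cache warm without blocking request handling
    while True:
        try:
            await sync.sync_customer_data()
        except Exception:
            logger.exception("Customer sync failed; retrying next cycle")
        await asyncio.sleep(60)

# At startup: asyncio.create_task(run_sync_loop(RealtimeDataSync(redis_client)))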
Tool Composition and Workflows
Running complex workflows using multiple tools:
import asyncio
class OrderWorkflow:
"""Multi-tool workflow for order processing"""
async def process_order(self, order_details: dict):
"""
Complex workflow:
1. Validate customer exists
2. Check inventory
3. Process payment
4. Create order
5. Send confirmation
"""
customer_id = order_details["customer_id"]
items = order_details["items"]
# Step 1: Validate customer
customer = await self.get_customer(customer_id)
if not customer:
raise ValueError(f"Customer {customer_id} not found")
# Step 2: Check inventory for all items
for item in items:
available = await self.check_inventory(
item["product_id"],
item["quantity"]
)
if not available:
raise ValueError(f"Insufficient inventory for {item['product_id']}")
# Step 3: Process payment
payment_result = await self.process_payment(
customer_id,
self._calculate_total(items)
)
if not payment_result["success"]:
raise ValueError(f"Payment failed: {payment_result['error']}")
# Step 4: Create order
order = await self.create_order(customer_id, items)
# Step 5: Send confirmation (fire-and-forget)
asyncio.create_task(self.send_confirmation(customer["email"], order))
return order
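One step the happy path glosses over: if order creation fails after the charge succeeds, the payment must be compensated. A hedged sketch (refund_payment and the payment_id field are assumptions, not shown above):
# Hypothetical compensation around step 4
try:
    order = await self.create_order(customer_id, items)
except Exception:
    # Undo the charge so the customer isn't billed for a failed order
    await self.refund_payment(payment_result["payment_id"])
    raise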
Federated MCP Servers
When you need multiple MCP servers talking to each other:
import httpx
class FederatedMCPClient:
"""Call tools in other MCP servers"""
def __init__(self):
self.servers = {
"customers": "http://mcp-customers:8000",
"orders": "http://mcp-orders:8000",
"payments": "http://mcp-payments:8000"
}
async def call_remote_tool(self, server: str, tool: str, arguments: dict):
"""Call tool in remote MCP server"""
url = self.servers.get(server)
if not url:
raise ValueError(f"Unknown server: {server}")
async with httpx.AsyncClient() as client:
response = await client.post(
f"{url}/mcp",
json={
"jsonrpc": "2.0",
"id": 1,  # JSON-RPC calls need an id; without one this is a notification
"method": "tools/call",
"params": {
"name": tool,
"arguments": arguments
}
},
timeout=5.0
)
if response.status_code != 200:
raise Exception(f"Server error: {response.text}")
data = response.json()
if "error" in data:
raise Exception(f"Tool error: {data['error']}")
return data["result"]
async def create_order_federated(self, customer_id: str, items: list):
"""Create order using federated services"""
# Call customer service
customer = await self.call_remote_tool(
"customers", "get_customer",
{"customer_id": customer_id}
)
# Call payment service
payment = await self.call_remote_tool(
"payments", "process_payment",
{"customer_id": customer_id, "amount": 99.99}
)
# Call order service
order = await self.call_remote_tool(
"orders", "create_order",
{"customer_id": customer_id, "items": items}
)
return order
13: Troubleshooting and Debugging
Common issues and how to resolve them.
Claude Timeouts
Problem: Tool calls timeout after 5 seconds
Causes:
- Database query too slow
- External API call hanging
- Serialization taking too long
Solutions:
# Add timeout monitoring
import time
@app.post("/mcp")
async def handle_mcp(data: dict):
start = time.time()
# Process request
result = await process_tool_call(data)
duration = time.time() - start
# Warn if close to timeout
if duration > 4.0:
logger.warning(f"Tool call took {duration:.2f}s, close to 5s limit")
metrics.tool_call_duration_warning.inc()
return result
# Add slow query logs
import logging
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
High Memory Usage
Problem: MCP server memory grows over time
Causes:
- Memory leak in tool implementation
- Cache not expiring
- Large objects not being garbage collected
Solutions:
# Monitor memory usage
import gc
import os
import psutil
async def monitor_memory():
"""Log memory usage periodically"""
process = psutil.Process(os.getpid())
while True:
memory_info = process.memory_info()
memory_percent = process.memory_percent()
logger.info(f"Memory: {memory_info.rss / 1024 / 1024:.1f} MB ({memory_percent:.1f}%)")
if memory_percent > 80:
logger.warning("High memory usage detected")
# Trigger garbage collection
import gc
gc.collect()
await asyncio.sleep(300) # Every 5 minutes
# Implement cache expiration
await redis_client.expire(cache_key, 3600) # Always set TTL
Database Connection Issues
Problem: "Connection pool exhausted" errors
Causes:
- Connections not being returned to pool
- Long-running transactions
- Too many concurrent requests
Solutions:
# Use context managers so connections return to the pool promptly
async def get_customer(customer_id: str):
# WRONG: acquire a connection early, then hold it through slow work
# conn = await db.get_connection()  # may never be released on error paths
# RIGHT: scope the session to just the query
async with db_session() as session:
return await session.get(Customer, customer_id)
# Connection returned to the pool here
# Add connection pool monitoring
async def monitor_connection_pool():
"""Monitor database connection pool"""
while True:
pool_state = await db.get_pool_state()
logger.info(
f"DB Pool: {pool_state['active']} active, "
f"{pool_state['idle']} idle, "
f"{pool_state['queued']} queued"
)
if pool_state['queued'] > 0:
logger.warning("Requests queued for database connection")
await asyncio.sleep(30)
Conclusion: Production MCP Servers at Scale
Building production MCP servers requires:
- Architecture: Choose monolithic, federated, or gateway pattern based on scale
- Development: Use asyncio, proper error handling, input validation
- Testing: Unit, integration, and load tests catch problems early
- Deployment: Docker, Kubernetes, blue-green deployments minimize downtime
- Observability: Prometheus metrics, structured logging, distributed tracing
- Resilience: Circuit breakers, retries, fallbacks, graceful degradation
- Security: Authentication, authorization, rate limiting, input validation
- Performance: Query optimization, caching, connection pooling, batch operations
- Operations: Monitoring dashboards, alerting, runbooks, regular drills
The key to production success is understanding your constraints (Claude's 5-second timeout) and designing for failure (assuming things will go wrong and preparing accordingly).
Start simple, monitor carefully, and evolve your architecture as you scale.
Appendix: Complete Example Project
A reference implementation is available at: https://github.com/afrankenstine/mcp-production-server
Key files:
- src/: Source code
- tests/: Comprehensive test suite
- deploy/: Kubernetes and Docker files
- scripts/: Operational scripts
- docs/: Runbooks and guides