โญ Featured ArticleAI & InfrastructureAdvanced

Detailed Guide to Building MCP Server in Production: Complete Technical Deep Dive

A comprehensive technical guide to designing, building, testing, deploying, and operating MCP (Model Context Protocol) servers in production environments. Covers architecture design, security hardening, performance optimization, observability, disaster recovery, and real-world patterns for integrating AI capabilities with existing business systems. Includes complete code examples, deployment strategies, and lessons from production deployments.

Yogesh Bhandari · ๐Ÿ“… December 5, 2025 · โฑ๏ธ 140 min read
Code Examples · Implementation Guide

Tech Stack:

Python 3.11+, FastAPI, asyncio, PostgreSQL, Redis, Docker, Kubernetes, Prometheus, Grafana, Claude API, Model Context Protocol, gRPC, Protocol Buffers, Pydantic, SQLAlchemy, Alembic, pytest, locust, AWS, GCP, Terraform

#MCP · #Model Context Protocol · #LLM Integration · #Production Systems · #Server Architecture · #Claude Integration · #AI Infrastructure · #API Design · #System Reliability · #Microservices · #Software Architecture · #Enterprise AI · #Protocol Design · #Rate Limiting · #Error Handling · #Observability · #Security · #DevOps · #Cloud Infrastructure · #Python Development

Introduction: The Era of AI-Native Infrastructure

The release of the Model Context Protocol (MCP) marks a fundamental shift in how applications integrate with language models. Unlike earlier approaches that treated AI as a simple API call, MCP enables bidirectional communication between LLMs and application infrastructure, creating a new class of AI-native systems.

What is MCP and Why It Matters

The Model Context Protocol is an open-source standard developed by Anthropic that defines how AI models (particularly Claude) interact with external tools, data sources, and services. It's a protocol layer between your application and language models.

Traditional LLM integration:

Application
    โ†“
Prompt crafting
    โ†“
LLM API (OpenAI/Anthropic)
    โ†“
Parse response
    โ†“
Action/Response

MCP-based integration:

Application
    โ†“
MCP Server (Your service)
    โ”œโ”€ Tools (function definitions)
    โ”œโ”€ Resources (data sources)
    โ”œโ”€ Prompts (system instructions)
    โ””โ”€ State (persistent context)
    โ†“
LLM (Claude)
    โ†“
Requests tools/resources from MCP Server
    โ†“
MCP Server executes, returns results
    โ†“
Claude reasons over results
    โ†“
Provides intelligent response

Why this matters for production systems:

  1. Structured integration: Define exactly what tools and data LLMs can access
  2. Security and governance: Control which operations LLMs can perform
  3. Efficiency: LLMs use tools intelligently instead of making wild API guesses
  4. Auditability: Every tool call is logged and traceable
  5. Scalability: One MCP server can serve multiple AI applications

The Challenge: MCP in Production

While MCP is powerful, running production MCP servers presents unique challenges:

  • Reliability: MCP calls from Claude need <5 second response times (or requests timeout)
  • Concurrency: Handling hundreds of simultaneous tool calls from different Claude sessions
  • Data consistency: Ensuring tool results reflect current state when Claude makes multiple calls
  • Security: Preventing models from accessing unauthorized resources or performing unintended actions
  • Cost: Optimizing token usage and API calls to minimize Claude API costs
  • Observability: Understanding what Claude is trying to do when things go wrong
  • Versioning: Managing protocol and API changes without breaking production Claude deployments

This article covers all aspects of building production MCP servers: architecture, development, testing, deployment, and operations.


Part 1: Understanding MCP Architecture

Before building, you must understand MCP at a fundamental level.

Core MCP Concepts

Tools

A tool is a function that Claude can call to perform actions. MCP tools carry full JSON Schema definitions, so every parameter is explicitly typed and documented.

# Example: A tool that fetches customer information
{
    "name": "get_customer",
    "description": "Retrieve customer details by ID",
    "inputSchema": {
        "type": "object",
        "properties": {
            "customer_id": {
                "type": "string",
                "description": "The unique customer identifier"
            },
            "include_orders": {
                "type": "boolean",
                "description": "Whether to include order history",
                "default": false
            }
        },
        "required": ["customer_id"]
    }
}
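
Server-side, these schemas are enforced before the tool runs. Here is a minimal sketch of required-field and type checking against the `get_customer` schema above — a real implementation would typically use a JSON Schema validator library; this covers only an illustrative subset:

```python
# Minimal JSON-Schema-style validation for tool arguments.
# Illustrative subset: checks only "required" and primitive "type".

TYPE_MAP = {
    "string": str, "integer": int, "number": (int, float),
    "boolean": bool, "array": list, "object": dict,
}

def validate_arguments(schema: dict, arguments: dict) -> list[str]:
    """Return a list of validation errors (empty list means valid)."""
    errors = []
    for field in schema.get("required", []):
        if field not in arguments:
            errors.append(f"missing required field: {field}")
    for field, value in arguments.items():
        prop = schema.get("properties", {}).get(field)
        if prop and "type" in prop:
            # bool is a subclass of int, so reject bools for integer/number
            if isinstance(value, bool) and prop["type"] in ("integer", "number"):
                errors.append(f"field '{field}' expected {prop['type']}, got boolean")
            elif not isinstance(value, TYPE_MAP[prop["type"]]):
                errors.append(f"field '{field}' expected {prop['type']}")
    return errors

schema = {
    "type": "object",
    "properties": {
        "customer_id": {"type": "string"},
        "include_orders": {"type": "boolean", "default": False},
    },
    "required": ["customer_id"],
}

assert validate_arguments(schema, {"customer_id": "c_42"}) == []
assert validate_arguments(schema, {"include_orders": True}) == ["missing required field: customer_id"]
```

Rejecting bad arguments before execution keeps error feedback fast and specific, which matters under Claude's tight per-call budget.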

Why strong typing matters:

  • Claude understands exactly what parameters are needed
  • No ambiguity in what the tool does
  • Easier validation and error handling
  • Better documentation for Claude

Resources

A resource is data that Claude can request. Unlike tools (which execute actions), resources are read-only data sources that Claude queries for context.

# Example: A resource that provides customer database schema
{
    "uri": "database://customers/schema",
    "name": "customers_schema",
    "description": "Schema of the customers table",
    "mimeType": "text/plain",
    "contents": "customers table: id (UUID), name (text), email (email), created_at (timestamp), tier (enum: free|pro|enterprise)"
}

Resources vs Tools:

| Aspect | Tools | Resources |
|---|---|---|
| Purpose | Execute actions, modify state | Provide context, read-only data |
| Invocation | Explicit function call | Requested when needed |
| Return type | Structured result | Text, HTML, or binary |
| Idempotency | Not required | Should be idempotent |
| Performance | Can be slow (user waits) | Should be fast (blocks Claude's reasoning) |

Prompts

A prompt is a reusable instruction template that Claude can invoke. Useful for standardized workflows or complex reasoning patterns.

{
    "name": "analyze_customer_churn",
    "description": "Analyze why a customer might be churning",
    "arguments": [
        {
            "name": "customer_id",
            "description": "Customer to analyze",
            "required": true
        }
    ]
}

When Claude invokes this prompt, your MCP server returns a full system prompt with context pre-loaded:

You are an expert customer success analyst. You have been given access to the following customer data:

[Customer details fetched by MCP server]
[Order history]
[Support tickets]
[Product usage metrics]

Your task: Analyze this customer's data and provide 3 specific, actionable recommendations to prevent churn.
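
Server-side, expanding such a prompt is straightforward: look up the template, fetch the context, and interpolate. A minimal sketch — the `fetch_*` helpers are hypothetical stand-ins for real data access:

```python
# Hypothetical data-access helpers standing in for real lookups.
def fetch_customer(customer_id: str) -> str:
    return f"Customer {customer_id}: pro tier, active since 2023"

def fetch_order_history(customer_id: str) -> str:
    return "15 orders, last order 45 days ago"

def expand_churn_prompt(customer_id: str) -> str:
    """Expand the analyze_customer_churn prompt with pre-loaded context."""
    return (
        "You are an expert customer success analyst. "
        "You have been given access to the following customer data:\n\n"
        f"{fetch_customer(customer_id)}\n"
        f"{fetch_order_history(customer_id)}\n\n"
        "Your task: Analyze this customer's data and provide 3 specific, "
        "actionable recommendations to prevent churn."
    )

print(expand_churn_prompt("cust_123"))
```

Pre-loading the data server-side means Claude starts reasoning with context already in place, instead of spending tool calls assembling it.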

MCP Protocol Flow

Understanding the protocol flow is essential for debugging and optimization.

Complete MCP interaction sequence:

1. Client Initialization
   โ”œโ”€ Client sends: {"jsonrpc": "2.0", "method": "initialize", ...}
   โ””โ”€ Server responds: {"jsonrpc": "2.0", "result": {"serverInfo": {...}}}

2. List Available Capabilities
   โ”œโ”€ Client: {"method": "resources/list"}
   โ”œโ”€ Server: [{"uri": "...", "name": "...", ...}]
   โ”œโ”€ Client: {"method": "tools/list"}
   โ””โ”€ Server: [{"name": "...", "inputSchema": {...}}]

3. Claude Requests Data/Invokes Tool
   โ”œโ”€ Client (Claude): {"method": "resources/read", "params": {"uri": "..."}}
   โ”œโ”€ OR: {"method": "tools/call", "params": {"name": "...", "arguments": {...}}}
   โ””โ”€ Server processes request

4. MCP Server Responds
   โ”œโ”€ For resources: {"contents": [...], "mimeType": "text/plain"}
   โ””โ”€ For tools: {"content": [{"type": "text", "text": "Result..."}]}

5. Repeat steps 3-4 as Claude reasons
   โ””โ”€ Claude may call multiple tools sequentially

6. Claude Generates Response
   โ””โ”€ Client: {"method": "completion", "params": {...}}
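
Every message in this sequence is a plain JSON-RPC 2.0 envelope. As a concrete illustration, a `tools/call` round trip can be built and parsed with nothing but the standard library:

```python
import json

# Build a JSON-RPC 2.0 tools/call request (step 3 in the flow above)
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "get_customer",
        "arguments": {"customer_id": "cust_123"},
    },
}
wire = json.dumps(request)

# Server side: decode, dispatch, and build the matching response (step 4)
incoming = json.loads(wire)
assert incoming["method"] == "tools/call"
response = {
    "jsonrpc": "2.0",
    "id": incoming["id"],  # the response must echo the request id
    "result": {"content": [{"type": "text", "text": "Customer: cust_123"}]},
}
print(json.dumps(response))
```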

Timing considerations:

Total time available for Claude to complete request: ~60 seconds
โ”œโ”€ Time spent thinking: variable
โ”œโ”€ Time spent calling MCP tools: must be < 5 seconds per call
โ”œโ”€ Time spent processing results: variable
โ””โ”€ If any MCP call exceeds 5s, the request times out
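
Because a single slow tool call can consume the whole budget, a production server should enforce the per-call limit itself rather than waiting for the client to give up. A minimal `asyncio` sketch — the 5-second default mirrors the budget above, and `slow_tool` is a stand-in:

```python
import asyncio

TOOL_CALL_TIMEOUT = 5.0  # per-call budget from the breakdown above

async def slow_tool() -> str:
    await asyncio.sleep(10)  # simulates a tool that would blow the budget
    return "done"

async def call_with_budget(coro, timeout: float = TOOL_CALL_TIMEOUT) -> dict:
    """Run a tool coroutine, converting overruns into a structured error."""
    try:
        return {"ok": True, "result": await asyncio.wait_for(coro, timeout)}
    except asyncio.TimeoutError:
        # Return an error the model can reason about instead of hanging
        return {"ok": False, "error": f"tool exceeded {timeout}s budget"}

# Demo with a tiny budget so the example finishes quickly
print(asyncio.run(call_with_budget(slow_tool(), timeout=0.05)))
```

Returning a structured timeout error lets Claude recover (retry, try another tool, or tell the user) instead of silently stalling the whole session.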

MCP Architecture Patterns

Different patterns suit different use cases.

Pattern 1: Monolithic MCP Server

All tools, resources, and prompts in a single service.

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚         MCP Server (Single Process)     โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                         โ”‚
โ”‚  โ”œโ”€ Customer tools                     โ”‚
โ”‚  โ”œโ”€ Order tools                        โ”‚
โ”‚  โ”œโ”€ Product tools                      โ”‚
โ”‚  โ”œโ”€ Analytics tools                    โ”‚
โ”‚  โ””โ”€ Payment tools                      โ”‚
โ”‚                                         โ”‚
โ”‚  โ”œโ”€ Customer resources                 โ”‚
โ”‚  โ”œโ”€ Product resources                  โ”‚
โ”‚  โ””โ”€ Schema resources                   โ”‚
โ”‚                                         โ”‚
โ”‚  โ”œโ”€ Database connection pool           โ”‚
โ”‚  โ”œโ”€ Cache layer (Redis)                โ”‚
โ”‚  โ””โ”€ Service dependencies               โ”‚
โ”‚                                         โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
         โ†‘
    Claude API

When to use:

  • Small to medium applications (<20 tools)
  • Co-located team (single service owner)
  • Simple dependencies (one database, one cache)
  • <100 concurrent Claude sessions

Trade-offs:

  • โœ… Simple deployment
  • โœ… Easy debugging
  • โŒ Single point of failure
  • โŒ Hard to scale specific tools

Pattern 2: Federated MCP Servers

Multiple specialized MCP servers, each handling a domain.

         Claude API
            โ†“
    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
    โ†“               โ†“           โ†“           โ†“
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚  Customer  โ”‚ โ”‚   Order    โ”‚ โ”‚  Product   โ”‚ โ”‚  Payment   โ”‚
โ”‚ MCP Server โ”‚ โ”‚ MCP Server โ”‚ โ”‚ MCP Server โ”‚ โ”‚ MCP Server โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
    โ†“               โ†“           โ†“           โ†“
 [Customer DB] [Order DB]   [Product DB] [Payment API]

When to use:

  • Large applications (50+ tools)
  • Multiple team ownership
  • Different scaling requirements per domain
  • >1000 concurrent Claude sessions

Trade-offs:

  • โœ… Independent scaling
  • โœ… Clear team boundaries
  • โœ… Easy to deploy changes per domain
  • โŒ Complex inter-server communication
  • โŒ Distributed tracing complexity

Pattern 3: Proxy/Gateway MCP Server

Single MCP server that proxies to backend services.

         Claude API
            โ†“
    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
    โ”‚  MCP Gateway Server   โ”‚
    โ”‚  (Proxy + Dispatcher) โ”‚
    โ””โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”˜
      โ†“         โ†“      โ†“
   [Service 1] [Service 2] [Service 3]

When to use:

  • Existing microservices architecture
  • Need to gradually migrate to MCP
  • Want centralized tool authorization
  • Legacy systems integration

Trade-offs:

  • โœ… Minimal changes to existing services
  • โœ… Centralized security/auth
  • โŒ Extra network hop
  • โŒ Gateway becomes bottleneck

Comparison: Which Pattern for Production?

| Requirement | Monolithic | Federated | Gateway |
|---|---|---|---|
| Simplicity | โœ…โœ…โœ… | โŒ | โœ…โœ… |
| Scalability | โŒ | โœ…โœ…โœ… | โœ…โœ… |
| Observability | โœ…โœ… | โš ๏ธ | โœ…โœ… |
| Team autonomy | โŒ | โœ…โœ…โœ… | โš ๏ธ |
| Deployment speed | โœ…โœ…โœ… | โš ๏ธ | โœ…โœ… |
| Failure isolation | โŒ | โœ…โœ…โœ… | โš ๏ธ |
| Development complexity | โœ… | โŒโŒ | โš ๏ธ |

Recommendation for production: Start with monolithic (simplicity, fast time-to-value), migrate to federated at scale (>50 tools, multiple teams).


Part 2: Building Your First MCP Server

Now let's build a production-grade MCP server from scratch.

Project Setup and Dependencies

Create project structure:

mkdir mcp-production-server && cd mcp-production-server

# Python project structure
mkdir -p src/{mcp,tools,resources,database,config}
mkdir -p tests/{unit,integration,load}
mkdir -p scripts
mkdir -p deploy/{docker,kubernetes,terraform}

# Files
touch requirements.txt setup.py Dockerfile docker-compose.yml
touch pytest.ini .env.example
touch README.md DEVELOPMENT.md

# Git
git init
echo "venv/" > .gitignore
echo "*.pyc" >> .gitignore
echo ".env" >> .gitignore

Core dependencies (requirements.txt):

# MCP
mcp==0.1.0

# Web framework
fastapi==0.104.1
uvicorn[standard]==0.24.0
httpx==0.25.2

# Database
sqlalchemy==2.0.23
alembic==1.12.1
psycopg2-binary==2.9.9

# Caching
redis==5.0.1

# Data validation
pydantic==2.5.0
pydantic-settings==2.1.0

# Async
anyio==3.7.1
asyncio-contextmanager==1.0.0

# Logging/Observability
python-json-logger==2.0.7
prometheus-client==0.19.0

# Testing
pytest==7.4.3
pytest-asyncio==0.21.1
pytest-cov==4.1.0
pytest-mock==3.12.0

# Load testing
locust==2.17.0

# Utils
python-dotenv==1.0.0
tenacity==8.2.3

Python version and virtual environment:

# Require Python 3.11+
python3.11 -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

pip install --upgrade pip
pip install -r requirements.txt

Basic MCP Server Implementation

Create the main server file (src/mcp/server.py):

from mcp.server import Server
from mcp.server.models import InitializationOptions
from pydantic import BaseModel
import logging
from typing import Any

logger = logging.getLogger(__name__)

class MCPServer:
    """Production-grade MCP Server implementation"""
    
    def __init__(self, name: str = "ProductionMCP", version: str = "1.0.0"):
        self.name = name
        self.version = version
        self.server = Server(name)
        
        # Register handlers
        self._setup_handlers()
    
    def _setup_handlers(self):
        """Register all MCP message handlers"""
        
        @self.server.list_tools()
        async def list_tools() -> list[dict]:
            """Return list of available tools"""
            return [
                {
                    "name": "get_customer",
                    "description": "Fetch customer information by ID",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "customer_id": {
                                "type": "string",
                                "description": "Unique customer identifier"
                            }
                        },
                        "required": ["customer_id"]
                    }
                },
                {
                    "name": "create_order",
                    "description": "Create a new order for a customer",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "customer_id": {
                                "type": "string",
                                "description": "Customer placing the order"
                            },
                            "items": {
                                "type": "array",
                                "description": "Items in the order",
                                "items": {
                                    "type": "object",
                                    "properties": {
                                        "product_id": {"type": "string"},
                                        "quantity": {"type": "integer"}
                                    },
                                    "required": ["product_id", "quantity"]
                                }
                            }
                        },
                        "required": ["customer_id", "items"]
                    }
                }
            ]
        
        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict) -> list[dict]:
            """Execute a tool and return result"""
            
            try:
                if name == "get_customer":
                    return await self._handle_get_customer(arguments)
                elif name == "create_order":
                    return await self._handle_create_order(arguments)
                else:
                    return [{"type": "text", "text": f"Unknown tool: {name}"}]
            
            except Exception as e:
                logger.error(f"Error calling tool {name}: {str(e)}", exc_info=True)
                return [{
                    "type": "text",
                    "text": f"Error: {str(e)}"
                }]
        
        @self.server.list_resources()
        async def list_resources() -> list[dict]:
            """Return list of available resources"""
            return [
                {
                    "uri": "database://schema/customers",
                    "name": "Customer Schema",
                    "description": "Database schema for customers table",
                    "mimeType": "text/plain"
                }
            ]
        
        @self.server.read_resource()
        async def read_resource(uri: str) -> str:
            """Read resource content"""
            if uri == "database://schema/customers":
                return """customers table schema:
                - id: UUID (primary key)
                - name: VARCHAR(255)
                - email: VARCHAR(255)
                - phone: VARCHAR(20)
                - created_at: TIMESTAMP
                - tier: ENUM(free, pro, enterprise)
                - status: ENUM(active, inactive, suspended)
                """
            else:
                return f"Unknown resource: {uri}"
    
    async def _handle_get_customer(self, arguments: dict) -> list[dict]:
        """Handle get_customer tool call"""
        customer_id = arguments.get("customer_id")
        
        # TODO: Query database
        customer_data = {
            "id": customer_id,
            "name": "John Doe",
            "email": "john@example.com",
            "tier": "pro",
            "orders": 15
        }
        
        return [{
            "type": "text",
            "text": f"Customer: {customer_data}"
        }]
    
    async def _handle_create_order(self, arguments: dict) -> list[dict]:
        """Handle create_order tool call"""
        customer_id = arguments.get("customer_id")
        items = arguments.get("items")
        
        # TODO: Create order in database
        order_id = "ord_123456"
        
        return [{
            "type": "text",
            "text": f"Order {order_id} created with {len(items)} items"
        }]
    
    async def start(self):
        """Start the MCP server"""
        logger.info(f"Starting {self.name} v{self.version}")
        await self.server.arun()

Create FastAPI wrapper (src/api/server.py):

from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from mcp.server import Server
import logging
import json
from typing import Any, Dict

app = FastAPI(
    title="MCP Server",
    description="Production-grade Model Context Protocol Server",
    version="1.0.0"
)

logger = logging.getLogger(__name__)

# Initialize MCP server
mcp = Server("ProductionMCP")

@app.on_event("startup")
async def startup():
    """Initialize MCP server on startup"""
    logger.info("MCP server starting up")

@app.on_event("shutdown")
async def shutdown():
    """Cleanup on shutdown"""
    logger.info("MCP server shutting down")

@app.post("/mcp")
async def handle_mcp_message(data: Dict[str, Any]):
    """
    Handle MCP messages
    
    This endpoint receives JSON-RPC 2.0 messages from Claude or MCP clients
    and routes them to appropriate handlers.
    """
    
    try:
        # Validate JSON-RPC format
        if not isinstance(data, dict):
            raise HTTPException(status_code=400, detail="Invalid request format")
        
        jsonrpc = data.get("jsonrpc", "2.0")
        method = data.get("method")
        params = data.get("params", {})
        request_id = data.get("id")
        
        logger.info(f"MCP request: {method}")
        
        # Route to appropriate handler
        if method == "tools/list":
            result = await handle_list_tools()
        elif method == "tools/call":
            result = await handle_call_tool(params)
        elif method == "resources/list":
            result = await handle_list_resources()
        elif method == "resources/read":
            result = await handle_read_resource(params)
        else:
            raise HTTPException(status_code=400, detail=f"Unknown method: {method}")
        
        # Format JSON-RPC response
        response = {
            "jsonrpc": jsonrpc,
            "result": result,
            "id": request_id
        }
        
        return JSONResponse(response)
    
    except Exception as e:
        logger.error(f"Error handling MCP request: {str(e)}", exc_info=True)
        return JSONResponse({
            "jsonrpc": "2.0",
            "error": {
                "code": -32603,
                "message": str(e)
            },
            "id": data.get("id")
        }, status_code=500)

@app.get("/health")
async def health():
    """Health check endpoint"""
    return {"status": "ok", "service": "mcp-server"}

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    # TODO: Return Prometheus metrics
    return {"message": "Metrics endpoint"}

async def handle_list_tools() -> list[dict]:
    """List available tools"""
    return [
        {
            "name": "get_customer",
            "description": "Fetch customer by ID",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "customer_id": {"type": "string"}
                },
                "required": ["customer_id"]
            }
        }
    ]

async def handle_call_tool(params: dict) -> dict:
    """Execute a tool"""
    tool_name = params.get("name")
    arguments = params.get("arguments", {})
    
    if tool_name == "get_customer":
        return {"success": True, "data": {"id": "cust_123"}}
    else:
        raise HTTPException(status_code=400, detail=f"Unknown tool: {tool_name}")

async def handle_list_resources() -> list[dict]:
    """List available resources"""
    return []

async def handle_read_resource(params: dict) -> str:
    """Read a resource"""
    return "resource content"
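
The `/metrics` TODO above would normally be served by the `prometheus-client` library's `generate_latest()`. To show the underlying idea without the dependency, here is a minimal hand-rolled counter registry that renders the Prometheus text exposition format — a sketch only; use the real client library in production:

```python
from collections import Counter

# Minimal in-process metrics registry (illustrative stand-in for prometheus-client)
tool_calls_total: Counter = Counter()

def record_tool_call(tool_name: str, status: str) -> None:
    """Increment the counter for one (tool, status) label pair."""
    tool_calls_total[(tool_name, status)] += 1

def render_metrics() -> str:
    """Render counters in the Prometheus text exposition format."""
    lines = [
        "# HELP mcp_tool_calls_total Total tool calls handled",
        "# TYPE mcp_tool_calls_total counter",
    ]
    for (tool, status), count in sorted(tool_calls_total.items()):
        lines.append(f'mcp_tool_calls_total{{tool="{tool}",status="{status}"}} {count}')
    return "\n".join(lines) + "\n"

record_tool_call("get_customer", "ok")
record_tool_call("get_customer", "ok")
record_tool_call("create_order", "error")
print(render_metrics())
```

Labeling by tool name and status is what makes per-tool error-rate dashboards and alerts possible later.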

Create entry point (main.py):

import uvicorn
import logging
from src.api.server import app
from src.config import settings

# Setup logging
logging.basicConfig(
    level=settings.LOG_LEVEL,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

if __name__ == "__main__":
    uvicorn.run(
        "src.api.server:app",  # import string is required when workers > 1
        host="0.0.0.0",
        port=settings.PORT,
        workers=settings.WORKERS,
        log_level=settings.LOG_LEVEL.lower()
    )

Configuration Management

Create configuration (src/config/__init__.py):

from pydantic_settings import BaseSettings
from typing import Optional
import os

class Settings(BaseSettings):
    """Application configuration"""
    
    # Server
    APP_NAME: str = "MCP Production Server"
    VERSION: str = "1.0.0"
    PORT: int = 8000
    HOST: str = "0.0.0.0"
    WORKERS: int = 4
    
    # Environment
    ENVIRONMENT: str = "development"  # development, staging, production
    DEBUG: bool = False
    LOG_LEVEL: str = "INFO"
    
    # Database
    DATABASE_URL: str = "postgresql://user:password@localhost/mcp_db"
    DATABASE_POOL_SIZE: int = 20
    DATABASE_MAX_OVERFLOW: int = 10
    DATABASE_POOL_TIMEOUT: int = 30
    
    # Redis
    REDIS_URL: str = "redis://localhost:6379/0"
    REDIS_CACHE_TTL: int = 3600
    
    # Claude API
    CLAUDE_API_KEY: str
    CLAUDE_API_TIMEOUT: int = 60
    
    # Security
    API_KEY: Optional[str] = None
    ALLOWED_ORIGINS: list[str] = ["*"]
    
    # Observability
    PROMETHEUS_ENABLED: bool = True
    JAEGER_ENABLED: bool = False
    JAEGER_AGENT_HOST: str = "localhost"
    JAEGER_AGENT_PORT: int = 6831
    
    # Performance
    REQUEST_TIMEOUT: int = 5
    MAX_CONCURRENT_TOOLS: int = 100
    TOOL_CALL_TIMEOUT: int = 5
    
    # Rate limiting
    RATE_LIMIT_ENABLED: bool = True
    RATE_LIMIT_REQUESTS_PER_MINUTE: int = 1000
    
    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"
        case_sensitive = True

# Load settings
settings = Settings()

Create .env.example:

# Server
APP_NAME="MCP Production Server"
PORT=8000
ENVIRONMENT=development
DEBUG=true
LOG_LEVEL=DEBUG

# Database
DATABASE_URL=postgresql://postgres:password@localhost:5432/mcp_db
DATABASE_POOL_SIZE=20

# Redis
REDIS_URL=redis://localhost:6379/0

# Claude API
CLAUDE_API_KEY=sk-ant-xxx

# Security
API_KEY=your-secret-api-key

# Observability
PROMETHEUS_ENABLED=true

Database Integration

Create database layer (src/database/connection.py):

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from src.config import settings
import logging

logger = logging.getLogger(__name__)

# Database setup
Base = declarative_base()

# Async engine for async/await
engine = create_async_engine(
    settings.DATABASE_URL,
    echo=settings.DEBUG,
    pool_size=settings.DATABASE_POOL_SIZE,
    max_overflow=settings.DATABASE_MAX_OVERFLOW,
    pool_timeout=settings.DATABASE_POOL_TIMEOUT,
    pool_pre_ping=True,  # Test connections before using
)

# Session factory
AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)

async def get_db_session():
    """Get database session (for dependency injection)"""
    async with AsyncSessionLocal() as session:
        yield session

async def init_db():
    """Initialize database (create tables)"""
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    logger.info("Database initialized")

async def close_db():
    """Close database connections"""
    await engine.dispose()
    logger.info("Database connections closed")

Create models (src/database/models.py):

from sqlalchemy import Column, String, DateTime, Enum, Integer, Float, JSON
from sqlalchemy.dialects.postgresql import UUID
from datetime import datetime
import uuid
from src.database.connection import Base

class Customer(Base):
    """Customer model"""
    __tablename__ = "customers"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(255), nullable=False)
    email = Column(String(255), unique=True, nullable=False)
    phone = Column(String(20))
    tier = Column(String(50), default="free")
    status = Column(String(50), default="active")
    extra_data = Column("metadata", JSON, default=dict)  # "metadata" is reserved by SQLAlchemy's Declarative API
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

class Order(Base):
    """Order model"""
    __tablename__ = "orders"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    customer_id = Column(UUID(as_uuid=True), nullable=False)
    status = Column(String(50), default="pending")
    total_amount = Column(Float, nullable=False)
    item_count = Column(Integer, default=0)
    extra_data = Column("metadata", JSON, default=dict)  # "metadata" is reserved by SQLAlchemy's Declarative API
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

class ToolCall(Base):
    """Log of all tool calls (for audit/observability)"""
    __tablename__ = "tool_calls"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    tool_name = Column(String(255), nullable=False)
    arguments = Column(JSON, nullable=False)
    result = Column(JSON)
    error = Column(String(1000))
    duration_ms = Column(Integer)
    claude_session_id = Column(String(255))
    created_at = Column(DateTime, default=datetime.utcnow)
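
Persisting a ToolCall row comes down to timing the handler and recording the outcome. A minimal sketch of the audit wrapper — the returned dict stands in for the ORM insert above, and `session_id` is illustrative:

```python
import time
import uuid
from datetime import datetime, timezone

def audited_call(tool_name: str, arguments: dict, handler, session_id: str) -> dict:
    """Run a tool handler and build an audit record mirroring the ToolCall model."""
    record = {
        "id": str(uuid.uuid4()),
        "tool_name": tool_name,
        "arguments": arguments,
        "result": None,
        "error": None,
        "claude_session_id": session_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    start = time.perf_counter()
    try:
        record["result"] = handler(**arguments)
    except Exception as exc:  # audit failures too; re-raise or swallow per policy
        record["error"] = str(exc)[:1000]  # matches the String(1000) column above
    record["duration_ms"] = int((time.perf_counter() - start) * 1000)
    return record  # in production this would become a ToolCall row in the DB

rec = audited_call("get_customer", {"customer_id": "c1"},
                   lambda customer_id: {"id": customer_id, "tier": "pro"},
                   session_id="sess_demo")
assert rec["error"] is None and rec["duration_ms"] >= 0
```

Capturing duration and errors per call is what later makes the "what was Claude trying to do?" question answerable from the audit table alone.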

Part 3: Advanced MCP Patterns for Production

Error Handling and Resilience

Production MCP servers must handle errors gracefully. Claude enforces strict timeouts, so error paths must fail fast and return structured errors rather than hang.

Create error handler (src/errors/handler.py):

from enum import Enum
from typing import Optional, Any
import logging

logger = logging.getLogger(__name__)

class ErrorCode(Enum):
    """Standard MCP error codes"""
    TOOL_NOT_FOUND = -32601
    INVALID_PARAMS = -32602
    INTERNAL_ERROR = -32603
    SERVER_ERROR = -32000
    TIMEOUT = -32001
    RESOURCE_NOT_FOUND = -32002
    UNAUTHORIZED = -32003

class MCPError(Exception):
    """Base MCP error"""
    def __init__(self, code: ErrorCode, message: str, data: Optional[Any] = None):
        self.code = code
        self.message = message
        self.data = data
        super().__init__(message)
    
    def to_json_rpc(self):
        """Convert to JSON-RPC error format"""
        return {
            "code": self.code.value,
            "message": self.message,
            "data": self.data
        }

class ToolNotFoundError(MCPError):
    def __init__(self, tool_name: str):
        super().__init__(
            ErrorCode.TOOL_NOT_FOUND,
            f"Tool '{tool_name}' not found"
        )

class InvalidParamsError(MCPError):
    def __init__(self, message: str, details: Optional[str] = None):
        super().__init__(
            ErrorCode.INVALID_PARAMS,
            message,
            {"details": details}
        )

class ToolTimeoutError(MCPError):
    """Tool exceeded its execution budget.

    Named ToolTimeoutError so it does not shadow the builtin TimeoutError.
    """
    def __init__(self, tool_name: str, timeout_seconds: int):
        super().__init__(
            ErrorCode.TIMEOUT,
            f"Tool '{tool_name}' exceeded {timeout_seconds}s timeout"
        )

class UnauthorizedError(MCPError):
    def __init__(self, resource: str):
        super().__init__(
            ErrorCode.UNAUTHORIZED,
            f"Unauthorized access to {resource}"
        )

class ErrorHandler:
    """Centralized error handling"""
    
    @staticmethod
    def handle_tool_error(tool_name: str, error: Exception) -> dict:
        """Handle tool execution error"""
        
        if isinstance(error, MCPError):
            logger.warning(f"MCP error in {tool_name}: {error.message}")
            return error.to_json_rpc()
        
        elif isinstance(error, ValueError):
            logger.warning(f"Validation error in {tool_name}: {str(error)}")
            return {
                "code": ErrorCode.INVALID_PARAMS.value,
                "message": f"Validation error: {str(error)}"
            }
        
        elif isinstance(error, TimeoutError):  # builtin; asyncio.TimeoutError is an alias in 3.11+
            logger.error(f"Timeout in {tool_name}")
            return {
                "code": ErrorCode.TIMEOUT.value,
                "message": "Tool execution timeout"
            }
        
        else:
            logger.error(f"Unexpected error in {tool_name}: {str(error)}", exc_info=True)
            # Imported here so the handler has no hard dependency at module import time
            from src.config import settings
            return {
                "code": ErrorCode.INTERNAL_ERROR.value,
                "message": "Internal server error",
                "data": {"tool": tool_name, "error": str(error)} if settings.DEBUG else {"tool": tool_name}
            }
    
    @staticmethod
    def validate_tool_arguments(arguments: dict, schema: dict) -> tuple[bool, Optional[str]]:
        """Validate tool arguments against schema"""
        
        # Check required fields
        required_fields = schema.get("required", [])
        for field in required_fields:
            if field not in arguments:
                return False, f"Missing required field: {field}"
        
        # Check types
        properties = schema.get("properties", {})
        for field, value in arguments.items():
            if field in properties:
                expected_type = properties[field].get("type")
                if expected_type and not ErrorHandler._check_type(value, expected_type):
                    return False, f"Field '{field}' has wrong type"
        
        return True, None
    
    @staticmethod
    def _check_type(value: Any, expected_type: str) -> bool:
        """Check if value matches expected type"""
        type_mapping = {
            "string": str,
            "integer": int,
            "number": (int, float),
            "boolean": bool,
            "array": list,
            "object": dict
        }
        
        expected = type_mapping.get(expected_type)
        if expected is None:
            return True
        # bool is a subclass of int, so reject it for "integer"/"number"
        if expected_type in ("integer", "number") and isinstance(value, bool):
            return False
        return isinstance(value, expected)
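For instance, against a JSON-Schema-style tool schema, the checks above behave like this (a standalone sketch of the same logic; the schema is illustrative):

```python
def validate(arguments: dict, schema: dict):
    """Mirror of validate_tool_arguments: required fields first, then shallow types."""
    for field in schema.get("required", []):
        if field not in arguments:
            return False, f"Missing required field: {field}"
    type_map = {"string": str, "integer": int, "array": list, "object": dict}
    for field, value in arguments.items():
        expected = type_map.get(schema.get("properties", {}).get(field, {}).get("type"))
        if expected and not isinstance(value, expected):
            return False, f"Field '{field}' has wrong type"
    return True, None

schema = {
    "required": ["customer_id"],
    "properties": {"customer_id": {"type": "string"}}
}
print(validate({}, schema))
# → (False, 'Missing required field: customer_id')
print(validate({"customer_id": "cust_1"}, schema))
# → (True, None)
```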

Create middleware for error handling:

from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
import time
import logging
from src.errors.handler import ErrorHandler, MCPError

logger = logging.getLogger(__name__)

class ErrorHandlingMiddleware(BaseHTTPMiddleware):
    """Global error handling middleware"""
    
    async def dispatch(self, request: Request, call_next) -> Response:
        """Process request and handle errors"""
        
        start_time = time.time()
        
        try:
            response = await call_next(request)
            
            # Log successful requests
            duration = time.time() - start_time
            logger.info(
                f"{request.method} {request.url.path} - {response.status_code} ({duration:.2f}s)"
            )
            
            return response
        
        except MCPError as e:
            # Known MCP error
            logger.warning(f"MCP error: {e.message}")
            return JSONResponse(
                {
                    "jsonrpc": "2.0",
                    "error": e.to_json_rpc(),
                    "id": None
                },
                status_code=400
            )
        
        except Exception as e:
            # Unexpected error
            logger.error(f"Unhandled error: {str(e)}", exc_info=True)
            return JSONResponse(
                {
                    "jsonrpc": "2.0",
                    "error": {
                        "code": -32603,
                        "message": "Internal server error"
                    },
                    "id": None
                },
                status_code=500
            )
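The JSON-RPC 2.0 error envelope the middleware returns has a fixed shape; a minimal sketch of how it serializes:

```python
import json

def error_envelope(code: int, message: str, request_id=None) -> dict:
    """JSON-RPC 2.0 error response, the shape the middleware above returns."""
    return {
        "jsonrpc": "2.0",
        "error": {"code": code, "message": message},
        "id": request_id,
    }

print(json.dumps(error_envelope(-32603, "Internal server error")))
# → {"jsonrpc": "2.0", "error": {"code": -32603, "message": "Internal server error"}, "id": null}
```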

Caching for Performance

Claude expects tool calls to return quickly; this guide budgets under 5 seconds per call, so caching frequently accessed data is crucial.

Create cache layer (src/cache/redis.py):

import redis.asyncio as redis
import json
from typing import Any, Optional
import logging
from src.config import settings

logger = logging.getLogger(__name__)

class RedisCache:
    """Redis-based cache for MCP results"""
    
    def __init__(self):
        self.redis_client: Optional[redis.Redis] = None
        self.ttl = settings.REDIS_CACHE_TTL
    
    async def connect(self):
        """Connect to Redis"""
        try:
            # from_url is synchronous: it builds the client and connection pool
            # lazily, so there is nothing to await here
            self.redis_client = redis.from_url(
                settings.REDIS_URL,
                decode_responses=True
            )
            # Test connection
            await self.redis_client.ping()
            logger.info("Connected to Redis")
        except Exception as e:
            logger.error(f"Failed to connect to Redis: {str(e)}")
            self.redis_client = None
    
    async def disconnect(self):
        """Disconnect from Redis"""
        if self.redis_client:
            await self.redis_client.close()
            logger.info("Disconnected from Redis")
    
    async def get(self, key: str) -> Optional[Any]:
        """Get value from cache"""
        if not self.redis_client:
            return None
        
        try:
            value = await self.redis_client.get(key)
            if value:
                logger.debug(f"Cache hit: {key}")
                return json.loads(value)
        except Exception as e:
            logger.error(f"Cache get error: {str(e)}")
        
        return None
    
    async def set(self, key: str, value: Any, ttl: Optional[int] = None):
        """Set value in cache"""
        if not self.redis_client:
            return
        
        try:
            await self.redis_client.setex(
                key,
                ttl or self.ttl,
                json.dumps(value)
            )
            logger.debug(f"Cache set: {key}")
        except Exception as e:
            logger.error(f"Cache set error: {str(e)}")
    
    async def delete(self, key: str):
        """Delete value from cache"""
        if not self.redis_client:
            return
        
        try:
            await self.redis_client.delete(key)
            logger.debug(f"Cache delete: {key}")
        except Exception as e:
            logger.error(f"Cache delete error: {str(e)}")
    
    async def invalidate_pattern(self, pattern: str):
        """Invalidate all keys matching pattern"""
        if not self.redis_client:
            return
        
        try:
            # SCAN instead of KEYS: KEYS blocks Redis on large keyspaces
            keys = [key async for key in self.redis_client.scan_iter(match=pattern)]
            if keys:
                await self.redis_client.delete(*keys)
                logger.debug(f"Cache invalidated {len(keys)} keys matching {pattern}")
        except Exception as e:
            logger.error(f"Cache invalidation error: {str(e)}")

# Global cache instance
cache = RedisCache()

from functools import wraps

class CacheDecorator:
    """Decorator for caching async tool results"""
    
    def __init__(self, ttl: Optional[int] = None):
        self.ttl = ttl
    
    def __call__(self, func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Deterministic cache key from the function name and all arguments
            # (positional args included, so identical kwargs with different
            # positional args do not collide)
            cache_key = (
                f"{func.__name__}:"
                f"{json.dumps([[str(a) for a in args], kwargs], sort_keys=True, default=str)}"
            )
            
            # Try cache
            cached = await cache.get(cache_key)
            if cached is not None:
                return cached
            
            # Execute function
            result = await func(*args, **kwargs)
            
            # Cache result
            await cache.set(cache_key, result, self.ttl)
            
            return result
        
        return wrapper

Request/Response Serialization

Claude sends and receives specific formats. Proper serialization is critical.

Create serialization (src/serialization/__init__.py):

from typing import Any, Dict, List
from datetime import datetime
import json

class MCPSerializer:
    """Serialize/deserialize MCP data"""
    
    @staticmethod
    def serialize_tool_result(result: Any, mime_type: str = "text/plain") -> List[Dict]:
        """
        Serialize tool result to MCP format
        
        MCP requires results in specific format:
        [
            {"type": "text", "text": "..."},
            {"type": "image", "data": "base64_data", "mimeType": "image/png"},
            {"type": "resource", "resource": ...}
        ]
        """
        
        if isinstance(result, str):
            return [{"type": "text", "text": result}]
        
        elif isinstance(result, dict):
            return [{"type": "text", "text": json.dumps(result, indent=2)}]
        
        elif isinstance(result, list):
            return [{"type": "text", "text": json.dumps(result, indent=2)}]
        
        elif isinstance(result, bytes):
            import base64
            return [{
                "type": "image",
                "data": base64.b64encode(result).decode(),
                "mimeType": mime_type
            }]
        
        else:
            return [{"type": "text", "text": str(result)}]
    
    @staticmethod
    def deserialize_tool_arguments(arguments: Dict) -> Dict:
        """Deserialize tool arguments from MCP format"""
        return arguments  # Usually already in correct format
    
    @staticmethod
    def format_error_response(error: str, code: int = -32603) -> Dict:
        """Format error response"""
        return {
            "error": {
                "code": code,
                "message": error
            }
        }
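To make the mapping concrete, here is a standalone version of the string/dict branches and what they produce:

```python
import json

def serialize_tool_result(result):
    """Standalone sketch of the string/dict/fallback branches above."""
    if isinstance(result, str):
        return [{"type": "text", "text": result}]
    if isinstance(result, (dict, list)):
        return [{"type": "text", "text": json.dumps(result, indent=2)}]
    return [{"type": "text", "text": str(result)}]

print(serialize_tool_result("hello"))
# → [{'type': 'text', 'text': 'hello'}]
blocks = serialize_tool_result({"id": "cust_1"})
print(blocks[0]["type"])  # → text
```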

Timeout Management

Claude has strict timeout requirements. Implementing proper timeout handling is essential.

Create timeout handler (src/timeout.py):

import asyncio
from functools import wraps
import logging

logger = logging.getLogger(__name__)

class TimeoutExceededError(Exception):
    """Raised when operation exceeds timeout"""
    pass

def async_timeout(seconds: int):
    """Decorator for async function timeout"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await asyncio.wait_for(
                    func(*args, **kwargs),
                    timeout=seconds
                )
            except asyncio.TimeoutError:
                tool_name = kwargs.get('tool_name') or 'unknown'
                logger.error(f"Tool {tool_name} exceeded {seconds}s timeout")
                raise TimeoutExceededError(
                    f"Tool execution exceeded {seconds}s timeout"
                )
        return wrapper
    return decorator

class TimeoutManager:
    """Manage timeouts for tool execution"""
    
    TOOL_TIMEOUT = 5  # per-call budget; keep tool calls well under the client timeout
    REQUEST_TIMEOUT = 60  # total request timeout
    
    @staticmethod
    async def execute_with_timeout(
        coro,
        timeout_seconds: int = TOOL_TIMEOUT,
        tool_name: str = "unknown"
    ):
        """Execute coroutine with timeout"""
        try:
            return await asyncio.wait_for(coro, timeout=timeout_seconds)
        except asyncio.TimeoutError:
            logger.error(f"Timeout executing {tool_name}")
            raise TimeoutExceededError(
                f"{tool_name} exceeded {timeout_seconds}s timeout"
            )
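The mechanism underneath both helpers is asyncio.wait_for, which cancels the wrapped coroutine when the budget elapses; a minimal demonstration (the 0.1s budget is deliberately tiny so the example runs fast):

```python
import asyncio

async def slow_tool():
    # Simulates a tool that would blow the budget
    await asyncio.sleep(2)
    return "done"

async def main():
    try:
        return await asyncio.wait_for(slow_tool(), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(main()))  # → timed out
```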

4: Testing MCP Servers

Production code requires comprehensive testing. Testing MCP servers has unique challenges due to their async nature and Claude integration.

Unit Testing

Create unit tests (tests/unit/test_tools.py):

import pytest
from unittest.mock import AsyncMock, patch, MagicMock
from src.tools.customer import CustomerTool
from src.errors.handler import InvalidParamsError

class TestCustomerTool:
    """Test customer management tool"""
    
    @pytest.fixture
    def tool(self):
        """Create tool instance (construction needs no await, so the fixture can be sync)"""
        return CustomerTool()
    
    @pytest.mark.asyncio
    async def test_get_customer_success(self, tool):
        """Test successful customer retrieval"""
        
        # Mock database
        with patch.object(tool, 'db', new_callable=AsyncMock) as mock_db:
            mock_db.get_customer.return_value = {
                "id": "cust_123",
                "name": "John Doe",
                "email": "john@example.com"
            }
            
            result = await tool.get_customer("cust_123")
            
            assert result["id"] == "cust_123"
            assert result["name"] == "John Doe"
    
    @pytest.mark.asyncio
    async def test_get_customer_not_found(self, tool):
        """Test customer not found"""
        
        with patch.object(tool, 'db', new_callable=AsyncMock) as mock_db:
            mock_db.get_customer.return_value = None
            
            with pytest.raises(Exception):
                await tool.get_customer("nonexistent")
    
    @pytest.mark.asyncio
    async def test_create_customer_validation(self, tool):
        """Test input validation"""
        
        # Missing required field
        with pytest.raises(InvalidParamsError):
            await tool.create_customer({"name": "John"})  # Missing email
    
    @pytest.mark.asyncio
    async def test_create_customer_success(self, tool):
        """Test successful customer creation"""
        
        with patch.object(tool, 'db', new_callable=AsyncMock) as mock_db:
            mock_db.create_customer.return_value = {
                "id": "cust_new",
                "name": "Jane Doe",
                "email": "jane@example.com"
            }
            
            result = await tool.create_customer({
                "name": "Jane Doe",
                "email": "jane@example.com"
            })
            
            assert result["id"] == "cust_new"

Integration Testing

Create integration tests (tests/integration/test_mcp_flow.py):

import pytest
import httpx
from fastapi.testclient import TestClient
from src.api.server import app

class TestMCPFlow:
    """Test complete MCP interaction flow"""
    
    @pytest.fixture
    def client(self):
        """Create test client"""
        return TestClient(app)
    
    def test_list_tools(self, client):
        """Test listing available tools"""
        
        response = client.post("/mcp", json={
            "jsonrpc": "2.0",
            "method": "tools/list",
            "id": 1
        })
        
        assert response.status_code == 200
        data = response.json()
        assert "result" in data
        assert isinstance(data["result"], list)
        assert len(data["result"]) > 0
        
        # Verify tool structure
        tool = data["result"][0]
        assert "name" in tool
        assert "description" in tool
        assert "inputSchema" in tool
    
    def test_call_tool_success(self, client):
        """Test successful tool call"""
        
        response = client.post("/mcp", json={
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {
                "name": "get_customer",
                "arguments": {"customer_id": "cust_123"}
            },
            "id": 2
        })
        
        assert response.status_code == 200
        data = response.json()
        assert "result" in data
    
    def test_call_unknown_tool(self, client):
        """Test calling unknown tool"""
        
        response = client.post("/mcp", json={
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {
                "name": "unknown_tool",
                "arguments": {}
            },
            "id": 3
        })
        
        assert response.status_code in [200, 400]
        data = response.json()
        assert "error" in data
    
    def test_list_resources(self, client):
        """Test listing resources"""
        
        response = client.post("/mcp", json={
            "jsonrpc": "2.0",
            "method": "resources/list",
            "id": 4
        })
        
        assert response.status_code == 200
        data = response.json()
        assert "result" in data
        assert isinstance(data["result"], list)
    
    def test_read_resource(self, client):
        """Test reading resource"""
        
        response = client.post("/mcp", json={
            "jsonrpc": "2.0",
            "method": "resources/read",
            "params": {"uri": "database://schema/customers"},
            "id": 5
        })
        
        assert response.status_code == 200
        data = response.json()
        assert "result" in data

Load Testing

Create load test (tests/load/locustfile.py):

from locust import HttpUser, task, between
import json
import random

class MCPServerUser(HttpUser):
    """Simulate MCP client load"""
    
    wait_time = between(1, 3)  # Wait 1-3 seconds between requests
    
    @task(3)
    def list_tools(self):
        """List tools (high frequency)"""
        self.client.post("/mcp", json={
            "jsonrpc": "2.0",
            "method": "tools/list",
            "id": 1
        })
    
    @task(2)
    def call_get_customer(self):
        """Call get_customer tool"""
        customer_id = f"cust_{random.randint(1, 1000)}"
        self.client.post("/mcp", json={
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {
                "name": "get_customer",
                "arguments": {"customer_id": customer_id}
            },
            "id": 2
        })
    
    @task(1)
    def call_create_order(self):
        """Call create_order tool"""
        self.client.post("/mcp", json={
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {
                "name": "create_order",
                "arguments": {
                    "customer_id": f"cust_{random.randint(1, 1000)}",
                    "items": [
                        {
                            "product_id": f"prod_{random.randint(1, 100)}",
                            "quantity": random.randint(1, 5)
                        }
                    ]
                }
            },
            "id": 3
        })

# Run with: locust -f tests/load/locustfile.py --host=http://localhost:8000

5: Production Deployment

Deploying MCP servers to production requires careful planning.

Docker Containerization

Create Dockerfile:

# Build stage
FROM python:3.11-slim as builder

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements
COPY requirements.txt .

# Create virtual environment
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
RUN pip install --no-cache-dir -r requirements.txt

# Production stage
FROM python:3.11-slim

WORKDIR /app

# Install runtime dependencies only
RUN apt-get update && apt-get install -y \
    postgresql-client \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy virtual environment from builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Copy application code
COPY . .

# Create non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Run application
CMD ["python", "-m", "uvicorn", "src.api.server:app", "--host", "0.0.0.0", "--port", "8000"]

Create docker-compose.yml for local development:

version: '3.9'

services:
  # MCP Server
  mcp-server:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    environment:
      DATABASE_URL: postgresql://postgres:postgres@db:5432/mcp_db
      REDIS_URL: redis://redis:6379/0
      CLAUDE_API_KEY: ${CLAUDE_API_KEY}
      ENVIRONMENT: development
      DEBUG: "true"
    depends_on:
      - db
      - redis
    volumes:
      - .:/app
    command: uvicorn src.api.server:app --host 0.0.0.0 --reload

  # PostgreSQL Database
  db:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: mcp_db
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  # Redis Cache
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  # Prometheus (metrics)
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus

  # Grafana (visualizations)
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin
    volumes:
      - grafana_data:/var/lib/grafana

volumes:
  postgres_data:
  prometheus_data:
  grafana_data:

Kubernetes Deployment

Create Kubernetes manifests (deploy/kubernetes/mcp-server.yaml):

apiVersion: v1
kind: ConfigMap
metadata:
  name: mcp-config
  namespace: production
data:
  LOG_LEVEL: "INFO"
  ENVIRONMENT: "production"

---
apiVersion: v1
kind: Secret
metadata:
  name: mcp-secrets
  namespace: production
type: Opaque
stringData:
  CLAUDE_API_KEY: "sk-ant-xxx"
  DATABASE_URL: "postgresql://user:pass@db:5432/mcp"
  REDIS_URL: "redis://redis:6379/0"
  API_KEY: "secret-api-key"

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-server
  namespace: production
  labels:
    app: mcp-server
spec:
  replicas: 3  # High availability
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0  # Zero-downtime deployment
  
  selector:
    matchLabels:
      app: mcp-server
  
  template:
    metadata:
      labels:
        app: mcp-server
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
        prometheus.io/path: "/metrics"
    
    spec:
      # Anti-affinity for high availability
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: app
                operator: In
                values:
                - mcp-server
            topologyKey: kubernetes.io/hostname
      
      containers:
      - name: mcp-server
        image: mcp-server:v1.0.0
        imagePullPolicy: IfNotPresent
        
        ports:
        - name: http
          containerPort: 8000
          protocol: TCP
        - name: metrics
          containerPort: 9090
          protocol: TCP
        
        # Environment from ConfigMap and Secrets
        envFrom:
        - configMapRef:
            name: mcp-config
        - secretRef:
            name: mcp-secrets
        
        # Resource requests (for scheduling)
        resources:
          requests:
            cpu: "500m"
            memory: "512Mi"
          limits:
            cpu: "1000m"
            memory: "1Gi"
        
        # Liveness probe (restart if unhealthy)
        livenessProbe:
          httpGet:
            path: /health
            port: http
          initialDelaySeconds: 30
          periodSeconds: 10
          timeoutSeconds: 5
          failureThreshold: 3
        
        # Readiness probe (remove from load balancer if not ready)
        readinessProbe:
          httpGet:
            path: /health
            port: http
          initialDelaySeconds: 10
          periodSeconds: 5
          timeoutSeconds: 3
          failureThreshold: 2
        
        # Startup probe (for slow-starting apps)
        startupProbe:
          httpGet:
            path: /health
            port: http
          initialDelaySeconds: 0
          periodSeconds: 5
          failureThreshold: 30
        
        # Graceful shutdown: give the load balancer time to drain connections
        lifecycle:
          preStop:
            exec:
              command: ["/bin/sh", "-c", "sleep 15"]
      
      # Pod-level setting (sibling of containers, not a container field)
      terminationGracePeriodSeconds: 30

---
apiVersion: v1
kind: Service
metadata:
  name: mcp-server
  namespace: production
  labels:
    app: mcp-server
spec:
  type: ClusterIP
  selector:
    app: mcp-server
  ports:
  - name: http
    port: 80
    targetPort: 8000
  - name: metrics
    port: 9090
    targetPort: 9090

---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: mcp-server-pdb
  namespace: production
spec:
  minAvailable: 1
  selector:
    matchLabels:
      app: mcp-server

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: mcp-server-hpa
  namespace: production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-server
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 0
      policies:
      - type: Percent
        value: 100
        periodSeconds: 30
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60

6: Observability and Monitoring

Production MCP servers need comprehensive observability.

Metrics and Prometheus

Create metrics exporter (src/metrics.py):

from prometheus_client import Counter, Histogram, Gauge
import time

# Request metrics
tool_calls_total = Counter(
    'mcp_tool_calls_total',
    'Total tool calls',
    ['tool_name', 'status']
)

tool_call_duration_seconds = Histogram(
    'mcp_tool_call_duration_seconds',
    'Tool call duration in seconds',
    ['tool_name'],
    buckets=(0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0)
)

cache_hits = Counter(
    'mcp_cache_hits_total',
    'Total cache hits',
    ['cache_type']
)

cache_misses = Counter(
    'mcp_cache_misses_total',
    'Total cache misses',
    ['cache_type']
)

# Error metrics
tool_errors = Counter(
    'mcp_tool_errors_total',
    'Total tool errors',
    ['tool_name', 'error_type']
)

timeouts = Counter(
    'mcp_timeouts_total',
    'Total tool timeouts',
    ['tool_name']
)

# Database metrics
db_connections_active = Gauge(
    'mcp_db_connections_active',
    'Active database connections'
)

db_query_duration_seconds = Histogram(
    'mcp_db_query_duration_seconds',
    'Database query duration',
    ['operation'],
    buckets=(0.001, 0.01, 0.05, 0.1, 0.5, 1.0)
)

# Queue metrics
redis_operations = Counter(
    'mcp_redis_operations_total',
    'Total Redis operations',
    ['operation', 'status']
)

redis_latency_seconds = Histogram(
    'mcp_redis_latency_seconds',
    'Redis operation latency',
    ['operation'],
    buckets=(0.001, 0.005, 0.01, 0.05, 0.1)
)

# HTTP-level request duration (the metric name here is ours; adjust to your conventions)
http_request_duration_seconds = Histogram(
    'mcp_http_request_duration_seconds',
    'HTTP request duration in seconds',
    ['method', 'status'],
    buckets=(0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0)
)

class MetricsMiddleware:
    """ASGI middleware to collect per-request metrics"""
    
    def __init__(self, app):
        self.app = app
    
    async def __call__(self, scope, receive, send):
        """Record duration and status for each HTTP request"""
        
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        
        start_time = time.time()
        
        # Wrap send to observe the response status when headers are sent
        async def send_with_metrics(message):
            if message["type"] == "http.response.start":
                duration = time.time() - start_time
                http_request_duration_seconds.labels(
                    method=scope["method"],
                    status=str(message["status"])
                ).observe(duration)
            
            await send(message)
        
        await self.app(scope, receive, send_with_metrics)

# Endpoint to expose metrics
from fastapi import APIRouter, Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST

router = APIRouter()

@router.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

Prometheus configuration (prometheus.yml):

global:
  scrape_interval: 15s
  evaluation_interval: 15s
  external_labels:
    service: 'mcp-server'

scrape_configs:
  - job_name: 'mcp-server'
    static_configs:
      - targets: ['localhost:8000']
    metrics_path: '/metrics'

rule_files:
  - 'alert-rules.yml'

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['localhost:9093']

Alert rules (alert-rules.yml):

groups:
  - name: mcp-alerts
    interval: 30s
    rules:
      
      # High error rate
      - alert: HighToolErrorRate
        expr: |
          (
            sum(rate(mcp_tool_errors_total[5m])) by (tool_name)
            /
            sum(rate(mcp_tool_calls_total[5m])) by (tool_name)
          ) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Tool {{ $labels.tool_name }} has high error rate"
          
      # Tool timeout
      - alert: ToolTimeout
        expr: rate(mcp_timeouts_total[5m]) > 0
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Tool {{ $labels.tool_name }} is timing out"
      
      # Slow tool (aggregate by le and tool_name so the label survives)
      - alert: SlowTool
        expr: |
          histogram_quantile(0.95, sum(rate(mcp_tool_call_duration_seconds_bucket[5m])) by (le, tool_name)) > 2.0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Tool {{ $labels.tool_name }} p95 latency > 2s"
      
      # Cache miss rate
      - alert: HighCacheMissRate
        expr: |
          (
            sum(rate(mcp_cache_misses_total[5m]))
            /
            (sum(rate(mcp_cache_hits_total[5m])) + sum(rate(mcp_cache_misses_total[5m])))
          ) > 0.8
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Cache miss rate > 80%"
      
      # Database connection issues
      - alert: DatabaseConnectionPoolAlmostFull
        expr: mcp_db_connections_active > 18  # out of a 20-connection pool
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Database connection pool almost full"
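The HighToolErrorRate expression is simply errors divided by calls over the 5-minute window; numerically (the per-second rates below are illustrative):

```python
# 5-minute rates in requests/sec, illustrative numbers:
error_rate = 0.4   # rate(mcp_tool_errors_total[5m])
call_rate = 6.0    # rate(mcp_tool_calls_total[5m])

ratio = error_rate / call_rate
print(f"{ratio:.3f}")  # → 0.067
print(ratio > 0.05)    # → True: the alert fires once this holds for 5m
```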

Logging and Tracing

Structured logging (src/logging_config.py):

import logging
import json
import sys
from pythonjsonlogger import jsonlogger
from src.config import settings

def setup_logging():
    """Configure structured JSON logging"""
    
    # Root logger
    logger = logging.getLogger()
    logger.setLevel(settings.LOG_LEVEL)
    
    # JSON formatter
    handler = logging.StreamHandler(sys.stdout)
    formatter = jsonlogger.JsonFormatter(
        # Standard fields only; values passed via `extra=` (request_id,
        # tool_name, duration_ms, ...) are merged into the JSON automatically
        fmt='%(timestamp)s %(levelname)s %(name)s %(message)s',
        timestamp=True
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    
    return logger

# Create logger
logger = setup_logging()

class StructuredLogger:
    """Wrapper for structured logging"""
    
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
    
    def tool_call(self, tool_name: str, arguments: dict, duration_ms: float, error: str = None):
        """Log tool call (errors at ERROR level, successes at INFO)"""
        log = self.logger.error if error else self.logger.info
        log(
            "Tool called",
            extra={
                "tool_name": tool_name,
                "arguments": json.dumps(arguments),
                "duration_ms": duration_ms,
                "error": error
            }
        )
    
    def database_query(self, operation: str, duration_ms: float, rows_affected: int = None):
        """Log database operation"""
        self.logger.debug(
            "Database query",
            extra={
                "operation": operation,
                "duration_ms": duration_ms,
                "rows_affected": rows_affected
            }
        )
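To see the resulting log shape without the python-json-logger dependency, here is a stdlib-only stand-in that merges the same extras (field names are our choice):

```python
import io
import json
import logging

class SimpleJsonFormatter(logging.Formatter):
    """Stdlib-only stand-in for jsonlogger.JsonFormatter."""
    def format(self, record):
        payload = {
            "level": record.levelname,
            "name": record.name,
            "message": record.getMessage(),
        }
        # Merge selected extras, as python-json-logger does automatically
        for key in ("request_id", "tool_name", "duration_ms"):
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        return json.dumps(payload)

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(SimpleJsonFormatter())
log = logging.getLogger("mcp-demo")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.propagate = False

log.info("Tool called", extra={"tool_name": "get_customer", "duration_ms": 12.5})
print(stream.getvalue().strip())
# → {"level": "INFO", "name": "mcp-demo", "message": "Tool called", "tool_name": "get_customer", "duration_ms": 12.5}
```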

7: Security Hardening

MCP servers handle sensitive operations. Security is paramount.

Authentication and Authorization

Create auth layer (src/security/auth.py):

from fastapi import HTTPException, Depends, Header
from typing import Optional
import logging
from src.config import settings

logger = logging.getLogger(__name__)

class AuthManager:
    """Handle API authentication and authorization"""
    
    @staticmethod
    def verify_api_key(x_api_key: Optional[str] = Header(None)) -> str:
        """Verify API key from header"""
        
        if not settings.API_KEY:
            # API key not configured, skip validation
            return "anonymous"
        
        if not x_api_key:
            logger.warning("Missing API key")
            raise HTTPException(status_code=401, detail="Missing API key")
        
        # Constant-time comparison prevents timing attacks; never log key material
        import hmac
        if not hmac.compare_digest(x_api_key, settings.API_KEY):
            logger.warning("Invalid API key presented")
            raise HTTPException(status_code=403, detail="Invalid API key")
        
        return "authenticated"
    
    @staticmethod
    def verify_claude_origin(origin: Optional[str] = Header(None)) -> bool:
        """Verify request comes from Claude"""
        
        # TODO: Implement proper Claude request verification
        # This would involve verifying JWT tokens or signatures
        return True
    
    @staticmethod
    def require_auth():
        """Dependency for requiring authentication"""
        def verify(auth_status: str = Depends(AuthManager.verify_api_key)):
            if auth_status == "anonymous":
                raise HTTPException(status_code=401, detail="Authentication required")
        return verify

class ToolAuthorization:
    """Authorization for specific tools"""
    
    TOOL_PERMISSIONS = {
        "get_customer": ["read:customers"],
        "create_order": ["write:orders"],
        "delete_order": ["admin", "delete:orders"],
        "create_customer": ["admin", "write:customers"],
    }
    
    @staticmethod
    def can_access_tool(tool_name: str, user_roles: list) -> bool:
        """Check if user can access tool"""
        
        required_roles = ToolAuthorization.TOOL_PERMISSIONS.get(tool_name, [])
        
        # If no restrictions, allow access
        if not required_roles:
            return True
        
        # Check if user has any required role
        return any(role in user_roles for role in required_roles)
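To make the permission table's behavior concrete, here is a standalone sketch of how `can_access_tool` resolves access: a tool with no entry is unrestricted, and any single matching role grants access. The roles shown are illustrative, not a fixed scheme.

```python
# Standalone copy of the role-check logic in ToolAuthorization above
TOOL_PERMISSIONS = {
    "get_customer": ["read:customers"],
    "delete_order": ["admin", "delete:orders"],
}

def can_access_tool(tool_name: str, user_roles: list) -> bool:
    """Allow access when the tool is unrestricted or roles overlap."""
    required_roles = TOOL_PERMISSIONS.get(tool_name, [])
    if not required_roles:
        return True  # no restrictions registered for this tool
    return any(role in user_roles for role in required_roles)
```

Note that `delete_order` is accessible to either an `admin` or a holder of `delete:orders`; the list is an any-of check, not all-of.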

Input Validation and Sanitization

Create validator (src/security/validation.py):

from pydantic import BaseModel, field_validator
import re

class CustomerToolInput(BaseModel):
    """Validated input for customer tool"""
    customer_id: str
    
    @field_validator('customer_id')
    @classmethod
    def validate_customer_id(cls, v):
        """Validate customer ID format"""
        if not re.match(r'^cust_[a-z0-9]{20}$', v):
            raise ValueError('Invalid customer ID format')
        return v

class OrderToolInput(BaseModel):
    """Validated input for order tool"""
    customer_id: str
    items: list
    
    @field_validator('customer_id')
    @classmethod
    def validate_customer_id(cls, v):
        if not re.match(r'^cust_[a-z0-9]{20}$', v):
            raise ValueError('Invalid customer ID format')
        return v
    
    @field_validator('items')
    @classmethod
    def validate_items(cls, v):
        if not isinstance(v, list) or len(v) == 0:
            raise ValueError('Items list cannot be empty')
        
        for item in v:
            if not isinstance(item, dict):
                raise ValueError('Each item must be an object')
            if 'product_id' not in item or 'quantity' not in item:
                raise ValueError('Item missing required fields: product_id, quantity')
            if not isinstance(item['quantity'], int) or item['quantity'] < 1:
                raise ValueError('Quantity must be a positive integer')
        
        return v

class Sanitizer:
    """Sanitize tool arguments"""
    
    @staticmethod
    def sanitize_string(value: str, max_length: int = 1000) -> str:
        """Remove potentially harmful characters"""
        
        # Truncate if too long
        value = value[:max_length]
        
        # Remove null bytes
        value = value.replace('\x00', '')
        
        # Remove remaining control characters (note: this also strips newlines and tabs)
        value = ''.join(ch for ch in value if ord(ch) >= 32)
        
        return value
    
    @staticmethod
    def sanitize_sql(value: str) -> str:
        """Additional SQL injection protection"""
        
        # This is basic; use parameterized queries / the ORM for real protection.
        # Match whole words only, since substring checks false-positive on
        # values like "updated_at"
        dangerous_keywords = ['DROP', 'DELETE', 'TRUNCATE', 'INSERT', 'UPDATE']
        
        for keyword in dangerous_keywords:
            if re.search(rf'\b{keyword}\b', value, re.IGNORECASE):
                raise ValueError(f"Potentially dangerous SQL: {keyword}")
        
        return value
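To make the accepted ID shape concrete, the `customer_id` pattern enforced by the validators above matches `cust_` followed by exactly 20 lowercase alphanumerics:

```python
import re

# Same format check used by CustomerToolInput / OrderToolInput above
CUSTOMER_ID_RE = re.compile(r'^cust_[a-z0-9]{20}$')

def is_valid_customer_id(value: str) -> bool:
    """Return True when the value matches the expected customer ID format."""
    return bool(CUSTOMER_ID_RE.match(value))
```

Anything shorter, longer, or containing uppercase characters is rejected before the tool handler ever runs.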

Rate Limiting

Create rate limiter (src/security/rate_limit.py):

from fastapi import HTTPException
from redis.asyncio import Redis
import time
from src.config import settings

class RateLimiter:
    """Rate limiting for MCP server"""
    
    def __init__(self, redis_client: Redis):
        self.redis = redis_client
        self.requests_per_minute = settings.RATE_LIMIT_REQUESTS_PER_MINUTE
    
    async def check_rate_limit(self, client_id: str) -> bool:
        """Check if client has exceeded rate limit"""
        
        if not settings.RATE_LIMIT_ENABLED:
            return True
        
        key = f"rate_limit:{client_id}"
        current = await self.redis.incr(key)
        
        # Set expiration on first request
        if current == 1:
            await self.redis.expire(key, 60)
        
        if current > self.requests_per_minute:
            raise HTTPException(
                status_code=429,
                detail=f"Rate limit exceeded: {self.requests_per_minute}/minute"
            )
        
        return True
    
    async def get_remaining(self, client_id: str) -> int:
        """Get remaining requests for client"""
        
        key = f"rate_limit:{client_id}"
        current = await self.redis.get(key)
        current_count = int(current) if current else 0
        
        return max(0, self.requests_per_minute - current_count)
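The Redis limiter above implements a fixed window: `INCR` plus a 60-second expiry. A single-process stand-in with the same semantics is handy for local tests without Redis; note that fixed windows permit short bursts across a window boundary (a sliding window avoids this at extra cost). This is a sketch, not the production path.

```python
import time

class LocalRateLimiter:
    """In-memory, single-process stand-in for the Redis fixed-window limiter."""

    def __init__(self, requests_per_minute: int = 60):
        self.requests_per_minute = requests_per_minute
        self._windows = {}  # client_id -> (window_start, count)

    def allow(self, client_id: str, now: float = None) -> bool:
        """Record one request and report whether it is within the limit."""
        now = time.time() if now is None else now
        start, count = self._windows.get(client_id, (now, 0))
        if now - start >= 60:
            start, count = now, 0  # window expired, start a fresh one
        count += 1
        self._windows[client_id] = (start, count)
        return count <= self.requests_per_minute
```

The `now` parameter exists only to make the window behavior testable deterministically.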

8: Advanced Production Patterns

Circuit Breaking and Fallbacks

Create circuit breaker (src/resilience/circuit_breaker.py):

from enum import Enum
import asyncio
import time
from typing import Callable, Any

class CircuitState(Enum):
    CLOSED = "closed"  # Normal operation
    OPEN = "open"  # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitBreaker:
    """Circuit breaker for external service calls"""
    
    def __init__(
        self,
        failure_threshold: int = 5,
        timeout_seconds: int = 60,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self.success_threshold = success_threshold
        
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
    
    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with circuit breaker protection"""
        
        if self.state == CircuitState.OPEN:
            # Check if timeout expired
            if time.time() - self.last_failure_time > self.timeout_seconds:
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = await func(*args, **kwargs)
            
            # Success
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.success_threshold:
                    # Circuit recovered
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
            else:
                self.failure_count = 0
            
            return result
        
        except Exception:
            # Failure
            self.failure_count += 1
            self.last_failure_time = time.time()
            
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            
            raise
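To see the state machine end to end, here is a condensed, synchronous sketch of the same transitions (closed, then open after N failures, then a half-open probe after the timeout). Thresholds are shortened purely for illustration; the async class above is the production shape.

```python
import time

class MiniBreaker:
    """Condensed, synchronous sketch of the CircuitBreaker state machine."""

    def __init__(self, failure_threshold: int = 2, timeout_seconds: float = 0.05):
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self.failure_count = 0
        self.opened_at = None  # None means closed (or a half-open probe is allowed)

    def call(self, func):
        if self.opened_at is not None:
            if time.time() - self.opened_at < self.timeout_seconds:
                raise RuntimeError("circuit open")
            self.opened_at = None  # timeout expired: half-open, allow one probe
        try:
            result = func()
        except Exception:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.opened_at = time.time()  # trip the breaker
            raise
        self.failure_count = 0  # success closes the circuit
        return result
```

Two failures trip the breaker, subsequent calls fail fast with "circuit open", and the first successful probe after the timeout closes it again.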

Retry Logic with Exponential Backoff

Create retry handler (src/resilience/retry.py):

import asyncio
import logging
from typing import Callable, Any
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_log,
    after_log
)

logger = logging.getLogger(__name__)

def async_retry_with_backoff(
    max_attempts: int = 3,
    base_wait: float = 0.1,
    max_wait: float = 10.0,
    exceptions: tuple = (Exception,)
):
    """Decorator for async retry with exponential backoff"""
    
    return retry(
        stop=stop_after_attempt(max_attempts),
        wait=wait_exponential(multiplier=1, min=base_wait, max=max_wait),
        retry=retry_if_exception_type(exceptions),
        before=before_log(logger, logging.DEBUG),
        after=after_log(logger, logging.DEBUG),
        reraise=True
    )

class RetryManager:
    """Manage retries for tool calls"""
    
    @staticmethod
    async def execute_with_retry(
        func: Callable,
        *args,
        max_attempts: int = 3,
        **kwargs
    ) -> Any:
        """Execute function with retries"""
        
        last_exception = None
        
        for attempt in range(max_attempts):
            try:
                return await func(*args, **kwargs)
            
            except Exception as e:
                last_exception = e
                
                if attempt < max_attempts - 1:
                    # Exponential backoff
                    wait_time = 2 ** attempt
                    logger.warning(
                        f"Attempt {attempt + 1} failed, retrying in {wait_time}s: {str(e)}"
                    )
                    await asyncio.sleep(wait_time)
                else:
                    logger.error(f"All {max_attempts} attempts failed: {str(e)}")
        
        raise last_exception
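The retry loop can be exercised end to end against a coroutine that fails twice before succeeding. This sketch uses the same loop shape as `RetryManager`, with the backoff scaled down so the demo runs quickly:

```python
import asyncio

async def execute_with_retry(func, *args, max_attempts: int = 3, **kwargs):
    """Same retry loop as RetryManager above, with a scaled-down demo backoff."""
    last_exception = None
    for attempt in range(max_attempts):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            last_exception = e
            if attempt < max_attempts - 1:
                # 2 ** attempt backoff, scaled by 0.01 for the demo
                await asyncio.sleep(0.01 * (2 ** attempt))
    raise last_exception

calls = {"count": 0}

async def flaky_fetch():
    """Simulated flaky dependency: fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

result = asyncio.run(execute_with_retry(flaky_fetch))
```

Two transient failures are absorbed and the third attempt returns normally; a permanent failure would re-raise after the final attempt.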

Request Context and Tracing

Create request context (src/context.py):

import contextvars
import uuid
from typing import Optional

# Context variables
request_id: contextvars.ContextVar[str] = contextvars.ContextVar('request_id')
user_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar('user_id', default=None)
claude_session_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar('claude_session_id', default=None)

class RequestContext:
    """Manage request context"""
    
    @staticmethod
    def get_request_id() -> str:
        """Get current request ID"""
        return request_id.get()
    
    @staticmethod
    def set_request_id(req_id: Optional[str] = None):
        """Set request ID"""
        req_id = req_id or str(uuid.uuid4())
        request_id.set(req_id)
        return req_id
    
    @staticmethod
    def set_claude_session(session_id: str):
        """Set Claude session ID"""
        claude_session_id.set(session_id)
    
    @staticmethod
    def get_all() -> dict:
        """Get all context variables"""
        return {
            "request_id": request_id.get(),
            "user_id": user_id.get(),
            "claude_session_id": claude_session_id.get()
        }

# Middleware to inject context
from fastapi import Request  # uuid is already imported above

async def context_middleware(request: Request, call_next):
    """Inject request context"""
    
    # Generate or get request ID
    req_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
    RequestContext.set_request_id(req_id)
    
    # Extract Claude session if available
    claude_session = request.headers.get("X-Claude-Session-ID")
    if claude_session:
        RequestContext.set_claude_session(claude_session)
    
    response = await call_next(request)
    
    # Add request ID to response
    response.headers["X-Request-ID"] = req_id
    
    return response
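One way to get this context into your logs (a sketch, with a standalone copy of the `request_id` variable) is a `logging.Filter` that stamps every record, so a formatter can include the ID without each call site passing it explicitly:

```python
import contextvars
import logging

# Standalone copy of the context variable from src/context.py
request_id = contextvars.ContextVar("request_id", default="-")

class RequestIdFilter(logging.Filter):
    """Stamp every log record with the current request ID."""
    def filter(self, record):
        record.request_id = request_id.get()
        return True

handler = logging.StreamHandler()
handler.addFilter(RequestIdFilter())
handler.setFormatter(logging.Formatter("%(request_id)s %(levelname)s %(message)s"))

demo_logger = logging.getLogger("mcp.demo")
demo_logger.addHandler(handler)
demo_logger.setLevel(logging.INFO)

request_id.set("req-123")
demo_logger.info("tool call started")  # line is prefixed with req-123
```

Because `ContextVar` values are task-local, concurrent requests each see their own ID in the emitted log lines.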

9: Disaster Recovery and Backups

Production systems must have backup and recovery strategies.

Database Backups

Create backup script (scripts/backup.py):

#!/usr/bin/env python3
"""
Database backup script
Run daily via cron: 0 2 * * * /path/to/backup.py
"""

import asyncio
import subprocess
import datetime
import logging
import os
import boto3
from pathlib import Path
from src.config import settings

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class BackupManager:
    """Manage database backups"""
    
    def __init__(self):
        self.s3 = boto3.client('s3')
        self.backup_bucket = "mcp-backups"
    
    async def backup_database(self) -> str:
        """Create database backup"""
        
        # Parse database URL
        from urllib.parse import urlparse
        parsed = urlparse(settings.DATABASE_URL)
        
        host = parsed.hostname
        port = parsed.port or 5432
        db_name = parsed.path.lstrip('/')
        user = parsed.username
        password = parsed.password
        
        # Timestamp for backup filename (strftime avoids the ':' characters
        # that isoformat() would put in a filename)
        timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        filename = f"backup-{timestamp}.sql"
        filepath = Path("/tmp") / filename
        
        try:
            # Run pg_dump
            cmd = [
                "pg_dump",
                f"--host={host}",
                f"--port={port}",
                f"--username={user}",
                f"--file={filepath}",
                db_name
            ]
            
            # Merge with the existing environment so pg_dump can still resolve PATH
            env = {**os.environ, "PGPASSWORD": password}
            
            result = subprocess.run(
                cmd,
                capture_output=True,
                env=env,
                timeout=3600
            )
            
            if result.returncode != 0:
                raise Exception(f"pg_dump failed: {result.stderr.decode()}")
            
            logger.info(f"Database backup created: {filepath}")
            
            # Upload to S3
            self._upload_to_s3(filepath, filename)
            
            # Cleanup local backup
            filepath.unlink()
            
            logger.info(f"Backup uploaded to S3: s3://{self.backup_bucket}/postgres/{filename}")
            
            return filename
        
        except Exception as e:
            logger.error(f"Backup failed: {str(e)}")
            raise
    
    def _upload_to_s3(self, filepath: Path, filename: str):
        """Upload backup to S3"""
        
        self.s3.upload_file(
            str(filepath),
            self.backup_bucket,
            f"postgres/{filename}"
        )
    
    async def restore_from_backup(self, backup_filename: str):
        """Restore database from backup"""
        
        logger.info(f"Restoring from backup: {backup_filename}")
        
        # Download from S3
        filepath = Path("/tmp") / backup_filename
        
        self.s3.download_file(
            self.backup_bucket,
            f"postgres/{backup_filename}",
            str(filepath)
        )
        
        # Parse database URL
        from urllib.parse import urlparse
        parsed = urlparse(settings.DATABASE_URL)
        
        host = parsed.hostname
        port = parsed.port or 5432
        db_name = parsed.path.lstrip('/')
        user = parsed.username
        password = parsed.password
        
        try:
            # Drop and recreate the database; connect to the maintenance
            # database, since the target may not exist yet
            cmd_drop = [
                "psql",
                f"--host={host}",
                f"--port={port}",
                f"--username={user}",
                "--dbname=postgres",
                f"--command=DROP DATABASE IF EXISTS {db_name};"
            ]
            
            env = {**os.environ, "PGPASSWORD": password}
            subprocess.run(cmd_drop, capture_output=True, env=env)
            
            cmd_create = [
                "psql",
                f"--host={host}",
                f"--port={port}",
                f"--username={user}",
                "--dbname=postgres",
                f"--command=CREATE DATABASE {db_name};"
            ]
            
            subprocess.run(cmd_create, capture_output=True, env=env)
            
            # Restore
            cmd_restore = [
                "psql",
                f"--host={host}",
                f"--port={port}",
                f"--username={user}",
                f"--file={filepath}",
                db_name
            ]
            
            result = subprocess.run(
                cmd_restore,
                capture_output=True,
                env=env,
                timeout=3600
            )
            
            if result.returncode != 0:
                raise Exception(f"psql restore failed: {result.stderr.decode()}")
            
            logger.info("Database restored successfully")
            
            # Cleanup
            filepath.unlink()
        
        except Exception as e:
            logger.error(f"Restore failed: {str(e)}")
            raise

async def main():
    """Main backup routine"""
    
    manager = BackupManager()
    await manager.backup_database()

if __name__ == "__main__":
    asyncio.run(main())

Disaster Recovery Plan

Create recovery runbook (docs/DISASTER_RECOVERY.md):

# Disaster Recovery Plan

## Recovery Time Objective (RTO): 1 hour
## Recovery Point Objective (RPO): 1 day

### Scenarios

#### Scenario 1: Service Crashed
**Detection**: Health checks fail, Kubernetes restarts pod
**MTTR**: <1 minute (automatic restart)
**Recovery**: Kubernetes handles automatically

#### Scenario 2: Database Corruption
**Detection**: Query errors, data integrity checks fail
**MTTR**: <30 minutes
**Recovery**:
1. Alert fires, on-call engineer notified
2. Stop all MCP server connections
3. Restore database from most recent clean backup
4. Run integrity checks
5. Resume MCP server

#### Scenario 3: Complete Data Loss
**Detection**: Database completely unavailable
**MTTR**: 1-4 hours
**Recovery**:
1. Provision new database instance
2. Restore from S3 backup
3. Perform verification
4. Redirect MCP servers to new database

#### Scenario 4: Corrupted Redis Cache
**Detection**: Cache operations failing
**MTTR**: <5 minutes
**Recovery**:
1. Flush Redis with `FLUSHDB` (the data is a cache, not authoritative)
2. Cache rebuilds automatically on the first requests

### Regular Testing
- Monthly backup restore test
- Quarterly full disaster recovery drill
- Runbook update after each incident

### Backup Schedule
- Database: Daily at 2 AM UTC
- Configuration: On each deployment
- Retention: 30 days
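The 30-day retention policy above needs something to enforce it. A hypothetical helper, assuming backup object names carry a `YYYYMMDD` stamp (e.g. `backup-20250101-020000.sql`; the exact naming is an assumption, not part of the article's script), could select expired keys for deletion:

```python
import datetime

def expired_backups(filenames, retention_days=30, today=None):
    """Return backup filenames older than the retention window.
    Assumes names like 'backup-YYYYMMDD-HHMMSS.sql' (hypothetical format)."""
    today = today or datetime.date.today()
    cutoff = today - datetime.timedelta(days=retention_days)
    expired = []
    for name in filenames:
        date_part = name.removeprefix("backup-").split("-")[0]
        backup_date = datetime.datetime.strptime(date_part, "%Y%m%d").date()
        if backup_date < cutoff:
            expired.append(name)
    return expired
```

Wired into the nightly backup job, the returned keys would be passed to `s3.delete_object` after a successful new backup, so retention never shrinks below the most recent good copy.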

10: Performance Optimization

Making MCP servers fast is essential for Claude integration.

Query Optimization

Create query optimizer (src/database/optimizer.py):

from sqlalchemy import select
from sqlalchemy.orm import joinedload
import logging
from src.database.models import Customer, Order

logger = logging.getLogger(__name__)

class QueryOptimizer:
    """Database query optimization patterns"""
    
    @staticmethod
    def get_customer_with_orders(db_session, customer_id: str):
        """
        โŒ BAD: N+1 query problem
        - First query: SELECT * FROM customers WHERE id = ?
        - Second query (for each customer): SELECT * FROM orders WHERE customer_id = ?
        """
        
        customer = db_session.query(Customer).filter_by(id=customer_id).first()
        # Lazy-loading customer.orders triggers a second query
        orders = customer.orders  # N+1 query!
        return customer, orders
    
    @staticmethod
    def get_customer_with_orders_optimized(db_session, customer_id: str):
        """
        โœ… GOOD: Single query with join
        """
        
        query = (
            select(Customer)
            .where(Customer.id == customer_id)
            .options(
                # Eager load orders
                joinedload(Customer.orders)
            )
        )
        
        customer = db_session.execute(query).scalar_one_or_none()
        # customer.orders is already loaded; no additional query
        return customer
    
    @staticmethod
    def get_customers_batch(db_session, customer_ids: list):
        """
        โœ… GOOD: Batch query instead of loop
        """
        
        query = select(Customer).where(Customer.id.in_(customer_ids))
        customers = db_session.execute(query).scalars().all()
        # Single query for all customers
        return customers

# Database indexes
DATABASE_INDEXES = """
-- Essential indexes for performance

-- customers(id) is the primary key and already indexed
CREATE INDEX idx_orders_customer_id ON orders(customer_id);
CREATE INDEX idx_orders_status ON orders(status);
CREATE INDEX idx_tool_calls_tool_name ON tool_calls(tool_name);
CREATE INDEX idx_tool_calls_created_at ON tool_calls(created_at);

-- Composite indexes for common queries
CREATE INDEX idx_orders_customer_status ON orders(customer_id, status);
CREATE INDEX idx_tool_calls_tool_created ON tool_calls(tool_name, created_at);
"""

Connection Pooling

Connection pool configuration:

# src/database/connection.py (updated)

# Optimal pool sizing
"""
Pool size formula: (number of workers ร— 2) + spare connections

For typical 4-worker setup:
- Pool size: (4 ร— 2) + 5 = 13
- Max overflow: 5
- Total possible: 18 connections
"""

engine = create_async_engine(
    settings.DATABASE_URL,
    
    # Pool configuration
    pool_size=13,  # Base pool size
    max_overflow=5,  # Additional overflow connections
    pool_timeout=30,  # Timeout waiting for connection
    pool_pre_ping=True,  # Test connections before using
    pool_recycle=3600,  # Recycle connections every hour
    
    # Performance tuning
    connect_args={
        "timeout": 10,
        "command_timeout": 30,
        "server_settings": {
            "application_name": "mcp_server",
            "jit": "off"  # Avoid JIT compilation overhead on short OLTP queries
        }
    }
)
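The sizing formula in the comment above can be made explicit with a small helper. The formula is a heuristic, not a hard rule; measure under your own load before committing to numbers.

```python
def pool_size_for(workers: int, spare: int = 5) -> int:
    """Pool sizing heuristic from above: (workers x 2) + spare connections."""
    return workers * 2 + spare

# For the typical 4-worker setup described above:
base_pool = pool_size_for(4)               # 13
max_overflow = 5
total_possible = base_pool + max_overflow  # 18
```

Keep `total_possible * replica_count` below PostgreSQL's `max_connections` (minus a margin for admin sessions), or the pool will queue under load.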

Caching Strategy

Multi-layer caching:

Request
  โ†“
L1: Local in-memory cache (100ms)
  โ”‚ Miss? โ†“
L2: Redis cache (5ms)
  โ”‚ Miss? โ†“
L3: Database (50-200ms)
  โ””โ”€โ†’ Populate L2 and L1
from functools import lru_cache
import aiocache

class CacheStrategy:
    """Multi-layer caching"""
    
    # L1: Local memory cache
    # Note: lru_cache on an instance method keeps a reference to self; this
    # is fine for a long-lived singleton, otherwise cache at module level
    @lru_cache(maxsize=1000)
    def get_product_schema(self, product_type: str):
        """Cache product schemas in memory"""
        return self._load_schema(product_type)
    
    # L2: Redis cache (shared across instances)
    async def get_customer_cached(self, customer_id: str):
        """Cache customer data in Redis"""
        
        cache_key = f"customer:{customer_id}"
        
        # Try cache first
        cached = await self.cache.get(cache_key)
        if cached:
            return cached
        
        # Fetch from database
        customer = await self.db.get_customer(customer_id)
        
        # Cache for 1 hour
        await self.cache.set(cache_key, customer, ttl=3600)
        
        return customer
    
    async def invalidate_customer(self, customer_id: str):
        """Invalidate cached customer"""
        await self.cache.delete(f"customer:{customer_id}")

Batch Operations

from sqlalchemy import insert, select

class BatchOptimizer:
    """Optimize operations with batching"""
    
    async def get_customers_batch(self, customer_ids: list):
        """
        โœ… Better: Single query for all customers
        Instead of: for customer_id in customer_ids: get_customer(...)
        """
        
        query = select(Customer).where(Customer.id.in_(customer_ids))
        return await self.db.execute(query)
    
    async def create_orders_batch(self, orders: list):
        """
        โœ… Better: Single INSERT with multiple values
        """
        
        query = insert(Order).values(orders)
        await self.db.execute(query)

11: Production Operations

Day-to-day running of production MCP servers.

Deployment Strategy

Blue-green deployment with zero downtime:

#!/bin/bash
# deploy.sh

set -e

# Configuration
NAMESPACE="production"
SERVICE_NAME="mcp-server"
NEW_VERSION="v1.2.3"
IMAGE="mcp-server:$NEW_VERSION"

echo "Starting blue-green deployment..."

# Step 1: Pull latest image
docker pull $IMAGE

# Step 2: Deploy green (new version)
kubectl set image deployment/mcp-server-green \
  mcp-server=$IMAGE \
  -n $NAMESPACE

# Step 3: Wait for green to be ready
kubectl rollout status deployment/mcp-server-green \
  -n $NAMESPACE \
  --timeout=5m

# Step 4: Run health checks on green
echo "Running health checks on green..."
GREEN_POD=$(kubectl get pod -l deployment=mcp-server-green -n $NAMESPACE -o jsonpath='{.items[0].metadata.name}')
kubectl exec $GREEN_POD -n $NAMESPACE -- curl -f http://localhost:8000/health

# Step 5: Switch traffic from blue to green
kubectl patch service $SERVICE_NAME \
  -n $NAMESPACE \
  -p '{"spec":{"selector":{"deployment":"mcp-server-green"}}}'

echo "Switched traffic to green deployment"

# Step 6: Monitor for issues (5 minutes)
sleep 300

# Step 7: If successful, update blue and mark as primary
kubectl set image deployment/mcp-server-blue \
  mcp-server=$IMAGE \
  -n $NAMESPACE

echo "Blue-green deployment complete"

Scaling Strategy

Horizontal scaling based on metrics:

# Kubernetes HPA configuration
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: mcp-server-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-server
  
  minReplicas: 3      # Minimum availability
  maxReplicas: 20     # Maximum cost control
  
  metrics:
  # Scale on CPU utilization
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70  # Scale up at 70% CPU
  
  # Scale on memory utilization
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80  # Scale up at 80% memory
  
  # Scale on custom metric (tool call rate)
  - type: Pods
    pods:
      metric:
        name: mcp_tool_calls_per_second
      target:
        type: AverageValue
        averageValue: "100"  # 100 calls/sec per pod
  
  behavior:
    # Fast scale up
    scaleUp:
      stabilizationWindowSeconds: 0  # Immediate
      policies:
      - type: Percent
        value: 100  # Double the pods
        periodSeconds: 30
    
    # Slow scale down (prevent thrashing)
    scaleDown:
      stabilizationWindowSeconds: 300  # Wait 5 minutes
      policies:
      - type: Percent
        value: 50  # Reduce by 50%
        periodSeconds: 60

Monitoring Dashboards

Grafana dashboard queries:

# Tool call volume (requests per second, averaged over the last minute)
sum(rate(mcp_tool_calls_total[1m]))

# Error rate by tool
sum(rate(mcp_tool_errors_total[5m])) by (tool_name) / sum(rate(mcp_tool_calls_total[5m])) by (tool_name)

# Tool latency (p95)
histogram_quantile(0.95, sum(rate(mcp_tool_call_duration_seconds_bucket[5m])) by (le, tool_name))

# Cache hit rate
sum(rate(mcp_cache_hits_total[5m])) / (sum(rate(mcp_cache_hits_total[5m])) + sum(rate(mcp_cache_misses_total[5m])))

# Database connections
mcp_db_connections_active

# Request latency
histogram_quantile(0.99, rate(http_request_duration_seconds_bucket[5m]))

12: Advanced Use Cases and Patterns

Real-time Data Synchronization

For MCP tools that depend on constantly changing data:

import json
import time

class RealtimeDataSync:
    """Keep local cache in sync with external data source"""
    
    def __init__(self, redis_client):
        self.redis = redis_client
        self.last_sync = None
    
    async def sync_customer_data(self):
        """
        Sync customer data from external service periodically
        Run every 60 seconds
        """
        
        # Get last sync timestamp
        last_sync = await self.redis.get("customer_sync:last")
        
        # Fetch incremental changes since last sync
        if last_sync:
            changes = await self._fetch_changes(last_sync)
        else:
            changes = await self._fetch_all_customers()
        
        # Update local cache
        for customer_id, customer_data in changes.items():
            cache_key = f"customer:{customer_id}"
            await self.redis.setex(cache_key, 3600, json.dumps(customer_data))
        
        # Update sync timestamp
        await self.redis.set("customer_sync:last", time.time())

Tool Composition and Workflows

Running complex workflows using multiple tools:

class OrderWorkflow:
    """Multi-tool workflow for order processing"""
    
    async def process_order(self, order_details: dict):
        """
        Complex workflow:
        1. Validate customer exists
        2. Check inventory
        3. Process payment
        4. Create order
        5. Send confirmation
        """
        
        customer_id = order_details["customer_id"]
        items = order_details["items"]
        
        # Step 1: Validate customer
        customer = await self.get_customer(customer_id)
        if not customer:
            raise ValueError(f"Customer {customer_id} not found")
        
        # Step 2: Check inventory for all items
        for item in items:
            available = await self.check_inventory(
                item["product_id"],
                item["quantity"]
            )
            if not available:
                raise ValueError(f"Insufficient inventory for {item['product_id']}")
        
        # Step 3: Process payment
        payment_result = await self.process_payment(
            customer_id,
            self._calculate_total(items)
        )
        
        if not payment_result["success"]:
            raise ValueError(f"Payment failed: {payment_result['error']}")
        
        # Step 4: Create order
        order = await self.create_order(customer_id, items)
        
        # Step 5: Send confirmation (fire-and-forget; keep a reference so
        # the task is not garbage-collected mid-flight)
        confirmation_task = asyncio.create_task(
            self.send_confirmation(customer["email"], order)
        )
        
        return order
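Step 3 references a `_calculate_total` helper that is not shown. A minimal sketch, assuming each item dict also carries a `unit_price` field (an assumption; the validated item schema earlier only requires `product_id` and `quantity`):

```python
def calculate_total(items: list) -> float:
    """Hypothetical stand-in for self._calculate_total above.
    Assumes each item dict carries 'quantity' and 'unit_price'."""
    return sum(item["quantity"] * item["unit_price"] for item in items)
```

In a real system the unit price should come from the product catalog at order time, never from client-supplied input.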

Federated MCP Servers

When you need multiple MCP servers talking to each other:

import httpx

class FederatedMCPClient:
    """Call tools in other MCP servers"""
    
    def __init__(self):
        self.servers = {
            "customers": "http://mcp-customers:8000",
            "orders": "http://mcp-orders:8000",
            "payments": "http://mcp-payments:8000"
        }
    
    async def call_remote_tool(self, server: str, tool: str, arguments: dict):
        """Call tool in remote MCP server"""
        
        url = self.servers.get(server)
        if not url:
            raise ValueError(f"Unknown server: {server}")
        
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{url}/mcp",
                json={
                    "jsonrpc": "2.0",
                    "id": 1,
                    "method": "tools/call",
                    "params": {
                        "name": tool,
                        "arguments": arguments
                    }
                },
                timeout=5.0
            )
            
            if response.status_code != 200:
                raise Exception(f"Server error: {response.text}")
            
            data = response.json()
            if "error" in data:
                raise Exception(f"Tool error: {data['error']}")
            
            return data["result"]
    
    async def create_order_federated(self, customer_id: str, items: list):
        """Create order using federated services"""
        
        # Call customer service
        customer = await self.call_remote_tool(
            "customers", "get_customer",
            {"customer_id": customer_id}
        )
        
        # Call payment service
        payment = await self.call_remote_tool(
            "payments", "process_payment",
            {"customer_id": customer_id, "amount": 99.99}
        )
        
        # Call order service
        order = await self.call_remote_tool(
            "orders", "create_order",
            {"customer_id": customer_id, "items": items}
        )
        
        return order

13: Troubleshooting and Debugging

Common issues and how to resolve them.

Claude Timeouts

Problem: Tool calls timeout after 5 seconds

Causes:

  1. Database query too slow
  2. External API call hanging
  3. Serialization taking too long

Solutions:

# Add timeout monitoring
@app.post("/mcp")
async def handle_mcp(data: dict):
    start = time.time()
    
    # Process request
    result = await process_tool_call(data)
    
    duration = time.time() - start
    
    # Warn if close to timeout
    if duration > 4.0:
        logger.warning(f"Tool call took {duration:.2f}s, close to 5s limit")
        metrics.tool_call_duration_warning.inc()
    
    return result

# Add slow query logs
import logging
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

High Memory Usage

Problem: MCP server memory grows over time

Causes:

  1. Memory leak in tool implementation
  2. Cache not expiring
  3. Large objects not being garbage collected

Solutions:

# Monitor memory usage
import psutil
import os

async def monitor_memory():
    """Log memory usage periodically"""
    process = psutil.Process(os.getpid())
    
    while True:
        memory_info = process.memory_info()
        memory_percent = process.memory_percent()
        
        logger.info(f"Memory: {memory_info.rss / 1024 / 1024:.1f} MB ({memory_percent:.1f}%)")
        
        if memory_percent > 80:
            logger.warning("High memory usage detected")
            # Trigger garbage collection
            import gc
            gc.collect()
        
        await asyncio.sleep(300)  # Every 5 minutes

# Implement cache expiration
await redis_client.expire(cache_key, 3600)  # Always set TTL

Database Connection Issues

Problem: "Connection pool exhausted" errors

Causes:

  1. Connections not being returned to pool
  2. Long-running transactions
  3. Too many concurrent requests

Solutions:

# Use context managers so connections return to the pool promptly
async def get_customer(customer_id: str):
    # WRONG: acquiring a connection at the top of the function and holding
    # it through unrelated work (external calls, computation) starves the pool
    
    # RIGHT: scope the session to just the query
    async with db_session() as session:
        return await session.get(Customer, customer_id)
    # Connection is returned to the pool as soon as the block exits

# Add connection pool monitoring
async def monitor_connection_pool():
    """Monitor database connection pool"""
    
    while True:
        pool_state = await db.get_pool_state()
        
        logger.info(
            f"DB Pool: {pool_state['active']} active, "
            f"{pool_state['idle']} idle, "
            f"{pool_state['queued']} queued"
        )
        
        if pool_state['queued'] > 0:
            logger.warning("Requests queued for database connection")
        
        await asyncio.sleep(30)
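Monitoring tells you when requests queue; you can also cap concurrent database work at the application layer so excess requests wait instead of exhausting the pool. A sketch using an `asyncio.Semaphore` sized to the pool (the limit of 2 and the sleep standing in for a real query are illustrative):

```python
import asyncio

async def main():
    db_slots = asyncio.Semaphore(2)  # size this to match the DB pool
    peak = 0
    active = 0

    async def query(i):
        nonlocal peak, active
        async with db_slots:  # excess callers queue here rather than erroring
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for the actual query
            active -= 1

    # 20 concurrent requests, but never more than 2 touching the database
    await asyncio.gather(*(query(i) for i in range(20)))
    return peak

peak = asyncio.run(main())
print(f"peak concurrent queries: {peak}")
```

This trades latency for stability: under load, callers wait briefly for a slot instead of surfacing "connection pool exhausted" errors to Claude.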

Conclusion: Production MCP Servers at Scale

Building production MCP servers requires:

  1. Architecture: Choose monolithic, federated, or gateway pattern based on scale
  2. Development: Use asyncio, proper error handling, input validation
  3. Testing: Unit, integration, and load tests catch problems early
  4. Deployment: Docker, Kubernetes, blue-green deployments minimize downtime
  5. Observability: Prometheus metrics, structured logging, distributed tracing
  6. Resilience: Circuit breakers, retries, fallbacks, graceful degradation
  7. Security: Authentication, authorization, rate limiting, input validation
  8. Performance: Query optimization, caching, connection pooling, batch operations
  9. Operations: Monitoring dashboards, alerting, runbooks, regular drills

The key to production success is understanding your constraints (Claude's 5-second timeout) and designing for failure (assuming things will go wrong and preparing accordingly).

Start simple, monitor carefully, and evolve your architecture as you scale.


Appendix: Complete Example Project

A reference implementation is available at: https://github.com/afrankenstine/mcp-production-server

Key files:

  • src/: Source code
  • tests/: Comprehensive test suite
  • deploy/: Kubernetes and Docker files
  • scripts/: Operational scripts
  • docs/: Runbooks and guides

Published on 12/5/2025 by Yogesh Bhandari
