MCP in Production: Deployment and Integration
MCP in Production: Deployment and Integration
Deploying MCP servers to production requires careful attention to security, scalability, monitoring, and integration patterns. This lesson covers production best practices, authentication strategies, deployment options, and operational considerations.
Production Server Architecture
import logging
import asyncio
from typing import Dict, Optional
from mcp.server import Server
from mcp.types import (
Resource, Tool, TextContent, CallToolResult,
ListResourcesResult, ListToolsResult
)
import os
from dotenv import load_dotenv
# Load environment configuration
# NOTE(review): load_dotenv comes from the `dotenv` import above; presumably it
# reads a local .env file into the process environment so the os.getenv()
# calls used for configuration can see those values — confirm search path and
# override behaviour against the installed python-dotenv version.
load_dotenv()
class ProductionMCPServer:
    """Production-grade MCP server with best practices.

    Builds a ``Server`` named ``"production-server"``, configures logging,
    reads settings from environment variables and registers the resource and
    tool handlers.

    NOTE(review): the decorator names and handler return types used in
    ``setup_handlers`` are kept exactly as in the lesson; confirm them against
    the installed ``mcp`` SDK version.
    """

    def __init__(self):
        self.server = Server("production-server")
        self.logger = self._setup_logging()
        self.config = self._load_config()
        self.auth_token = self.config.get("auth_token")
        self.setup_handlers()

    def _setup_logging(self) -> logging.Logger:
        """Configure logging to ``mcp_server.log`` and to the console.

        ``logging.getLogger(__name__)`` hands back the same logger object on
        every call, so handlers are attached only when the logger has none.
        Without this guard each new server instance would add another pair of
        handlers and every message would be written several times.

        Returns:
            The configured module logger.
        """
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)

        if logger.handlers:
            # Already configured by an earlier instance; do not duplicate.
            return logger

        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )

        # File handler. Its DEBUG threshold only takes effect if the logger
        # level above is lowered as well; at INFO the logger drops DEBUG
        # records before any handler sees them.
        file_handler = logging.FileHandler("mcp_server.log")
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(formatter)

        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(formatter)

        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
        return logger

    def _load_config(self) -> Dict:
        """Load configuration from environment variables.

        Returns:
            A dict with the keys ``auth_token``, ``max_resource_size``,
            ``request_timeout``, ``rate_limit`` and ``environment``.

        Raises:
            ValueError: If one of the numeric variables is set to a value
                that ``int()`` cannot parse.
        """
        return {
            "auth_token": os.getenv("MCP_AUTH_TOKEN"),
            "max_resource_size": int(os.getenv("MAX_RESOURCE_SIZE", "10485760")),
            "request_timeout": int(os.getenv("REQUEST_TIMEOUT", "30")),
            "rate_limit": int(os.getenv("RATE_LIMIT", "100")),
            "environment": os.getenv("ENVIRONMENT", "production"),
        }

    def setup_handlers(self):
        """Register the resource and tool handlers on ``self.server``."""

        @self.server.list_resources()
        async def list_resources() -> ListResourcesResult:
            """List resources with logging."""
            self.logger.info("list_resources called")
            return ListResourcesResult(
                resources=[
                    Resource(
                        uri="secure:///data",
                        name="Protected Data",
                        description="Requires authentication",
                    )
                ]
            )

        @self.server.read_resource()
        async def read_resource(uri: str) -> str:
            """Read a resource after the authentication and size checks.

            Raises:
                PermissionError: If the server is not authenticated.
                ValueError: If the content is longer than the configured
                    ``max_resource_size``.
            """
            self.logger.info("read_resource: %s", uri)

            # Check authentication
            if not self._is_authenticated():
                self.logger.warning("Unauthorized resource access: %s", uri)
                raise PermissionError("Unauthorized")

            # Check resource size limits
            content = await self._fetch_resource(uri)
            if len(content) > self.config["max_resource_size"]:
                raise ValueError("Resource exceeds size limit")
            return content

        @self.server.list_tools()
        async def list_tools() -> ListToolsResult:
            """List available tools."""
            self.logger.info("list_tools called")
            return ListToolsResult(
                tools=[
                    Tool(
                        name="secure_operation",
                        description="Secure operation requiring auth",
                        inputSchema={"type": "object", "properties": {}},
                    )
                ]
            )

        def error_result(message: str) -> CallToolResult:
            """Build the error-flagged result returned on every failure path."""
            return CallToolResult(
                content=[TextContent(type="text", text=message)],
                isError=True,
            )

        @self.server.call_tool()
        async def call_tool(name: str, arguments: Dict) -> CallToolResult:
            """Execute a tool behind auth, validation, rate limit and timeout.

            Every failure is reported as a result with ``isError=True``
            instead of being raised to the caller.
            """
            self.logger.info("call_tool: %s with %s", name, arguments)
            try:
                # Authentication check
                if not self._is_authenticated():
                    self.logger.warning("Unauthorized tool call: %s", name)
                    return error_result("Unauthorized")

                # Input validation
                if not self._validate_input(name, arguments):
                    self.logger.warning(
                        "Invalid input for %s: %s", name, arguments
                    )
                    return error_result("Invalid input")

                # Rate limiting
                if not self._check_rate_limit():
                    self.logger.warning("Rate limit exceeded")
                    return error_result("Rate limit exceeded")

                # Execute tool with timeout
                result = await asyncio.wait_for(
                    self._execute_tool(name, arguments),
                    timeout=self.config["request_timeout"],
                )
                self.logger.info("Tool %s completed successfully", name)
                return CallToolResult(
                    content=[TextContent(type="text", text=result)],
                    isError=False,
                )
            except asyncio.TimeoutError:
                self.logger.error("Tool %s timeout", name)
                return error_result("Tool execution timeout")
            except Exception as e:
                # logger.exception keeps the traceback in the server log; the
                # caller still only receives the exception message.
                self.logger.exception("Tool %s error: %s", name, e)
                return error_result(f"Error: {str(e)}")

    def _is_authenticated(self) -> bool:
        """Report whether an auth token is configured.

        This placeholder only checks that ``self.auth_token`` is non-empty; it
        does not look at anything the caller sent. In production, validate
        the caller's token properly.
        """
        return bool(self.auth_token)

    def _validate_input(self, tool_name: str, arguments: Dict) -> bool:
        """Validate tool input (placeholder that accepts everything)."""
        # Implement tool-specific validation
        return True

    def _check_rate_limit(self) -> bool:
        """Check rate limiting (placeholder that never limits)."""
        # Implement rate limiting logic
        return True

    async def _fetch_resource(self, uri: str) -> str:
        """Fetch resource content (placeholder returning a fixed string)."""
        return "Resource content"

    async def _execute_tool(self, name: str, arguments: Dict) -> str:
        """Execute the actual tool (placeholder returning a fixed string)."""
        return "Tool result"

    async def run(self):
        """Run the server until the task is cancelled."""
        self.logger.info("Starting production MCP server")
        # NOTE(review): assumes Server supports `async with`; confirm against
        # the installed mcp SDK, which may require an explicit transport.
        async with self.server:
            # An Event that is never set keeps the coroutine alive.
            await asyncio.Event().wait()
Authentication and Authorization
Token-Based Authentication
import jwt
from typing import Optional
from datetime import datetime, timedelta
class TokenAuth:
    """Issue and verify HS256-signed JWTs that carry a ``user_id`` claim."""

    def __init__(self, secret_key: str):
        """Store the shared secret used for both signing and verification."""
        self.secret_key = secret_key

    def generate_token(self, user_id: str, expires_in_hours: int = 24) -> str:
        """Generate a JWT for ``user_id``.

        Args:
            user_id: Value stored in the ``user_id`` claim.
            expires_in_hours: Lifetime of the token, counted from now.

        Returns:
            The encoded token.
        """
        # Local import: the surrounding file only imports datetime/timedelta.
        from datetime import timezone

        # One timezone-aware timestamp feeds both claims, so "iat" and "exp"
        # are exactly expires_in_hours apart. datetime.utcnow() is deprecated
        # and returns a naive value.
        issued_at = datetime.now(timezone.utc)
        payload = {
            "user_id": user_id,
            "exp": issued_at + timedelta(hours=expires_in_hours),
            "iat": issued_at,
        }
        return jwt.encode(payload, self.secret_key, algorithm="HS256")

    def verify_token(self, token: str) -> Optional[str]:
        """Verify ``token`` and return its ``user_id`` claim.

        Returns:
            The ``user_id`` claim, or ``None`` when decoding raises
            ``jwt.InvalidTokenError`` or the claim is absent.
        """
        try:
            # Pinning the accepted algorithm list stops a token from choosing
            # its own verification algorithm.
            payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
        except jwt.InvalidTokenError:
            return None
        return payload.get("user_id")
class AuthenticatedServer:
    """Server with token-based authentication.

    Attributes are set in ``__init__``: ``auth`` verifies tokens,
    ``current_user`` holds the user id of the most recent successful
    authentication (``None`` otherwise) and ``logger`` is the module logger.
    """

    def __init__(self, secret_key: str):
        self.auth = TokenAuth(secret_key)
        self.current_user: Optional[str] = None
        self.logger = logging.getLogger(__name__)

    def authenticate_request(self, auth_header: Optional[str]) -> bool:
        """Authenticate an incoming request from its Authorization header.

        Args:
            auth_header: Header value of the form ``"Bearer <token>"``, or
                ``None`` when the header is missing.

        Returns:
            ``True`` when the token verifies and yields a user id, ``False``
            in every other case. Errors are logged, never raised.
        """
        # Clear the previous identity first so a rejected request can never
        # be attributed to whoever authenticated before it.
        self.current_user = None

        if not auth_header:
            return False

        try:
            # Extract token from "Bearer <token>"
            parts = auth_header.split()
            # Authentication scheme names are case-insensitive, so "bearer"
            # and "BEARER" are accepted as well.
            if len(parts) != 2 or parts[0].lower() != "bearer":
                return False

            user_id = self.auth.verify_token(parts[1])
            if not user_id:
                return False

            self.current_user = user_id
            self.logger.info(f"Authenticated user: {user_id}")
            return True
        except Exception as e:
            self.logger.error(f"Authentication error: {e}")
            return False
Role-Based Access Control
from enum import Enum
class Role(Enum):
    """Closed set of roles that can be assigned to a user."""

    # Each member's value is its lowercase name.
    ADMIN = "admin"
    USER = "user"
    GUEST = "guest"
class RBACServer:
    """Server with role-based access control.

    ``user_roles`` maps a user id to its role; ``resource_access`` maps a
    resource name to the set of roles allowed to read it.
    """

    def __init__(self):
        self.user_roles: Dict[str, Role] = {}
        # Access widens from admin-only to every role.
        admin_only = {Role.ADMIN}
        members = {Role.ADMIN, Role.USER}
        everyone = {Role.ADMIN, Role.USER, Role.GUEST}
        self.resource_access: Dict[str, set] = {
            "admin_data": admin_only,
            "user_data": members,
            "public_data": everyone,
        }

    def check_access(self, user_id: str, resource: str) -> bool:
        """Return whether ``user_id`` may access ``resource``.

        Users without a role and resources without an entry are both denied.
        """
        role = self.user_roles.get(user_id)
        if role is None:
            return False
        # An unknown resource yields an empty set, so the lookup fails closed.
        return role in self.resource_access.get(resource, set())

    def set_user_role(self, user_id: str, role: Role):
        """Assign ``role`` to ``user_id``, replacing any earlier role."""
        self.user_roles[user_id] = role
Deployment Options
Docker Containerization
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies first so this layer stays cached until
# requirements.txt changes; --no-cache-dir keeps pip's download cache out of
# the image.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Run as an unprivileged user. The user owns /app so the process can still
# create files in its working directory.
RUN useradd --system --no-create-home appuser \
    && chown -R appuser /app
USER appuser

# Set environment
ENV ENVIRONMENT=production
# MCP_AUTH_TOKEN is intentionally not set here. ${MCP_AUTH_TOKEN} would expand
# an undeclared build variable (an empty value), and declaring one would store
# the secret in the image. Supply the token at runtime instead
# (docker run -e, compose `environment:`, or a Kubernetes secret).

# Run server
CMD ["python", "-m", "mcp_server"]
# docker-compose.yml
version: '3.8'
services:
  mcp-server:
    build: .
    ports:
      - "8000:8000"
    environment:
      # Taken from the shell or an .env file next to this compose file.
      - MCP_AUTH_TOKEN=${MCP_AUTH_TOKEN}
      - ENVIRONMENT=production
      - RATE_LIMIT=100
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
    healthcheck:
      # python:3.11-slim does not ship curl, so the probe uses the interpreter
      # that is guaranteed to be in the image. urlopen raises on connection
      # errors and on 4xx/5xx responses, which makes the command exit non-zero.
      test:
        - CMD
        - python
        - -c
        - "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)"
      interval: 30s
      timeout: 10s
      retries: 3
Kubernetes Deployment
# kubernetes-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-server
  labels:
    app: mcp-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-server
  template:
    metadata:
      labels:
        app: mcp-server
    spec:
      containers:
        - name: mcp-server
          # NOTE(review): ":latest" is a mutable tag, so replicas can end up
          # running different builds; pin a version tag or digest for
          # production.
          image: mcp-server:latest
          ports:
            - containerPort: 8000
          env:
            # The token comes from a Secret rather than from the manifest.
            - name: MCP_AUTH_TOKEN
              valueFrom:
                secretKeyRef:
                  name: mcp-secrets
                  key: auth-token
            - name: ENVIRONMENT
              value: "production"
          resources:
            requests:
              memory: "128Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "1000m"
          # The server listens on an unprivileged port, so it needs neither
          # privilege escalation nor any Linux capabilities.
          securityContext:
            allowPrivilegeEscalation: false
            capabilities:
              drop:
                - ALL
          # Liveness: the container is restarted when /health stops answering.
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 30
          # Readiness: the pod receives traffic only while /ready succeeds.
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: mcp-server-service
spec:
  # Routes to every pod carrying the Deployment's app label.
  selector:
    app: mcp-server
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
Monitoring and Observability
from prometheus_client import Counter, Histogram, start_http_server
import time
class MonitoredServer:
    """Server with Prometheus monitoring.

    Creates a request counter, a request-duration histogram and an error
    counter, then starts the metrics HTTP endpoint.

    NOTE(review): the metrics are created without an explicit registry;
    creating a second instance in the same process is expected to fail on
    duplicate metric names — confirm against prometheus_client.
    """

    def __init__(self, metrics_port: int = 8001):
        """Create the metrics and start the metrics HTTP server.

        Args:
            metrics_port: Port the metrics endpoint listens on. The default
                matches the previously hard-coded 8001.
        """
        # Metrics
        self.request_count = Counter(
            "mcp_requests_total",
            "Total requests",
            ["method"],
        )
        self.request_duration = Histogram(
            "mcp_request_duration_seconds",
            "Request duration",
            ["method"],
        )
        self.errors = Counter(
            "mcp_errors_total",
            "Total errors",
            ["method", "error_type"],
        )

        # Start metrics server
        self.metrics_port = metrics_port
        start_http_server(metrics_port)

    def record_request(self, method: str, duration: float, error: Optional[str] = None):
        """Record metrics for one handled request.

        Args:
            method: Label value identifying the request method.
            duration: Value observed by the duration histogram.
            error: Error type label; the error counter is incremented only
                when this is truthy.
        """
        self.request_count.labels(method=method).inc()
        self.request_duration.labels(method=method).observe(duration)
        if error:
            self.errors.labels(method=method, error_type=error).inc()
Health Checks
class HealthCheckServer:
    """Server with health check endpoints.

    Dependency checks are zero-argument callables registered by name with
    ``add_dependency_check`` and evaluated by ``check_health``.
    """

    def __init__(self):
        # Maps dependency name -> zero-argument callable.
        self.dependencies = {}

    def check_health(self) -> Dict[str, object]:
        """Check server and dependency health.

        A dependency counts as unhealthy when its check returns a falsy value
        or raises. Both cases mark the server as unhealthy, so the overall
        flag never contradicts the per-dependency results.

        Returns:
            ``{"server": bool, "dependencies": {name: bool}}``.
        """
        statuses = {}
        for dep_name, dep_check in self.dependencies.items():
            try:
                statuses[dep_name] = bool(dep_check())
            except Exception:
                # A crashing check must not take the health report down.
                statuses[dep_name] = False

        return {
            "server": all(statuses.values()),
            "dependencies": statuses,
        }

    def add_dependency_check(self, name: str, check_func):
        """Register ``check_func`` under ``name``, replacing any earlier one."""
        self.dependencies[name] = check_func
Configuration Management
from pydantic import BaseSettings
class ProductionSettings(BaseSettings):
    """Typed production settings.

    ``auth_token`` and ``database_url`` have no default and must be supplied;
    every other field falls back to the value declared here.

    NOTE(review): this uses the ``BaseSettings`` import from ``pydantic`` and
    an inner ``Config`` class; confirm both match the pinned pydantic version.
    """

    app_name: str = "mcp-server"
    auth_token: str
    database_url: str
    log_level: str = "INFO"
    max_resource_size: int = 10 * 1024 * 1024  # 10485760
    request_timeout: int = 30
    rate_limit: int = 100
    environment: str = "production"

    class Config:
        # Dotenv file to read settings from, and its encoding.
        env_file = ".env.production"
        env_file_encoding = "utf-8"
Best Practices Checklist
Security
- Use environment variables for secrets
- Implement authentication on all endpoints
- Validate and sanitize all inputs
- Use HTTPS in production
- Implement rate limiting
- Log security events
Reliability
- Add health checks
- Implement circuit breakers
- Use timeouts on all operations
- Add retry logic with exponential backoff
- Monitor error rates
- Use structured logging
Performance
- Cache frequently accessed resources
- Implement pagination for large datasets
- Monitor response times
- Profile resource usage
- Optimize database queries
- Use connection pooling
Operational Excellence
- Centralize configuration
- Use version control for all configs
- Automate deployment
- Monitor key metrics
- Document runbooks
- Plan for disaster recovery
Key Takeaway
Production deployment of MCP servers requires attention to authentication, authorization, monitoring, health checks, and proper containerization. Following these patterns ensures reliable, secure, and observable servers in production environments.
Exercises
- Secured Server: Build a server with token-based authentication and role-based access control.
- Docker Deployment: Containerize an MCP server and create a docker-compose file for local testing.
- Kubernetes Deployment: Deploy an MCP server to Kubernetes with proper resource limits and health checks.
- Monitoring: Add Prometheus metrics to track request rates, latency, and error rates.
- Configuration: Implement environment-based configuration management.
- Incident Response: Design monitoring and alerting rules for common failure scenarios.