Intermediate

MCP in Production: Deployment and Integration

Lesson 4 of 4 · Estimated Time: 50 min

MCP in Production: Deployment and Integration

Deploying MCP servers to production requires careful attention to security, scalability, monitoring, and integration patterns. This lesson covers production best practices, authentication strategies, deployment options, and operational considerations.

Production Server Architecture

import logging
import asyncio
from typing import Dict, Optional
from mcp.server import Server
from mcp.types import (
    Resource, Tool, TextContent, CallToolResult,
    ListResourcesResult, ListToolsResult
)
import os
from dotenv import load_dotenv

# Load environment configuration.
# This runs at import time, before ProductionMCPServer reads its settings
# through os.getenv().
# NOTE(review): presumably picks up a local .env file -- confirm the search
# path and override behaviour against the python-dotenv documentation.
load_dotenv()

class ProductionMCPServer:
    """Production-grade MCP server with best practices.

    Wraps an MCP ``Server`` and wires in logging, environment-driven
    configuration, an authentication gate, input validation, rate limiting
    and a per-call timeout. The validation, rate-limit, fetch and execute
    hooks are placeholders meant to be replaced with real logic.
    """

    def __init__(self):
        """Create the server, then set up logging, config and handlers."""
        self.server = Server("production-server")
        self.logger = self._setup_logging()
        self.config = self._load_config()
        self.auth_token = self.config.get("auth_token")
        self.setup_handlers()

    def _setup_logging(self) -> logging.Logger:
        """Configure the module logger once and return it.

        Returns:
            The logger named after this module, with a file handler
            (DEBUG and above) and a console handler (INFO and above).
        """
        logger = logging.getLogger(__name__)
        # Records are filtered by the logger before they reach any handler,
        # so the logger must pass DEBUG for the file handler's DEBUG
        # threshold to mean anything. The console handler still shows INFO+.
        logger.setLevel(logging.DEBUG)

        # logging.getLogger() returns the same object on every call. Without
        # this guard each new ProductionMCPServer would attach another pair
        # of handlers and every message would be written multiple times.
        if logger.handlers:
            return logger

        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )

        # File handler
        file_handler = logging.FileHandler("mcp_server.log")
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(formatter)

        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(formatter)

        logger.addHandler(file_handler)
        logger.addHandler(console_handler)

        return logger

    def _load_config(self) -> Dict:
        """Load configuration from environment variables.

        Returns:
            A dict with ``auth_token`` (``None`` when unset) plus
            ``max_resource_size``, ``request_timeout``, ``rate_limit`` and
            ``environment``, each falling back to a built-in default.

        Raises:
            ValueError: if a numeric variable is set to a non-integer value.
        """
        return {
            "auth_token": os.getenv("MCP_AUTH_TOKEN"),
            "max_resource_size": int(os.getenv("MAX_RESOURCE_SIZE", "10485760")),
            "request_timeout": int(os.getenv("REQUEST_TIMEOUT", "30")),
            "rate_limit": int(os.getenv("RATE_LIMIT", "100")),
            "environment": os.getenv("ENVIRONMENT", "production")
        }

    def setup_handlers(self):
        """Register resource and tool handlers on ``self.server``.

        Every handler logs its invocation. ``read_resource`` and
        ``call_tool`` check authentication; ``call_tool`` additionally
        validates input, applies rate limiting and enforces a timeout.
        """

        def error_result(message: str) -> CallToolResult:
            """Build the error-shaped result shared by all failure paths."""
            return CallToolResult(
                content=[TextContent(type="text", text=message)],
                isError=True
            )

        @self.server.list_resources()
        async def list_resources() -> ListResourcesResult:
            """List resources with logging."""
            self.logger.info("list_resources called")
            return ListResourcesResult(
                resources=[
                    Resource(
                        uri="secure:///data",
                        name="Protected Data",
                        description="Requires authentication"
                    )
                ]
            )

        @self.server.read_resource()
        async def read_resource(uri: str) -> str:
            """Read a resource after checking authentication and size.

            Raises:
                PermissionError: if the authentication check fails.
                ValueError: if the content is longer than
                    ``max_resource_size``.
            """
            self.logger.info("read_resource: %s", uri)

            # Check authentication
            if not self._is_authenticated():
                self.logger.warning("Unauthorized resource access: %s", uri)
                raise PermissionError("Unauthorized")

            # Check resource size limits
            content = await self._fetch_resource(uri)
            if len(content) > self.config["max_resource_size"]:
                raise ValueError("Resource exceeds size limit")

            return content

        @self.server.list_tools()
        async def list_tools() -> ListToolsResult:
            """List available tools."""
            self.logger.info("list_tools called")
            return ListToolsResult(
                tools=[
                    Tool(
                        name="secure_operation",
                        description="Secure operation requiring auth",
                        inputSchema={"type": "object", "properties": {}}
                    )
                ]
            )

        @self.server.call_tool()
        async def call_tool(name: str, arguments: Dict) -> CallToolResult:
            """Execute a tool with security checks and error handling.

            Failures are reported as a result with ``isError=True`` rather
            than raised, so the caller always receives a response.
            """
            # Log argument names only: the values may carry credentials or
            # personal data that must not end up in log files.
            argument_names = sorted(arguments or {})
            self.logger.info(
                "call_tool: %s with arguments %s", name, argument_names
            )

            try:
                # Authentication check
                if not self._is_authenticated():
                    self.logger.warning("Unauthorized tool call: %s", name)
                    return error_result("Unauthorized")

                # Input validation
                if not self._validate_input(name, arguments):
                    self.logger.warning(
                        "Invalid input for %s (arguments %s)",
                        name, argument_names
                    )
                    return error_result("Invalid input")

                # Rate limiting
                if not self._check_rate_limit():
                    self.logger.warning("Rate limit exceeded")
                    return error_result("Rate limit exceeded")

                # Execute tool with timeout
                result = await asyncio.wait_for(
                    self._execute_tool(name, arguments),
                    timeout=self.config["request_timeout"]
                )

                self.logger.info("Tool %s completed successfully", name)
                return CallToolResult(
                    content=[TextContent(type="text", text=result)],
                    isError=False
                )

            except asyncio.TimeoutError:
                self.logger.error("Tool %s timeout", name)
                return error_result("Tool execution timeout")
            except Exception as e:
                # Boundary handler: log the traceback so the failure can be
                # diagnosed, then report it to the caller as an error result.
                self.logger.exception("Tool %s error: %s", name, e)
                return error_result(f"Error: {str(e)}")

    def _is_authenticated(self) -> bool:
        """Return True when an auth token is configured.

        NOTE: this placeholder only checks that ``MCP_AUTH_TOKEN`` was set
        for the process; it does not inspect any credential sent by the
        caller. In production, validate tokens properly.
        """
        return bool(self.auth_token)

    def _validate_input(self, tool_name: str, arguments: Dict) -> bool:
        """Validate tool input (placeholder: accepts everything)."""
        # Implement tool-specific validation
        return True

    def _check_rate_limit(self) -> bool:
        """Check rate limiting (placeholder: never limits)."""
        # Implement rate limiting logic
        return True

    async def _fetch_resource(self, uri: str) -> str:
        """Fetch resource content (placeholder: returns a fixed string)."""
        return "Resource content"

    async def _execute_tool(self, name: str, arguments: Dict) -> str:
        """Execute the actual tool (placeholder: returns a fixed string)."""
        return "Tool result"

    async def run(self):
        """Run the server until the surrounding task is cancelled."""
        self.logger.info("Starting production MCP server")
        # NOTE(review): this assumes Server works as an async context
        # manager. The MCP Python SDK documents starting a server through a
        # transport (e.g. stdio_server()) and Server.run(...) -- confirm
        # against the installed SDK version.
        async with self.server:
            # The event is never set, so this waits indefinitely.
            await asyncio.Event().wait()

Authentication and Authorization

Token-Based Authentication

import jwt
from typing import Optional
from datetime import datetime, timedelta

class TokenAuth:
    """Issue and verify HS256-signed JWTs that carry a ``user_id`` claim."""

    def __init__(self, secret_key: str):
        """Store the shared secret used to both sign and verify tokens."""
        self.secret_key = secret_key

    def generate_token(self, user_id: str, expires_in_hours: int = 24) -> str:
        """Generate a signed JWT for *user_id*.

        Args:
            user_id: Value stored in the ``user_id`` claim.
            expires_in_hours: Lifetime of the token, counted from now.

        Returns:
            The encoded token produced by ``jwt.encode``.
        """
        # Imported locally so this block does not depend on the snippet's
        # import line being extended.
        from datetime import timezone

        # datetime.utcnow() is deprecated and returns a naive value; use an
        # explicit UTC-aware timestamp, sampled once so "iat" and "exp" are
        # exactly expires_in_hours apart.
        issued_at = datetime.now(timezone.utc)
        payload = {
            "user_id": user_id,
            "exp": issued_at + timedelta(hours=expires_in_hours),
            "iat": issued_at
        }
        return jwt.encode(payload, self.secret_key, algorithm="HS256")

    def verify_token(self, token: str) -> Optional[str]:
        """Verify *token* and return its ``user_id`` claim.

        Returns:
            The ``user_id`` claim, or ``None`` when decoding raises
            ``jwt.InvalidTokenError`` or the claim is absent.
        """
        try:
            # Pinning the accepted algorithm list prevents a token from
            # choosing its own verification algorithm.
            payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
            return payload.get("user_id")
        except jwt.InvalidTokenError:
            return None

class AuthenticatedServer:
    """Server with token-based authentication.

    Tracks the identity established by the most recent call to
    ``authenticate_request`` in ``current_user``.
    """

    def __init__(self, secret_key: str):
        """Create the token verifier and start with no authenticated user."""
        self.auth = TokenAuth(secret_key)
        self.current_user: Optional[str] = None
        self.logger = logging.getLogger(__name__)

    def authenticate_request(self, auth_header: Optional[str]) -> bool:
        """Authenticate an incoming request from its Authorization header.

        Args:
            auth_header: Header value of the form ``Bearer <token>``, or
                ``None``/empty when the header is missing.

        Returns:
            True when the token verifies; ``current_user`` is then set to
            the token's user id. False otherwise, with ``current_user``
            reset to ``None``.
        """
        # Forget the previous request's identity first, so a failed attempt
        # can never be mistaken for the last successfully authenticated user.
        self.current_user = None

        if not auth_header:
            return False

        try:
            # Extract token from "Bearer <token>"
            parts = auth_header.split()
            # HTTP authentication scheme names are case-insensitive, so
            # "bearer" and "BEARER" are as valid as "Bearer".
            if len(parts) != 2 or parts[0].lower() != "bearer":
                return False

            user_id = self.auth.verify_token(parts[1])
            if not user_id:
                return False

            self.current_user = user_id
            self.logger.info(f"Authenticated user: {user_id}")
            return True

        except Exception as e:
            # Fail closed: any unexpected verifier error denies the request.
            self.logger.error(f"Authentication error: {e}")
            return False

Role-Based Access Control

from enum import Enum

class Role(Enum):
    """User roles; each member's value is its lowercase string identifier."""
    # Listed for every resource in RBACServer.resource_access.
    ADMIN = "admin"
    # Listed for "user_data" and "public_data".
    USER = "user"
    # Listed for "public_data" only.
    GUEST = "guest"

class RBACServer:
    """Role-based access control: users hold one role, resources list roles."""

    def __init__(self):
        """Start with no users and the built-in resource permission table."""
        # user_id -> the single Role assigned to that user
        self.user_roles: Dict[str, Role] = {}
        # resource name -> roles permitted to access it
        self.resource_access: Dict[str, set] = {
            "admin_data": {Role.ADMIN},
            "user_data": {Role.ADMIN, Role.USER},
            "public_data": {Role.ADMIN, Role.USER, Role.GUEST}
        }

    def check_access(self, user_id: str, resource: str) -> bool:
        """Return True only if the user's role is listed for *resource*.

        Users without a role and resources missing from the table are both
        denied.
        """
        assigned = self.user_roles.get(user_id)
        # The role lookup comes first; the resource table is consulted only
        # for users that actually have a role.
        return assigned is not None and assigned in self.resource_access.get(
            resource, set()
        )

    def set_user_role(self, user_id: str, role: Role):
        """Assign *role* to *user_id*, replacing any role held before."""
        self.user_roles[user_id] = role

Deployment Options

Docker Containerization

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies first so this layer stays cached when only the
# application code changes; --no-cache-dir keeps pip's download cache out of
# the image.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Run as an unprivileged user instead of root. /app is handed to that user
# because the server writes its log file into the working directory.
RUN useradd --create-home --shell /usr/sbin/nologin mcp \
    && chown -R mcp:mcp /app
USER mcp

# Set environment
ENV ENVIRONMENT=production
# MCP_AUTH_TOKEN is deliberately not set here. In a Dockerfile, ${VAR} is
# expanded at build time, so `ENV MCP_AUTH_TOKEN=${MCP_AUTH_TOKEN}` would bake
# either an empty value or the real secret into the image. Supply it at run
# time instead (docker run -e, the compose `environment:` entry, or a
# Kubernetes Secret).

# Run server
CMD ["python", "-m", "mcp_server"]
# docker-compose.yml
version: '3.8'

services:
  mcp-server:
    build: .
    ports:
      - "8000:8000"
    environment:
      # The :? form aborts `docker compose up` with this message when the
      # variable is unset or empty, instead of starting with a blank token.
      - MCP_AUTH_TOKEN=${MCP_AUTH_TOKEN:?MCP_AUTH_TOKEN must be set}
      - ENVIRONMENT=production
      - RATE_LIMIT=100
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
    healthcheck:
      # python:3.11-slim does not ship curl, so probe with the interpreter
      # that is already in the image. urlopen raises on connection failure or
      # an HTTP error status, which gives the non-zero exit code Docker needs.
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)"]
      interval: 30s
      timeout: 10s
      retries: 3
      # Grace period so failures during startup do not count as retries.
      start_period: 10s

Kubernetes Deployment

# kubernetes-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-server
  labels:
    app: mcp-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-server
  template:
    metadata:
      labels:
        app: mcp-server
    spec:
      containers:
      - name: mcp-server
        # Pin an immutable release tag (or digest) rather than :latest, so all
        # replicas run the same build and rollouts/rollbacks are reproducible.
        # Replace 1.0.0 with the tag your pipeline publishes.
        image: mcp-server:1.0.0
        ports:
        - containerPort: 8000
        env:
        # Secret is injected at run time from the cluster, never from the image.
        - name: MCP_AUTH_TOKEN
          valueFrom:
            secretKeyRef:
              name: mcp-secrets
              key: auth-token
        - name: ENVIRONMENT
          value: "production"
        securityContext:
          # The server has no need to gain privileges after start-up.
          allowPrivilegeEscalation: false
        resources:
          requests:
            memory: "128Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "1000m"
        # Liveness: the container is restarted after repeated failures.
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 30
          timeoutSeconds: 5
          failureThreshold: 3
        # Readiness: the pod is removed from Service endpoints while failing.
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 10
          timeoutSeconds: 5
          failureThreshold: 3
---
# Service that exposes the mcp-server pods.
apiVersion: v1
kind: Service
metadata:
  name: mcp-server-service
spec:
  # Selects the pods labelled app: mcp-server by the Deployment template.
  selector:
    app: mcp-server
  ports:
  # Port 80 on the Service forwards to container port 8000.
  - protocol: TCP
    port: 80
    targetPort: 8000
  # NOTE(review): LoadBalancer publishes the service outside the cluster on
  # plain port 80 -- confirm TLS is terminated in front of it, or use
  # ClusterIP behind an ingress.
  type: LoadBalancer

Monitoring and Observability

from prometheus_client import Counter, Histogram, start_http_server
import time

class MonitoredServer:
    """Server with Prometheus monitoring.

    Registers request, latency and error metrics and serves them over HTTP.

    NOTE(review): the metrics are created without an explicit registry, so
    they presumably land in prometheus_client's default registry; creating a
    second instance in the same process would then fail on duplicate metric
    names -- confirm, and treat this class as one-per-process.
    """

    def __init__(self, metrics_port: int = 8001):
        """Create the metrics and start the metrics HTTP endpoint.

        Args:
            metrics_port: TCP port for the metrics endpoint. Defaults to
                8001, the value that was previously hard-coded.
        """
        # Metrics
        self.request_count = Counter(
            "mcp_requests_total",
            "Total requests",
            ["method"]
        )
        self.request_duration = Histogram(
            "mcp_request_duration_seconds",
            "Request duration",
            ["method"]
        )
        self.errors = Counter(
            "mcp_errors_total",
            "Total errors",
            ["method", "error_type"]
        )

        # Start metrics server
        start_http_server(metrics_port)

    def record_request(self, method: str, duration: float, error: Optional[str] = None):
        """Record metrics for one handled request.

        Args:
            method: Name of the MCP method, used as the ``method`` label.
            duration: Elapsed time, observed by the
                ``mcp_request_duration_seconds`` histogram.
            error: Error category for failed requests; ``None`` or an empty
                string records no error. Pass a small fixed set of values
                (for example an exception class name), not free-form
                messages: every distinct value becomes its own time series.
        """
        self.request_count.labels(method=method).inc()
        self.request_duration.labels(method=method).observe(duration)

        if error:
            self.errors.labels(method=method, error_type=error).inc()

Health Checks

class HealthCheckServer:
    """Server with health check endpoints.

    Dependency checks are zero-argument callables registered by name; a
    truthy return value means the dependency is healthy.
    """

    def __init__(self):
        """Start with no registered dependency checks."""
        # name -> zero-argument callable
        self.dependencies = {}

    def check_health(self) -> Dict[str, object]:
        """Run every registered check and report overall health.

        Returns:
            ``{"server": bool, "dependencies": {name: bool}}``. ``server``
            is False as soon as any dependency check returns a falsy value
            or raises.
        """
        dependency_status = {}

        # Check dependencies
        for dep_name, dep_check in self.dependencies.items():
            try:
                dependency_status[dep_name] = bool(dep_check())
            except Exception:
                # Best effort: a check that blows up counts as unhealthy and
                # must not stop the remaining checks from running.
                dependency_status[dep_name] = False

        return {
            # A check that returns False is just as unhealthy as one that
            # raises, so both cases pull the overall status down.
            "server": all(dependency_status.values()),
            "dependencies": dependency_status
        }

    def add_dependency_check(self, name: str, check_func):
        """Register *check_func* under *name*, replacing any earlier check."""
        self.dependencies[name] = check_func

Configuration Management

from pydantic import BaseSettings

class ProductionSettings(BaseSettings):
    """Production configuration.

    ``auth_token`` and ``database_url`` declare no default here; every other
    field falls back to the value shown.

    NOTE(review): ``from pydantic import BaseSettings`` and the nested
    ``Config`` class are the pydantic v1 API. In pydantic v2 ``BaseSettings``
    lives in the separate ``pydantic-settings`` package -- confirm which
    version is pinned.
    """
    app_name: str = "mcp-server"
    # No default declared for the next two fields.
    auth_token: str
    database_url: str
    log_level: str = "INFO"
    # 10485760 == 10 * 1024 * 1024; presumably a size in bytes -- confirm.
    max_resource_size: int = 10485760
    # Presumably seconds -- confirm against where the timeout is applied.
    request_timeout: int = 30
    rate_limit: int = 100
    environment: str = "production"

    class Config:
        # Settings file name and the encoding used to read it.
        env_file = ".env.production"
        env_file_encoding = "utf-8"
Best Practices Checklist

Security

  • Use environment variables for secrets
  • Implement authentication on all endpoints
  • Validate and sanitize all inputs
  • Use HTTPS in production
  • Implement rate limiting
  • Log security events

Reliability

  • Add health checks
  • Implement circuit breakers
  • Use timeouts on all operations
  • Add retry logic with exponential backoff
  • Monitor error rates
  • Use structured logging

Performance

  • Cache frequently accessed resources
  • Implement pagination for large datasets
  • Monitor response times
  • Profile resource usage
  • Optimize database queries
  • Use connection pooling

Operational Excellence

  • Centralize configuration
  • Use version control for all configs
  • Automate deployment
  • Monitor key metrics
  • Document runbooks
  • Plan for disaster recovery

Key Takeaway

Production deployment of MCP servers requires attention to authentication, authorization, monitoring, health checks, and proper containerization. Following these patterns ensures reliable, secure, and observable servers in production environments.

Exercises

  1. Secured Server: Build a server with token-based authentication and role-based access control.

  2. Docker Deployment: Containerize an MCP server and create a docker-compose file for local testing.

  3. Kubernetes Deployment: Deploy an MCP server to Kubernetes with proper resource limits and health checks.

  4. Monitoring: Add Prometheus metrics to track request rates, latency, and error rates.

  5. Configuration: Implement environment-based configuration management.

  6. Incident Response: Design monitoring and alerting rules for common failure scenarios.