Intermediate

Advanced MCP: Resources, Sampling, and Roots

Lesson 3 of 4 · Estimated time: 50 min

Advanced MCP: Resources, Sampling, and Roots

Advanced MCP features enable sophisticated resource management, efficient data handling through sampling, and flexible capability negotiation. This lesson explores these advanced patterns for building powerful, scalable MCP servers.

Resource Templates

Resource templates allow dynamic resource generation without explicitly listing every resource:

from mcp.server import Server
from mcp.types import (
    Resource, ReadResourceResult, ListResourcesResult
)
from typing import List, Dict

class TemplatedResourceServer:
    """Server that generates families of resources from name lists.

    Three kinds of URI are served:

    * ``static:///config`` -- a single fixed resource;
    * ``database:///<table>`` -- one resource per name in ``TABLES``;
    * ``api:///<endpoint>`` -- one resource per name in ``ENDPOINTS``.

    The same name lists drive both listing and reading, so a URI can be
    read exactly when it is advertised by ``list_resources``.
    """

    # Names expanded into ``database:///<table>`` resources.
    TABLES = ("users", "products", "orders", "inventory")
    # Names expanded into ``api:///<endpoint>`` resources.
    ENDPOINTS = ("health", "metrics", "logs")

    _CONFIG_URI = "static:///config"
    _DATABASE_PREFIX = "database:///"
    _API_PREFIX = "api:///"

    def __init__(self):
        self.server = Server("templated-server")
        self.setup_handlers()

    def setup_handlers(self):
        """Register the list/read resource handlers on ``self.server``."""

        @self.server.list_resources()
        async def list_resources() -> ListResourcesResult:
            """List the static resource plus one entry per table and endpoint."""
            resources = [
                Resource(
                    uri=self._CONFIG_URI,
                    name="Server Configuration",
                    description="Server settings and metadata"
                )
            ]
            resources.extend(
                Resource(
                    uri=f"{self._DATABASE_PREFIX}{table}",
                    name=f"Table: {table}",
                    description=f"Access {table} table"
                )
                for table in self.TABLES
            )
            resources.extend(
                Resource(
                    uri=f"{self._API_PREFIX}{endpoint}",
                    name=f"API: {endpoint}",
                    description=f"Access {endpoint} endpoint"
                )
                for endpoint in self.ENDPOINTS
            )
            return ListResourcesResult(resources=resources)

        @self.server.read_resource()
        async def read_resource(uri: str) -> str:
            """Read a resource by matching its URI against the known families."""
            return await self._read_uri(uri)

    async def _read_uri(self, uri: str) -> str:
        """Route *uri* to the matching reader and return its content.

        Raises:
            ValueError: If the URI matches no family, or names a table or
                endpoint that is not in ``TABLES`` / ``ENDPOINTS``.
        """
        if uri == self._CONFIG_URI:
            return '{"version": "1.0", "status": "healthy"}'

        if uri.startswith(self._DATABASE_PREFIX):
            # Slice the prefix off instead of str.replace(), which would also
            # delete later occurrences of the prefix inside the name.
            table = uri[len(self._DATABASE_PREFIX):]
            # Only advertised tables are readable; the name comes from the
            # client and must not reach the data layer unchecked.
            if table in self.TABLES:
                return await self._read_table(table)

        elif uri.startswith(self._API_PREFIX):
            endpoint = uri[len(self._API_PREFIX):]
            if endpoint in self.ENDPOINTS:
                return await self._read_api(endpoint)

        raise ValueError(f"Unknown resource: {uri}")

    async def _read_table(self, table: str) -> str:
        """Return a JSON document for *table* (placeholder with no rows)."""
        import json

        # json.dumps escapes the name, so the result is always valid JSON.
        return json.dumps({"table": table, "rows": []})

    async def _read_api(self, endpoint: str) -> str:
        """Return a JSON document for *endpoint* (placeholder status)."""
        import json

        return json.dumps({"endpoint": endpoint, "status": "ok"})

Sampling and Pagination

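For large resources, implement sampling and pagination to avoid overwhelming clients. Here, "sampling" means returning a subset of a dataset's records; it is unrelated to the MCP protocol feature of the same name, in which a server asks the client to generate an LLM completion. The following server samples a large dataset: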

from dataclasses import dataclass
from typing import Optional

@dataclass
class SamplingConfig:
    """Settings that control how a large resource is reduced to a sample.

    Both fields have defaults, so ``SamplingConfig()`` yields a usable
    configuration.
    """
    # Number of items to keep when sampling; defaults to 1000.
    sample_size: int = 1000
    # Name of the selection strategy; the values listed alongside are the
    # ones this lesson uses.
    sample_strategy: str = "random"  # random, sequential, stratified

class SampledResourceServer:
    """Server that returns a bounded sample of large datasets.

    Resources are addressed as ``large-dataset:///<name>``. The dataset is
    fetched, reduced according to ``self.sampling_config`` and returned as
    a JSON array.
    """

    def __init__(self):
        self.server = Server("sampled-server")
        self.sampling_config = SamplingConfig()
        self.setup_handlers()

    def setup_handlers(self):
        """Register the read handler on ``self.server``."""

        @self.server.read_resource()
        async def read_resource(uri: str) -> str:
            """Read a ``large-dataset:///`` resource, sampled.

            Raises:
                ValueError: If the URI does not use the expected prefix.
            """
            prefix = "large-dataset:///"
            if uri.startswith(prefix):
                # Slice rather than str.replace() so only the leading
                # prefix is removed.
                return await self._read_large_dataset(uri[len(prefix):])
            raise ValueError(f"Unknown resource: {uri}")

    async def _read_large_dataset(self, dataset: str) -> str:
        """Fetch *dataset*, sample it and serialise the sample as JSON."""
        import json

        full_data = await self._fetch_dataset(dataset)
        sampled_data = self._apply_sampling(full_data)

        # json.dumps, not str(): str() of a list is a Python repr (single
        # quotes, True/None) that JSON parsers reject.
        return json.dumps(sampled_data)

    def _apply_sampling(self, data: List[Dict]) -> List[Dict]:
        """Reduce *data* to at most ``sample_size`` items.

        Strategies (``self.sampling_config.sample_strategy``):

        * ``"random"`` -- uniform sample without replacement;
        * ``"sequential"`` -- the first ``sample_size`` items;
        * ``"stratified"`` -- items at evenly spaced positions spanning
          the whole of *data*;
        * any other value -- same as ``"sequential"``.

        Data that already fits within ``sample_size`` is returned as is.

        Raises:
            ValueError: If ``sample_size`` is negative.
        """
        import random

        config = self.sampling_config
        size = config.sample_size

        if size < 0:
            # A negative size would otherwise mis-slice (data[:-n]).
            raise ValueError(f"sample_size must be >= 0, got {size}")

        total = len(data)
        if total <= size:
            return data
        if size == 0:
            # Nothing requested; also avoids dividing by zero below.
            return []

        if config.sample_strategy == "random":
            return random.sample(data, size)

        if config.sample_strategy == "stratified":
            # Index i * total // size walks the full range in equal steps.
            # A fixed integer step followed by truncation would stop short
            # of the end whenever total is not a multiple of size.
            return [data[i * total // size] for i in range(size)]

        # "sequential" and unrecognised strategies: take the first N items.
        return data[:size]

    async def _fetch_dataset(self, name: str) -> List[Dict]:
        """Return the dataset called *name* (simulated with 100000 rows)."""
        return [{"id": i, "value": f"item_{i}"} for i in range(100000)]

Pagination Support

class PaginatedResourceServer:
    """Server that returns one page of a resource per read.

    Paging is requested through the URI query string, for example
    ``resource://items?page=2&page_size=50``. Pages are numbered from 1.
    """

    # Upper bound on the page size a client may request, so that one read
    # cannot ask for an arbitrarily large slice.
    MAX_PAGE_SIZE = 1000

    def __init__(self):
        self.server = Server("paginated-server")
        self.page_size = 100
        self.setup_handlers()

    def setup_handlers(self):
        """Register the paginated read handler on ``self.server``."""

        @self.server.read_resource()
        async def read_resource(uri: str) -> str:
            """Read one page of the resource named by *uri*."""
            return await self._read_page(uri)

    async def _read_page(self, uri: str) -> str:
        """Return one page of the resource at *uri* as a JSON object.

        The object has the keys ``page``, ``page_size`` (the size actually
        applied, after capping at ``MAX_PAGE_SIZE``), ``data`` and
        ``has_next``.

        Raises:
            ValueError: If ``page`` or ``page_size`` is not an integer
                greater than or equal to 1.
        """
        import json
        import urllib.parse

        parsed = urllib.parse.urlparse(uri)
        params = urllib.parse.parse_qs(parsed.query)

        page = self._positive_int(params, "page", 1)
        page_size = min(
            self._positive_int(params, "page_size", self.page_size),
            self.MAX_PAGE_SIZE,
        )

        # For "resource://items" urlparse puts "items" in netloc and leaves
        # path empty, so both parts are needed to identify the resource.
        resource = parsed.netloc + parsed.path

        total = await self._get_total_count(resource)
        # Clamp to the total so pages past the end are empty rather than
        # filled with items that do not exist.
        start = min((page - 1) * page_size, total)
        end = min(start + page_size, total)

        data = await self._fetch_range(resource, start, end)

        # json.dumps, not str(): str() of a dict is a Python repr.
        return json.dumps({
            "page": page,
            "page_size": page_size,
            "data": data,
            "has_next": end < total
        })

    @staticmethod
    def _positive_int(params: Dict, name: str, default: int) -> int:
        """Read query parameter *name* as an integer >= 1, or *default*."""
        raw = params.get(name, [str(default)])[0]
        try:
            value = int(raw)
        except ValueError:
            raise ValueError(f"Invalid {name} parameter: {raw!r}") from None
        if value < 1:
            raise ValueError(f"{name} must be >= 1, got {value}")
        return value

    async def _fetch_range(self, resource: str, start: int, end: int) -> List:
        """Return the items of *resource* in ``[start, end)`` (simulated)."""
        return list(range(start, end))

    async def _get_total_count(self, resource: str) -> int:
        """Return the total number of items in *resource* (simulated)."""
        return 10000

Capability Negotiation

Servers and clients exchange capabilities to determine supported features:

from typing import Dict, Any

@dataclass
class ServerCapabilities:
    """Capability settings a server advertises, grouped into three areas.

    Every field is required and is a free-form mapping from option name to
    value; this class performs no validation of keys or values.
    """
    # Options relating to resources.
    resources: Dict[str, Any]
    # Options relating to tools.
    tools: Dict[str, Any]
    # Options relating to prompts.
    prompts: Dict[str, Any]

class CapabilityNegotiatingServer:
    """Server that advertises its capabilities and records the client's.

    The client's capabilities are stored during initialization and can be
    queried afterwards with :meth:`client_supports`.
    """

    def __init__(self):
        self.server = Server("capability-server")
        self.capabilities = self._define_capabilities()
        self.client_capabilities = {}
        self.setup_handlers()

    def _define_capabilities(self) -> "ServerCapabilities":
        """Build the capability set this server advertises."""
        return ServerCapabilities(
            resources={
                "subscribe": True,  # Server supports resource subscriptions
                "streaming": True,  # Server supports streaming resources
                "sampling": {
                    "enabled": True,
                    "strategies": ["random", "sequential", "stratified"]
                }
            },
            tools={
                "async": True,  # Tools can be async
                "streaming": False,  # No streaming tool results
                "batching": True  # Can batch multiple tool calls
            },
            prompts={
                "dynamic": True,  # Supports dynamic prompts
                "caching": True  # Supports prompt caching
            }
        )

    def setup_handlers(self):
        """Register the initialization handler on ``self.server``."""

        @self.server.initialize()
        async def initialize(request: Dict) -> Dict:
            """Record the client's capabilities and reply with the server's."""
            # "or {}" also covers an explicit null, so client_capabilities
            # is always a dict.
            self.client_capabilities = request.get("capabilities") or {}

            return {
                "serverInfo": {
                    "name": "capability-server",
                    "version": "1.0.0"
                },
                "capabilities": {
                    "resources": self.capabilities.resources,
                    "tools": self.capabilities.tools,
                    "prompts": self.capabilities.prompts
                }
            }

    def client_supports(self, capability: str) -> bool:
        """Return whether the client declared *capability*.

        Args:
            capability: Dotted path into the client's capability tree,
                e.g. ``"roots.listChanged"``.

        Returns:
            ``True`` if every segment of the path exists and the final
            value is anything other than ``False`` or ``None``. A
            capability declared as an empty object (``"sampling": {}``)
            therefore counts as supported: presence is the declaration.
        """
        current = self.client_capabilities

        for part in capability.split("."):
            # Stop as soon as the path leaves the tree or a key is missing.
            if not isinstance(current, dict) or part not in current:
                return False
            current = current[part]

        # Truthiness would wrongly reject {} -- only an explicit false or
        # null value means "not supported".
        return current is not None and current is not False

Root Resources

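Root resources provide entry points into resource hierarchies. This is a way of organizing a server's own resources; it is different from the MCP protocol's "roots" feature, through which a client tells a server which locations (typically filesystem directories) the server may operate on. The following server exposes three root resources: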

class HierarchicalResourceServer:
    """Server exposing three root resources with readable children.

    The roots are ``workspace://``, ``knowledge://`` and ``datastore://``.
    Every advertised root can be read; all content is returned as JSON.
    """

    _WORKSPACE_ROOT = "workspace://"
    _KNOWLEDGE_ROOT = "knowledge://"
    _DATASTORE_ROOT = "datastore://"

    def __init__(self):
        self.server = Server("hierarchical-server")
        self.setup_handlers()

    def setup_handlers(self):
        """Register the list/read resource handlers on ``self.server``."""

        @self.server.list_resources()
        async def list_resources() -> ListResourcesResult:
            """List the root resources."""
            return ListResourcesResult(
                resources=[
                    Resource(
                        uri=self._WORKSPACE_ROOT,
                        name="Workspace Root",
                        description="Root of workspace hierarchy"
                    ),
                    Resource(
                        uri=self._KNOWLEDGE_ROOT,
                        name="Knowledge Base Root",
                        description="Root of knowledge base"
                    ),
                    Resource(
                        uri=self._DATASTORE_ROOT,
                        name="Datastore Root",
                        description="Root of data storage"
                    )
                ]
            )

        @self.server.read_resource()
        async def read_resource(uri: str) -> str:
            """Read a resource, or list children when *uri* is a root."""
            return await self._read_uri(uri)

    async def _read_uri(self, uri: str) -> str:
        """Route *uri* to the reader for its hierarchy.

        Raises:
            ValueError: If *uri* belongs to none of the three hierarchies.
        """
        if uri == self._WORKSPACE_ROOT:
            return await self._list_workspace_children()
        if uri.startswith(self._WORKSPACE_ROOT):
            return await self._read_workspace_resource(uri)
        if uri.startswith(self._KNOWLEDGE_ROOT):
            return await self._read_knowledge_resource(uri)
        if uri.startswith(self._DATASTORE_ROOT):
            # datastore:// is advertised by list_resources, so it must be
            # readable too.
            return await self._read_datastore_resource(uri)
        raise ValueError(f"Unknown resource: {uri}")

    async def _list_workspace_children(self) -> str:
        """Return the child URIs of the workspace root as JSON."""
        import json

        children = [
            "workspace://projects",
            "workspace://files",
            "workspace://settings"
        ]
        # json.dumps, not str(): str() of a dict is a Python repr with
        # single quotes, which is not valid JSON.
        return json.dumps({"children": children})

    async def _read_workspace_resource(self, uri: str) -> str:
        """Return a JSON document for the workspace resource at *uri*.

        The ``type`` is the first path segment after ``workspace://``.
        """
        import json

        # Slice the leading prefix only; str.replace() would also remove
        # any later occurrence of it.
        remainder = uri[len(self._WORKSPACE_ROOT):]
        resource_type = remainder.split("/")[0]
        return json.dumps({"type": resource_type, "items": []})

    async def _read_knowledge_resource(self, uri: str) -> str:
        """Return a JSON document for a knowledge-base URI (placeholder)."""
        return '{"documents": []}'

    async def _read_datastore_resource(self, uri: str) -> str:
        """Return a JSON document for a datastore URI (placeholder)."""
        return '{"items": []}'

Subscription Support

Allow clients to subscribe to resource updates:

from typing import Callable, List
import asyncio

class SubscribableResourceServer:
    """Server that pushes periodic updates to resource subscribers.

    Each subscribed URI has a list of callbacks and exactly one background
    task that generates an update every ``UPDATE_INTERVAL_SECONDS`` and
    passes it to every callback. Unsubscribing removes the callbacks and
    cancels the task.
    """

    # Seconds between two updates for the same URI.
    UPDATE_INTERVAL_SECONDS = 5

    def __init__(self):
        self.server = Server("subscribable-server")
        self.subscribers: Dict[str, List[Callable]] = {}
        # One update task per URI. Holding the reference keeps the task
        # from being garbage-collected and lets unsubscribe cancel it.
        self._update_tasks: Dict[str, asyncio.Task] = {}
        self.setup_handlers()

    def setup_handlers(self):
        """Register the subscribe/unsubscribe handlers on ``self.server``."""

        @self.server.subscribe_resource()
        async def subscribe(uri: str) -> None:
            """Subscribe to updates for *uri*."""
            await self._subscribe(uri)

        @self.server.unsubscribe_resource()
        async def unsubscribe(uri: str) -> None:
            """Stop all updates for *uri*."""
            await self._unsubscribe(uri)

    async def _subscribe(self, uri: str) -> None:
        """Add a subscriber for *uri* and ensure its update task runs."""
        self.subscribers.setdefault(uri, []).append(
            lambda update: self._handle_update(uri, update)
        )

        # Start a task only if none is running for this URI; one task per
        # subscribe call would deliver every update several times.
        task = self._update_tasks.get(uri)
        if task is None or task.done():
            self._update_tasks[uri] = asyncio.create_task(
                self._send_updates(uri)
            )

    async def _unsubscribe(self, uri: str) -> None:
        """Remove all subscribers for *uri* and cancel its update task."""
        self.subscribers.pop(uri, None)
        task = self._update_tasks.pop(uri, None)
        if task is not None:
            task.cancel()

    async def _send_updates(self, uri: str) -> None:
        """Deliver an update for *uri* to its subscribers at each interval.

        Returns once the URI has no subscribers left.
        """
        while True:
            await asyncio.sleep(self.UPDATE_INTERVAL_SECONDS)
            callbacks = self.subscribers.get(uri)
            if not callbacks:
                return
            update = await self._generate_update(uri)
            # Iterate over a copy: the list may change while a callback
            # is being awaited.
            for callback in list(callbacks):
                await callback(update)

    async def _generate_update(self, uri: str) -> str:
        """Return the update payload for *uri* (placeholder timestamp)."""
        return f'{{"uri": "{uri}", "timestamp": 0}}'

    async def _handle_update(self, uri: str, update: str) -> None:
        """Handle and broadcast update."""
        pass

Best Practices for Advanced Features

1. Resource Versioning

Track resource versions for change detection:

@dataclass
class VersionedResource:
    """A resource's content together with version metadata.

    All four fields are required; the class itself contains no logic for
    incrementing the version or updating the timestamp.
    """
    # URI identifying the resource.
    uri: str
    # Version number of this content.
    version: int
    # The resource content.
    content: str
    # Time of last modification; presumably seconds since the epoch -- the
    # unit is not established by this snippet, confirm with the producer.
    modified_at: float

2. Efficient Caching

Cache frequently accessed resources:

import time
from functools import lru_cache

class CachingResourceServer:
    """Server-side cache of resource content with a time-to-live.

    Entries are stored per URI together with the time they were cached and
    are treated as missing once they are older than ``cache_ttl`` seconds.
    """

    def __init__(self):
        self.cache = {}
        self.cache_ttl = 3600  # 1 hour

    def _get_cached(self, uri: str) -> Optional[str]:
        """Return the cached content for *uri*, or ``None``.

        ``None`` is returned when nothing is cached for *uri* or when the
        entry has expired; an expired entry is removed from the cache.
        """
        entry = self.cache.get(uri)
        if entry is None:
            return None

        content, stored_at = entry
        # Monotonic time never goes backwards, so an adjustment of the
        # system clock cannot expire entries early or keep them alive.
        if time.monotonic() - stored_at < self.cache_ttl:
            return content

        del self.cache[uri]
        return None

    def _set_cache(self, uri: str, content: str):
        """Store *content* for *uri*, stamped with the current time."""
        self.cache[uri] = (content, time.monotonic())

3. Graceful Degradation

Implement fallbacks for when a feature is unavailable:

async def read_resource_graceful(self, uri: str) -> str:
    """Return the resource at *uri*, preferring the sampled read path.

    The sampled reader is tried first. If it raises
    ``NotImplementedError``, the basic reader is used instead. Any other
    exception propagates to the caller.
    """
    try:
        content = await self._read_with_sampling(uri)
    except NotImplementedError:
        # Sampling is not available: degrade to the plain read.
        content = await self._read_basic(uri)
    return content

Key Takeaway

Advanced MCP features like resource templates, sampling, pagination, and capability negotiation enable building sophisticated servers that scale efficiently and adapt to client capabilities. These patterns are essential for production systems handling large datasets and diverse client requirements.

Exercises

  1. Template Resources: Build a server that dynamically generates resources based on templates for database tables and API endpoints.

  2. Sampling Implementation: Implement multiple sampling strategies (random, sequential, stratified) for large datasets.

  3. Pagination: Add pagination support to resources with queryable page parameters.

  4. Capability Negotiation: Create a server that reports its capabilities and adapts behavior based on client capabilities.

  5. Hierarchical Resources: Design a resource hierarchy with root resources and child exploration.

  6. Performance Testing: Compare performance of sampled vs. full resources and measure caching effectiveness.