Advanced

Monitoring, Observability, and Cost Control

Lesson 2 of 4 · Estimated time: 55 min

Production LLM systems require comprehensive monitoring to track performance, reliability, costs, and quality metrics. Observability enables rapid diagnosis of issues, while cost control ensures systems remain economically viable. This lesson covers monitoring infrastructure, key metrics, and cost optimization strategies.

Logging LLM Calls

Structured logging captures detailed information about every LLM interaction:

import json
import logging
import time
from datetime import datetime
from typing import Optional, Dict, Any

class LLMCallLogger:
    """Log all LLM API calls with structured data."""

    def __init__(self, log_file: str = "llm_calls.log"):
        self.logger = logging.getLogger(__name__)
        self.log_file = log_file

        # Setup file handler with JSON formatter; guard against adding
        # duplicate handlers if the logger is instantiated more than once
        if not self.logger.handlers:
            handler = logging.FileHandler(log_file)
            handler.setFormatter(JsonFormatter())
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_call(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        latency_ms: float,
        cost: float,
        success: bool,
        error_message: Optional[str] = None,
        metadata: Optional[Dict] = None
    ):
        """Log an LLM API call."""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "total_tokens": input_tokens + output_tokens,
            "latency_ms": latency_ms,
            "cost": cost,
            "success": success,
            "error_message": error_message,
            "metadata": metadata or {}
        }

        if success:
            self.logger.info(json.dumps(log_entry))
        else:
            self.logger.error(json.dumps(log_entry))

class JsonFormatter(logging.Formatter):
    """Format log records as JSON."""

    def format(self, record):
        if isinstance(record.msg, str):
            try:
                data = json.loads(record.msg)
                return json.dumps(data)
            except json.JSONDecodeError:
                return record.msg
        return record.msg

# Usage
logger = LLMCallLogger()

start_time = time.time()
# Make API call...
latency = (time.time() - start_time) * 1000

logger.log_call(
    model="claude-3-5-sonnet-20241022",
    input_tokens=150,
    output_tokens=450,
    latency_ms=latency,
    cost=0.0048,
    success=True,
    metadata={"user_id": "user123", "request_id": "req456"}
)
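
Measuring latency by hand at every call site is easy to get wrong. One way to centralize it is a small context manager that times the wrapped block and logs through the `LLMCallLogger` above; this is a sketch, and `timed_llm_call` is our own name, not part of any SDK:

```python
import time
from contextlib import contextmanager

@contextmanager
def timed_llm_call(logger, model, **metadata):
    """Time the wrapped block and emit one structured log entry for it."""
    call = {"input_tokens": 0, "output_tokens": 0, "cost": 0.0}
    success, error = True, None
    start = time.perf_counter()
    try:
        yield call  # the caller fills in token counts and cost
    except Exception as exc:
        success, error = False, str(exc)
        raise
    finally:
        logger.log_call(
            model=model,
            input_tokens=call["input_tokens"],
            output_tokens=call["output_tokens"],
            latency_ms=(time.perf_counter() - start) * 1000,
            cost=call["cost"],
            success=success,
            error_message=error,
            metadata=metadata,
        )

# Hypothetical usage with a provider client:
# with timed_llm_call(logger, "claude-3-5-sonnet-20241022", user_id="user123") as call:
#     response = client.messages.create(...)
#     call["input_tokens"] = response.usage.input_tokens
```

Because the logging happens in `finally`, failed calls are recorded with `success=False` instead of being silently dropped.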

Distributed Tracing

Track requests across multiple services and systems:

import uuid
from datetime import datetime
from typing import Any, Dict, Optional

class TraceContext:
    """Context for distributed tracing."""

    def __init__(self, trace_id: Optional[str] = None, span_id: Optional[str] = None):
        self.trace_id = trace_id or str(uuid.uuid4())
        self.span_id = span_id or str(uuid.uuid4())

class Span:
    """Represents a span in a trace."""

    def __init__(self, name: str, trace_context: TraceContext):
        self.name = name
        self.trace_context = trace_context
        # Each span needs its own id; reusing the context's span_id would
        # make every span in a trace collide on the same dictionary key.
        self.span_id = str(uuid.uuid4())
        self.start_time = datetime.utcnow()
        self.end_time: Optional[datetime] = None
        self.events = []
        self.tags = {}

    def add_event(self, name: str, attributes: Optional[Dict[str, Any]] = None):
        """Add event to span."""
        self.events.append({
            "timestamp": datetime.utcnow().isoformat(),
            "name": name,
            "attributes": attributes or {}
        })

    def set_tag(self, key: str, value: Any):
        """Set span tag."""
        self.tags[key] = value

    def finish(self):
        """Mark span as finished."""
        self.end_time = datetime.utcnow()

    def duration_ms(self) -> float:
        """Get span duration in milliseconds."""
        end = self.end_time or datetime.utcnow()
        return (end - self.start_time).total_seconds() * 1000

    def to_dict(self) -> Dict:
        """Convert span to dictionary."""
        return {
            "trace_id": self.trace_context.trace_id,
            "span_id": self.span_id,
            "name": self.name,
            "start_time": self.start_time.isoformat(),
            "end_time": (self.end_time or datetime.utcnow()).isoformat(),
            "duration_ms": self.duration_ms(),
            "events": self.events,
            "tags": self.tags
        }

class Tracer:
    """Collect and manage traces."""

    def __init__(self):
        self.spans: Dict[str, Span] = {}

    def create_span(self, name: str, trace_context: TraceContext) -> Span:
        """Create new span."""
        span = Span(name, trace_context)
        self.spans[span.span_id] = span
        return span

    def export_traces(self) -> list:
        """Export all traces."""
        return [span.to_dict() for span in self.spans.values()]

# Usage
tracer = Tracer()
trace_context = TraceContext()

# Trace API request
api_span = tracer.create_span("api_request", trace_context)
api_span.set_tag("endpoint", "/generate")

# Trace LLM call
llm_span = tracer.create_span("llm_call", trace_context)
llm_span.set_tag("model", "claude-3-5-sonnet-20241022")
llm_span.add_event("api_response", {"status": 200})
llm_span.finish()

api_span.finish()

print(tracer.export_traces())
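
Forgetting `finish()` leaves a span open forever and inflates its reported duration. A small helper can guarantee cleanup via `try/finally`; `traced_span` is illustrative and not part of the classes above:

```python
from contextlib import contextmanager

@contextmanager
def traced_span(tracer, name, trace_context):
    """Create a span and guarantee finish() runs, even if the block raises."""
    span = tracer.create_span(name, trace_context)
    try:
        yield span
    finally:
        span.finish()

# Usage:
# with traced_span(tracer, "llm_call", trace_context) as span:
#     span.set_tag("model", "claude-3-5-sonnet-20241022")
```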

Performance Metrics

Track latency, throughput, and quality:

from statistics import mean
from collections import deque
from typing import Dict, Optional

class PerformanceMetrics:
    """Track performance metrics."""

    def __init__(self, window_size: int = 1000):
        self.window_size = window_size
        self.latencies = deque(maxlen=window_size)
        self.token_counts = deque(maxlen=window_size)
        self.quality_scores = deque(maxlen=window_size)
        self.error_count = 0
        self.total_requests = 0

    def record_request(
        self,
        latency_ms: float,
        tokens: int,
        quality_score: Optional[float] = None,
        error: bool = False
    ):
        """Record a request."""
        self.total_requests += 1

        if error:
            self.error_count += 1
        else:
            self.latencies.append(latency_ms)
            self.token_counts.append(tokens)

            if quality_score is not None:
                self.quality_scores.append(quality_score)

    def get_percentile(self, metric_list: list, percentile: float) -> float:
        """Calculate percentile."""
        if not metric_list:
            return 0.0

        sorted_list = sorted(metric_list)
        index = int(len(sorted_list) * percentile / 100)
        return sorted_list[min(index, len(sorted_list) - 1)]

    def get_metrics(self) -> Dict[str, float]:
        """Get current metrics."""
        return {
            "p50_latency_ms": self.get_percentile(list(self.latencies), 50),
            "p95_latency_ms": self.get_percentile(list(self.latencies), 95),
            "p99_latency_ms": self.get_percentile(list(self.latencies), 99),
            "avg_latency_ms": mean(self.latencies) if self.latencies else 0,
            "avg_tokens": mean(self.token_counts) if self.token_counts else 0,
            "avg_quality": mean(self.quality_scores) if self.quality_scores else 0,
            "error_rate": self.error_count / self.total_requests if self.total_requests > 0 else 0,
            "throughput_rps": len(self.latencies) / sum(self.latencies) * 1000 if self.latencies else 0
        }

# Usage
metrics = PerformanceMetrics()

for i in range(100):
    metrics.record_request(latency_ms=150 + i, tokens=600, quality_score=0.85)

print(metrics.get_metrics())
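
Dashboards usually scrape metrics rather than call into Python. As a minimal sketch, the dict returned by `get_metrics()` can be rendered in a Prometheus-style `name value` text format; the `llm_` prefix is our own convention:

```python
def to_prometheus_text(metrics: dict, prefix: str = "llm") -> str:
    """Render a flat metrics dict as 'name value' lines, one metric per line."""
    return "\n".join(
        f"{prefix}_{key} {value}" for key, value in sorted(metrics.items())
    )

print(to_prometheus_text({"error_rate": 0.0, "p95_latency_ms": 245}))
```

A real deployment would more likely use an official client library, but the exposition format itself is just text lines like these.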

Cost Dashboards

Visualize and track spending:

from datetime import datetime
from typing import Any, Dict, List

class CostAnalytics:
    """Analyze and visualize costs."""

    def __init__(self):
        self.calls: List[Dict] = []

    def log_call(self, model: str, input_tokens: int, output_tokens: int, cost: float):
        """Log API call."""
        self.calls.append({
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost": cost,
            "timestamp": datetime.utcnow()
        })

    def get_hourly_costs(self) -> Dict[str, float]:
        """Get costs by hour."""
        hourly = {}

        for call in self.calls:
            hour_key = call["timestamp"].strftime("%Y-%m-%d %H:00")

            if hour_key not in hourly:
                hourly[hour_key] = 0.0

            hourly[hour_key] += call["cost"]

        return hourly

    def get_model_costs(self) -> Dict[str, float]:
        """Get costs by model."""
        model_costs = {}

        for call in self.calls:
            model = call["model"]

            if model not in model_costs:
                model_costs[model] = 0.0

            model_costs[model] += call["cost"]

        return model_costs

    def get_cost_breakdown(self) -> Dict[str, Any]:
        """Get comprehensive cost breakdown."""
        total_cost = sum(call["cost"] for call in self.calls)
        total_tokens = sum(call["input_tokens"] + call["output_tokens"] for call in self.calls)

        return {
            "total_cost": total_cost,
            "total_tokens": total_tokens,
            "avg_cost_per_token": total_cost / total_tokens if total_tokens > 0 else 0,
            "by_model": self.get_model_costs(),
            "by_hour": self.get_hourly_costs(),
            "total_calls": len(self.calls)
        }

# Usage
analytics = CostAnalytics()
analytics.log_call("claude-3-5-sonnet-20241022", 100, 300, 0.0048)
print(analytics.get_cost_breakdown())
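
The `cost` value passed to `log_call` has to come from somewhere. A common approach is a per-million-token price table; the model names and prices below are placeholders for illustration, not real provider rates:

```python
# Per-million-token prices. PLACEHOLDER numbers for illustration only;
# always read current rates from your provider's pricing page.
PRICES_PER_MTOK = {
    "model-a": {"input": 3.00, "output": 15.00},
    "model-b": {"input": 0.25, "output": 1.25},
}

def estimate_cost(model: str, input_tokens: int, output_tokens: int,
                  prices: dict = PRICES_PER_MTOK) -> float:
    """Estimate call cost in dollars from per-million-token prices."""
    p = prices[model]
    return (input_tokens * p["input"] + output_tokens * p["output"]) / 1_000_000

# 100 input + 300 output tokens at model-a's placeholder rates:
cost = estimate_cost("model-a", 100, 300)  # 0.0003 + 0.0045 = 0.0048
```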

Alerting

Set up alerts for anomalies and issues:

from enum import Enum
from typing import Dict, List, Optional

class AlertSeverity(Enum):
    """Alert severity levels."""
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"

class AlertRule:
    """Define alerting rule."""

    def __init__(
        self,
        name: str,
        condition_func,
        severity: AlertSeverity,
        action_func
    ):
        self.name = name
        self.condition_func = condition_func
        self.severity = severity
        self.action_func = action_func

    def evaluate(self, metrics: Dict) -> Optional[str]:
        """Evaluate rule against metrics."""
        if self.condition_func(metrics):
            return f"ALERT [{self.severity.value}] {self.name}"
        return None

class AlertingSystem:
    """Manage alerts for system health."""

    def __init__(self):
        self.rules: List[AlertRule] = []
        self.alert_history: List[str] = []

    def add_rule(self, rule: AlertRule):
        """Register alerting rule."""
        self.rules.append(rule)

    def check_alerts(self, metrics: Dict):
        """Check all rules against current metrics."""
        for rule in self.rules:
            alert = rule.evaluate(metrics)

            if alert:
                self.alert_history.append(alert)
                self.trigger_alert(rule, metrics)

    def trigger_alert(self, rule: AlertRule, metrics: Dict):
        """Trigger alert action."""
        rule.action_func(metrics)

# Setup alerting rules
alerting = AlertingSystem()

# Alert on high error rate
alerting.add_rule(AlertRule(
    name="High Error Rate",
    condition_func=lambda m: m.get("error_rate", 0) > 0.05,
    severity=AlertSeverity.HIGH,
    action_func=lambda m: print(f"Error rate critical: {m['error_rate']}")
))

# Alert on high latency
alerting.add_rule(AlertRule(
    name="High Latency",
    condition_func=lambda m: m.get("p95_latency_ms", 0) > 5000,
    severity=AlertSeverity.MEDIUM,
    action_func=lambda m: print(f"Latency high: {m['p95_latency_ms']}ms")
))

# Alert on cost overrun
alerting.add_rule(AlertRule(
    name="Daily Cost Limit",
    condition_func=lambda m: m.get("daily_cost", 0) > 100,
    severity=AlertSeverity.HIGH,
    action_func=lambda m: print(f"Daily cost limit exceeded: ${m['daily_cost']}")
))
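
A rule that keeps firing on every metrics check will flood whatever channel `action_func` writes to. One common mitigation is a cooldown window per alert name; `AlertDeduper` is our own name for this sketch:

```python
import time

class AlertDeduper:
    """Suppress repeat alerts within a per-alert cooldown window."""

    def __init__(self, cooldown_seconds: float = 300):
        self.cooldown = cooldown_seconds
        self.last_fired = {}  # alert name -> timestamp of last fire

    def should_fire(self, name: str, now: float = None) -> bool:
        """Return True and record the fire time if the cooldown has elapsed."""
        now = time.time() if now is None else now
        last = self.last_fired.get(name)
        if last is not None and now - last < self.cooldown:
            return False
        self.last_fired[name] = now
        return True
```

`AlertingSystem.trigger_alert` could consult `should_fire(rule.name)` before calling `rule.action_func`, so a sustained breach pages once per window instead of once per check.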

Cost Optimization Strategies

Reduce spend with cheaper models where quality permits, request batching, and response caching:

class CostOptimizer:
    """Optimize system costs."""

    @staticmethod
    def suggest_model_downgrade(current_model: str, metrics: Dict) -> Optional[str]:
        """Suggest cheaper model if quality permits."""
        # If quality is high with current model, suggest downgrade
        if metrics.get("quality_score", 0) > 0.9:
            downgrades = {
                "claude-3-opus-20240229": "claude-3-5-sonnet-20241022",
                "claude-3-5-sonnet-20241022": "claude-3-haiku-20240307"
            }
            return downgrades.get(current_model)
        return None

    @staticmethod
    def batch_requests(
        requests: List[Dict],
        batch_size: int = 10
    ) -> List[List[Dict]]:
        """Batch requests for efficiency."""
        batches = []
        for i in range(0, len(requests), batch_size):
            batches.append(requests[i:i + batch_size])
        return batches

    @staticmethod
    def cache_responses(
        request_hash: str,
        response: str,
        ttl_seconds: int = 3600
    ):
        """Cache responses to avoid redundant API calls."""
        # Implement cache storage
        pass
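
The `cache_responses` stub above can be filled in with a minimal in-memory TTL cache. A production system would more likely use shared storage such as Redis, but the logic is the same:

```python
import hashlib
import time

class ResponseCache:
    """In-memory TTL cache for LLM responses (sketch, not production storage)."""

    def __init__(self, ttl_seconds: int = 3600):
        self.ttl = ttl_seconds
        self.store = {}  # request hash -> (response, expiry timestamp)

    @staticmethod
    def hash_request(prompt: str, model: str) -> str:
        """Stable key so identical prompts to the same model hit the cache."""
        return hashlib.sha256(f"{model}:{prompt}".encode()).hexdigest()

    def get(self, key: str, now: float = None):
        now = time.time() if now is None else now
        entry = self.store.get(key)
        if entry is None:
            return None
        response, expires = entry
        if now >= expires:
            del self.store[key]  # evict expired entry
            return None
        return response

    def put(self, key: str, response: str, now: float = None):
        now = time.time() if now is None else now
        self.store[key] = (response, now + self.ttl)
```

Check `get` before calling the API and `put` after; every hit saves the full input-plus-output token cost of the call.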

Key Takeaway

> Comprehensive monitoring and observability infrastructure enables rapid detection and diagnosis of issues in production LLM systems. Combined with cost tracking and optimization strategies, these practices ensure systems remain both reliable and economically viable.

Exercises

1. **Logging System**: Implement structured logging for all LLM API calls with detailed metrics.

2. **Distributed Tracing**: Build a tracing system that tracks requests across multiple services.

3. **Performance Dashboard**: Create dashboards showing latency percentiles, throughput, and quality metrics.

4. **Alerting Rules**: Define and implement alerting rules for common failure scenarios.

5. **Cost Analysis**: Build cost breakdown reports by model, time period, and user.

6. **Optimization**: Implement automatic model downgrading based on quality metrics and cost analysis.
