# Monitoring, Observability, and Cost Control
Production LLM systems require comprehensive monitoring to track performance, reliability, costs, and quality metrics. Observability enables rapid diagnosis of issues, while cost control ensures systems remain economically viable. This lesson covers monitoring infrastructure, key metrics, and cost optimization strategies.
## Logging LLM Calls
Structured logging captures detailed information about every LLM interaction:
```python
import json
import logging
import time
from datetime import datetime
from typing import Dict, Optional


class JsonFormatter(logging.Formatter):
    """Format log records as JSON."""

    def format(self, record):
        if isinstance(record.msg, str):
            try:
                data = json.loads(record.msg)
                return json.dumps(data)
            except json.JSONDecodeError:
                return record.msg
        return record.msg


class LLMCallLogger:
    """Log all LLM API calls with structured data."""

    def __init__(self, log_file: str = "llm_calls.log"):
        self.logger = logging.getLogger(__name__)
        self.log_file = log_file
        # Set up file handler with JSON formatter
        handler = logging.FileHandler(log_file)
        handler.setFormatter(JsonFormatter())
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_call(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        latency_ms: float,
        cost: float,
        success: bool,
        error_message: Optional[str] = None,
        metadata: Optional[Dict] = None,
    ):
        """Log an LLM API call."""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "total_tokens": input_tokens + output_tokens,
            "latency_ms": latency_ms,
            "cost": cost,
            "success": success,
            "error_message": error_message,
            "metadata": metadata or {},
        }
        if success:
            self.logger.info(json.dumps(log_entry))
        else:
            self.logger.error(json.dumps(log_entry))


# Usage
logger = LLMCallLogger()

start_time = time.time()
# Make API call...
latency = (time.time() - start_time) * 1000

logger.log_call(
    model="claude-3-5-sonnet-20241022",
    input_tokens=150,
    output_tokens=450,
    latency_ms=latency,
    cost=0.0048,
    success=True,
    metadata={"user_id": "user123", "request_id": "req456"},
)
```
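Measuring latency by hand around every call is easy to get wrong (an early `return` or an exception skips the measurement). One way to make it reliable is a small context manager; the `timed_call` helper and its result dict below are illustrative, not part of any SDK:

```python
import time
from contextlib import contextmanager


@contextmanager
def timed_call(results: dict):
    """Measure wall-clock latency of the enclosed block in milliseconds.

    The `finally` clause guarantees the measurement lands in `results`
    even if the wrapped API call raises.
    """
    start = time.perf_counter()
    try:
        yield results
    finally:
        results["latency_ms"] = (time.perf_counter() - start) * 1000


# Usage: wrap the API call, then pass the measured latency to log_call()
timing = {}
with timed_call(timing):
    time.sleep(0.01)  # stand-in for the real API call
```

After the `with` block, `timing["latency_ms"]` holds the elapsed time and can be handed straight to `LLMCallLogger.log_call`.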
## Distributed Tracing
Track requests across multiple services and systems:
```python
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional


class TraceContext:
    """Context for distributed tracing."""

    def __init__(self, trace_id: Optional[str] = None, span_id: Optional[str] = None):
        self.trace_id = trace_id or str(uuid.uuid4())
        self.span_id = span_id or str(uuid.uuid4())


class Span:
    """Represents a span in a trace."""

    def __init__(self, name: str, trace_context: TraceContext):
        self.name = name
        self.trace_context = trace_context
        # Each span needs its own id; reusing the context's span id would
        # make every span in the same trace collide on one key
        self.span_id = str(uuid.uuid4())
        self.start_time = datetime.utcnow()
        self.end_time: Optional[datetime] = None
        self.events: List[Dict] = []
        self.tags: Dict[str, Any] = {}

    def add_event(self, name: str, attributes: Optional[Dict[str, Any]] = None):
        """Add event to span."""
        self.events.append({
            "timestamp": datetime.utcnow().isoformat(),
            "name": name,
            "attributes": attributes or {},
        })

    def set_tag(self, key: str, value: Any):
        """Set span tag."""
        self.tags[key] = value

    def finish(self):
        """Mark span as finished."""
        self.end_time = datetime.utcnow()

    def duration_ms(self) -> float:
        """Get span duration in milliseconds."""
        end = self.end_time or datetime.utcnow()
        return (end - self.start_time).total_seconds() * 1000

    def to_dict(self) -> Dict:
        """Convert span to dictionary."""
        return {
            "trace_id": self.trace_context.trace_id,
            "span_id": self.span_id,
            "name": self.name,
            "start_time": self.start_time.isoformat(),
            "end_time": (self.end_time or datetime.utcnow()).isoformat(),
            "duration_ms": self.duration_ms(),
            "events": self.events,
            "tags": self.tags,
        }


class Tracer:
    """Collect and manage traces."""

    def __init__(self):
        self.spans: Dict[str, Span] = {}

    def create_span(self, name: str, trace_context: TraceContext) -> Span:
        """Create new span."""
        span = Span(name, trace_context)
        self.spans[span.span_id] = span
        return span

    def export_traces(self) -> list:
        """Export all traces."""
        return [span.to_dict() for span in self.spans.values()]


# Usage
tracer = Tracer()
trace_context = TraceContext()

# Trace API request
api_span = tracer.create_span("api_request", trace_context)
api_span.set_tag("endpoint", "/generate")

# Trace LLM call
llm_span = tracer.create_span("llm_call", trace_context)
llm_span.set_tag("model", "claude-3-5-sonnet-20241022")
llm_span.add_event("api_response", {"status": 200})
llm_span.finish()

api_span.finish()
print(tracer.export_traces())
```
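The tracer above lives inside one process. For the trace to span multiple services, the context has to travel with the request, usually in headers. A minimal sketch of that propagation (the `X-Trace-Id`/`X-Span-Id` header names are illustrative; production systems typically use the W3C `traceparent` format):

```python
import uuid


def inject_headers(trace_id: str, span_id: str) -> dict:
    """Serialize trace context into request headers for a downstream service."""
    return {"X-Trace-Id": trace_id, "X-Span-Id": span_id}


def extract_headers(headers: dict) -> tuple:
    """Recover trace context on the receiving service.

    If no context was propagated, start a fresh trace; the sender's span
    id becomes the parent of whatever span the receiver creates.
    """
    trace_id = headers.get("X-Trace-Id") or str(uuid.uuid4())
    parent_span_id = headers.get("X-Span-Id")
    return trace_id, parent_span_id


# The downstream service continues the same trace under a new span
headers = inject_headers("trace-abc", "span-1")
trace_id, parent = extract_headers(headers)
```

Because every service reports the same `trace_id`, a trace backend can stitch the spans from all services into one end-to-end timeline.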
## Performance Metrics
Track latency, throughput, and quality:
```python
from collections import deque
from statistics import mean
from typing import Dict, Optional


class PerformanceMetrics:
    """Track performance metrics over a sliding window of recent requests."""

    def __init__(self, window_size: int = 1000):
        self.window_size = window_size
        self.latencies = deque(maxlen=window_size)
        self.token_counts = deque(maxlen=window_size)
        self.quality_scores = deque(maxlen=window_size)
        self.error_count = 0
        self.total_requests = 0

    def record_request(
        self,
        latency_ms: float,
        tokens: int,
        quality_score: Optional[float] = None,
        error: bool = False,
    ):
        """Record a request."""
        self.total_requests += 1
        if error:
            self.error_count += 1
        else:
            self.latencies.append(latency_ms)
            self.token_counts.append(tokens)
            if quality_score is not None:
                self.quality_scores.append(quality_score)

    def get_percentile(self, metric_list: list, percentile: float) -> float:
        """Calculate a percentile using the nearest-rank method."""
        if not metric_list:
            return 0.0
        sorted_list = sorted(metric_list)
        index = int(len(sorted_list) * percentile / 100)
        return sorted_list[min(index, len(sorted_list) - 1)]

    def get_metrics(self) -> Dict[str, float]:
        """Get current metrics."""
        return {
            "p50_latency_ms": self.get_percentile(list(self.latencies), 50),
            "p95_latency_ms": self.get_percentile(list(self.latencies), 95),
            "p99_latency_ms": self.get_percentile(list(self.latencies), 99),
            "avg_latency_ms": mean(self.latencies) if self.latencies else 0,
            "avg_tokens": mean(self.token_counts) if self.token_counts else 0,
            "avg_quality": mean(self.quality_scores) if self.quality_scores else 0,
            "error_rate": self.error_count / self.total_requests if self.total_requests > 0 else 0,
            # Rough approximation: treats requests as serial, so throughput is
            # the window's request count divided by total time spent in them
            "throughput_rps": len(self.latencies) / sum(self.latencies) * 1000 if self.latencies else 0,
        }


# Usage
metrics = PerformanceMetrics()
for i in range(100):
    metrics.record_request(latency_ms=150 + i, tokens=600, quality_score=0.85)

print(metrics.get_metrics())
```
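`get_metrics()` returns a plain dict, but dashboards usually pull numbers from a scrapeable endpoint. As a rough sketch, the metrics dict can be rendered in Prometheus text exposition format; the `llm_` prefix and the gauge-only assumption here are ours, not a requirement of the format:

```python
def to_prometheus_text(metrics: dict, prefix: str = "llm") -> str:
    """Render a flat metrics dict in Prometheus text exposition format.

    Every value is emitted as a gauge, which is a simplification: a real
    exporter would distinguish counters (e.g. total requests) from gauges.
    """
    lines = []
    for key, value in sorted(metrics.items()):
        name = f"{prefix}_{key}"
        lines.append(f"# TYPE {name} gauge")
        lines.append(f"{name} {value}")
    return "\n".join(lines)


sample = {"p95_latency_ms": 245.0, "error_rate": 0.01}
text = to_prometheus_text(sample)
```

Serving `text` from a `/metrics` HTTP endpoint is enough for a Prometheus server to scrape it and for Grafana to chart the percentiles over time.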
## Cost Dashboards
Visualize and track spending:
```python
from datetime import datetime
from typing import Any, Dict, List


class CostAnalytics:
    """Analyze and visualize costs."""

    def __init__(self):
        self.calls: List[Dict] = []

    def log_call(self, model: str, input_tokens: int, output_tokens: int, cost: float):
        """Log API call."""
        self.calls.append({
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost": cost,
            "timestamp": datetime.utcnow(),
        })

    def get_hourly_costs(self) -> Dict[str, float]:
        """Get costs by hour."""
        hourly: Dict[str, float] = {}
        for call in self.calls:
            hour_key = call["timestamp"].strftime("%Y-%m-%d %H:00")
            if hour_key not in hourly:
                hourly[hour_key] = 0.0
            hourly[hour_key] += call["cost"]
        return hourly

    def get_model_costs(self) -> Dict[str, float]:
        """Get costs by model."""
        model_costs: Dict[str, float] = {}
        for call in self.calls:
            model = call["model"]
            if model not in model_costs:
                model_costs[model] = 0.0
            model_costs[model] += call["cost"]
        return model_costs

    def get_cost_breakdown(self) -> Dict[str, Any]:
        """Get comprehensive cost breakdown."""
        total_cost = sum(call["cost"] for call in self.calls)
        total_tokens = sum(call["input_tokens"] + call["output_tokens"] for call in self.calls)
        return {
            "total_cost": total_cost,
            "total_tokens": total_tokens,
            "avg_cost_per_token": total_cost / total_tokens if total_tokens > 0 else 0,
            "by_model": self.get_model_costs(),
            "by_hour": self.get_hourly_costs(),
            "total_calls": len(self.calls),
        }


# Usage
analytics = CostAnalytics()
analytics.log_call("claude-3-5-sonnet-20241022", 100, 300, 0.0048)
print(analytics.get_cost_breakdown())
```
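`CostAnalytics.log_call` takes the cost as an input, so the caller has to compute it. A simple sketch of that computation from per-million-token prices; the $3 and $15 rates below are illustrative placeholders, not a quoted price list, which is why the prices are parameters rather than constants:

```python
def estimate_cost(
    input_tokens: int,
    output_tokens: int,
    input_price_per_mtok: float,
    output_price_per_mtok: float,
) -> float:
    """Estimate a call's cost from per-million-token prices.

    Input and output tokens are priced separately, as most providers do.
    """
    return (
        input_tokens * input_price_per_mtok
        + output_tokens * output_price_per_mtok
    ) / 1_000_000


# e.g. 100 input / 300 output tokens at hypothetical $3 / $15 per Mtok
cost = estimate_cost(100, 300, 3.0, 15.0)
```

Keeping the price table in configuration rather than code also means a provider price change is a config update, not a deploy.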
## Alerting
Set up alerts for anomalies and issues:
```python
from enum import Enum
from typing import Callable, Dict, List, Optional


class AlertSeverity(Enum):
    """Alert severity levels."""
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"


class AlertRule:
    """Define alerting rule."""

    def __init__(
        self,
        name: str,
        condition_func: Callable[[Dict], bool],
        severity: AlertSeverity,
        action_func: Callable[[Dict], None],
    ):
        self.name = name
        self.condition_func = condition_func
        self.severity = severity
        self.action_func = action_func

    def evaluate(self, metrics: Dict) -> Optional[str]:
        """Evaluate rule against metrics."""
        if self.condition_func(metrics):
            return f"ALERT [{self.severity.value}] {self.name}"
        return None


class AlertingSystem:
    """Manage alerts for system health."""

    def __init__(self):
        self.rules: List[AlertRule] = []
        self.alert_history: List[str] = []

    def add_rule(self, rule: AlertRule):
        """Register alerting rule."""
        self.rules.append(rule)

    def check_alerts(self, metrics: Dict):
        """Check all rules against current metrics."""
        for rule in self.rules:
            alert = rule.evaluate(metrics)
            if alert:
                self.alert_history.append(alert)
                self.trigger_alert(rule, metrics)

    def trigger_alert(self, rule: AlertRule, metrics: Dict):
        """Trigger alert action."""
        rule.action_func(metrics)


# Setup alerting rules
alerting = AlertingSystem()

# Alert on high error rate
alerting.add_rule(AlertRule(
    name="High Error Rate",
    condition_func=lambda m: m.get("error_rate", 0) > 0.05,
    severity=AlertSeverity.HIGH,
    action_func=lambda m: print(f"Error rate critical: {m['error_rate']}")
))

# Alert on high latency
alerting.add_rule(AlertRule(
    name="High Latency",
    condition_func=lambda m: m.get("p95_latency_ms", 0) > 5000,
    severity=AlertSeverity.MEDIUM,
    action_func=lambda m: print(f"Latency high: {m['p95_latency_ms']}ms")
))

# Alert on cost overrun
alerting.add_rule(AlertRule(
    name="Daily Cost Limit",
    condition_func=lambda m: m.get("daily_cost", 0) > 100,
    severity=AlertSeverity.HIGH,
    action_func=lambda m: print(f"Daily cost limit exceeded: ${m['daily_cost']}")
))
```
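One practical gap in the system above: a sustained condition fires its action on every evaluation cycle, which turns one incident into a stream of notifications. A common fix is a per-rule cooldown; this sketch is a generic pattern (the 300-second window is an arbitrary choice, and the `now` parameter exists only to make it testable):

```python
import time


class AlertThrottle:
    """Suppress repeat alerts for the same rule within a cooldown window."""

    def __init__(self, cooldown_seconds: float = 300):
        self.cooldown = cooldown_seconds
        self.last_fired: dict = {}

    def should_fire(self, rule_name: str, now: float = None) -> bool:
        """Return True only if the rule hasn't fired within the cooldown."""
        now = time.time() if now is None else now
        last = self.last_fired.get(rule_name)
        if last is not None and now - last < self.cooldown:
            return False
        self.last_fired[rule_name] = now
        return True


throttle = AlertThrottle(cooldown_seconds=300)
first = throttle.should_fire("High Error Rate", now=0)    # fires
second = throttle.should_fire("High Error Rate", now=60)  # suppressed
third = throttle.should_fire("High Error Rate", now=400)  # cooldown elapsed, fires
```

Gating `trigger_alert` behind `should_fire` keeps the alert history complete while the noisy notifications stop.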
## Cost Optimization Strategies
Reduce spend through cheaper model selection, request batching, and response caching:

```python
import time
from typing import Dict, List, Optional, Tuple


class CostOptimizer:
    """Optimize system costs."""

    # Simple in-memory response cache: hash -> (response, expiry timestamp)
    _cache: Dict[str, Tuple[str, float]] = {}

    @staticmethod
    def suggest_model_downgrade(current_model: str, metrics: Dict) -> Optional[str]:
        """Suggest cheaper model if quality permits."""
        # If quality is high with the current model, a cheaper one may suffice
        if metrics.get("quality_score", 0) > 0.9:
            downgrades = {
                "claude-3-opus-20240229": "claude-3-5-sonnet-20241022",
                "claude-3-5-sonnet-20241022": "claude-3-haiku-20240307",
            }
            return downgrades.get(current_model)
        return None

    @staticmethod
    def batch_requests(
        requests: List[Dict],
        batch_size: int = 10,
    ) -> List[List[Dict]]:
        """Batch requests for efficiency."""
        batches = []
        for i in range(0, len(requests), batch_size):
            batches.append(requests[i:i + batch_size])
        return batches

    @staticmethod
    def cache_responses(
        request_hash: str,
        response: str,
        ttl_seconds: int = 3600,
    ):
        """Cache a response to avoid redundant API calls."""
        CostOptimizer._cache[request_hash] = (response, time.time() + ttl_seconds)

    @staticmethod
    def get_cached_response(request_hash: str) -> Optional[str]:
        """Return a cached response if present and not expired."""
        entry = CostOptimizer._cache.get(request_hash)
        if entry is None:
            return None
        response, expires_at = entry
        if time.time() > expires_at:
            del CostOptimizer._cache[request_hash]
            return None
        return response
```
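Caching only pays off if the cache key captures everything that affects the output. A sketch of deriving that `request_hash` (the field set hashed here is an assumption; in a real system it must include every parameter you pass to the API, or differing requests will collide):

```python
import hashlib
import json


def request_hash(model: str, prompt: str, params: dict) -> str:
    """Derive a stable cache key from everything that affects the response.

    sort_keys makes the JSON serialization deterministic, so logically
    identical requests always hash to the same key.
    """
    payload = json.dumps(
        {"model": model, "prompt": prompt, "params": params},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Same prompt, different sampling params -> different cache entries
key_a = request_hash("claude-3-5-sonnet-20241022", "Hello", {"temperature": 0})
key_b = request_hash("claude-3-5-sonnet-20241022", "Hello", {"temperature": 1})
```

Note that caching is most effective for deterministic requests (temperature 0); replaying a cached response for a high-temperature request silently removes the sampling variety the caller asked for.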
## Key Takeaway
> Comprehensive monitoring and observability infrastructure enables rapid detection and diagnosis of issues in production LLM systems. Combined with cost tracking and optimization strategies, these practices ensure systems remain both reliable and economically viable.
## Exercises
1. **Logging System**: Implement structured logging for all LLM API calls with detailed metrics.
2. **Distributed Tracing**: Build a tracing system that tracks requests across multiple services.
3. **Performance Dashboard**: Create dashboards showing latency percentiles, throughput, and quality metrics.
4. **Alerting Rules**: Define and implement alerting rules for common failure scenarios.
5. **Cost Analysis**: Build cost breakdown reports by model, time period, and user.
6. **Optimization**: Implement automatic model downgrading based on quality metrics and cost analysis.