Building Enterprise AI Platforms
Building Enterprise AI Platforms
Enterprise AI platforms serve multiple organizations or departments with shared infrastructure. They require multi-tenancy support, comprehensive governance frameworks, API gateways for controlled access, and compliance with regulatory standards. This lesson covers architecture patterns for building scalable, secure platforms.
Multi-Tenancy Architecture
Multi-tenancy allows multiple independent customers to share infrastructure while maintaining isolation and security.
import contextvars
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Any
class IsolationLevel(Enum):
    """Strategies for keeping one tenant's data apart from another's.

    Each member's value is the lowercase form of its name.
    """

    # A dedicated database per tenant.
    DATABASE_PER_TENANT = "database_per_tenant"

    # A dedicated schema per tenant inside a shared database.
    SCHEMA_PER_TENANT = "schema_per_tenant"

    # Presumably shared tables filtered by tenant id -- nothing in this
    # lesson's code provisions anything for this level.
    ROW_LEVEL = "row_level"
@dataclass
class Tenant:
    """Represents an enterprise customer.

    Attributes:
        tenant_id: Unique identifier of the tenant.
        name: Display name of the tenant.
        api_key: Key the tenant presents to authenticate.
        isolation_level: How this tenant's data is isolated.
        features: Names of the features available to the tenant.
        rate_limits: Limit name -> allowed count
            (``create`` seeds ``"requests_per_minute"``).
        data_location: Where the tenant's data lives; defaults to
            ``"us-east-1"``.
        custom_models: Requested model name -> model identifier to use
            instead. Always a dict after construction, even when omitted
            or passed as ``None``.
    """

    tenant_id: str
    name: str
    api_key: str
    isolation_level: IsolationLevel
    features: list[str]
    rate_limits: dict[str, int]
    data_location: str = "us-east-1"
    custom_models: Optional[dict[str, str]] = None

    def __post_init__(self) -> None:
        """Replace a missing ``custom_models`` with a fresh empty dict.

        Membership tests such as ``model in tenant.custom_models`` would
        raise ``TypeError`` on ``None``. Creating the dict here (rather than
        as a shared default) gives every tenant its own mapping.
        """
        if self.custom_models is None:
            self.custom_models = {}

    @classmethod
    def create(cls, name: str, isolation_level: IsolationLevel) -> "Tenant":
        """Create a new tenant with generated credentials and default limits.

        Args:
            name: Display name of the tenant.
            isolation_level: Data isolation strategy for the tenant.

        Returns:
            A tenant with a random UUID ``tenant_id`` and ``api_key``, the
            ``"basic"`` feature, a 60 requests-per-minute limit and no
            custom models.
        """
        return cls(
            tenant_id=str(uuid.uuid4()),
            name=name,
            api_key=str(uuid.uuid4()),
            isolation_level=isolation_level,
            features=["basic"],
            rate_limits={"requests_per_minute": 60},
            custom_models={},
        )
class TenantManager:
    """Manage tenant lifecycle and isolation.

    Attributes:
        tenants: Registered tenants keyed by ``tenant_id``.
        tenant_contexts: Storage settings keyed by ``tenant_id``. An entry is
            created only for database- and schema-level isolation; tenants
            registered with any other level get none.
    """

    def __init__(self):
        self.tenants: dict[str, Tenant] = {}
        self.tenant_contexts: dict[str, dict] = {}

    def register_tenant(self, tenant: Tenant):
        """Register a tenant and set up its isolated storage context.

        Args:
            tenant: Tenant to register. Re-registering the same
                ``tenant_id`` replaces the previous entry.
        """
        self.tenants[tenant.tenant_id] = tenant
        # Create isolated context
        if tenant.isolation_level == IsolationLevel.DATABASE_PER_TENANT:
            self._create_isolated_database(tenant)
        elif tenant.isolation_level == IsolationLevel.SCHEMA_PER_TENANT:
            self._create_tenant_schema(tenant)

    def _create_isolated_database(self, tenant: Tenant):
        """Record the settings of a dedicated database for the tenant."""
        # Dashes in the UUID are replaced so the name is a plain identifier.
        db_name = f"db_{tenant.tenant_id.replace('-', '_')}"
        # In production: CREATE DATABASE IF NOT EXISTS db_name;
        self.tenant_contexts[tenant.tenant_id] = {
            "database": db_name,
            "connection_string": f"postgresql://user:pass@host/{db_name}",
        }

    def _create_tenant_schema(self, tenant: Tenant):
        """Record the settings of a dedicated schema in the shared database."""
        schema_name = f"tenant_{tenant.tenant_id.replace('-', '_')}"
        # In production: CREATE SCHEMA IF NOT EXISTS schema_name;
        self.tenant_contexts[tenant.tenant_id] = {
            "schema": schema_name,
            "connection_string": "postgresql://user:pass@host/shared_db",
        }

    def get_tenant(self, api_key: str) -> Optional[Tenant]:
        """Retrieve the tenant that owns ``api_key``.

        Keys are compared with ``hmac.compare_digest`` so the time taken does
        not reveal how many leading characters of a guess were correct.

        Args:
            api_key: Key presented by the caller.

        Returns:
            The matching tenant, or ``None`` when the key is empty, is not a
            string, or matches no registered tenant.
        """
        # Local import: the top-of-file import block does not provide hmac.
        import hmac

        if not isinstance(api_key, str) or not api_key:
            return None
        # compare_digest rejects non-ASCII str, so compare encoded bytes;
        # surrogatepass keeps malformed input from raising during encoding.
        presented = api_key.encode("utf-8", "surrogatepass")
        for tenant in self.tenants.values():
            stored = tenant.api_key
            if isinstance(stored, str) and hmac.compare_digest(
                stored.encode("utf-8", "surrogatepass"), presented
            ):
                return tenant
        return None
class TenantContext:
    """Tenant context for request handling, isolated per thread and task.

    The current tenant lives in a ``contextvars.ContextVar``. A value set in
    one thread or asyncio task cannot be seen or overwritten by other
    concurrently running threads or tasks, so concurrent requests do not
    leak tenants into each other. A plain class-level dict would be shared
    by every request in the process.
    """

    # Holds a dict with tenant_id / api_key / isolation_level, or None when
    # no tenant is set in the current context.
    _current_tenant: contextvars.ContextVar = contextvars.ContextVar(
        "current_tenant", default=None
    )

    @classmethod
    def set_tenant(cls, tenant: Tenant):
        """Set the current tenant for this request.

        Args:
            tenant: Tenant whose id, API key and isolation level are stored.
        """
        cls._current_tenant.set(
            {
                "tenant_id": tenant.tenant_id,
                "api_key": tenant.api_key,
                "isolation_level": tenant.isolation_level,
            }
        )

    @classmethod
    def get_tenant_id(cls) -> Optional[str]:
        """Get the current tenant ID.

        Returns:
            The id stored by ``set_tenant``, or ``None`` when no tenant is
            set in the current context.
        """
        current = cls._current_tenant.get()
        if not current:
            return None
        return current.get("tenant_id")

    @classmethod
    def clear(cls):
        """Clear the tenant stored in the current context."""
        cls._current_tenant.set(None)
API Gateway Pattern
An API gateway serves as a single entry point, handling authentication, rate limiting, request routing, and response aggregation.
from typing import Callable, Dict, List
from datetime import datetime, timedelta
import hashlib
import anthropic
class RateLimiter:
    """Track and enforce rate limits with a per-tenant sliding window.

    Attributes:
        buckets: Tenant id -> timestamps (``time.monotonic()`` seconds) of
            the requests accepted within the most recently checked window.
    """

    def __init__(self):
        self.buckets: Dict[str, List[float]] = {}

    def is_allowed(self, tenant_id: str, limit: int, window_seconds: int) -> bool:
        """Check whether a request is within the rate limit and record it.

        Args:
            tenant_id: Tenant making the request.
            limit: Maximum number of accepted requests per window.
            window_seconds: Length of the sliding window in seconds.

        Returns:
            ``True`` if the request is accepted (and counted), ``False`` if
            the tenant already has ``limit`` accepted requests in the window.
            Rejected requests are not counted.
        """
        # Local import: the file's import block does not provide time.
        import time

        # A monotonic clock never jumps when the wall clock is adjusted
        # (NTP corrections, DST), which would distort the window.
        now = time.monotonic()
        window_start = now - window_seconds

        # Keep only the requests that are still inside the window.
        recent = [ts for ts in self.buckets.get(tenant_id, []) if ts > window_start]
        self.buckets[tenant_id] = recent

        if len(recent) >= limit:
            return False
        recent.append(now)
        return True
class APIGateway:
    """Enterprise API gateway.

    Authenticates the tenant by API key, applies the tenant's
    requests-per-minute limit, and dispatches to the handler registered for
    the request path.

    Attributes:
        tenant_manager: Source of tenant lookups by API key.
        rate_limiter: Per-tenant sliding-window limiter.
        routes: Path -> async handler called as
            ``handler(method, body, tenant)``.
        client: Asynchronous Anthropic client used by ``handle_completion``.
    """

    def __init__(self, tenant_manager: TenantManager):
        self.tenant_manager = tenant_manager
        self.rate_limiter = RateLimiter()
        self.routes: Dict[str, Callable] = {}
        # handle_completion awaits messages.create(), so the asynchronous
        # client is required; the synchronous client returns a plain object
        # that cannot be awaited.
        self.client = anthropic.AsyncAnthropic()

    def register_route(self, path: str, handler: Callable):
        """Register the request handler for ``path``.

        Args:
            path: Request path to match exactly.
            handler: Awaitable callable taking ``(method, body, tenant)``.
        """
        self.routes[path] = handler

    async def handle_request(
        self,
        api_key: str,
        path: str,
        method: str,
        body: dict
    ) -> dict:
        """Handle an incoming API request.

        Args:
            api_key: Key presented by the caller.
            path: Requested route.
            method: HTTP method, passed through to the handler.
            body: Request payload, passed through to the handler.

        Returns:
            ``{"data": ..., "status": 200}`` on success, otherwise
            ``{"error": ..., "status": code}`` with 401 (unknown key),
            429 (rate limit exceeded), 404 (no handler for ``path``) or
            500 (handler raised).
        """
        # Authenticate tenant
        tenant = self.tenant_manager.get_tenant(api_key)
        if not tenant:
            return {"error": "Unauthorized", "status": 401}

        # Set tenant context; the finally below clears it on every exit path
        # so it cannot carry over into a later request.
        TenantContext.set_tenant(tenant)
        try:
            # Check rate limit
            limit = tenant.rate_limits.get("requests_per_minute", 60)
            if not self.rate_limiter.is_allowed(tenant.tenant_id, limit, 60):
                return {"error": "Rate limit exceeded", "status": 429}

            # Route request
            handler = self.routes.get(path)
            if not handler:
                return {"error": "Not found", "status": 404}

            try:
                result = await handler(method, body, tenant)
            except Exception as e:
                # Gateway boundary: any handler failure becomes a 500 reply.
                return {"error": str(e), "status": 500}
            return {"data": result, "status": 200}
        finally:
            TenantContext.clear()

    async def handle_completion(self, method: str, body: dict, tenant: Tenant):
        """Handle LLM completion requests.

        Args:
            method: HTTP method; only ``"POST"`` is accepted.
            body: Payload with optional ``model``, ``max_tokens`` (default
                1024) and ``messages`` (default empty list).
            tenant: Authenticated tenant; its ``custom_models`` mapping can
                redirect the requested model name.

        Returns:
            A dict with the message ``id``, the text of the first content
            block as ``content``, and ``usage`` token counts.

        Raises:
            ValueError: If ``method`` is not ``"POST"``.
        """
        if method != "POST":
            raise ValueError("Method not allowed")

        # Use tenant's custom model if available. custom_models may be None
        # on tenants built without it, so fall back to an empty mapping.
        custom_models = tenant.custom_models or {}
        requested = body.get("model", "claude-3-5-sonnet-20241022")
        model = custom_models.get(requested, requested)

        message = await self.client.messages.create(
            model=model,
            max_tokens=body.get("max_tokens", 1024),
            messages=body.get("messages", [])
        )
        return {
            "id": message.id,
            # NOTE(review): assumes the first content block is a text block.
            "content": message.content[0].text,
            "usage": {
                "input_tokens": message.usage.input_tokens,
                "output_tokens": message.usage.output_tokens
            }
        }
Data Governance and Compliance
Enterprise platforms must enforce data governance policies and maintain audit trails.
from enum import Enum
from typing import Any
from datetime import datetime
class DataClassification(Enum):
    """Sensitivity labels that can be assigned to a piece of data.

    Each member's value is the lowercase form of its name.
    """

    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"
@dataclass
class AuditLog:
    """One audit-trail entry describing a data access or modification.

    Attributes:
        log_id: Unique identifier of the entry.
        tenant_id: Tenant whose data was touched.
        user_id: User who performed the action.
        action: One of read, write, delete, export.
        resource: Identifier of the resource acted on.
        timestamp: When the action happened.
        status: Either success or failure.
        details: Free-form extra information about the event.
    """

    log_id: str
    tenant_id: str
    user_id: str
    action: str
    resource: str
    timestamp: datetime
    status: str
    details: dict
class DataGovernance:
    """Enforce data governance policies.

    Attributes:
        policies: Policy id -> policy record (id, rules, creation time,
            enabled flag).
        audit_logs: Audit entries in the order they were logged.
    """

    # Regexes whose presence in a text marks it as RESTRICTED. Defined once
    # on the class instead of being rebuilt on every classify_data call.
    _SENSITIVE_PATTERNS = {
        "PII": r"\b\d{3}-\d{2}-\d{4}\b",  # SSN
        # The TLD class is [A-Za-z]; writing "A-Z|a-z" inside brackets would
        # also accept a literal "|" character.
        "EMAIL": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
        "CREDIT_CARD": r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b",
    }

    def __init__(self):
        self.policies: Dict[str, dict] = {}
        self.audit_logs: List[AuditLog] = []

    def add_policy(self, policy_id: str, rules: dict):
        """Add (or replace) a governance policy.

        Args:
            policy_id: Key under which the policy is stored.
            rules: Policy rules, stored as given.
        """
        self.policies[policy_id] = {
            "id": policy_id,
            "rules": rules,
            "created_at": datetime.now(),
            "enabled": True
        }

    def classify_data(self, data: str, tenant_id: str) -> DataClassification:
        """Classify data based on its content.

        Args:
            data: Text to inspect.
            tenant_id: Accepted for interface compatibility; not used by the
                classification.

        Returns:
            ``RESTRICTED`` if the text contains an SSN-, email- or
            credit-card-shaped value, otherwise ``INTERNAL``.
        """
        if self._contains_sensitive_data(data):
            return DataClassification.RESTRICTED
        return DataClassification.INTERNAL

    def _contains_sensitive_data(self, text: str) -> bool:
        """Return True if any sensitive pattern occurs in ``text``."""
        return any(
            self._matches_pattern(text, pattern)
            for pattern in self._SENSITIVE_PATTERNS.values()
        )

    def _matches_pattern(self, text: str, pattern: str) -> bool:
        """Check whether the regex ``pattern`` occurs anywhere in ``text``."""
        import re
        return bool(re.search(pattern, text))

    def log_access(
        self,
        tenant_id: str,
        user_id: str,
        action: str,
        resource: str,
        status: str = "success"
    ):
        """Log a data access for the audit trail.

        Appends an ``AuditLog`` with a fresh UUID, the current local time
        and empty ``details``.

        Args:
            tenant_id: Tenant whose data was accessed.
            user_id: User performing the action.
            action: Action name (read, write, delete, export).
            resource: Identifier of the resource.
            status: Outcome of the action; defaults to ``"success"``.
        """
        log = AuditLog(
            log_id=str(uuid.uuid4()),
            tenant_id=tenant_id,
            user_id=user_id,
            action=action,
            resource=resource,
            timestamp=datetime.now(),
            status=status,
            details={}
        )
        self.audit_logs.append(log)

    def export_audit_logs(
        self,
        tenant_id: str,
        start_date: datetime,
        end_date: datetime
    ) -> List[AuditLog]:
        """Export audit logs for compliance.

        Args:
            tenant_id: Tenant whose entries are exported.
            start_date: Inclusive lower bound on the entry timestamp.
            end_date: Inclusive upper bound on the entry timestamp.

        Returns:
            The tenant's entries inside the range, in logging order.
        """
        return [
            log for log in self.audit_logs
            if log.tenant_id == tenant_id
            and start_date <= log.timestamp <= end_date
        ]
Feature Management
Control feature access per tenant with feature flags.
from typing import Any
class Feature:
    """A named platform capability that can be switched on per tenant.

    Attributes:
        name: Identifier of the feature.
        description: Human-readable explanation of the feature.
        enabled_for: Ids of the tenants the feature is enabled for;
            starts empty.
    """

    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description
        # No tenant has the feature until it is enabled explicitly.
        self.enabled_for: set[str] = set()
class FeatureManager:
    """Registry of features and the tenants each one is enabled for.

    Attributes:
        features: Feature name -> registered ``Feature``.
    """

    def __init__(self):
        self.features: Dict[str, Feature] = {}

    def register_feature(self, feature: Feature):
        """Store ``feature`` under its name, replacing any previous entry."""
        self.features[feature.name] = feature

    def enable_for_tenant(self, feature_name: str, tenant_id: str):
        """Turn a registered feature on for one tenant.

        Unknown feature names are ignored.
        """
        try:
            target = self.features[feature_name]
        except KeyError:
            return
        target.enabled_for.add(tenant_id)

    def is_enabled(self, feature_name: str, tenant_id: str) -> bool:
        """Tell whether the tenant has the named feature.

        Returns ``False`` for features that were never registered.
        """
        try:
            target = self.features[feature_name]
        except KeyError:
            return False
        return tenant_id in target.enabled_for

    def get_tenant_features(self, tenant_id: str) -> List[str]:
        """List the names of every feature enabled for the tenant.

        Names come back in registration order.
        """
        enabled_names = []
        for feature_name, feature in self.features.items():
            if tenant_id in feature.enabled_for:
                enabled_names.append(feature_name)
        return enabled_names
Billing and Usage Tracking
Track usage metrics for billing and resource optimization.
from typing import Optional
@dataclass
class UsageMetrics:
    """Usage counters of one tenant for one reporting period.

    Attributes:
        tenant_id: Tenant the counters belong to.
        period_start: Beginning of the reporting period.
        period_end: End of the reporting period.
        api_calls: Number of API calls recorded.
        tokens_processed: Total number of tokens recorded.
        model_calls: Model name -> count of calls to that model.
        data_stored_gb: Amount of stored data in gigabytes.
        custom_models_deployed: Number of custom models deployed.
    """

    tenant_id: str
    period_start: datetime
    period_end: datetime
    api_calls: int
    tokens_processed: int
    model_calls: dict[str, int]
    data_stored_gb: float
    custom_models_deployed: int
class UsageTracker:
    """Track and report tenant usage.

    Attributes:
        usage: Tenant id -> ``UsageMetrics`` for that tenant. No method of
            this class creates entries: the caller must seed ``usage``
            before recording, and the ``record_*`` methods ignore tenants
            that have no entry.
    """

    # Flat price in dollars per 1,000 tokens. tokens_processed is a single
    # total that does not separate input from output tokens, so one rate is
    # applied to all of them. Override on a subclass or instance to reprice.
    COST_PER_1K_TOKENS = 0.005

    def __init__(self):
        self.usage: Dict[str, UsageMetrics] = {}

    def record_api_call(self, tenant_id: str):
        """Count one API call for the tenant (no-op for untracked tenants)."""
        if tenant_id not in self.usage:
            return
        self.usage[tenant_id].api_calls += 1

    def record_tokens(self, tenant_id: str, count: int, model: str):
        """Record token usage and one call to ``model``.

        Args:
            tenant_id: Tenant to charge; untracked tenants are ignored.
            count: Number of tokens to add to the tenant's total.
            model: Model whose call counter is incremented by one.
        """
        if tenant_id not in self.usage:
            return
        metrics = self.usage[tenant_id]
        metrics.tokens_processed += count
        metrics.model_calls[model] = metrics.model_calls.get(model, 0) + 1

    def get_usage_report(
        self,
        tenant_id: str,
        start: datetime,
        end: datetime
    ) -> dict:
        """Generate a usage report for billing.

        Args:
            tenant_id: Tenant to report on.
            start: Start of the period; used only for the period label.
            end: End of the period; used only for the period label.

        Returns:
            An empty dict when the tenant has no metrics, otherwise a dict
            with ``period``, ``api_calls``, ``tokens_processed``,
            ``model_breakdown`` and ``estimated_cost``.
        """
        metrics = self.usage.get(tenant_id)
        if not metrics:
            return {}
        return {
            "period": f"{start.date()} to {end.date()}",
            "api_calls": metrics.api_calls,
            "tokens_processed": metrics.tokens_processed,
            "model_breakdown": metrics.model_calls,
            "estimated_cost": self._calculate_cost(metrics)
        }

    def _calculate_cost(self, metrics: UsageMetrics) -> float:
        """Calculate usage-based cost at the flat per-1K-token rate."""
        return (metrics.tokens_processed / 1000) * self.COST_PER_1K_TOKENS
Custom Model Management
Allow tenants to deploy custom models within the platform.
@dataclass
class CustomModel:
    """A custom model deployed by a tenant.

    Attributes:
        model_id: Unique identifier of the deployed model.
        tenant_id: Tenant that owns the model.
        base_model: Name of the model this one is built on.
        name: Tenant-chosen name of the model.
        version: Version label of the model.
        fine_tune_config: Fine-tuning settings supplied at deployment.
        deployed_at: When the model was deployed.
        status: One of training, ready, deprecated.
    """

    model_id: str
    tenant_id: str
    base_model: str
    name: str
    version: str
    fine_tune_config: dict
    deployed_at: datetime
    status: str
class CustomModelManager:
    """Registry of the custom models deployed by each tenant.

    Attributes:
        models: Model id -> deployed ``CustomModel``.
    """

    def __init__(self):
        self.models: Dict[str, CustomModel] = {}

    def deploy_model(
        self,
        tenant_id: str,
        name: str,
        base_model: str,
        config: dict
    ) -> CustomModel:
        """Create and register a custom model for a tenant.

        The new model gets a random UUID as its id, version ``"1.0"``, the
        current local time as its deployment time, and starts in the
        ``"training"`` status.

        Args:
            tenant_id: Tenant that owns the model.
            name: Name of the model.
            base_model: Model the custom model is built on.
            config: Fine-tuning configuration, stored as given.

        Returns:
            The newly registered model.
        """
        new_id = str(uuid.uuid4())
        deployed = CustomModel(
            model_id=new_id,
            tenant_id=tenant_id,
            base_model=base_model,
            name=name,
            version="1.0",
            fine_tune_config=config,
            deployed_at=datetime.now(),
            status="training",
        )
        self.models[deployed.model_id] = deployed
        return deployed

    def get_tenant_models(self, tenant_id: str) -> List[CustomModel]:
        """Return every registered model owned by the tenant."""
        owned = []
        for candidate in self.models.values():
            if candidate.tenant_id == tenant_id:
                owned.append(candidate)
        return owned
Key Takeaway
Enterprise platforms require multi-tenancy isolation, API gateways for controlled access, comprehensive governance with audit trails, feature management, and usage tracking for billing and resource optimization.
Exercises
- Implement row-level isolation with SQL WHERE tenant_id filters
- Build API gateway with authentication and rate limiting
- Create data governance policies with PII detection
- Add feature flags for gradual feature rollout
- Implement usage tracking and billing calculation
- Deploy custom LoRA models per tenant
- Generate compliance reports for GDPR/SOC2 audits