Advanced
AI Incident Detection
AI Incident Detection
Monitoring for AI-Specific Attacks
Detecting security incidents in AI systems requires understanding what normal looks like and identifying anomalies. This lesson covers detection strategies specific to AI.
AI-Specific Anomalies
Unlike those in traditional systems, anomalies in AI systems include:
class AIAnomalyDetector:
    """Detect AI-specific security anomalies by comparing live metrics to baselines."""

    def __init__(self):
        # Expected steady-state values; tune these for each deployment.
        self.baselines = {
            'response_time': 2.5,        # seconds
            'error_rate': 0.01,          # 1%
            'injection_attempts': 0.1,   # per 1000 requests
            'data_extracted': 0,         # bytes/hour
            'accuracy': 0.95,            # baseline model accuracy (used by degradation check)
        }

    def detect_anomalies(self, current_metrics):
        """Run every detector against *current_metrics*.

        Returns a list of anomaly dicts, each with 'type', 'severity',
        and 'description' keys. Empty list means no anomalies.
        """
        anomalies = []
        # 1. Behavioral Drift
        if self.detect_behavioral_drift(current_metrics):
            anomalies.append({
                'type': 'behavioral_drift',
                'severity': 'HIGH',
                'description': 'Model outputs have changed unexpectedly'
            })
        # 2. Data Exfiltration
        if self.detect_data_exfiltration(current_metrics):
            anomalies.append({
                'type': 'data_exfiltration',
                'severity': 'CRITICAL',
                'description': 'Unusual amounts of sensitive data in responses'
            })
        # 3. Injection Attack Pattern
        if self.detect_injection_patterns(current_metrics):
            anomalies.append({
                'type': 'injection_attack',
                'severity': 'CRITICAL',
                'description': 'Multiple attempted prompt injections detected'
            })
        # 4. Resource Exhaustion
        if self.detect_dos(current_metrics):
            anomalies.append({
                'type': 'denial_of_service',
                'severity': 'HIGH',
                'description': 'Unusual spike in requests (potential DoS)'
            })
        # 5. Model Performance Degradation
        if self.detect_performance_degradation(current_metrics):
            anomalies.append({
                'type': 'performance_degradation',
                'severity': 'MEDIUM',
                'description': 'Model accuracy has decreased'
            })
        return anomalies

    def calculate_output_drift(self, metrics):
        """Return a drift score in [0, 1] for recent model outputs.

        Placeholder implementation: reads a precomputed 'output_drift' metric
        (e.g. embedding distance between recent and baseline outputs). Was
        previously missing entirely, so detect_behavioral_drift raised
        AttributeError.
        """
        return metrics.get('output_drift', 0.0)

    def detect_behavioral_drift(self, metrics):
        """Detect if model behavior has changed unexpectedly.

        Sudden large shifts in output distribution can indicate model
        poisoning, a bad fine-tune, or an ongoing injection attack.
        """
        drift_score = self.calculate_output_drift(metrics)
        return drift_score > 0.3  # 30% change = anomaly

    def detect_data_exfiltration(self, metrics):
        """Detect if sensitive data is being leaked.

        Monitored signals include unusual structured data in outputs,
        PII detected in responses, and repeated identical data (suggesting
        systematic extraction). Any PII at all is treated as a concern.
        """
        pii_in_output = metrics.get('pii_detected', 0)
        return pii_in_output > 0

    def detect_injection_patterns(self, metrics):
        """Detect attempted prompt injections (alert on a 10x spike over baseline)."""
        injection_attempts = metrics.get('injection_attempts', 0)
        baseline = self.baselines['injection_attempts']
        return injection_attempts > baseline * 10

    def detect_dos(self, metrics):
        """Detect denial of service attacks.

        Baseline is ~100 req/min; alert at 1000 req/min.
        """
        request_rate = metrics.get('requests_per_minute', 0)
        return request_rate > 1000

    def detect_performance_degradation(self, metrics):
        """Detect model performance issues (accuracy drop of more than 5%)."""
        current_accuracy = metrics.get('accuracy', 0.95)
        baseline = self.baselines.get('accuracy', 0.95)
        return (baseline - current_accuracy) / baseline > 0.05
Real-Time Monitoring
class RealtimeAIMonitoring:
    """Collect per-request metrics in real time and raise alerts on anomalies."""

    # Patterns used by the lightweight per-request checks. Kept consistent
    # with AISecurityLogging's indicator patterns.
    _INJECTION_PATTERNS = [
        r'ignore.*instruction',
        r'system.*prompt',
        r'new instruction',
    ]
    _PII_PATTERNS = [
        r'\d{3}-\d{2}-\d{4}',                                   # SSN
        r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',  # email
    ]
    MAX_BUFFER = 1000  # cap buffered metrics so a long-running monitor can't leak memory

    def __init__(self):
        self.metrics_buffer = []
        self.alert_queue = []
        self.notifications = []          # messages sent to the on-call team
        self.service_suspended = False   # set by emergency action on data leakage
        self.emergency_filtering = False  # set by emergency action on injection attack

    def process_request(self, request_data):
        """Process one request and collect metrics.

        *request_data* must contain 'id', 'prompt', 'response', 'duration_ms'.
        Returns the metrics dict recorded for this request.
        """
        metrics = {
            'timestamp': datetime.now(),
            'request_id': request_data['id'],
            'prompt_length': len(request_data['prompt']),
            'response_length': len(request_data['response']),
            'response_time': request_data['duration_ms'],
            'contains_pii': self.detect_pii(request_data['response']),
            'injection_detected': self.detect_injection(request_data['prompt']),
        }
        self.metrics_buffer.append(metrics)
        # Bound the buffer (original grew without limit).
        if len(self.metrics_buffer) > self.MAX_BUFFER:
            del self.metrics_buffer[:-self.MAX_BUFFER]
        # Analyze for anomalies once we have enough data
        if len(self.metrics_buffer) > 100:
            anomalies = self.analyze_metrics()
            for anomaly in anomalies:
                self.alert_queue.append(anomaly)
                # Immediate action for critical anomalies
                if anomaly['severity'] == 'CRITICAL':
                    self.take_emergency_action(anomaly)
        return metrics

    def detect_pii(self, text):
        """Return True if *text* matches any known PII pattern."""
        return any(re.search(p, text) for p in self._PII_PATTERNS)

    def detect_injection(self, text):
        """Return True if *text* matches any known prompt-injection phrasing."""
        return any(re.search(p, text, re.IGNORECASE) for p in self._INJECTION_PATTERNS)

    def analyze_metrics(self):
        """Analyze the last 100 buffered metrics for attack patterns."""
        recent = self.metrics_buffer[-100:]
        stats = {
            'avg_response_time': sum(m['response_time'] for m in recent) / len(recent),
            'error_count': sum(1 for m in recent if m.get('error')),
            'pii_detections': sum(1 for m in recent if m['contains_pii']),
            'injection_attempts': sum(1 for m in recent if m['injection_detected']),
        }
        anomalies = []
        if stats['pii_detections'] > 5:
            anomalies.append({
                'type': 'data_leakage',
                'severity': 'CRITICAL',
                'count': stats['pii_detections']
            })
        if stats['injection_attempts'] > 10:
            anomalies.append({
                'type': 'injection_attack',
                'severity': 'CRITICAL',
                'count': stats['injection_attempts']
            })
        if stats['avg_response_time'] > 10000:  # 10 seconds
            anomalies.append({
                'type': 'slowdown',
                'severity': 'MEDIUM',
                'avg_time': stats['avg_response_time']
            })
        return anomalies

    def take_emergency_action(self, anomaly):
        """Take immediate action on critical anomalies."""
        if anomaly['type'] == 'data_leakage':
            # Stop service to prevent further leakage
            self.alert_team('CRITICAL: Data leakage detected, service suspended')
            self.suspend_service()
        elif anomaly['type'] == 'injection_attack':
            # Enable extra filtering
            self.alert_team('CRITICAL: Injection attack detected, extra filtering enabled')
            self.enable_emergency_filtering()

    def alert_team(self, message):
        """Notify the on-call team (placeholder: record the message locally)."""
        self.notifications.append(message)

    def suspend_service(self):
        """Suspend the service (placeholder: flip a flag a serving layer would check)."""
        self.service_suspended = True

    def enable_emergency_filtering(self):
        """Enable strict input/output filtering (placeholder flag)."""
        self.emergency_filtering = True
Logging for Investigation
class AISecurityLogging:
    """Structured security logging for AI requests, queryable for incident investigation."""

    def __init__(self):
        self.logs = []

    def log_request(self, request_id, request, response, metadata):
        """Record one request/response pair, flagging suspicious indicators.

        Prompt and response text are truncated to 200 chars for privacy; full
        SHA-256 hashes are kept so exact payloads can still be matched later.
        Returns the log entry dict that was appended.
        """
        log_entry = {
            'timestamp': datetime.now(),
            'request_id': request_id,
            'prompt': request['prompt'][:200],  # Truncate for privacy
            'response': response[:200],
            'prompt_hash': hashlib.sha256(request['prompt'].encode()).hexdigest(),
            'response_hash': hashlib.sha256(response.encode()).hexdigest(),
            'metadata': metadata,
            'suspicious_indicators': [],
        }
        # Check for suspicious content
        if self.has_injection_indicators(request['prompt']):
            log_entry['suspicious_indicators'].append('injection_attempt')
        if self.contains_pii(response):
            log_entry['suspicious_indicators'].append('pii_in_response')
        self.logs.append(log_entry)
        return log_entry

    def has_injection_indicators(self, prompt):
        """Return True if *prompt* matches any known injection phrasing."""
        patterns = [
            r'ignore.*instruction',
            r'system.*prompt',
            r'new instruction',
        ]
        return any(re.search(p, prompt, re.IGNORECASE) for p in patterns)

    def contains_pii(self, text):
        """Return True if *text* contains SSN-, email-, or phone-shaped data."""
        patterns = {
            'ssn': r'\d{3}-\d{2}-\d{4}',
            # Fixed char class: '[A-Z|a-z]' also matched a literal '|' in the TLD.
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'phone': r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b',
        }
        return any(re.search(p, text) for p in patterns.values())

    def query_logs(self, criteria):
        """Return logged entries matching *criteria*.

        Supported keys: 'time_range' (inclusive (start, end) datetime tuple)
        and 'suspicious_only' (bool: keep only entries with indicators).
        """
        results = []
        for log in self.logs:
            if 'time_range' in criteria:
                start, end = criteria['time_range']
                if not (start <= log['timestamp'] <= end):
                    continue
            if criteria.get('suspicious_only') and not log['suspicious_indicators']:
                continue
            results.append(log)
        return results
Key Takeaway
Key Takeaway: AI incident detection requires monitoring for AI-specific anomalies: behavioral drift, data exfiltration, injection attempts, resource exhaustion, and performance degradation. Use real-time metrics collection, pattern analysis, and detailed logging to detect and investigate incidents.
Exercise: Build Detection System
- Define baselines for your AI system
- Implement anomaly detection for each threat
- Set up real-time monitoring of requests/responses
- Create alerting for critical anomalies
- Implement detailed logging for investigation
- Test detection with simulated attacks
Next Lesson: Incident Response Procedures—responding to detected incidents.