AI Security Forensics
Overview
Forensic investigation of AI incidents requires understanding data flow, model behavior, and decision provenance. Unlike traditional security incidents, AI forensics must reconstruct not just attack vectors but also how suspicious patterns were generated and what impact they had on decisions.
Log Analysis for AI Systems
Types of Logs to Collect
AI systems generate multiple classes of forensically valuable logs:
Log Categories for Forensic Analysis:
Application Logs:
Description: "AI system behavior and decision-making"
Captures:
- "Input data received by model"
- "Features extracted and used"
- "Model predictions and confidence scores"
- "Decisions made by system"
- "Human overrides and reasons"
Analysis Value:
- "Timeline of predictions"
- "Pattern of decision changes"
- "Relationship between inputs and outputs"
Collection Requirements:
- "Capture raw inputs and outputs"
- "Include all metadata (timestamp, user, context)"
- "Log high-confidence predictions (avoid privacy issues)"
System and Infrastructure Logs:
Description: "System resource usage, errors, deployments"
Captures:
- "Model loading and initialization"
- "Code/model deployments"
- "System errors and crashes"
- "Resource usage (CPU, memory, GPU)"
- "Network connections"
Analysis Value:
- "When suspicious code was deployed"
- "System state during incident period"
- "Unusual resource consumption patterns"
- "Timing of events"
Access and Authentication Logs:
Description: "Who accessed systems, when, and what they did"
Captures:
- "User logins to AI systems"
- "API authentication events"
- "Data access events"
- "Administrative actions"
- "Code repository commits"
Analysis Value:
- "Who had opportunity to compromise system"
- "Unusual access patterns"
- "Privilege escalation attempts"
- "Unauthorized access detection"
Data Pipeline Logs:
Description: "Data flow through preprocessing and training"
Captures:
- "Data ingestion events"
- "Preprocessing transformations"
- "Feature engineering steps"
- "Training data composition"
- "Data quality checks"
Analysis Value:
- "Whether data was tampered with"
- "Unusual data patterns"
- "Introduction of suspicious data"
- "Timing of data changes"
Model Registry Logs:
Description: "Model versioning and deployment history"
Captures:
- "Model uploads and registrations"
- "Version deployment events"
- "Model performance metrics"
- "Rollback events"
- "Model approval/promotion"
Analysis Value:
- "Timeline of model changes"
- "Who deployed problematic versions"
- "Performance degradation timing"
- "Correlation with incident timeline"
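To make the categories above concrete, a single application-log record for one prediction might be serialized as structured JSON. This is a sketch; the field names are illustrative, not a standard schema:

```python
import json
from datetime import datetime, timezone

def make_prediction_log(model_version, user_id, inputs, prediction, confidence, decision):
    """Build a structured application-log record for one model prediction."""
    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "event_type": "model_prediction",
        "model_version": model_version,
        "user_id": user_id,
        "inputs": inputs,          # raw inputs received by the model
        "prediction": prediction,  # model output
        "confidence": confidence,  # confidence score
        "decision": decision,      # final decision made by the system
    }

record = make_prediction_log("fraud-v2.3", "u-1042",
                             {"amount": 950.0, "country": "DE"},
                             "fraud", 0.91, "blocked")
print(json.dumps(record, indent=2))
```

Emitting every record with the same field set is what later makes timeline correlation across log sources tractable.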
Log Retention and Forensic Readiness
Effective forensics requires maintaining logs long enough to investigate incidents months later:
Log Retention Requirements:
Hot Storage (Immediately Accessible):
Duration: "Last 30 days"
Systems: "Application logs, recent decisions"
Access: "Real-time queryable"
Cost: "Higher; trade-off for immediate access"
Warm Storage (Searchable Archive):
Duration: "30 days to 1 year"
Systems: "Complete logs, indexed"
Access: "Search in hours"
Cost: "Medium; balance of access and cost"
Cold Storage (Long-term Archive):
Duration: "1-7 years"
Systems: "Compressed, encrypted archives"
Access: "Retrieval in days"
Cost: "Low; suitable for compliance/audit"
Regulatory Minimums:
Financial Services: "7+ years for decision records"
Healthcare: "Varies by data type; 6-10 years typical"
Government: "6-7 years for federal records"
EU GDPR: "Generally data should be deleted, but incident investigation data may be retained"
Incident Implications:
- "Inability to access logs > 1 year old limits deep forensics"
- "Loss of logs prevents proof of incident or compliance violations"
- "Retention inadequacy can indicate negligence"
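The hot/warm/cold tiering above reduces to a simple routing rule keyed on log age. A minimal sketch, using the illustrative thresholds from this section:

```python
from datetime import datetime, timedelta, timezone

def storage_tier(log_timestamp, now=None):
    """Map a log record's age to hot/warm/cold storage per the tiers above."""
    now = now or datetime.now(timezone.utc)
    age = now - log_timestamp
    if age <= timedelta(days=30):
        return "hot"   # real-time queryable
    if age <= timedelta(days=365):
        return "warm"  # searchable archive
    return "cold"      # compressed long-term archive

now = datetime(2025, 6, 1, tzinfo=timezone.utc)
print(storage_tier(now - timedelta(days=10), now))   # hot
print(storage_tier(now - timedelta(days=120), now))  # warm
print(storage_tier(now - timedelta(days=800), now))  # cold
```

In practice the cold-tier cutoff would be set to the applicable regulatory minimum rather than a fixed constant.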
Reconstructing Attack Chains
Timeline Reconstruction
The first step in forensics is establishing a precise timeline:
# Timeline Reconstruction for AI Incident
from datetime import datetime, timedelta
from typing import Dict

class TimelineReconstructor:
    def __init__(self, incident_start: datetime, incident_end: datetime):
        self.start = incident_start
        self.end = incident_end
        self.events = []

    def add_event(self, timestamp: datetime, source: str, event: str, severity: str):
        """Add timestamped event to timeline"""
        self.events.append({
            'timestamp': timestamp,
            'source': source,  # 'access_log', 'app_log', 'model_registry', etc.
            'event': event,
            'severity': severity
        })

    def correlate_events(self) -> Dict:
        """Find correlations between events"""
        # Sort by timestamp
        sorted_events = sorted(self.events, key=lambda x: x['timestamp'])

        # Identify suspicious patterns
        suspicious_patterns = []

        # Pattern 1: Privileged access before unexpected change
        for i, event in enumerate(sorted_events):
            if event['source'] == 'access_log' and 'privilege_escalation' in event['event']:
                # Look for changes in the next hour
                next_hour_events = [e for e in sorted_events[i+1:]
                                    if (e['timestamp'] - event['timestamp']).total_seconds() < 3600]
                if next_hour_events:
                    suspicious_patterns.append({
                        'type': 'Privilege Escalation Followed by Change',
                        'trigger_event': event,
                        'subsequent_events': next_hour_events
                    })

        # Pattern 2: Data ingestion followed by model degradation
        for i, event in enumerate(sorted_events):
            if event['source'] == 'data_pipeline' and 'ingestion' in event['event']:
                next_day_events = [e for e in sorted_events[i+1:]
                                   if event['timestamp'] < e['timestamp'] <
                                   event['timestamp'] + timedelta(days=1)]
                degradation = [e for e in next_day_events
                               if 'performance' in e['event'] and 'degradation' in e['event']]
                if degradation:
                    suspicious_patterns.append({
                        'type': 'Data Ingestion Followed by Degradation',
                        'data_event': event,
                        'degradation_events': degradation
                    })

        # Pattern 3: Model deployment followed by complaints
        for i, event in enumerate(sorted_events):
            if event['source'] == 'model_registry' and 'deployment' in event['event']:
                complaints = [e for e in sorted_events[i+1:]
                              if 'user_complaint' in e['event'] and
                              (e['timestamp'] - event['timestamp']).total_seconds() < 86400]
                if len(complaints) > 5:  # Multiple complaints in 24h
                    suspicious_patterns.append({
                        'type': 'Model Deployment Followed by User Complaints',
                        'deployment': event,
                        'complaints': complaints,
                        'complaint_count': len(complaints)
                    })

        return {
            'timeline': sorted_events,
            'suspicious_patterns': suspicious_patterns,
            'incident_period': {'start': self.start, 'end': self.end}
        }

    def visualize_timeline(self) -> str:
        """Generate ASCII timeline visualization"""
        sorted_events = sorted(self.events, key=lambda x: x['timestamp'])
        timeline = "Timeline of Events\n" + "=" * 50 + "\n"
        for event in sorted_events:
            icon = "⚠️ " if event['severity'] == 'HIGH' else "ℹ️ "
            time_str = event['timestamp'].strftime("%H:%M:%S")
            timeline += f"{icon} {time_str} [{event['source']}] {event['event']}\n"
        return timeline
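Pattern 1 above boils down to a sliding one-hour window after each privileged-access event. A stripped-down, self-contained version of that check, with hypothetical sample events:

```python
from datetime import datetime, timedelta

def privileged_access_followed_by_change(events, window=timedelta(hours=1)):
    """Flag any event that occurs within `window` of a privilege escalation."""
    events = sorted(events, key=lambda e: e["timestamp"])
    flagged = []
    for i, ev in enumerate(events):
        if ev["source"] == "access_log" and "privilege_escalation" in ev["event"]:
            followers = [e for e in events[i + 1:]
                         if e["timestamp"] - ev["timestamp"] < window]
            if followers:
                flagged.append({"trigger": ev, "subsequent": followers})
    return flagged

t0 = datetime(2025, 3, 1, 14, 0)
events = [
    {"timestamp": t0, "source": "access_log",
     "event": "privilege_escalation: sudo"},
    {"timestamp": t0 + timedelta(minutes=20), "source": "model_registry",
     "event": "deployment: model v2.4"},
]
print(privileged_access_followed_by_change(events))
```

The same window-scan structure generalizes to the other two patterns by swapping the trigger predicate and the window length.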
Chain of Attack Reconstruction
For security incidents, identify the sequence of steps the attacker took:
Attack Chain Analysis Framework:
Step 1: Initial Access
Questions:
- "How did attacker gain first access to system?"
- "What credentials or vulnerabilities were exploited?"
- "When did access begin (earliest timestamp)?"
Evidence Sources:
- "Access logs showing authentication"
- "System logs showing unauthorized connections"
- "Vulnerability scans showing exploitation path"
Step 2: Privilege Escalation
Questions:
- "How did attacker move from initial access to elevated privileges?"
- "What vulnerabilities or misconfigurations were exploited?"
- "What commands were executed to escalate?"
Evidence Sources:
- "Process execution logs"
- "Sudo/permission change logs"
- "Application error logs during exploit attempts"
Step 3: Lateral Movement
Questions:
- "What other systems did attacker access?"
- "How did they move between systems?"
- "What data did they access?"
Evidence Sources:
- "Network logs showing unusual connections"
- "Access logs on multiple systems"
- "Data access logs"
Step 4: AI System Compromise
Questions:
- "What specifically did attacker modify? (model, data, code)"
- "What was the method of modification?"
- "What are the intended effects?"
Evidence Sources:
- "Model registry logs showing uploads"
- "Code repository commit logs"
- "Data ingestion logs"
- "File integrity checks/checksums"
Step 5: Impact Achievement
Questions:
- "What impact did the compromise achieve?"
- "Were results observed? (degraded performance, bias, etc.)"
- "Who was affected?"
Evidence Sources:
- "Model performance metrics"
- "User complaints or alerts"
- "Decision output patterns"
- "Audit findings"
Step 6: Covering Tracks (Optional)
Questions:
- "Did attacker attempt to hide evidence?"
- "Were logs deleted or modified?"
- "Were access trails cleaned up?"
Evidence Sources:
- "Log integrity checks (gaps or deletions)"
- "File access time anomalies"
- "Backup comparison to primary logs"
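The six-step framework can be encoded as a checklist mapping each phase to its evidence sources, so an investigator can track which phases still lack corroborating logs. The phase names and sources below follow the framework above; the identifiers are illustrative:

```python
ATTACK_CHAIN = {
    "initial_access": ["access_logs", "system_logs", "vulnerability_scans"],
    "privilege_escalation": ["process_logs", "sudo_logs", "app_error_logs"],
    "lateral_movement": ["network_logs", "multi_system_access_logs",
                         "data_access_logs"],
    "ai_system_compromise": ["model_registry_logs", "commit_logs",
                             "data_ingestion_logs", "file_integrity_checks"],
    "impact_achievement": ["performance_metrics", "user_complaints",
                           "decision_outputs", "audit_findings"],
    "covering_tracks": ["log_integrity_checks", "file_access_times",
                        "backup_diffs"],
}

def uncorroborated_phases(collected_sources):
    """Return phases for which no listed evidence source has been collected."""
    return [phase for phase, sources in ATTACK_CHAIN.items()
            if not set(sources) & set(collected_sources)]

print(uncorroborated_phases(["access_logs", "commit_logs"]))
```

Running the check early in an investigation shows at a glance which phases of the chain are still unsupported by evidence.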
Prompt Log Forensics
For language model and conversational AI systems, prompt logging is critical:
What to Log
Prompt Logging for Forensics:
Input Prompts:
Log Contents:
- "Exact user prompt/query"
- "System messages and context"
- "Few-shot examples provided"
- "Any special parameters or instructions"
Use Cases:
- "Identify prompt injection attacks"
- "Find attempts to jailbreak system"
- "Analyze user intent and misuse patterns"
- "Detect policy violations"
Model Outputs:
Log Contents:
- "Generated text (with optional truncation for privacy)"
- "Confidence/probability scores"
- "Alternative outputs considered"
- "Decoding parameters used"
Use Cases:
- "Understand what model generated"
- "Identify problematic outputs"
- "Assess coherence and reasonableness"
Filtering and Safety:
Log Contents:
- "Safety filter decisions"
- "Rejection reasons (if policy violated)"
- "Alternative suggestions offered"
- "User reactions (if provided)"
Use Cases:
- "Assess effectiveness of safety measures"
- "Identify bypasses or new attack patterns"
- "Tune filters based on real usage"
Metadata:
Log Contents:
- "User/session identifier"
- "Timestamp"
- "Model version used"
- "User context (authenticated, role, etc.)"
Use Cases:
- "Correlate related prompts"
- "Identify repeat attackers"
- "Assess impact by user segment"
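Putting the four categories together, one possible shape for a single prompt-log record is sketched below; the field names are illustrative, not a fixed schema:

```python
from dataclasses import dataclass
from datetime import datetime

@dataclass
class PromptLogRecord:
    timestamp: datetime
    session_id: str
    model_version: str
    prompt: str            # exact user prompt (subject to privacy policy)
    system_context: str    # system messages / few-shot examples
    output: str            # generated text (possibly truncated)
    confidence: float
    safety_decision: str   # e.g. "allowed", "rejected: policy violation"
    user_role: str = "anonymous"

rec = PromptLogRecord(datetime(2025, 5, 1, 9, 30), "s-77", "llm-v1.2",
                      "Summarize this contract", "You are a helpful assistant.",
                      "The contract states...", 0.88, "allowed")
print(rec.safety_decision)
```

Keeping the safety decision and model version on the same record as the prompt is what lets investigators later correlate bypasses with specific model deployments.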
Privacy-Conscious Prompt Logging
Logging prompts raises privacy concerns; forensic value must be balanced against user privacy:
# Privacy-Conscious Prompt Logging
import random
import re
import hashlib
from datetime import datetime

def hash_sha256(text: str) -> str:
    """One-way hash so prompts can be correlated without storing plaintext"""
    return hashlib.sha256(text.encode('utf-8')).hexdigest()

class PrivacyAwarePromptLogger:
    def __init__(self, sampling_rate=0.1):
        """Log a sample of prompts for forensics while protecting privacy"""
        self.sampling_rate = sampling_rate

    def should_log_prompt(self, prompt: str) -> bool:
        """Determine whether to log this prompt"""
        # Always log suspicious prompts for security investigation
        if self.is_suspicious(prompt):
            return True
        # Sample random prompts for baseline forensics
        return random.random() < self.sampling_rate

    def log_prompt(self, prompt: str, response: str, metadata: dict):
        """Log prompt with privacy preservation"""
        if not self.should_log_prompt(prompt):
            return
        prompt_hash = hash_sha256(prompt)
        if self.is_suspicious(prompt):
            # Suspicious prompts are retained (PII-sanitized) for investigation
            sanitized = self.sanitize_pii(prompt)  # Remove names, emails, etc.
            log_entry = {
                'timestamp': datetime.now(),
                'prompt_hash': prompt_hash,
                'prompt_text': sanitized,  # Only for suspicious prompts
                'response_hash': hash_sha256(response),
                'severity': 'SUSPICIOUS',
                'metadata': metadata
            }
        else:
            # Standard logging - one-way hashes only, no plaintext
            log_entry = {
                'timestamp': datetime.now(),
                'prompt_hash': prompt_hash,
                'response_hash': hash_sha256(response),
                'metadata': {k: v for k, v in metadata.items()
                             if k != 'user_id'}  # Remove identifiers
            }
        self.store_log(log_entry)  # Storage backend implemented elsewhere

    def is_suspicious(self, prompt: str) -> bool:
        """Check if prompt matches known jailbreak/attack patterns"""
        # A real deployment would use tuned regex rules or an ML classifier;
        # substring matching on the plaintext prompt is the minimal version
        patterns = [
            'ignore previous instructions',
            'system prompt',
            'execute code',
            'access token',
            'admin mode'
        ]
        lowered = prompt.lower()
        return any(p in lowered for p in patterns)

    def sanitize_pii(self, text: str) -> str:
        """Remove personally identifiable information"""
        # Regex-based PII removal
        text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL]', text)
        text = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]', text)
        text = re.sub(r'\b\d{16}\b', '[CARD]', text)
        return text
Data Flow Tracing
Input-to-Output Tracing
Understanding how inputs flow through the system helps identify where attacks occurred:
Data Flow Tracing Components:
Input Reception:
What to Trace:
- "Raw user input received"
- "Input validation/sanitization"
- "Encoding transformations"
Forensic Questions:
- "Was input tampered with?"
- "Did validation fail?"
- "Are invalid inputs being accepted?"
Preprocessing:
What to Trace:
- "Normalization operations"
- "Tokenization (NLP)"
- "Scaling/standardization (ML)"
- "Feature selection"
Forensic Questions:
- "Are preprocessing steps working correctly?"
- "Is suspicious data being flagged?"
- "Are known-bad inputs being detected?"
Model Inference:
What to Trace:
- "Input embeddings/representations"
- "Model layer outputs (activation patterns)"
- "Attention weights (if attention-based)"
- "Final prediction and confidence"
Forensic Questions:
- "Is model behaving as expected?"
- "Do attention patterns suggest manipulation?"
- "Why is confidence low/high for specific inputs?"
Post-Processing:
What to Trace:
- "Prediction aggregation (ensemble models)"
- "Threshold application"
- "Output formatting"
- "Final decision/recommendation"
Forensic Questions:
- "Is final decision reasonable given prediction?"
- "Are thresholds being applied consistently?"
Output and Logging:
What to Trace:
- "Decision delivered to user"
- "Explanation provided"
- "Confidence communicated"
- "Logged for audit"
Forensic Questions:
- "What was user shown?"
- "Does it match decision logic?"
- "Is explanation accurate?"
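One lightweight way to make these stages traceable is to record a content hash at every hop, so an investigator can later verify what each stage received and emitted and spot tampering between stages. A sketch, with hypothetical stage payloads (the stage names follow the list above):

```python
import hashlib
import json

def stage_hash(obj):
    """Stable SHA-256 over a JSON-serializable stage payload."""
    return hashlib.sha256(json.dumps(obj, sort_keys=True).encode()).hexdigest()

class DataFlowTrace:
    """Records input/output hashes for each pipeline stage of one request."""
    def __init__(self, request_id):
        self.request_id = request_id
        self.hops = []

    def record(self, stage, payload_in, payload_out):
        self.hops.append({"stage": stage,
                          "in": stage_hash(payload_in),
                          "out": stage_hash(payload_out)})

    def verify_continuity(self):
        """Each stage's input hash should match the previous stage's output hash."""
        return all(prev["out"] == cur["in"]
                   for prev, cur in zip(self.hops, self.hops[1:]))

trace = DataFlowTrace("req-123")
raw = {"text": "transfer $900"}
tokens = {"tokens": ["transfer", "$900"]}
pred = {"label": "fraud", "score": 0.93}
trace.record("input_reception", raw, raw)
trace.record("preprocessing", raw, tokens)
trace.record("model_inference", tokens, pred)
print(trace.verify_continuity())
```

A continuity failure localizes tampering to the boundary between two specific stages, which narrows the forensic search considerably.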
Data Poisoning Detection
Investigating whether training data was maliciously modified:
# Data Poisoning Forensics
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest

class DataPoisoningDetector:
    def investigate_data_integrity(self, training_data_df):
        """Investigate whether training data was poisoned"""
        findings = {}
        # Statistical anomalies
        findings['anomalies'] = self.detect_statistical_anomalies(training_data_df)
        # Labeling errors
        findings['labeling_errors'] = self.detect_label_anomalies(training_data_df)
        # Suspicious data insertions
        findings['insertion_anomalies'] = self.detect_recent_insertions(training_data_df)
        # Feature value outliers
        findings['outliers'] = self.detect_feature_outliers(training_data_df)
        # Correlation anomalies (features that suddenly become predictive)
        findings['correlation_anomalies'] = self.detect_new_correlations(training_data_df)
        return findings

    def detect_statistical_anomalies(self, data):
        """Identify statistical anomalies in each numeric feature"""
        anomalies = []
        for column in data.select_dtypes(include=[np.number]).columns:
            # Use isolation forest for anomaly detection
            iso_forest = IsolationForest(contamination=0.05)
            anomalies_mask = iso_forest.fit_predict(data[[column]]) == -1
            if anomalies_mask.sum() > 0:
                anomalies.append({
                    'feature': column,
                    'anomaly_count': int(anomalies_mask.sum()),
                    'anomaly_fraction': anomalies_mask.sum() / len(data),
                    'anomaly_indices': np.where(anomalies_mask)[0].tolist()
                })
        return anomalies

    def detect_label_anomalies(self, data):
        """Detect suspicious labeling patterns"""
        # Check for logical inconsistencies
        # (e.g., application with $0 annual income labeled "approved")
        issues = []
        # Check for batches of identical decisions
        decision_counts = data['decision'].value_counts()
        if decision_counts.max() / len(data) > 0.95:
            issues.append({
                'issue': 'Suspiciously uniform decisions',
                'percentage': (decision_counts.max() / len(data)) * 100
            })
        return issues

    def detect_recent_insertions(self, data):
        """Detect data insertion timing anomalies"""
        # Check whether the data insertion rate changed
        if 'date_added' in data.columns:
            daily_counts = data.groupby(data['date_added'].dt.date).size()
            # Detect sudden spikes in insertion rate
            mean_rate = daily_counts.mean()
            std_rate = daily_counts.std()
            spikes = daily_counts[daily_counts > mean_rate + 2 * std_rate]
            return [{
                'date': str(date),
                'insertion_count': int(count),
                'spike_multiplier': count / mean_rate
            } for date, count in spikes.items()]
        return []

    def detect_feature_outliers(self, data):
        """Detect unusual feature values via the IQR rule"""
        outliers = []
        numeric_features = data.select_dtypes(include=[np.number]).columns
        for feature in numeric_features:
            Q1 = data[feature].quantile(0.25)
            Q3 = data[feature].quantile(0.75)
            IQR = Q3 - Q1
            outlier_mask = (data[feature] < Q1 - 1.5 * IQR) | (data[feature] > Q3 + 1.5 * IQR)
            if outlier_mask.sum() > 0:
                outliers.append({
                    'feature': feature,
                    'outlier_count': int(outlier_mask.sum()),
                    'outlier_fraction': outlier_mask.sum() / len(data)
                })
        return outliers

    def detect_new_correlations(self, data):
        """Detect features that suddenly became predictive"""
        # Compare recent data correlations to a historical baseline:
        # features that were previously non-predictive but suddenly became
        # predictive may indicate data poisoning
        return []  # Implementation is specific to the use case
Key Takeaway
Key Takeaway: AI forensics requires analyzing application logs, system logs, access logs, and data flow to reconstruct what happened during an incident. Log retention policies must support forensic investigations, while privacy-conscious logging protects sensitive information. Understanding data flow and detecting poisoning patterns are essential for security investigations.
Exercise: Design Forensic Investigation Plan
- Log requirements: What logs must you collect for each system type?
- Retention policy: How long must logs be retained?
- Timeline tools: What tools will help reconstruct event timelines?
- Chain of custody: How will you preserve evidence integrity?
- Privacy balance: How will you log suspicious content while protecting privacy?
- Testing: Design a forensic investigation simulation
Next: Post-Incident Recovery