Advanced

AI Incident Detection

Lesson 1 of 4 Estimated Time 55 min

AI Incident Detection

Monitoring for AI-Specific Attacks

Detecting security incidents in AI systems requires understanding what normal looks like and identifying anomalies. This lesson covers detection strategies specific to AI.

AI-Specific Anomalies

Unlike traditional systems, AI anomalies include:

class AIAnomalyDetector:
    def __init__(self):
        self.baselines = {
            'response_time': 2.5,  # seconds
            'error_rate': 0.01,  # 1%
            'injection_attempts': 0.1,  # per 1000 requests
            'data_extracted': 0,  # bytes/hour
        }

    def detect_anomalies(self, current_metrics):
        """Detect AI-specific anomalies."""

        anomalies = []

        # 1. Behavioral Drift
        if self.detect_behavioral_drift(current_metrics):
            anomalies.append({
                'type': 'behavioral_drift',
                'severity': 'HIGH',
                'description': 'Model outputs have changed unexpectedly'
            })

        # 2. Data Exfiltration
        if self.detect_data_exfiltration(current_metrics):
            anomalies.append({
                'type': 'data_exfiltration',
                'severity': 'CRITICAL',
                'description': 'Unusual amounts of sensitive data in responses'
            })

        # 3. Injection Attack Pattern
        if self.detect_injection_patterns(current_metrics):
            anomalies.append({
                'type': 'injection_attack',
                'severity': 'CRITICAL',
                'description': 'Multiple attempted prompt injections detected'
            })

        # 4. Resource Exhaustion
        if self.detect_dos(current_metrics):
            anomalies.append({
                'type': 'denial_of_service',
                'severity': 'HIGH',
                'description': 'Unusual spike in requests (potential DoS)'
            })

        # 5. Model Performance Degradation
        if self.detect_performance_degradation(current_metrics):
            anomalies.append({
                'type': 'performance_degradation',
                'severity': 'MEDIUM',
                'description': 'Model accuracy has decreased'
            })

        return anomalies

    def detect_behavioral_drift(self, metrics):
        """Detect if model behavior has changed unexpectedly."""

        # Compare recent outputs to baseline
        # If suddenly generating very different content, might indicate:
        # - Model poisoning
        # - Fine-tuning gone wrong
        # - Injection attack

        drift_score = self.calculate_output_drift(metrics)

        return drift_score > 0.3  # 30% change = anomaly

    def detect_data_exfiltration(self, metrics):
        """Detect if sensitive data is being leaked."""

        # Monitor for:
        # - Unusual patterns in outputs (structured data, lists)
        # - PII detected in responses
        # - Same data appearing repeatedly (suggests extraction)

        pii_in_output = metrics.get('pii_detected', 0)

        return pii_in_output > 0  # Any PII = concern

    def detect_injection_patterns(self, metrics):
        """Detect attempted prompt injections."""

        injection_attempts = metrics.get('injection_attempts', 0)
        baseline = self.baselines['injection_attempts']

        return injection_attempts > baseline * 10  # 10x spike

    def detect_dos(self, metrics):
        """Detect denial of service attacks."""

        request_rate = metrics.get('requests_per_minute', 0)

        # Baseline: 100 req/min, alert at 1000 req/min
        return request_rate > 1000

    def detect_performance_degradation(self, metrics):
        """Detect model performance issues."""

        current_accuracy = metrics.get('accuracy', 0.95)
        baseline = self.baselines.get('accuracy', 0.95)

        # Alert if accuracy drops >5%
        return (baseline - current_accuracy) / baseline > 0.05

Real-Time Monitoring

class RealtimeAIMonitoring:
    def __init__(self):
        self.metrics_buffer = []
        self.alert_queue = []

    def process_request(self, request_data):
        """Process request and collect metrics."""

        # Extract metrics
        metrics = {
            'timestamp': datetime.now(),
            'request_id': request_data['id'],
            'prompt_length': len(request_data['prompt']),
            'response_length': len(request_data['response']),
            'response_time': request_data['duration_ms'],
            'contains_pii': self.detect_pii(request_data['response']),
            'injection_detected': self.detect_injection(request_data['prompt']),
        }

        # Store in buffer
        self.metrics_buffer.append(metrics)

        # Analyze for anomalies
        if len(self.metrics_buffer) > 100:
            anomalies = self.analyze_metrics()

            for anomaly in anomalies:
                self.alert_queue.append(anomaly)

                # Immediate action for critical
                if anomaly['severity'] == 'CRITICAL':
                    self.take_emergency_action(anomaly)

        return metrics

    def analyze_metrics(self):
        """Analyze buffered metrics for patterns."""

        recent = self.metrics_buffer[-100:]

        # Calculate statistics
        stats = {
            'avg_response_time': sum(m['response_time'] for m in recent) / len(recent),
            'error_count': sum(1 for m in recent if m.get('error')),
            'pii_detections': sum(1 for m in recent if m['contains_pii']),
            'injection_attempts': sum(1 for m in recent if m['injection_detected']),
        }

        # Detect anomalies
        anomalies = []

        if stats['pii_detections'] > 5:
            anomalies.append({
                'type': 'data_leakage',
                'severity': 'CRITICAL',
                'count': stats['pii_detections']
            })

        if stats['injection_attempts'] > 10:
            anomalies.append({
                'type': 'injection_attack',
                'severity': 'CRITICAL',
                'count': stats['injection_attempts']
            })

        if stats['avg_response_time'] > 10000:  # 10 seconds
            anomalies.append({
                'type': 'slowdown',
                'severity': 'MEDIUM',
                'avg_time': stats['avg_response_time']
            })

        return anomalies

    def take_emergency_action(self, anomaly):
        """Take immediate action on critical anomalies."""

        if anomaly['type'] == 'data_leakage':
            # Stop service to prevent further leakage
            self.alert_team('CRITICAL: Data leakage detected, service suspended')
            self.suspend_service()

        elif anomaly['type'] == 'injection_attack':
            # Enable extra filtering
            self.alert_team('CRITICAL: Injection attack detected, extra filtering enabled')
            self.enable_emergency_filtering()

Logging for Investigation

class AISecurityLogging:
    def __init__(self):
        self.logs = []

    def log_request(self, request_id, request, response, metadata):
        """Log request for investigation."""

        log_entry = {
            'timestamp': datetime.now(),
            'request_id': request_id,
            'prompt': request['prompt'][:200],  # Truncate for privacy
            'response': response[:200],
            'prompt_hash': hashlib.sha256(request['prompt'].encode()).hexdigest(),
            'response_hash': hashlib.sha256(response.encode()).hexdigest(),
            'metadata': metadata,
            'suspicious_indicators': [],
        }

        # Check for suspicious content
        if self.has_injection_indicators(request['prompt']):
            log_entry['suspicious_indicators'].append('injection_attempt')

        if self.contains_pii(response):
            log_entry['suspicious_indicators'].append('pii_in_response')

        self.logs.append(log_entry)

        return log_entry

    def has_injection_indicators(self, prompt):
        """Check for injection indicators."""

        patterns = [
            r'ignore.*instruction',
            r'system.*prompt',
            r'new instruction',
        ]

        return any(re.search(p, prompt, re.IGNORECASE) for p in patterns)

    def contains_pii(self, text):
        """Check if text contains PII."""

        patterns = {
            'ssn': r'\d{3}-\d{2}-\d{4}',
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            'phone': r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b',
        }

        return any(re.search(p, text) for p in patterns.values())

    def query_logs(self, criteria):
        """Query logs for investigation."""

        results = []

        for log in self.logs:
            if 'time_range' in criteria:
                if not (criteria['time_range'][0] <= log['timestamp'] <= criteria['time_range'][1]):
                    continue

            if 'suspicious_only' in criteria and criteria['suspicious_only']:
                if not log['suspicious_indicators']:
                    continue

            results.append(log)

        return results

Key Takeaway

Key Takeaway: AI incident detection requires monitoring for AI-specific anomalies: behavioral drift, data exfiltration, injection attempts, resource exhaustion, and performance degradation. Use real-time metrics collection, pattern analysis, and detailed logging to detect and investigate incidents.

Exercise: Build Detection System

  1. Define baselines for your AI system
  2. Implement anomaly detection for each threat
  3. Set up real-time monitoring of requests/responses
  4. Create alerting for critical anomalies
  5. Implement detailed logging for investigation
  6. Test detection with simulated attacks

Next Lesson: Incident Response Procedures—responding to detected incidents. EOF