Advanced

Authentication, Authorization, and Data Privacy

Lesson 2 of 4 Estimated Time 50 min

Authentication, Authorization, and Data Privacy

Regulations such as GDPR, along with other compliance frameworks, demand strict data privacy controls, access management, and audit trails. LLM applications that handle sensitive data must implement PII protection, secure storage, and transparent data practices.

PII Detection and Handling

Identify and protect personally identifiable information throughout the system.

from typing import List, Dict
import re

class PIIDetector:
    """Detect and mask personally identifiable information (PII) with regexes.

    ``PII_PATTERNS`` maps a PII type key to a ``(regex, label)`` pair. The
    patterns are purely syntactic, so they can overlap: for example, any
    string matching the passport pattern also matches the driver's license
    pattern, and ``detect_pii`` will report it under both keys.
    """

    PII_PATTERNS = {
        "ssn": (r"\b\d{3}-\d{2}-\d{4}\b", "Social Security Number"),
        "credit_card": (r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b", "Credit Card"),
        # Inside a character class "|" is a literal pipe, not alternation,
        # so the TLD class must be [A-Za-z] rather than [A-Z|a-z].
        "email": (r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", "Email Address"),
        "phone": (r"\b(?:\+1[-.]?)?\(?[0-9]{3}\)?[-.]?[0-9]{3}[-.]?[0-9]{4}\b", "Phone Number"),
        "passport": (r"\b[A-Z]{2}\d{7}\b", "Passport Number"),
        "drivers_license": (r"\b[A-Z]{1,2}\d{4,8}[A-Z]{0,2}\b", "Driver's License")
    }

    def detect_pii(self, text: str) -> Dict[str, Dict[str, object]]:
        """Find all PII in text.

        Args:
            text: The text to scan.

        Returns:
            A dict keyed by PII type (only types with at least one match).
            Each value holds ``"label"`` (human-readable name), ``"count"``
            (number of matches) and ``"examples"`` (up to the first three
            matched strings). The examples are the raw matched text, so the
            result itself contains PII.
        """
        detected: Dict[str, Dict[str, object]] = {}

        for pii_type, (pattern, label) in self.PII_PATTERNS.items():
            # The patterns contain no capturing groups, so findall yields
            # the full matched strings.
            matches = re.findall(pattern, text)
            if matches:
                detected[pii_type] = {
                    "label": label,
                    "count": len(matches),
                    "examples": matches[:3]
                }

        return detected

    def mask_pii(self, text: str) -> str:
        """Replace PII with masked values.

        Each match is replaced by its upper-cased type key in brackets,
        e.g. ``[SSN]`` or ``[EMAIL]``. Patterns are applied in the order
        they are declared in ``PII_PATTERNS``, so when two patterns could
        match the same text the earlier one wins.

        Args:
            text: The text to mask.

        Returns:
            A copy of ``text`` with every pattern match replaced.
        """
        masked = text

        for pii_type, (pattern, _) in self.PII_PATTERNS.items():
            replacement = f"[{pii_type.upper()}]"
            masked = re.sub(pattern, replacement, masked)

        return masked

class DataRetention:
    """Manage data retention and deletion per regulations.

    ``retention_policies`` maps a data type name to the number of days the
    data may be kept. Data types without an explicit policy fall back to
    ``DEFAULT_RETENTION_DAYS``.
    """

    # Retention period (in days) for data types with no explicit policy.
    DEFAULT_RETENTION_DAYS = 30

    def __init__(self):
        self.retention_policies: Dict[str, int] = {
            "user_requests": 90,  # days
            "model_outputs": 30,
            "logs": 365,
            "sensitive_data": 7
        }

    def _expiry_date(self, data_type: str, created_date):
        """Return ``created_date`` plus the retention period for ``data_type``."""
        from datetime import timedelta

        retention_days = self.retention_policies.get(
            data_type, self.DEFAULT_RETENTION_DAYS
        )
        return created_date + timedelta(days=retention_days)

    def should_delete(self, data_type: str, created_date) -> bool:
        """Check if data should be deleted.

        Args:
            data_type: Key into ``retention_policies``; unknown keys use
                ``DEFAULT_RETENTION_DAYS``.
            created_date: ``datetime`` at which the data was created. May be
                naive or timezone-aware.

        Returns:
            True when the current time is past the retention expiry.
        """
        from datetime import datetime

        expiry_date = self._expiry_date(data_type, created_date)

        # Take "now" in the same timezone as created_date. For a naive
        # created_date tzinfo is None and this is plain datetime.now();
        # for an aware one it avoids the naive/aware comparison TypeError.
        return datetime.now(created_date.tzinfo) > expiry_date

    def schedule_deletion(self, data_id: str, data_type: str, created_date):
        """Schedule data for deletion.

        No queueing is performed here; the method only computes when the
        item identified by ``data_id`` becomes due for deletion.

        Args:
            data_id: Identifier of the data item (currently unused).
            data_type: Key into ``retention_policies``.
            created_date: ``datetime`` at which the data was created.

        Returns:
            The ``datetime`` at which the data should be deleted.
        """
        # In production: use scheduled deletion queue
        return self._expiry_date(data_type, created_date)

GDPR and Data Subject Rights

Implement systems to support GDPR requirements including data access, portability, and deletion.

from typing import Optional
from dataclasses import dataclass
from datetime import datetime

@dataclass
class DataAccessRequest:
    """Request for access to personal data.

    Plain record of one data-subject request. The dataclass performs no
    validation: ``request_type`` and ``status`` are free-form strings whose
    expected values are listed in the field comments below.
    """
    request_id: str  # identifier of this request
    user_id: str  # identifier of the user the request concerns
    request_type: str  # access, portability, deletion, correction
    status: str  # pending, in_progress, completed
    created_at: datetime  # when the request was created
    # Optional payload; defaults to None.
    # NOTE(review): presumably the data returned for the request — confirm.
    requested_data: Optional[dict] = None

class GDPRCompliance:
    """In-memory handlers for GDPR data-subject rights.

    Keeps two dictionaries: ``requests`` (request_id -> DataAccessRequest)
    and ``data_store`` (user_id -> that user's data dict).
    """

    def __init__(self):
        self.requests: Dict[str, DataAccessRequest] = {}
        self.data_store: Dict[str, dict] = {}  # user_id -> data

    def create_access_request(
        self,
        user_id: str,
        request_type: str
    ) -> str:
        """Register a new pending request and return its generated id."""
        import uuid

        new_id = str(uuid.uuid4())
        self.requests[new_id] = DataAccessRequest(
            request_id=new_id,
            user_id=user_id,
            request_type=request_type,
            status="pending",
            created_at=datetime.now()
        )
        return new_id

    def handle_data_access(self, user_id: str) -> dict:
        """Right to access: return the stored data dict, or ``{}`` if none.

        The stored dict itself is returned, not a copy.
        """
        return self.data_store.get(user_id, {})

    def handle_data_portability(self, user_id: str) -> str:
        """Right to portability: return the user's data as indented JSON."""
        import json

        record = self.data_store[user_id] if user_id in self.data_store else {}
        return json.dumps(record, indent=2)

    def handle_data_deletion(self, user_id: str) -> bool:
        """Right to deletion: drop the user's data.

        Returns True if an entry was removed, False if there was none.
        """
        if user_id not in self.data_store:
            return False
        del self.data_store[user_id]
        return True

    def handle_data_correction(
        self,
        user_id: str,
        corrections: dict
    ) -> bool:
        """Right to correction: merge ``corrections`` into the stored data.

        Returns True if the user had stored data, False otherwise (in which
        case nothing is written).
        """
        if user_id not in self.data_store:
            return False
        self.data_store[user_id].update(corrections)
        return True

Role-Based Access Control (RBAC)

Implement RBAC to restrict actions based on user roles.

from enum import Enum
from typing import Set

class Permission(Enum):
    """Actions that a role can be granted.

    Each member's value is the lower-case string form of its name.
    """
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"

class Role:
    """A named collection of permissions."""

    def __init__(self, name: str, permissions: Set[Permission]):
        # The permission set is stored as passed in; no copy is made.
        self.name, self.permissions = name, permissions

class RBAC:
    """Role-Based Access Control system.

    ``roles`` maps a role name to its ``Role``; three roles are predefined
    (viewer, editor, admin). ``user_roles`` maps a user id to the list of
    role names assigned to that user.
    """

    def __init__(self):
        self.roles = {
            "viewer": Role("viewer", {Permission.READ}),
            "editor": Role("editor", {Permission.READ, Permission.WRITE}),
            "admin": Role("admin", {
                Permission.READ,
                Permission.WRITE,
                Permission.DELETE,
                Permission.ADMIN
            })
        }
        self.user_roles: Dict[str, List[str]] = {}

    def assign_role(self, user_id: str, role_name: str):
        """Assign role to user.

        Unknown role names are ignored. Assigning a role the user already
        holds is a no-op, so each role appears at most once per user.

        Args:
            user_id: The user receiving the role.
            role_name: Key into ``self.roles``.
        """
        if role_name not in self.roles:
            return

        assigned = self.user_roles.setdefault(user_id, [])
        # Skip duplicates so repeated assignment does not grow the list.
        if role_name not in assigned:
            assigned.append(role_name)

    def has_permission(
        self,
        user_id: str,
        permission: Permission
    ) -> bool:
        """Check if user has permission.

        Returns:
            True if any role assigned to the user grants ``permission``.
            Users with no roles, and role names that are no longer present
            in ``self.roles``, grant nothing.
        """
        return any(
            permission in self.roles[role_name].permissions
            for role_name in self.user_roles.get(user_id, [])
            if role_name in self.roles
        )

    def can_access_resource(
        self,
        user_id: str,
        resource_id: str,
        action: Permission
    ) -> bool:
        """Check resource-level access.

        ``resource_id`` is currently unused: the decision is based only on
        whether the user's roles grant ``action``.
        """
        # In production: check resource-specific ACLs
        return self.has_permission(user_id, action)

Audit Trails and Compliance Logging

Maintain audit trails for regulatory compliance and security investigation.

from dataclasses import dataclass
from datetime import datetime

@dataclass
class AuditLog:
    """Record of system action for compliance.

    Plain data record; all fields are required and none are validated.
    """
    log_id: str  # identifier of this log entry
    user_id: str  # user the action is attributed to
    action: str  # name of the action performed
    resource: str  # resource the action was applied to
    status: str  # success, failure
    timestamp: datetime  # when the entry was recorded
    ip_address: str  # IP address associated with the action
    details: dict  # additional free-form information about the action

class ComplianceLogger:
    """Log actions for compliance and auditing.

    Entries are kept in memory in ``self.logs`` in the order recorded.
    """

    def __init__(self):
        self.logs: List[AuditLog] = []

    def log_action(
        self,
        user_id: str,
        action: str,
        resource: str,
        status: str = "success",
        details: Optional[dict] = None,
        ip_address: str = "0.0.0.0"
    ):
        """Log compliance-relevant action.

        Args:
            user_id: User the action is attributed to.
            action: Name of the action.
            resource: Resource the action was applied to.
            status: Outcome of the action; defaults to ``"success"``.
            details: Optional extra information; an empty dict is stored
                when omitted or empty.
            ip_address: Client IP to record. Defaults to the ``"0.0.0.0"``
                placeholder; pass the address extracted from the request
                in production.
        """
        import uuid
        log = AuditLog(
            log_id=str(uuid.uuid4()),
            user_id=user_id,
            action=action,
            resource=resource,
            status=status,
            timestamp=datetime.now(),
            ip_address=ip_address,
            details=details or {}
        )

        self.logs.append(log)

    def export_audit_log(
        self,
        start_date: datetime,
        end_date: datetime,
        user_id: Optional[str] = None
    ) -> List[AuditLog]:
        """Export audit logs for compliance.

        Args:
            start_date: Inclusive lower bound on the entry timestamp.
            end_date: Inclusive upper bound on the entry timestamp.
            user_id: When given (not None), only that user's entries are
                returned.

        Returns:
            The matching entries, in the order they were logged.
        """
        # Compare against None explicitly: a falsy id such as "" is still a
        # filter and must not fall through to "return every user's logs".
        return [
            log for log in self.logs
            if start_date <= log.timestamp <= end_date
            and (user_id is None or log.user_id == user_id)
        ]

    def log_data_processing(
        self,
        user_id: str,
        data_type: str,
        action: str,
        pii_found: bool = False
    ):
        """Log data processing for GDPR compliance.

        Records an entry whose action is ``data_processing_<action>``, whose
        resource is ``data_type``, and whose details carry the
        ``pii_detected`` flag.
        """
        self.log_action(
            user_id=user_id,
            action=f"data_processing_{action}",
            resource=data_type,
            details={"pii_detected": pii_found}
        )

Data Encryption

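Encrypt sensitive data at rest and in transit. The example below covers field-level encryption at rest; protection in transit is typically provided by TLS at the transport layer.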

from cryptography.fernet import Fernet
import os

class EncryptionManager:
    """Manage data encryption.

    Wraps a ``Fernet`` cipher and offers helpers that encrypt or decrypt
    selected fields of a dict.
    """

    def __init__(self, encryption_key: Optional[bytes] = None):
        """Create the cipher.

        Args:
            encryption_key: Fernet key to use. When omitted a new key is
                generated, which means data encrypted by this instance can
                only be decrypted while ``self.encryption_key`` is retained.
        """
        # In production: load from secure key management service
        if encryption_key is None:
            encryption_key = Fernet.generate_key()
        self.encryption_key = encryption_key
        self.cipher = Fernet(self.encryption_key)

    def encrypt_data(self, plaintext: str) -> str:
        """Encrypt sensitive data and return the token as text."""
        encrypted = self.cipher.encrypt(plaintext.encode())
        return encrypted.decode()

    def decrypt_data(self, ciphertext: str) -> str:
        """Decrypt a token produced by ``encrypt_data``.

        Any exception raised by the cipher propagates to the caller.
        """
        decrypted = self.cipher.decrypt(ciphertext.encode())
        return decrypted.decode()

    def encrypt_at_rest(self, data: dict, sensitive_fields: List[str]) -> dict:
        """Encrypt sensitive fields before storage.

        Args:
            data: Record to protect; it is not modified.
            sensitive_fields: Names of the fields to encrypt.

        Returns:
            A shallow copy of ``data`` in which each listed field that is
            present and truthy is replaced by the encryption of its ``str()``
            form. Missing or falsy fields are left as they are.
        """
        encrypted = data.copy()

        for field in sensitive_fields:
            if field in encrypted and encrypted[field]:
                # Values are stringified first, so the original type is not
                # restored on decryption.
                encrypted[field] = self.encrypt_data(str(encrypted[field]))

        return encrypted

    def decrypt_before_use(
        self,
        data: dict,
        sensitive_fields: List[str]
    ) -> dict:
        """Decrypt sensitive fields from storage.

        Decryption is best-effort: a field that cannot be decrypted keeps
        its stored value and a warning is logged.

        Args:
            data: Stored record; it is not modified.
            sensitive_fields: Names of the fields to decrypt.

        Returns:
            A shallow copy of ``data`` with the decryptable fields replaced
            by their plaintext.
        """
        import logging

        decrypted = data.copy()

        for field in sensitive_fields:
            if field in decrypted and decrypted[field]:
                try:
                    decrypted[field] = self.decrypt_data(decrypted[field])
                except Exception:
                    # Catch Exception rather than using a bare except so
                    # KeyboardInterrupt/SystemExit still propagate. Only the
                    # field name is logged, never the value.
                    logging.getLogger(__name__).warning(
                        "Could not decrypt field %r; keeping stored value",
                        field,
                    )

        return decrypted

Key Takeaway

GDPR compliance and data privacy require PII detection, data retention policies, RBAC, comprehensive audit trails, and encryption. Implement these systematically across all data handling.

Exercises

  1. Build a PII detector covering 6+ data types
  2. Implement GDPR access, deletion, and portability handlers
  3. Create an RBAC system with role inheritance
  4. Build an audit logger for compliance reporting
  5. Encrypt and decrypt sensitive fields
  6. Track data retention and schedule deletion
  7. Generate GDPR compliance reports