Authentication, Authorization, and Data Privacy
Authentication, Authorization, and Data Privacy
Regulatory frameworks such as GDPR demand strict data privacy controls, access management, and audit trails. LLM applications that handle sensitive data must implement PII protection, secure storage, and transparent data practices.
PII Detection and Handling
Identify and protect personally identifiable information throughout the system.
from typing import List, Dict
import re
class PIIDetector:
    """Detect and mask personally identifiable information (PII) using regexes.

    ``PII_PATTERNS`` maps a PII type key to a ``(regex, label)`` pair. The
    patterns are heuristics: they match on shape only and perform no checksum
    or semantic validation, so false positives and negatives are possible.
    """

    PII_PATTERNS = {
        "ssn": (r"\b\d{3}-\d{2}-\d{4}\b", "Social Security Number"),
        "credit_card": (
            r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b",
            "Credit Card",
        ),
        # The TLD class is ``[A-Za-z]``: inside a character class ``|`` is a
        # literal pipe, not alternation, so ``[A-Z|a-z]`` wrongly accepted it.
        "email": (
            r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
            "Email Address",
        ),
        # ``(?<!\w)`` instead of a leading ``\b`` lets the match begin on "+"
        # or "(", and whitespace is accepted as a separator, so formats such
        # as "(555) 123-4567" and "+1 555 123 4567" are covered in full.
        "phone": (
            r"(?<!\w)(?:\+1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b",
            "Phone Number",
        ),
        "passport": (r"\b[A-Z]{2}\d{7}\b", "Passport Number"),
        "drivers_license": (
            r"\b[A-Z]{1,2}\d{4,8}[A-Z]{0,2}\b",
            "Driver's License",
        ),
    }

    def detect_pii(self, text: str) -> Dict[str, Dict[str, object]]:
        """Find all PII in ``text``.

        Args:
            text: The text to scan.

        Returns:
            A dict keyed by PII type. Each value holds ``"label"`` (the
            human-readable name), ``"count"`` (number of matches) and
            ``"examples"`` (up to the first three matched strings). Types
            with no match are omitted.
        """
        detected: Dict[str, Dict[str, object]] = {}
        for pii_type, (pattern, label) in self.PII_PATTERNS.items():
            # finditer + group(0) always yields the whole match, even if a
            # pattern is later given capturing groups (findall would not).
            matches = [m.group(0) for m in re.finditer(pattern, text)]
            if matches:
                detected[pii_type] = {
                    "label": label,
                    "count": len(matches),
                    "examples": matches[:3],
                }
        return detected

    def mask_pii(self, text: str) -> str:
        """Return ``text`` with every PII match replaced by ``[TYPE]``.

        Patterns are applied in ``PII_PATTERNS`` order, so when two patterns
        could match the same span the earlier one determines the placeholder.
        """
        masked = text
        for pii_type, (pattern, _label) in self.PII_PATTERNS.items():
            masked = re.sub(pattern, f"[{pii_type.upper()}]", masked)
        return masked
class DataRetention:
    """Manage data retention and deletion per regulations.

    Retention periods are stored in ``retention_policies`` as a mapping from
    data type to a number of days. Data types without an explicit policy fall
    back to ``DEFAULT_RETENTION_DAYS``.
    """

    # Fallback retention period for data types with no explicit policy.
    DEFAULT_RETENTION_DAYS = 30

    def __init__(self):
        self.retention_policies: Dict[str, int] = {
            "user_requests": 90,  # days
            "model_outputs": 30,
            "logs": 365,
            "sensitive_data": 7,
        }

    def _expiry_date(self, data_type: str, created_date):
        """Return the moment at which data of ``data_type`` expires."""
        from datetime import timedelta

        retention_days = self.retention_policies.get(
            data_type, self.DEFAULT_RETENTION_DAYS
        )
        return created_date + timedelta(days=retention_days)

    def should_delete(self, data_type: str, created_date) -> bool:
        """Check whether data has outlived its retention period.

        Args:
            data_type: Key into ``retention_policies``.
            created_date: ``datetime`` at which the data was created; may be
                naive or timezone-aware.

        Returns:
            True when the current time is past the expiry date.
        """
        from datetime import datetime

        # Take "now" in the same timezone (or naivety) as created_date;
        # comparing a naive now() with an aware datetime raises TypeError.
        now = datetime.now(tz=created_date.tzinfo)
        return now > self._expiry_date(data_type, created_date)

    def schedule_deletion(self, data_id: str, data_type: str, created_date):
        """Compute when ``data_id`` becomes due for deletion.

        No queue is wired up here; in production the returned time would be
        handed to a scheduled deletion queue together with ``data_id``.

        Returns:
            The ``datetime`` at which the data should be deleted.
        """
        delete_at = self._expiry_date(data_type, created_date)
        return delete_at
GDPR and Data Subject Rights
Implement systems to support GDPR requirements including data access, portability, and deletion.
from typing import Optional
from dataclasses import dataclass
from datetime import datetime
@dataclass
class DataAccessRequest:
    """A data-subject request concerning one user's personal data.

    Attributes:
        request_id: Identifier of this request.
        user_id: Identifier of the user the request concerns.
        request_type: "access", "portability", "deletion" or "correction".
        status: "pending", "in_progress" or "completed".
        created_at: When the request was created.
        requested_data: Optional payload attached to the request.
    """

    request_id: str
    user_id: str
    request_type: str
    status: str
    created_at: datetime
    requested_data: Optional[dict] = None
class GDPRCompliance:
    """Implement GDPR data-subject rights over an in-memory store.

    ``requests`` maps request IDs to ``DataAccessRequest`` records and
    ``data_store`` maps user IDs to that user's data dict.
    """

    def __init__(self):
        self.requests: Dict[str, DataAccessRequest] = {}
        self.data_store: Dict[str, dict] = {}  # user_id -> data

    def create_access_request(
        self,
        user_id: str,
        request_type: str
    ) -> str:
        """Register a new pending request and return its generated ID.

        Args:
            user_id: The user the request concerns.
            request_type: Kind of request (access, portability, deletion,
                correction).

        Returns:
            The UUID4 string under which the request is stored.
        """
        import uuid

        request_id = str(uuid.uuid4())
        self.requests[request_id] = DataAccessRequest(
            request_id=request_id,
            user_id=user_id,
            request_type=request_type,
            status="pending",
            created_at=datetime.now(),
        )
        return request_id

    def handle_data_access(self, user_id: str) -> dict:
        """Fulfill right to access.

        Returns:
            A deep copy of the user's stored data, or an empty dict when the
            user is unknown. A copy is returned so that callers cannot alter
            the stored record by mutating the result.
        """
        import copy

        return copy.deepcopy(self.data_store.get(user_id, {}))

    def handle_data_portability(self, user_id: str) -> str:
        """Fulfill right to data portability.

        Returns:
            The user's data as an indented JSON string (``"{}"`` when the
            user is unknown).
        """
        import json

        data = self.data_store.get(user_id, {})
        return json.dumps(data, indent=2)

    def handle_data_deletion(self, user_id: str) -> bool:
        """Fulfill right to deletion (Right to be Forgotten).

        Returns:
            True when data existed and was removed, False otherwise.
        """
        if user_id not in self.data_store:
            return False
        del self.data_store[user_id]
        return True

    def handle_data_correction(
        self,
        user_id: str,
        corrections: dict
    ) -> bool:
        """Fulfill right to data correction.

        Args:
            user_id: The user whose record is corrected.
            corrections: Keys and values merged into the stored record.

        Returns:
            True when the user exists and was updated, False otherwise.
        """
        if user_id not in self.data_store:
            return False
        self.data_store[user_id].update(corrections)
        return True
Role-Based Access Control (RBAC)
Implement RBAC to restrict actions based on user roles.
from enum import Enum
from typing import Set
class Permission(Enum):
    """Actions that a role can grant to its holders."""

    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"
class Role:
    """A named bundle of permissions.

    Attributes:
        name: The role's name.
        permissions: The set of ``Permission`` values the role grants.
    """

    def __init__(self, name: str, permissions: Set[Permission]):
        self.name, self.permissions = name, permissions
class RBAC:
    """Role-Based Access Control system.

    ``roles`` maps role names to ``Role`` objects (viewer, editor and admin
    are predefined). ``user_roles`` maps a user ID to the list of role names
    assigned to that user.
    """

    def __init__(self):
        self.roles = {
            "viewer": Role("viewer", {Permission.READ}),
            "editor": Role("editor", {Permission.READ, Permission.WRITE}),
            "admin": Role("admin", {
                Permission.READ,
                Permission.WRITE,
                Permission.DELETE,
                Permission.ADMIN,
            }),
        }
        self.user_roles: Dict[str, List[str]] = {}

    def assign_role(self, user_id: str, role_name: str):
        """Assign a role to a user.

        Unknown role names are ignored. Assigning a role the user already
        holds is a no-op, so the user's role list never contains duplicates.
        """
        if role_name not in self.roles:
            return
        assigned = self.user_roles.setdefault(user_id, [])
        if role_name not in assigned:
            assigned.append(role_name)

    def has_permission(
        self,
        user_id: str,
        permission: Permission
    ) -> bool:
        """Check whether any of the user's roles grants ``permission``.

        Returns:
            True if at least one assigned, still-defined role includes the
            permission; False otherwise (including for unknown users).
        """
        return any(
            permission in self.roles[role_name].permissions
            for role_name in self.user_roles.get(user_id, [])
            # Skip names that are no longer present in ``roles``.
            if role_name in self.roles
        )

    def can_access_resource(
        self,
        user_id: str,
        resource_id: str,
        action: Permission
    ) -> bool:
        """Check resource-level access.

        ``resource_id`` is currently not consulted: the decision rests only
        on the user's role permissions.
        """
        # In production: check resource-specific ACLs
        return self.has_permission(user_id, action)
Audit Trails and Compliance Logging
Maintain audit trails for regulatory compliance and security investigation.
from dataclasses import dataclass
from datetime import datetime
@dataclass
class AuditLog:
    """One audited system action, kept for compliance purposes.

    Attributes:
        log_id: Identifier of this log entry.
        user_id: The user who performed the action.
        action: Name of the action performed.
        resource: The resource the action was applied to.
        status: Outcome of the action: "success" or "failure".
        timestamp: When the entry was recorded.
        ip_address: IP address associated with the action.
        details: Additional free-form information about the action.
    """

    log_id: str
    user_id: str
    action: str
    resource: str
    status: str
    timestamp: datetime
    ip_address: str
    details: dict
class ComplianceLogger:
    """Log actions for compliance and auditing.

    Entries are appended to the in-memory list ``logs`` in call order.
    """

    def __init__(self):
        self.logs: List[AuditLog] = []

    def log_action(
        self,
        user_id: str,
        action: str,
        resource: str,
        status: str = "success",
        details: Optional[dict] = None,
        ip_address: str = "0.0.0.0"
    ):
        """Record a compliance-relevant action.

        Args:
            user_id: The user who performed the action.
            action: Name of the action.
            resource: The resource acted upon.
            status: "success" or "failure".
            details: Extra information; a shallow copy is stored so that
                later changes to the caller's dict do not alter the entry.
            ip_address: Client address; the default is a placeholder until
                callers pass the address extracted from the request.
        """
        import uuid

        entry = AuditLog(
            log_id=str(uuid.uuid4()),
            user_id=user_id,
            action=action,
            resource=resource,
            status=status,
            timestamp=datetime.now(),
            ip_address=ip_address,
            details=dict(details) if details else {},
        )
        self.logs.append(entry)

    def export_audit_log(
        self,
        start_date: datetime,
        end_date: datetime,
        user_id: Optional[str] = None
    ) -> List[AuditLog]:
        """Export audit logs for compliance.

        Args:
            start_date: Inclusive lower bound on the entry timestamp.
            end_date: Inclusive upper bound on the entry timestamp.
            user_id: When given, only that user's entries are returned.

        Returns:
            Matching entries in the order they were logged.
        """
        # Compare against None, not truthiness: an empty-string user_id
        # must filter (to nothing), not fall through to every user's logs.
        return [
            log for log in self.logs
            if start_date <= log.timestamp <= end_date
            and (user_id is None or log.user_id == user_id)
        ]

    def log_data_processing(
        self,
        user_id: str,
        data_type: str,
        action: str,
        pii_found: bool = False
    ):
        """Log data processing for GDPR compliance.

        The entry's action is ``data_processing_<action>``, its resource is
        ``data_type``, and its details record whether PII was detected.
        """
        self.log_action(
            user_id=user_id,
            action=f"data_processing_{action}",
            resource=data_type,
            details={"pii_detected": pii_found},
        )
Data Encryption
Encrypt sensitive data at rest and in transit. The example below covers field-level encryption at rest; protect data in transit with TLS.
from cryptography.fernet import Fernet
import os
class EncryptionManager:
    """Manage symmetric encryption of sensitive values with Fernet."""

    def __init__(self, key: Optional[bytes] = None):
        """Create the cipher.

        Args:
            key: A Fernet key. When omitted, a fresh key is generated, which
                means ciphertext from this instance can only be decrypted by
                this instance.
        """
        # In production: load from secure key management service
        self.encryption_key = key if key is not None else Fernet.generate_key()
        self.cipher = Fernet(self.encryption_key)

    def encrypt_data(self, plaintext: str) -> str:
        """Encrypt a string and return the Fernet token as text."""
        token = self.cipher.encrypt(plaintext.encode())
        return token.decode()

    def decrypt_data(self, ciphertext: str) -> str:
        """Decrypt a Fernet token (given as text) back to the original string."""
        plaintext = self.cipher.decrypt(ciphertext.encode())
        return plaintext.decode()

    def encrypt_at_rest(self, data: dict, sensitive_fields: List[str]) -> dict:
        """Encrypt sensitive fields before storage.

        Args:
            data: The record to protect; it is not modified.
            sensitive_fields: Names of the fields to encrypt.

        Returns:
            A shallow copy of ``data`` in which every listed field that is
            present and not None is replaced by the encrypted form of its
            ``str()`` value.
        """
        encrypted = data.copy()
        for field in sensitive_fields:
            # Skip only missing/None values: falsy values such as 0, False
            # or "" are still sensitive and must not be stored in plaintext.
            if encrypted.get(field) is not None:
                encrypted[field] = self.encrypt_data(str(encrypted[field]))
        return encrypted

    def decrypt_before_use(
        self,
        data: dict,
        sensitive_fields: List[str]
    ) -> dict:
        """Decrypt sensitive fields from storage.

        Decryption is best-effort: a field that cannot be decrypted is left
        as stored and a warning naming the field (never its value) is logged.

        Args:
            data: The stored record; it is not modified.
            sensitive_fields: Names of the fields to decrypt.

        Returns:
            A shallow copy of ``data`` with the listed fields decrypted
            where possible.
        """
        import logging

        from cryptography.fernet import InvalidToken

        logger = logging.getLogger(__name__)
        decrypted = data.copy()
        for field in sensitive_fields:
            value = decrypted.get(field)
            if not value:
                continue
            try:
                decrypted[field] = self.decrypt_data(value)
            except (InvalidToken, TypeError, AttributeError, ValueError):
                # Wrong key or corrupted token (InvalidToken), a non-string
                # stored value (TypeError/AttributeError), or undecodable
                # plaintext bytes (ValueError).
                logger.warning(
                    "Could not decrypt field %r; leaving stored value unchanged",
                    field,
                )
        return decrypted
Key Takeaway
GDPR compliance and data privacy require PII detection, data retention policies, RBAC, comprehensive audit trails, and encryption. Implement these systematically across all data handling.
Exercises
- Build a PII detector covering 6+ data types
- Implement GDPR access, deletion, and portability handlers
- Create an RBAC system with role inheritance
- Build an audit logger for compliance reporting
- Encrypt and decrypt sensitive fields
- Track data retention and schedule deletion
- Generate GDPR compliance reports