Structured Data Extraction at Scale
One of the most valuable uses of LLMs in production is extracting structured data from unstructured text. Instead of manually parsing documents, you can prompt LLMs to extract specific fields reliably. This lesson teaches you to build extraction pipelines that work at scale while handling edge cases.
The Challenge of Reliable Extraction
Naive extraction fails:
# Bad: Unreliable, inconsistent format
prompt = f"Extract the data from: {text}"
# Output might be: "The company is Acme Inc." or "Company: Acme" or JSON or...
# Good: Specify exact format and handle edge cases
prompt = f"""Extract person information into JSON.
Text: {text}
Required fields: name (string), email (string), phone (string, nullable)
Optional fields: title (string), company (string)
Rules:
- If field is missing, use null
- If multiple values exist, choose most recent
- Clean phone numbers to +1-XXX-XXX-XXXX format
- Validate email format
Respond with ONLY valid JSON, no other text."""
JSON Mode and Structured Outputs
Modern LLMs support constrained output formats:
import json
import openai
from typing import Optional
class StructuredExtractor:
    """
    Extract structured data from free text with a guaranteed format.

    Uses JSON mode (``response_format={"type": "json_object"}``) where
    available so the model is constrained to emit valid JSON.
    """

    def __init__(self, model: str = "gpt-4", use_json_mode: bool = True):
        """
        Args:
            model: Chat model name passed straight through to the API.
            use_json_mode: Whether JSON-constrained output should be requested.
        """
        self.model = model
        self.use_json_mode = use_json_mode

    def extract_with_json_mode(self,
                               text: str,
                               schema: dict) -> dict:
        """
        Extract data with JSON mode (guarantees valid JSON response).

        Args:
            text: Unstructured source text.
            schema: Mapping of field name -> type description, embedded in
                the prompt so the model knows the expected shape.

        Returns:
            Parsed JSON dict, or ``{"error": ..., "raw": ...}`` if the
            response still could not be parsed.
        """
        prompt = f"""Extract information from this text and return valid JSON.
Text:
{text}
Expected JSON structure:
{json.dumps(schema, indent=2)}
Rules for extraction:
1. Use only fields defined in the schema
2. Use null for missing values
3. Validate data types
4. Return ONLY the JSON, no other text"""
        response = openai.ChatCompletion.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
            # Deterministic output for extraction — consistent with
            # extract_with_schema below (was previously omitted here).
            temperature=0.0
        )
        try:
            return json.loads(response.choices[0].message.content)
        except json.JSONDecodeError:
            return {"error": "Invalid JSON response", "raw": response.choices[0].message.content}

    def extract_with_schema(self,
                            text: str,
                            schema: dict,
                            validation_rules: Optional[dict] = None) -> dict:
        """
        Extract with explicit schema and validation.

        Args:
            text: Unstructured source text.
            schema: Mapping of field name -> type description.
            validation_rules: Optional mapping of field name -> rule text,
                appended to the prompt when provided.

        Returns:
            Parsed JSON dict, or ``{"error": ..., "raw": ...}`` on a
            malformed response.
        """
        schema_description = self._describe_schema(schema)
        prompt = f"""Extract structured data from the following text.
{schema_description}
Text to extract from:
{text}
Extraction rules:
1. Map text content to schema fields
2. Apply validation rules
3. Use null for missing required fields (don't invent data)
4. Return JSON only"""
        if validation_rules:
            rules_text = self._describe_validation_rules(validation_rules)
            prompt += f"\n\nValidation rules:\n{rules_text}"
        response = openai.ChatCompletion.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.0  # Deterministic for extraction
        )
        try:
            return json.loads(response.choices[0].message.content)
        except json.JSONDecodeError:
            # Include the raw text so callers can debug malformed output
            # (now consistent with extract_with_json_mode's error shape).
            return {"error": "Failed to parse response", "raw": response.choices[0].message.content}

    @staticmethod
    def _describe_schema(schema: dict) -> str:
        """Convert a schema dict to a human-readable prompt description."""
        description = "Expected output schema:\n"
        for field, field_type in schema.items():
            description += f"- {field}: {field_type} (required)\n"
        return description

    @staticmethod
    def _describe_validation_rules(rules: dict) -> str:
        """Convert validation rules to prompt text, one bullet per field."""
        rules_text = ""
        for field, rule in rules.items():
            rules_text += f"- {field}: {rule}\n"
        return rules_text
# Usage
# Schema: field name -> type description (rendered into the prompt text).
schema = {
    "candidate_name": "string",
    "email": "string",
    "phone": "string",
    "job_title": "string",
    "company": "string",
    "years_experience": "integer"
}
# Per-field validation rules, also rendered into the prompt as plain text.
validation_rules = {
    "email": "Must match pattern user@domain.com",
    "phone": "Format as +1-XXX-XXX-XXXX",
    "years_experience": "Must be 0-70"
}
# Sample resume-style snippet to extract from.
text = """
John Smith, john.smith@techcorp.com, (555) 123-4567
Senior Software Engineer at TechCorp with 8 years of experience.
"""
extractor = StructuredExtractor()
# NOTE: performs a live API call; requires OpenAI credentials to be configured.
result = extractor.extract_with_json_mode(text, schema)
print(json.dumps(result, indent=2))
Schema Design for Extraction
Good schema design makes extraction more reliable:
from typing import List, Optional
from dataclasses import dataclass
@dataclass
class ExtractionSchema:
    """Define the extraction schema for a document type.

    Call :meth:`validate` before use to catch inconsistencies between the
    field map and the required/optional lists.
    """
    name: str                      # Document type, e.g. "job_posting"
    fields: dict                   # field_name -> field_type description
    required_fields: List[str]     # Each must be declared in `fields`
    optional_fields: List[str]     # Each must be declared in `fields`; disjoint from required
    # field_name -> nested schema dict; optional (was mistyped `dict = None`)
    nested_structures: Optional[dict] = None

    def validate(self) -> List[str]:
        """Check schema validity; return a list of error messages (empty if valid)."""
        errors = []
        # Every required field must be declared in the field map.
        for field in self.required_fields:
            if field not in self.fields:
                errors.append(f"Required field {field} not in fields")
        # Every optional field must be declared too.
        for field in self.optional_fields:
            if field not in self.fields:
                errors.append(f"Optional field {field} not in fields")
        # A field cannot be both required and optional.
        overlap = set(self.required_fields) & set(self.optional_fields)
        if overlap:
            errors.append(f"Fields in both required and optional: {overlap}")
        return errors
# Example: Job posting extraction schema
job_posting_schema = ExtractionSchema(
    name="job_posting",
    fields={
        "job_title": "string",
        "company": "string",
        "salary_min": "integer (nullable)",
        "salary_max": "integer (nullable)",
        "currency": "string (USD/EUR/GBP)",
        "job_type": "string (full-time/part-time/contract)",
        "required_skills": "array of strings",
        "experience_required": "integer (years)",
        "location": "string",
        "description": "string"
    },
    required_fields=["job_title", "company", "job_type", "description"],
    optional_fields=["salary_min", "salary_max", "currency", "required_skills", "experience_required", "location"],
    # Nested sub-schema: extracted as a JSON object under the "benefits" key.
    nested_structures={
        "benefits": {
            "health_insurance": "boolean",
            "retirement_plan": "boolean",
            "remote_eligible": "boolean"
        }
    }
)
# Validate before use
# NOTE: validation errors are silently dropped here; real code should
# surface `errors` instead of only printing on success.
errors = job_posting_schema.validate()
if not errors:
    print("Schema is valid")
Handling Edge Cases in Extraction
Real data is messy. Handle common issues:
class RobustExtractionHandler:
    """Handle edge cases (missing, ambiguous, malformed data) in extraction."""

    @staticmethod
    def handle_missing_fields(extracted: dict, required_fields: list) -> dict:
        """Return a copy of *extracted* with every required field present (None if absent)."""
        result = extracted.copy()
        for field in required_fields:
            if field not in result:
                result[field] = None
        return result

    @staticmethod
    def handle_ambiguous_values(text: str, field: str) -> Optional[str]:
        """
        When text contains multiple values for one field,
        choose the most likely one (currently: the first occurrence).

        Only "phone" and "email" are supported; other fields return None.
        """
        import re
        if field == "phone":
            # Accept common formats: "(555) 123-4567", "555-123-4567",
            # "+15551234567". The previous pattern (\+?1?\d{10}) only
            # matched an unbroken digit run and missed formatted numbers.
            phones = re.findall(r'\+?1?\(?\d{3}\)?[\s.-]*\d{3}[\s.-]*\d{4}', text)
            return phones[0] if phones else None
        if field == "email":
            # Char class fixed: [A-Z|a-z] contained a literal '|'.
            emails = re.findall(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', text)
            return emails[0] if emails else None
        return None

    @staticmethod
    def handle_nested_data(extracted: dict, nested_schema: dict) -> dict:
        """
        Ensure each nested field exists as a dict: fill with nulls when
        missing, or parse from a JSON string when the model stringified it.
        """
        import json
        for field, schema in nested_schema.items():
            if field not in extracted or extracted[field] is None:
                # Create an all-null nested structure so downstream code
                # can index it safely.
                extracted[field] = {key: None for key in schema}
            elif not isinstance(extracted[field], dict):
                # Models sometimes return nested objects as JSON strings.
                try:
                    parsed = json.loads(extracted[field])
                    # json.loads can succeed but yield a non-dict (e.g. "5").
                    extracted[field] = parsed if isinstance(parsed, dict) else {key: None for key in schema}
                except (json.JSONDecodeError, TypeError):
                    # Narrowed from a bare except: anything unparseable
                    # becomes an all-null structure.
                    extracted[field] = {key: None for key in schema}
        return extracted

    @staticmethod
    def validate_and_fix_types(extracted: dict, schema: dict) -> dict:
        """
        Coerce extracted values toward their declared schema types.

        Integers are cast (None on failure), arrays are JSON-parsed or
        wrapped, and booleans are recognized from common string spellings.
        """
        import json
        for field, expected_type in schema.items():
            if field not in extracted:
                continue
            value = extracted[field]
            if value is None:
                continue
            # Handle integer fields
            if "integer" in expected_type:
                try:
                    extracted[field] = int(value)
                except (ValueError, TypeError):
                    extracted[field] = None
            # Handle array fields
            elif "array" in expected_type:
                if isinstance(value, list):
                    pass  # Already correct
                elif isinstance(value, str):
                    # Try to parse as a JSON array; otherwise wrap the
                    # scalar in a one-element list.
                    try:
                        parsed = json.loads(value)
                        # json.loads may yield a non-list (e.g. "5" -> 5).
                        extracted[field] = parsed if isinstance(parsed, list) else [value]
                    except json.JSONDecodeError:  # narrowed from bare except
                        extracted[field] = [value]
                else:
                    extracted[field] = [value]
            # Handle boolean fields
            elif "boolean" in expected_type:
                if isinstance(value, bool):
                    pass
                elif isinstance(value, str):
                    extracted[field] = value.lower() in ("true", "yes", "1")
        return extracted

    @staticmethod
    def handle_contradictory_values(extracted: dict,
                                    contradiction_rules: dict) -> dict:
        """
        Resolve conflicts between fields that contradict each other.

        Args:
            extracted: Field name -> extracted value.
            contradiction_rules: Maps a (field_a, field_b) tuple to a
                resolution string such as "priority: field_a"; the
                non-priority field is nulled when both have values.
        """
        for (field1, field2), resolution in contradiction_rules.items():
            if field1 in extracted and field2 in extracted:
                if extracted[field1] and extracted[field2]:
                    # Both have values - apply resolution rule
                    if "priority" in resolution:
                        priority_field = resolution.split(":")[-1].strip()
                        if priority_field == field1:
                            extracted[field2] = None
                        else:
                            extracted[field1] = None
        return extracted
Validation and Error Recovery
Verify extracted data and retry intelligently:
from typing import Tuple
class ExtractionValidator:
    """Validate extracted data and trigger re-extraction if needed."""

    def __init__(self, extractor: "StructuredExtractor"):
        """
        Args:
            extractor: The StructuredExtractor used for (re-)extraction.
                (Forward-reference annotation so this class can be
                imported without the extractor class defined first.)
        """
        self.extractor = extractor

    def validate(self, extracted: dict, rules: dict) -> Tuple[bool, list]:
        """
        Validate extracted data against rules.

        Args:
            extracted: Field name -> extracted value.
            rules: Field name -> rule dict; recognized keys are
                "required", "type" ("email"/"phone"/"integer"), and
                "validate" (a callable returning bool). Plain-string
                rules (prompt text) are skipped.

        Returns:
            (is_valid, error_messages)
        """
        errors = []
        for field, rule in rules.items():
            # Only validate fields that were actually extracted.
            if field not in extracted:
                continue
            # Guard: the file's own examples use plain-string rules for
            # prompting; those would crash rule["type"] below.
            if not isinstance(rule, dict):
                continue
            value = extracted[field]
            # Required field is null
            if "required" in rule and value is None:
                errors.append(f"Required field {field} is null")
            # Type checking (lightweight heuristics, not full parsing).
            if "type" in rule:
                expected_type = rule["type"]
                if value is not None:
                    if expected_type == "email" and "@" not in str(value):
                        errors.append(f"{field} doesn't look like an email")
                    elif expected_type == "phone" and len(str(value)) < 10:
                        errors.append(f"{field} too short for phone number")
                    elif expected_type == "integer" and not isinstance(value, int):
                        errors.append(f"{field} is not an integer")
            # Custom validation hook.
            if "validate" in rule:
                validator = rule["validate"]
                if not validator(value):
                    errors.append(f"{field} failed custom validation")
        return len(errors) == 0, errors

    def extract_with_retry(self,
                           text: str,
                           schema: dict,
                           validation_rules: dict,
                           max_retries: int = 2) -> dict:
        """
        Extract data with automatic retry on validation failure.

        Fix vs. the original: the retry branch built an unused "fix"
        prompt and immediately re-extracted, then discarded that result
        when the loop re-extracted again — one wasted API call per retry.
        Each loop iteration now performs exactly one extraction.

        Returns:
            {"success": bool, "data": ..., "errors": [...], "attempts": n}
        """
        attempt = 0
        while attempt < max_retries:
            attempt += 1
            # One extraction per attempt.
            extracted = self.extractor.extract_with_schema(
                text, schema, validation_rules
            )
            is_valid, errors = self.validate(extracted, validation_rules)
            if is_valid:
                return {"success": True, "data": extracted, "attempts": attempt}
            if attempt < max_retries:
                print(f"Extraction invalid. Errors: {errors}")
                print(f"Retry {attempt}/{max_retries}...")
            else:
                return {
                    "success": False,
                    "data": extracted,
                    "errors": errors,
                    "attempts": attempt
                }
        # Only reachable when max_retries <= 0.
        return {"success": False, "errors": ["Max retries exceeded"]}
Human-in-the-Loop for Low Confidence
For critical extractions, involve humans:
from enum import Enum
class ConfidenceLevel(Enum):
    """Coarse confidence buckets used to route extraction results."""
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"


class ConfidenceEstimator:
    """Estimate confidence in an extraction result."""

    @staticmethod
    def estimate(extracted: dict, validation_results: dict) -> ConfidenceLevel:
        """
        Estimate confidence based on:
        - How many fields were extracted
        - How many validation errors
        - Null count

        Fix vs. the original: an empty extraction scored HIGH (zero
        fields means zero nulls and zero errors) and would have been
        auto-approved; an empty result now always scores LOW.
        """
        # An empty result carries no signal — never auto-approve it.
        if not extracted:
            return ConfidenceLevel.LOW
        null_count = sum(1 for v in extracted.values() if v is None)
        error_count = len(validation_results.get("errors", []))
        total_fields = len(extracted)
        if null_count == 0 and error_count == 0:
            return ConfidenceLevel.HIGH
        # Tolerate up to ~30% nulls and at most one validation error.
        if null_count <= total_fields * 0.3 and error_count <= 1:
            return ConfidenceLevel.MEDIUM
        return ConfidenceLevel.LOW
class HumanInTheLoopManager:
    """
    Route low-confidence extractions to humans for review.

    High-confidence results are approved automatically, medium ones are
    approved with a verification flag, and low ones are queued for a
    human decision.
    """

    def __init__(self):
        # Items awaiting human review; the list index doubles as the
        # review id returned to callers.
        self.review_queue = []

    def process_extraction(self,
                           text: str,
                           extracted: dict,
                           confidence: ConfidenceLevel) -> dict:
        """
        Decide what to do with an extraction result based on confidence.

        Returns a dict describing the chosen action; LOW-confidence items
        are appended to the review queue and receive a ``review_id``.
        """
        if confidence is ConfidenceLevel.LOW:
            # Queue the full context so a reviewer can see source and output.
            self.review_queue.append({
                "text": text,
                "extracted": extracted,
                "confidence": confidence.value,
                "status": "pending_review",
            })
            return {
                "action": "send_to_review",
                "review_id": len(self.review_queue) - 1,
            }
        if confidence is ConfidenceLevel.MEDIUM:
            return {"action": "approve_with_flag", "data": extracted, "flag": "Please verify"}
        if confidence is ConfidenceLevel.HIGH:
            return {"action": "auto_approve", "data": extracted}
Key Takeaway: Reliable extraction at scale requires schema definition, structured output formats, edge case handling, validation with retry logic, and human-in-the-loop for uncertain cases.
Exercise: Build an Extraction Pipeline for Job Postings
Create an end-to-end extraction system that parses job posting documents:
Requirements:
- Define extraction schema with required/optional fields
- Extract structured data with JSON mode
- Validate extracted data
- Handle missing/contradictory fields
- Retry on validation failure
- Route low-confidence extractions to human review
- Track extraction quality metrics
Starter code:
class JobPostingExtractor:
    """Extract structured data from job postings.

    Exercise starter: wires together the extractor, validator, confidence
    estimator and human-in-the-loop manager; extract_job is left for the
    reader to implement (see TODOs).
    """

    def __init__(self):
        self.extractor = StructuredExtractor()
        self.validator = ExtractionValidator(self.extractor)
        self.confidence_estimator = ConfidenceEstimator()
        self.hitl_manager = HumanInTheLoopManager()

    def extract_job(self, posting_text: str) -> dict:
        """
        Extract job details from posting text.

        Args:
            posting_text: Raw job-posting text.

        Returns:
            Dict with extraction result and processing action
        """
        # TODO: Define schema
        # TODO: Extract with validation
        # TODO: Estimate confidence
        # TODO: Route appropriately (auto-approve/review)
        # TODO: Return structured result
        pass
# Test with job posting
# Sample posting exercising salary range, location, skills and benefits fields.
posting = """
Software Engineer - Senior Level
TechCorp Inc.
$150,000 - $200,000/year
San Francisco, CA (Remote eligible)
We're hiring a Senior Software Engineer...
Required: 5+ years Python, AWS
Benefits: Health insurance, 401k, equity
"""
extractor = JobPostingExtractor()
# NOTE: returns None until extract_job is implemented.
result = extractor.extract_job(posting)
Extension challenges:
- Implement multi-language extraction
- Add field relationship validation (e.g., min_salary < max_salary)
- Create extraction quality dashboard
- Build feedback loop to improve extraction accuracy
- Support custom field extraction (not just predefined schema)
By completing this exercise, you’ll understand how to extract reliable structured data from unstructured documents at scale.