Advanced

Structured Data Extraction at Scale

Lesson 3 of 4 · Estimated time: 50 min

One of the most valuable uses of LLMs in production is extracting structured data from unstructured text. Instead of manually parsing documents, you can prompt LLMs to extract specific fields reliably. This lesson teaches you to build extraction pipelines that work at scale while handling edge cases.

The Challenge of Reliable Extraction

Naive extraction fails:

# Bad: Unreliable, inconsistent format
prompt = f"Extract the data from: {text}"
# Output might be: "The company is Acme Inc." or "Company: Acme" or JSON or...

# Good: Specify exact format and handle edge cases
prompt = f"""Extract person information into JSON.

Text: {text}

Required fields: name (string), email (string), phone (string, nullable)
Optional fields: title (string), company (string)

Rules:
- If field is missing, use null
- If multiple values exist, choose most recent
- Clean phone numbers to +1-XXX-XXX-XXXX format
- Validate email format

Respond with ONLY valid JSON, no other text."""
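Even with a "Respond with ONLY valid JSON" instruction, models sometimes wrap the output in markdown fences or surround it with prose. A defensive parser recovers the JSON in those cases (a sketch; `parse_json_response` is a hypothetical helper, not a library function):

```python
import json
import re

def parse_json_response(raw: str) -> dict:
    """Parse a model reply that should contain a single JSON object."""
    text = raw.strip()
    # Strip ```json ... ``` fences if the model added them
    fenced = re.match(r"^```(?:json)?\s*(.*?)\s*```$", text, re.DOTALL)
    if fenced:
        text = fenced.group(1)
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Fall back to the outermost {...} span, if any
        start, end = text.find("{"), text.rfind("}")
        if start != -1 and end > start:
            return json.loads(text[start:end + 1])
        raise

print(parse_json_response('```json\n{"name": "Acme"}\n```'))  # {'name': 'Acme'}
```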

JSON Mode and Structured Outputs

Modern LLMs support constrained output formats:

import json
import openai
from typing import Optional

class StructuredExtractor:
    """
    Extract structured data with guaranteed format.
    Uses JSON mode where available.
    """

    def __init__(self, model: str = "gpt-4o-mini", use_json_mode: bool = True):
        # JSON mode requires a model that supports response_format (e.g., gpt-4o family)
        self.model = model
        self.use_json_mode = use_json_mode

    def extract_with_json_mode(self,
                              text: str,
                              schema: dict) -> dict:
        """
        Extract data with JSON mode (guarantees valid JSON response).
        """
        prompt = f"""Extract information from this text and return valid JSON.

Text:
{text}

Expected JSON structure:
{json.dumps(schema, indent=2)}

Rules for extraction:
1. Use only fields defined in the schema
2. Use null for missing values
3. Validate data types
4. Return ONLY the JSON, no other text"""

        client = openai.OpenAI()  # modern client (openai>=1.0)
        response = client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )

        try:
            return json.loads(response.choices[0].message.content)
        except json.JSONDecodeError:
            return {"error": "Invalid JSON response", "raw": response.choices[0].message.content}

    def extract_with_schema(self,
                           text: str,
                           schema: dict,
                           validation_rules: Optional[dict] = None) -> dict:
        """
        Extract with explicit schema and validation.
        """
        schema_description = self._describe_schema(schema)

        prompt = f"""Extract structured data from the following text.

{schema_description}

Text to extract from:
{text}

Extraction rules:
1. Map text content to schema fields
2. Apply validation rules
3. Use null for missing required fields (don't invent data)
4. Return JSON only"""

        if validation_rules:
            rules_text = self._describe_validation_rules(validation_rules)
            prompt += f"\n\nValidation rules:\n{rules_text}"

        client = openai.OpenAI()  # modern client (openai>=1.0)
        response = client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.0  # Deterministic for extraction
        )

        try:
            return json.loads(response.choices[0].message.content)
        except json.JSONDecodeError:
            return {"error": "Failed to parse response"}

    @staticmethod
    def _describe_schema(schema: dict) -> str:
        """Convert schema to human-readable description."""
        description = "Expected output schema:\n"
        for field, field_type in schema.items():
            description += f"- {field}: {field_type} (required)\n"
        return description

    @staticmethod
    def _describe_validation_rules(rules: dict) -> str:
        """Convert validation rules to text."""
        rules_text = ""
        for field, rule in rules.items():
            rules_text += f"- {field}: {rule}\n"
        return rules_text

# Usage
schema = {
    "candidate_name": "string",
    "email": "string",
    "phone": "string",
    "job_title": "string",
    "company": "string",
    "years_experience": "integer"
}

validation_rules = {
    "email": "Must match pattern user@domain.com",
    "phone": "Format as +1-XXX-XXX-XXXX",
    "years_experience": "Must be 0-70"
}

text = """
John Smith, john.smith@techcorp.com, (555) 123-4567

Senior Software Engineer at TechCorp with 8 years of experience.
"""

extractor = StructuredExtractor()
result = extractor.extract_with_json_mode(text, schema)
print(json.dumps(result, indent=2))

Schema Design for Extraction

Good schema design makes extraction more reliable:

from typing import List, Optional
from dataclasses import dataclass

@dataclass
class ExtractionSchema:
    """Define extraction schema for a document type."""
    name: str
    fields: dict  # field_name -> field_type
    required_fields: List[str]
    optional_fields: List[str]
    nested_structures: Optional[dict] = None  # field_name -> nested schema

    def validate(self) -> List[str]:
        """Check schema validity."""
        errors = []

        # Check required fields are defined
        for field in self.required_fields:
            if field not in self.fields:
                errors.append(f"Required field {field} not in fields")

        # Check optional fields are defined
        for field in self.optional_fields:
            if field not in self.fields:
                errors.append(f"Optional field {field} not in fields")

        # Check no overlap
        overlap = set(self.required_fields) & set(self.optional_fields)
        if overlap:
            errors.append(f"Fields in both required and optional: {overlap}")

        return errors

# Example: Job posting extraction schema
job_posting_schema = ExtractionSchema(
    name="job_posting",
    fields={
        "job_title": "string",
        "company": "string",
        "salary_min": "integer (nullable)",
        "salary_max": "integer (nullable)",
        "currency": "string (USD/EUR/GBP)",
        "job_type": "string (full-time/part-time/contract)",
        "required_skills": "array of strings",
        "experience_required": "integer (years)",
        "location": "string",
        "description": "string"
    },
    required_fields=["job_title", "company", "job_type", "description"],
    optional_fields=["salary_min", "salary_max", "currency", "required_skills", "experience_required", "location"],
    nested_structures={
        "benefits": {
            "health_insurance": "boolean",
            "retirement_plan": "boolean",
            "remote_eligible": "boolean"
        }
    }
)

# Validate before use
errors = job_posting_schema.validate()
if not errors:
    print("Schema is valid")
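A schema object still has to reach the model as text. One way, sketched here with a hypothetical `schema_to_prompt` helper, is to render the fields while preserving the required/optional distinction:

```python
def schema_to_prompt(fields: dict, required: list, optional: list) -> str:
    """Render a field schema as a prompt section, marking each field's status."""
    lines = ["Expected output schema:"]
    for name, ftype in fields.items():
        status = "required" if name in required else "optional"
        lines.append(f"- {name}: {ftype} ({status})")
    lines.append("Use null for any optional field that is absent from the text.")
    return "\n".join(lines)

print(schema_to_prompt(
    {"job_title": "string", "salary_min": "integer (nullable)"},
    required=["job_title"],
    optional=["salary_min"],
))
```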

Handling Edge Cases in Extraction

Real data is messy. Handle common issues:

class RobustExtractionHandler:
    """Handle edge cases in extraction."""

    @staticmethod
    def handle_missing_fields(extracted: dict, required_fields: list) -> dict:
        """Ensure all required fields exist."""
        result = extracted.copy()

        for field in required_fields:
            if field not in result:
                result[field] = None

        return result

    @staticmethod
    def handle_ambiguous_values(text: str, field: str) -> Optional[str]:
        """
        When text contains multiple values for one field,
        choose the most likely one.
        """
        if field == "phone":
            # Extract all phone numbers (handles separators like "(555) 123-4567")
            import re
            phones = re.findall(r'\+?1?[-.\s]?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}', text)
            return phones[0] if phones else None

        if field == "email":
            import re
            emails = re.findall(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', text)
            return emails[0] if emails else None

        return None

    @staticmethod
    def handle_nested_data(extracted: dict, nested_schema: dict) -> dict:
        """
        Validate nested structures exist and have correct types.
        """
        for field, schema in nested_schema.items():
            if field not in extracted or extracted[field] is None:
                # Create empty nested structure
                extracted[field] = {key: None for key in schema.keys()}
            elif not isinstance(extracted[field], dict):
                # Try to convert to dict if it's a string
                import json
                try:
                    extracted[field] = json.loads(extracted[field])
                except (json.JSONDecodeError, TypeError):
                    extracted[field] = {key: None for key in schema.keys()}

        return extracted

    @staticmethod
    def validate_and_fix_types(extracted: dict, schema: dict) -> dict:
        """Validate and fix data types."""
        import json

        for field, expected_type in schema.items():
            if field not in extracted:
                continue

            value = extracted[field]

            if value is None:
                continue

            # Handle integer fields
            if "integer" in expected_type:
                try:
                    extracted[field] = int(value)
                except (ValueError, TypeError):
                    extracted[field] = None

            # Handle array fields
            elif "array" in expected_type:
                if isinstance(value, list):
                    pass  # Already correct
                elif isinstance(value, str):
                    # Try to parse as JSON array
                    try:
                        extracted[field] = json.loads(value)
                    except json.JSONDecodeError:
                        extracted[field] = [value]  # Wrap in array
                else:
                    extracted[field] = [value]

            # Handle boolean fields
            elif "boolean" in expected_type:
                if isinstance(value, bool):
                    pass
                elif isinstance(value, str):
                    extracted[field] = value.lower() in ("true", "yes", "1")

        return extracted

    @staticmethod
    def handle_contradictory_values(extracted: dict,
                                    contradiction_rules: dict) -> dict:
        """
        Some fields contradict each other. Resolve conflicts.

        contradiction_rules: {
            ("field_a", "field_b"): "priority: field_a"
        }
        """
        for (field1, field2), resolution in contradiction_rules.items():
            if field1 in extracted and field2 in extracted:
                if extracted[field1] and extracted[field2]:
                    # Both have values - apply resolution rule
                    if "priority" in resolution:
                        priority_field = resolution.split(":")[-1].strip()
                        if priority_field == field1:
                            extracted[field2] = None
                        else:
                            extracted[field1] = None

        return extracted
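In practice these handlers run as a chain over each extraction. The pass below is a condensed, standalone sketch of that cleanup (a simplified re-implementation for illustration, not the class above):

```python
def clean_extraction(extracted: dict, schema: dict, required: list) -> dict:
    """Fill missing required fields with null, then coerce common types."""
    result = dict(extracted)
    for field in required:
        result.setdefault(field, None)  # missing required -> null, never invented
    for field, expected in schema.items():
        value = result.get(field)
        if value is None:
            continue
        if "integer" in expected:
            try:
                result[field] = int(value)
            except (ValueError, TypeError):
                result[field] = None
        elif "array" in expected and not isinstance(value, list):
            result[field] = [value]  # wrap scalars in an array
    return result

raw = {"job_title": "Engineer", "years_experience": "8", "required_skills": "Python"}
schema = {"job_title": "string", "years_experience": "integer",
          "required_skills": "array of strings"}
print(clean_extraction(raw, schema, ["job_title", "company"]))
```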

Validation and Error Recovery

Verify extracted data and retry intelligently:

from typing import Tuple

class ExtractionValidator:
    """Validate extracted data and trigger re-extraction if needed."""

    def __init__(self, extractor: StructuredExtractor):
        self.extractor = extractor

    def validate(self, extracted: dict, rules: dict) -> Tuple[bool, list]:
        """
        Validate extracted data against rules.

        Returns:
            (is_valid, error_messages)
        """
        errors = []

        for field, rule in rules.items():
            if field not in extracted:
                continue

            value = extracted[field]

            # Required field is null
            if "required" in rule and value is None:
                errors.append(f"Required field {field} is null")

            # Type checking
            if "type" in rule:
                expected_type = rule["type"]
                if value is not None:
                    if expected_type == "email" and "@" not in str(value):
                        errors.append(f"{field} doesn't look like an email")
                    elif expected_type == "phone" and len(str(value)) < 10:
                        errors.append(f"{field} too short for phone number")
                    elif expected_type == "integer" and not isinstance(value, int):
                        errors.append(f"{field} is not an integer")

            # Custom validation
            if "validate" in rule:
                validator = rule["validate"]
                if not validator(value):
                    errors.append(f"{field} failed custom validation")

        return len(errors) == 0, errors

    def extract_with_retry(self,
                          text: str,
                          schema: dict,
                          validation_rules: dict,
                          max_retries: int = 2) -> dict:
        """
        Extract data with automatic retry on validation failure.
        Feeds validation errors back to the LLM on each retry.
        """
        attempt = 0
        current_text = text
        extracted, errors = {}, []

        while attempt < max_retries:
            attempt += 1

            # Extract (on retries, current_text includes the previous errors)
            extracted = self.extractor.extract_with_schema(
                current_text, schema, validation_rules
            )

            # Validate
            is_valid, errors = self.validate(extracted, validation_rules)

            if is_valid:
                return {"success": True, "data": extracted, "attempts": attempt}

            # If invalid and retries remain, ask the LLM to fix the specific errors
            if attempt < max_retries:
                print(f"Extraction invalid. Errors: {errors}")
                print(f"Retry {attempt}/{max_retries}...")

                current_text = f"""The previous extraction had these issues:
{errors}

Original text: {text}

Please extract again, carefully addressing these specific issues."""

        return {
            "success": False,
            "data": extracted,
            "errors": errors,
            "attempts": attempt
        }
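The rule dictionaries this validator consumes can be exercised on their own. A minimal standalone sketch of the same rule shape (unlike `validate` above, a missing field is treated as null here):

```python
def validate_record(extracted: dict, rules: dict) -> list:
    """Return a list of error messages for a record, given per-field rules."""
    errors = []
    for field, rule in rules.items():
        value = extracted.get(field)  # missing fields read as null
        if rule.get("required") and value is None:
            errors.append(f"Required field {field} is null")
        if value is not None and "validate" in rule and not rule["validate"](value):
            errors.append(f"{field} failed custom validation")
    return errors

rules = {
    "email": {"required": True, "validate": lambda v: "@" in v},
    "years_experience": {"validate": lambda v: 0 <= v <= 70},
}
print(validate_record({"email": "a@b.com", "years_experience": 80}, rules))
# ['years_experience failed custom validation']
```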

Human-in-the-Loop for Low Confidence

For critical extractions, involve humans:

from enum import Enum

class ConfidenceLevel(Enum):
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"

class ConfidenceEstimator:
    """Estimate confidence in extraction."""

    @staticmethod
    def estimate(extracted: dict, validation_results: dict) -> ConfidenceLevel:
        """
        Estimate confidence based on:
        - How many fields were extracted
        - How many validation errors
        - Null count
        """
        null_count = sum(1 for v in extracted.values() if v is None)
        error_count = len(validation_results.get("errors", []))
        total_fields = len(extracted)

        if null_count == 0 and error_count == 0:
            return ConfidenceLevel.HIGH

        if null_count <= total_fields * 0.3 and error_count <= 1:
            return ConfidenceLevel.MEDIUM

        return ConfidenceLevel.LOW

class HumanInTheLoopManager:
    """
    Route low-confidence extractions to humans for review.
    """

    def __init__(self):
        self.review_queue = []

    def process_extraction(self,
                          text: str,
                          extracted: dict,
                          confidence: ConfidenceLevel) -> dict:
        """
        Process extraction result, potentially sending to human review.
        """
        if confidence == ConfidenceLevel.HIGH:
            return {"action": "auto_approve", "data": extracted}

        if confidence == ConfidenceLevel.MEDIUM:
            return {"action": "approve_with_flag", "data": extracted, "flag": "Please verify"}

        if confidence == ConfidenceLevel.LOW:
            # Send to human review
            review_item = {
                "text": text,
                "extracted": extracted,
                "confidence": confidence.value,
                "status": "pending_review"
            }
            self.review_queue.append(review_item)
            return {
                "action": "send_to_review",
                "review_id": len(self.review_queue) - 1
            }
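What happens after an item lands in the queue is up to your review tooling. A hypothetical sketch of applying a reviewer's corrections to the next pending item:

```python
def review_next(queue: list, corrections: dict) -> dict:
    """Pop the next pending item, merge the reviewer's corrections, mark it done."""
    item = queue.pop(0)
    item["extracted"].update(corrections)
    item["status"] = "reviewed"
    return item

queue = [{"extracted": {"name": None, "email": "a@b.com"}, "status": "pending_review"}]
print(review_next(queue, {"name": "John Smith"}))
```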

Key Takeaway: Reliable extraction at scale requires schema definition, structured output formats, edge case handling, validation with retry logic, and human-in-the-loop for uncertain cases.
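"At scale" also means many documents. Because each extraction is dominated by API latency, a thread pool parallelizes well; here `extract_stub` is a stand-in for the real LLM call (a minimal sketch, not a production pipeline):

```python
from concurrent.futures import ThreadPoolExecutor

def extract_stub(doc: str) -> dict:
    # Stand-in for a real LLM extraction call
    return {"length": len(doc)}

def extract_batch(documents: list, max_workers: int = 8) -> list:
    """Run extractions concurrently; results keep the input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(extract_stub, documents))

print(extract_batch(["doc one", "a longer document"]))  # [{'length': 7}, {'length': 17}]
```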

Exercise: Build an Extraction Pipeline for Job Postings

Create an end-to-end extraction system that parses job posting documents:

Requirements:

  1. Define extraction schema with required/optional fields
  2. Extract structured data with JSON mode
  3. Validate extracted data
  4. Handle missing/contradictory fields
  5. Retry on validation failure
  6. Route low-confidence extractions to human review
  7. Track extraction quality metrics

Starter code:

class JobPostingExtractor:
    """Extract structured data from job postings."""

    def __init__(self):
        self.extractor = StructuredExtractor()
        self.validator = ExtractionValidator(self.extractor)
        self.confidence_estimator = ConfidenceEstimator()
        self.hitl_manager = HumanInTheLoopManager()

    def extract_job(self, posting_text: str) -> dict:
        """
        Extract job details from posting text.

        Returns:
            Dict with extraction result and processing action
        """
        # TODO: Define schema
        # TODO: Extract with validation
        # TODO: Estimate confidence
        # TODO: Route appropriately (auto-approve/review)
        # TODO: Return structured result

        pass

# Test with job posting
posting = """
Software Engineer - Senior Level
TechCorp Inc.
$150,000 - $200,000/year
San Francisco, CA (Remote eligible)

We're hiring a Senior Software Engineer...
Required: 5+ years Python, AWS
Benefits: Health insurance, 401k, equity
"""

extractor = JobPostingExtractor()
result = extractor.extract_job(posting)

Extension challenges:

  • Implement multi-language extraction
  • Add field relationship validation (e.g., min_salary < max_salary)
  • Create extraction quality dashboard
  • Build feedback loop to improve extraction accuracy
  • Support custom field extraction (not just predefined schema)

By completing this exercise, you’ll understand how to extract reliable structured data from unstructured documents at scale.