Advanced

Building Automated Prompt Pipelines

Lesson 1 of 4 Estimated Time 55 min

Building Automated Prompt Pipelines

In the Foundations and Intermediate phases, you learned how to craft individual prompts and refine them iteratively. But in production environments, you rarely run a single prompt in isolation. Instead, you orchestrate multiple prompts, processing steps, and error handlers into pipelines that run automatically at scale.

This lesson teaches you how to architect, build, and deploy prompt pipelines that handle real-world complexity: data preprocessing, error recovery, logging, and observability.

From Manual Prompting to Pipelines

When you’re experimenting with prompts, you might work like this:

  1. Write a prompt in ChatGPT
  2. Copy the output
  3. Paste it into another tool
  4. Manually format the result
  5. Store it somewhere

This manual workflow doesn’t scale. A pipeline automates all these steps:

Raw Input Data → Validation → Preprocessing → LLM Call → Postprocessing → Output Storage

                                            Error Handling & Retry Logic

Real-world pipeline examples:

  • Content Generation: Raw article outline → generate sections → combine → format → publish
  • Data Classification: Customer feedback → normalize text → classify sentiment → route to department
  • Code Analysis: Raw source code → extract functions → analyze each → generate report
  • Support Automation: Ticket text → extract intent → generate response → check for accuracy → send

Pipeline Architecture Patterns

A robust pipeline has five distinct phases:

1. Input Validation and Normalization

Before you send data to an LLM, validate and normalize it:

def validate_and_normalize_input(raw_input: str, max_length: int = 5000) -> str:
    """
    Validate input and normalize whitespace.

    Raises:
        ValueError: If input is empty or exceeds max_length
    """
    if not raw_input or not raw_input.strip():
        raise ValueError("Input cannot be empty")

    # Remove extra whitespace
    normalized = " ".join(raw_input.split())

    if len(normalized) > max_length:
        raise ValueError(f"Input exceeds {max_length} characters")

    return normalized

2. Preprocessing

Transform data into a form optimized for the LLM:

import re
from datetime import datetime

def preprocess_customer_feedback(feedback: str) -> dict:
    """
    Clean and enrich customer feedback before LLM processing.
    """
    # Remove PII (phone numbers, emails)
    cleaned = re.sub(r'\b[\w\.-]+@[\w\.-]+\.\w+\b', '[EMAIL]', feedback)
    cleaned = re.sub(r'\+?1?\d{9,15}', '[PHONE]', cleaned)

    # Extract metadata
    return {
        "original": feedback,
        "cleaned": cleaned,
        "length": len(cleaned),
        "timestamp": datetime.now().isoformat(),
        "has_urgency": any(word in cleaned.lower() for word in ["urgent", "asap", "critical"])
    }

3. LLM Invocation

Call the language model with proper error handling:

import openai
import time
from typing import Optional

def call_llm_with_retry(
    prompt: str,
    model: str = "gpt-4",
    max_retries: int = 3,
    backoff_base: float = 2.0
) -> Optional[str]:
    """
    Call OpenAI API with exponential backoff retry logic.

    Args:
        prompt: The prompt to send
        model: Model to use
        max_retries: Maximum number of retry attempts
        backoff_base: Base for exponential backoff

    Returns:
        Generated text or None if all retries fail
    """
    for attempt in range(max_retries):
        try:
            response = openai.ChatCompletion.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=1000
            )
            return response.choices[0].message.content

        except openai.error.RateLimitError:
            if attempt < max_retries - 1:
                wait_time = backoff_base ** attempt
                print(f"Rate limited. Waiting {wait_time}s before retry...")
                time.sleep(wait_time)
            else:
                print("Max retries exceeded for rate limit")
                return None

        except openai.error.APIError as e:
            print(f"API error on attempt {attempt + 1}: {e}")
            if attempt < max_retries - 1:
                time.sleep(backoff_base ** attempt)
            else:
                return None

4. Postprocessing and Validation

Ensure the output is correct before returning it:

import json

def postprocess_classification_output(llm_output: str) -> dict:
    """
    Extract and validate classification from LLM output.
    Handles common formatting issues.
    """
    # Try to extract JSON from response
    try:
        # Look for JSON block in markdown
        if "```json" in llm_output:
            json_str = llm_output.split("```json")[1].split("```")[0].strip()
        elif "{" in llm_output:
            json_str = llm_output[llm_output.find("{"):llm_output.rfind("}")+1]
        else:
            json_str = llm_output

        data = json.loads(json_str)

        # Validate required fields
        if "category" not in data or "confidence" not in data:
            raise ValueError("Missing required fields")

        # Normalize confidence to 0-1
        confidence = float(data["confidence"])
        if not 0 <= confidence <= 1:
            confidence = min(1, max(0, confidence / 100))

        return {
            "category": data["category"],
            "confidence": confidence,
            "raw_output": llm_output,
            "valid": True
        }

    except (json.JSONDecodeError, ValueError) as e:
        return {
            "category": None,
            "confidence": 0,
            "raw_output": llm_output,
            "valid": False,
            "error": str(e)
        }

5. Output Storage

Persist results reliably:

from typing import Any
import csv
from pathlib import Path

class PipelineOutput:
    def __init__(self, output_dir: str):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def save_json(self, data: dict, filename: str):
        """Save output as JSON."""
        import json
        path = self.output_dir / filename
        with open(path, 'w') as f:
            json.dump(data, f, indent=2)
        return str(path)

    def append_csv(self, data: dict, filename: str):
        """Append output to CSV file."""
        path = self.output_dir / filename
        is_new_file = not path.exists()

        with open(path, 'a', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=data.keys())
            if is_new_file:
                writer.writeheader()
            writer.writerow(data)

        return str(path)

Building an End-to-End Pipeline

Now let’s assemble these pieces into a complete pipeline class:

import logging
from dataclasses import dataclass
from datetime import datetime

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class PipelineConfig:
    """Configuration for the prompt pipeline."""
    model: str = "gpt-4"
    max_retries: int = 3
    output_dir: str = "./pipeline_outputs"
    log_level: str = "INFO"

class PromptPipeline:
    """
    A complete prompt pipeline with validation, preprocessing,
    LLM invocation, postprocessing, and output storage.
    """

    def __init__(self, config: PipelineConfig):
        self.config = config
        self.output = PipelineOutput(config.output_dir)
        logger.setLevel(config.log_level)

    def process(self, raw_input: str) -> dict:
        """
        Execute the complete pipeline.

        Returns:
            Dict with success status, output, and metadata
        """
        try:
            logger.info(f"Starting pipeline for input of length {len(raw_input)}")

            # Phase 1: Validation
            validated = validate_and_normalize_input(raw_input)
            logger.info("Input validation passed")

            # Phase 2: Preprocessing
            preprocessed = preprocess_customer_feedback(validated)
            logger.info(f"Preprocessing complete. Cleaned length: {preprocessed['length']}")

            # Phase 3: LLM invocation
            prompt = f"""Analyze the following customer feedback and classify it:

{preprocessed['cleaned']}

Respond with JSON:
{{"category": "positive|negative|neutral", "confidence": 0.0-1.0, "reasoning": "brief explanation"}}
"""

            llm_output = call_llm_with_retry(
                prompt,
                model=self.config.model,
                max_retries=self.config.max_retries
            )

            if llm_output is None:
                raise RuntimeError("LLM invocation failed after all retries")

            logger.info("LLM invocation successful")

            # Phase 4: Postprocessing
            result = postprocess_classification_output(llm_output)
            logger.info(f"Postprocessing complete. Valid: {result['valid']}")

            # Phase 5: Output storage
            output_record = {
                "timestamp": datetime.now().isoformat(),
                "input_length": len(validated),
                "has_urgency": preprocessed["has_urgency"],
                **result
            }

            self.output.append_csv(output_record, "results.csv")
            logger.info("Output saved")

            return {
                "success": True,
                "data": result,
                "metadata": output_record
            }

        except Exception as e:
            logger.error(f"Pipeline failed: {str(e)}", exc_info=True)
            return {
                "success": False,
                "error": str(e),
                "metadata": {"timestamp": datetime.now().isoformat()}
            }

# Usage example
if __name__ == "__main__":
    config = PipelineConfig(output_dir="./outputs")
    pipeline = PromptPipeline(config)

    test_input = "I absolutely love your product! Best purchase I've made all year."
    result = pipeline.process(test_input)

    print(result)

Error Handling Patterns

Pipelines fail. Smart error handling means your system doesn’t:

class PipelineError(Exception):
    """Base exception for pipeline errors."""
    pass

class InputValidationError(PipelineError):
    """Input doesn't meet requirements."""
    pass

class LLMError(PipelineError):
    """Error calling the LLM."""
    pass

class PostprocessingError(PipelineError):
    """Error processing LLM output."""
    pass

def process_with_fallback(pipeline: PromptPipeline, inputs: list) -> list:
    """
    Process multiple inputs with fallback behavior.
    """
    results = []

    for i, input_data in enumerate(inputs):
        try:
            result = pipeline.process(input_data)
            results.append(result)

        except InputValidationError as e:
            logger.warning(f"Item {i} failed validation: {e}")
            results.append({"success": False, "error_type": "validation", "error": str(e)})

        except LLMError as e:
            logger.error(f"Item {i} failed LLM call: {e}")
            # Use fallback response
            results.append({"success": False, "error_type": "llm", "fallback": True})

        except PostprocessingError as e:
            logger.error(f"Item {i} failed postprocessing: {e}")
            results.append({"success": False, "error_type": "postprocessing"})

    return results

Logging and Observability

Monitor what’s happening inside your pipeline:

import json
from datetime import datetime

class PipelineLogger:
    """Log pipeline execution for debugging and monitoring."""

    def __init__(self, log_file: str = "pipeline.log"):
        self.log_file = log_file

    def log_step(self, step_name: str, data: dict, status: str = "success"):
        """
        Log a pipeline step execution.
        """
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "step": step_name,
            "status": status,
            "data": data
        }

        with open(self.log_file, 'a') as f:
            f.write(json.dumps(log_entry) + "\n")

    def get_stats(self) -> dict:
        """Aggregate statistics from logs."""
        stats = {"total": 0, "success": 0, "failed": 0, "steps": {}}

        with open(self.log_file, 'r') as f:
            for line in f:
                entry = json.loads(line)
                step = entry["step"]
                status = entry["status"]

                stats["total"] += 1
                if status == "success":
                    stats["success"] += 1
                else:
                    stats["failed"] += 1

                if step not in stats["steps"]:
                    stats["steps"][step] = {"count": 0, "errors": 0}
                stats["steps"][step]["count"] += 1
                if status == "error":
                    stats["steps"][step]["errors"] += 1

        return stats

Key Takeaway: A production-grade prompt pipeline separates concerns into validation, preprocessing, invocation, postprocessing, and storage layers. Each layer handles specific responsibilities, making the system maintainable and debuggable.

Exercise: Build a Document Processing Pipeline

You have a folder of customer support documents that need to be:

  1. Validated for minimum length
  2. Anonymized to remove sensitive information
  3. Summarized by an LLM
  4. Classified by sentiment and topic
  5. Saved with metadata

Requirements:

  • Use the pipeline architecture from this lesson
  • Include retry logic for LLM calls
  • Log all steps with PipelineLogger
  • Handle errors gracefully with fallbacks
  • Store results in CSV with metadata

Starter code:

class DocumentProcessingPipeline(PromptPipeline):
    def process_document(self, filepath: str) -> dict:
        """Process a single document through the pipeline."""
        # TODO: Read file
        # TODO: Validate length
        # TODO: Anonymize content
        # TODO: Call LLM for summary
        # TODO: Call LLM for classification
        # TODO: Store results
        pass

# Main execution
if __name__ == "__main__":
    pipeline = DocumentProcessingPipeline(
        PipelineConfig(output_dir="./documents_processed")
    )

    # Process a folder of documents
    import glob
    documents = glob.glob("./documents/*.txt")
    results = process_with_fallback(pipeline, documents)

    # Print statistics
    logger = pipeline.output
    print(f"Processed {len(results)} documents")
    print(f"Success rate: {sum(1 for r in results if r['success']) / len(results):.1%}")

Extension challenges:

  • Add a queue system for processing documents asynchronously
  • Implement caching to avoid reprocessing identical documents
  • Create a monitoring dashboard showing pipeline health
  • Add human-in-the-loop approval for low-confidence results

By completing this exercise, you’ll understand how to build production-ready pipelines that handle the messy reality of real data and API calls. The patterns you learn here apply to any task involving repeated LLM calls.