Building Automated Prompt Pipelines
In the Foundations and Intermediate phases, you learned how to craft individual prompts and refine them iteratively. But in production environments, you rarely run a single prompt in isolation. Instead, you orchestrate multiple prompts, processing steps, and error handlers into pipelines that run automatically at scale.
This lesson teaches you how to architect, build, and deploy prompt pipelines that handle real-world complexity: data preprocessing, error recovery, logging, and observability.
From Manual Prompting to Pipelines
When you’re experimenting with prompts, you might work like this:
- Write a prompt in ChatGPT
- Copy the output
- Paste it into another tool
- Manually format the result
- Store it somewhere
This manual workflow doesn’t scale. A pipeline automates all these steps:
```
Raw Input Data → Validation → Preprocessing → LLM Call → Postprocessing → Output Storage
                                      ↓
                         Error Handling & Retry Logic
```
Real-world pipeline examples:
- Content Generation: Raw article outline → generate sections → combine → format → publish
- Data Classification: Customer feedback → normalize text → classify sentiment → route to department
- Code Analysis: Raw source code → extract functions → analyze each → generate report
- Support Automation: Ticket text → extract intent → generate response → check for accuracy → send
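At its core, each of these pipelines is just a composition of stage functions applied in order. A minimal sketch of the idea (the function names and stand-in stages here are illustrative, not part of any library):

```python
from typing import Callable

def run_pipeline(raw: str, stages: list[Callable[[str], str]]) -> str:
    """Pass data through each stage in order."""
    data = raw
    for stage in stages:
        data = stage(data)
    return data

# Illustrative stages: whitespace normalization, then uppercasing
# as a stand-in for an LLM call.
normalize = lambda s: " ".join(s.split())
fake_llm = lambda s: s.upper()

result = run_pipeline("  hello   world ", [normalize, fake_llm])
print(result)  # HELLO WORLD
```

Real pipelines add validation, retries, and typed intermediate data between the stages, which is what the rest of this lesson builds up.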
Pipeline Architecture Patterns
A robust pipeline has five distinct phases:
1. Input Validation and Normalization
Before you send data to an LLM, validate and normalize it:
```python
def validate_and_normalize_input(raw_input: str, max_length: int = 5000) -> str:
    """
    Validate input and normalize whitespace.

    Raises:
        ValueError: If input is empty or exceeds max_length
    """
    if not raw_input or not raw_input.strip():
        raise ValueError("Input cannot be empty")

    # Remove extra whitespace
    normalized = " ".join(raw_input.split())

    if len(normalized) > max_length:
        raise ValueError(f"Input exceeds {max_length} characters")

    return normalized
```
2. Preprocessing
Transform data into a form optimized for the LLM:
```python
import re
from datetime import datetime

def preprocess_customer_feedback(feedback: str) -> dict:
    """
    Clean and enrich customer feedback before LLM processing.
    """
    # Remove PII (phone numbers, emails)
    cleaned = re.sub(r'\b[\w\.-]+@[\w\.-]+\.\w+\b', '[EMAIL]', feedback)
    cleaned = re.sub(r'\+?1?\d{9,15}', '[PHONE]', cleaned)

    # Extract metadata
    return {
        "original": feedback,
        "cleaned": cleaned,
        "length": len(cleaned),
        "timestamp": datetime.now().isoformat(),
        "has_urgency": any(word in cleaned.lower() for word in ["urgent", "asap", "critical"])
    }
```
3. LLM Invocation
Call the language model with proper error handling:
```python
# Note: this example uses the pre-1.0 openai SDK interface
# (openai.ChatCompletion and openai.error).
import openai
import time
from typing import Optional

def call_llm_with_retry(
    prompt: str,
    model: str = "gpt-4",
    max_retries: int = 3,
    backoff_base: float = 2.0
) -> Optional[str]:
    """
    Call OpenAI API with exponential backoff retry logic.

    Args:
        prompt: The prompt to send
        model: Model to use
        max_retries: Maximum number of retry attempts
        backoff_base: Base for exponential backoff

    Returns:
        Generated text or None if all retries fail
    """
    for attempt in range(max_retries):
        try:
            response = openai.ChatCompletion.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=1000
            )
            return response.choices[0].message.content
        except openai.error.RateLimitError:
            if attempt < max_retries - 1:
                wait_time = backoff_base ** attempt
                print(f"Rate limited. Waiting {wait_time}s before retry...")
                time.sleep(wait_time)
            else:
                print("Max retries exceeded for rate limit")
                return None
        except openai.error.APIError as e:
            print(f"API error on attempt {attempt + 1}: {e}")
            if attempt < max_retries - 1:
                time.sleep(backoff_base ** attempt)
            else:
                return None
```
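With the default `backoff_base` of 2.0, the wait before retry attempt *n* is `2 ** n` seconds: 1s, then 2s, then 4s. A tiny standalone helper (illustrative, not part of the function above) makes the schedule explicit:

```python
def backoff_schedule(max_retries: int, backoff_base: float = 2.0) -> list[float]:
    """Seconds to wait before each retry: backoff_base ** attempt."""
    return [backoff_base ** attempt for attempt in range(max_retries)]

print(backoff_schedule(3))       # [1.0, 2.0, 4.0]
print(backoff_schedule(4, 3.0))  # [1.0, 3.0, 9.0, 27.0]
```

In production you may also want to add random jitter to these waits so many concurrent workers don't retry in lockstep.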
4. Postprocessing and Validation
Ensure the output is correct before returning it:
```python
import json

def postprocess_classification_output(llm_output: str) -> dict:
    """
    Extract and validate classification from LLM output.
    Handles common formatting issues.
    """
    # Try to extract JSON from response
    try:
        # Look for JSON block in markdown
        if "```json" in llm_output:
            json_str = llm_output.split("```json")[1].split("```")[0].strip()
        elif "{" in llm_output:
            json_str = llm_output[llm_output.find("{"):llm_output.rfind("}") + 1]
        else:
            json_str = llm_output

        data = json.loads(json_str)

        # Validate required fields
        if "category" not in data or "confidence" not in data:
            raise ValueError("Missing required fields")

        # Normalize confidence to 0-1
        confidence = float(data["confidence"])
        if not 0 <= confidence <= 1:
            confidence = min(1, max(0, confidence / 100))

        return {
            "category": data["category"],
            "confidence": confidence,
            "raw_output": llm_output,
            "valid": True
        }
    except (json.JSONDecodeError, ValueError) as e:
        return {
            "category": None,
            "confidence": 0,
            "raw_output": llm_output,
            "valid": False,
            "error": str(e)
        }
```
5. Output Storage
Persist results reliably:
```python
import csv
import json
from pathlib import Path

class PipelineOutput:
    def __init__(self, output_dir: str):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def save_json(self, data: dict, filename: str) -> str:
        """Save output as JSON."""
        path = self.output_dir / filename
        with open(path, 'w') as f:
            json.dump(data, f, indent=2)
        return str(path)

    def append_csv(self, data: dict, filename: str) -> str:
        """Append output to CSV file."""
        path = self.output_dir / filename
        is_new_file = not path.exists()
        with open(path, 'a', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=data.keys())
            if is_new_file:
                writer.writeheader()
            writer.writerow(data)
        return str(path)
```
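The header-only-on-first-write behavior of `append_csv` is worth verifying in isolation. Here is a self-contained sketch of the same logic (the standalone function and temp path are illustrative):

```python
import csv
import os
import tempfile

def append_row(path: str, row: dict) -> None:
    """Append a dict as a CSV row, writing the header only for a new file."""
    is_new_file = not os.path.exists(path)
    with open(path, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=row.keys())
        if is_new_file:
            writer.writeheader()
        writer.writerow(row)

path = os.path.join(tempfile.mkdtemp(), "results.csv")
append_row(path, {"category": "positive", "confidence": 0.9})
append_row(path, {"category": "negative", "confidence": 0.7})

with open(path) as f:
    lines = f.read().splitlines()
print(lines[0])    # category,confidence
print(len(lines))  # 3 (one header + two rows)
```

One caveat with this pattern: every row must share the same keys in the same order, or the columns will drift out of alignment with the header.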
Building an End-to-End Pipeline
Now let’s assemble these pieces into a complete pipeline class:
```python
import logging
from dataclasses import dataclass
from datetime import datetime

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class PipelineConfig:
    """Configuration for the prompt pipeline."""
    model: str = "gpt-4"
    max_retries: int = 3
    output_dir: str = "./pipeline_outputs"
    log_level: str = "INFO"

class PromptPipeline:
    """
    A complete prompt pipeline with validation, preprocessing,
    LLM invocation, postprocessing, and output storage.
    """

    def __init__(self, config: PipelineConfig):
        self.config = config
        self.output = PipelineOutput(config.output_dir)
        logger.setLevel(config.log_level)

    def process(self, raw_input: str) -> dict:
        """
        Execute the complete pipeline.

        Returns:
            Dict with success status, output, and metadata
        """
        try:
            logger.info(f"Starting pipeline for input of length {len(raw_input)}")

            # Phase 1: Validation
            validated = validate_and_normalize_input(raw_input)
            logger.info("Input validation passed")

            # Phase 2: Preprocessing
            preprocessed = preprocess_customer_feedback(validated)
            logger.info(f"Preprocessing complete. Cleaned length: {preprocessed['length']}")

            # Phase 3: LLM invocation
            prompt = f"""Analyze the following customer feedback and classify it:

{preprocessed['cleaned']}

Respond with JSON:
{{"category": "positive|negative|neutral", "confidence": 0.0-1.0, "reasoning": "brief explanation"}}
"""
            llm_output = call_llm_with_retry(
                prompt,
                model=self.config.model,
                max_retries=self.config.max_retries
            )
            if llm_output is None:
                raise RuntimeError("LLM invocation failed after all retries")
            logger.info("LLM invocation successful")

            # Phase 4: Postprocessing
            result = postprocess_classification_output(llm_output)
            logger.info(f"Postprocessing complete. Valid: {result['valid']}")

            # Phase 5: Output storage
            output_record = {
                "timestamp": datetime.now().isoformat(),
                "input_length": len(validated),
                "has_urgency": preprocessed["has_urgency"],
                **result
            }
            self.output.append_csv(output_record, "results.csv")
            logger.info("Output saved")

            return {
                "success": True,
                "data": result,
                "metadata": output_record
            }
        except Exception as e:
            logger.error(f"Pipeline failed: {str(e)}", exc_info=True)
            return {
                "success": False,
                "error": str(e),
                "metadata": {"timestamp": datetime.now().isoformat()}
            }

# Usage example
if __name__ == "__main__":
    config = PipelineConfig(output_dir="./outputs")
    pipeline = PromptPipeline(config)

    test_input = "I absolutely love your product! Best purchase I've made all year."
    result = pipeline.process(test_input)
    print(result)
```
Error Handling Patterns
Individual steps will fail. Smart error handling ensures that when they do, the whole system doesn't:
```python
class PipelineError(Exception):
    """Base exception for pipeline errors."""
    pass

class InputValidationError(PipelineError):
    """Input doesn't meet requirements."""
    pass

class LLMError(PipelineError):
    """Error calling the LLM."""
    pass

class PostprocessingError(PipelineError):
    """Error processing LLM output."""
    pass

def process_with_fallback(pipeline: PromptPipeline, inputs: list) -> list:
    """
    Process multiple inputs with fallback behavior.

    Note: this assumes the pipeline raises the typed errors above
    rather than catching everything internally (as PromptPipeline.process
    does); adapt process() to re-raise them if you want per-type handling.
    """
    results = []
    for i, input_data in enumerate(inputs):
        try:
            result = pipeline.process(input_data)
            results.append(result)
        except InputValidationError as e:
            logger.warning(f"Item {i} failed validation: {e}")
            results.append({"success": False, "error_type": "validation", "error": str(e)})
        except LLMError as e:
            logger.error(f"Item {i} failed LLM call: {e}")
            # Use fallback response
            results.append({"success": False, "error_type": "llm", "fallback": True})
        except PostprocessingError as e:
            logger.error(f"Item {i} failed postprocessing: {e}")
            results.append({"success": False, "error_type": "postprocessing"})
    return results
```
Logging and Observability
Monitor what’s happening inside your pipeline:
```python
import json
from datetime import datetime

class PipelineLogger:
    """Log pipeline execution for debugging and monitoring."""

    def __init__(self, log_file: str = "pipeline.log"):
        self.log_file = log_file

    def log_step(self, step_name: str, data: dict, status: str = "success"):
        """
        Log a pipeline step execution.
        """
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "step": step_name,
            "status": status,
            "data": data
        }
        with open(self.log_file, 'a') as f:
            f.write(json.dumps(log_entry) + "\n")

    def get_stats(self) -> dict:
        """Aggregate statistics from logs."""
        stats = {"total": 0, "success": 0, "failed": 0, "steps": {}}
        with open(self.log_file, 'r') as f:
            for line in f:
                entry = json.loads(line)
                step = entry["step"]
                status = entry["status"]
                stats["total"] += 1
                if status == "success":
                    stats["success"] += 1
                else:
                    stats["failed"] += 1
                if step not in stats["steps"]:
                    stats["steps"][step] = {"count": 0, "errors": 0}
                stats["steps"][step]["count"] += 1
                if status != "success":
                    stats["steps"][step]["errors"] += 1
        return stats
```
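Because each log entry is one JSON object per line, the format is easy to aggregate with nothing but the standard library. A self-contained sketch of the same read-back logic as `get_stats` (the temp file path and sample entries are illustrative):

```python
import json
import os
import tempfile

# Write two example entries in the same JSON-lines shape PipelineLogger uses
log_path = os.path.join(tempfile.mkdtemp(), "pipeline_demo.log")
entries = [
    {"timestamp": "2024-01-01T00:00:00", "step": "validation", "status": "success"},
    {"timestamp": "2024-01-01T00:00:01", "step": "llm_call", "status": "error"},
]
with open(log_path, "w") as f:
    for entry in entries:
        f.write(json.dumps(entry) + "\n")

# Re-read and aggregate, mirroring get_stats()
stats = {"total": 0, "success": 0, "failed": 0}
with open(log_path) as f:
    for line in f:
        status = json.loads(line)["status"]
        stats["total"] += 1
        stats["success" if status == "success" else "failed"] += 1

print(stats)  # {'total': 2, 'success': 1, 'failed': 1}
```

The same file can be tailed or shipped to a log aggregator, which is one reason JSON-lines is a common choice for pipeline telemetry.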
Key Takeaway: A production-grade prompt pipeline separates concerns into validation, preprocessing, invocation, postprocessing, and storage layers. Each layer handles specific responsibilities, making the system maintainable and debuggable.
Exercise: Build a Document Processing Pipeline
You have a folder of customer support documents that need to be:
- Validated for minimum length
- Anonymized to remove sensitive information
- Summarized by an LLM
- Classified by sentiment and topic
- Saved with metadata
Requirements:
- Use the pipeline architecture from this lesson
- Include retry logic for LLM calls
- Log all steps with PipelineLogger
- Handle errors gracefully with fallbacks
- Store results in CSV with metadata
Starter code:
```python
class DocumentProcessingPipeline(PromptPipeline):
    def process_document(self, filepath: str) -> dict:
        """Process a single document through the pipeline."""
        # TODO: Read file
        # TODO: Validate length
        # TODO: Anonymize content
        # TODO: Call LLM for summary
        # TODO: Call LLM for classification
        # TODO: Store results
        pass

# Main execution
if __name__ == "__main__":
    pipeline = DocumentProcessingPipeline(
        PipelineConfig(output_dir="./documents_processed")
    )

    # Process a folder of documents (adapt process_with_fallback from this
    # lesson to call process_document if you want typed-error fallbacks)
    import glob
    documents = glob.glob("./documents/*.txt")
    results = [pipeline.process_document(doc) for doc in documents]

    # Print statistics
    print(f"Processed {len(results)} documents")
    if results:
        print(f"Success rate: {sum(1 for r in results if r['success']) / len(results):.1%}")
```
Extension challenges:
- Add a queue system for processing documents asynchronously
- Implement caching to avoid reprocessing identical documents
- Create a monitoring dashboard showing pipeline health
- Add human-in-the-loop approval for low-confidence results
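For the caching challenge, one common approach is to key cached results on a hash of the document's content, so identical documents skip the LLM call entirely. A minimal sketch (the `ResultCache` class and file layout are assumptions for illustration, not part of the lesson's pipeline classes):

```python
import hashlib
import json
import tempfile
from pathlib import Path

class ResultCache:
    """Cache pipeline results keyed by a SHA-256 hash of the input text."""

    def __init__(self, cache_dir: str):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _key(self, text: str) -> str:
        return hashlib.sha256(text.encode("utf-8")).hexdigest()

    def get(self, text: str):
        """Return the cached result dict, or None on a cache miss."""
        path = self.cache_dir / f"{self._key(text)}.json"
        return json.loads(path.read_text()) if path.exists() else None

    def put(self, text: str, result: dict) -> None:
        path = self.cache_dir / f"{self._key(text)}.json"
        path.write_text(json.dumps(result))

cache = ResultCache(tempfile.mkdtemp())
if cache.get("some feedback") is None:  # cache miss: this is where the LLM call would go
    cache.put("some feedback", {"category": "positive"})
print(cache.get("some feedback"))  # {'category': 'positive'}
```

Hashing the *preprocessed* text rather than the raw input makes the cache robust to whitespace-only differences between documents.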
By completing this exercise, you’ll understand how to build production-ready pipelines that handle the messy reality of real data and API calls. The patterns you learn here apply to any task involving repeated LLM calls.