Building an End-to-End RAG Pipeline
You now understand embeddings and vector databases. Let’s build a complete RAG system: ingest documents, chunk them, embed, retrieve relevant passages, and generate answers using those passages as context.
The Complete RAG Flow
- Ingestion: Load documents from various sources
- Chunking: Split into manageable pieces
- Embedding: Convert chunks to vectors
- Storage: Store in vector database
- Retrieval: Find relevant chunks for a query
- Generation: Feed chunks as context to the model
- Response: Return answer to user
Implementation from Scratch
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_core.runnables import RunnablePassthrough
class RAGSystem:
    """Complete RAG system: ingest -> chunk -> embed -> store -> retrieve -> generate."""

    def __init__(self, vector_db_path: str = "./vectorstore"):
        """Create the embedding and chat models; the store is built on ingest.

        Args:
            vector_db_path: Directory where Chroma persists the vector index.
        """
        self.embeddings = OpenAIEmbeddings()
        self.model = ChatOpenAI(model="gpt-3.5-turbo")
        self.vector_db_path = vector_db_path
        self.vectorstore = None
        self.retriever = None

    def ingest_documents(self, file_paths: list[str]) -> int:
        """Load, split, embed, and store the given text files.

        Args:
            file_paths: Paths to plain-text files to ingest.

        Returns:
            Number of chunks written to the vector store.
        """
        # Load every file as plain text.
        docs = []
        for file_path in file_paths:
            loader = TextLoader(file_path)
            docs.extend(loader.load())
        # Split into overlapping chunks so retrieval granularity stays small.
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
        )
        split_docs = splitter.split_documents(docs)
        # Embed and persist the chunks.
        self.vectorstore = Chroma.from_documents(
            documents=split_docs,
            embedding=self.embeddings,
            persist_directory=self.vector_db_path,
        )
        # Top-3 similarity search by default.
        self.retriever = self.vectorstore.as_retriever(
            search_kwargs={"k": 3}
        )
        return len(split_docs)

    def build_rag_chain(self):
        """Build the RAG chain using LCEL."""
        # Prompt that uses retrieved context.
        prompt = ChatPromptTemplate.from_template("""
You are a helpful assistant. Answer the question using the provided context.
If you cannot answer from the context, say so.
Context:
{context}
Question: {question}
Answer:""")

        def format_docs(docs):
            # Blank-line separators keep chunk boundaries visible to the model.
            return "\n\n".join([d.page_content for d in docs])

        # BUG FIX: the original piped the retriever into
        # `RunnablePassthrough.map(format_docs)`, which is not a valid LCEL
        # construct (`map` is an instance method that maps a runnable over
        # list elements, not a formatter hook). Piping into a plain callable
        # coerces it to a RunnableLambda -- the documented way to
        # post-process retriever output.
        chain = (
            {
                "context": self.retriever | format_docs,
                "question": RunnablePassthrough(),
            }
            | prompt
            | self.model
            | StrOutputParser()
        )
        return chain

    def query(self, question: str) -> str:
        """Ask a question and get an answer.

        Raises:
            ValueError: If ingest_documents() has not been called yet.
        """
        if not self.retriever:
            raise ValueError("No documents ingested. Call ingest_documents first.")
        chain = self.build_rag_chain()
        return chain.invoke(question)
# Usage
rag = RAGSystem()
# Ingest your documents
# NOTE(review): assumes document1.txt / document2.txt exist on disk;
# TextLoader raises on a missing path.
num_chunks = rag.ingest_documents(["document1.txt", "document2.txt"])
print(f"Ingested {num_chunks} chunks")
# Ask questions
answer = rag.query("What does the document say about X?")
print(f"Answer: {answer}")
Retrieval Customization
Fine-tune how documents are retrieved:
class AdvancedRetriever:
    """Customized retrieval strategies over an existing vector store."""

    def __init__(self, vectorstore):
        # Any LangChain VectorStore exposing as_retriever().
        self.vectorstore = vectorstore

    def get_retriever_with_score_threshold(self, threshold: float = 0.7):
        """Only return documents above a similarity threshold.

        BUG FIX: `score_threshold` is honored only when the retriever is
        created with search_type="similarity_score_threshold"; with the
        default "similarity" search type the kwarg is not applied.
        """
        return self.vectorstore.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={
                "k": 10,
                "score_threshold": threshold,
            },
        )

    def get_retriever_with_metadata_filter(self, filter_dict: dict):
        """Restrict candidates to documents whose metadata matches filter_dict."""
        return self.vectorstore.as_retriever(
            search_kwargs={
                "k": 3,
                "filter": filter_dict,
            },
        )

    def get_max_marginal_relevance_retriever(self, k: int = 3):
        """Retrieve diverse documents (avoid near-duplicate chunks)."""
        return self.vectorstore.as_retriever(
            search_type="mmr",
            search_kwargs={
                "k": k,
                "fetch_k": k * 2,  # Fetch more, then select a diverse subset.
            },
        )
# Usage
# NOTE(review): `vectorstore` must be an already-built vector store
# (e.g. the Chroma instance created by RAGSystem.ingest_documents).
retriever_diverse = AdvancedRetriever(vectorstore).get_max_marginal_relevance_retriever()
# NOTE(review): get_relevant_documents() is deprecated in recent LangChain
# releases in favor of retriever.invoke("query") -- confirm against the
# pinned langchain version.
docs = retriever_diverse.get_relevant_documents("query")
Augmenting the Prompt with Retrieved Context
Different ways to incorporate retrieved documents:
from langchain_core.prompts import ChatPromptTemplate
# Strategy 1: Simple concatenation -- dump the retrieved text above the
# question; cheapest to build and usually a solid baseline.
simple_prompt = ChatPromptTemplate.from_template("""
Context:
{context}
Question: {question}
Answer:""")
# Strategy 2: Explicit document references -- asks the model to cite which
# retrieved document(s) support each claim, enabling source attribution.
explicit_prompt = ChatPromptTemplate.from_template("""
Based on the following documents:
{context}
Answer this question: {question}
Cite which document(s) support your answer.""")
# Strategy 3: Multiple retrieval rounds -- the caller supplies the results
# of two successive searches as separate template variables.
multi_round_prompt = ChatPromptTemplate.from_template("""
First search: {first_search_context}
Based on that, here's a follow-up search: {second_search_context}
Now answer: {question}""")
Generation with Retrieved Context
Build the generation step:
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel
class Answer(BaseModel):
    """Structured answer with citations."""
    # The generated answer text.
    answer: str
    # Identifiers/snippets of the documents the answer is based on.
    source_documents: list[str]
    # Model-reported confidence -- NOTE(review): presumably in [0, 1],
    # but no validation is enforced here.
    confidence: float
# Build generation chain
class GenerationPipeline:
    """Generate answers from retrieved context."""

    def __init__(self):
        self.model = ChatOpenAI(model="gpt-4-turbo")

    @staticmethod
    def _parse_json_answer(content: str, context: list[str]) -> dict:
        """Parse the model's JSON reply, falling back to a raw-answer dict.

        Robustness fix: chat models frequently wrap JSON in ``` fences,
        so strip an optional fence before parsing instead of immediately
        falling back to the unparsed text.

        Args:
            content: Raw text returned by the model.
            context: Retrieved passages, used as sources in the fallback.

        Returns:
            The parsed JSON object, or a fallback dict with the raw text.
        """
        import json

        text = content.strip()
        if text.startswith("```"):
            # Drop the opening fence line (possibly "```json") and the
            # trailing fence, keeping only the payload between them.
            text = text.split("\n", 1)[-1]
            text = text.rsplit("```", 1)[0].strip()
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            return {
                "answer": content,
                "sources": context,
                "confidence": 0.5,
            }

    def generate_with_citations(self, context: list[str], question: str) -> dict:
        """Generate an answer and track citations.

        Args:
            context: Retrieved passages to ground the answer in.
            question: The user's question.

        Returns:
            dict with answer/sources/confidence keys (best effort -- the
            model is instructed, not forced, to emit that schema).
        """
        context_text = "\n\n".join(context)
        prompt = ChatPromptTemplate.from_template("""
Based on this context:
{context}
Answer this question and cite your sources:
{question}
Format your answer as JSON with fields: answer, sources, confidence.""")
        messages = prompt.format_messages(context=context_text, question=question)
        response = self.model.invoke(messages)
        return self._parse_json_answer(response.content, context)
# Usage
gen = GenerationPipeline()
# NOTE(review): `context_docs` must be a list[str] of retrieved passages,
# defined elsewhere (e.g. page_content pulled from a retriever call).
result = gen.generate_with_citations(context_docs, "What is X?")
Full Pipeline in One Chain
Combine everything with LCEL:
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Initialize components
embeddings = OpenAIEmbeddings()
model = ChatOpenAI(model="gpt-3.5-turbo")
# Load vector store
# NOTE(review): assumes ./vectorstore was previously populated (e.g. by
# RAGSystem.ingest_documents); an empty directory yields empty retrievals.
vectorstore = Chroma(
    persist_directory="./vectorstore",
    embedding_function=embeddings
)
# Retrieve the 3 most similar chunks per query.
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
# Prompt
prompt = ChatPromptTemplate.from_template("""
You are a helpful assistant. Use the context to answer the question.
Context:
{context}
Question: {question}
Answer:""")
def format_docs(docs):
    """Concatenate each document's page_content, separated by blank lines."""
    parts = (doc.page_content for doc in docs)
    return "\n\n".join(parts)
# Full chain: retrieve -> format -> prompt -> model -> parse to string.
rag_chain = (
    {
        # Piping the retriever into a plain function coerces it to a
        # RunnableLambda; the original `(lambda docs: format_docs(docs))`
        # wrapper was redundant.
        "context": retriever | format_docs,
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
    | StrOutputParser()
)
# Use it
# NOTE(review): invoking the chain performs live OpenAI embedding + chat
# calls; requires OPENAI_API_KEY in the environment.
answer = rag_chain.invoke("What is machine learning?")
print(answer)
Handling Multi-Format Documents
Process different document types:
from pathlib import Path
from langchain_community.document_loaders import (
TextLoader, PyPDFLoader, CSVLoader, WebBaseLoader
)
class MultiFormatIngester:
    """Ingest documents of various formats from a directory."""

    # Glob pattern -> loader class. Extend this table to support more
    # formats instead of adding another copy-paste loop.
    _LOADERS = {
        "*.txt": TextLoader,
        "*.pdf": PyPDFLoader,
        "*.csv": CSVLoader,
    }

    @staticmethod
    def ingest(directory: str) -> list:
        """Load all supported documents found directly inside *directory*.

        Args:
            directory: Path to the folder to scan (non-recursive).

        Returns:
            Flat list of loaded LangChain documents.
        """
        base = Path(directory)
        docs = []
        # One table-driven pass replaces the three duplicated glob loops.
        for pattern, loader_cls in MultiFormatIngester._LOADERS.items():
            for file_path in base.glob(pattern):
                loader = loader_cls(str(file_path))
                docs.extend(loader.load())
        return docs
# Usage
# NOTE(review): expects a ./documents directory; files with other
# extensions are silently skipped.
docs = MultiFormatIngester.ingest("./documents")
Production RAG System
Putting it all together:
import logging
from datetime import datetime
class ProductionRAG:
    """Production-ready RAG system with logging and basic metrics."""

    def __init__(self, name: str):
        """Set up logging/metrics state; call setup() before answering.

        Args:
            name: Identifier used for the logger and stats reporting.
        """
        self.name = name
        self.created_at = datetime.now()
        self.logger = logging.getLogger(name)
        self.query_count = 0
        self.error_count = 0
        # Robustness fix: declare the attribute up front so answer() can
        # report a clear error instead of an AttributeError when called
        # before setup().
        self.rag = None

    def setup(self, documents_path: str, vector_db_path: str):
        """Initialize the underlying RAGSystem and ingest documents.

        Raises:
            Exception: Re-raises the ingestion failure after logging it.
        """
        try:
            self.rag = RAGSystem(vector_db_path=vector_db_path)
            num_chunks = self.rag.ingest_documents([documents_path])
            # Lazy %-args keep message formatting out of the happy path.
            self.logger.info("Ingested %d chunks", num_chunks)
        except Exception:
            # logger.exception records the traceback with the message.
            self.logger.exception("Setup failed")
            raise

    def answer(self, question: str, return_sources: bool = False) -> dict:
        """Answer a question; never raises -- failures come back in the dict.

        Args:
            question: Natural-language question.
            return_sources: If True, include truncated source snippets.

        Returns:
            On success: {"success": True, "answer", "sources", "query_number"}.
            On failure: {"success": False, "error", "answer": None}.
        """
        try:
            self.query_count += 1
            if self.rag is None:
                # Caught below and surfaced as a structured error response.
                raise RuntimeError("setup() must be called before answer().")
            answer = self.rag.query(question)
            sources = None
            if return_sources:
                # retriever.invoke() is the current API; get_relevant_documents()
                # is deprecated in recent LangChain releases.
                docs = self.rag.retriever.invoke(question)
                sources = [d.page_content[:100] + "..." for d in docs]
            return {
                "success": True,
                "answer": answer,
                "sources": sources,
                "query_number": self.query_count,
            }
        except Exception as e:
            self.error_count += 1
            self.logger.exception("Query failed")
            return {
                "success": False,
                "error": str(e),
                "answer": None,
            }

    def get_stats(self) -> dict:
        """Get system statistics (query totals and error rate)."""
        return {
            "name": self.name,
            "created": self.created_at.isoformat(),
            "total_queries": self.query_count,
            "errors": self.error_count,
            # max(1, ...) avoids division by zero before any queries.
            "error_rate": self.error_count / max(1, self.query_count),
        }
# Usage
rag_system = ProductionRAG("my-rag-system")
# NOTE(review): setup() expects an existing documents.txt and re-raises
# on ingestion failure.
rag_system.setup("documents.txt", "./vectorstore")
result = rag_system.answer("What is RAG?", return_sources=True)
print(f"Answer: {result['answer']}")
print(f"Stats: {rag_system.get_stats()}")
Key Takeaway
An end-to-end RAG pipeline flows: ingest → chunk → embed → store → retrieve → generate → answer. Use LCEL to compose retrieval and generation into a single chain. Customize retrieval with score thresholds and metadata filters. Augment prompts with retrieved context. Return sources with answers for transparency. Build production systems with error handling and monitoring.
Exercises
- Basic RAG: Build a simple RAG system that ingests a document and answers questions about it.
- Prompt variations: Compare different prompt strategies for incorporating context. Which works best?
- Multi-format: Build a system that ingests .txt, .pdf, and .csv files.
- With citations: Modify the generator to return sources alongside answers.
- Error handling: Add comprehensive error handling for missing documents, failed queries, etc.
- Production system: Build a RAG system with logging, metrics, and graceful error handling.