Intermediate

Building an End-to-End RAG Pipeline

Lesson 2 of 4 | Estimated time: 55 min

You now understand embeddings and vector databases. Let’s build a complete RAG system: ingest documents, chunk them, embed, retrieve relevant passages, and generate answers using those passages as context.

The Complete RAG Flow

  1. Ingestion: Load documents from various sources
  2. Chunking: Split into manageable pieces
  3. Embedding: Convert chunks to vectors
  4. Storage: Store in vector database
  5. Retrieval: Find relevant chunks for a query
  6. Generation: Feed chunks as context to the model
  7. Response: Return answer to user
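
To make the seven steps concrete before bringing in LangChain, here is a minimal, dependency-free sketch of the same flow. It rests on loud assumptions: a bag-of-words count vector stands in for a real embedding, a Python list stands in for the vector database, and generation is stubbed out as prompt construction because calling a model needs an API key. The names chunk_text, embed, cosine, and retrieve are illustrative and do not come from any library.

```python
import math
import re
from collections import Counter

def chunk_text(text: str, size: int = 40) -> list[str]:
    """Step 2: split into pieces of at most `size` characters, on word boundaries."""
    chunks, current = [], ""
    for word in text.split():
        candidate = f"{current} {word}".strip()
        if len(candidate) > size and current:
            chunks.append(current)
            current = word
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks

def embed(text: str) -> Counter:
    """Step 3 (toy): word counts stand in for a learned embedding vector."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))

def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0

def retrieve(store: list[tuple[str, Counter]], query: str, k: int = 2) -> list[str]:
    """Step 5: return the k chunks most similar to the query."""
    q = embed(query)
    ranked = sorted(store, key=lambda item: cosine(q, item[1]), reverse=True)
    return [chunk for chunk, _ in ranked[:k]]

# Step 1: ingestion (hard-coded strings instead of files)
documents = [
    "Chroma is a vector database. It stores embeddings on disk.",
    "Bananas are yellow. Apples are often red or green.",
]

# Steps 2-4: chunk, embed, and store each chunk next to its vector
store = [(chunk, embed(chunk)) for doc in documents for chunk in chunk_text(doc)]

# Step 5: retrieval
question = "Where does Chroma store embeddings?"
top = retrieve(store, question, k=2)

# Step 6: build the prompt that would be sent to the model
context = "\n\n".join(top)
prompt = f"Context:\n{context}\n\nQuestion: {question}\n\nAnswer:"
print(prompt)
```

Word-count vectors only match exact tokens ("store" does not match "stores"), which is one reason real systems use learned embeddings instead.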

Implementation from Scratch

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_core.runnables import RunnablePassthrough

class RAGSystem:
    """Complete RAG system."""

    def __init__(self, vector_db_path: str = "./vectorstore"):
        self.embeddings = OpenAIEmbeddings()
        self.model = ChatOpenAI(model="gpt-3.5-turbo")
        self.vector_db_path = vector_db_path
        self.vectorstore = None
        self.retriever = None

    def ingest_documents(self, file_paths: list[str]):
        """Load and process documents."""
        # Load
        docs = []
        for file_path in file_paths:
            loader = TextLoader(file_path)
            docs.extend(loader.load())

        # Split
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        split_docs = splitter.split_documents(docs)

        # Embed and store
        self.vectorstore = Chroma.from_documents(
            documents=split_docs,
            embedding=self.embeddings,
            persist_directory=self.vector_db_path
        )

        # Create retriever
        self.retriever = self.vectorstore.as_retriever(
            search_kwargs={"k": 3}
        )

        return len(split_docs)

    def build_rag_chain(self):
        """Build the RAG chain using LCEL."""
        # Prompt that uses retrieved context
        prompt = ChatPromptTemplate.from_template("""
        You are a helpful assistant. Answer the question using the provided context.
        If you cannot answer from the context, say so.

        Context:
        {context}

        Question: {question}

        Answer:""")

        # Chain that formats context and question
        def format_docs(docs):
            return "\n\n".join([d.page_content for d in docs])

        chain = (
            {
                # Piping into a plain function works: LCEL wraps it as a runnable
                "context": self.retriever | format_docs,
                "question": RunnablePassthrough()
            }
            | prompt
            | self.model
            | StrOutputParser()
        )

        return chain

    def query(self, question: str) -> str:
        """Ask a question and get an answer."""
        if not self.retriever:
            raise ValueError("No documents ingested. Call ingest_documents first.")

        chain = self.build_rag_chain()
        return chain.invoke(question)

# Usage
rag = RAGSystem()

# Ingest your documents
num_chunks = rag.ingest_documents(["document1.txt", "document2.txt"])
print(f"Ingested {num_chunks} chunks")

# Ask questions
answer = rag.query("What does the document say about X?")
print(f"Answer: {answer}")
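
The chunk_size=1000 and chunk_overlap=200 settings in ingest_documents control how much text each chunk holds and how much neighbouring chunks share. The sketch below shows the idea with a fixed-width character window. This is a simplification: RecursiveCharacterTextSplitter prefers to break on separators such as paragraph and line breaks rather than at arbitrary offsets. The function name sliding_window_chunks is hypothetical.

```python
def sliding_window_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Cut `text` into windows of `chunk_size` characters that share `chunk_overlap` characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the window already reached the end of the text
    return chunks

chunks = sliding_window_chunks("abcdefghijklmnopqrstuvwxyz", chunk_size=10, chunk_overlap=4)
for chunk in chunks:
    print(chunk)
```

Each chunk here repeats the last four characters of the previous one, so a sentence that straddles a boundary still appears whole in at least one chunk, at the cost of storing some text twice.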

Retrieval Customization

Adjust how documents are retrieved with score thresholds, metadata filters, or maximal marginal relevance (MMR):

class AdvancedRetriever:
    """Customized retrieval strategies."""

    def __init__(self, vectorstore):
        self.vectorstore = vectorstore

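    def get_retriever_with_score_threshold(self, threshold: float = 0.7):
        """Only return documents whose relevance score meets the threshold."""
        return self.vectorstore.as_retriever(
            # score_threshold only takes effect with this search type
            search_type="similarity_score_threshold",
            search_kwargs={
                "k": 10,
                "score_threshold": threshold
            }
        )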

    def get_retriever_with_metadata_filter(self, filter_dict: dict):
        """Filter by metadata."""
        return self.vectorstore.as_retriever(
            search_kwargs={
                "k": 3,
                "filter": filter_dict
            }
        )

    def get_max_marginal_relevance_retriever(self, k: int = 3):
        """Retrieve diverse documents (avoid redundancy)."""
        return self.vectorstore.as_retriever(
            search_type="mmr",
            search_kwargs={
                "k": k,
                "fetch_k": k * 2  # Fetch more, then select diverse subset
            }
        )

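# Usage (assumes `vectorstore` already exists, e.g. rag.vectorstore from above)
retriever_diverse = AdvancedRetriever(vectorstore).get_max_marginal_relevance_retriever()
docs = retriever_diverse.invoke("query")  # invoke() replaces the deprecated get_relevant_documents()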
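
To see what the "mmr" search type is doing, here is a small sketch of greedy maximal marginal relevance over plain Python lists. It follows the common formulation (relevance to the query minus similarity to what is already selected, weighted by lambda_mult) and is not LangChain's own code. The vectors are made up for illustration, and mmr_select is a hypothetical name.

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def mmr_select(query_vec, doc_vecs, k=3, lambda_mult=0.5) -> list[int]:
    """Greedily pick k document indices, trading relevance against redundancy."""
    selected: list[int] = []
    candidates = list(range(len(doc_vecs)))
    while candidates and len(selected) < k:
        def score(i: int) -> float:
            relevance = cosine(query_vec, doc_vecs[i])
            # Similarity to the closest already-selected document (0 if none yet)
            redundancy = max((cosine(doc_vecs[i], doc_vecs[j]) for j in selected), default=0.0)
            return lambda_mult * relevance - (1 - lambda_mult) * redundancy
        best = max(candidates, key=score)
        selected.append(best)
        candidates.remove(best)
    return selected

query = [1.0, 0.0]
doc_vectors = [
    [0.9, 0.436],   # 0: most relevant
    [0.89, 0.456],  # 1: near-duplicate of 0
    [0.8, -0.6],    # 2: somewhat less relevant, but different from 0
]

diverse = mmr_select(query, doc_vectors, k=2, lambda_mult=0.5)
relevance_only = mmr_select(query, doc_vectors, k=2, lambda_mult=1.0)
print(diverse, relevance_only)
```

With lambda_mult=1.0 the selection reduces to plain similarity ranking and picks the near-duplicate; at 0.5 the second pick moves to the document that adds something new. In LangChain the same knob can be passed as "lambda_mult" inside search_kwargs.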

Augmenting the Prompt with Retrieved Context

There are several ways to incorporate retrieved documents into the prompt:

from langchain_core.prompts import ChatPromptTemplate

# Strategy 1: Simple concatenation
simple_prompt = ChatPromptTemplate.from_template("""
Context:
{context}

Question: {question}

Answer:""")

# Strategy 2: Explicit document references
explicit_prompt = ChatPromptTemplate.from_template("""
Based on the following documents:

{context}

Answer this question: {question}

Cite which document(s) support your answer.""")

# Strategy 3: Multiple retrieval rounds
multi_round_prompt = ChatPromptTemplate.from_template("""
First search: {first_search_context}

Based on that, here's a follow-up search: {second_search_context}

Now answer: {question}""")
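
Strategy 2 asks the model to cite documents, which only works if the context gives each document a label it can refer to. One possible approach, sketched here with plain dicts standing in for LangChain Document objects, is to number each chunk and include its source. The helper name format_docs_with_ids and the "text" and "source" keys are assumptions made for this example.

```python
def format_docs_with_ids(docs: list[dict]) -> str:
    """Render chunks as numbered blocks so the model can cite them as [1], [2], ..."""
    blocks = []
    for i, doc in enumerate(docs, start=1):
        source = doc.get("source", "unknown")
        blocks.append(f"[{i}] (source: {source})\n{doc['text']}")
    return "\n\n".join(blocks)

retrieved = [
    {"text": "RAG retrieves passages.", "source": "intro.txt"},
    {"text": "Chunks are embedded."},  # no source recorded
]
context = format_docs_with_ids(retrieved)
print(context)
```

The resulting string can be passed as {context} to explicit_prompt, so a citation such as "[1]" in the answer maps back to a specific chunk.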

Generation with Retrieved Context

Build the generation step:

import json

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, ValidationError

class Answer(BaseModel):
    """Structured answer with citations."""
    answer: str
    sources: list[str]
    confidence: float

# Build generation chain
class GenerationPipeline:
    """Generate answers from retrieved context."""

    def __init__(self):
        self.model = ChatOpenAI(model="gpt-4-turbo")

    def generate_with_citations(self, context: list[str], question: str) -> dict:
        """Generate answer and track citations."""
        context_text = "\n\n".join(context)

        prompt = ChatPromptTemplate.from_template("""
        Based on this context:
        {context}

        Answer this question and cite your sources:
        {question}

        Format your answer as JSON with fields: answer (string),
        sources (list of strings), confidence (number from 0 to 1).""")

        messages = prompt.format_messages(context=context_text, question=question)
        response = self.model.invoke(messages)

        try:
            parsed = json.loads(response.content)
            Answer(**parsed)  # validate field names and types against the schema
            return parsed
        except (json.JSONDecodeError, TypeError, ValidationError):
            # Fall back to the raw text; 0.5 is an arbitrary placeholder, not a measured confidence
            return {
                "answer": response.content,
                "sources": context,
                "confidence": 0.5
            }

# Usage (assumes context_docs is a list of retrieved chunk strings)
gen = GenerationPipeline()
result = gen.generate_with_citations(context_docs, "What is X?")
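
Chat models often wrap JSON in a Markdown code fence or add a sentence around it, in which case json.loads fails on the raw reply. The following is a hedged sketch of a more tolerant parser using only the standard library. The function name parse_model_json and the required-key check are illustrative choices, not part of LangChain.

```python
import json
import re
from typing import Optional

FENCE = "`" * 3  # a Markdown code fence, built this way to keep the example readable
FENCED_BLOCK = re.compile(re.escape(FENCE) + r"(?:json)?\s*(.*?)" + re.escape(FENCE), re.DOTALL)

def parse_model_json(raw: str, required=("answer", "sources", "confidence")) -> Optional[dict]:
    """Return the parsed JSON object, or None if the reply is unusable."""
    text = raw.strip()
    fenced = FENCED_BLOCK.search(text)
    if fenced:
        text = fenced.group(1).strip()  # keep only what is inside the fence
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict) or any(key not in data for key in required):
        return None
    return data

payload = '{"answer": "42", "sources": ["doc1"], "confidence": 0.9}'
wrapped = "Here is the result:\n" + FENCE + "json\n" + payload + "\n" + FENCE

print(parse_model_json(payload))
print(parse_model_json(wrapped))
print(parse_model_json("I could not find an answer."))
```

Returning None instead of a made-up answer lets the caller decide whether to retry, fall back to the raw text, or report a failure.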

Full Pipeline in One Chain

Combine everything with LCEL:

from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Initialize components
embeddings = OpenAIEmbeddings()
model = ChatOpenAI(model="gpt-3.5-turbo")

# Load vector store
vectorstore = Chroma(
    persist_directory="./vectorstore",
    embedding_function=embeddings
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

# Prompt
prompt = ChatPromptTemplate.from_template("""
You are a helpful assistant. Use the context to answer the question.

Context:
{context}

Question: {question}

Answer:""")

# Helper to format docs
def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

# Full chain
rag_chain = (
    {
        # The question goes to the retriever, and the retrieved docs are formatted as text
        "context": retriever | format_docs,
        "question": RunnablePassthrough()
    }
    | prompt
    | model
    | StrOutputParser()
)

# Use it
answer = rag_chain.invoke("What is machine learning?")
print(answer)
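
The dict at the head of the chain can look opaque. As a rough mental model only (this is not how LangChain is implemented internally), each key in the dict receives the same input, and the resulting dict is handed to the prompt. The sketch below spells that out with ordinary functions; fake_retriever and fake_model are stand-ins invented for this example so it runs without an API key.

```python
TEMPLATE = "Context:\n{context}\n\nQuestion: {question}\n\nAnswer:"

def fake_retriever(question: str) -> list[str]:
    """Stand-in for the retriever: always returns the same two chunks."""
    return ["Chunk A about the topic.", "Chunk B with more detail."]

def format_docs(docs: list[str]) -> str:
    return "\n\n".join(docs)

def fake_model(prompt_text: str) -> str:
    """Stand-in for the chat model: reports how much text it received."""
    return f"(model received {len(prompt_text)} characters)"

def build_prompt(question: str) -> str:
    # The dict step: every branch gets the same input, here the question string
    inputs = {
        "context": format_docs(fake_retriever(question)),  # retriever | format_docs
        "question": question,                              # RunnablePassthrough()
    }
    return TEMPLATE.format(**inputs)                       # | prompt

def run_chain(question: str) -> str:
    return fake_model(build_prompt(question))              # | model | StrOutputParser()

print(build_prompt("What is machine learning?"))
print(run_chain("What is machine learning?"))
```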

Handling Multi-Format Documents

Process different document types:

from pathlib import Path
# PyPDFLoader requires the pypdf package to be installed
from langchain_community.document_loaders import (
    TextLoader, PyPDFLoader, CSVLoader
)

class MultiFormatIngester:
    """Ingest documents of various formats."""

    @staticmethod
    def ingest(directory: str) -> list:
        """Load all documents from directory."""
        docs = []
        directory = Path(directory)

        # Text files
        for file_path in directory.glob("*.txt"):
            loader = TextLoader(str(file_path))
            docs.extend(loader.load())

        # PDFs
        for file_path in directory.glob("*.pdf"):
            loader = PyPDFLoader(str(file_path))
            docs.extend(loader.load())

        # CSV files
        for file_path in directory.glob("*.csv"):
            loader = CSVLoader(str(file_path))
            docs.extend(loader.load())

        return docs

# Usage
docs = MultiFormatIngester.ingest("./documents")
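
As the number of formats grows, one loop per extension gets repetitive. One alternative is a table that maps file suffixes to loader functions. The sketch below uses only the standard library and plain dicts in place of LangChain loaders and Document objects, so every name in it (load_txt, load_csv, LOADERS, ingest_directory) is hypothetical. PDF handling is left out because it needs a third-party parser.

```python
import csv
import tempfile
from pathlib import Path

def load_txt(path: Path) -> list[dict]:
    """One document per text file."""
    return [{"text": path.read_text(encoding="utf-8"), "source": path.name}]

def load_csv(path: Path) -> list[dict]:
    """One document per CSV row, rendered as 'column: value' lines."""
    with path.open(newline="", encoding="utf-8") as f:
        return [
            {
                "text": "\n".join(f"{key}: {value}" for key, value in row.items()),
                "source": f"{path.name}#row{i}",
            }
            for i, row in enumerate(csv.DictReader(f))
        ]

# Adding a format means adding one entry here
LOADERS = {".txt": load_txt, ".csv": load_csv}

def ingest_directory(directory: str) -> tuple[list[dict], list[str]]:
    """Load every supported file; report unsupported files instead of ignoring them."""
    docs, skipped = [], []
    for path in sorted(Path(directory).iterdir()):
        if not path.is_file():
            continue
        loader = LOADERS.get(path.suffix.lower())
        if loader is None:
            skipped.append(path.name)
            continue
        docs.extend(loader(path))
    return docs, skipped

# Demo on a temporary directory with three small files
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "notes.txt").write_text("RAG combines retrieval and generation.", encoding="utf-8")
    Path(tmp, "faq.csv").write_text("question,answer\nWhat is k?,Number of chunks\n", encoding="utf-8")
    Path(tmp, "image.png").write_bytes(b"\x89PNG")
    loaded, skipped_files = ingest_directory(tmp)

print(len(loaded), skipped_files)
```

Returning the skipped file names makes it visible when a directory contains formats the pipeline does not yet handle.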

Production RAG System

Putting it all together:

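import logging
from datetime import datetime

# Reuses the RAGSystem class from "Implementation from Scratch"
logging.basicConfig(level=logging.INFO)  # without this, INFO-level messages are not shown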

class ProductionRAG:
    """Production-ready RAG system."""

    def __init__(self, name: str):
        self.name = name
        self.created_at = datetime.now()
        self.logger = logging.getLogger(name)
        self.query_count = 0
        self.error_count = 0

    def setup(self, documents_path: str, vector_db_path: str):
        """Initialize the system."""
        try:
            self.rag = RAGSystem(vector_db_path=vector_db_path)
            num_chunks = self.rag.ingest_documents([documents_path])
            self.logger.info(f"Ingested {num_chunks} chunks")
        except Exception as e:
            self.logger.error(f"Setup failed: {e}")
            raise

    def answer(self, question: str, return_sources: bool = False) -> dict:
        """Answer a question."""
        try:
            self.query_count += 1

            # Get answer
            answer = self.rag.query(question)

            # Optionally get source documents (this runs a second retrieval,
            # separate from the one that happens inside query())
            sources = None
            if return_sources:
                docs = self.rag.retriever.invoke(question)
                sources = [d.page_content[:100] + "..." for d in docs]

            return {
                "success": True,
                "answer": answer,
                "sources": sources,
                "query_number": self.query_count
            }

        except Exception as e:
            self.error_count += 1
            self.logger.error(f"Query failed: {e}")
            return {
                "success": False,
                "error": str(e),
                "answer": None
            }

    def get_stats(self) -> dict:
        """Get system statistics."""
        return {
            "name": self.name,
            "created": self.created_at.isoformat(),
            "total_queries": self.query_count,
            "errors": self.error_count,
            "error_rate": self.error_count / max(1, self.query_count)
        }

# Usage
rag_system = ProductionRAG("my-rag-system")
rag_system.setup("documents.txt", "./vectorstore")

result = rag_system.answer("What is RAG?", return_sources=True)
print(f"Answer: {result['answer']}")
print(f"Stats: {rag_system.get_stats()}")
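
The statistics above count queries and errors. If you also want latency, one option is a small recorder that times each call, sketched below with the standard library only. QueryMetrics is a hypothetical helper and is not part of the ProductionRAG class. Because ProductionRAG.answer catches exceptions itself, a recorder like this would have to wrap the inner self.rag.query call to see failures.

```python
import time
from dataclasses import dataclass, field

@dataclass
class QueryMetrics:
    """Records how long each call took and how many calls raised."""
    latencies: list = field(default_factory=list)
    errors: int = 0

    def record(self, fn, *args, **kwargs):
        """Call fn, time it, count a failure if it raises, then re-raise."""
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        except Exception:
            self.errors += 1
            raise
        finally:
            # Runs on both success and failure, so every call is timed
            self.latencies.append(time.perf_counter() - start)

    def summary(self) -> dict:
        n = len(self.latencies)
        return {
            "queries": n,
            "errors": self.errors,
            "error_rate": self.errors / n if n else 0.0,
            "avg_latency_s": sum(self.latencies) / n if n else 0.0,
        }

def failing_query(question: str) -> str:
    raise RuntimeError("simulated retrieval failure")

metrics = QueryMetrics()
ok_result = metrics.record(str.upper, "what is rag?")
try:
    metrics.record(failing_query, "what is rag?")
except RuntimeError:
    pass  # the caller decides how to respond to the failure

print(metrics.summary())
```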

Key Takeaway

An end-to-end RAG pipeline flows: ingest → chunk → embed → store → retrieve → generate → answer. Use LCEL to compose retrieval and generation into a single chain. Customize retrieval with score thresholds and metadata filters. Augment prompts with retrieved context. Return sources with answers for transparency. Build production systems with error handling and monitoring.

Exercises

  1. Basic RAG: Build a simple RAG system that ingests a document and answers questions about it.

  2. Prompt variations: Compare different prompt strategies for incorporating context. Which works best?

  3. Multi-format: Build a system that ingests .txt, .pdf, and .csv files.

  4. With citations: Modify the generator to return sources alongside answers.

  5. Error handling: Add comprehensive error handling for missing documents, failed queries, etc.

  6. Production system: Build a RAG system with logging, metrics, and graceful error handling.