RAG Evaluation and Optimization
How do you know if your RAG system is working? This lesson covers evaluation frameworks, metrics for measuring quality, debugging retrieval failures, and optimizing chunk sizes—all critical for production systems.
RAG Quality Metrics
from typing import List

from langchain_openai import ChatOpenAI


class RAGEvaluator:
    """Evaluate RAG system quality with an LLM judge."""

    def __init__(self):
        # temperature=0 makes the judge's scores more reproducible
        self.model = ChatOpenAI(temperature=0)

    def faithfulness(self, retrieved_docs: List[str], answer: str) -> float:
        """Does the answer reflect the retrieved documents?"""
        context = "\n".join(retrieved_docs)
        prompt = f"""Evaluate if this answer is faithful to the documents.

Documents:
{context}

Answer: {answer}

Rate 0-1 how much the answer is supported by the documents.
Return only a number."""
        response = self.model.invoke(prompt)
        try:
            return float(response.content.strip())
        except ValueError:
            # The judge returned something that isn't a number; fall back to neutral
            return 0.5

    def relevance(self, query: str, retrieved_docs: List[str]) -> float:
        """Are the retrieved documents relevant to the query?"""
        docs = "\n".join(retrieved_docs)
        prompt = f"""Rate 0-1 how relevant these documents are to the query.

Query: {query}

Documents:
{docs}

Return only a number."""
        response = self.model.invoke(prompt)
        try:
            return float(response.content.strip())
        except ValueError:
            return 0.5


# Usage
evaluator = RAGEvaluator()
faith = evaluator.faithfulness(["doc1", "doc2"], "answer")
relevance = evaluator.relevance("query", ["doc1", "doc2"])
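LLM judges are noisy: the same answer can receive different scores on different calls. One way to stabilize scores (a minimal sketch built on the class above; the sample count of 3 is an arbitrary choice) is to sample the judge several times, clamp each score to [0, 1], and average:

# Sketch: average several judge samples to reduce variance.
def stable_faithfulness(evaluator: RAGEvaluator, docs: List[str],
                        answer: str, n: int = 3) -> float:
    scores = [evaluator.faithfulness(docs, answer) for _ in range(n)]
    clamped = [min(max(s, 0.0), 1.0) for s in scores]  # keep scores in [0, 1]
    return sum(clamped) / len(clamped)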
RAGAS Framework
RAGAS provides automated evaluation:
# pip install ragas datasets
from datasets import Dataset
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
)

# Your evaluation data: one entry per question; "contexts" holds the list of
# retrieved passages for each question
eval_data = {
    "question": ["What is X?", "How does Y work?"],
    "answer": ["X is...", "Y works by..."],
    "contexts": [
        ["Context for X"],
        ["Context for Y"],
    ],
    "ground_truth": ["Ground truth for X", "Ground truth for Y"],
}
eval_dataset = Dataset.from_dict(eval_data)

# Evaluate
results = evaluate(
    eval_dataset,
    metrics=[
        faithfulness,
        answer_relevancy,
        context_precision,
        context_recall,
    ],
)
print(results)
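The summary scores hide per-question variation. In most RAGAS releases the result object can be converted to a DataFrame for per-question inspection (the exact API varies across versions, so check the docs for the one you have installed):

# Per-question scores; to_pandas() is available in most RAGAS releases
df = results.to_pandas()
print(df.head())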
Debugging Retrieval Failures
class RetrieverDebugger:
    """Debug why retrieval fails."""

    def __init__(self, retriever, vectorstore):
        self.retriever = retriever
        self.vectorstore = vectorstore

    def debug_query(self, query: str) -> dict:
        """Analyze why a query doesn't work."""
        results = {
            "query": query,
            "retrieved_documents": [],
            "analysis": {},
        }

        # Get retrieved docs (invoke replaces the deprecated get_relevant_documents)
        retrieved = self.retriever.invoke(query)
        results["retrieved_documents"] = [doc.page_content[:100] for doc in retrieved]

        # Analyze retrieval
        if not retrieved:
            results["analysis"]["issue"] = "No documents retrieved"
            results["analysis"]["possible_causes"] = [
                "Query too specific",
                "Query language mismatch",
                "No relevant documents in database",
            ]
        elif len(retrieved) == 1:
            results["analysis"]["issue"] = "Only one document retrieved"
            results["analysis"]["recommendation"] = "Verify document diversity in store"

        return results


# Usage
debugger = RetrieverDebugger(retriever, vectorstore)
debug_result = debugger.debug_query("extremely specific query")
print(debug_result)
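When the debugger reports weak or empty results, inspecting raw similarity scores often shows whether the query embedding is simply far from everything in the store. A sketch using similarity_search_with_score, which most LangChain vector stores (FAISS, Chroma, and others) support; note that score semantics (distance vs. similarity) differ by store:

# Print the nearest documents with their raw scores for a query.
def inspect_scores(vectorstore, query: str, k: int = 5):
    for doc, score in vectorstore.similarity_search_with_score(query, k=k):
        print(f"{score:.3f}  {doc.page_content[:80]}")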
Chunk Size Optimization
from typing import Dict, List, Optional

from langchain_text_splitters import RecursiveCharacterTextSplitter


class ChunkOptimizer:
    """Find an optimal chunk size."""

    @staticmethod
    def test_chunk_sizes(text: str, sizes: Optional[List[int]] = None) -> Dict[int, dict]:
        """Test different chunk sizes and report size statistics."""
        sizes = sizes or [256, 512, 1024, 2048]
        results = {}
        for size in sizes:
            splitter = RecursiveCharacterTextSplitter(
                chunk_size=size,
                chunk_overlap=int(size * 0.1),  # 10% overlap
            )
            chunks = splitter.split_text(text)
            results[size] = {
                "num_chunks": len(chunks),
                "avg_length": sum(len(c) for c in chunks) / len(chunks),
                "max_length": max(len(c) for c in chunks),
                "min_length": min(len(c) for c in chunks),
            }
        return results


# Usage
text = "Your long document text..."
optimization = ChunkOptimizer.test_chunk_sizes(text)
for size, stats in optimization.items():
    print(f"Size {size}: {stats['num_chunks']} chunks, avg {stats['avg_length']:.0f} chars")
Measuring Retrieval Metrics
class RetrievalMetrics:
    """Measure retrieval performance."""

    @staticmethod
    def calculate_metrics(
        queries: List[str],
        ground_truth_docs: List[List[str]],
        retrieved_docs: List[List[str]],
    ) -> dict:
        """Calculate average precision, recall, and F1."""
        precisions = []
        recalls = []
        for i in range(len(queries)):
            gt = set(ground_truth_docs[i])
            ret = set(retrieved_docs[i])
            # Precision: fraction of retrieved documents that are relevant
            precision = len(gt & ret) / len(ret) if ret else 0.0
            # Recall: fraction of relevant documents that were retrieved
            recall = len(gt & ret) / len(gt) if gt else 1.0
            precisions.append(precision)
            recalls.append(recall)

        avg_precision = sum(precisions) / len(precisions)
        avg_recall = sum(recalls) / len(recalls)
        denom = avg_precision + avg_recall
        f1 = 2 * avg_precision * avg_recall / denom if denom > 0 else 0.0
        return {
            "avg_precision": avg_precision,
            "avg_recall": avg_recall,
            "f1": f1,
        }


# Usage
queries = ["q1", "q2", "q3"]
ground_truth = [["doc1", "doc2"], ["doc3"], ["doc4", "doc5"]]
retrieved = [["doc1", "doc2", "wrong"], ["doc3", "wrong"], ["doc4"]]
metrics = RetrievalMetrics.calculate_metrics(queries, ground_truth, retrieved)
print(f"Precision: {metrics['avg_precision']:.2f}")
print(f"Recall: {metrics['avg_recall']:.2f}")
print(f"F1: {metrics['f1']:.2f}")
Optimization Pipeline
class RAGOptimizer:
    """Optimize a RAG system for better quality."""

    def __init__(self, rag_system):
        # rag_system is assumed to expose chunk_size, retriever_k, a retriever,
        # a query() method, and a reingest() method
        self.rag = rag_system
        self.evaluator = RAGEvaluator()

    def optimize_chunk_size(self, test_queries: List[str]):
        """Find the best chunk size for your data."""
        best_size = 1024
        best_score = 0.0
        for size in [256, 512, 1024, 2048]:
            # Re-ingest with the new chunk size
            self.rag.chunk_size = size
            self.rag.reingest()

            # Evaluate faithfulness on the test queries
            scores = []
            for query in test_queries:
                docs = self.rag.retriever.invoke(query)
                answer = self.rag.query(query)
                score = self.evaluator.faithfulness(
                    [d.page_content for d in docs], answer
                )
                scores.append(score)

            avg_score = sum(scores) / len(scores)
            if avg_score > best_score:
                best_score = avg_score
                best_size = size
        return best_size, best_score

    def optimize_k(self, test_queries: List[str]):
        """Find the best number of retrieved documents."""
        best_k = 3
        best_score = 0.0
        for k in [1, 3, 5, 10]:
            self.rag.retriever_k = k
            scores = []
            for query in test_queries:
                docs = self.rag.retriever.invoke(query)
                answer = self.rag.query(query)
                score = self.evaluator.relevance(
                    query, [d.page_content for d in docs]
                )
                scores.append(score)

            avg_score = sum(scores) / len(scores)
            if avg_score > best_score:
                best_score = avg_score
                best_k = k
        return best_k, best_score


# Usage
optimizer = RAGOptimizer(rag_system)
best_chunk, score = optimizer.optimize_chunk_size(["q1", "q2", "q3"])
print(f"Optimal chunk size: {best_chunk} (score: {score:.2f})")
Key Takeaway
Evaluate RAG quality with metrics: faithfulness (answer matches documents), relevance (documents match query), and retrieval metrics (precision, recall). Use RAGAS for automated evaluation. Debug failures by analyzing what documents are retrieved. Optimize chunk size and retrieval count on your actual data. Build optimization pipelines to continuously improve performance.
Exercises
- Faithfulness evaluation: Evaluate answers from your RAG system on faithfulness to retrieved documents.
- Retrieval metrics: Calculate precision, recall, and F1 on a golden dataset.
- Chunk optimization: Test different chunk sizes on your documents. Measure impact on retrieval quality.
- Debug failing queries: Identify queries that fail and analyze why retrieval isn't working.
- K optimization: Test different values of k (number of retrieved documents). Find the optimal value.
- Evaluation pipeline: Build an evaluation pipeline that measures RAG quality on test queries.