Agentic RAG and Self-Correcting Systems
Agentic RAG empowers the model to iteratively refine retrieval and reasoning. Rather than relying on a fixed retrieval pipeline, agentic systems let the model decide when to retrieve, what to retrieve, and how to refine its answers across multiple reasoning steps.
Corrective RAG
Corrective RAG evaluates retrieval quality and reformulates queries when documents are unhelpful.
from typing import Optional
from enum import Enum
import anthropic
class RetrievalGrade(Enum):
    """Three-way verdict on the quality of a retrieval."""

    RELEVANT = "relevant"      # documents answer the question
    IRRELEVANT = "irrelevant"  # documents do not help at all
    AMBIGUOUS = "ambiguous"    # documents are only partially useful
class CorrectiveRAG:
    """RAG system that corrects poor retrievals.

    Control loop: retrieve -> grade relevance with an LLM ->
      * RELEVANT:   answer from the documents,
      * IRRELEVANT: reformulate the query and retry,
      * AMBIGUOUS:  answer, but flag gaps in the evidence.
    """

    def __init__(self, retriever, grader_model: str = "claude-3-5-sonnet-20241022"):
        # retriever is assumed to expose retrieve(query, k=...) returning a
        # list of {"content": str, ...} dicts -- TODO confirm against caller.
        self.retriever = retriever
        self.grader_model = grader_model  # model used for grading and query rewrites
        self.client = anthropic.Anthropic()

    def generate_with_correction(
        self,
        question: str,
        max_iterations: int = 3
    ) -> str:
        """Generate answer with iterative retrieval correction.

        Args:
            question: The user's original question.
            max_iterations: Cap on retrieve/grade/reformulate rounds.

        Returns:
            The generated answer, or a fixed fallback message when no
            relevant documents are found within the iteration budget.
        """
        current_question = question
        for _ in range(max_iterations):
            documents = self.retriever.retrieve(current_question, k=5)
            grade = self._grade_retrieval(current_question, documents)
            if grade == RetrievalGrade.RELEVANT:
                # Documents are good -- generate the final answer.
                return self._generate_answer(current_question, documents)
            if grade == RetrievalGrade.IRRELEVANT:
                # Bug fix: reformulate from the *current* question so each
                # round refines the previous rewrite; the original always
                # re-rewrote the untouched original question, so iterations
                # could not make progress.
                current_question = self._reformulate_query(current_question)
            else:  # AMBIGUOUS: partially relevant -- answer with a caveat
                return self._generate_answer_with_caveat(
                    current_question, documents
                )
        return "Unable to find relevant information after multiple retrieval attempts."

    def _grade_retrieval(self, question: str, documents: list) -> RetrievalGrade:
        """Evaluate relevance of retrieved documents.

        Unrecognized grader output defaults to AMBIGUOUS (safe middle ground).
        """
        doc_text = "\n".join(d["content"] for d in documents)
        response = self.client.messages.create(
            model=self.grader_model,
            max_tokens=50,
            system="Grade whether the provided documents are relevant to the question. Respond with ONLY: relevant, irrelevant, or ambiguous.",
            messages=[{
                "role": "user",
                "content": f"Question: {question}\n\nDocuments:\n{doc_text}"
            }]
        )
        grade_text = response.content[0].text.lower().strip()
        # Bug fix: test "irrelevant" BEFORE "relevant". The substring
        # "relevant" occurs inside "irrelevant", so the original order
        # misclassified every irrelevant grade as RELEVANT.
        if "irrelevant" in grade_text:
            return RetrievalGrade.IRRELEVANT
        if "relevant" in grade_text:
            return RetrievalGrade.RELEVANT
        return RetrievalGrade.AMBIGUOUS

    def _reformulate_query(self, original_question: str) -> str:
        """Create an improved search query for the given question."""
        response = self.client.messages.create(
            model=self.grader_model,
            max_tokens=100,
            system="Reformulate the user's question to improve search results. Return only the reformulated question.",
            messages=[{
                "role": "user",
                "content": f"Original question: {original_question}"
            }]
        )
        return response.content[0].text.strip()

    def _generate_answer(self, question: str, documents: list) -> str:
        """Generate an answer grounded in the (relevant) documents."""
        doc_text = "\n".join(d["content"] for d in documents)
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            system="Answer the question using the provided documents.",
            messages=[{
                "role": "user",
                "content": f"Question: {question}\n\nDocuments:\n{doc_text}"
            }]
        )
        return response.content[0].text

    def _generate_answer_with_caveat(
        self,
        question: str,
        documents: list
    ) -> str:
        """Generate an answer while explicitly noting gaps in the evidence."""
        doc_text = "\n".join(d["content"] for d in documents)
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            system="Answer the question using the provided documents. Note any gaps in available information.",
            messages=[{
                "role": "user",
                "content": f"Question: {question}\n\nDocuments:\n{doc_text}"
            }]
        )
        return response.content[0].text
Self-RAG
Self-RAG enables the model to decide when retrieval is necessary and evaluate its own answers.
from typing import Tuple
class SelfRAG:
    """RAG variant in which the model itself controls retrieval decisions.

    The model first judges whether outside context is needed, produces a
    draft answer (with or without retrieval), then critiques the draft and
    retries once with a broader retrieval if it is found lacking.
    """

    def __init__(self, retriever, model: str = "claude-3-5-sonnet-20241022"):
        # retriever is assumed to expose retrieve(query, k=...) -> list of
        # {"content": str, ...} dicts -- TODO confirm against caller.
        self.retriever = retriever
        self.model = model
        self.client = anthropic.Anthropic()

    def generate_with_self_reflection(self, question: str) -> str:
        """Generate an answer, letting the model decide about retrieval."""
        # Pass 1: answer, retrieving context only if the model asks for it.
        if self._should_retrieve(question):
            context = self.retriever.retrieve(question, k=3)
            draft = self._generate_with_documents(question, context)
        else:
            draft = self._generate_from_knowledge(question)
        # Pass 2: self-critique. A weak draft triggers one broader retrieval
        # (k=5 instead of 3) followed by a single regeneration.
        if self._is_answer_sufficient(question, draft):
            return draft
        wider_context = self.retriever.retrieve(question, k=5)
        return self._generate_with_documents(question, wider_context)

    def _should_retrieve(self, question: str) -> bool:
        """Ask the model whether retrieval is necessary for this question."""
        verdict = self.client.messages.create(
            model=self.model,
            max_tokens=50,
            system="Decide if you need to retrieve information to answer this question. Respond with ONLY: yes or no.",
            messages=[{"role": "user", "content": f"Question: {question}"}],
        )
        return "yes" in verdict.content[0].text.lower()

    def _generate_from_knowledge(self, question: str) -> str:
        """Answer directly from the model's parametric knowledge (no context)."""
        reply = self.client.messages.create(
            model=self.model,
            max_tokens=1024,
            messages=[{"role": "user", "content": question}],
        )
        return reply.content[0].text

    def _generate_with_documents(self, question: str, documents: list) -> str:
        """Answer the question grounded in the retrieved documents."""
        joined = "\n".join(doc["content"] for doc in documents)
        reply = self.client.messages.create(
            model=self.model,
            max_tokens=1024,
            messages=[{
                "role": "user",
                "content": f"Question: {question}\n\nContext:\n{joined}",
            }],
        )
        return reply.content[0].text

    def _is_answer_sufficient(self, question: str, answer: str) -> bool:
        """Self-critique: does the draft adequately address the question?"""
        verdict = self.client.messages.create(
            model=self.model,
            max_tokens=50,
            system="Evaluate if the answer adequately addresses the question. Respond with ONLY: yes or no.",
            messages=[{
                "role": "user",
                "content": f"Question: {question}\n\nAnswer: {answer}",
            }],
        )
        return "yes" in verdict.content[0].text.lower()
Adaptive Retrieval
Adapt retrieval strategy based on question type and context.
from typing import List, Dict
from dataclasses import dataclass
@dataclass
class QueryAnalysis:
    """Structured description of a query, used to pick a retrieval strategy."""

    # e.g. "factual", "reasoning", "comparative"
    question_type: str
    # "simple", "moderate", or "complex"
    complexity: str
    # e.g. ["text"], ["text", "image"], ["table"]
    required_modalities: List[str]
    # "short", "medium", or "long"
    estimated_answer_length: str
class AdaptiveRetriever:
    """Retrieve adaptively based on query characteristics."""

    def __init__(self, retriever):
        # retriever is assumed to expose retrieve(query, k=...) -> list of
        # {"content": str, "score": float, ...} dicts -- TODO confirm.
        self.retriever = retriever
        self.client = anthropic.Anthropic()

    def retrieve_adaptive(self, question: str) -> List[dict]:
        """Retrieve documents with a strategy adapted to the question.

        Pipeline: analyze the query -> select a strategy -> execute retrieval.
        """
        analysis = self._analyze_query(question)
        strategy = self._select_strategy(analysis)
        return self._execute_retrieval(question, strategy)

    def _analyze_query(self, question: str) -> QueryAnalysis:
        """Analyze question characteristics via the model.

        Bug fix: the original issued this API call and then ignored the
        response entirely, returning hard-coded heuristics. We now parse the
        model's JSON, falling back to the same heuristics (per field) when
        the output is missing or unparseable.
        """
        import json  # local import: keeps the file's top-level imports untouched

        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=200,
            system="Analyze the question and return JSON with question_type, complexity, required_modalities, and estimated_answer_length",
            messages=[{
                "role": "user",
                "content": f"Analyze: {question}"
            }]
        )
        # Heuristic defaults, identical to the original's fixed return value.
        fallback = QueryAnalysis(
            question_type="reasoning" if "why" in question else "factual",
            complexity="complex" if len(question) > 100 else "simple",
            required_modalities=["text"],
            estimated_answer_length="medium",
        )
        try:
            parsed = json.loads(response.content[0].text)
        except (ValueError, IndexError):
            return fallback
        if not isinstance(parsed, dict):
            return fallback
        return QueryAnalysis(
            question_type=parsed.get("question_type", fallback.question_type),
            complexity=parsed.get("complexity", fallback.complexity),
            required_modalities=parsed.get("required_modalities", fallback.required_modalities),
            estimated_answer_length=parsed.get("estimated_answer_length", fallback.estimated_answer_length),
        )

    def _select_strategy(self, analysis: QueryAnalysis) -> Dict:
        """Map query analysis to retrieval parameters.

        NOTE(review): the "comparative" branch is only reachable when the
        model's JSON analysis labels a question comparative; the heuristic
        fallback in _analyze_query never produces that label.
        """
        if analysis.question_type == "reasoning":
            # Reasoning questions need broader context plus graph expansion.
            return {
                "k": 10,
                "include_neighbors": True,
                "use_graph": True,
                "rerank": True
            }
        if analysis.question_type == "comparative":
            # Comparisons need many candidates covering each side.
            return {
                "k": 15,
                "include_similar_docs": True,
                "use_graph": False,
                "rerank": True
            }
        # Simple factual lookups: small k, no extra machinery.
        return {
            "k": 5,
            "include_neighbors": False,
            "use_graph": False,
            "rerank": False
        }

    def _execute_retrieval(
        self,
        question: str,
        strategy: Dict
    ) -> List[dict]:
        """Execute retrieval with the selected strategy."""
        documents = self.retriever.retrieve(question, k=strategy.get("k", 5))
        if strategy.get("rerank"):
            documents = self._rerank_documents(question, documents)
        return documents

    def _rerank_documents(
        self,
        question: str,
        documents: List[dict]
    ) -> List[dict]:
        """Rerank documents by stored retrieval score, highest first.

        In production, replace with a cross-encoder that scores each
        (question, document) pair instead of reusing the retriever's score.
        """
        return sorted(
            documents,
            key=lambda d: d.get("score", 0),
            reverse=True
        )
Tool-Augmented Agents
Enable agents to use tools for retrieval and computation.
from enum import Enum
from typing import Callable, Dict, Optional, Tuple
class ToolType(Enum):
    """Kinds of tools an agent may invoke."""

    RETRIEVE = "retrieve"    # fetch documents from the index
    CALCULATE = "calculate"  # perform a computation
    SEARCH = "search"        # run an external search
    SUMMARIZE = "summarize"  # condense retrieved content
class ToolAgent:
    """Agent that answers questions by invoking named tools in a loop."""

    def __init__(self, tools: Dict[str, Callable]):
        # tools maps tool name -> callable; docstrings are shown to the model.
        self.tools = tools
        self.client = anthropic.Anthropic()
        self.max_iterations = 10  # hard cap on model/tool round-trips

    async def run(self, question: str) -> str:
        """Run the agent loop until an answer (or the iteration cap) is reached.

        NOTE(review): this coroutine never awaits anything -- the Anthropic
        client call is synchronous and will block the event loop. Consider
        anthropic.AsyncAnthropic if true async behavior is needed.
        """
        messages = [{"role": "user", "content": question}]
        for _ in range(self.max_iterations):
            response = self.client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=1024,
                system=self._build_system_prompt(),
                messages=messages
            )
            text = response.content[0].text
            # Bug fix: look for a tool call BEFORE treating "end_turn" as the
            # final answer. Text-format tool calls end a normal turn with
            # stop_reason == "end_turn", so the original returned the raw
            # [TOOL_CALL] markup and never executed any tool.
            tool_call = self._parse_tool_call(text)
            if not tool_call:
                return text
            tool_name, tool_args = tool_call
            if tool_name not in self.tools:
                return f"Unknown tool: {tool_name}"
            result = self.tools[tool_name](**tool_args)
            # Feed the tool output back to the model and continue the loop.
            messages.append({"role": "assistant", "content": text})
            messages.append({
                "role": "user",
                "content": f"Tool result: {result}"
            })
        return "Max iterations reached"

    def _build_system_prompt(self) -> str:
        """Build a system prompt listing the available tools."""
        tool_descriptions = "\n".join(
            # Bug fix: tools without a docstring rendered as "None".
            f"- {tool_name}: {tool.__doc__ or 'No description provided.'}"
            for tool_name, tool in self.tools.items()
        )
        return f"""You are a helpful assistant. Use the following tools to answer questions:
{tool_descriptions}
When you need to use a tool, write [TOOL_CALL] tool_name(arg1, arg2) [/TOOL_CALL]"""

    def _parse_tool_call(self, text: str) -> Optional[Tuple[str, dict]]:
        """Parse a [TOOL_CALL] directive from the model's response.

        Returns (tool_name, kwargs) or None if no directive is present.
        Simplified: the full argument string is passed as a single "query"
        keyword, so tools are expected to accept query=<str>.
        """
        import re
        match = re.search(r"\[TOOL_CALL\]\s*(\w+)\((.*?)\)\s*\[/TOOL_CALL\]", text)
        if match:
            return (match.group(1), {"query": match.group(2)})
        return None
Key Takeaway
Agentic RAG gives models control over retrieval decisions. Corrective RAG handles poor retrievals, Self-RAG evaluates answers, and Adaptive Retrieval adjusts strategies per question type for optimal results.
Exercises
- Implement corrective RAG with query reformulation
- Build self-evaluating system that knows when to retrieve
- Create adaptive retriever with question-type strategies
- Implement tool-augmented agent with 5+ tools
- Add iterative refinement loop with convergence detection
- Measure improvement over fixed-pipeline RAG
- Deploy agentic system with streaming responses