Advanced

Agentic RAG and Self-Correcting Systems

Lesson 2 of 4 · Estimated Time: 55 min

Agentic RAG and Self-Correcting Systems

Agentic RAG empowers the model to iteratively refine retrieval and reasoning. Rather than fixed retrieval pipelines, agentic systems allow the model to decide when to retrieve, what to retrieve, and how to refine answers through multiple reasoning steps.

Corrective RAG

Corrective RAG evaluates retrieval quality and reformulates queries when documents are unhelpful.

from typing import Optional
from enum import Enum
import anthropic

class RetrievalGrade(Enum):
    """LLM-assigned relevance grade for a batch of retrieved documents."""

    RELEVANT = "relevant"  # documents support answering the question directly
    IRRELEVANT = "irrelevant"  # retrieval failed; query should be reformulated
    AMBIGUOUS = "ambiguous"  # partially useful; answer but flag the gaps

class CorrectiveRAG:
    """RAG system that corrects poor retrievals.

    Retrieves documents for a question, grades their relevance with an
    LLM judge, and then either answers directly, reformulates the query
    and retries, or answers with an explicit caveat about gaps.
    """

    def __init__(
        self,
        retriever,
        grader_model: str = "claude-3-5-sonnet-20241022",
        answer_model: str = "claude-3-5-sonnet-20241022",
    ):
        """
        Args:
            retriever: object exposing ``retrieve(query, k)`` returning a
                list of dicts with a ``"content"`` key (assumed from usage
                below -- confirm against the retriever implementation).
            grader_model: model used for grading and query reformulation.
            answer_model: model used for answer generation; defaults to
                the value that was previously hard-coded, so existing
                callers are unaffected.
        """
        self.retriever = retriever
        self.grader_model = grader_model
        self.answer_model = answer_model
        self.client = anthropic.Anthropic()

    def generate_with_correction(
        self,
        question: str,
        max_iterations: int = 3
    ) -> str:
        """Generate an answer, re-retrieving with reformulated queries on failure.

        Returns the generated answer, or a fixed failure message if no
        relevant documents are found within ``max_iterations`` attempts.
        """
        current_question = question

        for _ in range(max_iterations):
            documents = self.retriever.retrieve(current_question, k=5)
            grade = self._grade_retrieval(current_question, documents)

            if grade == RetrievalGrade.RELEVANT:
                # Documents are good; generate the final answer.
                return self._generate_answer(current_question, documents)

            if grade == RetrievalGrade.AMBIGUOUS:
                # Partially relevant: answer but note the gaps.
                return self._generate_answer_with_caveat(
                    current_question, documents
                )

            # IRRELEVANT: reformulate and retry.  The reformulation is
            # deliberately seeded from the ORIGINAL question rather than
            # the previous reformulation, to avoid compounding drift.
            current_question = self._reformulate_query(question)

        return "Unable to find relevant information after multiple retrieval attempts."

    def _grade_retrieval(self, question: str, documents: list) -> RetrievalGrade:
        """Evaluate relevance of retrieved documents via an LLM judge."""
        doc_text = "\n".join([d["content"] for d in documents])

        response = self.client.messages.create(
            model=self.grader_model,
            max_tokens=50,
            system="Grade whether the provided documents are relevant to the question. Respond with ONLY: relevant, irrelevant, or ambiguous.",
            messages=[{
                "role": "user",
                "content": f"Question: {question}\n\nDocuments:\n{doc_text}"
            }]
        )

        grade_text = response.content[0].text.lower().strip()
        # BUG FIX: check "irrelevant" BEFORE "relevant".  "relevant" is a
        # substring of "irrelevant", so the original ordering could never
        # return IRRELEVANT and the corrective loop never fired.
        if "irrelevant" in grade_text:
            return RetrievalGrade.IRRELEVANT
        elif "relevant" in grade_text:
            return RetrievalGrade.RELEVANT
        else:
            return RetrievalGrade.AMBIGUOUS

    def _reformulate_query(self, original_question: str) -> str:
        """Ask the grader model for an improved search query."""
        response = self.client.messages.create(
            model=self.grader_model,
            max_tokens=100,
            system="Reformulate the user's question to improve search results. Return only the reformulated question.",
            messages=[{
                "role": "user",
                "content": f"Original question: {original_question}"
            }]
        )

        return response.content[0].text.strip()

    def _generate_answer(self, question: str, documents: list) -> str:
        """Generate an answer from relevant documents."""
        doc_text = "\n".join([d["content"] for d in documents])

        response = self.client.messages.create(
            model=self.answer_model,
            max_tokens=1024,
            system="Answer the question using the provided documents.",
            messages=[{
                "role": "user",
                "content": f"Question: {question}\n\nDocuments:\n{doc_text}"
            }]
        )

        return response.content[0].text

    def _generate_answer_with_caveat(
        self,
        question: str,
        documents: list
    ) -> str:
        """Generate an answer while explicitly noting limited information."""
        doc_text = "\n".join([d["content"] for d in documents])

        response = self.client.messages.create(
            model=self.answer_model,
            max_tokens=1024,
            system="Answer the question using the provided documents. Note any gaps in available information.",
            messages=[{
                "role": "user",
                "content": f"Question: {question}\n\nDocuments:\n{doc_text}"
            }]
        )

        return response.content[0].text

Self-RAG

Self-RAG enables the model to decide when retrieval is necessary and evaluate its own answers.

from typing import Tuple

class SelfRAG:
    """RAG pipeline in which the model itself decides whether retrieval
    is needed, then reflects on the quality of its own draft answer."""

    def __init__(self, retriever, model: str = "claude-3-5-sonnet-20241022"):
        self.retriever = retriever
        self.model = model
        self.client = anthropic.Anthropic()

    def generate_with_self_reflection(self, question: str) -> str:
        """Produce an answer, retrieving only when the model asks for it.

        If the model judges its own draft insufficient, retry once with
        a wider retrieval (k=5 instead of k=3).
        """
        if self._should_retrieve(question):
            context_docs = self.retriever.retrieve(question, k=3)
            draft = self._generate_with_documents(question, context_docs)
        else:
            draft = self._generate_from_knowledge(question)

        # Reflection step: keep the draft only if it holds up.
        if self._is_answer_sufficient(question, draft):
            return draft

        # Draft fell short -- regenerate with a broader document set.
        wider_docs = self.retriever.retrieve(question, k=5)
        return self._generate_with_documents(question, wider_docs)

    def _should_retrieve(self, question: str) -> bool:
        """Ask the model whether external context is required."""
        verdict = self.client.messages.create(
            model=self.model,
            max_tokens=50,
            system="Decide if you need to retrieve information to answer this question. Respond with ONLY: yes or no.",
            messages=[{"role": "user", "content": f"Question: {question}"}],
        )
        return "yes" in verdict.content[0].text.lower()

    def _generate_from_knowledge(self, question: str) -> str:
        """Answer purely from the model's parametric knowledge."""
        reply = self.client.messages.create(
            model=self.model,
            max_tokens=1024,
            messages=[{"role": "user", "content": question}],
        )
        return reply.content[0].text

    def _generate_with_documents(self, question: str, documents: list) -> str:
        """Answer using retrieved documents as inline context."""
        context = "\n".join(d["content"] for d in documents)
        reply = self.client.messages.create(
            model=self.model,
            max_tokens=1024,
            messages=[{
                "role": "user",
                "content": f"Question: {question}\n\nContext:\n{context}",
            }],
        )
        return reply.content[0].text

    def _is_answer_sufficient(self, question: str, answer: str) -> bool:
        """Ask the model to judge whether the answer covers the question."""
        verdict = self.client.messages.create(
            model=self.model,
            max_tokens=50,
            system="Evaluate if the answer adequately addresses the question. Respond with ONLY: yes or no.",
            messages=[{
                "role": "user",
                "content": f"Question: {question}\n\nAnswer: {answer}",
            }],
        )
        return "yes" in verdict.content[0].text.lower()

Adaptive Retrieval

Adapt retrieval strategy based on question type and context.

from typing import List, Dict
from dataclasses import dataclass

@dataclass
class QueryAnalysis:
    """Structured description of a user question, used by AdaptiveRetriever
    to choose a retrieval strategy."""

    question_type: str  # factual, reasoning, comparative, etc.
    complexity: str  # simple, moderate, complex
    required_modalities: List[str]  # text, image, table, etc.
    estimated_answer_length: str  # short, medium, long

class AdaptiveRetriever:
    """Retrieve adaptively based on query characteristics.

    Analyzes the question, maps the analysis to a retrieval strategy
    (k, reranking, etc.), and executes retrieval with that strategy.
    """

    def __init__(self, retriever):
        # retriever: object exposing retrieve(query, k) -> list of dicts;
        # documents are assumed to carry an optional "score" key (used by
        # the reranker below) -- confirm against the retriever.
        self.retriever = retriever
        self.client = anthropic.Anthropic()

    def retrieve_adaptive(self, question: str) -> List[dict]:
        """Retrieve documents with a strategy adapted to the question."""
        analysis = self._analyze_query(question)
        strategy = self._select_strategy(analysis)
        return self._execute_retrieval(question, strategy)

    def _analyze_query(self, question: str) -> QueryAnalysis:
        """Analyze question characteristics.

        NOTE(review): the LLM call below is issued but its response is
        discarded -- the original code bound the response text to an
        unused local.  The returned analysis comes entirely from the
        cheap heuristics underneath.  TODO: parse the model's JSON into
        QueryAnalysis, or drop the call to avoid paying for an unused
        request.
        """
        self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=200,
            system="Analyze the question and return JSON with question_type, complexity, required_modalities, and estimated_answer_length",
            messages=[{
                "role": "user",
                "content": f"Analyze: {question}"
            }]
        )

        # Heuristic placeholder analysis (the response above is not used).
        return QueryAnalysis(
            question_type="reasoning" if "why" in question else "factual",
            complexity="complex" if len(question) > 100 else "simple",
            required_modalities=["text"],
            estimated_answer_length="medium"
        )

    def _select_strategy(self, analysis: QueryAnalysis) -> Dict:
        """Map a query analysis to retrieval knobs.

        NOTE(review): the "comparative" branch is unreachable while
        _analyze_query only ever emits "reasoning" or "factual".
        """
        if analysis.question_type == "reasoning":
            # Reasoning questions benefit from broader, graph-aware context.
            return {
                "k": 10,
                "include_neighbors": True,
                "use_graph": True,
                "rerank": True
            }
        elif analysis.question_type == "comparative":
            return {
                "k": 15,
                "include_similar_docs": True,
                "use_graph": False,
                "rerank": True
            }
        else:
            # Simple factual lookup: small k, no reranking.
            return {
                "k": 5,
                "include_neighbors": False,
                "use_graph": False,
                "rerank": False
            }

    def _execute_retrieval(
        self,
        question: str,
        strategy: Dict
    ) -> List[dict]:
        """Execute retrieval with the selected strategy.

        Only "k" and "rerank" are honored here; the other strategy flags
        (neighbors, graph) are not implemented by this class.
        """
        documents = self.retriever.retrieve(question, k=strategy.get("k", 5))

        if strategy.get("rerank"):
            documents = self._rerank_documents(question, documents)

        return documents

    def _rerank_documents(
        self,
        question: str,
        documents: List[dict]
    ) -> List[dict]:
        """Rerank documents by retriever score, descending.

        In production: use a cross-encoder.  ``question`` is unused by
        this score-only fallback; missing scores sort as 0.
        """
        return sorted(
            documents,
            key=lambda d: d.get("score", 0),
            reverse=True
        )

Tool-Augmented Agents

Enable agents to use tools for retrieval and computation.

from enum import Enum
from typing import Callable, Dict, Optional, Tuple

class ToolType(Enum):
    """Kinds of tools an agent may invoke.

    NOTE(review): not referenced by ToolAgent in this file -- tools are
    dispatched by name string.  Presumably reserved for future typed
    dispatch; confirm before removing.
    """

    RETRIEVE = "retrieve"  # fetch documents from a store
    CALCULATE = "calculate"  # perform a computation
    SEARCH = "search"  # external/web search
    SUMMARIZE = "summarize"  # condense text

class ToolAgent:
    """Agent that invokes tools via a simple [TOOL_CALL] text protocol."""

    def __init__(self, tools: Dict[str, Callable], max_iterations: int = 10):
        """
        Args:
            tools: mapping of tool name -> callable; each callable's
                docstring is shown to the model in the system prompt.
            max_iterations: cap on tool-use round trips.  New parameter;
                defaults to the previously hard-coded 10, so existing
                callers are unaffected.
        """
        self.tools = tools
        self.client = anthropic.Anthropic()
        self.max_iterations = max_iterations

    async def run(self, question: str) -> str:
        """Run the agent loop until an answer or the iteration cap.

        NOTE(review): declared ``async`` but contains no ``await`` -- the
        synchronous client call blocks the event loop.  Consider
        ``anthropic.AsyncAnthropic`` with ``await client.messages.create``.
        """
        messages = [{"role": "user", "content": question}]

        for _ in range(self.max_iterations):
            response = self.client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=1024,
                system=self._build_system_prompt(),
                messages=messages
            )

            # Model finished without requesting a tool.
            if response.stop_reason == "end_turn":
                return response.content[0].text

            tool_call = self._parse_tool_call(response.content[0].text)
            if not tool_call:
                # No parsable tool call: treat the model text as the answer.
                return response.content[0].text

            tool_name, tool_args = tool_call
            if tool_name not in self.tools:
                return f"Unknown tool: {tool_name}"

            # Execute the tool and feed its result back to the model.
            result = self.tools[tool_name](**tool_args)
            messages.append({"role": "assistant", "content": response.content[0].text})
            messages.append({
                "role": "user",
                "content": f"Tool result: {result}"
            })

        return "Max iterations reached"

    def _build_system_prompt(self) -> str:
        """Build the system prompt listing the available tools."""
        tool_descriptions = "\n".join(
            # Guard against tools without docstrings: __doc__ is None for
            # undocumented callables, and the original f-string rendered
            # the literal text "None" into the prompt.
            f"- {name}: {(fn.__doc__ or 'No description provided.').strip()}"
            for name, fn in self.tools.items()
        )
        # Built without a triple-quoted literal so the prompt no longer
        # carries the source file's accidental leading indentation.
        return (
            "You are a helpful assistant. Use the following tools to answer questions:\n"
            f"{tool_descriptions}\n"
            "When you need to use a tool, write [TOOL_CALL] tool_name(arg1, arg2) [/TOOL_CALL]"
        )

    def _parse_tool_call(self, text: str) -> Optional[Tuple[str, dict]]:
        """Parse a [TOOL_CALL] marker from model output.

        Returns (tool_name, kwargs) or None if no call is present.
        Simplified: the full argument string is passed as a single
        ``query`` kwarg, so every tool must accept ``query=...``.
        """
        import re
        match = re.search(r"\[TOOL_CALL\]\s*(\w+)\((.*?)\)\s*\[/TOOL_CALL\]", text)
        if match:
            return (match.group(1), {"query": match.group(2)})
        return None

Key Takeaway

Agentic RAG gives models control over retrieval decisions. Corrective RAG handles poor retrievals, Self-RAG evaluates answers, and Adaptive Retrieval adjusts strategies per question type for optimal results.

Exercises

  1. Implement corrective RAG with query reformulation
  2. Build self-evaluating system that knows when to retrieve
  3. Create adaptive retriever with question-type strategies
  4. Implement tool-augmented agent with 5+ tools
  5. Add iterative refinement loop with convergence detection
  6. Measure improvement over fixed-pipeline RAG
  7. Deploy agentic system with streaming responses