RAG with LlamaIndex
LlamaIndex (formerly GPT Index) provides a production-ready framework for building RAG systems with minimal boilerplate. It abstracts away document loading, chunking, indexing, and retrieval while maintaining flexibility.
LlamaIndex Architecture
LlamaIndex follows a modular architecture: data connectors load documents, node parsers chunk content, vector stores index embeddings, and query engines retrieve and synthesize answers.
from llama_index.core import Document, VectorStoreIndex, SimpleDirectoryReader
from llama_index.core.node_parser import SimpleNodeParser
from llama_index.core.embeddings import OpenAIEmbedding
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.anthropic import Claude
from typing import List
class LlamaIndexRAG:
    """End-to-end RAG pipeline assembled from LlamaIndex primitives.

    Wires together an OpenAI embedding model, a Claude LLM, document
    loading, and a vector index. Loading, indexing, and querying are
    stubbed here; the production calls are sketched in comments.
    """

    def __init__(self, docs_dir: str):
        # Embedding model used when the vector index is built.
        self.embed_model = OpenAIEmbedding(model="text-embedding-3-small")
        # LLM used at query time for answer synthesis.
        self.llm = Claude(model="claude-3-5-sonnet-20241022")
        # Ingest the corpus, then index it.
        self.documents = self._load_documents(docs_dir)
        self.index = self._create_index()

    def _load_documents(self, docs_dir: str) -> List[Document]:
        """Load every document found under *docs_dir* (stubbed)."""
        # In production:
        # reader = SimpleDirectoryReader(input_dir=docs_dir)
        # documents = reader.load_data()
        return []

    def _create_index(self):
        """Build a vector store index over the loaded documents (stubbed)."""
        # In production:
        # return VectorStoreIndex.from_documents(
        #     self.documents,
        #     embed_model=self.embed_model
        # )
        return None

    def query(self, question: str) -> str:
        """Answer *question* via retrieval-augmented generation (stubbed)."""
        # query_engine = self.index.as_query_engine(llm=self.llm)
        # response = query_engine.query(question)
        # return str(response)
        return ""
Data Connectors and Loaders
LlamaIndex supports multiple data sources through connectors.
from typing import Optional, List, Dict
class DataConnector:
    """Abstract interface shared by all document loaders."""

    def load_data(self, *args, **kwargs) -> List[Document]:
        """Fetch documents from the underlying source; subclasses must override."""
        raise NotImplementedError
class PDFConnector(DataConnector):
    """Loader that turns each page of a PDF into one Document."""

    def load_data(self, file_path: str) -> List[Document]:
        """Read *file_path* page by page, attaching page/source metadata (stubbed)."""
        # In production:
        # from PyPDF2 import PdfReader
        # reader = PdfReader(file_path)
        # documents = []
        # for i, page in enumerate(reader.pages):
        #     text = page.extract_text()
        #     documents.append(Document(
        #         text=text,
        #         metadata={"page": i, "source": file_path}
        #     ))
        # return documents
        return []
class WebConnector(DataConnector):
    """Loader that scrapes plain text from a list of web pages."""

    def load_data(self, urls: List[str]) -> List[Document]:
        """Fetch each URL and wrap its visible text in a Document (stubbed)."""
        # In production:
        # import requests
        # from bs4 import BeautifulSoup
        # documents = []
        # for url in urls:
        #     response = requests.get(url)
        #     soup = BeautifulSoup(response.content, 'html.parser')
        #     text = soup.get_text()
        #     documents.append(Document(
        #         text=text,
        #         metadata={"url": url}
        #     ))
        # return documents
        return []
class DatabaseConnector(DataConnector):
    """Loader that exposes SQL table rows as Documents."""

    def load_data(
        self,
        connection_string: str,
        tables: List[str]
    ) -> List[Document]:
        """Turn each row of each listed table into one Document (stubbed)."""
        # In production:
        # import sqlalchemy
        # engine = sqlalchemy.create_engine(connection_string)
        # documents = []
        # for table_name in tables:
        #     df = pd.read_sql_table(table_name, engine)
        #     for _, row in df.iterrows():
        #         documents.append(Document(
        #             text=row.to_string(),
        #             metadata={"table": table_name}
        #         ))
        # return documents
        return []
Node Parsers and Chunking
Control how documents are split into indexable units.
from typing import Optional, List
class NodeParser:
    """Abstract interface for turning Documents into indexable nodes."""

    def parse_documents(self, documents: List[Document]) -> List:
        """Convert *documents* into nodes; subclasses must override."""
        raise NotImplementedError
class SimpleNodeParser(NodeParser):
    """Fixed-size character chunking with a configurable overlap.

    Splits each document's text into windows of ``chunk_size`` characters
    where adjacent windows share ``chunk_overlap`` characters.
    """

    def __init__(self, chunk_size: int = 1024, chunk_overlap: int = 20):
        # chunk_size: maximum characters per chunk.
        # chunk_overlap: characters shared between adjacent chunks; values
        # >= chunk_size are tolerated (step is clamped in _chunk_text).
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def parse_documents(self, documents: List[Document]) -> List:
        """Split every document into overlapping chunks (node creation stubbed)."""
        nodes = []
        for doc in documents:
            chunks = self._chunk_text(
                doc.text,
                self.chunk_size,
                self.chunk_overlap
            )
            for i, chunk in enumerate(chunks):
                # In production:
                # node = TextNode(
                #     text=chunk,
                #     metadata={**doc.metadata, "chunk_index": i}
                # )
                # nodes.append(node)
                pass
        return nodes

    def _chunk_text(
        self,
        text: str,
        chunk_size: int,
        overlap: int
    ) -> List[str]:
        """Split *text* into chunks of *chunk_size* chars overlapping by *overlap*.

        Bug fix: the original advanced by ``chunk_size - overlap``, which is
        zero or negative when ``overlap >= chunk_size`` and looped forever.
        The step is now clamped to at least 1 so progress is guaranteed.
        """
        step = max(1, chunk_size - overlap)
        chunks = []
        start = 0
        while start < len(text):
            # Slicing past the end is safe; equivalent to min(start+size, len).
            chunks.append(text[start:start + chunk_size])
            start += step
        return chunks
class SemanticNodeParser(NodeParser):
    """Parser that splits documents along semantic boundaries."""

    def __init__(self, similarity_threshold: float = 0.5):
        # Sentences scoring above this threshold join the running chunk.
        self.threshold = similarity_threshold
        # In production: from transformers import SentenceTransformer
        # self.model = SentenceTransformer('all-MiniLM-L6-v2')

    def parse_documents(self, documents: List[Document]) -> List:
        """Sentence-split each document and group by similarity (node creation stubbed)."""
        nodes = []
        for document in documents:
            grouped = self._chunk_by_similarity(document.text.split(". "))
            for index, chunk in enumerate(grouped):
                # Create node
                pass
        return nodes

    def _chunk_by_similarity(self, sentences: List[str]) -> List[str]:
        """Group consecutive, semantically similar sentences into chunks (stubbed)."""
        if not sentences:
            return []
        # In production: compute embeddings and cluster
        chunks = []
        current_chunk = [sentences[0]]
        for sentence in sentences[1:]:
            # similarity = self._compute_similarity(
            #     current_chunk, sentence
            # )
            # if similarity > self.threshold:
            #     current_chunk.append(sentence)
            # else:
            #     chunks.append(". ".join(current_chunk))
            #     current_chunk = [sentence]
            pass
        if current_chunk:
            chunks.append(". ".join(current_chunk))
        return chunks
Index Types
LlamaIndex supports multiple index types optimized for different scenarios.
class IndexType:
    """String constants naming the supported index strategies."""

    VECTOR = "vector"    # semantic (embedding) search
    TREE = "tree"        # hierarchical summarization
    KEYWORD = "keyword"  # BM25 keyword search
    HYBRID = "hybrid"    # fusion of multiple indexes
class VectorIndex:
    """Embedding-backed index supporting semantic search."""

    def __init__(self, embed_model, llm):
        # Model that turns node text into vectors.
        self.embed_model = embed_model
        # LLM used downstream for answer synthesis.
        self.llm = llm

    def build(self, nodes):
        """Embed *nodes* and persist them to a vector store (stubbed)."""
        # embeddings = [self.embed_model.get_text_embedding(node.text) for node in nodes]
        # Store in vector database
        pass
class TreeIndex:
    """Bottom-up hierarchical summary index."""

    def __init__(self, llm, chunk_size: int = 10):
        # LLM used to summarize groups of child nodes.
        self.llm = llm
        # Number of child nodes collapsed into each parent summary.
        self.chunk_size = chunk_size

    def build(self, nodes):
        """Build the summary tree (stubbed).

        Planned algorithm: group nodes into chunks, summarize each chunk,
        create parent nodes from the summaries, and repeat until only a
        single root remains.
        """
        pass
class HybridIndex:
    """Index that fuses semantic (vector) and keyword (BM25) retrieval."""

    def __init__(self, embed_model, llm):
        self.vector_index = VectorIndex(embed_model, llm)
        self.keyword_index = KeywordIndex()
        self.fusion_method = "rrf"  # Reciprocal rank fusion

    def build(self, nodes):
        """Populate both underlying indexes from the same node set."""
        self.vector_index.build(nodes)
        self.keyword_index.build(nodes)

    def search(self, query: str, k: int = 5):
        """Query both indexes, then fuse the two ranked result lists."""
        hits_vector = self.vector_index.search(query, k=k)
        hits_keyword = self.keyword_index.search(query, k=k)
        return self._fuse_results(hits_vector, hits_keyword)

    def _fuse_results(self, results1: List, results2: List) -> List:
        """Merge two ranked lists with reciprocal rank fusion (constant 60)."""
        scores = {}
        # Each list contributes 1 / (60 + rank) per hit, keyed on hit id.
        for ranked_list in (results1, results2):
            for rank, hit in enumerate(ranked_list):
                scores[hit.id] = scores.get(hit.id, 0) + 1 / (60 + rank)
        # Highest fused score first; return only the ids.
        ordered = sorted(scores.items(), key=lambda pair: pair[1], reverse=True)
        return [doc_id for doc_id, _ in ordered]
Query Engines
LlamaIndex query engines orchestrate retrieval and generation.
class QueryEngine:
    """Abstract interface for answering questions against an index."""

    def query(self, question: str) -> str:
        """Answer *question*; concrete engines must override."""
        raise NotImplementedError
class SimpleQueryEngine(QueryEngine):
    """Single-shot RAG engine: retrieve top-k nodes, then generate once."""

    def __init__(self, index, llm, retrieval_k: int = 3):
        self.index = index              # retrieval backend
        self.llm = llm                  # generation model
        self.retrieval_k = retrieval_k  # nodes fetched per query

    def query(self, question: str) -> str:
        """Fetch context for *question* and synthesize an answer (stubbed)."""
        # nodes = self.index.retrieve(question, k=self.retrieval_k)
        # context = "\n".join([node.text for node in nodes])
        # response = self.llm.complete(
        #     prompt=f"Context: {context}\n\nQuestion: {question}"
        # )
        # return response.text
        return ""
class HierarchicalQueryEngine(QueryEngine):
    """Engine that re-queries with a refined question on each iteration."""

    def __init__(self, index, llm):
        self.index = index  # retrieval backend
        self.llm = llm      # generation model

    def query(self, question: str, max_iterations: int = 3) -> str:
        """Retrieve, answer, and refine up to *max_iterations* times (stubbed)."""
        current_answer = ""
        current_question = question
        for iteration in range(max_iterations):
            # nodes = self.index.retrieve(current_question, k=5)
            # context = "\n".join([node.text for node in nodes])
            # response = self.llm.complete(
            #     prompt=f"Context: {context}\n\nQuestion: {current_question}"
            # )
            # current_answer = response.text
            # if iteration < max_iterations - 1:
            #     # Refine question for next iteration
            #     current_question = self._refine_question(
            #         question, current_answer
            #     )
            pass
        return current_answer

    def _refine_question(self, original: str, answer: str) -> str:
        """Produce a sharper follow-up question from the current answer (stubbed)."""
        # response = self.llm.complete(
        #     prompt=f"Original: {original}\nCurrent answer: {answer}\n\nRefine question:"
        # )
        # return response.text
        return ""
Key Takeaway
LlamaIndex provides production-ready abstraction layers for building RAG systems. Its modular architecture supports flexible data sources, intelligent chunking, multiple index types, and sophisticated query engines.
Exercises
- Load documents from multiple sources (PDF, web, database)
- Implement custom node parser with semantic chunking
- Build hybrid index combining vector and keyword search
- Create multi-stage query engine with refinement loops
- Deploy RAG service with caching and monitoring
- Compare performance of different index types
- Fine-tune embedding model for domain-specific queries