Building Knowledge-Intensive Applications
Building Knowledge-Intensive Applications
Knowledge-intensive applications answer complex questions requiring synthesis across multiple documents and data sources. Examples include customer support systems, legal research platforms, and technical documentation assistants. Success requires sophisticated retrieval, ranking, and generation strategies.
Document QA at Scale
Large-scale document QA systems must handle millions of documents while maintaining search latency under 500ms.
from typing import List, Optional, Tuple
from dataclasses import dataclass
import time
@dataclass
class SearchResult:
document_id: str
title: str
snippet: str
relevance_score: float
metadata: dict
class ScalableDocumentQA:
"""Question answering over large document collections."""
def __init__(self, max_docs: int = 1_000_000):
self.max_docs = max_docs
self.doc_cache = {}
self.index = None
self.embedder = None
def answer_question(
self,
question: str,
top_k: int = 5,
timeout_ms: int = 500
) -> Tuple[str, List[SearchResult]]:
"""Answer question with latency constraints."""
start_time = time.time()
# Step 1: Retrieve candidate documents with timeout
time_remaining = timeout_ms / 1000
results = self._retrieve_with_timeout(
question,
k=top_k * 2,
timeout=time_remaining
)
# Step 2: Rerank if time permits
time_used = (time.time() - start_time) * 1000
if time_used < timeout_ms * 0.7:
results = self._rerank_results(question, results)
# Step 3: Generate answer from top results
answer = self._generate_answer(question, results[:top_k])
return answer, results[:top_k]
def _retrieve_with_timeout(
self,
query: str,
k: int,
timeout: float
) -> List[SearchResult]:
"""Retrieve documents with timeout."""
# In production: use BM25 for fast initial retrieval
results = []
# Simulate retrieval with latency check
for i in range(min(k, 100)):
if time.time() > timeout:
break
# result = SearchResult(...)
# results.append(result)
pass
return results
def _rerank_results(
self,
query: str,
results: List[SearchResult]
) -> List[SearchResult]:
"""Rerank results using cross-encoder."""
# In production: use cross-encoder model
return sorted(
results,
key=lambda r: r.relevance_score,
reverse=True
)
def _generate_answer(
self,
question: str,
results: List[SearchResult]
) -> str:
"""Generate answer from top documents."""
context = "\n".join([r.snippet for r in results])
# In production: call LLM with context
return f"Answer based on {len(results)} documents"
Semantic Search Optimization
Optimize semantic search for large collections through hierarchical retrieval.
from typing import Dict, List
import numpy as np
class HierarchicalSemanticSearch:
"""Multi-level semantic search for scale."""
def __init__(
self,
embedding_dim: int = 384,
partition_size: int = 10000
):
self.embedding_dim = embedding_dim
self.partition_size = partition_size
self.partitions: Dict[int, np.ndarray] = {}
self.partition_metadata: Dict[int, list] = {}
def index_documents(self, embeddings: List[np.ndarray], metadata: List[dict]):
"""Index documents in partitions."""
for i in range(0, len(embeddings), self.partition_size):
partition_id = i // self.partition_size
partition_emb = embeddings[i:i + self.partition_size]
partition_meta = metadata[i:i + self.partition_size]
self.partitions[partition_id] = np.array(partition_emb)
self.partition_metadata[partition_id] = partition_meta
def search(self, query_embedding: np.ndarray, k: int = 5) -> List[dict]:
"""Search across partitions."""
# Step 1: Find best partitions
partition_scores = {}
for partition_id, partition_emb in self.partitions.items():
# Compute centroid distance
centroid = partition_emb.mean(axis=0)
distance = np.linalg.norm(query_embedding - centroid)
partition_scores[partition_id] = distance
# Step 2: Search best partitions
best_partitions = sorted(
partition_scores.items(),
key=lambda x: x[1]
)[:3] # Search top 3 partitions
all_results = []
for partition_id, _ in best_partitions:
partition_emb = self.partitions[partition_id]
distances = np.linalg.norm(
partition_emb - query_embedding,
axis=1
)
top_indices = np.argsort(distances)[:k]
for idx in top_indices:
all_results.append({
"doc_id": partition_id,
"index": idx,
"score": float(1 - distances[idx]),
"metadata": self.partition_metadata[partition_id][idx]
})
# Step 3: Global top-k
return sorted(
all_results,
key=lambda x: x["score"],
reverse=True
)[:k]
Multi-Document Summarization
Synthesize information across many documents for comprehensive answers.
import anthropic
class MultiDocumentSynthesis:
"""Synthesize answers from multiple documents."""
def __init__(self):
self.client = anthropic.Anthropic()
def synthesize_answer(
self,
question: str,
documents: List[str],
max_context_tokens: int = 8000
) -> str:
"""Synthesize answer from multiple documents."""
# Step 1: Compress each document
compressed_docs = []
token_budget = max_context_tokens
for doc in documents:
if token_budget < 500:
break
compressed = self._compress_document(
doc,
question,
min_tokens=200,
max_tokens=min(1000, token_budget)
)
compressed_docs.append(compressed)
token_budget -= len(compressed.split())
# Step 2: Synthesize from compressed docs
context = "\n---\n".join(compressed_docs)
return self._synthesize_with_context(question, context)
def _compress_document(
self,
document: str,
question: str,
min_tokens: int = 100,
max_tokens: int = 1000
) -> str:
"""Extract relevant information from document."""
response = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=max_tokens,
system=f"Extract key information from this document relevant to: {question}. Be concise but comprehensive.",
messages=[{
"role": "user",
"content": f"Document:\n{document}"
}]
)
return response.content[0].text
def _synthesize_with_context(self, question: str, context: str) -> str:
"""Synthesize final answer."""
response = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
system="Synthesize a comprehensive answer based on the provided information.",
messages=[{
"role": "user",
"content": f"Question: {question}\n\nInformation:\n{context}"
}]
)
return response.content[0].text
Query Expansion and Variation
Improve recall by searching multiple query formulations.
class QueryExpansion:
"""Generate query variations for better recall."""
def __init__(self):
self.client = anthropic.Anthropic()
def expand_query(
self,
original_query: str,
num_variations: int = 5
) -> List[str]:
"""Generate query variations."""
response = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=500,
system=f"Generate {num_variations} different ways to ask the same question. Return only the questions, one per line.",
messages=[{
"role": "user",
"content": f"Original query: {original_query}"
}]
)
variations = response.content[0].text.split("\n")
return [original_query] + [v.strip() for v in variations if v.strip()][:num_variations]
def search_with_expansion(
self,
retriever,
query: str,
k: int = 5
) -> List[dict]:
"""Retrieve using expanded queries."""
expanded = self.expand_query(query, num_variations=3)
# Retrieve for each variation
all_results = {}
for variant in expanded:
results = retriever.retrieve(variant, k=k)
for result in results:
doc_id = result.get("id")
if doc_id not in all_results:
all_results[doc_id] = result
else:
# Aggregate scores
all_results[doc_id]["score"] = (
all_results[doc_id]["score"] +
result.get("score", 0)
) / 2
# Return top-k by aggregated score
return sorted(
all_results.values(),
key=lambda x: x.get("score", 0),
reverse=True
)[:k]
Answer Verification and Fact-Checking
Verify generated answers against source documents.
class AnswerVerification:
"""Verify and fact-check generated answers."""
def __init__(self):
self.client = anthropic.Anthropic()
def verify_answer(
self,
question: str,
answer: str,
source_documents: List[str],
confidence_threshold: float = 0.7
) -> dict:
"""Verify answer facts against sources."""
# Extract claims from answer
claims = self._extract_claims(answer)
# Verify each claim
verification_results = {}
for claim in claims:
verified = self._verify_claim(claim, source_documents)
verification_results[claim] = verified
# Compute overall confidence
verified_count = sum(
1 for v in verification_results.values() if v["verified"]
)
confidence = verified_count / len(claims) if claims else 0
return {
"answer": answer,
"confidence": confidence,
"meets_threshold": confidence >= confidence_threshold,
"claim_details": verification_results
}
def _extract_claims(self, answer: str) -> List[str]:
"""Extract factual claims from answer."""
response = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=500,
system="Extract factual claims (not opinions). Return one claim per line.",
messages=[{
"role": "user",
"content": f"Answer:\n{answer}"
}]
)
claims = response.content[0].text.split("\n")
return [c.strip() for c in claims if c.strip()]
def _verify_claim(self, claim: str, documents: List[str]) -> dict:
"""Check if claim is supported by documents."""
doc_context = "\n---\n".join(documents[:5]) # Limit context
response = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=100,
system="Determine if the claim is supported by the provided documents. Respond with: SUPPORTED, CONTRADICTED, or NOT_FOUND.",
messages=[{
"role": "user",
"content": f"Claim: {claim}\n\nDocuments:\n{doc_context}"
}]
)
result_text = response.content[0].text.upper()
return {
"claim": claim,
"verified": "SUPPORTED" in result_text,
"contradicted": "CONTRADICTED" in result_text,
"not_found": "NOT_FOUND" in result_text
}
Key Takeaway
Knowledge-intensive applications require hierarchical retrieval for scale, multi-document synthesis, query expansion for recall, and answer verification for reliability. Each technique addresses specific production challenges.
Exercises
- Build document QA system handling 100K+ documents
- Implement hierarchical semantic search with partitioning
- Create multi-document synthesizer for comprehensive answers
- Build query expansion system with semantic variations
- Implement answer verification with fact-checking
- Optimize end-to-end latency across all components
- Measure and improve recall across document collections