Advanced

Building Knowledge-Intensive Applications

Lesson 4 of 4 · Estimated time: 50 min


Knowledge-intensive applications answer complex questions requiring synthesis across multiple documents and data sources. Examples include customer support systems, legal research platforms, and technical documentation assistants. Success requires sophisticated retrieval, ranking, and generation strategies.

Document QA at Scale

Large-scale document QA systems must handle millions of documents while maintaining search latency under 500ms.

from typing import List, Tuple
from dataclasses import dataclass
import time

@dataclass
class SearchResult:
    document_id: str
    title: str
    snippet: str
    relevance_score: float
    metadata: dict

class ScalableDocumentQA:
    """Question answering over large document collections."""

    def __init__(self, max_docs: int = 1_000_000):
        self.max_docs = max_docs
        self.doc_cache = {}
        self.index = None
        self.embedder = None

    def answer_question(
        self,
        question: str,
        top_k: int = 5,
        timeout_ms: int = 500
    ) -> Tuple[str, List[SearchResult]]:
        """Answer question with latency constraints."""
        start_time = time.time()

        # Step 1: Retrieve candidate documents with timeout
        time_remaining = timeout_ms / 1000
        results = self._retrieve_with_timeout(
            question,
            k=top_k * 2,
            timeout=time_remaining
        )

        # Step 2: Rerank if time permits
        time_used = (time.time() - start_time) * 1000
        if time_used < timeout_ms * 0.7:
            results = self._rerank_results(question, results)

        # Step 3: Generate answer from top results
        answer = self._generate_answer(question, results[:top_k])

        return answer, results[:top_k]

    def _retrieve_with_timeout(
        self,
        query: str,
        k: int,
        timeout: float
    ) -> List[SearchResult]:
        """Retrieve documents with timeout."""
        # In production: use BM25 for fast initial retrieval
        results = []
        # Simulate retrieval with latency check
        for i in range(min(k, 100)):
            if time.time() > timeout:
                break
            # result = SearchResult(...)
            # results.append(result)
            pass
        return results

    def _rerank_results(
        self,
        query: str,
        results: List[SearchResult]
    ) -> List[SearchResult]:
        """Rerank results using cross-encoder."""
        # In production: use cross-encoder model
        return sorted(
            results,
            key=lambda r: r.relevance_score,
            reverse=True
        )

    def _generate_answer(
        self,
        question: str,
        results: List[SearchResult]
    ) -> str:
        """Generate answer from top documents."""
        context = "\n".join([r.snippet for r in results])
        # In production: call LLM with context
        return f"Answer based on {len(results)} documents"

Semantic Search Optimization

Optimize semantic search for large collections through hierarchical retrieval.

from typing import Dict, List
import numpy as np

class HierarchicalSemanticSearch:
    """Multi-level semantic search for scale."""

    def __init__(
        self,
        embedding_dim: int = 384,
        partition_size: int = 10000
    ):
        self.embedding_dim = embedding_dim
        self.partition_size = partition_size
        self.partitions: Dict[int, np.ndarray] = {}
        self.partition_metadata: Dict[int, list] = {}

    def index_documents(self, embeddings: List[np.ndarray], metadata: List[dict]):
        """Index documents in partitions."""
        for i in range(0, len(embeddings), self.partition_size):
            partition_id = i // self.partition_size
            partition_emb = embeddings[i:i + self.partition_size]
            partition_meta = metadata[i:i + self.partition_size]

            self.partitions[partition_id] = np.array(partition_emb)
            self.partition_metadata[partition_id] = partition_meta

    def search(self, query_embedding: np.ndarray, k: int = 5) -> List[dict]:
        """Search across partitions."""
        # Step 1: Find best partitions
        partition_scores = {}
        for partition_id, partition_emb in self.partitions.items():
            # Compute centroid distance
            centroid = partition_emb.mean(axis=0)
            distance = np.linalg.norm(query_embedding - centroid)
            partition_scores[partition_id] = distance

        # Step 2: Search best partitions
        best_partitions = sorted(
            partition_scores.items(),
            key=lambda x: x[1]
        )[:3]  # Search top 3 partitions

        all_results = []
        for partition_id, _ in best_partitions:
            partition_emb = self.partitions[partition_id]
            distances = np.linalg.norm(
                partition_emb - query_embedding,
                axis=1
            )
            top_indices = np.argsort(distances)[:k]

            for idx in top_indices:
                all_results.append({
                    # Global document index: partitions are filled in insertion order
                    "doc_id": partition_id * self.partition_size + int(idx),
                    "index": int(idx),
                    # Convert L2 distance to a positive similarity score
                    "score": float(1.0 / (1.0 + distances[idx])),
                    "metadata": self.partition_metadata[partition_id][idx]
                })

        # Step 3: Global top-k
        return sorted(
            all_results,
            key=lambda x: x["score"],
            reverse=True
        )[:k]
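
A minimal usage sketch with synthetic embeddings (random vectors stand in for real document embeddings):

import numpy as np

rng = np.random.default_rng(0)

# 3,000 fake document embeddings -> 3 partitions of 1,000
embeddings = [rng.standard_normal(384).astype("float32") for _ in range(3_000)]
metadata = [{"title": f"doc-{i}"} for i in range(3_000)]

search_index = HierarchicalSemanticSearch(embedding_dim=384, partition_size=1_000)
search_index.index_documents(embeddings, metadata)

query_embedding = rng.standard_normal(384).astype("float32")
for hit in search_index.search(query_embedding, k=3):
    print(hit["doc_id"], round(hit["score"], 4), hit["metadata"]["title"])

This partition-then-probe pattern is essentially what an IVF index in a vector library such as FAISS provides, with trained centroids and optimized distance kernels; at millions of documents, prefer such a library over hand-rolled partitioning.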

Multi-Document Summarization

Synthesize information across many documents for comprehensive answers.

from typing import List

import anthropic

class MultiDocumentSynthesis:
    """Synthesize answers from multiple documents."""

    def __init__(self):
        self.client = anthropic.Anthropic()

    def synthesize_answer(
        self,
        question: str,
        documents: List[str],
        max_context_tokens: int = 8000
    ) -> str:
        """Synthesize answer from multiple documents."""
        # Step 1: Compress each document
        compressed_docs = []
        token_budget = max_context_tokens

        for doc in documents:
            if token_budget < 500:
                break

            compressed = self._compress_document(
                doc,
                question,
                max_tokens=min(1000, token_budget)
            )
            compressed_docs.append(compressed)
            # Approximate tokens with a word count to stay under budget
            token_budget -= len(compressed.split())

        # Step 2: Synthesize from compressed docs
        context = "\n---\n".join(compressed_docs)
        return self._synthesize_with_context(question, context)

    def _compress_document(
        self,
        document: str,
        question: str,
        max_tokens: int = 1000
    ) -> str:
        """Extract relevant information from document."""
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=max_tokens,
            system=f"Extract key information from this document relevant to: {question}. Be concise but comprehensive.",
            messages=[{
                "role": "user",
                "content": f"Document:\n{document}"
            }]
        )

        return response.content[0].text

    def _synthesize_with_context(self, question: str, context: str) -> str:
        """Synthesize final answer."""
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            system="Synthesize a comprehensive answer based on the provided information.",
            messages=[{
                "role": "user",
                "content": f"Question: {question}\n\nInformation:\n{context}"
            }]
        )

        return response.content[0].text
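
A usage sketch with placeholder documents, assuming ANTHROPIC_API_KEY is set; it makes one API call per document plus one for the final synthesis, so cost grows with the number of retrieved documents:

synthesizer = MultiDocumentSynthesis()

documents = [
    "Release notes: version 2.4 added streaming responses and a retry policy.",
    "Support ticket digest: timeouts reported when batch size exceeds 100.",
    "Architecture note: the ingestion service writes to a queue consumed by workers.",
]

answer = synthesizer.synthesize_answer(
    question="What changed in version 2.4 and how does it affect batch ingestion?",
    documents=documents,
    max_context_tokens=4000
)
print(answer)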

Query Expansion and Variation

Improve recall by searching multiple query formulations.

from typing import List

import anthropic

class QueryExpansion:
    """Generate query variations for better recall."""

    def __init__(self):
        self.client = anthropic.Anthropic()

    def expand_query(
        self,
        original_query: str,
        num_variations: int = 5
    ) -> List[str]:
        """Generate query variations."""
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=500,
            system=f"Generate {num_variations} different ways to ask the same question. Return only the questions, one per line.",
            messages=[{
                "role": "user",
                "content": f"Original query: {original_query}"
            }]
        )

        variations = response.content[0].text.split("\n")
        return [original_query] + [v.strip() for v in variations if v.strip()][:num_variations]

    def search_with_expansion(
        self,
        retriever,
        query: str,
        k: int = 5
    ) -> List[dict]:
        """Retrieve using expanded queries."""
        expanded = self.expand_query(query, num_variations=3)

        # Retrieve for each variation
        all_results = {}
        for variant in expanded:
            results = retriever.retrieve(variant, k=k)
            for result in results:
                doc_id = result.get("id")
                if doc_id not in all_results:
                    all_results[doc_id] = result
                else:
                    # Aggregate scores
                    all_results[doc_id]["score"] = (
                        all_results[doc_id]["score"] +
                        result.get("score", 0)
                    ) / 2

        # Return top-k by aggregated score
        return sorted(
            all_results.values(),
            key=lambda x: x.get("score", 0),
            reverse=True
        )[:k]
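
A usage sketch with a hypothetical in-memory retriever; the only contract search_with_expansion relies on is a retrieve(query, k) method returning dicts with "id" and "score" keys (expand_query itself calls the Anthropic API, so ANTHROPIC_API_KEY must be set):

class KeywordRetriever:
    """Toy retriever that scores documents by query-term overlap."""

    def __init__(self, docs: dict):
        self.docs = docs  # {doc_id: text}

    def retrieve(self, query: str, k: int = 5) -> List[dict]:
        terms = set(query.lower().split())
        scored = [
            {"id": doc_id, "score": len(terms & set(text.lower().split()))}
            for doc_id, text in self.docs.items()
        ]
        return sorted(scored, key=lambda d: d["score"], reverse=True)[:k]

retriever = KeywordRetriever({
    "kb-1": "How to reset your password from the account settings page",
    "kb-2": "Billing cycles, invoices, and the refund policy",
    "kb-3": "Troubleshooting login failures and password resets",
})
expander = QueryExpansion()
for result in expander.search_with_expansion(retriever, "I can't log in", k=2):
    print(result["id"], result["score"])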

Answer Verification and Fact-Checking

Verify generated answers against source documents.

from typing import List

import anthropic

class AnswerVerification:
    """Verify and fact-check generated answers."""

    def __init__(self):
        self.client = anthropic.Anthropic()

    def verify_answer(
        self,
        question: str,
        answer: str,
        source_documents: List[str],
        confidence_threshold: float = 0.7
    ) -> dict:
        """Verify answer facts against sources."""
        # Extract claims from answer
        claims = self._extract_claims(answer)

        # Verify each claim
        verification_results = {}
        for claim in claims:
            verified = self._verify_claim(claim, source_documents)
            verification_results[claim] = verified

        # Compute overall confidence
        verified_count = sum(
            1 for v in verification_results.values() if v["verified"]
        )
        confidence = verified_count / len(claims) if claims else 0

        return {
            "answer": answer,
            "confidence": confidence,
            "meets_threshold": confidence >= confidence_threshold,
            "claim_details": verification_results
        }

    def _extract_claims(self, answer: str) -> List[str]:
        """Extract factual claims from answer."""
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=500,
            system="Extract factual claims (not opinions). Return one claim per line.",
            messages=[{
                "role": "user",
                "content": f"Answer:\n{answer}"
            }]
        )

        claims = response.content[0].text.split("\n")
        return [c.strip() for c in claims if c.strip()]

    def _verify_claim(self, claim: str, documents: List[str]) -> dict:
        """Check if claim is supported by documents."""
        doc_context = "\n---\n".join(documents[:5])  # Limit context

        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=100,
            system="Determine if the claim is supported by the provided documents. Respond with: SUPPORTED, CONTRADICTED, or NOT_FOUND.",
            messages=[{
                "role": "user",
                "content": f"Claim: {claim}\n\nDocuments:\n{doc_context}"
            }]
        )

        verdict = response.content[0].text.strip().upper()
        return {
            "claim": claim,
            # startswith avoids matching "NOT SUPPORTED" as supported
            "verified": verdict.startswith("SUPPORTED"),
            "contradicted": "CONTRADICTED" in verdict,
            "not_found": "NOT_FOUND" in verdict
        }
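
A usage sketch with toy source documents; note that each extracted claim triggers its own verification call, so cost scales with answer length:

verifier = AnswerVerification()

sources = [
    "The public API rate limit is 100 requests per minute per key.",
    "Requests over the limit receive HTTP 429 and should be retried with backoff.",
]

report = verifier.verify_answer(
    question="What is the API rate limit?",
    answer="The API allows 100 requests per minute. Exceeding it returns HTTP 429.",
    source_documents=sources,
    confidence_threshold=0.7
)
print(report["confidence"], report["meets_threshold"])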

Key Takeaway

Knowledge-intensive applications require hierarchical retrieval for scale, multi-document synthesis, query expansion for recall, and answer verification for reliability. Each technique addresses specific production challenges.
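
As a closing sketch, the pieces compose into one pipeline; the wiring below is illustrative and assumes the classes from this lesson plus a retriever and a doc-id-to-text mapping supplied by your own storage layer:

def answer_with_pipeline(question: str, retriever, documents_by_id: dict) -> dict:
    """Illustrative wiring of the components from this lesson."""
    expander = QueryExpansion()
    synthesizer = MultiDocumentSynthesis()
    verifier = AnswerVerification()

    # 1. Expand the query and retrieve with the expanded set
    hits = expander.search_with_expansion(retriever, question, k=5)

    # 2. Synthesize an answer from the retrieved documents
    docs = [documents_by_id[hit["id"]] for hit in hits]
    answer = synthesizer.synthesize_answer(question, docs)

    # 3. Verify the answer against the same sources
    return verifier.verify_answer(question, answer, docs)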

Exercises

  1. Build a document QA system that handles 100K+ documents
  2. Implement hierarchical semantic search with partitioning
  3. Create a multi-document synthesizer for comprehensive answers
  4. Build a query expansion system with semantic variations
  5. Implement answer verification with fact-checking
  6. Optimize end-to-end latency across all components
  7. Measure and improve recall across document collections