24  Text Chunking for Embeddings

Chapter Overview

RAG systems don’t embed individual words—they embed chunks of text that capture semantic meaning in context. This chapter explores why chunking matters, how different strategies affect retrieval quality, and practical techniques for preparing text documents for embedding systems. You’ll learn fixed-size, sentence-based, semantic, and hierarchical chunking approaches, with code examples and guidance for choosing the right strategy for your use case.

A common misconception about embedding-based retrieval systems is that they embed every word individually. In reality, RAG systems embed chunks of text—larger semantic units that preserve context and meaning. Understanding chunking is essential because it directly impacts retrieval quality: poor chunking leads to poor results, regardless of how sophisticated your embedding model or vector database might be.

24.1 Why Chunking Matters

When building a RAG system, you face a fundamental question: what unit of text should receive its own embedding? The answer is almost never “individual words” and rarely “entire documents.”

24.1.1 The Problem with Word-Level Embeddings

Word embeddings (like Word2Vec or GloVe) represent individual words as vectors. While valuable for understanding vocabulary relationships, they’re insufficient for retrieval:

# Word embeddings: one vector per word
word_embeddings = {
    'bank': [0.2, 0.8, 0.1, ...],  # But which meaning? Financial? River?
    'river': [0.1, 0.3, 0.9, ...],
    'money': [0.8, 0.2, 0.1, ...],
}

# The word 'bank' has the same embedding regardless of context
# "I went to the bank to deposit money" vs "I sat on the river bank"
# Same vector, completely different meanings!

Modern embedding models solve this by processing entire passages, producing a single vector that captures the contextual meaning of the whole chunk:

# Chunk embeddings: one vector per passage
chunk_embedding = encoder.encode(
    "I went to the bank to deposit my paycheck into savings."
)
# This single 1024-dim vector captures: financial institution,
# personal finance, banking transaction, savings context

24.1.2 The Problem with Document-Level Embeddings

At the other extreme, embedding entire documents creates different problems:

  1. Diluted semantics: A 50-page document covers many topics. Its embedding becomes a vague average, matching poorly with specific queries.

  2. Context window limits: LLMs have finite context windows (4K-128K tokens). Retrieved chunks must fit within these limits alongside the query and system prompt.

  3. Retrieval granularity: Users ask specific questions. Returning entire documents forces them to hunt for the relevant paragraph.

# Document-level embedding: too coarse
doc_embedding = encoder.encode(entire_50_page_document)
# This vector represents the "average" meaning of 50 pages
# Query: "What is the return policy for electronics?"
# Result: Entire product manual returned, user must find the relevant section

24.1.3 The Chunking Sweet Spot

Chunking finds the middle ground: units large enough to preserve context but small enough for precise retrieval and LLM consumption.

Embedding granularity trade-offs

| Embedding Level | Typical Size | Context Preservation | Retrieval Precision | LLM Friendly |
|---|---|---|---|---|
| Word | 1 token | None | N/A | N/A |
| Sentence | 10-30 tokens | Low | High | Yes |
| Paragraph | 50-200 tokens | Medium | Medium | Yes |
| Chunk | 100-500 tokens | High | High | Yes |
| Document | 1000+ tokens | Complete but diluted | Low | Often too large |

24.2 Chunk Embeddings vs Word Embeddings

Let’s clarify the distinction that confuses many practitioners:

24.2.1 Word Embeddings (Historical Context)

Word embeddings like Word2Vec (2013) revolutionized NLP by learning dense vector representations for individual words:

# Word2Vec: learns one vector per vocabulary word
# Training: predict surrounding words from center word (or vice versa)

from gensim.models import Word2Vec

sentences = [["the", "cat", "sat", "on", "mat"], ...]
model = Word2Vec(sentences, vector_size=300)

# Each word gets exactly one 300-dim vector
cat_vector = model.wv['cat']  # Always the same vector for 'cat'

Key limitation: No context sensitivity. “Bank” has one vector whether discussing finance or rivers.

24.2.2 Chunk Embeddings (Modern RAG)

Modern embedding models (Sentence-BERT, OpenAI embeddings, Cohere, etc.) process entire text passages:

# Modern embeddings: one vector per input passage
from sentence_transformers import SentenceTransformer

encoder = SentenceTransformer('all-MiniLM-L6-v2')

# Same word, different contexts → different chunk embeddings
chunk1 = "The bank approved my mortgage application yesterday."
chunk2 = "We had a picnic on the grassy bank beside the river."

emb1 = encoder.encode(chunk1)  # Financial context captured
emb2 = encoder.encode(chunk2)  # Nature context captured

# These embeddings are very different despite both containing 'bank'
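You can check that claim directly by comparing the two vectors. A minimal sketch, reusing emb1 and emb2 from the example above:

import numpy as np

def cosine_similarity(a, b):
    """Cosine similarity between two vectors."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Scores near 1.0 mean near-identical meaning; the two 'bank' chunks
# should score substantially lower than that.
print(f"Similarity: {cosine_similarity(emb1, emb2):.3f}")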

24.2.3 The Transformation Process

Here’s what happens when you embed a chunk:

Input: "The quarterly financial report shows revenue increased
        by 15% compared to last year, driven primarily by
        strong performance in the cloud services division."

                    ↓ Embedding Model (e.g., all-MiniLM-L6-v2)

Output: [0.023, -0.156, 0.089, ..., 0.042]  # 384 dimensions

This single vector encodes:
- Topic: Financial/business reporting
- Sentiment: Positive (increased, strong)
- Entities: Cloud services, quarterly reports
- Relationships: Revenue growth, divisional performance
- Context: Corporate earnings, year-over-year comparison

The embedding model—typically a transformer—processes the entire chunk through attention layers that let every word influence every other word’s representation. The final vector is a learned compression of this contextual understanding.
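Sentence-embedding models like all-MiniLM-L6-v2 produce that final vector by mean-pooling the token representations from the last transformer layer. A minimal sketch of the pooling step, assuming the Hugging Face transformers library is installed:

import torch
from transformers import AutoTokenizer, AutoModel

tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")

text = "The quarterly financial report shows revenue increased by 15%."
inputs = tokenizer(text, return_tensors="pt")
with torch.no_grad():
    token_vectors = model(**inputs).last_hidden_state  # shape: (1, num_tokens, 384)

# Mean-pool the token vectors, using the attention mask to ignore padding
mask = inputs["attention_mask"].unsqueeze(-1)
chunk_vector = (token_vectors * mask).sum(dim=1) / mask.sum(dim=1)
print(chunk_vector.shape)  # torch.Size([1, 384])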

24.3 Chunking Strategies

Different chunking strategies suit different use cases. Here’s a comprehensive overview:

24.3.1 Fixed-Size Chunking

The simplest approach: split text into chunks of N characters or tokens.

Fixed-Size Chunking Implementation
from typing import List

def chunk_by_characters(text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
    """Split text into fixed-size character chunks with overlap."""
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunk = text[start:end]
        if chunk.strip():
            chunks.append(chunk)
        start = end - overlap if overlap < chunk_size else end
    return chunks

def chunk_by_tokens(text: str, chunk_size: int = 256, overlap: int = 25) -> List[str]:
    """Split text into fixed-size token chunks using simple whitespace tokenization."""
    tokens = text.split()  # Simple word-based tokenization
    chunks = []
    start = 0
    while start < len(tokens):
        end = min(start + chunk_size, len(tokens))
        chunk_tokens = tokens[start:end]
        chunk_text = ' '.join(chunk_tokens)
        if chunk_text.strip():
            chunks.append(chunk_text)
        # Ensure start always advances to avoid infinite loop at end of token list
        new_start = end - overlap if overlap < chunk_size else end
        start = new_start if new_start > start else end
    return chunks

# Usage example
text = "Machine learning transforms data processing. " * 50
char_chunks = chunk_by_characters(text, chunk_size=200, overlap=20)
token_chunks = chunk_by_tokens(text, chunk_size=50, overlap=5)
print(f"Character chunking: {len(char_chunks)} chunks")
print(f"Token chunking: {len(token_chunks)} chunks")
Character chunking: 13 chunks
Token chunking: 7 chunks

Pros:

  • Simple to implement and understand
  • Predictable chunk sizes for capacity planning
  • Works for any text without structural assumptions

Cons:

  • Breaks mid-sentence, mid-paragraph, even mid-word
  • No respect for semantic boundaries
  • May split critical information across chunks

When to use: Homogeneous text without clear structure, or as a baseline to compare against smarter strategies.

24.3.2 Sentence-Based Chunking

Split on sentence boundaries, grouping sentences to reach target size.

Sentence-Based Chunking
import re
from typing import List

def chunk_by_sentences(text: str, target_size: int = 256) -> List[str]:
    """Group sentences into chunks of approximately target_size words."""
    # Split into sentences
    sentences = re.split(r'(?<=[.!?])\s+(?=[A-Z])', text)
    sentences = [s.strip() for s in sentences if s.strip()]

    chunks = []
    current_chunk = []
    current_words = 0

    for sentence in sentences:
        sentence_words = len(sentence.split())  # Simple word count
        if current_words + sentence_words > target_size and current_chunk:
            chunks.append(" ".join(current_chunk))
            current_chunk = []
            current_words = 0
        current_chunk.append(sentence)
        current_words += sentence_words

    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks

# Usage example
text = "ML transforms data. Neural networks learn patterns. Deep learning uses layers. Transformers power NLP."
chunks = chunk_by_sentences(text, target_size=10)
print(f"Created {len(chunks)} sentence-based chunks")
Created 2 sentence-based chunks

Pros:

  • Preserves complete thoughts
  • Natural linguistic boundaries
  • Better semantic coherence than fixed-size

Cons:

  • Sentence detection can fail on abbreviations, URLs, code
  • Variable chunk sizes
  • May still split related sentences

When to use: Well-formed prose like articles, documentation, or reports.

24.3.3 Paragraph-Based Chunking

Use paragraph breaks as natural semantic boundaries.

Paragraph-Based Chunking
import re
from typing import List

def chunk_by_paragraphs(text: str, max_chunk_size: int = 500) -> List[str]:
    """Split text on paragraph boundaries, combining short paragraphs."""
    paragraphs = re.split(r'\n\s*\n', text)
    paragraphs = [p.strip() for p in paragraphs if p.strip()]

    chunks = []
    current_chunk = []
    current_size = 0

    for para in paragraphs:
        para_size = len(para)
        if current_size + para_size <= max_chunk_size or not current_chunk:
            current_chunk.append(para)
            current_size += para_size
        else:
            chunks.append('\n\n'.join(current_chunk))
            current_chunk = [para]
            current_size = para_size

    if current_chunk:
        chunks.append('\n\n'.join(current_chunk))
    return chunks

# Usage example
text = "First paragraph here.\n\nSecond paragraph here.\n\nThird paragraph text."
chunks = chunk_by_paragraphs(text)
print(f"Created {len(chunks)} paragraph-based chunks")
Created 1 paragraph-based chunks

Pros:

  • Authors create paragraphs around coherent ideas
  • Strongest natural semantic boundaries
  • Often ideal chunk size naturally

Cons:

  • Paragraph length varies wildly
  • Some documents lack clear paragraphs
  • Very short paragraphs may lack context

When to use: Well-structured documents with clear paragraph formatting.

24.3.4 Semantic Chunking

Split based on topic shifts detected by embedding similarity.

Semantic Chunking
from typing import List
import re

# Check for optional dependencies
try:
    import numpy as np
    from sentence_transformers import SentenceTransformer
    HAS_SEMANTIC_DEPS = True
except ImportError:
    HAS_SEMANTIC_DEPS = False

def semantic_chunk(text: str, similarity_threshold: float = 0.5) -> List[str]:
    """Split text at semantic boundaries using embedding similarity."""
    # Split into sentences first
    sentences = re.split(r'(?<=[.!?])\s+(?=[A-Z])', text)
    sentences = [s.strip() for s in sentences if s.strip()]
    if len(sentences) < 2:
        return [text]

    if not HAS_SEMANTIC_DEPS:
        # Fallback: simple sentence grouping without embeddings
        chunks = []
        for i in range(0, len(sentences), 2):
            chunk = " ".join(sentences[i:i+2])
            chunks.append(chunk)
        return chunks

    # Embed sentences
    model = SentenceTransformer("all-MiniLM-L6-v2")
    embeddings = model.encode(sentences)

    # Calculate similarities between consecutive sentences
    similarities = []
    for i in range(len(embeddings) - 1):
        sim = np.dot(embeddings[i], embeddings[i + 1]) / (
            np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[i + 1])
        )
        similarities.append(sim)

    # Split where similarity drops
    chunks = []
    current_chunk = [sentences[0]]
    for sentence, sim in zip(sentences[1:], similarities):
        if sim < similarity_threshold:
            chunks.append(" ".join(current_chunk))
            current_chunk = [sentence]
        else:
            current_chunk.append(sentence)
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks

# Usage example
text = "ML enables learning. AI powers systems. Dogs are animals. Cats like milk."
chunks = semantic_chunk(text, similarity_threshold=0.6)
print(f"Created {len(chunks)} semantic chunks")
if not HAS_SEMANTIC_DEPS:
    print("(Using fallback mode - install sentence-transformers for full functionality)")

Pros:

  • Chunks align with actual topic boundaries
  • Captures semantic coherence directly
  • Adapts to content structure

Cons:

  • Computationally expensive (requires embedding each sentence)
  • Threshold tuning required
  • May create very uneven chunk sizes

When to use: Documents with multiple topics, transcripts, or content without clear structural markers.
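One way to reduce threshold tuning is to derive the cutoff from the data itself, splitting only at unusually low similarities. A small sketch under that assumption, applied to the consecutive-sentence similarities computed inside semantic_chunk:

import numpy as np

def adaptive_threshold(similarities, percentile: float = 10.0) -> float:
    """Split at the lowest-percentile similarity instead of a fixed cutoff."""
    return float(np.percentile(similarities, percentile))

# e.g., pass adaptive_threshold(similarities) as similarity_threshold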

24.3.5 Recursive/Hierarchical Chunking

Try multiple splitters in order of preference, falling back as needed.

Recursive/Hierarchical Chunking
from typing import List, Optional

class RecursiveChunker:
    """Recursively split text using a hierarchy of separators."""
    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50, separators: Optional[List[str]] = None):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.separators = separators or ["\n\n", "\n", ". ", ", ", " "]

    def chunk(self, text: str) -> List[str]:
        """Split text recursively using separator hierarchy."""
        return self._recursive_split(text, self.separators)

    def _recursive_split(self, text: str, separators: List[str]) -> List[str]:
        """Recursively split text, trying separators in order."""
        if len(text) <= self.chunk_size:
            return [text] if text.strip() else []
        if not separators:
            return self._force_split(text)

        current_sep = separators[0]
        remaining_seps = separators[1:]
        splits = text.split(current_sep)

        if len(splits) == 1:
            return self._recursive_split(text, remaining_seps)

        chunks = []
        current_chunk = []
        current_length = 0

        for split in splits:
            split_length = len(split) + len(current_sep)
            if current_length + split_length > self.chunk_size and current_chunk:
                chunks.append(current_sep.join(current_chunk))
                overlap_text = self._get_overlap(current_chunk, current_sep)
                current_chunk = [overlap_text] if overlap_text else []
                current_length = len(overlap_text) if overlap_text else 0
            current_chunk.append(split)
            current_length += split_length

        if current_chunk:
            remaining = current_sep.join(current_chunk)
            if len(remaining) > self.chunk_size:
                chunks.extend(self._recursive_split(remaining, remaining_seps))
            elif remaining.strip():
                chunks.append(remaining)
        return chunks

    def _get_overlap(self, parts: List[str], sep: str) -> str:
        """Get overlap text from the end of current chunk."""
        if not self.chunk_overlap or not parts:
            return ""
        overlap_parts = []
        overlap_length = 0
        for part in reversed(parts):
            if overlap_length + len(part) > self.chunk_overlap:
                break
            overlap_parts.insert(0, part)
            overlap_length += len(part) + len(sep)
        return sep.join(overlap_parts)

    def _force_split(self, text: str) -> List[str]:
        """Force split text at chunk_size boundaries."""
        chunks = []
        start = 0
        while start < len(text):
            end = start + self.chunk_size
            chunk = text[start:end]
            if chunk.strip():
                chunks.append(chunk)
            start = end - self.chunk_overlap
        return chunks

# Usage example
text = "ML enables learning. AI powers systems. " * 20
chunker = RecursiveChunker(chunk_size=200, chunk_overlap=20)
chunks = chunker.chunk(text)
print(f"Created {len(chunks)} chunks from text")
Created 5 chunks from text

Pros:

  • Respects document hierarchy (sections → paragraphs → sentences)
  • Graceful degradation for messy documents
  • Flexible target sizes

Cons:

  • More complex implementation
  • Order of separators matters
  • May still produce uneven chunks

When to use: Documents with mixed structure, or when you need consistent chunk sizes with best-effort boundary respect.

24.3.6 Sliding Window with Overlap

Create overlapping chunks to preserve context at boundaries.

Sliding Window Chunking
from dataclasses import dataclass
from typing import List

@dataclass
class ChunkWithMetadata:
    """A chunk with position metadata for deduplication."""
    text: str
    start_char: int
    end_char: int
    chunk_index: int

def sliding_window_chunks(text: str, window_size: int = 500, stride: int = 400) -> List[ChunkWithMetadata]:
    """
    Create overlapping chunks using a sliding window.

    Args:
        window_size: Size of each chunk in characters
        stride: How far to move the window (overlap = window_size - stride)
    """
    if stride > window_size:
        stride = window_size

    chunks = []
    start = 0
    chunk_index = 0

    while start < len(text):
        end = min(start + window_size, len(text))
        chunk_text = text[start:end].strip()

        if chunk_text:
            chunks.append(ChunkWithMetadata(
                text=chunk_text, start_char=start, end_char=end, chunk_index=chunk_index
            ))
            chunk_index += 1
        start += stride

    return chunks

def calculate_overlap(start1: int, end1: int, start2: int, end2: int) -> float:
    """Calculate overlap ratio between two ranges."""
    overlap_start = max(start1, start2)
    overlap_end = min(end1, end2)
    if overlap_start >= overlap_end:
        return 0.0
    overlap_length = overlap_end - overlap_start
    min_length = min(end1 - start1, end2 - start2)
    return overlap_length / min_length

# Usage example
text = "Machine learning transforms data. Neural networks learn patterns. " * 10
chunks = sliding_window_chunks(text, window_size=200, stride=150)
print(f"Created {len(chunks)} overlapping chunks")
if len(chunks) > 1:
    overlap = calculate_overlap(chunks[0].start_char, chunks[0].end_char,
                                chunks[1].start_char, chunks[1].end_char)
    print(f"Overlap between chunks: {overlap:.1%}")
Created 5 overlapping chunks
Overlap between chunks: 25.0%

Pros:

  • Information at chunk boundaries appears in multiple chunks
  • Reduces risk of splitting critical context
  • Better recall for boundary-spanning queries

Cons:

  • Increases storage requirements by roughly the overlap percentage
  • May retrieve duplicate information
  • Requires deduplication in results

When to use: When retrieval quality matters more than storage efficiency, especially for dense technical content.
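Because neighboring windows share text, retrieval results need a deduplication pass. A minimal sketch using the calculate_overlap helper above, assuming results is a ranked list of ChunkWithMetadata:

def deduplicate_results(results: List[ChunkWithMetadata],
                        max_overlap: float = 0.5) -> List[ChunkWithMetadata]:
    """Keep each ranked result only if it doesn't heavily overlap a kept one."""
    kept = []
    for candidate in results:
        is_duplicate = any(
            calculate_overlap(candidate.start_char, candidate.end_char,
                              kept_chunk.start_char, kept_chunk.end_char) > max_overlap
            for kept_chunk in kept
        )
        if not is_duplicate:
            kept.append(candidate)
    return kept

print(f"After deduplication: {len(deduplicate_results(chunks))} chunks")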

24.4 Document-Type Specific Strategies

Different document types require different chunking approaches:

24.4.1 PDF Documents

PDFs present unique challenges: headers/footers on every page, multi-column layouts, embedded tables, and inconsistent text extraction.

PDF Chunking
import re
from dataclasses import dataclass, field
from typing import Dict, List, Optional

# Check for optional PDF dependency
try:
    import fitz  # PyMuPDF
    HAS_PYMUPDF = True
except ImportError:
    HAS_PYMUPDF = False

@dataclass
class PDFChunk:
    """A chunk extracted from a PDF with metadata."""
    text: str
    page_numbers: List[int]
    section_title: Optional[str] = None
    chunk_type: str = "text"
    metadata: Dict = field(default_factory=dict)

class PDFChunker:
    """Extract and chunk text from PDFs while preserving structure."""
    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50,
                 remove_headers_footers: bool = True, detect_sections: bool = True):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.remove_headers_footers = remove_headers_footers
        self.detect_sections = detect_sections

    def chunk_pdf(self, pdf_path: str) -> List[PDFChunk]:
        """Extract and chunk a PDF document."""
        if not HAS_PYMUPDF:
            raise ImportError("PyMuPDF (fitz) is required for PDF processing. "
                            "Install with: pip install PyMuPDF")
        doc = fitz.open(pdf_path)
        chunks = []
        current_section = None

        for page_num, page in enumerate(doc):
            text = page.get_text("text")
            if self.remove_headers_footers:
                text = self._remove_headers_footers(text, page_num, len(doc))
            if self.detect_sections:
                sections = self._detect_sections(text)
                for section_title, section_text in sections:
                    current_section = section_title or current_section
                    page_chunks = self._chunk_text(section_text, page_num + 1, current_section)
                    chunks.extend(page_chunks)
            else:
                page_chunks = self._chunk_text(text, page_num + 1, current_section)
                chunks.extend(page_chunks)
        doc.close()
        return self._merge_small_chunks(chunks)

    def _detect_sections(self, text: str) -> List[tuple]:
        """Detect section headers and split text accordingly."""
        header_patterns = [
            r"^(?:Chapter\s+)?(\d+\.?\s+[A-Z][^\n]+)$",
            r"^([A-Z][A-Z\s]+)$",
        ]
        sections = []
        current_title = None
        current_text = []
        for line in text.split("\n"):
            is_header = False
            for pattern in header_patterns:
                match = re.match(pattern, line.strip())
                if match:
                    if current_text:
                        sections.append((current_title, "\n".join(current_text)))
                    current_title = match.group(1)
                    current_text = []
                    is_header = True
                    break
            if not is_header:
                current_text.append(line)
        if current_text:
            sections.append((current_title, "\n".join(current_text)))
        return sections if sections else [(None, text)]

    def _remove_headers_footers(self, text: str, page_num: int, total_pages: int) -> str:
        """Remove common header/footer patterns."""
        lines = text.split("\n")
        filtered = []
        for i, line in enumerate(lines):
            if i < 3 and len(line.strip()) < 50 and not any(c.islower() for c in line):
                continue
            filtered.append(line)
        return "\n".join(filtered)

    def _chunk_text(self, text: str, page_num: int, section_title: Optional[str]) -> List[PDFChunk]:
        """Chunk text into appropriately sized pieces."""
        # Simple chunking for example
        chunks = []
        words = text.split()
        current_chunk = []
        for word in words:
            current_chunk.append(word)
            if len(" ".join(current_chunk)) >= self.chunk_size:
                chunks.append(PDFChunk(
                    text=" ".join(current_chunk),
                    page_numbers=[page_num],
                    section_title=section_title
                ))
                current_chunk = []
        if current_chunk:
            chunks.append(PDFChunk(
                text=" ".join(current_chunk),
                page_numbers=[page_num],
                section_title=section_title
            ))
        return chunks

    def _merge_small_chunks(self, chunks: List[PDFChunk]) -> List[PDFChunk]:
        """Merge chunks that are too small."""
        if not chunks:
            return chunks
        merged = []
        current = chunks[0]
        for next_chunk in chunks[1:]:
            can_merge = (current.section_title == next_chunk.section_title and
                        len(current.text) + len(next_chunk.text) < self.chunk_size)
            if can_merge:
                current = PDFChunk(
                    text=current.text + "\n\n" + next_chunk.text,
                    page_numbers=list(set(current.page_numbers + next_chunk.page_numbers)),
                    section_title=current.section_title
                )
            else:
                merged.append(current)
                current = next_chunk
        merged.append(current)
        return merged

# Usage example
if HAS_PYMUPDF:
    print("PDFChunker ready for processing PDF documents with structure preservation")
else:
    print("PDFChunker defined (install PyMuPDF for PDF processing: pip install PyMuPDF)")

24.4.2 HTML Documents

HTML carries structural information that aids chunking:

HTML Chunking
import re
from dataclasses import dataclass
from typing import List, Optional

# Check for optional HTML parsing dependency
try:
    from bs4 import BeautifulSoup
    HAS_BS4 = True
except ImportError:
    HAS_BS4 = False

@dataclass
class HTMLChunk:
    """A chunk extracted from HTML with metadata."""
    text: str
    tag_path: str
    heading: Optional[str] = None

class HTMLChunker:
    """Extract and chunk text from HTML while preserving semantic structure."""
    BLOCK_TAGS = {"article", "section", "div", "p", "blockquote", "li"}
    SECTION_TAGS = {"article", "section", "main", "aside"}
    HEADING_TAGS = ["h1", "h2", "h3", "h4", "h5", "h6"]

    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50,
                 preserve_structure: bool = True, include_headings: bool = True):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.preserve_structure = preserve_structure
        self.include_headings = include_headings

    def chunk_html(self, html: str) -> List[HTMLChunk]:
        """Extract and chunk HTML content."""
        if not HAS_BS4:
            raise ImportError("BeautifulSoup is required for HTML processing. "
                            "Install with: pip install beautifulsoup4")
        soup = BeautifulSoup(html, "html.parser")

        # Remove script and style elements
        for element in soup(["script", "style", "nav", "footer", "header"]):
            element.decompose()

        if self.preserve_structure:
            return self._chunk_by_structure(soup)
        else:
            return self._chunk_flat(soup)

    def _chunk_by_structure(self, soup) -> List[HTMLChunk]:
        """Chunk based on HTML structure (sections, articles, etc.)."""
        chunks = []
        current_heading = None

        main_content = (soup.find("main") or soup.find("article") or
                       soup.find("div", class_=re.compile(r"content|main|article")) or
                       soup.body or soup)

        for section in self._find_sections(main_content):
            section_heading = self._extract_heading(section)
            if section_heading:
                current_heading = section_heading

            section_text = self._extract_text(section)
            if not section_text.strip():
                continue

            section_chunks = self._split_text(section_text, current_heading,
                                              self._get_tag_path(section))
            chunks.extend(section_chunks)

        return chunks

    def _chunk_flat(self, soup) -> List[HTMLChunk]:
        """Simple flat chunking of all text content."""
        text = soup.get_text(separator="\n", strip=True)
        return self._split_text(text, None, "body")

    def _find_sections(self, element):
        """Find content sections in the HTML."""
        sections = element.find_all(self.SECTION_TAGS)
        if sections:
            yield from sections
        else:
            for child in element.children:
                if hasattr(child, "name") and child.name in self.BLOCK_TAGS:
                    yield child

    def _extract_heading(self, element) -> Optional[str]:
        """Extract the heading for a section."""
        for tag in self.HEADING_TAGS:
            heading = element.find(tag)
            if heading:
                return heading.get_text(strip=True)
        return None

    def _extract_text(self, element) -> str:
        """Extract clean text from an element."""
        text = element.get_text(separator="\n", strip=True)
        return re.sub(r"\n{3,}", "\n\n", text)

    def _get_tag_path(self, element) -> str:
        """Get the tag path to an element."""
        path = []
        current = element
        while current and hasattr(current, "name") and current.name:
            tag_info = current.name
            if current.get("id"):
                tag_info += f"#{current['id']}"
            path.insert(0, tag_info)
            current = current.parent
        return " > ".join(path[-4:])

    def _split_text(self, text: str, heading: Optional[str], tag_path: str) -> List[HTMLChunk]:
        """Split text into chunks."""
        if self.include_headings and heading:
            text = f"## {heading}\n\n{text}"

        chunks = []
        words = text.split()
        current_chunk = []
        for word in words:
            current_chunk.append(word)
            if len(" ".join(current_chunk)) >= self.chunk_size:
                chunks.append(HTMLChunk(
                    text=" ".join(current_chunk),
                    tag_path=tag_path,
                    heading=heading
                ))
                current_chunk = []
        if current_chunk:
            chunks.append(HTMLChunk(
                text=" ".join(current_chunk),
                tag_path=tag_path,
                heading=heading
            ))
        return chunks

# Usage example
if HAS_BS4:
    print("HTMLChunker ready for processing HTML documents")
else:
    print("HTMLChunker defined (install beautifulsoup4 for HTML processing: pip install beautifulsoup4)")

24.4.3 Markdown Documents

Markdown headers provide explicit hierarchy:

Markdown Chunking
import re
from dataclasses import dataclass
from typing import List

@dataclass
class MarkdownChunk:
    """A chunk from a Markdown document with context."""
    text: str
    header_hierarchy: List[str]
    header_level: int
    start_line: int
    end_line: int

class MarkdownChunker:
    """Chunk Markdown documents while preserving header hierarchy."""
    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50,
                 include_header_context: bool = True, min_chunk_size: int = 100):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.include_header_context = include_header_context
        self.min_chunk_size = min_chunk_size

    def chunk_markdown(self, markdown: str) -> List[MarkdownChunk]:
        """Chunk a Markdown document."""
        lines = markdown.split("\n")
        sections = self._parse_sections(lines)
        chunks = self._chunk_sections(sections)
        return chunks

    def _parse_sections(self, lines: List[str]) -> List[dict]:
        """Parse Markdown into sections based on headers."""
        sections = []
        current_section = {"headers": [], "content": [], "start_line": 0, "level": 0}
        header_stack = []

        for i, line in enumerate(lines):
            header_match = re.match(r"^(#{1,6})\s+(.+)$", line)

            if header_match:
                if current_section["content"]:
                    current_section["end_line"] = i - 1
                    sections.append(current_section)

                level = len(header_match.group(1))
                title = header_match.group(2).strip()

                while header_stack and header_stack[-1][0] >= level:
                    header_stack.pop()

                header_stack.append((level, title))

                current_section = {
                    "headers": [h[1] for h in header_stack],
                    "content": [line],
                    "start_line": i,
                    "level": level,
                }
            else:
                current_section["content"].append(line)

        if current_section["content"]:
            current_section["end_line"] = len(lines) - 1
            sections.append(current_section)

        return sections

    def _chunk_sections(self, sections: List[dict]) -> List[MarkdownChunk]:
        """Chunk each section into appropriately sized pieces."""
        chunks = []

        for section in sections:
            content = "\n".join(section["content"])

            if self.include_header_context and section["headers"]:
                header_context = " > ".join(section["headers"]) + "\n\n"
            else:
                header_context = ""

            if len(content) <= self.chunk_size:
                if content.strip():
                    chunks.append(MarkdownChunk(
                        text=header_context + content if header_context else content,
                        header_hierarchy=section["headers"],
                        header_level=section["level"],
                        start_line=section["start_line"],
                        end_line=section.get("end_line", section["start_line"]),
                    ))
            else:
                sub_chunks = self._split_section(content, header_context)
                for sub_text in sub_chunks:
                    chunks.append(MarkdownChunk(
                        text=sub_text,
                        header_hierarchy=section["headers"],
                        header_level=section["level"],
                        start_line=section["start_line"],
                        end_line=section.get("end_line", section["start_line"]),
                    ))

        return chunks

    def _split_section(self, content: str, header_context: str) -> List[str]:
        """Split a large section into smaller chunks."""
        effective_chunk_size = self.chunk_size - len(header_context)
        chunks = []
        words = content.split()
        current_chunk = []

        for word in words:
            current_chunk.append(word)
            if len(" ".join(current_chunk)) >= effective_chunk_size:
                chunk_text = " ".join(current_chunk)
                chunks.append(header_context + chunk_text if header_context else chunk_text)
                current_chunk = []

        if current_chunk:
            chunk_text = " ".join(current_chunk)
            chunks.append(header_context + chunk_text if header_context else chunk_text)

        return chunks

# Usage example
print("MarkdownChunker ready for processing Markdown documents")
MarkdownChunker ready for processing Markdown documents

24.4.4 Source Code

Code requires special handling to preserve syntactic units:

Code Chunking
import ast
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class CodeChunk:
    """A chunk of source code with metadata."""
    code: str
    language: str
    chunk_type: str  # function, class, module, block
    name: Optional[str] = None
    docstring: Optional[str] = None

class CodeChunker:
    """Chunk source code while preserving syntactic structure."""
    def __init__(self, chunk_size: int = 1000, include_docstrings: bool = True,
                 include_imports: bool = True):
        self.chunk_size = chunk_size
        self.include_docstrings = include_docstrings
        self.include_imports = include_imports

    def chunk_python(self, code: str) -> List[CodeChunk]:
        """Chunk Python code using AST parsing."""
        try:
            tree = ast.parse(code)
            return self._chunk_python_ast(code, tree)
        except SyntaxError:
            return []

    def _chunk_python_ast(self, code: str, tree: ast.Module) -> List[CodeChunk]:
        """Extract chunks from Python AST."""
        chunks = []
        lines = code.split("\n")

        # Extract imports as a single chunk
        if self.include_imports:
            imports = self._extract_imports(tree, lines)
            if imports:
                chunks.append(CodeChunk(
                    code=imports, language="python", chunk_type="imports", name="imports"
                ))

        # Extract classes and functions
        for node in ast.walk(tree):
            if isinstance(node, ast.ClassDef):
                chunk = self._extract_class(node, lines)
                if chunk:
                    chunks.append(chunk)
            elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                if not self._is_method(node, tree):
                    chunk = self._extract_function(node, lines)
                    if chunk:
                        chunks.append(chunk)

        return chunks

    def _extract_imports(self, tree: ast.Module, lines: List[str]) -> str:
        """Extract all import statements."""
        import_lines = []
        for node in ast.iter_child_nodes(tree):
            if isinstance(node, (ast.Import, ast.ImportFrom)):
                start = node.lineno - 1
                end = node.end_lineno if hasattr(node, "end_lineno") else start + 1
                import_lines.extend(lines[start:end])
        return "\n".join(import_lines)

    def _extract_class(self, node: ast.ClassDef, lines: List[str]) -> CodeChunk:
        """Extract a class definition."""
        start = node.lineno - 1
        end = node.end_lineno if hasattr(node, "end_lineno") else len(lines)
        code = "\n".join(lines[start:end])
        docstring = ast.get_docstring(node) if self.include_docstrings else None
        return CodeChunk(
            code=code, language="python", chunk_type="class",
            name=node.name, docstring=docstring
        )

    def _extract_function(self, node, lines: List[str]) -> CodeChunk:
        """Extract a function definition."""
        start = node.lineno - 1
        end = node.end_lineno if hasattr(node, "end_lineno") else len(lines)
        code = "\n".join(lines[start:end])
        docstring = ast.get_docstring(node) if self.include_docstrings else None
        return CodeChunk(
            code=code, language="python", chunk_type="function",
            name=node.name, docstring=docstring
        )

    def _is_method(self, node, tree: ast.Module) -> bool:
        """Check if a function is a method inside a class."""
        for parent in ast.walk(tree):
            if isinstance(parent, ast.ClassDef):
                for child in ast.iter_child_nodes(parent):
                    if child is node:
                        return True
        return False

# Usage example
sample_code = '''
import numpy as np

class DataProcessor:
    """Process data."""
    def __init__(self):
        self.data = []

def calculate(x, y):
    """Calculate something."""
    return x + y
'''

chunker = CodeChunker()
chunks = chunker.chunk_python(sample_code)
print(f"Found {len(chunks)} code chunks")
Found 3 code chunks

24.5 Chunk Size Optimization

Choosing optimal chunk size involves balancing competing concerns:

24.5.1 The Trade-off Triangle

                    CONTEXT
                      /\
                     /  \
                    /    \
                   /      \
                  /        \
                 /__________\
           PRECISION      EFFICIENCY

Larger chunks → More context, less precision, fewer chunks
Smaller chunks → Less context, more precision, more chunks
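The efficiency corner is straightforward to quantify before you build anything. A back-of-the-envelope sketch (the corpus size and embedding dimension here are illustrative assumptions):

def index_footprint(corpus_chars: int, chunk_size: int, overlap: int,
                    dims: int = 384) -> tuple:
    """Estimate chunk count and embedding storage for character-based chunking."""
    stride = chunk_size - overlap
    num_chunks = -(-corpus_chars // stride)  # ceiling division
    storage_mb = num_chunks * dims * 4 / 1e6  # float32 vectors
    return num_chunks, storage_mb

for size in (128, 256, 512, 1024):
    n, mb = index_footprint(corpus_chars=10_000_000, chunk_size=size, overlap=size // 10)
    print(f"chunk_size={size}: {n:,} chunks, ~{mb:.0f} MB of float32 embeddings")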

24.5.2 Empirical Sizing Guidelines

Based on production experience across different use cases:

Chunk size recommendations by use case

| Use Case | Recommended Size | Overlap | Rationale |
|---|---|---|---|
| Q&A over documentation | 256-512 tokens | 10-20% | Balance context with precision |
| Legal document search | 512-1024 tokens | 20-30% | Preserve legal context and cross-references |
| Customer support | 128-256 tokens | 10% | Short, focused answers needed |
| Academic papers | 512-768 tokens | 15% | Preserve argument flow |
| Code documentation | 256-512 tokens | 0% | Function/class boundaries are natural |
| Chat/transcript search | 128-256 tokens | 20% | Conversational turns are short |
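These guidelines translate directly into configuration. A minimal sketch that encodes the midpoints of the ranges above (the preset names and the 4-characters-per-token conversion are assumptions for illustration):

CHUNKING_PRESETS = {
    "documentation_qa": (384, 0.15),   # (tokens, overlap fraction)
    "legal_search": (768, 0.25),
    "customer_support": (192, 0.10),
    "academic_papers": (640, 0.15),
    "code_documentation": (384, 0.0),
    "chat_search": (192, 0.20),
}

def preset_config(use_case: str, chars_per_token: int = 4) -> dict:
    """Convert a token-based preset into character-based chunker settings."""
    tokens, overlap_fraction = CHUNKING_PRESETS[use_case]
    chunk_size = tokens * chars_per_token
    return {"chunk_size": chunk_size, "overlap": int(chunk_size * overlap_fraction)}

print(preset_config("documentation_qa"))  # {'chunk_size': 1536, 'overlap': 230}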

24.5.3 Finding Your Optimal Size

Chunk Size Evaluation
from dataclasses import dataclass
from typing import Dict, List, Optional

# Check for numpy
try:
    import numpy as np
    HAS_NUMPY = True
except ImportError:
    HAS_NUMPY = False

@dataclass
class EvaluationResult:
    """Results from chunk size evaluation."""
    chunk_size: int
    num_chunks: int
    avg_chunk_length: float
    retrieval_precision: float
    retrieval_recall: float
    retrieval_f1: float

def evaluate_chunk_sizes(documents: List[str], queries: List[str],
                        ground_truth: List[List[int]],
                        chunk_sizes: Optional[List[int]] = None, top_k: int = 5) -> List[EvaluationResult]:
    """
    Evaluate retrieval quality across different chunk sizes.

    Args:
        documents: List of documents to chunk and index
        queries: List of test queries
        ground_truth: For each query, indices of relevant documents
        chunk_sizes: List of chunk sizes to evaluate
        top_k: Number of results to retrieve
    """
    if chunk_sizes is None:
        chunk_sizes = [128, 256, 512, 1024]

    results = []
    for chunk_size in chunk_sizes:
        # Simplified evaluation for demo
        result = EvaluationResult(
            chunk_size=chunk_size,
            num_chunks=len(documents) * max(1, 1000 // chunk_size),
            avg_chunk_length=chunk_size * 0.9,
            retrieval_precision=0.75,
            retrieval_recall=0.68,
            retrieval_f1=0.71
        )
        results.append(result)
    return results

def analyze_chunk_statistics(chunks: List[str]) -> Dict:
    """Analyze statistics of a chunking result."""
    lengths = [len(c) for c in chunks]
    if HAS_NUMPY:
        return {
            "num_chunks": len(chunks),
            "avg_length": np.mean(lengths),
            "std_length": np.std(lengths),
            "min_length": min(lengths) if lengths else 0,
            "max_length": max(lengths) if lengths else 0,
        }
    else:
        # Fallback without numpy
        avg = sum(lengths) / len(lengths) if lengths else 0
        variance = sum((x - avg) ** 2 for x in lengths) / len(lengths) if lengths else 0
        std = variance ** 0.5
        return {
            "num_chunks": len(chunks),
            "avg_length": avg,
            "std_length": std,
            "min_length": min(lengths) if lengths else 0,
            "max_length": max(lengths) if lengths else 0,
        }

# Usage example
sample_docs = ["ML transforms data. " * 50] * 3
sample_queries = ["What is machine learning?"]
ground_truth = [[0]]

results = evaluate_chunk_sizes(sample_docs, sample_queries, ground_truth,
                               chunk_sizes=[128, 256, 512])
print(f"Evaluated {len(results)} chunk sizes")
for r in results:
    print(f"Size {r.chunk_size}: F1={r.retrieval_f1:.3f}")

24.6 Metadata Preservation

Chunks without context are less useful. Preserve metadata for filtering and context:

Metadata Preservation
import hashlib
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional

@dataclass
class ChunkMetadata:
    """Comprehensive metadata for a text chunk."""
    source_id: str
    source_type: str  # pdf, html, markdown, etc.
    source_url: Optional[str] = None
    page_number: Optional[int] = None
    section_title: Optional[str] = None
    section_hierarchy: List[str] = field(default_factory=list)
    start_char: int = 0
    end_char: int = 0
    language: str = "en"
    content_type: str = "text"
    word_count: int = 0
    indexed_at: datetime = field(default_factory=datetime.now)
    custom: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict:
        """Convert to dictionary for storage."""
        return {
            "source_id": self.source_id,
            "source_type": self.source_type,
            "source_url": self.source_url,
            "page_number": self.page_number,
            "section_title": self.section_title,
            "section_hierarchy": self.section_hierarchy,
            "start_char": self.start_char,
            "end_char": self.end_char,
            "language": self.language,
            "content_type": self.content_type,
            "word_count": self.word_count,
            "indexed_at": self.indexed_at.isoformat(),
            "custom": self.custom,
        }

@dataclass
class EnrichedChunk:
    """A chunk with its text, embedding, and metadata."""
    chunk_id: str
    text: str
    metadata: ChunkMetadata
    embedding: Optional[List[float]] = None

    def __post_init__(self):
        if not self.chunk_id:
            self.chunk_id = self._generate_id()

    def _generate_id(self) -> str:
        """Generate a unique ID based on content and source."""
        content = f"{self.metadata.source_id}:{self.metadata.start_char}:{self.text[:100]}"
        return hashlib.sha256(content.encode()).hexdigest()[:16]

# Usage example
metadata = ChunkMetadata(
    source_id="doc_001",
    source_type="markdown",
    section_title="Introduction",
    word_count=42
)
chunk = EnrichedChunk(
    chunk_id="",
    text="Machine learning enables computers to learn from data.",
    metadata=metadata
)
print(f"Created chunk: {chunk.chunk_id}")
print(f"Metadata: {chunk.metadata.to_dict()}")
Created chunk: f896db71a4a94422
Metadata: {'source_id': 'doc_001', 'source_type': 'markdown', 'source_url': None, 'page_number': None, 'section_title': 'Introduction', 'section_hierarchy': [], 'start_char': 0, 'end_char': 0, 'language': 'en', 'content_type': 'text', 'word_count': 42, 'indexed_at': '2025-12-10T16:20:48.587060', 'custom': {}}

24.7 Handling Special Content

24.7.1 Tables

Tables require special handling—row-by-row chunking loses context:

Table Chunking
import re
from typing import List, Tuple

def detect_tables(text: str) -> List[Tuple[int, int, str]]:
    """Detect tables in text and return their positions."""
    tables = []
    # Detect Markdown tables
    md_table_pattern = r"(\|[^\n]+\|\n\|[-:| ]+\|\n(?:\|[^\n]+\|\n?)+)"
    for match in re.finditer(md_table_pattern, text):
        tables.append((match.start(), match.end(), match.group(0)))
    return sorted(tables, key=lambda x: x[0])

def parse_markdown_table(table_text: str) -> Tuple[List[str], List[List[str]]]:
    """Parse a Markdown table into headers and rows."""
    lines = [line.strip() for line in table_text.strip().split("\n")]
    if len(lines) < 2:
        return [], []

    # Parse header row
    headers = [cell.strip() for cell in lines[0].split("|")[1:-1]]

    # Skip separator line, parse data rows
    rows = []
    for line in lines[2:]:
        if line.startswith("|"):
            cells = [cell.strip() for cell in line.split("|")[1:-1]]
            rows.append(cells)

    return headers, rows

def table_to_text(headers: List[str], rows: List[List[str]], format: str = "natural") -> str:
    """Convert table to natural language for embedding."""
    if format == "natural":
        lines = []
        for row in rows:
            parts = []
            for header, value in zip(headers, row):
                if value and value != "-":
                    parts.append(f"{header} is {value}")
            if parts:
                lines.append(". ".join(parts) + ".")
        return "\n".join(lines)
    else:
        # Keep as markdown
        return (f"| {' | '.join(headers)} |\n" +
                f"|{'|'.join(['---'] * len(headers))}|\n" +
                "\n".join(f"| {' | '.join(row)} |" for row in rows))

# Usage example
sample_table = """
| Framework | Language | GPU Support |
|-----------|----------|-------------|
| TensorFlow | Python | Excellent |
| PyTorch | Python | Excellent |
| JAX | Python | Excellent |
"""

headers, rows = parse_markdown_table(sample_table)
natural_text = table_to_text(headers, rows, format="natural")
print("Table converted to natural language:")
print(natural_text)
Table converted to natural language:
Framework is TensorFlow. Language is Python. GPU Support is Excellent.
Framework is PyTorch. Language is Python. GPU Support is Excellent.
Framework is JAX. Language is Python. GPU Support is Excellent.
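In a full document, detect_tables supplies the spans for the same conversion: cut each table out, substitute its natural-language rendering, and chunk the remaining text normally. A quick check of the detector on the sample above:

tables = detect_tables(sample_table)
print(f"Detected {len(tables)} table(s)")
for start, end, _ in tables:
    print(f"  Table spans characters {start}-{end}")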

24.7.2 Lists

Numbered and bulleted lists should stay together when possible:

List Chunking
import re
from dataclasses import dataclass
from typing import List

@dataclass
class ListBlock:
    """A detected list block in text."""
    start_pos: int
    end_pos: int
    text: str
    list_type: str  # 'bullet', 'numbered'
    items: List[str]

def detect_lists(text: str) -> List[ListBlock]:
    """Detect list structures in text."""
    lists = []

    # Bullet list pattern
    bullet_pattern = r"((?:^[ \t]*[-*•][ \t]+.+$\n?)+)"

    # Numbered list pattern
    numbered_pattern = r"((?:^[ \t]*(?:\d+\.|[a-z]\.)[ \t]+.+$\n?)+)"

    for pattern, list_type in [(bullet_pattern, "bullet"), (numbered_pattern, "numbered")]:
        for match in re.finditer(pattern, text, re.MULTILINE):
            items = parse_list_items(match.group(0), list_type)
            lists.append(ListBlock(
                start_pos=match.start(),
                end_pos=match.end(),
                text=match.group(0),
                list_type=list_type,
                items=items,
            ))

    return sorted(lists, key=lambda x: x.start_pos)

def parse_list_items(list_text: str, list_type: str) -> List[str]:
    """Parse individual items from a list block."""
    if list_type == "bullet":
        pattern = r"^[ \t]*[-*•][ \t]+(.+)$"
    else:  # numbered
        pattern = r"^[ \t]*(?:\d+\.|[a-z]\.)[ \t]+(.+)$"

    items = []
    for match in re.finditer(pattern, list_text, re.MULTILINE):
        items.append(match.group(1).strip())

    return items

# Usage example
sample_text = """
Machine learning algorithms include:

- Linear Regression: Used for predicting continuous values
- Logistic Regression: Used for binary classification
- Decision Trees: Tree-based models
- Random Forests: Ensemble of decision trees
- Neural Networks: Deep learning models

Each has different use cases.
"""

lists = detect_lists(sample_text)
print(f"Found {len(lists)} lists")
for lst in lists:
    print(f"List type: {lst.list_type}, {len(lst.items)} items")
    for item in lst.items[:2]:
        print(f"  - {item}")
Found 1 list(s)
List type: bullet, 5 items
  - Linear Regression: Used for predicting continuous values
  - Logistic Regression: Used for binary classification

24.7.3 Code Blocks

Code embedded in documentation needs preservation:

Code Block Handling
import re
from dataclasses import dataclass
from typing import List, Tuple

@dataclass
class CodeBlock:
    """A code block extracted from documentation."""
    start_pos: int
    end_pos: int
    code: str
    language: str
    preceding_context: str = ""

def extract_code_blocks(text: str) -> Tuple[str, List[CodeBlock]]:
    """Extract fenced code blocks and replace with placeholders."""
    code_blocks = []
    placeholder_template = "<<<CODE_BLOCK_{}>>>"

    def replace_block(match):
        index = len(code_blocks)
        language = match.group(1) or "text"
        code = match.group(2)

        # Get preceding line for context
        start = match.start()
        preceding = text[max(0, start - 200):start]
        last_line = preceding.split("\n")[-1].strip()

        code_blocks.append(CodeBlock(
            start_pos=match.start(),
            end_pos=match.end(),
            code=code,
            language=language,
            preceding_context=last_line,
        ))
        return placeholder_template.format(index)

    # Match fenced code blocks
    pattern = r"```(\w*)\n(.*?)```"
    text_with_placeholders = re.sub(pattern, replace_block, text, flags=re.DOTALL)

    return text_with_placeholders, code_blocks

def restore_code_blocks(chunks: List[str], code_blocks: List[CodeBlock],
                       format: str = "inline") -> List[str]:
    """Restore code blocks to chunks."""
    placeholder_pattern = r"<<<CODE_BLOCK_(\d+)>>>"

    restored = []
    for chunk in chunks:
        matches = list(re.finditer(placeholder_pattern, chunk))

        if not matches:
            restored.append(chunk)
            continue

        result = chunk
        for match in reversed(matches):
            index = int(match.group(1))
            block = code_blocks[index]

            if format == "inline":
                replacement = f"```{block.language}\n{block.code}```"
            elif format == "reference":
                replacement = f"[Code block: {block.language}]"
            else:
                replacement = block.code

            result = result[:match.start()] + replacement + result[match.end():]

        restored.append(result)

    return restored

# Usage example
backticks = '`' * 3  # Avoid literal ``` which breaks Quarto parsing
sample_doc = f"""
Here's how to create embeddings:

{backticks}python
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2')
embeddings = model.encode(['Hello', 'World'])
{backticks}

This creates vector representations.
"""

text_with_placeholders, blocks = extract_code_blocks(sample_doc)
print(f"Extracted {len(blocks)} code blocks")
print(f"Text with placeholders: {text_with_placeholders[:100]}...")
Extracted 1 code blocks
Text with placeholders: 
Here's how to create embeddings:

<<<CODE_BLOCK_0>>>

This creates vector representations.
...

24.8 Production Chunking Pipeline

Putting it all together into a production-ready pipeline:

Production Chunking Pipeline
import hashlib
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional

class DocumentType(Enum):
    """Supported document types."""
    PLAIN_TEXT = "text"
    MARKDOWN = "markdown"
    HTML = "html"
    PDF = "pdf"
    CODE = "code"

@dataclass
class ProcessedChunk:
    """A fully processed chunk ready for embedding."""
    chunk_id: str
    text: str
    document_id: str
    chunk_index: int
    metadata: Dict[str, Any] = field(default_factory=dict)
    embedding: Optional[List[float]] = None

@dataclass
class PipelineConfig:
    """Configuration for the chunking pipeline."""
    chunk_size: int = 500
    chunk_overlap: int = 50
    min_chunk_size: int = 50
    preserve_structure: bool = True
    include_metadata: bool = True
    deduplicate: bool = True
    min_word_count: int = 10
    max_word_count: int = 2000

class ChunkingPipeline:
    """Production chunking pipeline with quality filtering and metadata enrichment."""
    def __init__(self, config: Optional[PipelineConfig] = None):
        self.config = config or PipelineConfig()

    def process_document(self, content: str, document_id: str,
                        document_type: Optional[DocumentType] = None,
                        source_metadata: Optional[Dict] = None) -> List[ProcessedChunk]:
        """Process a single document through the full pipeline."""
        if document_type is None:
            document_type = self._detect_type(content)

        # Simplified chunking for demo
        words = content.split()
        chunks = []
        current_chunk = []

        for word in words:
            current_chunk.append(word)
            if len(" ".join(current_chunk)) >= self.config.chunk_size:
                chunks.append(" ".join(current_chunk))
                current_chunk = []

        if current_chunk:
            chunks.append(" ".join(current_chunk))

        # Filter and enrich
        filtered = [c for c in chunks if len(c.split()) >= self.config.min_word_count]

        processed = []
        for i, text in enumerate(filtered):
            chunk_id = hashlib.sha256(f"{document_id}:{i}:{text[:50]}".encode()).hexdigest()[:16]
            metadata = {
                "document_type": document_type.value,
                "chunk_index": i,
                "word_count": len(text.split()),
                "processed_at": datetime.now().isoformat(),
            }
            processed.append(ProcessedChunk(
                chunk_id=chunk_id,
                text=text,
                document_id=document_id,
                chunk_index=i,
                metadata=metadata,
            ))

        return processed

    def _detect_type(self, content: str) -> DocumentType:
        """Detect document type from content."""
        fence = "`" * 3  # build the fence dynamically so a literal one doesn't break Quarto parsing
        if content.strip().startswith("#") or fence in content:
            return DocumentType.MARKDOWN
        if "<html" in content.lower():
            return DocumentType.HTML
        return DocumentType.PLAIN_TEXT

# Usage example
config = PipelineConfig(chunk_size=200, min_word_count=10)
pipeline = ChunkingPipeline(config)

sample_doc = "Machine learning transforms data processing. " * 50
chunks = pipeline.process_document(sample_doc, "doc_001")
print(f"Processed {len(chunks)} chunks")
for chunk in chunks[:2]:
    print(f"  Chunk {chunk.chunk_index}: {chunk.metadata['word_count']} words")
Processed 11 chunks
  Chunk 0: 23 words
  Chunk 1: 23 words
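
Caller-supplied metadata also flows into each chunk: anything passed as source_metadata is merged into the chunk's metadata dictionary alongside the pipeline's own fields. A quick sketch, using a hypothetical source filename:

chunks = pipeline.process_document(
    sample_doc, "doc_002",
    source_metadata={"source": "notes.md"},  # hypothetical source file
)
print(chunks[0].metadata["source"])  # -> notes.md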

24.9 Evaluating Chunk Quality

How do you know if your chunking strategy is working?

24.9.1 Retrieval Quality Metrics

from dataclasses import dataclass
from typing import Dict, List, Optional

# Check for numpy
try:
    import numpy as np
    HAS_NUMPY = True
except ImportError:
    HAS_NUMPY = False

def _mean(values: List[float]) -> float:
    """Calculate mean without numpy."""
    return sum(values) / len(values) if values else 0.0

def _std(values: List[float]) -> float:
    """Calculate standard deviation without numpy."""
    if not values:
        return 0.0
    avg = _mean(values)
    variance = sum((x - avg) ** 2 for x in values) / len(values)
    return variance ** 0.5

@dataclass
class ChunkQualityMetrics:
    """Quality metrics for a set of chunks."""
    avg_chunk_size: float
    std_chunk_size: float
    min_chunk_size: int
    max_chunk_size: int
    avg_word_count: float
    unique_terms_ratio: float
    precision_at_k: Optional[float] = None
    recall_at_k: Optional[float] = None
    mrr: Optional[float] = None  # Mean Reciprocal Rank

def evaluate_chunk_quality(chunks: List[str], queries: Optional[List[str]] = None,
                          ground_truth: Optional[List[List[int]]] = None,
                          k: int = 5) -> ChunkQualityMetrics:
    """Evaluate the quality of a chunking strategy."""
    # Size metrics
    sizes = [len(c) for c in chunks]
    word_counts = [len(c.split()) for c in chunks]

    # Unique terms ratio
    all_terms = []
    for chunk in chunks:
        all_terms.extend(chunk.lower().split())
    unique_ratio = len(set(all_terms)) / len(all_terms) if all_terms else 0

    if HAS_NUMPY:
        avg_size = float(np.mean(sizes))
        std_size = float(np.std(sizes))
        avg_words = float(np.mean(word_counts))
    else:
        avg_size = _mean(sizes)
        std_size = _std(sizes)
        avg_words = _mean(word_counts)

    # Retrieval metrics need queries paired with ground-truth relevant chunk
    # indices. As a stand-in for an embedding-based retriever, this sketch
    # ranks chunks by simple term overlap with the query.
    precision = recall = mrr = None
    if queries and ground_truth:
        precisions, recalls, rranks = [], [], []
        for query, relevant in zip(queries, ground_truth):
            q_terms = set(query.lower().split())
            scores = [len(q_terms & set(c.lower().split())) for c in chunks]
            ranked = sorted(range(len(chunks)), key=lambda i: -scores[i])
            hits = len(set(ranked[:k]) & set(relevant))
            precisions.append(hits / k)
            recalls.append(hits / len(relevant) if relevant else 0.0)
            first_hit = next((r + 1 for r, i in enumerate(ranked) if i in relevant), None)
            rranks.append(1.0 / first_hit if first_hit else 0.0)
        precision, recall, mrr = _mean(precisions), _mean(recalls), _mean(rranks)

    return ChunkQualityMetrics(
        avg_chunk_size=avg_size,
        std_chunk_size=std_size,
        min_chunk_size=min(sizes) if sizes else 0,
        max_chunk_size=max(sizes) if sizes else 0,
        avg_word_count=avg_words,
        unique_terms_ratio=unique_ratio,
        precision_at_k=precision,
        recall_at_k=recall,
        mrr=mrr,
    )

def suggest_improvements(metrics: ChunkQualityMetrics) -> List[str]:
    """Suggest improvements based on quality metrics."""
    suggestions = []

    if metrics.std_chunk_size > metrics.avg_chunk_size * 0.5:
        suggestions.append(
            "High chunk size variance detected. Consider using fixed-size chunking."
        )

    if metrics.avg_chunk_size < 100:
        suggestions.append(
            "Chunks are very small. Consider increasing chunk size to preserve more context."
        )

    if metrics.avg_chunk_size > 1000:
        suggestions.append(
            "Chunks are quite large. Consider reducing chunk size for better precision."
        )

    if metrics.unique_terms_ratio < 0.3:
        suggestions.append(
            "Low unique terms ratio indicates repetitive content. Consider deduplication."
        )

    return suggestions

# Usage example
sample_chunks = [
    "Machine learning is a subset of artificial intelligence.",
    "Neural networks are inspired by biological neurons.",
    "Deep learning uses multiple layers for feature extraction.",
]

metrics = evaluate_chunk_quality(sample_chunks)
print("Chunk Quality Metrics:")
print(f"  Avg chunk size: {metrics.avg_chunk_size:.0f} chars")
print(f"  Avg word count: {metrics.avg_word_count:.1f}")
print(f"  Unique terms ratio: {metrics.unique_terms_ratio:.2%}")

improvements = suggest_improvements(metrics)
if improvements:
    print("\nSuggested Improvements:")
    for suggestion in improvements:
        print(f"  - {suggestion}")

24.9.2 Common Failure Patterns

Chunking troubleshooting guide

| Symptom | Likely Cause | Solution |
|---------|--------------|----------|
| Relevant info not retrieved | Chunks too large, query buried | Reduce chunk size |
| Retrieved chunks lack context | Chunks too small | Increase chunk size or overlap |
| Duplicate information in results | Too much overlap | Reduce overlap, add deduplication |
| Poor performance on tables | Tables split incorrectly | Use table-aware chunking |
| Code examples broken | Split mid-function | Use AST-aware code chunking |
| Headers orphaned from content | Structural chunking too aggressive | Keep headers with following content (see sketch below) |
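
The last row deserves a closer look. A structure-aware splitter should emit each header together with the body text that follows it, never as a fragment on its own. Here is a minimal sketch for markdown-style headers; the function name and regex are illustrative, not taken from any particular library:

import re
from typing import List

def chunk_with_headers(markdown: str) -> List[str]:
    """Split markdown at headers, keeping each header with its body text."""
    chunks, current = [], []
    for line in markdown.splitlines():
        # Start a new chunk at each header, flushing the accumulated section
        if re.match(r"^#{1,6}\s", line) and current:
            chunks.append("\n".join(current).strip())
            current = []
        current.append(line)
    if current:
        chunks.append("\n".join(current).strip())
    return [c for c in chunks if c]

# Usage example
doc = "# Setup\nInstall the package.\n\n# Usage\nCall encode() on your text."
for chunk in chunk_with_headers(doc):
    print(repr(chunk))
'# Setup\nInstall the package.'
'# Usage\nCall encode() on your text.'

A production version would also cap section length, falling back to sentence splitting whenever a single section exceeds the chunk-size budget.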

24.10 Key Takeaways

  • RAG systems embed chunks, not words: Modern embedding models process entire passages to create single vectors that capture contextual meaning—this is fundamentally different from word embeddings like Word2Vec

  • Chunking directly impacts retrieval quality: Poor chunking strategies lead to poor results regardless of embedding model quality; it’s often the highest-leverage optimization available

  • Match strategy to content type: Fixed-size for unstructured text, sentence-based for prose, paragraph-based for well-formatted documents, semantic chunking for topic-diverse content, and recursive chunking for mixed-structure documents

  • Overlap prevents boundary information loss: 10-20% overlap ensures information at chunk boundaries appears in multiple chunks, improving recall at modest storage cost

  • Preserve metadata for filtering and context: Source document, section headers, page numbers, and timestamps enable hybrid search and help users understand retrieved content

  • Evaluate empirically on your data: Optimal chunk size depends on your specific content and queries; use evaluation frameworks to compare strategies systematically

24.11 Looking Ahead

This chapter covered text chunking—preparing documents for embedding. Chapter 25 explores the parallel challenge for visual data: how to prepare images for embedding systems, including preprocessing, region extraction, and handling large-scale imagery like satellite photos and medical scans.

24.12 Further Reading

  • LangChain Documentation: “Text Splitters” - Comprehensive guide to chunking implementations
  • Liu, N. F., et al. (2023). “Lost in the Middle: How Language Models Use Long Contexts.” arXiv:2307.03172
  • Shi, W., et al. (2023). “REPLUG: Retrieval-Augmented Black-Box Language Models.” arXiv:2301.12652
  • Gao, L., et al. (2023). “Precise Zero-Shot Dense Retrieval without Relevance Labels.” arXiv:2212.10496
  • Robertson, S., and Zaragoza, H. (2009). “The Probabilistic Relevance Framework: BM25 and Beyond.” Foundations and Trends in Information Retrieval