""" chunker.py — Token-aware sliding-window text chunker. Splits document text into overlapping chunks of 500–800 tokens, preferring paragraph / heading boundaries over hard cuts. """ from __future__ import annotations import re from dataclasses import dataclass import tiktoken # --------------------------------------------------------------------------- # Types # --------------------------------------------------------------------------- @dataclass class Chunk: chunk_index: int content: str token_count: int metadata: dict # heading path, start_char, end_char, etc. # --------------------------------------------------------------------------- # Tokeniser # --------------------------------------------------------------------------- # cl100k_base works for most modern models; nomic-embed-text is BPE-compatible _TOKENISER = tiktoken.get_encoding('cl100k_base') def _count_tokens(text: str) -> int: return len(_TOKENISER.encode(text, disallowed_special=())) def _tokenise(text: str) -> list[int]: return _TOKENISER.encode(text, disallowed_special=()) def _decode(tokens: list[int]) -> str: return _TOKENISER.decode(tokens) # --------------------------------------------------------------------------- # Splitter helpers # --------------------------------------------------------------------------- _HEADING_RE = re.compile(r'^(#{1,6}\s.+)$', re.MULTILINE) _PARA_SEP = re.compile(r'\n{2,}') def _split_semantic_blocks(text: str) -> list[tuple[str, str]]: """ Split text into (heading_path, block_text) tuples at heading / paragraph boundaries. This is used to build chunks that respect document structure. """ blocks: list[tuple[str, str]] = [] current_heading = '' current_parts: list[str] = [] for para in _PARA_SEP.split(text): para = para.strip() if not para: continue heading_match = _HEADING_RE.match(para) if heading_match: # Flush current accumulation if current_parts: blocks.append((current_heading, '\n\n'.join(current_parts))) current_parts = [] current_heading = heading_match.group(1).lstrip('#').strip() current_parts.append(para) else: current_parts.append(para) if current_parts: blocks.append((current_heading, '\n\n'.join(current_parts))) return blocks # --------------------------------------------------------------------------- # Main chunker # --------------------------------------------------------------------------- def chunk_document( text: str, target_tokens: int = 700, overlap_tokens: int = 70, min_tokens: int = 50, ) -> list[Chunk]: """ Chunk ``text`` into overlapping token windows. Strategy: 1. Split into semantic blocks (heading sections / paragraphs). 2. Merge small blocks and split large blocks to hit ``target_tokens``. 3. Add overlapping context from the previous chunk. Args: text: Plain-text content to chunk. target_tokens: Target chunk size in tokens (default 700). overlap_tokens: Number of tokens to repeat from the previous chunk. min_tokens: Skip chunks shorter than this. Returns: List of :class:`Chunk` objects. """ if not text.strip(): return [] semantic_blocks = _split_semantic_blocks(text) raw_chunks: list[tuple[str, str]] = [] # (heading, text) for heading, block in semantic_blocks: block_tokens = _count_tokens(block) if block_tokens <= target_tokens: raw_chunks.append((heading, block)) else: # Split large blocks into token windows tokens = _tokenise(block) step = target_tokens - overlap_tokens start = 0 while start < len(tokens): end = min(start + target_tokens, len(tokens)) raw_chunks.append((heading, _decode(tokens[start:end]))) if end == len(tokens): break start += step # ---- Merge small adjacent chunks ---- merged: list[tuple[str, str]] = [] buffer_heading = '' buffer_text = '' buffer_tokens = 0 for heading, text_block in raw_chunks: block_tokens = _count_tokens(text_block) if buffer_tokens + block_tokens <= target_tokens: buffer_text = (buffer_text + '\n\n' + text_block).strip() buffer_tokens += block_tokens buffer_heading = heading or buffer_heading else: if buffer_text: merged.append((buffer_heading, buffer_text)) buffer_heading = heading buffer_text = text_block buffer_tokens = block_tokens if buffer_text: merged.append((buffer_heading, buffer_text)) # ---- Build final chunks with overlap ---- chunks: list[Chunk] = [] prev_overlap_text = '' for idx, (heading, chunk_text) in enumerate(merged): # Prepend overlap from previous chunk if prev_overlap_text: chunk_text = prev_overlap_text + '\n\n' + chunk_text token_count = _count_tokens(chunk_text) if token_count < min_tokens: continue chunks.append(Chunk( chunk_index=idx, content=chunk_text.strip(), token_count=token_count, metadata={'heading': heading, 'chunk_seq': idx}, )) # Compute overlap for next chunk tokens = _tokenise(chunk_text) prev_overlap_text = _decode(tokens[-overlap_tokens:]) if len(tokens) > overlap_tokens else chunk_text return chunks