|
|
"""
|
|
|
chunker.py — Token-aware sliding-window text chunker.
|
|
|
|
|
|
Splits document text into overlapping chunks of 500–800 tokens,
|
|
|
preferring paragraph / heading boundaries over hard cuts.
|
|
|
"""
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
import re
|
|
|
from dataclasses import dataclass
|
|
|
|
|
|
import tiktoken
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
# Types
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
class Chunk:
    """One chunk of a source document, sized for embedding."""

    # Position of this chunk within the document's chunk list.
    chunk_index: int
    # Chunk text; may include overlap text repeated from the previous chunk.
    content: str
    # Token length of ``content`` as measured by the module tokeniser.
    token_count: int
    metadata: dict  # heading path, start_char, end_char, etc.
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
# Tokeniser
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# cl100k_base works for most modern models; nomic-embed-text is BPE-compatible
# NOTE: loading the encoding happens once at import time and is shared by all
# helpers below.
_TOKENISER = tiktoken.get_encoding('cl100k_base')
|
|
|
|
|
|
|
|
|
def _count_tokens(text: str) -> int:
    """Return the number of tokens ``text`` encodes to."""
    encoded = _TOKENISER.encode(text, disallowed_special=())
    return len(encoded)
|
|
|
|
|
|
|
|
|
def _tokenise(text: str) -> list[int]:
    """Encode ``text`` into a list of token ids."""
    token_ids = _TOKENISER.encode(text, disallowed_special=())
    return token_ids
|
|
|
|
|
|
|
|
|
def _decode(tokens: list[int]) -> str:
    """Inverse of :func:`_tokenise`: turn token ids back into text."""
    decoded = _TOKENISER.decode(tokens)
    return decoded
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
# Splitter helpers
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Markdown-style heading line: 1-6 '#' characters, whitespace, then title text.
_HEADING_RE = re.compile(r'^(#{1,6}\s.+)$', re.MULTILINE)
# One or more blank lines: the paragraph separator.
_PARA_SEP = re.compile(r'\n{2,}')
|
|
|
|
|
|
|
|
|
def _split_semantic_blocks(text: str) -> list[tuple[str, str]]:
|
|
|
"""
|
|
|
Split text into (heading_path, block_text) tuples at heading / paragraph
|
|
|
boundaries. This is used to build chunks that respect document structure.
|
|
|
"""
|
|
|
blocks: list[tuple[str, str]] = []
|
|
|
current_heading = ''
|
|
|
current_parts: list[str] = []
|
|
|
|
|
|
for para in _PARA_SEP.split(text):
|
|
|
para = para.strip()
|
|
|
if not para:
|
|
|
continue
|
|
|
heading_match = _HEADING_RE.match(para)
|
|
|
if heading_match:
|
|
|
# Flush current accumulation
|
|
|
if current_parts:
|
|
|
blocks.append((current_heading, '\n\n'.join(current_parts)))
|
|
|
current_parts = []
|
|
|
current_heading = heading_match.group(1).lstrip('#').strip()
|
|
|
current_parts.append(para)
|
|
|
else:
|
|
|
current_parts.append(para)
|
|
|
|
|
|
if current_parts:
|
|
|
blocks.append((current_heading, '\n\n'.join(current_parts)))
|
|
|
|
|
|
return blocks
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
# Main chunker
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def chunk_document(
    text: str,
    target_tokens: int = 700,
    overlap_tokens: int = 70,
    min_tokens: int = 50,
) -> list[Chunk]:
    """
    Chunk ``text`` into overlapping token windows.

    Strategy:
      1. Split into semantic blocks (heading sections / paragraphs).
      2. Merge small blocks and split large blocks to hit ``target_tokens``.
      3. Add overlapping context from the previous chunk.

    Args:
        text: Plain-text content to chunk.
        target_tokens: Target chunk size in tokens (default 700).
        overlap_tokens: Number of tokens to repeat from the previous chunk.
            ``0`` disables overlap entirely.
        min_tokens: Skip chunks shorter than this.

    Returns:
        List of :class:`Chunk` objects. ``chunk_index`` values are contiguous
        even when short chunks are dropped; the pre-filter sequence number is
        kept in ``metadata['chunk_seq']``.
    """
    if not text.strip():
        return []

    raw_chunks = _split_oversized_blocks(text, target_tokens, overlap_tokens)
    merged = _merge_small_chunks(raw_chunks, target_tokens)
    return _assemble_chunks(merged, overlap_tokens, min_tokens)


def _split_oversized_blocks(
    text: str,
    target_tokens: int,
    overlap_tokens: int,
) -> list[tuple[str, str]]:
    """Split semantic blocks larger than ``target_tokens`` into token windows."""
    raw_chunks: list[tuple[str, str]] = []  # (heading, text)

    for heading, block in _split_semantic_blocks(text):
        if _count_tokens(block) <= target_tokens:
            raw_chunks.append((heading, block))
            continue

        # Block is too large: hard-cut it into overlapping token windows.
        tokens = _tokenise(block)
        # BUG FIX: without max(1, ...) a caller passing
        # overlap_tokens >= target_tokens makes step <= 0 and the while-loop
        # below never terminates.
        step = max(1, target_tokens - overlap_tokens)
        start = 0
        while start < len(tokens):
            end = min(start + target_tokens, len(tokens))
            raw_chunks.append((heading, _decode(tokens[start:end])))
            if end == len(tokens):
                break
            start += step

    return raw_chunks


def _merge_small_chunks(
    raw_chunks: list[tuple[str, str]],
    target_tokens: int,
) -> list[tuple[str, str]]:
    """Greedily merge adjacent chunks while they still fit in ``target_tokens``."""
    merged: list[tuple[str, str]] = []
    buffer_heading = ''
    buffer_text = ''
    buffer_tokens = 0

    for heading, text_block in raw_chunks:
        block_tokens = _count_tokens(text_block)
        if buffer_tokens + block_tokens <= target_tokens:
            buffer_text = (buffer_text + '\n\n' + text_block).strip()
            buffer_tokens += block_tokens
            # Prefer the newest non-empty heading for the merged chunk.
            buffer_heading = heading or buffer_heading
        else:
            if buffer_text:
                merged.append((buffer_heading, buffer_text))
            buffer_heading = heading
            buffer_text = text_block
            buffer_tokens = block_tokens

    if buffer_text:
        merged.append((buffer_heading, buffer_text))
    return merged


def _assemble_chunks(
    merged: list[tuple[str, str]],
    overlap_tokens: int,
    min_tokens: int,
) -> list[Chunk]:
    """Build final :class:`Chunk` objects with overlap from the previous chunk."""
    chunks: list[Chunk] = []
    prev_overlap_text = ''

    for seq, (heading, chunk_text) in enumerate(merged):
        # Prepend overlap carried over from the previous emitted chunk.
        if prev_overlap_text:
            chunk_text = prev_overlap_text + '\n\n' + chunk_text

        token_count = _count_tokens(chunk_text)
        if token_count < min_tokens:
            # Too small to embed usefully. The previous overlap is deliberately
            # kept so the next chunk still receives context.
            continue

        chunks.append(Chunk(
            # BUG FIX: the original used the raw enumerate index, leaving gaps
            # in chunk_index whenever a short chunk was skipped.
            chunk_index=len(chunks),
            content=chunk_text.strip(),
            token_count=token_count,
            metadata={'heading': heading, 'chunk_seq': seq},
        ))

        # Tail of this chunk becomes context for the next one.
        if overlap_tokens > 0:
            tokens = _tokenise(chunk_text)
            prev_overlap_text = (
                _decode(tokens[-overlap_tokens:])
                if len(tokens) > overlap_tokens
                else chunk_text
            )
        else:
            # BUG FIX: the original unconditionally sliced tokens[-overlap_tokens:],
            # and tokens[-0:] is the WHOLE list — overlap_tokens=0 would repeat
            # every entire chunk into the next one.
            prev_overlap_text = ''

    return chunks
|