You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

183 lines
5.6 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
chunker.py — Token-aware sliding-window text chunker.
Splits document text into overlapping chunks of 500–800 tokens,
preferring paragraph / heading boundaries over hard cuts.
"""
from __future__ import annotations
import re
from dataclasses import dataclass
import tiktoken
# ---------------------------------------------------------------------------
# Types
# ---------------------------------------------------------------------------
@dataclass
class Chunk:
    """A single retrieval unit produced by ``chunk_document``."""
    chunk_index: int  # position of this chunk in the returned list
    content: str      # chunk text (may begin with overlap from the previous chunk)
    token_count: int  # size of ``content`` in tokens, per the module tokeniser
    metadata: dict  # heading path, start_char, end_char, etc.
# ---------------------------------------------------------------------------
# Tokeniser
# ---------------------------------------------------------------------------
# cl100k_base works for most modern models; nomic-embed-text is BPE-compatible
_TOKENISER = tiktoken.get_encoding('cl100k_base')


def _count_tokens(text: str) -> int:
    """Return the number of BPE tokens in *text* (special tokens disallowed)."""
    encoded = _TOKENISER.encode(text, disallowed_special=())
    return len(encoded)
def _tokenise(text: str) -> list[int]:
    """Encode *text* into a list of BPE token ids (special tokens disallowed)."""
    token_ids = _TOKENISER.encode(text, disallowed_special=())
    return token_ids
def _decode(tokens: list[int]) -> str:
    """Decode a list of BPE token ids back into text."""
    decoded = _TOKENISER.decode(tokens)
    return decoded
# ---------------------------------------------------------------------------
# Splitter helpers
# ---------------------------------------------------------------------------
_HEADING_RE = re.compile(r'^(#{1,6}\s.+)$', re.MULTILINE)
_PARA_SEP = re.compile(r'\n{2,}')
def _split_semantic_blocks(text: str) -> list[tuple[str, str]]:
"""
Split text into (heading_path, block_text) tuples at heading / paragraph
boundaries. This is used to build chunks that respect document structure.
"""
blocks: list[tuple[str, str]] = []
current_heading = ''
current_parts: list[str] = []
for para in _PARA_SEP.split(text):
para = para.strip()
if not para:
continue
heading_match = _HEADING_RE.match(para)
if heading_match:
# Flush current accumulation
if current_parts:
blocks.append((current_heading, '\n\n'.join(current_parts)))
current_parts = []
current_heading = heading_match.group(1).lstrip('#').strip()
current_parts.append(para)
else:
current_parts.append(para)
if current_parts:
blocks.append((current_heading, '\n\n'.join(current_parts)))
return blocks
# ---------------------------------------------------------------------------
# Main chunker
# ---------------------------------------------------------------------------
def chunk_document(
    text: str,
    target_tokens: int = 700,
    overlap_tokens: int = 70,
    min_tokens: int = 50,
) -> list[Chunk]:
    """
    Chunk ``text`` into overlapping token windows.

    Strategy:
      1. Split into semantic blocks (heading sections / paragraphs).
      2. Merge small blocks and split large blocks to hit ``target_tokens``.
      3. Add overlapping context from the previous chunk.

    Args:
        text: Plain-text content to chunk.
        target_tokens: Target chunk size in tokens (default 700).
        overlap_tokens: Number of tokens to repeat from the previous chunk.
            0 disables overlap; values >= ``target_tokens`` are clamped so
            window splitting always makes forward progress.
        min_tokens: Skip chunks shorter than this.

    Returns:
        List of :class:`Chunk` objects. ``chunk_index`` is consecutive over
        the emitted chunks; ``metadata['chunk_seq']`` preserves the position
        in the pre-filter merged sequence.
    """
    if not text.strip():
        return []

    semantic_blocks = _split_semantic_blocks(text)
    raw_chunks: list[tuple[str, str]] = []  # (heading, text)

    # BUGFIX: the original used ``step = target_tokens - overlap_tokens``
    # unguarded; overlap_tokens >= target_tokens made step <= 0 and the
    # window loop below never advanced (infinite loop). Clamp to >= 1.
    step = max(1, target_tokens - overlap_tokens)

    for heading, block in semantic_blocks:
        if _count_tokens(block) <= target_tokens:
            raw_chunks.append((heading, block))
            continue
        # Split large blocks into token windows of at most target_tokens.
        tokens = _tokenise(block)
        start = 0
        while start < len(tokens):
            end = min(start + target_tokens, len(tokens))
            raw_chunks.append((heading, _decode(tokens[start:end])))
            if end == len(tokens):
                break
            start += step

    # ---- Merge small adjacent chunks ----
    merged: list[tuple[str, str]] = []
    buffer_heading = ''
    buffer_text = ''
    buffer_tokens = 0
    for heading, text_block in raw_chunks:
        block_tokens = _count_tokens(text_block)
        if buffer_tokens + block_tokens <= target_tokens:
            buffer_text = (buffer_text + '\n\n' + text_block).strip()
            buffer_tokens += block_tokens
            # Keep the first non-empty heading seen for the merged chunk.
            buffer_heading = heading or buffer_heading
        else:
            if buffer_text:
                merged.append((buffer_heading, buffer_text))
            buffer_heading = heading
            buffer_text = text_block
            buffer_tokens = block_tokens
    if buffer_text:
        merged.append((buffer_heading, buffer_text))

    # ---- Build final chunks with overlap ----
    chunks: list[Chunk] = []
    prev_overlap_text = ''
    for seq, (heading, chunk_text) in enumerate(merged):
        # Prepend overlap from previous chunk.
        if prev_overlap_text:
            chunk_text = prev_overlap_text + '\n\n' + chunk_text
        token_count = _count_tokens(chunk_text)
        if token_count < min_tokens:
            # Too small to be a useful retrieval unit; the overlap carried
            # from the previous emitted chunk is left in place.
            continue
        chunks.append(Chunk(
            # BUGFIX: the original used the merged-sequence index here,
            # leaving gaps whenever a short chunk was skipped. Use a running
            # count so chunk_index is consecutive over emitted chunks.
            chunk_index=len(chunks),
            content=chunk_text.strip(),
            token_count=token_count,
            metadata={'heading': heading, 'chunk_seq': seq},
        ))
        # Compute overlap for next chunk.
        # BUGFIX: the original sliced ``tokens[-overlap_tokens:]``; with
        # overlap_tokens == 0 that is ``tokens[0:]`` (the WHOLE chunk), so
        # "no overlap" silently duplicated every previous chunk.
        if overlap_tokens <= 0:
            prev_overlap_text = ''
        else:
            tokens = _tokenise(chunk_text)
            if len(tokens) > overlap_tokens:
                prev_overlap_text = _decode(tokens[-overlap_tokens:])
            else:
                prev_overlap_text = chunk_text

    return chunks

Powered by TurnKey Linux.