"""
pipeline.py — Orchestrates the full ingestion flow for a single file.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
import asyncpg
|
|
|
|
from chunker import chunk_document
|
|
from embedder import get_embedder
|
|
from indexer import document_needs_reindex, upsert_document
|
|
from parser import parse_document
|
|
from settings import Settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _sha256(text: str) -> str:
|
|
return hashlib.sha256(text.encode('utf-8')).hexdigest()
|
|
|
|
|
|
async def ingest_file(
    file_path: Path,
    settings: Settings,
    conn: asyncpg.Connection,
) -> bool:
    """
    Full ingestion pipeline for a single Markdown file.

    Steps: existence/extension guards → content-hash idempotency check →
    parse → chunk → embed → store.

    Args:
        file_path: Path to the candidate file; must live under the vault root.
        settings: Runtime configuration (vault path, chunking parameters,
            embedding provider/model).
        conn: Open asyncpg connection used for the idempotency check and
            the final upsert.

    Returns:
        True if the file was (re)indexed, False if skipped (missing file,
        non-Markdown extension, unchanged content, or no chunks produced).

    Raises:
        ValueError: If the produced embedding dimension does not match the
            embedder's declared dimensionality, or (from ``Path.relative_to``)
            if *file_path* lies outside the vault root.
    """
    vault_root = Path(settings.vault_path)

    if not file_path.exists():
        logger.warning('File not found, skipping: %s', file_path)
        return False

    # Only Markdown files are indexed; everything else is silently skipped.
    # (Fixed: was the non-idiomatic `not ... == '.md'` form.)
    if file_path.suffix.lower() != '.md':
        return False

    # errors='replace' so a stray invalid byte can't abort the whole ingest;
    # the replacement characters also flow into the hash, keeping it stable.
    raw_text = file_path.read_text(encoding='utf-8', errors='replace')
    content_hash = _sha256(raw_text)
    relative_path = str(file_path.relative_to(vault_root))

    # Idempotency check: skip when the stored hash matches current content.
    if not await document_needs_reindex(conn, relative_path, content_hash):
        logger.debug('Skipping unchanged file: %s', relative_path)
        return False

    logger.info('Ingesting %s', relative_path)

    # Parse
    doc = parse_document(file_path, vault_root)

    # Chunk
    chunks = chunk_document(
        doc.content_text,
        target_tokens=settings.chunk_size,
        overlap_tokens=settings.chunk_overlap,
    )

    if not chunks:
        logger.warning('No chunks generated for %s', relative_path)
        return False

    # Embed
    embedder = get_embedder(
        provider=settings.embedding_provider,
        ollama_url=settings.ollama_url,
        model=settings.embedding_model,
    )
    texts = [c.content for c in chunks]
    embeddings = embedder.embed_batch(texts)

    # Validate embedding dimension consistency before touching the database,
    # so a misconfigured model can't write malformed vectors.
    if embeddings and len(embeddings[0]) != embedder.dimensions:
        logger.error(
            'Embedding dimension mismatch: expected %d, got %d',
            embedder.dimensions,
            len(embeddings[0]),
        )
        raise ValueError('Embedding dimension mismatch')

    # Store
    doc_id = await upsert_document(conn, doc, chunks, embeddings)
    logger.info('Indexed %s → %s (%d chunks)', relative_path, doc_id, len(chunks))
    return True
|