You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

92 lines
2.4 KiB

"""
pipeline.py — Orchestrates the full ingestion flow for a single file.
"""
from __future__ import annotations
import hashlib
import logging
from pathlib import Path
import asyncpg
from chunker import chunk_document
from embedder import get_embedder
from indexer import document_needs_reindex, upsert_document
from parser import parse_document
from settings import Settings
logger = logging.getLogger(__name__)
def _sha256(text: str) -> str:
return hashlib.sha256(text.encode('utf-8')).hexdigest()
async def ingest_file(
    file_path: Path,
    settings: Settings,
    conn: asyncpg.Connection,
) -> bool:
    """
    Full ingestion pipeline for a single Markdown file.

    Steps: idempotency check (content hash) -> parse -> chunk -> embed -> store.

    Args:
        file_path: Path to the Markdown file to ingest; must live under
            ``settings.vault_path`` (``relative_to`` raises ValueError otherwise).
        settings: Runtime configuration (vault root, chunk sizing, embedding
            provider/model/URL).
        conn: Open asyncpg connection used for the reindex check and upsert.

    Returns:
        True if the file was (re)indexed, False if skipped (missing file,
        non-Markdown extension, unchanged content, or zero chunks produced).

    Raises:
        ValueError: if the embedder returns vectors whose length does not
            match its declared ``dimensions``.
    """
    vault_root = Path(settings.vault_path)
    if not file_path.exists():
        logger.warning('File not found, skipping: %s', file_path)
        return False
    # Only Markdown files are indexed; anything else is silently skipped.
    if file_path.suffix.lower() != '.md':
        return False

    # errors='replace' keeps ingestion alive on files with invalid UTF-8.
    raw_text = file_path.read_text(encoding='utf-8', errors='replace')
    content_hash = _sha256(raw_text)
    relative_path = str(file_path.relative_to(vault_root))

    # Idempotency check: skip files whose stored hash already matches.
    if not await document_needs_reindex(conn, relative_path, content_hash):
        logger.debug('Skipping unchanged file: %s', relative_path)
        return False
    logger.info('Ingesting %s', relative_path)

    # Parse the Markdown into a structured document.
    doc = parse_document(file_path, vault_root)

    # Chunk the document body into overlapping token windows.
    chunks = chunk_document(
        doc.content_text,
        target_tokens=settings.chunk_size,
        overlap_tokens=settings.chunk_overlap,
    )
    if not chunks:
        logger.warning('No chunks generated for %s', relative_path)
        return False

    # Embed all chunk texts in a single batch call.
    embedder = get_embedder(
        provider=settings.embedding_provider,
        ollama_url=settings.ollama_url,
        model=settings.embedding_model,
    )
    texts = [c.content for c in chunks]
    embeddings = embedder.embed_batch(texts)

    # Guard against a provider/model mismatch producing wrong-sized vectors;
    # only the first vector is checked — assumes the batch is homogeneous.
    if embeddings and len(embeddings[0]) != embedder.dimensions:
        logger.error(
            'Embedding dimension mismatch: expected %d, got %d',
            embedder.dimensions,
            len(embeddings[0]),
        )
        raise ValueError('Embedding dimension mismatch')

    # Store document, chunks, and embeddings.
    doc_id = await upsert_document(conn, doc, chunks, embeddings)
    # BUG FIX: original format string 'Indexed %s%s (%d chunks)' fused the
    # path and the document id into one unreadable token; separate them.
    logger.info('Indexed %s (doc_id=%s, %d chunks)', relative_path, doc_id, len(chunks))
    return True

Powered by TurnKey Linux.