"""
watcher.py — File system watcher that triggers ingestion on vault changes.
"""
from __future__ import annotations

import asyncio
import logging
import os
import time
from pathlib import Path
from queue import Empty, Queue
from threading import Thread

import asyncpg
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer

from pipeline import ingest_file
from settings import Settings

logger = logging.getLogger(__name__)

# Length of one collection window: events for the same path that arrive
# within it are coalesced into a single action.
_DEBOUNCE_SECONDS = 2.0
# How long to yield to the event loop between polls of the thread-safe queue.
_POLL_SECONDS = 0.1
_MARKDOWN_SUFFIX = '.md'


class VaultEventHandler(FileSystemEventHandler):
    """Enqueues ``(action, path)`` tuples for changed Markdown files.

    ``action`` is ``'upsert'`` for created/modified files and ``'delete'``
    for removed ones. A move is reported as a delete of the source path
    followed by an upsert of the destination path. Directory events and
    paths that do not end in ``.md`` are ignored.
    """

    def __init__(self, queue: Queue) -> None:
        super().__init__()
        self._queue = queue

    def on_created(self, event: FileSystemEvent) -> None:
        self._enqueue(event)

    def on_modified(self, event: FileSystemEvent) -> None:
        self._enqueue(event)

    def on_deleted(self, event: FileSystemEvent) -> None:
        if not event.is_directory:
            self._put('delete', event.src_path)

    def on_moved(self, event: FileSystemEvent) -> None:
        # Without this, a rename would leave the old path indexed and the
        # new path would never be ingested.
        if event.is_directory:
            return
        self._put('delete', event.src_path)
        self._put('upsert', event.dest_path)

    def _enqueue(self, event: FileSystemEvent) -> None:
        """Queue an upsert for the event's source path if it is a Markdown file."""
        if not event.is_directory:
            self._put('upsert', event.src_path)

    def _put(self, action: str, raw_path: str | bytes) -> None:
        """Queue ``(action, path)`` when ``raw_path`` ends in ``.md``."""
        # os.fsdecode handles bytes paths; str(b'x.md') would yield
        # "b'x.md'", which never ends with '.md'.
        path = os.fsdecode(raw_path)
        if path.endswith(_MARKDOWN_SUFFIX):
            self._queue.put((action, path))


def _drain_events(queue: Queue, pending: dict[str, str]) -> None:
    """Move every currently queued event into ``pending`` without blocking.

    ``pending`` maps path -> action. A later event for the same path
    overwrites the earlier one, so only the most recent action survives.
    """
    while True:
        try:
            action, path = queue.get_nowait()
        except Empty:
            return
        pending[path] = action


async def _apply_event(
    action: str,
    path: str,
    settings: Settings,
    pool: asyncpg.Pool,
) -> None:
    """Apply one coalesced event using a connection from ``pool``.

    Any exception is logged and swallowed so that one bad file does not
    stop the watcher.
    """
    try:
        async with pool.acquire() as conn:
            if action == 'upsert':
                await ingest_file(Path(path), settings, conn)
            elif action == 'delete':
                # Imported lazily, as the original code did.
                # NOTE(review): presumably to avoid an import cycle — confirm.
                from indexer import delete_document

                relative = str(Path(path).relative_to(Path(settings.vault_path)))
                await delete_document(conn, relative)
    except Exception as exc:
        logger.error('Error processing %s %s: %s', action, path, exc, exc_info=True)


async def process_queue(
    queue: Queue,
    settings: Settings,
    pool: asyncpg.Pool,
) -> None:
    """Drain the event queue and process each file.

    Runs until cancelled. Each iteration collects events for
    ``_DEBOUNCE_SECONDS`` (debouncing rapid saves), keeps only the latest
    action per path, and then applies the resulting batch one file at a time.

    Args:
        queue: Thread-safe queue of ``(action, path)`` tuples filled by
            ``VaultEventHandler``.
        settings: Passed to ``ingest_file``; ``settings.vault_path`` is used
            to compute the vault-relative path for deletions.
        pool: Connection pool; one connection is acquired per processed file.
    """
    while True:
        # Keyed by path so that e.g. "modified, then deleted" inside one
        # window results in a single delete, never a delete followed by a
        # stale upsert (or the reverse).
        pending: dict[str, str] = {}

        deadline = time.monotonic() + _DEBOUNCE_SECONDS
        while time.monotonic() < deadline:
            _drain_events(queue, pending)
            await asyncio.sleep(_POLL_SECONDS)

        for path, action in pending.items():
            await _apply_event(action, path, settings, pool)


async def initial_scan(settings: Settings, pool: asyncpg.Pool) -> None:
    """Index all Markdown files in the vault at startup.

    Recursively collects ``*.md`` files under ``settings.vault_path`` and
    calls ``ingest_file`` on each with its own pooled connection. A failure
    on one file is logged and does not stop the scan.
    """
    vault_path = Path(settings.vault_path)
    md_files = list(vault_path.rglob('*.md'))
    total = len(md_files)
    logger.info('Initial scan: found %d Markdown files', total)

    for position, file_path in enumerate(md_files, start=1):
        try:
            async with pool.acquire() as conn:
                indexed = await ingest_file(file_path, settings, conn)
            if indexed:
                logger.info('[%d/%d] Indexed %s', position, total, file_path.name)
        except Exception as exc:
            logger.error('Failed to index %s: %s', file_path, exc, exc_info=True)

    logger.info('Initial scan complete.')


async def run_watcher(settings: Settings) -> None:
    """Entry point: start file watcher + initial scan.

    Creates the connection pool, starts the file system observer, indexes
    the vault once, then processes change events until cancelled. The
    observer and the pool are always released on the way out.
    """
    pool = await asyncpg.create_pool(settings.database_url, min_size=2, max_size=10)
    observer = Observer()
    try:
        event_queue: Queue = Queue()
        handler = VaultEventHandler(event_queue)
        observer.schedule(handler, settings.vault_path, recursive=True)
        # Start watching before the initial scan so that edits made while
        # the scan is running are queued instead of being lost.
        observer.start()
        logger.info('Watching vault at %s', settings.vault_path)

        await initial_scan(settings, pool)

        try:
            await process_queue(event_queue, settings, pool)
        except asyncio.CancelledError:
            # Cancellation of the processing loop is the normal shutdown
            # path; fall through to cleanup.
            pass
    finally:
        # join() raises on a thread that was never started, so only tear
        # the observer down if start() succeeded.
        if observer.is_alive():
            observer.stop()
            observer.join()
        await pool.close()