"""watcher.py — File system watcher that triggers ingestion on vault changes."""
from __future__ import annotations

import asyncio
import logging
import time
from pathlib import Path
from queue import Empty, Queue
from threading import Thread

import asyncpg
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer

from pipeline import ingest_file
from settings import Settings
logger = logging.getLogger(__name__)


class VaultEventHandler(FileSystemEventHandler):
    """Watches the vault and enqueues Markdown file changes.

    Pushes ('upsert', path) tuples for created/modified files and
    ('delete', path) tuples for removed files onto a thread-safe queue.
    Directory events and non-Markdown files are ignored.
    """

    def __init__(self, queue: Queue) -> None:
        super().__init__()
        self._queue = queue

    @staticmethod
    def _is_markdown_file(event: FileSystemEvent) -> bool:
        # Only plain .md files are indexed; directory events are noise here.
        return not event.is_directory and str(event.src_path).endswith('.md')

    def on_created(self, event: FileSystemEvent) -> None:
        self._enqueue(event)

    def on_modified(self, event: FileSystemEvent) -> None:
        self._enqueue(event)

    def on_deleted(self, event: FileSystemEvent) -> None:
        if self._is_markdown_file(event):
            self._queue.put(('delete', event.src_path))

    def _enqueue(self, event: FileSystemEvent) -> None:
        if self._is_markdown_file(event):
            self._queue.put(('upsert', event.src_path))
async def process_queue(
    queue: Queue,
    settings: Settings,
    pool: asyncpg.Pool,
) -> None:
    """Drain the event queue and process each file.

    Runs forever: collects (action, path) events for a short debounce
    window — so a burst of rapid editor saves collapses into a single
    ingest — then processes each unique event against the pool.

    Args:
        queue: Thread-safe queue fed by VaultEventHandler with
            ('upsert' | 'delete', path) tuples.
        settings: Application settings (vault path, etc.).
        pool: asyncpg connection pool used for ingestion/deletion.
    """
    # FIX: original annotation was set[str], but the set stores
    # (action, path) tuples.
    pending: set[tuple[str, str]] = set()
    DEBOUNCE_SECONDS = 2.0

    while True:
        # Collect all queued events (debounce rapid saves).
        deadline = time.monotonic() + DEBOUNCE_SECONDS
        while time.monotonic() < deadline:
            try:
                action, path = queue.get_nowait()
                pending.add((action, path))
            except Empty:
                # FIX: catch only queue.Empty — the original broad
                # `except Exception` also swallowed unrelated bugs.
                # Queue is drained for now; yield to the event loop
                # instead of busy-waiting.
                await asyncio.sleep(0.1)

        for action, path in list(pending):
            try:
                async with pool.acquire() as conn:
                    if action == 'upsert':
                        await ingest_file(Path(path), settings, conn)
                    elif action == 'delete':
                        # Imported lazily — presumably to avoid an import
                        # cycle at module load time; kept as-is.
                        from indexer import delete_document

                        relative = str(Path(path).relative_to(Path(settings.vault_path)))
                        await delete_document(conn, relative)
            except Exception as exc:
                # One bad file must not kill the watcher loop; log and continue.
                logger.error('Error processing %s %s: %s', action, path, exc, exc_info=True)

        pending.clear()
async def initial_scan(settings: Settings, pool: asyncpg.Pool) -> None:
    """Index all Markdown files in the vault at startup.

    Walks the vault recursively and ingests every *.md file. A failure on
    one file is logged and skipped so it cannot abort the whole scan.
    """
    root = Path(settings.vault_path)
    markdown_files = list(root.rglob('*.md'))
    total = len(markdown_files)
    logger.info('Initial scan: found %d Markdown files', total)

    for position, md_path in enumerate(markdown_files, start=1):
        try:
            async with pool.acquire() as conn:
                was_indexed = await ingest_file(md_path, settings, conn)
                if was_indexed:
                    logger.info('[%d/%d] Indexed %s', position, total, md_path.name)
        except Exception as exc:
            # Keep scanning even when a single file fails to ingest.
            logger.error('Failed to index %s: %s', md_path, exc, exc_info=True)

    logger.info('Initial scan complete.')
async def run_watcher(settings: Settings) -> None:
    """Entry point: start file watcher + initial scan.

    Creates the DB pool, indexes the whole vault once, then blocks in
    process_queue until cancelled. On shutdown the observer thread is
    stopped and the pool is closed.
    """
    pool = await asyncpg.create_pool(settings.database_url, min_size=2, max_size=10)

    await initial_scan(settings, pool)

    events: Queue = Queue()
    observer = Observer()
    observer.schedule(VaultEventHandler(events), settings.vault_path, recursive=True)
    observer.start()
    logger.info('Watching vault at %s', settings.vault_path)

    try:
        await process_queue(events, settings, pool)
    except asyncio.CancelledError:
        # Normal shutdown path: the surrounding task was cancelled.
        pass
    finally:
        # Always stop the observer thread and release DB connections,
        # even if process_queue raised.
        observer.stop()
        observer.join()
        await pool.close()