You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

47 lines
1.4 KiB

"""
ingestion/agent.py — Ingestion Agent: indexes new/changed files from the vault.
"""
from __future__ import annotations
import asyncio
import sys
from pathlib import Path
# Prepend the 'ingestion-worker' directory (located three parent levels above
# this file) to the module search path. Presumably the `pipeline` and
# `settings` modules imported below live there — TODO confirm.
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'ingestion-worker'))
import asyncpg
from base_agent import BaseAgent
from pipeline import ingest_file
from settings import Settings as IngestionSettings
class IngestionAgent(BaseAgent):
    """Agent that indexes Markdown files from the vault.

    A job payload is dispatched on two keys, checked in this order:

    * ``reindex_all`` (truthy) -- ingest every ``*.md`` file found
      recursively under the vault root.
    * ``path`` (truthy) -- ingest the single file at that path, joined onto
      the vault root.
    """

    agent_type = 'ingestion'

    async def process(self, job_id: str, payload: dict) -> dict:
        """Run one ingestion job and return a summary dict.

        Args:
            job_id: Identifier of the job; not used by this method.
            payload: Job parameters; see the class docstring for the keys
                that are recognised.

        Returns:
            * Full reindex: ``{'indexed', 'skipped', 'total'}`` where a file
              counts as indexed when ``ingest_file`` returns a truthy value
              and as skipped otherwise.
            * Single file: ``{'indexed': 1 or 0, 'path': <payload path>}``.
            * Neither key set: ``{'message': 'No action specified'}``.
        """
        cfg = IngestionSettings()
        root = Path(cfg.vault_path)

        if payload.get('reindex_all'):
            candidates = list(root.rglob('*.md'))
            indexed_count = 0
            for md_path in candidates:
                # A connection is taken from the pool per file and released
                # as soon as that file has been ingested.
                async with self.pool.acquire() as conn:
                    outcome = await ingest_file(md_path, cfg, conn)
                if outcome:
                    indexed_count += 1
            return {
                'indexed': indexed_count,
                'skipped': len(candidates) - indexed_count,
                'total': len(candidates),
            }

        relative = payload.get('path')
        if relative:
            # NOTE(review): the payload path is joined to the vault root
            # without checking that the result stays inside the vault (an
            # absolute path or '..' segments would escape it) — confirm
            # that callers only submit trusted, vault-relative paths.
            target = root / relative
            async with self.pool.acquire() as conn:
                outcome = await ingest_file(target, cfg, conn)
            return {'indexed': int(bool(outcome)), 'path': relative}

        return {'message': 'No action specified'}

Powered by TurnKey Linux.