You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
154 lines
4.9 KiB
154 lines
4.9 KiB
"""
indexer.py — Upserts parsed documents and embeddings into PostgreSQL.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
from datetime import date, datetime
|
|
from typing import Any
|
|
|
|
import asyncpg
|
|
|
|
from chunker import Chunk
|
|
from parser import ParsedDocument
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def sha256(text: str) -> str:
    """Return the hex-encoded SHA-256 digest of *text* (UTF-8)."""
    digest = hashlib.sha256(text.encode('utf-8'))
    return digest.hexdigest()
|
|
|
|
|
|
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that serialises date/datetime values as ISO-8601 strings."""

    def default(self, obj):
        # Dates are not JSON-serialisable out of the box; everything else
        # falls through to the base class (which raises TypeError).
        if not isinstance(obj, (date, datetime)):
            return super().default(obj)
        return obj.isoformat()
|
|
|
|
|
|
async def upsert_document(
    conn: asyncpg.Connection,
    doc: ParsedDocument,
    chunks: list[Chunk],
    embeddings: list[list[float]],
) -> str:
    """
    Upsert a document and its chunks atomically.

    Within a single transaction: the document row is inserted or updated
    (keyed on ``path``), its chunks are fully replaced, and its outgoing
    wikilink relations are rebuilt and resolved against existing documents.

    Args:
        conn: Open asyncpg connection; a transaction is started on it.
        doc: Parsed document to persist.
        chunks: Chunks derived from ``doc``, parallel to ``embeddings``.
        embeddings: One embedding vector per chunk.

    Returns:
        The document UUID as a string.

    Raises:
        ValueError: If ``chunks`` and ``embeddings`` differ in length.
    """
    # A bare zip() would silently truncate to the shorter sequence and
    # drop chunks (or embeddings) without any error — fail loudly instead.
    if len(chunks) != len(embeddings):
        raise ValueError(
            f'chunk/embedding count mismatch for {doc.path}: '
            f'{len(chunks)} chunks vs {len(embeddings)} embeddings'
        )

    content_hash = sha256(doc.content_raw)

    async with conn.transaction():
        # ---- Upsert document ----
        # (xmax = 0) distinguishes a fresh INSERT from an UPDATE: a row
        # rewritten by ON CONFLICT DO UPDATE carries a non-zero xmax.
        row = await conn.fetchrow(
            """
            INSERT INTO documents
            (path, title, content, content_hash, frontmatter, tags, aliases, word_count, indexed_at)
            VALUES ($1, $2, $3, $4, $5::jsonb, $6, $7, $8, now())
            ON CONFLICT (path) DO UPDATE SET
                title = EXCLUDED.title,
                content = EXCLUDED.content,
                content_hash = EXCLUDED.content_hash,
                frontmatter = EXCLUDED.frontmatter,
                tags = EXCLUDED.tags,
                aliases = EXCLUDED.aliases,
                word_count = EXCLUDED.word_count,
                indexed_at = now()
            RETURNING id, (xmax = 0) AS inserted
            """,
            doc.path,
            doc.title,
            doc.content_raw,
            content_hash,
            json.dumps(doc.frontmatter, cls=DateTimeEncoder),
            doc.tags,
            doc.aliases,
            doc.word_count,
        )
        doc_id: str = str(row['id'])
        is_new = row['inserted']
        logger.info('%s document %s (%s)', 'Inserted' if is_new else 'Updated', doc.path, doc_id)

        # ---- Delete stale chunks ----
        # Chunk boundaries can shift on any edit, so replace wholesale
        # rather than diffing individual chunks.
        await conn.execute('DELETE FROM chunks WHERE document_id = $1', row['id'])

        # ---- Insert chunks + embeddings ----
        chunk_records = []
        for chunk, embedding in zip(chunks, embeddings):
            # pgvector accepts a '[x1,x2,...]' text literal cast via ::vector.
            embedding_str = '[' + ','.join(str(x) for x in embedding) + ']'
            chunk_records.append((
                row['id'],
                chunk.chunk_index,
                chunk.content,
                chunk.token_count,
                embedding_str,
                # Use DateTimeEncoder here too, consistent with the
                # frontmatter dump above — chunk metadata may carry dates.
                json.dumps(chunk.metadata, cls=DateTimeEncoder),
            ))

        await conn.executemany(
            """
            INSERT INTO chunks (document_id, chunk_index, content, token_count, embedding, metadata)
            VALUES ($1, $2, $3, $4, $5::vector, $6::jsonb)
            """,
            chunk_records,
        )
        logger.debug('Upserted %d chunks for document %s', len(chunk_records), doc.path)

        # ---- Upsert relations (WikiLinks) ----
        # Only this document's outgoing wikilinks are rebuilt; other
        # relation types (if any) are left untouched.
        await conn.execute(
            'DELETE FROM relations WHERE source_doc_id = $1 AND relation_type = $2',
            row['id'],
            'wikilink',
        )
        if doc.wikilinks:
            relation_records = [
                (row['id'], link, 'wikilink')
                for link in doc.wikilinks
            ]
            await conn.executemany(
                """
                INSERT INTO relations (source_doc_id, target_path, relation_type)
                VALUES ($1, $2, $3)
                """,
                relation_records,
            )
            # Resolve targets that already exist in the vault.
            # NOTE(review): the LIKE '%..%' match treats '%'/'_' in the link
            # text as wildcards and can over-match short link names — confirm
            # this fuzziness is intended. Links to not-yet-indexed documents
            # stay unresolved (target_doc_id NULL) until those are indexed.
            await conn.execute(
                """
                UPDATE relations r
                SET target_doc_id = d.id
                FROM documents d
                WHERE r.source_doc_id = $1
                  AND r.relation_type = 'wikilink'
                  AND (d.path LIKE '%' || r.target_path || '%'
                       OR d.title = r.target_path
                       OR r.target_path = ANY(d.aliases))
                """,
                row['id'],
            )

    return doc_id
|
|
|
|
|
|
async def document_needs_reindex(conn: asyncpg.Connection, path: str, content_hash: str) -> bool:
    """Return True if the document is new or its content hash has changed."""
    stored = await conn.fetchrow(
        'SELECT content_hash FROM documents WHERE path = $1',
        path,
    )
    # Unknown path -> brand-new document; otherwise reindex only on a
    # content-hash mismatch.
    return stored is None or stored['content_hash'] != content_hash
|
|
|
|
|
|
async def delete_document(conn: asyncpg.Connection, path: str) -> None:
    """Remove a document and its cascaded chunks/relations."""
    # Chunks and relations are presumably removed via ON DELETE CASCADE
    # on their foreign keys — only the documents row is deleted here.
    status = await conn.execute('DELETE FROM documents WHERE path = $1', path)
    logger.info('Deleted document %s (%s)', path, status)
|