You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

154 lines
4.9 KiB

"""
indexer.py — Upserts parsed documents and embeddings into PostgreSQL.
"""
from __future__ import annotations
import hashlib
import json
import logging
from datetime import date, datetime
from typing import Any
import asyncpg
from chunker import Chunk
from parser import ParsedDocument
logger = logging.getLogger(__name__)
def sha256(text: str) -> str:
    """Return the hex-encoded SHA-256 digest of *text* (UTF-8 encoded)."""
    digest = hashlib.sha256()
    digest.update(text.encode('utf-8'))
    return digest.hexdigest()
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that serializes date/datetime values as ISO-8601 strings."""

    def default(self, obj):
        # date/datetime are not JSON-native; emit their ISO form.
        # Anything else falls through to the base class (which raises TypeError).
        if not isinstance(obj, (date, datetime)):
            return super().default(obj)
        return obj.isoformat()
async def upsert_document(
    conn: asyncpg.Connection,
    doc: ParsedDocument,
    chunks: list[Chunk],
    embeddings: list[list[float]],
) -> str:
    """
    Upsert a document and its chunks atomically.

    The document row is keyed on its path; existing chunks and wikilink
    relations are replaced wholesale with the supplied ones, all inside a
    single transaction.

    Args:
        conn: Open asyncpg connection (a transaction is opened on it).
        doc: Parsed document to persist.
        chunks: Chunked content, parallel to *embeddings*.
        embeddings: One embedding vector per chunk.

    Returns:
        The document UUID as a string.

    Raises:
        ValueError: If *chunks* and *embeddings* differ in length.
    """
    # zip() would silently truncate on a length mismatch, dropping chunks or
    # embeddings with no error — fail loudly before touching the database.
    if len(chunks) != len(embeddings):
        raise ValueError(
            f'chunk/embedding count mismatch: {len(chunks)} chunks, '
            f'{len(embeddings)} embeddings'
        )
    content_hash = sha256(doc.content_raw)
    async with conn.transaction():
        # ---- Upsert document ----
        row = await conn.fetchrow(
            """
            INSERT INTO documents
              (path, title, content, content_hash, frontmatter, tags, aliases, word_count, indexed_at)
            VALUES ($1, $2, $3, $4, $5::jsonb, $6, $7, $8, now())
            ON CONFLICT (path) DO UPDATE SET
                title = EXCLUDED.title,
                content = EXCLUDED.content,
                content_hash = EXCLUDED.content_hash,
                frontmatter = EXCLUDED.frontmatter,
                tags = EXCLUDED.tags,
                aliases = EXCLUDED.aliases,
                word_count = EXCLUDED.word_count,
                indexed_at = now()
            RETURNING id, (xmax = 0) AS inserted
            """,
            doc.path,
            doc.title,
            doc.content_raw,
            content_hash,
            json.dumps(doc.frontmatter, cls=DateTimeEncoder),
            doc.tags,
            doc.aliases,
            doc.word_count,
        )
        doc_uuid = row['id']
        doc_id: str = str(doc_uuid)
        # xmax = 0 means the row version was never superseded, i.e. this was a
        # fresh INSERT rather than the ON CONFLICT UPDATE path.
        is_new = row['inserted']
        logger.info('%s document %s (%s)', 'Inserted' if is_new else 'Updated', doc.path, doc_id)
        # ---- Replace chunks + embeddings ----
        await conn.execute('DELETE FROM chunks WHERE document_id = $1', doc_uuid)
        chunk_records = [
            (
                doc_uuid,
                chunk.chunk_index,
                chunk.content,
                chunk.token_count,
                # pgvector accepts its text literal form '[x1,x2,...]',
                # cast server-side via $5::vector.
                '[' + ','.join(str(x) for x in embedding) + ']',
                json.dumps(chunk.metadata),
            )
            for chunk, embedding in zip(chunks, embeddings)
        ]
        await conn.executemany(
            """
            INSERT INTO chunks (document_id, chunk_index, content, token_count, embedding, metadata)
            VALUES ($1, $2, $3, $4, $5::vector, $6::jsonb)
            """,
            chunk_records,
        )
        logger.debug('Upserted %d chunks for document %s', len(chunk_records), doc.path)
        # ---- Replace relations (WikiLinks) ----
        await conn.execute(
            'DELETE FROM relations WHERE source_doc_id = $1 AND relation_type = $2',
            doc_uuid,
            'wikilink',
        )
        if doc.wikilinks:
            relation_records = [
                (doc_uuid, link, 'wikilink')
                for link in doc.wikilinks
            ]
            await conn.executemany(
                """
                INSERT INTO relations (source_doc_id, target_path, relation_type)
                VALUES ($1, $2, $3)
                """,
                relation_records,
            )
            # Resolve targets that already exist in the vault.
            # NOTE(review): target_path is interpolated into LIKE unescaped, so
            # '%' or '_' in a link text act as wildcards — confirm intended.
            await conn.execute(
                """
                UPDATE relations r
                SET target_doc_id = d.id
                FROM documents d
                WHERE r.source_doc_id = $1
                  AND r.relation_type = 'wikilink'
                  AND (d.path LIKE '%' || r.target_path || '%'
                       OR d.title = r.target_path
                       OR r.target_path = ANY(d.aliases))
                """,
                doc_uuid,
            )
    return doc_id
async def document_needs_reindex(conn: asyncpg.Connection, path: str, content_hash: str) -> bool:
    """Return True if the document is new or its content hash has changed."""
    record = await conn.fetchrow(
        'SELECT content_hash FROM documents WHERE path = $1',
        path,
    )
    # Unknown path => never indexed; otherwise re-index only on a hash change.
    return record is None or record['content_hash'] != content_hash
async def delete_document(conn: asyncpg.Connection, path: str) -> None:
    """Remove a document and its cascaded chunks/relations."""
    # The DELETE status string (e.g. 'DELETE 1') is logged for observability.
    status = await conn.execute('DELETE FROM documents WHERE path = $1', path)
    logger.info('Deleted document %s (%s)', path, status)

Powered by TurnKey Linux.