commit 626b04aa4e6ab5748b40d811faaf329528828f20 Author: Clawd Date: Thu Mar 5 19:37:02 2026 +0000 Initial commit: AI Second Brain Self-hosted knowledge management system with: - RAG API (FastAPI + pgvector) - Markdown vault (Obsidian/Logseq compatible) - Autonomous AI agents (ingestion, tagging, linking, summarization, maintenance) - Web UI (Next.js) - Docker Compose deployment - Ollama integration for local LLM inference Built by Copilot CLI, reviewed by Clawd. diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..504d792 --- /dev/null +++ b/.env.example @@ -0,0 +1,61 @@ +# ============================================================================= +# AI Second Brain — Environment Configuration +# Copy this file to .env and adjust values for your setup. +# ============================================================================= + +# --------------------------------------------------------------------------- +# Database +# --------------------------------------------------------------------------- +POSTGRES_PASSWORD=brain +POSTGRES_PORT=5432 + +# --------------------------------------------------------------------------- +# Ollama (local LLM) +# --------------------------------------------------------------------------- +OLLAMA_PORT=11434 +EMBEDDING_MODEL=nomic-embed-text +CHAT_MODEL=mistral + +# --------------------------------------------------------------------------- +# RAG API +# --------------------------------------------------------------------------- +API_PORT=8000 +LOG_LEVEL=INFO + +# Retrieval defaults +SEARCH_TOP_K=10 +SEARCH_THRESHOLD=0.65 +RERANK_ENABLED=false + +# Embedding provider: ollama | sentence_transformers +EMBEDDING_PROVIDER=ollama +EMBEDDING_DIMENSIONS=768 + +# CORS — comma-separated origins allowed to access the API +CORS_ORIGINS=http://localhost:3000 + +# --------------------------------------------------------------------------- +# Web UI +# 
--------------------------------------------------------------------------- +UI_PORT=3000 + +# --------------------------------------------------------------------------- +# Ingestion Worker +# --------------------------------------------------------------------------- +VAULT_PATH=/vault +CHUNK_SIZE=700 +CHUNK_OVERLAP=70 +POLL_INTERVAL=30 + +# --------------------------------------------------------------------------- +# AI Agents +# --------------------------------------------------------------------------- +INGESTION_POLL=15 +LINKING_POLL=60 +TAGGING_POLL=120 +SUMMARIZATION_POLL=300 +MAINTENANCE_POLL=3600 + +# Enable auto-tagging and summarization by agents +AUTO_TAG=true +AUTO_SUMMARIZE=true diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a63afda --- /dev/null +++ b/.gitignore @@ -0,0 +1,37 @@ +# Environment +.env +.env.local +*.env + +# Python +__pycache__/ +*.py[cod] +*$py.class +.venv/ +venv/ +*.egg-info/ + +# Node +node_modules/ +.next/ +out/ +build/ +dist/ + +# Data +*.db +*.sqlite +*.log + +# IDE +.vscode/ +.idea/ + +# Docker volumes (local) +postgres_data/ +redis_data/ +ollama_data/ + +# OS +.DS_Store +Thumbs.db diff --git a/README.md b/README.md new file mode 100644 index 0000000..c78ca3a --- /dev/null +++ b/README.md @@ -0,0 +1,41 @@ +# AI Second Brain + +A fully self-hosted, offline-capable knowledge management system with AI-powered retrieval, autonomous agents, and a Markdown-first philosophy. + +## Quick Start + +```bash +cp .env.example .env +# edit .env as needed +docker compose up -d +# wait for ollama to pull models +docker compose exec ollama ollama pull nomic-embed-text +docker compose exec ollama ollama pull mistral +# open the UI +open http://localhost:3000 +``` + +## Architecture + +See [docs/architecture.md](docs/architecture.md) for the full design. 
+ +## Components + +| Service | Port | Description | +|--------------------|-------|--------------------------------| +| Web UI | 3000 | Next.js knowledge interface | +| RAG API | 8000 | FastAPI retrieval service | +| Ollama | 11434 | Local LLM inference | +| PostgreSQL | 5432 | Vector + relational store | +| Redis | 6379 | Job queue | + +## Documentation + +- [Architecture](docs/architecture.md) +- [Setup Guide](docs/setup.md) +- [API Reference](docs/api.md) +- [Agents Guide](docs/agents.md) + +## License + +MIT diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..8415458 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,197 @@ +services: + + # --------------------------------------------------------------------------- + # PostgreSQL with pgvector + # --------------------------------------------------------------------------- + postgres: + image: pgvector/pgvector:pg16 + container_name: second-brain-postgres + restart: unless-stopped + environment: + POSTGRES_DB: second_brain + POSTGRES_USER: brain + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-brain} + volumes: + - postgres_data:/var/lib/postgresql/data + - ./infra/database/schema.sql:/docker-entrypoint-initdb.d/01_schema.sql:ro + ports: + - "${POSTGRES_PORT:-5432}:5432" + networks: + - brain-net + healthcheck: + test: ["CMD-SHELL", "pg_isready -U brain -d second_brain"] + interval: 10s + timeout: 5s + retries: 5 + + # --------------------------------------------------------------------------- + # Redis (job queue) + # --------------------------------------------------------------------------- + redis: + image: redis:7-alpine + container_name: second-brain-redis + restart: unless-stopped + volumes: + - redis_data:/data + networks: + - brain-net + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 5 + + # --------------------------------------------------------------------------- + # Ollama (local LLM inference) + # 
--------------------------------------------------------------------------- + ollama: + image: ollama/ollama:latest + container_name: second-brain-ollama + restart: unless-stopped + volumes: + - ollama_data:/root/.ollama + ports: + - "${OLLAMA_PORT:-11434}:11434" + networks: + - brain-net + deploy: + resources: + reservations: + devices: + - driver: nvidia + count: all + capabilities: [gpu] + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:11434/api/tags"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 60s + + # --------------------------------------------------------------------------- + # Ollama model bootstrap (pulls required models on first start) + # --------------------------------------------------------------------------- + ollama-bootstrap: + image: ollama/ollama:latest + container_name: second-brain-ollama-bootstrap + depends_on: + ollama: + condition: service_healthy + volumes: + - ollama_data:/root/.ollama + networks: + - brain-net + entrypoint: ["/bin/sh", "-c"] + command: + - | + OLLAMA_HOST=ollama:11434 ollama pull ${EMBEDDING_MODEL:-nomic-embed-text} + OLLAMA_HOST=ollama:11434 ollama pull ${CHAT_MODEL:-mistral} + restart: "no" + + # --------------------------------------------------------------------------- + # RAG API (FastAPI) + # --------------------------------------------------------------------------- + rag-api: + build: + context: ./services/rag-api + dockerfile: Dockerfile + container_name: second-brain-rag-api + restart: unless-stopped + env_file: + - .env + environment: + DATABASE_URL: postgresql://brain:${POSTGRES_PASSWORD:-brain}@postgres:5432/second_brain + OLLAMA_URL: http://ollama:11434 + depends_on: + postgres: + condition: service_healthy + ollama: + condition: service_healthy + ports: + - "${API_PORT:-8000}:8000" + networks: + - brain-net + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/api/v1/health"] + interval: 15s + timeout: 5s + retries: 5 + start_period: 30s + + # 
--------------------------------------------------------------------------- + # Ingestion Worker + # --------------------------------------------------------------------------- + ingestion-worker: + build: + context: ./services/ingestion-worker + dockerfile: Dockerfile + container_name: second-brain-ingestion + restart: unless-stopped + env_file: + - .env + environment: + DATABASE_URL: postgresql://brain:${POSTGRES_PASSWORD:-brain}@postgres:5432/second_brain + OLLAMA_URL: http://ollama:11434 + VAULT_PATH: /vault + volumes: + - ./vault:/vault:ro + depends_on: + postgres: + condition: service_healthy + ollama: + condition: service_healthy + networks: + - brain-net + + # --------------------------------------------------------------------------- + # AI Agents + # --------------------------------------------------------------------------- + agents: + build: + context: ./services/agents + dockerfile: Dockerfile + container_name: second-brain-agents + restart: unless-stopped + env_file: + - .env + environment: + DATABASE_URL: postgresql://brain:${POSTGRES_PASSWORD:-brain}@postgres:5432/second_brain + OLLAMA_URL: http://ollama:11434 + VAULT_PATH: /vault + volumes: + - ./vault:/vault:ro + depends_on: + postgres: + condition: service_healthy + rag-api: + condition: service_healthy + networks: + - brain-net + + # --------------------------------------------------------------------------- + # Web UI (Next.js) + # --------------------------------------------------------------------------- + web-ui: + build: + context: ./services/web-ui + dockerfile: Dockerfile + container_name: second-brain-ui + restart: unless-stopped + environment: + NEXT_PUBLIC_API_URL: http://localhost:${API_PORT:-8000} + depends_on: + rag-api: + condition: service_healthy + ports: + - "${UI_PORT:-3000}:3000" + networks: + - brain-net + +volumes: + postgres_data: + redis_data: + ollama_data: + +networks: + brain-net: + driver: bridge diff --git a/docs/agents.md b/docs/agents.md new file mode 100644 index 
0000000..0917afd --- /dev/null +++ b/docs/agents.md @@ -0,0 +1,174 @@ +# AI Agents Guide + +The Second Brain system includes five autonomous AI agents that run as background workers, continuously improving the knowledge base. + +--- + +## Architecture + +All agents inherit from `BaseAgent` and share: +- **Atomic job claiming** from `agent_jobs` table (no double-processing) +- **Exponential backoff retry** (max 3 retries, 2/4/8s delays) +- **Structured logging** to `agent_logs` table +- **Configurable poll intervals** via environment variables + +--- + +## Agents + +### 1. Ingestion Agent (`ingestion`) + +**Purpose:** Indexes new and modified Markdown files from the vault. + +**Triggers:** +- Queued job via the API (`POST /api/v1/index`) +- Full vault reindex job (`POST /api/v1/index/reindex`) +- File watcher events (from ingestion-worker) + +**What it does:** +1. Reads the target file(s) from the vault +2. Parses frontmatter, extracts WikiLinks and tags +3. Chunks content into 500–800 token segments +4. Generates embeddings via Ollama +5. Upserts document, chunks, and relations in PostgreSQL + +**Idempotency:** SHA-256 content hashing ensures unchanged files are skipped. + +--- + +### 2. Knowledge Linking Agent (`linking`) + +**Purpose:** Discovers semantic connections between documents and creates `ai-inferred` relation edges. + +**Triggers:** Runs periodically (default: every 60s). + +**What it does:** +1. Finds documents without AI-inferred links +2. For each: computes average chunk embedding +3. Finds top-5 semantically similar documents (cosine similarity > 0.75) +4. Inserts `ai-inferred` relations + +**Use case:** Surfaces non-obvious connections — e.g., a note about "attention mechanisms" linked to a note about "reading strategies" if the embeddings are similar. + +--- + +### 3. Tagging Agent (`tagging`) + +**Purpose:** Automatically suggests and applies tags to untagged documents using the LLM. + +**Triggers:** Runs periodically (default: every 120s). 
+ +**What it does:** +1. Finds documents with no tags +2. Sends title + content excerpt to Ollama with a tagging prompt +3. Parses the LLM JSON response (3–7 suggested tags) +4. Writes tags back to the `documents` table + +**Prompt template:** Instructs the LLM to produce lowercase, hyphen-separated tags. + +**Note:** Tags written to the database only — to persist back to the Markdown file, run the optional vault sync script. + +--- + +### 4. Summarization Agent (`summarization`) + +**Purpose:** Generates concise summaries for long documents that lack one. + +**Triggers:** Runs periodically (default: every 300s). + +**Criteria:** +- Document word count > 500 +- `frontmatter.summary` is missing or empty + +**What it does:** +1. Sends title + content (up to 4000 chars) to Ollama +2. Receives a 2–4 sentence summary +3. Stores the summary in `documents.frontmatter.summary` + +The summary becomes available via the API and is displayed in the document viewer. + +--- + +### 5. Maintenance Agent (`maintenance`) + +**Purpose:** Health checks and housekeeping for the knowledge graph. + +**Triggers:** Runs hourly (default: every 3600s, per `MAINTENANCE_POLL`). + +**What it does:** +1. Counts broken WikiLinks (links with no matching document) +2. Finds orphaned documents (no incoming or outgoing links) +3. Counts stale documents (not re-indexed in 7+ days) +4. Counts chunks with missing embeddings +5. Resolves previously broken WikiLinks that now have matching documents + +**Output:** A structured report written to `agent_jobs.result` and logged to `agent_logs`. 
+ +--- + +## Monitoring Agents + +### Check agent job queue + +```sql +SELECT agent_type, status, COUNT(*) +FROM agent_jobs +GROUP BY agent_type, status +ORDER BY agent_type; +``` + +### View recent agent logs + +```sql +SELECT agent_type, level, message, created_at +FROM agent_logs +ORDER BY created_at DESC +LIMIT 50; +``` + +### View last maintenance report + +```sql +SELECT result +FROM agent_jobs +WHERE agent_type = 'maintenance' AND status = 'done' +ORDER BY completed_at DESC +LIMIT 1; +``` + +--- + +## Disabling Agents + +Set poll intervals to very large values in `.env` to effectively disable specific agents: + +```env +LINKING_POLL=999999 +TAGGING_POLL=999999 +``` + +--- + +## Adding a Custom Agent + +1. Create `services/agents/my-agent/agent.py`: + +```python +from base_agent import BaseAgent + +class MyAgent(BaseAgent): + agent_type = 'my-agent' + + async def process(self, job_id: str, payload: dict) -> dict: + # Your logic here + return {'done': True} +``` + +2. Register in `services/agents/main.py`: + +```python +from my_agent.agent import MyAgent +asyncio.create_task(MyAgent(pool, settings).run_forever(60)) +``` + +3. Enqueue jobs via the `agent_jobs` table or via the base class `enqueue()` method. diff --git a/docs/api.md b/docs/api.md new file mode 100644 index 0000000..4ae2d90 --- /dev/null +++ b/docs/api.md @@ -0,0 +1,178 @@ +# API Reference + +Base URL: `http://localhost:8000/api/v1` + +Interactive docs: `http://localhost:8000/docs` (Swagger UI) + +--- + +## Authentication + +No authentication is required by default (local-only deployment). Add a reverse proxy with auth for production. + +--- + +## Endpoints + +### POST `/search` + +Hybrid vector + full-text search across the knowledge base. 
+ +**Request:** +```json +{ + "query": "machine learning transformers", + "limit": 10, + "threshold": 0.65, + "tags": ["ml", "ai"], + "hybrid": true +} +``` + +**Response:** +```json +{ + "results": [ + { + "document_id": "uuid", + "chunk_id": "uuid", + "title": "Introduction to Transformers", + "path": "resources/ml/transformers.md", + "content": "...chunk text...", + "score": 0.923, + "tags": ["ml", "transformers"], + "highlight": "...bolded match..." + } + ], + "total": 5, + "query_time_ms": 18.4 +} +``` + +--- + +### POST `/chat` + +RAG chat with streaming Server-Sent Events response. + +**Request:** +```json +{ + "message": "What do I know about neural networks?", + "context_limit": 5, + "stream": true +} +``` + +**Response (SSE stream):** +``` +data: {"type":"sources","sources":[{"title":"Neural Nets","path":"...","score":0.91}]} + +data: {"type":"token","token":"Neural"} + +data: {"type":"token","token":" networks"} + +data: {"type":"done"} +``` + +--- + +### GET `/document/{id}` + +Get a document by UUID. + +**Response:** Full document object including content, frontmatter, tags. + +--- + +### GET `/document/path/{path}` + +Get a document by its vault-relative path (e.g., `resources/ml/intro.md`). + +--- + +### GET `/document/{id}/related` + +Get related documents ordered by semantic similarity. + +**Query params:** `limit` (default: 5) + +--- + +### POST `/index` + +Queue a specific file for indexing. + +**Request:** +```json +{ "path": "notes/new-note.md" } +``` + +--- + +### POST `/index/reindex` + +Queue a full vault re-index. + +**Request:** +```json +{ "force": false } +``` +Set `force: true` to reindex even unchanged files. + +--- + +### GET `/tags` + +List all tags with document counts. + +**Response:** +```json +[ + {"tag": "machine-learning", "count": 42}, + {"tag": "python", "count": 38} +] +``` + +--- + +### GET `/graph` + +Get the knowledge graph (nodes = documents, edges = links). 
+ +**Query params:** `limit` (default: 200) + +--- + +### GET `/stats` + +System statistics. + +**Response:** +```json +{ + "total_documents": 1234, + "total_chunks": 8765, + "total_relations": 3210, + "total_tags": 87, + "last_indexed": "2026-03-05T19:00:00Z", + "embedding_model": "nomic-embed-text", + "chat_model": "mistral" +} +``` + +--- + +### GET `/health` + +Health check. + +**Response:** +```json +{ + "status": "ok", + "database": "ok", + "ollama": "ok", + "version": "1.0.0" +} +``` diff --git a/docs/architecture.md b/docs/architecture.md new file mode 100644 index 0000000..2c44e68 --- /dev/null +++ b/docs/architecture.md @@ -0,0 +1,420 @@ +# AI Second Brain — System Architecture + +> Version: 1.0.0 +> Date: 2026-03-05 +> Status: Design Document + +--- + +## Table of Contents + +1. [Overview](#overview) +2. [Core Components](#core-components) +3. [Data Flow](#data-flow) +4. [Database Schema](#database-schema) +5. [API Design](#api-design) +6. [Agent Architecture](#agent-architecture) +7. [Ingestion Pipeline](#ingestion-pipeline) +8. [Infrastructure](#infrastructure) +9. [Design Principles](#design-principles) + +--- + +## Overview + +The AI Second Brain is a fully self-hosted, offline-capable knowledge management system that treats a Markdown vault (Obsidian/Logseq compatible) as the single source of truth. All AI capabilities—embeddings, retrieval, generation, and autonomous agents—run locally. 
+ +``` +┌─────────────────────────────────────────────────────────────────────┐ +│ AI SECOND BRAIN │ +│ │ +│ ┌──────────┐ ┌────────────┐ ┌──────────┐ ┌────────────┐ │ +│ │ EDITOR │───▶│ INGESTION │───▶│ STORAGE │───▶│ API │ │ +│ │ LAYER │ │ PIPELINE │ │ LAYER │ │ LAYER │ │ +│ └──────────┘ └────────────┘ └──────────┘ └────────────┘ │ +│ │ │ │ +│ Markdown Vault ┌────▼───────┐ │ +│ (Obsidian/Logseq) │ AI LAYER │ │ +│ │ (Ollama) │ │ +│ └────────────┘ │ +│ │ │ +│ ┌────▼───────┐ │ +│ │ INTERFACE │ │ +│ │ LAYER │ │ +│ └────────────┘ │ +└─────────────────────────────────────────────────────────────────────┘ +``` + +--- + +## Core Components + +### 1. Editor Layer +- **Vault directory**: `./vault/` — plain Markdown files, fully compatible with Obsidian and Logseq +- **Format**: CommonMark + YAML frontmatter + `[[WikiLinks]]` +- **Source of truth**: All knowledge lives here; the database is a derived index +- **Sync**: File-system watching via `watchdog` triggers the ingestion pipeline + +### 2. Storage Layer +- **PostgreSQL 16** with **pgvector** extension +- Stores: document metadata, text chunks, embeddings (1536-dim or 768-dim), extracted entities, wikilink relations +- Vector index: IVFFlat or HNSW for ANN search + +### 3. Processing Layer (Ingestion Pipeline) +- File watcher monitors `./vault/**/*.md` +- Parser: frontmatter extraction (YAML), Markdown-to-text, WikiLink graph extraction +- Chunker: 500–800 token sliding window with 10% overlap +- Embeddings: Ollama (`nomic-embed-text`) or `sentence-transformers` (offline fallback) +- Idempotent: SHA-256 content hashing prevents redundant re-indexing + +### 4. API Layer +- **FastAPI** service exposing REST endpoints +- Retrieval: hybrid search (vector similarity + full-text BM25-style) +- Reranking: optional cross-encoder via `sentence-transformers` +- Async throughout; connection pooling with `asyncpg` + +### 5. AI Layer +- **Ollama** sidecar providing local LLM inference (Mistral, Llama 3, Phi-3, etc.) 
+- Embedding model: `nomic-embed-text` (768-dim) +- Chat/generation model: configurable (default: `mistral`) +- Agents use LangChain/LlamaIndex or direct Ollama API calls + +### 6. Agent Layer +- Long-running Python workers +- Agents: Ingestion, Knowledge Linking, Tagging, Summarization, Maintenance +- Message queue: Redis-backed job queue (ARQ) or simple PostgreSQL-backed queue +- Scheduled via cron-style configuration + +### 7. Interface Layer +- **Next.js** (React) web application +- Pages: Search, Chat, Document Viewer, Graph View (knowledge graph), Tag Browser +- API client calls the FastAPI backend +- Served as a Docker container (Node.js) + +--- + +## Data Flow + +### Ingestion Flow +``` +Markdown File (vault/) + │ + ▼ + File Watcher (watchdog) + │ + ▼ + Parse & Validate + ├── Extract YAML frontmatter (title, tags, date, aliases) + ├── Extract WikiLinks [[target]] + └── Convert Markdown → plain text + │ + ▼ + Content Hash (SHA-256) + └── Skip if unchanged (idempotent) + │ + ▼ + Chunker (500-800 tokens, 10% overlap) + │ + ▼ + Embedding Generation (Ollama nomic-embed-text) + │ + ▼ + Store in PostgreSQL + ├── documents table (metadata + full text) + ├── chunks table (chunk text + embedding vector) + ├── entities table (extracted NER if enabled) + └── relations table (WikiLink graph edges) +``` + +### Retrieval (RAG) Flow +``` +User Query + │ + ▼ +Query Embedding (Ollama) + │ + ▼ +Hybrid Search +├── Vector similarity (pgvector cosine distance) +└── Full-text search (PostgreSQL tsvector) + │ + ▼ +Reranker (optional cross-encoder) + │ + ▼ +Context Assembly (top-k chunks + metadata) + │ + ▼ +LLM Generation (Ollama) + │ + ▼ +Response + Citations +``` + +--- + +## Database Schema + +### Tables + +#### `documents` +```sql +CREATE TABLE documents ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + path TEXT NOT NULL UNIQUE, -- relative path in vault + title TEXT, + content TEXT NOT NULL, -- full markdown source + content_hash TEXT NOT NULL, -- SHA-256 for change 
detection + frontmatter JSONB DEFAULT '{}', -- parsed YAML frontmatter + tags TEXT[] DEFAULT '{}', + aliases TEXT[] DEFAULT '{}', + word_count INTEGER, + created_at TIMESTAMPTZ DEFAULT now(), + updated_at TIMESTAMPTZ DEFAULT now(), + indexed_at TIMESTAMPTZ, + fts_vector TSVECTOR -- full-text search index +); +CREATE INDEX idx_documents_path ON documents(path); +CREATE INDEX idx_documents_tags ON documents USING GIN(tags); +CREATE INDEX idx_documents_fts ON documents USING GIN(fts_vector); +``` + +#### `chunks` +```sql +CREATE TABLE chunks ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + document_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE, + chunk_index INTEGER NOT NULL, + content TEXT NOT NULL, + token_count INTEGER, + embedding VECTOR(768), -- nomic-embed-text dimension + metadata JSONB DEFAULT '{}', + created_at TIMESTAMPTZ DEFAULT now() +); +CREATE INDEX idx_chunks_document_id ON chunks(document_id); +CREATE INDEX idx_chunks_embedding ON chunks USING ivfflat (embedding vector_cosine_ops) + WITH (lists = 100); +``` + +#### `entities` +```sql +CREATE TABLE entities ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + document_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE, + name TEXT NOT NULL, + entity_type TEXT NOT NULL, -- PERSON, ORG, CONCEPT, etc. 
+ context TEXT, + created_at TIMESTAMPTZ DEFAULT now() +); +CREATE INDEX idx_entities_document_id ON entities(document_id); +CREATE INDEX idx_entities_name ON entities(name); +CREATE INDEX idx_entities_type ON entities(entity_type); +``` + +#### `relations` +```sql +CREATE TABLE relations ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + source_doc_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE, + target_path TEXT NOT NULL, -- may not exist yet (forward links) + target_doc_id UUID REFERENCES documents(id) ON DELETE SET NULL, + relation_type TEXT DEFAULT 'wikilink', -- wikilink, tag, explicit + context TEXT, -- surrounding text + created_at TIMESTAMPTZ DEFAULT now() +); +CREATE INDEX idx_relations_source ON relations(source_doc_id); +CREATE INDEX idx_relations_target ON relations(target_doc_id); +CREATE INDEX idx_relations_target_path ON relations(target_path); +``` + +#### `agent_jobs` +```sql +CREATE TABLE agent_jobs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + agent_type TEXT NOT NULL, -- ingestion, linking, tagging, etc. 
+ status TEXT DEFAULT 'pending', -- pending, running, done, failed + payload JSONB DEFAULT '{}', + result JSONB, + error TEXT, + created_at TIMESTAMPTZ DEFAULT now(), + started_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ, + retry_count INTEGER DEFAULT 0 +); +CREATE INDEX idx_agent_jobs_status ON agent_jobs(status); +CREATE INDEX idx_agent_jobs_type ON agent_jobs(agent_type); +``` + +#### `agent_logs` +```sql +CREATE TABLE agent_logs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + job_id UUID REFERENCES agent_jobs(id) ON DELETE SET NULL, + agent_type TEXT NOT NULL, + level TEXT DEFAULT 'info', + message TEXT NOT NULL, + metadata JSONB DEFAULT '{}', + created_at TIMESTAMPTZ DEFAULT now() +); +CREATE INDEX idx_agent_logs_job_id ON agent_logs(job_id); +CREATE INDEX idx_agent_logs_created ON agent_logs(created_at DESC); +``` + +--- + +## API Design + +### Base URL: `http://localhost:8000/api/v1` + +| Method | Endpoint | Description | +|--------|-----------------------|------------------------------------------| +| POST | `/search` | Hybrid vector + full-text search | +| POST | `/chat` | RAG chat with streaming response | +| GET | `/document/{id}` | Get document by ID | +| GET | `/document/path` | Get document by vault path | +| POST | `/index` | Manually trigger index of a file | +| POST | `/reindex` | Full vault reindex | +| GET | `/related/{id}` | Get related documents by embedding sim | +| GET | `/tags` | List all tags with counts | +| GET | `/graph` | WikiLink graph (nodes + edges) | +| GET | `/health` | Health check | +| GET | `/stats` | System statistics | + +### Request/Response Shapes + +#### POST `/search` +```json +// Request +{ + "query": "machine learning concepts", + "limit": 10, + "threshold": 0.7, + "tags": ["ml", "ai"], + "hybrid": true +} + +// Response +{ + "results": [ + { + "document_id": "uuid", + "chunk_id": "uuid", + "title": "Introduction to ML", + "path": "notes/ml-intro.md", + "content": "chunk text...", + "score": 0.92, + "tags": ["ml", 
"ai"], + "highlight": "...matched text..." + } + ], + "total": 42, + "query_time_ms": 23 +} +``` + +#### POST `/chat` +```json +// Request (SSE stream) +{ + "message": "What do I know about transformers?", + "conversation_id": "optional-uuid", + "context_limit": 5 +} + +// Response (Server-Sent Events) +data: {"token": "Transformers", "type": "token"} +data: {"token": " are", "type": "token"} +data: {"sources": [...], "type": "sources"} +data: {"type": "done"} +``` + +--- + +## Agent Architecture + +All agents inherit from a common `BaseAgent` class: + +``` +BaseAgent +├── IngestionAgent — watches vault, triggers indexing +├── LinkingAgent — discovers and creates knowledge links +├── TaggingAgent — auto-tags documents using LLM +├── SummarizationAgent — generates/updates document summaries +└── MaintenanceAgent — detects orphans, broken links, stale content +``` + +### Agent Lifecycle +1. Agent starts, reads config from environment +2. Polls `agent_jobs` table (or subscribes to PostgreSQL NOTIFY) +3. Claims job atomically (`UPDATE ... WHERE status='pending' RETURNING *`) +4. Executes job with retry logic (exponential backoff, max 3 retries) +5. Writes result / error back to `agent_jobs` +6. 
Logs to `agent_logs` + +### Scheduling +- **IngestionAgent**: event-driven (file watcher) + fallback poll every 30s +- **LinkingAgent**: runs after every ingestion batch +- **TaggingAgent**: runs on new/modified documents without tags +- **SummarizationAgent**: runs on documents >500 words without summary +- **MaintenanceAgent**: runs hourly (default: every 3600s, per `MAINTENANCE_POLL`) + +--- + +## Ingestion Pipeline + +``` +services/ingestion-worker/ +├── watcher.py    — watchdog file system monitor +├── parser.py     — frontmatter + markdown + wikilink parser +├── chunker.py    — token-aware sliding window chunker +├── embedder.py   — Ollama / sentence-transformers embeddings +├── indexer.py    — PostgreSQL upsert logic +└── pipeline.py   — orchestrates the full ingestion flow +``` + +### Chunking Strategy +- **Method**: Sliding window, 500–800 tokens, 10% overlap +- **Splitter**: Prefer semantic boundaries (paragraphs, headings) over hard token cuts +- **Metadata preserved**: document_id, chunk_index, source heading path + +### Embedding Strategy +- **Primary**: Ollama `nomic-embed-text` (768-dim, fully offline) +- **Fallback**: `sentence-transformers/all-MiniLM-L6-v2` (384-dim, local model) +- **Batching**: 32 chunks per embedding request for efficiency + +--- + +## Infrastructure + +### Docker Services + +| Service            | Image                        | Port  | Description                      | +|--------------------|------------------------------|-------|----------------------------------| +| `postgres`         | pgvector/pgvector:pg16       | 5432  | PostgreSQL + pgvector            | +| `ollama`           | ollama/ollama:latest         | 11434 | Local LLM inference              | +| `rag-api`          | local/rag-api                | 8000  | FastAPI retrieval service        | +| `ingestion-worker` | local/ingestion-worker       | —     | Vault watcher + indexer          | +| `agents`           | local/agents                 | —     | Background AI agents             | +| `web-ui`           | local/web-ui                 | 3000  | Next.js frontend                 | +| `redis`            | redis:7-alpine               | 6379  | Job queue + caching              | + +### Volume Mounts +- `./vault:/vault:ro` — shared read-only across all services needing vault access (agents write to the database, not the files) +- 
`postgres_data:/var/lib/postgresql/data` — persistent database +- `ollama_data:/root/.ollama` — pulled LLM models + +### Network +- Internal Docker network `brain-net` +- External ports: `3000` (UI), `8000` (API), `11434` (Ollama) + +--- + +## Design Principles + +1. **Vault is source of truth** — database is always a derived index, fully rebuildable +2. **Offline-first** — zero external API calls required; all AI runs locally via Ollama +3. **Idempotent ingestion** — SHA-256 hashing ensures files are not re-indexed unless changed +4. **No vendor lock-in** — all components are open source and self-hosted +5. **Modular** — each service can be replaced independently (swap Ollama for another runtime) +6. **Graceful degradation** — system works without agents running; agents enhance, not gate +7. **Markdown compatibility** — vault works as a standalone Obsidian/Logseq vault at all times diff --git a/docs/setup.md b/docs/setup.md new file mode 100644 index 0000000..852d105 --- /dev/null +++ b/docs/setup.md @@ -0,0 +1,167 @@ +# Setup Guide + +## Prerequisites + +- Docker & Docker Compose v2.20+ +- 16 GB RAM recommended (for local LLMs) +- GPU optional (CPU inference works but is slower) +- A Markdown vault (Obsidian/Logseq compatible directory) + +--- + +## Quick Start + +### 1. Clone and configure + +```bash +git clone <repository-url> second-brain +cd second-brain +cp .env.example .env +# Edit .env — at minimum, set POSTGRES_PASSWORD +``` + +### 2. Place your vault + +Copy your Markdown notes into `./vault/`, or mount your existing Obsidian/Logseq vault: + +```bash +# Option A: copy files +cp -r ~/obsidian-vault/* ./vault/ + +# Option B: symlink (Linux/macOS) +ln -s ~/obsidian-vault ./vault +``` + +The vault directory structure is preserved — subfolders become part of the document path. + +### 3. 
Start services + +```bash +docker compose up -d +``` + +This starts: +- PostgreSQL with pgvector (port 5432) +- Redis (port 6379) +- Ollama (port 11434) +- RAG API (port 8000) +- Ingestion Worker (background) +- AI Agents (background) +- Web UI (port 3000) + +### 4. Wait for model download + +Ollama pulls the embedding and chat models on first boot. This may take several minutes. + +```bash +# Watch the bootstrap container logs +docker compose logs -f ollama-bootstrap +``` + +### 5. Check the UI + +Open **http://localhost:3000** in your browser. + +--- + +## Service Ports + +| Service | Port | URL | +|-------------|-------|------------------------------| +| Web UI | 3000 | http://localhost:3000 | +| RAG API | 8000 | http://localhost:8000 | +| API Docs | 8000 | http://localhost:8000/docs | +| Ollama | 11434 | http://localhost:11434 | +| PostgreSQL | 5432 | localhost:5432 | + +--- + +## Configuration + +All configuration is in `.env`. Key settings: + +| Variable | Default | Description | +|-------------------|--------------------|------------------------------------| +| `CHAT_MODEL` | `mistral` | Ollama model for chat | +| `EMBEDDING_MODEL` | `nomic-embed-text` | Ollama model for embeddings | +| `CHUNK_SIZE` | `700` | Target tokens per chunk | +| `SEARCH_THRESHOLD`| `0.65` | Minimum similarity score (0–1) | +| `AUTO_TAG` | `true` | Enable LLM-based auto-tagging | +| `AUTO_SUMMARIZE` | `true` | Enable LLM-based auto-summarization| + +--- + +## Switching LLM Models + +The system is model-agnostic. 
To use a different model: + +```bash +# Pull the model +docker compose exec ollama ollama pull llama3 + +# Update .env +CHAT_MODEL=llama3 + +# Restart the affected services +docker compose restart rag-api agents +``` + +Popular model choices: +- `mistral` — fast, good quality (7B) +- `llama3` — excellent quality (8B/70B) +- `phi3` — lightweight, efficient (3.8B) +- `qwen2` — strong multilingual support + +--- + +## Re-indexing the Vault + +The ingestion worker automatically re-indexes changed files. To force a full re-index: + +```bash +curl -X POST http://localhost:8000/api/v1/index/reindex \ + -H "Content-Type: application/json" \ + -d '{"force": true}' +``` + +--- + +## Backup + +```bash +# Backup database +docker compose exec postgres pg_dump -U brain second_brain > backup.sql + +# Restore +docker compose exec -T postgres psql -U brain second_brain < backup.sql +``` + +The vault itself is just files — back it up with any file backup tool. + +--- + +## Stopping / Resetting + +```bash +# Stop all services (preserve data) +docker compose down + +# Full reset (DELETE all data!) +docker compose down -v +``` + +--- + +## Obsidian Compatibility + +The vault is fully compatible with Obsidian. You can: +- Open `./vault/` directly in Obsidian +- Use all Obsidian features (graph view, backlinks, templates, etc.) +- The system reads `[[WikiLinks]]`, `#tags`, and YAML frontmatter + +## Logseq Compatibility + +Point Logseq's graph folder to `./vault/`. The system handles: +- `[[Page references]]` +- `#tags` in journal and pages +- YAML frontmatter (or Logseq's `::` properties are stored as-is) diff --git a/infra/database/migrate.sh b/infra/database/migrate.sh new file mode 100755 index 0000000..3224b02 --- /dev/null +++ b/infra/database/migrate.sh @@ -0,0 +1,20 @@ +#!/bin/bash +# Applies database migrations in order. 
+# Usage: ./migrate.sh [up|down] + +set -euo pipefail + +DB_URL="${DATABASE_URL:-postgresql://brain:brain@localhost:5432/second_brain}" +MIGRATIONS_DIR="$(dirname "$0")/migrations" + +ACTION="${1:-up}" + +if [ "$ACTION" = "up" ]; then + echo "Applying schema..." + psql "$DB_URL" -f "$(dirname "$0")/schema.sql" + echo "Schema applied." +elif [ "$ACTION" = "down" ]; then + echo "Dropping schema..." + psql "$DB_URL" -c "DROP SCHEMA public CASCADE; CREATE SCHEMA public;" + echo "Schema dropped." +fi diff --git a/infra/database/schema.sql b/infra/database/schema.sql new file mode 100644 index 0000000..e2ac25e --- /dev/null +++ b/infra/database/schema.sql @@ -0,0 +1,195 @@ +-- AI Second Brain — PostgreSQL Schema +-- Requires: PostgreSQL 14+ with pgvector extension + +-- Enable extensions +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; +CREATE EXTENSION IF NOT EXISTS vector; +CREATE EXTENSION IF NOT EXISTS pg_trgm; -- for fuzzy text search + +-- --------------------------------------------------------------------------- +-- DOCUMENTS +-- Represents a single Markdown file in the vault. 
+-- --------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS documents ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + path TEXT NOT NULL UNIQUE, -- relative path within vault + title TEXT, + content TEXT NOT NULL, -- full raw markdown + content_hash TEXT NOT NULL, -- SHA-256 for change detection + frontmatter JSONB NOT NULL DEFAULT '{}', + tags TEXT[] NOT NULL DEFAULT '{}', + aliases TEXT[] NOT NULL DEFAULT '{}', + word_count INTEGER, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + indexed_at TIMESTAMPTZ, + fts_vector TSVECTOR -- auto-maintained below +); + +CREATE INDEX IF NOT EXISTS idx_documents_path ON documents (path); +CREATE INDEX IF NOT EXISTS idx_documents_tags ON documents USING GIN (tags); +CREATE INDEX IF NOT EXISTS idx_documents_aliases ON documents USING GIN (aliases); +CREATE INDEX IF NOT EXISTS idx_documents_fts ON documents USING GIN (fts_vector); +CREATE INDEX IF NOT EXISTS idx_documents_frontmatter ON documents USING GIN (frontmatter); +CREATE INDEX IF NOT EXISTS idx_documents_updated ON documents (updated_at DESC); + +-- Auto-update fts_vector on insert/update +CREATE OR REPLACE FUNCTION documents_fts_trigger() +RETURNS TRIGGER AS $$ +BEGIN + NEW.fts_vector := + setweight(to_tsvector('english', coalesce(NEW.title, '')), 'A') || + setweight(to_tsvector('english', coalesce(array_to_string(NEW.tags, ' '), '')), 'B') || + setweight(to_tsvector('english', coalesce(NEW.content, '')), 'C'); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS trig_documents_fts ON documents; +CREATE TRIGGER trig_documents_fts + BEFORE INSERT OR UPDATE ON documents + FOR EACH ROW EXECUTE FUNCTION documents_fts_trigger(); + +-- Auto-update updated_at timestamp +CREATE OR REPLACE FUNCTION set_updated_at() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = now(); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS trig_documents_updated_at 
ON documents; +CREATE TRIGGER trig_documents_updated_at + BEFORE UPDATE ON documents + FOR EACH ROW EXECUTE FUNCTION set_updated_at(); + +-- --------------------------------------------------------------------------- +-- CHUNKS +-- Sliding-window text chunks from documents, each with an embedding vector. +-- --------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS chunks ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + document_id UUID NOT NULL REFERENCES documents (id) ON DELETE CASCADE, + chunk_index INTEGER NOT NULL, + content TEXT NOT NULL, + token_count INTEGER, + embedding VECTOR(768), -- nomic-embed-text dimension + metadata JSONB NOT NULL DEFAULT '{}',-- heading path, page, etc. + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE (document_id, chunk_index) +); + +CREATE INDEX IF NOT EXISTS idx_chunks_document_id ON chunks (document_id); + +-- HNSW index — fast approximate nearest-neighbour search +-- Requires pgvector >= 0.5.0. Falls back to IVFFlat if unavailable. +CREATE INDEX IF NOT EXISTS idx_chunks_embedding_hnsw + ON chunks USING hnsw (embedding vector_cosine_ops) + WITH (m = 16, ef_construction = 64); + +-- --------------------------------------------------------------------------- +-- ENTITIES +-- Named entities extracted from documents (optional NER layer). +-- --------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS entities ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + document_id UUID NOT NULL REFERENCES documents (id) ON DELETE CASCADE, + name TEXT NOT NULL, + entity_type TEXT NOT NULL, -- PERSON, ORG, CONCEPT, PLACE, etc. 
+ context TEXT, -- surrounding sentence + confidence FLOAT, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_entities_document_id ON entities (document_id); +CREATE INDEX IF NOT EXISTS idx_entities_name ON entities (name); +CREATE INDEX IF NOT EXISTS idx_entities_type ON entities (entity_type); +CREATE INDEX IF NOT EXISTS idx_entities_name_trgm ON entities USING GIN (name gin_trgm_ops); + +-- --------------------------------------------------------------------------- +-- RELATIONS +-- WikiLink / explicit relations between documents. +-- --------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS relations ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + source_doc_id UUID NOT NULL REFERENCES documents (id) ON DELETE CASCADE, + target_path TEXT NOT NULL, -- raw link target (may be unresolved) + target_doc_id UUID REFERENCES documents (id) ON DELETE SET NULL, + relation_type TEXT NOT NULL DEFAULT 'wikilink', -- wikilink | tag | explicit | ai-inferred + label TEXT, -- optional human label for the edge + context TEXT, -- surrounding text of the link + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_relations_source ON relations (source_doc_id); +CREATE INDEX IF NOT EXISTS idx_relations_target_id ON relations (target_doc_id); +CREATE INDEX IF NOT EXISTS idx_relations_target_path ON relations (target_path); +CREATE INDEX IF NOT EXISTS idx_relations_type ON relations (relation_type); + +-- --------------------------------------------------------------------------- +-- AGENT JOBS +-- Persistent job queue consumed by AI agents. 
+-- --------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS agent_jobs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + agent_type TEXT NOT NULL, -- ingestion | linking | tagging | summarization | maintenance + status TEXT NOT NULL DEFAULT 'pending', -- pending | running | done | failed | cancelled + priority INTEGER NOT NULL DEFAULT 5, -- 1 (highest) .. 10 (lowest) + payload JSONB NOT NULL DEFAULT '{}', + result JSONB, + error TEXT, + retry_count INTEGER NOT NULL DEFAULT 0, + max_retries INTEGER NOT NULL DEFAULT 3, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + started_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ, + scheduled_for TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_agent_jobs_status ON agent_jobs (status); +CREATE INDEX IF NOT EXISTS idx_agent_jobs_type ON agent_jobs (agent_type); +CREATE INDEX IF NOT EXISTS idx_agent_jobs_scheduled ON agent_jobs (scheduled_for ASC) + WHERE status = 'pending'; + +-- --------------------------------------------------------------------------- +-- AGENT LOGS +-- Structured log entries written by agents. +-- --------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS agent_logs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + job_id UUID REFERENCES agent_jobs (id) ON DELETE SET NULL, + agent_type TEXT NOT NULL, + level TEXT NOT NULL DEFAULT 'info', -- debug | info | warning | error + message TEXT NOT NULL, + metadata JSONB NOT NULL DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_agent_logs_job_id ON agent_logs (job_id); +CREATE INDEX IF NOT EXISTS idx_agent_logs_created ON agent_logs (created_at DESC); +CREATE INDEX IF NOT EXISTS idx_agent_logs_level ON agent_logs (level); + +-- --------------------------------------------------------------------------- +-- SYSTEM CONFIG +-- Runtime key-value configuration, editable by agents and admins. 
+-- --------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS system_config ( + key TEXT PRIMARY KEY, + value JSONB NOT NULL, + description TEXT, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +-- Seed default configuration +INSERT INTO system_config (key, value, description) VALUES + ('embedding_model', '"nomic-embed-text"', 'Ollama model for embeddings'), + ('chat_model', '"mistral"', 'Ollama model for chat/generation'), + ('chunk_size', '700', 'Target tokens per chunk'), + ('chunk_overlap', '70', 'Overlap tokens between chunks'), + ('search_top_k', '10', 'Default number of search results'), + ('search_threshold', '0.65', 'Minimum cosine similarity score'), + ('rerank_enabled', 'false', 'Enable cross-encoder reranking'), + ('auto_tag', 'true', 'Auto-tag documents via LLM'), + ('auto_summarize', 'true', 'Auto-summarize long documents') +ON CONFLICT (key) DO NOTHING; diff --git a/scripts/health.sh b/scripts/health.sh new file mode 100755 index 0000000..321bc5d --- /dev/null +++ b/scripts/health.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# scripts/health.sh — Check health of all services. 
+ +set -euo pipefail + +API_URL="${API_URL:-http://localhost:8000}" +OLLAMA_URL="${OLLAMA_URL:-http://localhost:11434}" + +check() { + local name="$1" + local url="$2" + if curl -sf "$url" > /dev/null 2>&1; then + echo " ✓ $name" + else + echo " ✗ $name (unreachable: $url)" + fi +} + +echo "🩺 Second Brain — Health Check" +echo "" +check "RAG API" "$API_URL/api/v1/health" +check "Ollama" "$OLLAMA_URL/api/tags" + +echo "" +echo "Detailed API health:" +curl -sf "$API_URL/api/v1/health" | python3 -m json.tool 2>/dev/null || echo "(API unavailable)" +echo "" +echo "Stats:" +curl -sf "$API_URL/api/v1/stats" | python3 -m json.tool 2>/dev/null || echo "(API unavailable)" diff --git a/scripts/reindex.sh b/scripts/reindex.sh new file mode 100755 index 0000000..25dca36 --- /dev/null +++ b/scripts/reindex.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +# scripts/reindex.sh — Trigger a full vault reindex via the API. + +set -euo pipefail + +API_URL="${API_URL:-http://localhost:8000}" +FORCE="${1:-false}" + +echo "🔄 Triggering vault reindex (force=$FORCE)..." + +RESPONSE=$(curl -sf -X POST "$API_URL/api/v1/index/reindex" \ + -H "Content-Type: application/json" \ + -d "{\"force\": $FORCE}") + +echo "$RESPONSE" | python3 -m json.tool 2>/dev/null || echo "$RESPONSE" +echo "✓ Reindex job queued." diff --git a/scripts/start.sh b/scripts/start.sh new file mode 100755 index 0000000..9dc60ec --- /dev/null +++ b/scripts/start.sh @@ -0,0 +1,51 @@ +#!/usr/bin/env bash +# scripts/start.sh — Bootstrap and start the Second Brain stack. + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +ROOT="$SCRIPT_DIR/.." + +cd "$ROOT" + +echo "🧠 AI Second Brain — startup" + +# Ensure .env exists +if [ ! -f .env ]; then + echo " → Creating .env from .env.example" + cp .env.example .env + echo " ⚠️ Edit .env before production use (set POSTGRES_PASSWORD etc.)" +fi + +# Ensure vault directory exists +mkdir -p vault + +echo " → Starting Docker services..." 
+docker compose up -d --build + +echo " → Waiting for services to be healthy..." +sleep 5 + +# Poll health endpoint +MAX_ATTEMPTS=30 +ATTEMPT=0 +until curl -sf http://localhost:8000/api/v1/health > /dev/null 2>&1; do + ATTEMPT=$((ATTEMPT + 1)) + if [ "$ATTEMPT" -ge "$MAX_ATTEMPTS" ]; then + echo " ✗ API did not become healthy after ${MAX_ATTEMPTS} attempts." + echo " Check logs with: docker compose logs rag-api" + exit 1 + fi + echo " ... waiting for API (${ATTEMPT}/${MAX_ATTEMPTS})" + sleep 5 +done + +echo "" +echo " ✓ Second Brain is running!" +echo "" +echo " 🌐 Web UI: http://localhost:$(grep UI_PORT .env | cut -d= -f2 || echo 3000)" +echo " 🔌 RAG API: http://localhost:$(grep API_PORT .env | cut -d= -f2 || echo 8000)" +echo " 📖 API Docs: http://localhost:$(grep API_PORT .env | cut -d= -f2 || echo 8000)/docs" +echo " 🤖 Ollama: http://localhost:$(grep OLLAMA_PORT .env | cut -d= -f2 || echo 11434)" +echo "" +echo " Run 'docker compose logs -f' to follow logs." diff --git a/services/agents/Dockerfile b/services/agents/Dockerfile new file mode 100644 index 0000000..cda3550 --- /dev/null +++ b/services/agents/Dockerfile @@ -0,0 +1,24 @@ +FROM python:3.12-slim + +WORKDIR /app + +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential libpq-dev \ + && rm -rf /var/lib/apt/lists/* + +# Install ingestion worker deps first (agents depend on ingestion modules) +COPY ../ingestion-worker/requirements.txt /tmp/ingestion-requirements.txt +RUN pip install --no-cache-dir -r /tmp/ingestion-requirements.txt + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy ingestion worker source (agents reuse parser, chunker, embedder, indexer, pipeline) +COPY ../ingestion-worker /app/ingestion-worker + +COPY . . 
+ +ENV PYTHONUNBUFFERED=1 +ENV PYTHONPATH=/app:/app/ingestion-worker + +CMD ["python", "main.py"] diff --git a/services/agents/base_agent.py b/services/agents/base_agent.py new file mode 100644 index 0000000..ed7720d --- /dev/null +++ b/services/agents/base_agent.py @@ -0,0 +1,190 @@ +""" +base_agent.py — Abstract base class for all AI agents. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import time +import traceback +from abc import ABC, abstractmethod +from typing import Any, Optional + +import asyncpg + +logger = logging.getLogger(__name__) + + +class BaseAgent(ABC): + """ + All agents inherit from this class. + + Responsibilities: + - Poll agent_jobs table for work + - Claim jobs atomically + - Execute with exponential-backoff retries + - Log results / errors to agent_logs + """ + + agent_type: str # Must be set by subclass + + def __init__(self, pool: asyncpg.Pool, settings: Any) -> None: + self.pool = pool + self.settings = settings + self._log = logging.getLogger(f'agent.{self.agent_type}') + + # ------------------------------------------------------------------ + # Public interface + # ------------------------------------------------------------------ + + async def run_forever(self, poll_interval: int = 10) -> None: + """Poll for jobs indefinitely.""" + self._log.info('Agent started (poll_interval=%ds)', poll_interval) + while True: + try: + job = await self._claim_job() + if job: + await self._execute(job) + else: + await asyncio.sleep(poll_interval) + except asyncio.CancelledError: + self._log.info('Agent shutting down') + return + except Exception as exc: + self._log.error('Unexpected error in agent loop: %s', exc, exc_info=True) + await asyncio.sleep(poll_interval) + + async def enqueue(self, payload: dict, priority: int = 5, delay_seconds: int = 0) -> str: + """Create a new job for this agent.""" + import uuid + from datetime import datetime, timezone, timedelta + job_id = str(uuid.uuid4()) + scheduled = 
datetime.now(timezone.utc) + if delay_seconds: + scheduled += timedelta(seconds=delay_seconds) + + async with self.pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO agent_jobs (id, agent_type, priority, payload, scheduled_for) + VALUES ($1::uuid, $2, $3, $4::jsonb, $5) + """, + job_id, self.agent_type, priority, json.dumps(payload), scheduled, + ) + return job_id + + # ------------------------------------------------------------------ + # Abstract + # ------------------------------------------------------------------ + + @abstractmethod + async def process(self, job_id: str, payload: dict) -> dict: + """Process a single job. Return result dict.""" + ... + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + async def _claim_job(self) -> Optional[asyncpg.Record]: + """Atomically claim the next pending job for this agent type.""" + async with self.pool.acquire() as conn: + row = await conn.fetchrow( + """ + UPDATE agent_jobs + SET status = 'running', started_at = now() + WHERE id = ( + SELECT id FROM agent_jobs + WHERE agent_type = $1 + AND status = 'pending' + AND scheduled_for <= now() + AND retry_count < max_retries + ORDER BY priority ASC, scheduled_for ASC + LIMIT 1 + FOR UPDATE SKIP LOCKED + ) + RETURNING * + """, + self.agent_type, + ) + return row + + async def _execute(self, job: asyncpg.Record) -> None: + job_id = str(job['id']) + payload = dict(job['payload'] or {}) + self._log.info('Processing job %s', job_id) + start = time.monotonic() + + try: + result = await self.process(job_id, payload) + elapsed = time.monotonic() - start + async with self.pool.acquire() as conn: + await conn.execute( + """ + UPDATE agent_jobs + SET status = 'done', result = $2::jsonb, completed_at = now() + WHERE id = $1::uuid + """, + job_id, json.dumps(result or {}), + ) + await self._log_event(job_id, 'info', f'Job done in {elapsed:.2f}s', result or 
{}) + + except Exception as exc: + elapsed = time.monotonic() - start + err_msg = str(exc) + self._log.error('Job %s failed: %s', job_id, err_msg, exc_info=True) + + async with self.pool.acquire() as conn: + row = await conn.fetchrow( + 'SELECT retry_count, max_retries FROM agent_jobs WHERE id = $1::uuid', job_id + ) + retries = (row['retry_count'] or 0) + 1 + max_retries = row['max_retries'] or 3 + + if retries < max_retries: + # Re-queue with exponential backoff + backoff = 2 ** retries + await conn.execute( + """ + UPDATE agent_jobs + SET status = 'pending', + retry_count = $2, + error = $3, + scheduled_for = now() + ($4 || ' seconds')::interval + WHERE id = $1::uuid + """, + job_id, retries, err_msg, str(backoff), + ) + await self._log_event(job_id, 'warning', + f'Retry {retries}/{max_retries} in {backoff}s', {}) + else: + await conn.execute( + """ + UPDATE agent_jobs + SET status = 'failed', error = $2, completed_at = now() + WHERE id = $1::uuid + """, + job_id, err_msg, + ) + await self._log_event(job_id, 'error', f'Job permanently failed: {err_msg}', {}) + + async def _log_event( + self, + job_id: Optional[str], + level: str, + message: str, + metadata: dict, + ) -> None: + try: + async with self.pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO agent_logs (job_id, agent_type, level, message, metadata) + VALUES ($1::uuid, $2, $3, $4, $5::jsonb) + """, + job_id, self.agent_type, level, message, json.dumps(metadata), + ) + except Exception as log_exc: + self._log.warning('Failed to write agent log: %s', log_exc) diff --git a/services/agents/ingestion/__init__.py b/services/agents/ingestion/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/agents/ingestion/agent.py b/services/agents/ingestion/agent.py new file mode 100644 index 0000000..ef46e73 --- /dev/null +++ b/services/agents/ingestion/agent.py @@ -0,0 +1,46 @@ +""" +ingestion/agent.py — Ingestion Agent: indexes new/changed files from the vault. 
+""" + +from __future__ import annotations + +import asyncio +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'ingestion-worker')) + +import asyncpg + +from base_agent import BaseAgent +from pipeline import ingest_file +from settings import Settings as IngestionSettings + + +class IngestionAgent(BaseAgent): + agent_type = 'ingestion' + + async def process(self, job_id: str, payload: dict) -> dict: + settings = IngestionSettings() + vault_root = Path(settings.vault_path) + + if payload.get('reindex_all'): + md_files = list(vault_root.rglob('*.md')) + indexed = 0 + skipped = 0 + for fp in md_files: + async with self.pool.acquire() as conn: + result = await ingest_file(fp, settings, conn) + if result: + indexed += 1 + else: + skipped += 1 + return {'indexed': indexed, 'skipped': skipped, 'total': len(md_files)} + + elif payload.get('path'): + file_path = vault_root / payload['path'] + async with self.pool.acquire() as conn: + result = await ingest_file(file_path, settings, conn) + return {'indexed': 1 if result else 0, 'path': payload['path']} + + return {'message': 'No action specified'} diff --git a/services/agents/linking/__init__.py b/services/agents/linking/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/agents/linking/agent.py b/services/agents/linking/agent.py new file mode 100644 index 0000000..ad055d7 --- /dev/null +++ b/services/agents/linking/agent.py @@ -0,0 +1,83 @@ +""" +linking/agent.py — Knowledge Linking Agent: infers and creates AI-powered document links. +""" + +from __future__ import annotations + +import json +import logging + +import asyncpg +import httpx + +from base_agent import BaseAgent + +logger = logging.getLogger('agent.linking') + + +class LinkingAgent(BaseAgent): + agent_type = 'linking' + + async def process(self, job_id: str, payload: dict) -> dict: + """ + For each document without AI-inferred links: + 1. 
Find top-5 semantically similar documents (vector search). + 2. Insert 'ai-inferred' relations. + """ + async with self.pool.acquire() as conn: + # Documents that have chunks but no ai-inferred relations + docs = await conn.fetch( + """ + SELECT DISTINCT d.id::text, d.title, d.path + FROM documents d + JOIN chunks c ON c.document_id = d.id + WHERE NOT EXISTS ( + SELECT 1 FROM relations r + WHERE r.source_doc_id = d.id AND r.relation_type = 'ai-inferred' + ) + LIMIT 50 + """ + ) + + linked = 0 + for doc in docs: + doc_id = doc['id'] + + # Find similar docs via average chunk embedding + similar = await conn.fetch( + """ + WITH doc_avg AS ( + SELECT AVG(embedding) AS avg_emb + FROM chunks WHERE document_id = $1::uuid + ) + SELECT d2.id::text AS target_id, d2.path AS target_path, + 1 - (AVG(c2.embedding) <=> (SELECT avg_emb FROM doc_avg)) AS score + FROM chunks c2 + JOIN documents d2 ON d2.id = c2.document_id + WHERE c2.document_id != $1::uuid + GROUP BY d2.id, d2.path + HAVING 1 - (AVG(c2.embedding) <=> (SELECT avg_emb FROM doc_avg)) > 0.75 + ORDER BY score DESC + LIMIT 5 + """, + doc_id, + ) + + if not similar: + continue + + records = [ + (doc_id, row['target_path'], row['target_id'], 'ai-inferred') + for row in similar + ] + await conn.executemany( + """ + INSERT INTO relations (source_doc_id, target_path, target_doc_id, relation_type) + VALUES ($1::uuid, $2, $3::uuid, $4) + ON CONFLICT DO NOTHING + """, + records, + ) + linked += len(similar) + + return {'documents_processed': len(docs), 'links_created': linked} diff --git a/services/agents/main.py b/services/agents/main.py new file mode 100644 index 0000000..e81be3d --- /dev/null +++ b/services/agents/main.py @@ -0,0 +1,92 @@ +""" +main.py — Agent worker entry point. Runs all agents concurrently. 
+""" + +from __future__ import annotations + +import asyncio +import logging +import sys +from pathlib import Path + +import asyncpg +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class AgentSettings(BaseSettings): + model_config = SettingsConfigDict(env_file='.env', extra='ignore') + database_url: str = 'postgresql://brain:brain@postgres:5432/second_brain' + ollama_url: str = 'http://ollama:11434' + chat_model: str = 'mistral' + log_level: str = 'INFO' + ingestion_poll: int = 15 + linking_poll: int = 30 + tagging_poll: int = 60 + summarization_poll: int = 120 + maintenance_poll: int = 3600 + + +def setup_logging(level: str) -> None: + logging.basicConfig( + level=getattr(logging, level.upper(), logging.INFO), + format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', + datefmt='%Y-%m-%dT%H:%M:%S', + stream=sys.stdout, + ) + + +async def main() -> None: + settings = AgentSettings() + setup_logging(settings.log_level) + logger = logging.getLogger('agents') + logger.info('Starting agent workers...') + + # Add parent dirs to path for cross-service imports + sys.path.insert(0, str(Path(__file__).parent)) + sys.path.insert(0, str(Path(__file__).parent.parent / 'ingestion-worker')) + + pool = await asyncpg.create_pool(settings.database_url, min_size=2, max_size=10) + + # Import agents after path setup + from ingestion.agent import IngestionAgent + from linking.agent import LinkingAgent + from tagging.agent import TaggingAgent + from summarization.agent import SummarizationAgent + from maintenance.agent import MaintenanceAgent + + agents_tasks = [ + asyncio.create_task( + IngestionAgent(pool, settings).run_forever(settings.ingestion_poll) + ), + asyncio.create_task( + LinkingAgent(pool, settings).run_forever(settings.linking_poll) + ), + asyncio.create_task( + TaggingAgent(pool, settings).run_forever(settings.tagging_poll) + ), + asyncio.create_task( + SummarizationAgent(pool, settings).run_forever(settings.summarization_poll) + ), + 
asyncio.create_task( + MaintenanceAgent(pool, settings).run_forever(settings.maintenance_poll) + ), + ] + + logger.info('All agents running.') + + try: + await asyncio.gather(*agents_tasks) + except asyncio.CancelledError: + pass + finally: + for task in agents_tasks: + task.cancel() + await pool.close() + logger.info('Agent workers stopped.') + + +if __name__ == '__main__': + try: + asyncio.run(main()) + except KeyboardInterrupt: + pass diff --git a/services/agents/maintenance/__init__.py b/services/agents/maintenance/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/agents/maintenance/agent.py b/services/agents/maintenance/agent.py new file mode 100644 index 0000000..3e07f17 --- /dev/null +++ b/services/agents/maintenance/agent.py @@ -0,0 +1,78 @@ +""" +maintenance/agent.py — Maintenance Agent: detects broken links, orphaned documents, stale content. +""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone, timedelta + +from base_agent import BaseAgent + +logger = logging.getLogger('agent.maintenance') + + +class MaintenanceAgent(BaseAgent): + agent_type = 'maintenance' + + async def process(self, job_id: str, payload: dict) -> dict: + report = {} + + async with self.pool.acquire() as conn: + # 1. Broken WikiLinks (target_doc_id is NULL but target_path exists) + broken_links = await conn.fetchval( + """ + SELECT COUNT(*) FROM relations + WHERE relation_type = 'wikilink' AND target_doc_id IS NULL + """ + ) + report['broken_wikilinks'] = broken_links + + # 2. 
Orphaned documents (no incoming links and no outgoing links) + orphans = await conn.fetch( + """ + SELECT d.id::text, d.title, d.path + FROM documents d + WHERE NOT EXISTS ( + SELECT 1 FROM relations r WHERE r.target_doc_id = d.id + ) + AND NOT EXISTS ( + SELECT 1 FROM relations r WHERE r.source_doc_id = d.id + ) + LIMIT 20 + """ + ) + report['orphaned_documents'] = len(orphans) + report['orphan_paths'] = [r['path'] for r in orphans] + + # 3. Documents not re-indexed in >7 days + stale_cutoff = datetime.now(timezone.utc) - timedelta(days=7) + stale_count = await conn.fetchval( + 'SELECT COUNT(*) FROM documents WHERE indexed_at < $1 OR indexed_at IS NULL', + stale_cutoff, + ) + report['stale_documents'] = stale_count + + # 4. Documents with chunks but no embeddings + missing_embeddings = await conn.fetchval( + 'SELECT COUNT(*) FROM chunks WHERE embedding IS NULL' + ) + report['chunks_missing_embeddings'] = missing_embeddings + + # 5. Resolve previously broken WikiLinks that now have matching docs + resolved = await conn.execute( + """ + UPDATE relations r + SET target_doc_id = d.id + FROM documents d + WHERE r.target_doc_id IS NULL + AND r.relation_type = 'wikilink' + AND (d.path LIKE '%' || r.target_path || '%' + OR d.title = r.target_path + OR r.target_path = ANY(d.aliases)) + """ + ) + report['wikilinks_resolved'] = int(resolved.split()[-1]) + + logger.info('Maintenance report: %s', report) + return report diff --git a/services/agents/requirements.txt b/services/agents/requirements.txt new file mode 100644 index 0000000..c6ff77c --- /dev/null +++ b/services/agents/requirements.txt @@ -0,0 +1,4 @@ +asyncpg>=0.29.0 +pydantic-settings>=2.2.0 +httpx>=0.27.0 +pgvector>=0.2.5 diff --git a/services/agents/summarization/__init__.py b/services/agents/summarization/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/agents/summarization/agent.py b/services/agents/summarization/agent.py new file mode 100644 index 0000000..5c03ee7 --- /dev/null +++ 
b/services/agents/summarization/agent.py @@ -0,0 +1,80 @@ +""" +summarization/agent.py — Summarization Agent: generates summaries for long documents. +""" + +from __future__ import annotations + +import logging +import re + +import httpx + +from base_agent import BaseAgent + +logger = logging.getLogger('agent.summarization') + +SUMMARY_PROMPT = """You are a knowledge management assistant. +Write a concise 2-4 sentence summary of the following document. +The summary should capture the main ideas and be useful for quick reference. +Respond with only the summary, no preamble. + +Title: {title} + +Content: +{content} + +Summary:""" + + +class SummarizationAgent(BaseAgent): + agent_type = 'summarization' + + async def process(self, job_id: str, payload: dict) -> dict: + ollama_url = self.settings.ollama_url + model = self.settings.chat_model + + async with self.pool.acquire() as conn: + # Long documents that don't have a summary in frontmatter + docs = await conn.fetch( + """ + SELECT id::text, title, content, frontmatter + FROM documents + WHERE word_count > 500 + AND (frontmatter->>'summary' IS NULL OR frontmatter->>'summary' = '') + LIMIT 10 + """ + ) + + summarized = 0 + for doc in docs: + doc_id = doc['id'] + title = doc['title'] or '' + content = (doc['content'] or '')[:4000] + + try: + summary = await self._generate_summary(title, content, ollama_url, model) + if summary: + fm = dict(doc['frontmatter'] or {}) + fm['summary'] = summary + await conn.execute( + "UPDATE documents SET frontmatter = $2::jsonb WHERE id = $1::uuid", + doc_id, __import__('json').dumps(fm), + ) + summarized += 1 + logger.debug('Summarized: %s', title) + except Exception as exc: + logger.warning('Failed to summarize %s: %s', doc_id, exc) + + return {'documents_summarized': summarized} + + async def _generate_summary( + self, title: str, content: str, ollama_url: str, model: str + ) -> str: + prompt = SUMMARY_PROMPT.format(title=title, content=content) + async with 
httpx.AsyncClient(timeout=60.0) as client: + resp = await client.post( + f'{ollama_url.rstrip("/")}/api/generate', + json={'model': model, 'prompt': prompt, 'stream': False}, + ) + resp.raise_for_status() + return resp.json().get('response', '').strip() diff --git a/services/agents/tagging/__init__.py b/services/agents/tagging/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/agents/tagging/agent.py b/services/agents/tagging/agent.py new file mode 100644 index 0000000..0b4e756 --- /dev/null +++ b/services/agents/tagging/agent.py @@ -0,0 +1,87 @@ +""" +tagging/agent.py — Tagging Agent: auto-tags documents using the LLM. +""" + +from __future__ import annotations + +import json +import logging +import re + +import httpx + +from base_agent import BaseAgent + +logger = logging.getLogger('agent.tagging') + +TAG_PROMPT = """You are a knowledge management assistant. +Given the following document, suggest 3-7 relevant tags. +Tags should be lowercase, hyphen-separated, single-concept keywords. +Respond ONLY with a JSON array of strings. 
Example: ["machine-learning", "python", "transformers"] + +Document title: {title} + +Document content (excerpt): +{excerpt} + +Tags:""" + + +class TaggingAgent(BaseAgent): + agent_type = 'tagging' + + async def process(self, job_id: str, payload: dict) -> dict: + ollama_url = self.settings.ollama_url + model = self.settings.chat_model + + async with self.pool.acquire() as conn: + # Documents without tags (or with empty tags array) + docs = await conn.fetch( + """ + SELECT id::text, title, content + FROM documents + WHERE array_length(tags, 1) IS NULL OR array_length(tags, 1) = 0 + LIMIT 20 + """ + ) + + tagged = 0 + for doc in docs: + doc_id = doc['id'] + title = doc['title'] or '' + excerpt = (doc['content'] or '')[:2000] + + try: + suggested_tags = await self._suggest_tags( + title, excerpt, ollama_url, model + ) + if suggested_tags: + await conn.execute( + 'UPDATE documents SET tags = $2 WHERE id = $1::uuid', + doc_id, suggested_tags, + ) + tagged += 1 + logger.debug('Tagged %s with %s', title, suggested_tags) + except Exception as exc: + logger.warning('Failed to tag document %s: %s', doc_id, exc) + + return {'documents_tagged': tagged} + + async def _suggest_tags( + self, title: str, excerpt: str, ollama_url: str, model: str + ) -> list[str]: + prompt = TAG_PROMPT.format(title=title, excerpt=excerpt) + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post( + f'{ollama_url.rstrip("/")}/api/generate', + json={'model': model, 'prompt': prompt, 'stream': False}, + ) + resp.raise_for_status() + raw = resp.json().get('response', '').strip() + + # Extract JSON array from response + match = re.search(r'\[.*?\]', raw, re.DOTALL) + if match: + tags = json.loads(match.group()) + return [str(t).lower().strip() for t in tags if t] + return [] diff --git a/services/ingestion-worker/Dockerfile b/services/ingestion-worker/Dockerfile new file mode 100644 index 0000000..384677e --- /dev/null +++ b/services/ingestion-worker/Dockerfile @@ -0,0 +1,17 @@ 
+FROM python:3.12-slim + +WORKDIR /app + +# System deps +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential libpq-dev curl \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +ENV PYTHONUNBUFFERED=1 + +CMD ["python", "main.py"] diff --git a/services/ingestion-worker/chunker.py b/services/ingestion-worker/chunker.py new file mode 100644 index 0000000..0c0ec85 --- /dev/null +++ b/services/ingestion-worker/chunker.py @@ -0,0 +1,182 @@ +""" +chunker.py — Token-aware sliding-window text chunker. + +Splits document text into overlapping chunks of 500–800 tokens, +preferring paragraph / heading boundaries over hard cuts. +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass + +import tiktoken + + +# --------------------------------------------------------------------------- +# Types +# --------------------------------------------------------------------------- + +@dataclass +class Chunk: + chunk_index: int + content: str + token_count: int + metadata: dict # heading path, start_char, end_char, etc. 
+ + +# --------------------------------------------------------------------------- +# Tokeniser +# --------------------------------------------------------------------------- + +# cl100k_base works for most modern models; nomic-embed-text is BPE-compatible +_TOKENISER = tiktoken.get_encoding('cl100k_base') + + +def _count_tokens(text: str) -> int: + return len(_TOKENISER.encode(text, disallowed_special=())) + + +def _tokenise(text: str) -> list[int]: + return _TOKENISER.encode(text, disallowed_special=()) + + +def _decode(tokens: list[int]) -> str: + return _TOKENISER.decode(tokens) + + +# --------------------------------------------------------------------------- +# Splitter helpers +# --------------------------------------------------------------------------- + +_HEADING_RE = re.compile(r'^(#{1,6}\s.+)$', re.MULTILINE) +_PARA_SEP = re.compile(r'\n{2,}') + + +def _split_semantic_blocks(text: str) -> list[tuple[str, str]]: + """ + Split text into (heading_path, block_text) tuples at heading / paragraph + boundaries. This is used to build chunks that respect document structure. 
+ """ + blocks: list[tuple[str, str]] = [] + current_heading = '' + current_parts: list[str] = [] + + for para in _PARA_SEP.split(text): + para = para.strip() + if not para: + continue + heading_match = _HEADING_RE.match(para) + if heading_match: + # Flush current accumulation + if current_parts: + blocks.append((current_heading, '\n\n'.join(current_parts))) + current_parts = [] + current_heading = heading_match.group(1).lstrip('#').strip() + current_parts.append(para) + else: + current_parts.append(para) + + if current_parts: + blocks.append((current_heading, '\n\n'.join(current_parts))) + + return blocks + + +# --------------------------------------------------------------------------- +# Main chunker +# --------------------------------------------------------------------------- + +def chunk_document( + text: str, + target_tokens: int = 700, + overlap_tokens: int = 70, + min_tokens: int = 50, +) -> list[Chunk]: + """ + Chunk ``text`` into overlapping token windows. + + Strategy: + 1. Split into semantic blocks (heading sections / paragraphs). + 2. Merge small blocks and split large blocks to hit ``target_tokens``. + 3. Add overlapping context from the previous chunk. + + Args: + text: Plain-text content to chunk. + target_tokens: Target chunk size in tokens (default 700). + overlap_tokens: Number of tokens to repeat from the previous chunk. + min_tokens: Skip chunks shorter than this. + + Returns: + List of :class:`Chunk` objects. 
+ """ + if not text.strip(): + return [] + + semantic_blocks = _split_semantic_blocks(text) + raw_chunks: list[tuple[str, str]] = [] # (heading, text) + + for heading, block in semantic_blocks: + block_tokens = _count_tokens(block) + if block_tokens <= target_tokens: + raw_chunks.append((heading, block)) + else: + # Split large blocks into token windows + tokens = _tokenise(block) + step = target_tokens - overlap_tokens + start = 0 + while start < len(tokens): + end = min(start + target_tokens, len(tokens)) + raw_chunks.append((heading, _decode(tokens[start:end]))) + if end == len(tokens): + break + start += step + + # ---- Merge small adjacent chunks ---- + merged: list[tuple[str, str]] = [] + buffer_heading = '' + buffer_text = '' + buffer_tokens = 0 + + for heading, text_block in raw_chunks: + block_tokens = _count_tokens(text_block) + if buffer_tokens + block_tokens <= target_tokens: + buffer_text = (buffer_text + '\n\n' + text_block).strip() + buffer_tokens += block_tokens + buffer_heading = heading or buffer_heading + else: + if buffer_text: + merged.append((buffer_heading, buffer_text)) + buffer_heading = heading + buffer_text = text_block + buffer_tokens = block_tokens + + if buffer_text: + merged.append((buffer_heading, buffer_text)) + + # ---- Build final chunks with overlap ---- + chunks: list[Chunk] = [] + prev_overlap_text = '' + + for idx, (heading, chunk_text) in enumerate(merged): + # Prepend overlap from previous chunk + if prev_overlap_text: + chunk_text = prev_overlap_text + '\n\n' + chunk_text + + token_count = _count_tokens(chunk_text) + + if token_count < min_tokens: + continue + + chunks.append(Chunk( + chunk_index=idx, + content=chunk_text.strip(), + token_count=token_count, + metadata={'heading': heading, 'chunk_seq': idx}, + )) + + # Compute overlap for next chunk + tokens = _tokenise(chunk_text) + prev_overlap_text = _decode(tokens[-overlap_tokens:]) if len(tokens) > overlap_tokens else chunk_text + + return chunks diff --git 
"""
embedder.py — Embedding generation via Ollama or sentence-transformers fallback.
"""

from __future__ import annotations

import logging
import time
from typing import Any

import httpx
import numpy as np  # NOTE(review): currently unused in this module — verify before removing

logger = logging.getLogger(__name__)

# Dimensionality per model
# NOTE(review): models not listed here silently fall back to 768 in
# OllamaEmbedder — confirm this matches the pgvector column dimension
# before adding a new embedding model.
_MODEL_DIMS: dict[str, int] = {
    'nomic-embed-text': 768,
    'all-minilm-l6-v2': 384,
    'mxbai-embed-large': 1024,
}


class OllamaEmbedder:
    """Generate embeddings via the Ollama /api/embed endpoint.

    Synchronous HTTP client; batching is done client-side in ``embed_batch``
    and each HTTP call carries up to ``batch_size`` texts.
    """

    def __init__(
        self,
        base_url: str = 'http://ollama:11434',
        model: str = 'nomic-embed-text',
        timeout: float = 60.0,
        batch_size: int = 32,
    ) -> None:
        self.base_url = base_url.rstrip('/')
        self.model = model
        self.timeout = timeout
        self.batch_size = batch_size
        # Falls back to 768 for unknown models (see _MODEL_DIMS note above).
        self.dimensions = _MODEL_DIMS.get(model, 768)

    def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed a list of texts, returning a list of float vectors."""
        all_embeddings: list[list[float]] = []

        # Slice into batch_size windows; each window is one HTTP request.
        for i in range(0, len(texts), self.batch_size):
            batch = texts[i : i + self.batch_size]
            embeddings = self._call_ollama(batch)
            all_embeddings.extend(embeddings)

        return all_embeddings

    def embed_single(self, text: str) -> list[float]:
        """Convenience wrapper: embed one text and return its vector."""
        return self.embed_batch([text])[0]

    def _call_ollama(self, texts: list[str], retries: int = 3) -> list[list[float]]:
        """POST one batch to /api/embed, retrying with exponential backoff.

        Retries on HTTP errors and on a response missing the 'embeddings'
        key (KeyError); re-raises the last failure after ``retries`` attempts.
        A fresh httpx.Client is opened per call.
        """
        url = f'{self.base_url}/api/embed'
        payload: dict[str, Any] = {'model': self.model, 'input': texts}

        for attempt in range(1, retries + 1):
            try:
                with httpx.Client(timeout=self.timeout) as client:
                    resp = client.post(url, json=payload)
                    resp.raise_for_status()
                    data = resp.json()
                    return data['embeddings']
            except (httpx.HTTPError, KeyError) as exc:
                logger.warning('Ollama embed attempt %d/%d failed: %s', attempt, retries, exc)
                if attempt < retries:
                    time.sleep(2 ** attempt)  # exponential backoff: 2s, 4s, 8s
                else:
                    raise


class SentenceTransformerEmbedder:
    """Local fallback embedder using sentence-transformers.

    Loads the model eagerly in __init__ (can take seconds on first use);
    vectors are L2-normalized by ``encode``.
    """

    def __init__(
        self,
        model_name: str = 'all-MiniLM-L6-v2',
        batch_size: int = 32,
    ) -> None:
        # Lazy import so the module loads even if not installed
        try:
            from sentence_transformers import SentenceTransformer  # type: ignore
        except ImportError as exc:
            raise ImportError(
                'sentence-transformers is required for the local fallback embedder. '
                'Install it with: pip install sentence-transformers'
            ) from exc

        logger.info('Loading sentence-transformer model: %s', model_name)
        self._model = SentenceTransformer(model_name)
        self.batch_size = batch_size
        self.dimensions = self._model.get_sentence_embedding_dimension()

    def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed texts locally; returns plain Python lists (not ndarrays)."""
        vectors = self._model.encode(
            texts,
            batch_size=self.batch_size,
            show_progress_bar=False,
            normalize_embeddings=True,
        )
        return [v.tolist() for v in vectors]

    def embed_single(self, text: str) -> list[float]:
        """Convenience wrapper: embed one text and return its vector."""
        return self.embed_batch([text])[0]


def get_embedder(
    provider: str = 'ollama',
    ollama_url: str = 'http://ollama:11434',
    model: str = 'nomic-embed-text',
) -> OllamaEmbedder | SentenceTransformerEmbedder:
    """Factory function returning the configured embedder.

    Raises:
        ValueError: for an unrecognized ``provider`` string.
    """
    if provider == 'ollama':
        return OllamaEmbedder(base_url=ollama_url, model=model)
    elif provider == 'sentence_transformers':
        return SentenceTransformerEmbedder(model_name=model)
    else:
        raise ValueError(f'Unknown embedding provider: {provider!r}')
+""" + +from __future__ import annotations + +import hashlib +import json +import logging +from typing import Any + +import asyncpg + +from chunker import Chunk +from parser import ParsedDocument + +logger = logging.getLogger(__name__) + + +def sha256(text: str) -> str: + return hashlib.sha256(text.encode('utf-8')).hexdigest() + + +async def upsert_document( + conn: asyncpg.Connection, + doc: ParsedDocument, + chunks: list[Chunk], + embeddings: list[list[float]], +) -> str: + """ + Upsert a document and its chunks atomically. + + Returns the document UUID. + """ + content_hash = sha256(doc.content_raw) + + async with conn.transaction(): + # ---- Upsert document ---- + row = await conn.fetchrow( + """ + INSERT INTO documents + (path, title, content, content_hash, frontmatter, tags, aliases, word_count, indexed_at) + VALUES ($1, $2, $3, $4, $5::jsonb, $6, $7, $8, now()) + ON CONFLICT (path) DO UPDATE SET + title = EXCLUDED.title, + content = EXCLUDED.content, + content_hash = EXCLUDED.content_hash, + frontmatter = EXCLUDED.frontmatter, + tags = EXCLUDED.tags, + aliases = EXCLUDED.aliases, + word_count = EXCLUDED.word_count, + indexed_at = now() + RETURNING id, (xmax = 0) AS inserted + """, + doc.path, + doc.title, + doc.content_raw, + content_hash, + json.dumps(doc.frontmatter), + doc.tags, + doc.aliases, + doc.word_count, + ) + doc_id: str = str(row['id']) + is_new = row['inserted'] + logger.info('%s document %s (%s)', 'Inserted' if is_new else 'Updated', doc.path, doc_id) + + # ---- Delete stale chunks ---- + await conn.execute('DELETE FROM chunks WHERE document_id = $1', row['id']) + + # ---- Insert chunks + embeddings ---- + chunk_records = [] + for chunk, embedding in zip(chunks, embeddings): + chunk_records.append(( + row['id'], + chunk.chunk_index, + chunk.content, + chunk.token_count, + embedding, + json.dumps(chunk.metadata), + )) + + await conn.executemany( + """ + INSERT INTO chunks (document_id, chunk_index, content, token_count, embedding, metadata) + 
VALUES ($1, $2, $3, $4, $5::vector, $6::jsonb) + """, + chunk_records, + ) + logger.debug('Upserted %d chunks for document %s', len(chunk_records), doc.path) + + # ---- Upsert relations (WikiLinks) ---- + await conn.execute( + 'DELETE FROM relations WHERE source_doc_id = $1 AND relation_type = $2', + row['id'], + 'wikilink', + ) + if doc.wikilinks: + relation_records = [ + (row['id'], link, 'wikilink') + for link in doc.wikilinks + ] + await conn.executemany( + """ + INSERT INTO relations (source_doc_id, target_path, relation_type) + VALUES ($1, $2, $3) + """, + relation_records, + ) + # Resolve targets that already exist in the vault + await conn.execute( + """ + UPDATE relations r + SET target_doc_id = d.id + FROM documents d + WHERE r.source_doc_id = $1 + AND r.relation_type = 'wikilink' + AND (d.path LIKE '%' || r.target_path || '%' + OR d.title = r.target_path + OR r.target_path = ANY(d.aliases)) + """, + row['id'], + ) + + return doc_id + + +async def document_needs_reindex(conn: asyncpg.Connection, path: str, content_hash: str) -> bool: + """Return True if the document is new or its content hash has changed.""" + row = await conn.fetchrow( + 'SELECT content_hash FROM documents WHERE path = $1', + path, + ) + if row is None: + return True + return row['content_hash'] != content_hash + + +async def delete_document(conn: asyncpg.Connection, path: str) -> None: + """Remove a document and its cascaded chunks/relations.""" + result = await conn.execute('DELETE FROM documents WHERE path = $1', path) + logger.info('Deleted document %s (%s)', path, result) diff --git a/services/ingestion-worker/main.py b/services/ingestion-worker/main.py new file mode 100644 index 0000000..b30f5d1 --- /dev/null +++ b/services/ingestion-worker/main.py @@ -0,0 +1,33 @@ +""" +main.py — Ingestion worker entry point. 
+""" + +from __future__ import annotations + +import asyncio +import logging +import sys + +from settings import Settings +from watcher import run_watcher + + +def setup_logging(level: str) -> None: + logging.basicConfig( + level=getattr(logging, level.upper(), logging.INFO), + format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', + datefmt='%Y-%m-%dT%H:%M:%S', + stream=sys.stdout, + ) + + +if __name__ == '__main__': + settings = Settings() + setup_logging(settings.log_level) + logger = logging.getLogger('ingestion-worker') + logger.info('Starting ingestion worker (vault=%s)', settings.vault_path) + + try: + asyncio.run(run_watcher(settings)) + except KeyboardInterrupt: + logger.info('Ingestion worker stopped.') diff --git a/services/ingestion-worker/parser.py b/services/ingestion-worker/parser.py new file mode 100644 index 0000000..d370808 --- /dev/null +++ b/services/ingestion-worker/parser.py @@ -0,0 +1,134 @@ +""" +parser.py — Markdown vault document parser. + +Extracts: + - YAML frontmatter (title, tags, aliases, date, custom fields) + - Plain text content (Markdown stripped) + - WikiLinks [[target|alias]] and #tags + - Word count +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +import frontmatter # python-frontmatter + + +# --------------------------------------------------------------------------- +# Data classes +# --------------------------------------------------------------------------- + +@dataclass +class ParsedDocument: + path: str + title: str + content_raw: str # original markdown + content_text: str # plain text (markdown stripped) + frontmatter: dict[str, Any] + tags: list[str] + aliases: list[str] + wikilinks: list[str] # resolved link targets + word_count: int + + +# --------------------------------------------------------------------------- +# Regexes +# --------------------------------------------------------------------------- + 
+_WIKILINK_RE = re.compile(r'\[\[([^\[\]|]+)(?:\|[^\[\]]+)?\]\]') +_INLINE_TAG_RE = re.compile(r'(?]+>') +_HORIZONTAL_RULE_RE = re.compile(r'^[-*_]{3,}\s*$', re.MULTILINE) + + +def _strip_markdown(text: str) -> str: + """Convert Markdown to plain text (lightweight, no external deps).""" + # Remove code blocks first (preserve whitespace context) + text = _CODE_BLOCK_RE.sub(' ', text) + # Remove headings marker characters + text = _HEADING_RE.sub('', text) + # Replace Markdown links with their label + text = _MARKDOWN_LINK_RE.sub(r'\1', text) + # Replace WikiLinks with their display text (or target) + text = _WIKILINK_RE.sub(lambda m: m.group(1).split('/')[-1], text) + # Remove HTML tags + text = _HTML_RE.sub(' ', text) + # Remove horizontal rules + text = _HORIZONTAL_RULE_RE.sub('', text) + # Normalise whitespace + text = re.sub(r'\n{3,}', '\n\n', text) + return text.strip() + + +# --------------------------------------------------------------------------- +# Parser +# --------------------------------------------------------------------------- + +def parse_document(file_path: Path, vault_root: Path) -> ParsedDocument: + """ + Parse a single Markdown file and return a ``ParsedDocument``. + + Args: + file_path: Absolute path to the Markdown file. + vault_root: Absolute path to the vault root (used to compute relative path). 
+ """ + raw_text = file_path.read_text(encoding='utf-8', errors='replace') + relative_path = str(file_path.relative_to(vault_root)) + + # Parse frontmatter + body + post = frontmatter.loads(raw_text) + fm: dict[str, Any] = dict(post.metadata) + body: str = post.content + + # ---- Title ---- + title: str = fm.get('title', '') + if not title: + # Fall back to first H1 heading + h1 = re.search(r'^#\s+(.+)$', body, re.MULTILINE) + if h1: + title = h1.group(1).strip() + else: + title = file_path.stem + + # ---- Tags ---- + fm_tags: list[str] = _normalise_list(fm.get('tags', [])) + inline_tags: list[str] = _INLINE_TAG_RE.findall(body) + tags = list(dict.fromkeys([t.lower().lstrip('#') for t in fm_tags + inline_tags])) + + # ---- Aliases ---- + aliases = _normalise_list(fm.get('aliases', [])) + + # ---- WikiLinks ---- + wikilinks = list(dict.fromkeys(_WIKILINK_RE.findall(body))) + + # ---- Plain text ---- + content_text = _strip_markdown(body) + word_count = len(content_text.split()) + + return ParsedDocument( + path=relative_path, + title=title, + content_raw=raw_text, + content_text=content_text, + frontmatter=fm, + tags=tags, + aliases=aliases, + wikilinks=wikilinks, + word_count=word_count, + ) + + +def _normalise_list(value: Any) -> list[str]: + """Accept str, list[str], or None and return list[str].""" + if not value: + return [] + if isinstance(value, str): + return [value] + return [str(v) for v in value] diff --git a/services/ingestion-worker/pipeline.py b/services/ingestion-worker/pipeline.py new file mode 100644 index 0000000..ccd9c72 --- /dev/null +++ b/services/ingestion-worker/pipeline.py @@ -0,0 +1,91 @@ +""" +pipeline.py — Orchestrates the full ingestion flow for a single file. 
+""" + +from __future__ import annotations + +import hashlib +import logging +from pathlib import Path + +import asyncpg + +from chunker import chunk_document +from embedder import get_embedder +from indexer import document_needs_reindex, upsert_document +from parser import parse_document +from settings import Settings + +logger = logging.getLogger(__name__) + + +def _sha256(text: str) -> str: + return hashlib.sha256(text.encode('utf-8')).hexdigest() + + +async def ingest_file( + file_path: Path, + settings: Settings, + conn: asyncpg.Connection, +) -> bool: + """ + Full ingestion pipeline for a single Markdown file. + + Returns True if the file was (re)indexed, False if skipped. + """ + vault_root = Path(settings.vault_path) + + if not file_path.exists(): + logger.warning('File not found, skipping: %s', file_path) + return False + + if not file_path.suffix.lower() == '.md': + return False + + raw_text = file_path.read_text(encoding='utf-8', errors='replace') + content_hash = _sha256(raw_text) + relative_path = str(file_path.relative_to(vault_root)) + + # Idempotency check + if not await document_needs_reindex(conn, relative_path, content_hash): + logger.debug('Skipping unchanged file: %s', relative_path) + return False + + logger.info('Ingesting %s', relative_path) + + # Parse + doc = parse_document(file_path, vault_root) + + # Chunk + chunks = chunk_document( + doc.content_text, + target_tokens=settings.chunk_size, + overlap_tokens=settings.chunk_overlap, + ) + + if not chunks: + logger.warning('No chunks generated for %s', relative_path) + return False + + # Embed + embedder = get_embedder( + provider=settings.embedding_provider, + ollama_url=settings.ollama_url, + model=settings.embedding_model, + ) + texts = [c.content for c in chunks] + embeddings = embedder.embed_batch(texts) + + # Validate embedding dimension consistency + if embeddings and len(embeddings[0]) != embedder.dimensions: + logger.error( + 'Embedding dimension mismatch: expected %d, got %d', + 
embedder.dimensions, + len(embeddings[0]), + ) + raise ValueError('Embedding dimension mismatch') + + # Store + doc_id = await upsert_document(conn, doc, chunks, embeddings) + logger.info('Indexed %s → %s (%d chunks)', relative_path, doc_id, len(chunks)) + return True diff --git a/services/ingestion-worker/requirements.txt b/services/ingestion-worker/requirements.txt new file mode 100644 index 0000000..02690c7 --- /dev/null +++ b/services/ingestion-worker/requirements.txt @@ -0,0 +1,10 @@ +watchdog>=4.0.0 +asyncpg>=0.29.0 +pgvector>=0.2.5 +pydantic-settings>=2.2.0 +httpx>=0.27.0 +python-frontmatter>=1.1.0 +markdown-it-py>=3.0.0 +tiktoken>=0.7.0 +sentence-transformers>=3.0.0 +numpy>=1.26.0 diff --git a/services/ingestion-worker/settings.py b/services/ingestion-worker/settings.py new file mode 100644 index 0000000..a1f7248 --- /dev/null +++ b/services/ingestion-worker/settings.py @@ -0,0 +1,33 @@ +""" +settings.py — Configuration for the ingestion worker, loaded from environment variables. 
+""" + +from __future__ import annotations + +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + model_config = SettingsConfigDict(env_file='.env', extra='ignore') + + # Database + database_url: str = 'postgresql://brain:brain@postgres:5432/second_brain' + + # Vault + vault_path: str = '/vault' + + # Ollama + ollama_url: str = 'http://ollama:11434' + + # Embedding + embedding_provider: str = 'ollama' # ollama | sentence_transformers + embedding_model: str = 'nomic-embed-text' + + # Chunking + chunk_size: int = 700 + chunk_overlap: int = 70 + + # Worker behaviour + poll_interval: int = 30 # seconds between fallback polls + batch_size: int = 20 # files per ingestion batch + log_level: str = 'INFO' diff --git a/services/ingestion-worker/watcher.py b/services/ingestion-worker/watcher.py new file mode 100644 index 0000000..6157065 --- /dev/null +++ b/services/ingestion-worker/watcher.py @@ -0,0 +1,118 @@ +""" +watcher.py — File system watcher that triggers ingestion on vault changes. 
+""" + +from __future__ import annotations + +import asyncio +import logging +import time +from pathlib import Path +from queue import Queue +from threading import Thread + +import asyncpg +from watchdog.events import FileSystemEvent, FileSystemEventHandler +from watchdog.observers import Observer + +from pipeline import ingest_file +from settings import Settings + +logger = logging.getLogger(__name__) + + +class VaultEventHandler(FileSystemEventHandler): + """Enqueues changed/created Markdown file paths.""" + + def __init__(self, queue: Queue) -> None: + super().__init__() + self._queue = queue + + def on_created(self, event: FileSystemEvent) -> None: + self._enqueue(event) + + def on_modified(self, event: FileSystemEvent) -> None: + self._enqueue(event) + + def on_deleted(self, event: FileSystemEvent) -> None: + if not event.is_directory and str(event.src_path).endswith('.md'): + self._queue.put(('delete', event.src_path)) + + def _enqueue(self, event: FileSystemEvent) -> None: + if not event.is_directory and str(event.src_path).endswith('.md'): + self._queue.put(('upsert', event.src_path)) + + +async def process_queue( + queue: Queue, + settings: Settings, + pool: asyncpg.Pool, +) -> None: + """Drain the event queue and process each file.""" + pending: set[str] = set() + DEBOUNCE_SECONDS = 2.0 + + while True: + # Collect all queued events (debounce rapid saves) + deadline = time.monotonic() + DEBOUNCE_SECONDS + while time.monotonic() < deadline: + try: + action, path = queue.get_nowait() + pending.add((action, path)) + except Exception: + await asyncio.sleep(0.1) + + for action, path in list(pending): + try: + async with pool.acquire() as conn: + if action == 'upsert': + await ingest_file(Path(path), settings, conn) + elif action == 'delete': + from indexer import delete_document + relative = str(Path(path).relative_to(Path(settings.vault_path))) + await delete_document(conn, relative) + except Exception as exc: + logger.error('Error processing %s %s: %s', 
action, path, exc, exc_info=True) + + pending.clear() + + +async def initial_scan(settings: Settings, pool: asyncpg.Pool) -> None: + """Index all Markdown files in the vault at startup.""" + vault_path = Path(settings.vault_path) + md_files = list(vault_path.rglob('*.md')) + logger.info('Initial scan: found %d Markdown files', len(md_files)) + + for i, file_path in enumerate(md_files): + try: + async with pool.acquire() as conn: + indexed = await ingest_file(file_path, settings, conn) + if indexed: + logger.info('[%d/%d] Indexed %s', i + 1, len(md_files), file_path.name) + except Exception as exc: + logger.error('Failed to index %s: %s', file_path, exc, exc_info=True) + + logger.info('Initial scan complete.') + + +async def run_watcher(settings: Settings) -> None: + """Entry point: start file watcher + initial scan.""" + pool = await asyncpg.create_pool(settings.database_url, min_size=2, max_size=10) + + await initial_scan(settings, pool) + + event_queue: Queue = Queue() + handler = VaultEventHandler(event_queue) + observer = Observer() + observer.schedule(handler, settings.vault_path, recursive=True) + observer.start() + logger.info('Watching vault at %s', settings.vault_path) + + try: + await process_queue(event_queue, settings, pool) + except asyncio.CancelledError: + pass + finally: + observer.stop() + observer.join() + await pool.close() diff --git a/services/rag-api/Dockerfile b/services/rag-api/Dockerfile new file mode 100644 index 0000000..a4f278f --- /dev/null +++ b/services/rag-api/Dockerfile @@ -0,0 +1,18 @@ +FROM python:3.12-slim + +WORKDIR /app + +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential libpq-dev curl \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . 
"""
database.py — Async PostgreSQL connection pool (asyncpg).
"""

from __future__ import annotations

import asyncpg

from core.settings import Settings

# Module-level singleton pool: created once at app startup (lifespan hook)
# and shared by all request handlers via get_pool().
_pool: asyncpg.Pool | None = None


async def create_pool(settings: Settings) -> asyncpg.Pool:
    """Create the global connection pool. Call once from app startup."""
    global _pool
    _pool = await asyncpg.create_pool(
        settings.database_url,
        min_size=settings.db_pool_min,
        max_size=settings.db_pool_max,
        command_timeout=60,  # seconds per statement
    )
    return _pool


async def get_pool() -> asyncpg.Pool:
    """Return the shared pool; raises if create_pool() has not run yet."""
    if _pool is None:
        raise RuntimeError('Database pool not initialised')
    return _pool


async def close_pool() -> None:
    """Close and discard the pool (called on app shutdown)."""
    global _pool
    if _pool:
        await _pool.close()
        _pool = None
+""" + +from __future__ import annotations + +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + model_config = SettingsConfigDict(env_file='.env', extra='ignore') + + # App + app_title: str = 'Second Brain RAG API' + app_version: str = '1.0.0' + log_level: str = 'INFO' + + # Database + database_url: str = 'postgresql://brain:brain@postgres:5432/second_brain' + db_pool_min: int = 2 + db_pool_max: int = 20 + + # Ollama + ollama_url: str = 'http://ollama:11434' + embedding_model: str = 'nomic-embed-text' + chat_model: str = 'mistral' + embedding_dimensions: int = 768 + + # Search defaults + search_top_k: int = 10 + search_threshold: float = 0.65 + rerank_enabled: bool = False + + # CORS (comma-separated origins) + cors_origins: str = 'http://localhost:3000' + + @property + def cors_origins_list(self) -> list[str]: + return [o.strip() for o in self.cors_origins.split(',') if o.strip()] diff --git a/services/rag-api/main.py b/services/rag-api/main.py new file mode 100644 index 0000000..b29ea56 --- /dev/null +++ b/services/rag-api/main.py @@ -0,0 +1,59 @@ +""" +main.py — FastAPI application entry point for the RAG API. 
+""" + +from __future__ import annotations + +import logging +import sys + +from contextlib import asynccontextmanager +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware + +from core.database import create_pool, close_pool +from core.settings import Settings +from routers import search, chat, documents, index, meta + +# Global settings instance (imported by routers via dependency) +app_settings = Settings() + + +def setup_logging(level: str) -> None: + logging.basicConfig( + level=getattr(logging, level.upper(), logging.INFO), + format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', + datefmt='%Y-%m-%dT%H:%M:%S', + stream=sys.stdout, + ) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + setup_logging(app_settings.log_level) + logging.getLogger('rag-api').info('Starting RAG API v%s', app_settings.app_version) + await create_pool(app_settings) + yield + await close_pool() + + +app = FastAPI( + title=app_settings.app_title, + version=app_settings.app_version, + lifespan=lifespan, +) + +app.add_middleware( + CORSMiddleware, + allow_origins=app_settings.cors_origins_list, + allow_credentials=True, + allow_methods=['*'], + allow_headers=['*'], +) + +# Register routers +app.include_router(search.router, prefix='/api/v1') +app.include_router(chat.router, prefix='/api/v1') +app.include_router(documents.router, prefix='/api/v1') +app.include_router(index.router, prefix='/api/v1') +app.include_router(meta.router, prefix='/api/v1') diff --git a/services/rag-api/models/__init__.py b/services/rag-api/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/rag-api/models/requests.py b/services/rag-api/models/requests.py new file mode 100644 index 0000000..9695729 --- /dev/null +++ b/services/rag-api/models/requests.py @@ -0,0 +1,31 @@ +""" +models/requests.py — Pydantic request schemas for the RAG API. 
+""" + +from __future__ import annotations + +from typing import Optional +from pydantic import BaseModel, Field + + +class SearchRequest(BaseModel): + query: str = Field(..., min_length=1, max_length=2000) + limit: int = Field(default=10, ge=1, le=50) + threshold: float = Field(default=0.65, ge=0.0, le=1.0) + tags: Optional[list[str]] = None + hybrid: bool = True + + +class ChatRequest(BaseModel): + message: str = Field(..., min_length=1, max_length=4000) + conversation_id: Optional[str] = None + context_limit: int = Field(default=5, ge=1, le=20) + stream: bool = True + + +class IndexRequest(BaseModel): + path: str = Field(..., description='Relative path of file within the vault') + + +class ReindexRequest(BaseModel): + force: bool = False # If True, reindex even unchanged files diff --git a/services/rag-api/models/responses.py b/services/rag-api/models/responses.py new file mode 100644 index 0000000..a77db4c --- /dev/null +++ b/services/rag-api/models/responses.py @@ -0,0 +1,96 @@ +""" +models/responses.py — Pydantic response schemas for the RAG API. 
+""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any, Optional +from pydantic import BaseModel + + +class ChunkResult(BaseModel): + document_id: str + chunk_id: str + title: str + path: str + content: str + score: float + tags: list[str] + highlight: Optional[str] = None + + +class SearchResponse(BaseModel): + results: list[ChunkResult] + total: int + query_time_ms: float + + +class DocumentResponse(BaseModel): + id: str + path: str + title: str + content: str + frontmatter: dict[str, Any] + tags: list[str] + aliases: list[str] + word_count: Optional[int] + created_at: datetime + updated_at: datetime + indexed_at: Optional[datetime] + + +class RelatedDocument(BaseModel): + document_id: str + title: str + path: str + score: float + tags: list[str] + + +class GraphNode(BaseModel): + id: str + title: str + path: str + tags: list[str] + word_count: Optional[int] + + +class GraphEdge(BaseModel): + source: str + target: str + relation_type: str + label: Optional[str] + + +class GraphResponse(BaseModel): + nodes: list[GraphNode] + edges: list[GraphEdge] + + +class TagCount(BaseModel): + tag: str + count: int + + +class StatsResponse(BaseModel): + total_documents: int + total_chunks: int + total_relations: int + total_tags: int + last_indexed: Optional[datetime] + embedding_model: str + chat_model: str + + +class HealthResponse(BaseModel): + status: str + database: str + ollama: str + version: str + + +class JobResponse(BaseModel): + job_id: str + status: str + message: str diff --git a/services/rag-api/requirements.txt b/services/rag-api/requirements.txt new file mode 100644 index 0000000..d549b85 --- /dev/null +++ b/services/rag-api/requirements.txt @@ -0,0 +1,9 @@ +fastapi>=0.111.0 +uvicorn[standard]>=0.29.0 +asyncpg>=0.29.0 +pgvector>=0.2.5 +pydantic>=2.7.0 +pydantic-settings>=2.2.0 +httpx>=0.27.0 +python-multipart>=0.0.9 +sse-starlette>=2.1.0 diff --git a/services/rag-api/routers/__init__.py 
"""
routers/chat.py — /chat endpoint with SSE streaming.
"""

from __future__ import annotations

from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse

from core.database import get_pool
from core.settings import Settings
from models.requests import ChatRequest
from services.chat import stream_chat
from services.embedder import EmbedService
from services.retriever import hybrid_search

router = APIRouter(prefix='/chat', tags=['chat'])


def _get_settings() -> Settings:
    # Deferred import: main imports this router at module load, so importing
    # main at top level here would create a circular import.
    from main import app_settings
    return app_settings


@router.post('')
async def chat(req: ChatRequest, settings: Settings = Depends(_get_settings)):
    """RAG chat: embed the message, retrieve context, stream the LLM answer as SSE."""
    pool = await get_pool()
    embedder = EmbedService(settings.ollama_url, settings.embedding_model)
    embedding = await embedder.embed(req.message)

    # Retrieve the top context chunks for the question; the elapsed-time value
    # returned by hybrid_search is not needed here.
    async with pool.acquire() as conn:
        context_chunks, _ = await hybrid_search(
            conn=conn,
            query=req.message,
            embedding=embedding,
            limit=req.context_limit,
            threshold=settings.search_threshold,
        )

    return StreamingResponse(
        stream_chat(
            message=req.message,
            context_chunks=context_chunks,
            ollama_url=settings.ollama_url,
            model=settings.chat_model,
        ),
        media_type='text/event-stream',
        headers={
            # Prevent proxy/browser caching and (X-Accel-Buffering) nginx
            # response buffering, which would break token-by-token streaming.
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no',
        },
    )
+""" + +from __future__ import annotations + +from typing import Optional +from fastapi import APIRouter, HTTPException, Depends +import asyncpg + +from core.database import get_pool +from core.settings import Settings +from models.responses import DocumentResponse, GraphResponse, GraphNode, GraphEdge, RelatedDocument, TagCount +from services.retriever import get_related + +router = APIRouter(prefix='/document', tags=['documents']) + + +def _get_settings() -> Settings: + from main import app_settings + return app_settings + + +@router.get('/{document_id}', response_model=DocumentResponse) +async def get_document(document_id: str): + pool = await get_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow( + 'SELECT * FROM documents WHERE id = $1::uuid', document_id + ) + if not row: + raise HTTPException(status_code=404, detail='Document not found') + return _row_to_doc(row) + + +@router.get('/path/{path:path}', response_model=DocumentResponse) +async def get_document_by_path(path: str): + pool = await get_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow('SELECT * FROM documents WHERE path = $1', path) + if not row: + raise HTTPException(status_code=404, detail='Document not found') + return _row_to_doc(row) + + +@router.get('/{document_id}/related', response_model=list[RelatedDocument]) +async def related_documents(document_id: str, limit: int = 5): + pool = await get_pool() + async with pool.acquire() as conn: + related = await get_related(conn, document_id, limit=limit) + return [RelatedDocument(**r) for r in related] + + +def _row_to_doc(row: asyncpg.Record) -> DocumentResponse: + return DocumentResponse( + id=str(row['id']), + path=row['path'], + title=row['title'] or '', + content=row['content'], + frontmatter=dict(row['frontmatter'] or {}), + tags=list(row['tags'] or []), + aliases=list(row['aliases'] or []), + word_count=row['word_count'], + created_at=row['created_at'], + updated_at=row['updated_at'], + 
indexed_at=row['indexed_at'], + ) diff --git a/services/rag-api/routers/index.py b/services/rag-api/routers/index.py new file mode 100644 index 0000000..2ac9d5e --- /dev/null +++ b/services/rag-api/routers/index.py @@ -0,0 +1,49 @@ +""" +routers/index.py — /index and /reindex endpoints. +""" + +from __future__ import annotations + +import uuid +from fastapi import APIRouter, BackgroundTasks, Depends + +from core.database import get_pool +from core.settings import Settings +from models.requests import IndexRequest, ReindexRequest +from models.responses import JobResponse + +router = APIRouter(prefix='/index', tags=['indexing']) + + +def _get_settings() -> Settings: + from main import app_settings + return app_settings + + +async def _enqueue_job(agent_type: str, payload: dict, pool) -> str: + job_id = str(uuid.uuid4()) + async with pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO agent_jobs (id, agent_type, payload) + VALUES ($1::uuid, $2, $3::jsonb) + """, + job_id, + agent_type, + __import__('json').dumps(payload), + ) + return job_id + + +@router.post('', response_model=JobResponse) +async def index_file(req: IndexRequest, settings: Settings = Depends(_get_settings)): + pool = await get_pool() + job_id = await _enqueue_job('ingestion', {'path': req.path, 'force': True}, pool) + return JobResponse(job_id=job_id, status='pending', message=f'Indexing {req.path}') + + +@router.post('/reindex', response_model=JobResponse) +async def reindex_vault(req: ReindexRequest, settings: Settings = Depends(_get_settings)): + pool = await get_pool() + job_id = await _enqueue_job('ingestion', {'reindex_all': True, 'force': req.force}, pool) + return JobResponse(job_id=job_id, status='pending', message='Full vault reindex queued') diff --git a/services/rag-api/routers/meta.py b/services/rag-api/routers/meta.py new file mode 100644 index 0000000..8ca031a --- /dev/null +++ b/services/rag-api/routers/meta.py @@ -0,0 +1,129 @@ +""" +routers/meta.py — /health, /stats, 
"""
routers/meta.py — /health, /stats, /tags, /graph endpoints.
"""

from __future__ import annotations

from fastapi import APIRouter
import httpx

from core.database import get_pool
from core.settings import Settings
from models.responses import HealthResponse, StatsResponse, TagCount, GraphResponse, GraphNode, GraphEdge

router = APIRouter(tags=['meta'])


def _get_settings() -> Settings:
    # Deferred import avoids a circular import with main.py.
    from main import app_settings
    return app_settings


@router.get('/health', response_model=HealthResponse)
async def health():
    """Probe the database and Ollama; overall status degrades only on DB failure."""
    settings = _get_settings()
    db_status = 'ok'
    ollama_status = 'ok'

    try:
        pool = await get_pool()
        async with pool.acquire() as conn:
            await conn.fetchval('SELECT 1')
    except Exception:
        db_status = 'error'

    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            resp = await client.get(f'{settings.ollama_url}/api/tags')
            if resp.status_code != 200:
                ollama_status = 'error'
    except Exception:
        ollama_status = 'unavailable'

    # Ollama being down does not flip overall status — only the DB does.
    overall = 'ok' if db_status == 'ok' else 'degraded'
    return HealthResponse(
        status=overall,
        database=db_status,
        ollama=ollama_status,
        version=settings.app_version,
    )


@router.get('/stats', response_model=StatsResponse)
async def stats():
    """Aggregate corpus counts plus the most recent index timestamp."""
    settings = _get_settings()
    pool = await get_pool()
    async with pool.acquire() as conn:
        docs = await conn.fetchval('SELECT COUNT(*) FROM documents')
        chunks = await conn.fetchval('SELECT COUNT(*) FROM chunks')
        relations = await conn.fetchval('SELECT COUNT(*) FROM relations')
        # unnest(tags) expands each document's tag array into rows.
        tags_count = await conn.fetchval(
            "SELECT COUNT(DISTINCT tag) FROM documents, unnest(tags) AS tag"
        )
        last_indexed = await conn.fetchval(
            'SELECT MAX(indexed_at) FROM documents'
        )
    return StatsResponse(
        total_documents=docs or 0,
        total_chunks=chunks or 0,
        total_relations=relations or 0,
        total_tags=tags_count or 0,
        last_indexed=last_indexed,
        embedding_model=settings.embedding_model,
        chat_model=settings.chat_model,
    )


@router.get('/tags', response_model=list[TagCount])
async def list_tags():
    """All tags with document counts, most frequent first."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT tag, COUNT(*) AS count
            FROM documents, unnest(tags) AS tag
            GROUP BY tag
            ORDER BY count DESC, tag
            """
        )
    return [TagCount(tag=row['tag'], count=row['count']) for row in rows]


@router.get('/graph', response_model=GraphResponse)
async def knowledge_graph(limit: int = 200):
    """Return up to `limit` document nodes and up to `limit * 3` relation edges.

    NOTE(review): nodes are taken with a bare LIMIT (no ORDER BY), so edges may
    reference documents outside the returned node set — confirm the UI
    tolerates dangling edges.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        doc_rows = await conn.fetch(
            'SELECT id, title, path, tags, word_count FROM documents LIMIT $1',
            limit,
        )
        rel_rows = await conn.fetch(
            """
            SELECT r.source_doc_id::text, r.target_doc_id::text, r.relation_type, r.label
            FROM relations r
            WHERE r.target_doc_id IS NOT NULL
            LIMIT $1
            """,
            limit * 3,
        )

    nodes = [
        GraphNode(
            id=str(row['id']),
            title=row['title'] or '',
            path=row['path'],
            tags=list(row['tags'] or []),
            word_count=row['word_count'],
        )
        for row in doc_rows
    ]
    edges = [
        GraphEdge(
            source=row['source_doc_id'],
            target=row['target_doc_id'],
            relation_type=row['relation_type'],
            label=row['label'],
        )
        for row in rel_rows
    ]
    return GraphResponse(nodes=nodes, edges=edges)
+""" + +from __future__ import annotations + +import time + +from fastapi import APIRouter, Depends +from fastapi.responses import JSONResponse + +from core.database import get_pool +from models.requests import SearchRequest +from models.responses import SearchResponse +from services.embedder import EmbedService +from services.retriever import hybrid_search +from core.settings import Settings + +router = APIRouter(prefix='/search', tags=['search']) + + +def _get_settings() -> Settings: + from main import app_settings + return app_settings + + +@router.post('', response_model=SearchResponse) +async def search(req: SearchRequest, settings: Settings = Depends(_get_settings)): + pool = await get_pool() + embedder = EmbedService(settings.ollama_url, settings.embedding_model) + embedding = await embedder.embed(req.query) + + async with pool.acquire() as conn: + results, elapsed = await hybrid_search( + conn=conn, + query=req.query, + embedding=embedding, + limit=req.limit, + threshold=req.threshold, + tags=req.tags, + ) + + return SearchResponse(results=results, total=len(results), query_time_ms=elapsed) diff --git a/services/rag-api/services/__init__.py b/services/rag-api/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/rag-api/services/chat.py b/services/rag-api/services/chat.py new file mode 100644 index 0000000..b5775f1 --- /dev/null +++ b/services/rag-api/services/chat.py @@ -0,0 +1,87 @@ +""" +services/chat.py — RAG chat: retrieves context, streams LLM response. +""" + +from __future__ import annotations + +import json +import logging +from typing import AsyncIterator + +import httpx + +from models.responses import ChunkResult + +logger = logging.getLogger(__name__) + +SYSTEM_PROMPT = """You are a knowledgeable assistant with access to the user's personal knowledge base (Second Brain). +Answer questions based on the provided context documents. 
+Always cite which documents you drew information from using the format [Document Title]. +If the context doesn't contain enough information, say so honestly rather than fabricating answers. +Be concise and precise.""" + + +async def stream_chat( + message: str, + context_chunks: list[ChunkResult], + ollama_url: str, + model: str, +) -> AsyncIterator[str]: + """ + Stream a chat response via Ollama using the retrieved context. + + Yields Server-Sent Events (SSE) formatted strings. + """ + # Build context block + context_parts = [] + for i, chunk in enumerate(context_chunks, 1): + context_parts.append( + f'[{i}] **{chunk.title}** (path: {chunk.path})\n{chunk.content}' + ) + context_text = '\n\n---\n\n'.join(context_parts) + + prompt = f"""Context from knowledge base: + +{context_text} + +--- + +User question: {message} + +Answer based on the above context:""" + + url = f'{ollama_url.rstrip("/")}/api/chat' + payload = { + 'model': model, + 'stream': True, + 'messages': [ + {'role': 'system', 'content': SYSTEM_PROMPT}, + {'role': 'user', 'content': prompt}, + ], + } + + # Yield sources first + sources = [ + {'title': c.title, 'path': c.path, 'score': c.score} + for c in context_chunks + ] + yield f'data: {json.dumps({"type": "sources", "sources": sources})}\n\n' + + # Stream tokens + async with httpx.AsyncClient(timeout=120.0) as client: + async with client.stream('POST', url, json=payload) as resp: + resp.raise_for_status() + async for line in resp.aiter_lines(): + if not line.strip(): + continue + try: + chunk_data = json.loads(line) + token = chunk_data.get('message', {}).get('content', '') + if token: + yield f'data: {json.dumps({"type": "token", "token": token})}\n\n' + if chunk_data.get('done', False): + break + except json.JSONDecodeError: + continue + + yield f'data: {json.dumps({"type": "done"})}\n\n' diff --git a/services/rag-api/services/embedder.py b/services/rag-api/services/embedder.py new file mode 100644 index 0000000..a55272d --- /dev/null +++ 
"""
services/embedder.py — Thin async wrapper around Ollama embeddings for the API.
"""

from __future__ import annotations

import logging
import time

import httpx

logger = logging.getLogger(__name__)


class EmbedService:
    """Calls Ollama's /api/embed endpoint for one or many texts."""

    def __init__(self, ollama_url: str, model: str, timeout: float = 30.0) -> None:
        self._url = f'{ollama_url.rstrip("/")}/api/embed'
        self._model = model
        self._timeout = timeout

    async def embed(self, text: str) -> list[float]:
        """Embed a single text; convenience wrapper over embed_batch."""
        return (await self.embed_batch([text]))[0]

    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed a batch of texts in one Ollama request; raises on HTTP error."""
        async with httpx.AsyncClient(timeout=self._timeout) as client:
            resp = await client.post(
                self._url,
                json={'model': self._model, 'input': texts},
            )
            resp.raise_for_status()
            return resp.json()['embeddings']


# ---------------------------------------------------------------------------
# services/retriever.py — Hybrid vector + full-text search against PostgreSQL.
# ---------------------------------------------------------------------------

"""
services/retriever.py — Hybrid vector + full-text search against PostgreSQL.
"""

import logging
import time
from typing import Optional

import asyncpg

from models.responses import ChunkResult

logger = logging.getLogger(__name__)


async def hybrid_search(
    conn: asyncpg.Connection,
    query: str,
    embedding: list[float],
    limit: int = 10,
    threshold: float = 0.65,
    tags: Optional[list[str]] = None,
) -> tuple[list[ChunkResult], float]:
    """
    Hybrid search: vector similarity + full-text search, merged by RRF.

    Returns (results, query_time_ms).
    """
    start = time.monotonic()

    tag_filter = ''
    # $3 is limit * 2: each branch over-fetches so the RRF merge has enough
    # candidates; Python truncates to `limit` at the end.
    params: list = [embedding, query, limit * 2, threshold]

    if tags:
        # Array-overlap filter ($5) applied in both CTEs.
        tag_filter = 'AND d.tags && $5'
        params.append(tags)

    # Combined RRF (Reciprocal Rank Fusion) of vector and FTS results.
    # Note: the similarity threshold ($4) applies only to the vector branch;
    # 60 is the conventional RRF rank-damping constant.
    sql = f"""
    WITH vector_results AS (
        SELECT
            c.id AS chunk_id,
            c.document_id,
            c.content,
            c.chunk_index,
            1 - (c.embedding <=> $1::vector) AS vector_score,
            ROW_NUMBER() OVER (ORDER BY c.embedding <=> $1::vector) AS vector_rank
        FROM chunks c
        JOIN documents d ON d.id = c.document_id
        WHERE 1 - (c.embedding <=> $1::vector) >= $4
        {tag_filter}
        ORDER BY c.embedding <=> $1::vector
        LIMIT $3
    ),
    fts_results AS (
        SELECT
            c.id AS chunk_id,
            c.document_id,
            c.content,
            c.chunk_index,
            ts_rank_cd(d.fts_vector, plainto_tsquery('english', $2)) AS fts_score,
            ROW_NUMBER() OVER (
                ORDER BY ts_rank_cd(d.fts_vector, plainto_tsquery('english', $2)) DESC
            ) AS fts_rank
        FROM chunks c
        JOIN documents d ON d.id = c.document_id
        WHERE d.fts_vector @@ plainto_tsquery('english', $2)
        {tag_filter}
        ORDER BY fts_score DESC
        LIMIT $3
    ),
    merged AS (
        SELECT
            COALESCE(v.chunk_id, f.chunk_id) AS chunk_id,
            COALESCE(v.document_id, f.document_id) AS document_id,
            COALESCE(v.content, f.content) AS content,
            (COALESCE(1.0 / (60 + v.vector_rank), 0) +
             COALESCE(1.0 / (60 + f.fts_rank), 0)) AS rrf_score,
            COALESCE(v.vector_score, 0) AS vector_score
        FROM vector_results v
        FULL OUTER JOIN fts_results f ON v.chunk_id = f.chunk_id
    )
    SELECT
        m.chunk_id::text,
        m.document_id::text,
        m.content,
        m.rrf_score,
        m.vector_score,
        d.title,
        d.path,
        d.tags,
        ts_headline('english', m.content, plainto_tsquery('english', $2),
                    'MaxWords=20, MinWords=10, ShortWord=3') AS highlight
    FROM merged m
    JOIN documents d ON d.id = m.document_id
    ORDER BY m.rrf_score DESC
    LIMIT $3
    """

    rows = await conn.fetch(sql, *params)
    elapsed_ms = (time.monotonic() - start) * 1000

    results = [
        ChunkResult(
            chunk_id=str(row['chunk_id']),
            document_id=str(row['document_id']),
            title=row['title'] or '',
            path=row['path'],
            content=row['content'],
            score=round(float(row['rrf_score']), 4),
            tags=list(row['tags'] or []),
            highlight=row['highlight'],
        )
        for row in rows
    ]

    # Truncate the over-fetched candidate set back down to the caller's limit.
    return results[:limit], round(elapsed_ms, 2)


async def get_related(
    conn: asyncpg.Connection,
    document_id: str,
    limit: int = 5,
) -> list[dict]:
    """Find documents related to the given document via average chunk embedding."""
    rows = await conn.fetch(
        """
        WITH doc_embedding AS (
            SELECT AVG(embedding) AS avg_emb
            FROM chunks
            WHERE document_id = $1::uuid
        )
        SELECT
            d.id::text,
            d.title,
            d.path,
            d.tags,
            1 - (AVG(c.embedding) <=> (SELECT avg_emb FROM doc_embedding)) AS score
        FROM chunks c
        JOIN documents d ON d.id = c.document_id
        WHERE c.document_id != $1::uuid
        GROUP BY d.id, d.title, d.path, d.tags
        ORDER BY score DESC
        LIMIT $2
        """,
        document_id,
        limit,
    )
    return [
        {
            'document_id': row['id'],
            'title': row['title'] or '',
            'path': row['path'],
            'tags': list(row['tags'] or []),
            'score': round(float(row['score']), 4),
        }
        for row in rows
    ]
+ENV NEXT_TELEMETRY_DISABLED=1 +RUN npm run build + +FROM base AS runner +WORKDIR /app +ENV NODE_ENV=production +ENV NEXT_TELEMETRY_DISABLED=1 + +RUN addgroup --system --gid 1001 nodejs && \ + adduser --system --uid 1001 nextjs + +COPY --from=builder /app/public ./public +COPY --from=builder --chown=nextjs:nodejs /app/.next/standalone ./ +COPY --from=builder --chown=nextjs:nodejs /app/.next/static ./.next/static + +USER nextjs +EXPOSE 3000 +ENV PORT=3000 + +CMD ["node", "server.js"] diff --git a/services/web-ui/app/chat/page.tsx b/services/web-ui/app/chat/page.tsx new file mode 100644 index 0000000..472eea4 --- /dev/null +++ b/services/web-ui/app/chat/page.tsx @@ -0,0 +1,136 @@ +'use client'; + +import { streamChat } from '@/lib/api'; +import { useState, useRef, useEffect, useCallback } from 'react'; +import { Send, Loader2, Bot, User, BookOpen } from 'lucide-react'; + +interface Message { + role: 'user' | 'assistant'; + content: string; + sources?: { title: string; path: string; score: number }[]; +} + +export default function ChatPage() { + const [messages, setMessages] = useState([]); + const [input, setInput] = useState(''); + const [streaming, setStreaming] = useState(false); + const cancelRef = useRef<(() => void) | null>(null); + const bottomRef = useRef(null); + + useEffect(() => { + bottomRef.current?.scrollIntoView({ behavior: 'smooth' }); + }, [messages]); + + const sendMessage = useCallback(async () => { + const text = input.trim(); + if (!text || streaming) return; + setInput(''); + + const userMsg: Message = { role: 'user', content: text }; + setMessages((prev) => [...prev, userMsg]); + setStreaming(true); + + const assistantMsg: Message = { role: 'assistant', content: '', sources: [] }; + setMessages((prev) => [...prev, assistantMsg]); + + cancelRef.current = streamChat( + text, + 5, + (token) => { + setMessages((prev) => { + const next = [...prev]; + const last = next[next.length - 1]; + next[next.length - 1] = { ...last, content: last.content + 
token }; + return next; + }); + }, + (sources) => { + setMessages((prev) => { + const next = [...prev]; + next[next.length - 1] = { ...next[next.length - 1], sources }; + return next; + }); + }, + () => setStreaming(false), + ); + }, [input, streaming]); + + const handleKeyDown = (e: React.KeyboardEvent) => { + if (e.key === 'Enter' && !e.shiftKey) { + e.preventDefault(); + sendMessage(); + } + }; + + return ( +
+

AI Chat

+ +
+ {messages.length === 0 && ( +
+ +

Ask anything about your knowledge base

+
+ )} + + {messages.map((msg, i) => ( +
+ {msg.role === 'assistant' && ( +
+ +
+ )} +
+
+ {msg.content} + {msg.role === 'assistant' && streaming && i === messages.length - 1 && ( + + )} +
+ {msg.sources && msg.sources.length > 0 && ( +
+ {msg.sources.map((src, si) => ( + + + {src.title} + + ))} +
+ )} +
+ {msg.role === 'user' && ( +
+ +
+ )} +
+ ))} +
+
+ +
+