""" capture.py — Quick log entries and file uploads to the vault. """ from __future__ import annotations import os import re import logging from datetime import datetime, timezone from pathlib import Path from typing import Optional from fastapi import APIRouter, HTTPException, UploadFile, File, Form from pydantic import BaseModel router = APIRouter(tags=['capture']) logger = logging.getLogger(__name__) # Vault path from environment VAULT_PATH = Path(os.getenv('VAULT_PATH', '/vault')) class LogEntry(BaseModel): """Quick log/note entry.""" content: str tags: Optional[list[str]] = None title: Optional[str] = None class LogResponse(BaseModel): """Response after creating a log entry.""" success: bool file_path: str timestamp: str message: str class UploadResponse(BaseModel): """Response after file upload.""" success: bool file_path: str filename: str size_bytes: int message: str def sanitize_filename(name: str) -> str: """Remove unsafe characters from filename.""" # Keep alphanumeric, dots, dashes, underscores safe = re.sub(r'[^\w\-.]', '_', name) return safe[:200] # Limit length @router.post('/capture/log', response_model=LogResponse) async def create_log_entry(entry: LogEntry): """ Create a quick log entry. Entries are appended to daily log files in the logs/ directory with automatic timestamps. """ now = datetime.now(timezone.utc) date_str = now.strftime('%Y-%m-%d') time_str = now.strftime('%H:%M:%S') logs_dir = VAULT_PATH / 'logs' logs_dir.mkdir(parents=True, exist_ok=True) log_file = logs_dir / f'{date_str}.md' # Format the entry lines = [] # If file doesn't exist, add header if not log_file.exists(): lines.append(f'# Log — {date_str}\n\n') # Entry header with timestamp lines.append(f'## {time_str} UTC') if entry.title: lines.append(f' — {entry.title}') lines.append('\n\n') # Tags if entry.tags: tag_str = ' '.join(f'#{tag}' for tag in entry.tags) lines.append(f'{tag_str}\n\n') # Content lines.append(entry.content.strip()) lines.append('\n\n---\n\n') content = ''.join(lines) try: with open(log_file, 'a', encoding='utf-8') as f: f.write(content) logger.info('Log entry created: %s', log_file) return LogResponse( success=True, file_path=str(log_file.relative_to(VAULT_PATH)), timestamp=now.isoformat(), message=f'Log entry added to {date_str}.md' ) except Exception as e: logger.error('Failed to write log: %s', e) raise HTTPException(status_code=500, detail=f'Failed to write log: {e}') @router.post('/capture/upload', response_model=UploadResponse) async def upload_document( file: UploadFile = File(...), folder: str = Form(default='uploads'), tags: Optional[str] = Form(default=None), ): """ Upload a document to the vault. Supports markdown, text, and PDF files. - folder: Subfolder in vault (default: uploads) - tags: Comma-separated tags to add as frontmatter (for .md files) """ if not file.filename: raise HTTPException(status_code=400, detail='No filename provided') # Sanitize inputs safe_folder = sanitize_filename(folder) safe_filename = sanitize_filename(file.filename) # Allowed extensions allowed_ext = {'.md', '.txt', '.pdf', '.json', '.yaml', '.yml', '.csv'} ext = Path(safe_filename).suffix.lower() if ext not in allowed_ext: raise HTTPException( status_code=400, detail=f'File type {ext} not allowed. Allowed: {", ".join(allowed_ext)}' ) # Target directory target_dir = VAULT_PATH / safe_folder target_dir.mkdir(parents=True, exist_ok=True) target_path = target_dir / safe_filename # Handle duplicates by adding timestamp if target_path.exists(): stem = Path(safe_filename).stem timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') safe_filename = f'{stem}_{timestamp}{ext}' target_path = target_dir / safe_filename try: content = await file.read() # If markdown and tags provided, prepend frontmatter if ext == '.md' and tags: tag_list = [t.strip() for t in tags.split(',') if t.strip()] if tag_list: frontmatter = '---\n' frontmatter += f'tags: [{", ".join(tag_list)}]\n' frontmatter += f'uploaded: {datetime.now(timezone.utc).isoformat()}\n' frontmatter += '---\n\n' content = frontmatter.encode('utf-8') + content with open(target_path, 'wb') as f: f.write(content) logger.info('File uploaded: %s (%d bytes)', target_path, len(content)) return UploadResponse( success=True, file_path=str(target_path.relative_to(VAULT_PATH)), filename=safe_filename, size_bytes=len(content), message=f'File uploaded to {safe_folder}/' ) except Exception as e: logger.error('Upload failed: %s', e) raise HTTPException(status_code=500, detail=f'Upload failed: {e}') @router.get('/capture/logs') async def list_logs(limit: int = 10): """List recent log files.""" logs_dir = VAULT_PATH / 'logs' if not logs_dir.exists(): return {'logs': [], 'total': 0} log_files = sorted(logs_dir.glob('*.md'), reverse=True)[:limit] logs = [] for lf in log_files: stat = lf.stat() logs.append({ 'date': lf.stem, 'file_path': str(lf.relative_to(VAULT_PATH)), 'size_bytes': stat.st_size, 'modified': datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat() }) return {'logs': logs, 'total': len(logs)} @router.get('/capture/log/{date}') async def get_log(date: str): """Get contents of a specific log file by date (YYYY-MM-DD).""" # Validate date format if not re.match(r'^\d{4}-\d{2}-\d{2}$', date): raise HTTPException(status_code=400, detail='Invalid date format. Use YYYY-MM-DD') log_file = VAULT_PATH / 'logs' / f'{date}.md' if not log_file.exists(): raise HTTPException(status_code=404, detail=f'No log found for {date}') content = log_file.read_text(encoding='utf-8') return { 'date': date, 'file_path': str(log_file.relative_to(VAULT_PATH)), 'content': content }