You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

225 lines
6.6 KiB

"""
capture.py — Quick log entries and file uploads to the vault.
"""
from __future__ import annotations
import os
import re
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, HTTPException, UploadFile, File, Form
from pydantic import BaseModel
# Router for the capture endpoints; routes registered on it are tagged 'capture'.
router = APIRouter(tags=['capture'])
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Vault root directory, read once at import time from the VAULT_PATH
# environment variable; falls back to '/vault' when the variable is unset.
VAULT_PATH = Path(os.getenv('VAULT_PATH', '/vault'))
class LogEntry(BaseModel):
    """Request body for a quick log/note entry."""

    # Body text of the entry (required).
    content: str
    # Optional tags; the log endpoint renders each one as '#tag'.
    tags: Optional[list[str]] = None
    # Optional title shown in the entry heading.
    title: Optional[str] = None
class LogResponse(BaseModel):
    """Payload returned once a log entry has been written."""

    # Whether the entry was written.
    success: bool
    # Path of the log file the entry was appended to.
    file_path: str
    # Timestamp string for the entry.
    timestamp: str
    # Human-readable status message.
    message: str
class UploadResponse(BaseModel):
    """Payload returned once an uploaded file has been stored."""

    # Whether the file was stored.
    success: bool
    # Path of the stored file.
    file_path: str
    # Name the file was stored under.
    filename: str
    # Size of the stored file in bytes.
    size_bytes: int
    # Human-readable status message.
    message: str
def sanitize_filename(name: str) -> str:
"""Remove unsafe characters from filename."""
# Keep alphanumeric, dots, dashes, underscores
safe = re.sub(r'[^\w\-.]', '_', name)
return safe[:200] # Limit length
@router.post('/capture/log', response_model=LogResponse)
async def create_log_entry(entry: LogEntry):
    """
    Create a quick log entry. Entries are appended to daily log files
    in the logs/ directory with automatic timestamps.

    The daily file is named after the current UTC date (YYYY-MM-DD.md) and
    receives a '# Log — <date>' header the first time it is created. Each
    entry consists of a '## HH:MM:SS UTC' heading (followed by ' — <title>'
    when a title is given), an optional line of '#tag' markers, the stripped
    content, and a '---' separator.

    Args:
        entry: The log entry payload (content, optional tags and title).

    Returns:
        LogResponse with the vault-relative file path and the ISO timestamp.

    Raises:
        HTTPException: 500 if the log directory or file cannot be written.
    """
    now = datetime.now(timezone.utc)
    date_str = now.strftime('%Y-%m-%d')
    time_str = now.strftime('%H:%M:%S')

    logs_dir = VAULT_PATH / 'logs'
    log_file = logs_dir / f'{date_str}.md'

    # Heading: collapse whitespace in the title so an embedded newline cannot
    # break the Markdown heading, and separate it from the timestamp.
    heading = f'## {time_str} UTC'
    title = ' '.join(entry.title.split()) if entry.title else ''
    if title:
        heading += f' — {title}'
    parts = [f'{heading}\n\n']

    # Tags: ignore blank tags so they do not render as a bare '#'.
    tags = [tag.strip() for tag in (entry.tags or []) if tag.strip()]
    if tags:
        tag_str = ' '.join(f'#{tag}' for tag in tags)
        parts.append(f'{tag_str}\n\n')

    # Content and entry separator
    parts.append(entry.content.strip())
    parts.append('\n\n---\n\n')

    try:
        logs_dir.mkdir(parents=True, exist_ok=True)
        # A new daily file starts with a date header.
        if not log_file.exists():
            parts.insert(0, f'# Log — {date_str}\n\n')
        with open(log_file, 'a', encoding='utf-8') as f:
            f.write(''.join(parts))
    except OSError as e:
        logger.exception('Failed to write log: %s', e)
        raise HTTPException(status_code=500, detail=f'Failed to write log: {e}') from e

    logger.info('Log entry created: %s', log_file)
    return LogResponse(
        success=True,
        file_path=str(log_file.relative_to(VAULT_PATH)),
        timestamp=now.isoformat(),
        message=f'Log entry added to {date_str}.md'
    )
@router.post('/capture/upload', response_model=UploadResponse)
async def upload_document(
    file: UploadFile = File(...),
    folder: str = Form(default='uploads'),
    tags: Optional[str] = Form(default=None),
):
    """
    Upload a document to the vault. Supports markdown, text, PDF, JSON,
    YAML and CSV files.

    - folder: Subfolder in vault (default: uploads)
    - tags: Comma-separated tags to add as frontmatter (for .md files)

    Folder and file names are sanitized before use. If a file with the same
    name already exists, a UTC timestamp (and, if needed, a counter) is
    appended to the stem so the existing file is not overwritten.

    Returns:
        UploadResponse with the vault-relative path, stored filename and the
        number of bytes written (including any generated frontmatter).

    Raises:
        HTTPException: 400 for a missing filename, an invalid folder name or
            a disallowed file type; 500 if the file cannot be stored.
    """
    import json  # local import: only used to quote unusual tag values

    if not file.filename:
        raise HTTPException(status_code=400, detail='No filename provided')

    # Sanitize inputs
    safe_folder = sanitize_filename(folder)
    safe_filename = sanitize_filename(file.filename)

    # A dot-only folder name ('.', '..') would resolve to the vault itself or
    # its parent; reject it rather than write outside the intended subfolder.
    if safe_folder and not safe_folder.strip('.'):
        raise HTTPException(status_code=400, detail='Invalid folder name')

    # Allowed extensions
    allowed_ext = {'.md', '.txt', '.pdf', '.json', '.yaml', '.yml', '.csv'}
    ext = Path(safe_filename).suffix.lower()
    if ext not in allowed_ext:
        # sorted() keeps the error message stable between runs.
        raise HTTPException(
            status_code=400,
            detail=f'File type {ext} not allowed. Allowed: {", ".join(sorted(allowed_ext))}'
        )

    target_dir = VAULT_PATH / safe_folder

    try:
        target_dir.mkdir(parents=True, exist_ok=True)
        target_path = target_dir / safe_filename

        # Handle duplicates by adding a UTC timestamp, plus a counter when
        # several uploads of the same name land within one second.
        if target_path.exists():
            stem = Path(safe_filename).stem
            timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
            safe_filename = f'{stem}_{timestamp}{ext}'
            counter = 1
            while (target_dir / safe_filename).exists():
                safe_filename = f'{stem}_{timestamp}_{counter}{ext}'
                counter += 1
            target_path = target_dir / safe_filename

        content = await file.read()

        # If markdown and tags provided, prepend frontmatter
        if ext == '.md' and tags:
            # Collapse whitespace so a tag cannot span lines.
            tag_list = [' '.join(t.split()) for t in tags.split(',')]
            tag_list = [t for t in tag_list if t]
            if tag_list:
                # Plain tags are written as-is; anything containing characters
                # with meaning in YAML is written as a quoted string so it
                # cannot alter the frontmatter structure.
                rendered = [
                    t if re.fullmatch(r'\w[\w\-/ ]*', t) else json.dumps(t)
                    for t in tag_list
                ]
                frontmatter = '---\n'
                frontmatter += f'tags: [{", ".join(rendered)}]\n'
                frontmatter += f'uploaded: {datetime.now(timezone.utc).isoformat()}\n'
                frontmatter += '---\n\n'
                content = frontmatter.encode('utf-8') + content

        with open(target_path, 'wb') as f:
            f.write(content)
    except OSError as e:
        logger.exception('Upload failed: %s', e)
        raise HTTPException(status_code=500, detail=f'Upload failed: {e}') from e

    logger.info('File uploaded: %s (%d bytes)', target_path, len(content))
    return UploadResponse(
        success=True,
        file_path=str(target_path.relative_to(VAULT_PATH)),
        filename=safe_filename,
        size_bytes=len(content),
        message=f'File uploaded to {safe_folder}/'
    )
@router.get('/capture/logs')
async def list_logs(limit: int = 10):
    """List recent log files.

    Files matching '*.md' in the vault's logs/ directory are sorted by name
    in descending order (newest first for YYYY-MM-DD names) and at most
    `limit` of them are returned.

    Args:
        limit: Maximum number of log files to return. Zero or a negative
            value yields an empty list.

    Returns:
        A dict with 'logs' (one dict per file: date, vault-relative
        file_path, size_bytes, modified as an ISO UTC timestamp) and
        'total' (the number of entries in 'logs').
    """
    logs_dir = VAULT_PATH / 'logs'
    if not logs_dir.exists():
        return {'logs': [], 'total': 0}

    # A negative slice bound would count from the end of the list and return
    # the wrong files, so clamp it to zero.
    limit = max(limit, 0)
    log_files = sorted(logs_dir.glob('*.md'), reverse=True)[:limit]

    logs = []
    for lf in log_files:
        stat = lf.stat()
        logs.append({
            'date': lf.stem,
            'file_path': str(lf.relative_to(VAULT_PATH)),
            'size_bytes': stat.st_size,
            'modified': datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()
        })
    return {'logs': logs, 'total': len(logs)}
@router.get('/capture/log/{date}')
async def get_log(date: str):
    """Get contents of a specific log file by date (YYYY-MM-DD).

    Args:
        date: Log date in YYYY-MM-DD form (ASCII digits only).

    Returns:
        A dict with 'date', the vault-relative 'file_path' and the file
        'content' decoded as UTF-8.

    Raises:
        HTTPException: 400 if `date` is not in YYYY-MM-DD form; 404 if no
            log file exists for that date.
    """
    # fullmatch rejects a trailing newline (which '$' with re.match allows)
    # and re.ASCII restricts \d to 0-9, so only a plain date reaches the path.
    if not re.fullmatch(r'\d{4}-\d{2}-\d{2}', date, flags=re.ASCII):
        raise HTTPException(status_code=400, detail='Invalid date format. Use YYYY-MM-DD')

    log_file = VAULT_PATH / 'logs' / f'{date}.md'
    try:
        # Read directly instead of checking exists() first, so a file removed
        # in between still produces a 404.
        content = log_file.read_text(encoding='utf-8')
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f'No log found for {date}') from None

    return {
        'date': date,
        'file_path': str(log_file.relative_to(VAULT_PATH)),
        'content': content
    }

Powered by TurnKey Linux.