You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
225 lines
6.6 KiB
225 lines
6.6 KiB
"""
|
|
capture.py — Quick log entries and file uploads to the vault.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, HTTPException, UploadFile, File, Form
|
|
from pydantic import BaseModel
|
|
|
|
# Router that collects the capture endpoints under the 'capture' tag.
router = APIRouter(tags=['capture'])

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Root directory of the vault: taken from the VAULT_PATH environment
# variable, falling back to '/vault' when the variable is not set.
VAULT_PATH = Path(os.environ.get('VAULT_PATH', '/vault'))
|
|
|
|
|
|
class LogEntry(BaseModel):
    """Request body for a quick log/note entry."""

    # Text of the entry (required).
    content: str

    # Optional tags; create_log_entry writes each one as '#tag'.
    tags: Optional[list[str]] = None

    # Optional title appended to the entry's timestamp heading.
    title: Optional[str] = None
|
|
|
|
|
|
class LogResponse(BaseModel):
    """Payload returned once a log entry has been written."""

    # Outcome flag for the write.
    success: bool

    # Path of the log file the entry went into.
    file_path: str

    # Timestamp string associated with the entry.
    timestamp: str

    # Human-readable summary of what happened.
    message: str
|
|
|
|
|
|
class UploadResponse(BaseModel):
    """Payload returned once an uploaded file has been stored."""

    # Outcome flag for the upload.
    success: bool

    # Path of the stored file.
    file_path: str

    # Name the file was stored under.
    filename: str

    # Number of bytes written.
    size_bytes: int

    # Human-readable summary of what happened.
    message: str
|
|
|
|
|
|
def sanitize_filename(name: str) -> str:
    """Reduce *name* to a string that is safe to use as one path component.

    Every character that is not a word character (letters, digits,
    underscore), a dash, or a dot is replaced with an underscore, and the
    result is limited to 200 characters.

    Names that consist only of dots (such as ``.`` and ``..``) are turned
    into underscores of the same length: those names are directory
    references, and joining one onto a base directory would point at the
    base directory itself or at its parent instead of at a new entry.

    Args:
        name: Raw file or folder name, possibly from an untrusted client.

    Returns:
        The sanitized name; an empty input yields an empty string.
    """
    # Keep alphanumerics, underscores, dots and dashes; limit the length.
    safe = re.sub(r'[^\w\-.]', '_', name)[:200]

    # Path separators are already gone, so dot-only names are the one
    # remaining way for the result to navigate out of the target directory.
    if safe and not safe.strip('.'):
        safe = '_' * len(safe)

    return safe
|
|
|
|
|
|
@router.post('/capture/log', response_model=LogResponse)
async def create_log_entry(entry: LogEntry):
    """
    Create a quick log entry.

    The entry is appended to the daily file ``logs/<YYYY-MM-DD>.md`` under
    the vault, using the current UTC date and time. A new daily file starts
    with a ``# Log — <date>`` header. Each entry consists of a
    ``## <HH:MM:SS> UTC`` heading (with the title appended when given), an
    optional line of ``#tag`` markers, the stripped content, and a ``---``
    separator.

    Args:
        entry: Content plus optional tags and title.

    Returns:
        LogResponse with the vault-relative file path and the ISO timestamp.

    Raises:
        HTTPException: 500 when the logs directory or the log file cannot
            be created or written.
    """
    now = datetime.now(timezone.utc)
    date_str = now.strftime('%Y-%m-%d')
    time_str = now.strftime('%H:%M:%S')

    logs_dir = VAULT_PATH / 'logs'
    log_file = logs_dir / f'{date_str}.md'

    # Build the entry text: heading, optional tags, content, separator.
    parts = [f'## {time_str} UTC']
    if entry.title:
        parts.append(f' — {entry.title}')
    parts.append('\n\n')

    if entry.tags:
        tag_str = ' '.join(f'#{tag}' for tag in entry.tags)
        parts.append(f'{tag_str}\n\n')

    parts.append(entry.content.strip())
    parts.append('\n\n---\n\n')

    try:
        # Directory creation is part of the write: a failure here must be
        # logged and reported exactly like a failed file write.
        logs_dir.mkdir(parents=True, exist_ok=True)

        # The first entry of the day also writes the file header.
        if not log_file.exists():
            parts.insert(0, f'# Log — {date_str}\n\n')

        with open(log_file, 'a', encoding='utf-8') as f:
            f.write(''.join(parts))
    except (OSError, UnicodeError) as e:
        # logger.exception keeps the traceback in the log.
        logger.exception('Failed to write log: %s', e)
        raise HTTPException(status_code=500, detail=f'Failed to write log: {e}') from e

    logger.info('Log entry created: %s', log_file)

    return LogResponse(
        success=True,
        file_path=str(log_file.relative_to(VAULT_PATH)),
        timestamp=now.isoformat(),
        message=f'Log entry added to {date_str}.md'
    )
|
|
|
|
|
|
@router.post('/capture/upload', response_model=UploadResponse)
async def upload_document(
    file: UploadFile = File(...),
    folder: str = Form(default='uploads'),
    tags: Optional[str] = Form(default=None),
):
    """
    Upload a document to the vault.

    Accepted extensions: .md, .txt, .pdf, .json, .yaml, .yml, .csv.

    - folder: Subfolder in vault (default: uploads)
    - tags: Comma-separated tags to add as frontmatter (for .md files)

    If a file with the same name already exists in the target folder, the
    new file is stored under ``<stem>_<UTC timestamp><ext>`` (with a numeric
    suffix if that name is taken too) rather than overwriting it.

    Raises:
        HTTPException: 400 when no filename is given, when the folder name
            consists only of dots, or when the extension is not allowed;
            500 when the file cannot be read or stored.
    """
    if not file.filename:
        raise HTTPException(status_code=400, detail='No filename provided')

    # Sanitize inputs
    safe_folder = sanitize_filename(folder)
    safe_filename = sanitize_filename(file.filename)

    # A dot-only folder ('.', '..') is a directory reference, not a folder
    # name; '..' would place the upload outside the vault.
    if safe_folder and not safe_folder.strip('.'):
        raise HTTPException(status_code=400, detail='Invalid folder name')

    # Allowed extensions
    allowed_ext = {'.md', '.txt', '.pdf', '.json', '.yaml', '.yml', '.csv'}
    ext = Path(safe_filename).suffix.lower()

    if ext not in allowed_ext:
        # sorted() keeps the message stable; set order is arbitrary.
        raise HTTPException(
            status_code=400,
            detail=f'File type {ext} not allowed. Allowed: {", ".join(sorted(allowed_ext))}'
        )

    target_dir = VAULT_PATH / safe_folder

    try:
        target_dir.mkdir(parents=True, exist_ok=True)
        target_path = target_dir / safe_filename

        # Handle duplicates by adding a UTC timestamp (the rest of this
        # module uses UTC too), plus a counter if that name is also taken.
        if target_path.exists():
            stem = Path(safe_filename).stem
            timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
            safe_filename = f'{stem}_{timestamp}{ext}'
            target_path = target_dir / safe_filename
            counter = 1
            while target_path.exists():
                safe_filename = f'{stem}_{timestamp}_{counter}{ext}'
                target_path = target_dir / safe_filename
                counter += 1

        content = await file.read()

        # If markdown and tags provided, prepend frontmatter
        if ext == '.md' and tags:
            tag_list = [t.strip() for t in tags.split(',') if t.strip()]
            if tag_list:
                frontmatter = (
                    '---\n'
                    f'tags: [{", ".join(tag_list)}]\n'
                    f'uploaded: {datetime.now(timezone.utc).isoformat()}\n'
                    '---\n\n'
                )
                content = frontmatter.encode('utf-8') + content

        with open(target_path, 'wb') as f:
            f.write(content)
    except (OSError, UnicodeError) as e:
        # logger.exception keeps the traceback in the log.
        logger.exception('Upload failed: %s', e)
        raise HTTPException(status_code=500, detail=f'Upload failed: {e}') from e

    logger.info('File uploaded: %s (%d bytes)', target_path, len(content))

    return UploadResponse(
        success=True,
        file_path=str(target_path.relative_to(VAULT_PATH)),
        filename=safe_filename,
        size_bytes=len(content),
        message=f'File uploaded to {safe_folder}/'
    )
|
|
|
|
|
|
@router.get('/capture/logs')
async def list_logs(limit: int = 10):
    """
    List recent log files.

    Looks at ``*.md`` files in the vault's ``logs/`` directory, ordered by
    path in reverse (for date-named files this puts the latest date first),
    and returns at most ``limit`` of them.

    Args:
        limit: Maximum number of files to return. Values below zero are
            treated as zero.

    Returns:
        A dict with ``logs`` (one dict per file: date, file_path,
        size_bytes, modified) and ``total`` (the number of entries in
        ``logs``). Both are empty/zero when the directory does not exist.
    """
    logs_dir = VAULT_PATH / 'logs'

    if not logs_dir.exists():
        return {'logs': [], 'total': 0}

    # A negative slice bound would drop files from the end of the listing
    # instead of capping its length, so clamp it.
    limit = max(limit, 0)
    log_files = sorted(logs_dir.glob('*.md'), reverse=True)[:limit]

    logs = []
    for lf in log_files:
        stat = lf.stat()
        logs.append({
            'date': lf.stem,
            'file_path': str(lf.relative_to(VAULT_PATH)),
            'size_bytes': stat.st_size,
            'modified': datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(),
        })

    return {'logs': logs, 'total': len(logs)}
|
|
|
|
|
|
@router.get('/capture/log/{date}')
async def get_log(date: str):
    """
    Get contents of a specific log file by date (YYYY-MM-DD).

    Args:
        date: Date string; must be exactly four, two and two ASCII digits
            separated by dashes.

    Returns:
        A dict with ``date``, the vault-relative ``file_path`` and the
        file's text ``content``.

    Raises:
        HTTPException: 400 when ``date`` does not have the expected shape,
            404 when no log file exists for that date.
    """
    # fullmatch rejects a trailing newline, which match() with '$' accepts;
    # re.ASCII restricts \d to the digits 0-9.
    if not re.fullmatch(r'\d{4}-\d{2}-\d{2}', date, flags=re.ASCII):
        raise HTTPException(status_code=400, detail='Invalid date format. Use YYYY-MM-DD')

    log_file = VAULT_PATH / 'logs' / f'{date}.md'

    # Read directly rather than checking exists() first, so a file removed
    # between check and read still produces the 404.
    try:
        content = log_file.read_text(encoding='utf-8')
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f'No log found for {date}') from None

    return {
        'date': date,
        'file_path': str(log_file.relative_to(VAULT_PATH)),
        'content': content
    }
|