""" base_agent.py — Abstract base class for all AI agents. """ from __future__ import annotations import asyncio import json import logging import time import traceback from abc import ABC, abstractmethod from typing import Any, Optional import asyncpg logger = logging.getLogger(__name__) class BaseAgent(ABC): """ All agents inherit from this class. Responsibilities: - Poll agent_jobs table for work - Claim jobs atomically - Execute with exponential-backoff retries - Log results / errors to agent_logs """ agent_type: str # Must be set by subclass def __init__(self, pool: asyncpg.Pool, settings: Any) -> None: self.pool = pool self.settings = settings self._log = logging.getLogger(f'agent.{self.agent_type}') # ------------------------------------------------------------------ # Public interface # ------------------------------------------------------------------ async def run_forever(self, poll_interval: int = 10) -> None: """Poll for jobs indefinitely.""" self._log.info('Agent started (poll_interval=%ds)', poll_interval) while True: try: job = await self._claim_job() if job: await self._execute(job) else: await asyncio.sleep(poll_interval) except asyncio.CancelledError: self._log.info('Agent shutting down') return except Exception as exc: self._log.error('Unexpected error in agent loop: %s', exc, exc_info=True) await asyncio.sleep(poll_interval) async def enqueue(self, payload: dict, priority: int = 5, delay_seconds: int = 0) -> str: """Create a new job for this agent.""" import uuid from datetime import datetime, timezone, timedelta job_id = str(uuid.uuid4()) scheduled = datetime.now(timezone.utc) if delay_seconds: scheduled += timedelta(seconds=delay_seconds) async with self.pool.acquire() as conn: await conn.execute( """ INSERT INTO agent_jobs (id, agent_type, priority, payload, scheduled_for) VALUES ($1::uuid, $2, $3, $4::jsonb, $5) """, job_id, self.agent_type, priority, json.dumps(payload), scheduled, ) return job_id # ------------------------------------------------------------------ # Abstract # ------------------------------------------------------------------ @abstractmethod async def process(self, job_id: str, payload: dict) -> dict: """Process a single job. Return result dict.""" ... # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ async def _claim_job(self) -> Optional[asyncpg.Record]: """Atomically claim the next pending job for this agent type.""" async with self.pool.acquire() as conn: row = await conn.fetchrow( """ UPDATE agent_jobs SET status = 'running', started_at = now() WHERE id = ( SELECT id FROM agent_jobs WHERE agent_type = $1 AND status = 'pending' AND scheduled_for <= now() AND retry_count < max_retries ORDER BY priority ASC, scheduled_for ASC LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING * """, self.agent_type, ) return row async def _execute(self, job: asyncpg.Record) -> None: job_id = str(job['id']) payload = dict(job['payload'] or {}) self._log.info('Processing job %s', job_id) start = time.monotonic() try: result = await self.process(job_id, payload) elapsed = time.monotonic() - start async with self.pool.acquire() as conn: await conn.execute( """ UPDATE agent_jobs SET status = 'done', result = $2::jsonb, completed_at = now() WHERE id = $1::uuid """, job_id, json.dumps(result or {}), ) await self._log_event(job_id, 'info', f'Job done in {elapsed:.2f}s', result or {}) except Exception as exc: elapsed = time.monotonic() - start err_msg = str(exc) self._log.error('Job %s failed: %s', job_id, err_msg, exc_info=True) async with self.pool.acquire() as conn: row = await conn.fetchrow( 'SELECT retry_count, max_retries FROM agent_jobs WHERE id = $1::uuid', job_id ) retries = (row['retry_count'] or 0) + 1 max_retries = row['max_retries'] or 3 if retries < max_retries: # Re-queue with exponential backoff backoff = 2 ** retries await conn.execute( """ UPDATE agent_jobs SET status = 'pending', retry_count = $2, error = $3, scheduled_for = now() + ($4 || ' seconds')::interval WHERE id = $1::uuid """, job_id, retries, err_msg, str(backoff), ) await self._log_event(job_id, 'warning', f'Retry {retries}/{max_retries} in {backoff}s', {}) else: await conn.execute( """ UPDATE agent_jobs SET status = 'failed', error = $2, completed_at = now() WHERE id = $1::uuid """, job_id, err_msg, ) await self._log_event(job_id, 'error', f'Job permanently failed: {err_msg}', {}) async def _log_event( self, job_id: Optional[str], level: str, message: str, metadata: dict, ) -> None: try: async with self.pool.acquire() as conn: await conn.execute( """ INSERT INTO agent_logs (job_id, agent_type, level, message, metadata) VALUES ($1::uuid, $2, $3, $4, $5::jsonb) """, job_id, self.agent_type, level, message, json.dumps(metadata), ) except Exception as log_exc: self._log.warning('Failed to write agent log: %s', log_exc)