gateway/modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py
2026-04-20 17:51:09 +02:00

245 lines
8.1 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Background job service.
Generic infrastructure for fire-and-forget async tasks. Any caller (HTTP route,
AI tool, scheduler) can submit work and get a `jobId` back immediately; status
is polled via `GET /api/jobs/{jobId}`.
Usage (registration, once at module load):
from modules.serviceCenter.services.serviceBackgroundJobs import registerJobHandler
async def _myHandler(job, progressCb):
progressCb(10, "starting...")
result = await doExpensiveWork(job["payload"])
return result # stored as job.result
registerJobHandler("myJobType", _myHandler)
Usage (submission):
from modules.serviceCenter.services.serviceBackgroundJobs import startJob
jobId = await startJob("myJobType", {"foo": "bar"}, mandateId=mid, triggeredBy=userId)
return {"jobId": jobId}
Restart semantics: jobs are tracked in DB. If the worker process dies mid-job,
`_recoverInterruptedJobs()` (called at boot) flips RUNNING jobs to ERROR with a
clear message. No silent zombies.
"""
import asyncio
import logging
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable, Dict, List, Optional
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
from modules.shared.dbRegistry import registerDatabase
from modules.datamodels.datamodelBackgroundJob import (
BackgroundJob,
BackgroundJobStatusEnum,
TERMINAL_JOB_STATUSES,
)
logger = logging.getLogger(__name__)
# Database holding the BackgroundJob table; registered with the shared
# registry so other tooling knows this database is in use.
JOBS_DATABASE = APP_CONFIG.get("DB_DATABASE", "poweron_app")
registerDatabase(JOBS_DATABASE)
# Callback handed to job handlers: (progressPercent, optional message).
JobProgressCallback = Callable[[int, Optional[str]], None]
# A handler receives the job row (as a dict) plus a progress callback and
# returns an optional result dict that is stored on the job.
JobHandler = Callable[[Dict[str, Any], JobProgressCallback], Awaitable[Optional[Dict[str, Any]]]]
# Registry mapping jobType -> handler; filled via registerJobHandler().
_JOB_HANDLERS: Dict[str, JobHandler] = {}
def registerJobHandler(jobType: str, handler: JobHandler) -> None:
    """Register a handler for a job type. Idempotent — last registration wins."""
    # Defaulting the lookup to `handler` means "absent" and "already this
    # handler" both skip the re-registration log line.
    existing = _JOB_HANDLERS.get(jobType, handler)
    if existing is not handler:
        logger.info("Re-registering background job handler for type %s", jobType)
    _JOB_HANDLERS[jobType] = handler
def _getDb() -> DatabaseConnector:
    """Build a fresh connector to the jobs database from APP_CONFIG values."""
    connectionKwargs = {
        "dbDatabase": JOBS_DATABASE,
        "dbHost": APP_CONFIG.get("DB_HOST", "localhost"),
        "dbPort": int(APP_CONFIG.get("DB_PORT", "5432")),
        "dbUser": APP_CONFIG.get("DB_USER"),
        "dbPassword": APP_CONFIG.get("DB_PASSWORD_SECRET"),
    }
    return DatabaseConnector(**connectionKwargs)
def _serialiseDatetimes(data: Dict[str, Any]) -> Dict[str, Any]:
"""Return copy of dict with datetime values converted to ISO 8601 strings."""
out: Dict[str, Any] = {}
for k, v in data.items():
if isinstance(v, datetime):
out[k] = v.isoformat()
else:
out[k] = v
return out
# Strong references to in-flight runner tasks. The event loop holds only a
# weak reference to tasks created via asyncio.create_task, so without this
# set a runner task could be garbage-collected before it finishes.
_RUNNING_TASKS: set = set()


async def startJob(
    jobType: str,
    payload: Optional[Dict[str, Any]] = None,
    *,
    mandateId: Optional[str] = None,
    featureInstanceId: Optional[str] = None,
    triggeredBy: Optional[str] = None,
) -> str:
    """Insert a new BackgroundJob, kick off its handler in the background, return jobId.

    Returns immediately; the handler runs via `asyncio.create_task`.

    Args:
        jobType: must have been registered via registerJobHandler().
        payload: arbitrary JSON-serialisable input for the handler.
        mandateId / featureInstanceId / triggeredBy: optional scoping metadata
            stored on the job row.

    Raises:
        ValueError: if no handler is registered for *jobType*.
    """
    if jobType not in _JOB_HANDLERS:
        raise ValueError(f"No handler registered for jobType '{jobType}'")
    job = BackgroundJob(
        jobType=jobType,
        mandateId=mandateId,
        featureInstanceId=featureInstanceId,
        triggeredBy=triggeredBy,
        payload=payload or {},
    )
    db = _getDb()
    record = db.recordCreate(BackgroundJob, _serialiseDatetimes(job.model_dump()))
    jobId = record["id"]
    # Keep a strong reference to the task until it completes (see _RUNNING_TASKS).
    task = asyncio.create_task(_runJob(jobId))
    _RUNNING_TASKS.add(task)
    task.add_done_callback(_RUNNING_TASKS.discard)
    logger.info(
        "BackgroundJob %s submitted: type=%s mandate=%s instance=%s by=%s",
        jobId, jobType, mandateId, featureInstanceId, triggeredBy,
    )
    return jobId
def _loadJob(jobId: str) -> Optional[Dict[str, Any]]:
    """Fetch one job row by id; None when no row matches."""
    matches = _getDb().getRecordset(BackgroundJob, recordFilter={"id": jobId})
    if not matches:
        return None
    return dict(matches[0])
def _updateJob(jobId: str, fields: Dict[str, Any]) -> None:
    """Persist a partial update; datetime values are serialised to ISO strings."""
    serialised = _serialiseDatetimes(fields)
    _getDb().recordModify(BackgroundJob, jobId, serialised)
def _markStarted(jobId: str) -> None:
    """Transition the job to RUNNING and stamp its start time."""
    startFields = {
        "status": BackgroundJobStatusEnum.RUNNING.value,
        "startedAt": datetime.now(timezone.utc),
    }
    _updateJob(jobId, startFields)
def _markSuccess(jobId: str, result: Optional[Dict[str, Any]]) -> None:
    """Record a successful completion: result payload, 100% progress, finish time."""
    completion: Dict[str, Any] = {
        "status": BackgroundJobStatusEnum.SUCCESS.value,
        "result": result if result else {},
        "progress": 100,
        "finishedAt": datetime.now(timezone.utc),
    }
    _updateJob(jobId, completion)
def _markError(jobId: str, errorMessage: str) -> None:
    """Record a failure; the message is capped at 1000 chars before storage."""
    message = errorMessage if errorMessage else ""
    _updateJob(jobId, {
        "status": BackgroundJobStatusEnum.ERROR.value,
        "errorMessage": message[:1000],
        "finishedAt": datetime.now(timezone.utc),
    })
def _makeProgressCallback(jobId: str) -> JobProgressCallback:
    """Build a callback that persists progress updates for *jobId*.

    Persistence failures are logged and swallowed so a flaky DB write can
    never crash the handler mid-run.
    """
    def _onProgress(progress: int, message: Optional[str] = None) -> None:
        try:
            update: Dict[str, Any] = {"progress": min(100, max(0, int(progress)))}
            if message is not None:
                update["progressMessage"] = message[:500]
            _updateJob(jobId, update)
        except Exception as err:
            logger.warning("Progress update failed for job %s: %s", jobId, err)
    return _onProgress
async def _runJob(jobId: str) -> None:
    """Runner coroutine: load the job row, dispatch to its handler, persist the outcome."""
    job = _loadJob(jobId)
    if not job:
        logger.error("BackgroundJob %s vanished before runner started", jobId)
        return
    jobType = job["jobType"]
    handler = _JOB_HANDLERS.get(jobType)
    if not handler:
        msg = f"No handler registered for jobType '{jobType}'"
        logger.error("BackgroundJob %s: %s", jobId, msg)
        _markError(jobId, msg)
        return
    _markStarted(jobId)
    try:
        # Any exception below — including a failure while persisting success —
        # lands the job in ERROR with the exception text.
        outcome = await handler(job, _makeProgressCallback(jobId))
        _markSuccess(jobId, outcome if isinstance(outcome, dict) else None)
        logger.info("BackgroundJob %s (%s) completed successfully", jobId, jobType)
    except Exception as err:
        logger.exception("BackgroundJob %s (%s) failed", jobId, jobType)
        _markError(jobId, str(err))
def getJobStatus(jobId: str) -> Optional[Dict[str, Any]]:
    """Load current job state. Returns None if not found."""
    # Thin public wrapper so route/tool code never touches the private loader.
    return _loadJob(jobId)
def listJobs(
    *,
    mandateId: Optional[str] = None,
    featureInstanceId: Optional[str] = None,
    jobType: Optional[str] = None,
    limit: int = 20,
) -> List[Dict[str, Any]]:
    """List recent jobs filtered by scope. Newest first.

    Keyword Args:
        mandateId / featureInstanceId / jobType: optional equality filters;
            None means "do not filter on this field".
        limit: maximum number of rows returned; values below 0 are treated as 0
            (a negative slice would silently drop the wrong rows).
    """
    # Only the filters the caller actually supplied.
    criteria = [
        (field, wanted)
        for field, wanted in (
            ("mandateId", mandateId),
            ("featureInstanceId", featureInstanceId),
            ("jobType", jobType),
        )
        if wanted is not None
    ]
    db = _getDb()
    # Single pass instead of rebuilding the list once per filter.
    out = []
    for row in db.getRecordset(BackgroundJob):
        record = dict(row)
        if all(record.get(field) == wanted for field, wanted in criteria):
            out.append(record)
    # createdAt is serialised to an ISO 8601 string on write, so lexicographic
    # sort is chronological; None/missing sorts oldest.
    out.sort(key=lambda r: r.get("createdAt") or "", reverse=True)
    return out[:max(0, limit)]
def isTerminalStatus(status: str) -> bool:
    """True if the given status is one of {SUCCESS, ERROR, CANCELLED}."""
    return any(status == terminal.value for terminal in TERMINAL_JOB_STATUSES)
def recoverInterruptedJobs() -> int:
    """Flip any RUNNING jobs to ERROR (called at worker boot).

    A RUNNING row surviving a process restart means the previous worker died
    mid-execution; its asyncio task is gone and the job would never finish on
    its own. Returns the number of jobs recovered.
    """
    db = _getDb()
    try:
        stuck = db.getRecordset(
            BackgroundJob,
            recordFilter={"status": BackgroundJobStatusEnum.RUNNING.value},
        )
    except Exception as ex:
        logger.warning("recoverInterruptedJobs: failed to scan RUNNING jobs: %s", ex)
        return 0
    recovered = 0
    for stuckJob in stuck:
        try:
            _markError(stuckJob["id"], "Interrupted by worker restart")
        except Exception as ex:
            logger.warning("recoverInterruptedJobs: could not mark %s as ERROR: %s", stuckJob.get("id"), ex)
        else:
            recovered += 1
    if recovered:
        logger.warning("Recovered %d interrupted background job(s) after restart", recovered)
    return recovered