# Copyright (c) 2026 PowerOn AG # All rights reserved. """HTTP API for the generic background job service. Endpoints: - GET /api/jobs/{jobId} -> single job status - GET /api/jobs -> list (filter by jobType, instanceId) Access control: a caller may read a job iff they are a member of its mandate (or PlatformAdmin). Jobs without a mandateId (system-wide) are restricted to PlatformAdmin only. """ import logging from typing import Any, Dict, List, Optional from fastapi import APIRouter, Depends, HTTPException, Path, Query, Request from modules.auth import getRequestContext, RequestContext, limiter from modules.serviceCenter.services.serviceBackgroundJobs import ( getJobStatus, listJobs, ) from modules.shared.i18nRegistry import apiRouteContext, resolveJobMessage logger = logging.getLogger(__name__) routeApiMsg = apiRouteContext("routeJobs") router = APIRouter( prefix="/api/jobs", tags=["BackgroundJobs"], responses={404: {"description": "Not found"}}, ) def _serialiseJob(job: Dict[str, Any]) -> Dict[str, Any]: """Strip system audit fields, ensure JSON-safe types, translate progress. Walkers store progress as a structured payload (``progressMessageData = {key, params}``). The frontend never calls ``t()`` on backend-supplied keys (i18n convention #2), so we resolve the payload here using the request-context language and overwrite ``progressMessage`` with the fully rendered string. Older clients keep working because they read the same field. """ out = {k: v for k, v in job.items() if not k.startswith("sys")} translated = resolveJobMessage(out.get("progressMessageData")) if translated: out["progressMessage"] = translated return out def _userHasMandateAccess(context: RequestContext, mandateId: Optional[str]) -> bool: """Return True if the current user can read jobs for the given mandate scope.""" if context is None or context.user is None: return False if context.isPlatformAdmin: return True if mandateId is None: return False from modules.interfaces.interfaceDbApp import getRootInterface from modules.datamodels.datamodelMembership import UserMandate rootIf = getRootInterface() try: memberships = rootIf.db.getRecordset( UserMandate, recordFilter={"userId": context.user.id, "mandateId": mandateId}, ) return bool(memberships) except Exception as ex: logger.warning( "Mandate access check failed for user=%s mandate=%s: %s", context.user.id, mandateId, ex, ) return False @router.get("/{jobId}") @limiter.limit("60/minute") def get_job( request: Request, jobId: str = Path(..., description="Background job ID"), context: RequestContext = Depends(getRequestContext), ) -> Dict[str, Any]: """Return the current state of one background job.""" job = getJobStatus(jobId) if not job: raise HTTPException(status_code=404, detail=routeApiMsg("Job not found")) if not _userHasMandateAccess(context, job.get("mandateId")): raise HTTPException(status_code=403, detail=routeApiMsg("Access denied")) return _serialiseJob(job) @router.get("") @limiter.limit("30/minute") def list_jobs( request: Request, jobType: Optional[str] = Query(None), mandateId: Optional[str] = Query(None), instanceId: Optional[str] = Query(None, description="Feature instance scope"), limit: int = Query(20, ge=1, le=100), context: RequestContext = Depends(getRequestContext), ) -> Dict[str, List[Dict[str, Any]]]: """List recent jobs filtered by scope. Newest first.""" if mandateId is None: if not context or not context.isPlatformAdmin: raise HTTPException( status_code=400, detail=routeApiMsg("mandateId is required (only PlatformAdmin may list system-wide)"), ) elif not _userHasMandateAccess(context, mandateId): raise HTTPException(status_code=403, detail=routeApiMsg("Access denied")) jobs = listJobs( mandateId=mandateId, featureInstanceId=instanceId, jobType=jobType, limit=limit, ) return {"items": [_serialiseJob(j) for j in jobs]}