diff --git a/app.py b/app.py index cb9377af..b59c1a25 100644 --- a/app.py +++ b/app.py @@ -380,6 +380,15 @@ async def lifespan(app: FastAPI): from modules.shared.auditLogger import registerAuditLogCleanupScheduler registerAuditLogCleanupScheduler() + # Recover background jobs that were RUNNING when the previous worker died + try: + from modules.serviceCenter.services.serviceBackgroundJobs.mainBackgroundJobService import ( + recoverInterruptedJobs, + ) + recoverInterruptedJobs() + except Exception as e: + logger.warning(f"BackgroundJob recovery failed (non-critical): {e}") + yield # --- Stop Managers --- @@ -627,6 +636,9 @@ app.include_router(billingRouter) from modules.routes.routeSubscription import router as subscriptionRouter app.include_router(subscriptionRouter) +from modules.routes.routeJobs import router as jobsRouter +app.include_router(jobsRouter) + # ============================================================================ # SYSTEM ROUTES (Navigation, etc.) # ============================================================================ diff --git a/modules/datamodels/datamodelBackgroundJob.py b/modules/datamodels/datamodelBackgroundJob.py new file mode 100644 index 00000000..45a26b2c --- /dev/null +++ b/modules/datamodels/datamodelBackgroundJob.py @@ -0,0 +1,130 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Background job models: generic, reusable infrastructure for long-running tasks. + +A `BackgroundJob` record tracks the lifecycle of one async task that must not block +the calling HTTP request. Any caller (HTTP route, AI tool, scheduled task) can: + +1. Register a handler once via `registerJobHandler(jobType, handler)`. +2. Submit work via `startJob(jobType, payload, ...)` which returns a `jobId` + immediately and runs the handler in the background. +3. Poll `getJobStatus(jobId)` (HTTP `GET /api/jobs/{jobId}`) until `status` is + one of {SUCCESS, ERROR, CANCELLED}. + +See `modules.serviceCenter.services.serviceBackgroundJobs.mainBackgroundJobService`. +""" + +from typing import Any, Dict, Optional +from enum import Enum +from datetime import datetime, timezone +import uuid + +from pydantic import Field + +from modules.datamodels.datamodelBase import PowerOnModel +from modules.shared.i18nRegistry import i18nModel + + +class BackgroundJobStatusEnum(str, Enum): + """Lifecycle status of a background job.""" + PENDING = "PENDING" + RUNNING = "RUNNING" + SUCCESS = "SUCCESS" + ERROR = "ERROR" + CANCELLED = "CANCELLED" + + +TERMINAL_JOB_STATUSES = { + BackgroundJobStatusEnum.SUCCESS, + BackgroundJobStatusEnum.ERROR, + BackgroundJobStatusEnum.CANCELLED, +} + + +@i18nModel("Hintergrund-Job") +class BackgroundJob(PowerOnModel): + """Generic record describing a long-running asynchronous task. + + Scope: the combination of `mandateId` and optionally `featureInstanceId` + is used for access control on `GET /api/jobs/{jobId}`. + """ + + id: str = Field( + default_factory=lambda: str(uuid.uuid4()), + description="Primary key", + json_schema_extra={"label": "ID"}, + ) + jobType: str = Field( + ..., + description="Handler key registered via registerJobHandler() (e.g. 'trusteeAccountingSync')", + json_schema_extra={"label": "Typ"}, + ) + mandateId: Optional[str] = Field( + None, + description="Mandate scope (used for access checks). None for system-wide jobs.", + json_schema_extra={ + "label": "Mandanten-ID", + "fk_target": {"db": "poweron_app", "table": "Mandate"}, + }, + ) + featureInstanceId: Optional[str] = Field( + None, + description="Feature instance scope (optional)", + json_schema_extra={ + "label": "Feature-Instanz", + "fk_target": {"db": "poweron_app", "table": "FeatureInstance"}, + }, + ) + triggeredBy: Optional[str] = Field( + None, + description="UserId or 'ai-tool:' / 'scheduler:'", + json_schema_extra={"label": "Ausgeloest von"}, + ) + + status: str = Field( + default=BackgroundJobStatusEnum.PENDING.value, + description="Current lifecycle status", + json_schema_extra={"label": "Status"}, + ) + progress: int = Field( + default=0, + description="Progress 0..100 (best-effort; may stay 0 for handlers that cannot estimate)", + json_schema_extra={"label": "Fortschritt"}, + ) + progressMessage: Optional[str] = Field( + None, + description="Human-readable current step (e.g. 'Importing journal entries...')", + json_schema_extra={"label": "Fortschritts-Nachricht"}, + ) + + payload: Dict[str, Any] = Field( + default_factory=dict, + description="Job input parameters (JSON)", + json_schema_extra={"label": "Eingabe"}, + ) + result: Optional[Dict[str, Any]] = Field( + None, + description="Handler return value on success (JSON)", + json_schema_extra={"label": "Ergebnis"}, + ) + errorMessage: Optional[str] = Field( + None, + description="Truncated error message on failure (full stack trace in logs)", + json_schema_extra={"label": "Fehler"}, + ) + + createdAt: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc), + description="When the job was submitted", + json_schema_extra={"label": "Eingereicht"}, + ) + startedAt: Optional[datetime] = Field( + None, + description="When the handler began running", + json_schema_extra={"label": "Gestartet"}, + ) + finishedAt: Optional[datetime] = Field( + None, + description="When the handler reached a terminal status", + json_schema_extra={"label": "Beendet"}, + ) diff --git a/modules/features/trustee/accounting/accountingConnectorBase.py b/modules/features/trustee/accounting/accountingConnectorBase.py index 355b6f34..c5124184 100644 --- a/modules/features/trustee/accounting/accountingConnectorBase.py +++ b/modules/features/trustee/accounting/accountingConnectorBase.py @@ -56,6 +56,7 @@ class ConnectorConfigField(BaseModel): secret: bool = False required: bool = True placeholder: Optional[str] = None + suggestions: Optional[List[str]] = None class BaseAccountingConnector(ABC): diff --git a/modules/features/trustee/accounting/accountingDataSync.py b/modules/features/trustee/accounting/accountingDataSync.py index e0584a02..e422566f 100644 --- a/modules/features/trustee/accounting/accountingDataSync.py +++ b/modules/features/trustee/accounting/accountingDataSync.py @@ -215,17 +215,34 @@ class AccountingDataSync: logger.error(f"Compute balances failed: {e}") summary["errors"].append(f"Balances: {e}") - # Update config with last import timestamp - try: - cfgId = cfgRecord.get("id") - if cfgId: - self._if.db.recordModify(TrusteeAccountingConfig, cfgId, { - "lastSyncAt": time.time(), - "lastSyncStatus": "success" if not summary["errors"] else "partial", - "lastSyncErrorMessage": "; ".join(summary["errors"])[:500] if summary["errors"] else None, - }) - except Exception: - pass + cfgId = cfgRecord.get("id") + if cfgId: + corePayload = { + "lastSyncAt": time.time(), + "lastSyncStatus": "success" if not summary["errors"] else "partial", + "lastSyncErrorMessage": "; ".join(summary["errors"])[:500] if summary["errors"] else None, + } + try: + self._if.db.recordModify(TrusteeAccountingConfig, cfgId, corePayload) + except Exception as coreErr: + logger.exception(f"AccountingDataSync: failed to write core lastSync* fields for cfg {cfgId}: {coreErr}") + summary["errors"].append(f"Persist lastSync core: {coreErr}") + extPayload = { + "lastSyncDateFrom": dateFrom, + "lastSyncDateTo": dateTo, + "lastSyncCounts": { + "accounts": int(summary.get("accounts", 0)), + "journalEntries": int(summary.get("journalEntries", 0)), + "journalLines": int(summary.get("journalLines", 0)), + "contacts": int(summary.get("contacts", 0)), + "accountBalances": int(summary.get("accountBalances", 0)), + }, + } + try: + self._if.db.recordModify(TrusteeAccountingConfig, cfgId, extPayload) + except Exception as extErr: + logger.exception(f"AccountingDataSync: failed to write extended lastSync* fields for cfg {cfgId}: {extErr}") + summary["errors"].append(f"Persist lastSync ext: {extErr}") summary["finishedAt"] = time.time() summary["durationSeconds"] = round(summary["finishedAt"] - summary["startedAt"], 1) diff --git a/modules/features/trustee/accounting/connectors/accountingConnectorRma.py b/modules/features/trustee/accounting/connectors/accountingConnectorRma.py index bcf52561..79a61d77 100644 --- a/modules/features/trustee/accounting/connectors/accountingConnectorRma.py +++ b/modules/features/trustee/accounting/connectors/accountingConnectorRma.py @@ -47,6 +47,10 @@ class AccountingConnectorRma(BaseAccountingConnector): fieldType="text", secret=False, placeholder="https://service.runmyaccounts.com/api/latest/clients/", + suggestions=[ + "https://service.runmyaccounts.com/api/latest/clients/", + "https://service.int.runmyaccounts.com/api/latest/clients/", + ], ), ConnectorConfigField( key="clientName", diff --git a/modules/features/trustee/datamodelFeatureTrustee.py b/modules/features/trustee/datamodelFeatureTrustee.py index 5d1b4263..fcf5c8b4 100644 --- a/modules/features/trustee/datamodelFeatureTrustee.py +++ b/modules/features/trustee/datamodelFeatureTrustee.py @@ -3,7 +3,7 @@ """Trustee models: TrusteeOrganisation, TrusteeRole, TrusteeAccess, TrusteeContract, TrusteeDocument, TrusteePosition.""" from enum import Enum -from typing import Optional +from typing import Optional, Dict from pydantic import BaseModel, Field from modules.datamodels.datamodelBase import PowerOnModel @@ -818,6 +818,9 @@ class TrusteeAccountingConfig(PowerOnModel): lastSyncAt: Optional[float] = Field(default=None, description="Timestamp of last sync attempt", json_schema_extra={"label": "Letzte Synchronisation"}) lastSyncStatus: Optional[str] = Field(default=None, description="Last sync result: success, error, partial", json_schema_extra={"label": "Status"}) lastSyncErrorMessage: Optional[str] = Field(default=None, description="Error message when lastSyncStatus is error", json_schema_extra={"label": "Fehlermeldung"}) + lastSyncDateFrom: Optional[str] = Field(default=None, description="dateFrom (ISO date) of the last data import window", json_schema_extra={"label": "Letztes Import-Fenster von"}) + lastSyncDateTo: Optional[str] = Field(default=None, description="dateTo (ISO date) of the last data import window", json_schema_extra={"label": "Letztes Import-Fenster bis"}) + lastSyncCounts: Optional[Dict[str, int]] = Field(default=None, description="Per-entity counts of the last import (accounts, journalEntries, journalLines, contacts, accountBalances)", json_schema_extra={"label": "Letzte Import-Zaehler"}) cachedChartOfAccounts: Optional[str] = Field(default=None, description="JSON-serialised chart of accounts cache (list of {accountNumber, label, accountType})", json_schema_extra={"label": "Cached Kontoplan"}) chartCachedAt: Optional[float] = Field(default=None, description="Timestamp when cachedChartOfAccounts was last refreshed", json_schema_extra={"label": "Kontoplan-Cache-Zeitpunkt"}) mandateId: Optional[str] = Field(default=None, json_schema_extra={"label": "Mandat", "fk_target": {"db": "poweron_app", "table": "Mandate"}}) diff --git a/modules/features/trustee/routeFeatureTrustee.py b/modules/features/trustee/routeFeatureTrustee.py index 73752788..7b80189e 100644 --- a/modules/features/trustee/routeFeatureTrustee.py +++ b/modules/features/trustee/routeFeatureTrustee.py @@ -1643,7 +1643,46 @@ def get_position_sync_status( # ===== Accounting Data Import ===== -@router.post("/{instanceId}/accounting/import-data") +TRUSTEE_ACCOUNTING_SYNC_JOB_TYPE = "trusteeAccountingSync" + + +async def _trusteeAccountingSyncJobHandler(job: Dict[str, Any], progressCb) -> Dict[str, Any]: + """BackgroundJob handler: imports accounting data from the external system. + + Reads inputs from `job["payload"]` (dateFrom, dateTo, userId) and runs + `AccountingDataSync.importData(...)` in the worker's event loop without + blocking the original HTTP request that submitted the job. + """ + from modules.security.rootAccess import getRootUser + from .accounting.accountingDataSync import AccountingDataSync + + instanceId = job["featureInstanceId"] + mandateId = job["mandateId"] + payload = job.get("payload") or {} + rootUser = getRootUser() + + progressCb(5, "Initialisiere Import...") + interface = getInterface(rootUser, mandateId=mandateId, featureInstanceId=instanceId) + sync = AccountingDataSync(interface) + progressCb(10, "Lese Daten vom Buchhaltungssystem...") + result = await sync.importData( + featureInstanceId=instanceId, + mandateId=mandateId, + dateFrom=payload.get("dateFrom"), + dateTo=payload.get("dateTo"), + ) + progressCb(100, "Import abgeschlossen") + return result + + +try: + from modules.serviceCenter.services.serviceBackgroundJobs import registerJobHandler + registerJobHandler(TRUSTEE_ACCOUNTING_SYNC_JOB_TYPE, _trusteeAccountingSyncJobHandler) +except Exception as _regErr: + logger.warning("Failed to register trusteeAccountingSync job handler: %s", _regErr) + + +@router.post("/{instanceId}/accounting/import-data", status_code=status.HTTP_202_ACCEPTED) @limiter.limit("3/minute") async def import_accounting_data( request: Request, @@ -1651,20 +1690,26 @@ async def import_accounting_data( data: Dict[str, Any] = Body(default={}), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: - """Import accounting data (chart, journal entries, contacts) from the external system into TrusteeData* tables.""" + """Submit a background job to import accounting data. + + Returns immediately with `{ jobId }`; clients poll `GET /api/jobs/{jobId}` + until status is SUCCESS / ERROR. + """ + from modules.serviceCenter.services.serviceBackgroundJobs import startJob + mandateId = _validateInstanceAccess(instanceId, context) - interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId) - from .accounting.accountingDataSync import AccountingDataSync - sync = AccountingDataSync(interface) - dateFrom = data.get("dateFrom") - dateTo = data.get("dateTo") - result = await sync.importData( - featureInstanceId=instanceId, + payload = { + "dateFrom": data.get("dateFrom"), + "dateTo": data.get("dateTo"), + } + jobId = await startJob( + TRUSTEE_ACCOUNTING_SYNC_JOB_TYPE, + payload, mandateId=mandateId, - dateFrom=dateFrom, - dateTo=dateTo, + featureInstanceId=instanceId, + triggeredBy=context.user.id if context.user else None, ) - return result + return {"jobId": jobId, "status": "pending"} @router.get("/{instanceId}/accounting/import-status") @@ -1695,6 +1740,9 @@ def get_import_status( counts["lastSyncAt"] = cfg.get("lastSyncAt") counts["lastSyncStatus"] = cfg.get("lastSyncStatus") counts["lastSyncErrorMessage"] = cfg.get("lastSyncErrorMessage") + counts["lastSyncDateFrom"] = cfg.get("lastSyncDateFrom") + counts["lastSyncDateTo"] = cfg.get("lastSyncDateTo") + counts["lastSyncCounts"] = cfg.get("lastSyncCounts") return counts diff --git a/modules/interfaces/interfaceDbApp.py b/modules/interfaces/interfaceDbApp.py index 2a88fecc..c754684f 100644 --- a/modules/interfaces/interfaceDbApp.py +++ b/modules/interfaces/interfaceDbApp.py @@ -19,6 +19,7 @@ from modules.connectors.connectorDbPostgre import DatabaseConnector, _get_cached from modules.shared.configuration import APP_CONFIG from modules.shared.dbRegistry import registerDatabase from modules.shared.timeUtils import getUtcTimestamp, parseTimestamp +from modules.shared.i18nRegistry import resolveText from modules.interfaces.interfaceRbac import getRecordsetWithRBAC from modules.security.rbac import RbacClass from modules.datamodels.datamodelUam import ( @@ -1639,7 +1640,7 @@ class AppObjects: if not featureDef.get("autoCreateInstance", False): continue featureCode = featureDef.get("code", featureName) - featureLabel = featureDef.get("label", {}).get("en", featureName) + featureLabel = resolveText(featureDef.get("label", featureName)) instance = featureInterface.createFeatureInstance( featureCode=featureCode, mandateId=mandateId, diff --git a/modules/routes/routeBilling.py b/modules/routes/routeBilling.py index 9b238df1..b0967259 100644 --- a/modules/routes/routeBilling.py +++ b/modules/routes/routeBilling.py @@ -935,11 +935,34 @@ async def stripeWebhook( return {"received": True} session_dict = session.to_dict_recursive() if hasattr(session, "to_dict_recursive") else dict(session) - result = _creditStripeSessionIfNeeded(billingInterface, session_dict, eventId=event_id) - logger.info( - f"Stripe webhook processed session {result.sessionId}: " - f"credited={result.credited}, alreadyCredited={result.alreadyCredited}" - ) + try: + result = _creditStripeSessionIfNeeded(billingInterface, session_dict, eventId=event_id) + logger.info( + f"Stripe webhook processed session {result.sessionId}: " + f"credited={result.credited}, alreadyCredited={result.alreadyCredited}" + ) + except HTTPException as he: + logger.error( + "Stripe webhook %s for session %s failed: status=%s detail=%s metadata=%s amount_total=%s", + event_id, + session_dict.get("id"), + he.status_code, + he.detail, + session_dict.get("metadata"), + session_dict.get("amount_total"), + ) + if 400 <= he.status_code < 500 and event_id: + if not billingInterface.getStripeWebhookEventByEventId(event_id): + try: + billingInterface.createStripeWebhookEvent(event_id) + logger.warning( + "Marked Stripe event %s as processed (permanent 4xx) to stop retries", + event_id, + ) + except Exception as markEx: + logger.error("Failed to mark Stripe event %s as processed: %s", event_id, markEx) + return {"received": True} + raise return {"received": True} @@ -1036,8 +1059,22 @@ def _handleSubscriptionCheckoutCompleted(session, eventId: str) -> None: operative = subInterface.getOperativeForMandate(mandateId) hasActivePredecessor = operative is not None and operative["id"] != subscriptionRecordId + predecessorIsTrial = ( + hasActivePredecessor + and operative.get("status") == SubscriptionStatusEnum.TRIALING.value + ) - if hasActivePredecessor: + if hasActivePredecessor and predecessorIsTrial: + try: + subInterface.forceExpire(operative["id"]) + logger.info( + "Trial subscription %s expired immediately for mandate %s due to paid upgrade %s", + operative["id"], mandateId, subscriptionRecordId, + ) + except Exception as e: + logger.error("Failed to expire trial predecessor %s: %s", operative["id"], e) + toStatus = SubscriptionStatusEnum.ACTIVE + elif hasActivePredecessor: toStatus = SubscriptionStatusEnum.SCHEDULED if operative.get("recurring", True): operativeStripeId = operative.get("stripeSubscriptionId") diff --git a/modules/routes/routeDataFiles.py b/modules/routes/routeDataFiles.py index f5b6f3d4..82cf1624 100644 --- a/modules/routes/routeDataFiles.py +++ b/modules/routes/routeDataFiles.py @@ -23,6 +23,55 @@ routeApiMsg = apiRouteContext("routeDataFiles") logger = logging.getLogger(__name__) +def _resolveFileWithScope(currentUser: User, context: RequestContext, fileId: str): + """Returns (managementInterface, fileItem) with RBAC scoped to the file's own mandate/instance. + + Files generated by workflows (e.g. AI report outputs) carry their own + mandateId/featureInstanceId. Direct download links via cannot send + custom scope headers, so we resolve the scope from the FileItem itself and + re-check RBAC in that scope. + + Returns (None, None) if the file does not exist or the user lacks access + in the file's actual scope. + """ + requestMandateId = str(context.mandateId) if context.mandateId else None + requestInstanceId = str(context.featureInstanceId) if context.featureInstanceId else None + + mgmt = interfaceDbManagement.getInterface( + currentUser, + mandateId=requestMandateId, + featureInstanceId=requestInstanceId, + ) + fileItem = mgmt.getFile(fileId) + if fileItem: + return mgmt, fileItem + + metas = mgmt.db.getRecordset(FileItem, recordFilter={"id": fileId}) + if not metas: + return None, None + + meta = metas[0] + fileMandateId = meta.get("mandateId") or None + fileInstanceId = meta.get("featureInstanceId") or None + + if not fileMandateId and not fileInstanceId: + return None, None + + if fileMandateId == requestMandateId and fileInstanceId == requestInstanceId: + return None, None + + scopedMgmt = interfaceDbManagement.getInterface( + currentUser, + mandateId=fileMandateId, + featureInstanceId=fileInstanceId, + ) + fileItem = scopedMgmt.getFile(fileId) + if not fileItem: + return None, None + + return scopedMgmt, fileItem + + async def _autoIndexFile(fileId: str, fileName: str, mimeType: str, user): """Background task: pre-scan + extraction + knowledge indexing. Step 1: Structure Pre-Scan (AI-free) -> FileContentIndex (persisted) @@ -975,20 +1024,18 @@ def updateFileNeutralize( def get_file( request: Request, fileId: str = Path(..., description="ID of the file"), - currentUser: User = Depends(getCurrentUser) + currentUser: User = Depends(getCurrentUser), + context: RequestContext = Depends(getRequestContext) ) -> FileItem: - """Get a file""" + """Get a file. Resolves the file's mandate/instance scope automatically.""" try: - managementInterface = interfaceDbManagement.getInterface(currentUser) - - # Get file via LucyDOM interface from the database - fileData = managementInterface.getFile(fileId) + _mgmt, fileData = _resolveFileWithScope(currentUser, context, fileId) if not fileData: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"File with ID {fileId} not found" ) - + return fileData except interfaceDbManagement.FileNotFoundError as e: @@ -1107,23 +1154,17 @@ def download_file( currentUser: User = Depends(getCurrentUser), context: RequestContext = Depends(getRequestContext) ) -> Response: - """Download a file. Uses mandate/instance context when present (e.g. from feature pages).""" + """Download a file. Resolves the file's mandate/instance scope automatically, + so direct links work even when X-Mandate-Id / X-Instance-Id headers + are not sent by the browser.""" try: - managementInterface = interfaceDbManagement.getInterface( - currentUser, - mandateId=str(context.mandateId) if context.mandateId else None, - featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None - ) - - # Get file data - fileData = managementInterface.getFile(fileId) + managementInterface, fileData = _resolveFileWithScope(currentUser, context, fileId) if not fileData: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"File with ID {fileId} not found" ) - - # Get file content + fileContent = managementInterface.getFileData(fileId) if not fileContent: raise HTTPException( @@ -1160,15 +1201,15 @@ def preview_file( currentUser: User = Depends(getCurrentUser), context: RequestContext = Depends(getRequestContext) ) -> FilePreview: - """Preview a file's content. Uses mandate/instance context when present.""" + """Preview a file's content. Resolves the file's mandate/instance scope automatically.""" try: - managementInterface = interfaceDbManagement.getInterface( - currentUser, - mandateId=str(context.mandateId) if context.mandateId else None, - featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None - ) - - # Get file preview using the correct method + managementInterface, fileMeta = _resolveFileWithScope(currentUser, context, fileId) + if not fileMeta: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"File with ID {fileId} not found" + ) + preview = managementInterface.getFileContent(fileId) if not preview: raise HTTPException( diff --git a/modules/routes/routeJobs.py b/modules/routes/routeJobs.py new file mode 100644 index 00000000..d2124a0b --- /dev/null +++ b/modules/routes/routeJobs.py @@ -0,0 +1,107 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""HTTP API for the generic background job service. + +Endpoints: +- GET /api/jobs/{jobId} -> single job status +- GET /api/jobs -> list (filter by jobType, instanceId) + +Access control: a caller may read a job iff they are a member of its mandate +(or PlatformAdmin). Jobs without a mandateId (system-wide) are restricted to +PlatformAdmin only. +""" + +import logging +from typing import Any, Dict, List, Optional + +from fastapi import APIRouter, Depends, HTTPException, Path, Query, Request + +from modules.auth import getRequestContext, RequestContext, limiter +from modules.serviceCenter.services.serviceBackgroundJobs import ( + getJobStatus, + listJobs, +) +from modules.shared.i18nRegistry import apiRouteContext + +logger = logging.getLogger(__name__) +routeApiMsg = apiRouteContext("routeJobs") + +router = APIRouter( + prefix="/api/jobs", + tags=["BackgroundJobs"], + responses={404: {"description": "Not found"}}, +) + + +def _serialiseJob(job: Dict[str, Any]) -> Dict[str, Any]: + """Strip system audit fields and ensure JSON-safe types.""" + return {k: v for k, v in job.items() if not k.startswith("sys")} + + +def _userHasMandateAccess(context: RequestContext, mandateId: Optional[str]) -> bool: + """Return True if the current user can read jobs for the given mandate scope.""" + if context is None or context.user is None: + return False + if context.isPlatformAdmin: + return True + if mandateId is None: + return False + from modules.interfaces.interfaceDbApp import getRootInterface + from modules.datamodels.datamodelMembership import UserMandate + rootIf = getRootInterface() + try: + memberships = rootIf.db.getRecordset( + UserMandate, + recordFilter={"userId": context.user.id, "mandateId": mandateId}, + ) + return bool(memberships) + except Exception as ex: + logger.warning( + "Mandate access check failed for user=%s mandate=%s: %s", + context.user.id, mandateId, ex, + ) + return False + + +@router.get("/{jobId}") +@limiter.limit("60/minute") +def get_job( + request: Request, + jobId: str = Path(..., description="Background job ID"), + context: RequestContext = Depends(getRequestContext), +) -> Dict[str, Any]: + """Return the current state of one background job.""" + job = getJobStatus(jobId) + if not job: + raise HTTPException(status_code=404, detail=routeApiMsg("Job not found")) + if not _userHasMandateAccess(context, job.get("mandateId")): + raise HTTPException(status_code=403, detail=routeApiMsg("Access denied")) + return _serialiseJob(job) + + +@router.get("") +@limiter.limit("30/minute") +def list_jobs( + request: Request, + jobType: Optional[str] = Query(None), + mandateId: Optional[str] = Query(None), + instanceId: Optional[str] = Query(None, description="Feature instance scope"), + limit: int = Query(20, ge=1, le=100), + context: RequestContext = Depends(getRequestContext), +) -> Dict[str, List[Dict[str, Any]]]: + """List recent jobs filtered by scope. Newest first.""" + if mandateId is None: + if not context or not context.isPlatformAdmin: + raise HTTPException( + status_code=400, + detail=routeApiMsg("mandateId is required (only PlatformAdmin may list system-wide)"), + ) + elif not _userHasMandateAccess(context, mandateId): + raise HTTPException(status_code=403, detail=routeApiMsg("Access denied")) + jobs = listJobs( + mandateId=mandateId, + featureInstanceId=instanceId, + jobType=jobType, + limit=limit, + ) + return {"items": [_serialiseJob(j) for j in jobs]} diff --git a/modules/serviceCenter/services/serviceBackgroundJobs/__init__.py b/modules/serviceCenter/services/serviceBackgroundJobs/__init__.py new file mode 100644 index 00000000..e9d4c94c --- /dev/null +++ b/modules/serviceCenter/services/serviceBackgroundJobs/__init__.py @@ -0,0 +1,19 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Background job service: generic, reusable infrastructure for long-running tasks.""" + +from .mainBackgroundJobService import ( + registerJobHandler, + startJob, + getJobStatus, + listJobs, + JobProgressCallback, +) + +__all__ = [ + "registerJobHandler", + "startJob", + "getJobStatus", + "listJobs", + "JobProgressCallback", +] diff --git a/modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py b/modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py new file mode 100644 index 00000000..37830fd1 --- /dev/null +++ b/modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py @@ -0,0 +1,245 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Background job service. + +Generic infrastructure for fire-and-forget async tasks. Any caller (HTTP route, +AI tool, scheduler) can submit work and get a `jobId` back immediately; status +is polled via `GET /api/jobs/{jobId}`. + +Usage (registration, once at module load): + + from modules.serviceCenter.services.serviceBackgroundJobs import registerJobHandler + + async def _myHandler(job, progressCb): + progressCb(10, "starting...") + result = await doExpensiveWork(job["payload"]) + return result # stored as job.result + + registerJobHandler("myJobType", _myHandler) + +Usage (submission): + + from modules.serviceCenter.services.serviceBackgroundJobs import startJob + jobId = await startJob("myJobType", {"foo": "bar"}, mandateId=mid, triggeredBy=userId) + return {"jobId": jobId} + +Restart semantics: jobs are tracked in DB. If the worker process dies mid-job, +`_recoverInterruptedJobs()` (called at boot) flips RUNNING jobs to ERROR with a +clear message. No silent zombies. +""" + +import asyncio +import logging +from datetime import datetime, timezone +from typing import Any, Awaitable, Callable, Dict, List, Optional + +from modules.connectors.connectorDbPostgre import DatabaseConnector +from modules.shared.configuration import APP_CONFIG +from modules.shared.dbRegistry import registerDatabase +from modules.datamodels.datamodelBackgroundJob import ( + BackgroundJob, + BackgroundJobStatusEnum, + TERMINAL_JOB_STATUSES, +) + +logger = logging.getLogger(__name__) + + +JOBS_DATABASE = APP_CONFIG.get("DB_DATABASE", "poweron_app") +registerDatabase(JOBS_DATABASE) + + +JobProgressCallback = Callable[[int, Optional[str]], None] +JobHandler = Callable[[Dict[str, Any], JobProgressCallback], Awaitable[Optional[Dict[str, Any]]]] + + +_JOB_HANDLERS: Dict[str, JobHandler] = {} + + +def registerJobHandler(jobType: str, handler: JobHandler) -> None: + """Register a handler for a job type. Idempotent — last registration wins.""" + if jobType in _JOB_HANDLERS and _JOB_HANDLERS[jobType] is not handler: + logger.info("Re-registering background job handler for type %s", jobType) + _JOB_HANDLERS[jobType] = handler + + +def _getDb() -> DatabaseConnector: + return DatabaseConnector( + dbDatabase=JOBS_DATABASE, + dbHost=APP_CONFIG.get("DB_HOST", "localhost"), + dbPort=int(APP_CONFIG.get("DB_PORT", "5432")), + dbUser=APP_CONFIG.get("DB_USER"), + dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET"), + ) + + +def _serialiseDatetimes(data: Dict[str, Any]) -> Dict[str, Any]: + """Return copy of dict with datetime values converted to ISO 8601 strings.""" + out: Dict[str, Any] = {} + for k, v in data.items(): + if isinstance(v, datetime): + out[k] = v.isoformat() + else: + out[k] = v + return out + + +async def startJob( + jobType: str, + payload: Optional[Dict[str, Any]] = None, + *, + mandateId: Optional[str] = None, + featureInstanceId: Optional[str] = None, + triggeredBy: Optional[str] = None, +) -> str: + """Insert a new BackgroundJob, kick off its handler in the background, return jobId. + + Returns immediately; the handler runs via `asyncio.create_task`. + """ + if jobType not in _JOB_HANDLERS: + raise ValueError(f"No handler registered for jobType '{jobType}'") + + job = BackgroundJob( + jobType=jobType, + mandateId=mandateId, + featureInstanceId=featureInstanceId, + triggeredBy=triggeredBy, + payload=payload or {}, + ) + db = _getDb() + record = db.recordCreate(BackgroundJob, _serialiseDatetimes(job.model_dump())) + jobId = record["id"] + + asyncio.create_task(_runJob(jobId)) + logger.info( + "BackgroundJob %s submitted: type=%s mandate=%s instance=%s by=%s", + jobId, jobType, mandateId, featureInstanceId, triggeredBy, + ) + return jobId + + +def _loadJob(jobId: str) -> Optional[Dict[str, Any]]: + db = _getDb() + rows = db.getRecordset(BackgroundJob, recordFilter={"id": jobId}) + return dict(rows[0]) if rows else None + + +def _updateJob(jobId: str, fields: Dict[str, Any]) -> None: + db = _getDb() + db.recordModify(BackgroundJob, jobId, _serialiseDatetimes(fields)) + + +def _markStarted(jobId: str) -> None: + _updateJob(jobId, { + "status": BackgroundJobStatusEnum.RUNNING.value, + "startedAt": datetime.now(timezone.utc), + }) + + +def _markSuccess(jobId: str, result: Optional[Dict[str, Any]]) -> None: + _updateJob(jobId, { + "status": BackgroundJobStatusEnum.SUCCESS.value, + "result": result or {}, + "progress": 100, + "finishedAt": datetime.now(timezone.utc), + }) + + +def _markError(jobId: str, errorMessage: str) -> None: + truncated = (errorMessage or "")[:1000] + _updateJob(jobId, { + "status": BackgroundJobStatusEnum.ERROR.value, + "errorMessage": truncated, + "finishedAt": datetime.now(timezone.utc), + }) + + +def _makeProgressCallback(jobId: str) -> JobProgressCallback: + def _cb(progress: int, message: Optional[str] = None) -> None: + try: + clamped = max(0, min(100, int(progress))) + fields: Dict[str, Any] = {"progress": clamped} + if message is not None: + fields["progressMessage"] = message[:500] + _updateJob(jobId, fields) + except Exception as ex: + logger.warning("Progress update failed for job %s: %s", jobId, ex) + return _cb + + +async def _runJob(jobId: str) -> None: + job = _loadJob(jobId) + if not job: + logger.error("BackgroundJob %s vanished before runner started", jobId) + return + handler = _JOB_HANDLERS.get(job["jobType"]) + if not handler: + msg = f"No handler registered for jobType '{job['jobType']}'" + logger.error("BackgroundJob %s: %s", jobId, msg) + _markError(jobId, msg) + return + + _markStarted(jobId) + try: + result = await handler(job, _makeProgressCallback(jobId)) + _markSuccess(jobId, result if isinstance(result, dict) else None) + logger.info("BackgroundJob %s (%s) completed successfully", jobId, job["jobType"]) + except Exception as e: + logger.exception("BackgroundJob %s (%s) failed", jobId, job["jobType"]) + _markError(jobId, str(e)) + + +def getJobStatus(jobId: str) -> Optional[Dict[str, Any]]: + """Load current job state. Returns None if not found.""" + return _loadJob(jobId) + + +def listJobs( + *, + mandateId: Optional[str] = None, + featureInstanceId: Optional[str] = None, + jobType: Optional[str] = None, + limit: int = 20, +) -> List[Dict[str, Any]]: + """List recent jobs filtered by scope. Newest first.""" + db = _getDb() + rows = db.getRecordset(BackgroundJob) + out = [dict(r) for r in rows] + if mandateId is not None: + out = [r for r in out if r.get("mandateId") == mandateId] + if featureInstanceId is not None: + out = [r for r in out if r.get("featureInstanceId") == featureInstanceId] + if jobType is not None: + out = [r for r in out if r.get("jobType") == jobType] + out.sort(key=lambda r: r.get("createdAt") or "", reverse=True) + return out[:limit] + + +def isTerminalStatus(status: str) -> bool: + """True if the given status is one of {SUCCESS, ERROR, CANCELLED}.""" + return status in {s.value for s in TERMINAL_JOB_STATUSES} + + +def recoverInterruptedJobs() -> int: + """Flip any RUNNING jobs to ERROR (called at worker boot). + + A RUNNING job in the DB after process restart means the previous worker + died mid-execution; the asyncio task is gone and the job will never + finish on its own. + """ + db = _getDb() + try: + rows = db.getRecordset(BackgroundJob, recordFilter={"status": BackgroundJobStatusEnum.RUNNING.value}) + except Exception as ex: + logger.warning("recoverInterruptedJobs: failed to scan RUNNING jobs: %s", ex) + return 0 + count = 0 + for row in rows: + try: + _markError(row["id"], "Interrupted by worker restart") + count += 1 + except Exception as ex: + logger.warning("recoverInterruptedJobs: could not mark %s as ERROR: %s", row.get("id"), ex) + if count: + logger.warning("Recovered %d interrupted background job(s) after restart", count) + return count diff --git a/modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py b/modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py index 681070b0..8a2ff8d5 100644 --- a/modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py +++ b/modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py @@ -179,6 +179,10 @@ class SubscriptionService: checkoutUrl = self._createCheckoutSession(mid, plan, created, currentOperative, returnUrl) created["redirectUrl"] = checkoutUrl except Exception as e: + logger.exception( + "Checkout creation failed for mandate %s, plan %s — force-expiring PENDING %s", + mid, planKey, created["id"], + ) self._interface.forceExpire(created["id"]) self.invalidateCache(mid) raise ValueError(f"Subscription konnte nicht erstellt werden: {e}") from e @@ -276,7 +280,11 @@ class SubscriptionService: }, } - if currentOperative and currentOperative.get("currentPeriodEnd"): + isTrialPredecessor = ( + currentOperative is not None + and currentOperative.get("status") == SubscriptionStatusEnum.TRIALING.value + ) + if currentOperative and currentOperative.get("currentPeriodEnd") and not isTrialPredecessor: periodEnd = currentOperative["currentPeriodEnd"] if isinstance(periodEnd, str): periodEnd = datetime.fromisoformat(periodEnd)