From f29e0c9edc8fec865d0a00a97fcee0c95049feaa Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Mon, 20 Apr 2026 17:51:09 +0200
Subject: [PATCH] feature fixes
---
app.py | 12 +
modules/datamodels/datamodelBackgroundJob.py | 130 ++++++++++
.../accounting/accountingConnectorBase.py | 1 +
.../trustee/accounting/accountingDataSync.py | 39 ++-
.../connectors/accountingConnectorRma.py | 4 +
.../trustee/datamodelFeatureTrustee.py | 5 +-
.../features/trustee/routeFeatureTrustee.py | 72 ++++-
modules/interfaces/interfaceDbApp.py | 3 +-
modules/routes/routeBilling.py | 49 +++-
modules/routes/routeDataFiles.py | 93 +++++--
modules/routes/routeJobs.py | 107 ++++++++
.../serviceBackgroundJobs/__init__.py | 19 ++
.../mainBackgroundJobService.py | 245 ++++++++++++++++++
.../mainServiceSubscription.py | 10 +-
14 files changed, 731 insertions(+), 58 deletions(-)
create mode 100644 modules/datamodels/datamodelBackgroundJob.py
create mode 100644 modules/routes/routeJobs.py
create mode 100644 modules/serviceCenter/services/serviceBackgroundJobs/__init__.py
create mode 100644 modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py
diff --git a/app.py b/app.py
index cb9377af..b59c1a25 100644
--- a/app.py
+++ b/app.py
@@ -380,6 +380,15 @@ async def lifespan(app: FastAPI):
from modules.shared.auditLogger import registerAuditLogCleanupScheduler
registerAuditLogCleanupScheduler()
+ # Recover background jobs that were RUNNING when the previous worker died
+ try:
+ from modules.serviceCenter.services.serviceBackgroundJobs.mainBackgroundJobService import (
+ recoverInterruptedJobs,
+ )
+ recoverInterruptedJobs()
+ except Exception as e:
+ logger.warning(f"BackgroundJob recovery failed (non-critical): {e}")
+
yield
# --- Stop Managers ---
@@ -627,6 +636,9 @@ app.include_router(billingRouter)
from modules.routes.routeSubscription import router as subscriptionRouter
app.include_router(subscriptionRouter)
+from modules.routes.routeJobs import router as jobsRouter
+app.include_router(jobsRouter)
+
# ============================================================================
# SYSTEM ROUTES (Navigation, etc.)
# ============================================================================
diff --git a/modules/datamodels/datamodelBackgroundJob.py b/modules/datamodels/datamodelBackgroundJob.py
new file mode 100644
index 00000000..45a26b2c
--- /dev/null
+++ b/modules/datamodels/datamodelBackgroundJob.py
@@ -0,0 +1,130 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""Background job models: generic, reusable infrastructure for long-running tasks.
+
+A `BackgroundJob` record tracks the lifecycle of one async task that must not block
+the calling HTTP request. Any caller (HTTP route, AI tool, scheduled task) can:
+
+1. Register a handler once via `registerJobHandler(jobType, handler)`.
+2. Submit work via `startJob(jobType, payload, ...)` which returns a `jobId`
+ immediately and runs the handler in the background.
+3. Poll `getJobStatus(jobId)` (HTTP `GET /api/jobs/{jobId}`) until `status` is
+ one of {SUCCESS, ERROR, CANCELLED}.
+
+See `modules.serviceCenter.services.serviceBackgroundJobs.mainBackgroundJobService`.
+"""
+
+from typing import Any, Dict, Optional
+from enum import Enum
+from datetime import datetime, timezone
+import uuid
+
+from pydantic import Field
+
+from modules.datamodels.datamodelBase import PowerOnModel
+from modules.shared.i18nRegistry import i18nModel
+
+
+class BackgroundJobStatusEnum(str, Enum):
+ """Lifecycle status of a background job."""
+ PENDING = "PENDING"
+ RUNNING = "RUNNING"
+ SUCCESS = "SUCCESS"
+ ERROR = "ERROR"
+ CANCELLED = "CANCELLED"
+
+
+TERMINAL_JOB_STATUSES = {
+ BackgroundJobStatusEnum.SUCCESS,
+ BackgroundJobStatusEnum.ERROR,
+ BackgroundJobStatusEnum.CANCELLED,
+}
+
+
+@i18nModel("Hintergrund-Job")
+class BackgroundJob(PowerOnModel):
+ """Generic record describing a long-running asynchronous task.
+
+ Scope: the combination of `mandateId` and optionally `featureInstanceId`
+ is used for access control on `GET /api/jobs/{jobId}`.
+ """
+
+ id: str = Field(
+ default_factory=lambda: str(uuid.uuid4()),
+ description="Primary key",
+ json_schema_extra={"label": "ID"},
+ )
+ jobType: str = Field(
+ ...,
+ description="Handler key registered via registerJobHandler() (e.g. 'trusteeAccountingSync')",
+ json_schema_extra={"label": "Typ"},
+ )
+ mandateId: Optional[str] = Field(
+ None,
+ description="Mandate scope (used for access checks). None for system-wide jobs.",
+ json_schema_extra={
+ "label": "Mandanten-ID",
+ "fk_target": {"db": "poweron_app", "table": "Mandate"},
+ },
+ )
+ featureInstanceId: Optional[str] = Field(
+ None,
+ description="Feature instance scope (optional)",
+ json_schema_extra={
+ "label": "Feature-Instanz",
+ "fk_target": {"db": "poweron_app", "table": "FeatureInstance"},
+ },
+ )
+ triggeredBy: Optional[str] = Field(
+ None,
+ description="UserId or 'ai-tool:' / 'scheduler:'",
+ json_schema_extra={"label": "Ausgeloest von"},
+ )
+
+ status: str = Field(
+ default=BackgroundJobStatusEnum.PENDING.value,
+ description="Current lifecycle status",
+ json_schema_extra={"label": "Status"},
+ )
+ progress: int = Field(
+ default=0,
+ description="Progress 0..100 (best-effort; may stay 0 for handlers that cannot estimate)",
+ json_schema_extra={"label": "Fortschritt"},
+ )
+ progressMessage: Optional[str] = Field(
+ None,
+ description="Human-readable current step (e.g. 'Importing journal entries...')",
+ json_schema_extra={"label": "Fortschritts-Nachricht"},
+ )
+
+ payload: Dict[str, Any] = Field(
+ default_factory=dict,
+ description="Job input parameters (JSON)",
+ json_schema_extra={"label": "Eingabe"},
+ )
+ result: Optional[Dict[str, Any]] = Field(
+ None,
+ description="Handler return value on success (JSON)",
+ json_schema_extra={"label": "Ergebnis"},
+ )
+ errorMessage: Optional[str] = Field(
+ None,
+ description="Truncated error message on failure (full stack trace in logs)",
+ json_schema_extra={"label": "Fehler"},
+ )
+
+ createdAt: datetime = Field(
+ default_factory=lambda: datetime.now(timezone.utc),
+ description="When the job was submitted",
+ json_schema_extra={"label": "Eingereicht"},
+ )
+ startedAt: Optional[datetime] = Field(
+ None,
+ description="When the handler began running",
+ json_schema_extra={"label": "Gestartet"},
+ )
+ finishedAt: Optional[datetime] = Field(
+ None,
+ description="When the handler reached a terminal status",
+ json_schema_extra={"label": "Beendet"},
+ )
diff --git a/modules/features/trustee/accounting/accountingConnectorBase.py b/modules/features/trustee/accounting/accountingConnectorBase.py
index 355b6f34..c5124184 100644
--- a/modules/features/trustee/accounting/accountingConnectorBase.py
+++ b/modules/features/trustee/accounting/accountingConnectorBase.py
@@ -56,6 +56,7 @@ class ConnectorConfigField(BaseModel):
secret: bool = False
required: bool = True
placeholder: Optional[str] = None
+ suggestions: Optional[List[str]] = None
class BaseAccountingConnector(ABC):
diff --git a/modules/features/trustee/accounting/accountingDataSync.py b/modules/features/trustee/accounting/accountingDataSync.py
index e0584a02..e422566f 100644
--- a/modules/features/trustee/accounting/accountingDataSync.py
+++ b/modules/features/trustee/accounting/accountingDataSync.py
@@ -215,17 +215,34 @@ class AccountingDataSync:
logger.error(f"Compute balances failed: {e}")
summary["errors"].append(f"Balances: {e}")
- # Update config with last import timestamp
- try:
- cfgId = cfgRecord.get("id")
- if cfgId:
- self._if.db.recordModify(TrusteeAccountingConfig, cfgId, {
- "lastSyncAt": time.time(),
- "lastSyncStatus": "success" if not summary["errors"] else "partial",
- "lastSyncErrorMessage": "; ".join(summary["errors"])[:500] if summary["errors"] else None,
- })
- except Exception:
- pass
+ cfgId = cfgRecord.get("id")
+ if cfgId:
+ corePayload = {
+ "lastSyncAt": time.time(),
+ "lastSyncStatus": "success" if not summary["errors"] else "partial",
+ "lastSyncErrorMessage": "; ".join(summary["errors"])[:500] if summary["errors"] else None,
+ }
+ try:
+ self._if.db.recordModify(TrusteeAccountingConfig, cfgId, corePayload)
+ except Exception as coreErr:
+ logger.exception(f"AccountingDataSync: failed to write core lastSync* fields for cfg {cfgId}: {coreErr}")
+ summary["errors"].append(f"Persist lastSync core: {coreErr}")
+ extPayload = {
+ "lastSyncDateFrom": dateFrom,
+ "lastSyncDateTo": dateTo,
+ "lastSyncCounts": {
+ "accounts": int(summary.get("accounts", 0)),
+ "journalEntries": int(summary.get("journalEntries", 0)),
+ "journalLines": int(summary.get("journalLines", 0)),
+ "contacts": int(summary.get("contacts", 0)),
+ "accountBalances": int(summary.get("accountBalances", 0)),
+ },
+ }
+ try:
+ self._if.db.recordModify(TrusteeAccountingConfig, cfgId, extPayload)
+ except Exception as extErr:
+ logger.exception(f"AccountingDataSync: failed to write extended lastSync* fields for cfg {cfgId}: {extErr}")
+ summary["errors"].append(f"Persist lastSync ext: {extErr}")
summary["finishedAt"] = time.time()
summary["durationSeconds"] = round(summary["finishedAt"] - summary["startedAt"], 1)
diff --git a/modules/features/trustee/accounting/connectors/accountingConnectorRma.py b/modules/features/trustee/accounting/connectors/accountingConnectorRma.py
index bcf52561..79a61d77 100644
--- a/modules/features/trustee/accounting/connectors/accountingConnectorRma.py
+++ b/modules/features/trustee/accounting/connectors/accountingConnectorRma.py
@@ -47,6 +47,10 @@ class AccountingConnectorRma(BaseAccountingConnector):
fieldType="text",
secret=False,
placeholder="https://service.runmyaccounts.com/api/latest/clients/",
+ suggestions=[
+ "https://service.runmyaccounts.com/api/latest/clients/",
+ "https://service.int.runmyaccounts.com/api/latest/clients/",
+ ],
),
ConnectorConfigField(
key="clientName",
diff --git a/modules/features/trustee/datamodelFeatureTrustee.py b/modules/features/trustee/datamodelFeatureTrustee.py
index 5d1b4263..fcf5c8b4 100644
--- a/modules/features/trustee/datamodelFeatureTrustee.py
+++ b/modules/features/trustee/datamodelFeatureTrustee.py
@@ -3,7 +3,7 @@
"""Trustee models: TrusteeOrganisation, TrusteeRole, TrusteeAccess, TrusteeContract, TrusteeDocument, TrusteePosition."""
from enum import Enum
-from typing import Optional
+from typing import Optional, Dict
from pydantic import BaseModel, Field
from modules.datamodels.datamodelBase import PowerOnModel
@@ -818,6 +818,9 @@ class TrusteeAccountingConfig(PowerOnModel):
lastSyncAt: Optional[float] = Field(default=None, description="Timestamp of last sync attempt", json_schema_extra={"label": "Letzte Synchronisation"})
lastSyncStatus: Optional[str] = Field(default=None, description="Last sync result: success, error, partial", json_schema_extra={"label": "Status"})
lastSyncErrorMessage: Optional[str] = Field(default=None, description="Error message when lastSyncStatus is error", json_schema_extra={"label": "Fehlermeldung"})
+ lastSyncDateFrom: Optional[str] = Field(default=None, description="dateFrom (ISO date) of the last data import window", json_schema_extra={"label": "Letztes Import-Fenster von"})
+ lastSyncDateTo: Optional[str] = Field(default=None, description="dateTo (ISO date) of the last data import window", json_schema_extra={"label": "Letztes Import-Fenster bis"})
+ lastSyncCounts: Optional[Dict[str, int]] = Field(default=None, description="Per-entity counts of the last import (accounts, journalEntries, journalLines, contacts, accountBalances)", json_schema_extra={"label": "Letzte Import-Zaehler"})
cachedChartOfAccounts: Optional[str] = Field(default=None, description="JSON-serialised chart of accounts cache (list of {accountNumber, label, accountType})", json_schema_extra={"label": "Cached Kontoplan"})
chartCachedAt: Optional[float] = Field(default=None, description="Timestamp when cachedChartOfAccounts was last refreshed", json_schema_extra={"label": "Kontoplan-Cache-Zeitpunkt"})
mandateId: Optional[str] = Field(default=None, json_schema_extra={"label": "Mandat", "fk_target": {"db": "poweron_app", "table": "Mandate"}})
diff --git a/modules/features/trustee/routeFeatureTrustee.py b/modules/features/trustee/routeFeatureTrustee.py
index 73752788..7b80189e 100644
--- a/modules/features/trustee/routeFeatureTrustee.py
+++ b/modules/features/trustee/routeFeatureTrustee.py
@@ -1643,7 +1643,46 @@ def get_position_sync_status(
# ===== Accounting Data Import =====
-@router.post("/{instanceId}/accounting/import-data")
+TRUSTEE_ACCOUNTING_SYNC_JOB_TYPE = "trusteeAccountingSync"
+
+
+async def _trusteeAccountingSyncJobHandler(job: Dict[str, Any], progressCb) -> Dict[str, Any]:
+ """BackgroundJob handler: imports accounting data from the external system.
+
+ Reads inputs from `job["payload"]` (dateFrom, dateTo, userId) and runs
+ `AccountingDataSync.importData(...)` in the worker's event loop without
+ blocking the original HTTP request that submitted the job.
+ """
+ from modules.security.rootAccess import getRootUser
+ from .accounting.accountingDataSync import AccountingDataSync
+
+ instanceId = job["featureInstanceId"]
+ mandateId = job["mandateId"]
+ payload = job.get("payload") or {}
+ rootUser = getRootUser()
+
+ progressCb(5, "Initialisiere Import...")
+ interface = getInterface(rootUser, mandateId=mandateId, featureInstanceId=instanceId)
+ sync = AccountingDataSync(interface)
+ progressCb(10, "Lese Daten vom Buchhaltungssystem...")
+ result = await sync.importData(
+ featureInstanceId=instanceId,
+ mandateId=mandateId,
+ dateFrom=payload.get("dateFrom"),
+ dateTo=payload.get("dateTo"),
+ )
+ progressCb(100, "Import abgeschlossen")
+ return result
+
+
+try:
+ from modules.serviceCenter.services.serviceBackgroundJobs import registerJobHandler
+ registerJobHandler(TRUSTEE_ACCOUNTING_SYNC_JOB_TYPE, _trusteeAccountingSyncJobHandler)
+except Exception as _regErr:
+ logger.warning("Failed to register trusteeAccountingSync job handler: %s", _regErr)
+
+
+@router.post("/{instanceId}/accounting/import-data", status_code=status.HTTP_202_ACCEPTED)
@limiter.limit("3/minute")
async def import_accounting_data(
request: Request,
@@ -1651,20 +1690,26 @@ async def import_accounting_data(
data: Dict[str, Any] = Body(default={}),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
- """Import accounting data (chart, journal entries, contacts) from the external system into TrusteeData* tables."""
+ """Submit a background job to import accounting data.
+
+ Returns immediately with `{ jobId }`; clients poll `GET /api/jobs/{jobId}`
+ until status is SUCCESS / ERROR.
+ """
+ from modules.serviceCenter.services.serviceBackgroundJobs import startJob
+
mandateId = _validateInstanceAccess(instanceId, context)
- interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
- from .accounting.accountingDataSync import AccountingDataSync
- sync = AccountingDataSync(interface)
- dateFrom = data.get("dateFrom")
- dateTo = data.get("dateTo")
- result = await sync.importData(
- featureInstanceId=instanceId,
+ payload = {
+ "dateFrom": data.get("dateFrom"),
+ "dateTo": data.get("dateTo"),
+ }
+ jobId = await startJob(
+ TRUSTEE_ACCOUNTING_SYNC_JOB_TYPE,
+ payload,
mandateId=mandateId,
- dateFrom=dateFrom,
- dateTo=dateTo,
+ featureInstanceId=instanceId,
+ triggeredBy=context.user.id if context.user else None,
)
- return result
+ return {"jobId": jobId, "status": "pending"}
@router.get("/{instanceId}/accounting/import-status")
@@ -1695,6 +1740,9 @@ def get_import_status(
counts["lastSyncAt"] = cfg.get("lastSyncAt")
counts["lastSyncStatus"] = cfg.get("lastSyncStatus")
counts["lastSyncErrorMessage"] = cfg.get("lastSyncErrorMessage")
+ counts["lastSyncDateFrom"] = cfg.get("lastSyncDateFrom")
+ counts["lastSyncDateTo"] = cfg.get("lastSyncDateTo")
+ counts["lastSyncCounts"] = cfg.get("lastSyncCounts")
return counts
diff --git a/modules/interfaces/interfaceDbApp.py b/modules/interfaces/interfaceDbApp.py
index 2a88fecc..c754684f 100644
--- a/modules/interfaces/interfaceDbApp.py
+++ b/modules/interfaces/interfaceDbApp.py
@@ -19,6 +19,7 @@ from modules.connectors.connectorDbPostgre import DatabaseConnector, _get_cached
from modules.shared.configuration import APP_CONFIG
from modules.shared.dbRegistry import registerDatabase
from modules.shared.timeUtils import getUtcTimestamp, parseTimestamp
+from modules.shared.i18nRegistry import resolveText
from modules.interfaces.interfaceRbac import getRecordsetWithRBAC
from modules.security.rbac import RbacClass
from modules.datamodels.datamodelUam import (
@@ -1639,7 +1640,7 @@ class AppObjects:
if not featureDef.get("autoCreateInstance", False):
continue
featureCode = featureDef.get("code", featureName)
- featureLabel = featureDef.get("label", {}).get("en", featureName)
+ featureLabel = resolveText(featureDef.get("label", featureName))
instance = featureInterface.createFeatureInstance(
featureCode=featureCode,
mandateId=mandateId,
diff --git a/modules/routes/routeBilling.py b/modules/routes/routeBilling.py
index 9b238df1..b0967259 100644
--- a/modules/routes/routeBilling.py
+++ b/modules/routes/routeBilling.py
@@ -935,11 +935,34 @@ async def stripeWebhook(
return {"received": True}
session_dict = session.to_dict_recursive() if hasattr(session, "to_dict_recursive") else dict(session)
- result = _creditStripeSessionIfNeeded(billingInterface, session_dict, eventId=event_id)
- logger.info(
- f"Stripe webhook processed session {result.sessionId}: "
- f"credited={result.credited}, alreadyCredited={result.alreadyCredited}"
- )
+ try:
+ result = _creditStripeSessionIfNeeded(billingInterface, session_dict, eventId=event_id)
+ logger.info(
+ f"Stripe webhook processed session {result.sessionId}: "
+ f"credited={result.credited}, alreadyCredited={result.alreadyCredited}"
+ )
+ except HTTPException as he:
+ logger.error(
+ "Stripe webhook %s for session %s failed: status=%s detail=%s metadata=%s amount_total=%s",
+ event_id,
+ session_dict.get("id"),
+ he.status_code,
+ he.detail,
+ session_dict.get("metadata"),
+ session_dict.get("amount_total"),
+ )
+ if 400 <= he.status_code < 500 and event_id:
+ if not billingInterface.getStripeWebhookEventByEventId(event_id):
+ try:
+ billingInterface.createStripeWebhookEvent(event_id)
+ logger.warning(
+ "Marked Stripe event %s as processed (permanent 4xx) to stop retries",
+ event_id,
+ )
+ except Exception as markEx:
+ logger.error("Failed to mark Stripe event %s as processed: %s", event_id, markEx)
+ return {"received": True}
+ raise
return {"received": True}
@@ -1036,8 +1059,22 @@ def _handleSubscriptionCheckoutCompleted(session, eventId: str) -> None:
operative = subInterface.getOperativeForMandate(mandateId)
hasActivePredecessor = operative is not None and operative["id"] != subscriptionRecordId
+ predecessorIsTrial = (
+ hasActivePredecessor
+ and operative.get("status") == SubscriptionStatusEnum.TRIALING.value
+ )
- if hasActivePredecessor:
+ if hasActivePredecessor and predecessorIsTrial:
+ try:
+ subInterface.forceExpire(operative["id"])
+ logger.info(
+ "Trial subscription %s expired immediately for mandate %s due to paid upgrade %s",
+ operative["id"], mandateId, subscriptionRecordId,
+ )
+ except Exception as e:
+ logger.error("Failed to expire trial predecessor %s: %s", operative["id"], e)
+ toStatus = SubscriptionStatusEnum.ACTIVE
+ elif hasActivePredecessor:
toStatus = SubscriptionStatusEnum.SCHEDULED
if operative.get("recurring", True):
operativeStripeId = operative.get("stripeSubscriptionId")
diff --git a/modules/routes/routeDataFiles.py b/modules/routes/routeDataFiles.py
index f5b6f3d4..82cf1624 100644
--- a/modules/routes/routeDataFiles.py
+++ b/modules/routes/routeDataFiles.py
@@ -23,6 +23,55 @@ routeApiMsg = apiRouteContext("routeDataFiles")
logger = logging.getLogger(__name__)
+def _resolveFileWithScope(currentUser: User, context: RequestContext, fileId: str):
+ """Returns (managementInterface, fileItem) with RBAC scoped to the file's own mandate/instance.
+
+ Files generated by workflows (e.g. AI report outputs) carry their own
+ mandateId/featureInstanceId. Direct download links via cannot send
+ custom scope headers, so we resolve the scope from the FileItem itself and
+ re-check RBAC in that scope.
+
+ Returns (None, None) if the file does not exist or the user lacks access
+ in the file's actual scope.
+ """
+ requestMandateId = str(context.mandateId) if context.mandateId else None
+ requestInstanceId = str(context.featureInstanceId) if context.featureInstanceId else None
+
+ mgmt = interfaceDbManagement.getInterface(
+ currentUser,
+ mandateId=requestMandateId,
+ featureInstanceId=requestInstanceId,
+ )
+ fileItem = mgmt.getFile(fileId)
+ if fileItem:
+ return mgmt, fileItem
+
+ metas = mgmt.db.getRecordset(FileItem, recordFilter={"id": fileId})
+ if not metas:
+ return None, None
+
+ meta = metas[0]
+ fileMandateId = meta.get("mandateId") or None
+ fileInstanceId = meta.get("featureInstanceId") or None
+
+ if not fileMandateId and not fileInstanceId:
+ return None, None
+
+ if fileMandateId == requestMandateId and fileInstanceId == requestInstanceId:
+ return None, None
+
+ scopedMgmt = interfaceDbManagement.getInterface(
+ currentUser,
+ mandateId=fileMandateId,
+ featureInstanceId=fileInstanceId,
+ )
+ fileItem = scopedMgmt.getFile(fileId)
+ if not fileItem:
+ return None, None
+
+ return scopedMgmt, fileItem
+
+
async def _autoIndexFile(fileId: str, fileName: str, mimeType: str, user):
"""Background task: pre-scan + extraction + knowledge indexing.
Step 1: Structure Pre-Scan (AI-free) -> FileContentIndex (persisted)
@@ -975,20 +1024,18 @@ def updateFileNeutralize(
def get_file(
request: Request,
fileId: str = Path(..., description="ID of the file"),
- currentUser: User = Depends(getCurrentUser)
+ currentUser: User = Depends(getCurrentUser),
+ context: RequestContext = Depends(getRequestContext)
) -> FileItem:
- """Get a file"""
+ """Get a file. Resolves the file's mandate/instance scope automatically."""
try:
- managementInterface = interfaceDbManagement.getInterface(currentUser)
-
- # Get file via LucyDOM interface from the database
- fileData = managementInterface.getFile(fileId)
+ _mgmt, fileData = _resolveFileWithScope(currentUser, context, fileId)
if not fileData:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"File with ID {fileId} not found"
)
-
+
return fileData
except interfaceDbManagement.FileNotFoundError as e:
@@ -1107,23 +1154,17 @@ def download_file(
currentUser: User = Depends(getCurrentUser),
context: RequestContext = Depends(getRequestContext)
) -> Response:
- """Download a file. Uses mandate/instance context when present (e.g. from feature pages)."""
+ """Download a file. Resolves the file's mandate/instance scope automatically,
+ so direct links work even when X-Mandate-Id / X-Instance-Id headers
+ are not sent by the browser."""
try:
- managementInterface = interfaceDbManagement.getInterface(
- currentUser,
- mandateId=str(context.mandateId) if context.mandateId else None,
- featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None
- )
-
- # Get file data
- fileData = managementInterface.getFile(fileId)
+ managementInterface, fileData = _resolveFileWithScope(currentUser, context, fileId)
if not fileData:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"File with ID {fileId} not found"
)
-
- # Get file content
+
fileContent = managementInterface.getFileData(fileId)
if not fileContent:
raise HTTPException(
@@ -1160,15 +1201,15 @@ def preview_file(
currentUser: User = Depends(getCurrentUser),
context: RequestContext = Depends(getRequestContext)
) -> FilePreview:
- """Preview a file's content. Uses mandate/instance context when present."""
+ """Preview a file's content. Resolves the file's mandate/instance scope automatically."""
try:
- managementInterface = interfaceDbManagement.getInterface(
- currentUser,
- mandateId=str(context.mandateId) if context.mandateId else None,
- featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None
- )
-
- # Get file preview using the correct method
+ managementInterface, fileMeta = _resolveFileWithScope(currentUser, context, fileId)
+ if not fileMeta:
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND,
+ detail=f"File with ID {fileId} not found"
+ )
+
preview = managementInterface.getFileContent(fileId)
if not preview:
raise HTTPException(
diff --git a/modules/routes/routeJobs.py b/modules/routes/routeJobs.py
new file mode 100644
index 00000000..d2124a0b
--- /dev/null
+++ b/modules/routes/routeJobs.py
@@ -0,0 +1,107 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""HTTP API for the generic background job service.
+
+Endpoints:
+- GET /api/jobs/{jobId} -> single job status
+- GET /api/jobs -> list (filter by jobType, instanceId)
+
+Access control: a caller may read a job iff they are a member of its mandate
+(or PlatformAdmin). Jobs without a mandateId (system-wide) are restricted to
+PlatformAdmin only.
+"""
+
+import logging
+from typing import Any, Dict, List, Optional
+
+from fastapi import APIRouter, Depends, HTTPException, Path, Query, Request
+
+from modules.auth import getRequestContext, RequestContext, limiter
+from modules.serviceCenter.services.serviceBackgroundJobs import (
+ getJobStatus,
+ listJobs,
+)
+from modules.shared.i18nRegistry import apiRouteContext
+
+logger = logging.getLogger(__name__)
+routeApiMsg = apiRouteContext("routeJobs")
+
+router = APIRouter(
+ prefix="/api/jobs",
+ tags=["BackgroundJobs"],
+ responses={404: {"description": "Not found"}},
+)
+
+
+def _serialiseJob(job: Dict[str, Any]) -> Dict[str, Any]:
+ """Strip system audit fields and ensure JSON-safe types."""
+ return {k: v for k, v in job.items() if not k.startswith("sys")}
+
+
+def _userHasMandateAccess(context: RequestContext, mandateId: Optional[str]) -> bool:
+ """Return True if the current user can read jobs for the given mandate scope."""
+ if context is None or context.user is None:
+ return False
+ if context.isPlatformAdmin:
+ return True
+ if mandateId is None:
+ return False
+ from modules.interfaces.interfaceDbApp import getRootInterface
+ from modules.datamodels.datamodelMembership import UserMandate
+ rootIf = getRootInterface()
+ try:
+ memberships = rootIf.db.getRecordset(
+ UserMandate,
+ recordFilter={"userId": context.user.id, "mandateId": mandateId},
+ )
+ return bool(memberships)
+ except Exception as ex:
+ logger.warning(
+ "Mandate access check failed for user=%s mandate=%s: %s",
+ context.user.id, mandateId, ex,
+ )
+ return False
+
+
+@router.get("/{jobId}")
+@limiter.limit("60/minute")
+def get_job(
+ request: Request,
+ jobId: str = Path(..., description="Background job ID"),
+ context: RequestContext = Depends(getRequestContext),
+) -> Dict[str, Any]:
+ """Return the current state of one background job."""
+ job = getJobStatus(jobId)
+ if not job:
+ raise HTTPException(status_code=404, detail=routeApiMsg("Job not found"))
+ if not _userHasMandateAccess(context, job.get("mandateId")):
+ raise HTTPException(status_code=403, detail=routeApiMsg("Access denied"))
+ return _serialiseJob(job)
+
+
+@router.get("")
+@limiter.limit("30/minute")
+def list_jobs(
+ request: Request,
+ jobType: Optional[str] = Query(None),
+ mandateId: Optional[str] = Query(None),
+ instanceId: Optional[str] = Query(None, description="Feature instance scope"),
+ limit: int = Query(20, ge=1, le=100),
+ context: RequestContext = Depends(getRequestContext),
+) -> Dict[str, List[Dict[str, Any]]]:
+ """List recent jobs filtered by scope. Newest first."""
+ if mandateId is None:
+ if not context or not context.isPlatformAdmin:
+ raise HTTPException(
+ status_code=400,
+ detail=routeApiMsg("mandateId is required (only PlatformAdmin may list system-wide)"),
+ )
+ elif not _userHasMandateAccess(context, mandateId):
+ raise HTTPException(status_code=403, detail=routeApiMsg("Access denied"))
+ jobs = listJobs(
+ mandateId=mandateId,
+ featureInstanceId=instanceId,
+ jobType=jobType,
+ limit=limit,
+ )
+ return {"items": [_serialiseJob(j) for j in jobs]}
diff --git a/modules/serviceCenter/services/serviceBackgroundJobs/__init__.py b/modules/serviceCenter/services/serviceBackgroundJobs/__init__.py
new file mode 100644
index 00000000..e9d4c94c
--- /dev/null
+++ b/modules/serviceCenter/services/serviceBackgroundJobs/__init__.py
@@ -0,0 +1,19 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""Background job service: generic, reusable infrastructure for long-running tasks."""
+
+from .mainBackgroundJobService import (
+ registerJobHandler,
+ startJob,
+ getJobStatus,
+ listJobs,
+ JobProgressCallback,
+)
+
+__all__ = [
+ "registerJobHandler",
+ "startJob",
+ "getJobStatus",
+ "listJobs",
+ "JobProgressCallback",
+]
diff --git a/modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py b/modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py
new file mode 100644
index 00000000..37830fd1
--- /dev/null
+++ b/modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py
@@ -0,0 +1,245 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""Background job service.
+
+Generic infrastructure for fire-and-forget async tasks. Any caller (HTTP route,
+AI tool, scheduler) can submit work and get a `jobId` back immediately; status
+is polled via `GET /api/jobs/{jobId}`.
+
+Usage (registration, once at module load):
+
+ from modules.serviceCenter.services.serviceBackgroundJobs import registerJobHandler
+
+ async def _myHandler(job, progressCb):
+ progressCb(10, "starting...")
+ result = await doExpensiveWork(job["payload"])
+ return result # stored as job.result
+
+ registerJobHandler("myJobType", _myHandler)
+
+Usage (submission):
+
+ from modules.serviceCenter.services.serviceBackgroundJobs import startJob
+ jobId = await startJob("myJobType", {"foo": "bar"}, mandateId=mid, triggeredBy=userId)
+ return {"jobId": jobId}
+
+Restart semantics: jobs are tracked in DB. If the worker process dies mid-job,
+`_recoverInterruptedJobs()` (called at boot) flips RUNNING jobs to ERROR with a
+clear message. No silent zombies.
+"""
+
+import asyncio
+import logging
+from datetime import datetime, timezone
+from typing import Any, Awaitable, Callable, Dict, List, Optional
+
+from modules.connectors.connectorDbPostgre import DatabaseConnector
+from modules.shared.configuration import APP_CONFIG
+from modules.shared.dbRegistry import registerDatabase
+from modules.datamodels.datamodelBackgroundJob import (
+ BackgroundJob,
+ BackgroundJobStatusEnum,
+ TERMINAL_JOB_STATUSES,
+)
+
+logger = logging.getLogger(__name__)
+
+
+JOBS_DATABASE = APP_CONFIG.get("DB_DATABASE", "poweron_app")
+registerDatabase(JOBS_DATABASE)
+
+
+JobProgressCallback = Callable[[int, Optional[str]], None]
+JobHandler = Callable[[Dict[str, Any], JobProgressCallback], Awaitable[Optional[Dict[str, Any]]]]
+
+
+_JOB_HANDLERS: Dict[str, JobHandler] = {}
+
+
+def registerJobHandler(jobType: str, handler: JobHandler) -> None:
+ """Register a handler for a job type. Idempotent — last registration wins."""
+ if jobType in _JOB_HANDLERS and _JOB_HANDLERS[jobType] is not handler:
+ logger.info("Re-registering background job handler for type %s", jobType)
+ _JOB_HANDLERS[jobType] = handler
+
+
+def _getDb() -> DatabaseConnector:
+ return DatabaseConnector(
+ dbDatabase=JOBS_DATABASE,
+ dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
+ dbPort=int(APP_CONFIG.get("DB_PORT", "5432")),
+ dbUser=APP_CONFIG.get("DB_USER"),
+ dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET"),
+ )
+
+
+def _serialiseDatetimes(data: Dict[str, Any]) -> Dict[str, Any]:
+ """Return copy of dict with datetime values converted to ISO 8601 strings."""
+ out: Dict[str, Any] = {}
+ for k, v in data.items():
+ if isinstance(v, datetime):
+ out[k] = v.isoformat()
+ else:
+ out[k] = v
+ return out
+
+
+async def startJob(
+ jobType: str,
+ payload: Optional[Dict[str, Any]] = None,
+ *,
+ mandateId: Optional[str] = None,
+ featureInstanceId: Optional[str] = None,
+ triggeredBy: Optional[str] = None,
+) -> str:
+ """Insert a new BackgroundJob, kick off its handler in the background, return jobId.
+
+ Returns immediately; the handler runs via `asyncio.create_task`.
+ """
+ if jobType not in _JOB_HANDLERS:
+ raise ValueError(f"No handler registered for jobType '{jobType}'")
+
+ job = BackgroundJob(
+ jobType=jobType,
+ mandateId=mandateId,
+ featureInstanceId=featureInstanceId,
+ triggeredBy=triggeredBy,
+ payload=payload or {},
+ )
+ db = _getDb()
+ record = db.recordCreate(BackgroundJob, _serialiseDatetimes(job.model_dump()))
+ jobId = record["id"]
+
+ asyncio.create_task(_runJob(jobId))
+ logger.info(
+ "BackgroundJob %s submitted: type=%s mandate=%s instance=%s by=%s",
+ jobId, jobType, mandateId, featureInstanceId, triggeredBy,
+ )
+ return jobId
+
+
+def _loadJob(jobId: str) -> Optional[Dict[str, Any]]:
+ db = _getDb()
+ rows = db.getRecordset(BackgroundJob, recordFilter={"id": jobId})
+ return dict(rows[0]) if rows else None
+
+
+def _updateJob(jobId: str, fields: Dict[str, Any]) -> None:
+ db = _getDb()
+ db.recordModify(BackgroundJob, jobId, _serialiseDatetimes(fields))
+
+
+def _markStarted(jobId: str) -> None:
+ _updateJob(jobId, {
+ "status": BackgroundJobStatusEnum.RUNNING.value,
+ "startedAt": datetime.now(timezone.utc),
+ })
+
+
+def _markSuccess(jobId: str, result: Optional[Dict[str, Any]]) -> None:
+ _updateJob(jobId, {
+ "status": BackgroundJobStatusEnum.SUCCESS.value,
+ "result": result or {},
+ "progress": 100,
+ "finishedAt": datetime.now(timezone.utc),
+ })
+
+
+def _markError(jobId: str, errorMessage: str) -> None:
+ truncated = (errorMessage or "")[:1000]
+ _updateJob(jobId, {
+ "status": BackgroundJobStatusEnum.ERROR.value,
+ "errorMessage": truncated,
+ "finishedAt": datetime.now(timezone.utc),
+ })
+
+
+def _makeProgressCallback(jobId: str) -> JobProgressCallback:
+ def _cb(progress: int, message: Optional[str] = None) -> None:
+ try:
+ clamped = max(0, min(100, int(progress)))
+ fields: Dict[str, Any] = {"progress": clamped}
+ if message is not None:
+ fields["progressMessage"] = message[:500]
+ _updateJob(jobId, fields)
+ except Exception as ex:
+ logger.warning("Progress update failed for job %s: %s", jobId, ex)
+ return _cb
+
+
+async def _runJob(jobId: str) -> None:
+ job = _loadJob(jobId)
+ if not job:
+ logger.error("BackgroundJob %s vanished before runner started", jobId)
+ return
+ handler = _JOB_HANDLERS.get(job["jobType"])
+ if not handler:
+ msg = f"No handler registered for jobType '{job['jobType']}'"
+ logger.error("BackgroundJob %s: %s", jobId, msg)
+ _markError(jobId, msg)
+ return
+
+ _markStarted(jobId)
+ try:
+ result = await handler(job, _makeProgressCallback(jobId))
+ _markSuccess(jobId, result if isinstance(result, dict) else None)
+ logger.info("BackgroundJob %s (%s) completed successfully", jobId, job["jobType"])
+ except Exception as e:
+ logger.exception("BackgroundJob %s (%s) failed", jobId, job["jobType"])
+ _markError(jobId, str(e))
+
+
+def getJobStatus(jobId: str) -> Optional[Dict[str, Any]]:
+ """Load current job state. Returns None if not found."""
+ return _loadJob(jobId)
+
+
+def listJobs(
+ *,
+ mandateId: Optional[str] = None,
+ featureInstanceId: Optional[str] = None,
+ jobType: Optional[str] = None,
+ limit: int = 20,
+) -> List[Dict[str, Any]]:
+ """List recent jobs filtered by scope. Newest first."""
+ db = _getDb()
+ rows = db.getRecordset(BackgroundJob)
+ out = [dict(r) for r in rows]
+ if mandateId is not None:
+ out = [r for r in out if r.get("mandateId") == mandateId]
+ if featureInstanceId is not None:
+ out = [r for r in out if r.get("featureInstanceId") == featureInstanceId]
+ if jobType is not None:
+ out = [r for r in out if r.get("jobType") == jobType]
+ out.sort(key=lambda r: r.get("createdAt") or "", reverse=True)
+ return out[:limit]
+
+
+def isTerminalStatus(status: str) -> bool:
+ """True if the given status is one of {SUCCESS, ERROR, CANCELLED}."""
+ return status in {s.value for s in TERMINAL_JOB_STATUSES}
+
+
+def recoverInterruptedJobs() -> int:
+ """Flip any RUNNING jobs to ERROR (called at worker boot).
+
+ A RUNNING job in the DB after process restart means the previous worker
+ died mid-execution; the asyncio task is gone and the job will never
+ finish on its own.
+ """
+ db = _getDb()
+ try:
+ rows = db.getRecordset(BackgroundJob, recordFilter={"status": BackgroundJobStatusEnum.RUNNING.value})
+ except Exception as ex:
+ logger.warning("recoverInterruptedJobs: failed to scan RUNNING jobs: %s", ex)
+ return 0
+ count = 0
+ for row in rows:
+ try:
+ _markError(row["id"], "Interrupted by worker restart")
+ count += 1
+ except Exception as ex:
+ logger.warning("recoverInterruptedJobs: could not mark %s as ERROR: %s", row.get("id"), ex)
+ if count:
+ logger.warning("Recovered %d interrupted background job(s) after restart", count)
+ return count
diff --git a/modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py b/modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py
index 681070b0..8a2ff8d5 100644
--- a/modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py
+++ b/modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py
@@ -179,6 +179,10 @@ class SubscriptionService:
checkoutUrl = self._createCheckoutSession(mid, plan, created, currentOperative, returnUrl)
created["redirectUrl"] = checkoutUrl
except Exception as e:
+ logger.exception(
+ "Checkout creation failed for mandate %s, plan %s — force-expiring PENDING %s",
+ mid, planKey, created["id"],
+ )
self._interface.forceExpire(created["id"])
self.invalidateCache(mid)
raise ValueError(f"Subscription konnte nicht erstellt werden: {e}") from e
@@ -276,7 +280,11 @@ class SubscriptionService:
},
}
- if currentOperative and currentOperative.get("currentPeriodEnd"):
+ isTrialPredecessor = (
+ currentOperative is not None
+ and currentOperative.get("status") == SubscriptionStatusEnum.TRIALING.value
+ )
+ if currentOperative and currentOperative.get("currentPeriodEnd") and not isTrialPredecessor:
periodEnd = currentOperative["currentPeriodEnd"]
if isinstance(periodEnd, str):
periodEnd = datetime.fromisoformat(periodEnd)