# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ User-facing Automation Workspace API. Lists workflow runs the user can access (via FeatureAccess on targetFeatureInstanceId) and provides detail views with step logs and linked files. Designed for the "Workspace" tab under Nutzung > Automation. """ import logging import math from typing import Optional from fastapi import APIRouter, Depends, Request, Query, Path, HTTPException from slowapi import Limiter from slowapi.util import get_remote_address from modules.auth.authentication import getRequestContext, RequestContext from modules.connectors.connectorDbPostgre import DatabaseConnector from modules.shared.configuration import APP_CONFIG from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import ( AutoRun, AutoStepLog, AutoWorkflow, ) from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import graphicalEditorDatabase from modules.shared.i18nRegistry import apiRouteContext routeApiMsg = apiRouteContext("routeAutomationWorkspace") logger = logging.getLogger(__name__) limiter = Limiter(key_func=get_remote_address) router = APIRouter(prefix="/api/automations/runs", tags=["AutomationWorkspace"]) def _getDb() -> DatabaseConnector: return DatabaseConnector( dbHost=APP_CONFIG.get("DB_HOST", "localhost"), dbDatabase=graphicalEditorDatabase, dbUser=APP_CONFIG.get("DB_USER"), dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"), dbPort=int(APP_CONFIG.get("DB_PORT", 5432)), userId=None, ) def _getUserAccessibleInstanceIds(userId: str) -> list[str]: """Return all featureInstanceIds the user has enabled FeatureAccess for.""" from modules.interfaces.interfaceDbApp import getRootInterface rootIface = getRootInterface() allAccess = rootIface.getFeatureAccessesForUser(userId) or [] return [ a.featureInstanceId for a in allAccess if a.featureInstanceId and a.enabled ] @router.get("") @limiter.limit("60/minute") def listWorkspaceRuns( request: Request, scope: str = Query("mine", description="mine = own runs, mandate = all accessible"), status: Optional[str] = Query(None, description="Filter by run status"), targetInstanceId: Optional[str] = Query(None, description="Filter by targetFeatureInstanceId"), workflowId: Optional[str] = Query(None, description="Filter by workflow"), limit: int = Query(50, ge=1, le=200), offset: int = Query(0, ge=0), context: RequestContext = Depends(getRequestContext), ) -> dict: """List workflow runs visible to the user. scope=mine: only runs owned by the user. scope=mandate: all runs where the user has FeatureAccess on the workflow's targetFeatureInstanceId. """ db = _getDb() if not db._ensureTableExists(AutoRun): return {"runs": [], "total": 0, "limit": limit, "offset": offset} userId = str(context.user.id) if context.user else None if not userId: raise HTTPException(status_code=401, detail=routeApiMsg("Authentication required")) accessibleInstanceIds = _getUserAccessibleInstanceIds(userId) if not accessibleInstanceIds: return {"runs": [], "total": 0, "limit": limit, "offset": offset} if not db._ensureTableExists(AutoWorkflow): return {"runs": [], "total": 0, "limit": limit, "offset": offset} wfFilter: dict = {} if targetInstanceId: if targetInstanceId not in accessibleInstanceIds: raise HTTPException(status_code=403, detail=routeApiMsg("Access denied to target instance")) wfFilter["targetFeatureInstanceId"] = targetInstanceId workflows = db.getRecordset(AutoWorkflow, recordFilter=wfFilter or None) or [] visibleWfIds: set[str] = set() wfMap: dict = {} for wf in workflows: wfDict = dict(wf) tid = wfDict.get("targetFeatureInstanceId") or wfDict.get("featureInstanceId") if tid and tid in accessibleInstanceIds: wfId = wfDict.get("id") if wfId: visibleWfIds.add(wfId) wfMap[wfId] = wfDict if workflowId: if workflowId not in visibleWfIds: return {"runs": [], "total": 0, "limit": limit, "offset": offset} visibleWfIds = {workflowId} if not visibleWfIds: return {"runs": [], "total": 0, "limit": limit, "offset": offset} allRuns = db.getRecordset(AutoRun, recordFilter={}) or [] filtered = [] for r in allRuns: row = dict(r) if row.get("workflowId") not in visibleWfIds: continue if scope == "mine" and row.get("ownerId") != userId: continue if status and row.get("status") != status: continue filtered.append(row) filtered.sort( key=lambda x: x.get("startedAt") or x.get("sysCreatedAt") or 0, reverse=True, ) total = len(filtered) page = filtered[offset: offset + limit] from modules.routes.routeHelpers import enrichRowsWithFkLabels, resolveMandateLabels, resolveInstanceLabels for row in page: wf = wfMap.get(row.get("workflowId"), {}) row["workflowLabel"] = row.get("label") or wf.get("label") or row.get("workflowId", "") row["targetFeatureInstanceId"] = wf.get("targetFeatureInstanceId") or wf.get("featureInstanceId") enrichRowsWithFkLabels( page, labelResolvers={ "mandateId": resolveMandateLabels, "targetFeatureInstanceId": resolveInstanceLabels, }, ) for row in page: row["targetInstanceLabel"] = row.pop("targetFeatureInstanceIdLabel", None) row["mandateLabel"] = row.pop("mandateIdLabel", None) return {"runs": page, "total": total, "limit": limit, "offset": offset} @router.get("/{runId}/detail") @limiter.limit("60/minute") def getWorkspaceRunDetail( request: Request, runId: str = Path(..., description="Run ID"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Get full detail for a single run: metadata, step logs, linked files.""" db = _getDb() userId = str(context.user.id) if context.user else None if not userId: raise HTTPException(status_code=401, detail=routeApiMsg("Authentication required")) if not db._ensureTableExists(AutoRun): raise HTTPException(status_code=404, detail=routeApiMsg("Run not found")) runs = db.getRecordset(AutoRun, recordFilter={"id": runId}) if not runs: raise HTTPException(status_code=404, detail=routeApiMsg("Run not found")) run = dict(runs[0]) wfId = run.get("workflowId") workflow: dict = {} if wfId and db._ensureTableExists(AutoWorkflow): wfs = db.getRecordset(AutoWorkflow, recordFilter={"id": wfId}) if wfs: workflow = dict(wfs[0]) tid = workflow.get("targetFeatureInstanceId") or workflow.get("featureInstanceId") accessibleIds = _getUserAccessibleInstanceIds(userId) isOwner = run.get("ownerId") == userId if not isOwner and (not tid or tid not in accessibleIds) and not context.isPlatformAdmin: raise HTTPException(status_code=403, detail=routeApiMsg("Access denied")) steps: list = [] if db._ensureTableExists(AutoStepLog): stepRecords = db.getRecordset(AutoStepLog, recordFilter={"runId": runId}) or [] steps = [dict(s) for s in stepRecords] steps.sort(key=lambda s: s.get("startedAt") or 0) fileItems: list = [] try: from modules.datamodels.datamodelFiles import FileItem from modules.interfaces.interfaceDbManagement import ComponentObjects mgmtDb = ComponentObjects().db if mgmtDb._ensureTableExists(FileItem): nodeOutputs = run.get("nodeOutputs") or {} fileIds: set[str] = set() for nodeId, output in nodeOutputs.items(): if not isinstance(output, dict): continue for key in ("fileId", "documentId", "fileIds", "documents"): val = output.get(key) if isinstance(val, str) and val: fileIds.add(val) elif isinstance(val, list): for v in val: if isinstance(v, str) and v: fileIds.add(v) elif isinstance(v, dict) and v.get("id"): fileIds.add(v["id"]) for fid in fileIds: try: rec = mgmtDb.getRecord(FileItem, fid) if rec: fileItems.append(dict(rec)) except Exception: pass except Exception as e: logger.warning("getWorkspaceRunDetail: file lookup failed: %s", e) run["workflowLabel"] = run.get("label") or workflow.get("label") or wfId run["targetFeatureInstanceId"] = tid return { "run": run, "workflow": { "id": workflow.get("id"), "label": workflow.get("label"), "targetFeatureInstanceId": tid, "featureInstanceId": workflow.get("featureInstanceId"), "tags": workflow.get("tags", []), } if workflow else None, "steps": steps, "files": fileItems, }