diff --git a/app.py b/app.py index c91212e3..d8104fad 100644 --- a/app.py +++ b/app.py @@ -481,7 +481,15 @@ async def lifespan(app: FastAPI): except RuntimeError: pass eventManager.start() - + + # --- WorkflowAutomation: Scheduler boot (System-Lifespan, not Feature-onStart) --- + try: + from modules.workflows.scheduler.mainScheduler import start as _startWorkflowScheduler + _startWorkflowScheduler(eventUser) + logger.info("WorkflowAutomation scheduler started (system lifespan)") + except Exception as e: + logger.error(f"WorkflowAutomation scheduler failed to start: {e}") + # Register audit log cleanup scheduler from modules.dbHelpers.auditLogger import registerAuditLogCleanupScheduler registerAuditLogCleanupScheduler() @@ -562,6 +570,18 @@ async def lifespan(app: FastAPI): # 3. Stop scheduler (removes all pending cron/interval jobs) eventManager.stop() + # 3.5 Stop WorkflowAutomation scheduler + email poller (System-Lifespan) + try: + from modules.workflows.scheduler.mainScheduler import stop as _stopWorkflowScheduler + _stopWorkflowScheduler() + except Exception as e: + logger.warning(f"WorkflowAutomation scheduler stop failed: {e}") + try: + from modules.features.graphicalEditor.emailPoller import stop as _stopEmailPoller + _stopEmailPoller(eventUser) + except Exception as e: + logger.warning(f"Email poller stop failed: {e}") + # 4. Stop Feature Containers (Plug&Play) try: mainModules = loadFeatureMainModules() @@ -849,6 +869,9 @@ app.include_router(workflowDashboardRouter) from modules.routes.routeAutomationWorkspace import router as automationWorkspaceRouter app.include_router(automationWorkspaceRouter) +from modules.routes.routeWorkflowAutomation import router as workflowAutomationRouter +app.include_router(workflowAutomationRouter) + # ============================================================================ # PLUG&PLAY FEATURE ROUTERS # Dynamically load routers from feature containers in modules/features/ diff --git a/modules/datamodels/datamodelNavigation.py b/modules/datamodels/datamodelNavigation.py index 2fc278ef..eb9d3b69 100644 --- a/modules/datamodels/datamodelNavigation.py +++ b/modules/datamodels/datamodelNavigation.py @@ -158,6 +158,54 @@ NAVIGATION_SECTIONS = [ }, ], }, + # --- Workflow-Automation (System-Komponente, cross-mandate) --- + { + "id": "workflowAutomation", + "title": t("Workflow-Automation"), + "order": 25, + "items": [ + { + "id": "wa-workflows", + "objectKey": "ui.system.workflowAutomation.workflows", + "label": t("Workflows"), + "icon": "FaSitemap", + "path": "/workflow-automation?tab=workflows", + "order": 10, + }, + { + "id": "wa-editor", + "objectKey": "ui.system.workflowAutomation.editor", + "label": t("Editor"), + "icon": "FaProjectDiagram", + "path": "/workflow-automation?tab=editor", + "order": 20, + }, + { + "id": "wa-templates", + "objectKey": "ui.system.workflowAutomation.templates", + "label": t("Vorlagen"), + "icon": "FaCopy", + "path": "/workflow-automation?tab=templates", + "order": 30, + }, + { + "id": "wa-runs", + "objectKey": "ui.system.workflowAutomation.runs", + "label": t("Läufe"), + "icon": "FaPlay", + "path": "/workflow-automation?tab=runs", + "order": 40, + }, + { + "id": "wa-tasks", + "objectKey": "ui.system.workflowAutomation.tasks", + "label": t("Tasks"), + "icon": "FaTasks", + "path": "/workflow-automation?tab=tasks", + "order": 50, + }, + ], + }, # --- Administration (with subgroups) --- { "id": "admin", diff --git a/modules/datamodels/datamodelWorkflowAutomation.py b/modules/datamodels/datamodelWorkflowAutomation.py index 5f9cb7b2..c9957c25 100644 --- a/modules/datamodels/datamodelWorkflowAutomation.py +++ b/modules/datamodels/datamodelWorkflowAutomation.py @@ -77,14 +77,26 @@ class AutoWorkflow(PowerOnModel): "fk_target": {"db": "poweron_app", "table": "Mandate", "labelField": "label"}, }, ) - featureInstanceId: str = Field( - description="Feature instance ID (GE owner instance / RBAC scope)", + featureInstanceId: Optional[str] = Field( + default=None, + description="Feature instance ID (legacy GE owner — being phased out; NULL for mandate-level workflows)", json_schema_extra={ "frontend_type": "text", "frontend_readonly": True, "frontend_required": False, "label": "Feature-Instanz-ID", - "fk_target": {"db": "poweron_app", "table": "FeatureInstance", "labelField": "label"}, + "fk_target": {"db": "poweron_app", "table": "FeatureInstance", "labelField": "label", "softFk": True}, + }, + ) + runAsPrincipal: Optional[str] = Field( + default=None, + description="Identity (userId or service-account) under which this workflow executes. Governs RBAC for data access at runtime.", + json_schema_extra={ + "frontend_type": "text", + "frontend_readonly": False, + "frontend_required": False, + "label": "Ausführungsidentität", + "fk_target": {"db": "poweron_app", "table": "UserInDB", "labelField": "username", "softFk": True}, }, ) targetFeatureInstanceId: Optional[str] = Field( diff --git a/modules/datamodels/serviceExceptions.py b/modules/datamodels/serviceExceptions.py index 2aa94d95..7585c6a9 100644 --- a/modules/datamodels/serviceExceptions.py +++ b/modules/datamodels/serviceExceptions.py @@ -144,3 +144,28 @@ class BillingContextError(Exception): def __init__(self, message: str = None): self.message = message or "Billing context incomplete - AI call blocked" super().__init__(self.message) + + +# ============================================================================ +# Workflow execution pause exceptions +# (Canonical location — formerly in automation2/executors/inputExecutor.py) +# ============================================================================ + +class PauseForHumanTaskError(Exception): + """Raised when execution must pause for a human task. Contains runId, taskId.""" + + def __init__(self, runId: str, taskId: str, nodeId: str): + self.runId = runId + self.taskId = taskId + self.nodeId = nodeId + super().__init__(f"Pause for human task {taskId} (run {runId}, node {nodeId})") + + +class PauseForEmailWaitError(Exception): + """Raised when execution must pause waiting for a new email. Background poller will resume.""" + + def __init__(self, runId: str, nodeId: str, waitConfig: Dict[str, Any]): + self.runId = runId + self.nodeId = nodeId + self.waitConfig = waitConfig + super().__init__(f"Pause for email wait (run {runId}, node {nodeId})") diff --git a/modules/demoConfigs/investorDemo2026.py b/modules/demoConfigs/investorDemo2026.py index 6855a63c..0f7b0863 100644 --- a/modules/demoConfigs/investorDemo2026.py +++ b/modules/demoConfigs/investorDemo2026.py @@ -44,7 +44,7 @@ _USER = { _FEATURES_HAPPYLIFE = [ {"code": "workspace", "label": "Dokumentenablage"}, {"code": "trustee", "label": "Buchhaltung"}, - {"code": "graphicalEditor", "label": "Automationen"}, + {"code": "graphicalEditor", "label": "Automationen"}, # DEPRECATED: migrated to WorkflowAutomation system component {"code": "neutralization", "label": "Datenschutz"}, ] _FEATURES_ALPINA = [ @@ -52,7 +52,7 @@ _FEATURES_ALPINA = [ {"code": "trustee", "label": "BUHA Müller Immobilien GmbH"}, {"code": "trustee", "label": "BUHA Schneider Gastro AG"}, {"code": "trustee", "label": "BUHA Weber Consulting"}, - {"code": "graphicalEditor", "label": "Automationen"}, + {"code": "graphicalEditor", "label": "Automationen"}, # DEPRECATED: migrated to WorkflowAutomation system component {"code": "neutralization", "label": "Datenschutz"}, ] diff --git a/modules/demoConfigs/pwgDemo2026.py b/modules/demoConfigs/pwgDemo2026.py index 968aabf8..6e21e45a 100644 --- a/modules/demoConfigs/pwgDemo2026.py +++ b/modules/demoConfigs/pwgDemo2026.py @@ -49,7 +49,7 @@ _USER = { _FEATURES_PWG = [ {"code": "workspace", "label": "Dokumentenablage PWG"}, {"code": "trustee", "label": "Buchhaltung PWG"}, - {"code": "graphicalEditor", "label": "PWG Automationen"}, + {"code": "graphicalEditor", "label": "PWG Automationen"}, # DEPRECATED: migrated to WorkflowAutomation system component {"code": "neutralization", "label": "Datenschutz"}, ] diff --git a/modules/features/graphicalEditor/mainGraphicalEditor.py b/modules/features/graphicalEditor/mainGraphicalEditor.py index bf50abb2..f88ccfdc 100644 --- a/modules/features/graphicalEditor/mainGraphicalEditor.py +++ b/modules/features/graphicalEditor/mainGraphicalEditor.py @@ -26,25 +26,6 @@ REQUIRED_SERVICES = [ {"serviceKey": "generation", "meta": {"usage": "file.create document rendering"}}, ] FEATURE_LABEL = t("Grafischer Editor", context="UI") -FEATURE_ICON = "mdi-sitemap" - -UI_OBJECTS = [ - { - "objectKey": "ui.feature.graphicalEditor.editor", - "label": t("Editor", context="UI"), - "meta": {"area": "editor"} - }, - { - "objectKey": "ui.feature.graphicalEditor.templates", - "label": t("Vorlagen", context="UI"), - "meta": {"area": "templates"} - }, - { - "objectKey": "ui.feature.graphicalEditor.workflows-tasks", - "label": t("Tasks", context="UI"), - "meta": {"area": "tasks"} - }, -] RESOURCE_OBJECTS = [ { @@ -64,41 +45,6 @@ RESOURCE_OBJECTS = [ }, ] -TEMPLATE_ROLES = [ - { - "roleLabel": "graphicalEditor-viewer", - "description": "Grafischer Editor Betrachter - Workflows ansehen (nur lesen)", - "accessRules": [ - {"context": "UI", "item": "ui.feature.graphicalEditor.workflows", "view": True}, - {"context": "UI", "item": "ui.feature.graphicalEditor.workflows-tasks", "view": True}, - {"context": "UI", "item": "ui.feature.graphicalEditor.templates", "view": True}, - {"context": "DATA", "item": None, "view": True, "read": "m", "create": "n", "update": "n", "delete": "n"}, - ], - }, - { - "roleLabel": "graphicalEditor-user", - "description": "Grafischer Editor Benutzer - Flow-Builder nutzen", - "accessRules": [ - {"context": "UI", "item": "ui.feature.graphicalEditor.editor", "view": True}, - {"context": "UI", "item": "ui.feature.graphicalEditor.workflows", "view": True}, - {"context": "UI", "item": "ui.feature.graphicalEditor.workflows-tasks", "view": True}, - {"context": "UI", "item": "ui.feature.graphicalEditor.templates", "view": True}, - {"context": "RESOURCE", "item": "resource.feature.graphicalEditor.dashboard", "view": True}, - {"context": "RESOURCE", "item": "resource.feature.graphicalEditor.node-types", "view": True}, - {"context": "RESOURCE", "item": "resource.feature.graphicalEditor.execute", "view": True}, - {"context": "DATA", "item": None, "view": True, "read": "m", "create": "m", "update": "m", "delete": "m"}, - ], - }, - { - "roleLabel": "graphicalEditor-admin", - "description": "Grafischer Editor Admin - Volle UI und API für die Instanz; Daten weiterhin benutzerspezifisch (MY)", - "accessRules": [ - {"context": "UI", "item": None, "view": True}, - {"context": "RESOURCE", "item": None, "view": True}, - {"context": "DATA", "item": None, "view": True, "read": "m", "create": "m", "update": "m", "delete": "m"}, - ], - }, -] def getRequiredServiceKeys() -> List[str]: @@ -186,28 +132,6 @@ class _GraphicalEditorServiceHub: generation = None -async def onStart(eventUser) -> None: - """Feature startup: start consolidated scheduler.""" - from modules.workflows.scheduler.mainScheduler import start as startScheduler - startScheduler(eventUser) - - -async def onStop(eventUser) -> None: - """Feature shutdown - stop scheduler and email poller.""" - from modules.workflows.scheduler.mainScheduler import stop as stopScheduler - stopScheduler() - from modules.features.graphicalEditor.emailPoller import stop as stopEmailPoller - stopEmailPoller(eventUser) - - -def getFeatureDefinition() -> Dict[str, Any]: - """Return the feature definition for registration.""" - return { - "code": FEATURE_CODE, - "label": FEATURE_LABEL, - "icon": FEATURE_ICON, - "autoCreateInstance": False, - } # --------------------------------------------------------------------------- @@ -473,125 +397,6 @@ def _buildSystemTemplates(): ] -def getUiObjects() -> List[Dict[str, Any]]: - """Return UI objects for RBAC catalog registration.""" - return UI_OBJECTS - - def getResourceObjects() -> List[Dict[str, Any]]: """Return resource objects for RBAC catalog registration.""" return RESOURCE_OBJECTS - - -def getTemplateRoles() -> List[Dict[str, Any]]: - """Return template roles for this feature.""" - return TEMPLATE_ROLES - - -def registerFeature(catalogService) -> bool: - """Register this feature's RBAC objects in the catalog.""" - try: - for uiObj in UI_OBJECTS: - catalogService.registerUiObject( - featureCode=FEATURE_CODE, - objectKey=uiObj["objectKey"], - label=uiObj["label"], - meta=uiObj.get("meta") - ) - for resObj in RESOURCE_OBJECTS: - catalogService.registerResourceObject( - featureCode=FEATURE_CODE, - objectKey=resObj["objectKey"], - label=resObj["label"], - meta=resObj.get("meta") - ) - _syncTemplateRolesToDb() - logger.info(f"Feature '{FEATURE_CODE}' registered {len(UI_OBJECTS)} UI objects and {len(RESOURCE_OBJECTS)} resource objects") - return True - except Exception as e: - logger.error(f"Failed to register feature '{FEATURE_CODE}': {e}") - return False - - -def _syncTemplateRolesToDb() -> int: - """Sync template roles and their AccessRules to database. - Also syncs rules to mandate-specific roles (same roleLabel) so new UI objects - become visible after gateway restart without manual role update. - """ - try: - from modules.interfaces.interfaceDbApp import getRootInterface - from modules.datamodels.datamodelRbac import Role - from modules.datamodels.datamodelUtils import coerce_text_multilingual - - rootInterface = getRootInterface() - existingRoles = rootInterface.getRolesByFeatureCode(FEATURE_CODE) - existingLabels = {r.roleLabel: str(r.id) for r in existingRoles if r.mandateId is None} - created = 0 - - for template in TEMPLATE_ROLES: - roleLabel = template["roleLabel"] - if roleLabel in existingLabels: - roleId = existingLabels[roleLabel] - else: - newRole = Role( - roleLabel=roleLabel, - description=coerce_text_multilingual(template.get("description", {})), - featureCode=FEATURE_CODE, - mandateId=None, - featureInstanceId=None, - isSystemRole=False - ) - rec = rootInterface.db.recordCreate(Role, newRole.model_dump()) - roleId = rec.get("id") - created += 1 - logger.info(f"Created template role '{roleLabel}' for {FEATURE_CODE}") - - _ensureAccessRulesForRole(rootInterface, roleId, template.get("accessRules", [])) - - for r in existingRoles: - if r.mandateId and r.roleLabel == roleLabel: - added = _ensureAccessRulesForRole( - rootInterface, str(r.id), template.get("accessRules", []) - ) - if added: - logger.debug(f"Added {added} access rules to mandate role {r.id}") - return created - except Exception as e: - logger.warning(f"Template role sync for {FEATURE_CODE}: {e}") - return 0 - - -def _ensureAccessRulesForRole(rootInterface, roleId: str, ruleTemplates: List[Dict[str, Any]]) -> int: - """Ensure AccessRules exist for a role based on templates.""" - from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext - - existingRules = rootInterface.getAccessRulesByRole(roleId) - existingSignatures = { - (r.context.value if r.context else None, r.item) - for r in existingRules - } - created = 0 - for t in ruleTemplates: - context = t.get("context", "UI") - item = t.get("item") - sig = (context, item) - if sig in existingSignatures: - continue - ctx_enum = ( - AccessRuleContext.UI if context == "UI" else - AccessRuleContext.DATA if context == "DATA" else - AccessRuleContext.RESOURCE if context == "RESOURCE" else context - ) - newRule = AccessRule( - roleId=roleId, - context=ctx_enum, - item=item, - view=t.get("view", False), - read=t.get("read"), - create=t.get("create"), - update=t.get("update"), - delete=t.get("delete"), - ) - rootInterface.db.recordCreate(AccessRule, newRule.model_dump()) - created += 1 - return created diff --git a/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py b/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py index 20a2708b..38d9d769 100644 --- a/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py +++ b/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py @@ -1,7 +1,10 @@ # Copyright (c) 2025 Patrick Motsch # All rights reserved. """ -GraphicalEditor routes - node-types, execute, workflows, runs, tasks, connections, browse. +DEPRECATED: These per-instance routes are superseded by /api/workflow-automation/ +(routeWorkflowAutomation.py). Kept for backward compatibility during migration. + +Original: GraphicalEditor routes - node-types, execute, workflows, runs, tasks, connections, browse. """ import asyncio @@ -644,7 +647,8 @@ def get_templates( from modules.dbHelpers.fkLabelResolver import enrichRowsWithFkLabels from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import AutoWorkflow - enrichRowsWithFkLabels(templates, AutoWorkflow, db=iface.db) + from modules.interfaces.interfaceDbApp import getRootInterface as _getRootIface + enrichRowsWithFkLabels(templates, AutoWorkflow, db=_getRootIface().db) if mode == "filterValues": if not column: diff --git a/modules/interfaces/interfaceBootstrap.py b/modules/interfaces/interfaceBootstrap.py index 5ac5d089..9a6e2e26 100644 --- a/modules/interfaces/interfaceBootstrap.py +++ b/modules/interfaces/interfaceBootstrap.py @@ -1610,7 +1610,7 @@ def _createStoreResourceRules(db: DatabaseConnector) -> None: "resource.store.workspace", "resource.store.commcoach", "resource.store.trustee", - "resource.store.graphicalEditor", + "resource.store.graphicalEditor", # DEPRECATED: will move with WorkflowAutomation code restructuring ] storeRules = [] diff --git a/modules/routes/routeAutomationWorkspace.py b/modules/routes/routeAutomationWorkspace.py index 09c5238c..a93fff70 100644 --- a/modules/routes/routeAutomationWorkspace.py +++ b/modules/routes/routeAutomationWorkspace.py @@ -21,12 +21,12 @@ from slowapi.util import get_remote_address from modules.auth.authentication import getRequestContext, RequestContext from modules.connectors.connectorDbPostgre import DatabaseConnector from modules.shared.configuration import APP_CONFIG -from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import ( +from modules.datamodels.datamodelWorkflowAutomation import ( AutoRun, AutoStepLog, AutoWorkflow, + GRAPHICAL_EDITOR_DATABASE, ) -from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import graphicalEditorDatabase from modules.workflows.automation2.workflowArtifactVisibility import suppress_workflow_file_in_workspace_ui from modules.shared.i18nRegistry import apiRouteContext @@ -40,7 +40,7 @@ router = APIRouter(prefix="/api/automations/runs", tags=["AutomationWorkspace"]) def _getDb() -> DatabaseConnector: return DatabaseConnector( dbHost=APP_CONFIG.get("DB_HOST", "localhost"), - dbDatabase=graphicalEditorDatabase, + dbDatabase=GRAPHICAL_EDITOR_DATABASE, dbUser=APP_CONFIG.get("DB_USER"), dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"), dbPort=int(APP_CONFIG.get("DB_PORT", 5432)), diff --git a/modules/routes/routeSystem.py b/modules/routes/routeSystem.py index 56568cd9..217dfa14 100644 --- a/modules/routes/routeSystem.py +++ b/modules/routes/routeSystem.py @@ -105,9 +105,6 @@ def _getFeatureUiObjects(featureCode: str) -> List[Dict[str, Any]]: elif featureCode == "realestate": from modules.features.realEstate.mainRealEstate import UI_OBJECTS return UI_OBJECTS - elif featureCode == "graphicalEditor": - from modules.features.graphicalEditor.mainGraphicalEditor import UI_OBJECTS - return UI_OBJECTS elif featureCode == "teamsbot": from modules.features.teamsbot.mainTeamsbot import UI_OBJECTS return UI_OBJECTS @@ -841,7 +838,7 @@ def _buildIntegrationsOverviewPayload(userId: str, user=None) -> Dict[str, Any]: from modules.shared.configuration import APP_CONFIG from modules.connectors.connectorDbPostgre import DatabaseConnector from modules.datamodels.datamodelPagination import PaginationParams - from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import ( + from modules.datamodels.datamodelWorkflowAutomation import ( AutoWorkflow, AutoRun, ) diff --git a/modules/routes/routeWorkflowAutomation.py b/modules/routes/routeWorkflowAutomation.py new file mode 100644 index 00000000..6ce6fb21 --- /dev/null +++ b/modules/routes/routeWorkflowAutomation.py @@ -0,0 +1,453 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Mandatsweite WorkflowAutomation API. + +System-level API for workflows, runs, tasks — scoped by mandate membership, +not by graphicalEditor FeatureInstance. Parallel to the legacy per-instance +API in routeFeatureGraphicalEditor.py during the migration period. + +RBAC model: + - Read: mandate membership (user sees workflows in own mandates) + - Write/Execute: mandate admin or isPlatformAdmin + - isPlatformAdmin bypasses all checks +""" + +import json +import logging +import time +from typing import Optional, List, Dict, Any + +from fastapi import APIRouter, Depends, HTTPException, Query +from slowapi import Limiter +from slowapi.util import get_remote_address + +from modules.auth.authentication import getRequestContext, RequestContext +from modules.connectors.connectorDbPostgre import DatabaseConnector +from modules.datamodels.datamodelWorkflowAutomation import ( + AutoWorkflow, AutoVersion, AutoRun, AutoStepLog, AutoTask, + GRAPHICAL_EDITOR_DATABASE, +) +from modules.datamodels.datamodelPagination import PaginationParams, normalize_pagination_dict +from modules.interfaces.interfaceDbApp import getRootInterface +from modules.shared.configuration import APP_CONFIG +from modules.shared.i18nRegistry import apiRouteContext + +routeApiMsg = apiRouteContext("routeWorkflowAutomation") + +logger = logging.getLogger(__name__) +limiter = Limiter(key_func=get_remote_address) + +router = APIRouter(prefix="/api/workflow-automation", tags=["WorkflowAutomation"]) + + +# --------------------------------------------------------------------------- +# DB + RBAC helpers +# --------------------------------------------------------------------------- + +def _getDb() -> DatabaseConnector: + return DatabaseConnector( + dbHost=APP_CONFIG.get("DB_HOST", "localhost"), + dbDatabase=GRAPHICAL_EDITOR_DATABASE, + dbUser=APP_CONFIG.get("DB_USER"), + dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"), + dbPort=int(APP_CONFIG.get("DB_PORT", 5432)), + userId=None, + ) + + +def _getUserMandateIds(userId: str) -> List[str]: + rootIface = getRootInterface() + memberships = rootIface.getUserMandates(userId) + return [um.mandateId for um in memberships if um.mandateId and um.enabled] + + +def _getAdminMandateIds(userId: str, mandateIds: List[str]) -> List[str]: + if not mandateIds: + return [] + rootIface = getRootInterface() + from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole + + memberships = rootIface.db.getRecordset( + UserMandate, + recordFilter={"userId": userId, "mandateId": mandateIds, "enabled": True}, + ) + if not memberships: + return [] + + umIdToMandateId: Dict[str, str] = {} + for m in memberships: + row = m if isinstance(m, dict) else m.__dict__ + um_id = row.get("id") + mid = row.get("mandateId") + if um_id and mid: + umIdToMandateId[str(um_id)] = str(mid) + + userMandateIds = list(umIdToMandateId.keys()) + allRoles = rootIface.db.getRecordset( + UserMandateRole, + recordFilter={"userMandateId": userMandateIds}, + ) + if not allRoles: + return [] + + roleIds: set = set() + roleToMandate: Dict[str, set] = {} + for r in allRoles: + row = r if isinstance(r, dict) else r.__dict__ + rid = row.get("roleId") + um_id = row.get("userMandateId") + mid = umIdToMandateId.get(str(um_id)) if um_id else None + if rid and mid: + roleIds.add(rid) + roleToMandate.setdefault(rid, set()).add(mid) + + if not roleIds: + return [] + + from modules.datamodels.datamodelRbac import Role + roleRecords = rootIface.db.getRecordset(Role, recordFilter={"id": list(roleIds)}) + adminMandates: set = set() + for role in (roleRecords or []): + row = role if isinstance(role, dict) else role.__dict__ + rid = row.get("id") + if not rid or rid not in roleToMandate: + continue + if row.get("roleLabel") == "admin" and not row.get("featureInstanceId"): + adminMandates.update(roleToMandate[rid]) + + return [mid for mid in mandateIds if mid in adminMandates] + + +def _validateWorkflowAccess( + context: RequestContext, + workflow: Optional[Dict[str, Any]], + action: str = "read", +) -> None: + """Validate access to a workflow based on mandate membership + admin status. + + Actions: 'read' (mandate member), 'write'/'execute'/'delete' (mandate admin or platform admin). + Raises HTTPException(403) on denial. + """ + if context.isPlatformAdmin: + return + + userId = str(context.user.id) if context.user else None + if not userId: + raise HTTPException(status_code=403, detail="Authentication required") + + if workflow is None: + raise HTTPException(status_code=404, detail="Workflow not found") + + wfMandateId = workflow.get("mandateId") or "" + if not wfMandateId: + if action == "read": + return + raise HTTPException(status_code=403, detail="Workflow has no mandate — admin only") + + userMandateIds = _getUserMandateIds(userId) + if wfMandateId not in userMandateIds: + raise HTTPException(status_code=403, detail="Not a member of the workflow's mandate") + + if action == "read": + return + + adminMandateIds = _getAdminMandateIds(userId, [wfMandateId]) + if wfMandateId not in adminMandateIds: + raise HTTPException( + status_code=403, + detail=f"Mandate admin required for '{action}' on workflows", + ) + + +def _scopedWorkflowFilter(context: RequestContext) -> Optional[Dict[str, Any]]: + """Build DB filter for listing workflows: mandate-scoped for members, None for sysadmin.""" + if context.isPlatformAdmin: + return None + + userId = str(context.user.id) if context.user else None + if not userId: + return {"mandateId": "__impossible__"} + + mandateIds = _getUserMandateIds(userId) + if mandateIds: + return {"mandateId": mandateIds} + return {"mandateId": "__impossible__"} + + +def _scopedRunFilter(context: RequestContext) -> Optional[Dict[str, Any]]: + """Build DB filter for listing runs: admin sees mandate runs, user sees own.""" + if context.isPlatformAdmin: + return None + + userId = str(context.user.id) if context.user else None + if not userId: + return {"ownerId": "__impossible__"} + + mandateIds = _getUserMandateIds(userId) + adminMandateIds = _getAdminMandateIds(userId, mandateIds) + + if adminMandateIds: + return {"mandateId": adminMandateIds} + return {"ownerId": userId} + + +def _parsePagination(pagination: Optional[str]) -> Optional[PaginationParams]: + if not pagination: + return None + try: + d = json.loads(pagination) + except json.JSONDecodeError: + raise HTTPException(status_code=400, detail="Invalid pagination JSON") + if not d: + return None + return normalize_pagination_dict(d) + + +# --------------------------------------------------------------------------- +# Workflow CRUD +# --------------------------------------------------------------------------- + +@router.get("/workflows") +async def _listWorkflows( + request: RequestContext = Depends(getRequestContext), + pagination: Optional[str] = Query(default=None), + mandateId: Optional[str] = Query(default=None), +): + db = _getDb() + try: + db._ensureTableExists(AutoWorkflow) + scopeFilter = _scopedWorkflowFilter(request) + if mandateId and scopeFilter is not None: + if mandateId not in (scopeFilter.get("mandateId") or []): + return {"items": [], "total": 0} + scopeFilter = {"mandateId": mandateId} + elif mandateId and scopeFilter is None: + scopeFilter = {"mandateId": mandateId} + + params = _parsePagination(pagination) + records = db.getRecordset(AutoWorkflow, recordFilter=scopeFilter, pagination=params) + total = db.getRecordCount(AutoWorkflow, recordFilter=scopeFilter) if params else len(records or []) + return {"items": records or [], "total": total} + finally: + db.close() + + +@router.get("/workflows/{workflowId}") +async def _getWorkflow( + workflowId: str, + request: RequestContext = Depends(getRequestContext), +): + db = _getDb() + try: + db._ensureTableExists(AutoWorkflow) + wf = db.getRecord(AutoWorkflow, workflowId) + if not wf: + raise HTTPException(status_code=404, detail="Workflow not found") + _validateWorkflowAccess(request, wf, "read") + return wf + finally: + db.close() + + +@router.post("/workflows") +async def _createWorkflow( + request: RequestContext = Depends(getRequestContext), + body: Dict[str, Any] = {}, +): + mandateId = body.get("mandateId") + if not mandateId: + raise HTTPException(status_code=400, detail="mandateId required") + + _validateWorkflowAccess(request, {"mandateId": mandateId}, "write") + + db = _getDb() + try: + db._ensureTableExists(AutoWorkflow) + import uuid + data = {**body, "id": str(uuid.uuid4())} + if request.user: + data.setdefault("runAsPrincipal", str(request.user.id)) + rec = db.recordCreate(AutoWorkflow, data) + return rec + finally: + db.close() + + +@router.put("/workflows/{workflowId}") +async def _updateWorkflow( + workflowId: str, + request: RequestContext = Depends(getRequestContext), + body: Dict[str, Any] = {}, +): + db = _getDb() + try: + db._ensureTableExists(AutoWorkflow) + wf = db.getRecord(AutoWorkflow, workflowId) + _validateWorkflowAccess(request, wf, "write") + updated = db.recordModify(AutoWorkflow, workflowId, body) + return updated + finally: + db.close() + + +@router.delete("/workflows/{workflowId}") +async def _deleteWorkflow( + workflowId: str, + request: RequestContext = Depends(getRequestContext), +): + db = _getDb() + try: + db._ensureTableExists(AutoWorkflow) + wf = db.getRecord(AutoWorkflow, workflowId) + _validateWorkflowAccess(request, wf, "delete") + + for v in db.getRecordset(AutoVersion, recordFilter={"workflowId": workflowId}) or []: + db.recordDelete(AutoVersion, v.get("id")) + for run in db.getRecordset(AutoRun, recordFilter={"workflowId": workflowId}) or []: + runId = run.get("id") + for sl in db.getRecordset(AutoStepLog, recordFilter={"runId": runId}) or []: + db.recordDelete(AutoStepLog, sl.get("id")) + db.recordDelete(AutoRun, runId) + for task in db.getRecordset(AutoTask, recordFilter={"workflowId": workflowId}) or []: + db.recordDelete(AutoTask, task.get("id")) + db.recordDelete(AutoWorkflow, workflowId) + return {"deleted": True, "workflowId": workflowId} + finally: + db.close() + + +# --------------------------------------------------------------------------- +# Runs +# --------------------------------------------------------------------------- + +@router.get("/runs") +async def _listRuns( + request: RequestContext = Depends(getRequestContext), + pagination: Optional[str] = Query(default=None), + mandateId: Optional[str] = Query(default=None), + workflowId: Optional[str] = Query(default=None), +): + db = _getDb() + try: + db._ensureTableExists(AutoRun) + scopeFilter = _scopedRunFilter(request) + if mandateId: + if scopeFilter is None: + scopeFilter = {"mandateId": mandateId} + elif "mandateId" in scopeFilter: + if mandateId not in scopeFilter["mandateId"]: + return {"items": [], "total": 0} + scopeFilter = {"mandateId": mandateId} + if workflowId: + scopeFilter = {**(scopeFilter or {}), "workflowId": workflowId} + + params = _parsePagination(pagination) + records = db.getRecordset(AutoRun, recordFilter=scopeFilter, pagination=params) + total = db.getRecordCount(AutoRun, recordFilter=scopeFilter) if params else len(records or []) + return {"items": records or [], "total": total} + finally: + db.close() + + +@router.get("/runs/{runId}") +async def _getRun( + runId: str, + request: RequestContext = Depends(getRequestContext), +): + db = _getDb() + try: + db._ensureTableExists(AutoRun) + run = db.getRecord(AutoRun, runId) + if not run: + raise HTTPException(status_code=404, detail="Run not found") + + wfId = run.get("workflowId") + if wfId: + wf = db.getRecord(AutoWorkflow, wfId) + _validateWorkflowAccess(request, wf, "read") + return run + finally: + db.close() + + +# --------------------------------------------------------------------------- +# Tasks +# --------------------------------------------------------------------------- + +@router.get("/tasks") +async def _listTasks( + request: RequestContext = Depends(getRequestContext), + pagination: Optional[str] = Query(default=None), + status: Optional[str] = Query(default=None), +): + db = _getDb() + try: + db._ensureTableExists(AutoTask) + scopeFilter: Optional[Dict[str, Any]] = None + + if not request.isPlatformAdmin: + userId = str(request.user.id) if request.user else None + if not userId: + return {"items": [], "total": 0} + scopeFilter = {"assigneeId": userId} + + if status: + scopeFilter = {**(scopeFilter or {}), "status": status} + + params = _parsePagination(pagination) + records = db.getRecordset(AutoTask, recordFilter=scopeFilter, pagination=params) + total = db.getRecordCount(AutoTask, recordFilter=scopeFilter) if params else len(records or []) + return {"items": records or [], "total": total} + finally: + db.close() + + +# --------------------------------------------------------------------------- +# Versions +# --------------------------------------------------------------------------- + +@router.get("/workflows/{workflowId}/versions") +async def _listVersions( + workflowId: str, + request: RequestContext = Depends(getRequestContext), +): + db = _getDb() + try: + db._ensureTableExists(AutoWorkflow) + wf = db.getRecord(AutoWorkflow, workflowId) + _validateWorkflowAccess(request, wf, "read") + + db._ensureTableExists(AutoVersion) + versions = db.getRecordset(AutoVersion, recordFilter={"workflowId": workflowId}) + return {"items": versions or []} + finally: + db.close() + + +# --------------------------------------------------------------------------- +# Step logs +# --------------------------------------------------------------------------- + +@router.get("/runs/{runId}/steps") +async def _listStepLogs( + runId: str, + request: RequestContext = Depends(getRequestContext), +): + db = _getDb() + try: + db._ensureTableExists(AutoRun) + run = db.getRecord(AutoRun, runId) + if not run: + raise HTTPException(status_code=404, detail="Run not found") + + wfId = run.get("workflowId") + if wfId: + wf = db.getRecord(AutoWorkflow, wfId) + _validateWorkflowAccess(request, wf, "read") + + db._ensureTableExists(AutoStepLog) + steps = db.getRecordset(AutoStepLog, recordFilter={"runId": runId}) + return {"items": steps or []} + finally: + db.close() diff --git a/modules/routes/routeWorkflowDashboard.py b/modules/routes/routeWorkflowDashboard.py index f29fc557..020e5ec7 100644 --- a/modules/routes/routeWorkflowDashboard.py +++ b/modules/routes/routeWorkflowDashboard.py @@ -27,10 +27,10 @@ from modules.interfaces.interfaceDbApp import getRootInterface from modules.connectors.connectorDbPostgre import DatabaseConnector from modules.shared.configuration import APP_CONFIG from modules.datamodels.datamodelPagination import PaginationParams, normalize_pagination_dict -from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import ( +from modules.datamodels.datamodelWorkflowAutomation import ( AutoRun, AutoStepLog, AutoWorkflow, AutoTask, AutoVersion, + GRAPHICAL_EDITOR_DATABASE, ) -from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import graphicalEditorDatabase from modules.shared.i18nRegistry import apiRouteContext routeApiMsg = apiRouteContext("routeWorkflowDashboard") @@ -44,7 +44,7 @@ router = APIRouter(prefix="/api/system/workflow-runs", tags=["WorkflowDashboard" def _getDb() -> DatabaseConnector: return DatabaseConnector( dbHost=APP_CONFIG.get("DB_HOST", "localhost"), - dbDatabase=graphicalEditorDatabase, + dbDatabase=GRAPHICAL_EDITOR_DATABASE, dbUser=APP_CONFIG.get("DB_USER"), dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"), dbPort=int(APP_CONFIG.get("DB_PORT", 5432)), @@ -619,7 +619,8 @@ def get_workflow_runs( for wf in (wfs or []): wfMap[wf.get("id")] = wf - from modules.dbHelpers.fkLabelResolver import enrichRowsWithFkLabels, resolveMandateLabels, resolveInstanceLabels + from modules.dbHelpers.fkLabelResolver import enrichRowsWithFkLabels, resolveMandateLabels, resolveInstanceLabels, resolveUserLabels + from modules.interfaces.interfaceDbApp import getRootInterface as _getRootIface runs = [] for r in pageRuns: @@ -635,17 +636,20 @@ def get_workflow_runs( row["featureInstanceId"] = fiid runs.append(row) + appDb = _getRootIface().db enrichRowsWithFkLabels( runs, db=db, labelResolvers={ - "mandateId": partial(resolveMandateLabels, db), - "featureInstanceId": partial(resolveInstanceLabels, db), + "mandateId": partial(resolveMandateLabels, appDb), + "featureInstanceId": partial(resolveInstanceLabels, appDb), + "ownerId": partial(resolveUserLabels, appDb), }, ) for row in runs: row["instanceLabel"] = row.pop("featureInstanceIdLabel", None) row["mandateLabel"] = row.pop("mandateIdLabel", None) + row["ownerLabel"] = row.pop("ownerIdLabel", None) return {"runs": runs, "total": total, "limit": limit, "offset": offset} @@ -808,6 +812,9 @@ def get_system_workflows( userMandateIds = _getUserMandateIds(userId) adminMandateIds = _getAdminMandateIds(userId, userMandateIds) + from modules.dbHelpers.fkLabelResolver import resolveUserLabels as _resolveUserLabels + from modules.interfaces.interfaceDbApp import getRootInterface as _getRootIface + fkSortField = _firstFkSortFieldForWorkflows(paginationParams) if fkSortField: from modules.dbHelpers.paginationHelpers import getRecordsetPaginatedWithFkSort, applyFiltersAndSort @@ -869,17 +876,20 @@ def get_system_workflows( row["canExecute"] = False row.pop("graph", None) items.append(row) + _appDb = _getRootIface().db enrichRowsWithFkLabels( items, db=db, labelResolvers={ - "mandateId": partial(resolveMandateLabels, db), + "mandateId": partial(resolveMandateLabels, _appDb), "featureInstanceId": _resolveInstanceLabelsWithFeatureCode, + "ownerId": partial(_resolveUserLabels, _appDb), }, ) for row in items: row["instanceLabel"] = row.pop("featureInstanceIdLabel", None) row["mandateLabel"] = row.pop("mandateIdLabel", None) + row["ownerLabel"] = row.pop("ownerIdLabel", None) row["featureCode"] = featureCodeMap.get(row.get("featureInstanceId")) if hasComputedFilter or hasComputedSort: computedFilters = { @@ -932,17 +942,20 @@ def get_system_workflows( row["canExecute"] = False row.pop("graph", None) items.append(row) + _appDb2 = _getRootIface().db enrichRowsWithFkLabels( items, db=db, labelResolvers={ - "mandateId": partial(resolveMandateLabels, db), + "mandateId": partial(resolveMandateLabels, _appDb2), "featureInstanceId": _resolveInstanceLabelsWithFeatureCode, + "ownerId": partial(_resolveUserLabels, _appDb2), }, ) for row in items: row["instanceLabel"] = row.pop("featureInstanceIdLabel", None) row["mandateLabel"] = row.pop("mandateIdLabel", None) + row["ownerLabel"] = row.pop("ownerIdLabel", None) row["featureCode"] = featureCodeMap.get(row.get("featureInstanceId")) return { diff --git a/modules/serviceCenter/services/serviceAgent/workflowTools.py b/modules/serviceCenter/services/serviceAgent/workflowTools.py index 82eda22d..32defa2b 100644 --- a/modules/serviceCenter/services/serviceAgent/workflowTools.py +++ b/modules/serviceCenter/services/serviceAgent/workflowTools.py @@ -89,6 +89,7 @@ def _resolveMandateId(context: Any) -> str: def _getInterface(context: Any, instanceId: str): + # DEPRECATED: will move with WorkflowAutomation code restructuring from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface return getGraphicalEditorInterface(_resolveUser(context), _resolveMandateId(context), instanceId) @@ -306,6 +307,7 @@ async def _list_upstream_paths(params: Dict[str, Any], context: Any) -> ToolResu return _err(name, f"Workflow {workflow_id} not found") graph = wf.get("graph", {}) or {} + # DEPRECATED: will move with WorkflowAutomation code restructuring from modules.features.graphicalEditor.upstreamPathsService import compute_upstream_paths paths = compute_upstream_paths(graph if isinstance(graph, dict) else {}, str(node_id)) @@ -436,6 +438,7 @@ async def _listAvailableNodeTypes(params: Dict[str, Any], context: Any) -> ToolR """ name = "listAvailableNodeTypes" try: + # DEPRECATED: will move with WorkflowAutomation code restructuring from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES nodeTypes = [] for n in STATIC_NODE_TYPES: @@ -462,6 +465,7 @@ async def _describeNodeType(params: Dict[str, Any], context: Any) -> ToolResult: nodeType = params.get("nodeType") or params.get("id") if not nodeType: return _err(name, "nodeType required") + # DEPRECATED: will move with WorkflowAutomation code restructuring from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES target: Dict[str, Any] = {} for n in STATIC_NODE_TYPES: @@ -875,6 +879,7 @@ async def _exportWorkflowToFile(params: Dict[str, Any], context: Any) -> ToolRes envelope = iface.exportWorkflowToDict(workflowId) if envelope is None: return _err(name, f"Workflow {workflowId} not found") + # DEPRECATED: will move with WorkflowAutomation code restructuring from modules.features.graphicalEditor._workflowFileSchema import buildFileName return _ok(name, { "fileName": buildFileName(envelope.get("label", "workflow")), diff --git a/modules/shared/documentUtils.py b/modules/shared/documentUtils.py index cc08835c..37eec8a5 100644 --- a/modules/shared/documentUtils.py +++ b/modules/shared/documentUtils.py @@ -5,7 +5,10 @@ Document utility functions (Layer L0 - shared). Pure text-processing helpers with zero internal dependencies. """ +import base64 +import binascii import re +from typing import Any, Optional def parseInlineRuns(text: str) -> list: @@ -62,3 +65,49 @@ def parseInlineRuns(text: str) -> list: runs.append({"type": "text", "value": text[lastEnd:]}) return runs if runs else [{"type": "text", "value": text}] + + +def _looksLikeAsciiBase64Payload(s: str) -> bool: + """Heuristic: ActionDocument binary payloads use standard ASCII base64; markdown/text uses other chars.""" + t = "".join(s.split()) + if len(t) < 8: + return False + if not t.isascii(): + return False + return bool(re.fullmatch(r"[A-Za-z0-9+/]+=*", t)) and len(t) % 4 == 0 + + +def coerceDocumentDataToBytes(raw: Any) -> Optional[bytes]: + """Normalize documentData for DB file persistence. + + ActionDocument conventions (see methodFile.create): binary bodies are carried as ASCII + base64 strings; plain markdown/text stays as Unicode. Do not UTF-8-encode a base64 + literal — that persists the ASCII of the encoding (file looks like base64 gibberish). + """ + if raw is None: + return None + if isinstance(raw, bytes): + return raw if len(raw) > 0 else None + if isinstance(raw, bytearray): + b = bytes(raw) + return b if len(b) > 0 else None + if isinstance(raw, memoryview): + b = raw.tobytes() + return b if len(b) > 0 else None + if isinstance(raw, str): + stripped = raw.strip() + if not stripped: + return None + if _looksLikeAsciiBase64Payload(stripped): + try: + decoded = base64.b64decode(stripped, validate=True) + except (TypeError, binascii.Error, ValueError): + try: + decoded = base64.b64decode(stripped) + except (binascii.Error, ValueError): + decoded = b"" + if decoded: + return decoded + b = stripped.encode("utf-8") + return b if len(b) > 0 else None + return None diff --git a/modules/system/i18nBootSync.py b/modules/system/i18nBootSync.py index 96f1b69d..15501a0f 100644 --- a/modules/system/i18nBootSync.py +++ b/modules/system/i18nBootSync.py @@ -242,6 +242,7 @@ def _registerNodeLabels(): added += 1 try: + # DEPRECATED: will move with WorkflowAutomation code restructuring from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES for nd in STATIC_NODE_TYPES: _reg(_extractRegistrySourceText(nd.get("label")), "node.label") @@ -265,6 +266,7 @@ def _registerNodeLabels(): pass try: + # DEPRECATED: will move with WorkflowAutomation code restructuring from modules.features.graphicalEditor.portTypes import PORT_TYPE_CATALOG for schema in PORT_TYPE_CATALOG.values(): for field in getattr(schema, "fields", []) or []: diff --git a/modules/workflows/automation2/executionEngine.py b/modules/workflows/automation2/executionEngine.py index d5fdbd0e..b6313342 100644 --- a/modules/workflows/automation2/executionEngine.py +++ b/modules/workflows/automation2/executionEngine.py @@ -429,6 +429,52 @@ async def _executeWithRetry(executor, node, context, maxRetries: int = 0, retryD raise lastError +def _validateFeatureInstanceMandates(graph: Dict[str, Any], mandateId: str) -> None: + """Verify that all FeatureInstanceRef IDs in the graph belong to the workflow's mandate. + + Logs a warning for each mismatch but does NOT abort execution — the node + executor will fail on its own with a more specific error if the instance is + truly inaccessible. This is a defence-in-depth guard (A0.2). + """ + nodes = graph.get("nodes") if isinstance(graph, dict) else None + if not isinstance(nodes, list): + return + instanceIds: set = set() + for node in nodes: + if not isinstance(node, dict): + continue + params = node.get("parameters") or {} + ref = params.get("featureInstanceId") + if isinstance(ref, dict) and ref.get("$type") == "FeatureInstanceRef": + iid = ref.get("id") + if iid: + instanceIds.add(iid) + elif isinstance(ref, str) and ref.strip(): + instanceIds.add(ref.strip()) + if not instanceIds: + return + try: + from modules.interfaces.interfaceDbApp import getRootInterface + root = getRootInterface() + from modules.datamodels.datamodelFeatures import FeatureInstance + for iid in instanceIds: + fi = root.db.getRecord(FeatureInstance, iid) + if not fi: + logger.warning( + "MandateValidation: FeatureInstance %s referenced in graph not found", iid, + ) + continue + fiMandateId = fi.get("mandateId") if isinstance(fi, dict) else getattr(fi, "mandateId", None) + if fiMandateId and fiMandateId != mandateId: + logger.warning( + "MandateValidation: FeatureInstance %s belongs to mandate %s, " + "but workflow mandate is %s — cross-mandate access", + iid, fiMandateId, mandateId, + ) + except Exception as e: + logger.debug("MandateValidation: could not verify instances: %s", e) + + def _substituteFeatureInstancePlaceholders( graph: Dict[str, Any], targetFeatureInstanceId: str, @@ -675,6 +721,10 @@ async def executeGraph( # Phase-5 Schicht-4: typed-ref envelopes are materialized FIRST so the # subsequent connection-ref pass and validation see the canonical shape. graph = materializeFeatureInstanceRefs(graph) + + if mandateId: + _validateFeatureInstanceMandates(graph, mandateId) + graph = materializeConnectionRefs(graph) graph = materializePrimaryTextHandover(graph) graph = materializeRecommendedDataPickRef(graph) diff --git a/modules/workflows/automation2/executors/actionNodeExecutor.py b/modules/workflows/automation2/executors/actionNodeExecutor.py index 9af626d4..ee1101e5 100644 --- a/modules/workflows/automation2/executors/actionNodeExecutor.py +++ b/modules/workflows/automation2/executors/actionNodeExecutor.py @@ -7,8 +7,6 @@ # ``documentListWire`` is applied at runtime in this executor via graphUtils.extract_wired_document_list. -import base64 -import binascii import json import logging import re @@ -122,50 +120,7 @@ def _log_file_create_context_resolution( ) -def _looks_like_ascii_base64_payload(s: str) -> bool: - """Heuristic: ActionDocument binary payloads use standard ASCII base64; markdown/text uses other chars (#, *, -, …).""" - t = "".join(s.split()) - if len(t) < 8: - return False - if not t.isascii(): - return False - return bool(re.fullmatch(r"[A-Za-z0-9+/]+=*", t)) and len(t) % 4 == 0 - - -def coerceDocumentDataToBytes(raw: Any) -> Optional[bytes]: - """Normalize documentData for DB file persistence. - - ActionDocument conventions (see methodFile.create): binary bodies are carried as ASCII - base64 strings; plain markdown/text stays as Unicode. Do not UTF-8-encode a base64 - literal — that persists the ASCII of the encoding (file looks like base64 gibberish). - """ - if raw is None: - return None - if isinstance(raw, bytes): - return raw if len(raw) > 0 else None - if isinstance(raw, bytearray): - b = bytes(raw) - return b if len(b) > 0 else None - if isinstance(raw, memoryview): - b = raw.tobytes() - return b if len(b) > 0 else None - if isinstance(raw, str): - stripped = raw.strip() - if not stripped: - return None - if _looks_like_ascii_base64_payload(stripped): - try: - decoded = base64.b64decode(stripped, validate=True) - except (TypeError, binascii.Error, ValueError): - try: - decoded = base64.b64decode(stripped) - except (binascii.Error, ValueError): - decoded = b"" - if decoded: - return decoded - b = stripped.encode("utf-8") - return b if len(b) > 0 else None - return None +from modules.shared.documentUtils import coerceDocumentDataToBytes # noqa: F401 — re-export shim def _image_documents_from_docs_list(docs_list: list) -> list: diff --git a/modules/workflows/automation2/executors/inputExecutor.py b/modules/workflows/automation2/executors/inputExecutor.py index 4ccef725..aaf31ff1 100644 --- a/modules/workflows/automation2/executors/inputExecutor.py +++ b/modules/workflows/automation2/executors/inputExecutor.py @@ -4,29 +4,11 @@ import logging from typing import Dict, Any +from modules.datamodels.serviceExceptions import PauseForHumanTaskError, PauseForEmailWaitError # noqa: F401 — re-export shim + logger = logging.getLogger(__name__) -class PauseForHumanTaskError(Exception): - """Raised when execution must pause for a human task. Contains runId, taskId.""" - - def __init__(self, runId: str, taskId: str, nodeId: str): - self.runId = runId - self.taskId = taskId - self.nodeId = nodeId - super().__init__(f"Pause for human task {taskId} (run {runId}, node {nodeId})") - - -class PauseForEmailWaitError(Exception): - """Raised when execution must pause waiting for a new email. Background poller will resume.""" - - def __init__(self, runId: str, nodeId: str, waitConfig: Dict[str, Any]): - self.runId = runId - self.nodeId = nodeId - self.waitConfig = waitConfig - super().__init__(f"Pause for email wait (run {runId}, node {nodeId})") - - class InputExecutor: """ Execute input/human nodes. Creates a HumanTask, pauses the run, and raises diff --git a/modules/workflows/methods/methodContext/actions/setContext.py b/modules/workflows/methods/methodContext/actions/setContext.py index 10f292b7..24e10fc8 100644 --- a/modules/workflows/methods/methodContext/actions/setContext.py +++ b/modules/workflows/methods/methodContext/actions/setContext.py @@ -22,7 +22,7 @@ import logging from typing import Any, Dict, List, Optional, Tuple from modules.datamodels.datamodelChat import ActionResult -from modules.workflows.automation2.executors.inputExecutor import PauseForHumanTaskError +from modules.datamodels.serviceExceptions import PauseForHumanTaskError logger = logging.getLogger(__name__) diff --git a/modules/workflows/methods/methodFile/actions/create.py b/modules/workflows/methods/methodFile/actions/create.py index cc5550ca..bb778c8f 100644 --- a/modules/workflows/methods/methodFile/actions/create.py +++ b/modules/workflows/methods/methodFile/actions/create.py @@ -13,7 +13,7 @@ import re from modules.datamodels.datamodelChat import ActionResult, ActionDocument from modules.shared.i18nRegistry import normalizePrimaryLanguageTag -from modules.workflows.automation2.executors.actionNodeExecutor import coerceDocumentDataToBytes +from modules.shared.documentUtils import coerceDocumentDataToBytes from modules.workflows.methods.methodAi._common import is_image_action_document_list from modules.workflows.methods.methodContext.actions.extractContent import ( presentation_envelopes_to_document_json, diff --git a/modules/workflows/scheduler/mainScheduler.py b/modules/workflows/scheduler/mainScheduler.py index 9af9889f..11544015 100644 --- a/modules/workflows/scheduler/mainScheduler.py +++ b/modules/workflows/scheduler/mainScheduler.py @@ -93,9 +93,9 @@ class WorkflowScheduler: activeWorkflowIds.add(workflowId) cron = item.get("cron") mandateId = item.get("mandateId") - instanceId = item.get("featureInstanceId") + instanceId = item.get("featureInstanceId") or "" - if not instanceId or not cron: + if not cron: continue jobId = f"{JOB_ID_PREFIX}{workflowId}"