From 39aba4cca814520a1aaa554e52cd4a5463c3feea Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Sun, 7 Jun 2026 22:26:18 +0200
Subject: [PATCH] before refactory workflowAutomation
---
app.py | 25 +-
modules/datamodels/datamodelNavigation.py | 48 ++
.../datamodels/datamodelWorkflowAutomation.py | 18 +-
modules/datamodels/serviceExceptions.py | 25 +
modules/demoConfigs/investorDemo2026.py | 4 +-
modules/demoConfigs/pwgDemo2026.py | 2 +-
.../graphicalEditor/mainGraphicalEditor.py | 195 --------
.../routeFeatureGraphicalEditor.py | 8 +-
modules/interfaces/interfaceBootstrap.py | 2 +-
modules/routes/routeAutomationWorkspace.py | 6 +-
modules/routes/routeSystem.py | 5 +-
modules/routes/routeWorkflowAutomation.py | 453 ++++++++++++++++++
modules/routes/routeWorkflowDashboard.py | 29 +-
.../services/serviceAgent/workflowTools.py | 5 +
modules/shared/documentUtils.py | 49 ++
modules/system/i18nBootSync.py | 2 +
.../workflows/automation2/executionEngine.py | 50 ++
.../executors/actionNodeExecutor.py | 47 +-
.../automation2/executors/inputExecutor.py | 22 +-
.../methodContext/actions/setContext.py | 2 +-
.../methods/methodFile/actions/create.py | 2 +-
modules/workflows/scheduler/mainScheduler.py | 4 +-
22 files changed, 713 insertions(+), 290 deletions(-)
create mode 100644 modules/routes/routeWorkflowAutomation.py
diff --git a/app.py b/app.py
index c91212e3..d8104fad 100644
--- a/app.py
+++ b/app.py
@@ -481,7 +481,15 @@ async def lifespan(app: FastAPI):
except RuntimeError:
pass
eventManager.start()
-
+
+ # --- WorkflowAutomation: Scheduler boot (System-Lifespan, not Feature-onStart) ---
+ try:
+ from modules.workflows.scheduler.mainScheduler import start as _startWorkflowScheduler
+ _startWorkflowScheduler(eventUser)
+ logger.info("WorkflowAutomation scheduler started (system lifespan)")
+ except Exception as e:
+ logger.error(f"WorkflowAutomation scheduler failed to start: {e}")
+
# Register audit log cleanup scheduler
from modules.dbHelpers.auditLogger import registerAuditLogCleanupScheduler
registerAuditLogCleanupScheduler()
@@ -562,6 +570,18 @@ async def lifespan(app: FastAPI):
# 3. Stop scheduler (removes all pending cron/interval jobs)
eventManager.stop()
+ # 3.5 Stop WorkflowAutomation scheduler + email poller (System-Lifespan)
+ try:
+ from modules.workflows.scheduler.mainScheduler import stop as _stopWorkflowScheduler
+ _stopWorkflowScheduler()
+ except Exception as e:
+ logger.warning(f"WorkflowAutomation scheduler stop failed: {e}")
+ try:
+ from modules.features.graphicalEditor.emailPoller import stop as _stopEmailPoller
+ _stopEmailPoller(eventUser)
+ except Exception as e:
+ logger.warning(f"Email poller stop failed: {e}")
+
# 4. Stop Feature Containers (Plug&Play)
try:
mainModules = loadFeatureMainModules()
@@ -849,6 +869,9 @@ app.include_router(workflowDashboardRouter)
from modules.routes.routeAutomationWorkspace import router as automationWorkspaceRouter
app.include_router(automationWorkspaceRouter)
+from modules.routes.routeWorkflowAutomation import router as workflowAutomationRouter
+app.include_router(workflowAutomationRouter)
+
# ============================================================================
# PLUG&PLAY FEATURE ROUTERS
# Dynamically load routers from feature containers in modules/features/
diff --git a/modules/datamodels/datamodelNavigation.py b/modules/datamodels/datamodelNavigation.py
index 2fc278ef..eb9d3b69 100644
--- a/modules/datamodels/datamodelNavigation.py
+++ b/modules/datamodels/datamodelNavigation.py
@@ -158,6 +158,54 @@ NAVIGATION_SECTIONS = [
},
],
},
+ # --- Workflow-Automation (System-Komponente, cross-mandate) ---
+ {
+ "id": "workflowAutomation",
+ "title": t("Workflow-Automation"),
+ "order": 25,
+ "items": [
+ {
+ "id": "wa-workflows",
+ "objectKey": "ui.system.workflowAutomation.workflows",
+ "label": t("Workflows"),
+ "icon": "FaSitemap",
+ "path": "/workflow-automation?tab=workflows",
+ "order": 10,
+ },
+ {
+ "id": "wa-editor",
+ "objectKey": "ui.system.workflowAutomation.editor",
+ "label": t("Editor"),
+ "icon": "FaProjectDiagram",
+ "path": "/workflow-automation?tab=editor",
+ "order": 20,
+ },
+ {
+ "id": "wa-templates",
+ "objectKey": "ui.system.workflowAutomation.templates",
+ "label": t("Vorlagen"),
+ "icon": "FaCopy",
+ "path": "/workflow-automation?tab=templates",
+ "order": 30,
+ },
+ {
+ "id": "wa-runs",
+ "objectKey": "ui.system.workflowAutomation.runs",
+ "label": t("Läufe"),
+ "icon": "FaPlay",
+ "path": "/workflow-automation?tab=runs",
+ "order": 40,
+ },
+ {
+ "id": "wa-tasks",
+ "objectKey": "ui.system.workflowAutomation.tasks",
+ "label": t("Tasks"),
+ "icon": "FaTasks",
+ "path": "/workflow-automation?tab=tasks",
+ "order": 50,
+ },
+ ],
+ },
# --- Administration (with subgroups) ---
{
"id": "admin",
diff --git a/modules/datamodels/datamodelWorkflowAutomation.py b/modules/datamodels/datamodelWorkflowAutomation.py
index 5f9cb7b2..c9957c25 100644
--- a/modules/datamodels/datamodelWorkflowAutomation.py
+++ b/modules/datamodels/datamodelWorkflowAutomation.py
@@ -77,14 +77,26 @@ class AutoWorkflow(PowerOnModel):
"fk_target": {"db": "poweron_app", "table": "Mandate", "labelField": "label"},
},
)
- featureInstanceId: str = Field(
- description="Feature instance ID (GE owner instance / RBAC scope)",
+ featureInstanceId: Optional[str] = Field(
+ default=None,
+ description="Feature instance ID (legacy GE owner — being phased out; NULL for mandate-level workflows)",
json_schema_extra={
"frontend_type": "text",
"frontend_readonly": True,
"frontend_required": False,
"label": "Feature-Instanz-ID",
- "fk_target": {"db": "poweron_app", "table": "FeatureInstance", "labelField": "label"},
+ "fk_target": {"db": "poweron_app", "table": "FeatureInstance", "labelField": "label", "softFk": True},
+ },
+ )
+ runAsPrincipal: Optional[str] = Field(
+ default=None,
+ description="Identity (userId or service-account) under which this workflow executes. Governs RBAC for data access at runtime.",
+ json_schema_extra={
+ "frontend_type": "text",
+ "frontend_readonly": False,
+ "frontend_required": False,
+ "label": "Ausführungsidentität",
+ "fk_target": {"db": "poweron_app", "table": "UserInDB", "labelField": "username", "softFk": True},
},
)
targetFeatureInstanceId: Optional[str] = Field(
diff --git a/modules/datamodels/serviceExceptions.py b/modules/datamodels/serviceExceptions.py
index 2aa94d95..7585c6a9 100644
--- a/modules/datamodels/serviceExceptions.py
+++ b/modules/datamodels/serviceExceptions.py
@@ -144,3 +144,28 @@ class BillingContextError(Exception):
def __init__(self, message: str = None):
self.message = message or "Billing context incomplete - AI call blocked"
super().__init__(self.message)
+
+
+# ============================================================================
+# Workflow execution pause exceptions
+# (Canonical location — formerly in automation2/executors/inputExecutor.py)
+# ============================================================================
+
+class PauseForHumanTaskError(Exception):
+ """Raised when execution must pause for a human task. Contains runId, taskId."""
+
+ def __init__(self, runId: str, taskId: str, nodeId: str):
+ self.runId = runId
+ self.taskId = taskId
+ self.nodeId = nodeId
+ super().__init__(f"Pause for human task {taskId} (run {runId}, node {nodeId})")
+
+
+class PauseForEmailWaitError(Exception):
+ """Raised when execution must pause waiting for a new email. Background poller will resume."""
+
+ def __init__(self, runId: str, nodeId: str, waitConfig: Dict[str, Any]):
+ self.runId = runId
+ self.nodeId = nodeId
+ self.waitConfig = waitConfig
+ super().__init__(f"Pause for email wait (run {runId}, node {nodeId})")
diff --git a/modules/demoConfigs/investorDemo2026.py b/modules/demoConfigs/investorDemo2026.py
index 6855a63c..0f7b0863 100644
--- a/modules/demoConfigs/investorDemo2026.py
+++ b/modules/demoConfigs/investorDemo2026.py
@@ -44,7 +44,7 @@ _USER = {
_FEATURES_HAPPYLIFE = [
{"code": "workspace", "label": "Dokumentenablage"},
{"code": "trustee", "label": "Buchhaltung"},
- {"code": "graphicalEditor", "label": "Automationen"},
+ {"code": "graphicalEditor", "label": "Automationen"}, # DEPRECATED: migrated to WorkflowAutomation system component
{"code": "neutralization", "label": "Datenschutz"},
]
_FEATURES_ALPINA = [
@@ -52,7 +52,7 @@ _FEATURES_ALPINA = [
{"code": "trustee", "label": "BUHA Müller Immobilien GmbH"},
{"code": "trustee", "label": "BUHA Schneider Gastro AG"},
{"code": "trustee", "label": "BUHA Weber Consulting"},
- {"code": "graphicalEditor", "label": "Automationen"},
+ {"code": "graphicalEditor", "label": "Automationen"}, # DEPRECATED: migrated to WorkflowAutomation system component
{"code": "neutralization", "label": "Datenschutz"},
]
diff --git a/modules/demoConfigs/pwgDemo2026.py b/modules/demoConfigs/pwgDemo2026.py
index 968aabf8..6e21e45a 100644
--- a/modules/demoConfigs/pwgDemo2026.py
+++ b/modules/demoConfigs/pwgDemo2026.py
@@ -49,7 +49,7 @@ _USER = {
_FEATURES_PWG = [
{"code": "workspace", "label": "Dokumentenablage PWG"},
{"code": "trustee", "label": "Buchhaltung PWG"},
- {"code": "graphicalEditor", "label": "PWG Automationen"},
+ {"code": "graphicalEditor", "label": "PWG Automationen"}, # DEPRECATED: migrated to WorkflowAutomation system component
{"code": "neutralization", "label": "Datenschutz"},
]
diff --git a/modules/features/graphicalEditor/mainGraphicalEditor.py b/modules/features/graphicalEditor/mainGraphicalEditor.py
index bf50abb2..f88ccfdc 100644
--- a/modules/features/graphicalEditor/mainGraphicalEditor.py
+++ b/modules/features/graphicalEditor/mainGraphicalEditor.py
@@ -26,25 +26,6 @@ REQUIRED_SERVICES = [
{"serviceKey": "generation", "meta": {"usage": "file.create document rendering"}},
]
FEATURE_LABEL = t("Grafischer Editor", context="UI")
-FEATURE_ICON = "mdi-sitemap"
-
-UI_OBJECTS = [
- {
- "objectKey": "ui.feature.graphicalEditor.editor",
- "label": t("Editor", context="UI"),
- "meta": {"area": "editor"}
- },
- {
- "objectKey": "ui.feature.graphicalEditor.templates",
- "label": t("Vorlagen", context="UI"),
- "meta": {"area": "templates"}
- },
- {
- "objectKey": "ui.feature.graphicalEditor.workflows-tasks",
- "label": t("Tasks", context="UI"),
- "meta": {"area": "tasks"}
- },
-]
RESOURCE_OBJECTS = [
{
@@ -64,41 +45,6 @@ RESOURCE_OBJECTS = [
},
]
-TEMPLATE_ROLES = [
- {
- "roleLabel": "graphicalEditor-viewer",
- "description": "Grafischer Editor Betrachter - Workflows ansehen (nur lesen)",
- "accessRules": [
- {"context": "UI", "item": "ui.feature.graphicalEditor.workflows", "view": True},
- {"context": "UI", "item": "ui.feature.graphicalEditor.workflows-tasks", "view": True},
- {"context": "UI", "item": "ui.feature.graphicalEditor.templates", "view": True},
- {"context": "DATA", "item": None, "view": True, "read": "m", "create": "n", "update": "n", "delete": "n"},
- ],
- },
- {
- "roleLabel": "graphicalEditor-user",
- "description": "Grafischer Editor Benutzer - Flow-Builder nutzen",
- "accessRules": [
- {"context": "UI", "item": "ui.feature.graphicalEditor.editor", "view": True},
- {"context": "UI", "item": "ui.feature.graphicalEditor.workflows", "view": True},
- {"context": "UI", "item": "ui.feature.graphicalEditor.workflows-tasks", "view": True},
- {"context": "UI", "item": "ui.feature.graphicalEditor.templates", "view": True},
- {"context": "RESOURCE", "item": "resource.feature.graphicalEditor.dashboard", "view": True},
- {"context": "RESOURCE", "item": "resource.feature.graphicalEditor.node-types", "view": True},
- {"context": "RESOURCE", "item": "resource.feature.graphicalEditor.execute", "view": True},
- {"context": "DATA", "item": None, "view": True, "read": "m", "create": "m", "update": "m", "delete": "m"},
- ],
- },
- {
- "roleLabel": "graphicalEditor-admin",
- "description": "Grafischer Editor Admin - Volle UI und API für die Instanz; Daten weiterhin benutzerspezifisch (MY)",
- "accessRules": [
- {"context": "UI", "item": None, "view": True},
- {"context": "RESOURCE", "item": None, "view": True},
- {"context": "DATA", "item": None, "view": True, "read": "m", "create": "m", "update": "m", "delete": "m"},
- ],
- },
-]
def getRequiredServiceKeys() -> List[str]:
@@ -186,28 +132,6 @@ class _GraphicalEditorServiceHub:
generation = None
-async def onStart(eventUser) -> None:
- """Feature startup: start consolidated scheduler."""
- from modules.workflows.scheduler.mainScheduler import start as startScheduler
- startScheduler(eventUser)
-
-
-async def onStop(eventUser) -> None:
- """Feature shutdown - stop scheduler and email poller."""
- from modules.workflows.scheduler.mainScheduler import stop as stopScheduler
- stopScheduler()
- from modules.features.graphicalEditor.emailPoller import stop as stopEmailPoller
- stopEmailPoller(eventUser)
-
-
-def getFeatureDefinition() -> Dict[str, Any]:
- """Return the feature definition for registration."""
- return {
- "code": FEATURE_CODE,
- "label": FEATURE_LABEL,
- "icon": FEATURE_ICON,
- "autoCreateInstance": False,
- }
# ---------------------------------------------------------------------------
@@ -473,125 +397,6 @@ def _buildSystemTemplates():
]
-def getUiObjects() -> List[Dict[str, Any]]:
- """Return UI objects for RBAC catalog registration."""
- return UI_OBJECTS
-
-
def getResourceObjects() -> List[Dict[str, Any]]:
"""Return resource objects for RBAC catalog registration."""
return RESOURCE_OBJECTS
-
-
-def getTemplateRoles() -> List[Dict[str, Any]]:
- """Return template roles for this feature."""
- return TEMPLATE_ROLES
-
-
-def registerFeature(catalogService) -> bool:
- """Register this feature's RBAC objects in the catalog."""
- try:
- for uiObj in UI_OBJECTS:
- catalogService.registerUiObject(
- featureCode=FEATURE_CODE,
- objectKey=uiObj["objectKey"],
- label=uiObj["label"],
- meta=uiObj.get("meta")
- )
- for resObj in RESOURCE_OBJECTS:
- catalogService.registerResourceObject(
- featureCode=FEATURE_CODE,
- objectKey=resObj["objectKey"],
- label=resObj["label"],
- meta=resObj.get("meta")
- )
- _syncTemplateRolesToDb()
- logger.info(f"Feature '{FEATURE_CODE}' registered {len(UI_OBJECTS)} UI objects and {len(RESOURCE_OBJECTS)} resource objects")
- return True
- except Exception as e:
- logger.error(f"Failed to register feature '{FEATURE_CODE}': {e}")
- return False
-
-
-def _syncTemplateRolesToDb() -> int:
- """Sync template roles and their AccessRules to database.
- Also syncs rules to mandate-specific roles (same roleLabel) so new UI objects
- become visible after gateway restart without manual role update.
- """
- try:
- from modules.interfaces.interfaceDbApp import getRootInterface
- from modules.datamodels.datamodelRbac import Role
- from modules.datamodels.datamodelUtils import coerce_text_multilingual
-
- rootInterface = getRootInterface()
- existingRoles = rootInterface.getRolesByFeatureCode(FEATURE_CODE)
- existingLabels = {r.roleLabel: str(r.id) for r in existingRoles if r.mandateId is None}
- created = 0
-
- for template in TEMPLATE_ROLES:
- roleLabel = template["roleLabel"]
- if roleLabel in existingLabels:
- roleId = existingLabels[roleLabel]
- else:
- newRole = Role(
- roleLabel=roleLabel,
- description=coerce_text_multilingual(template.get("description", {})),
- featureCode=FEATURE_CODE,
- mandateId=None,
- featureInstanceId=None,
- isSystemRole=False
- )
- rec = rootInterface.db.recordCreate(Role, newRole.model_dump())
- roleId = rec.get("id")
- created += 1
- logger.info(f"Created template role '{roleLabel}' for {FEATURE_CODE}")
-
- _ensureAccessRulesForRole(rootInterface, roleId, template.get("accessRules", []))
-
- for r in existingRoles:
- if r.mandateId and r.roleLabel == roleLabel:
- added = _ensureAccessRulesForRole(
- rootInterface, str(r.id), template.get("accessRules", [])
- )
- if added:
- logger.debug(f"Added {added} access rules to mandate role {r.id}")
- return created
- except Exception as e:
- logger.warning(f"Template role sync for {FEATURE_CODE}: {e}")
- return 0
-
-
-def _ensureAccessRulesForRole(rootInterface, roleId: str, ruleTemplates: List[Dict[str, Any]]) -> int:
- """Ensure AccessRules exist for a role based on templates."""
- from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext
-
- existingRules = rootInterface.getAccessRulesByRole(roleId)
- existingSignatures = {
- (r.context.value if r.context else None, r.item)
- for r in existingRules
- }
- created = 0
- for t in ruleTemplates:
- context = t.get("context", "UI")
- item = t.get("item")
- sig = (context, item)
- if sig in existingSignatures:
- continue
- ctx_enum = (
- AccessRuleContext.UI if context == "UI" else
- AccessRuleContext.DATA if context == "DATA" else
- AccessRuleContext.RESOURCE if context == "RESOURCE" else context
- )
- newRule = AccessRule(
- roleId=roleId,
- context=ctx_enum,
- item=item,
- view=t.get("view", False),
- read=t.get("read"),
- create=t.get("create"),
- update=t.get("update"),
- delete=t.get("delete"),
- )
- rootInterface.db.recordCreate(AccessRule, newRule.model_dump())
- created += 1
- return created
diff --git a/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py b/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py
index 20a2708b..38d9d769 100644
--- a/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py
+++ b/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py
@@ -1,7 +1,10 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
-GraphicalEditor routes - node-types, execute, workflows, runs, tasks, connections, browse.
+DEPRECATED: These per-instance routes are superseded by /api/workflow-automation/
+(routeWorkflowAutomation.py). Kept for backward compatibility during migration.
+
+Original: GraphicalEditor routes - node-types, execute, workflows, runs, tasks, connections, browse.
"""
import asyncio
@@ -644,7 +647,8 @@ def get_templates(
from modules.dbHelpers.fkLabelResolver import enrichRowsWithFkLabels
from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import AutoWorkflow
- enrichRowsWithFkLabels(templates, AutoWorkflow, db=iface.db)
+ from modules.interfaces.interfaceDbApp import getRootInterface as _getRootIface
+ enrichRowsWithFkLabels(templates, AutoWorkflow, db=_getRootIface().db)
if mode == "filterValues":
if not column:
diff --git a/modules/interfaces/interfaceBootstrap.py b/modules/interfaces/interfaceBootstrap.py
index 5ac5d089..9a6e2e26 100644
--- a/modules/interfaces/interfaceBootstrap.py
+++ b/modules/interfaces/interfaceBootstrap.py
@@ -1610,7 +1610,7 @@ def _createStoreResourceRules(db: DatabaseConnector) -> None:
"resource.store.workspace",
"resource.store.commcoach",
"resource.store.trustee",
- "resource.store.graphicalEditor",
+ "resource.store.graphicalEditor", # DEPRECATED: will move with WorkflowAutomation code restructuring
]
storeRules = []
diff --git a/modules/routes/routeAutomationWorkspace.py b/modules/routes/routeAutomationWorkspace.py
index 09c5238c..a93fff70 100644
--- a/modules/routes/routeAutomationWorkspace.py
+++ b/modules/routes/routeAutomationWorkspace.py
@@ -21,12 +21,12 @@ from slowapi.util import get_remote_address
from modules.auth.authentication import getRequestContext, RequestContext
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
-from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import (
+from modules.datamodels.datamodelWorkflowAutomation import (
AutoRun,
AutoStepLog,
AutoWorkflow,
+ GRAPHICAL_EDITOR_DATABASE,
)
-from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import graphicalEditorDatabase
from modules.workflows.automation2.workflowArtifactVisibility import suppress_workflow_file_in_workspace_ui
from modules.shared.i18nRegistry import apiRouteContext
@@ -40,7 +40,7 @@ router = APIRouter(prefix="/api/automations/runs", tags=["AutomationWorkspace"])
def _getDb() -> DatabaseConnector:
return DatabaseConnector(
dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
- dbDatabase=graphicalEditorDatabase,
+ dbDatabase=GRAPHICAL_EDITOR_DATABASE,
dbUser=APP_CONFIG.get("DB_USER"),
dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"),
dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),
diff --git a/modules/routes/routeSystem.py b/modules/routes/routeSystem.py
index 56568cd9..217dfa14 100644
--- a/modules/routes/routeSystem.py
+++ b/modules/routes/routeSystem.py
@@ -105,9 +105,6 @@ def _getFeatureUiObjects(featureCode: str) -> List[Dict[str, Any]]:
elif featureCode == "realestate":
from modules.features.realEstate.mainRealEstate import UI_OBJECTS
return UI_OBJECTS
- elif featureCode == "graphicalEditor":
- from modules.features.graphicalEditor.mainGraphicalEditor import UI_OBJECTS
- return UI_OBJECTS
elif featureCode == "teamsbot":
from modules.features.teamsbot.mainTeamsbot import UI_OBJECTS
return UI_OBJECTS
@@ -841,7 +838,7 @@ def _buildIntegrationsOverviewPayload(userId: str, user=None) -> Dict[str, Any]:
from modules.shared.configuration import APP_CONFIG
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.datamodels.datamodelPagination import PaginationParams
- from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import (
+ from modules.datamodels.datamodelWorkflowAutomation import (
AutoWorkflow, AutoRun,
)
diff --git a/modules/routes/routeWorkflowAutomation.py b/modules/routes/routeWorkflowAutomation.py
new file mode 100644
index 00000000..6ce6fb21
--- /dev/null
+++ b/modules/routes/routeWorkflowAutomation.py
@@ -0,0 +1,453 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""
+Mandatsweite WorkflowAutomation API.
+
+System-level API for workflows, runs, tasks — scoped by mandate membership,
+not by graphicalEditor FeatureInstance. Parallel to the legacy per-instance
+API in routeFeatureGraphicalEditor.py during the migration period.
+
+RBAC model:
+ - Read: mandate membership (user sees workflows in own mandates)
+ - Write/Execute: mandate admin or isPlatformAdmin
+ - isPlatformAdmin bypasses all checks
+"""
+
+import json
+import logging
+import time
+from typing import Optional, List, Dict, Any
+
+from fastapi import APIRouter, Depends, HTTPException, Query
+from slowapi import Limiter
+from slowapi.util import get_remote_address
+
+from modules.auth.authentication import getRequestContext, RequestContext
+from modules.connectors.connectorDbPostgre import DatabaseConnector
+from modules.datamodels.datamodelWorkflowAutomation import (
+ AutoWorkflow, AutoVersion, AutoRun, AutoStepLog, AutoTask,
+ GRAPHICAL_EDITOR_DATABASE,
+)
+from modules.datamodels.datamodelPagination import PaginationParams, normalize_pagination_dict
+from modules.interfaces.interfaceDbApp import getRootInterface
+from modules.shared.configuration import APP_CONFIG
+from modules.shared.i18nRegistry import apiRouteContext
+
+routeApiMsg = apiRouteContext("routeWorkflowAutomation")
+
+logger = logging.getLogger(__name__)
+limiter = Limiter(key_func=get_remote_address)
+
+router = APIRouter(prefix="/api/workflow-automation", tags=["WorkflowAutomation"])
+
+
+# ---------------------------------------------------------------------------
+# DB + RBAC helpers
+# ---------------------------------------------------------------------------
+
+def _getDb() -> DatabaseConnector:
+ return DatabaseConnector(
+ dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
+ dbDatabase=GRAPHICAL_EDITOR_DATABASE,
+ dbUser=APP_CONFIG.get("DB_USER"),
+ dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"),
+ dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),
+ userId=None,
+ )
+
+
+def _getUserMandateIds(userId: str) -> List[str]:
+ rootIface = getRootInterface()
+ memberships = rootIface.getUserMandates(userId)
+ return [um.mandateId for um in memberships if um.mandateId and um.enabled]
+
+
+def _getAdminMandateIds(userId: str, mandateIds: List[str]) -> List[str]:
+ if not mandateIds:
+ return []
+ rootIface = getRootInterface()
+ from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole
+
+ memberships = rootIface.db.getRecordset(
+ UserMandate,
+ recordFilter={"userId": userId, "mandateId": mandateIds, "enabled": True},
+ )
+ if not memberships:
+ return []
+
+ umIdToMandateId: Dict[str, str] = {}
+ for m in memberships:
+ row = m if isinstance(m, dict) else m.__dict__
+ um_id = row.get("id")
+ mid = row.get("mandateId")
+ if um_id and mid:
+ umIdToMandateId[str(um_id)] = str(mid)
+
+ userMandateIds = list(umIdToMandateId.keys())
+ allRoles = rootIface.db.getRecordset(
+ UserMandateRole,
+ recordFilter={"userMandateId": userMandateIds},
+ )
+ if not allRoles:
+ return []
+
+ roleIds: set = set()
+ roleToMandate: Dict[str, set] = {}
+ for r in allRoles:
+ row = r if isinstance(r, dict) else r.__dict__
+ rid = row.get("roleId")
+ um_id = row.get("userMandateId")
+ mid = umIdToMandateId.get(str(um_id)) if um_id else None
+ if rid and mid:
+ roleIds.add(rid)
+ roleToMandate.setdefault(rid, set()).add(mid)
+
+ if not roleIds:
+ return []
+
+ from modules.datamodels.datamodelRbac import Role
+ roleRecords = rootIface.db.getRecordset(Role, recordFilter={"id": list(roleIds)})
+ adminMandates: set = set()
+ for role in (roleRecords or []):
+ row = role if isinstance(role, dict) else role.__dict__
+ rid = row.get("id")
+ if not rid or rid not in roleToMandate:
+ continue
+ if row.get("roleLabel") == "admin" and not row.get("featureInstanceId"):
+ adminMandates.update(roleToMandate[rid])
+
+ return [mid for mid in mandateIds if mid in adminMandates]
+
+
+def _validateWorkflowAccess(
+ context: RequestContext,
+ workflow: Optional[Dict[str, Any]],
+ action: str = "read",
+) -> None:
+ """Validate access to a workflow based on mandate membership + admin status.
+
+ Actions: 'read' (mandate member), 'write'/'execute'/'delete' (mandate admin or platform admin).
+ Raises HTTPException(403) on denial.
+ """
+ if context.isPlatformAdmin:
+ return
+
+ userId = str(context.user.id) if context.user else None
+ if not userId:
+ raise HTTPException(status_code=403, detail="Authentication required")
+
+ if workflow is None:
+ raise HTTPException(status_code=404, detail="Workflow not found")
+
+ wfMandateId = workflow.get("mandateId") or ""
+ if not wfMandateId:
+ if action == "read":
+ return
+ raise HTTPException(status_code=403, detail="Workflow has no mandate — admin only")
+
+ userMandateIds = _getUserMandateIds(userId)
+ if wfMandateId not in userMandateIds:
+ raise HTTPException(status_code=403, detail="Not a member of the workflow's mandate")
+
+ if action == "read":
+ return
+
+ adminMandateIds = _getAdminMandateIds(userId, [wfMandateId])
+ if wfMandateId not in adminMandateIds:
+ raise HTTPException(
+ status_code=403,
+ detail=f"Mandate admin required for '{action}' on workflows",
+ )
+
+
+def _scopedWorkflowFilter(context: RequestContext) -> Optional[Dict[str, Any]]:
+ """Build DB filter for listing workflows: mandate-scoped for members, None for sysadmin."""
+ if context.isPlatformAdmin:
+ return None
+
+ userId = str(context.user.id) if context.user else None
+ if not userId:
+ return {"mandateId": "__impossible__"}
+
+ mandateIds = _getUserMandateIds(userId)
+ if mandateIds:
+ return {"mandateId": mandateIds}
+ return {"mandateId": "__impossible__"}
+
+
+def _scopedRunFilter(context: RequestContext) -> Optional[Dict[str, Any]]:
+ """Build DB filter for listing runs: admin sees mandate runs, user sees own."""
+ if context.isPlatformAdmin:
+ return None
+
+ userId = str(context.user.id) if context.user else None
+ if not userId:
+ return {"ownerId": "__impossible__"}
+
+ mandateIds = _getUserMandateIds(userId)
+ adminMandateIds = _getAdminMandateIds(userId, mandateIds)
+
+ if adminMandateIds:
+ return {"mandateId": adminMandateIds}
+ return {"ownerId": userId}
+
+
+def _parsePagination(pagination: Optional[str]) -> Optional[PaginationParams]:
+ if not pagination:
+ return None
+ try:
+ d = json.loads(pagination)
+ except json.JSONDecodeError:
+ raise HTTPException(status_code=400, detail="Invalid pagination JSON")
+ if not d:
+ return None
+ return normalize_pagination_dict(d)
+
+
+# ---------------------------------------------------------------------------
+# Workflow CRUD
+# ---------------------------------------------------------------------------
+
+@router.get("/workflows")
+async def _listWorkflows(
+ request: RequestContext = Depends(getRequestContext),
+ pagination: Optional[str] = Query(default=None),
+ mandateId: Optional[str] = Query(default=None),
+):
+ db = _getDb()
+ try:
+ db._ensureTableExists(AutoWorkflow)
+ scopeFilter = _scopedWorkflowFilter(request)
+ if mandateId and scopeFilter is not None:
+ if mandateId not in (scopeFilter.get("mandateId") or []):
+ return {"items": [], "total": 0}
+ scopeFilter = {"mandateId": mandateId}
+ elif mandateId and scopeFilter is None:
+ scopeFilter = {"mandateId": mandateId}
+
+ params = _parsePagination(pagination)
+ records = db.getRecordset(AutoWorkflow, recordFilter=scopeFilter, pagination=params)
+ total = db.getRecordCount(AutoWorkflow, recordFilter=scopeFilter) if params else len(records or [])
+ return {"items": records or [], "total": total}
+ finally:
+ db.close()
+
+
+@router.get("/workflows/{workflowId}")
+async def _getWorkflow(
+ workflowId: str,
+ request: RequestContext = Depends(getRequestContext),
+):
+ db = _getDb()
+ try:
+ db._ensureTableExists(AutoWorkflow)
+ wf = db.getRecord(AutoWorkflow, workflowId)
+ if not wf:
+ raise HTTPException(status_code=404, detail="Workflow not found")
+ _validateWorkflowAccess(request, wf, "read")
+ return wf
+ finally:
+ db.close()
+
+
+@router.post("/workflows")
+async def _createWorkflow(
+ request: RequestContext = Depends(getRequestContext),
+ body: Dict[str, Any] = {},
+):
+ mandateId = body.get("mandateId")
+ if not mandateId:
+ raise HTTPException(status_code=400, detail="mandateId required")
+
+ _validateWorkflowAccess(request, {"mandateId": mandateId}, "write")
+
+ db = _getDb()
+ try:
+ db._ensureTableExists(AutoWorkflow)
+ import uuid
+ data = {**body, "id": str(uuid.uuid4())}
+ if request.user:
+ data.setdefault("runAsPrincipal", str(request.user.id))
+ rec = db.recordCreate(AutoWorkflow, data)
+ return rec
+ finally:
+ db.close()
+
+
+@router.put("/workflows/{workflowId}")
+async def _updateWorkflow(
+ workflowId: str,
+ request: RequestContext = Depends(getRequestContext),
+ body: Dict[str, Any] = {},
+):
+ db = _getDb()
+ try:
+ db._ensureTableExists(AutoWorkflow)
+ wf = db.getRecord(AutoWorkflow, workflowId)
+ _validateWorkflowAccess(request, wf, "write")
+ updated = db.recordModify(AutoWorkflow, workflowId, body)
+ return updated
+ finally:
+ db.close()
+
+
+@router.delete("/workflows/{workflowId}")
+async def _deleteWorkflow(
+ workflowId: str,
+ request: RequestContext = Depends(getRequestContext),
+):
+ db = _getDb()
+ try:
+ db._ensureTableExists(AutoWorkflow)
+ wf = db.getRecord(AutoWorkflow, workflowId)
+ _validateWorkflowAccess(request, wf, "delete")
+
+ for v in db.getRecordset(AutoVersion, recordFilter={"workflowId": workflowId}) or []:
+ db.recordDelete(AutoVersion, v.get("id"))
+ for run in db.getRecordset(AutoRun, recordFilter={"workflowId": workflowId}) or []:
+ runId = run.get("id")
+ for sl in db.getRecordset(AutoStepLog, recordFilter={"runId": runId}) or []:
+ db.recordDelete(AutoStepLog, sl.get("id"))
+ db.recordDelete(AutoRun, runId)
+ for task in db.getRecordset(AutoTask, recordFilter={"workflowId": workflowId}) or []:
+ db.recordDelete(AutoTask, task.get("id"))
+ db.recordDelete(AutoWorkflow, workflowId)
+ return {"deleted": True, "workflowId": workflowId}
+ finally:
+ db.close()
+
+
+# ---------------------------------------------------------------------------
+# Runs
+# ---------------------------------------------------------------------------
+
+@router.get("/runs")
+async def _listRuns(
+ request: RequestContext = Depends(getRequestContext),
+ pagination: Optional[str] = Query(default=None),
+ mandateId: Optional[str] = Query(default=None),
+ workflowId: Optional[str] = Query(default=None),
+):
+ db = _getDb()
+ try:
+ db._ensureTableExists(AutoRun)
+ scopeFilter = _scopedRunFilter(request)
+ if mandateId:
+ if scopeFilter is None:
+ scopeFilter = {"mandateId": mandateId}
+ elif "mandateId" in scopeFilter:
+ if mandateId not in scopeFilter["mandateId"]:
+ return {"items": [], "total": 0}
+ scopeFilter = {"mandateId": mandateId}
+ if workflowId:
+ scopeFilter = {**(scopeFilter or {}), "workflowId": workflowId}
+
+ params = _parsePagination(pagination)
+ records = db.getRecordset(AutoRun, recordFilter=scopeFilter, pagination=params)
+ total = db.getRecordCount(AutoRun, recordFilter=scopeFilter) if params else len(records or [])
+ return {"items": records or [], "total": total}
+ finally:
+ db.close()
+
+
+@router.get("/runs/{runId}")
+async def _getRun(
+ runId: str,
+ request: RequestContext = Depends(getRequestContext),
+):
+ db = _getDb()
+ try:
+ db._ensureTableExists(AutoRun)
+ run = db.getRecord(AutoRun, runId)
+ if not run:
+ raise HTTPException(status_code=404, detail="Run not found")
+
+ wfId = run.get("workflowId")
+ if wfId:
+ wf = db.getRecord(AutoWorkflow, wfId)
+ _validateWorkflowAccess(request, wf, "read")
+ return run
+ finally:
+ db.close()
+
+
+# ---------------------------------------------------------------------------
+# Tasks
+# ---------------------------------------------------------------------------
+
+@router.get("/tasks")
+async def _listTasks(
+ request: RequestContext = Depends(getRequestContext),
+ pagination: Optional[str] = Query(default=None),
+ status: Optional[str] = Query(default=None),
+):
+ db = _getDb()
+ try:
+ db._ensureTableExists(AutoTask)
+ scopeFilter: Optional[Dict[str, Any]] = None
+
+ if not request.isPlatformAdmin:
+ userId = str(request.user.id) if request.user else None
+ if not userId:
+ return {"items": [], "total": 0}
+ scopeFilter = {"assigneeId": userId}
+
+ if status:
+ scopeFilter = {**(scopeFilter or {}), "status": status}
+
+ params = _parsePagination(pagination)
+ records = db.getRecordset(AutoTask, recordFilter=scopeFilter, pagination=params)
+ total = db.getRecordCount(AutoTask, recordFilter=scopeFilter) if params else len(records or [])
+ return {"items": records or [], "total": total}
+ finally:
+ db.close()
+
+
+# ---------------------------------------------------------------------------
+# Versions
+# ---------------------------------------------------------------------------
+
+@router.get("/workflows/{workflowId}/versions")
+async def _listVersions(
+ workflowId: str,
+ request: RequestContext = Depends(getRequestContext),
+):
+ db = _getDb()
+ try:
+ db._ensureTableExists(AutoWorkflow)
+ wf = db.getRecord(AutoWorkflow, workflowId)
+ _validateWorkflowAccess(request, wf, "read")
+
+ db._ensureTableExists(AutoVersion)
+ versions = db.getRecordset(AutoVersion, recordFilter={"workflowId": workflowId})
+ return {"items": versions or []}
+ finally:
+ db.close()
+
+
+# ---------------------------------------------------------------------------
+# Step logs
+# ---------------------------------------------------------------------------
+
+@router.get("/runs/{runId}/steps")
+async def _listStepLogs(
+ runId: str,
+ request: RequestContext = Depends(getRequestContext),
+):
+ db = _getDb()
+ try:
+ db._ensureTableExists(AutoRun)
+ run = db.getRecord(AutoRun, runId)
+ if not run:
+ raise HTTPException(status_code=404, detail="Run not found")
+
+ wfId = run.get("workflowId")
+ if wfId:
+ wf = db.getRecord(AutoWorkflow, wfId)
+ _validateWorkflowAccess(request, wf, "read")
+
+ db._ensureTableExists(AutoStepLog)
+ steps = db.getRecordset(AutoStepLog, recordFilter={"runId": runId})
+ return {"items": steps or []}
+ finally:
+ db.close()
diff --git a/modules/routes/routeWorkflowDashboard.py b/modules/routes/routeWorkflowDashboard.py
index f29fc557..020e5ec7 100644
--- a/modules/routes/routeWorkflowDashboard.py
+++ b/modules/routes/routeWorkflowDashboard.py
@@ -27,10 +27,10 @@ from modules.interfaces.interfaceDbApp import getRootInterface
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
from modules.datamodels.datamodelPagination import PaginationParams, normalize_pagination_dict
-from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import (
+from modules.datamodels.datamodelWorkflowAutomation import (
AutoRun, AutoStepLog, AutoWorkflow, AutoTask, AutoVersion,
+ GRAPHICAL_EDITOR_DATABASE,
)
-from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import graphicalEditorDatabase
from modules.shared.i18nRegistry import apiRouteContext
routeApiMsg = apiRouteContext("routeWorkflowDashboard")
@@ -44,7 +44,7 @@ router = APIRouter(prefix="/api/system/workflow-runs", tags=["WorkflowDashboard"
def _getDb() -> DatabaseConnector:
return DatabaseConnector(
dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
- dbDatabase=graphicalEditorDatabase,
+ dbDatabase=GRAPHICAL_EDITOR_DATABASE,
dbUser=APP_CONFIG.get("DB_USER"),
dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"),
dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),
@@ -619,7 +619,8 @@ def get_workflow_runs(
for wf in (wfs or []):
wfMap[wf.get("id")] = wf
- from modules.dbHelpers.fkLabelResolver import enrichRowsWithFkLabels, resolveMandateLabels, resolveInstanceLabels
+ from modules.dbHelpers.fkLabelResolver import enrichRowsWithFkLabels, resolveMandateLabels, resolveInstanceLabels, resolveUserLabels
+ from modules.interfaces.interfaceDbApp import getRootInterface as _getRootIface
runs = []
for r in pageRuns:
@@ -635,17 +636,20 @@ def get_workflow_runs(
row["featureInstanceId"] = fiid
runs.append(row)
+ appDb = _getRootIface().db
enrichRowsWithFkLabels(
runs,
db=db,
labelResolvers={
- "mandateId": partial(resolveMandateLabels, db),
- "featureInstanceId": partial(resolveInstanceLabels, db),
+ "mandateId": partial(resolveMandateLabels, appDb),
+ "featureInstanceId": partial(resolveInstanceLabels, appDb),
+ "ownerId": partial(resolveUserLabels, appDb),
},
)
for row in runs:
row["instanceLabel"] = row.pop("featureInstanceIdLabel", None)
row["mandateLabel"] = row.pop("mandateIdLabel", None)
+ row["ownerLabel"] = row.pop("ownerIdLabel", None)
return {"runs": runs, "total": total, "limit": limit, "offset": offset}
@@ -808,6 +812,9 @@ def get_system_workflows(
userMandateIds = _getUserMandateIds(userId)
adminMandateIds = _getAdminMandateIds(userId, userMandateIds)
+ from modules.dbHelpers.fkLabelResolver import resolveUserLabels as _resolveUserLabels
+ from modules.interfaces.interfaceDbApp import getRootInterface as _getRootIface
+
fkSortField = _firstFkSortFieldForWorkflows(paginationParams)
if fkSortField:
from modules.dbHelpers.paginationHelpers import getRecordsetPaginatedWithFkSort, applyFiltersAndSort
@@ -869,17 +876,20 @@ def get_system_workflows(
row["canExecute"] = False
row.pop("graph", None)
items.append(row)
+ _appDb = _getRootIface().db
enrichRowsWithFkLabels(
items,
db=db,
labelResolvers={
- "mandateId": partial(resolveMandateLabels, db),
+ "mandateId": partial(resolveMandateLabels, _appDb),
"featureInstanceId": _resolveInstanceLabelsWithFeatureCode,
+ "ownerId": partial(_resolveUserLabels, _appDb),
},
)
for row in items:
row["instanceLabel"] = row.pop("featureInstanceIdLabel", None)
row["mandateLabel"] = row.pop("mandateIdLabel", None)
+ row["ownerLabel"] = row.pop("ownerIdLabel", None)
row["featureCode"] = featureCodeMap.get(row.get("featureInstanceId"))
if hasComputedFilter or hasComputedSort:
computedFilters = {
@@ -932,17 +942,20 @@ def get_system_workflows(
row["canExecute"] = False
row.pop("graph", None)
items.append(row)
+ _appDb2 = _getRootIface().db
enrichRowsWithFkLabels(
items,
db=db,
labelResolvers={
- "mandateId": partial(resolveMandateLabels, db),
+ "mandateId": partial(resolveMandateLabels, _appDb2),
"featureInstanceId": _resolveInstanceLabelsWithFeatureCode,
+ "ownerId": partial(_resolveUserLabels, _appDb2),
},
)
for row in items:
row["instanceLabel"] = row.pop("featureInstanceIdLabel", None)
row["mandateLabel"] = row.pop("mandateIdLabel", None)
+ row["ownerLabel"] = row.pop("ownerIdLabel", None)
row["featureCode"] = featureCodeMap.get(row.get("featureInstanceId"))
return {
diff --git a/modules/serviceCenter/services/serviceAgent/workflowTools.py b/modules/serviceCenter/services/serviceAgent/workflowTools.py
index 82eda22d..32defa2b 100644
--- a/modules/serviceCenter/services/serviceAgent/workflowTools.py
+++ b/modules/serviceCenter/services/serviceAgent/workflowTools.py
@@ -89,6 +89,7 @@ def _resolveMandateId(context: Any) -> str:
def _getInterface(context: Any, instanceId: str):
+ # DEPRECATED: will move with WorkflowAutomation code restructuring
from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface
return getGraphicalEditorInterface(_resolveUser(context), _resolveMandateId(context), instanceId)
@@ -306,6 +307,7 @@ async def _list_upstream_paths(params: Dict[str, Any], context: Any) -> ToolResu
return _err(name, f"Workflow {workflow_id} not found")
graph = wf.get("graph", {}) or {}
+ # DEPRECATED: will move with WorkflowAutomation code restructuring
from modules.features.graphicalEditor.upstreamPathsService import compute_upstream_paths
paths = compute_upstream_paths(graph if isinstance(graph, dict) else {}, str(node_id))
@@ -436,6 +438,7 @@ async def _listAvailableNodeTypes(params: Dict[str, Any], context: Any) -> ToolR
"""
name = "listAvailableNodeTypes"
try:
+ # DEPRECATED: will move with WorkflowAutomation code restructuring
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
nodeTypes = []
for n in STATIC_NODE_TYPES:
@@ -462,6 +465,7 @@ async def _describeNodeType(params: Dict[str, Any], context: Any) -> ToolResult:
nodeType = params.get("nodeType") or params.get("id")
if not nodeType:
return _err(name, "nodeType required")
+ # DEPRECATED: will move with WorkflowAutomation code restructuring
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
target: Dict[str, Any] = {}
for n in STATIC_NODE_TYPES:
@@ -875,6 +879,7 @@ async def _exportWorkflowToFile(params: Dict[str, Any], context: Any) -> ToolRes
envelope = iface.exportWorkflowToDict(workflowId)
if envelope is None:
return _err(name, f"Workflow {workflowId} not found")
+ # DEPRECATED: will move with WorkflowAutomation code restructuring
from modules.features.graphicalEditor._workflowFileSchema import buildFileName
return _ok(name, {
"fileName": buildFileName(envelope.get("label", "workflow")),
diff --git a/modules/shared/documentUtils.py b/modules/shared/documentUtils.py
index cc08835c..37eec8a5 100644
--- a/modules/shared/documentUtils.py
+++ b/modules/shared/documentUtils.py
@@ -5,7 +5,10 @@ Document utility functions (Layer L0 - shared).
Pure text-processing helpers with zero internal dependencies.
"""
+import base64
+import binascii
import re
+from typing import Any, Optional
def parseInlineRuns(text: str) -> list:
@@ -62,3 +65,49 @@ def parseInlineRuns(text: str) -> list:
runs.append({"type": "text", "value": text[lastEnd:]})
return runs if runs else [{"type": "text", "value": text}]
+
+
+def _looksLikeAsciiBase64Payload(s: str) -> bool:
+ """Heuristic: ActionDocument binary payloads use standard ASCII base64; markdown/text uses other chars."""
+ t = "".join(s.split())
+ if len(t) < 8:
+ return False
+ if not t.isascii():
+ return False
+ return bool(re.fullmatch(r"[A-Za-z0-9+/]+=*", t)) and len(t) % 4 == 0
+
+
+def coerceDocumentDataToBytes(raw: Any) -> Optional[bytes]:
+ """Normalize documentData for DB file persistence.
+
+ ActionDocument conventions (see methodFile.create): binary bodies are carried as ASCII
+ base64 strings; plain markdown/text stays as Unicode. Do not UTF-8-encode a base64
+ literal — that persists the ASCII of the encoding (file looks like base64 gibberish).
+ """
+ if raw is None:
+ return None
+ if isinstance(raw, bytes):
+ return raw if len(raw) > 0 else None
+ if isinstance(raw, bytearray):
+ b = bytes(raw)
+ return b if len(b) > 0 else None
+ if isinstance(raw, memoryview):
+ b = raw.tobytes()
+ return b if len(b) > 0 else None
+ if isinstance(raw, str):
+ stripped = raw.strip()
+ if not stripped:
+ return None
+ if _looksLikeAsciiBase64Payload(stripped):
+ try:
+ decoded = base64.b64decode(stripped, validate=True)
+ except (TypeError, binascii.Error, ValueError):
+ try:
+ decoded = base64.b64decode(stripped)
+ except (binascii.Error, ValueError):
+ decoded = b""
+ if decoded:
+ return decoded
+ b = stripped.encode("utf-8")
+ return b if len(b) > 0 else None
+ return None
diff --git a/modules/system/i18nBootSync.py b/modules/system/i18nBootSync.py
index 96f1b69d..15501a0f 100644
--- a/modules/system/i18nBootSync.py
+++ b/modules/system/i18nBootSync.py
@@ -242,6 +242,7 @@ def _registerNodeLabels():
added += 1
try:
+ # DEPRECATED: will move with WorkflowAutomation code restructuring
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
for nd in STATIC_NODE_TYPES:
_reg(_extractRegistrySourceText(nd.get("label")), "node.label")
@@ -265,6 +266,7 @@ def _registerNodeLabels():
pass
try:
+ # DEPRECATED: will move with WorkflowAutomation code restructuring
from modules.features.graphicalEditor.portTypes import PORT_TYPE_CATALOG
for schema in PORT_TYPE_CATALOG.values():
for field in getattr(schema, "fields", []) or []:
diff --git a/modules/workflows/automation2/executionEngine.py b/modules/workflows/automation2/executionEngine.py
index d5fdbd0e..b6313342 100644
--- a/modules/workflows/automation2/executionEngine.py
+++ b/modules/workflows/automation2/executionEngine.py
@@ -429,6 +429,52 @@ async def _executeWithRetry(executor, node, context, maxRetries: int = 0, retryD
raise lastError
+def _validateFeatureInstanceMandates(graph: Dict[str, Any], mandateId: str) -> None:
+ """Verify that all FeatureInstanceRef IDs in the graph belong to the workflow's mandate.
+
+ Logs a warning for each mismatch but does NOT abort execution — the node
+ executor will fail on its own with a more specific error if the instance is
+ truly inaccessible. This is a defence-in-depth guard (A0.2).
+ """
+ nodes = graph.get("nodes") if isinstance(graph, dict) else None
+ if not isinstance(nodes, list):
+ return
+ instanceIds: set = set()
+ for node in nodes:
+ if not isinstance(node, dict):
+ continue
+ params = node.get("parameters") or {}
+ ref = params.get("featureInstanceId")
+ if isinstance(ref, dict) and ref.get("$type") == "FeatureInstanceRef":
+ iid = ref.get("id")
+ if iid:
+ instanceIds.add(iid)
+ elif isinstance(ref, str) and ref.strip():
+ instanceIds.add(ref.strip())
+ if not instanceIds:
+ return
+ try:
+ from modules.interfaces.interfaceDbApp import getRootInterface
+ root = getRootInterface()
+ from modules.datamodels.datamodelFeatures import FeatureInstance
+ for iid in instanceIds:
+ fi = root.db.getRecord(FeatureInstance, iid)
+ if not fi:
+ logger.warning(
+ "MandateValidation: FeatureInstance %s referenced in graph not found", iid,
+ )
+ continue
+ fiMandateId = fi.get("mandateId") if isinstance(fi, dict) else getattr(fi, "mandateId", None)
+ if fiMandateId and fiMandateId != mandateId:
+ logger.warning(
+ "MandateValidation: FeatureInstance %s belongs to mandate %s, "
+ "but workflow mandate is %s — cross-mandate access",
+ iid, fiMandateId, mandateId,
+ )
+ except Exception as e:
+ logger.debug("MandateValidation: could not verify instances: %s", e)
+
+
def _substituteFeatureInstancePlaceholders(
graph: Dict[str, Any],
targetFeatureInstanceId: str,
@@ -675,6 +721,10 @@ async def executeGraph(
# Phase-5 Schicht-4: typed-ref envelopes are materialized FIRST so the
# subsequent connection-ref pass and validation see the canonical shape.
graph = materializeFeatureInstanceRefs(graph)
+
+ if mandateId:
+ _validateFeatureInstanceMandates(graph, mandateId)
+
graph = materializeConnectionRefs(graph)
graph = materializePrimaryTextHandover(graph)
graph = materializeRecommendedDataPickRef(graph)
diff --git a/modules/workflows/automation2/executors/actionNodeExecutor.py b/modules/workflows/automation2/executors/actionNodeExecutor.py
index 9af626d4..ee1101e5 100644
--- a/modules/workflows/automation2/executors/actionNodeExecutor.py
+++ b/modules/workflows/automation2/executors/actionNodeExecutor.py
@@ -7,8 +7,6 @@
# ``documentListWire`` is applied at runtime in this executor via graphUtils.extract_wired_document_list.
-import base64
-import binascii
import json
import logging
import re
@@ -122,50 +120,7 @@ def _log_file_create_context_resolution(
)
-def _looks_like_ascii_base64_payload(s: str) -> bool:
- """Heuristic: ActionDocument binary payloads use standard ASCII base64; markdown/text uses other chars (#, *, -, …)."""
- t = "".join(s.split())
- if len(t) < 8:
- return False
- if not t.isascii():
- return False
- return bool(re.fullmatch(r"[A-Za-z0-9+/]+=*", t)) and len(t) % 4 == 0
-
-
-def coerceDocumentDataToBytes(raw: Any) -> Optional[bytes]:
- """Normalize documentData for DB file persistence.
-
- ActionDocument conventions (see methodFile.create): binary bodies are carried as ASCII
- base64 strings; plain markdown/text stays as Unicode. Do not UTF-8-encode a base64
- literal — that persists the ASCII of the encoding (file looks like base64 gibberish).
- """
- if raw is None:
- return None
- if isinstance(raw, bytes):
- return raw if len(raw) > 0 else None
- if isinstance(raw, bytearray):
- b = bytes(raw)
- return b if len(b) > 0 else None
- if isinstance(raw, memoryview):
- b = raw.tobytes()
- return b if len(b) > 0 else None
- if isinstance(raw, str):
- stripped = raw.strip()
- if not stripped:
- return None
- if _looks_like_ascii_base64_payload(stripped):
- try:
- decoded = base64.b64decode(stripped, validate=True)
- except (TypeError, binascii.Error, ValueError):
- try:
- decoded = base64.b64decode(stripped)
- except (binascii.Error, ValueError):
- decoded = b""
- if decoded:
- return decoded
- b = stripped.encode("utf-8")
- return b if len(b) > 0 else None
- return None
+from modules.shared.documentUtils import coerceDocumentDataToBytes # noqa: F401 — re-export shim
def _image_documents_from_docs_list(docs_list: list) -> list:
diff --git a/modules/workflows/automation2/executors/inputExecutor.py b/modules/workflows/automation2/executors/inputExecutor.py
index 4ccef725..aaf31ff1 100644
--- a/modules/workflows/automation2/executors/inputExecutor.py
+++ b/modules/workflows/automation2/executors/inputExecutor.py
@@ -4,29 +4,11 @@
import logging
from typing import Dict, Any
+from modules.datamodels.serviceExceptions import PauseForHumanTaskError, PauseForEmailWaitError # noqa: F401 — re-export shim
+
logger = logging.getLogger(__name__)
-class PauseForHumanTaskError(Exception):
- """Raised when execution must pause for a human task. Contains runId, taskId."""
-
- def __init__(self, runId: str, taskId: str, nodeId: str):
- self.runId = runId
- self.taskId = taskId
- self.nodeId = nodeId
- super().__init__(f"Pause for human task {taskId} (run {runId}, node {nodeId})")
-
-
-class PauseForEmailWaitError(Exception):
- """Raised when execution must pause waiting for a new email. Background poller will resume."""
-
- def __init__(self, runId: str, nodeId: str, waitConfig: Dict[str, Any]):
- self.runId = runId
- self.nodeId = nodeId
- self.waitConfig = waitConfig
- super().__init__(f"Pause for email wait (run {runId}, node {nodeId})")
-
-
class InputExecutor:
"""
Execute input/human nodes. Creates a HumanTask, pauses the run, and raises
diff --git a/modules/workflows/methods/methodContext/actions/setContext.py b/modules/workflows/methods/methodContext/actions/setContext.py
index 10f292b7..24e10fc8 100644
--- a/modules/workflows/methods/methodContext/actions/setContext.py
+++ b/modules/workflows/methods/methodContext/actions/setContext.py
@@ -22,7 +22,7 @@ import logging
from typing import Any, Dict, List, Optional, Tuple
from modules.datamodels.datamodelChat import ActionResult
-from modules.workflows.automation2.executors.inputExecutor import PauseForHumanTaskError
+from modules.datamodels.serviceExceptions import PauseForHumanTaskError
logger = logging.getLogger(__name__)
diff --git a/modules/workflows/methods/methodFile/actions/create.py b/modules/workflows/methods/methodFile/actions/create.py
index cc5550ca..bb778c8f 100644
--- a/modules/workflows/methods/methodFile/actions/create.py
+++ b/modules/workflows/methods/methodFile/actions/create.py
@@ -13,7 +13,7 @@ import re
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
from modules.shared.i18nRegistry import normalizePrimaryLanguageTag
-from modules.workflows.automation2.executors.actionNodeExecutor import coerceDocumentDataToBytes
+from modules.shared.documentUtils import coerceDocumentDataToBytes
from modules.workflows.methods.methodAi._common import is_image_action_document_list
from modules.workflows.methods.methodContext.actions.extractContent import (
presentation_envelopes_to_document_json,
diff --git a/modules/workflows/scheduler/mainScheduler.py b/modules/workflows/scheduler/mainScheduler.py
index 9af9889f..11544015 100644
--- a/modules/workflows/scheduler/mainScheduler.py
+++ b/modules/workflows/scheduler/mainScheduler.py
@@ -93,9 +93,9 @@ class WorkflowScheduler:
activeWorkflowIds.add(workflowId)
cron = item.get("cron")
mandateId = item.get("mandateId")
- instanceId = item.get("featureInstanceId")
+ instanceId = item.get("featureInstanceId") or ""
- if not instanceId or not cron:
+ if not cron:
continue
jobId = f"{JOB_ID_PREFIX}{workflowId}"