From 04305c5f956733a24e4f1a61450076b9e1dbb871 Mon Sep 17 00:00:00 2001 From: idittrich-valueon Date: Sun, 22 Mar 2026 16:15:11 +0100 Subject: [PATCH] basic functionality --- .../datamodelFeatureAutomation2.py | 159 ++++++++++ .../interfaceFeatureAutomation2.py | 278 +++++++++++++++++ .../features/automation2/mainAutomation2.py | 28 +- .../automation2/nodeDefinitions/__init__.py | 3 +- .../automation2/nodeDefinitions/input.py | 117 ++++++++ modules/features/automation2/nodeRegistry.py | 3 +- .../automation2/routeFeatureAutomation2.py | 280 +++++++++++++++++- modules/interfaces/interfaceRbac.py | 4 + .../workflows/automation2/executionEngine.py | 98 +++++- .../automation2/executors/__init__.py | 10 +- .../automation2/executors/dataExecutor.py | 24 +- .../automation2/executors/flowExecutor.py | 29 +- .../automation2/executors/inputExecutor.py | 70 +++++ .../automation2/executors/ioExecutor.py | 19 +- .../automation2/executors/triggerExecutor.py | 18 +- modules/workflows/automation2/graphUtils.py | 18 +- scripts/script_db_init_automation2.py | 96 ++++++ 17 files changed, 1217 insertions(+), 37 deletions(-) create mode 100644 modules/features/automation2/datamodelFeatureAutomation2.py create mode 100644 modules/features/automation2/interfaceFeatureAutomation2.py create mode 100644 modules/features/automation2/nodeDefinitions/input.py create mode 100644 modules/workflows/automation2/executors/inputExecutor.py create mode 100644 scripts/script_db_init_automation2.py diff --git a/modules/features/automation2/datamodelFeatureAutomation2.py b/modules/features/automation2/datamodelFeatureAutomation2.py new file mode 100644 index 00000000..f505c7d0 --- /dev/null +++ b/modules/features/automation2/datamodelFeatureAutomation2.py @@ -0,0 +1,159 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Automation2 models: Automation2Workflow, Automation2WorkflowRun, Automation2HumanTask.""" + +from typing import Dict, Any, List, Optional +from pydantic import BaseModel, Field +from modules.shared.attributeUtils import registerModelLabels +import uuid + + +class Automation2Workflow(BaseModel): + id: str = Field( + default_factory=lambda: str(uuid.uuid4()), + description="Primary key", + json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}, + ) + mandateId: str = Field( + description="Mandate ID", + json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}, + ) + featureInstanceId: str = Field( + description="Feature instance ID", + json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}, + ) + label: str = Field( + description="User-friendly workflow name", + json_schema_extra={"frontend_type": "text", "frontend_required": True}, + ) + graph: Dict[str, Any] = Field( + default_factory=dict, + description="Graph with nodes and connections (incl. node parameters)", + json_schema_extra={"frontend_type": "textarea", "frontend_required": True}, + ) + active: bool = Field( + default=True, + description="Whether workflow is active", + json_schema_extra={"frontend_type": "checkbox", "frontend_required": False}, + ) + + +registerModelLabels( + "Automation2Workflow", + {"en": "Automation2 Workflow", "de": "Automation2 Workflow", "fr": "Workflow Automation2"}, + { + "id": {"en": "ID", "de": "ID", "fr": "ID"}, + "mandateId": {"en": "Mandate ID", "de": "Mandanten-ID", "fr": "ID du mandat"}, + "featureInstanceId": {"en": "Feature Instance ID", "de": "Feature-Instanz-ID", "fr": "ID instance"}, + "label": {"en": "Label", "de": "Bezeichnung", "fr": "Libellé"}, + "graph": {"en": "Graph", "de": "Graph", "fr": "Graphe"}, + "active": {"en": "Active", "de": "Aktiv", "fr": "Actif"}, + }, +) + + +class Automation2WorkflowRun(BaseModel): + id: str = Field( + default_factory=lambda: str(uuid.uuid4()), + description="Primary key", + json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}, + ) + workflowId: str = Field( + description="Workflow ID", + json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": True}, + ) + status: str = Field( + default="running", + description="Status: running|paused|completed|failed", + json_schema_extra={"frontend_type": "text", "frontend_required": False}, + ) + nodeOutputs: Dict[str, Any] = Field( + default_factory=dict, + description="Outputs from executed nodes", + json_schema_extra={"frontend_type": "textarea", "frontend_required": False}, + ) + currentNodeId: Optional[str] = Field( + default=None, + description="Node ID when paused (human task)", + json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}, + ) + context: Dict[str, Any] = Field( + default_factory=dict, + description="Context for resume (connectionMap, inputSources, etc.)", + json_schema_extra={"frontend_type": "textarea", "frontend_required": False}, + ) + + +registerModelLabels( + "Automation2WorkflowRun", + {"en": "Automation2 Workflow Run", "de": "Automation2 Workflow-Ausführung", "fr": "Exécution workflow"}, + { + "id": {"en": "ID", "de": "ID", "fr": "ID"}, + "workflowId": {"en": "Workflow ID", "de": "Workflow-ID", "fr": "ID workflow"}, + "status": {"en": "Status", "de": "Status", "fr": "Statut"}, + "nodeOutputs": {"en": "Node Outputs", "de": "Node-Ausgaben", "fr": "Sorties nœuds"}, + "currentNodeId": {"en": "Current Node", "de": "Aktueller Knoten", "fr": "Nœud actuel"}, + "context": {"en": "Context", "de": "Kontext", "fr": "Contexte"}, + }, +) + + +class Automation2HumanTask(BaseModel): + id: str = Field( + default_factory=lambda: str(uuid.uuid4()), + description="Primary key", + json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}, + ) + runId: str = Field( + description="Workflow run ID", + json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": True}, + ) + workflowId: str = Field( + description="Workflow ID", + json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": True}, + ) + nodeId: str = Field( + description="Node ID in the graph", + json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": True}, + ) + nodeType: str = Field( + description="Node type: form|approval|upload|comment|review|selection|confirmation", + json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": True}, + ) + config: Dict[str, Any] = Field( + default_factory=dict, + description="Node config (form schema, approval text, etc.)", + json_schema_extra={"frontend_type": "textarea", "frontend_required": False}, + ) + assigneeId: Optional[str] = Field( + default=None, + description="User ID assigned to complete the task", + json_schema_extra={"frontend_type": "text", "frontend_readonly": False, "frontend_required": False}, + ) + status: str = Field( + default="pending", + description="Status: pending|completed|rejected", + json_schema_extra={"frontend_type": "text", "frontend_required": False}, + ) + result: Optional[Dict[str, Any]] = Field( + default=None, + description="Task result (form data, approval decision, etc.)", + json_schema_extra={"frontend_type": "textarea", "frontend_required": False}, + ) + + +registerModelLabels( + "Automation2HumanTask", + {"en": "Automation2 Human Task", "de": "Automation2 Benutzer-Aufgabe", "fr": "Tâche utilisateur"}, + { + "id": {"en": "ID", "de": "ID", "fr": "ID"}, + "runId": {"en": "Run ID", "de": "Lauf-ID", "fr": "ID exécution"}, + "workflowId": {"en": "Workflow ID", "de": "Workflow-ID", "fr": "ID workflow"}, + "nodeId": {"en": "Node ID", "de": "Knoten-ID", "fr": "ID nœud"}, + "nodeType": {"en": "Node Type", "de": "Knotentyp", "fr": "Type nœud"}, + "config": {"en": "Config", "de": "Konfiguration", "fr": "Configuration"}, + "assigneeId": {"en": "Assignee", "de": "Zugewiesen an", "fr": "Assigné à"}, + "status": {"en": "Status", "de": "Status", "fr": "Statut"}, + "result": {"en": "Result", "de": "Ergebnis", "fr": "Résultat"}, + }, +) diff --git a/modules/features/automation2/interfaceFeatureAutomation2.py b/modules/features/automation2/interfaceFeatureAutomation2.py new file mode 100644 index 00000000..fd27f81b --- /dev/null +++ b/modules/features/automation2/interfaceFeatureAutomation2.py @@ -0,0 +1,278 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Interface for Automation2 feature - Workflows, Runs, Human Tasks. +Uses PostgreSQL poweron_automation2 database. +""" + +import logging +import uuid +from typing import Dict, Any, List, Optional + +from modules.datamodels.datamodelUam import User +from modules.features.automation2.datamodelFeatureAutomation2 import ( + Automation2Workflow, + Automation2WorkflowRun, + Automation2HumanTask, +) +from modules.connectors.connectorDbPostgre import DatabaseConnector +from modules.shared.configuration import APP_CONFIG + +logger = logging.getLogger(__name__) + + +def getAutomation2Interface( + currentUser: User, + mandateId: str, + featureInstanceId: str, +) -> "Automation2Objects": + """Factory for Automation2 interface with user context.""" + return Automation2Objects( + currentUser=currentUser, + mandateId=mandateId, + featureInstanceId=featureInstanceId, + ) + + +class Automation2Objects: + """Interface for Automation2 database operations.""" + + def __init__( + self, + currentUser: User, + mandateId: str, + featureInstanceId: str, + ): + self.currentUser = currentUser + self.mandateId = mandateId + self.featureInstanceId = featureInstanceId + self.userId = currentUser.id if currentUser else None + self._init_db() + if hasattr(self.db, "updateContext") and self.userId: + self.db.updateContext(self.userId) + + def _init_db(self): + """Initialize database connection to poweron_automation2.""" + dbHost = APP_CONFIG.get("DB_HOST", "localhost") + dbDatabase = "poweron_automation2" + dbUser = APP_CONFIG.get("DB_USER") + dbPassword = APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD") + dbPort = int(APP_CONFIG.get("DB_PORT", 5432)) + self.db = DatabaseConnector( + dbHost=dbHost, + dbDatabase=dbDatabase, + dbUser=dbUser, + dbPassword=dbPassword, + dbPort=dbPort, + userId=self.userId, + ) + logger.debug("Automation2 database initialized for user %s", self.userId) + + # ------------------------------------------------------------------------- + # Workflow CRUD + # ------------------------------------------------------------------------- + + def getWorkflows(self) -> List[Dict[str, Any]]: + """Get all workflows for this mandate and feature instance.""" + if not self.db._ensureTableExists(Automation2Workflow): + return [] + records = self.db.getRecordset( + Automation2Workflow, + recordFilter={ + "mandateId": self.mandateId, + "featureInstanceId": self.featureInstanceId, + }, + ) + return [dict(r) for r in records] if records else [] + + def getWorkflow(self, workflowId: str) -> Optional[Dict[str, Any]]: + """Get a single workflow by ID.""" + if not self.db._ensureTableExists(Automation2Workflow): + return None + records = self.db.getRecordset( + Automation2Workflow, + recordFilter={ + "id": workflowId, + "mandateId": self.mandateId, + "featureInstanceId": self.featureInstanceId, + }, + ) + if not records: + return None + return dict(records[0]) + + def createWorkflow(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Create a new workflow.""" + if "id" not in data or not data.get("id"): + data["id"] = str(uuid.uuid4()) + data["mandateId"] = self.mandateId + data["featureInstanceId"] = self.featureInstanceId + created = self.db.recordCreate(Automation2Workflow, data) + return dict(created) + + def updateWorkflow(self, workflowId: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Update an existing workflow.""" + existing = self.getWorkflow(workflowId) + if not existing: + return None + # Don't overwrite mandateId/featureInstanceId + data.pop("mandateId", None) + data.pop("featureInstanceId", None) + updated = self.db.recordModify(Automation2Workflow, workflowId, data) + return dict(updated) + + def deleteWorkflow(self, workflowId: str) -> bool: + """Delete a workflow.""" + existing = self.getWorkflow(workflowId) + if not existing: + return False + self.db.recordDelete(Automation2Workflow, workflowId) + return True + + # ------------------------------------------------------------------------- + # Workflow Runs + # ------------------------------------------------------------------------- + + def createRun(self, workflowId: str, nodeOutputs: Dict = None, context: Dict = None) -> Dict[str, Any]: + """Create a new workflow run.""" + data = { + "id": str(uuid.uuid4()), + "workflowId": workflowId, + "status": "running", + "nodeOutputs": nodeOutputs or {}, + "currentNodeId": None, + "context": context or {}, + } + created = self.db.recordCreate(Automation2WorkflowRun, data) + return dict(created) + + def getRun(self, runId: str) -> Optional[Dict[str, Any]]: + """Get a run by ID.""" + if not self.db._ensureTableExists(Automation2WorkflowRun): + return None + records = self.db.getRecordset( + Automation2WorkflowRun, + recordFilter={"id": runId}, + ) + if not records: + return None + return dict(records[0]) + + def updateRun( + self, + runId: str, + status: str = None, + nodeOutputs: Dict = None, + currentNodeId: str = None, + context: Dict = None, + ) -> Optional[Dict[str, Any]]: + """Update a run.""" + run = self.getRun(runId) + if not run: + return None + updates = {} + if status is not None: + updates["status"] = status + if nodeOutputs is not None: + updates["nodeOutputs"] = nodeOutputs + if currentNodeId is not None: + updates["currentNodeId"] = currentNodeId + if context is not None: + updates["context"] = context + if not updates: + return run + updated = self.db.recordModify(Automation2WorkflowRun, runId, updates) + return dict(updated) + + def getRunsByWorkflow(self, workflowId: str) -> List[Dict[str, Any]]: + """Get all runs for a workflow.""" + if not self.db._ensureTableExists(Automation2WorkflowRun): + return [] + records = self.db.getRecordset( + Automation2WorkflowRun, + recordFilter={"workflowId": workflowId}, + ) + return [dict(r) for r in records] if records else [] + + # ------------------------------------------------------------------------- + # Human Tasks + # ------------------------------------------------------------------------- + + def createTask( + self, + runId: str, + workflowId: str, + nodeId: str, + nodeType: str, + config: Dict[str, Any], + assigneeId: str = None, + ) -> Dict[str, Any]: + """Create a human task.""" + data = { + "id": str(uuid.uuid4()), + "runId": runId, + "workflowId": workflowId, + "nodeId": nodeId, + "nodeType": nodeType, + "config": config, + "assigneeId": assigneeId, + "status": "pending", + "result": None, + } + created = self.db.recordCreate(Automation2HumanTask, data) + return dict(created) + + def getTask(self, taskId: str) -> Optional[Dict[str, Any]]: + """Get a task by ID.""" + if not self.db._ensureTableExists(Automation2HumanTask): + return None + records = self.db.getRecordset( + Automation2HumanTask, + recordFilter={"id": taskId}, + ) + if not records: + return None + return dict(records[0]) + + def updateTask(self, taskId: str, status: str = None, result: Dict = None) -> Optional[Dict[str, Any]]: + """Update a task (e.g. complete with result).""" + task = self.getTask(taskId) + if not task: + return None + updates = {} + if status is not None: + updates["status"] = status + if result is not None: + updates["result"] = result + if not updates: + return task + updated = self.db.recordModify(Automation2HumanTask, taskId, updates) + return dict(updated) + + def getTasks( + self, + workflowId: str = None, + runId: str = None, + status: str = None, + assigneeId: str = None, + ) -> List[Dict[str, Any]]: + """Get tasks with optional filters. AssigneeId filters to that user; None returns all.""" + if not self.db._ensureTableExists(Automation2HumanTask): + return [] + rf = {} + if workflowId: + rf["workflowId"] = workflowId + if runId: + rf["runId"] = runId + if status: + rf["status"] = status + if assigneeId: + rf["assigneeId"] = assigneeId + records = self.db.getRecordset( + Automation2HumanTask, + recordFilter=rf if rf else None, + ) + items = [dict(r) for r in records] if records else [] + workflows = {w["id"]: w for w in self.getWorkflows()} + filtered = [t for t in items if t.get("workflowId") in workflows] + return filtered diff --git a/modules/features/automation2/mainAutomation2.py b/modules/features/automation2/mainAutomation2.py index 7adf6bb9..47f2fb62 100644 --- a/modules/features/automation2/mainAutomation2.py +++ b/modules/features/automation2/mainAutomation2.py @@ -25,9 +25,14 @@ FEATURE_ICON = "mdi-sitemap" UI_OBJECTS = [ { - "objectKey": "ui.feature.automation2.dashboard", - "label": {"en": "Dashboard", "de": "Dashboard", "fr": "Tableau de bord"}, - "meta": {"area": "dashboard"} + "objectKey": "ui.feature.automation2.editor", + "label": {"en": "Editor", "de": "Editor", "fr": "Éditeur"}, + "meta": {"area": "editor"} + }, + { + "objectKey": "ui.feature.automation2.workflows-tasks", + "label": {"en": "Workflows & Tasks", "de": "Workflows & Tasks", "fr": "Workflows et tâches"}, + "meta": {"area": "tasks"} }, ] @@ -58,7 +63,8 @@ TEMPLATE_ROLES = [ "fr": "Utilisateur Automation2 - Utiliser le flow builder" }, "accessRules": [ - {"context": "UI", "item": "ui.feature.automation2.dashboard", "view": True}, + {"context": "UI", "item": "ui.feature.automation2.editor", "view": True}, + {"context": "UI", "item": "ui.feature.automation2.workflows-tasks", "view": True}, {"context": "RESOURCE", "item": "resource.feature.automation2.dashboard", "view": True}, {"context": "RESOURCE", "item": "resource.feature.automation2.node-types", "view": True}, {"context": "RESOURCE", "item": "resource.feature.automation2.execute", "view": True}, @@ -194,7 +200,10 @@ def registerFeature(catalogService) -> bool: def _syncTemplateRolesToDb() -> int: - """Sync template roles and their AccessRules to database.""" + """Sync template roles and their AccessRules to database. + Also syncs rules to mandate-specific roles (same roleLabel) so new UI objects + become visible after gateway restart without manual role update. + """ try: from modules.interfaces.interfaceDbApp import getRootInterface from modules.datamodels.datamodelRbac import Role @@ -223,6 +232,15 @@ def _syncTemplateRolesToDb() -> int: logger.info(f"Created template role '{roleLabel}' for {FEATURE_CODE}") _ensureAccessRulesForRole(rootInterface, roleId, template.get("accessRules", [])) + + # Sync same rules to mandate-specific roles (so Workflows & Tasks etc. appear in sidebar) + for r in existingRoles: + if r.mandateId and r.roleLabel == roleLabel: + added = _ensureAccessRulesForRole( + rootInterface, str(r.id), template.get("accessRules", []) + ) + if added: + logger.debug(f"Added {added} access rules to mandate role {r.id}") return created except Exception as e: logger.warning(f"Template role sync for {FEATURE_CODE}: {e}") diff --git a/modules/features/automation2/nodeDefinitions/__init__.py b/modules/features/automation2/nodeDefinitions/__init__.py index eb2b4a6f..f25e61d2 100644 --- a/modules/features/automation2/nodeDefinitions/__init__.py +++ b/modules/features/automation2/nodeDefinitions/__init__.py @@ -4,5 +4,6 @@ from .triggers import TRIGGER_NODES from .flow import FLOW_NODES from .data import DATA_NODES +from .input import INPUT_NODES -STATIC_NODE_TYPES = TRIGGER_NODES + FLOW_NODES + DATA_NODES +STATIC_NODE_TYPES = TRIGGER_NODES + FLOW_NODES + DATA_NODES + INPUT_NODES diff --git a/modules/features/automation2/nodeDefinitions/input.py b/modules/features/automation2/nodeDefinitions/input.py new file mode 100644 index 00000000..8eb43e63 --- /dev/null +++ b/modules/features/automation2/nodeDefinitions/input.py @@ -0,0 +1,117 @@ +# Copyright (c) 2025 Patrick Motsch +# Input/Human node definitions - nodes that require user action. + +INPUT_NODES = [ + { + "id": "input.form", + "category": "input", + "label": {"en": "Form", "de": "Formular", "fr": "Formulaire"}, + "description": {"en": "User fills out a form", "de": "Benutzer füllt ein Formular aus", "fr": "L'utilisateur remplit un formulaire"}, + "parameters": [ + { + "name": "fields", + "type": "json", + "required": True, + "description": {"en": "Form fields: [{name, type, label, required, options?}]", "de": "Formularfelder", "fr": "Champs du formulaire"}, + "default": [], + }, + ], + "inputs": 1, + "outputs": 1, + "executor": "input", + "meta": {"icon": "mdi-form-textbox", "color": "#9C27B0"}, + }, + { + "id": "input.approval", + "category": "input", + "label": {"en": "Approval", "de": "Genehmigung", "fr": "Approbation"}, + "description": {"en": "User approves or rejects", "de": "Benutzer genehmigt oder lehnt ab", "fr": "L'utilisateur approuve ou rejette"}, + "parameters": [ + {"name": "title", "type": "string", "required": True, "description": {"en": "Approval title", "de": "Genehmigungstitel", "fr": "Titre"}}, + {"name": "description", "type": "string", "required": False, "description": {"en": "What to approve", "de": "Was genehmigt werden soll", "fr": "Ce qu'il faut approuver"}}, + {"name": "approvalType", "type": "string", "required": False, "description": {"en": "Type: document or generic", "de": "Typ: document oder generic", "fr": "Type: document ou generic"}, "default": "generic"}, + ], + "inputs": 1, + "outputs": 1, + "executor": "input", + "meta": {"icon": "mdi-check-decagram", "color": "#4CAF50"}, + }, + { + "id": "input.upload", + "category": "input", + "label": {"en": "Upload", "de": "Upload", "fr": "Téléversement"}, + "description": {"en": "User uploads file(s)", "de": "Benutzer lädt Datei(en) hoch", "fr": "L'utilisateur téléverse des fichiers"}, + "parameters": [ + {"name": "accept", "type": "string", "required": False, "description": {"en": "MIME types (e.g. .pdf,image/*)", "de": "MIME-Typen", "fr": "Types MIME"}, "default": ""}, + {"name": "maxSize", "type": "number", "required": False, "description": {"en": "Max file size in MB", "de": "Max. Dateigröße in MB", "fr": "Taille max en Mo"}, "default": 10}, + {"name": "multiple", "type": "boolean", "required": False, "description": {"en": "Allow multiple files", "de": "Mehrere Dateien erlauben", "fr": "Autoriser plusieurs fichiers"}, "default": False}, + ], + "inputs": 1, + "outputs": 1, + "executor": "input", + "meta": {"icon": "mdi-upload", "color": "#2196F3"}, + }, + { + "id": "input.comment", + "category": "input", + "label": {"en": "Comment", "de": "Kommentar", "fr": "Commentaire"}, + "description": {"en": "User adds a comment", "de": "Benutzer fügt einen Kommentar hinzu", "fr": "L'utilisateur ajoute un commentaire"}, + "parameters": [ + {"name": "placeholder", "type": "string", "required": False, "description": {"en": "Placeholder text", "de": "Platzhalter", "fr": "Texte indicatif"}, "default": ""}, + {"name": "required", "type": "boolean", "required": False, "description": {"en": "Comment required", "de": "Kommentar erforderlich", "fr": "Commentaire requis"}, "default": True}, + ], + "inputs": 1, + "outputs": 1, + "executor": "input", + "meta": {"icon": "mdi-comment-text", "color": "#FF9800"}, + }, + { + "id": "input.review", + "category": "input", + "label": {"en": "Review", "de": "Prüfung", "fr": "Revue"}, + "description": {"en": "User reviews content", "de": "Benutzer prüft Inhalt", "fr": "L'utilisateur révise le contenu"}, + "parameters": [ + {"name": "contentRef", "type": "string", "required": True, "description": {"en": "Reference to content (e.g. {{nodeId.field}})", "de": "Referenz auf Inhalt", "fr": "Référence au contenu"}}, + {"name": "reviewType", "type": "string", "required": False, "description": {"en": "Type of review", "de": "Art der Prüfung", "fr": "Type de revue"}, "default": "generic"}, + ], + "inputs": 1, + "outputs": 1, + "executor": "input", + "meta": {"icon": "mdi-magnify-scan", "color": "#673AB7"}, + }, + { + "id": "input.selection", + "category": "input", + "label": {"en": "Selection", "de": "Auswahl", "fr": "Sélection"}, + "description": {"en": "User selects from options", "de": "Benutzer wählt aus Optionen", "fr": "L'utilisateur choisit parmi les options"}, + "parameters": [ + { + "name": "options", + "type": "json", + "required": True, + "description": {"en": "Options: [{value, label}]", "de": "Optionen", "fr": "Options"}, + "default": [], + }, + {"name": "multiple", "type": "boolean", "required": False, "description": {"en": "Allow multiple selection", "de": "Mehrfachauswahl erlauben", "fr": "Sélection multiple"}, "default": False}, + ], + "inputs": 1, + "outputs": 1, + "executor": "input", + "meta": {"icon": "mdi-format-list-checks", "color": "#009688"}, + }, + { + "id": "input.confirmation", + "category": "input", + "label": {"en": "Confirmation", "de": "Bestätigung", "fr": "Confirmation"}, + "description": {"en": "User confirms yes/no", "de": "Benutzer bestätigt Ja/Nein", "fr": "L'utilisateur confirme oui/non"}, + "parameters": [ + {"name": "question", "type": "string", "required": True, "description": {"en": "Question to confirm", "de": "Zu bestätigende Frage", "fr": "Question à confirmer"}}, + {"name": "confirmLabel", "type": "string", "required": False, "description": {"en": "Label for confirm button", "de": "Label für Bestätigen-Button", "fr": "Libellé du bouton confirmer"}, "default": "Confirm"}, + {"name": "rejectLabel", "type": "string", "required": False, "description": {"en": "Label for reject button", "de": "Label für Ablehnen-Button", "fr": "Libellé du bouton refuser"}, "default": "Reject"}, + ], + "inputs": 1, + "outputs": 1, + "executor": "input", + "meta": {"icon": "mdi-checkbox-marked-circle", "color": "#8BC34A"}, + }, +] diff --git a/modules/features/automation2/nodeRegistry.py b/modules/features/automation2/nodeRegistry.py index 1cec085d..c8615ea6 100644 --- a/modules/features/automation2/nodeRegistry.py +++ b/modules/features/automation2/nodeRegistry.py @@ -158,10 +158,9 @@ def getNodeTypesForApi( localized = [_localizeNode(n, language) for n in nodes] categories = [ {"id": "trigger", "label": {"en": "Trigger", "de": "Trigger", "fr": "Déclencheur"}}, - {"id": "input", "label": {"en": "Input", "de": "Eingabe", "fr": "Entrée"}}, + {"id": "input", "label": {"en": "Input/Human", "de": "Eingabe/Mensch", "fr": "Entrée/Humain"}}, {"id": "flow", "label": {"en": "Flow", "de": "Ablauf", "fr": "Flux"}}, {"id": "data", "label": {"en": "Data", "de": "Daten", "fr": "Données"}}, {"id": "io", "label": {"en": "I/O", "de": "E/A", "fr": "E/S"}}, - {"id": "human", "label": {"en": "Human", "de": "Mensch", "fr": "Humain"}}, ] return {"nodeTypes": localized, "categories": categories} diff --git a/modules/features/automation2/routeFeatureAutomation2.py b/modules/features/automation2/routeFeatureAutomation2.py index ab048e0d..488e48ca 100644 --- a/modules/features/automation2/routeFeatureAutomation2.py +++ b/modules/features/automation2/routeFeatureAutomation2.py @@ -1,15 +1,16 @@ # Copyright (c) 2025 Patrick Motsch # All rights reserved. """ -Automation2 routes - node-types, execute, info. +Automation2 routes - node-types, execute, workflows, runs, tasks. """ import logging -from fastapi import APIRouter, Depends, Path, Query, Body +from fastapi import APIRouter, Depends, Path, Query, Body, Request, HTTPException from modules.auth import limiter, getRequestContext, RequestContext from modules.features.automation2.mainAutomation2 import getAutomation2Services from modules.features.automation2.nodeRegistry import getNodeTypesForApi +from modules.features.automation2.interfaceFeatureAutomation2 import getAutomation2Interface from modules.workflows.automation2.executionEngine import executeGraph logger = logging.getLogger(__name__) @@ -39,6 +40,7 @@ def _validateInstanceAccess(instanceId: str, context: RequestContext) -> str: @router.get("/{instanceId}/info") @limiter.limit("60/minute") def get_automation2_info( + request: Request, instanceId: str = Path(..., description="Feature instance ID"), context: RequestContext = Depends(getRequestContext), ) -> dict: @@ -55,28 +57,44 @@ def get_automation2_info( @router.get("/{instanceId}/node-types") @limiter.limit("60/minute") def get_node_types( + request: Request, instanceId: str = Path(..., description="Feature instance ID"), language: str = Query("en", description="Localization (en, de, fr)"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Return node types for the flow builder: static + I/O from methodDiscovery.""" + logger.info("automation2 node-types request: instanceId=%s language=%s", instanceId, language) mandateId = _validateInstanceAccess(instanceId, context) services = getAutomation2Services( context.user, mandateId=mandateId, featureInstanceId=instanceId, ) - return getNodeTypesForApi(services, language=language) + result = getNodeTypesForApi(services, language=language) + logger.info( + "automation2 node-types response: %d nodeTypes %d categories", + len(result.get("nodeTypes", [])), + len(result.get("categories", [])), + ) + return result @router.post("/{instanceId}/execute") @limiter.limit("30/minute") async def post_execute( + request: Request, instanceId: str = Path(..., description="Feature instance ID"), body: dict = Body(..., description="{ workflowId?, graph: { nodes, connections } }"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Execute automation2 graph. Body: { workflowId?, graph: { nodes, connections } }.""" + userId = str(context.user.id) if context.user else None + logger.info( + "automation2 execute request: instanceId=%s userId=%s body_keys=%s", + instanceId, + userId, + list(body.keys()), + ) mandateId = _validateInstanceAccess(instanceId, context) services = getAutomation2Services( context.user, @@ -85,12 +103,266 @@ async def post_execute( ) graph = body.get("graph") or body workflowId = body.get("workflowId") + if workflowId: + a2 = getAutomation2Interface(context.user, mandateId, instanceId) + wf = a2.getWorkflow(workflowId) + if wf and wf.get("graph"): + graph = wf["graph"] + logger.info("automation2 execute: loaded graph from workflow %s", workflowId) + nodes_count = len(graph.get("nodes") or []) + connections_count = len(graph.get("connections") or []) + logger.info( + "automation2 execute: graph nodes=%d connections=%d workflowId=%s mandateId=%s", + nodes_count, + connections_count, + workflowId, + mandateId, + ) + a2_interface = getAutomation2Interface(context.user, mandateId, instanceId) if workflowId else None result = await executeGraph( + graph=graph, + services=services, + workflowId=workflowId, + instanceId=instanceId, + userId=userId, + mandateId=mandateId, + automation2_interface=a2_interface, + ) + logger.info( + "automation2 execute result: success=%s error=%s nodeOutputs_keys=%s failedNode=%s paused=%s", + result.get("success"), + result.get("error"), + list(result.get("nodeOutputs", {}).keys()) if result.get("nodeOutputs") else [], + result.get("failedNode"), + result.get("paused"), + ) + return result + + +# ------------------------------------------------------------------------- +# Workflow CRUD +# ------------------------------------------------------------------------- + + +@router.get("/{instanceId}/workflows") +@limiter.limit("60/minute") +def get_workflows( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + context: RequestContext = Depends(getRequestContext), +) -> dict: + """List all workflows for this feature instance.""" + mandateId = _validateInstanceAccess(instanceId, context) + a2 = getAutomation2Interface(context.user, mandateId, instanceId) + items = a2.getWorkflows() + return {"workflows": items} + + +@router.get("/{instanceId}/workflows/{workflowId}") +@limiter.limit("60/minute") +def get_workflow( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + context: RequestContext = Depends(getRequestContext), +) -> dict: + """Get a single workflow by ID.""" + mandateId = _validateInstanceAccess(instanceId, context) + a2 = getAutomation2Interface(context.user, mandateId, instanceId) + wf = a2.getWorkflow(workflowId) + if not wf: + raise HTTPException(status_code=404, detail="Workflow not found") + return wf + + +@router.post("/{instanceId}/workflows") +@limiter.limit("30/minute") +def create_workflow( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + body: dict = Body(..., description="{ label, graph }"), + context: RequestContext = Depends(getRequestContext), +) -> dict: + """Create a new workflow.""" + mandateId = _validateInstanceAccess(instanceId, context) + a2 = getAutomation2Interface(context.user, mandateId, instanceId) + created = a2.createWorkflow(body) + return created + + +@router.put("/{instanceId}/workflows/{workflowId}") +@limiter.limit("30/minute") +def update_workflow( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + body: dict = Body(..., description="{ label?, graph? }"), + context: RequestContext = Depends(getRequestContext), +) -> dict: + """Update a workflow.""" + mandateId = _validateInstanceAccess(instanceId, context) + a2 = getAutomation2Interface(context.user, mandateId, instanceId) + updated = a2.updateWorkflow(workflowId, body) + if not updated: + raise HTTPException(status_code=404, detail="Workflow not found") + return updated + + +@router.delete("/{instanceId}/workflows/{workflowId}") +@limiter.limit("30/minute") +def delete_workflow( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + context: RequestContext = Depends(getRequestContext), +) -> dict: + """Delete a workflow.""" + mandateId = _validateInstanceAccess(instanceId, context) + a2 = getAutomation2Interface(context.user, mandateId, instanceId) + if not a2.deleteWorkflow(workflowId): + raise HTTPException(status_code=404, detail="Workflow not found") + return {"success": True} + + +# ------------------------------------------------------------------------- +# Runs and Resume +# ------------------------------------------------------------------------- + + +@router.get("/{instanceId}/workflows/{workflowId}/runs") +@limiter.limit("60/minute") +def get_workflow_runs( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + context: RequestContext = Depends(getRequestContext), +) -> dict: + """Get runs for a workflow.""" + mandateId = _validateInstanceAccess(instanceId, context) + a2 = getAutomation2Interface(context.user, mandateId, instanceId) + if not a2.getWorkflow(workflowId): + raise HTTPException(status_code=404, detail="Workflow not found") + runs = a2.getRunsByWorkflow(workflowId) + return {"runs": runs} + + +@router.post("/{instanceId}/runs/{runId}/resume") +@limiter.limit("30/minute") +async def resume_run( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + runId: str = Path(..., description="Run ID"), + body: dict = Body(..., description="{ taskId, result }"), + context: RequestContext = Depends(getRequestContext), +) -> dict: + """Resume a paused run after task completion.""" + mandateId = _validateInstanceAccess(instanceId, context) + a2 = getAutomation2Interface(context.user, mandateId, instanceId) + run = a2.getRun(runId) + if not run: + raise HTTPException(status_code=404, detail="Run not found") + taskId = body.get("taskId") + result = body.get("result") + if not taskId or result is None: + raise HTTPException(status_code=400, detail="taskId and result required") + task = a2.getTask(taskId) + if not task or task.get("runId") != runId: + raise HTTPException(status_code=404, detail="Task not found") + if task.get("status") != "pending": + raise HTTPException(status_code=400, detail="Task already completed") + a2.updateTask(taskId, status="completed", result=result) + nodeId = task.get("nodeId") + nodeOutputs = dict(run.get("nodeOutputs") or {}) + nodeOutputs[nodeId] = result + runContext = run.get("context") or {} + connectionMap = runContext.get("connectionMap", {}) + inputSources = runContext.get("inputSources", {}) + workflowId = run.get("workflowId") + wf = a2.getWorkflow(workflowId) if workflowId else None + if not wf or not wf.get("graph"): + raise HTTPException(status_code=400, detail="Workflow graph not found") + graph = wf["graph"] + services = getAutomation2Services(context.user, mandateId=mandateId, featureInstanceId=instanceId) + resume_result = await executeGraph( graph=graph, services=services, workflowId=workflowId, instanceId=instanceId, userId=str(context.user.id) if context.user else None, mandateId=mandateId, + automation2_interface=a2, + initialNodeOutputs=nodeOutputs, + startAfterNodeId=nodeId, + runId=runId, + ) + return resume_result + + +# ------------------------------------------------------------------------- +# Tasks +# ------------------------------------------------------------------------- + + +@router.get("/{instanceId}/tasks") +@limiter.limit("60/minute") +def get_tasks( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Query(None, description="Filter by workflow ID"), + status: str = Query(None, description="Filter: pending, completed, rejected"), + context: RequestContext = Depends(getRequestContext), +) -> dict: + """Get tasks - by default those assigned to current user, or all if no assignee filter.""" + mandateId = _validateInstanceAccess(instanceId, context) + a2 = getAutomation2Interface(context.user, mandateId, instanceId) + assigneeId = str(context.user.id) if context.user else None + items = a2.getTasks(workflowId=workflowId, status=status, assigneeId=assigneeId) + return {"tasks": items} + + +@router.post("/{instanceId}/tasks/{taskId}/complete") +@limiter.limit("30/minute") +async def complete_task( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + taskId: str = Path(..., description="Task ID"), + body: dict = Body(..., description="{ result }"), + context: RequestContext = Depends(getRequestContext), +) -> dict: + """Complete a task and resume the run.""" + mandateId = _validateInstanceAccess(instanceId, context) + a2 = getAutomation2Interface(context.user, mandateId, instanceId) + task = a2.getTask(taskId) + if not task: + raise HTTPException(status_code=404, detail="Task not found") + runId = task.get("runId") + result = body.get("result") + if result is None: + raise HTTPException(status_code=400, detail="result required") + run = a2.getRun(runId) + if not run: + raise HTTPException(status_code=404, detail="Run not found") + if task.get("status") != "pending": + raise HTTPException(status_code=400, detail="Task already completed") + a2.updateTask(taskId, status="completed", result=result) + nodeId = task.get("nodeId") + nodeOutputs = dict(run.get("nodeOutputs") or {}) + nodeOutputs[nodeId] = result + workflowId = run.get("workflowId") + wf = a2.getWorkflow(workflowId) if workflowId else None + if not wf or not wf.get("graph"): + raise HTTPException(status_code=400, detail="Workflow graph not found") + graph = wf["graph"] + services = getAutomation2Services(context.user, mandateId=mandateId, featureInstanceId=instanceId) + return await executeGraph( + graph=graph, + services=services, + workflowId=workflowId, + instanceId=instanceId, + userId=str(context.user.id) if context.user else None, + mandateId=mandateId, + automation2_interface=a2, + initialNodeOutputs=nodeOutputs, + startAfterNodeId=nodeId, + runId=runId, ) - return result diff --git a/modules/interfaces/interfaceRbac.py b/modules/interfaces/interfaceRbac.py index b4c9a3b4..14789523 100644 --- a/modules/interfaces/interfaceRbac.py +++ b/modules/interfaces/interfaceRbac.py @@ -72,6 +72,10 @@ TABLE_NAMESPACE = { # Automation - benutzer-eigen "AutomationDefinition": "automation", "AutomationTemplate": "automation", + # Automation2 - feature-scoped + "Automation2Workflow": "automation2", + "Automation2WorkflowRun": "automation2", + "Automation2HumanTask": "automation2", # Knowledge Store - benutzer-eigen "FileContentIndex": "knowledge", "ContentChunk": "knowledge", diff --git a/modules/workflows/automation2/executionEngine.py b/modules/workflows/automation2/executionEngine.py index 35e06ad5..61701e28 100644 --- a/modules/workflows/automation2/executionEngine.py +++ b/modules/workflows/automation2/executionEngine.py @@ -2,7 +2,7 @@ # Main execution engine for automation2 graphs. import logging -from typing import Dict, Any, List, Set +from typing import Dict, Any, List, Set, Optional from modules.workflows.automation2.graphUtils import ( parseGraph, @@ -17,6 +17,8 @@ from modules.workflows.automation2.executors import ( FlowExecutor, DataExecutor, IOExecutor, + InputExecutor, + PauseForHumanTaskError, ) from modules.features.automation2.nodeDefinitions import STATIC_NODE_TYPES from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods @@ -40,7 +42,11 @@ def _getNodeTypeIds(services: Any) -> Set[str]: return ids -def _getExecutor(nodeType: str, services: Any) -> Any: +def _getExecutor( + nodeType: str, + services: Any, + automation2_interface: Optional[Any] = None, +) -> Any: """Dispatch to correct executor based on node type.""" if nodeType.startswith("trigger."): return TriggerExecutor() @@ -50,6 +56,8 @@ def _getExecutor(nodeType: str, services: Any) -> Any: return DataExecutor() if nodeType.startswith("io."): return IOExecutor(services) + if nodeType.startswith("input.") and automation2_interface: + return InputExecutor(automation2_interface) return None @@ -60,21 +68,57 @@ async def executeGraph( instanceId: str = None, userId: str = None, mandateId: str = None, + automation2_interface: Optional[Any] = None, + initialNodeOutputs: Optional[Dict[str, Any]] = None, + startAfterNodeId: Optional[str] = None, + runId: Optional[str] = None, ) -> Dict[str, Any]: """ Execute automation2 graph. Returns { success, nodeOutputs, error?, stopped? }. + When an input node is reached and automation2_interface is provided, creates a task, + pauses the run, and returns { success: False, paused: True, taskId, runId }. + For resume: pass initialNodeOutputs (with result for the human node) and startAfterNodeId. """ + logger.info( + "executeGraph start: instanceId=%s workflowId=%s userId=%s mandateId=%s resume=%s", + instanceId, + workflowId, + userId, + mandateId, + startAfterNodeId is not None, + ) nodeTypeIds = _getNodeTypeIds(services) + logger.debug("executeGraph nodeTypeIds (%d): %s", len(nodeTypeIds), sorted(nodeTypeIds)) errors = validateGraph(graph, nodeTypeIds) if errors: + logger.warning("executeGraph validation failed: %s", errors) return {"success": False, "error": "; ".join(errors), "nodeOutputs": {}} nodes, connections = parseGraph(graph)[:2] connectionMap = buildConnectionMap(connections) inputSources = {n["id"]: getInputSources(n["id"], connectionMap) for n in nodes if n.get("id")} + logger.info( + "executeGraph parsed: nodes=%d connections=%d connectionMap_targets=%s", + len(nodes), + len(connections), + list(connectionMap.keys()), + ) ordered = topoSort(nodes, connectionMap) - nodeOutputs: Dict[str, Any] = {} + ordered_ids = [n.get("id") for n in ordered if n.get("id")] + logger.info("executeGraph topoSort order: %s", ordered_ids) + + nodeOutputs: Dict[str, Any] = dict(initialNodeOutputs or {}) + is_resume = startAfterNodeId is not None + if not runId and automation2_interface and workflowId and not is_resume: + run = automation2_interface.createRun( + workflowId=workflowId, + nodeOutputs=nodeOutputs, + context={"connectionMap": connectionMap, "inputSources": inputSources, "orderedNodeIds": ordered_ids}, + ) + runId = run.get("id") if run else None + logger.info("executeGraph created run %s", runId) + context = { "workflowId": workflowId, "instanceId": instanceId, @@ -84,23 +128,58 @@ async def executeGraph( "connectionMap": connectionMap, "inputSources": inputSources, "services": services, + "_runId": runId, + "_orderedNodes": ordered, } - for node in ordered: + skip_until_passed = bool(startAfterNodeId) + for i, node in enumerate(ordered): + if skip_until_passed: + if node.get("id") == startAfterNodeId: + skip_until_passed = False + continue if context.get("_stopped"): + logger.info("executeGraph stopped early (flow.stop) at step %d", i) break nodeId = node.get("id") nodeType = node.get("type", "") - executor = _getExecutor(nodeType, services) + executor = _getExecutor(nodeType, services, automation2_interface) + logger.info( + "executeGraph step %d/%d: nodeId=%s nodeType=%s executor=%s", + i + 1, + len(ordered), + nodeId, + nodeType, + type(executor).__name__ if executor else "None", + ) if not executor: nodeOutputs[nodeId] = None + logger.debug("executeGraph node %s: no executor, output=None", nodeId) continue try: result = await executor.execute(node, context) nodeOutputs[nodeId] = result + logger.info( + "executeGraph node %s done: result_type=%s result_keys=%s", + nodeId, + type(result).__name__, + list(result.keys()) if isinstance(result, dict) else "n/a", + ) + except PauseForHumanTaskError as e: + logger.info("executeGraph paused for human task %s", e.taskId) + return { + "success": False, + "paused": True, + "taskId": e.taskId, + "runId": e.runId, + "nodeId": e.nodeId, + "nodeOutputs": dict(nodeOutputs), + } except Exception as e: - logger.exception(f"automation2 execution failed for node {nodeId} ({nodeType})") + logger.exception("executeGraph node %s (%s) FAILED: %s", nodeId, nodeType, e) nodeOutputs[nodeId] = {"error": str(e), "success": False} + if runId and automation2_interface: + automation2_interface.updateRun(runId, status="failed", nodeOutputs=nodeOutputs) return { "success": False, "error": str(e), @@ -108,6 +187,13 @@ async def executeGraph( "failedNode": nodeId, } + if runId and automation2_interface: + automation2_interface.updateRun(runId, status="completed", nodeOutputs=nodeOutputs) + logger.info( + "executeGraph complete: success=True nodeOutputs_keys=%s stopped=%s", + list(nodeOutputs.keys()), + context.get("_stopped", False), + ) return { "success": True, "nodeOutputs": nodeOutputs, diff --git a/modules/workflows/automation2/executors/__init__.py b/modules/workflows/automation2/executors/__init__.py index 40d4c49c..ef6b0c5f 100644 --- a/modules/workflows/automation2/executors/__init__.py +++ b/modules/workflows/automation2/executors/__init__.py @@ -5,5 +5,13 @@ from .triggerExecutor import TriggerExecutor from .flowExecutor import FlowExecutor from .dataExecutor import DataExecutor from .ioExecutor import IOExecutor +from .inputExecutor import InputExecutor, PauseForHumanTaskError -__all__ = ["TriggerExecutor", "FlowExecutor", "DataExecutor", "IOExecutor"] +__all__ = [ + "TriggerExecutor", + "FlowExecutor", + "DataExecutor", + "IOExecutor", + "InputExecutor", + "PauseForHumanTaskError", +] diff --git a/modules/workflows/automation2/executors/dataExecutor.py b/modules/workflows/automation2/executors/dataExecutor.py index b8c8eb59..386c8abd 100644 --- a/modules/workflows/automation2/executors/dataExecutor.py +++ b/modules/workflows/automation2/executors/dataExecutor.py @@ -36,6 +36,13 @@ class DataExecutor: nodeId = node.get("id", "") inputSources = context.get("inputSources", {}).get(nodeId, {}) params = node.get("parameters") or {} + logger.info( + "DataExecutor node %s type=%s inputSources=%s params=%s", + nodeId, + nodeType, + inputSources, + params, + ) inp = None if 0 in inputSources: @@ -46,14 +53,23 @@ class DataExecutor: resolvedParams = {k: resolveParameterReferences(v, nodeOutputs) for k, v in params.items()} if nodeType == "data.setFields": - return self._setFields(inp, resolvedParams) + out = self._setFields(inp, resolvedParams) + logger.info("DataExecutor node %s setFields inp=%s -> %s", nodeId, type(inp).__name__, out) + return out if nodeType == "data.filter": - return self._filter(inp, resolvedParams) + out = self._filter(inp, resolvedParams) + logger.info("DataExecutor node %s filter inp=%s -> len=%d", nodeId, type(inp).__name__, len(out) if isinstance(out, list) else -1) + return out if nodeType == "data.parseJson": - return self._parseJson(inp, resolvedParams) + out = self._parseJson(inp, resolvedParams) + logger.info("DataExecutor node %s parseJson -> %s", nodeId, type(out).__name__) + return out if nodeType == "data.template": - return self._template(inp, resolvedParams, nodeOutputs) + out = self._template(inp, resolvedParams, nodeOutputs) + logger.info("DataExecutor node %s template -> %s", nodeId, out) + return out + logger.debug("DataExecutor node %s unhandled type %s -> passThrough", nodeId, nodeType) return inp def _setFields(self, inp: Any, params: Dict) -> Any: diff --git a/modules/workflows/automation2/executors/flowExecutor.py b/modules/workflows/automation2/executors/flowExecutor.py index 11b1f92a..de5789e5 100644 --- a/modules/workflows/automation2/executors/flowExecutor.py +++ b/modules/workflows/automation2/executors/flowExecutor.py @@ -21,21 +21,40 @@ class FlowExecutor: connectionMap = context.get("connectionMap", {}) nodeId = node.get("id", "") inputSources = context.get("inputSources", {}).get(nodeId, {}) + logger.info( + "FlowExecutor node %s type=%s inputSources=%s params=%s", + nodeId, + nodeType, + inputSources, + node.get("parameters"), + ) if nodeType == "flow.ifElse": - return await self._ifElse(node, nodeOutputs, nodeId, inputSources) + out = await self._ifElse(node, nodeOutputs, nodeId, inputSources) + logger.info("FlowExecutor node %s ifElse -> %s", nodeId, out) + return out if nodeType == "flow.merge": - return await self._merge(node, nodeOutputs, nodeId, inputSources) + out = await self._merge(node, nodeOutputs, nodeId, inputSources) + logger.info("FlowExecutor node %s merge -> %s", nodeId, out) + return out if nodeType == "flow.wait": - return await self._wait(node, nodeOutputs, nodeId, inputSources) + out = await self._wait(node, nodeOutputs, nodeId, inputSources) + logger.info("FlowExecutor node %s wait -> %s", nodeId, out) + return out if nodeType == "flow.stop": context["_stopped"] = True + logger.info("FlowExecutor node %s -> STOP", nodeId) return {"stopped": True} if nodeType == "flow.switch": - return await self._switch(node, nodeOutputs, nodeId, inputSources) + out = await self._switch(node, nodeOutputs, nodeId, inputSources) + logger.info("FlowExecutor node %s switch -> %s", nodeId, out) + return out if nodeType == "flow.loop": - return await self._loop(node, nodeOutputs, nodeId, inputSources) + out = await self._loop(node, nodeOutputs, nodeId, inputSources) + logger.info("FlowExecutor node %s loop -> %s", nodeId, out) + return out + logger.debug("FlowExecutor node %s unhandled type %s -> None", nodeId, nodeType) return None def _getInputData(self, nodeId: str, inputSources: Dict, nodeOutputs: Dict, outputIndex: int = 0) -> Any: diff --git a/modules/workflows/automation2/executors/inputExecutor.py b/modules/workflows/automation2/executors/inputExecutor.py new file mode 100644 index 00000000..e873b6d8 --- /dev/null +++ b/modules/workflows/automation2/executors/inputExecutor.py @@ -0,0 +1,70 @@ +# Copyright (c) 2025 Patrick Motsch +# Input/Human node executor - creates tasks and pauses execution. + +import logging +from typing import Dict, Any + +logger = logging.getLogger(__name__) + + +class PauseForHumanTaskError(Exception): + """Raised when execution must pause for a human task. Contains runId, taskId.""" + + def __init__(self, runId: str, taskId: str, nodeId: str): + self.runId = runId + self.taskId = taskId + self.nodeId = nodeId + super().__init__(f"Pause for human task {taskId} (run {runId}, node {nodeId})") + + +class InputExecutor: + """ + Execute input/human nodes. Creates a HumanTask, pauses the run, and raises + PauseForHumanTaskError so the engine returns { paused: true, taskId, runId }. + """ + + def __init__(self, automation2_interface: Any): + self.automation2 = automation2_interface + + async def execute( + self, + node: Dict[str, Any], + context: Dict[str, Any], + ) -> Any: + nodeType = node.get("type", "") + nodeId = node.get("id", "") + runId = context.get("_runId") + workflowId = context.get("workflowId") + instanceId = context.get("instanceId") + userId = context.get("userId") + + if not runId or not workflowId: + logger.error("InputExecutor: runId/workflowId missing in context - cannot create task") + return {"error": "Missing run context", "success": False} + + config = dict(node.get("parameters") or {}) + logger.info("InputExecutor node %s type=%s creating task", nodeId, nodeType) + + task = self.automation2.createTask( + runId=runId, + workflowId=workflowId, + nodeId=nodeId, + nodeType=nodeType, + config=config, + assigneeId=userId, + ) + taskId = task.get("id") + + self.automation2.updateRun( + runId, + status="paused", + nodeOutputs=context.get("nodeOutputs"), + currentNodeId=nodeId, + context={ + "connectionMap": context.get("connectionMap"), + "inputSources": context.get("inputSources"), + "orderedNodeIds": [n.get("id") for n in context.get("_orderedNodes", []) if n.get("id")], + }, + ) + logger.info("InputExecutor node %s: created task %s, run %s paused", nodeId, taskId, runId) + raise PauseForHumanTaskError(runId=runId, taskId=taskId, nodeId=nodeId) diff --git a/modules/workflows/automation2/executors/ioExecutor.py b/modules/workflows/automation2/executors/ioExecutor.py index b72771e6..eb006c7e 100644 --- a/modules/workflows/automation2/executors/ioExecutor.py +++ b/modules/workflows/automation2/executors/ioExecutor.py @@ -21,21 +21,26 @@ class IOExecutor: from modules.workflows.processing.core.actionExecutor import ActionExecutor nodeType = node.get("type", "") + nodeId = node.get("id", "") + logger.info("IOExecutor node %s type=%s", nodeId, nodeType) if not nodeType.startswith("io."): + logger.debug("IOExecutor node %s not io.* -> None", nodeId) return None parts = nodeType.split(".", 2) if len(parts) < 3: + logger.debug("IOExecutor node %s invalid type parts -> None", nodeId) return None _, methodName, actionName = parts + logger.info("IOExecutor node %s method=%s action=%s", nodeId, methodName, actionName) nodeOutputs = context.get("nodeOutputs", {}) params = dict(node.get("parameters") or {}) from modules.workflows.automation2.graphUtils import resolveParameterReferences resolvedParams = resolveParameterReferences(params, nodeOutputs) + logger.info("IOExecutor node %s resolvedParams keys=%s", nodeId, list(resolvedParams.keys())) - nodeId = node.get("id", "") inputSources = context.get("inputSources", {}).get(nodeId, {}) if 0 in inputSources: srcId, _ = inputSources[0] @@ -46,11 +51,19 @@ class IOExecutor: resolvedParams.setdefault("input", inp) executor = ActionExecutor(self.services) + logger.info("IOExecutor node %s calling executeAction(%s, %s)", nodeId, methodName, actionName) result = await executor.executeAction(methodName, actionName, resolvedParams) - - return { + out = { "success": result.success, "error": result.error, "documents": [d.model_dump() if hasattr(d, "model_dump") else d for d in (result.documents or [])], "data": result.model_dump() if hasattr(result, "model_dump") else {"success": result.success, "error": result.error}, } + logger.info( + "IOExecutor node %s result: success=%s error=%s doc_count=%d", + nodeId, + result.success, + result.error, + len(out.get("documents", [])), + ) + return out diff --git a/modules/workflows/automation2/executors/triggerExecutor.py b/modules/workflows/automation2/executors/triggerExecutor.py index ec7227e9..87ac359e 100644 --- a/modules/workflows/automation2/executors/triggerExecutor.py +++ b/modules/workflows/automation2/executors/triggerExecutor.py @@ -16,12 +16,22 @@ class TriggerExecutor: context: Dict[str, Any], ) -> Any: nodeType = node.get("type", "") + nodeId = node.get("id", "") + logger.info("TriggerExecutor node %s type=%s parameters=%s", nodeId, nodeType, node.get("parameters")) if nodeType == "trigger.manual": - return {"triggered": True, "source": "manual"} + out = {"triggered": True, "source": "manual"} + logger.info("TriggerExecutor node %s -> manual trigger: %s", nodeId, out) + return out if nodeType == "trigger.schedule": - return {"triggered": True, "source": "schedule"} + out = {"triggered": True, "source": "schedule"} + logger.info("TriggerExecutor node %s -> schedule trigger: %s", nodeId, out) + return out if nodeType == "trigger.formSubmit": params = node.get("parameters") or {} formId = params.get("formId", "") - return {"triggered": True, "source": "formSubmit", "formId": formId} - return {"triggered": True, "source": "unknown"} + out = {"triggered": True, "source": "formSubmit", "formId": formId} + logger.info("TriggerExecutor node %s -> formSubmit: %s", nodeId, out) + return out + out = {"triggered": True, "source": "unknown"} + logger.info("TriggerExecutor node %s -> unknown: %s", nodeId, out) + return out diff --git a/modules/workflows/automation2/graphUtils.py b/modules/workflows/automation2/graphUtils.py index ceed2efa..ad58c69c 100644 --- a/modules/workflows/automation2/graphUtils.py +++ b/modules/workflows/automation2/graphUtils.py @@ -16,6 +16,12 @@ def parseGraph(graph: Dict[str, Any]) -> Tuple[List[Dict], List[Dict], Set[str]] nodes = graph.get("nodes") or [] connections = graph.get("connections") or [] nodeIds = {n.get("id") for n in nodes if n.get("id")} + logger.debug( + "parseGraph: nodes=%d connections=%d nodeIds=%s", + len(nodes), + len(connections), + sorted(nodeIds), + ) return nodes, connections, nodeIds @@ -25,16 +31,19 @@ def buildConnectionMap(connections: List[Dict]) -> Dict[str, List[Tuple[str, int connection: { source, sourceOutput?, target, targetInput? } """ out: Dict[str, List[Tuple[str, int, int]]] = {} - for c in connections: + for i, c in enumerate(connections): src = c.get("source") or c.get("sourceNode") tgt = c.get("target") or c.get("targetNode") if not src or not tgt: + logger.debug("buildConnectionMap skip conn[%d]: missing source/target %r", i, c) continue so = c.get("sourceOutput", 0) ti = c.get("targetInput", 0) if tgt not in out: out[tgt] = [] out[tgt].append((src, so, ti)) + logger.debug("buildConnectionMap conn[%d]: %s -> %s (so=%d ti=%d)", i, src, tgt, so, ti) + logger.debug("buildConnectionMap result: %s", {k: v for k, v in out.items()}) return out @@ -83,6 +92,10 @@ def validateGraph(graph: Dict[str, Any], nodeTypeIds: Set[str]) -> List[str]: if nid not in nodeIds: errors.append(f"Connection references non-existent node {nid}") + if errors: + logger.debug("validateGraph errors: %s", errors) + else: + logger.debug("validateGraph: OK") return errors @@ -118,13 +131,14 @@ def topoSort(nodes: List[Dict], connectionMap: Dict[str, List[Tuple[str, int, in order.append(nodeById[tgt]) triggerIds = [t["id"] for t in triggers] + logger.debug("topoSort triggers: %s", triggerIds) bfs(triggerIds) # Append any orphan nodes (e.g. disconnected) for n in nodes: if n.get("id") and n["id"] not in visited: order.append(n) - + logger.debug("topoSort order (%d nodes): %s", len(order), [n.get("id") for n in order]) return order diff --git a/scripts/script_db_init_automation2.py b/scripts/script_db_init_automation2.py new file mode 100644 index 00000000..56d0daaf --- /dev/null +++ b/scripts/script_db_init_automation2.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +""" +Initialize poweron_automation2 database for the Automation2 feature. + +Creates the poweron_automation2 database if it does not exist. +Uses DB_* config. Tables (Automation2Workflow, Automation2WorkflowRun, +Automation2HumanTask) are auto-created by the connector on first use. + +Usage: + python scripts/script_db_init_automation2.py [--dry-run] +""" + +import os +import sys +import argparse +import logging +from pathlib import Path + +scriptPath = Path(__file__).resolve() +gatewayPath = scriptPath.parent.parent +sys.path.insert(0, str(gatewayPath)) +os.chdir(str(gatewayPath)) + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + +import psycopg2 +from modules.shared.configuration import APP_CONFIG + +DB_NAME = "poweron_automation2" + + +def _get_config(): + """Get DB config from APP_CONFIG.""" + host = APP_CONFIG.get("DB_HOST", "localhost") + port = int(APP_CONFIG.get("DB_PORT", "5432")) + user = APP_CONFIG.get("DB_USER") + password = ( + APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD") + ) + return {"host": host, "port": port, "user": user, "password": password} + + +def init_automation2_db(dry_run: bool = False) -> bool: + """Create poweron_automation2 database if it does not exist.""" + config = _get_config() + if not config["user"] or not config["password"]: + logger.error("DB_USER and DB_PASSWORD required") + return False + + try: + conn = psycopg2.connect( + host=config["host"], + port=config["port"], + database="postgres", + user=config["user"], + password=config["password"], + ) + conn.autocommit = True + + with conn.cursor() as cur: + cur.execute( + "SELECT 1 FROM pg_database WHERE datname = %s", + (DB_NAME,), + ) + exists = cur.fetchone() is not None + + if exists: + logger.info("Database %s already exists", DB_NAME) + else: + if dry_run: + logger.info("[DRY-RUN] Would create database %s", DB_NAME) + else: + cur.execute(f'CREATE DATABASE "{DB_NAME}"') + logger.info("Created database %s", DB_NAME) + + conn.close() + return True + except Exception as e: + logger.error("Failed to init %s: %s", DB_NAME, e) + return False + + +def main(): + parser = argparse.ArgumentParser(description="Initialize poweron_automation2 database") + parser.add_argument("--dry-run", action="store_true", help="Do not create, only report") + args = parser.parse_args() + ok = init_automation2_db(dry_run=args.dry_run) + sys.exit(0 if ok else 1) + + +if __name__ == "__main__": + main()