# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Interface for GraphicalEditor feature - Workflows, Runs, Human Tasks.
Uses PostgreSQL poweron_graphicaleditor database (Greenfield).
"""

import base64
import logging
import time
import uuid
from typing import Dict, Any, List, Optional

from modules.datamodels.datamodelUam import User
from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import (
    AutoWorkflow,
    AutoVersion,
    AutoRun,
    AutoStepLog,
    AutoTask,
    AutoWorkflow as Automation2Workflow,
    AutoRun as Automation2WorkflowRun,
    AutoTask as Automation2HumanTask,
)
from modules.features.graphicalEditor.entryPoints import normalize_invocations_list
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG

logger = logging.getLogger(__name__)

_GREENFIELD_DB = "poweron_graphicaleditor"
_CALLBACK_WORKFLOW_CHANGED = "graphicalEditor.workflow.changed"


def _make_json_serializable(obj: Any) -> Any:
    """
    Recursively convert bytes to base64 strings so structures can be
    JSON-serialized for storage in JSONB columns.

    Dicts, lists and tuples are walked recursively; tuples come back as lists
    (which is how JSON represents them anyway). ``bytes`` and ``bytearray``
    values become ASCII base64 strings. Everything else is returned unchanged.
    """
    if isinstance(obj, (bytes, bytearray)):
        return base64.b64encode(obj).decode("ascii")
    if isinstance(obj, dict):
        return {k: _make_json_serializable(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [_make_json_serializable(v) for v in obj]
    return obj


def _createConnector(userId: Optional[str]) -> DatabaseConnector:
    """Build a DatabaseConnector for the Greenfield DB from APP_CONFIG settings."""
    return DatabaseConnector(
        dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
        dbDatabase=_GREENFIELD_DB,
        dbUser=APP_CONFIG.get("DB_USER"),
        # The secret-backed password wins; the plain setting is the fallback.
        dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"),
        dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),
        userId=userId,
    )


def _notifyWorkflowChanged() -> None:
    """
    Fire the workflow-changed callback on a best-effort basis.

    A failing callback must never break the database operation that triggered
    it, so every exception is caught here - but it is logged instead of being
    silently discarded.
    """
    try:
        # Imported lazily, as in the original code (presumably to avoid an
        # import cycle - TODO confirm).
        from modules.shared.callbackRegistry import callbackRegistry

        callbackRegistry.trigger(_CALLBACK_WORKFLOW_CHANGED)
    except Exception:
        logger.warning(
            "GraphicalEditor: callback %s failed",
            _CALLBACK_WORKFLOW_CHANGED,
            exc_info=True,
        )


def _findScheduleCron(graph: Any) -> Optional[str]:
    """
    Return the stripped cron expression of the first ``trigger.schedule`` node
    in ``graph`` that carries a non-blank string cron, or None if there is none.

    Malformed graphs (non-dict graph, non-list nodes, non-dict node or
    parameters) are tolerated and simply yield None / are skipped.
    """
    if not isinstance(graph, dict):
        return None
    nodes = graph.get("nodes") or []
    if not isinstance(nodes, (list, tuple)):
        return None
    for node in nodes:
        if not isinstance(node, dict) or node.get("type") != "trigger.schedule":
            continue
        params = node.get("parameters") or {}
        cron = params.get("cron") if isinstance(params, dict) else None
        if isinstance(cron, str) and cron.strip():
            return cron.strip()
    return None


def getGraphicalEditorInterface(
    currentUser: User,
    mandateId: str,
    featureInstanceId: str,
) -> "GraphicalEditorObjects":
    """Factory for GraphicalEditor interface with user context."""
    return GraphicalEditorObjects(
        currentUser=currentUser,
        mandateId=mandateId,
        featureInstanceId=featureInstanceId,
    )


# Backward-compatible alias used by workflows/automation2/ execution engine
getAutomation2Interface = getGraphicalEditorInterface


def getAllWorkflowsForScheduling() -> List[Dict[str, Any]]:
    """
    Get all workflows that are not explicitly inactive and whose graph contains
    a ``trigger.schedule`` node with a usable cron expression.

    Used by the scheduler to register cron jobs. Does not filter by
    mandate/instance.

    Returns:
        One dict per schedulable workflow with the keys ``workflowId``,
        ``mandateId``, ``featureInstanceId``, ``entryPointId``, ``cron`` and
        ``workflow`` (the full record with normalized invocations). An empty
        list if the workflow table does not exist yet.
    """
    connector = _createConnector(userId=None)
    if not connector._ensureTableExists(Automation2Workflow):
        logger.warning("GraphicalEditor schedule: table Automation2Workflow does not exist yet")
        return []

    records = connector.getRecordset(Automation2Workflow, recordFilter=None) or []
    result: List[Dict[str, Any]] = []
    for record in records:
        # Only an explicit False deactivates; a missing/None flag counts as active.
        if record.get("active") is False:
            continue
        wf = dict(record)
        wf["invocations"] = normalize_invocations_list(wf.get("invocations"))

        cron = _findScheduleCron(wf.get("graph"))
        if not cron:
            continue

        invocations = wf.get("invocations") or []
        primary = invocations[0] if invocations else {}
        if not isinstance(primary, dict):
            primary = {}
        # Use the primary invocation's id when it has one; otherwise generate
        # an id so the scheduler never receives an empty entry point.
        entry_point_id = primary.get("id") or str(uuid.uuid4())

        result.append({
            "workflowId": wf.get("id"),
            "mandateId": wf.get("mandateId"),
            "featureInstanceId": wf.get("featureInstanceId"),
            "entryPointId": entry_point_id,
            "cron": cron,
            "workflow": wf,
        })

    logger.info(
        "GraphicalEditor schedule: DB has %d workflow(s), %d active with trigger.schedule+cron",
        len(records),
        len(result),
    )
    return result


class GraphicalEditorObjects:
    """
    Interface for GraphicalEditor database operations (Greenfield DB).

    Workflow and template lookups are scoped to the ``mandateId`` and
    ``featureInstanceId`` given at construction. Run, task and version lookups
    by id are NOT scoped that way (they filter on the id only).
    """

    def __init__(
        self,
        currentUser: User,
        mandateId: str,
        featureInstanceId: str,
    ):
        self.currentUser = currentUser
        self.mandateId = mandateId
        self.featureInstanceId = featureInstanceId
        self.userId = currentUser.id if currentUser else None
        self._init_db()
        # Not every connector exposes updateContext, hence the hasattr guard.
        if hasattr(self.db, "updateContext") and self.userId:
            self.db.updateContext(self.userId)

    def _init_db(self):
        """Initialize database connection to poweron_graphicaleditor (Greenfield)."""
        self.db = _createConnector(userId=self.userId)
        logger.debug("GraphicalEditor database initialized for user %s", self.userId)

    # -------------------------------------------------------------------------
    # Workflow CRUD
    # -------------------------------------------------------------------------

    def getWorkflows(self, active: Optional[bool] = None) -> List[Dict[str, Any]]:
        """
        Get all workflows for this mandate and feature instance.

        Args:
            active: When not None, only workflows whose ``active`` flag equals
                this value are returned.

        Returns:
            Workflow dicts with normalized ``invocations``; empty list when the
            table does not exist or nothing matches.
        """
        if not self.db._ensureTableExists(Automation2Workflow):
            return []
        rf: Dict[str, Any] = {
            "mandateId": self.mandateId,
            "featureInstanceId": self.featureInstanceId,
        }
        if active is not None:
            rf["active"] = active
        records = self.db.getRecordset(Automation2Workflow, recordFilter=rf)
        rows = [dict(r) for r in records] if records else []
        for wf in rows:
            wf["invocations"] = normalize_invocations_list(wf.get("invocations"))
        return rows

    def getWorkflow(self, workflowId: str) -> Optional[Dict[str, Any]]:
        """
        Get a single workflow by ID, restricted to this mandate and feature
        instance. Returns None when the table or the record does not exist.
        """
        if not self.db._ensureTableExists(Automation2Workflow):
            return None
        records = self.db.getRecordset(
            Automation2Workflow,
            recordFilter={
                "id": workflowId,
                "mandateId": self.mandateId,
                "featureInstanceId": self.featureInstanceId,
            },
        )
        if not records:
            return None
        wf = dict(records[0])
        wf["invocations"] = normalize_invocations_list(wf.get("invocations"))
        return wf

    def createWorkflow(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Create a new workflow.

        Note: ``data`` is modified in place - a missing/empty ``id`` is
        generated, ``mandateId`` / ``featureInstanceId`` are forced to this
        interface's values, ``active`` defaults to True and ``invocations`` is
        normalized.

        Returns:
            The created record as a dict with normalized ``invocations``.
        """
        if not data.get("id"):
            data["id"] = str(uuid.uuid4())
        data["mandateId"] = self.mandateId
        data["featureInstanceId"] = self.featureInstanceId
        if data.get("active") is None:
            data["active"] = True
        data["invocations"] = normalize_invocations_list(data.get("invocations"))

        created = self.db.recordCreate(Automation2Workflow, data)
        out = dict(created)
        out["invocations"] = normalize_invocations_list(out.get("invocations"))
        _notifyWorkflowChanged()
        return out

    def updateWorkflow(self, workflowId: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Update an existing workflow of this mandate / feature instance.

        Note: ``data`` is modified in place - ``mandateId`` and
        ``featureInstanceId`` are removed (ownership cannot be changed through
        an update) and ``invocations``, if present, is normalized.

        Returns:
            The updated record as a dict, or None if the workflow was not found.
        """
        existing = self.getWorkflow(workflowId)
        if not existing:
            return None
        data.pop("mandateId", None)
        data.pop("featureInstanceId", None)
        if "invocations" in data:
            data["invocations"] = normalize_invocations_list(data.get("invocations"))

        updated = self.db.recordModify(Automation2Workflow, workflowId, data)
        out = dict(updated)
        out["invocations"] = normalize_invocations_list(out.get("invocations"))
        _notifyWorkflowChanged()
        return out

    def deleteWorkflow(self, workflowId: str) -> bool:
        """
        Delete a workflow of this mandate / feature instance.

        Returns:
            True if the workflow existed and was deleted, False if not found.
        """
        existing = self.getWorkflow(workflowId)
        if not existing:
            return False
        self.db.recordDelete(Automation2Workflow, workflowId)
        _notifyWorkflowChanged()
        return True

    # -------------------------------------------------------------------------
    # Workflow Runs
    # -------------------------------------------------------------------------

    def createRun(
        self,
        workflowId: str,
        nodeOutputs: Optional[Dict[str, Any]] = None,
        context: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Create a new workflow run in status ``running``.

        ``nodeOutputs`` is passed through ``_make_json_serializable`` before
        storage; ``context`` is stored as given (empty dict when omitted).
        """
        data = {
            "id": str(uuid.uuid4()),
            "workflowId": workflowId,
            "status": "running",
            "nodeOutputs": _make_json_serializable(nodeOutputs or {}),
            "currentNodeId": None,
            "context": context or {},
        }
        created = self.db.recordCreate(Automation2WorkflowRun, data)
        return dict(created)

    def getRun(self, runId: str) -> Optional[Dict[str, Any]]:
        """Get a run by ID (filtered on the id only). None if not found."""
        if not self.db._ensureTableExists(Automation2WorkflowRun):
            return None
        records = self.db.getRecordset(
            Automation2WorkflowRun,
            recordFilter={"id": runId},
        )
        if not records:
            return None
        return dict(records[0])

    def updateRun(
        self,
        runId: str,
        status: Optional[str] = None,
        nodeOutputs: Optional[Dict[str, Any]] = None,
        currentNodeId: Optional[str] = None,
        context: Optional[Dict[str, Any]] = None,
    ) -> Optional[Dict[str, Any]]:
        """
        Update a run.

        Only arguments that are not None are written, so a field cannot be
        reset to None through this method.

        Returns:
            The updated run, the unchanged run when nothing was passed, or
            None if the run does not exist.
        """
        run = self.getRun(runId)
        if not run:
            return None
        updates: Dict[str, Any] = {}
        if status is not None:
            updates["status"] = status
        if nodeOutputs is not None:
            updates["nodeOutputs"] = _make_json_serializable(nodeOutputs)
        if currentNodeId is not None:
            updates["currentNodeId"] = currentNodeId
        if context is not None:
            updates["context"] = context
        if not updates:
            return run
        updated = self.db.recordModify(Automation2WorkflowRun, runId, updates)
        return dict(updated)

    def getRunsByWorkflow(self, workflowId: str) -> List[Dict[str, Any]]:
        """Get all runs for a workflow."""
        if not self.db._ensureTableExists(Automation2WorkflowRun):
            return []
        records = self.db.getRecordset(
            Automation2WorkflowRun,
            recordFilter={"workflowId": workflowId},
        )
        return [dict(r) for r in records] if records else []

    def getRecentCompletedRuns(self, limit: int = 20) -> List[Dict[str, Any]]:
        """
        Get recently completed runs for workflows in this instance.

        Each run is annotated with ``workflowLabel`` (the workflow's label,
        falling back to the workflow id). Runs are ordered newest first by
        ``sysModifiedAt`` (or ``sysCreatedAt``) and cut to ``limit`` entries.
        """
        if not self.db._ensureTableExists(Automation2WorkflowRun):
            return []
        # One id-keyed dict serves both the membership test and the label lookup.
        wf_by_id = {w["id"]: w for w in self.getWorkflows() if w.get("id")}
        if not wf_by_id:
            return []
        records = self.db.getRecordset(
            Automation2WorkflowRun,
            recordFilter={"status": "completed"},
        )
        if not records:
            return []
        runs = [dict(r) for r in records if r.get("workflowId") in wf_by_id]
        for run in runs:
            wf = wf_by_id.get(run.get("workflowId"), {})
            run["workflowLabel"] = wf.get("label") or run.get("workflowId", "")
        runs.sort(
            key=lambda x: (x.get("sysModifiedAt") or x.get("sysCreatedAt") or 0),
            reverse=True,
        )
        return runs[:limit]

    def getRunsWaitingForEmail(self) -> List[Dict[str, Any]]:
        """
        Get all paused runs waiting for a new email (for background poller).

        A run qualifies when its status is ``paused`` and its context has
        ``waitReason == "email"``.
        """
        if not self.db._ensureTableExists(Automation2WorkflowRun):
            return []
        records = self.db.getRecordset(
            Automation2WorkflowRun,
            recordFilter={"status": "paused"},
        )
        if not records:
            return []
        result = []
        for r in records:
            rec = dict(r)
            ctx = rec.get("context") or {}
            if ctx.get("waitReason") == "email":
                result.append(rec)
        return result

    # -------------------------------------------------------------------------
    # Human Tasks
    # -------------------------------------------------------------------------

    def createTask(
        self,
        runId: str,
        workflowId: str,
        nodeId: str,
        nodeType: str,
        config: Dict[str, Any],
        assigneeId: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Create a human task in status ``pending`` with no result."""
        data = {
            "id": str(uuid.uuid4()),
            "runId": runId,
            "workflowId": workflowId,
            "nodeId": nodeId,
            "nodeType": nodeType,
            "config": config,
            "assigneeId": assigneeId,
            "status": "pending",
            "result": None,
        }
        created = self.db.recordCreate(Automation2HumanTask, data)
        return dict(created)

    def getTask(self, taskId: str) -> Optional[Dict[str, Any]]:
        """Get a task by ID (filtered on the id only). None if not found."""
        if not self.db._ensureTableExists(Automation2HumanTask):
            return None
        records = self.db.getRecordset(
            Automation2HumanTask,
            recordFilter={"id": taskId},
        )
        if not records:
            return None
        return dict(records[0])

    def updateTask(
        self,
        taskId: str,
        status: Optional[str] = None,
        result: Optional[Dict[str, Any]] = None,
    ) -> Optional[Dict[str, Any]]:
        """
        Update a task (e.g. complete with result).

        Only arguments that are not None are written.

        Returns:
            The updated task, the unchanged task when nothing was passed, or
            None if the task does not exist.
        """
        task = self.getTask(taskId)
        if not task:
            return None
        updates: Dict[str, Any] = {}
        if status is not None:
            updates["status"] = status
        if result is not None:
            updates["result"] = result
        if not updates:
            return task
        updated = self.db.recordModify(Automation2HumanTask, taskId, updates)
        return dict(updated)

    def getTasks(
        self,
        workflowId: Optional[str] = None,
        runId: Optional[str] = None,
        status: Optional[str] = None,
        assigneeId: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """
        Get tasks with optional filters.

        When ``assigneeId`` is given, tasks assigned to that user AND
        unassigned tasks (``assigneeId`` None) are both returned. The result
        is always restricted to tasks whose workflow belongs to this mandate /
        feature instance.
        """
        if not self.db._ensureTableExists(Automation2HumanTask):
            return []
        base_rf: Dict[str, Any] = {}
        if workflowId:
            base_rf["workflowId"] = workflowId
        if runId:
            base_rf["runId"] = runId
        if status:
            base_rf["status"] = status

        if assigneeId:
            rf_assigned = {**base_rf, "assigneeId": assigneeId}
            rf_unassigned = {**base_rf, "assigneeId": None}
            seen = set()
            items = []
            # Two queries (assigned + unassigned), merged and de-duplicated by id.
            for rf in (rf_assigned, rf_unassigned):
                for r in self.db.getRecordset(Automation2HumanTask, recordFilter=rf) or []:
                    rec = dict(r)
                    tid = rec.get("id")
                    if tid and tid not in seen:
                        seen.add(tid)
                        items.append(rec)
        else:
            records = self.db.getRecordset(
                Automation2HumanTask,
                recordFilter=base_rf if base_rf else None,
            )
            items = [dict(r) for r in records] if records else []

        workflow_ids = {w["id"] for w in self.getWorkflows() if w.get("id")}
        return [t for t in items if t.get("workflowId") in workflow_ids]

    # -------------------------------------------------------------------------
    # Versions (AutoVersion Lifecycle)
    # -------------------------------------------------------------------------

    def getVersions(self, workflowId: str) -> List[Dict[str, Any]]:
        """Get all versions for a workflow, ordered by versionNumber desc."""
        if not self.db._ensureTableExists(AutoVersion):
            return []
        records = self.db.getRecordset(AutoVersion, recordFilter={"workflowId": workflowId})
        versions = [dict(r) for r in records] if records else []
        # "or 0" also covers a stored None, which would otherwise break sorting.
        versions.sort(key=lambda v: v.get("versionNumber") or 0, reverse=True)
        return versions

    def getVersion(self, versionId: str) -> Optional[Dict[str, Any]]:
        """Get a single version by ID. None if the table or record is missing."""
        if not self.db._ensureTableExists(AutoVersion):
            return None
        record = self.db.getRecord(AutoVersion, versionId)
        return dict(record) if record else None

    def createDraftVersion(self, workflowId: str) -> Optional[Dict[str, Any]]:
        """
        Create a new draft version from the workflow's current graph.

        The version number is one higher than the highest existing version
        number of the workflow (starting at 1).

        Returns:
            The created version, or None if the workflow was not found in this
            mandate / feature instance.
        """
        wf = self.getWorkflow(workflowId)
        if not wf:
            return None
        existing = self.getVersions(workflowId)
        nextNumber = max((v.get("versionNumber") or 0 for v in existing), default=0) + 1
        data = {
            "id": str(uuid.uuid4()),
            "workflowId": workflowId,
            "versionNumber": nextNumber,
            "status": "draft",
            "graph": wf.get("graph", {}),
            "invocations": wf.get("invocations", []),
        }
        created = self.db.recordCreate(AutoVersion, data)
        return dict(created)

    def publishVersion(self, versionId: str, userId: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """
        Publish a draft version. Archives the previously published version.

        The version's graph and invocations are copied onto the workflow and
        ``currentVersionId`` is pointed at the version. Because this rewrites
        the workflow, the workflow-changed callback is fired afterwards, just
        like ``updateWorkflow`` does.

        Returns:
            The updated version, or None if it does not exist or is not a draft.
        """
        version = self.getVersion(versionId)
        if not version or version.get("status") != "draft":
            return None
        workflowId = version.get("workflowId")

        for v in self.getVersions(workflowId):
            if v.get("status") == "published" and v.get("id") != versionId:
                self.db.recordModify(AutoVersion, v["id"], {"status": "archived"})

        updated = self.db.recordModify(AutoVersion, versionId, {
            "status": "published",
            "publishedAt": time.time(),
            "publishedBy": userId,
        })
        if workflowId:
            self.db.recordModify(AutoWorkflow, workflowId, {
                "currentVersionId": versionId,
                "graph": version.get("graph", {}),
                "invocations": version.get("invocations", []),
            })
            _notifyWorkflowChanged()
        return dict(updated)

    def unpublishVersion(self, versionId: str) -> Optional[Dict[str, Any]]:
        """
        Revert a published version back to draft status.

        Clears the workflow's ``currentVersionId``; the workflow's graph and
        invocations are left as they are.

        Returns:
            The updated version, or None if it does not exist or is not
            currently published.
        """
        version = self.getVersion(versionId)
        if not version or version.get("status") != "published":
            return None
        workflowId = version.get("workflowId")
        updated = self.db.recordModify(AutoVersion, versionId, {
            "status": "draft",
            "publishedAt": None,
            "publishedBy": None,
        })
        if workflowId:
            self.db.recordModify(AutoWorkflow, workflowId, {"currentVersionId": None})
        return dict(updated)

    def archiveVersion(self, versionId: str) -> Optional[Dict[str, Any]]:
        """Archive a version. Returns None if the version does not exist."""
        version = self.getVersion(versionId)
        if not version:
            return None
        updated = self.db.recordModify(AutoVersion, versionId, {"status": "archived"})
        return dict(updated)

    # -------------------------------------------------------------------------
    # Templates
    # -------------------------------------------------------------------------

    def getTemplates(self, scope: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Get workflow templates, optionally filtered by scope.

        When ``scope`` is None or ``"system"``, system-scope templates
        (mandateId=None) are included alongside the templates owned by this
        mandate / feature instance.
        """
        if not self.db._ensureTableExists(AutoWorkflow):
            return []
        rf: Dict[str, Any] = {
            "mandateId": self.mandateId,
            "featureInstanceId": self.featureInstanceId,
            "isTemplate": True,
        }
        if scope:
            rf["templateScope"] = scope
        # Copy into a fresh list: the recordset is extended below and must not
        # depend on the connector returning a mutable list.
        records = list(self.db.getRecordset(AutoWorkflow, recordFilter=rf) or [])

        if scope is None or scope == "system":
            systemFilter: Dict[str, Any] = {
                "isTemplate": True,
                "templateScope": "system",
                "mandateId": None,
            }
            systemRecords = self.db.getRecordset(AutoWorkflow, recordFilter=systemFilter) or []
            seenIds = {
                (r.get("id") if isinstance(r, dict) else getattr(r, "id", None))
                for r in records
            }
            for sr in systemRecords:
                srId = sr.get("id") if isinstance(sr, dict) else getattr(sr, "id", None)
                if srId not in seenIds:
                    records.append(sr)
        return [dict(r) for r in records]

    def createTemplateFromWorkflow(self, workflowId: str, scope: str = "user") -> Optional[Dict[str, Any]]:
        """
        Create a template by copying the published AutoVersion's graph
        (or workflow graph as fallback).

        The template is created inactive, labelled ``"<label> (Template)"``
        and remembers its origin in ``templateSourceId``.

        Returns:
            The created template, or None if the workflow was not found.
        """
        wf = self.getWorkflow(workflowId)
        if not wf:
            return None
        graph = wf.get("graph", {})
        invocations = wf.get("invocations", [])
        currentVersionId = wf.get("currentVersionId")
        if currentVersionId:
            version = self.getVersion(currentVersionId)
            if version:
                graph = version.get("graph", graph)
                invocations = version.get("invocations", invocations)
        data = {
            "id": str(uuid.uuid4()),
            "mandateId": self.mandateId,
            "featureInstanceId": self.featureInstanceId,
            # "or" also covers a stored None label.
            "label": f"{wf.get('label') or 'Workflow'} (Template)",
            "graph": graph,
            "invocations": invocations,
            "isTemplate": True,
            "templateScope": scope,
            "templateSourceId": workflowId,
            "active": False,
        }
        created = self.db.recordCreate(AutoWorkflow, data)
        return dict(created)

    def _getCopyableTemplate(self, templateId: str) -> Optional[Dict[str, Any]]:
        """
        Resolve a template that may be copied by this mandate: either one of
        its own templates or a system-scope template (mandateId=None).
        """
        own = self.getWorkflow(templateId)
        if own:
            return own if own.get("isTemplate") else None
        if not self.db._ensureTableExists(AutoWorkflow):
            return None
        # Same filter getTemplates uses to list system templates.
        records = self.db.getRecordset(AutoWorkflow, recordFilter={
            "id": templateId,
            "isTemplate": True,
            "templateScope": "system",
            "mandateId": None,
        })
        if not records:
            return None
        template = dict(records[0])
        template["invocations"] = normalize_invocations_list(template.get("invocations"))
        return template

    def copyTemplateToUser(self, templateId: str) -> Optional[Dict[str, Any]]:
        """
        Copy a template to a new user-owned workflow with templateScope='user'.

        Works for templates of this mandate / feature instance and for
        system-scope templates as listed by ``getTemplates``. The copy is
        created active, so the workflow-changed callback is fired afterwards,
        as ``createWorkflow`` does.

        Returns:
            The created workflow, or None if no such template is available.
        """
        template = self._getCopyableTemplate(templateId)
        if not template:
            return None
        data = {
            "id": str(uuid.uuid4()),
            "mandateId": self.mandateId,
            "featureInstanceId": self.featureInstanceId,
            "label": (template.get("label") or "Workflow").replace(" (Template)", ""),
            "graph": template.get("graph", {}),
            "invocations": template.get("invocations", []),
            "isTemplate": False,
            "templateSourceId": templateId,
            "templateScope": "user",
            "active": True,
        }
        created = self.db.recordCreate(AutoWorkflow, data)
        _notifyWorkflowChanged()
        return dict(created)

    def shareTemplate(self, templateId: str, scope: str) -> Optional[Dict[str, Any]]:
        """
        Share a template by changing its scope and setting sharedReadOnly.

        Only templates of this mandate / feature instance can be shared.

        Returns:
            The updated template, or None if it was not found or is not a
            template.
        """
        template = self.getWorkflow(templateId)
        if not template or not template.get("isTemplate"):
            return None
        updated = self.db.recordModify(AutoWorkflow, templateId, {
            "templateScope": scope,
            "sharedReadOnly": True,
        })
        return dict(updated)


# Backward-compatible alias
Automation2Objects = GraphicalEditorObjects