# Source: gateway/modules/features/graphicalEditor/interfaceFeatureGraphicalEditor.py
# Snapshot: 2026-04-16 23:13:05 +02:00 (658 lines, 25 KiB, Python)
#
# NOTE: The repository viewer flagged this file as containing ambiguous Unicode
# characters, i.e. characters that might be confused with other characters.
# If that is intentional, the warning can safely be ignored.

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Interface for GraphicalEditor feature - Workflows, Runs, Human Tasks.
Uses PostgreSQL poweron_graphicaleditor database (Greenfield).
"""
import base64
import logging
import uuid
from typing import Dict, Any, List, Optional
def _make_json_serializable(obj: Any) -> Any:
"""
Recursively convert bytes to base64 strings so structures can be JSON-serialized
for storage in JSONB columns.
"""
if isinstance(obj, bytes):
return base64.b64encode(obj).decode("ascii")
if isinstance(obj, dict):
return {k: _make_json_serializable(v) for k, v in obj.items()}
if isinstance(obj, list):
return [_make_json_serializable(v) for v in obj]
return obj
from modules.datamodels.datamodelUam import User
from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import (
AutoWorkflow,
AutoVersion,
AutoRun,
AutoStepLog,
AutoTask,
AutoWorkflow as Automation2Workflow,
AutoRun as Automation2WorkflowRun,
AutoTask as Automation2HumanTask,
)
from modules.features.graphicalEditor.entryPoints import normalize_invocations_list
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
from modules.shared.dbRegistry import registerDatabase
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Name of the PostgreSQL database used by this feature.
graphicalEditorDatabase = "poweron_graphicaleditor"
# Executed at import time: hands the database name to the shared registry.
registerDatabase(graphicalEditorDatabase)
# Callback name triggered after a workflow is created, updated or deleted.
_CALLBACK_WORKFLOW_CHANGED = "graphicalEditor.workflow.changed"
def getGraphicalEditorInterface(
    currentUser: User,
    mandateId: str,
    featureInstanceId: str,
) -> "GraphicalEditorObjects":
    """Build a GraphicalEditorObjects bound to the given user, mandate and feature instance."""
    # Collect the context first, then hand it over by keyword.
    context = {
        "currentUser": currentUser,
        "mandateId": mandateId,
        "featureInstanceId": featureInstanceId,
    }
    interface = GraphicalEditorObjects(**context)
    return interface
# Backward-compatible alias used by the workflows/automation2/ execution engine.
# Same callable as getGraphicalEditorInterface; no separate behavior.
getAutomation2Interface = getGraphicalEditorInterface
def _findScheduleCron(graph: Any) -> Optional[str]:
    """Return the stripped cron of the first ``trigger.schedule`` node with a non-blank string cron, else None."""
    if not isinstance(graph, dict):
        return None
    nodes = graph.get("nodes")
    if not isinstance(nodes, (list, tuple)):
        return None
    for node in nodes:
        # Malformed entries are skipped instead of raising AttributeError.
        if not isinstance(node, dict) or node.get("type") != "trigger.schedule":
            continue
        params = node.get("parameters")
        cron = params.get("cron") if isinstance(params, dict) else None
        if isinstance(cron, str) and cron.strip():
            return cron.strip()
    return None


def getAllWorkflowsForScheduling() -> List[Dict[str, Any]]:
    """
    Get all workflows that can be scheduled, across all mandates and instances.

    A workflow qualifies when its ``active`` flag is not explicitly ``False``
    and its graph contains a ``trigger.schedule`` node with a non-blank string
    ``cron`` parameter. Used by the scheduler to register cron jobs.

    Returns:
        One dict per schedulable workflow with the keys ``workflowId``,
        ``mandateId``, ``featureInstanceId``, ``entryPointId``, ``cron``
        (stripped) and ``workflow`` (the record as a dict with normalized
        ``invocations``). Empty list when the workflow table does not exist.
    """
    connector = DatabaseConnector(
        dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
        dbDatabase=graphicalEditorDatabase,
        dbUser=APP_CONFIG.get("DB_USER"),
        dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"),
        dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),
        userId=None,  # scheduler context: no acting user
    )
    if not connector._ensureTableExists(Automation2Workflow):
        logger.warning("GraphicalEditor schedule: table Automation2Workflow does not exist yet")
        return []

    records = connector.getRecordset(Automation2Workflow, recordFilter=None) or []
    raw_count = len(records)
    result: List[Dict[str, Any]] = []
    for record in records:
        # Only an explicit False deactivates; a missing/None flag counts as active.
        if record.get("active") is False:
            continue
        wf = dict(record)
        wf["invocations"] = normalize_invocations_list(wf.get("invocations"))

        cron = _findScheduleCron(wf.get("graph"))
        if cron is None:
            continue

        invocations = wf.get("invocations") or []
        primary = invocations[0] if invocations else {}
        if not isinstance(primary, dict):
            primary = {}

        if primary.get("kind") == "schedule" and primary.get("enabled", True):
            # An enabled schedule entry point is used as-is, even without an id.
            entry_point_id = primary.get("id")
        else:
            # Otherwise reuse the primary invocation's id, or generate one.
            entry_point_id = primary.get("id") or str(uuid.uuid4())

        result.append({
            "workflowId": wf.get("id"),
            "mandateId": wf.get("mandateId"),
            "featureInstanceId": wf.get("featureInstanceId"),
            "entryPointId": entry_point_id,
            "cron": cron,
            "workflow": wf,
        })

    logger.info(
        "GraphicalEditor schedule: DB has %d workflow(s), %d active with trigger.schedule+cron",
        raw_count,
        len(result),
    )
    return result
class GraphicalEditorObjects:
    """Interface for GraphicalEditor database operations (Greenfield DB).

    Each instance is bound to a user, a mandate and a feature instance and
    owns its own ``DatabaseConnector`` to the ``poweron_graphicaleditor``
    database. Records are returned as plain ``dict`` copies.
    """

    def __init__(
        self,
        currentUser: User,
        mandateId: str,
        featureInstanceId: str,
    ):
        """Store the caller context and open the database connection.

        Args:
            currentUser: Acting user; may be falsy, in which case ``userId`` is None.
            mandateId: Mandate used to scope workflow queries.
            featureInstanceId: Feature instance stamped on created workflows.
        """
        self.currentUser = currentUser
        self.mandateId = mandateId
        self.featureInstanceId = featureInstanceId
        self.userId = currentUser.id if currentUser else None
        self._init_db()
        # Not every connector exposes updateContext, hence the hasattr guard.
        if hasattr(self.db, "updateContext") and self.userId:
            self.db.updateContext(self.userId)

    def _init_db(self):
        """Initialize database connection to poweron_graphicaleditor (Greenfield)."""
        self.db = DatabaseConnector(
            dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
            dbDatabase=graphicalEditorDatabase,
            dbUser=APP_CONFIG.get("DB_USER"),
            dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"),
            dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),
            userId=self.userId,
        )
        logger.debug("GraphicalEditor database initialized for user %s", self.userId)

    @staticmethod
    def _normalizeWorkflow(record: Any) -> Dict[str, Any]:
        """Copy a workflow record into a dict with a normalized ``invocations`` list."""
        wf = dict(record)
        wf["invocations"] = normalize_invocations_list(wf.get("invocations"))
        return wf

    @staticmethod
    def _notifyWorkflowChanged() -> None:
        """Best-effort trigger of the workflow-changed callback; failures are logged, never raised."""
        try:
            # Imported lazily, as before, so a missing registry cannot break CRUD.
            from modules.shared.callbackRegistry import callbackRegistry
            callbackRegistry.trigger(_CALLBACK_WORKFLOW_CHANGED)
        except Exception:
            logger.warning(
                "GraphicalEditor: callback %s could not be triggered",
                _CALLBACK_WORKFLOW_CHANGED,
                exc_info=True,
            )

    # -------------------------------------------------------------------------
    # Workflow CRUD
    # -------------------------------------------------------------------------
    def getWorkflows(self, active: Optional[bool] = None) -> List[Dict[str, Any]]:
        """Get all workflows for this mandate (cross-instance).

        Args:
            active: When not None, only workflows whose ``active`` flag equals
                this value are returned.

        Returns:
            Workflow dicts with normalized ``invocations``; empty list when the
            table does not exist or nothing matches.
        """
        if not self.db._ensureTableExists(Automation2Workflow):
            return []
        rf: Dict[str, Any] = {"mandateId": self.mandateId}
        if active is not None:
            rf["active"] = active
        records = self.db.getRecordset(Automation2Workflow, recordFilter=rf)
        return [self._normalizeWorkflow(r) for r in records] if records else []

    def getWorkflow(self, workflowId: str) -> Optional[Dict[str, Any]]:
        """Get a single workflow by ID (mandate-scoped, cross-instance).

        Returns:
            The workflow dict with normalized ``invocations``, or None when the
            table or the record does not exist for this mandate.
        """
        if not self.db._ensureTableExists(Automation2Workflow):
            return None
        records = self.db.getRecordset(
            Automation2Workflow,
            recordFilter={"id": workflowId, "mandateId": self.mandateId},
        )
        if not records:
            return None
        return self._normalizeWorkflow(records[0])

    def createWorkflow(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new workflow.

        ``data`` is completed in place: a missing/empty ``id`` is generated,
        ``mandateId`` / ``featureInstanceId`` are overwritten with this
        interface's context, ``active`` defaults to True and ``invocations``
        is normalized. The workflow-changed callback is triggered afterwards.

        Returns:
            The created record as a dict with normalized ``invocations``.
        """
        if not data.get("id"):
            data["id"] = str(uuid.uuid4())
        data["mandateId"] = self.mandateId
        data["featureInstanceId"] = self.featureInstanceId
        if data.get("active") is None:
            data["active"] = True
        data["invocations"] = normalize_invocations_list(data.get("invocations"))
        created = self.db.recordCreate(Automation2Workflow, data)
        out = self._normalizeWorkflow(created)
        self._notifyWorkflowChanged()
        return out

    def updateWorkflow(self, workflowId: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Update an existing workflow.

        ``mandateId`` and ``featureInstanceId`` are removed from ``data`` (in
        place) so a workflow cannot be moved; ``invocations`` is normalized
        when present. The workflow-changed callback is triggered afterwards.

        Returns:
            The updated workflow dict, or None when the workflow does not exist
            for this mandate.
        """
        if not self.getWorkflow(workflowId):
            return None
        data.pop("mandateId", None)
        data.pop("featureInstanceId", None)
        if "invocations" in data:
            data["invocations"] = normalize_invocations_list(data.get("invocations"))
        updated = self.db.recordModify(Automation2Workflow, workflowId, data)
        out = self._normalizeWorkflow(updated)
        self._notifyWorkflowChanged()
        return out

    def deleteWorkflow(self, workflowId: str) -> bool:
        """Delete a workflow.

        Returns:
            True when the workflow existed for this mandate and was deleted,
            False otherwise. The workflow-changed callback is triggered on success.
        """
        if not self.getWorkflow(workflowId):
            return False
        self.db.recordDelete(Automation2Workflow, workflowId)
        self._notifyWorkflowChanged()
        return True

    # -------------------------------------------------------------------------
    # Workflow Runs
    # -------------------------------------------------------------------------
    def createRun(
        self,
        workflowId: str,
        nodeOutputs: Optional[Dict] = None,
        context: Optional[Dict] = None,
        label: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Create a new workflow run with status ``running``.

        Args:
            workflowId: Workflow the run belongs to.
            nodeOutputs: Initial node outputs; bytes values are base64-encoded
                before storage.
            context: Run context. Its ``mandateId`` / ``userId`` entries, when
                truthy, take precedence over this interface's mandate and user.
            label: Human-readable name persisted on the run. Callers should
                pass the workflow label or a descriptive name; ``executeGraph``
                fills in a fallback when nothing is provided.

        Returns:
            The created run record as a dict.
        """
        ctx = context or {}
        data = {
            "id": str(uuid.uuid4()),
            "workflowId": workflowId,
            "label": label,
            "status": "running",
            "nodeOutputs": _make_json_serializable(nodeOutputs or {}),
            "currentNodeId": None,
            "context": ctx,
            "mandateId": ctx.get("mandateId") or self.mandateId,
            "ownerId": ctx.get("userId") or (self.currentUser.id if self.currentUser else None),
        }
        created = self.db.recordCreate(Automation2WorkflowRun, data)
        return dict(created)

    def getRun(self, runId: str) -> Optional[Dict[str, Any]]:
        """Get a run by ID. The lookup filters by ID only, not by mandate."""
        if not self.db._ensureTableExists(Automation2WorkflowRun):
            return None
        records = self.db.getRecordset(
            Automation2WorkflowRun,
            recordFilter={"id": runId},
        )
        return dict(records[0]) if records else None

    def updateRun(
        self,
        runId: str,
        status: Optional[str] = None,
        nodeOutputs: Optional[Dict] = None,
        currentNodeId: Optional[str] = None,
        context: Optional[Dict] = None,
    ) -> Optional[Dict[str, Any]]:
        """Update a run.

        Only arguments that are not None are written, so a field cannot be
        reset to None through this method.

        Returns:
            The updated run, the unchanged run when nothing was passed, or None
            when the run does not exist.
        """
        run = self.getRun(runId)
        if not run:
            return None
        updates: Dict[str, Any] = {}
        if status is not None:
            updates["status"] = status
        if nodeOutputs is not None:
            updates["nodeOutputs"] = _make_json_serializable(nodeOutputs)
        if currentNodeId is not None:
            updates["currentNodeId"] = currentNodeId
        if context is not None:
            updates["context"] = context
        if not updates:
            return run
        updated = self.db.recordModify(Automation2WorkflowRun, runId, updates)
        return dict(updated)

    def getRunsByWorkflow(self, workflowId: str) -> List[Dict[str, Any]]:
        """Get all runs for a workflow."""
        if not self.db._ensureTableExists(Automation2WorkflowRun):
            return []
        records = self.db.getRecordset(
            Automation2WorkflowRun,
            recordFilter={"workflowId": workflowId},
        )
        return [dict(r) for r in records] if records else []

    def getRecentCompletedRuns(self, limit: int = 20) -> List[Dict[str, Any]]:
        """Get the most recent runs (all statuses) for workflows of this mandate.

        Despite the name, runs are not filtered by status. Each run gets a
        ``workflowLabel`` entry (the workflow label, falling back to the
        workflow ID) and the list is ordered newest first by ``sysModifiedAt``,
        then ``sysCreatedAt``.

        Args:
            limit: Maximum number of runs to return.
        """
        if not self.db._ensureTableExists(Automation2WorkflowRun):
            return []
        # Dict keyed by ID: O(1) membership test and label lookup per run.
        wf_by_id = {w["id"]: w for w in self.getWorkflows() if w.get("id")}
        if not wf_by_id:
            return []
        records = self.db.getRecordset(
            Automation2WorkflowRun,
            recordFilter={},
        )
        if not records:
            return []
        runs = [dict(r) for r in records if r.get("workflowId") in wf_by_id]
        for run in runs:
            wf = wf_by_id.get(run.get("workflowId"), {})
            run["workflowLabel"] = wf.get("label") or run.get("workflowId", "")
        runs.sort(
            key=lambda x: (x.get("sysModifiedAt") or x.get("sysCreatedAt") or 0),
            reverse=True,
        )
        return runs[:limit]

    def getRunsWaitingForEmail(self) -> List[Dict[str, Any]]:
        """Get all paused runs waiting for a new email (for background poller).

        A run qualifies when its status is ``paused`` and its context has
        ``waitReason == "email"``. Not restricted to this mandate.
        """
        if not self.db._ensureTableExists(Automation2WorkflowRun):
            return []
        records = self.db.getRecordset(
            Automation2WorkflowRun,
            recordFilter={"status": "paused"},
        )
        if not records:
            return []
        paused = [dict(r) for r in records]
        return [run for run in paused if (run.get("context") or {}).get("waitReason") == "email"]

    # -------------------------------------------------------------------------
    # Human Tasks
    # -------------------------------------------------------------------------
    def createTask(
        self,
        runId: str,
        workflowId: str,
        nodeId: str,
        nodeType: str,
        config: Dict[str, Any],
        assigneeId: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Create a human task with status ``pending`` and no result.

        Returns:
            The created task record as a dict.
        """
        data = {
            "id": str(uuid.uuid4()),
            "runId": runId,
            "workflowId": workflowId,
            "nodeId": nodeId,
            "nodeType": nodeType,
            "config": config,
            "assigneeId": assigneeId,
            "status": "pending",
            "result": None,
        }
        created = self.db.recordCreate(Automation2HumanTask, data)
        return dict(created)

    def getTask(self, taskId: str) -> Optional[Dict[str, Any]]:
        """Get a task by ID. The lookup filters by ID only, not by mandate."""
        if not self.db._ensureTableExists(Automation2HumanTask):
            return None
        records = self.db.getRecordset(
            Automation2HumanTask,
            recordFilter={"id": taskId},
        )
        return dict(records[0]) if records else None

    def updateTask(
        self,
        taskId: str,
        status: Optional[str] = None,
        result: Optional[Dict] = None,
    ) -> Optional[Dict[str, Any]]:
        """Update a task (e.g. complete with result).

        Only arguments that are not None are written.

        Returns:
            The updated task, the unchanged task when nothing was passed, or
            None when the task does not exist.
        """
        task = self.getTask(taskId)
        if not task:
            return None
        updates: Dict[str, Any] = {}
        if status is not None:
            updates["status"] = status
        if result is not None:
            updates["result"] = result
        if not updates:
            return task
        updated = self.db.recordModify(Automation2HumanTask, taskId, updates)
        return dict(updated)

    def getTasks(
        self,
        workflowId: Optional[str] = None,
        runId: Optional[str] = None,
        status: Optional[str] = None,
        assigneeId: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """Get tasks with optional filters, restricted to workflows of this mandate.

        When ``assigneeId`` is given, the result contains both the tasks
        assigned to that user and the unassigned tasks (``assigneeId`` None),
        de-duplicated by task ID.
        """
        if not self.db._ensureTableExists(Automation2HumanTask):
            return []
        base_rf: Dict[str, Any] = {}
        if workflowId:
            base_rf["workflowId"] = workflowId
        if runId:
            base_rf["runId"] = runId
        if status:
            base_rf["status"] = status

        if assigneeId:
            assigned = self.db.getRecordset(
                Automation2HumanTask, recordFilter={**base_rf, "assigneeId": assigneeId}
            )
            unassigned = self.db.getRecordset(
                Automation2HumanTask, recordFilter={**base_rf, "assigneeId": None}
            )
            seen = set()
            items: List[Dict[str, Any]] = []
            for r in list(assigned or []) + list(unassigned or []):
                rec = dict(r)
                tid = rec.get("id")
                # Records without an ID are dropped, as they cannot be de-duplicated.
                if tid and tid not in seen:
                    seen.add(tid)
                    items.append(rec)
        else:
            records = self.db.getRecordset(
                Automation2HumanTask,
                recordFilter=base_rf if base_rf else None,
            )
            items = [dict(r) for r in records] if records else []

        # Tasks carry no mandate filter of their own; scope them via their workflow.
        mandate_workflow_ids = {w["id"] for w in self.getWorkflows() if w.get("id")}
        return [t for t in items if t.get("workflowId") in mandate_workflow_ids]

    # -------------------------------------------------------------------------
    # Versions (AutoVersion Lifecycle)
    # -------------------------------------------------------------------------
    def getVersions(self, workflowId: str) -> List[Dict[str, Any]]:
        """Get all versions for a workflow, ordered by versionNumber desc."""
        if not self.db._ensureTableExists(AutoVersion):
            return []
        records = self.db.getRecordset(AutoVersion, recordFilter={"workflowId": workflowId})
        versions = [dict(r) for r in records] if records else []
        # "or 0" also covers a stored None, which would otherwise break the sort.
        versions.sort(key=lambda v: v.get("versionNumber") or 0, reverse=True)
        return versions

    def getVersion(self, versionId: str) -> Optional[Dict[str, Any]]:
        """Get a single version by ID."""
        if not self.db._ensureTableExists(AutoVersion):
            return None
        record = self.db.getRecord(AutoVersion, versionId)
        return dict(record) if record else None

    def createDraftVersion(self, workflowId: str) -> Optional[Dict[str, Any]]:
        """Create a new draft version from the workflow's current graph.

        The new version number is the highest existing one plus 1 (starting at 1).

        Returns:
            The created version dict, or None when the workflow does not exist
            for this mandate.
        """
        wf = self.getWorkflow(workflowId)
        if not wf:
            return None
        existing = self.getVersions(workflowId)
        nextNumber = max((v.get("versionNumber") or 0 for v in existing), default=0) + 1
        data = {
            "id": str(uuid.uuid4()),
            "workflowId": workflowId,
            "versionNumber": nextNumber,
            "status": "draft",
            "graph": wf.get("graph", {}),
            "invocations": wf.get("invocations", []),
        }
        created = self.db.recordCreate(AutoVersion, data)
        return dict(created)

    def publishVersion(self, versionId: str, userId: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Publish a draft version. Archives the previously published version.

        Besides updating the version, the owning workflow's ``currentVersionId``,
        ``graph`` and ``invocations`` are overwritten with the version's content.

        Args:
            versionId: Version to publish; must currently be in status ``draft``.
            userId: Stored as ``publishedBy``.

        Returns:
            The updated version dict, or None when the version does not exist
            or is not a draft.
        """
        import time

        version = self.getVersion(versionId)
        if not version or version.get("status") != "draft":
            return None
        workflowId = version.get("workflowId")
        for other in self.getVersions(workflowId):
            if other.get("status") == "published" and other.get("id") != versionId:
                self.db.recordModify(AutoVersion, other["id"], {"status": "archived"})
        updated = self.db.recordModify(AutoVersion, versionId, {
            "status": "published",
            "publishedAt": time.time(),
            "publishedBy": userId,
        })
        if workflowId:
            # NOTE(review): this rewrites the workflow graph without triggering the
            # workflow-changed callback that updateWorkflow fires - confirm that
            # callers notify the scheduler themselves.
            self.db.recordModify(AutoWorkflow, workflowId, {
                "currentVersionId": versionId,
                "graph": version.get("graph", {}),
                "invocations": version.get("invocations", []),
            })
        return dict(updated)

    def unpublishVersion(self, versionId: str) -> Optional[Dict[str, Any]]:
        """Revert a published version back to draft status.

        Clears ``publishedAt`` / ``publishedBy`` on the version and resets the
        workflow's ``currentVersionId`` to None. The workflow's graph is left
        untouched.

        Returns:
            The updated version dict, or None when the version does not exist
            or is not published.
        """
        version = self.getVersion(versionId)
        if not version or version.get("status") != "published":
            return None
        workflowId = version.get("workflowId")
        updated = self.db.recordModify(AutoVersion, versionId, {
            "status": "draft",
            "publishedAt": None,
            "publishedBy": None,
        })
        if workflowId:
            self.db.recordModify(AutoWorkflow, workflowId, {"currentVersionId": None})
        return dict(updated)

    def archiveVersion(self, versionId: str) -> Optional[Dict[str, Any]]:
        """Archive a version, whatever its current status. Returns None when it does not exist."""
        if not self.getVersion(versionId):
            return None
        updated = self.db.recordModify(AutoVersion, versionId, {"status": "archived"})
        return dict(updated)

    # -------------------------------------------------------------------------
    # Templates
    # -------------------------------------------------------------------------
    def getTemplates(self, scope: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get workflow templates, optionally filtered by scope.

        Templates owned by this mandate and feature instance are always
        queried. System-scope templates (``mandateId`` None, ``templateScope``
        ``"system"``) are merged in when ``scope`` is None or ``"system"``;
        duplicates by ID are skipped.
        """
        if not self.db._ensureTableExists(AutoWorkflow):
            return []

        def _recordId(rec: Any) -> Any:
            # Records may be dicts or attribute-style objects.
            return rec.get("id") if isinstance(rec, dict) else getattr(rec, "id", None)

        rf: Dict[str, Any] = {
            "mandateId": self.mandateId,
            "featureInstanceId": self.featureInstanceId,
            "isTemplate": True,
        }
        if scope:
            rf["templateScope"] = scope
        # list(...) so the result can be appended to whatever the connector returns.
        records = list(self.db.getRecordset(AutoWorkflow, recordFilter=rf) or [])

        if scope is None or scope == "system":
            systemFilter: Dict[str, Any] = {
                "isTemplate": True,
                "templateScope": "system",
                "mandateId": None,
            }
            systemRecords = self.db.getRecordset(AutoWorkflow, recordFilter=systemFilter) or []
            seenIds = {_recordId(r) for r in records}
            records.extend(sr for sr in systemRecords if _recordId(sr) not in seenIds)
        return [dict(r) for r in records]

    def createTemplateFromWorkflow(self, workflowId: str, scope: str = "user") -> Optional[Dict[str, Any]]:
        """Create a template by copying the published AutoVersion's graph (or workflow graph as fallback).

        The template is stored as an inactive workflow labelled
        ``"<label> (Template)"`` with ``templateSourceId`` pointing at the
        source workflow.

        Returns:
            The created template dict, or None when the workflow does not exist
            for this mandate.
        """
        wf = self.getWorkflow(workflowId)
        if not wf:
            return None
        graph = wf.get("graph", {})
        invocations = wf.get("invocations", [])
        currentVersionId = wf.get("currentVersionId")
        version = self.getVersion(currentVersionId) if currentVersionId else None
        if version:
            graph = version.get("graph", graph)
            invocations = version.get("invocations", invocations)
        data = {
            "id": str(uuid.uuid4()),
            "mandateId": self.mandateId,
            "featureInstanceId": self.featureInstanceId,
            # "or" also covers a stored None label, which would render as "None".
            "label": f"{wf.get('label') or 'Workflow'} (Template)",
            "graph": graph,
            "invocations": invocations,
            "isTemplate": True,
            "templateScope": scope,
            "templateSourceId": workflowId,
            "active": False,
        }
        created = self.db.recordCreate(AutoWorkflow, data)
        return dict(created)

    def _getSystemTemplate(self, templateId: str) -> Optional[Dict[str, Any]]:
        """Look up a system-scope template (``mandateId`` None) by ID, or return None."""
        if not self.db._ensureTableExists(AutoWorkflow):
            return None
        records = self.db.getRecordset(
            AutoWorkflow,
            recordFilter={
                "id": templateId,
                "isTemplate": True,
                "templateScope": "system",
                "mandateId": None,
            },
        )
        if not records:
            return None
        return self._normalizeWorkflow(records[0])

    def copyTemplateToUser(self, templateId: str) -> Optional[Dict[str, Any]]:
        """Copy a template to a new user-owned workflow with templateScope='user'.

        Both templates of this mandate and system-scope templates (as listed by
        ``getTemplates``) can be copied. The copy is active, owned by this
        mandate / feature instance, and has `` (Template)`` removed from its label.

        Returns:
            The created workflow dict, or None when no template with that ID is
            visible to this mandate.
        """
        # getWorkflow is mandate-scoped and therefore never finds system
        # templates (mandateId None); fall back to an explicit system lookup.
        template = self.getWorkflow(templateId) or self._getSystemTemplate(templateId)
        if not template or not template.get("isTemplate"):
            return None
        label = template.get("label") or "Workflow"
        data = {
            "id": str(uuid.uuid4()),
            "mandateId": self.mandateId,
            "featureInstanceId": self.featureInstanceId,
            "label": label.replace(" (Template)", ""),
            "graph": template.get("graph", {}),
            "invocations": template.get("invocations", []),
            "isTemplate": False,
            "templateSourceId": templateId,
            "templateScope": "user",
            "active": True,
        }
        # NOTE(review): unlike createWorkflow, this does not trigger the
        # workflow-changed callback although the copy is active - confirm intended.
        created = self.db.recordCreate(AutoWorkflow, data)
        return dict(created)

    def shareTemplate(self, templateId: str, scope: str) -> Optional[Dict[str, Any]]:
        """Change a template's scope. Sets sharedReadOnly=True for shared scopes, False for user scope.

        Returns:
            The updated template dict, or None when the ID does not refer to a
            template of this mandate.
        """
        template = self.getWorkflow(templateId)
        if not template or not template.get("isTemplate"):
            return None
        updated = self.db.recordModify(AutoWorkflow, templateId, {
            "templateScope": scope,
            "sharedReadOnly": scope != "user",
        })
        return dict(updated)
# Backward-compatible alias: the former class name refers to the same class object.
Automation2Objects = GraphicalEditorObjects