feat: readded trigger nodes

This commit is contained in:
Ida 2026-05-14 10:56:59 +02:00
parent 41a6b9759c
commit 1c973d5dfe
6 changed files with 120 additions and 22 deletions

View file

@ -83,7 +83,60 @@ def normalize_invocations_list(items: Optional[List[Any]]) -> List[Dict[str, Any
return out
# Schedule / cron: wire an external job runner (APScheduler, Celery, system cron) to call
_NODE_TYPE_TO_KIND = {
"trigger.manual": "manual",
"trigger.form": "form",
"trigger.schedule": "schedule",
}
def invocations_synced_with_graph(
graph: Optional[Dict[str, Any]],
stored_invocations: Optional[List[Any]],
) -> List[Dict[str, Any]]:
"""Derive primary invocation (index 0) from the first start node in ``graph``.
If the graph has no start node, only non-primary stored invocations are kept
(no injected default). Document order in ``nodes`` defines which start wins.
"""
from modules.workflows.automation2.graphUtils import getTriggerNodes
g = graph if isinstance(graph, dict) else {}
nodes = g.get("nodes") or []
stored = list(stored_invocations or [])
rest: List[Dict[str, Any]] = []
for raw in stored[1:]:
if isinstance(raw, dict):
rest.append(normalize_invocation_entry(raw))
triggers = getTriggerNodes(nodes)
if not triggers:
return rest
node = triggers[0]
nt = str(node.get("type", "")).strip()
kind = _NODE_TYPE_TO_KIND.get(nt, "manual")
nid = node.get("id")
if not nid:
nid = str(uuid.uuid4())
raw_title = node.get("title") or node.get("label") or "Start"
old_primary = stored[0] if stored and isinstance(stored[0], dict) else {}
config: Dict[str, Any] = {}
if isinstance(old_primary.get("config"), dict) and old_primary.get("kind") == kind:
config = dict(old_primary["config"])
desc = old_primary.get("description") if isinstance(old_primary.get("description"), dict) else {}
primary_raw: Dict[str, Any] = {
"id": str(nid),
"kind": kind,
"enabled": True,
"title": raw_title,
"description": desc,
"config": config,
}
primary = normalize_invocation_entry(primary_raw)
return [primary] + rest
# POST .../execute with entryPointId set to a schedule entry — no separate in-process scheduler here yet.

View file

@ -49,7 +49,7 @@ from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import (
AutoRun as Automation2WorkflowRun,
AutoTask as Automation2HumanTask,
)
from modules.features.graphicalEditor.entryPoints import normalize_invocations_list
from modules.features.graphicalEditor.entryPoints import invocations_synced_with_graph
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
from modules.shared.dbRegistry import registerDatabase
@ -109,7 +109,7 @@ def getAllWorkflowsForScheduling() -> List[Dict[str, Any]]:
if r.get("active") is False:
continue
wf = dict(r)
wf["invocations"] = normalize_invocations_list(wf.get("invocations"))
wf["invocations"] = invocations_synced_with_graph(wf.get("graph") or {}, wf.get("invocations"))
invocations = wf.get("invocations") or []
primary = invocations[0] if invocations else {}
if not isinstance(primary, dict):
@ -204,7 +204,7 @@ class GraphicalEditorObjects:
)
rows = [dict(r) for r in records] if records else []
for wf in rows:
wf["invocations"] = normalize_invocations_list(wf.get("invocations"))
wf["invocations"] = invocations_synced_with_graph(wf.get("graph") or {}, wf.get("invocations"))
return rows
def getWorkflow(self, workflowId: str) -> Optional[Dict[str, Any]]:
@ -221,7 +221,7 @@ class GraphicalEditorObjects:
if not records:
return None
wf = dict(records[0])
wf["invocations"] = normalize_invocations_list(wf.get("invocations"))
wf["invocations"] = invocations_synced_with_graph(wf.get("graph") or {}, wf.get("invocations"))
return wf
def createWorkflow(self, data: Dict[str, Any]) -> Dict[str, Any]:
@ -234,10 +234,10 @@ class GraphicalEditorObjects:
data["targetFeatureInstanceId"] = self.featureInstanceId
if "active" not in data or data.get("active") is None:
data["active"] = True
data["invocations"] = normalize_invocations_list(data.get("invocations"))
data["invocations"] = invocations_synced_with_graph(data.get("graph") or {}, data.get("invocations"))
created = self.db.recordCreate(Automation2Workflow, data)
out = dict(created)
out["invocations"] = normalize_invocations_list(out.get("invocations"))
out["invocations"] = invocations_synced_with_graph(out.get("graph") or {}, out.get("invocations"))
try:
from modules.shared.callbackRegistry import callbackRegistry
callbackRegistry.trigger(_CALLBACK_WORKFLOW_CHANGED)
@ -252,11 +252,15 @@ class GraphicalEditorObjects:
return None
data.pop("mandateId", None)
data.pop("featureInstanceId", None)
if "invocations" in data:
data["invocations"] = normalize_invocations_list(data.get("invocations"))
if "graph" in data or "invocations" in data:
g = data["graph"] if "graph" in data else existing.get("graph")
if not isinstance(g, dict):
g = {}
inv = data["invocations"] if "invocations" in data else existing.get("invocations")
data["invocations"] = invocations_synced_with_graph(g, inv)
updated = self.db.recordModify(Automation2Workflow, workflowId, data)
out = dict(updated)
out["invocations"] = normalize_invocations_list(out.get("invocations"))
out["invocations"] = invocations_synced_with_graph(out.get("graph") or {}, out.get("invocations"))
try:
from modules.shared.callbackRegistry import callbackRegistry
callbackRegistry.trigger(_CALLBACK_WORKFLOW_CHANGED)

View file

@ -1,5 +1,5 @@
# Copyright (c) 2025 Patrick Motsch
# Canvas start nodes — variant reflects workflow configuration (gear in editor).
# Start nodes (palette category ``start``); kinds align with workflow entry points / run envelope.
from modules.shared.i18nRegistry import t
@ -8,9 +8,9 @@ from modules.features.graphicalEditor.nodeDefinitions.ai import ACTION_RESULT_DA
TRIGGER_NODES = [
{
"id": "trigger.manual",
"category": "trigger",
"category": "start",
"label": t("Start"),
"description": t("Manuell, API oder Hintergrund-Starts (Webhook, E-Mail, …)."),
"description": t("Manuell Trigger. Workflow startet nur, wenn auf Start-Button geklickt wird."),
"parameters": [],
"inputs": 0,
"outputs": 1,
@ -21,9 +21,9 @@ TRIGGER_NODES = [
},
{
"id": "trigger.form",
"category": "trigger",
"category": "start",
"label": t("Start (Formular)"),
"description": t("Felder werden beim Start befüllt; konfigurieren Sie die Felder auf dieser Node."),
"description": t("Formular Trigger. Workflow startet nur, wenn das Formular ausgefüllt und abgeschickt wird."),
"parameters": [
{
"name": "formFields",
@ -42,9 +42,9 @@ TRIGGER_NODES = [
},
{
"id": "trigger.schedule",
"category": "trigger",
"category": "start",
"label": t("Start (Zeitplan)"),
"description": t("Cron-Ausdruck für geplante Läufe."),
"description": t("Workflow startet nach dem ausgewählten Zeitplan."),
"parameters": [
{
"name": "cron",

View file

@ -1,7 +1,7 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Node Type Registry for graphicalEditor - static node definitions (ai, email, sharepoint, trigger, flow, data, input).
Node Type Registry for graphicalEditor - static node definitions (start, input, flow, data, ai, email, ).
Nodes are defined first; IO/method actions are used at execution time.
"""
@ -123,7 +123,7 @@ def getNodeTypesForApi(
nodes = getNodeTypes(services, language)
localized = [_localizeNode(n, language) for n in nodes]
categories = [
{"id": "trigger", "label": "Trigger"},
{"id": "start", "label": "Start"},
{"id": "input", "label": "Eingabe/Mensch"},
{"id": "flow", "label": "Ablauf"},
{"id": "data", "label": "Daten"},

View file

@ -146,8 +146,15 @@ def getInputSources(nodeId: str, connectionMap: Dict[str, List[Tuple[str, int, i
def getTriggerNodes(nodes: List[Dict]) -> List[Dict]:
"""Return nodes with category=trigger or type starting with trigger."""
return [n for n in nodes if (n.get("type", "").startswith("trigger.") or n.get("category") == "trigger")]
"""Return start/trigger nodes: type ``trigger.*``, or category ``trigger`` / ``start``."""
return [
n
for n in nodes
if (
str(n.get("type", "")).startswith("trigger.")
or n.get("category") in ("trigger", "start")
)
]
def validateGraph(graph: Dict[str, Any], nodeTypeIds: Set[str]) -> List[str]:
@ -186,6 +193,11 @@ def validateGraph(graph: Dict[str, Any], nodeTypeIds: Set[str]) -> List[str]:
logger.warning("validateGraph port mismatches: %s", port_errors)
errors.extend(port_errors)
if nodes and not getTriggerNodes(nodes):
errors.append(
"Workflow has no start node: add a node from the Start category before running."
)
if errors:
logger.debug("validateGraph errors: %s", errors)
else:

View file

@ -5,7 +5,36 @@ Unit tests for automation2 graphUtils - resolveParameterReferences (ref/value fo
import pytest
from modules.workflows.automation2.graphUtils import resolveParameterReferences
from modules.workflows.automation2.graphUtils import resolveParameterReferences, validateGraph
_KNOWN_TYPES = frozenset({"trigger.manual", "trigger.form", "ai.prompt", "flow.pass"})
class TestValidateGraphStartNode:
"""Non-empty graphs must include at least one start (trigger.*) node."""
def test_empty_graph_ok_without_start(self):
assert validateGraph({"nodes": [], "connections": []}, _KNOWN_TYPES) == []
def test_non_empty_graph_without_start_fails(self):
graph = {
"nodes": [{"id": "a", "type": "ai.prompt", "parameters": {}}],
"connections": [],
}
errs = validateGraph(graph, _KNOWN_TYPES)
assert any("no start node" in e.lower() for e in errs)
def test_non_empty_graph_with_start_ok(self):
graph = {
"nodes": [
{"id": "t", "type": "trigger.manual", "parameters": {}},
{"id": "a", "type": "ai.prompt", "parameters": {}},
],
"connections": [],
}
errs = validateGraph(graph, _KNOWN_TYPES)
assert not any("no start node" in e.lower() for e in errs)
class TestResolveParameterReferences: