feat: readded trigger nodes
This commit is contained in:
parent
b238721563
commit
64591fda3f
6 changed files with 120 additions and 22 deletions
|
|
@ -83,7 +83,60 @@ def normalize_invocations_list(items: Optional[List[Any]]) -> List[Dict[str, Any
|
|||
return out
|
||||
|
||||
|
||||
# Schedule / cron: wire an external job runner (APScheduler, Celery, system cron) to call
|
||||
_NODE_TYPE_TO_KIND = {
|
||||
"trigger.manual": "manual",
|
||||
"trigger.form": "form",
|
||||
"trigger.schedule": "schedule",
|
||||
}
|
||||
|
||||
|
||||
def invocations_synced_with_graph(
|
||||
graph: Optional[Dict[str, Any]],
|
||||
stored_invocations: Optional[List[Any]],
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Derive primary invocation (index 0) from the first start node in ``graph``.
|
||||
|
||||
If the graph has no start node, only non-primary stored invocations are kept
|
||||
(no injected default). Document order in ``nodes`` defines which start wins.
|
||||
"""
|
||||
from modules.workflows.automation2.graphUtils import getTriggerNodes
|
||||
|
||||
g = graph if isinstance(graph, dict) else {}
|
||||
nodes = g.get("nodes") or []
|
||||
stored = list(stored_invocations or [])
|
||||
rest: List[Dict[str, Any]] = []
|
||||
for raw in stored[1:]:
|
||||
if isinstance(raw, dict):
|
||||
rest.append(normalize_invocation_entry(raw))
|
||||
|
||||
triggers = getTriggerNodes(nodes)
|
||||
if not triggers:
|
||||
return rest
|
||||
|
||||
node = triggers[0]
|
||||
nt = str(node.get("type", "")).strip()
|
||||
kind = _NODE_TYPE_TO_KIND.get(nt, "manual")
|
||||
nid = node.get("id")
|
||||
if not nid:
|
||||
nid = str(uuid.uuid4())
|
||||
raw_title = node.get("title") or node.get("label") or "Start"
|
||||
|
||||
old_primary = stored[0] if stored and isinstance(stored[0], dict) else {}
|
||||
config: Dict[str, Any] = {}
|
||||
if isinstance(old_primary.get("config"), dict) and old_primary.get("kind") == kind:
|
||||
config = dict(old_primary["config"])
|
||||
desc = old_primary.get("description") if isinstance(old_primary.get("description"), dict) else {}
|
||||
|
||||
primary_raw: Dict[str, Any] = {
|
||||
"id": str(nid),
|
||||
"kind": kind,
|
||||
"enabled": True,
|
||||
"title": raw_title,
|
||||
"description": desc,
|
||||
"config": config,
|
||||
}
|
||||
primary = normalize_invocation_entry(primary_raw)
|
||||
return [primary] + rest
|
||||
# POST .../execute with entryPointId set to a schedule entry — no separate in-process scheduler here yet.
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import (
|
|||
AutoRun as Automation2WorkflowRun,
|
||||
AutoTask as Automation2HumanTask,
|
||||
)
|
||||
from modules.features.graphicalEditor.entryPoints import normalize_invocations_list
|
||||
from modules.features.graphicalEditor.entryPoints import invocations_synced_with_graph
|
||||
from modules.connectors.connectorDbPostgre import DatabaseConnector
|
||||
from modules.shared.configuration import APP_CONFIG
|
||||
from modules.shared.dbRegistry import registerDatabase
|
||||
|
|
@ -109,7 +109,7 @@ def getAllWorkflowsForScheduling() -> List[Dict[str, Any]]:
|
|||
if r.get("active") is False:
|
||||
continue
|
||||
wf = dict(r)
|
||||
wf["invocations"] = normalize_invocations_list(wf.get("invocations"))
|
||||
wf["invocations"] = invocations_synced_with_graph(wf.get("graph") or {}, wf.get("invocations"))
|
||||
invocations = wf.get("invocations") or []
|
||||
primary = invocations[0] if invocations else {}
|
||||
if not isinstance(primary, dict):
|
||||
|
|
@ -204,7 +204,7 @@ class GraphicalEditorObjects:
|
|||
)
|
||||
rows = [dict(r) for r in records] if records else []
|
||||
for wf in rows:
|
||||
wf["invocations"] = normalize_invocations_list(wf.get("invocations"))
|
||||
wf["invocations"] = invocations_synced_with_graph(wf.get("graph") or {}, wf.get("invocations"))
|
||||
return rows
|
||||
|
||||
def getWorkflow(self, workflowId: str) -> Optional[Dict[str, Any]]:
|
||||
|
|
@ -221,7 +221,7 @@ class GraphicalEditorObjects:
|
|||
if not records:
|
||||
return None
|
||||
wf = dict(records[0])
|
||||
wf["invocations"] = normalize_invocations_list(wf.get("invocations"))
|
||||
wf["invocations"] = invocations_synced_with_graph(wf.get("graph") or {}, wf.get("invocations"))
|
||||
return wf
|
||||
|
||||
def createWorkflow(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
|
|
@ -234,10 +234,10 @@ class GraphicalEditorObjects:
|
|||
data["targetFeatureInstanceId"] = self.featureInstanceId
|
||||
if "active" not in data or data.get("active") is None:
|
||||
data["active"] = True
|
||||
data["invocations"] = normalize_invocations_list(data.get("invocations"))
|
||||
data["invocations"] = invocations_synced_with_graph(data.get("graph") or {}, data.get("invocations"))
|
||||
created = self.db.recordCreate(Automation2Workflow, data)
|
||||
out = dict(created)
|
||||
out["invocations"] = normalize_invocations_list(out.get("invocations"))
|
||||
out["invocations"] = invocations_synced_with_graph(out.get("graph") or {}, out.get("invocations"))
|
||||
try:
|
||||
from modules.shared.callbackRegistry import callbackRegistry
|
||||
callbackRegistry.trigger(_CALLBACK_WORKFLOW_CHANGED)
|
||||
|
|
@ -252,11 +252,15 @@ class GraphicalEditorObjects:
|
|||
return None
|
||||
data.pop("mandateId", None)
|
||||
data.pop("featureInstanceId", None)
|
||||
if "invocations" in data:
|
||||
data["invocations"] = normalize_invocations_list(data.get("invocations"))
|
||||
if "graph" in data or "invocations" in data:
|
||||
g = data["graph"] if "graph" in data else existing.get("graph")
|
||||
if not isinstance(g, dict):
|
||||
g = {}
|
||||
inv = data["invocations"] if "invocations" in data else existing.get("invocations")
|
||||
data["invocations"] = invocations_synced_with_graph(g, inv)
|
||||
updated = self.db.recordModify(Automation2Workflow, workflowId, data)
|
||||
out = dict(updated)
|
||||
out["invocations"] = normalize_invocations_list(out.get("invocations"))
|
||||
out["invocations"] = invocations_synced_with_graph(out.get("graph") or {}, out.get("invocations"))
|
||||
try:
|
||||
from modules.shared.callbackRegistry import callbackRegistry
|
||||
callbackRegistry.trigger(_CALLBACK_WORKFLOW_CHANGED)
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
# Copyright (c) 2025 Patrick Motsch
|
||||
# Canvas start nodes — variant reflects workflow configuration (gear in editor).
|
||||
# Start nodes (palette category ``start``); kinds align with workflow entry points / run envelope.
|
||||
|
||||
from modules.shared.i18nRegistry import t
|
||||
|
||||
|
|
@ -8,9 +8,9 @@ from modules.features.graphicalEditor.nodeDefinitions.ai import ACTION_RESULT_DA
|
|||
TRIGGER_NODES = [
|
||||
{
|
||||
"id": "trigger.manual",
|
||||
"category": "trigger",
|
||||
"category": "start",
|
||||
"label": t("Start"),
|
||||
"description": t("Manuell, API oder Hintergrund-Starts (Webhook, E-Mail, …)."),
|
||||
"description": t("Manuell Trigger. Workflow startet nur, wenn auf Start-Button geklickt wird."),
|
||||
"parameters": [],
|
||||
"inputs": 0,
|
||||
"outputs": 1,
|
||||
|
|
@ -21,9 +21,9 @@ TRIGGER_NODES = [
|
|||
},
|
||||
{
|
||||
"id": "trigger.form",
|
||||
"category": "trigger",
|
||||
"category": "start",
|
||||
"label": t("Start (Formular)"),
|
||||
"description": t("Felder werden beim Start befüllt; konfigurieren Sie die Felder auf dieser Node."),
|
||||
"description": t("Formular Trigger. Workflow startet nur, wenn das Formular ausgefüllt und abgeschickt wird."),
|
||||
"parameters": [
|
||||
{
|
||||
"name": "formFields",
|
||||
|
|
@ -42,9 +42,9 @@ TRIGGER_NODES = [
|
|||
},
|
||||
{
|
||||
"id": "trigger.schedule",
|
||||
"category": "trigger",
|
||||
"category": "start",
|
||||
"label": t("Start (Zeitplan)"),
|
||||
"description": t("Cron-Ausdruck für geplante Läufe."),
|
||||
"description": t("Workflow startet nach dem ausgewählten Zeitplan."),
|
||||
"parameters": [
|
||||
{
|
||||
"name": "cron",
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
# Copyright (c) 2025 Patrick Motsch
|
||||
# All rights reserved.
|
||||
"""
|
||||
Node Type Registry for graphicalEditor - static node definitions (ai, email, sharepoint, trigger, flow, data, input).
|
||||
Node Type Registry for graphicalEditor - static node definitions (start, input, flow, data, ai, email, …).
|
||||
Nodes are defined first; IO/method actions are used at execution time.
|
||||
"""
|
||||
|
||||
|
|
@ -123,7 +123,7 @@ def getNodeTypesForApi(
|
|||
nodes = getNodeTypes(services, language)
|
||||
localized = [_localizeNode(n, language) for n in nodes]
|
||||
categories = [
|
||||
{"id": "trigger", "label": "Trigger"},
|
||||
{"id": "start", "label": "Start"},
|
||||
{"id": "input", "label": "Eingabe/Mensch"},
|
||||
{"id": "flow", "label": "Ablauf"},
|
||||
{"id": "data", "label": "Daten"},
|
||||
|
|
|
|||
|
|
@ -146,8 +146,15 @@ def getInputSources(nodeId: str, connectionMap: Dict[str, List[Tuple[str, int, i
|
|||
|
||||
|
||||
def getTriggerNodes(nodes: List[Dict]) -> List[Dict]:
|
||||
"""Return nodes with category=trigger or type starting with trigger."""
|
||||
return [n for n in nodes if (n.get("type", "").startswith("trigger.") or n.get("category") == "trigger")]
|
||||
"""Return start/trigger nodes: type ``trigger.*``, or category ``trigger`` / ``start``."""
|
||||
return [
|
||||
n
|
||||
for n in nodes
|
||||
if (
|
||||
str(n.get("type", "")).startswith("trigger.")
|
||||
or n.get("category") in ("trigger", "start")
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def validateGraph(graph: Dict[str, Any], nodeTypeIds: Set[str]) -> List[str]:
|
||||
|
|
@ -186,6 +193,11 @@ def validateGraph(graph: Dict[str, Any], nodeTypeIds: Set[str]) -> List[str]:
|
|||
logger.warning("validateGraph port mismatches: %s", port_errors)
|
||||
errors.extend(port_errors)
|
||||
|
||||
if nodes and not getTriggerNodes(nodes):
|
||||
errors.append(
|
||||
"Workflow has no start node: add a node from the Start category before running."
|
||||
)
|
||||
|
||||
if errors:
|
||||
logger.debug("validateGraph errors: %s", errors)
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -5,7 +5,36 @@ Unit tests for automation2 graphUtils - resolveParameterReferences (ref/value fo
|
|||
|
||||
import pytest
|
||||
|
||||
from modules.workflows.automation2.graphUtils import resolveParameterReferences
|
||||
from modules.workflows.automation2.graphUtils import resolveParameterReferences, validateGraph
|
||||
|
||||
|
||||
_KNOWN_TYPES = frozenset({"trigger.manual", "trigger.form", "ai.prompt", "flow.pass"})
|
||||
|
||||
|
||||
class TestValidateGraphStartNode:
|
||||
"""Non-empty graphs must include at least one start (trigger.*) node."""
|
||||
|
||||
def test_empty_graph_ok_without_start(self):
|
||||
assert validateGraph({"nodes": [], "connections": []}, _KNOWN_TYPES) == []
|
||||
|
||||
def test_non_empty_graph_without_start_fails(self):
|
||||
graph = {
|
||||
"nodes": [{"id": "a", "type": "ai.prompt", "parameters": {}}],
|
||||
"connections": [],
|
||||
}
|
||||
errs = validateGraph(graph, _KNOWN_TYPES)
|
||||
assert any("no start node" in e.lower() for e in errs)
|
||||
|
||||
def test_non_empty_graph_with_start_ok(self):
|
||||
graph = {
|
||||
"nodes": [
|
||||
{"id": "t", "type": "trigger.manual", "parameters": {}},
|
||||
{"id": "a", "type": "ai.prompt", "parameters": {}},
|
||||
],
|
||||
"connections": [],
|
||||
}
|
||||
errs = validateGraph(graph, _KNOWN_TYPES)
|
||||
assert not any("no start node" in e.lower() for e in errs)
|
||||
|
||||
|
||||
class TestResolveParameterReferences:
|
||||
|
|
|
|||
Loading…
Reference in a new issue