platform-core/modules/workflowAutomation/engine/runEnvelope.py
ValueOn AG 4a60086c80
Some checks failed
Deploy Plattform-Core (Int) / test (push) Failing after 15s
Deploy Plattform-Core (Int) / deploy (push) Has been skipped
cp adapted to 2026 poweron
2026-06-09 09:53:31 +02:00

110 lines
3.6 KiB
Python

# Copyright (c) 2026 PowerOn AG
# All rights reserved.
"""
Unified run envelope for Automation2 start/trigger nodes.
Downstream nodes always see the same structure regardless of entry point
(manual, form, schedule, webhook, email, api, event).
"""
from copy import deepcopy
from typing import Any, Dict, List, Optional
# Closed set of values accepted for ``envelope["trigger"]["type"]``.
# Any other value is replaced by "manual" by the functions in this module.
TRIGGER_TYPES = frozenset(
    (
        "manual",
        "form",
        "schedule",
        "email",
        "webhook",
        "api",
        "event",
    )
)
def default_run_envelope(
    trigger_type: str = "manual",
    *,
    entry_point_id: Optional[str] = None,
    entry_point_label: Optional[str] = None,
    payload: Optional[Dict[str, Any]] = None,
    context: Optional[Dict[str, Any]] = None,
    files: Optional[List[Any]] = None,
    user: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
    raw: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Assemble a complete run envelope with every top-level key present.

    Args:
        trigger_type: Entry-point kind; values outside ``TRIGGER_TYPES`` are
            replaced by ``"manual"``.
        entry_point_id: Stored as ``trigger["entryPointId"]`` when truthy.
        entry_point_label: Stored as ``trigger["label"]`` when truthy.
        payload, context, user, metadata, raw: Optional mappings; each is
            shallow-copied into the envelope, or replaced by ``{}`` when falsy.
        files: Optional list; shallow-copied, or replaced by ``[]`` when falsy.

    Returns:
        A new dict with the keys ``trigger``, ``payload``, ``context``,
        ``files``, ``user``, ``metadata`` and ``raw`` (in that order).
    """
    trigger: Dict[str, Any] = {
        "type": trigger_type if trigger_type in TRIGGER_TYPES else "manual"
    }
    # Optional trigger attributes are only emitted when they carry a value.
    for field_name, field_value in (
        ("entryPointId", entry_point_id),
        ("label", entry_point_label),
    ):
        if field_value:
            trigger[field_name] = field_value

    envelope: Dict[str, Any] = {"trigger": trigger}
    # Each section is copied with its container type so the caller's objects
    # are never stored directly in the envelope (copies are shallow).
    for section_name, section_value, container in (
        ("payload", payload, dict),
        ("context", context, dict),
        ("files", files, list),
        ("user", user, dict),
        ("metadata", metadata, dict),
        ("raw", raw, dict),
    ):
        envelope[section_name] = container(section_value) if section_value else container()
    return envelope
def merge_run_envelope(base: Dict[str, Any], overrides: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Return a deep copy of ``base`` with ``overrides`` layered on top.

    Merge rules per top-level key:

    * ``payload``, ``context``, ``user``, ``metadata``, ``raw``: when the
      override value is a dict, its keys are applied onto the base dict one
      level deep (``dict.update``); non-dict override values are ignored.
    * ``files``: an override that is not ``None`` replaces the base list.
    * ``trigger``: override keys are applied onto the base trigger; afterwards
      ``trigger["type"]`` is forced to ``"manual"`` unless it is a string
      contained in ``TRIGGER_TYPES``.

    Args:
        base: Envelope to start from; it is deep-copied and never mutated.
        overrides: Partial envelope; when falsy, the deep copy of ``base`` is
            returned unchanged.

    Returns:
        A new envelope dict. Values taken from ``overrides`` are not
        deep-copied, so nested objects remain shared with ``overrides``.
    """
    out = deepcopy(base)
    if not overrides:
        return out

    for key in ("payload", "context", "user", "metadata", "raw"):
        section = overrides.get(key)
        if isinstance(section, dict):
            merged = dict(out.get(key) or {})
            merged.update(section)
            out[key] = merged

    files = overrides.get("files")
    if files is not None:
        out["files"] = list(files)

    trig = dict(out.get("trigger") or {})
    ot = overrides.get("trigger")
    if isinstance(ot, dict):
        trig.update(ot)
    # Validate unconditionally: a missing, falsy (None / "") or non-string
    # type must not slip through, and the isinstance check keeps unhashable
    # values (e.g. a list) from raising on the frozenset membership test.
    trigger_type = trig.get("type")
    if not isinstance(trigger_type, str) or trigger_type not in TRIGGER_TYPES:
        trig["type"] = "manual"
    out["trigger"] = trig
    return out
def normalize_run_envelope(
    incoming: Optional[Dict[str, Any]],
    *,
    user_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Normalize a partial, missing or malformed envelope from API/scheduler.

    The result always contains every top-level envelope key. Malformed parts
    of ``incoming`` are replaced by empty defaults instead of raising:

    * a non-dict ``trigger`` is treated as empty;
    * a ``trigger["type"]`` that is not a string in ``TRIGGER_TYPES`` becomes
      ``"manual"``;
    * ``payload``, ``context``, ``user``, ``metadata`` and ``raw`` are kept
      only when they are dicts;
    * ``files`` is kept only when it is a list or tuple.

    Args:
        incoming: Envelope as received; anything falsy or not a dict yields a
            default ``"manual"`` envelope.
        user_id: When truthy, stored as ``user["id"]`` unless the incoming
            user dict already has an ``"id"`` key.

    Returns:
        A new, fully populated envelope dict.
    """
    if not incoming or not isinstance(incoming, dict):
        env = default_run_envelope("manual")
    else:
        trig = incoming.get("trigger")
        if not isinstance(trig, dict):
            trig = {}

        # isinstance guards the membership test against unhashable values
        # (e.g. a JSON array), which would otherwise raise TypeError.
        ttype = trig.get("type")
        if not isinstance(ttype, str) or ttype not in TRIGGER_TYPES:
            ttype = "manual"

        def _dict_section(key: str) -> Optional[Dict[str, Any]]:
            # Untrusted input: only real dicts may reach dict(...) downstream,
            # which raises on lists, strings or numbers.
            value = incoming.get(key)
            return value if isinstance(value, dict) else None

        # A string would be exploded into characters by list(...), and a
        # number would raise; accept genuine sequences only.
        raw_files = incoming.get("files")
        files = raw_files if isinstance(raw_files, (list, tuple)) else None

        env = default_run_envelope(
            ttype,
            entry_point_id=trig.get("entryPointId"),
            entry_point_label=trig.get("label"),
            payload=_dict_section("payload"),
            context=_dict_section("context"),
            files=files,
            user=_dict_section("user"),
            metadata=_dict_section("metadata"),
            raw=_dict_section("raw"),
        )

    # env["user"] is a fresh dict owned by this envelope, so filling in the
    # id in place is safe; an id supplied by the caller's envelope wins.
    if user_id:
        env["user"].setdefault("id", user_id)
    return env