# Copyright (c) 2025 Patrick Motsch
"""
Unified run envelope for Automation2 start/trigger nodes.

Downstream nodes always see the same structure regardless of entry point
(manual, form, schedule, webhook, email, api, event).
"""

from copy import deepcopy
from typing import Any, Dict, List, Optional

# trigger.type values
TRIGGER_TYPES = frozenset(
    {
        "manual",
        "form",
        "schedule",
        "email",
        "webhook",
        "api",
        "event",
    }
)

# Fallback used whenever a trigger type is missing or not recognised.
_DEFAULT_TRIGGER_TYPE = "manual"

# Top-level envelope sections that hold dicts and are merged key by key.
_DICT_SECTIONS = ("payload", "context", "user", "metadata", "raw")


def _coerce_trigger_type(value: Any) -> Any:
    """Return *value* if it is a known trigger type, else ``"manual"``.

    The ``isinstance`` check runs first so that unhashable input (a list or
    dict arriving from an external request) is rejected instead of raising
    ``TypeError`` during the set membership test.
    """
    if isinstance(value, str) and value in TRIGGER_TYPES:
        return value
    return _DEFAULT_TRIGGER_TYPE


def _dict_or_none(value: Any) -> Optional[Dict[str, Any]]:
    """Return *value* when it is a dict, otherwise ``None``."""
    return value if isinstance(value, dict) else None


def _list_or_none(value: Any) -> Optional[List[Any]]:
    """Return *value* when it is a list or tuple, otherwise ``None``.

    Strings are deliberately rejected: ``list("abc")`` would otherwise turn
    a malformed ``files`` value into a list of single characters.
    """
    return value if isinstance(value, (list, tuple)) else None


def default_run_envelope(
    trigger_type: str = "manual",
    *,
    entry_point_id: Optional[str] = None,
    entry_point_label: Optional[str] = None,
    payload: Optional[Dict[str, Any]] = None,
    context: Optional[Dict[str, Any]] = None,
    files: Optional[List[Any]] = None,
    user: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
    raw: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build a normalized run envelope dict.

    Args:
        trigger_type: One of ``TRIGGER_TYPES``; any other value (including
            unhashable ones) falls back to ``"manual"``.
        entry_point_id: Stored as ``trigger["entryPointId"]`` when truthy.
        entry_point_label: Stored as ``trigger["label"]`` when truthy.
        payload, context, user, metadata, raw: Optional dicts; each is
            shallow-copied, and ``None``/empty becomes ``{}``.
        files: Optional list; shallow-copied, ``None``/empty becomes ``[]``.

    Returns:
        A dict with the keys ``trigger``, ``payload``, ``context``,
        ``files``, ``user``, ``metadata`` and ``raw``.
    """
    trigger: Dict[str, Any] = {"type": _coerce_trigger_type(trigger_type)}
    if entry_point_id:
        trigger["entryPointId"] = entry_point_id
    if entry_point_label:
        trigger["label"] = entry_point_label

    return {
        "trigger": trigger,
        "payload": dict(payload or {}),
        "context": dict(context or {}),
        "files": list(files or []),
        "user": dict(user or {}),
        "metadata": dict(metadata or {}),
        "raw": dict(raw or {}),
    }


def merge_run_envelope(
    base: Dict[str, Any],
    overrides: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
    """Merge *overrides* into a deep copy of *base*.

    The merge is one level deep: for each dict section (``payload``,
    ``context``, ``user``, ``metadata``, ``raw``) and for ``trigger``, the
    override's keys replace the matching keys of the base section. ``files``
    is replaced wholesale when the override provides a non-``None`` value.
    Override sections that are not dicts are ignored.

    *base* is never mutated. Values taken from *overrides* are not copied,
    so nested objects are shared between *overrides* and the result.

    Args:
        base: Envelope to start from.
        overrides: Partial envelope; ``None`` or empty returns the copy of
            *base* unchanged.

    Returns:
        The merged envelope. If the merged trigger carries a ``type`` key
        whose value is not in ``TRIGGER_TYPES`` (including ``None`` or an
        empty string), it is reset to ``"manual"``.
    """
    out = deepcopy(base)
    if not overrides:
        return out

    for key in _DICT_SECTIONS:
        section = overrides.get(key)
        if isinstance(section, dict):
            merged = dict(out.get(key) or {})
            merged.update(section)
            out[key] = merged

    files = overrides.get("files")
    if files is not None:
        out["files"] = list(files)

    trigger = dict(out.get("trigger") or {})
    trigger_override = overrides.get("trigger")
    if isinstance(trigger_override, dict):
        trigger.update(trigger_override)
    # Validate whenever the key is present, so an override such as
    # {"type": None} cannot leave an invalid type in the envelope.
    if "type" in trigger:
        trigger["type"] = _coerce_trigger_type(trigger["type"])
    out["trigger"] = trigger
    return out


def normalize_run_envelope(
    incoming: Optional[Dict[str, Any]],
    *,
    user_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Normalize a partial or missing envelope from API/scheduler.

    Ensures all top-level keys exist. Malformed input is tolerated rather
    than raised on: a non-dict *incoming*, a non-dict ``trigger`` or dict
    section, a ``files`` value that is not a list/tuple, and an unknown
    trigger type are each replaced by their empty/default value.

    Args:
        incoming: Raw envelope, possibly ``None`` or incomplete.
        user_id: When truthy and the envelope's ``user`` has no ``"id"``
            key, it is stored as ``user["id"]``.

    Returns:
        A freshly built envelope as produced by ``default_run_envelope``.
    """
    source: Dict[str, Any] = incoming if isinstance(incoming, dict) else {}
    trigger = _dict_or_none(source.get("trigger")) or {}

    env = default_run_envelope(
        _coerce_trigger_type(trigger.get("type")),
        entry_point_id=trigger.get("entryPointId"),
        entry_point_label=trigger.get("label"),
        payload=_dict_or_none(source.get("payload")),
        context=_dict_or_none(source.get("context")),
        files=_list_or_none(source.get("files")),
        user=_dict_or_none(source.get("user")),
        metadata=_dict_or_none(source.get("metadata")),
        raw=_dict_or_none(source.get("raw")),
    )

    # env["user"] is a fresh dict owned by this envelope, so it is safe to
    # fill in the id in place; an existing "id" is never overwritten.
    if user_id and "id" not in env["user"]:
        env["user"]["id"] = user_id
    return env