gateway/modules/features/automation2/nodeRegistry.py
2026-03-22 18:20:31 +01:00

81 lines
3 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Node Type Registry for automation2 - static node definitions (ai, email, sharepoint, trigger, flow, data, input).
Nodes are defined first; IO/method actions are used at execution time.
"""
import logging
from typing import Dict, List, Any
from modules.features.automation2.nodeDefinitions import STATIC_NODE_TYPES
logger = logging.getLogger(__name__)
def getNodeTypes(
    services: Any = None,
    language: str = "en",
) -> List[Dict[str, Any]]:
    """
    Return the statically defined node types as a new list.

    Nothing is derived dynamically here (no methodDiscovery lookup); the
    result is a shallow copy of STATIC_NODE_TYPES, so the list itself is new
    but its elements are the same objects as in the static definition.

    Args:
        services: Accepted for API compatibility only; not read.
        language: Not read by this function.
    """
    return [*STATIC_NODE_TYPES]
def _localizeNode(node: Dict[str, Any], language: str) -> Dict[str, Any]:
"""Apply language to label/description/parameters."""
lang = language if language in ("en", "de", "fr") else "en"
out = dict(node)
# Strip internal keys for API response
for key in list(out.keys()):
if key.startswith("_"):
del out[key]
if isinstance(node.get("label"), dict):
out["label"] = node["label"].get(lang, node["label"].get("en", str(node["label"])))
if isinstance(node.get("description"), dict):
out["description"] = node["description"].get(lang, node["description"].get("en", str(node["description"])))
params = []
for p in node.get("parameters", []):
pc = dict(p)
if isinstance(p.get("description"), dict):
pc["description"] = p["description"].get(lang, p["description"].get("en", str(p.get("description", ""))))
params.append(pc)
out["parameters"] = params
return out
def getNodeTypesForApi(
    services: Any,
    language: str = "en",
) -> Dict[str, Any]:
    """
    Build the node-type payload returned by the API.

    Each node type from getNodeTypes() is passed through _localizeNode() for
    the requested language. The category list is fixed; its labels are
    returned as full per-language dicts (en/de/fr) rather than being resolved
    to a single language.

    Returns:
        {"nodeTypes": [...localized nodes...], "categories": [...]}
    """
    nodeTypes = [
        _localizeNode(nodeType, language)
        for nodeType in getNodeTypes(services, language)
    ]

    # (category id, English, German, French)
    categoryTable = (
        ("trigger", "Trigger", "Trigger", "Déclencheur"),
        ("input", "Input/Human", "Eingabe/Mensch", "Entrée/Humain"),
        ("flow", "Flow", "Ablauf", "Flux"),
        ("data", "Data", "Daten", "Données"),
        ("ai", "AI", "KI", "IA"),
        ("email", "Email", "E-Mail", "Email"),
        ("sharepoint", "SharePoint", "SharePoint", "SharePoint"),
    )
    categories = [
        {"id": categoryId, "label": {"en": en, "de": de, "fr": fr}}
        for categoryId, en, de, fr in categoryTable
    ]

    return {"nodeTypes": nodeTypes, "categories": categories}
def getNodeTypeToMethodAction() -> Dict[str, tuple]:
    """
    Build the lookup from node type id to its (method, action) pair.

    Only entries of STATIC_NODE_TYPES whose "_method" and "_action" are both
    truthy are included; all other node types are left out of the mapping.
    Used by ActionNodeExecutor.
    """
    return {
        nodeType["id"]: (nodeType.get("_method"), nodeType.get("_action"))
        for nodeType in STATIC_NODE_TYPES
        if nodeType.get("_method") and nodeType.get("_action")
    }