platform-core/modules/workflowAutomation/engine/udmUpstreamShapes.py
ValueOn AG 9be2d8aab5
Some checks failed
Deploy Plattform-Core (Int) / test (push) Failing after 16s
Deploy Plattform-Core (Int) / deploy (push) Has been skipped
refactory workflowAutomation completed as system component reolacing automation2 and graphEditor
2026-06-08 10:31:17 +02:00

36 lines
1.1 KiB
Python

# Copyright (c) 2025 Patrick Motsch
"""
Pure shape coercion for UDM-related upstream payloads (tests + optional tooling).
No runtime wire-handover — kept only so unit tests can assert stable normalisation rules.
"""
from __future__ import annotations
from typing import Any, Dict
def _coerceUdmDocumentInput(upstream: Dict[str, Any]) -> Dict[str, Any]:
if upstream.get("children") is not None and upstream.get("sourceType"):
return upstream
udm = upstream.get("udm")
if isinstance(udm, dict) and udm.get("children") is not None:
return udm
return {}
def _coerceUdmNodeListInput(upstream: Dict[str, Any]) -> Dict[str, Any]:
nodes = upstream.get("nodes")
if isinstance(nodes, list):
return {"nodes": nodes, "count": len(nodes)}
children = upstream.get("children")
if isinstance(children, list):
return {"nodes": children, "count": len(children)}
return {}
def _coerceConsolidateResultInput(upstream: Dict[str, Any]) -> Dict[str, Any]:
result: Dict[str, Any] = {}
for key in ("result", "mode", "count"):
if key in upstream:
result[key] = upstream[key]
return result