gateway/modules/workflows/automation2/udmUpstreamShapes.py
2026-04-25 01:13:01 +02:00

36 lines
1.1 KiB
Python

# Copyright (c) 2025 Patrick Motsch
"""
Pure shape coercion for UDM-related upstream payloads (tests + optional tooling).
No runtime wire-handover — kept only so unit tests can assert stable normalisation rules.
"""
from __future__ import annotations
from typing import Any, Dict
def _coerceUdmDocumentInput(upstream: Dict[str, Any]) -> Dict[str, Any]:
if upstream.get("children") is not None and upstream.get("sourceType"):
return upstream
udm = upstream.get("udm")
if isinstance(udm, dict) and udm.get("children") is not None:
return udm
return {}
def _coerceUdmNodeListInput(upstream: Dict[str, Any]) -> Dict[str, Any]:
nodes = upstream.get("nodes")
if isinstance(nodes, list):
return {"nodes": nodes, "count": len(nodes)}
children = upstream.get("children")
if isinstance(children, list):
return {"nodes": children, "count": len(children)}
return {}
def _coerceConsolidateResultInput(upstream: Dict[str, Any]) -> Dict[str, Any]:
result: Dict[str, Any] = {}
for key in ("result", "mode", "count"):
if key in upstream:
result[key] = upstream[key]
return result