platform-core/modules/features/graphicalEditor/switchOutput.py
ValueOn AG e800bc0b71
Some checks failed
Deploy Plattform-Core / test (push) Failing after 45s
Deploy Plattform-Core / deploy (push) Has been skipped
Sync: full codebase from GitHub gateway main
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-23 23:54:29 +02:00

308 lines
11 KiB
Python

# Copyright (c) 2025 Patrick Motsch
"""Build flow.switch branch payloads: filtered context + loop-ready items."""
from __future__ import annotations
import copy
import re
from typing import Any, Dict, List, Optional
from modules.features.graphicalEditor.portTypes import unwrapTransit
_CONTEXT_FILTER_OPERATORS = frozenset({"contains_content"})
_BLOB_IMAGE_CHUNK_RE = re.compile(r"^\[image(?:\:([^\]]+))?\]$")
def _artifacts_by_part_id_from_presentation(inp: Any) -> Dict[str, str]:
plain = _unwrap_input(inp)
meta = plain.get("_meta") if isinstance(plain, dict) else None
if not isinstance(meta, dict):
return {}
out: Dict[str, str] = {}
for art in meta.get("persistedImageArtifacts") or []:
if not isinstance(art, dict):
continue
sp = str(art.get("sourcePartId") or "").strip()
fid = str(art.get("fileId") or "").strip()
if sp and fid:
out[sp] = fid
return out
def _enrich_image_slot(slot: Dict[str, Any], artifacts_by_part: Dict[str, str]) -> None:
if (slot.get("typeGroup") or "").strip().lower() != "image":
return
existing = str(slot.get("embeddedImageFileId") or "").strip()
if existing and existing in artifacts_by_part.values():
return
candidates: List[str] = []
sid = str(slot.get("id") or "").strip()
if sid:
candidates.append(sid)
data = slot.get("data")
if isinstance(data, str):
m = _BLOB_IMAGE_CHUNK_RE.fullmatch(data.strip())
if m:
tok = (m.group(1) or "").strip()
if tok:
candidates.append(tok)
for cand in candidates:
fid = artifacts_by_part.get(cand)
if fid:
slot["embeddedImageFileId"] = fid
return
def _slot_matches_content_type(slot: Dict[str, Any], content_type: str) -> bool:
target = (content_type or "").strip().lower()
if not target:
return False
tg = (slot.get("typeGroup") or slot.get("contentType") or "").strip().lower()
if target == "media":
return tg in ("image", "media", "video", "audio")
if target == "text":
return tg in ("text", "table", "structure")
return tg == target
def _filter_bucket_slots(bucket: Dict[str, Any], content_type: str) -> Dict[str, Any]:
"""Return a copy of a presentation file bucket with filtered ``data`` slots."""
mode = str(bucket.get("outputMode") or "").strip().lower()
data = bucket.get("data")
if mode == "blob" and isinstance(data, str):
from modules.workflows.methods.methodContext.actions.extractContent import (
filter_blob_bucket_by_content_type,
)
return filter_blob_bucket_by_content_type(bucket, content_type)
out = copy.deepcopy(bucket)
if isinstance(data, list):
out["data"] = [s for s in data if isinstance(s, dict) and _slot_matches_content_type(s, content_type)]
elif isinstance(data, dict) and _slot_matches_content_type(data, content_type):
out["data"] = data
else:
out["data"] = [] if isinstance(data, list) else data
return out
def _filter_presentation_envelope(envelope: Dict[str, Any], content_type: str) -> Dict[str, Any]:
"""Filter all slots in a presentation envelope by content type group."""
from modules.workflows.methods.methodContext.actions.extractContent import (
PRESENTATION_KIND,
PRESENTATION_SCHEMA_VERSION,
)
out = copy.deepcopy(envelope)
files = out.get("files") or {}
if not isinstance(files, dict):
return out
filtered_files: Dict[str, Any] = {}
kept_order: List[str] = []
for fk in out.get("fileOrder") or list(files.keys()):
bucket = files.get(fk)
if not isinstance(bucket, dict):
continue
fb = _filter_bucket_slots(bucket, content_type)
data = fb.get("data")
has_data = (
(isinstance(data, list) and len(data) > 0)
or (isinstance(data, dict))
or (isinstance(data, str) and str(data).strip())
)
if has_data:
filtered_files[str(fk)] = fb
kept_order.append(str(fk))
out["schemaVersion"] = out.get("schemaVersion") or PRESENTATION_SCHEMA_VERSION
out["kind"] = out.get("kind") or PRESENTATION_KIND
out["fileOrder"] = kept_order
out["files"] = filtered_files
return out
def _slots_from_bucket(bucket: Dict[str, Any]) -> List[Any]:
data = bucket.get("data")
mode = str(bucket.get("outputMode") or "").strip().lower()
if mode == "blob" and isinstance(data, str) and data.strip():
from modules.workflows.methods.methodContext.actions.extractContent import parse_blob_data_segments
return parse_blob_data_segments(data)
if isinstance(data, list):
return [s for s in data if isinstance(s, dict)]
if isinstance(data, dict):
return [data]
if isinstance(data, str) and data.strip():
return [{"typeGroup": "text", "data": data}]
items = bucket.get("items")
if isinstance(items, list):
return [i for i in items if isinstance(i, dict)]
return []
def _items_from_presentation_envelope(
envelope: Dict[str, Any],
*,
artifacts_by_part: Optional[Dict[str, str]] = None,
) -> List[Any]:
items: List[Any] = []
files = envelope.get("files") or {}
if not isinstance(files, dict):
return items
for fk in envelope.get("fileOrder") or list(files.keys()):
bucket = files.get(fk)
if isinstance(bucket, dict):
for slot in _slots_from_bucket(bucket):
if artifacts_by_part:
_enrich_image_slot(slot, artifacts_by_part)
sid = str(slot.get("id") or slot.get("label") or len(items))
items.append({"name": f"{fk}:{sid}", "value": slot})
return items
def expand_items_from_input(raw: Any) -> List[Any]:
"""Best-effort loop items from transit/presentation/list/dict input."""
if raw is None:
return []
if isinstance(raw, dict) and isinstance(raw.get("items"), list):
return list(raw["items"])
plain = unwrapTransit(raw) if isinstance(raw, dict) and raw.get("_transit") else raw
if isinstance(plain, dict) and isinstance(plain.get("items"), list):
return list(plain["items"])
from modules.workflows.methods.methodContext.actions.extractContent import (
normalize_presentation_envelopes,
)
envelopes = normalize_presentation_envelopes(plain)
if envelopes:
out: List[Any] = []
for env in envelopes:
out.extend(_items_from_presentation_envelope(env))
return out
if isinstance(plain, list):
return list(plain)
if isinstance(plain, dict):
children = plain.get("children")
if isinstance(children, list) and children:
return list(children)
return [{"name": k, "value": v} for k, v in plain.items()]
return [plain]
def _unwrap_input(inp: Any) -> Any:
if isinstance(inp, dict) and inp.get("_transit"):
return unwrapTransit(inp)
return inp
def build_switch_branch_payload(
inp: Any,
case: Dict[str, Any],
*,
value_kind: str = "unknown",
match_index: int = 0,
) -> Dict[str, Any]:
"""Payload for a matched switch case (ContextBranch inner data)."""
operator = str(case.get("operator") or "eq")
right = case.get("value")
plain_in = _unwrap_input(inp)
if operator in _CONTEXT_FILTER_OPERATORS and value_kind == "context":
content_type = str(right or "")
from modules.workflows.methods.methodContext.actions.extractContent import (
normalize_presentation_envelopes,
)
source = plain_in
if isinstance(source, dict) and "data" in source and not source.get("kind"):
nested = source.get("data")
if isinstance(nested, dict):
source = nested
envelopes = normalize_presentation_envelopes(source)
if not envelopes and isinstance(plain_in, dict):
envelopes = normalize_presentation_envelopes(plain_in)
filtered_envs = [_filter_presentation_envelope(env, content_type) for env in envelopes]
artifacts_by_part = _artifacts_by_part_id_from_presentation(plain_in)
items: List[Any] = []
for env in filtered_envs:
items.extend(_items_from_presentation_envelope(env, artifacts_by_part=artifacts_by_part))
if len(filtered_envs) == 1:
data_out: Any = filtered_envs[0]
elif filtered_envs:
data_out = {"envelopes": filtered_envs}
else:
data_out = {}
return {
"data": data_out,
"items": items,
"filterApplied": True,
"contentType": content_type,
"match": match_index,
}
data_out = plain_in if isinstance(plain_in, dict) else {"value": plain_in}
return {
"data": data_out,
"items": expand_items_from_input(inp),
"filterApplied": False,
"match": match_index,
}
def build_switch_default_payload(inp: Any, *, match_index: int) -> Dict[str, Any]:
"""Sonst branch: unmodified input passthrough."""
plain_in = _unwrap_input(inp)
data_out = plain_in if isinstance(plain_in, dict) else {"value": plain_in}
return {
"data": data_out,
"items": expand_items_from_input(inp),
"filterApplied": False,
"match": match_index,
}
def build_switch_combined_output(
inp: Any,
cases: List[Any],
*,
matched_indices: List[int],
value_kind: str = "unknown",
) -> Dict[str, Any]:
"""Build per-port branch payloads; primary fields mirror the first active match."""
branches: Dict[str, Dict[str, Any]] = {}
default_idx = len(cases)
for idx in matched_indices:
if idx == default_idx:
branches[str(idx)] = build_switch_default_payload(inp, match_index=default_idx)
elif 0 <= idx < len(cases):
c = cases[idx] if isinstance(cases[idx], dict) else {"operator": "eq", "value": cases[idx]}
branches[str(idx)] = build_switch_branch_payload(
inp, c, value_kind=value_kind, match_index=idx,
)
primary_idx = matched_indices[0] if matched_indices else default_idx
primary = branches.get(str(primary_idx)) or build_switch_default_payload(inp, match_index=default_idx)
return {**primary, "branches": branches}
def switch_branch_payload(transit: Any, source_output: int) -> Optional[Dict[str, Any]]:
"""Return the ContextBranch inner dict for a specific switch output port."""
if not isinstance(transit, dict):
return None
data = transit.get("data") if transit.get("_transit") else transit
if not isinstance(data, dict):
return None
branches = data.get("branches")
if isinstance(branches, dict):
branch = branches.get(str(source_output))
if isinstance(branch, dict):
return branch
if transit.get("_transit"):
return data
return data
def unwrap_transit_for_port(output: Any, source_output: Optional[int] = None) -> Any:
"""Unwrap transit; when ``source_output`` is set, pick that switch branch payload."""
if source_output is not None:
branch = switch_branch_payload(output, source_output)
if branch is not None:
return branch
return unwrapTransit(output)