308 lines
11 KiB
Python
308 lines
11 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
"""Build flow.switch branch payloads: filtered context + loop-ready items."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import copy
|
|
import re
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from modules.features.graphicalEditor.portTypes import unwrapTransit
|
|
|
|
# Case operators for which build_switch_branch_payload filters the presentation
# content (only when the value kind is "context") instead of passing input through.
_CONTEXT_FILTER_OPERATORS = frozenset({"contains_content"})
|
|
# Matches a whole "[image]" or "[image:<token>]" chunk; group 1 is the optional
# token, which _enrich_image_slot uses as an artifact lookup key.
_BLOB_IMAGE_CHUNK_RE = re.compile(r"^\[image(?:\:([^\]]+))?\]$")
|
|
|
|
|
|
def _artifacts_by_part_id_from_presentation(inp: Any) -> Dict[str, str]:
    """Map ``sourcePartId`` to ``fileId`` for the persisted image artifacts of *inp*.

    The input is unwrapped first; artifacts are read from
    ``_meta["persistedImageArtifacts"]``. Entries that are not dicts, or whose
    part id or file id is empty after stripping, are ignored. An empty dict is
    returned when the unwrapped input or its ``_meta`` is not a dict.
    """
    payload = _unwrap_input(inp)
    if not isinstance(payload, dict):
        return {}

    meta = payload.get("_meta")
    if not isinstance(meta, dict):
        return {}

    mapping: Dict[str, str] = {}
    for artifact in meta.get("persistedImageArtifacts") or []:
        if isinstance(artifact, dict):
            part_id = str(artifact.get("sourcePartId") or "").strip()
            file_id = str(artifact.get("fileId") or "").strip()
            if part_id and file_id:
                # Later entries for the same part id overwrite earlier ones.
                mapping[part_id] = file_id
    return mapping
|
|
|
|
|
|
def _enrich_image_slot(slot: Dict[str, Any], artifacts_by_part: Dict[str, str]) -> None:
    """Set ``embeddedImageFileId`` on an image slot from the known artifacts.

    The slot is mutated in place. Nothing happens when the slot's ``typeGroup``
    is not ``image``, or when its current ``embeddedImageFileId`` is already
    one of the artifact file ids. Lookup keys are tried in order: the slot
    ``id``, then the token of a ``[image:<token>]`` string in ``data``.
    """
    type_group = (slot.get("typeGroup") or "").strip().lower()
    if type_group != "image":
        return

    current_file_id = str(slot.get("embeddedImageFileId") or "").strip()
    if current_file_id and current_file_id in artifacts_by_part.values():
        return

    lookup_keys: List[str] = []

    slot_id = str(slot.get("id") or "").strip()
    if slot_id:
        lookup_keys.append(slot_id)

    payload = slot.get("data")
    if isinstance(payload, str):
        chunk = _BLOB_IMAGE_CHUNK_RE.fullmatch(payload.strip())
        token = (chunk.group(1) or "").strip() if chunk else ""
        if token:
            lookup_keys.append(token)

    # First truthy file id among the candidate keys wins.
    file_id = next(filter(None, map(artifacts_by_part.get, lookup_keys)), None)
    if file_id:
        slot["embeddedImageFileId"] = file_id
|
|
|
|
|
|
def _slot_matches_content_type(slot: Dict[str, Any], content_type: str) -> bool:
    """Return whether *slot* belongs to the requested content type group.

    The slot's group is read from ``typeGroup`` (falling back to
    ``contentType``) and compared case-insensitively after stripping.

    Two targets are umbrella groups:

    * ``media`` matches ``image``, ``media``, ``video`` and ``audio``;
    * ``text`` matches ``text``, ``table`` and ``structure``.

    Any other target must equal the slot's group exactly. An empty target
    never matches.
    """
    # Coerce with str(): slots come from loosely-typed payloads, and a truthy
    # non-string value (e.g. a number) must not crash the filter on ``.strip()``.
    target = str(content_type or "").strip().lower()
    if not target:
        return False

    slot_group = str(slot.get("typeGroup") or slot.get("contentType") or "").strip().lower()

    umbrella_groups = {
        "media": ("image", "media", "video", "audio"),
        "text": ("text", "table", "structure"),
    }
    return slot_group in umbrella_groups.get(target, (target,))
|
|
|
|
|
|
def _filter_bucket_slots(bucket: Dict[str, Any], content_type: str) -> Dict[str, Any]:
    """Return a copy of a presentation file bucket with filtered ``data`` slots.

    * Blob buckets (``outputMode == "blob"`` with string ``data``) are
      delegated to ``filter_blob_bucket_by_content_type``.
    * List ``data`` keeps only dict slots matching *content_type*.
    * A single dict slot is kept when it matches and replaced by ``[]`` when
      it does not, so a non-matching slot is never passed through.
    * Any other ``data`` value (e.g. ``None`` or a non-blob string) is kept
      unchanged.

    The returned bucket is a deep copy: its slots are not shared with the
    input, so later in-place changes to them do not touch *bucket*.
    """
    mode = str(bucket.get("outputMode") or "").strip().lower()
    if mode == "blob" and isinstance(bucket.get("data"), str):
        from modules.workflows.methods.methodContext.actions.extractContent import (
            filter_blob_bucket_by_content_type,
        )

        return filter_blob_bucket_by_content_type(bucket, content_type)

    out = copy.deepcopy(bucket)
    # Filter the copied slots, not the originals, so the result stays independent.
    data = out.get("data")
    if isinstance(data, list):
        data = [
            slot
            for slot in data
            if isinstance(slot, dict) and _slot_matches_content_type(slot, content_type)
        ]
    elif isinstance(data, dict) and not _slot_matches_content_type(data, content_type):
        # A lone non-matching slot is filtered out instead of passed through.
        data = []
    out["data"] = data
    return out
|
|
|
|
|
|
def _filter_presentation_envelope(envelope: Dict[str, Any], content_type: str) -> Dict[str, Any]:
    """Return a deep copy of *envelope* keeping only slots of *content_type*.

    Every file bucket is filtered; a bucket whose filtered ``data`` is an empty
    list, a blank string, or any non-list/dict/string value is dropped.
    ``fileOrder`` and ``files`` are rewritten to the surviving buckets with
    string keys. ``schemaVersion`` and ``kind`` fall back to the presentation
    defaults when missing or falsy. When ``files`` is not a dict the copy is
    returned untouched.
    """
    from modules.workflows.methods.methodContext.actions.extractContent import (
        PRESENTATION_KIND,
        PRESENTATION_SCHEMA_VERSION,
    )

    result = copy.deepcopy(envelope)
    buckets = result.get("files") or {}
    if not isinstance(buckets, dict):
        return result

    def _has_payload(payload: Any) -> bool:
        # Non-empty list, any dict, or a non-blank string counts as content.
        if isinstance(payload, list):
            return len(payload) > 0
        if isinstance(payload, dict):
            return True
        return isinstance(payload, str) and bool(str(payload).strip())

    surviving_files: Dict[str, Any] = {}
    surviving_order: List[str] = []
    ordered_keys = result.get("fileOrder") or list(buckets.keys())
    for file_key in ordered_keys:
        bucket = buckets.get(file_key)
        if not isinstance(bucket, dict):
            continue
        filtered_bucket = _filter_bucket_slots(bucket, content_type)
        if _has_payload(filtered_bucket.get("data")):
            key_text = str(file_key)
            surviving_files[key_text] = filtered_bucket
            surviving_order.append(key_text)

    result["schemaVersion"] = result.get("schemaVersion") or PRESENTATION_SCHEMA_VERSION
    result["kind"] = result.get("kind") or PRESENTATION_KIND
    result["fileOrder"] = surviving_order
    result["files"] = surviving_files
    return result
|
|
|
|
|
|
def _slots_from_bucket(bucket: Dict[str, Any]) -> List[Any]:
    """Return the slots held by a presentation file bucket.

    Resolution order:

    1. blob mode with a non-blank string ``data``: the result of
       ``parse_blob_data_segments``;
    2. list ``data``: its dict entries;
    3. dict ``data``: a one-element list with that dict;
    4. non-blank string ``data``: a single synthetic text slot;
    5. otherwise the dict entries of ``items`` when it is a list, else ``[]``.
    """
    payload = bucket.get("data")
    output_mode = str(bucket.get("outputMode") or "").strip().lower()
    has_text = isinstance(payload, str) and bool(payload.strip())

    if has_text and output_mode == "blob":
        from modules.workflows.methods.methodContext.actions.extractContent import parse_blob_data_segments

        return parse_blob_data_segments(payload)

    if isinstance(payload, list):
        return [entry for entry in payload if isinstance(entry, dict)]

    if isinstance(payload, dict):
        return [payload]

    if has_text:
        return [{"typeGroup": "text", "data": payload}]

    fallback = bucket.get("items")
    if not isinstance(fallback, list):
        return []
    return [entry for entry in fallback if isinstance(entry, dict)]
|
|
|
|
|
|
def _items_from_presentation_envelope(
    envelope: Dict[str, Any],
    *,
    artifacts_by_part: Optional[Dict[str, str]] = None,
) -> List[Any]:
    """Flatten an envelope's file buckets into ``{"name", "value"}`` loop items.

    Buckets are visited in ``fileOrder`` (or ``files`` key order when that is
    missing or empty). Each slot becomes one item named
    ``"<fileKey>:<slot id | label | running item index>"``. When
    *artifacts_by_part* is non-empty, each slot is first passed to
    ``_enrich_image_slot``.
    """
    buckets = envelope.get("files") or {}
    if not isinstance(buckets, dict):
        return []

    collected: List[Any] = []
    ordered_keys = envelope.get("fileOrder") or list(buckets.keys())
    for file_key in ordered_keys:
        bucket = buckets.get(file_key)
        if not isinstance(bucket, dict):
            continue
        for slot in _slots_from_bucket(bucket):
            if artifacts_by_part:
                _enrich_image_slot(slot, artifacts_by_part)
            # Fall back to the running item count when the slot has no id/label.
            slot_label = str(slot.get("id") or slot.get("label") or len(collected))
            collected.append({"name": f"{file_key}:{slot_label}", "value": slot})
    return collected
|
|
|
|
|
|
def expand_items_from_input(raw: Any) -> List[Any]:
    """Derive loop items from transit, presentation, list or dict input (best effort).

    Resolution order:

    1. ``None`` gives ``[]``;
    2. an ``items`` list on the raw dict, or on the unwrapped transit payload,
       is returned as a copy;
    3. presentation envelopes found by ``normalize_presentation_envelopes``
       are flattened into their slot items;
    4. a list is copied;
    5. a dict yields its non-empty ``children`` list, otherwise one
       ``{"name", "value"}`` entry per key;
    6. anything else is wrapped in a single-element list.
    """
    if raw is None:
        return []

    def _listed_items(candidate: Any) -> Optional[List[Any]]:
        # Copy of candidate["items"] when it is a list, else None.
        if isinstance(candidate, dict):
            listed = candidate.get("items")
            if isinstance(listed, list):
                return list(listed)
        return None

    direct = _listed_items(raw)
    if direct is not None:
        return direct

    plain = unwrapTransit(raw) if isinstance(raw, dict) and raw.get("_transit") else raw
    direct = _listed_items(plain)
    if direct is not None:
        return direct

    from modules.workflows.methods.methodContext.actions.extractContent import (
        normalize_presentation_envelopes,
    )

    envelopes = normalize_presentation_envelopes(plain)
    if envelopes:
        flattened: List[Any] = []
        for envelope in envelopes:
            flattened += _items_from_presentation_envelope(envelope)
        return flattened

    if isinstance(plain, list):
        return list(plain)

    if not isinstance(plain, dict):
        return [plain]

    children = plain.get("children")
    if isinstance(children, list) and children:
        return list(children)
    return [{"name": key, "value": value} for key, value in plain.items()]
|
|
|
|
|
|
def _unwrap_input(inp: Any) -> Any:
    """Return *inp* passed through ``unwrapTransit`` when it is a dict with a truthy ``_transit``.

    Any other value is returned unchanged.
    """
    is_transit = isinstance(inp, dict) and bool(inp.get("_transit"))
    return unwrapTransit(inp) if is_transit else inp
|
|
|
|
|
|
def build_switch_branch_payload(
    inp: Any,
    case: Dict[str, Any],
    *,
    value_kind: str = "unknown",
    match_index: int = 0,
) -> Dict[str, Any]:
    """Build the payload for a matched switch case (ContextBranch inner data).

    When the case operator is a context-filter operator and *value_kind* is
    ``"context"``, the presentation envelopes found in the input are filtered
    by the case value (used as content type), and the payload carries the
    filtered envelope(s), their loop items, ``filterApplied=True`` and
    ``contentType``.

    Otherwise the unwrapped input is passed through (non-dict input is
    wrapped as ``{"value": ...}``) with ``filterApplied=False``.
    """
    operator = str(case.get("operator") or "eq")
    right = case.get("value")
    plain_in = _unwrap_input(inp)

    applies_filter = operator in _CONTEXT_FILTER_OPERATORS and value_kind == "context"
    if not applies_filter:
        passthrough = plain_in if isinstance(plain_in, dict) else {"value": plain_in}
        return {
            "data": passthrough,
            "items": expand_items_from_input(inp),
            "filterApplied": False,
            "match": match_index,
        }

    content_type = str(right or "")
    from modules.workflows.methods.methodContext.actions.extractContent import (
        normalize_presentation_envelopes,
    )

    # Prefer a nested dict under "data" when the outer dict carries no "kind".
    candidate = plain_in
    if isinstance(plain_in, dict) and "data" in plain_in and not plain_in.get("kind"):
        inner = plain_in.get("data")
        if isinstance(inner, dict):
            candidate = inner

    envelopes = normalize_presentation_envelopes(candidate)
    if not envelopes and isinstance(plain_in, dict):
        # Nothing found in the nested payload: retry on the outer dict.
        envelopes = normalize_presentation_envelopes(plain_in)

    filtered_envs = [_filter_presentation_envelope(env, content_type) for env in envelopes]
    artifacts_by_part = _artifacts_by_part_id_from_presentation(plain_in)

    items: List[Any] = []
    for filtered in filtered_envs:
        items.extend(_items_from_presentation_envelope(filtered, artifacts_by_part=artifacts_by_part))

    data_out: Any
    if not filtered_envs:
        data_out = {}
    elif len(filtered_envs) == 1:
        data_out = filtered_envs[0]
    else:
        data_out = {"envelopes": filtered_envs}

    return {
        "data": data_out,
        "items": items,
        "filterApplied": True,
        "contentType": content_type,
        "match": match_index,
    }
|
|
|
|
|
|
def build_switch_default_payload(inp: Any, *, match_index: int) -> Dict[str, Any]:
    """Build the default ("Sonst", i.e. else) branch payload: input passthrough.

    The unwrapped input is used as ``data`` (non-dict input is wrapped as
    ``{"value": ...}``), ``items`` are expanded from the raw input, and
    ``filterApplied`` is always ``False``.
    """
    unwrapped = _unwrap_input(inp)
    if isinstance(unwrapped, dict):
        data_out = unwrapped
    else:
        data_out = {"value": unwrapped}

    payload: Dict[str, Any] = {"data": data_out}
    payload["items"] = expand_items_from_input(inp)
    payload["filterApplied"] = False
    payload["match"] = match_index
    return payload
|
|
|
|
|
|
def build_switch_combined_output(
    inp: Any,
    cases: List[Any],
    *,
    matched_indices: List[int],
    value_kind: str = "unknown",
) -> Dict[str, Any]:
    """Build per-port branch payloads plus top-level fields mirroring the primary match.

    Index ``len(cases)`` denotes the default branch. Matched indices outside
    ``0..len(cases)`` are skipped. A non-dict case is treated as an ``eq``
    comparison against that value. The primary payload is the branch of the
    first matched index, or a default payload when that branch is missing;
    its fields are merged with ``"branches"`` (keyed by stringified index).
    """
    default_idx = len(cases)
    branches: Dict[str, Dict[str, Any]] = {}

    for idx in matched_indices:
        if idx == default_idx:
            payload = build_switch_default_payload(inp, match_index=default_idx)
        elif 0 <= idx < len(cases):
            raw_case = cases[idx]
            case = raw_case if isinstance(raw_case, dict) else {"operator": "eq", "value": raw_case}
            payload = build_switch_branch_payload(
                inp, case, value_kind=value_kind, match_index=idx,
            )
        else:
            continue
        branches[str(idx)] = payload

    primary_idx = matched_indices[0] if matched_indices else default_idx
    primary = branches.get(str(primary_idx))
    if not primary:
        primary = build_switch_default_payload(inp, match_index=default_idx)

    return {**primary, "branches": branches}
|
|
|
|
|
|
def switch_branch_payload(transit: Any, source_output: int) -> Optional[Dict[str, Any]]:
    """Return the ContextBranch inner dict for a specific switch output port.

    For a dict with a truthy ``_transit`` flag the payload is its ``data``;
    otherwise the dict itself is the payload. Returns ``None`` when *transit*
    or the payload is not a dict. When the payload has a dict ``branches``
    entry for ``str(source_output)``, that branch is returned; in every other
    case the whole payload is returned.
    """
    if not isinstance(transit, dict):
        return None

    inner = transit.get("data") if transit.get("_transit") else transit
    if not isinstance(inner, dict):
        return None

    per_port = inner.get("branches")
    selected = per_port.get(str(source_output)) if isinstance(per_port, dict) else None
    return selected if isinstance(selected, dict) else inner
|
|
|
|
|
|
def unwrap_transit_for_port(output: Any, source_output: Optional[int] = None) -> Any:
    """Unwrap a transit output, optionally selecting one switch branch payload.

    With *source_output* set, the result of ``switch_branch_payload`` is
    returned when it is not ``None``. Otherwise (no port given, or no branch
    payload available) the output is unwrapped with ``unwrapTransit``.
    """
    if source_output is None:
        return unwrapTransit(output)

    branch = switch_branch_payload(output, source_output)
    return unwrapTransit(output) if branch is None else branch
|