resumed testing and handover improvement

This commit is contained in:
Ida 2026-05-14 19:25:16 +02:00
parent 7a1deccc2d
commit 42ffeee5d3
14 changed files with 1080 additions and 70 deletions

View file

@ -257,7 +257,7 @@ def _path_suggests_file(path: List[Any], producer_type: str) -> bool:
return False
def resolve_value_kind(graph: Dict[str, Any], ref: Dict[str, Any]) -> str:
def resolve_value_kind(graph: Dict[str, Any], ref: Dict[str, Any], *, _skip_upstream: bool = False) -> str:
"""Resolve condition valueKind for a DataRef against the workflow graph."""
if not isinstance(ref, dict):
return "unknown"
@ -281,31 +281,32 @@ def resolve_value_kind(graph: Dict[str, Any], ref: Dict[str, Any]) -> str:
return "string"
return "file"
from modules.features.graphicalEditor.upstreamPathsService import compute_upstream_paths
if not _skip_upstream:
from modules.features.graphicalEditor.upstreamPathsService import compute_upstream_paths
target_id = graph.get("targetNodeId") or producer_id
matched_type: Optional[str] = None
for entry in compute_upstream_paths(graph, target_id):
if entry.get("producerNodeId") != producer_id:
continue
entry_path = entry.get("path") or []
if _paths_equal(list(entry_path), list(path)):
matched_type = str(entry.get("type") or "Any")
break
if matched_type is None and path:
parent_path = list(path[:-1])
target_id = graph.get("targetNodeId") or producer_id
matched_type: Optional[str] = None
for entry in compute_upstream_paths(graph, target_id):
if entry.get("producerNodeId") != producer_id:
continue
if _paths_equal(list(entry.get("path") or []), parent_path):
entry_path = entry.get("path") or []
if _paths_equal(list(entry_path), list(path)):
matched_type = str(entry.get("type") or "Any")
break
if matched_type:
vk = catalog_type_to_value_kind(matched_type)
if vk != "unknown":
return vk
if matched_type is None and path:
parent_path = list(path[:-1])
for entry in compute_upstream_paths(graph, target_id):
if entry.get("producerNodeId") != producer_id:
continue
if _paths_equal(list(entry.get("path") or []), parent_path):
matched_type = str(entry.get("type") or "Any")
break
if matched_type:
vk = catalog_type_to_value_kind(matched_type)
if vk != "unknown":
return vk
if producer_type in ("trigger.form", "input.form") and path and str(path[0]) == "payload":
return "string"
@ -414,6 +415,12 @@ def _iter_presentation_parts(envelope: Dict[str, Any]) -> List[Dict[str, Any]]:
if not isinstance(bucket, dict):
continue
data = bucket.get("data")
mode = str(bucket.get("outputMode") or "").strip().lower()
if mode == "blob" and isinstance(data, str):
from modules.workflows.methods.methodContext.actions.extractContent import parse_blob_data_segments
parts.extend(parse_blob_data_segments(data))
continue
if isinstance(data, list):
for slot in data:
if isinstance(slot, dict):

View file

@ -123,9 +123,27 @@ CONTEXT_MERGE_ACTION_RESULT_DATA_PICK_OPTIONS = [
},
]
_CONTEXT_BRANCH_DATA_PICK_OPTIONS = [
{
"path": ["items"],
"pickerLabel": t("Gefilterte Elemente"),
"detail": t("Empfohlen für Schleifen: je Eintrag ein Durchlauf (z. B. Bild-Slots)."),
"recommended": True,
"type": "List[Any]",
},
{
"path": ["data"],
"pickerLabel": t("Kontext (data)"),
"detail": t("Gefilterter Presentation-Umschlag oder unveränderter Eingang auf dem Sonst-Zweig."),
"recommended": False,
"type": "Dict",
},
]
# Ports, die typische Schritt-Ausgaben durchreichen (nicht nur leerer Transit).
_FLOW_INPUT_SCHEMAS = [
"Transit",
"ContextBranch",
"FormPayload",
"AiResult",
"TextResult",
@ -183,8 +201,10 @@ FLOW_NODES = [
"category": "flow",
"label": t("Switch"),
"description": t(
"Mehrere Zweige nach einem Wert aus einem vorherigen Schritt (Data Picker). "
"Definiere Fälle mit Vergleichsoperator; der Eingang wird an den ersten passenden Zweig durchgereicht."
"Mehrere Zweige nach einem Wert aus einem vorherigen Schritt. "
"Jeder Fall hat einen eigenen Ausgang mit passend gefiltertem Inhalt in ``items``; "
"mehrere Kontext-Filter können gleichzeitig zutreffen (z. B. Text und Bilder). "
"Der letzte Ausgang (Sonst) reicht den unveränderten Eingang durch."
),
"parameters": [
{
@ -199,13 +219,22 @@ FLOW_NODES = [
"type": "array",
"required": False,
"frontendType": "caseList",
"description": t("Fälle: Operator und Vergleichswert"),
"frontendOptions": {
"dependsOn": "value",
"operatorCatalog": "condition",
},
"description": t("Fälle: Operator und Vergleichswert (abhängig vom gewählten Wert)"),
},
],
"inputs": 1,
"outputs": 1,
"inputPorts": {0: {"accepts": list(_FLOW_INPUT_SCHEMAS)}},
"outputPorts": {0: {"schema": "Transit"}},
"outputPorts": {
0: {
"schema": "ContextBranch",
"dataPickOptions": _CONTEXT_BRANCH_DATA_PICK_OPTIONS,
},
},
"executor": "flow",
"meta": {"icon": "mdi-swap-horizontal", "color": "#FF9800", "usesAi": False},
},
@ -265,7 +294,7 @@ FLOW_NODES = [
"outputLabels": [t("Schleife"), t("Fertig")],
"inputPorts": {
0: {"accepts": [
"Transit", "UdmDocument", "EmailList", "DocumentList", "FileList", "TaskList",
"Transit", "ContextBranch", "UdmDocument", "EmailList", "DocumentList", "FileList", "TaskList",
"ActionResult", "AiResult", "QueryResult", "FormPayload", "LoopItem",
]},
},

View file

@ -298,6 +298,21 @@ PORT_TYPE_CATALOG: Dict[str, PortSchema] = {
PortField(name="merged", type="Dict",
description="Zusammengeführte Daten"),
]),
"ContextBranch": PortSchema(name="ContextBranch", fields=[
PortField(name="items", type="List[Any]",
description="Schleifen-fertige Elemente aus dem (gefilterten) Kontext",
recommended=True,
picker_label=t("Gefilterte Elemente")),
PortField(name="data", type="Dict", required=False,
description="Gefilterter Presentation-Umschlag oder Eingabe-Spiegel",
picker_label=t("Kontext (data)")),
PortField(name="filterApplied", type="bool", required=False,
description="True wenn ein Kontext-Inhaltsfilter angewendet wurde"),
PortField(name="contentType", type="str", required=False,
description="Angewendeter Inhaltstyp-Filter (z. B. image)"),
PortField(name="match", type="int", required=False,
description="Aktiver Ausgangs-Index (Fall oder Sonst)"),
]),
"ActionDocument": PortSchema(name="ActionDocument", fields=[
PortField(name="documentName", type="str",
description="Dokumentname",

View file

@ -0,0 +1,308 @@
# Copyright (c) 2025 Patrick Motsch
"""Build flow.switch branch payloads: filtered context + loop-ready items."""
from __future__ import annotations
import copy
import re
from typing import Any, Dict, List, Optional
from modules.features.graphicalEditor.portTypes import unwrapTransit
_CONTEXT_FILTER_OPERATORS = frozenset({"contains_content"})
_BLOB_IMAGE_CHUNK_RE = re.compile(r"^\[image(?:\:([^\]]+))?\]$")
def _artifacts_by_part_id_from_presentation(inp: Any) -> Dict[str, str]:
plain = _unwrap_input(inp)
meta = plain.get("_meta") if isinstance(plain, dict) else None
if not isinstance(meta, dict):
return {}
out: Dict[str, str] = {}
for art in meta.get("persistedImageArtifacts") or []:
if not isinstance(art, dict):
continue
sp = str(art.get("sourcePartId") or "").strip()
fid = str(art.get("fileId") or "").strip()
if sp and fid:
out[sp] = fid
return out
def _enrich_image_slot(slot: Dict[str, Any], artifacts_by_part: Dict[str, str]) -> None:
if (slot.get("typeGroup") or "").strip().lower() != "image":
return
existing = str(slot.get("embeddedImageFileId") or "").strip()
if existing and existing in artifacts_by_part.values():
return
candidates: List[str] = []
sid = str(slot.get("id") or "").strip()
if sid:
candidates.append(sid)
data = slot.get("data")
if isinstance(data, str):
m = _BLOB_IMAGE_CHUNK_RE.fullmatch(data.strip())
if m:
tok = (m.group(1) or "").strip()
if tok:
candidates.append(tok)
for cand in candidates:
fid = artifacts_by_part.get(cand)
if fid:
slot["embeddedImageFileId"] = fid
return
def _slot_matches_content_type(slot: Dict[str, Any], content_type: str) -> bool:
target = (content_type or "").strip().lower()
if not target:
return False
tg = (slot.get("typeGroup") or slot.get("contentType") or "").strip().lower()
if target == "media":
return tg in ("image", "media", "video", "audio")
if target == "text":
return tg in ("text", "table", "structure")
return tg == target
def _filter_bucket_slots(bucket: Dict[str, Any], content_type: str) -> Dict[str, Any]:
"""Return a copy of a presentation file bucket with filtered ``data`` slots."""
mode = str(bucket.get("outputMode") or "").strip().lower()
data = bucket.get("data")
if mode == "blob" and isinstance(data, str):
from modules.workflows.methods.methodContext.actions.extractContent import (
filter_blob_bucket_by_content_type,
)
return filter_blob_bucket_by_content_type(bucket, content_type)
out = copy.deepcopy(bucket)
if isinstance(data, list):
out["data"] = [s for s in data if isinstance(s, dict) and _slot_matches_content_type(s, content_type)]
elif isinstance(data, dict) and _slot_matches_content_type(data, content_type):
out["data"] = data
else:
out["data"] = [] if isinstance(data, list) else data
return out
def _filter_presentation_envelope(envelope: Dict[str, Any], content_type: str) -> Dict[str, Any]:
"""Filter all slots in a presentation envelope by content type group."""
from modules.workflows.methods.methodContext.actions.extractContent import (
PRESENTATION_KIND,
PRESENTATION_SCHEMA_VERSION,
)
out = copy.deepcopy(envelope)
files = out.get("files") or {}
if not isinstance(files, dict):
return out
filtered_files: Dict[str, Any] = {}
kept_order: List[str] = []
for fk in out.get("fileOrder") or list(files.keys()):
bucket = files.get(fk)
if not isinstance(bucket, dict):
continue
fb = _filter_bucket_slots(bucket, content_type)
data = fb.get("data")
has_data = (
(isinstance(data, list) and len(data) > 0)
or (isinstance(data, dict))
or (isinstance(data, str) and str(data).strip())
)
if has_data:
filtered_files[str(fk)] = fb
kept_order.append(str(fk))
out["schemaVersion"] = out.get("schemaVersion") or PRESENTATION_SCHEMA_VERSION
out["kind"] = out.get("kind") or PRESENTATION_KIND
out["fileOrder"] = kept_order
out["files"] = filtered_files
return out
def _slots_from_bucket(bucket: Dict[str, Any]) -> List[Any]:
data = bucket.get("data")
mode = str(bucket.get("outputMode") or "").strip().lower()
if mode == "blob" and isinstance(data, str) and data.strip():
from modules.workflows.methods.methodContext.actions.extractContent import parse_blob_data_segments
return parse_blob_data_segments(data)
if isinstance(data, list):
return [s for s in data if isinstance(s, dict)]
if isinstance(data, dict):
return [data]
if isinstance(data, str) and data.strip():
return [{"typeGroup": "text", "data": data}]
items = bucket.get("items")
if isinstance(items, list):
return [i for i in items if isinstance(i, dict)]
return []
def _items_from_presentation_envelope(
envelope: Dict[str, Any],
*,
artifacts_by_part: Optional[Dict[str, str]] = None,
) -> List[Any]:
items: List[Any] = []
files = envelope.get("files") or {}
if not isinstance(files, dict):
return items
for fk in envelope.get("fileOrder") or list(files.keys()):
bucket = files.get(fk)
if isinstance(bucket, dict):
for slot in _slots_from_bucket(bucket):
if artifacts_by_part:
_enrich_image_slot(slot, artifacts_by_part)
sid = str(slot.get("id") or slot.get("label") or len(items))
items.append({"name": f"{fk}:{sid}", "value": slot})
return items
def expand_items_from_input(raw: Any) -> List[Any]:
"""Best-effort loop items from transit/presentation/list/dict input."""
if raw is None:
return []
if isinstance(raw, dict) and isinstance(raw.get("items"), list):
return list(raw["items"])
plain = unwrapTransit(raw) if isinstance(raw, dict) and raw.get("_transit") else raw
if isinstance(plain, dict) and isinstance(plain.get("items"), list):
return list(plain["items"])
from modules.workflows.methods.methodContext.actions.extractContent import (
normalize_presentation_envelopes,
)
envelopes = normalize_presentation_envelopes(plain)
if envelopes:
out: List[Any] = []
for env in envelopes:
out.extend(_items_from_presentation_envelope(env))
return out
if isinstance(plain, list):
return list(plain)
if isinstance(plain, dict):
children = plain.get("children")
if isinstance(children, list) and children:
return list(children)
return [{"name": k, "value": v} for k, v in plain.items()]
return [plain]
def _unwrap_input(inp: Any) -> Any:
if isinstance(inp, dict) and inp.get("_transit"):
return unwrapTransit(inp)
return inp
def build_switch_branch_payload(
inp: Any,
case: Dict[str, Any],
*,
value_kind: str = "unknown",
match_index: int = 0,
) -> Dict[str, Any]:
"""Payload for a matched switch case (ContextBranch inner data)."""
operator = str(case.get("operator") or "eq")
right = case.get("value")
plain_in = _unwrap_input(inp)
if operator in _CONTEXT_FILTER_OPERATORS and value_kind == "context":
content_type = str(right or "")
from modules.workflows.methods.methodContext.actions.extractContent import (
normalize_presentation_envelopes,
)
source = plain_in
if isinstance(source, dict) and "data" in source and not source.get("kind"):
nested = source.get("data")
if isinstance(nested, dict):
source = nested
envelopes = normalize_presentation_envelopes(source)
if not envelopes and isinstance(plain_in, dict):
envelopes = normalize_presentation_envelopes(plain_in)
filtered_envs = [_filter_presentation_envelope(env, content_type) for env in envelopes]
artifacts_by_part = _artifacts_by_part_id_from_presentation(plain_in)
items: List[Any] = []
for env in filtered_envs:
items.extend(_items_from_presentation_envelope(env, artifacts_by_part=artifacts_by_part))
if len(filtered_envs) == 1:
data_out: Any = filtered_envs[0]
elif filtered_envs:
data_out = {"envelopes": filtered_envs}
else:
data_out = {}
return {
"data": data_out,
"items": items,
"filterApplied": True,
"contentType": content_type,
"match": match_index,
}
data_out = plain_in if isinstance(plain_in, dict) else {"value": plain_in}
return {
"data": data_out,
"items": expand_items_from_input(inp),
"filterApplied": False,
"match": match_index,
}
def build_switch_default_payload(inp: Any, *, match_index: int) -> Dict[str, Any]:
"""Sonst branch: unmodified input passthrough."""
plain_in = _unwrap_input(inp)
data_out = plain_in if isinstance(plain_in, dict) else {"value": plain_in}
return {
"data": data_out,
"items": expand_items_from_input(inp),
"filterApplied": False,
"match": match_index,
}
def build_switch_combined_output(
inp: Any,
cases: List[Any],
*,
matched_indices: List[int],
value_kind: str = "unknown",
) -> Dict[str, Any]:
"""Build per-port branch payloads; primary fields mirror the first active match."""
branches: Dict[str, Dict[str, Any]] = {}
default_idx = len(cases)
for idx in matched_indices:
if idx == default_idx:
branches[str(idx)] = build_switch_default_payload(inp, match_index=default_idx)
elif 0 <= idx < len(cases):
c = cases[idx] if isinstance(cases[idx], dict) else {"operator": "eq", "value": cases[idx]}
branches[str(idx)] = build_switch_branch_payload(
inp, c, value_kind=value_kind, match_index=idx,
)
primary_idx = matched_indices[0] if matched_indices else default_idx
primary = branches.get(str(primary_idx)) or build_switch_default_payload(inp, match_index=default_idx)
return {**primary, "branches": branches}
def switch_branch_payload(transit: Any, source_output: int) -> Optional[Dict[str, Any]]:
"""Return the ContextBranch inner dict for a specific switch output port."""
if not isinstance(transit, dict):
return None
data = transit.get("data") if transit.get("_transit") else transit
if not isinstance(data, dict):
return None
branches = data.get("branches")
if isinstance(branches, dict):
branch = branches.get(str(source_output))
if isinstance(branch, dict):
return branch
if transit.get("_transit"):
return data
return data
def unwrap_transit_for_port(output: Any, source_output: Optional[int] = None) -> Any:
"""Unwrap transit; when ``source_output`` is set, pick that switch branch payload."""
if source_output is not None:
branch = switch_branch_payload(output, source_output)
if branch is not None:
return branch
return unwrapTransit(output)

View file

@ -4,7 +4,7 @@ from __future__ import annotations
from typing import Any, Dict, List, Set
from modules.features.graphicalEditor.conditionOperators import resolve_value_kind
from modules.features.graphicalEditor.conditionOperators import catalog_type_to_value_kind, resolve_value_kind
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
from modules.features.graphicalEditor.portTypes import PORT_TYPE_CATALOG, PortSchema, parse_graph_defined_output_schema
from modules.workflows.automation2.graphUtils import buildConnectionMap, getLoopBodyNodeIds, getLoopDoneNodeIds
@ -169,12 +169,16 @@ def compute_upstream_paths(graph: Dict[str, Any], target_node_id: str) -> List[D
)
for entry in paths:
ref = {
"nodeId": entry.get("producerNodeId"),
"path": entry.get("path") or [],
}
graph_with_target = {**graph, "targetNodeId": target_node_id}
entry["valueKind"] = resolve_value_kind(graph_with_target, ref)
ct = str(entry.get("type") or "Any")
vk = catalog_type_to_value_kind(ct)
if vk == "unknown":
ref = {
"nodeId": entry.get("producerNodeId"),
"path": entry.get("path") or [],
}
graph_with_target = {**graph, "targetNodeId": target_node_id}
vk = resolve_value_kind(graph_with_target, ref, _skip_upstream=True)
entry["valueKind"] = vk
return paths

View file

@ -163,10 +163,15 @@ def _is_node_on_active_path(
meta = out.get("_meta", {}) if out.get("_transit") else out
branch = meta.get("branch")
match = meta.get("match")
matches = meta.get("matches")
active_output = None
if branch is not None:
active_output = branch
elif isinstance(matches, list) and matches:
if source_output not in matches:
return False
continue
elif match is not None:
if match < 0:
return False

View file

@ -475,7 +475,7 @@ def _resolveUpstreamPayload(nodeId: str, context: Dict[str, Any]) -> Any:
the first ``connectionMap`` entry so ``injectUpstreamPayload`` (e.g.
``context.mergeContext`` after ``flow.loop``) still receives data.
"""
from modules.features.graphicalEditor.portTypes import unwrapTransit
from modules.features.graphicalEditor.switchOutput import unwrap_transit_for_port
nodeOutputs = context.get("nodeOutputs") or {}
connectionMap = context.get("connectionMap") or {}
@ -496,25 +496,25 @@ def _resolveUpstreamPayload(nodeId: str, context: Dict[str, Any]) -> Any:
if not entry:
return None
src_node_id, _ = entry
src_node_id, src_out = entry
upstream = nodeOutputs.get(src_node_id)
return unwrapTransit(upstream) if isinstance(upstream, dict) else upstream
return unwrap_transit_for_port(upstream, src_out)
def _resolveBranchInputs(nodeId: str, context: Dict[str, Any]) -> Dict[int, Any]:
"""Return ``Dict[port_index → unwrapped upstream output]`` for every wired input port."""
from modules.features.graphicalEditor.portTypes import unwrapTransit
from modules.features.graphicalEditor.switchOutput import unwrap_transit_for_port
src_map = (context.get("inputSources") or {}).get(nodeId) or {}
nodeOutputs = context.get("nodeOutputs") or {}
out: Dict[int, Any] = {}
for port_ix, entry in src_map.items():
if not entry:
continue
src_node_id, _ = entry
src_node_id, src_out = entry
upstream = nodeOutputs.get(src_node_id)
if upstream is None:
continue
out[int(port_ix)] = unwrapTransit(upstream) if isinstance(upstream, dict) else upstream
out[int(port_ix)] = unwrap_transit_for_port(upstream, src_out)
return out
@ -554,7 +554,12 @@ class ActionNodeExecutor:
# 1. Resolve parameters (DataRef, SystemVar, Static)
params = dict(node.get("parameters") or {})
logger.debug("ActionNodeExecutor node %s raw params keys=%s", nodeId, list(params.keys()))
resolvedParams = resolveParameterReferences(params, context.get("nodeOutputs", {}))
resolvedParams = resolveParameterReferences(
params,
context.get("nodeOutputs", {}),
consumer_node_id=nodeId,
input_sources=context.get("inputSources"),
)
logger.debug("ActionNodeExecutor node %s resolved params keys=%s documentList_present=%s documentList_type=%s", nodeId, list(resolvedParams.keys()), "documentList" in resolvedParams, type(resolvedParams.get("documentList")).__name__)
# 2. Apply defaults from parameter definitions

View file

@ -132,15 +132,24 @@ class FlowExecutor:
value_kind = "unknown"
ref_for_kind = left_ref if isinstance(left_ref, dict) else cond.get("ref")
if isinstance(ref_for_kind, dict) and ref_for_kind.get("nodeId") and node:
graph_stub = {
"nodes": [{"id": node.get("id"), "type": node.get("type")}],
"targetNodeId": node.get("id"),
}
graph_stub = self._graph_stub_for_ref(node, ref_for_kind, nodeOutputs)
value_kind = resolve_value_kind(graph_stub, ref_for_kind)
return apply_condition_operator(left, str(operator), right, value_kind)
def _compare_dates(self, left: Any, right: Any, op) -> bool:
def _graph_stub_for_ref(self, node: Dict, ref: Dict, nodeOutputs: Dict) -> Dict[str, Any]:
"""Minimal graph for ``resolve_value_kind`` (includes value producer when known)."""
nodes: List[Dict[str, Any]] = [{"id": node.get("id"), "type": node.get("type")}]
producer_id = ref.get("nodeId")
if producer_id:
ctx = nodeOutputs.get("_context") if isinstance(nodeOutputs.get("_context"), dict) else {}
graph_nodes = ctx.get("graphNodesById") if isinstance(ctx.get("graphNodesById"), dict) else {}
pnode = graph_nodes.get(producer_id) if isinstance(graph_nodes, dict) else None
if isinstance(pnode, dict):
nodes.append({"id": producer_id, "type": pnode.get("type", "")})
else:
nodes.append({"id": producer_id, "type": ""})
return {"nodes": nodes, "targetNodeId": node.get("id")}
"""Compare left/right as dates; op(a,b) is the comparison."""
def parse(v):
@ -197,23 +206,42 @@ class FlowExecutor:
return bool(resolved)
async def _switch(self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict) -> Any:
valueExpr = (node.get("parameters") or {}).get("value", "")
params = node.get("parameters") or {}
valueExpr = params.get("value", "")
from modules.workflows.automation2.graphUtils import resolveParameterReferences
value = resolveParameterReferences(valueExpr, nodeOutputs)
cases = (node.get("parameters") or {}).get("cases", [])
inp = self._getInputData(nodeId, {nodeId: inputSources}, nodeOutputs)
for i, c in enumerate(cases):
if self._evalSwitchCase(value, c):
return wrapTransit(
unwrapTransit(inp) if inp else inp,
{"match": i, "value": value},
)
return wrapTransit(
unwrapTransit(inp) if inp else inp,
{"match": -1, "value": value},
from modules.features.graphicalEditor.switchOutput import (
build_switch_combined_output,
build_switch_default_payload,
)
def _evalSwitchCase(self, left: Any, case: Any) -> bool:
value = resolveParameterReferences(valueExpr, nodeOutputs)
cases = params.get("cases", []) or []
value_kind = "unknown"
if isinstance(valueExpr, dict) and valueExpr.get("type") == "ref":
graph_stub = self._graph_stub_for_ref(node, valueExpr, nodeOutputs)
value_kind = resolve_value_kind(graph_stub, valueExpr)
inp = self._getInputData(nodeId, {nodeId: inputSources}, nodeOutputs)
matched: List[int] = [
i for i, c in enumerate(cases)
if self._evalSwitchCase(value, c, value_kind=value_kind)
]
default_idx = len(cases) if isinstance(cases, list) else 0
if not matched:
matched = [default_idx]
combined = build_switch_combined_output(
inp, cases, matched_indices=matched, value_kind=value_kind,
)
return wrapTransit(
combined,
{
"match": matched[0],
"matches": matched,
"value": value,
"filterApplied": bool(combined.get("filterApplied")),
},
)
def _evalSwitchCase(self, left: Any, case: Any, *, value_kind: Optional[str] = None) -> bool:
"""
Evaluate a switch case. Case can be:
- dict: {operator, value} - use operator to compare left vs value
@ -225,14 +253,19 @@ class FlowExecutor:
else:
operator = "eq"
right = case
return apply_condition_operator(left, str(operator), right)
return apply_condition_operator(left, str(operator), right, value_kind)
async def _loop(self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict) -> Any:
params = node.get("parameters") or {}
itemsPath = params.get("items", "[]")
from modules.workflows.automation2.graphUtils import resolveParameterReferences
raw = resolveParameterReferences(itemsPath, nodeOutputs)
raw = resolveParameterReferences(
itemsPath,
nodeOutputs,
consumer_node_id=nodeId,
input_sources=inputSources,
)
items = self._normalize_loop_items(raw)
mode = (params.get("iterationMode") or "all").strip().lower()
stride = params.get("iterationStride", 2)
@ -245,6 +278,8 @@ class FlowExecutor:
def _normalize_loop_items(self, raw: Any) -> List[Any]:
"""Coerce resolved `items` into a list (lists, dict children, or scalars)."""
if isinstance(raw, dict) and isinstance(raw.get("items"), list):
return self._expand_presentation_lines_loop_items(raw["items"])
if isinstance(raw, list):
return self._expand_presentation_lines_loop_items(raw)
if isinstance(raw, dict):

View file

@ -253,6 +253,8 @@ def _checkPortCompatibility(
continue
srcOutputPorts = srcDef.get("outputPorts", {})
srcPort = srcOutputPorts.get(srcOut, {}) or {}
if srcNode.get("type") == "flow.switch" and not srcPort.get("schema"):
srcPort = srcOutputPorts.get(0, {}) or srcPort
tgtPort = tgtInputPorts.get(tgtIn, {}) or {}
if not isinstance(srcPort, dict):
@ -264,6 +266,9 @@ def _checkPortCompatibility(
continue
if src_schema in accepts:
continue
# ContextBranch is a typed Transit envelope (switch filtered branches).
if src_schema == "ContextBranch" and ("Transit" in accepts or "ContextBranch" in accepts):
continue
# Port that only declares Transit behaves as an untyped sink (legacy graphs).
if len(accepts) == 1 and accepts[0] == "Transit":
continue
@ -409,12 +414,21 @@ def _unwrapTypedRef(value: Any) -> Any:
return value.get(primary, value)
def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any:
def resolveParameterReferences(
value: Any,
nodeOutputs: Dict[str, Any],
*,
consumer_node_id: Optional[str] = None,
input_sources: Optional[Dict[str, Dict[int, tuple]]] = None,
) -> Any:
"""
Resolve parameter references:
- {{nodeId.output}} or {{nodeId.output.path}} in strings (legacy)
- { "type": "ref", "nodeId": "...", "path": ["field", "nested"] } -> resolved value
- { "type": "value", "value": ... } -> value (then recursively resolve)
When ``consumer_node_id`` and ``input_sources`` are set, refs to the wired
upstream switch use that connection's output port (per-branch payload).
"""
import json
import re
@ -430,8 +444,13 @@ def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any:
path = value.get("path")
if node_id is not None and isinstance(path, (list, tuple)):
data = nodeOutputs.get(node_id)
# Unwrap transit envelopes to access the real data
if isinstance(data, dict) and data.get("_transit"):
wired = None
if consumer_node_id and input_sources:
wired = (input_sources.get(consumer_node_id) or {}).get(0)
if wired and wired[0] == node_id:
from modules.features.graphicalEditor.switchOutput import unwrap_transit_for_port
data = unwrap_transit_for_port(data, wired[1])
elif isinstance(data, dict) and data.get("_transit"):
data = data.get("data", data)
plist = list(path)
resolved = _get_by_path(data, plist)
@ -450,16 +469,34 @@ def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any:
# Form nodes store fields under {"payload": {fieldName: …}}.
# DataPicker emits bare field paths like ["url"]; try under payload.
resolved = _get_by_path(data["payload"], plist)
return resolveParameterReferences(resolved, nodeOutputs)
return resolveParameterReferences(
resolved,
nodeOutputs,
consumer_node_id=consumer_node_id,
input_sources=input_sources,
)
return value
if value.get("type") == "value":
inner = value.get("value")
return resolveParameterReferences(inner, nodeOutputs)
return resolveParameterReferences(
inner,
nodeOutputs,
consumer_node_id=consumer_node_id,
input_sources=input_sources,
)
if value.get("type") == "system":
variable = value.get("variable", "")
from modules.features.graphicalEditor.portTypes import resolveSystemVariable
return resolveSystemVariable(variable, nodeOutputs.get("_context", {}))
return {k: resolveParameterReferences(v, nodeOutputs) for k, v in value.items()}
return {
k: resolveParameterReferences(
v,
nodeOutputs,
consumer_node_id=consumer_node_id,
input_sources=input_sources,
)
for k, v in value.items()
}
if isinstance(value, str):
def repl(m):
@ -498,11 +535,27 @@ def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any:
# contextBuilder: list where every item is a `{"type":"ref",...}` envelope.
# Resolve each part; a single ref preserves the resolved type (str, list, dict).
if value and all(isinstance(v, dict) and v.get("type") == "ref" for v in value):
resolved_parts = [resolveParameterReferences(v, nodeOutputs) for v in value]
resolved_parts = [
resolveParameterReferences(
v,
nodeOutputs,
consumer_node_id=consumer_node_id,
input_sources=input_sources,
)
for v in value
]
if len(resolved_parts) == 1:
return resolved_parts[0]
return resolved_parts
return [resolveParameterReferences(v, nodeOutputs) for v in value]
return [
resolveParameterReferences(
v,
nodeOutputs,
consumer_node_id=consumer_node_id,
input_sources=input_sources,
)
for v in value
]
return value

View file

@ -934,6 +934,52 @@ def _presentation_image_marker_in_data(part: Dict[str, Any]) -> Dict[str, Any]:
return marker
_BLOB_IMAGE_CHUNK_RE = re.compile(r"^\[image(?:\:([^\]]+))?\]$")
def parse_blob_data_segments(data: str) -> List[Dict[str, Any]]:
"""Split presentation ``blob`` ``data`` into virtual slots (text chunks + image markers)."""
segments: List[Dict[str, Any]] = []
if not isinstance(data, str) or not data.strip():
return segments
for idx, chunk in enumerate(data.split("\n\n")):
piece = chunk.strip()
if not piece:
continue
m = _BLOB_IMAGE_CHUNK_RE.fullmatch(piece)
if m:
token = (m.group(1) or "").strip()
seg: Dict[str, Any] = {"typeGroup": "image", "mimeType": "image/*", "data": piece}
if token:
seg["id"] = token
else:
seg["id"] = f"blob_image_{idx}"
segments.append(seg)
else:
segments.append({"typeGroup": "text", "mimeType": "text/plain", "data": piece, "id": f"blob_text_{idx}"})
return segments
def filter_blob_bucket_by_content_type(bucket: Dict[str, Any], content_type: str) -> Dict[str, Any]:
"""Keep only blob segments matching ``content_type`` (re-join as ``\\n\\n`` string)."""
out = copy.deepcopy(bucket)
raw = out.get("data")
if not isinstance(raw, str):
return out
target = (content_type or "").strip().lower()
kept: List[str] = []
for seg in parse_blob_data_segments(raw):
tg = (seg.get("typeGroup") or "").strip().lower()
if target == "media" and tg in ("image", "media", "video", "audio"):
kept.append(str(seg.get("data") or ""))
elif target == "text" and tg in ("text", "table", "structure"):
kept.append(str(seg.get("data") or ""))
elif tg == target:
kept.append(str(seg.get("data") or ""))
out["data"] = "\n\n".join(s for s in kept if s.strip())
return out
def _build_file_presentation(
source_file_name: str,
parts: List[Dict[str, Any]],
@ -959,8 +1005,8 @@ def _build_file_presentation(
tg = (p.get("typeGroup") or "").strip()
if tg == "image":
m = _presentation_image_marker_in_data(p)
pid = str(m.get("partId") or "").strip()
chunks_blob.append(f"[image:{pid}]" if pid else "[image]")
token = str(m.get("embeddedImageFileId") or m.get("partId") or "").strip()
chunks_blob.append(f"[image:{token}]" if token else "[image]")
continue
if _part_carries_plain_text(p):
raw = p.get("data")
@ -1433,6 +1479,20 @@ def normalize_presentation_envelopes(raw: Any) -> List[Dict[str, Any]]:
file_key=str(raw.get("name") or "file_1"),
)
]
if isinstance(raw.get("name"), str) and isinstance(raw.get("value"), dict):
slot = raw["value"]
if _is_presentation_line_slot(slot):
bucket = {
"outputMode": slot.get("outputMode") or "lines",
"sourceFileName": "",
"data": [slot],
}
return [
presentation_envelope_from_file_bucket(
bucket,
file_key=str(raw.get("name") or "file_1"),
)
]
if _is_presentation_file_bucket(raw):
return [presentation_envelope_from_file_bucket(raw)]
if _is_presentation_line_slot(raw):
@ -1450,6 +1510,27 @@ def normalize_presentation_envelopes(raw: Any) -> List[Dict[str, Any]]:
return []
def _artifacts_by_part_id_from_meta(meta: Any) -> Dict[str, str]:
out: Dict[str, str] = {}
if not isinstance(meta, dict):
return out
for art in meta.get("persistedImageArtifacts") or []:
if not isinstance(art, dict):
continue
sp = str(art.get("sourcePartId") or "").strip()
fid = str(art.get("fileId") or "").strip()
if sp and fid:
out[sp] = fid
return out
def _collect_artifacts_by_part_id(envelopes: List[Dict[str, Any]]) -> Dict[str, str]:
merged: Dict[str, str] = {}
for envelope in envelopes:
merged.update(_artifacts_by_part_id_from_meta(envelope.get("_meta")))
return merged
def presentation_envelopes_to_document_json(
raw: Any,
*,
@ -1466,6 +1547,8 @@ def presentation_envelopes_to_document_json(
"context must be presentation data from Inhalt extrahieren (kind=context.extractContent.presentation.v1)"
)
artifacts_by_part = _collect_artifacts_by_part_id(envelopes)
sections: List[Dict[str, Any]] = []
order = 0
@ -1496,8 +1579,35 @@ def presentation_envelopes_to_document_json(
"elements": [{"content": {"inlineRuns": _parseInlineRuns(t)}}],
})
def _append_image_slot(slot: Dict[str, Any]) -> None:
def _resolve_image_file_id(slot: Dict[str, Any]) -> Optional[str]:
fid = slot.get("embeddedImageFileId")
if fid:
return str(fid).strip() or None
candidates: List[str] = []
sid = str(slot.get("id") or "").strip()
if sid:
candidates.append(sid)
raw_d = slot.get("data")
if isinstance(raw_d, str):
m = _BLOB_IMAGE_CHUNK_RE.fullmatch(raw_d.strip())
if m:
tok = (m.group(1) or "").strip()
if tok:
candidates.append(tok)
for cand in candidates:
if cand in artifacts_by_part:
return artifacts_by_part[cand]
# Marker may already carry the persisted storage file id.
try:
blob = _load_image_bytes_by_file_id(services, cand)
if blob:
return cand
except Exception:
pass
return None
def _append_image_slot(slot: Dict[str, Any]) -> None:
fid = _resolve_image_file_id(slot)
if not fid:
raise ValueError(
"image slot is missing embeddedImageFileId — "
@ -1589,6 +1699,11 @@ def presentation_envelopes_to_document_json(
if src:
_append_heading(src)
raw_data = bucket.get("data")
mode = str(bucket.get("outputMode") or "").strip().lower()
if isinstance(raw_data, str) and mode == "blob":
for seg in parse_blob_data_segments(raw_data):
_append_slot(seg)
return
if isinstance(raw_data, str):
_append_paragraph(raw_data)
return

View file

@ -50,6 +50,25 @@ def test_parse_graph_defined_schema_nested_group():
assert "addr.zip" in names
def test_compute_upstream_paths_switch_context_branch_items():
graph = {
"nodes": [
{"id": "ext1", "type": "context.extractContent", "parameters": {}},
{"id": "sw1", "type": "flow.switch", "parameters": {"cases": [{"operator": "contains_content", "value": "image"}]}},
{"id": "ai1", "type": "ai.prompt", "parameters": {"aiPrompt": "summarize"}},
],
"connections": [
{"source": "ext1", "target": "sw1", "sourceOutput": 0, "targetInput": 0},
{"source": "sw1", "target": "ai1", "sourceOutput": 0, "targetInput": 0},
],
}
paths = compute_upstream_paths(graph, "ai1")
sw_paths = [p for p in paths if p.get("producerNodeId") == "sw1"]
items_paths = [p for p in sw_paths if p.get("path") == ["items"]]
assert items_paths, sw_paths
assert items_paths[0].get("type") == "List[Any]"
def test_validate_graph_port_mismatch_errors():
node_type_ids = {n["id"] for n in STATIC_NODE_TYPES}
graph = {

View file

@ -61,6 +61,34 @@ def test_context_contains_content(executor):
assert executor._evalStructuredCondition(cond, {"n1": presentation}, item_param={"type": "ref", "nodeId": "n1", "path": []}, node={"id": "if1", "type": "flow.ifElse"}) is True
def test_context_contains_content_blob_mode(executor):
presentation = {
"kind": PRESENTATION_KIND,
"outputMode": "blob",
"fileOrder": ["f1"],
"files": {
"f1": {
"outputMode": "blob",
"data": "Invoice text\n\n[image:abc123]",
}
},
}
img_cond = {"type": "condition", "operator": "contains_content", "value": "image"}
txt_cond = {"type": "condition", "operator": "contains_content", "value": "text"}
item = {"type": "ref", "nodeId": "n1", "path": []}
node = {"id": "if1", "type": "flow.ifElse"}
assert executor._evalStructuredCondition(img_cond, {"n1": presentation}, item_param=item, node=node) is True
assert executor._evalStructuredCondition(txt_cond, {"n1": presentation}, item_param=item, node=node) is True
def test_switch_uses_shared_operators(executor):
assert executor._evalSwitchCase("abc", {"operator": "starts_with", "value": "ab"}) is True
assert executor._evalSwitchCase([1, 2], {"operator": "length_eq", "value": 2}) is True
def test_switch_resolves_value_kind_for_string_ops(executor):
assert executor._evalSwitchCase(
"hello",
{"operator": "starts_with", "value": "he"},
value_kind="string",
) is True

View file

@ -0,0 +1,359 @@
# Copyright (c) 2025 Patrick Motsch
"""flow.switch ContextBranch: filtered presentation + loop-ready items."""
import pytest
from modules.features.graphicalEditor.portTypes import unwrapTransit, wrapTransit
from modules.features.graphicalEditor.switchOutput import (
build_switch_branch_payload,
build_switch_combined_output,
build_switch_default_payload,
unwrap_transit_for_port,
)
from modules.workflows.automation2.executionEngine import _is_node_on_active_path
from modules.workflows.automation2.executors.flowExecutor import FlowExecutor
from modules.workflows.automation2.graphUtils import resolveParameterReferences
from modules.workflows.methods.methodContext.actions.extractContent import PRESENTATION_KIND
def _presentation_with_text_and_image():
return {
"kind": PRESENTATION_KIND,
"schemaVersion": "1",
"outputMode": "parts",
"fileOrder": ["doc"],
"files": {
"doc": {
"outputMode": "parts",
"data": [
{"typeGroup": "text", "id": "t1", "data": "Hello"},
{"typeGroup": "image", "id": "i1", "mimeType": "image/png", "data": "YQ=="},
],
}
},
}
def _presentation_blob_with_text_and_image():
blob_data = "Hello world\n\n[image:img1]\n\nMore text"
return {
"kind": PRESENTATION_KIND,
"schemaVersion": "1",
"outputMode": "blob",
"fileOrder": ["doc"],
"files": {
"doc": {
"outputMode": "blob",
"sourceFileName": "test.pdf",
"data": blob_data,
}
},
}
def test_build_switch_branch_payload_filters_blob_image():
pres = _presentation_blob_with_text_and_image()
payload = build_switch_branch_payload(
pres,
{"operator": "contains_content", "value": "image"},
value_kind="context",
match_index=0,
)
assert payload["filterApplied"] is True
assert len(payload["items"]) == 1
assert payload["items"][0]["value"]["typeGroup"] == "image"
assert "[image:img1]" in payload["data"]["files"]["doc"]["data"]
def test_build_switch_branch_payload_filters_blob_text():
pres = _presentation_blob_with_text_and_image()
payload = build_switch_branch_payload(
pres,
{"operator": "contains_content", "value": "text"},
value_kind="context",
match_index=1,
)
assert payload["filterApplied"] is True
assert len(payload["items"]) == 2
assert all(i["value"]["typeGroup"] == "text" for i in payload["items"])
filtered = payload["data"]["files"]["doc"]["data"]
assert "Hello world" in filtered
assert "[image:" not in filtered
@pytest.mark.asyncio
async def test_switch_blob_multi_match():
executor = FlowExecutor()
pres = _presentation_blob_with_text_and_image()
sw_id = "sw1"
node_outputs = {
"ext1": pres,
"_context": {
"graphNodesById": {
"ext1": {"id": "ext1", "type": "context.extractContent"},
sw_id: {"id": sw_id, "type": "flow.switch"},
}
},
}
executor._getInputData = lambda *_a, **_k: pres # type: ignore[method-assign]
node = {
"id": sw_id,
"type": "flow.switch",
"parameters": {
"value": {"type": "ref", "nodeId": "ext1", "path": []},
"cases": [
{"operator": "contains_content", "value": "image"},
{"operator": "contains_content", "value": "text"},
],
},
}
out = await executor._switch(node, node_outputs, sw_id, {})
assert out["_meta"]["matches"] == [0, 1]
assert len(unwrap_transit_for_port(out, 0)["items"]) == 1
assert len(unwrap_transit_for_port(out, 1)["items"]) == 2
def test_switch_blob_image_items_get_embedded_file_id():
part_id = "dbd27119-cd21-4a62-b5e2-b06d3b81470b"
file_id = "storage-file-uuid-1"
pres = {
"kind": PRESENTATION_KIND,
"outputMode": "blob",
"fileOrder": ["doc"],
"files": {
"doc": {
"outputMode": "blob",
"data": f"Hello\n\n[image:{part_id}]",
}
},
"_meta": {
"persistedImageArtifacts": [
{"sourcePartId": part_id, "fileId": file_id, "mimeType": "image/png"},
]
},
}
payload = build_switch_branch_payload(
pres,
{"operator": "contains_content", "value": "image"},
value_kind="context",
match_index=0,
)
assert len(payload["items"]) == 1
slot = payload["items"][0]["value"]
assert slot.get("embeddedImageFileId") == file_id
def test_build_switch_branch_payload_filters_images():
pres = _presentation_with_text_and_image()
case = {"operator": "contains_content", "value": "image"}
payload = build_switch_branch_payload(
pres,
case,
value_kind="context",
match_index=0,
)
assert payload["filterApplied"] is True
assert payload["contentType"] == "image"
assert len(payload["items"]) == 1
assert payload["items"][0]["value"]["typeGroup"] == "image"
data = payload["data"]
assert data["kind"] == PRESENTATION_KIND
slots = data["files"]["doc"]["data"]
assert len(slots) == 1
assert slots[0]["typeGroup"] == "image"
def test_build_switch_default_payload_passthrough():
pres = _presentation_with_text_and_image()
payload = build_switch_default_payload(pres, match_index=2)
assert payload["filterApplied"] is False
assert payload["match"] == 2
assert payload["data"]["fileOrder"] == pres["fileOrder"]
assert len(payload["items"]) == 2
@pytest.mark.asyncio
async def test_switch_executor_match_and_default_branch():
executor = FlowExecutor()
pres = _presentation_with_text_and_image()
ext_id = "ext1"
sw_id = "sw1"
node_outputs = {
ext_id: pres,
"_context": {
"graphNodesById": {
ext_id: {"id": ext_id, "type": "context.extractContent"},
sw_id: {"id": sw_id, "type": "flow.switch"},
}
},
}
def _inp(_nid, _sources, _outputs, _output_index=0):
return pres
executor._getInputData = _inp # type: ignore[method-assign]
match_node = {
"id": sw_id,
"type": "flow.switch",
"parameters": {
"value": {"type": "ref", "nodeId": ext_id, "path": []},
"cases": [{"operator": "contains_content", "value": "image"}],
},
}
match_out = await executor._switch(match_node, node_outputs, sw_id, {})
match_payload = unwrapTransit(match_out)
assert match_out["_meta"]["match"] == 0
assert match_out["_meta"]["matches"] == [0]
assert match_payload["filterApplied"] is True
assert len(match_payload["items"]) == 1
assert match_payload["branches"]["0"]["contentType"] == "image"
default_node = {
**match_node,
"parameters": {
**match_node["parameters"],
"cases": [{"operator": "contains_content", "value": "video"}],
},
}
default_out = await executor._switch(default_node, node_outputs, sw_id, {})
assert default_out["_meta"]["match"] == 1
assert default_out["_meta"]["matches"] == [1]
default_payload = unwrapTransit(default_out)
assert default_payload["filterApplied"] is False
assert default_payload["data"]["fileOrder"] == pres["fileOrder"]
@pytest.mark.asyncio
async def test_switch_multi_match_text_and_image_branches():
executor = FlowExecutor()
pres = _presentation_with_text_and_image()
sw_id = "sw1"
node_outputs = {
"ext1": pres,
"_context": {
"graphNodesById": {
"ext1": {"id": "ext1", "type": "context.extractContent"},
sw_id: {"id": sw_id, "type": "flow.switch"},
}
},
}
executor._getInputData = lambda *_a, **_k: pres # type: ignore[method-assign]
node = {
"id": sw_id,
"type": "flow.switch",
"parameters": {
"value": {"type": "ref", "nodeId": "ext1", "path": []},
"cases": [
{"operator": "contains_content", "value": "image"},
{"operator": "contains_content", "value": "text"},
],
},
}
out = await executor._switch(node, node_outputs, sw_id, {})
assert out["_meta"]["matches"] == [0, 1]
img = unwrap_transit_for_port(out, 0)
txt = unwrap_transit_for_port(out, 1)
assert img["contentType"] == "image"
assert txt["contentType"] == "text"
assert len(img["items"]) == 1
assert len(txt["items"]) == 1
assert img["items"][0]["value"]["typeGroup"] == "image"
assert txt["items"][0]["value"]["typeGroup"] == "text"
def test_active_path_allows_all_matching_switch_ports():
combined = build_switch_combined_output(
_presentation_with_text_and_image(),
[
{"operator": "contains_content", "value": "image"},
{"operator": "contains_content", "value": "text"},
],
matched_indices=[0, 1],
value_kind="context",
)
sw_out = wrapTransit(combined, {"match": 0, "matches": [0, 1]})
node_outputs = {"sw1": sw_out}
conn_map = {
"loop_img": [("sw1", 0, 0)],
"file_txt": [("sw1", 1, 0)],
}
assert _is_node_on_active_path("loop_img", conn_map, node_outputs)
assert _is_node_on_active_path("file_txt", conn_map, node_outputs)
assert not _is_node_on_active_path("other", {"other": [("sw1", 2, 0)]}, node_outputs)
@pytest.mark.asyncio
async def test_loop_uses_switch_items_ref():
executor = FlowExecutor()
pres = _presentation_with_text_and_image()
branch = build_switch_branch_payload(
pres,
{"operator": "contains_content", "value": "image"},
value_kind="context",
match_index=0,
)
sw_id = "sw1"
node_outputs = {sw_id: wrapTransit(branch, {"match": 0})}
loop_node = {
"id": "loop1",
"type": "flow.loop",
"parameters": {
"items": {"type": "ref", "nodeId": sw_id, "path": ["items"]},
},
}
out = await executor._loop(loop_node, node_outputs, "loop1", {})
assert out["count"] == 1
assert out["items"][0]["value"]["typeGroup"] == "image"
def test_resolve_context_builder_ref_uses_switch_output_port():
"""file.create context ref to switch.items must use the wired source output port."""
pres = _presentation_with_text_and_image()
combined = build_switch_combined_output(
pres,
[
{"operator": "contains_content", "value": "image"},
{"operator": "contains_content", "value": "text"},
],
matched_indices=[0, 1],
value_kind="context",
)
sw_id = "sw1"
consumer_id = "fc1"
node_outputs = {sw_id: wrapTransit(combined, {"match": 0, "matches": [0, 1]})}
input_sources = {consumer_id: {0: (sw_id, 1)}}
resolved = resolveParameterReferences(
{
"context": [
{
"type": "ref",
"nodeId": sw_id,
"path": ["items"],
}
],
},
node_outputs,
consumer_node_id=consumer_id,
input_sources=input_sources,
)
items = resolved["context"]
assert isinstance(items, list)
assert len(items) == 1
assert items[0]["value"]["typeGroup"] == "text"
branch = build_switch_branch_payload(
_presentation_with_text_and_image(),
{"operator": "contains_content", "value": "image"},
value_kind="context",
match_index=0,
)
node_outputs = {"sw1": wrapTransit(branch, {"match": 0})}
resolved = resolveParameterReferences(
{"type": "ref", "nodeId": "sw1", "path": ["items"]},
node_outputs,
)
assert isinstance(resolved, list)
assert len(resolved) == 1
assert resolved[0]["value"]["typeGroup"] == "image"

View file

@ -37,6 +37,34 @@ class TestValidateGraphStartNode:
assert not any("no start node" in e.lower() for e in errs)
def test_switch_second_output_to_ai_prompt_ok(self):
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
node_type_ids = {n["id"] for n in STATIC_NODE_TYPES}
graph = {
"nodes": [
{"id": "t", "type": "trigger.manual", "parameters": {}},
{
"id": "sw",
"type": "flow.switch",
"parameters": {
"cases": [
{"operator": "contains_content", "value": "image"},
{"operator": "contains_content", "value": "text"},
],
},
},
{"id": "ai", "type": "ai.prompt", "parameters": {"aiPrompt": "hi"}},
],
"connections": [
{"source": "sw", "target": "ai", "sourceOutput": 1, "targetInput": 0},
],
}
errs = validateGraph(graph, node_type_ids)
port_errs = [e for e in errs if "Port mismatch" in e]
assert port_errs == [], port_errs
class TestResolveParameterReferences:
"""Test structured ref/value resolution."""