diff --git a/modules/features/graphicalEditor/conditionOperators.py b/modules/features/graphicalEditor/conditionOperators.py index 3feb4775..b375e407 100644 --- a/modules/features/graphicalEditor/conditionOperators.py +++ b/modules/features/graphicalEditor/conditionOperators.py @@ -257,7 +257,7 @@ def _path_suggests_file(path: List[Any], producer_type: str) -> bool: return False -def resolve_value_kind(graph: Dict[str, Any], ref: Dict[str, Any]) -> str: +def resolve_value_kind(graph: Dict[str, Any], ref: Dict[str, Any], *, _skip_upstream: bool = False) -> str: """Resolve condition valueKind for a DataRef against the workflow graph.""" if not isinstance(ref, dict): return "unknown" @@ -281,31 +281,32 @@ def resolve_value_kind(graph: Dict[str, Any], ref: Dict[str, Any]) -> str: return "string" return "file" - from modules.features.graphicalEditor.upstreamPathsService import compute_upstream_paths + if not _skip_upstream: + from modules.features.graphicalEditor.upstreamPathsService import compute_upstream_paths - target_id = graph.get("targetNodeId") or producer_id - matched_type: Optional[str] = None - for entry in compute_upstream_paths(graph, target_id): - if entry.get("producerNodeId") != producer_id: - continue - entry_path = entry.get("path") or [] - if _paths_equal(list(entry_path), list(path)): - matched_type = str(entry.get("type") or "Any") - break - - if matched_type is None and path: - parent_path = list(path[:-1]) + target_id = graph.get("targetNodeId") or producer_id + matched_type: Optional[str] = None for entry in compute_upstream_paths(graph, target_id): if entry.get("producerNodeId") != producer_id: continue - if _paths_equal(list(entry.get("path") or []), parent_path): + entry_path = entry.get("path") or [] + if _paths_equal(list(entry_path), list(path)): matched_type = str(entry.get("type") or "Any") break - if matched_type: - vk = catalog_type_to_value_kind(matched_type) - if vk != "unknown": - return vk + if matched_type is None and path: + parent_path = list(path[:-1]) + for entry in compute_upstream_paths(graph, target_id): + if entry.get("producerNodeId") != producer_id: + continue + if _paths_equal(list(entry.get("path") or []), parent_path): + matched_type = str(entry.get("type") or "Any") + break + + if matched_type: + vk = catalog_type_to_value_kind(matched_type) + if vk != "unknown": + return vk if producer_type in ("trigger.form", "input.form") and path and str(path[0]) == "payload": return "string" @@ -414,6 +415,12 @@ def _iter_presentation_parts(envelope: Dict[str, Any]) -> List[Dict[str, Any]]: if not isinstance(bucket, dict): continue data = bucket.get("data") + mode = str(bucket.get("outputMode") or "").strip().lower() + if mode == "blob" and isinstance(data, str): + from modules.workflows.methods.methodContext.actions.extractContent import parse_blob_data_segments + + parts.extend(parse_blob_data_segments(data)) + continue if isinstance(data, list): for slot in data: if isinstance(slot, dict): diff --git a/modules/features/graphicalEditor/nodeDefinitions/flow.py b/modules/features/graphicalEditor/nodeDefinitions/flow.py index 0dc6020e..f1efa0ec 100644 --- a/modules/features/graphicalEditor/nodeDefinitions/flow.py +++ b/modules/features/graphicalEditor/nodeDefinitions/flow.py @@ -123,9 +123,27 @@ CONTEXT_MERGE_ACTION_RESULT_DATA_PICK_OPTIONS = [ }, ] +_CONTEXT_BRANCH_DATA_PICK_OPTIONS = [ + { + "path": ["items"], + "pickerLabel": t("Gefilterte Elemente"), + "detail": t("Empfohlen für Schleifen: je Eintrag ein Durchlauf (z. B. Bild-Slots)."), + "recommended": True, + "type": "List[Any]", + }, + { + "path": ["data"], + "pickerLabel": t("Kontext (data)"), + "detail": t("Gefilterter Presentation-Umschlag oder unveränderter Eingang auf dem Sonst-Zweig."), + "recommended": False, + "type": "Dict", + }, +] + # Ports, die typische Schritt-Ausgaben durchreichen (nicht nur leerer Transit). _FLOW_INPUT_SCHEMAS = [ "Transit", + "ContextBranch", "FormPayload", "AiResult", "TextResult", @@ -183,8 +201,10 @@ FLOW_NODES = [ "category": "flow", "label": t("Switch"), "description": t( - "Mehrere Zweige nach einem Wert aus einem vorherigen Schritt (Data Picker). " - "Definiere Fälle mit Vergleichsoperator; der Eingang wird an den ersten passenden Zweig durchgereicht." + "Mehrere Zweige nach einem Wert aus einem vorherigen Schritt. " + "Jeder Fall hat einen eigenen Ausgang mit passend gefiltertem Inhalt in ``items``; " + "mehrere Kontext-Filter können gleichzeitig zutreffen (z. B. Text und Bilder). " + "Der letzte Ausgang (Sonst) reicht den unveränderten Eingang durch." ), "parameters": [ { @@ -199,13 +219,22 @@ FLOW_NODES = [ "type": "array", "required": False, "frontendType": "caseList", - "description": t("Fälle: Operator und Vergleichswert"), + "frontendOptions": { + "dependsOn": "value", + "operatorCatalog": "condition", + }, + "description": t("Fälle: Operator und Vergleichswert (abhängig vom gewählten Wert)"), }, ], "inputs": 1, "outputs": 1, "inputPorts": {0: {"accepts": list(_FLOW_INPUT_SCHEMAS)}}, - "outputPorts": {0: {"schema": "Transit"}}, + "outputPorts": { + 0: { + "schema": "ContextBranch", + "dataPickOptions": _CONTEXT_BRANCH_DATA_PICK_OPTIONS, + }, + }, "executor": "flow", "meta": {"icon": "mdi-swap-horizontal", "color": "#FF9800", "usesAi": False}, }, @@ -265,7 +294,7 @@ FLOW_NODES = [ "outputLabels": [t("Schleife"), t("Fertig")], "inputPorts": { 0: {"accepts": [ - "Transit", "UdmDocument", "EmailList", "DocumentList", "FileList", "TaskList", + "Transit", "ContextBranch", "UdmDocument", "EmailList", "DocumentList", "FileList", "TaskList", "ActionResult", "AiResult", "QueryResult", "FormPayload", "LoopItem", ]}, }, diff --git a/modules/features/graphicalEditor/portTypes.py b/modules/features/graphicalEditor/portTypes.py index e5633514..88cc795b 100644 --- a/modules/features/graphicalEditor/portTypes.py +++ b/modules/features/graphicalEditor/portTypes.py @@ -298,6 +298,21 @@ PORT_TYPE_CATALOG: Dict[str, PortSchema] = { PortField(name="merged", type="Dict", description="Zusammengeführte Daten"), ]), + "ContextBranch": PortSchema(name="ContextBranch", fields=[ + PortField(name="items", type="List[Any]", + description="Schleifen-fertige Elemente aus dem (gefilterten) Kontext", + recommended=True, + picker_label=t("Gefilterte Elemente")), + PortField(name="data", type="Dict", required=False, + description="Gefilterter Presentation-Umschlag oder Eingabe-Spiegel", + picker_label=t("Kontext (data)")), + PortField(name="filterApplied", type="bool", required=False, + description="True wenn ein Kontext-Inhaltsfilter angewendet wurde"), + PortField(name="contentType", type="str", required=False, + description="Angewendeter Inhaltstyp-Filter (z. B. image)"), + PortField(name="match", type="int", required=False, + description="Aktiver Ausgangs-Index (Fall oder Sonst)"), + ]), "ActionDocument": PortSchema(name="ActionDocument", fields=[ PortField(name="documentName", type="str", description="Dokumentname", diff --git a/modules/features/graphicalEditor/switchOutput.py b/modules/features/graphicalEditor/switchOutput.py new file mode 100644 index 00000000..be469ead --- /dev/null +++ b/modules/features/graphicalEditor/switchOutput.py @@ -0,0 +1,308 @@ +# Copyright (c) 2025 Patrick Motsch +"""Build flow.switch branch payloads: filtered context + loop-ready items.""" + +from __future__ import annotations + +import copy +import re +from typing import Any, Dict, List, Optional + +from modules.features.graphicalEditor.portTypes import unwrapTransit + +_CONTEXT_FILTER_OPERATORS = frozenset({"contains_content"}) +_BLOB_IMAGE_CHUNK_RE = re.compile(r"^\[image(?:\:([^\]]+))?\]$") + + +def _artifacts_by_part_id_from_presentation(inp: Any) -> Dict[str, str]: + plain = _unwrap_input(inp) + meta = plain.get("_meta") if isinstance(plain, dict) else None + if not isinstance(meta, dict): + return {} + out: Dict[str, str] = {} + for art in meta.get("persistedImageArtifacts") or []: + if not isinstance(art, dict): + continue + sp = str(art.get("sourcePartId") or "").strip() + fid = str(art.get("fileId") or "").strip() + if sp and fid: + out[sp] = fid + return out + + +def _enrich_image_slot(slot: Dict[str, Any], artifacts_by_part: Dict[str, str]) -> None: + if (slot.get("typeGroup") or "").strip().lower() != "image": + return + existing = str(slot.get("embeddedImageFileId") or "").strip() + if existing and existing in artifacts_by_part.values(): + return + candidates: List[str] = [] + sid = str(slot.get("id") or "").strip() + if sid: + candidates.append(sid) + data = slot.get("data") + if isinstance(data, str): + m = _BLOB_IMAGE_CHUNK_RE.fullmatch(data.strip()) + if m: + tok = (m.group(1) or "").strip() + if tok: + candidates.append(tok) + for cand in candidates: + fid = artifacts_by_part.get(cand) + if fid: + slot["embeddedImageFileId"] = fid + return + + +def _slot_matches_content_type(slot: Dict[str, Any], content_type: str) -> bool: + target = (content_type or "").strip().lower() + if not target: + return False + tg = (slot.get("typeGroup") or slot.get("contentType") or "").strip().lower() + if target == "media": + return tg in ("image", "media", "video", "audio") + if target == "text": + return tg in ("text", "table", "structure") + return tg == target + + +def _filter_bucket_slots(bucket: Dict[str, Any], content_type: str) -> Dict[str, Any]: + """Return a copy of a presentation file bucket with filtered ``data`` slots.""" + mode = str(bucket.get("outputMode") or "").strip().lower() + data = bucket.get("data") + if mode == "blob" and isinstance(data, str): + from modules.workflows.methods.methodContext.actions.extractContent import ( + filter_blob_bucket_by_content_type, + ) + + return filter_blob_bucket_by_content_type(bucket, content_type) + out = copy.deepcopy(bucket) + if isinstance(data, list): + out["data"] = [s for s in data if isinstance(s, dict) and _slot_matches_content_type(s, content_type)] + elif isinstance(data, dict) and _slot_matches_content_type(data, content_type): + out["data"] = data + else: + out["data"] = [] if isinstance(data, list) else data + return out + + +def _filter_presentation_envelope(envelope: Dict[str, Any], content_type: str) -> Dict[str, Any]: + """Filter all slots in a presentation envelope by content type group.""" + from modules.workflows.methods.methodContext.actions.extractContent import ( + PRESENTATION_KIND, + PRESENTATION_SCHEMA_VERSION, + ) + + out = copy.deepcopy(envelope) + files = out.get("files") or {} + if not isinstance(files, dict): + return out + filtered_files: Dict[str, Any] = {} + kept_order: List[str] = [] + for fk in out.get("fileOrder") or list(files.keys()): + bucket = files.get(fk) + if not isinstance(bucket, dict): + continue + fb = _filter_bucket_slots(bucket, content_type) + data = fb.get("data") + has_data = ( + (isinstance(data, list) and len(data) > 0) + or (isinstance(data, dict)) + or (isinstance(data, str) and str(data).strip()) + ) + if has_data: + filtered_files[str(fk)] = fb + kept_order.append(str(fk)) + out["schemaVersion"] = out.get("schemaVersion") or PRESENTATION_SCHEMA_VERSION + out["kind"] = out.get("kind") or PRESENTATION_KIND + out["fileOrder"] = kept_order + out["files"] = filtered_files + return out + + +def _slots_from_bucket(bucket: Dict[str, Any]) -> List[Any]: + data = bucket.get("data") + mode = str(bucket.get("outputMode") or "").strip().lower() + if mode == "blob" and isinstance(data, str) and data.strip(): + from modules.workflows.methods.methodContext.actions.extractContent import parse_blob_data_segments + + return parse_blob_data_segments(data) + if isinstance(data, list): + return [s for s in data if isinstance(s, dict)] + if isinstance(data, dict): + return [data] + if isinstance(data, str) and data.strip(): + return [{"typeGroup": "text", "data": data}] + items = bucket.get("items") + if isinstance(items, list): + return [i for i in items if isinstance(i, dict)] + return [] + + +def _items_from_presentation_envelope( + envelope: Dict[str, Any], + *, + artifacts_by_part: Optional[Dict[str, str]] = None, +) -> List[Any]: + items: List[Any] = [] + files = envelope.get("files") or {} + if not isinstance(files, dict): + return items + for fk in envelope.get("fileOrder") or list(files.keys()): + bucket = files.get(fk) + if isinstance(bucket, dict): + for slot in _slots_from_bucket(bucket): + if artifacts_by_part: + _enrich_image_slot(slot, artifacts_by_part) + sid = str(slot.get("id") or slot.get("label") or len(items)) + items.append({"name": f"{fk}:{sid}", "value": slot}) + return items + + +def expand_items_from_input(raw: Any) -> List[Any]: + """Best-effort loop items from transit/presentation/list/dict input.""" + if raw is None: + return [] + if isinstance(raw, dict) and isinstance(raw.get("items"), list): + return list(raw["items"]) + plain = unwrapTransit(raw) if isinstance(raw, dict) and raw.get("_transit") else raw + if isinstance(plain, dict) and isinstance(plain.get("items"), list): + return list(plain["items"]) + from modules.workflows.methods.methodContext.actions.extractContent import ( + normalize_presentation_envelopes, + ) + + envelopes = normalize_presentation_envelopes(plain) + if envelopes: + out: List[Any] = [] + for env in envelopes: + out.extend(_items_from_presentation_envelope(env)) + return out + if isinstance(plain, list): + return list(plain) + if isinstance(plain, dict): + children = plain.get("children") + if isinstance(children, list) and children: + return list(children) + return [{"name": k, "value": v} for k, v in plain.items()] + return [plain] + + +def _unwrap_input(inp: Any) -> Any: + if isinstance(inp, dict) and inp.get("_transit"): + return unwrapTransit(inp) + return inp + + +def build_switch_branch_payload( + inp: Any, + case: Dict[str, Any], + *, + value_kind: str = "unknown", + match_index: int = 0, +) -> Dict[str, Any]: + """Payload for a matched switch case (ContextBranch inner data).""" + operator = str(case.get("operator") or "eq") + right = case.get("value") + plain_in = _unwrap_input(inp) + + if operator in _CONTEXT_FILTER_OPERATORS and value_kind == "context": + content_type = str(right or "") + from modules.workflows.methods.methodContext.actions.extractContent import ( + normalize_presentation_envelopes, + ) + + source = plain_in + if isinstance(source, dict) and "data" in source and not source.get("kind"): + nested = source.get("data") + if isinstance(nested, dict): + source = nested + envelopes = normalize_presentation_envelopes(source) + if not envelopes and isinstance(plain_in, dict): + envelopes = normalize_presentation_envelopes(plain_in) + filtered_envs = [_filter_presentation_envelope(env, content_type) for env in envelopes] + artifacts_by_part = _artifacts_by_part_id_from_presentation(plain_in) + items: List[Any] = [] + for env in filtered_envs: + items.extend(_items_from_presentation_envelope(env, artifacts_by_part=artifacts_by_part)) + if len(filtered_envs) == 1: + data_out: Any = filtered_envs[0] + elif filtered_envs: + data_out = {"envelopes": filtered_envs} + else: + data_out = {} + return { + "data": data_out, + "items": items, + "filterApplied": True, + "contentType": content_type, + "match": match_index, + } + + data_out = plain_in if isinstance(plain_in, dict) else {"value": plain_in} + return { + "data": data_out, + "items": expand_items_from_input(inp), + "filterApplied": False, + "match": match_index, + } + + +def build_switch_default_payload(inp: Any, *, match_index: int) -> Dict[str, Any]: + """Sonst branch: unmodified input passthrough.""" + plain_in = _unwrap_input(inp) + data_out = plain_in if isinstance(plain_in, dict) else {"value": plain_in} + return { + "data": data_out, + "items": expand_items_from_input(inp), + "filterApplied": False, + "match": match_index, + } + + +def build_switch_combined_output( + inp: Any, + cases: List[Any], + *, + matched_indices: List[int], + value_kind: str = "unknown", +) -> Dict[str, Any]: + """Build per-port branch payloads; primary fields mirror the first active match.""" + branches: Dict[str, Dict[str, Any]] = {} + default_idx = len(cases) + for idx in matched_indices: + if idx == default_idx: + branches[str(idx)] = build_switch_default_payload(inp, match_index=default_idx) + elif 0 <= idx < len(cases): + c = cases[idx] if isinstance(cases[idx], dict) else {"operator": "eq", "value": cases[idx]} + branches[str(idx)] = build_switch_branch_payload( + inp, c, value_kind=value_kind, match_index=idx, + ) + primary_idx = matched_indices[0] if matched_indices else default_idx + primary = branches.get(str(primary_idx)) or build_switch_default_payload(inp, match_index=default_idx) + return {**primary, "branches": branches} + + +def switch_branch_payload(transit: Any, source_output: int) -> Optional[Dict[str, Any]]: + """Return the ContextBranch inner dict for a specific switch output port.""" + if not isinstance(transit, dict): + return None + data = transit.get("data") if transit.get("_transit") else transit + if not isinstance(data, dict): + return None + branches = data.get("branches") + if isinstance(branches, dict): + branch = branches.get(str(source_output)) + if isinstance(branch, dict): + return branch + if transit.get("_transit"): + return data + return data + + +def unwrap_transit_for_port(output: Any, source_output: Optional[int] = None) -> Any: + """Unwrap transit; when ``source_output`` is set, pick that switch branch payload.""" + if source_output is not None: + branch = switch_branch_payload(output, source_output) + if branch is not None: + return branch + return unwrapTransit(output) diff --git a/modules/features/graphicalEditor/upstreamPathsService.py b/modules/features/graphicalEditor/upstreamPathsService.py index 71972616..ade9524a 100644 --- a/modules/features/graphicalEditor/upstreamPathsService.py +++ b/modules/features/graphicalEditor/upstreamPathsService.py @@ -4,7 +4,7 @@ from __future__ import annotations from typing import Any, Dict, List, Set -from modules.features.graphicalEditor.conditionOperators import resolve_value_kind +from modules.features.graphicalEditor.conditionOperators import catalog_type_to_value_kind, resolve_value_kind from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES from modules.features.graphicalEditor.portTypes import PORT_TYPE_CATALOG, PortSchema, parse_graph_defined_output_schema from modules.workflows.automation2.graphUtils import buildConnectionMap, getLoopBodyNodeIds, getLoopDoneNodeIds @@ -169,12 +169,16 @@ def compute_upstream_paths(graph: Dict[str, Any], target_node_id: str) -> List[D ) for entry in paths: - ref = { - "nodeId": entry.get("producerNodeId"), - "path": entry.get("path") or [], - } - graph_with_target = {**graph, "targetNodeId": target_node_id} - entry["valueKind"] = resolve_value_kind(graph_with_target, ref) + ct = str(entry.get("type") or "Any") + vk = catalog_type_to_value_kind(ct) + if vk == "unknown": + ref = { + "nodeId": entry.get("producerNodeId"), + "path": entry.get("path") or [], + } + graph_with_target = {**graph, "targetNodeId": target_node_id} + vk = resolve_value_kind(graph_with_target, ref, _skip_upstream=True) + entry["valueKind"] = vk return paths diff --git a/modules/workflows/automation2/executionEngine.py b/modules/workflows/automation2/executionEngine.py index 4e3f89da..8efe9339 100644 --- a/modules/workflows/automation2/executionEngine.py +++ b/modules/workflows/automation2/executionEngine.py @@ -163,10 +163,15 @@ def _is_node_on_active_path( meta = out.get("_meta", {}) if out.get("_transit") else out branch = meta.get("branch") match = meta.get("match") + matches = meta.get("matches") active_output = None if branch is not None: active_output = branch + elif isinstance(matches, list) and matches: + if source_output not in matches: + return False + continue elif match is not None: if match < 0: return False diff --git a/modules/workflows/automation2/executors/actionNodeExecutor.py b/modules/workflows/automation2/executors/actionNodeExecutor.py index 2158de45..c7b20626 100644 --- a/modules/workflows/automation2/executors/actionNodeExecutor.py +++ b/modules/workflows/automation2/executors/actionNodeExecutor.py @@ -475,7 +475,7 @@ def _resolveUpstreamPayload(nodeId: str, context: Dict[str, Any]) -> Any: the first ``connectionMap`` entry so ``injectUpstreamPayload`` (e.g. ``context.mergeContext`` after ``flow.loop``) still receives data. """ - from modules.features.graphicalEditor.portTypes import unwrapTransit + from modules.features.graphicalEditor.switchOutput import unwrap_transit_for_port nodeOutputs = context.get("nodeOutputs") or {} connectionMap = context.get("connectionMap") or {} @@ -496,25 +496,25 @@ def _resolveUpstreamPayload(nodeId: str, context: Dict[str, Any]) -> Any: if not entry: return None - src_node_id, _ = entry + src_node_id, src_out = entry upstream = nodeOutputs.get(src_node_id) - return unwrapTransit(upstream) if isinstance(upstream, dict) else upstream + return unwrap_transit_for_port(upstream, src_out) def _resolveBranchInputs(nodeId: str, context: Dict[str, Any]) -> Dict[int, Any]: """Return ``Dict[port_index → unwrapped upstream output]`` for every wired input port.""" - from modules.features.graphicalEditor.portTypes import unwrapTransit + from modules.features.graphicalEditor.switchOutput import unwrap_transit_for_port src_map = (context.get("inputSources") or {}).get(nodeId) or {} nodeOutputs = context.get("nodeOutputs") or {} out: Dict[int, Any] = {} for port_ix, entry in src_map.items(): if not entry: continue - src_node_id, _ = entry + src_node_id, src_out = entry upstream = nodeOutputs.get(src_node_id) if upstream is None: continue - out[int(port_ix)] = unwrapTransit(upstream) if isinstance(upstream, dict) else upstream + out[int(port_ix)] = unwrap_transit_for_port(upstream, src_out) return out @@ -554,7 +554,12 @@ class ActionNodeExecutor: # 1. Resolve parameters (DataRef, SystemVar, Static) params = dict(node.get("parameters") or {}) logger.debug("ActionNodeExecutor node %s raw params keys=%s", nodeId, list(params.keys())) - resolvedParams = resolveParameterReferences(params, context.get("nodeOutputs", {})) + resolvedParams = resolveParameterReferences( + params, + context.get("nodeOutputs", {}), + consumer_node_id=nodeId, + input_sources=context.get("inputSources"), + ) logger.debug("ActionNodeExecutor node %s resolved params keys=%s documentList_present=%s documentList_type=%s", nodeId, list(resolvedParams.keys()), "documentList" in resolvedParams, type(resolvedParams.get("documentList")).__name__) # 2. Apply defaults from parameter definitions diff --git a/modules/workflows/automation2/executors/flowExecutor.py b/modules/workflows/automation2/executors/flowExecutor.py index e95c4fc3..00ede971 100644 --- a/modules/workflows/automation2/executors/flowExecutor.py +++ b/modules/workflows/automation2/executors/flowExecutor.py @@ -132,15 +132,24 @@ class FlowExecutor: value_kind = "unknown" ref_for_kind = left_ref if isinstance(left_ref, dict) else cond.get("ref") if isinstance(ref_for_kind, dict) and ref_for_kind.get("nodeId") and node: - graph_stub = { - "nodes": [{"id": node.get("id"), "type": node.get("type")}], - "targetNodeId": node.get("id"), - } + graph_stub = self._graph_stub_for_ref(node, ref_for_kind, nodeOutputs) value_kind = resolve_value_kind(graph_stub, ref_for_kind) return apply_condition_operator(left, str(operator), right, value_kind) - def _compare_dates(self, left: Any, right: Any, op) -> bool: + def _graph_stub_for_ref(self, node: Dict, ref: Dict, nodeOutputs: Dict) -> Dict[str, Any]: + """Minimal graph for ``resolve_value_kind`` (includes value producer when known).""" + nodes: List[Dict[str, Any]] = [{"id": node.get("id"), "type": node.get("type")}] + producer_id = ref.get("nodeId") + if producer_id: + ctx = nodeOutputs.get("_context") if isinstance(nodeOutputs.get("_context"), dict) else {} + graph_nodes = ctx.get("graphNodesById") if isinstance(ctx.get("graphNodesById"), dict) else {} + pnode = graph_nodes.get(producer_id) if isinstance(graph_nodes, dict) else None + if isinstance(pnode, dict): + nodes.append({"id": producer_id, "type": pnode.get("type", "")}) + else: + nodes.append({"id": producer_id, "type": ""}) + return {"nodes": nodes, "targetNodeId": node.get("id")} """Compare left/right as dates; op(a,b) is the comparison.""" def parse(v): @@ -197,23 +206,42 @@ class FlowExecutor: return bool(resolved) async def _switch(self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict) -> Any: - valueExpr = (node.get("parameters") or {}).get("value", "") + params = node.get("parameters") or {} + valueExpr = params.get("value", "") from modules.workflows.automation2.graphUtils import resolveParameterReferences - value = resolveParameterReferences(valueExpr, nodeOutputs) - cases = (node.get("parameters") or {}).get("cases", []) - inp = self._getInputData(nodeId, {nodeId: inputSources}, nodeOutputs) - for i, c in enumerate(cases): - if self._evalSwitchCase(value, c): - return wrapTransit( - unwrapTransit(inp) if inp else inp, - {"match": i, "value": value}, - ) - return wrapTransit( - unwrapTransit(inp) if inp else inp, - {"match": -1, "value": value}, + from modules.features.graphicalEditor.switchOutput import ( + build_switch_combined_output, + build_switch_default_payload, ) - def _evalSwitchCase(self, left: Any, case: Any) -> bool: + value = resolveParameterReferences(valueExpr, nodeOutputs) + cases = params.get("cases", []) or [] + value_kind = "unknown" + if isinstance(valueExpr, dict) and valueExpr.get("type") == "ref": + graph_stub = self._graph_stub_for_ref(node, valueExpr, nodeOutputs) + value_kind = resolve_value_kind(graph_stub, valueExpr) + inp = self._getInputData(nodeId, {nodeId: inputSources}, nodeOutputs) + matched: List[int] = [ + i for i, c in enumerate(cases) + if self._evalSwitchCase(value, c, value_kind=value_kind) + ] + default_idx = len(cases) if isinstance(cases, list) else 0 + if not matched: + matched = [default_idx] + combined = build_switch_combined_output( + inp, cases, matched_indices=matched, value_kind=value_kind, + ) + return wrapTransit( + combined, + { + "match": matched[0], + "matches": matched, + "value": value, + "filterApplied": bool(combined.get("filterApplied")), + }, + ) + + def _evalSwitchCase(self, left: Any, case: Any, *, value_kind: Optional[str] = None) -> bool: """ Evaluate a switch case. Case can be: - dict: {operator, value} - use operator to compare left vs value @@ -225,14 +253,19 @@ class FlowExecutor: else: operator = "eq" right = case - return apply_condition_operator(left, str(operator), right) + return apply_condition_operator(left, str(operator), right, value_kind) async def _loop(self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict) -> Any: params = node.get("parameters") or {} itemsPath = params.get("items", "[]") from modules.workflows.automation2.graphUtils import resolveParameterReferences - raw = resolveParameterReferences(itemsPath, nodeOutputs) + raw = resolveParameterReferences( + itemsPath, + nodeOutputs, + consumer_node_id=nodeId, + input_sources=inputSources, + ) items = self._normalize_loop_items(raw) mode = (params.get("iterationMode") or "all").strip().lower() stride = params.get("iterationStride", 2) @@ -245,6 +278,8 @@ class FlowExecutor: def _normalize_loop_items(self, raw: Any) -> List[Any]: """Coerce resolved `items` into a list (lists, dict children, or scalars).""" + if isinstance(raw, dict) and isinstance(raw.get("items"), list): + return self._expand_presentation_lines_loop_items(raw["items"]) if isinstance(raw, list): return self._expand_presentation_lines_loop_items(raw) if isinstance(raw, dict): diff --git a/modules/workflows/automation2/graphUtils.py b/modules/workflows/automation2/graphUtils.py index 54cff2a1..b31dd7bb 100644 --- a/modules/workflows/automation2/graphUtils.py +++ b/modules/workflows/automation2/graphUtils.py @@ -253,6 +253,8 @@ def _checkPortCompatibility( continue srcOutputPorts = srcDef.get("outputPorts", {}) srcPort = srcOutputPorts.get(srcOut, {}) or {} + if srcNode.get("type") == "flow.switch" and not srcPort.get("schema"): + srcPort = srcOutputPorts.get(0, {}) or srcPort tgtPort = tgtInputPorts.get(tgtIn, {}) or {} if not isinstance(srcPort, dict): @@ -264,6 +266,9 @@ def _checkPortCompatibility( continue if src_schema in accepts: continue + # ContextBranch is a typed Transit envelope (switch filtered branches). + if src_schema == "ContextBranch" and ("Transit" in accepts or "ContextBranch" in accepts): + continue # Port that only declares Transit behaves as an untyped sink (legacy graphs). if len(accepts) == 1 and accepts[0] == "Transit": continue @@ -409,12 +414,21 @@ def _unwrapTypedRef(value: Any) -> Any: return value.get(primary, value) -def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any: +def resolveParameterReferences( + value: Any, + nodeOutputs: Dict[str, Any], + *, + consumer_node_id: Optional[str] = None, + input_sources: Optional[Dict[str, Dict[int, tuple]]] = None, +) -> Any: """ Resolve parameter references: - {{nodeId.output}} or {{nodeId.output.path}} in strings (legacy) - { "type": "ref", "nodeId": "...", "path": ["field", "nested"] } -> resolved value - { "type": "value", "value": ... } -> value (then recursively resolve) + + When ``consumer_node_id`` and ``input_sources`` are set, refs to the wired + upstream switch use that connection's output port (per-branch payload). """ import json import re @@ -430,8 +444,13 @@ def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any: path = value.get("path") if node_id is not None and isinstance(path, (list, tuple)): data = nodeOutputs.get(node_id) - # Unwrap transit envelopes to access the real data - if isinstance(data, dict) and data.get("_transit"): + wired = None + if consumer_node_id and input_sources: + wired = (input_sources.get(consumer_node_id) or {}).get(0) + if wired and wired[0] == node_id: + from modules.features.graphicalEditor.switchOutput import unwrap_transit_for_port + data = unwrap_transit_for_port(data, wired[1]) + elif isinstance(data, dict) and data.get("_transit"): data = data.get("data", data) plist = list(path) resolved = _get_by_path(data, plist) @@ -450,16 +469,34 @@ def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any: # Form nodes store fields under {"payload": {fieldName: …}}. # DataPicker emits bare field paths like ["url"]; try under payload. resolved = _get_by_path(data["payload"], plist) - return resolveParameterReferences(resolved, nodeOutputs) + return resolveParameterReferences( + resolved, + nodeOutputs, + consumer_node_id=consumer_node_id, + input_sources=input_sources, + ) return value if value.get("type") == "value": inner = value.get("value") - return resolveParameterReferences(inner, nodeOutputs) + return resolveParameterReferences( + inner, + nodeOutputs, + consumer_node_id=consumer_node_id, + input_sources=input_sources, + ) if value.get("type") == "system": variable = value.get("variable", "") from modules.features.graphicalEditor.portTypes import resolveSystemVariable return resolveSystemVariable(variable, nodeOutputs.get("_context", {})) - return {k: resolveParameterReferences(v, nodeOutputs) for k, v in value.items()} + return { + k: resolveParameterReferences( + v, + nodeOutputs, + consumer_node_id=consumer_node_id, + input_sources=input_sources, + ) + for k, v in value.items() + } if isinstance(value, str): def repl(m): @@ -498,11 +535,27 @@ def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any: # contextBuilder: list where every item is a `{"type":"ref",...}` envelope. # Resolve each part; a single ref preserves the resolved type (str, list, dict). if value and all(isinstance(v, dict) and v.get("type") == "ref" for v in value): - resolved_parts = [resolveParameterReferences(v, nodeOutputs) for v in value] + resolved_parts = [ + resolveParameterReferences( + v, + nodeOutputs, + consumer_node_id=consumer_node_id, + input_sources=input_sources, + ) + for v in value + ] if len(resolved_parts) == 1: return resolved_parts[0] return resolved_parts - return [resolveParameterReferences(v, nodeOutputs) for v in value] + return [ + resolveParameterReferences( + v, + nodeOutputs, + consumer_node_id=consumer_node_id, + input_sources=input_sources, + ) + for v in value + ] return value diff --git a/modules/workflows/methods/methodContext/actions/extractContent.py b/modules/workflows/methods/methodContext/actions/extractContent.py index ebf8e9ba..52d07b34 100644 --- a/modules/workflows/methods/methodContext/actions/extractContent.py +++ b/modules/workflows/methods/methodContext/actions/extractContent.py @@ -934,6 +934,52 @@ def _presentation_image_marker_in_data(part: Dict[str, Any]) -> Dict[str, Any]: return marker +_BLOB_IMAGE_CHUNK_RE = re.compile(r"^\[image(?:\:([^\]]+))?\]$") + + +def parse_blob_data_segments(data: str) -> List[Dict[str, Any]]: + """Split presentation ``blob`` ``data`` into virtual slots (text chunks + image markers).""" + segments: List[Dict[str, Any]] = [] + if not isinstance(data, str) or not data.strip(): + return segments + for idx, chunk in enumerate(data.split("\n\n")): + piece = chunk.strip() + if not piece: + continue + m = _BLOB_IMAGE_CHUNK_RE.fullmatch(piece) + if m: + token = (m.group(1) or "").strip() + seg: Dict[str, Any] = {"typeGroup": "image", "mimeType": "image/*", "data": piece} + if token: + seg["id"] = token + else: + seg["id"] = f"blob_image_{idx}" + segments.append(seg) + else: + segments.append({"typeGroup": "text", "mimeType": "text/plain", "data": piece, "id": f"blob_text_{idx}"}) + return segments + + +def filter_blob_bucket_by_content_type(bucket: Dict[str, Any], content_type: str) -> Dict[str, Any]: + """Keep only blob segments matching ``content_type`` (re-join as ``\\n\\n`` string).""" + out = copy.deepcopy(bucket) + raw = out.get("data") + if not isinstance(raw, str): + return out + target = (content_type or "").strip().lower() + kept: List[str] = [] + for seg in parse_blob_data_segments(raw): + tg = (seg.get("typeGroup") or "").strip().lower() + if target == "media" and tg in ("image", "media", "video", "audio"): + kept.append(str(seg.get("data") or "")) + elif target == "text" and tg in ("text", "table", "structure"): + kept.append(str(seg.get("data") or "")) + elif tg == target: + kept.append(str(seg.get("data") or "")) + out["data"] = "\n\n".join(s for s in kept if s.strip()) + return out + + def _build_file_presentation( source_file_name: str, parts: List[Dict[str, Any]], @@ -959,8 +1005,8 @@ def _build_file_presentation( tg = (p.get("typeGroup") or "").strip() if tg == "image": m = _presentation_image_marker_in_data(p) - pid = str(m.get("partId") or "").strip() - chunks_blob.append(f"[image:{pid}]" if pid else "[image]") + token = str(m.get("embeddedImageFileId") or m.get("partId") or "").strip() + chunks_blob.append(f"[image:{token}]" if token else "[image]") continue if _part_carries_plain_text(p): raw = p.get("data") @@ -1433,6 +1479,20 @@ def normalize_presentation_envelopes(raw: Any) -> List[Dict[str, Any]]: file_key=str(raw.get("name") or "file_1"), ) ] + if isinstance(raw.get("name"), str) and isinstance(raw.get("value"), dict): + slot = raw["value"] + if _is_presentation_line_slot(slot): + bucket = { + "outputMode": slot.get("outputMode") or "lines", + "sourceFileName": "", + "data": [slot], + } + return [ + presentation_envelope_from_file_bucket( + bucket, + file_key=str(raw.get("name") or "file_1"), + ) + ] if _is_presentation_file_bucket(raw): return [presentation_envelope_from_file_bucket(raw)] if _is_presentation_line_slot(raw): @@ -1450,6 +1510,27 @@ def normalize_presentation_envelopes(raw: Any) -> List[Dict[str, Any]]: return [] +def _artifacts_by_part_id_from_meta(meta: Any) -> Dict[str, str]: + out: Dict[str, str] = {} + if not isinstance(meta, dict): + return out + for art in meta.get("persistedImageArtifacts") or []: + if not isinstance(art, dict): + continue + sp = str(art.get("sourcePartId") or "").strip() + fid = str(art.get("fileId") or "").strip() + if sp and fid: + out[sp] = fid + return out + + +def _collect_artifacts_by_part_id(envelopes: List[Dict[str, Any]]) -> Dict[str, str]: + merged: Dict[str, str] = {} + for envelope in envelopes: + merged.update(_artifacts_by_part_id_from_meta(envelope.get("_meta"))) + return merged + + def presentation_envelopes_to_document_json( raw: Any, *, @@ -1466,6 +1547,8 @@ def presentation_envelopes_to_document_json( "context must be presentation data from Inhalt extrahieren (kind=context.extractContent.presentation.v1)" ) + artifacts_by_part = _collect_artifacts_by_part_id(envelopes) + sections: List[Dict[str, Any]] = [] order = 0 @@ -1496,8 +1579,35 @@ def presentation_envelopes_to_document_json( "elements": [{"content": {"inlineRuns": _parseInlineRuns(t)}}], }) - def _append_image_slot(slot: Dict[str, Any]) -> None: + def _resolve_image_file_id(slot: Dict[str, Any]) -> Optional[str]: fid = slot.get("embeddedImageFileId") + if fid: + return str(fid).strip() or None + candidates: List[str] = [] + sid = str(slot.get("id") or "").strip() + if sid: + candidates.append(sid) + raw_d = slot.get("data") + if isinstance(raw_d, str): + m = _BLOB_IMAGE_CHUNK_RE.fullmatch(raw_d.strip()) + if m: + tok = (m.group(1) or "").strip() + if tok: + candidates.append(tok) + for cand in candidates: + if cand in artifacts_by_part: + return artifacts_by_part[cand] + # Marker may already carry the persisted storage file id. + try: + blob = _load_image_bytes_by_file_id(services, cand) + if blob: + return cand + except Exception: + pass + return None + + def _append_image_slot(slot: Dict[str, Any]) -> None: + fid = _resolve_image_file_id(slot) if not fid: raise ValueError( "image slot is missing embeddedImageFileId — " @@ -1589,6 +1699,11 @@ def presentation_envelopes_to_document_json( if src: _append_heading(src) raw_data = bucket.get("data") + mode = str(bucket.get("outputMode") or "").strip().lower() + if isinstance(raw_data, str) and mode == "blob": + for seg in parse_blob_data_segments(raw_data): + _append_slot(seg) + return if isinstance(raw_data, str): _append_paragraph(raw_data) return diff --git a/tests/unit/graphicalEditor/test_upstream_paths_and_graph_schema.py b/tests/unit/graphicalEditor/test_upstream_paths_and_graph_schema.py index 16aec90d..13072b3f 100644 --- a/tests/unit/graphicalEditor/test_upstream_paths_and_graph_schema.py +++ b/tests/unit/graphicalEditor/test_upstream_paths_and_graph_schema.py @@ -50,6 +50,25 @@ def test_parse_graph_defined_schema_nested_group(): assert "addr.zip" in names +def test_compute_upstream_paths_switch_context_branch_items(): + graph = { + "nodes": [ + {"id": "ext1", "type": "context.extractContent", "parameters": {}}, + {"id": "sw1", "type": "flow.switch", "parameters": {"cases": [{"operator": "contains_content", "value": "image"}]}}, + {"id": "ai1", "type": "ai.prompt", "parameters": {"aiPrompt": "summarize"}}, + ], + "connections": [ + {"source": "ext1", "target": "sw1", "sourceOutput": 0, "targetInput": 0}, + {"source": "sw1", "target": "ai1", "sourceOutput": 0, "targetInput": 0}, + ], + } + paths = compute_upstream_paths(graph, "ai1") + sw_paths = [p for p in paths if p.get("producerNodeId") == "sw1"] + items_paths = [p for p in sw_paths if p.get("path") == ["items"]] + assert items_paths, sw_paths + assert items_paths[0].get("type") == "List[Any]" + + def test_validate_graph_port_mismatch_errors(): node_type_ids = {n["id"] for n in STATIC_NODE_TYPES} graph = { diff --git a/tests/unit/workflow/test_flow_executor_conditions.py b/tests/unit/workflow/test_flow_executor_conditions.py index ebfa6907..70cc84f4 100644 --- a/tests/unit/workflow/test_flow_executor_conditions.py +++ b/tests/unit/workflow/test_flow_executor_conditions.py @@ -61,6 +61,34 @@ def test_context_contains_content(executor): assert executor._evalStructuredCondition(cond, {"n1": presentation}, item_param={"type": "ref", "nodeId": "n1", "path": []}, node={"id": "if1", "type": "flow.ifElse"}) is True +def test_context_contains_content_blob_mode(executor): + presentation = { + "kind": PRESENTATION_KIND, + "outputMode": "blob", + "fileOrder": ["f1"], + "files": { + "f1": { + "outputMode": "blob", + "data": "Invoice text\n\n[image:abc123]", + } + }, + } + img_cond = {"type": "condition", "operator": "contains_content", "value": "image"} + txt_cond = {"type": "condition", "operator": "contains_content", "value": "text"} + item = {"type": "ref", "nodeId": "n1", "path": []} + node = {"id": "if1", "type": "flow.ifElse"} + assert executor._evalStructuredCondition(img_cond, {"n1": presentation}, item_param=item, node=node) is True + assert executor._evalStructuredCondition(txt_cond, {"n1": presentation}, item_param=item, node=node) is True + + def test_switch_uses_shared_operators(executor): assert executor._evalSwitchCase("abc", {"operator": "starts_with", "value": "ab"}) is True assert executor._evalSwitchCase([1, 2], {"operator": "length_eq", "value": 2}) is True + + +def test_switch_resolves_value_kind_for_string_ops(executor): + assert executor._evalSwitchCase( + "hello", + {"operator": "starts_with", "value": "he"}, + value_kind="string", + ) is True diff --git a/tests/unit/workflow/test_switch_filtered_output.py b/tests/unit/workflow/test_switch_filtered_output.py new file mode 100644 index 00000000..1cfac160 --- /dev/null +++ b/tests/unit/workflow/test_switch_filtered_output.py @@ -0,0 +1,359 @@ +# Copyright (c) 2025 Patrick Motsch +"""flow.switch ContextBranch: filtered presentation + loop-ready items.""" + +import pytest + +from modules.features.graphicalEditor.portTypes import unwrapTransit, wrapTransit +from modules.features.graphicalEditor.switchOutput import ( + build_switch_branch_payload, + build_switch_combined_output, + build_switch_default_payload, + unwrap_transit_for_port, +) +from modules.workflows.automation2.executionEngine import _is_node_on_active_path +from modules.workflows.automation2.executors.flowExecutor import FlowExecutor +from modules.workflows.automation2.graphUtils import resolveParameterReferences +from modules.workflows.methods.methodContext.actions.extractContent import PRESENTATION_KIND + + +def _presentation_with_text_and_image(): + return { + "kind": PRESENTATION_KIND, + "schemaVersion": "1", + "outputMode": "parts", + "fileOrder": ["doc"], + "files": { + "doc": { + "outputMode": "parts", + "data": [ + {"typeGroup": "text", "id": "t1", "data": "Hello"}, + {"typeGroup": "image", "id": "i1", "mimeType": "image/png", "data": "YQ=="}, + ], + } + }, + } + + +def _presentation_blob_with_text_and_image(): + blob_data = "Hello world\n\n[image:img1]\n\nMore text" + return { + "kind": PRESENTATION_KIND, + "schemaVersion": "1", + "outputMode": "blob", + "fileOrder": ["doc"], + "files": { + "doc": { + "outputMode": "blob", + "sourceFileName": "test.pdf", + "data": blob_data, + } + }, + } + + +def test_build_switch_branch_payload_filters_blob_image(): + pres = _presentation_blob_with_text_and_image() + payload = build_switch_branch_payload( + pres, + {"operator": "contains_content", "value": "image"}, + value_kind="context", + match_index=0, + ) + assert payload["filterApplied"] is True + assert len(payload["items"]) == 1 + assert payload["items"][0]["value"]["typeGroup"] == "image" + assert "[image:img1]" in payload["data"]["files"]["doc"]["data"] + + +def test_build_switch_branch_payload_filters_blob_text(): + pres = _presentation_blob_with_text_and_image() + payload = build_switch_branch_payload( + pres, + {"operator": "contains_content", "value": "text"}, + value_kind="context", + match_index=1, + ) + assert payload["filterApplied"] is True + assert len(payload["items"]) == 2 + assert all(i["value"]["typeGroup"] == "text" for i in payload["items"]) + filtered = payload["data"]["files"]["doc"]["data"] + assert "Hello world" in filtered + assert "[image:" not in filtered + + +@pytest.mark.asyncio +async def test_switch_blob_multi_match(): + executor = FlowExecutor() + pres = _presentation_blob_with_text_and_image() + sw_id = "sw1" + node_outputs = { + "ext1": pres, + "_context": { + "graphNodesById": { + "ext1": {"id": "ext1", "type": "context.extractContent"}, + sw_id: {"id": sw_id, "type": "flow.switch"}, + } + }, + } + executor._getInputData = lambda *_a, **_k: pres # type: ignore[method-assign] + node = { + "id": sw_id, + "type": "flow.switch", + "parameters": { + "value": {"type": "ref", "nodeId": "ext1", "path": []}, + "cases": [ + {"operator": "contains_content", "value": "image"}, + {"operator": "contains_content", "value": "text"}, + ], + }, + } + out = await executor._switch(node, node_outputs, sw_id, {}) + assert out["_meta"]["matches"] == [0, 1] + assert len(unwrap_transit_for_port(out, 0)["items"]) == 1 + assert len(unwrap_transit_for_port(out, 1)["items"]) == 2 + + +def test_switch_blob_image_items_get_embedded_file_id(): + part_id = "dbd27119-cd21-4a62-b5e2-b06d3b81470b" + file_id = "storage-file-uuid-1" + pres = { + "kind": PRESENTATION_KIND, + "outputMode": "blob", + "fileOrder": ["doc"], + "files": { + "doc": { + "outputMode": "blob", + "data": f"Hello\n\n[image:{part_id}]", + } + }, + "_meta": { + "persistedImageArtifacts": [ + {"sourcePartId": part_id, "fileId": file_id, "mimeType": "image/png"}, + ] + }, + } + payload = build_switch_branch_payload( + pres, + {"operator": "contains_content", "value": "image"}, + value_kind="context", + match_index=0, + ) + assert len(payload["items"]) == 1 + slot = payload["items"][0]["value"] + assert slot.get("embeddedImageFileId") == file_id + + +def test_build_switch_branch_payload_filters_images(): + pres = _presentation_with_text_and_image() + case = {"operator": "contains_content", "value": "image"} + payload = build_switch_branch_payload( + pres, + case, + value_kind="context", + match_index=0, + ) + assert payload["filterApplied"] is True + assert payload["contentType"] == "image" + assert len(payload["items"]) == 1 + assert payload["items"][0]["value"]["typeGroup"] == "image" + data = payload["data"] + assert data["kind"] == PRESENTATION_KIND + slots = data["files"]["doc"]["data"] + assert len(slots) == 1 + assert slots[0]["typeGroup"] == "image" + + +def test_build_switch_default_payload_passthrough(): + pres = _presentation_with_text_and_image() + payload = build_switch_default_payload(pres, match_index=2) + assert payload["filterApplied"] is False + assert payload["match"] == 2 + assert payload["data"]["fileOrder"] == pres["fileOrder"] + assert len(payload["items"]) == 2 + + +@pytest.mark.asyncio +async def test_switch_executor_match_and_default_branch(): + executor = FlowExecutor() + pres = _presentation_with_text_and_image() + ext_id = "ext1" + sw_id = "sw1" + node_outputs = { + ext_id: pres, + "_context": { + "graphNodesById": { + ext_id: {"id": ext_id, "type": "context.extractContent"}, + sw_id: {"id": sw_id, "type": "flow.switch"}, + } + }, + } + + def _inp(_nid, _sources, _outputs, _output_index=0): + return pres + + executor._getInputData = _inp # type: ignore[method-assign] + + match_node = { + "id": sw_id, + "type": "flow.switch", + "parameters": { + "value": {"type": "ref", "nodeId": ext_id, "path": []}, + "cases": [{"operator": "contains_content", "value": "image"}], + }, + } + match_out = await executor._switch(match_node, node_outputs, sw_id, {}) + match_payload = unwrapTransit(match_out) + assert match_out["_meta"]["match"] == 0 + assert match_out["_meta"]["matches"] == [0] + assert match_payload["filterApplied"] is True + assert len(match_payload["items"]) == 1 + assert match_payload["branches"]["0"]["contentType"] == "image" + + default_node = { + **match_node, + "parameters": { + **match_node["parameters"], + "cases": [{"operator": "contains_content", "value": "video"}], + }, + } + default_out = await executor._switch(default_node, node_outputs, sw_id, {}) + assert default_out["_meta"]["match"] == 1 + assert default_out["_meta"]["matches"] == [1] + default_payload = unwrapTransit(default_out) + assert default_payload["filterApplied"] is False + assert default_payload["data"]["fileOrder"] == pres["fileOrder"] + + +@pytest.mark.asyncio +async def test_switch_multi_match_text_and_image_branches(): + executor = FlowExecutor() + pres = _presentation_with_text_and_image() + sw_id = "sw1" + node_outputs = { + "ext1": pres, + "_context": { + "graphNodesById": { + "ext1": {"id": "ext1", "type": "context.extractContent"}, + sw_id: {"id": sw_id, "type": "flow.switch"}, + } + }, + } + executor._getInputData = lambda *_a, **_k: pres # type: ignore[method-assign] + + node = { + "id": sw_id, + "type": "flow.switch", + "parameters": { + "value": {"type": "ref", "nodeId": "ext1", "path": []}, + "cases": [ + {"operator": "contains_content", "value": "image"}, + {"operator": "contains_content", "value": "text"}, + ], + }, + } + out = await executor._switch(node, node_outputs, sw_id, {}) + assert out["_meta"]["matches"] == [0, 1] + img = unwrap_transit_for_port(out, 0) + txt = unwrap_transit_for_port(out, 1) + assert img["contentType"] == "image" + assert txt["contentType"] == "text" + assert len(img["items"]) == 1 + assert len(txt["items"]) == 1 + assert img["items"][0]["value"]["typeGroup"] == "image" + assert txt["items"][0]["value"]["typeGroup"] == "text" + + +def test_active_path_allows_all_matching_switch_ports(): + combined = build_switch_combined_output( + _presentation_with_text_and_image(), + [ + {"operator": "contains_content", "value": "image"}, + {"operator": "contains_content", "value": "text"}, + ], + matched_indices=[0, 1], + value_kind="context", + ) + sw_out = wrapTransit(combined, {"match": 0, "matches": [0, 1]}) + node_outputs = {"sw1": sw_out} + conn_map = { + "loop_img": [("sw1", 0, 0)], + "file_txt": [("sw1", 1, 0)], + } + assert _is_node_on_active_path("loop_img", conn_map, node_outputs) + assert _is_node_on_active_path("file_txt", conn_map, node_outputs) + assert not _is_node_on_active_path("other", {"other": [("sw1", 2, 0)]}, node_outputs) + + +@pytest.mark.asyncio +async def test_loop_uses_switch_items_ref(): + executor = FlowExecutor() + pres = _presentation_with_text_and_image() + branch = build_switch_branch_payload( + pres, + {"operator": "contains_content", "value": "image"}, + value_kind="context", + match_index=0, + ) + sw_id = "sw1" + node_outputs = {sw_id: wrapTransit(branch, {"match": 0})} + + loop_node = { + "id": "loop1", + "type": "flow.loop", + "parameters": { + "items": {"type": "ref", "nodeId": sw_id, "path": ["items"]}, + }, + } + out = await executor._loop(loop_node, node_outputs, "loop1", {}) + assert out["count"] == 1 + assert out["items"][0]["value"]["typeGroup"] == "image" + + +def test_resolve_context_builder_ref_uses_switch_output_port(): + """file.create context ref to switch.items must use the wired source output port.""" + pres = _presentation_with_text_and_image() + combined = build_switch_combined_output( + pres, + [ + {"operator": "contains_content", "value": "image"}, + {"operator": "contains_content", "value": "text"}, + ], + matched_indices=[0, 1], + value_kind="context", + ) + sw_id = "sw1" + consumer_id = "fc1" + node_outputs = {sw_id: wrapTransit(combined, {"match": 0, "matches": [0, 1]})} + input_sources = {consumer_id: {0: (sw_id, 1)}} + resolved = resolveParameterReferences( + { + "context": [ + { + "type": "ref", + "nodeId": sw_id, + "path": ["items"], + } + ], + }, + node_outputs, + consumer_node_id=consumer_id, + input_sources=input_sources, + ) + items = resolved["context"] + assert isinstance(items, list) + assert len(items) == 1 + assert items[0]["value"]["typeGroup"] == "text" + branch = build_switch_branch_payload( + _presentation_with_text_and_image(), + {"operator": "contains_content", "value": "image"}, + value_kind="context", + match_index=0, + ) + node_outputs = {"sw1": wrapTransit(branch, {"match": 0})} + resolved = resolveParameterReferences( + {"type": "ref", "nodeId": "sw1", "path": ["items"]}, + node_outputs, + ) + assert isinstance(resolved, list) + assert len(resolved) == 1 + assert resolved[0]["value"]["typeGroup"] == "image" diff --git a/tests/unit/workflows/test_automation2_graphUtils.py b/tests/unit/workflows/test_automation2_graphUtils.py index f4249a1b..f76b9545 100644 --- a/tests/unit/workflows/test_automation2_graphUtils.py +++ b/tests/unit/workflows/test_automation2_graphUtils.py @@ -37,6 +37,34 @@ class TestValidateGraphStartNode: assert not any("no start node" in e.lower() for e in errs) + def test_switch_second_output_to_ai_prompt_ok(self): + from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES + + node_type_ids = {n["id"] for n in STATIC_NODE_TYPES} + graph = { + "nodes": [ + {"id": "t", "type": "trigger.manual", "parameters": {}}, + { + "id": "sw", + "type": "flow.switch", + "parameters": { + "cases": [ + {"operator": "contains_content", "value": "image"}, + {"operator": "contains_content", "value": "text"}, + ], + }, + }, + {"id": "ai", "type": "ai.prompt", "parameters": {"aiPrompt": "hi"}}, + ], + "connections": [ + {"source": "sw", "target": "ai", "sourceOutput": 1, "targetInput": 0}, + ], + } + errs = validateGraph(graph, node_type_ids) + port_errs = [e for e in errs if "Port mismatch" in e] + assert port_errs == [], port_errs + + class TestResolveParameterReferences: """Test structured ref/value resolution."""