resumed testing and handover improvement
This commit is contained in:
parent
4016ec31fa
commit
996cb4a775
13 changed files with 1046 additions and 65 deletions
|
|
@ -257,7 +257,7 @@ def _path_suggests_file(path: List[Any], producer_type: str) -> bool:
|
|||
return False
|
||||
|
||||
|
||||
def resolve_value_kind(graph: Dict[str, Any], ref: Dict[str, Any]) -> str:
|
||||
def resolve_value_kind(graph: Dict[str, Any], ref: Dict[str, Any], *, _skip_upstream: bool = False) -> str:
|
||||
"""Resolve condition valueKind for a DataRef against the workflow graph."""
|
||||
if not isinstance(ref, dict):
|
||||
return "unknown"
|
||||
|
|
@ -281,31 +281,32 @@ def resolve_value_kind(graph: Dict[str, Any], ref: Dict[str, Any]) -> str:
|
|||
return "string"
|
||||
return "file"
|
||||
|
||||
from modules.features.graphicalEditor.upstreamPathsService import compute_upstream_paths
|
||||
if not _skip_upstream:
|
||||
from modules.features.graphicalEditor.upstreamPathsService import compute_upstream_paths
|
||||
|
||||
target_id = graph.get("targetNodeId") or producer_id
|
||||
matched_type: Optional[str] = None
|
||||
for entry in compute_upstream_paths(graph, target_id):
|
||||
if entry.get("producerNodeId") != producer_id:
|
||||
continue
|
||||
entry_path = entry.get("path") or []
|
||||
if _paths_equal(list(entry_path), list(path)):
|
||||
matched_type = str(entry.get("type") or "Any")
|
||||
break
|
||||
|
||||
if matched_type is None and path:
|
||||
parent_path = list(path[:-1])
|
||||
target_id = graph.get("targetNodeId") or producer_id
|
||||
matched_type: Optional[str] = None
|
||||
for entry in compute_upstream_paths(graph, target_id):
|
||||
if entry.get("producerNodeId") != producer_id:
|
||||
continue
|
||||
if _paths_equal(list(entry.get("path") or []), parent_path):
|
||||
entry_path = entry.get("path") or []
|
||||
if _paths_equal(list(entry_path), list(path)):
|
||||
matched_type = str(entry.get("type") or "Any")
|
||||
break
|
||||
|
||||
if matched_type:
|
||||
vk = catalog_type_to_value_kind(matched_type)
|
||||
if vk != "unknown":
|
||||
return vk
|
||||
if matched_type is None and path:
|
||||
parent_path = list(path[:-1])
|
||||
for entry in compute_upstream_paths(graph, target_id):
|
||||
if entry.get("producerNodeId") != producer_id:
|
||||
continue
|
||||
if _paths_equal(list(entry.get("path") or []), parent_path):
|
||||
matched_type = str(entry.get("type") or "Any")
|
||||
break
|
||||
|
||||
if matched_type:
|
||||
vk = catalog_type_to_value_kind(matched_type)
|
||||
if vk != "unknown":
|
||||
return vk
|
||||
|
||||
if producer_type in ("trigger.form", "input.form") and path and str(path[0]) == "payload":
|
||||
return "string"
|
||||
|
|
@ -414,6 +415,12 @@ def _iter_presentation_parts(envelope: Dict[str, Any]) -> List[Dict[str, Any]]:
|
|||
if not isinstance(bucket, dict):
|
||||
continue
|
||||
data = bucket.get("data")
|
||||
mode = str(bucket.get("outputMode") or "").strip().lower()
|
||||
if mode == "blob" and isinstance(data, str):
|
||||
from modules.workflows.methods.methodContext.actions.extractContent import parse_blob_data_segments
|
||||
|
||||
parts.extend(parse_blob_data_segments(data))
|
||||
continue
|
||||
if isinstance(data, list):
|
||||
for slot in data:
|
||||
if isinstance(slot, dict):
|
||||
|
|
|
|||
|
|
@ -298,6 +298,21 @@ PORT_TYPE_CATALOG: Dict[str, PortSchema] = {
|
|||
PortField(name="merged", type="Dict",
|
||||
description="Zusammengeführte Daten"),
|
||||
]),
|
||||
"ContextBranch": PortSchema(name="ContextBranch", fields=[
|
||||
PortField(name="items", type="List[Any]",
|
||||
description="Schleifen-fertige Elemente aus dem (gefilterten) Kontext",
|
||||
recommended=True,
|
||||
picker_label=t("Gefilterte Elemente")),
|
||||
PortField(name="data", type="Dict", required=False,
|
||||
description="Gefilterter Presentation-Umschlag oder Eingabe-Spiegel",
|
||||
picker_label=t("Kontext (data)")),
|
||||
PortField(name="filterApplied", type="bool", required=False,
|
||||
description="True wenn ein Kontext-Inhaltsfilter angewendet wurde"),
|
||||
PortField(name="contentType", type="str", required=False,
|
||||
description="Angewendeter Inhaltstyp-Filter (z. B. image)"),
|
||||
PortField(name="match", type="int", required=False,
|
||||
description="Aktiver Ausgangs-Index (Fall oder Sonst)"),
|
||||
]),
|
||||
"ActionDocument": PortSchema(name="ActionDocument", fields=[
|
||||
PortField(name="documentName", type="str",
|
||||
description="Dokumentname",
|
||||
|
|
|
|||
308
modules/features/graphicalEditor/switchOutput.py
Normal file
308
modules/features/graphicalEditor/switchOutput.py
Normal file
|
|
@ -0,0 +1,308 @@
|
|||
# Copyright (c) 2025 Patrick Motsch
|
||||
"""Build flow.switch branch payloads: filtered context + loop-ready items."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import copy
|
||||
import re
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from modules.features.graphicalEditor.portTypes import unwrapTransit
|
||||
|
||||
_CONTEXT_FILTER_OPERATORS = frozenset({"contains_content"})
|
||||
_BLOB_IMAGE_CHUNK_RE = re.compile(r"^\[image(?:\:([^\]]+))?\]$")
|
||||
|
||||
|
||||
def _artifacts_by_part_id_from_presentation(inp: Any) -> Dict[str, str]:
|
||||
plain = _unwrap_input(inp)
|
||||
meta = plain.get("_meta") if isinstance(plain, dict) else None
|
||||
if not isinstance(meta, dict):
|
||||
return {}
|
||||
out: Dict[str, str] = {}
|
||||
for art in meta.get("persistedImageArtifacts") or []:
|
||||
if not isinstance(art, dict):
|
||||
continue
|
||||
sp = str(art.get("sourcePartId") or "").strip()
|
||||
fid = str(art.get("fileId") or "").strip()
|
||||
if sp and fid:
|
||||
out[sp] = fid
|
||||
return out
|
||||
|
||||
|
||||
def _enrich_image_slot(slot: Dict[str, Any], artifacts_by_part: Dict[str, str]) -> None:
|
||||
if (slot.get("typeGroup") or "").strip().lower() != "image":
|
||||
return
|
||||
existing = str(slot.get("embeddedImageFileId") or "").strip()
|
||||
if existing and existing in artifacts_by_part.values():
|
||||
return
|
||||
candidates: List[str] = []
|
||||
sid = str(slot.get("id") or "").strip()
|
||||
if sid:
|
||||
candidates.append(sid)
|
||||
data = slot.get("data")
|
||||
if isinstance(data, str):
|
||||
m = _BLOB_IMAGE_CHUNK_RE.fullmatch(data.strip())
|
||||
if m:
|
||||
tok = (m.group(1) or "").strip()
|
||||
if tok:
|
||||
candidates.append(tok)
|
||||
for cand in candidates:
|
||||
fid = artifacts_by_part.get(cand)
|
||||
if fid:
|
||||
slot["embeddedImageFileId"] = fid
|
||||
return
|
||||
|
||||
|
||||
def _slot_matches_content_type(slot: Dict[str, Any], content_type: str) -> bool:
|
||||
target = (content_type or "").strip().lower()
|
||||
if not target:
|
||||
return False
|
||||
tg = (slot.get("typeGroup") or slot.get("contentType") or "").strip().lower()
|
||||
if target == "media":
|
||||
return tg in ("image", "media", "video", "audio")
|
||||
if target == "text":
|
||||
return tg in ("text", "table", "structure")
|
||||
return tg == target
|
||||
|
||||
|
||||
def _filter_bucket_slots(bucket: Dict[str, Any], content_type: str) -> Dict[str, Any]:
|
||||
"""Return a copy of a presentation file bucket with filtered ``data`` slots."""
|
||||
mode = str(bucket.get("outputMode") or "").strip().lower()
|
||||
data = bucket.get("data")
|
||||
if mode == "blob" and isinstance(data, str):
|
||||
from modules.workflows.methods.methodContext.actions.extractContent import (
|
||||
filter_blob_bucket_by_content_type,
|
||||
)
|
||||
|
||||
return filter_blob_bucket_by_content_type(bucket, content_type)
|
||||
out = copy.deepcopy(bucket)
|
||||
if isinstance(data, list):
|
||||
out["data"] = [s for s in data if isinstance(s, dict) and _slot_matches_content_type(s, content_type)]
|
||||
elif isinstance(data, dict) and _slot_matches_content_type(data, content_type):
|
||||
out["data"] = data
|
||||
else:
|
||||
out["data"] = [] if isinstance(data, list) else data
|
||||
return out
|
||||
|
||||
|
||||
def _filter_presentation_envelope(envelope: Dict[str, Any], content_type: str) -> Dict[str, Any]:
|
||||
"""Filter all slots in a presentation envelope by content type group."""
|
||||
from modules.workflows.methods.methodContext.actions.extractContent import (
|
||||
PRESENTATION_KIND,
|
||||
PRESENTATION_SCHEMA_VERSION,
|
||||
)
|
||||
|
||||
out = copy.deepcopy(envelope)
|
||||
files = out.get("files") or {}
|
||||
if not isinstance(files, dict):
|
||||
return out
|
||||
filtered_files: Dict[str, Any] = {}
|
||||
kept_order: List[str] = []
|
||||
for fk in out.get("fileOrder") or list(files.keys()):
|
||||
bucket = files.get(fk)
|
||||
if not isinstance(bucket, dict):
|
||||
continue
|
||||
fb = _filter_bucket_slots(bucket, content_type)
|
||||
data = fb.get("data")
|
||||
has_data = (
|
||||
(isinstance(data, list) and len(data) > 0)
|
||||
or (isinstance(data, dict))
|
||||
or (isinstance(data, str) and str(data).strip())
|
||||
)
|
||||
if has_data:
|
||||
filtered_files[str(fk)] = fb
|
||||
kept_order.append(str(fk))
|
||||
out["schemaVersion"] = out.get("schemaVersion") or PRESENTATION_SCHEMA_VERSION
|
||||
out["kind"] = out.get("kind") or PRESENTATION_KIND
|
||||
out["fileOrder"] = kept_order
|
||||
out["files"] = filtered_files
|
||||
return out
|
||||
|
||||
|
||||
def _slots_from_bucket(bucket: Dict[str, Any]) -> List[Any]:
|
||||
data = bucket.get("data")
|
||||
mode = str(bucket.get("outputMode") or "").strip().lower()
|
||||
if mode == "blob" and isinstance(data, str) and data.strip():
|
||||
from modules.workflows.methods.methodContext.actions.extractContent import parse_blob_data_segments
|
||||
|
||||
return parse_blob_data_segments(data)
|
||||
if isinstance(data, list):
|
||||
return [s for s in data if isinstance(s, dict)]
|
||||
if isinstance(data, dict):
|
||||
return [data]
|
||||
if isinstance(data, str) and data.strip():
|
||||
return [{"typeGroup": "text", "data": data}]
|
||||
items = bucket.get("items")
|
||||
if isinstance(items, list):
|
||||
return [i for i in items if isinstance(i, dict)]
|
||||
return []
|
||||
|
||||
|
||||
def _items_from_presentation_envelope(
|
||||
envelope: Dict[str, Any],
|
||||
*,
|
||||
artifacts_by_part: Optional[Dict[str, str]] = None,
|
||||
) -> List[Any]:
|
||||
items: List[Any] = []
|
||||
files = envelope.get("files") or {}
|
||||
if not isinstance(files, dict):
|
||||
return items
|
||||
for fk in envelope.get("fileOrder") or list(files.keys()):
|
||||
bucket = files.get(fk)
|
||||
if isinstance(bucket, dict):
|
||||
for slot in _slots_from_bucket(bucket):
|
||||
if artifacts_by_part:
|
||||
_enrich_image_slot(slot, artifacts_by_part)
|
||||
sid = str(slot.get("id") or slot.get("label") or len(items))
|
||||
items.append({"name": f"{fk}:{sid}", "value": slot})
|
||||
return items
|
||||
|
||||
|
||||
def expand_items_from_input(raw: Any) -> List[Any]:
|
||||
"""Best-effort loop items from transit/presentation/list/dict input."""
|
||||
if raw is None:
|
||||
return []
|
||||
if isinstance(raw, dict) and isinstance(raw.get("items"), list):
|
||||
return list(raw["items"])
|
||||
plain = unwrapTransit(raw) if isinstance(raw, dict) and raw.get("_transit") else raw
|
||||
if isinstance(plain, dict) and isinstance(plain.get("items"), list):
|
||||
return list(plain["items"])
|
||||
from modules.workflows.methods.methodContext.actions.extractContent import (
|
||||
normalize_presentation_envelopes,
|
||||
)
|
||||
|
||||
envelopes = normalize_presentation_envelopes(plain)
|
||||
if envelopes:
|
||||
out: List[Any] = []
|
||||
for env in envelopes:
|
||||
out.extend(_items_from_presentation_envelope(env))
|
||||
return out
|
||||
if isinstance(plain, list):
|
||||
return list(plain)
|
||||
if isinstance(plain, dict):
|
||||
children = plain.get("children")
|
||||
if isinstance(children, list) and children:
|
||||
return list(children)
|
||||
return [{"name": k, "value": v} for k, v in plain.items()]
|
||||
return [plain]
|
||||
|
||||
|
||||
def _unwrap_input(inp: Any) -> Any:
|
||||
if isinstance(inp, dict) and inp.get("_transit"):
|
||||
return unwrapTransit(inp)
|
||||
return inp
|
||||
|
||||
|
||||
def build_switch_branch_payload(
|
||||
inp: Any,
|
||||
case: Dict[str, Any],
|
||||
*,
|
||||
value_kind: str = "unknown",
|
||||
match_index: int = 0,
|
||||
) -> Dict[str, Any]:
|
||||
"""Payload for a matched switch case (ContextBranch inner data)."""
|
||||
operator = str(case.get("operator") or "eq")
|
||||
right = case.get("value")
|
||||
plain_in = _unwrap_input(inp)
|
||||
|
||||
if operator in _CONTEXT_FILTER_OPERATORS and value_kind == "context":
|
||||
content_type = str(right or "")
|
||||
from modules.workflows.methods.methodContext.actions.extractContent import (
|
||||
normalize_presentation_envelopes,
|
||||
)
|
||||
|
||||
source = plain_in
|
||||
if isinstance(source, dict) and "data" in source and not source.get("kind"):
|
||||
nested = source.get("data")
|
||||
if isinstance(nested, dict):
|
||||
source = nested
|
||||
envelopes = normalize_presentation_envelopes(source)
|
||||
if not envelopes and isinstance(plain_in, dict):
|
||||
envelopes = normalize_presentation_envelopes(plain_in)
|
||||
filtered_envs = [_filter_presentation_envelope(env, content_type) for env in envelopes]
|
||||
artifacts_by_part = _artifacts_by_part_id_from_presentation(plain_in)
|
||||
items: List[Any] = []
|
||||
for env in filtered_envs:
|
||||
items.extend(_items_from_presentation_envelope(env, artifacts_by_part=artifacts_by_part))
|
||||
if len(filtered_envs) == 1:
|
||||
data_out: Any = filtered_envs[0]
|
||||
elif filtered_envs:
|
||||
data_out = {"envelopes": filtered_envs}
|
||||
else:
|
||||
data_out = {}
|
||||
return {
|
||||
"data": data_out,
|
||||
"items": items,
|
||||
"filterApplied": True,
|
||||
"contentType": content_type,
|
||||
"match": match_index,
|
||||
}
|
||||
|
||||
data_out = plain_in if isinstance(plain_in, dict) else {"value": plain_in}
|
||||
return {
|
||||
"data": data_out,
|
||||
"items": expand_items_from_input(inp),
|
||||
"filterApplied": False,
|
||||
"match": match_index,
|
||||
}
|
||||
|
||||
|
||||
def build_switch_default_payload(inp: Any, *, match_index: int) -> Dict[str, Any]:
|
||||
"""Sonst branch: unmodified input passthrough."""
|
||||
plain_in = _unwrap_input(inp)
|
||||
data_out = plain_in if isinstance(plain_in, dict) else {"value": plain_in}
|
||||
return {
|
||||
"data": data_out,
|
||||
"items": expand_items_from_input(inp),
|
||||
"filterApplied": False,
|
||||
"match": match_index,
|
||||
}
|
||||
|
||||
|
||||
def build_switch_combined_output(
|
||||
inp: Any,
|
||||
cases: List[Any],
|
||||
*,
|
||||
matched_indices: List[int],
|
||||
value_kind: str = "unknown",
|
||||
) -> Dict[str, Any]:
|
||||
"""Build per-port branch payloads; primary fields mirror the first active match."""
|
||||
branches: Dict[str, Dict[str, Any]] = {}
|
||||
default_idx = len(cases)
|
||||
for idx in matched_indices:
|
||||
if idx == default_idx:
|
||||
branches[str(idx)] = build_switch_default_payload(inp, match_index=default_idx)
|
||||
elif 0 <= idx < len(cases):
|
||||
c = cases[idx] if isinstance(cases[idx], dict) else {"operator": "eq", "value": cases[idx]}
|
||||
branches[str(idx)] = build_switch_branch_payload(
|
||||
inp, c, value_kind=value_kind, match_index=idx,
|
||||
)
|
||||
primary_idx = matched_indices[0] if matched_indices else default_idx
|
||||
primary = branches.get(str(primary_idx)) or build_switch_default_payload(inp, match_index=default_idx)
|
||||
return {**primary, "branches": branches}
|
||||
|
||||
|
||||
def switch_branch_payload(transit: Any, source_output: int) -> Optional[Dict[str, Any]]:
|
||||
"""Return the ContextBranch inner dict for a specific switch output port."""
|
||||
if not isinstance(transit, dict):
|
||||
return None
|
||||
data = transit.get("data") if transit.get("_transit") else transit
|
||||
if not isinstance(data, dict):
|
||||
return None
|
||||
branches = data.get("branches")
|
||||
if isinstance(branches, dict):
|
||||
branch = branches.get(str(source_output))
|
||||
if isinstance(branch, dict):
|
||||
return branch
|
||||
if transit.get("_transit"):
|
||||
return data
|
||||
return data
|
||||
|
||||
|
||||
def unwrap_transit_for_port(output: Any, source_output: Optional[int] = None) -> Any:
|
||||
"""Unwrap transit; when ``source_output`` is set, pick that switch branch payload."""
|
||||
if source_output is not None:
|
||||
branch = switch_branch_payload(output, source_output)
|
||||
if branch is not None:
|
||||
return branch
|
||||
return unwrapTransit(output)
|
||||
|
|
@ -4,7 +4,7 @@ from __future__ import annotations
|
|||
|
||||
from typing import Any, Dict, List, Set
|
||||
|
||||
from modules.features.graphicalEditor.conditionOperators import resolve_value_kind
|
||||
from modules.features.graphicalEditor.conditionOperators import catalog_type_to_value_kind, resolve_value_kind
|
||||
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
|
||||
from modules.features.graphicalEditor.portTypes import PORT_TYPE_CATALOG, PortSchema, parse_graph_defined_output_schema
|
||||
from modules.workflows.automation2.graphUtils import buildConnectionMap, getLoopBodyNodeIds, getLoopDoneNodeIds
|
||||
|
|
@ -169,12 +169,16 @@ def compute_upstream_paths(graph: Dict[str, Any], target_node_id: str) -> List[D
|
|||
)
|
||||
|
||||
for entry in paths:
|
||||
ref = {
|
||||
"nodeId": entry.get("producerNodeId"),
|
||||
"path": entry.get("path") or [],
|
||||
}
|
||||
graph_with_target = {**graph, "targetNodeId": target_node_id}
|
||||
entry["valueKind"] = resolve_value_kind(graph_with_target, ref)
|
||||
ct = str(entry.get("type") or "Any")
|
||||
vk = catalog_type_to_value_kind(ct)
|
||||
if vk == "unknown":
|
||||
ref = {
|
||||
"nodeId": entry.get("producerNodeId"),
|
||||
"path": entry.get("path") or [],
|
||||
}
|
||||
graph_with_target = {**graph, "targetNodeId": target_node_id}
|
||||
vk = resolve_value_kind(graph_with_target, ref, _skip_upstream=True)
|
||||
entry["valueKind"] = vk
|
||||
|
||||
return paths
|
||||
|
||||
|
|
|
|||
|
|
@ -163,10 +163,15 @@ def _is_node_on_active_path(
|
|||
meta = out.get("_meta", {}) if out.get("_transit") else out
|
||||
branch = meta.get("branch")
|
||||
match = meta.get("match")
|
||||
matches = meta.get("matches")
|
||||
|
||||
active_output = None
|
||||
if branch is not None:
|
||||
active_output = branch
|
||||
elif isinstance(matches, list) and matches:
|
||||
if source_output not in matches:
|
||||
return False
|
||||
continue
|
||||
elif match is not None:
|
||||
if match < 0:
|
||||
return False
|
||||
|
|
|
|||
|
|
@ -475,7 +475,7 @@ def _resolveUpstreamPayload(nodeId: str, context: Dict[str, Any]) -> Any:
|
|||
the first ``connectionMap`` entry so ``injectUpstreamPayload`` (e.g.
|
||||
``context.mergeContext`` after ``flow.loop``) still receives data.
|
||||
"""
|
||||
from modules.features.graphicalEditor.portTypes import unwrapTransit
|
||||
from modules.features.graphicalEditor.switchOutput import unwrap_transit_for_port
|
||||
|
||||
nodeOutputs = context.get("nodeOutputs") or {}
|
||||
connectionMap = context.get("connectionMap") or {}
|
||||
|
|
@ -496,25 +496,25 @@ def _resolveUpstreamPayload(nodeId: str, context: Dict[str, Any]) -> Any:
|
|||
|
||||
if not entry:
|
||||
return None
|
||||
src_node_id, _ = entry
|
||||
src_node_id, src_out = entry
|
||||
upstream = nodeOutputs.get(src_node_id)
|
||||
return unwrapTransit(upstream) if isinstance(upstream, dict) else upstream
|
||||
return unwrap_transit_for_port(upstream, src_out)
|
||||
|
||||
|
||||
def _resolveBranchInputs(nodeId: str, context: Dict[str, Any]) -> Dict[int, Any]:
|
||||
"""Return ``Dict[port_index → unwrapped upstream output]`` for every wired input port."""
|
||||
from modules.features.graphicalEditor.portTypes import unwrapTransit
|
||||
from modules.features.graphicalEditor.switchOutput import unwrap_transit_for_port
|
||||
src_map = (context.get("inputSources") or {}).get(nodeId) or {}
|
||||
nodeOutputs = context.get("nodeOutputs") or {}
|
||||
out: Dict[int, Any] = {}
|
||||
for port_ix, entry in src_map.items():
|
||||
if not entry:
|
||||
continue
|
||||
src_node_id, _ = entry
|
||||
src_node_id, src_out = entry
|
||||
upstream = nodeOutputs.get(src_node_id)
|
||||
if upstream is None:
|
||||
continue
|
||||
out[int(port_ix)] = unwrapTransit(upstream) if isinstance(upstream, dict) else upstream
|
||||
out[int(port_ix)] = unwrap_transit_for_port(upstream, src_out)
|
||||
return out
|
||||
|
||||
|
||||
|
|
@ -554,7 +554,12 @@ class ActionNodeExecutor:
|
|||
# 1. Resolve parameters (DataRef, SystemVar, Static)
|
||||
params = dict(node.get("parameters") or {})
|
||||
logger.debug("ActionNodeExecutor node %s raw params keys=%s", nodeId, list(params.keys()))
|
||||
resolvedParams = resolveParameterReferences(params, context.get("nodeOutputs", {}))
|
||||
resolvedParams = resolveParameterReferences(
|
||||
params,
|
||||
context.get("nodeOutputs", {}),
|
||||
consumer_node_id=nodeId,
|
||||
input_sources=context.get("inputSources"),
|
||||
)
|
||||
logger.debug("ActionNodeExecutor node %s resolved params keys=%s documentList_present=%s documentList_type=%s", nodeId, list(resolvedParams.keys()), "documentList" in resolvedParams, type(resolvedParams.get("documentList")).__name__)
|
||||
|
||||
# 2. Apply defaults from parameter definitions
|
||||
|
|
|
|||
|
|
@ -132,15 +132,24 @@ class FlowExecutor:
|
|||
value_kind = "unknown"
|
||||
ref_for_kind = left_ref if isinstance(left_ref, dict) else cond.get("ref")
|
||||
if isinstance(ref_for_kind, dict) and ref_for_kind.get("nodeId") and node:
|
||||
graph_stub = {
|
||||
"nodes": [{"id": node.get("id"), "type": node.get("type")}],
|
||||
"targetNodeId": node.get("id"),
|
||||
}
|
||||
graph_stub = self._graph_stub_for_ref(node, ref_for_kind, nodeOutputs)
|
||||
value_kind = resolve_value_kind(graph_stub, ref_for_kind)
|
||||
|
||||
return apply_condition_operator(left, str(operator), right, value_kind)
|
||||
|
||||
def _compare_dates(self, left: Any, right: Any, op) -> bool:
|
||||
def _graph_stub_for_ref(self, node: Dict, ref: Dict, nodeOutputs: Dict) -> Dict[str, Any]:
|
||||
"""Minimal graph for ``resolve_value_kind`` (includes value producer when known)."""
|
||||
nodes: List[Dict[str, Any]] = [{"id": node.get("id"), "type": node.get("type")}]
|
||||
producer_id = ref.get("nodeId")
|
||||
if producer_id:
|
||||
ctx = nodeOutputs.get("_context") if isinstance(nodeOutputs.get("_context"), dict) else {}
|
||||
graph_nodes = ctx.get("graphNodesById") if isinstance(ctx.get("graphNodesById"), dict) else {}
|
||||
pnode = graph_nodes.get(producer_id) if isinstance(graph_nodes, dict) else None
|
||||
if isinstance(pnode, dict):
|
||||
nodes.append({"id": producer_id, "type": pnode.get("type", "")})
|
||||
else:
|
||||
nodes.append({"id": producer_id, "type": ""})
|
||||
return {"nodes": nodes, "targetNodeId": node.get("id")}
|
||||
"""Compare left/right as dates; op(a,b) is the comparison."""
|
||||
|
||||
def parse(v):
|
||||
|
|
@ -197,23 +206,42 @@ class FlowExecutor:
|
|||
return bool(resolved)
|
||||
|
||||
async def _switch(self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict) -> Any:
|
||||
valueExpr = (node.get("parameters") or {}).get("value", "")
|
||||
params = node.get("parameters") or {}
|
||||
valueExpr = params.get("value", "")
|
||||
from modules.workflows.automation2.graphUtils import resolveParameterReferences
|
||||
value = resolveParameterReferences(valueExpr, nodeOutputs)
|
||||
cases = (node.get("parameters") or {}).get("cases", [])
|
||||
inp = self._getInputData(nodeId, {nodeId: inputSources}, nodeOutputs)
|
||||
for i, c in enumerate(cases):
|
||||
if self._evalSwitchCase(value, c):
|
||||
return wrapTransit(
|
||||
unwrapTransit(inp) if inp else inp,
|
||||
{"match": i, "value": value},
|
||||
)
|
||||
return wrapTransit(
|
||||
unwrapTransit(inp) if inp else inp,
|
||||
{"match": -1, "value": value},
|
||||
from modules.features.graphicalEditor.switchOutput import (
|
||||
build_switch_combined_output,
|
||||
build_switch_default_payload,
|
||||
)
|
||||
|
||||
def _evalSwitchCase(self, left: Any, case: Any) -> bool:
|
||||
value = resolveParameterReferences(valueExpr, nodeOutputs)
|
||||
cases = params.get("cases", []) or []
|
||||
value_kind = "unknown"
|
||||
if isinstance(valueExpr, dict) and valueExpr.get("type") == "ref":
|
||||
graph_stub = self._graph_stub_for_ref(node, valueExpr, nodeOutputs)
|
||||
value_kind = resolve_value_kind(graph_stub, valueExpr)
|
||||
inp = self._getInputData(nodeId, {nodeId: inputSources}, nodeOutputs)
|
||||
matched: List[int] = [
|
||||
i for i, c in enumerate(cases)
|
||||
if self._evalSwitchCase(value, c, value_kind=value_kind)
|
||||
]
|
||||
default_idx = len(cases) if isinstance(cases, list) else 0
|
||||
if not matched:
|
||||
matched = [default_idx]
|
||||
combined = build_switch_combined_output(
|
||||
inp, cases, matched_indices=matched, value_kind=value_kind,
|
||||
)
|
||||
return wrapTransit(
|
||||
combined,
|
||||
{
|
||||
"match": matched[0],
|
||||
"matches": matched,
|
||||
"value": value,
|
||||
"filterApplied": bool(combined.get("filterApplied")),
|
||||
},
|
||||
)
|
||||
|
||||
def _evalSwitchCase(self, left: Any, case: Any, *, value_kind: Optional[str] = None) -> bool:
|
||||
"""
|
||||
Evaluate a switch case. Case can be:
|
||||
- dict: {operator, value} - use operator to compare left vs value
|
||||
|
|
@ -225,14 +253,19 @@ class FlowExecutor:
|
|||
else:
|
||||
operator = "eq"
|
||||
right = case
|
||||
return apply_condition_operator(left, str(operator), right)
|
||||
return apply_condition_operator(left, str(operator), right, value_kind)
|
||||
|
||||
async def _loop(self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict) -> Any:
|
||||
params = node.get("parameters") or {}
|
||||
itemsPath = params.get("items", "[]")
|
||||
from modules.workflows.automation2.graphUtils import resolveParameterReferences
|
||||
|
||||
raw = resolveParameterReferences(itemsPath, nodeOutputs)
|
||||
raw = resolveParameterReferences(
|
||||
itemsPath,
|
||||
nodeOutputs,
|
||||
consumer_node_id=nodeId,
|
||||
input_sources=inputSources,
|
||||
)
|
||||
items = self._normalize_loop_items(raw)
|
||||
mode = (params.get("iterationMode") or "all").strip().lower()
|
||||
stride = params.get("iterationStride", 2)
|
||||
|
|
@ -245,6 +278,8 @@ class FlowExecutor:
|
|||
|
||||
def _normalize_loop_items(self, raw: Any) -> List[Any]:
|
||||
"""Coerce resolved `items` into a list (lists, dict children, or scalars)."""
|
||||
if isinstance(raw, dict) and isinstance(raw.get("items"), list):
|
||||
return self._expand_presentation_lines_loop_items(raw["items"])
|
||||
if isinstance(raw, list):
|
||||
return self._expand_presentation_lines_loop_items(raw)
|
||||
if isinstance(raw, dict):
|
||||
|
|
|
|||
|
|
@ -253,6 +253,8 @@ def _checkPortCompatibility(
|
|||
continue
|
||||
srcOutputPorts = srcDef.get("outputPorts", {})
|
||||
srcPort = srcOutputPorts.get(srcOut, {}) or {}
|
||||
if srcNode.get("type") == "flow.switch" and not srcPort.get("schema"):
|
||||
srcPort = srcOutputPorts.get(0, {}) or srcPort
|
||||
tgtPort = tgtInputPorts.get(tgtIn, {}) or {}
|
||||
|
||||
if not isinstance(srcPort, dict):
|
||||
|
|
@ -264,6 +266,9 @@ def _checkPortCompatibility(
|
|||
continue
|
||||
if src_schema in accepts:
|
||||
continue
|
||||
# ContextBranch is a typed Transit envelope (switch filtered branches).
|
||||
if src_schema == "ContextBranch" and ("Transit" in accepts or "ContextBranch" in accepts):
|
||||
continue
|
||||
# Port that only declares Transit behaves as an untyped sink (legacy graphs).
|
||||
if len(accepts) == 1 and accepts[0] == "Transit":
|
||||
continue
|
||||
|
|
@ -409,12 +414,21 @@ def _unwrapTypedRef(value: Any) -> Any:
|
|||
return value.get(primary, value)
|
||||
|
||||
|
||||
def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any:
|
||||
def resolveParameterReferences(
|
||||
value: Any,
|
||||
nodeOutputs: Dict[str, Any],
|
||||
*,
|
||||
consumer_node_id: Optional[str] = None,
|
||||
input_sources: Optional[Dict[str, Dict[int, tuple]]] = None,
|
||||
) -> Any:
|
||||
"""
|
||||
Resolve parameter references:
|
||||
- {{nodeId.output}} or {{nodeId.output.path}} in strings (legacy)
|
||||
- { "type": "ref", "nodeId": "...", "path": ["field", "nested"] } -> resolved value
|
||||
- { "type": "value", "value": ... } -> value (then recursively resolve)
|
||||
|
||||
When ``consumer_node_id`` and ``input_sources`` are set, refs to the wired
|
||||
upstream switch use that connection's output port (per-branch payload).
|
||||
"""
|
||||
import json
|
||||
import re
|
||||
|
|
@ -430,8 +444,13 @@ def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any:
|
|||
path = value.get("path")
|
||||
if node_id is not None and isinstance(path, (list, tuple)):
|
||||
data = nodeOutputs.get(node_id)
|
||||
# Unwrap transit envelopes to access the real data
|
||||
if isinstance(data, dict) and data.get("_transit"):
|
||||
wired = None
|
||||
if consumer_node_id and input_sources:
|
||||
wired = (input_sources.get(consumer_node_id) or {}).get(0)
|
||||
if wired and wired[0] == node_id:
|
||||
from modules.features.graphicalEditor.switchOutput import unwrap_transit_for_port
|
||||
data = unwrap_transit_for_port(data, wired[1])
|
||||
elif isinstance(data, dict) and data.get("_transit"):
|
||||
data = data.get("data", data)
|
||||
plist = list(path)
|
||||
resolved = _get_by_path(data, plist)
|
||||
|
|
@ -450,16 +469,34 @@ def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any:
|
|||
# Form nodes store fields under {"payload": {fieldName: …}}.
|
||||
# DataPicker emits bare field paths like ["url"]; try under payload.
|
||||
resolved = _get_by_path(data["payload"], plist)
|
||||
return resolveParameterReferences(resolved, nodeOutputs)
|
||||
return resolveParameterReferences(
|
||||
resolved,
|
||||
nodeOutputs,
|
||||
consumer_node_id=consumer_node_id,
|
||||
input_sources=input_sources,
|
||||
)
|
||||
return value
|
||||
if value.get("type") == "value":
|
||||
inner = value.get("value")
|
||||
return resolveParameterReferences(inner, nodeOutputs)
|
||||
return resolveParameterReferences(
|
||||
inner,
|
||||
nodeOutputs,
|
||||
consumer_node_id=consumer_node_id,
|
||||
input_sources=input_sources,
|
||||
)
|
||||
if value.get("type") == "system":
|
||||
variable = value.get("variable", "")
|
||||
from modules.features.graphicalEditor.portTypes import resolveSystemVariable
|
||||
return resolveSystemVariable(variable, nodeOutputs.get("_context", {}))
|
||||
return {k: resolveParameterReferences(v, nodeOutputs) for k, v in value.items()}
|
||||
return {
|
||||
k: resolveParameterReferences(
|
||||
v,
|
||||
nodeOutputs,
|
||||
consumer_node_id=consumer_node_id,
|
||||
input_sources=input_sources,
|
||||
)
|
||||
for k, v in value.items()
|
||||
}
|
||||
|
||||
if isinstance(value, str):
|
||||
def repl(m):
|
||||
|
|
@ -498,11 +535,27 @@ def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any:
|
|||
# contextBuilder: list where every item is a `{"type":"ref",...}` envelope.
|
||||
# Resolve each part; a single ref preserves the resolved type (str, list, dict).
|
||||
if value and all(isinstance(v, dict) and v.get("type") == "ref" for v in value):
|
||||
resolved_parts = [resolveParameterReferences(v, nodeOutputs) for v in value]
|
||||
resolved_parts = [
|
||||
resolveParameterReferences(
|
||||
v,
|
||||
nodeOutputs,
|
||||
consumer_node_id=consumer_node_id,
|
||||
input_sources=input_sources,
|
||||
)
|
||||
for v in value
|
||||
]
|
||||
if len(resolved_parts) == 1:
|
||||
return resolved_parts[0]
|
||||
return resolved_parts
|
||||
return [resolveParameterReferences(v, nodeOutputs) for v in value]
|
||||
return [
|
||||
resolveParameterReferences(
|
||||
v,
|
||||
nodeOutputs,
|
||||
consumer_node_id=consumer_node_id,
|
||||
input_sources=input_sources,
|
||||
)
|
||||
for v in value
|
||||
]
|
||||
return value
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -934,6 +934,52 @@ def _presentation_image_marker_in_data(part: Dict[str, Any]) -> Dict[str, Any]:
|
|||
return marker
|
||||
|
||||
|
||||
_BLOB_IMAGE_CHUNK_RE = re.compile(r"^\[image(?:\:([^\]]+))?\]$")
|
||||
|
||||
|
||||
def parse_blob_data_segments(data: str) -> List[Dict[str, Any]]:
|
||||
"""Split presentation ``blob`` ``data`` into virtual slots (text chunks + image markers)."""
|
||||
segments: List[Dict[str, Any]] = []
|
||||
if not isinstance(data, str) or not data.strip():
|
||||
return segments
|
||||
for idx, chunk in enumerate(data.split("\n\n")):
|
||||
piece = chunk.strip()
|
||||
if not piece:
|
||||
continue
|
||||
m = _BLOB_IMAGE_CHUNK_RE.fullmatch(piece)
|
||||
if m:
|
||||
token = (m.group(1) or "").strip()
|
||||
seg: Dict[str, Any] = {"typeGroup": "image", "mimeType": "image/*", "data": piece}
|
||||
if token:
|
||||
seg["id"] = token
|
||||
else:
|
||||
seg["id"] = f"blob_image_{idx}"
|
||||
segments.append(seg)
|
||||
else:
|
||||
segments.append({"typeGroup": "text", "mimeType": "text/plain", "data": piece, "id": f"blob_text_{idx}"})
|
||||
return segments
|
||||
|
||||
|
||||
def filter_blob_bucket_by_content_type(bucket: Dict[str, Any], content_type: str) -> Dict[str, Any]:
|
||||
"""Keep only blob segments matching ``content_type`` (re-join as ``\\n\\n`` string)."""
|
||||
out = copy.deepcopy(bucket)
|
||||
raw = out.get("data")
|
||||
if not isinstance(raw, str):
|
||||
return out
|
||||
target = (content_type or "").strip().lower()
|
||||
kept: List[str] = []
|
||||
for seg in parse_blob_data_segments(raw):
|
||||
tg = (seg.get("typeGroup") or "").strip().lower()
|
||||
if target == "media" and tg in ("image", "media", "video", "audio"):
|
||||
kept.append(str(seg.get("data") or ""))
|
||||
elif target == "text" and tg in ("text", "table", "structure"):
|
||||
kept.append(str(seg.get("data") or ""))
|
||||
elif tg == target:
|
||||
kept.append(str(seg.get("data") or ""))
|
||||
out["data"] = "\n\n".join(s for s in kept if s.strip())
|
||||
return out
|
||||
|
||||
|
||||
def _build_file_presentation(
|
||||
source_file_name: str,
|
||||
parts: List[Dict[str, Any]],
|
||||
|
|
@ -959,8 +1005,8 @@ def _build_file_presentation(
|
|||
tg = (p.get("typeGroup") or "").strip()
|
||||
if tg == "image":
|
||||
m = _presentation_image_marker_in_data(p)
|
||||
pid = str(m.get("partId") or "").strip()
|
||||
chunks_blob.append(f"[image:{pid}]" if pid else "[image]")
|
||||
token = str(m.get("embeddedImageFileId") or m.get("partId") or "").strip()
|
||||
chunks_blob.append(f"[image:{token}]" if token else "[image]")
|
||||
continue
|
||||
if _part_carries_plain_text(p):
|
||||
raw = p.get("data")
|
||||
|
|
@ -1433,6 +1479,20 @@ def normalize_presentation_envelopes(raw: Any) -> List[Dict[str, Any]]:
|
|||
file_key=str(raw.get("name") or "file_1"),
|
||||
)
|
||||
]
|
||||
if isinstance(raw.get("name"), str) and isinstance(raw.get("value"), dict):
|
||||
slot = raw["value"]
|
||||
if _is_presentation_line_slot(slot):
|
||||
bucket = {
|
||||
"outputMode": slot.get("outputMode") or "lines",
|
||||
"sourceFileName": "",
|
||||
"data": [slot],
|
||||
}
|
||||
return [
|
||||
presentation_envelope_from_file_bucket(
|
||||
bucket,
|
||||
file_key=str(raw.get("name") or "file_1"),
|
||||
)
|
||||
]
|
||||
if _is_presentation_file_bucket(raw):
|
||||
return [presentation_envelope_from_file_bucket(raw)]
|
||||
if _is_presentation_line_slot(raw):
|
||||
|
|
@ -1450,6 +1510,27 @@ def normalize_presentation_envelopes(raw: Any) -> List[Dict[str, Any]]:
|
|||
return []
|
||||
|
||||
|
||||
def _artifacts_by_part_id_from_meta(meta: Any) -> Dict[str, str]:
|
||||
out: Dict[str, str] = {}
|
||||
if not isinstance(meta, dict):
|
||||
return out
|
||||
for art in meta.get("persistedImageArtifacts") or []:
|
||||
if not isinstance(art, dict):
|
||||
continue
|
||||
sp = str(art.get("sourcePartId") or "").strip()
|
||||
fid = str(art.get("fileId") or "").strip()
|
||||
if sp and fid:
|
||||
out[sp] = fid
|
||||
return out
|
||||
|
||||
|
||||
def _collect_artifacts_by_part_id(envelopes: List[Dict[str, Any]]) -> Dict[str, str]:
|
||||
merged: Dict[str, str] = {}
|
||||
for envelope in envelopes:
|
||||
merged.update(_artifacts_by_part_id_from_meta(envelope.get("_meta")))
|
||||
return merged
|
||||
|
||||
|
||||
def presentation_envelopes_to_document_json(
|
||||
raw: Any,
|
||||
*,
|
||||
|
|
@ -1466,6 +1547,8 @@ def presentation_envelopes_to_document_json(
|
|||
"context must be presentation data from Inhalt extrahieren (kind=context.extractContent.presentation.v1)"
|
||||
)
|
||||
|
||||
artifacts_by_part = _collect_artifacts_by_part_id(envelopes)
|
||||
|
||||
sections: List[Dict[str, Any]] = []
|
||||
order = 0
|
||||
|
||||
|
|
@ -1496,8 +1579,35 @@ def presentation_envelopes_to_document_json(
|
|||
"elements": [{"content": {"inlineRuns": _parseInlineRuns(t)}}],
|
||||
})
|
||||
|
||||
def _append_image_slot(slot: Dict[str, Any]) -> None:
|
||||
def _resolve_image_file_id(slot: Dict[str, Any]) -> Optional[str]:
|
||||
fid = slot.get("embeddedImageFileId")
|
||||
if fid:
|
||||
return str(fid).strip() or None
|
||||
candidates: List[str] = []
|
||||
sid = str(slot.get("id") or "").strip()
|
||||
if sid:
|
||||
candidates.append(sid)
|
||||
raw_d = slot.get("data")
|
||||
if isinstance(raw_d, str):
|
||||
m = _BLOB_IMAGE_CHUNK_RE.fullmatch(raw_d.strip())
|
||||
if m:
|
||||
tok = (m.group(1) or "").strip()
|
||||
if tok:
|
||||
candidates.append(tok)
|
||||
for cand in candidates:
|
||||
if cand in artifacts_by_part:
|
||||
return artifacts_by_part[cand]
|
||||
# Marker may already carry the persisted storage file id.
|
||||
try:
|
||||
blob = _load_image_bytes_by_file_id(services, cand)
|
||||
if blob:
|
||||
return cand
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
def _append_image_slot(slot: Dict[str, Any]) -> None:
|
||||
fid = _resolve_image_file_id(slot)
|
||||
if not fid:
|
||||
raise ValueError(
|
||||
"image slot is missing embeddedImageFileId — "
|
||||
|
|
@ -1589,6 +1699,11 @@ def presentation_envelopes_to_document_json(
|
|||
if src:
|
||||
_append_heading(src)
|
||||
raw_data = bucket.get("data")
|
||||
mode = str(bucket.get("outputMode") or "").strip().lower()
|
||||
if isinstance(raw_data, str) and mode == "blob":
|
||||
for seg in parse_blob_data_segments(raw_data):
|
||||
_append_slot(seg)
|
||||
return
|
||||
if isinstance(raw_data, str):
|
||||
_append_paragraph(raw_data)
|
||||
return
|
||||
|
|
|
|||
|
|
@ -50,6 +50,25 @@ def test_parse_graph_defined_schema_nested_group():
|
|||
assert "addr.zip" in names
|
||||
|
||||
|
||||
def test_compute_upstream_paths_switch_context_branch_items():
|
||||
graph = {
|
||||
"nodes": [
|
||||
{"id": "ext1", "type": "context.extractContent", "parameters": {}},
|
||||
{"id": "sw1", "type": "flow.switch", "parameters": {"cases": [{"operator": "contains_content", "value": "image"}]}},
|
||||
{"id": "ai1", "type": "ai.prompt", "parameters": {"aiPrompt": "summarize"}},
|
||||
],
|
||||
"connections": [
|
||||
{"source": "ext1", "target": "sw1", "sourceOutput": 0, "targetInput": 0},
|
||||
{"source": "sw1", "target": "ai1", "sourceOutput": 0, "targetInput": 0},
|
||||
],
|
||||
}
|
||||
paths = compute_upstream_paths(graph, "ai1")
|
||||
sw_paths = [p for p in paths if p.get("producerNodeId") == "sw1"]
|
||||
items_paths = [p for p in sw_paths if p.get("path") == ["items"]]
|
||||
assert items_paths, sw_paths
|
||||
assert items_paths[0].get("type") == "List[Any]"
|
||||
|
||||
|
||||
def test_validate_graph_port_mismatch_errors():
|
||||
node_type_ids = {n["id"] for n in STATIC_NODE_TYPES}
|
||||
graph = {
|
||||
|
|
|
|||
|
|
@ -61,6 +61,34 @@ def test_context_contains_content(executor):
|
|||
assert executor._evalStructuredCondition(cond, {"n1": presentation}, item_param={"type": "ref", "nodeId": "n1", "path": []}, node={"id": "if1", "type": "flow.ifElse"}) is True
|
||||
|
||||
|
||||
def test_context_contains_content_blob_mode(executor):
|
||||
presentation = {
|
||||
"kind": PRESENTATION_KIND,
|
||||
"outputMode": "blob",
|
||||
"fileOrder": ["f1"],
|
||||
"files": {
|
||||
"f1": {
|
||||
"outputMode": "blob",
|
||||
"data": "Invoice text\n\n[image:abc123]",
|
||||
}
|
||||
},
|
||||
}
|
||||
img_cond = {"type": "condition", "operator": "contains_content", "value": "image"}
|
||||
txt_cond = {"type": "condition", "operator": "contains_content", "value": "text"}
|
||||
item = {"type": "ref", "nodeId": "n1", "path": []}
|
||||
node = {"id": "if1", "type": "flow.ifElse"}
|
||||
assert executor._evalStructuredCondition(img_cond, {"n1": presentation}, item_param=item, node=node) is True
|
||||
assert executor._evalStructuredCondition(txt_cond, {"n1": presentation}, item_param=item, node=node) is True
|
||||
|
||||
|
||||
def test_switch_uses_shared_operators(executor):
|
||||
assert executor._evalSwitchCase("abc", {"operator": "starts_with", "value": "ab"}) is True
|
||||
assert executor._evalSwitchCase([1, 2], {"operator": "length_eq", "value": 2}) is True
|
||||
|
||||
|
||||
def test_switch_resolves_value_kind_for_string_ops(executor):
|
||||
assert executor._evalSwitchCase(
|
||||
"hello",
|
||||
{"operator": "starts_with", "value": "he"},
|
||||
value_kind="string",
|
||||
) is True
|
||||
|
|
|
|||
359
tests/unit/workflow/test_switch_filtered_output.py
Normal file
359
tests/unit/workflow/test_switch_filtered_output.py
Normal file
|
|
@ -0,0 +1,359 @@
|
|||
# Copyright (c) 2025 Patrick Motsch
|
||||
"""flow.switch ContextBranch: filtered presentation + loop-ready items."""
|
||||
|
||||
import pytest
|
||||
|
||||
from modules.features.graphicalEditor.portTypes import unwrapTransit, wrapTransit
|
||||
from modules.features.graphicalEditor.switchOutput import (
|
||||
build_switch_branch_payload,
|
||||
build_switch_combined_output,
|
||||
build_switch_default_payload,
|
||||
unwrap_transit_for_port,
|
||||
)
|
||||
from modules.workflows.automation2.executionEngine import _is_node_on_active_path
|
||||
from modules.workflows.automation2.executors.flowExecutor import FlowExecutor
|
||||
from modules.workflows.automation2.graphUtils import resolveParameterReferences
|
||||
from modules.workflows.methods.methodContext.actions.extractContent import PRESENTATION_KIND
|
||||
|
||||
|
||||
def _presentation_with_text_and_image():
|
||||
return {
|
||||
"kind": PRESENTATION_KIND,
|
||||
"schemaVersion": "1",
|
||||
"outputMode": "parts",
|
||||
"fileOrder": ["doc"],
|
||||
"files": {
|
||||
"doc": {
|
||||
"outputMode": "parts",
|
||||
"data": [
|
||||
{"typeGroup": "text", "id": "t1", "data": "Hello"},
|
||||
{"typeGroup": "image", "id": "i1", "mimeType": "image/png", "data": "YQ=="},
|
||||
],
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _presentation_blob_with_text_and_image():
|
||||
blob_data = "Hello world\n\n[image:img1]\n\nMore text"
|
||||
return {
|
||||
"kind": PRESENTATION_KIND,
|
||||
"schemaVersion": "1",
|
||||
"outputMode": "blob",
|
||||
"fileOrder": ["doc"],
|
||||
"files": {
|
||||
"doc": {
|
||||
"outputMode": "blob",
|
||||
"sourceFileName": "test.pdf",
|
||||
"data": blob_data,
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def test_build_switch_branch_payload_filters_blob_image():
|
||||
pres = _presentation_blob_with_text_and_image()
|
||||
payload = build_switch_branch_payload(
|
||||
pres,
|
||||
{"operator": "contains_content", "value": "image"},
|
||||
value_kind="context",
|
||||
match_index=0,
|
||||
)
|
||||
assert payload["filterApplied"] is True
|
||||
assert len(payload["items"]) == 1
|
||||
assert payload["items"][0]["value"]["typeGroup"] == "image"
|
||||
assert "[image:img1]" in payload["data"]["files"]["doc"]["data"]
|
||||
|
||||
|
||||
def test_build_switch_branch_payload_filters_blob_text():
|
||||
pres = _presentation_blob_with_text_and_image()
|
||||
payload = build_switch_branch_payload(
|
||||
pres,
|
||||
{"operator": "contains_content", "value": "text"},
|
||||
value_kind="context",
|
||||
match_index=1,
|
||||
)
|
||||
assert payload["filterApplied"] is True
|
||||
assert len(payload["items"]) == 2
|
||||
assert all(i["value"]["typeGroup"] == "text" for i in payload["items"])
|
||||
filtered = payload["data"]["files"]["doc"]["data"]
|
||||
assert "Hello world" in filtered
|
||||
assert "[image:" not in filtered
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_switch_blob_multi_match():
|
||||
executor = FlowExecutor()
|
||||
pres = _presentation_blob_with_text_and_image()
|
||||
sw_id = "sw1"
|
||||
node_outputs = {
|
||||
"ext1": pres,
|
||||
"_context": {
|
||||
"graphNodesById": {
|
||||
"ext1": {"id": "ext1", "type": "context.extractContent"},
|
||||
sw_id: {"id": sw_id, "type": "flow.switch"},
|
||||
}
|
||||
},
|
||||
}
|
||||
executor._getInputData = lambda *_a, **_k: pres # type: ignore[method-assign]
|
||||
node = {
|
||||
"id": sw_id,
|
||||
"type": "flow.switch",
|
||||
"parameters": {
|
||||
"value": {"type": "ref", "nodeId": "ext1", "path": []},
|
||||
"cases": [
|
||||
{"operator": "contains_content", "value": "image"},
|
||||
{"operator": "contains_content", "value": "text"},
|
||||
],
|
||||
},
|
||||
}
|
||||
out = await executor._switch(node, node_outputs, sw_id, {})
|
||||
assert out["_meta"]["matches"] == [0, 1]
|
||||
assert len(unwrap_transit_for_port(out, 0)["items"]) == 1
|
||||
assert len(unwrap_transit_for_port(out, 1)["items"]) == 2
|
||||
|
||||
|
||||
def test_switch_blob_image_items_get_embedded_file_id():
|
||||
part_id = "dbd27119-cd21-4a62-b5e2-b06d3b81470b"
|
||||
file_id = "storage-file-uuid-1"
|
||||
pres = {
|
||||
"kind": PRESENTATION_KIND,
|
||||
"outputMode": "blob",
|
||||
"fileOrder": ["doc"],
|
||||
"files": {
|
||||
"doc": {
|
||||
"outputMode": "blob",
|
||||
"data": f"Hello\n\n[image:{part_id}]",
|
||||
}
|
||||
},
|
||||
"_meta": {
|
||||
"persistedImageArtifacts": [
|
||||
{"sourcePartId": part_id, "fileId": file_id, "mimeType": "image/png"},
|
||||
]
|
||||
},
|
||||
}
|
||||
payload = build_switch_branch_payload(
|
||||
pres,
|
||||
{"operator": "contains_content", "value": "image"},
|
||||
value_kind="context",
|
||||
match_index=0,
|
||||
)
|
||||
assert len(payload["items"]) == 1
|
||||
slot = payload["items"][0]["value"]
|
||||
assert slot.get("embeddedImageFileId") == file_id
|
||||
|
||||
|
||||
def test_build_switch_branch_payload_filters_images():
|
||||
pres = _presentation_with_text_and_image()
|
||||
case = {"operator": "contains_content", "value": "image"}
|
||||
payload = build_switch_branch_payload(
|
||||
pres,
|
||||
case,
|
||||
value_kind="context",
|
||||
match_index=0,
|
||||
)
|
||||
assert payload["filterApplied"] is True
|
||||
assert payload["contentType"] == "image"
|
||||
assert len(payload["items"]) == 1
|
||||
assert payload["items"][0]["value"]["typeGroup"] == "image"
|
||||
data = payload["data"]
|
||||
assert data["kind"] == PRESENTATION_KIND
|
||||
slots = data["files"]["doc"]["data"]
|
||||
assert len(slots) == 1
|
||||
assert slots[0]["typeGroup"] == "image"
|
||||
|
||||
|
||||
def test_build_switch_default_payload_passthrough():
|
||||
pres = _presentation_with_text_and_image()
|
||||
payload = build_switch_default_payload(pres, match_index=2)
|
||||
assert payload["filterApplied"] is False
|
||||
assert payload["match"] == 2
|
||||
assert payload["data"]["fileOrder"] == pres["fileOrder"]
|
||||
assert len(payload["items"]) == 2
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_switch_executor_match_and_default_branch():
|
||||
executor = FlowExecutor()
|
||||
pres = _presentation_with_text_and_image()
|
||||
ext_id = "ext1"
|
||||
sw_id = "sw1"
|
||||
node_outputs = {
|
||||
ext_id: pres,
|
||||
"_context": {
|
||||
"graphNodesById": {
|
||||
ext_id: {"id": ext_id, "type": "context.extractContent"},
|
||||
sw_id: {"id": sw_id, "type": "flow.switch"},
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
def _inp(_nid, _sources, _outputs, _output_index=0):
|
||||
return pres
|
||||
|
||||
executor._getInputData = _inp # type: ignore[method-assign]
|
||||
|
||||
match_node = {
|
||||
"id": sw_id,
|
||||
"type": "flow.switch",
|
||||
"parameters": {
|
||||
"value": {"type": "ref", "nodeId": ext_id, "path": []},
|
||||
"cases": [{"operator": "contains_content", "value": "image"}],
|
||||
},
|
||||
}
|
||||
match_out = await executor._switch(match_node, node_outputs, sw_id, {})
|
||||
match_payload = unwrapTransit(match_out)
|
||||
assert match_out["_meta"]["match"] == 0
|
||||
assert match_out["_meta"]["matches"] == [0]
|
||||
assert match_payload["filterApplied"] is True
|
||||
assert len(match_payload["items"]) == 1
|
||||
assert match_payload["branches"]["0"]["contentType"] == "image"
|
||||
|
||||
default_node = {
|
||||
**match_node,
|
||||
"parameters": {
|
||||
**match_node["parameters"],
|
||||
"cases": [{"operator": "contains_content", "value": "video"}],
|
||||
},
|
||||
}
|
||||
default_out = await executor._switch(default_node, node_outputs, sw_id, {})
|
||||
assert default_out["_meta"]["match"] == 1
|
||||
assert default_out["_meta"]["matches"] == [1]
|
||||
default_payload = unwrapTransit(default_out)
|
||||
assert default_payload["filterApplied"] is False
|
||||
assert default_payload["data"]["fileOrder"] == pres["fileOrder"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_switch_multi_match_text_and_image_branches():
|
||||
executor = FlowExecutor()
|
||||
pres = _presentation_with_text_and_image()
|
||||
sw_id = "sw1"
|
||||
node_outputs = {
|
||||
"ext1": pres,
|
||||
"_context": {
|
||||
"graphNodesById": {
|
||||
"ext1": {"id": "ext1", "type": "context.extractContent"},
|
||||
sw_id: {"id": sw_id, "type": "flow.switch"},
|
||||
}
|
||||
},
|
||||
}
|
||||
executor._getInputData = lambda *_a, **_k: pres # type: ignore[method-assign]
|
||||
|
||||
node = {
|
||||
"id": sw_id,
|
||||
"type": "flow.switch",
|
||||
"parameters": {
|
||||
"value": {"type": "ref", "nodeId": "ext1", "path": []},
|
||||
"cases": [
|
||||
{"operator": "contains_content", "value": "image"},
|
||||
{"operator": "contains_content", "value": "text"},
|
||||
],
|
||||
},
|
||||
}
|
||||
out = await executor._switch(node, node_outputs, sw_id, {})
|
||||
assert out["_meta"]["matches"] == [0, 1]
|
||||
img = unwrap_transit_for_port(out, 0)
|
||||
txt = unwrap_transit_for_port(out, 1)
|
||||
assert img["contentType"] == "image"
|
||||
assert txt["contentType"] == "text"
|
||||
assert len(img["items"]) == 1
|
||||
assert len(txt["items"]) == 1
|
||||
assert img["items"][0]["value"]["typeGroup"] == "image"
|
||||
assert txt["items"][0]["value"]["typeGroup"] == "text"
|
||||
|
||||
|
||||
def test_active_path_allows_all_matching_switch_ports():
|
||||
combined = build_switch_combined_output(
|
||||
_presentation_with_text_and_image(),
|
||||
[
|
||||
{"operator": "contains_content", "value": "image"},
|
||||
{"operator": "contains_content", "value": "text"},
|
||||
],
|
||||
matched_indices=[0, 1],
|
||||
value_kind="context",
|
||||
)
|
||||
sw_out = wrapTransit(combined, {"match": 0, "matches": [0, 1]})
|
||||
node_outputs = {"sw1": sw_out}
|
||||
conn_map = {
|
||||
"loop_img": [("sw1", 0, 0)],
|
||||
"file_txt": [("sw1", 1, 0)],
|
||||
}
|
||||
assert _is_node_on_active_path("loop_img", conn_map, node_outputs)
|
||||
assert _is_node_on_active_path("file_txt", conn_map, node_outputs)
|
||||
assert not _is_node_on_active_path("other", {"other": [("sw1", 2, 0)]}, node_outputs)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_loop_uses_switch_items_ref():
|
||||
executor = FlowExecutor()
|
||||
pres = _presentation_with_text_and_image()
|
||||
branch = build_switch_branch_payload(
|
||||
pres,
|
||||
{"operator": "contains_content", "value": "image"},
|
||||
value_kind="context",
|
||||
match_index=0,
|
||||
)
|
||||
sw_id = "sw1"
|
||||
node_outputs = {sw_id: wrapTransit(branch, {"match": 0})}
|
||||
|
||||
loop_node = {
|
||||
"id": "loop1",
|
||||
"type": "flow.loop",
|
||||
"parameters": {
|
||||
"items": {"type": "ref", "nodeId": sw_id, "path": ["items"]},
|
||||
},
|
||||
}
|
||||
out = await executor._loop(loop_node, node_outputs, "loop1", {})
|
||||
assert out["count"] == 1
|
||||
assert out["items"][0]["value"]["typeGroup"] == "image"
|
||||
|
||||
|
||||
def test_resolve_context_builder_ref_uses_switch_output_port():
|
||||
"""file.create context ref to switch.items must use the wired source output port."""
|
||||
pres = _presentation_with_text_and_image()
|
||||
combined = build_switch_combined_output(
|
||||
pres,
|
||||
[
|
||||
{"operator": "contains_content", "value": "image"},
|
||||
{"operator": "contains_content", "value": "text"},
|
||||
],
|
||||
matched_indices=[0, 1],
|
||||
value_kind="context",
|
||||
)
|
||||
sw_id = "sw1"
|
||||
consumer_id = "fc1"
|
||||
node_outputs = {sw_id: wrapTransit(combined, {"match": 0, "matches": [0, 1]})}
|
||||
input_sources = {consumer_id: {0: (sw_id, 1)}}
|
||||
resolved = resolveParameterReferences(
|
||||
{
|
||||
"context": [
|
||||
{
|
||||
"type": "ref",
|
||||
"nodeId": sw_id,
|
||||
"path": ["items"],
|
||||
}
|
||||
],
|
||||
},
|
||||
node_outputs,
|
||||
consumer_node_id=consumer_id,
|
||||
input_sources=input_sources,
|
||||
)
|
||||
items = resolved["context"]
|
||||
assert isinstance(items, list)
|
||||
assert len(items) == 1
|
||||
assert items[0]["value"]["typeGroup"] == "text"
|
||||
branch = build_switch_branch_payload(
|
||||
_presentation_with_text_and_image(),
|
||||
{"operator": "contains_content", "value": "image"},
|
||||
value_kind="context",
|
||||
match_index=0,
|
||||
)
|
||||
node_outputs = {"sw1": wrapTransit(branch, {"match": 0})}
|
||||
resolved = resolveParameterReferences(
|
||||
{"type": "ref", "nodeId": "sw1", "path": ["items"]},
|
||||
node_outputs,
|
||||
)
|
||||
assert isinstance(resolved, list)
|
||||
assert len(resolved) == 1
|
||||
assert resolved[0]["value"]["typeGroup"] == "image"
|
||||
|
|
@ -37,6 +37,34 @@ class TestValidateGraphStartNode:
|
|||
assert not any("no start node" in e.lower() for e in errs)
|
||||
|
||||
|
||||
def test_switch_second_output_to_ai_prompt_ok(self):
|
||||
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
|
||||
|
||||
node_type_ids = {n["id"] for n in STATIC_NODE_TYPES}
|
||||
graph = {
|
||||
"nodes": [
|
||||
{"id": "t", "type": "trigger.manual", "parameters": {}},
|
||||
{
|
||||
"id": "sw",
|
||||
"type": "flow.switch",
|
||||
"parameters": {
|
||||
"cases": [
|
||||
{"operator": "contains_content", "value": "image"},
|
||||
{"operator": "contains_content", "value": "text"},
|
||||
],
|
||||
},
|
||||
},
|
||||
{"id": "ai", "type": "ai.prompt", "parameters": {"aiPrompt": "hi"}},
|
||||
],
|
||||
"connections": [
|
||||
{"source": "sw", "target": "ai", "sourceOutput": 1, "targetInput": 0},
|
||||
],
|
||||
}
|
||||
errs = validateGraph(graph, node_type_ids)
|
||||
port_errs = [e for e in errs if "Port mismatch" in e]
|
||||
assert port_errs == [], port_errs
|
||||
|
||||
|
||||
class TestResolveParameterReferences:
|
||||
"""Test structured ref/value resolution."""
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue