feat: if/else loop extended to progressive comparison mode

This commit is contained in:
Ida 2026-05-14 18:38:18 +02:00
parent 716837e8fb
commit 7a1deccc2d
14 changed files with 1579 additions and 458 deletions

View file

@ -0,0 +1,605 @@
# Copyright (c) 2025 Patrick Motsch
"""Backend-driven condition operator catalog and value-kind resolution for flow.ifElse."""
from __future__ import annotations

import contextvars
import logging
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
from modules.shared.i18nRegistry import resolveText, t
logger = logging.getLogger(__name__)
# Value kinds a condition's left-hand side can resolve to. The same names are
# used as keys of the operator catalog returned by _build_catalog().
VALUE_KINDS = (
"string",
"number",
"boolean",
"datetime",
"array",
"object",
"file",
"context",
"unknown",
)
# Choices offered for the "contains_content" context operator.
CONTENT_TYPE_OPTIONS = ("text", "image", "table", "code", "media")
# Choices offered for the "output_mode_is" context operator.
OUTPUT_MODE_OPTIONS = ("blob", "lines", "pages", "chunks", "structured")
# Choices offered for the "language_is" context operator.
LANGUAGE_OPTIONS = ("de", "en", "fr", "it")
# Choices offered for the "mime_is" file operator (examples, not an
# exhaustive list; "mime_contains" takes free text instead).
MIME_EXAMPLE_OPTIONS = (
"application/pdf",
"image/png",
"image/jpeg",
"text/plain",
"text/csv",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
)
# Static node definitions indexed by their "id" field.
# NOTE(review): not referenced in the visible part of this module — confirm it is still needed.
_NODE_BY_TYPE = {n["id"]: n for n in STATIC_NODE_TYPES}
def _op(
op_id: str,
label_key: str,
*,
needs_value: bool = True,
value_input: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
out: Dict[str, Any] = {"id": op_id, "labelKey": label_key, "needsValue": needs_value}
if value_input is not None:
out["valueInput"] = value_input
return out
def _build_catalog() -> Dict[str, List[Dict[str, Any]]]:
    """Assemble the operator list for every value kind.

    Returns a dict keyed by value kind ("string", "number", ...) whose values
    are ordered lists of operator descriptors produced by ``_op``. The plain
    input specs (text / number / date / regex) are single dict instances
    shared by all operators that use them.
    """
    text_in = {"kind": "text"}
    num_in = {"kind": "number"}
    date_in = {"kind": "date"}
    regex_in = {"kind": "regex"}

    def choice(options, kind: str) -> Dict[str, Any]:
        # Select-style input: a fixed list of options tagged with its kind.
        return {"kind": kind, "options": list(options)}

    def emptiness() -> List[Dict[str, Any]]:
        # The empty / not_empty pair that most kinds offer; fresh dicts per call.
        return [
            _op("empty", "condition.op.empty", needs_value=False),
            _op("not_empty", "condition.op.not_empty", needs_value=False),
        ]

    catalog: Dict[str, List[Dict[str, Any]]] = {}

    catalog["string"] = [
        _op("eq", "condition.op.eq", value_input=text_in),
        _op("neq", "condition.op.neq", value_input=text_in),
        _op("contains", "condition.op.contains", value_input=text_in),
        _op("not_contains", "condition.op.not_contains", value_input=text_in),
        _op("starts_with", "condition.op.starts_with", value_input=text_in),
        _op("ends_with", "condition.op.ends_with", value_input=text_in),
        _op("regex", "condition.op.regex", value_input=regex_in),
    ] + emptiness()

    catalog["number"] = [
        _op("eq", "condition.op.eq", value_input=num_in),
        _op("neq", "condition.op.neq", value_input=num_in),
        _op("lt", "condition.op.lt", value_input=num_in),
        _op("lte", "condition.op.lte", value_input=num_in),
        _op("gt", "condition.op.gt", value_input=num_in),
        _op("gte", "condition.op.gte", value_input=num_in),
    ] + emptiness()

    catalog["boolean"] = [
        _op("is_true", "condition.op.is_true", needs_value=False),
        _op("is_false", "condition.op.is_false", needs_value=False),
    ]

    catalog["datetime"] = [
        _op("eq", "condition.op.eq", value_input=date_in),
        _op("neq", "condition.op.neq", value_input=date_in),
        _op("before", "condition.op.before", value_input=date_in),
        _op("after", "condition.op.after", value_input=date_in),
    ] + emptiness()

    catalog["array"] = [
        _op("contains", "condition.op.contains", value_input=text_in),
        _op("not_contains", "condition.op.not_contains", value_input=text_in),
    ] + emptiness() + [
        _op("length_eq", "condition.op.length_eq", value_input=num_in),
        _op("length_gt", "condition.op.length_gt", value_input=num_in),
        _op("length_lt", "condition.op.length_lt", value_input=num_in),
    ]

    catalog["object"] = emptiness()

    catalog["file"] = [
        _op("exists", "condition.op.exists", needs_value=False),
        _op("not_exists", "condition.op.not_exists", needs_value=False),
        _op("mime_is", "condition.op.mime_is", value_input=choice(MIME_EXAMPLE_OPTIONS, "mime")),
        _op("mime_contains", "condition.op.mime_contains", value_input=text_in),
    ] + emptiness()

    catalog["context"] = [
        _op(
            "contains_content",
            "condition.op.contains_content",
            value_input=choice(CONTENT_TYPE_OPTIONS, "contentType"),
        ),
        _op("language_is", "condition.op.language_is", value_input=choice(LANGUAGE_OPTIONS, "language")),
        _op(
            "output_mode_is",
            "condition.op.output_mode_is",
            value_input=choice(OUTPUT_MODE_OPTIONS, "outputMode"),
        ),
        _op("file_count_eq", "condition.op.file_count_eq", value_input=num_in),
        _op("file_count_gt", "condition.op.file_count_gt", value_input=num_in),
        _op("file_count_lt", "condition.op.file_count_lt", value_input=num_in),
        _op("slot_count_eq", "condition.op.slot_count_eq", value_input=num_in),
        _op("slot_count_gt", "condition.op.slot_count_gt", value_input=num_in),
        _op("slot_count_lt", "condition.op.slot_count_lt", value_input=num_in),
        _op("regex_on_text", "condition.op.regex_on_text", value_input=regex_in),
    ] + emptiness()

    catalog["unknown"] = [
        _op("eq", "condition.op.eq", value_input=text_in),
    ] + emptiness()

    return catalog


# Built once at import time; treat as read-only.
CONDITION_OPERATOR_CATALOG: Dict[str, List[Dict[str, Any]]] = _build_catalog()
# Maps each operator labelKey to its source-language (German) text, registered
# through t() so that resolveText() can translate it per request language.
_LABEL_KEYS = {
    "condition.op.eq": t("ist gleich"),
    "condition.op.neq": t("ist ungleich"),
    "condition.op.contains": t("enthält"),
    "condition.op.not_contains": t("enthält nicht"),
    "condition.op.starts_with": t("beginnt mit"),
    "condition.op.ends_with": t("endet mit"),
    "condition.op.regex": t("Regex-Match"),
    "condition.op.empty": t("ist leer"),
    "condition.op.not_empty": t("ist nicht leer"),
    "condition.op.lt": t("kleiner als"),
    # lte / gte were registered with an empty text, which left these two
    # operators without any visible label in the operator list.
    "condition.op.lte": t("kleiner oder gleich"),
    "condition.op.gt": t("größer als"),
    "condition.op.gte": t("größer oder gleich"),
    "condition.op.is_true": t("ist wahr"),
    "condition.op.is_false": t("ist falsch"),
    "condition.op.before": t("vor"),
    "condition.op.after": t("nach"),
    "condition.op.exists": t("vorhanden"),
    "condition.op.not_exists": t("nicht vorhanden"),
    "condition.op.mime_is": t("MIME-Typ ist"),
    "condition.op.mime_contains": t("MIME-Typ enthält"),
    "condition.op.contains_content": t("enthält Inhaltstyp"),
    "condition.op.language_is": t("Sprache ist"),
    "condition.op.output_mode_is": t("Ausgabemodus ist"),
    "condition.op.file_count_eq": t("Dateianzahl gleich"),
    "condition.op.file_count_gt": t("Dateianzahl größer als"),
    "condition.op.file_count_lt": t("Dateianzahl kleiner als"),
    "condition.op.slot_count_eq": t("Slot-Anzahl gleich"),
    "condition.op.slot_count_gt": t("Slot-Anzahl größer als"),
    "condition.op.slot_count_lt": t("Slot-Anzahl kleiner als"),
    "condition.op.regex_on_text": t("Regex auf extrahiertem Text"),
    "condition.op.length_eq": t("Länge gleich"),
    "condition.op.length_gt": t("Länge größer als"),
    "condition.op.length_lt": t("Länge kleiner als"),
}
def localize_operator_catalog(lang: str = "de") -> Dict[str, List[Dict[str, Any]]]:
    """Return a copy of the operator catalog with a resolved ``label`` per operator.

    Each operator dict is shallow-copied, so nested ``valueInput`` dicts are
    still shared with ``CONDITION_OPERATOR_CATALOG``. A labelKey without an
    entry in ``_LABEL_KEYS`` is passed to ``resolveText`` unchanged.
    """

    def with_label(op: Dict[str, Any]) -> Dict[str, Any]:
        key = op.get("labelKey", "")
        source = _LABEL_KEYS.get(str(key), key)
        localized = dict(op)
        localized["label"] = resolveText(source, lang)
        return localized

    return {
        kind: [with_label(op) for op in ops]
        for kind, ops in CONDITION_OPERATOR_CATALOG.items()
    }
# Exact (lower-cased) catalog type names and the value kind each one maps to.
_CATALOG_TYPE_KINDS: Dict[str, str] = {
    "str": "string",
    "string": "string",
    "email": "string",
    "url": "string",
    "int": "number",
    "integer": "number",
    "float": "number",
    "number": "number",
    "bool": "boolean",
    "boolean": "boolean",
    "date": "datetime",
    "datetime": "datetime",
    "timestamp": "datetime",
    "list": "array",
    "array": "array",
    "object": "object",
    "file": "file",
    "actiondocument": "file",
    "fileref": "file",
}


def catalog_type_to_value_kind(catalog_type: str) -> str:
    """Map port-catalog / dataPickOptions type strings to condition valueKind.

    Matching is case-insensitive and ignores surrounding whitespace. Besides
    the short Python-style names ("str", "int", "bool", ...) the long forms
    "boolean", "integer", "array" and "object" are accepted, in line with the
    already supported "string" and "number". Parameterised lists ("List[...]")
    map to "array"; anything starting with "dict" maps to "object".

    Returns "unknown" for empty input, "Any", or any unrecognised type.
    """
    ct = (catalog_type or "").strip()
    if not ct or ct == "Any":
        return "unknown"
    low = ct.lower()
    kind = _CATALOG_TYPE_KINDS.get(low)
    if kind is not None:
        return kind
    if low.startswith("list["):
        return "array"
    if low.startswith("dict"):
        return "object"
    return "unknown"
def _paths_equal(a: List[Any], b: List[Any]) -> bool:
if len(a) != len(b):
return False
return all(str(x) == str(y) for x, y in zip(a, b))
def _is_context_producer(node_type: str) -> bool:
return node_type in ("context.extractContent", "context.mergeContext", "context.setContext")
def _path_suggests_context(path: List[Any], producer_type: str) -> bool:
if not path:
return _is_context_producer(producer_type)
last = str(path[-1])
if last in ("data", "files", "merged", "presentation"):
return True
if "files" in [str(p) for p in path]:
return True
if _is_context_producer(producer_type) and path[0] in ("data", "response", "merged"):
return True
return False
def _path_suggests_file(path: List[Any], producer_type: str) -> bool:
path_str = [str(p) for p in path]
if producer_type == "input.upload":
return True
if "file" in path_str or "documents" in path_str or "mimeType" in path_str or "fileName" in path_str:
return True
if producer_type.startswith("sharepoint.") and "file" in path_str:
return True
return False
# Set while resolve_value_kind() is inside compute_upstream_paths().
# compute_upstream_paths() annotates every entry by calling resolve_value_kind()
# again; without this flag the two functions would call each other without end.
_UPSTREAM_LOOKUP_ACTIVE = contextvars.ContextVar(
    "condition_upstream_lookup_active", default=False
)


def _lookup_upstream_type(graph: Dict[str, Any], producer_id: Any, path: List[Any]) -> Optional[str]:
    """Return the catalog type of ``path`` (or of its parent path) on the producer, if listed."""
    # Imported lazily: upstreamPathsService imports this module at load time.
    from modules.features.graphicalEditor.upstreamPathsService import compute_upstream_paths

    target_id = graph.get("targetNodeId") or producer_id
    token = _UPSTREAM_LOOKUP_ACTIVE.set(True)
    try:
        # Computed once and reused for both the exact and the parent-path pass.
        entries = [
            entry
            for entry in compute_upstream_paths(graph, target_id)
            if entry.get("producerNodeId") == producer_id
        ]
    finally:
        _UPSTREAM_LOOKUP_ACTIVE.reset(token)

    candidates = [list(path)]
    if path:
        # Fall back to the parent path when the exact path is not listed.
        candidates.append(list(path[:-1]))
    for candidate in candidates:
        for entry in entries:
            if _paths_equal(list(entry.get("path") or []), candidate):
                return str(entry.get("type") or "Any")
    return None


def resolve_value_kind(graph: Dict[str, Any], ref: Dict[str, Any]) -> str:
    """Resolve condition valueKind for a DataRef against the workflow graph.

    Resolution order:
      1. path/producer heuristics for "context" and "file" values
         (``mimeType`` / ``fileName`` leaves of a file resolve to "string");
      2. the type listed for the path, or its parent path, by
         ``compute_upstream_paths`` for ``graph["targetNodeId"]`` (falling back
         to the producer itself as target);
      3. "string" for ``payload`` fields of form nodes;
      4. "unknown".

    Step 2 is skipped when this function is re-entered from within
    ``compute_upstream_paths`` — that function calls back into
    ``resolve_value_kind`` for each entry, and only the entries' ``type`` is
    needed by the outer call.

    Returns "unknown" when ``ref`` is not a dict or carries no ``nodeId``.
    """
    if not isinstance(ref, dict):
        return "unknown"
    producer_id = ref.get("nodeId")
    if not producer_id:
        return "unknown"
    path = ref.get("path") or []
    if not isinstance(path, list):
        path = []

    nodes = graph.get("nodes") or []
    # Non-dict node entries are ignored instead of raising AttributeError.
    node_by_id = {n.get("id"): n for n in nodes if isinstance(n, dict) and n.get("id")}
    producer = node_by_id.get(producer_id) or {}
    producer_type = str(producer.get("type") or "")

    if _path_suggests_context(path, producer_type):
        return "context"
    if _path_suggests_file(path, producer_type):
        tail = str(path[-1]) if path else ""
        if tail in ("mimeType", "fileName"):
            return "string"
        return "file"

    if not _UPSTREAM_LOOKUP_ACTIVE.get():
        matched_type = _lookup_upstream_type(graph, producer_id, path)
        if matched_type:
            vk = catalog_type_to_value_kind(matched_type)
            if vk != "unknown":
                return vk

    if producer_type in ("trigger.form", "input.form") and path and str(path[0]) == "payload":
        return "string"
    return "unknown"
def resolve_condition_meta(
    graph: Dict[str, Any],
    ref: Dict[str, Any],
    *,
    lang: str = "de",
) -> Dict[str, Any]:
    """Return ``{"valueKind", "operators"}`` for a DataRef.

    ``operators`` is the localized operator list of the resolved value kind;
    when that kind has no (or an empty) list, the "unknown" list is used.
    """
    kind = resolve_value_kind(graph, ref)
    localized = localize_operator_catalog(lang)
    ops = localized.get(kind)
    if not ops:
        ops = localized.get("unknown", [])
    return {"valueKind": kind, "operators": ops}
def _is_empty_value(val: Any) -> bool:
if val is None:
return True
if val == "":
return True
if isinstance(val, (list, dict, tuple)) and len(val) == 0:
return True
return False
def _parse_datetime(val: Any) -> Optional[datetime]:
if val is None:
return None
if hasattr(val, "timestamp"):
return val # type: ignore[return-value]
s = str(val).strip()
if not s:
return None
for fmt in ("%Y-%m-%d", "%d.%m.%Y", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"):
try:
return datetime.strptime(s, fmt)
except ValueError:
continue
try:
return datetime.fromisoformat(s.replace("Z", "+00:00"))
except ValueError:
return None
def _align_timezones(a: Any, b: Any) -> Tuple[Any, Any]:
    """Make two datetimes comparable when exactly one of them is naive.

    The naive operand is interpreted as UTC. Operands that are not ``datetime``
    instances, or that are already both naive / both aware, are returned as-is.
    """
    if not (isinstance(a, datetime) and isinstance(b, datetime)):
        return a, b
    a_naive = a.utcoffset() is None
    b_naive = b.utcoffset() is None
    if a_naive == b_naive:
        return a, b
    if a_naive:
        return a.replace(tzinfo=timezone.utc), b
    return a, b.replace(tzinfo=timezone.utc)


def _compare_dates(left: Any, right: Any, op) -> bool:
    """Parse both operands as datetimes and apply the binary predicate ``op``.

    Returns False when either side cannot be parsed or the comparison raises
    (the failure is logged as a warning).

    Python refuses to order a naive against an aware datetime and never treats
    them as equal, so "2024-01-01" vs "2024-01-01T00:00:00Z" used to evaluate
    to False for every operator except ``neq``. When exactly one operand is
    naive it is now read as UTC before comparing.
    """
    try:
        a, b = _parse_datetime(left), _parse_datetime(right)
        if a is None or b is None:
            return False
        a, b = _align_timezones(a, b)
        return op(a, b)
    except Exception as e:
        # Best effort by design: a failing comparison must not abort the flow.
        logger.warning("_compare_dates failed: left=%s right=%s: %s", left, right, e)
        return False
def _file_exists(val: Any) -> bool:
if val is None:
return False
if isinstance(val, dict):
return bool(val.get("url") or val.get("name") or val.get("fileId"))
if isinstance(val, str):
return len(val.strip()) > 0
return bool(val)
def _extract_mime(val: Any) -> str:
if isinstance(val, dict):
return str(val.get("mimeType") or val.get("contentType") or "")
return ""
def _presentation_envelopes_from_value(val: Any) -> List[Dict[str, Any]]:
    """Normalize ``val`` into presentation envelopes; [] when that fails.

    Delegates to ``normalize_presentation_envelopes`` from the extractContent
    action (imported on demand). Any exception from the import or the call is
    logged at debug level and yields an empty list.
    """
    try:
        from modules.workflows.methods.methodContext.actions.extractContent import (
            normalize_presentation_envelopes as _normalize,
        )
        envelopes = _normalize(val)
    except Exception as e:
        logger.debug("_presentation_envelopes_from_value: %s", e)
        envelopes = []
    return envelopes
def _joined_text_from_context(val: Any) -> str:
    """Return the joined extracted text of a context value; "" when unavailable.

    Delegates to ``joined_text_from_extract_node_data`` from the extractContent
    action (imported on demand). Failures still yield "" but are now logged at
    debug level — the same way ``_presentation_envelopes_from_value`` reports
    them — instead of being discarded silently.
    """
    try:
        from modules.workflows.methods.methodContext.actions.extractContent import (
            joined_text_from_extract_node_data,
        )
        return joined_text_from_extract_node_data(val) or ""
    except Exception as e:
        logger.debug("_joined_text_from_context: %s", e)
        return ""
def _iter_presentation_parts(envelope: Dict[str, Any]) -> List[Dict[str, Any]]:
parts: List[Dict[str, Any]] = []
files = envelope.get("files") or {}
if not isinstance(files, dict):
return parts
for bucket in files.values():
if not isinstance(bucket, dict):
continue
data = bucket.get("data")
if isinstance(data, list):
for slot in data:
if isinstance(slot, dict):
parts.append(slot)
elif isinstance(data, dict):
parts.append(data)
return parts
def _context_has_content_type(val: Any, content_type: str) -> bool:
target = (content_type or "").strip().lower()
if not target:
return False
for env in _presentation_envelopes_from_value(val):
for part in _iter_presentation_parts(env):
tg = (part.get("typeGroup") or part.get("contentType") or "").strip().lower()
if target == "media":
if tg in ("image", "media", "video", "audio"):
return True
elif tg == target:
return True
return False
def _guess_language_code(text: str) -> str:
sample = (text or "").strip()[:2000]
if not sample:
return ""
de_hits = len(re.findall(r"\b(der|die|das|und|ist|nicht|mit)\b", sample, re.I))
en_hits = len(re.findall(r"\b(the|and|is|not|with|for)\b", sample, re.I))
fr_hits = len(re.findall(r"\b(le|la|les|et|est|pas|avec)\b", sample, re.I))
it_hits = len(re.findall(r"\b(il|la|lo|gli|e|non|con)\b", sample, re.I))
scores = {"de": de_hits, "en": en_hits, "fr": fr_hits, "it": it_hits}
best = max(scores, key=scores.get)
return best if scores[best] > 0 else ""
def _context_language(val: Any) -> str:
if isinstance(val, dict):
meta = val.get("_meta")
if isinstance(meta, dict):
lang = meta.get("language") or meta.get("detectedLanguage")
if lang:
return str(lang).strip().lower()[:2]
text = _joined_text_from_context(val)
return _guess_language_code(text)
def _context_output_mode(val: Any) -> str:
    """Return the first output mode found for a context value, "" if none.

    Lookup order: each envelope's own ``outputMode``, then the ``outputMode``
    of its file buckets; finally ``val["outputMode"]`` when ``val`` is a dict.
    """
    for envelope in _presentation_envelopes_from_value(val):
        direct = envelope.get("outputMode")
        if direct:
            return str(direct)
        files = envelope.get("files") or {}
        if not isinstance(files, dict):
            continue
        for bucket in files.values():
            if not isinstance(bucket, dict):
                continue
            bucket_mode = bucket.get("outputMode")
            if bucket_mode:
                return str(bucket_mode)
    if isinstance(val, dict):
        own_mode = val.get("outputMode")
        if own_mode:
            return str(own_mode)
    return ""
def _context_file_count(val: Any) -> int:
    """Return the length of the first envelope ``fileOrder`` list, 0 if there is none."""
    file_orders = (env.get("fileOrder") for env in _presentation_envelopes_from_value(val))
    first_list = next((order for order in file_orders if isinstance(order, list)), None)
    return len(first_list) if first_list is not None else 0
def _context_slot_count(val: Any) -> int:
    """Count the data slots over all file buckets of all envelopes.

    A list-valued ``data`` counts once per item; any other non-None ``data``
    counts as a single slot.
    """

    def bucket_slots(bucket: Any) -> int:
        if not isinstance(bucket, dict):
            return 0
        data = bucket.get("data")
        if isinstance(data, list):
            return len(data)
        return 0 if data is None else 1

    count = 0
    for envelope in _presentation_envelopes_from_value(val):
        files = envelope.get("files") or {}
        if isinstance(files, dict):
            count += sum(bucket_slots(bucket) for bucket in files.values())
    return count
def _to_float_or_none(value: Any) -> Optional[float]:
    """Convert ``value`` to float, or return None when it is not numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _to_int_threshold(value: Any) -> Optional[int]:
    """Convert a right-hand count value to int, or None when it is not numeric."""
    try:
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        return None


def _compare_count(count: int, op: str, threshold: int) -> bool:
    """Apply the ``_eq`` / ``_gt`` / ``_lt`` suffix of ``op`` to ``count``."""
    if op.endswith("_eq"):
        return count == threshold
    if op.endswith("_gt"):
        return count > threshold
    return count < threshold


def _values_equal(left: Any, right: Any, value_kind: str) -> bool:
    """Equality used by eq/neq for non-datetime kinds.

    For the "number" kind both sides are compared numerically when both can be
    read as numbers, so 5 and "5" are equal; otherwise plain ``==`` is used.
    """
    if value_kind == "number":
        l_num, r_num = _to_float_or_none(left), _to_float_or_none(right)
        if l_num is not None and r_num is not None:
            return l_num == r_num
    return left == right


def apply_condition_operator(left: Any, operator: str, right: Any, value_kind: Optional[str] = None) -> bool:
    """Evaluate a single condition operator against a resolved left-hand value.

    Args:
        left: The resolved value under test.
        operator: Operator id from the condition catalog; blank means "eq".
        right: The comparison value (ignored by unary operators).
        value_kind: Optional value kind of ``left``. It selects date parsing
            for eq/neq on "datetime" and numeric comparison for eq/neq on
            "number"; all other operators ignore it.

    Returns:
        The operator's verdict. Unknown operators and comparison values that
        cannot be interpreted (non-numeric threshold, invalid regex, ...)
        yield False rather than raising.
    """
    op = (operator or "eq").strip()
    vk = (value_kind or "unknown").strip()

    if op == "eq":
        if vk == "datetime":
            return _compare_dates(left, right, lambda a, b: a == b)
        return _values_equal(left, right, vk)
    if op == "neq":
        if vk == "datetime":
            return _compare_dates(left, right, lambda a, b: a != b)
        return not _values_equal(left, right, vk)

    if op in ("lt", "lte", "gt", "gte"):
        try:
            # A missing operand is treated as 0.
            l_val = float(left) if left is not None else 0
            r_val = float(right) if right is not None else 0
        except (TypeError, ValueError):
            return False
        if op == "lt":
            return l_val < r_val
        if op == "lte":
            return l_val <= r_val
        if op == "gt":
            return l_val > r_val
        return l_val >= r_val

    if op == "contains":
        if isinstance(left, (list, tuple, set)):
            return right in left or any(str(right) == str(x) for x in left)
        return right is not None and str(right) in str(left or "")
    if op == "not_contains":
        if isinstance(left, (list, tuple, set)):
            return right not in left and not any(str(right) == str(x) for x in left)
        return right is None or str(right) not in str(left or "")
    if op == "starts_with":
        return right is not None and str(left or "").startswith(str(right))
    if op == "ends_with":
        return right is not None and str(left or "").endswith(str(right))

    if op == "regex":
        try:
            return bool(re.search(str(right or ""), str(left or "")))
        except re.error as e:
            logger.warning("regex operator failed: %s", e)
            return False

    if op == "empty":
        return _is_empty_value(left)
    if op == "not_empty":
        return not _is_empty_value(left)
    if op == "is_true":
        return bool(left)
    if op == "is_false":
        return not bool(left)

    if op == "before":
        return _compare_dates(left, right, lambda a, b: a < b)
    if op == "after":
        return _compare_dates(left, right, lambda a, b: a > b)

    if op == "exists":
        return _file_exists(left)
    if op == "not_exists":
        return not _file_exists(left)
    if op == "mime_is":
        return _extract_mime(left).lower() == str(right or "").lower()
    if op == "mime_contains":
        return str(right or "").lower() in _extract_mime(left).lower()

    if op in ("length_eq", "length_gt", "length_lt"):
        threshold = _to_int_threshold(right)
        if threshold is None:
            return False
        try:
            length = len(left) if left is not None else 0
        except TypeError:
            return False
        return _compare_count(length, op, threshold)

    if op == "contains_content":
        return _context_has_content_type(left, str(right or ""))
    if op == "language_is":
        return _context_language(left) == str(right or "").strip().lower()[:2]
    if op == "output_mode_is":
        return _context_output_mode(left) == str(right or "")

    if op in (
        "file_count_eq",
        "file_count_gt",
        "file_count_lt",
        "slot_count_eq",
        "slot_count_gt",
        "slot_count_lt",
    ):
        # A missing or non-numeric threshold used to raise here; it now yields
        # False, matching the length_* and lt/gt operators.
        threshold = _to_int_threshold(right)
        if threshold is None:
            return False
        if op.startswith("file_count_"):
            count = _context_file_count(left)
        else:
            count = _context_slot_count(left)
        return _compare_count(count, op, threshold)

    if op == "regex_on_text":
        try:
            text = _joined_text_from_context(left)
            return bool(re.search(str(right or ""), text))
        except re.error as e:
            logger.warning("regex_on_text failed: %s", e)
            return False

    return False

View file

@ -151,12 +151,23 @@ FLOW_NODES = [
"Die Daten vom Eingangskanal werden an den gewählten Ausgang durchgereicht."
),
"parameters": [
{
"name": "Item",
"type": "Any",
"required": True,
"frontendType": "dataRef",
"description": t("Item, das auf die Bedingung getestet wird"),
},
{
"name": "condition",
"type": "json",
"required": True,
"frontendType": "condition",
"description": t("Bedingung: Feld aus einem vorherigen Schritt und Vergleich"),
"frontendOptions": {
"dependsOn": "Item",
"operatorCatalog": "condition",
},
"description": t("Bedingung auf das gewählte Item"),
},
],
"inputs": 1,

View file

@ -8,6 +8,7 @@ Nodes are defined first; IO/method actions are used at execution time.
import logging
from typing import Dict, List, Any, Optional
from modules.features.graphicalEditor.conditionOperators import localize_operator_catalog
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
from modules.features.graphicalEditor.nodeDefinitions.input import FORM_FIELD_TYPES
from modules.features.graphicalEditor.nodeAdapter import bindsActionFromLegacy
@ -147,6 +148,7 @@ def getNodeTypesForApi(
"nodeTypes": localized,
"categories": categories,
"portTypeCatalog": catalogSerialized,
"conditionOperatorCatalog": localize_operator_catalog(language),
"systemVariables": SYSTEM_VARIABLES,
"formFieldTypes": FORM_FIELD_TYPES,
}

View file

@ -26,6 +26,7 @@ from modules.workflows.automation2.runEnvelope import (
normalize_run_envelope,
)
from modules.features.graphicalEditor.entryPoints import find_invocation
from modules.features.graphicalEditor.conditionOperators import resolve_condition_meta
from modules.features.graphicalEditor.upstreamPathsService import compute_upstream_paths, compute_graph_data_sources
from modules.shared.i18nRegistry import apiRouteContext, resolveText
routeApiMsg = apiRouteContext("routeFeatureGraphicalEditor")
@ -192,6 +193,28 @@ def post_upstream_paths(
return {"paths": paths}
@router.post("/{instanceId}/condition-meta")
@limiter.limit("120/minute")
def post_condition_meta(
    request: Request,
    instanceId: str = Path(..., description="Feature instance ID"),
    body: Dict[str, Any] = Body(...),
    language: str = Query("de", description="Localization (en, de, fr)"),
    context: RequestContext = Depends(getRequestContext),
) -> dict:
    """Return valueKind and operators for a DataRef (backend-driven If/Else UI).

    Expects ``graph`` and ``ref`` dicts in the body; responds with HTTP 400
    when either is missing or not a dict. An optional ``nodeId`` is passed to
    the resolver as ``targetNodeId`` on a copy of the graph.
    """
    _validateInstanceAccess(instanceId, context)
    graph, ref = body.get("graph"), body.get("ref")
    if not (isinstance(graph, dict) and isinstance(ref, dict)):
        raise HTTPException(status_code=400, detail=routeApiMsg("graph and ref are required"))
    node_id = body.get("nodeId")
    # Work on a copy so the request's graph object is never modified.
    payload = {**graph, "targetNodeId": str(node_id)} if node_id else dict(graph)
    return resolve_condition_meta(payload, ref, lang=language)
@router.post("/{instanceId}/graph-data-sources")
@limiter.limit("120/minute")
def post_graph_data_sources(

View file

@ -4,6 +4,7 @@ from __future__ import annotations
from typing import Any, Dict, List, Set
from modules.features.graphicalEditor.conditionOperators import resolve_value_kind
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
from modules.features.graphicalEditor.portTypes import PORT_TYPE_CATALOG, PortSchema, parse_graph_defined_output_schema
from modules.workflows.automation2.graphUtils import buildConnectionMap, getLoopBodyNodeIds, getLoopDoneNodeIds
@ -167,6 +168,14 @@ def compute_upstream_paths(graph: Dict[str, Any], target_node_id: str) -> List[D
]
)
for entry in paths:
ref = {
"nodeId": entry.get("producerNodeId"),
"path": entry.get("path") or [],
}
graph_with_target = {**graph, "targetNodeId": target_node_id}
entry["valueKind"] = resolve_value_kind(graph_with_target, ref)
return paths

View file

@ -33,12 +33,72 @@ class RendererMarkdown(BaseRenderer):
@classmethod
def getAcceptedSectionTypes(cls, formatName: Optional[str] = None) -> List[str]:
"""
Return list of section content types that Markdown renderer accepts.
Markdown renderer accepts all section types except images.
"""Markdown accepts all section types including images.
Images are emitted as sibling files (``extract_media_.png``) with
``![alt](filename)`` relative links in the ``.md`` same pattern as
``RendererHtml`` (main document + sidecar assets).
"""
from modules.datamodels.datamodelJson import supportedSectionTypes
return [st for st in supportedSectionTypes if st != "image"]
return list(supportedSectionTypes)
def _collectImageDocuments(self, jsonContent: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Extract image sections into sidecar file payloads for markdown export."""
import base64 as _b64
out: List[Dict[str, Any]] = []
documents = jsonContent.get("documents")
if not isinstance(documents, list):
raise ValueError("extractedContent.documents must be a list")
for doc in documents:
if not isinstance(doc, dict):
continue
for section in doc.get("sections") or []:
if not isinstance(section, dict):
continue
if section.get("content_type") != "image":
continue
for element in section.get("elements") or []:
if not isinstance(element, dict):
raise ValueError("image section element must be a dict")
content = element.get("content")
if not isinstance(content, dict):
raise ValueError("image section element missing content dict")
b64 = content.get("base64Data")
if not isinstance(b64, str) or not b64:
raise ValueError(
"image section missing base64Data — markdown export "
"requires binary payload to write sidecar image files"
)
alt = content.get("altText")
if not isinstance(alt, str) or not alt.strip():
raise ValueError("image section missing altText")
mime = content.get("mimeType")
if not isinstance(mime, str) or not mime.strip().startswith("image/"):
raise ValueError("image section missing mimeType")
fname = content.get("fileName")
if not isinstance(fname, str) or not fname.strip():
raise ValueError("image section missing fileName")
safe_name = "".join(
c if c.isalnum() or c in "._-" else "_" for c in fname.strip()
)
if not safe_name:
raise ValueError(f"image fileName sanitized to empty: {fname!r}")
blob = _b64.b64decode(b64, validate=True)
if not blob:
raise ValueError(f"image base64Data decoded to empty bytes ({fname!r})")
out.append({
"filename": safe_name,
"altText": alt.strip(),
"mimeType": mime.strip(),
"bytes": blob,
})
return out
async def render(
self,
@ -49,208 +109,152 @@ class RendererMarkdown(BaseRenderer):
*,
style: Dict[str, Any] = None,
) -> List[RenderedDocument]:
"""Render extracted JSON content to Markdown format."""
"""Render markdown plus sidecar image files (same folder as the ``.md``).
Returns ``[main.md, image1.png, image2.jpg, ]``. Relative ``![alt](file)``
links in the markdown point at those sibling files no API URLs, no
base64 inlined in the markdown text.
"""
_ = style
try:
# Generate markdown from JSON structure
markdownContent = self._generateMarkdownFromJson(extractedContent, title)
# Determine filename from document or title
documents = extractedContent.get("documents", [])
if documents and isinstance(documents[0], dict):
filename = documents[0].get("filename")
if not filename:
filename = self._determineFilename(title, "text/markdown")
else:
filename = self._determineFilename(title, "text/markdown")
# Extract metadata for document type and other info
metadata = extractedContent.get("metadata", {}) if extractedContent else {}
documentType = metadata.get("documentType") if isinstance(metadata, dict) else None
return [
image_docs = self._collectImageDocuments(extractedContent)
markdownContent = self._generateMarkdownFromJson(extractedContent, title)
documents = extractedContent.get("documents") or []
filename: Optional[str] = None
if documents and isinstance(documents[0], dict):
filename = documents[0].get("filename")
if not filename:
filename = self._determineFilename(title, "text/markdown")
metadata = extractedContent.get("metadata") if isinstance(extractedContent, dict) else None
if not isinstance(metadata, dict):
metadata = None
documentType = metadata.get("documentType") if metadata else None
result: List[RenderedDocument] = [
RenderedDocument(
documentData=markdownContent.encode("utf-8"),
mimeType="text/markdown",
filename=filename,
documentType=documentType,
metadata=metadata,
)
]
for img in image_docs:
result.append(
RenderedDocument(
documentData=markdownContent.encode('utf-8'),
mimeType="text/markdown",
filename=filename,
documentType=documentType,
metadata=metadata if isinstance(metadata, dict) else None
documentData=img["bytes"],
mimeType=img["mimeType"],
filename=img["filename"],
)
]
except Exception as e:
self.logger.error(f"Error rendering markdown: {str(e)}")
# Return minimal markdown fallback
fallbackContent = f"# {title}\n\nError rendering report: {str(e)}"
metadata = extractedContent.get("metadata", {}) if extractedContent else {}
documentType = metadata.get("documentType") if isinstance(metadata, dict) else None
return [
RenderedDocument(
documentData=fallbackContent.encode('utf-8'),
mimeType="text/markdown",
filename=self._determineFilename(title, "text/markdown"),
documentType=documentType,
metadata=metadata if isinstance(metadata, dict) else None
)
]
)
return result
def _generateMarkdownFromJson(self, jsonContent: Dict[str, Any], title: str) -> str:
"""Generate markdown content from structured JSON document."""
try:
# Validate JSON structure (standardized schema: {metadata: {...}, documents: [{sections: [...]}]})
if not self._validateJsonStructure(jsonContent):
raise ValueError("JSON content must follow standardized schema: {metadata: {...}, documents: [{sections: [...]}]}")
# Extract sections and metadata from standardized schema
sections = self._extractSections(jsonContent)
metadata = self._extractMetadata(jsonContent)
# Use provided title (which comes from documents[].title) as primary source
# Fallback to metadata.title only if title parameter is empty
documentTitle = title if title else metadata.get("title", "Generated Document")
# Build markdown content
markdownParts = []
# Document title
markdownParts.append(f"# {documentTitle}")
markdownParts.append("")
# Process each section
for section in sections:
sectionMarkdown = self._renderJsonSection(section)
if sectionMarkdown:
markdownParts.append(sectionMarkdown)
markdownParts.append("") # Add spacing between sections
# Add generation info
markdownParts.append("---")
markdownParts.append(f"*Generated: {self._formatTimestamp()}*")
return '\n'.join(markdownParts)
except Exception as e:
self.logger.error(f"Error generating markdown from JSON: {str(e)}")
raise Exception(f"Markdown generation failed: {str(e)}")
if not self._validateJsonStructure(jsonContent):
raise ValueError(
"JSON content must follow standardized schema: "
"{metadata: {...}, documents: [{sections: [...]}]}"
)
sections = self._extractSections(jsonContent)
metadata = self._extractMetadata(jsonContent)
documentTitle = title or (metadata.get("title") if isinstance(metadata, dict) else None)
if not documentTitle:
raise ValueError(
"markdown render: no title given and metadata.title missing — "
"callers must pass an explicit title"
)
markdownParts: List[str] = [f"# {documentTitle}", ""]
for section in sections:
sectionMarkdown = self._renderJsonSection(section)
if sectionMarkdown:
markdownParts.append(sectionMarkdown)
markdownParts.append("")
markdownParts.append("---")
markdownParts.append(f"*Generated: {self._formatTimestamp()}*")
return "\n".join(markdownParts)
def _renderJsonSection(self, section: Dict[str, Any]) -> str:
"""Render a single JSON section to markdown.
Supports three content formats: reference, object (base64), extracted_text.
Errors propagate: unknown section types or malformed payloads must surface,
not be swallowed into a fallback paragraph or ``[Error rendering section]``
marker that hides the real problem.
"""
try:
sectionType = self._getSectionType(section)
sectionData = self._getSectionData(section)
# Check for three content formats from Phase 5D in elements
if isinstance(sectionData, list):
markdownParts = []
for element in sectionData:
element_type = element.get("type", "") if isinstance(element, dict) else ""
# Support three content formats from Phase 5D
if element_type == "reference":
# Document reference format
doc_ref = element.get("documentReference", "")
label = element.get("label", "Reference")
markdownParts.append(f"*[Reference: {label}]*")
continue
elif element_type == "extracted_text":
# Extracted text format
content = element.get("content", "")
source = element.get("source", "")
if content:
source_text = f" *(Source: {source})*" if source else ""
markdownParts.append(f"{content}{source_text}")
continue
# If we processed reference/extracted_text elements, return them
if markdownParts:
return '\n\n'.join(markdownParts)
if sectionType == "table":
# Work directly with elements like other renderers
if isinstance(sectionData, list) and sectionData:
element = sectionData[0] if isinstance(sectionData[0], dict) else {}
return self._renderJsonTable(element)
return ""
elif sectionType == "bullet_list":
# Work directly with elements like other renderers
if isinstance(sectionData, list) and sectionData:
element = sectionData[0] if isinstance(sectionData[0], dict) else {}
return self._renderJsonBulletList(element)
return ""
elif sectionType == "heading":
# Work directly with elements like other renderers
if isinstance(sectionData, list) and sectionData:
element = sectionData[0] if isinstance(sectionData[0], dict) else {}
return self._renderJsonHeading(element)
return ""
elif sectionType == "paragraph":
# Work directly with elements like other renderers
if isinstance(sectionData, list) and sectionData:
element = sectionData[0] if isinstance(sectionData[0], dict) else {}
return self._renderJsonParagraph(element)
elif isinstance(sectionData, dict):
return self._renderJsonParagraph(sectionData)
return ""
elif sectionType == "code_block":
# Work directly with elements like other renderers
if isinstance(sectionData, list) and sectionData:
element = sectionData[0] if isinstance(sectionData[0], dict) else {}
return self._renderJsonCodeBlock(element)
return ""
elif sectionType == "image":
# Work directly with elements like other renderers
if isinstance(sectionData, list) and sectionData:
element = sectionData[0] if isinstance(sectionData[0], dict) else {}
return self._renderJsonImage(element)
return ""
else:
# Fallback to paragraph for unknown types
if isinstance(sectionData, list) and sectionData:
element = sectionData[0] if isinstance(sectionData[0], dict) else {}
return self._renderJsonParagraph(element)
elif isinstance(sectionData, dict):
return self._renderJsonParagraph(sectionData)
return ""
except Exception as e:
self.logger.warning(f"Error rendering section {self._getSectionId(section)}: {str(e)}")
return f"*[Error rendering section: {str(e)}]*"
sectionType = self._getSectionType(section)
sectionData = self._getSectionData(section)
if isinstance(sectionData, list):
markdownParts: List[str] = []
for element in sectionData:
element_type = element.get("type", "") if isinstance(element, dict) else ""
if element_type == "reference":
label = element.get("label", "Reference")
markdownParts.append(f"*[Reference: {label}]*")
continue
if element_type == "extracted_text":
content = element.get("content", "")
source = element.get("source", "")
if content:
source_text = f" *(Source: {source})*" if source else ""
markdownParts.append(f"{content}{source_text}")
continue
if markdownParts:
return "\n\n".join(markdownParts)
def _first_element(data: Any) -> Dict[str, Any]:
if isinstance(data, list) and data and isinstance(data[0], dict):
return data[0]
if isinstance(data, dict):
return data
raise ValueError(
f"section type {sectionType!r} expects elements list / dict, got {type(data).__name__}"
)
if sectionType == "table":
return self._renderJsonTable(_first_element(sectionData))
if sectionType == "bullet_list":
return self._renderJsonBulletList(_first_element(sectionData))
if sectionType == "heading":
return self._renderJsonHeading(_first_element(sectionData))
if sectionType == "paragraph":
return self._renderJsonParagraph(_first_element(sectionData))
if sectionType == "code_block":
return self._renderJsonCodeBlock(_first_element(sectionData))
if sectionType == "image":
return self._renderJsonImage(_first_element(sectionData))
raise ValueError(
f"unsupported section content_type {sectionType!r} "
f"(section id={self._getSectionId(section)!r})"
)
def _renderJsonTable(self, tableData: Dict[str, Any]) -> str:
"""Render a JSON table to markdown."""
try:
# Extract from nested content structure: element.content.{headers, rows}
content = tableData.get("content", {})
if not isinstance(content, dict):
return ""
headers = content.get("headers", [])
rows = content.get("rows", [])
if not headers or not rows:
return ""
markdownParts = []
# Create table header
headerLine = " | ".join(str(header) for header in headers)
markdownParts.append(headerLine)
# Add separator line
separatorLine = " | ".join("---" for _ in headers)
markdownParts.append(separatorLine)
# Add data rows
for row in rows:
rowLine = " | ".join(str(cellData) for cellData in row)
markdownParts.append(rowLine)
return '\n'.join(markdownParts)
except Exception as e:
self.logger.warning(f"Error rendering table: {str(e)}")
content = tableData.get("content")
if not isinstance(content, dict):
raise ValueError(
f"table section has invalid content (type={type(content).__name__})"
)
headers = content.get("headers") or []
rows = content.get("rows") or []
if not headers or not rows:
return ""
lines = [
" | ".join(str(h) for h in headers),
" | ".join("---" for _ in headers),
]
for row in rows:
lines.append(" | ".join(str(cell) for cell in row))
return "\n".join(lines)
def _renderInlineRunsMarkdown(self, runs: Any) -> str:
"""Turn Phase-5 inlineRuns (from markdownToDocumentJson) into markdown text."""
@ -289,118 +293,97 @@ class RendererMarkdown(BaseRenderer):
def _renderJsonBulletList(self, listData: Dict[str, Any]) -> str:
"""Render a JSON bullet list to markdown."""
try:
# Extract from nested content structure: element.content.{items}
content = listData.get("content", {})
if not isinstance(content, dict):
return ""
items = content.get("items", [])
if not items:
return ""
markdownParts = []
for item in items:
if isinstance(item, str):
markdownParts.append(f"- {item}")
elif isinstance(item, list):
markdownParts.append(f"- {self._renderInlineRunsMarkdown(item)}")
elif isinstance(item, dict) and "text" in item:
markdownParts.append(f"- {item['text']}")
return '\n'.join(markdownParts)
except Exception as e:
self.logger.warning(f"Error rendering bullet list: {str(e)}")
content = listData.get("content")
if not isinstance(content, dict):
raise ValueError(
f"bullet_list section has invalid content (type={type(content).__name__})"
)
items = content.get("items") or []
if not items:
return ""
lines: List[str] = []
for item in items:
if isinstance(item, str):
lines.append(f"- {item}")
elif isinstance(item, list):
lines.append(f"- {self._renderInlineRunsMarkdown(item)}")
elif isinstance(item, dict) and "text" in item:
lines.append(f"- {item['text']}")
else:
raise ValueError(
f"bullet_list item has unsupported shape (type={type(item).__name__})"
)
return "\n".join(lines)
def _renderJsonHeading(self, headingData: Dict[str, Any]) -> str:
"""Render a JSON heading to markdown."""
try:
# Extract from nested content structure: element.content.{text, level}
content = headingData.get("content", {})
if not isinstance(content, dict):
return ""
text = content.get("text", "")
level = content.get("level", 1)
if text:
level = max(1, min(6, level))
md_level = min(6, level + 1)
return f"{'#' * md_level} {text}"
return ""
except Exception as e:
self.logger.warning(f"Error rendering heading: {str(e)}")
return ""
content = headingData.get("content")
if not isinstance(content, dict):
raise ValueError(
f"heading section has invalid content (type={type(content).__name__})"
)
text = content.get("text")
if not isinstance(text, str) or not text:
raise ValueError("heading section has empty 'text'")
level = content.get("level", 1)
if not isinstance(level, int):
raise ValueError(f"heading 'level' must be int, got {type(level).__name__}")
level = max(1, min(6, level))
md_level = min(6, level + 1)
return f"{'#' * md_level} {text}"
def _renderJsonParagraph(self, paragraphData: Dict[str, Any]) -> str:
"""Render a JSON paragraph to markdown."""
try:
# Extract from nested content structure
content = paragraphData.get("content", {})
top = paragraphData.get("text")
if isinstance(top, str) and top.strip():
if not isinstance(content, dict) or (
not content.get("text") and not content.get("inlineRuns")
):
return top
content = paragraphData.get("content")
top = paragraphData.get("text")
if isinstance(top, str) and top.strip():
if not isinstance(content, dict) or (
not content.get("text") and not content.get("inlineRuns")
):
return top
if isinstance(content, dict):
runs = self._inlineRunsFromContent(content)
if runs:
return self._renderInlineRunsMarkdown(runs)
text = content.get("text", "")
elif isinstance(content, str):
text = content
else:
text = ""
return text if text else ""
if isinstance(content, dict):
runs = self._inlineRunsFromContent(content)
if runs:
return self._renderInlineRunsMarkdown(runs)
text = content.get("text", "")
return text if isinstance(text, str) else ""
if isinstance(content, str):
return content
raise ValueError(
f"paragraph section has invalid content (type={type(content).__name__})"
)
except Exception as e:
self.logger.warning(f"Error rendering paragraph: {str(e)}")
return ""
def _renderJsonCodeBlock(self, codeData: Dict[str, Any]) -> str:
"""Render a JSON code block to markdown."""
try:
# Extract from nested content structure
content = codeData.get("content", {})
if not isinstance(content, dict):
return ""
code = content.get("code", "")
language = content.get("language", "")
if code:
if language:
return f"```{language}\n{code}\n```"
else:
return f"```\n{code}\n```"
return ""
except Exception as e:
self.logger.warning(f"Error rendering code block: {str(e)}")
return ""
content = codeData.get("content")
if not isinstance(content, dict):
raise ValueError(
f"code_block section has invalid content (type={type(content).__name__})"
)
code = content.get("code")
if not isinstance(code, str) or not code:
raise ValueError("code_block section has empty 'code'")
language = content.get("language") or ""
return f"```{language}\n{code}\n```" if language else f"```\n{code}\n```"
def _renderJsonImage(self, imageData: Dict[str, Any]) -> str:
"""Render a JSON image to markdown."""
try:
# Extract from nested content structure: element.content.{base64Data, altText, caption}
content = imageData.get("content", {})
if not isinstance(content, dict):
return ""
altText = content.get("altText", "Image")
base64Data = content.get("base64Data", "")
if base64Data:
# For base64 images, we can't embed them directly in markdown
# So we'll use a placeholder with the alt text
return f"![{altText}](data:image/png;base64,{base64Data[:50]}...)"
else:
return f"![{altText}](image-placeholder)"
except Exception as e:
self.logger.warning(f"Error rendering image: {str(e)}")
return f"![{imageData.get('altText', 'Image')}](image-error)"
"""Render image as relative ``![alt](fileName)`` link to a sidecar file."""
content = imageData.get("content")
if not isinstance(content, dict):
raise ValueError(
f"image section has invalid content (type={type(content).__name__})"
)
altText = content.get("altText")
if not isinstance(altText, str) or not altText.strip():
raise ValueError("image section is missing 'altText'")
fileName = content.get("fileName")
if not isinstance(fileName, str) or not fileName.strip():
raise ValueError("image section is missing 'fileName' for relative markdown link")
safe_name = "".join(
c if c.isalnum() or c in "._-" else "_" for c in fileName.strip()
)
if not safe_name:
raise ValueError(f"image fileName sanitized to empty: {fileName!r}")
return f"![{altText.strip()}]({safe_name})"

View file

@ -2,8 +2,9 @@
# Flow control node executor (ifElse, switch, loop, merge).
import logging
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
from modules.features.graphicalEditor.conditionOperators import apply_condition_operator, resolve_value_kind
from modules.features.graphicalEditor.portTypes import wrapTransit, unwrapTransit
logger = logging.getLogger(__name__)
@ -65,20 +66,29 @@ class FlowExecutor:
nodeId: str,
inputSources: Dict,
) -> Any:
condParam = (node.get("parameters") or {}).get("condition")
params = node.get("parameters") or {}
condParam = params.get("condition")
itemParam = params.get("Item")
inp = self._getInputData(nodeId, {nodeId: inputSources}, nodeOutputs)
ok = self._evalConditionParam(condParam, nodeOutputs)
ok = self._evalConditionParam(condParam, nodeOutputs, item_param=itemParam, node=node)
return wrapTransit(
unwrapTransit(inp) if inp else inp,
{"branch": 0 if ok else 1, "conditionResult": ok},
)
def _evalConditionParam(self, condParam: Any, nodeOutputs: Dict) -> bool:
"""Evaluate condition: structured {type,ref,operator,value} or legacy string/ref."""
def _evalConditionParam(
self,
condParam: Any,
nodeOutputs: Dict,
*,
item_param: Any = None,
node: Optional[Dict] = None,
) -> bool:
"""Evaluate condition: structured {operator,value} with Item dataRef, or legacy."""
if condParam is None:
return False
if isinstance(condParam, dict) and condParam.get("type") == "condition":
return self._evalStructuredCondition(condParam, nodeOutputs)
return self._evalStructuredCondition(condParam, nodeOutputs, item_param=item_param, node=node)
from modules.workflows.automation2.graphUtils import resolveParameterReferences
resolved = resolveParameterReferences(condParam, nodeOutputs)
return self._evalCondition(resolved)
@ -101,55 +111,34 @@ class FlowExecutor:
return None
return current
def _evalStructuredCondition(
    self,
    cond: Dict,
    nodeOutputs: Dict,
    *,
    item_param: Any = None,
    node: Optional[Dict] = None,
) -> bool:
    """Evaluate structured {operator, value} with Item dataRef (legacy: condition.ref).

    The left operand is resolved from ``item_param``; when that is ``None`` or
    an empty dict, the legacy ``cond["ref"]`` is used instead. The comparison
    itself is delegated to ``apply_condition_operator``.

    Args:
        cond: Structured condition; ``operator`` (default ``"eq"``), ``value``
            and optionally the legacy ``ref`` are read from it.
        nodeOutputs: Outputs of already executed nodes, used to resolve the ref.
        item_param: Data reference for the left operand.
        node: The node being evaluated; its ``id`` and ``type`` are used for
            value-kind resolution.
    """
    from modules.workflows.automation2.graphUtils import resolveParameterReferences

    left_ref = item_param
    if left_ref is None or (isinstance(left_ref, dict) and not left_ref):
        left_ref = cond.get("ref")
    left = resolveParameterReferences(left_ref, nodeOutputs) if left_ref is not None else None
    operator = cond.get("operator", "eq")
    right = cond.get("value")

    value_kind = "unknown"
    ref_for_kind = left_ref if isinstance(left_ref, dict) else cond.get("ref")
    if isinstance(ref_for_kind, dict) and ref_for_kind.get("nodeId") and node:
        # NOTE(review): the stub graph lists only the current node, not the node
        # referenced by ``ref_for_kind["nodeId"]`` - confirm resolve_value_kind
        # can still determine the kind without the producing node.
        graph_stub = {
            "nodes": [{"id": node.get("id"), "type": node.get("type")}],
            "targetNodeId": node.get("id"),
        }
        value_kind = resolve_value_kind(graph_stub, ref_for_kind)
    return apply_condition_operator(left, str(operator), right, value_kind)
def _compare_dates(self, left: Any, right: Any, op) -> bool:
"""Compare left/right as dates; op(a,b) is the comparison."""
@ -236,45 +225,7 @@ class FlowExecutor:
else:
operator = "eq"
right = case
# Same logic as _evalStructuredCondition but with explicit left/right
if operator == "eq":
return left == right
if operator == "neq":
return left != right
if operator in ("lt", "lte", "gt", "gte"):
try:
l, r = float(left) if left is not None else 0, float(right) if right is not None else 0
if operator == "lt":
return l < r
if operator == "lte":
return l <= r
if operator == "gt":
return l > r
if operator == "gte":
return l >= r
except (TypeError, ValueError):
return False
if operator == "contains":
return right is not None and str(right) in str(left or "")
if operator == "not_contains":
return right is None or str(right) not in str(left or "")
if operator == "empty":
return left is None or left == "" or (isinstance(left, (list, dict)) and len(left) == 0)
if operator == "not_empty":
return left is not None and left != "" and (not isinstance(left, (list, dict)) or len(left) > 0)
if operator == "is_true":
return bool(left)
if operator == "is_false":
return not bool(left)
if operator == "before":
return self._compare_dates(left, right, lambda a, b: a < b)
if operator == "after":
return self._compare_dates(left, right, lambda a, b: a > b)
if operator == "exists":
return self._file_exists(left)
if operator == "not_exists":
return not self._file_exists(left)
return False
return apply_condition_operator(left, str(operator), right)
async def _loop(self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict) -> Any:
params = node.get("parameters") or {}

View file

@ -194,48 +194,41 @@ class MethodBase:
return wrapper
def _validateParameters(self, parameters: Dict[str, Any], paramDefs: Dict[str, WorkflowActionParameter]) -> Dict[str, Any]:
    """Validate declared parameters; pass through unknown ones from the node definition.

    The graphical-editor node definition is the source of truth for the full UI parameter
    list. Actions only need to declare the parameters they want validated/defaulted; any
    additional parameter passed in by the executor (e.g. contentFilter, pdfExtractMode,
    outputMode for context.extractContent) is preserved so the action can read it.
    System parameters (parentOperationId, _runContext, _upstreamPayload, ...) are always
    preserved as before.

    Returns:
        A new dict with every declared parameter (validated, defaulted, possibly
        ``None``) followed by all undeclared entries of ``parameters`` unchanged.

    Raises:
        ValueError: If a required declared parameter is missing or ``None``.
    """
    validated: Dict[str, Any] = {}
    for paramName, paramDef in paramDefs.items():
        value = parameters.get(paramName)
        if paramDef.required and value is None:
            raise ValueError(f"Required parameter '{paramName}' is missing")
        # Fall back to the declared default when nothing was provided.
        if value is None and paramDef.default is not None:
            value = paramDef.default
        if value is not None:
            value = self._validateType(value, paramDef.type)
        if paramDef.validation and value is not None:
            self._applyValidationRules(value, paramDef.validation)
        validated[paramName] = value
    # Preserve every additional parameter the executor passed in (node-defined params,
    # system params, declarative injections). This keeps the node definition authoritative.
    for k, v in parameters.items():
        if k not in validated:
            validated[k] = v
    return validated
def _validateType(self, value: Any, expectedType: str) -> Any:

View file

@ -255,10 +255,17 @@ def parse_presentation_parameters(parameters: Dict[str, Any]) -> Dict[str, Any]:
pdf_mode = "all"
if pdf_mode not in _PDF_EXTRACT_PRESENTATION_MODES:
pdf_mode = "all"
# Coerce pdfExtractMode to match contentFilter intent. contentFilter is the
# authoritative user choice; pdfExtractMode is a presentation-layer detail that
# must stay consistent with it.
if content_filter == "all" and pdf_mode == "text":
pdf_mode = "all"
elif content_filter == "imagesOnly" and pdf_mode in ("text", "tables"):
elif content_filter == "imagesOnly" and pdf_mode != "images":
pdf_mode = "images"
elif content_filter == "textOnly" and pdf_mode not in ("text", "tables"):
pdf_mode = "text"
elif content_filter == "noImages" and pdf_mode == "images":
pdf_mode = "text"
return {
"outputMode": output_mode,
"splitBy": split_by,
@ -1287,41 +1294,63 @@ def _get_mgmt_for_presentation_render(services: Any) -> Optional[Any]:
return None
def _resize_image_bytes_for_document(image_bytes: bytes) -> bytes:
try:
from PIL import Image as PILImage
def _sniff_image_mime(image_bytes: bytes) -> str:
"""Detect image mime type from raw bytes (magic numbers).
img = PILImage.open(BytesIO(image_bytes))
if img.mode in ("RGBA", "LA"):
bg = PILImage.new("RGB", img.size, (255, 255, 255))
bg.paste(img, mask=img.split()[-1])
img = bg
elif img.mode == "P":
img = img.convert("RGBA")
bg = PILImage.new("RGB", img.size, (255, 255, 255))
bg.paste(img, mask=img.split()[-1])
img = bg
elif img.mode != "RGB":
img = img.convert("RGB")
if max(img.size) > _IMAGE_MAX_DIMENSION:
img.thumbnail((_IMAGE_MAX_DIMENSION, _IMAGE_MAX_DIMENSION), PILImage.BILINEAR)
out = BytesIO()
img.save(out, format="JPEG", quality=85, optimize=True)
return out.getvalue()
except Exception as exc:
logger.warning("presentation render: image resize failed (%s)", exc)
return image_bytes
Raises ``ValueError`` for unknown / unreadable signatures callers must NOT
silently fall back to a guessed mime type, because that produces broken
renders downstream (wrong content-type in data URIs, wrong file extensions).
"""
if not image_bytes or len(image_bytes) < 12:
raise ValueError(
f"image bytes too short to detect mime type ({len(image_bytes) if image_bytes else 0} bytes)"
)
head = image_bytes[:12]
if head[:8] == b"\x89PNG\r\n\x1a\n":
return "image/png"
if head[:3] == b"\xff\xd8\xff":
return "image/jpeg"
if head[:6] in (b"GIF87a", b"GIF89a"):
return "image/gif"
if head[:4] == b"RIFF" and head[8:12] == b"WEBP":
return "image/webp"
if head[:2] == b"BM":
return "image/bmp"
if head[:4] in (b"II*\x00", b"MM\x00*"):
return "image/tiff"
raise ValueError(f"unknown image signature: {head[:8]!r}")
def _resize_image_bytes_for_document(image_bytes: bytes) -> bytes:
    """Re-encode an image as an RGB JPEG, shrinking it when it is too large.

    Images with an alpha channel (``RGBA``/``LA``) and palette images are
    flattened onto a white background; other non-RGB modes are converted to
    RGB. If the longer side exceeds ``_IMAGE_MAX_DIMENSION`` the image is
    thumbnailed to fit. Decoding or encoding errors propagate to the caller.

    Returns:
        The JPEG-encoded bytes (quality 85, optimized).
    """
    from PIL import Image as PILImage

    img = PILImage.open(BytesIO(image_bytes))
    # Palette images go through RGBA so they share the flattening step below.
    if img.mode == "P":
        img = img.convert("RGBA")
    if img.mode in ("RGBA", "LA"):
        bg = PILImage.new("RGB", img.size, (255, 255, 255))
        # The last band is the alpha channel; use it as the paste mask.
        bg.paste(img, mask=img.split()[-1])
        img = bg
    elif img.mode != "RGB":
        img = img.convert("RGB")
    if max(img.size) > _IMAGE_MAX_DIMENSION:
        img.thumbnail((_IMAGE_MAX_DIMENSION, _IMAGE_MAX_DIMENSION), PILImage.BILINEAR)
    out = BytesIO()
    img.save(out, format="JPEG", quality=85, optimize=True)
    return out.getvalue()
def _load_image_bytes_by_file_id(services: Any, file_id: str) -> Optional[bytes]:
    """Load persisted image bytes for ``file_id`` via the management interface.

    The interface is obtained from ``_get_mgmt_for_presentation_render`` and the
    result of its ``getFileData(str(file_id))`` is returned unchanged. Errors
    raised by ``getFileData`` propagate to the caller.

    Raises:
        ValueError: If no management interface is available or it has no
            ``getFileData`` attribute.
    """
    mgmt = _get_mgmt_for_presentation_render(services)
    if not mgmt or not hasattr(mgmt, "getFileData"):
        raise ValueError(
            "no management interface available to load persisted image bytes — "
            "services.interfaceDbComponent / mandate / instance must be set"
        )
    return mgmt.getFileData(str(file_id))
def _inline_runs_from_presentation_lines(lines: List[Any]) -> List[Dict[str, Any]]:
@ -1470,25 +1499,34 @@ def presentation_envelopes_to_document_json(
def _append_image_slot(slot: Dict[str, Any]) -> None:
fid = slot.get("embeddedImageFileId")
if not fid:
return
raise ValueError(
"image slot is missing embeddedImageFileId — "
"extractContent must persist every image part before handover"
)
blob = _load_image_bytes_by_file_id(services, str(fid))
if not blob:
return
raise ValueError(
f"could not load persisted image bytes for fileId={fid!r}"
)
if len(blob) > _MAX_IMAGE_EMBED_BYTES:
blob = _resize_image_bytes_for_document(blob)
alt = (
slot.get("embeddedImageFileName")
or slot.get("label")
or f"image_{fid}"
)
name = slot.get("embeddedImageFileName") or slot.get("label")
if not name:
raise ValueError(
f"image slot is missing embeddedImageFileName/label for fileId={fid!r}"
)
mime = _sniff_image_mime(blob)
sections.append({
"id": _next_id(),
"content_type": "image",
"order": order,
"elements": [{
"content": {
"altText": str(alt),
"altText": str(name),
"base64Data": _b64.b64encode(blob).decode("ascii"),
"fileId": str(fid),
"fileName": str(name),
"mimeType": mime,
},
}],
})

View file

@ -0,0 +1,49 @@
# Copyright (c) 2025 Patrick Motsch
"""Tests for backend-driven condition operator catalog."""
from modules.features.graphicalEditor.conditionOperators import (
CONDITION_OPERATOR_CATALOG,
VALUE_KINDS,
apply_condition_operator,
catalog_type_to_value_kind,
localize_operator_catalog,
)
def test_all_value_kinds_have_operators():
    """Every entry of VALUE_KINDS has a non-empty operator list in the catalog."""
    for kind in VALUE_KINDS:
        assert kind in CONDITION_OPERATOR_CATALOG
        assert len(CONDITION_OPERATOR_CATALOG[kind]) > 0
def test_operator_ids_unique_per_kind():
    """Operator ids must not repeat within one value kind."""
    for kind, ops in CONDITION_OPERATOR_CATALOG.items():
        ids = [o["id"] for o in ops]
        assert len(ids) == len(set(ids)), f"duplicate operator id in {kind}"
def test_localize_operator_catalog_has_labels():
    """The localized catalog carries a non-empty ``label`` on every string operator."""
    loc = localize_operator_catalog("de")
    assert "string" in loc
    assert all("label" in o and o["label"] for o in loc["string"])
def test_catalog_type_mapping():
    """Catalog type names map onto the expected value kinds."""
    assert catalog_type_to_value_kind("str") == "string"
    assert catalog_type_to_value_kind("int") == "number"
    assert catalog_type_to_value_kind("bool") == "boolean"
    assert catalog_type_to_value_kind("List[Any]") == "array"
    assert catalog_type_to_value_kind("Dict") == "object"
def test_string_operators_apply():
    """starts_with, ends_with, regex and contains behave as expected on strings."""
    assert apply_condition_operator("hello", "starts_with", "he", "string")
    assert apply_condition_operator("hello", "ends_with", "lo", "string")
    assert apply_condition_operator("hello", "regex", "ell", "string")
    assert not apply_condition_operator("hello", "contains", "xyz", "string")
def test_array_length_operators():
    """Length comparisons and ``empty`` work on array values."""
    assert apply_condition_operator([1, 2, 3], "length_eq", 3, "array")
    assert apply_condition_operator([1, 2, 3], "length_gt", 2, "array")
    assert apply_condition_operator([], "empty", None, "array")

View file

@ -0,0 +1,60 @@
# Copyright (c) 2025 Patrick Motsch
"""Tests for condition valueKind resolution."""
from modules.features.graphicalEditor.conditionOperators import resolve_value_kind
def _graph(nodes, connections=None, target=None):
return {
"nodes": nodes,
"connections": connections or [],
"targetNodeId": target or nodes[-1]["id"],
}
def test_form_payload_field_is_string():
    """A form field referenced via ``payload.<name>`` resolves to ``string``."""
    graph = _graph(
        [
            {
                "id": "f1",
                "type": "input.form",
                "parameters": {"formFields": [{"name": "email", "type": "email"}]},
            },
            {"id": "if1", "type": "flow.ifElse", "parameters": {}},
        ],
        target="if1",
    )
    ref = {"nodeId": "f1", "path": ["payload", "email"]}
    assert resolve_value_kind(graph, ref) == "string"
def test_extract_content_data_is_context():
    """The ``data`` output of context.extractContent resolves to ``context``."""
    graph = _graph(
        [
            {"id": "ext1", "type": "context.extractContent", "parameters": {}},
            {"id": "if1", "type": "flow.ifElse", "parameters": {}},
        ],
        target="if1",
    )
    ref = {"nodeId": "ext1", "path": ["data"]}
    assert resolve_value_kind(graph, ref) == "context"
def test_upload_file_is_file():
    """The ``file`` output of input.upload resolves to ``file``."""
    graph = _graph(
        [
            {"id": "up1", "type": "input.upload", "parameters": {}},
            {"id": "if1", "type": "flow.ifElse", "parameters": {}},
        ],
        target="if1",
    )
    ref = {"nodeId": "up1", "path": ["file"]}
    assert resolve_value_kind(graph, ref) == "file"
def test_upload_mime_is_string():
    """``file.mimeType`` of an uploaded file resolves to ``string``."""
    graph = _graph(
        [
            {"id": "up1", "type": "input.upload", "parameters": {}},
            {"id": "if1", "type": "flow.ifElse", "parameters": {}},
        ],
        target="if1",
    )
    ref = {"nodeId": "up1", "path": ["file", "mimeType"]}
    assert resolve_value_kind(graph, ref) == "string"

View file

@ -568,6 +568,7 @@ def test_presentation_envelopes_preserves_data_slot_order_text_image_text():
"typeGroup": "image",
"mimeType": "image/png",
"embeddedImageFileId": "00000000-0000-0000-0000-000000000001",
"embeddedImageFileName": "img.png",
},
{"typeGroup": "text", "mimeType": "text/plain", "lines": ["After"]},
],
@ -659,7 +660,8 @@ def test_presentation_envelopes_to_document_json_image_slot():
class _Mgmt:
def getFileData(self, file_id):
assert file_id == fid
return b"\x89PNG\r\n"
# Valid PNG signature + enough bytes for mime sniffing (>= 12 bytes).
return b"\x89PNG\r\n\x1a\n" + b"\x00" * 16
class _Svc:
interfaceDbComponent = _Mgmt()

View file

@ -0,0 +1,66 @@
# Copyright (c) 2025 Patrick Motsch
"""FlowExecutor structured condition evaluation with Item dataRef."""
import pytest
from modules.workflows.automation2.executors.flowExecutor import FlowExecutor
from modules.workflows.methods.methodContext.actions.extractContent import PRESENTATION_KIND
@pytest.fixture
def executor():
    """Provide a ``FlowExecutor`` instance to the tests."""
    return FlowExecutor()
def test_if_else_uses_item_param(executor):
    """The ``Item`` data reference supplies the left operand of the condition."""
    node_outputs = {
        "n1": {"payload": {"status": "ok"}},
    }
    node = {
        "id": "if1",
        "type": "flow.ifElse",
        "parameters": {
            "Item": {"type": "ref", "nodeId": "n1", "path": ["payload", "status"]},
            "condition": {"type": "condition", "operator": "eq", "value": "ok"},
        },
    }
    ok = executor._evalStructuredCondition(
        node["parameters"]["condition"],
        node_outputs,
        item_param=node["parameters"]["Item"],
        node=node,
    )
    assert ok is True
def test_legacy_condition_ref_fallback(executor):
    """Without an ``Item`` parameter the legacy ``condition.ref`` is used."""
    node_outputs = {"n1": {"count": 5}}
    node = {"id": "if1", "type": "flow.ifElse", "parameters": {}}
    cond = {
        "type": "condition",
        "ref": {"type": "ref", "nodeId": "n1", "path": ["count"]},
        "operator": "gt",
        "value": 3,
    }
    assert executor._evalStructuredCondition(cond, node_outputs, node=node) is True
def test_context_contains_content(executor):
    """``contains_content`` matches a presentation envelope holding text data."""
    presentation = {
        "kind": PRESENTATION_KIND,
        "outputMode": "lines",
        "fileOrder": ["f1"],
        "files": {
            "f1": {
                "outputMode": "lines",
                "data": [{"typeGroup": "text", "lines": ["Hallo Welt"]}],
            }
        },
    }
    cond = {"type": "condition", "operator": "contains_content", "value": "text"}
    ok = executor._evalStructuredCondition(
        cond,
        {"n1": presentation},
        item_param={"type": "ref", "nodeId": "n1", "path": []},
        node={"id": "if1", "type": "flow.ifElse"},
    )
    assert ok is True
def test_switch_uses_shared_operators(executor):
    """Switch cases accept operators from the shared operator catalog."""
    assert executor._evalSwitchCase("abc", {"operator": "starts_with", "value": "ab"}) is True
    assert executor._evalSwitchCase([1, 2], {"operator": "length_eq", "value": 2}) is True

View file

@ -371,3 +371,332 @@ def test_no_node_named_is_merge_node_in_engine():
"""Legacy _isMergeNode alias must be removed from executionEngine."""
import modules.workflows.automation2.executionEngine as eng
assert not hasattr(eng, "_isMergeNode"), "_isMergeNode legacy alias must be deleted"
# ---------------------------------------------------------------------------
# 13. methodBase parameter passthrough — node-defined params must reach the action
# ---------------------------------------------------------------------------
def test_method_base_validate_parameters_passes_through_undeclared_keys():
    """``_validateParameters`` must not drop keys the action did not declare.

    Regression guard: only ``documentList`` is declared here, yet the incoming
    dict also carries ``contentFilter``, ``pdfExtractMode``, ``outputMode``,
    ``_runContext`` and ``parentOperationId``. All of them must be present,
    unchanged, in the validated result.
    """
    from modules.workflows.methods.methodBase import MethodBase
    from modules.datamodels.datamodelWorkflowActions import WorkflowActionParameter
    from modules.shared.frontendTypes import FrontendType

    class _Svc:
        pass

    declared_only_document_list = {
        "documentList": WorkflowActionParameter(
            name="documentList",
            type="Any",
            frontendType=FrontendType.HIDDEN,
            required=True,
            description="docs",
        ),
    }

    # Bypass __init__ and attach only the attribute this test sets up.
    method_base = MethodBase.__new__(MethodBase)
    method_base.services = _Svc()

    raw_params = {
        "documentList": ["doc1"],
        "contentFilter": "imagesOnly",
        "pdfExtractMode": "all",
        "outputMode": "lines",
        "_runContext": {"mandateId": "m"},
        "parentOperationId": "op1",
    }
    validated = method_base._validateParameters(raw_params, declared_only_document_list)

    assert validated["documentList"] == ["doc1"]
    assert validated["contentFilter"] == "imagesOnly", (
        "contentFilter must pass through even though the action did not declare it"
    )
    assert validated["pdfExtractMode"] == "all"
    assert validated["outputMode"] == "lines"
    assert validated["_runContext"] == {"mandateId": "m"}
    assert validated["parentOperationId"] == "op1"
def test_parse_presentation_parameters_imagesonly_coerces_pdf_mode_to_images():
    """``contentFilter=imagesOnly`` combined with ``pdfExtractMode=all`` yields ``images``."""
    from modules.workflows.methods.methodContext.actions.extractContent import (
        parse_presentation_parameters,
    )

    node_params = {"contentFilter": "imagesOnly", "pdfExtractMode": "all"}
    parsed = parse_presentation_parameters(node_params)
    resolved_mode = parsed["pdfExtractMode"]

    assert resolved_mode == "images", (
        "imagesOnly + pdfExtractMode=all must coerce to 'images' — otherwise text parts "
        "leak into the presentation layer."
    )
def test_parse_presentation_parameters_textonly_coerces_pdf_mode():
    """``contentFilter=textOnly`` combined with ``pdfExtractMode=images`` yields ``text``."""
    from modules.workflows.methods.methodContext.actions.extractContent import (
        parse_presentation_parameters,
    )

    node_params = {"contentFilter": "textOnly", "pdfExtractMode": "images"}
    parsed = parse_presentation_parameters(node_params)

    assert parsed["pdfExtractMode"] == "text"
def test_sniff_image_mime_recognizes_common_signatures():
    """``_sniff_image_mime`` maps PNG, JPEG, GIF and WEBP leading bytes to their MIME types."""
    from modules.workflows.methods.methodContext.actions.extractContent import (
        _sniff_image_mime,
    )

    zero_padding = b"\x00" * 8
    # (leading bytes, expected MIME type)
    known_signatures = (
        (b"\x89PNG\r\n\x1a\n" + zero_padding, "image/png"),
        (b"\xff\xd8\xff\xe0" + zero_padding, "image/jpeg"),
        (b"GIF89a" + zero_padding, "image/gif"),
        (b"RIFF" + b"\x00\x00\x00\x00" + b"WEBP", "image/webp"),
    )
    for blob, expected_mime in known_signatures:
        assert _sniff_image_mime(blob) == expected_mime
def test_sniff_image_mime_raises_on_unknown_signature():
    """Unrecognized or empty byte strings must raise ``ValueError`` rather than default to a type."""
    import pytest as _pt
    from modules.workflows.methods.methodContext.actions.extractContent import (
        _sniff_image_mime,
    )

    unrecognized_inputs = (
        b"NOT_AN_IMAGE_" + b"\x00" * 8,
        b"",
    )
    for blob in unrecognized_inputs:
        with _pt.raises(ValueError):
            _sniff_image_mime(blob)
def test_markdown_renderer_image_uses_relative_path_and_emits_sidecar_files():
    """An image section renders as a relative ``![alt](file)`` link plus a separate image document.

    Expectations on the render result: exactly two documents; the first is the
    markdown text, which references ``alpha.png`` relatively and contains
    neither an ``/api/files/`` URL nor inline base64; the second is the image
    itself with its file name, MIME type and non-empty data.
    """
    import asyncio
    import base64 as _b64
    from modules.serviceCenter.services.serviceGeneration.renderers.rendererMarkdown import (
        RendererMarkdown,
    )

    # PNG signature followed by padding; only used as an encodable payload.
    raw_png = b"\x89PNG\r\n\x1a\n" + b"\x00" * 16
    png_b64 = _b64.b64encode(raw_png).decode("ascii")

    image_element = {
        "content": {
            "altText": "alpha.png",
            "fileName": "alpha.png",
            "mimeType": "image/png",
            "base64Data": png_b64,
        },
    }
    image_section = {
        "id": "s1",
        "content_type": "image",
        "order": 1,
        "elements": [image_element],
    }
    document = {
        "id": "d1",
        "title": "doc",
        "outputFormat": "md",
        "language": "de",
        "sections": [image_section],
    }
    content = {
        "metadata": {"title": "doc"},
        "documents": [document],
    }

    renderer = RendererMarkdown()
    rendered = asyncio.run(renderer.render(content, title="doc"))

    assert len(rendered) == 2, "markdown render must return .md + sidecar image"

    markdown_text = rendered[0].documentData.decode("utf-8")
    assert "![alpha.png](alpha.png)" in markdown_text
    assert "/api/files/" not in markdown_text
    assert "base64" not in markdown_text.lower()

    sidecar = rendered[1]
    assert sidecar.filename == "alpha.png"
    assert sidecar.mimeType == "image/png"
    assert len(sidecar.documentData) > 0
def test_markdown_renderer_image_raises_without_base64_data():
    """An image element that has a ``fileId`` but no ``base64Data`` must raise ``ValueError``."""
    import asyncio
    import pytest as _pt
    from modules.serviceCenter.services.serviceGeneration.renderers.rendererMarkdown import (
        RendererMarkdown,
    )

    # Deliberately no "base64Data" key; only a file id is supplied.
    image_element = {
        "content": {
            "altText": "beta.jpg",
            "fileName": "beta.jpg",
            "mimeType": "image/jpeg",
            "fileId": "FILE-1",
        },
    }
    image_section = {
        "id": "s1",
        "content_type": "image",
        "order": 1,
        "elements": [image_element],
    }
    document = {
        "id": "d1",
        "title": "doc",
        "outputFormat": "md",
        "language": "de",
        "sections": [image_section],
    }
    content = {"metadata": {}, "documents": [document]}

    renderer = RendererMarkdown()
    with _pt.raises(ValueError, match="base64Data"):
        asyncio.run(renderer.render(content, title="doc"))
def test_markdown_renderer_unknown_section_type_raises():
    """A section with an unrecognized ``content_type`` must raise ``ValueError``."""
    import asyncio
    import pytest as _pt
    from modules.serviceCenter.services.serviceGeneration.renderers.rendererMarkdown import (
        RendererMarkdown,
    )

    unknown_section = {
        "id": "s1",
        "content_type": "totally_unknown",
        "order": 1,
        "elements": [{"content": {"text": "x"}}],
    }
    document = {
        "id": "d1",
        "title": "doc",
        "outputFormat": "md",
        "language": "de",
        "sections": [unknown_section],
    }
    content = {"metadata": {}, "documents": [document]}

    renderer = RendererMarkdown()
    with _pt.raises(ValueError, match="unsupported section content_type"):
        asyncio.run(renderer.render(content, title="doc"))
def test_markdown_renderer_accepts_image_section_type():
    """Regression guard: ``"image"`` is among the section types accepted for ``"md"``."""
    from modules.serviceCenter.services.serviceGeneration.renderers.rendererMarkdown import (
        RendererMarkdown,
    )

    accepted_types = RendererMarkdown.getAcceptedSectionTypes("md")

    assert "image" in accepted_types, "image must be in accepted section types for markdown"
def test_extract_image_slot_carries_file_id_and_mime():
    """Converting an envelope with one image slot yields one image section with id, MIME and data.

    The stubbed file store returns bytes starting with the JPEG marker for any
    file id. The resulting image element must carry ``fileId`` ``"FILE-7"``,
    ``mimeType`` ``"image/jpeg"`` and a truthy ``base64Data``.
    """
    from modules.workflows.methods.methodContext.actions.extractContent import (
        presentation_envelopes_to_document_json,
        PRESENTATION_KIND,
        PRESENTATION_SCHEMA_VERSION,
    )

    class _MgmtStub:
        def getFileData(self, fid):
            # Same JPEG-prefixed payload regardless of the requested id.
            return b"\xff\xd8\xff\xe0" + b"\x00" * 100

    class _Services:
        def __init__(self):
            self.interfaceDbComponent = _MgmtStub()

    image_slot = {
        "id": "img1",
        "typeGroup": "image",
        "mimeType": "image/jpeg",
        "embeddedImageFileId": "FILE-7",
        "embeddedImageFileName": "extract_media_x.jpg",
        "label": "x",
    }
    file_entry = {
        "outputMode": "lines",
        "sourceFileName": "x.pdf",
        "data": [image_slot],
    }
    envelope = {
        "schemaVersion": PRESENTATION_SCHEMA_VERSION,
        "kind": PRESENTATION_KIND,
        "outputMode": "lines",
        "fileOrder": ["file_1_x.pdf"],
        "files": {"file_1_x.pdf": file_entry},
    }

    doc_json = presentation_envelopes_to_document_json(
        envelope, title="t", language="de", services=_Services()
    )

    all_sections = doc_json["documents"][0]["sections"]
    image_sections = [
        section for section in all_sections if section.get("content_type") == "image"
    ]
    assert len(image_sections) == 1

    content = image_sections[0]["elements"][0]["content"]
    assert content.get("fileId") == "FILE-7"
    assert content.get("mimeType") == "image/jpeg", (
        f"mime must be sniffed from bytes (JPEG magic), got {content.get('mimeType')!r}"
    )
    assert content.get("base64Data"), "base64Data must be present for embed-capable renderers"
def test_extract_image_slot_raises_when_file_id_missing():
    """An image slot without ``embeddedImageFileId`` must raise ``ValueError``, not be skipped."""
    import pytest as _pt
    from modules.workflows.methods.methodContext.actions.extractContent import (
        presentation_envelopes_to_document_json,
        PRESENTATION_KIND,
        PRESENTATION_SCHEMA_VERSION,
    )

    class _Services:
        interfaceDbComponent = None

    # Image slot deliberately lacks the "embeddedImageFileId" key.
    image_slot = {
        "id": "img1",
        "typeGroup": "image",
        "mimeType": "image/jpeg",
        "label": "x",
    }
    file_entry = {
        "outputMode": "lines",
        "sourceFileName": "x.pdf",
        "data": [image_slot],
    }
    envelope = {
        "schemaVersion": PRESENTATION_SCHEMA_VERSION,
        "kind": PRESENTATION_KIND,
        "outputMode": "lines",
        "fileOrder": ["file_1_x.pdf"],
        "files": {"file_1_x.pdf": file_entry},
    }

    with _pt.raises(ValueError, match="embeddedImageFileId"):
        presentation_envelopes_to_document_json(
            envelope, title="t", language="de", services=_Services()
        )
def test_parse_presentation_parameters_noimages_drops_images_mode():
    """``contentFilter=noImages`` combined with ``pdfExtractMode=images`` yields ``text``."""
    from modules.workflows.methods.methodContext.actions.extractContent import (
        parse_presentation_parameters,
    )

    node_params = {"contentFilter": "noImages", "pdfExtractMode": "images"}
    parsed = parse_presentation_parameters(node_params)

    assert parsed["pdfExtractMode"] == "text"
def test_method_base_validate_parameters_applies_defaults_for_declared():
    """A declared optional parameter gets its default while an undeclared key still passes through.

    ``outputFormat`` is declared with default ``"docx"`` and is absent from the
    input; ``unknown`` is undeclared but supplied. Both must appear in the result.
    """
    from modules.workflows.methods.methodBase import MethodBase
    from modules.datamodels.datamodelWorkflowActions import WorkflowActionParameter
    from modules.shared.frontendTypes import FrontendType

    class _Svc:
        pass

    output_format_param = WorkflowActionParameter(
        name="outputFormat",
        type="str",
        frontendType=FrontendType.TEXT,
        required=False,
        default="docx",
        description="fmt",
    )
    declared = {"outputFormat": output_format_param}

    # Bypass __init__ and attach only the attribute this test sets up.
    method_base = MethodBase.__new__(MethodBase)
    method_base.services = _Svc()

    validated = method_base._validateParameters({"unknown": "x"}, declared)

    assert validated["outputFormat"] == "docx"
    assert validated["unknown"] == "x"