# Copyright (c) 2025 Patrick Motsch
"""
Graph helpers for Pick-not-Push: materialize typed DataRefs before executeGraph runs.

- ``materializeConnectionRefs``: empty ``connectionReference`` from upstream connection provenance.
- ``materializePrimaryTextHandover``: parameters whose static definition includes
  ``graphInherit.kind == "primaryTextRef"`` (canonical paths: ``PRIMARY_TEXT_HANDOVER_REF_PATH``).
- ``materializeRecommendedDataPickRef``: parameters with ``graphInherit.kind == "recommendedDataPickRef"``
  use the upstream output port's ``dataPickOptions`` entry with ``recommended: true``.
- ``normalizeFileCreatePresentationRefs``: rewrites legacy ``file.create`` context ref paths to ``["data"]``.

Runtime: executeGraph deep-copies the version graph and applies these passes in order.
"""

from __future__ import annotations

import copy
import logging
from typing import Any, Dict, Iterator, List, Optional, Tuple

from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
from modules.features.graphicalEditor.portTypes import (
    PRIMARY_TEXT_HANDOVER_REF_PATH,
    resolve_output_schema_name,
)
from modules.workflows.automation2.graphUtils import buildConnectionMap, getInputSources

logger = logging.getLogger(__name__)

# Static node definitions indexed by their ``id``; graph nodes are looked up by ``type``.
_NODE_DEF_BY_ID = {n["id"]: n for n in STATIC_NODE_TYPES}

# Output schema names for which a ``connection.id`` ref is materialized.
_SCHEMAS_WITH_CONNECTION = frozenset(
    {"FileList", "DocumentList", "EmailList", "TaskList", "EmailDraft", "UdmDocument"},
)


def _data_ref(node_id: str, path: List[Any]) -> Dict[str, Any]:
    """Build a DataRef dict pointing at ``path`` inside the output of ``node_id``."""
    return {"type": "ref", "nodeId": node_id, "path": list(path)}


def _iter_known_nodes(
    nodes: List[Dict[str, Any]],
) -> Iterator[Tuple[Any, Dict[str, Any], Dict[str, Any]]]:
    """Yield ``(node_id, node, node_def)`` for nodes that have an id, a type and a static definition."""
    for node in nodes:
        nid = node.get("id")
        ntype = node.get("type")
        if not nid or not ntype:
            continue
        node_def = _NODE_DEF_BY_ID.get(ntype)
        if not node_def:
            continue
        yield nid, node, node_def


def _ensure_params(node: Dict[str, Any]) -> Dict[str, Any]:
    """Return the node's ``parameters`` dict, replacing a missing/non-dict value with ``{}`` in place."""
    params = node.get("parameters")
    if not isinstance(params, dict):
        params = {}
        node["parameters"] = params
    return params


def _output_port(src_def: Dict[str, Any], index: int) -> Any:
    """Return the ``outputPorts[index]`` entry of a node definition, or ``{}`` when absent/falsy."""
    return (src_def.get("outputPorts") or {}).get(index, {}) or {}


def _resolve_source(
    node_by_id: Dict[Any, Dict[str, Any]],
    input_sources: Dict[Any, Any],
    port_ix: int,
) -> Optional[Tuple[Any, Dict[str, Any], Dict[str, Any]]]:
    """
    Look up the producer wired to input ``port_ix``.

    Returns ``(src_id, src_node, src_def)``, or ``None`` when the port is not
    connected or the producer has no static node definition.
    """
    if port_ix not in input_sources:
        return None
    # The second tuple element is not used by any pass in this module.
    src_id, _ = input_sources[port_ix]
    src_node = node_by_id.get(src_id) or {}
    src_def = _NODE_DEF_BY_ID.get(src_node.get("type") or "")
    if not src_def:
        return None
    return src_id, src_node, src_def


def _iter_empty_inherit_params(
    node_def: Dict[str, Any],
    params: Dict[str, Any],
    kind: str,
) -> Iterator[Tuple[str, Dict[str, Any], int]]:
    """
    Yield ``(name, pdef, port_ix)`` for parameters declaring ``graphInherit.kind == kind``
    whose current value in ``params`` is empty.

    Emptiness is evaluated lazily, so a value written by the caller between two
    iterations is seen by later checks.
    """
    for pdef in node_def.get("parameters") or []:
        gi = pdef.get("graphInherit")
        if not isinstance(gi, dict) or gi.get("kind") != kind:
            continue
        pname = pdef.get("name")
        if not pname:
            continue
        port_ix = int(gi.get("port", 0))
        if not _slot_empty_for_primary_text_inherit(params.get(pname)):
            continue
        yield pname, pdef, port_ix


def materializeConnectionRefs(graph: Dict[str, Any]) -> Dict[str, Any]:
    """
    Fill empty ``connectionReference`` parameters from the upstream node on input port 0.

    Works on a deep copy of ``graph``. For every node whose static definition has a
    ``connectionReference`` parameter with ``frontendType == "userConnection"`` and whose
    current value is ``None``, ``""`` or ``{}``, the value is set to a DataRef
    ``{nodeId: <upstream>, path: ["connection", "id"]}`` when the upstream output schema
    (output port 0) is one of ``_SCHEMAS_WITH_CONNECTION``.

    Args:
        graph: Graph dict with ``nodes`` and ``connections``.

    Returns:
        The modified deep copy; the input graph is not mutated.
    """
    g = copy.deepcopy(graph)
    nodes: List[Dict[str, Any]] = g.get("nodes") or []
    if not nodes:
        return g
    conn_map = buildConnectionMap(g.get("connections") or [])
    node_by_id = {n["id"]: n for n in nodes if n.get("id")}

    for nid, node, node_def in _iter_known_nodes(nodes):
        declares_connection = any(
            p.get("name") == "connectionReference" and p.get("frontendType") == "userConnection"
            for p in node_def.get("parameters") or []
        )
        if not declares_connection:
            continue
        params = _ensure_params(node)
        # Only fill slots the user left empty.
        if params.get("connectionReference") not in (None, "", {}):
            continue
        source = _resolve_source(node_by_id, getInputSources(nid, conn_map), 0)
        if source is None:
            continue
        src_id, src_node, src_def = source
        out_port = _output_port(src_def, 0)
        out_schema = resolve_output_schema_name(src_node, out_port if isinstance(out_port, dict) else {})
        if out_schema not in _SCHEMAS_WITH_CONNECTION:
            continue
        params["connectionReference"] = _data_ref(src_id, ["connection", "id"])
        logger.debug("materializeConnectionRefs: %s.connectionReference -> ref %s.connection.id", nid, src_id)
    return g


def _slot_empty_for_primary_text_inherit(val: Any) -> bool:
    """Return True when ``val`` is ``None``, an empty string or an empty list."""
    return val is None or val == "" or val == []


def materializePrimaryTextHandover(graph: Dict[str, Any]) -> Dict[str, Any]:
    """
    Fill empty ``graphInherit.kind == "primaryTextRef"`` parameters with an explicit DataRef.

    Works on a deep copy of ``graph``. For each such parameter (optional ``port``,
    default 0) the producer wired to that input port is looked up and the parameter is
    set to a DataRef to the producer's canonical text field: the output port's
    ``primaryTextRefPath`` if set, otherwise ``PRIMARY_TEXT_HANDOVER_REF_PATH`` keyed by
    the upstream output schema name.

    Args:
        graph: Graph dict with ``nodes`` and ``connections``.

    Returns:
        The modified deep copy; the input graph is not mutated.
    """
    g = copy.deepcopy(graph)
    nodes: List[Dict[str, Any]] = g.get("nodes") or []
    if not nodes:
        return g
    conn_map = buildConnectionMap(g.get("connections") or [])
    node_by_id = {n["id"]: n for n in nodes if n.get("id")}

    for nid, node, node_def in _iter_known_nodes(nodes):
        params = _ensure_params(node)
        # Depends only on (nid, conn_map), neither of which changes while this node's
        # parameters are processed: resolve lazily, at most once per node.
        input_sources: Optional[Dict[Any, Any]] = None
        for pname, _pdef, port_ix in _iter_empty_inherit_params(node_def, params, "primaryTextRef"):
            if input_sources is None:
                input_sources = getInputSources(nid, conn_map)
            source = _resolve_source(node_by_id, input_sources, port_ix)
            if source is None:
                continue
            src_id, src_node, src_def = source
            # NOTE(review): output port 0 of the producer is always used here,
            # independent of ``port_ix`` — confirm this is intended.
            out_port = _output_port(src_def, 0)
            port_dict = out_port if isinstance(out_port, dict) else {}
            out_schema = resolve_output_schema_name(src_node, port_dict)
            # Port-level override takes precedence over the schema-wide default path.
            # Example: context.extractContent sets primaryTextRefPath=["data"] because
            # its ``response`` field is intentionally empty.
            ref_path = port_dict.get("primaryTextRefPath") or PRIMARY_TEXT_HANDOVER_REF_PATH.get(out_schema)
            if not ref_path:
                continue
            params[pname] = _data_ref(src_id, list(ref_path))
            logger.debug(
                "materializePrimaryTextHandover: %s.%s -> ref %s path=%s",
                nid,
                pname,
                src_id,
                ref_path,
            )
    return g


def _recommended_data_pick_path(out_port: Dict[str, Any]) -> Optional[List[Any]]:
    """
    Return a copy of the ``path`` of the first ``dataPickOptions`` entry that has
    ``recommended is True`` and a non-empty list ``path``; ``None`` if there is none.
    """
    opts = out_port.get("dataPickOptions") if isinstance(out_port, dict) else None
    if not isinstance(opts, list):
        return None
    for opt in opts:
        if not isinstance(opt, dict):
            continue
        if opt.get("recommended") is True:
            path = opt.get("path")
            if isinstance(path, list) and path:
                return list(path)
    return None


def materializeRecommendedDataPickRef(graph: Dict[str, Any]) -> Dict[str, Any]:
    """
    Fill empty ``graphInherit.kind == "recommendedDataPickRef"`` parameters.

    Works on a deep copy of ``graph``. The value becomes a DataRef to the recommended
    ``dataPickOptions`` path of the producer's output port; parameters with
    ``frontendType == "contextBuilder"`` receive the ref wrapped in a one-element list.

    Args:
        graph: Graph dict with ``nodes`` and ``connections``.

    Returns:
        The modified deep copy; the input graph is not mutated.
    """
    g = copy.deepcopy(graph)
    nodes: List[Dict[str, Any]] = g.get("nodes") or []
    if not nodes:
        return g
    conn_map = buildConnectionMap(g.get("connections") or [])
    node_by_id = {n["id"]: n for n in nodes if n.get("id")}

    for nid, node, node_def in _iter_known_nodes(nodes):
        params = _ensure_params(node)
        # Same for every parameter of this node: resolve lazily, at most once.
        input_sources: Optional[Dict[Any, Any]] = None
        for pname, pdef, port_ix in _iter_empty_inherit_params(node_def, params, "recommendedDataPickRef"):
            if input_sources is None:
                input_sources = getInputSources(nid, conn_map)
            source = _resolve_source(node_by_id, input_sources, port_ix)
            if source is None:
                continue
            src_id, _src_node, src_def = source
            # NOTE(review): the producer's output ports are indexed with the consumer's
            # input port index, and the port-0 fallback only triggers for a truthy
            # non-dict entry (a missing port yields ``{}``) — confirm both are intended.
            out_port = _output_port(src_def, port_ix)
            if not isinstance(out_port, dict):
                out_port = _output_port(src_def, 0)
            ref_path = _recommended_data_pick_path(out_port if isinstance(out_port, dict) else {})
            if not ref_path:
                continue
            ref = _data_ref(src_id, ref_path)
            params[pname] = [ref] if pdef.get("frontendType") == "contextBuilder" else ref
            logger.debug(
                "materializeRecommendedDataPickRef: %s.%s -> ref %s path=%s",
                nid,
                pname,
                src_id,
                ref_path,
            )
    return g


# Legacy ref paths that are rewritten to ``["data"]`` for ``file.create`` context refs.
_STALE_FILE_CREATE_CONTEXT_PATHS = frozenset({
    ("responseData",),
    ("response",),
    ("merged",),
    ("documents", 0, "documentData"),
})


def remap_stale_presentation_ref_path(path: List[Any]) -> List[Any]:
    """
    Map legacy text-handover paths to unified presentation ``data``.

    Returns ``["data"]`` when ``path`` is one of ``_STALE_FILE_CREATE_CONTEXT_PATHS``,
    otherwise a shallow copy of ``path``. A path containing unhashable elements
    (e.g. a nested list or dict) cannot be a legacy path and is returned unchanged
    instead of raising ``TypeError``.
    """
    try:
        is_stale = tuple(path) in _STALE_FILE_CREATE_CONTEXT_PATHS
    except TypeError:
        # Set membership hashes the tuple; unhashable elements mean "not a legacy path".
        is_stale = False
    if is_stale:
        return ["data"]
    return list(path)


def _normalize_presentation_refs_in_value(val: Any) -> Any:
    """
    Rewrite stale ref paths inside ``contextBuilder`` lists or bare refs.

    Ref dicts whose path changes are returned as new dicts; lists are rebuilt
    recursively; every other value is returned as is.
    """
    if isinstance(val, dict) and val.get("type") == "ref":
        path = val.get("path")
        if isinstance(path, list) and path:
            new_path = remap_stale_presentation_ref_path(path)
            if new_path != path:
                return {**val, "path": new_path}
        return val
    if isinstance(val, list):
        return [_normalize_presentation_refs_in_value(item) for item in val]
    return val


def normalizeFileCreatePresentationRefs(graph: Dict[str, Any]) -> Dict[str, Any]:
    """
    Remap legacy ``file.create`` context refs to unified presentation ``data``.

    Works on a deep copy of ``graph``; only the ``context`` parameter of nodes with
    ``type == "file.create"`` is inspected.

    Args:
        graph: Graph dict with ``nodes``.

    Returns:
        The modified deep copy; the input graph is not mutated.
    """
    g = copy.deepcopy(graph)
    nodes: List[Dict[str, Any]] = g.get("nodes") or []
    for node in nodes:
        if node.get("type") != "file.create":
            continue
        params = node.get("parameters")
        if not isinstance(params, dict):
            continue
        ctx = params.get("context")
        if ctx in (None, "", []):
            continue
        normalized = _normalize_presentation_refs_in_value(ctx)
        if normalized != ctx:
            params["context"] = normalized
            logger.debug(
                "normalizeFileCreatePresentationRefs: %s.context remapped to presentation data ref",
                node.get("id"),
            )
    return g