# Copyright (c) 2025 Patrick Motsch
# Action node executor - maps ai.*, email.*, sharepoint.*, clickup.*, file.*, trustee.* to method actions.
#
# Typed Port System: no heuristic merging. Uses INPUT_EXTRACTORS for wire-handover,
# DataRef for explicit parameter mapping, and _normalizeToSchema for output normalization.

import json
import logging
import re
from typing import Dict, Any, Optional

from modules.features.graphicalEditor.portTypes import (
    INPUT_EXTRACTORS,
    _normalizeToSchema,
    _normalizeError,
    _unwrapTransit,
)

logger = logging.getLogger(__name__)

# Canonical 8-4-4-4-12 hex UUID layout, matched case-insensitively.
_USER_CONNECTION_ID_RE = re.compile(
    r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
    re.IGNORECASE,
)


def _isUserConnectionId(val: Any) -> bool:
    """Return True if ``val`` (stringified and stripped) looks like a UUID.

    ``None``, dicts and lists are rejected up front so that their string
    representation is never matched against the pattern.
    """
    if val is None or isinstance(val, (dict, list)):
        return False
    s = str(val).strip()
    return bool(_USER_CONNECTION_ID_RE.match(s))


def _getNodeDefinition(nodeType: str) -> Optional[Dict[str, Any]]:
    """Get node definition by type id.

    Returns the first entry of STATIC_NODE_TYPES whose ``"id"`` equals
    ``nodeType``, or ``None`` when no entry matches.
    """
    # Imported lazily, as in the rest of this module.
    from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES

    for node in STATIC_NODE_TYPES:
        if node.get("id") == nodeType:
            return node
    return None


def _formatConnectionReference(authority: Any, username: Any) -> str:
    """Build ``connection:{authority}:{externalUsername}`` from raw values.

    Enum-like authorities (anything exposing ``.value``) are unwrapped; a
    missing authority falls back to ``"outlook"`` and a missing username to
    the empty string, so the reference never contains the literal ``None``.
    """
    if hasattr(authority, "value"):
        authority = authority.value
    else:
        authority = str(authority) if authority else "outlook"
    return f"connection:{authority}:{username or ''}"


def _resolveConnectionIdToReference(chatService, connectionId: str, services=None) -> Optional[str]:
    """
    Resolve connectionId (UserConnection.id) to connectionReference format.
    connectionReference format: connection:{authority}:{externalUsername}

    Lookup order:
      1. A value that already starts with ``connection:`` is returned as is.
      2. ``chatService.getUserConnections()`` is searched for a matching id.
      3. ``services.interfaceDbApp.getUserConnectionById`` is used as fallback.

    Lookup failures are logged at debug level and never raised.

    Returns:
        The connection reference, or ``None`` if the id is empty or could not
        be resolved by either source.
    """
    if not connectionId:
        return None
    if isinstance(connectionId, str) and connectionId.startswith("connection:"):
        return connectionId

    if chatService:
        try:
            connections = chatService.getUserConnections()
            for c in connections or []:
                # Accept plain dicts as well as objects exposing model_dump().
                conn = c if isinstance(c, dict) else (c.model_dump() if hasattr(c, "model_dump") else {})
                if str(conn.get("id")) == str(connectionId):
                    return _formatConnectionReference(
                        conn.get("authority"),
                        conn.get("externalUsername", ""),
                    )
        except Exception as e:
            # Best effort: fall through to the interfaceDbApp lookup below.
            logger.debug("_resolveConnectionIdToReference chatService: %s", e)

    app = getattr(services, "interfaceDbApp", None) if services else None
    if app and hasattr(app, "getUserConnectionById"):
        try:
            conn = app.getUserConnectionById(str(connectionId))
            if conn:
                return _formatConnectionReference(
                    getattr(conn, "authority", None),
                    getattr(conn, "externalUsername", ""),
                )
        except Exception as e:
            logger.debug("_resolveConnectionIdToReference getUserConnectionById: %s", e)
    return None


def _buildEmailFilter(
    fromAddress: Optional[str] = None,
    subjectContains: Optional[str] = None,
    hasAttachment: Optional[bool] = None,
) -> str:
    """Build Microsoft Graph API $filter string.

    Each non-blank argument contributes one clause; clauses are joined with
    ``" and "``. Single quotes in values are doubled before interpolation.
    ``hasAttachment`` only contributes a clause when it is exactly ``True``.

    Returns:
        The filter expression, or ``""`` when no clause applies.
    """
    parts = []
    if fromAddress and str(fromAddress).strip():
        safe = str(fromAddress).strip().replace("'", "''")
        parts.append(f"from/emailAddress/address eq '{safe}'")
    if subjectContains and str(subjectContains).strip():
        safe = str(subjectContains).strip().replace("'", "''")
        parts.append(f"contains(subject,'{safe}')")
    if hasAttachment is True:
        parts.append("hasAttachments eq true")
    return " and ".join(parts) if parts else ""


def _buildSearchQuery(
    query: Optional[str] = None,
    fromAddress: Optional[str] = None,
    toAddress: Optional[str] = None,
    subjectContains: Optional[str] = None,
    bodyContains: Optional[str] = None,
    hasAttachment: Optional[bool] = None,
    filterStr: Optional[str] = None,
) -> str:
    """Build Microsoft Graph $search query from discrete params.

    A non-blank ``filterStr`` takes precedence and is returned stripped,
    ignoring every other argument. Otherwise the free-text ``query`` and one
    ``field:value`` term per non-blank field are joined with single spaces.
    Double quotes are removed from field values.

    Returns:
        The search string, or ``"*"`` when no term applies.
    """
    if filterStr and str(filterStr).strip():
        return str(filterStr).strip()

    parts = []
    if query and str(query).strip():
        parts.append(str(query).strip())

    fieldTerms = (
        ("from", fromAddress),
        ("to", toAddress),
        ("subject", subjectContains),
        ("body", bodyContains),
    )
    for prefix, value in fieldTerms:
        if value and str(value).strip():
            cleaned = str(value).strip().replace('"', "")
            parts.append(f"{prefix}:{cleaned}")

    if hasAttachment is True:
        parts.append("hasattachments:true")
    return " ".join(parts) if parts else "*"


def _resolveConnectionParam(params: Dict, chatService, services) -> None:
    """Resolve connectionReference if it looks like a UUID (UserConnection.id).

    ``params`` is modified in place; the value is left untouched when it is
    not UUID-shaped or when resolution yields nothing.
    """
    connRef = params.get("connectionReference")
    if connRef and _isUserConnectionId(connRef):
        resolved = _resolveConnectionIdToReference(chatService, connRef, services)
        if resolved:
            params["connectionReference"] = resolved


def _applyEmailCheckFilter(params: Dict) -> None:
    """Build filter from discrete email params for email.checkEmail.

    ``params`` is modified in place: ``"filter"`` becomes the filter built
    from ``fromAddress`` / ``subjectContains`` / ``hasAttachment`` when that is
    non-empty, otherwise the stripped pre-existing filter, otherwise ``None``.
    The three discrete keys are removed afterwards.
    """
    built = _buildEmailFilter(
        fromAddress=params.get("fromAddress"),
        subjectContains=params.get("subjectContains"),
        hasAttachment=params.get("hasAttachment"),
    )
    # str() guards against a non-string filter value, which has no .strip().
    rawFilter = str(params.get("filter") or "").strip()
    params["filter"] = built or rawFilter or None
    for k in ("fromAddress", "subjectContains", "hasAttachment"):
        params.pop(k, None)


def _applyEmailSearchQuery(params: Dict) -> None:
    """Build query from discrete email params for email.searchEmail.

    ``params`` is modified in place: ``"query"`` is replaced by the result of
    ``_buildSearchQuery`` and the discrete keys (including ``"filter"``) are
    removed.
    """
    built = _buildSearchQuery(
        query=params.get("query"),
        fromAddress=params.get("fromAddress"),
        toAddress=params.get("toAddress"),
        subjectContains=params.get("subjectContains"),
        bodyContains=params.get("bodyContains"),
        hasAttachment=params.get("hasAttachment"),
        filterStr=params.get("filter"),
    )
    params["query"] = built
    for k in ("fromAddress", "toAddress", "subjectContains", "bodyContains", "hasAttachment", "filter"):
        params.pop(k, None)


def _wireHandover(nodeDef: Dict, inputSources: Dict, nodeOutputs: Dict, params: Dict) -> None:
    """Apply wire-handover: extract fields from upstream using INPUT_EXTRACTORS.

    Only input port 0 is considered. The upstream node's output is unwrapped
    with ``_unwrapTransit`` and passed to the extractor registered for each
    schema the port accepts (``"Transit"`` is skipped). Extracted values are
    written into ``params`` in place, but only for keys whose current value is
    missing or falsy.
    """
    if 0 not in inputSources:
        logger.debug("_wireHandover: no port 0 in inputSources=%s", inputSources)
        return
    srcId, _ = inputSources[0]
    upstream = nodeOutputs.get(srcId)
    if not upstream or not isinstance(upstream, dict):
        logger.debug("_wireHandover: upstream for %s is missing or not dict: %s", srcId, type(upstream))
        return
    data = _unwrapTransit(upstream)
    if not isinstance(data, dict):
        logger.debug("_wireHandover: unwrapped data is not dict: %s", type(data))
        return

    inputPorts = nodeDef.get("inputPorts", {})
    port0 = inputPorts.get(0, {})
    accepts = port0.get("accepts", [])
    logger.debug(
        "_wireHandover: srcId=%s accepts=%s upstream_keys=%s params_keys_before=%s",
        srcId, accepts, list(data.keys()), list(params.keys()),
    )

    for schemaName in accepts:
        if schemaName == "Transit":
            continue
        extractor = INPUT_EXTRACTORS.get(schemaName)
        if not extractor:
            continue
        extracted = extractor(data)
        logger.debug(
            "_wireHandover: extractor %s returned keys=%s",
            schemaName, list(extracted.keys()) if extracted else None,
        )
        if not extracted:
            continue
        for k, v in extracted.items():
            existing = params.get(k)
            # Falsy values (None, "", 0, False, empty containers) count as "not set".
            if not existing:
                params[k] = v
                logger.debug(
                    "_wireHandover: set %s (was empty/missing) type=%s len=%s",
                    k, type(v).__name__, len(v) if isinstance(v, (list, str, dict)) else "n/a",
                )
            else:
                logger.debug("_wireHandover: skip %s (already has value, type=%s)", k, type(existing).__name__)
        # NOTE(review): the first extractor that yields data wins ("no heuristic
        # merging"). The nesting level of this return was reconstructed from a
        # whitespace-collapsed source - confirm against version control.
        return


def _getOutputSchemaName(nodeDef: Dict) -> str:
    """Get the output schema name from the node definition.

    Reads ``outputPorts[0]["schema"]`` and falls back to ``"ActionResult"``.
    """
    outputPorts = nodeDef.get("outputPorts", {})
    port0 = outputPorts.get(0, {})
    return port0.get("schema", "ActionResult")


def _coerceLimit(value: Any, default: int = 10, maximum: int = 50) -> int:
    """Convert ``value`` to an int clamped to ``1..maximum``.

    Missing, falsy or non-numeric input yields ``default`` instead of raising.
    """
    try:
        limit = int(value or default)
    except (TypeError, ValueError):
        limit = default
    return max(1, min(limit, maximum))


class ActionNodeExecutor:
    """Execute action nodes by mapping to method actions via ActionExecutor."""

    def __init__(self, services: Any):
        """Store the service container handed to ActionExecutor and lookups."""
        self.services = services

    async def execute(
        self,
        node: Dict[str, Any],
        context: Dict[str, Any],
    ) -> Any:
        """Run one action node and return its schema-normalized output.

        Args:
            node: Node dict; ``"type"``, ``"id"`` and ``"parameters"`` are read.
            context: Run context; ``"nodeOutputs"``, ``"inputSources"``,
                ``"_runId"``, ``"workflowId"``, ``"mandateId"`` and
                ``"instanceId"`` are read.

        Returns:
            ``None`` when the node type has no method/action mapping, the
            result of ``_normalizeError`` when the action raises, otherwise
            the result of ``_normalizeToSchema`` for the node's output schema.

        Raises:
            PauseForEmailWaitError: for ``email.checkEmail`` nodes when run id,
                workflow id and connection reference are all available.
        """
        from modules.features.graphicalEditor.nodeRegistry import getNodeTypeToMethodAction
        from modules.workflows.automation2.graphUtils import resolveParameterReferences
        from modules.workflows.processing.core.actionExecutor import ActionExecutor

        nodeType = node.get("type", "")
        nodeId = node.get("id", "")
        logger.info("ActionNodeExecutor node %s type=%s", nodeId, nodeType)

        mapping = getNodeTypeToMethodAction()
        methodAction = mapping.get(nodeType)
        if not methodAction:
            logger.debug("ActionNodeExecutor node %s not in mapping -> None", nodeId)
            return None
        methodName, actionName = methodAction

        nodeDef = _getNodeDefinition(nodeType) or {}
        outputSchema = _getOutputSchemaName(nodeDef)

        # 1. Resolve parameters (DataRef, SystemVar, Static)
        params = dict(node.get("parameters") or {})
        logger.debug("ActionNodeExecutor node %s raw params keys=%s", nodeId, list(params.keys()))
        resolvedParams = resolveParameterReferences(params, context.get("nodeOutputs", {}))
        logger.debug(
            "ActionNodeExecutor node %s resolved params keys=%s documentList_present=%s documentList_type=%s",
            nodeId, list(resolvedParams.keys()), "documentList" in resolvedParams,
            type(resolvedParams.get("documentList")).__name__,
        )

        # 2. Wire-handover via extractors (fills missing params from upstream)
        inputSources = context.get("inputSources", {}).get(nodeId, {})
        _wireHandover(nodeDef, inputSources, context.get("nodeOutputs", {}), resolvedParams)
        logger.debug(
            "ActionNodeExecutor node %s after wireHandover: params keys=%s documentList_present=%s documentList_type=%s",
            nodeId, list(resolvedParams.keys()), "documentList" in resolvedParams,
            type(resolvedParams.get("documentList")).__name__,
        )

        # 3. Apply defaults from parameter definitions
        for pDef in nodeDef.get("parameters", []):
            pName = pDef.get("name")
            if pName and pName not in resolvedParams and "default" in pDef:
                resolvedParams[pName] = pDef["default"]

        # 4. Resolve connectionReference
        chatService = getattr(self.services, "chat", None)
        _resolveConnectionParam(resolvedParams, chatService, self.services)

        # 5. Node-type-specific param transformations
        if nodeType == "email.checkEmail":
            _applyEmailCheckFilter(resolvedParams)
        elif nodeType == "email.searchEmail":
            _applyEmailSearchQuery(resolvedParams)
        elif nodeType == "clickup.updateTask":
            from modules.workflows.automation2.clickupTaskUpdateMerge import merge_clickup_task_update_entries
            merge_clickup_task_update_entries(resolvedParams)

        # 6. email.checkEmail pause for email wait
        if nodeType == "email.checkEmail":
            runId = context.get("_runId")
            workflowId = context.get("workflowId")
            connRef = resolvedParams.get("connectionReference")
            if runId and workflowId and connRef:
                from modules.workflows.automation2.executors import PauseForEmailWaitError
                waitConfig = {
                    "connectionReference": connRef,
                    "folder": resolvedParams.get("folder", "Inbox"),
                    # Malformed limits fall back to 10; the value is clamped to 1..50.
                    "limit": _coerceLimit(resolvedParams.get("limit")),
                    "filter": resolvedParams.get("filter"),
                }
                raise PauseForEmailWaitError(runId=runId, nodeId=nodeId, waitConfig=waitConfig)

        # 7. AI nodes: normalize legacy "prompt" -> "aiPrompt"
        if nodeType == "ai.prompt":
            if "aiPrompt" not in resolvedParams and "prompt" in resolvedParams:
                resolvedParams["aiPrompt"] = resolvedParams.pop("prompt")

        # 8. Build context for email.draftEmail from subject + body
        if nodeType == "email.draftEmail":
            subject = resolvedParams.get("subject", "")
            body = resolvedParams.get("body", "")
            if subject or body:
                contextParts = []
                if subject:
                    contextParts.append(f"Subject: {subject}")
                if body:
                    contextParts.append(f"Body:\n{body}")
                resolvedParams["context"] = "\n\n".join(contextParts)
            # NOTE(review): nesting reconstructed from a whitespace-collapsed
            # source; the pops are assumed to run for every draftEmail node.
            resolvedParams.pop("subject", None)
            resolvedParams.pop("body", None)

        # 9. Execute action
        logger.info(
            "ActionNodeExecutor node %s calling %s.%s with %d params",
            nodeId, methodName, actionName, len(resolvedParams),
        )
        try:
            executor = ActionExecutor(self.services)
            result = await executor.executeAction(methodName, actionName, resolvedParams)
        except Exception as e:
            # Boundary handler: any action failure becomes a normalized error result.
            logger.exception("ActionNodeExecutor node %s FAILED: %s", nodeId, e)
            return _normalizeError(e, outputSchema)

        # 10. Persist generated documents as files and build JSON-safe output
        docsList = []
        for d in (result.documents or []):
            if hasattr(d, "model_dump"):
                dumped = d.model_dump()
            elif isinstance(d, dict):
                dumped = dict(d)
            else:
                dumped = d

            if hasattr(d, "documentData"):
                rawData = getattr(d, "documentData", None)
            else:
                rawData = dumped.get("documentData") if isinstance(dumped, dict) else None

            if isinstance(dumped, dict) and isinstance(rawData, bytes) and len(rawData) > 0:
                try:
                    from modules.interfaces.interfaceDbManagement import getInterface as _getMgmtInterface
                    from modules.security.rootAccess import getRootUser

                    _mandateId = context.get("mandateId")
                    _instanceId = context.get("instanceId")
                    _mgmt = _getMgmtInterface(getRootUser(), mandateId=_mandateId, featureInstanceId=_instanceId)
                    _docName = dumped.get("documentName") or f"workflow-result-{nodeId}.bin"
                    _mimeType = dumped.get("mimeType") or "application/octet-stream"
                    _fileItem = _mgmt.createFile(_docName, _mimeType, rawData)
                    _mgmt.createFileData(_fileItem.id, rawData)
                    dumped["fileId"] = _fileItem.id
                    dumped["id"] = _fileItem.id
                    dumped["fileName"] = _fileItem.fileName
                    logger.info("Persisted workflow document %s as file %s", _docName, _fileItem.id)
                except Exception as _fe:
                    # Best effort: a failed persist must not fail the node.
                    logger.warning("Could not persist workflow document: %s", _fe)
                # Raw bytes are dropped from the output whether or not persisting
                # succeeded; the flag records that binary content existed.
                # NOTE(review): nesting reconstructed from a whitespace-collapsed source.
                dumped["documentData"] = None
                dumped["_hasBinaryData"] = True
            docsList.append(dumped)

        # Text of the first document (read from the original result, not docsList).
        extractedContext = ""
        if result.documents:
            doc = result.documents[0]
            if hasattr(doc, "documentData"):
                raw = getattr(doc, "documentData", None)
            else:
                raw = doc.get("documentData") if isinstance(doc, dict) else None
            if isinstance(raw, bytes):
                try:
                    extractedContext = raw.decode("utf-8").strip()
                except (UnicodeDecodeError, ValueError):
                    extractedContext = ""
            elif raw:
                extractedContext = str(raw).strip()

        promptText = str(resolvedParams.get("aiPrompt") or resolvedParams.get("prompt") or "").strip()

        resultData = getattr(result, "data", None)
        if resultData and isinstance(resultData, dict):
            dataField = resultData
        elif hasattr(result, "model_dump"):
            dataField = result.model_dump()
        else:
            dataField = {"success": result.success, "error": result.error}

        out = {
            "success": result.success,
            "error": result.error,
            "documents": docsList,
            "documentList": docsList,
            "data": dataField,
        }

        if nodeType.startswith("ai."):
            out["prompt"] = promptText
            out["response"] = extractedContext
            if promptText and extractedContext:
                out["context"] = f"{promptText}\n\n{extractedContext}"
            else:
                out["context"] = extractedContext or promptText
            # Structured output
            # NOTE(review): assumed to apply to ai.* nodes only (nesting
            # reconstructed from a whitespace-collapsed source).
            if extractedContext:
                try:
                    parsed = json.loads(extractedContext)
                    if isinstance(parsed, dict):
                        out["responseData"] = parsed
                except (json.JSONDecodeError, TypeError):
                    pass  # Response is not JSON; plain text output is still valid.

        if nodeType.startswith("clickup.") and result.success and docsList:
            # Only string payloads are parsed here; a bytes payload was already
            # replaced by None in docsList above.
            try:
                d0 = docsList[0] if isinstance(docsList[0], dict) else {}
                raw = d0.get("documentData")
                if isinstance(raw, str) and raw.strip():
                    parsed = json.loads(raw)
                    if isinstance(parsed, dict) and parsed.get("id") is not None:
                        out["taskId"] = str(parsed["id"])
                        out["task"] = parsed
            except (json.JSONDecodeError, TypeError, ValueError):
                pass  # Payload is not a JSON task object; leave taskId/task unset.

        if outputSchema == "ConsolidateResult" and nodeType == "ai.consolidate":
            data_dict = result.data if isinstance(getattr(result, "data", None), dict) else {}
            # "count" may be present but None or non-numeric; treat that as 0.
            try:
                count = int(data_dict.get("count") or 0)
            except (TypeError, ValueError):
                count = 0
            cr_out = {
                "result": data_dict.get("result", ""),
                "mode": data_dict.get("mode", resolvedParams.get("mode", "summarize")),
                "count": count,
            }
            return _normalizeToSchema(cr_out, outputSchema)

        return _normalizeToSchema(out, outputSchema)