# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""ActionToolAdapter: wraps existing workflow actions (dynamicMode=True) as agent tools."""

import logging
from typing import Dict, Any, List, Optional

from modules.serviceCenter.services.serviceAgent.datamodelAgent import (
    ToolDefinition, ToolResult
)
from modules.serviceCenter.services.serviceAgent.toolRegistry import ToolRegistry

logger = logging.getLogger(__name__)


class ActionToolAdapter:
    """Wraps existing Workflow-Actions as Agent-Tools.

    Iterates over discovered methods, finds actions with dynamicMode=True,
    and registers them in the ToolRegistry under a compound name of the form
    ``<method>_<action>`` (lower-cased method name with "Method" removed).
    """

    def __init__(self, actionExecutor):
        """Store the executor that registered tool handlers dispatch to."""
        self._actionExecutor = actionExecutor
        # Compound names of every tool registered through this adapter.
        self._registeredTools: List[str] = []

    def registerAll(self, toolRegistry: ToolRegistry):
        """Discover and register all dynamicMode actions as agent tools."""
        # Imported lazily, as in the original implementation.
        from modules.workflows.processing.shared.methodDiscovery import methods

        registered = 0
        for methodName, methodInfo in methods.items():
            # Only entries whose name starts with an uppercase letter are
            # considered; slicing keeps an empty name from raising IndexError.
            if not methodName[:1].isupper():
                continue

            shortName = methodName.replace("Method", "").lower()
            methodInstance = methodInfo["instance"]

            for actionName, actionInfo in methodInfo["actions"].items():
                actionDef = methodInstance._actions.get(actionName)
                if not actionDef or not getattr(actionDef, "dynamicMode", False):
                    continue

                compoundName = f"{shortName}_{actionName}"
                toolDef = _buildToolDefinition(compoundName, actionDef, actionInfo)
                handler = _createDispatchHandler(
                    self._actionExecutor,
                    shortName,
                    actionName,
                    self._actionExecutor.services,
                )
                toolRegistry.registerFromDefinition(toolDef, handler)
                self._registeredTools.append(compoundName)
                registered += 1

        logger.info(f"ActionToolAdapter: registered {registered} tools from workflow actions")

    @property
    def registeredTools(self) -> List[str]:
        """Names of all tools registered by this adapter (returned as a copy)."""
        return list(self._registeredTools)


def _buildToolDefinition(compoundName: str, actionDef, actionInfo: Dict[str, Any]) -> ToolDefinition:
    """Build a ToolDefinition from a WorkflowActionDefinition.

    The description is taken from ``actionDef.description`` and falls back to
    ``actionInfo["description"]`` (or an empty string) when that is falsy.
    """
    parameters = _convertParameterSchema(actionInfo.get("parameters", {}))
    return ToolDefinition(
        name=compoundName,
        description=actionDef.description or actionInfo.get("description", ""),
        parameters=parameters,
        readOnly=False
    )


def _convertParameterSchema(actionParams: Dict[str, Any]) -> Dict[str, Any]:
    """Convert workflow action parameter schema to JSON Schema for tool definitions.

    Schicht-3 Adapter (typed): looks up each parameter's `type` against the
    PORT_TYPE_CATALOG and produces a strict JSON Schema fragment. Falls back to
    a generic string schema only when the type is fully unknown (which should
    never happen after Phase 2's signature validator).

    Returns:
        A dict with the keys "type" ("object"), "properties" and "required".
    """
    properties: Dict[str, Any] = {}
    required: List[str] = []

    for paramName, paramInfo in actionParams.items():
        # Entries that are not dicts carry no type information: plain string.
        if not isinstance(paramInfo, dict):
            properties[paramName] = {"type": "string", "description": ""}
            continue

        paramType = paramInfo.get("type", "str")
        paramDesc = paramInfo.get("description", "") or ""
        paramRequired = bool(paramInfo.get("required", False))

        prop = _catalogTypeToJsonSchema(paramType)
        if paramDesc:
            prop["description"] = paramDesc
        properties[paramName] = prop

        if paramRequired:
            required.append(paramName)

    return {
        "type": "object",
        "properties": properties,
        "required": required,
    }


# Primitive Python type strings → JSON Schema scalar types.
_PRIMITIVE_JSON_TYPE: Dict[str, str] = {
    "str": "string",
    "int": "integer",
    "float": "number",
    "bool": "boolean",
}


def _catalogTypeToJsonSchema(typeStr: str, _depth: int = 0) -> Dict[str, Any]:
    """Recursively convert a PORT_TYPE_CATALOG type reference into a JSON Schema fragment.

    Supports:
      - Primitives (str/int/bool/float/Any)
      - Catalog object schemas (recursively expanded with properties/required)
      - List[X] (array with typed items)
      - Dict[K, V] (object with typed additionalProperties)

    `_depth` guards against pathological recursion in case of a cyclic catalog.
    """
    from modules.features.graphicalEditor.portTypes import (
        PORT_TYPE_CATALOG, PRIMITIVE_TYPES,
    )

    if _depth > 6:
        return {"type": "object", "description": "(max-depth)"}

    # Missing or non-string type references degrade to a plain string schema.
    if not typeStr or not isinstance(typeStr, str):
        return {"type": "string"}

    typeStr = typeStr.strip()

    if typeStr in _PRIMITIVE_JSON_TYPE:
        return {"type": _PRIMITIVE_JSON_TYPE[typeStr]}
    if typeStr == "Any":
        # An empty schema places no constraint on the value.
        return {}

    if typeStr.startswith("List[") and typeStr.endswith("]"):
        inner = typeStr[5:-1].strip()
        return {"type": "array", "items": _catalogTypeToJsonSchema(inner, _depth + 1)}

    if typeStr.startswith("Dict[") and typeStr.endswith("]"):
        inner = typeStr[5:-1].strip()
        valueType = "Any"
        # Split on the first comma only: everything after it is the value
        # type, so a generic value type such as List[X] stays intact.
        parts = [p.strip() for p in inner.split(",", 1)]
        if len(parts) == 2:
            valueType = parts[1]
        return {
            "type": "object",
            "additionalProperties": _catalogTypeToJsonSchema(valueType, _depth + 1),
        }

    schema = PORT_TYPE_CATALOG.get(typeStr)
    if schema is not None:
        props: Dict[str, Any] = {}
        required: List[str] = []
        for f in schema.fields:
            fragment = _catalogTypeToJsonSchema(f.type, _depth + 1)
            if f.description:
                fragment["description"] = f.description
            if f.enumValues:
                fragment["enum"] = list(f.enumValues)
            props[f.name] = fragment
            if f.required:
                required.append(f.name)
        out: Dict[str, Any] = {
            "type": "object",
            "properties": props,
            "description": f"PORT_TYPE_CATALOG schema '{schema.name}'",
        }
        if required:
            out["required"] = required
        return out

    # Bare 'List' / 'Dict' (no type arguments) that are listed in
    # PRIMITIVE_TYPES (legacy, should be eradicated by Phase 2 validator).
    if typeStr in PRIMITIVE_TYPES and typeStr in {"List", "Dict"}:
        return {"type": "array" if typeStr == "List" else "object"}

    return {"type": "string", "description": f"unknown type '{typeStr}' (defaulted to string)"}


def _createDispatchHandler(actionExecutor, methodName: str, actionName: str, services=None):
    """Create an async handler that dispatches to the ActionExecutor.

    Parameter validation and Ref-payload normalization (collapsing
    ``{id: ..., featureCode: ...}`` from the agent's typed tool schema to the
    bare UUID expected by action implementations) happen centrally inside
    ``ActionExecutor.executeAction`` via ``parameterValidation``. This keeps a
    single source of truth for the action parameter contract regardless of
    caller (agent, workflow graph, REST route).

    Returns:
        An ``async`` callable ``(args, context) -> ToolResult``. The handler
        fills missing ``featureInstanceId`` / ``mandateId`` / ``parentOperationId``
        keys into ``args`` in place and converts any exception into a failed
        ToolResult.
    """
    async def _handler(args: Dict[str, Any], context: Dict[str, Any]) -> ToolResult:
        # Bound before the try block so the except path can always read them,
        # even when the failure happens before the progress log was started.
        toolOpId = None
        chatSvc = None
        try:
            if context:
                if "featureInstanceId" not in args and context.get("featureInstanceId"):
                    args["featureInstanceId"] = context["featureInstanceId"]
                if "mandateId" not in args and context.get("mandateId"):
                    args["mandateId"] = context["mandateId"]

            if "parentOperationId" not in args:
                import time as _time
                toolOpId = f"agentTool_{methodName}_{actionName}_{int(_time.time())}"
                chatSvc = getattr(services, "chat", None) if services else None
                if chatSvc:
                    try:
                        chatSvc.progressLogStart(toolOpId, methodName.capitalize(), actionName, "Agent tool")
                    except Exception:
                        # Progress logging is best-effort; it must not block the action.
                        pass
                args["parentOperationId"] = toolOpId

            result = await actionExecutor.executeAction(methodName, actionName, args)

            if toolOpId and chatSvc:
                try:
                    chatSvc.progressLogFinish(toolOpId, result.success)
                except Exception:
                    # Best-effort, see above.
                    pass

            data, sideEvents = _formatActionResult(result, services, context)
            return ToolResult(
                toolCallId="",
                toolName=f"{methodName}_{actionName}",
                success=result.success,
                data=data,
                error=result.error,
                sideEvents=sideEvents or None,
            )
        except Exception as e:
            if toolOpId and chatSvc:
                try:
                    chatSvc.progressLogFinish(toolOpId, False)
                except Exception:
                    pass
            logger.error(f"ActionToolAdapter dispatch failed for {methodName}_{actionName}: {e}")
            return ToolResult(
                toolCallId="",
                toolName=f"{methodName}_{actionName}",
                success=False,
                error=str(e)
            )

    return _handler


# Text documents of this many characters or more are persisted as files
# instead of being inlined into the tool result.
_INLINE_CONTENT_LIMIT = 2000


def _persistLargeDocument(doc, services, context: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Save an ActionDocument as a workspace file.

    Handles both str and bytes documentData.
    Returns a dict with 'line' (formatted result) and 'fileInfo' (for sideEvents),
    or None if persistence is not possible (no services / chat service, empty
    or unsupported document data, or an error while saving).
    """
    if not services:
        return None
    chatService = getattr(services, "chat", None)
    if not chatService:
        return None

    docData = getattr(doc, "documentData", None)
    if not docData:
        return None

    if isinstance(docData, bytes):
        docBytes = docData
    elif isinstance(docData, str):
        docBytes = docData.encode("utf-8")
    else:
        return None

    # Tolerate a missing context so an already-saved file is not abandoned
    # halfway through because of an AttributeError on None.
    context = context or {}

    docName = getattr(doc, "documentName", "unnamed")
    docMime = getattr(doc, "mimeType", "application/octet-stream")

    try:
        fileItem, _ = chatService.interfaceDbComponent.saveUploadedFile(docBytes, docName)

        from modules.serviceCenter.services.serviceAgent.coreTools._helpers import (
            _attachFileAsChatDocument,
            _formatToolFileResult,
            _getOrCreateTempFolder,
        )

        updateFields = {}
        tempFolderId = _getOrCreateTempFolder(chatService)
        if tempFolderId:
            updateFields["folderId"] = tempFolderId

        # Context values take precedence over attributes on the services object.
        fiId = context.get("featureInstanceId") or getattr(services, "featureInstanceId", "")
        if fiId:
            updateFields["featureInstanceId"] = fiId
            updateFields["scope"] = "featureInstance"

        mandateId = context.get("mandateId") or getattr(services, "mandateId", "")
        if mandateId:
            updateFields["mandateId"] = mandateId

        if updateFields:
            logger.debug("_persistLargeDocument: updating file %s with %s", fileItem.id, updateFields)
            chatService.interfaceDbComponent.updateFile(fileItem.id, updateFields)
        else:
            logger.warning("_persistLargeDocument: no updateFields for file %s (tempFolderId=%s, fiId=%s)", fileItem.id, tempFolderId, fiId)

        chatDocId = _attachFileAsChatDocument(
            services,
            fileItem,
            label=f"action_doc:{docName}",
            userMessage=f"Action document: {docName}",
        )
        line = _formatToolFileResult(
            fileItem=fileItem,
            chatDocId=chatDocId,
            actionLabel="Produced",
            extraInfo="Use readFile to read the content.",
        )
        return {
            "line": line,
            "fileInfo": {
                "fileId": fileItem.id,
                "fileName": docName,
                "mimeType": docMime,
                "fileSize": len(docBytes),
            },
        }
    except Exception as e:
        logger.error(f"_persistLargeDocument failed for {docName}: {e}", exc_info=True)
        return None


def _formatActionResult(result, services=None, context: Optional[Dict[str, Any]] = None):
    """Format an ActionResult into a text representation for the agent.

    Documents whose content exceeds the inline limit (or is binary bytes) are
    persisted as workspace files.

    Returns (str, list) – the formatted text and a list of sideEvent dicts.
    """
    parts = []
    sideEvents = []
    ctx = context or {}

    if result.resultLabel:
        parts.append(f"Result: {result.resultLabel}")

    if result.error:
        parts.append(f"Error: {result.error}")

    if result.documents:
        parts.append(f"Documents ({len(result.documents)}):")
        for doc in result.documents:
            docName = getattr(doc, "documentName", "unnamed")
            docType = getattr(doc, "mimeType", "unknown")
            docData = getattr(doc, "documentData", None)

            # Non-empty bytes are always persisted; text only once it reaches
            # the inline limit.
            needsPersist = (
                (isinstance(docData, bytes) and len(docData) > 0)
                or (isinstance(docData, str) and len(docData) >= _INLINE_CONTENT_LIMIT)
            )

            if needsPersist:
                persisted = _persistLargeDocument(doc, services, ctx)
                if persisted:
                    parts.append(f" - {docName} ({docType})")
                    parts.append(f" {persisted['line']}")
                    sideEvents.append({
                        "type": "fileCreated",
                        "data": persisted["fileInfo"],
                    })
                    continue
                logger.error(f"Document '{docName}' ({docType}, {len(docData)} bytes) could not be persisted")
                parts.append(f" - {docName} ({docType}) [ERROR: persistence failed]")
                continue

            parts.append(f" - {docName} ({docType})")
            # Anything reaching this point is shorter than the inline limit
            # (or not text at all), so text can be inlined without truncation.
            if docData and isinstance(docData, str) and len(docData) < _INLINE_CONTENT_LIMIT:
                parts.append(f" Content: {docData}")

    if not parts:
        parts.append("Action completed successfully." if result.success else "Action failed.")

    return "\n".join(parts), sideEvents