377 lines
14 KiB
Python
377 lines
14 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
||
# All rights reserved.
|
||
"""ActionToolAdapter: wraps existing workflow actions (dynamicMode=True) as agent tools."""
|
||
|
||
import logging
|
||
from typing import Dict, Any, List, Optional
|
||
|
||
from modules.serviceCenter.services.serviceAgent.datamodelAgent import (
|
||
ToolDefinition, ToolResult
|
||
)
|
||
from modules.serviceCenter.services.serviceAgent.toolRegistry import ToolRegistry
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class ActionToolAdapter:
    """Expose existing workflow actions as agent tools.

    Iterates over the discovered workflow methods, selects every action whose
    definition has a truthy ``dynamicMode`` attribute, and registers it in a
    ToolRegistry under the compound name ``<method>_<action>``. ``<method>``
    is the method name with "Method" removed and lower-cased.
    """

    def __init__(self, actionExecutor):
        """Remember the executor that the generated tool handlers dispatch to.

        Args:
            actionExecutor: Object whose ``executeAction`` coroutine and
                ``services`` attribute are used by the registered handlers.
        """
        self._actionExecutor = actionExecutor
        self._registeredTools: List[str] = []

    def registerAll(self, toolRegistry: ToolRegistry):
        """Discover and register all dynamicMode actions as agent tools.

        Args:
            toolRegistry: Registry that receives one (definition, handler)
                pair per matching action via ``registerFromDefinition``.
        """
        # Function-scope import (presumably avoids an import cycle - TODO confirm).
        from modules.workflows.processing.shared.methodDiscovery import methods

        registered = 0
        for methodName, methodInfo in methods.items():
            # Only keys starting with an uppercase letter are processed.
            # Slicing instead of indexing keeps an empty key from raising
            # IndexError: "".isupper() is simply False.
            if not methodName[:1].isupper():
                continue

            shortName = methodName.replace("Method", "").lower()
            methodInstance = methodInfo["instance"]

            for actionName, actionInfo in methodInfo["actions"].items():
                actionDef = methodInstance._actions.get(actionName)
                if not actionDef or not getattr(actionDef, "dynamicMode", False):
                    continue

                compoundName = f"{shortName}_{actionName}"
                toolDef = _buildToolDefinition(compoundName, actionDef, actionInfo)

                handler = _createDispatchHandler(
                    self._actionExecutor,
                    shortName,
                    actionName,
                    self._actionExecutor.services,
                )
                toolRegistry.registerFromDefinition(toolDef, handler)
                self._registeredTools.append(compoundName)
                registered += 1

        logger.info("ActionToolAdapter: registered %d tools from workflow actions", registered)

    @property
    def registeredTools(self) -> List[str]:
        """Names of all tools registered by this adapter (returned as a copy)."""
        return list(self._registeredTools)
|
||
|
||
|
||
def _buildToolDefinition(compoundName: str, actionDef, actionInfo: Dict[str, Any]) -> ToolDefinition:
    """Assemble the ToolDefinition for a single workflow action.

    The parameter schema is derived from the ``"parameters"`` entry of
    ``actionInfo``. The description is taken from ``actionDef.description``;
    when that is empty, the ``"description"`` entry of ``actionInfo`` is used
    (defaulting to an empty string). Tools are always built with
    ``readOnly=False``.
    """
    schema = _convertParameterSchema(actionInfo.get("parameters", {}))

    text = actionDef.description
    if not text:
        text = actionInfo.get("description", "")

    return ToolDefinition(name=compoundName, description=text, parameters=schema, readOnly=False)
|
||
|
||
|
||
def _convertParameterSchema(actionParams: Dict[str, Any]) -> Dict[str, Any]:
    """Translate a workflow action's parameter schema into a JSON Schema object.

    Layer-3 adapter (typed): each parameter's ``type`` is resolved through
    ``_catalogTypeToJsonSchema`` (default type ``"str"``), which looks it up
    against the PORT_TYPE_CATALOG. A parameter whose spec is not a dict is
    described as a plain string with an empty description.

    Returns:
        ``{"type": "object", "properties": {...}, "required": [...]}`` where
        ``required`` lists the parameters whose spec has a truthy
        ``"required"`` entry, in iteration order.
    """
    schemaProps: Dict[str, Any] = {}
    mandatory: List[str] = []

    for name, spec in actionParams.items():
        if isinstance(spec, dict):
            fragment = _catalogTypeToJsonSchema(spec.get("type", "str"))
            text = spec.get("description", "")
            if text:
                # An empty or missing description leaves the fragment as-is.
                fragment["description"] = text
            schemaProps[name] = fragment
            if spec.get("required", False):
                mandatory.append(name)
        else:
            schemaProps[name] = {"type": "string", "description": ""}

    return {"type": "object", "properties": schemaProps, "required": mandatory}
|
||
|
||
|
||
# Primitive Python type names mapped to their JSON Schema scalar types.
_PRIMITIVE_JSON_TYPE: Dict[str, str] = {
    "str": "string",
    "int": "integer",
    "float": "number",
    "bool": "boolean",
}


def _catalogTypeToJsonSchema(typeStr: str, _depth: int = 0) -> Dict[str, Any]:
    """Recursively convert a PORT_TYPE_CATALOG type reference into a JSON Schema fragment.

    Supports:
    - Primitives (str/int/bool/float/Any)
    - List[X] (array with typed items)
    - Dict[K, V] (object with typed additionalProperties; K is not used)
    - Catalog object schemas (recursively expanded with properties/required)

    Args:
        typeStr: Type reference string. Missing, non-string, or blank values
            yield a plain string schema.
        _depth: Internal recursion counter. Beyond depth 6 a generic object
            placeholder is returned, guarding against pathological recursion
            in case of a cyclic catalog.

    Returns:
        A newly created dict holding the JSON Schema fragment. ``"Any"`` maps
        to the empty schema ``{}``. A name that matches nothing yields a
        string schema whose description records the unknown type.
    """
    if _depth > 6:
        return {"type": "object", "description": "(max-depth)"}

    if not isinstance(typeStr, str):
        return {"type": "string"}

    # Strip before the emptiness check so a whitespace-only reference is
    # treated like a missing one instead of being reported as unknown type ''.
    typeStr = typeStr.strip()
    if not typeStr:
        return {"type": "string"}

    primitive = _PRIMITIVE_JSON_TYPE.get(typeStr)
    if primitive is not None:
        return {"type": primitive}
    if typeStr == "Any":
        return {}

    if typeStr.startswith("List[") and typeStr.endswith("]"):
        inner = typeStr[5:-1].strip()
        return {"type": "array", "items": _catalogTypeToJsonSchema(inner, _depth + 1)}

    if typeStr.startswith("Dict[") and typeStr.endswith("]"):
        inner = typeStr[5:-1].strip()
        # Split on the first comma only, so a value type containing commas
        # (e.g. a nested Dict) stays intact. Without a comma the value is Any.
        parts = [p.strip() for p in inner.split(",", 1)]
        valueType = parts[1] if len(parts) == 2 else "Any"
        return {
            "type": "object",
            "additionalProperties": _catalogTypeToJsonSchema(valueType, _depth + 1),
        }

    # Only named catalog types need the port-type module, so it is imported
    # here rather than at function entry: primitives and containers resolve
    # without loading it.
    from modules.features.graphicalEditor.portTypes import (
        PORT_TYPE_CATALOG,
        PRIMITIVE_TYPES,
    )

    schema = PORT_TYPE_CATALOG.get(typeStr)
    if schema is not None:
        props: Dict[str, Any] = {}
        required: List[str] = []
        for f in schema.fields:
            fragment = _catalogTypeToJsonSchema(f.type, _depth + 1)
            # A field-level description replaces whatever the nested fragment carried.
            if f.description:
                fragment["description"] = f.description
            if f.enumValues:
                fragment["enum"] = list(f.enumValues)
            props[f.name] = fragment
            if f.required:
                required.append(f.name)
        out: Dict[str, Any] = {
            "type": "object",
            "properties": props,
            "description": f"PORT_TYPE_CATALOG schema '{schema.name}'",
        }
        # "required" is only emitted when at least one field is required.
        if required:
            out["required"] = required
        return out

    # Bare 'List' / 'Dict' without type arguments (legacy; the Phase 2
    # validator is meant to eliminate these).
    # NOTE(review): only the capitalized spellings are matched here; lowercase
    # 'list' / 'dict' reach the unknown-type default unless the catalog
    # defines them - confirm that this is intended.
    if typeStr in PRIMITIVE_TYPES and typeStr in {"List", "Dict"}:
        return {"type": "array" if typeStr == "List" else "object"}

    return {"type": "string", "description": f"unknown type '{typeStr}' (defaulted to string)"}
|
||
|
||
|
||
def _createDispatchHandler(actionExecutor, methodName: str, actionName: str, services=None):
    """Create an async handler that dispatches to the ActionExecutor.

    Parameter validation and Ref-payload normalization (collapsing
    ``{id: ..., featureCode: ...}`` from the agent's typed tool schema to the
    bare UUID expected by action implementations) happen centrally inside
    ``ActionExecutor.executeAction`` via ``parameterValidation``. This keeps
    a single source of truth for the action parameter contract regardless
    of caller (agent, workflow graph, REST route).

    Args:
        actionExecutor: Object providing the ``executeAction`` coroutine.
        methodName: Short method name passed to ``executeAction``.
        actionName: Action name passed to ``executeAction``.
        services: Optional service container; its ``chat`` attribute, when
            present, receives progress-log start/finish calls.

    Returns:
        An ``async (args, context) -> ToolResult`` function. Exceptions raised
        while dispatching are caught, logged, and reported as a failed
        ToolResult.
    """
    toolName = f"{methodName}_{actionName}"

    async def _handler(args: Dict[str, Any], context: Dict[str, Any]) -> ToolResult:
        # Bound before the try block so the except branch can always read
        # them, even when the failure happens before progress logging starts
        # (previously that case raised UnboundLocalError from the handler).
        toolOpId = None
        chatSvc = None
        progressFinished = False

        def _finishProgress(success) -> None:
            """Close the progress-log entry at most once (best effort)."""
            nonlocal progressFinished
            if progressFinished or not (toolOpId and chatSvc):
                return
            progressFinished = True
            try:
                chatSvc.progressLogFinish(toolOpId, success)
            except Exception as progressError:
                # Progress logging must never fail the tool call itself.
                logger.debug("progressLogFinish failed for %s: %s", toolOpId, progressError)

        try:
            # Work on a copy: the injected ids below must not leak into the
            # caller's dict, otherwise a re-used args object would already
            # carry a stale parentOperationId on the next call.
            callArgs = dict(args) if args else {}

            if context:
                # Fill in ids from the context only when the caller did not set them.
                for key in ("featureInstanceId", "mandateId"):
                    if key not in callArgs and context.get(key):
                        callArgs[key] = context[key]

            if "parentOperationId" not in callArgs:
                import time as _time

                # NOTE(review): second resolution - two calls of the same tool
                # within one second share an operation id.
                toolOpId = f"agentTool_{toolName}_{int(_time.time())}"
                chatSvc = getattr(services, "chat", None) if services else None
                if chatSvc:
                    try:
                        chatSvc.progressLogStart(toolOpId, methodName.capitalize(), actionName, "Agent tool")
                    except Exception as progressError:
                        logger.debug("progressLogStart failed for %s: %s", toolOpId, progressError)
                callArgs["parentOperationId"] = toolOpId

            result = await actionExecutor.executeAction(methodName, actionName, callArgs)
            _finishProgress(result.success)

            data, sideEvents = _formatActionResult(result, services, context)
            return ToolResult(
                toolCallId="",
                toolName=toolName,
                success=result.success,
                data=data,
                error=result.error,
                sideEvents=sideEvents or None,
            )
        except Exception as e:
            # No-op when the entry was already finished above.
            _finishProgress(False)
            logger.error(f"ActionToolAdapter dispatch failed for {methodName}_{actionName}: {e}", exc_info=True)
            return ToolResult(
                toolCallId="",
                toolName=toolName,
                success=False,
                error=str(e),
            )

    return _handler
|
||
|
||
|
||
# Length threshold used by _formatActionResult for str document content:
# content shorter than this is inlined into the tool result text, content at
# or above it is handed to _persistLargeDocument instead.
_INLINE_CONTENT_LIMIT = 2000
|
||
|
||
|
||
def _persistLargeDocument(doc, services, context: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Store an ActionDocument as a workspace file and describe the result.

    ``documentData`` may be ``str`` (encoded as UTF-8) or ``bytes``.

    Returns:
        A dict with ``"line"`` (formatted result text) and ``"fileInfo"``
        (payload for sideEvents), or ``None`` when there is no chat service,
        no usable document data, or saving fails (the failure is logged).
    """
    chatService = getattr(services, "chat", None) if services else None
    if not chatService:
        return None

    payload = getattr(doc, "documentData", None)
    if not payload:
        return None
    if isinstance(payload, str):
        docBytes = payload.encode("utf-8")
    elif isinstance(payload, bytes):
        docBytes = payload
    else:
        # Anything other than str/bytes cannot be written as a file here.
        return None

    docName = getattr(doc, "documentName", "unnamed")
    docMime = getattr(doc, "mimeType", "application/octet-stream")

    try:
        dbComponent = chatService.interfaceDbComponent
        fileItem, _ = dbComponent.saveUploadedFile(docBytes, docName)

        from modules.serviceCenter.services.serviceAgent.coreTools._helpers import (
            _attachFileAsChatDocument,
            _formatToolFileResult,
            _getOrCreateTempFolder,
        )

        # Collect the metadata to set on the saved file; context values win
        # over the attributes found on the services object.
        updateFields = {}
        tempFolderId = _getOrCreateTempFolder(chatService)
        if tempFolderId:
            updateFields["folderId"] = tempFolderId
        fiId = context.get("featureInstanceId") or getattr(services, "featureInstanceId", "")
        if fiId:
            updateFields.update({"featureInstanceId": fiId, "scope": "featureInstance"})
        mandateId = context.get("mandateId") or getattr(services, "mandateId", "")
        if mandateId:
            updateFields["mandateId"] = mandateId

        if not updateFields:
            logger.warning("_persistLargeDocument: no updateFields for file %s (tempFolderId=%s, fiId=%s)", fileItem.id, tempFolderId, fiId)
        else:
            logger.debug("_persistLargeDocument: updating file %s with %s", fileItem.id, updateFields)
            chatService.interfaceDbComponent.updateFile(fileItem.id, updateFields)

        chatDocId = _attachFileAsChatDocument(
            services,
            fileItem,
            label=f"action_doc:{docName}",
            userMessage=f"Action document: {docName}",
        )
        resultLine = _formatToolFileResult(
            fileItem=fileItem,
            chatDocId=chatDocId,
            actionLabel="Produced",
            extraInfo="Use readFile to read the content.",
        )
        fileInfo = {
            "fileId": fileItem.id,
            "fileName": docName,
            "mimeType": docMime,
            "fileSize": len(docBytes),
        }
        return {"line": resultLine, "fileInfo": fileInfo}
    except Exception as e:
        logger.error(f"_persistLargeDocument failed for {docName}: {e}", exc_info=True)
        return None
|
||
|
||
|
||
def _formatActionResult(result, services=None, context: Optional[Dict[str, Any]] = None):
    """Render an ActionResult as text for the agent.

    Non-empty ``bytes`` content and ``str`` content whose length reaches the
    inline limit are persisted as workspace files; shorter ``str`` content is
    inlined. A document that should be persisted but cannot be is listed with
    an error marker.

    Returns (str, list) - the formatted text and a list of sideEvent dicts
    (one ``fileCreated`` event per persisted document).
    """
    lines = []
    events = []
    effectiveContext = context if context else {}

    label = result.resultLabel
    if label:
        lines.append(f"Result: {label}")

    failure = result.error
    if failure:
        lines.append(f"Error: {failure}")

    documents = result.documents
    if documents:
        lines.append(f"Documents ({len(documents)}):")
        for document in documents:
            name = getattr(document, "documentName", "unnamed")
            mime = getattr(document, "mimeType", "unknown")
            content = getattr(document, "documentData", None)

            if isinstance(content, bytes):
                mustPersist = len(content) > 0
            elif isinstance(content, str):
                mustPersist = len(content) >= _INLINE_CONTENT_LIMIT
            else:
                mustPersist = False

            if not mustPersist:
                lines.append(f" - {name} ({mime})")
                # A str reaching this branch is already below the inline limit.
                if content and isinstance(content, str):
                    lines.append(f" Content: {content[:_INLINE_CONTENT_LIMIT]}")
                continue

            stored = _persistLargeDocument(document, services, effectiveContext)
            if not stored:
                logger.error(f"Document '{name}' ({mime}, {len(content)} bytes) could not be persisted")
                lines.append(f" - {name} ({mime}) [ERROR: persistence failed]")
                continue

            lines.append(f" - {name} ({mime})")
            lines.append(f" {stored['line']}")
            events.append({"type": "fileCreated", "data": stored["fileInfo"]})

    if not lines:
        # Nothing to report: fall back to a bare status sentence.
        lines.append("Action completed successfully." if result.success else "Action failed.")

    return "\n".join(lines), events
|