gateway/modules/serviceCenter/services/serviceAgent/actionToolAdapter.py
2026-05-06 23:28:22 +02:00

377 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""ActionToolAdapter: wraps existing workflow actions (dynamicMode=True) as agent tools."""
import logging
from typing import Dict, Any, List, Optional
from modules.serviceCenter.services.serviceAgent.datamodelAgent import (
ToolDefinition, ToolResult
)
from modules.serviceCenter.services.serviceAgent.toolRegistry import ToolRegistry
logger = logging.getLogger(__name__)
class ActionToolAdapter:
    """Exposes workflow actions flagged with ``dynamicMode=True`` as agent tools.

    Walks the discovered workflow methods, filters for dynamic actions and
    publishes each one in the ToolRegistry under the compound name
    ``<method>_<action>``.
    """

    def __init__(self, actionExecutor):
        # Executor that actually runs a (method, action) pair on dispatch.
        self._actionExecutor = actionExecutor
        self._registeredTools: List[str] = []

    def registerAll(self, toolRegistry: ToolRegistry):
        """Discover and register all dynamicMode actions as agent tools."""
        from modules.workflows.processing.shared.methodDiscovery import methods
        count = 0
        for methodName, methodInfo in methods.items():
            # Only capitalized method containers qualify for discovery.
            if not methodName[0].isupper():
                continue
            shortName = methodName.replace("Method", "").lower()
            instance = methodInfo["instance"]
            for actionName, actionInfo in methodInfo["actions"].items():
                definition = instance._actions.get(actionName)
                # Skip unknown actions and actions not opted into dynamic mode.
                if not definition or not getattr(definition, "dynamicMode", False):
                    continue
                compoundName = f"{shortName}_{actionName}"
                toolRegistry.registerFromDefinition(
                    _buildToolDefinition(compoundName, definition, actionInfo),
                    _createDispatchHandler(self._actionExecutor, shortName, actionName, self._actionExecutor.services),
                )
                self._registeredTools.append(compoundName)
                count += 1
        logger.info(f"ActionToolAdapter: registered {count} tools from workflow actions")

    @property
    def registeredTools(self) -> List[str]:
        """Names of all tools registered by this adapter."""
        return self._registeredTools.copy()
def _buildToolDefinition(compoundName: str, actionDef, actionInfo: Dict[str, Any]) -> ToolDefinition:
    """Build a ToolDefinition from a WorkflowActionDefinition."""
    # Prefer the definition's own description; fall back to discovery metadata.
    description = actionDef.description or actionInfo.get("description", "")
    return ToolDefinition(
        name=compoundName,
        description=description,
        parameters=_convertParameterSchema(actionInfo.get("parameters", {})),
        readOnly=False,
    )
def _convertParameterSchema(actionParams: Dict[str, Any]) -> Dict[str, Any]:
"""Convert workflow action parameter schema to JSON Schema for tool definitions.
Schicht-3 Adapter (typed): looks up each parameter's `type` against the
PORT_TYPE_CATALOG and produces a strict JSON Schema fragment.
Falls back to a generic string schema only when the type is fully unknown
(which should never happen after Phase 2's signature validator).
"""
properties: Dict[str, Any] = {}
required: List[str] = []
for paramName, paramInfo in actionParams.items():
if not isinstance(paramInfo, dict):
properties[paramName] = {"type": "string", "description": ""}
continue
paramType = paramInfo.get("type", "str")
paramDesc = paramInfo.get("description", "") or ""
paramRequired = bool(paramInfo.get("required", False))
prop = _catalogTypeToJsonSchema(paramType)
if paramDesc:
prop["description"] = paramDesc
properties[paramName] = prop
if paramRequired:
required.append(paramName)
return {
"type": "object",
"properties": properties,
"required": required,
}
# Primitive Python type strings → JSON Schema scalar types.
# Shared lookup for _catalogTypeToJsonSchema; "Any" is handled separately
# there (it maps to an unconstrained {} schema, not a scalar type).
_PRIMITIVE_JSON_TYPE: Dict[str, str] = {
    "str": "string",
    "int": "integer",
    "float": "number",
    "bool": "boolean",
}
def _catalogTypeToJsonSchema(typeStr: str, _depth: int = 0) -> Dict[str, Any]:
    """Recursively convert a PORT_TYPE_CATALOG type reference into a JSON Schema fragment.

    Supports:
    - Primitives (str/int/bool/float/Any)
    - Catalog object schemas (recursively expanded with properties/required)
    - List[X] (array with typed items)
    - Dict[K, V] (object with typed additionalProperties; the key type is
      ignored because JSON object keys are always strings)

    `_depth` guards against pathological recursion in case of a cyclic catalog.
    """
    from modules.features.graphicalEditor.portTypes import (
        PORT_TYPE_CATALOG,
        PRIMITIVE_TYPES,
    )
    if _depth > 6:
        return {"type": "object", "description": "(max-depth)"}
    if not typeStr or not isinstance(typeStr, str):
        return {"type": "string"}
    typeStr = typeStr.strip()
    if typeStr in _PRIMITIVE_JSON_TYPE:
        return {"type": _PRIMITIVE_JSON_TYPE[typeStr]}
    if typeStr == "Any":
        # Unconstrained schema: accepts any JSON value.
        return {}
    if typeStr.startswith("List[") and typeStr.endswith("]"):
        inner = typeStr[5:-1].strip()
        return {"type": "array", "items": _catalogTypeToJsonSchema(inner, _depth + 1)}
    if typeStr.startswith("Dict[") and typeStr.endswith("]"):
        inner = typeStr[5:-1].strip()
        valueType = "Any"
        # Split on the first comma only, so nested generics in the value
        # position (e.g. Dict[str, List[int]]) stay intact.
        parts = [p.strip() for p in inner.split(",", 1)]
        if len(parts) == 2:
            valueType = parts[1]
        return {
            "type": "object",
            "additionalProperties": _catalogTypeToJsonSchema(valueType, _depth + 1),
        }
    schema = PORT_TYPE_CATALOG.get(typeStr)
    if schema is not None:
        # Expand a catalog object schema into properties + required list.
        props: Dict[str, Any] = {}
        required: List[str] = []
        for f in schema.fields:
            fragment = _catalogTypeToJsonSchema(f.type, _depth + 1)
            if f.description:
                fragment["description"] = f.description
            if f.enumValues:
                fragment["enum"] = list(f.enumValues)
            props[f.name] = fragment
            if f.required:
                required.append(f.name)
        out: Dict[str, Any] = {
            "type": "object",
            "properties": props,
            "description": f"PORT_TYPE_CATALOG schema '{schema.name}'",
        }
        if required:
            out["required"] = required
        return out
    # Lowercase 'list' / 'dict' aliases (legacy, should be eradicated by the
    # Phase 2 validator).
    # BUG FIX: the original compared against the capitalized set
    # {"List", "Dict"}, which can never match the lowercase aliases this
    # branch is meant to handle; compare case-insensitively instead.
    if typeStr in PRIMITIVE_TYPES and typeStr.lower() in {"list", "dict"}:
        return {"type": "array" if typeStr.lower() == "list" else "object"}
    return {"type": "string", "description": f"unknown type '{typeStr}' (defaulted to string)"}
def _createDispatchHandler(actionExecutor, methodName: str, actionName: str, services=None):
    """Create an async handler that dispatches to the ActionExecutor.

    Parameter validation and Ref-payload normalization (collapsing
    ``{id: ..., featureCode: ...}`` from the agent's typed tool schema to the
    bare UUID expected by action implementations) happen centrally inside
    ``ActionExecutor.executeAction`` via ``parameterValidation``. This keeps
    a single source of truth for the action parameter contract regardless
    of caller (agent, workflow graph, REST route).
    """
    async def _handler(args: Dict[str, Any], context: Dict[str, Any]) -> ToolResult:
        # BUG FIX: initialize before the try block. The original bound these
        # only inside the try, so an exception raised before that point made
        # the except clause fail with UnboundLocalError, masking the real error.
        toolOpId = None
        chatSvc = None
        try:
            if context:
                # Inject ambient identifiers the agent does not supply itself.
                if "featureInstanceId" not in args and context.get("featureInstanceId"):
                    args["featureInstanceId"] = context["featureInstanceId"]
                if "mandateId" not in args and context.get("mandateId"):
                    args["mandateId"] = context["mandateId"]
            if "parentOperationId" not in args:
                import time as _time
                toolOpId = f"agentTool_{methodName}_{actionName}_{int(_time.time())}"
                chatSvc = getattr(services, "chat", None) if services else None
                if chatSvc:
                    try:
                        chatSvc.progressLogStart(toolOpId, methodName.capitalize(), actionName, "Agent tool")
                    except Exception:
                        pass  # progress logging is best-effort
                args["parentOperationId"] = toolOpId
            result = await actionExecutor.executeAction(methodName, actionName, args)
            if toolOpId and chatSvc:
                try:
                    chatSvc.progressLogFinish(toolOpId, result.success)
                except Exception:
                    pass  # progress logging is best-effort
            data, sideEvents = _formatActionResult(result, services, context)
            return ToolResult(
                toolCallId="",
                toolName=f"{methodName}_{actionName}",
                success=result.success,
                data=data,
                error=result.error,
                sideEvents=sideEvents or None,
            )
        except Exception as e:
            # Close the progress entry (if one was opened) before reporting failure.
            if toolOpId and chatSvc:
                try:
                    chatSvc.progressLogFinish(toolOpId, False)
                except Exception:
                    pass
            logger.error(f"ActionToolAdapter dispatch failed for {methodName}_{actionName}: {e}")
            return ToolResult(
                toolCallId="",
                toolName=f"{methodName}_{actionName}",
                success=False,
                error=str(e)
            )
    return _handler
# Documents whose string content reaches this many characters (or any
# non-empty bytes payload) are persisted as workspace files instead of
# being inlined into the tool result text.
_INLINE_CONTENT_LIMIT = 2000
def _persistLargeDocument(doc, services, context: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Save an ActionDocument as a workspace file.
Handles both str and bytes documentData.
Returns a dict with 'line' (formatted result) and 'fileInfo' (for sideEvents),
or None if persistence is not possible.
"""
if not services:
return None
chatService = getattr(services, "chat", None)
if not chatService:
return None
docData = getattr(doc, "documentData", None)
if not docData:
return None
if isinstance(docData, bytes):
docBytes = docData
elif isinstance(docData, str):
docBytes = docData.encode("utf-8")
else:
return None
docName = getattr(doc, "documentName", "unnamed")
docMime = getattr(doc, "mimeType", "application/octet-stream")
try:
fileItem, _ = chatService.interfaceDbComponent.saveUploadedFile(docBytes, docName)
from modules.serviceCenter.services.serviceAgent.coreTools._helpers import (
_attachFileAsChatDocument,
_formatToolFileResult,
_getOrCreateTempFolder,
)
updateFields = {}
tempFolderId = _getOrCreateTempFolder(chatService)
if tempFolderId:
updateFields["folderId"] = tempFolderId
fiId = context.get("featureInstanceId") or getattr(services, "featureInstanceId", "")
if fiId:
updateFields["featureInstanceId"] = fiId
updateFields["scope"] = "featureInstance"
mandateId = context.get("mandateId") or getattr(services, "mandateId", "")
if mandateId:
updateFields["mandateId"] = mandateId
if updateFields:
logger.debug("_persistLargeDocument: updating file %s with %s", fileItem.id, updateFields)
chatService.interfaceDbComponent.updateFile(fileItem.id, updateFields)
else:
logger.warning("_persistLargeDocument: no updateFields for file %s (tempFolderId=%s, fiId=%s)", fileItem.id, tempFolderId, fiId)
chatDocId = _attachFileAsChatDocument(
services, fileItem,
label=f"action_doc:{docName}",
userMessage=f"Action document: {docName}",
)
line = _formatToolFileResult(
fileItem=fileItem,
chatDocId=chatDocId,
actionLabel="Produced",
extraInfo="Use readFile to read the content.",
)
return {
"line": line,
"fileInfo": {
"fileId": fileItem.id,
"fileName": docName,
"mimeType": docMime,
"fileSize": len(docBytes),
},
}
except Exception as e:
logger.error(f"_persistLargeDocument failed for {docName}: {e}", exc_info=True)
return None
def _formatActionResult(result, services=None, context: Optional[Dict[str, Any]] = None):
"""Format an ActionResult into a text representation for the agent.
Documents whose content exceeds the inline limit (or is binary bytes)
are persisted as workspace files.
Returns (str, list) the formatted text and a list of sideEvent dicts.
"""
parts = []
sideEvents = []
ctx = context or {}
if result.resultLabel:
parts.append(f"Result: {result.resultLabel}")
if result.error:
parts.append(f"Error: {result.error}")
if result.documents:
parts.append(f"Documents ({len(result.documents)}):")
for doc in result.documents:
docName = getattr(doc, "documentName", "unnamed")
docType = getattr(doc, "mimeType", "unknown")
docData = getattr(doc, "documentData", None)
needsPersist = (
(isinstance(docData, bytes) and len(docData) > 0) or
(isinstance(docData, str) and len(docData) >= _INLINE_CONTENT_LIMIT)
)
if needsPersist:
persisted = _persistLargeDocument(doc, services, ctx)
if persisted:
parts.append(f" - {docName} ({docType})")
parts.append(f" {persisted['line']}")
sideEvents.append({
"type": "fileCreated",
"data": persisted["fileInfo"],
})
continue
logger.error(f"Document '{docName}' ({docType}, {len(docData)} bytes) could not be persisted")
parts.append(f" - {docName} ({docType}) [ERROR: persistence failed]")
continue
parts.append(f" - {docName} ({docType})")
if docData and isinstance(docData, str) and len(docData) < _INLINE_CONTENT_LIMIT:
parts.append(f" Content: {docData[:_INLINE_CONTENT_LIMIT]}")
if not parts:
parts.append("Action completed successfully." if result.success else "Action failed.")
return "\n".join(parts), sideEvents