gateway/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py
2026-03-16 00:47:42 +01:00

1641 lines
76 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Agent service: entry point for running AI agents with tool use."""
import logging
from typing import Any, Callable, Dict, List, Optional, AsyncGenerator
from modules.datamodels.datamodelAi import (
AiCallRequest, AiCallOptions, AiCallResponse, OperationTypeEnum
)
from modules.serviceCenter.services.serviceAgent.datamodelAgent import (
AgentConfig, AgentEvent, AgentEventTypeEnum
)
from modules.serviceCenter.services.serviceAgent.toolRegistry import ToolRegistry
from modules.serviceCenter.services.serviceAgent.agentLoop import runAgentLoop
from modules.serviceCenter.services.serviceAgent.actionToolAdapter import ActionToolAdapter
from modules.serviceCenter.services.serviceBilling.mainServiceBilling import (
getService as getBillingService,
InsufficientBalanceException,
BillingContextError
)
logger = logging.getLogger(__name__)
class _ServicesAdapter:
"""Adapter providing service access from (context, get_service)."""
def __init__(self, context, getService: Callable[[str], Any]):
self._context = context
self._getService = getService
self.user = context.user
self.mandateId = context.mandate_id
self.featureInstanceId = context.feature_instance_id
@property
def workflow(self):
return self._context.workflow
@property
def ai(self):
return self._getService("ai")
@property
def chat(self):
return self._getService("chat")
@property
def streaming(self):
return self._getService("streaming")
@property
def billing(self):
return self._getService("billing")
@property
def utils(self):
return self._getService("utils")
@property
def extraction(self):
return self._getService("extraction")
def getService(self, name: str):
"""Access any service by name."""
return self._getService(name)
@property
def featureCode(self) -> Optional[str]:
w = self.workflow
if w and hasattr(w, "feature") and w.feature:
return getattr(w.feature, "code", None)
return getattr(w, "featureCode", None) if w else None
class AgentService:
"""Service for running AI agents with ReAct loop and tool use.
Registered as IMPORTABLE_SERVICE with objectKey 'service.agent'.
Uses serviceAi for model selection/billing, streaming for SSE events.
"""
def __init__(self, context, get_service: Callable[[str], Any]):
self._context = context
self._getService = get_service
self.services = _ServicesAdapter(context, get_service)
async def runAgent(
self,
prompt: str,
fileIds: List[str] = None,
config: AgentConfig = None,
toolSet: str = "core",
workflowId: str = None,
additionalTools: List[Dict[str, Any]] = None,
userLanguage: str = "",
) -> AsyncGenerator[AgentEvent, None]:
"""Run an agent with the given prompt and tools.
Args:
prompt: User prompt
fileIds: Optional list of file IDs to include as context
config: Agent configuration
toolSet: Which tool set to activate
workflowId: Workflow ID for tracking and billing
additionalTools: Extra tool definitions to register dynamically
userLanguage: ISO 639-1 language code; falls back to user.language from profile
Yields:
AgentEvent for each step (SSE-ready)
"""
if config is None:
config = AgentConfig(toolSet=toolSet)
if workflowId is None:
workflowId = getattr(self.services.workflow, "id", "unknown") if self.services.workflow else "unknown"
resolvedLanguage = userLanguage or getattr(self.services.user, "language", "") or "de"
enrichedPrompt = await self._enrichPromptWithFiles(prompt, fileIds)
toolRegistry = self._buildToolRegistry(config)
aiCallFn = self._createAiCallFn()
aiCallStreamFn = self._createAiCallStreamFn()
getWorkflowCostFn = self._createGetWorkflowCostFn(workflowId)
buildRagContextFn = self._createBuildRagContextFn()
async for event in runAgentLoop(
prompt=enrichedPrompt,
toolRegistry=toolRegistry,
config=config,
aiCallFn=aiCallFn,
getWorkflowCostFn=getWorkflowCostFn,
workflowId=workflowId,
userId=self.services.user.id if self.services.user else "",
featureInstanceId=self.services.featureInstanceId or "",
buildRagContextFn=buildRagContextFn,
mandateId=self.services.mandateId or "",
aiCallStreamFn=aiCallStreamFn,
userLanguage=resolvedLanguage,
):
if event.type == AgentEventTypeEnum.AGENT_SUMMARY:
await self._persistTrace(workflowId, event.data or {})
yield event
async def _enrichPromptWithFiles(self, prompt: str, fileIds: List[str] = None) -> str:
"""Resolve file metadata + FileContentIndex for attached fileIds and prepend to prompt.
The FileContentIndex is produced by the upload pipeline (AI-free extraction)
and tells the agent exactly which content objects (text, images, tables, etc.)
exist inside a file, so the agent can work with them directly via tools.
"""
if not fileIds:
return prompt
try:
chatService = self.services.chat
knowledgeDb = None
try:
from modules.interfaces.interfaceDbKnowledge import getInterface as _getKnowledgeInterface
knowledgeDb = _getKnowledgeInterface()
except Exception:
pass
fileDescriptions = []
for fid in fileIds:
try:
info = chatService.getFileInfo(fid)
fileName = info.get("fileName", fid) if info else fid
mimeType = info.get("mimeType", "unknown") if info else "unknown"
fileSize = info.get("size", "?") if info else "?"
desc = f"### File: {fileName}\n - id: {fid}\n - type: {mimeType}\n - size: {fileSize} bytes"
if knowledgeDb:
contentIndex = knowledgeDb.getFileContentIndex(fid)
if contentIndex:
structure = contentIndex.get("structure", {})
totalObjects = contentIndex.get("totalObjects", 0)
desc += f"\n - indexed: yes ({totalObjects} content objects)"
if structure:
structParts = []
for key, val in structure.items():
if isinstance(val, (int, str)):
structParts.append(f"{key}: {val}")
if structParts:
desc += f"\n - structure: {', '.join(structParts)}"
objectSummary = contentIndex.get("objectSummary", [])
if objectSummary:
desc += "\n - content objects:"
for obj in objectSummary[:20]:
objType = obj.get("type", obj.get("contentType", "?"))
objRef = obj.get("ref", {})
objLabel = objRef.get("location", "") if isinstance(objRef, dict) else ""
objId = obj.get("id", obj.get("contentObjectId", ""))
desc += f"\n * [{objType}] {objLabel}" + (f" (id: {objId})" if objId else "")
if len(objectSummary) > 20:
desc += f"\n ... and {len(objectSummary) - 20} more objects"
else:
desc += "\n - indexed: no (use readFile to trigger extraction)"
fileDescriptions.append(desc)
except Exception:
fileDescriptions.append(f"### File id: {fid}")
if fileDescriptions:
header = (
"## Attached Files\n"
"These files have been uploaded and processed through the extraction pipeline.\n"
"Use `readFile(fileId)` to read text content, `readContentObjects(fileId)` for structured access, "
"or `describeImage(fileId)` for image analysis.\n"
"When generating documents with `renderDocument`, embed images using `![alt text](file:fileId)` in the markdown content.\n\n"
)
header += "\n\n".join(fileDescriptions)
return f"{header}\n\n---\n\nUser request: {prompt}"
except Exception as e:
logger.warning(f"Could not enrich prompt with file metadata: {e}")
return prompt
def _buildToolRegistry(self, config: AgentConfig) -> ToolRegistry:
"""Build a tool registry with core tools and ActionToolAdapter tools."""
registry = ToolRegistry()
_registerCoreTools(registry, self.services)
try:
from modules.workflows.processing.core.actionExecutor import ActionExecutor
actionExecutor = ActionExecutor(self.services)
adapter = ActionToolAdapter(actionExecutor)
adapter.registerAll(registry)
except Exception as e:
logger.warning(f"Could not register action tools: {e}")
return registry
async def _persistTrace(self, workflowId: str, summaryData: Dict[str, Any]):
"""Persist the agent trace as a workflow memory entry in the knowledge store."""
try:
knowledgeService = self._getService("knowledge")
userId = self.services.user.id if self.services.user else ""
featureInstanceId = self.services.featureInstanceId or ""
import json
traceValue = json.dumps(summaryData, default=str)
await knowledgeService.storeEntity(
workflowId=workflowId,
userId=userId,
featureInstanceId=featureInstanceId,
key="_agentTrace",
value=traceValue,
source="agent",
)
logger.info(f"Persisted agent trace for workflow {workflowId}")
except Exception as e:
logger.warning(f"Could not persist agent trace: {e}")
def _createAiCallFn(self) -> Callable[[AiCallRequest], AiCallResponse]:
"""Create the AI call function that wraps serviceAi with billing."""
async def _aiCallFn(request: AiCallRequest) -> AiCallResponse:
aiService = self.services.ai
return await aiService.callAi(request)
return _aiCallFn
def _createAiCallStreamFn(self):
"""Create the streaming AI call function. Yields str deltas, then AiCallResponse."""
async def _aiCallStreamFn(request: AiCallRequest):
aiService = self.services.ai
async for chunk in aiService.callAiStream(request):
yield chunk
return _aiCallStreamFn
def _createGetWorkflowCostFn(self, workflowId: str) -> Callable[[], float]:
"""Create a function that returns the current workflow cost."""
async def _getWorkflowCost() -> float:
try:
billingService = self.services.billing
return await billingService.getWorkflowCost(workflowId)
except Exception:
return 0.0
return _getWorkflowCost
def _createBuildRagContextFn(self):
"""Create the RAG context builder function that delegates to KnowledgeService."""
async def _buildRagContext(
currentPrompt: str, workflowId: str, userId: str,
featureInstanceId: str, mandateId: str, **kwargs
) -> str:
try:
knowledgeService = self.services.getService("knowledge")
return await knowledgeService.buildAgentContext(
currentPrompt=currentPrompt,
workflowId=workflowId,
userId=userId,
featureInstanceId=featureInstanceId,
mandateId=mandateId,
)
except Exception as e:
logger.debug(f"RAG context not available: {e}")
return ""
return _buildRagContext
def _registerCoreTools(registry: ToolRegistry, services):
"""Register built-in core tools: file operations, search, and folder management."""
from modules.serviceCenter.services.serviceAgent.datamodelAgent import ToolResult
# ---- Read-only tools ----
async def _readFile(args: Dict[str, Any], context: Dict[str, Any]):
fileId = args.get("fileId", "")
if not fileId:
return ToolResult(toolCallId="", toolName="readFile", success=False, error="fileId is required")
try:
knowledgeService = services.getService("knowledge") if hasattr(services, "getService") else None
# 1) Knowledge Store: return already-extracted text chunks
if knowledgeService:
fileStatus = knowledgeService.getFileStatus(fileId)
if fileStatus == "indexed":
chunks = knowledgeService._knowledgeDb.getContentChunks(fileId)
textChunks = [
c for c in (chunks or [])
if c.get("contentType") == "text" and c.get("data")
]
if textChunks:
assembled = "\n\n".join(c["data"] for c in textChunks)
return ToolResult(
toolCallId="", toolName="readFile", success=True,
data=assembled,
)
elif fileStatus in ("processing", "embedding", "extracted"):
return ToolResult(
toolCallId="", toolName="readFile", success=True,
data=f"[File {fileId} is currently being processed (status: {fileStatus}). Try again shortly.]",
)
# 2) Not indexed yet: try on-demand extraction
chatService = services.chat
fileInfo = chatService.getFileInfo(fileId)
if not fileInfo:
return ToolResult(toolCallId="", toolName="readFile", success=True, data="File not found.")
fileName = fileInfo.get("fileName", fileId)
mimeType = fileInfo.get("mimeType", "")
_BINARY_TYPES = ("application/pdf", "image/", "application/vnd.", "application/zip",
"application/x-zip", "application/x-tar", "application/x-7z")
isBinary = any(mimeType.startswith(t) for t in _BINARY_TYPES)
if isBinary and knowledgeService:
rawBytes = chatService.getFileData(fileId)
if rawBytes:
try:
from modules.serviceCenter.services.serviceExtraction.subRegistry import ExtractorRegistry, ChunkerRegistry
from modules.serviceCenter.services.serviceExtraction.subPipeline import runExtraction
from modules.datamodels.datamodelExtraction import ExtractionOptions
extracted = runExtraction(
ExtractorRegistry(), ChunkerRegistry(),
rawBytes, fileName, mimeType, ExtractionOptions(),
)
contentObjects = []
for part in extracted.parts:
ct = "image" if part.typeGroup == "image" else ("text" if part.typeGroup == "text" else "other")
if not part.data or not part.data.strip():
continue
contentObjects.append({
"contentObjectId": part.id,
"contentType": ct,
"data": part.data,
"contextRef": {
"containerPath": fileName,
"location": part.label or "file",
**(part.metadata or {}),
},
})
if contentObjects:
userId = context.get("userId", "")
await knowledgeService.indexFile(
fileId=fileId, fileName=fileName, mimeType=mimeType,
userId=userId, contentObjects=contentObjects,
)
textParts = [o["data"] for o in contentObjects if o["contentType"] == "text"]
if textParts:
return ToolResult(
toolCallId="", toolName="readFile", success=True,
data="\n\n".join(textParts),
)
return ToolResult(
toolCallId="", toolName="readFile", success=True,
data=f"[Extracted {len(contentObjects)} content objects from {fileName}. "
f"No text content found. Use describeImage or readContentObjects for image/other content.]",
)
except Exception as extractErr:
logger.warning(f"readFile on-demand extraction failed for {fileId}: {extractErr}")
# 3) Read raw bytes and decode
rawBytes = chatService.getFileData(fileId)
if not rawBytes:
return ToolResult(toolCallId="", toolName="readFile", success=True, data="File data not accessible.")
if isBinary:
return ToolResult(
toolCallId="", toolName="readFile", success=True,
data=f"[Binary file: {fileName}, type={mimeType}. Extraction failed or not available.]",
)
for encoding in ("utf-8", "utf-8-sig", "latin-1"):
try:
text = rawBytes.decode(encoding)
if text.strip():
return ToolResult(
toolCallId="", toolName="readFile", success=True,
data=text,
)
except (UnicodeDecodeError, ValueError):
continue
return ToolResult(
toolCallId="", toolName="readFile", success=True,
data="File is empty or could not be decoded.",
)
except Exception as e:
return ToolResult(toolCallId="", toolName="readFile", success=False, error=str(e))
async def _listFiles(args: Dict[str, Any], context: Dict[str, Any]):
try:
chatService = services.chat
files = chatService.listFiles(
folderId=args.get("folderId"),
tags=args.get("tags"),
search=args.get("search"),
)
fileList = "\n".join(
f"- {f.get('fileName', 'unknown')} (id: {f.get('id', '?')}, "
f"type: {f.get('mimeType', '?')}, size: {f.get('fileSize', '?')}, "
f"tags: {f.get('tags', [])}, status: {f.get('status', 'n/a')})"
for f in files
) if files else "No files found."
return ToolResult(toolCallId="", toolName="listFiles", success=True, data=fileList)
except Exception as e:
return ToolResult(toolCallId="", toolName="listFiles", success=False, error=str(e))
async def _searchFiles(args: Dict[str, Any], context: Dict[str, Any]):
query = args.get("query", "")
if not query:
return ToolResult(toolCallId="", toolName="searchFiles", success=False, error="query is required")
try:
chatService = services.chat
files = chatService.listFiles(search=query, tags=args.get("tags"))
fileList = "\n".join(
f"- {f.get('fileName', 'unknown')} (id: {f.get('id', '?')})"
for f in files
) if files else "No files matching query."
return ToolResult(toolCallId="", toolName="searchFiles", success=True, data=fileList)
except Exception as e:
return ToolResult(toolCallId="", toolName="searchFiles", success=False, error=str(e))
async def _listFolders(args: Dict[str, Any], context: Dict[str, Any]):
try:
chatService = services.chat
folders = chatService.listFolders(parentId=args.get("parentId"))
folderList = "\n".join(
f"- {f.get('name', 'unnamed')} (id: {f.get('id', '?')})"
for f in folders
) if folders else "No folders found."
return ToolResult(toolCallId="", toolName="listFolders", success=True, data=folderList)
except Exception as e:
return ToolResult(toolCallId="", toolName="listFolders", success=False, error=str(e))
async def _webSearch(args: Dict[str, Any], context: Dict[str, Any]):
query = args.get("query", "")
if not query:
return ToolResult(toolCallId="", toolName="webSearch", success=False, error="query is required")
try:
webService = services.getService("web")
result = await webService.search(query)
return ToolResult(
toolCallId="", toolName="webSearch", success=True,
data=result if isinstance(result, str) else str(result)
)
except Exception as e:
return ToolResult(toolCallId="", toolName="webSearch", success=False, error=str(e))
# ---- Write tools ----
async def _tagFile(args: Dict[str, Any], context: Dict[str, Any]):
fileId = args.get("fileId", "")
tags = args.get("tags", [])
if not fileId:
return ToolResult(toolCallId="", toolName="tagFile", success=False, error="fileId is required")
try:
chatService = services.chat
chatService.interfaceDbComponent.updateFile(fileId, {"tags": tags})
return ToolResult(
toolCallId="", toolName="tagFile", success=True,
data=f"Tags updated to {tags} for file {fileId}"
)
except Exception as e:
return ToolResult(toolCallId="", toolName="tagFile", success=False, error=str(e))
async def _moveFile(args: Dict[str, Any], context: Dict[str, Any]):
fileId = args.get("fileId", "")
targetFolderId = args.get("targetFolderId")
if not fileId:
return ToolResult(toolCallId="", toolName="moveFile", success=False, error="fileId is required")
try:
chatService = services.chat
chatService.interfaceDbComponent.updateFile(fileId, {"folderId": targetFolderId})
return ToolResult(
toolCallId="", toolName="moveFile", success=True,
data=f"File {fileId} moved to folder {targetFolderId or 'root'}"
)
except Exception as e:
return ToolResult(toolCallId="", toolName="moveFile", success=False, error=str(e))
async def _createFolder(args: Dict[str, Any], context: Dict[str, Any]):
name = args.get("name", "")
if not name:
return ToolResult(toolCallId="", toolName="createFolder", success=False, error="name is required")
try:
chatService = services.chat
folder = chatService.createFolder(name=name, parentId=args.get("parentId"))
return ToolResult(
toolCallId="", toolName="createFolder", success=True,
data=f"Folder '{name}' created (id: {folder.get('id', '?')})"
)
except Exception as e:
return ToolResult(toolCallId="", toolName="createFolder", success=False, error=str(e))
async def _writeFile(args: Dict[str, Any], context: Dict[str, Any]):
name = args.get("name", "")
content = args.get("content", "")
if not name:
return ToolResult(toolCallId="", toolName="writeFile", success=False, error="name is required")
try:
chatService = services.chat
fileItem, _ = chatService.interfaceDbComponent.saveUploadedFile(
content.encode("utf-8"), name
)
if args.get("folderId"):
chatService.interfaceDbComponent.updateFile(fileItem.id, {"folderId": args["folderId"]})
if args.get("tags"):
chatService.interfaceDbComponent.updateFile(fileItem.id, {"tags": args["tags"]})
return ToolResult(
toolCallId="", toolName="writeFile", success=True,
data=f"File '{name}' created (id: {fileItem.id})",
sideEvents=[{
"type": "fileCreated",
"data": {
"fileId": fileItem.id,
"fileName": name,
"mimeType": fileItem.mimeType,
"fileSize": fileItem.fileSize,
},
}],
)
except Exception as e:
return ToolResult(toolCallId="", toolName="writeFile", success=False, error=str(e))
# ---- Register all tools ----
registry.register(
"readFile", _readFile,
description="Read the content of a file by its fileId.",
parameters={
"type": "object",
"properties": {"fileId": {"type": "string", "description": "The file ID to read"}},
"required": ["fileId"]
},
readOnly=True
)
registry.register(
"listFiles", _listFiles,
description="List files with optional filters (folder, tags, search text).",
parameters={
"type": "object",
"properties": {
"folderId": {"type": "string", "description": "Filter by folder ID"},
"tags": {"type": "array", "items": {"type": "string"}, "description": "Filter by tags (any match)"},
"search": {"type": "string", "description": "Search in file names and descriptions"},
}
},
readOnly=True
)
registry.register(
"searchFiles", _searchFiles,
description="Search files by name, description, or tags.",
parameters={
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"tags": {"type": "array", "items": {"type": "string"}, "description": "Additional tag filter"},
},
"required": ["query"]
},
readOnly=True
)
registry.register(
"listFolders", _listFolders,
description="List file folders. Use parentId to browse folder hierarchy.",
parameters={
"type": "object",
"properties": {
"parentId": {"type": "string", "description": "Parent folder ID (omit for root)"},
}
},
readOnly=True
)
registry.register(
"webSearch", _webSearch,
description="Search the web for information.",
parameters={
"type": "object",
"properties": {"query": {"type": "string", "description": "Search query"}},
"required": ["query"]
},
readOnly=True
)
registry.register(
"tagFile", _tagFile,
description="Set tags on a file for categorization.",
parameters={
"type": "object",
"properties": {
"fileId": {"type": "string", "description": "The file ID"},
"tags": {"type": "array", "items": {"type": "string"}, "description": "Tags to set"},
},
"required": ["fileId", "tags"]
},
readOnly=False
)
registry.register(
"moveFile", _moveFile,
description="Move a file to a different folder.",
parameters={
"type": "object",
"properties": {
"fileId": {"type": "string", "description": "The file ID to move"},
"targetFolderId": {"type": "string", "description": "Target folder ID (null for root)"},
},
"required": ["fileId"]
},
readOnly=False
)
registry.register(
"createFolder", _createFolder,
description="Create a new file folder.",
parameters={
"type": "object",
"properties": {
"name": {"type": "string", "description": "Folder name"},
"parentId": {"type": "string", "description": "Parent folder ID (omit for root)"},
},
"required": ["name"]
},
readOnly=False
)
registry.register(
"writeFile", _writeFile,
description="Create a new file with text content.",
parameters={
"type": "object",
"properties": {
"name": {"type": "string", "description": "File name including extension"},
"content": {"type": "string", "description": "File content as text"},
"folderId": {"type": "string", "description": "Target folder ID"},
"tags": {"type": "array", "items": {"type": "string"}, "description": "Tags"},
},
"required": ["name", "content"]
},
readOnly=False
)
# ---- Connection tools (external data sources) ----
async def _listConnections(args: Dict[str, Any], context: Dict[str, Any]):
try:
chatService = services.chat
connections = chatService.getUserConnections() if hasattr(chatService, "getUserConnections") else []
if not connections:
return ToolResult(toolCallId="", toolName="listConnections", success=True, data="No connections available.")
lines = []
for conn in connections:
connId = conn.get("id", "?") if isinstance(conn, dict) else getattr(conn, "id", "?")
authority = conn.get("authority", "?") if isinstance(conn, dict) else getattr(conn, "authority", "?")
email = conn.get("externalEmail", "") if isinstance(conn, dict) else getattr(conn, "externalEmail", "")
lines.append(f"- {authority} ({email}) id: {connId}")
return ToolResult(toolCallId="", toolName="listConnections", success=True, data="\n".join(lines))
except Exception as e:
return ToolResult(toolCallId="", toolName="listConnections", success=False, error=str(e))
async def _externalBrowse(args: Dict[str, Any], context: Dict[str, Any]):
connectionId = args.get("connectionId", "")
service = args.get("service", "")
path = args.get("path", "/")
if not connectionId or not service:
return ToolResult(toolCallId="", toolName="externalBrowse", success=False, error="connectionId and service are required")
try:
from modules.connectors.connectorResolver import ConnectorResolver
resolver = ConnectorResolver(
services.getService("security"),
services.chat.interfaceDbComponent if hasattr(services.chat, "interfaceDbComponent") else None,
)
adapter = await resolver.resolveService(connectionId, service)
entries = await adapter.browse(path, filter=args.get("filter"))
entryLines = "\n".join(
f"- {'[DIR]' if e.isFolder else '[FILE]'} {e.name} ({e.size or '?'} bytes)"
for e in entries
) if entries else "Empty directory."
return ToolResult(toolCallId="", toolName="externalBrowse", success=True, data=entryLines)
except Exception as e:
return ToolResult(toolCallId="", toolName="externalBrowse", success=False, error=str(e))
async def _externalDownload(args: Dict[str, Any], context: Dict[str, Any]):
connectionId = args.get("connectionId", "")
service = args.get("service", "")
path = args.get("path", "")
if not connectionId or not service or not path:
return ToolResult(toolCallId="", toolName="externalDownload", success=False, error="connectionId, service, and path are required")
try:
from modules.connectors.connectorResolver import ConnectorResolver
resolver = ConnectorResolver(
services.getService("security"),
services.chat.interfaceDbComponent if hasattr(services.chat, "interfaceDbComponent") else None,
)
adapter = await resolver.resolveService(connectionId, service)
fileBytes = await adapter.download(path)
if not fileBytes:
return ToolResult(toolCallId="", toolName="externalDownload", success=False, error="Download returned empty")
fileName = path.split("/")[-1] or "downloaded_file"
chatService = services.chat
fileItem, _ = chatService.interfaceDbComponent.saveUploadedFile(fileBytes, fileName)
return ToolResult(
toolCallId="", toolName="externalDownload", success=True,
data=f"Downloaded '{fileName}' ({len(fileBytes)} bytes) → local file id: {fileItem.id}"
)
except Exception as e:
return ToolResult(toolCallId="", toolName="externalDownload", success=False, error=str(e))
async def _externalUpload(args: Dict[str, Any], context: Dict[str, Any]):
connectionId = args.get("connectionId", "")
service = args.get("service", "")
path = args.get("path", "")
fileId = args.get("fileId", "")
if not connectionId or not service or not path or not fileId:
return ToolResult(toolCallId="", toolName="externalUpload", success=False, error="connectionId, service, path, and fileId are required")
try:
from modules.connectors.connectorResolver import ConnectorResolver
resolver = ConnectorResolver(
services.getService("security"),
services.chat.interfaceDbComponent if hasattr(services.chat, "interfaceDbComponent") else None,
)
adapter = await resolver.resolveService(connectionId, service)
chatService = services.chat
fileContent = chatService.getFileContent(fileId)
if not fileContent:
return ToolResult(toolCallId="", toolName="externalUpload", success=False, error="File not found")
fileData = fileContent.get("data", b"") if isinstance(fileContent, dict) else b""
if isinstance(fileData, str):
fileData = fileData.encode("utf-8")
fileName = fileContent.get("fileName", "file") if isinstance(fileContent, dict) else "file"
result = await adapter.upload(path, fileData, fileName)
return ToolResult(toolCallId="", toolName="externalUpload", success=True, data=str(result))
except Exception as e:
return ToolResult(toolCallId="", toolName="externalUpload", success=False, error=str(e))
async def _externalSearch(args: Dict[str, Any], context: Dict[str, Any]):
connectionId = args.get("connectionId", "")
service = args.get("service", "")
query = args.get("query", "")
if not connectionId or not service or not query:
return ToolResult(toolCallId="", toolName="externalSearch", success=False, error="connectionId, service, and query are required")
try:
from modules.connectors.connectorResolver import ConnectorResolver
resolver = ConnectorResolver(
services.getService("security"),
services.chat.interfaceDbComponent if hasattr(services.chat, "interfaceDbComponent") else None,
)
adapter = await resolver.resolveService(connectionId, service)
entries = await adapter.search(query, path=args.get("path"))
resultLines = "\n".join(
f"- {e.name} ({e.path})"
for e in entries
) if entries else "No results found."
return ToolResult(toolCallId="", toolName="externalSearch", success=True, data=resultLines)
except Exception as e:
return ToolResult(toolCallId="", toolName="externalSearch", success=False, error=str(e))
async def _sendMail(args: Dict[str, Any], context: Dict[str, Any]):
connectionId = args.get("connectionId", "")
to = args.get("to", [])
subject = args.get("subject", "")
body = args.get("body", "")
if not connectionId or not to or not subject:
return ToolResult(toolCallId="", toolName="sendMail", success=False, error="connectionId, to, and subject are required")
try:
from modules.connectors.connectorResolver import ConnectorResolver
resolver = ConnectorResolver(
services.getService("security"),
services.chat.interfaceDbComponent if hasattr(services.chat, "interfaceDbComponent") else None,
)
adapter = await resolver.resolveService(connectionId, "outlook")
if hasattr(adapter, "sendMail"):
result = await adapter.sendMail(to=to, subject=subject, body=body, cc=args.get("cc"))
return ToolResult(toolCallId="", toolName="sendMail", success=True, data=str(result))
return ToolResult(toolCallId="", toolName="sendMail", success=False, error="Mail not supported by this adapter")
except Exception as e:
return ToolResult(toolCallId="", toolName="sendMail", success=False, error=str(e))
_connToolParams = {
"connectionId": {"type": "string", "description": "UserConnection ID"},
"service": {"type": "string", "description": "Service name (sharepoint, outlook, drive, etc.)"},
}
registry.register(
"listConnections", _listConnections,
description="List available external connections and their services.",
parameters={"type": "object", "properties": {}},
readOnly=True,
)
registry.register(
"externalBrowse", _externalBrowse,
description="Browse files and folders in an external data source (SharePoint, Drive, FTP).",
parameters={
"type": "object",
"properties": {
**_connToolParams,
"path": {"type": "string", "description": "Path to browse"},
"filter": {"type": "string", "description": "Filter pattern (e.g. '*.pdf')"},
},
"required": ["connectionId", "service"],
},
readOnly=True,
)
registry.register(
"externalDownload", _externalDownload,
description="Download a file from an external source into local storage + auto-index.",
parameters={
"type": "object",
"properties": {
**_connToolParams,
"path": {"type": "string", "description": "File path to download"},
},
"required": ["connectionId", "service", "path"],
},
readOnly=False,
)
registry.register(
"externalUpload", _externalUpload,
description="Upload a local file to an external data source.",
parameters={
"type": "object",
"properties": {
**_connToolParams,
"path": {"type": "string", "description": "Destination path"},
"fileId": {"type": "string", "description": "Local file ID to upload"},
},
"required": ["connectionId", "service", "path", "fileId"],
},
readOnly=False,
)
registry.register(
"externalSearch", _externalSearch,
description="Search for files in an external data source.",
parameters={
"type": "object",
"properties": {
**_connToolParams,
"query": {"type": "string", "description": "Search query"},
"path": {"type": "string", "description": "Scope to a specific path"},
},
"required": ["connectionId", "service", "query"],
},
readOnly=True,
)
registry.register(
"sendMail", _sendMail,
description="Send an email via a connected mail service (Outlook, Gmail).",
parameters={
"type": "object",
"properties": {
"connectionId": {"type": "string", "description": "UserConnection ID"},
"to": {"type": "array", "items": {"type": "string"}, "description": "Recipient email addresses"},
"subject": {"type": "string", "description": "Email subject"},
"body": {"type": "string", "description": "Email body text"},
"cc": {"type": "array", "items": {"type": "string"}, "description": "CC addresses"},
},
"required": ["connectionId", "to", "subject", "body"],
},
readOnly=False,
)
# ---- Document tools (Smart Documents / Container Handling) ----
async def _browseContainer(args: Dict[str, Any], context: Dict[str, Any]):
fileId = args.get("fileId", "")
if not fileId:
return ToolResult(toolCallId="", toolName="browseContainer", success=False, error="fileId is required")
try:
knowledgeService = services.getService("knowledge")
index = knowledgeService.getFileContentIndex(fileId)
if not index:
return ToolResult(toolCallId="", toolName="browseContainer", success=True, data="No content index available for this file. It may not have been indexed yet.")
structure = index.get("structure", {}) if isinstance(index, dict) else {}
objectSummary = index.get("objectSummary", []) if isinstance(index, dict) else []
totalObjects = index.get("totalObjects", 0) if isinstance(index, dict) else 0
result = f"File: {index.get('fileName', '?')} ({index.get('mimeType', '?')})\n"
result += f"Total content objects: {totalObjects}\n"
sections = structure.get("sections", [])
if sections:
result += "\nSections:\n"
for s in sections:
result += f" [{s.get('id', '?')}] {s.get('title', 'Untitled')} (pages {s.get('startPage', '?')}-{s.get('endPage', '?')})\n"
if structure.get("pageMap"):
pages = len(structure["pageMap"])
result += f"\nPages: {pages}\n"
imgCount = structure.get("imageCount", 0)
tableCount = structure.get("tableCount", 0)
if imgCount:
result += f"Images: {imgCount}\n"
if tableCount:
result += f"Tables: {tableCount}\n"
if structure.get("sheetMap"):
result += "\nSheets:\n"
for s in structure["sheetMap"]:
result += f" {s.get('sheetName', '?')} ({s.get('rows', '?')} rows x {s.get('columns', '?')} cols)\n"
if structure.get("slideMap"):
result += "\nSlides:\n"
for s in structure["slideMap"]:
result += f" Slide {s.get('slideIndex', 0) + 1}: {s.get('title', '(no title)')}\n"
return ToolResult(toolCallId="", toolName="browseContainer", success=True, data=result)
except Exception as e:
return ToolResult(toolCallId="", toolName="browseContainer", success=False, error=str(e))
async def _readContentObjects(args: Dict[str, Any], context: Dict[str, Any]):
fileId = args.get("fileId", "")
if not fileId:
return ToolResult(toolCallId="", toolName="readContentObjects", success=False, error="fileId is required")
try:
knowledgeService = services.getService("knowledge")
filterDict = {}
if args.get("pageIndex") is not None:
filterDict["pageIndex"] = args["pageIndex"]
if args.get("contentType"):
filterDict["contentType"] = args["contentType"]
if args.get("sectionId"):
filterDict["sectionId"] = args["sectionId"]
objects = await knowledgeService.readContentObjects(fileId, filterDict)
if not objects:
return ToolResult(toolCallId="", toolName="readContentObjects", success=True, data="No content objects found with the given filter.")
result = f"Found {len(objects)} content objects:\n\n"
for obj in objects[:20]:
data = obj.get("data", "")
cType = obj.get("contentType", "?")
ref = obj.get("contextRef", {})
location = ref.get("location", "") if isinstance(ref, dict) else ""
preview = data[:300] if cType == "text" else f"[{cType} data, {len(data)} chars]"
result += f"[{cType}] {location}: {preview}\n\n"
if len(objects) > 20:
result += f"... and {len(objects) - 20} more objects"
return ToolResult(toolCallId="", toolName="readContentObjects", success=True, data=result)
except Exception as e:
return ToolResult(toolCallId="", toolName="readContentObjects", success=False, error=str(e))
async def _extractContainerItem(args: Dict[str, Any], context: Dict[str, Any]):
fileId = args.get("fileId", "")
containerPath = args.get("containerPath", "")
if not fileId or not containerPath:
return ToolResult(toolCallId="", toolName="extractContainerItem", success=False, error="fileId and containerPath are required")
try:
knowledgeService = services.getService("knowledge")
result = await knowledgeService.extractContainerItem(fileId, containerPath)
if result:
return ToolResult(toolCallId="", toolName="extractContainerItem", success=True, data=str(result))
return ToolResult(toolCallId="", toolName="extractContainerItem", success=True, data=f"On-demand extraction for '{containerPath}' queued.")
except Exception as e:
return ToolResult(toolCallId="", toolName="extractContainerItem", success=False, error=str(e))
async def _summarizeContent(args: Dict[str, Any], context: Dict[str, Any]):
fileId = args.get("fileId", "")
if not fileId:
return ToolResult(toolCallId="", toolName="summarizeContent", success=False, error="fileId is required")
try:
knowledgeService = services.getService("knowledge")
filterDict = {}
if args.get("sectionId"):
filterDict["sectionId"] = args["sectionId"]
if args.get("pageIndex") is not None:
filterDict["pageIndex"] = args["pageIndex"]
if args.get("contentType"):
filterDict["contentType"] = args["contentType"]
objects = await knowledgeService.readContentObjects(fileId, filterDict)
if not objects:
return ToolResult(toolCallId="", toolName="summarizeContent", success=True, data="No content found to summarize.")
textParts = [obj.get("data", "") for obj in objects if obj.get("contentType") == "text"]
combinedText = "\n\n".join(textParts)[:6000]
aiService = services.ai
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum
summaryRequest = AiCallRequest(
prompt=f"Summarize the following content concisely:\n\n{combinedText}",
options=AiCallOptions(operationType=OperationTypeEnum.DATA_ANALYSE),
)
response = await aiService.callAi(summaryRequest)
return ToolResult(toolCallId="", toolName="summarizeContent", success=True, data=response.content)
except Exception as e:
return ToolResult(toolCallId="", toolName="summarizeContent", success=False, error=str(e))
registry.register(
"browseContainer", _browseContainer,
description="Browse the structural index of a file/container (pages, sections, sheets, slides).",
parameters={
"type": "object",
"properties": {"fileId": {"type": "string", "description": "The file ID to browse"}},
"required": ["fileId"],
},
readOnly=True,
)
registry.register(
"readContentObjects", _readContentObjects,
description="Read content objects from a file with optional filters (page, section, type).",
parameters={
"type": "object",
"properties": {
"fileId": {"type": "string", "description": "The file ID"},
"pageIndex": {"type": "integer", "description": "Filter by page index"},
"sectionId": {"type": "string", "description": "Filter by section ID"},
"contentType": {"type": "string", "description": "Filter by content type (text, image, etc.)"},
},
"required": ["fileId"],
},
readOnly=True,
)
registry.register(
"extractContainerItem", _extractContainerItem,
description="On-demand extraction of a specific item within a container (ZIP, nested file).",
parameters={
"type": "object",
"properties": {
"fileId": {"type": "string", "description": "The container file ID"},
"containerPath": {"type": "string", "description": "Path within the container"},
},
"required": ["fileId", "containerPath"],
},
readOnly=True,
)
registry.register(
"summarizeContent", _summarizeContent,
description="AI-powered summary of content objects from a file, optionally filtered.",
parameters={
"type": "object",
"properties": {
"fileId": {"type": "string", "description": "The file ID"},
"sectionId": {"type": "string", "description": "Optional: summarize only this section"},
"pageIndex": {"type": "integer", "description": "Optional: summarize only this page"},
"contentType": {"type": "string", "description": "Optional: filter by content type"},
},
"required": ["fileId"],
},
readOnly=True,
)
# ---- Vision tool ----
async def _describeImage(args: Dict[str, Any], context: Dict[str, Any]):
"""Analyse an image using AI vision. Uses Knowledge Store chunks produced by Extractors."""
fileId = args.get("fileId", "")
prompt = args.get("prompt", "Describe this image in detail. Extract all visible text, tables, and data.")
pageIndex = args.get("pageIndex")
if not fileId:
return ToolResult(toolCallId="", toolName="describeImage", success=False, error="fileId is required")
try:
import base64 as _b64
imageData = None
mimeType = "image/png"
knowledgeService = services.getService("knowledge") if hasattr(services, "getService") else None
# 1) Knowledge Store: image chunks already produced by PdfExtractor / ImageExtractor
if knowledgeService:
chunks = knowledgeService._knowledgeDb.getContentChunks(fileId)
imageChunks = [c for c in (chunks or []) if c.get("contentType") == "image"]
if pageIndex is not None:
imageChunks = [c for c in imageChunks if c.get("contextRef", {}).get("pageIndex") == pageIndex]
if imageChunks:
imageData = imageChunks[0].get("data", "")
chunkMime = imageChunks[0].get("contextRef", {}).get("mimeType")
if chunkMime:
mimeType = chunkMime
# 2) File not yet indexed -> trigger extraction via ExtractionService, then retry
if not imageData and knowledgeService and not knowledgeService.isFileIndexed(fileId):
try:
chatService = services.chat
fileInfo = chatService.getFileInfo(fileId)
fileContent = chatService.getFileContent(fileId)
if fileContent and fileInfo:
rawData = fileContent.get("data", "")
if isinstance(rawData, str) and len(rawData) > 100:
rawBytes = _b64.b64decode(rawData)
elif isinstance(rawData, bytes):
rawBytes = rawData
else:
rawBytes = None
if rawBytes:
from modules.serviceCenter.services.serviceExtraction.subRegistry import ExtractorRegistry
from modules.serviceCenter.services.serviceExtraction.subPipeline import runExtraction
from modules.datamodels.datamodelExtraction import ExtractionOptions
fileMime = fileInfo.get("mimeType", "application/octet-stream")
fileName = fileInfo.get("fileName", fileId)
extracted = runExtraction(
ExtractorRegistry(), None,
rawBytes, fileName, fileMime, ExtractionOptions(),
)
contentObjects = []
for part in extracted.parts:
ct = "image" if part.typeGroup == "image" else ("text" if part.typeGroup == "text" else "other")
if not part.data or not part.data.strip():
continue
contentObjects.append({
"contentObjectId": part.id,
"contentType": ct,
"data": part.data,
"contextRef": {"containerPath": fileName, "location": part.label, **(part.metadata or {})},
})
if contentObjects:
await knowledgeService.indexFile(
fileId=fileId, fileName=fileName, mimeType=fileMime,
userId=context.get("userId", ""), contentObjects=contentObjects,
)
chunks = knowledgeService._knowledgeDb.getContentChunks(fileId)
imageChunks = [c for c in (chunks or []) if c.get("contentType") == "image"]
if pageIndex is not None:
imageChunks = [c for c in imageChunks if c.get("contextRef", {}).get("pageIndex") == pageIndex]
if imageChunks:
imageData = imageChunks[0].get("data", "")
except Exception as extractErr:
logger.warning(f"describeImage: on-demand extraction failed: {extractErr}")
# 3) Direct image file (not a container) - use raw file data
if not imageData:
chatService = services.chat
fileContent = chatService.getFileContent(fileId)
if fileContent:
fileMimeType = fileContent.get("mimeType", "")
if fileMimeType.startswith("image/"):
imageData = fileContent.get("data", "")
mimeType = fileMimeType
if not imageData:
return ToolResult(toolCallId="", toolName="describeImage", success=False,
error="No image data found. The file may not contain images or extraction failed.")
dataUrl = f"data:{mimeType};base64,{imageData}"
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum as OTE
visionRequest = AiCallRequest(
prompt=prompt,
options=AiCallOptions(operationType=OTE.IMAGE_ANALYSE),
messages=[{"role": "user", "content": [
{"type": "text", "text": prompt},
{"type": "image_url", "image_url": {"url": dataUrl}},
]}],
)
visionResponse = await services.ai.callAi(visionRequest)
if visionResponse.errorCount > 0:
return ToolResult(toolCallId="", toolName="describeImage", success=False, error=visionResponse.content)
return ToolResult(toolCallId="", toolName="describeImage", success=True, data=visionResponse.content)
except Exception as e:
return ToolResult(toolCallId="", toolName="describeImage", success=False, error=str(e))
registry.register(
"describeImage", _describeImage,
description="Analyse an image using AI vision. Works with image files and images extracted from PDFs/DOCX/PPTX.",
parameters={
"type": "object",
"properties": {
"fileId": {"type": "string", "description": "The file ID containing the image or document with images"},
"prompt": {"type": "string", "description": "What to look for in the image (default: describe everything)"},
"pageIndex": {"type": "integer", "description": "Filter images by page index (0-based, for multi-page documents)"},
},
"required": ["fileId"],
},
readOnly=True,
)
# ---- Document rendering tool ----
def _markdownToDocumentJson(markdown: str, title: str, language: str = "de") -> Dict[str, Any]:
"""Convert markdown content to the standard document JSON format expected by renderers."""
import re as _re
sections = []
order = 0
lines = markdown.split("\n")
i = 0
def _nextId():
nonlocal order
order += 1
return f"s_{order}"
while i < len(lines):
line = lines[i]
# --- Headings ---
headingMatch = _re.match(r'^(#{1,6})\s+(.+)', line)
if headingMatch:
level = len(headingMatch.group(1))
text = headingMatch.group(2).strip()
sections.append({
"id": _nextId(), "content_type": "heading", "order": order,
"elements": [{"content": {"text": text, "level": level}}],
})
i += 1
continue
# --- Fenced code blocks ---
codeMatch = _re.match(r'^```(\w*)', line)
if codeMatch:
lang = codeMatch.group(1) or "text"
codeLines = []
i += 1
while i < len(lines) and not lines[i].startswith("```"):
codeLines.append(lines[i])
i += 1
i += 1
sections.append({
"id": _nextId(), "content_type": "code_block", "order": order,
"elements": [{"content": {"code": "\n".join(codeLines), "language": lang}}],
})
continue
# --- Tables ---
tableMatch = _re.match(r'^\|(.+)\|$', line)
if tableMatch and (i + 1) < len(lines) and _re.match(r'^\|[\s\-:|]+\|$', lines[i + 1]):
headerCells = [c.strip() for c in tableMatch.group(1).split("|")]
i += 2
rows = []
while i < len(lines) and _re.match(r'^\|(.+)\|$', lines[i]):
rowCells = [c.strip() for c in lines[i][1:-1].split("|")]
rows.append(rowCells)
i += 1
sections.append({
"id": _nextId(), "content_type": "table", "order": order,
"elements": [{"content": {"headers": headerCells, "rows": rows}}],
})
continue
# --- Bullet / numbered lists ---
listMatch = _re.match(r'^(\s*)([-*+]|\d+[.)]) (.+)', line)
if listMatch:
isNumbered = bool(_re.match(r'\d+[.)]', listMatch.group(2)))
items = []
while i < len(lines) and _re.match(r'^(\s*)([-*+]|\d+[.)]) (.+)', lines[i]):
m = _re.match(r'^(\s*)([-*+]|\d+[.)]) (.+)', lines[i])
items.append({"text": m.group(3).strip()})
i += 1
sections.append({
"id": _nextId(), "content_type": "bullet_list", "order": order,
"elements": [{"content": {"items": items, "list_type": "numbered" if isNumbered else "bullet"}}],
})
continue
# --- Empty lines (skip) ---
if not line.strip():
i += 1
continue
# --- Images: ![alt](file:fileId) or ![alt](url) ---
imgMatch = _re.match(r'^!\[([^\]]*)\]\(([^)]+)\)', line)
if imgMatch:
altText = imgMatch.group(1).strip() or "Image"
src = imgMatch.group(2).strip()
fileId = ""
if src.startswith("file:"):
fileId = src[5:]
sections.append({
"id": _nextId(), "content_type": "image", "order": order,
"elements": [{
"content": {
"altText": altText,
"base64Data": "",
"_fileRef": fileId,
"_srcUrl": src if not fileId else "",
}
}],
})
i += 1
continue
# --- Paragraph (collect consecutive non-empty lines) ---
paraLines = []
while i < len(lines) and lines[i].strip() and not _re.match(r'^(#{1,6}\s|```|\|.+\||!\[|(\s*)([-*+]|\d+[.)]) )', lines[i]):
paraLines.append(lines[i])
i += 1
if paraLines:
sections.append({
"id": _nextId(), "content_type": "paragraph", "order": order,
"elements": [{"content": {"text": " ".join(paraLines)}}],
})
continue
i += 1
if not sections:
sections.append({
"id": _nextId(), "content_type": "paragraph", "order": order,
"elements": [{"content": {"text": markdown.strip() or "(empty)"}}],
})
return {
"metadata": {
"split_strategy": "single_document",
"source_documents": [],
"extraction_method": "agent_rendering",
"title": title,
"language": language,
},
"documents": [{
"id": "doc_1",
"title": title,
"sections": sections,
}],
}
async def _renderDocument(args: Dict[str, Any], context: Dict[str, Any]):
"""Render agent-produced markdown content into any document format via the RendererRegistry."""
import re as _re
content = args.get("content", "")
outputFormat = args.get("outputFormat", "pdf")
title = args.get("title", "Document")
language = args.get("language", "de")
if not content:
return ToolResult(toolCallId="", toolName="renderDocument", success=False, error="content is required")
try:
structuredContent = _markdownToDocumentJson(content, title, language)
# Resolve image file references (file:fileId) to base64 data from Knowledge Store
knowledgeService = None
try:
knowledgeService = services.getService("knowledge")
except Exception:
pass
resolvedImages = 0
for doc in structuredContent.get("documents", []):
for section in doc.get("sections", []):
if section.get("content_type") != "image":
continue
for element in section.get("elements", []):
contentObj = element.get("content", {})
fileRef = contentObj.get("_fileRef", "")
if not fileRef or contentObj.get("base64Data"):
continue
if knowledgeService:
chunks = knowledgeService._knowledgeDb.getContentChunks(fileRef)
imageChunks = [c for c in (chunks or []) if c.get("contentType") == "image"]
if imageChunks:
contentObj["base64Data"] = imageChunks[0].get("data", "")
chunkMime = imageChunks[0].get("contextRef", {}).get("mimeType", "image/png")
contentObj["mimeType"] = chunkMime
resolvedImages += 1
if not contentObj.get("base64Data"):
try:
rawBytes = services.chat.getFileData(fileRef)
if rawBytes:
import base64 as _b64
contentObj["base64Data"] = _b64.b64encode(rawBytes).decode("ascii")
contentObj["mimeType"] = "image/png"
resolvedImages += 1
except Exception:
pass
contentObj.pop("_fileRef", None)
contentObj.pop("_srcUrl", None)
sectionCount = len(structuredContent.get("documents", [{}])[0].get("sections", []))
logger.info(f"renderDocument: parsed {sectionCount} sections from markdown ({len(content)} chars), resolved {resolvedImages} image(s), format={outputFormat}")
generationService = services.getService("generation")
documents = await generationService.renderReport(
extractedContent=structuredContent,
outputFormat=outputFormat,
language=language,
title=title,
userPrompt=content,
)
if not documents:
return ToolResult(toolCallId="", toolName="renderDocument", success=False, error="Rendering produced no output")
savedFiles = []
sideEvents = []
chatService = services.chat
sanitizedTitle = _re.sub(r'[^a-zA-Z0-9._-]', '_', title).strip('_') or "document"
for doc in documents:
docData = doc.documentData if hasattr(doc, "documentData") else b""
docName = doc.filename if hasattr(doc, "filename") else f"{sanitizedTitle}.{outputFormat}"
docMime = doc.mimeType if hasattr(doc, "mimeType") else "application/octet-stream"
if not docName.lower().endswith(f".{outputFormat}"):
docName = f"{sanitizedTitle}.{outputFormat}"
fileItem = None
if hasattr(chatService.interfaceDbComponent, "saveGeneratedFile"):
fileItem = chatService.interfaceDbComponent.saveGeneratedFile(docData, docName, docMime)
else:
fileItem, _ = chatService.interfaceDbComponent.saveUploadedFile(docData, docName)
if fileItem:
fid = fileItem.id if hasattr(fileItem, "id") else fileItem.get("id", "?")
savedFiles.append(f"- {docName} (id: {fid})")
sideEvents.append({
"type": "fileCreated",
"data": {
"fileId": fid,
"fileName": docName,
"mimeType": docMime,
"fileSize": len(docData),
},
})
result = f"Rendered {len(documents)} document(s):\n" + "\n".join(savedFiles)
return ToolResult(toolCallId="", toolName="renderDocument", success=True, data=result, sideEvents=sideEvents)
except Exception as e:
logger.error(f"renderDocument failed: {e}")
return ToolResult(toolCallId="", toolName="renderDocument", success=False, error=str(e))
registry.register(
"renderDocument", _renderDocument,
description=(
"Render markdown content into a document file (PDF, DOCX, XLSX, PPTX, CSV, HTML, MD, JSON, TXT). "
"You write the full document content as markdown, then this tool converts and renders it. "
"To embed images from uploaded files, use markdown image syntax with the file ID: ![alt text](file:fileId). "
"The images will be resolved from the Knowledge Store and embedded in the output document."
),
parameters={
"type": "object",
"properties": {
"content": {"type": "string", "description": "Full document content as markdown (headings, tables, lists, code blocks, paragraphs, images via ![alt](file:fileId))"},
"outputFormat": {"type": "string", "description": "Target format: pdf, docx, xlsx, pptx, csv, html, md, json, txt", "default": "pdf"},
"title": {"type": "string", "description": "Document title", "default": "Document"},
"language": {"type": "string", "description": "Document language (ISO 639-1)", "default": "de"},
},
"required": ["content"],
},
readOnly=False,
)
# ── textToSpeech tool ──────────────────────────────────────────────
def _stripMarkdownForTts(text: str) -> str:
"""Strip markdown formatting so TTS reads clean speech text."""
import re as _re
t = text
t = _re.sub(r'\*\*(.+?)\*\*', r'\1', t)
t = _re.sub(r'\*(.+?)\*', r'\1', t)
t = _re.sub(r'__(.+?)__', r'\1', t)
t = _re.sub(r'_(.+?)_', r'\1', t)
t = _re.sub(r'`[^`]+`', lambda m: m.group(0)[1:-1], t)
t = _re.sub(r'^#{1,6}\s*', '', t, flags=_re.MULTILINE)
t = _re.sub(r'^\s*[-*+]\s+', '', t, flags=_re.MULTILINE)
t = _re.sub(r'^\s*\d+\.\s+', '', t, flags=_re.MULTILINE)
t = _re.sub(r'\[(.+?)\]\(.+?\)', r'\1', t)
t = _re.sub(r'!\[.*?\]\(.*?\)', '', t)
t = _re.sub(r'\n{3,}', '\n\n', t)
return t.strip()
async def _textToSpeech(args: Dict[str, Any], context: Dict[str, Any]):
"""Convert text to speech using Google Cloud TTS, deliver audio via SSE."""
import base64 as _b64
text = args.get("text", "")
language = args.get("language", "auto")
voiceName = args.get("voiceName")
if not text:
return ToolResult(toolCallId="", toolName="textToSpeech", success=False, error="text is required")
cleanText = _stripMarkdownForTts(text)
if not cleanText:
return ToolResult(toolCallId="", toolName="textToSpeech", success=False, error="text is empty after stripping markdown")
try:
from modules.interfaces.interfaceVoiceObjects import getVoiceInterface
mandateId = context.get("mandateId", "")
voiceInterface = getVoiceInterface(currentUser=None, mandateId=mandateId)
_ISO_TO_BCP47 = {
"de": "de-DE", "en": "en-US", "fr": "fr-FR", "it": "it-IT",
"es": "es-ES", "pt": "pt-BR", "nl": "nl-NL", "pl": "pl-PL",
"ru": "ru-RU", "ja": "ja-JP", "zh": "zh-CN", "ko": "ko-KR",
"ar": "ar-XA", "hi": "hi-IN", "tr": "tr-TR", "sv": "sv-SE",
}
if language == "auto":
try:
snippet = cleanText[:500]
detectResult = await voiceInterface.detectLanguage(snippet)
if detectResult and detectResult.get("success"):
detected = detectResult.get("language", "de")
language = _ISO_TO_BCP47.get(detected, detected)
if "-" not in language:
language = _ISO_TO_BCP47.get(language, f"{language}-{language.upper()}")
logger.info(f"textToSpeech: auto-detected language '{detected}' -> '{language}'")
else:
language = "de-DE"
except Exception as detectErr:
logger.warning(f"textToSpeech: language detection failed: {detectErr}, defaulting to de-DE")
language = "de-DE"
if not voiceName:
try:
featureInstanceId = context.get("featureInstanceId", "")
userId = context.get("userId", "")
if featureInstanceId and userId:
dbMgmt = services.chat.interfaceDbApp if hasattr(services.chat, "interfaceDbApp") else None
if dbMgmt and hasattr(dbMgmt, "getVoiceSettings"):
vs = dbMgmt.getVoiceSettings(userId)
if vs:
voiceMap = {}
if hasattr(vs, "ttsVoiceMap") and vs.ttsVoiceMap:
voiceMap = vs.ttsVoiceMap if isinstance(vs.ttsVoiceMap, dict) else {}
if language in voiceMap:
voiceName = voiceMap[language].get("voiceName") if isinstance(voiceMap[language], dict) else voiceMap[language]
logger.info(f"textToSpeech: using configured voice '{voiceName}' for {language}")
elif hasattr(vs, "ttsVoice") and vs.ttsVoice and hasattr(vs, "ttsLanguage") and vs.ttsLanguage == language:
voiceName = vs.ttsVoice
except Exception as prefErr:
logger.debug(f"textToSpeech: could not load voice preferences: {prefErr}")
ttsResult = await voiceInterface.textToSpeech(
text=cleanText,
languageCode=language,
voiceName=voiceName,
)
if not ttsResult or not ttsResult.get("success"):
errMsg = ttsResult.get("error", "TTS call failed") if ttsResult else "TTS returned None"
return ToolResult(toolCallId="", toolName="textToSpeech", success=False, error=errMsg)
audioContent = ttsResult.get("audioContent", "")
if not audioContent:
return ToolResult(toolCallId="", toolName="textToSpeech", success=False, error="TTS returned no audio")
if isinstance(audioContent, bytes):
audioB64 = _b64.b64encode(audioContent).decode("ascii")
elif isinstance(audioContent, str):
audioB64 = audioContent
else:
audioB64 = str(audioContent)
audioFormat = ttsResult.get("audioFormat", "mp3")
charCount = len(cleanText)
usedVoice = voiceName or "default"
logger.info(f"textToSpeech: generated {audioFormat} audio for {charCount} chars, language={language}, voice={usedVoice}")
return ToolResult(
toolCallId="", toolName="textToSpeech", success=True,
data=f"Audio generated ({charCount} characters, language={language}, voice={usedVoice}). Playing in chat.",
sideEvents=[{
"type": "voiceResponse",
"data": {
"audio": audioB64,
"format": audioFormat,
"language": language,
"charCount": charCount,
},
}],
)
except ImportError:
return ToolResult(toolCallId="", toolName="textToSpeech", success=False,
error="Voice interface not available (missing dependency)")
except Exception as e:
logger.error(f"textToSpeech failed: {e}")
return ToolResult(toolCallId="", toolName="textToSpeech", success=False, error=str(e))
registry.register(
"textToSpeech", _textToSpeech,
description=(
"Convert text to speech audio. The audio is played directly in the chat. "
"Use this when the user asks you to read something aloud, narrate, or speak. "
"Language is auto-detected from the text content. You do NOT need to specify a language."
),
parameters={
"type": "object",
"properties": {
"text": {"type": "string", "description": "The text to convert to speech. Can include markdown (will be stripped automatically)."},
"language": {"type": "string", "description": "BCP-47 language code (e.g. de-DE, en-US) or 'auto' for automatic detection", "default": "auto"},
"voiceName": {"type": "string", "description": "Optional specific voice name. If omitted, uses the configured voice for the detected language."},
},
"required": ["text"],
},
readOnly=False,
)