From b418207c2c4e53814deb7a741c08b5bd17cff068 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Mon, 16 Mar 2026 14:26:42 +0100
Subject: [PATCH] workflow fixes
---
.../providerGoogle/connectorGoogle.py | 8 +-
.../workspace/routeFeatureWorkspace.py | 75 +++++++++++++++----
.../core/serviceStreaming/eventManager.py | 13 ++++
.../services/serviceAgent/agentLoop.py | 1 +
.../services/serviceAgent/mainServiceAgent.py | 21 ++++--
.../services/serviceAgent/toolRegistry.py | 7 ++
6 files changed, 101 insertions(+), 24 deletions(-)
diff --git a/modules/connectors/providerGoogle/connectorGoogle.py b/modules/connectors/providerGoogle/connectorGoogle.py
index 92340883..216b9019 100644
--- a/modules/connectors/providerGoogle/connectorGoogle.py
+++ b/modules/connectors/providerGoogle/connectorGoogle.py
@@ -113,7 +113,13 @@ class DriveAdapter(ServiceAdapter):
async def search(self, query: str, path: Optional[str] = None) -> List[ExternalEntry]:
safeQuery = query.replace("'", "\\'")
- url = f"{_DRIVE_BASE}/files?q=name contains '{safeQuery}' and trashed=false&fields=files(id,name,mimeType,size)&pageSize=25"
+ folderId = (path or "").strip("/")
+ qParts = [f"name contains '{safeQuery}'", "trashed=false"]
+ if folderId:
+ qParts.append(f"'{folderId}' in parents")
+ qStr = " and ".join(qParts)
+ url = f"{_DRIVE_BASE}/files?q={qStr}&fields=files(id,name,mimeType,size)&pageSize=25"
+ logger.debug(f"Google Drive search: q={qStr}")
result = await _googleGet(self._token, url)
if "error" in result:
return []
diff --git a/modules/features/workspace/routeFeatureWorkspace.py b/modules/features/workspace/routeFeatureWorkspace.py
index 87afee2b..0103d4ed 100644
--- a/modules/features/workspace/routeFeatureWorkspace.py
+++ b/modules/features/workspace/routeFeatureWorkspace.py
@@ -112,8 +112,11 @@ def _buildDataSourceContext(chatService, dataSourceIds: List[str]) -> str:
"""Build a description of active data sources for the agent prompt."""
parts = [
"The user has attached the following external data sources to this prompt.",
- "IMPORTANT: Use the dataSourceId (UUID) exactly as shown below when calling browseDataSource or searchDataSource.",
- "Use downloadFromDataSource to download a specific file into local storage.",
+ "IMPORTANT RULES for attached data sources:",
+ "- Use ONLY browseDataSource, searchDataSource, and downloadFromDataSource to access these sources.",
+ "- Use the dataSourceId (UUID) exactly as shown below.",
+ "- Do NOT use listFiles, externalBrowse, or externalSearch for attached data sources -- those tools are for other purposes.",
+ "- browseDataSource returns BOTH files and folders at the given path.",
"",
]
found = False
@@ -139,12 +142,32 @@ def _buildDataSourceContext(chatService, dataSourceIds: List[str]) -> str:
return "\n".join(parts) if found else ""
-def _deriveWorkflowName(prompt: str, maxLen: int = 40) -> str:
- """Derive a short workflow name from the user's first prompt."""
- clean = " ".join(prompt.split())
- if len(clean) <= maxLen:
- return clean
- return clean[:maxLen].rsplit(" ", 1)[0] + "..."
+async def _deriveWorkflowName(prompt: str, aiService) -> str:
+ """Use AI to generate a concise workflow title from the user prompt."""
+ from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum
+ try:
+ cleanPrompt = prompt.split("\n[Active Data Sources]")[0].strip()[:300]
+ req = AiCallRequest(
+ prompt=(
+ "Generate a short title (3-6 words) for a chat conversation that starts with this user message. "
+ "Reply with ONLY the title, nothing else. Same language as the user message.\n\n"
+ f"User message: {cleanPrompt}"
+ ),
+ options=AiCallOptions(
+ operationType=OperationTypeEnum.DATA_EXTRACT,
+ priority=PriorityEnum.SPEED,
+ compressPrompt=False,
+ temperature=0.3,
+ ),
+ )
+ resp = await aiService.callAi(req)
+ title = (resp.content or "").strip().strip('"\'').strip()
+ if title and len(title) <= 60:
+ return title
+ except Exception as e:
+ logger.warning(f"AI naming failed, using fallback: {e}")
+ text = prompt.split("\n[Active Data Sources]")[0].split("\n")[0].strip()[:50]
+ return text or "Chat"
# ---------------------------------------------------------------------------
@@ -170,11 +193,10 @@ async def streamWorkspaceStart(
if not workflow:
raise HTTPException(status_code=404, detail=f"Workflow {userInput.workflowId} not found")
else:
- autoName = _deriveWorkflowName(userInput.prompt)
workflow = chatInterface.createWorkflow({
"featureInstanceId": instanceId,
"status": "active",
- "name": autoName,
+ "name": "",
"workflowMode": "Dynamic",
})
@@ -267,6 +289,25 @@ async def _runWorkspaceAgent(
)
agentService = getService("agent", ctx)
chatService = getService("chat", ctx)
+ aiService = getService("ai", ctx)
+
+ wfRecord = chatInterface.getWorkflow(workflowId) if workflowId else None
+ wfName = ""
+ if wfRecord:
+ wfName = wfRecord.get("name", "") if isinstance(wfRecord, dict) else getattr(wfRecord, "name", "")
+ if not wfName.strip() or wfName.startswith("Neuer Chat"):
+ async def _nameInBackground():
+ try:
+ autoName = await _deriveWorkflowName(prompt, aiService)
+ chatInterface.updateWorkflow(workflowId, {"name": autoName})
+ await eventManager.emit_event(queueId, "workflowUpdated", {
+ "type": "workflowUpdated",
+ "workflowId": workflowId,
+ "name": autoName,
+ })
+ except Exception as nameErr:
+ logger.warning(f"AI workflow naming failed: {nameErr}")
+ asyncio.ensure_future(_nameInBackground())
enrichedPrompt = prompt
if dataSourceIds:
@@ -300,12 +341,16 @@ async def _runWorkspaceAgent(
if event.type in (AgentEventTypeEnum.FINAL, AgentEventTypeEnum.ERROR):
if event.content:
- chatInterface.createMessage({
- "workflowId": workflowId,
- "role": "assistant",
- "message": event.content,
- })
+ try:
+ chatInterface.createMessage({
+ "workflowId": workflowId,
+ "role": "assistant",
+ "message": event.content,
+ })
+ except Exception as msgErr:
+ logger.error(f"Failed to persist assistant message: {msgErr}")
+ logger.info(f"Agent loop completed for workflow {workflowId}, sending 'complete' event")
await eventManager.emit_event(queueId, "complete", {
"type": "complete",
"workflowId": workflowId,
diff --git a/modules/serviceCenter/core/serviceStreaming/eventManager.py b/modules/serviceCenter/core/serviceStreaming/eventManager.py
index 2c5c3979..21f7cf36 100644
--- a/modules/serviceCenter/core/serviceStreaming/eventManager.py
+++ b/modules/serviceCenter/core/serviceStreaming/eventManager.py
@@ -33,9 +33,22 @@ class EventManager:
Returns:
Async queue for events
"""
+ if workflow_id in self._cleanup_tasks:
+ self._cleanup_tasks[workflow_id].cancel()
+ del self._cleanup_tasks[workflow_id]
+ logger.debug(f"Cancelled pending cleanup for workflow {workflow_id}")
+
if workflow_id not in self._queues:
self._queues[workflow_id] = asyncio.Queue()
logger.debug(f"Created event queue for workflow {workflow_id}")
+ else:
+ old = self._queues[workflow_id]
+ while not old.empty():
+ try:
+ old.get_nowait()
+ except asyncio.QueueEmpty:
+ break
+ logger.debug(f"Reusing event queue for workflow {workflow_id} (drained stale events)")
return self._queues[workflow_id]
def get_queue(self, workflow_id: str) -> Optional[asyncio.Queue]:
diff --git a/modules/serviceCenter/services/serviceAgent/agentLoop.py b/modules/serviceCenter/services/serviceAgent/agentLoop.py
index 4bd66b6d..c6f426d7 100644
--- a/modules/serviceCenter/services/serviceAgent/agentLoop.py
+++ b/modules/serviceCenter/services/serviceAgent/agentLoop.py
@@ -107,6 +107,7 @@ async def runAgentLoop(
)
break
+ logger.info(f"Agent round {state.currentRound}/{state.maxRounds} for workflow {workflowId} (tools={state.totalToolCalls}, cost={state.totalCostCHF:.4f})")
yield AgentEvent(
type=AgentEventTypeEnum.AGENT_PROGRESS,
data={
diff --git a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py
index d20ee9ff..ed93c3b7 100644
--- a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py
+++ b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py
@@ -142,7 +142,9 @@ class AgentService:
):
if event.type == AgentEventTypeEnum.AGENT_SUMMARY:
await self._persistTrace(workflowId, event.data or {})
+ logger.debug(f"runAgent yielding event type={event.type}")
yield event
+ logger.info(f"runAgent loop completed for workflow {workflowId}")
async def _enrichPromptWithFiles(self, prompt: str, fileIds: List[str] = None) -> str:
"""Resolve file metadata + FileContentIndex for attached fileIds and prepend to prompt.
@@ -586,7 +588,7 @@ def _registerCoreTools(registry: ToolRegistry, services):
registry.register(
"listFiles", _listFiles,
- description="List files with optional filters (folder, tags, search text).",
+ description="List LOCAL workspace files (uploaded/generated). NOT for external data sources -- use browseDataSource instead.",
parameters={
"type": "object",
"properties": {
@@ -600,7 +602,7 @@ def _registerCoreTools(registry: ToolRegistry, services):
registry.register(
"searchFiles", _searchFiles,
- description="Search files by name, description, or tags.",
+ description="Search LOCAL workspace files by name, description, or tags. NOT for external data sources -- use searchDataSource instead.",
parameters={
"type": "object",
"properties": {
@@ -614,7 +616,7 @@ def _registerCoreTools(registry: ToolRegistry, services):
registry.register(
"listFolders", _listFolders,
- description="List file folders. Use parentId to browse folder hierarchy.",
+ description="List LOCAL workspace folders. NOT for external data sources -- use browseDataSource instead.",
parameters={
"type": "object",
"properties": {
@@ -859,7 +861,7 @@ def _registerCoreTools(registry: ToolRegistry, services):
registry.register(
"externalBrowse", _externalBrowse,
- description="Browse files and folders in an external data source (SharePoint, Drive, FTP).",
+ description="Browse files in an external source by connectionId+service. For ATTACHED data sources, prefer browseDataSource instead.",
parameters={
"type": "object",
"properties": {
@@ -903,7 +905,7 @@ def _registerCoreTools(registry: ToolRegistry, services):
registry.register(
"externalSearch", _externalSearch,
- description="Search for files in an external data source.",
+ description="Search files in an external source by connectionId+service. For ATTACHED data sources, prefer searchDataSource instead.",
parameters={
"type": "object",
"properties": {
@@ -964,7 +966,10 @@ def _registerCoreTools(registry: ToolRegistry, services):
return ToolResult(toolCallId="", toolName="browseDataSource", success=False, error="dataSourceId is required")
try:
connectionId, service, basePath = await _resolveDataSource(dsId)
- browsePath = f"{basePath.rstrip('/')}/{subPath.lstrip('/')}" if subPath else basePath
+ if subPath:
+ browsePath = subPath
+ else:
+ browsePath = basePath
from modules.connectors.connectorResolver import ConnectorResolver
resolver = ConnectorResolver(
services.getService("security"),
@@ -1035,7 +1040,7 @@ def _registerCoreTools(registry: ToolRegistry, services):
registry.register(
"browseDataSource", _browseDataSource,
- description="Browse files and folders in an attached data source by its dataSourceId. Returns file/folder listing.",
+ description="Browse files AND folders in an ATTACHED data source by its dataSourceId. This is the PRIMARY tool for listing data source contents.",
parameters={
"type": "object",
"properties": {
@@ -1610,7 +1615,7 @@ def _registerCoreTools(registry: ToolRegistry, services):
sideEvents = []
chatService = services.chat
- sanitizedTitle = _re.sub(r'[^a-zA-Z0-9._-]', '_', title).strip('_') or "document"
+ sanitizedTitle = _re.sub(r'[^\w._-]', '_', title, flags=_re.UNICODE).strip('_') or "document"
for doc in documents:
docData = doc.documentData if hasattr(doc, "documentData") else b""
diff --git a/modules/serviceCenter/services/serviceAgent/toolRegistry.py b/modules/serviceCenter/services/serviceAgent/toolRegistry.py
index 65335d00..625001fb 100644
--- a/modules/serviceCenter/services/serviceAgent/toolRegistry.py
+++ b/modules/serviceCenter/services/serviceAgent/toolRegistry.py
@@ -86,6 +86,8 @@ class ToolRegistry:
)
handler = self._handlers[toolCall.name]
+ argsSummary = ", ".join(f"{k}={str(v)[:80]}" for k, v in (toolCall.args or {}).items())
+ logger.info(f"Tool dispatch: {toolCall.name}({argsSummary})")
try:
result = await handler(toolCall.args, context or {})
durationMs = int((time.time() - startTime) * 1000)
@@ -93,6 +95,11 @@ class ToolRegistry:
if isinstance(result, ToolResult):
result.toolCallId = toolCall.id
result.durationMs = durationMs
+ dataSummary = (result.data[:200] + "...") if result.data and len(result.data) > 200 else (result.data or "")
+ if result.success:
+ logger.info(f"Tool result: {toolCall.name} OK ({durationMs}ms) → {dataSummary}")
+ else:
+ logger.warning(f"Tool result: {toolCall.name} FAILED ({durationMs}ms) → {result.error}")
return result
return ToolResult(