diff --git a/modules/connectors/providerGoogle/connectorGoogle.py b/modules/connectors/providerGoogle/connectorGoogle.py index 92340883..216b9019 100644 --- a/modules/connectors/providerGoogle/connectorGoogle.py +++ b/modules/connectors/providerGoogle/connectorGoogle.py @@ -113,7 +113,13 @@ class DriveAdapter(ServiceAdapter): async def search(self, query: str, path: Optional[str] = None) -> List[ExternalEntry]: safeQuery = query.replace("'", "\\'") - url = f"{_DRIVE_BASE}/files?q=name contains '{safeQuery}' and trashed=false&fields=files(id,name,mimeType,size)&pageSize=25" + folderId = (path or "").strip("/") + qParts = [f"name contains '{safeQuery}'", "trashed=false"] + if folderId: + qParts.append(f"'{folderId}' in parents") + qStr = " and ".join(qParts) + url = f"{_DRIVE_BASE}/files?q={qStr}&fields=files(id,name,mimeType,size)&pageSize=25" + logger.debug(f"Google Drive search: q={qStr}") result = await _googleGet(self._token, url) if "error" in result: return [] diff --git a/modules/features/workspace/routeFeatureWorkspace.py b/modules/features/workspace/routeFeatureWorkspace.py index 87afee2b..0103d4ed 100644 --- a/modules/features/workspace/routeFeatureWorkspace.py +++ b/modules/features/workspace/routeFeatureWorkspace.py @@ -112,8 +112,11 @@ def _buildDataSourceContext(chatService, dataSourceIds: List[str]) -> str: """Build a description of active data sources for the agent prompt.""" parts = [ "The user has attached the following external data sources to this prompt.", - "IMPORTANT: Use the dataSourceId (UUID) exactly as shown below when calling browseDataSource or searchDataSource.", - "Use downloadFromDataSource to download a specific file into local storage.", + "IMPORTANT RULES for attached data sources:", + "- Use ONLY browseDataSource, searchDataSource, and downloadFromDataSource to access these sources.", + "- Use the dataSourceId (UUID) exactly as shown below.", + "- Do NOT use listFiles, externalBrowse, or externalSearch for attached data sources -- those tools are for other purposes.", + "- browseDataSource returns BOTH files and folders at the given path.", "", ] found = False @@ -139,12 +142,32 @@ def _buildDataSourceContext(chatService, dataSourceIds: List[str]) -> str: return "\n".join(parts) if found else "" -def _deriveWorkflowName(prompt: str, maxLen: int = 40) -> str: - """Derive a short workflow name from the user's first prompt.""" - clean = " ".join(prompt.split()) - if len(clean) <= maxLen: - return clean - return clean[:maxLen].rsplit(" ", 1)[0] + "..." +async def _deriveWorkflowName(prompt: str, aiService) -> str: + """Use AI to generate a concise workflow title from the user prompt.""" + from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum + try: + cleanPrompt = prompt.split("\n[Active Data Sources]")[0].strip()[:300] + req = AiCallRequest( + prompt=( + "Generate a short title (3-6 words) for a chat conversation that starts with this user message. " + "Reply with ONLY the title, nothing else. Same language as the user message.\n\n" + f"User message: {cleanPrompt}" + ), + options=AiCallOptions( + operationType=OperationTypeEnum.DATA_EXTRACT, + priority=PriorityEnum.SPEED, + compressPrompt=False, + temperature=0.3, + ), + ) + resp = await aiService.callAi(req) + title = (resp.content or "").strip().strip('"\'').strip() + if title and len(title) <= 60: + return title + except Exception as e: + logger.warning(f"AI naming failed, using fallback: {e}") + text = prompt.split("\n[Active Data Sources]")[0].split("\n")[0].strip()[:50] + return text or "Chat" # --------------------------------------------------------------------------- @@ -170,11 +193,10 @@ async def streamWorkspaceStart( if not workflow: raise HTTPException(status_code=404, detail=f"Workflow {userInput.workflowId} not found") else: - autoName = _deriveWorkflowName(userInput.prompt) workflow = chatInterface.createWorkflow({ "featureInstanceId": instanceId, "status": "active", - "name": autoName, + "name": "", "workflowMode": "Dynamic", }) @@ -267,6 +289,25 @@ async def _runWorkspaceAgent( ) agentService = getService("agent", ctx) chatService = getService("chat", ctx) + aiService = getService("ai", ctx) + + wfRecord = chatInterface.getWorkflow(workflowId) if workflowId else None + wfName = "" + if wfRecord: + wfName = wfRecord.get("name", "") if isinstance(wfRecord, dict) else getattr(wfRecord, "name", "") + if not wfName.strip() or wfName.startswith("Neuer Chat"): + async def _nameInBackground(): + try: + autoName = await _deriveWorkflowName(prompt, aiService) + chatInterface.updateWorkflow(workflowId, {"name": autoName}) + await eventManager.emit_event(queueId, "workflowUpdated", { + "type": "workflowUpdated", + "workflowId": workflowId, + "name": autoName, + }) + except Exception as nameErr: + logger.warning(f"AI workflow naming failed: {nameErr}") + asyncio.ensure_future(_nameInBackground()) enrichedPrompt = prompt if dataSourceIds: @@ -300,12 +341,16 @@ async def _runWorkspaceAgent( if event.type in (AgentEventTypeEnum.FINAL, AgentEventTypeEnum.ERROR): if event.content: - chatInterface.createMessage({ - "workflowId": workflowId, - "role": "assistant", - "message": event.content, - }) + try: + chatInterface.createMessage({ + "workflowId": workflowId, + "role": "assistant", + "message": event.content, + }) + except Exception as msgErr: + logger.error(f"Failed to persist assistant message: {msgErr}") + logger.info(f"Agent loop completed for workflow {workflowId}, sending 'complete' event") await eventManager.emit_event(queueId, "complete", { "type": "complete", "workflowId": workflowId, diff --git a/modules/serviceCenter/core/serviceStreaming/eventManager.py b/modules/serviceCenter/core/serviceStreaming/eventManager.py index 2c5c3979..21f7cf36 100644 --- a/modules/serviceCenter/core/serviceStreaming/eventManager.py +++ b/modules/serviceCenter/core/serviceStreaming/eventManager.py @@ -33,9 +33,22 @@ class EventManager: Returns: Async queue for events """ + if workflow_id in self._cleanup_tasks: + self._cleanup_tasks[workflow_id].cancel() + del self._cleanup_tasks[workflow_id] + logger.debug(f"Cancelled pending cleanup for workflow {workflow_id}") + if workflow_id not in self._queues: self._queues[workflow_id] = asyncio.Queue() logger.debug(f"Created event queue for workflow {workflow_id}") + else: + old = self._queues[workflow_id] + while not old.empty(): + try: + old.get_nowait() + except asyncio.QueueEmpty: + break + logger.debug(f"Reusing event queue for workflow {workflow_id} (drained stale events)") return self._queues[workflow_id] def get_queue(self, workflow_id: str) -> Optional[asyncio.Queue]: diff --git a/modules/serviceCenter/services/serviceAgent/agentLoop.py b/modules/serviceCenter/services/serviceAgent/agentLoop.py index 4bd66b6d..c6f426d7 100644 --- a/modules/serviceCenter/services/serviceAgent/agentLoop.py +++ b/modules/serviceCenter/services/serviceAgent/agentLoop.py @@ -107,6 +107,7 @@ async def runAgentLoop( ) break + logger.info(f"Agent round {state.currentRound}/{state.maxRounds} for workflow {workflowId} (tools={state.totalToolCalls}, cost={state.totalCostCHF:.4f})") yield AgentEvent( type=AgentEventTypeEnum.AGENT_PROGRESS, data={ diff --git a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py index d20ee9ff..ed93c3b7 100644 --- a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py +++ b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py @@ -142,7 +142,9 @@ class AgentService: ): if event.type == AgentEventTypeEnum.AGENT_SUMMARY: await self._persistTrace(workflowId, event.data or {}) + logger.debug(f"runAgent yielding event type={event.type}") yield event + logger.info(f"runAgent loop completed for workflow {workflowId}") async def _enrichPromptWithFiles(self, prompt: str, fileIds: List[str] = None) -> str: """Resolve file metadata + FileContentIndex for attached fileIds and prepend to prompt. @@ -586,7 +588,7 @@ def _registerCoreTools(registry: ToolRegistry, services): registry.register( "listFiles", _listFiles, - description="List files with optional filters (folder, tags, search text).", + description="List LOCAL workspace files (uploaded/generated). NOT for external data sources -- use browseDataSource instead.", parameters={ "type": "object", "properties": { @@ -600,7 +602,7 @@ def _registerCoreTools(registry: ToolRegistry, services): registry.register( "searchFiles", _searchFiles, - description="Search files by name, description, or tags.", + description="Search LOCAL workspace files by name, description, or tags. NOT for external data sources -- use searchDataSource instead.", parameters={ "type": "object", "properties": { @@ -614,7 +616,7 @@ def _registerCoreTools(registry: ToolRegistry, services): registry.register( "listFolders", _listFolders, - description="List file folders. Use parentId to browse folder hierarchy.", + description="List LOCAL workspace folders. NOT for external data sources -- use browseDataSource instead.", parameters={ "type": "object", "properties": { @@ -859,7 +861,7 @@ def _registerCoreTools(registry: ToolRegistry, services): registry.register( "externalBrowse", _externalBrowse, - description="Browse files and folders in an external data source (SharePoint, Drive, FTP).", + description="Browse files in an external source by connectionId+service. For ATTACHED data sources, prefer browseDataSource instead.", parameters={ "type": "object", "properties": { @@ -903,7 +905,7 @@ def _registerCoreTools(registry: ToolRegistry, services): registry.register( "externalSearch", _externalSearch, - description="Search for files in an external data source.", + description="Search files in an external source by connectionId+service. For ATTACHED data sources, prefer searchDataSource instead.", parameters={ "type": "object", "properties": { @@ -964,7 +966,10 @@ def _registerCoreTools(registry: ToolRegistry, services): return ToolResult(toolCallId="", toolName="browseDataSource", success=False, error="dataSourceId is required") try: connectionId, service, basePath = await _resolveDataSource(dsId) - browsePath = f"{basePath.rstrip('/')}/{subPath.lstrip('/')}" if subPath else basePath + if subPath: + browsePath = subPath + else: + browsePath = basePath from modules.connectors.connectorResolver import ConnectorResolver resolver = ConnectorResolver( services.getService("security"), @@ -1035,7 +1040,7 @@ def _registerCoreTools(registry: ToolRegistry, services): registry.register( "browseDataSource", _browseDataSource, - description="Browse files and folders in an attached data source by its dataSourceId. Returns file/folder listing.", + description="Browse files AND folders in an ATTACHED data source by its dataSourceId. This is the PRIMARY tool for listing data source contents.", parameters={ "type": "object", "properties": { @@ -1610,7 +1615,7 @@ def _registerCoreTools(registry: ToolRegistry, services): sideEvents = [] chatService = services.chat - sanitizedTitle = _re.sub(r'[^a-zA-Z0-9._-]', '_', title).strip('_') or "document" + sanitizedTitle = _re.sub(r'[^\w._-]', '_', title, flags=_re.UNICODE).strip('_') or "document" for doc in documents: docData = doc.documentData if hasattr(doc, "documentData") else b"" diff --git a/modules/serviceCenter/services/serviceAgent/toolRegistry.py b/modules/serviceCenter/services/serviceAgent/toolRegistry.py index 65335d00..625001fb 100644 --- a/modules/serviceCenter/services/serviceAgent/toolRegistry.py +++ b/modules/serviceCenter/services/serviceAgent/toolRegistry.py @@ -86,6 +86,8 @@ class ToolRegistry: ) handler = self._handlers[toolCall.name] + argsSummary = ", ".join(f"{k}={str(v)[:80]}" for k, v in (toolCall.args or {}).items()) + logger.info(f"Tool dispatch: {toolCall.name}({argsSummary})") try: result = await handler(toolCall.args, context or {}) durationMs = int((time.time() - startTime) * 1000) @@ -93,6 +95,11 @@ class ToolRegistry: if isinstance(result, ToolResult): result.toolCallId = toolCall.id result.durationMs = durationMs + dataSummary = (result.data[:200] + "...") if result.data and len(result.data) > 200 else (result.data or "") + if result.success: + logger.info(f"Tool result: {toolCall.name} OK ({durationMs}ms) → {dataSummary}") + else: + logger.warning(f"Tool result: {toolCall.name} FAILED ({durationMs}ms) → {result.error}") return result return ToolResult(