# Copyright (c) 2025 Patrick Motsch # All rights reserved. """Unified AI Workspace routes. SSE-based endpoints for the agent-driven AI Workspace. """ import logging import json import asyncio import uuid from typing import Any, Dict, Optional, List from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Request, UploadFile, File from fastapi.responses import StreamingResponse, JSONResponse from pydantic import BaseModel, Field from modules.auth import limiter, getRequestContext, RequestContext from modules.serviceCenter.services.serviceBilling.mainServiceBilling import ( InsufficientBalanceException, ) from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import ( SubscriptionInactiveException, ) from modules.interfaces import interfaceDbChat, interfaceDbManagement from modules.features.workspace import interfaceFeatureWorkspace from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface from modules.interfaces.interfaceAiObjects import AiObjects from modules.serviceCenter.core.serviceStreaming import get_event_manager from modules.serviceCenter.services.serviceAgent.datamodelAgent import AgentEventTypeEnum, PendingFileEdit from modules.shared.timeUtils import parseTimestamp logger = logging.getLogger(__name__) router = APIRouter( prefix="/api/workspace", tags=["Unified Workspace"], responses={404: {"description": "Not found"}}, ) _aiObjects: Optional[AiObjects] = None class _InstanceEdits: """Pending file edits for a single workspace instance.""" def __init__(self): self._edits: Dict[str, PendingFileEdit] = {} def add(self, edit: PendingFileEdit) -> None: self._edits[edit.id] = edit def get(self, editId: str) -> Optional[PendingFileEdit]: return self._edits.get(editId) def getPending(self) -> List[PendingFileEdit]: return [e for e in self._edits.values() if e.status == "pending"] def items(self): return self._edits.items() class _PendingEditsStore: """Global store for pending file edits across all workspace instances.""" def __init__(self): self._instances: Dict[str, _InstanceEdits] = {} def forInstance(self, instanceId: str) -> _InstanceEdits: """Get-or-create the edit collection for a workspace instance.""" if instanceId not in self._instances: self._instances[instanceId] = _InstanceEdits() return self._instances[instanceId] _pendingEditsStore = _PendingEditsStore() def _workspaceBillingFeatureCode(user, mandateId: Optional[str], instanceId: str) -> Optional[str]: """Resolve FeatureInstance.featureCode for billing/UI when workflow is not on ServiceCenterContext.""" if not instanceId or not str(instanceId).strip(): return None try: from modules.interfaces.interfaceDbApp import getInterface as getAppInterface appIf = getAppInterface(user, mandateId=mandateId or None) inst = appIf.getFeatureInstance(str(instanceId).strip()) if not inst: return None if isinstance(inst, dict): code = inst.get("featureCode") else: code = getattr(inst, "featureCode", None) return str(code).strip() if code else None except Exception as e: logger.debug("Workspace: feature code lookup failed for instance %s: %s", instanceId, e) return None class WorkspaceInputRequest(BaseModel): """Prompt input for the unified workspace.""" prompt: str = Field(description="User prompt text") fileIds: List[str] = Field(default_factory=list, description="Referenced file IDs") uploadedFiles: List[str] = Field(default_factory=list, description="Newly uploaded file IDs") dataSourceIds: List[str] = Field(default_factory=list, description="Active DataSource IDs") featureDataSourceIds: List[str] = Field(default_factory=list, description="Attached FeatureDataSource IDs") voiceMode: bool = Field(default=False, description="Enable voice response") workflowId: Optional[str] = Field(default=None, description="Continue existing workflow") userLanguage: str = Field(default="en", description="User language code") allowedProviders: List[str] = Field(default_factory=list, description="Restrict AI to these providers") requireNeutralization: Optional[bool] = Field(default=None, description="Per-request neutralization override") async def _getAiObjects() -> AiObjects: global _aiObjects if _aiObjects is None: _aiObjects = await AiObjects.create() return _aiObjects def _validateInstanceAccess(instanceId: str, context: RequestContext): """Validate access and return (mandateId, instanceConfig) tuple.""" from modules.interfaces.interfaceDbApp import getRootInterface rootInterface = getRootInterface() instance = rootInterface.getFeatureInstance(instanceId) if not instance: raise HTTPException(status_code=404, detail=f"Feature instance {instanceId} not found") featureAccess = rootInterface.getFeatureAccess(str(context.user.id), instanceId) if not featureAccess or not featureAccess.enabled: raise HTTPException(status_code=403, detail="Access denied to this feature instance") mandateId = str(instance.mandateId) if instance.mandateId else None instanceConfig = instance.config if hasattr(instance, "config") and instance.config else {} return mandateId, instanceConfig def _getChatInterface(context: RequestContext, featureInstanceId: str = None): return interfaceDbChat.getInterface( context.user, mandateId=str(context.mandateId) if context.mandateId else None, featureInstanceId=featureInstanceId, ) def _buildResolverDbInterface(chatService): """Build a DB adapter that ConnectorResolver can use to load UserConnections. ConnectorResolver calls db.getUserConnection(connectionId). interfaceDbApp provides getUserConnectionById(connectionId). This adapter bridges the method name difference. """ class _ResolverDbAdapter: def __init__(self, appInterface): self._app = appInterface def getUserConnection(self, connectionId: str): if hasattr(self._app, "getUserConnectionById"): return self._app.getUserConnectionById(connectionId) return None appIf = getattr(chatService, "interfaceDbApp", None) if appIf: return _ResolverDbAdapter(appIf) return getattr(chatService, "interfaceDbComponent", None) def _getDbManagement(context: RequestContext, featureInstanceId: str = None): return interfaceDbManagement.getInterface( context.user, mandateId=str(context.mandateId) if context.mandateId else None, featureInstanceId=featureInstanceId, ) def _getWorkspaceInterface(context: RequestContext, featureInstanceId: str = None): return interfaceFeatureWorkspace.getInterface( context.user, mandateId=str(context.mandateId) if context.mandateId else None, featureInstanceId=featureInstanceId, ) _SOURCE_TYPE_TO_SERVICE = { "sharepointFolder": "sharepoint", "onedriveFolder": "onedrive", "outlookFolder": "outlook", "googleDriveFolder": "drive", "gmailFolder": "gmail", "ftpFolder": "files", "clickupList": "clickup", } def _buildDataSourceContext(chatService, dataSourceIds: List[str]) -> str: """Build a description of active data sources for the agent prompt.""" parts = [ "The user has attached the following external data sources to this prompt.", "IMPORTANT RULES for attached data sources:", "- Use ONLY browseDataSource, searchDataSource, and downloadFromDataSource to access these sources.", "- Use the dataSourceId (UUID) exactly as shown below.", "- Do NOT use listFiles, externalBrowse, or externalSearch for attached data sources -- those tools are for other purposes.", "- browseDataSource returns BOTH files and folders at the given path.", "- When downloading files, ALWAYS provide the human-readable fileName (with extension) from the browse results.", "", ] found = False for dsId in dataSourceIds: try: ds = chatService.getDataSource(dsId) if hasattr(chatService, "getDataSource") else None if ds: found = True label = ds.get("label", "") sourceType = ds.get("sourceType", "") connectionId = ds.get("connectionId", "") path = ds.get("path", "/") service = _SOURCE_TYPE_TO_SERVICE.get(sourceType, sourceType) logger.info(f"DataSource context: id={dsId}, label={label}, sourceType={sourceType}, service={service}, connectionId={connectionId}, path={path[:80]}") parts.append( f"- dataSourceId: {dsId}\n" f" label: \"{label}\"\n" f" type: {sourceType} (service: {service})\n" f" connectionId: {connectionId}\n" f" path: {path}" ) else: logger.warning(f"DataSource {dsId} not found in DB") except Exception as e: logger.warning(f"Error loading DataSource {dsId}: {e}") return "\n".join(parts) if found else "" def _buildFeatureDataSourceContext(featureDataSourceIds: List[str]) -> str: """Build a description of attached feature data sources for the agent prompt.""" from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource from modules.security.rbacCatalog import getCatalogService from modules.interfaces.interfaceDbApp import getRootInterface parts = [ "The user has attached data from the following feature instances.", "Use queryFeatureInstance(featureInstanceId, question) to query this data.", "", ] found = False catalog = getCatalogService() rootIf = getRootInterface() instanceCache: Dict[str, Any] = {} for fdsId in featureDataSourceIds: try: records = rootIf.db.getRecordset(FeatureDataSource, recordFilter={"id": fdsId}) if not records: logger.warning(f"FeatureDataSource {fdsId} not found") continue fds = records[0] found = True fiId = fds.get("featureInstanceId", "") featureCode = fds.get("featureCode", "") tableName = fds.get("tableName", "") label = fds.get("label", tableName) if fiId not in instanceCache: inst = rootIf.getFeatureInstance(fiId) instanceCache[fiId] = inst inst = instanceCache.get(fiId) instanceLabel = getattr(inst, "label", fiId) if inst else fiId dataObj = catalog.getDataObjects(featureCode) tableFields = [] for obj in dataObj: if obj.get("meta", {}).get("table") == tableName: tableFields = obj.get("meta", {}).get("fields", []) break recordFilter = fds.get("recordFilter") filterLine = "" if recordFilter and isinstance(recordFilter, dict): filterParts = [f"{k} = {v}" for k, v in recordFilter.items()] filterLine = f"\n recordFilter: {', '.join(filterParts)} (data is scoped to this record)" parts.append( f"- featureInstanceId: {fiId}\n" f" feature: {featureCode}\n" f" instance: \"{instanceLabel}\"\n" f" table: {tableName} ({label})\n" f" fields: {', '.join(tableFields) if tableFields else 'all'}" f"{filterLine}" ) except Exception as e: logger.warning(f"Error loading FeatureDataSource {fdsId}: {e}") return "\n".join(parts) if found else "" def _workspaceFilesToChatDocuments(dbMgmt, fileIds: List[str]) -> List[Dict[str, Any]]: """Build ChatDocument payloads for workspace files referenced on the user message.""" documents: List[Dict[str, Any]] = [] for fid in fileIds or []: try: fr = dbMgmt.getFile(fid) if not fr: logger.warning(f"Workspace user message: file {fid} not found, skipping attachment record") continue fd = fr if isinstance(fr, dict) else fr.model_dump() documents.append({ "id": str(uuid.uuid4()), "fileId": fd.get("id") or fid, "fileName": fd.get("fileName") or "file", "fileSize": int(fd.get("fileSize") or 0), "mimeType": fd.get("mimeType") or "application/octet-stream", "roundNumber": 0, "taskNumber": 0, "actionNumber": 0, }) except Exception as e: logger.warning(f"Workspace user message: could not load file {fid}: {e}") return documents def _buildWorkspaceAttachmentLabel(chatService: Any, dataSourceIds: List[str], featureDataSourceIds: List[str]) -> str: """Short human-readable line for non-file attachments (data sources) on the user message.""" parts: List[str] = [] dsLabels: List[str] = [] for dsId in dataSourceIds or []: try: ds = chatService.getDataSource(dsId) if chatService and hasattr(chatService, "getDataSource") else None if ds: label = ds.get("label") or ds.get("path") or dsId[:8] dsLabels.append(str(label)) except Exception as e: logger.debug(f"Label for data source {dsId}: {e}") if dsLabels: parts.append("Datenquellen: " + ", ".join(dsLabels)) fdsLabels: List[str] = [] for fdsId in featureDataSourceIds or []: try: from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource from modules.interfaces.interfaceDbApp import getRootInterface rootIf = getRootInterface() records = rootIf.db.getRecordset(FeatureDataSource, recordFilter={"id": fdsId}) if records: fds = records[0] tbl = fds.get("tableName") or "" lbl = fds.get("label") or tbl fdsLabels.append(f"{tbl} ({lbl})".strip() if tbl else str(lbl)) except Exception as e: logger.debug(f"Label for feature data source {fdsId}: {e}") if fdsLabels: parts.append("Feature-Daten: " + ", ".join(fdsLabels)) return " | ".join(parts) def _workspaceMessageToClientDict(msg: Any) -> Dict[str, Any]: """Serialize ChatMessage (or dict) for workspace GET /messages including documents.""" if isinstance(msg, dict): raw = dict(msg) elif hasattr(msg, "model_dump"): raw = msg.model_dump() elif hasattr(msg, "dict"): raw = msg.dict() else: raw = { "id": getattr(msg, "id", None), "workflowId": getattr(msg, "workflowId", None), "role": getattr(msg, "role", ""), "message": getattr(msg, "message", None) or getattr(msg, "content", None), "publishedAt": getattr(msg, "publishedAt", None), "sequenceNr": getattr(msg, "sequenceNr", None), "documentsLabel": getattr(msg, "documentsLabel", None), "documents": getattr(msg, "documents", None) or [], } if raw.get("message") is not None and raw.get("content") is None: raw["content"] = raw["message"] docs = raw.get("documents") or [] serialized_docs: List[Dict[str, Any]] = [] for doc in docs: if isinstance(doc, dict): serialized_docs.append(doc) elif hasattr(doc, "model_dump"): serialized_docs.append(doc.model_dump()) elif hasattr(doc, "dict"): serialized_docs.append(doc.dict()) else: serialized_docs.append({ "id": getattr(doc, "id", ""), "messageId": getattr(doc, "messageId", ""), "fileId": getattr(doc, "fileId", ""), "fileName": getattr(doc, "fileName", ""), "fileSize": getattr(doc, "fileSize", 0), "mimeType": getattr(doc, "mimeType", ""), "roundNumber": getattr(doc, "roundNumber", None), "taskNumber": getattr(doc, "taskNumber", None), "actionNumber": getattr(doc, "actionNumber", None), "actionId": getattr(doc, "actionId", None), }) raw["documents"] = serialized_docs return raw def _loadConversationHistory(chatInterface, workflowId: str, currentPrompt: str) -> List[Dict[str, str]]: """Load prior messages from DB for follow-up context, excluding the current prompt. File documents attached to user messages are serialized as a short ``[Attached files: …]`` block appended to the message content so the agent sees which files a previous prompt referred to. """ try: rawMessages = chatInterface.getMessages(workflowId) or [] except Exception as e: logger.warning(f"Failed to load conversation history: {e}") return [] history = [] for msg in rawMessages: if isinstance(msg, dict): role = msg.get("role", "") content = msg.get("message", "") or msg.get("content", "") docs = msg.get("documents") or [] docsLabel = msg.get("documentsLabel") or "" else: role = getattr(msg, "role", "") content = getattr(msg, "message", "") or getattr(msg, "content", "") docs = getattr(msg, "documents", None) or [] docsLabel = getattr(msg, "documentsLabel", "") or "" if role not in ("user", "assistant"): continue if not content and not docs: continue enriched = content or "" if role == "user" and docs: fileParts = [] for doc in docs: if isinstance(doc, dict): fName = doc.get("fileName", "") fId = doc.get("fileId", "") fMime = doc.get("mimeType", "") fSize = doc.get("fileSize", 0) elif hasattr(doc, "fileName"): fName = getattr(doc, "fileName", "") fId = getattr(doc, "fileId", "") fMime = getattr(doc, "mimeType", "") fSize = getattr(doc, "fileSize", 0) else: continue if fId or fName: fileParts.append(f" - {fName} (id: {fId}, type: {fMime}, size: {fSize} bytes)") if fileParts: enriched += "\n\n[Attached files]\n" + "\n".join(fileParts) if role == "user" and docsLabel: enriched += f"\n[Attachments: {docsLabel}]" if enriched.strip(): history.append({"role": role, "content": enriched}) if not history: return [] # Drop the last user message if it matches the current prompt (already added by the agent loop) lastContent = history[-1].get("content", "").strip() currentStripped = currentPrompt.strip() if history[-1]["role"] == "user" and ( lastContent == currentStripped or lastContent.startswith(currentStripped) ): history = history[:-1] if history: logger.info(f"Loaded {len(history)} prior messages for workflow {workflowId}") return history def _collectPriorFileIds(chatInterface, workflowId: str) -> List[str]: """Collect fileIds from all prior user messages in the workflow. Returns a deduplicated list of file IDs so follow-up prompts can reference files that were attached to earlier messages. """ try: rawMessages = chatInterface.getMessages(workflowId) or [] except Exception: return [] seen: set = set() result: List[str] = [] for msg in rawMessages: if isinstance(msg, dict): role = msg.get("role", "") docs = msg.get("documents") or [] else: role = getattr(msg, "role", "") docs = getattr(msg, "documents", None) or [] if role != "user": continue for doc in docs: fid = doc.get("fileId", "") if isinstance(doc, dict) else getattr(doc, "fileId", "") if fid and fid not in seen: seen.add(fid) result.append(fid) return result async def _deriveWorkflowName(prompt: str, aiService) -> str: """Use AI to generate a concise workflow title from the user prompt.""" from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum try: cleanPrompt = prompt.split("\n[Active Data Sources]")[0].strip()[:300] req = AiCallRequest( prompt=( "Generate a short title (3-6 words) for a chat conversation that starts with this user message. " "Reply with ONLY the title, nothing else. Same language as the user message.\n\n" f"User message: {cleanPrompt}" ), options=AiCallOptions( operationType=OperationTypeEnum.DATA_EXTRACT, priority=PriorityEnum.SPEED, compressPrompt=False, temperature=0.3, ), ) resp = await aiService.callAi(req) title = (resp.content or "").strip().strip('"\'').strip() if title and len(title) <= 60: return title except Exception as e: logger.warning(f"AI naming failed, using fallback: {e}") text = prompt.split("\n[Active Data Sources]")[0].split("\n")[0].strip()[:50] return text or "Chat" # --------------------------------------------------------------------------- # SSE Stream endpoint # --------------------------------------------------------------------------- @router.post("/{instanceId}/start/stream") @limiter.limit("300/minute") async def streamWorkspaceStart( request: Request, instanceId: str = Path(..., description="Feature instance ID"), userInput: WorkspaceInputRequest = Body(...), context: RequestContext = Depends(getRequestContext), ): """Start or continue a Workspace session with SSE streaming via serviceAgent.""" mandateId, instanceConfig = _validateInstanceAccess(instanceId, context) chatInterface = _getChatInterface(context, featureInstanceId=instanceId) aiObjects = await _getAiObjects() eventManager = get_event_manager() if userInput.workflowId: workflow = chatInterface.getWorkflow(userInput.workflowId) if not workflow: logger.warning(f"Workflow {userInput.workflowId} not found, creating new one") workflow = chatInterface.createWorkflow({ "featureInstanceId": instanceId, "status": "active", "name": "", "workflowMode": "Dynamic", }) else: workflow = chatInterface.createWorkflow({ "featureInstanceId": instanceId, "status": "active", "name": "", "workflowMode": "Dynamic", }) workflowId = workflow.get("id") if isinstance(workflow, dict) else getattr(workflow, "id", str(workflow)) queueId = f"workspace-{workflowId}" eventManager.create_queue(queueId) dbMgmt = _getDbManagement(context, featureInstanceId=instanceId) userDocuments = _workspaceFilesToChatDocuments(dbMgmt, userInput.fileIds or []) from modules.serviceCenter import getService from modules.serviceCenter.context import ServiceCenterContext wsBillingFeatureCode = _workspaceBillingFeatureCode(context.user, mandateId or "", instanceId) svcCtx = ServiceCenterContext( user=context.user, mandate_id=mandateId or "", feature_instance_id=instanceId, workflow_id=workflowId, feature_code=wsBillingFeatureCode, ) chatSvc = getService("chat", svcCtx) attachmentLabel = _buildWorkspaceAttachmentLabel( chatSvc, userInput.dataSourceIds or [], userInput.featureDataSourceIds or [], ) userMessageData: Dict[str, Any] = { "workflowId": workflowId, "role": "user", "message": userInput.prompt, } if userDocuments: userMessageData["documents"] = userDocuments if attachmentLabel: userMessageData["documentsLabel"] = attachmentLabel chatInterface.createMessage(userMessageData) agentTask = asyncio.ensure_future( _runWorkspaceAgent( workflowId=workflowId, queueId=queueId, prompt=userInput.prompt, fileIds=userInput.fileIds, dataSourceIds=userInput.dataSourceIds, featureDataSourceIds=userInput.featureDataSourceIds, voiceMode=userInput.voiceMode, instanceId=instanceId, user=context.user, mandateId=mandateId or "", aiObjects=aiObjects, chatInterface=chatInterface, eventManager=eventManager, userLanguage=userInput.userLanguage, instanceConfig=instanceConfig, allowedProviders=userInput.allowedProviders, requireNeutralization=userInput.requireNeutralization, billingFeatureCode=wsBillingFeatureCode, ) ) eventManager.register_agent_task(queueId, agentTask) async def _sseGenerator(): queue = eventManager.get_queue(queueId) if not queue: return while True: try: event = await asyncio.wait_for(queue.get(), timeout=120) except asyncio.TimeoutError: yield "data: {\"type\": \"keepalive\"}\n\n" continue if event is None: break ssePayload = event.get("data", event) if isinstance(event, dict) else event yield f"data: {json.dumps(ssePayload, default=str)}\n\n" eventType = ssePayload.get("type", "") if isinstance(ssePayload, dict) else "" if eventType in ("complete", "error", "stopped"): break await eventManager.cleanup(queueId, delay=30) return StreamingResponse( _sseGenerator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", }, ) async def _runWorkspaceAgent( workflowId: str, queueId: str, prompt: str, fileIds: List[str], dataSourceIds: List[str], featureDataSourceIds: List[str] = None, voiceMode: bool = False, instanceId: str = "", user=None, mandateId: str = "", aiObjects=None, chatInterface=None, eventManager=None, userLanguage: str = "en", instanceConfig: Dict[str, Any] = None, allowedProviders: List[str] = None, requireNeutralization: Optional[bool] = None, billingFeatureCode: Optional[str] = None, ): """Run the serviceAgent loop and forward events to the SSE queue.""" try: from modules.serviceCenter import getService from modules.serviceCenter.context import ServiceCenterContext ctx = ServiceCenterContext( user=user, mandate_id=mandateId, feature_instance_id=instanceId, workflow_id=workflowId, feature_code=billingFeatureCode, ) agentService = getService("agent", ctx) chatService = getService("chat", ctx) aiService = getService("ai", ctx) if allowedProviders: aiService.services.allowedProviders = allowedProviders logger.info(f"Workspace agent: allowedProviders={allowedProviders}") else: logger.debug("Workspace agent: no allowedProviders in request") if requireNeutralization is not None: ctx.requireNeutralization = requireNeutralization wfRecord = chatInterface.getWorkflow(workflowId) if workflowId else None wfName = "" if wfRecord: wfName = wfRecord.get("name", "") if isinstance(wfRecord, dict) else getattr(wfRecord, "name", "") if not wfName.strip() or wfName.startswith("Neuer Chat"): async def _nameInBackground(): try: autoName = await _deriveWorkflowName(prompt, aiService) chatInterface.updateWorkflow(workflowId, {"name": autoName}) await eventManager.emit_event(queueId, "workflowUpdated", { "type": "workflowUpdated", "workflowId": workflowId, "name": autoName, }) except Exception as nameErr: logger.warning(f"AI workflow naming failed: {nameErr}") asyncio.ensure_future(_nameInBackground()) enrichedPrompt = prompt if dataSourceIds: dsInfo = _buildDataSourceContext(chatService, dataSourceIds) if dsInfo: enrichedPrompt = f"{prompt}\n\n[Active Data Sources]\n{dsInfo}" if featureDataSourceIds: fdsInfo = _buildFeatureDataSourceContext(featureDataSourceIds) if fdsInfo: enrichedPrompt = f"{enrichedPrompt}\n\n[Attached Feature Data Sources]\n{fdsInfo}" conversationHistory = _loadConversationHistory(chatInterface, workflowId, prompt) priorFileIds = _collectPriorFileIds(chatInterface, workflowId) currentFileIdSet = set(fileIds or []) mergedFileIds = list(fileIds or []) for pf in priorFileIds: if pf not in currentFileIdSet: mergedFileIds.append(pf) if len(mergedFileIds) > len(fileIds or []): logger.info( f"Merged {len(mergedFileIds) - len(fileIds or [])} prior file(s) into agent context " f"(total: {len(mergedFileIds)}) for workflow {workflowId}" ) accumulatedText = "" messagePersisted = False _cfg = instanceConfig or {} _toolSet = _cfg.get("toolSet", "core") _agentCfg = _cfg.get("agentConfig") from modules.serviceCenter.services.serviceAgent.datamodelAgent import AgentConfig agentCfgDict = dict(_agentCfg) if isinstance(_agentCfg, dict) else {} try: wsIf = interfaceFeatureWorkspace.getInterface(user, mandateId=mandateId, featureInstanceId=instanceId) userSettings = wsIf.getWorkspaceUserSettings(user.id if user else None) if userSettings and userSettings.maxAgentRounds is not None: agentCfgDict["maxRounds"] = userSettings.maxAgentRounds except Exception as e: logger.debug(f"Could not load workspace user settings for agent config: {e}") agentConfig = AgentConfig(**agentCfgDict) if agentCfgDict else None async for event in agentService.runAgent( prompt=enrichedPrompt, fileIds=mergedFileIds, workflowId=workflowId, userLanguage=userLanguage, conversationHistory=conversationHistory, toolSet=_toolSet, config=agentConfig, ): if eventManager.is_cancelled(queueId): logger.info(f"Agent cancelled by user for workflow {workflowId}") break if event.type == AgentEventTypeEnum.CHUNK and event.content: accumulatedText += event.content if event.type == AgentEventTypeEnum.FILE_EDIT_PROPOSAL and event.data: editData = event.data editId = editData.get("id", "") if editId: pendingEdit = PendingFileEdit( id=editId, fileId=editData.get("fileId", ""), fileName=editData.get("fileName", ""), mimeType=editData.get("mimeType", ""), oldContent=editData.get("oldContent", ""), newContent=editData.get("newContent", ""), workflowId=workflowId, ) _pendingEditsStore.forInstance(instanceId).add(pendingEdit) logger.info(f"Stored pending edit {editId} for file '{pendingEdit.fileName}' in instance {instanceId}") await eventManager.emit_event(queueId, "fileEditProposal", { "type": "fileEditProposal", "workflowId": workflowId, "item": { "id": editId, "fileId": editData.get("fileId", ""), "fileName": editData.get("fileName", ""), "mimeType": editData.get("mimeType", ""), "oldSize": len(editData.get("oldContent", "")), "newSize": len(editData.get("newContent", "")), }, }) continue sseEvent = { "type": event.type.value if hasattr(event.type, "value") else event.type, "workflowId": workflowId, } if event.content: sseEvent["content"] = event.content if event.type == AgentEventTypeEnum.MESSAGE: accumulatedText += event.content sseEvent["item"] = { "id": f"msg-{workflowId}-{id(event)}", "role": "assistant", "content": event.content, "workflowId": workflowId, } if event.data: sseEvent["item"] = event.data await eventManager.emit_event(queueId, sseEvent["type"], sseEvent) if event.type in (AgentEventTypeEnum.FINAL, AgentEventTypeEnum.ERROR): finalContent = event.content or accumulatedText if finalContent: try: chatInterface.createMessage({ "workflowId": workflowId, "role": "assistant", "message": finalContent, }) messagePersisted = True except Exception as msgErr: logger.error(f"Failed to persist assistant message: {msgErr}") # Persist any streamed content that wasn't saved via FINAL (e.g. cancellation) if not messagePersisted and accumulatedText.strip(): try: chatInterface.createMessage({ "workflowId": workflowId, "role": "assistant", "message": accumulatedText, }) logger.info(f"Persisted partial assistant response ({len(accumulatedText)} chars) for workflow {workflowId}") except Exception as msgErr: logger.error(f"Failed to persist partial assistant message: {msgErr}") logger.info(f"Agent loop completed for workflow {workflowId}, sending 'complete' event") await eventManager.emit_event(queueId, "complete", { "type": "complete", "workflowId": workflowId, }) except asyncio.CancelledError: logger.info(f"Agent task cancelled for workflow {workflowId}") await eventManager.emit_event(queueId, "stopped", { "type": "stopped", "workflowId": workflowId, }) except Exception as e: if isinstance(e, SubscriptionInactiveException): logger.warning(f"Workspace blocked by subscription: {e.message}") await eventManager.emit_event(queueId, "error", { "type": "error", "content": e.message, "workflowId": workflowId, "item": e.toClientDict(), }) elif isinstance(e, InsufficientBalanceException): logger.warning(f"Workspace blocked by billing: {e.message}") await eventManager.emit_event(queueId, "error", { "type": "error", "content": e.message, "workflowId": workflowId, "item": e.toClientDict(), }) else: logger.error(f"Workspace agent error: {e}", exc_info=True) await eventManager.emit_event(queueId, "error", { "type": "error", "content": str(e), "workflowId": workflowId, }) finally: eventManager._unregister_agent_task(queueId) # --------------------------------------------------------------------------- # Stop endpoint # --------------------------------------------------------------------------- @router.post("/{instanceId}/{workflowId}/stop") @limiter.limit("120/minute") async def stopWorkspace( request: Request, instanceId: str = Path(...), workflowId: str = Path(...), context: RequestContext = Depends(getRequestContext), ): _validateInstanceAccess(instanceId, context) queueId = f"workspace-{workflowId}" eventManager = get_event_manager() cancelled = await eventManager.cancel_agent(queueId) await eventManager.emit_event(queueId, "stopped", { "type": "stopped", "workflowId": workflowId, }) logger.info(f"Stop requested for workflow {workflowId}, agent task cancelled: {cancelled}") return JSONResponse({"status": "stopped", "workflowId": workflowId}) # --------------------------------------------------------------------------- # Workflow / Conversation endpoints # --------------------------------------------------------------------------- @router.get("/{instanceId}/workflows") @limiter.limit("300/minute") async def listWorkspaceWorkflows( request: Request, instanceId: str = Path(...), includeArchived: bool = Query(default=False, description="Include archived workflows"), search: str = Query(default="", description="Fulltext search in workflow titles and message content"), context: RequestContext = Depends(getRequestContext), ): """List workspace workflows/conversations for this instance.""" _validateInstanceAccess(instanceId, context) chatInterface = _getChatInterface(context, featureInstanceId=instanceId) workflows = chatInterface.getWorkflows() or [] from modules.interfaces.interfaceDbApp import getRootInterface rootIf = getRootInterface() _fiCache: Dict[str, Dict[str, str]] = {} def _resolveFeatureLabels(fiId: str) -> Dict[str, str]: if fiId not in _fiCache: fi = rootIf.getFeatureInstance(fiId) if fi: _fiCache[fiId] = { "featureLabel": getattr(fi, "label", "") or getattr(fi, "featureCode", fiId), "featureCode": getattr(fi, "featureCode", ""), } else: _fiCache[fiId] = {"featureLabel": fiId[:8], "featureCode": ""} return _fiCache[fiId] items = [] for wf in workflows: if isinstance(wf, dict): item = wf else: item = { "id": getattr(wf, "id", None), "name": getattr(wf, "name", ""), "status": getattr(wf, "status", ""), "startedAt": getattr(wf, "startedAt", None), "lastActivity": getattr(wf, "lastActivity", None), "featureInstanceId": getattr(wf, "featureInstanceId", instanceId), } if not includeArchived and item.get("status") == "archived": continue fiId = item.get("featureInstanceId") or instanceId labels = _resolveFeatureLabels(fiId) item.setdefault("featureLabel", labels["featureLabel"]) item.setdefault("featureCode", labels["featureCode"]) item.setdefault("featureInstanceId", fiId) lastMsg = chatInterface.getLastMessageTimestamp(item.get("id")) if lastMsg: item["lastMessageAt"] = lastMsg items.append(item) if search and search.strip(): searchLower = search.strip().lower() matchedIds = set() for item in items: if searchLower in (item.get("name") or "").lower() or searchLower in (item.get("label") or "").lower(): matchedIds.add(item["id"]) contentHits = chatInterface.searchWorkflowsByContent(searchLower, limit=50) matchedIds.update(contentHits) items = [i for i in items if i["id"] in matchedIds] return JSONResponse({"workflows": items}) class ResolveRagRequest(BaseModel): """Request body for resolving a chat via RAG.""" chatId: str = Field(..., description="Workflow/chat ID to resolve") @router.post("/{instanceId}/resolve-rag") @limiter.limit("60/minute") async def resolveRag( request: Request, instanceId: str = Path(...), body: ResolveRagRequest = Body(...), context: RequestContext = Depends(getRequestContext), ): """Build a RAG summary for a chat (workflow) to inject into the input area.""" _validateInstanceAccess(instanceId, context) chatInterface = _getChatInterface(context, featureInstanceId=instanceId) messages = chatInterface.getMessages(body.chatId) or [] texts = [] for msg in messages[:30]: content = msg.get("message") if isinstance(msg, dict) else getattr(msg, "message", "") if content: texts.append(content[:500]) summary = "\n---\n".join(texts[:10]) if texts else "" return JSONResponse({"summary": summary, "chatId": body.chatId, "messageCount": len(texts)}) class UpdateWorkflowRequest(BaseModel): """Request body for updating a workflow (PATCH).""" name: Optional[str] = Field(default=None, description="New workflow name") status: Optional[str] = Field(default=None, description="New status (active, archived)") @router.patch("/{instanceId}/workflows/{workflowId}") @limiter.limit("300/minute") async def patchWorkspaceWorkflow( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="Workflow ID to update"), body: UpdateWorkflowRequest = Body(...), context: RequestContext = Depends(getRequestContext), ): """Update a workspace workflow (e.g. rename).""" _validateInstanceAccess(instanceId, context) chatInterface = _getChatInterface(context, featureInstanceId=instanceId) workflow = chatInterface.getWorkflow(workflowId) if not workflow: raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found") updateData = {} if body.name is not None: updateData["name"] = body.name if body.status is not None: updateData["status"] = body.status if not updateData: updated = workflow else: updated = chatInterface.updateWorkflow(workflowId, updateData) if isinstance(updated, dict): return JSONResponse(updated) return JSONResponse({ "id": getattr(updated, "id", None), "name": getattr(updated, "name", ""), "status": getattr(updated, "status", ""), "startedAt": getattr(updated, "startedAt", None), "lastActivity": getattr(updated, "lastActivity", None), }) @router.delete("/{instanceId}/workflows/{workflowId}") @limiter.limit("120/minute") async def deleteWorkspaceWorkflow( request: Request, instanceId: str = Path(...), workflowId: str = Path(...), context: RequestContext = Depends(getRequestContext), ): """Delete a workspace workflow and its messages.""" _validateInstanceAccess(instanceId, context) chatInterface = _getChatInterface(context, featureInstanceId=instanceId) workflow = chatInterface.getWorkflow(workflowId) if not workflow: raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found") chatInterface.deleteWorkflow(workflowId) return JSONResponse({"status": "deleted", "workflowId": workflowId}) @router.post("/{instanceId}/workflows") @limiter.limit("120/minute") async def createWorkspaceWorkflow( request: Request, instanceId: str = Path(...), body: dict = Body(default={}), context: RequestContext = Depends(getRequestContext), ): """Create a new empty workspace workflow.""" _validateInstanceAccess(instanceId, context) chatInterface = _getChatInterface(context, featureInstanceId=instanceId) name = body.get("name", "Neuer Chat") workflow = chatInterface.createWorkflow({ "featureInstanceId": instanceId, "status": "active", "name": name, "workflowMode": "Dynamic", }) wfId = workflow.get("id") if isinstance(workflow, dict) else getattr(workflow, "id", None) wfName = workflow.get("name") if isinstance(workflow, dict) else getattr(workflow, "name", name) return JSONResponse({"id": wfId, "name": wfName, "status": "active"}) @router.get("/{instanceId}/workflows/{workflowId}/messages") @limiter.limit("300/minute") async def getWorkspaceMessages( request: Request, instanceId: str = Path(...), workflowId: str = Path(...), context: RequestContext = Depends(getRequestContext), ): """Get all messages for a workspace workflow/conversation.""" _validateInstanceAccess(instanceId, context) chatInterface = _getChatInterface(context, featureInstanceId=instanceId) messages = chatInterface.getMessages(workflowId) or [] items = [_workspaceMessageToClientDict(m) for m in messages] items.sort( key=lambda m: ( parseTimestamp(m.get("publishedAt"), default=0) or 0, m.get("sequenceNr") or 0, str(m.get("id") or ""), ) ) return JSONResponse({"messages": items}) # --------------------------------------------------------------------------- # File and folder list endpoints # --------------------------------------------------------------------------- @router.get("/{instanceId}/files") @limiter.limit("300/minute") async def listWorkspaceFiles( request: Request, instanceId: str = Path(...), folderId: Optional[str] = Query(None), tags: Optional[str] = Query(None), search: Optional[str] = Query(None), context: RequestContext = Depends(getRequestContext), ): _validateInstanceAccess(instanceId, context) dbMgmt = _getDbManagement(context, featureInstanceId=instanceId) files = dbMgmt.getAllFiles() from modules.interfaces.interfaceDbApp import getRootInterface rootInterface = getRootInterface() instanceLabelCache: dict = {} result = [] for f in (files or []): item = f if isinstance(f, dict) else f.model_dump() fiId = item.get("featureInstanceId") or "" if fiId and fiId not in instanceLabelCache: fi = rootInterface.getFeatureInstance(fiId) instanceLabelCache[fiId] = fi.label if fi else fiId item["featureInstanceId"] = fiId item["featureInstanceLabel"] = instanceLabelCache.get(fiId, "(Global)") result.append(item) return JSONResponse({"files": result}) @router.get("/{instanceId}/files/{fileId}/content") @limiter.limit("300/minute") async def getFileContent( request: Request, instanceId: str = Path(...), fileId: str = Path(...), context: RequestContext = Depends(getRequestContext), ): """Return the raw content of a file for preview.""" from fastapi.responses import Response _validateInstanceAccess(instanceId, context) dbMgmt = _getDbManagement(context, featureInstanceId=instanceId) fileRecord = dbMgmt.getFile(fileId) if not fileRecord: raise HTTPException(status_code=404, detail=f"File {fileId} not found") fileData = fileRecord if isinstance(fileRecord, dict) else fileRecord.model_dump() filePath = fileData.get("filePath") if not filePath: raise HTTPException(status_code=404, detail="File has no stored path") import os if not os.path.isfile(filePath): raise HTTPException(status_code=404, detail="File not found on disk") mimeType = fileData.get("mimeType", "application/octet-stream") with open(filePath, "rb") as fh: content = fh.read() return Response(content=content, media_type=mimeType) @router.get("/{instanceId}/folders") @limiter.limit("300/minute") async def listWorkspaceFolders( request: Request, instanceId: str = Path(...), parentId: Optional[str] = Query(None), context: RequestContext = Depends(getRequestContext), ): _validateInstanceAccess(instanceId, context) try: from modules.serviceCenter import getService from modules.serviceCenter.context import ServiceCenterContext ctx = ServiceCenterContext( user=context.user, mandate_id=str(context.mandateId) if context.mandateId else None, feature_instance_id=instanceId, ) chatService = getService("chat", ctx) folders = chatService.listFolders(parentId=parentId) return JSONResponse({"folders": folders or []}) except Exception: return JSONResponse({"folders": []}) @router.get("/{instanceId}/datasources") @limiter.limit("300/minute") async def listWorkspaceDataSources( request: Request, instanceId: str = Path(...), context: RequestContext = Depends(getRequestContext), ): _validateInstanceAccess(instanceId, context) try: from modules.serviceCenter import getService from modules.serviceCenter.context import ServiceCenterContext ctx = ServiceCenterContext( user=context.user, mandate_id=str(context.mandateId) if context.mandateId else None, feature_instance_id=instanceId, ) chatService = getService("chat", ctx) dataSources = chatService.listDataSources(featureInstanceId=instanceId) return JSONResponse({"dataSources": dataSources or []}) except Exception: return JSONResponse({"dataSources": []}) @router.get("/{instanceId}/connections") @limiter.limit("300/minute") async def listWorkspaceConnections( request: Request, instanceId: str = Path(...), context: RequestContext = Depends(getRequestContext), ): """Return the user's active connections (UserConnections).""" _validateInstanceAccess(instanceId, context) from modules.serviceCenter import getService from modules.serviceCenter.context import ServiceCenterContext ctx = ServiceCenterContext( user=context.user, mandate_id=str(context.mandateId) if context.mandateId else None, feature_instance_id=instanceId, ) chatService = getService("chat", ctx) connections = chatService.getUserConnections() items = [] for c in connections or []: conn = c if isinstance(c, dict) else (c.model_dump() if hasattr(c, "model_dump") else {}) authority = conn.get("authority") if hasattr(authority, "value"): authority = authority.value status = conn.get("status") if hasattr(status, "value"): status = status.value items.append({ "id": conn.get("id"), "authority": authority, "externalUsername": conn.get("externalUsername"), "externalEmail": conn.get("externalEmail"), "status": status, }) return JSONResponse({"connections": items}) class CreateDataSourceRequest(BaseModel): """Request body for creating a DataSource.""" connectionId: str = Field(description="Connection ID") sourceType: str = Field(description="Source type") path: str = Field(description="Path") label: str = Field(description="Label") displayPath: Optional[str] = Field(default=None, description="Full human-readable path for tooltips") @router.post("/{instanceId}/datasources") @limiter.limit("300/minute") async def createWorkspaceDataSource( request: Request, instanceId: str = Path(...), body: CreateDataSourceRequest = Body(...), context: RequestContext = Depends(getRequestContext), ): """Create a new DataSource for this workspace instance.""" _validateInstanceAccess(instanceId, context) from modules.serviceCenter import getService from modules.serviceCenter.context import ServiceCenterContext ctx = ServiceCenterContext( user=context.user, mandate_id=str(context.mandateId) if context.mandateId else None, feature_instance_id=instanceId, ) chatService = getService("chat", ctx) dataSource = chatService.createDataSource( connectionId=body.connectionId, sourceType=body.sourceType, path=body.path, label=body.label, featureInstanceId=instanceId, displayPath=body.displayPath, ) return JSONResponse(dataSource if isinstance(dataSource, dict) else dataSource.model_dump()) @router.delete("/{instanceId}/datasources/{dataSourceId}") @limiter.limit("300/minute") async def deleteWorkspaceDataSource( request: Request, instanceId: str = Path(...), dataSourceId: str = Path(...), context: RequestContext = Depends(getRequestContext), ): """Delete a DataSource.""" _validateInstanceAccess(instanceId, context) from modules.serviceCenter import getService from modules.serviceCenter.context import ServiceCenterContext ctx = ServiceCenterContext( user=context.user, mandate_id=str(context.mandateId) if context.mandateId else None, feature_instance_id=instanceId, ) chatService = getService("chat", ctx) chatService.deleteDataSource(dataSourceId) return JSONResponse({"success": True}) # ---- Feature Connections & Feature Data Sources ---- @router.get("/{instanceId}/feature-connections") @limiter.limit("120/minute") async def listFeatureConnections( request: Request, instanceId: str = Path(...), context: RequestContext = Depends(getRequestContext), ): """List feature instances the user has access to, scoped to the workspace mandate.""" wsMandateId, _ = _validateInstanceAccess(instanceId, context) from modules.interfaces.interfaceDbApp import getRootInterface from modules.security.rbacCatalog import getCatalogService from modules.datamodels.datamodelUam import Mandate rootIf = getRootInterface() userId = str(context.user.id) catalog = getCatalogService() featureCodesWithData = catalog.getFeaturesWithDataObjects() userMandates = rootIf.getUserMandates(userId) if not userMandates: return JSONResponse({"featureConnectionsByMandate": []}) allowedMandateIds = {um.mandateId for um in userMandates} if wsMandateId and wsMandateId in allowedMandateIds: allowedMandateIds = {wsMandateId} mandateLabels: dict = {} for um in userMandates: if um.mandateId not in allowedMandateIds: continue try: rows = rootIf.db.getRecordset(Mandate, recordFilter={"id": um.mandateId}) if rows: m = rows[0] mandateLabels[um.mandateId] = m.get("label") or m.get("name") or um.mandateId except Exception: mandateLabels[um.mandateId] = um.mandateId byMandate: dict = {} seenIds: set = set() for um in userMandates: if um.mandateId not in allowedMandateIds: continue allInstances = rootIf.getFeatureInstancesByMandate(um.mandateId) for inst in allInstances: if inst.id in seenIds: continue seenIds.add(inst.id) if not inst.enabled: continue if inst.featureCode not in featureCodesWithData: continue featureAccess = rootIf.getFeatureAccess(userId, inst.id) if not featureAccess or not featureAccess.enabled: continue featureDef = catalog.getFeatureDefinition(inst.featureCode) or {} dataObjects = catalog.getDataObjects(inst.featureCode) label = inst.label or inst.featureCode mid = inst.mandateId connItem = { "featureInstanceId": inst.id, "featureCode": inst.featureCode, "mandateId": mid, "label": label, "icon": featureDef.get("icon", "mdi-database"), "tableCount": len(dataObjects), } if mid not in byMandate: byMandate[mid] = [] byMandate[mid].append(connItem) def _sortKeyLabel(x: dict) -> str: return (x.get("label") or "").lower() groups = [] for mid in sorted(byMandate.keys(), key=lambda m: (mandateLabels.get(m, m) or "").lower()): conns = sorted(byMandate[mid], key=_sortKeyLabel) groups.append({ "mandateId": mid, "mandateLabel": mandateLabels.get(mid, mid), "featureConnections": conns, }) return JSONResponse({"featureConnectionsByMandate": groups}) @router.get("/{instanceId}/feature-connections/{fiId}/tables") @limiter.limit("120/minute") async def listFeatureConnectionTables( request: Request, instanceId: str = Path(...), fiId: str = Path(..., description="Feature instance ID"), context: RequestContext = Depends(getRequestContext), ): """List data tables (DATA_OBJECTS) for a feature instance, filtered by RBAC.""" wsMandateId, _ = _validateInstanceAccess(instanceId, context) from modules.interfaces.interfaceDbApp import getRootInterface from modules.security.rbacCatalog import getCatalogService rootIf = getRootInterface() inst = rootIf.getFeatureInstance(fiId) if not inst: raise HTTPException(status_code=404, detail="Feature instance not found") mandateId = str(inst.mandateId) if inst.mandateId else None if wsMandateId and mandateId and mandateId != wsMandateId: raise HTTPException(status_code=403, detail="Feature instance does not belong to workspace mandate") catalog = getCatalogService() try: from modules.security.rbac import RbacClass from modules.security.rootAccess import getRootDbAppConnector dbApp = getRootDbAppConnector() rbac = RbacClass(dbApp, dbApp=dbApp) accessible = catalog.getAccessibleDataObjects( featureCode=inst.featureCode, rbacInstance=rbac, user=context.user, mandateId=mandateId or "", featureInstanceId=fiId, ) except Exception: accessible = catalog.getDataObjects(inst.featureCode) tables = [] for obj in accessible: meta = obj.get("meta", {}) node = { "objectKey": obj.get("objectKey", ""), "tableName": meta.get("table", ""), "label": obj.get("label", {}), "fields": meta.get("fields", []), } if meta.get("isParent"): node["isParent"] = True node["displayFields"] = meta.get("displayFields", []) if meta.get("parentTable"): node["parentTable"] = meta["parentTable"] node["parentKey"] = meta.get("parentKey", "") tables.append(node) return JSONResponse({"tables": tables}) @router.get("/{instanceId}/feature-connections/{fiId}/parent-objects/{tableName}") @limiter.limit("120/minute") async def listParentObjects( request: Request, instanceId: str = Path(...), fiId: str = Path(..., description="Feature instance ID"), tableName: str = Path(..., description="Parent table name from DATA_OBJECTS"), context: RequestContext = Depends(getRequestContext), ): """List records from a parent table so the user can pick a specific record to scope data.""" wsMandateId, _ = _validateInstanceAccess(instanceId, context) from modules.interfaces.interfaceDbApp import getRootInterface from modules.security.rbacCatalog import getCatalogService rootIf = getRootInterface() inst = rootIf.getFeatureInstance(fiId) if not inst: raise HTTPException(status_code=404, detail="Feature instance not found") featureCode = inst.featureCode mandateId = str(inst.mandateId) if inst.mandateId else "" if wsMandateId and mandateId and mandateId != wsMandateId: raise HTTPException(status_code=403, detail="Feature instance does not belong to workspace mandate") catalog = getCatalogService() parentObj = None for obj in catalog.getDataObjects(featureCode): meta = obj.get("meta", {}) if meta.get("table") == tableName and meta.get("isParent"): parentObj = obj break if not parentObj: raise HTTPException(status_code=400, detail=f"Table '{tableName}' is not a registered parent table") displayFields = parentObj["meta"].get("displayFields", []) selectCols = ', '.join(f'"{f}"' for f in (["id"] + displayFields)) if displayFields else "*" from modules.connectors.connectorDbPostgre import DatabaseConnector from modules.shared.configuration import APP_CONFIG featureDbName = f"poweron_{featureCode.lower()}" featureDbConn = None try: featureDbConn = DatabaseConnector( dbHost=APP_CONFIG.get("DB_HOST", "localhost"), dbDatabase=featureDbName, dbUser=APP_CONFIG.get("DB_USER"), dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET"), dbPort=int(APP_CONFIG.get("DB_PORT", 5432)), userId=str(context.user.id), ) conn = featureDbConn.connection with conn.cursor() as cur: cur.execute( "SELECT column_name FROM information_schema.columns " "WHERE table_schema = 'public' AND LOWER(table_name) = LOWER(%s) " "AND column_name IN ('featureInstanceId', 'instanceId')", [tableName], ) instanceCols = [row["column_name"] for row in cur.fetchall()] instanceCol = "featureInstanceId" if "featureInstanceId" in instanceCols else "instanceId" cur.execute( "SELECT column_name FROM information_schema.columns " "WHERE table_schema = 'public' AND LOWER(table_name) = LOWER(%s) " "AND column_name = 'userId'", [tableName], ) hasUserId = cur.rowcount > 0 sql = ( f'SELECT {selectCols} FROM "{tableName}" ' f'WHERE "{instanceCol}" = %s' ) params = [fiId] if mandateId: sql += ' AND "mandateId" = %s' params.append(mandateId) if hasUserId: sql += ' AND "userId" = %s' params.append(str(context.user.id)) sql += ' ORDER BY "id" DESC LIMIT 100' cur.execute(sql, params) rows = [] for row in cur.fetchall(): r = dict(row) for k, v in r.items(): if hasattr(v, "isoformat"): r[k] = v.isoformat() elif isinstance(v, (bytes, bytearray)): r[k] = f"" displayParts = [str(r.get(f, "")) for f in displayFields if r.get(f) is not None] rows.append({ "id": r.get("id", ""), "displayLabel": " | ".join(displayParts) if displayParts else r.get("id", ""), "fields": {f: r.get(f) for f in displayFields}, }) except Exception as e: logger.error(f"listParentObjects({tableName}) failed: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to list parent objects: {e}") finally: if featureDbConn: try: featureDbConn.close() except Exception: pass return JSONResponse({"parentObjects": rows}) class CreateFeatureDataSourceRequest(BaseModel): """Request body for adding a feature table as data source.""" featureInstanceId: str = Field(description="Feature instance ID") featureCode: str = Field(description="Feature code") tableName: str = Field(description="Table name from DATA_OBJECTS") objectKey: str = Field(description="RBAC object key") label: str = Field(description="User-visible label") recordFilter: Optional[dict] = Field(default=None, description="Record-level filter for scoping") @router.post("/{instanceId}/feature-datasources") @limiter.limit("300/minute") async def createFeatureDataSource( request: Request, instanceId: str = Path(...), body: CreateFeatureDataSourceRequest = Body(...), context: RequestContext = Depends(getRequestContext), ): """Create a FeatureDataSource for this workspace instance.""" wsMandateId, _ = _validateInstanceAccess(instanceId, context) from modules.interfaces.interfaceDbApp import getRootInterface from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource rootIf = getRootInterface() inst = rootIf.getFeatureInstance(body.featureInstanceId) mandateId = str(inst.mandateId) if inst else (str(context.mandateId) if context.mandateId else "") if wsMandateId and mandateId and mandateId != wsMandateId: raise HTTPException(status_code=403, detail="Feature instance does not belong to workspace mandate") fds = FeatureDataSource( featureInstanceId=body.featureInstanceId, featureCode=body.featureCode, tableName=body.tableName, objectKey=body.objectKey, label=body.label, mandateId=mandateId, userId=str(context.user.id), workspaceInstanceId=instanceId, recordFilter=body.recordFilter, ) created = rootIf.db.recordCreate(FeatureDataSource, fds.model_dump()) return JSONResponse(created if isinstance(created, dict) else fds.model_dump()) @router.get("/{instanceId}/feature-datasources") @limiter.limit("300/minute") async def listFeatureDataSources( request: Request, instanceId: str = Path(...), context: RequestContext = Depends(getRequestContext), ): """List active FeatureDataSources for this workspace instance.""" _validateInstanceAccess(instanceId, context) from modules.interfaces.interfaceDbApp import getRootInterface from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource rootIf = getRootInterface() records = rootIf.db.getRecordset( FeatureDataSource, recordFilter={"workspaceInstanceId": instanceId}, ) return JSONResponse({"featureDataSources": records or []}) @router.delete("/{instanceId}/feature-datasources/{featureDataSourceId}") @limiter.limit("300/minute") async def deleteFeatureDataSource( request: Request, instanceId: str = Path(...), featureDataSourceId: str = Path(...), context: RequestContext = Depends(getRequestContext), ): """Delete a FeatureDataSource.""" _validateInstanceAccess(instanceId, context) from modules.interfaces.interfaceDbApp import getRootInterface from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource rootIf = getRootInterface() rootIf.db.recordDelete(FeatureDataSource, featureDataSourceId) return JSONResponse({"success": True}) @router.get("/{instanceId}/connections/{connectionId}/services") @limiter.limit("120/minute") async def listConnectionServices( request: Request, instanceId: str = Path(...), connectionId: str = Path(...), context: RequestContext = Depends(getRequestContext), ): """Return the available services for a specific UserConnection.""" _validateInstanceAccess(instanceId, context) try: from modules.connectors.connectorResolver import ConnectorResolver from modules.serviceCenter import getService as getSvc from modules.serviceCenter.context import ServiceCenterContext ctx = ServiceCenterContext( user=context.user, mandate_id=str(context.mandateId) if context.mandateId else None, feature_instance_id=instanceId, ) chatService = getSvc("chat", ctx) securityService = getSvc("security", ctx) dbInterface = _buildResolverDbInterface(chatService) resolver = ConnectorResolver(securityService, dbInterface) provider = await resolver.resolve(connectionId) services = provider.getAvailableServices() _serviceLabels = { "sharepoint": "SharePoint", "outlook": "Outlook", "teams": "Teams", "onedrive": "OneDrive", "drive": "Google Drive", "gmail": "Gmail", "files": "Files (FTP)", } _serviceIcons = { "sharepoint": "sharepoint", "outlook": "mail", "teams": "chat", "onedrive": "cloud", "drive": "cloud", "gmail": "mail", "files": "folder", } items = [ { "service": s, "label": _serviceLabels.get(s, s), "icon": _serviceIcons.get(s, "folder"), } for s in services ] return JSONResponse({"services": items}) except Exception as e: logger.error(f"Error listing services for connection {connectionId}: {e}") return JSONResponse({"services": [], "error": str(e)}, status_code=400) @router.get("/{instanceId}/connections/{connectionId}/browse") @limiter.limit("300/minute") async def browseConnectionService( request: Request, instanceId: str = Path(...), connectionId: str = Path(...), service: str = Query(..., description="Service name (e.g. sharepoint, onedrive, outlook)"), path: str = Query("/", description="Path within the service to browse"), context: RequestContext = Depends(getRequestContext), ): """Browse folders/items within a connection's service at a given path.""" _validateInstanceAccess(instanceId, context) try: from modules.connectors.connectorResolver import ConnectorResolver from modules.serviceCenter import getService as getSvc from modules.serviceCenter.context import ServiceCenterContext ctx = ServiceCenterContext( user=context.user, mandate_id=str(context.mandateId) if context.mandateId else None, feature_instance_id=instanceId, ) chatService = getSvc("chat", ctx) securityService = getSvc("security", ctx) dbInterface = _buildResolverDbInterface(chatService) resolver = ConnectorResolver(securityService, dbInterface) adapter = await resolver.resolveService(connectionId, service) entries = await adapter.browse(path, filter=None) items = [] for entry in (entries or []): items.append({ "name": entry.name, "path": entry.path, "isFolder": entry.isFolder, "size": entry.size, "mimeType": entry.mimeType, "metadata": entry.metadata if hasattr(entry, "metadata") else {}, }) return JSONResponse({"items": items, "path": path, "service": service}) except Exception as e: logger.error(f"Error browsing {service} for connection {connectionId} at '{path}': {e}") return JSONResponse({"items": [], "error": str(e)}, status_code=400) # --------------------------------------------------------------------------- # Voice endpoints # --------------------------------------------------------------------------- @router.post("/{instanceId}/voice/transcribe") @limiter.limit("120/minute") async def transcribeVoice( request: Request, instanceId: str = Path(...), audio: UploadFile = File(...), context: RequestContext = Depends(getRequestContext), ): """Transcribe audio to text using speech-to-text.""" _validateInstanceAccess(instanceId, context) audioBytes = await audio.read() try: import aiohttp formData = aiohttp.FormData() formData.add_field("audio", audioBytes, filename=audio.filename or "audio.webm") async with aiohttp.ClientSession() as session: async with session.post( f"{request.base_url}api/voice-google/speech-to-text", data=formData, ) as resp: if resp.status == 200: result = await resp.json() return JSONResponse({"text": result.get("text", "")}) return JSONResponse({"text": "", "error": f"STT failed: {resp.status}"}) except Exception as e: logger.error(f"Voice transcription error: {e}") return JSONResponse({"text": "", "error": str(e)}) @router.post("/{instanceId}/voice/synthesize") @limiter.limit("120/minute") async def synthesizeVoice( request: Request, instanceId: str = Path(...), body: dict = Body(...), context: RequestContext = Depends(getRequestContext), ): """Synthesize text to speech audio.""" _validateInstanceAccess(instanceId, context) text = body.get("text", "") if not text: raise HTTPException(status_code=400, detail="text is required") return JSONResponse({"audio": None, "note": "TTS via browser Speech Synthesis API recommended"}) # ============================================================================= # FILE EDIT PROPOSAL ENDPOINTS # ============================================================================= @router.get("/{instanceId}/pending-edits") @limiter.limit("120/minute") async def getPendingEdits( request: Request, instanceId: str = Path(...), context: RequestContext = Depends(getRequestContext), ): """Return all pending file edit proposals for this workspace instance.""" _validateInstanceAccess(instanceId, context) editList = [e.model_dump() for e in _pendingEditsStore.forInstance(instanceId).getPending()] return JSONResponse({"edits": editList}) @router.post("/{instanceId}/edit/{editId}/accept") @limiter.limit("120/minute") async def acceptEdit( request: Request, instanceId: str = Path(...), editId: str = Path(...), context: RequestContext = Depends(getRequestContext), ): """Accept a proposed file edit -- applies the new content to the file.""" _validateInstanceAccess(instanceId, context) edit = _pendingEditsStore.forInstance(instanceId).get(editId) if not edit: raise HTTPException(status_code=404, detail=f"Edit proposal {editId} not found") if edit.status != "pending": raise HTTPException(status_code=409, detail=f"Edit proposal is already {edit.status}") dbMgmt = _getDbManagement(context, instanceId) try: success = dbMgmt.updateFileData(edit.fileId, edit.newContent.encode("utf-8")) if not success: raise HTTPException(status_code=500, detail="Failed to update file data") except HTTPException: raise except Exception as e: logger.error(f"Failed to apply edit {editId}: {e}") raise HTTPException(status_code=500, detail=f"Failed to apply edit: {str(e)}") edit.status = "accepted" logger.info(f"Edit {editId} accepted for file '{edit.fileName}' in instance {instanceId}") return JSONResponse({ "success": True, "editId": editId, "fileId": edit.fileId, "fileName": edit.fileName, }) @router.post("/{instanceId}/edit/{editId}/reject") @limiter.limit("120/minute") async def rejectEdit( request: Request, instanceId: str = Path(...), editId: str = Path(...), context: RequestContext = Depends(getRequestContext), ): """Reject a proposed file edit -- discards the change.""" _validateInstanceAccess(instanceId, context) edit = _pendingEditsStore.forInstance(instanceId).get(editId) if not edit: raise HTTPException(status_code=404, detail=f"Edit proposal {editId} not found") if edit.status != "pending": raise HTTPException(status_code=409, detail=f"Edit proposal is already {edit.status}") edit.status = "rejected" logger.info(f"Edit {editId} rejected for file '{edit.fileName}' in instance {instanceId}") return JSONResponse({ "success": True, "editId": editId, "fileId": edit.fileId, "fileName": edit.fileName, }) @router.post("/{instanceId}/edit/accept-all") @limiter.limit("30/minute") async def acceptAllEdits( request: Request, instanceId: str = Path(...), context: RequestContext = Depends(getRequestContext), ): """Accept all pending file edit proposals for this instance.""" _validateInstanceAccess(instanceId, context) instanceEdits = _pendingEditsStore.forInstance(instanceId) dbMgmt = _getDbManagement(context, instanceId) accepted = [] errors = [] for editId, edit in instanceEdits.items(): if edit.status != "pending": continue try: success = dbMgmt.updateFileData(edit.fileId, edit.newContent.encode("utf-8")) if success: edit.status = "accepted" accepted.append(editId) else: errors.append({"editId": editId, "error": "updateFileData returned False"}) except Exception as e: errors.append({"editId": editId, "error": str(e)}) logger.info(f"Accepted {len(accepted)} edits for instance {instanceId}, {len(errors)} errors") return JSONResponse({"accepted": accepted, "errors": errors}) @router.post("/{instanceId}/edit/reject-all") @limiter.limit("30/minute") async def rejectAllEdits( request: Request, instanceId: str = Path(...), context: RequestContext = Depends(getRequestContext), ): """Reject all pending file edit proposals for this instance.""" _validateInstanceAccess(instanceId, context) instanceEdits = _pendingEditsStore.forInstance(instanceId) rejected = [] for editId, edit in instanceEdits.items(): if edit.status != "pending": continue edit.status = "rejected" rejected.append(editId) logger.info(f"Rejected {len(rejected)} edits for instance {instanceId}") return JSONResponse({"rejected": rejected}) # ========================================================================= # General Settings Endpoints (per-user workspace settings) # ========================================================================= @router.get("/{instanceId}/settings/general") @limiter.limit("120/minute") async def getGeneralSettings( request: Request, instanceId: str = Path(...), context: RequestContext = Depends(getRequestContext), ): """Load general workspace settings for the current user, with effective values.""" _mandateId, instanceConfig = _validateInstanceAccess(instanceId, context) wsInterface = _getWorkspaceInterface(context, instanceId) userId = str(context.user.id) userSettings = wsInterface.getWorkspaceUserSettings(userId) agentCfg = (instanceConfig or {}).get("agentConfig", {}) instanceDefault = agentCfg.get("maxRounds", 25) if isinstance(agentCfg, dict) else 25 userOverride = userSettings.maxAgentRounds if userSettings else None effective = userOverride if userOverride is not None else instanceDefault return JSONResponse({ "maxAgentRounds": { "effective": effective, "userOverride": userOverride, "instanceDefault": instanceDefault, }, }) @router.put("/{instanceId}/settings/general") @limiter.limit("120/minute") async def updateGeneralSettings( request: Request, instanceId: str = Path(...), body: dict = Body(...), context: RequestContext = Depends(getRequestContext), ): """Update general workspace settings for the current user.""" _validateInstanceAccess(instanceId, context) wsInterface = _getWorkspaceInterface(context, instanceId) userId = str(context.user.id) data = { "userId": userId, "mandateId": str(context.mandateId) if context.mandateId else "", "featureInstanceId": instanceId, } if "maxAgentRounds" in body: val = body["maxAgentRounds"] data["maxAgentRounds"] = int(val) if val is not None else None wsInterface.saveWorkspaceUserSettings(data) return await getGeneralSettings(request, instanceId, context) # ========================================================================= # RAG / Knowledge — anonymised instance statistics (presentation / KPIs) # ========================================================================= def _collectWorkspaceFileIdsForStats(instanceId: str, mandateId: Optional[str]) -> List[str]: """All FileItem ids for this feature instance (any user). Knowledge rows are often stored without featureInstanceId; we correlate by file id from the Management DB.""" from modules.datamodels.datamodelFiles import FileItem from modules.interfaces.interfaceDbManagement import ComponentObjects co = ComponentObjects() rows = co.db.getRecordset(FileItem, recordFilter={"featureInstanceId": instanceId}) out: List[str] = [] m = str(mandateId) if mandateId else "" for r in rows or []: rid = r.get("id") if isinstance(r, dict) else getattr(r, "id", None) if not rid: continue if m: mid = r.get("mandateId") if isinstance(r, dict) else getattr(r, "mandateId", "") or "" if mid and mid != m: continue out.append(str(rid)) return out @router.get("/{instanceId}/rag-statistics") @limiter.limit("60/minute") async def getRagStatistics( request: Request, instanceId: str = Path(...), days: int = Query(90, ge=7, le=365, description="Timeline window in days"), context: RequestContext = Depends(getRequestContext), ): """Aggregated, non-identifying knowledge-store metrics for this workspace instance.""" mandateId, _instanceConfig = _validateInstanceAccess(instanceId, context) workspaceFileIds = _collectWorkspaceFileIdsForStats(instanceId, mandateId) kdb = getKnowledgeInterface(context.user) stats = kdb.getRagStatisticsForInstance( featureInstanceId=instanceId, mandateId=str(mandateId) if mandateId else "", timelineDays=days, workspaceFileIds=workspaceFileIds, ) if isinstance(stats, dict): stats.setdefault("scope", {}) stats["scope"]["workspaceFileIdsResolved"] = len(workspaceFileIds) return JSONResponse(stats)