gateway/modules/features/workspace/routeFeatureWorkspace.py

1037 lines
40 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Unified AI Workspace routes.
SSE-based endpoints that combine the capabilities of Codeeditor, Chatbot,
and Playground into a single agent-driven workspace.
"""
import logging
import json
import asyncio
from typing import Optional, List
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Request, UploadFile, File
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field
from modules.auth import limiter, getRequestContext, RequestContext
from modules.interfaces import interfaceDbChat, interfaceDbManagement
from modules.interfaces.interfaceAiObjects import AiObjects
from modules.serviceCenter.core.serviceStreaming import get_event_manager
from modules.serviceCenter.services.serviceAgent.datamodelAgent import AgentEventTypeEnum
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/api/workspace",
tags=["Unified Workspace"],
responses={404: {"description": "Not found"}},
)
_aiObjects: Optional[AiObjects] = None
class WorkspaceInputRequest(BaseModel):
"""Prompt input for the unified workspace."""
prompt: str = Field(description="User prompt text")
fileIds: List[str] = Field(default_factory=list, description="Referenced file IDs")
uploadedFiles: List[str] = Field(default_factory=list, description="Newly uploaded file IDs")
dataSourceIds: List[str] = Field(default_factory=list, description="Active DataSource IDs")
voiceMode: bool = Field(default=False, description="Enable voice response")
workflowId: Optional[str] = Field(default=None, description="Continue existing workflow")
userLanguage: str = Field(default="en", description="User language code")
async def _getAiObjects() -> AiObjects:
global _aiObjects
if _aiObjects is None:
_aiObjects = await AiObjects.create()
return _aiObjects
def _validateInstanceAccess(instanceId: str, context: RequestContext) -> str:
from modules.interfaces.interfaceDbApp import getRootInterface
rootInterface = getRootInterface()
instance = rootInterface.getFeatureInstance(instanceId)
if not instance:
raise HTTPException(status_code=404, detail=f"Feature instance {instanceId} not found")
featureAccess = rootInterface.getFeatureAccess(str(context.user.id), instanceId)
if not featureAccess or not featureAccess.enabled:
raise HTTPException(status_code=403, detail="Access denied to this feature instance")
return str(instance.mandateId) if instance.mandateId else None
def _getChatInterface(context: RequestContext, featureInstanceId: str = None):
return interfaceDbChat.getInterface(
context.user,
mandateId=str(context.mandateId) if context.mandateId else None,
featureInstanceId=featureInstanceId,
)
def _buildResolverDbInterface(chatService):
"""Build a DB adapter that ConnectorResolver can use to load UserConnections.
ConnectorResolver calls db.getUserConnection(connectionId).
interfaceDbApp provides getUserConnectionById(connectionId).
This adapter bridges the method name difference.
"""
class _ResolverDbAdapter:
def __init__(self, appInterface):
self._app = appInterface
def getUserConnection(self, connectionId: str):
if hasattr(self._app, "getUserConnectionById"):
return self._app.getUserConnectionById(connectionId)
return None
appIf = getattr(chatService, "interfaceDbApp", None)
if appIf:
return _ResolverDbAdapter(appIf)
return getattr(chatService, "interfaceDbComponent", None)
def _getDbManagement(context: RequestContext, featureInstanceId: str = None):
return interfaceDbManagement.getInterface(
context.user,
mandateId=str(context.mandateId) if context.mandateId else None,
featureInstanceId=featureInstanceId,
)
_SOURCE_TYPE_TO_SERVICE = {
"sharepointFolder": "sharepoint",
"onedriveFolder": "onedrive",
"outlookFolder": "outlook",
"googleDriveFolder": "drive",
"gmailFolder": "gmail",
"ftpFolder": "files",
}
def _buildDataSourceContext(chatService, dataSourceIds: List[str]) -> str:
"""Build a description of active data sources for the agent prompt."""
parts = [
"The user has attached the following external data sources to this prompt.",
"IMPORTANT RULES for attached data sources:",
"- Use ONLY browseDataSource, searchDataSource, and downloadFromDataSource to access these sources.",
"- Use the dataSourceId (UUID) exactly as shown below.",
"- Do NOT use listFiles, externalBrowse, or externalSearch for attached data sources -- those tools are for other purposes.",
"- browseDataSource returns BOTH files and folders at the given path.",
"- When downloading files, ALWAYS provide the human-readable fileName (with extension) from the browse results.",
"",
]
found = False
for dsId in dataSourceIds:
try:
ds = chatService.getDataSource(dsId) if hasattr(chatService, "getDataSource") else None
if ds:
found = True
label = ds.get("label", "")
sourceType = ds.get("sourceType", "")
connectionId = ds.get("connectionId", "")
path = ds.get("path", "/")
service = _SOURCE_TYPE_TO_SERVICE.get(sourceType, sourceType)
logger.info(f"DataSource context: id={dsId}, label={label}, sourceType={sourceType}, service={service}, connectionId={connectionId}, path={path[:80]}")
parts.append(
f"- dataSourceId: {dsId}\n"
f" label: \"{label}\"\n"
f" type: {sourceType} (service: {service})\n"
f" connectionId: {connectionId}\n"
f" path: {path}"
)
else:
logger.warning(f"DataSource {dsId} not found in DB")
except Exception as e:
logger.warning(f"Error loading DataSource {dsId}: {e}")
return "\n".join(parts) if found else ""
async def _deriveWorkflowName(prompt: str, aiService) -> str:
"""Use AI to generate a concise workflow title from the user prompt."""
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum
try:
cleanPrompt = prompt.split("\n[Active Data Sources]")[0].strip()[:300]
req = AiCallRequest(
prompt=(
"Generate a short title (3-6 words) for a chat conversation that starts with this user message. "
"Reply with ONLY the title, nothing else. Same language as the user message.\n\n"
f"User message: {cleanPrompt}"
),
options=AiCallOptions(
operationType=OperationTypeEnum.DATA_EXTRACT,
priority=PriorityEnum.SPEED,
compressPrompt=False,
temperature=0.3,
),
)
resp = await aiService.callAi(req)
title = (resp.content or "").strip().strip('"\'').strip()
if title and len(title) <= 60:
return title
except Exception as e:
logger.warning(f"AI naming failed, using fallback: {e}")
text = prompt.split("\n[Active Data Sources]")[0].split("\n")[0].strip()[:50]
return text or "Chat"
# ---------------------------------------------------------------------------
# SSE Stream endpoint
# ---------------------------------------------------------------------------
@router.post("/{instanceId}/start/stream")
@limiter.limit("60/minute")
async def streamWorkspaceStart(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
userInput: WorkspaceInputRequest = Body(...),
context: RequestContext = Depends(getRequestContext),
):
"""Start or continue a Workspace session with SSE streaming via serviceAgent."""
mandateId = _validateInstanceAccess(instanceId, context)
chatInterface = _getChatInterface(context, featureInstanceId=instanceId)
aiObjects = await _getAiObjects()
eventManager = get_event_manager()
if userInput.workflowId:
workflow = chatInterface.getWorkflow(userInput.workflowId)
if not workflow:
raise HTTPException(status_code=404, detail=f"Workflow {userInput.workflowId} not found")
else:
workflow = chatInterface.createWorkflow({
"featureInstanceId": instanceId,
"status": "active",
"name": "",
"workflowMode": "Dynamic",
})
workflowId = workflow.get("id") if isinstance(workflow, dict) else getattr(workflow, "id", str(workflow))
queueId = f"workspace-{workflowId}"
eventManager.create_queue(queueId)
chatInterface.createMessage({
"workflowId": workflowId,
"role": "user",
"message": userInput.prompt,
})
agentTask = asyncio.ensure_future(
_runWorkspaceAgent(
workflowId=workflowId,
queueId=queueId,
prompt=userInput.prompt,
fileIds=userInput.fileIds,
dataSourceIds=userInput.dataSourceIds,
voiceMode=userInput.voiceMode,
instanceId=instanceId,
user=context.user,
mandateId=mandateId or "",
aiObjects=aiObjects,
chatInterface=chatInterface,
eventManager=eventManager,
userLanguage=userInput.userLanguage,
)
)
eventManager.register_agent_task(queueId, agentTask)
async def _sseGenerator():
queue = eventManager.get_queue(queueId)
if not queue:
return
while True:
try:
event = await asyncio.wait_for(queue.get(), timeout=120)
except asyncio.TimeoutError:
yield "data: {\"type\": \"keepalive\"}\n\n"
continue
if event is None:
break
ssePayload = event.get("data", event) if isinstance(event, dict) else event
yield f"data: {json.dumps(ssePayload, default=str)}\n\n"
eventType = ssePayload.get("type", "") if isinstance(ssePayload, dict) else ""
if eventType in ("complete", "error", "stopped"):
break
await eventManager.cleanup(queueId, delay=30)
return StreamingResponse(
_sseGenerator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
async def _runWorkspaceAgent(
workflowId: str,
queueId: str,
prompt: str,
fileIds: List[str],
dataSourceIds: List[str],
voiceMode: bool,
instanceId: str,
user,
mandateId: str,
aiObjects,
chatInterface,
eventManager,
userLanguage: str = "en",
):
"""Run the serviceAgent loop and forward events to the SSE queue."""
try:
from modules.serviceCenter import getService
from modules.serviceCenter.context import ServiceCenterContext
ctx = ServiceCenterContext(
user=user,
mandate_id=mandateId,
feature_instance_id=instanceId,
workflow_id=workflowId,
)
agentService = getService("agent", ctx)
chatService = getService("chat", ctx)
aiService = getService("ai", ctx)
wfRecord = chatInterface.getWorkflow(workflowId) if workflowId else None
wfName = ""
if wfRecord:
wfName = wfRecord.get("name", "") if isinstance(wfRecord, dict) else getattr(wfRecord, "name", "")
if not wfName.strip() or wfName.startswith("Neuer Chat"):
async def _nameInBackground():
try:
autoName = await _deriveWorkflowName(prompt, aiService)
chatInterface.updateWorkflow(workflowId, {"name": autoName})
await eventManager.emit_event(queueId, "workflowUpdated", {
"type": "workflowUpdated",
"workflowId": workflowId,
"name": autoName,
})
except Exception as nameErr:
logger.warning(f"AI workflow naming failed: {nameErr}")
asyncio.ensure_future(_nameInBackground())
enrichedPrompt = prompt
if dataSourceIds:
dsInfo = _buildDataSourceContext(chatService, dataSourceIds)
if dsInfo:
enrichedPrompt = f"{prompt}\n\n[Active Data Sources]\n{dsInfo}"
async for event in agentService.runAgent(
prompt=enrichedPrompt,
fileIds=fileIds,
workflowId=workflowId,
userLanguage=userLanguage,
):
if eventManager.is_cancelled(queueId):
logger.info(f"Agent cancelled by user for workflow {workflowId}")
break
sseEvent = {
"type": event.type.value if hasattr(event.type, "value") else event.type,
"workflowId": workflowId,
}
if event.content:
sseEvent["content"] = event.content
if event.type == AgentEventTypeEnum.MESSAGE:
sseEvent["item"] = {
"id": f"msg-{workflowId}-{id(event)}",
"role": "assistant",
"content": event.content,
"workflowId": workflowId,
}
if event.data:
sseEvent["item"] = event.data
await eventManager.emit_event(queueId, sseEvent["type"], sseEvent)
if event.type in (AgentEventTypeEnum.FINAL, AgentEventTypeEnum.ERROR):
if event.content:
try:
chatInterface.createMessage({
"workflowId": workflowId,
"role": "assistant",
"message": event.content,
})
except Exception as msgErr:
logger.error(f"Failed to persist assistant message: {msgErr}")
logger.info(f"Agent loop completed for workflow {workflowId}, sending 'complete' event")
await eventManager.emit_event(queueId, "complete", {
"type": "complete",
"workflowId": workflowId,
})
except asyncio.CancelledError:
logger.info(f"Agent task cancelled for workflow {workflowId}")
await eventManager.emit_event(queueId, "stopped", {
"type": "stopped",
"workflowId": workflowId,
})
except Exception as e:
logger.error(f"Workspace agent error: {e}", exc_info=True)
await eventManager.emit_event(queueId, "error", {
"type": "error",
"content": str(e),
"workflowId": workflowId,
})
finally:
eventManager._unregister_agent_task(queueId)
# ---------------------------------------------------------------------------
# Stop endpoint
# ---------------------------------------------------------------------------
@router.post("/{instanceId}/{workflowId}/stop")
@limiter.limit("30/minute")
async def stopWorkspace(
request: Request,
instanceId: str = Path(...),
workflowId: str = Path(...),
context: RequestContext = Depends(getRequestContext),
):
_validateInstanceAccess(instanceId, context)
queueId = f"workspace-{workflowId}"
eventManager = get_event_manager()
cancelled = await eventManager.cancel_agent(queueId)
await eventManager.emit_event(queueId, "stopped", {
"type": "stopped",
"workflowId": workflowId,
})
logger.info(f"Stop requested for workflow {workflowId}, agent task cancelled: {cancelled}")
return JSONResponse({"status": "stopped", "workflowId": workflowId})
# ---------------------------------------------------------------------------
# Workflow / Conversation endpoints
# ---------------------------------------------------------------------------
@router.get("/{instanceId}/workflows")
@limiter.limit("60/minute")
async def listWorkspaceWorkflows(
request: Request,
instanceId: str = Path(...),
includeArchived: bool = Query(default=False, description="Include archived workflows"),
context: RequestContext = Depends(getRequestContext),
):
"""List workspace workflows/conversations for this instance."""
_validateInstanceAccess(instanceId, context)
chatInterface = _getChatInterface(context, featureInstanceId=instanceId)
workflows = chatInterface.getWorkflows() or []
items = []
for wf in workflows:
if isinstance(wf, dict):
item = wf
else:
item = {
"id": getattr(wf, "id", None),
"name": getattr(wf, "name", ""),
"status": getattr(wf, "status", ""),
"startedAt": getattr(wf, "startedAt", None),
"lastActivity": getattr(wf, "lastActivity", None),
}
if not includeArchived and item.get("status") == "archived":
continue
items.append(item)
return JSONResponse({"workflows": items})
class UpdateWorkflowRequest(BaseModel):
"""Request body for updating a workflow (PATCH)."""
name: Optional[str] = Field(default=None, description="New workflow name")
status: Optional[str] = Field(default=None, description="New status (active, archived)")
@router.patch("/{instanceId}/workflows/{workflowId}")
@limiter.limit("60/minute")
async def patchWorkspaceWorkflow(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
workflowId: str = Path(..., description="Workflow ID to update"),
body: UpdateWorkflowRequest = Body(...),
context: RequestContext = Depends(getRequestContext),
):
"""Update a workspace workflow (e.g. rename)."""
_validateInstanceAccess(instanceId, context)
chatInterface = _getChatInterface(context, featureInstanceId=instanceId)
workflow = chatInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found")
updateData = {}
if body.name is not None:
updateData["name"] = body.name
if body.status is not None:
updateData["status"] = body.status
if not updateData:
updated = workflow
else:
updated = chatInterface.updateWorkflow(workflowId, updateData)
if isinstance(updated, dict):
return JSONResponse(updated)
return JSONResponse({
"id": getattr(updated, "id", None),
"name": getattr(updated, "name", ""),
"status": getattr(updated, "status", ""),
"startedAt": getattr(updated, "startedAt", None),
"lastActivity": getattr(updated, "lastActivity", None),
})
@router.delete("/{instanceId}/workflows/{workflowId}")
@limiter.limit("30/minute")
async def deleteWorkspaceWorkflow(
request: Request,
instanceId: str = Path(...),
workflowId: str = Path(...),
context: RequestContext = Depends(getRequestContext),
):
"""Delete a workspace workflow and its messages."""
_validateInstanceAccess(instanceId, context)
chatInterface = _getChatInterface(context, featureInstanceId=instanceId)
workflow = chatInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found")
chatInterface.deleteWorkflow(workflowId)
return JSONResponse({"status": "deleted", "workflowId": workflowId})
@router.post("/{instanceId}/workflows")
@limiter.limit("30/minute")
async def createWorkspaceWorkflow(
request: Request,
instanceId: str = Path(...),
body: dict = Body(default={}),
context: RequestContext = Depends(getRequestContext),
):
"""Create a new empty workspace workflow."""
_validateInstanceAccess(instanceId, context)
chatInterface = _getChatInterface(context, featureInstanceId=instanceId)
name = body.get("name", "Neuer Chat")
workflow = chatInterface.createWorkflow({
"featureInstanceId": instanceId,
"status": "active",
"name": name,
"workflowMode": "Dynamic",
})
wfId = workflow.get("id") if isinstance(workflow, dict) else getattr(workflow, "id", None)
wfName = workflow.get("name") if isinstance(workflow, dict) else getattr(workflow, "name", name)
return JSONResponse({"id": wfId, "name": wfName, "status": "active"})
@router.get("/{instanceId}/workflows/{workflowId}/messages")
@limiter.limit("60/minute")
async def getWorkspaceMessages(
request: Request,
instanceId: str = Path(...),
workflowId: str = Path(...),
context: RequestContext = Depends(getRequestContext),
):
"""Get all messages for a workspace workflow/conversation."""
_validateInstanceAccess(instanceId, context)
chatInterface = _getChatInterface(context, featureInstanceId=instanceId)
messages = chatInterface.getMessages(workflowId) or []
items = []
for msg in messages:
if isinstance(msg, dict):
items.append(msg)
else:
items.append({
"id": getattr(msg, "id", None),
"role": getattr(msg, "role", ""),
"content": getattr(msg, "message", "") or getattr(msg, "content", ""),
"createdAt": getattr(msg, "publishedAt", None) or getattr(msg, "createdAt", None),
})
return JSONResponse({"messages": items})
# ---------------------------------------------------------------------------
# File and folder list endpoints
# ---------------------------------------------------------------------------
@router.get("/{instanceId}/files")
@limiter.limit("60/minute")
async def listWorkspaceFiles(
request: Request,
instanceId: str = Path(...),
folderId: Optional[str] = Query(None),
tags: Optional[str] = Query(None),
search: Optional[str] = Query(None),
context: RequestContext = Depends(getRequestContext),
):
_validateInstanceAccess(instanceId, context)
dbMgmt = _getDbManagement(context, featureInstanceId=instanceId)
files = dbMgmt.getAllFiles()
from modules.interfaces.interfaceDbApp import getRootInterface
rootInterface = getRootInterface()
instanceLabelCache: dict = {}
result = []
for f in (files or []):
item = f if isinstance(f, dict) else f.model_dump()
fiId = item.get("featureInstanceId") or ""
if fiId and fiId not in instanceLabelCache:
fi = rootInterface.getFeatureInstance(fiId)
instanceLabelCache[fiId] = fi.label if fi else fiId
item["featureInstanceId"] = fiId
item["featureInstanceLabel"] = instanceLabelCache.get(fiId, "(Global)")
result.append(item)
return JSONResponse({"files": result})
@router.get("/{instanceId}/files/{fileId}/content")
@limiter.limit("60/minute")
async def getFileContent(
request: Request,
instanceId: str = Path(...),
fileId: str = Path(...),
context: RequestContext = Depends(getRequestContext),
):
"""Return the raw content of a file for preview."""
from fastapi.responses import Response
_validateInstanceAccess(instanceId, context)
dbMgmt = _getDbManagement(context, featureInstanceId=instanceId)
fileRecord = dbMgmt.getFile(fileId)
if not fileRecord:
raise HTTPException(status_code=404, detail=f"File {fileId} not found")
fileData = fileRecord if isinstance(fileRecord, dict) else fileRecord.model_dump()
filePath = fileData.get("filePath")
if not filePath:
raise HTTPException(status_code=404, detail="File has no stored path")
import os
if not os.path.isfile(filePath):
raise HTTPException(status_code=404, detail="File not found on disk")
mimeType = fileData.get("mimeType", "application/octet-stream")
with open(filePath, "rb") as fh:
content = fh.read()
return Response(content=content, media_type=mimeType)
@router.get("/{instanceId}/folders")
@limiter.limit("60/minute")
async def listWorkspaceFolders(
request: Request,
instanceId: str = Path(...),
parentId: Optional[str] = Query(None),
context: RequestContext = Depends(getRequestContext),
):
_validateInstanceAccess(instanceId, context)
try:
from modules.serviceCenter import getService
from modules.serviceCenter.context import ServiceCenterContext
ctx = ServiceCenterContext(
user=context.user,
mandate_id=str(context.mandateId) if context.mandateId else None,
feature_instance_id=instanceId,
)
chatService = getService("chat", ctx)
folders = chatService.listFolders(parentId=parentId)
return JSONResponse({"folders": folders or []})
except Exception:
return JSONResponse({"folders": []})
@router.get("/{instanceId}/datasources")
@limiter.limit("60/minute")
async def listWorkspaceDataSources(
request: Request,
instanceId: str = Path(...),
context: RequestContext = Depends(getRequestContext),
):
_validateInstanceAccess(instanceId, context)
try:
from modules.serviceCenter import getService
from modules.serviceCenter.context import ServiceCenterContext
ctx = ServiceCenterContext(
user=context.user,
mandate_id=str(context.mandateId) if context.mandateId else None,
feature_instance_id=instanceId,
)
chatService = getService("chat", ctx)
dataSources = chatService.listDataSources(featureInstanceId=instanceId)
return JSONResponse({"dataSources": dataSources or []})
except Exception:
return JSONResponse({"dataSources": []})
@router.get("/{instanceId}/connections")
@limiter.limit("60/minute")
async def listWorkspaceConnections(
request: Request,
instanceId: str = Path(...),
context: RequestContext = Depends(getRequestContext),
):
"""Return the user's active connections (UserConnections)."""
_validateInstanceAccess(instanceId, context)
from modules.serviceCenter import getService
from modules.serviceCenter.context import ServiceCenterContext
ctx = ServiceCenterContext(
user=context.user,
mandate_id=str(context.mandateId) if context.mandateId else None,
feature_instance_id=instanceId,
)
chatService = getService("chat", ctx)
connections = chatService.getUserConnections()
items = []
for c in connections or []:
conn = c if isinstance(c, dict) else (c.model_dump() if hasattr(c, "model_dump") else {})
authority = conn.get("authority")
if hasattr(authority, "value"):
authority = authority.value
status = conn.get("status")
if hasattr(status, "value"):
status = status.value
items.append({
"id": conn.get("id"),
"authority": authority,
"externalUsername": conn.get("externalUsername"),
"externalEmail": conn.get("externalEmail"),
"status": status,
})
return JSONResponse({"connections": items})
class CreateDataSourceRequest(BaseModel):
"""Request body for creating a DataSource."""
connectionId: str = Field(description="Connection ID")
sourceType: str = Field(description="Source type")
path: str = Field(description="Path")
label: str = Field(description="Label")
@router.post("/{instanceId}/datasources")
@limiter.limit("60/minute")
async def createWorkspaceDataSource(
request: Request,
instanceId: str = Path(...),
body: CreateDataSourceRequest = Body(...),
context: RequestContext = Depends(getRequestContext),
):
"""Create a new DataSource for this workspace instance."""
_validateInstanceAccess(instanceId, context)
from modules.serviceCenter import getService
from modules.serviceCenter.context import ServiceCenterContext
ctx = ServiceCenterContext(
user=context.user,
mandate_id=str(context.mandateId) if context.mandateId else None,
feature_instance_id=instanceId,
)
chatService = getService("chat", ctx)
dataSource = chatService.createDataSource(
connectionId=body.connectionId,
sourceType=body.sourceType,
path=body.path,
label=body.label,
featureInstanceId=instanceId,
)
return JSONResponse(dataSource if isinstance(dataSource, dict) else dataSource.model_dump())
@router.delete("/{instanceId}/datasources/{dataSourceId}")
@limiter.limit("60/minute")
async def deleteWorkspaceDataSource(
request: Request,
instanceId: str = Path(...),
dataSourceId: str = Path(...),
context: RequestContext = Depends(getRequestContext),
):
"""Delete a DataSource."""
_validateInstanceAccess(instanceId, context)
from modules.serviceCenter import getService
from modules.serviceCenter.context import ServiceCenterContext
ctx = ServiceCenterContext(
user=context.user,
mandate_id=str(context.mandateId) if context.mandateId else None,
feature_instance_id=instanceId,
)
chatService = getService("chat", ctx)
chatService.deleteDataSource(dataSourceId)
return JSONResponse({"success": True})
@router.get("/{instanceId}/connections/{connectionId}/services")
@limiter.limit("30/minute")
async def listConnectionServices(
request: Request,
instanceId: str = Path(...),
connectionId: str = Path(...),
context: RequestContext = Depends(getRequestContext),
):
"""Return the available services for a specific UserConnection."""
_validateInstanceAccess(instanceId, context)
try:
from modules.connectors.connectorResolver import ConnectorResolver
from modules.serviceCenter import getService as getSvc
from modules.serviceCenter.context import ServiceCenterContext
ctx = ServiceCenterContext(
user=context.user,
mandate_id=str(context.mandateId) if context.mandateId else None,
feature_instance_id=instanceId,
)
chatService = getSvc("chat", ctx)
securityService = getSvc("security", ctx)
dbInterface = _buildResolverDbInterface(chatService)
resolver = ConnectorResolver(securityService, dbInterface)
provider = await resolver.resolve(connectionId)
services = provider.getAvailableServices()
_serviceLabels = {
"sharepoint": "SharePoint",
"outlook": "Outlook",
"teams": "Teams",
"onedrive": "OneDrive",
"drive": "Google Drive",
"gmail": "Gmail",
"files": "Files (FTP)",
}
_serviceIcons = {
"sharepoint": "sharepoint",
"outlook": "mail",
"teams": "chat",
"onedrive": "cloud",
"drive": "cloud",
"gmail": "mail",
"files": "folder",
}
items = [
{
"service": s,
"label": _serviceLabels.get(s, s),
"icon": _serviceIcons.get(s, "folder"),
}
for s in services
]
return JSONResponse({"services": items})
except Exception as e:
logger.error(f"Error listing services for connection {connectionId}: {e}")
return JSONResponse({"services": [], "error": str(e)}, status_code=400)
@router.get("/{instanceId}/connections/{connectionId}/browse")
@limiter.limit("60/minute")
async def browseConnectionService(
request: Request,
instanceId: str = Path(...),
connectionId: str = Path(...),
service: str = Query(..., description="Service name (e.g. sharepoint, onedrive, outlook)"),
path: str = Query("/", description="Path within the service to browse"),
context: RequestContext = Depends(getRequestContext),
):
"""Browse folders/items within a connection's service at a given path."""
_validateInstanceAccess(instanceId, context)
try:
from modules.connectors.connectorResolver import ConnectorResolver
from modules.serviceCenter import getService as getSvc
from modules.serviceCenter.context import ServiceCenterContext
ctx = ServiceCenterContext(
user=context.user,
mandate_id=str(context.mandateId) if context.mandateId else None,
feature_instance_id=instanceId,
)
chatService = getSvc("chat", ctx)
securityService = getSvc("security", ctx)
dbInterface = _buildResolverDbInterface(chatService)
resolver = ConnectorResolver(securityService, dbInterface)
adapter = await resolver.resolveService(connectionId, service)
entries = await adapter.browse(path, filter=None)
items = []
for entry in (entries or []):
items.append({
"name": entry.name,
"path": entry.path,
"isFolder": entry.isFolder,
"size": entry.size,
"mimeType": entry.mimeType,
"metadata": entry.metadata if hasattr(entry, "metadata") else {},
})
return JSONResponse({"items": items, "path": path, "service": service})
except Exception as e:
logger.error(f"Error browsing {service} for connection {connectionId} at '{path}': {e}")
return JSONResponse({"items": [], "error": str(e)}, status_code=400)
# ---------------------------------------------------------------------------
# Voice endpoints
# ---------------------------------------------------------------------------
@router.post("/{instanceId}/voice/transcribe")
@limiter.limit("30/minute")
async def transcribeVoice(
request: Request,
instanceId: str = Path(...),
audio: UploadFile = File(...),
context: RequestContext = Depends(getRequestContext),
):
"""Transcribe audio to text using speech-to-text."""
_validateInstanceAccess(instanceId, context)
audioBytes = await audio.read()
try:
import aiohttp
formData = aiohttp.FormData()
formData.add_field("audio", audioBytes, filename=audio.filename or "audio.webm")
async with aiohttp.ClientSession() as session:
async with session.post(
f"{request.base_url}api/voice-google/speech-to-text",
data=formData,
) as resp:
if resp.status == 200:
result = await resp.json()
return JSONResponse({"text": result.get("text", "")})
return JSONResponse({"text": "", "error": f"STT failed: {resp.status}"})
except Exception as e:
logger.error(f"Voice transcription error: {e}")
return JSONResponse({"text": "", "error": str(e)})
@router.post("/{instanceId}/voice/synthesize")
@limiter.limit("30/minute")
async def synthesizeVoice(
request: Request,
instanceId: str = Path(...),
body: dict = Body(...),
context: RequestContext = Depends(getRequestContext),
):
"""Synthesize text to speech audio."""
_validateInstanceAccess(instanceId, context)
text = body.get("text", "")
if not text:
raise HTTPException(status_code=400, detail="text is required")
return JSONResponse({"audio": None, "note": "TTS via browser Speech Synthesis API recommended"})
# =========================================================================
# Voice Settings Endpoints
# =========================================================================
@router.get("/{instanceId}/settings/voice")
@limiter.limit("30/minute")
async def getVoiceSettings(
request: Request,
instanceId: str = Path(...),
context: RequestContext = Depends(getRequestContext),
):
"""Load voice settings for the current user and instance."""
_validateInstanceAccess(instanceId, context)
dbMgmt = _getDbManagement(context, instanceId)
userId = str(context.user.id)
try:
vs = dbMgmt.getVoiceSettings(userId)
if not vs:
logger.info(f"GET voice settings: not found for user={userId}, creating defaults")
vs = dbMgmt.getOrCreateVoiceSettings(userId)
result = vs.model_dump() if vs else {}
mapKeys = list(result.get("ttsVoiceMap", {}).keys()) if result else []
logger.info(f"GET voice settings for user={userId}: ttsVoiceMap languages={mapKeys}")
return JSONResponse(result)
except Exception as e:
logger.error(f"Failed to load voice settings for user={userId}: {e}", exc_info=True)
return JSONResponse({"ttsVoiceMap": {}}, status_code=200)
@router.put("/{instanceId}/settings/voice")
@limiter.limit("30/minute")
async def updateVoiceSettings(
request: Request,
instanceId: str = Path(...),
body: dict = Body(...),
context: RequestContext = Depends(getRequestContext),
):
"""Update voice settings for the current user and instance."""
_validateInstanceAccess(instanceId, context)
dbMgmt = _getDbManagement(context, instanceId)
userId = str(context.user.id)
try:
logger.info(f"PUT voice settings for user={userId}, instance={instanceId}, body keys={list(body.keys())}")
vs = dbMgmt.getVoiceSettings(userId)
if not vs:
logger.info(f"No existing voice settings, creating new for user={userId}")
createData = {
"userId": userId,
"mandateId": str(context.mandateId) if context.mandateId else "",
"featureInstanceId": instanceId,
}
createData.update(body)
created = dbMgmt.createVoiceSettings(createData)
logger.info(f"Created voice settings for user={userId}, ttsVoiceMap keys={list((created or {}).get('ttsVoiceMap', {}).keys())}")
return JSONResponse(created)
updateData = {k: v for k, v in body.items() if k not in ("id", "userId", "mandateId", "featureInstanceId", "creationDate")}
logger.info(f"Updating voice settings for user={userId}, update keys={list(updateData.keys())}")
updated = dbMgmt.updateVoiceSettings(userId, updateData)
logger.info(f"Updated voice settings for user={userId}, ttsVoiceMap keys={list((updated or {}).get('ttsVoiceMap', {}).keys())}")
return JSONResponse(updated)
except Exception as e:
logger.error(f"Failed to update voice settings for user={userId}: {e}", exc_info=True)
return JSONResponse({"error": str(e)}, status_code=500)
@router.get("/{instanceId}/voice/languages")
@limiter.limit("30/minute")
async def getVoiceLanguages(
request: Request,
instanceId: str = Path(...),
context: RequestContext = Depends(getRequestContext),
):
"""Return available TTS languages."""
mandateId = _validateInstanceAccess(instanceId, context)
from modules.interfaces.interfaceVoiceObjects import getVoiceInterface
voiceInterface = getVoiceInterface(context.user, mandateId)
languagesResult = await voiceInterface.getAvailableLanguages()
languageList = languagesResult.get("languages", []) if isinstance(languagesResult, dict) else languagesResult
return JSONResponse({"languages": languageList})
@router.get("/{instanceId}/voice/voices")
@limiter.limit("30/minute")
async def getVoiceVoices(
request: Request,
instanceId: str = Path(...),
language: str = Query("de-DE"),
context: RequestContext = Depends(getRequestContext),
):
"""Return available TTS voices for a given language."""
mandateId = _validateInstanceAccess(instanceId, context)
from modules.interfaces.interfaceVoiceObjects import getVoiceInterface
voiceInterface = getVoiceInterface(context.user, mandateId)
voicesResult = await voiceInterface.getAvailableVoices(language)
voiceList = voicesResult.get("voices", []) if isinstance(voicesResult, dict) else voicesResult
return JSONResponse({"voices": voiceList})
@router.post("/{instanceId}/voice/test")
@limiter.limit("10/minute")
async def testVoice(
request: Request,
instanceId: str = Path(...),
body: dict = Body(...),
context: RequestContext = Depends(getRequestContext),
):
"""Test a specific voice with a sample text."""
import base64
mandateId = _validateInstanceAccess(instanceId, context)
text = body.get("text", "Hallo, das ist ein Stimmtest.")
language = body.get("language", "de-DE")
voiceId = body.get("voiceId")
from modules.interfaces.interfaceVoiceObjects import getVoiceInterface
voiceInterface = getVoiceInterface(context.user, mandateId)
try:
result = await voiceInterface.textToSpeech(text=text, languageCode=language, voiceName=voiceId)
if result and isinstance(result, dict):
audioContent = result.get("audioContent")
if audioContent:
audioB64 = base64.b64encode(
audioContent if isinstance(audioContent, bytes) else audioContent.encode()
).decode()
return JSONResponse({"success": True, "audio": audioB64, "format": "mp3", "text": text})
return JSONResponse({"success": False, "error": "TTS returned no audio"})
except Exception as e:
logger.error(f"Voice test failed: {e}")
raise HTTPException(status_code=500, detail=f"TTS test failed: {str(e)}")