# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
CommCoach routes for the backend API.
Implements coaching context management, session streaming, tasks, dashboard, and voice endpoints.
"""

import asyncio
import base64
import json
import logging
import uuid
from typing import AsyncIterator, Optional

from fastapi import APIRouter, HTTPException, Depends, Request
from fastapi.responses import StreamingResponse

from modules.auth import limiter, getRequestContext, RequestContext
from modules.shared.timeUtils import getIsoTimestamp
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.interfaces.interfaceFeatures import getFeatureInterface

from . import interfaceFeatureCommcoach as interfaceDb
from .datamodelCommcoach import (
    CoachingContext,
    CoachingContextStatus,
    CoachingSession,
    CoachingSessionStatus,
    CoachingMessage,
    CoachingMessageRole,
    CoachingMessageContentType,
    CoachingTask,
    CoachingTaskStatus,
    CreateContextRequest,
    UpdateContextRequest,
    SendMessageRequest,
    CreateTaskRequest,
    UpdateTaskRequest,
    UpdateTaskStatusRequest,
    UpdateProfileRequest,
)
from .serviceCommcoach import CommcoachService, emitSessionEvent, getSessionEventQueue, cleanupSessionEvents

logger = logging.getLogger(__name__)

router = APIRouter(
    prefix="/api/commcoach",
    tags=["CommCoach"],
    responses={404: {"description": "Not found"}}
)

# Response headers shared by every SSE endpoint in this module.
_SSE_HEADERS = {
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "X-Accel-Buffering": "no",
}

# Seconds to wait for a queued event before a keep-alive ping is sent.
_QUEUE_POLL_TIMEOUT_SECONDS = 30.0

# Consecutive keep-alive pings tolerated before an idle stream is closed.
_MAX_IDLE_PINGS = 10

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-execution.
_backgroundTasks: set = set()


# =========================================================================
# Helpers
# =========================================================================

def _getInterface(context: RequestContext, instanceId: Optional[str] = None):
    """Return the CommCoach DB interface for the request's user, mandate and feature instance."""
    mandateId = str(context.mandateId) if context.mandateId else None
    return interfaceDb.getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)


def _validateInstanceAccess(instanceId: str, context: RequestContext) -> str:
    """Look up the feature instance and return its mandateId as a string.

    Raises:
        HTTPException: 404 if the instance does not exist, 500 if it has no mandateId.

    NOTE(review): `context` is not used in this function, so no per-user access
    check is visible here - confirm that access is enforced elsewhere.
    """
    rootInterface = getRootInterface()
    featureInterface = getFeatureInterface(rootInterface.db)
    instance = featureInterface.getFeatureInstance(instanceId)
    if not instance:
        raise HTTPException(status_code=404, detail=f"Feature instance '{instanceId}' not found")
    # The instance may be a plain dict or an object, so both shapes are handled.
    mandateId = instance.get("mandateId") if isinstance(instance, dict) else getattr(instance, "mandateId", None)
    if not mandateId:
        raise HTTPException(status_code=500, detail="Feature instance has no mandateId")
    return str(mandateId)


def _validateOwnership(record: dict, context: RequestContext, fieldName: str = "userId") -> None:
    """Strict ownership check. SysAdmin does NOT bypass for content access.

    Raises:
        HTTPException: 404 (not 403) when `record[fieldName]` differs from the current user id.
    """
    if record.get(fieldName) != str(context.user.id):
        raise HTTPException(status_code=404, detail="Not found")


def _requireOwned(record, context: RequestContext, notFoundDetail: str):
    """Return `record` if it exists and belongs to the current user, else raise 404."""
    if not record:
        raise HTTPException(status_code=404, detail=notFoundDetail)
    _validateOwnership(record, context)
    return record


def _onBackgroundTaskDone(task: asyncio.Task) -> None:
    """Drop the finished task from the registry and log any exception it raised."""
    _backgroundTasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.error(f"CommCoach background task failed: {exc}", exc_info=exc)


def _spawnBackgroundTask(coro) -> asyncio.Task:
    """Schedule `coro` as a task and keep a strong reference until it finishes."""
    task = asyncio.create_task(coro)
    _backgroundTasks.add(task)
    task.add_done_callback(_onBackgroundTaskDone)
    return task


def _formatSse(payload: dict) -> str:
    """Serialize `payload` as a single SSE `data:` frame."""
    return f"data: {json.dumps(payload)}\n\n"


def _sseResponse(generator) -> StreamingResponse:
    """Wrap an async generator of SSE frames in a StreamingResponse with the shared headers."""
    return StreamingResponse(
        generator,
        media_type="text/event-stream",
        headers=dict(_SSE_HEADERS),
    )


async def _streamQueueEvents(
    sessionId: str,
    *,
    eventQueue=None,
    maxIdlePings: Optional[int] = _MAX_IDLE_PINGS,
    pingWithTimestamp: bool = False,
    stopOnAssistantMessage: bool = False,
) -> AsyncIterator[str]:
    """Yield SSE frames for events taken from a session's event queue.

    Args:
        sessionId: Session whose queue is drained when `eventQueue` is not given.
        eventQueue: Queue to drain; looked up via getSessionEventQueue on first iteration if None.
        maxIdlePings: Stop once more than this many consecutive pings were sent; None disables the cap.
        pingWithTimestamp: Add a `timestamp` field to keep-alive pings.
        stopOnAssistantMessage: Also stop after a `message` event whose data role is "assistant".

    The stream always ends after a `complete` or `error` event.

    NOTE(review): cleanupSessionEvents is imported but never called in the
    routes visible here - confirm the queue is released by the service.
    """
    if eventQueue is None:
        eventQueue = getSessionEventQueue(sessionId)
    idlePings = 0
    try:
        while True:
            try:
                event = await asyncio.wait_for(eventQueue.get(), timeout=_QUEUE_POLL_TIMEOUT_SECONDS)
            except asyncio.TimeoutError:
                ping = {"type": "ping"}
                if pingWithTimestamp:
                    ping["timestamp"] = getIsoTimestamp()
                yield _formatSse(ping)
                idlePings += 1
                if maxIdlePings is not None and idlePings > maxIdlePings:
                    break
                continue

            yield _formatSse(event)
            idlePings = 0
            eventType = event.get("type")
            if eventType in ("complete", "error"):
                break
            if (
                stopOnAssistantMessage
                and eventType == "message"
                and event.get("data", {}).get("role") == "assistant"
            ):
                break
    except asyncio.CancelledError:
        # Cancellation while waiting on the queue ends the stream quietly.
        pass


# =========================================================================
# Context Endpoints
# =========================================================================

@router.get("/{instanceId}/contexts")
@limiter.limit("60/minute")
async def listContexts(
    request: Request,
    instanceId: str,
    includeArchived: bool = False,
    context: RequestContext = Depends(getRequestContext),
):
    """List all coaching contexts for the current user."""
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    userId = str(context.user.id)
    contexts = interface.getContexts(instanceId, userId, includeArchived=includeArchived)
    return {"contexts": contexts}


@router.post("/{instanceId}/contexts")
@limiter.limit("20/minute")
async def createContext(
    request: Request,
    instanceId: str,
    body: CreateContextRequest,
    context: RequestContext = Depends(getRequestContext),
):
    """Create a new coaching context/dossier."""
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    userId = str(context.user.id)

    goalsJson = None
    if body.goals:
        # Goals are stored as a JSON string; every goal gets its own id and a creation time.
        createdAt = getIsoTimestamp()
        goalsList = [
            {"id": str(uuid.uuid4()), "text": g, "status": "open", "createdAt": createdAt}
            for g in body.goals
        ]
        goalsJson = json.dumps(goalsList)

    contextData = CoachingContext(
        userId=userId,
        mandateId=mandateId,
        instanceId=instanceId,
        title=body.title,
        description=body.description,
        category=body.category,
        goals=goalsJson,
    ).model_dump()

    created = interface.createContext(contextData)
    logger.info(f"CommCoach context created: {created.get('id')} for user {userId}")
    return {"context": created}


@router.get("/{instanceId}/contexts/{contextId}")
@limiter.limit("60/minute")
async def getContext(
    request: Request,
    instanceId: str,
    contextId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Get a coaching context together with its tasks, scores and sessions."""
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    userId = str(context.user.id)

    ctx = _requireOwned(interface.getContext(contextId), context, "Context not found")

    tasks = interface.getTasks(contextId, userId)
    scores = interface.getScores(contextId, userId)
    sessions = interface.getSessions(contextId, userId)

    return {
        "context": ctx,
        "tasks": tasks,
        "scores": scores,
        "sessions": sessions,
    }


@router.put("/{instanceId}/contexts/{contextId}")
@limiter.limit("30/minute")
async def updateContext(
    request: Request,
    instanceId: str,
    contextId: str,
    body: UpdateContextRequest,
    context: RequestContext = Depends(getRequestContext),
):
    """Update an owned context with the non-None fields of the request body."""
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    _requireOwned(interface.getContext(contextId), context, "Context not found")

    updates = body.model_dump(exclude_none=True)
    updated = interface.updateContext(contextId, updates)
    return {"context": updated}


@router.delete("/{instanceId}/contexts/{contextId}")
@limiter.limit("10/minute")
async def deleteContext(
    request: Request,
    instanceId: str,
    contextId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Delete an owned context."""
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    _requireOwned(interface.getContext(contextId), context, "Context not found")

    interface.deleteContext(contextId)
    return {"deleted": True}


@router.post("/{instanceId}/contexts/{contextId}/archive")
@limiter.limit("10/minute")
async def archiveContext(
    request: Request,
    instanceId: str,
    contextId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Set an owned context's status to ARCHIVED."""
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    _requireOwned(interface.getContext(contextId), context, "Context not found")

    updated = interface.updateContext(contextId, {"status": CoachingContextStatus.ARCHIVED.value})
    return {"context": updated}


@router.post("/{instanceId}/contexts/{contextId}/activate")
@limiter.limit("10/minute")
async def activateContext(
    request: Request,
    instanceId: str,
    contextId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Set an owned context's status to ACTIVE."""
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    _requireOwned(interface.getContext(contextId), context, "Context not found")

    updated = interface.updateContext(contextId, {"status": CoachingContextStatus.ACTIVE.value})
    return {"context": updated}


# =========================================================================
# Session Endpoints
# =========================================================================

@router.get("/{instanceId}/contexts/{contextId}/sessions")
@limiter.limit("60/minute")
async def listSessions(
    request: Request,
    instanceId: str,
    contextId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """List the current user's sessions of an owned context."""
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    userId = str(context.user.id)
    _requireOwned(interface.getContext(contextId), context, "Context not found")

    sessions = interface.getSessions(contextId, userId)
    return {"sessions": sessions}


@router.post("/{instanceId}/contexts/{contextId}/sessions/start")
@limiter.limit("10/minute")
async def startSession(
    request: Request,
    instanceId: str,
    contextId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Start a new coaching session or resume active one. Returns SSE stream with sessionState, messages, and complete."""
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    userId = str(context.user.id)
    _requireOwned(interface.getContext(contextId), context, "Context not found")

    activeSession = interface.getActiveSession(contextId, userId)
    if activeSession:
        sessionId = activeSession.get("id")
        messages = interface.getMessages(sessionId)

        async def _resumedEventGenerator():
            """Persist a resume greeting, then stream sessionState, optional ttsAudio and complete."""
            service = CommcoachService(context.user, mandateId, instanceId)
            greetingText = await service.generateResumeGreeting(sessionId, contextId, messages, interface)
            assistantMsg = CoachingMessage(
                sessionId=sessionId,
                contextId=contextId,
                userId=userId,
                role=CoachingMessageRole.ASSISTANT,
                content=greetingText,
                contentType=CoachingMessageContentType.TEXT,
            ).model_dump()
            createdGreeting = interface.createMessage(assistantMsg)
            interface.updateSession(sessionId, {"messageCount": len(messages) + 1})

            greetingForFrontend = {
                "id": createdGreeting.get("id"),
                "sessionId": sessionId,
                "contextId": contextId,
                "role": "assistant",
                "content": greetingText,
                "contentType": "text",
                "createdAt": createdGreeting.get("createdAt"),
            }
            messagesWithGreeting = messages + [greetingForFrontend]
            sessionWithCount = {**activeSession, "messageCount": len(messagesWithGreeting)}
            yield _formatSse({
                'type': 'sessionState',
                'data': {'session': sessionWithCount, 'resumed': True, 'messages': messagesWithGreeting},
            })

            if greetingText:
                # TTS is best effort: any failure is logged and the stream still completes.
                try:
                    from modules.interfaces.interfaceVoiceObjects import getVoiceInterface
                    voiceInterface = getVoiceInterface(context.user, mandateId)
                    profile = interface.getProfile(userId, instanceId)
                    language = profile.get("preferredLanguage", "de-DE") if profile else "de-DE"
                    voiceName = profile.get("preferredVoice") if profile else None
                    from .serviceCommcoach import _stripMarkdownForTts
                    ttsResult = await voiceInterface.textToSpeech(
                        text=_stripMarkdownForTts(greetingText),
                        languageCode=language,
                        voiceName=voiceName,
                    )
                    if ttsResult and isinstance(ttsResult, dict):
                        audioBytes = ttsResult.get("audioContent")
                        if audioBytes:
                            audioB64 = base64.b64encode(
                                audioBytes if isinstance(audioBytes, bytes) else audioBytes.encode()
                            ).decode()
                            yield _formatSse({'type': 'ttsAudio', 'data': {'audio': audioB64, 'format': 'mp3'}})
                except Exception as e:
                    logger.warning(f"TTS failed for resumed session: {e}")

            yield _formatSse({'type': 'complete', 'data': {}, 'timestamp': getIsoTimestamp()})

        return _sseResponse(_resumedEventGenerator())

    sessionData = CoachingSession(
        contextId=contextId,
        userId=userId,
        mandateId=mandateId,
        instanceId=instanceId,
    ).model_dump()
    created = interface.createSession(sessionData)
    sessionId = created.get("id")

    # Obtain the queue and enqueue the initial state before the opening task starts emitting.
    eventQueue = getSessionEventQueue(sessionId)
    await emitSessionEvent(sessionId, "sessionState", {"session": created, "resumed": False})

    service = CommcoachService(context.user, mandateId, instanceId)
    _spawnBackgroundTask(service.processSessionOpening(sessionId, contextId, interface))

    logger.info(f"CommCoach session started (streaming): {sessionId} for context {contextId}")
    return _sseResponse(
        _streamQueueEvents(sessionId, eventQueue=eventQueue, pingWithTimestamp=True)
    )


@router.get("/{instanceId}/sessions/{sessionId}")
@limiter.limit("60/minute")
async def getSession(
    request: Request,
    instanceId: str,
    sessionId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Return an owned session together with its messages."""
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    session = _requireOwned(interface.getSession(sessionId), context, "Session not found")

    messages = interface.getMessages(sessionId)
    return {"session": session, "messages": messages}


@router.post("/{instanceId}/sessions/{sessionId}/complete")
@limiter.limit("10/minute")
async def completeSession(
    request: Request,
    instanceId: str,
    sessionId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Complete a coaching session. Triggers summary, scoring, task extraction, email."""
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    session = _requireOwned(interface.getSession(sessionId), context, "Session not found")

    if session.get("status") != CoachingSessionStatus.ACTIVE.value:
        raise HTTPException(status_code=400, detail=f"Session is already {session.get('status')}")

    service = CommcoachService(context.user, mandateId, instanceId)
    result = await service.completeSession(sessionId, interface)
    return {"session": result}


@router.post("/{instanceId}/sessions/{sessionId}/cancel")
@limiter.limit("10/minute")
async def cancelSession(
    request: Request,
    instanceId: str,
    sessionId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Mark an owned session as CANCELLED and stamp its end time."""
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    _requireOwned(interface.getSession(sessionId), context, "Session not found")

    interface.updateSession(sessionId, {
        "status": CoachingSessionStatus.CANCELLED.value,
        "endedAt": getIsoTimestamp(),
    })
    return {"cancelled": True}


# =========================================================================
# Chat Streaming Endpoints
# =========================================================================

@router.post("/{instanceId}/sessions/{sessionId}/message/stream")
@limiter.limit("30/minute")
async def sendMessageStream(
    request: Request,
    instanceId: str,
    sessionId: str,
    body: SendMessageRequest,
    context: RequestContext = Depends(getRequestContext),
):
    """Send a text message and stream the coaching response via SSE."""
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    session = _requireOwned(interface.getSession(sessionId), context, "Session not found")

    if session.get("status") != CoachingSessionStatus.ACTIVE.value:
        raise HTTPException(status_code=400, detail="Session is not active")

    contextId = session.get("contextId")
    service = CommcoachService(context.user, mandateId, instanceId)

    # Process in background; the response streams whatever lands in the session queue.
    _spawnBackgroundTask(service.processMessage(sessionId, contextId, body.content, interface))

    return _sseResponse(_streamQueueEvents(sessionId))


@router.post("/{instanceId}/sessions/{sessionId}/audio/stream")
@limiter.limit("20/minute")
async def sendAudioStream(
    request: Request,
    instanceId: str,
    sessionId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Send audio, get STT -> coaching response -> TTS via SSE."""
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    session = _requireOwned(interface.getSession(sessionId), context, "Session not found")

    if session.get("status") != CoachingSessionStatus.ACTIVE.value:
        raise HTTPException(status_code=400, detail="Session is not active")

    audioBody = await request.body()
    if not audioBody:
        raise HTTPException(status_code=400, detail="No audio data received")

    profile = interface.getProfile(str(context.user.id), instanceId)
    language = profile.get("preferredLanguage", "de-DE") if profile else "de-DE"
    contextId = session.get("contextId")

    service = CommcoachService(context.user, mandateId, instanceId)
    _spawnBackgroundTask(
        service.processAudioMessage(sessionId, contextId, audioBody, language, interface)
    )

    # NOTE(review): this stream also stops at the first assistant message, so any
    # event queued after it is not delivered on this response - confirm intended.
    return _sseResponse(_streamQueueEvents(sessionId, stopOnAssistantMessage=True))


@router.get("/{instanceId}/sessions/{sessionId}/stream")
@limiter.limit("60/minute")
async def streamSession(
    request: Request,
    instanceId: str,
    sessionId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Reconnect to an active session's SSE stream."""
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    session = _requireOwned(interface.getSession(sessionId), context, "Session not found")

    async def _eventGenerator():
        """Replay the session state and stored messages, then follow the live queue."""
        yield _formatSse({'type': 'sessionState', 'data': session})

        for msg in interface.getMessages(sessionId):
            yield _formatSse({'type': 'message', 'data': msg})

        # No idle cap here: pings continue until a terminal event or cancellation.
        async for frame in _streamQueueEvents(sessionId, maxIdlePings=None):
            yield frame

    return _sseResponse(_eventGenerator())


# =========================================================================
# Task Endpoints
# =========================================================================

@router.get("/{instanceId}/contexts/{contextId}/tasks")
@limiter.limit("60/minute")
async def listTasks(
    request: Request,
    instanceId: str,
    contextId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """List the current user's tasks of an owned context."""
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    userId = str(context.user.id)
    # Same existence/ownership gate as the other context-scoped endpoints.
    _requireOwned(interface.getContext(contextId), context, "Context not found")

    tasks = interface.getTasks(contextId, userId)
    return {"tasks": tasks}


@router.post("/{instanceId}/contexts/{contextId}/tasks")
@limiter.limit("30/minute")
async def createTask(
    request: Request,
    instanceId: str,
    contextId: str,
    body: CreateTaskRequest,
    context: RequestContext = Depends(getRequestContext),
):
    """Create a task in an owned context."""
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    userId = str(context.user.id)
    _requireOwned(interface.getContext(contextId), context, "Context not found")

    taskData = CoachingTask(
        contextId=contextId,
        userId=userId,
        mandateId=mandateId,
        title=body.title,
        description=body.description,
        priority=body.priority,
        dueDate=body.dueDate,
    ).model_dump()
    created = interface.createTask(taskData)
    return {"task": created}


@router.put("/{instanceId}/tasks/{taskId}")
@limiter.limit("30/minute")
async def updateTask(
    request: Request,
    instanceId: str,
    taskId: str,
    body: UpdateTaskRequest,
    context: RequestContext = Depends(getRequestContext),
):
    """Update an owned task with the non-None fields of the request body."""
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    _requireOwned(interface.getTask(taskId), context, "Task not found")

    updates = body.model_dump(exclude_none=True)
    updated = interface.updateTask(taskId, updates)
    return {"task": updated}


@router.put("/{instanceId}/tasks/{taskId}/status")
@limiter.limit("30/minute")
async def updateTaskStatus(
    request: Request,
    instanceId: str,
    taskId: str,
    body: UpdateTaskStatusRequest,
    context: RequestContext = Depends(getRequestContext),
):
    """Change an owned task's status; DONE additionally stamps `completedAt`."""
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    _requireOwned(interface.getTask(taskId), context, "Task not found")

    updates = {"status": body.status.value}
    if body.status == CoachingTaskStatus.DONE:
        updates["completedAt"] = getIsoTimestamp()

    updated = interface.updateTask(taskId, updates)
    return {"task": updated}


@router.delete("/{instanceId}/tasks/{taskId}")
@limiter.limit("10/minute")
async def deleteTask(
    request: Request,
    instanceId: str,
    taskId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Delete an owned task."""
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    _requireOwned(interface.getTask(taskId), context, "Task not found")

    interface.deleteTask(taskId)
    return {"deleted": True}


# =========================================================================
# Dashboard
# =========================================================================

@router.get("/{instanceId}/dashboard")
@limiter.limit("60/minute")
async def getDashboard(
    request: Request,
    instanceId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Return the dashboard data for the current user and feature instance."""
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    userId = str(context.user.id)
    data = interface.getDashboardData(userId, instanceId)
    return {"dashboard": data}
# =========================================================================
# User Profile
# =========================================================================

@router.get("/{instanceId}/profile")
@limiter.limit("60/minute")
async def getProfile(
    request: Request,
    instanceId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Return the current user's profile for this instance, creating it if missing."""
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    userId = str(context.user.id)
    profile = interface.getOrCreateProfile(userId, mandateId, instanceId)
    return {"profile": profile}


@router.put("/{instanceId}/profile")
@limiter.limit("10/minute")
async def updateProfile(
    request: Request,
    instanceId: str,
    body: UpdateProfileRequest,
    context: RequestContext = Depends(getRequestContext),
):
    """Update the current user's profile with the non-None fields of the request body."""
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    userId = str(context.user.id)
    # The profile is fetched (or created) first so there is always an id to update.
    profile = interface.getOrCreateProfile(userId, mandateId, instanceId)
    updates = body.model_dump(exclude_none=True)
    updated = interface.updateProfile(profile.get("id"), updates)
    return {"profile": updated}


# =========================================================================
# Voice Endpoints
# =========================================================================

@router.get("/{instanceId}/voice/languages")
@limiter.limit("30/minute")
async def getVoiceLanguages(
    request: Request,
    instanceId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Return the languages reported by the voice interface."""
    mandateId = _validateInstanceAccess(instanceId, context)
    from modules.interfaces.interfaceVoiceObjects import getVoiceInterface
    voiceInterface = getVoiceInterface(context.user, mandateId)
    languagesResult = await voiceInterface.getAvailableLanguages()
    # The result may be a dict wrapping the list or the list itself.
    if isinstance(languagesResult, dict):
        languageList = languagesResult.get("languages", [])
    else:
        languageList = languagesResult
    return {"languages": languageList}


@router.get("/{instanceId}/voice/voices")
@limiter.limit("30/minute")
async def getVoiceVoices(
    request: Request,
    instanceId: str,
    language: str = "de-DE",
    context: RequestContext = Depends(getRequestContext),
):
    """Return the voices reported by the voice interface for `language`."""
    mandateId = _validateInstanceAccess(instanceId, context)
    from modules.interfaces.interfaceVoiceObjects import getVoiceInterface
    voiceInterface = getVoiceInterface(context.user, mandateId)
    voicesResult = await voiceInterface.getAvailableVoices(language)
    # The result may be a dict wrapping the list or the list itself.
    if isinstance(voicesResult, dict):
        voiceList = voicesResult.get("voices", [])
    else:
        voiceList = voicesResult
    return {"voices": voiceList}


@router.post("/{instanceId}/voice/tts")
@limiter.limit("10/minute")
async def testVoice(
    request: Request,
    instanceId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """TTS preview / voice test.

    Reads optional `text`, `language` and `voiceId` from a JSON object body and
    returns the synthesized audio base64-encoded.

    Raises:
        HTTPException: 400 if the body is not a valid JSON object, 500 if TTS fails.
    """
    mandateId = _validateInstanceAccess(instanceId, context)

    # A malformed or empty body is a client error, not a server fault.
    try:
        body = await request.json()
    except ValueError as e:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from e
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    text = body.get("text", "Hallo, ich bin dein Coaching-Assistent.")
    language = body.get("language", "de-DE")
    voiceId = body.get("voiceId")

    from modules.interfaces.interfaceVoiceObjects import getVoiceInterface
    voiceInterface = getVoiceInterface(context.user, mandateId)

    try:
        result = await voiceInterface.textToSpeech(text=text, languageCode=language, voiceName=voiceId)
        if result and isinstance(result, dict):
            audioContent = result.get("audioContent")
            if audioContent:
                # Audio may arrive as bytes or str; str is encoded before base64.
                audioB64 = base64.b64encode(
                    audioContent if isinstance(audioContent, bytes) else audioContent.encode()
                ).decode()
                return {"success": True, "audio": audioB64, "format": "mp3", "text": text}
        return {"success": False, "error": "TTS returned no audio"}
    except Exception as e:
        logger.error(f"Voice test failed: {e}")
        raise HTTPException(status_code=500, detail=f"TTS test failed: {str(e)}") from e