# Listing metadata: 1147 lines, 42 KiB, Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
CommCoach routes for the backend API.
|
|
Implements training module management, session streaming, tasks, and dashboard.
|
|
"""
|
|
|
|
import logging
|
|
import json
|
|
import asyncio
|
|
import base64
|
|
import uuid
|
|
|
|
|
|
from typing import Optional
|
|
from fastapi import APIRouter, HTTPException, Depends, Request, WebSocket, WebSocketDisconnect, Query
|
|
from fastapi.responses import StreamingResponse, Response
|
|
|
|
from modules.auth import limiter, getRequestContext, RequestContext
|
|
from modules.shared.timeUtils import getIsoTimestamp
|
|
from modules.interfaces.interfaceDbApp import getRootInterface
|
|
from modules.interfaces.interfaceFeatures import getFeatureInterface
|
|
|
|
from . import interfaceFeatureCommcoach as interfaceDb
|
|
from .datamodelCommcoach import (
|
|
TrainingModule, TrainingModuleStatus, CoachingSession, CoachingSessionStatus,
|
|
CoachingMessage, CoachingMessageRole, CoachingMessageContentType,
|
|
CoachingTask, CoachingTaskStatus,
|
|
CoachingPersona, CoachingBadge, ModulePersonaMapping,
|
|
CreateModuleRequest, UpdateModuleRequest,
|
|
SendMessageRequest, CreateTaskRequest, UpdateTaskRequest, UpdateTaskStatusRequest,
|
|
UpdateProfileRequest,
|
|
StartSessionRequest, CreatePersonaRequest, UpdatePersonaRequest, SetModulePersonasRequest,
|
|
)
|
|
from .serviceCommcoach import CommcoachService, emitSessionEvent, getSessionEventQueue, cleanupSessionEvents
|
|
from modules.shared.i18nRegistry import apiRouteContext
|
|
# Wrapper applied to user-facing error details in this module
# (presumably an i18n lookup bound to this route context -- confirm in i18nRegistry).
routeApiMsg = apiRouteContext("routeFeatureCommcoach")

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Registry of in-flight background tasks; starts empty at import time.
_activeProcessTasks: dict = dict()
|
|
|
|
|
|
def _audit(
    context: RequestContext,
    action: str,
    resourceType: Optional[str] = None,
    resourceId: Optional[str] = None,
    details: str = "",
):
    """Record an audit event in the "commcoach" category. Best-effort: never raises.

    Args:
        context: Request context; supplies the user id and (optional) mandate id.
        action: Audit action identifier, e.g. "commcoach.module.created".
        resourceType: Optional type name of the affected resource.
        resourceId: Optional id of the affected resource.
        details: Free-text details stored with the event.
    """
    try:
        # Imported lazily inside the try so that a missing/broken audit logger
        # cannot break the request either.
        from modules.shared.auditLogger import audit_logger

        audit_logger.logEvent(
            userId=str(context.user.id),
            mandateId=str(context.mandateId) if context.mandateId else None,
            category="commcoach",
            action=action,
            resourceType=resourceType,
            resourceId=resourceId,
            details=details,
        )
    except Exception:
        # Auditing stays best-effort, but leave a trace instead of swallowing
        # the failure silently.
        logger.debug("CommCoach audit event '%s' could not be recorded", action, exc_info=True)
|
|
|
|
# Router for all CommCoach endpoints; every path below is relative to /api/commcoach.
router = APIRouter(
    prefix="/api/commcoach",
    tags=["CommCoach"],
    responses={404: {"description": "Not found"}},
)
|
|
|
|
|
|
# =========================================================================
|
|
# Helpers
|
|
# =========================================================================
|
|
|
|
def _getInterface(context: RequestContext, instanceId: Optional[str] = None):
    """Build the CommCoach data interface for the requesting user.

    The mandate id from the context is passed as a string, or as None when the
    context carries no mandate; ``instanceId`` is forwarded as the feature
    instance id.
    """
    mandateId = None
    if context.mandateId:
        mandateId = str(context.mandateId)
    return interfaceDb.getInterface(
        context.user,
        mandateId=mandateId,
        featureInstanceId=instanceId,
    )
|
|
|
|
|
|
def _validateInstanceAccess(instanceId: str, context: RequestContext) -> str:
    """Look up the feature instance and return its mandate id as a string.

    Raises:
        HTTPException: 404 if the instance does not exist, 500 if it has no mandateId.

    NOTE(review): ``context`` is accepted but not consulted here, so this only
    checks that the instance exists -- confirm that per-user access to the
    instance is enforced elsewhere.
    """
    featureInterface = getFeatureInterface(getRootInterface().db)
    instance = featureInterface.getFeatureInstance(instanceId)
    if not instance:
        raise HTTPException(status_code=404, detail=f"Feature instance '{instanceId}' not found")

    # The instance may come back as a plain dict or as an object with attributes.
    if isinstance(instance, dict):
        mandateId = instance.get("mandateId")
    else:
        mandateId = getattr(instance, "mandateId", None)

    if not mandateId:
        raise HTTPException(status_code=500, detail=routeApiMsg("Feature instance has no mandateId"))
    return str(mandateId)
|
|
|
|
|
|
def _validateOwnership(record: dict, context: RequestContext, fieldName: str = "userId") -> None:
    """Ensure ``record[fieldName]`` equals the calling user's id (as a string).

    There is no admin bypass. A mismatch is reported as 404 rather than 403.

    Raises:
        HTTPException: 404 when the record belongs to someone else.
    """
    ownerId = record.get(fieldName)
    callerId = str(context.user.id)
    if ownerId == callerId:
        return
    raise HTTPException(status_code=404, detail=routeApiMsg("Not found"))
|
|
|
|
|
|
# =========================================================================
|
|
# Module Endpoints (formerly Context)
|
|
# =========================================================================
|
|
|
|
@router.get("/{instanceId}/modules")
@limiter.limit("60/minute")
async def listModules(
    request: Request,
    instanceId: str,
    includeArchived: bool = False,
    context: RequestContext = Depends(getRequestContext),
):
    """Return the calling user's training modules for this instance.

    ``includeArchived`` is forwarded unchanged to the data layer.
    """
    _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)
    ownerId = str(context.user.id)
    return {"modules": db.getModules(instanceId, ownerId, includeArchived=includeArchived)}
|
|
|
|
|
|
@router.post("/{instanceId}/modules")
@limiter.limit("20/minute")
async def createModule(
    request: Request,
    instanceId: str,
    body: CreateModuleRequest,
    context: RequestContext = Depends(getRequestContext),
):
    """Create a training module owned by the calling user and return it.

    The module is bound to the instance's mandate; creation is logged and audited.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)
    userId = str(context.user.id)

    newModule = TrainingModule(
        userId=userId,
        mandateId=mandateId,
        instanceId=instanceId,
        title=body.title,
        description=body.description,
        moduleType=body.moduleType,
        goals=body.goals,
        personaId=body.personaId,
        kpiTargets=body.kpiTargets,
    )
    created = db.createModule(newModule.model_dump())

    logger.info(f"CommCoach module created: {created.get('id')} for user {userId}")
    _audit(context, "commcoach.module.created", "TrainingModule", created.get("id"), f"Title: {body.title}")
    return {"module": created}
|
|
|
|
|
|
@router.get("/{instanceId}/modules/{moduleId}")
@limiter.limit("60/minute")
async def getModuleDetail(
    request: Request,
    instanceId: str,
    moduleId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Return one training module together with the caller's tasks, scores and sessions.

    Raises:
        HTTPException: 404 if the module is missing or not owned by the caller.
    """
    _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)
    ownerId = str(context.user.id)

    module = db.getModule(moduleId)
    if not module:
        raise HTTPException(status_code=404, detail=routeApiMsg("Module not found"))
    _validateOwnership(module, context)

    detail = {"module": module}
    detail["tasks"] = db.getTasks(moduleId, ownerId)
    detail["scores"] = db.getScores(moduleId, ownerId)
    detail["sessions"] = db.getSessions(moduleId, ownerId)
    return detail
|
|
|
|
|
|
@router.put("/{instanceId}/modules/{moduleId}")
@limiter.limit("30/minute")
async def updateModuleFields(
    request: Request,
    instanceId: str,
    moduleId: str,
    body: UpdateModuleRequest,
    context: RequestContext = Depends(getRequestContext),
):
    """Apply the non-None fields of the request body to a module the caller owns.

    Raises:
        HTTPException: 404 if the module is missing or not owned by the caller.
    """
    _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)

    module = db.getModule(moduleId)
    if not module:
        raise HTTPException(status_code=404, detail=routeApiMsg("Module not found"))
    _validateOwnership(module, context)

    # Fields left as None in the request are treated as "not provided".
    changes = body.model_dump(exclude_none=True)
    return {"module": db.updateModule(moduleId, changes)}
|
|
|
|
|
|
@router.delete("/{instanceId}/modules/{moduleId}")
@limiter.limit("10/minute")
async def deleteModuleAndData(
    request: Request,
    instanceId: str,
    moduleId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Delete a module the caller owns via the data interface.

    Raises:
        HTTPException: 404 if the module is missing or not owned by the caller.
    """
    _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)

    module = db.getModule(moduleId)
    if not module:
        raise HTTPException(status_code=404, detail=routeApiMsg("Module not found"))
    _validateOwnership(module, context)

    db.deleteModule(moduleId)
    return {"deleted": True}
|
|
|
|
|
|
@router.post("/{instanceId}/modules/{moduleId}/archive")
@limiter.limit("10/minute")
async def archiveModule(
    request: Request,
    instanceId: str,
    moduleId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Set a module the caller owns to ARCHIVED status and audit the change.

    Raises:
        HTTPException: 404 if the module is missing or not owned by the caller.
    """
    _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)

    module = db.getModule(moduleId)
    if not module:
        raise HTTPException(status_code=404, detail=routeApiMsg("Module not found"))
    _validateOwnership(module, context)

    statusChange = {"status": TrainingModuleStatus.ARCHIVED.value}
    archived = db.updateModule(moduleId, statusChange)
    _audit(context, "commcoach.module.archived", "TrainingModule", moduleId)
    return {"module": archived}
|
|
|
|
|
|
@router.post("/{instanceId}/modules/{moduleId}/activate")
@limiter.limit("10/minute")
async def activateModule(
    request: Request,
    instanceId: str,
    moduleId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Set a module the caller owns to ACTIVE status.

    Raises:
        HTTPException: 404 if the module is missing or not owned by the caller.
    """
    _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)

    module = db.getModule(moduleId)
    if not module:
        raise HTTPException(status_code=404, detail=routeApiMsg("Module not found"))
    _validateOwnership(module, context)

    statusChange = {"status": TrainingModuleStatus.ACTIVE.value}
    return {"module": db.updateModule(moduleId, statusChange)}
|
|
|
|
|
|
# =========================================================================
|
|
# Session Endpoints
|
|
# =========================================================================
|
|
|
|
@router.get("/{instanceId}/modules/{moduleId}/sessions")
@limiter.limit("60/minute")
async def listSessions(
    request: Request,
    instanceId: str,
    moduleId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Return the caller's coaching sessions for a module they own.

    Raises:
        HTTPException: 404 if the module is missing or not owned by the caller.
    """
    _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)
    ownerId = str(context.user.id)

    module = db.getModule(moduleId)
    if not module:
        raise HTTPException(status_code=404, detail=routeApiMsg("Module not found"))
    _validateOwnership(module, context)

    return {"sessions": db.getSessions(moduleId, ownerId)}
|
|
|
|
|
|
@router.post("/{instanceId}/modules/{moduleId}/sessions/start")
@limiter.limit("10/minute")
async def startSession(
    request: Request,
    instanceId: str,
    moduleId: str,
    personaId: Optional[str] = None,
    context: RequestContext = Depends(getRequestContext),
):
    """Start a new coaching session or resume the active one; respond with an SSE stream.

    Resume path (an active session exists for this module and user): a resume
    greeting is generated and stored as an assistant message, then the stream
    emits ``sessionState`` (session, ``resumed=True``, full message list),
    optionally ``ttsAudio`` or ``error`` for the spoken greeting, and finally
    ``complete``.

    New-session path: the session is created, a ``sessionState`` event is put on
    the session event queue, the opening is processed in a background task, and
    queued events are relayed until a ``complete`` or ``error`` event arrives.
    A ``ping`` is sent after every 30 s without events; the stream stops after
    more than 10 consecutive pings.

    Raises:
        HTTPException: 404 if the module is missing or not owned by the caller.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    userId = str(context.user.id)

    mod = interface.getModule(moduleId)
    if not mod:
        raise HTTPException(status_code=404, detail=routeApiMsg("Module not found"))
    _validateOwnership(mod, context)

    sseHeaders = {"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"}

    activeSession = interface.getActiveSession(moduleId, userId)
    if activeSession:
        sessionId = activeSession.get("id")
        messages = interface.getMessages(sessionId)

        async def _resumedEventGenerator():
            # The greeting is generated and persisted lazily, once the response is streamed.
            service = CommcoachService(context.user, mandateId, instanceId)
            greetingText = await service.generateResumeGreeting(sessionId, moduleId, messages, interface)
            assistantMsg = CoachingMessage(
                sessionId=sessionId,
                moduleId=moduleId,
                userId=userId,
                role=CoachingMessageRole.ASSISTANT,
                content=greetingText,
                contentType=CoachingMessageContentType.TEXT,
            ).model_dump()
            createdGreeting = interface.createMessage(assistantMsg)
            interface.updateSession(sessionId, {"messageCount": len(messages) + 1})

            greetingForFrontend = {
                "id": createdGreeting.get("id"),
                "sessionId": sessionId,
                "moduleId": moduleId,
                "role": "assistant",
                "content": greetingText,
                "contentType": "text",
                "createdAt": createdGreeting.get("createdAt"),
            }
            messagesWithGreeting = messages + [greetingForFrontend]
            sessionWithCount = {**activeSession, "messageCount": len(messagesWithGreeting)}
            yield f"data: {json.dumps({'type': 'sessionState', 'data': {'session': sessionWithCount, 'resumed': True, 'messages': messagesWithGreeting}})}\n\n"

            if greetingText:
                try:
                    from modules.interfaces.interfaceVoiceObjects import getVoiceInterface
                    from .serviceCommcoach import getUserVoicePrefs, stripMarkdownForTts, buildTtsConfigErrorMessage

                    voiceInterface = getVoiceInterface(context.user, mandateId)
                    language, voiceName = getUserVoicePrefs(userId, mandateId)
                    ttsResult = await voiceInterface.textToSpeech(
                        text=stripMarkdownForTts(greetingText),
                        languageCode=language,
                        voiceName=voiceName,
                    )
                    if ttsResult and isinstance(ttsResult, dict):
                        audioBytes = ttsResult.get("audioContent")
                        if audioBytes:
                            audioB64 = base64.b64encode(
                                audioBytes if isinstance(audioBytes, bytes) else audioBytes.encode()
                            ).decode()
                            yield f"data: {json.dumps({'type': 'ttsAudio', 'data': {'audio': audioB64, 'format': 'mp3'}})}\n\n"
                        else:
                            errorDetail = ttsResult.get("error", "Text-to-Speech failed")
                            # Fix: call the helper under the name it is imported as. The old
                            # call used an undefined "_"-prefixed name, so the NameError was
                            # caught below and the specific message was never delivered.
                            errorPayload = {
                                "type": "error",
                                "data": {
                                    "message": buildTtsConfigErrorMessage(language, voiceName, errorDetail),
                                    "detail": errorDetail,
                                    "ttsLanguage": language,
                                    "ttsVoice": voiceName,
                                },
                            }
                            yield f"data: {json.dumps(errorPayload)}\n\n"
                except Exception as e:
                    # TTS is optional: report the failure to the client and still finish the stream.
                    logger.warning(f"TTS failed for resumed session: {e}")
                    yield f"data: {json.dumps({'type': 'error', 'data': {'message': 'Die konfigurierte Stimme für diese Sprache ist ungültig oder nicht verfügbar. Bitte passe sie unter Einstellungen > Stimme & Sprache an.', 'detail': str(e)}})}\n\n"

            yield f"data: {json.dumps({'type': 'complete', 'data': {}, 'timestamp': getIsoTimestamp()})}\n\n"

        return StreamingResponse(
            _resumedEventGenerator(),
            media_type="text/event-stream",
            headers=sseHeaders,
        )

    sessionData = CoachingSession(
        moduleId=moduleId,
        userId=userId,
        mandateId=mandateId,
        instanceId=instanceId,
        personaId=personaId,
    ).model_dump()
    created = interface.createSession(sessionData)
    sessionId = created.get("id")

    # Obtain the queue before emitting so the first event is relayed by the generator below.
    eventQueue = getSessionEventQueue(sessionId)
    await emitSessionEvent(sessionId, "sessionState", {"session": created, "resumed": False})

    service = CommcoachService(context.user, mandateId, instanceId)
    # NOTE(review): the task handle is not retained; asyncio only keeps weak
    # references to tasks -- consider storing it until it is done.
    asyncio.create_task(service.processSessionOpening(sessionId, moduleId, interface))

    async def _newSessionEventGenerator():
        timeoutCount = 0
        try:
            while True:
                try:
                    event = await asyncio.wait_for(eventQueue.get(), timeout=30.0)
                    yield f"data: {json.dumps(event)}\n\n"
                    timeoutCount = 0
                    if event.get("type") in ("complete", "error"):
                        break
                except asyncio.TimeoutError:
                    # Keep-alive while nothing is queued; stop after more than 10 in a row.
                    yield f"data: {json.dumps({'type': 'ping', 'timestamp': getIsoTimestamp()})}\n\n"
                    timeoutCount += 1
                    if timeoutCount > 10:
                        break
        except asyncio.CancelledError:
            pass

    logger.info(f"CommCoach session started (streaming): {sessionId} for module {moduleId}")
    _audit(context, "commcoach.session.started", "CoachingSession", sessionId, f"Module: {moduleId}")
    return StreamingResponse(
        _newSessionEventGenerator(),
        media_type="text/event-stream",
        headers=sseHeaders,
    )
|
|
|
|
|
|
|
|
@router.get("/{instanceId}/sessions/{sessionId}")
@limiter.limit("60/minute")
async def getSession(
    request: Request,
    instanceId: str,
    sessionId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Return a session the caller owns together with its messages.

    Raises:
        HTTPException: 404 if the session is missing or not owned by the caller.
    """
    _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)

    session = db.getSession(sessionId)
    if not session:
        raise HTTPException(status_code=404, detail=routeApiMsg("Session not found"))
    _validateOwnership(session, context)

    return {"session": session, "messages": db.getMessages(sessionId)}
|
|
|
|
|
|
@router.post("/{instanceId}/sessions/{sessionId}/complete")
@limiter.limit("10/minute")
async def completeSession(
    request: Request,
    instanceId: str,
    sessionId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Complete an active session via ``CommcoachService.completeSession`` and audit it.

    Raises:
        HTTPException: 404 if the session is missing or not owned by the caller,
            400 if the session is not in ACTIVE status.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)

    session = db.getSession(sessionId)
    if not session:
        raise HTTPException(status_code=404, detail=routeApiMsg("Session not found"))
    _validateOwnership(session, context)

    isActive = session.get("status") == CoachingSessionStatus.ACTIVE.value
    if not isActive:
        raise HTTPException(status_code=400, detail=f"Session is already {session.get('status')}")

    coach = CommcoachService(context.user, mandateId, instanceId)
    completed = await coach.completeSession(sessionId, db)
    _audit(context, "commcoach.session.completed", "CoachingSession", sessionId)
    return {"session": completed}
|
|
|
|
|
|
@router.post("/{instanceId}/sessions/{sessionId}/cancel")
@limiter.limit("10/minute")
async def cancelSession(
    request: Request,
    instanceId: str,
    sessionId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Mark a session the caller owns as CANCELLED and stamp its end time.

    NOTE(review): unlike completeSession there is no ACTIVE-status check, so an
    already finished session can be flipped to CANCELLED -- confirm this is intended.

    Raises:
        HTTPException: 404 if the session is missing or not owned by the caller.
    """
    _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)

    session = db.getSession(sessionId)
    if not session:
        raise HTTPException(status_code=404, detail=routeApiMsg("Session not found"))
    _validateOwnership(session, context)

    from modules.shared.timeUtils import getUtcTimestamp

    cancellation = {
        "status": CoachingSessionStatus.CANCELLED.value,
        "endedAt": getUtcTimestamp(),
    }
    db.updateSession(sessionId, cancellation)
    return {"cancelled": True}
|
|
|
|
|
|
# =========================================================================
|
|
# Chat Streaming Endpoints
|
|
# =========================================================================
|
|
|
|
@router.post("/{instanceId}/sessions/{sessionId}/message/stream")
@limiter.limit("30/minute")
async def sendMessageStream(
    request: Request,
    instanceId: str,
    sessionId: str,
    body: SendMessageRequest,
    context: RequestContext = Depends(getRequestContext),
):
    """Send a text message and stream the coaching response via SSE.

    A still-running processing task for the same session is cancelled before the
    new one is started. Events from the session queue are relayed until a
    ``complete`` or ``error`` event arrives; a ``ping`` is sent after every 30 s
    without events and the stream stops after more than 10 consecutive pings.

    Raises:
        HTTPException: 404 if the session is missing or not owned by the caller,
            400 if the session is not in ACTIVE status.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)

    session = interface.getSession(sessionId)
    if not session:
        raise HTTPException(status_code=404, detail=routeApiMsg("Session not found"))
    _validateOwnership(session, context)

    if session.get("status") != CoachingSessionStatus.ACTIVE.value:
        raise HTTPException(status_code=400, detail=routeApiMsg("Session is not active"))

    moduleId = session.get("moduleId")
    service = CommcoachService(context.user, mandateId, instanceId)

    existingTask = _activeProcessTasks.get(sessionId)
    if existingTask and not existingTask.done():
        existingTask.cancel()
        logger.info(f"Cancelled previous processMessage task for session {sessionId}")

    def _onTaskDone(doneTask):
        # Fix: only clear the entry if it still refers to the task that finished.
        # A cancelled predecessor's callback runs after the new task has been
        # registered; popping unconditionally would drop the new task's entry
        # and make it impossible to cancel on the next message.
        if _activeProcessTasks.get(sessionId) is doneTask:
            _activeProcessTasks.pop(sessionId, None)

    task = asyncio.create_task(
        service.processMessage(
            sessionId, moduleId, body.content, interface,
            fileIds=body.fileIds,
            dataSourceIds=body.dataSourceIds,
            featureDataSourceIds=body.featureDataSourceIds,
            allowedProviders=body.allowedProviders,
        )
    )
    task.add_done_callback(_onTaskDone)
    _activeProcessTasks[sessionId] = task

    # Stream events
    async def _eventGenerator():
        eventQueue = getSessionEventQueue(sessionId)
        try:
            timeout_count = 0
            while True:
                try:
                    event = await asyncio.wait_for(eventQueue.get(), timeout=30.0)
                    yield f"data: {json.dumps(event)}\n\n"
                    timeout_count = 0

                    eventType = event.get("type")
                    if eventType in ("complete", "error"):
                        break
                except asyncio.TimeoutError:
                    # Keep-alive while nothing is queued; stop after more than 10 in a row.
                    yield f"data: {json.dumps({'type': 'ping'})}\n\n"
                    timeout_count += 1
                    if timeout_count > 10:
                        break
        except asyncio.CancelledError:
            pass

    return StreamingResponse(
        _eventGenerator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
|
|
|
|
|
|
@router.post("/{instanceId}/sessions/{sessionId}/audio/stream")
@limiter.limit("20/minute")
async def sendAudioStream(
    request: Request,
    instanceId: str,
    sessionId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Accept raw audio in the request body and stream the resulting events via SSE.

    The audio is handed to ``CommcoachService.processAudioMessage`` in a
    background task together with the user's preferred language. The stream
    ends on a ``complete`` or ``error`` event, on the first assistant
    ``message`` event, or after more than 10 consecutive 30 s pings.

    NOTE(review): ending on the first assistant message means later events
    (e.g. TTS audio, if emitted afterwards) are not relayed here -- confirm.
    NOTE(review): the background task handle is not retained or registered.

    Raises:
        HTTPException: 404 if the session is missing or not owned by the caller,
            400 if the session is not ACTIVE or the body is empty.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)

    session = db.getSession(sessionId)
    if not session:
        raise HTTPException(status_code=404, detail=routeApiMsg("Session not found"))
    _validateOwnership(session, context)

    if session.get("status") != CoachingSessionStatus.ACTIVE.value:
        raise HTTPException(status_code=400, detail=routeApiMsg("Session is not active"))

    audioBody = await request.body()
    if not audioBody:
        raise HTTPException(status_code=400, detail=routeApiMsg("No audio data received"))

    from .serviceCommcoach import getUserVoicePrefs

    # Only the language is needed here; the voice name is ignored.
    language, _ = getUserVoicePrefs(str(context.user.id), mandateId)

    moduleId = session.get("moduleId")
    coach = CommcoachService(context.user, mandateId, instanceId)
    asyncio.create_task(coach.processAudioMessage(sessionId, moduleId, audioBody, language, db))

    async def _eventGenerator():
        queue = getSessionEventQueue(sessionId)
        try:
            idlePings = 0
            while True:
                try:
                    event = await asyncio.wait_for(queue.get(), timeout=30.0)
                    yield f"data: {json.dumps(event)}\n\n"
                    idlePings = 0

                    kind = event.get("type")
                    if kind in ("complete", "error"):
                        break
                    if kind == "message" and event.get("data", {}).get("role") == "assistant":
                        break
                except asyncio.TimeoutError:
                    yield f"data: {json.dumps({'type': 'ping'})}\n\n"
                    idlePings += 1
                    if idlePings > 10:
                        break
        except asyncio.CancelledError:
            pass

    sseHeaders = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(_eventGenerator(), media_type="text/event-stream", headers=sseHeaders)
|
|
|
|
|
|
@router.get("/{instanceId}/sessions/{sessionId}/stream")
@limiter.limit("60/minute")
async def streamSession(
    request: Request,
    instanceId: str,
    sessionId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Reconnect to a session's SSE stream.

    The stream first replays the session as a ``sessionState`` event and every
    stored message as a ``message`` event, then relays queued events until a
    ``complete`` or ``error`` event arrives. A ``ping`` is sent after every 30 s
    without events; there is no ping limit on this endpoint.

    NOTE(review): here ``sessionState.data`` is the bare session record, whereas
    startSession wraps it as ``{"session": ..., "resumed": ...}`` -- confirm the
    client handles both shapes.

    Raises:
        HTTPException: 404 if the session is missing or not owned by the caller.
    """
    _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)

    session = db.getSession(sessionId)
    if not session:
        raise HTTPException(status_code=404, detail=routeApiMsg("Session not found"))
    _validateOwnership(session, context)

    async def _eventGenerator():
        yield f"data: {json.dumps({'type': 'sessionState', 'data': session})}\n\n"

        # Messages are read when the stream is consumed, after the state event.
        history = db.getMessages(sessionId)
        for msg in history:
            yield f"data: {json.dumps({'type': 'message', 'data': msg})}\n\n"

        queue = getSessionEventQueue(sessionId)
        try:
            while True:
                try:
                    event = await asyncio.wait_for(queue.get(), timeout=30.0)
                    yield f"data: {json.dumps(event)}\n\n"
                    if event.get("type") in ("complete", "error"):
                        break
                except asyncio.TimeoutError:
                    yield f"data: {json.dumps({'type': 'ping'})}\n\n"
        except asyncio.CancelledError:
            pass

    sseHeaders = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(_eventGenerator(), media_type="text/event-stream", headers=sseHeaders)
|
|
|
|
|
|
# =========================================================================
|
|
# Task Endpoints
|
|
# =========================================================================
|
|
|
|
@router.get("/{instanceId}/modules/{moduleId}/tasks")
@limiter.limit("60/minute")
async def listTasks(
    request: Request,
    instanceId: str,
    moduleId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Return the caller's tasks for a module.

    NOTE(review): the module itself is not loaded or ownership-checked here
    (unlike listSessions); the query is only scoped by the caller's user id.
    """
    _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)
    ownerId = str(context.user.id)
    return {"tasks": db.getTasks(moduleId, ownerId)}
|
|
|
|
|
|
@router.post("/{instanceId}/modules/{moduleId}/tasks")
@limiter.limit("30/minute")
async def createTask(
    request: Request,
    instanceId: str,
    moduleId: str,
    body: CreateTaskRequest,
    context: RequestContext = Depends(getRequestContext),
):
    """Create a coaching task for the caller inside a module they own.

    Raises:
        HTTPException: 404 if the module is missing or not owned by the caller.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)
    ownerId = str(context.user.id)

    module = db.getModule(moduleId)
    if not module:
        raise HTTPException(status_code=404, detail=routeApiMsg("Module not found"))
    _validateOwnership(module, context)

    newTask = CoachingTask(
        moduleId=moduleId,
        userId=ownerId,
        mandateId=mandateId,
        title=body.title,
        description=body.description,
        priority=body.priority,
        dueDate=body.dueDate,
    )
    return {"task": db.createTask(newTask.model_dump())}
|
|
|
|
|
|
@router.put("/{instanceId}/tasks/{taskId}")
@limiter.limit("30/minute")
async def updateTask(
    request: Request,
    instanceId: str,
    taskId: str,
    body: UpdateTaskRequest,
    context: RequestContext = Depends(getRequestContext),
):
    """Apply the non-None fields of the request body to a task the caller owns.

    Raises:
        HTTPException: 404 if the task is missing or not owned by the caller.
    """
    _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)

    existing = db.getTask(taskId)
    if not existing:
        raise HTTPException(status_code=404, detail=routeApiMsg("Task not found"))
    _validateOwnership(existing, context)

    # Fields left as None in the request are treated as "not provided".
    changes = body.model_dump(exclude_none=True)
    return {"task": db.updateTask(taskId, changes)}
|
|
|
|
|
|
@router.put("/{instanceId}/tasks/{taskId}/status")
@limiter.limit("30/minute")
async def updateTaskStatus(
    request: Request,
    instanceId: str,
    taskId: str,
    body: UpdateTaskStatusRequest,
    context: RequestContext = Depends(getRequestContext),
):
    """Set the status of a task the caller owns; DONE also stamps ``completedAt``.

    Raises:
        HTTPException: 404 if the task is missing or not owned by the caller.
    """
    _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)

    existing = db.getTask(taskId)
    if not existing:
        raise HTTPException(status_code=404, detail=routeApiMsg("Task not found"))
    _validateOwnership(existing, context)

    changes = {"status": body.status.value}
    if body.status == CoachingTaskStatus.DONE:
        from modules.shared.timeUtils import getUtcTimestamp

        changes["completedAt"] = getUtcTimestamp()

    return {"task": db.updateTask(taskId, changes)}
|
|
|
|
|
|
@router.delete("/{instanceId}/tasks/{taskId}")
@limiter.limit("10/minute")
async def deleteTask(
    request: Request,
    instanceId: str,
    taskId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Delete a task the caller owns.

    Raises:
        HTTPException: 404 if the task is missing or not owned by the caller.
    """
    _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)

    existing = db.getTask(taskId)
    if not existing:
        raise HTTPException(status_code=404, detail=routeApiMsg("Task not found"))
    _validateOwnership(existing, context)

    db.deleteTask(taskId)
    return {"deleted": True}
|
|
|
|
|
|
# =========================================================================
|
|
# Dashboard
|
|
# =========================================================================
|
|
|
|
@router.get("/{instanceId}/dashboard")
@limiter.limit("60/minute")
async def getDashboard(
    request: Request,
    instanceId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Return the dashboard data for the calling user in this instance."""
    _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)
    ownerId = str(context.user.id)
    return {"dashboard": db.getDashboardData(ownerId, instanceId)}
|
|
|
|
|
|
# =========================================================================
|
|
# User Profile
|
|
# =========================================================================
|
|
|
|
@router.get("/{instanceId}/profile")
@limiter.limit("60/minute")
async def getProfile(
    request: Request,
    instanceId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Return the caller's CommCoach profile, obtained via ``getOrCreateProfile``."""
    mandateId = _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)
    ownerId = str(context.user.id)
    return {"profile": db.getOrCreateProfile(ownerId, mandateId, instanceId)}
|
|
|
|
|
|
@router.put("/{instanceId}/profile")
@limiter.limit("10/minute")
async def updateProfile(
    request: Request,
    instanceId: str,
    body: UpdateProfileRequest,
    context: RequestContext = Depends(getRequestContext),
):
    """Apply the non-None fields of the request body to the caller's profile.

    The profile is fetched through ``getOrCreateProfile`` first, then updated by id.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)
    ownerId = str(context.user.id)

    current = db.getOrCreateProfile(ownerId, mandateId, instanceId)
    changes = body.model_dump(exclude_none=True)
    return {"profile": db.updateProfile(current.get("id"), changes)}
|
|
|
|
|
|
# =========================================================================
|
|
# Export Endpoints (Iteration 2)
|
|
# =========================================================================
|
|
|
|
@router.get("/{instanceId}/modules/{moduleId}/export")
@limiter.limit("10/minute")
async def exportDossier(
    request: Request,
    instanceId: str,
    moduleId: str,
    format: str = "md",
    context: RequestContext = Depends(getRequestContext),
):
    """Export a module dossier as a file download.

    ``format="pdf"`` yields a PDF; every other value yields Markdown. The
    request is audited before rendering.

    Raises:
        HTTPException: 404 if the module is missing or not owned by the caller.
    """
    _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)
    ownerId = str(context.user.id)

    module = db.getModule(moduleId)
    if not module:
        raise HTTPException(status_code=404, detail=routeApiMsg("Module not found"))
    _validateOwnership(module, context)

    tasks = db.getTasks(moduleId, ownerId)
    scores = db.getScores(moduleId, ownerId)
    sessions = db.getSessions(moduleId, ownerId)

    from .serviceCommcoachExport import buildDossierMarkdown, renderDossierPdf

    _audit(context, "commcoach.export.requested", "TrainingModule", moduleId, f"format={format}")

    if format != "pdf":
        markdown = buildDossierMarkdown(module, sessions, tasks, scores)
        return Response(
            content=markdown,
            media_type="text/markdown",
            headers={"Content-Disposition": f'attachment; filename="dossier_{moduleId[:8]}.md"'},
        )

    pdfBytes = await renderDossierPdf(module, sessions, tasks, scores)
    return Response(
        content=pdfBytes,
        media_type="application/pdf",
        headers={"Content-Disposition": f'attachment; filename="dossier_{moduleId[:8]}.pdf"'},
    )
|
|
|
|
|
|
@router.get("/{instanceId}/sessions/{sessionId}/export")
@limiter.limit("10/minute")
async def exportSession(
    request: Request,
    instanceId: str,
    sessionId: str,
    format: str = "md",
    context: RequestContext = Depends(getRequestContext),
):
    """Export a single session as a file download.

    ``format="pdf"`` yields a PDF; every other value yields Markdown. Tasks and
    scores are loaded only when the session references a module. The request is
    audited before rendering.

    Raises:
        HTTPException: 404 if the session is missing or not owned by the caller.
    """
    _validateInstanceAccess(instanceId, context)
    db = _getInterface(context, instanceId)

    session = db.getSession(sessionId)
    if not session:
        raise HTTPException(status_code=404, detail=routeApiMsg("Session not found"))
    _validateOwnership(session, context)

    moduleId = session.get("moduleId")
    ownerId = str(context.user.id)
    messages = db.getMessages(sessionId)
    if moduleId:
        tasks = db.getTasks(moduleId, ownerId)
        scores = db.getScores(moduleId, ownerId)
    else:
        tasks = []
        scores = []

    from .serviceCommcoachExport import buildSessionMarkdown, renderSessionPdf

    _audit(context, "commcoach.export.requested", "CoachingSession", sessionId, f"format={format}")

    if format != "pdf":
        markdown = buildSessionMarkdown(session, messages, tasks, scores)
        return Response(
            content=markdown,
            media_type="text/markdown",
            headers={"Content-Disposition": f'attachment; filename="session_{sessionId[:8]}.md"'},
        )

    pdfBytes = await renderSessionPdf(session, messages, tasks, scores)
    return Response(
        content=pdfBytes,
        media_type="application/pdf",
        headers={"Content-Disposition": f'attachment; filename="session_{sessionId[:8]}.pdf"'},
    )
|
|
|
|
|
|
# =========================================================================
|
|
# Persona Endpoints (Iteration 2)
|
|
# =========================================================================
|
|
|
|
@router.get("/{instanceId}/personas")
@limiter.limit("60/minute")
async def listPersonas(
    request: Request,
    instanceId: str,
    pagination: Optional[str] = Query(None),
    mode: Optional[str] = Query(None, description="'filterValues' or 'ids'"),
    column: Optional[str] = Query(None, description="Column key for mode=filterValues"),
    context: RequestContext = Depends(getRequestContext),
):
    """List the personas of a feature instance.

    Behaviour depends on the query parameters:

    * ``mode="filterValues"`` delegates to ``handleFilterValuesInMemory`` and
      requires ``column`` (400 otherwise).
    * ``mode="ids"`` delegates to ``handleIdsInMemory``.
    * otherwise, when ``pagination`` (a JSON-encoded string) is given, the
      personas are filtered, sorted and paged in memory and returned together
      with pagination metadata.
    * without ``pagination`` all personas are returned and ``"pagination"`` is
      ``None``.

    Raises:
        HTTPException: 400 when ``column`` is missing for ``mode=filterValues``
            or when ``pagination`` cannot be parsed into ``PaginationParams``.
    """
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    allPersonas = interface.getAllPersonas(instanceId)

    if mode == "filterValues":
        from modules.routes.routeHelpers import handleFilterValuesInMemory
        if not column:
            raise HTTPException(status_code=400, detail=routeApiMsg("column parameter required"))
        return handleFilterValuesInMemory(allPersonas, column, pagination)
    if mode == "ids":
        from modules.routes.routeHelpers import handleIdsInMemory
        return handleIdsInMemory(allPersonas, pagination)

    if not pagination:
        return {"items": allPersonas, "pagination": None}

    import math
    from modules.datamodels.datamodelPagination import PaginationParams, PaginationMetadata, normalize_pagination_dict
    from modules.routes.routeHelpers import applyFiltersAndSort, paginateInMemory

    # `pagination` is client-supplied text: malformed JSON, a non-mapping payload
    # or values rejected by the model are a client error, not a server fault.
    # json.JSONDecodeError and pydantic's ValidationError both derive from ValueError.
    try:
        paginationDict = json.loads(pagination)
        paginationDict = normalize_pagination_dict(paginationDict)
        paginationParams = PaginationParams(**paginationDict)
    except (ValueError, TypeError) as err:
        raise HTTPException(status_code=400, detail=routeApiMsg("Invalid pagination parameter")) from err

    filtered = applyFiltersAndSort(allPersonas, paginationParams)
    pageItems, totalItems = paginateInMemory(filtered, paginationParams)

    totalPages = math.ceil(totalItems / paginationParams.pageSize) if totalItems > 0 else 0
    sortSpec = [s.model_dump() for s in paginationParams.sort] if paginationParams.sort else []
    metadata = PaginationMetadata(
        currentPage=paginationParams.page,
        pageSize=paginationParams.pageSize,
        totalItems=totalItems,
        totalPages=totalPages,
        sort=sortSpec,
        filters=paginationParams.filters,
    )
    return {"items": pageItems, "pagination": metadata.model_dump()}
|
|
|
|
|
|
@router.post("/{instanceId}/personas")
@limiter.limit("10/minute")
async def createPersona(
    request: Request,
    instanceId: str,
    body: CreatePersonaRequest,
    context: RequestContext = Depends(getRequestContext),
):
    """Create a persona of category ``custom`` for the calling user.

    The persona key is generated as ``custom_`` followed by the first eight
    characters of a random UUID. Returns the stored record under ``"persona"``.
    """
    # The instance-access check also yields the mandate id the persona is stored under.
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)

    persona = CoachingPersona(
        userId=str(context.user.id),
        mandateId=mandateId,
        instanceId=instanceId,
        key=f"custom_{str(uuid.uuid4())[:8]}",
        label=body.label,
        description=body.description,
        gender=body.gender,
        systemPromptOverride=body.systemPromptOverride,
        category="custom",
    )
    return {"persona": interface.createPersona(persona.model_dump())}
|
|
|
|
|
|
@router.put("/{instanceId}/personas/{personaId}")
@limiter.limit("10/minute")
async def updatePersonaRoute(
    request: Request,
    instanceId: str,
    personaId: str,
    body: UpdatePersonaRequest,
    context: RequestContext = Depends(getRequestContext),
):
    """Apply a partial update to an existing persona.

    Only fields that are not ``None`` in the request body are forwarded to the
    interface. Responds with 404 for an unknown persona and 403 for a persona
    whose category is ``builtin``.
    """
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)

    existing = interface.getPersona(personaId)
    if not existing:
        raise HTTPException(status_code=404, detail=routeApiMsg("Persona not found"))

    isBuiltin = existing.get("category") == "builtin"
    if isBuiltin:
        raise HTTPException(status_code=403, detail=routeApiMsg("Builtin personas cannot be edited"))

    _validateOwnership(existing, context)

    changes = body.model_dump(exclude_none=True)
    return {"persona": interface.updatePersona(personaId, changes)}
|
|
|
|
|
|
@router.delete("/{instanceId}/personas/{personaId}")
@limiter.limit("10/minute")
async def deletePersonaRoute(
    request: Request,
    instanceId: str,
    personaId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Delete a persona.

    Responds with 404 for an unknown persona and 403 for a persona whose
    category is ``builtin``. Returns ``{"deleted": True}`` on success.
    """
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)

    existing = interface.getPersona(personaId)
    if not existing:
        raise HTTPException(status_code=404, detail=routeApiMsg("Persona not found"))

    isBuiltin = existing.get("category") == "builtin"
    if isBuiltin:
        raise HTTPException(status_code=403, detail=routeApiMsg("Builtin personas cannot be deleted"))

    _validateOwnership(existing, context)

    interface.deletePersona(personaId)
    return {"deleted": True}
|
|
|
|
|
|
# =========================================================================
|
|
# Module-Persona Mapping Endpoints
|
|
# =========================================================================
|
|
|
|
@router.get("/{instanceId}/modules/{moduleId}/personas")
@limiter.limit("60/minute")
async def getModulePersonas(
    request: Request,
    instanceId: str,
    moduleId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Return the ids of the personas mapped to a module."""
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)

    # Each mapping record carries the persona id under "personaId"; only the ids are exposed.
    return {
        "personaIds": [entry["personaId"] for entry in interface.getModulePersonas(moduleId)],
    }
|
|
|
|
|
|
@router.put("/{instanceId}/modules/{moduleId}/personas")
@limiter.limit("20/minute")
async def setModulePersonas(
    request: Request,
    instanceId: str,
    moduleId: str,
    body: SetModulePersonasRequest,
    context: RequestContext = Depends(getRequestContext),
):
    """Replace the persona mapping of a module.

    Responds with 404 when the module does not exist. The module is passed
    through ``_validateOwnership`` before anything is written. Echoes the
    persona ids from the request body under ``"personaIds"``.
    """
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)

    module = interface.getModule(moduleId)
    if not module:
        raise HTTPException(status_code=404, detail=routeApiMsg("Module not found"))
    # Instance access alone is not enough for a write: apply the same ownership
    # gate the persona update/delete and export routes use on the fetched record.
    _validateOwnership(module, context)

    interface.setModulePersonas(moduleId, body.personaIds, instanceId)
    return {"personaIds": body.personaIds}
|
|
|
|
|
|
# =========================================================================
|
|
# Badge + Score History Endpoints (Iteration 2)
|
|
# =========================================================================
|
|
|
|
@router.get("/{instanceId}/badges")
@limiter.limit("60/minute")
async def listBadges(
    request: Request,
    instanceId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Return the calling user's badges for this instance."""
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    callerId = str(context.user.id)
    return {"badges": interface.getBadges(callerId, instanceId)}
|
|
|
|
|
|
@router.get("/{instanceId}/modules/{moduleId}/scores/history")
@limiter.limit("60/minute")
async def getScoreHistory(
    request: Request,
    instanceId: str,
    moduleId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Return the calling user's score history for a module."""
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    callerId = str(context.user.id)
    return {"history": interface.getScoreHistory(moduleId, callerId)}
|
|
|
|
|
|
# =========================================================================
|
|
# Backward-Compatibility Redirects (old /contexts/ paths → /modules/)
|
|
# =========================================================================
|
|
|
|
@router.get("/{instanceId}/contexts")
async def _redirectListContexts(instanceId: str, request: Request):
    """Redirect the legacy ``GET /contexts`` listing to ``/modules``, keeping the query string."""
    if request.query_params:
        suffix = f"?{request.query_params}"
    else:
        suffix = ""
    location = {"Location": f"/api/commcoach/{instanceId}/modules{suffix}"}
    return Response(status_code=301, headers=location)
|
|
|
|
|
|
@router.post("/{instanceId}/contexts")
async def _redirectCreateContext(instanceId: str, request: Request):
    """Redirect the legacy ``POST /contexts`` route to ``/modules``.

    Answers with 308 (Permanent Redirect) instead of 301: after a 301 a client
    is allowed to replay a POST as a GET and drop the request body, which would
    turn a create call into a list call. 308 requires the client to repeat the
    request with the same method and body.
    """
    return Response(status_code=308, headers={"Location": f"/api/commcoach/{instanceId}/modules"})
|
|
|
|
|
|
@router.get("/{instanceId}/contexts/{contextId}")
async def _redirectGetContext(instanceId: str, contextId: str, request: Request):
    """Redirect the legacy ``GET /contexts/{contextId}`` route to the matching module URL."""
    location = {"Location": f"/api/commcoach/{instanceId}/modules/{contextId}"}
    return Response(status_code=301, headers=location)
|
|
|
|
|
|
@router.put("/{instanceId}/contexts/{contextId}")
async def _redirectUpdateContext(instanceId: str, contextId: str, request: Request):
    """Redirect the legacy ``PUT /contexts/{contextId}`` route to the matching module URL."""
    location = {"Location": f"/api/commcoach/{instanceId}/modules/{contextId}"}
    return Response(status_code=301, headers=location)
|
|
|
|
|
|
@router.delete("/{instanceId}/contexts/{contextId}")
async def _redirectDeleteContext(instanceId: str, contextId: str, request: Request):
    """Redirect the legacy ``DELETE /contexts/{contextId}`` route to the matching module URL."""
    location = {"Location": f"/api/commcoach/{instanceId}/modules/{contextId}"}
    return Response(status_code=301, headers=location)
|