# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
CommCoach Service - Coaching Orchestration.
Manages the coaching pipeline: message processing, AI calls, scoring, task extraction.
"""

import re
import logging
import json
import asyncio
import base64
import html
from datetime import datetime
from typing import Optional, Dict, Any, List

from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum
from modules.shared.timeUtils import getIsoTimestamp

from .datamodelCommcoach import (
    CoachingMessage,
    CoachingMessageRole,
    CoachingMessageContentType,
    CoachingSessionStatus,
    CoachingTask,
    CoachingTaskPriority,
    CoachingScore,
    CoachingScoreTrend,
)
from . import serviceCommcoachAi as aiPrompts
from .serviceCommcoachAi import (
    COMPRESSION_MESSAGE_THRESHOLD,
    COMPRESSION_RECENT_COUNT,
    COMPRESSION_MAX_MESSAGES_FETCH,
    buildResumeGreetingPrompt,
)
from .serviceCommcoachContextRetrieval import (
    detectIntent,
    RetrievalIntent,
    buildSessionSummariesForPrompt,
    findSessionByDate,
    searchSessionsByTopic,
    _parseDateFromMessage,
    PREVIOUS_SESSION_SUMMARIES_COUNT,
    ROLLING_OVERVIEW_SESSION_THRESHOLD,
    ROLLING_OVERVIEW_EVERY_N_SESSIONS,
)

logger = logging.getLogger(__name__)


def _stripMarkdownForTts(text: str) -> str:
    """Strip markdown formatting so TTS reads clean speech text.

    Removes bold/italic markers, inline-code backticks, heading hashes, list
    bullets and numbering, keeps only the label of ``[label](url)`` links and
    collapses runs of three or more newlines into a single blank line.
    """
    t = text
    t = re.sub(r'\*\*(.+?)\*\*', r'\1', t)
    t = re.sub(r'\*(.+?)\*', r'\1', t)
    t = re.sub(r'__(.+?)__', r'\1', t)
    t = re.sub(r'_(.+?)_', r'\1', t)
    t = re.sub(r'`([^`]+)`', r'\1', t)
    t = re.sub(r'^#{1,6}\s*', '', t, flags=re.MULTILINE)
    t = re.sub(r'^\s*[-*+]\s+', '', t, flags=re.MULTILINE)
    t = re.sub(r'^\s*\d+\.\s+', '', t, flags=re.MULTILINE)
    t = re.sub(r'\[(.+?)\]\(.+?\)', r'\1', t)
    t = re.sub(r'\n{3,}', '\n\n', t)
    return t.strip()


def _extractAiText(aiResponse) -> Optional[str]:
    """Return the stripped content of a successful AI response, else None.

    A response counts as successful when it is truthy, reports
    ``errorCount == 0`` and carries non-blank ``content``. Guarding the
    content here avoids calling ``.strip()`` on ``None``.
    """
    if not aiResponse or aiResponse.errorCount != 0:
        return None
    content = aiResponse.content
    if not content:
        return None
    return content.strip() or None


# Session event queues for SSE streaming
_sessionEvents: Dict[str, asyncio.Queue] = {}


async def emitSessionEvent(sessionId: str, eventType: str, data: Any):
    """Emit an event to the session's SSE stream.

    The queue for ``sessionId`` is created on first use, so events emitted
    before a consumer attaches are buffered rather than dropped.
    """
    await getSessionEventQueue(sessionId).put({
        "type": eventType,
        "data": data,
        "timestamp": getIsoTimestamp(),
    })


def getSessionEventQueue(sessionId: str) -> asyncio.Queue:
    """Return the event queue for ``sessionId``, creating it if missing."""
    if sessionId not in _sessionEvents:
        _sessionEvents[sessionId] = asyncio.Queue()
    return _sessionEvents[sessionId]


def cleanupSessionEvents(sessionId: str):
    """Drop the event queue of ``sessionId``; a missing queue is ignored."""
    _sessionEvents.pop(sessionId, None)


CHUNK_WORD_SIZE = 4
CHUNK_DELAY_SECONDS = 0.05


async def _emitChunkedResponse(sessionId: str, createdMsg: Dict[str, Any], fullText: str):
    """Emit response as messageChunk events for progressive display, then the full message.

    The text is split on whitespace into groups of ``CHUNK_WORD_SIZE`` words.
    Each chunk event carries the chunk and the text accumulated so far, and is
    followed by a pause of ``CHUNK_DELAY_SECONDS``. The final ``message``
    event carries the unmodified ``fullText``.
    """
    msgId = createdMsg.get("id")
    words = fullText.split()
    emitted = ""
    for i in range(0, len(words), CHUNK_WORD_SIZE):
        chunk = " ".join(words[i:i + CHUNK_WORD_SIZE])
        emitted = f"{emitted} {chunk}".strip() if emitted else chunk
        await emitSessionEvent(sessionId, "messageChunk", {
            "id": msgId,
            "role": "assistant",
            "chunk": chunk,
            "accumulated": emitted,
        })
        await asyncio.sleep(CHUNK_DELAY_SECONDS)
    await emitSessionEvent(sessionId, "message", {
        "id": msgId,
        "role": "assistant",
        "content": fullText,
        "createdAt": createdMsg.get("createdAt"),
    })


def _resolvePersona(session: Optional[Dict[str, Any]], interface) -> Optional[Dict[str, Any]]:
    """Resolve persona data from session's personaId.

    Returns None when there is no session, no ``personaId`` or the lookup
    raises. Persona resolution is best-effort, so failures are logged and do
    not propagate.
    """
    if not session:
        return None
    personaId = session.get("personaId")
    if not personaId:
        return None
    try:
        return interface.getPersona(personaId)
    except Exception as e:
        logger.warning(f"Persona lookup failed for persona {personaId}: {e}")
        return None


def _getDocumentSummaries(contextId: str, userId: str, interface) -> Optional[List[str]]:
    """Get document summaries for context to include in the AI prompt.

    Considers at most the first five documents. Each entry is prefixed with
    the file name and uses the stored summary, falling back to the first 200
    characters of the extracted text. Returns None when nothing usable exists
    or the lookup raises (best-effort; the failure is logged).
    """
    try:
        docs = interface.getDocuments(contextId, userId)
        summaries = []
        for doc in docs[:5]:
            summary = doc.get("summary")
            if summary:
                summaries.append(f"[{doc.get('fileName', 'Dokument')}] {summary}")
            elif doc.get("extractedText"):
                summaries.append(f"[{doc.get('fileName', 'Dokument')}] {doc['extractedText'][:200]}...")
        return summaries if summaries else None
    except Exception as e:
        logger.warning(f"Document summaries unavailable for context {contextId}: {e}")
        return None


class CommcoachService:
    """Coaching orchestrator: processes messages, calls AI, extracts tasks and scores."""

    def __init__(self, currentUser: User, mandateId: str, instanceId: str):
        self.currentUser = currentUser
        self.mandateId = mandateId
        self.instanceId = instanceId
        self.userId = str(currentUser.id)

    async def _emitTtsAudio(
        self,
        sessionId: str,
        text: str,
        interface,
        failureLabel: str,
        languageCode: Optional[str] = None,
        voiceInterface=None,
    ):
        """Synthesize ``text`` and emit it as a base64 ``ttsAudio`` event.

        Best-effort: any failure is logged as ``"{failureLabel}: {error}"`` and
        swallowed so that a TTS problem never breaks the coaching flow.

        Args:
            sessionId: Session whose event queue receives the audio.
            text: Response text; markdown is stripped before synthesis.
            interface: Data interface used to read the user's profile.
            failureLabel: Prefix of the warning logged on failure.
            languageCode: Language for synthesis. When None, the profile's
                ``preferredLanguage`` is used, defaulting to ``"de-DE"``.
            voiceInterface: Existing voice interface to reuse. When None, one
                is created for the current user and mandate.
        """
        try:
            if voiceInterface is None:
                from modules.interfaces.interfaceVoiceObjects import getVoiceInterface
                voiceInterface = getVoiceInterface(self.currentUser, self.mandateId)
            profile = interface.getProfile(self.userId, self.instanceId)
            if languageCode is None:
                languageCode = profile.get("preferredLanguage", "de-DE") if profile else "de-DE"
            voiceName = profile.get("preferredVoice") if profile else None
            ttsResult = await voiceInterface.textToSpeech(
                text=_stripMarkdownForTts(text),
                languageCode=languageCode,
                voiceName=voiceName,
            )
            if not isinstance(ttsResult, dict):
                return
            audioBytes = ttsResult.get("audioContent")
            if not audioBytes:
                return
            # The audio content may arrive as bytes or as str; normalise to
            # bytes before base64-encoding it for the event payload.
            rawAudio = audioBytes if isinstance(audioBytes, bytes) else audioBytes.encode()
            audioB64 = base64.b64encode(rawAudio).decode()
            await emitSessionEvent(sessionId, "ttsAudio", {"audio": audioB64, "format": "mp3"})
        except Exception as e:
            logger.warning(f"{failureLabel}: {e}")

    async def processMessage(self, sessionId: str, contextId: str, userContent: str, interface) -> Dict[str, Any]:
        """
        Process a user message through the coaching pipeline:
        1. Store user message
        2. Build context with history
        3. Call AI for coaching response
        4. Store assistant message
        5. Emit SSE events

        Returns the stored assistant message. When the context is missing or
        the AI call raises, an ``error`` event followed by ``complete`` is
        emitted and the stored user message is returned instead.
        """
        # Store user message
        userMsg = CoachingMessage(
            sessionId=sessionId,
            contextId=contextId,
            userId=self.userId,
            role=CoachingMessageRole.USER,
            content=userContent,
            contentType=CoachingMessageContentType.TEXT,
        ).model_dump()
        createdUserMsg = interface.createMessage(userMsg)

        await emitSessionEvent(sessionId, "message", {
            "id": createdUserMsg.get("id"),
            "role": "user",
            "content": userContent,
            "createdAt": createdUserMsg.get("createdAt"),
        })

        # Build context
        context = interface.getContext(contextId)
        if not context:
            logger.error(f"Context {contextId} not found")
            # Terminate the stream explicitly, as processSessionOpening does,
            # so listeners are not left waiting for events that never come.
            await emitSessionEvent(sessionId, "error", {"message": "Context not found"})
            await emitSessionEvent(sessionId, "complete", {})
            return createdUserMsg

        messages = interface.getRecentMessages(sessionId, count=COMPRESSION_MAX_MESSAGES_FETCH)
        session = interface.getSession(sessionId)
        compressedSummary = session.get("compressedHistorySummary") if session else None
        compressedUpTo = session.get("compressedHistoryUpToMessageCount") if session else None

        earlierSummary, previousMessages = aiPrompts.prepareMessagesForPrompt(
            messages, compressedSummary, compressedUpTo
        )

        # No stored compression yet and the history is long: summarise the
        # older part once and persist it on the session.
        if earlierSummary is None and len(messages) > COMPRESSION_MESSAGE_THRESHOLD:
            toSummarizeCount = len(messages) - COMPRESSION_RECENT_COUNT
            if toSummarizeCount > 0:
                toSummarize = messages[:toSummarizeCount]
                try:
                    summaryPrompt = aiPrompts.buildEarlierConversationSummaryPrompt(toSummarize)
                    summaryResponse = await self._callAi(
                        "Du fasst Coaching-Gespraeche praezise zusammen.",
                        summaryPrompt
                    )
                    compressed = _extractAiText(summaryResponse)
                    if compressed:
                        earlierSummary = compressed
                        interface.updateSession(sessionId, {
                            "compressedHistorySummary": earlierSummary,
                            "compressedHistoryUpToMessageCount": toSummarizeCount,
                        })
                        previousMessages = messages[-COMPRESSION_RECENT_COUNT:]
                        logger.info(f"Session {sessionId}: Compressed history ({toSummarizeCount} msgs -> {len(earlierSummary)} chars)")
                except Exception as e:
                    logger.warning(f"History compression failed for session {sessionId}: {e}")
                    # Fall back to a bounded window of the most recent messages.
                    previousMessages = messages[-20:]

        tasks = interface.getTasks(contextId, self.userId)

        retrievalResult = await self._buildRetrievalContext(
            contextId, sessionId, userContent, context, interface
        )

        persona = _resolvePersona(session, interface)
        documentSummaries = _getDocumentSummaries(contextId, self.userId, interface)

        systemPrompt = aiPrompts.buildCoachingSystemPrompt(
            context,
            previousMessages,
            tasks,
            previousSessionSummaries=retrievalResult.get("previousSessionSummaries"),
            earlierSummary=earlierSummary,
            rollingOverview=retrievalResult.get("rollingOverview"),
            retrievedSession=retrievalResult.get("retrievedSession"),
            retrievedByTopic=retrievalResult.get("retrievedByTopic"),
            persona=persona,
            documentSummaries=documentSummaries,
        )

        if retrievalResult.get("intent") == RetrievalIntent.SUMMARIZE_ALL:
            systemPrompt += "\n\nWICHTIG: Der Benutzer moechte eine Gesamtzusammenfassung. Erstelle eine umfassende Zusammenfassung aller genannten Sessions und der aktuellen Session."

        # Call AI
        await emitSessionEvent(sessionId, "status", {"label": "Coach denkt nach..."})

        try:
            aiResponse = await self._callAi(systemPrompt, userContent)
        except Exception as e:
            logger.error(f"AI call failed for session {sessionId}: {e}")
            await emitSessionEvent(sessionId, "error", {"message": f"AI error: {str(e)}"})
            await emitSessionEvent(sessionId, "complete", {})
            return createdUserMsg

        # A failed or empty response yields the apology text instead of a crash.
        responseText = _extractAiText(aiResponse) or "Entschuldigung, ich konnte gerade nicht antworten. Bitte versuche es erneut."

        # Store assistant message
        assistantMsg = CoachingMessage(
            sessionId=sessionId,
            contextId=contextId,
            userId=self.userId,
            role=CoachingMessageRole.ASSISTANT,
            content=responseText,
            contentType=CoachingMessageContentType.TEXT,
        ).model_dump()
        createdAssistantMsg = interface.createMessage(assistantMsg)

        # Update session message count
        messages = interface.getMessages(sessionId)
        interface.updateSession(sessionId, {"messageCount": len(messages)})

        await _emitChunkedResponse(sessionId, createdAssistantMsg, responseText)

        if responseText:
            await self._emitTtsAudio(
                sessionId,
                responseText,
                interface,
                failureLabel=f"TTS failed for text message session {sessionId}",
            )

        await emitSessionEvent(sessionId, "complete", {})

        return createdAssistantMsg

    async def processSessionOpening(self, sessionId: str, contextId: str, interface) -> Dict[str, Any]:
        """
        Generate and stream the opening greeting for a new session.
        Emits status, message, and complete events to the session queue.

        Returns the stored opening message, or an empty dict when the context
        is missing or the AI call raises.
        """
        await emitSessionEvent(sessionId, "status", {"label": "Coach bereitet sich vor..."})

        context = interface.getContext(contextId)
        if not context:
            logger.error(f"Context {contextId} not found")
            await emitSessionEvent(sessionId, "error", {"message": "Context not found"})
            await emitSessionEvent(sessionId, "complete", {})
            return {}

        tasks = interface.getTasks(contextId, self.userId)
        previousMessages = []

        allSessions = interface.getSessions(contextId, self.userId)
        previousSessionSummaries = buildSessionSummariesForPrompt(
            allSessions, excludeSessionId=sessionId, limit=PREVIOUS_SESSION_SUMMARIES_COUNT
        )

        session = interface.getSession(sessionId)
        persona = _resolvePersona(session, interface)
        documentSummaries = _getDocumentSummaries(contextId, self.userId, interface)

        systemPrompt = aiPrompts.buildCoachingSystemPrompt(
            context,
            previousMessages,
            tasks,
            previousSessionSummaries=previousSessionSummaries,
            persona=persona,
            documentSummaries=documentSummaries,
        )

        # Any persona other than the default "coach" opens in its own role.
        if persona and persona.get("key") != "coach":
            personaLabel = persona.get("label", "Gespraechspartner")
            openingUserPrompt = f"Beginne das Gespraech in deiner Rolle als {personaLabel}. Stelle dich kurz vor und eroeffne die Situation gemaess deiner Rollenbeschreibung."
        else:
            openingUserPrompt = "Beginne die Coaching-Session mit einer kurzen Begruesssung, fasse in einem Satz zusammen wo wir stehen (falls vorherige Sessions), und stelle eine gezielte Einstiegsfrage zum Thema."

        try:
            aiResponse = await self._callAi(systemPrompt, openingUserPrompt)
        except Exception as e:
            logger.error(f"AI opening failed for session {sessionId}: {e}")
            await emitSessionEvent(sessionId, "error", {"message": f"AI error: {str(e)}"})
            await emitSessionEvent(sessionId, "complete", {})
            return {}

        openingContent = _extractAiText(aiResponse) or (
            f"Willkommen zur Coaching-Session zum Thema \"{context.get('title')}\". Was moechtest du heute besprechen?"
        )

        assistantMsg = CoachingMessage(
            sessionId=sessionId,
            contextId=contextId,
            userId=self.userId,
            role=CoachingMessageRole.ASSISTANT,
            content=openingContent,
            contentType=CoachingMessageContentType.TEXT,
        ).model_dump()
        createdMsg = interface.createMessage(assistantMsg)
        interface.updateSession(sessionId, {"messageCount": 1})

        await _emitChunkedResponse(sessionId, createdMsg, openingContent)

        if openingContent:
            await self._emitTtsAudio(
                sessionId,
                openingContent,
                interface,
                failureLabel="TTS failed for opening",
            )

        await emitSessionEvent(sessionId, "complete", {})
        logger.info(f"CommCoach session opening completed: {sessionId}")
        return createdMsg

    async def generateResumeGreeting(self, sessionId: str, contextId: str, messages: list, interface) -> str:
        """Generate a follow-up greeting when user returns to an active session.

        Raises:
            ValueError: The context does not exist.
            RuntimeError: The AI call reported errors or returned no content.
        """
        context = interface.getContext(contextId)
        if not context:
            raise ValueError(f"Context {contextId} not found for resume greeting")

        contextTitle = context.get("title", "Coaching")
        prompt = buildResumeGreetingPrompt(messages, contextTitle)
        aiResponse = await self._callAi(
            "Du bist ein freundlicher Coach. Antworte kurz und einladend.",
            prompt,
        )
        if not aiResponse or aiResponse.errorCount > 0 or not aiResponse.content:
            raise RuntimeError(f"AI resume greeting failed: {getattr(aiResponse, 'errorMessage', 'no content')}")
        return aiResponse.content.strip()

    async def processAudioMessage(self, sessionId: str, contextId: str, audioContent: bytes, language: str, interface) -> Dict[str, Any]:
        """Process an audio message: STT -> coaching pipeline -> TTS response.

        Returns the result of ``processMessage``, or an empty dict when no
        speech could be recognised (an ``error`` and a ``complete`` event are
        emitted in that case).
        """
        from modules.interfaces.interfaceVoiceObjects import getVoiceInterface

        await emitSessionEvent(sessionId, "status", {"label": "Sprache wird erkannt..."})

        voiceInterface = getVoiceInterface(self.currentUser, self.mandateId)
        sttResult = await voiceInterface.speechToText(
            audioContent=audioContent,
            language=language,
            skipFallbacks=True,
        )

        transcribedText = ""
        if sttResult and isinstance(sttResult, dict):
            # "text" may be present but None; normalise to an empty string.
            transcribedText = sttResult.get("text") or ""
        elif isinstance(sttResult, str):
            transcribedText = sttResult

        if not transcribedText.strip():
            sttError = sttResult.get("error", "Unbekannter Fehler") if isinstance(sttResult, dict) else "Unbekannter Fehler"
            msg = f"Sprache konnte nicht erkannt werden. ({sttError})"
            await emitSessionEvent(sessionId, "error", {"message": msg, "detail": sttError})
            await emitSessionEvent(sessionId, "complete", {})
            return {}

        # Process through normal pipeline
        result = await self.processMessage(sessionId, contextId, transcribedText, interface)

        # processMessage returns the *user* message when the pipeline failed.
        # Never read the user's own words back to them.
        role = result.get("role")
        roleValue = getattr(role, "value", role)
        isUserEcho = roleValue == CoachingMessageRole.USER.value

        # Generate TTS for the response
        # NOTE(review): processMessage already emits a "ttsAudio" event (in the
        # profile language) and "complete" before returning, so this second
        # synthesis in the spoken language looks redundant - confirm whether
        # the client relies on it before removing.
        assistantContent = result.get("content", "")
        if assistantContent and not isUserEcho:
            await emitSessionEvent(sessionId, "status", {"label": "Antwort wird gesprochen..."})
            await self._emitTtsAudio(
                sessionId,
                assistantContent,
                interface,
                failureLabel=f"TTS failed for session {sessionId}",
                languageCode=language,
                voiceInterface=voiceInterface,
            )

        return result

    async def completeSession(self, sessionId: str, interface) -> Dict[str, Any]:
        """
        Complete a session:
        1. Generate summary
        2. Extract tasks
        3. Generate scores
        4. Update context stats
        5. Send email summary

        Every AI-backed step is best-effort: a failure is logged and the
        remaining steps still run. Sessions with fewer than two messages are
        only marked completed. Returns the refreshed session, the session as
        loaded for the short-session case, or an empty dict when the session
        does not exist.
        """
        session = interface.getSession(sessionId)
        if not session:
            return {}

        contextId = session.get("contextId")
        context = interface.getContext(contextId) if contextId else None
        messages = interface.getMessages(sessionId)

        if len(messages) < 2:
            interface.updateSession(sessionId, {
                "status": CoachingSessionStatus.COMPLETED.value,
                "endedAt": getIsoTimestamp(),
            })
            return session

        # Generate summary
        # The context may be missing; fall back to a generic title instead of
        # failing the whole summary step on a None lookup.
        contextTitle = context.get("title", "Coaching") if context else "Coaching"
        try:
            summaryPrompt = aiPrompts.buildSummaryPrompt(messages, contextTitle)
            summaryResponse = await self._callAi("Du bist ein praeziser Zusammenfasser.", summaryPrompt)
            summary = _extractAiText(summaryResponse)
        except Exception as e:
            logger.warning(f"Summary generation failed: {e}")
            summary = None

        keyTopics = None
        if summary:
            try:
                keyTopicsPrompt = aiPrompts.buildKeyTopicsExtractionPrompt(summary, messages)
                keyTopicsResponse = await self._callAi(
                    "Du extrahierst Kernthemen aus Zusammenfassungen.",
                    keyTopicsPrompt
                )
                if keyTopicsResponse and keyTopicsResponse.errorCount == 0 and keyTopicsResponse.content:
                    parsed = aiPrompts.parseJsonResponse(keyTopicsResponse.content, [])
                    if isinstance(parsed, list) and parsed:
                        # Stored as a JSON string holding at most five topics.
                        keyTopics = json.dumps([str(t) for t in parsed[:5]])
            except Exception as e:
                logger.warning(f"Key topics extraction failed: {e}")

        # Extract tasks
        try:
            taskPrompt = aiPrompts.buildTaskExtractionPrompt(messages)
            taskResponse = await self._callAi("Du extrahierst Aufgaben aus Gespraechen.", taskPrompt)
            if taskResponse and taskResponse.errorCount == 0:
                extractedTasks = aiPrompts.parseJsonResponse(taskResponse.content, [])
                if isinstance(extractedTasks, list):
                    # At most three tasks are created per session.
                    for taskData in extractedTasks[:3]:
                        if isinstance(taskData, dict) and taskData.get("title"):
                            newTask = CoachingTask(
                                contextId=contextId,
                                sessionId=sessionId,
                                userId=self.userId,
                                mandateId=self.mandateId,
                                title=taskData["title"],
                                description=taskData.get("description"),
                                priority=taskData.get("priority", "medium"),
                            ).model_dump()
                            created = interface.createTask(newTask)
                            await emitSessionEvent(sessionId, "taskCreated", created)
        except Exception as e:
            logger.warning(f"Task extraction failed: {e}")

        # Generate scores
        competenceScore = None
        try:
            scorePrompt = aiPrompts.buildScoringPrompt(messages, context.get("category", "custom") if context else "custom")
            scoreResponse = await self._callAi("Du bewertest Kommunikationskompetenz.", scorePrompt)
            if scoreResponse and scoreResponse.errorCount == 0:
                scores = aiPrompts.parseJsonResponse(scoreResponse.content, [])
                if isinstance(scores, list):
                    scoreValues = []
                    for scoreData in scores:
                        if isinstance(scoreData, dict) and "dimension" in scoreData and "score" in scoreData:
                            newScore = CoachingScore(
                                contextId=contextId,
                                sessionId=sessionId,
                                userId=self.userId,
                                mandateId=self.mandateId,
                                dimension=scoreData["dimension"],
                                score=float(scoreData["score"]),
                                trend=scoreData.get("trend", "stable"),
                                evidence=scoreData.get("evidence"),
                            ).model_dump()
                            interface.createScore(newScore)
                            scoreValues.append(float(scoreData["score"]))
                            await emitSessionEvent(sessionId, "scoreUpdate", scoreData)

                    if scoreValues:
                        # Session competence score is the mean over all dimensions.
                        competenceScore = sum(scoreValues) / len(scoreValues)
        except Exception as e:
            logger.warning(f"Scoring failed: {e}")
            competenceScore = None

        # Generate insights
        try:
            insightPrompt = aiPrompts.buildInsightPrompt(messages, summary)
            insightResponse = await self._callAi("Du generierst kurze Coaching-Insights.", insightPrompt)
            if insightResponse and insightResponse.errorCount == 0 and insightResponse.content:
                insights = aiPrompts.parseJsonResponse(insightResponse.content, [])
                if isinstance(insights, list):
                    existingInsights = aiPrompts._parseJsonField(context.get("insights") if context else None, [])
                    for ins in insights[:3]:
                        insightText = ins.get("text", ins) if isinstance(ins, dict) else str(ins)
                        if insightText:
                            existingInsights.append({"text": insightText, "sessionId": sessionId, "createdAt": getIsoTimestamp()})
                            await emitSessionEvent(sessionId, "insightGenerated", {"text": insightText, "sessionId": sessionId})
                    if contextId and existingInsights:
                        # Only the ten most recent insights are kept on the context.
                        interface.updateContext(contextId, {"insights": json.dumps(existingInsights[-10:])})
        except Exception as e:
            logger.warning(f"Insight generation failed: {e}")

        # Calculate duration
        startedAt = session.get("startedAt", "")
        durationSeconds = 0
        if startedAt:
            try:
                start = datetime.fromisoformat(startedAt.replace("Z", "+00:00"))
                # Compare aware with aware and naive with naive timestamps.
                end = datetime.now(start.tzinfo) if start.tzinfo else datetime.now()
                durationSeconds = int((end - start).total_seconds())
            except Exception as e:
                logger.debug(f"Could not compute duration for session {sessionId}: {e}")

        # Update session
        sessionUpdates = {
            "status": CoachingSessionStatus.COMPLETED.value,
            "endedAt": getIsoTimestamp(),
            "summary": summary,
            "durationSeconds": durationSeconds,
            "messageCount": len(messages),
        }
        if competenceScore is not None:
            sessionUpdates["competenceScore"] = round(competenceScore, 1)
        if keyTopics is not None:
            sessionUpdates["keyTopics"] = keyTopics
        interface.updateSession(sessionId, sessionUpdates)

        # Update context stats
        if contextId:
            allSessions = interface.getSessions(contextId, self.userId)
            completedCount = sum(
                1 for s in allSessions
                if s.get("status") == CoachingSessionStatus.COMPLETED.value
            )
            interface.updateContext(contextId, {
                "sessionCount": completedCount,
                "lastSessionAt": getIsoTimestamp(),
            })

        # Update user profile streak
        self._updateStreak(interface)

        # Check and award badges
        try:
            from .serviceCommcoachGamification import checkAndAwardBadges
            updatedSession = interface.getSession(sessionId)
            newBadges = await checkAndAwardBadges(
                interface, self.userId, self.mandateId, self.instanceId, session=updatedSession
            )
            for badge in newBadges:
                await emitSessionEvent(sessionId, "badgeAwarded", badge)
        except Exception as e:
            logger.warning(f"Badge check failed: {e}")

        # Send email summary
        if summary:
            await self._sendSessionEmail(session, summary, interface)

        await emitSessionEvent(sessionId, "sessionState", {
            "status": "completed",
            "summary": summary,
            "competenceScore": competenceScore,
        })
        await emitSessionEvent(sessionId, "complete", {})

        return interface.getSession(sessionId)

    def _updateStreak(self, interface):
        """Update the user's streak in their profile.

        Streak rules, based on calendar days since ``lastSessionAt``:
        same day keeps the current streak (at least 1), the following day
        extends it by one, anything else (or no/unparseable timestamp) resets
        it to 1. ``totalSessions`` is always incremented. Failures are logged
        and swallowed.
        """
        try:
            profile = interface.getProfile(self.userId, self.instanceId)
            if not profile:
                profile = interface.getOrCreateProfile(self.userId, self.mandateId, self.instanceId)

            lastSessionAt = profile.get("lastSessionAt")
            # Counters may be stored as None; treat that like zero.
            currentStreak = profile.get("streakDays") or 0
            longestStreak = profile.get("longestStreak") or 0
            totalSessions = profile.get("totalSessions") or 0

            daysSinceLast = None
            if lastSessionAt:
                try:
                    lastDt = datetime.fromisoformat(lastSessionAt.replace("Z", "+00:00"))
                    # Take "today" in the timestamp's own timezone so the day
                    # difference is not skewed by the server's local offset.
                    now = datetime.now(lastDt.tzinfo) if lastDt.tzinfo else datetime.now()
                    daysSinceLast = (now.date() - lastDt.date()).days
                except (ValueError, TypeError, AttributeError) as e:
                    logger.debug(f"Unparseable lastSessionAt {lastSessionAt!r}: {e}")

            if daysSinceLast == 0:
                # Another session on the same day must not inflate the streak.
                newStreak = max(currentStreak, 1)
            elif daysSinceLast == 1:
                newStreak = currentStreak + 1
            else:
                newStreak = 1
            newLongest = max(longestStreak, newStreak)

            interface.updateProfile(profile.get("id"), {
                "streakDays": newStreak,
                "longestStreak": newLongest,
                "totalSessions": totalSessions + 1,
                "lastSessionAt": getIsoTimestamp(),
            })
        except Exception as e:
            logger.warning(f"Failed to update streak: {e}")

    async def _sendSessionEmail(self, session: Dict[str, Any], summary: str, interface):
        """Send session summary via email if enabled.

        Nothing is sent when the profile disables ``emailSummaryEnabled`` or
        the user has no e-mail address. On success the session is flagged with
        ``emailSent``. Failures are logged and swallowed.
        """
        try:
            profile = interface.getProfile(self.userId, self.instanceId)
            if profile and not profile.get("emailSummaryEnabled", True):
                return

            from modules.interfaces.interfaceMessaging import getInterface as getMessagingInterface
            from modules.interfaces.interfaceDbApp import getRootInterface

            rootInterface = getRootInterface()
            user = rootInterface.getUser(self.userId)
            if not user or not user.email:
                return

            messaging = getMessagingInterface()
            subject = f"Coaching-Session Zusammenfassung: {session.get('contextId', 'Session')}"
            # The summary is AI-generated from user input: escape it before
            # embedding it in HTML, then turn line breaks into <br> tags.
            # NOTE(review): the surrounding <p> markup is a minimal
            # reconstruction - confirm against the intended mail template.
            summaryHtml = html.escape(summary).replace("\n", "<br>")
            htmlMessage = (
                f"<p>{summaryHtml}</p>\n"
                "<p>Diese Zusammenfassung wurde automatisch erstellt.</p>\n"
            )
            messaging.send("email", user.email, subject, htmlMessage)

            interface.updateSession(session.get("id"), {"emailSent": True})
            logger.info(f"Session summary email sent to {user.email}")
        except Exception as e:
            logger.warning(f"Failed to send session email: {e}")

    async def _buildRetrievalContext(
        self,
        contextId: str,
        sessionId: str,
        userContent: str,
        context: Dict[str, Any],
        interface,
    ) -> Dict[str, Any]:
        """
        Build retrieval context based on user intent.
        Returns: previousSessionSummaries, rollingOverview, retrievedSession, retrievedByTopic, intent, sessionSummaries.

        Side effects: completed session dicts get a ``date`` entry
        (``DD.MM.YYYY``, or ``""`` when the timestamp cannot be parsed), and
        in the default branch a regenerated rolling overview is persisted on
        the context.
        """
        intent = detectIntent(userContent)
        allSessions = interface.getSessions(contextId, self.userId)
        completedSessions = [
            s for s in allSessions
            if s.get("status") == CoachingSessionStatus.COMPLETED.value
        ]
        for s in completedSessions:
            startedAt = s.get("startedAt") or s.get("createdAt") or ""
            if startedAt:
                try:
                    dt = datetime.fromisoformat(str(startedAt).replace("Z", "+00:00"))
                    s["date"] = dt.strftime("%d.%m.%Y")
                except Exception:
                    s["date"] = ""

        result = {
            "intent": intent,
            "previousSessionSummaries": [],
            "rollingOverview": None,
            "retrievedSession": None,
            "retrievedByTopic": None,
            "sessionSummaries": [],
        }

        ctx = interface.getContext(contextId)
        rollingOverview = ctx.get("rollingOverview") if ctx else None
        rollingUpTo = ctx.get("rollingOverviewUpToSessionCount") if ctx else None

        if intent == RetrievalIntent.SUMMARIZE_ALL:
            result["previousSessionSummaries"] = buildSessionSummariesForPrompt(
                allSessions, excludeSessionId=sessionId, limit=20
            )
            result["sessionSummaries"] = result["previousSessionSummaries"]
            if len(completedSessions) >= ROLLING_OVERVIEW_SESSION_THRESHOLD and rollingOverview:
                result["rollingOverview"] = rollingOverview

        elif intent == RetrievalIntent.RECALL_SESSION:
            targetDate = _parseDateFromMessage(userContent)
            retrieved = findSessionByDate(completedSessions, targetDate)
            if retrieved:
                result["retrievedSession"] = retrieved
                logger.info(f"Session recall: found session {retrieved.get('id')} for date {targetDate}")
            result["previousSessionSummaries"] = buildSessionSummariesForPrompt(
                allSessions, excludeSessionId=sessionId, limit=PREVIOUS_SESSION_SUMMARIES_COUNT
            )
            if rollingOverview:
                result["rollingOverview"] = rollingOverview

        elif intent == RetrievalIntent.RECALL_TOPIC:
            retrieved = searchSessionsByTopic(completedSessions, userContent)
            result["retrievedByTopic"] = retrieved
            if retrieved:
                logger.info(f"Topic recall: found {len(retrieved)} sessions for query")
            result["previousSessionSummaries"] = buildSessionSummariesForPrompt(
                allSessions, excludeSessionId=sessionId, limit=PREVIOUS_SESSION_SUMMARIES_COUNT
            )
            if rollingOverview:
                result["rollingOverview"] = rollingOverview

        else:
            result["previousSessionSummaries"] = buildSessionSummariesForPrompt(
                allSessions, excludeSessionId=sessionId, limit=PREVIOUS_SESSION_SUMMARIES_COUNT
            )
            if len(completedSessions) >= ROLLING_OVERVIEW_SESSION_THRESHOLD:
                # Reuse the stored overview while it lags by at most three
                # completed sessions; otherwise try to regenerate it.
                if rollingOverview and rollingUpTo is not None and rollingUpTo >= len(completedSessions) - 3:
                    result["rollingOverview"] = rollingOverview
                else:
                    try:
                        toSummarize = completedSessions[ROLLING_OVERVIEW_EVERY_N_SESSIONS:]
                        toSummarize = toSummarize[:ROLLING_OVERVIEW_EVERY_N_SESSIONS * 2]
                        if len(toSummarize) >= ROLLING_OVERVIEW_EVERY_N_SESSIONS:
                            summariesForOverview = buildSessionSummariesForPrompt(
                                toSummarize,
                                limit=len(toSummarize),
                            )
                            overviewPrompt = aiPrompts.buildRollingOverviewPrompt(
                                summariesForOverview, context.get("title", "Coaching")
                            )
                            overviewResponse = await self._callAi(
                                "Du fasst Coaching-Sessions kompakt zusammen.",
                                overviewPrompt
                            )
                            newOverview = _extractAiText(overviewResponse)
                            if newOverview:
                                interface.updateContext(contextId, {
                                    "rollingOverview": newOverview,
                                    "rollingOverviewUpToSessionCount": len(completedSessions),
                                })
                                result["rollingOverview"] = newOverview
                                logger.info(f"Context {contextId}: Rolling overview updated ({len(toSummarize)} sessions)")
                    except Exception as e:
                        logger.warning(f"Rolling overview failed for context {contextId}: {e}")

        return result

    async def _callAi(self, systemPrompt: str, userPrompt: str):
        """Call the AI service with the given prompts.

        ``systemPrompt`` is passed as the request context and ``userPrompt``
        as the prompt. Returns whatever ``AiService.callAi`` returns; callers
        in this class read ``errorCount`` and ``content`` from it.
        """
        from modules.services.serviceAi.mainServiceAi import AiService

        # Ad-hoc context object exposing the attributes set below.
        serviceContext = type('Ctx', (), {
            'user': self.currentUser,
            'mandateId': self.mandateId,
            'featureInstanceId': self.instanceId,
            'featureCode': 'commcoach',
        })()

        aiService = AiService(serviceCenter=serviceContext)
        await aiService.ensureAiObjectsInitialized()

        aiRequest = AiCallRequest(
            prompt=userPrompt,
            context=systemPrompt,
            options=AiCallOptions(
                operationType=OperationTypeEnum.DATA_ANALYSE,
                priority=PriorityEnum.QUALITY,
            )
        )

        return await aiService.callAi(aiRequest)