# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
CommCoach Service - Coaching Orchestration.
Manages the coaching pipeline: message processing, AI calls, scoring, task extraction.
"""

import re
import logging
import json
import asyncio
import base64
import html
from datetime import datetime
from typing import Optional, Dict, Any, List, Tuple

from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum
from modules.shared.timeUtils import getIsoTimestamp

from .datamodelCommcoach import (
    CoachingMessage, CoachingMessageRole, CoachingMessageContentType,
    CoachingSessionStatus, CoachingTask, CoachingTaskPriority,
    CoachingScore, CoachingScoreTrend,
)
from . import serviceCommcoachAi as aiPrompts
from .serviceCommcoachAi import (
    COMPRESSION_MESSAGE_THRESHOLD,
    COMPRESSION_RECENT_COUNT,
    COMPRESSION_MAX_MESSAGES_FETCH,
    buildResumeGreetingPrompt,
)
from .serviceCommcoachContextRetrieval import (
    detectIntent, RetrievalIntent, buildSessionSummariesForPrompt,
    findSessionByDate, searchSessionsByTopic, _parseDateFromMessage,
    PREVIOUS_SESSION_SUMMARIES_COUNT, ROLLING_OVERVIEW_SESSION_THRESHOLD,
    ROLLING_OVERVIEW_EVERY_N_SESSIONS,
)

logger = logging.getLogger(__name__)


def _stripMarkdownForTts(text: str) -> str:
    """Strip markdown formatting so TTS reads clean speech text."""
    t = text
    # Emphasis markers: bold before italic so "**x**" is not half-consumed.
    t = re.sub(r'\*\*(.+?)\*\*', r'\1', t)
    t = re.sub(r'\*(.+?)\*', r'\1', t)
    t = re.sub(r'__(.+?)__', r'\1', t)
    t = re.sub(r'_(.+?)_', r'\1', t)
    # Inline code: keep the content, drop the backticks.
    t = re.sub(r'`[^`]+`', lambda m: m.group(0)[1:-1], t)
    # Line-leading structure: headings, bullet markers, numbered-list markers.
    t = re.sub(r'^#{1,6}\s*', '', t, flags=re.MULTILINE)
    t = re.sub(r'^\s*[-*+]\s+', '', t, flags=re.MULTILINE)
    t = re.sub(r'^\s*\d+\.\s+', '', t, flags=re.MULTILINE)
    # Links: keep the label, drop the target.
    t = re.sub(r'\[(.+?)\]\(.+?\)', r'\1', t)
    # Collapse runs of three or more newlines to a single blank line.
    t = re.sub(r'\n{3,}', '\n\n', t)
    return t.strip()


# Session event queues for SSE streaming
_sessionEvents: Dict[str, asyncio.Queue] = {}


async def emitSessionEvent(sessionId: str, eventType: str, data: Any):
    """Emit an event to the session's SSE stream.

    The queue for ``sessionId`` is created on first use. Each event is a dict
    with ``type``, ``data`` and an ISO ``timestamp``.
    """
    await getSessionEventQueue(sessionId).put({
        "type": eventType,
        "data": data,
        "timestamp": getIsoTimestamp(),
    })


def getSessionEventQueue(sessionId: str) -> asyncio.Queue:
    """Return the event queue for ``sessionId``, creating it if missing."""
    if sessionId not in _sessionEvents:
        _sessionEvents[sessionId] = asyncio.Queue()
    return _sessionEvents[sessionId]


def cleanupSessionEvents(sessionId: str):
    """Drop the event queue for ``sessionId`` (no-op if none exists)."""
    _sessionEvents.pop(sessionId, None)


CHUNK_WORD_SIZE = 4
CHUNK_DELAY_SECONDS = 0.05


def _parseAiJsonResponse(rawText: str) -> Dict[str, Any]:
    """Parse the structured JSON response from AI.

    Strips optional markdown code fences. Always returns a dict: when the
    text is not valid JSON, or is valid JSON but not an object (list, string,
    number), the raw text is returned under ``text`` with empty ``speech``
    and ``documents`` so callers can rely on ``.get``.
    """
    text = rawText.strip()
    if text.startswith("```"):
        lines = text.split("\n")
        lines = lines[1:]  # drop the opening fence line (``` or ```json)
        if lines and lines[-1].strip() == "```":
            lines = lines[:-1]
        text = "\n".join(lines)

    fallback = {"text": rawText.strip(), "speech": "", "documents": []}
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        logger.warning(f"AI JSON parse failed, using raw text: {text[:200]}")
        return fallback
    if not isinstance(parsed, dict):
        logger.warning(f"AI JSON response is not an object, using raw text: {text[:200]}")
        return fallback
    return parsed


def _responseText(aiResponse) -> str:
    """Return the stripped content of a successful AI response, else ''.

    A response counts as successful when it is truthy and its ``errorCount``
    is 0. A missing/None ``content`` yields '' instead of raising.
    """
    if not aiResponse or aiResponse.errorCount != 0 or not aiResponse.content:
        return ""
    return aiResponse.content.strip()


def _secondsSince(startedAt: Any) -> int:
    """Return whole seconds elapsed since the ISO timestamp ``startedAt``.

    Returns 0 when ``startedAt`` is empty or cannot be parsed. "Now" is taken
    in the timestamp's own timezone when it has one, so aware and naive
    datetimes are never mixed.
    """
    if not startedAt:
        return 0
    try:
        start = datetime.fromisoformat(startedAt.replace("Z", "+00:00"))
        end = datetime.now(start.tzinfo) if start.tzinfo else datetime.now()
        return int((end - start).total_seconds())
    except (AttributeError, TypeError, ValueError):
        return 0


async def _generateAndEmitTts(sessionId: str, speechText: str, currentUser, mandateId: str, instanceId: str, interface):
    """Generate TTS audio from speech text and emit as SSE event.

    Best-effort: any failure is logged as a warning and swallowed so that a
    TTS problem never breaks the text response.
    """
    if not speechText:
        return
    try:
        from modules.interfaces.interfaceVoiceObjects import getVoiceInterface

        voiceInterface = getVoiceInterface(currentUser, mandateId)
        profile = interface.getProfile(str(currentUser.id), instanceId)
        language = profile.get("preferredLanguage", "de-DE") if profile else "de-DE"
        voiceName = profile.get("preferredVoice") if profile else None

        ttsResult = await voiceInterface.textToSpeech(
            text=_stripMarkdownForTts(speechText),
            languageCode=language,
            voiceName=voiceName,
        )
        if ttsResult and isinstance(ttsResult, dict):
            audioBytes = ttsResult.get("audioContent")
            if audioBytes:
                audioB64 = base64.b64encode(
                    audioBytes if isinstance(audioBytes, bytes) else audioBytes.encode()
                ).decode()
                await emitSessionEvent(sessionId, "ttsAudio", {"audio": audioB64, "format": "mp3"})
    except Exception as e:
        logger.warning(f"TTS failed for session {sessionId}: {e}")


async def _saveGeneratedDocument(doc: Dict[str, Any], contextId: str, userId: str, mandateId: str, instanceId: str, interface, sessionId: str):
    """Save a document generated by AI and emit SSE event.

    Best-effort: failures (including a malformed ``doc``) are logged and
    swallowed.
    """
    from .datamodelCommcoach import CoachingDocument
    try:
        # `or` (not a .get default) so an explicit null from the AI also falls back.
        title = doc.get("title") or "Dokument"
        content = doc.get("content") or ""
        docData = CoachingDocument(
            contextId=contextId,
            userId=userId,
            mandateId=mandateId,
            instanceId=instanceId,
            fileName=f"{title}.md",
            mimeType="text/markdown",
            fileSize=len(content.encode()),
            extractedText=content,
            summary=title,
        ).model_dump()
        created = interface.createDocument(docData)
        await emitSessionEvent(sessionId, "documentCreated", created)
    except Exception as e:
        logger.warning(f"Failed to save generated document: {e}")


async def _emitChunkedResponse(sessionId: str, createdMsg: Dict[str, Any], fullText: str):
    """Emit response as messageChunk events for progressive display, then the full message.

    Chunks are built from whitespace-split words, so ``accumulated`` does not
    preserve the original line breaks; the final ``message`` event carries the
    unmodified ``fullText``.
    """
    msgId = createdMsg.get("id")
    words = fullText.split()
    emitted = ""
    for i in range(0, len(words), CHUNK_WORD_SIZE):
        chunk = " ".join(words[i:i + CHUNK_WORD_SIZE])
        emitted = (emitted + " " + chunk).strip() if emitted else chunk
        await emitSessionEvent(sessionId, "messageChunk", {
            "id": msgId,
            "role": "assistant",
            "chunk": chunk,
            "accumulated": emitted,
        })
        await asyncio.sleep(CHUNK_DELAY_SECONDS)

    await emitSessionEvent(sessionId, "message", {
        "id": msgId,
        "role": "assistant",
        "content": fullText,
        "createdAt": createdMsg.get("createdAt"),
    })


def _resolvePersona(session: Optional[Dict[str, Any]], interface) -> Optional[Dict[str, Any]]:
    """Resolve persona data from session's personaId.

    Returns None when there is no session, no personaId, or the lookup fails.
    """
    if not session:
        return None
    personaId = session.get("personaId")
    if not personaId:
        return None
    try:
        return interface.getPersona(personaId)
    except Exception as e:
        logger.warning(f"Persona lookup failed for {personaId}: {e}")
        return None


def _getDocumentSummaries(contextId: str, userId: str, interface) -> Optional[List[str]]:
    """Get document summaries for context to include in the AI prompt.

    Considers at most the first five documents. Uses each document's summary,
    or the first 200 characters of its extracted text when no summary exists.
    Returns None when nothing usable is found or the lookup fails.
    """
    try:
        docs = interface.getDocuments(contextId, userId)
        summaries = []
        for doc in docs[:5]:
            summary = doc.get("summary")
            if summary:
                summaries.append(f"[{doc.get('fileName', 'Dokument')}] {summary}")
            elif doc.get("extractedText"):
                summaries.append(f"[{doc.get('fileName', 'Dokument')}] {doc['extractedText'][:200]}...")
        return summaries if summaries else None
    except Exception as e:
        logger.warning(f"Document summaries unavailable for context {contextId}: {e}")
        return None


class CommcoachService:
    """Coaching orchestrator: processes messages, calls AI, extracts tasks and scores."""

    def __init__(self, currentUser: User, mandateId: str, instanceId: str):
        self.currentUser = currentUser
        self.mandateId = mandateId
        self.instanceId = instanceId
        self.userId = str(currentUser.id)

    # ------------------------------------------------------------------
    # Shared helpers for the message pipeline
    # ------------------------------------------------------------------

    async def _prepareHistory(self, sessionId: str, session: Optional[Dict[str, Any]], messages: list, interface) -> Tuple[Optional[str], list]:
        """Split history into (summary of earlier messages, recent messages).

        Reuses the compressed summary stored on the session when the prompt
        helper accepts it. Otherwise, once the history exceeds
        COMPRESSION_MESSAGE_THRESHOLD, asks the AI to summarize everything but
        the last COMPRESSION_RECENT_COUNT messages and stores that summary on
        the session. Compression failures are logged and fall back to the
        last 20 messages.
        """
        compressedSummary = session.get("compressedHistorySummary") if session else None
        compressedUpTo = session.get("compressedHistoryUpToMessageCount") if session else None
        earlierSummary, previousMessages = aiPrompts.prepareMessagesForPrompt(
            messages, compressedSummary, compressedUpTo
        )

        if earlierSummary is not None or len(messages) <= COMPRESSION_MESSAGE_THRESHOLD:
            return earlierSummary, previousMessages
        toSummarizeCount = len(messages) - COMPRESSION_RECENT_COUNT
        if toSummarizeCount <= 0:
            return earlierSummary, previousMessages

        toSummarize = messages[:toSummarizeCount]
        try:
            summaryPrompt = aiPrompts.buildEarlierConversationSummaryPrompt(toSummarize)
            summaryResponse = await self._callAi(
                "Du fasst Coaching-Gespräche präzise zusammen.", summaryPrompt
            )
            if summaryResponse and summaryResponse.errorCount == 0 and summaryResponse.content:
                earlierSummary = summaryResponse.content.strip()
                interface.updateSession(sessionId, {
                    "compressedHistorySummary": earlierSummary,
                    "compressedHistoryUpToMessageCount": toSummarizeCount,
                })
                previousMessages = messages[-COMPRESSION_RECENT_COUNT:]
                logger.info(f"Session {sessionId}: Compressed history ({toSummarizeCount} msgs -> {len(earlierSummary)} chars)")
        except Exception as e:
            logger.warning(f"History compression failed for session {sessionId}: {e}")
            previousMessages = messages[-20:]
        return earlierSummary, previousMessages

    async def _persistAssistantReply(self, sessionId: str, contextId: str, parsed: Dict[str, Any], interface) -> Tuple[Dict[str, Any], str, str]:
        """Save AI-generated documents and store the assistant message.

        Returns (created message record, text content, speech content).
        Null/missing fields in ``parsed`` are normalized so a reply such as
        ``{"documents": null}`` does not raise.
        """
        textContent = parsed.get("text") or ""
        speechContent = parsed.get("speech") or ""
        documents = parsed.get("documents") or []
        if not isinstance(documents, list):
            documents = []

        for doc in documents:
            await _saveGeneratedDocument(doc, contextId, self.userId, self.mandateId, self.instanceId, interface, sessionId)

        assistantMsg = CoachingMessage(
            sessionId=sessionId,
            contextId=contextId,
            userId=self.userId,
            role=CoachingMessageRole.ASSISTANT,
            content=textContent,
            contentType=CoachingMessageContentType.TEXT,
        ).model_dump()
        createdMsg = interface.createMessage(assistantMsg)
        return createdMsg, textContent, speechContent

    async def _streamAssistantReply(self, sessionId: str, createdMsg: Dict[str, Any], textContent: str, speechContent: str, interface):
        """Stream the reply as chunks while TTS runs concurrently, then emit 'complete'."""
        ttsTask = asyncio.create_task(
            _generateAndEmitTts(sessionId, speechContent, self.currentUser, self.mandateId, self.instanceId, interface)
        )
        await _emitChunkedResponse(sessionId, createdMsg, textContent)
        await ttsTask
        await emitSessionEvent(sessionId, "complete", {})

    # ------------------------------------------------------------------
    # Public pipeline entry points
    # ------------------------------------------------------------------

    async def processMessage(self, sessionId: str, contextId: str, userContent: str, interface) -> Dict[str, Any]:
        """
        Process a user message through the coaching pipeline:
        1. Store user message
        2. Build context with history
        3. Call AI for coaching response
        4. Store assistant message
        5. Emit SSE events

        Returns the created assistant message, or the created user message
        when the context is missing or the AI call raises.
        """
        # NOTE(review): this import is not referenced in this method — kept in
        # case it is needed for import side effects; confirm and remove if not.
        from . import interfaceFeatureCommcoach as interfaceDb  # noqa: F401

        # Store user message
        userMsg = CoachingMessage(
            sessionId=sessionId,
            contextId=contextId,
            userId=self.userId,
            role=CoachingMessageRole.USER,
            content=userContent,
            contentType=CoachingMessageContentType.TEXT,
        ).model_dump()
        createdUserMsg = interface.createMessage(userMsg)

        await emitSessionEvent(sessionId, "message", {
            "id": createdUserMsg.get("id"),
            "role": "user",
            "content": userContent,
            "createdAt": createdUserMsg.get("createdAt"),
        })

        # Build context
        context = interface.getContext(contextId)
        if not context:
            logger.error(f"Context {contextId} not found")
            return createdUserMsg

        messages = interface.getRecentMessages(sessionId, count=COMPRESSION_MAX_MESSAGES_FETCH)
        session = interface.getSession(sessionId)
        earlierSummary, previousMessages = await self._prepareHistory(sessionId, session, messages, interface)

        tasks = interface.getTasks(contextId, self.userId)

        retrievalResult = await self._buildRetrievalContext(
            contextId, sessionId, userContent, context, interface
        )

        persona = _resolvePersona(session, interface)
        documentSummaries = _getDocumentSummaries(contextId, self.userId, interface)

        systemPrompt = aiPrompts.buildCoachingSystemPrompt(
            context,
            previousMessages,
            tasks,
            previousSessionSummaries=retrievalResult.get("previousSessionSummaries"),
            earlierSummary=earlierSummary,
            rollingOverview=retrievalResult.get("rollingOverview"),
            retrievedSession=retrievalResult.get("retrievedSession"),
            retrievedByTopic=retrievalResult.get("retrievedByTopic"),
            persona=persona,
            documentSummaries=documentSummaries,
        )

        if retrievalResult.get("intent") == RetrievalIntent.SUMMARIZE_ALL:
            systemPrompt += "\n\nWICHTIG: Der Benutzer möchte eine Gesamtzusammenfassung. Erstelle eine umfassende Zusammenfassung aller genannten Sessions und der aktuellen Session."

        # Call AI
        await emitSessionEvent(sessionId, "status", {"label": "Coach denkt nach..."})

        try:
            aiResponse = await self._callAi(systemPrompt, userContent)
        except Exception as e:
            logger.error(f"AI call failed for session {sessionId}: {e}")
            await emitSessionEvent(sessionId, "error", {"message": f"AI error: {str(e)}"})
            return createdUserMsg

        responseRaw = _responseText(aiResponse)
        if not responseRaw:
            parsed = {"text": "Entschuldigung, ich konnte gerade nicht antworten. Bitte versuche es erneut.", "speech": "", "documents": []}
        else:
            parsed = _parseAiJsonResponse(responseRaw)

        createdAssistantMsg, textContent, speechContent = await self._persistAssistantReply(
            sessionId, contextId, parsed, interface
        )

        # Refresh the stored message count from the full (not just recent) history.
        messages = interface.getMessages(sessionId)
        interface.updateSession(sessionId, {"messageCount": len(messages)})

        await self._streamAssistantReply(sessionId, createdAssistantMsg, textContent, speechContent, interface)
        return createdAssistantMsg

    async def processSessionOpening(self, sessionId: str, contextId: str, interface) -> Dict[str, Any]:
        """
        Generate and stream the opening greeting for a new session.
        Emits status, message, and complete events to the session queue.

        Returns the created assistant message, or {} when the context is
        missing or the AI call raises (an error event is emitted first).
        """
        await emitSessionEvent(sessionId, "status", {"label": "Coach bereitet sich vor..."})

        context = interface.getContext(contextId)
        if not context:
            logger.error(f"Context {contextId} not found")
            await emitSessionEvent(sessionId, "error", {"message": "Context not found"})
            await emitSessionEvent(sessionId, "complete", {})
            return {}

        tasks = interface.getTasks(contextId, self.userId)
        previousMessages = []

        allSessions = interface.getSessions(contextId, self.userId)
        previousSessionSummaries = buildSessionSummariesForPrompt(
            allSessions, excludeSessionId=sessionId, limit=PREVIOUS_SESSION_SUMMARIES_COUNT
        )

        session = interface.getSession(sessionId)
        persona = _resolvePersona(session, interface)
        documentSummaries = _getDocumentSummaries(contextId, self.userId, interface)

        systemPrompt = aiPrompts.buildCoachingSystemPrompt(
            context,
            previousMessages,
            tasks,
            previousSessionSummaries=previousSessionSummaries,
            persona=persona,
            documentSummaries=documentSummaries,
        )

        isFirstSession = not previousSessionSummaries

        if persona and persona.get("key") != "coach":
            personaLabel = persona.get("label", "Gesprächspartner")
            openingUserPrompt = f"Beginne das Gespräch in deiner Rolle als {personaLabel}. Stelle dich kurz vor und eröffne die Situation gemäss deiner Rollenbeschreibung."
        elif isFirstSession:
            openingUserPrompt = "Dies ist die ERSTE Session zu diesem Thema. Begrüsse den Benutzer, stelle das Thema kurz vor und stelle eine offene Einstiegsfrage. Erfinde KEINE vorherigen Gespräche oder Zusammenfassungen."
        else:
            openingUserPrompt = "Begrüsse den Benutzer zurück, fasse in einem Satz zusammen wo wir stehen, und stelle eine gezielte Einstiegsfrage."

        try:
            aiResponse = await self._callAi(systemPrompt, openingUserPrompt)
        except Exception as e:
            logger.error(f"AI opening failed for session {sessionId}: {e}")
            await emitSessionEvent(sessionId, "error", {"message": f"AI error: {str(e)}"})
            await emitSessionEvent(sessionId, "complete", {})
            return {}

        responseRaw = _responseText(aiResponse)
        if not responseRaw:
            parsed = {"text": f"Willkommen zur Coaching-Session zum Thema \"{context.get('title')}\". Was möchtest du heute besprechen?", "speech": "", "documents": []}
        else:
            parsed = _parseAiJsonResponse(responseRaw)

        createdMsg, textContent, speechContent = await self._persistAssistantReply(
            sessionId, contextId, parsed, interface
        )
        interface.updateSession(sessionId, {"messageCount": 1})

        await self._streamAssistantReply(sessionId, createdMsg, textContent, speechContent, interface)
        logger.info(f"CommCoach session opening completed: {sessionId}")
        return createdMsg

    async def generateResumeGreeting(self, sessionId: str, contextId: str, messages: list, interface) -> str:
        """Generate a follow-up greeting when user returns to an active session.

        Raises:
            ValueError: the context does not exist.
            RuntimeError: the AI call returned errors or no content.
        """
        context = interface.getContext(contextId)
        if not context:
            raise ValueError(f"Context {contextId} not found for resume greeting")

        contextTitle = context.get("title", "Coaching")
        prompt = buildResumeGreetingPrompt(messages, contextTitle)

        aiResponse = await self._callAi(
            "Du bist ein freundlicher Coach. Antworte kurz und einladend.",
            prompt,
        )
        if not aiResponse or aiResponse.errorCount > 0 or not aiResponse.content:
            raise RuntimeError(f"AI resume greeting failed: {getattr(aiResponse, 'errorMessage', 'no content')}")
        return aiResponse.content.strip()

    async def processAudioMessage(self, sessionId: str, contextId: str, audioContent: bytes, language: str, interface) -> Dict[str, Any]:
        """Process an audio message: STT -> coaching pipeline -> TTS response.

        Returns the result of ``processMessage`` for the transcribed text, or
        {} (after emitting an error event) when nothing was recognized.
        """
        from modules.interfaces.interfaceVoiceObjects import getVoiceInterface

        await emitSessionEvent(sessionId, "status", {"label": "Sprache wird erkannt..."})

        voiceInterface = getVoiceInterface(self.currentUser, self.mandateId)
        sttResult = await voiceInterface.speechToText(
            audioContent=audioContent,
            language=language,
            skipFallbacks=True,
        )

        transcribedText = ""
        if sttResult and isinstance(sttResult, dict):
            # `or ""` guards against an explicit null text in the STT result.
            transcribedText = sttResult.get("text") or ""
        elif isinstance(sttResult, str):
            transcribedText = sttResult

        if not transcribedText.strip():
            sttError = sttResult.get("error", "Unbekannter Fehler") if isinstance(sttResult, dict) else "Unbekannter Fehler"
            msg = f"Sprache konnte nicht erkannt werden. ({sttError})"
            await emitSessionEvent(sessionId, "error", {"message": msg, "detail": sttError})
            return {}

        result = await self.processMessage(sessionId, contextId, transcribedText, interface)
        return result

    # ------------------------------------------------------------------
    # Session completion
    # ------------------------------------------------------------------

    async def completeSession(self, sessionId: str, interface) -> Dict[str, Any]:
        """
        Complete a session:
        1. Generate summary
        2. Extract tasks
        3. Generate scores
        4. Update context stats
        5. Send email summary

        Each AI-backed step is best-effort: a failure is logged and the
        remaining steps still run. Sessions with fewer than two messages are
        only marked completed. Returns {} when the session does not exist.
        """
        session = interface.getSession(sessionId)
        if not session:
            return {}

        contextId = session.get("contextId")
        context = interface.getContext(contextId) if contextId else None
        messages = interface.getMessages(sessionId)

        if len(messages) < 2:
            interface.updateSession(sessionId, {
                "status": CoachingSessionStatus.COMPLETED.value,
                "endedAt": getIsoTimestamp(),
            })
            return session

        summary = await self._generateSummary(messages, context)
        keyTopics = await self._extractKeyTopics(summary, messages) if summary else None
        await self._extractTasks(sessionId, contextId, messages, interface)
        competenceScore = await self._generateScores(sessionId, contextId, messages, context, interface)
        await self._generateInsights(sessionId, contextId, messages, summary, context, interface)

        # Calculate duration
        durationSeconds = _secondsSince(session.get("startedAt", ""))

        # Update session
        sessionUpdates = {
            "status": CoachingSessionStatus.COMPLETED.value,
            "endedAt": getIsoTimestamp(),
            "summary": summary,
            "durationSeconds": durationSeconds,
            "messageCount": len(messages),
        }
        if competenceScore is not None:
            sessionUpdates["competenceScore"] = round(competenceScore, 1)
        if keyTopics is not None:
            sessionUpdates["keyTopics"] = keyTopics
        interface.updateSession(sessionId, sessionUpdates)

        # Update context stats
        if contextId:
            allSessions = interface.getSessions(contextId, self.userId)
            completedCount = len([s for s in allSessions if s.get("status") == CoachingSessionStatus.COMPLETED.value])
            interface.updateContext(contextId, {
                "sessionCount": completedCount,
                "lastSessionAt": getIsoTimestamp(),
            })

        # Update user profile streak
        self._updateStreak(interface)

        # Check and award badges
        try:
            from .serviceCommcoachGamification import checkAndAwardBadges
            updatedSession = interface.getSession(sessionId)
            newBadges = await checkAndAwardBadges(
                interface, self.userId, self.mandateId, self.instanceId, session=updatedSession
            )
            for badge in newBadges:
                await emitSessionEvent(sessionId, "badgeAwarded", badge)
        except Exception as e:
            logger.warning(f"Badge check failed: {e}")

        # Send email summary
        if summary:
            await self._sendSessionEmail(session, summary, interface)

        await emitSessionEvent(sessionId, "sessionState", {
            "status": "completed",
            "summary": summary,
            "competenceScore": competenceScore,
        })
        await emitSessionEvent(sessionId, "complete", {})

        return interface.getSession(sessionId)

    async def _generateSummary(self, messages: list, context: Optional[Dict[str, Any]]) -> Optional[str]:
        """Ask the AI for a session summary; None on failure.

        A missing context falls back to the title "Coaching" instead of
        aborting the summary.
        """
        try:
            contextTitle = context.get("title", "Coaching") if context else "Coaching"
            summaryPrompt = aiPrompts.buildSummaryPrompt(messages, contextTitle)
            summaryResponse = await self._callAi("Du bist ein präziser Zusammenfasser.", summaryPrompt)
            if summaryResponse and summaryResponse.errorCount == 0 and summaryResponse.content:
                return summaryResponse.content.strip()
            return None
        except Exception as e:
            logger.warning(f"Summary generation failed: {e}")
            return None

    async def _extractKeyTopics(self, summary: str, messages: list) -> Optional[str]:
        """Extract up to five key topics as a JSON-encoded list of strings; None on failure."""
        try:
            keyTopicsPrompt = aiPrompts.buildKeyTopicsExtractionPrompt(summary, messages)
            keyTopicsResponse = await self._callAi(
                "Du extrahierst Kernthemen aus Zusammenfassungen.", keyTopicsPrompt
            )
            if keyTopicsResponse and keyTopicsResponse.errorCount == 0 and keyTopicsResponse.content:
                parsed = aiPrompts.parseJsonResponse(keyTopicsResponse.content, [])
                if isinstance(parsed, list) and parsed:
                    return json.dumps([str(t) for t in parsed[:5]])
        except Exception as e:
            logger.warning(f"Key topics extraction failed: {e}")
        return None

    async def _extractTasks(self, sessionId: str, contextId: str, messages: list, interface):
        """Create up to three tasks extracted by the AI and emit a taskCreated event for each."""
        try:
            taskPrompt = aiPrompts.buildTaskExtractionPrompt(messages)
            taskResponse = await self._callAi("Du extrahierst Aufgaben aus Gesprächen.", taskPrompt)
            if not taskResponse or taskResponse.errorCount != 0:
                return
            extractedTasks = aiPrompts.parseJsonResponse(taskResponse.content, [])
            if not isinstance(extractedTasks, list):
                return
            for taskData in extractedTasks[:3]:
                if isinstance(taskData, dict) and taskData.get("title"):
                    newTask = CoachingTask(
                        contextId=contextId,
                        sessionId=sessionId,
                        userId=self.userId,
                        mandateId=self.mandateId,
                        title=taskData["title"],
                        description=taskData.get("description"),
                        priority=taskData.get("priority", "medium"),
                    ).model_dump()
                    created = interface.createTask(newTask)
                    await emitSessionEvent(sessionId, "taskCreated", created)
        except Exception as e:
            logger.warning(f"Task extraction failed: {e}")

    async def _generateScores(self, sessionId: str, contextId: str, messages: list, context: Optional[Dict[str, Any]], interface) -> Optional[float]:
        """Store per-dimension scores and return their mean; None when unavailable.

        Any exception (including a non-numeric score) aborts scoring and
        returns None; scores stored before the failure are not rolled back.
        """
        try:
            scorePrompt = aiPrompts.buildScoringPrompt(messages, context.get("category", "custom") if context else "custom")
            scoreResponse = await self._callAi("Du bewertest Kommunikationskompetenz.", scorePrompt)
            if not scoreResponse or scoreResponse.errorCount != 0:
                return None
            scores = aiPrompts.parseJsonResponse(scoreResponse.content, [])
            if not isinstance(scores, list):
                return None

            scoreValues = []
            for scoreData in scores:
                if isinstance(scoreData, dict) and "dimension" in scoreData and "score" in scoreData:
                    scoreValue = float(scoreData["score"])
                    newScore = CoachingScore(
                        contextId=contextId,
                        sessionId=sessionId,
                        userId=self.userId,
                        mandateId=self.mandateId,
                        dimension=scoreData["dimension"],
                        score=scoreValue,
                        trend=scoreData.get("trend", "stable"),
                        evidence=scoreData.get("evidence"),
                    ).model_dump()
                    interface.createScore(newScore)
                    scoreValues.append(scoreValue)
                    await emitSessionEvent(sessionId, "scoreUpdate", scoreData)

            return sum(scoreValues) / len(scoreValues) if scoreValues else None
        except Exception as e:
            logger.warning(f"Scoring failed: {e}")
            return None

    async def _generateInsights(self, sessionId: str, contextId: str, messages: list, summary: Optional[str], context: Optional[Dict[str, Any]], interface):
        """Append up to three new insights to the context, keeping only the latest ten."""
        try:
            insightPrompt = aiPrompts.buildInsightPrompt(messages, summary)
            insightResponse = await self._callAi("Du generierst kurze Coaching-Insights.", insightPrompt)
            if not (insightResponse and insightResponse.errorCount == 0 and insightResponse.content):
                return
            insights = aiPrompts.parseJsonResponse(insightResponse.content, [])
            if not isinstance(insights, list):
                return

            existingInsights = aiPrompts._parseJsonField(context.get("insights") if context else None, [])
            for ins in insights[:3]:
                insightText = ins.get("text", ins) if isinstance(ins, dict) else str(ins)
                if insightText:
                    existingInsights.append({"text": insightText, "sessionId": sessionId, "createdAt": getIsoTimestamp()})
                    await emitSessionEvent(sessionId, "insightGenerated", {"text": insightText, "sessionId": sessionId})
            if contextId and existingInsights:
                interface.updateContext(contextId, {"insights": json.dumps(existingInsights[-10:])})
        except Exception as e:
            logger.warning(f"Insight generation failed: {e}")

    def _updateStreak(self, interface):
        """Update the user's streak in their profile.

        The streak counts consecutive days: a session on the day after the
        last one extends it, further sessions on the same day leave it
        unchanged, and any longer gap (or an unknown/unparseable last date)
        restarts it at 1. ``totalSessions`` is always incremented.
        Best-effort: failures are logged and swallowed.
        """
        try:
            profile = interface.getProfile(self.userId, self.instanceId)
            if not profile:
                profile = interface.getOrCreateProfile(self.userId, self.mandateId, self.instanceId)

            lastSessionAt = profile.get("lastSessionAt")
            currentStreak = profile.get("streakDays", 0) or 0
            longestStreak = profile.get("longestStreak", 0) or 0
            totalSessions = profile.get("totalSessions", 0) or 0

            dayDiff = None
            if lastSessionAt:
                try:
                    lastDt = datetime.fromisoformat(lastSessionAt.replace("Z", "+00:00"))
                    # Compare calendar days in the stored timestamp's timezone so a
                    # UTC timestamp is not measured against the server's local date.
                    now = datetime.now(lastDt.tzinfo) if lastDt.tzinfo else datetime.now()
                    dayDiff = (now.date() - lastDt.date()).days
                except (AttributeError, TypeError, ValueError):
                    dayDiff = None

            if dayDiff == 0:
                # Same day: maintain the streak (at least 1), do not count the day twice.
                newStreak = max(currentStreak, 1)
            elif dayDiff == 1:
                newStreak = currentStreak + 1
            else:
                newStreak = 1
            newLongest = max(longestStreak, newStreak)

            interface.updateProfile(profile.get("id"), {
                "streakDays": newStreak,
                "longestStreak": newLongest,
                "totalSessions": totalSessions + 1,
                "lastSessionAt": getIsoTimestamp(),
            })
        except Exception as e:
            logger.warning(f"Failed to update streak: {e}")

    async def _sendSessionEmail(self, session: Dict[str, Any], summary: str, interface):
        """Send session summary via email if enabled.

        Skipped when the profile has ``emailSummaryEnabled`` set to a falsy
        value or the user has no email address. Best-effort: failures are
        logged and swallowed.
        """
        try:
            profile = interface.getProfile(self.userId, self.instanceId)
            if profile and not profile.get("emailSummaryEnabled", True):
                return

            from modules.interfaces.interfaceMessaging import getInterface as getMessagingInterface
            from modules.interfaces.interfaceDbApp import getRootInterface

            rootInterface = getRootInterface()
            user = rootInterface.getUser(self.userId)
            if not user or not user.email:
                return

            messaging = getMessagingInterface()
            subject = f"Coaching-Session Zusammenfassung: {session.get('contextId', 'Session')}"
            # The summary is AI-generated from user input: escape it before
            # embedding in HTML, then turn line breaks into <br>.
            safeSummary = html.escape(summary).replace(chr(10), "<br>")
            # NOTE(review): the original markup of this template was not
            # recoverable; the tags below are a reconstruction — confirm
            # against the intended email layout.
            htmlMessage = f"""
<html>
<body>
<h2>Coaching-Session Zusammenfassung</h2>
<p>{safeSummary}</p>
<hr>
<p><small>Diese Zusammenfassung wurde automatisch erstellt.</small></p>
</body>
</html>
"""
            messaging.send("email", user.email, subject, htmlMessage)
            interface.updateSession(session.get("id"), {"emailSent": True})
            logger.info(f"Session summary email sent to {user.email}")
        except Exception as e:
            logger.warning(f"Failed to send session email: {e}")

    async def _buildRetrievalContext(
        self,
        contextId: str,
        sessionId: str,
        userContent: str,
        context: Dict[str, Any],
        interface,
    ) -> Dict[str, Any]:
        """
        Build retrieval context based on user intent.
        Returns: previousSessionSummaries, rollingOverview, retrievedSession, retrievedByTopic, intent, sessionSummaries.

        For the default intent, a rolling overview is (re)generated via the AI
        once enough sessions are completed and the stored overview is missing
        or more than three sessions behind; failures there are logged only.
        """
        intent = detectIntent(userContent)
        allSessions = interface.getSessions(contextId, self.userId)
        completedSessions = [s for s in allSessions if s.get("status") == CoachingSessionStatus.COMPLETED.value]

        # Annotate completed sessions with a dd.mm.yyyy "date" label
        # (presumably consumed by the retrieval helpers — verify there).
        for s in completedSessions:
            startedAt = s.get("startedAt") or s.get("createdAt") or ""
            if startedAt:
                try:
                    dt = datetime.fromisoformat(str(startedAt).replace("Z", "+00:00"))
                    s["date"] = dt.strftime("%d.%m.%Y")
                except (TypeError, ValueError):
                    s["date"] = ""

        result = {
            "intent": intent,
            "previousSessionSummaries": [],
            "rollingOverview": None,
            "retrievedSession": None,
            "retrievedByTopic": None,
            "sessionSummaries": [],
        }

        ctx = interface.getContext(contextId)
        rollingOverview = ctx.get("rollingOverview") if ctx else None
        rollingUpTo = ctx.get("rollingOverviewUpToSessionCount") if ctx else None

        if intent == RetrievalIntent.SUMMARIZE_ALL:
            result["previousSessionSummaries"] = buildSessionSummariesForPrompt(
                allSessions, excludeSessionId=sessionId, limit=20
            )
            result["sessionSummaries"] = result["previousSessionSummaries"]
            if len(completedSessions) >= ROLLING_OVERVIEW_SESSION_THRESHOLD and rollingOverview:
                result["rollingOverview"] = rollingOverview

        elif intent == RetrievalIntent.RECALL_SESSION:
            targetDate = _parseDateFromMessage(userContent)
            retrieved = findSessionByDate(completedSessions, targetDate)
            if retrieved:
                result["retrievedSession"] = retrieved
                logger.info(f"Session recall: found session {retrieved.get('id')} for date {targetDate}")
            result["previousSessionSummaries"] = buildSessionSummariesForPrompt(
                allSessions, excludeSessionId=sessionId, limit=PREVIOUS_SESSION_SUMMARIES_COUNT
            )
            if rollingOverview:
                result["rollingOverview"] = rollingOverview

        elif intent == RetrievalIntent.RECALL_TOPIC:
            retrieved = searchSessionsByTopic(completedSessions, userContent)
            result["retrievedByTopic"] = retrieved
            if retrieved:
                logger.info(f"Topic recall: found {len(retrieved)} sessions for query")
            result["previousSessionSummaries"] = buildSessionSummariesForPrompt(
                allSessions, excludeSessionId=sessionId, limit=PREVIOUS_SESSION_SUMMARIES_COUNT
            )
            if rollingOverview:
                result["rollingOverview"] = rollingOverview

        else:
            result["previousSessionSummaries"] = buildSessionSummariesForPrompt(
                allSessions, excludeSessionId=sessionId, limit=PREVIOUS_SESSION_SUMMARIES_COUNT
            )
            if len(completedSessions) >= ROLLING_OVERVIEW_SESSION_THRESHOLD:
                if rollingOverview and rollingUpTo is not None and rollingUpTo >= len(completedSessions) - 3:
                    # Stored overview is recent enough: reuse it.
                    result["rollingOverview"] = rollingOverview
                else:
                    try:
                        toSummarize = completedSessions[ROLLING_OVERVIEW_EVERY_N_SESSIONS:]
                        toSummarize = toSummarize[:ROLLING_OVERVIEW_EVERY_N_SESSIONS * 2]
                        if len(toSummarize) >= ROLLING_OVERVIEW_EVERY_N_SESSIONS:
                            summariesForOverview = buildSessionSummariesForPrompt(
                                toSummarize, limit=len(toSummarize),
                            )
                            overviewPrompt = aiPrompts.buildRollingOverviewPrompt(
                                summariesForOverview, context.get("title", "Coaching")
                            )
                            overviewResponse = await self._callAi(
                                "Du fasst Coaching-Sessions kompakt zusammen.", overviewPrompt
                            )
                            if overviewResponse and overviewResponse.errorCount == 0 and overviewResponse.content:
                                newOverview = overviewResponse.content.strip()
                                interface.updateContext(contextId, {
                                    "rollingOverview": newOverview,
                                    "rollingOverviewUpToSessionCount": len(completedSessions),
                                })
                                result["rollingOverview"] = newOverview
                                logger.info(f"Context {contextId}: Rolling overview updated ({len(toSummarize)} sessions)")
                    except Exception as e:
                        logger.warning(f"Rolling overview failed for context {contextId}: {e}")

        return result

    async def _callAi(self, systemPrompt: str, userPrompt: str):
        """Call the AI service with the given prompts.

        ``systemPrompt`` is passed as the request's ``context`` and
        ``userPrompt`` as its ``prompt``. Returns whatever
        ``AiService.callAi`` returns.
        """
        from modules.services.serviceAi.mainServiceAi import AiService

        # Minimal ad-hoc context object exposing the attributes set below.
        serviceContext = type('Ctx', (), {
            'user': self.currentUser,
            'mandateId': self.mandateId,
            'featureInstanceId': self.instanceId,
            'featureCode': 'commcoach',
        })()

        aiService = AiService(serviceCenter=serviceContext)
        await aiService.ensureAiObjectsInitialized()

        aiRequest = AiCallRequest(
            prompt=userPrompt,
            context=systemPrompt,
            options=AiCallOptions(
                operationType=OperationTypeEnum.DATA_ANALYSE,
                priority=PriorityEnum.QUALITY,
            )
        )

        return await aiService.callAi(aiRequest)