746 lines
33 KiB
Python
746 lines
33 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
CommCoach Service - Coaching Orchestration.
|
|
Manages the coaching pipeline: message processing, AI calls, scoring, task extraction.
|
|
"""
|
|
|
|
import re
|
|
import logging
|
|
import json
|
|
import asyncio
|
|
from typing import Optional, Dict, Any, List
|
|
|
|
from modules.datamodels.datamodelUam import User
|
|
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum
|
|
from modules.shared.timeUtils import getIsoTimestamp
|
|
|
|
from .datamodelCommcoach import (
|
|
CoachingMessage, CoachingMessageRole, CoachingMessageContentType,
|
|
CoachingSessionStatus, CoachingTask, CoachingTaskPriority,
|
|
CoachingScore, CoachingScoreTrend,
|
|
)
|
|
from . import serviceCommcoachAi as aiPrompts
|
|
from .serviceCommcoachAi import (
|
|
COMPRESSION_MESSAGE_THRESHOLD,
|
|
COMPRESSION_RECENT_COUNT,
|
|
COMPRESSION_MAX_MESSAGES_FETCH,
|
|
buildResumeGreetingPrompt,
|
|
)
|
|
from .serviceCommcoachContextRetrieval import (
|
|
detectIntent,
|
|
RetrievalIntent,
|
|
buildSessionSummariesForPrompt,
|
|
findSessionByDate,
|
|
searchSessionsByTopic,
|
|
_parseDateFromMessage,
|
|
PREVIOUS_SESSION_SUMMARIES_COUNT,
|
|
ROLLING_OVERVIEW_SESSION_THRESHOLD,
|
|
ROLLING_OVERVIEW_EVERY_N_SESSIONS,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _stripMarkdownForTts(text: str) -> str:
|
|
"""Strip markdown formatting so TTS reads clean speech text."""
|
|
t = text
|
|
t = re.sub(r'\*\*(.+?)\*\*', r'\1', t)
|
|
t = re.sub(r'\*(.+?)\*', r'\1', t)
|
|
t = re.sub(r'__(.+?)__', r'\1', t)
|
|
t = re.sub(r'_(.+?)_', r'\1', t)
|
|
t = re.sub(r'`[^`]+`', lambda m: m.group(0)[1:-1], t)
|
|
t = re.sub(r'^#{1,6}\s*', '', t, flags=re.MULTILINE)
|
|
t = re.sub(r'^\s*[-*+]\s+', '', t, flags=re.MULTILINE)
|
|
t = re.sub(r'^\s*\d+\.\s+', '', t, flags=re.MULTILINE)
|
|
t = re.sub(r'\[(.+?)\]\(.+?\)', r'\1', t)
|
|
t = re.sub(r'\n{3,}', '\n\n', t)
|
|
return t.strip()
|
|
|
|
|
|
# Session event queues for SSE streaming
|
|
_sessionEvents: Dict[str, asyncio.Queue] = {}
|
|
|
|
|
|
async def emitSessionEvent(sessionId: str, eventType: str, data: Any):
    """Put one event on the SSE queue of ``sessionId``.

    The queue is created on first use. The queued item is a dict with the
    keys ``type``, ``data`` and ``timestamp`` (value of getIsoTimestamp()).
    """
    queue = _sessionEvents.get(sessionId)
    if queue is None:
        queue = asyncio.Queue()
        _sessionEvents[sessionId] = queue

    event = {
        "type": eventType,
        "data": data,
        "timestamp": getIsoTimestamp(),
    }
    await queue.put(event)
|
|
|
|
|
|
def getSessionEventQueue(sessionId: str) -> asyncio.Queue:
    """Return the event queue registered for ``sessionId``, creating it on first access."""
    try:
        return _sessionEvents[sessionId]
    except KeyError:
        newQueue = asyncio.Queue()
        _sessionEvents[sessionId] = newQueue
        return newQueue
|
|
|
|
|
|
def cleanupSessionEvents(sessionId: str):
    """Drop the event queue registered for ``sessionId``; a no-op when none exists."""
    if sessionId in _sessionEvents:
        del _sessionEvents[sessionId]
|
|
|
|
|
|
class CommcoachService:
    """Coaching orchestrator: processes messages, calls AI, extracts tasks and scores.

    An instance is bound to one user, mandate and feature instance. Persistence
    goes through the ``interface`` object handed to each public method, and
    progress is reported to the client with ``emitSessionEvent``. Every public
    pipeline method finishes its event sequence with a ``complete`` event, on
    error paths as well, so a consumer of the stream is never left waiting.
    """

    # Messages kept verbatim when history compression raises.
    _FALLBACK_RECENT_MESSAGE_COUNT = 20
    # A stored rolling overview is reused while it lags the completed-session
    # count by at most this many sessions.
    _ROLLING_OVERVIEW_MAX_LAG = 3
    # Session summaries included when the user asks for an overall summary.
    _SUMMARIZE_ALL_SESSION_LIMIT = 20
    _MAX_KEY_TOPICS = 5
    _MAX_EXTRACTED_TASKS = 3
    _AI_FALLBACK_REPLY = "Entschuldigung, ich konnte gerade nicht antworten. Bitte versuche es erneut."

    def __init__(self, currentUser: "User", mandateId: str, instanceId: str):
        self.currentUser = currentUser
        self.mandateId = mandateId
        self.instanceId = instanceId
        self.userId = str(currentUser.id)

    # ------------------------------------------------------------------
    # Pure helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _extractContent(aiResponse) -> Optional[str]:
        """Return the stripped text of a successful AI response, else ``None``.

        ``None`` is returned for a missing response, a non-zero ``errorCount``,
        or missing / non-string / blank content, so callers never call
        ``.strip()`` on ``None``.
        """
        if not aiResponse or aiResponse.errorCount != 0:
            return None
        content = getattr(aiResponse, "content", None)
        if not isinstance(content, str):
            return None
        return content.strip() or None

    @staticmethod
    def _encodeAudio(audioContent) -> str:
        """Base64-encode TTS audio (bytes, or str which is UTF-8 encoded first)."""
        import base64

        raw = audioContent if isinstance(audioContent, (bytes, bytearray)) else audioContent.encode()
        return base64.b64encode(raw).decode()

    @staticmethod
    def _computeDurationSeconds(startedAt, now=None) -> int:
        """Return whole seconds between ``startedAt`` (ISO string) and ``now``.

        ``now`` defaults to the current time in the timezone of ``startedAt``
        (naive local time when ``startedAt`` is naive). Returns 0 when the
        timestamp is missing or unparsable; never returns a negative value.
        """
        from datetime import datetime

        if not startedAt:
            return 0
        try:
            # "Z" is rewritten because older fromisoformat() versions reject it.
            start = datetime.fromisoformat(str(startedAt).replace("Z", "+00:00"))
            if now is None:
                now = datetime.now(start.tzinfo) if start.tzinfo else datetime.now()
            return max(0, int((now - start).total_seconds()))
        except (TypeError, ValueError):
            # Duration is informational only; an unusable timestamp yields 0.
            return 0

    @staticmethod
    def _computeStreak(lastSessionAt, currentStreak, now=None) -> int:
        """Return the new streak length after a session completed at ``now``.

        * previous session on the same calendar day: streak is kept (min. 1)
        * previous session on the day before: streak grows by one
        * anything else (gap, no/invalid previous timestamp): streak restarts at 1

        ``now`` defaults to the current time in the timezone of
        ``lastSessionAt`` so both dates are taken in the same timezone.
        """
        from datetime import datetime

        streak = currentStreak or 0
        if not lastSessionAt:
            return 1
        try:
            last = datetime.fromisoformat(str(lastSessionAt).replace("Z", "+00:00"))
        except ValueError:
            return 1
        if now is None:
            now = datetime.now(last.tzinfo) if last.tzinfo else datetime.now()

        dayGap = (now.date() - last.date()).days
        if dayGap == 0:
            return max(streak, 1)
        if dayGap == 1:
            return streak + 1
        return 1

    @staticmethod
    def _buildSummaryEmailHtml(summary: str) -> str:
        """Render the summary e-mail body; the summary text is HTML-escaped."""
        from html import escape

        # The summary is AI-generated from user input, so it must not be
        # interpreted as markup. Line breaks are re-added after escaping.
        summaryHtml = escape(summary).replace("\n", "<br>")
        return (
            "\n<h2>Coaching-Session Zusammenfassung</h2>\n"
            f"<p>{summaryHtml}</p>\n"
            "<hr>\n"
            "<p><small>Diese Zusammenfassung wurde automatisch erstellt.</small></p>\n"
        )

    def _buildMessage(self, sessionId: str, contextId: str, role, content: str) -> Dict[str, Any]:
        """Build the dict form of a text CoachingMessage owned by this user."""
        return CoachingMessage(
            sessionId=sessionId,
            contextId=contextId,
            userId=self.userId,
            role=role,
            content=content,
            contentType=CoachingMessageContentType.TEXT,
        ).model_dump()

    # ------------------------------------------------------------------
    # Shared pipeline steps
    # ------------------------------------------------------------------

    async def _emitTtsAudio(self, sessionId: str, text: str, interface, language: Optional[str] = None):
        """Synthesize ``text`` and emit it as a ``ttsAudio`` event (best effort).

        ``language`` overrides the profile's ``preferredLanguage``; when neither
        is set "de-DE" is used. Failures are logged and swallowed because
        speech output must never break the text pipeline.
        """
        if not text:
            return
        try:
            from modules.interfaces.interfaceVoiceObjects import getVoiceInterface

            voiceInterface = getVoiceInterface(self.currentUser, self.mandateId)
            profile = interface.getProfile(self.userId, self.instanceId) or {}
            languageCode = language or profile.get("preferredLanguage") or "de-DE"

            ttsResult = await voiceInterface.textToSpeech(
                text=_stripMarkdownForTts(text),
                languageCode=languageCode,
                voiceName=profile.get("preferredVoice"),
            )
            if not isinstance(ttsResult, dict):
                return
            audioContent = ttsResult.get("audioContent")
            if not audioContent:
                return
            await emitSessionEvent(sessionId, "ttsAudio", {
                "audio": self._encodeAudio(audioContent),
                "format": "mp3",
            })
        except Exception as e:
            logger.warning(f"TTS failed for session {sessionId}: {e}")

    async def _prepareHistory(self, sessionId: str, interface):
        """Return ``(earlierSummary, previousMessages)`` for the system prompt.

        Uses the compressed summary stored on the session when
        ``prepareMessagesForPrompt`` yields one. Otherwise, once the fetched
        history exceeds COMPRESSION_MESSAGE_THRESHOLD, the older part is
        summarized via AI, persisted on the session, and only the most recent
        COMPRESSION_RECENT_COUNT messages are kept verbatim.
        """
        messages = interface.getRecentMessages(sessionId, count=COMPRESSION_MAX_MESSAGES_FETCH)
        session = interface.getSession(sessionId)
        compressedSummary = session.get("compressedHistorySummary") if session else None
        compressedUpTo = session.get("compressedHistoryUpToMessageCount") if session else None

        earlierSummary, previousMessages = aiPrompts.prepareMessagesForPrompt(
            messages, compressedSummary, compressedUpTo
        )
        if earlierSummary is not None or len(messages) <= COMPRESSION_MESSAGE_THRESHOLD:
            return earlierSummary, previousMessages

        toSummarizeCount = len(messages) - COMPRESSION_RECENT_COUNT
        if toSummarizeCount <= 0:
            return earlierSummary, previousMessages

        try:
            summaryPrompt = aiPrompts.buildEarlierConversationSummaryPrompt(messages[:toSummarizeCount])
            summaryResponse = await self._callAi(
                "Du fasst Coaching-Gespraeche praezise zusammen.", summaryPrompt
            )
            newSummary = self._extractContent(summaryResponse)
            if newSummary:
                earlierSummary = newSummary
                interface.updateSession(sessionId, {
                    "compressedHistorySummary": earlierSummary,
                    "compressedHistoryUpToMessageCount": toSummarizeCount,
                })
                previousMessages = messages[-COMPRESSION_RECENT_COUNT:]
                logger.info(f"Session {sessionId}: Compressed history ({toSummarizeCount} msgs -> {len(earlierSummary)} chars)")
        except Exception as e:
            logger.warning(f"History compression failed for session {sessionId}: {e}")
            # Keep the prompt bounded even though no summary is available.
            previousMessages = messages[-self._FALLBACK_RECENT_MESSAGE_COUNT:]

        return earlierSummary, previousMessages

    # ------------------------------------------------------------------
    # Public pipeline
    # ------------------------------------------------------------------

    async def processMessage(
        self,
        sessionId: str,
        contextId: str,
        userContent: str,
        interface,
        ttsLanguage: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Process a user message through the coaching pipeline:
        1. Store user message
        2. Build context with history
        3. Call AI for coaching response
        4. Store assistant message
        5. Emit SSE events (message, ttsAudio, complete)

        Args:
            sessionId: Session the message belongs to.
            contextId: Coaching context of the session.
            userContent: Text of the user message.
            interface: Persistence interface used for all reads and writes.
            ttsLanguage: Optional language code for the spoken reply; defaults
                to the profile's preferred language.

        Returns:
            The stored assistant message, or the stored user message when the
            context is missing or the AI call raised (an ``error`` event is
            emitted in those cases).
        """
        createdUserMsg = interface.createMessage(
            self._buildMessage(sessionId, contextId, CoachingMessageRole.USER, userContent)
        )
        await emitSessionEvent(sessionId, "message", {
            "id": createdUserMsg.get("id"),
            "role": "user",
            "content": userContent,
            "createdAt": createdUserMsg.get("createdAt"),
        })

        context = interface.getContext(contextId)
        if not context:
            logger.error(f"Context {contextId} not found")
            await emitSessionEvent(sessionId, "error", {"message": "Context not found"})
            await emitSessionEvent(sessionId, "complete", {})
            return createdUserMsg

        earlierSummary, previousMessages = await self._prepareHistory(sessionId, interface)
        tasks = interface.getTasks(contextId, self.userId)
        retrievalResult = await self._buildRetrievalContext(
            contextId, sessionId, userContent, context, interface
        )

        systemPrompt = aiPrompts.buildCoachingSystemPrompt(
            context,
            previousMessages,
            tasks,
            previousSessionSummaries=retrievalResult.get("previousSessionSummaries"),
            earlierSummary=earlierSummary,
            rollingOverview=retrievalResult.get("rollingOverview"),
            retrievedSession=retrievalResult.get("retrievedSession"),
            retrievedByTopic=retrievalResult.get("retrievedByTopic"),
        )
        if retrievalResult.get("intent") == RetrievalIntent.SUMMARIZE_ALL:
            systemPrompt += "\n\nWICHTIG: Der Benutzer moechte eine Gesamtzusammenfassung. Erstelle eine umfassende Zusammenfassung aller genannten Sessions und der aktuellen Session."

        await emitSessionEvent(sessionId, "status", {"label": "Coach denkt nach..."})
        try:
            aiResponse = await self._callAi(systemPrompt, userContent)
        except Exception as e:
            logger.error(f"AI call failed for session {sessionId}: {e}")
            await emitSessionEvent(sessionId, "error", {"message": f"AI error: {str(e)}"})
            await emitSessionEvent(sessionId, "complete", {})
            return createdUserMsg

        responseText = self._extractContent(aiResponse) or self._AI_FALLBACK_REPLY

        createdAssistantMsg = interface.createMessage(
            self._buildMessage(sessionId, contextId, CoachingMessageRole.ASSISTANT, responseText)
        )

        # Keep the denormalized message counter on the session in sync.
        allMessages = interface.getMessages(sessionId)
        interface.updateSession(sessionId, {"messageCount": len(allMessages)})

        await emitSessionEvent(sessionId, "message", {
            "id": createdAssistantMsg.get("id"),
            "role": "assistant",
            "content": responseText,
            "createdAt": createdAssistantMsg.get("createdAt"),
        })

        await self._emitTtsAudio(sessionId, responseText, interface, language=ttsLanguage)
        await emitSessionEvent(sessionId, "complete", {})
        return createdAssistantMsg

    async def processSessionOpening(self, sessionId: str, contextId: str, interface) -> Dict[str, Any]:
        """
        Generate and stream the opening greeting for a new session.
        Emits status, message, ttsAudio and complete events to the session queue.

        Returns:
            The stored greeting message, or ``{}`` when the context is missing
            or the AI call raised.
        """
        await emitSessionEvent(sessionId, "status", {"label": "Coach bereitet sich vor..."})

        context = interface.getContext(contextId)
        if not context:
            logger.error(f"Context {contextId} not found")
            await emitSessionEvent(sessionId, "error", {"message": "Context not found"})
            await emitSessionEvent(sessionId, "complete", {})
            return {}

        tasks = interface.getTasks(contextId, self.userId)
        allSessions = interface.getSessions(contextId, self.userId)
        previousSessionSummaries = buildSessionSummariesForPrompt(
            allSessions, excludeSessionId=sessionId, limit=PREVIOUS_SESSION_SUMMARIES_COUNT
        )

        # A new session has no message history yet, hence the empty list.
        systemPrompt = aiPrompts.buildCoachingSystemPrompt(
            context, [], tasks, previousSessionSummaries=previousSessionSummaries
        )
        openingUserPrompt = "Beginne die Coaching-Session mit einer kurzen Begruesssung, fasse in einem Satz zusammen wo wir stehen (falls vorherige Sessions), und stelle eine gezielte Einstiegsfrage zum Thema."

        try:
            aiResponse = await self._callAi(systemPrompt, openingUserPrompt)
        except Exception as e:
            logger.error(f"AI opening failed for session {sessionId}: {e}")
            await emitSessionEvent(sessionId, "error", {"message": f"AI error: {str(e)}"})
            await emitSessionEvent(sessionId, "complete", {})
            return {}

        openingContent = (
            self._extractContent(aiResponse)
            or f"Willkommen zur Coaching-Session zum Thema \"{context.get('title')}\". Was moechtest du heute besprechen?"
        )

        createdMsg = interface.createMessage(
            self._buildMessage(sessionId, contextId, CoachingMessageRole.ASSISTANT, openingContent)
        )
        interface.updateSession(sessionId, {"messageCount": 1})

        await emitSessionEvent(sessionId, "message", {
            "id": createdMsg.get("id"),
            "sessionId": sessionId,
            "contextId": contextId,
            "role": "assistant",
            "content": openingContent,
            "contentType": "text",
            "createdAt": createdMsg.get("createdAt"),
        })

        await self._emitTtsAudio(sessionId, openingContent, interface)
        await emitSessionEvent(sessionId, "complete", {})

        logger.info(f"CommCoach session opening completed: {sessionId}")
        return createdMsg

    async def generateResumeGreeting(self, sessionId: str, contextId: str, messages: list, interface) -> str:
        """Generate a follow-up greeting when user returns to an active session.

        Raises:
            ValueError: The context does not exist.
            RuntimeError: The AI call reported errors or returned no text.
        """
        context = interface.getContext(contextId)
        if not context:
            raise ValueError(f"Context {contextId} not found for resume greeting")

        prompt = buildResumeGreetingPrompt(messages, context.get("title", "Coaching"))
        aiResponse = await self._callAi(
            "Du bist ein freundlicher Coach. Antworte kurz und einladend.",
            prompt,
        )
        greeting = self._extractContent(aiResponse)
        if greeting is None:
            raise RuntimeError(f"AI resume greeting failed: {getattr(aiResponse, 'errorMessage', 'no content')}")
        return greeting

    async def processAudioMessage(self, sessionId: str, contextId: str, audioContent: bytes, language: str, interface) -> Dict[str, Any]:
        """Process an audio message: STT -> coaching pipeline -> TTS response.

        The spoken reply is produced once, inside ``processMessage``, using
        ``language`` as the TTS language.

        Returns:
            The result of ``processMessage``, or ``{}`` when no speech could
            be recognized (an ``error`` event is emitted in that case).
        """
        from modules.interfaces.interfaceVoiceObjects import getVoiceInterface

        await emitSessionEvent(sessionId, "status", {"label": "Sprache wird erkannt..."})

        voiceInterface = getVoiceInterface(self.currentUser, self.mandateId)
        sttResult = await voiceInterface.speechToText(
            audioContent=audioContent,
            language=language,
            skipFallbacks=True,
        )

        if isinstance(sttResult, dict):
            transcribedText = sttResult.get("text") or ""
        elif isinstance(sttResult, str):
            transcribedText = sttResult
        else:
            transcribedText = ""

        if not transcribedText.strip():
            sttError = sttResult.get("error", "Unbekannter Fehler") if isinstance(sttResult, dict) else "Unbekannter Fehler"
            msg = f"Sprache konnte nicht erkannt werden. ({sttError})"
            await emitSessionEvent(sessionId, "error", {"message": msg, "detail": sttError})
            await emitSessionEvent(sessionId, "complete", {})
            return {}

        return await self.processMessage(
            sessionId, contextId, transcribedText, interface, ttsLanguage=language
        )

    # ------------------------------------------------------------------
    # Session completion
    # ------------------------------------------------------------------

    async def completeSession(self, sessionId: str, interface) -> Dict[str, Any]:
        """
        Complete a session:
        1. Generate summary (and key topics)
        2. Extract tasks
        3. Generate scores
        4. Update session, context stats and streak
        5. Send email summary

        Each AI-backed step is best effort: a failure is logged and the
        remaining steps still run. Sessions with fewer than two messages are
        only marked completed.

        Returns:
            The refreshed session record, or ``{}`` when the session is unknown.
        """
        session = interface.getSession(sessionId)
        if not session:
            return {}

        contextId = session.get("contextId")
        context = interface.getContext(contextId) if contextId else None
        messages = interface.getMessages(sessionId)

        if len(messages) < 2:
            interface.updateSession(sessionId, {
                "status": CoachingSessionStatus.COMPLETED.value,
                "endedAt": getIsoTimestamp(),
            })
            # Return the updated record rather than the pre-update snapshot.
            return interface.getSession(sessionId) or session

        summary = await self._generateSummary(messages, context)
        keyTopics = await self._extractKeyTopics(summary, messages) if summary else None
        await self._extractTasks(sessionId, contextId, messages, interface)
        competenceScore = await self._generateScores(sessionId, contextId, messages, context, interface)

        sessionUpdates = {
            "status": CoachingSessionStatus.COMPLETED.value,
            "endedAt": getIsoTimestamp(),
            "summary": summary,
            "durationSeconds": self._computeDurationSeconds(session.get("startedAt", "")),
            "messageCount": len(messages),
        }
        if competenceScore is not None:
            sessionUpdates["competenceScore"] = round(competenceScore, 1)
        if keyTopics is not None:
            sessionUpdates["keyTopics"] = keyTopics
        interface.updateSession(sessionId, sessionUpdates)

        if contextId:
            allSessions = interface.getSessions(contextId, self.userId)
            completedCount = sum(
                1 for s in allSessions if s.get("status") == CoachingSessionStatus.COMPLETED.value
            )
            interface.updateContext(contextId, {
                "sessionCount": completedCount,
                "lastSessionAt": getIsoTimestamp(),
            })

        self._updateStreak(interface)

        if summary:
            await self._sendSessionEmail(session, summary, interface)

        await emitSessionEvent(sessionId, "sessionState", {
            "status": "completed",
            "summary": summary,
            "competenceScore": competenceScore,
        })
        await emitSessionEvent(sessionId, "complete", {})

        return interface.getSession(sessionId)

    async def _generateSummary(self, messages, context) -> Optional[str]:
        """Return an AI summary of the session, or ``None`` on failure."""
        try:
            # The context may be missing; fall back to a generic title instead
            # of failing and losing the summary.
            contextTitle = context.get("title", "Coaching") if context else "Coaching"
            summaryPrompt = aiPrompts.buildSummaryPrompt(messages, contextTitle)
            summaryResponse = await self._callAi("Du bist ein praeziser Zusammenfasser.", summaryPrompt)
            return self._extractContent(summaryResponse)
        except Exception as e:
            logger.warning(f"Summary generation failed: {e}")
            return None

    async def _extractKeyTopics(self, summary: str, messages) -> Optional[str]:
        """Return up to five key topics as a JSON array string, or ``None``."""
        try:
            keyTopicsPrompt = aiPrompts.buildKeyTopicsExtractionPrompt(summary, messages)
            keyTopicsResponse = await self._callAi(
                "Du extrahierst Kernthemen aus Zusammenfassungen.", keyTopicsPrompt
            )
            rawTopics = self._extractContent(keyTopicsResponse)
            if not rawTopics:
                return None
            parsed = aiPrompts.parseJsonResponse(rawTopics, [])
            if isinstance(parsed, list) and parsed:
                return json.dumps([str(t) for t in parsed[:self._MAX_KEY_TOPICS]])
        except Exception as e:
            logger.warning(f"Key topics extraction failed: {e}")
        return None

    async def _extractTasks(self, sessionId: str, contextId, messages, interface):
        """Create up to three AI-extracted tasks and emit ``taskCreated`` for each."""
        try:
            taskPrompt = aiPrompts.buildTaskExtractionPrompt(messages)
            taskResponse = await self._callAi("Du extrahierst Aufgaben aus Gespraechen.", taskPrompt)
            rawTasks = self._extractContent(taskResponse)
            if not rawTasks:
                return
            extractedTasks = aiPrompts.parseJsonResponse(rawTasks, [])
            if not isinstance(extractedTasks, list):
                return
            for taskData in extractedTasks[:self._MAX_EXTRACTED_TASKS]:
                if not (isinstance(taskData, dict) and taskData.get("title")):
                    continue
                newTask = CoachingTask(
                    contextId=contextId,
                    sessionId=sessionId,
                    userId=self.userId,
                    mandateId=self.mandateId,
                    title=taskData["title"],
                    description=taskData.get("description"),
                    priority=taskData.get("priority", "medium"),
                ).model_dump()
                created = interface.createTask(newTask)
                await emitSessionEvent(sessionId, "taskCreated", created)
        except Exception as e:
            logger.warning(f"Task extraction failed: {e}")

    async def _generateScores(self, sessionId: str, contextId, messages, context, interface) -> Optional[float]:
        """Store AI competence scores and return their mean, or ``None``.

        Entries whose score is not numeric are skipped individually instead of
        aborting the whole scoring step.
        """
        try:
            category = context.get("category", "custom") if context else "custom"
            scorePrompt = aiPrompts.buildScoringPrompt(messages, category)
            scoreResponse = await self._callAi("Du bewertest Kommunikationskompetenz.", scorePrompt)
            rawScores = self._extractContent(scoreResponse)
            if not rawScores:
                return None
            scores = aiPrompts.parseJsonResponse(rawScores, [])
            if not isinstance(scores, list):
                return None

            scoreValues = []
            for scoreData in scores:
                if not (isinstance(scoreData, dict) and "dimension" in scoreData and "score" in scoreData):
                    continue
                try:
                    scoreValue = float(scoreData["score"])
                except (TypeError, ValueError):
                    logger.warning(f"Skipping non-numeric score for dimension {scoreData.get('dimension')}")
                    continue
                newScore = CoachingScore(
                    contextId=contextId,
                    sessionId=sessionId,
                    userId=self.userId,
                    mandateId=self.mandateId,
                    dimension=scoreData["dimension"],
                    score=scoreValue,
                    trend=scoreData.get("trend", "stable"),
                    evidence=scoreData.get("evidence"),
                ).model_dump()
                interface.createScore(newScore)
                scoreValues.append(scoreValue)
                await emitSessionEvent(sessionId, "scoreUpdate", scoreData)

            if scoreValues:
                return sum(scoreValues) / len(scoreValues)
            return None
        except Exception as e:
            logger.warning(f"Scoring failed: {e}")
            return None

    def _updateStreak(self, interface):
        """Update the user's streak in their profile (best effort).

        A second session on the same day keeps the streak; a session on the
        following day extends it; any larger gap restarts it at 1.
        """
        try:
            profile = interface.getProfile(self.userId, self.instanceId)
            if not profile:
                profile = interface.getOrCreateProfile(self.userId, self.mandateId, self.instanceId)
            if not profile:
                logger.warning("Failed to update streak: no profile available")
                return

            newStreak = self._computeStreak(profile.get("lastSessionAt"), profile.get("streakDays"))
            # "or 0" guards against counters stored as None.
            newLongest = max(profile.get("longestStreak") or 0, newStreak)

            interface.updateProfile(profile.get("id"), {
                "streakDays": newStreak,
                "longestStreak": newLongest,
                "totalSessions": (profile.get("totalSessions") or 0) + 1,
                "lastSessionAt": getIsoTimestamp(),
            })
        except Exception as e:
            logger.warning(f"Failed to update streak: {e}")

    async def _sendSessionEmail(self, session: Dict[str, Any], summary: str, interface):
        """Send session summary via email if enabled (best effort).

        Nothing is sent when the profile disables ``emailSummaryEnabled`` or
        the user has no e-mail address. On success the session is flagged with
        ``emailSent``.
        """
        try:
            profile = interface.getProfile(self.userId, self.instanceId)
            if profile and not profile.get("emailSummaryEnabled", True):
                return

            from modules.interfaces.interfaceMessaging import getInterface as getMessagingInterface
            from modules.interfaces.interfaceDbApp import getRootInterface

            user = getRootInterface().getUser(self.userId)
            if not user or not user.email:
                return

            messaging = getMessagingInterface()
            # NOTE(review): the subject carries the raw context id, not its title - confirm intended.
            subject = f"Coaching-Session Zusammenfassung: {session.get('contextId', 'Session')}"
            htmlMessage = self._buildSummaryEmailHtml(summary)

            messaging.send("email", user.email, subject, htmlMessage)
            interface.updateSession(session.get("id"), {"emailSent": True})
            logger.info(f"Session summary email sent to {user.email}")
        except Exception as e:
            logger.warning(f"Failed to send session email: {e}")

    # ------------------------------------------------------------------
    # Retrieval context
    # ------------------------------------------------------------------

    async def _buildRetrievalContext(
        self,
        contextId: str,
        sessionId: str,
        userContent: str,
        context: Dict[str, Any],
        interface,
    ) -> Dict[str, Any]:
        """
        Build retrieval context based on user intent.
        Returns: previousSessionSummaries, rollingOverview, retrievedSession, retrievedByTopic, intent, sessionSummaries.

        Completed sessions get a display ``date`` ("dd.mm.YYYY") added in place
        before they are handed to the date/topic lookups.
        """
        from datetime import datetime

        intent = detectIntent(userContent)
        allSessions = interface.getSessions(contextId, self.userId)
        completedSessions = [
            s for s in allSessions if s.get("status") == CoachingSessionStatus.COMPLETED.value
        ]

        for completed in completedSessions:
            startedAt = completed.get("startedAt") or completed.get("createdAt") or ""
            if not startedAt:
                continue
            try:
                parsed = datetime.fromisoformat(str(startedAt).replace("Z", "+00:00"))
                completed["date"] = parsed.strftime("%d.%m.%Y")
            except ValueError:
                completed["date"] = ""

        result = {
            "intent": intent,
            "previousSessionSummaries": [],
            "rollingOverview": None,
            "retrievedSession": None,
            "retrievedByTopic": None,
            "sessionSummaries": [],
        }

        # Reuse the context the caller already loaded; fetch only if absent.
        ctx = context or interface.getContext(contextId)
        rollingOverview = ctx.get("rollingOverview") if ctx else None
        rollingUpTo = ctx.get("rollingOverviewUpToSessionCount") if ctx else None
        completedCount = len(completedSessions)

        def _recentSummaries(limit=PREVIOUS_SESSION_SUMMARIES_COUNT):
            return buildSessionSummariesForPrompt(
                allSessions, excludeSessionId=sessionId, limit=limit
            )

        if intent == RetrievalIntent.SUMMARIZE_ALL:
            summaries = _recentSummaries(self._SUMMARIZE_ALL_SESSION_LIMIT)
            result["previousSessionSummaries"] = summaries
            result["sessionSummaries"] = summaries
            if completedCount >= ROLLING_OVERVIEW_SESSION_THRESHOLD and rollingOverview:
                result["rollingOverview"] = rollingOverview

        elif intent == RetrievalIntent.RECALL_SESSION:
            targetDate = _parseDateFromMessage(userContent)
            retrieved = findSessionByDate(completedSessions, targetDate)
            if retrieved:
                result["retrievedSession"] = retrieved
                logger.info(f"Session recall: found session {retrieved.get('id')} for date {targetDate}")
            result["previousSessionSummaries"] = _recentSummaries()
            if rollingOverview:
                result["rollingOverview"] = rollingOverview

        elif intent == RetrievalIntent.RECALL_TOPIC:
            retrieved = searchSessionsByTopic(completedSessions, userContent)
            result["retrievedByTopic"] = retrieved
            if retrieved:
                logger.info(f"Topic recall: found {len(retrieved)} sessions for query")
            result["previousSessionSummaries"] = _recentSummaries()
            if rollingOverview:
                result["rollingOverview"] = rollingOverview

        else:
            result["previousSessionSummaries"] = _recentSummaries()
            if completedCount >= ROLLING_OVERVIEW_SESSION_THRESHOLD:
                overviewIsFresh = (
                    rollingOverview
                    and rollingUpTo is not None
                    and rollingUpTo >= completedCount - self._ROLLING_OVERVIEW_MAX_LAG
                )
                if overviewIsFresh:
                    result["rollingOverview"] = rollingOverview
                else:
                    result["rollingOverview"] = await self._refreshRollingOverview(
                        contextId, ctx, completedSessions, interface
                    )

        return result

    async def _refreshRollingOverview(self, contextId: str, context, completedSessions, interface) -> Optional[str]:
        """Regenerate and persist the rolling overview; ``None`` when not produced."""
        try:
            toSummarize = completedSessions[ROLLING_OVERVIEW_EVERY_N_SESSIONS:]
            toSummarize = toSummarize[:ROLLING_OVERVIEW_EVERY_N_SESSIONS * 2]
            if len(toSummarize) < ROLLING_OVERVIEW_EVERY_N_SESSIONS:
                return None

            summariesForOverview = buildSessionSummariesForPrompt(
                toSummarize, limit=len(toSummarize),
            )
            overviewPrompt = aiPrompts.buildRollingOverviewPrompt(
                summariesForOverview, (context or {}).get("title", "Coaching")
            )
            overviewResponse = await self._callAi(
                "Du fasst Coaching-Sessions kompakt zusammen.", overviewPrompt
            )
            newOverview = self._extractContent(overviewResponse)
            if not newOverview:
                return None

            interface.updateContext(contextId, {
                "rollingOverview": newOverview,
                "rollingOverviewUpToSessionCount": len(completedSessions),
            })
            logger.info(f"Context {contextId}: Rolling overview updated ({len(toSummarize)} sessions)")
            return newOverview
        except Exception as e:
            logger.warning(f"Rolling overview failed for context {contextId}: {e}")
            return None

    # ------------------------------------------------------------------
    # AI access
    # ------------------------------------------------------------------

    async def _callAi(self, systemPrompt: str, userPrompt: str):
        """Call the AI service with the given prompts.

        ``systemPrompt`` is passed as the request ``context`` and ``userPrompt``
        as the request ``prompt``. Returns whatever ``AiService.callAi`` returns.
        """
        from modules.services.serviceAi.mainServiceAi import AiService

        # Minimal ad-hoc object exposing the attributes set below to AiService.
        serviceContext = type('Ctx', (), {
            'user': self.currentUser,
            'mandateId': self.mandateId,
            'featureInstanceId': self.instanceId,
            'featureCode': 'commcoach',
        })()
        aiService = AiService(serviceCenter=serviceContext)
        await aiService.ensureAiObjectsInitialized()

        aiRequest = AiCallRequest(
            prompt=userPrompt,
            context=systemPrompt,
            options=AiCallOptions(
                operationType=OperationTypeEnum.DATA_ANALYSE,
                priority=PriorityEnum.QUALITY,
            )
        )
        return await aiService.callAi(aiRequest)
|