From 0a5fa20cb8fdc8b318cbe073d4aac97a23e73970 Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Wed, 1 Apr 2026 21:59:28 +0200 Subject: [PATCH] fixed voice feat commcoach --- modules/datamodels/datamodelAi.py | 1 + .../features/commcoach/datamodelCommcoach.py | 4 + .../commcoach/routeFeatureCommcoach.py | 15 +- .../features/commcoach/serviceCommcoach.py | 615 +++++++++++++++--- .../features/commcoach/serviceCommcoachAi.py | 63 +- .../commcoach/serviceCommcoachIndexer.py | 223 +++++++ .../commcoach/serviceCommcoachScheduler.py | 56 +- modules/interfaces/interfaceAiObjects.py | 18 +- modules/routes/routeVoiceGoogle.py | 4 +- .../services/serviceAgent/agentLoop.py | 15 +- .../services/serviceAgent/mainServiceAgent.py | 104 +-- .../services/serviceAgent/toolRegistry.py | 14 +- 12 files changed, 932 insertions(+), 200 deletions(-) create mode 100644 modules/features/commcoach/serviceCommcoachIndexer.py diff --git a/modules/datamodels/datamodelAi.py b/modules/datamodels/datamodelAi.py index 96e05185..662eded2 100644 --- a/modules/datamodels/datamodelAi.py +++ b/modules/datamodels/datamodelAi.py @@ -172,6 +172,7 @@ class AiCallRequest(BaseModel): contentParts: Optional[List['ContentPart']] = None # Content parts for model-aware chunking messages: Optional[List[Dict[str, Any]]] = Field(default=None, description="OpenAI-style messages for multi-turn agent conversations") tools: Optional[List[Dict[str, Any]]] = Field(default=None, description="Tool definitions for native function calling") + toolChoice: Optional[Any] = Field(default=None, description="Tool choice: 'auto', 'none', or specific tool (passed through to model call)") requireNeutralization: Optional[bool] = Field(default=None, description="Per-request neutralization override: True=force, False=skip, None=use config") diff --git a/modules/features/commcoach/datamodelCommcoach.py b/modules/features/commcoach/datamodelCommcoach.py index 635ba19a..82be6044 100644 --- a/modules/features/commcoach/datamodelCommcoach.py +++ b/modules/features/commcoach/datamodelCommcoach.py @@ -228,6 +228,10 @@ class UpdateContextRequest(BaseModel): class SendMessageRequest(BaseModel): content: str = Field(description="User message text") contentType: Optional[CoachingMessageContentType] = CoachingMessageContentType.TEXT + fileIds: Optional[List[str]] = Field(default=None, description="Attached file IDs for agent context") + dataSourceIds: Optional[List[str]] = Field(default=None, description="Personal data source IDs") + featureDataSourceIds: Optional[List[str]] = Field(default=None, description="Feature data source IDs") + allowedProviders: Optional[List[str]] = Field(default=None, description="Allowed AI providers") class CreateTaskRequest(BaseModel): diff --git a/modules/features/commcoach/routeFeatureCommcoach.py b/modules/features/commcoach/routeFeatureCommcoach.py index ccb4d342..8ffd3eca 100644 --- a/modules/features/commcoach/routeFeatureCommcoach.py +++ b/modules/features/commcoach/routeFeatureCommcoach.py @@ -334,9 +334,8 @@ async def startSession( try: from modules.interfaces.interfaceVoiceObjects import getVoiceInterface voiceInterface = getVoiceInterface(context.user, mandateId) - from .serviceCommcoach import _getUserVoicePrefs + from .serviceCommcoach import _getUserVoicePrefs, _stripMarkdownForTts, _buildTtsConfigErrorMessage language, voiceName = _getUserVoicePrefs(userId, mandateId) - from .serviceCommcoach import _stripMarkdownForTts ttsResult = await voiceInterface.textToSpeech( text=_stripMarkdownForTts(greetingText), languageCode=language, @@ -349,8 +348,12 @@ async def startSession( audioBytes if isinstance(audioBytes, bytes) else audioBytes.encode() ).decode() yield f"data: {json.dumps({'type': 'ttsAudio', 'data': {'audio': audioB64, 'format': 'mp3'}})}\n\n" + else: + errorDetail = ttsResult.get("error", "Text-to-Speech failed") + yield f"data: {json.dumps({'type': 'error', 'data': {'message': _buildTtsConfigErrorMessage(language, voiceName, errorDetail), 'detail': errorDetail, 'ttsLanguage': language, 'ttsVoice': voiceName}})}\n\n" except Exception as e: logger.warning(f"TTS failed for resumed session: {e}") + yield f"data: {json.dumps({'type': 'error', 'data': {'message': 'Die konfigurierte Stimme für diese Sprache ist ungültig oder nicht verfügbar. Bitte passe sie unter Einstellungen > Stimme & Sprache an.', 'detail': str(e)}})}\n\n" yield f"data: {json.dumps({'type': 'complete', 'data': {}, 'timestamp': getIsoTimestamp()})}\n\n" return StreamingResponse( @@ -511,7 +514,13 @@ async def sendMessageStream( _activeProcessTasks.pop(sessionId, None) task = asyncio.create_task( - service.processMessage(sessionId, contextId, body.content, interface) + service.processMessage( + sessionId, contextId, body.content, interface, + fileIds=body.fileIds, + dataSourceIds=body.dataSourceIds, + featureDataSourceIds=body.featureDataSourceIds, + allowedProviders=body.allowedProviders, + ) ) task.add_done_callback(_onTaskDone) _activeProcessTasks[sessionId] = task diff --git a/modules/features/commcoach/serviceCommcoach.py b/modules/features/commcoach/serviceCommcoach.py index 5e5aa810..332a4a01 100644 --- a/modules/features/commcoach/serviceCommcoach.py +++ b/modules/features/commcoach/serviceCommcoach.py @@ -6,6 +6,7 @@ Manages the coaching pipeline: message processing, AI calls, scoring, task extra """ import re +import html import logging import json import asyncio @@ -43,25 +44,117 @@ from .serviceCommcoachContextRetrieval import ( logger = logging.getLogger(__name__) +def _selectConfiguredVoice( + language: str, + voiceMap: Any, + legacyVoice: Optional[str] = None, + legacyLanguage: Optional[str] = None, +) -> Optional[str]: + """Resolve the configured TTS voice for a language from ttsVoiceMap, then legacy ttsVoice.""" + normalizedLanguage = str(language or "").strip() + normalizedLower = normalizedLanguage.lower() + baseLanguage = normalizedLower.split("-", 1)[0] if normalizedLower else "" + + if isinstance(voiceMap, dict) and voiceMap: + direct = voiceMap.get(normalizedLanguage) + if isinstance(direct, str) and direct.strip(): + return direct.strip() + + directBase = voiceMap.get(baseLanguage) + if isinstance(directBase, str) and directBase.strip(): + return directBase.strip() + + for mapKey, mapValue in voiceMap.items(): + if not isinstance(mapValue, str) or not mapValue.strip(): + continue + keyNorm = str(mapKey or "").strip().lower() + if keyNorm == normalizedLower or keyNorm == baseLanguage or (baseLanguage and keyNorm.startswith(baseLanguage + "-")): + return mapValue.strip() + + if legacyVoice and str(legacyVoice).strip(): + legacyLangNorm = str(legacyLanguage or "").strip().lower() + if not legacyLangNorm or legacyLangNorm == normalizedLower: + return str(legacyVoice).strip() + + return None + + +def _buildTtsConfigErrorMessage(language: str, voiceName: Optional[str], rawError: str = "") -> str: + if voiceName: + return ( + f'Die konfigurierte Stimme "{voiceName}" für {language} ist ungültig oder nicht verfügbar. ' + 'Bitte passe sie unter Einstellungen > Stimme & Sprache an.' + ) + return ( + f'Für die Sprache {language} ist keine gültige TTS-Stimme konfiguriert. ' + 'Bitte prüfe die Einstellungen unter Stimme & Sprache.' + ) + + def _getUserVoicePrefs(userId: str, mandateId: Optional[str] = None) -> tuple: """Load voice language and voiceName from central UserVoicePreferences. Returns (language, voiceName) tuple.""" try: from modules.datamodels.datamodelUam import UserVoicePreferences - from modules.security.rootAccess import getRootInterface + from modules.interfaces.interfaceDbApp import getRootInterface rootIf = getRootInterface() prefs = rootIf.db.getRecordset( UserVoicePreferences, - recordFilter={"userId": userId, "mandateId": mandateId} + recordFilter={"userId": userId} ) - if not prefs and mandateId: - prefs = rootIf.db.getRecordset( - UserVoicePreferences, - recordFilter={"userId": userId} - ) if prefs: - p = prefs[0] if isinstance(prefs[0], dict) else prefs[0].model_dump() - return (p.get("ttsLanguage") or p.get("sttLanguage") or "de-DE", p.get("ttsVoice")) + allPrefs = [ + pref if isinstance(pref, dict) else pref.model_dump() + for pref in prefs + ] + scopedPref = next( + ( + pref for pref in allPrefs + if str(pref.get("mandateId") or "").strip() == str(mandateId or "").strip() + ), + None, + ) + globalPref = next( + ( + pref for pref in allPrefs + if not str(pref.get("mandateId") or "").strip() + ), + None, + ) + + language = ( + (globalPref or {}).get("ttsLanguage") + or (globalPref or {}).get("sttLanguage") + or (scopedPref or {}).get("ttsLanguage") + or (scopedPref or {}).get("sttLanguage") + or "de-DE" + ) + + scopedVoiceFromMap = _selectConfiguredVoice( + language=language, + voiceMap=(scopedPref or {}).get("ttsVoiceMap"), + ) + globalVoice = _selectConfiguredVoice( + language=language, + voiceMap=(globalPref or {}).get("ttsVoiceMap"), + legacyVoice=(globalPref or {}).get("ttsVoice"), + legacyLanguage=(globalPref or {}).get("ttsLanguage"), + ) + scopedLegacyVoice = _selectConfiguredVoice( + language=language, + voiceMap=None, + legacyVoice=(scopedPref or {}).get("ttsVoice"), + legacyLanguage=(scopedPref or {}).get("ttsLanguage"), + ) + anyPref = allPrefs[0] + fallbackVoice = _selectConfiguredVoice( + language=language, + voiceMap=(anyPref or {}).get("ttsVoiceMap"), + legacyVoice=(anyPref or {}).get("ttsVoice"), + legacyLanguage=(anyPref or {}).get("ttsLanguage"), + ) + voiceName = scopedVoiceFromMap or globalVoice or scopedLegacyVoice or fallbackVoice + return (language, voiceName) except Exception as e: logger.warning(f"Failed to load UserVoicePreferences for user={userId}: {e}") return ("de-DE", None) @@ -111,26 +204,91 @@ def cleanupSessionEvents(sessionId: str): CHUNK_WORD_SIZE = 4 CHUNK_DELAY_SECONDS = 0.05 -def _wrapEmailHtml(contentHtml: str) -> str: - """Wrap AI-generated HTML content in a styled email shell.""" - return f""" - - - -
-
-
-

Coaching-Session Zusammenfassung

-

PowerOn CommCoach

-
-
{contentHtml}
-
-

Diese Zusammenfassung wurde automatisch erstellt.

-
-
-
- -""" + +def _normalizeEmailBulletList(values: Any, maxItems: int = 4) -> List[str]: + items: List[str] = [] + if not isinstance(values, list): + return items + for value in values: + text = str(value or "").strip() + if text: + items.append(text) + if len(items) >= maxItems: + break + return items + + +def _buildSummaryEmailBlock( + emailData: Optional[Dict[str, Any]], + summary: str, + contextTitle: str, +) -> str: + """Render a stable, mail-client-friendly CommCoach summary block.""" + payload = emailData or {} + headline = str(payload.get("headline") or contextTitle or "Coaching-Session").strip() + intro = str(payload.get("intro") or "").strip() + coreTopic = str(payload.get("coreTopic") or "").strip() + insights = _normalizeEmailBulletList(payload.get("insights")) + nextSteps = _normalizeEmailBulletList(payload.get("nextSteps")) + progress = _normalizeEmailBulletList(payload.get("progress")) + + if not (intro or coreTopic or insights or nextSteps or progress): + escapedSummary = html.escape(summary or "").replace("\n", "
") + return ( + '
' + f'

{html.escape(headline)}

' + f'
{escapedSummary}
' + '
' + ) + + def _renderSection(title: str, bodyHtml: str) -> str: + if not bodyHtml: + return "" + return ( + '' + f'
{html.escape(title)}
' + f'
{bodyHtml}
' + '' + ) + + def _renderList(values: List[str]) -> str: + if not values: + return "" + rows = "".join( + '' + '•' + f'{html.escape(item)}' + '' + for item in values + ) + return f'{rows}
' + + introHtml = f'

{html.escape(intro)}

' if intro else "" + coreTopicHtml = f'

{html.escape(coreTopic)}

' if coreTopic else "" + + sectionsHtml = "".join([ + _renderSection("Kernbotschaft", introHtml), + _renderSection("Kernthema", coreTopicHtml), + _renderSection("Erkenntnisse", _renderList(insights)), + _renderSection("Nächste Schritte", _renderList(nextSteps)), + _renderSection("Fortschritt", _renderList(progress)), + ]) + + return ( + '' + '' + '
' + f'

{html.escape(headline)}

' + f'

Thema: {html.escape(contextTitle)}

' + '' + f'{sectionsHtml}' + '
' + '
' + ) DOC_INTENT_MAX_DOCS = 3 DOC_CONTENT_MAX_CHARS = 3000 @@ -160,7 +318,7 @@ def _stripPendingUserMessages(messages: List[Dict[str, Any]]) -> List[Dict[str, def _parseAiJsonResponse(rawText: str) -> Dict[str, Any]: - """Parse the structured JSON response from AI. Strips optional markdown code fences.""" + """Parse optional structured AI output; otherwise treat free text as normal response.""" text = rawText.strip() if text.startswith("```"): lines = text.split("\n") @@ -169,10 +327,14 @@ def _parseAiJsonResponse(rawText: str) -> Dict[str, Any]: lines = lines[:-1] text = "\n".join(lines) try: - return json.loads(text) + parsed = json.loads(text) + if isinstance(parsed, dict): + if parsed.get("text") and not parsed.get("speech"): + parsed["speech"] = parsed.get("text") + return parsed + return {"text": rawText.strip(), "speech": rawText.strip(), "documents": []} except json.JSONDecodeError: - logger.warning(f"AI JSON parse failed, using raw text: {text[:200]}") - return {"text": rawText.strip(), "speech": "", "documents": []} + return {"text": rawText.strip(), "speech": rawText.strip(), "documents": []} async def _generateAndEmitTts(sessionId: str, speechText: str, currentUser, mandateId: str, @@ -197,8 +359,20 @@ async def _generateAndEmitTts(sessionId: str, speechText: str, currentUser, mand audioBytes if isinstance(audioBytes, bytes) else audioBytes.encode() ).decode() await emitSessionEvent(sessionId, "ttsAudio", {"audio": audioB64, "format": "mp3"}) + return + errorDetail = ttsResult.get("error", "Text-to-Speech failed") + await emitSessionEvent(sessionId, "error", { + "message": _buildTtsConfigErrorMessage(language, voiceName, errorDetail), + "detail": errorDetail, + "ttsLanguage": language, + "ttsVoice": voiceName, + }) except Exception as e: logger.warning(f"TTS failed for session {sessionId}: {e}") + await emitSessionEvent(sessionId, "error", { + "message": _buildTtsConfigErrorMessage("de-DE", None, str(e)), + "detail": str(e), + }) def _resolveFileNameAndMime(title: str) -> tuple: @@ -400,6 +574,151 @@ def _getDocumentSummaries(contextId: str, userId: str, interface, return None +def _createCommcoachRagFn( + userId: str, + featureInstanceId: str, + mandateId: str, + context: Dict[str, Any], + tasks: List[Dict[str, Any]], + currentUser=None, +): + """Create a CommCoach-specific RAG function combining KnowledgeService RAG with live coaching DB context.""" + + async def _buildRagContext( + currentPrompt: str, workflowId: str, userId: str, + featureInstanceId: str, mandateId: str, **kwargs + ) -> str: + parts = [] + + # 1. Standard KnowledgeService RAG (finds indexed session chunks + files) + try: + from modules.serviceCenter import getService + from modules.serviceCenter.context import ServiceCenterContext + serviceContext = ServiceCenterContext( + user=currentUser, + mandate_id=mandateId, + feature_instance_id=featureInstanceId, + ) + knowledgeService = getService("knowledge", serviceContext) + ragContext = await knowledgeService.buildAgentContext( + currentPrompt=currentPrompt, + workflowId=workflowId, + userId=userId, + featureInstanceId=featureInstanceId, + mandateId=mandateId, + ) + if ragContext: + parts.append(ragContext) + except Exception as e: + logger.debug(f"CommCoach RAG knowledge context failed: {e}") + + # 2. Live coaching DB context (current goals, tasks, rolling overview) + liveContext = [] + goals = _parseJsonField(context.get("goals")) if context else None + if goals: + goalTexts = [g.get("text", g) if isinstance(g, dict) else str(g) for g in goals if g] + if goalTexts: + liveContext.append("Aktuelle Ziele:\n" + "\n".join(f"- {g}" for g in goalTexts)) + + openTasks = [t for t in (tasks or []) if t.get("status") in ("open", "inProgress")] + if openTasks: + taskLines = [f"- {t.get('title', '')}" for t in openTasks[:5]] + liveContext.append("Offene Aufgaben:\n" + "\n".join(taskLines)) + + rollingOverview = context.get("rollingOverview") if context else None + if rollingOverview: + liveContext.append(f"Gesamtüberblick bisheriger Sessions:\n{rollingOverview[:500]}") + + insights = _parseJsonField(context.get("insights")) if context else None + if insights: + insightTexts = [i.get("text", i) if isinstance(i, dict) else str(i) for i in insights[-5:] if i] + if insightTexts: + liveContext.append("Bisherige Erkenntnisse:\n" + "\n".join(f"- {t}" for t in insightTexts)) + + if liveContext: + parts.append("--- Coaching-Kontext (Live) ---\n" + "\n\n".join(liveContext)) + + return "\n\n".join(parts) if parts else "" + + return _buildRagContext + + +def _parseJsonField(value, fallback=None): + if not value: + return fallback + if isinstance(value, (list, dict)): + return value + try: + return json.loads(value) + except (json.JSONDecodeError, TypeError): + return fallback + + +_RESEARCH_KEYWORDS = re.compile( + r"\b(such|recherchier|schau nach|im web|finde heraus|google|online|nachschlagen|" + r"search|look up|find out|browse)\b", + re.IGNORECASE, +) + + +def _shouldActivateTools( + fileIds: Optional[List[str]], + dataSourceIds: Optional[List[str]], + featureDataSourceIds: Optional[List[str]], + userMessage: str, +) -> bool: + """Decide whether the agent should have tools activated for this turn.""" + if fileIds: + return True + if dataSourceIds: + return True + if featureDataSourceIds: + return True + if _RESEARCH_KEYWORDS.search(userMessage or ""): + return True + return False + + +def _buildConversationHistory(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Convert coaching messages to OpenAI-style conversation history for the agent.""" + history = [] + for msg in messages: + role = msg.get("role", "user") + content = msg.get("content", "") + if role in ("user", "assistant") and content: + history.append({"role": role, "content": content}) + return history + + +_TTS_WORD_LIMIT = 200 + + +async def _prepareSpeechText(fullText: str, callAiFn) -> str: + """Prepare text for TTS. Short responses used directly; long ones get summarized.""" + cleaned = _stripMarkdownForTts(fullText) + wordCount = len(cleaned.split()) + if wordCount <= _TTS_WORD_LIMIT: + return cleaned + try: + prompt = f"""Fasse den folgenden Text in 3-4 natürlichen, gesprochenen Sätzen zusammen. +Der Text soll vorgelesen werden – schreibe daher natürlich und flüssig, keine Aufzählungen. +Behalte die wichtigsten Punkte und den Ton bei. + +Text: +{cleaned[:3000]} + +Antworte NUR mit der gekürzten Sprachversion.""" + response = await callAiFn( + "Du kürzt Texte für Sprachausgabe. Antworte kurz und natürlich.", + prompt, + ) + if response and response.errorCount == 0 and response.content: + return response.content.strip() + except Exception as e: + logger.warning(f"Speech summary generation failed: {e}") + return cleaned[:1500] + + class CommcoachService: """Coaching orchestrator: processes messages, calls AI, extracts tasks and scores.""" @@ -409,14 +728,20 @@ class CommcoachService: self.instanceId = instanceId self.userId = str(currentUser.id) - async def processMessage(self, sessionId: str, contextId: str, userContent: str, interface) -> Dict[str, Any]: + async def processMessage( + self, sessionId: str, contextId: str, userContent: str, interface, + fileIds: Optional[List[str]] = None, + dataSourceIds: Optional[List[str]] = None, + featureDataSourceIds: Optional[List[str]] = None, + allowedProviders: Optional[List[str]] = None, + ) -> Dict[str, Any]: """ - Process a user message through the coaching pipeline: + Process a user message through the agent-based coaching pipeline: 1. Store user message - 2. Build context with history - 3. Call AI for coaching response - 4. Store assistant message - 5. Emit SSE events + 2. Build coaching system prompt + session history + 3. Run AgentService with CommCoach RAG and optional tools + 4. Map agent events to CommCoach SSE events + 5. Post-processing: store message, TTS, tasks, scores """ from . import interfaceFeatureCommcoach as interfaceDb @@ -474,88 +799,62 @@ class CommcoachService: logger.warning(f"History compression failed for session {sessionId}: {e}") previousMessages = messages[-20:] - # Combine all pending user messages (after last assistant message) as the user prompt combinedUserPrompt = _buildCombinedUserPrompt(previousMessages) if not combinedUserPrompt: combinedUserPrompt = userContent - # Strip pending user messages from previousMessages to avoid redundancy in system prompt contextMessages = _stripPendingUserMessages(previousMessages) - tasks = interface.getTasks(contextId, self.userId) await emitSessionEvent(sessionId, "status", {"label": "Kontext wird geladen..."}) - retrievalResult = await self._buildRetrievalContext( - contextId, sessionId, combinedUserPrompt, context, interface - ) - persona = _resolvePersona(session, interface) - documentSummaries = _getDocumentSummaries( - contextId, self.userId, interface, mandateId=self.mandateId, instanceId=self.instanceId - ) - - # Document intent detection (pre-AI-call) - referencedDocumentContents = None - allDocs = _getPlatformFileList(self.mandateId, self.instanceId) if documentSummaries else [] - if allDocs: - await emitSessionEvent(sessionId, "status", {"label": "Dokumente werden geprueft..."}) - docIntent = await _resolveDocumentIntent(combinedUserPrompt, allDocs, self._callAi) - if not docIntent.get("noDocumentAction"): - docIdsToLoad = list(set((docIntent.get("read") or []) + (docIntent.get("update") or []))) - if docIdsToLoad: - referencedDocumentContents = _loadDocumentContents( - docIdsToLoad, interface, mandateId=self.mandateId, instanceId=self.instanceId - ) systemPrompt = aiPrompts.buildCoachingSystemPrompt( context, contextMessages, tasks, - previousSessionSummaries=retrievalResult.get("previousSessionSummaries"), earlierSummary=earlierSummary, - rollingOverview=retrievalResult.get("rollingOverview"), - retrievedSession=retrievalResult.get("retrievedSession"), - retrievedByTopic=retrievalResult.get("retrievedByTopic"), persona=persona, - documentSummaries=documentSummaries, - referencedDocumentContents=referencedDocumentContents, ) - if retrievalResult.get("intent") == RetrievalIntent.SUMMARIZE_ALL: - systemPrompt += "\n\nWICHTIG: Der Benutzer möchte eine Gesamtzusammenfassung. Erstelle eine umfassende Zusammenfassung aller genannten Sessions und der aktuellen Session." + # Build conversation history for the agent + conversationHistory = _buildConversationHistory(contextMessages) + + # Dynamic tool activation + useTools = _shouldActivateTools(fileIds, dataSourceIds, featureDataSourceIds, combinedUserPrompt) - # Call AI await emitSessionEvent(sessionId, "status", {"label": "Coach formuliert Antwort..."}) try: - aiResponse = await self._callAi(systemPrompt, combinedUserPrompt) + agentResponse = await self._runAgent( + sessionId=sessionId, + prompt=combinedUserPrompt, + systemPrompt=systemPrompt, + conversationHistory=conversationHistory, + context=context, + tasks=tasks, + fileIds=fileIds, + useTools=useTools, + allowedProviders=allowedProviders, + ) except asyncio.CancelledError: logger.info(f"processMessage cancelled for session {sessionId} (new message arrived)") return createdUserMsg except Exception as e: - logger.error(f"AI call failed for session {sessionId}: {e}") + logger.error(f"Agent call failed for session {sessionId}: {e}") await emitSessionEvent(sessionId, "error", {"message": f"AI error: {str(e)}"}) return createdUserMsg - responseRaw = aiResponse.content.strip() if aiResponse and aiResponse.errorCount == 0 else "" + textContent = agentResponse or "" - if not responseRaw: - parsed = {"text": "Entschuldigung, ich konnte gerade nicht antworten. Bitte versuche es erneut.", "speech": "", "documents": []} - else: - parsed = _parseAiJsonResponse(responseRaw) - - textContent = parsed.get("text", "") - speechContent = parsed.get("speech", "") - documents = parsed.get("documents", []) + if not textContent: + textContent = "Entschuldigung, ich konnte gerade nicht antworten. Bitte versuche es erneut." if asyncio.current_task() and asyncio.current_task().cancelled(): logger.info(f"processMessage cancelled before storing response for session {sessionId}") return createdUserMsg - for doc in documents: - await _saveOrUpdateDocument(doc, contextId, self.userId, self.mandateId, self.instanceId, interface, sessionId, user=self.currentUser) - assistantMsg = CoachingMessage( sessionId=sessionId, contextId=contextId, @@ -571,8 +870,11 @@ class CommcoachService: await emitSessionEvent(sessionId, "status", {"label": "Antwort wird verarbeitet..."}) + # TTS: use free-text directly; for long responses, generate speech summary + speechText = await _prepareSpeechText(textContent, self._callAi) + ttsTask = asyncio.create_task( - _generateAndEmitTts(sessionId, speechContent, self.currentUser, self.mandateId, self.instanceId, interface) + _generateAndEmitTts(sessionId, speechText, self.currentUser, self.mandateId, self.instanceId, interface) ) await _emitChunkedResponse(sessionId, createdAssistantMsg, textContent) await ttsTask @@ -580,6 +882,75 @@ class CommcoachService: await emitSessionEvent(sessionId, "complete", {}) return createdAssistantMsg + async def _runAgent( + self, + sessionId: str, + prompt: str, + systemPrompt: str, + conversationHistory: List[Dict[str, Any]], + context: Dict[str, Any], + tasks: List[Dict[str, Any]], + fileIds: Optional[List[str]] = None, + useTools: bool = False, + allowedProviders: Optional[List[str]] = None, + ) -> str: + """Run the AgentService for a coaching message. Returns the final text response.""" + from modules.serviceCenter import getService + from modules.serviceCenter.context import ServiceCenterContext + from modules.serviceCenter.services.serviceAgent.datamodelAgent import AgentConfig, AgentEventTypeEnum + + serviceContext = ServiceCenterContext( + user=self.currentUser, + mandate_id=self.mandateId, + feature_instance_id=self.instanceId, + ) + agentService = getService("agent", serviceContext) + + config = AgentConfig( + toolSet="commcoach" if useTools else "none", + maxRounds=3 if useTools else 1, + temperature=0.4, + ) + + buildRagContextFn = _createCommcoachRagFn( + userId=self.userId, + featureInstanceId=self.instanceId, + mandateId=self.mandateId, + context=context, + tasks=tasks, + currentUser=self.currentUser, + ) + + finalText = "" + async for event in agentService.runAgent( + prompt=prompt, + fileIds=fileIds, + config=config, + toolSet=config.toolSet, + workflowId=f"commcoach:{sessionId}", + conversationHistory=conversationHistory, + buildRagContextFn=buildRagContextFn, + systemPromptOverride=systemPrompt, + ): + if event.type == AgentEventTypeEnum.CHUNK: + chunk = event.content or "" + finalText += chunk + elif event.type == AgentEventTypeEnum.MESSAGE: + finalText += event.content or "" + elif event.type == AgentEventTypeEnum.FINAL: + if not finalText: + finalText = event.content or "" + elif event.type == AgentEventTypeEnum.TOOL_CALL: + await emitSessionEvent(sessionId, "toolCall", event.data or {}) + elif event.type == AgentEventTypeEnum.TOOL_RESULT: + await emitSessionEvent(sessionId, "toolResult", event.data or {}) + elif event.type == AgentEventTypeEnum.AGENT_PROGRESS: + await emitSessionEvent(sessionId, "agentProgress", event.data or {}) + elif event.type == AgentEventTypeEnum.ERROR: + await emitSessionEvent(sessionId, "error", {"message": event.content or "Agent error"}) + + return finalText.strip() + async def processSessionOpening(self, sessionId: str, contextId: str, interface) -> Dict[str, Any]: """ Generate and stream the opening greeting for a new session. @@ -742,9 +1113,9 @@ class CommcoachService: }) return session - # Generate summary (AI returns JSON with summary + emailHtml) + # Generate summary (AI returns JSON with summary + structured email payload) summary = None - emailHtml = None + emailData = None try: summaryPrompt = aiPrompts.buildSummaryPrompt(messages, context.get("title", "Coaching")) summaryResponse = await self._callAi("Du bist ein präziser Zusammenfasser. Antworte NUR als JSON.", summaryPrompt) @@ -752,7 +1123,10 @@ class CommcoachService: parsed = aiPrompts.parseJsonResponse(summaryResponse.content.strip(), None) if isinstance(parsed, dict): summary = parsed.get("summary") or parsed.get("text") - emailHtml = parsed.get("emailHtml") + if isinstance(parsed.get("email"), dict): + emailData = parsed.get("email") + elif isinstance(parsed.get("emailData"), dict): + emailData = parsed.get("emailData") else: summary = summaryResponse.content.strip() except Exception as e: @@ -843,6 +1217,40 @@ class CommcoachService: except Exception as e: logger.warning(f"Insight generation failed: {e}") + # Index session data for RAG-based long-term memory + try: + from .serviceCommcoachIndexer import indexSessionData + from modules.serviceCenter import getService + from modules.serviceCenter.context import ServiceCenterContext + + serviceContext = ServiceCenterContext( + user=self.currentUser, + mandate_id=self.mandateId, + feature_instance_id=self.instanceId, + ) + knowledgeService = getService("knowledge", serviceContext) + parsedGoals = aiPrompts._parseJsonField(context.get("goals") if context else None, []) + parsedInsights = aiPrompts._parseJsonField(context.get("insights") if context else None, []) + allTasks = interface.getTasks(contextId, self.userId) + + await indexSessionData( + sessionId=sessionId, + contextId=contextId, + userId=self.userId, + featureInstanceId=self.instanceId, + mandateId=self.mandateId, + messages=messages, + summary=summary, + keyTopics=keyTopics, + goals=parsedGoals, + insights=parsedInsights, + tasks=allTasks, + contextTitle=context.get("title", "Coaching") if context else "Coaching", + knowledgeService=knowledgeService, + ) + except Exception as e: + logger.warning(f"Coaching session indexing failed (non-blocking): {e}") + # Calculate duration startedAt = session.get("startedAt", "") durationSeconds = 0 @@ -898,7 +1306,7 @@ class CommcoachService: # Send email summary if summary: contextTitle = context.get("title", "Coaching") if context else "Coaching" - await self._sendSessionEmail(session, summary, emailHtml, contextTitle, interface) + await self._sendSessionEmail(session, summary, emailData, contextTitle, interface) await emitSessionEvent(sessionId, "sessionState", { "status": "completed", @@ -949,8 +1357,15 @@ class CommcoachService: except Exception as e: logger.warning(f"Failed to update streak: {e}") - async def _sendSessionEmail(self, session: Dict[str, Any], summary: str, emailHtml: str, contextTitle: str, interface): - """Send session summary via email if enabled. Uses AI-generated HTML directly.""" + async def _sendSessionEmail( + self, + session: Dict[str, Any], + summary: str, + emailData: Optional[Dict[str, Any]], + contextTitle: str, + interface, + ): + """Send session summary via email with the standard PowerOn layout.""" try: profile = interface.getProfile(self.userId, self.instanceId) if profile and not profile.get("emailSummaryEnabled", True): @@ -958,6 +1373,7 @@ class CommcoachService: from modules.interfaces.interfaceMessaging import getInterface as getMessagingInterface from modules.interfaces.interfaceDbApp import getRootInterface + from modules.shared.notifyMandateAdmins import _renderHtmlEmail, _resolveMandateName rootInterface = getRootInterface() user = rootInterface.getUser(self.userId) @@ -966,9 +1382,18 @@ class CommcoachService: messaging = getMessagingInterface() subject = f"Coaching-Session Zusammenfassung: {contextTitle}" - - contentHtml = emailHtml if emailHtml else f"

{summary}

" - htmlMessage = _wrapEmailHtml(contentHtml) + mandateName = _resolveMandateName(self.mandateId) + contentHtml = _buildSummaryEmailBlock(emailData, summary, contextTitle) + htmlMessage = _renderHtmlEmail( + "Coaching-Session Zusammenfassung", + [ + f'Thema: {contextTitle}', + "Hier ist die kompakte Zusammenfassung deiner abgeschlossenen Session.", + ], + mandateName, + footerNote="Diese Zusammenfassung wurde automatisch aus deiner Coaching-Session erstellt.", + rawHtmlBlock=contentHtml, + ) messaging.send("email", user.email, subject, htmlMessage) interface.updateSession(session.get("id"), {"emailSent": True}) diff --git a/modules/features/commcoach/serviceCommcoachAi.py b/modules/features/commcoach/serviceCommcoachAi.py index 97deb373..8b916005 100644 --- a/modules/features/commcoach/serviceCommcoachAi.py +++ b/modules/features/commcoach/serviceCommcoachAi.py @@ -168,29 +168,18 @@ Handlungsprinzip: - Wenn der Benutzer dich bittet, etwas zu erstellen (Dokument, Präsentation, Checkliste, Plan), dann TU ES SOFORT. Frage NICHT nochmals nach Bestätigung. - Verwende alle verfügbaren Informationen aus dem Chat-Verlauf, den Dokumenten und dem Kontext. - Wenn der Benutzer sagt "erstelle", "mach", "schreib", dann liefere das fertige Ergebnis — keine Aufzählung von Punkten, die du "gleich umsetzen wirst". +- Dir wird automatisch relevanter Kontext aus früheren Sessions bereitgestellt (Relevant Knowledge). Nutze diesen für Kontinuität und Bezugnahme auf frühere Gespräche. Antwortformat: -Du antwortest IMMER als reines JSON-Objekt mit exakt diesen Feldern: -{"text": "...", "speech": "...", "documents": []} +- Antworte direkt als Freitext (KEIN JSON). Markdown-Formatierung ist erlaubt. +- Halte Antworten gesprächig und kurz (2-6 Sätze im Normalfall), wie in einem echten Coaching-Gespräch. +- Bei komplexen Themen oder wenn der Benutzer Details anfragt, darf die Antwort ausführlicher sein. +- Dein Text wird sowohl angezeigt als auch vorgelesen – schreibe daher natürlich und gut sprechbar. -"text": Dein schriftlicher Chat-Text. Details, Struktur, Übungen, Beispiele. Markdown-Formatierung erlaubt. -"speech": Dein gesprochener Kommentar. Natürlich, wie ein Gespräch. Fasse zusammen, kommentiere, motiviere, stelle Fragen. Lies NICHT den Text vor, ergänze ihn mündlich. 2-4 Sätze, reiner Redetext ohne Formatierung. -"documents": Dokumente die der Benutzer aufbewahren kann. Erstelle ein Dokument wenn: der Benutzer explizit darum bittet, du strukturierte Inhalte lieferst, oder Material zum Aufbewahren sinnvoll ist. Wenn keine: leeres Array []. - -Dokument-Format: -{"title": "Dateiname_mit_Extension.html", "content": "...vollstaendiger Inhalt..."} -- Der Title IST der Dateiname inkl. Extension (.html, .md, .txt etc.) -- Fuer HTML-Dokumente: Erstelle VOLLSTAENDIGES, professionell gestyltes HTML mit inline CSS. Kein Markdown, sondern fertiges HTML mit Farben, Layout, Typografie. -- Fuer andere Dokumente: Verwende Markdown. -- WICHTIG: Der Content muss VOLLSTAENDIG und AUSFUEHRLICH sein. Keine Platzhalter, keine "hier kommt..."-Abschnitte. Schreibe echte, detaillierte Inhalte basierend auf allen verfuegbaren Informationen aus dem Chat und den Dokumenten. -- Laengenbeschraenkung fuer Dokumente: KEINE. Schreibe so viel wie noetig fuer ein vollstaendiges Ergebnis. - -Kanalverteilung: -- Fakten, Listen, Übungen -> text -- Empathie, Einordnung, Nachfragen -> speech -- Erstellte Dateien, Materialien zum Aufbewahren -> documents - -WICHTIG: Antworte NUR mit dem JSON-Objekt. Kein Text vor oder nach dem JSON.""" +Tool-Nutzung: +- Du hast Zugriff auf Tools (Dateien lesen, Web-Suche, Datenquellen abfragen) wenn der Benutzer Dateien/Quellen angehängt hat oder Recherche benötigt. +- Nutze Tools NUR wenn nötig. Für normales Coaching-Gespräch: antworte direkt ohne Tools. +- Wenn du ein Tool nutzt, erkläre kurz was du tust.""" if contextDescription: prompt += f"\n\nKontext-Beschreibung: {contextDescription}" @@ -279,7 +268,7 @@ Fuer ein NEUES Dokument: {"title": "...", "content": "...Inhalt..."}""" def buildSummaryPrompt(messages: List[Dict[str, Any]], contextTitle: str) -> str: - """Build a prompt to generate a session summary as JSON with plain text and styled HTML email.""" + """Build a prompt to generate a session summary plus structured email content.""" conversation = "" for msg in messages: role = "Benutzer" if msg.get("role") == "user" else "Coach" @@ -287,27 +276,33 @@ def buildSummaryPrompt(messages: List[Dict[str, Any]], contextTitle: str) -> str return f"""Erstelle eine Zusammenfassung dieser Coaching-Session zum Thema "{contextTitle}". -Antworte AUSSCHLIESSLICH als JSON mit zwei Feldern: +Antworte AUSSCHLIESSLICH als JSON im folgenden Format: {{ - "summary": "Kompakte Zusammenfassung als Plaintext (fuer Anzeige in der App). Struktur: 1. Kernthema, 2. Erkenntnisse, 3. Naechste Schritte, 4. Fortschritt.", - "emailHtml": "
...
" + "summary": "Kompakte Plaintext-Zusammenfassung fuer die App. Struktur: Kernthema, Erkenntnisse, Naechste Schritte, Fortschritt.", + "email": {{ + "headline": "Kurze, professionelle Titelzeile fuer die E-Mail", + "intro": "1-2 Saetze, die den Kern der Session auf den Punkt bringen", + "coreTopic": "Das zentrale Thema in einem praezisen Satz", + "insights": ["Erkenntnis 1", "Erkenntnis 2"], + "nextSteps": ["Naechster Schritt 1", "Naechster Schritt 2"], + "progress": ["Fortschritt 1", "Fortschritt 2"] + }} }} -Fuer "emailHtml": Erstelle ein professionell formatiertes HTML-Fragment (KEIN vollstaendiges HTML-Dokument, nur der Inhalt-Block). -Verwende inline CSS fuer schoene Darstellung in E-Mail-Clients: -- Verwende

fuer Abschnitte (color: #1e40af; margin: 20px 0 8px; font-size: 16px) -- Verwende