diff --git a/modules/datamodels/datamodelAi.py b/modules/datamodels/datamodelAi.py
index 96e05185..662eded2 100644
--- a/modules/datamodels/datamodelAi.py
+++ b/modules/datamodels/datamodelAi.py
@@ -172,6 +172,7 @@ class AiCallRequest(BaseModel):
contentParts: Optional[List['ContentPart']] = None # Content parts for model-aware chunking
messages: Optional[List[Dict[str, Any]]] = Field(default=None, description="OpenAI-style messages for multi-turn agent conversations")
tools: Optional[List[Dict[str, Any]]] = Field(default=None, description="Tool definitions for native function calling")
+ toolChoice: Optional[Any] = Field(default=None, description="Tool choice: 'auto', 'none', or specific tool (passed through to model call)")
requireNeutralization: Optional[bool] = Field(default=None, description="Per-request neutralization override: True=force, False=skip, None=use config")
diff --git a/modules/features/commcoach/datamodelCommcoach.py b/modules/features/commcoach/datamodelCommcoach.py
index 635ba19a..82be6044 100644
--- a/modules/features/commcoach/datamodelCommcoach.py
+++ b/modules/features/commcoach/datamodelCommcoach.py
@@ -228,6 +228,10 @@ class UpdateContextRequest(BaseModel):
class SendMessageRequest(BaseModel):
content: str = Field(description="User message text")
contentType: Optional[CoachingMessageContentType] = CoachingMessageContentType.TEXT
+ fileIds: Optional[List[str]] = Field(default=None, description="Attached file IDs for agent context")
+ dataSourceIds: Optional[List[str]] = Field(default=None, description="Personal data source IDs")
+ featureDataSourceIds: Optional[List[str]] = Field(default=None, description="Feature data source IDs")
+ allowedProviders: Optional[List[str]] = Field(default=None, description="Allowed AI providers")
class CreateTaskRequest(BaseModel):
diff --git a/modules/features/commcoach/routeFeatureCommcoach.py b/modules/features/commcoach/routeFeatureCommcoach.py
index ccb4d342..8ffd3eca 100644
--- a/modules/features/commcoach/routeFeatureCommcoach.py
+++ b/modules/features/commcoach/routeFeatureCommcoach.py
@@ -334,9 +334,8 @@ async def startSession(
try:
from modules.interfaces.interfaceVoiceObjects import getVoiceInterface
voiceInterface = getVoiceInterface(context.user, mandateId)
- from .serviceCommcoach import _getUserVoicePrefs
+ from .serviceCommcoach import _getUserVoicePrefs, _stripMarkdownForTts, _buildTtsConfigErrorMessage
language, voiceName = _getUserVoicePrefs(userId, mandateId)
- from .serviceCommcoach import _stripMarkdownForTts
ttsResult = await voiceInterface.textToSpeech(
text=_stripMarkdownForTts(greetingText),
languageCode=language,
@@ -349,8 +348,12 @@ async def startSession(
audioBytes if isinstance(audioBytes, bytes) else audioBytes.encode()
).decode()
yield f"data: {json.dumps({'type': 'ttsAudio', 'data': {'audio': audioB64, 'format': 'mp3'}})}\n\n"
+ else:
+ errorDetail = ttsResult.get("error", "Text-to-Speech failed")
+ yield f"data: {json.dumps({'type': 'error', 'data': {'message': _buildTtsConfigErrorMessage(language, voiceName, errorDetail), 'detail': errorDetail, 'ttsLanguage': language, 'ttsVoice': voiceName}})}\n\n"
except Exception as e:
logger.warning(f"TTS failed for resumed session: {e}")
+ yield f"data: {json.dumps({'type': 'error', 'data': {'message': 'Die konfigurierte Stimme für diese Sprache ist ungültig oder nicht verfügbar. Bitte passe sie unter Einstellungen > Stimme & Sprache an.', 'detail': str(e)}})}\n\n"
yield f"data: {json.dumps({'type': 'complete', 'data': {}, 'timestamp': getIsoTimestamp()})}\n\n"
return StreamingResponse(
@@ -511,7 +514,13 @@ async def sendMessageStream(
_activeProcessTasks.pop(sessionId, None)
task = asyncio.create_task(
- service.processMessage(sessionId, contextId, body.content, interface)
+ service.processMessage(
+ sessionId, contextId, body.content, interface,
+ fileIds=body.fileIds,
+ dataSourceIds=body.dataSourceIds,
+ featureDataSourceIds=body.featureDataSourceIds,
+ allowedProviders=body.allowedProviders,
+ )
)
task.add_done_callback(_onTaskDone)
_activeProcessTasks[sessionId] = task
diff --git a/modules/features/commcoach/serviceCommcoach.py b/modules/features/commcoach/serviceCommcoach.py
index 5e5aa810..332a4a01 100644
--- a/modules/features/commcoach/serviceCommcoach.py
+++ b/modules/features/commcoach/serviceCommcoach.py
@@ -6,6 +6,7 @@ Manages the coaching pipeline: message processing, AI calls, scoring, task extra
"""
import re
+import html
import logging
import json
import asyncio
@@ -43,25 +44,117 @@ from .serviceCommcoachContextRetrieval import (
logger = logging.getLogger(__name__)
+def _selectConfiguredVoice(
+ language: str,
+ voiceMap: Any,
+ legacyVoice: Optional[str] = None,
+ legacyLanguage: Optional[str] = None,
+) -> Optional[str]:
+ """Resolve the configured TTS voice for a language from ttsVoiceMap, then legacy ttsVoice."""
+ normalizedLanguage = str(language or "").strip()
+ normalizedLower = normalizedLanguage.lower()
+ baseLanguage = normalizedLower.split("-", 1)[0] if normalizedLower else ""
+
+ if isinstance(voiceMap, dict) and voiceMap:
+ direct = voiceMap.get(normalizedLanguage)
+ if isinstance(direct, str) and direct.strip():
+ return direct.strip()
+
+ directBase = voiceMap.get(baseLanguage)
+ if isinstance(directBase, str) and directBase.strip():
+ return directBase.strip()
+
+ for mapKey, mapValue in voiceMap.items():
+ if not isinstance(mapValue, str) or not mapValue.strip():
+ continue
+ keyNorm = str(mapKey or "").strip().lower()
+ if keyNorm == normalizedLower or keyNorm == baseLanguage or (baseLanguage and keyNorm.startswith(baseLanguage + "-")):
+ return mapValue.strip()
+
+ if legacyVoice and str(legacyVoice).strip():
+ legacyLangNorm = str(legacyLanguage or "").strip().lower()
+ if not legacyLangNorm or legacyLangNorm == normalizedLower:
+ return str(legacyVoice).strip()
+
+ return None
+
+
+def _buildTtsConfigErrorMessage(language: str, voiceName: Optional[str], rawError: str = "") -> str:
+ if voiceName:
+ return (
+ f'Die konfigurierte Stimme "{voiceName}" für {language} ist ungültig oder nicht verfügbar. '
+ 'Bitte passe sie unter Einstellungen > Stimme & Sprache an.'
+ )
+ return (
+ f'Für die Sprache {language} ist keine gültige TTS-Stimme konfiguriert. '
+ 'Bitte prüfe die Einstellungen unter Stimme & Sprache.'
+ )
+
+
def _getUserVoicePrefs(userId: str, mandateId: Optional[str] = None) -> tuple:
"""Load voice language and voiceName from central UserVoicePreferences.
Returns (language, voiceName) tuple."""
try:
from modules.datamodels.datamodelUam import UserVoicePreferences
- from modules.security.rootAccess import getRootInterface
+ from modules.interfaces.interfaceDbApp import getRootInterface
rootIf = getRootInterface()
prefs = rootIf.db.getRecordset(
UserVoicePreferences,
- recordFilter={"userId": userId, "mandateId": mandateId}
+ recordFilter={"userId": userId}
)
- if not prefs and mandateId:
- prefs = rootIf.db.getRecordset(
- UserVoicePreferences,
- recordFilter={"userId": userId}
- )
if prefs:
- p = prefs[0] if isinstance(prefs[0], dict) else prefs[0].model_dump()
- return (p.get("ttsLanguage") or p.get("sttLanguage") or "de-DE", p.get("ttsVoice"))
+ allPrefs = [
+ pref if isinstance(pref, dict) else pref.model_dump()
+ for pref in prefs
+ ]
+ scopedPref = next(
+ (
+ pref for pref in allPrefs
+ if str(pref.get("mandateId") or "").strip() == str(mandateId or "").strip()
+ ),
+ None,
+ )
+ globalPref = next(
+ (
+ pref for pref in allPrefs
+ if not str(pref.get("mandateId") or "").strip()
+ ),
+ None,
+ )
+
+ language = (
+ (globalPref or {}).get("ttsLanguage")
+ or (globalPref or {}).get("sttLanguage")
+ or (scopedPref or {}).get("ttsLanguage")
+ or (scopedPref or {}).get("sttLanguage")
+ or "de-DE"
+ )
+
+ scopedVoiceFromMap = _selectConfiguredVoice(
+ language=language,
+ voiceMap=(scopedPref or {}).get("ttsVoiceMap"),
+ )
+ globalVoice = _selectConfiguredVoice(
+ language=language,
+ voiceMap=(globalPref or {}).get("ttsVoiceMap"),
+ legacyVoice=(globalPref or {}).get("ttsVoice"),
+ legacyLanguage=(globalPref or {}).get("ttsLanguage"),
+ )
+ scopedLegacyVoice = _selectConfiguredVoice(
+ language=language,
+ voiceMap=None,
+ legacyVoice=(scopedPref or {}).get("ttsVoice"),
+ legacyLanguage=(scopedPref or {}).get("ttsLanguage"),
+ )
+ anyPref = allPrefs[0]
+ fallbackVoice = _selectConfiguredVoice(
+ language=language,
+ voiceMap=(anyPref or {}).get("ttsVoiceMap"),
+ legacyVoice=(anyPref or {}).get("ttsVoice"),
+ legacyLanguage=(anyPref or {}).get("ttsLanguage"),
+ )
+ voiceName = scopedVoiceFromMap or globalVoice or scopedLegacyVoice or fallbackVoice
+ return (language, voiceName)
except Exception as e:
logger.warning(f"Failed to load UserVoicePreferences for user={userId}: {e}")
return ("de-DE", None)
@@ -111,26 +204,91 @@ def cleanupSessionEvents(sessionId: str):
CHUNK_WORD_SIZE = 4
CHUNK_DELAY_SECONDS = 0.05
-def _wrapEmailHtml(contentHtml: str) -> str:
- """Wrap AI-generated HTML content in a styled email shell."""
- return f"""
-
-
-
-
-
-
-
Coaching-Session Zusammenfassung
-
PowerOn CommCoach
-
-
{contentHtml}
-
-
Diese Zusammenfassung wurde automatisch erstellt.
-
-
-
-
-"""
+
+def _normalizeEmailBulletList(values: Any, maxItems: int = 4) -> List[str]:
+ items: List[str] = []
+ if not isinstance(values, list):
+ return items
+ for value in values:
+ text = str(value or "").strip()
+ if text:
+ items.append(text)
+ if len(items) >= maxItems:
+ break
+ return items
+
+
+def _buildSummaryEmailBlock(
+ emailData: Optional[Dict[str, Any]],
+ summary: str,
+ contextTitle: str,
+) -> str:
+ """Render a stable, mail-client-friendly CommCoach summary block."""
+ payload = emailData or {}
+ headline = str(payload.get("headline") or contextTitle or "Coaching-Session").strip()
+ intro = str(payload.get("intro") or "").strip()
+ coreTopic = str(payload.get("coreTopic") or "").strip()
+ insights = _normalizeEmailBulletList(payload.get("insights"))
+ nextSteps = _normalizeEmailBulletList(payload.get("nextSteps"))
+ progress = _normalizeEmailBulletList(payload.get("progress"))
+
+ if not (intro or coreTopic or insights or nextSteps or progress):
+ escapedSummary = html.escape(summary or "").replace("\n", "
")
+ return (
+ ''
+ f'
{html.escape(headline)}
'
+ f'
{escapedSummary}
'
+ '
'
+ )
+
+ def _renderSection(title: str, bodyHtml: str) -> str:
+ if not bodyHtml:
+ return ""
+ return (
+ '| '
+ f' {html.escape(title)} '
+ f'{bodyHtml} '
+ ' |
'
+ )
+
+ def _renderList(values: List[str]) -> str:
+ if not values:
+ return ""
+ rows = "".join(
+ ''
+ '| • | '
+ f'{html.escape(item)} | '
+ '
'
+ for item in values
+ )
+ return f''
+
+ introHtml = f'{html.escape(intro)}
' if intro else ""
+ coreTopicHtml = f'{html.escape(coreTopic)}
' if coreTopic else ""
+
+ sectionsHtml = "".join([
+ _renderSection("Kernbotschaft", introHtml),
+ _renderSection("Kernthema", coreTopicHtml),
+ _renderSection("Erkenntnisse", _renderList(insights)),
+ _renderSection("Nächste Schritte", _renderList(nextSteps)),
+ _renderSection("Fortschritt", _renderList(progress)),
+ ])
+
+ return (
+ ''
+ ''
+ f'{html.escape(headline)}'
+ f'Thema: {html.escape(contextTitle)} '
+ ''
+ f'{sectionsHtml}'
+ ' '
+ ' |
'
+ '
'
+ )
DOC_INTENT_MAX_DOCS = 3
DOC_CONTENT_MAX_CHARS = 3000
@@ -160,7 +318,7 @@ def _stripPendingUserMessages(messages: List[Dict[str, Any]]) -> List[Dict[str,
def _parseAiJsonResponse(rawText: str) -> Dict[str, Any]:
- """Parse the structured JSON response from AI. Strips optional markdown code fences."""
+ """Parse optional structured AI output; otherwise treat free text as normal response."""
text = rawText.strip()
if text.startswith("```"):
lines = text.split("\n")
@@ -169,10 +327,14 @@ def _parseAiJsonResponse(rawText: str) -> Dict[str, Any]:
lines = lines[:-1]
text = "\n".join(lines)
try:
- return json.loads(text)
+ parsed = json.loads(text)
+ if isinstance(parsed, dict):
+ if parsed.get("text") and not parsed.get("speech"):
+ parsed["speech"] = parsed.get("text")
+ return parsed
+ return {"text": rawText.strip(), "speech": rawText.strip(), "documents": []}
except json.JSONDecodeError:
- logger.warning(f"AI JSON parse failed, using raw text: {text[:200]}")
- return {"text": rawText.strip(), "speech": "", "documents": []}
+ return {"text": rawText.strip(), "speech": rawText.strip(), "documents": []}
async def _generateAndEmitTts(sessionId: str, speechText: str, currentUser, mandateId: str,
@@ -197,8 +359,20 @@ async def _generateAndEmitTts(sessionId: str, speechText: str, currentUser, mand
audioBytes if isinstance(audioBytes, bytes) else audioBytes.encode()
).decode()
await emitSessionEvent(sessionId, "ttsAudio", {"audio": audioB64, "format": "mp3"})
+ return
+ errorDetail = ttsResult.get("error", "Text-to-Speech failed")
+ await emitSessionEvent(sessionId, "error", {
+ "message": _buildTtsConfigErrorMessage(language, voiceName, errorDetail),
+ "detail": errorDetail,
+ "ttsLanguage": language,
+ "ttsVoice": voiceName,
+ })
except Exception as e:
logger.warning(f"TTS failed for session {sessionId}: {e}")
+ await emitSessionEvent(sessionId, "error", {
+ "message": _buildTtsConfigErrorMessage("de-DE", None, str(e)),
+ "detail": str(e),
+ })
def _resolveFileNameAndMime(title: str) -> tuple:
@@ -400,6 +574,151 @@ def _getDocumentSummaries(contextId: str, userId: str, interface,
return None
+def _createCommcoachRagFn(
+ userId: str,
+ featureInstanceId: str,
+ mandateId: str,
+ context: Dict[str, Any],
+ tasks: List[Dict[str, Any]],
+ currentUser=None,
+):
+ """Create a CommCoach-specific RAG function combining KnowledgeService RAG with live coaching DB context."""
+
+ async def _buildRagContext(
+ currentPrompt: str, workflowId: str, userId: str,
+ featureInstanceId: str, mandateId: str, **kwargs
+ ) -> str:
+ parts = []
+
+ # 1. Standard KnowledgeService RAG (finds indexed session chunks + files)
+ try:
+ from modules.serviceCenter import getService
+ from modules.serviceCenter.context import ServiceCenterContext
+ serviceContext = ServiceCenterContext(
+ user=currentUser,
+ mandate_id=mandateId,
+ feature_instance_id=featureInstanceId,
+ )
+ knowledgeService = getService("knowledge", serviceContext)
+ ragContext = await knowledgeService.buildAgentContext(
+ currentPrompt=currentPrompt,
+ workflowId=workflowId,
+ userId=userId,
+ featureInstanceId=featureInstanceId,
+ mandateId=mandateId,
+ )
+ if ragContext:
+ parts.append(ragContext)
+ except Exception as e:
+ logger.debug(f"CommCoach RAG knowledge context failed: {e}")
+
+ # 2. Live coaching DB context (current goals, tasks, rolling overview)
+ liveContext = []
+ goals = _parseJsonField(context.get("goals")) if context else None
+ if goals:
+ goalTexts = [g.get("text", g) if isinstance(g, dict) else str(g) for g in goals if g]
+ if goalTexts:
+ liveContext.append("Aktuelle Ziele:\n" + "\n".join(f"- {g}" for g in goalTexts))
+
+ openTasks = [t for t in (tasks or []) if t.get("status") in ("open", "inProgress")]
+ if openTasks:
+ taskLines = [f"- {t.get('title', '')}" for t in openTasks[:5]]
+ liveContext.append("Offene Aufgaben:\n" + "\n".join(taskLines))
+
+ rollingOverview = context.get("rollingOverview") if context else None
+ if rollingOverview:
+ liveContext.append(f"Gesamtüberblick bisheriger Sessions:\n{rollingOverview[:500]}")
+
+ insights = _parseJsonField(context.get("insights")) if context else None
+ if insights:
+ insightTexts = [i.get("text", i) if isinstance(i, dict) else str(i) for i in insights[-5:] if i]
+ if insightTexts:
+ liveContext.append("Bisherige Erkenntnisse:\n" + "\n".join(f"- {t}" for t in insightTexts))
+
+ if liveContext:
+ parts.append("--- Coaching-Kontext (Live) ---\n" + "\n\n".join(liveContext))
+
+ return "\n\n".join(parts) if parts else ""
+
+ return _buildRagContext
+
+
+def _parseJsonField(value, fallback=None):
+ if not value:
+ return fallback
+ if isinstance(value, (list, dict)):
+ return value
+ try:
+ return json.loads(value)
+ except (json.JSONDecodeError, TypeError):
+ return fallback
+
+
+_RESEARCH_KEYWORDS = re.compile(
+ r"\b(such|recherchier|schau nach|im web|finde heraus|google|online|nachschlagen|"
+ r"search|look up|find out|browse)\b",
+ re.IGNORECASE,
+)
+
+
+def _shouldActivateTools(
+ fileIds: Optional[List[str]],
+ dataSourceIds: Optional[List[str]],
+ featureDataSourceIds: Optional[List[str]],
+ userMessage: str,
+) -> bool:
+ """Decide whether the agent should have tools activated for this turn."""
+ if fileIds:
+ return True
+ if dataSourceIds:
+ return True
+ if featureDataSourceIds:
+ return True
+ if _RESEARCH_KEYWORDS.search(userMessage or ""):
+ return True
+ return False
+
+
+def _buildConversationHistory(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+ """Convert coaching messages to OpenAI-style conversation history for the agent."""
+ history = []
+ for msg in messages:
+ role = msg.get("role", "user")
+ content = msg.get("content", "")
+ if role in ("user", "assistant") and content:
+ history.append({"role": role, "content": content})
+ return history
+
+
+_TTS_WORD_LIMIT = 200
+
+
+async def _prepareSpeechText(fullText: str, callAiFn) -> str:
+ """Prepare text for TTS. Short responses used directly; long ones get summarized."""
+ cleaned = _stripMarkdownForTts(fullText)
+ wordCount = len(cleaned.split())
+ if wordCount <= _TTS_WORD_LIMIT:
+ return cleaned
+ try:
+ prompt = f"""Fasse den folgenden Text in 3-4 natürlichen, gesprochenen Sätzen zusammen.
+Der Text soll vorgelesen werden – schreibe daher natürlich und flüssig, keine Aufzählungen.
+Behalte die wichtigsten Punkte und den Ton bei.
+
+Text:
+{cleaned[:3000]}
+
+Antworte NUR mit der gekürzten Sprachversion."""
+ response = await callAiFn(
+ "Du kürzt Texte für Sprachausgabe. Antworte kurz und natürlich.",
+ prompt,
+ )
+ if response and response.errorCount == 0 and response.content:
+ return response.content.strip()
+ except Exception as e:
+ logger.warning(f"Speech summary generation failed: {e}")
+ return cleaned[:1500]
+
+
class CommcoachService:
"""Coaching orchestrator: processes messages, calls AI, extracts tasks and scores."""
@@ -409,14 +728,20 @@ class CommcoachService:
self.instanceId = instanceId
self.userId = str(currentUser.id)
- async def processMessage(self, sessionId: str, contextId: str, userContent: str, interface) -> Dict[str, Any]:
+ async def processMessage(
+ self, sessionId: str, contextId: str, userContent: str, interface,
+ fileIds: Optional[List[str]] = None,
+ dataSourceIds: Optional[List[str]] = None,
+ featureDataSourceIds: Optional[List[str]] = None,
+ allowedProviders: Optional[List[str]] = None,
+ ) -> Dict[str, Any]:
"""
- Process a user message through the coaching pipeline:
+ Process a user message through the agent-based coaching pipeline:
1. Store user message
- 2. Build context with history
- 3. Call AI for coaching response
- 4. Store assistant message
- 5. Emit SSE events
+ 2. Build coaching system prompt + session history
+ 3. Run AgentService with CommCoach RAG and optional tools
+ 4. Map agent events to CommCoach SSE events
+ 5. Post-processing: store message, TTS, tasks, scores
"""
from . import interfaceFeatureCommcoach as interfaceDb
@@ -474,88 +799,62 @@ class CommcoachService:
logger.warning(f"History compression failed for session {sessionId}: {e}")
previousMessages = messages[-20:]
- # Combine all pending user messages (after last assistant message) as the user prompt
combinedUserPrompt = _buildCombinedUserPrompt(previousMessages)
if not combinedUserPrompt:
combinedUserPrompt = userContent
- # Strip pending user messages from previousMessages to avoid redundancy in system prompt
contextMessages = _stripPendingUserMessages(previousMessages)
-
tasks = interface.getTasks(contextId, self.userId)
await emitSessionEvent(sessionId, "status", {"label": "Kontext wird geladen..."})
- retrievalResult = await self._buildRetrievalContext(
- contextId, sessionId, combinedUserPrompt, context, interface
- )
-
persona = _resolvePersona(session, interface)
- documentSummaries = _getDocumentSummaries(
- contextId, self.userId, interface, mandateId=self.mandateId, instanceId=self.instanceId
- )
-
- # Document intent detection (pre-AI-call)
- referencedDocumentContents = None
- allDocs = _getPlatformFileList(self.mandateId, self.instanceId) if documentSummaries else []
- if allDocs:
- await emitSessionEvent(sessionId, "status", {"label": "Dokumente werden geprueft..."})
- docIntent = await _resolveDocumentIntent(combinedUserPrompt, allDocs, self._callAi)
- if not docIntent.get("noDocumentAction"):
- docIdsToLoad = list(set((docIntent.get("read") or []) + (docIntent.get("update") or [])))
- if docIdsToLoad:
- referencedDocumentContents = _loadDocumentContents(
- docIdsToLoad, interface, mandateId=self.mandateId, instanceId=self.instanceId
- )
systemPrompt = aiPrompts.buildCoachingSystemPrompt(
context,
contextMessages,
tasks,
- previousSessionSummaries=retrievalResult.get("previousSessionSummaries"),
earlierSummary=earlierSummary,
- rollingOverview=retrievalResult.get("rollingOverview"),
- retrievedSession=retrievalResult.get("retrievedSession"),
- retrievedByTopic=retrievalResult.get("retrievedByTopic"),
persona=persona,
- documentSummaries=documentSummaries,
- referencedDocumentContents=referencedDocumentContents,
)
- if retrievalResult.get("intent") == RetrievalIntent.SUMMARIZE_ALL:
- systemPrompt += "\n\nWICHTIG: Der Benutzer möchte eine Gesamtzusammenfassung. Erstelle eine umfassende Zusammenfassung aller genannten Sessions und der aktuellen Session."
+ # Build conversation history for the agent
+ conversationHistory = _buildConversationHistory(contextMessages)
+
+ # Dynamic tool activation
+ useTools = _shouldActivateTools(fileIds, dataSourceIds, featureDataSourceIds, combinedUserPrompt)
- # Call AI
await emitSessionEvent(sessionId, "status", {"label": "Coach formuliert Antwort..."})
try:
- aiResponse = await self._callAi(systemPrompt, combinedUserPrompt)
+ agentResponse = await self._runAgent(
+ sessionId=sessionId,
+ prompt=combinedUserPrompt,
+ systemPrompt=systemPrompt,
+ conversationHistory=conversationHistory,
+ context=context,
+ tasks=tasks,
+ fileIds=fileIds,
+ useTools=useTools,
+ allowedProviders=allowedProviders,
+ )
except asyncio.CancelledError:
logger.info(f"processMessage cancelled for session {sessionId} (new message arrived)")
return createdUserMsg
except Exception as e:
- logger.error(f"AI call failed for session {sessionId}: {e}")
+ logger.error(f"Agent call failed for session {sessionId}: {e}")
await emitSessionEvent(sessionId, "error", {"message": f"AI error: {str(e)}"})
return createdUserMsg
- responseRaw = aiResponse.content.strip() if aiResponse and aiResponse.errorCount == 0 else ""
+ textContent = agentResponse or ""
- if not responseRaw:
- parsed = {"text": "Entschuldigung, ich konnte gerade nicht antworten. Bitte versuche es erneut.", "speech": "", "documents": []}
- else:
- parsed = _parseAiJsonResponse(responseRaw)
-
- textContent = parsed.get("text", "")
- speechContent = parsed.get("speech", "")
- documents = parsed.get("documents", [])
+ if not textContent:
+ textContent = "Entschuldigung, ich konnte gerade nicht antworten. Bitte versuche es erneut."
if asyncio.current_task() and asyncio.current_task().cancelled():
logger.info(f"processMessage cancelled before storing response for session {sessionId}")
return createdUserMsg
- for doc in documents:
- await _saveOrUpdateDocument(doc, contextId, self.userId, self.mandateId, self.instanceId, interface, sessionId, user=self.currentUser)
-
assistantMsg = CoachingMessage(
sessionId=sessionId,
contextId=contextId,
@@ -571,8 +870,11 @@ class CommcoachService:
await emitSessionEvent(sessionId, "status", {"label": "Antwort wird verarbeitet..."})
+ # TTS: use free-text directly; for long responses, generate speech summary
+ speechText = await _prepareSpeechText(textContent, self._callAi)
+
ttsTask = asyncio.create_task(
- _generateAndEmitTts(sessionId, speechContent, self.currentUser, self.mandateId, self.instanceId, interface)
+ _generateAndEmitTts(sessionId, speechText, self.currentUser, self.mandateId, self.instanceId, interface)
)
await _emitChunkedResponse(sessionId, createdAssistantMsg, textContent)
await ttsTask
@@ -580,6 +882,75 @@ class CommcoachService:
await emitSessionEvent(sessionId, "complete", {})
return createdAssistantMsg
+ async def _runAgent(
+ self,
+ sessionId: str,
+ prompt: str,
+ systemPrompt: str,
+ conversationHistory: List[Dict[str, Any]],
+ context: Dict[str, Any],
+ tasks: List[Dict[str, Any]],
+ fileIds: Optional[List[str]] = None,
+ useTools: bool = False,
+ allowedProviders: Optional[List[str]] = None,
+ ) -> str:
+ """Run the AgentService for a coaching message. Returns the final text response."""
+ from modules.serviceCenter import getService
+ from modules.serviceCenter.context import ServiceCenterContext
+ from modules.serviceCenter.services.serviceAgent.datamodelAgent import AgentConfig, AgentEventTypeEnum
+
+ serviceContext = ServiceCenterContext(
+ user=self.currentUser,
+ mandate_id=self.mandateId,
+ feature_instance_id=self.instanceId,
+ )
+ agentService = getService("agent", serviceContext)
+
+ config = AgentConfig(
+ toolSet="commcoach" if useTools else "none",
+ maxRounds=3 if useTools else 1,
+ temperature=0.4,
+ )
+
+ buildRagContextFn = _createCommcoachRagFn(
+ userId=self.userId,
+ featureInstanceId=self.instanceId,
+ mandateId=self.mandateId,
+ context=context,
+ tasks=tasks,
+ currentUser=self.currentUser,
+ )
+
+ finalText = ""
+ async for event in agentService.runAgent(
+ prompt=prompt,
+ fileIds=fileIds,
+ config=config,
+ toolSet=config.toolSet,
+ workflowId=f"commcoach:{sessionId}",
+ conversationHistory=conversationHistory,
+ buildRagContextFn=buildRagContextFn,
+ systemPromptOverride=systemPrompt,
+ ):
+ if event.type == AgentEventTypeEnum.CHUNK:
+ chunk = event.content or ""
+ finalText += chunk
+ elif event.type == AgentEventTypeEnum.MESSAGE:
+ finalText += event.content or ""
+ elif event.type == AgentEventTypeEnum.FINAL:
+ if not finalText:
+ finalText = event.content or ""
+ elif event.type == AgentEventTypeEnum.TOOL_CALL:
+ await emitSessionEvent(sessionId, "toolCall", event.data or {})
+ elif event.type == AgentEventTypeEnum.TOOL_RESULT:
+ await emitSessionEvent(sessionId, "toolResult", event.data or {})
+ elif event.type == AgentEventTypeEnum.AGENT_PROGRESS:
+ await emitSessionEvent(sessionId, "agentProgress", event.data or {})
+ elif event.type == AgentEventTypeEnum.ERROR:
+ await emitSessionEvent(sessionId, "error", {"message": event.content or "Agent error"})
+
+ return finalText.strip()
+
async def processSessionOpening(self, sessionId: str, contextId: str, interface) -> Dict[str, Any]:
"""
Generate and stream the opening greeting for a new session.
@@ -742,9 +1113,9 @@ class CommcoachService:
})
return session
- # Generate summary (AI returns JSON with summary + emailHtml)
+ # Generate summary (AI returns JSON with summary + structured email payload)
summary = None
- emailHtml = None
+ emailData = None
try:
summaryPrompt = aiPrompts.buildSummaryPrompt(messages, context.get("title", "Coaching"))
summaryResponse = await self._callAi("Du bist ein präziser Zusammenfasser. Antworte NUR als JSON.", summaryPrompt)
@@ -752,7 +1123,10 @@ class CommcoachService:
parsed = aiPrompts.parseJsonResponse(summaryResponse.content.strip(), None)
if isinstance(parsed, dict):
summary = parsed.get("summary") or parsed.get("text")
- emailHtml = parsed.get("emailHtml")
+ if isinstance(parsed.get("email"), dict):
+ emailData = parsed.get("email")
+ elif isinstance(parsed.get("emailData"), dict):
+ emailData = parsed.get("emailData")
else:
summary = summaryResponse.content.strip()
except Exception as e:
@@ -843,6 +1217,40 @@ class CommcoachService:
except Exception as e:
logger.warning(f"Insight generation failed: {e}")
+ # Index session data for RAG-based long-term memory
+ try:
+ from .serviceCommcoachIndexer import indexSessionData
+ from modules.serviceCenter import getService
+ from modules.serviceCenter.context import ServiceCenterContext
+
+ serviceContext = ServiceCenterContext(
+ user=self.currentUser,
+ mandate_id=self.mandateId,
+ feature_instance_id=self.instanceId,
+ )
+ knowledgeService = getService("knowledge", serviceContext)
+ parsedGoals = aiPrompts._parseJsonField(context.get("goals") if context else None, [])
+ parsedInsights = aiPrompts._parseJsonField(context.get("insights") if context else None, [])
+ allTasks = interface.getTasks(contextId, self.userId)
+
+ await indexSessionData(
+ sessionId=sessionId,
+ contextId=contextId,
+ userId=self.userId,
+ featureInstanceId=self.instanceId,
+ mandateId=self.mandateId,
+ messages=messages,
+ summary=summary,
+ keyTopics=keyTopics,
+ goals=parsedGoals,
+ insights=parsedInsights,
+ tasks=allTasks,
+ contextTitle=context.get("title", "Coaching") if context else "Coaching",
+ knowledgeService=knowledgeService,
+ )
+ except Exception as e:
+ logger.warning(f"Coaching session indexing failed (non-blocking): {e}")
+
# Calculate duration
startedAt = session.get("startedAt", "")
durationSeconds = 0
@@ -898,7 +1306,7 @@ class CommcoachService:
# Send email summary
if summary:
contextTitle = context.get("title", "Coaching") if context else "Coaching"
- await self._sendSessionEmail(session, summary, emailHtml, contextTitle, interface)
+ await self._sendSessionEmail(session, summary, emailData, contextTitle, interface)
await emitSessionEvent(sessionId, "sessionState", {
"status": "completed",
@@ -949,8 +1357,15 @@ class CommcoachService:
except Exception as e:
logger.warning(f"Failed to update streak: {e}")
- async def _sendSessionEmail(self, session: Dict[str, Any], summary: str, emailHtml: str, contextTitle: str, interface):
- """Send session summary via email if enabled. Uses AI-generated HTML directly."""
+ async def _sendSessionEmail(
+ self,
+ session: Dict[str, Any],
+ summary: str,
+ emailData: Optional[Dict[str, Any]],
+ contextTitle: str,
+ interface,
+ ):
+ """Send session summary via email with the standard PowerOn layout."""
try:
profile = interface.getProfile(self.userId, self.instanceId)
if profile and not profile.get("emailSummaryEnabled", True):
@@ -958,6 +1373,7 @@ class CommcoachService:
from modules.interfaces.interfaceMessaging import getInterface as getMessagingInterface
from modules.interfaces.interfaceDbApp import getRootInterface
+ from modules.shared.notifyMandateAdmins import _renderHtmlEmail, _resolveMandateName
rootInterface = getRootInterface()
user = rootInterface.getUser(self.userId)
@@ -966,9 +1382,18 @@ class CommcoachService:
messaging = getMessagingInterface()
subject = f"Coaching-Session Zusammenfassung: {contextTitle}"
-
- contentHtml = emailHtml if emailHtml else f"{summary}
"
- htmlMessage = _wrapEmailHtml(contentHtml)
+ mandateName = _resolveMandateName(self.mandateId)
+ contentHtml = _buildSummaryEmailBlock(emailData, summary, contextTitle)
+ htmlMessage = _renderHtmlEmail(
+ "Coaching-Session Zusammenfassung",
+ [
+ f'Thema: {contextTitle}',
+ "Hier ist die kompakte Zusammenfassung deiner abgeschlossenen Session.",
+ ],
+ mandateName,
+ footerNote="Diese Zusammenfassung wurde automatisch aus deiner Coaching-Session erstellt.",
+ rawHtmlBlock=contentHtml,
+ )
messaging.send("email", user.email, subject, htmlMessage)
interface.updateSession(session.get("id"), {"emailSent": True})
diff --git a/modules/features/commcoach/serviceCommcoachAi.py b/modules/features/commcoach/serviceCommcoachAi.py
index 97deb373..8b916005 100644
--- a/modules/features/commcoach/serviceCommcoachAi.py
+++ b/modules/features/commcoach/serviceCommcoachAi.py
@@ -168,29 +168,18 @@ Handlungsprinzip:
- Wenn der Benutzer dich bittet, etwas zu erstellen (Dokument, Präsentation, Checkliste, Plan), dann TU ES SOFORT. Frage NICHT nochmals nach Bestätigung.
- Verwende alle verfügbaren Informationen aus dem Chat-Verlauf, den Dokumenten und dem Kontext.
- Wenn der Benutzer sagt "erstelle", "mach", "schreib", dann liefere das fertige Ergebnis — keine Aufzählung von Punkten, die du "gleich umsetzen wirst".
+- Dir wird automatisch relevanter Kontext aus früheren Sessions bereitgestellt (Relevant Knowledge). Nutze diesen für Kontinuität und Bezugnahme auf frühere Gespräche.
Antwortformat:
-Du antwortest IMMER als reines JSON-Objekt mit exakt diesen Feldern:
-{"text": "...", "speech": "...", "documents": []}
+- Antworte direkt als Freitext (KEIN JSON). Markdown-Formatierung ist erlaubt.
+- Halte Antworten gesprächig und kurz (2-6 Sätze im Normalfall), wie in einem echten Coaching-Gespräch.
+- Bei komplexen Themen oder wenn der Benutzer Details anfragt, darf die Antwort ausführlicher sein.
+- Dein Text wird sowohl angezeigt als auch vorgelesen – schreibe daher natürlich und gut sprechbar.
-"text": Dein schriftlicher Chat-Text. Details, Struktur, Übungen, Beispiele. Markdown-Formatierung erlaubt.
-"speech": Dein gesprochener Kommentar. Natürlich, wie ein Gespräch. Fasse zusammen, kommentiere, motiviere, stelle Fragen. Lies NICHT den Text vor, ergänze ihn mündlich. 2-4 Sätze, reiner Redetext ohne Formatierung.
-"documents": Dokumente die der Benutzer aufbewahren kann. Erstelle ein Dokument wenn: der Benutzer explizit darum bittet, du strukturierte Inhalte lieferst, oder Material zum Aufbewahren sinnvoll ist. Wenn keine: leeres Array [].
-
-Dokument-Format:
-{"title": "Dateiname_mit_Extension.html", "content": "...vollstaendiger Inhalt..."}
-- Der Title IST der Dateiname inkl. Extension (.html, .md, .txt etc.)
-- Fuer HTML-Dokumente: Erstelle VOLLSTAENDIGES, professionell gestyltes HTML mit inline CSS. Kein Markdown, sondern fertiges HTML mit Farben, Layout, Typografie.
-- Fuer andere Dokumente: Verwende Markdown.
-- WICHTIG: Der Content muss VOLLSTAENDIG und AUSFUEHRLICH sein. Keine Platzhalter, keine "hier kommt..."-Abschnitte. Schreibe echte, detaillierte Inhalte basierend auf allen verfuegbaren Informationen aus dem Chat und den Dokumenten.
-- Laengenbeschraenkung fuer Dokumente: KEINE. Schreibe so viel wie noetig fuer ein vollstaendiges Ergebnis.
-
-Kanalverteilung:
-- Fakten, Listen, Übungen -> text
-- Empathie, Einordnung, Nachfragen -> speech
-- Erstellte Dateien, Materialien zum Aufbewahren -> documents
-
-WICHTIG: Antworte NUR mit dem JSON-Objekt. Kein Text vor oder nach dem JSON."""
+Tool-Nutzung:
+- Du hast Zugriff auf Tools (Dateien lesen, Web-Suche, Datenquellen abfragen) wenn der Benutzer Dateien/Quellen angehängt hat oder Recherche benötigt.
+- Nutze Tools NUR wenn nötig. Für normales Coaching-Gespräch: antworte direkt ohne Tools.
+- Wenn du ein Tool nutzt, erkläre kurz was du tust."""
if contextDescription:
prompt += f"\n\nKontext-Beschreibung: {contextDescription}"
@@ -279,7 +268,7 @@ Fuer ein NEUES Dokument: {"title": "...", "content": "...Inhalt..."}"""
def buildSummaryPrompt(messages: List[Dict[str, Any]], contextTitle: str) -> str:
- """Build a prompt to generate a session summary as JSON with plain text and styled HTML email."""
+ """Build a prompt to generate a session summary plus structured email content."""
conversation = ""
for msg in messages:
role = "Benutzer" if msg.get("role") == "user" else "Coach"
@@ -287,27 +276,33 @@ def buildSummaryPrompt(messages: List[Dict[str, Any]], contextTitle: str) -> str
return f"""Erstelle eine Zusammenfassung dieser Coaching-Session zum Thema "{contextTitle}".
-Antworte AUSSCHLIESSLICH als JSON mit zwei Feldern:
+Antworte AUSSCHLIESSLICH als JSON im folgenden Format:
{{
- "summary": "Kompakte Zusammenfassung als Plaintext (fuer Anzeige in der App). Struktur: 1. Kernthema, 2. Erkenntnisse, 3. Naechste Schritte, 4. Fortschritt.",
- "emailHtml": "...
"
+ "summary": "Kompakte Plaintext-Zusammenfassung fuer die App. Struktur: Kernthema, Erkenntnisse, Naechste Schritte, Fortschritt.",
+ "email": {{
+ "headline": "Kurze, professionelle Titelzeile fuer die E-Mail",
+ "intro": "1-2 Saetze, die den Kern der Session auf den Punkt bringen",
+ "coreTopic": "Das zentrale Thema in einem praezisen Satz",
+ "insights": ["Erkenntnis 1", "Erkenntnis 2"],
+ "nextSteps": ["Naechster Schritt 1", "Naechster Schritt 2"],
+ "progress": ["Fortschritt 1", "Fortschritt 2"]
+ }}
}}
-Fuer "emailHtml": Erstelle ein professionell formatiertes HTML-Fragment (KEIN vollstaendiges HTML-Dokument, nur der Inhalt-Block).
-Verwende inline CSS fuer schoene Darstellung in E-Mail-Clients:
-- Verwende fuer Abschnitte (color: #1e40af; margin: 20px 0 8px; font-size: 16px)
-- Verwende /- fuer Stichpunkte (margin: 4px 0; line-height: 1.6)
-- Verwende fuer Hervorhebungen
-- Verwende
fuer Fliesstext (color: #374151; line-height: 1.65; font-size: 15px)
-- Verwende
als Trenner
-
-Fuer "summary": Kompakter Plaintext ohne HTML/Markdown. Abschnitte mit Zeilenumbruechen trennen.
+Regeln:
+- KEIN HTML erzeugen.
+- "summary" ist reiner Plaintext ohne Markdown.
+- "headline" kurz und professionell.
+- "intro" in natuerlichem Business-Deutsch.
+- "insights", "nextSteps" und "progress" jeweils als kurze Stichpunkte.
+- Maximal 4 Eintraege pro Liste.
+- Wenn eine Liste leer ist, gib [] zurueck.
Gespräch:
{conversation}
-Antworte auf Deutsch, sachlich und kompakt. NUR JSON, keine Erklaerungen."""
+Antworte auf Deutsch, sachlich, klar und kompakt. NUR JSON, keine Erklaerungen."""
def buildScoringPrompt(messages: List[Dict[str, Any]], contextCategory: str) -> str:
diff --git a/modules/features/commcoach/serviceCommcoachIndexer.py b/modules/features/commcoach/serviceCommcoachIndexer.py
new file mode 100644
index 00000000..b43764a1
--- /dev/null
+++ b/modules/features/commcoach/serviceCommcoachIndexer.py
@@ -0,0 +1,223 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""
+CommCoach Session Indexer.
+Indexes coaching session data into the knowledge store (pgvector) for RAG-based long-term memory.
+Called after session completion to ensure semantic searchability across 20+ sessions.
+"""
+
+import logging
+import uuid
+import json
+from typing import List, Dict, Any, Optional
+
+logger = logging.getLogger(__name__)
+
+_COACHING_FILE_PREFIX = "coaching-session:"
+
+
+async def indexSessionData(
+ sessionId: str,
+ contextId: str,
+ userId: str,
+ featureInstanceId: str,
+ mandateId: str,
+ messages: List[Dict[str, Any]],
+ summary: Optional[str],
+ keyTopics: Optional[str],
+ goals: Optional[List[Any]],
+ insights: Optional[List[Any]],
+ tasks: Optional[List[Dict[str, Any]]],
+ contextTitle: str = "",
+ knowledgeService=None,
+):
+ """Index a completed coaching session into the knowledge store.
+
+ Creates ContentChunks with embeddings for:
+ - Each User+Assistant message pair (maximum detail depth)
+ - Session summary
+ - Key topics (individually, for precise retrieval)
+ - Current goals
+ - New insights
+ - Tasks (open + done)
+ """
+ if not knowledgeService:
+ logger.warning("No knowledge service available for coaching indexer")
+ return
+
+ syntheticFileId = f"{_COACHING_FILE_PREFIX}{sessionId}"
+
+ chunks = []
+
+ # 1. Message pairs (User + Assistant) as individual chunks
+ messagePairs = _extractMessagePairs(messages)
+ for idx, pair in enumerate(messagePairs):
+ chunks.append({
+ "contentObjectId": f"{sessionId}:msg-pair:{idx}",
+ "data": pair["text"],
+ "contextRef": {
+ "containerPath": f"session:{sessionId}",
+ "location": f"message-pair-{idx}",
+ "type": "coaching-message-pair",
+ "contextId": contextId,
+ "sessionId": sessionId,
+ "contextTitle": contextTitle,
+ },
+ })
+
+ # 2. Session summary
+ if summary:
+ chunks.append({
+ "contentObjectId": f"{sessionId}:summary",
+ "data": f"Session-Zusammenfassung ({contextTitle}): {summary}",
+ "contextRef": {
+ "containerPath": f"session:{sessionId}",
+ "location": "summary",
+ "type": "coaching-session-summary",
+ "contextId": contextId,
+ "sessionId": sessionId,
+ "contextTitle": contextTitle,
+ },
+ })
+
+ # 3. Key topics (each as separate chunk for precise retrieval)
+ parsedTopics = _parseJsonSafe(keyTopics, [])
+ for tidx, topic in enumerate(parsedTopics):
+ topicStr = str(topic).strip()
+ if topicStr:
+ chunks.append({
+ "contentObjectId": f"{sessionId}:topic:{tidx}",
+ "data": f"Coaching-Thema ({contextTitle}): {topicStr}",
+ "contextRef": {
+ "containerPath": f"session:{sessionId}",
+ "location": f"topic-{tidx}",
+ "type": "coaching-key-topic",
+ "contextId": contextId,
+ "sessionId": sessionId,
+ "contextTitle": contextTitle,
+ },
+ })
+
+ # 4. Goals
+ if goals:
+ goalTexts = [g.get("text", g) if isinstance(g, dict) else str(g) for g in goals if g]
+ if goalTexts:
+ goalsStr = "\n".join(f"- {g}" for g in goalTexts)
+ chunks.append({
+ "contentObjectId": f"{sessionId}:goals",
+ "data": f"Coaching-Ziele ({contextTitle}):\n{goalsStr}",
+ "contextRef": {
+ "containerPath": f"session:{sessionId}",
+ "location": "goals",
+ "type": "coaching-goals",
+ "contextId": contextId,
+ "sessionId": sessionId,
+ "contextTitle": contextTitle,
+ },
+ })
+
+ # 5. Insights
+ if insights:
+ insightTexts = [i.get("text", i) if isinstance(i, dict) else str(i) for i in insights if i]
+ if insightTexts:
+ insightsStr = "\n".join(f"- {t}" for t in insightTexts)
+ chunks.append({
+ "contentObjectId": f"{sessionId}:insights",
+ "data": f"Coaching-Erkenntnisse ({contextTitle}):\n{insightsStr}",
+ "contextRef": {
+ "containerPath": f"session:{sessionId}",
+ "location": "insights",
+ "type": "coaching-insights",
+ "contextId": contextId,
+ "sessionId": sessionId,
+ "contextTitle": contextTitle,
+ },
+ })
+
+ # 6. Tasks
+ if tasks:
+ taskLines = []
+ for t in tasks:
+ status = t.get("status", "open")
+ title = t.get("title", "")
+ if title:
+ taskLines.append(f"- [{status}] {title}")
+ if taskLines:
+ tasksStr = "\n".join(taskLines)
+ chunks.append({
+ "contentObjectId": f"{sessionId}:tasks",
+ "data": f"Coaching-Aufgaben ({contextTitle}):\n{tasksStr}",
+ "contextRef": {
+ "containerPath": f"session:{sessionId}",
+ "location": "tasks",
+ "type": "coaching-tasks",
+ "contextId": contextId,
+ "sessionId": sessionId,
+ "contextTitle": contextTitle,
+ },
+ })
+
+ if not chunks:
+ logger.info(f"No chunks to index for session {sessionId}")
+ return
+
+ logger.info(f"Indexing {len(chunks)} chunks for coaching session {sessionId}")
+
+ try:
+ contentObjects = [
+ {
+ "contentObjectId": c["contentObjectId"],
+ "contentType": "text",
+ "data": c["data"],
+ "contextRef": c["contextRef"],
+ }
+ for c in chunks
+ ]
+
+ await knowledgeService.indexFile(
+ fileId=syntheticFileId,
+ fileName=f"coaching-session-{sessionId[:8]}",
+ mimeType="application/x-coaching-session",
+ userId=userId,
+ featureInstanceId=featureInstanceId,
+ mandateId=mandateId,
+ contentObjects=contentObjects,
+ )
+ logger.info(f"Successfully indexed coaching session {sessionId} ({len(chunks)} chunks)")
+ except Exception as e:
+ logger.error(f"Failed to index coaching session {sessionId}: {e}", exc_info=True)
+
+
+def _extractMessagePairs(messages: List[Dict[str, Any]]) -> List[Dict[str, str]]:
+ """Extract User+Assistant pairs from message list."""
+ pairs = []
+ i = 0
+ while i < len(messages):
+ msg = messages[i]
+ if msg.get("role") == "user":
+ userText = (msg.get("content") or "").strip()
+ assistantText = ""
+ if i + 1 < len(messages) and messages[i + 1].get("role") == "assistant":
+ assistantText = (messages[i + 1].get("content") or "").strip()
+ i += 2
+ else:
+ i += 1
+ if userText:
+ text = f"Benutzer: {userText}"
+ if assistantText:
+ text += f"\nCoach: {assistantText}"
+ pairs.append({"text": text})
+ else:
+ i += 1
+ return pairs
+
+
+def _parseJsonSafe(value, fallback):
+ if not value:
+ return fallback
+ if isinstance(value, (list, dict)):
+ return value
+ try:
+ return json.loads(value)
+ except (json.JSONDecodeError, TypeError):
+ return fallback
diff --git a/modules/features/commcoach/serviceCommcoachScheduler.py b/modules/features/commcoach/serviceCommcoachScheduler.py
index 3db548cf..dcbc1e86 100644
--- a/modules/features/commcoach/serviceCommcoachScheduler.py
+++ b/modules/features/commcoach/serviceCommcoachScheduler.py
@@ -6,11 +6,44 @@ Handles daily reminders and scheduled email summaries.
"""
import logging
+import html
from typing import Dict, Any, List
logger = logging.getLogger(__name__)
+def _buildReminderHtmlBlock(contextTitles: List[str], streakDays: int) -> str:
+ rows = "".join(
+ ''
+ '| • | '
+ f'{html.escape(title)} | '
+ '
'
+ for title in contextTitles[:3]
+ )
+ topicsBlock = (
+ ''
+ '| '
+ ' Aktive Coaching-Themen '
+ f''
+ ' |
'
+ )
+ streakBlock = (
+ ''
+ '| '
+ ' Dein Rhythmus '
+ f'Aktueller Streak: '
+ f'{int(streakDays or 0)} Tage '
+ ' |
'
+ )
+ return topicsBlock + streakBlock
+
+
def registerScheduledJobs(eventManagement):
"""Register CommCoach scheduled jobs with the event management system."""
try:
@@ -31,6 +64,7 @@ async def _runDailyReminders():
from modules.connectors.connectorDbPostgre import DatabaseConnector
from .datamodelCommcoach import CoachingUserProfile, CoachingContextStatus
from modules.interfaces.interfaceMessaging import getInterface as getMessagingInterface
+ from modules.shared.notifyMandateAdmins import _renderHtmlEmail, _resolveMandateName
dbHost = APP_CONFIG.get("DB_HOST", "_no_config_default_data")
db = DatabaseConnector(
@@ -71,15 +105,21 @@ async def _runDailyReminders():
contextTitles = [c.get("title", "Unbenannt") for c in contexts[:3]]
contextList = ", ".join(contextTitles)
- subject = "Dein taegliches Coaching wartet"
- message = f"""
- Zeit fuer dein Coaching
- Du hast aktive Coaching-Themen: {contextList}
- Nimm dir 10 Minuten fuer eine kurze Session. Konsistenz ist der Schluessel zu Fortschritt.
- Dein aktueller Streak: {profile.get('streakDays', 0)} Tage
- """
+ subject = "Dein tägliches Coaching wartet"
+ mandateName = _resolveMandateName(profile.get("mandateId"))
+ htmlMessage = _renderHtmlEmail(
+ "Zeit für dein tägliches Coaching",
+ [
+ f"Du hast aktuell {len(contexts)} aktive Coaching-Themen.",
+ "Schon 10 Minuten reichen oft, um einen Gedanken zu klären, eine nächste Aktion festzulegen oder ein Gespräch vorzubereiten.",
+ f"Im Fokus: {contextList}",
+ ],
+ mandateName,
+ footerNote="Diese Erinnerung wurde automatisch auf Basis deiner CommCoach-Einstellungen versendet.",
+ rawHtmlBlock=_buildReminderHtmlBlock(contextTitles, int(profile.get("streakDays", 0) or 0)),
+ )
- messaging.send("email", user.email, subject, message)
+ messaging.send("email", user.email, subject, htmlMessage)
sentCount += 1
except Exception as e:
logger.warning(f"Failed to send reminder to user {profile.get('userId')}: {e}")
diff --git a/modules/interfaces/interfaceAiObjects.py b/modules/interfaces/interfaceAiObjects.py
index f0aedc87..a859ffa7 100644
--- a/modules/interfaces/interfaceAiObjects.py
+++ b/modules/interfaces/interfaceAiObjects.py
@@ -134,7 +134,7 @@ class AiObjects:
logger.info(f"Attempting AI call with model: {model.name} (attempt {attempt + 1}/{len(failoverModelList)})")
if request.messages:
- response = await self._callWithMessages(model, request.messages, options, request.tools)
+ response = await self._callWithMessages(model, request.messages, options, request.tools, toolChoice=request.toolChoice)
else:
response = await self._callWithModel(model, prompt, context, options)
@@ -149,7 +149,7 @@ class AiObjects:
await asyncio.sleep(retryAfter + 0.5)
try:
if request.messages:
- response = await self._callWithMessages(model, request.messages, options, request.tools)
+ response = await self._callWithMessages(model, request.messages, options, request.tools, toolChoice=request.toolChoice)
else:
response = await self._callWithModel(model, prompt, context, options)
logger.info(f"AI call successful with {model.name} after rate-limit retry")
@@ -288,7 +288,8 @@ class AiObjects:
async def _callWithMessages(self, model: AiModel, messages: List[Dict[str, Any]],
options: AiCallOptions = None,
- tools: List[Dict[str, Any]] = None) -> AiCallResponse:
+ tools: List[Dict[str, Any]] = None,
+ toolChoice: Any = None) -> AiCallResponse:
"""Call a model with pre-built messages (agent mode). Supports tools for native function calling."""
import json as _json
@@ -302,7 +303,8 @@ class AiObjects:
messages=messages,
model=model,
options=options or {},
- tools=tools
+ tools=tools,
+ toolChoice=toolChoice,
)
modelResponse = await model.functionCall(modelCall)
@@ -379,7 +381,7 @@ class AiObjects:
for attempt, model in enumerate(failoverModelList):
try:
logger.info(f"Streaming AI call with model: {model.name} (attempt {attempt + 1})")
- async for chunk in self._callWithMessagesStream(model, request.messages, options, request.tools):
+ async for chunk in self._callWithMessagesStream(model, request.messages, options, request.tools, toolChoice=request.toolChoice):
yield chunk
return
@@ -390,7 +392,7 @@ class AiObjects:
logger.info(f"Rate limit on {model.name}, waiting {retryAfter:.1f}s before retry")
await asyncio.sleep(retryAfter + 0.5)
try:
- async for chunk in self._callWithMessagesStream(model, request.messages, options, request.tools):
+ async for chunk in self._callWithMessagesStream(model, request.messages, options, request.tools, toolChoice=request.toolChoice):
yield chunk
return
except Exception as retryErr:
@@ -421,6 +423,7 @@ class AiObjects:
async def _callWithMessagesStream(
self, model: AiModel, messages: List[Dict[str, Any]],
options: AiCallOptions = None, tools: List[Dict[str, Any]] = None,
+ toolChoice: Any = None,
) -> AsyncGenerator[Union[str, AiCallResponse], None]:
"""Stream a model call. Yields str deltas, then final AiCallResponse with billing."""
from modules.datamodels.datamodelAi import AiModelCall, AiModelResponse
@@ -429,7 +432,7 @@ class AiObjects:
startTime = time.time()
if not model.functionCallStream:
- response = await self._callWithMessages(model, messages, options, tools)
+ response = await self._callWithMessages(model, messages, options, tools, toolChoice=toolChoice)
if response.content:
yield response.content
yield response
@@ -438,6 +441,7 @@ class AiObjects:
modelCall = AiModelCall(
messages=messages, model=model,
options=options or {}, tools=tools,
+ toolChoice=toolChoice,
)
finalModelResponse = None
diff --git a/modules/routes/routeVoiceGoogle.py b/modules/routes/routeVoiceGoogle.py
index 1c796361..309e59bb 100644
--- a/modules/routes/routeVoiceGoogle.py
+++ b/modules/routes/routeVoiceGoogle.py
@@ -444,7 +444,7 @@ async def health_check(currentUser: User = Depends(getCurrentUser)):
async def get_voice_settings(currentUser: User = Depends(getCurrentUser)):
"""Get voice settings for the current user (reads from UserVoicePreferences)."""
from modules.datamodels.datamodelUam import UserVoicePreferences
- from modules.security.rootAccess import getRootInterface
+ from modules.interfaces.interfaceDbApp import getRootInterface
rootInterface = getRootInterface()
userId = str(currentUser.id)
@@ -464,7 +464,7 @@ async def save_voice_settings(
):
"""Save voice settings for the current user (writes to UserVoicePreferences)."""
from modules.datamodels.datamodelUam import UserVoicePreferences, _normalizeTtsVoiceMap
- from modules.security.rootAccess import getRootInterface
+ from modules.interfaces.interfaceDbApp import getRootInterface
rootInterface = getRootInterface()
userId = str(currentUser.id)
diff --git a/modules/serviceCenter/services/serviceAgent/agentLoop.py b/modules/serviceCenter/services/serviceAgent/agentLoop.py
index c196d237..fa76141d 100644
--- a/modules/serviceCenter/services/serviceAgent/agentLoop.py
+++ b/modules/serviceCenter/services/serviceAgent/agentLoop.py
@@ -48,6 +48,7 @@ async def runAgentLoop(
conversationHistory: List[Dict[str, Any]] = None,
persistRoundMemoryFn: Callable[..., Awaitable[None]] = None,
getExternalMemoryKeysFn: Callable[[], List[str]] = None,
+ systemPromptOverride: str = None,
) -> AsyncGenerator[AgentEvent, None]:
"""Run the agent loop. Yields AgentEvent for each step (SSE-ready).
@@ -74,16 +75,20 @@ async def runAgentLoop(
featureInstanceId=featureInstanceId
)
- tools = toolRegistry.getTools()
- toolDefinitions = toolRegistry.formatToolsForFunctionCalling()
+ activeToolSet = config.toolSet if config else None
+ tools = toolRegistry.getTools(toolSet=activeToolSet)
+ toolDefinitions = toolRegistry.formatToolsForFunctionCalling(toolSet=activeToolSet)
# Text-based tool descriptions are ONLY used as fallback when native function
# calling is unavailable. Including both creates conflicting instructions
# (text ```tool_call format vs native tool_use blocks) and can cause the model
# to respond with plain text instead of actual tool calls.
- toolsText = "" if toolDefinitions else toolRegistry.formatToolsForPrompt()
+ toolsText = "" if toolDefinitions else toolRegistry.formatToolsForPrompt(toolSet=activeToolSet)
- systemPrompt = buildSystemPrompt(tools, toolsText, userLanguage=userLanguage)
+ if systemPromptOverride:
+ systemPrompt = systemPromptOverride
+ else:
+ systemPrompt = buildSystemPrompt(tools, toolsText, userLanguage=userLanguage)
conversation = ConversationManager(systemPrompt)
if conversationHistory:
conversation.loadHistory(conversationHistory)
@@ -168,7 +173,7 @@ async def runAgentLoop(
temperature=config.temperature
),
messages=conversation.messages,
- tools=toolDefinitions
+ tools=toolDefinitions if toolDefinitions else None,
)
try:
diff --git a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py
index 08950ea3..b370b827 100644
--- a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py
+++ b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py
@@ -132,6 +132,8 @@ class AgentService:
additionalTools: List[Dict[str, Any]] = None,
userLanguage: str = "",
conversationHistory: List[Dict[str, Any]] = None,
+ buildRagContextFn: Callable = None,
+ systemPromptOverride: str = None,
) -> AsyncGenerator[AgentEvent, None]:
"""Run an agent with the given prompt and tools.
@@ -144,6 +146,8 @@ class AgentService:
additionalTools: Extra tool definitions to register dynamically
userLanguage: ISO 639-1 language code; falls back to user.language from profile
conversationHistory: Prior messages for follow-up context
+ buildRagContextFn: Optional custom RAG context builder (overrides default)
+ systemPromptOverride: Optional system prompt override (replaces generated prompt)
Yields:
AgentEvent for each step (SSE-ready)
@@ -163,7 +167,8 @@ class AgentService:
aiCallFn = self._createAiCallFn()
aiCallStreamFn = self._createAiCallStreamFn()
getWorkflowCostFn = self._createGetWorkflowCostFn(workflowId)
- buildRagContextFn = self._createBuildRagContextFn()
+ if buildRagContextFn is None:
+ buildRagContextFn = self._createBuildRagContextFn()
persistRoundMemoryFn = self._createPersistRoundMemoryFn(workflowId)
getExternalMemoryKeysFn = self._createGetExternalMemoryKeysFn(workflowId)
@@ -183,6 +188,7 @@ class AgentService:
conversationHistory=conversationHistory,
persistRoundMemoryFn=persistRoundMemoryFn,
getExternalMemoryKeysFn=getExternalMemoryKeysFn,
+ systemPromptOverride=systemPromptOverride,
):
if event.type == AgentEventTypeEnum.AGENT_SUMMARY:
await self._persistTrace(workflowId, event.data or {})
@@ -2610,54 +2616,54 @@ def _registerCoreTools(registry: ToolRegistry, services):
if not voiceName:
try:
from modules.datamodels.datamodelUam import UserVoicePreferences
- from modules.security.rootAccess import getRootInterface
+ from modules.interfaces.interfaceDbApp import getRootInterface
userId = context.get("userId", "")
if userId:
rootIf = getRootInterface()
prefRecords = rootIf.db.getRecordset(
UserVoicePreferences,
- recordFilter={"userId": userId, "mandateId": mandateId}
+ recordFilter={"userId": userId}
)
- if not prefRecords and mandateId:
- prefRecords = rootIf.db.getRecordset(
- UserVoicePreferences,
- recordFilter={"userId": userId}
- )
if prefRecords:
- vs = prefRecords[0] if isinstance(prefRecords[0], dict) else prefRecords[0].model_dump() if hasattr(prefRecords[0], "model_dump") else prefRecords[0]
- voiceMap = vs.get("ttsVoiceMap", {}) or {}
- if isinstance(voiceMap, dict) and voiceMap:
- selectedKey = None
- selectedVoiceEntry = None
- baseLanguage = language.split("-")[0].lower() if isinstance(language, str) and language else ""
+ allPrefs = [
+ r if isinstance(r, dict) else r.model_dump() if hasattr(r, "model_dump") else r
+ for r in prefRecords
+ ]
+ _mid = str(mandateId or "").strip()
+ scopedPref = next((p for p in allPrefs if str(p.get("mandateId") or "").strip() == _mid), None)
+ globalPref = next((p for p in allPrefs if not str(p.get("mandateId") or "").strip()), None)
- if isinstance(language, str) and language in voiceMap:
- selectedKey = language
- selectedVoiceEntry = voiceMap[language]
+ def _resolveVoiceFromMap(prefDict, lang):
+ vm = (prefDict or {}).get("ttsVoiceMap", {}) or {}
+ if not isinstance(vm, dict) or not vm:
+ return None
+ baseLang = lang.split("-")[0].lower() if isinstance(lang, str) and lang else ""
+ langNorm = str(lang or "").strip()
+ if langNorm in vm:
+ entry = vm[langNorm]
+ return entry.get("voiceName") if isinstance(entry, dict) else entry
+ if baseLang and baseLang in vm:
+ entry = vm[baseLang]
+ return entry.get("voiceName") if isinstance(entry, dict) else entry
+ if baseLang:
+ for mk, mv in vm.items():
+ mkn = str(mk).lower()
+ if mkn == baseLang or mkn.startswith(f"{baseLang}-"):
+ return mv.get("voiceName") if isinstance(mv, dict) else mv
+ return None
- if selectedVoiceEntry is None and baseLanguage and baseLanguage in voiceMap:
- selectedKey = baseLanguage
- selectedVoiceEntry = voiceMap[baseLanguage]
-
- if selectedVoiceEntry is None and baseLanguage:
- for mapKey, mapValue in voiceMap.items():
- mapKeyNorm = str(mapKey).lower()
- if mapKeyNorm == baseLanguage or mapKeyNorm.startswith(f"{baseLanguage}-"):
- selectedKey = str(mapKey)
- selectedVoiceEntry = mapValue
- break
-
- if selectedVoiceEntry is not None:
- voiceName = (
- selectedVoiceEntry.get("voiceName")
- if isinstance(selectedVoiceEntry, dict)
- else selectedVoiceEntry
- )
- logger.info(
- f"textToSpeech: using configured voice '{voiceName}' for requested language '{language}' (matched key '{selectedKey}')"
- )
- if not voiceName and vs.get("ttsVoice") and vs.get("ttsLanguage") == language:
- voiceName = vs["ttsVoice"]
+ voiceName = (
+ _resolveVoiceFromMap(scopedPref, language)
+ or _resolveVoiceFromMap(globalPref, language)
+ or _resolveVoiceFromMap(allPrefs[0], language)
+ )
+ if not voiceName:
+ for candidate in [globalPref, scopedPref, allPrefs[0]]:
+ if candidate and candidate.get("ttsVoice") and candidate.get("ttsLanguage") == language:
+ voiceName = candidate["ttsVoice"]
+ break
+ if voiceName:
+ logger.info(f"textToSpeech: using configured voice '{voiceName}' for language '{language}'")
except Exception as prefErr:
logger.debug(f"textToSpeech: could not load voice preferences: {prefErr}")
@@ -3416,3 +3422,21 @@ def _registerCoreTools(registry: ToolRegistry, services):
},
readOnly=True,
)
+
+ # Tag core-only tools so restricted toolSets (e.g. "commcoach") exclude them.
+ # Tools NOT in this set remain toolSet=None → available to ALL sets.
+ _CORE_ONLY_TOOLS = {
+ "listFiles", "listFolders", "tagFile", "moveFile", "createFolder",
+ "writeFile", "deleteFile", "renameFile", "translateText",
+ "deleteFolder", "renameFolder", "moveFolder", "copyFile", "replaceInFile",
+ "listConnections", "uploadToExternal", "sendMail", "downloadFromDataSource",
+ "browseContainer", "readContentObjects", "extractContainerItem",
+ "summarizeContent", "describeImage", "renderDocument",
+ "textToSpeech", "generateImage", "createChart",
+ "speechToText", "detectLanguage", "neutralizeData", "executeCode",
+ "listWorkflowHistory", "readWorkflowMessages",
+ }
+ for _toolName in _CORE_ONLY_TOOLS:
+ _td = registry.getTool(_toolName)
+ if _td:
+ _td.toolSet = "core"
diff --git a/modules/serviceCenter/services/serviceAgent/toolRegistry.py b/modules/serviceCenter/services/serviceAgent/toolRegistry.py
index d241bb93..b4b5cd86 100644
--- a/modules/serviceCenter/services/serviceAgent/toolRegistry.py
+++ b/modules/serviceCenter/services/serviceAgent/toolRegistry.py
@@ -125,20 +125,22 @@ class ToolRegistry:
durationMs=durationMs
)
- def formatToolsForPrompt(self) -> str:
- """Format all tools as text for system prompt (text-based fallback)."""
+ def formatToolsForPrompt(self, toolSet: str = None) -> str:
+ """Format tools as text for system prompt (text-based fallback)."""
+ tools = self.getTools(toolSet=toolSet) if toolSet else list(self._tools.values())
parts = []
- for tool in self._tools.values():
+ for tool in tools:
paramStr = ", ".join(
f"{k}: {v}" for k, v in tool.parameters.items()
) if tool.parameters else "none"
parts.append(f"- **{tool.name}**: {tool.description}\n Parameters: {{{paramStr}}}")
return "\n".join(parts)
- def formatToolsForFunctionCalling(self) -> List[Dict[str, Any]]:
- """Format all tools as OpenAI-compatible function definitions for native function calling."""
+ def formatToolsForFunctionCalling(self, toolSet: str = None) -> List[Dict[str, Any]]:
+ """Format tools as OpenAI-compatible function definitions for native function calling."""
+ tools = self.getTools(toolSet=toolSet) if toolSet else list(self._tools.values())
functions = []
- for tool in self._tools.values():
+ for tool in tools:
functions.append({
"type": "function",
"function": {