diff --git a/modules/features/commcoach/interfaceFeatureCommcoach.py b/modules/features/commcoach/interfaceFeatureCommcoach.py index 9e21f677..e612c6ba 100644 --- a/modules/features/commcoach/interfaceFeatureCommcoach.py +++ b/modules/features/commcoach/interfaceFeatureCommcoach.py @@ -289,6 +289,10 @@ class CommcoachObjects: data["createdAt"] = getIsoTimestamp() return self.db.recordCreate(CoachingDocument, data) + def updateDocument(self, documentId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]: + from .datamodelCommcoach import CoachingDocument + return self.db.recordModify(CoachingDocument, documentId, updates) + def deleteDocument(self, documentId: str) -> bool: from .datamodelCommcoach import CoachingDocument return self.db.recordDelete(CoachingDocument, documentId) diff --git a/modules/features/commcoach/routeFeatureCommcoach.py b/modules/features/commcoach/routeFeatureCommcoach.py index 1efa6556..654d6c03 100644 --- a/modules/features/commcoach/routeFeatureCommcoach.py +++ b/modules/features/commcoach/routeFeatureCommcoach.py @@ -34,6 +34,8 @@ from .serviceCommcoach import CommcoachService, emitSessionEvent, getSessionEven logger = logging.getLogger(__name__) +_activeProcessTasks: dict = {} + def _audit(context: RequestContext, action: str, resourceType: str = None, resourceId: str = None, details: str = ""): """Log an audit event for CommCoach. Non-blocking, best-effort.""" @@ -500,10 +502,19 @@ async def sendMessageStream( contextId = session.get("contextId") service = CommcoachService(context.user, mandateId, instanceId) - # Process in background - asyncio.create_task( + existingTask = _activeProcessTasks.get(sessionId) + if existingTask and not existingTask.done(): + existingTask.cancel() + logger.info(f"Cancelled previous processMessage task for session {sessionId}") + + def _onTaskDone(task): + _activeProcessTasks.pop(sessionId, None) + + task = asyncio.create_task( service.processMessage(sessionId, contextId, body.content, interface) ) + task.add_done_callback(_onTaskDone) + _activeProcessTasks[sessionId] = task # Stream events async def _eventGenerator(): diff --git a/modules/features/commcoach/serviceCommcoach.py b/modules/features/commcoach/serviceCommcoach.py index d6bd1243..24c23044 100644 --- a/modules/features/commcoach/serviceCommcoach.py +++ b/modules/features/commcoach/serviceCommcoach.py @@ -86,6 +86,32 @@ def cleanupSessionEvents(sessionId: str): CHUNK_WORD_SIZE = 4 CHUNK_DELAY_SECONDS = 0.05 +DOC_INTENT_MAX_DOCS = 3 +DOC_CONTENT_MAX_CHARS = 3000 + + +def _buildCombinedUserPrompt(messages: List[Dict[str, Any]]) -> str: + """Collect all user messages after the last assistant message into one combined prompt.""" + pending = [] + for msg in reversed(messages): + if msg.get("role") == "assistant": + break + if msg.get("role") == "user": + pending.insert(0, msg.get("content", "")) + return " ".join(pending).strip() + + +def _stripPendingUserMessages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Return messages up to and including the last assistant message (remove trailing user-only tail).""" + lastAssistantIdx = -1 + for i in range(len(messages) - 1, -1, -1): + if messages[i].get("role") == "assistant": + lastAssistantIdx = i + break + if lastAssistantIdx < 0: + return [] + return messages[:lastAssistantIdx + 1] + def _parseAiJsonResponse(rawText: str) -> Dict[str, Any]: """Parse the structured JSON response from AI. Strips optional markdown code fences.""" @@ -131,12 +157,13 @@ async def _generateAndEmitTts(sessionId: str, speechText: str, currentUser, mand logger.warning(f"TTS failed for session {sessionId}: {e}") -async def _saveGeneratedDocument(doc: Dict[str, Any], contextId: str, userId: str, - mandateId: str, instanceId: str, interface, sessionId: str, - user=None): - """Save a document generated by AI. Stores file in Management DB.""" +async def _saveOrUpdateDocument(doc: Dict[str, Any], contextId: str, userId: str, + mandateId: str, instanceId: str, interface, sessionId: str, + user=None): + """Save a new document or update an existing one. Stores file in Management DB.""" from .datamodelCommcoach import CoachingDocument try: + docId = doc.get("id") title = doc.get("title", "Dokument") content = doc.get("content", "") contentBytes = content.encode("utf-8") @@ -152,24 +179,71 @@ async def _saveGeneratedDocument(doc: Dict[str, Any], contextId: str, userId: st mgmtInterface.createFileData(fileItem.id, contentBytes) fileRef = fileItem.id except Exception as e: - logger.warning(f"Failed to store generated document in file DB: {e}") + logger.warning(f"Failed to store document in file DB: {e}") - docData = CoachingDocument( - contextId=contextId, - userId=userId, - mandateId=mandateId, - instanceId=instanceId, - fileName=fileName, - mimeType="text/markdown", - fileSize=len(contentBytes), - extractedText=content, - summary=title, - fileRef=fileRef, - ).model_dump() - created = interface.createDocument(docData) - await emitSessionEvent(sessionId, "documentCreated", created) + if docId: + updates = { + "fileName": fileName, + "extractedText": content, + "summary": title, + "fileSize": len(contentBytes), + } + if fileRef: + updates["fileRef"] = fileRef + updated = interface.updateDocument(docId, updates) + if updated: + await emitSessionEvent(sessionId, "documentUpdated", updated) + logger.info(f"Document updated: {docId} ({title})") + else: + logger.warning(f"Document update failed, id not found: {docId}") + else: + docData = CoachingDocument( + contextId=contextId, + userId=userId, + mandateId=mandateId, + instanceId=instanceId, + fileName=fileName, + mimeType="text/markdown", + fileSize=len(contentBytes), + extractedText=content, + summary=title, + fileRef=fileRef, + ).model_dump() + created = interface.createDocument(docData) + await emitSessionEvent(sessionId, "documentCreated", created) except Exception as e: - logger.warning(f"Failed to save generated document: {e}") + logger.warning(f"Failed to save/update document: {e}") + + +async def _resolveDocumentIntent(combinedUserPrompt: str, docs: List[Dict[str, Any]], callAiFn) -> Dict[str, Any]: + """Pre-AI-call: identify which documents the user references and what action is needed.""" + if not docs: + return {"read": [], "update": [], "create": [], "noDocumentAction": True} + from . import serviceCommcoachAi as aiPrompts + docCatalog = [{"id": d.get("id", ""), "title": d.get("summary") or d.get("fileName", ""), "summary": (d.get("summary") or "")[:100]} for d in docs] + prompt = aiPrompts.buildDocumentIntentPrompt(combinedUserPrompt, docCatalog) + try: + response = await callAiFn("Du analysierst Dokumentreferenzen in Benutzeranfragen. Antworte NUR als JSON.", prompt) + if response and response.errorCount == 0 and response.content: + parsed = aiPrompts._parseAiJsonSafe(response.content.strip(), {"read": [], "update": [], "create": [], "noDocumentAction": True}) + return parsed + except Exception as e: + logger.warning(f"Document intent detection failed: {e}") + return {"read": [], "update": [], "create": [], "noDocumentAction": True} + + +def _loadDocumentContents(docIds: List[str], interface) -> List[Dict[str, Any]]: + """Load full extractedText for the given document IDs.""" + results = [] + for docId in docIds[:DOC_INTENT_MAX_DOCS]: + doc = interface.getDocument(docId) + if doc and doc.get("extractedText"): + results.append({ + "id": doc.get("id", ""), + "title": doc.get("summary") or doc.get("fileName", ""), + "content": doc.get("extractedText", "")[:DOC_CONTENT_MAX_CHARS], + }) + return results async def _emitChunkedResponse(sessionId: str, createdMsg: Dict[str, Any], fullText: str): @@ -298,18 +372,39 @@ class CommcoachService: logger.warning(f"History compression failed for session {sessionId}: {e}") previousMessages = messages[-20:] + # Combine all pending user messages (after last assistant message) as the user prompt + combinedUserPrompt = _buildCombinedUserPrompt(previousMessages) + if not combinedUserPrompt: + combinedUserPrompt = userContent + + # Strip pending user messages from previousMessages to avoid redundancy in system prompt + contextMessages = _stripPendingUserMessages(previousMessages) + tasks = interface.getTasks(contextId, self.userId) + await emitSessionEvent(sessionId, "status", {"label": "Kontext wird geladen..."}) + retrievalResult = await self._buildRetrievalContext( - contextId, sessionId, userContent, context, interface + contextId, sessionId, combinedUserPrompt, context, interface ) persona = _resolvePersona(session, interface) documentSummaries = _getDocumentSummaries(contextId, self.userId, interface) + # Document intent detection (pre-AI-call) + referencedDocumentContents = None + allDocs = interface.getDocuments(contextId, self.userId) if documentSummaries else [] + if allDocs: + await emitSessionEvent(sessionId, "status", {"label": "Dokumente werden geprueft..."}) + docIntent = await _resolveDocumentIntent(combinedUserPrompt, allDocs, self._callAi) + if not docIntent.get("noDocumentAction"): + docIdsToLoad = list(set((docIntent.get("read") or []) + (docIntent.get("update") or []))) + if docIdsToLoad: + referencedDocumentContents = _loadDocumentContents(docIdsToLoad, interface) + systemPrompt = aiPrompts.buildCoachingSystemPrompt( context, - previousMessages, + contextMessages, tasks, previousSessionSummaries=retrievalResult.get("previousSessionSummaries"), earlierSummary=earlierSummary, @@ -318,16 +413,20 @@ class CommcoachService: retrievedByTopic=retrievalResult.get("retrievedByTopic"), persona=persona, documentSummaries=documentSummaries, + referencedDocumentContents=referencedDocumentContents, ) if retrievalResult.get("intent") == RetrievalIntent.SUMMARIZE_ALL: systemPrompt += "\n\nWICHTIG: Der Benutzer möchte eine Gesamtzusammenfassung. Erstelle eine umfassende Zusammenfassung aller genannten Sessions und der aktuellen Session." # Call AI - await emitSessionEvent(sessionId, "status", {"label": "Coach denkt nach..."}) + await emitSessionEvent(sessionId, "status", {"label": "Coach formuliert Antwort..."}) try: - aiResponse = await self._callAi(systemPrompt, userContent) + aiResponse = await self._callAi(systemPrompt, combinedUserPrompt) + except asyncio.CancelledError: + logger.info(f"processMessage cancelled for session {sessionId} (new message arrived)") + return createdUserMsg except Exception as e: logger.error(f"AI call failed for session {sessionId}: {e}") await emitSessionEvent(sessionId, "error", {"message": f"AI error: {str(e)}"}) @@ -344,8 +443,12 @@ class CommcoachService: speechContent = parsed.get("speech", "") documents = parsed.get("documents", []) + if asyncio.current_task() and asyncio.current_task().cancelled(): + logger.info(f"processMessage cancelled before storing response for session {sessionId}") + return createdUserMsg + for doc in documents: - await _saveGeneratedDocument(doc, contextId, self.userId, self.mandateId, self.instanceId, interface, sessionId, user=self.currentUser) + await _saveOrUpdateDocument(doc, contextId, self.userId, self.mandateId, self.instanceId, interface, sessionId, user=self.currentUser) assistantMsg = CoachingMessage( sessionId=sessionId, @@ -360,6 +463,8 @@ class CommcoachService: messages = interface.getMessages(sessionId) interface.updateSession(sessionId, {"messageCount": len(messages)}) + await emitSessionEvent(sessionId, "status", {"label": "Antwort wird verarbeitet..."}) + ttsTask = asyncio.create_task( _generateAndEmitTts(sessionId, speechContent, self.currentUser, self.mandateId, self.instanceId, interface) ) @@ -436,7 +541,7 @@ class CommcoachService: documents = parsed.get("documents", []) for doc in documents: - await _saveGeneratedDocument(doc, contextId, self.userId, self.mandateId, self.instanceId, interface, sessionId, user=self.currentUser) + await _saveOrUpdateDocument(doc, contextId, self.userId, self.mandateId, self.instanceId, interface, sessionId, user=self.currentUser) assistantMsg = CoachingMessage( sessionId=sessionId, diff --git a/modules/features/commcoach/serviceCommcoachAi.py b/modules/features/commcoach/serviceCommcoachAi.py index 7b67406f..11844c28 100644 --- a/modules/features/commcoach/serviceCommcoachAi.py +++ b/modules/features/commcoach/serviceCommcoachAi.py @@ -11,9 +11,9 @@ from typing import Optional, Dict, Any, List, Tuple logger = logging.getLogger(__name__) -# Compression thresholds (Teamsbot-style) -COMPRESSION_MESSAGE_THRESHOLD = 25 -COMPRESSION_RECENT_COUNT = 15 +# Compression thresholds — lowered for voice fragment pattern (multiple user msgs per turn) +COMPRESSION_MESSAGE_THRESHOLD = 15 +COMPRESSION_RECENT_COUNT = 10 COMPRESSION_MAX_MESSAGES_FETCH = 80 @@ -95,6 +95,7 @@ def buildCoachingSystemPrompt( retrievedByTopic: Optional[List[Dict[str, Any]]] = None, persona: Optional[Dict[str, Any]] = None, documentSummaries: Optional[List[str]] = None, + referencedDocumentContents: Optional[List[Dict[str, Any]]] = None, ) -> str: """Build the system prompt for a coaching session, including context history, tasks, and session continuity.""" contextTitle = context.get("title", "General Coaching") @@ -238,6 +239,16 @@ WICHTIG: Antworte NUR mit dem JSON-Objekt. Kein Text vor oder nach dem JSON.""" for docSummary in documentSummaries[:5]: prompt += f"\n- {docSummary[:300]}" + if referencedDocumentContents: + prompt += "\n\nReferenzierte Dokumente (vollstaendiger Inhalt):" + for doc in referencedDocumentContents[:3]: + prompt += f"\n\n=== {doc.get('title', 'Dokument')} (id: {doc.get('id', '')}) ===\n{doc.get('content', '')[:3000]}" + prompt += """ + +Du kannst bestehende Dokumente aendern oder neue erstellen. +Fuer UPDATE eines bestehenden Dokuments: {"id": "", "title": "...", "content": "...neuer vollstaendiger Inhalt..."} +Fuer ein NEUES Dokument: {"title": "...", "content": "...Inhalt..."}""" + if previousMessages: prompt += "\n\nVorige Nachrichten dieser Session (Kontext):" for msg in previousMessages[-12:]: @@ -418,7 +429,7 @@ def parseJsonResponse(responseText: str, fallback: Any = None) -> Any: text = responseText.strip() if text.startswith("```"): lines = text.split("\n") - lines = lines[1:] # remove opening ```json + lines = lines[1:] if lines and lines[-1].strip() == "```": lines = lines[:-1] text = "\n".join(lines) @@ -429,6 +440,8 @@ def parseJsonResponse(responseText: str, fallback: Any = None) -> Any: logger.warning(f"Failed to parse AI JSON response: {text[:200]}") return fallback +_parseAiJsonSafe = parseJsonResponse + def _parseJsonField(value: Optional[str], fallback: Any = None) -> Any: if not value: @@ -439,3 +452,27 @@ def _parseJsonField(value: Optional[str], fallback: Any = None) -> Any: return json.loads(value) except (json.JSONDecodeError, TypeError): return fallback + + +def buildDocumentIntentPrompt(userInput: str, docCatalog: List[Dict[str, Any]]) -> str: + """Build a lightweight prompt for the pre-AI-call that identifies referenced documents.""" + catalogStr = "" + for doc in docCatalog: + catalogStr += f'\n- id: {doc.get("id", "")}, title: "{doc.get("title", "")}", summary: "{doc.get("summary", "")[:100]}"' + + return f"""Analysiere den User-Input und die Dokumentliste. +Welche Dokumente referenziert der User? Was soll damit passieren? + +User-Input: "{userInput}" + +Verfuegbare Dokumente:{catalogStr} + +Antworte NUR als JSON: +{{"read": ["doc-id-1"], "update": ["doc-id-2"], "create": ["Titel fuer neues Dokument"], "noDocumentAction": true/false}} + +- "read": IDs von Dokumenten deren Inhalt der User lesen/besprechen will +- "update": IDs von Dokumenten die geaendert/angepasst werden sollen +- "create": Titel fuer neue Dokumente die erstellt werden sollen +- "noDocumentAction": true wenn kein Dokument-Bezug erkannt wurde + +Wenn kein Dokument-Bezug: {{"read": [], "update": [], "create": [], "noDocumentAction": true}}"""