From 364e4317496b530427fe938fed234ae630edde7a Mon Sep 17 00:00:00 2001
From: patrick-motsch
Date: Sat, 7 Mar 2026 01:02:18 +0100
Subject: [PATCH] commcoach: backend abort handling + compression + document
pre-AI-call + status events
TASK 5: Backend abort handling (cancel previous processMessage tasks)
TASK 6: Compression thresholds lowered (25->15, 15->10)
TASK 7: Combine pending user messages into single prompt
TASK 8: Document handling with pre-AI-call intent detection
TASK 9: Granular status events during AI processing
Made-with: Cursor
---
.../commcoach/interfaceFeatureCommcoach.py | 4 +
.../commcoach/routeFeatureCommcoach.py | 15 +-
.../features/commcoach/serviceCommcoach.py | 157 +++++++++++++++---
.../features/commcoach/serviceCommcoachAi.py | 45 ++++-
4 files changed, 189 insertions(+), 32 deletions(-)
diff --git a/modules/features/commcoach/interfaceFeatureCommcoach.py b/modules/features/commcoach/interfaceFeatureCommcoach.py
index 9e21f677..e612c6ba 100644
--- a/modules/features/commcoach/interfaceFeatureCommcoach.py
+++ b/modules/features/commcoach/interfaceFeatureCommcoach.py
@@ -289,6 +289,10 @@ class CommcoachObjects:
data["createdAt"] = getIsoTimestamp()
return self.db.recordCreate(CoachingDocument, data)
+ def updateDocument(self, documentId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
+ from .datamodelCommcoach import CoachingDocument
+ return self.db.recordModify(CoachingDocument, documentId, updates)
+
def deleteDocument(self, documentId: str) -> bool:
from .datamodelCommcoach import CoachingDocument
return self.db.recordDelete(CoachingDocument, documentId)
diff --git a/modules/features/commcoach/routeFeatureCommcoach.py b/modules/features/commcoach/routeFeatureCommcoach.py
index 1efa6556..654d6c03 100644
--- a/modules/features/commcoach/routeFeatureCommcoach.py
+++ b/modules/features/commcoach/routeFeatureCommcoach.py
@@ -34,6 +34,8 @@ from .serviceCommcoach import CommcoachService, emitSessionEvent, getSessionEven
logger = logging.getLogger(__name__)
+_activeProcessTasks: dict = {}
+
def _audit(context: RequestContext, action: str, resourceType: str = None, resourceId: str = None, details: str = ""):
"""Log an audit event for CommCoach. Non-blocking, best-effort."""
@@ -500,10 +502,19 @@ async def sendMessageStream(
contextId = session.get("contextId")
service = CommcoachService(context.user, mandateId, instanceId)
- # Process in background
- asyncio.create_task(
+ existingTask = _activeProcessTasks.get(sessionId)
+ if existingTask and not existingTask.done():
+ existingTask.cancel()
+ logger.info(f"Cancelled previous processMessage task for session {sessionId}")
+
+ def _onTaskDone(task):
+ _activeProcessTasks.pop(sessionId, None)
+
+ task = asyncio.create_task(
service.processMessage(sessionId, contextId, body.content, interface)
)
+ task.add_done_callback(_onTaskDone)
+ _activeProcessTasks[sessionId] = task
# Stream events
async def _eventGenerator():
diff --git a/modules/features/commcoach/serviceCommcoach.py b/modules/features/commcoach/serviceCommcoach.py
index d6bd1243..24c23044 100644
--- a/modules/features/commcoach/serviceCommcoach.py
+++ b/modules/features/commcoach/serviceCommcoach.py
@@ -86,6 +86,32 @@ def cleanupSessionEvents(sessionId: str):
CHUNK_WORD_SIZE = 4
CHUNK_DELAY_SECONDS = 0.05
+DOC_INTENT_MAX_DOCS = 3
+DOC_CONTENT_MAX_CHARS = 3000
+
+
+def _buildCombinedUserPrompt(messages: List[Dict[str, Any]]) -> str:
+ """Collect all user messages after the last assistant message into one combined prompt."""
+ pending = []
+ for msg in reversed(messages):
+ if msg.get("role") == "assistant":
+ break
+ if msg.get("role") == "user":
+ pending.insert(0, msg.get("content", ""))
+ return " ".join(pending).strip()
+
+
+def _stripPendingUserMessages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+ """Return messages up to and including the last assistant message (remove trailing user-only tail)."""
+ lastAssistantIdx = -1
+ for i in range(len(messages) - 1, -1, -1):
+ if messages[i].get("role") == "assistant":
+ lastAssistantIdx = i
+ break
+ if lastAssistantIdx < 0:
+ return []
+ return messages[:lastAssistantIdx + 1]
+
def _parseAiJsonResponse(rawText: str) -> Dict[str, Any]:
"""Parse the structured JSON response from AI. Strips optional markdown code fences."""
@@ -131,12 +157,13 @@ async def _generateAndEmitTts(sessionId: str, speechText: str, currentUser, mand
logger.warning(f"TTS failed for session {sessionId}: {e}")
-async def _saveGeneratedDocument(doc: Dict[str, Any], contextId: str, userId: str,
- mandateId: str, instanceId: str, interface, sessionId: str,
- user=None):
- """Save a document generated by AI. Stores file in Management DB."""
+async def _saveOrUpdateDocument(doc: Dict[str, Any], contextId: str, userId: str,
+ mandateId: str, instanceId: str, interface, sessionId: str,
+ user=None):
+ """Save a new document or update an existing one. Stores file in Management DB."""
from .datamodelCommcoach import CoachingDocument
try:
+ docId = doc.get("id")
title = doc.get("title", "Dokument")
content = doc.get("content", "")
contentBytes = content.encode("utf-8")
@@ -152,24 +179,71 @@ async def _saveGeneratedDocument(doc: Dict[str, Any], contextId: str, userId: st
mgmtInterface.createFileData(fileItem.id, contentBytes)
fileRef = fileItem.id
except Exception as e:
- logger.warning(f"Failed to store generated document in file DB: {e}")
+ logger.warning(f"Failed to store document in file DB: {e}")
- docData = CoachingDocument(
- contextId=contextId,
- userId=userId,
- mandateId=mandateId,
- instanceId=instanceId,
- fileName=fileName,
- mimeType="text/markdown",
- fileSize=len(contentBytes),
- extractedText=content,
- summary=title,
- fileRef=fileRef,
- ).model_dump()
- created = interface.createDocument(docData)
- await emitSessionEvent(sessionId, "documentCreated", created)
+ if docId:
+ updates = {
+ "fileName": fileName,
+ "extractedText": content,
+ "summary": title,
+ "fileSize": len(contentBytes),
+ }
+ if fileRef:
+ updates["fileRef"] = fileRef
+ updated = interface.updateDocument(docId, updates)
+ if updated:
+ await emitSessionEvent(sessionId, "documentUpdated", updated)
+ logger.info(f"Document updated: {docId} ({title})")
+ else:
+ logger.warning(f"Document update failed, id not found: {docId}")
+ else:
+ docData = CoachingDocument(
+ contextId=contextId,
+ userId=userId,
+ mandateId=mandateId,
+ instanceId=instanceId,
+ fileName=fileName,
+ mimeType="text/markdown",
+ fileSize=len(contentBytes),
+ extractedText=content,
+ summary=title,
+ fileRef=fileRef,
+ ).model_dump()
+ created = interface.createDocument(docData)
+ await emitSessionEvent(sessionId, "documentCreated", created)
except Exception as e:
- logger.warning(f"Failed to save generated document: {e}")
+ logger.warning(f"Failed to save/update document: {e}")
+
+
+async def _resolveDocumentIntent(combinedUserPrompt: str, docs: List[Dict[str, Any]], callAiFn) -> Dict[str, Any]:
+ """Pre-AI-call: identify which documents the user references and what action is needed."""
+ if not docs:
+ return {"read": [], "update": [], "create": [], "noDocumentAction": True}
+ from . import serviceCommcoachAi as aiPrompts
+ docCatalog = [{"id": d.get("id", ""), "title": d.get("summary") or d.get("fileName", ""), "summary": (d.get("summary") or "")[:100]} for d in docs]
+ prompt = aiPrompts.buildDocumentIntentPrompt(combinedUserPrompt, docCatalog)
+ try:
+ response = await callAiFn("Du analysierst Dokumentreferenzen in Benutzeranfragen. Antworte NUR als JSON.", prompt)
+ if response and response.errorCount == 0 and response.content:
+ parsed = aiPrompts._parseAiJsonSafe(response.content.strip(), {"read": [], "update": [], "create": [], "noDocumentAction": True})
+ return parsed
+ except Exception as e:
+ logger.warning(f"Document intent detection failed: {e}")
+ return {"read": [], "update": [], "create": [], "noDocumentAction": True}
+
+
+def _loadDocumentContents(docIds: List[str], interface) -> List[Dict[str, Any]]:
+ """Load full extractedText for the given document IDs."""
+ results = []
+ for docId in docIds[:DOC_INTENT_MAX_DOCS]:
+ doc = interface.getDocument(docId)
+ if doc and doc.get("extractedText"):
+ results.append({
+ "id": doc.get("id", ""),
+ "title": doc.get("summary") or doc.get("fileName", ""),
+ "content": doc.get("extractedText", "")[:DOC_CONTENT_MAX_CHARS],
+ })
+ return results
async def _emitChunkedResponse(sessionId: str, createdMsg: Dict[str, Any], fullText: str):
@@ -298,18 +372,39 @@ class CommcoachService:
logger.warning(f"History compression failed for session {sessionId}: {e}")
previousMessages = messages[-20:]
+ # Combine all pending user messages (after last assistant message) as the user prompt
+ combinedUserPrompt = _buildCombinedUserPrompt(previousMessages)
+ if not combinedUserPrompt:
+ combinedUserPrompt = userContent
+
+ # Strip pending user messages from previousMessages to avoid redundancy in system prompt
+ contextMessages = _stripPendingUserMessages(previousMessages)
+
tasks = interface.getTasks(contextId, self.userId)
+ await emitSessionEvent(sessionId, "status", {"label": "Kontext wird geladen..."})
+
retrievalResult = await self._buildRetrievalContext(
- contextId, sessionId, userContent, context, interface
+ contextId, sessionId, combinedUserPrompt, context, interface
)
persona = _resolvePersona(session, interface)
documentSummaries = _getDocumentSummaries(contextId, self.userId, interface)
+ # Document intent detection (pre-AI-call)
+ referencedDocumentContents = None
+ allDocs = interface.getDocuments(contextId, self.userId) if documentSummaries else []
+ if allDocs:
+ await emitSessionEvent(sessionId, "status", {"label": "Dokumente werden geprueft..."})
+ docIntent = await _resolveDocumentIntent(combinedUserPrompt, allDocs, self._callAi)
+ if not docIntent.get("noDocumentAction"):
+ docIdsToLoad = list(set((docIntent.get("read") or []) + (docIntent.get("update") or [])))
+ if docIdsToLoad:
+ referencedDocumentContents = _loadDocumentContents(docIdsToLoad, interface)
+
systemPrompt = aiPrompts.buildCoachingSystemPrompt(
context,
- previousMessages,
+ contextMessages,
tasks,
previousSessionSummaries=retrievalResult.get("previousSessionSummaries"),
earlierSummary=earlierSummary,
@@ -318,16 +413,20 @@ class CommcoachService:
retrievedByTopic=retrievalResult.get("retrievedByTopic"),
persona=persona,
documentSummaries=documentSummaries,
+ referencedDocumentContents=referencedDocumentContents,
)
if retrievalResult.get("intent") == RetrievalIntent.SUMMARIZE_ALL:
systemPrompt += "\n\nWICHTIG: Der Benutzer möchte eine Gesamtzusammenfassung. Erstelle eine umfassende Zusammenfassung aller genannten Sessions und der aktuellen Session."
# Call AI
- await emitSessionEvent(sessionId, "status", {"label": "Coach denkt nach..."})
+ await emitSessionEvent(sessionId, "status", {"label": "Coach formuliert Antwort..."})
try:
- aiResponse = await self._callAi(systemPrompt, userContent)
+ aiResponse = await self._callAi(systemPrompt, combinedUserPrompt)
+ except asyncio.CancelledError:
+ logger.info(f"processMessage cancelled for session {sessionId} (new message arrived)")
+ return createdUserMsg
except Exception as e:
logger.error(f"AI call failed for session {sessionId}: {e}")
await emitSessionEvent(sessionId, "error", {"message": f"AI error: {str(e)}"})
@@ -344,8 +443,12 @@ class CommcoachService:
speechContent = parsed.get("speech", "")
documents = parsed.get("documents", [])
+ if asyncio.current_task() and asyncio.current_task().cancelled():
+ logger.info(f"processMessage cancelled before storing response for session {sessionId}")
+ return createdUserMsg
+
for doc in documents:
- await _saveGeneratedDocument(doc, contextId, self.userId, self.mandateId, self.instanceId, interface, sessionId, user=self.currentUser)
+ await _saveOrUpdateDocument(doc, contextId, self.userId, self.mandateId, self.instanceId, interface, sessionId, user=self.currentUser)
assistantMsg = CoachingMessage(
sessionId=sessionId,
@@ -360,6 +463,8 @@ class CommcoachService:
messages = interface.getMessages(sessionId)
interface.updateSession(sessionId, {"messageCount": len(messages)})
+ await emitSessionEvent(sessionId, "status", {"label": "Antwort wird verarbeitet..."})
+
ttsTask = asyncio.create_task(
_generateAndEmitTts(sessionId, speechContent, self.currentUser, self.mandateId, self.instanceId, interface)
)
@@ -436,7 +541,7 @@ class CommcoachService:
documents = parsed.get("documents", [])
for doc in documents:
- await _saveGeneratedDocument(doc, contextId, self.userId, self.mandateId, self.instanceId, interface, sessionId, user=self.currentUser)
+ await _saveOrUpdateDocument(doc, contextId, self.userId, self.mandateId, self.instanceId, interface, sessionId, user=self.currentUser)
assistantMsg = CoachingMessage(
sessionId=sessionId,
diff --git a/modules/features/commcoach/serviceCommcoachAi.py b/modules/features/commcoach/serviceCommcoachAi.py
index 7b67406f..11844c28 100644
--- a/modules/features/commcoach/serviceCommcoachAi.py
+++ b/modules/features/commcoach/serviceCommcoachAi.py
@@ -11,9 +11,9 @@ from typing import Optional, Dict, Any, List, Tuple
logger = logging.getLogger(__name__)
-# Compression thresholds (Teamsbot-style)
-COMPRESSION_MESSAGE_THRESHOLD = 25
-COMPRESSION_RECENT_COUNT = 15
+# Compression thresholds — lowered for voice fragment pattern (multiple user msgs per turn)
+COMPRESSION_MESSAGE_THRESHOLD = 15
+COMPRESSION_RECENT_COUNT = 10
COMPRESSION_MAX_MESSAGES_FETCH = 80
@@ -95,6 +95,7 @@ def buildCoachingSystemPrompt(
retrievedByTopic: Optional[List[Dict[str, Any]]] = None,
persona: Optional[Dict[str, Any]] = None,
documentSummaries: Optional[List[str]] = None,
+ referencedDocumentContents: Optional[List[Dict[str, Any]]] = None,
) -> str:
"""Build the system prompt for a coaching session, including context history, tasks, and session continuity."""
contextTitle = context.get("title", "General Coaching")
@@ -238,6 +239,16 @@ WICHTIG: Antworte NUR mit dem JSON-Objekt. Kein Text vor oder nach dem JSON."""
for docSummary in documentSummaries[:5]:
prompt += f"\n- {docSummary[:300]}"
+ if referencedDocumentContents:
+ prompt += "\n\nReferenzierte Dokumente (vollstaendiger Inhalt):"
+ for doc in referencedDocumentContents[:3]:
+ prompt += f"\n\n=== {doc.get('title', 'Dokument')} (id: {doc.get('id', '')}) ===\n{doc.get('content', '')[:3000]}"
+ prompt += """
+
+Du kannst bestehende Dokumente aendern oder neue erstellen.
+Fuer UPDATE eines bestehenden Dokuments: {"id": "", "title": "...", "content": "...neuer vollstaendiger Inhalt..."}
+Fuer ein NEUES Dokument: {"title": "...", "content": "...Inhalt..."}"""
+
if previousMessages:
prompt += "\n\nVorige Nachrichten dieser Session (Kontext):"
for msg in previousMessages[-12:]:
@@ -418,7 +429,7 @@ def parseJsonResponse(responseText: str, fallback: Any = None) -> Any:
text = responseText.strip()
if text.startswith("```"):
lines = text.split("\n")
- lines = lines[1:] # remove opening ```json
+ lines = lines[1:]
if lines and lines[-1].strip() == "```":
lines = lines[:-1]
text = "\n".join(lines)
@@ -429,6 +440,8 @@ def parseJsonResponse(responseText: str, fallback: Any = None) -> Any:
logger.warning(f"Failed to parse AI JSON response: {text[:200]}")
return fallback
+_parseAiJsonSafe = parseJsonResponse
+
def _parseJsonField(value: Optional[str], fallback: Any = None) -> Any:
if not value:
@@ -439,3 +452,27 @@ def _parseJsonField(value: Optional[str], fallback: Any = None) -> Any:
return json.loads(value)
except (json.JSONDecodeError, TypeError):
return fallback
+
+
+def buildDocumentIntentPrompt(userInput: str, docCatalog: List[Dict[str, Any]]) -> str:
+ """Build a lightweight prompt for the pre-AI-call that identifies referenced documents."""
+ catalogStr = ""
+ for doc in docCatalog:
+ catalogStr += f'\n- id: {doc.get("id", "")}, title: "{doc.get("title", "")}", summary: "{doc.get("summary", "")[:100]}"'
+
+ return f"""Analysiere den User-Input und die Dokumentliste.
+Welche Dokumente referenziert der User? Was soll damit passieren?
+
+User-Input: "{userInput}"
+
+Verfuegbare Dokumente:{catalogStr}
+
+Antworte NUR als JSON:
+{{"read": ["doc-id-1"], "update": ["doc-id-2"], "create": ["Titel fuer neues Dokument"], "noDocumentAction": true/false}}
+
+- "read": IDs von Dokumenten deren Inhalt der User lesen/besprechen will
+- "update": IDs von Dokumenten die geaendert/angepasst werden sollen
+- "create": Titel fuer neue Dokumente die erstellt werden sollen
+- "noDocumentAction": true wenn kein Dokument-Bezug erkannt wurde
+
+Wenn kein Dokument-Bezug: {{"read": [], "update": [], "create": [], "noDocumentAction": true}}"""