From f5fd1d24065b0b2c80633222502297e67957a4a8 Mon Sep 17 00:00:00 2001 From: patrick-motsch Date: Mon, 2 Mar 2026 00:51:27 +0100 Subject: [PATCH] com feature mvp --- modules/connectors/connectorVoiceGoogle.py | 51 +- modules/features/commcoach/CONCEPT.md | 178 ++++ modules/features/commcoach/__init__.py | 1 + .../features/commcoach/datamodelCommcoach.py | 247 +++++ .../commcoach/interfaceFeatureCommcoach.py | 315 +++++++ modules/features/commcoach/mainCommcoach.py | 281 ++++++ .../commcoach/routeFeatureCommcoach.py | 868 ++++++++++++++++++ .../features/commcoach/serviceCommcoach.py | 746 +++++++++++++++ .../features/commcoach/serviceCommcoachAi.py | 363 ++++++++ .../serviceCommcoachContextRetrieval.py | 223 +++++ .../commcoach/serviceCommcoachScheduler.py | 91 ++ modules/features/commcoach/tests/__init__.py | 0 .../commcoach/tests/test_contextRetrieval.py | 103 +++ .../commcoach/tests/test_datamodel.py | 168 ++++ .../commcoach/tests/test_mainCommcoach.py | 105 +++ .../commcoach/tests/test_serviceAi.py | 193 ++++ modules/routes/routeSystem.py | 3 + 17 files changed, 3914 insertions(+), 22 deletions(-) create mode 100644 modules/features/commcoach/CONCEPT.md create mode 100644 modules/features/commcoach/__init__.py create mode 100644 modules/features/commcoach/datamodelCommcoach.py create mode 100644 modules/features/commcoach/interfaceFeatureCommcoach.py create mode 100644 modules/features/commcoach/mainCommcoach.py create mode 100644 modules/features/commcoach/routeFeatureCommcoach.py create mode 100644 modules/features/commcoach/serviceCommcoach.py create mode 100644 modules/features/commcoach/serviceCommcoachAi.py create mode 100644 modules/features/commcoach/serviceCommcoachContextRetrieval.py create mode 100644 modules/features/commcoach/serviceCommcoachScheduler.py create mode 100644 modules/features/commcoach/tests/__init__.py create mode 100644 modules/features/commcoach/tests/test_contextRetrieval.py create mode 100644 modules/features/commcoach/tests/test_datamodel.py create mode 100644 modules/features/commcoach/tests/test_mainCommcoach.py create mode 100644 modules/features/commcoach/tests/test_serviceAi.py diff --git a/modules/connectors/connectorVoiceGoogle.py b/modules/connectors/connectorVoiceGoogle.py index 9fad87b9..31f5f728 100644 --- a/modules/connectors/connectorVoiceGoogle.py +++ b/modules/connectors/connectorVoiceGoogle.py @@ -108,15 +108,10 @@ class ConnectorGoogleSpeech: # Determine encoding based on detected format # Google Cloud Speech API has specific requirements for different formats if audioFormat == "webm_opus": - # For WEBM OPUS, we need to ensure proper format encoding = speech.RecognitionConfig.AudioEncoding.WEBM_OPUS - # WEBM_OPUS requires specific sample rate handling - must match header - if sampleRate != 48000: - logger.warning(f"WEBM_OPUS detected but sample rate is {sampleRate}, adjusting to 48000") + if sampleRate not in [8000, 12000, 16000, 24000, 48000]: sampleRate = 48000 - # For WEBM_OPUS, don't specify sample_rate_hertz in config - # Google Cloud will read it from the WEBM header - useSampleRate = False + useSampleRate = True elif audioFormat == "linear16": # For LINEAR16 format (PCM) encoding = speech.RecognitionConfig.AudioEncoding.LINEAR16 @@ -190,7 +185,8 @@ class ConnectorGoogleSpeech: except Exception as apiError: logger.error(f"Google Cloud API error: {apiError}") - # Try with different encoding as fallback + if skipFallbacks: + raise if encoding != speech.RecognitionConfig.AudioEncoding.LINEAR16: logger.info("Trying fallback with LINEAR16 encoding...") fallbackConfig = speech.RecognitionConfig( @@ -201,7 +197,6 @@ class ConnectorGoogleSpeech: enable_automatic_punctuation=True, model="latest_long" ) - try: response = await asyncio.to_thread( self.speech_client.recognize, config=fallbackConfig, audio=audio @@ -213,14 +208,20 @@ class ConnectorGoogleSpeech: else: raise apiError - # Process results + # Process results - use longest transcript (complete utterance, avoids partials) if response.results: - result = response.results[0] - if result.alternatives: - alternative = result.alternatives[0] - transcribed_text = alternative.transcript - confidence = alternative.confidence - + bestText = "" + bestConfidence = 0.0 + for result in response.results: + if result.alternatives: + for alt in result.alternatives: + t = alt.transcript.strip() + if len(t) > len(bestText): + bestText = t + bestConfidence = alt.confidence + if bestText: + transcribed_text = bestText + confidence = bestConfidence logger.info(f"Transcription successful: '{transcribed_text}' (confidence: {confidence:.2f})") return { @@ -355,12 +356,18 @@ class ConnectorGoogleSpeech: ) if fallback_response.results: - result = fallback_response.results[0] - if result.alternatives: - alternative = result.alternatives[0] - transcribed_text = alternative.transcript - confidence = alternative.confidence - + bestText = "" + bestConfidence = 0.0 + for result in fallback_response.results: + if result.alternatives: + for alt in result.alternatives: + t = alt.transcript.strip() + if len(t) > len(bestText): + bestText = t + bestConfidence = alt.confidence + if bestText: + transcribed_text = bestText + confidence = bestConfidence logger.info(f"Fallback transcription successful: '{transcribed_text}' (confidence: {confidence:.2f})") return { diff --git a/modules/features/commcoach/CONCEPT.md b/modules/features/commcoach/CONCEPT.md new file mode 100644 index 00000000..5444c5f3 --- /dev/null +++ b/modules/features/commcoach/CONCEPT.md @@ -0,0 +1,178 @@ +# CommCoach – Communication Coach for Leaders + +## Product Goal + +An AI coaching agent for executives that: +- Captures topics, concerns, and questions +- Asks active diagnostic follow-up questions +- Builds a continuable context per topic (Dossier) +- Conducts daily training conversations +- Makes progress visible (Gamification) +- Supports voice natively (STT/TTS, voice selection) + +## Architecture + +### Layers + +``` +Transport (REST/SSE) → routeFeatureCommcoach.py +Orchestration → serviceCommcoach.py +AI Pipeline → serviceCommcoachAi.py +Scheduler → serviceCommcoachScheduler.py +Domain / Storage → interfaceFeatureCommcoach.py +Data Models → datamodelCommcoach.py +Feature Registration → mainCommcoach.py +``` + +### Reuse from Existing Codebase + +| Component | Source | Usage | +|-----------|--------|-------| +| Feature Plug&Play | `registry.py` | Auto-discovery via `routeFeature*.py` | +| RequestContext + RBAC | `authentication.py`, `interfaceRbac.py` | Auth + ownership | +| DatabaseConnector | `connectorDbPostgre.py` | New DB `poweron_commcoach` | +| VoiceObjects (STT/TTS) | `interfaceVoiceObjects.py` | Voice pipeline | +| MessagingInterface | `interfaceMessaging.py` | Email summaries | +| SSE Pattern | chatbot `routeFeatureChatbot.py` | Chat streaming | +| PDF Renderer | `rendererPdf.py` | Dossier export (Iteration 2) | +| EventManagement | `eventManagement.py` | Scheduled reminders | + +## Domain Model + +### Entities + +``` +User (1) ──── owns ──── (N) CoachingContext + │ +CoachingContext (1) ────── (N) CoachingSession + │ +CoachingSession (1) ───── (N) CoachingMessage + │ +CoachingContext (1) ────── (N) CoachingTask +CoachingContext (1) ────── (N) CoachingScore +User (1) ──────────────── (1) CoachingUserProfile +``` + +### Status Models + +``` +CoachingContext: active → paused → active | archived → active | completed +CoachingSession: active → completed | cancelled +CoachingTask: open → in_progress → done | skipped +``` + +## API Design + +``` +PREFIX: /api/commcoach/{instanceId} + +# Contexts (Dossier) +GET /contexts +POST /contexts +GET /contexts/{contextId} +PUT /contexts/{contextId} +DELETE /contexts/{contextId} +POST /contexts/{contextId}/archive +POST /contexts/{contextId}/activate + +# Sessions +GET /contexts/{contextId}/sessions +POST /contexts/{contextId}/sessions/start +GET /sessions/{sessionId} +POST /sessions/{sessionId}/complete +POST /sessions/{sessionId}/cancel + +# Streaming Chat +POST /sessions/{sessionId}/message/stream +POST /sessions/{sessionId}/audio/stream +GET /sessions/{sessionId}/stream + +# Tasks +GET /contexts/{contextId}/tasks +POST /contexts/{contextId}/tasks +PUT /tasks/{taskId} +PUT /tasks/{taskId}/status +DELETE /tasks/{taskId} + +# Dashboard +GET /dashboard + +# User Profile +GET /profile +PUT /profile + +# Voice +GET /voice/languages +GET /voice/voices +POST /voice/tts +``` + +### SSE Event Types + +- `message` – Complete message +- `messageChunk` – Streaming token +- `sessionState` – Status update +- `taskCreated` – New task from coach +- `insightGenerated` – New insight +- `scoreUpdate` – Score change +- `status` – UI status label +- `complete` – Stream ended +- `error` – Error +- `ping` – Keepalive + +## RBAC Model + +### Ownership Rules (Critical) +- **Strict MY-only**: User sees only own contexts/sessions/messages/tasks/scores +- **SysAdmin**: Only technical monitoring, NO content access +- **No admin override** on userId filter + +### Template Roles +- `commcoach-user`: DATA=MY on all entities, UI=ALL, RESOURCE=ALL +- `commcoach-admin`: DATA=MY (intentionally not ALL), UI=ALL, RESOURCE=ALL + +### Audit Events +- `commcoach.context.created/archived` +- `commcoach.session.started/completed` +- `commcoach.export.requested` + +## Iterations + +### Iteration 1 (MVP) +- Context management (create, switch, archive) +- Chat + SSE streaming +- STT/TTS with language/voice selection +- Coaching session with active diagnostic questions +- Auto session protocol +- Tasks/Checklist per context +- Session summary via email +- RBAC + strict ownership +- Basic dashboard: continuity, competence score, goal progress +- Long-session compression: ab 25 Nachrichten wird der aeltere Verlauf per AI zusammengefasst, letzte 15 Nachrichten bleiben vollstaendig (Teamsbot-Pattern) +- Context Memory (Phasen 1-7): previousSessionSummaries im Chat, keyTopics bei completeSession, Intent-Erkennung (summarize_all, recall_session, recall_topic), Datums-Lookup, Topic-Suche, Rolling Overview, RAG-Platzhalter + +### Iteration 2 +- Roleplay personas (critical CFO, difficult employee, etc.) +- Document upload + context binding +- Exports (Markdown/PDF) +- Extended gamification (streaks, levels, badges) +- Better scoring/insights + +## Database + +- Database name: `poweron_commcoach` +- Tables auto-created from Pydantic models via `DatabaseConnector` + +## Frontend + +### Views +- `CommcoachDashboardView` – KPIs, streaks, quick start +- `CommcoachCoachingView` – Chat UI with voice + context tabs +- `CommcoachDossierView` – Dossier: timeline, tasks, scores +- `CommcoachSettingsView` – Voice, reminder, profile settings + +### UX +- Multiple active contexts as quick-switch tabs/chips +- "Daily Coach" entry point prominent +- Voice first, always with text fallback +- Dossier view: timeline, learnings, tasks, next exercise diff --git a/modules/features/commcoach/__init__.py b/modules/features/commcoach/__init__.py new file mode 100644 index 00000000..ea99083a --- /dev/null +++ b/modules/features/commcoach/__init__.py @@ -0,0 +1 @@ +# CommCoach Feature Container diff --git a/modules/features/commcoach/datamodelCommcoach.py b/modules/features/commcoach/datamodelCommcoach.py new file mode 100644 index 00000000..0ba636ff --- /dev/null +++ b/modules/features/commcoach/datamodelCommcoach.py @@ -0,0 +1,247 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +CommCoach Feature - Data Models. +Pydantic models for coaching contexts, sessions, messages, tasks, scores, and user profiles. +""" +from typing import Optional, List, Dict, Any +from pydantic import BaseModel, Field +from enum import Enum +import uuid + + +# ============================================================================ +# Enums +# ============================================================================ + +class CoachingContextStatus(str, Enum): + ACTIVE = "active" + PAUSED = "paused" + ARCHIVED = "archived" + COMPLETED = "completed" + + +class CoachingContextCategory(str, Enum): + LEADERSHIP = "leadership" + CONFLICT = "conflict" + NEGOTIATION = "negotiation" + PRESENTATION = "presentation" + FEEDBACK = "feedback" + DELEGATION = "delegation" + CHANGE_MANAGEMENT = "changeManagement" + CUSTOM = "custom" + + +class CoachingSessionStatus(str, Enum): + ACTIVE = "active" + COMPLETED = "completed" + CANCELLED = "cancelled" + + +class CoachingMessageRole(str, Enum): + USER = "user" + ASSISTANT = "assistant" + SYSTEM = "system" + + +class CoachingMessageContentType(str, Enum): + TEXT = "text" + AUDIO_TRANSCRIPT = "audioTranscript" + SYSTEM_NOTE = "systemNote" + + +class CoachingTaskStatus(str, Enum): + OPEN = "open" + IN_PROGRESS = "inProgress" + DONE = "done" + SKIPPED = "skipped" + + +class CoachingTaskPriority(str, Enum): + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + + +class CoachingScoreTrend(str, Enum): + IMPROVING = "improving" + STABLE = "stable" + DECLINING = "declining" + + +# ============================================================================ +# Database Models +# ============================================================================ + +class CoachingContext(BaseModel): + """A coaching context/dossier representing a topic the user is working on.""" + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + userId: str = Field(description="Owner user ID (strict ownership)") + mandateId: str = Field(description="Mandate ID") + instanceId: str = Field(description="Feature instance ID") + title: str = Field(description="Context title, e.g. 'Conflict with team lead'") + description: Optional[str] = Field(default=None, description="Short description") + category: CoachingContextCategory = Field(default=CoachingContextCategory.CUSTOM) + status: CoachingContextStatus = Field(default=CoachingContextStatus.ACTIVE) + goals: Optional[str] = Field(default=None, description="JSON array of goals [{id, text, status, createdAt}]") + insights: Optional[str] = Field(default=None, description="JSON array of AI insights [{id, text, sessionId, createdAt}]") + metadata: Optional[str] = Field(default=None, description="JSON object with flexible metadata") + sessionCount: int = Field(default=0) + taskCount: int = Field(default=0) + lastSessionAt: Optional[str] = Field(default=None) + rollingOverview: Optional[str] = Field(default=None, description="AI summary of older sessions for long context history") + rollingOverviewUpToSessionCount: Optional[int] = Field(default=None, description="Session count covered by rollingOverview") + createdAt: Optional[str] = Field(default=None) + updatedAt: Optional[str] = Field(default=None) + + +class CoachingSession(BaseModel): + """A single coaching conversation session within a context.""" + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + contextId: str = Field(description="FK to CoachingContext") + userId: str = Field(description="Owner user ID") + mandateId: str = Field(description="Mandate ID") + instanceId: str = Field(description="Feature instance ID") + status: CoachingSessionStatus = Field(default=CoachingSessionStatus.ACTIVE) + summary: Optional[str] = Field(default=None, description="AI-generated session summary") + coachNotes: Optional[str] = Field(default=None, description="JSON: AI internal notes for continuity") + compressedHistorySummary: Optional[str] = Field(default=None, description="AI summary of older messages for long sessions") + compressedHistoryUpToMessageCount: Optional[int] = Field(default=None, description="Message count covered by compressedHistorySummary") + keyTopics: Optional[str] = Field(default=None, description="JSON array of key topics extracted at session complete") + durationSeconds: int = Field(default=0) + messageCount: int = Field(default=0) + competenceScore: Optional[float] = Field(default=None, ge=0.0, le=100.0) + emailSent: bool = Field(default=False) + startedAt: Optional[str] = Field(default=None) + endedAt: Optional[str] = Field(default=None) + createdAt: Optional[str] = Field(default=None) + updatedAt: Optional[str] = Field(default=None) + + +class CoachingMessage(BaseModel): + """A single message in a coaching session.""" + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + sessionId: str = Field(description="FK to CoachingSession") + contextId: str = Field(description="FK to CoachingContext") + userId: str = Field(description="Owner user ID") + role: CoachingMessageRole = Field(description="Message author role") + content: str = Field(description="Message content (Markdown)") + contentType: CoachingMessageContentType = Field(default=CoachingMessageContentType.TEXT) + audioRef: Optional[str] = Field(default=None, description="Reference to audio file") + metadata: Optional[str] = Field(default=None, description="JSON: token count, voice info, etc.") + createdAt: Optional[str] = Field(default=None) + + +class CoachingTask(BaseModel): + """A task/checklist item assigned within a coaching context.""" + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + contextId: str = Field(description="FK to CoachingContext") + sessionId: Optional[str] = Field(default=None, description="FK to originating session") + userId: str = Field(description="Owner user ID") + mandateId: str = Field(description="Mandate ID") + title: str = Field(description="Task title") + description: Optional[str] = Field(default=None) + status: CoachingTaskStatus = Field(default=CoachingTaskStatus.OPEN) + priority: CoachingTaskPriority = Field(default=CoachingTaskPriority.MEDIUM) + dueDate: Optional[str] = Field(default=None) + completedAt: Optional[str] = Field(default=None) + createdAt: Optional[str] = Field(default=None) + updatedAt: Optional[str] = Field(default=None) + + +class CoachingScore(BaseModel): + """A competence score for a dimension, recorded after a session.""" + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + contextId: str = Field(description="FK to CoachingContext") + sessionId: str = Field(description="FK to CoachingSession") + userId: str = Field(description="Owner user ID") + mandateId: str = Field(description="Mandate ID") + dimension: str = Field(description="e.g. empathy, clarity, assertiveness, listening") + score: float = Field(ge=0.0, le=100.0) + trend: CoachingScoreTrend = Field(default=CoachingScoreTrend.STABLE) + evidence: Optional[str] = Field(default=None, description="AI reasoning for the score") + createdAt: Optional[str] = Field(default=None) + + +class CoachingUserProfile(BaseModel): + """Per-user coaching profile and preferences.""" + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + userId: str = Field(description="Owner user ID") + mandateId: str = Field(description="Mandate ID") + instanceId: str = Field(description="Feature instance ID") + preferredLanguage: str = Field(default="de-DE") + preferredVoice: Optional[str] = Field(default=None, description="Google TTS voice name") + dailyReminderTime: Optional[str] = Field(default=None, description="HH:MM format") + dailyReminderEnabled: bool = Field(default=False) + emailSummaryEnabled: bool = Field(default=True) + streakDays: int = Field(default=0) + longestStreak: int = Field(default=0) + totalSessions: int = Field(default=0) + totalMinutes: int = Field(default=0) + lastSessionAt: Optional[str] = Field(default=None) + createdAt: Optional[str] = Field(default=None) + updatedAt: Optional[str] = Field(default=None) + + +# ============================================================================ +# API Request/Response Models +# ============================================================================ + +class CreateContextRequest(BaseModel): + title: str = Field(description="Context title") + description: Optional[str] = None + category: Optional[CoachingContextCategory] = CoachingContextCategory.CUSTOM + goals: Optional[List[str]] = None + + +class UpdateContextRequest(BaseModel): + title: Optional[str] = None + description: Optional[str] = None + category: Optional[CoachingContextCategory] = None + goals: Optional[str] = None + + +class SendMessageRequest(BaseModel): + content: str = Field(description="User message text") + contentType: Optional[CoachingMessageContentType] = CoachingMessageContentType.TEXT + + +class CreateTaskRequest(BaseModel): + title: str + description: Optional[str] = None + priority: Optional[CoachingTaskPriority] = CoachingTaskPriority.MEDIUM + dueDate: Optional[str] = None + + +class UpdateTaskRequest(BaseModel): + title: Optional[str] = None + description: Optional[str] = None + priority: Optional[CoachingTaskPriority] = None + dueDate: Optional[str] = None + + +class UpdateTaskStatusRequest(BaseModel): + status: CoachingTaskStatus + + +class UpdateProfileRequest(BaseModel): + preferredLanguage: Optional[str] = None + preferredVoice: Optional[str] = None + dailyReminderTime: Optional[str] = None + dailyReminderEnabled: Optional[bool] = None + emailSummaryEnabled: Optional[bool] = None + + +class DashboardData(BaseModel): + """Aggregated dashboard data for the user.""" + totalContexts: int = 0 + activeContexts: int = 0 + totalSessions: int = 0 + totalMinutes: int = 0 + streakDays: int = 0 + longestStreak: int = 0 + averageScore: Optional[float] = None + recentScores: List[Dict[str, Any]] = Field(default_factory=list) + openTasks: int = 0 + completedTasks: int = 0 + contexts: List[Dict[str, Any]] = Field(default_factory=list) diff --git a/modules/features/commcoach/interfaceFeatureCommcoach.py b/modules/features/commcoach/interfaceFeatureCommcoach.py new file mode 100644 index 00000000..830aa261 --- /dev/null +++ b/modules/features/commcoach/interfaceFeatureCommcoach.py @@ -0,0 +1,315 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Interface to CommCoach database. +Uses the PostgreSQL connector for data access with strict user ownership. +""" + +import logging +from typing import Dict, Any, List, Optional + +from modules.datamodels.datamodelUam import User +from modules.connectors.connectorDbPostgre import DatabaseConnector +from modules.shared.timeUtils import getIsoTimestamp +from modules.shared.configuration import APP_CONFIG + +from .datamodelCommcoach import ( + CoachingContext, CoachingContextStatus, + CoachingSession, CoachingSessionStatus, + CoachingMessage, + CoachingTask, CoachingTaskStatus, + CoachingScore, + CoachingUserProfile, +) + +logger = logging.getLogger(__name__) + +_interfaces = {} + + +def getInterface(currentUser: User, mandateId: str = None, featureInstanceId: str = None): + """Factory: get or create a CommcoachObjects interface instance.""" + key = f"{currentUser.id}_{mandateId}_{featureInstanceId}" + if key not in _interfaces: + _interfaces[key] = CommcoachObjects(currentUser, mandateId, featureInstanceId) + else: + _interfaces[key].currentUser = currentUser + _interfaces[key].mandateId = mandateId + _interfaces[key].featureInstanceId = featureInstanceId + return _interfaces[key] + + +class CommcoachObjects: + """Database interface for CommCoach feature. All reads enforce strict userId ownership.""" + + def __init__(self, currentUser: User, mandateId: str = None, featureInstanceId: str = None): + self.currentUser = currentUser + self.mandateId = mandateId + self.featureInstanceId = featureInstanceId + self.userId = str(currentUser.id) if currentUser else "system" + + dbHost = APP_CONFIG.get("DB_HOST", "_no_config_default_data") + dbDatabase = "poweron_commcoach" + dbUser = APP_CONFIG.get("DB_USER") + dbPassword = APP_CONFIG.get("DB_PASSWORD_SECRET") + dbPort = int(APP_CONFIG.get("DB_PORT", 5432)) + + self.db = DatabaseConnector( + dbHost=dbHost, + dbDatabase=dbDatabase, + dbUser=dbUser, + dbPassword=dbPassword, + dbPort=dbPort, + userId=self.userId, + ) + + # ========================================================================= + # Contexts + # ========================================================================= + + def getContexts(self, instanceId: str, userId: str, includeArchived: bool = False) -> List[Dict[str, Any]]: + """Get all coaching contexts for a user. Strict ownership.""" + records = self.db.getRecordset( + CoachingContext, + recordFilter={"instanceId": instanceId, "userId": userId}, + ) + if not includeArchived: + records = [r for r in records if r.get("status") != CoachingContextStatus.ARCHIVED.value] + records.sort(key=lambda r: r.get("updatedAt") or r.get("createdAt") or "", reverse=True) + return records + + def getContext(self, contextId: str) -> Optional[Dict[str, Any]]: + records = self.db.getRecordset(CoachingContext, recordFilter={"id": contextId}) + return records[0] if records else None + + def createContext(self, data: Dict[str, Any]) -> Dict[str, Any]: + data["createdAt"] = getIsoTimestamp() + data["updatedAt"] = getIsoTimestamp() + return self.db.recordCreate(CoachingContext, data) + + def updateContext(self, contextId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]: + updates["updatedAt"] = getIsoTimestamp() + return self.db.recordModify(CoachingContext, contextId, updates) + + def deleteContext(self, contextId: str) -> bool: + self._deleteSessionsByContext(contextId) + self._deleteTasksByContext(contextId) + self._deleteScoresByContext(contextId) + return self.db.recordDelete(CoachingContext, contextId) + + # ========================================================================= + # Sessions + # ========================================================================= + + def getSessions(self, contextId: str, userId: str) -> List[Dict[str, Any]]: + records = self.db.getRecordset( + CoachingSession, + recordFilter={"contextId": contextId, "userId": userId}, + ) + records.sort(key=lambda r: r.get("startedAt") or r.get("createdAt") or "", reverse=True) + return records + + def getSession(self, sessionId: str) -> Optional[Dict[str, Any]]: + records = self.db.getRecordset(CoachingSession, recordFilter={"id": sessionId}) + return records[0] if records else None + + def getActiveSession(self, contextId: str, userId: str) -> Optional[Dict[str, Any]]: + records = self.db.getRecordset( + CoachingSession, + recordFilter={"contextId": contextId, "userId": userId, "status": CoachingSessionStatus.ACTIVE.value}, + ) + return records[0] if records else None + + def createSession(self, data: Dict[str, Any]) -> Dict[str, Any]: + data["createdAt"] = getIsoTimestamp() + data["updatedAt"] = getIsoTimestamp() + data["startedAt"] = getIsoTimestamp() + return self.db.recordCreate(CoachingSession, data) + + def updateSession(self, sessionId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]: + updates["updatedAt"] = getIsoTimestamp() + return self.db.recordModify(CoachingSession, sessionId, updates) + + def _deleteSessionsByContext(self, contextId: str) -> int: + records = self.db.getRecordset(CoachingSession, recordFilter={"contextId": contextId}) + count = 0 + for record in records: + self._deleteMessagesBySession(record.get("id")) + self.db.recordDelete(CoachingSession, record.get("id")) + count += 1 + return count + + # ========================================================================= + # Messages + # ========================================================================= + + def getMessages(self, sessionId: str) -> List[Dict[str, Any]]: + records = self.db.getRecordset(CoachingMessage, recordFilter={"sessionId": sessionId}) + records.sort(key=lambda r: r.get("createdAt") or "") + return records + + def getRecentMessages(self, sessionId: str, count: int = 20) -> List[Dict[str, Any]]: + records = self.getMessages(sessionId) + return records[-count:] + + def createMessage(self, data: Dict[str, Any]) -> Dict[str, Any]: + data["createdAt"] = getIsoTimestamp() + return self.db.recordCreate(CoachingMessage, data) + + def _deleteMessagesBySession(self, sessionId: str) -> int: + records = self.db.getRecordset(CoachingMessage, recordFilter={"sessionId": sessionId}) + count = 0 + for record in records: + self.db.recordDelete(CoachingMessage, record.get("id")) + count += 1 + return count + + # ========================================================================= + # Tasks + # ========================================================================= + + def getTasks(self, contextId: str, userId: str) -> List[Dict[str, Any]]: + records = self.db.getRecordset( + CoachingTask, + recordFilter={"contextId": contextId, "userId": userId}, + ) + records.sort(key=lambda r: r.get("createdAt") or "", reverse=True) + return records + + def getTask(self, taskId: str) -> Optional[Dict[str, Any]]: + records = self.db.getRecordset(CoachingTask, recordFilter={"id": taskId}) + return records[0] if records else None + + def createTask(self, data: Dict[str, Any]) -> Dict[str, Any]: + data["createdAt"] = getIsoTimestamp() + data["updatedAt"] = getIsoTimestamp() + return self.db.recordCreate(CoachingTask, data) + + def updateTask(self, taskId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]: + updates["updatedAt"] = getIsoTimestamp() + return self.db.recordModify(CoachingTask, taskId, updates) + + def deleteTask(self, taskId: str) -> bool: + return self.db.recordDelete(CoachingTask, taskId) + + def _deleteTasksByContext(self, contextId: str) -> int: + records = self.db.getRecordset(CoachingTask, recordFilter={"contextId": contextId}) + count = 0 + for record in records: + self.db.recordDelete(CoachingTask, record.get("id")) + count += 1 + return count + + def getOpenTaskCount(self, userId: str, instanceId: str) -> int: + records = self.db.getRecordset(CoachingTask, recordFilter={"userId": userId}) + return len([r for r in records if r.get("status") in (CoachingTaskStatus.OPEN.value, CoachingTaskStatus.IN_PROGRESS.value)]) + + def getCompletedTaskCount(self, userId: str, instanceId: str) -> int: + records = self.db.getRecordset(CoachingTask, recordFilter={"userId": userId}) + return len([r for r in records if r.get("status") == CoachingTaskStatus.DONE.value]) + + # ========================================================================= + # Scores + # ========================================================================= + + def getScores(self, contextId: str, userId: str) -> List[Dict[str, Any]]: + records = self.db.getRecordset( + CoachingScore, + recordFilter={"contextId": contextId, "userId": userId}, + ) + records.sort(key=lambda r: r.get("createdAt") or "") + return records + + def getRecentScores(self, userId: str, limit: int = 20) -> List[Dict[str, Any]]: + records = self.db.getRecordset(CoachingScore, recordFilter={"userId": userId}) + records.sort(key=lambda r: r.get("createdAt") or "", reverse=True) + return records[:limit] + + def createScore(self, data: Dict[str, Any]) -> Dict[str, Any]: + data["createdAt"] = getIsoTimestamp() + return self.db.recordCreate(CoachingScore, data) + + def _deleteScoresByContext(self, contextId: str) -> int: + records = self.db.getRecordset(CoachingScore, recordFilter={"contextId": contextId}) + count = 0 + for record in records: + self.db.recordDelete(CoachingScore, record.get("id")) + count += 1 + return count + + # ========================================================================= + # User Profile + # ========================================================================= + + def getProfile(self, userId: str, instanceId: str) -> Optional[Dict[str, Any]]: + records = self.db.getRecordset( + CoachingUserProfile, + recordFilter={"userId": userId, "instanceId": instanceId}, + ) + return records[0] if records else None + + def getOrCreateProfile(self, userId: str, mandateId: str, instanceId: str) -> Dict[str, Any]: + existing = self.getProfile(userId, instanceId) + if existing: + return existing + data = CoachingUserProfile( + userId=userId, + mandateId=mandateId, + instanceId=instanceId, + ).model_dump() + data["createdAt"] = getIsoTimestamp() + data["updatedAt"] = getIsoTimestamp() + return self.db.recordCreate(CoachingUserProfile, data) + + def updateProfile(self, profileId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]: + updates["updatedAt"] = getIsoTimestamp() + return self.db.recordModify(CoachingUserProfile, profileId, updates) + + # ========================================================================= + # Dashboard Aggregation + # ========================================================================= + + def getDashboardData(self, userId: str, instanceId: str) -> Dict[str, Any]: + contexts = self.db.getRecordset(CoachingContext, recordFilter={"userId": userId, "instanceId": instanceId}) + sessions = self.db.getRecordset(CoachingSession, recordFilter={"userId": userId, "instanceId": instanceId}) + profile = self.getProfile(userId, instanceId) + + activeContexts = [c for c in contexts if c.get("status") == CoachingContextStatus.ACTIVE.value] + completedSessions = [s for s in sessions if s.get("status") == CoachingSessionStatus.COMPLETED.value] + + totalMinutes = sum(s.get("durationSeconds", 0) for s in completedSessions) // 60 + scores = [] + for s in completedSessions: + raw = s.get("competenceScore") + if raw is not None: + try: + scores.append(float(raw)) + except (ValueError, TypeError): + pass + avgScore = sum(scores) / len(scores) if scores else None + + recentScores = self.getRecentScores(userId, limit=10) + + contextSummaries = [] + for ctx in activeContexts: + contextSummaries.append({ + "id": ctx.get("id"), + "title": ctx.get("title"), + "category": ctx.get("category"), + "sessionCount": ctx.get("sessionCount", 0), + "lastSessionAt": ctx.get("lastSessionAt"), + }) + + return { + "totalContexts": len(contexts), + "activeContexts": len(activeContexts), + "totalSessions": len(completedSessions), + "totalMinutes": totalMinutes, + "streakDays": profile.get("streakDays", 0) if profile else 0, + "longestStreak": profile.get("longestStreak", 0) if profile else 0, + "averageScore": round(avgScore, 1) if avgScore else None, + "recentScores": recentScores, + "openTasks": self.getOpenTaskCount(userId, instanceId), + "completedTasks": self.getCompletedTaskCount(userId, instanceId), + "contexts": contextSummaries, + } diff --git a/modules/features/commcoach/mainCommcoach.py b/modules/features/commcoach/mainCommcoach.py new file mode 100644 index 00000000..c5a0a7c1 --- /dev/null +++ b/modules/features/commcoach/mainCommcoach.py @@ -0,0 +1,281 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +CommCoach Feature Container - Main Module. +Handles feature initialization and RBAC catalog registration. +""" + +import logging +from typing import Dict, List, Any + +logger = logging.getLogger(__name__) + +FEATURE_CODE = "commcoach" +FEATURE_LABEL = {"en": "Communication Coach", "de": "Kommunikations-Coach", "fr": "Coach Communication"} +FEATURE_ICON = "mdi-account-voice" + +UI_OBJECTS = [ + { + "objectKey": "ui.feature.commcoach.dashboard", + "label": {"en": "Dashboard", "de": "Dashboard", "fr": "Tableau de bord"}, + "meta": {"area": "dashboard"} + }, + { + "objectKey": "ui.feature.commcoach.coaching", + "label": {"en": "Coaching", "de": "Coaching", "fr": "Coaching"}, + "meta": {"area": "coaching"} + }, + { + "objectKey": "ui.feature.commcoach.dossier", + "label": {"en": "Dossier", "de": "Dossier", "fr": "Dossier"}, + "meta": {"area": "dossier"} + }, + { + "objectKey": "ui.feature.commcoach.settings", + "label": {"en": "Settings", "de": "Einstellungen", "fr": "Parametres"}, + "meta": {"area": "settings"} + }, +] + +DATA_OBJECTS = [ + { + "objectKey": "data.feature.commcoach.CoachingContext", + "label": {"en": "Coaching Context", "de": "Coaching-Kontext", "fr": "Contexte coaching"}, + "meta": {"table": "CoachingContext", "fields": ["id", "title", "category", "status"]} + }, + { + "objectKey": "data.feature.commcoach.CoachingSession", + "label": {"en": "Coaching Session", "de": "Coaching-Session", "fr": "Session coaching"}, + "meta": {"table": "CoachingSession", "fields": ["id", "contextId", "status", "summary"]} + }, + { + "objectKey": "data.feature.commcoach.CoachingMessage", + "label": {"en": "Coaching Message", "de": "Coaching-Nachricht", "fr": "Message coaching"}, + "meta": {"table": "CoachingMessage", "fields": ["id", "sessionId", "role", "content"]} + }, + { + "objectKey": "data.feature.commcoach.CoachingTask", + "label": {"en": "Coaching Task", "de": "Coaching-Aufgabe", "fr": "Tache coaching"}, + "meta": {"table": "CoachingTask", "fields": ["id", "contextId", "title", "status"]} + }, + { + "objectKey": "data.feature.commcoach.CoachingScore", + "label": {"en": "Coaching Score", "de": "Coaching-Score", "fr": "Score coaching"}, + "meta": {"table": "CoachingScore", "fields": ["id", "dimension", "score", "trend"]} + }, + { + "objectKey": "data.feature.commcoach.CoachingUserProfile", + "label": {"en": "User Profile", "de": "Benutzerprofil", "fr": "Profil utilisateur"}, + "meta": {"table": "CoachingUserProfile", "fields": ["id", "userId", "preferredLanguage"]} + }, + { + "objectKey": "data.feature.commcoach.*", + "label": {"en": "All CommCoach Data", "de": "Alle CommCoach-Daten", "fr": "Toutes les donnees CommCoach"}, + "meta": {"wildcard": True} + }, +] + +RESOURCE_OBJECTS = [ + { + "objectKey": "resource.feature.commcoach.context.create", + "label": {"en": "Create Context", "de": "Kontext erstellen", "fr": "Creer contexte"}, + "meta": {"endpoint": "/api/commcoach/{instanceId}/contexts", "method": "POST"} + }, + { + "objectKey": "resource.feature.commcoach.context.archive", + "label": {"en": "Archive Context", "de": "Kontext archivieren", "fr": "Archiver contexte"}, + "meta": {"endpoint": "/api/commcoach/{instanceId}/contexts/{contextId}/archive", "method": "POST"} + }, + { + "objectKey": "resource.feature.commcoach.session.start", + "label": {"en": "Start Session", "de": "Session starten", "fr": "Demarrer session"}, + "meta": {"endpoint": "/api/commcoach/{instanceId}/contexts/{contextId}/sessions/start", "method": "POST"} + }, + { + "objectKey": "resource.feature.commcoach.session.complete", + "label": {"en": "Complete Session", "de": "Session abschliessen", "fr": "Terminer session"}, + "meta": {"endpoint": "/api/commcoach/{instanceId}/sessions/{sessionId}/complete", "method": "POST"} + }, + { + "objectKey": "resource.feature.commcoach.task.manage", + "label": {"en": "Manage Tasks", "de": "Aufgaben verwalten", "fr": "Gerer taches"}, + "meta": {"endpoint": "/api/commcoach/{instanceId}/contexts/{contextId}/tasks", "method": "POST"} + }, +] + +TEMPLATE_ROLES = [ + { + "roleLabel": "commcoach-user", + "description": { + "en": "Communication Coach User - Can manage own coaching contexts and sessions", + "de": "Kommunikations-Coach Benutzer - Kann eigene Coaching-Kontexte und Sessions verwalten", + "fr": "Utilisateur Coach Communication - Peut gerer ses propres contextes et sessions" + }, + "accessRules": [ + {"context": "UI", "item": "ui.feature.commcoach.dashboard", "view": True}, + {"context": "UI", "item": "ui.feature.commcoach.coaching", "view": True}, + {"context": "UI", "item": "ui.feature.commcoach.dossier", "view": True}, + {"context": "UI", "item": "ui.feature.commcoach.settings", "view": True}, + {"context": "DATA", "item": "data.feature.commcoach.CoachingContext", "view": True, "read": "m", "create": "m", "update": "m", "delete": "m"}, + {"context": "DATA", "item": "data.feature.commcoach.CoachingSession", "view": True, "read": "m", "create": "m", "update": "m", "delete": "n"}, + {"context": "DATA", "item": "data.feature.commcoach.CoachingMessage", "view": True, "read": "m", "create": "m", "update": "n", "delete": "n"}, + {"context": "DATA", "item": "data.feature.commcoach.CoachingTask", "view": True, "read": "m", "create": "m", "update": "m", "delete": "m"}, + {"context": "DATA", "item": "data.feature.commcoach.CoachingScore", "view": True, "read": "m", "create": "n", "update": "n", "delete": "n"}, + {"context": "DATA", "item": "data.feature.commcoach.CoachingUserProfile", "view": True, "read": "m", "create": "m", "update": "m", "delete": "n"}, + {"context": "RESOURCE", "item": "resource.feature.commcoach.context.create", "view": True}, + {"context": "RESOURCE", "item": "resource.feature.commcoach.context.archive", "view": True}, + {"context": "RESOURCE", "item": "resource.feature.commcoach.session.start", "view": True}, + {"context": "RESOURCE", "item": "resource.feature.commcoach.session.complete", "view": True}, + {"context": "RESOURCE", "item": "resource.feature.commcoach.task.manage", "view": True}, + ] + }, +] + + +def getFeatureDefinition() -> Dict[str, Any]: + return { + "code": FEATURE_CODE, + "label": FEATURE_LABEL, + "icon": FEATURE_ICON, + "autoCreateInstance": True, + } + + +def getUiObjects() -> List[Dict[str, Any]]: + return UI_OBJECTS + + +def getResourceObjects() -> List[Dict[str, Any]]: + return RESOURCE_OBJECTS + + +def getTemplateRoles() -> List[Dict[str, Any]]: + return TEMPLATE_ROLES + + +def getDataObjects() -> List[Dict[str, Any]]: + return DATA_OBJECTS + + +def registerFeature(catalogService) -> bool: + try: + for uiObj in UI_OBJECTS: + catalogService.registerUiObject( + featureCode=FEATURE_CODE, + objectKey=uiObj["objectKey"], + label=uiObj["label"], + meta=uiObj.get("meta") + ) + + for resObj in RESOURCE_OBJECTS: + catalogService.registerResourceObject( + featureCode=FEATURE_CODE, + objectKey=resObj["objectKey"], + label=resObj["label"], + meta=resObj.get("meta") + ) + + for dataObj in DATA_OBJECTS: + catalogService.registerDataObject( + featureCode=FEATURE_CODE, + objectKey=dataObj["objectKey"], + label=dataObj["label"], + meta=dataObj.get("meta") + ) + + _syncTemplateRolesToDb() + + logger.info(f"Feature '{FEATURE_CODE}' registered {len(UI_OBJECTS)} UI, {len(RESOURCE_OBJECTS)} resource, {len(DATA_OBJECTS)} data objects") + return True + + except Exception as e: + logger.error(f"Failed to register feature '{FEATURE_CODE}': {e}") + return False + + +def _syncTemplateRolesToDb() -> int: + try: + from modules.interfaces.interfaceDbApp import getRootInterface + from modules.datamodels.datamodelRbac import Role, AccessRule, AccessRuleContext + + rootInterface = getRootInterface() + existingRoles = rootInterface.getRolesByFeatureCode(FEATURE_CODE) + templateRoles = [r for r in existingRoles if r.mandateId is None] + existingRoleLabels = {r.roleLabel: str(r.id) for r in templateRoles} + + createdCount = 0 + for roleTemplate in TEMPLATE_ROLES: + roleLabel = roleTemplate["roleLabel"] + + if roleLabel in existingRoleLabels: + roleId = existingRoleLabels[roleLabel] + _ensureAccessRulesForRole(rootInterface, roleId, roleTemplate.get("accessRules", [])) + else: + newRole = Role( + roleLabel=roleLabel, + description=roleTemplate.get("description", {}), + featureCode=FEATURE_CODE, + mandateId=None, + featureInstanceId=None, + isSystemRole=False + ) + createdRole = rootInterface.db.recordCreate(Role, newRole.model_dump()) + roleId = createdRole.get("id") + _ensureAccessRulesForRole(rootInterface, roleId, roleTemplate.get("accessRules", [])) + logger.info(f"Created template role '{roleLabel}' with ID {roleId}") + createdCount += 1 + + if createdCount > 0: + logger.info(f"Feature '{FEATURE_CODE}': Created {createdCount} template roles") + + return createdCount + + except Exception as e: + logger.error(f"Error syncing template roles for feature '{FEATURE_CODE}': {e}") + return 0 + + +def _ensureAccessRulesForRole(rootInterface, roleId: str, ruleTemplates: List[Dict[str, Any]]) -> int: + from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext + + existingRules = rootInterface.getAccessRulesByRole(roleId) + existingSignatures = set() + for rule in existingRules: + sig = (rule.context.value if rule.context else None, rule.item) + existingSignatures.add(sig) + + createdCount = 0 + for template in ruleTemplates: + context = template.get("context", "UI") + item = template.get("item") + sig = (context, item) + + if sig in existingSignatures: + continue + + if context == "UI": + contextEnum = AccessRuleContext.UI + elif context == "DATA": + contextEnum = AccessRuleContext.DATA + elif context == "RESOURCE": + contextEnum = AccessRuleContext.RESOURCE + else: + contextEnum = context + + newRule = AccessRule( + roleId=roleId, + context=contextEnum, + item=item, + view=template.get("view", False), + read=template.get("read"), + create=template.get("create"), + update=template.get("update"), + delete=template.get("delete"), + ) + rootInterface.db.recordCreate(AccessRule, newRule.model_dump()) + createdCount += 1 + + if createdCount > 0: + logger.debug(f"Created {createdCount} AccessRules for role {roleId}") + + return createdCount diff --git a/modules/features/commcoach/routeFeatureCommcoach.py b/modules/features/commcoach/routeFeatureCommcoach.py new file mode 100644 index 00000000..66a4b347 --- /dev/null +++ b/modules/features/commcoach/routeFeatureCommcoach.py @@ -0,0 +1,868 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +CommCoach routes for the backend API. +Implements coaching context management, session streaming, tasks, dashboard, and voice endpoints. +""" + +import logging +import json +import asyncio +import base64 +from typing import Optional +from fastapi import APIRouter, HTTPException, Depends, Request +from fastapi.responses import StreamingResponse + +from modules.auth import limiter, getRequestContext, RequestContext +from modules.shared.timeUtils import getIsoTimestamp +from modules.interfaces.interfaceDbApp import getRootInterface +from modules.interfaces.interfaceFeatures import getFeatureInterface + +from . import interfaceFeatureCommcoach as interfaceDb +from .datamodelCommcoach import ( + CoachingContext, CoachingContextStatus, CoachingSession, CoachingSessionStatus, + CoachingMessage, CoachingMessageRole, CoachingMessageContentType, + CoachingTask, CoachingTaskStatus, + CreateContextRequest, UpdateContextRequest, + SendMessageRequest, CreateTaskRequest, UpdateTaskRequest, UpdateTaskStatusRequest, + UpdateProfileRequest, +) +from .serviceCommcoach import CommcoachService, emitSessionEvent, getSessionEventQueue, cleanupSessionEvents + +logger = logging.getLogger(__name__) + +router = APIRouter( + prefix="/api/commcoach", + tags=["CommCoach"], + responses={404: {"description": "Not found"}} +) + + +# ========================================================================= +# Helpers +# ========================================================================= + +def _getInterface(context: RequestContext, instanceId: Optional[str] = None): + mandateId = str(context.mandateId) if context.mandateId else None + return interfaceDb.getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId) + + +def _validateInstanceAccess(instanceId: str, context: RequestContext) -> str: + rootInterface = getRootInterface() + featureInterface = getFeatureInterface(rootInterface.db) + instance = featureInterface.getFeatureInstance(instanceId) + if not instance: + raise HTTPException(status_code=404, detail=f"Feature instance '{instanceId}' not found") + mandateId = instance.get("mandateId") if isinstance(instance, dict) else getattr(instance, "mandateId", None) + if not mandateId: + raise HTTPException(status_code=500, detail="Feature instance has no mandateId") + return str(mandateId) + + +def _validateOwnership(record: dict, context: RequestContext, fieldName: str = "userId") -> None: + """Strict ownership check. SysAdmin does NOT bypass for content access.""" + if record.get(fieldName) != str(context.user.id): + raise HTTPException(status_code=404, detail="Not found") + + +# ========================================================================= +# Context Endpoints +# ========================================================================= + +@router.get("/{instanceId}/contexts") +@limiter.limit("60/minute") +async def listContexts( + request: Request, + instanceId: str, + includeArchived: bool = False, + context: RequestContext = Depends(getRequestContext), +): + """List all coaching contexts for the current user.""" + mandateId = _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + userId = str(context.user.id) + contexts = interface.getContexts(instanceId, userId, includeArchived=includeArchived) + return {"contexts": contexts} + + +@router.post("/{instanceId}/contexts") +@limiter.limit("20/minute") +async def createContext( + request: Request, + instanceId: str, + body: CreateContextRequest, + context: RequestContext = Depends(getRequestContext), +): + """Create a new coaching context/dossier.""" + mandateId = _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + userId = str(context.user.id) + + goalsJson = None + if body.goals: + import uuid as _uuid + goalsList = [{"id": str(_uuid.uuid4()), "text": g, "status": "open", "createdAt": ""} for g in body.goals] + goalsJson = json.dumps(goalsList) + + contextData = CoachingContext( + userId=userId, + mandateId=mandateId, + instanceId=instanceId, + title=body.title, + description=body.description, + category=body.category, + goals=goalsJson, + ).model_dump() + + created = interface.createContext(contextData) + logger.info(f"CommCoach context created: {created.get('id')} for user {userId}") + return {"context": created} + + +@router.get("/{instanceId}/contexts/{contextId}") +@limiter.limit("60/minute") +async def getContext( + request: Request, + instanceId: str, + contextId: str, + context: RequestContext = Depends(getRequestContext), +): + """Get a coaching context with tasks and score summary.""" + _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + userId = str(context.user.id) + + ctx = interface.getContext(contextId) + if not ctx: + raise HTTPException(status_code=404, detail="Context not found") + _validateOwnership(ctx, context) + + tasks = interface.getTasks(contextId, userId) + scores = interface.getScores(contextId, userId) + sessions = interface.getSessions(contextId, userId) + + return { + "context": ctx, + "tasks": tasks, + "scores": scores, + "sessions": sessions, + } + + +@router.put("/{instanceId}/contexts/{contextId}") +@limiter.limit("30/minute") +async def updateContext( + request: Request, + instanceId: str, + contextId: str, + body: UpdateContextRequest, + context: RequestContext = Depends(getRequestContext), +): + _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + + ctx = interface.getContext(contextId) + if not ctx: + raise HTTPException(status_code=404, detail="Context not found") + _validateOwnership(ctx, context) + + updates = body.model_dump(exclude_none=True) + updated = interface.updateContext(contextId, updates) + return {"context": updated} + + +@router.delete("/{instanceId}/contexts/{contextId}") +@limiter.limit("10/minute") +async def deleteContext( + request: Request, + instanceId: str, + contextId: str, + context: RequestContext = Depends(getRequestContext), +): + _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + + ctx = interface.getContext(contextId) + if not ctx: + raise HTTPException(status_code=404, detail="Context not found") + _validateOwnership(ctx, context) + + interface.deleteContext(contextId) + return {"deleted": True} + + +@router.post("/{instanceId}/contexts/{contextId}/archive") +@limiter.limit("10/minute") +async def archiveContext( + request: Request, + instanceId: str, + contextId: str, + context: RequestContext = Depends(getRequestContext), +): + _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + + ctx = interface.getContext(contextId) + if not ctx: + raise HTTPException(status_code=404, detail="Context not found") + _validateOwnership(ctx, context) + + updated = interface.updateContext(contextId, {"status": CoachingContextStatus.ARCHIVED.value}) + return {"context": updated} + + +@router.post("/{instanceId}/contexts/{contextId}/activate") +@limiter.limit("10/minute") +async def activateContext( + request: Request, + instanceId: str, + contextId: str, + context: RequestContext = Depends(getRequestContext), +): + _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + + ctx = interface.getContext(contextId) + if not ctx: + raise HTTPException(status_code=404, detail="Context not found") + _validateOwnership(ctx, context) + + updated = interface.updateContext(contextId, {"status": CoachingContextStatus.ACTIVE.value}) + return {"context": updated} + + +# ========================================================================= +# Session Endpoints +# ========================================================================= + +@router.get("/{instanceId}/contexts/{contextId}/sessions") +@limiter.limit("60/minute") +async def listSessions( + request: Request, + instanceId: str, + contextId: str, + context: RequestContext = Depends(getRequestContext), +): + _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + userId = str(context.user.id) + + ctx = interface.getContext(contextId) + if not ctx: + raise HTTPException(status_code=404, detail="Context not found") + _validateOwnership(ctx, context) + + sessions = interface.getSessions(contextId, userId) + return {"sessions": sessions} + + +@router.post("/{instanceId}/contexts/{contextId}/sessions/start") +@limiter.limit("10/minute") +async def startSession( + request: Request, + instanceId: str, + contextId: str, + context: RequestContext = Depends(getRequestContext), +): + """Start a new coaching session or resume active one. Returns SSE stream with sessionState, messages, and complete.""" + mandateId = _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + userId = str(context.user.id) + + ctx = interface.getContext(contextId) + if not ctx: + raise HTTPException(status_code=404, detail="Context not found") + _validateOwnership(ctx, context) + + activeSession = interface.getActiveSession(contextId, userId) + if activeSession: + sessionId = activeSession.get("id") + messages = interface.getMessages(sessionId) + + async def _resumedEventGenerator(): + service = CommcoachService(context.user, mandateId, instanceId) + greetingText = await service.generateResumeGreeting(sessionId, contextId, messages, interface) + assistantMsg = CoachingMessage( + sessionId=sessionId, + contextId=contextId, + userId=userId, + role=CoachingMessageRole.ASSISTANT, + content=greetingText, + contentType=CoachingMessageContentType.TEXT, + ).model_dump() + createdGreeting = interface.createMessage(assistantMsg) + interface.updateSession(sessionId, {"messageCount": len(messages) + 1}) + greetingForFrontend = { + "id": createdGreeting.get("id"), + "sessionId": sessionId, + "contextId": contextId, + "role": "assistant", + "content": greetingText, + "contentType": "text", + "createdAt": createdGreeting.get("createdAt"), + } + messagesWithGreeting = messages + [greetingForFrontend] + sessionWithCount = {**activeSession, "messageCount": len(messagesWithGreeting)} + yield f"data: {json.dumps({'type': 'sessionState', 'data': {'session': sessionWithCount, 'resumed': True, 'messages': messagesWithGreeting}})}\n\n" + if greetingText: + try: + from modules.interfaces.interfaceVoiceObjects import getVoiceInterface + voiceInterface = getVoiceInterface(context.user, mandateId) + profile = interface.getProfile(userId, instanceId) + language = profile.get("preferredLanguage", "de-DE") if profile else "de-DE" + voiceName = profile.get("preferredVoice") if profile else None + from .serviceCommcoach import _stripMarkdownForTts + ttsResult = await voiceInterface.textToSpeech( + text=_stripMarkdownForTts(greetingText), + languageCode=language, + voiceName=voiceName, + ) + if ttsResult and isinstance(ttsResult, dict): + audioBytes = ttsResult.get("audioContent") + if audioBytes: + audioB64 = base64.b64encode( + audioBytes if isinstance(audioBytes, bytes) else audioBytes.encode() + ).decode() + yield f"data: {json.dumps({'type': 'ttsAudio', 'data': {'audio': audioB64, 'format': 'mp3'}})}\n\n" + except Exception as e: + logger.warning(f"TTS failed for resumed session: {e}") + yield f"data: {json.dumps({'type': 'complete', 'data': {}, 'timestamp': getIsoTimestamp()})}\n\n" + + return StreamingResponse( + _resumedEventGenerator(), + media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"}, + ) + + sessionData = CoachingSession( + contextId=contextId, + userId=userId, + mandateId=mandateId, + instanceId=instanceId, + ).model_dump() + created = interface.createSession(sessionData) + sessionId = created.get("id") + + eventQueue = getSessionEventQueue(sessionId) + await emitSessionEvent(sessionId, "sessionState", {"session": created, "resumed": False}) + + service = CommcoachService(context.user, mandateId, instanceId) + asyncio.create_task(service.processSessionOpening(sessionId, contextId, interface)) + + async def _newSessionEventGenerator(): + from modules.shared.timeUtils import getIsoTimestamp + timeoutCount = 0 + try: + while True: + try: + event = await asyncio.wait_for(eventQueue.get(), timeout=30.0) + yield f"data: {json.dumps(event)}\n\n" + timeoutCount = 0 + if event.get("type") in ("complete", "error"): + break + except asyncio.TimeoutError: + yield f"data: {json.dumps({'type': 'ping', 'timestamp': getIsoTimestamp()})}\n\n" + timeoutCount += 1 + if timeoutCount > 10: + break + except asyncio.CancelledError: + pass + + logger.info(f"CommCoach session started (streaming): {sessionId} for context {contextId}") + return StreamingResponse( + _newSessionEventGenerator(), + media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"}, + ) + + + +@router.get("/{instanceId}/sessions/{sessionId}") +@limiter.limit("60/minute") +async def getSession( + request: Request, + instanceId: str, + sessionId: str, + context: RequestContext = Depends(getRequestContext), +): + _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + + session = interface.getSession(sessionId) + if not session: + raise HTTPException(status_code=404, detail="Session not found") + _validateOwnership(session, context) + + messages = interface.getMessages(sessionId) + return {"session": session, "messages": messages} + + +@router.post("/{instanceId}/sessions/{sessionId}/complete") +@limiter.limit("10/minute") +async def completeSession( + request: Request, + instanceId: str, + sessionId: str, + context: RequestContext = Depends(getRequestContext), +): + """Complete a coaching session. Triggers summary, scoring, task extraction, email.""" + mandateId = _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + + session = interface.getSession(sessionId) + if not session: + raise HTTPException(status_code=404, detail="Session not found") + _validateOwnership(session, context) + + if session.get("status") != CoachingSessionStatus.ACTIVE.value: + raise HTTPException(status_code=400, detail=f"Session is already {session.get('status')}") + + service = CommcoachService(context.user, mandateId, instanceId) + result = await service.completeSession(sessionId, interface) + return {"session": result} + + +@router.post("/{instanceId}/sessions/{sessionId}/cancel") +@limiter.limit("10/minute") +async def cancelSession( + request: Request, + instanceId: str, + sessionId: str, + context: RequestContext = Depends(getRequestContext), +): + _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + + session = interface.getSession(sessionId) + if not session: + raise HTTPException(status_code=404, detail="Session not found") + _validateOwnership(session, context) + + from modules.shared.timeUtils import getIsoTimestamp + interface.updateSession(sessionId, { + "status": CoachingSessionStatus.CANCELLED.value, + "endedAt": getIsoTimestamp(), + }) + return {"cancelled": True} + + +# ========================================================================= +# Chat Streaming Endpoints +# ========================================================================= + +@router.post("/{instanceId}/sessions/{sessionId}/message/stream") +@limiter.limit("30/minute") +async def sendMessageStream( + request: Request, + instanceId: str, + sessionId: str, + body: SendMessageRequest, + context: RequestContext = Depends(getRequestContext), +): + """Send a text message and stream the coaching response via SSE.""" + mandateId = _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + + session = interface.getSession(sessionId) + if not session: + raise HTTPException(status_code=404, detail="Session not found") + _validateOwnership(session, context) + + if session.get("status") != CoachingSessionStatus.ACTIVE.value: + raise HTTPException(status_code=400, detail="Session is not active") + + contextId = session.get("contextId") + service = CommcoachService(context.user, mandateId, instanceId) + + # Process in background + asyncio.create_task( + service.processMessage(sessionId, contextId, body.content, interface) + ) + + # Stream events + async def _eventGenerator(): + eventQueue = getSessionEventQueue(sessionId) + try: + timeout_count = 0 + while True: + try: + event = await asyncio.wait_for(eventQueue.get(), timeout=30.0) + yield f"data: {json.dumps(event)}\n\n" + timeout_count = 0 + + eventType = event.get("type") + if eventType in ("complete", "error"): + break + except asyncio.TimeoutError: + yield f"data: {json.dumps({'type': 'ping'})}\n\n" + timeout_count += 1 + if timeout_count > 10: + break + except asyncio.CancelledError: + pass + + return StreamingResponse( + _eventGenerator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + } + ) + + +@router.post("/{instanceId}/sessions/{sessionId}/audio/stream") +@limiter.limit("20/minute") +async def sendAudioStream( + request: Request, + instanceId: str, + sessionId: str, + context: RequestContext = Depends(getRequestContext), +): + """Send audio, get STT -> coaching response -> TTS via SSE.""" + mandateId = _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + + session = interface.getSession(sessionId) + if not session: + raise HTTPException(status_code=404, detail="Session not found") + _validateOwnership(session, context) + + if session.get("status") != CoachingSessionStatus.ACTIVE.value: + raise HTTPException(status_code=400, detail="Session is not active") + + audioBody = await request.body() + if not audioBody: + raise HTTPException(status_code=400, detail="No audio data received") + + profile = interface.getProfile(str(context.user.id), instanceId) + language = profile.get("preferredLanguage", "de-DE") if profile else "de-DE" + + contextId = session.get("contextId") + service = CommcoachService(context.user, mandateId, instanceId) + + asyncio.create_task( + service.processAudioMessage(sessionId, contextId, audioBody, language, interface) + ) + + async def _eventGenerator(): + eventQueue = getSessionEventQueue(sessionId) + try: + timeout_count = 0 + while True: + try: + event = await asyncio.wait_for(eventQueue.get(), timeout=30.0) + yield f"data: {json.dumps(event)}\n\n" + timeout_count = 0 + + eventType = event.get("type") + if eventType in ("complete", "error"): + break + if eventType == "message" and event.get("data", {}).get("role") == "assistant": + break + except asyncio.TimeoutError: + yield f"data: {json.dumps({'type': 'ping'})}\n\n" + timeout_count += 1 + if timeout_count > 10: + break + except asyncio.CancelledError: + pass + + return StreamingResponse( + _eventGenerator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + } + ) + + +@router.get("/{instanceId}/sessions/{sessionId}/stream") +@limiter.limit("60/minute") +async def streamSession( + request: Request, + instanceId: str, + sessionId: str, + context: RequestContext = Depends(getRequestContext), +): + """Reconnect to an active session's SSE stream.""" + _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + + session = interface.getSession(sessionId) + if not session: + raise HTTPException(status_code=404, detail="Session not found") + _validateOwnership(session, context) + + async def _eventGenerator(): + yield f"data: {json.dumps({'type': 'sessionState', 'data': session})}\n\n" + + messages = interface.getMessages(sessionId) + for msg in messages: + yield f"data: {json.dumps({'type': 'message', 'data': msg})}\n\n" + + eventQueue = getSessionEventQueue(sessionId) + try: + while True: + try: + event = await asyncio.wait_for(eventQueue.get(), timeout=30.0) + yield f"data: {json.dumps(event)}\n\n" + if event.get("type") in ("complete", "error"): + break + except asyncio.TimeoutError: + yield f"data: {json.dumps({'type': 'ping'})}\n\n" + except asyncio.CancelledError: + pass + + return StreamingResponse( + _eventGenerator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + } + ) + + +# ========================================================================= +# Task Endpoints +# ========================================================================= + +@router.get("/{instanceId}/contexts/{contextId}/tasks") +@limiter.limit("60/minute") +async def listTasks( + request: Request, + instanceId: str, + contextId: str, + context: RequestContext = Depends(getRequestContext), +): + _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + userId = str(context.user.id) + tasks = interface.getTasks(contextId, userId) + return {"tasks": tasks} + + +@router.post("/{instanceId}/contexts/{contextId}/tasks") +@limiter.limit("30/minute") +async def createTask( + request: Request, + instanceId: str, + contextId: str, + body: CreateTaskRequest, + context: RequestContext = Depends(getRequestContext), +): + mandateId = _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + userId = str(context.user.id) + + ctx = interface.getContext(contextId) + if not ctx: + raise HTTPException(status_code=404, detail="Context not found") + _validateOwnership(ctx, context) + + taskData = CoachingTask( + contextId=contextId, + userId=userId, + mandateId=mandateId, + title=body.title, + description=body.description, + priority=body.priority, + dueDate=body.dueDate, + ).model_dump() + + created = interface.createTask(taskData) + return {"task": created} + + +@router.put("/{instanceId}/tasks/{taskId}") +@limiter.limit("30/minute") +async def updateTask( + request: Request, + instanceId: str, + taskId: str, + body: UpdateTaskRequest, + context: RequestContext = Depends(getRequestContext), +): + _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + + task = interface.getTask(taskId) + if not task: + raise HTTPException(status_code=404, detail="Task not found") + _validateOwnership(task, context) + + updates = body.model_dump(exclude_none=True) + updated = interface.updateTask(taskId, updates) + return {"task": updated} + + +@router.put("/{instanceId}/tasks/{taskId}/status") +@limiter.limit("30/minute") +async def updateTaskStatus( + request: Request, + instanceId: str, + taskId: str, + body: UpdateTaskStatusRequest, + context: RequestContext = Depends(getRequestContext), +): + _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + + task = interface.getTask(taskId) + if not task: + raise HTTPException(status_code=404, detail="Task not found") + _validateOwnership(task, context) + + updates = {"status": body.status.value} + if body.status == CoachingTaskStatus.DONE: + from modules.shared.timeUtils import getIsoTimestamp + updates["completedAt"] = getIsoTimestamp() + + updated = interface.updateTask(taskId, updates) + return {"task": updated} + + +@router.delete("/{instanceId}/tasks/{taskId}") +@limiter.limit("10/minute") +async def deleteTask( + request: Request, + instanceId: str, + taskId: str, + context: RequestContext = Depends(getRequestContext), +): + _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + + task = interface.getTask(taskId) + if not task: + raise HTTPException(status_code=404, detail="Task not found") + _validateOwnership(task, context) + + interface.deleteTask(taskId) + return {"deleted": True} + + +# ========================================================================= +# Dashboard +# ========================================================================= + +@router.get("/{instanceId}/dashboard") +@limiter.limit("60/minute") +async def getDashboard( + request: Request, + instanceId: str, + context: RequestContext = Depends(getRequestContext), +): + _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + userId = str(context.user.id) + data = interface.getDashboardData(userId, instanceId) + return {"dashboard": data} + + +# ========================================================================= +# User Profile +# ========================================================================= + +@router.get("/{instanceId}/profile") +@limiter.limit("60/minute") +async def getProfile( + request: Request, + instanceId: str, + context: RequestContext = Depends(getRequestContext), +): + mandateId = _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + userId = str(context.user.id) + profile = interface.getOrCreateProfile(userId, mandateId, instanceId) + return {"profile": profile} + + +@router.put("/{instanceId}/profile") +@limiter.limit("10/minute") +async def updateProfile( + request: Request, + instanceId: str, + body: UpdateProfileRequest, + context: RequestContext = Depends(getRequestContext), +): + mandateId = _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + userId = str(context.user.id) + + profile = interface.getOrCreateProfile(userId, mandateId, instanceId) + updates = body.model_dump(exclude_none=True) + updated = interface.updateProfile(profile.get("id"), updates) + return {"profile": updated} + + +# ========================================================================= +# Voice Endpoints +# ========================================================================= + +@router.get("/{instanceId}/voice/languages") +@limiter.limit("30/minute") +async def getVoiceLanguages( + request: Request, + instanceId: str, + context: RequestContext = Depends(getRequestContext), +): + mandateId = _validateInstanceAccess(instanceId, context) + from modules.interfaces.interfaceVoiceObjects import getVoiceInterface + voiceInterface = getVoiceInterface(context.user, mandateId) + languagesResult = await voiceInterface.getAvailableLanguages() + languageList = languagesResult.get("languages", []) if isinstance(languagesResult, dict) else languagesResult + return {"languages": languageList} + + +@router.get("/{instanceId}/voice/voices") +@limiter.limit("30/minute") +async def getVoiceVoices( + request: Request, + instanceId: str, + language: str = "de-DE", + context: RequestContext = Depends(getRequestContext), +): + mandateId = _validateInstanceAccess(instanceId, context) + from modules.interfaces.interfaceVoiceObjects import getVoiceInterface + voiceInterface = getVoiceInterface(context.user, mandateId) + voicesResult = await voiceInterface.getAvailableVoices(language) + voiceList = voicesResult.get("voices", []) if isinstance(voicesResult, dict) else voicesResult + return {"voices": voiceList} + + +@router.post("/{instanceId}/voice/tts") +@limiter.limit("10/minute") +async def testVoice( + request: Request, + instanceId: str, + context: RequestContext = Depends(getRequestContext), +): + """TTS preview / voice test.""" + mandateId = _validateInstanceAccess(instanceId, context) + body = await request.json() + text = body.get("text", "Hallo, ich bin dein Coaching-Assistent.") + language = body.get("language", "de-DE") + voiceId = body.get("voiceId") + + from modules.interfaces.interfaceVoiceObjects import getVoiceInterface + voiceInterface = getVoiceInterface(context.user, mandateId) + + try: + result = await voiceInterface.textToSpeech(text=text, languageCode=language, voiceName=voiceId) + if result and isinstance(result, dict): + audioContent = result.get("audioContent") + if audioContent: + audioB64 = base64.b64encode( + audioContent if isinstance(audioContent, bytes) else audioContent.encode() + ).decode() + return {"success": True, "audio": audioB64, "format": "mp3", "text": text} + return {"success": False, "error": "TTS returned no audio"} + except Exception as e: + logger.error(f"Voice test failed: {e}") + raise HTTPException(status_code=500, detail=f"TTS test failed: {str(e)}") diff --git a/modules/features/commcoach/serviceCommcoach.py b/modules/features/commcoach/serviceCommcoach.py new file mode 100644 index 00000000..0778a978 --- /dev/null +++ b/modules/features/commcoach/serviceCommcoach.py @@ -0,0 +1,746 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +CommCoach Service - Coaching Orchestration. +Manages the coaching pipeline: message processing, AI calls, scoring, task extraction. +""" + +import re +import logging +import json +import asyncio +from typing import Optional, Dict, Any, List + +from modules.datamodels.datamodelUam import User +from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum +from modules.shared.timeUtils import getIsoTimestamp + +from .datamodelCommcoach import ( + CoachingMessage, CoachingMessageRole, CoachingMessageContentType, + CoachingSessionStatus, CoachingTask, CoachingTaskPriority, + CoachingScore, CoachingScoreTrend, +) +from . import serviceCommcoachAi as aiPrompts +from .serviceCommcoachAi import ( + COMPRESSION_MESSAGE_THRESHOLD, + COMPRESSION_RECENT_COUNT, + COMPRESSION_MAX_MESSAGES_FETCH, + buildResumeGreetingPrompt, +) +from .serviceCommcoachContextRetrieval import ( + detectIntent, + RetrievalIntent, + buildSessionSummariesForPrompt, + findSessionByDate, + searchSessionsByTopic, + _parseDateFromMessage, + PREVIOUS_SESSION_SUMMARIES_COUNT, + ROLLING_OVERVIEW_SESSION_THRESHOLD, + ROLLING_OVERVIEW_EVERY_N_SESSIONS, +) + +logger = logging.getLogger(__name__) + + +def _stripMarkdownForTts(text: str) -> str: + """Strip markdown formatting so TTS reads clean speech text.""" + t = text + t = re.sub(r'\*\*(.+?)\*\*', r'\1', t) + t = re.sub(r'\*(.+?)\*', r'\1', t) + t = re.sub(r'__(.+?)__', r'\1', t) + t = re.sub(r'_(.+?)_', r'\1', t) + t = re.sub(r'`[^`]+`', lambda m: m.group(0)[1:-1], t) + t = re.sub(r'^#{1,6}\s*', '', t, flags=re.MULTILINE) + t = re.sub(r'^\s*[-*+]\s+', '', t, flags=re.MULTILINE) + t = re.sub(r'^\s*\d+\.\s+', '', t, flags=re.MULTILINE) + t = re.sub(r'\[(.+?)\]\(.+?\)', r'\1', t) + t = re.sub(r'\n{3,}', '\n\n', t) + return t.strip() + + +# Session event queues for SSE streaming +_sessionEvents: Dict[str, asyncio.Queue] = {} + + +async def emitSessionEvent(sessionId: str, eventType: str, data: Any): + """Emit an event to the session's SSE stream.""" + if sessionId not in _sessionEvents: + _sessionEvents[sessionId] = asyncio.Queue() + await _sessionEvents[sessionId].put({ + "type": eventType, + "data": data, + "timestamp": getIsoTimestamp(), + }) + + +def getSessionEventQueue(sessionId: str) -> asyncio.Queue: + if sessionId not in _sessionEvents: + _sessionEvents[sessionId] = asyncio.Queue() + return _sessionEvents[sessionId] + + +def cleanupSessionEvents(sessionId: str): + _sessionEvents.pop(sessionId, None) + + +class CommcoachService: + """Coaching orchestrator: processes messages, calls AI, extracts tasks and scores.""" + + def __init__(self, currentUser: User, mandateId: str, instanceId: str): + self.currentUser = currentUser + self.mandateId = mandateId + self.instanceId = instanceId + self.userId = str(currentUser.id) + + async def processMessage(self, sessionId: str, contextId: str, userContent: str, interface) -> Dict[str, Any]: + """ + Process a user message through the coaching pipeline: + 1. Store user message + 2. Build context with history + 3. Call AI for coaching response + 4. Store assistant message + 5. Emit SSE events + """ + from . import interfaceFeatureCommcoach as interfaceDb + + # Store user message + userMsg = CoachingMessage( + sessionId=sessionId, + contextId=contextId, + userId=self.userId, + role=CoachingMessageRole.USER, + content=userContent, + contentType=CoachingMessageContentType.TEXT, + ).model_dump() + createdUserMsg = interface.createMessage(userMsg) + + await emitSessionEvent(sessionId, "message", { + "id": createdUserMsg.get("id"), + "role": "user", + "content": userContent, + "createdAt": createdUserMsg.get("createdAt"), + }) + + # Build context + context = interface.getContext(contextId) + if not context: + logger.error(f"Context {contextId} not found") + return createdUserMsg + + messages = interface.getRecentMessages(sessionId, count=COMPRESSION_MAX_MESSAGES_FETCH) + session = interface.getSession(sessionId) + compressedSummary = session.get("compressedHistorySummary") if session else None + compressedUpTo = session.get("compressedHistoryUpToMessageCount") if session else None + + earlierSummary, previousMessages = aiPrompts.prepareMessagesForPrompt( + messages, compressedSummary, compressedUpTo + ) + + if earlierSummary is None and len(messages) > COMPRESSION_MESSAGE_THRESHOLD: + toSummarizeCount = len(messages) - COMPRESSION_RECENT_COUNT + if toSummarizeCount > 0: + toSummarize = messages[:toSummarizeCount] + try: + summaryPrompt = aiPrompts.buildEarlierConversationSummaryPrompt(toSummarize) + summaryResponse = await self._callAi( + "Du fasst Coaching-Gespraeche praezise zusammen.", summaryPrompt + ) + if summaryResponse and summaryResponse.errorCount == 0 and summaryResponse.content: + earlierSummary = summaryResponse.content.strip() + interface.updateSession(sessionId, { + "compressedHistorySummary": earlierSummary, + "compressedHistoryUpToMessageCount": toSummarizeCount, + }) + previousMessages = messages[-COMPRESSION_RECENT_COUNT:] + logger.info(f"Session {sessionId}: Compressed history ({toSummarizeCount} msgs -> {len(earlierSummary)} chars)") + except Exception as e: + logger.warning(f"History compression failed for session {sessionId}: {e}") + previousMessages = messages[-20:] + + tasks = interface.getTasks(contextId, self.userId) + + retrievalResult = await self._buildRetrievalContext( + contextId, sessionId, userContent, context, interface + ) + + systemPrompt = aiPrompts.buildCoachingSystemPrompt( + context, + previousMessages, + tasks, + previousSessionSummaries=retrievalResult.get("previousSessionSummaries"), + earlierSummary=earlierSummary, + rollingOverview=retrievalResult.get("rollingOverview"), + retrievedSession=retrievalResult.get("retrievedSession"), + retrievedByTopic=retrievalResult.get("retrievedByTopic"), + ) + + if retrievalResult.get("intent") == RetrievalIntent.SUMMARIZE_ALL: + systemPrompt += "\n\nWICHTIG: Der Benutzer moechte eine Gesamtzusammenfassung. Erstelle eine umfassende Zusammenfassung aller genannten Sessions und der aktuellen Session." + + # Call AI + await emitSessionEvent(sessionId, "status", {"label": "Coach denkt nach..."}) + + try: + aiResponse = await self._callAi(systemPrompt, userContent) + except Exception as e: + logger.error(f"AI call failed for session {sessionId}: {e}") + await emitSessionEvent(sessionId, "error", {"message": f"AI error: {str(e)}"}) + return createdUserMsg + + responseText = aiResponse.content.strip() if aiResponse and aiResponse.errorCount == 0 else "Entschuldigung, ich konnte gerade nicht antworten. Bitte versuche es erneut." + + # Store assistant message + assistantMsg = CoachingMessage( + sessionId=sessionId, + contextId=contextId, + userId=self.userId, + role=CoachingMessageRole.ASSISTANT, + content=responseText, + contentType=CoachingMessageContentType.TEXT, + ).model_dump() + createdAssistantMsg = interface.createMessage(assistantMsg) + + # Update session message count + messages = interface.getMessages(sessionId) + interface.updateSession(sessionId, {"messageCount": len(messages)}) + + await emitSessionEvent(sessionId, "message", { + "id": createdAssistantMsg.get("id"), + "role": "assistant", + "content": responseText, + "createdAt": createdAssistantMsg.get("createdAt"), + }) + + if responseText: + try: + from modules.interfaces.interfaceVoiceObjects import getVoiceInterface + import base64 + voiceInterface = getVoiceInterface(self.currentUser, self.mandateId) + profile = interface.getProfile(self.userId, self.instanceId) + language = profile.get("preferredLanguage", "de-DE") if profile else "de-DE" + voiceName = profile.get("preferredVoice") if profile else None + ttsResult = await voiceInterface.textToSpeech( + text=_stripMarkdownForTts(responseText), + languageCode=language, + voiceName=voiceName, + ) + if ttsResult and isinstance(ttsResult, dict): + audioBytes = ttsResult.get("audioContent") + if audioBytes: + audioB64 = base64.b64encode( + audioBytes if isinstance(audioBytes, bytes) else audioBytes.encode() + ).decode() + await emitSessionEvent(sessionId, "ttsAudio", {"audio": audioB64, "format": "mp3"}) + except Exception as e: + logger.warning(f"TTS failed for text message session {sessionId}: {e}") + + await emitSessionEvent(sessionId, "complete", {}) + return createdAssistantMsg + + async def processSessionOpening(self, sessionId: str, contextId: str, interface) -> Dict[str, Any]: + """ + Generate and stream the opening greeting for a new session. + Emits status, message, and complete events to the session queue. + """ + await emitSessionEvent(sessionId, "status", {"label": "Coach bereitet sich vor..."}) + + context = interface.getContext(contextId) + if not context: + logger.error(f"Context {contextId} not found") + await emitSessionEvent(sessionId, "error", {"message": "Context not found"}) + await emitSessionEvent(sessionId, "complete", {}) + return {} + + tasks = interface.getTasks(contextId, self.userId) + previousMessages = [] + + allSessions = interface.getSessions(contextId, self.userId) + previousSessionSummaries = buildSessionSummariesForPrompt( + allSessions, excludeSessionId=sessionId, limit=PREVIOUS_SESSION_SUMMARIES_COUNT + ) + + systemPrompt = aiPrompts.buildCoachingSystemPrompt( + context, previousMessages, tasks, previousSessionSummaries=previousSessionSummaries + ) + openingUserPrompt = "Beginne die Coaching-Session mit einer kurzen Begruesssung, fasse in einem Satz zusammen wo wir stehen (falls vorherige Sessions), und stelle eine gezielte Einstiegsfrage zum Thema." + + try: + aiResponse = await self._callAi(systemPrompt, openingUserPrompt) + except Exception as e: + logger.error(f"AI opening failed for session {sessionId}: {e}") + await emitSessionEvent(sessionId, "error", {"message": f"AI error: {str(e)}"}) + await emitSessionEvent(sessionId, "complete", {}) + return {} + + openingContent = ( + aiResponse.content.strip() + if aiResponse and aiResponse.errorCount == 0 + else f"Willkommen zur Coaching-Session zum Thema \"{context.get('title')}\". Was moechtest du heute besprechen?" + ) + + assistantMsg = CoachingMessage( + sessionId=sessionId, + contextId=contextId, + userId=self.userId, + role=CoachingMessageRole.ASSISTANT, + content=openingContent, + contentType=CoachingMessageContentType.TEXT, + ).model_dump() + createdMsg = interface.createMessage(assistantMsg) + interface.updateSession(sessionId, {"messageCount": 1}) + + await emitSessionEvent(sessionId, "message", { + "id": createdMsg.get("id"), + "sessionId": sessionId, + "contextId": contextId, + "role": "assistant", + "content": openingContent, + "contentType": "text", + "createdAt": createdMsg.get("createdAt"), + }) + if openingContent: + try: + from modules.interfaces.interfaceVoiceObjects import getVoiceInterface + import base64 + voiceInterface = getVoiceInterface(self.currentUser, self.mandateId) + profile = interface.getProfile(self.userId, self.instanceId) + language = profile.get("preferredLanguage", "de-DE") if profile else "de-DE" + voiceName = profile.get("preferredVoice") if profile else None + ttsResult = await voiceInterface.textToSpeech( + text=_stripMarkdownForTts(openingContent), + languageCode=language, + voiceName=voiceName, + ) + if ttsResult and isinstance(ttsResult, dict): + audioBytes = ttsResult.get("audioContent") + if audioBytes: + audioB64 = base64.b64encode( + audioBytes if isinstance(audioBytes, bytes) else audioBytes.encode() + ).decode() + await emitSessionEvent(sessionId, "ttsAudio", {"audio": audioB64, "format": "mp3"}) + except Exception as e: + logger.warning(f"TTS failed for opening: {e}") + await emitSessionEvent(sessionId, "complete", {}) + + logger.info(f"CommCoach session opening completed: {sessionId}") + return createdMsg + + async def generateResumeGreeting(self, sessionId: str, contextId: str, messages: list, interface) -> str: + """Generate a follow-up greeting when user returns to an active session.""" + context = interface.getContext(contextId) + if not context: + raise ValueError(f"Context {contextId} not found for resume greeting") + contextTitle = context.get("title", "Coaching") + prompt = buildResumeGreetingPrompt(messages, contextTitle) + aiResponse = await self._callAi( + "Du bist ein freundlicher Coach. Antworte kurz und einladend.", + prompt, + ) + if not aiResponse or aiResponse.errorCount > 0 or not aiResponse.content: + raise RuntimeError(f"AI resume greeting failed: {getattr(aiResponse, 'errorMessage', 'no content')}") + return aiResponse.content.strip() + + async def processAudioMessage(self, sessionId: str, contextId: str, audioContent: bytes, language: str, interface) -> Dict[str, Any]: + """Process an audio message: STT -> coaching pipeline -> TTS response.""" + from modules.interfaces.interfaceVoiceObjects import getVoiceInterface + + await emitSessionEvent(sessionId, "status", {"label": "Sprache wird erkannt..."}) + + voiceInterface = getVoiceInterface(self.currentUser, self.mandateId) + sttResult = await voiceInterface.speechToText( + audioContent=audioContent, + language=language, + skipFallbacks=True, + ) + + transcribedText = "" + if sttResult and isinstance(sttResult, dict): + transcribedText = sttResult.get("text", "") + elif isinstance(sttResult, str): + transcribedText = sttResult + + if not transcribedText.strip(): + sttError = sttResult.get("error", "Unbekannter Fehler") if isinstance(sttResult, dict) else "Unbekannter Fehler" + msg = f"Sprache konnte nicht erkannt werden. ({sttError})" + await emitSessionEvent(sessionId, "error", {"message": msg, "detail": sttError}) + return {} + + # Process through normal pipeline + result = await self.processMessage(sessionId, contextId, transcribedText, interface) + + # Generate TTS for the response + assistantContent = result.get("content", "") + if assistantContent: + await emitSessionEvent(sessionId, "status", {"label": "Antwort wird gesprochen..."}) + try: + profile = interface.getProfile(self.userId, self.instanceId) + voiceName = profile.get("preferredVoice") if profile else None + + ttsResult = await voiceInterface.textToSpeech( + text=_stripMarkdownForTts(assistantContent), + languageCode=language, + voiceName=voiceName, + ) + if ttsResult and isinstance(ttsResult, dict): + import base64 + audioBytes = ttsResult.get("audioContent") + if audioBytes: + audioB64 = base64.b64encode( + audioBytes if isinstance(audioBytes, bytes) else audioBytes.encode() + ).decode() + await emitSessionEvent(sessionId, "ttsAudio", { + "audio": audioB64, + "format": "mp3", + }) + except Exception as e: + logger.warning(f"TTS failed for session {sessionId}: {e}") + + return result + + async def completeSession(self, sessionId: str, interface) -> Dict[str, Any]: + """ + Complete a session: + 1. Generate summary + 2. Extract tasks + 3. Generate scores + 4. Update context stats + 5. Send email summary + """ + session = interface.getSession(sessionId) + if not session: + return {} + + contextId = session.get("contextId") + context = interface.getContext(contextId) if contextId else None + messages = interface.getMessages(sessionId) + + if len(messages) < 2: + interface.updateSession(sessionId, { + "status": CoachingSessionStatus.COMPLETED.value, + "endedAt": getIsoTimestamp(), + }) + return session + + # Generate summary + try: + summaryPrompt = aiPrompts.buildSummaryPrompt(messages, context.get("title", "Coaching")) + summaryResponse = await self._callAi("Du bist ein praeziser Zusammenfasser.", summaryPrompt) + summary = summaryResponse.content.strip() if summaryResponse and summaryResponse.errorCount == 0 else None + except Exception as e: + logger.warning(f"Summary generation failed: {e}") + summary = None + + keyTopics = None + if summary: + try: + keyTopicsPrompt = aiPrompts.buildKeyTopicsExtractionPrompt(summary, messages) + keyTopicsResponse = await self._callAi( + "Du extrahierst Kernthemen aus Zusammenfassungen.", keyTopicsPrompt + ) + if keyTopicsResponse and keyTopicsResponse.errorCount == 0 and keyTopicsResponse.content: + parsed = aiPrompts.parseJsonResponse(keyTopicsResponse.content, []) + if isinstance(parsed, list) and parsed: + keyTopics = json.dumps([str(t) for t in parsed[:5]]) + except Exception as e: + logger.warning(f"Key topics extraction failed: {e}") + + # Extract tasks + try: + taskPrompt = aiPrompts.buildTaskExtractionPrompt(messages) + taskResponse = await self._callAi("Du extrahierst Aufgaben aus Gespraechen.", taskPrompt) + if taskResponse and taskResponse.errorCount == 0: + extractedTasks = aiPrompts.parseJsonResponse(taskResponse.content, []) + if isinstance(extractedTasks, list): + for taskData in extractedTasks[:3]: + if isinstance(taskData, dict) and taskData.get("title"): + newTask = CoachingTask( + contextId=contextId, + sessionId=sessionId, + userId=self.userId, + mandateId=self.mandateId, + title=taskData["title"], + description=taskData.get("description"), + priority=taskData.get("priority", "medium"), + ).model_dump() + created = interface.createTask(newTask) + await emitSessionEvent(sessionId, "taskCreated", created) + except Exception as e: + logger.warning(f"Task extraction failed: {e}") + + # Generate scores + try: + scorePrompt = aiPrompts.buildScoringPrompt(messages, context.get("category", "custom") if context else "custom") + scoreResponse = await self._callAi("Du bewertest Kommunikationskompetenz.", scorePrompt) + competenceScore = None + if scoreResponse and scoreResponse.errorCount == 0: + scores = aiPrompts.parseJsonResponse(scoreResponse.content, []) + if isinstance(scores, list): + scoreValues = [] + for scoreData in scores: + if isinstance(scoreData, dict) and "dimension" in scoreData and "score" in scoreData: + newScore = CoachingScore( + contextId=contextId, + sessionId=sessionId, + userId=self.userId, + mandateId=self.mandateId, + dimension=scoreData["dimension"], + score=float(scoreData["score"]), + trend=scoreData.get("trend", "stable"), + evidence=scoreData.get("evidence"), + ).model_dump() + interface.createScore(newScore) + scoreValues.append(float(scoreData["score"])) + await emitSessionEvent(sessionId, "scoreUpdate", scoreData) + if scoreValues: + competenceScore = sum(scoreValues) / len(scoreValues) + except Exception as e: + logger.warning(f"Scoring failed: {e}") + competenceScore = None + + # Calculate duration + startedAt = session.get("startedAt", "") + durationSeconds = 0 + if startedAt: + try: + from datetime import datetime + start = datetime.fromisoformat(startedAt.replace("Z", "+00:00")) + end = datetime.now(start.tzinfo) if start.tzinfo else datetime.now() + durationSeconds = int((end - start).total_seconds()) + except Exception: + pass + + # Update session + sessionUpdates = { + "status": CoachingSessionStatus.COMPLETED.value, + "endedAt": getIsoTimestamp(), + "summary": summary, + "durationSeconds": durationSeconds, + "messageCount": len(messages), + } + if competenceScore is not None: + sessionUpdates["competenceScore"] = round(competenceScore, 1) + if keyTopics is not None: + sessionUpdates["keyTopics"] = keyTopics + interface.updateSession(sessionId, sessionUpdates) + + # Update context stats + if contextId: + allSessions = interface.getSessions(contextId, self.userId) + completedCount = len([s for s in allSessions if s.get("status") == CoachingSessionStatus.COMPLETED.value]) + interface.updateContext(contextId, { + "sessionCount": completedCount, + "lastSessionAt": getIsoTimestamp(), + }) + + # Update user profile streak + self._updateStreak(interface) + + # Send email summary + if summary: + await self._sendSessionEmail(session, summary, interface) + + await emitSessionEvent(sessionId, "sessionState", { + "status": "completed", + "summary": summary, + "competenceScore": competenceScore, + }) + await emitSessionEvent(sessionId, "complete", {}) + + return interface.getSession(sessionId) + + def _updateStreak(self, interface): + """Update the user's streak in their profile.""" + try: + profile = interface.getProfile(self.userId, self.instanceId) + if not profile: + profile = interface.getOrCreateProfile(self.userId, self.mandateId, self.instanceId) + + from datetime import datetime, timedelta + + lastSessionAt = profile.get("lastSessionAt") + currentStreak = profile.get("streakDays", 0) + longestStreak = profile.get("longestStreak", 0) + totalSessions = profile.get("totalSessions", 0) + + today = datetime.now().date() + isConsecutive = False + + if lastSessionAt: + try: + lastDate = datetime.fromisoformat(lastSessionAt.replace("Z", "+00:00")).date() + diff = (today - lastDate).days + if diff == 1: + isConsecutive = True + elif diff == 0: + isConsecutive = True # Same day, maintain streak + except Exception: + pass + + newStreak = (currentStreak + 1) if isConsecutive else 1 + newLongest = max(longestStreak, newStreak) + + interface.updateProfile(profile.get("id"), { + "streakDays": newStreak, + "longestStreak": newLongest, + "totalSessions": totalSessions + 1, + "lastSessionAt": getIsoTimestamp(), + }) + except Exception as e: + logger.warning(f"Failed to update streak: {e}") + + async def _sendSessionEmail(self, session: Dict[str, Any], summary: str, interface): + """Send session summary via email if enabled.""" + try: + profile = interface.getProfile(self.userId, self.instanceId) + if profile and not profile.get("emailSummaryEnabled", True): + return + + from modules.interfaces.interfaceMessaging import getInterface as getMessagingInterface + from modules.interfaces.interfaceDbApp import getRootInterface + + rootInterface = getRootInterface() + user = rootInterface.getUser(self.userId) + if not user or not user.email: + return + + messaging = getMessagingInterface() + subject = f"Coaching-Session Zusammenfassung: {session.get('contextId', 'Session')}" + htmlMessage = f""" +

Coaching-Session Zusammenfassung

+

{summary.replace(chr(10), '
')}

+
+

Diese Zusammenfassung wurde automatisch erstellt.

+ """ + + messaging.send("email", user.email, subject, htmlMessage) + interface.updateSession(session.get("id"), {"emailSent": True}) + logger.info(f"Session summary email sent to {user.email}") + except Exception as e: + logger.warning(f"Failed to send session email: {e}") + + async def _buildRetrievalContext( + self, + contextId: str, + sessionId: str, + userContent: str, + context: Dict[str, Any], + interface, + ) -> Dict[str, Any]: + """ + Build retrieval context based on user intent. + Returns: previousSessionSummaries, rollingOverview, retrievedSession, retrievedByTopic, intent, sessionSummaries. + """ + intent = detectIntent(userContent) + allSessions = interface.getSessions(contextId, self.userId) + completedSessions = [s for s in allSessions if s.get("status") == CoachingSessionStatus.COMPLETED.value] + + for s in completedSessions: + startedAt = s.get("startedAt") or s.get("createdAt") or "" + if startedAt: + try: + from datetime import datetime + dt = datetime.fromisoformat(str(startedAt).replace("Z", "+00:00")) + s["date"] = dt.strftime("%d.%m.%Y") + except Exception: + s["date"] = "" + + result = { + "intent": intent, + "previousSessionSummaries": [], + "rollingOverview": None, + "retrievedSession": None, + "retrievedByTopic": None, + "sessionSummaries": [], + } + + ctx = interface.getContext(contextId) + rollingOverview = ctx.get("rollingOverview") if ctx else None + rollingUpTo = ctx.get("rollingOverviewUpToSessionCount") if ctx else None + + if intent == RetrievalIntent.SUMMARIZE_ALL: + result["previousSessionSummaries"] = buildSessionSummariesForPrompt( + allSessions, excludeSessionId=sessionId, limit=20 + ) + result["sessionSummaries"] = result["previousSessionSummaries"] + if len(completedSessions) >= ROLLING_OVERVIEW_SESSION_THRESHOLD and rollingOverview: + result["rollingOverview"] = rollingOverview + + elif intent == RetrievalIntent.RECALL_SESSION: + targetDate = _parseDateFromMessage(userContent) + retrieved = findSessionByDate(completedSessions, targetDate) + if retrieved: + result["retrievedSession"] = retrieved + logger.info(f"Session recall: found session {retrieved.get('id')} for date {targetDate}") + result["previousSessionSummaries"] = buildSessionSummariesForPrompt( + allSessions, excludeSessionId=sessionId, limit=PREVIOUS_SESSION_SUMMARIES_COUNT + ) + if rollingOverview: + result["rollingOverview"] = rollingOverview + + elif intent == RetrievalIntent.RECALL_TOPIC: + retrieved = searchSessionsByTopic(completedSessions, userContent) + result["retrievedByTopic"] = retrieved + if retrieved: + logger.info(f"Topic recall: found {len(retrieved)} sessions for query") + result["previousSessionSummaries"] = buildSessionSummariesForPrompt( + allSessions, excludeSessionId=sessionId, limit=PREVIOUS_SESSION_SUMMARIES_COUNT + ) + if rollingOverview: + result["rollingOverview"] = rollingOverview + + else: + result["previousSessionSummaries"] = buildSessionSummariesForPrompt( + allSessions, excludeSessionId=sessionId, limit=PREVIOUS_SESSION_SUMMARIES_COUNT + ) + if len(completedSessions) >= ROLLING_OVERVIEW_SESSION_THRESHOLD: + if rollingOverview and rollingUpTo is not None and rollingUpTo >= len(completedSessions) - 3: + result["rollingOverview"] = rollingOverview + else: + try: + toSummarize = completedSessions[ROLLING_OVERVIEW_EVERY_N_SESSIONS:] + toSummarize = toSummarize[:ROLLING_OVERVIEW_EVERY_N_SESSIONS * 2] + if len(toSummarize) >= ROLLING_OVERVIEW_EVERY_N_SESSIONS: + summariesForOverview = buildSessionSummariesForPrompt( + toSummarize, limit=len(toSummarize), + ) + overviewPrompt = aiPrompts.buildRollingOverviewPrompt( + summariesForOverview, context.get("title", "Coaching") + ) + overviewResponse = await self._callAi( + "Du fasst Coaching-Sessions kompakt zusammen.", overviewPrompt + ) + if overviewResponse and overviewResponse.errorCount == 0 and overviewResponse.content: + newOverview = overviewResponse.content.strip() + interface.updateContext(contextId, { + "rollingOverview": newOverview, + "rollingOverviewUpToSessionCount": len(completedSessions), + }) + result["rollingOverview"] = newOverview + logger.info(f"Context {contextId}: Rolling overview updated ({len(toSummarize)} sessions)") + except Exception as e: + logger.warning(f"Rolling overview failed for context {contextId}: {e}") + + return result + + async def _callAi(self, systemPrompt: str, userPrompt: str): + """Call the AI service with the given prompts.""" + from modules.services.serviceAi.mainServiceAi import AiService + + serviceContext = type('Ctx', (), { + 'user': self.currentUser, + 'mandateId': self.mandateId, + 'featureInstanceId': self.instanceId, + 'featureCode': 'commcoach', + })() + aiService = AiService(serviceCenter=serviceContext) + await aiService.ensureAiObjectsInitialized() + + aiRequest = AiCallRequest( + prompt=userPrompt, + context=systemPrompt, + options=AiCallOptions( + operationType=OperationTypeEnum.DATA_ANALYSE, + priority=PriorityEnum.QUALITY, + ) + ) + return await aiService.callAi(aiRequest) diff --git a/modules/features/commcoach/serviceCommcoachAi.py b/modules/features/commcoach/serviceCommcoachAi.py new file mode 100644 index 00000000..ea58488b --- /dev/null +++ b/modules/features/commcoach/serviceCommcoachAi.py @@ -0,0 +1,363 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +CommCoach AI Service. +Handles system prompts, diagnostic question generation, session summarization, and scoring. +""" + +import logging +import json +from typing import Optional, Dict, Any, List, Tuple + +logger = logging.getLogger(__name__) + +# Compression thresholds (Teamsbot-style) +COMPRESSION_MESSAGE_THRESHOLD = 25 +COMPRESSION_RECENT_COUNT = 15 +COMPRESSION_MAX_MESSAGES_FETCH = 80 + + +def buildResumeGreetingPrompt(messages: List[Dict[str, Any]], contextTitle: str) -> str: + """Build prompt for AI to generate a follow-up greeting when user returns to session.""" + recent = messages[-6:] if len(messages) > 6 else messages + conversation = "" + for msg in recent: + role = "Benutzer" if msg.get("role") == "user" else "Coach" + conversation += f"\n{role}: {msg.get('content', '')[:200]}" + return f"""Der User kehrt zur laufenden Coaching-Session zum Thema "{contextTitle}" zurueck. +Bisheriger Verlauf: +{conversation} + +Erstelle eine kurze, freundliche Begruesssung fuer den Wiedereinstieg (2-3 Saetze): +- Begruesse den User zurueck +- Fasse in einem Satz zusammen, worum es zuletzt ging +- Lade ein, dort weiterzumachen oder eine neue Frage zu stellen + +Antworte NUR mit der Begruesssung, keine Erklaerungen.""" + + +def buildEarlierConversationSummaryPrompt(messages: List[Dict[str, Any]]) -> str: + """Build prompt to summarize older messages for long-session compression.""" + conversation = "" + for msg in messages: + role = "Benutzer" if msg.get("role") == "user" else "Coach" + conversation += f"\n{role}: {msg.get('content', '')}" + + return f"""Fasse das folgende Coaching-Gespraech in 4-6 Saetzen zusammen. +Behalte: Kernthemen, wichtige Erkenntnisse, erwaehnte Aufgaben, emotionale Wendepunkte, Fortschritte. +Entferne Wiederholungen und Fuelltext. +Antworte NUR mit der Zusammenfassung, keine Erklaerungen. + +Gespraech: +{conversation}""" + + +def prepareMessagesForPrompt( + messages: List[Dict[str, Any]], + compressedSummary: Optional[str], + compressedUpToCount: Optional[int], +) -> Tuple[Optional[str], List[Dict[str, Any]]]: + """ + Prepare message history for the coaching prompt. + Returns (earlierSummary, recentMessages). + If messages <= THRESHOLD: (None, messages). + If messages > THRESHOLD: (summary or None, last RECENT_COUNT messages). + Cached summary is reused when compressedUpToCount >= len(toSummarize). + """ + if len(messages) <= COMPRESSION_MESSAGE_THRESHOLD: + return None, messages + + toSummarizeCount = len(messages) - COMPRESSION_RECENT_COUNT + if toSummarizeCount <= 0: + return None, messages + + toSummarize = messages[:toSummarizeCount] + recent = messages[-COMPRESSION_RECENT_COUNT:] + + try: + upTo = int(compressedUpToCount) if compressedUpToCount is not None else None + except (TypeError, ValueError): + upTo = None + if compressedSummary and upTo is not None and upTo >= toSummarizeCount: + return compressedSummary, recent + + return None, messages + + +def buildCoachingSystemPrompt( + context: Dict[str, Any], + previousMessages: List[Dict[str, Any]], + tasks: List[Dict[str, Any]], + previousSessionSummaries: Optional[List[Dict[str, Any]]] = None, + earlierSummary: Optional[str] = None, + rollingOverview: Optional[str] = None, + retrievedSession: Optional[Dict[str, Any]] = None, + retrievedByTopic: Optional[List[Dict[str, Any]]] = None, +) -> str: + """Build the system prompt for a coaching session, including context history, tasks, and session continuity.""" + contextTitle = context.get("title", "General Coaching") + contextCategory = context.get("category", "custom") + contextDescription = context.get("description", "") + goalsRaw = context.get("goals") + insightsRaw = context.get("insights") + + goals = _parseJsonField(goalsRaw, []) + insights = _parseJsonField(insightsRaw, []) + + openTasks = [t for t in tasks if t.get("status") in ("open", "inProgress")] + doneTasks = [t for t in tasks if t.get("status") == "done"] + + summaries = previousSessionSummaries or [] + + prompt = f"""Du bist ein erfahrener Kommunikations-Coach fuer Fuehrungskraefte. Du arbeitest mit dem Benutzer am Thema: "{contextTitle}" (Kategorie: {contextCategory}). + +Deine Rolle: +- Stelle gezielte diagnostische Rueckfragen, um das Problem/Thema besser zu verstehen +- Gib konkrete, praxisnahe Tipps und Uebungen +- Baue auf fruehere Sessions auf (Kontext-Kontinuitaet) +- Erkenne Fortschritte und benenne sie +- Schlage am Ende der Session konkrete naechste Schritte vor (als Tasks) +- Kommuniziere empathisch, klar und auf Augenhoehe + +Kommunikationsstil: +- Duze den Benutzer +- Sei direkt aber wertschaetzend +- Verwende keine Emojis +- Antworte in der Sprache des Benutzers +- Halte Antworten fokussiert (max 3-4 Absaetze) +- WICHTIG: Schreibe reinen Redetext ohne jegliche Formatierung. Kein Markdown, keine Sternchen, keine Hashes, keine Aufzaehlungszeichen, keine Backticks. Deine Antworten werden direkt vorgelesen.""" + + if contextDescription: + prompt += f"\n\nKontext-Beschreibung: {contextDescription}" + + if goals: + goalTexts = [g.get("text", g) if isinstance(g, dict) else str(g) for g in goals] + prompt += f"\n\nZiele des Benutzers:\n" + "\n".join(f"- {g}" for g in goalTexts) + + if insights: + insightTexts = [i.get("text", i) if isinstance(i, dict) else str(i) for i in insights[-5:]] + prompt += f"\n\nBisherige Erkenntnisse:\n" + "\n".join(f"- {i}" for i in insightTexts) + + if rollingOverview: + prompt += f"\n\nGesamtueberblick bisheriger Sessions:\n{rollingOverview[:600]}" + + if summaries: + prompt += "\n\nBisherige Sessions (Zusammenfassungen):" + for s in summaries[-5:]: + summary = s.get("summary", s.get("text", "")) + dateStr = s.get("date", "") + prefix = f"[{dateStr}] " if dateStr else "" + if summary: + prompt += f"\n- {prefix}{summary[:350]}" + + if retrievedSession: + dateStr = "" + startedAt = retrievedSession.get("startedAt") or retrievedSession.get("createdAt") + if startedAt: + try: + from datetime import datetime + dt = datetime.fromisoformat(str(startedAt).replace("Z", "+00:00")) + dateStr = dt.strftime("%d.%m.%Y") + except Exception: + pass + prompt += f"\n\nVom Benutzer angefragte Session ({dateStr}):" + prompt += f"\n{retrievedSession.get('summary', '')[:500]}" + + if retrievedByTopic: + prompt += "\n\nRelevante Sessions zum angefragten Thema:" + for s in retrievedByTopic[:3]: + summary = s.get("summary", "") + dateStr = s.get("date", "") + if summary: + prompt += f"\n- [{dateStr}] {summary[:300]}" + + if openTasks: + prompt += "\n\nOffene Aufgaben:" + for t in openTasks: + prompt += f"\n- [{t.get('status')}] {t.get('title')}" + + if doneTasks: + prompt += f"\n\nAbgeschlossene Aufgaben: {len(doneTasks)}" + + if earlierSummary: + prompt += f"\n\nAelterer Gespraechsverlauf (zusammengefasst):\n{earlierSummary[:800]}" + + if previousMessages: + prompt += "\n\nVorige Nachrichten dieser Session (Kontext):" + for msg in previousMessages[-12:]: + role = msg.get("role", "user") + content = msg.get("content", "")[:400] + prompt += f"\n[{role}]: {content}" + + return prompt + + +def buildSummaryPrompt(messages: List[Dict[str, Any]], contextTitle: str) -> str: + """Build a prompt to generate a session summary.""" + conversation = "" + for msg in messages: + role = "Benutzer" if msg.get("role") == "user" else "Coach" + conversation += f"\n{role}: {msg.get('content', '')}" + + return f"""Erstelle eine kompakte Zusammenfassung dieser Coaching-Session zum Thema "{contextTitle}". + +Struktur: +1. **Kernthema**: Was wurde besprochen (1-2 Saetze) +2. **Erkenntnisse**: Was wurde erkannt/gelernt (Stichpunkte) +3. **Naechste Schritte**: Konkrete Aufgaben fuer den Benutzer (Stichpunkte) +4. **Fortschritt**: Einschaetzung des Fortschritts + +Gespraech: +{conversation} + +Antworte auf Deutsch, sachlich und kompakt.""" + + +def buildScoringPrompt(messages: List[Dict[str, Any]], contextCategory: str) -> str: + """Build a prompt to evaluate competence dimensions after a session.""" + conversation = "" + for msg in messages: + role = "Benutzer" if msg.get("role") == "user" else "Coach" + conversation += f"\n{role}: {msg.get('content', '')}" + + return f"""Bewerte die Kommunikationskompetenz des Benutzers basierend auf dieser Coaching-Session. +Kategorie: {contextCategory} + +Bewerte folgende Dimensionen auf einer Skala von 0-100: +- empathy: Einfuehlungsvermoegen +- clarity: Klarheit der Kommunikation +- assertiveness: Durchsetzungsfaehigkeit +- listening: Zuhoerfaehigkeit +- selfReflection: Selbstreflexion + +Antworte AUSSCHLIESSLICH als JSON-Array: +[ + {{"dimension": "empathy", "score": 65, "trend": "improving", "evidence": "Zeigt zunehmendes Verstaendnis..."}}, + {{"dimension": "clarity", "score": 70, "trend": "stable", "evidence": "..."}} +] + +Trend: "improving", "stable", oder "declining" basierend auf dem Gespraechsverlauf. + +Gespraech: +{conversation}""" + + +def buildKeyTopicsExtractionPrompt(summary: str, messages: List[Dict[str, Any]]) -> str: + """Extract 2-5 key topics from session for indexing.""" + return f"""Extrahiere 2-5 Kernthemen aus dieser Coaching-Session. +Antworte AUSSCHLIESSLICH als JSON-Array von Strings: +["Thema 1", "Thema 2", "Thema 3"] + +Zusammenfassung: {summary[:500]} + +Nur konkrete Themen (z.B. Delegation, Feedback-Gespraech, Konflikt mit Vorgesetztem).""" + + +def buildFullContextSummaryPrompt( + sessionSummaries: List[Dict[str, Any]], + currentSessionSummary: Optional[str], + currentSessionMessages: List[Dict[str, Any]], + contextTitle: str, +) -> str: + """Build prompt for full context summary (summarize_all intent).""" + parts = [] + for s in sessionSummaries: + dateStr = s.get("date", "") + summary = s.get("summary", "") + if summary: + parts.append(f"Session {dateStr}: {summary}") + + if currentSessionSummary: + parts.append(f"Aktuelle Session (zusammengefasst): {currentSessionSummary}") + + recent = "\n".join( + f"{m.get('role','user')}: {m.get('content','')[:200]}" + for m in currentSessionMessages[-10:] + ) + if recent: + parts.append(f"Aktuelle Session (letzte Nachrichten):\n{recent}") + + combined = "\n\n".join(parts) + return f"""Erstelle eine kompakte Gesamtzusammenfassung aller Coaching-Sessions zum Thema "{contextTitle}". + +Struktur: +1. **Gesamtueberblick**: Was wurde ueber alle Sessions hinweg besprochen +2. **Entwicklung**: Wie hat sich das Thema/thematische Schwerpunkte entwickelt +3. **Offene Punkte**: Was steht noch aus +4. **Empfehlung**: Kurzer naechster Fokus + +Inhalt: +{combined[:6000]} + +Antworte auf Deutsch, sachlich, 4-6 Absaetze.""" + + +def buildRollingOverviewPrompt(sessionSummaries: List[Dict[str, Any]], contextTitle: str) -> str: + """Build prompt for rolling overview (compress many sessions).""" + parts = [] + for s in sessionSummaries: + dateStr = s.get("date", "") + summary = s.get("summary", "") + if summary: + parts.append(f"- {dateStr}: {summary[:300]}") + + combined = "\n".join(parts) + return f"""Fasse die folgenden Coaching-Sessions zum Thema "{contextTitle}" in 4-6 Saetzen zusammen. +Behalte: Kernthemen, Fortschritte, wichtige Erkenntnisse, offene Punkte. +Entferne Wiederholungen. + +Sessions: +{combined} + +Antworte NUR mit der Zusammenfassung.""" + + +def buildTaskExtractionPrompt(messages: List[Dict[str, Any]]) -> str: + """Build a prompt to extract actionable tasks from a session.""" + recentForTasks = messages[-25:] if len(messages) > 25 else messages + conversation = "" + for msg in recentForTasks: + role = "Benutzer" if msg.get("role") == "user" else "Coach" + conversation += f"\n{role}: {msg.get('content', '')}" + + return f"""Extrahiere konkrete Aufgaben/naechste Schritte aus diesem Coaching-Gespraech. +Nur Aufgaben, die der Benutzer selbst umsetzen soll. + +Antworte AUSSCHLIESSLICH als JSON-Array: +[ + {{"title": "Aufgabentitel", "description": "Kurze Beschreibung", "priority": "medium"}} +] + +priority: "low", "medium", oder "high" +Maximal 3 Aufgaben. Wenn keine klar erkennbar: leeres Array []. + +Gespraech: +{conversation}""" + + +def parseJsonResponse(responseText: str, fallback: Any = None) -> Any: + """Parse a JSON response from AI, handling markdown code blocks.""" + text = responseText.strip() + if text.startswith("```"): + lines = text.split("\n") + lines = lines[1:] # remove opening ```json + if lines and lines[-1].strip() == "```": + lines = lines[:-1] + text = "\n".join(lines) + + try: + return json.loads(text) + except json.JSONDecodeError: + logger.warning(f"Failed to parse AI JSON response: {text[:200]}") + return fallback + + +def _parseJsonField(value: Optional[str], fallback: Any = None) -> Any: + if not value: + return fallback + if isinstance(value, (list, dict)): + return value + try: + return json.loads(value) + except (json.JSONDecodeError, TypeError): + return fallback diff --git a/modules/features/commcoach/serviceCommcoachContextRetrieval.py b/modules/features/commcoach/serviceCommcoachContextRetrieval.py new file mode 100644 index 00000000..d673b04a --- /dev/null +++ b/modules/features/commcoach/serviceCommcoachContextRetrieval.py @@ -0,0 +1,223 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +CommCoach Context Retrieval. +Intent detection, retrieval strategies, and context assembly for intelligent session continuity. +""" + +import re +import logging +from datetime import datetime +from typing import Optional, Dict, Any, List, Tuple +from enum import Enum + +logger = logging.getLogger(__name__) + +# Retrieval config +PREVIOUS_SESSION_SUMMARIES_COUNT = 5 +ROLLING_OVERVIEW_SESSION_THRESHOLD = 10 +ROLLING_OVERVIEW_EVERY_N_SESSIONS = 10 +TOPIC_SEARCH_MAX_RESULTS = 5 + + +class RetrievalIntent(str, Enum): + NORMAL = "normal" + SUMMARIZE_ALL = "summarize_all" + RECALL_SESSION = "recall_session" + RECALL_TOPIC = "recall_topic" + + +def detectIntent(userMessage: str) -> RetrievalIntent: + """ + Lightweight intent detection from user message. + Uses keyword/regex heuristics. + """ + text = (userMessage or "").strip().lower() + if len(text) < 3: + return RetrievalIntent.NORMAL + + summarizePatterns = [ + r"\b(fasse|zusammenfass|zusammenfassung|ueberblick|gesamte?r?\s*chat|alles\s+zusammen)\b", + r"\b(summarize|summary\s+of\s+all|complete\s+summary)\b", + r"zusammenfassung\s+(des\s+)?gesamten", + r"gesamten\s+chat", + ] + for p in summarizePatterns: + if re.search(p, text, re.IGNORECASE): + return RetrievalIntent.SUMMARIZE_ALL + + datePatterns = [ + r"\b(session|gespraech|besprechung)\s+(vom|am|vom)\s*(\d{1,2})\.(\d{1,2})\.(\d{2,4})", + r"\b(am|vom)\s*(\d{1,2})\.(\d{1,2})\.(\d{2,4})", + r"\b(letzte\s+woche|voriger\s+monat|gestern)\b", + r"\b(session|gespraech)\s+(vom|from)\s+(\d{4}-\d{2}-\d{2})", + ] + for p in datePatterns: + if re.search(p, text, re.IGNORECASE): + return RetrievalIntent.RECALL_SESSION + + recallTopicPatterns = [ + r"\b(erinnerst\s+du\s+dich|damals\s+als|thema\s+.*\s+von|ueber\s+was\s+haben\s+wir)\b", + r"\b(was\s+war\s+.*\s+nochmal|thema\s+.*\s+besprochen)\b", + r"\b(recall|remember|vor\s+\d+\s+sessions?)\b", + ] + for p in recallTopicPatterns: + if re.search(p, text, re.IGNORECASE): + return RetrievalIntent.RECALL_TOPIC + + return RetrievalIntent.NORMAL + + +def _parseDateFromMessage(text: str) -> Optional[datetime]: + """Extract date from user message. Returns date or None.""" + text = text.strip() + patterns = [ + (r"(\d{1,2})\.(\d{1,2})\.(\d{2,4})", lambda m: (int(m[1]), int(m[2]), int(m[3]))), + (r"(\d{4})-(\d{2})-(\d{2})", lambda m: (int(m[3]), int(m[2]), int(m[1]))), + ] + for pattern, extractor in patterns: + match = re.search(pattern, text) + if match: + try: + day, month, year = extractor(match) + if year < 100: + year += 2000 + return datetime(year, month, day) + except (ValueError, IndexError): + pass + return None + + +def findSessionByDate( + sessions: List[Dict[str, Any]], + targetDate: Optional[datetime], +) -> Optional[Dict[str, Any]]: + """ + Find session closest to targetDate. + sessions: list of session dicts with startedAt/endedAt. + """ + if not targetDate or not sessions: + return None + + targetDateOnly = targetDate.date() + bestMatch = None + bestDiff = None + + for s in sessions: + if s.get("status") != "completed": + continue + startedAt = s.get("startedAt") or s.get("endedAt") or s.get("createdAt") + if not startedAt: + continue + try: + dt = datetime.fromisoformat(startedAt.replace("Z", "+00:00")) + sessionDate = dt.date() + diff = abs((sessionDate - targetDateOnly).days) + if bestDiff is None or diff < bestDiff: + bestDiff = diff + bestMatch = s + except Exception: + continue + + return bestMatch + + +def searchSessionsByTopic( + sessions: List[Dict[str, Any]], + query: str, + maxResults: int = TOPIC_SEARCH_MAX_RESULTS, +) -> List[Dict[str, Any]]: + """ + Topic search over sessions. + Phase 5: Keyword-based (keyTopics + summary). + Phase 7: Falls back to embedding search when available; for now uses keyword only. + """ + if not query or not sessions: + return [] + + queryWords = set(re.findall(r"\w+", query.lower())) + if not queryWords: + return [] + + scored = [] + for s in sessions: + if s.get("status") != "completed": + continue + score = 0 + summary = (s.get("summary") or "").lower() + keyTopicsRaw = s.get("keyTopics") + keyTopics = [] + if keyTopicsRaw: + try: + import json + parsed = json.loads(keyTopicsRaw) if isinstance(keyTopicsRaw, str) else keyTopicsRaw + keyTopics = [t.lower() if isinstance(t, str) else str(t).lower() for t in parsed] if isinstance(parsed, list) else [] + except Exception: + pass + + for word in queryWords: + if len(word) < 3: + continue + if word in summary: + score += 1 + for topic in keyTopics: + if word in topic: + score += 2 + + if score > 0: + scored.append((score, s)) + + scored.sort(key=lambda x: -x[0]) + return [s for _, s in scored[:maxResults]] + + +def searchSessionsByTopicRag( + sessions: List[Dict[str, Any]], + query: str, + maxResults: int = TOPIC_SEARCH_MAX_RESULTS, + embeddingProvider: Optional[Any] = None, +) -> List[Dict[str, Any]]: + """ + Phase 7 RAG: Semantic search via embeddings. + When embeddingProvider is None, falls back to keyword search. + Future: Pass embeddingProvider that has embed(text) -> vector and similarity search. + """ + if embeddingProvider is None: + return searchSessionsByTopic(sessions, query, maxResults) + # TODO: When embedding API exists: embed query, embed session summaries, cosine similarity + return searchSessionsByTopic(sessions, query, maxResults) + + +def buildSessionSummariesForPrompt( + sessions: List[Dict[str, Any]], + excludeSessionId: Optional[str] = None, + limit: int = PREVIOUS_SESSION_SUMMARIES_COUNT, +) -> List[Dict[str, Any]]: + """ + Build list of session summaries with date for prompt. + Each item: {summary, date, sessionId, keyTopics}. + """ + completed = [ + s for s in sessions + if s.get("status") == "completed" + and s.get("summary") + and s.get("id") != excludeSessionId + ] + completed.sort(key=lambda x: x.get("startedAt") or x.get("createdAt") or "", reverse=True) + result = [] + for s in completed[:limit]: + startedAt = s.get("startedAt") or s.get("createdAt") or "" + dateStr = "" + if startedAt: + try: + dt = datetime.fromisoformat(startedAt.replace("Z", "+00:00")) + dateStr = dt.strftime("%d.%m.%Y") + except Exception: + pass + result.append({ + "summary": s.get("summary", ""), + "date": dateStr, + "sessionId": s.get("id"), + "keyTopics": s.get("keyTopics"), + }) + return result diff --git a/modules/features/commcoach/serviceCommcoachScheduler.py b/modules/features/commcoach/serviceCommcoachScheduler.py new file mode 100644 index 00000000..3db548cf --- /dev/null +++ b/modules/features/commcoach/serviceCommcoachScheduler.py @@ -0,0 +1,91 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +CommCoach Scheduler Service. +Handles daily reminders and scheduled email summaries. +""" + +import logging +from typing import Dict, Any, List + +logger = logging.getLogger(__name__) + + +def registerScheduledJobs(eventManagement): + """Register CommCoach scheduled jobs with the event management system.""" + try: + eventManagement.registerCron( + jobId="commcoach_daily_reminder", + func=_runDailyReminders, + cronKwargs={"hour": 8, "minute": 0}, + ) + logger.info("CommCoach scheduler: daily reminder job registered at 08:00") + except Exception as e: + logger.error(f"CommCoach scheduler: failed to register jobs: {e}") + + +async def _runDailyReminders(): + """Send daily coaching reminders to users who have opted in.""" + try: + from modules.shared.configuration import APP_CONFIG + from modules.connectors.connectorDbPostgre import DatabaseConnector + from .datamodelCommcoach import CoachingUserProfile, CoachingContextStatus + from modules.interfaces.interfaceMessaging import getInterface as getMessagingInterface + + dbHost = APP_CONFIG.get("DB_HOST", "_no_config_default_data") + db = DatabaseConnector( + dbHost=dbHost, + dbDatabase="poweron_commcoach", + dbUser=APP_CONFIG.get("DB_USER"), + dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET"), + dbPort=int(APP_CONFIG.get("DB_PORT", 5432)), + userId="system", + ) + + profiles = db.getRecordset(CoachingUserProfile, recordFilter={"dailyReminderEnabled": True}) + if not profiles: + return + + messaging = getMessagingInterface() + + from modules.interfaces.interfaceDbApp import getRootInterface + rootInterface = getRootInterface() + + sentCount = 0 + for profile in profiles: + try: + userId = profile.get("userId") + user = rootInterface.getUser(userId) + if not user or not user.email: + continue + + # Check if user has active contexts + from .datamodelCommcoach import CoachingContext + contexts = db.getRecordset(CoachingContext, recordFilter={ + "userId": userId, + "status": CoachingContextStatus.ACTIVE.value, + }) + if not contexts: + continue + + contextTitles = [c.get("title", "Unbenannt") for c in contexts[:3]] + contextList = ", ".join(contextTitles) + + subject = "Dein taegliches Coaching wartet" + message = f""" +

Zeit fuer dein Coaching

+

Du hast aktive Coaching-Themen: {contextList}

+

Nimm dir 10 Minuten fuer eine kurze Session. Konsistenz ist der Schluessel zu Fortschritt.

+

Dein aktueller Streak: {profile.get('streakDays', 0)} Tage

+ """ + + messaging.send("email", user.email, subject, message) + sentCount += 1 + except Exception as e: + logger.warning(f"Failed to send reminder to user {profile.get('userId')}: {e}") + + if sentCount > 0: + logger.info(f"CommCoach scheduler: sent {sentCount} daily reminders") + + except Exception as e: + logger.error(f"CommCoach daily reminders failed: {e}") diff --git a/modules/features/commcoach/tests/__init__.py b/modules/features/commcoach/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/modules/features/commcoach/tests/test_contextRetrieval.py b/modules/features/commcoach/tests/test_contextRetrieval.py new file mode 100644 index 00000000..a0dcf226 --- /dev/null +++ b/modules/features/commcoach/tests/test_contextRetrieval.py @@ -0,0 +1,103 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Tests for CommCoach context retrieval (intent detection, session lookup).""" + +import pytest +from datetime import datetime + +from ..serviceCommcoachContextRetrieval import ( + detectIntent, + RetrievalIntent, + buildSessionSummariesForPrompt, + findSessionByDate, + searchSessionsByTopic, + _parseDateFromMessage, +) + + +class TestDetectIntent: + def test_normal(self): + assert detectIntent("Ich habe ein Problem mit meinem Team") == RetrievalIntent.NORMAL + assert detectIntent("Wie kann ich besser delegieren?") == RetrievalIntent.NORMAL + + def test_summarizeAll(self): + assert detectIntent("Fasse alles zusammen") == RetrievalIntent.SUMMARIZE_ALL + assert detectIntent("Gib mir eine Zusammenfassung des gesamten Chats") == RetrievalIntent.SUMMARIZE_ALL + + def test_recallSession(self): + assert detectIntent("Was haben wir am 01.02.2026 besprochen?") == RetrievalIntent.RECALL_SESSION + assert detectIntent("Session vom 15.03.2026") == RetrievalIntent.RECALL_SESSION + + def test_recallTopic(self): + assert detectIntent("Erinnerst du dich an das Thema Delegation?") == RetrievalIntent.RECALL_TOPIC + assert detectIntent("Was war das Thema von vor 5 Sessions?") == RetrievalIntent.RECALL_TOPIC + + +class TestParseDateFromMessage: + def test_dd_mm_yyyy(self): + d = _parseDateFromMessage("am 01.02.2026") + assert d is not None + assert d.year == 2026 + assert d.month == 2 + assert d.day == 1 + + def test_iso(self): + d = _parseDateFromMessage("2026-02-01") + assert d is not None + assert d.year == 2026 + assert d.month == 2 + assert d.day == 1 + + +class TestFindSessionByDate: + def test_findsClosest(self): + sessions = [ + {"id": "1", "status": "completed", "startedAt": "2026-02-01T10:00:00Z"}, + {"id": "2", "status": "completed", "startedAt": "2026-02-05T10:00:00Z"}, + ] + target = datetime(2026, 2, 4) + found = findSessionByDate(sessions, target) + assert found is not None + assert found["id"] == "2" + + def test_findsExactDate(self): + sessions = [ + {"id": "1", "status": "completed", "startedAt": "2026-02-01T10:00:00Z"}, + {"id": "2", "status": "completed", "startedAt": "2026-02-05T10:00:00Z"}, + ] + target = datetime(2026, 2, 1) + found = findSessionByDate(sessions, target) + assert found is not None + assert found["id"] == "1" + + def test_returnsNoneForEmpty(self): + assert findSessionByDate([], datetime(2026, 2, 1)) is None + + +class TestSearchSessionsByTopic: + def test_matchesKeyTopics(self): + sessions = [ + {"id": "1", "status": "completed", "summary": "Delegation besprochen", "keyTopics": '["Delegation", "Feedback"]'}, + ] + result = searchSessionsByTopic(sessions, "Delegation") + assert len(result) == 1 + assert result[0]["id"] == "1" + + def test_matchesSummary(self): + sessions = [ + {"id": "1", "status": "completed", "summary": "Konflikt mit Vorgesetztem", "keyTopics": None}, + ] + result = searchSessionsByTopic(sessions, "Konflikt") + assert len(result) == 1 + + +class TestBuildSessionSummariesForPrompt: + def test_excludesSession(self): + sessions = [ + {"id": "1", "status": "completed", "summary": "A", "startedAt": "2026-02-01T10:00:00Z"}, + {"id": "2", "status": "completed", "summary": "B", "startedAt": "2026-02-02T10:00:00Z"}, + ] + result = buildSessionSummariesForPrompt(sessions, excludeSessionId="2", limit=5) + assert len(result) == 1 + assert result[0]["summary"] == "A" + assert result[0]["date"] diff --git a/modules/features/commcoach/tests/test_datamodel.py b/modules/features/commcoach/tests/test_datamodel.py new file mode 100644 index 00000000..fb39ba34 --- /dev/null +++ b/modules/features/commcoach/tests/test_datamodel.py @@ -0,0 +1,168 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Tests for CommCoach data models. +Validates model creation, defaults, enums, and serialization. +""" + +import pytest +import json +from ..datamodelCommcoach import ( + CoachingContext, CoachingContextStatus, CoachingContextCategory, + CoachingSession, CoachingSessionStatus, + CoachingMessage, CoachingMessageRole, CoachingMessageContentType, + CoachingTask, CoachingTaskStatus, CoachingTaskPriority, + CoachingScore, CoachingScoreTrend, + CoachingUserProfile, + CreateContextRequest, SendMessageRequest, CreateTaskRequest, + UpdateTaskStatusRequest, DashboardData, +) + + +class TestCoachingContext: + def test_createWithDefaults(self): + ctx = CoachingContext( + userId="user-1", + mandateId="mandate-1", + instanceId="instance-1", + title="Test Topic", + ) + assert ctx.userId == "user-1" + assert ctx.title == "Test Topic" + assert ctx.status == CoachingContextStatus.ACTIVE + assert ctx.category == CoachingContextCategory.CUSTOM + assert ctx.sessionCount == 0 + assert ctx.id is not None and len(ctx.id) > 0 + + def test_createWithAllFields(self): + ctx = CoachingContext( + userId="user-1", + mandateId="mandate-1", + instanceId="instance-1", + title="Conflict Management", + description="Working on conflict with team", + category=CoachingContextCategory.CONFLICT, + status=CoachingContextStatus.PAUSED, + goals=json.dumps([{"id": "g1", "text": "Resolve conflict", "status": "open"}]), + ) + assert ctx.category == CoachingContextCategory.CONFLICT + assert ctx.status == CoachingContextStatus.PAUSED + goals = json.loads(ctx.goals) + assert len(goals) == 1 + + def test_modelDump(self): + ctx = CoachingContext( + userId="u1", mandateId="m1", instanceId="i1", title="T" + ) + data = ctx.model_dump() + assert isinstance(data, dict) + assert data["userId"] == "u1" + assert "id" in data + + +class TestCoachingSession: + def test_createWithDefaults(self): + session = CoachingSession( + contextId="ctx-1", + userId="user-1", + mandateId="m1", + instanceId="i1", + ) + assert session.status == CoachingSessionStatus.ACTIVE + assert session.messageCount == 0 + assert session.durationSeconds == 0 + assert session.emailSent is False + + def test_competenceScoreRange(self): + session = CoachingSession( + contextId="ctx-1", userId="u1", mandateId="m1", instanceId="i1", + competenceScore=75.5, + ) + assert session.competenceScore == 75.5 + + +class TestCoachingMessage: + def test_createUserMessage(self): + msg = CoachingMessage( + sessionId="s1", contextId="c1", userId="u1", + role=CoachingMessageRole.USER, + content="Hello coach", + ) + assert msg.role == CoachingMessageRole.USER + assert msg.contentType == CoachingMessageContentType.TEXT + + def test_createAssistantMessage(self): + msg = CoachingMessage( + sessionId="s1", contextId="c1", userId="u1", + role=CoachingMessageRole.ASSISTANT, + content="Welcome to coaching!", + ) + assert msg.role == CoachingMessageRole.ASSISTANT + + +class TestCoachingTask: + def test_createWithDefaults(self): + task = CoachingTask( + contextId="c1", userId="u1", mandateId="m1", + title="Practice active listening", + ) + assert task.status == CoachingTaskStatus.OPEN + assert task.priority == CoachingTaskPriority.MEDIUM + + def test_allStatuses(self): + for status in CoachingTaskStatus: + task = CoachingTask( + contextId="c1", userId="u1", mandateId="m1", + title="Task", status=status, + ) + assert task.status == status + + +class TestCoachingScore: + def test_createScore(self): + score = CoachingScore( + contextId="c1", sessionId="s1", userId="u1", mandateId="m1", + dimension="empathy", score=72.5, + trend=CoachingScoreTrend.IMPROVING, + evidence="Shows increasing empathy", + ) + assert score.dimension == "empathy" + assert score.score == 72.5 + assert score.trend == CoachingScoreTrend.IMPROVING + + +class TestCoachingUserProfile: + def test_createWithDefaults(self): + profile = CoachingUserProfile( + userId="u1", mandateId="m1", instanceId="i1", + ) + assert profile.preferredLanguage == "de-DE" + assert profile.dailyReminderEnabled is False + assert profile.emailSummaryEnabled is True + assert profile.streakDays == 0 + + +class TestApiModels: + def test_createContextRequest(self): + req = CreateContextRequest(title="Test", goals=["Goal 1", "Goal 2"]) + assert req.title == "Test" + assert len(req.goals) == 2 + + def test_sendMessageRequest(self): + req = SendMessageRequest(content="Hello") + assert req.content == "Hello" + assert req.contentType == CoachingMessageContentType.TEXT + + def test_createTaskRequest(self): + req = CreateTaskRequest(title="Task 1") + assert req.title == "Task 1" + assert req.priority == CoachingTaskPriority.MEDIUM + + def test_updateTaskStatusRequest(self): + req = UpdateTaskStatusRequest(status=CoachingTaskStatus.DONE) + assert req.status == CoachingTaskStatus.DONE + + def test_dashboardData(self): + data = DashboardData(totalContexts=3, activeContexts=2, totalSessions=10) + assert data.totalContexts == 3 + assert data.openTasks == 0 diff --git a/modules/features/commcoach/tests/test_mainCommcoach.py b/modules/features/commcoach/tests/test_mainCommcoach.py new file mode 100644 index 00000000..85d85cf6 --- /dev/null +++ b/modules/features/commcoach/tests/test_mainCommcoach.py @@ -0,0 +1,105 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Tests for CommCoach feature registration module. +""" + +import pytest +from ..mainCommcoach import ( + FEATURE_CODE, FEATURE_LABEL, FEATURE_ICON, + UI_OBJECTS, DATA_OBJECTS, RESOURCE_OBJECTS, TEMPLATE_ROLES, + getFeatureDefinition, getUiObjects, getResourceObjects, getDataObjects, getTemplateRoles, +) + + +class TestFeatureMetadata: + def test_featureCode(self): + assert FEATURE_CODE == "commcoach" + + def test_featureLabel(self): + assert "de" in FEATURE_LABEL + assert "en" in FEATURE_LABEL + assert "Coach" in FEATURE_LABEL["de"] + + def test_featureIcon(self): + assert FEATURE_ICON.startswith("mdi-") + + +class TestFeatureDefinition: + def test_structure(self): + defn = getFeatureDefinition() + assert defn["code"] == "commcoach" + assert "label" in defn + assert "icon" in defn + assert defn["autoCreateInstance"] is True + + +class TestRbacObjects: + def test_uiObjectsExist(self): + objs = getUiObjects() + assert len(objs) >= 4 + keys = [o["objectKey"] for o in objs] + assert "ui.feature.commcoach.dashboard" in keys + assert "ui.feature.commcoach.coaching" in keys + assert "ui.feature.commcoach.dossier" in keys + assert "ui.feature.commcoach.settings" in keys + + def test_uiObjectsHaveLabels(self): + for obj in getUiObjects(): + assert "label" in obj + assert "de" in obj["label"] + + def test_dataObjectsExist(self): + objs = getDataObjects() + assert len(objs) >= 7 + keys = [o["objectKey"] for o in objs] + assert "data.feature.commcoach.CoachingContext" in keys + assert "data.feature.commcoach.CoachingSession" in keys + assert "data.feature.commcoach.CoachingMessage" in keys + assert "data.feature.commcoach.CoachingTask" in keys + assert "data.feature.commcoach.CoachingScore" in keys + assert "data.feature.commcoach.*" in keys + + def test_resourceObjectsExist(self): + objs = getResourceObjects() + assert len(objs) >= 5 + keys = [o["objectKey"] for o in objs] + assert "resource.feature.commcoach.context.create" in keys + assert "resource.feature.commcoach.session.start" in keys + assert "resource.feature.commcoach.task.manage" in keys + + def test_resourceObjectsHaveMeta(self): + for obj in getResourceObjects(): + assert "meta" in obj + assert "endpoint" in obj["meta"] + assert "method" in obj["meta"] + + +class TestTemplateRoles: + def test_rolesExist(self): + roles = getTemplateRoles() + assert len(roles) >= 1 + labels = [r["roleLabel"] for r in roles] + assert "commcoach-user" in labels + + def test_userRoleHasStrictOwnership(self): + """Verify that commcoach-user role uses MY (m) access, not ALL (a).""" + roles = getTemplateRoles() + userRole = next(r for r in roles if r["roleLabel"] == "commcoach-user") + dataRules = [r for r in userRole["accessRules"] if r["context"] == "DATA"] + for rule in dataRules: + if rule.get("read"): + assert rule["read"] == "m", f"DATA rule for {rule.get('item')} uses '{rule['read']}' instead of 'm' (MY)" + + def test_roleHasDescription(self): + for role in getTemplateRoles(): + assert "description" in role + assert "de" in role["description"] + + def test_roleHasAccessRules(self): + for role in getTemplateRoles(): + assert len(role["accessRules"]) > 0 + contexts = set(r["context"] for r in role["accessRules"]) + assert "UI" in contexts + assert "DATA" in contexts + assert "RESOURCE" in contexts diff --git a/modules/features/commcoach/tests/test_serviceAi.py b/modules/features/commcoach/tests/test_serviceAi.py new file mode 100644 index 00000000..b4410ee8 --- /dev/null +++ b/modules/features/commcoach/tests/test_serviceAi.py @@ -0,0 +1,193 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Tests for CommCoach AI service (prompt building and response parsing). +These tests don't require AI calls -- they test the prompt construction and JSON parsing. +""" + +import pytest +import json +from ..serviceCommcoachAi import ( + buildCoachingSystemPrompt, + buildSummaryPrompt, + buildScoringPrompt, + buildTaskExtractionPrompt, + buildEarlierConversationSummaryPrompt, + prepareMessagesForPrompt, + parseJsonResponse, +) + + +class TestBuildCoachingSystemPrompt: + def test_basicPrompt(self): + context = {"title": "Conflict Resolution", "category": "conflict"} + prompt = buildCoachingSystemPrompt(context, [], []) + assert "Conflict Resolution" in prompt + assert "conflict" in prompt + assert "coach" in prompt.lower() + + def test_withGoals(self): + context = { + "title": "Leadership", + "category": "leadership", + "goals": json.dumps([{"text": "Improve delegation"}]), + } + prompt = buildCoachingSystemPrompt(context, [], []) + assert "Improve delegation" in prompt + + def test_withInsights(self): + context = { + "title": "Test", + "category": "custom", + "insights": json.dumps([{"text": "User shows progress in empathy"}]), + } + prompt = buildCoachingSystemPrompt(context, [], []) + assert "empathy" in prompt + + def test_withPreviousMessages(self): + context = {"title": "Test", "category": "custom"} + messages = [ + {"role": "user", "content": "I had a conflict with my team"}, + {"role": "assistant", "content": "Tell me more about that"}, + ] + prompt = buildCoachingSystemPrompt(context, messages, []) + assert "conflict" in prompt.lower() + + def test_withTasks(self): + context = {"title": "Test", "category": "custom"} + tasks = [ + {"title": "Practice listening", "status": "open"}, + {"title": "Read book", "status": "done"}, + ] + prompt = buildCoachingSystemPrompt(context, [], tasks) + assert "Practice listening" in prompt + + def test_promptLanguageIsGerman(self): + context = {"title": "Test", "category": "custom"} + prompt = buildCoachingSystemPrompt(context, [], []) + assert "Fuehrungskraefte" in prompt or "Coach" in prompt + + def test_withEarlierSummary(self): + context = {"title": "Test", "category": "custom"} + messages = [{"role": "user", "content": "Recent question"}] + earlierSummary = "User discussed delegation. Coach suggested practice." + prompt = buildCoachingSystemPrompt(context, messages, [], earlierSummary=earlierSummary) + assert "Aelterer Gespraechsverlauf" in prompt + assert "delegation" in prompt.lower() + assert "Recent question" in prompt + + def test_withRollingOverview(self): + context = {"title": "Test", "category": "custom"} + prompt = buildCoachingSystemPrompt( + context, [], [], rollingOverview="User arbeitet an Delegation. Fortschritt sichtbar." + ) + assert "Gesamtueberblick" in prompt + assert "Delegation" in prompt + + def test_withRetrievedSession(self): + context = {"title": "Test", "category": "custom"} + retrieved = {"summary": "Delegation und Feedback besprochen", "startedAt": "2026-02-01T10:00:00Z"} + prompt = buildCoachingSystemPrompt(context, [], [], retrievedSession=retrieved) + assert "angefragte Session" in prompt + assert "Delegation" in prompt + + +class TestPrepareMessagesForPrompt: + def test_underThreshold(self): + messages = [{"role": "user", "content": f"msg {i}"} for i in range(10)] + earlier, recent = prepareMessagesForPrompt(messages, None, None) + assert earlier is None + assert len(recent) == 10 + + def test_overThresholdWithCachedSummary(self): + messages = [{"role": "user", "content": f"msg {i}"} for i in range(40)] + cached = "Summary of first 25 messages" + earlier, recent = prepareMessagesForPrompt(messages, cached, 25) + assert earlier == cached + assert len(recent) == 15 + + def test_overThresholdNeedsRegenerate(self): + messages = [{"role": "user", "content": f"msg {i}"} for i in range(40)] + earlier, recent = prepareMessagesForPrompt(messages, "old summary", 20) + assert earlier is None + assert len(recent) == 40 + + +class TestBuildEarlierConversationSummaryPrompt: + def test_basic(self): + messages = [ + {"role": "user", "content": "I have a conflict"}, + {"role": "assistant", "content": "Tell me more"}, + ] + prompt = buildEarlierConversationSummaryPrompt(messages) + assert "Fasse" in prompt + assert "conflict" in prompt + + +class TestBuildSummaryPrompt: + def test_basic(self): + messages = [ + {"role": "user", "content": "I need help with delegation"}, + {"role": "assistant", "content": "Let's work on that"}, + ] + prompt = buildSummaryPrompt(messages, "Delegation") + assert "Delegation" in prompt + assert "Zusammenfassung" in prompt + + def test_emptyMessages(self): + prompt = buildSummaryPrompt([], "Test") + assert "Test" in prompt + + +class TestBuildScoringPrompt: + def test_basic(self): + messages = [{"role": "user", "content": "I tried active listening today"}] + prompt = buildScoringPrompt(messages, "leadership") + assert "empathy" in prompt + assert "clarity" in prompt + assert "JSON" in prompt + + def test_containsScaleInfo(self): + prompt = buildScoringPrompt([{"role": "user", "content": "test"}], "custom") + assert "0-100" in prompt + + +class TestBuildTaskExtractionPrompt: + def test_basic(self): + messages = [ + {"role": "assistant", "content": "You should try practicing active listening this week"}, + ] + prompt = buildTaskExtractionPrompt(messages) + assert "JSON" in prompt + assert "Aufgaben" in prompt or "Aufgabe" in prompt + + def test_limitedMessages(self): + messages = [{"role": "user", "content": f"msg {i}"} for i in range(35)] + prompt = buildTaskExtractionPrompt(messages) + assert "msg 34" in prompt + assert "msg 0" not in prompt + + +class TestParseJsonResponse: + def test_validJson(self): + result = parseJsonResponse('[{"dimension": "empathy", "score": 70}]') + assert isinstance(result, list) + assert result[0]["score"] == 70 + + def test_codeBlockWrapped(self): + result = parseJsonResponse('```json\n[{"title": "task1"}]\n```') + assert isinstance(result, list) + assert result[0]["title"] == "task1" + + def test_invalidJson(self): + result = parseJsonResponse('not json at all', fallback=[]) + assert result == [] + + def test_emptyString(self): + result = parseJsonResponse('', fallback=None) + assert result is None + + def test_nestedCodeBlock(self): + text = '```\n{"key": "value"}\n```' + result = parseJsonResponse(text) + assert result["key"] == "value" diff --git a/modules/routes/routeSystem.py b/modules/routes/routeSystem.py index 4011d679..2ef07db7 100644 --- a/modules/routes/routeSystem.py +++ b/modules/routes/routeSystem.py @@ -120,6 +120,9 @@ def _getFeatureUiObjects(featureCode: str) -> List[Dict[str, Any]]: elif featureCode == "chatbot": from modules.features.chatbot.mainChatbot import UI_OBJECTS return UI_OBJECTS + elif featureCode == "commcoach": + from modules.features.commcoach.mainCommcoach import UI_OBJECTS + return UI_OBJECTS else: logger.warning(f"Unknown feature code: {featureCode}") return []