diff --git a/WORKFLOW_IMPLEMENTATION_GAPS.md b/WORKFLOW_IMPLEMENTATION_GAPS.md new file mode 100644 index 00000000..6eb68ed9 --- /dev/null +++ b/WORKFLOW_IMPLEMENTATION_GAPS.md @@ -0,0 +1,239 @@ +# Workflow Implementation Gaps Analysis + +## Overview + +After refactoring the AI process for fast track and full track, the workflow implementation does not match the architecture described in `ai_plan_architecture.md`. This document identifies the gaps and missing implementations. + +--- + +## Critical Issues Found + +### 1. ❌ Fast Path Implementation Missing + +**Architecture Requirement** (`ai_plan_architecture.md` lines 377-395): +- `detectComplexity()` function to determine "simple" | "moderate" | "complex" +- `fastPathExecute()` function for simple requests (5-15s) +- Fast path should skip full workflow and return directly + +**Current Implementation**: +- ❌ **NO** `detectComplexity()` function found +- ❌ **NO** `fastPathExecute()` function found +- ❌ **NO** complexity detection before workflow execution +- ✅ User intention analysis exists but doesn't route to fast path + +**Location**: Should be in `workflowManager.py` before `_planTasks()` + +**Impact**: All requests go through full workflow, even simple ones that could be answered in 5-15s. + +--- + +### 2. 
❌ Task Numbering Always Shows "Task 0" + +**Architecture Requirement** (`ai_plan_architecture.md` lines 117-152): +- `ChatWorkflow.currentTask` should increment properly +- `workflow.incrementTask()` should be called when starting new task +- Task numbers should be 1-indexed for display + +**Current Implementation**: +- ✅ `ChatWorkflow.currentTask` field exists +- ✅ `_updateWorkflowBeforeExecutingTask()` exists in `modeDynamic.py` (line 902) +- ❌ **NOT CALLED** from `workflowManager._executeTasks()` (line 368) +- ❌ Task index calculation uses `idx + 1` but never updates `workflow.currentTask` +- ❌ Messages show `taskNumber: 0` because `currentTask` is never incremented + +**Problem Location**: `workflowManager.py` line 368-396 + +**Current Code**: +```python +for idx, taskStep in enumerate(taskPlan.tasks): + currentTaskIndex = idx + 1 # Calculated but never used to update workflow + logger.info(f"Task {currentTaskIndex}/{totalTasks}: {taskStep.objective}") + # ... task execution ... + taskResult = await handling.executeTask(taskStep, workflow, taskContext) + # ❌ Missing: workflow.currentTask = currentTaskIndex + # ❌ Missing: workflow.incrementTask() or updateWorkflowBeforeExecutingTask() +``` + +**Fix Required**: +```python +for idx, taskStep in enumerate(taskPlan.tasks): + currentTaskIndex = idx + 1 + logger.info(f"Task {currentTaskIndex}/{totalTasks}: {taskStep.objective}") + + # ✅ ADD: Update workflow before executing task + handling.updateWorkflowBeforeExecutingTask(currentTaskIndex) + + # ... task execution ... + taskResult = await handling.executeTask(taskStep, workflow, taskContext) +``` + +**Impact**: UI always shows "Task 0" instead of "Task 1", "Task 2", etc. + +--- + +### 3. 
❌ User Language Not in ChatMessages + +**Architecture Requirement** (`ai_plan_architecture.md` lines 1300-1321): +- ChatMessages should contain user-friendly text in **user's language** +- User-facing workflows: Full `ChatMessage` with user language +- Process automation: Minimal `ChatMessage` (system role) + +**Current Implementation**: +- ✅ User language detection exists (`workflowManager.py` line 256-277) +- ✅ `userMessage` field exists in `TaskStep` and `TaskPlan` +- ❌ **Messages use technical text** instead of user-friendly language +- ❌ **No AI-generated user messages** in user's language +- ❌ Messages show action names (`ai.process`, `outlook.readMails`) instead of user-friendly descriptions + +**Problem Locations**: +1. `messageCreator.py` line 136-145: Uses `action.userMessage` if available, but this is often missing +2. `messageCreator.py` line 77: Task start message is just `"🚀 **Task {taskProgress}**"` - no user-friendly text +3. `messageCreator.py` line 204: Task completion message is generic `"✅ Task completed successfully"` - not in user language + +**Example Current Message**: +``` +🚀 **Task 1** + +💬 Analyze the provided documents and extract key information +``` + +**Should Be** (in user's language, e.g., German): +``` +🚀 **Aufgabe 1** + +💬 Ich analysiere die bereitgestellten Dokumente und extrahiere die wichtigsten Informationen für Sie. +``` + +**Fix Required**: +1. Generate user-friendly messages in user's language during task planning +2. Store user messages in `TaskStep.userMessage` (already exists but not populated) +3. Use user messages in `messageCreator.py` instead of technical action names + +**Impact**: Users see technical messages instead of friendly, localized messages. + +--- + +### 4. 
❌ Workflow-Level Architecture Not Implemented
+
+**Architecture Requirement** (`ai_plan_architecture.md` lines 368-428):
+- `RequestContext` model for normalized request
+- `UnderstandingResult` model for initial understanding
+- `TaskDefinition` model with `extractionOptions`
+- `TaskResult` model for task results
+- `persistTaskResult()` function to create ChatMessages
+
+**Current Implementation**:
+- ❌ **NO** `RequestContext` model
+- ❌ **NO** `UnderstandingResult` model
+- ❌ **NO** `TaskDefinition` model (using `TaskStep` instead)
+- ❌ **NO** `TaskResult` model (using `ActionResult` instead)
+- ❌ **NO** `persistTaskResult()` function
+- ✅ `TaskStep` exists but doesn't match `TaskDefinition` structure
+
+**Impact**: The workflow-level architecture described in `ai_plan_architecture.md` is not implemented. The system uses the old `TaskStep` model instead of the new `TaskDefinition` model.
+
+---
+
+### 5. ❌ Initial Understanding Phase Missing
+
+**Architecture Requirement** (`ai_plan_architecture.md` lines 411-414):
+- `initialUnderstanding(context)` → `UnderstandingResult`
+- Combined AI call: parameters + intention + context + tasks
+- Creates `TaskDefinition[]` from `UnderstandingResult`
+
+**Current Implementation**:
+- ✅ User intention analysis exists (`workflowManager.py` lines 221-289)
+- ❌ **NOT** structured as an `initialUnderstanding()` function
+- ❌ **NOT** returning an `UnderstandingResult` model
+- ❌ **NOT** creating `TaskDefinition[]` - creates `TaskStep[]` instead
+- ❌ **NO** combined AI call - separate calls for different purposes
+
+**Impact**: The initial understanding phase doesn't match the architecture; it uses the old approach instead of the new unified `UnderstandingResult` model.
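For reference, the missing routing described in sections 1 and 5 could be sketched roughly as follows. The `detectComplexity()` and `fastPathExecute()` names come from `ai_plan_architecture.md`, but the heuristics below (keyword matching, prompt length, document count) are illustrative assumptions, not the architecture's actual criteria:

```python
# Hypothetical sketch of the missing fast-path routing in workflowManager.py.
# Function names follow ai_plan_architecture.md; the heuristics are assumptions.

COMPLEX_KEYWORDS = {"research", "compare", "analyze", "generate document"}


def detectComplexity(prompt: str, documentCount: int = 0) -> str:
    """Classify a request as 'simple', 'moderate', or 'complex'."""
    text = prompt.lower()
    # Documents or heavyweight verbs push the request to the full workflow
    if documentCount > 0 or any(kw in text for kw in COMPLEX_KEYWORDS):
        return "complex"
    if len(text.split()) > 40:
        return "moderate"
    return "simple"


def routeRequest(prompt: str, documentCount: int = 0) -> str:
    """Route simple requests to the fast path, everything else to planning."""
    if detectComplexity(prompt, documentCount) == "simple":
        return "fastPathExecute"  # single AI call, 5-15s
    return "_planTasks"           # full workflow, 30-120s
```

A real implementation would presumably run this check in `workflowManager` before `_planTasks()` and let `fastPathExecute()` answer with a single AI call.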
+ +--- + +## Summary of Missing Implementations + +| Component | Status | Priority | Location | +|-----------|--------|----------|----------| +| Fast Path (`fastPathExecute`) | ❌ Missing | 🔴 Critical | `workflowManager.py` | +| Complexity Detection (`detectComplexity`) | ❌ Missing | 🔴 Critical | `workflowManager.py` | +| Task Numbering Update | ❌ Not Called | 🔴 Critical | `workflowManager._executeTasks()` | +| User Language Messages | ⚠️ Partial | 🟡 High | `messageCreator.py` | +| RequestContext Model | ❌ Missing | 🟡 High | `datamodelWorkflow.py` | +| UnderstandingResult Model | ❌ Missing | 🟡 High | `datamodelWorkflow.py` | +| TaskDefinition Model | ❌ Missing | 🟡 High | `datamodelWorkflow.py` | +| TaskResult Model | ❌ Missing | 🟡 High | `datamodelWorkflow.py` | +| persistTaskResult() | ❌ Missing | 🟡 High | `workflowProcessor.py` | +| initialUnderstanding() | ❌ Missing | 🟡 High | `workflowProcessor.py` | + +--- + +## Recommended Fix Order + +### Phase 1: Critical Fixes (Immediate) +1. **Fix Task Numbering** (30 min) + - Call `updateWorkflowBeforeExecutingTask()` in `workflowManager._executeTasks()` + - Verify `currentTask` increments properly + +2. **Add Fast Path Detection** (2 hours) + - Implement `detectComplexity()` function + - Add routing logic to `fastPathExecute()` for simple requests + - Test with simple vs. complex prompts + +### Phase 2: User Experience (High Priority) +3. **Generate User-Friendly Messages** (4 hours) + - Add AI call to generate user messages in user's language + - Update `messageCreator.py` to use user messages + - Test with different languages + +### Phase 3: Architecture Alignment (Medium Priority) +4. **Implement Workflow-Level Models** (8 hours) + - Create `RequestContext`, `UnderstandingResult`, `TaskDefinition`, `TaskResult` models + - Migrate from `TaskStep` to `TaskDefinition` + - Update all call sites + +5. 
**Implement Initial Understanding** (4 hours) + - Create `initialUnderstanding()` function + - Return `UnderstandingResult` model + - Create `TaskDefinition[]` from result + +--- + +## Code References + +### Current Task Execution Flow +- **Entry Point**: `workflowManager.py` line 166 → `_executeTasks()` +- **Task Loop**: `workflowManager.py` line 368-396 +- **Task Execution**: `workflowProcessor.py` → `executeTask()` +- **Message Creation**: `messageCreator.py` → `createTaskStartMessage()`, `createActionMessage()` + +### Missing Fast Path Flow +- **Should Be**: `workflowManager.py` → `detectComplexity()` → `fastPathExecute()` OR `_planTasks()` +- **Currently**: Always goes to `_planTasks()` → `_executeTasks()` + +### Task Numbering Issue +- **Problem**: `workflowManager._executeTasks()` line 369 calculates `currentTaskIndex` but never updates `workflow.currentTask` +- **Solution**: Call `handling.updateWorkflowBeforeExecutingTask(currentTaskIndex)` before `executeTask()` + +--- + +## Testing Checklist + +After fixes, verify: +- [ ] Simple requests use fast path (5-15s response time) +- [ ] Complex requests use full workflow (30-120s response time) +- [ ] Task numbers increment correctly (Task 1, Task 2, Task 3...) +- [ ] ChatMessages contain user-friendly text in user's language +- [ ] User messages are generated, not technical action names +- [ ] Workflow state (`currentTask`, `currentRound`, `currentAction`) updates correctly + +--- + +## Next Steps + +1. **Immediate**: Fix task numbering (30 min fix, high impact) +2. **This Week**: Implement fast path detection and routing +3. **Next Sprint**: Add user-friendly message generation +4. **Future**: Migrate to workflow-level models (`TaskDefinition`, `UnderstandingResult`, etc.) 
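To make the Phase 3 migration concrete, the four missing workflow-level models could look roughly like this. This is a sketch using stdlib dataclasses for illustration only; the real models would be Pydantic `BaseModel`s in `datamodelWorkflow.py`, and the field list is inferred from the summary table above (defaults and types are assumptions):

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

# Illustrative stand-ins for the missing models; the real implementations
# would be Pydantic BaseModels registered in datamodelWorkflow.py.


@dataclass
class RequestContext:
    """Normalized request, as described in ai_plan_architecture.md."""
    originalPrompt: str
    userLanguage: str = "en"
    documents: List[str] = field(default_factory=list)
    detectedComplexity: str = "simple"  # "simple" | "moderate" | "complex"
    requiresWebResearch: bool = False
    requiresAnalysis: bool = False


@dataclass
class TaskDefinition:
    """Replacement for TaskStep, including extractionOptions."""
    id: str
    objective: str
    deliverable: str
    requiresWebResearch: bool = False
    requiresDocumentAnalysis: bool = False
    extractionOptions: Optional[Dict[str, Any]] = None


@dataclass
class UnderstandingResult:
    """Output of the combined initialUnderstanding() AI call."""
    parameters: Dict[str, Any]
    intention: str
    context: str
    tasks: List[TaskDefinition] = field(default_factory=list)


@dataclass
class TaskResult:
    """Result of executing one TaskDefinition."""
    taskId: str
    actionResult: Any = None
```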
diff --git a/modules/CHATLOG_CREATION_OVERVIEW.md b/modules/CHATLOG_CREATION_OVERVIEW.md
new file mode 100644
index 00000000..4c7b80b5
--- /dev/null
+++ b/modules/CHATLOG_CREATION_OVERVIEW.md
@@ -0,0 +1,429 @@
+# ChatLog Creation - Overview
+
+This overview lists every place in the `modules` directory where ChatLog records are created.
+
+## Documentation Status
+
+- ✅ **Chapters 6, 5, 4**: Fully and correctly documented
+- 📋 **Chapter 3**: Fully expanded with all `progressLogStart/Update/Finish` calls
+- ✅ **Chapters 2, 1**: Fully documented (service layer and database methods)
+
+## Creation Paths
+
+ChatLog records are created via two main paths:
+
+1. **Direct path**: `interfaceDbChatObjects.createLog()` - creates the record directly in the database
+2. **Service path**: `mainServiceChat.storeLog()` → `interfaceDbChatObjects.createLog()` - creates it via the service layer
+
+## Overview by File
+
+### 1. `gateway/modules/interfaces/interfaceDbChatObjects.py`
+
+**Main method for ChatLog creation:**
+- **Lines 1042-1091**: `createLog()` - creates ChatLog records directly in the database
+  - Called by `storeLog()`
+  - Validates data against the ChatLog model
+  - Creates the record in a normalized table
+
+### 2. `gateway/modules/services/serviceChat/mainServiceChat.py`
+
+**Service-layer method:**
+- **Lines 605-622**: `storeLog()` - wrapper around `createLog()`
+  - Adds `workflowId`
+  - Synchronizes with the in-memory workflow.logs list
+  - **Called from many places** (see below)
+
+### 3. 
`gateway/modules/shared/progressLogger.py`
+
+**Progress logging system:**
+- **Lines 102-132**: `_logProgress()` - creates a ChatLog for progress updates
+  - Called by `startOperation()`, `updateOperation()`, `finishOperation()`
+  - Creates logs for all progress updates during operations
+  - **Frequent creation**: a ChatLog is created for every progress update
+
+**All calls to `progressLogStart()`, `progressLogUpdate()`, `progressLogFinish()`:**
+
+#### 3.1. `gateway/modules/workflows/processing/workflowProcessor.py`
+
+**Task Plan Generation:**
+- **Line 51**: `progressLogStart()` - start of task plan generation
+  ```python
+  self.services.chat.progressLogStart(
+      operationId,
+      "Workflow Planning",
+      "Task Plan Generation",
+      f"Mode: {workflow.workflowMode.value}"
+  )
+  ```
+- **Line 68**: `progressLogUpdate(0.3)` - "Analyzing input"
+- **Line 74**: `progressLogUpdate(0.8)` - "Creating plan"
+- **Line 80**: `progressLogFinish(True)` - success
+- **Line 86**: `progressLogFinish(False)` - failure
+
+**Task Execution:**
+- **Line 104**: `progressLogStart()` - start of task execution
+  ```python
+  self.services.chat.progressLogStart(
+      operationId,
+      "Workflow Execution",
+      "Task Execution",
+      f"Task {taskIndex}"
+  )
+  ```
+- **Line 117**: `progressLogUpdate(0.2)` - "Executing"
+- **Line 123**: `progressLogFinish(True)` - success
+- **Line 129**: `progressLogFinish(False)` - failure
+
+#### 3.2. 
`gateway/modules/services/serviceAi/mainServiceAi.py`
+
+**AI Call with Looping (_callAiWithLooping):**
+- **Line 198**: `progressLogUpdate(0.5)` - "Starting AI call iteration {iteration}" (first iteration)
+- **Line 202**: `progressLogUpdate(baseProgress)` - "Continuing generation (iteration {iteration})" (subsequent iterations)
+- **Line 220**: `progressLogUpdate(0.51)` - "Calling AI model" (first iteration)
+- **Line 239**: `progressLogUpdate(0.6)` - "AI response received (iteration {iteration})" (first iteration)
+- **Line 242**: `progressLogUpdate(progress)` - "Processing response (iteration {iteration})" (subsequent iterations)
+- **Line 284**: `progressLogUpdate(0.65 + ...)` - "Extracted {len} sections (iteration {iteration})"
+- **Line 304**: `progressLogUpdate(0.95)` - "Generation complete ({iteration} iterations, {len} sections)"
+
+**AI Content Processing (callAiContent):**
+- **Line 568**: `progressLogStart()` - start of AI content processing
+  ```python
+  self.services.chat.progressLogStart(
+      aiOperationId,
+      "AI content processing",
+      "Content Processing",
+      f"Format: {outputFormat or 'text'}"
+  )
+  ```
+- **Line 593**: `progressLogUpdate(0.1)` - "Analyzing prompt parameters"
+- **Line 613**: `progressLogUpdate(0.4)` - "Calling AI for image generation" (IMAGE_GENERATE)
+- **Line 642**: `progressLogUpdate(0.9)` - "Image generated"
+- **Line 643**: `progressLogFinish(True)` - success (IMAGE_GENERATE)
+- **Line 653**: `progressLogFinish(False)` - failure (IMAGE_GENERATE)
+- **Line 658**: `progressLogUpdate(0.4)` - "Calling AI for {opType.name}" (WEB_SEARCH/WEB_CRAWL)
+- **Line 679**: `progressLogUpdate(0.9)` - "{opType.name} completed"
+- **Line 680**: `progressLogFinish(True)` - success (WEB_SEARCH/WEB_CRAWL)
+- **Line 689**: `progressLogFinish(False)` - failure (WEB_SEARCH/WEB_CRAWL)
+- **Line 705**: `progressLogUpdate(0.3)` - "Building generation prompt" (Document Generation)
+- **Line 719**: `progressLogUpdate(0.4)` - "Calling AI for content 
generation" (Document Generation)
+- **Line 729**: `progressLogUpdate(0.7)` - "Parsing generated JSON" (Document Generation)
+- **Line 736**: `progressLogFinish(False)` - JSON parsing failure
+- **Line 758**: `progressLogUpdate(0.8)` - "Rendering to {outputFormat} format" (Document Generation)
+- **Line 796**: `progressLogFinish(True)` - success (Document Generation)
+- **Line 806**: `progressLogFinish(False)` - rendering failure
+- **Line 810**: `progressLogUpdate(0.5)` - "Processing text call" (Text Processing)
+- **Line 830**: `progressLogFinish(True)` - success (Text Processing)
+- **Line 839**: `progressLogFinish(False)` - failure (Text Processing)
+
+#### 3.3. `gateway/modules/services/serviceExtraction/mainServiceExtraction.py`
+
+**Process Documents Per Chunk (processDocumentsPerChunk):**
+- **Line 424**: `progressLogStart()` - start of document processing
+  ```python
+  self.services.chat.progressLogStart(
+      operationId,
+      "AI Text Extract",
+      "Document Processing",
+      f"Processing {len(documents)} documents"
+  )
+  ```
+- **Line 451**: `progressLogUpdate(0.1)` - "Extracting content from {len} documents"
+- **Line 456**: `progressLogFinish(False)` - extraction failure
+- **Line 461**: `progressLogUpdate(0.3)` - "Processing {len} extracted content parts"
+- **Line 466**: `progressLogUpdate(0.9)` - "Merging {len} part results"
+- **Line 473**: `progressLogFinish(True)` - success
+- **Line 479**: `progressLogFinish(False)` - failure
+- **Line 538**: `progressLogUpdate(progress)` - "Processing part {processedCount}/{totalParts}" (in the chunking callback)
+
+#### 3.4. 
`gateway/modules/workflows/methods/methodAi.py`
+
+**AI Process (process):**
+- **Line 52**: `progressLogStart()` - start of AI processing
+  ```python
+  self.services.chat.progressLogStart(
+      operationId,
+      "Generate",
+      "AI Processing",
+      f"Format: {parameters.get('resultType', 'txt')}"
+  )
+  ```
+- **Line 63**: `progressLogUpdate(0.2)` - "Preparing parameters"
+- **Line 111**: `progressLogUpdate(0.3)` - "Extracting content from documents"
+- **Line 145**: `progressLogUpdate(0.4)` - "Preparing AI call"
+- **Line 155**: `progressLogUpdate(0.6)` - "Calling AI"
+- **Line 166**: `progressLogUpdate(0.8)` - "Processing result"
+- **Line 211**: `progressLogFinish(True)` - success
+- **Line 220**: `progressLogFinish(False)` - failure
+
+**Extract Content (extractContent):**
+- **Line 251**: `progressLogStart()` - start of content extraction
+  ```python
+  self.services.chat.progressLogStart(
+      operationId,
+      "Extracting content from documents",
+      "Content Extraction",
+      f"Documents: {len(parameters.documentList.references)}"
+  )
+  ```
+- **Line 259**: `progressLogUpdate(0.2)` - "Loading documents"
+- **Line 263**: `progressLogFinish(False)` - failure (no documents)
+- **Line 269**: `progressLogUpdate(0.3)` - "Preparing extraction options"
+- **Line 286**: `progressLogUpdate(0.5)` - "Extracting content from {len} documents"
+- **Line 290**: `progressLogUpdate(0.8)` - "Building result documents"
+- **Line 301**: `progressLogFinish(True)` - success
+- **Line 310**: `progressLogFinish(False)` - failure
+
+**Generate Document (generateDocument):**
+- **Line 341**: `progressLogStart()` - start of document generation
+  ```python
+  self.services.chat.progressLogStart(...)
+  ```
+- **Line 359**: `progressLogFinish(True)` - success
+- **Line 401**: `progressLogFinish(False)` - failure
+
+#### 3.5. 
`gateway/modules/services/serviceWeb/mainServiceWeb.py`
+
+**Research (research) - updates only, no start/finish:**
+- **Line 50**: `progressLogUpdate(0.1)` - "Analyzing research intent"
+- **Line 75**: `progressLogUpdate(0.3)` - "Searching for URLs"
+- **Line 87**: `progressLogUpdate(0.5)` - "Found {len} total URLs"
+- **Line 102**: `progressLogUpdate(0.6)` - "Crawling {len} URLs"
+- **Line 110**: `progressLogUpdate(0.9)` - "Consolidating results"
+
+**Note**: `serviceWeb` only uses `progressLogUpdate()`, never `progressLogStart()` or `progressLogFinish()`. This may be a bug, or the operation is started/finished by another service.
+
+### 4. `gateway/modules/workflows/workflowManager.py`
+
+**Workflow management - 11 ChatLog creations:**
+
+- **Line 58**: workflow stopped for a new prompt
+  ```python
+  self.services.chat.storeLog(workflow, {
+      "message": "Workflow stopped for new prompt",
+      "type": "info",
+      "status": "stopped",
+      "progress": 1.0
+  })
+  ```
+
+- **Line 79**: workflow resumed
+  ```python
+  self.services.chat.storeLog(workflow, {
+      "message": f"Workflow resumed (round {workflow.currentRound}) with mode: {workflowMode}",
+      "type": "info",
+      "status": "running",
+      "progress": 0
+  })
+  ```
+
+- **Line 140**: workflow stopped (workflowStop)
+  ```python
+  self.services.chat.storeLog(workflow, {
+      "message": "Workflow stopped",
+      "type": "warning",
+      "status": "stopped",
+      "progress": 1.0
+  })
+  ```
+
+- **Line 635**: workflow stopped by user (in _processWorkflowResults)
+  ```python
+  self.services.chat.storeLog(workflow, {
+      "message": "Workflow stopped by user",
+      "type": "warning",
+      "status": "stopped",
+      "progress": 1.0
+  })
+  ```
+
+- **Line 674**: workflow failed (in _processWorkflowResults)
+  ```python
+  self.services.chat.storeLog(workflow, {
+      "message": "Workflow failed: Unknown error",
+      "type": "error",
+      "status": "failed",
+      "progress": 1.0
+  })
+  ```
+
+- **Line 762**: 
workflow completed (in _sendLastMessage)
+  ```python
+  self.services.chat.storeLog(workflow, {
+      "message": "Workflow completed",
+      "type": "success",
+      "status": "completed",
+      "progress": 1.0
+  })
+  ```
+
+- **Line 837**: workflow stopped (in _handleWorkflowStop)
+  ```python
+  self.services.chat.storeLog(workflow, {
+      "message": "Workflow stopped by user",
+      "type": "warning",
+      "status": "stopped",
+      "progress": 1.0
+  })
+  ```
+
+- **Line 880**: workflow error (in _handleWorkflowError)
+  ```python
+  self.services.chat.storeLog(workflow, {
+      "message": f"Workflow failed: {str(error)}",
+      "type": "error",
+      "status": "failed",
+      "progress": 1.0
+  })
+  ```
+
+- **Line 931**: binary file info (in _processFileIds - neutralization)
+  ```python
+  self.services.chat.storeLog(workflow, {
+      "message": infoMsg,
+      "type": "info",
+      "status": "running",
+      "progress": 50
+  })
+  ```
+
+- **Line 964**: neutralization error (in _processFileIds)
+  ```python
+  self.services.chat.storeLog(workflow, {
+      "message": errorMsg,
+      "type": "error",
+      "status": "error",
+      "progress": -1
+  })
+  ```
+
+- **Line 974**: neutralization error (exception) (in _processFileIds)
+  ```python
+  self.services.chat.storeLog(workflow, {
+      "message": errorMsg,
+      "type": "error",
+      "status": "error",
+      "progress": -1
+  })
+  ```
+
+- **Line 999**: file processing error (in _processFileIds)
+  ```python
+  self.services.chat.storeLog(workflow, {
+      "message": errorMsg,
+      "type": "error",
+      "status": "error",
+      "progress": -1
+  })
+  ```
+
+### 5. `gateway/modules/workflows/processing/core/actionExecutor.py`
+
+**Action Execution:**
+- **Line 148**: action error
+  ```python
+  self.services.chat.storeLog(workflow, {
+      "message": f"❌ **Task {taskNum}**❌ **Action {actionNum}** failed: {result.error}",
+      "type": "error",
+      "progress": 1.0
+  })
+  ```
+
+### 6. 
`gateway/modules/services/serviceExtraction/mainServiceExtraction.py`
+
+**Extraction Service:**
+- **Line 553**: chunking progress callback
+  ```python
+  self.services.chat.storeLog(workflow, logData)
+  ```
+  - Called during AI chunking
+  - **Frequent creation**: on every chunking progress update
+
+## Summary by Category
+
+### ✅ Chapters 6, 5, 4: Correctly documented
+
+These chapters are fully and correctly documented:
+- **Chapter 6**: Extraction service chunking (`mainServiceExtraction.py` line 553)
+- **Chapter 5**: Action executor (`actionExecutor.py` line 148)
+- **Chapter 4**: Workflow manager (`workflowManager.py` - 11 locations)
+
+### 📋 Chapter 3: ProgressLogger calls - complete list
+
+**All calls to `progressLogStart()`, `progressLogUpdate()`, `progressLogFinish()`:**
+
+**Total number of calls:**
+- **startOperation**: 7 calls
+  - workflowProcessor.py: 2x (task plan, task execution)
+  - mainServiceAi.py: 1x (AI content processing)
+  - mainServiceExtraction.py: 1x (document processing)
+  - methodAi.py: 3x (AI process, extract content, generate document)
+
+- **updateOperation**: ~35+ calls
+  - workflowProcessor.py: 2x
+  - mainServiceAi.py: ~15x (AI looping + content processing)
+  - mainServiceExtraction.py: 4x (+ chunking callback)
+  - methodAi.py: 8x
+  - mainServiceWeb.py: 5x (without start/finish!)
+
+- **finishOperation**: ~15 calls
+  - workflowProcessor.py: 4x (2x success, 2x failure)
+  - mainServiceAi.py: ~6x (various operations)
+  - mainServiceExtraction.py: 2x
+  - methodAi.py: 6x
+
+**Problem areas:**
+1. **Too many update calls**: especially in `_callAiWithLooping()`, several updates are created on every iteration
+2. **Missing start/finish**: `mainServiceWeb.py` only issues updates without start/finish
+3. **Redundant updates**: many updates with similar progress values (e.g. 
0.4, 0.5, 0.6)
+
+### 📋 Chapters 2 and 1: Fully documented
+
+- **Chapter 2**: `mainServiceChat.storeLog()` - service-layer method
+- **Chapter 1**: `interfaceDbChatObjects.createLog()` - database method
+
+## Recommendations for Reduction
+
+1. **ProgressLogger**:
+   - Only log on start/finish, not on every update
+   - Or: batch updates every N seconds
+
+2. **Chunking progress**:
+   - Only at important milestones (e.g. 25%, 50%, 75%, 100%)
+   - Not for every single chunk
+
+3. **Workflow status**:
+   - Keep, but remove duplicates (e.g. multiple "stopped" logs)
+
+4. **Error logging**:
+   - Only log critical errors, not every warning/info
+
+## Statistics
+
+### ChatLog creations by source:
+
+1. **ProgressLogger (Chapter 3)**: ~57+ calls
+   - 7x `startOperation()` → creates a ChatLog
+   - ~35+ `updateOperation()` → creates a ChatLog on every update
+   - ~15x `finishOperation()` → creates a ChatLog
+   - **Most frequent source** - can produce very many logs during long operations
+
+2. **Extraction service chunking (Chapter 6)**: variable frequency
+   - Called on every chunking progress update
+   - **Second most frequent source** - depends on the number of chunks
+
+3. **Workflow manager (Chapter 4)**: 11 fixed locations
+   - Workflow status changes (once per status transition)
+   - Error logging during file processing
+
+4. 
**Action Executor (Chapter 5)**: 1 location
+   - Action error logging
+
+### Overall Summary:
+
+- **Direct `storeLog()` calls**: 12 locations (Chapters 4, 5, 6)
+- **ProgressLogger calls**: ~57+ locations (Chapter 3)
+- **Total number of code locations**: ~70+
+- **Reduction potential**:
+  - ProgressLogger: ~35+ update calls could be reduced
+  - Chunking: variable, depends on the number of chunks
+
diff --git a/modules/datamodels/datamodelChat.py b/modules/datamodels/datamodelChat.py
index 4931d463..7c1cdff3 100644
--- a/modules/datamodels/datamodelChat.py
+++ b/modules/datamodels/datamodelChat.py
@@ -61,6 +61,12 @@ class ChatLog(BaseModel):
     performance: Optional[Dict[str, Any]] = Field(
         None, description="Performance metrics"
     )
+    parentId: Optional[str] = Field(
+        None, description="Parent log entry ID for hierarchical display"
+    )
+    operationId: Optional[str] = Field(
+        None, description="Operation ID to group related log entries"
+    )
 registerModelLabels(
diff --git a/modules/datamodels/datamodelWorkflow.py b/modules/datamodels/datamodelWorkflow.py
index 55bee215..bf3ec319 100644
--- a/modules/datamodels/datamodelWorkflow.py
+++ b/modules/datamodels/datamodelWorkflow.py
@@ -22,6 +22,10 @@ class ActionDefinition(BaseModel):
     # Core action selection (Stage 1)
     action: str = Field(description="Compound action name (method.action)")
     actionObjective: str = Field(description="Objective for this action")
+    userMessage: Optional[str] = Field(
+        None,
+        description="User-friendly message in user's language explaining what this action will do (generated by AI in prompts)"
+    )
     parametersContext: Optional[str] = Field(
         None,
         description="Context for parameter generation"
@@ -274,6 +278,111 @@ class TaskResult(BaseModel):
     actionResult: Any = Field(description="ActionResult from task execution")  # ActionResult - forward reference
+# Register model labels for UI
+registerModelLabels(
+    "RequestContext",
+    {"en": "Request Context", "fr": "Contexte de la demande"},
+    { 
+ "originalPrompt": {"en": "Original Prompt", "fr": "Invite originale"}, + "documents": {"en": "Documents", "fr": "Documents"}, + "userLanguage": {"en": "User Language", "fr": "Langue de l'utilisateur"}, + "detectedComplexity": {"en": "Detected Complexity", "fr": "Complexité détectée"}, + "requiresDocuments": {"en": "Requires Documents", "fr": "Nécessite des documents"}, + "requiresWebResearch": {"en": "Requires Web Research", "fr": "Nécessite une recherche web"}, + "requiresAnalysis": {"en": "Requires Analysis", "fr": "Nécessite une analyse"}, + "expectedOutputFormat": {"en": "Expected Output Format", "fr": "Format de sortie attendu"}, + "expectedOutputType": {"en": "Expected Output Type", "fr": "Type de sortie attendu"}, + }, +) + +registerModelLabels( + "UnderstandingResult", + {"en": "Understanding Result", "fr": "Résultat de compréhension"}, + { + "parameters": {"en": "Parameters", "fr": "Paramètres"}, + "intention": {"en": "Intention", "fr": "Intention"}, + "context": {"en": "Context", "fr": "Contexte"}, + "documentReferences": {"en": "Document References", "fr": "Références de documents"}, + "tasks": {"en": "Tasks", "fr": "Tâches"}, + }, +) + +registerModelLabels( + "TaskDefinition", + {"en": "Task Definition", "fr": "Définition de tâche"}, + { + "id": {"en": "Task ID", "fr": "ID de la tâche"}, + "objective": {"en": "Objective", "fr": "Objectif"}, + "deliverable": {"en": "Deliverable", "fr": "Livrable"}, + "requiresWebResearch": {"en": "Requires Web Research", "fr": "Nécessite une recherche web"}, + "requiresDocumentAnalysis": {"en": "Requires Document Analysis", "fr": "Nécessite une analyse de documents"}, + "requiresContentGeneration": {"en": "Requires Content Generation", "fr": "Nécessite une génération de contenu"}, + "requiredDocuments": {"en": "Required Documents", "fr": "Documents requis"}, + "extractionOptions": {"en": "Extraction Options", "fr": "Options d'extraction"}, + }, +) + +registerModelLabels( + "TaskResult", + {"en": "Task Result", "fr": 
"Résultat de tâche"},
+    {
+        "taskId": {"en": "Task ID", "fr": "ID de la tâche"},
+        "actionResult": {"en": "Action Result", "fr": "Résultat de l'action"},
+    },
+)
+
 # Register model labels for UI
 registerModelLabels(
     "ActionDefinition",
diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py
index 14218247..055a8b9e 100644
--- a/modules/services/serviceAi/mainServiceAi.py
+++ b/modules/services/serviceAi/mainServiceAi.py
@@ -3,7 +3,7 @@ import logging
 import re
 import time
 from typing import Dict, Any, List, Optional, Tuple
-from modules.datamodels.datamodelChat import PromptPlaceholder, ChatDocument
+from modules.datamodels.datamodelChat import PromptPlaceholder
 from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
 from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum
 from modules.datamodels.datamodelExtraction import ContentPart
@@ -189,17 +189,25 @@ Respond with ONLY a JSON object in this exact format:
     lastRawResponse = None  # Store last raw JSON response for continuation
     documentMetadata = None  # Store document metadata (title, filename) from first iteration
+    # Get parent log ID for iteration operations
+    parentLogId = None
+    if operationId:
+        parentLogId = self.services.chat.getOperationLogId(operationId)
+
     while iteration < maxIterations:
         iteration += 1
-        # Update progress for iteration start
+        # Create separate operation for each iteration with parent reference
+        iterationOperationId = None
         if operationId:
-            if iteration == 1:
-                self.services.chat.progressLogUpdate(operationId, 0.5, f"Starting AI call iteration {iteration}")
-            else:
-                # For 
continuation iterations, show progress incrementally - baseProgress = 0.5 + (min(iteration - 1, maxIterations) / maxIterations * 0.4) # Progress from 0.5 to 0.9 over maxIterations iterations - self.services.chat.progressLogUpdate(operationId, baseProgress, f"Continuing generation (iteration {iteration})") + iterationOperationId = f"{operationId}_iter_{iteration}" + self.services.chat.progressLogStart( + iterationOperationId, + "AI Call", + f"Iteration {iteration}", + "", + parentId=parentLogId + ) # Build iteration prompt if len(allSections) > 0 and promptBuilder and promptArgs: @@ -216,8 +224,8 @@ Respond with ONLY a JSON object in this exact format: # Make AI call try: - if operationId and iteration == 1: - self.services.chat.progressLogUpdate(operationId, 0.51, "Calling AI model") + if iterationOperationId: + self.services.chat.progressLogUpdate(iterationOperationId, 0.3, "Calling AI model") request = AiCallRequest( prompt=iterationPrompt, context="", @@ -234,12 +242,8 @@ Respond with ONLY a JSON object in this exact format: result = response.content # Update progress after AI call - if operationId: - if iteration == 1: - self.services.chat.progressLogUpdate(operationId, 0.6, f"AI response received (iteration {iteration})") - else: - progress = 0.6 + (min(iteration - 1, 10) * 0.03) - self.services.chat.progressLogUpdate(operationId, progress, f"Processing response (iteration {iteration})") + if iterationOperationId: + self.services.chat.progressLogUpdate(iterationOperationId, 0.6, "AI response received") # Write raw AI response to debug file if iteration == 1: @@ -279,9 +283,9 @@ Respond with ONLY a JSON object in this exact format: documentMetadata = self._extractDocumentMetadata(parsedResult) # Update progress after parsing - if operationId: + if iterationOperationId: if extractedSections: - self.services.chat.progressLogUpdate(operationId, 0.65 + (min(iteration - 1, 10) * 0.025), f"Extracted {len(extractedSections)} sections (iteration {iteration})") + 
self.services.chat.progressLogUpdate(iterationOperationId, 0.8, f"Extracted {len(extractedSections)} sections") if not extractedSections: # If we're in continuation mode and JSON was incomplete, don't stop - continue to allow retry @@ -297,9 +301,14 @@ Respond with ONLY a JSON object in this exact format: # Check if we should continue (completion detection) if self._shouldContinueGeneration(allSections, iteration, wasJsonComplete, result): + # Finish iteration operation (will continue with next iteration) + if iterationOperationId: + self.services.chat.progressLogFinish(iterationOperationId, True) continue else: - # Done - build final result + # Done - finish iteration and update main operation + if iterationOperationId: + self.services.chat.progressLogFinish(iterationOperationId, True) if operationId: self.services.chat.progressLogUpdate(operationId, 0.95, f"Generation complete ({iteration} iterations, {len(allSections)} sections)") logger.info(f"Generation complete after {iteration} iterations: {len(allSections)} sections") @@ -307,6 +316,8 @@ Respond with ONLY a JSON object in this exact format: except Exception as e: logger.error(f"Error in AI call iteration {iteration}: {str(e)}") + if iterationOperationId: + self.services.chat.progressLogFinish(iterationOperationId, False) break if iteration >= maxIterations: @@ -542,7 +553,7 @@ Respond with ONLY a JSON object in this exact format: contentParts: Optional[List[ContentPart]] = None, outputFormat: Optional[str] = None, title: Optional[str] = None, - documents: Optional[List[ChatDocument]] = None # Phase 6: backward compatibility, Phase 7: remove + parentOperationId: Optional[str] = None # Parent operation ID for hierarchical logging ) -> AiResponse: """ Unified AI content processing method (replaces callAiDocuments and callAiText). 
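
The iteration loop above creates one child operation per continuation pass (`{operationId}_iter_{n}`), parented to the main operation's start log entry via `getOperationLogId()`. A rough sketch of that bookkeeping follows; `HierarchicalProgressLog` is a hypothetical stand-in for `ProgressLogger` that models only the ID mapping, not ChatLog persistence:

```python
from typing import Dict, List, Optional

class HierarchicalProgressLog:
    """Minimal parent/child operation tracker (illustrative only)."""

    def __init__(self):
        self.entries: List[dict] = []               # flat log, like ChatLog rows
        self.operationLogIds: Dict[str, str] = {}   # operationId -> start-entry id

    def startOperation(self, operationId: str, name: str, parentId: Optional[str] = None) -> str:
        logId = f"log_{len(self.entries)}"
        self.entries.append({"id": logId, "operationId": operationId,
                             "name": name, "parentId": parentId, "progress": 0.0})
        self.operationLogIds[operationId] = logId
        return logId

    def getOperationLogId(self, operationId: str) -> Optional[str]:
        return self.operationLogIds.get(operationId)

log = HierarchicalProgressLog()
log.startOperation("ai_content_wf1", "AI content processing")
parent = log.getOperationLogId("ai_content_wf1")
# Each continuation iteration becomes a child of the main operation:
for iteration in (1, 2):
    log.startOperation(f"ai_content_wf1_iter_{iteration}", f"Iteration {iteration}", parentId=parent)
```

The key design point is that children are parented to the *log entry ID* of the start entry, not to the operation ID itself, so the UI can nest rows without re-querying active operations.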
@@ -553,7 +564,7 @@ Respond with ONLY a JSON object in this exact format: options: AI call configuration options (REQUIRED - operationType must be set) outputFormat: Optional output format for document generation (e.g., 'pdf', 'docx', 'xlsx') title: Optional title for generated documents - documents: Optional list of documents (Phase 6: backward compatibility - extracts internally) + parentOperationId: Optional parent operation ID for hierarchical logging Returns: AiResponse with content, metadata, and optional documents @@ -564,25 +575,23 @@ Respond with ONLY a JSON object in this exact format: workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" aiOperationId = f"ai_content_{workflowId}_{int(time.time())}" - # Start progress tracking + # Get parent log ID if parent operation exists + parentLogId = None + if parentOperationId: + parentLogId = self.services.chat.getOperationLogId(parentOperationId) + + # Start progress tracking with parent reference self.services.chat.progressLogStart( aiOperationId, "AI content processing", "Content Processing", - f"Format: {outputFormat or 'text'}" + f"Format: {outputFormat or 'text'}", + parentId=parentLogId ) try: - # Phase 7: Extraction is now separate - contentParts must be extracted before calling - # If documents parameter is still provided (backward compatibility), raise error - if documents and len(documents) > 0: - raise ValueError( - "callAiContent() no longer accepts 'documents' parameter. " - "Extract content first using 'ai.extractContent' action, then pass 'contentParts'." 
- ) - - # Phase 6: Analyze prompt if operationType not set (backward compatibility) - # Phase 7: Require operationType to be set before calling + # Extraction is now separate - contentParts must be extracted before calling + # Require operationType to be set before calling opType = getattr(options, "operationType", None) if not opType: # If outputFormat is specified, default to DATA_GENERATE @@ -755,13 +764,25 @@ Respond with ONLY a JSON object in this exact format: if extractedTitle: generated_data["metadata"]["title"] = extractedTitle - self.services.chat.progressLogUpdate(aiOperationId, 0.8, f"Rendering to {outputFormat} format") + # Create separate operation for content rendering + renderOperationId = f"{aiOperationId}_render" + renderParentLogId = self.services.chat.getOperationLogId(aiOperationId) + self.services.chat.progressLogStart( + renderOperationId, + "Content Rendering", + "Rendering", + f"Format: {outputFormat}", + parentId=renderParentLogId + ) + try: from modules.services.serviceGeneration.mainServiceGeneration import GenerationService generationService = GenerationService(self.services) + self.services.chat.progressLogUpdate(renderOperationId, 0.5, f"Rendering to {outputFormat} format") rendered_content, mime_type = await generationService.renderReport( generated_data, outputFormat, extractedTitle or "Generated Document", prompt, self ) + self.services.chat.progressLogFinish(renderOperationId, True) # Determine document name if extractedFilename: @@ -803,6 +824,8 @@ Respond with ONLY a JSON object in this exact format: except Exception as e: logger.error(f"Error rendering document: {str(e)}") + if renderOperationId: + self.services.chat.progressLogFinish(renderOperationId, False) self.services.chat.progressLogFinish(aiOperationId, False) raise ValueError(f"Rendering failed: {str(e)}") diff --git a/modules/services/serviceChat/mainServiceChat.py b/modules/services/serviceChat/mainServiceChat.py index bab544ca..8836712c 100644 --- 
a/modules/services/serviceChat/mainServiceChat.py +++ b/modules/services/serviceChat/mainServiceChat.py @@ -976,10 +976,10 @@ class ChatService: def createProgressLogger(self) -> ProgressLogger: return ProgressLogger(self.services) - def progressLogStart(self, operationId: str, serviceName: str, actionName: str, context: str = ""): + def progressLogStart(self, operationId: str, serviceName: str, actionName: str, context: str = "", parentId: Optional[str] = None): """Wrapper for ProgressLogger.startOperation""" progressLogger = self._getProgressLogger() - return progressLogger.startOperation(operationId, serviceName, actionName, context) + return progressLogger.startOperation(operationId, serviceName, actionName, context, parentId) def progressLogUpdate(self, operationId: str, progress: float, statusUpdate: str = ""): """Wrapper for ProgressLogger.updateOperation""" @@ -990,4 +990,9 @@ class ChatService: """Wrapper for ProgressLogger.finishOperation""" progressLogger = self._getProgressLogger() return progressLogger.finishOperation(operationId, success) + + def getOperationLogId(self, operationId: str) -> Optional[str]: + """Get the log entry ID for an operation (the start log entry).""" + progressLogger = self._getProgressLogger() + return progressLogger.getOperationLogId(operationId) diff --git a/modules/services/serviceExtraction/mainServiceExtraction.py b/modules/services/serviceExtraction/mainServiceExtraction.py index 126c7ffd..c35e6156 100644 --- a/modules/services/serviceExtraction/mainServiceExtraction.py +++ b/modules/services/serviceExtraction/mainServiceExtraction.py @@ -459,7 +459,11 @@ class ExtractionService: # Process parts (not chunks) with model-aware AI calls if operationId: self.services.chat.progressLogUpdate(operationId, 0.3, f"Processing {len(extractionResult)} extracted content parts") - partResults = await self._processPartsWithMapping(extractionResult, prompt, aiObjects, options, operationId) + # Get parent log ID for part operations + 
parentLogId = None + if operationId: + parentLogId = self.services.chat.getOperationLogId(operationId) + partResults = await self._processPartsWithMapping(extractionResult, prompt, aiObjects, options, operationId, parentLogId) # Merge results using existing merging system if operationId: @@ -485,7 +489,8 @@ class ExtractionService: prompt: str, aiObjects: Any, options: Optional[AiCallOptions] = None, - operationId: Optional[str] = None + operationId: Optional[str] = None, + parentLogId: Optional[str] = None ) -> List[PartResult]: """Process content parts with model-aware chunking and proper mapping.""" @@ -522,6 +527,19 @@ class ExtractionService: start_time = time.time() + # Create separate operation for each part with parent reference + partOperationId = None + if operationId: + workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" + partOperationId = f"{operationId}_part_{part_index}" + self.services.chat.progressLogStart( + partOperationId, + "Content Processing", + f"Part {part_index + 1}", + f"Type: {part.typeGroup}", + parentId=parentLogId + ) + try: # Create AI call request with content part request = AiCallRequest( @@ -531,31 +549,17 @@ class ExtractionService: contentParts=[part] # Pass as list for unified processing ) - # Update progress before AI call - if operationId and totalParts > 0: - processedCount[0] += 1 - progress = 0.3 + (processedCount[0] / totalParts * 0.6) # Progress from 0.3 to 0.9 - self.services.chat.progressLogUpdate(operationId, progress, f"Processing part {processedCount[0]}/{totalParts}") + # Update progress - initiating + if partOperationId: + self.services.chat.progressLogUpdate(partOperationId, 0.3, "Initiating") - # Create progress callback for chunking - def chunkingProgressCallback(chunkProgress: float, status: str): - """Callback to log chunking progress as ChatLog entries""" - workflow = self.services.workflow - if workflow: - logData = { - "workflowId": workflow.id, - 
"message": "Service AI", - "type": "info", - "status": status, - "progress": chunkProgress - } - try: - self.services.chat.storeLog(workflow, logData) - except Exception as e: - logger.warning(f"Failed to store chunking progress log: {e}") + # Call AI with model-aware chunking (no progress callback - handled by parent operation) + response = await aiObjects.call(request) - # Call AI with model-aware chunking and progress callback - response = await aiObjects.call(request, chunkingProgressCallback) + # Update progress - completed + if partOperationId: + self.services.chat.progressLogUpdate(partOperationId, 0.9, "Completed") + self.services.chat.progressLogFinish(partOperationId, True) processing_time = time.time() - start_time diff --git a/modules/services/serviceWeb/mainServiceWeb.py b/modules/services/serviceWeb/mainServiceWeb.py index 3772e12e..b771cb9d 100644 --- a/modules/services/serviceWeb/mainServiceWeb.py +++ b/modules/services/serviceWeb/mainServiceWeb.py @@ -5,6 +5,7 @@ Manages the two-step process: WEB_SEARCH then WEB_CRAWL. 
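
The per-part processing above replaces the old chunking progress callback with one child operation per content part, each finished with `success=True` on completion or `success=False` on error. A minimal sketch of that fan-out pattern, assuming hypothetical names (`process_items` and `shout` are illustrative, not part of the codebase):

```python
from typing import Callable, Dict, List, Tuple

def process_items(items: List[str], parentLogId: str,
                  worker: Callable[[str], str]) -> Tuple[List[str], Dict[str, dict]]:
    """Run worker over items, recording one child 'operation' per item."""
    results: List[str] = []
    ops: Dict[str, dict] = {}
    for idx, item in enumerate(items):
        opId = f"part_{idx}"  # startOperation(opId, ..., parentId=parentLogId)
        try:
            results.append(worker(item))
            ops[opId] = {"parent": parentLogId, "success": True}   # progressLogFinish(opId, True)
        except Exception:
            ops[opId] = {"parent": parentLogId, "success": False}  # progressLogFinish(opId, False)
    return results, ops

def shout(s: str) -> str:
    if s == "bad":
        raise ValueError(s)
    return s.upper()

results, ops = process_items(["a", "bad", "c"], "log_0", shout)
```

A failed part does not abort the batch; it is simply recorded as a failed child operation, mirroring how the crawl loop appends `{"url": ..., "error": ...}` and continues.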
import json import logging +import time from typing import Dict, Any, List, Optional from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, AiCallPromptWebSearch, AiCallPromptWebCrawl @@ -45,9 +46,19 @@ class WebService: Returns: Consolidated research results as dictionary """ + # Start progress tracking if operationId provided + if operationId: + self.services.chat.progressLogStart( + operationId, + "Web Research", + "Research", + f"Depth: {researchDepth}" + ) + try: # Step 1: AI intention analysis - extract URLs and parameters from prompt - self.services.chat.progressLogUpdate(operationId, 0.1, "Analyzing research intent") + if operationId: + self.services.chat.progressLogUpdate(operationId, 0.1, "Analyzing research intent") analysisResult = await self._analyzeResearchIntent(prompt, urls, country, language, researchDepth) @@ -98,16 +109,27 @@ class WebService: depthMap = {"fast": 1, "general": 2, "deep": 3} maxDepth = depthMap.get(finalResearchDepth.lower(), 2) - # Step 5: Crawl all URLs - self.services.chat.progressLogUpdate(operationId, 0.6, f"Crawling {len(allUrls)} URLs") + # Step 5: Crawl all URLs with hierarchical logging + if operationId: + self.services.chat.progressLogUpdate(operationId, 0.4, "Initiating") + self.services.chat.progressLogUpdate(operationId, 0.6, f"Crawling {len(allUrls)} URLs") + + # Get parent log ID for URL-level operations + parentLogId = None + if operationId: + parentLogId = self.services.chat.getOperationLogId(operationId) crawlResult = await self._performWebCrawl( instruction=instruction, urls=allUrls, - maxDepth=maxDepth + maxDepth=maxDepth, + parentLogId=parentLogId ) - self.services.chat.progressLogUpdate(operationId, 0.9, "Consolidating results") + if operationId: + self.services.chat.progressLogUpdate(operationId, 0.9, "Consolidating results") + self.services.chat.progressLogUpdate(operationId, 0.95, "Completed") + self.services.chat.progressLogFinish(operationId, True) # Return consolidated result 
result = { @@ -126,6 +148,8 @@ class WebService: except Exception as e: logger.error(f"Error in web research: {str(e)}") + if operationId: + self.services.chat.progressLogFinish(operationId, False) raise async def _analyzeResearchIntent( @@ -286,16 +310,33 @@ Return ONLY valid JSON, no additional text: self, instruction: str, urls: List[str], - maxDepth: int = 2 + maxDepth: int = 2, + parentLogId: Optional[str] = None ) -> List[Dict[str, Any]]: """Perform web crawl on list of URLs - calls plugin for each URL individually.""" crawlResults = [] # Loop over each URL and crawl one at a time - for url in urls: + for urlIndex, url in enumerate(urls): + # Create separate operation for each URL with parent reference + urlOperationId = None + if parentLogId: + workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" + urlOperationId = f"web_crawl_url_{workflowId}_{urlIndex}_{int(time.time())}" + self.services.chat.progressLogStart( + urlOperationId, + "Web Crawl", + f"URL {urlIndex + 1}", + url[:50] + "..." 
if len(url) > 50 else url, + parentId=parentLogId + ) + try: logger.info(f"Crawling URL: {url}") + if urlOperationId: + self.services.chat.progressLogUpdate(urlOperationId, 0.3, "Initiating") + # Build crawl prompt model for single URL crawlPromptModel = AiCallPromptWebCrawl( instruction=instruction, @@ -322,6 +363,10 @@ Return ONLY valid JSON, no additional text: outputFormat="json" ) + if urlOperationId: + self.services.chat.progressLogUpdate(urlOperationId, 0.8, "Completed") + self.services.chat.progressLogFinish(urlOperationId, True) + # Extract content from AiResponse crawlResult = crawlResponse.content @@ -355,6 +400,8 @@ Return ONLY valid JSON, no additional text: except Exception as e: logger.error(f"Error crawling URL {url}: {str(e)}") + if urlOperationId: + self.services.chat.progressLogFinish(urlOperationId, False) crawlResults.append({"url": url, "error": str(e)}) return crawlResults diff --git a/modules/shared/progressLogger.py b/modules/shared/progressLogger.py index 4d1c890f..bbc000ae 100644 --- a/modules/shared/progressLogger.py +++ b/modules/shared/progressLogger.py @@ -11,7 +11,7 @@ logger = logging.getLogger(__name__) class ProgressLogger: - """Centralized progress logger for workflow operations.""" + """Centralized progress logger for workflow operations with hierarchical support.""" def __init__(self, services): """Initialize progress logger. @@ -22,8 +22,9 @@ class ProgressLogger: self.services = services self.activeOperations = {} self.finishedOperations = set() # Track finished operations to avoid repeated warnings + self.operationLogIds = {} # Map operationId to the log entry ID for parent reference - def startOperation(self, operationId: str, serviceName: str, actionName: str, context: str = ""): + def startOperation(self, operationId: str, serviceName: str, actionName: str, context: str = "", parentId: Optional[str] = None): """Start a new long-running operation. 
Args: @@ -31,6 +32,7 @@ class ProgressLogger: serviceName: Name of the service (e.g., "Extract", "AI", "Generate") actionName: Name of the action being performed context: Additional context information + parentId: Optional parent log entry ID for hierarchical display """ # Remove from finished operations if it was there (for restart scenarios) self.finishedOperations.discard(operationId) @@ -39,9 +41,12 @@ class ProgressLogger: 'service': serviceName, 'action': actionName, 'context': context, - 'startTime': time.time() + 'startTime': time.time(), + 'parentId': parentId } - self._logProgress(operationId, 0.0, f"Starting {actionName}") + logId = self._logProgress(operationId, 0.0, f"Starting {actionName}", parentId=parentId) + if logId: + self.operationLogIds[operationId] = logId logger.debug(f"Started operation {operationId}: {serviceName} - {actionName}") def updateOperation(self, operationId: str, progress: float, statusUpdate: str = ""): @@ -65,7 +70,9 @@ class ProgressLogger: op = self.activeOperations[operationId] context = f"{op['context']} {statusUpdate}".strip() - self._logProgress(operationId, progress, context) + # Use the same parentId as the start operation - all logs (start/update/finish) share the same parent + parentId = op.get('parentId') + self._logProgress(operationId, progress, context, parentId=parentId) logger.debug(f"Updated operation {operationId}: {progress:.2f} - {context}") def finishOperation(self, operationId: str, success: bool = True): @@ -86,8 +93,11 @@ class ProgressLogger: finalProgress = 1.0 if success else 0.0 status = "Done" if success else "Failed" + # Use the same parentId as the start operation - all logs (start/update/finish) share the same parent + parentId = op.get('parentId') + # Create completion log BEFORE removing from activeOperations - self._logProgress(operationId, finalProgress, status) + self._logProgress(operationId, finalProgress, status, parentId=parentId) # Log completion time duration = time.time() - 
op['startTime'] @@ -95,20 +105,26 @@ class ProgressLogger: # Remove from active operations AFTER creating the log del self.activeOperations[operationId] + if operationId in self.operationLogIds: + del self.operationLogIds[operationId] # Mark as finished to prevent repeated warnings from updateOperation calls self.finishedOperations.add(operationId) - def _logProgress(self, operationId: str, progress: float, status: str): + def _logProgress(self, operationId: str, progress: float, status: str, parentId: Optional[str] = None) -> Optional[str]: """Create standardized ChatLog entry. Args: operationId: Unique identifier for the operation progress: Progress value between 0.0 and 1.0 status: Status information for the log entry + parentId: Optional parent log entry ID for hierarchical display + + Returns: + The created log entry ID, or None if creation failed """ if operationId not in self.activeOperations: - return + return None op = self.activeOperations[operationId] message = f"Service {op['service']}" @@ -116,20 +132,35 @@ class ProgressLogger: workflow = self.services.workflow if not workflow: logger.warning(f"Cannot log progress: no workflow available") - return + return None logData = { "workflowId": workflow.id, "message": message, "type": "info", "status": status, - "progress": progress + "progress": progress, + "operationId": operationId, + "parentId": parentId } try: - self.services.chat.storeLog(workflow, logData) + chatLog = self.services.chat.storeLog(workflow, logData) + return chatLog.id if chatLog else None except Exception as e: logger.error(f"Failed to store progress log: {e}") + return None + + def getOperationLogId(self, operationId: str) -> Optional[str]: + """Get the log entry ID for an operation (the start log entry). 
+ + Args: + operationId: Unique identifier for the operation + + Returns: + The log entry ID for the operation start, or None if not found + """ + return self.operationLogIds.get(operationId) def getActiveOperations(self) -> Dict[str, Dict[str, Any]]: """Get all currently active operations. diff --git a/modules/workflows/methods/methodAi.py b/modules/workflows/methods/methodAi.py index ae9b7c98..e1c91e1a 100644 --- a/modules/workflows/methods/methodAi.py +++ b/modules/workflows/methods/methodAi.py @@ -159,7 +159,8 @@ class MethodAi(MethodBase): prompt=aiPrompt, options=options, contentParts=contentParts, # Already extracted (or None if no documents) - outputFormat=output_format + outputFormat=output_format, + parentOperationId=operationId ) # Update progress - processing result @@ -282,7 +283,11 @@ class MethodAi(MethodBase): processDocumentsIndividually=True ) + # Get parent log ID for document-level operations + parentLogId = self.services.chat.getOperationLogId(operationId) + # Call extraction service + self.services.chat.progressLogUpdate(operationId, 0.4, "Initiating") self.services.chat.progressLogUpdate(operationId, 0.5, f"Extracting content from {len(chatDocuments)} documents") extractedResults = self.services.extraction.extractContent(chatDocuments, extractionOptions) diff --git a/modules/workflows/processing/core/messageCreator.py b/modules/workflows/processing/core/messageCreator.py index ee484572..55222ece 100644 --- a/modules/workflows/processing/core/messageCreator.py +++ b/modules/workflows/processing/core/messageCreator.py @@ -141,14 +141,24 @@ class MessageCreator: userFriendlyText = taskObjective if result.success: - messageText = f"**Action {currentAction} ({action.execMethod}.{action.execAction})**\n\n" - messageText += f"✅ {userFriendlyText}\n\n" + # Use user-friendly message without technical action names if available + if userFriendlyText and userFriendlyText != taskObjective: + messageText = f"✅ {userFriendlyText}\n\n" + else: + # Fallback 
to technical format if no user message available + messageText = f"**Action {currentAction} ({action.execMethod}.{action.execAction})**\n\n" + messageText += f"✅ {userFriendlyText}\n\n" else: # ⚠️ FAILURE MESSAGE - Show error details to user errorDetails = result.error if result.error else "Unknown error occurred" - messageText = f"**Action {currentAction} ({action.execMethod}.{action.execAction})**\n\n" - messageText += f"❌ {userFriendlyText}\n\n" - messageText += f"{errorDetails}\n\n" + if userFriendlyText and userFriendlyText != taskObjective: + messageText = f"❌ {userFriendlyText}\n\n" + messageText += f"{errorDetails}\n\n" + else: + # Fallback to technical format if no user message available + messageText = f"**Action {currentAction} ({action.execMethod}.{action.execAction})**\n\n" + messageText += f"❌ {userFriendlyText}\n\n" + messageText += f"{errorDetails}\n\n" # Build concise summary to persist for history context doc_count = len(createdDocuments) if createdDocuments else 0 diff --git a/modules/workflows/processing/core/taskPlanner.py b/modules/workflows/processing/core/taskPlanner.py index ec6d3bb7..d9baa462 100644 --- a/modules/workflows/processing/core/taskPlanner.py +++ b/modules/workflows/processing/core/taskPlanner.py @@ -169,6 +169,8 @@ class TaskPlanner: try: task = TaskStep(**taskDict) + # User message is already generated by the AI in the task planning prompt + # No separate call needed - userMessage comes directly from the AI response tasks.append(task) except Exception as e: logger.warning(f"Skipping invalid task {i+1}: {str(e)}") diff --git a/modules/workflows/processing/modes/modeDynamic.py b/modules/workflows/processing/modes/modeDynamic.py index cd4149d2..6af8e8e1 100644 --- a/modules/workflows/processing/modes/modeDynamic.py +++ b/modules/workflows/processing/modes/modeDynamic.py @@ -376,6 +376,11 @@ class DynamicMode(BaseMode): actionDef = parseJsonWithModel(paramsResp, ActionDefinition) # Extract parameters from parsed model parameters = 
actionDef.parameters if actionDef.parameters else {} + + # Extract userMessage from Stage 2 response if available + # Stage 2 can override Stage 1 userMessage with more specific message + if hasattr(actionDef, 'userMessage') and actionDef.userMessage: + selection['userMessage'] = actionDef.userMessage except ValueError as e: logger.error(f"Failed to parse ActionDefinition from parameters response: {e}") logger.error(f"Response was: {paramsResp[:500]}...") @@ -421,12 +426,21 @@ class DynamicMode(BaseMode): currentTask = getattr(self.services.workflow, 'currentTask', 0) resultLabel = f"round{currentRound}_task{currentTask}_action{stepIndex}_results" + # User message is generated by AI in the action selection/parameters prompt + # Extract from selection if available (from Stage 1 or Stage 2) + userMessage = None + if hasattr(selection, 'userMessage') and selection.get('userMessage'): + userMessage = selection.get('userMessage') + elif isinstance(selection, dict) and 'userMessage' in selection: + userMessage = selection['userMessage'] + taskAction = self._createActionItem({ "execMethod": methodName, "execAction": actionName, "execParameters": parameters, "execResultLabel": resultLabel, - "status": TaskStatus.PENDING + "status": TaskStatus.PENDING, + "userMessage": userMessage # User message from AI prompt (if provided) }) # Execute using existing single action flow (message creation is handled internally) diff --git a/modules/workflows/processing/shared/promptGenerationActionsDynamic.py b/modules/workflows/processing/shared/promptGenerationActionsDynamic.py index 6dfd9090..7985fdb1 100644 --- a/modules/workflows/processing/shared/promptGenerationActionsDynamic.py +++ b/modules/workflows/processing/shared/promptGenerationActionsDynamic.py @@ -93,6 +93,7 @@ REPLY: Return ONLY a JSON object with the following structure (no comments, no e {{ "action": "method.action_name", "actionObjective": "...", + "userMessage": "User-friendly message in language '{{KEY:USER_LANGUAGE}}' 
explaining what this action will do (1 sentence, first person, friendly tone)", "learnings": ["..."], "requiredInputDocuments": ["docList:..."], "requiredConnection": "connection:..." | null, @@ -244,6 +245,7 @@ PREVIOUS FAILURE ANALYSIS: REPLY (ONLY JSON): {{ "schema": "parameters_v1", + "userMessage": "User-friendly message in language '{{KEY:USER_LANGUAGE}}' explaining what this action will do (1 sentence, first person, friendly tone)", "parameters": {{ "paramName": "value" }} diff --git a/modules/workflows/processing/workflowProcessor.py b/modules/workflows/processing/workflowProcessor.py index df502ffa..34fa8c12 100644 --- a/modules/workflows/processing/workflowProcessor.py +++ b/modules/workflows/processing/workflowProcessor.py @@ -2,13 +2,20 @@ # Main workflow processor with delegation pattern import logging -from typing import Dict, Any, Optional, List -from modules.datamodels.datamodelChat import TaskStep, TaskContext, TaskPlan, TaskResult +import json +from typing import Dict, Any, Optional, List, TYPE_CHECKING +from modules.datamodels import datamodelChat +from modules.datamodels.datamodelChat import TaskStep, TaskContext, TaskPlan, ActionResult, ActionDocument, ChatDocument, ChatMessage from modules.datamodels.datamodelChat import ChatWorkflow, WorkflowModeEnum from modules.workflows.processing.modes.modeBase import BaseMode from modules.workflows.processing.modes.modeDynamic import DynamicMode from modules.workflows.processing.modes.modeAutomation import AutomationMode from modules.workflows.processing.shared.stateTools import checkWorkflowStopped +from modules.datamodels.datamodelAi import OperationTypeEnum, PriorityEnum, ProcessingModeEnum +from modules.shared.jsonUtils import extractJsonString, repairBrokenJson + +if TYPE_CHECKING: + from modules.datamodels.datamodelWorkflow import TaskResult logger = logging.getLogger(__name__) @@ -18,6 +25,7 @@ class WorkflowProcessor: def __init__(self, services): self.services = services self.mode = 
self._createMode(services.workflow.workflowMode)
+        self.workflow = services.workflow
 
     def _createMode(self, workflowMode: WorkflowModeEnum) -> BaseMode:
         """Create the appropriate mode implementation based on workflow mode"""
@@ -78,7 +86,7 @@
             self.services.chat.progressLogFinish(operationId, False)
             raise
 
-    async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext) -> TaskResult:
+    async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext) -> datamodelChat.TaskResult:
         """Execute a task step using the appropriate mode"""
         import time
@@ -300,3 +308,353 @@
         except Exception as e:
             logger.error(f"Error in prepareTaskHandover: {str(e)}")
             return {'error': str(e)}
+
+    # Fast Path Implementation
+
+    async def detectComplexity(self, prompt: str, documents: Optional[List[ChatDocument]] = None) -> str:
+        """
+        Detect request complexity using AI-based semantic understanding.
+
+        Returns:
+            "simple" | "moderate" | "complex"
+
+        Simple: Single question, no documents, straightforward answer (5-15s)
+        Moderate: Multiple steps, some documents, structured response (30-60s)
+        Complex: Multi-task, many documents, research needed, generation required (60-120s)
+        """
+        try:
+            # Ensure AI service is initialized
+            await self.services.ai._ensureAiObjectsInitialized()
+
+            # Build complexity detection prompt (language-agnostic, semantic)
+            complexityPrompt = (
+                "You are a complexity analyzer. Analyze the user's request and determine its complexity level.\n\n"
+                "Consider:\n"
+                "- Number of distinct tasks or steps required\n"
+                "- Amount and type of documents provided\n"
+                "- Need for external research or web search\n"
+                "- Need for document analysis or extraction\n"
+                "- Need for content generation (reports, summaries, etc.)\n"
+                "- Need for multi-step reasoning or planning\n\n"
+                "Complexity levels:\n"
+                "- 'simple': Single question, no documents or minimal documents, straightforward answer that can be provided in one AI response (5-15s)\n"
+                "- 'moderate': Multiple steps, some documents, structured response requiring some processing (30-60s)\n"
+                "- 'complex': Multi-task workflow, many documents, research needed, content generation required, multi-step planning (60-120s)\n\n"
+                f"User request:\n{prompt}\n\n"
+            )
+
+            if documents and len(documents) > 0:
+                complexityPrompt += f"\nDocuments provided: {len(documents)} document(s)\n"
+                # Add document types
+                docTypes = [doc.mimeType for doc in documents if hasattr(doc, 'mimeType')]
+                if docTypes:
+                    complexityPrompt += f"Document types: {', '.join(set(docTypes))}\n"
+
+            complexityPrompt += (
+                "\nReturn ONLY a JSON object with this exact structure:\n"
+                "{\n"
+                '  "complexity": "simple" | "moderate" | "complex",\n'
+                '  "reasoning": "Brief explanation of why this complexity level"\n'
+                "}\n"
+            )
+
+            # Call AI for complexity detection (planning call - no documents needed)
+            aiResponse = await self.services.ai.callAiPlanning(
+                prompt=complexityPrompt,
+                placeholders=None,
+                debugType="complexity_detection"
+            )
+
+            # Parse response
+            complexity = "moderate"  # Default fallback
+            try:
+                # Extract response content (plain string, or AiResponse with .content)
+                responseContent = aiResponse if isinstance(aiResponse, str) else (aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse))
+
+                # Extract JSON from response
+                jsonStr = extractJsonString(responseContent)
+                if not jsonStr:
+                    # Try repair if broken
+                    jsonStr = repairBrokenJson(responseContent)
+
+                if jsonStr:
+                    parsed = json.loads(jsonStr)
+                    complexity = parsed.get("complexity", "moderate")
+                    reasoning = parsed.get("reasoning", "")
+                    logger.info(f"Complexity detected: {complexity} - {reasoning}")
+                else:
+                    logger.warning("Could not parse complexity detection response, defaulting to 'moderate'")
+            except Exception as e:
+                logger.warning(f"Error parsing complexity detection: {str(e)}, defaulting to 'moderate'")
+
+            return complexity
+
+        except Exception as e:
+            logger.error(f"Error in detectComplexity: {str(e)}")
+            # Default to moderate on error (safe fallback)
+            return "moderate"
+
+    async def fastPathExecute(self, prompt: str, documents: Optional[List[ChatDocument]] = None, userLanguage: Optional[str] = None) -> ActionResult:
+        """
+        Execute simple requests via fast path (single AI call).
+
+        Fast path is for simple requests that can be answered in one AI response:
+        - Single question, no complex processing
+        - No document extraction needed
+        - No multi-step planning required
+        - Direct answer generation
+
+        Returns:
+            ActionResult with response text and optional documents
+        """
+        try:
+            # Ensure AI service is initialized
+            await self.services.ai._ensureAiObjectsInitialized()
+
+            # Build fast path prompt (understand + execute + deliver in one call)
+            fastPathPrompt = (
+                "You are a helpful assistant. Answer the user's question directly and comprehensively.\n\n"
+                f"User question:\n{prompt}\n\n"
+            )
+
+            # Add user language context if available
+            if userLanguage:
+                fastPathPrompt += f"Respond in the user's language: {userLanguage}\n\n"
+
+            fastPathPrompt += (
+                "Provide a clear, complete answer. If the question requires information from documents, "
+                "extract and present the relevant information. If it's a general question, provide a helpful response.\n\n"
+                "Format your response as plain text (no markdown code blocks unless showing code examples)."
+            )
+
+            # Prepare AI call options for fast path (balanced, fast processing)
+            from modules.datamodels.datamodelAi import AiCallOptions
+
+            options = AiCallOptions(
+                operationType=OperationTypeEnum.TEXT,
+                priority=PriorityEnum.BALANCED,
+                processingMode=ProcessingModeEnum.STANDARD,
+                maxCost=0.10,  # Low cost for simple requests
+                maxProcessingTime=15  # Fast path should complete in 15s
+            )
+
+            # Call AI (content call - no documents needed for fast path)
+            aiResponse = await self.services.ai.callAiContent(
+                prompt=fastPathPrompt,
+                contentParts=None,  # Fast path doesn't process documents
+                options=options,
+                outputFormat=None  # Text response, not document generation
+            )
+
+            # Extract response content (plain string, or AiResponse with .content)
+            responseText = aiResponse if isinstance(aiResponse, str) else (aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse))
+
+            # Create ActionResult with response
+            # For fast path, we create a simple text document with the response
+            from modules.datamodels.datamodelChat import ActionDocument
+
+            responseDoc = ActionDocument(
+                documentName="fast_path_response.txt",
+                documentData=responseText.encode('utf-8') if isinstance(responseText, str) else responseText,
+                mimeType="text/plain"
+            )
+
+            result = ActionResult(
+                success=True,
+                documents=[responseDoc],
+                resultLabel="fast_path_response"
+            )
+
+            logger.info(f"Fast path executed successfully, response length: {len(responseText)} chars")
+            return result
+
+        except Exception as e:
+            logger.error(f"Error in fastPathExecute: {str(e)}")
+            return ActionResult.isFailure(f"Fast path execution failed: {str(e)}")
+
+    # Workflow-Level Functions
+
+    async def initialUnderstanding(self, context: Any) -> Any:  # RequestContext -> UnderstandingResult
+        """
+        Initial understanding phase: Combined AI call for parameters + intention + context + tasks.
+ + This function performs a unified understanding of the user's request: + - Extracts basic parameters (language, format, detail level) + - Determines user intention (primaryGoal, secondaryGoals, intentionType) + - Extracts context (topics, requirements, constraints) + - Identifies document references with purpose and relevance + - Creates TaskDefinition[] with deliverables + + Args: + context: RequestContext with normalized user input + + Returns: + UnderstandingResult with all understanding components + """ + try: + from modules.datamodels.datamodelWorkflow import UnderstandingResult, TaskDefinition + from modules.shared.jsonUtils import parseJsonWithModel + + # Ensure AI service is initialized + await self.services.ai._ensureAiObjectsInitialized() + + # Build combined understanding prompt + understandingPrompt = ( + "You are a request understanding system. Analyze the user's request comprehensively and provide:\n\n" + "1. **Parameters**: Basic parameters (language, format, detail level)\n" + "2. **Intention**: User intention (primaryGoal, secondaryGoals, intentionType)\n" + "3. **Context**: Extracted context (topics, requirements, constraints)\n" + "4. **Document References**: Document references with purpose and relevance\n" + "5. 
**Tasks**: Task definitions with deliverables\n\n" + f"User request:\n{context.originalPrompt}\n\n" + f"User language: {context.userLanguage}\n" + f"Complexity: {context.detectedComplexity}\n" + ) + + if context.documents and len(context.documents) > 0: + understandingPrompt += f"\nDocuments provided: {len(context.documents)} document(s)\n" + docTypes = [doc.mimeType for doc in context.documents if hasattr(doc, 'mimeType')] + if docTypes: + understandingPrompt += f"Document types: {', '.join(set(docTypes))}\n" + + understandingPrompt += ( + "\nReturn ONLY a JSON object with this exact structure:\n" + "{\n" + ' "parameters": {"language": "...", "format": "...", "detailLevel": "..."},\n' + ' "intention": {"primaryGoal": "...", "secondaryGoals": [...], "intentionType": "..."},\n' + ' "context": {"topics": [...], "requirements": [...], "constraints": [...]},\n' + ' "documentReferences": [{"reference": "...", "purpose": "...", "relevance": "..."}],\n' + ' "tasks": [{"id": "...", "objective": "...", "deliverable": {...}, ...}]\n' + "}\n" + ) + + # Call AI for understanding (planning call) + aiResponse = await self.services.ai.callAiPlanning( + prompt=understandingPrompt, + placeholders=None, + debugType="initial_understanding" + ) + + # Parse response using UnderstandingResult model + try: + understandingResult = parseJsonWithModel(aiResponse, UnderstandingResult) + logger.info(f"Initial understanding completed: {len(understandingResult.tasks)} tasks identified") + return understandingResult + except Exception as e: + logger.error(f"Error parsing UnderstandingResult: {str(e)}") + # Return minimal UnderstandingResult on error + return UnderstandingResult( + parameters={"language": context.userLanguage}, + intention={"primaryGoal": context.originalPrompt}, + context={}, + documentReferences=[], + tasks=[] + ) + + except Exception as e: + logger.error(f"Error in initialUnderstanding: {str(e)}") + # Return minimal UnderstandingResult on error + from 
modules.datamodels.datamodelWorkflow import UnderstandingResult + return UnderstandingResult( + parameters={"language": context.userLanguage}, + intention={"primaryGoal": context.originalPrompt}, + context={}, + documentReferences=[], + tasks=[] + ) + + async def persistTaskResult(self, taskResult: Any, workflow: ChatWorkflow, context: Optional[TaskContext] = None) -> ChatMessage: # TaskResult -> ChatMessage + """ + Persist task result as ChatMessage + ChatDocuments for cross-task/round references. + + This function converts a TaskResult (workflow execution format) into a ChatMessage + (persistent format) so that documents can be referenced by subsequent tasks or rounds + using docList: references. + + Args: + taskResult: TaskResult from task execution + workflow: Current workflow + context: Optional TaskContext for additional context + + Returns: + ChatMessage with persisted documents + """ + try: + from modules.datamodels.datamodelChat import ChatMessage, ChatDocument, ActionDocument + from modules.workflows.processing.shared.stateTools import checkWorkflowStopped + + # Check workflow status + checkWorkflowStopped(self.services) + + # Extract documents from ActionResult + chatDocuments = [] + if taskResult.actionResult and taskResult.actionResult.documents: + for actionDoc in taskResult.actionResult.documents: + if hasattr(actionDoc, 'documentData') and actionDoc.documentData: + # Create file in component storage + fileItem = self.services.interfaceDbComponent.createFile( + name=actionDoc.documentName if hasattr(actionDoc, 'documentName') else f"task_{taskResult.taskId}_result.txt", + mimeType=actionDoc.mimeType if hasattr(actionDoc, 'mimeType') else "text/plain", + content=actionDoc.documentData if isinstance(actionDoc.documentData, bytes) else actionDoc.documentData.encode('utf-8') + ) + # Persist file data + self.services.interfaceDbComponent.createFileData( + fileItem.id, + actionDoc.documentData if isinstance(actionDoc.documentData, bytes) else 
actionDoc.documentData.encode('utf-8') + ) + + # Get file info + fileInfo = self.services.chat.getFileInfo(fileItem.id) + + # Create ChatDocument + chatDoc = ChatDocument( + fileId=fileItem.id, + fileName=fileInfo.get("fileName", actionDoc.documentName) if fileInfo else actionDoc.documentName, + fileSize=fileInfo.get("size", len(actionDoc.documentData) if isinstance(actionDoc.documentData, bytes) else len(actionDoc.documentData.encode('utf-8'))) if fileInfo else (len(actionDoc.documentData) if isinstance(actionDoc.documentData, bytes) else len(actionDoc.documentData.encode('utf-8'))), + mimeType=fileInfo.get("mimeType", actionDoc.mimeType) if fileInfo else actionDoc.mimeType, + roundNumber=workflow.currentRound, + taskNumber=workflow.getTaskIndex(), + actionNumber=workflow.getActionIndex() + ) + chatDocuments.append(chatDoc) + + # Create documentsLabel for docList: references + documentsLabel = f"task_{taskResult.taskId}_results" + if taskResult.actionResult and taskResult.actionResult.resultLabel: + documentsLabel = taskResult.actionResult.resultLabel + + # Build user-friendly message + userMessage = "Task completed successfully" + if context and hasattr(context, 'taskStep') and context.taskStep and hasattr(context.taskStep, 'userMessage') and context.taskStep.userMessage: + userMessage = context.taskStep.userMessage + elif context and hasattr(context, 'taskStep') and context.taskStep and hasattr(context.taskStep, 'objective'): + userMessage = f"Completed: {context.taskStep.objective}" + + # Create ChatMessage + messageData = { + "workflowId": workflow.id, + "role": "assistant", + "message": userMessage, + "status": "step", + "sequenceNr": len(workflow.messages) + 1, + "publishedAt": self.services.utils.timestampGetUtc(), + "documentsLabel": documentsLabel, + "documents": [], + # Add workflow context fields + "roundNumber": workflow.currentRound, + "taskNumber": workflow.getTaskIndex(), + "actionNumber": workflow.getActionIndex(), + # Add progress status + 
"taskProgress": "success" if taskResult.actionResult and taskResult.actionResult.success else "fail", + "actionProgress": "success" if taskResult.actionResult and taskResult.actionResult.success else "fail" + } + + # Store message with documents + chatMessage = self.services.chat.storeMessageWithDocuments(workflow, messageData, chatDocuments) + + logger.info(f"Persisted task result for task {taskResult.taskId}: {len(chatDocuments)} documents") + return chatMessage + + except Exception as e: + logger.error(f"Error in persistTaskResult: {str(e)}") + raise diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py index 6751bde6..85a6e32f 100644 --- a/modules/workflows/workflowManager.py +++ b/modules/workflows/workflowManager.py @@ -162,6 +162,30 @@ class WorkflowManager: self.workflowProcessor = WorkflowProcessor(self.services) await self._sendFirstMessage(userInput) + + # Fast Path Detection and Routing + # Get documents from first message if available + workflow = self.services.workflow + documents = [] + if workflow.messages and len(workflow.messages) > 0: + # Get documents from the first message + firstMessageId = workflow.messages[0] if isinstance(workflow.messages[0], str) else workflow.messages[0].id + firstMessage = self.services.chat.getMessage(firstMessageId) + if firstMessage and hasattr(firstMessage, 'documents'): + documents = firstMessage.documents + + # Detect complexity (AI-based semantic understanding) + complexity = await self.workflowProcessor.detectComplexity(userInput.prompt, documents) + logger.info(f"Request complexity detected: {complexity}") + + # Route to fast path for simple requests + if complexity == "simple": + logger.info("Routing to fast path for simple request") + await self._executeFastPath(userInput, documents) + return # Fast path completes the workflow + + # Route to full workflow for moderate/complex requests + logger.info(f"Routing to full workflow for {complexity} request") taskPlan = await 
self._planTasks(userInput) await self._executeTasks(taskPlan) await self._processWorkflowResults() @@ -174,6 +198,111 @@ class WorkflowManager: # Helper functions + async def _executeFastPath(self, userInput: UserInputRequest, documents: List[ChatDocument]) -> None: + """Execute fast path for simple requests and deliver result to user""" + try: + workflow = self.services.workflow + checkWorkflowStopped(self.services) + + # Get user language if available + userLanguage = getattr(self.services, 'currentUserLanguage', None) + + # Execute fast path + result = await self.workflowProcessor.fastPathExecute( + prompt=userInput.prompt, + documents=documents, + userLanguage=userLanguage + ) + + if not result.success: + # Fast path failed, fall back to full workflow + logger.warning(f"Fast path failed: {result.error}, falling back to full workflow") + taskPlan = await self._planTasks(userInput) + await self._executeTasks(taskPlan) + await self._processWorkflowResults() + return + + # Extract response text from ActionResult + responseText = "" + chatDocuments = [] + + if result.documents and len(result.documents) > 0: + # Get response text from first document + firstDoc = result.documents[0] + if hasattr(firstDoc, 'documentData'): + docData = firstDoc.documentData + if isinstance(docData, bytes): + responseText = docData.decode('utf-8') + else: + responseText = str(docData) + + # Convert ActionDocuments to ChatDocuments for persistence + for actionDoc in result.documents: + if hasattr(actionDoc, 'documentData') and actionDoc.documentData: + # Create file in component storage + fileItem = self.services.interfaceDbComponent.createFile( + name=actionDoc.documentName if hasattr(actionDoc, 'documentName') else "fast_path_response.txt", + mimeType=actionDoc.mimeType if hasattr(actionDoc, 'mimeType') else "text/plain", + content=actionDoc.documentData if isinstance(actionDoc.documentData, bytes) else actionDoc.documentData.encode('utf-8') + ) + # Persist file data + 
self.services.interfaceDbComponent.createFileData(fileItem.id, actionDoc.documentData if isinstance(actionDoc.documentData, bytes) else actionDoc.documentData.encode('utf-8')) + + # Get file info + fileInfo = self.services.chat.getFileInfo(fileItem.id) + + # Create ChatDocument + chatDoc = ChatDocument( + fileId=fileItem.id, + fileName=fileInfo.get("fileName", actionDoc.documentName) if fileInfo else actionDoc.documentName, + fileSize=fileInfo.get("size", len(actionDoc.documentData) if isinstance(actionDoc.documentData, bytes) else len(actionDoc.documentData.encode('utf-8'))) if fileInfo else (len(actionDoc.documentData) if isinstance(actionDoc.documentData, bytes) else len(actionDoc.documentData.encode('utf-8'))), + mimeType=fileInfo.get("mimeType", actionDoc.mimeType) if fileInfo else actionDoc.mimeType, + roundNumber=workflow.currentRound, + taskNumber=0, # Fast path doesn't have tasks + actionNumber=0 + ) + chatDocuments.append(chatDoc) + + # Create ChatMessage with fast path response (in user's language) + messageData = { + "workflowId": workflow.id, + "role": "assistant", + "message": responseText or "Fast path response completed", + "status": "last", # Fast path completes the workflow + "sequenceNr": len(workflow.messages) + 1, + "publishedAt": self.services.utils.timestampGetUtc(), + "documentsLabel": "fast_path_response", + "documents": [], + # Add workflow context fields + "roundNumber": workflow.currentRound, + "taskNumber": 0, # Fast path doesn't have tasks + "actionNumber": 0, + # Add progress status + "taskProgress": "success", + "actionProgress": "success" + } + + # Store message with documents + self.services.chat.storeMessageWithDocuments(workflow, messageData, chatDocuments) + + # Mark workflow as completed + workflow.status = "completed" + workflow.lastActivity = self.services.utils.timestampGetUtc() + self.services.chat.updateWorkflow(workflow.id, { + "status": "completed", + "lastActivity": workflow.lastActivity + }) + + logger.info(f"Fast path 
completed successfully, response length: {len(responseText)} chars") + + except Exception as e: + logger.error(f"Error in _executeFastPath: {str(e)}") + # Fall back to full workflow on error + logger.info("Falling back to full workflow due to fast path error") + taskPlan = await self._planTasks(userInput) + await self._executeTasks(taskPlan) + await self._processWorkflowResults() + async def _sendFirstMessage(self, userInput: UserInputRequest) -> None: """Send first message to start workflow""" try: @@ -369,6 +498,9 @@ class WorkflowManager: currentTaskIndex = idx + 1 logger.info(f"Task {currentTaskIndex}/{totalTasks}: {taskStep.objective}") + # Update workflow state before executing task (fixes "Task 0" issue) + handling.updateWorkflowBeforeExecutingTask(currentTaskIndex) + # Build TaskContext (mode-specific behavior is inside WorkflowProcessor) taskContext = TaskContext( taskStep=taskStep, @@ -393,7 +525,30 @@ class WorkflowManager: } ) - taskResult = await handling.executeTask(taskStep, workflow, taskContext, currentTaskIndex, totalTasks) + taskResult = await handling.executeTask(taskStep, workflow, taskContext) + + # Persist task result for cross-task/round document references + # Convert ChatTaskResult to WorkflowTaskResult for persistence + from modules.datamodels.datamodelWorkflow import TaskResult as WorkflowTaskResult + from modules.datamodels.datamodelChat import ActionResult + + # Get final ActionResult from task execution (last action result) + finalActionResult = None + if hasattr(taskResult, 'actionResult'): + finalActionResult = taskResult.actionResult + elif taskContext.previousActionResults and len(taskContext.previousActionResults) > 0: + # Use last action result from context + finalActionResult = taskContext.previousActionResults[-1] + + # Create WorkflowTaskResult for persistence + if finalActionResult: + workflowTaskResult = WorkflowTaskResult( + taskId=taskStep.id, + actionResult=finalActionResult + ) + # Persist task result (creates 
ChatMessage + ChatDocuments) + await handling.persistTaskResult(workflowTaskResult, workflow, taskContext) + handoverData = await handling.prepareTaskHandover(taskStep, [], taskResult, workflow) allTaskResults.append({ 'taskStep': taskStep,