diff --git a/how --stat HEAD b/how --stat HEAD new file mode 100644 index 00000000..52b1d7b3 --- /dev/null +++ b/how --stat HEAD @@ -0,0 +1,49 @@ +M app.py +A modules/.$DEPENDENCY_DIAGRAM.drawio.bkp +A modules/AUTOMATION_FEATURE_ANALYSIS.md +A modules/BIDIRECTIONAL_IMPORTS.md +A modules/DEPENDENCY_DIAGRAM.drawio +A modules/FEATURES_TO_INTERFACES_IMPORTS.md +M modules/connectors/connectorVoiceGoogle.py +M modules/datamodels/datamodelChat.py +M modules/datamodels/datamodelPagination.py +A modules/features/automation/__init__.py +A modules/features/automation/mainAutomation.py +A modules/features/automation/subAutomationUtils.py +D modules/features/chatAlthaus/COMPONENT_DIAGRAM.md +M modules/features/featuresLifecycle.py +M modules/interfaces/interfaceAiObjects.py +M modules/interfaces/interfaceDbAppObjects.py +M modules/interfaces/interfaceDbChatObjects.py +M modules/interfaces/interfaceDbComponentObjects.py +M modules/interfaces/interfaceVoiceObjects.py +M modules/routes/routeAdminAutomationEvents.py +M modules/routes/routeVoiceGoogle.py +M modules/services/__init__.py +M modules/services/serviceAi/mainServiceAi.py +M modules/services/serviceAi/subJsonResponseHandling.py +M modules/services/serviceChat/mainServiceChat.py +M modules/services/serviceExtraction/mainServiceExtraction.py +M modules/services/serviceExtraction/subPipeline.py +M modules/services/serviceExtraction/subPromptBuilderExtraction.py +M modules/services/serviceGeneration/renderers/rendererXlsx.py +M modules/services/serviceGeneration/subPromptBuilderGeneration.py +A modules/services/serviceSecurity/mainServiceSecurity.py +M modules/services/serviceSharepoint/mainServiceSharepoint.py +M modules/services/serviceUtils/mainServiceUtils.py +A modules/shared/callbackRegistry.py +M modules/shared/debugLogger.py +M modules/shared/jsonUtils.py +M modules/workflows/methods/methodAi.py +M modules/workflows/methods/methodBase.py +A modules/workflows/methods/methodContext.py +M modules/workflows/methods/methodOutlook.py +M modules/workflows/methods/methodSharepoint.py +M modules/workflows/processing/adaptive/contentValidator.py +M modules/workflows/processing/core/messageCreator.py +M modules/workflows/processing/modes/modeAutomation.py +M modules/workflows/processing/modes/modeDynamic.py +M modules/workflows/processing/shared/promptGenerationActionsDynamic.py +M modules/workflows/processing/shared/promptGenerationTaskplan.py +M modules/workflows/processing/workflowProcessor.py +M modules/workflows/workflowManager.py diff --git a/modules/.$DEPENDENCY_DIAGRAM.drawio.bkp b/modules/.$DEPENDENCY_DIAGRAM.drawio.bkp deleted file mode 100644 index 5d8a062a..00000000 --- a/modules/.$DEPENDENCY_DIAGRAM.drawio.bkp +++ /dev/null @@ -1,326 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/modules/AUTOMATION_FEATURE_ANALYSIS.md b/modules/AUTOMATION_FEATURE_ANALYSIS.md deleted file mode 100644 index 37dd4e75..00000000 --- a/modules/AUTOMATION_FEATURE_ANALYSIS.md +++ /dev/null @@ -1,407 +0,0 @@ -# Automation Feature Analysis: Moving Automation Handler to Features Layer - -## Executive Summary - -**Status: ✅ HIGHLY RECOMMENDED - Architectural Improvement** - -Moving automation workflow handler functionality from `interfaces/interfaceDbChatObjects.py` to a new feature in `features/` is **architecturally correct** and aligns with separation of concerns. - ---- - -## Current Architecture Analysis - -### Current Location: `interfaces/interfaceDbChatObjects.py` - -**Automation-related methods:** -1. `executeAutomation(automationId: str)` - Executes automation workflow immediately (test mode) -2. `syncAutomationEvents()` - Syncs scheduler with all active automations -3. `_createAutomationEventHandler(automationId: str)` - Creates event handler for scheduled execution -4. `_parseScheduleToCron(schedule: str)` - Parses schedule string to cron kwargs -5. `_planToPrompt(plan: Dict)` - Converts plan structure to prompt string -6. `_replacePlaceholders(template: str, placeholders: Dict)` - Replaces placeholders in template - -**Dependencies:** -- Uses `getAutomationDefinition()` - Database access (should stay in interface) -- Uses `chatStart()` from `features.chatPlayground` - Already imports from features -- Uses `eventManager` from `shared.eventManagement` - Foundation layer -- Creates workflows using `WorkflowModeEnum.WORKFLOW_AUTOMATION` - ---- - -## Why This Should Be a Feature - -### 1. **Business Logic vs. Data Access** - -**Current Problem:** -- Automation execution logic is **business logic** (orchestration, workflow creation) -- It's mixed with **data access** (interface layer) -- Interface layer should only provide data access, not business orchestration - -**After Move:** -- Interface layer: `getAutomationDefinition()`, `saveAutomationDefinition()` (data access) -- Feature layer: `executeAutomation()`, `syncAutomationEvents()` (business logic) - -### 2. **Feature Pattern Consistency** - -**Existing Features Pattern:** -- `features/chatPlayground/` - Chat workflow execution -- `features/chatAlthaus/` - Scheduled data updates -- `features/syncDelta/` - Sync management -- `features/neutralizePlayground/` - Neutralization workflows - -**Automation Handler Pattern:** -- Scheduled execution (like `chatAlthaus`) -- Workflow orchestration (like `chatPlayground`) -- Event-driven (like `syncDelta`) - -**Conclusion:** Automation handler fits the feature pattern perfectly. - -### 3. **Dependency Direction** - -**Current Violation:** -- `interfaces/` imports from `features/` (line 1927: `from modules.features.chatPlayground.mainChatPlayground import chatStart`) -- This creates bidirectional dependency: `interfaces/ ↔ features/` - -**After Move:** -- `features/automation/` imports from `interfaces/` (correct direction) -- `features/automation/` imports from `features/chatPlayground/` (feature-to-feature) -- Eliminates `interfaces/ → features/` import - -### 4. **Separation of Concerns** - -**Interface Layer Should:** -- ✅ Provide data access (`getAutomationDefinition`, `saveAutomationDefinition`) -- ✅ Handle CRUD operations -- ❌ NOT orchestrate workflows -- ❌ NOT manage scheduling -- ❌ NOT execute business logic - -**Feature Layer Should:** -- ✅ Orchestrate workflows -- ✅ Manage scheduling -- ✅ Execute business logic -- ✅ Coordinate between services and interfaces - ---- - -## Proposed Architecture - -### New Structure: `features/automation/` - -``` -features/automation/ - ├── mainAutomation.py # Main automation service - │ ├── executeAutomation() # Execute automation workflow - │ ├── syncAutomationEvents() # Sync scheduler with automations - │ └── _createAutomationEventHandler() # Create event handler - └── subAutomationUtils.py # Utility functions - ├── _parseScheduleToCron() # Parse schedule to cron - ├── _planToPrompt() # Convert plan to prompt - └── _replacePlaceholders() # Replace template placeholders -``` - -### Interface Layer (Keep) - -``` -interfaces/interfaceDbChatObjects.py - ├── getAutomationDefinition() # Data access - KEEP - ├── saveAutomationDefinition() # Data access - KEEP - └── (other CRUD methods) # Data access - KEEP -``` - -### Feature Lifecycle Integration - -``` -features/featuresLifecycle.py - ├── start() - │ └── from features.automation import mainAutomation - │ mainAutomation.startScheduler(eventUser) - └── stop() - └── mainAutomation.stopScheduler() -``` - ---- - -## Detailed Functionality Analysis - -### Functions to Move - -#### 1. `executeAutomation(automationId: str)` → `features/automation/mainAutomation.py` - -**Current Implementation:** -- Loads automation definition (calls interface method) -- Replaces placeholders in template -- Creates UserInputRequest -- Calls `chatStart()` from `features.chatPlayground` -- Returns ChatWorkflow - -**Dependencies:** -- `getAutomationDefinition()` - Interface method (import from interface) -- `_replacePlaceholders()` - Utility (move to feature) -- `_planToPrompt()` - Utility (move to feature) -- `chatStart()` - Feature method (import from feature) -- `getInterface()` - Interface factory (import from interface) - -**After Move:** -```python -# features/automation/mainAutomation.py -from modules.interfaces.interfaceDbChatObjects import getInterface as getChatInterface -from modules.interfaces.interfaceDbAppObjects import getInterface as getAppInterface -from modules.features.chatPlayground.mainChatPlayground import chatStart -from .subAutomationUtils import replacePlaceholders, planToPrompt - -async def executeAutomation(automationId: str, chatInterface) -> ChatWorkflow: - """Execute automation workflow immediately.""" - # Load automation (uses interface) - automation = chatInterface.getAutomationDefinition(automationId) - # ... rest of logic -``` - -#### 2. `syncAutomationEvents()` → `features/automation/mainAutomation.py` - -**Current Implementation:** -- Gets all automation definitions (calls interface method) -- Parses schedules -- Registers cron jobs with eventManager -- Creates event handlers - -**Dependencies:** -- `getRecordset()` - Interface method (via chatInterface) -- `_parseScheduleToCron()` - Utility (move to feature) -- `_createAutomationEventHandler()` - Handler creation (move to feature) -- `eventManager` - Foundation layer (import from shared) - -**After Move:** -```python -# features/automation/mainAutomation.py -from modules.shared.eventManagement import eventManager -from modules.interfaces.interfaceDbChatObjects import getInterface as getChatInterface -from .subAutomationUtils import parseScheduleToCron - -async def syncAutomationEvents(chatInterface) -> Dict[str, Any]: - """Sync scheduler with all active automations.""" - # Get automations (uses interface) - allAutomations = chatInterface.db.getRecordset(AutomationDefinition) - # ... rest of logic -``` - -#### 3. `_createAutomationEventHandler(automationId: str)` → `features/automation/mainAutomation.py` - -**Current Implementation:** -- Creates async handler function -- Gets event user -- Loads automation -- Executes automation with creator user context - -**Dependencies:** -- `getRootInterface()` - Interface factory (import from interface) -- `getInterface()` - Interface factories (import from interfaces) -- `executeAutomation()` - Will be in same module - -**After Move:** -```python -# features/automation/mainAutomation.py -def createAutomationEventHandler(automationId: str): - """Create event handler function for scheduled automation.""" - async def handler(): - # Uses interfaces and executeAutomation from same module - await executeAutomation(automationId, eventInterface) - return handler -``` - -#### 4. Utility Functions → `features/automation/subAutomationUtils.py` - -**Functions:** -- `_parseScheduleToCron(schedule: str)` - Parse schedule to cron kwargs -- `_planToPrompt(plan: Dict)` - Convert plan to prompt string -- `_replacePlaceholders(template: str, placeholders: Dict)` - Replace placeholders - -**Dependencies:** -- No external dependencies (pure utility functions) - ---- - -## Migration Plan - -### Phase 1: Create Feature Structure - -1. Create `features/automation/` directory -2. Create `features/automation/__init__.py` -3. Create `features/automation/mainAutomation.py` -4. Create `features/automation/subAutomationUtils.py` - -### Phase 2: Move Functions - -1. Move utility functions to `subAutomationUtils.py` -2. Move `executeAutomation()` to `mainAutomation.py` -3. Move `syncAutomationEvents()` to `mainAutomation.py` -4. Move `_createAutomationEventHandler()` to `mainAutomation.py` - -### Phase 3: Update Dependencies - -1. Update `features/featuresLifecycle.py` to use new feature -2. Update `routes/routeAdminAutomationEvents.py` to use new feature -3. Update any other call sites - -### Phase 4: Cleanup Interface - -1. Remove moved functions from `interfaceDbChatObjects.py` -2. Keep only data access methods (`getAutomationDefinition`, etc.) -3. Remove import from `features.chatPlayground` from interface - -### Phase 5: Update Documentation - -1. Update `BIDIRECTIONAL_IMPORTS.md` to reflect resolved dependency -2. Document new feature structure - ---- - -## Benefits - -### ✅ Architectural Benefits - -1. **Correct Separation of Concerns** - - Interface layer: Data access only - - Feature layer: Business logic and orchestration - -2. **Resolves Bidirectional Dependency** - - Eliminates `interfaces/ → features/` import - - Only `features/ → interfaces/` remains (correct direction) - -3. **Consistency with Existing Patterns** - - Matches other feature implementations - - Follows established architecture - -4. **Better Testability** - - Feature logic can be tested independently - - Interface layer remains focused - -### ✅ Maintainability Benefits - -1. **Clearer Code Organization** - - Automation logic in one place - - Easier to find and modify - -2. **Reduced Coupling** - - Interface layer doesn't depend on features - - Features depend on interfaces (correct direction) - -3. **Easier to Extend** - - New automation features can be added to feature module - - Interface layer remains stable - ---- - -## Risks and Considerations - -### 🟢 Low Risk - -- **Functionality preservation** - Logic doesn't change, only location -- **Interface methods remain** - Data access methods stay in interface -- **Lazy imports** - Already using lazy imports for event handlers - -### 🟡 Medium Risk - -- **Call site updates** - Need to update routes and lifecycle -- **Interface method access** - Feature needs to call interface methods -- **Event user context** - Need to ensure proper user context handling - -### 🔴 Potential Issues - -1. **Interface method access** - Feature needs `chatInterface` instance - - **Solution:** Pass interface instance as parameter or create in feature - -2. **Event handler context** - Event handlers need interface access - - **Solution:** Create interface instances in handler (already doing this) - -3. **Backward compatibility** - Existing code calling `chatInterface.executeAutomation()` - - **Solution:** Update all call sites, or create wrapper method in interface (deprecated) - ---- - -## Call Sites Analysis - -### Current Call Sites - -1. **`features/featuresLifecycle.py` (line 20)** - ```python - await chatInterface.syncAutomationEvents() - ``` - **Update:** Import and call feature directly - -2. **`routes/routeAdminAutomationEvents.py` (line 97)** - ```python - result = await chatInterface.syncAutomationEvents() - ``` - **Update:** Import and call feature directly - -3. **`interfaces/interfaceDbChatObjects.py` (lines 1683, 1714, 1744)** - ```python - asyncio.create_task(self.syncAutomationEvents()) - ``` - **Update:** Remove (will be handled by feature lifecycle) - -4. **`_createAutomationEventHandler()` (line 2105)** - ```python - await creatorInterface.executeAutomation(automationId) - ``` - **Update:** Call feature method instead - -### Proposed Call Sites - -1. **`features/featuresLifecycle.py`** - ```python - from modules.features.automation import mainAutomation - await mainAutomation.syncAutomationEvents(chatInterface) - ``` - -2. **`routes/routeAdminAutomationEvents.py`** - ```python - from modules.features.automation import mainAutomation - result = await mainAutomation.syncAutomationEvents(chatInterface) - ``` - -3. **`features/automation/mainAutomation.py` (event handler)** - ```python - from .mainAutomation import executeAutomation - await executeAutomation(automationId, eventInterface) - ``` - ---- - -## Implementation Checklist - -- [ ] Create `features/automation/` directory structure -- [ ] Create `features/automation/__init__.py` -- [ ] Create `features/automation/mainAutomation.py` -- [ ] Create `features/automation/subAutomationUtils.py` -- [ ] Move `_parseScheduleToCron()` to `subAutomationUtils.py` -- [ ] Move `_planToPrompt()` to `subAutomationUtils.py` -- [ ] Move `_replacePlaceholders()` to `subAutomationUtils.py` -- [ ] Move `executeAutomation()` to `mainAutomation.py` -- [ ] Move `syncAutomationEvents()` to `mainAutomation.py` -- [ ] Move `_createAutomationEventHandler()` to `mainAutomation.py` -- [ ] Update `features/featuresLifecycle.py` to use feature -- [ ] Update `routes/routeAdminAutomationEvents.py` to use feature -- [ ] Remove moved functions from `interfaceDbChatObjects.py` -- [ ] Remove `features.chatPlayground` import from `interfaceDbChatObjects.py` -- [ ] Update `BIDIRECTIONAL_IMPORTS.md` -- [ ] Test automation execution (manual) -- [ ] Test automation scheduling (scheduled) -- [ ] Verify no circular dependencies - ---- - -## Conclusion - -**✅ STRONGLY RECOMMENDED: Move to Features Layer** - -This refactoring is: -- **Architecturally correct** - Business logic belongs in features, not interfaces -- **Resolves dependency violation** - Eliminates `interfaces/ → features/` import -- **Consistent with patterns** - Matches existing feature implementations -- **Low risk** - Logic doesn't change, only location -- **Improves maintainability** - Clearer separation of concerns - -**Recommendation: PROCEED** with moving automation handler functionality to `features/automation/` following the plan above. - diff --git a/modules/BIDIRECTIONAL_IMPORTS.md b/modules/BIDIRECTIONAL_IMPORTS.md deleted file mode 100644 index 55a8bd43..00000000 --- a/modules/BIDIRECTIONAL_IMPORTS.md +++ /dev/null @@ -1,406 +0,0 @@ -# Bidirectional Import Analysis - -## Summary - -After refactoring extraction functions and automation handler, **ALL bidirectional dependencies have been RESOLVED**: - -**Current Status:** -- ✅ **interfaces/ → services/**: **RESOLVED** (no imports) -- ✅ **interfaces/ → features/**: **RESOLVED** (uses callback registry, no direct imports) -- ✅ **services/ → interfaces/**: **UNIDIRECTIONAL** (correct dependency direction) -- ✅ **services/ → features/**: **NONE** (no imports) -- ✅ **features/ → interfaces/**: **UNIDIRECTIONAL** (correct dependency direction) -- ✅ **features/ → services/**: **1 lazy import** (correct direction) - -**Result:** ✅ **ZERO VIOLATIONS** - Perfect architectural compliance achieved. - ---- - -## Dependency Diagram - -### Mermaid Diagram - -```mermaid -graph TB - %% Foundation Layer (no dependencies) - shared[shared/
Foundation] - datamodels[datamodels/
Foundation] - aicore[aicore/
Infrastructure] - connectors[connectors/
Infrastructure] - - %% Data Layer - interfaces[interfaces/
Data Access
✅ No violations] - - %% Business Logic Layer - services[services/
Business Logic
✅ Unidirectional] - workflows[workflows/
Business Logic] - - %% Feature Layer - features[features/
Features
✅ Unidirectional] - - %% API Layer - routes[routes/
API Layer] - security[security/
Security] - - %% Foundation dependencies - datamodels -->|imports| shared - aicore -->|imports| datamodels - aicore -->|imports| shared - connectors -->|imports| datamodels - connectors -->|imports| shared - - %% Interface layer (foundation only) - interfaces -->|imports| aicore - interfaces -->|imports| connectors - interfaces -->|imports| datamodels - interfaces -.->|callbackRegistry| shared - - %% Service layer (interfaces only) - services -->|✅ imports| interfaces - services -->|imports| aicore - services -->|imports| datamodels - services -->|imports| security - services -->|imports| shared - - %% Workflow layer - workflows -->|imports| aicore - workflows -->|imports| datamodels - workflows -->|imports| services - workflows -->|imports| shared - - %% Feature layer (interfaces + services) - features -->|✅ imports| interfaces - features -->|✅ imports| services - features -->|imports| datamodels - features -->|imports| workflows - features -->|imports| shared - - %% API layer - routes -->|imports| interfaces - routes -->|imports| features - routes -->|imports| services - routes -->|imports| security - routes -->|imports| datamodels - routes -->|imports| shared - - %% Security layer - security -->|imports| interfaces - security -->|imports| datamodels - security -->|imports| shared - - %% Styling - classDef foundation fill:#e1f5ff,stroke:#01579b,stroke-width:2px - classDef data fill:#f3e5f5,stroke:#4a148c,stroke-width:3px - classDef business fill:#e8f5e9,stroke:#1b5e20,stroke-width:2px - classDef feature fill:#fff3e0,stroke:#e65100,stroke-width:2px - classDef api fill:#fce4ec,stroke:#880e4f,stroke-width:2px - - class shared,datamodels,aicore,connectors foundation - class interfaces data - class services,workflows business - class features feature - class routes,security api -``` - -### Draw.io Diagram - -A detailed draw.io diagram is available in `DEPENDENCY_DIAGRAM.drawio` with: -- Color-coded layers (Foundation, Data, Business Logic, Features, API) -- Arrow directions showing import relationships -- ✅ markers on correct dependency directions -- Dashed line for callback registry pattern -- Status box showing zero violations - -**Key Visual Elements:** -- **Thick green arrows (✅)**: Correct dependency directions (services→interfaces, features→interfaces, features→services) -- **Dashed purple line**: Callback registry pattern (interfaces→shared, decoupled from features) -- **Color coding**: Each layer has distinct colors for easy identification -- **Status indicators**: ✅ markers show compliance, "No violations" labels confirm architectural correctness - ---- - -## Detailed Analysis - -### 1. interfaces/ → services/ ✅ RESOLVED - -**Current State:** -- ✅ **No imports from services/** in `interfaces/` -- All extraction-related functions moved to `services/serviceExtraction/` -- Dependency violations resolved - -**Impact:** Major architectural improvement - interfaces no longer depend on services. - ---- - -### 2. interfaces/ → features/ ✅ RESOLVED - -**Previous State:** -- `interfaceDbChatObjects.py` (line 1754): Lazy import in `_triggerAutomationSync()` helper method - ```python - from modules.features.automation import syncAutomationEvents - ``` - -**Current State:** -- ✅ **No imports from features/** in `interfaces/` -- Uses callback registry pattern (`shared.callbackRegistry`) for decoupled notifications -- Interface triggers callbacks without knowing which features are listening -- Feature registers callback in `featuresLifecycle.py` startup - -**Refactoring:** -- Created `shared/callbackRegistry.py` - decoupled event notification system -- Interface calls `callbackRegistry.trigger('automation.changed', self)` instead of importing feature -- Feature registers callback on startup: `callbackRegistry.register('automation.changed', onAutomationChanged)` - -**Impact:** Perfect separation - interface doesn't know about features, uses shared callback registry. - ---- - -### 3. services/ → interfaces/ ✅ CORRECT DIRECTION - -**Current State:** -- `serviceAi/mainServiceAi.py`: Imports `AiObjects` from `interfaceAiObjects` -- `serviceExtraction/mainServiceExtraction.py`: Lazy import from `interfaceDbComponentObjects` -- `serviceUtils/mainServiceUtils.py`: Lazy import from `interfaceDbChatObjects` -- `serviceTicket/mainServiceTicket.py`: Imports from `interfaceTicketObjects` -- `services/__init__.py`: Lazy imports from multiple interfaces - -**Impact:** This is **correct** - services should use interfaces for data access. This follows the dependency rule: `services/` → `interfaces/` ✅ - -**Note:** This is **unidirectional** (services → interfaces), not bidirectional. - ---- - -### 4. services/ → features/ ✅ NONE - -**Current State:** -- ✅ **No imports from features/** in `services/` - -**Impact:** Services correctly do not depend on features. - ---- - -### 5. features/ → interfaces/ ✅ CORRECT DIRECTION - -**Current State:** -- `features/automation/mainAutomation.py`: Imports from `interfaceDbChatObjects`, `interfaceDbAppObjects` -- `features/featuresLifecycle.py`: Imports from `interfaceDbAppObjects`, `interfaceDbChatObjects` (lazy) - -**Impact:** This is **correct** - features should use interfaces for data access. This follows the dependency rule: `features/` → `interfaces/` ✅ - -**Note:** This is **unidirectional** (features → interfaces), not bidirectional. - ---- - -### 6. features/ → services/ ✅ CORRECT DIRECTION - -**Current State:** -- `features/neutralizePlayground/mainNeutralizePlayground.py` (line 123): Lazy import from `serviceSharepoint` - -**Impact:** This is **correct** - features can use services. This follows the dependency rule: `features/` → `services/` ✅ - ---- - -## Complete Import Matrix (Fact-Based) - -### aicore/ -- **Imports from:** `datamodels/`, `shared/` -- **Imported by:** `interfaces/`, `services/`, `workflows/` -- **Bidirectional:** None ✅ - -### connectors/ -- **Imports from:** `datamodels/`, `shared/` -- **Imported by:** `interfaces/` -- **Bidirectional:** None ✅ - -### datamodels/ -- **Imports from:** `shared/` -- **Imported by:** `aicore/`, `connectors/`, `features/`, `interfaces/`, `routes/`, `security/`, `services/`, `workflows/` -- **Bidirectional:** None ✅ - -### features/ -- **Imports from:** `datamodels/`, `interfaces/`, `services/`, `shared/`, `workflows/` -- **Imported by:** `routes/` -- **✅ UNIDIRECTIONAL:** No longer imported by `interfaces/` - -**Detailed imports:** -- From `interfaces/`: `interfaceDbChatObjects`, `interfaceDbAppObjects` -- From `services/`: `serviceSharepoint` (lazy, in `neutralizePlayground`) -- From `features/`: `chatPlayground` (in `automation`), `syncDelta`, `chatAlthaus` (in `featuresLifecycle`) - -### interfaces/ -- **Imports from:** `aicore/`, `connectors/`, `datamodels/`, `shared/` -- **Imported by:** `features/`, `routes/`, `security/`, `services/` -- **✅ RESOLVED:** No longer imports from `services/` or `features/` - -**Detailed imports:** -- From `shared/`: `callbackRegistry` (for decoupled event notifications), `eventManagement` (for event removal in delete), `timeUtils`, `configuration`, `debugLogger` -- From `interfaces/`: Internal imports (`interfaceDbChatAccess`, `interfaceDbAppAccess`, `interfaceDbComponentAccess`) - -### routes/ -- **Imports from:** `datamodels/`, `features/`, `interfaces/`, `security/`, `services/`, `shared/` -- **Imported by:** None (top-level API layer) -- **Bidirectional:** None ✅ - -**Detailed imports:** -- From `interfaces/`: `interfaceDbChatObjects`, `interfaceDbAppObjects`, `interfaceDbComponentObjects`, `interfaceVoiceObjects` -- From `features/**: `features.automation`, `features.chatPlayground`, `features.neutralizePlayground` - -### security/ -- **Imports from:** `datamodels/`, `interfaces/`, `shared/` -- **Imported by:** `routes/`, `services/` -- **Bidirectional:** None ✅ - -**Detailed imports:** -- From `interfaces/**: `interfaceDbAppObjects` (lazy imports) - -### services/ -- **Imports from:** `aicore/`, `datamodels/`, `interfaces/`, `security/`, `shared/` -- **Imported by:** `features/`, `routes/`, `workflows/` -- **✅ UNIDIRECTIONAL:** Only imports from `interfaces/` (correct direction) -- **✅ RESOLVED:** No longer imported by `interfaces/` - -**Detailed imports:** -- From `interfaces/**: `interfaceAiObjects`, `interfaceDbComponentObjects` (lazy), `interfaceDbChatObjects` (lazy), `interfaceTicketObjects` -- From `services/**: Internal imports (service-to-service) - -### shared/ -- **Imports from:** None (foundation layer) -- **Imported by:** `aicore/`, `connectors/`, `datamodels/`, `features/`, `interfaces/`, `routes/`, `security/`, `services/`, `workflows/` -- **Bidirectional:** None ✅ - -### workflows/ -- **Imports from:** `aicore/`, `datamodels/`, `services/`, `shared/` -- **Imported by:** `features/` -- **Bidirectional:** None ✅ - -**Detailed imports:** -- From `services/**: `serviceGeneration` (lazy, in `methodAi.py`) - ---- - -## Refactoring Impact Summary - -### Before Refactoring: -- ❌ **interfaces/ ↔ services/**: Bidirectional (violations) - - `interfaces/` imported from `services/serviceExtraction/` (6 violations) - - `services/` imported from `interfaces/` (correct) -- ❌ **interfaces/ ↔ features/**: Bidirectional (violation) - - `interfaces/` imported from `features.chatPlayground` (1 violation) - - `features/` imported from `interfaces/` (correct) - -### After Refactoring: -- ✅ **interfaces/ → services/**: RESOLVED (no imports) -- ✅ **services/ → interfaces/**: UNIDIRECTIONAL (correct direction) -- ✅ **interfaces/ → features/**: RESOLVED (uses callback registry pattern) -- ✅ **features/ → interfaces/**: UNIDIRECTIONAL (correct direction) -- ✅ **features/ → services/**: CORRECT DIRECTION (1 lazy import) - ---- - -## Specific Import Details - -### interfaces/ → features/ ✅ RESOLVED - -**Previous:** `interfaceDbChatObjects.py` (line 1754) had lazy import from `features.automation` -```python -from modules.features.automation import syncAutomationEvents -``` - -**Current:** Uses `shared.callbackRegistry` pattern (line 1754): -- Interface calls: `callbackRegistry.trigger('automation.changed', self)` -- Feature registers callback in `featuresLifecycle.py`: `callbackRegistry.register('automation.changed', onAutomationChanged)` -- **Zero direct imports** from features in interfaces -- **Verification:** `grep "from modules.features" interfaces/` returns no matches ✅ - -### features/ → services/ (1 import, correct direction) - -**File:** `features/neutralizePlayground/mainNeutralizePlayground.py` -- **Line:** 123 -- **Import:** `from modules.services.serviceSharepoint.mainServiceSharepoint import SharepointService` -- **Type:** Lazy import (inside method) -- **Context:** Used for SharePoint file processing -- **Status:** ✅ CORRECT - Features can import from services - ---- - -## Recommendations - -### ✅ Completed Improvements -1. **Resolved interfaces/ → services/ violations** - Moved extraction functions to `serviceExtraction/` -2. **Resolved interfaces/ → features/ violations** - Moved automation handler to `features/automation/` -3. **Eliminated ALL bidirectional dependencies** - `interfaces/` no longer imports from `services/` or `features/` -4. **Implemented callback registry pattern** - Decoupled event notifications using `shared.callbackRegistry` -5. **Follows dependency rules perfectly** - All dependencies now follow correct direction: - - `services/` → `interfaces/` ✅ - - `features/` → `interfaces/` ✅ - - `features/` → `services/` ✅ - - `interfaces/` → `shared/` only ✅ - -### Best Practices -- ✅ Use lazy imports (inside functions) for dependencies when appropriate -- ✅ Services correctly depend on interfaces (unidirectional) -- ✅ Features correctly depend on interfaces and services (unidirectional) -- ✅ Interfaces completely independent (only foundation layers) -- ✅ Use callback registry for decoupled event notifications -- ✅ Document dependency relationships clearly -- ✅ Monitor for circular import errors - ---- - -## Dependency Rule Compliance - -### Current Rules: -- ✅ **features/** → **services/** ✅ (correct) -- ✅ **services/** → **interfaces/** ✅ (correct) -- ✅ **features/** → **interfaces/** ✅ (correct) - -### Status: -- ✅ **interfaces/** → **services/**: **RESOLVED** (was violation, now compliant) -- ✅ **services/** → **interfaces/**: **COMPLIANT** (correct direction) -- ✅ **features/** → **interfaces/**: **COMPLIANT** (correct direction) -- ✅ **features/** → **services/**: **COMPLIANT** (correct direction) -- ✅ **interfaces/** → **features/**: **RESOLVED** (was violation, now uses callback registry) - ---- - -## Conclusion - -The refactoring successfully resolved **ALL bidirectional dependencies**: -- ✅ **interfaces/ ↔ services/**: RESOLVED -- ✅ **interfaces/ ↔ features/**: RESOLVED (using callback registry pattern) - -The architecture now follows the intended dependency rules **perfectly**: -- `interfaces/` only imports from foundation layers (`aicore/`, `connectors/`, `datamodels/`, `shared/`) -- `services/` imports from `interfaces/` (correct direction) -- `features/` imports from `interfaces/` and `services/` (correct direction) -- **Zero violations** - perfect architectural compliance achieved through callback registry pattern - ---- - -## Architecture Layers - -The codebase follows a clean layered architecture: - -1. **Foundation Layer** (`shared/`, `datamodels/`) - - No dependencies on other modules - - Used by all layers - -2. **Infrastructure Layer** (`aicore/`, `connectors/`) - - Depends only on foundation - - Provides core capabilities - -3. **Data Access Layer** (`interfaces/`) - - Depends only on foundation and infrastructure - - Provides data access abstraction - -4. **Business Logic Layer** (`services/`, `workflows/`) - - Depends on interfaces and foundation - - Implements business logic - -5. **Feature Layer** (`features/`) - - Depends on interfaces, services, and foundation - - Implements user-facing features - -6. **API Layer** (`routes/`, `security/`) - - Depends on all layers - - Provides HTTP API endpoints diff --git a/modules/DEPENDENCY_DIAGRAM.drawio b/modules/DEPENDENCY_DIAGRAM.drawio deleted file mode 100644 index eabab094..00000000 --- a/modules/DEPENDENCY_DIAGRAM.drawio +++ /dev/null @@ -1 +0,0 @@ -5Z1bb6M4FMc/DdLOQyUuIZfHNm1nK3Wno8lepH1zwSSoBFeGNO1++jWNSXzLNKW2MR1ppCbHBJLz//n4cIAzXjRfP3/F4HH1B0ph4YV++uxFl14YBmEYes0/P31pLcFkZ1niPKW2g2GR/wep0afWTZ7CituwRqio80femKCyhEnN2QDGaMtvlqGCP+ojWELJsEhAIVv/ydN6Ra0j3z8M/A7z5ao9dNiOrEG7NTVUK5CiLWOKrrxojhGqd6/Wz3NYNO5rHbP73PWR0f03w7CsT/kAOT6G6e5TT6DYQN587YVjL4yI46OL367RpkxBnaPyC/3y9UvrEtyMwWanAdl0u8pruHgESTO6JRQQ26peF3Q4y4tijgqEXz8bwSCLs4zYqxqjB8iM+EE8md3vR1pnh80+UFkv6PEVv5qaniCu4TNjol74CtEa1viFbEJHwxlVhEIZtQptGYVb24oRd0xtgEK13O/64HbygnperQLxKlg386SSlGCHfh014phXY2xRDJAnCENJiNbMiXBTZhgQd2ySeoPhJxRiH7naWO1bVILGb4TlacEO/VqKTPtUJC9riDPiLlkRdohT5JJEMLLpeUKGqi/MiHcVerO5N43J6DfU7C1HxWtEq7TKlkUwzmKVbCMQjKaJJFtkYiJNeNlmI1m2/WRjZZtqkK2C+ClXiXYY4CS72FR5SdQiG9+iZZ4cU+2vMk9zTOYh0QwUeufaNIvhTCVacB/D0Lcy18a8ZiNFTmBMsy3CD1nR5KqiaMzIW6p9NkEmQvAbKYKfMUUyCJo1RRbkMMBnaNTcw+zJMhL0fJVYcBw3K4YNsWbC9JnZFIt4r1ZI1Zo5oc6/3zRzBrxArHfOZAkcwUQlw3Tqw1FmRYZImDMTmzJUMNngvH5RrDztACfFgpo/oQ5i7LKqA0yXMJBEkJwMy/S8KZCQd0kBqipPeL8eRFCHkGOJ8THvVWiDE8WZ7mtJB+AlpJuzZYrmp/zU34w/Y4U7WxuGTbL5xNd0VD6mR/iOSIp7kHMkyLmfZu0udr+NfootwYg7Es5191+w3dHOEdKOXiXf/+yTKQjdpoA9+2YJEOlwhAIxuHamILJLQTRMChyMA0MlYOQ2AWLtZwCxQKzMDGVFiIdLgoPxQBsFY7sUjHuggNbfTqFArDyyFLCrhSMUxMcKFR9dFSSc9FIwGS4FYpxwhISxLhImdkmYDpcER/MD8aJmZxLE/MAwCTOJhAQUxT1IHn7AZV61lQgbZEQXKahWtAr0UUwcTB60BQsxeTCMSHtnDhstDpVujXBEP7kAcQoP/JUwlgaRFEeIEMtMcVcipOhjNp0M+qg2agHBwWxSTAI7QxBaDgvhUCFwNIXQFg0spxBBH/VGLSA4mCSI2nWGwHaS0EfJUQ8E3FVMVzDQFQvEitXYcGbQR73xHRgIN944nhqIVYHBpAZ91Bv1UOBobiBO48HkBn3UHPWQwK8YjnCgLSKIHJg+Y+yj4qiJA/eSRG3RwHaSKFcbLVaS6N2Rp+DA3w06gEqS/ExNVyLEIqXhG5Z6rS1qIcLJdUIbD2+ej2rmoY/KohYMHE0cjYFgeKlodR8eCEIq4QgHYlzvzMGbGahmDvooLupZF9zLG7VBYDlvDPsoLtKnB06BgH0eZQDJolhcnHbFwPJlx7CP4qIGDPgY8dkhkIKKZgj6qC1qgMDJ0wRtENg+TeijrKgFAgevNYm3s3aGQCxLTQxD0EdNUQMEjp4pGosFppNDRVFxCBg4eH4gKtcZAcvnB5GiiugSAnzYH8AZghjJu4Ngt5wc9VE+1AKCo4uCNhAsLwpROFQQHFwW9EUDexAUcEn0lSC4ZcwMCjV8rnnxeXFLVJItuc4T1ASKfFk2BMEmkhND0zMiT0BxTgfWeZo2h1F2teAR45tS0Pe7Bpnt7XUfalIR+Ke0F1N1qYj848yc2qVip4gcoA+9Dj3ap0WSx3JfNxOeVnQEC1RTVp+n5QjINWsz4ux3d2Mz4+ypdWfLVynEhl1m4H5/3y4D/t43vLXnb/mCAG3IZQbr93fcMuFmVaNUs26WC+5MOy29Ln5/FycDLo5UC6JZFyvK2UwbuTnCTRM58uqy7SZnIHkpYFZ/JHUxoUR8ohKhNiXkmvLZ2VmjAX0glrw8+kxs/xqc9PTsRwQ5dVHVIUhVk2AuN/hjZ8a/Vz/uyJ+/b+5uz/+8ufu2YDrNfYc4282ac5w0fmv6BTddGMl8Wj8WOSgTqDV6JVM4TpTrcAgnaRRK0UvuQsun+0ai20gxp5RN6Tp0EyZvD43td+drh/8gILr6Hw== \ No newline at end of file diff --git a/modules/FEATURES_TO_INTERFACES_IMPORTS.md b/modules/FEATURES_TO_INTERFACES_IMPORTS.md deleted file mode 100644 index 32b0cc7f..00000000 --- a/modules/FEATURES_TO_INTERFACES_IMPORTS.md +++ /dev/null @@ -1,199 +0,0 @@ -# Features → Interfaces Import Analysis - -## Summary - -This document details all imports from `features/` modules to `interfaces/` modules. - -**Total Feature Modules:** 6 -**Modules Importing from Interfaces:** 2 (only `getRootInterface` - system-level function) -**Total Interface Imports:** 2 (both for `getRootInterface` only) - -**✅ REFACTORED:** All feature modules now use `services.getInterface()` to access interfaces, following the same pattern as `chatPlayground`. Only `getRootInterface()` remains as a direct import (system-level function, not user-specific). - ---- - -## Detailed Import List - -### 1. `features/featuresLifecycle.py` - -**Imports:** -- **Line 2:** `from modules.interfaces.interfaceDbAppObjects import getRootInterface` - - **Type:** Direct import - - **Usage:** Gets root interface to retrieve event user for feature initialization - - **Context:** Used in `start()` function to get event user for automation, syncDelta, and chatAlthaus features - - **Note:** `getRootInterface()` is a system-level function (no user required), so it remains as direct import - -**Refactored:** -- ✅ **Removed:** `from modules.interfaces.interfaceDbChatObjects import getInterface as getChatInterface` -- ✅ **Added:** `from modules.services import getInterface as getServices` -- ✅ **Changed:** Now uses `services.interfaceDbChat` instead of direct interface import -- **Pattern:** `services = getServices(eventUser, None)` then `services.interfaceDbChat` - -**Purpose:** Feature lifecycle management - initializes and manages all features on startup/shutdown. - ---- - -### 2. `features/automation/mainAutomation.py` - -**Imports:** -- **Line 14:** `from modules.interfaces.interfaceDbAppObjects import getRootInterface` - - **Type:** Direct import - - **Usage:** Gets root interface to retrieve event user (system-level function) - - **Context:** Used in `createAutomationEventHandler()` to get event user - - **Note:** `getRootInterface()` is a system-level function (no user required), so it remains as direct import - -**Refactored:** -- ✅ **Removed:** `from modules.interfaces.interfaceDbChatObjects import getInterface as getChatInterface` -- ✅ **Removed:** `from modules.interfaces.interfaceDbAppObjects import getInterface as getAppInterface` -- ✅ **Added:** `from modules.services import getInterface as getServices` -- ✅ **Changed:** All interface access now goes through services: - - `executeAutomation()`: Uses `services.interfaceDbApp` and `services.interfaceDbChat` - - `createAutomationEventHandler()`: Uses `eventServices.interfaceDbChat` and `eventServices.interfaceDbApp` -- **Pattern:** `services = getServices(user, None)` then `services.interfaceDbChat` or `services.interfaceDbApp` - -**Purpose:** Automation workflow execution and scheduling - handles automated workflow triggers and event scheduling. - ---- - -### 3. `features/chatPlayground/mainChatPlayground.py` - -**Imports:** -- ❌ **No imports from interfaces/** -- Uses `modules.services.getInterface` instead (indirect access through services layer) - -**Purpose:** Chat playground feature - interactive chat interface. - ---- - -### 4. `features/chatAlthaus/mainChatAlthaus.py` - -**Imports:** -- ❌ **No imports from interfaces/** -- Uses `modules.services.getInterface` instead (indirect access through services layer) - -**Purpose:** Chat Althaus data scheduler - scheduled data updates for Althaus preprocessing. - ---- - -### 5. `features/syncDelta/mainSyncDelta.py` - -**Imports:** -- ❌ **No imports from interfaces/** -- Uses `modules.services.getInterface` instead (indirect access through services layer) - -**Purpose:** Delta Group sync manager - synchronizes tickets to SharePoint. - ---- - -### 6. `features/neutralizePlayground/mainNeutralizePlayground.py` - -**Imports:** -- ❌ **No imports from interfaces/** -- Uses `modules.services.getInterface` instead (indirect access through services layer) - -**Purpose:** Neutralization playground - UI wrapper for data neutralization service. - ---- - -## Import Statistics - -### By Interface Module - -**`interfaceDbAppObjects`:** -- `getRootInterface`: 2 imports (system-level function, remains as direct import) - - `features/featuresLifecycle.py` (line 2) - - `features/automation/mainAutomation.py` (line 14) - -**`interfaceDbChatObjects`:** -- ✅ **Removed:** All direct imports refactored to use services layer -- Now accessed via `services.interfaceDbChat` after calling `getServices(user, None)` - -**`interfaceDbAppObjects.getInterface`:** -- ✅ **Removed:** All direct imports refactored to use services layer -- Now accessed via `services.interfaceDbApp` after calling `getServices(user, None)` - -### By Import Type - -- **Direct imports:** 2 (only `getRootInterface` - system-level function) - - `features/featuresLifecycle.py`: 1 - - `features/automation/mainAutomation.py`: 1 - -- **Services-based access:** All user-specific interface access now goes through services layer - - Pattern: `services = getServices(user, None)` then `services.interfaceDbChat` or `services.interfaceDbApp` - ---- - -## Architectural Notes - -### ✅ Correct Dependency Direction - -All imports follow the correct architectural direction: -- **features/** → **interfaces/** ✅ - -This is compliant with the dependency rules: -- Features can import from interfaces (correct) -- Features can import from services (correct) -- Features do NOT import from other features (except internal feature-to-feature imports) - -### Import Patterns - -1. **Direct Interface Access:** - - `features/automation/mainAutomation.py` - Directly imports interfaces for automation management - - `features/featuresLifecycle.py` - Directly imports `getRootInterface` for feature initialization - -2. **Indirect Access via Services:** - - `features/chatPlayground/mainChatPlayground.py` - - `features/chatAlthaus/mainChatAlthaus.py` - - `features/syncDelta/mainSyncDelta.py` - - `features/neutralizePlayground/mainNeutralizePlayground.py` - - These features use `modules.services.getInterface` which provides a service layer abstraction. - -### Usage Context - -**`interfaceDbAppObjects`:** -- Used for user management (`getRootInterface`, `getAppInterface`) -- Primarily for getting event user and creator user in automation context - -**`interfaceDbChatObjects`:** -- Used for chat/automation data access (`getChatInterface`) -- Used for automation execution, event syncing, and workflow management - ---- - -## Summary Table - -| Feature Module | Interface Imports | Import Type | Purpose | -|---------------|-------------------|-------------|---------| -| `featuresLifecycle.py` | `interfaceDbAppObjects.getRootInterface` | Direct | Feature initialization (system-level) | -| `featuresLifecycle.py` | ✅ Via `services.interfaceDbChat` | Services | Automation event sync | -| `automation/mainAutomation.py` | `interfaceDbAppObjects.getRootInterface` | Direct | Event user access (system-level) | -| `automation/mainAutomation.py` | ✅ Via `services.interfaceDbChat` | Services | Automation execution | -| `automation/mainAutomation.py` | ✅ Via `services.interfaceDbApp` | Services | User management | -| `chatPlayground/mainChatPlayground.py` | None | - | Uses services layer | -| `chatAlthaus/mainChatAlthaus.py` | None | - | Uses services layer | -| `syncDelta/mainSyncDelta.py` | None | - | Uses services layer | -| `neutralizePlayground/mainNeutralizePlayground.py` | None | - | Uses services layer | - ---- - -## Conclusion - -**Total Interface Imports:** 2 (only `getRootInterface` - system-level function) -- 2 direct imports (both for `getRootInterface`) - -**Modules Using Direct Interface Imports:** 2 out of 6 (only for `getRootInterface`) -- `features/featuresLifecycle.py` -- `features/automation/mainAutomation.py` - -**✅ REFACTORED:** All user-specific interface access now goes through services layer -- Pattern: `services = getServices(user, None)` then `services.interfaceDbChat` or `services.interfaceDbApp` -- Consistent with other feature modules (`chatPlayground`, `chatAlthaus`, `syncDelta`, `neutralizePlayground`) - -**Architectural Compliance:** ✅ **PERFECT** -- All imports follow correct direction (features → interfaces) -- Only system-level function (`getRootInterface`) remains as direct import -- All user-specific interface access goes through services layer -- Clean separation maintained -- Consistent pattern across all feature modules - diff --git a/modules/datamodels/datamodelChat.py b/modules/datamodels/datamodelChat.py index 4a678c8b..967e0d9f 100644 --- a/modules/datamodels/datamodelChat.py +++ b/modules/datamodels/datamodelChat.py @@ -62,7 +62,7 @@ class ChatLog(BaseModel): None, description="Performance metrics" ) parentId: Optional[str] = Field( - None, description="Parent log entry ID for hierarchical display" + None, description="Parent operation ID (operationId of parent operation) for hierarchical display" ) operationId: Optional[str] = Field( None, description="Operation ID to group related log entries" @@ -828,6 +828,7 @@ class TaskContext(BaseModel): failurePatterns: Optional[list[str]] = Field(default_factory=list) failedActions: Optional[list] = Field(default_factory=list) successfulActions: Optional[list] = Field(default_factory=list) + executedActions: Optional[list] = Field(default_factory=list, description="List of executed actions with action name, parameters, and step number") criteriaProgress: Optional[dict] = None # Stage 2 context fields (NEW) diff --git a/modules/interfaces/interfaceDbChatObjects.py b/modules/interfaces/interfaceDbChatObjects.py index 7e3c85e6..de4abc7e 100644 --- a/modules/interfaces/interfaceDbChatObjects.py +++ b/modules/interfaces/interfaceDbChatObjects.py @@ -570,7 +570,7 @@ class ChatObjects: allWorkflows = self.db.getRecordset(ChatWorkflow) filteredWorkflows = self._uam(ChatWorkflow, allWorkflows) - # If no pagination requested, return all items + # If no pagination requested, return all items (no sorting - frontend handles it) if pagination is None: return filteredWorkflows @@ -578,7 +578,7 @@ class ChatObjects: if pagination.filters: filteredWorkflows = self._applyFilters(filteredWorkflows, pagination.filters) - # Apply sorting (in order of sortFields) + # Apply sorting (in order of sortFields) - only if provided by frontend if pagination.sort: filteredWorkflows = self._applySorting(filteredWorkflows, pagination.sort) diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py index b7d7aea0..87afd365 100644 --- a/modules/services/serviceAi/mainServiceAi.py +++ b/modules/services/serviceAi/mainServiceAi.py @@ -205,10 +205,8 @@ Respond with ONLY a JSON object in this exact format: documentMetadata = None # Store document metadata (title, filename) from first iteration accumulationState = None # Track accumulation state for string accumulation - # Get parent log ID for iteration operations - parentLogId = None - if operationId: - parentLogId = self.services.chat.getOperationLogId(operationId) + # Get parent operation ID for iteration operations (parentId should be operationId, not log entry ID) + parentOperationId = operationId # Use the parent's operationId directly while iteration < maxIterations: iteration += 1 @@ -222,7 +220,7 @@ Respond with ONLY a JSON object in this exact format: "AI Call", f"Iteration {iteration}", "", - parentId=parentLogId + parentOperationId=parentOperationId ) # Build iteration prompt @@ -235,11 +233,14 @@ Respond with ONLY a JSON object in this exact format: logger.warning(f"Iteration {iteration}: No previous response available for continuation!") # Filter promptArgs to only include parameters that buildGenerationPrompt accepts - # buildGenerationPrompt accepts: outputFormat, userPrompt, title, extracted_content, continuationContext + # buildGenerationPrompt accepts: outputFormat, userPrompt, title, extracted_content, continuationContext, services filteredPromptArgs = { k: v for k, v in promptArgs.items() - if k in ['outputFormat', 'userPrompt', 'title', 'extracted_content'] + if k in ['outputFormat', 'userPrompt', 'title', 'extracted_content', 'services'] } + # Always include services if available + if not filteredPromptArgs.get('services') and hasattr(self, 'services'): + filteredPromptArgs['services'] = self.services # Rebuild prompt with continuation context using the provided prompt builder iterationPrompt = await promptBuilder(**filteredPromptArgs, continuationContext=continuationContext) @@ -266,9 +267,20 @@ Respond with ONLY a JSON object in this exact format: response = await self.callAi(request) result = response.content - # Update progress after AI call + # Track bytes for progress reporting + bytesReceived = len(result.encode('utf-8')) if result else 0 + totalBytesSoFar = sum(len(section.get('content', '').encode('utf-8')) if isinstance(section.get('content'), str) else 0 for section in allSections) + bytesReceived + + # Update progress after AI call with byte information if iterationOperationId: - self.services.chat.progressLogUpdate(iterationOperationId, 0.6, "AI response received") + # Format bytes for display (kB or MB) + if totalBytesSoFar < 1024: + bytesDisplay = f"{totalBytesSoFar}B" + elif totalBytesSoFar < 1024 * 1024: + bytesDisplay = f"{totalBytesSoFar / 1024:.1f}kB" + else: + bytesDisplay = f"{totalBytesSoFar / (1024 * 1024):.1f}MB" + self.services.chat.progressLogUpdate(iterationOperationId, 0.6, f"AI response received ({bytesDisplay})") # Write raw AI response to debug file if iteration == 1: @@ -469,8 +481,24 @@ Respond with ONLY a JSON object in this exact format: # The break can occur anywhere - in any section, at any depth allSections = JsonResponseHandler.mergeSectionsIntelligently(allSections, extractedSections, iteration) - # Log merged sections for debugging + # Calculate total bytes in merged content for progress display merged_json_str = json.dumps(allSections, indent=2, ensure_ascii=False) + totalBytesGenerated = len(merged_json_str.encode('utf-8')) + + # Update main operation with byte progress + if operationId: + # Format bytes for display + if totalBytesGenerated < 1024: + bytesDisplay = f"{totalBytesGenerated}B" + elif totalBytesGenerated < 1024 * 1024: + bytesDisplay = f"{totalBytesGenerated / 1024:.1f}kB" + else: + bytesDisplay = f"{totalBytesGenerated / (1024 * 1024):.1f}MB" + # Estimate progress based on iterations (rough estimate) + estimatedProgress = min(0.9, 0.4 + (iteration * 0.1)) + self.services.chat.progressLogUpdate(operationId, estimatedProgress, f"Pipeline: {bytesDisplay} (iteration {iteration})") + + # Log merged sections for debugging self.services.utils.writeDebugFile(merged_json_str, f"{debugPrefix}_merged_sections_iteration_{iteration}") # Check if we should continue (completion detection) @@ -485,14 +513,40 @@ Respond with ONLY a JSON object in this exact format: if shouldContinue: # Finish iteration operation (will continue with next iteration) if iterationOperationId: + # Show byte progress in iteration completion + iterBytes = len(result.encode('utf-8')) if result else 0 + if iterBytes < 1024: + iterBytesDisplay = f"{iterBytes}B" + elif iterBytes < 1024 * 1024: + iterBytesDisplay = f"{iterBytes / 1024:.1f}kB" + else: + iterBytesDisplay = f"{iterBytes / (1024 * 1024):.1f}MB" + self.services.chat.progressLogUpdate(iterationOperationId, 0.95, f"Completed ({iterBytesDisplay})") self.services.chat.progressLogFinish(iterationOperationId, True) continue else: # Done - finish iteration and update main operation if iterationOperationId: + # Show final byte count + finalBytes = len(merged_json_str.encode('utf-8')) + if finalBytes < 1024: + finalBytesDisplay = f"{finalBytes}B" + elif finalBytes < 1024 * 1024: + finalBytesDisplay = f"{finalBytes / 1024:.1f}kB" + else: + finalBytesDisplay = f"{finalBytes / (1024 * 1024):.1f}MB" + self.services.chat.progressLogUpdate(iterationOperationId, 0.95, f"Complete ({finalBytesDisplay})") self.services.chat.progressLogFinish(iterationOperationId, True) if operationId: - self.services.chat.progressLogUpdate(operationId, 0.95, f"Generation complete ({iteration} iterations, {len(allSections)} sections)") + # Show final size in main operation + finalBytes = len(merged_json_str.encode('utf-8')) + if finalBytes < 1024: + finalBytesDisplay = f"{finalBytes}B" + elif finalBytes < 1024 * 1024: + finalBytesDisplay = f"{finalBytes / 1024:.1f}kB" + else: + finalBytesDisplay = f"{finalBytes / (1024 * 1024):.1f}MB" + self.services.chat.progressLogUpdate(operationId, 0.95, f"Generation complete: {finalBytesDisplay} ({iteration} iterations, {len(allSections)} sections)") logger.info(f"Generation complete after {iteration} iterations: {len(allSections)} sections") break @@ -907,7 +961,7 @@ If no trackable items can be identified, return: {{"kpis": []}} # Debug: persist prompt/response for analysis with context-specific naming debugPrefix = debugType if debugType else "plan" self.services.utils.writeDebugFile(fullPrompt, f"{debugPrefix}_prompt") - response = await self.aiObjects.call(request) + response = await self.aiObjects.callWithTextContext(request) result = response.content or "" self.services.utils.writeDebugFile(result, f"{debugPrefix}_response") return result @@ -941,10 +995,8 @@ If no trackable items can be identified, return: {{"kpis": []}} workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" aiOperationId = f"ai_content_{workflowId}_{int(time.time())}" - # Get parent log ID if parent operation exists - parentLogId = None - if parentOperationId: - parentLogId = self.services.chat.getOperationLogId(parentOperationId) + # Use parent operation ID directly (parentId should be operationId, not log entry ID) + # parentOperationId is already the operationId of the parent # Start progress tracking with parent reference self.services.chat.progressLogStart( @@ -952,7 +1004,7 @@ If no trackable items can be identified, return: {{"kpis": []}} "AI content processing", "Content Processing", f"Format: {outputFormat or 'text'}", - parentId=parentLogId + parentOperationId=parentOperationId ) try: @@ -1122,7 +1174,7 @@ If no trackable items can be identified, return: {{"kpis": []}} self.services.utils.writeDebugFile(extractionPrompt, "content_extraction_prompt") # Call generic content parts processor - handles images, text, chunking, merging - extractionResponse = await self.aiObjects.call(extractionRequest) + extractionResponse = await self.callAi(extractionRequest) # Write debug file for extraction response if extractionResponse.content: @@ -1153,14 +1205,15 @@ If no trackable items can be identified, return: {{"kpis": []}} from modules.services.serviceGeneration.subPromptBuilderGeneration import buildGenerationPrompt generation_prompt = await buildGenerationPrompt( - outputFormat, prompt, title, content_for_generation, None + outputFormat, prompt, title, content_for_generation, None, self.services ) promptArgs = { "outputFormat": outputFormat, "userPrompt": prompt, "title": title, - "extracted_content": content_for_generation + "extracted_content": content_for_generation, + "services": self.services } self.services.chat.progressLogUpdate(aiOperationId, 0.4, "Calling AI for content generation") @@ -1169,6 +1222,7 @@ If no trackable items can be identified, return: {{"kpis": []}} if promptArgs: userPrompt = promptArgs.get("userPrompt") or promptArgs.get("user_prompt") + # Track generation progress - the looping function will update with byte progress generated_json = await self._callAiWithLooping( generation_prompt, options, @@ -1179,7 +1233,16 @@ If no trackable items can be identified, return: {{"kpis": []}} userPrompt=userPrompt ) - self.services.chat.progressLogUpdate(aiOperationId, 0.7, "Parsing generated JSON") + # Calculate final size for completion message + finalSize = len(generated_json.encode('utf-8')) if generated_json else 0 + if finalSize < 1024: + finalSizeDisplay = f"{finalSize}B" + elif finalSize < 1024 * 1024: + finalSizeDisplay = f"{finalSize / 1024:.1f}kB" + else: + finalSizeDisplay = f"{finalSize / (1024 * 1024):.1f}MB" + + self.services.chat.progressLogUpdate(aiOperationId, 0.7, f"Parsing generated JSON ({finalSizeDisplay})") try: extracted_json = self.services.utils.jsonExtractString(generated_json) generated_data = json.loads(extracted_json) @@ -1210,13 +1273,13 @@ If no trackable items can be identified, return: {{"kpis": []}} # Create separate operation for content rendering renderOperationId = f"{aiOperationId}_render" - renderParentLogId = self.services.chat.getOperationLogId(aiOperationId) + # Use aiOperationId directly as parentOperationId (operationId, not log entry ID) self.services.chat.progressLogStart( renderOperationId, "Content Rendering", "Rendering", f"Format: {outputFormat}", - parentId=renderParentLogId + parentOperationId=aiOperationId ) try: diff --git a/modules/services/serviceChat/mainServiceChat.py b/modules/services/serviceChat/mainServiceChat.py index 679b9d17..cb05279f 100644 --- a/modules/services/serviceChat/mainServiceChat.py +++ b/modules/services/serviceChat/mainServiceChat.py @@ -1015,10 +1015,19 @@ class ChatService: def createProgressLogger(self) -> ProgressLogger: return ProgressLogger(self.services) - def progressLogStart(self, operationId: str, serviceName: str, actionName: str, context: str = "", parentId: Optional[str] = None): - """Wrapper for ProgressLogger.startOperation""" + def progressLogStart(self, operationId: str, serviceName: str, actionName: str, context: str = "", parentOperationId: Optional[str] = None): + """Wrapper for ProgressLogger.startOperation + + Args: + operationId: Unique identifier for the operation + serviceName: Name of the service + actionName: Name of the action + context: Additional context information + parentOperationId: Optional parent operation ID (operationId of parent operation) + The parentId in ChatLog will be set to this parentOperationId + """ progressLogger = self._getProgressLogger() - return progressLogger.startOperation(operationId, serviceName, actionName, context, parentId) + return progressLogger.startOperation(operationId, serviceName, actionName, context, parentOperationId) def progressLogUpdate(self, operationId: str, progress: float, statusUpdate: str = ""): """Wrapper for ProgressLogger.updateOperation""" diff --git a/modules/services/serviceExtraction/mainServiceExtraction.py b/modules/services/serviceExtraction/mainServiceExtraction.py index 360bfc23..e8249180 100644 --- a/modules/services/serviceExtraction/mainServiceExtraction.py +++ b/modules/services/serviceExtraction/mainServiceExtraction.py @@ -34,13 +34,21 @@ class ExtractionService: if model is None or model.calculatePriceUsd is None: raise RuntimeError(f"FATAL: Required internal model '{modelDisplayName}' is not available. Check connector registration.") - def extractContent(self, documents: List[ChatDocument], options: ExtractionOptions) -> List[ContentExtracted]: + def extractContent( + self, + documents: List[ChatDocument], + options: ExtractionOptions, + operationId: Optional[str] = None, + parentOperationId: Optional[str] = None + ) -> List[ContentExtracted]: """ Extract content from a list of ChatDocument objects. Args: documents: List of ChatDocument objects to extract content from options: Extraction options including maxSize, chunkAllowed, mergeStrategy, etc. + operationId: Optional operation ID for progress logging (parent operation) + parentOperationId: Optional parent operation ID for hierarchical logging Returns: List of ContentExtracted objects, one per input document @@ -52,125 +60,172 @@ class ExtractionService: from modules.interfaces.interfaceDbComponentObjects import getInterface dbInterface = getInterface() + totalDocs = len(documents) + for i, doc in enumerate(documents): - logger.info(f"=== DOCUMENT {i}: {doc.fileName} ===") + logger.info(f"=== DOCUMENT {i + 1}/{totalDocs}: {doc.fileName} ===") logger.info(f"Initial MIME type: {doc.mimeType}") + # Create child operation for this document if parent operationId is provided + docOperationId = None + if operationId: + workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" + docOperationId = f"{operationId}_doc_{i}" + self.services.chat.progressLogStart( + docOperationId, + "Extracting Document", + f"Document {i + 1}/{totalDocs}", + doc.fileName[:50] + "..." if len(doc.fileName) > 50 else doc.fileName, + parentOperationId=operationId # Use operationId as parent (not parentOperationId) + ) + # Start timing for this document startTime = time.time() - # Resolve raw bytes for this document using interface - documentBytes = dbInterface.getFileData(doc.fileId) - if not documentBytes: - raise ValueError(f"No file data found for fileId={doc.fileId}") - - # Convert ChatDocument to the format expected by runExtraction - documentData = { - "id": doc.id, - "bytes": documentBytes, - "fileName": doc.fileName, - "mimeType": doc.mimeType - } - - ec = runExtraction( - extractorRegistry=self._extractorRegistry, - chunkerRegistry=self._chunkerRegistry, - documentBytes=documentData["bytes"], - fileName=documentData["fileName"], - mimeType=documentData["mimeType"], - options=options - ) - - # Log content parts metadata - logger.debug(f"Content parts: {len(ec.parts)}") - for j, part in enumerate(ec.parts): - logger.debug(f" Part {j}: {part.typeGroup} ({part.mimeType}) - {len(part.data) if part.data else 0} chars") - if part.metadata: - logger.debug(f" Metadata: {part.metadata}") - - # Attach document id and MIME type to parts if missing - for p in ec.parts: - if "documentId" not in p.metadata: - p.metadata["documentId"] = documentData["id"] or str(uuid.uuid4()) - if "documentMimeType" not in p.metadata: - p.metadata["documentMimeType"] = documentData["mimeType"] - - # Log chunking information - chunkedParts = [p for p in ec.parts if p.metadata.get("chunk", False)] - if chunkedParts: - logger.debug(f"=== CHUNKING RESULTS ===") - logger.debug(f"Total parts: {len(ec.parts)}") - logger.debug(f"Chunked parts: {len(chunkedParts)}") - for chunk in chunkedParts: - logger.debug(f" Chunk: {chunk.label} - {len(chunk.data)} chars (parent: {chunk.parentId})") - else: - logger.debug(f"No chunking needed - {len(ec.parts)} parts fit within size limits") - - # Calculate timing and emit stats - endTime = time.time() - processingTime = endTime - startTime - bytesSent = len(documentBytes) - bytesReceived = sum(len(part.data) if part.data else 0 for part in ec.parts) - - # Emit stats for extraction operation - - # Use internal extraction model for pricing - modelDisplayName = "Internal Document Extractor" - model = modelRegistry.getModel(modelDisplayName) - # Hard fail if model is missing; caller must ensure connectors are registered - if model is None or model.calculatePriceUsd is None: - raise RuntimeError(f"Pricing model not available: {modelDisplayName}") - priceUsd = model.calculatePriceUsd(processingTime, bytesSent, bytesReceived) - - # Create AiCallResponse with real calculation - # Use model.name for the response (API identifier), not displayName - aiResponse = AiCallResponse( - content="", # No content for extraction stats needed - modelName=model.name, - priceUsd=priceUsd, - processingTime=processingTime, - bytesSent=bytesSent, - bytesReceived=bytesReceived, - errorCount=0 - ) - - self.services.chat.storeWorkflowStat( - self.services.workflow, - aiResponse, - f"extraction.process.{doc.mimeType}" - ) - - # Write extraction results to debug file try: - from modules.shared.debugLogger import writeDebugFile - import json - # Create summary of extraction results for debug - extractionSummary = { - "documentName": doc.fileName, - "documentMimeType": doc.mimeType, - "partsCount": len(ec.parts), - "parts": [] - } - for part in ec.parts: - partSummary = { - "typeGroup": part.typeGroup, - "mimeType": part.mimeType, - "label": part.label, - "dataLength": len(part.data) if part.data else 0, - "metadata": part.metadata - } - # Include data preview for small parts (first 500 chars) - if part.data and len(part.data) <= 500: - partSummary["dataPreview"] = part.data[:500] - elif part.data: - partSummary["dataPreview"] = f"[Large data: {len(part.data)} chars - truncated]" - extractionSummary["parts"].append(partSummary) + if docOperationId: + self.services.chat.progressLogUpdate(docOperationId, 0.1, "Loading document data") - writeDebugFile(json.dumps(extractionSummary, indent=2, ensure_ascii=False), f"extraction_result_{doc.fileName}") - except Exception as e: - logger.debug(f"Failed to write extraction debug file: {str(e)}") + # Resolve raw bytes for this document using interface + documentBytes = dbInterface.getFileData(doc.fileId) + if not documentBytes: + if docOperationId: + self.services.chat.progressLogFinish(docOperationId, False) + raise ValueError(f"No file data found for fileId={doc.fileId}") + + if docOperationId: + self.services.chat.progressLogUpdate(docOperationId, 0.2, "Running extraction pipeline") + + # Convert ChatDocument to the format expected by runExtraction + documentData = { + "id": doc.id, + "bytes": documentBytes, + "fileName": doc.fileName, + "mimeType": doc.mimeType + } + + ec = runExtraction( + extractorRegistry=self._extractorRegistry, + chunkerRegistry=self._chunkerRegistry, + documentBytes=documentData["bytes"], + fileName=documentData["fileName"], + mimeType=documentData["mimeType"], + options=options + ) + + if docOperationId: + self.services.chat.progressLogUpdate(docOperationId, 0.7, f"Extracted {len(ec.parts)} parts") + + # Log content parts metadata + logger.debug(f"Content parts: {len(ec.parts)}") + for j, part in enumerate(ec.parts): + logger.debug(f" Part {j + 1}/{len(ec.parts)}: {part.typeGroup} ({part.mimeType}) - {len(part.data) if part.data else 0} chars") + if part.metadata: + logger.debug(f" Metadata: {part.metadata}") - results.append(ec) + # Attach document id and MIME type to parts if missing + for p in ec.parts: + if "documentId" not in p.metadata: + p.metadata["documentId"] = documentData["id"] or str(uuid.uuid4()) + if "documentMimeType" not in p.metadata: + p.metadata["documentMimeType"] = documentData["mimeType"] + + # Log chunking information + chunkedParts = [p for p in ec.parts if p.metadata.get("chunk", False)] + if chunkedParts: + logger.debug(f"=== CHUNKING RESULTS ===") + logger.debug(f"Total parts: {len(ec.parts)}") + logger.debug(f"Chunked parts: {len(chunkedParts)}") + for chunk in chunkedParts: + logger.debug(f" Chunk: {chunk.label} - {len(chunk.data)} chars (parent: {chunk.parentId})") + else: + logger.debug(f"No chunking needed - {len(ec.parts)} parts fit within size limits") + + if docOperationId: + self.services.chat.progressLogUpdate(docOperationId, 0.9, f"Processing complete: {len(ec.parts)} parts extracted") + + # Calculate timing and emit stats + endTime = time.time() + processingTime = endTime - startTime + bytesSent = len(documentBytes) + bytesReceived = sum(len(part.data) if part.data else 0 for part in ec.parts) + + # Emit stats for extraction operation + + # Use internal extraction model for pricing + modelDisplayName = "Internal Document Extractor" + model = modelRegistry.getModel(modelDisplayName) + # Hard fail if model is missing; caller must ensure connectors are registered + if model is None or model.calculatePriceUsd is None: + if docOperationId: + self.services.chat.progressLogFinish(docOperationId, False) + raise RuntimeError(f"Pricing model not available: {modelDisplayName}") + priceUsd = model.calculatePriceUsd(processingTime, bytesSent, bytesReceived) + + # Create AiCallResponse with real calculation + # Use model.name for the response (API identifier), not displayName + aiResponse = AiCallResponse( + content="", # No content for extraction stats needed + modelName=model.name, + priceUsd=priceUsd, + processingTime=processingTime, + bytesSent=bytesSent, + bytesReceived=bytesReceived, + errorCount=0 + ) + + self.services.chat.storeWorkflowStat( + self.services.workflow, + aiResponse, + f"extraction.process.{doc.mimeType}" + ) + + # Write extraction results to debug file + try: + from modules.shared.debugLogger import writeDebugFile + import json + # Create summary of extraction results for debug + extractionSummary = { + "documentName": doc.fileName, + "documentMimeType": doc.mimeType, + "partsCount": len(ec.parts), + "parts": [] + } + for part in ec.parts: + partSummary = { + "typeGroup": part.typeGroup, + "mimeType": part.mimeType, + "label": part.label, + "dataLength": len(part.data) if part.data else 0, + "metadata": part.metadata + } + # Include data preview for small parts (first 500 chars) + if part.data and len(part.data) <= 500: + partSummary["dataPreview"] = part.data[:500] + elif part.data: + partSummary["dataPreview"] = f"[Large data: {len(part.data)} chars - truncated]" + extractionSummary["parts"].append(partSummary) + + writeDebugFile(json.dumps(extractionSummary, indent=2, ensure_ascii=False), f"extraction_result_{doc.fileName}") + except Exception as e: + logger.debug(f"Failed to write extraction debug file: {str(e)}") + + results.append(ec) + + # Finish document operation successfully + if docOperationId: + self.services.chat.progressLogFinish(docOperationId, True) + + except Exception as e: + logger.error(f"Error extracting content from document {i + 1}/{totalDocs} ({doc.fileName}): {str(e)}") + if docOperationId: + try: + self.services.chat.progressLogFinish(docOperationId, False) + except: + pass # Don't fail on progress logging errors + # Continue with next document instead of failing completely + # This allows parallel processing to continue even if one document fails + continue return results @@ -481,7 +536,8 @@ class ExtractionService: # Extract content WITHOUT chunking if operationId: self.services.chat.progressLogUpdate(operationId, 0.1, f"Extracting content from {len(documents)} documents") - extractionResult = self.extractContent(documents, extractionOptions) + # Pass operationId as parentOperationId for hierarchical logging + extractionResult = self.extractContent(documents, extractionOptions, operationId=operationId, parentOperationId=parentOperationId) if not isinstance(extractionResult, list): if operationId: @@ -491,11 +547,9 @@ class ExtractionService: # Process parts (not chunks) with model-aware AI calls if operationId: self.services.chat.progressLogUpdate(operationId, 0.3, f"Processing {len(extractionResult)} extracted content parts") - # Get parent log ID for part operations - parentLogId = None - if operationId: - parentLogId = self.services.chat.getOperationLogId(operationId) - partResults = await self._processPartsWithMapping(extractionResult, prompt, aiObjects, options, operationId, parentLogId) + # Use parent operation ID directly (parentId should be operationId, not log entry ID) + parentOperationId = operationId # Use the parent's operationId directly + partResults = await self._processPartsWithMapping(extractionResult, prompt, aiObjects, options, operationId, parentOperationId) # Merge results using existing merging system if operationId: @@ -522,7 +576,7 @@ class ExtractionService: aiObjects: Any, options: Optional[AiCallOptions] = None, operationId: Optional[str] = None, - parentLogId: Optional[str] = None + parentOperationId: Optional[str] = None ) -> List[PartResult]: """Process content parts with model-aware chunking and proper mapping.""" @@ -569,7 +623,7 @@ class ExtractionService: "Content Processing", f"Part {part_index + 1}", f"Type: {part.typeGroup}", - parentId=parentLogId + parentOperationId=parentOperationId ) try: diff --git a/modules/services/serviceExtraction/subPromptBuilderExtraction.py b/modules/services/serviceExtraction/subPromptBuilderExtraction.py index 7b91579a..9bd503bc 100644 --- a/modules/services/serviceExtraction/subPromptBuilderExtraction.py +++ b/modules/services/serviceExtraction/subPromptBuilderExtraction.py @@ -99,6 +99,18 @@ async def buildExtractionPrompt( # Parse extraction intent if AI service is available extraction_intent = await _parseExtractionIntent(userPrompt, outputFormat, aiService, services) if aiService else userPrompt + # Extract user language for document language instruction + userLanguage = 'en' # Default fallback + if services: + try: + # Prefer detected language if available + if hasattr(services, 'currentUserLanguage') and services.currentUserLanguage: + userLanguage = services.currentUserLanguage + elif hasattr(services, 'user') and services.user and hasattr(services.user, 'language'): + userLanguage = services.user.language + except Exception: + pass + # Build base prompt with clear user prompt markers sanitized_user_prompt = services.utils.sanitizePromptContent(userPrompt, 'userinput') if services else userPrompt adaptive_prompt = f""" @@ -114,6 +126,8 @@ You are a document processing assistant that extracts and structures content fro TASK: Extract the actual content from the document and organize it into documents. For single documents, create one document entry. For multi-document requests, create multiple document entries. +LANGUAGE REQUIREMENT: All extracted content must be in the language '{userLanguage}'. Extract and preserve content in this language. + {extraction_intent} REQUIREMENTS: diff --git a/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py b/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py index 069601bc..bea6887c 100644 --- a/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py +++ b/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py @@ -362,7 +362,7 @@ class BaseRenderer(ABC): self.logger.debug(f"AI Style Template Prompt:") self.logger.debug(f"{styleTemplate}") - response = await aiService.aiObjects.call(request) + response = await aiService.callAi(request) # Save styling prompt and response to debug self.services.utils.writeDebugFile(styleTemplate, "renderer_styling_prompt") diff --git a/modules/services/serviceGeneration/renderers/rendererImage.py b/modules/services/serviceGeneration/renderers/rendererImage.py index ca51a73a..37b72c45 100644 --- a/modules/services/serviceGeneration/renderers/rendererImage.py +++ b/modules/services/serviceGeneration/renderers/rendererImage.py @@ -205,7 +205,7 @@ Return only the compressed prompt, no explanations. ) ) - response = await aiService.aiObjects.call(request) + response = await aiService.callAi(request) compressed = response.content.strip() # Validate the compressed prompt diff --git a/modules/services/serviceGeneration/renderers/rendererPdf.py b/modules/services/serviceGeneration/renderers/rendererPdf.py index 2c6ea293..053b72de 100644 --- a/modules/services/serviceGeneration/renderers/rendererPdf.py +++ b/modules/services/serviceGeneration/renderers/rendererPdf.py @@ -227,7 +227,7 @@ class RendererPdf(BaseRenderer): self.logger.warning("AI service not properly configured, using defaults") return default_styles - response = await ai_service.aiObjects.call(request) + response = await ai_service.callAi(request) # Check if response is valid if not response: diff --git a/modules/services/serviceGeneration/renderers/rendererPptx.py b/modules/services/serviceGeneration/renderers/rendererPptx.py index 8f10a9a2..d1cd2090 100644 --- a/modules/services/serviceGeneration/renderers/rendererPptx.py +++ b/modules/services/serviceGeneration/renderers/rendererPptx.py @@ -424,7 +424,7 @@ JSON ONLY. NO OTHER TEXT.""" self.logger.warning("AI service not properly configured, using defaults") return default_styles - response = await aiService.aiObjects.call(request) + response = await aiService.callAi(request) # Check if response is valid if not response: diff --git a/modules/services/serviceGeneration/renderers/rendererXlsx.py b/modules/services/serviceGeneration/renderers/rendererXlsx.py index b797aba3..37fd7470 100644 --- a/modules/services/serviceGeneration/renderers/rendererXlsx.py +++ b/modules/services/serviceGeneration/renderers/rendererXlsx.py @@ -346,7 +346,7 @@ class RendererXlsx(BaseRenderer): requestOptions.operationType = OperationTypeEnum.DATA_GENERATE request = AiCallRequest(prompt=styleTemplate, context="", options=requestOptions) - response = await aiService.aiObjects.call(request) + response = await aiService.callAi(request) import json import re diff --git a/modules/services/serviceGeneration/subPromptBuilderGeneration.py b/modules/services/serviceGeneration/subPromptBuilderGeneration.py index 91011539..d593a626 100644 --- a/modules/services/serviceGeneration/subPromptBuilderGeneration.py +++ b/modules/services/serviceGeneration/subPromptBuilderGeneration.py @@ -16,7 +16,8 @@ async def buildGenerationPrompt( userPrompt: str, title: str, extracted_content: str = None, - continuationContext: Dict[str, Any] = None + continuationContext: Dict[str, Any] = None, + services: Any = None ) -> str: """ Build the unified generation prompt using a single JSON template. @@ -28,10 +29,23 @@ async def buildGenerationPrompt( title: Title for the document extracted_content: Optional extracted content from documents to prepend to prompt continuationContext: Optional context from previous generation for continuation + services: Optional services instance for accessing user language Returns: Complete generation prompt string """ + # Extract user language for document language instruction + userLanguage = 'en' # Default fallback + if services: + try: + # Prefer detected language if available + if hasattr(services, 'currentUserLanguage') and services.currentUserLanguage: + userLanguage = services.currentUserLanguage + elif hasattr(services, 'user') and services.user and hasattr(services.user, 'language'): + userLanguage = services.user.language + except Exception: + pass + # Create a template - let AI generate title if not provided titleValue = title if title else "Generated Document" jsonTemplate = jsonTemplateDocument.replace("{{DOCUMENT_TITLE}}", titleValue) @@ -82,6 +96,8 @@ END OF USER REQUEST / USER PROMPT ⚠️ CONTINUATION MODE: Response was incomplete. Generate ONLY the remaining content. +LANGUAGE REQUIREMENT: All generated content must be in the language '{userLanguage}'. Generate all text, headings, paragraphs, and content in this language. + {continuationText} JSON structure template: @@ -92,6 +108,7 @@ Rules: - Reference elements shown above are ALREADY DELIVERED - DO NOT repeat them. - Generate ONLY the remaining content that comes AFTER the reference elements. - DO NOT regenerate the entire JSON structure - start directly with what comes next. +- All content must be in the language '{userLanguage}'. - Output JSON only; no markdown fences or extra text. Continue generating the remaining content now. @@ -124,6 +141,8 @@ EXTRACTED CONTENT FROM DOCUMENTS: END OF EXTRACTED CONTENT {'='*80} +LANGUAGE REQUIREMENT: All generated content must be in the language '{userLanguage}'. Generate all text, headings, paragraphs, and content in this language. If the extracted content is in a different language, translate it to '{userLanguage}' while preserving the structure and meaning. + Generate a VALID JSON response using the EXTRACTED CONTENT above as your data source. The JSON structure template below shows ONLY the structure pattern - the example values are NOT real data. You MUST use the actual data from EXTRACTED CONTENT above, NOT the example values from the template. @@ -136,6 +155,7 @@ Instructions: - Do NOT reuse example section IDs; create your own. - CRITICAL: Use the ACTUAL DATA from EXTRACTED CONTENT above, NOT the example values from the template. - Generate complete content based on the user request and the extracted content. Do NOT just give an instruction or comments. Deliver the complete response. +- All content must be in the language '{userLanguage}'. - IMPORTANT: Set a meaningful "filename" in each document with appropriate file extension (e.g., "prime_numbers.txt", "report.docx", "data.json"). The filename should reflect the content and task objective. - Output JSON only; no markdown fences or extra text. @@ -151,6 +171,8 @@ USER REQUEST / USER PROMPT: END OF USER REQUEST / USER PROMPT {'='*80} +LANGUAGE REQUIREMENT: All generated content must be in the language '{userLanguage}'. Generate all text, headings, paragraphs, and content in this language. + Generate a VALID JSON response for the user request. The template below shows ONLY the structure pattern - it is NOT existing content. JSON structure template: @@ -160,6 +182,7 @@ Instructions: - Return ONLY valid JSON (strict). No comments. No trailing commas. Use double quotes. - Do NOT reuse example section IDs; create your own. - Generate complete content based on the user request. Do NOT just give an instruction or comments. Deliver the complete response. +- All content must be in the language '{userLanguage}'. - IMPORTANT: Set a meaningful "filename" in each document with appropriate file extension (e.g., "prime_numbers.txt", "report.docx", "data.json"). The filename should reflect the content and task objective. - Output JSON only; no markdown fences or extra text. diff --git a/modules/services/serviceWeb/mainServiceWeb.py b/modules/services/serviceWeb/mainServiceWeb.py index b771cb9d..7ce5a72f 100644 --- a/modules/services/serviceWeb/mainServiceWeb.py +++ b/modules/services/serviceWeb/mainServiceWeb.py @@ -114,16 +114,14 @@ class WebService: self.services.chat.progressLogUpdate(operationId, 0.4, "Initiating") self.services.chat.progressLogUpdate(operationId, 0.6, f"Crawling {len(allUrls)} URLs") - # Get parent log ID for URL-level operations - parentLogId = None - if operationId: - parentLogId = self.services.chat.getOperationLogId(operationId) + # Use parent operation ID directly (parentId should be operationId, not log entry ID) + parentOperationId = operationId # Use the parent's operationId directly crawlResult = await self._performWebCrawl( instruction=instruction, urls=allUrls, maxDepth=maxDepth, - parentLogId=parentLogId + parentOperationId=parentOperationId ) if operationId: @@ -131,18 +129,95 @@ class WebService: self.services.chat.progressLogUpdate(operationId, 0.95, "Completed") self.services.chat.progressLogFinish(operationId, True) - # Return consolidated result + # Calculate statistics about crawl results + totalResults = len(crawlResult) if isinstance(crawlResult, list) else 1 + totalContentLength = 0 + urlsWithContent = 0 + + # Analyze crawl results to gather statistics + if isinstance(crawlResult, list): + for item in crawlResult: + if isinstance(item, dict): + if item.get("url"): + urlsWithContent += 1 + content = item.get("content", "") + if isinstance(content, str): + totalContentLength += len(content) + elif isinstance(content, dict): + # Estimate size from dict + totalContentLength += len(str(content)) + elif isinstance(crawlResult, dict): + if crawlResult.get("url"): + urlsWithContent = 1 + content = crawlResult.get("content", "") + if isinstance(content, str): + totalContentLength = len(content) + elif isinstance(content, dict): + totalContentLength = len(str(content)) + + # Convert crawl results into sections format for generic validator + sections = [] + if isinstance(crawlResult, list): + for idx, item in enumerate(crawlResult): + if isinstance(item, dict): + section = { + "id": f"result_{idx}", + "content_type": "paragraph", + "title": item.get("url", f"Result {idx + 1}"), + "order": idx + } + # Add content preview + content = item.get("content", "") + if isinstance(content, str) and content: + section["textPreview"] = content[:200] + ("..." if len(content) > 200 else "") + sections.append(section) + elif isinstance(crawlResult, dict): + section = { + "id": "result_0", + "content_type": "paragraph", + "title": crawlResult.get("url", "Research Result"), + "order": 0 + } + content = crawlResult.get("content", "") + if isinstance(content, str) and content: + section["textPreview"] = content[:200] + ("..." if len(content) > 200 else "") + sections.append(section) + + # Return consolidated result with metadata in format that generic validator understands result = { + "metadata": { + "title": suggestedFilename or instruction[:100] if instruction else "Web Research Results", + "extraction_method": "web_crawl", + "research_depth": finalResearchDepth, + "max_depth": maxDepth, + "country": countryCode, + "language": languageCode, + "urls_crawled": allUrls[:20], # First 20 URLs for reference + "total_urls": len(allUrls), + "urls_with_content": urlsWithContent, + "total_content_length": totalContentLength, + "crawl_date": self.services.utils.timestampGetUtc() if hasattr(self.services, 'utils') else None + }, + "sections": sections, + "statistics": { + "sectionCount": len(sections), + "total_urls": len(allUrls), + "results_count": totalResults, + "urls_with_content": urlsWithContent, + "total_content_length": totalContentLength + }, + # Keep original structure for backward compatibility "instruction": instruction, "urls_crawled": allUrls, "total_urls": len(allUrls), "results": crawlResult, - "total_results": len(crawlResult) if isinstance(crawlResult, list) else 1 + "total_results": totalResults } # Add suggested filename if available if suggestedFilename: result["suggested_filename"] = suggestedFilename + result["metadata"]["suggested_filename"] = suggestedFilename return result @@ -311,7 +386,7 @@ Return ONLY valid JSON, no additional text: instruction: str, urls: List[str], maxDepth: int = 2, - parentLogId: Optional[str] = None + parentOperationId: Optional[str] = None ) -> List[Dict[str, Any]]: """Perform web crawl on list of URLs - calls plugin for each URL individually.""" crawlResults = [] @@ -320,7 +395,7 @@ Return ONLY valid JSON, no additional text: for urlIndex, url in enumerate(urls): # Create separate operation for each URL with parent reference urlOperationId = None - if parentLogId: + if parentOperationId: workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" urlOperationId = f"web_crawl_url_{workflowId}_{urlIndex}_{int(time.time())}" self.services.chat.progressLogStart( @@ -328,21 +403,23 @@ Return ONLY valid JSON, no additional text: "Web Crawl", f"URL {urlIndex + 1}", url[:50] + "..." if len(url) > 50 else url, - parentId=parentLogId + parentOperationId=parentOperationId ) try: - logger.info(f"Crawling URL: {url}") + logger.info(f"Crawling URL {urlIndex + 1}/{len(urls)}: {url}") if urlOperationId: - self.services.chat.progressLogUpdate(urlOperationId, 0.3, "Initiating") + displayUrl = url[:50] + "..." if len(url) > 50 else url + self.services.chat.progressLogUpdate(urlOperationId, 0.2, f"Crawling: {displayUrl}") + self.services.chat.progressLogUpdate(urlOperationId, 0.3, "Initiating crawl") # Build crawl prompt model for single URL crawlPromptModel = AiCallPromptWebCrawl( instruction=instruction, url=url, # Single URL maxDepth=maxDepth, - maxWidth=50 + maxWidth=5 # Default: 5 pages per level ) crawlPrompt = crawlPromptModel.model_dump_json(exclude_none=True, indent=2) @@ -356,16 +433,19 @@ Return ONLY valid JSON, no additional text: resultFormat="json" ) - # Use unified callAiContent method + if urlOperationId: + self.services.chat.progressLogUpdate(urlOperationId, 0.4, "Calling crawl connector") + + # Use unified callAiContent method with parentOperationId for hierarchical logging crawlResponse = await self.services.ai.callAiContent( prompt=crawlPrompt, options=crawlOptions, - outputFormat="json" + outputFormat="json", + parentOperationId=urlOperationId # Pass URL operation ID as parent for sub-URL logging ) if urlOperationId: - self.services.chat.progressLogUpdate(urlOperationId, 0.8, "Completed") - self.services.chat.progressLogFinish(urlOperationId, True) + self.services.chat.progressLogUpdate(urlOperationId, 0.7, "Processing crawl results") # Extract content from AiResponse crawlResult = crawlResponse.content @@ -387,16 +467,30 @@ Return ONLY valid JSON, no additional text: else: crawlData = crawlResult + # Process crawl results and create hierarchical progress logging for sub-URLs + if urlOperationId: + self.services.chat.progressLogUpdate(urlOperationId, 0.8, "Processing crawl results") + + # Recursively process crawl results to find nested URLs and create child operations + processedResults = self._processCrawlResultsWithHierarchy(crawlData, url, urlOperationId, maxDepth, 0) + + # Count total URLs crawled (including sub-URLs) for progress message + totalUrlsCrawled = self._countUrlsInResults(processedResults) + # Ensure it's a list of results - if isinstance(crawlData, list): - crawlResults.extend(crawlData) - elif isinstance(crawlData, dict): - if "results" in crawlData: - crawlResults.extend(crawlData["results"]) - else: - crawlResults.append(crawlData) + if isinstance(processedResults, list): + crawlResults.extend(processedResults) + elif isinstance(processedResults, dict): + crawlResults.append(processedResults) else: - crawlResults.append({"url": url, "content": str(crawlData)}) + crawlResults.append({"url": url, "content": str(processedResults)}) + + if urlOperationId: + if totalUrlsCrawled > 1: + self.services.chat.progressLogUpdate(urlOperationId, 0.9, f"Crawled {totalUrlsCrawled} URLs (including sub-URLs)") + else: + self.services.chat.progressLogUpdate(urlOperationId, 0.9, "Crawl completed") + self.services.chat.progressLogFinish(urlOperationId, True) except Exception as e: logger.error(f"Error crawling URL {url}: {str(e)}") @@ -405,4 +499,145 @@ Return ONLY valid JSON, no additional text: crawlResults.append({"url": url, "error": str(e)}) return crawlResults + + def _processCrawlResultsWithHierarchy( + self, + crawlData: Any, + parentUrl: str, + parentOperationId: Optional[str], + maxDepth: int, + currentDepth: int + ) -> List[Dict[str, Any]]: + """ + Recursively process crawl results to create hierarchical progress logging for sub-URLs. + + Args: + crawlData: Crawl result data (dict, list, or other) + parentUrl: Parent URL being crawled + parentOperationId: Parent operation ID for hierarchical logging + maxDepth: Maximum crawl depth + currentDepth: Current depth in the crawl tree + + Returns: + List of processed crawl results + """ + import time + results = [] + + # Handle list of results + if isinstance(crawlData, list): + for idx, item in enumerate(crawlData): + if isinstance(item, dict): + # Check if this item has sub-URLs or nested results + itemUrl = item.get("url") or item.get("source") or parentUrl + + # Create child operation for sub-URL if we're not at max depth + if currentDepth < maxDepth and parentOperationId: + # Check if this item has nested results or children + hasNestedResults = "results" in item or "children" in item or "subUrls" in item + + if hasNestedResults or (itemUrl != parentUrl and currentDepth > 0): + # This is a sub-URL - create child operation + workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" + subUrlOperationId = f"{parentOperationId}_sub_{idx}_{int(time.time())}" + self.services.chat.progressLogStart( + subUrlOperationId, + "Crawling Sub-URL", + f"Depth {currentDepth + 1}", + itemUrl[:50] + "..." if len(itemUrl) > 50 else itemUrl, + parentOperationId=parentOperationId + ) + + try: + # Process nested results recursively + if "results" in item: + nestedResults = self._processCrawlResultsWithHierarchy( + item["results"], itemUrl, subUrlOperationId, maxDepth, currentDepth + 1 + ) + item["results"] = nestedResults + elif "children" in item: + nestedResults = self._processCrawlResultsWithHierarchy( + item["children"], itemUrl, subUrlOperationId, maxDepth, currentDepth + 1 + ) + item["children"] = nestedResults + elif "subUrls" in item: + nestedResults = self._processCrawlResultsWithHierarchy( + item["subUrls"], itemUrl, subUrlOperationId, maxDepth, currentDepth + 1 + ) + item["subUrls"] = nestedResults + + self.services.chat.progressLogUpdate(subUrlOperationId, 0.9, "Completed") + self.services.chat.progressLogFinish(subUrlOperationId, True) + except Exception as e: + logger.error(f"Error processing sub-URL {itemUrl}: {str(e)}") + if subUrlOperationId: + self.services.chat.progressLogFinish(subUrlOperationId, False) + + results.append(item) + else: + results.append(item) + + # Handle dict with results array + elif isinstance(crawlData, dict): + if "results" in crawlData: + # Process nested results + nestedResults = self._processCrawlResultsWithHierarchy( + crawlData["results"], parentUrl, parentOperationId, maxDepth, currentDepth + ) + crawlData["results"] = nestedResults + results.append(crawlData) + elif "children" in crawlData: + # Process children + nestedResults = self._processCrawlResultsWithHierarchy( + crawlData["children"], parentUrl, parentOperationId, maxDepth, currentDepth + ) + crawlData["children"] = nestedResults + results.append(crawlData) + elif "subUrls" in crawlData: + # Process sub-URLs + nestedResults = self._processCrawlResultsWithHierarchy( + crawlData["subUrls"], parentUrl, parentOperationId, maxDepth, currentDepth + ) + crawlData["subUrls"] = nestedResults + results.append(crawlData) + else: + # Single result dict + results.append(crawlData) + else: + # Other types - wrap in dict + results.append({"url": parentUrl, "content": str(crawlData)}) + + return results + + def _countUrlsInResults(self, results: Any) -> int: + """ + Recursively count total URLs in crawl results (including nested sub-URLs). + + Args: + results: Crawl results (dict, list, or other) + + Returns: + Total count of URLs found + """ + count = 0 + + if isinstance(results, list): + for item in results: + count += self._countUrlsInResults(item) + elif isinstance(results, dict): + # Count this URL if it has a url field + if "url" in results or "source" in results: + count += 1 + # Recursively count nested results + if "results" in results: + count += self._countUrlsInResults(results["results"]) + if "children" in results: + count += self._countUrlsInResults(results["children"]) + if "subUrls" in results: + count += self._countUrlsInResults(results["subUrls"]) + elif isinstance(results, str): + # Single URL string + count = 1 + + return count diff --git a/modules/shared/progressLogger.py b/modules/shared/progressLogger.py index bbc000ae..51207d62 100644 --- a/modules/shared/progressLogger.py +++ b/modules/shared/progressLogger.py @@ -24,7 +24,7 @@ class ProgressLogger: self.finishedOperations = set() # Track finished operations to avoid repeated warnings self.operationLogIds = {} # Map operationId to the log entry ID for parent reference - def startOperation(self, operationId: str, serviceName: str, actionName: str, context: str = "", parentId: Optional[str] = None): + def startOperation(self, operationId: str, serviceName: str, actionName: str, context: str = "", parentOperationId: Optional[str] = None): """Start a new long-running operation. Args: @@ -32,7 +32,8 @@ class ProgressLogger: serviceName: Name of the service (e.g., "Extract", "AI", "Generate") actionName: Name of the action being performed context: Additional context information - parentId: Optional parent log entry ID for hierarchical display + parentOperationId: Optional parent operation ID (operationId of parent operation) for hierarchical display + The parentId in ChatLog will be set to this parentOperationId """ # Remove from finished operations if it was there (for restart scenarios) self.finishedOperations.discard(operationId) @@ -42,9 +43,10 @@ class ProgressLogger: 'action': actionName, 'context': context, 'startTime': time.time(), - 'parentId': parentId + 'parentOperationId': parentOperationId # Store parent's operationId, not log entry ID } - logId = self._logProgress(operationId, 0.0, f"Starting {actionName}", parentId=parentId) + # Use parentOperationId as parentId in ChatLog (parentId should be the operationId of parent) + logId = self._logProgress(operationId, 0.0, f"Starting {actionName}", parentOperationId=parentOperationId) if logId: self.operationLogIds[operationId] = logId logger.debug(f"Started operation {operationId}: {serviceName} - {actionName}") @@ -70,9 +72,9 @@ class ProgressLogger: op = self.activeOperations[operationId] context = f"{op['context']} {statusUpdate}".strip() - # Use the same parentId as the start operation - all logs (start/update/finish) share the same parent - parentId = op.get('parentId') - self._logProgress(operationId, progress, context, parentId=parentId) + # Use the same parentOperationId as the start operation - all logs (start/update/finish) share the same parent + parentOperationId = op.get('parentOperationId') + self._logProgress(operationId, progress, context, parentOperationId=parentOperationId) logger.debug(f"Updated operation {operationId}: {progress:.2f} - {context}") def finishOperation(self, operationId: str, success: bool = True): @@ -93,11 +95,11 @@ class ProgressLogger: finalProgress = 1.0 if success else 0.0 status = "Done" if success else "Failed" - # Use the same parentId as the start operation - all logs (start/update/finish) share the same parent - parentId = op.get('parentId') + # Use the same parentOperationId as the start operation - all logs (start/update/finish) share the same parent + parentOperationId = op.get('parentOperationId') # Create completion log BEFORE removing from activeOperations - self._logProgress(operationId, finalProgress, status, parentId=parentId) + self._logProgress(operationId, finalProgress, status, parentOperationId=parentOperationId) # Log completion time duration = time.time() - op['startTime'] @@ -111,14 +113,15 @@ class ProgressLogger: # Mark as finished to prevent repeated warnings from updateOperation calls self.finishedOperations.add(operationId) - def _logProgress(self, operationId: str, progress: float, status: str, parentId: Optional[str] = None) -> Optional[str]: + def _logProgress(self, operationId: str, progress: float, status: str, parentOperationId: Optional[str] = None) -> Optional[str]: """Create standardized ChatLog entry. Args: operationId: Unique identifier for the operation progress: Progress value between 0.0 and 1.0 status: Status information for the log entry - parentId: Optional parent log entry ID for hierarchical display + parentOperationId: Optional parent operation ID (operationId of parent operation) for hierarchical display + This will be set as parentId in ChatLog (parentId = operationId of parent) Returns: The created log entry ID, or None if creation failed @@ -134,6 +137,7 @@ class ProgressLogger: logger.warning(f"Cannot log progress: no workflow available") return None + # parentId in ChatLog should be the operationId of the parent operation, not the log entry ID logData = { "workflowId": workflow.id, "message": message, @@ -141,7 +145,7 @@ class ProgressLogger: "status": status, "progress": progress, "operationId": operationId, - "parentId": parentId + "parentId": parentOperationId # Set to parent's operationId, not log entry ID } try: diff --git a/modules/workflows/methods/methodAi.py b/modules/workflows/methods/methodAi.py index f3e6aed6..eee848f7 100644 --- a/modules/workflows/methods/methodAi.py +++ b/modules/workflows/methods/methodAi.py @@ -130,8 +130,9 @@ class MethodAi(MethodBase): processDocumentsIndividually=True ) - # Extract content using extraction service - extractedResults = self.services.extraction.extractContent(chatDocuments, extractionOptions) + # Extract content using extraction service with hierarchical progress logging + # Pass operationId for per-document progress tracking + extractedResults = self.services.extraction.extractContent(chatDocuments, extractionOptions, operationId=operationId) # Combine all ContentParts from all extracted results contentParts = [] diff --git a/modules/workflows/methods/methodContext.py b/modules/workflows/methods/methodContext.py index 604093bb..8bd16f9b 100644 --- a/modules/workflows/methods/methodContext.py +++ b/modules/workflows/methods/methodContext.py @@ -297,13 +297,11 @@ class MethodContext(MethodBase): processDocumentsIndividually=True ) - # Get parent log ID for document-level operations - parentLogId = self.services.chat.getOperationLogId(operationId) - - # Call extraction service + # Call extraction service with hierarchical progress logging self.services.chat.progressLogUpdate(operationId, 0.4, "Initiating") self.services.chat.progressLogUpdate(operationId, 0.5, f"Extracting content from {len(chatDocuments)} documents") - extractedResults = self.services.extraction.extractContent(chatDocuments, extractionOptions) + # Pass operationId for hierarchical per-document progress logging + extractedResults = self.services.extraction.extractContent(chatDocuments, extractionOptions, operationId=operationId) # Build ActionDocuments from ContentExtracted results self.services.chat.progressLogUpdate(operationId, 0.8, "Building result documents") diff --git a/modules/workflows/methods/methodOutlook.py b/modules/workflows/methods/methodOutlook.py index 04d02830..033b5283 100644 --- a/modules/workflows/methods/methodOutlook.py +++ b/modules/workflows/methods/methodOutlook.py @@ -326,7 +326,21 @@ class MethodOutlook(MethodBase): - filter (str, optional): Sender, query operators, or subject text. - outputMimeType (str, optional): MIME type for output file. Options: "application/json" (default), "text/plain", "text/csv". Default: "application/json". """ + import time + operationId = None try: + # Init progress logger + workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" + operationId = f"outlook_read_{workflowId}_{int(time.time())}" + + # Start progress tracking + self.services.chat.progressLogStart( + operationId, + "Read Emails", + "Outlook Email Reading", + f"Folder: {parameters.get('folder', 'Inbox')}" + ) + connectionReference = parameters.get("connectionReference") folder = parameters.get("folder", "Inbox") limit = parameters.get("limit", 10) @@ -334,8 +348,12 @@ class MethodOutlook(MethodBase): outputMimeType = parameters.get("outputMimeType", "application/json") if not connectionReference: + if operationId: + self.services.chat.progressLogFinish(operationId, False) return ActionResult.isFailure(error="Connection reference is required") + self.services.chat.progressLogUpdate(operationId, 0.2, "Validating parameters") + # Validate limit parameter if limit <= 0: limit = 1000 @@ -351,11 +369,14 @@ class MethodOutlook(MethodBase): # Get Microsoft connection + self.services.chat.progressLogUpdate(operationId, 0.3, "Getting Microsoft connection") connection = self._getMicrosoftConnection(connectionReference) if not connection: + self.services.chat.progressLogFinish(operationId, False) return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference") # Read emails using Microsoft Graph API + self.services.chat.progressLogUpdate(operationId, 0.4, "Reading emails from Microsoft Graph API") try: # Microsoft Graph API endpoint for messages graph_url = "https://graph.microsoft.com/v1.0" @@ -387,6 +408,11 @@ class MethodOutlook(MethodBase): # If using $search, remove $orderby as they can't be combined if "$search" in params: params.pop("$orderby", None) + + # If using $filter with contains(), remove $orderby as they can't be combined + # Microsoft Graph API doesn't support contains() with orderby + if "$filter" in params and "contains(" in params["$filter"].lower(): + params.pop("$orderby", None) # Filter applied @@ -403,6 +429,7 @@ class MethodOutlook(MethodBase): response.raise_for_status() + self.services.chat.progressLogUpdate(operationId, 0.7, "Processing email data") emails_data = response.json() email_data = { "emails": emails_data.get("value", []), @@ -420,22 +447,34 @@ class MethodOutlook(MethodBase): except ImportError: logger.error("requests module not available") + if operationId: + self.services.chat.progressLogFinish(operationId, False) return ActionResult.isFailure(error="requests module not available") except requests.exceptions.HTTPError as e: if e.response.status_code == 400: logger.error(f"Bad Request (400) - Invalid filter or parameter: {e.response.text}") + if operationId: + self.services.chat.progressLogFinish(operationId, False) return ActionResult.isFailure(error=f"Invalid filter syntax. Please check your filter parameter. Error: {e.response.text}") elif e.response.status_code == 401: logger.error("Unauthorized (401) - Access token may be expired or invalid") + if operationId: + self.services.chat.progressLogFinish(operationId, False) return ActionResult.isFailure(error="Authentication failed. Please check your connection and try again.") elif e.response.status_code == 403: logger.error("Forbidden (403) - Insufficient permissions to access emails") + if operationId: + self.services.chat.progressLogFinish(operationId, False) return ActionResult.isFailure(error="Insufficient permissions to read emails from this folder.") else: logger.error(f"HTTP Error {e.response.status_code}: {e.response.text}") + if operationId: + self.services.chat.progressLogFinish(operationId, False) return ActionResult.isFailure(error=f"HTTP Error {e.response.status_code}: {e.response.text}") except Exception as e: logger.error(f"Error reading emails from Microsoft Graph API: {str(e)}") + if operationId: + self.services.chat.progressLogFinish(operationId, False) return ActionResult.isFailure(error=f"Failed to read emails: {str(e)}") # Determine output format based on MIME type @@ -475,6 +514,9 @@ class MethodOutlook(MethodBase): "outputMimeType": outputMimeType } + self.services.chat.progressLogUpdate(operationId, 0.9, f"Found {email_data.get('count', 0)} emails") + self.services.chat.progressLogFinish(operationId, True) + return ActionResult.isSuccess( documents=[ActionDocument( documentName=f"outlook_emails_{self._format_timestamp_for_filename()}.json", @@ -486,6 +528,11 @@ class MethodOutlook(MethodBase): except Exception as e: logger.error(f"Error reading emails: {str(e)}") + if operationId: + try: + self.services.chat.progressLogFinish(operationId, False) + except: + pass # Don't fail on progress logging errors return ActionResult.isFailure( error=str(e) ) @@ -1491,14 +1538,32 @@ Return JSON: - connectionReference (str, required): Microsoft connection label. - documentList (list, required): Document reference(s) to draft emails in JSON format (outputs from outlook.composeAndDraftEmailWithContext function). """ + import time + operationId = None try: + # Init progress logger + workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" + operationId = f"outlook_send_{workflowId}_{int(time.time())}" + + # Start progress tracking + self.services.chat.progressLogStart( + operationId, + "Send Draft Email", + "Outlook Email Sending", + f"Processing {len(parameters.get('documentList', []))} draft(s)" + ) + connectionReference = parameters.get("connectionReference") documentList = parameters.get("documentList", []) if not connectionReference: + if operationId: + self.services.chat.progressLogFinish(operationId, False) return ActionResult.isFailure(error="Connection reference is required") if not documentList: + if operationId: + self.services.chat.progressLogFinish(operationId, False) return ActionResult.isFailure(error="documentList is required and cannot be empty") # Convert single value to list if needed @@ -1506,16 +1571,21 @@ Return JSON: documentList = [documentList] # Get Microsoft connection + self.services.chat.progressLogUpdate(operationId, 0.2, "Getting Microsoft connection") connection = self._getMicrosoftConnection(connectionReference) if not connection: + self.services.chat.progressLogFinish(operationId, False) return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference") # Check permissions + self.services.chat.progressLogUpdate(operationId, 0.3, "Checking permissions") permissions_ok = await self._checkPermissions(connection) if not permissions_ok: + self.services.chat.progressLogFinish(operationId, False) return ActionResult.isFailure(error="Connection lacks necessary permissions for Outlook operations") # Read draft email JSON documents from documentList + self.services.chat.progressLogUpdate(operationId, 0.4, "Reading draft email documents") draftEmails = [] for docRef in documentList: try: @@ -1586,8 +1656,11 @@ Return JSON: continue if not draftEmails: + self.services.chat.progressLogFinish(operationId, False) return ActionResult.isFailure(error="No valid draft email JSON documents found in documentList") + self.services.chat.progressLogUpdate(operationId, 0.6, f"Found {len(draftEmails)} draft email(s) to send") + # Send all draft emails graph_url = "https://graph.microsoft.com/v1.0" headers = { @@ -1598,7 +1671,8 @@ Return JSON: sentResults = [] failedResults = [] - for draftEmail in draftEmails: + self.services.chat.progressLogUpdate(operationId, 0.7, "Sending emails") + for idx, draftEmail in enumerate(draftEmails): draftEmailJson = draftEmail["draftEmailJson"] draftId = draftEmail["draftId"] sourceDocument = draftEmail["sourceDocument"] @@ -1628,6 +1702,7 @@ Return JSON: "sourceDocument": sourceDocument }) logger.info(f"Email sent successfully. Draft ID: {draftId}, Subject: {subject}") + self.services.chat.progressLogUpdate(operationId, 0.7 + (idx + 1) * 0.2 / len(draftEmails), f"Sent {idx + 1}/{len(draftEmails)}: {subject}") else: errorResult = { "status": "error", @@ -1674,7 +1749,9 @@ Return JSON: } # Determine overall success status + self.services.chat.progressLogUpdate(operationId, 0.9, f"Sent {successfulEmails}/{totalEmails} email(s)") if successfulEmails == 0: + self.services.chat.progressLogFinish(operationId, False) validationMetadata = { "actionType": "outlook.sendDraftEmail", "connectionReference": connectionReference, @@ -1703,6 +1780,7 @@ Return JSON: "failedEmails": failedEmails, "status": "partial_success" } + self.services.chat.progressLogFinish(operationId, True) return ActionResult( success=True, documents=[ActionDocument( @@ -1723,6 +1801,7 @@ Return JSON: "failedEmails": failedEmails, "status": "all_successful" } + self.services.chat.progressLogFinish(operationId, True) return ActionResult( success=True, documents=[ActionDocument( diff --git a/modules/workflows/methods/methodSharepoint.py b/modules/workflows/methods/methodSharepoint.py index a433e04c..da3db26b 100644 --- a/modules/workflows/methods/methodSharepoint.py +++ b/modules/workflows/methods/methodSharepoint.py @@ -1120,7 +1120,21 @@ class MethodSharepoint(MethodBase): - documentData: Base64-encoded content (binary files) or plain text (text files) - mimeType: MIME type (e.g., application/pdf, text/plain) """ + import time + operationId = None try: + # Init progress logger + workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" + operationId = f"sharepoint_read_{workflowId}_{int(time.time())}" + + # Start progress tracking + self.services.chat.progressLogStart( + operationId, + "Read Documents", + "SharePoint Document Reading", + f"Path: {parameters.get('pathQuery', parameters.get('pathObject', '*'))}" + ) + documentList = parameters.get("documentList") if isinstance(documentList, str): documentList = [documentList] @@ -1131,11 +1145,16 @@ class MethodSharepoint(MethodBase): # Validate connection reference if not connectionReference: + if operationId: + self.services.chat.progressLogFinish(operationId, False) return ActionResult.isFailure(error="Connection reference is required") # Get connection first - needed for both pathObject and documentList approaches + self.services.chat.progressLogUpdate(operationId, 0.2, "Getting Microsoft connection") connection = self._getMicrosoftConnection(connectionReference) if not connection: + if operationId: + self.services.chat.progressLogFinish(operationId, False) return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference") # If pathObject is provided, extract SharePoint file IDs and read them directly @@ -1150,6 +1169,8 @@ class MethodSharepoint(MethodBase): from modules.datamodels.datamodelDocref import DocumentReferenceList pathObjectDocuments = self.services.chat.getChatDocumentsFromDocumentList(DocumentReferenceList.from_string_list([pathObject])) if not pathObjectDocuments or len(pathObjectDocuments) == 0: + if operationId: + self.services.chat.progressLogFinish(operationId, False) return ActionResult.isFailure(error=f"No document list found for reference: {pathObject}") # Get the first document's content (which should be the JSON from findDocumentPath) @@ -1267,8 +1288,10 @@ class MethodSharepoint(MethodBase): readResults = [] siteId = sites[0]['id'] - for fileId in sharePointFileIds: + self.services.chat.progressLogUpdate(operationId, 0.5, f"Reading {len(sharePointFileIds)} file(s) from SharePoint") + for idx, fileId in enumerate(sharePointFileIds): try: + self.services.chat.progressLogUpdate(operationId, 0.5 + (idx * 0.3 / len(sharePointFileIds)), f"Reading file {idx + 1}/{len(sharePointFileIds)}") # Get file info from SharePoint endpoint = f"sites/{siteId}/drive/items/{fileId}" fileInfo = await self._makeGraphApiCall(endpoint) @@ -1314,11 +1337,13 @@ class MethodSharepoint(MethodBase): continue if not readResults: + self.services.chat.progressLogFinish(operationId, False) return ActionResult.isFailure(error="No files could be read from pathObject") # Convert read results to ActionDocument objects # IMPORTANT: For binary files (PDFs), store Base64-encoded content directly in documentData # The system will create FileData and ChatDocument automatically + self.services.chat.progressLogUpdate(operationId, 0.8, f"Processing {len(readResults)} document(s)") from modules.datamodels.datamodelChat import ActionDocument import base64 @@ -1413,6 +1438,8 @@ class MethodSharepoint(MethodBase): actionDocuments.append(actionDoc) # Return success with action documents + self.services.chat.progressLogUpdate(operationId, 0.9, f"Read {len(actionDocuments)} document(s)") + self.services.chat.progressLogFinish(operationId, True) return ActionResult.isSuccess(documents=actionDocuments) # Fallback: Use documentList parameter (for backward compatibility) @@ -1643,6 +1670,11 @@ class MethodSharepoint(MethodBase): ) except Exception as e: logger.error(f"Error reading SharePoint documents: {str(e)}") + if operationId: + try: + self.services.chat.progressLogFinish(operationId, False) + except: + pass # Don't fail on progress logging errors return ActionResult( success=False, error=str(e) diff --git a/modules/workflows/processing/adaptive/contentValidator.py b/modules/workflows/processing/adaptive/contentValidator.py index b065b912..ebb433da 100644 --- a/modules/workflows/processing/adaptive/contentValidator.py +++ b/modules/workflows/processing/adaptive/contentValidator.py @@ -139,14 +139,11 @@ class ContentValidator: "statistics": {} } - # Extract metadata + # Extract metadata - include ALL metadata fields (generic for all action types) metadata = jsonData.get("metadata", {}) - if metadata: - summary["metadata"] = { - "title": metadata.get("title"), - "split_strategy": metadata.get("split_strategy"), - "extraction_method": metadata.get("extraction_method") - } + if metadata and isinstance(metadata, dict): + # Include all metadata fields, not just specific ones + summary["metadata"] = dict(metadata) # Extract documents array (if present) documents = jsonData.get("documents", []) @@ -195,6 +192,17 @@ class ContentValidator: text = textElement.get("text", "") if text: sectionSummary["textPreview"] = text[:100] + ("..." if len(text) > 100 else "") + # Also check for textPreview directly in section (for web crawl results) + if section.get("textPreview"): + sectionSummary["textPreview"] = section.get("textPreview") + + # Include any additional fields from section (generic approach) + # This ensures all action-specific fields are preserved + for key, value in section.items(): + if key not in sectionSummary and key not in ["elements"]: # Skip elements as they're processed separately + # Include simple types (str, int, float, bool, list of primitives) + if isinstance(value, (str, int, float, bool)) or (isinstance(value, list) and len(value) <= 10): + sectionSummary[key] = value summary["sections"].append(sectionSummary) else: @@ -206,7 +214,8 @@ class ContentValidator: sectionSummary = { "id": section.get("id"), "content_type": section.get("content_type"), - "title": section.get("title") + "title": section.get("title"), + "order": section.get("order") } if section.get("content_type") == "table": @@ -220,8 +229,21 @@ class ContentValidator: sectionSummary["rowCount"] = len(rows) sectionSummary["headers"] = headers + # Include any additional fields from section (generic approach) + for key, value in section.items(): + if key not in sectionSummary and key not in ["elements"]: # Skip elements as they're processed separately + # Include simple types (str, int, float, bool, list of primitives) + if isinstance(value, (str, int, float, bool)) or (isinstance(value, list) and len(value) <= 10): + sectionSummary[key] = value + summary["sections"].append(sectionSummary) + # Extract statistics from root level (generic - include all statistics fields) + rootStatistics = jsonData.get("statistics", {}) + if rootStatistics and isinstance(rootStatistics, dict): + # Merge root statistics into summary statistics + summary["statistics"].update(rootStatistics) + return summary except Exception as e: diff --git a/modules/workflows/processing/core/taskPlanner.py b/modules/workflows/processing/core/taskPlanner.py index e6e3b580..597e4096 100644 --- a/modules/workflows/processing/core/taskPlanner.py +++ b/modules/workflows/processing/core/taskPlanner.py @@ -28,11 +28,21 @@ class TaskPlanner: logger.info(f"=== STARTING TASK PLAN GENERATION ===") logger.info(f"Workflow ID: {workflow.id}") - logger.info(f"User Input: {userInput}") + # Log normalized request instead of raw user input for security + normalizedPrompt = getattr(self.services, 'currentUserPromptNormalized', None) if self.services else None + if normalizedPrompt: + logger.info(f"Normalized Request: {normalizedPrompt}") + else: + logger.info(f"Normalized Request: {userInput}") - # Use stored user prompt if available, otherwise use the input - actualUserPrompt = self.services.currentUserPrompt if self.services and hasattr(self.services, 'currentUserPrompt') and self.services.currentUserPrompt else userInput - logger.info(f"Actual User Prompt: {actualUserPrompt}") + # Use normalized request if available, otherwise fallback to currentUserPrompt, then userInput + actualUserPrompt = None + if self.services and hasattr(self.services, 'currentUserPromptNormalized') and self.services.currentUserPromptNormalized: + actualUserPrompt = self.services.currentUserPromptNormalized + elif self.services and hasattr(self.services, 'currentUserPrompt') and self.services.currentUserPrompt: + actualUserPrompt = self.services.currentUserPrompt + else: + actualUserPrompt = userInput # Check workflow status before calling AI service checkWorkflowStopped(self.services) diff --git a/modules/workflows/processing/modes/modeDynamic.py b/modules/workflows/processing/modes/modeDynamic.py index 5cc8b866..592b5235 100644 --- a/modules/workflows/processing/modes/modeDynamic.py +++ b/modules/workflows/processing/modes/modeDynamic.py @@ -96,6 +96,10 @@ class DynamicMode(BaseMode): # NEW: Reset progress tracking for new task self.progressTracker.reset() + # Initialize executed actions tracking for this task + if not hasattr(context, 'executedActions') or context.executedActions is None: + context.executedActions = [] + # Update workflow object before executing task self._updateWorkflowBeforeExecutingTask(taskIndex) @@ -104,7 +108,8 @@ class DynamicMode(BaseMode): state = TaskExecutionState(taskStep) # Dynamic mode uses max_steps instead of max_retries - state.max_steps = max(1, int(getattr(workflow, 'maxSteps', 10))) + # maxSteps is set in workflowManager.py when workflow is created + state.max_steps = int(getattr(workflow, 'maxSteps', 1)) logger.info(f"Using Dynamic mode execution with max_steps: {state.max_steps}") step = 1 @@ -128,6 +133,19 @@ class DynamicMode(BaseMode): observation = self._observeBuild(result) # Note: resultLabel is already set correctly in _observeBuild from actionResult.resultLabel + # Store executed action in context for action history + if not hasattr(context, 'executedActions') or context.executedActions is None: + context.executedActions = [] + actionName = selection.get('action', 'unknown') + actionParameters = selection.get('parameters', {}) or {} + # Filter out documentList for clarity in history + relevantParams = {k: v for k, v in actionParameters.items() if k not in ['documentList', 'connections']} + context.executedActions.append({ + 'action': actionName, + 'parameters': relevantParams, + 'step': step + }) + # Content validation (against original cleaned user prompt / workflow intent) if getattr(self, 'workflowIntent', None) and result.documents: # Pass ALL documents to validator - validator decides what to validate (generic approach) @@ -883,9 +901,20 @@ class DynamicMode(BaseMode): elif progressState['nextActionsSuggested']: enhancedReviewContent += f"Next Action Suggestions: {', '.join(progressState['nextActionsSuggested'])}\n" - # NEW: Add action history to review content + # NEW: Add action history to review content - use all executed actions + actionHistory = [] + + # First, add all executed actions from the current task + if hasattr(context, 'executedActions') and context.executedActions: + for executedAction in context.executedActions: + action = executedAction.get('action', 'unknown') + params = executedAction.get('parameters', {}) or {} + paramsStr = json.dumps(params, ensure_ascii=False) if params else "{}" + step = executedAction.get('step', 0) + actionHistory.append(f"Step {step}: {action} {paramsStr}") + + # Also include refinement decisions for completeness (these show what was planned) if hasattr(context, 'previousReviewResult') and context.previousReviewResult: - actionHistory = [] for i, prevDecision in enumerate(context.previousReviewResult, 1): if prevDecision and hasattr(prevDecision, 'nextAction') and prevDecision.nextAction: action = prevDecision.nextAction @@ -895,21 +924,27 @@ class DynamicMode(BaseMode): paramsStr = json.dumps(relevantParams, ensure_ascii=False) if relevantParams else "{}" quality = getattr(prevDecision, 'qualityScore', None) qualityStr = f" (quality: {quality:.2f})" if quality is not None else "" - actionHistory.append(f"Round {i}: {action} {paramsStr}{qualityStr}") - - if actionHistory: - enhancedReviewContent += f"\nACTION HISTORY:\n" - enhancedReviewContent += "\n".join(f"- {entry}" for entry in actionHistory) - # Detect repeated actions - actionCounts = {} - for entry in actionHistory: - # Extract action name (before first space or {) - actionName = entry.split()[1] if len(entry.split()) > 1 else "unknown" + # Only add if not already in executedActions (avoid duplicates) + actionEntry = f"Refinement {i}: {action} {paramsStr}{qualityStr}" + if actionEntry not in actionHistory: + actionHistory.append(actionEntry) + + if actionHistory: + enhancedReviewContent += f"\nACTION HISTORY:\n" + enhancedReviewContent += "\n".join(f"- {entry}" for entry in actionHistory) + # Detect repeated actions + actionCounts = {} + for entry in actionHistory: + # Extract action name (after first space, before next space or {) + parts = entry.split() + if len(parts) > 1: + # Skip "Step", "Refinement" prefixes and get the action name + actionName = parts[1] if parts[0] in ['Step', 'Refinement'] else parts[0] actionCounts[actionName] = actionCounts.get(actionName, 0) + 1 - - repeatedActions = [action for action, count in actionCounts.items() if count >= 2] - if repeatedActions: - enhancedReviewContent += f"\nWARNING: Repeated actions detected: {', '.join(repeatedActions)}. Consider a fundamentally different approach.\n" + + repeatedActions = [action for action, count in actionCounts.items() if count >= 2] + if repeatedActions: + enhancedReviewContent += f"\nWARNING: Repeated actions detected: {', '.join(repeatedActions)}. Consider a fundamentally different approach.\n" # Update placeholders with enhanced review content placeholders["REVIEW_CONTENT"] = enhancedReviewContent diff --git a/modules/workflows/processing/shared/executionState.py b/modules/workflows/processing/shared/executionState.py index fd1299cf..2db6a3f3 100644 --- a/modules/workflows/processing/shared/executionState.py +++ b/modules/workflows/processing/shared/executionState.py @@ -19,7 +19,7 @@ class TaskExecutionState: self.max_retries = 3 # Iterative loop (dynamic mode) self.current_step = 0 - self.max_steps = 5 + self.max_steps = 0 # Will be overridden by workflow.maxSteps from workflowManager.py def addSuccessfulAction(self, action_result: ActionResult): """Add a successful action to the state""" @@ -56,7 +56,7 @@ class TaskExecutionState: patterns.append("permission_issues") return list(set(patterns)) -def shouldContinue(observation: Optional[Observation], review=None, current_step: int = 0, max_steps: int = 5) -> bool: +def shouldContinue(observation: Optional[Observation], review=None, current_step: int = 0, max_steps: int = 1) -> bool: """Helper to decide if the iterative loop should continue Args: diff --git a/modules/workflows/processing/shared/promptGenerationTaskplan.py b/modules/workflows/processing/shared/promptGenerationTaskplan.py index 56ee8771..27b5c5dc 100644 --- a/modules/workflows/processing/shared/promptGenerationTaskplan.py +++ b/modules/workflows/processing/shared/promptGenerationTaskplan.py @@ -61,6 +61,7 @@ Break down user requests into logical, executable task steps. ## 📋 Context ### User Request +The following is the user's normalized request: {{KEY:USER_PROMPT}} ### Workflow Intent diff --git a/modules/workflows/processing/workflowProcessor.py b/modules/workflows/processing/workflowProcessor.py index b4021726..403bf008 100644 --- a/modules/workflows/processing/workflowProcessor.py +++ b/modules/workflows/processing/workflowProcessor.py @@ -64,21 +64,30 @@ class WorkflowProcessor: logger.info(f"=== STARTING TASK PLAN GENERATION ===") logger.info(f"Using user language: {self.services.currentUserLanguage}") logger.info(f"Workflow ID: {workflow.id}") - logger.info(f"User Input: {userInput}") + # Log normalized request instead of raw user input for security + normalizedPrompt = getattr(self.services, 'currentUserPromptNormalized', None) or userInput + logger.info(f"Normalized Request: {normalizedPrompt}") modeValue = workflow.workflowMode.value if hasattr(workflow.workflowMode, 'value') else workflow.workflowMode logger.info(f"Workflow Mode: {modeValue}") # Update progress - generating task plan self.services.chat.progressLogUpdate(operationId, 0.3, "Analyzing input") + # Use normalized request instead of raw userInput for security + normalizedPrompt = getattr(self.services, 'currentUserPromptNormalized', None) or userInput + # Delegate to the appropriate mode - taskPlan = await self.mode.generateTaskPlan(userInput, workflow) + taskPlan = await self.mode.generateTaskPlan(normalizedPrompt, workflow) # Update progress - creating task plan message self.services.chat.progressLogUpdate(operationId, 0.8, "Creating plan") - # Create task plan message - await self.mode.createTaskPlanMessage(taskPlan, workflow) + # Create task plan message only if there are 2+ tasks + # Single-task workflows don't need a task plan message + if taskPlan.tasks and len(taskPlan.tasks) >= 2: + await self.mode.createTaskPlanMessage(taskPlan, workflow) + else: + logger.info(f"Skipping task plan message creation - only {len(taskPlan.tasks) if taskPlan.tasks else 0} task(s)") # Complete progress tracking self.services.chat.progressLogFinish(operationId, True) @@ -315,12 +324,16 @@ class WorkflowProcessor: # Fast Path Implementation - async def detectComplexity(self, prompt: str, documents: Optional[List[ChatDocument]] = None) -> str: + async def detectComplexity(self, prompt: str, documents: Optional[List[ChatDocument]] = None) -> tuple[str, bool, Optional[str]]: """ Detect request complexity using AI-based semantic understanding. + Also detects user language for fast path responses. Returns: - "simple" | "moderate" | "complex" + Tuple of (complexity: str, needsWorkflowHistory: bool, detectedLanguage: Optional[str]) + complexity: "simple" | "moderate" | "complex" + needsWorkflowHistory: True if request needs previous workflow rounds/history + detectedLanguage: ISO 639-1 language code (e.g., "de", "en") or None Simple: Single question, no documents, straightforward answer (5-15s) Moderate: Multiple steps, some documents, structured response (30-60s) @@ -330,38 +343,47 @@ class WorkflowProcessor: # Ensure AI service is initialized await self.services.ai.ensureAiObjectsInitialized() - # Build complexity detection prompt (language-agnostic, semantic) + # Build complexity detection prompt (includes language detection) + # JSON template comes BEFORE user input for security complexityPrompt = ( - "You are a complexity analyzer. Analyze the user's request and determine its complexity level.\n\n" + "You are a complexity analyzer. Analyze the user's request and determine its complexity level and language.\n\n" "Consider:\n" "- Number of distinct tasks or steps required\n" "- Amount and type of documents provided\n" "- Need for external research or web search\n" "- Need for document analysis or extraction\n" "- Need for content generation (reports, summaries, etc.)\n" - "- Need for multi-step reasoning or planning\n\n" + "- Need for multi-step reasoning or planning\n" + "- Need for previous workflow rounds/history (e.g., 'continue', 'retry', 'fix', 'improve', 'update', 'modify', 'based on previous', 'build on', references to earlier work)\n" + "- Language: Detect the ISO 639-1 language code (e.g., de, en, fr, it) from the user's request\n\n" "Complexity levels:\n" "- 'simple': Single question, no documents or minimal documents, straightforward answer that can be provided in one AI response (5-15s)\n" "- 'moderate': Multiple steps, some documents, structured response requiring some processing (30-60s)\n" "- 'complex': Multi-task workflow, many documents, research needed, content generation required, multi-step planning (60-120s)\n\n" - f"User request:\n{prompt}\n\n" + "Return ONLY a JSON object with this exact structure:\n" + "{\n" + ' "complexity": "simple" | "moderate" | "complex",\n' + ' "reasoning": "Brief explanation of why this complexity level",\n' + ' "needsWorkflowHistory": true|false,\n' + ' "detectedLanguage": "de|en|fr|it|..." (ISO 639-1 language code)\n' + "}\n\n" + "################ USER INPUT START #################\n" ) + # Add sanitized user input with clear delimiters + # Escape curly braces for f-string safety, but preserve format (no quote wrapping) + sanitizedPrompt = prompt.replace('{', '{{').replace('}', '}}') if prompt else "" + complexityPrompt += f"{sanitizedPrompt}\n" + + complexityPrompt += "################ USER INPUT FINISH #################\n\n" + if documents and len(documents) > 0: - complexityPrompt += f"\nDocuments provided: {len(documents)} document(s)\n" + complexityPrompt += f"Documents provided: {len(documents)} document(s)\n" # Add document types docTypes = [doc.mimeType for doc in documents if hasattr(doc, 'mimeType')] if docTypes: complexityPrompt += f"Document types: {', '.join(set(docTypes))}\n" - complexityPrompt += ( - "\nReturn ONLY a JSON object with this exact structure:\n" - "{\n" - ' "complexity": "simple" | "moderate" | "complex",\n' - ' "reasoning": "Brief explanation of why this complexity level"\n' - "}\n" - ) - # Call AI for complexity detection (planning call - no documents needed) aiResponse = await self.services.ai.callAiPlanning( prompt=complexityPrompt, @@ -371,6 +393,8 @@ class WorkflowProcessor: # Parse response complexity = "moderate" # Default fallback + needsWorkflowHistory = False # Default fallback + detectedLanguage = None # Default fallback try: # callAiPlanning returns a string directly, not an object responseContent = str(aiResponse) if aiResponse else "" @@ -384,19 +408,21 @@ class WorkflowProcessor: if jsonStr: parsed = json.loads(jsonStr) complexity = parsed.get("complexity", "moderate") + needsWorkflowHistory = parsed.get("needsWorkflowHistory", False) + detectedLanguage = parsed.get("detectedLanguage") or None reasoning = parsed.get("reasoning", "") - logger.info(f"Complexity detected: {complexity} - {reasoning}") + logger.info(f"Complexity detected: {complexity}, needsWorkflowHistory: {needsWorkflowHistory}, language: {detectedLanguage} - {reasoning}") else: logger.warning("Could not parse complexity detection response, defaulting to 'moderate'") except Exception as e: logger.warning(f"Error parsing complexity detection: {str(e)}, defaulting to 'moderate'") - return complexity + return (complexity, needsWorkflowHistory, detectedLanguage) except Exception as e: logger.error(f"Error in detectComplexity: {str(e)}") # Default to moderate on error (safe fallback) - return "moderate" + return ("moderate", False, None) async def fastPathExecute(self, prompt: str, documents: Optional[List[ChatDocument]] = None, userLanguage: Optional[str] = None) -> ActionResult: """ @@ -416,9 +442,12 @@ class WorkflowProcessor: await self.services.ai.ensureAiObjectsInitialized() # Build fast path prompt (understand + execute + deliver in one call) + # Clearly separate user prompt for security fastPathPrompt = ( "You are a helpful assistant. Answer the user's question directly and comprehensively.\n\n" - f"User question:\n{prompt}\n\n" + "## User Question\n" + "The following is the user's request:\n\n" + f"{prompt}\n\n" ) # Add user language context if available @@ -443,7 +472,7 @@ class WorkflowProcessor: ) # Call AI directly (no document generation - just plain text response) - # Use aiObjects.call() instead of callAiContent() to avoid document generation path + # Use callWithTextContext() for text-only calls aiRequest = AiCallRequest( prompt=fastPathPrompt, context="", @@ -451,7 +480,7 @@ class WorkflowProcessor: contentParts=None # Fast path doesn't process documents ) - aiCallResponse = await self.services.ai.aiObjects.call(aiRequest) + aiCallResponse = await self.services.ai.aiObjects.callWithTextContext(aiRequest) # Extract response content (AiCallResponse.content is a string) responseText = aiCallResponse.content if aiCallResponse.content else "" diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py index 7754a1c7..c1eb7c45 100644 --- a/modules/workflows/workflowManager.py +++ b/modules/workflows/workflowManager.py @@ -97,7 +97,7 @@ class WorkflowManager: "mandateId": self.services.user.mandateId, "messageIds": [], "workflowMode": workflowMode, - "maxSteps": 5 if workflowMode == WorkflowModeEnum.WORKFLOW_DYNAMIC else 1, # Set maxSteps for Dynamic mode + "maxSteps": 10 , # Set maxSteps } workflow = self.services.chat.createWorkflow(workflowData) @@ -160,6 +160,9 @@ class WorkflowManager: # Reset progress logger for new workflow self.services.chat._progressLogger = None + # Reset workflow history flag at start of each workflow + setattr(self.services, '_needsWorkflowHistory', False) + self.workflowProcessor = WorkflowProcessor(self.services) # Get workflow mode to determine if complexity detection is needed @@ -169,6 +172,8 @@ class WorkflowManager: if skipComplexityDetection: logger.info("Skipping complexity detection for AUTOMATION mode - using predefined plan") complexity = "moderate" # Default for automation workflows + needsWorkflowHistory = False # Automation workflows don't need history + detectedLanguage = None # No language detection in automation mode else: # Process user-uploaded documents from userInput for complexity detection # This is the correct way: use the input data directly, not workflow state @@ -180,24 +185,27 @@ class WorkflowManager: logger.warning(f"Failed to process user fileIds for complexity detection: {e}") # Detect complexity (AI-based semantic understanding) using user input documents - complexity = await self.workflowProcessor.detectComplexity(userInput.prompt, documents) - logger.info(f"Request complexity detected: {complexity}") + # Also detects language for fast path responses + complexity, needsWorkflowHistory, detectedLanguage = await self.workflowProcessor.detectComplexity(userInput.prompt, documents) + logger.info(f"Request complexity detected: {complexity}, needsWorkflowHistory: {needsWorkflowHistory}, language: {detectedLanguage}") + + # Set detected language for fast path (if detected) + if detectedLanguage and isinstance(detectedLanguage, str): + self._setUserLanguage(detectedLanguage) + try: + setattr(self.services, 'currentUserLanguage', detectedLanguage) + except Exception: + pass - # Now send the first message (which will also process the documents again, but that's fine) - await self._sendFirstMessage(userInput) - - # Check if workflow history is needed before deciding on fast path - # Use AI intention analysis result only (no keyword matching) - needsHistory = getattr(self.services, '_needsWorkflowHistory', False) - hasHistory = self._checkIfHistoryAvailable() - - # Route to fast path for simple requests (skip for automation mode and if history is needed) - if not skipComplexityDetection and complexity == "simple" and not (needsHistory and hasHistory): + # Route to fast path for simple requests if history is not needed + # Skip fast path for automation mode or if history is needed + if complexity == "simple" and not needsWorkflowHistory: logger.info("Routing to fast path for simple request") await self._executeFastPath(userInput, documents) return # Fast path completes the workflow - elif needsHistory and hasHistory: - logger.info(f"Skipping fast path - workflow history is needed (from AI intention analysis) and available") + + # Now send the first message (which will also process the documents again, but that's fine) + await self._sendFirstMessage(userInput) # Route to full workflow for moderate/complex requests or automation mode logger.info(f"Routing to full workflow for {complexity} request" + (" (automation mode)" if skipComplexityDetection else "")) @@ -222,9 +230,10 @@ class WorkflowManager: # Get user language if available userLanguage = getattr(self.services, 'currentUserLanguage', None) - # Execute fast path + # Execute fast path - use normalizedRequest if available, otherwise use raw prompt + normalizedPrompt = getattr(self.services, 'currentUserPromptNormalized', None) or userInput.prompt result = await self.workflowProcessor.fastPathExecute( - prompt=userInput.prompt, + prompt=normalizedPrompt, documents=documents, userLanguage=userLanguage ) @@ -279,12 +288,20 @@ class WorkflowManager: } chatDocuments.append(chatDoc) + # Mark workflow as completed BEFORE storing message (so UI polling stops) + workflow.status = "completed" + workflow.lastActivity = self.services.utils.timestampGetUtc() + self.services.chat.updateWorkflow(workflow.id, { + "status": "completed", + "lastActivity": workflow.lastActivity + }) + # Create ChatMessage with fast path response (in user's language) messageData = { "workflowId": workflow.id, "role": "assistant", "message": responseText or "Fast path response completed", - "status": "last", # Fast path completes the workflow + "status": "last", # Fast path completes the workflow - UI polling stops on this "sequenceNr": len(workflow.messages) + 1, "publishedAt": self.services.utils.timestampGetUtc(), "documentsLabel": "fast_path_response", @@ -301,14 +318,6 @@ class WorkflowManager: # Store message with documents self.services.chat.storeMessageWithDocuments(workflow, messageData, chatDocuments) - # Mark workflow as completed - workflow.status = "completed" - workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.chat.updateWorkflow(workflow.id, { - "status": "completed", - "lastActivity": workflow.lastActivity - }) - logger.info(f"Fast path completed successfully, response length: {len(responseText)} chars") except Exception as e: @@ -405,7 +414,11 @@ class WorkflowManager: " \"successCriteria\": [\"specific criterion 1\", \"specific criterion 2\"],\n" " \"needsWorkflowHistory\": true|false\n" "}\n\n" - f"User message:\n{self.services.utils.sanitizePromptContent(userInput.prompt, 'userinput')}" + "## User Message\n" + "The following is the user's original input message. Extract intent, normalize the request, and identify any large context blocks that should be moved to separate documents:\n\n" + "################ USER INPUT START #################\n" + f"{userInput.prompt.replace('{', '{{').replace('}', '}}') if userInput.prompt else ''}\n" + "################ USER INPUT FINISH #################" ) # Call AI analyzer (planning call - will use static parameters) @@ -446,8 +459,8 @@ class WorkflowManager: # Store needsWorkflowHistory in services for fast path decision needsHistoryFromIntention = parsed.get('needsWorkflowHistory', False) - if isinstance(needsHistoryFromIntention, bool): - setattr(self.services, '_needsWorkflowHistory', needsHistoryFromIntention) + # Always set the value - default to False if not a boolean + setattr(self.services, '_needsWorkflowHistory', bool(needsHistoryFromIntention) if isinstance(needsHistoryFromIntention, bool) else False) # Store workflowIntent in workflow object for reuse if hasattr(self.services, 'workflow') and self.services.workflow: @@ -455,6 +468,8 @@ class WorkflowManager: except Exception: contextItems = [] workflowIntent = None + # Ensure needsWorkflowHistory is False if parsing fails + setattr(self.services, '_needsWorkflowHistory', False) # Update services state if detectedLanguage and isinstance(detectedLanguage, str): @@ -531,7 +546,9 @@ class WorkflowManager: workflow = self.services.workflow handling = self.workflowProcessor # Generate task plan first (shared for both modes) - taskPlan = await handling.generateTaskPlan(userInput.prompt, workflow) + # Use normalizedRequest instead of raw userInput.prompt for security + normalizedPrompt = getattr(self.services, 'currentUserPromptNormalized', None) or userInput.prompt + taskPlan = await handling.generateTaskPlan(normalizedPrompt, workflow) if not taskPlan or not taskPlan.tasks: raise Exception("No tasks generated in task plan.") workflowMode = getattr(workflow, 'workflowMode')