From 62bea8c8aaba264960ca59527c2e6f0dffdd6a75 Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Mon, 20 Oct 2025 09:52:02 +0200 Subject: [PATCH] updated document handling --- ...ementation_document_generation_one-path.md | 627 ++++++++++++++++++ ...> implementation_normalization_service.md} | 0 .../implementation_refactor_db-consistency.md | 206 ++++++ .../implementation_refactor_stats-unified.md | 239 +++++++ ...tation_workflow_processing_refactoring.md} | 0 5 files changed, 1072 insertions(+) create mode 100644 poweron/implementation/implementation_document_generation_one-path.md rename poweron/implementation/{implement_normalization_service.md => implementation_normalization_service.md} (100%) create mode 100644 poweron/implementation/implementation_refactor_db-consistency.md create mode 100644 poweron/implementation/implementation_refactor_stats-unified.md rename poweron/implementation/{implement_workflow_processing_refactoring.md => implementation_workflow_processing_refactoring.md} (100%) diff --git a/poweron/implementation/implementation_document_generation_one-path.md b/poweron/implementation/implementation_document_generation_one-path.md new file mode 100644 index 0000000..1ac50d2 --- /dev/null +++ b/poweron/implementation/implementation_document_generation_one-path.md @@ -0,0 +1,627 @@ +# Document Generation One-Path Refactoring Plan + +## Overview + +This document outlines the refactoring plan to unify the document generation system by eliminating the dual-path approach (single-file vs multi-file) and implementing a unified multi-file approach that handles both single and multiple document generation seamlessly. + +## Current State Analysis + +### Current Dual-Path Structure +- **Single File Path**: `_callAiWithSingleFileGeneration()` +- **Multi File Path**: `_callAiWithMultiFileGeneration()` +- **Code Duplication**: ~80% of functionality is duplicated +- **Maintenance Overhead**: Two separate code paths to maintain + +### Key Differences to Address +1. **Prompt Generation**: `getExtractionPrompt` vs `getAdaptiveExtractionPrompt` +2. **Result Structure**: Single object vs array structure +3. **Validation Logic**: Different validation rules for single vs multi-file +4. **Processing Pipeline**: Separate processing flows + +## Refactoring Goals + +1. **Unify Code Paths**: Single processing pipeline for all document generation +2. **Eliminate Duplication**: Remove ~200 lines of duplicate code +3. **Improve Maintainability**: Single code path to maintain and test +4. **Enhance Flexibility**: Dynamic switching between single/multi based on content +5. **Preserve Functionality**: Maintain all existing capabilities + +## Implementation Plan + +### Phase 1: Prompt Generation Unification + +#### 1.1 Modify `getAdaptiveExtractionPrompt` to Handle Single File +```python +async def getAdaptiveExtractionPrompt( + self, + outputFormat: str, + userPrompt: str, + title: str, + promptAnalysis: Dict[str, Any], + aiService: AiService +) -> str: + """ + Unified extraction prompt that handles both single and multi-file cases. + Hides multi-file specific parts when single file is requested. + """ + + # Base prompt structure + basePrompt = f""" + Generate a structured document in {outputFormat} format based on the user request. + + User Request: {userPrompt} + Title: {title} + """ + + # Add multi-file logic only if needed + if promptAnalysis.get("is_multi_file", False): + multiFileSection = f""" + + MULTI-FILE GENERATION: + - Split strategy: {promptAnalysis.get("strategy", "custom")} + - Split criteria: {promptAnalysis.get("criteria", "content-based")} + - File naming pattern: {promptAnalysis.get("file_naming_pattern", "document_{index}")} + + Return JSON structure: + {{ + "documents": [ + {{ + "id": "doc_1", + "title": "Document Title", + "filename": "document_1.{outputFormat}", + "sections": [...] + }} + ] + }} + """ + basePrompt += multiFileSection + else: + singleFileSection = f""" + + SINGLE-FILE GENERATION: + Return JSON structure: + {{ + "documents": [ + {{ + "id": "doc_1", + "title": "{title}", + "filename": "{title}.{outputFormat}", + "sections": [...] + }} + ] + }} + """ + basePrompt += singleFileSection + + # Add chunking support for large documents + chunkingSection = """ + + CHUNKING SUPPORT: + If the document is too large to generate in one response, include: + - "continue": true + - "continuation_context": { + "last_section_id": "section_id", + "last_element_index": 0, + "remaining_requirements": "description" + } + + The system will automatically request continuation chunks until complete. + """ + basePrompt += chunkingSection + + return basePrompt +``` + +#### 1.2 Remove `getExtractionPrompt` Method +- Delete the single-file specific prompt generation method +- Update all references to use `getAdaptiveExtractionPrompt` + +### Phase 2: Unified Processing Pipeline + +#### 2.1 Create Unified `callAiWithDocumentGeneration` Method +```python +async def callAiWithDocumentGeneration( + self, + prompt: str, + documents: Optional[List[ChatDocument]], + options: AiCallOptions, + outputFormat: str, + title: Optional[str] +) -> Dict[str, Any]: + """ + Unified document generation method that handles both single and multi-file cases. + Always uses multi-file approach internally. + """ + try: + # 1. Analyze prompt intent + promptAnalysis = await self._analyzePromptIntent(prompt, self) + logger.info(f"Prompt analysis result: {promptAnalysis}") + + # 2. Get unified extraction prompt + from modules.services.serviceGeneration.mainServiceGeneration import GenerationService + generationService = GenerationService(self.services) + + extractionPrompt = await generationService.getAdaptiveExtractionPrompt( + outputFormat=outputFormat, + userPrompt=prompt, + title=title, + promptAnalysis=promptAnalysis, + aiService=self + ) + + # 3. Process with unified pipeline (always multi-file approach) + aiResponse = await self._processDocumentsUnified( + documents, extractionPrompt, options, outputFormat, title, promptAnalysis + ) + + # 4. Return unified result structure + return self._buildUnifiedResult(aiResponse, outputFormat, title, promptAnalysis) + + except Exception as e: + logger.error(f"Error in unified document generation: {str(e)}") + return self._buildErrorResult(str(e), outputFormat, title) +``` + +#### 2.2 Create Unified Processing Method +```python +async def _processDocumentsUnified( + self, + documents: Optional[List[ChatDocument]], + extractionPrompt: str, + options: AiCallOptions, + outputFormat: str, + title: str, + promptAnalysis: Dict[str, Any] +) -> Dict[str, Any]: + """ + Unified document processing that handles both single and multi-file cases. + Always processes as multi-file structure internally. + """ + import time + + # Create progress logger + workflow = self.services.currentWorkflow + progressLogger = self.services.workflow.createProgressLogger(workflow) + operationId = f"docGenUnified_{workflow.id}_{int(time.time())}" + + try: + # Start progress tracking + progressLogger.startOperation( + operationId, + "Generate", + "Unified Document Generation", + f"Processing {len(documents) if documents else 0} documents" + ) + + # Update progress - generating extraction prompt + progressLogger.updateProgress(operationId, 0.1, "Generating prompt") + + # Process with unified JSON pipeline + aiResponse = await self.documentProcessor.processDocumentsPerChunkJsonWithPrompt( + documents, extractionPrompt, options + ) + + # Update progress - AI processing completed + progressLogger.updateProgress(operationId, 0.6, "Processing done") + + # Validate response structure + if not self._validateUnifiedResponseStructure(aiResponse): + raise Exception("AI response is not valid unified document structure") + + # Emit raw extracted data as a chat message attachment + try: + await self._postRawDataChatMessage(aiResponse, label="raw_extraction_unified") + except Exception: + logger.warning("Failed to emit raw extraction chat message (unified)") + + # Complete progress tracking + progressLogger.completeOperation(operationId, True) + + return aiResponse + + except Exception as e: + logger.error(f"Error in unified document processing: {str(e)}") + progressLogger.completeOperation(operationId, False) + raise +``` + +### Phase 3: Unified Validation System + +#### 3.1 Create Unified Validation Method +```python +def _validateUnifiedResponseStructure(self, response: Dict[str, Any]) -> bool: + """ + Unified validation that checks for multi-file structure. + Validates that response has documents array and each document has sections. + """ + try: + if not isinstance(response, dict): + logger.warning(f"Response validation failed: Response is not a dict, got {type(response)}") + return False + + # Check for documents array + hasDocuments = "documents" in response + isDocumentsList = isinstance(response.get("documents"), list) + + if not (hasDocuments and isDocumentsList): + logger.warning(f"Unified validation failed: documents key present={hasDocuments}, documents is list={isDocumentsList}") + logger.warning(f"Available keys: {list(response.keys())}") + return False + + documents = response.get("documents", []) + if not documents: + logger.warning("Unified validation failed: documents array is empty") + return False + + # Validate each document individually + validDocuments = 0 + for i, doc in enumerate(documents): + if self._validateDocumentStructure(doc, i): + validDocuments += 1 + else: + logger.warning(f"Document {i} failed validation, but continuing with others") + + # Process succeeds if at least one document is valid + if validDocuments == 0: + logger.error("Unified validation failed: no valid documents found") + return False + + logger.info(f"Unified validation passed: {validDocuments}/{len(documents)} documents valid") + return True + + except Exception as e: + logger.warning(f"Unified response validation failed with exception: {str(e)}") + return False + +def _validateDocumentStructure(self, document: Dict[str, Any], documentIndex: int) -> bool: + """ + Validate individual document structure. + Returns True if document is valid, False otherwise. + Does not fail the entire process if one document is invalid. + """ + try: + if not isinstance(document, dict): + logger.warning(f"Document {documentIndex} validation failed: not a dict") + return False + + # Check for required fields + hasTitle = "title" in document + hasSections = "sections" in document + isSectionsList = isinstance(document.get("sections"), list) + + if not (hasTitle and hasSections and isSectionsList): + logger.warning(f"Document {documentIndex} validation failed: title={hasTitle}, sections={hasSections}, sections_list={isSectionsList}") + return False + + sections = document.get("sections", []) + if not sections: + logger.warning(f"Document {documentIndex} validation failed: sections array is empty") + return False + + logger.info(f"Document {documentIndex} validation passed") + return True + + except Exception as e: + logger.warning(f"Document {documentIndex} validation failed with exception: {str(e)}") + return False +``` + +#### 3.2 Remove Old Validation Methods +- Delete `_validateResponseStructure` method +- Update all references to use `_validateUnifiedResponseStructure` + +### Phase 4: Unified Result Structure + +#### 4.1 Create Unified Result Builder +```python +def _buildUnifiedResult( + self, + aiResponse: Dict[str, Any], + outputFormat: str, + title: str, + promptAnalysis: Dict[str, Any] +) -> Dict[str, Any]: + """ + Build unified result structure that always returns array-based format. + Content is always a multi-document structure. + """ + try: + # Process all documents uniformly + generatedDocuments = [] + documents = aiResponse.get("documents", []) + + for i, docData in enumerate(documents): + try: + processedDocument = await self._processDocument( + docData, outputFormat, title, promptAnalysis, i + ) + generatedDocuments.append(processedDocument) + except Exception as e: + logger.warning(f"Failed to process document {i}: {str(e)}, skipping") + continue + + if not generatedDocuments: + raise Exception("No documents could be processed successfully") + + # Build unified result + result = { + "success": True, + "content": aiResponse, # Always multi-document structure + "documents": generatedDocuments, # Always array + "is_multi_file": len(generatedDocuments) > 1, + "format": outputFormat, + "title": title, + "split_strategy": promptAnalysis.get("strategy", "single"), + "total_documents": len(generatedDocuments), + "processed_documents": len(generatedDocuments) + } + + return result + + except Exception as e: + logger.error(f"Error building unified result: {str(e)}") + return self._buildErrorResult(str(e), outputFormat, title) + +async def _processDocument( + self, + docData: Dict[str, Any], + outputFormat: str, + title: str, + promptAnalysis: Dict[str, Any], + documentIndex: int +) -> Dict[str, Any]: + """ + Process individual document with content enhancement and rendering. + """ + try: + # Get generation service + from modules.services.serviceGeneration.mainServiceGeneration import GenerationService + generationService = GenerationService(self.services) + + # Use AI generation to enhance the extracted JSON before rendering + enhancedContent = docData # Default to original + if docData.get("sections"): + try: + # Get generation prompt + generationPrompt = await generationService.getGenerationPrompt( + outputFormat=outputFormat, + userPrompt=title, + title=docData.get("title", title), + aiService=self + ) + + # Prepare the AI call + from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationType + requestOptions = AiCallOptions() + requestOptions.operationType = OperationType.GENERAL + + # Create context with the extracted JSON content + import json + context = f"Extracted JSON content:\n{json.dumps(docData, indent=2)}" + + request = AiCallRequest( + prompt=generationPrompt, + context=context, + options=requestOptions + ) + + # Call AI to enhance the content + response = await self.aiObjects.call(request) + + if response and response.content: + # Parse the AI response as JSON + try: + import re + result = response.content.strip() + + # Extract JSON from markdown if present + jsonMatch = re.search(r'```json\s*\n(.*?)\n```', result, re.DOTALL) + if jsonMatch: + result = jsonMatch.group(1).strip() + elif result.startswith('```json'): + result = re.sub(r'^```json\s*', '', result) + result = re.sub(r'\s*```$', '', result) + elif result.startswith('```'): + result = re.sub(r'^```\s*', '', result) + result = re.sub(r'\s*```$', '', result) + + # Try to parse JSON + enhancedContent = json.loads(result) + logger.info(f"AI enhanced JSON content successfully for document {documentIndex}") + + except json.JSONDecodeError as e: + logger.warning(f"AI generation returned invalid JSON for document {documentIndex}: {str(e)}, using original content") + enhancedContent = docData + else: + logger.warning(f"AI generation returned empty response for document {documentIndex}, using original content") + enhancedContent = docData + + except Exception as e: + logger.warning(f"AI generation failed for document {documentIndex}: {str(e)}, using original content") + enhancedContent = docData + + # Render the enhanced JSON content + renderedContent, mimeType = await generationService.renderReport( + extractedContent=enhancedContent, + outputFormat=outputFormat, + title=docData.get("title", title), + userPrompt=title, + aiService=self + ) + + # Generate proper filename + baseFilename = docData.get("filename", f"document_{documentIndex + 1}") + if '.' in baseFilename: + baseFilename = baseFilename.rsplit('.', 1)[0] + + # Add proper extension based on output format + if outputFormat.lower() == "docx": + filename = f"{baseFilename}.docx" + elif outputFormat.lower() == "pdf": + filename = f"{baseFilename}.pdf" + elif outputFormat.lower() == "html": + filename = f"{baseFilename}.html" + else: + filename = f"{baseFilename}.{outputFormat}" + + return { + "documentName": filename, + "documentData": renderedContent, + "mimeType": mimeType, + "title": docData.get("title", title), + "documentIndex": documentIndex + } + + except Exception as e: + logger.error(f"Error processing document {documentIndex}: {str(e)}") + raise + +def _buildErrorResult(self, errorMessage: str, outputFormat: str, title: str) -> Dict[str, Any]: + """ + Build error result with unified structure. + """ + return { + "success": False, + "error": errorMessage, + "content": {}, + "documents": [], + "is_multi_file": False, + "format": outputFormat, + "title": title, + "split_strategy": "error", + "total_documents": 0, + "processed_documents": 0 + } +``` + +### Phase 5: Remove Legacy Methods + +#### 5.1 Delete Single-File Methods +```python +# Remove these methods: +- _callAiWithSingleFileGeneration() +- _callAiWithMultiFileGeneration() +- _validateResponseStructure() +- getExtractionPrompt() (in GenerationService) +``` + +#### 5.2 Update Method References +- Update all callers to use `callAiWithDocumentGeneration()` +- Update tests to use unified approach +- Update documentation + +### Phase 6: Testing and Validation + +#### 6.1 Unit Tests +```python +async def test_unified_single_file_generation(): + """Test that single file generation works with unified approach""" + result = await aiService.callAiWithDocumentGeneration( + prompt="Generate a single document", + documents=None, + options=options, + outputFormat="html", + title="Test Document" + ) + + assert result["success"] == True + assert result["is_multi_file"] == False + assert len(result["documents"]) == 1 + assert isinstance(result["content"], dict) + assert "documents" in result["content"] + +async def test_unified_multi_file_generation(): + """Test that multi file generation works with unified approach""" + result = await aiService.callAiWithDocumentGeneration( + prompt="Generate multiple documents", + documents=None, + options=options, + outputFormat="html", + title="Test Documents" + ) + + assert result["success"] == True + assert result["is_multi_file"] == True + assert len(result["documents"]) > 1 + assert isinstance(result["content"], dict) + assert "documents" in result["content"] + +async def test_unified_validation_partial_failure(): + """Test that partial document failure doesn't fail entire process""" + # Mock scenario where one document fails validation + # Should process remaining documents successfully + pass +``` + +#### 6.2 Integration Tests +- Test with various document types +- Test with different output formats +- Test chunking functionality +- Test error handling scenarios + +## Migration Strategy + +### Step 1: Implement Unified Methods +1. Create new unified methods alongside existing ones +2. Add feature flag to switch between old and new approaches +3. Test new methods thoroughly + +### Step 2: Update Callers +1. Update all callers to use unified approach +2. Update tests to use new methods +3. Verify functionality is preserved + +### Step 3: Remove Legacy Code +1. Remove old single-file and multi-file methods +2. Remove old validation methods +3. Clean up unused imports and references + +### Step 4: Final Testing +1. Run full test suite +2. Test with real-world scenarios +3. Performance testing +4. Documentation updates + +## Benefits of Unified Approach + +### Code Quality +- **Reduced Duplication**: ~200 lines of duplicate code removed +- **Single Code Path**: Easier to maintain and debug +- **Consistent Behavior**: Same logic for all document types + +### Performance +- **Better CPU Cache Usage**: Single code path +- **Reduced Memory Footprint**: No duplicate code +- **Faster Development**: Changes affect all cases automatically + +### Maintainability +- **Single Point of Truth**: All document generation logic in one place +- **Easier Testing**: One code path to test +- **Simpler Debugging**: Single call stack to trace + +### Flexibility +- **Dynamic Switching**: Can switch between single/multi based on content +- **Easy Extensions**: New features automatically work for all cases +- **Better Error Handling**: Unified error handling approach + +## Risk Mitigation + +### Backward Compatibility +- **No Backward Compatibility Required**: As specified +- **Clean Migration**: Complete replacement of old system + +### Testing +- **Comprehensive Testing**: Unit and integration tests +- **Real-World Testing**: Test with actual use cases +- **Performance Testing**: Ensure no performance regression + +### Rollback Plan +- **Feature Flag**: Can quickly switch back to old system if needed +- **Gradual Migration**: Can migrate callers one by one +- **Monitoring**: Monitor for any issues during migration + +## Conclusion + +This refactoring plan provides a clear path to unify the document generation system, eliminating code duplication while preserving all existing functionality. The unified approach is more maintainable, performant, and flexible than the current dual-path system. + +The key insight is that single-file generation is just a special case of multi-file generation with one document, so the unified approach is more elegant and maintainable than maintaining separate code paths. diff --git a/poweron/implementation/implement_normalization_service.md b/poweron/implementation/implementation_normalization_service.md similarity index 100% rename from poweron/implementation/implement_normalization_service.md rename to poweron/implementation/implementation_normalization_service.md diff --git a/poweron/implementation/implementation_refactor_db-consistency.md b/poweron/implementation/implementation_refactor_db-consistency.md new file mode 100644 index 0000000..ad6966b --- /dev/null +++ b/poweron/implementation/implementation_refactor_db-consistency.md @@ -0,0 +1,206 @@ +# DB-backed Object Consistency Refactor + +Date: 2025-10-16 + +Owner: Workflow/Data Layer + +Goal: Eliminate divergence between in-memory objects and database records by centralizing create/update/delete (CUD) logic in service/DB layer, and prohibiting direct list mutations in workflow objects. + +--- + +## Inputs / Constraints + +- No caching subsystem: we use the in-memory workflow objects directly (no separate cache layer to reconcile). +- ChatDocuments must be able to be produced and exist in memory only (transient) when needed; this capability must be preserved. + +Implication: Service methods must support both DB-backed and in-memory-only flows, with clear, explicit steps for when to persist. + +--- + +## 1) Affected Pydantic Models (with DB representation) + +Source: `gateway/modules/interfaces/interfaceDbChatObjects.py` + +- ChatWorkflow (table: workflows) +- ChatMessage (table: messages) +- ChatDocument (table: documents) +- ChatLog (table: logs) +- ChatStat (table: stats) + +Notes: +- `ChatWorkflow` has references to: `messages`, `logs`, `stats` (loaded via read operations only) +- `ChatMessage` has references to: `documents`, `stats` + +--- + +## 2) Where CUD is performed today (by module/function) + +- Workflow (ChatWorkflow) + - createWorkflow/updateWorkflow/deleteWorkflow: `interfaceDbChatObjects.ChatObjects` + - updateWorkflowStats: `serviceWorkflow.WorkflowService` + +- Message (ChatMessage) + - createMessage/updateMessage/deleteMessage: `interfaceDbChatObjects.ChatObjects` + - createMessage (write-through + local append attempt): `serviceWorkflow.WorkflowService.createMessage` + - MessageCreator paths (indirect, see forbidden appends below): `workflows/processing/core/messageCreator.py` + - workflowManager paths (indirect, see forbidden appends below): `workflows/workflowManager.py` + +- Document (ChatDocument) + - createDocument/getDocuments: `interfaceDbChatObjects.ChatObjects` + - deleteFileFromMessage: `interfaceDbChatObjects.ChatObjects` + +- Log (ChatLog) + - createLog/getLogs: `interfaceDbChatObjects.ChatObjects` + +- Stat (ChatStat) + - getMessageStats/getWorkflowStats: `interfaceDbChatObjects.ChatObjects` + - updateWorkflowStats: `serviceWorkflow.WorkflowService` + +--- + +## 3) Forbidden in-memory `.append(...)` sites (must be removed or guarded) + +- `gateway/modules/services/serviceWorkflow/mainServiceWorkflow.py` + - createMessage: `workflow.messages.append(message)` (has id-based guard; keep only this site to have calls to self.services.workflow.createMessage(...) in the code) + - (Two occurrences) `workflow.logs.append(log_entry)` + +- `gateway/modules/workflows/processing/core/messageCreator.py` + - Multiple occurrences of `workflow.messages.append(message)` + +- `gateway/modules/workflows/workflowManager.py` + - Multiple occurrences of `workflow.messages.append(message)` across handling steps + +- `gateway/modules/workflows/processing/modes/modeReact.py` + - `workflow.messages.append(message)` + +- `gateway/modules/workflows/processing/modes/modeActionplan.py` + - `workflow.messages.append(message)` + +Action: Replace all of the above with calls to `serviceWorkflow.createMessage/updateMessage` methods. No direct appends. + +--- + +## 4) Cascading CUD that should NOT happen + +Observed in: `interfaceDbChatObjects.ChatObjects.updateWorkflow` (lines updating `logs`, `messages`, `stats` inside workflow update). + +Issue: Parent-level update attempts to write child tables (‘cascade write’) during a workflow update. This causes ordering issues and potential duplicates. + +Action: +- Remove cascade CUD from `updateWorkflow`. Workflow update should only modify workflow fields. +- CUD for `messages`, `logs`, `stats`, `documents` must be done via their own dedicated service methods. +- Keep cascade on READ ONLY (e.g., `getWorkflow` loads logs/messages/stats/documents). + +--- + +## 5) Proposed Operating Model (write-through, DB as source of truth) + +- For each DB-backed object (workflow, message, document, log, stat): + 1) All create/update/delete goes through `interfaceDbChatObjects.ChatObjects` or `serviceWorkflow.WorkflowService` dedicated methods. + 2) After DB success, update in-memory cache (replace by id or remove by id). No direct `.append()` other than in the single service function immediately after DB write (and guarded by id check). + 3) All other modules must not mutate lists directly. Replace with service calls. + +- Reads: May hydrate workflow with referenced objects (messages/logs/stats/documents). No writes in read functions. + +--- + +## 6) Tasklist: Interface changes in `interfaceDbChatObjects.py` + +- [ ] Remove cascade writes in `updateWorkflow` for `logs`, `messages`, `stats`. +- [ ] Ensure `createMessage` returns persisted `ChatMessage` and does not assume local `sequenceNr` from in-memory list; optionally compute from DB if required, or drop if unused. +- [ ] Ensure `updateMessage` only updates message table and related tables when explicitly provided via dedicated methods. +- [ ] Ensure `createDocument` and `deleteFileFromMessage` are the only paths to manage documents. +- [ ] Add helper `syncWorkflowInMemory(workflowId)` to refresh `services.currentWorkflow` from DB when needed. + +--- + +## 7) Tasklist: Module/function updates + +- `services/serviceWorkflow/mainServiceWorkflow.py` + - [ ] createMessage: keep DB write-through; keep single guarded in-memory append (or drop append and rely on refresh); add optional `syncWorkflowInMemory` call. + - [ ] Remove any direct `workflow.logs.append(...)` writes; replace with `createLog` and in-memory refresh. + +- `workflows/processing/core/messageCreator.py` + - [ ] Replace all `workflow.messages.append(message)` with `services.workflow.createMessage(...)` (or `updateMessage(...)` if appropriate). + +- `workflows/workflowManager.py` + - [ ] Replace all direct appends with service calls. + +- `workflows/processing/modes/modeReact.py` + - [ ] Replace direct append with service calls. + +- `workflows/processing/modes/modeActionplan.py` + - [ ] Replace direct append with service calls. + +--- + +## 8) Diagnostics to add (temporary) + +- Before any in-memory list mutation (temporary while refactoring): + - Log caller, object type, id, len(list) before/after. + - This will surface any remaining illegal appends. + +--- + +## 10) Service-level Transactions (must update DB and in-memory references) + +All transactions below are implemented at the service layer (not interface) and must perform both the database write and the in-memory reference updates to keep the live `workflow` object consistent. + +### A) Store message with documents to workflow + +Function: `services.workflow.storeMessageWithDocuments(workflow, messageData, documents)` + +Required behavior: +1) Ensure ChatMessage exists in DB (create if not exists) with proper `workflowId` (source of truth id coming from parameter). +2) Persist each ChatDocument in DB referencing the created message (set `messageId`) and the same `workflowId` when applicable. +3) Rehydrate/construct a `ChatMessage` object that contains the newly created `ChatDocument` objects. +4) Update in-memory workflow object: + - Append/replace the message in `services.currentWorkflow.messages` by id (replace if exists, else append). + - Ensure the in-memory `ChatMessage.documents` points to the same list of in-memory `ChatDocument` instances. +5) Return the in-memory `ChatMessage` (with documents) as confirmation. + +Notes: +- If `documents` include in-memory-only `ChatDocument` instances (no DB ids yet), step (2) must persist them and update their ids before binding them back to the message in-memory object. +- No other modules are allowed to mutate `workflow.messages` directly; this transaction is the single entry point for this behavior. + +Future transactions (to be specified similarly): +- Update message with added/removed documents +- Remove document from message +- Delete message (+ cascade delete of its documents in DB) and remove from in-memory workflow + +### B) Store log entry to workflow + +Function: `services.workflow.storeLog(workflow, logData)` + +Required behavior: +1) Persist `ChatLog` in DB with `workflowId`. +2) Update in-memory workflow object: append/replace the `ChatLog` in `services.currentWorkflow.logs` by id (if logs are held in-memory), or trigger a lightweight refresh. +3) Return the in-memory `ChatLog`. + +Notes: +- No direct `workflow.logs.append(...)` outside this service method. + +### C) Store stat entry to workflow (workflow-level stat) + +Function: `services.workflow.storeWorkflowStat(workflow, statData)` + +Required behavior: +1) Persist `ChatStat` in DB with `workflowId` (no `messageId`). +2) Update in-memory workflow object: set/replace workflow-level `stats` on `services.currentWorkflow` as applicable. +3) Return the in-memory `ChatStat`. + +Notes: +- No direct in-memory mutation outside this service method. + +### D) Store stat entry to message (message-level stat) + +Function: `services.workflow.storeMessageStat(workflow, messageId, statData)` + +Required behavior: +1) Persist `ChatStat` in DB with both `workflowId` and `messageId`. +2) Update in-memory workflow object: locate message by `messageId` in `services.currentWorkflow.messages` and set/replace its `stats`. +3) Return the in-memory `ChatStat`. + +Notes: +- If the target message is not present in memory, optionally refresh the workflow messages or require caller to provide the message object; no direct appends outside service method. + diff --git a/poweron/implementation/implementation_refactor_stats-unified.md b/poweron/implementation/implementation_refactor_stats-unified.md new file mode 100644 index 0000000..236a69d --- /dev/null +++ b/poweron/implementation/implementation_refactor_stats-unified.md @@ -0,0 +1,239 @@ +# Stats Refactory Concept and Implementation Plan + +Date: 2025-10-16 +Owner: Workflow/Data Layer + +## Goals +- Clean, consistent statistics (ChatStat) model and lifecycle. +- Workflow is the authoritative container for stats (similar to logs). +- Simple, predictable DB APIs for reading full stats history per workflow. +- Lightweight instrumentation points at key producers (AI calls, extraction, generation) to emit stats. + +--- + +## Datamodel Changes (modules/datamodels/datamodelChat.py) + +- ChatStat (all fields optional) + - Keep: id, workflowId, processingTime, bytesSent, bytesReceived, errorCount + - Remove: successRate, tokenCount, tokenPriceUnit, tokenPriceAmount + - Add: process: str + - Example values: "action.outlook.readMails", "ai.process.document.name" + - Add: engine: str + - Example values: "ai.anthropic.35", "ai.tavily.basic", "renderer.docx" + - Add: priceUsd: float (calculated price in USD for the operation) + +- ChatWorkflow + - stats: change from Optional[ChatStat] to List[ChatStat] (default []) + +- ChatMessage + - Remove stats field entirely to avoid coupling to message lifecycles. + - Remove messageId from ChatStat (no longer needed) + +Rationale: +- Stats are workflow-scope artifacts for billing/analytics. They should not depend on transient message presence. +- Attaching stats to workflow aligns with logs; UI polling returns consolidated workflow data including stats list. + +--- + +## Interface/API Changes (modules/interfaces/interfaceDbChatObjects.py) + +- Replace getWorkflowStats(workflowId) returning single latest with: + - getWorkflowStatsList(workflowId) -> List[ChatStat] +- Add createWorkflowStat(statData: Dict[str, Any]) -> ChatStat (write-only per record) +- Keep updateWorkflowStats(...) only if absolutely needed; prefer append-only stats records for auditability +- Update unified polling (getUnifiedChatData) to include full current stats list for workflow for each poll + +DB logic: +- Store each stat emission as one row (append-only). No modify unless explicitly required. +- Deletion remains cascading on workflow delete. + +--- + +## Service Layer Changes (modules/services/serviceWorkflow/mainServiceWorkflow.py) + +- Replace storeWorkflowStat(workflow, statData) to: + - Coerce statData to ChatStat fields (process, engine, priceUsd, bytesSent, bytesReceived, processingTime, errorCount) --> to handover pydantic model (with id empty), not the single values + - Set id and workflowId + - Persist via interfaceDbChat.createWorkflowStat + - Append to workflow.stats in memory + - Return ChatStat + +- Remove storeMessageStat (no message-level stats in new model) + +--- + +## Instrumentation Points (where to emit stats) + +1) AI Calls (modules/interfaces/interfaceAiObjects.py) + - Each AI call measures processing time, calculates priceUsd, and tracks bytes sent/received + - Returns standardized AiCallResponse with priceUsd, processingTime, bytesSent, bytesReceived, errorCount included (to adapt pydantic model) + - All AI functions (call, callImage, generateImage, webQuery, etc.) return this standardized response + - Model-specific pricing functions calculate priceUsd based on (processingTime, bytesSent, bytesReceived) + +2) Service Layer (modules/services/serviceWorkflow/mainServiceWorkflow.py) + - Receives AiCallResponse from AI interface + - Creates ChatStat objects with data from AiCallResponse + - Emits stats to database via interfaceDbChat.createWorkflowStat + +3) Extraction/Generation Services + - services/serviceExtraction/mainServiceExtraction.py and related extractors + - services/serviceGeneration/* (document rendering/generation) + - Emit ChatStat per completed operation with process, engine, processingTime, bytes, priceUsd, etc. + +4) Workflow-level summaries (optional) + - At workflow completion, compute a summary stat (aggregated bytes/costs) if desired + +--- + +## AiCallResponse Changes (modules/datamodels/datamodelAi.py) + +Update AiCallResponse to include standardized stats fields: + +- Remove: usedTokens, costEstimate +- Add: priceUsd: float (calculated price in USD) +- Add: processingTime: float (duration in seconds) +- Add: bytesSent: int (input data size in bytes) +- Add: bytesReceived: int (output data size in bytes) +- Add: errorCount: int (0 for success, 1+ for errors) + +## Model-Specific Pricing Functions + +Each AI model should implement a pricing calculation function that takes the operation parameters and returns the cost in USD: + +- Function signature: `_calculatePriceUsd(processingTime: float, bytesSent: int, bytesReceived: int) -> float` +- Parameters: + - `processingTime`: Duration of the operation in seconds + - `bytesSent`: Size of input data in bytes + - `bytesReceived`: Size of output data in bytes +- Returns: Price in USD as float + +Implementation approach: +- Each model class (e.g., OpenAI, Anthropic, etc.) implements its own pricing logic +- Complex pricing models can handle different rates for input/output, time-based costs, etc. +- Models can access their specific pricing configuration (rates, tiers, etc.) +- No fallback! each model MUST have its pricing calculation + +Example implementations: +- OpenAI: Token-based pricing with different rates for input/output tokens +- Anthropic: Similar token-based pricing with model-specific rates +- Tavily: Simple per-request pricing +- Custom models: Any complex pricing logic (tiered, time-based, etc.) + +--- + +## Routing/Polling (UI) + +- Update route (unified polling) to include stats list for workflow: + - interfaceDbChatObjects.getUnifiedChatData(workflowId): include + - { type: "stat", createdAt: stat_timestamp, item: ChatStat(dict) } for all stats + +- UI will receive a complete and growing list of stats with each poll; no special latest logic required. + +--- + +## Step-by-Step Implementation Plan + +### Phase 1: Core Data Model Changes +1. **Update ChatStat model** (modules/datamodels/datamodelChat.py) + - Remove: successRate, tokenCount, tokenPriceUnit, tokenPriceAmount, messageId + - Add: process: str, engine: str, priceUsd: float + - Keep: id, workflowId, processingTime, bytesSent, bytesReceived, errorCount + +2. **Update ChatWorkflow model** (modules/datamodels/datamodelChat.py) + - Change stats from Optional[ChatStat] to List[ChatStat] (default []) + +3. **Update ChatMessage model** (modules/datamodels/datamodelChat.py) + - Remove stats field entirely + - Remove messageId from ChatStat (no longer needed) + +4. **Update AiCallResponse model** (modules/datamodels/datamodelAi.py) + - Remove: usedTokens, costEstimate + - Add: priceUsd: float, processingTime: float, bytesSent: int, bytesReceived: int, errorCount: int + +### Phase 2: Database Interface Changes +5. **Update interfaceDbChatObjects.py** + - Implement createWorkflowStat(workflow: ChatWorkflow, statData: Dict[str, Any]) -> ChatStat (the function creates a new ChatStat object, persists it to the database, and includes it into the workflows stats list) + - Implement getWorkflowStatsList(workflowId) -> List[ChatStat] (the function returns the list of stats for a given workflow) + - Update getUnifiedChatData to include full stats list + - Remove getWorkflowStats (single latest) - replace with getWorkflowStatsList + +### Phase 3: AI Interface Layer Changes +6. **Add model-specific pricing functions** (modules/interfaces/interfaceAiObjects.py) + - Add _calculatePriceUsd() method to each model in aiModels registry + - Implement pricing logic for each connector type (OpenAI, Anthropic, Perplexity, Tavily) + +7. **Update all AI call methods** (modules/interfaces/interfaceAiObjects.py) + - Add timing measurement (start/end timestamps) + - Calculate bytesSent (input data size) + - Calculate bytesReceived (output data size) + - Call model-specific _calculatePriceUsd() + - Return standardized AiCallResponse with all stats fields + +8. **Update specific AI methods:** + - call() - text generation + - callImage() - image analysis + - generateImage() - image generation + - webQuery() - web research + - All web methods (search_websites, crawl_websites, etc.) + +### Phase 4: Service Layer Changes +9. **Update mainServiceWorkflow.py** + - Implement storeWorkflowStat(workflow: ChatWorkflow, aiResponse: AiCallResponse, process: str) -> ChatStat + - Remove storeMessageStat (no message-level stats) + - Update storeWorkflowStat to use new ChatStat fields + +10. **Update workflow service methods** + - Replace old stats handling with new storeWorkflowStat calls + - Ensure all AI calls go through storeWorkflowStat + +### Phase 5: Extraction/Generation Services +11. **Update serviceExtraction services** + - Add stats emission for extraction operations + - Calculate processingTime, bytesSent, bytesReceived + - Emit ChatStat via storeWorkflowStat + +12. **Update serviceGeneration services** + - Add stats emission for generation operations + - Calculate processingTime, bytesSent, bytesReceived + - Emit ChatStat via storeWorkflowStat + +### Phase 6: Integration and Testing +13. **Update all AI call sites** + - Replace old stats handling with new AiCallResponse usage + - Ensure all AI calls emit stats via storeWorkflowStat + +14. **Update routing/polling** + - Ensure getUnifiedChatData includes full stats list + - Update UI polling to handle stats array + +15. **Add comprehensive logging** + - Debug logs for each stat emission (process, engine, bytes, priceUsd) + - Error handling for stats emission failures + +### Phase 7: Cleanup and Validation +16. **Remove deprecated code** + - Remove old getWorkflowStats calls + - Remove old updateWorkflowStats calls + - Remove token-based pricing logic + +17. **Add validation and tests** + - Test stats emission for all AI operations + - Validate pricing calculations + - Test database operations + +18. **Performance optimization** + - Ensure stats emission doesn't impact AI call performance + - Optimize database queries for stats retrieval + +--- + +## Smart Implementation Notes + +- **Centralized measurement**: AI interface layer handles all timing and byte counting, ensuring consistency. +- **Standardized responses**: All AI calls return AiCallResponse with complete stats data. +- **Model-specific pricing**: Each model implements `_calculatePriceUsd()` with its own pricing strategy. +- **Service layer simplicity**: Service layer just creates ChatStat objects from AiCallResponse data. +- **Append-only stats**: Simplifies auditing and invoicing; no implicit overwrites. +- **Workflow-scoped stats**: All stats attached to workflow, not messages; UI gets complete history. +- **Performance consideration**: Stats emission should not impact AI call performance. +- **Error handling**: Graceful degradation if stats emission fails; AI calls should still succeed. diff --git a/poweron/implementation/implement_workflow_processing_refactoring.md b/poweron/implementation/implementation_workflow_processing_refactoring.md similarity index 100% rename from poweron/implementation/implement_workflow_processing_refactoring.md rename to poweron/implementation/implementation_workflow_processing_refactoring.md