updated document handling

This commit is contained in:
ValueOn AG 2025-10-20 09:52:02 +02:00
parent c49a59f46c
commit 62bea8c8aa
5 changed files with 1072 additions and 0 deletions

View file

@ -0,0 +1,627 @@
# Document Generation One-Path Refactoring Plan
## Overview
This document outlines the refactoring plan to unify the document generation system by eliminating the dual-path approach (single-file vs multi-file) and implementing a unified multi-file approach that handles both single and multiple document generation seamlessly.
## Current State Analysis
### Current Dual-Path Structure
- **Single File Path**: `_callAiWithSingleFileGeneration()`
- **Multi File Path**: `_callAiWithMultiFileGeneration()`
- **Code Duplication**: ~80% of functionality is duplicated
- **Maintenance Overhead**: Two separate code paths to maintain
### Key Differences to Address
1. **Prompt Generation**: `getExtractionPrompt` vs `getAdaptiveExtractionPrompt`
2. **Result Structure**: Single object vs array structure
3. **Validation Logic**: Different validation rules for single vs multi-file
4. **Processing Pipeline**: Separate processing flows
## Refactoring Goals
1. **Unify Code Paths**: Single processing pipeline for all document generation
2. **Eliminate Duplication**: Remove ~200 lines of duplicate code
3. **Improve Maintainability**: Single code path to maintain and test
4. **Enhance Flexibility**: Dynamic switching between single/multi based on content
5. **Preserve Functionality**: Maintain all existing capabilities
## Implementation Plan
### Phase 1: Prompt Generation Unification
#### 1.1 Modify `getAdaptiveExtractionPrompt` to Handle Single File
```python
async def getAdaptiveExtractionPrompt(
self,
outputFormat: str,
userPrompt: str,
title: str,
promptAnalysis: Dict[str, Any],
aiService: AiService
) -> str:
"""
Unified extraction prompt that handles both single and multi-file cases.
Hides multi-file specific parts when single file is requested.
"""
# Base prompt structure
basePrompt = f"""
Generate a structured document in {outputFormat} format based on the user request.
User Request: {userPrompt}
Title: {title}
"""
# Add multi-file logic only if needed
if promptAnalysis.get("is_multi_file", False):
multiFileSection = f"""
MULTI-FILE GENERATION:
- Split strategy: {promptAnalysis.get("strategy", "custom")}
- Split criteria: {promptAnalysis.get("criteria", "content-based")}
- File naming pattern: {promptAnalysis.get("file_naming_pattern", "document_{index}")}
Return JSON structure:
{{
"documents": [
{{
"id": "doc_1",
"title": "Document Title",
"filename": "document_1.{outputFormat}",
"sections": [...]
}}
]
}}
"""
basePrompt += multiFileSection
else:
singleFileSection = f"""
SINGLE-FILE GENERATION:
Return JSON structure:
{{
"documents": [
{{
"id": "doc_1",
"title": "{title}",
"filename": "{title}.{outputFormat}",
"sections": [...]
}}
]
}}
"""
basePrompt += singleFileSection
# Add chunking support for large documents
chunkingSection = """
CHUNKING SUPPORT:
If the document is too large to generate in one response, include:
- "continue": true
- "continuation_context": {
"last_section_id": "section_id",
"last_element_index": 0,
"remaining_requirements": "description"
}
The system will automatically request continuation chunks until complete.
"""
basePrompt += chunkingSection
return basePrompt
```
#### 1.2 Remove `getExtractionPrompt` Method
- Delete the single-file specific prompt generation method
- Update all references to use `getAdaptiveExtractionPrompt`
### Phase 2: Unified Processing Pipeline
#### 2.1 Create Unified `callAiWithDocumentGeneration` Method
```python
async def callAiWithDocumentGeneration(
self,
prompt: str,
documents: Optional[List[ChatDocument]],
options: AiCallOptions,
outputFormat: str,
title: Optional[str]
) -> Dict[str, Any]:
"""
Unified document generation method that handles both single and multi-file cases.
Always uses multi-file approach internally.
"""
try:
# 1. Analyze prompt intent
promptAnalysis = await self._analyzePromptIntent(prompt, self)
logger.info(f"Prompt analysis result: {promptAnalysis}")
# 2. Get unified extraction prompt
from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
generationService = GenerationService(self.services)
extractionPrompt = await generationService.getAdaptiveExtractionPrompt(
outputFormat=outputFormat,
userPrompt=prompt,
title=title,
promptAnalysis=promptAnalysis,
aiService=self
)
# 3. Process with unified pipeline (always multi-file approach)
aiResponse = await self._processDocumentsUnified(
documents, extractionPrompt, options, outputFormat, title, promptAnalysis
)
# 4. Return unified result structure
return self._buildUnifiedResult(aiResponse, outputFormat, title, promptAnalysis)
except Exception as e:
logger.error(f"Error in unified document generation: {str(e)}")
return self._buildErrorResult(str(e), outputFormat, title)
```
#### 2.2 Create Unified Processing Method
```python
async def _processDocumentsUnified(
self,
documents: Optional[List[ChatDocument]],
extractionPrompt: str,
options: AiCallOptions,
outputFormat: str,
title: str,
promptAnalysis: Dict[str, Any]
) -> Dict[str, Any]:
"""
Unified document processing that handles both single and multi-file cases.
Always processes as multi-file structure internally.
"""
import time
# Create progress logger
workflow = self.services.currentWorkflow
progressLogger = self.services.workflow.createProgressLogger(workflow)
operationId = f"docGenUnified_{workflow.id}_{int(time.time())}"
try:
# Start progress tracking
progressLogger.startOperation(
operationId,
"Generate",
"Unified Document Generation",
f"Processing {len(documents) if documents else 0} documents"
)
# Update progress - generating extraction prompt
progressLogger.updateProgress(operationId, 0.1, "Generating prompt")
# Process with unified JSON pipeline
aiResponse = await self.documentProcessor.processDocumentsPerChunkJsonWithPrompt(
documents, extractionPrompt, options
)
# Update progress - AI processing completed
progressLogger.updateProgress(operationId, 0.6, "Processing done")
# Validate response structure
if not self._validateUnifiedResponseStructure(aiResponse):
raise Exception("AI response is not valid unified document structure")
# Emit raw extracted data as a chat message attachment
try:
await self._postRawDataChatMessage(aiResponse, label="raw_extraction_unified")
except Exception:
logger.warning("Failed to emit raw extraction chat message (unified)")
# Complete progress tracking
progressLogger.completeOperation(operationId, True)
return aiResponse
except Exception as e:
logger.error(f"Error in unified document processing: {str(e)}")
progressLogger.completeOperation(operationId, False)
raise
```
### Phase 3: Unified Validation System
#### 3.1 Create Unified Validation Method
```python
def _validateUnifiedResponseStructure(self, response: Dict[str, Any]) -> bool:
"""
Unified validation that checks for multi-file structure.
Validates that response has documents array and each document has sections.
"""
try:
if not isinstance(response, dict):
logger.warning(f"Response validation failed: Response is not a dict, got {type(response)}")
return False
# Check for documents array
hasDocuments = "documents" in response
isDocumentsList = isinstance(response.get("documents"), list)
if not (hasDocuments and isDocumentsList):
logger.warning(f"Unified validation failed: documents key present={hasDocuments}, documents is list={isDocumentsList}")
logger.warning(f"Available keys: {list(response.keys())}")
return False
documents = response.get("documents", [])
if not documents:
logger.warning("Unified validation failed: documents array is empty")
return False
# Validate each document individually
validDocuments = 0
for i, doc in enumerate(documents):
if self._validateDocumentStructure(doc, i):
validDocuments += 1
else:
logger.warning(f"Document {i} failed validation, but continuing with others")
# Process succeeds if at least one document is valid
if validDocuments == 0:
logger.error("Unified validation failed: no valid documents found")
return False
logger.info(f"Unified validation passed: {validDocuments}/{len(documents)} documents valid")
return True
except Exception as e:
logger.warning(f"Unified response validation failed with exception: {str(e)}")
return False
def _validateDocumentStructure(self, document: Dict[str, Any], documentIndex: int) -> bool:
"""
Validate individual document structure.
Returns True if document is valid, False otherwise.
Does not fail the entire process if one document is invalid.
"""
try:
if not isinstance(document, dict):
logger.warning(f"Document {documentIndex} validation failed: not a dict")
return False
# Check for required fields
hasTitle = "title" in document
hasSections = "sections" in document
isSectionsList = isinstance(document.get("sections"), list)
if not (hasTitle and hasSections and isSectionsList):
logger.warning(f"Document {documentIndex} validation failed: title={hasTitle}, sections={hasSections}, sections_list={isSectionsList}")
return False
sections = document.get("sections", [])
if not sections:
logger.warning(f"Document {documentIndex} validation failed: sections array is empty")
return False
logger.info(f"Document {documentIndex} validation passed")
return True
except Exception as e:
logger.warning(f"Document {documentIndex} validation failed with exception: {str(e)}")
return False
```
#### 3.2 Remove Old Validation Methods
- Delete `_validateResponseStructure` method
- Update all references to use `_validateUnifiedResponseStructure`
### Phase 4: Unified Result Structure
#### 4.1 Create Unified Result Builder
```python
def _buildUnifiedResult(
self,
aiResponse: Dict[str, Any],
outputFormat: str,
title: str,
promptAnalysis: Dict[str, Any]
) -> Dict[str, Any]:
"""
Build unified result structure that always returns array-based format.
Content is always a multi-document structure.
"""
try:
# Process all documents uniformly
generatedDocuments = []
documents = aiResponse.get("documents", [])
for i, docData in enumerate(documents):
try:
processedDocument = await self._processDocument(
docData, outputFormat, title, promptAnalysis, i
)
generatedDocuments.append(processedDocument)
except Exception as e:
logger.warning(f"Failed to process document {i}: {str(e)}, skipping")
continue
if not generatedDocuments:
raise Exception("No documents could be processed successfully")
# Build unified result
result = {
"success": True,
"content": aiResponse, # Always multi-document structure
"documents": generatedDocuments, # Always array
"is_multi_file": len(generatedDocuments) > 1,
"format": outputFormat,
"title": title,
"split_strategy": promptAnalysis.get("strategy", "single"),
"total_documents": len(generatedDocuments),
"processed_documents": len(generatedDocuments)
}
return result
except Exception as e:
logger.error(f"Error building unified result: {str(e)}")
return self._buildErrorResult(str(e), outputFormat, title)
async def _processDocument(
self,
docData: Dict[str, Any],
outputFormat: str,
title: str,
promptAnalysis: Dict[str, Any],
documentIndex: int
) -> Dict[str, Any]:
"""
Process individual document with content enhancement and rendering.
"""
try:
# Get generation service
from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
generationService = GenerationService(self.services)
# Use AI generation to enhance the extracted JSON before rendering
enhancedContent = docData # Default to original
if docData.get("sections"):
try:
# Get generation prompt
generationPrompt = await generationService.getGenerationPrompt(
outputFormat=outputFormat,
userPrompt=title,
title=docData.get("title", title),
aiService=self
)
# Prepare the AI call
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationType
requestOptions = AiCallOptions()
requestOptions.operationType = OperationType.GENERAL
# Create context with the extracted JSON content
import json
context = f"Extracted JSON content:\n{json.dumps(docData, indent=2)}"
request = AiCallRequest(
prompt=generationPrompt,
context=context,
options=requestOptions
)
# Call AI to enhance the content
response = await self.aiObjects.call(request)
if response and response.content:
# Parse the AI response as JSON
try:
import re
result = response.content.strip()
# Extract JSON from markdown if present
jsonMatch = re.search(r'```json\s*\n(.*?)\n```', result, re.DOTALL)
if jsonMatch:
result = jsonMatch.group(1).strip()
elif result.startswith('```json'):
result = re.sub(r'^```json\s*', '', result)
result = re.sub(r'\s*```$', '', result)
elif result.startswith('```'):
result = re.sub(r'^```\s*', '', result)
result = re.sub(r'\s*```$', '', result)
# Try to parse JSON
enhancedContent = json.loads(result)
logger.info(f"AI enhanced JSON content successfully for document {documentIndex}")
except json.JSONDecodeError as e:
logger.warning(f"AI generation returned invalid JSON for document {documentIndex}: {str(e)}, using original content")
enhancedContent = docData
else:
logger.warning(f"AI generation returned empty response for document {documentIndex}, using original content")
enhancedContent = docData
except Exception as e:
logger.warning(f"AI generation failed for document {documentIndex}: {str(e)}, using original content")
enhancedContent = docData
# Render the enhanced JSON content
renderedContent, mimeType = await generationService.renderReport(
extractedContent=enhancedContent,
outputFormat=outputFormat,
title=docData.get("title", title),
userPrompt=title,
aiService=self
)
# Generate proper filename
baseFilename = docData.get("filename", f"document_{documentIndex + 1}")
if '.' in baseFilename:
baseFilename = baseFilename.rsplit('.', 1)[0]
# Add proper extension based on output format
if outputFormat.lower() == "docx":
filename = f"{baseFilename}.docx"
elif outputFormat.lower() == "pdf":
filename = f"{baseFilename}.pdf"
elif outputFormat.lower() == "html":
filename = f"{baseFilename}.html"
else:
filename = f"{baseFilename}.{outputFormat}"
return {
"documentName": filename,
"documentData": renderedContent,
"mimeType": mimeType,
"title": docData.get("title", title),
"documentIndex": documentIndex
}
except Exception as e:
logger.error(f"Error processing document {documentIndex}: {str(e)}")
raise
def _buildErrorResult(self, errorMessage: str, outputFormat: str, title: str) -> Dict[str, Any]:
"""
Build error result with unified structure.
"""
return {
"success": False,
"error": errorMessage,
"content": {},
"documents": [],
"is_multi_file": False,
"format": outputFormat,
"title": title,
"split_strategy": "error",
"total_documents": 0,
"processed_documents": 0
}
```
### Phase 5: Remove Legacy Methods
#### 5.1 Delete Single-File Methods
```python
# Remove these methods:
- _callAiWithSingleFileGeneration()
- _callAiWithMultiFileGeneration()
- _validateResponseStructure()
- getExtractionPrompt() (in GenerationService)
```
#### 5.2 Update Method References
- Update all callers to use `callAiWithDocumentGeneration()`
- Update tests to use unified approach
- Update documentation
### Phase 6: Testing and Validation
#### 6.1 Unit Tests
```python
async def test_unified_single_file_generation():
"""Test that single file generation works with unified approach"""
result = await aiService.callAiWithDocumentGeneration(
prompt="Generate a single document",
documents=None,
options=options,
outputFormat="html",
title="Test Document"
)
assert result["success"] == True
assert result["is_multi_file"] == False
assert len(result["documents"]) == 1
assert isinstance(result["content"], dict)
assert "documents" in result["content"]
async def test_unified_multi_file_generation():
"""Test that multi file generation works with unified approach"""
result = await aiService.callAiWithDocumentGeneration(
prompt="Generate multiple documents",
documents=None,
options=options,
outputFormat="html",
title="Test Documents"
)
assert result["success"] == True
assert result["is_multi_file"] == True
assert len(result["documents"]) > 1
assert isinstance(result["content"], dict)
assert "documents" in result["content"]
async def test_unified_validation_partial_failure():
"""Test that partial document failure doesn't fail entire process"""
# Mock scenario where one document fails validation
# Should process remaining documents successfully
pass
```
#### 6.2 Integration Tests
- Test with various document types
- Test with different output formats
- Test chunking functionality
- Test error handling scenarios
## Migration Strategy
### Step 1: Implement Unified Methods
1. Create new unified methods alongside existing ones
2. Add feature flag to switch between old and new approaches
3. Test new methods thoroughly
### Step 2: Update Callers
1. Update all callers to use unified approach
2. Update tests to use new methods
3. Verify functionality is preserved
### Step 3: Remove Legacy Code
1. Remove old single-file and multi-file methods
2. Remove old validation methods
3. Clean up unused imports and references
### Step 4: Final Testing
1. Run full test suite
2. Test with real-world scenarios
3. Performance testing
4. Documentation updates
## Benefits of Unified Approach
### Code Quality
- **Reduced Duplication**: ~200 lines of duplicate code removed
- **Single Code Path**: Easier to maintain and debug
- **Consistent Behavior**: Same logic for all document types
### Performance
- **Better CPU Cache Usage**: Single code path
- **Reduced Memory Footprint**: No duplicate code
- **Faster Development**: Changes affect all cases automatically
### Maintainability
- **Single Point of Truth**: All document generation logic in one place
- **Easier Testing**: One code path to test
- **Simpler Debugging**: Single call stack to trace
### Flexibility
- **Dynamic Switching**: Can switch between single/multi based on content
- **Easy Extensions**: New features automatically work for all cases
- **Better Error Handling**: Unified error handling approach
## Risk Mitigation
### Backward Compatibility
- **No Backward Compatibility Required**: As specified
- **Clean Migration**: Complete replacement of old system
### Testing
- **Comprehensive Testing**: Unit and integration tests
- **Real-World Testing**: Test with actual use cases
- **Performance Testing**: Ensure no performance regression
### Rollback Plan
- **Feature Flag**: Can quickly switch back to old system if needed
- **Gradual Migration**: Can migrate callers one by one
- **Monitoring**: Monitor for any issues during migration
## Conclusion
This refactoring plan provides a clear path to unify the document generation system, eliminating code duplication while preserving all existing functionality. The unified approach is more maintainable, performant, and flexible than the current dual-path system.
The key insight is that single-file generation is just a special case of multi-file generation with one document, so the unified approach is more elegant and maintainable than maintaining separate code paths.

View file

@ -0,0 +1,206 @@
# DB-backed Object Consistency Refactor
Date: 2025-10-16
Owner: Workflow/Data Layer
Goal: Eliminate divergence between in-memory objects and database records by centralizing create/update/delete (CUD) logic in service/DB layer, and prohibiting direct list mutations in workflow objects.
---
## Inputs / Constraints
- No caching subsystem: we use the in-memory workflow objects directly (no separate cache layer to reconcile).
- ChatDocuments must be able to be produced and exist in memory only (transient) when needed; this capability must be preserved.
Implication: Service methods must support both DB-backed and in-memory-only flows, with clear, explicit steps for when to persist.
---
## 1) Affected Pydantic Models (with DB representation)
Source: `gateway/modules/interfaces/interfaceDbChatObjects.py`
- ChatWorkflow (table: workflows)
- ChatMessage (table: messages)
- ChatDocument (table: documents)
- ChatLog (table: logs)
- ChatStat (table: stats)
Notes:
- `ChatWorkflow` has references to: `messages`, `logs`, `stats` (loaded via read operations only)
- `ChatMessage` has references to: `documents`, `stats`
---
## 2) Where CUD is performed today (by module/function)
- Workflow (ChatWorkflow)
- createWorkflow/updateWorkflow/deleteWorkflow: `interfaceDbChatObjects.ChatObjects`
- updateWorkflowStats: `serviceWorkflow.WorkflowService`
- Message (ChatMessage)
- createMessage/updateMessage/deleteMessage: `interfaceDbChatObjects.ChatObjects`
- createMessage (write-through + local append attempt): `serviceWorkflow.WorkflowService.createMessage`
- MessageCreator paths (indirect, see forbidden appends below): `workflows/processing/core/messageCreator.py`
- workflowManager paths (indirect, see forbidden appends below): `workflows/workflowManager.py`
- Document (ChatDocument)
- createDocument/getDocuments: `interfaceDbChatObjects.ChatObjects`
- deleteFileFromMessage: `interfaceDbChatObjects.ChatObjects`
- Log (ChatLog)
- createLog/getLogs: `interfaceDbChatObjects.ChatObjects`
- Stat (ChatStat)
- getMessageStats/getWorkflowStats: `interfaceDbChatObjects.ChatObjects`
- updateWorkflowStats: `serviceWorkflow.WorkflowService`
---
## 3) Forbidden in-memory `.append(...)` sites (must be removed or guarded)
- `gateway/modules/services/serviceWorkflow/mainServiceWorkflow.py`
- createMessage: `workflow.messages.append(message)` (has id-based guard; keep only this site to have calls to self.services.workflow.createMessage(...) in the code)
- (Two occurrences) `workflow.logs.append(log_entry)`
- `gateway/modules/workflows/processing/core/messageCreator.py`
- Multiple occurrences of `workflow.messages.append(message)`
- `gateway/modules/workflows/workflowManager.py`
- Multiple occurrences of `workflow.messages.append(message)` across handling steps
- `gateway/modules/workflows/processing/modes/modeReact.py`
- `workflow.messages.append(message)`
- `gateway/modules/workflows/processing/modes/modeActionplan.py`
- `workflow.messages.append(message)`
Action: Replace all of the above with calls to `serviceWorkflow.createMessage/updateMessage` methods. No direct appends.
---
## 4) Cascading CUD that should NOT happen
Observed in: `interfaceDbChatObjects.ChatObjects.updateWorkflow` (lines updating `logs`, `messages`, `stats` inside workflow update).
Issue: Parent-level update attempts to write child tables (cascade write) during a workflow update. This causes ordering issues and potential duplicates.
Action:
- Remove cascade CUD from `updateWorkflow`. Workflow update should only modify workflow fields.
- CUD for `messages`, `logs`, `stats`, `documents` must be done via their own dedicated service methods.
- Keep cascade on READ ONLY (e.g., `getWorkflow` loads logs/messages/stats/documents).
---
## 5) Proposed Operating Model (write-through, DB as source of truth)
- For each DB-backed object (workflow, message, document, log, stat):
1) All create/update/delete goes through `interfaceDbChatObjects.ChatObjects` or `serviceWorkflow.WorkflowService` dedicated methods.
2) After DB success, update in-memory cache (replace by id or remove by id). No direct `.append()` other than in the single service function immediately after DB write (and guarded by id check).
3) All other modules must not mutate lists directly. Replace with service calls.
- Reads: May hydrate workflow with referenced objects (messages/logs/stats/documents). No writes in read functions.
---
## 6) Tasklist: Interface changes in `interfaceDbChatObjects.py`
- [ ] Remove cascade writes in `updateWorkflow` for `logs`, `messages`, `stats`.
- [ ] Ensure `createMessage` returns persisted `ChatMessage` and does not assume local `sequenceNr` from in-memory list; optionally compute from DB if required, or drop if unused.
- [ ] Ensure `updateMessage` only updates message table and related tables when explicitly provided via dedicated methods.
- [ ] Ensure `createDocument` and `deleteFileFromMessage` are the only paths to manage documents.
- [ ] Add helper `syncWorkflowInMemory(workflowId)` to refresh `services.currentWorkflow` from DB when needed.
---
## 7) Tasklist: Module/function updates
- `services/serviceWorkflow/mainServiceWorkflow.py`
- [ ] createMessage: keep DB write-through; keep single guarded in-memory append (or drop append and rely on refresh); add optional `syncWorkflowInMemory` call.
- [ ] Remove any direct `workflow.logs.append(...)` writes; replace with `createLog` and in-memory refresh.
- `workflows/processing/core/messageCreator.py`
- [ ] Replace all `workflow.messages.append(message)` with `services.workflow.createMessage(...)` (or `updateMessage(...)` if appropriate).
- `workflows/workflowManager.py`
- [ ] Replace all direct appends with service calls.
- `workflows/processing/modes/modeReact.py`
- [ ] Replace direct append with service calls.
- `workflows/processing/modes/modeActionplan.py`
- [ ] Replace direct append with service calls.
---
## 8) Diagnostics to add (temporary)
- Before any in-memory list mutation (temporary while refactoring):
- Log caller, object type, id, len(list) before/after.
- This will surface any remaining illegal appends.
---
## 10) Service-level Transactions (must update DB and in-memory references)
All transactions below are implemented at the service layer (not interface) and must perform both the database write and the in-memory reference updates to keep the live `workflow` object consistent.
### A) Store message with documents to workflow
Function: `services.workflow.storeMessageWithDocuments(workflow, messageData, documents)`
Required behavior:
1) Ensure ChatMessage exists in DB (create if not exists) with proper `workflowId` (source of truth id coming from parameter).
2) Persist each ChatDocument in DB referencing the created message (set `messageId`) and the same `workflowId` when applicable.
3) Rehydrate/construct a `ChatMessage` object that contains the newly created `ChatDocument` objects.
4) Update in-memory workflow object:
- Append/replace the message in `services.currentWorkflow.messages` by id (replace if exists, else append).
- Ensure the in-memory `ChatMessage.documents` points to the same list of in-memory `ChatDocument` instances.
5) Return the in-memory `ChatMessage` (with documents) as confirmation.
Notes:
- If `documents` include in-memory-only `ChatDocument` instances (no DB ids yet), step (2) must persist them and update their ids before binding them back to the message in-memory object.
- No other modules are allowed to mutate `workflow.messages` directly; this transaction is the single entry point for this behavior.
Future transactions (to be specified similarly):
- Update message with added/removed documents
- Remove document from message
- Delete message (+ cascade delete of its documents in DB) and remove from in-memory workflow
### B) Store log entry to workflow
Function: `services.workflow.storeLog(workflow, logData)`
Required behavior:
1) Persist `ChatLog` in DB with `workflowId`.
2) Update in-memory workflow object: append/replace the `ChatLog` in `services.currentWorkflow.logs` by id (if logs are held in-memory), or trigger a lightweight refresh.
3) Return the in-memory `ChatLog`.
Notes:
- No direct `workflow.logs.append(...)` outside this service method.
### C) Store stat entry to workflow (workflow-level stat)
Function: `services.workflow.storeWorkflowStat(workflow, statData)`
Required behavior:
1) Persist `ChatStat` in DB with `workflowId` (no `messageId`).
2) Update in-memory workflow object: set/replace workflow-level `stats` on `services.currentWorkflow` as applicable.
3) Return the in-memory `ChatStat`.
Notes:
- No direct in-memory mutation outside this service method.
### D) Store stat entry to message (message-level stat)
Function: `services.workflow.storeMessageStat(workflow, messageId, statData)`
Required behavior:
1) Persist `ChatStat` in DB with both `workflowId` and `messageId`.
2) Update in-memory workflow object: locate message by `messageId` in `services.currentWorkflow.messages` and set/replace its `stats`.
3) Return the in-memory `ChatStat`.
Notes:
- If the target message is not present in memory, optionally refresh the workflow messages or require caller to provide the message object; no direct appends outside service method.

View file

@ -0,0 +1,239 @@
# Stats Refactory Concept and Implementation Plan
Date: 2025-10-16
Owner: Workflow/Data Layer
## Goals
- Clean, consistent statistics (ChatStat) model and lifecycle.
- Workflow is the authoritative container for stats (similar to logs).
- Simple, predictable DB APIs for reading full stats history per workflow.
- Lightweight instrumentation points at key producers (AI calls, extraction, generation) to emit stats.
---
## Datamodel Changes (modules/datamodels/datamodelChat.py)
- ChatStat (all fields optional)
- Keep: id, workflowId, processingTime, bytesSent, bytesReceived, errorCount
- Remove: successRate, tokenCount, tokenPriceUnit, tokenPriceAmount
- Add: process: str
- Example values: "action.outlook.readMails", "ai.process.document.name"
- Add: engine: str
- Example values: "ai.anthropic.35", "ai.tavily.basic", "renderer.docx"
- Add: priceUsd: float (calculated price in USD for the operation)
- ChatWorkflow
- stats: change from Optional[ChatStat] to List[ChatStat] (default [])
- ChatMessage
- Remove stats field entirely to avoid coupling to message lifecycles.
- Remove messageId from ChatStat (no longer needed)
Rationale:
- Stats are workflow-scope artifacts for billing/analytics. They should not depend on transient message presence.
- Attaching stats to workflow aligns with logs; UI polling returns consolidated workflow data including stats list.
---
## Interface/API Changes (modules/interfaces/interfaceDbChatObjects.py)
- Replace getWorkflowStats(workflowId) returning single latest with:
- getWorkflowStatsList(workflowId) -> List[ChatStat]
- Add createWorkflowStat(statData: Dict[str, Any]) -> ChatStat (write-only per record)
- Keep updateWorkflowStats(...) only if absolutely needed; prefer append-only stats records for auditability
- Update unified polling (getUnifiedChatData) to include full current stats list for workflow for each poll
DB logic:
- Store each stat emission as one row (append-only). No modify unless explicitly required.
- Deletion remains cascading on workflow delete.
---
## Service Layer Changes (modules/services/serviceWorkflow/mainServiceWorkflow.py)
- Replace storeWorkflowStat(workflow, statData) to:
- Coerce statData to ChatStat fields (process, engine, priceUsd, bytesSent, bytesReceived, processingTime, errorCount) --> to handover pydantic model (with id empty), not the single values
- Set id and workflowId
- Persist via interfaceDbChat.createWorkflowStat
- Append to workflow.stats in memory
- Return ChatStat
- Remove storeMessageStat (no message-level stats in new model)
---
## Instrumentation Points (where to emit stats)
1) AI Calls (modules/interfaces/interfaceAiObjects.py)
- Each AI call measures processing time, calculates priceUsd, and tracks bytes sent/received
- Returns standardized AiCallResponse with priceUsd, processingTime, bytesSent, bytesReceived, errorCount included (to adapt pydantic model)
- All AI functions (call, callImage, generateImage, webQuery, etc.) return this standardized response
- Model-specific pricing functions calculate priceUsd based on (processingTime, bytesSent, bytesReceived)
2) Service Layer (modules/services/serviceWorkflow/mainServiceWorkflow.py)
- Receives AiCallResponse from AI interface
- Creates ChatStat objects with data from AiCallResponse
- Emits stats to database via interfaceDbChat.createWorkflowStat
3) Extraction/Generation Services
- services/serviceExtraction/mainServiceExtraction.py and related extractors
- services/serviceGeneration/* (document rendering/generation)
- Emit ChatStat per completed operation with process, engine, processingTime, bytes, priceUsd, etc.
4) Workflow-level summaries (optional)
- At workflow completion, compute a summary stat (aggregated bytes/costs) if desired
---
## AiCallResponse Changes (modules/datamodels/datamodelAi.py)
Update AiCallResponse to include standardized stats fields:
- Remove: usedTokens, costEstimate
- Add: priceUsd: float (calculated price in USD)
- Add: processingTime: float (duration in seconds)
- Add: bytesSent: int (input data size in bytes)
- Add: bytesReceived: int (output data size in bytes)
- Add: errorCount: int (0 for success, 1+ for errors)
## Model-Specific Pricing Functions
Each AI model should implement a pricing calculation function that takes the operation parameters and returns the cost in USD:
- Function signature: `_calculatePriceUsd(processingTime: float, bytesSent: int, bytesReceived: int) -> float`
- Parameters:
- `processingTime`: Duration of the operation in seconds
- `bytesSent`: Size of input data in bytes
- `bytesReceived`: Size of output data in bytes
- Returns: Price in USD as float
Implementation approach:
- Each model class (e.g., OpenAI, Anthropic, etc.) implements its own pricing logic
- Complex pricing models can handle different rates for input/output, time-based costs, etc.
- Models can access their specific pricing configuration (rates, tiers, etc.)
- No fallback! each model MUST have its pricing calculation
Example implementations:
- OpenAI: Token-based pricing with different rates for input/output tokens
- Anthropic: Similar token-based pricing with model-specific rates
- Tavily: Simple per-request pricing
- Custom models: Any complex pricing logic (tiered, time-based, etc.)
---
## Routing/Polling (UI)
- Update route (unified polling) to include stats list for workflow:
- interfaceDbChatObjects.getUnifiedChatData(workflowId): include
- { type: "stat", createdAt: stat_timestamp, item: ChatStat(dict) } for all stats
- UI will receive a complete and growing list of stats with each poll; no special latest logic required.
---
## Step-by-Step Implementation Plan
### Phase 1: Core Data Model Changes
1. **Update ChatStat model** (modules/datamodels/datamodelChat.py)
- Remove: successRate, tokenCount, tokenPriceUnit, tokenPriceAmount, messageId
- Add: process: str, engine: str, priceUsd: float
- Keep: id, workflowId, processingTime, bytesSent, bytesReceived, errorCount
2. **Update ChatWorkflow model** (modules/datamodels/datamodelChat.py)
- Change stats from Optional[ChatStat] to List[ChatStat] (default [])
3. **Update ChatMessage model** (modules/datamodels/datamodelChat.py)
- Remove stats field entirely
- Remove messageId from ChatStat (no longer needed)
4. **Update AiCallResponse model** (modules/datamodels/datamodelAi.py)
- Remove: usedTokens, costEstimate
- Add: priceUsd: float, processingTime: float, bytesSent: int, bytesReceived: int, errorCount: int
### Phase 2: Database Interface Changes
5. **Update interfaceDbChatObjects.py**
- Implement createWorkflowStat(workflow: ChatWorkflow, statData: Dict[str, Any]) -> ChatStat (the function creates a new ChatStat object, persists it to the database, and includes it into the workflows stats list)
- Implement getWorkflowStatsList(workflowId) -> List[ChatStat] (the function returns the list of stats for a given workflow)
- Update getUnifiedChatData to include full stats list
- Remove getWorkflowStats (single latest) - replace with getWorkflowStatsList
### Phase 3: AI Interface Layer Changes
6. **Add model-specific pricing functions** (modules/interfaces/interfaceAiObjects.py)
- Add _calculatePriceUsd() method to each model in aiModels registry
- Implement pricing logic for each connector type (OpenAI, Anthropic, Perplexity, Tavily)
7. **Update all AI call methods** (modules/interfaces/interfaceAiObjects.py)
- Add timing measurement (start/end timestamps)
- Calculate bytesSent (input data size)
- Calculate bytesReceived (output data size)
- Call model-specific _calculatePriceUsd()
- Return standardized AiCallResponse with all stats fields
8. **Update specific AI methods:**
- call() - text generation
- callImage() - image analysis
- generateImage() - image generation
- webQuery() - web research
- All web methods (search_websites, crawl_websites, etc.)
### Phase 4: Service Layer Changes
9. **Update mainServiceWorkflow.py**
- Implement storeWorkflowStat(workflow: ChatWorkflow, aiResponse: AiCallResponse, process: str) -> ChatStat
- Remove storeMessageStat (no message-level stats)
- Update storeWorkflowStat to use new ChatStat fields
10. **Update workflow service methods**
- Replace old stats handling with new storeWorkflowStat calls
- Ensure all AI calls go through storeWorkflowStat
### Phase 5: Extraction/Generation Services
11. **Update serviceExtraction services**
- Add stats emission for extraction operations
- Calculate processingTime, bytesSent, bytesReceived
- Emit ChatStat via storeWorkflowStat
12. **Update serviceGeneration services**
- Add stats emission for generation operations
- Calculate processingTime, bytesSent, bytesReceived
- Emit ChatStat via storeWorkflowStat
### Phase 6: Integration and Testing
13. **Update all AI call sites**
- Replace old stats handling with new AiCallResponse usage
- Ensure all AI calls emit stats via storeWorkflowStat
14. **Update routing/polling**
- Ensure getUnifiedChatData includes full stats list
- Update UI polling to handle stats array
15. **Add comprehensive logging**
- Debug logs for each stat emission (process, engine, bytes, priceUsd)
- Error handling for stats emission failures
### Phase 7: Cleanup and Validation
16. **Remove deprecated code**
- Remove old getWorkflowStats calls
- Remove old updateWorkflowStats calls
- Remove token-based pricing logic
17. **Add validation and tests**
- Test stats emission for all AI operations
- Validate pricing calculations
- Test database operations
18. **Performance optimization**
- Ensure stats emission doesn't impact AI call performance
- Optimize database queries for stats retrieval
---
## Smart Implementation Notes
- **Centralized measurement**: AI interface layer handles all timing and byte counting, ensuring consistency.
- **Standardized responses**: All AI calls return AiCallResponse with complete stats data.
- **Model-specific pricing**: Each model implements `_calculatePriceUsd()` with its own pricing strategy.
- **Service layer simplicity**: Service layer just creates ChatStat objects from AiCallResponse data.
- **Append-only stats**: Simplifies auditing and invoicing; no implicit overwrites.
- **Workflow-scoped stats**: All stats attached to workflow, not messages; UI gets complete history.
- **Performance consideration**: Stats emission should not impact AI call performance.
- **Error handling**: Graceful degradation if stats emission fails; AI calls should still succeed.