# Stats Refactory Concept and Implementation Plan Date: 2025-10-16 Owner: Workflow/Data Layer ## Goals - Clean, consistent statistics (ChatStat) model and lifecycle. - Workflow is the authoritative container for stats (similar to logs). - Simple, predictable DB APIs for reading full stats history per workflow. - Lightweight instrumentation points at key producers (AI calls, extraction, generation) to emit stats. --- ## Datamodel Changes (modules/datamodels/datamodelChat.py) - ChatStat (all fields optional) - Keep: id, workflowId, processingTime, bytesSent, bytesReceived, errorCount - Remove: successRate, tokenCount, tokenPriceUnit, tokenPriceAmount - Add: process: str - Example values: "action.outlook.readMails", "ai.process.document.name" - Add: engine: str - Example values: "ai.anthropic.35", "ai.tavily.basic", "renderer.docx" - Add: priceCHF: float (calculated price in USD for the operation) - ChatWorkflow - stats: change from Optional[ChatStat] to List[ChatStat] (default []) - ChatMessage - Remove stats field entirely to avoid coupling to message lifecycles. - Remove messageId from ChatStat (no longer needed) Rationale: - Stats are workflow-scope artifacts for billing/analytics. They should not depend on transient message presence. - Attaching stats to workflow aligns with logs; UI polling returns consolidated workflow data including stats list. --- ## Interface/API Changes (modules/interfaces/interfaceDbChatObjects.py) - Replace getWorkflowStats(workflowId) returning single latest with: - getWorkflowStatsList(workflowId) -> List[ChatStat] - Add createWorkflowStat(statData: Dict[str, Any]) -> ChatStat (write-only per record) - Keep updateWorkflowStats(...) only if absolutely needed; prefer append-only stats records for auditability - Update unified polling (getUnifiedChatData) to include full current stats list for workflow for each poll DB logic: - Store each stat emission as one row (append-only). No modify unless explicitly required. - Deletion remains cascading on workflow delete. --- ## Service Layer Changes (modules/services/serviceWorkflow/mainServiceWorkflow.py) - Replace storeWorkflowStat(workflow, statData) to: - Coerce statData to ChatStat fields (process, engine, priceCHF, bytesSent, bytesReceived, processingTime, errorCount) --> to handover pydantic model (with id empty), not the single values - Set id and workflowId - Persist via interfaceDbChat.createWorkflowStat - Append to workflow.stats in memory - Return ChatStat - Remove storeMessageStat (no message-level stats in new model) --- ## Instrumentation Points (where to emit stats) 1) AI Calls (modules/interfaces/interfaceAiObjects.py) - Each AI call measures processing time, calculates priceCHF, and tracks bytes sent/received - Returns standardized AiCallResponse with priceCHF, processingTime, bytesSent, bytesReceived, errorCount included (to adapt pydantic model) - All AI functions (call, callImage, generateImage, webQuery, etc.) return this standardized response - Model-specific pricing functions calculate priceCHF based on (processingTime, bytesSent, bytesReceived) 2) Service Layer (modules/services/serviceWorkflow/mainServiceWorkflow.py) - Receives AiCallResponse from AI interface - Creates ChatStat objects with data from AiCallResponse - Emits stats to database via interfaceDbChat.createWorkflowStat 3) Extraction/Generation Services - services/serviceExtraction/mainServiceExtraction.py and related extractors - services/serviceGeneration/* (document rendering/generation) - Emit ChatStat per completed operation with process, engine, processingTime, bytes, priceCHF, etc. 4) Workflow-level summaries (optional) - At workflow completion, compute a summary stat (aggregated bytes/costs) if desired --- ## AiCallResponse Changes (modules/datamodels/datamodelAi.py) Update AiCallResponse to include standardized stats fields: - Remove: usedTokens, costEstimate - Add: priceCHF: float (calculated price in USD) - Add: processingTime: float (duration in seconds) - Add: bytesSent: int (input data size in bytes) - Add: bytesReceived: int (output data size in bytes) - Add: errorCount: int (0 for success, 1+ for errors) ## Model-Specific Pricing Functions Each AI model should implement a pricing calculation function that takes the operation parameters and returns the cost in USD: - Function signature: `_calculatepriceCHF(processingTime: float, bytesSent: int, bytesReceived: int) -> float` - Parameters: - `processingTime`: Duration of the operation in seconds - `bytesSent`: Size of input data in bytes - `bytesReceived`: Size of output data in bytes - Returns: Price in USD as float Implementation approach: - Each model class (e.g., OpenAI, Anthropic, etc.) implements its own pricing logic - Complex pricing models can handle different rates for input/output, time-based costs, etc. - Models can access their specific pricing configuration (rates, tiers, etc.) - No fallback! each model MUST have its pricing calculation Example implementations: - OpenAI: Token-based pricing with different rates for input/output tokens - Anthropic: Similar token-based pricing with model-specific rates - Tavily: Simple per-request pricing - Custom models: Any complex pricing logic (tiered, time-based, etc.) --- ## Routing/Polling (UI) - Update route (unified polling) to include stats list for workflow: - interfaceDbChatObjects.getUnifiedChatData(workflowId): include - { type: "stat", createdAt: stat_timestamp, item: ChatStat(dict) } for all stats - UI will receive a complete and growing list of stats with each poll; no special latest logic required. --- ## Step-by-Step Implementation Plan ### Phase 1: Core Data Model Changes 1. **Update ChatStat model** (modules/datamodels/datamodelChat.py) - Remove: successRate, tokenCount, tokenPriceUnit, tokenPriceAmount, messageId - Add: process: str, engine: str, priceCHF: float - Keep: id, workflowId, processingTime, bytesSent, bytesReceived, errorCount 2. **Update ChatWorkflow model** (modules/datamodels/datamodelChat.py) - Change stats from Optional[ChatStat] to List[ChatStat] (default []) 3. **Update ChatMessage model** (modules/datamodels/datamodelChat.py) - Remove stats field entirely - Remove messageId from ChatStat (no longer needed) 4. **Update AiCallResponse model** (modules/datamodels/datamodelAi.py) - Remove: usedTokens, costEstimate - Add: priceCHF: float, processingTime: float, bytesSent: int, bytesReceived: int, errorCount: int ### Phase 2: Database Interface Changes 5. **Update interfaceDbChatObjects.py** - Implement createWorkflowStat(workflow: ChatWorkflow, statData: Dict[str, Any]) -> ChatStat (the function creates a new ChatStat object, persists it to the database, and includes it into the workflows stats list) - Implement getWorkflowStatsList(workflowId) -> List[ChatStat] (the function returns the list of stats for a given workflow) - Update getUnifiedChatData to include full stats list - Remove getWorkflowStats (single latest) - replace with getWorkflowStatsList ### Phase 3: AI Interface Layer Changes 6. **Add model-specific pricing functions** (modules/interfaces/interfaceAiObjects.py) - Add _calculatepriceCHF() method to each model in aiModels registry - Implement pricing logic for each connector type (OpenAI, Anthropic, Perplexity, Tavily) 7. **Update all AI call methods** (modules/interfaces/interfaceAiObjects.py) - Add timing measurement (start/end timestamps) - Calculate bytesSent (input data size) - Calculate bytesReceived (output data size) - Call model-specific _calculatepriceCHF() - Return standardized AiCallResponse with all stats fields 8. **Update specific AI methods:** - call() - text generation - callImage() - image analysis - generateImage() - image generation - webQuery() - web research - All web methods (search_websites, crawl_websites, etc.) ### Phase 4: Service Layer Changes 9. **Update mainServiceWorkflow.py** - Implement storeWorkflowStat(workflow: ChatWorkflow, aiResponse: AiCallResponse, process: str) -> ChatStat - Remove storeMessageStat (no message-level stats) - Update storeWorkflowStat to use new ChatStat fields 10. **Update workflow service methods** - Replace old stats handling with new storeWorkflowStat calls - Ensure all AI calls go through storeWorkflowStat ### Phase 5: Extraction/Generation Services 11. **Update serviceExtraction services** - Add stats emission for extraction operations - Calculate processingTime, bytesSent, bytesReceived - Emit ChatStat via storeWorkflowStat 12. **Update serviceGeneration services** - Add stats emission for generation operations - Calculate processingTime, bytesSent, bytesReceived - Emit ChatStat via storeWorkflowStat ### Phase 6: Integration and Testing 13. **Update all AI call sites** - Replace old stats handling with new AiCallResponse usage - Ensure all AI calls emit stats via storeWorkflowStat 14. **Update routing/polling** - Ensure getUnifiedChatData includes full stats list - Update UI polling to handle stats array 15. **Add comprehensive logging** - Debug logs for each stat emission (process, engine, bytes, priceCHF) - Error handling for stats emission failures ### Phase 7: Cleanup and Validation 16. **Remove deprecated code** - Remove old getWorkflowStats calls - Remove old updateWorkflowStats calls - Remove token-based pricing logic 17. **Add validation and tests** - Test stats emission for all AI operations - Validate pricing calculations - Test database operations 18. **Performance optimization** - Ensure stats emission doesn't impact AI call performance - Optimize database queries for stats retrieval --- ## Smart Implementation Notes - **Centralized measurement**: AI interface layer handles all timing and byte counting, ensuring consistency. - **Standardized responses**: All AI calls return AiCallResponse with complete stats data. - **Model-specific pricing**: Each model implements `_calculatepriceCHF()` with its own pricing strategy. - **Service layer simplicity**: Service layer just creates ChatStat objects from AiCallResponse data. - **Append-only stats**: Simplifies auditing and invoicing; no implicit overwrites. - **Workflow-scoped stats**: All stats attached to workflow, not messages; UI gets complete history. - **Performance consideration**: Stats emission should not impact AI call performance. - **Error handling**: Graceful degradation if stats emission fails; AI calls should still succeed.