10 KiB
Stats Refactory Concept and Implementation Plan
Date: 2025-10-16 Owner: Workflow/Data Layer
Goals
- Clean, consistent statistics (ChatStat) model and lifecycle.
- Workflow is the authoritative container for stats (similar to logs).
- Simple, predictable DB APIs for reading full stats history per workflow.
- Lightweight instrumentation points at key producers (AI calls, extraction, generation) to emit stats.
Datamodel Changes (modules/datamodels/datamodelChat.py)
-
ChatStat (all fields optional)
- Keep: id, workflowId, processingTime, bytesSent, bytesReceived, errorCount
- Remove: successRate, tokenCount, tokenPriceUnit, tokenPriceAmount
- Add: process: str
- Example values: "action.outlook.readMails", "ai.process.document.name"
- Add: engine: str
- Example values: "ai.anthropic.35", "ai.tavily.basic", "renderer.docx"
- Add: priceUsd: float (calculated price in USD for the operation)
-
ChatWorkflow
- stats: change from Optional[ChatStat] to List[ChatStat] (default [])
-
ChatMessage
- Remove stats field entirely to avoid coupling to message lifecycles.
- Remove messageId from ChatStat (no longer needed)
Rationale:
- Stats are workflow-scope artifacts for billing/analytics. They should not depend on transient message presence.
- Attaching stats to workflow aligns with logs; UI polling returns consolidated workflow data including stats list.
Interface/API Changes (modules/interfaces/interfaceDbChatObjects.py)
- Replace getWorkflowStats(workflowId) returning single latest with:
- getWorkflowStatsList(workflowId) -> List[ChatStat]
- Add createWorkflowStat(statData: Dict[str, Any]) -> ChatStat (write-only per record)
- Keep updateWorkflowStats(...) only if absolutely needed; prefer append-only stats records for auditability
- Update unified polling (getUnifiedChatData) to include full current stats list for workflow for each poll
DB logic:
- Store each stat emission as one row (append-only). No modify unless explicitly required.
- Deletion remains cascading on workflow delete.
Service Layer Changes (modules/services/serviceWorkflow/mainServiceWorkflow.py)
-
Replace storeWorkflowStat(workflow, statData) to:
- Coerce statData to ChatStat fields (process, engine, priceUsd, bytesSent, bytesReceived, processingTime, errorCount) --> to handover pydantic model (with id empty), not the single values
- Set id and workflowId
- Persist via interfaceDbChat.createWorkflowStat
- Append to workflow.stats in memory
- Return ChatStat
-
Remove storeMessageStat (no message-level stats in new model)
Instrumentation Points (where to emit stats)
-
AI Calls (modules/interfaces/interfaceAiObjects.py)
- Each AI call measures processing time, calculates priceUsd, and tracks bytes sent/received
- Returns standardized AiCallResponse with priceUsd, processingTime, bytesSent, bytesReceived, errorCount included (to adapt pydantic model)
- All AI functions (call, callImage, generateImage, webQuery, etc.) return this standardized response
- Model-specific pricing functions calculate priceUsd based on (processingTime, bytesSent, bytesReceived)
-
Service Layer (modules/services/serviceWorkflow/mainServiceWorkflow.py)
- Receives AiCallResponse from AI interface
- Creates ChatStat objects with data from AiCallResponse
- Emits stats to database via interfaceDbChat.createWorkflowStat
-
Extraction/Generation Services
- services/serviceExtraction/mainServiceExtraction.py and related extractors
- services/serviceGeneration/* (document rendering/generation)
- Emit ChatStat per completed operation with process, engine, processingTime, bytes, priceUsd, etc.
-
Workflow-level summaries (optional)
- At workflow completion, compute a summary stat (aggregated bytes/costs) if desired
AiCallResponse Changes (modules/datamodels/datamodelAi.py)
Update AiCallResponse to include standardized stats fields:
- Remove: usedTokens, costEstimate
- Add: priceUsd: float (calculated price in USD)
- Add: processingTime: float (duration in seconds)
- Add: bytesSent: int (input data size in bytes)
- Add: bytesReceived: int (output data size in bytes)
- Add: errorCount: int (0 for success, 1+ for errors)
Model-Specific Pricing Functions
Each AI model should implement a pricing calculation function that takes the operation parameters and returns the cost in USD:
- Function signature:
_calculatePriceUsd(processingTime: float, bytesSent: int, bytesReceived: int) -> float - Parameters:
processingTime: Duration of the operation in secondsbytesSent: Size of input data in bytesbytesReceived: Size of output data in bytes
- Returns: Price in USD as float
Implementation approach:
- Each model class (e.g., OpenAI, Anthropic, etc.) implements its own pricing logic
- Complex pricing models can handle different rates for input/output, time-based costs, etc.
- Models can access their specific pricing configuration (rates, tiers, etc.)
- No fallback! each model MUST have its pricing calculation
Example implementations:
- OpenAI: Token-based pricing with different rates for input/output tokens
- Anthropic: Similar token-based pricing with model-specific rates
- Tavily: Simple per-request pricing
- Custom models: Any complex pricing logic (tiered, time-based, etc.)
Routing/Polling (UI)
-
Update route (unified polling) to include stats list for workflow:
- interfaceDbChatObjects.getUnifiedChatData(workflowId): include
- { type: "stat", createdAt: stat_timestamp, item: ChatStat(dict) } for all stats
- interfaceDbChatObjects.getUnifiedChatData(workflowId): include
-
UI will receive a complete and growing list of stats with each poll; no special latest logic required.
Step-by-Step Implementation Plan
Phase 1: Core Data Model Changes
-
Update ChatStat model (modules/datamodels/datamodelChat.py)
- Remove: successRate, tokenCount, tokenPriceUnit, tokenPriceAmount, messageId
- Add: process: str, engine: str, priceUsd: float
- Keep: id, workflowId, processingTime, bytesSent, bytesReceived, errorCount
-
Update ChatWorkflow model (modules/datamodels/datamodelChat.py)
- Change stats from Optional[ChatStat] to List[ChatStat] (default [])
-
Update ChatMessage model (modules/datamodels/datamodelChat.py)
- Remove stats field entirely
- Remove messageId from ChatStat (no longer needed)
-
Update AiCallResponse model (modules/datamodels/datamodelAi.py)
- Remove: usedTokens, costEstimate
- Add: priceUsd: float, processingTime: float, bytesSent: int, bytesReceived: int, errorCount: int
Phase 2: Database Interface Changes
- Update interfaceDbChatObjects.py
- Implement createWorkflowStat(workflow: ChatWorkflow, statData: Dict[str, Any]) -> ChatStat (the function creates a new ChatStat object, persists it to the database, and includes it into the workflows stats list)
- Implement getWorkflowStatsList(workflowId) -> List[ChatStat] (the function returns the list of stats for a given workflow)
- Update getUnifiedChatData to include full stats list
- Remove getWorkflowStats (single latest) - replace with getWorkflowStatsList
Phase 3: AI Interface Layer Changes
-
Add model-specific pricing functions (modules/interfaces/interfaceAiObjects.py)
- Add _calculatePriceUsd() method to each model in aiModels registry
- Implement pricing logic for each connector type (OpenAI, Anthropic, Perplexity, Tavily)
-
Update all AI call methods (modules/interfaces/interfaceAiObjects.py)
- Add timing measurement (start/end timestamps)
- Calculate bytesSent (input data size)
- Calculate bytesReceived (output data size)
- Call model-specific _calculatePriceUsd()
- Return standardized AiCallResponse with all stats fields
-
Update specific AI methods:
- call() - text generation
- callImage() - image analysis
- generateImage() - image generation
- webQuery() - web research
- All web methods (search_websites, crawl_websites, etc.)
Phase 4: Service Layer Changes
-
Update mainServiceWorkflow.py
- Implement storeWorkflowStat(workflow: ChatWorkflow, aiResponse: AiCallResponse, process: str) -> ChatStat
- Remove storeMessageStat (no message-level stats)
- Update storeWorkflowStat to use new ChatStat fields
-
Update workflow service methods
- Replace old stats handling with new storeWorkflowStat calls
- Ensure all AI calls go through storeWorkflowStat
Phase 5: Extraction/Generation Services
-
Update serviceExtraction services
- Add stats emission for extraction operations
- Calculate processingTime, bytesSent, bytesReceived
- Emit ChatStat via storeWorkflowStat
-
Update serviceGeneration services
- Add stats emission for generation operations
- Calculate processingTime, bytesSent, bytesReceived
- Emit ChatStat via storeWorkflowStat
Phase 6: Integration and Testing
-
Update all AI call sites
- Replace old stats handling with new AiCallResponse usage
- Ensure all AI calls emit stats via storeWorkflowStat
-
Update routing/polling
- Ensure getUnifiedChatData includes full stats list
- Update UI polling to handle stats array
-
Add comprehensive logging
- Debug logs for each stat emission (process, engine, bytes, priceUsd)
- Error handling for stats emission failures
Phase 7: Cleanup and Validation
-
Remove deprecated code
- Remove old getWorkflowStats calls
- Remove old updateWorkflowStats calls
- Remove token-based pricing logic
-
Add validation and tests
- Test stats emission for all AI operations
- Validate pricing calculations
- Test database operations
-
Performance optimization
- Ensure stats emission doesn't impact AI call performance
- Optimize database queries for stats retrieval
Smart Implementation Notes
- Centralized measurement: AI interface layer handles all timing and byte counting, ensuring consistency.
- Standardized responses: All AI calls return AiCallResponse with complete stats data.
- Model-specific pricing: Each model implements
_calculatePriceUsd()with its own pricing strategy. - Service layer simplicity: Service layer just creates ChatStat objects from AiCallResponse data.
- Append-only stats: Simplifies auditing and invoicing; no implicit overwrites.
- Workflow-scoped stats: All stats attached to workflow, not messages; UI gets complete history.
- Performance consideration: Stats emission should not impact AI call performance.
- Error handling: Graceful degradation if stats emission fails; AI calls should still succeed.