# Workflow Polling Logic - Unified Timestamp-Based Selective Data Transfer

## Overview

This document describes the improved polling logic for workflow data. It uses a unified timestamp-based approach for selective data transfer across all data types (messages, logs, stats).

## Core Concept

- **Use `_createdAt` timestamp** of last rendered **ITEM** (any type: message, log, stat)
- **Single unified endpoint** returns all data types in one chronological list
- **Each item identifies its type** (message, log, stat)
- **Chronological ordering** ensures consistent data delivery

## Workflow States

### 1. New Workflow

- `lastRenderedTimestamp = null`
- Backend returns **all data** in chronological order
- Data delivered as unified list with type indicators

### 2. Next Round in Existing Workflow

- `lastRenderedTimestamp = _createdAt` of last rendered **ITEM** (message, log, or stat)
- Backend returns **only data after that timestamp**
- Data delivered as unified list in chronological order

### 3. Loading Existing Workflow

- `lastRenderedTimestamp = null` (load all data)
- Backend returns **all data** in chronological order
- Data delivered as unified list with type indicators

## API Design

### Single Endpoint

```
GET /workflows/{id}/chatData?afterTimestamp=1234567890
```

### Response Format

```json
{
  "items": [
    {
      "type": "message",
      "createdAt": 1234567890.123,
      "item": {
        "id": "msg_123",
        "workflowId": "wf_456",
        "message": "Hello world",
        "role": "user",
        "status": "first",
        "_createdAt": 1234567890.123,
        // ... other message fields
      }
    },
    {
      "type": "log",
      "createdAt": 1234567890.456,
      "item": {
        "id": "log_789",
        "workflowId": "wf_456",
        "level": "INFO",
        "message": "Workflow started",
        "_createdAt": 1234567890.456,
        // ... other log fields
      }
    },
    {
      "type": "stat",
      "createdAt": 1234567890.789,
      "item": {
        "id": "stat_101",
        "workflowId": "wf_456",
        "processingTime": 1500,
        "tokenCount": 1000,
        "_createdAt": 1234567890.789,
        // ... other stat fields
      }
    }
  ]
}
```

## Frontend Logic

### Global Timestamp Tracking

- **Global variable `lastRenderedTimestamp`** tracks the `_createdAt` of the last rendered item
- **Updated with each rendered item** (message, log, stat) as it's displayed
- **Always ready** for the next polling call

### Polling Logic

1. **Check if rendering is complete** (prevent race conditions)
2. **Use global `lastRenderedTimestamp`** variable
3. **Call unified endpoint** with that timestamp
4. **Process returned items** in chronological order
5. **Update `lastRenderedTimestamp`** as each item is rendered

### Rendering Process

1. **Iterate through unified list** in chronological order
2. **Route each item** to appropriate handler based on `type`
3. **Update global `lastRenderedTimestamp`** after each item is rendered
4. **Trigger next polling** only when rendering is complete

### Race Condition Prevention

- **Polling only starts** when previous rendering is complete
- **Global timestamp** ensures consistent state
- **No complex searching** for last rendered item needed

## Backend Logic

## Benefits

1. **✅ Single Timestamp** - One timestamp controls all data types
2. **✅ Chronological Order** - All items delivered in correct sequence
3. **✅ Unified Processing** - Single endpoint, single response format
4. **✅ Efficient Polling** - Only new data transferred
5. **✅ Type Safety** - Each item clearly identifies its type
6. **✅ Works for All Scenarios** - New, existing, and next round workflows

## Implementation Steps

### Phase 1: Backend Implementation

1. **Create unified endpoint** `/workflows/{id}/chatData`
2. **Implement database query** with UNION ALL
3. **Add timestamp filtering** for selective data transfer
4. **Ensure chronological ordering** by `_createdAt`

### Phase 2: Frontend Implementation

1. **Create global `lastRenderedTimestamp` variable**
2. **Update polling logic** to use unified endpoint
3. **Implement unified response processing**
4. **Add timestamp tracking** to each renderer (message, log, stat)
5. **Add race condition prevention** (polling only when rendering complete)
6. **Remove complex filtering logic**

### Phase 3: Testing & Cleanup

1. **Test all workflow scenarios** (new, existing, next round)
2. **Verify chronological ordering**
3. **Remove old separate endpoints** (optional)
4. **Update documentation**

## Technical Details

### Frontend Implementation Example

```javascript
// Global timestamp tracking
let lastRenderedTimestamp = null;
let isRendering = false;

// Polling function
async function pollWorkflowData(workflowId) {
  // Prevent race conditions: skip while a previous poll is still
  // fetching or rendering
  if (isRendering) {
    return;
  }
  // Set the guard before the first await so overlapping calls
  // cannot both pass the check above
  isRendering = true;

  try {
    // Use global timestamp
    const afterTimestamp = lastRenderedTimestamp;

    // Call unified endpoint
    const response = await api.getWorkflowChatData(workflowId, afterTimestamp);

    // Process items in chronological order
    for (const wrapper of response.items) {
      await renderItem(wrapper);
      lastRenderedTimestamp = wrapper.createdAt;
    }
  } finally {
    // Always release the guard, even if fetching or rendering throws
    isRendering = false;
  }

  // Trigger next polling
  scheduleNextPoll();
}

// Render individual items
async function renderItem(wrapper) {
  switch (wrapper.type) {
    case 'message':
      await renderMessage(wrapper.item);
      break;
    case 'log':
      await renderLog(wrapper.item);
      break;
    case 'stat':
      await renderStat(wrapper.item);
      break;
    default:
      // Unknown types are skipped rather than breaking the render loop
      console.warn(`Unknown item type: ${wrapper.type}`);
  }
}
```

### Timestamp Handling

- **Use `_createdAt`** as all database objects have this attribute
- **Unix timestamp format** (float) for precision
- **Null handling** for initial loads (load all data)

### Error Handling

- **Invalid timestamps** → return all data
- **Missing workflow** → return empty list
- **Database errors** → return appropriate error response

### Performance Considerations

- **Index on `_createdAt`** for efficient timestamp filtering
- **Limit result size** if needed (pagination)
- **Caching** for frequently accessed data

## Migration Strategy

### Backward Compatibility

- **Keep old endpoints** during transition
- **Gradual migration** of frontend code
- **Feature flag** to switch between old/new logic

### Rollback Plan

- **Revert to separate endpoints** if issues arise
- **Maintain old polling logic** as fallback
- **Monitor performance** during transition

## Future Enhancements

### Potential Improvements

- **Real-time updates** via WebSocket
- **Delta updates** for large datasets
- **Compression** for large responses
- **Caching strategies** for better performance

### Monitoring

- **Track polling frequency**
- **Monitor response times**
- **Log data transfer volumes**
- **Alert on errors**

---

*Last updated: 2025-01-08*
*Version: 1.0*
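
## Appendix: Backend Merge Sketch

The backend steps named in Phase 1 (merge all three data types, keep only items newer than `afterTimestamp`, order by `_createdAt`) could be sketched in memory as follows. This is a minimal illustration under stated assumptions, not the actual backend: `buildUnifiedItems` and the shape of `sources` are hypothetical, the timestamp is assumed to be already parsed to a number, and a real implementation would presumably do the filtering and ordering in the database query instead.

```javascript
// Hypothetical helper: builds the unified, chronologically ordered item list.
// `sources` maps a type name ("message", "log", "stat") to an array of records
// that each carry a numeric `_createdAt` (Unix timestamp, float).
function buildUnifiedItems(sources, afterTimestamp = null) {
  // Null or invalid timestamps fall back to "return all data".
  const cutoff = Number.isFinite(afterTimestamp) ? afterTimestamp : null;

  const items = [];
  for (const [type, records] of Object.entries(sources)) {
    for (const record of records) {
      // Strictly greater: the item at the cutoff has already been rendered.
      if (cutoff === null || record._createdAt > cutoff) {
        items.push({ type, createdAt: record._createdAt, item: record });
      }
    }
  }

  // Array.prototype.sort is stable, so items with equal timestamps keep
  // the order in which they were collected above.
  items.sort((a, b) => a.createdAt - b.createdAt);
  return { items };
}

// Usage with made-up records
const sources = {
  message: [{ id: "msg_1", _createdAt: 10.1 }, { id: "msg_2", _createdAt: 30.5 }],
  log: [{ id: "log_1", _createdAt: 20.2 }],
  stat: [{ id: "stat_1", _createdAt: 40.9 }],
};

const all = buildUnifiedItems(sources);        // new or reloaded workflow
const next = buildUnifiedItems(sources, 20.2); // next round after log_1

console.log(all.items.map((w) => w.item.id));  // [ 'msg_1', 'log_1', 'msg_2', 'stat_1' ]
console.log(next.items.map((w) => w.item.id)); // [ 'msg_2', 'stat_1' ]
```

One consequence of the strict `>` comparison is worth keeping in mind: if two items share exactly the same `_createdAt` and only one of them was rendered before the next poll, the other would be skipped. Whether that can happen in practice depends on the timestamp resolution of the underlying store.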