Workflow Polling Logic - Unified Timestamp-Based Selective Data Transfer
Overview
This document describes the improved polling logic for workflow data that uses a unified timestamp-based approach for selective data transfer across all data types (messages, logs, stats).
Core Concept
- Use the _createdAt timestamp of the last rendered item (any type: message, log, stat)
- Single unified endpoint returns all data types in one chronological list
- Each item identifies its type (message, log, stat)
- Chronological ordering ensures consistent data delivery
Workflow States
1. New Workflow
- lastRenderedTimestamp = null
- Backend returns all data in chronological order
- Data delivered as unified list with type indicators
2. Next Round in Existing Workflow
- lastRenderedTimestamp = _createdAt of the last rendered item (message, log, or stat)
- Backend returns only data after that timestamp
- Data delivered as unified list in chronological order
3. Loading Existing Workflow
- lastRenderedTimestamp = null (load all data)
- Backend returns all data in chronological order
- Data delivered as unified list with type indicators
API Design
Single Endpoint
GET /workflows/{id}/chatData?afterTimestamp=1234567890
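The three workflow states map onto this endpoint through a single parameter. A minimal sketch of how a client might build the request URL, assuming a hypothetical helper name (buildChatDataUrl) and a hypothetical baseUrl argument, neither of which appears in the design:

```javascript
// Hypothetical helper: builds the polling URL for the unified endpoint.
// `baseUrl` is an assumed origin such as "https://example.test"; the path and
// the afterTimestamp query parameter come from the endpoint shown above.
function buildChatDataUrl(baseUrl, workflowId, lastRenderedTimestamp) {
  const path = `/workflows/${encodeURIComponent(workflowId)}/chatData`;
  const url = new URL(path, baseUrl);
  // null covers "new workflow" and "loading existing workflow": request everything.
  if (lastRenderedTimestamp !== null && lastRenderedTimestamp !== undefined) {
    url.searchParams.set('afterTimestamp', String(lastRenderedTimestamp));
  }
  return url.toString();
}
```

Omitting the parameter for a null timestamp keeps the "load all data" case distinct from a next-round poll. Whether the backend expects the parameter to be absent or empty in that case is an assumption to confirm against the real API.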
Response Format
{
  "items": [
    {
      "type": "message",
      "createdAt": 1234567890.123,
      "item": {
        "id": "msg_123",
        "workflowId": "wf_456",
        "message": "Hello world",
        "role": "user",
        "status": "first",
        "_createdAt": 1234567890.123,
        // ... other message fields
      }
    },
    {
      "type": "log",
      "createdAt": 1234567890.456,
      "item": {
        "id": "log_789",
        "workflowId": "wf_456",
        "level": "INFO",
        "message": "Workflow started",
        "_createdAt": 1234567890.456,
        // ... other log fields
      }
    },
    {
      "type": "stat",
      "createdAt": 1234567890.789,
      "item": {
        "id": "stat_101",
        "workflowId": "wf_456",
        "processingTime": 1500,
        "tokenCount": 1000,
        "_createdAt": 1234567890.789,
        // ... other stat fields
      }
    }
  ]
}
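Because the frontend relies on the type field and on chronological ordering, a client may want to check a response before rendering it. The sketch below is one possible check, not part of the design; the function name checkChatDataResponse and the problem messages are hypothetical.

```javascript
// The three item types named in the response format above.
const KNOWN_TYPES = new Set(['message', 'log', 'stat']);

// Hypothetical check: returns a list of problems found in a response.
// An empty list means the response matches the documented shape.
function checkChatDataResponse(response) {
  if (!response || !Array.isArray(response.items)) {
    return ['response.items is not an array'];
  }
  const problems = [];
  let previous = -Infinity;
  response.items.forEach((wrapper, index) => {
    if (!wrapper || typeof wrapper !== 'object') {
      problems.push(`item ${index}: not an object`);
      return;
    }
    if (!KNOWN_TYPES.has(wrapper.type)) {
      problems.push(`item ${index}: unknown type "${wrapper.type}"`);
    }
    if (typeof wrapper.createdAt !== 'number' || !Number.isFinite(wrapper.createdAt)) {
      problems.push(`item ${index}: createdAt is not a finite number`);
      return;
    }
    // Equal timestamps are allowed; only a decrease breaks the ordering.
    if (wrapper.createdAt < previous) {
      problems.push(`item ${index}: out of chronological order`);
    }
    previous = wrapper.createdAt;
  });
  return problems;
}
```

A caller could log the returned problems and skip rendering, or fall back to a full reload, depending on how strict the client needs to be.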
Frontend Logic
Global Timestamp Tracking
- Global variable lastRenderedTimestamp tracks the _createdAt of the last rendered item
- Updated with each rendered item (message, log, stat) as it's displayed
- Always ready for the next polling call
Polling Logic
- Check if rendering is complete (prevent race conditions)
- Use global lastRenderedTimestamp variable
- Call unified endpoint with that timestamp
- Process returned items in chronological order
- Update lastRenderedTimestamp as each item is rendered
Rendering Process
- Iterate through unified list in chronological order
- Route each item to appropriate handler based on type
- Update global lastRenderedTimestamp after each item is rendered
- Trigger next polling only when rendering is complete
Race Condition Prevention
- Polling only starts when previous rendering is complete
- Global timestamp ensures consistent state
- No complex searching for last rendered item needed
Backend Logic
Benefits
- ✅ Single Timestamp - One timestamp controls all data types
- ✅ Chronological Order - All items delivered in correct sequence
- ✅ Unified Processing - Single endpoint, single response format
- ✅ Efficient Polling - Only new data transferred
- ✅ Type Safety - Each item clearly identifies its type
- ✅ Works for All Scenarios - New, existing, and next round workflows
Implementation Steps
Phase 1: Backend Implementation
- Create unified endpoint /workflows/{id}/chatData
- Implement database query with UNION ALL
- Add timestamp filtering for selective data transfer
- Ensure chronological ordering by _createdAt
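The backend steps above can be sketched without a database. The following is an in-memory stand-in for the UNION ALL query, assuming each record carries a numeric _createdAt; the function name buildChatData and its argument shape are hypothetical, and a real implementation would push the filter and ordering into SQL.

```javascript
// Hypothetical in-memory equivalent of the unified query: tag each record
// with its type, keep only records newer than afterTimestamp, and sort
// the combined list by _createdAt.
function buildChatData({ messages, logs, stats }, afterTimestamp = null) {
  const wrap = (type) => (item) => ({ type, createdAt: item._createdAt, item });
  const tagged = [
    ...messages.map(wrap('message')),
    ...logs.map(wrap('log')),
    ...stats.map(wrap('stat')),
  ];
  const items = tagged
    // null means "return all data"; otherwise keep strictly newer records.
    .filter((w) => afterTimestamp === null || w.createdAt > afterTimestamp)
    .sort((a, b) => a.createdAt - b.createdAt);
  return { items };
}
```

The strictly-greater-than comparison matches "only data after that timestamp". One consequence is that a record sharing the exact _createdAt of the last rendered item is skipped if it was written after that item was delivered, so a tie-breaker may be worth considering if timestamps can collide.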
Phase 2: Frontend Implementation
- Create global lastRenderedTimestamp variable
- Update polling logic to use unified endpoint
- Implement unified response processing
- Add timestamp tracking to each renderer (message, log, stat)
- Add race condition prevention (polling only when rendering complete)
- Remove complex filtering logic
Phase 3: Testing & Cleanup
- Test all workflow scenarios (new, existing, next round)
- Verify chronological ordering
- Remove old separate endpoints (optional)
- Update documentation
Technical Details
Frontend Implementation Example
// Global timestamp tracking
let lastRenderedTimestamp = null;
let isRendering = false;

// Polling function
async function pollWorkflowData(workflowId) {
  // Prevent race conditions: skip while a previous poll is still fetching or rendering
  if (isRendering) {
    return;
  }
  // Set the guard before the first await so overlapping calls cannot both pass the check
  isRendering = true;
  try {
    // Call unified endpoint with the global timestamp
    const response = await api.getWorkflowChatData(workflowId, lastRenderedTimestamp);
    // Process items in chronological order
    for (const wrapper of response.items) {
      await renderItem(wrapper);
      lastRenderedTimestamp = wrapper.createdAt;
    }
  } finally {
    // Release the guard even if the request or a renderer throws
    isRendering = false;
  }
  // Trigger next polling
  scheduleNextPoll();
}

// Render individual items
async function renderItem(wrapper) {
  switch (wrapper.type) {
    case 'message':
      await renderMessage(wrapper.item);
      break;
    case 'log':
      await renderLog(wrapper.item);
      break;
    case 'stat':
      await renderStat(wrapper.item);
      break;
    default:
      // Unknown types are skipped rather than halting the poll loop
      console.warn(`Unknown item type: ${wrapper.type}`);
  }
}
Timestamp Handling
- Use _createdAt as all database objects have this attribute
- Unix timestamp format (float) for precision
- Null handling for initial loads (load all data)
Error Handling
- Invalid timestamps → return all data
- Missing workflow → return empty list
- Database errors → return appropriate error response
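The "invalid timestamps → return all data" rule can be expressed as a small parsing step on the backend. A minimal sketch, assuming the raw query value arrives as a string or is absent; the name parseAfterTimestamp is hypothetical, and treating negative values as invalid is an assumption rather than something the design states.

```javascript
// Hypothetical parser for the afterTimestamp query parameter.
// Returns a number for a usable timestamp, or null to mean "return all data".
function parseAfterTimestamp(raw) {
  if (raw === undefined || raw === null) return null;
  const text = String(raw).trim();
  if (text === '') return null;
  const value = Number(text);
  // NaN, Infinity and negative values are treated as invalid (assumption).
  return Number.isFinite(value) && value >= 0 ? value : null;
}
```

Mapping every invalid input to null means the caller needs only one code path for "new workflow", "loading existing workflow", and malformed requests.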
Performance Considerations
- Index on _createdAt for efficient timestamp filtering
- Limit result size if needed (pagination)
- Caching for frequently accessed data
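If result size is limited, the timestamp cursor already gives the client a way to fetch the rest: the next poll continues from the last rendered item. A rough sketch of the server-side cut, where limitChatData, limit, and hasMore are all hypothetical names that do not appear in the response format above:

```javascript
// Hypothetical page cut: keep the first `limit` items of an already
// sorted list and report whether more items remain.
function limitChatData(items, limit) {
  const page = items.slice(0, limit);
  return { items: page, hasMore: items.length > page.length };
}
```

A client could poll again immediately while hasMore is true instead of waiting for the normal interval. Note that if a page boundary falls between two items with the same _createdAt, a strictly-greater-than filter on the next poll would skip the second one, so a real implementation may need a tie-breaker such as the item id.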
Migration Strategy
Backward Compatibility
- Keep old endpoints during transition
- Gradual migration of frontend code
- Feature flag to switch between old/new logic
Rollback Plan
- Revert to separate endpoints if issues arise
- Maintain old polling logic as fallback
- Monitor performance during transition
Future Enhancements
Potential Improvements
- Real-time updates via WebSocket
- Delta updates for large datasets
- Compression for large responses
- Caching strategies for better performance
Monitoring
- Track polling frequency
- Monitor response times
- Log data transfer volumes
- Alert on errors
Last updated: 2025-01-08
Version: 1.0