# Workflow Polling Logic - Unified Timestamp-Based Selective Data Transfer

## Overview

This document describes the improved polling logic for workflow data. It uses a unified, timestamp-based approach to transfer only new data, applied across all data types (messages, logs, stats).

## Core Concept

- **Use the `_createdAt` timestamp** of the last rendered **ITEM** (of any type: message, log, or stat)
- **A single unified endpoint** returns all data types in one chronological list
- **Each item identifies its type** (message, log, or stat)
- **Chronological ordering** ensures consistent data delivery

## Workflow States

### 1. New Workflow

- `lastRenderedTimestamp = null`
- Backend returns **all data** in chronological order
- Data is delivered as a unified list with type indicators

### 2. Next Round in Existing Workflow

- `lastRenderedTimestamp = _createdAt` of the last rendered **ITEM** (message, log, or stat)
- Backend returns **only data created after that timestamp**
- Data is delivered as a unified list in chronological order

### 3. Loading Existing Workflow

- `lastRenderedTimestamp = null` (load all data)
- Backend returns **all data** in chronological order
- Data is delivered as a unified list with type indicators

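The three states reduce to one rule for choosing the request timestamp. A minimal sketch; the state names (`'new'`, `'nextRound'`, `'loadExisting'`), the `WorkflowState` object, and the `lastRenderedItem` argument are hypothetical, not part of the original design:

```javascript
// Hypothetical names for the three scenarios described above.
const WorkflowState = Object.freeze({
  NEW: 'new',
  NEXT_ROUND: 'nextRound',
  LOAD_EXISTING: 'loadExisting',
});

// Returns the value to send as `afterTimestamp`; null means "load all data".
// `lastRenderedItem` is the last rendered message, log, or stat (or null).
function afterTimestampForState(state, lastRenderedItem) {
  switch (state) {
    case WorkflowState.NEW:
    case WorkflowState.LOAD_EXISTING:
      return null; // full load in chronological order
    case WorkflowState.NEXT_ROUND:
      // Only data created after the last rendered item is requested
      return lastRenderedItem ? lastRenderedItem._createdAt : null;
    default:
      throw new Error(`Unknown workflow state: ${state}`);
  }
}
```

Falling back to `null` when nothing has been rendered yet makes a next round with an empty view behave like a full load.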
## API Design

### Single Endpoint

```
GET /workflows/{id}/chatData?afterTimestamp=1234567890
```

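One way a client could build this URL is sketched below. How a `null` timestamp is transmitted is not specified above; this sketch assumes the parameter is simply omitted in that case, and the name `buildChatDataUrl` is hypothetical:

```javascript
// Builds the unified endpoint path for a workflow.
// Assumption: a null/undefined timestamp is sent by omitting the parameter.
function buildChatDataUrl(workflowId, afterTimestamp) {
  const path = `/workflows/${encodeURIComponent(workflowId)}/chatData`;
  if (afterTimestamp === null || afterTimestamp === undefined) {
    return path; // initial load: backend returns all data
  }
  const query = new URLSearchParams({ afterTimestamp: String(afterTimestamp) });
  return `${path}?${query}`;
}
```

For example, `buildChatDataUrl('wf_456', 1234567890.123)` yields `/workflows/wf_456/chatData?afterTimestamp=1234567890.123`.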
### Response Format

Each entry in `items` wraps one record: `type` identifies the data type, `createdAt` repeats the record's `_createdAt`, and `item` holds the record itself. Records contain further type-specific fields, omitted here.

```json
{
  "items": [
    {
      "type": "message",
      "createdAt": 1234567890.123,
      "item": {
        "id": "msg_123",
        "workflowId": "wf_456",
        "message": "Hello world",
        "role": "user",
        "status": "first",
        "_createdAt": 1234567890.123
      }
    },
    {
      "type": "log",
      "createdAt": 1234567890.456,
      "item": {
        "id": "log_789",
        "workflowId": "wf_456",
        "level": "INFO",
        "message": "Workflow started",
        "_createdAt": 1234567890.456
      }
    },
    {
      "type": "stat",
      "createdAt": 1234567890.789,
      "item": {
        "id": "stat_101",
        "workflowId": "wf_456",
        "processingTime": 1500,
        "tokenCount": 1000,
        "_createdAt": 1234567890.789
      }
    }
  ]
}
```

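A client may want to check this shape before rendering. A minimal sketch, assuming `message`, `log`, and `stat` are the only valid types; `validateChatDataResponse` is a hypothetical helper that returns a list of problems rather than throwing:

```javascript
const KNOWN_TYPES = new Set(['message', 'log', 'stat']);

// Checks the unified response: known types, matching timestamps,
// and non-decreasing chronological order.
function validateChatDataResponse(response) {
  if (!response || !Array.isArray(response.items)) {
    return ['response.items must be an array'];
  }
  const problems = [];
  let previous = -Infinity;
  response.items.forEach((wrapper, index) => {
    if (!KNOWN_TYPES.has(wrapper.type)) {
      problems.push(`items[${index}]: unknown type "${wrapper.type}"`);
    }
    if (typeof wrapper.createdAt !== 'number') {
      problems.push(`items[${index}]: createdAt must be a number`);
      return; // skip the remaining checks for this entry
    }
    if (!wrapper.item || wrapper.item._createdAt !== wrapper.createdAt) {
      problems.push(`items[${index}]: createdAt does not match item._createdAt`);
    }
    if (wrapper.createdAt < previous) {
      problems.push(`items[${index}]: out of chronological order`);
    }
    previous = wrapper.createdAt;
  });
  return problems;
}
```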
## Frontend Logic

### Global Timestamp Tracking

- **Global variable `lastRenderedTimestamp`** holds the `_createdAt` of the last rendered item
- **Updated after each item** (message, log, or stat) is displayed
- **Always ready** for the next polling call

### Polling Logic

1. **Check whether rendering is complete** (prevents race conditions)
2. **Read the global `lastRenderedTimestamp`** variable
3. **Call the unified endpoint** with that timestamp
4. **Process the returned items** in chronological order
5. **Update `lastRenderedTimestamp`** as each item is rendered

### Rendering Process

1. **Iterate through the unified list** in chronological order
2. **Route each item** to the appropriate handler based on its `type`
3. **Update the global `lastRenderedTimestamp`** after each item is rendered
4. **Trigger the next poll** only when rendering is complete

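Step 2 (routing by `type`) can also be expressed as a lookup table instead of a `switch`, so adding a data type means adding one entry. A minimal sketch; `createItemRouter` is a hypothetical name and the handlers passed in stand for the real renderers:

```javascript
// Builds a router that dispatches each wrapper to the handler for its type.
// `handlers` maps a type name ('message', 'log', 'stat') to an async renderer.
function createItemRouter(handlers, onUnknown = () => {}) {
  // A Map avoids accidentally matching inherited keys such as "toString"
  const table = new Map(Object.entries(handlers));
  return async function routeItem(wrapper) {
    const handler = table.get(wrapper.type);
    if (typeof handler !== 'function') {
      onUnknown(wrapper); // e.g. log a warning; the item is skipped
      return false;
    }
    await handler(wrapper.item);
    return true;
  };
}
```

Inside the rendering loop, the caller would still update `lastRenderedTimestamp` after each `await routeItem(wrapper)`.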
### Race Condition Prevention

- **Polling starts only** after the previous rendering has completed
- **The global timestamp** keeps client state consistent
- **No complex search** for the last rendered item is needed

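The guard described above is a single-flight pattern: while one poll is running, further triggers are ignored. A generic sketch, using the hypothetical name `singleFlight`, that wraps any async function this way:

```javascript
// Wraps an async function so that at most one call runs at a time.
// Calls made while one is in flight are dropped (they resolve to undefined).
function singleFlight(fn) {
  let running = false;
  return async function guarded(...args) {
    if (running) {
      return undefined; // a poll is already fetching or rendering
    }
    running = true; // claimed synchronously, before any await
    try {
      return await fn(...args);
    } finally {
      running = false; // released even if fn throws
    }
  };
}
```

Dropping the extra calls, rather than queueing them, matches the design above: the running poll schedules the next one when it finishes.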
## Backend Logic

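The implementation steps call for a `UNION ALL` query filtered and ordered by `_createdAt`. The same selection can be sketched in memory, assuming each data type is available as an array of records carrying `workflowId` and `_createdAt`; `selectChatData` and its input shape are hypothetical:

```javascript
// In-memory equivalent of the unified query: filter each type by workflow
// and timestamp, tag it with its type, then merge into one chronological list.
function selectChatData({ messages, logs, stats }, workflowId, afterTimestamp) {
  const sources = [
    ['message', messages],
    ['log', logs],
    ['stat', stats],
  ];
  const items = [];
  for (const [type, records] of sources) {
    for (const record of records) {
      if (record.workflowId !== workflowId) continue;
      // null means "initial load": every record is included
      if (afterTimestamp !== null && record._createdAt <= afterTimestamp) continue;
      items.push({ type, createdAt: record._createdAt, item: record });
    }
  }
  // Array.prototype.sort is stable, so equal timestamps keep source order
  items.sort((a, b) => a.createdAt - b.createdAt);
  return { items };
}
```

The strict "after" comparison is what prevents the last rendered item from being sent again. If two items can share the exact same `_createdAt`, one of them could be skipped across polls; a tie-breaker such as the item `id` would then be needed.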
## Benefits

1. **✅ Single Timestamp** - One timestamp controls all data types
2. **✅ Chronological Order** - All items are delivered in the correct sequence
3. **✅ Unified Processing** - Single endpoint, single response format
4. **✅ Efficient Polling** - Only new data is transferred
5. **✅ Type Safety** - Each item clearly identifies its type
6. **✅ Works for All Scenarios** - New, existing, and next-round workflows

## Implementation Steps

### Phase 1: Backend Implementation

1. **Create the unified endpoint** `/workflows/{id}/chatData`
2. **Implement the database query** with `UNION ALL`
3. **Add timestamp filtering** for selective data transfer
4. **Ensure chronological ordering** by `_createdAt`

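A sketch of the `UNION ALL` query for step 2. It assumes PostgreSQL, tables named `messages`, `logs`, and `stats` with camelCase `workflowId`/`_createdAt` columns, `_createdAt` stored as float seconds, and `$n` placeholders as used by node-postgres; all of these names are assumptions, and `buildChatDataQuery` is hypothetical:

```javascript
// Hypothetical [type, table] pairs; adjust to the real schema.
const TABLES = [
  ['message', 'messages'],
  ['log', 'logs'],
  ['stat', 'stats'],
];

// Returns a parameterized query: $1 = workflowId, $2 = afterTimestamp (or null).
function buildChatDataQuery(workflowId, afterTimestamp) {
  const selects = TABLES.map(
    ([type, table]) =>
      `SELECT '${type}' AS type, t."_createdAt" AS "createdAt", to_jsonb(t) AS item ` +
      `FROM ${table} t ` +
      `WHERE t."workflowId" = $1 ` +
      // A NULL timestamp disables the filter, i.e. "return all data"
      `AND ($2::double precision IS NULL OR t."_createdAt" > $2::double precision)`
  );
  return {
    // ORDER BY after the last SELECT applies to the whole UNION ALL result
    text: `${selects.join('\nUNION ALL\n')}\nORDER BY "createdAt" ASC`,
    values: [workflowId, afterTimestamp ?? null],
  };
}
```

With node-postgres, the result could be passed as `client.query(q.text, q.values)`. The `_createdAt` index listed under Performance Considerations is what keeps each filtered branch cheap.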
### Phase 2: Frontend Implementation

1. **Create the global `lastRenderedTimestamp` variable**
2. **Update the polling logic** to use the unified endpoint
3. **Implement unified response processing**
4. **Add timestamp tracking** to each renderer (message, log, stat)
5. **Add race condition prevention** (poll only when rendering is complete)
6. **Remove the complex filtering logic**

### Phase 3: Testing & Cleanup

1. **Test all workflow scenarios** (new, existing, next round)
2. **Verify chronological ordering**
3. **Remove the old separate endpoints** (optional)
4. **Update the documentation**

## Technical Details

### Frontend Implementation Example

```javascript
// Global timestamp tracking
let lastRenderedTimestamp = null; // null => load all data
let isRendering = false;          // covers both the request and the rendering

// `api`, `renderMessage`, `renderLog`, `renderStat` and `scheduleNextPoll`
// are provided by the surrounding application.

// Polling function
async function pollWorkflowData(workflowId) {
  // Prevent race conditions: claim the flag before the first `await`,
  // otherwise two overlapping calls could both pass this check
  if (isRendering) {
    return;
  }
  isRendering = true;

  try {
    // Use global timestamp
    const afterTimestamp = lastRenderedTimestamp;

    // Call unified endpoint
    const response = await api.getWorkflowChatData(workflowId, afterTimestamp);

    // Process items in chronological order; advance the timestamp only after
    // an item has rendered, so a failed item is requested again next poll
    for (const wrapper of response.items) {
      await renderItem(wrapper);
      lastRenderedTimestamp = wrapper.createdAt;
    }
  } catch (error) {
    console.error('Polling workflow data failed:', error);
  } finally {
    // Always release the guard, even if the request or a renderer throws
    isRendering = false;
  }

  // Trigger next polling
  scheduleNextPoll();
}

// Render individual items
async function renderItem(wrapper) {
  switch (wrapper.type) {
    case 'message':
      await renderMessage(wrapper.item);
      break;
    case 'log':
      await renderLog(wrapper.item);
      break;
    case 'stat':
      await renderStat(wrapper.item);
      break;
    default:
      // Unknown types are skipped rather than breaking the loop
      console.warn(`Unknown chat data type: ${wrapper.type}`);
  }
}
```

### Timestamp Handling

- **Use `_createdAt`**, since all database objects have this attribute
- **Unix timestamp format** (float seconds, e.g. `1234567890.123`) for sub-second precision
- **`null` handling** for initial loads (load all data)

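On the backend, the raw `afterTimestamp` query-string value has to become either a float or `null`. A minimal sketch; `parseAfterTimestamp` is hypothetical, treating the literal string `"null"` as null is an assumption, and invalid input maps to `null` so that, per the error-handling rules, all data is returned:

```javascript
// Converts the raw query-string value into a Unix timestamp (float seconds)
// or null. null means "load all data"; invalid input is treated the same way.
function parseAfterTimestamp(raw) {
  if (typeof raw !== 'string') {
    return null; // parameter absent (or repeated, depending on the framework)
  }
  const trimmed = raw.trim();
  // Assumption: an empty value or the literal "null" also means "load all"
  if (trimmed === '' || trimmed === 'null') {
    return null;
  }
  const value = Number(trimmed);
  return Number.isFinite(value) ? value : null;
}
```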
### Error Handling

- **Invalid timestamps** → return all data
- **Missing workflow** → return an empty list
- **Database errors** → return an appropriate error response

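The three rules can be sketched as a framework-agnostic handler that returns a status and a body. The `store` object and its `workflowExists`/`loadItemsAfter` methods are hypothetical stand-ins for the real data layer, and the choice of status 500 for database errors is an assumption:

```javascript
// Applies the error-handling rules to one request.
// `afterTimestamp` is the already-parsed value (a number or null).
async function handleChatDataRequest(store, workflowId, afterTimestamp) {
  // Invalid timestamps -> return all data
  const after = Number.isFinite(afterTimestamp) ? afterTimestamp : null;
  try {
    // Missing workflow -> return an empty list
    if (!(await store.workflowExists(workflowId))) {
      return { status: 200, body: { items: [] } };
    }
    const items = await store.loadItemsAfter(workflowId, after);
    return { status: 200, body: { items } };
  } catch {
    // Database errors -> error response instead of partial data
    return { status: 500, body: { error: 'Failed to load chat data' } };
  }
}
```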
### Performance Considerations

- **Index on `_createdAt`** for efficient timestamp filtering
- **Limit result size** if needed (pagination)
- **Caching** for frequently accessed data

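Pagination fits the timestamp scheme without a separate cursor: if the backend caps the result size, the client's `lastRenderedTimestamp` after rendering one page is the starting point for the next. A minimal sketch of the capping step; `limitItems`, the `hasMore` flag, and the default limit of 500 are assumptions, and it presumes no two items share a `_createdAt` (otherwise a page boundary could skip items):

```javascript
// Caps a chronologically sorted item list at `limit` entries and reports
// whether more remain. The client keeps polling from the last rendered
// item's timestamp, so no separate page cursor is needed.
function limitItems(sortedItems, limit = 500) {
  if (!Number.isInteger(limit) || limit <= 0) {
    throw new RangeError('limit must be a positive integer');
  }
  return {
    items: sortedItems.slice(0, limit),
    hasMore: sortedItems.length > limit,
  };
}
```

In a database query, the same effect would typically come from a `LIMIT` on the ordered result.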
## Migration Strategy

### Backward Compatibility

- **Keep the old endpoints** during the transition
- **Migrate the frontend code gradually**
- **Use a feature flag** to switch between the old and new logic

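The feature flag can be a single switch point that picks which polling implementation runs. A minimal sketch; the flag name `unifiedChatData`, `selectPoller`, and both poller functions are hypothetical placeholders for the existing and new logic:

```javascript
// Chooses the polling implementation based on a feature flag.
// Falls back to the legacy poller when the flag is missing or off,
// which also keeps the old logic available as a rollback path.
function selectPoller(flags, pollers) {
  return flags && flags.unifiedChatData === true ? pollers.unified : pollers.legacy;
}
```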
### Rollback Plan

- **Revert to the separate endpoints** if issues arise
- **Keep the old polling logic** as a fallback
- **Monitor performance** during the transition

## Future Enhancements

### Potential Improvements

- **Real-time updates** via WebSocket
- **Delta updates** for large datasets
- **Compression** for large responses
- **Caching strategies** for better performance

### Monitoring

- **Track polling frequency**
- **Monitor response times**
- **Log data transfer volumes**
- **Alert on errors**

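These metrics can be collected by wrapping each poll. A minimal in-memory sketch; `createPollMetrics` and the shape of its snapshot are hypothetical, and a real deployment would forward these numbers to whatever monitoring system is in use:

```javascript
// Records per-poll metrics: frequency (count and interval between polls),
// response time, transferred volume (UTF-8 bytes), and error count.
function createPollMetrics(now = () => Date.now()) {
  const stats = {
    polls: 0, errors: 0, totalBytes: 0,
    durationsMs: [], intervalsMs: [], lastStartMs: null,
  };
  return {
    async track(fetchBody) {
      const start = now();
      if (stats.lastStartMs !== null) stats.intervalsMs.push(start - stats.lastStartMs);
      stats.lastStartMs = start;
      stats.polls += 1;
      try {
        const body = await fetchBody(); // raw response text
        stats.totalBytes += new TextEncoder().encode(body).length;
        return body;
      } catch (err) {
        stats.errors += 1; // candidate signal for alerting
        throw err;
      } finally {
        stats.durationsMs.push(now() - start);
      }
    },
    snapshot() {
      return { ...stats, durationsMs: [...stats.durationsMs], intervalsMs: [...stats.intervalsMs] };
    },
  };
}
```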
---

*Last updated: 2025-01-08*

*Version: 1.0*