ui-nyla/src/hooks/playground/useWorkflowPolling.ts

205 lines
5.5 KiB
TypeScript

import { useRef, useCallback } from 'react';
interface PollingState {
activeWorkflowId: string | null;
isPolling: boolean;
isPollInProgress: boolean;
isPaused: boolean;
currentInterval: number;
failureCount: number;
rateLimitFailureCount: number;
timeoutId: NodeJS.Timeout | null;
}
const BASE_INTERVAL = 5000; // 5 seconds
const MAX_INTERVAL = 10000; // 10 seconds
const BACKOFF_MULTIPLIER = 1.5;
const RATE_LIMIT_BACKOFF_MULTIPLIER = 2.0;
const MAX_RATE_LIMIT_FAILURES = 5;
export type PollCallback = (workflowId: string) => Promise<void>;
export function useWorkflowPolling() {
const stateRef = useRef<PollingState>({
activeWorkflowId: null,
isPolling: false,
isPollInProgress: false,
isPaused: false,
currentInterval: BASE_INTERVAL,
failureCount: 0,
rateLimitFailureCount: 0,
timeoutId: null
});
const pollCallbackRef = useRef<PollCallback | null>(null);
const calculateInterval = useCallback((isRateLimit: boolean = false): number => {
const state = stateRef.current;
const multiplier = isRateLimit ? RATE_LIMIT_BACKOFF_MULTIPLIER : BACKOFF_MULTIPLIER;
const newInterval = Math.min(
BASE_INTERVAL * Math.pow(multiplier, state.failureCount),
MAX_INTERVAL
);
return Math.floor(newInterval);
}, []);
const scheduleNextPoll = useCallback((interval: number) => {
const state = stateRef.current;
// Clear any existing timeout
if (state.timeoutId) {
clearTimeout(state.timeoutId);
state.timeoutId = null;
}
// Don't schedule if not polling or paused
if (!state.isPolling || state.isPaused || !state.activeWorkflowId) {
return;
}
// Schedule next poll
state.timeoutId = setTimeout(() => {
state.timeoutId = null;
doPolling();
}, interval);
}, []);
const doPolling = useCallback(async () => {
const state = stateRef.current;
// Prevent concurrent polls
if (state.isPollInProgress) {
return;
}
// Validate workflow is still active
if (!state.activeWorkflowId || !state.isPolling || state.isPaused) {
return;
}
const workflowId = state.activeWorkflowId;
state.isPollInProgress = true;
try {
if (pollCallbackRef.current) {
await pollCallbackRef.current(workflowId);
}
// Success - reset failure counts and interval
state.failureCount = 0;
state.rateLimitFailureCount = 0;
state.currentInterval = BASE_INTERVAL;
// Schedule next poll
scheduleNextPoll(state.currentInterval);
} catch (error: any) {
// Handle errors
const isRateLimit = error?.status === 429 || error?.response?.status === 429;
if (isRateLimit) {
state.rateLimitFailureCount++;
// Stop polling after too many rate limit errors
if (state.rateLimitFailureCount >= MAX_RATE_LIMIT_FAILURES) {
console.error('Too many rate limit errors, stopping polling');
stopPolling();
return;
}
} else {
state.rateLimitFailureCount = 0; // Reset rate limit count on non-rate-limit errors
}
state.failureCount++;
const nextInterval = calculateInterval(isRateLimit);
state.currentInterval = nextInterval;
console.warn(`Polling error (attempt ${state.failureCount}):`, error);
// Schedule next poll with backoff
scheduleNextPoll(nextInterval);
} finally {
state.isPollInProgress = false;
}
}, [scheduleNextPoll, calculateInterval]);
const startPolling = useCallback((workflowId: string, callback: PollCallback) => {
const state = stateRef.current;
// Stop any existing polling
if (state.isPolling) {
stopPolling();
}
// Validate workflow ID
if (!workflowId || typeof workflowId !== 'string') {
console.error('Invalid workflow ID for polling:', workflowId);
return;
}
// Set up polling state
state.activeWorkflowId = workflowId;
state.isPolling = true;
state.isPaused = false;
state.failureCount = 0;
state.rateLimitFailureCount = 0;
state.currentInterval = BASE_INTERVAL;
pollCallbackRef.current = callback;
// Execute immediate first poll (no delay)
doPolling();
}, [doPolling]);
const stopPolling = useCallback(() => {
const state = stateRef.current;
// Clear timeout
if (state.timeoutId) {
clearTimeout(state.timeoutId);
state.timeoutId = null;
}
// Reset state
state.isPolling = false;
state.isPollInProgress = false;
state.activeWorkflowId = null;
state.failureCount = 0;
state.rateLimitFailureCount = 0;
state.currentInterval = BASE_INTERVAL;
state.isPaused = false;
pollCallbackRef.current = null;
}, []);
const pausePolling = useCallback(() => {
const state = stateRef.current;
state.isPaused = true;
}, []);
const resumePolling = useCallback(() => {
const state = stateRef.current;
if (state.isPolling && state.isPaused) {
state.isPaused = false;
// Resume polling immediately
if (!state.isPollInProgress) {
scheduleNextPoll(0);
}
}
}, [scheduleNextPoll]);
const isPolling = useCallback((): boolean => {
return stateRef.current.isPolling && !stateRef.current.isPaused;
}, []);
const getActiveWorkflowId = useCallback((): string | null => {
return stateRef.current.activeWorkflowId;
}, []);
return {
startPolling,
stopPolling,
pausePolling,
resumePolling,
isPolling,
getActiveWorkflowId
};
}