/**
 * Shared SSE Client Utility
 *
 * Generic fetch-based SSE streaming for POST requests with JSON body.
 * Extracted from useCodeEditor.ts and chatbotApi.ts to provide a single
 * reusable SSE implementation across all workspace features.
 */

import { addCSRFTokenToHeaders, getCSRFToken, generateAndStoreCSRFToken } from './csrfUtils';

/** A decoded stream event. `type` selects which handler receives it. */
export interface SseEvent {
  type: string;
  [key: string]: any;
}

/**
 * Optional callbacks, one per known event type.
 *
 * `onRawEvent` is invoked for every decoded event, before the type-specific
 * handler (if any) is invoked.
 */
export interface SseEventHandlers {
  onMessage?: (event: SseEvent) => void;
  onChunk?: (event: SseEvent) => void;
  onStatus?: (event: SseEvent) => void;
  onFileEditProposal?: (event: SseEvent) => void;
  onFileVersion?: (event: SseEvent) => void;
  onToolCall?: (event: SseEvent) => void;
  onToolResult?: (event: SseEvent) => void;
  onAgentProgress?: (event: SseEvent) => void;
  onAgentSummary?: (event: SseEvent) => void;
  onFileCreated?: (event: SseEvent) => void;
  onDataSourceAccess?: (event: SseEvent) => void;
  onVoiceResponse?: (event: SseEvent) => void;
  onWorkflowUpdated?: (event: SseEvent) => void;
  onComplete?: (event: SseEvent) => void;
  onStopped?: (event: SseEvent) => void;
  onError?: (event: SseEvent) => void;
  onRawEvent?: (event: SseEvent) => void;
}

/** Options accepted by {@link startSseStream}. */
export interface SseClientOptions {
  /** Endpoint that receives the POST request. */
  url: string;
  /**
   * Request payload; serialized with `JSON.stringify`.
   * Typed with `any` values so interface-typed objects remain assignable.
   */
  body: Record<string, any>;
  /** Callbacks invoked for decoded events. */
  handlers: SseEventHandlers;
  /** Optional external signal; aborting it aborts the request. */
  signal?: AbortSignal;
  /** Called when the request or stream fails for any reason other than an abort. */
  onConnectionError?: (error: Error) => void;
  /** Called once the response body has been read to completion. */
  onStreamEnd?: () => void;
}

/**
 * Maps the `type` field of an incoming event to the handler that receives it.
 * Several event types are accepted under both snake_case and camelCase spellings.
 */
const _EVENT_ROUTER: Record<string, keyof SseEventHandlers> = {
  message: 'onMessage',
  chunk: 'onChunk',
  status: 'onStatus',
  file_edit_proposal: 'onFileEditProposal',
  fileEditProposal: 'onFileEditProposal',
  file_version: 'onFileVersion',
  fileVersion: 'onFileVersion',
  toolCall: 'onToolCall',
  toolResult: 'onToolResult',
  agent_progress: 'onAgentProgress',
  agentProgress: 'onAgentProgress',
  agent_summary: 'onAgentSummary',
  agentSummary: 'onAgentSummary',
  fileCreated: 'onFileCreated',
  dataSourceAccess: 'onDataSourceAccess',
  voiceResponse: 'onVoiceResponse',
  workflowUpdated: 'onWorkflowUpdated',
  complete: 'onComplete',
  stopped: 'onStopped',
  error: 'onError',
};

/**
 * Start an SSE stream via POST request.
 *
 * Sends `body` as JSON with credentials included. An `Authorization: Bearer`
 * header is added when `localStorage` holds an `authToken`, and a CSRF token is
 * generated first if none is stored. Each response line beginning with `data:`
 * is parsed as JSON and dispatched to `handlers`; all other lines are ignored.
 *
 * Failures (network error, non-OK status, missing body) are reported through
 * `onConnectionError`; aborts are silent. `onStreamEnd` fires only when the body
 * was read to completion.
 *
 * @param options - Request target, payload, handlers and lifecycle callbacks.
 * @returns A cleanup function that aborts the connection.
 */
export function startSseStream(options: SseClientOptions): () => void {
  const { url, body, handlers, signal, onConnectionError, onStreamEnd } = options;

  const abortController = new AbortController();
  const combinedSignal = signal
    ? _combineAbortSignals(signal, abortController.signal)
    : abortController.signal;

  const headers: Record<string, string> = {
    'Content-Type': 'application/json',
  };

  const authToken = localStorage.getItem('authToken');
  if (authToken) {
    headers['Authorization'] = `Bearer ${authToken}`;
  }

  if (!getCSRFToken()) {
    generateAndStoreCSRFToken();
  }
  addCSRFTokenToHeaders(headers);

  fetch(url, {
    method: 'POST',
    headers,
    body: JSON.stringify(body),
    credentials: 'include',
    signal: combinedSignal,
  })
    .then((response) => _consumeSseResponse(response, handlers))
    .then(() => {
      onStreamEnd?.();
    })
    .catch((err: unknown) => {
      // An abort is an intentional shutdown, not a connection failure.
      if (_isAbortError(err)) return;
      onConnectionError?.(err instanceof Error ? err : new Error(String(err)));
    });

  return () => abortController.abort();
}

/**
 * Read the response body to the end, dispatching every `data:` line.
 *
 * @throws Error when the response status is not OK or the body is missing.
 */
async function _consumeSseResponse(response: Response, handlers: SseEventHandlers): Promise<void> {
  if (!response.ok) {
    const errorText = await response.text();
    throw new Error(`HTTP ${response.status}: ${errorText}`);
  }
  if (!response.body) {
    throw new Error('Response body is null');
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    // The last segment may be an incomplete line; keep it for the next chunk.
    buffer = lines.pop() ?? '';

    for (const line of lines) {
      _handleLine(line, handlers);
    }
  }

  // Flush any bytes the decoder is still holding, then process the final
  // unterminated line. `buffer` never contains '\n' at this point.
  _handleLine(buffer + decoder.decode(), handlers);
}

/** Parse one stream line and dispatch it; a throwing handler does not stop the stream. */
function _handleLine(line: string, handlers: SseEventHandlers): void {
  const event = _parseDataLine(line);
  if (!event) return;

  try {
    _dispatchEvent(event, handlers);
  } catch (err) {
    // Keep the stream alive, but do not hide bugs in caller-supplied handlers.
    console.error('SSE event handler threw:', err);
  }
}

/**
 * Extract the JSON event from a `data:` line.
 *
 * @returns The parsed event, or `undefined` when the line is not a data line,
 *   is empty, is not valid JSON, or does not decode to a non-null object.
 */
function _parseDataLine(line: string): SseEvent | undefined {
  if (!line.startsWith('data:')) return undefined;

  // The SSE format makes the single space after the colon optional.
  const payload = line.slice(line.charAt(5) === ' ' ? 6 : 5);
  if (!payload.trim()) return undefined;

  try {
    const parsed: unknown = JSON.parse(payload);
    return typeof parsed === 'object' && parsed !== null ? (parsed as SseEvent) : undefined;
  } catch {
    // skip unparseable lines
    return undefined;
  }
}

/** True when `err` is an object whose `name` is `'AbortError'`. */
function _isAbortError(err: unknown): boolean {
  return (
    typeof err === 'object' &&
    err !== null &&
    (err as { name?: unknown }).name === 'AbortError'
  );
}

/**
 * Deliver an event to `onRawEvent`, then to the handler registered for its
 * `type` in `_EVENT_ROUTER`. Events with an unknown type only reach `onRawEvent`.
 */
function _dispatchEvent(event: SseEvent, handlers: SseEventHandlers): void {
  handlers.onRawEvent?.(event);

  // Own-property check so inherited keys such as "constructor" never match.
  const handlerKey = Object.prototype.hasOwnProperty.call(_EVENT_ROUTER, event.type)
    ? _EVENT_ROUTER[event.type]
    : undefined;
  if (!handlerKey) return;

  handlers[handlerKey]?.(event);
}

/**
 * Build a signal that aborts as soon as any of `signals` aborts, forwarding
 * that signal's `reason`. If one is already aborted, the result is aborted
 * immediately.
 *
 * NOTE: listeners are registered with `once: true` and are not otherwise removed.
 */
function _combineAbortSignals(...signals: AbortSignal[]): AbortSignal {
  const controller = new AbortController();

  for (const sig of signals) {
    if (sig.aborted) {
      controller.abort(sig.reason);
      return controller.signal;
    }
    sig.addEventListener('abort', () => controller.abort(sig.reason), { once: true });
  }

  return controller.signal;
}