frontend_nyla/src/utils/sseClient.ts
2026-03-17 22:51:36 +01:00

183 lines
5.3 KiB
TypeScript

/**
* Shared SSE Client Utility
*
* Generic fetch-based SSE streaming for POST requests with JSON body.
* Reusable SSE implementation across all workspace features.
*/
import { addCSRFTokenToHeaders, getCSRFToken, generateAndStoreCSRFToken } from './csrfUtils';
export interface SseEvent {
type: string;
[key: string]: any;
}
export interface SseEventHandlers {
onMessage?: (event: SseEvent) => void;
onChunk?: (event: SseEvent) => void;
onStatus?: (event: SseEvent) => void;
onFileEditProposal?: (event: SseEvent) => void;
onFileVersion?: (event: SseEvent) => void;
onFileEditRejected?: (event: SseEvent) => void;
onFileUpdated?: (event: SseEvent) => void;
onToolCall?: (event: SseEvent) => void;
onToolResult?: (event: SseEvent) => void;
onAgentProgress?: (event: SseEvent) => void;
onAgentSummary?: (event: SseEvent) => void;
onFileCreated?: (event: SseEvent) => void;
onDataSourceAccess?: (event: SseEvent) => void;
onVoiceResponse?: (event: SseEvent) => void;
onWorkflowUpdated?: (event: SseEvent) => void;
onComplete?: (event: SseEvent) => void;
onStopped?: (event: SseEvent) => void;
onError?: (event: SseEvent) => void;
onRawEvent?: (event: SseEvent) => void;
}
export interface SseClientOptions {
url: string;
body: Record<string, any>;
handlers: SseEventHandlers;
signal?: AbortSignal;
onConnectionError?: (error: Error) => void;
onStreamEnd?: () => void;
}
const _EVENT_ROUTER: Record<string, keyof SseEventHandlers> = {
message: 'onMessage',
chunk: 'onChunk',
status: 'onStatus',
file_edit_proposal: 'onFileEditProposal',
fileEditProposal: 'onFileEditProposal',
file_version: 'onFileVersion',
fileVersion: 'onFileVersion',
file_edit_rejected: 'onFileEditRejected',
fileEditRejected: 'onFileEditRejected',
file_updated: 'onFileUpdated',
fileUpdated: 'onFileUpdated',
toolCall: 'onToolCall',
toolResult: 'onToolResult',
agent_progress: 'onAgentProgress',
agentProgress: 'onAgentProgress',
agent_summary: 'onAgentSummary',
agentSummary: 'onAgentSummary',
fileCreated: 'onFileCreated',
dataSourceAccess: 'onDataSourceAccess',
voiceResponse: 'onVoiceResponse',
workflowUpdated: 'onWorkflowUpdated',
complete: 'onComplete',
stopped: 'onStopped',
error: 'onError',
};
/**
* Start an SSE stream via POST request.
* Returns a cleanup function to abort the connection.
*/
export function startSseStream(options: SseClientOptions): () => void {
const { url, body, handlers, signal, onConnectionError, onStreamEnd } = options;
const abortController = new AbortController();
const combinedSignal = signal
? _combineAbortSignals(signal, abortController.signal)
: abortController.signal;
const headers: Record<string, string> = {
'Content-Type': 'application/json',
};
const authToken = localStorage.getItem('authToken');
if (authToken) {
headers['Authorization'] = `Bearer ${authToken}`;
}
if (!getCSRFToken()) {
generateAndStoreCSRFToken();
}
addCSRFTokenToHeaders(headers);
fetch(url, {
method: 'POST',
headers,
body: JSON.stringify(body),
credentials: 'include',
signal: combinedSignal,
})
.then(async (response) => {
if (!response.ok) {
const errorText = await response.text();
throw new Error(`HTTP ${response.status}: ${errorText}`);
}
if (!response.body) {
throw new Error('Response body is null');
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ')) {
const jsonStr = line.slice(6);
try {
if (jsonStr.trim()) {
const event: SseEvent = JSON.parse(jsonStr);
_dispatchEvent(event, handlers);
}
} catch {
// skip unparseable lines
}
}
}
}
if (buffer.trim()) {
for (const line of buffer.split('\n')) {
if (line.startsWith('data: ')) {
try {
const event: SseEvent = JSON.parse(line.slice(6));
_dispatchEvent(event, handlers);
} catch { /* skip */ }
}
}
}
onStreamEnd?.();
})
.catch((err) => {
if (err.name === 'AbortError') return;
onConnectionError?.(err instanceof Error ? err : new Error(String(err)));
});
return () => abortController.abort();
}
function _dispatchEvent(event: SseEvent, handlers: SseEventHandlers): void {
handlers.onRawEvent?.(event);
const handlerKey = _EVENT_ROUTER[event.type];
if (handlerKey) {
const handler = handlers[handlerKey];
if (handler) {
(handler as (e: SseEvent) => void)(event);
}
}
}
function _combineAbortSignals(...signals: AbortSignal[]): AbortSignal {
const controller = new AbortController();
for (const sig of signals) {
if (sig.aborted) {
controller.abort(sig.reason);
return controller.signal;
}
sig.addEventListener('abort', () => controller.abort(sig.reason), { once: true });
}
return controller.signal;
}