ui-nyla/src/hooks/useChatbot.ts

623 lines
20 KiB
TypeScript

import { useState, useCallback, useRef, useMemo, useEffect } from 'react';
import { useApiRequest } from './useApi';
import {
startChatbotStreamApi,
stopChatbotApi,
getChatbotThreadsApi,
getChatbotThreadApi,
deleteChatbotWorkflowApi,
type ChatDataItem,
type StartChatbotRequest,
type ChatbotWorkflow
} from '../api/chatbotApi';
import { Message } from '../components/UiComponents/Messages/MessagesTypes';
// Simple sort function for messages
const sortMessages = (a: Message, b: Message) => {
if (a.publishedAt !== undefined && b.publishedAt !== undefined) {
return a.publishedAt - b.publishedAt;
}
if (a.publishedAt !== undefined) return -1;
if (b.publishedAt !== undefined) return 1;
if (a.sequenceNr !== undefined && b.sequenceNr !== undefined) {
return a.sequenceNr - b.sequenceNr;
}
return 0;
};
export function useChatbot() {
const [inputValue, setInputValue] = useState<string>('');
const [messages, setMessages] = useState<Message[]>([]);
const [workflowId, setWorkflowId] = useState<string | null>(null);
const [isRunning, setIsRunning] = useState<boolean>(false);
const [isSubmitting, setIsSubmitting] = useState<boolean>(false);
const [error, setError] = useState<string | null>(null);
// Chat history state
const [threads, setThreads] = useState<ChatbotWorkflow[]>([]);
const [selectedThreadId, setSelectedThreadId] = useState<string | null>(null);
const [threadsLoading, setThreadsLoading] = useState<boolean>(false);
const [threadsError, setThreadsError] = useState<string | null>(null);
const [deletingThreads, setDeletingThreads] = useState<Set<string>>(new Set());
const { request } = useApiRequest();
const streamAbortControllerRef = useRef<AbortController | null>(null);
const processedMessageIdsRef = useRef<Set<string>>(new Set());
const thinkingMessageIdRef = useRef<string | null>(null);
const thinkingLogsRef = useRef<string[]>([]); // Use ref instead of state to avoid batching
const logQueueRef = useRef<string[]>([]); // Queue for logs to process one by one
const isProcessingLogsRef = useRef<boolean>(false); // Flag to prevent concurrent processing
// Clear processed message IDs when workflow changes
const clearProcessedMessages = useCallback(() => {
processedMessageIdsRef.current.clear();
}, []);
// Clear thinking message when a new assistant message arrives
const clearThinkingMessage = useCallback(() => {
// Clear log queue and stop processing
logQueueRef.current = [];
isProcessingLogsRef.current = false;
if (thinkingMessageIdRef.current) {
const thinkingId = thinkingMessageIdRef.current;
thinkingMessageIdRef.current = null;
thinkingLogsRef.current = [];
setMessages(prevMessages => {
return prevMessages.filter(m => m.id !== thinkingId);
});
}
}, []);
// Process logs from queue one by one (progressive display)
const processLogQueue = useCallback(() => {
if (isProcessingLogsRef.current || logQueueRef.current.length === 0) {
return;
}
isProcessingLogsRef.current = true;
const logMessage = logQueueRef.current.shift()!;
// Add log to accumulated logs
thinkingLogsRef.current = [...thinkingLogsRef.current, logMessage];
// Get or create thinking message ID
const thinkingId = thinkingMessageIdRef.current || `thinking-${Date.now()}`;
thinkingMessageIdRef.current = thinkingId;
// Create/update thinking message with all accumulated logs
// Format logs as Markdown list items so each log appears on a separate line
const formattedLogs = thinkingLogsRef.current
.map(log => `- ${log.trim()}`)
.join('\n');
const thinkingMessage: Message = {
id: thinkingId,
workflowId: workflowId || '',
role: 'assistant',
message: formattedLogs,
publishedAt: Date.now() - 1,
status: 'thinking'
};
// Update messages immediately
setMessages(prevMessages => {
// Remove old thinking message if it exists
const filtered = prevMessages.filter(m => m.id !== thinkingId);
// Add updated thinking message
const updated = [...filtered, thinkingMessage];
return updated.sort(sortMessages);
});
// Process next log after a small delay (progressive display)
if (logQueueRef.current.length > 0) {
setTimeout(() => {
isProcessingLogsRef.current = false;
processLogQueue();
}, 50); // Small delay between logs for progressive display
} else {
isProcessingLogsRef.current = false;
}
}, [workflowId]);
// Add a single log to thinking message (progressive display)
const addLogToThinkingMessage = useCallback((logMessage: string) => {
// Add log to queue
logQueueRef.current.push(logMessage);
// Start processing if not already processing
if (!isProcessingLogsRef.current) {
processLogQueue();
}
}, [processLogQueue]);
// Process SSE event and update messages
const processChatDataItem = useCallback((item: ChatDataItem) => {
if (item.type === 'log' && item.item) {
// Process log item - add to thinking message one at a time
const logData = item.item as any;
const logMessage = logData.message || logData.text || '';
if (logMessage) {
// Add log immediately (progressive display)
addLogToThinkingMessage(logMessage);
}
} else if (item.type === 'message' && item.item) {
const messageData = item.item as any;
// Ensure message has required fields
if (!messageData.id) {
console.warn('⚠️ Invalid message item (missing id):', messageData);
return;
}
// Check if this is an assistant message that should clear thinking message
const messageRole = messageData.role?.toLowerCase();
const isAssistantMessage = messageRole === 'assistant' || messageRole === 'ai' || messageRole === 'system';
// ALWAYS clear thinking message when ANY message arrives (not just assistant)
// The thinking message should disappear when the real message comes
const thinkingId = thinkingMessageIdRef.current;
// Check if we've already processed this message
const messageId = messageData.id;
if (processedMessageIdsRef.current.has(messageId)) {
// Update existing message - clear thinking message first
setMessages(prevMessages => {
let filtered = prevMessages;
if (thinkingId) {
filtered = prevMessages.filter(m => m.id !== thinkingId);
thinkingMessageIdRef.current = null;
thinkingLogsRef.current = [];
}
const existingIndex = filtered.findIndex(m => m.id === messageId);
if (existingIndex >= 0) {
const updated = [...filtered];
updated[existingIndex] = messageData as Message;
return updated.sort(sortMessages);
}
return filtered;
});
} else {
// Add new message - clear thinking message first
processedMessageIdsRef.current.add(messageId);
setMessages(prevMessages => {
// Remove thinking message BEFORE adding new message (same state update)
let filtered = prevMessages;
if (thinkingId) {
filtered = prevMessages.filter(m => m.id !== thinkingId);
thinkingMessageIdRef.current = null;
thinkingLogsRef.current = [];
}
// Add new message
const updated = [...filtered, messageData as Message];
return updated.sort(sortMessages);
});
}
}
}, [addLogToThinkingMessage]);
// Load all threads (with loading state)
const loadThreads = useCallback(async () => {
try {
setThreadsLoading(true);
setThreadsError(null);
const result = await getChatbotThreadsApi(request);
// Sort threads by lastActivity (newest first)
const sortedThreads = [...result.items].sort((a, b) => {
const aTime = a.lastActivity || a.startedAt || 0;
const bTime = b.lastActivity || b.startedAt || 0;
return bTime - aTime; // Descending order (newest first)
});
setThreads(sortedThreads);
} catch (err: any) {
console.error('Error loading threads:', err);
setThreadsError(err.message || 'Failed to load threads');
} finally {
setThreadsLoading(false);
}
}, [request]);
// Load threads silently (without loading state) - keeps existing threads visible
const loadThreadsSilently = useCallback(async () => {
try {
setThreadsError(null);
const result = await getChatbotThreadsApi(request);
// Sort threads by lastActivity (newest first)
const sortedThreads = [...result.items].sort((a, b) => {
const aTime = a.lastActivity || a.startedAt || 0;
const bTime = b.lastActivity || b.startedAt || 0;
return bTime - aTime; // Descending order (newest first)
});
setThreads(sortedThreads);
} catch (err: any) {
console.error('Error loading threads silently:', err);
setThreadsError(err.message || 'Failed to load threads');
}
}, [request]);
// Load a specific workflow and its messages
const loadWorkflow = useCallback(async (workflowIdToLoad: string) => {
try {
setError(null);
const result = await getChatbotThreadApi(request, workflowIdToLoad);
console.log('[loadWorkflow] Full result:', JSON.stringify(result, null, 2));
console.log('[loadWorkflow] chatData structure:', {
isArray: Array.isArray(result.chatData),
isObject: result.chatData && typeof result.chatData === 'object',
chatDataKeys: result.chatData && typeof result.chatData === 'object' ? Object.keys(result.chatData) : [],
hasItems: result.chatData?.items !== undefined,
itemsLength: Array.isArray(result.chatData?.items) ? result.chatData.items.length : 'not an array'
});
// Convert chatData to Message[]
const loadedMessages: Message[] = [];
const processedIds = new Set<string>();
// Backend returns chatData as { items: ChatDataItem[] } structure
const chatDataArray: any[] = result.chatData?.items || [];
console.log('[loadWorkflow] Processing chatDataArray:', {
length: chatDataArray.length,
types: chatDataArray.map((item, idx) => ({
index: idx,
type: item?.type,
hasItem: !!item?.item,
keys: item ? Object.keys(item) : []
}))
});
for (const item of chatDataArray) {
// Handle ChatDataItem structure: { type: 'message', createdAt: number, item: Message }
if (item.type === 'message' && item.item) {
const messageData = item.item as any;
if (messageData.id && !processedIds.has(messageData.id)) {
processedIds.add(messageData.id);
loadedMessages.push(messageData as Message);
}
}
// Handle direct Message structure (if item is already a message)
else if (item.role && item.message !== undefined) {
// This looks like a Message object directly
if (item.id && !processedIds.has(item.id)) {
processedIds.add(item.id);
loadedMessages.push(item as Message);
}
}
}
console.log('[loadWorkflow] Loaded messages:', {
count: loadedMessages.length,
messageIds: loadedMessages.map(m => m.id),
sampleMessage: loadedMessages.length > 0 ? loadedMessages[0] : null
});
// Sort messages chronologically
const sortedMessages = loadedMessages.sort(sortMessages);
// Update state
setMessages(sortedMessages);
setWorkflowId(workflowIdToLoad);
processedMessageIdsRef.current = processedIds;
console.log('[loadWorkflow] State updated:', {
messagesCount: sortedMessages.length,
workflowId: workflowIdToLoad
});
// Don't refresh threads list here - it causes unwanted loading state
// Threads will be refreshed after new messages are sent/completed
} catch (err: any) {
console.error('Error loading workflow:', err);
setError(err.message || 'Failed to load workflow');
}
}, [request]);
// Select a thread (loads its messages)
const selectThread = useCallback(async (workflowIdToSelect: string) => {
setSelectedThreadId(workflowIdToSelect);
await loadWorkflow(workflowIdToSelect);
}, [loadWorkflow]);
// Auto-load threads on initialization
useEffect(() => {
loadThreads();
}, [loadThreads]);
// Handle input change
const onInputChange = useCallback((value: string) => {
setInputValue(value);
}, []);
// Stop chatbot workflow
const stopChatbot = useCallback(async () => {
if (!workflowId || !isRunning) {
return;
}
try {
setIsSubmitting(true);
// Abort any ongoing stream
if (streamAbortControllerRef.current) {
streamAbortControllerRef.current.abort();
streamAbortControllerRef.current = null;
}
// Clear thinking message when stopping
clearThinkingMessage();
// Call stop API
await stopChatbotApi(request, workflowId);
setIsRunning(false);
setError(null);
} catch (err: any) {
console.error('Error stopping chatbot:', err);
setError(err.message || 'Failed to stop chatbot');
} finally {
setIsSubmitting(false);
}
}, [workflowId, isRunning, request, clearThinkingMessage]);
// Handle form submit
const handleSubmit = useCallback(async () => {
// If running, stop instead of starting
if (isRunning && workflowId) {
await stopChatbot();
return;
}
const trimmedInput = inputValue.trim();
if (!trimmedInput || isSubmitting) {
return;
}
try {
setIsSubmitting(true);
setError(null);
setIsRunning(true);
// Abort any existing stream
if (streamAbortControllerRef.current) {
streamAbortControllerRef.current.abort();
}
// Create new abort controller for this stream
const abortController = new AbortController();
streamAbortControllerRef.current = abortController;
// Prepare request body
const requestBody: StartChatbotRequest = {
prompt: trimmedInput,
userLanguage: 'en',
...(workflowId && { workflowId })
};
// Track if workflow was created in this request
let workflowCreated = false;
// Clear thinking message when starting a new request
clearThinkingMessage();
// Start SSE stream
await startChatbotStreamApi(
requestBody,
(item: ChatDataItem) => {
// Check if stream was aborted
if (abortController.signal.aborted) {
return;
}
// Process the chat data item
processChatDataItem(item);
// Extract workflow ID from message if available
if (item.type === 'message' && item.item) {
const messageData = item.item as any;
if (messageData.workflowId) {
if (!workflowId) {
// New workflow created - select it automatically
setWorkflowId(messageData.workflowId);
setSelectedThreadId(messageData.workflowId);
workflowCreated = true;
} else {
// Existing workflow - ensure it's selected
setSelectedThreadId(messageData.workflowId);
}
}
}
},
(err: Error) => {
// Only handle error if stream wasn't aborted
if (!abortController.signal.aborted) {
console.error('SSE stream error:', err);
setError(err.message || 'Stream error occurred');
setIsRunning(false);
}
},
() => {
// Stream completed
if (!abortController.signal.aborted) {
setIsRunning(false);
setInputValue(''); // Clear input on completion
// Clear thinking message on completion if no final message was received
setTimeout(() => {
clearThinkingMessage();
// Refresh threads list after message completion (silently, without loading state)
loadThreadsSilently();
}, 100);
}
}
);
// Clear input after starting (optimistic)
if (!workflowId) {
setInputValue('');
}
} catch (err: any) {
console.error('Error starting chatbot:', err);
setError(err.message || 'Failed to start chatbot');
setIsRunning(false);
} finally {
setIsSubmitting(false);
streamAbortControllerRef.current = null;
}
}, [inputValue, workflowId, isRunning, isSubmitting, stopChatbot, processChatDataItem, clearThinkingMessage, loadThreads]);
// Delete a chatbot workflow
const handleDeleteThread = useCallback(async (workflowIdToDelete: string): Promise<boolean> => {
try {
// Add to deleting set
setDeletingThreads(prev => new Set(prev).add(workflowIdToDelete));
// If deleting the selected thread, clear selection and messages
if (selectedThreadId === workflowIdToDelete) {
setMessages([]);
setWorkflowId(null);
setSelectedThreadId(null);
}
// Call delete API
await deleteChatbotWorkflowApi(request, workflowIdToDelete);
// Remove from threads list optimistically
setThreads(prev => prev.filter(t => t.id !== workflowIdToDelete));
return true;
} catch (err: any) {
console.error('Error deleting thread:', err);
throw err;
} finally {
// Remove from deleting set
setDeletingThreads(prev => {
const next = new Set(prev);
next.delete(workflowIdToDelete);
return next;
});
}
}, [request, selectedThreadId]);
// Optimistically remove thread from list
const removeThreadOptimistically = useCallback((workflowId: string) => {
setThreads(prev => prev.filter(t => t.id !== workflowId));
// If deleting the selected thread, clear selection
if (selectedThreadId === workflowId) {
setSelectedThreadId(null);
setMessages([]);
setWorkflowId(null);
}
}, [selectedThreadId]);
// Start a new chat (clears selection and messages)
const startNewChat = useCallback(() => {
// Abort any ongoing stream
if (streamAbortControllerRef.current) {
streamAbortControllerRef.current.abort();
streamAbortControllerRef.current = null;
}
// Clear messages and selection
setMessages([]);
setWorkflowId(null);
setSelectedThreadId(null);
setError(null);
setInputValue('');
thinkingLogsRef.current = [];
thinkingMessageIdRef.current = null;
clearProcessedMessages();
}, [clearProcessedMessages]);
// Reset chatbot state
const resetChatbot = useCallback(() => {
// Abort any ongoing stream
if (streamAbortControllerRef.current) {
streamAbortControllerRef.current.abort();
streamAbortControllerRef.current = null;
}
setMessages([]);
setWorkflowId(null);
setSelectedThreadId(null);
setIsRunning(false);
setIsSubmitting(false);
setError(null);
setInputValue('');
thinkingLogsRef.current = [];
thinkingMessageIdRef.current = null;
clearProcessedMessages();
}, [clearProcessedMessages]);
// Cleanup on unmount
const cleanup = useCallback(() => {
if (streamAbortControllerRef.current) {
streamAbortControllerRef.current.abort();
streamAbortControllerRef.current = null;
}
logQueueRef.current = [];
isProcessingLogsRef.current = false;
}, []);
// Memoized display messages
const displayMessages = useMemo(() => {
return messages.sort(sortMessages);
}, [messages]);
return {
// GenericDataHook interface
data: [],
loading: false,
error,
// Input form interface
inputValue,
onInputChange,
handleSubmit,
isSubmitting,
// Workflow state
workflowId: workflowId || undefined,
isRunning,
// Messages
messages: displayMessages,
// Chat history state
threads,
selectedThreadId,
threadsLoading,
threadsError,
// Chat history methods
selectThread,
loadThreads,
// Delete methods
handleDelete: handleDeleteThread,
removeOptimistically: removeThreadOptimistically,
deletingItems: deletingThreads,
refetch: loadThreads,
// Additional methods
stopChatbot,
resetChatbot,
startNewChat,
cleanup
};
}
export function createChatbotHook() {
return () => useChatbot();
}