feat: added documentation on chatbot

This commit is contained in:
Ida Dittrich 2026-01-09 13:50:15 +01:00
parent 7e04d70683
commit db5ba4dee7
4 changed files with 750 additions and 2 deletions

View file

@ -64,7 +64,7 @@ flowchart LR
%% Service Center
subgraph ServiceCenter[gateway/modules/services/__init__.py]
SC[Services registry/factory<br/>(PublicService wrappers)]
SC[Services registry/factory<br>PublicService wrappers]
end
%% Services
@ -110,7 +110,7 @@ flowchart LR
DMVoice[datamodelVoice]
DMNeut[datamodelNeutralizer]
DMWork[datamodelWorkflow]
DMInit[__init__ (namespace)]
DMInit[__init__ namespace]
end
%% Workflows

View file

@ -0,0 +1,236 @@
# Streaming Utility Architecture: Event-Driven Real-Time Updates
## Current Implementation
### Event Manager (`modules/features/chatbot/eventManager.py`)
The `StreamingEventManager` is a **generic, reusable** event manager that provides:
- **Generic Event Queue Management**: Per-context asyncio queues (not just workflows)
- **Event Emission**: `emit_event()` method supporting multiple event types and categories
- **Event Streaming**: `stream_events()` async generator for SSE streaming
- **Automatic Cleanup**: Queue cleanup after delay (60 seconds default)
- **Event Categories**: Filtering by category (chat, workflow, document, etc.)
- **Thread-Safe**: Lock-based synchronization for concurrent access
### Architecture Overview
The streaming system uses a **pure event-driven approach**:
1. **Event Emission**: When data changes (messages created, logs written), events are emitted directly
2. **Event Queue**: Events are queued per context (workflow_id, document_id, etc.)
3. **SSE Streaming**: Route endpoint streams events from queue in real-time
4. **No Database Polling**: The SSE endpoint does NOT poll the database - it only streams queued events
### Current Usage: Chatbot Feature
The chatbot feature (`modules/features/chatbot/`) demonstrates the streaming architecture:
- **Event Types**: `chatdata`, `complete`, `stopped`, `error`
- **Event Categories**: `chat`, `workflow`
- **Direct Emission**: Events emitted when messages/logs are created in `mainChatbot.py`
## Implementation Details
### 1. Event Manager API (`modules/features/chatbot/eventManager.py`)
```python
class StreamingEventManager:
"""
Generic event manager for real-time streaming across all features.
Supports multiple event types and contexts (workflows, documents, tasks, etc.)
Thread-safe event emission and queue management.
"""
def __init__(self):
self._queues: Dict[str, asyncio.Queue] = {}
self._locks: Dict[str, asyncio.Lock] = {}
self._cleanup_tasks: Dict[str, asyncio.Task] = {}
self._subscribers: Dict[str, Set[str]] = {} # context_id -> set of queue_ids
def create_queue(self, context_id: str) -> asyncio.Queue:
"""Create a new event queue for a context"""
def get_queue(self, context_id: str) -> Optional[asyncio.Queue]:
"""Get existing event queue for a context"""
def has_queue(self, context_id: str) -> bool:
"""Check if a queue exists for a context"""
async def emit_event(
self,
context_id: str, # workflow_id, document_id, task_id, etc.
event_type: str, # "message", "log", "status", "progress", "complete", "error", "chatdata"
data: Dict[str, Any], # Flexible data structure
event_category: str = "default", # "chat", "workflow", "document", etc.
message: Optional[str] = None, # For backward compatibility
step: Optional[str] = None # For backward compatibility
):
"""Emit event to the context's event queue"""
async def stream_events(
self,
context_id: str,
event_categories: Optional[List[str]] = None,
timeout: Optional[float] = None
) -> AsyncIterator[Dict[str, Any]]:
"""Async generator for streaming events from a context"""
async def cleanup(self, context_id: str, delay: float = 60.0):
"""Schedule cleanup of event queue after delay"""
```
**Global Singleton**: Access via `get_event_manager()` function
### 2. SSE Route Implementation (`modules/routes/routeChatbot.py`)
The chatbot streaming endpoint (`/api/chatbot/start/stream`) demonstrates the pattern:
```python
@router.post("/start/stream")
async def stream_chatbot_start(...) -> StreamingResponse:
event_manager = get_event_manager()
# Start background processing (creates workflow and event queue)
workflow = await chatProcess(currentUser, userInput, workflowId)
# Get or create event queue
queue = event_manager.get_queue(workflow.id) or event_manager.create_queue(workflow.id)
async def event_stream():
"""Pure event-driven streaming (no database polling)"""
# 1. Send initial chat data once (from database)
chatData = interfaceDbChat.getUnifiedChatData(workflow.id, None)
if chatData.get("items"):
for item in filtered_items:
yield f"data: {json.dumps(item)}\n\n"
# 2. Stream events from queue (event-driven)
while True:
try:
# Get event from queue (blocks until event available)
event = await asyncio.wait_for(queue.get(), timeout=1.0)
# Handle event types
if event["type"] == "chatdata":
yield f"data: {json.dumps(event["data"])}\n\n"
elif event["type"] == "complete":
break # Close stream
# ... other event types
except asyncio.TimeoutError:
# Send keepalive every 30 seconds
yield f": keepalive\n\n"
continue
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
)
```
**Key Points**:
- **Initial Data**: Fetched once from database at stream start
- **Event Streaming**: Pure event-driven from queue (no polling)
- **Keepalive**: Sent every 30 seconds to keep connection alive
- **Status Check**: Periodic workflow status check (every 5 seconds) only for stopped detection
### 3. Event Emission in Processing Code
#### A. Chatbot Message Processing (`modules/features/chatbot/mainChatbot.py`)
Events are emitted **directly when data is created**:
```python
from modules.features.chatbot.eventManager import get_event_manager
event_manager = get_event_manager()
# When creating a user message
userMessage = interfaceDbChat.createMessage(userMessageData)
# Emit event immediately (exact chatData format)
await event_manager.emit_event(
context_id=workflow.id,
event_type="chatdata",
data={
"type": "message",
"createdAt": message_timestamp,
"item": userMessage.dict()
},
event_category="chat"
)
# When creating assistant message
assistantMessage = interfaceDbChat.createMessage(assistantMessageData)
# Emit event immediately
await event_manager.emit_event(
context_id=workflowId,
event_type="chatdata",
data={
"type": "message",
"createdAt": message_timestamp,
"item": assistantMessage.dict()
},
event_category="chat"
)
# When workflow completes
await event_manager.emit_event(
context_id=workflowId,
event_type="complete",
data={"workflowId": workflowId},
event_category="workflow",
message="Chatbot-Verarbeitung abgeschlossen",
step="complete"
)
```
#### B. Log Events
Logs are stored in database and then emitted as events:
```python
# Store log in database
log_data = {
"id": f"log_{uuid.uuid4()}",
"workflowId": workflowId,
"message": "Analysiere Benutzeranfrage...",
"type": "info",
"timestamp": getUtcTimestamp(),
"status": "running",
"roundNumber": round_number
}
interfaceDbChat.createLog(log_data)
# Note: Logs are emitted via the route's periodic chatData fetch mechanism
# OR can be emitted directly as events if needed
```
**Event Format**: Events use the exact `chatData` format: `{type, createdAt, item}`
## Benefits of Event-Driven Streaming
### Performance
- **Reduced Server Load**: No constant database queries every 0.5-3 seconds
- **Lower Latency**: Events delivered immediately (< 100ms) vs polling delay (500-3000ms)
- **Bandwidth Efficiency**: Only send data when it changes, not empty responses
- **No Database Polling**: SSE endpoint does NOT poll database - pure event-driven
### User Experience
- **Real-time Updates**: Users see progress instantly as events occur
- **Better Responsiveness**: No perceived delay from polling intervals
- **Reduced Battery**: Mobile devices consume less power without constant polling
- **Immediate Feedback**: Messages appear as soon as they're created
### Scalability
- **Horizontal Scaling**: Event queues can be distributed (Redis, RabbitMQ) in future
- **Connection Management**: Better handling of many concurrent streams
- **Resource Efficiency**: One persistent connection vs many HTTP requests
- **Memory Efficient**: Queues cleaned up automatically after workflow completion

View file

@ -0,0 +1,378 @@
# Chatbot vs FastTrack: Architecture Comparison
## Chatbot Basic Implementation
### Overview
The chatbot is a specialized feature designed for handling user queries that require database access or web research. It provides immediate responses by returning a workflow object instantly, then processes the request asynchronously in the background.
### Core Purpose
The chatbot analyzes user input to determine what database queries or web research are needed, executes them, and generates comprehensive answers based on real data rather than just AI knowledge.
### Main Entry Point
**Function**: `modules/features/chatbot/mainChatbot.py::chatProcess()`
**Signature**:
```python
async def chatProcess(
currentUser: User,
userInput: UserInputRequest,
workflowId: Optional[str] = None
) -> ChatWorkflow
```
### Basic Flow
1. **Workflow Creation/Resumption**
- Creates new workflow or resumes existing one
- Generates conversation name from user prompt
- Sets workflow mode to `WORKFLOW_CHATBOT`
- Creates event queue for streaming
2. **Message Storage**
- Stores user message immediately
- Emits message event for streaming
- Returns workflow object (instant response to user)
3. **Background Processing** (async)
- Analyzes user input to determine query needs
- Generates SQL queries if database access needed
- Executes queries in parallel
- Performs web research if needed
- Generates final answer with all results
### Key Components
#### 1. Analysis Phase (`_processChatbotMessage`)
**Purpose**: Determines what queries/research are needed
**Implementation**:
- Uses `get_initial_analysis_prompt()` from `chatbotConstants.py`
- Calls AI via `MethodAi.process()` with `simpleMode=True`
- Returns JSON with:
- `needsDatabaseQuery`: Boolean
- `needsWebResearch`: Boolean
- `sqlQueries[]`: Array of SQL query objects
- `reasoning`: Explanation of analysis
**Query Structure**:
```json
{
"query": "SELECT ...",
"purpose": "Description of what query retrieves",
"table": "Primary table name"
}
```
#### 2. Query Execution (`_execute_queries_parallel`)
**Purpose**: Executes multiple SQL queries simultaneously
**Implementation**:
- Uses `PreprocessorConnector` for database access
- Executes all queries in parallel via `asyncio.gather()`
- Returns results as dictionary:
- `query_1`, `query_2`, etc.: Success result text
- `query_1_data`, `query_2_data`, etc.: Raw data arrays
- `query_1_error`, `query_2_error`, etc.: Error messages if failed
**Benefits**:
- Parallel execution = faster overall time
- Continues even if some queries fail
- Provides detailed error information per query
#### 3. Web Research (`_buildWebResearchQuery`)
**Purpose**: Enriches web search queries with context from conversation
**Implementation**:
- Extracts product information from:
- Current user prompt (article numbers, product mentions)
- Database query results (if available)
- Previous assistant messages (conversation history)
- Builds enriched search query with article number, description, supplier
- Calls `services.web.performWebResearch()`
#### 4. Final Answer Generation
**Purpose**: Combines all results into comprehensive answer
**Implementation**:
- Uses `get_final_answer_system_prompt()` for structured response
- Builds context with:
- User question
- Database query results (organized by query number)
- Web research results
- Error information if queries failed
- Single AI call with all data
- Streams result as assistant message
### Key Functions
| Function | Purpose | Location |
|----------|---------|----------|
| `chatProcess()` | Main entry point, creates workflow and starts processing | `mainChatbot.py:60` |
| `_processChatbotMessage()` | Background processing: analysis → execution → answer | `mainChatbot.py:522` |
| `_execute_queries_parallel()` | Executes multiple SQL queries in parallel | `mainChatbot.py:194` |
| `_buildWebResearchQuery()` | Enriches web search with conversation context | `mainChatbot.py:318` |
| `_extractJsonFromResponse()` | Extracts JSON from AI response (handles markdown) | `mainChatbot.py:33` |
| `_emit_log_and_event()` | Stores logs and emits events for streaming | `mainChatbot.py:254` |
| `get_initial_analysis_prompt()` | System prompt for query analysis | `chatbotConstants.py` |
| `get_final_answer_system_prompt()` | System prompt for final answer generation | `chatbotConstants.py` |
| `generate_conversation_name()` | Generates conversation name from user prompt | `chatbotConstants.py` |
### Database Schema
The chatbot has knowledge of the database schema:
**Tables**:
- `Artikel`: Product information (I_ID, Artikelbezeichnung, Artikelnummer, etc.)
- `Einkaufspreis`: Price data (m_Artikel, EP_CHF)
- `Lagerplatz_Artikel`: Stock and warehouse location data (R_ARTIKEL, R_LAGERPLATZ, Bestände, etc.)
- `Lagerplatz`: Warehouse location names (I_ID, Lagerplatz, R_LAGER, R_LAGERORT)
**Relationships**:
- `Artikel.I_ID = Einkaufspreis.m_Artikel`
- `Artikel.I_ID = Lagerplatz_Artikel.R_ARTIKEL`
- `Lagerplatz_Artikel.R_LAGERPLATZ = Lagerplatz.I_ID`
### Streaming Architecture
**Route**: `/api/chatbot/start/stream`
**Format**: Server-Sent Events (SSE)
**Data Format**: Exact `chatData` format:
```json
{
"type": "message" | "log" | "stat",
"createdAt": "timestamp",
"item": { ... }
}
```
**Features**:
- Initial chat data sent immediately
- Periodic fetching of new chat data (every 0.5s)
- Event queue for real-time updates
- Round-based filtering for resumed conversations
### Error Handling
**Strategy**: Graceful degradation
- If analysis fails: Uses fallback empty analysis
- If queries fail: Logs errors per query, continues with successful ones
- If web research fails: Logs warning, continues without web data
- If final answer fails: Stores error message, updates workflow status
**Result**: Always provides some response, even if partial
### Workflow States
- `running`: Processing in progress
- `completed`: Successfully finished
- `stopped`: User stopped the workflow
- `error`: Error occurred during processing
### Key Design Decisions
1. **Immediate Return**: Returns workflow object instantly, processes in background
2. **Parallel Execution**: Executes multiple queries simultaneously for speed
3. **Streaming Feedback**: Provides real-time progress updates
4. **Data-Driven**: Uses real database/web data rather than AI knowledge only
5. **Graceful Degradation**: Continues with partial results if some steps fail
6. **Conversation Context**: Uses conversation history to enrich queries
---
## Comparison Chatbot and Fasttrack Workflow
This document compares two approaches for handling simple user requests:
- **Chatbot**: A specialized feature for database queries and web research
- **FastTrack**: A fast path optimization in the general workflow system
Both serve similar purposes but have different execution models and characteristics.
## Chatbot Architecture
### Execution Flow
```
User sends message
[STEP 1] Store message immediately → RETURN workflow (instant response)
[STEP 2] Background processing starts (async)
[STEP 3] Focused Analysis (~2-5s)
- Determines: needsDatabaseQuery? needsWebResearch?
- Generates SQL queries if needed
- Lightweight, purpose-specific prompt
[STEP 4] Execute queries in PARALLEL (if DB needed)
- Multiple SQL queries run simultaneously
- Database execution is fast
[STEP 5] Web research (if needed, parallel to DB)
[STEP 6] Final AI call with ALL results (~5-10s)
- Has actual data from DB/web
- Generates comprehensive answer
Stream everything back (queries, results, final answer)
Done (Total: ~7-20s, but user sees response immediately)
```
### Key Characteristics
- **Immediate Response**: Returns workflow object immediately, processes in background
- **Focused Analysis**: Single-purpose analysis to determine DB/web needs
- **Parallel Execution**: Executes multiple SQL queries simultaneously
- **Data-Driven**: Uses real database and web research results
- **Streaming Feedback**: Streams queries, results, and progress updates
- **Workflow Mode**: Uses `WorkflowModeEnum.WORKFLOW_CHATBOT`
### Implementation Details
**Entry Point**: `modules/features/chatbot/mainChatbot.py::chatProcess()`
**Analysis Phase**:
- Uses `get_initial_analysis_prompt()` for focused analysis
- Returns: `needsDatabaseQuery`, `needsWebResearch`, `sqlQueries[]`
- Executes via `MethodAi.process()` with `simpleMode=True`
**Query Execution**:
- Parallel execution via `_execute_queries_parallel()`
- Uses `PreprocessorConnector` for database access
- Results stored as `query_1`, `query_2_data`, etc.
**Final Answer**:
- Single AI call with all query results and web research
- Uses `get_final_answer_system_prompt()` for structured response
- Streams result as assistant message
## FastTrack Architecture
### Execution Flow
```
User sends message
[STEP 1] COMBINED ANALYSIS (heavy AI call - ~5-10s)
- Analyzes complexity, language, intent
- Normalizes request (full restatement)
- Extracts context items
- Determines dataType, expectedFormats, qualityRequirements
- Checks needsWorkflowHistory
- Determines fastTrack eligibility
[STEP 2] If simple → FastTrack path
[STEP 3] FastTrack AI call (~5-15s)
- Single AI call with prompt
- Basic processing mode
- Max 15s timeout
[STEP 4] Store answer
Done (Total: ~10-25s + overhead)
```
### Key Characteristics
- **Comprehensive Analysis**: Multi-purpose analysis covering 11 different aspects
- **Sequential Execution**: Single AI call after analysis
- **Knowledge-Based**: Relies on AI's training data (no database access)
- **Silent Processing**: No intermediate feedback until final answer
- **Workflow Integration**: Part of general workflow system
### Implementation Details
**Entry Point**: `modules/workflows/workflowManager.py::_executeFastPath()`
**Analysis Phase**:
- Uses `_analyzeUserInputAndComplexity()` for comprehensive analysis
- Returns: `complexity`, `detectedLanguage`, `normalizedRequest`, `intent`, `contextItems`, `dataType`, `expectedFormats`, `qualityRequirements`, `successCriteria`, `needsWorkflowHistory`, `fastTrack`
- Executes via `services.ai.callAiPlanning()`
**FastTrack Execution**:
- Single AI call via `workflowProcessor.fastPathExecute()`
- Uses `callWithTextContext()` for text-only responses
- Processing mode: `BASIC`
- Max cost: 0.10, Max time: 15s
**Response**:
- Creates `ActionDocument` with response text
- Stores as assistant message with `status="last"`
## Comparison
### Performance Characteristics
| Aspect | Chatbot | FastTrack |
|--------|---------|-----------|
| **Time to first response** | Instant (returns workflow immediately) | ~5-10s (waits for analysis) |
| **Initial analysis** | 2-5s (focused) | 5-10s (comprehensive) |
| **Query execution** | Parallel (fast) | N/A (no DB) |
| **Final answer** | 5-10s (with real data) | 5-15s (AI knowledge only) |
| **User sees progress** | Yes (streaming) | No (silent) |
| **Total perceived time** | ~2-5s (feels instant) | ~10-25s (feels slower) |
### Analysis Complexity
**Chatbot Analysis**:
- Single-purpose: Determines if DB/web research needed
- Lightweight prompt focused on query generation
- Returns: `needsDatabaseQuery`, `needsWebResearch`, `sqlQueries[]`
**FastTrack Analysis**:
- Multi-purpose: Comprehensive request analysis
- Detailed prompt covering 11 different aspects
- Returns: Complexity, language, intent, normalized request, context items, data type, formats, quality requirements, success criteria, workflow history needs, fastTrack eligibility
### Data Access
**Chatbot**:
- ✅ Direct database access via `PreprocessorConnector`
- ✅ Web research via `services.web.performWebResearch()`
- ✅ Uses real data for answers
**FastTrack**:
- ❌ No database access
- ❌ No web research
- ✅ Uses AI's training knowledge
### User Experience
**Chatbot**:
- Immediate workflow return
- Streaming progress updates
- See queries being generated
- See query results
- See final answer being built
**FastTrack**:
- Waits for analysis completion
- Silent processing
- Single final answer
- No intermediate feedback
### Code Complexity
**Chatbot**:
- Single-purpose feature
- Focused code path
- Direct query execution
- Minimal conditional logic
**FastTrack**:
- Part of larger workflow system
- Multiple routing decisions
- Integrated with task planning
- More conditional branches

View file

@ -0,0 +1,134 @@
# Web Search Content Extraction Fixes
## Problem Summary
The Tavily web search integration was failing to extract content from search results, causing web research to return empty or incomplete data. The main issues were related to handling `None` values and incomplete error recovery.
## Main Issues Fixed
### 1. Incomplete Content Extraction from Search Results
**Problem:**
- When Tavily API returned search results, some results had `raw_content` set to `None` (not missing, but explicitly `None`)
- The code used `result.get("raw_content") or result.get("content", "")` which failed when `raw_content` existed but was `None`
- This caused `None` values to propagate through the system instead of falling back to the `content` field or empty string
**Fix:**
Changed the content extraction in `aicorePluginTavily.py` to properly handle `None` values:
```python
# Before (line 344):
rawContent=result.get("raw_content") or result.get("content", "")
# After:
rawContent=result.get("raw_content") or result.get("content") or ""
```
This ensures that if `raw_content` is `None`, it falls back to `content`, and if that's also `None`, it defaults to an empty string.
**Additional Fix:**
Added defensive checks in the `webSearch` method to safely extract content even when result objects have unexpected structures:
```python
# Safely extract content with multiple fallbacks
content = ""
if hasattr(result, 'rawContent'):
content = result.rawContent or ""
if not content and hasattr(result, 'content'):
content = result.content or ""
```
### 2. NoneType Error When Logging Content Length
**Problem:**
- Code attempted to check `len(first_result.get('raw_content', ''))` for logging
- When `raw_content` key existed but value was `None`, `.get()` returned `None` instead of the default `''`
- This caused `len(None)` to fail with `TypeError: object of type 'NoneType' has no len()`
**Fix:**
Changed the logging code to safely handle `None` values:
```python
# Before (line 338):
logger.debug(f"First result has raw_content: {'raw_content' in first_result}, content length: {len(first_result.get('raw_content', ''))}")
# After:
raw_content = first_result.get('raw_content') or ''
logger.debug(f"First result has raw_content: {'raw_content' in first_result}, content length: {len(raw_content)}")
```
### 3. Missing Error Recovery in Content Extraction
**Problem:**
- When processing search results, if one result failed to extract, the entire extraction could fail
- No recovery mechanism to extract at least URLs even when content extraction failed
- Errors were logged but processing stopped, losing potentially useful data
**Fix:**
Added per-result error handling with recovery:
```python
for result in searchResults:
try:
# Extract URL, content, title safely
# ... extraction logic ...
except Exception as resultError:
logger.warning(f"Error processing individual search result: {resultError}")
# Continue processing other results instead of failing completely
continue
```
Also added recovery at the extraction level:
```python
except Exception as extractionError:
logger.error(f"Error extracting URLs and content from search results: {extractionError}")
# Try to recover at least URLs
try:
urls = [result.url for result in searchResults if hasattr(result, 'url') and result.url]
logger.info(f"Recovered {len(urls)} URLs after extraction error")
except Exception:
logger.error("Failed to recover any URLs from search results")
```
### 4. Incomplete Crawl Result Processing
**Problem:**
- When crawl returned results but individual page processing failed, entire crawl was lost
- No fallback to extract at least URLs from failed crawl results
- Missing content fields could cause errors when formatting results
**Fix:**
Added error handling for individual page processing:
```python
for i, result in enumerate(crawlResults, 1):
try:
# Format page content
# ... formatting logic ...
except Exception as pageError:
logger.warning(f"Error formatting page {i} from crawl: {pageError}")
# Try to add at least the URL
try:
pageUrls.append(result.url if hasattr(result, 'url') and result.url else webCrawlPrompt.url)
except Exception:
pass
```
Also ensured all result fields have safe defaults:
```python
results.append(WebCrawlResult(
url=result_url or url, # Fallback to base URL
content=result_content, # Already ensured to be string
title=result_title # Already ensured to be string
))
```
## Impact
These fixes ensure that:
1. **Content is always extracted** - Even when `raw_content` is `None`, the system falls back to `content` field or empty string
2. **Partial results are preserved** - If some results fail, others are still processed and returned
3. **URLs are recovered** - Even when content extraction fails completely, URLs can still be extracted for crawling
4. **No crashes from None values** - All `None` values are properly handled before operations like `len()` are called
## Testing Recommendations
- Test with Tavily search results that have `raw_content` set to `None`
- Test with mixed results (some with content, some without)
- Test error recovery when individual results fail
- Verify that URLs are still extracted even when content extraction fails