diff --git a/appdoc/doc_gateway_architecture_overview.md b/appdoc/doc_gateway_architecture_overview.md index a68c9d0..7fb0496 100644 --- a/appdoc/doc_gateway_architecture_overview.md +++ b/appdoc/doc_gateway_architecture_overview.md @@ -64,7 +64,7 @@ flowchart LR %% Service Center subgraph ServiceCenter[gateway/modules/services/__init__.py] - SC[Services registry/factory
(PublicService wrappers)] + SC[Services registry/factory
PublicService wrappers] end %% Services @@ -110,7 +110,7 @@ flowchart LR DMVoice[datamodelVoice] DMNeut[datamodelNeutralizer] DMWork[datamodelWorkflow] - DMInit[__init__ (namespace)] + DMInit[__init__ namespace] end %% Workflows diff --git a/implementation/Chatbot/implementation_SSE.md b/implementation/Chatbot/implementation_SSE.md new file mode 100644 index 0000000..3748814 --- /dev/null +++ b/implementation/Chatbot/implementation_SSE.md @@ -0,0 +1,236 @@ +# Streaming Utility Architecture: Event-Driven Real-Time Updates + +## Current Implementation + +### Event Manager (`modules/features/chatbot/eventManager.py`) + +The `StreamingEventManager` is a **generic, reusable** event manager that provides: + +- **Generic Event Queue Management**: Per-context asyncio queues (not just workflows) +- **Event Emission**: `emit_event()` method supporting multiple event types and categories +- **Event Streaming**: `stream_events()` async generator for SSE streaming +- **Automatic Cleanup**: Queue cleanup after delay (60 seconds default) +- **Event Categories**: Filtering by category (chat, workflow, document, etc.) +- **Thread-Safe**: Lock-based synchronization for concurrent access + +### Architecture Overview + +The streaming system uses a **pure event-driven approach**: + +1. **Event Emission**: When data changes (messages created, logs written), events are emitted directly +2. **Event Queue**: Events are queued per context (workflow_id, document_id, etc.) +3. **SSE Streaming**: Route endpoint streams events from queue in real-time +4. **No Database Polling**: The SSE endpoint does NOT poll the database - it only streams queued events + +### Current Usage: Chatbot Feature + +The chatbot feature (`modules/features/chatbot/`) demonstrates the streaming architecture: + +- **Event Types**: `chatdata`, `complete`, `stopped`, `error` +- **Event Categories**: `chat`, `workflow` +- **Direct Emission**: Events emitted when messages/logs are created in `mainChatbot.py` + +## Implementation Details + +### 1. Event Manager API (`modules/features/chatbot/eventManager.py`) + +```python +class StreamingEventManager: + """ + Generic event manager for real-time streaming across all features. + Supports multiple event types and contexts (workflows, documents, tasks, etc.) + Thread-safe event emission and queue management. + """ + + def __init__(self): + self._queues: Dict[str, asyncio.Queue] = {} + self._locks: Dict[str, asyncio.Lock] = {} + self._cleanup_tasks: Dict[str, asyncio.Task] = {} + self._subscribers: Dict[str, Set[str]] = {} # context_id -> set of queue_ids + + def create_queue(self, context_id: str) -> asyncio.Queue: + """Create a new event queue for a context""" + + def get_queue(self, context_id: str) -> Optional[asyncio.Queue]: + """Get existing event queue for a context""" + + def has_queue(self, context_id: str) -> bool: + """Check if a queue exists for a context""" + + async def emit_event( + self, + context_id: str, # workflow_id, document_id, task_id, etc. + event_type: str, # "message", "log", "status", "progress", "complete", "error", "chatdata" + data: Dict[str, Any], # Flexible data structure + event_category: str = "default", # "chat", "workflow", "document", etc. + message: Optional[str] = None, # For backward compatibility + step: Optional[str] = None # For backward compatibility + ): + """Emit event to the context's event queue""" + + async def stream_events( + self, + context_id: str, + event_categories: Optional[List[str]] = None, + timeout: Optional[float] = None + ) -> AsyncIterator[Dict[str, Any]]: + """Async generator for streaming events from a context""" + + async def cleanup(self, context_id: str, delay: float = 60.0): + """Schedule cleanup of event queue after delay""" +``` + +**Global Singleton**: Access via `get_event_manager()` function + +### 2. SSE Route Implementation (`modules/routes/routeChatbot.py`) + +The chatbot streaming endpoint (`/api/chatbot/start/stream`) demonstrates the pattern: + +```python +@router.post("/start/stream") +async def stream_chatbot_start(...) -> StreamingResponse: + event_manager = get_event_manager() + + # Start background processing (creates workflow and event queue) + workflow = await chatProcess(currentUser, userInput, workflowId) + + # Get or create event queue + queue = event_manager.get_queue(workflow.id) or event_manager.create_queue(workflow.id) + + async def event_stream(): + """Pure event-driven streaming (no database polling)""" + # 1. Send initial chat data once (from database) + chatData = interfaceDbChat.getUnifiedChatData(workflow.id, None) + if chatData.get("items"): + for item in filtered_items: + yield f"data: {json.dumps(item)}\n\n" + + # 2. Stream events from queue (event-driven) + while True: + try: + # Get event from queue (blocks until event available) + event = await asyncio.wait_for(queue.get(), timeout=1.0) + + # Handle event types + if event["type"] == "chatdata": + yield f"data: {json.dumps(event["data"])}\n\n" + elif event["type"] == "complete": + break # Close stream + # ... other event types + + except asyncio.TimeoutError: + # Send keepalive every 30 seconds + yield f": keepalive\n\n" + continue + + return StreamingResponse( + event_stream(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no" + } + ) +``` + +**Key Points**: +- **Initial Data**: Fetched once from database at stream start +- **Event Streaming**: Pure event-driven from queue (no polling) +- **Keepalive**: Sent every 30 seconds to keep connection alive +- **Status Check**: Periodic workflow status check (every 5 seconds) only for stopped detection + +### 3. Event Emission in Processing Code + +#### A. Chatbot Message Processing (`modules/features/chatbot/mainChatbot.py`) + +Events are emitted **directly when data is created**: + +```python +from modules.features.chatbot.eventManager import get_event_manager + +event_manager = get_event_manager() + +# When creating a user message +userMessage = interfaceDbChat.createMessage(userMessageData) + +# Emit event immediately (exact chatData format) +await event_manager.emit_event( + context_id=workflow.id, + event_type="chatdata", + data={ + "type": "message", + "createdAt": message_timestamp, + "item": userMessage.dict() + }, + event_category="chat" +) + +# When creating assistant message +assistantMessage = interfaceDbChat.createMessage(assistantMessageData) + +# Emit event immediately +await event_manager.emit_event( + context_id=workflowId, + event_type="chatdata", + data={ + "type": "message", + "createdAt": message_timestamp, + "item": assistantMessage.dict() + }, + event_category="chat" +) + +# When workflow completes +await event_manager.emit_event( + context_id=workflowId, + event_type="complete", + data={"workflowId": workflowId}, + event_category="workflow", + message="Chatbot-Verarbeitung abgeschlossen", + step="complete" +) +``` + +#### B. Log Events + +Logs are stored in database and then emitted as events: + +```python +# Store log in database +log_data = { + "id": f"log_{uuid.uuid4()}", + "workflowId": workflowId, + "message": "Analysiere Benutzeranfrage...", + "type": "info", + "timestamp": getUtcTimestamp(), + "status": "running", + "roundNumber": round_number +} +interfaceDbChat.createLog(log_data) + +# Note: Logs are emitted via the route's periodic chatData fetch mechanism +# OR can be emitted directly as events if needed +``` + +**Event Format**: Events use the exact `chatData` format: `{type, createdAt, item}` + +## Benefits of Event-Driven Streaming + +### Performance +- **Reduced Server Load**: No constant database queries every 0.5-3 seconds +- **Lower Latency**: Events delivered immediately (< 100ms) vs polling delay (500-3000ms) +- **Bandwidth Efficiency**: Only send data when it changes, not empty responses +- **No Database Polling**: SSE endpoint does NOT poll database - pure event-driven + +### User Experience +- **Real-time Updates**: Users see progress instantly as events occur +- **Better Responsiveness**: No perceived delay from polling intervals +- **Reduced Battery**: Mobile devices consume less power without constant polling +- **Immediate Feedback**: Messages appear as soon as they're created + +### Scalability +- **Horizontal Scaling**: Event queues can be distributed (Redis, RabbitMQ) in future +- **Connection Management**: Better handling of many concurrent streams +- **Resource Efficiency**: One persistent connection vs many HTTP requests +- **Memory Efficient**: Queues cleaned up automatically after workflow completion diff --git a/implementation/Chatbot/implementation_chatbot_feature.md b/implementation/Chatbot/implementation_chatbot_feature.md new file mode 100644 index 0000000..604d406 --- /dev/null +++ b/implementation/Chatbot/implementation_chatbot_feature.md @@ -0,0 +1,378 @@ +# Chatbot vs FastTrack: Architecture Comparison + +## Chatbot Basic Implementation + +### Overview + +The chatbot is a specialized feature designed for handling user queries that require database access or web research. It provides immediate responses by returning a workflow object instantly, then processes the request asynchronously in the background. + +### Core Purpose + +The chatbot analyzes user input to determine what database queries or web research are needed, executes them, and generates comprehensive answers based on real data rather than just AI knowledge. + +### Main Entry Point + +**Function**: `modules/features/chatbot/mainChatbot.py::chatProcess()` + +**Signature**: +```python +async def chatProcess( + currentUser: User, + userInput: UserInputRequest, + workflowId: Optional[str] = None +) -> ChatWorkflow +``` + +### Basic Flow + +1. **Workflow Creation/Resumption** + - Creates new workflow or resumes existing one + - Generates conversation name from user prompt + - Sets workflow mode to `WORKFLOW_CHATBOT` + - Creates event queue for streaming + +2. **Message Storage** + - Stores user message immediately + - Emits message event for streaming + - Returns workflow object (instant response to user) + +3. **Background Processing** (async) + - Analyzes user input to determine query needs + - Generates SQL queries if database access needed + - Executes queries in parallel + - Performs web research if needed + - Generates final answer with all results + +### Key Components + +#### 1. Analysis Phase (`_processChatbotMessage`) + +**Purpose**: Determines what queries/research are needed + +**Implementation**: +- Uses `get_initial_analysis_prompt()` from `chatbotConstants.py` +- Calls AI via `MethodAi.process()` with `simpleMode=True` +- Returns JSON with: + - `needsDatabaseQuery`: Boolean + - `needsWebResearch`: Boolean + - `sqlQueries[]`: Array of SQL query objects + - `reasoning`: Explanation of analysis + +**Query Structure**: +```json +{ + "query": "SELECT ...", + "purpose": "Description of what query retrieves", + "table": "Primary table name" +} +``` + +#### 2. Query Execution (`_execute_queries_parallel`) + +**Purpose**: Executes multiple SQL queries simultaneously + +**Implementation**: +- Uses `PreprocessorConnector` for database access +- Executes all queries in parallel via `asyncio.gather()` +- Returns results as dictionary: + - `query_1`, `query_2`, etc.: Success result text + - `query_1_data`, `query_2_data`, etc.: Raw data arrays + - `query_1_error`, `query_2_error`, etc.: Error messages if failed + +**Benefits**: +- Parallel execution = faster overall time +- Continues even if some queries fail +- Provides detailed error information per query + +#### 3. Web Research (`_buildWebResearchQuery`) + +**Purpose**: Enriches web search queries with context from conversation + +**Implementation**: +- Extracts product information from: + - Current user prompt (article numbers, product mentions) + - Database query results (if available) + - Previous assistant messages (conversation history) +- Builds enriched search query with article number, description, supplier +- Calls `services.web.performWebResearch()` + +#### 4. Final Answer Generation + +**Purpose**: Combines all results into comprehensive answer + +**Implementation**: +- Uses `get_final_answer_system_prompt()` for structured response +- Builds context with: + - User question + - Database query results (organized by query number) + - Web research results + - Error information if queries failed +- Single AI call with all data +- Streams result as assistant message + +### Key Functions + +| Function | Purpose | Location | +|----------|---------|----------| +| `chatProcess()` | Main entry point, creates workflow and starts processing | `mainChatbot.py:60` | +| `_processChatbotMessage()` | Background processing: analysis → execution → answer | `mainChatbot.py:522` | +| `_execute_queries_parallel()` | Executes multiple SQL queries in parallel | `mainChatbot.py:194` | +| `_buildWebResearchQuery()` | Enriches web search with conversation context | `mainChatbot.py:318` | +| `_extractJsonFromResponse()` | Extracts JSON from AI response (handles markdown) | `mainChatbot.py:33` | +| `_emit_log_and_event()` | Stores logs and emits events for streaming | `mainChatbot.py:254` | +| `get_initial_analysis_prompt()` | System prompt for query analysis | `chatbotConstants.py` | +| `get_final_answer_system_prompt()` | System prompt for final answer generation | `chatbotConstants.py` | +| `generate_conversation_name()` | Generates conversation name from user prompt | `chatbotConstants.py` | + +### Database Schema + +The chatbot has knowledge of the database schema: + +**Tables**: +- `Artikel`: Product information (I_ID, Artikelbezeichnung, Artikelnummer, etc.) +- `Einkaufspreis`: Price data (m_Artikel, EP_CHF) +- `Lagerplatz_Artikel`: Stock and warehouse location data (R_ARTIKEL, R_LAGERPLATZ, Bestände, etc.) +- `Lagerplatz`: Warehouse location names (I_ID, Lagerplatz, R_LAGER, R_LAGERORT) + +**Relationships**: +- `Artikel.I_ID = Einkaufspreis.m_Artikel` +- `Artikel.I_ID = Lagerplatz_Artikel.R_ARTIKEL` +- `Lagerplatz_Artikel.R_LAGERPLATZ = Lagerplatz.I_ID` + +### Streaming Architecture + +**Route**: `/api/chatbot/start/stream` + +**Format**: Server-Sent Events (SSE) + +**Data Format**: Exact `chatData` format: +```json +{ + "type": "message" | "log" | "stat", + "createdAt": "timestamp", + "item": { ... } +} +``` + +**Features**: +- Initial chat data sent immediately +- Periodic fetching of new chat data (every 0.5s) +- Event queue for real-time updates +- Round-based filtering for resumed conversations + +### Error Handling + +**Strategy**: Graceful degradation + +- If analysis fails: Uses fallback empty analysis +- If queries fail: Logs errors per query, continues with successful ones +- If web research fails: Logs warning, continues without web data +- If final answer fails: Stores error message, updates workflow status + +**Result**: Always provides some response, even if partial + +### Workflow States + +- `running`: Processing in progress +- `completed`: Successfully finished +- `stopped`: User stopped the workflow +- `error`: Error occurred during processing + +### Key Design Decisions + +1. **Immediate Return**: Returns workflow object instantly, processes in background +2. **Parallel Execution**: Executes multiple queries simultaneously for speed +3. **Streaming Feedback**: Provides real-time progress updates +4. **Data-Driven**: Uses real database/web data rather than AI knowledge only +5. **Graceful Degradation**: Continues with partial results if some steps fail +6. **Conversation Context**: Uses conversation history to enrich queries + +--- + +## Comparison Chatbot and Fasttrack Workflow + +This document compares two approaches for handling simple user requests: +- **Chatbot**: A specialized feature for database queries and web research +- **FastTrack**: A fast path optimization in the general workflow system + +Both serve similar purposes but have different execution models and characteristics. + +## Chatbot Architecture + +### Execution Flow + +``` +User sends message + ↓ +[STEP 1] Store message immediately → RETURN workflow (instant response) + ↓ +[STEP 2] Background processing starts (async) + ↓ +[STEP 3] Focused Analysis (~2-5s) + - Determines: needsDatabaseQuery? needsWebResearch? + - Generates SQL queries if needed + - Lightweight, purpose-specific prompt + ↓ +[STEP 4] Execute queries in PARALLEL (if DB needed) + - Multiple SQL queries run simultaneously + - Database execution is fast + ↓ +[STEP 5] Web research (if needed, parallel to DB) + ↓ +[STEP 6] Final AI call with ALL results (~5-10s) + - Has actual data from DB/web + - Generates comprehensive answer + ↓ +Stream everything back (queries, results, final answer) + ↓ +Done (Total: ~7-20s, but user sees response immediately) +``` + +### Key Characteristics + +- **Immediate Response**: Returns workflow object immediately, processes in background +- **Focused Analysis**: Single-purpose analysis to determine DB/web needs +- **Parallel Execution**: Executes multiple SQL queries simultaneously +- **Data-Driven**: Uses real database and web research results +- **Streaming Feedback**: Streams queries, results, and progress updates +- **Workflow Mode**: Uses `WorkflowModeEnum.WORKFLOW_CHATBOT` + +### Implementation Details + +**Entry Point**: `modules/features/chatbot/mainChatbot.py::chatProcess()` + +**Analysis Phase**: +- Uses `get_initial_analysis_prompt()` for focused analysis +- Returns: `needsDatabaseQuery`, `needsWebResearch`, `sqlQueries[]` +- Executes via `MethodAi.process()` with `simpleMode=True` + +**Query Execution**: +- Parallel execution via `_execute_queries_parallel()` +- Uses `PreprocessorConnector` for database access +- Results stored as `query_1`, `query_2_data`, etc. + +**Final Answer**: +- Single AI call with all query results and web research +- Uses `get_final_answer_system_prompt()` for structured response +- Streams result as assistant message + +## FastTrack Architecture + +### Execution Flow + +``` +User sends message + ↓ +[STEP 1] COMBINED ANALYSIS (heavy AI call - ~5-10s) + - Analyzes complexity, language, intent + - Normalizes request (full restatement) + - Extracts context items + - Determines dataType, expectedFormats, qualityRequirements + - Checks needsWorkflowHistory + - Determines fastTrack eligibility + ↓ +[STEP 2] If simple → FastTrack path + ↓ +[STEP 3] FastTrack AI call (~5-15s) + - Single AI call with prompt + - Basic processing mode + - Max 15s timeout + ↓ +[STEP 4] Store answer + ↓ +Done (Total: ~10-25s + overhead) +``` + +### Key Characteristics + +- **Comprehensive Analysis**: Multi-purpose analysis covering 11 different aspects +- **Sequential Execution**: Single AI call after analysis +- **Knowledge-Based**: Relies on AI's training data (no database access) +- **Silent Processing**: No intermediate feedback until final answer +- **Workflow Integration**: Part of general workflow system + +### Implementation Details + +**Entry Point**: `modules/workflows/workflowManager.py::_executeFastPath()` + +**Analysis Phase**: +- Uses `_analyzeUserInputAndComplexity()` for comprehensive analysis +- Returns: `complexity`, `detectedLanguage`, `normalizedRequest`, `intent`, `contextItems`, `dataType`, `expectedFormats`, `qualityRequirements`, `successCriteria`, `needsWorkflowHistory`, `fastTrack` +- Executes via `services.ai.callAiPlanning()` + +**FastTrack Execution**: +- Single AI call via `workflowProcessor.fastPathExecute()` +- Uses `callWithTextContext()` for text-only responses +- Processing mode: `BASIC` +- Max cost: 0.10, Max time: 15s + +**Response**: +- Creates `ActionDocument` with response text +- Stores as assistant message with `status="last"` + +## Comparison + +### Performance Characteristics + +| Aspect | Chatbot | FastTrack | +|--------|---------|-----------| +| **Time to first response** | Instant (returns workflow immediately) | ~5-10s (waits for analysis) | +| **Initial analysis** | 2-5s (focused) | 5-10s (comprehensive) | +| **Query execution** | Parallel (fast) | N/A (no DB) | +| **Final answer** | 5-10s (with real data) | 5-15s (AI knowledge only) | +| **User sees progress** | Yes (streaming) | No (silent) | +| **Total perceived time** | ~2-5s (feels instant) | ~10-25s (feels slower) | + +### Analysis Complexity + +**Chatbot Analysis**: +- Single-purpose: Determines if DB/web research needed +- Lightweight prompt focused on query generation +- Returns: `needsDatabaseQuery`, `needsWebResearch`, `sqlQueries[]` + +**FastTrack Analysis**: +- Multi-purpose: Comprehensive request analysis +- Detailed prompt covering 11 different aspects +- Returns: Complexity, language, intent, normalized request, context items, data type, formats, quality requirements, success criteria, workflow history needs, fastTrack eligibility + +### Data Access + +**Chatbot**: +- ✅ Direct database access via `PreprocessorConnector` +- ✅ Web research via `services.web.performWebResearch()` +- ✅ Uses real data for answers + +**FastTrack**: +- ❌ No database access +- ❌ No web research +- ✅ Uses AI's training knowledge + +### User Experience + +**Chatbot**: +- Immediate workflow return +- Streaming progress updates +- See queries being generated +- See query results +- See final answer being built + +**FastTrack**: +- Waits for analysis completion +- Silent processing +- Single final answer +- No intermediate feedback + +### Code Complexity + +**Chatbot**: +- Single-purpose feature +- Focused code path +- Direct query execution +- Minimal conditional logic + +**FastTrack**: +- Part of larger workflow system +- Multiple routing decisions +- Integrated with task planning +- More conditional branches + diff --git a/implementation/Chatbot/implementation_web_research_fix.md b/implementation/Chatbot/implementation_web_research_fix.md new file mode 100644 index 0000000..420a984 --- /dev/null +++ b/implementation/Chatbot/implementation_web_research_fix.md @@ -0,0 +1,134 @@ +# Web Search Content Extraction Fixes + +## Problem Summary + +The Tavily web search integration was failing to extract content from search results, causing web research to return empty or incomplete data. The main issues were related to handling `None` values and incomplete error recovery. + +## Main Issues Fixed + +### 1. Incomplete Content Extraction from Search Results + +**Problem:** +- When Tavily API returned search results, some results had `raw_content` set to `None` (not missing, but explicitly `None`) +- The code used `result.get("raw_content") or result.get("content", "")` which failed when `raw_content` existed but was `None` +- This caused `None` values to propagate through the system instead of falling back to the `content` field or empty string + +**Fix:** +Changed the content extraction in `aicorePluginTavily.py` to properly handle `None` values: +```python +# Before (line 344): +rawContent=result.get("raw_content") or result.get("content", "") + +# After: +rawContent=result.get("raw_content") or result.get("content") or "" +``` + +This ensures that if `raw_content` is `None`, it falls back to `content`, and if that's also `None`, it defaults to an empty string. + +**Additional Fix:** +Added defensive checks in the `webSearch` method to safely extract content even when result objects have unexpected structures: +```python +# Safely extract content with multiple fallbacks +content = "" +if hasattr(result, 'rawContent'): + content = result.rawContent or "" +if not content and hasattr(result, 'content'): + content = result.content or "" +``` + +### 2. NoneType Error When Logging Content Length + +**Problem:** +- Code attempted to check `len(first_result.get('raw_content', ''))` for logging +- When `raw_content` key existed but value was `None`, `.get()` returned `None` instead of the default `''` +- This caused `len(None)` to fail with `TypeError: object of type 'NoneType' has no len()` + +**Fix:** +Changed the logging code to safely handle `None` values: +```python +# Before (line 338): +logger.debug(f"First result has raw_content: {'raw_content' in first_result}, content length: {len(first_result.get('raw_content', ''))}") + +# After: +raw_content = first_result.get('raw_content') or '' +logger.debug(f"First result has raw_content: {'raw_content' in first_result}, content length: {len(raw_content)}") +``` + +### 3. Missing Error Recovery in Content Extraction + +**Problem:** +- When processing search results, if one result failed to extract, the entire extraction could fail +- No recovery mechanism to extract at least URLs even when content extraction failed +- Errors were logged but processing stopped, losing potentially useful data + +**Fix:** +Added per-result error handling with recovery: +```python +for result in searchResults: + try: + # Extract URL, content, title safely + # ... extraction logic ... + except Exception as resultError: + logger.warning(f"Error processing individual search result: {resultError}") + # Continue processing other results instead of failing completely + continue +``` + +Also added recovery at the extraction level: +```python +except Exception as extractionError: + logger.error(f"Error extracting URLs and content from search results: {extractionError}") + # Try to recover at least URLs + try: + urls = [result.url for result in searchResults if hasattr(result, 'url') and result.url] + logger.info(f"Recovered {len(urls)} URLs after extraction error") + except Exception: + logger.error("Failed to recover any URLs from search results") +``` + +### 4. Incomplete Crawl Result Processing + +**Problem:** +- When crawl returned results but individual page processing failed, entire crawl was lost +- No fallback to extract at least URLs from failed crawl results +- Missing content fields could cause errors when formatting results + +**Fix:** +Added error handling for individual page processing: +```python +for i, result in enumerate(crawlResults, 1): + try: + # Format page content + # ... formatting logic ... + except Exception as pageError: + logger.warning(f"Error formatting page {i} from crawl: {pageError}") + # Try to add at least the URL + try: + pageUrls.append(result.url if hasattr(result, 'url') and result.url else webCrawlPrompt.url) + except Exception: + pass +``` + +Also ensured all result fields have safe defaults: +```python +results.append(WebCrawlResult( + url=result_url or url, # Fallback to base URL + content=result_content, # Already ensured to be string + title=result_title # Already ensured to be string +)) +``` + +## Impact + +These fixes ensure that: +1. **Content is always extracted** - Even when `raw_content` is `None`, the system falls back to `content` field or empty string +2. **Partial results are preserved** - If some results fail, others are still processed and returned +3. **URLs are recovered** - Even when content extraction fails completely, URLs can still be extracted for crawling +4. **No crashes from None values** - All `None` values are properly handled before operations like `len()` are called + +## Testing Recommendations + +- Test with Tavily search results that have `raw_content` set to `None` +- Test with mixed results (some with content, some without) +- Test error recovery when individual results fail +- Verify that URLs are still extracted even when content extraction fails