From cb7ed7cf51986871208b48e5ea7ad4046868ded1 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 30 Dec 2025 10:48:16 +0100
Subject: [PATCH] serviceWeb: parallelize web crawling; consolidate debug file writes
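
Crawl URLs concurrently instead of one at a time: the per-URL logic in
WebService._performWebCrawl moves into a new _crawlSingleUrl coroutine,
and all URLs are awaited together via asyncio.gather. URLs are now
validated before crawling (http/https scheme and a netloc required).
Debug-file writes are consolidated: section content generation writes a
single prompt/response pair instead of one file per iteration and drops
the redundant _final_result file, while the IMAGE_GENERATE paths, which
call callAi directly and bypass _callAiWithLooping, now write their own
prompt/response debug files.

A minimal sketch of the gather pattern used here, with an illustrative
crawl() stub standing in for the real per-URL coroutine:

    import asyncio

    async def crawl(url: str) -> dict:
        await asyncio.sleep(0)  # placeholder for the real network call
        return {"url": url}

    async def crawl_all(urls: list[str]) -> list[dict]:
        # return_exceptions=True: a failing URL shows up as an Exception
        # in the result list instead of cancelling the other crawls
        results = await asyncio.gather(
            *(crawl(u) for u in urls), return_exceptions=True
        )
        return [r if not isinstance(r, Exception) else {"error": str(r)}
                for r in results]

In the service itself each URL additionally gets its own progress-log
operation, created before the coroutines are gathered.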
---
.../services/serviceAi/subAiCallLooping.py | 37 +-
.../services/serviceAi/subStructureFilling.py | 69 ++--
modules/services/serviceWeb/mainServiceWeb.py | 342 +++++++++++-------
3 files changed, 285 insertions(+), 163 deletions(-)
diff --git a/modules/services/serviceAi/subAiCallLooping.py b/modules/services/serviceAi/subAiCallLooping.py
index bb1824c2..6e2c90b5 100644
--- a/modules/services/serviceAi/subAiCallLooping.py
+++ b/modules/services/serviceAi/subAiCallLooping.py
@@ -122,10 +122,14 @@ class AiCallLooper:
)
# Write the ACTUAL prompt sent to AI
- if iteration == 1:
- self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt")
- else:
- self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}")
+ # For section content generation: only write one prompt file (first iteration)
+ # For document generation: write prompt for each iteration
+ isSectionContent = "_section_" in debugPrefix
+ if iteration == 1:
+ self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt")
+ elif not isSectionContent:
+ # Document generation: keep one prompt file per iteration
+ self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}")
response = await self.aiService.callAi(request)
result = response.content
@@ -146,10 +150,13 @@ class AiCallLooper:
self.services.chat.progressLogUpdate(iterationOperationId, 0.6, f"AI response received ({bytesDisplay})")
# Write raw AI response to debug file
- if iteration == 1:
- self.services.utils.writeDebugFile(result, f"{debugPrefix}_response")
- else:
- self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}")
+ # For section content generation: only write one response file (first iteration)
+ # For document generation: write response for each iteration
+ if iteration == 1:
+ self.services.utils.writeDebugFile(result, f"{debugPrefix}_response")
+ elif not isSectionContent:
+ # Document generation: keep one response file per iteration
+ self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}")
# Emit stats for this iteration (only if workflow exists and has id)
if self.services.workflow and hasattr(self.services.workflow, 'id') and self.services.workflow.id:
@@ -219,9 +226,9 @@ class AiCallLooper:
logger.info(f"Iteration {iteration}: Section content generation detected (elements found), returning JSON directly")
if iterationOperationId:
self.services.chat.progressLogFinish(iterationOperationId, True)
- # Write final result
+ # Note: Debug files (_prompt and _response) are already written above for iteration 1
+ # No need to write _final_result as it's redundant with _response
final_json = json.dumps(parsedJsonForSection, indent=2, ensure_ascii=False) if parsedJsonForSection else (extractedJsonForSection or result)
- self.services.utils.writeDebugFile(final_json, f"{debugPrefix}_final_result")
return final_json
# Extract sections from response (handles both valid and broken JSON)
@@ -397,7 +404,10 @@ class AiCallLooper:
self.services.chat.progressLogUpdate(operationId, estimatedProgress, f"Pipeline: {bytesDisplay} (iteration {iteration})")
# Log merged sections for debugging
- self.services.utils.writeDebugFile(merged_json_str, f"{debugPrefix}_merged_sections_iteration_{iteration}")
+ # For section content generation: skip merged sections debug files (only one prompt/response needed)
+ isSectionContent = "_section_" in debugPrefix
+ if not isSectionContent:
+ self.services.utils.writeDebugFile(merged_json_str, f"{debugPrefix}_merged_sections_iteration_{iteration}")
# Check if we should continue (completion detection)
# Simple logic: JSON completeness determines continuation
@@ -465,7 +475,10 @@ class AiCallLooper:
final_result = self.responseParser.buildFinalResultFromSections(allSections, documentMetadata)
# Write final result to debug file
- self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result")
+ # For section content generation: skip final_result debug file (response already written)
+ isSectionContent = "_section_" in debugPrefix
+ if not isSectionContent:
+ self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result")
return final_result
diff --git a/modules/services/serviceAi/subStructureFilling.py b/modules/services/serviceAi/subStructureFilling.py
index 75642b48..138f6572 100644
--- a/modules/services/serviceAi/subStructureFilling.py
+++ b/modules/services/serviceAi/subStructureFilling.py
@@ -537,11 +537,6 @@ class StructureFiller:
try:
self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt")
- self.services.utils.writeDebugFile(
- generationPrompt,
- f"{chapterId}_section_{sectionId}_prompt"
- )
- logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt (aggregation)")
self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation")
@@ -553,6 +548,12 @@ class StructureFiller:
logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters")
generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0]
+ # Write debug file for IMAGE_GENERATE (direct callAi, no _callAiWithLooping)
+ self.services.utils.writeDebugFile(
+ generationPrompt,
+ f"{chapterId}_section_{sectionId}_prompt"
+ )
+
request = AiCallRequest(
prompt=generationPrompt,
contentParts=[],
@@ -564,6 +565,12 @@ class StructureFiller:
)
aiResponse = await self.aiService.callAi(request)
generatedElements = []
+
+ # Write debug file for IMAGE_GENERATE response (direct callAi, no _callAiWithLooping)
+ self.services.utils.writeDebugFile(
+ aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
+ f"{chapterId}_section_{sectionId}_response"
+ )
else:
async def buildSectionPromptWithContinuation(
section: Dict[str, Any],
@@ -665,11 +672,7 @@ The JSON should be a fragment that can be merged with the previous response."""
generatedElements = []
self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response")
- self.services.utils.writeDebugFile(
- aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
- f"{chapterId}_section_{sectionId}_response"
- )
- logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response (aggregation)")
+ # Note: Debug files are written by _callAiWithLooping using debugPrefix
self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content")
@@ -735,11 +738,6 @@ The JSON should be a fragment that can be merged with the previous response."""
try:
self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt")
- self.services.utils.writeDebugFile(
- generationPrompt,
- f"{chapterId}_section_{sectionId}_prompt"
- )
- logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt")
self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation")
@@ -751,6 +749,12 @@ The JSON should be a fragment that can be merged with the previous response."""
logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters")
generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0]
+ # Write debug file for IMAGE_GENERATE (direct callAi, no _callAiWithLooping)
+ self.services.utils.writeDebugFile(
+ generationPrompt,
+ f"{chapterId}_section_{sectionId}_prompt"
+ )
+
request = AiCallRequest(
prompt=generationPrompt,
contentParts=[],
@@ -762,6 +766,12 @@ The JSON should be a fragment that can be merged with the previous response."""
)
aiResponse = await self.aiService.callAi(request)
generatedElements = []
+
+ # Write debug file for IMAGE_GENERATE response (direct callAi, no _callAiWithLooping)
+ self.services.utils.writeDebugFile(
+ aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
+ f"{chapterId}_section_{sectionId}_response"
+ )
else:
isAggregation = False
@@ -865,11 +875,7 @@ The JSON should be a fragment that can be merged with the previous response."""
generatedElements = []
self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response")
- self.services.utils.writeDebugFile(
- aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
- f"{chapterId}_section_{sectionId}_response"
- )
- logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response")
+ # Note: Debug files are written by _callAiWithLooping using debugPrefix
self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content")
@@ -968,11 +974,6 @@ The JSON should be a fragment that can be merged with the previous response."""
try:
self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt")
- self.services.utils.writeDebugFile(
- generationPrompt,
- f"{chapterId}_section_{sectionId}_prompt"
- )
- logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt")
self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation")
@@ -984,6 +985,12 @@ The JSON should be a fragment that can be merged with the previous response."""
logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters")
generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0]
+ # Write debug file for IMAGE_GENERATE (direct callAi, no _callAiWithLooping)
+ self.services.utils.writeDebugFile(
+ generationPrompt,
+ f"{chapterId}_section_{sectionId}_prompt"
+ )
+
request = AiCallRequest(
prompt=generationPrompt,
contentParts=[],
@@ -995,6 +1002,12 @@ The JSON should be a fragment that can be merged with the previous response."""
)
aiResponse = await self.aiService.callAi(request)
generatedElements = []
+
+ # Write debug file for IMAGE_GENERATE response (direct callAi, no _callAiWithLooping)
+ self.services.utils.writeDebugFile(
+ aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
+ f"{chapterId}_section_{sectionId}_response"
+ )
else:
isAggregation = False
@@ -1098,11 +1111,7 @@ The JSON should be a fragment that can be merged with the previous response."""
generatedElements = []
self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response")
- self.services.utils.writeDebugFile(
- aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
- f"{chapterId}_section_{sectionId}_response"
- )
- logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response")
+ # Note: Debug files are written by _callAiWithLooping using debugPrefix
self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content")
diff --git a/modules/services/serviceWeb/mainServiceWeb.py b/modules/services/serviceWeb/mainServiceWeb.py
index 18176a92..50f7a84c 100644
--- a/modules/services/serviceWeb/mainServiceWeb.py
+++ b/modules/services/serviceWeb/mainServiceWeb.py
@@ -8,6 +8,8 @@ Manages the two-step process: WEB_SEARCH then WEB_CRAWL.
import json
import logging
import time
+import asyncio
+from urllib.parse import urlparse
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, AiCallPromptWebSearch, AiCallPromptWebCrawl
@@ -99,12 +101,18 @@ class WebService:
self.services.chat.progressLogUpdate(operationId, 0.5, f"Found {len(allUrls)} total URLs")
- # Step 3: Filter to maxNumberPages (simple cut, no intelligent filtering)
- if len(allUrls) > maxNumberPages:
- allUrls = allUrls[:maxNumberPages]
+ # Step 3: Validate and filter URLs before crawling
+ validatedUrls = self._validateUrls(allUrls)
+ if not validatedUrls:
+ logger.warning(f"All {len(allUrls)} URLs failed validation")
+ return {"error": "No valid URLs found to crawl"}
+
+ # Filter to maxNumberPages (simple cut, no intelligent filtering)
+ if len(validatedUrls) > maxNumberPages:
+ validatedUrls = validatedUrls[:maxNumberPages]
logger.info(f"Limited URLs to {maxNumberPages}")
- if not allUrls:
+ if not validatedUrls:
return {"error": "No URLs found to crawl"}
# Step 4: Translate researchDepth to maxDepth
@@ -114,14 +122,14 @@ class WebService:
# Step 5: Crawl all URLs with hierarchical logging
if operationId:
self.services.chat.progressLogUpdate(operationId, 0.4, "Initiating")
- self.services.chat.progressLogUpdate(operationId, 0.6, f"Crawling {len(allUrls)} URLs")
+ self.services.chat.progressLogUpdate(operationId, 0.6, f"Crawling {len(validatedUrls)} URLs")
# Use parent operation ID directly (parentId should be operationId, not log entry ID)
parentOperationId = operationId # Use the parent's operationId directly
crawlResult = await self._performWebCrawl(
instruction=instruction,
- urls=allUrls,
+ urls=validatedUrls,
maxDepth=maxDepth,
parentOperationId=parentOperationId
)
@@ -194,24 +202,24 @@ class WebService:
"max_depth": maxDepth,
"country": countryCode,
"language": languageCode,
- "urls_crawled": allUrls[:20], # First 20 URLs for reference
- "total_urls": len(allUrls),
+ "urls_crawled": validatedUrls[:20], # First 20 URLs for reference
+ "total_urls": len(validatedUrls),
"urls_with_content": urlsWithContent,
"total_content_length": totalContentLength,
"crawl_date": self.services.utils.timestampGetUtc() if hasattr(self.services, 'utils') else None
},
"sections": sections,
"statistics": {
- "sectionCount": len(sections),
- "total_urls": len(allUrls),
+ "sectionCount": len(sections),
+ "total_urls": len(validatedUrls),
"results_count": totalResults,
"urls_with_content": urlsWithContent,
"total_content_length": totalContentLength
},
# Keep original structure for backward compatibility
"instruction": instruction,
- "urls_crawled": allUrls,
- "total_urls": len(allUrls),
+ "urls_crawled": validatedUrls,
+ "total_urls": len(validatedUrls),
"results": crawlResult,
"total_results": totalResults
}
@@ -383,6 +391,50 @@ Return ONLY valid JSON, no additional text:
logger.error(f"Error in web search: {str(e)}")
return []
+ def _validateUrls(self, urls: List[str]) -> List[str]:
+ """
+ Validate URLs before crawling - filters out invalid URLs.
+
+ Args:
+ urls: List of URLs to validate
+
+ Returns:
+ List of valid URLs
+ """
+ validatedUrls = []
+ for url in urls:
+ if not url or not isinstance(url, str):
+ logger.debug(f"Skipping invalid URL (not a string): {url}")
+ continue
+
+ url = url.strip()
+ if not url:
+ logger.debug("Skipping empty URL")
+ continue
+
+ # Basic URL validation using urlparse
+ try:
+ parsed = urlparse(url)
+ # Check if URL has at least scheme and netloc
+ if not parsed.scheme or not parsed.netloc:
+ logger.debug(f"Skipping invalid URL (missing scheme or netloc): {url}")
+ continue
+
+ # Only allow http/https schemes
+ if parsed.scheme not in ['http', 'https']:
+ logger.debug(f"Skipping URL with unsupported scheme '{parsed.scheme}': {url}")
+ continue
+
+ validatedUrls.append(url)
+ logger.debug(f"Validated URL: {url}")
+
+ except Exception as e:
+ logger.warning(f"Error validating URL '{url}': {str(e)}")
+ continue
+
+ logger.info(f"Validated {len(validatedUrls)}/{len(urls)} URLs")
+ return validatedUrls
+
async def _performWebCrawl(
self,
instruction: str,
@@ -390,117 +442,165 @@ Return ONLY valid JSON, no additional text:
maxDepth: int = 2,
parentOperationId: Optional[str] = None
) -> List[Dict[str, Any]]:
- """Perform web crawl on list of URLs - calls plugin for each URL individually."""
- crawlResults = []
-
- # Loop over each URL and crawl one at a time
+ """Perform web crawl on a list of URLs - all URLs are crawled concurrently."""
+ # Build one crawl coroutine per URL; all are awaited concurrently below
+ crawlTasks = []
for urlIndex, url in enumerate(urls):
- # Create separate operation for each URL with parent reference
- urlOperationId = None
- if parentOperationId:
- workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
- urlOperationId = f"web_crawl_url_{workflowId}_{urlIndex}_{int(time.time())}"
- self.services.chat.progressLogStart(
- urlOperationId,
- "Web Crawl",
- f"URL {urlIndex + 1}",
- url[:50] + "..." if len(url) > 50 else url,
- parentOperationId=parentOperationId
- )
-
- try:
- logger.info(f"Crawling URL {urlIndex + 1}/{len(urls)}: {url}")
-
- if urlOperationId:
- displayUrl = url[:50] + "..." if len(url) > 50 else url
- self.services.chat.progressLogUpdate(urlOperationId, 0.2, f"Crawling: {displayUrl}")
- self.services.chat.progressLogUpdate(urlOperationId, 0.3, "Initiating crawl")
-
- # Build crawl prompt model for single URL
- crawlPromptModel = AiCallPromptWebCrawl(
- instruction=instruction,
- url=url, # Single URL
- maxDepth=maxDepth,
- maxWidth=5 # Default: 5 pages per level
- )
- crawlPrompt = crawlPromptModel.model_dump_json(exclude_none=True, indent=2)
-
- # Debug: persist crawl prompt (with URL identifier in content for clarity)
- debugPrompt = f"URL: {url}\n\n{crawlPrompt}"
- self.services.utils.writeDebugFile(debugPrompt, "webcrawl_prompt")
-
- # Call AI with WEB_CRAWL operation
- crawlOptions = AiCallOptions(
- operationType=OperationTypeEnum.WEB_CRAWL,
- resultFormat="json"
- )
-
- if urlOperationId:
- self.services.chat.progressLogUpdate(urlOperationId, 0.4, "Calling crawl connector")
-
- # Use unified callAiContent method with parentOperationId for hierarchical logging
- crawlResponse = await self.services.ai.callAiContent(
- prompt=crawlPrompt,
- options=crawlOptions,
- outputFormat="json",
- parentOperationId=urlOperationId # Pass URL operation ID as parent for sub-URL logging
- )
-
- if urlOperationId:
- self.services.chat.progressLogUpdate(urlOperationId, 0.7, "Processing crawl results")
-
- # Extract content from AiResponse
- crawlResult = crawlResponse.content
-
- # Debug: persist crawl response
- if isinstance(crawlResult, str):
- self.services.utils.writeDebugFile(crawlResult, "webcrawl_response")
- else:
- self.services.utils.writeDebugFile(json.dumps(crawlResult, indent=2), "webcrawl_response")
-
- # Parse crawl result
- if isinstance(crawlResult, str):
- try:
- # Extract JSON from response (handles markdown code blocks)
- extractedJson = self.services.utils.jsonExtractString(crawlResult)
- crawlData = json.loads(extractedJson) if extractedJson else json.loads(crawlResult)
- except:
- crawlData = {"url": url, "content": crawlResult}
- else:
- crawlData = crawlResult
-
- # Process crawl results and create hierarchical progress logging for sub-URLs
- if urlOperationId:
- self.services.chat.progressLogUpdate(urlOperationId, 0.8, "Processing crawl results")
-
- # Recursively process crawl results to find nested URLs and create child operations
- processedResults = self._processCrawlResultsWithHierarchy(crawlData, url, urlOperationId, maxDepth, 0)
-
- # Count total URLs crawled (including sub-URLs) for progress message
- totalUrlsCrawled = self._countUrlsInResults(processedResults)
-
- # Ensure it's a list of results
- if isinstance(processedResults, list):
- crawlResults.extend(processedResults)
- elif isinstance(processedResults, dict):
- crawlResults.append(processedResults)
- else:
- crawlResults.append({"url": url, "content": str(processedResults)})
-
- if urlOperationId:
- if totalUrlsCrawled > 1:
- self.services.chat.progressLogUpdate(urlOperationId, 0.9, f"Crawled {totalUrlsCrawled} URLs (including sub-URLs)")
- else:
- self.services.chat.progressLogUpdate(urlOperationId, 0.9, "Crawl completed")
- self.services.chat.progressLogFinish(urlOperationId, True)
-
- except Exception as e:
- logger.error(f"Error crawling URL {url}: {str(e)}")
- if urlOperationId:
- self.services.chat.progressLogFinish(urlOperationId, False)
- crawlResults.append({"url": url, "error": str(e)})
+ task = self._crawlSingleUrl(
+ url=url,
+ urlIndex=urlIndex,
+ totalUrls=len(urls),
+ instruction=instruction,
+ maxDepth=maxDepth,
+ parentOperationId=parentOperationId
+ )
+ crawlTasks.append(task)
- return crawlResults
+ # Run all crawl coroutines concurrently; return_exceptions=True turns per-URL failures into results instead of aborting the whole batch
+ logger.info(f"Starting parallel crawl of {len(urls)} URLs")
+ crawlResults = await asyncio.gather(*crawlTasks, return_exceptions=True)
+
+ # Process results and handle exceptions
+ processedResults = []
+ for idx, result in enumerate(crawlResults):
+ if isinstance(result, Exception):
+ logger.error(f"Error crawling URL {urls[idx]}: {str(result)}")
+ processedResults.append({"url": urls[idx], "error": str(result)})
+ else:
+ processedResults.extend(result if isinstance(result, list) else [result])
+
+ logger.info(f"Completed parallel crawl: {len(processedResults)} results")
+ return processedResults
+
+ async def _crawlSingleUrl(
+ self,
+ url: str,
+ urlIndex: int,
+ totalUrls: int,
+ instruction: str,
+ maxDepth: int,
+ parentOperationId: Optional[str] = None
+ ) -> List[Dict[str, Any]]:
+ """
+ Crawl a single URL - called in parallel for multiple URLs.
+
+ Args:
+ url: URL to crawl
+ urlIndex: Index of URL in the list
+ totalUrls: Total number of URLs being crawled
+ instruction: Research instruction
+ maxDepth: Maximum crawl depth
+ parentOperationId: Parent operation ID for progress tracking
+
+ Returns:
+ List of crawl results for this URL
+ """
+ # Create separate operation for each URL with parent reference
+ urlOperationId = None
+ if parentOperationId:
+ workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
+ urlOperationId = f"web_crawl_url_{workflowId}_{urlIndex}_{int(time.time())}"
+ self.services.chat.progressLogStart(
+ urlOperationId,
+ "Web Crawl",
+ f"URL {urlIndex + 1}/{totalUrls}",
+ url[:50] + "..." if len(url) > 50 else url,
+ parentOperationId=parentOperationId
+ )
+
+ try:
+ logger.info(f"Crawling URL {urlIndex + 1}/{totalUrls}: {url}")
+
+ if urlOperationId:
+ displayUrl = url[:50] + "..." if len(url) > 50 else url
+ self.services.chat.progressLogUpdate(urlOperationId, 0.2, f"Crawling: {displayUrl}")
+ self.services.chat.progressLogUpdate(urlOperationId, 0.3, "Initiating crawl")
+
+ # Build crawl prompt model for single URL
+ crawlPromptModel = AiCallPromptWebCrawl(
+ instruction=instruction,
+ url=url, # Single URL
+ maxDepth=maxDepth,
+ maxWidth=5 # Default: 5 pages per level
+ )
+ crawlPrompt = crawlPromptModel.model_dump_json(exclude_none=True, indent=2)
+
+ # Debug: persist crawl prompt (with URL identifier in content for clarity)
+ debugPrompt = f"URL: {url}\n\n{crawlPrompt}"
+ self.services.utils.writeDebugFile(debugPrompt, "webcrawl_prompt")
+
+ # Call AI with WEB_CRAWL operation
+ crawlOptions = AiCallOptions(
+ operationType=OperationTypeEnum.WEB_CRAWL,
+ resultFormat="json"
+ )
+
+ if urlOperationId:
+ self.services.chat.progressLogUpdate(urlOperationId, 0.4, "Calling crawl connector")
+
+ # Use unified callAiContent method with parentOperationId for hierarchical logging
+ crawlResponse = await self.services.ai.callAiContent(
+ prompt=crawlPrompt,
+ options=crawlOptions,
+ outputFormat="json",
+ parentOperationId=urlOperationId # Pass URL operation ID as parent for sub-URL logging
+ )
+
+ if urlOperationId:
+ self.services.chat.progressLogUpdate(urlOperationId, 0.7, "Processing crawl results")
+
+ # Extract content from AiResponse
+ crawlResult = crawlResponse.content
+
+ # Debug: persist crawl response
+ if isinstance(crawlResult, str):
+ self.services.utils.writeDebugFile(crawlResult, "webcrawl_response")
+ else:
+ self.services.utils.writeDebugFile(json.dumps(crawlResult, indent=2), "webcrawl_response")
+
+ # Parse crawl result
+ if isinstance(crawlResult, str):
+ try:
+ # Extract JSON from response (handles markdown code blocks)
+ extractedJson = self.services.utils.jsonExtractString(crawlResult)
+ crawlData = json.loads(extractedJson) if extractedJson else json.loads(crawlResult)
+ except Exception:
+ crawlData = {"url": url, "content": crawlResult}
+ else:
+ crawlData = crawlResult
+
+ # Process crawl results and create hierarchical progress logging for sub-URLs
+ if urlOperationId:
+ self.services.chat.progressLogUpdate(urlOperationId, 0.8, "Processing crawl results")
+
+ # Recursively process crawl results to find nested URLs and create child operations
+ processedResults = self._processCrawlResultsWithHierarchy(crawlData, url, urlOperationId, maxDepth, 0)
+
+ # Count total URLs crawled (including sub-URLs) for progress message
+ totalUrlsCrawled = self._countUrlsInResults(processedResults)
+
+ # Ensure it's a list of results
+ if isinstance(processedResults, list):
+ results = processedResults
+ elif isinstance(processedResults, dict):
+ results = [processedResults]
+ else:
+ results = [{"url": url, "content": str(processedResults)}]
+
+ if urlOperationId:
+ if totalUrlsCrawled > 1:
+ self.services.chat.progressLogUpdate(urlOperationId, 0.9, f"Crawled {totalUrlsCrawled} URLs (including sub-URLs)")
+ else:
+ self.services.chat.progressLogUpdate(urlOperationId, 0.9, "Crawl completed")
+ self.services.chat.progressLogFinish(urlOperationId, True)
+
+ return results
+
+ except Exception as e:
+ logger.error(f"Error crawling URL {url}: {str(e)}")
+ if urlOperationId:
+ self.services.chat.progressLogFinish(urlOperationId, False)
+ return [{"url": url, "error": str(e)}]
def _processCrawlResultsWithHierarchy(
self,