web service parallelized

ValueOn AG 2025-12-30 10:48:16 +01:00
parent 0d77263fb7
commit cb7ed7cf51
3 changed files with 285 additions and 163 deletions

File 1 of 3: class AiCallLooper

@@ -122,10 +122,14 @@ class AiCallLooper:
         )
         # Write the ACTUAL prompt sent to AI
-        if iteration == 1:
-            self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt")
-        else:
-            self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}")
+        # For section content generation: only write one prompt file (first iteration)
+        # For document generation: write prompt for each iteration
+        isSectionContent = "_section_" in debugPrefix
+        if iteration == 1 or not isSectionContent:
+            if iteration == 1:
+                self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt")
+            elif not isSectionContent:
+                self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}")

         response = await self.aiService.callAi(request)
         result = response.content
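
Both sides of this hunk (and the matching response hunk below) reduce to one write policy: section-content runs, identified by "_section_" in the debug prefix, get a single prompt/response debug pair written on the first iteration only, while document runs keep one file per iteration. A minimal standalone sketch of that rule (the helper name is hypothetical, not part of the commit):

    def shouldWritePerIterationDebug(debugPrefix: str, iteration: int) -> bool:
        # Section-content runs ("_section_" in the prefix) write debug files
        # on the first iteration only; document runs write on every iteration.
        isSectionContent = "_section_" in debugPrefix
        return iteration == 1 or not isSectionContent

    assert shouldWritePerIterationDebug("chapter1_section_2", 1) is True
    assert shouldWritePerIterationDebug("chapter1_section_2", 2) is False
    assert shouldWritePerIterationDebug("document", 2) is True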
@@ -146,10 +150,13 @@ class AiCallLooper:
            self.services.chat.progressLogUpdate(iterationOperationId, 0.6, f"AI response received ({bytesDisplay})")

        # Write raw AI response to debug file
-        if iteration == 1:
-            self.services.utils.writeDebugFile(result, f"{debugPrefix}_response")
-        else:
-            self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}")
+        # For section content generation: only write one response file (first iteration)
+        # For document generation: write response for each iteration
+        if iteration == 1 or not isSectionContent:
+            if iteration == 1:
+                self.services.utils.writeDebugFile(result, f"{debugPrefix}_response")
+            elif not isSectionContent:
+                self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}")

         # Emit stats for this iteration (only if workflow exists and has id)
         if self.services.workflow and hasattr(self.services.workflow, 'id') and self.services.workflow.id:
@@ -219,9 +226,9 @@ class AiCallLooper:
                logger.info(f"Iteration {iteration}: Section content generation detected (elements found), returning JSON directly")
                if iterationOperationId:
                    self.services.chat.progressLogFinish(iterationOperationId, True)
-                # Write final result
+                # Note: Debug files (_prompt and _response) are already written above for iteration 1
+                # No need to write _final_result as it's redundant with _response
                final_json = json.dumps(parsedJsonForSection, indent=2, ensure_ascii=False) if parsedJsonForSection else (extractedJsonForSection or result)
-                self.services.utils.writeDebugFile(final_json, f"{debugPrefix}_final_result")
                return final_json

        # Extract sections from response (handles both valid and broken JSON)
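
The `final_json` expression kept above is a three-step fallback. A small sketch of the same chain, with a hypothetical helper name:

    import json

    def buildFinalJson(parsedJson, extractedJson, rawResult):
        # Prefer the parsed object re-serialized with pretty-printing; fall back
        # to the extracted JSON string; fall back to the raw AI response text.
        if parsedJson:
            return json.dumps(parsedJson, indent=2, ensure_ascii=False)
        return extractedJson or rawResult

    print(buildFinalJson({"elements": []}, None, "raw"))  # pretty-printed JSON
    print(buildFinalJson(None, None, "raw AI text"))      # raw AI text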
@@ -397,7 +404,10 @@ class AiCallLooper:
                self.services.chat.progressLogUpdate(operationId, estimatedProgress, f"Pipeline: {bytesDisplay} (iteration {iteration})")

        # Log merged sections for debugging
-        self.services.utils.writeDebugFile(merged_json_str, f"{debugPrefix}_merged_sections_iteration_{iteration}")
+        # For section content generation: skip merged sections debug files (only one prompt/response needed)
+        isSectionContent = "_section_" in debugPrefix
+        if not isSectionContent:
+            self.services.utils.writeDebugFile(merged_json_str, f"{debugPrefix}_merged_sections_iteration_{iteration}")

        # Check if we should continue (completion detection)
        # Simple logic: JSON completeness determines continuation
@@ -465,7 +475,10 @@ class AiCallLooper:
        final_result = self.responseParser.buildFinalResultFromSections(allSections, documentMetadata)

        # Write final result to debug file
-        self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result")
+        # For section content generation: skip final_result debug file (response already written)
+        isSectionContent = "_section_" in debugPrefix
+        if not isSectionContent:
+            self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result")

        return final_result

File 2 of 3: class StructureFiller

@@ -537,11 +537,6 @@ class StructureFiller:
            try:
                self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt")
-                self.services.utils.writeDebugFile(
-                    generationPrompt,
-                    f"{chapterId}_section_{sectionId}_prompt"
-                )
-                logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt (aggregation)")

                self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation")
@@ -553,6 +548,12 @@ class StructureFiller:
                        logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters")
                        generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0]

+                    # Write debug file for IMAGE_GENERATE (direct callAi, no _callAiWithLooping)
+                    self.services.utils.writeDebugFile(
+                        generationPrompt,
+                        f"{chapterId}_section_{sectionId}_prompt"
+                    )
                    request = AiCallRequest(
                        prompt=generationPrompt,
                        contentParts=[],
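
The truncation kept in context above uses `rsplit('\n', 1)[0]` so the shortened prompt always ends on a complete line; whatever partial line the cut lands in is discarded. A quick worked example:

    generationPrompt = "line one\nline two\nline three"
    maxPromptLength = 15
    # generationPrompt[:15] == "line one\nline t"; the cut lands inside
    # "line two", so everything after the last newline is dropped.
    truncated = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0]
    print(truncated)  # line one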
@@ -564,6 +565,12 @@ class StructureFiller:
                    )
                    aiResponse = await self.aiService.callAi(request)
                    generatedElements = []
+                    # Write debug file for IMAGE_GENERATE response (direct callAi, no _callAiWithLooping)
+                    self.services.utils.writeDebugFile(
+                        aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
+                        f"{chapterId}_section_{sectionId}_response"
+                    )
                else:
                    async def buildSectionPromptWithContinuation(
                        section: Dict[str, Any],
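
The guarded access `aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse)` in the added block keeps the debug write from raising if the service ever returns something without a `.content` attribute. A tiny illustration (the `AiResponse` stub here is hypothetical):

    class AiResponse:
        def __init__(self, content: str):
            self.content = content

    for aiResponse in (AiResponse("generated JSON"), "bare string fallback"):
        payload = aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse)
        print(payload)  # "generated JSON", then "bare string fallback"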
@@ -665,11 +672,7 @@ The JSON should be a fragment that can be merged with the previous response."""
                    generatedElements = []

                self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response")
-                self.services.utils.writeDebugFile(
-                    aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
-                    f"{chapterId}_section_{sectionId}_response"
-                )
-                logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response (aggregation)")
+                # Note: Debug files are written by _callAiWithLooping using debugPrefix

                self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content")
@@ -735,11 +738,6 @@ The JSON should be a fragment that can be merged with the previous response."""
            try:
                self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt")
-                self.services.utils.writeDebugFile(
-                    generationPrompt,
-                    f"{chapterId}_section_{sectionId}_prompt"
-                )
-                logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt")

                self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation")
@@ -751,6 +749,12 @@ The JSON should be a fragment that can be merged with the previous response."""
                        logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters")
                        generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0]

+                    # Write debug file for IMAGE_GENERATE (direct callAi, no _callAiWithLooping)
+                    self.services.utils.writeDebugFile(
+                        generationPrompt,
+                        f"{chapterId}_section_{sectionId}_prompt"
+                    )
                    request = AiCallRequest(
                        prompt=generationPrompt,
                        contentParts=[],
@@ -762,6 +766,12 @@ The JSON should be a fragment that can be merged with the previous response."""
                    )
                    aiResponse = await self.aiService.callAi(request)
                    generatedElements = []
+                    # Write debug file for IMAGE_GENERATE response (direct callAi, no _callAiWithLooping)
+                    self.services.utils.writeDebugFile(
+                        aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
+                        f"{chapterId}_section_{sectionId}_response"
+                    )
                else:
                    isAggregation = False
@@ -865,11 +875,7 @@ The JSON should be a fragment that can be merged with the previous response."""
                    generatedElements = []

                self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response")
-                self.services.utils.writeDebugFile(
-                    aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
-                    f"{chapterId}_section_{sectionId}_response"
-                )
-                logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response")
+                # Note: Debug files are written by _callAiWithLooping using debugPrefix

                self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content")
@@ -968,11 +974,6 @@ The JSON should be a fragment that can be merged with the previous response."""
            try:
                self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt")
-                self.services.utils.writeDebugFile(
-                    generationPrompt,
-                    f"{chapterId}_section_{sectionId}_prompt"
-                )
-                logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt")

                self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation")
@@ -984,6 +985,12 @@ The JSON should be a fragment that can be merged with the previous response."""
                        logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters")
                        generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0]

+                    # Write debug file for IMAGE_GENERATE (direct callAi, no _callAiWithLooping)
+                    self.services.utils.writeDebugFile(
+                        generationPrompt,
+                        f"{chapterId}_section_{sectionId}_prompt"
+                    )
                    request = AiCallRequest(
                        prompt=generationPrompt,
                        contentParts=[],
@@ -995,6 +1002,12 @@ The JSON should be a fragment that can be merged with the previous response."""
                    )
                    aiResponse = await self.aiService.callAi(request)
                    generatedElements = []
+                    # Write debug file for IMAGE_GENERATE response (direct callAi, no _callAiWithLooping)
+                    self.services.utils.writeDebugFile(
+                        aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
+                        f"{chapterId}_section_{sectionId}_response"
+                    )
                else:
                    isAggregation = False
@@ -1098,11 +1111,7 @@ The JSON should be a fragment that can be merged with the previous response."""
                    generatedElements = []

                self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response")
-                self.services.utils.writeDebugFile(
-                    aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
-                    f"{chapterId}_section_{sectionId}_response"
-                )
-                logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response")
+                # Note: Debug files are written by _callAiWithLooping using debugPrefix

                self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content")

File 3 of 3: class WebService

@@ -8,6 +8,8 @@ Manages the two-step process: WEB_SEARCH then WEB_CRAWL.
 import json
 import logging
 import time
+import asyncio
+from urllib.parse import urlparse
 from typing import Dict, Any, List, Optional

 from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, AiCallPromptWebSearch, AiCallPromptWebCrawl
@@ -99,12 +101,18 @@ class WebService:
            self.services.chat.progressLogUpdate(operationId, 0.5, f"Found {len(allUrls)} total URLs")

-        # Step 3: Filter to maxNumberPages (simple cut, no intelligent filtering)
-        if len(allUrls) > maxNumberPages:
-            allUrls = allUrls[:maxNumberPages]
+        # Step 3: Validate and filter URLs before crawling
+        validatedUrls = self._validateUrls(allUrls)
+        if not validatedUrls:
+            logger.warning(f"All {len(allUrls)} URLs failed validation")
+            return {"error": "No valid URLs found to crawl"}
+
+        # Filter to maxNumberPages (simple cut, no intelligent filtering)
+        if len(validatedUrls) > maxNumberPages:
+            validatedUrls = validatedUrls[:maxNumberPages]
            logger.info(f"Limited URLs to {maxNumberPages}")

-        if not allUrls:
+        if not validatedUrls:
            return {"error": "No URLs found to crawl"}

        # Step 4: Translate researchDepth to maxDepth
@@ -114,14 +122,14 @@ class WebService:
        # Step 5: Crawl all URLs with hierarchical logging
        if operationId:
            self.services.chat.progressLogUpdate(operationId, 0.4, "Initiating")
-            self.services.chat.progressLogUpdate(operationId, 0.6, f"Crawling {len(allUrls)} URLs")
+            self.services.chat.progressLogUpdate(operationId, 0.6, f"Crawling {len(validatedUrls)} URLs")

        # Use parent operation ID directly (parentId should be operationId, not log entry ID)
        parentOperationId = operationId  # Use the parent's operationId directly

        crawlResult = await self._performWebCrawl(
            instruction=instruction,
-            urls=allUrls,
+            urls=validatedUrls,
            maxDepth=maxDepth,
            parentOperationId=parentOperationId
        )
@@ -194,24 +202,24 @@ class WebService:
                "max_depth": maxDepth,
                "country": countryCode,
                "language": languageCode,
-                "urls_crawled": allUrls[:20],  # First 20 URLs for reference
-                "total_urls": len(allUrls),
+                "urls_crawled": validatedUrls[:20],  # First 20 URLs for reference
+                "total_urls": len(validatedUrls),
                "urls_with_content": urlsWithContent,
                "total_content_length": totalContentLength,
                "crawl_date": self.services.utils.timestampGetUtc() if hasattr(self.services, 'utils') else None
            },
            "sections": sections,
            "statistics": {
                "sectionCount": len(sections),
-                "total_urls": len(allUrls),
+                "total_urls": len(validatedUrls),
                "results_count": totalResults,
                "urls_with_content": urlsWithContent,
                "total_content_length": totalContentLength
            },
            # Keep original structure for backward compatibility
            "instruction": instruction,
-            "urls_crawled": allUrls,
-            "total_urls": len(allUrls),
+            "urls_crawled": validatedUrls,
+            "total_urls": len(validatedUrls),
            "results": crawlResult,
            "total_results": totalResults
        }
@@ -383,6 +391,50 @@ Return ONLY valid JSON, no additional text:
            logger.error(f"Error in web search: {str(e)}")
            return []

+    def _validateUrls(self, urls: List[str]) -> List[str]:
+        """
+        Validate URLs before crawling - filters out invalid URLs.
+
+        Args:
+            urls: List of URLs to validate
+
+        Returns:
+            List of valid URLs
+        """
+        validatedUrls = []
+        for url in urls:
+            if not url or not isinstance(url, str):
+                logger.debug(f"Skipping invalid URL (not a string): {url}")
+                continue
+
+            url = url.strip()
+            if not url:
+                logger.debug(f"Skipping empty URL")
+                continue
+
+            # Basic URL validation using urlparse
+            try:
+                parsed = urlparse(url)
+                # Check if URL has at least scheme and netloc
+                if not parsed.scheme or not parsed.netloc:
+                    logger.debug(f"Skipping invalid URL (missing scheme or netloc): {url}")
+                    continue
+                # Only allow http/https schemes
+                if parsed.scheme not in ['http', 'https']:
+                    logger.debug(f"Skipping URL with unsupported scheme '{parsed.scheme}': {url}")
+                    continue
+                validatedUrls.append(url)
+                logger.debug(f"Validated URL: {url}")
+            except Exception as e:
+                logger.warning(f"Error validating URL '{url}': {str(e)}")
+                continue
+
+        logger.info(f"Validated {len(validatedUrls)}/{len(urls)} URLs")
+        return validatedUrls
+
    async def _performWebCrawl(
        self,
        instruction: str,
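
What `_validateUrls` accepts and rejects, as a standalone sketch using only the standard library (same scheme/netloc checks as above, minus the logging):

    from urllib.parse import urlparse

    candidates = ["https://example.com/a", "  http://example.org  ",
                  "ftp://example.net", "example.com/no-scheme", "", None]
    valid = []
    for url in candidates:
        if not isinstance(url, str) or not url.strip():
            continue  # non-strings and empty values are dropped
        parsed = urlparse(url.strip())
        if parsed.scheme in ("http", "https") and parsed.netloc:
            valid.append(url.strip())
    print(valid)  # ['https://example.com/a', 'http://example.org']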
@@ -390,117 +442,165 @@ Return ONLY valid JSON, no additional text:
        maxDepth: int = 2,
        parentOperationId: Optional[str] = None
    ) -> List[Dict[str, Any]]:
-        """Perform web crawl on list of URLs - calls plugin for each URL individually."""
-        crawlResults = []
-        # Loop over each URL and crawl one at a time
+        """Perform web crawl on list of URLs - crawls URLs in parallel for better performance."""
+        # Create tasks for parallel crawling
+        crawlTasks = []
        for urlIndex, url in enumerate(urls):
-            # Create separate operation for each URL with parent reference
-            urlOperationId = None
-            if parentOperationId:
-                workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
-                urlOperationId = f"web_crawl_url_{workflowId}_{urlIndex}_{int(time.time())}"
-                self.services.chat.progressLogStart(
-                    urlOperationId,
-                    "Web Crawl",
-                    f"URL {urlIndex + 1}",
-                    url[:50] + "..." if len(url) > 50 else url,
-                    parentOperationId=parentOperationId
-                )
-
-            try:
-                logger.info(f"Crawling URL {urlIndex + 1}/{len(urls)}: {url}")
-                if urlOperationId:
-                    displayUrl = url[:50] + "..." if len(url) > 50 else url
-                    self.services.chat.progressLogUpdate(urlOperationId, 0.2, f"Crawling: {displayUrl}")
-                    self.services.chat.progressLogUpdate(urlOperationId, 0.3, "Initiating crawl")
-
-                # Build crawl prompt model for single URL
-                crawlPromptModel = AiCallPromptWebCrawl(
-                    instruction=instruction,
-                    url=url,  # Single URL
-                    maxDepth=maxDepth,
-                    maxWidth=5  # Default: 5 pages per level
-                )
-                crawlPrompt = crawlPromptModel.model_dump_json(exclude_none=True, indent=2)
-
-                # Debug: persist crawl prompt (with URL identifier in content for clarity)
-                debugPrompt = f"URL: {url}\n\n{crawlPrompt}"
-                self.services.utils.writeDebugFile(debugPrompt, "webcrawl_prompt")
-
-                # Call AI with WEB_CRAWL operation
-                crawlOptions = AiCallOptions(
-                    operationType=OperationTypeEnum.WEB_CRAWL,
-                    resultFormat="json"
-                )
-
-                if urlOperationId:
-                    self.services.chat.progressLogUpdate(urlOperationId, 0.4, "Calling crawl connector")
-
-                # Use unified callAiContent method with parentOperationId for hierarchical logging
-                crawlResponse = await self.services.ai.callAiContent(
-                    prompt=crawlPrompt,
-                    options=crawlOptions,
-                    outputFormat="json",
-                    parentOperationId=urlOperationId  # Pass URL operation ID as parent for sub-URL logging
-                )
-
-                if urlOperationId:
-                    self.services.chat.progressLogUpdate(urlOperationId, 0.7, "Processing crawl results")
-
-                # Extract content from AiResponse
-                crawlResult = crawlResponse.content
-
-                # Debug: persist crawl response
-                if isinstance(crawlResult, str):
-                    self.services.utils.writeDebugFile(crawlResult, "webcrawl_response")
-                else:
-                    self.services.utils.writeDebugFile(json.dumps(crawlResult, indent=2), "webcrawl_response")
-
-                # Parse crawl result
-                if isinstance(crawlResult, str):
-                    try:
-                        # Extract JSON from response (handles markdown code blocks)
-                        extractedJson = self.services.utils.jsonExtractString(crawlResult)
-                        crawlData = json.loads(extractedJson) if extractedJson else json.loads(crawlResult)
-                    except:
-                        crawlData = {"url": url, "content": crawlResult}
-                else:
-                    crawlData = crawlResult
-
-                # Process crawl results and create hierarchical progress logging for sub-URLs
-                if urlOperationId:
-                    self.services.chat.progressLogUpdate(urlOperationId, 0.8, "Processing crawl results")
-
-                # Recursively process crawl results to find nested URLs and create child operations
-                processedResults = self._processCrawlResultsWithHierarchy(crawlData, url, urlOperationId, maxDepth, 0)
-
-                # Count total URLs crawled (including sub-URLs) for progress message
-                totalUrlsCrawled = self._countUrlsInResults(processedResults)
-
-                # Ensure it's a list of results
-                if isinstance(processedResults, list):
-                    crawlResults.extend(processedResults)
-                elif isinstance(processedResults, dict):
-                    crawlResults.append(processedResults)
-                else:
-                    crawlResults.append({"url": url, "content": str(processedResults)})
-
-                if urlOperationId:
-                    if totalUrlsCrawled > 1:
-                        self.services.chat.progressLogUpdate(urlOperationId, 0.9, f"Crawled {totalUrlsCrawled} URLs (including sub-URLs)")
-                    else:
-                        self.services.chat.progressLogUpdate(urlOperationId, 0.9, "Crawl completed")
-                    self.services.chat.progressLogFinish(urlOperationId, True)
-
-            except Exception as e:
-                logger.error(f"Error crawling URL {url}: {str(e)}")
-                if urlOperationId:
-                    self.services.chat.progressLogFinish(urlOperationId, False)
-                crawlResults.append({"url": url, "error": str(e)})
-
-        return crawlResults
+            task = self._crawlSingleUrl(
+                url=url,
+                urlIndex=urlIndex,
+                totalUrls=len(urls),
+                instruction=instruction,
+                maxDepth=maxDepth,
+                parentOperationId=parentOperationId
+            )
+            crawlTasks.append(task)
+
+        # Execute all crawl tasks in parallel
+        logger.info(f"Starting parallel crawl of {len(urls)} URLs")
+        crawlResults = await asyncio.gather(*crawlTasks, return_exceptions=True)
+
+        # Process results and handle exceptions
+        processedResults = []
+        for idx, result in enumerate(crawlResults):
+            if isinstance(result, Exception):
+                logger.error(f"Error crawling URL {urls[idx]}: {str(result)}")
+                processedResults.append({"url": urls[idx], "error": str(result)})
+            else:
+                processedResults.extend(result if isinstance(result, list) else [result])
+
+        logger.info(f"Completed parallel crawl: {len(processedResults)} results")
+        return processedResults
+
+    async def _crawlSingleUrl(
+        self,
+        url: str,
+        urlIndex: int,
+        totalUrls: int,
+        instruction: str,
+        maxDepth: int,
+        parentOperationId: Optional[str] = None
+    ) -> List[Dict[str, Any]]:
+        """
+        Crawl a single URL - called in parallel for multiple URLs.
+
+        Args:
+            url: URL to crawl
+            urlIndex: Index of URL in the list
+            totalUrls: Total number of URLs being crawled
+            instruction: Research instruction
+            maxDepth: Maximum crawl depth
+            parentOperationId: Parent operation ID for progress tracking
+
+        Returns:
+            List of crawl results for this URL
+        """
+        # Create separate operation for each URL with parent reference
+        urlOperationId = None
+        if parentOperationId:
+            workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
+            urlOperationId = f"web_crawl_url_{workflowId}_{urlIndex}_{int(time.time())}"
+            self.services.chat.progressLogStart(
+                urlOperationId,
+                "Web Crawl",
+                f"URL {urlIndex + 1}/{totalUrls}",
+                url[:50] + "..." if len(url) > 50 else url,
+                parentOperationId=parentOperationId
+            )
+
+        try:
+            logger.info(f"Crawling URL {urlIndex + 1}/{totalUrls}: {url}")
+            if urlOperationId:
+                displayUrl = url[:50] + "..." if len(url) > 50 else url
+                self.services.chat.progressLogUpdate(urlOperationId, 0.2, f"Crawling: {displayUrl}")
+                self.services.chat.progressLogUpdate(urlOperationId, 0.3, "Initiating crawl")
+
+            # Build crawl prompt model for single URL
+            crawlPromptModel = AiCallPromptWebCrawl(
+                instruction=instruction,
+                url=url,  # Single URL
+                maxDepth=maxDepth,
+                maxWidth=5  # Default: 5 pages per level
+            )
+            crawlPrompt = crawlPromptModel.model_dump_json(exclude_none=True, indent=2)
+
+            # Debug: persist crawl prompt (with URL identifier in content for clarity)
+            debugPrompt = f"URL: {url}\n\n{crawlPrompt}"
+            self.services.utils.writeDebugFile(debugPrompt, "webcrawl_prompt")
+
+            # Call AI with WEB_CRAWL operation
+            crawlOptions = AiCallOptions(
+                operationType=OperationTypeEnum.WEB_CRAWL,
+                resultFormat="json"
+            )
+
+            if urlOperationId:
+                self.services.chat.progressLogUpdate(urlOperationId, 0.4, "Calling crawl connector")
+
+            # Use unified callAiContent method with parentOperationId for hierarchical logging
+            crawlResponse = await self.services.ai.callAiContent(
+                prompt=crawlPrompt,
+                options=crawlOptions,
+                outputFormat="json",
+                parentOperationId=urlOperationId  # Pass URL operation ID as parent for sub-URL logging
+            )
+
+            if urlOperationId:
+                self.services.chat.progressLogUpdate(urlOperationId, 0.7, "Processing crawl results")
+
+            # Extract content from AiResponse
+            crawlResult = crawlResponse.content
+
+            # Debug: persist crawl response
+            if isinstance(crawlResult, str):
+                self.services.utils.writeDebugFile(crawlResult, "webcrawl_response")
+            else:
+                self.services.utils.writeDebugFile(json.dumps(crawlResult, indent=2), "webcrawl_response")
+
+            # Parse crawl result
+            if isinstance(crawlResult, str):
+                try:
+                    # Extract JSON from response (handles markdown code blocks)
+                    extractedJson = self.services.utils.jsonExtractString(crawlResult)
+                    crawlData = json.loads(extractedJson) if extractedJson else json.loads(crawlResult)
+                except:
+                    crawlData = {"url": url, "content": crawlResult}
+            else:
+                crawlData = crawlResult
+
+            # Process crawl results and create hierarchical progress logging for sub-URLs
+            if urlOperationId:
+                self.services.chat.progressLogUpdate(urlOperationId, 0.8, "Processing crawl results")
+
+            # Recursively process crawl results to find nested URLs and create child operations
+            processedResults = self._processCrawlResultsWithHierarchy(crawlData, url, urlOperationId, maxDepth, 0)
+
+            # Count total URLs crawled (including sub-URLs) for progress message
+            totalUrlsCrawled = self._countUrlsInResults(processedResults)
+
+            # Ensure it's a list of results
+            if isinstance(processedResults, list):
+                results = processedResults
+            elif isinstance(processedResults, dict):
+                results = [processedResults]
+            else:
+                results = [{"url": url, "content": str(processedResults)}]
+
+            if urlOperationId:
+                if totalUrlsCrawled > 1:
+                    self.services.chat.progressLogUpdate(urlOperationId, 0.9, f"Crawled {totalUrlsCrawled} URLs (including sub-URLs)")
+                else:
+                    self.services.chat.progressLogUpdate(urlOperationId, 0.9, "Crawl completed")
+                self.services.chat.progressLogFinish(urlOperationId, True)
+
+            return results
+
+        except Exception as e:
+            logger.error(f"Error crawling URL {url}: {str(e)}")
+            if urlOperationId:
+                self.services.chat.progressLogFinish(urlOperationId, False)
+            return [{"url": url, "error": str(e)}]
+
    def _processCrawlResultsWithHierarchy(
        self,
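
The heart of the change is fan-out/fan-in with `asyncio.gather(..., return_exceptions=True)`: every URL gets its own task, a failed URL comes back as an exception object instead of cancelling its siblings, and total wall time tracks the slowest crawl rather than the sum. A self-contained sketch of the same pattern (stand-in coroutine, not the real crawler):

    import asyncio

    async def crawlSingle(url: str) -> list:
        # Stand-in for _crawlSingleUrl: simulated latency, one failing URL.
        if "bad" in url:
            raise RuntimeError("connection refused")
        await asyncio.sleep(0.1)
        return [{"url": url, "content": "..."}]

    async def crawlAll(urls: list) -> list:
        tasks = [crawlSingle(u) for u in urls]
        # return_exceptions=True: failures are returned as exception objects
        # in the results list rather than propagating and cancelling siblings.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        merged = []
        for url, result in zip(urls, results):
            if isinstance(result, Exception):
                merged.append({"url": url, "error": str(result)})
            else:
                merged.extend(result)
        return merged

    print(asyncio.run(crawlAll(["https://a.example", "https://bad.example"])))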