# Copyright (c) 2025 Patrick Motsch
# All rights reserved.

"""
Web crawl service for handling web research operations.

Manages the two-step process: WEB_SEARCH then WEB_CRAWL.
"""

import json
import logging
import time
from typing import Dict, Any, List, Optional

from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, AiCallPromptWebSearch, AiCallPromptWebCrawl

logger = logging.getLogger(__name__)


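# Minimal usage sketch (illustrative only; assumes the surrounding service
# container exposes the `chat`, `ai`, `utils`, and `workflow` services used below,
# and that the call is made from an async context):
#
#   web = WebService(services)
#   result = await web.performWebResearch(
#       prompt="What is the company Xyz doing?",
#       urls=[],
#       country="ch",
#       language="en",
#       researchDepth="general",
#       operationId="web_research_001",  # hypothetical operation ID
#   )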
class WebService:
    """Service for web search and crawling operations."""

    def __init__(self, services):
        """Initialize webcrawl service with service center access."""
        self.services = services

    async def performWebResearch(
        self,
        prompt: str,
        urls: List[str],
        country: Optional[str],
        language: Optional[str],
        researchDepth: str = "general",
        operationId: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Perform web research in the following steps:
        1. Use AI to analyze the prompt and extract parameters + URLs
        2. Call WEB_SEARCH to get URLs (if needed)
        3. Combine URLs and filter to maxNumberPages
        4. Call WEB_CRAWL for each URL
        5. Return the consolidated result

        Args:
            prompt: Natural language research prompt
            urls: Optional list of URLs provided by user
            country: Optional country code
            language: Optional language code
            researchDepth: Research depth ("fast", "general", or "deep")
            operationId: Operation ID for progress tracking

        Returns:
            Consolidated research results as dictionary
        """
        # Start progress tracking if operationId provided
        if operationId:
            self.services.chat.progressLogStart(
                operationId,
                "Web Research",
                "Research",
                f"Depth: {researchDepth}"
            )

        try:
            # Step 1: AI intention analysis - extract URLs and parameters from prompt
            if operationId:
                self.services.chat.progressLogUpdate(operationId, 0.1, "Analyzing research intent")

            analysisResult = await self._analyzeResearchIntent(prompt, urls, country, language, researchDepth)

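            # Illustrative analysisResult (mirrors the JSON template defined in
            # _analyzeResearchIntent; values below are examples only):
            #   {"instruction": "What is the company Xyz doing?", "urls": [],
            #    "needsSearch": True, "maxNumberPages": 10, "country": "ch",
            #    "language": "en", "researchDepth": "general",
            #    "filename": "WebResearch_Xyz"}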
            # Extract parameters from AI analysis
            instruction = analysisResult.get("instruction", prompt)
            extractedUrls = analysisResult.get("urls", [])
            needsSearch = analysisResult.get("needsSearch", True)  # Default to True
            maxNumberPages = analysisResult.get("maxNumberPages", 10)
            countryCode = analysisResult.get("country", country)
            languageCode = analysisResult.get("language", language)
            finalResearchDepth = analysisResult.get("researchDepth", researchDepth)
            suggestedFilename = analysisResult.get("filename", None)

            logger.info(f"AI Analysis: instruction='{instruction[:100]}...', urls={len(extractedUrls)}, needsSearch={needsSearch}, maxNumberPages={maxNumberPages}, researchDepth={finalResearchDepth}, filename={suggestedFilename}")

            # Combine URLs (from user + from prompt extraction)
            allUrls = []
            if urls:
                allUrls.extend(urls)
            if extractedUrls:
                allUrls.extend(extractedUrls)

            # Step 2: Search for URLs if needed (based on needsSearch flag)
            if needsSearch and (not allUrls or len(allUrls) < maxNumberPages):
                if operationId:
                    self.services.chat.progressLogUpdate(operationId, 0.3, "Searching for URLs")

                searchUrls = await self._performWebSearch(
                    instruction=instruction,
                    maxNumberPages=maxNumberPages - len(allUrls),
                    country=countryCode,
                    language=languageCode
                )

                # Add search URLs to the list
                allUrls.extend(searchUrls)

                if operationId:
                    self.services.chat.progressLogUpdate(operationId, 0.5, f"Found {len(allUrls)} total URLs")

            # Step 3: Filter to maxNumberPages (simple cut, no intelligent filtering)
            if len(allUrls) > maxNumberPages:
                allUrls = allUrls[:maxNumberPages]
                logger.info(f"Limited URLs to {maxNumberPages}")

            if not allUrls:
                return {"error": "No URLs found to crawl"}

            # Step 4: Translate researchDepth to maxDepth
            depthMap = {"fast": 1, "general": 2, "deep": 3}
            maxDepth = depthMap.get(finalResearchDepth.lower(), 2)

            # Step 5: Crawl all URLs with hierarchical logging
            if operationId:
                self.services.chat.progressLogUpdate(operationId, 0.6, f"Crawling {len(allUrls)} URLs")

            # Use the parent's operationId directly (parentId should be the operationId, not a log entry ID)
            parentOperationId = operationId

            crawlResult = await self._performWebCrawl(
                instruction=instruction,
                urls=allUrls,
                maxDepth=maxDepth,
                parentOperationId=parentOperationId
            )

            if operationId:
                self.services.chat.progressLogUpdate(operationId, 0.9, "Consolidating results")
                self.services.chat.progressLogUpdate(operationId, 0.95, "Completed")
                self.services.chat.progressLogFinish(operationId, True)

            # Calculate statistics about crawl results
            totalResults = len(crawlResult) if isinstance(crawlResult, list) else 1
            totalContentLength = 0
            urlsWithContent = 0

            # Analyze crawl results to gather statistics
            if isinstance(crawlResult, list):
                for item in crawlResult:
                    if isinstance(item, dict):
                        if item.get("url"):
                            urlsWithContent += 1
                        content = item.get("content", "")
                        if isinstance(content, str):
                            totalContentLength += len(content)
                        elif isinstance(content, dict):
                            # Estimate size from dict
                            totalContentLength += len(str(content))
            elif isinstance(crawlResult, dict):
                if crawlResult.get("url"):
                    urlsWithContent = 1
                content = crawlResult.get("content", "")
                if isinstance(content, str):
                    totalContentLength = len(content)
                elif isinstance(content, dict):
                    totalContentLength = len(str(content))

            # Convert crawl results into sections format for generic validator
            sections = []
            if isinstance(crawlResult, list):
                for idx, item in enumerate(crawlResult):
                    if isinstance(item, dict):
                        section = {
                            "id": f"result_{idx}",
                            "content_type": "paragraph",
                            "title": item.get("url", f"Result {idx + 1}"),
                            "order": idx
                        }
                        # Add content preview
                        content = item.get("content", "")
                        if isinstance(content, str) and content:
                            section["textPreview"] = content[:200] + ("..." if len(content) > 200 else "")
                        sections.append(section)
            elif isinstance(crawlResult, dict):
                section = {
                    "id": "result_0",
                    "content_type": "paragraph",
                    "title": crawlResult.get("url", "Research Result"),
                    "order": 0
                }
                content = crawlResult.get("content", "")
                if isinstance(content, str) and content:
                    section["textPreview"] = content[:200] + ("..." if len(content) > 200 else "")
                sections.append(section)

            # Return consolidated result with metadata in format that generic validator understands
            result = {
                "metadata": {
                    "title": suggestedFilename or (instruction[:100] if instruction else "Web Research Results"),
                    "extraction_method": "web_crawl",
                    "research_depth": finalResearchDepth,
                    "max_depth": maxDepth,
                    "country": countryCode,
                    "language": languageCode,
                    "urls_crawled": allUrls[:20],  # First 20 URLs for reference
                    "total_urls": len(allUrls),
                    "urls_with_content": urlsWithContent,
                    "total_content_length": totalContentLength,
                    "crawl_date": self.services.utils.timestampGetUtc() if hasattr(self.services, 'utils') else None
                },
                "sections": sections,
                "statistics": {
                    "sectionCount": len(sections),
                    "total_urls": len(allUrls),
                    "results_count": totalResults,
                    "urls_with_content": urlsWithContent,
                    "total_content_length": totalContentLength
                },
                # Keep original structure for backward compatibility
                "instruction": instruction,
                "urls_crawled": allUrls,
                "total_urls": len(allUrls),
                "results": crawlResult,
                "total_results": totalResults
            }

            # Add suggested filename if available
            if suggestedFilename:
                result["suggested_filename"] = suggestedFilename
                result["metadata"]["suggested_filename"] = suggestedFilename

            return result

        except Exception as e:
            logger.error(f"Error in web research: {str(e)}")
            if operationId:
                self.services.chat.progressLogFinish(operationId, False)
            raise

    async def _analyzeResearchIntent(
        self,
        prompt: str,
        urls: List[str],
        country: Optional[str],
        language: Optional[str],
        researchDepth: str = "general"
    ) -> Dict[str, Any]:
        """
        Use AI to analyze the prompt and extract:
        - URLs from the prompt text
        - Research instruction
        - maxNumberPages, country, language, researchDepth and filename from context
        """
        # Build analysis prompt for AI
        analysisPrompt = f"""Analyze this web research request and extract structured information.

RESEARCH REQUEST:
{prompt}

USER PROVIDED:
- URLs: {json.dumps(urls) if urls else "None"}
- Country: {country or "Not specified"}
- Language: {language or "Not specified"}

Extract and provide a JSON response with:
1. instruction: Formulate directly WHAT you want to find on the web. Do not include URLs in the instruction. Good example: "What is the company Xyz doing?". Bad example: "Conduct web research on the company Xyz"
2. urls: List of URLs found in the prompt text, plus URLs you know that are relevant to the research
3. needsSearch: true if a web search is needed to identify URLs to crawl, false if only crawling of the provided URLs is wanted
4. maxNumberPages: Recommended number of URLs to crawl (based on research scope, typical: 2-20)
5. country: Country code if identified in the prompt (2-letter lowercase, e.g., ch, us, de)
6. language: Language identified from the prompt (lowercase, e.g., de, en, fr)
7. researchDepth: Research depth based on instruction complexity - "fast" (quick overview, maxDepth=1), "general" (standard research, maxDepth=2), or "deep" (comprehensive research, maxDepth=3)
8. filename: Generate a concise, descriptive filename (without extension) for the research results. It should be short (max 50 characters), describe the research topic, use underscores instead of spaces, and contain only alphanumeric characters and underscores. Example: "WebResearch_Topic_Context"

Return ONLY valid JSON, no additional text:
{{
    "instruction": "research instruction",
    "urls": ["url1", "url2"],
    "needsSearch": true,
    "maxNumberPages": 10,
    "country": "ch",
    "language": "en",
    "researchDepth": "general",
    "filename": "descriptive_filename_without_extension"
}}"""

        try:
            # Call AI planning to analyze intent
            analysisJson = await self.services.ai.callAiPlanning(
                analysisPrompt,
                debugType="webresearchintent"
            )

            # Extract JSON from response (handles markdown code blocks)
            extractedJson = self.services.utils.jsonExtractString(analysisJson)
            if not extractedJson:
                raise ValueError("No JSON found in AI response")

            # Parse JSON response
            result = json.loads(extractedJson)

            logger.info(f"Intent analysis result: {result}")
            return result

        except Exception as e:
            logger.warning(f"Error in AI intent analysis: {str(e)}")
            # Fallback to basic extraction
            return {
                "instruction": prompt,
                "urls": [],
                "needsSearch": True,
                "maxNumberPages": 10,
                "country": country,
                "language": language,
                "researchDepth": researchDepth,
                "filename": None
            }

    async def _performWebSearch(
        self,
        instruction: str,
        maxNumberPages: int,
        country: Optional[str],
        language: Optional[str]
    ) -> List[str]:
        """Perform web search to find URLs."""
        try:
            # Build search prompt model
            searchPromptModel = AiCallPromptWebSearch(
                instruction=instruction,
                country=country,
                maxNumberPages=maxNumberPages,
                language=language
            )
            searchPrompt = searchPromptModel.model_dump_json(exclude_none=True, indent=2)

            # Debug: persist search prompt
            self.services.utils.writeDebugFile(searchPrompt, "websearch_prompt")

            # Call AI with WEB_SEARCH operation
            searchOptions = AiCallOptions(
                operationType=OperationTypeEnum.WEB_SEARCH,
                resultFormat="json"
            )

            # Use unified callAiContent method
            searchResponse = await self.services.ai.callAiContent(
                prompt=searchPrompt,
                options=searchOptions,
                outputFormat="json"
            )

            # Extract content from AiResponse
            searchResult = searchResponse.content

            # Debug: persist search response
            if isinstance(searchResult, str):
                self.services.utils.writeDebugFile(searchResult, "websearch_response")
            else:
                self.services.utils.writeDebugFile(json.dumps(searchResult, indent=2), "websearch_response")

            # Parse and extract URLs
            if isinstance(searchResult, str):
                # Extract JSON from response (handles markdown code blocks)
                extractedJson = self.services.utils.jsonExtractString(searchResult)
                searchData = json.loads(extractedJson) if extractedJson else json.loads(searchResult)
            else:
                searchData = searchResult

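            # The search connector may return different shapes; both a dict and a
            # list are handled below. Illustrative examples (assumed formats):
            #   {"urls": ["https://example.org", "https://example.com"]}
            #   {"results": [{"url": "https://example.org", "title": "..."}]}
            #   ["https://example.org", {"url": "https://example.com"}]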
            # Extract URLs from response
            urls = []
            if isinstance(searchData, dict):
                if "urls" in searchData:
                    urls = searchData["urls"]
                elif "results" in searchData:
                    urls = [r.get("url") for r in searchData["results"] if r.get("url")]
            elif isinstance(searchData, list):
                # Handle both cases: list of URL strings or list of dicts with "url" key
                for item in searchData:
                    if isinstance(item, str):
                        # Item is already a URL string
                        urls.append(item)
                    elif isinstance(item, dict) and item.get("url"):
                        # Item is a dict with "url" key
                        urls.append(item.get("url"))

            logger.info(f"Web search returned {len(urls)} URLs")
            return urls

        except Exception as e:
            logger.error(f"Error in web search: {str(e)}")
            return []

    async def _performWebCrawl(
        self,
        instruction: str,
        urls: List[str],
        maxDepth: int = 2,
        parentOperationId: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Perform web crawl on a list of URLs - calls the plugin for each URL individually."""
        crawlResults = []

        # Loop over each URL and crawl one at a time
        for urlIndex, url in enumerate(urls):
            # Create a separate operation for each URL with a parent reference
            urlOperationId = None
            if parentOperationId:
                workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
                urlOperationId = f"web_crawl_url_{workflowId}_{urlIndex}_{int(time.time())}"
                self.services.chat.progressLogStart(
                    urlOperationId,
                    "Web Crawl",
                    f"URL {urlIndex + 1}",
                    url[:50] + "..." if len(url) > 50 else url,
                    parentOperationId=parentOperationId
                )

            try:
                logger.info(f"Crawling URL {urlIndex + 1}/{len(urls)}: {url}")

                if urlOperationId:
                    displayUrl = url[:50] + "..." if len(url) > 50 else url
                    self.services.chat.progressLogUpdate(urlOperationId, 0.2, f"Crawling: {displayUrl}")
                    self.services.chat.progressLogUpdate(urlOperationId, 0.3, "Initiating crawl")

                # Build crawl prompt model for single URL
                crawlPromptModel = AiCallPromptWebCrawl(
                    instruction=instruction,
                    url=url,  # Single URL
                    maxDepth=maxDepth,
                    maxWidth=5  # Default: 5 pages per level
                )
                crawlPrompt = crawlPromptModel.model_dump_json(exclude_none=True, indent=2)

                # Debug: persist crawl prompt (with URL identifier in content for clarity)
                debugPrompt = f"URL: {url}\n\n{crawlPrompt}"
                self.services.utils.writeDebugFile(debugPrompt, "webcrawl_prompt")

                # Call AI with WEB_CRAWL operation
                crawlOptions = AiCallOptions(
                    operationType=OperationTypeEnum.WEB_CRAWL,
                    resultFormat="json"
                )

                if urlOperationId:
                    self.services.chat.progressLogUpdate(urlOperationId, 0.4, "Calling crawl connector")

                # Use unified callAiContent method with parentOperationId for hierarchical logging
                crawlResponse = await self.services.ai.callAiContent(
                    prompt=crawlPrompt,
                    options=crawlOptions,
                    outputFormat="json",
                    parentOperationId=urlOperationId  # Pass URL operation ID as parent for sub-URL logging
                )

                if urlOperationId:
                    self.services.chat.progressLogUpdate(urlOperationId, 0.7, "Processing crawl results")

                # Extract content from AiResponse
                crawlResult = crawlResponse.content

                # Debug: persist crawl response
                if isinstance(crawlResult, str):
                    self.services.utils.writeDebugFile(crawlResult, "webcrawl_response")
                else:
                    self.services.utils.writeDebugFile(json.dumps(crawlResult, indent=2), "webcrawl_response")

                # Parse crawl result
                if isinstance(crawlResult, str):
                    try:
                        # Extract JSON from response (handles markdown code blocks)
                        extractedJson = self.services.utils.jsonExtractString(crawlResult)
                        crawlData = json.loads(extractedJson) if extractedJson else json.loads(crawlResult)
                    except Exception:
                        crawlData = {"url": url, "content": crawlResult}
                else:
                    crawlData = crawlResult

                # Process crawl results and create hierarchical progress logging for sub-URLs
                if urlOperationId:
                    self.services.chat.progressLogUpdate(urlOperationId, 0.8, "Processing crawl results")

                # Recursively process crawl results to find nested URLs and create child operations
                processedResults = self._processCrawlResultsWithHierarchy(crawlData, url, urlOperationId, maxDepth, 0)

                # Count total URLs crawled (including sub-URLs) for progress message
                totalUrlsCrawled = self._countUrlsInResults(processedResults)

                # Ensure it's a list of results
                if isinstance(processedResults, list):
                    crawlResults.extend(processedResults)
                elif isinstance(processedResults, dict):
                    crawlResults.append(processedResults)
                else:
                    crawlResults.append({"url": url, "content": str(processedResults)})

                if urlOperationId:
                    if totalUrlsCrawled > 1:
                        self.services.chat.progressLogUpdate(urlOperationId, 0.9, f"Crawled {totalUrlsCrawled} URLs (including sub-URLs)")
                    else:
                        self.services.chat.progressLogUpdate(urlOperationId, 0.9, "Crawl completed")
                    self.services.chat.progressLogFinish(urlOperationId, True)

            except Exception as e:
                logger.error(f"Error crawling URL {url}: {str(e)}")
                if urlOperationId:
                    self.services.chat.progressLogFinish(urlOperationId, False)
                crawlResults.append({"url": url, "error": str(e)})

        return crawlResults

    def _processCrawlResultsWithHierarchy(
        self,
        crawlData: Any,
        parentUrl: str,
        parentOperationId: Optional[str],
        maxDepth: int,
        currentDepth: int
    ) -> List[Dict[str, Any]]:
        """
        Recursively process crawl results to create hierarchical progress logging for sub-URLs.

        Args:
            crawlData: Crawl result data (dict, list, or other)
            parentUrl: Parent URL being crawled
            parentOperationId: Parent operation ID for hierarchical logging
            maxDepth: Maximum crawl depth
            currentDepth: Current depth in the crawl tree

        Returns:
            List of processed crawl results
        """
        results = []

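        # Illustrative crawlData shapes handled below (examples only; the actual
        # connector output format is assumed):
        #   [{"url": "https://a", "results": [{"url": "https://a/sub"}]}]
        #   {"url": "https://a", "children": [...]}
        #   "plain text content"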
        # Handle list of results
        if isinstance(crawlData, list):
            for idx, item in enumerate(crawlData):
                if isinstance(item, dict):
                    # Check if this item has sub-URLs or nested results
                    itemUrl = item.get("url") or item.get("source") or parentUrl

                    # Create child operation for sub-URL if we're not at max depth
                    if currentDepth < maxDepth and parentOperationId:
                        # Check if this item has nested results or children
                        hasNestedResults = "results" in item or "children" in item or "subUrls" in item

                        if hasNestedResults or (itemUrl != parentUrl and currentDepth > 0):
                            # This is a sub-URL - create child operation
                            subUrlOperationId = f"{parentOperationId}_sub_{idx}_{int(time.time())}"
                            self.services.chat.progressLogStart(
                                subUrlOperationId,
                                "Crawling Sub-URL",
                                f"Depth {currentDepth + 1}",
                                itemUrl[:50] + "..." if len(itemUrl) > 50 else itemUrl,
                                parentOperationId=parentOperationId
                            )

                            try:
                                # Process nested results recursively
                                if "results" in item:
                                    nestedResults = self._processCrawlResultsWithHierarchy(
                                        item["results"], itemUrl, subUrlOperationId, maxDepth, currentDepth + 1
                                    )
                                    item["results"] = nestedResults
                                elif "children" in item:
                                    nestedResults = self._processCrawlResultsWithHierarchy(
                                        item["children"], itemUrl, subUrlOperationId, maxDepth, currentDepth + 1
                                    )
                                    item["children"] = nestedResults
                                elif "subUrls" in item:
                                    nestedResults = self._processCrawlResultsWithHierarchy(
                                        item["subUrls"], itemUrl, subUrlOperationId, maxDepth, currentDepth + 1
                                    )
                                    item["subUrls"] = nestedResults

                                self.services.chat.progressLogUpdate(subUrlOperationId, 0.9, "Completed")
                                self.services.chat.progressLogFinish(subUrlOperationId, True)
                            except Exception as e:
                                logger.error(f"Error processing sub-URL {itemUrl}: {str(e)}")
                                if subUrlOperationId:
                                    self.services.chat.progressLogFinish(subUrlOperationId, False)

                    results.append(item)
                else:
                    results.append(item)

        # Handle dict with results array
        elif isinstance(crawlData, dict):
            if "results" in crawlData:
                # Process nested results
                nestedResults = self._processCrawlResultsWithHierarchy(
                    crawlData["results"], parentUrl, parentOperationId, maxDepth, currentDepth
                )
                crawlData["results"] = nestedResults
                results.append(crawlData)
            elif "children" in crawlData:
                # Process children
                nestedResults = self._processCrawlResultsWithHierarchy(
                    crawlData["children"], parentUrl, parentOperationId, maxDepth, currentDepth
                )
                crawlData["children"] = nestedResults
                results.append(crawlData)
            elif "subUrls" in crawlData:
                # Process sub-URLs
                nestedResults = self._processCrawlResultsWithHierarchy(
                    crawlData["subUrls"], parentUrl, parentOperationId, maxDepth, currentDepth
                )
                crawlData["subUrls"] = nestedResults
                results.append(crawlData)
            else:
                # Single result dict
                results.append(crawlData)
        else:
            # Other types - wrap in dict
            results.append({"url": parentUrl, "content": str(crawlData)})

        return results

    def _countUrlsInResults(self, results: Any) -> int:
        """
        Recursively count total URLs in crawl results (including nested sub-URLs).

        Args:
            results: Crawl results (dict, list, or other)

        Returns:
            Total count of URLs found
        """
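        # Illustrative example (hypothetical input):
        #   [{"url": "https://a", "results": [{"url": "https://a/sub"}]}, "https://b"]
        #   -> 3 (the top-level URL, its nested sub-URL, and the plain URL string)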
        count = 0

        if isinstance(results, list):
            for item in results:
                count += self._countUrlsInResults(item)
        elif isinstance(results, dict):
            # Count this URL if it has a url field
            if "url" in results or "source" in results:
                count += 1
            # Recursively count nested results
            if "results" in results:
                count += self._countUrlsInResults(results["results"])
            if "children" in results:
                count += self._countUrlsInResults(results["children"])
            if "subUrls" in results:
                count += self._countUrlsInResults(results["subUrls"])
        elif isinstance(results, str):
            # Single URL string
            count = 1

        return count