gateway/modules/workflows/methods/methodWeb.py

import logging
import csv
import io
import re
import json as _json
from typing import Any, Dict
from modules.workflows.methods.methodBase import MethodBase, action
from modules.datamodels.datamodelWorkflow import ActionResult, ActionDocument
from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority
from modules.datamodels.datamodelWeb import (
WebSearchRequest,
WebCrawlRequest,
WebScrapeRequest,
)
logger = logging.getLogger(__name__)
class MethodWeb(MethodBase):
"""Web method implementation for web operations."""
def __init__(self, services):
super().__init__(services)
self.name = "web"
self.description = "Web search, crawling, and scraping operations using Tavily"
@action
async def search(self, parameters: Dict[str, Any]) -> ActionResult:
"""Perform a web search and output a CSV with the found URLs. Each result row contains columns "url" and "title".
Parameters:
query (str, required): Search query.
maxResults (int, optional): Max number of results. Default: 10.
searchDepth ("basic"|"advanced", optional): Search depth. Default: provider default.
timeRange ("d"|"w"|"m"|"y", optional): Limit to last day/week/month/year.
topic ("general"|"news"|"academic", optional): Result domain preference.
includeDomains (list[str], optional): Only include these domains.
excludeDomains (list[str], optional): Exclude these domains.
language (str, optional): ISO code like "de", "en" to bias results.
includeAnswer (bool, optional): Ask provider to generate a short answer.
includeRawContent (bool, optional): Include raw content where possible.
"""
try:
            # Prepare the request from the provided parameters
raw_query = parameters.get("query")
max_results = parameters.get("maxResults", 10)
if not raw_query or not isinstance(raw_query, str):
return ActionResult(success=False, error="Search query is required")
web_search_request = WebSearchRequest(
query=raw_query.strip(),
max_results=max_results,
search_depth=parameters.get("searchDepth"),
time_range=parameters.get("timeRange"),
topic=parameters.get("topic"),
include_domains=parameters.get("includeDomains"),
exclude_domains=parameters.get("excludeDomains"),
language=parameters.get("language"),
include_answer=parameters.get("includeAnswer"),
include_raw_content=parameters.get("includeRawContent"),
)
# Perform request via centralized service wrappers
web_search_result = await self.services.web.webSearch(web_search_request)
# Convert search results to CSV format (generic)
if web_search_result.success and web_search_result.documents:
csv_content = self._convert_web_result_to_csv(web_search_result)
csv_document = ActionDocument(
documentName=f"web_search_results.csv",
documentData=csv_content,
mimeType="text/csv"
)
return ActionResult(success=True, documents=[csv_document])
else:
return web_search_result
except Exception as e:
return ActionResult(success=False, error=str(e))
def _read_csv_with_urls(self, csv_content: str) -> list:
"""Read CSV content and extract URLs from url,title or title,url format (both ; and , delimiters)"""
urls = []
# Try both semicolon and comma delimiters
for delimiter in [';', ',']:
try:
reader = csv.DictReader(io.StringIO(csv_content), delimiter=delimiter)
for row in reader:
# Look for url column (case insensitive)
url = None
for key in row.keys():
if key.lower() == 'url':
url = row[key].strip()
break
if url and (url.startswith('http://') or url.startswith('https://')):
urls.append(url)
# If we found URLs with this delimiter, return them
if urls:
return urls
except Exception:
# Try next delimiter
continue
# If no valid CSV found, try simple text parsing as fallback
lines = csv_content.split('\n')
for line in lines:
line = line.strip()
if line and (line.startswith('http://') or line.startswith('https://')):
urls.append(line)
return urls
@action
async def crawl(self, parameters: Dict[str, Any]) -> ActionResult:
"""Crawl a list of URLs and extract text content.
Parameters:
documentList (list[str]|str, required): Reference(s) to documents containing URLs (e.g., CSV from search). Can be a single ref or list.
expectedDocumentFormats (list, optional): Hint for downstream handling.
extractDepth ("basic"|"advanced", optional): Extraction depth. Default: "advanced".
format ("text"|"markdown", optional): Output format. Default: "text".
"""
try:
document_list = parameters.get("documentList")
# Normalize to list if a single string reference is provided
if isinstance(document_list, str):
document_list = [document_list]
if not document_list:
return ActionResult(
success=False, error="No document list reference provided."
)
# Resolve document list reference to ChatDocument objects
chat_documents = self.services.workflow.getChatDocumentsFromDocumentList(document_list)
if not chat_documents:
return ActionResult(
success=False,
error=f"No documents found for reference: {document_list}",
)
# Extract URLs from all documents and combine them
all_urls = []
for i, doc in enumerate(chat_documents):
logger.info(f"Processing document {i+1}/{len(chat_documents)}: {doc.fileName}")
# Get file data using the service center
file_data = self.services.workflow.getFileData(doc.fileId)
if not file_data:
logger.warning(f"Could not retrieve file data for document: {doc.fileName}")
continue
content = file_data.decode("utf-8")
# Try to parse as CSV first (for new CSV format)
if doc.fileName.lower().endswith('.csv') or 'csv' in doc.mimeType.lower():
logger.info(f"Processing CSV file: {doc.fileName}")
doc_urls = self._read_csv_with_urls(content)
else:
# Parse JSON to extract URLs from search results
try:
# The document structure from WebSearchActionResult
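                        # Illustrative shape this parser expects (assumed, not guaranteed):
                        #   {"documentData": {"results": [{"url": "https://example.com", "title": "..."}]}}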
                        search_data = _json.loads(content)
# Extract URLs from the search results structure
doc_urls = []
if isinstance(search_data, dict):
# Handle the document structure: documentData contains the actual search results
doc_data = search_data.get("documentData", search_data)
if "results" in doc_data and isinstance(doc_data["results"], list):
doc_urls = [
result["url"]
for result in doc_data["results"]
if isinstance(result, dict) and "url" in result
]
elif "urls" in doc_data and isinstance(doc_data["urls"], list):
# Fallback: if URLs are stored directly in a 'urls' field
doc_urls = [url for url in doc_data["urls"] if isinstance(url, str)]
# Fallback: try to parse as plain text with regex (for backward compatibility)
if not doc_urls:
logger.warning(
f"Could not extract URLs from JSON structure in {doc.fileName}, trying plain text parsing"
)
doc_urls = re.split(r"[\n,;]+", content)
doc_urls = [
u.strip()
for u in doc_urls
if u.strip()
and (
u.strip().startswith("http://")
or u.strip().startswith("https://")
)
]
                    except _json.JSONDecodeError:
# Fallback to plain text parsing if JSON parsing fails
logger.warning(f"Document {doc.fileName} is not valid JSON, trying plain text parsing")
doc_urls = re.split(r"[\n,;]+", content)
doc_urls = [
u.strip()
for u in doc_urls
if u.strip()
and (
u.strip().startswith("http://")
or u.strip().startswith("https://")
)
]
if doc_urls:
all_urls.extend(doc_urls)
logger.info(f"Extracted {len(doc_urls)} URLs from {doc.fileName}")
else:
logger.warning(f"No valid URLs found in document: {doc.fileName}")
if not all_urls:
return ActionResult(
success=False, error="No valid URLs found in any of the documents."
)
# Remove duplicates while preserving order
unique_urls = list(dict.fromkeys(all_urls))
logger.info(f"Extracted {len(unique_urls)} unique URLs from {len(chat_documents)} documents")
# Prepare request data with normalization
allowed_extract_depth = {"basic", "advanced"}
allowed_formats = {"text", "markdown"}
extract_depth = parameters.get("extractDepth")
if extract_depth and extract_depth not in allowed_extract_depth:
logger.warning(f"Invalid extractDepth '{extract_depth}' provided. Falling back to 'advanced'.")
extract_depth = "advanced"
fmt = parameters.get("format")
if fmt and fmt not in allowed_formats:
logger.warning(f"Invalid format '{fmt}' provided. Falling back to 'text'.")
fmt = "text"
web_crawl_request = WebCrawlRequest(
urls=unique_urls,
extract_depth=extract_depth,
format=fmt,
)
# Perform request via centralized service wrappers
web_crawl_result = await self.services.web.webCrawl(web_crawl_request)
# Convert and enrich with concise summaries per URL for better context
if web_crawl_result.success:
try:
doc = web_crawl_result.documents[0]
results = getattr(doc.documentData, "results", [])
enriched = []
# Summarize each result briefly using AI for added context
for item in results:
url = str(getattr(item, "url", ""))
content = str(getattr(item, "content", ""))
summary = ""
try:
if content:
prompt = (
"Summarize the following webpage content in 3-5 concise bullet points. "
"Focus on key points, figures, named entities (companies/institutions), and location context. "
"Return only bullet points without any preface."
)
                                context = content[:4000]
                                # Centralized AI summary (balanced analyse_content);
                                # append the truncated page text so the model actually receives it
                                summary = await self.services.ai.callAi(
                                    prompt=f"{prompt}\n\n{context}",
documents=None,
options=AiCallOptions(
operationType=OperationType.ANALYSE_CONTENT,
priority=Priority.BALANCED,
compressPrompt=True,
compressContext=False,
processingMode="advanced",
maxCost=0.05,
maxProcessingTime=30
)
)
summary = summary.strip()
except Exception:
summary = ""
enriched.append({
"url": url,
"summary": summary,
"snippet": content[:500]
})
payload = {
"success": True,
"total_count": len(enriched),
"results": enriched,
}
json_content = _json.dumps(payload, ensure_ascii=False, indent=2)
except Exception:
# Fallback to original conversion
json_content = self._convert_web_result_to_json(web_crawl_result)
json_document = ActionDocument(
documentName=f"web_crawl_results.json",
documentData=json_content,
mimeType="application/json"
)
return ActionResult(success=True, documents=[json_document])
else:
return web_crawl_result
except Exception as e:
logger.error(f"Error in crawl method: {str(e)}")
return ActionResult(success=False, error=str(e))
@action
async def scrape(self, parameters: Dict[str, Any]) -> ActionResult:
"""Search and then crawl the found URLs in one step. To use for market analysis, web research, internet searches
Parameters:
query (str, required): Search query.
maxResults (int, optional): Max number of results. Default: 10.
searchDepth ("basic"|"advanced", optional): Search depth.
timeRange ("d"|"w"|"m"|"y", optional): Time window.
topic ("general"|"news"|"academic", optional): Result domain preference.
includeDomains (list[str], optional): Only include these domains.
excludeDomains (list[str], optional): Exclude these domains.
language (str, optional): ISO language bias.
includeAnswer (bool, optional): Ask provider to include an answer.
includeRawContent (bool, optional): Include raw content where possible.
extractDepth ("basic"|"advanced", optional): Crawl extraction depth. Default: "advanced".
format ("text"|"markdown", optional): Crawl output format. Default: "text".
"""
try:
query = parameters.get("query")
max_results = parameters.get("maxResults", 10)
# Normalize optional enums to avoid validation errors
allowed_search_depth = {"basic", "advanced"}
allowed_extract_depth = {"basic", "advanced"}
allowed_formats = {"text", "markdown"}
search_depth = parameters.get("searchDepth")
if search_depth and search_depth not in allowed_search_depth:
logger.warning(f"Invalid searchDepth '{search_depth}' provided. Falling back to None.")
search_depth = None
extract_depth = parameters.get("extractDepth")
if extract_depth and extract_depth not in allowed_extract_depth:
logger.warning(f"Invalid extractDepth '{extract_depth}' provided. Falling back to 'advanced'.")
extract_depth = "advanced"
fmt = parameters.get("format")
if fmt and fmt not in allowed_formats:
logger.warning(f"Invalid format '{fmt}' provided. Falling back to 'text'.")
fmt = "text"
            if not query or not isinstance(query, str):
                return ActionResult(success=False, error="Search query is required")
# Prepare request data
web_scrape_request = WebScrapeRequest(
query=query,
max_results=max_results,
search_depth=search_depth,
time_range=parameters.get("timeRange"),
topic=parameters.get("topic"),
include_domains=parameters.get("includeDomains"),
exclude_domains=parameters.get("excludeDomains"),
language=parameters.get("language"),
include_answer=parameters.get("includeAnswer"),
include_raw_content=parameters.get("includeRawContent"),
extract_depth=extract_depth,
format=fmt,
)
# Perform request via centralized service wrappers
web_scrape_result = await self.services.web.webScrape(web_scrape_request)
# Convert to proper JSON format
if web_scrape_result.success:
json_content = self._convert_web_result_to_json(web_scrape_result)
json_document = ActionDocument(
documentName=f"web_scrape_results.json",
documentData=json_content,
mimeType="application/json"
)
return ActionResult(
success=True,
documents=[json_document]
)
else:
return web_scrape_result
except Exception as e:
return ActionResult(success=False, error=str(e))
# Helpers
def _convert_web_result_to_json(self, web_result):
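        """Serialize a web result into a pretty-printed JSON string.
        Illustrative output shape (field values are assumptions):
            {"success": true, "results": [{"url": "https://example.com", "content": "..."}], "total_count": 1}
        """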
if not getattr(web_result, 'success', False) or not getattr(web_result, 'documents', None):
return _json.dumps({"success": getattr(web_result, 'success', False), "error": getattr(web_result, 'error', None)})
document_data = web_result.documents[0].documentData
result_dict = {
"success": True,
"results": [
{
"url": str(getattr(result, 'url', "")),
"content": getattr(result, 'content', "")
}
for result in getattr(document_data, 'results', [])
],
"total_count": getattr(document_data, 'total_count', 0)
}
if hasattr(document_data, 'urls'):
result_dict["urls"] = [str(url) for url in getattr(document_data, 'urls', [])]
elif hasattr(document_data, 'query'):
result_dict["query"] = getattr(document_data, 'query', None)
return _json.dumps(result_dict, indent=2, ensure_ascii=False)
def _convert_web_result_to_csv(self, web_search_result):
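        """Serialize web search results into a semicolon-delimited CSV string.
        Illustrative output (rows are assumptions):
            url;title
            https://example.com/report;Market Report
        """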
if not getattr(web_search_result, 'success', False) or not getattr(web_search_result, 'documents', None):
return ""
output = io.StringIO()
writer = csv.writer(output, delimiter=';')
writer.writerow(['url', 'title'])
document_data = web_search_result.documents[0].documentData
for result in getattr(document_data, 'results', []):
writer.writerow([str(getattr(result, 'url', "")), getattr(result, 'title', "")])
return output.getvalue()