# gateway/modules/methods/methodWeb.py

import logging
import csv
import io
import json
import re
from typing import Any, Dict

from modules.chat.methodBase import MethodBase, action
from modules.interfaces.interfaceChatModel import ActionResult, ActionDocument
from modules.interfaces.interfaceWebObjects import WebInterface
from modules.interfaces.interfaceWebModel import (
    WebSearchRequest,
    WebCrawlRequest,
    WebScrapeRequest,
)

logger = logging.getLogger(__name__)


class MethodWeb(MethodBase):
    """Web method implementation providing search, crawl, and scrape operations."""

    def __init__(self, serviceCenter: Any):
        super().__init__(serviceCenter)
        self.name = "web"
        self.description = "Web search, crawling, and scraping operations using Tavily"

    @action
    async def search(self, parameters: Dict[str, Any]) -> ActionResult:
        """Perform a web search and output a CSV file listing the found URLs.

        Each result row contains the columns "url" and "title".

        Parameters:
            query (str): Search query to perform
            maxResults (int, optional): Maximum number of results (default: 10)
        """
        try:
            # Prepare request data
            web_search_request = WebSearchRequest(
                query=parameters.get("query"),
                max_results=parameters.get("maxResults", 10),
            )
            # Perform request
            web_interface = await WebInterface.create()
            web_search_result = await web_interface.search(web_search_request)
            # Convert search results to CSV format
            if web_search_result.success and web_search_result.documents:
                csv_content = web_interface.convert_web_search_result_to_csv(web_search_result)
                # Create CSV document
                csv_document = web_interface.create_csv_action_document(
                    csv_content,
                    "web_search_results.csv",
                )
                return ActionResult(
                    success=True,
                    documents=[csv_document],
                )
            else:
                return web_search_result
        except Exception as e:
            return ActionResult(success=False, error=str(e))
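
    # Illustrative shape of the CSV document produced by `search` (the exact
    # delimiter comes from the web interface; `_read_csv_with_urls` below
    # accepts either ';' or ','):
    #
    #   url;title
    #   https://example.com/docs;Example documentation
    #   https://example.com/blog;Example blog post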

    def _read_csv_with_urls(self, csv_content: str) -> list:
        """Read CSV content and extract URLs from url,title or title,url format (both ';' and ',' delimiters)."""
        urls = []
        # Try both semicolon and comma delimiters
        for delimiter in [';', ',']:
            try:
                reader = csv.DictReader(io.StringIO(csv_content), delimiter=delimiter)
                for row in reader:
                    # Look for url column (case insensitive)
                    url = None
                    for key in row.keys():
                        if key.lower() == 'url':
                            url = row[key].strip()
                            break
                    if url and (url.startswith('http://') or url.startswith('https://')):
                        urls.append(url)
                # If we found URLs with this delimiter, return them
                if urls:
                    return urls
            except Exception:
                # Try next delimiter
                continue
        # If no valid CSV found, try simple line-by-line text parsing as fallback
        lines = csv_content.split('\n')
        for line in lines:
            line = line.strip()
            if line and (line.startswith('http://') or line.startswith('https://')):
                urls.append(line)
        return urls
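
    # Input shapes handled by `_read_csv_with_urls` (illustrative samples):
    #
    #   url;title                      # semicolon-delimited CSV
    #   https://example.com/a;Page A
    #
    #   url,title                      # comma-delimited CSV
    #   https://example.com/a,Page A
    #
    #   https://example.com/a          # bare URL lines (plain-text fallback)
    #   https://example.com/b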

    @action
    async def crawl(self, parameters: Dict[str, Any]) -> ActionResult:
        """Crawls a list of URLs and extracts information from them.

        Parameters:
            documentList (str): Document list reference containing URL lists from search results
            expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description
        """
        try:
            document_list = parameters.get("documentList")
            if not document_list:
                return ActionResult(
                    success=False, error="No document list reference provided."
                )
            # Resolve document list reference to ChatDocument objects
            chat_documents = self.service.getChatDocumentsFromDocumentList(document_list)
            if not chat_documents:
                return ActionResult(
                    success=False,
                    error=f"No documents found for reference: {document_list}",
                )
            # Extract URLs from all documents and combine them
            all_urls = []
            for i, doc in enumerate(chat_documents):
                logger.info(f"Processing document {i+1}/{len(chat_documents)}: {doc.fileName}")
                # Get file data using the service center
                file_data = self.service.getFileData(doc.fileId)
                if not file_data:
                    logger.warning(f"Could not retrieve file data for document: {doc.fileName}")
                    continue
                content = file_data.decode("utf-8")
                # Try to parse as CSV first (for the new CSV format)
                if doc.fileName.lower().endswith('.csv') or 'csv' in doc.mimeType.lower():
                    logger.info(f"Processing CSV file: {doc.fileName}")
                    doc_urls = self._read_csv_with_urls(content)
                else:
                    # Parse JSON to extract URLs from search results
                    try:
                        # The document structure from WebSearchActionResult
                        search_data = json.loads(content)
                        # Extract URLs from the search results structure
                        doc_urls = []
                        if isinstance(search_data, dict):
                            # Handle the document structure: documentData contains the actual search results
                            doc_data = search_data.get("documentData", search_data)
                            if "results" in doc_data and isinstance(doc_data["results"], list):
                                doc_urls = [
                                    result["url"]
                                    for result in doc_data["results"]
                                    if isinstance(result, dict) and "url" in result
                                ]
                            elif "urls" in doc_data and isinstance(doc_data["urls"], list):
                                # Fallback: URLs stored directly in a 'urls' field
                                doc_urls = [url for url in doc_data["urls"] if isinstance(url, str)]
                        # Fallback: try to parse as plain text with a regex (for backward compatibility)
                        if not doc_urls:
                            logger.warning(
                                f"Could not extract URLs from JSON structure in {doc.fileName}, trying plain text parsing"
                            )
                            doc_urls = re.split(r"[\n,;]+", content)
                            doc_urls = [
                                u.strip()
                                for u in doc_urls
                                if u.strip()
                                and (
                                    u.strip().startswith("http://")
                                    or u.strip().startswith("https://")
                                )
                            ]
                    except json.JSONDecodeError:
                        # Fall back to plain text parsing if JSON parsing fails
                        logger.warning(f"Document {doc.fileName} is not valid JSON, trying plain text parsing")
                        doc_urls = re.split(r"[\n,;]+", content)
                        doc_urls = [
                            u.strip()
                            for u in doc_urls
                            if u.strip()
                            and (
                                u.strip().startswith("http://")
                                or u.strip().startswith("https://")
                            )
                        ]
                if doc_urls:
                    all_urls.extend(doc_urls)
                    logger.info(f"Extracted {len(doc_urls)} URLs from {doc.fileName}")
                else:
                    logger.warning(f"No valid URLs found in document: {doc.fileName}")
            if not all_urls:
                return ActionResult(
                    success=False, error="No valid URLs found in any of the documents."
                )
            # Remove duplicates while preserving order
            unique_urls = list(dict.fromkeys(all_urls))
            logger.info(f"Extracted {len(unique_urls)} unique URLs from {len(chat_documents)} documents")
            # Prepare request data
            web_crawl_request = WebCrawlRequest(urls=unique_urls)
            # Perform request
            web_interface = await WebInterface.create()
            web_crawl_result = await web_interface.crawl(web_crawl_request)
            # Convert to proper JSON format
            if web_crawl_result.success:
                json_content = web_interface.convert_web_result_to_json(web_crawl_result)
                json_document = web_interface.create_json_action_document(
                    json_content,
                    "web_crawl_results.json",
                )
                return ActionResult(
                    success=True,
                    documents=[json_document],
                )
            else:
                return web_crawl_result
        except Exception as e:
            logger.error(f"Error in crawl method: {str(e)}")
            return ActionResult(success=False, error=str(e))
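
    # Illustrative JSON search-result document understood by the crawl parser above
    # (field names mirror the code; any extra fields are simply ignored):
    #
    #   {
    #     "documentData": {
    #       "results": [
    #         {"url": "https://example.com/a", "title": "Page A"},
    #         {"url": "https://example.com/b", "title": "Page B"}
    #       ]
    #     }
    #   }
    #
    # or, for the fallback branch, {"urls": ["https://example.com/a", ...]}.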

    @action
    async def scrape(self, parameters: Dict[str, Any]) -> ActionResult:
        """Scrapes web content by searching for URLs and then extracting their content.

        Combines search and crawl operations in one step.

        Parameters:
            query (str): Search query to perform
            maxResults (int, optional): Maximum number of results (default: 10)
        """
        try:
            query = parameters.get("query")
            max_results = parameters.get("maxResults", 10)
            if not query:
                return ActionResult(success=False, error="Search query is required")
            # Prepare request data
            web_scrape_request = WebScrapeRequest(
                query=query,
                max_results=max_results,
            )
            # Perform request
            web_interface = await WebInterface.create()
            web_scrape_result = await web_interface.scrape(web_scrape_request)
            # Convert to proper JSON format
            if web_scrape_result.success:
                json_content = web_interface.convert_web_result_to_json(web_scrape_result)
                json_document = web_interface.create_json_action_document(
                    json_content,
                    "web_scrape_results.json",
                )
                return ActionResult(
                    success=True,
                    documents=[json_document],
                )
            else:
                return web_scrape_result
        except Exception as e:
            return ActionResult(success=False, error=str(e))
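

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, kept as comments): how an action dispatcher
# might chain these methods. It assumes a MethodBase-compatible `service_center`
# is available and that search-result documents are handed back to `crawl` via
# a document list reference, as the docstrings above describe.
#
#   method = MethodWeb(service_center)
#
#   # Two-step flow: search first, then crawl the URLs that were found.
#   search_result = await method.search({"query": "tavily api", "maxResults": 5})
#   crawl_result = await method.crawl({"documentList": "<reference to the search CSV>"})
#
#   # One-step alternative: scrape combines both operations.
#   scrape_result = await method.scrape({"query": "tavily api", "maxResults": 5})
# ---------------------------------------------------------------------------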