gateway/modules/interfaces/interfaceWebObjects.py
2025-09-02 18:58:30 +02:00

118 lines
No EOL
4.2 KiB
Python

from typing import Optional
import json
import csv
import io
from modules.interfaces.interfaceWebModel import (
WebCrawlActionResult,
WebSearchActionResult,
WebSearchRequest,
WebCrawlRequest,
WebScrapeActionResult,
WebScrapeRequest,
WebCrawlDocumentData,
WebScrapeDocumentData,
WebSearchDocumentData,
)
from dataclasses import dataclass
from modules.connectors.connectorWebTavily import ConnectorTavily
from modules.interfaces.interfaceChatModel import ActionDocument
@dataclass(slots=True)
class WebInterface:
connectorWebTavily: ConnectorTavily
def __post_init__(self) -> None:
if self.connectorWebTavily is None:
raise TypeError(
"connectorWebTavily must be provided. "
"Use `await WebInterface.create()` or pass a ConnectorTavily."
)
@classmethod
async def create(cls) -> "WebInterface":
connectorWebTavily = await ConnectorTavily.create()
return WebInterface(connectorWebTavily=connectorWebTavily)
async def search(
self, web_search_request: WebSearchRequest
) -> WebSearchActionResult:
# NOTE: Add connectors here
return await self.connectorWebTavily.search_urls(web_search_request)
async def crawl(self, web_crawl_request: WebCrawlRequest) -> WebCrawlActionResult:
# NOTE: Add connectors here
return await self.connectorWebTavily.crawl_urls(web_crawl_request)
async def scrape(
self, web_scrape_request: WebScrapeRequest
) -> WebScrapeActionResult:
# NOTE: Add connectors here
return await self.connectorWebTavily.scrape(web_scrape_request)
def convert_web_result_to_json(self, web_result) -> str:
"""Convert WebCrawlActionResult or WebScrapeActionResult to proper JSON format"""
if not web_result.success or not web_result.documents:
return json.dumps({"success": web_result.success, "error": web_result.error})
# Extract the document data and convert to dict
document_data = web_result.documents[0].documentData
# Convert Pydantic model to dict
result_dict = {
"success": web_result.success,
"results": [
{
"url": str(result.url),
"content": result.content
}
for result in document_data.results
],
"total_count": document_data.total_count
}
# Add type-specific fields
if hasattr(document_data, 'urls'):
# WebCrawlDocumentData has urls field
result_dict["urls"] = [str(url) for url in document_data.urls]
elif hasattr(document_data, 'query'):
# WebScrapeDocumentData has query field
result_dict["query"] = document_data.query
return json.dumps(result_dict, indent=2, ensure_ascii=False)
def convert_web_search_result_to_csv(self, web_search_result: WebSearchActionResult) -> str:
"""Convert WebSearchActionResult to CSV format with url and title columns"""
if not web_search_result.success or not web_search_result.documents:
return ""
output = io.StringIO()
writer = csv.writer(output, delimiter=';')
# Write header
writer.writerow(['url', 'title'])
# Write data rows
document_data = web_search_result.documents[0].documentData
for result in document_data.results:
writer.writerow([str(result.url), result.title])
return output.getvalue()
def create_json_action_document(self, json_content: str, document_name: str) -> ActionDocument:
"""Create an ActionDocument with JSON content"""
return ActionDocument(
documentName=document_name,
documentData=json_content,
mimeType="application/json"
)
def create_csv_action_document(self, csv_content: str, document_name: str) -> ActionDocument:
"""Create an ActionDocument with CSV content"""
return ActionDocument(
documentName=document_name,
documentData=csv_content,
mimeType="text/csv"
)