""" Web method module. Handles web operations using the web service. """ import logging from typing import Dict, Any, List, Optional from datetime import datetime from modules.interfaces.interfaceWeb import WebService from modules.methods.methodBase import MethodBase, MethodResult, action logger = logging.getLogger(__name__) class MethodWeb(MethodBase): """Web method implementation""" def __init__(self, serviceContainer): """Initialize the web method""" super().__init__(serviceContainer) self.webService = WebService(serviceContainer) @action async def search(self, parameters: Dict[str, Any], authData: Optional[Dict[str, Any]] = None) -> MethodResult: """ Search web content Args: parameters: query: Search query engine: Search engine to use (google, bing) maxResults: Maximum number of results """ try: query = parameters["query"] engine = parameters.get("engine", "google") maxResults = parameters.get("maxResults", 10) # Search web results = await self.webService.searchContent( query=query, engine=engine, maxResults=maxResults ) return self._createResult( success=True, data={ "query": query, "engine": engine, "results": results } ) except Exception as e: logger.error(f"Error searching web: {str(e)}") return self._createResult( success=False, data={"error": str(e)} ) @action async def crawl(self, parameters: Dict[str, Any], authData: Optional[Dict[str, Any]] = None) -> MethodResult: """ Crawl web page Args: parameters: url: URL to crawl depth: Crawl depth followLinks: Whether to follow links extractContent: Whether to extract content """ try: url = parameters["url"] depth = parameters.get("depth", 1) followLinks = parameters.get("followLinks", False) extractContent = parameters.get("extractContent", True) # Crawl page results = await self.webService.crawlPage( url=url, depth=depth, followLinks=followLinks, extractContent=extractContent ) return self._createResult( success=True, data={ "url": url, "depth": depth, "results": results } ) except Exception as e: logger.error(f"Error crawling web page: {str(e)}") return self._createResult( success=False, data={"error": str(e)} ) @action async def extract(self, parameters: Dict[str, Any], authData: Optional[Dict[str, Any]] = None) -> MethodResult: """ Extract content from web page Args: parameters: url: URL to extract from selectors: CSS selectors to extract format: Output format (text, html, json) """ try: url = parameters["url"] selectors = parameters.get("selectors", ["body"]) format = parameters.get("format", "text") # Extract content content = await self.webService.extractContent( url=url, selectors=selectors, format=format ) return self._createResult( success=True, data={ "url": url, "format": format, "content": content } ) except Exception as e: logger.error(f"Error extracting web content: {str(e)}") return self._createResult( success=False, data={"error": str(e)} ) @action async def validate(self, parameters: Dict[str, Any], authData: Optional[Dict[str, Any]] = None) -> MethodResult: """ Validate web page Args: parameters: url: URL to validate checks: List of checks to perform """ try: url = parameters["url"] checks = parameters.get("checks", ["accessibility", "seo", "performance"]) # Validate page results = await self.webService.validatePage( url=url, checks=checks ) return self._createResult( success=True, data={ "url": url, "checks": checks, "results": results } ) except Exception as e: logger.error(f"Error validating web page: {str(e)}") return self._createResult( success=False, data={"error": str(e)} )