201 lines
No EOL
7.7 KiB
Python
201 lines
No EOL
7.7 KiB
Python
import logging
|
|
import re
|
|
import requests
|
|
from typing import List, Dict, Any, Optional
|
|
from bs4 import BeautifulSoup
|
|
import json
|
|
import os
|
|
import configload as configload
|
|
|
|
|
|
# Logger konfigurieren
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Konfigurationsdaten laden
|
|
def load_config_data():
|
|
config = configload.load_config()
|
|
return {
|
|
"timeout": int(config.get('Connector_AiWebscraping', 'TIMEOUT', fallback="30")),
|
|
"max_urls": int(config.get('Connector_AiWebscraping', 'MAX_URLS', fallback="5")),
|
|
"max_content_length": int(config.get('Connector_AiWebscraping', 'MAX_CONTENT_LENGTH', fallback="10000")),
|
|
"user_agent": config.get('Connector_AiWebscraping', 'USER_AGENT', fallback="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
|
|
}
|
|
|
|
class WebScrapingService:
|
|
"""
|
|
Connector für Web-Scraping-Funktionalitäten.
|
|
"""
|
|
|
|
def __init__(self):
|
|
# Konfiguration laden
|
|
self.config = load_config_data()
|
|
|
|
# Konfigurationswerte zu Instance-Attributen zuweisen
|
|
self.timeout = self.config["timeout"]
|
|
self.max_urls = self.config["max_urls"]
|
|
self.max_content_length = self.config["max_content_length"]
|
|
self.user_agent = self.config["user_agent"]
|
|
|
|
logger.info(f"WebScraping Connector initialisiert mit Timeout: {self.timeout}s")
|
|
|
|
def scrape_url(self, url: str) -> str:
|
|
"""
|
|
Scrapt den Inhalt einer URL und extrahiert den relevanten Text.
|
|
|
|
Args:
|
|
url: Die zu scrapende URL
|
|
|
|
Returns:
|
|
Der extrahierte Inhalt
|
|
|
|
Raises:
|
|
Exception: Bei Fehlern im Scraping-Prozess
|
|
"""
|
|
headers = {
|
|
'User-Agent': self.user_agent
|
|
}
|
|
|
|
try:
|
|
response = requests.get(url, headers=headers, timeout=self.timeout)
|
|
response.raise_for_status()
|
|
|
|
soup = BeautifulSoup(response.text, 'html.parser')
|
|
|
|
# Entferne Skripte, Styles und andere unwichtige Elemente
|
|
for script in soup(["script", "style", "meta", "noscript", "iframe"]):
|
|
script.extract()
|
|
|
|
# Extrahiere den Hauptinhalt
|
|
main_content = ""
|
|
|
|
# Versuche, Hauptcontainer zu finden (häufige IDs und Klassen)
|
|
main_elements = soup.select('main, #main, .main, #content, .content, article, .article, .post, #post')
|
|
|
|
if main_elements:
|
|
# Nehme den ersten gefundenen Hauptcontainer
|
|
main_content = main_elements[0].get_text(separator='\n', strip=True)
|
|
else:
|
|
# Falls kein Hauptcontainer gefunden, nehme den Body-Text
|
|
main_content = soup.body.get_text(separator='\n', strip=True)
|
|
|
|
# Bereinige den Text (entferne mehrfache Leerzeilen etc.)
|
|
lines = [line.strip() for line in main_content.split('\n') if line.strip()]
|
|
main_content = '\n'.join(lines)
|
|
|
|
# Begrenze die Länge
|
|
if len(main_content) > self.max_content_length:
|
|
main_content = main_content[:self.max_content_length] + "...\n[Inhalt gekürzt]"
|
|
|
|
return main_content
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Scrapen von {url}: {str(e)}")
|
|
raise Exception(f"Fehler beim Scrapen von {url}: {str(e)}")
|
|
|
|
def extract_urls(self, text: str) -> List[str]:
|
|
"""
|
|
Extrahiert URLs aus einem Text.
|
|
|
|
Args:
|
|
text: Der zu analysierende Text
|
|
|
|
Returns:
|
|
Liste der gefundenen URLs
|
|
"""
|
|
# Einfacher URL-Extraktions-Regex
|
|
url_pattern = re.compile(r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+[/\w\.-]*(?:\?\S+)?')
|
|
return url_pattern.findall(text)
|
|
|
|
def extract_keywords(self, text: str) -> str:
|
|
"""
|
|
Extrahiert Schlüsselwörter aus einem Text.
|
|
|
|
Args:
|
|
text: Der zu analysierende Text
|
|
|
|
Returns:
|
|
Extrahierte Schlüsselwörter als String
|
|
"""
|
|
# Einfache Implementierung - in der Praxis könntest du NLP verwenden
|
|
words = text.split()
|
|
# Filtere kurze Wörter und häufige Stopwörter
|
|
stopwords = ["einen", "einer", "eines", "keine", "nicht", "diese", "dieses", "zwischen",
|
|
"und", "oder", "aber", "denn", "wenn", "weil", "obwohl", "während", "für",
|
|
"mit", "von", "aus", "nach", "bei", "über", "unter", "durch", "gegen"]
|
|
keywords = [w for w in words if len(w) > 4 and w.lower() not in stopwords]
|
|
return " ".join(keywords[:5]) # Begrenze auf 5 Keywords
|
|
|
|
async def search_web(self, query: str) -> str:
|
|
"""
|
|
Simuliert eine Websuche mit den gegebenen Schlüsselwörtern.
|
|
|
|
Args:
|
|
query: Suchbegriffe
|
|
|
|
Returns:
|
|
Ergebnisse der Suche (simuliert)
|
|
"""
|
|
# HINWEIS: Dies ist eine Simulation! In einer echten Anwendung
|
|
# würdest du Google Custom Search API, SerpAPI oder ähnliches verwenden
|
|
|
|
# Für eine echte Implementierung:
|
|
# - Google Custom Search API: https://developers.google.com/custom-search/v1/overview
|
|
# - SerpAPI: https://serpapi.com/
|
|
# - Oder ähnliche Dienste
|
|
|
|
return f"Hinweis: Dies ist eine Demo-Implementierung ohne echte Websuche. In der Produktion würde hier der Agent tatsächlich nach '{query}' suchen."
|
|
|
|
async def scrape_web_data(self, prompt: str) -> str:
|
|
"""
|
|
Führt Web-Scraping basierend auf dem Prompt durch
|
|
|
|
Args:
|
|
prompt: Der Benutzer-Prompt
|
|
|
|
Returns:
|
|
Gescrapte Webdaten als Text
|
|
"""
|
|
try:
|
|
# Extrahiere mögliche Schlüsselwörter oder URLs aus dem Prompt
|
|
keywords = self.extract_keywords(prompt)
|
|
urls = self.extract_urls(prompt)
|
|
|
|
results = []
|
|
|
|
# Falls direkte URLs im Prompt enthalten sind
|
|
if urls:
|
|
logger.info(f"Gefundene URLs: {', '.join(urls[:self.max_urls])}")
|
|
for url in urls[:self.max_urls]: # Begrenze auf max_urls
|
|
try:
|
|
logger.info(f"Scrape URL: {url}")
|
|
content = self.scrape_url(url)
|
|
if content:
|
|
results.append(f"## Inhalt von {url}\n{content}")
|
|
logger.info(f"Scraping von {url} erfolgreich")
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Scrapen von {url}: {e}")
|
|
|
|
# Falls keine URLs, versuche Suche mit Schlüsselwörtern
|
|
elif keywords:
|
|
logger.info(f"Verwende Keywords für Suche: {keywords}")
|
|
search_results = await self.search_web(keywords)
|
|
if search_results:
|
|
results.append(f"## Suchergebnisse für: {keywords}\n{search_results}")
|
|
logger.info("Suche abgeschlossen")
|
|
|
|
if results:
|
|
return "\n\n".join(results)
|
|
|
|
logger.warning("Keine relevanten Web-Daten gefunden")
|
|
return "Keine relevanten Web-Daten gefunden."
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Web-Scraping: {e}")
|
|
return f"Web-Scraping konnte nicht durchgeführt werden: {str(e)}"
|
|
|
|
async def close(self):
|
|
"""
|
|
Schließt alle offenen Ressourcen.
|
|
"""
|
|
# Derzeit keine offenen Ressourcen zu schließen
|
|
pass |