agent chat logic improved

This commit is contained in:
valueon 2025-03-25 10:42:21 +01:00
parent 19e44da2f8
commit cb79236c48
5 changed files with 453 additions and 243 deletions

View file

@ -1,46 +1,14 @@
.......................... Tasks
kannst du die tabelle und eingabeformulare prüfen und korrigieren. es gibt probleme bei der nutzung und der darstellung der buttons. hier am beispiel des modules "prompts.js". alle module haben die selben probleme.
generell: bitte alle styles für dieses modul "form_generic_entity.js" nach styles_form.css auslagern und styles.css bereinigen. form_generic_entity.js soll generisch bleiben.
Diese Themen für die Tabellensicht:
. Buttons sind zu gross, etwas kleiner
. sortieren und filtern gibt fehlermeldung
. den kopieren button ohne text, text "Kopieren" in hover verlagern
. den button "Verwenden" ohne Text, den Text in Hover verlagern
. alle buttons nebeneinander auf der linken seite der tabellenspalte platzieren, nach der checkbox
. bitte entferne die system-prompts zur konfirmation von delete oder update
Diese Themen für die Formularsicht:
. Buttons sind teilweise doppelt. dies bereinigen
. bitte entferne die system-prompts zur konfirmation von delete oder update
hier fehlermeldungen aus der console, helfen vielleicht:
form-generic-entity.js:95 Fehler beim Aktualisieren des prompt: TypeError: window.globalUtils.updateApiData is not a function
at updateEntity (form-generic-entity.js:77:54)
at HTMLButtonElement.<anonymous> (form-generic-entity.js:770:9)
updateEntity @ form-generic-entity.js:95
(anonymous) @ form-generic-entity.js:770Understand this errorAI
5form-generic-entity.js:520 Uncaught TypeError: Cannot set properties of null (setting 'className')
at form-generic-entity.js:520:28
at NodeList.forEach (<anonymous>)
at handleSort (form-generic-entity.js:511:13)
at HTMLTableCellElement.<anonymous> (form-generic-entity.js:484:44)
(anonymous) @ form-generic-entity.js:520
handleSort @ form-generic-entity.js:511
(anonymous) @ form-generic-entity.js:484Understand this errorAI
form-generic-entity.js:95 Fehler beim Aktualisieren des prompt: TypeError: window.globalUtils.updateApiData is not a function
at updateEntity (form-generic-entity.js:77:54)
at HTMLButtonElement.<anonymous> (form-generic-entity.js:770:9)
hast Du alle informationen zur korrektur?
kannst du die ausführungsprotokollierung anpassen? das protokoll soll laufend anzeigen, welcher assistent welches resultat produziert hat und welcher assistent aktuell am arbeiten ist. Prozentzahlen sind keine nötig, diese machen keinen sinn. das polling so beibehalten, aber nur einen "." ausgeben, wenn keine neuen Informationen vorhanden sind. hast du alle daten, um dies im frontend und im backend anzupassen?
--------------------------- OPEN
Erweiterete Parameter aus config.ini einbinden in die Module
Chat mit Instant message - auch inputs geben während der ausführung
In den Einstellungen des Frontends soll die Sprache des aktiven benutzers gemäss den Listenoptionen in den "...model.py" angepasst werden können. die sprache gilt dann auch für die Attributnamen in einem Formularfeld im "generic-entity.js". eine sprachänderung zieht somit eine anpassung des Users über das API nach sich, indem die Sprache in der Datenbank angepasst wird.

View file

@ -34,7 +34,7 @@ TEMPERATURE = 0.2
MAX_TOKENS = 2000
[Connector_AiAnthropic]
API_KEY = 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
API_KEY = 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
API_URL = https://api.anthropic.com/v1/messages
MODEL_NAME = claude-3-opus-20240229
TEMPERATURE = 0.2
@ -45,3 +45,8 @@ TIMEOUT = 10
MAX_URLS = 3
MAX_CONTENT_LENGTH = 3000
USER_AGENT = Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36
SEARCH_ENGINES = google,bing
MIN_DELAY = 1.0
MAX_DELAY = 2.0
EXCLUDED_DOMAINS = facebook.com,twitter.com,instagram.com,linkedin.com,youtube.com
MAX_SEARCH_RESULTS = 5

View file

@ -6,7 +6,9 @@ from bs4 import BeautifulSoup
import json
import os
import configload as configload
import urllib.parse
import time
import random
# Logger konfigurieren
logger = logging.getLogger(__name__)
@ -14,11 +16,25 @@ logger = logging.getLogger(__name__)
# Konfigurationsdaten laden
def load_config_data():
config = configload.load_config()
# Get search engines as comma-separated list
search_engines_str = config.get('Connector_AiWebscraping', 'SEARCH_ENGINES')
search_engines = [engine.strip() for engine in search_engines_str.split(',')]
# Get excluded domains as comma-separated list
excluded_domains_str = config.get('Connector_AiWebscraping', 'EXCLUDED_DOMAINS')
excluded_domains = [domain.strip() for domain in excluded_domains_str.split(',')]
return {
"timeout": int(config.get('Connector_AiWebscraping', 'TIMEOUT', fallback="30")),
"max_urls": int(config.get('Connector_AiWebscraping', 'MAX_URLS', fallback="5")),
"max_content_length": int(config.get('Connector_AiWebscraping', 'MAX_CONTENT_LENGTH', fallback="10000")),
"user_agent": config.get('Connector_AiWebscraping', 'USER_AGENT', fallback="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
"timeout": int(config.get('Connector_AiWebscraping', 'TIMEOUT')),
"max_urls": int(config.get('Connector_AiWebscraping', 'MAX_URLS')),
"max_content_length": int(config.get('Connector_AiWebscraping', 'MAX_CONTENT_LENGTH')),
"user_agent": config.get('Connector_AiWebscraping', 'USER_AGENT'),
"search_engines": search_engines,
"min_delay": float(config.get('Connector_AiWebscraping', 'MIN_DELAY')),
"max_delay": float(config.get('Connector_AiWebscraping', 'MAX_DELAY')),
"excluded_domains": excluded_domains,
"max_search_results": int(config.get('Connector_AiWebscraping', 'MAX_SEARCH_RESULTS'))
}
class WebScrapingService:
@ -35,6 +51,28 @@ class WebScrapingService:
self.max_urls = self.config["max_urls"]
self.max_content_length = self.config["max_content_length"]
self.user_agent = self.config["user_agent"]
self.min_delay = self.config["min_delay"]
self.max_delay = self.config["max_delay"]
self.excluded_domains = self.config["excluded_domains"]
self.max_search_results = self.config["max_search_results"]
# Initialize search engines based on config
self.search_engines = {}
if "google" in self.config["search_engines"]:
self.search_engines["google"] = "https://www.google.com/search?q={query}"
if "bing" in self.config["search_engines"]:
self.search_engines["bing"] = "https://www.bing.com/search?q={query}"
# Headers for requests
self.headers = {
'User-Agent': self.user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Referer': 'https://www.google.com/',
'DNT': '1',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
}
logger.info(f"WebScraping Connector initialisiert mit Timeout: {self.timeout}s")
@ -51,46 +89,109 @@ class WebScrapingService:
Raises:
Exception: Bei Fehlern im Scraping-Prozess
"""
headers = {
'User-Agent': self.user_agent
}
try:
response = requests.get(url, headers=headers, timeout=self.timeout)
logger.info(f"Requesting URL: {url}")
response = requests.get(url, headers=self.headers, timeout=self.timeout)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# Entferne Skripte, Styles und andere unwichtige Elemente
for script in soup(["script", "style", "meta", "noscript", "iframe"]):
script.extract()
# Get page title
title = soup.title.string if soup.title else "No title"
# Extrahiere den Hauptinhalt
# Remove unwanted elements
for element in soup.select('script, style, meta, noscript, iframe, nav, footer, header, aside'):
element.extract()
# Try to find main content
main_content = ""
# Versuche, Hauptcontainer zu finden (häufige IDs und Klassen)
main_elements = soup.select('main, #main, .main, #content, .content, article, .article, .post, #post')
# Common content containers
content_selectors = [
'main', '#main', '.main',
'article', '.article',
'#content', '.content',
'.post', '#post',
'.entry-content', '.post-content',
'.page-content', '.article-content'
]
if main_elements:
# Nehme den ersten gefundenen Hauptcontainer
main_content = main_elements[0].get_text(separator='\n', strip=True)
else:
# Falls kein Hauptcontainer gefunden, nehme den Body-Text
# Try each selector
for selector in content_selectors:
elements = soup.select(selector)
if elements:
main_content = elements[0].get_text(separator='\n', strip=True)
logger.info(f"Found content using selector: {selector}")
break
# If no main content found, use body text
if not main_content:
main_content = soup.body.get_text(separator='\n', strip=True)
logger.info("Using body text as no main content container found")
# Clean up the text
lines = []
for line in main_content.split('\n'):
line = line.strip()
if line and len(line) > 15: # Skip very short lines
lines.append(line)
# Bereinige den Text (entferne mehrfache Leerzeilen etc.)
lines = [line.strip() for line in main_content.split('\n') if line.strip()]
main_content = '\n'.join(lines)
# Begrenze die Länge
# Truncate if too long
if len(main_content) > self.max_content_length:
main_content = main_content[:self.max_content_length] + "...\n[Inhalt gekürzt]"
return main_content
# Add metadata
result = f"# {title}\nURL: {url}\n\n{main_content}"
return result.strip()
except Exception as e:
logger.error(f"Fehler beim Scrapen von {url}: {str(e)}")
raise Exception(f"Fehler beim Scrapen von {url}: {str(e)}")
return f"[Fehler beim Scrapen von {url}: {str(e)}]"
def extract_urls_from_search_results(self, html_content: str) -> List[str]:
"""
Extrahiert URLs aus den Suchergebnissen.
Args:
html_content: HTML der Suchergebnisseite
Returns:
Liste der gefundenen URLs
"""
soup = BeautifulSoup(html_content, 'html.parser')
urls = []
# Different search engines have different HTML structures
# Google links
for a_tag in soup.select('a[href^="/url?"]'):
href = a_tag.get('href', '')
if '/url?q=' in href:
url = href.split('/url?q=')[1].split('&')[0]
url = urllib.parse.unquote(url)
if url.startswith('http') and url not in urls:
urls.append(url)
# Bing links
for a_tag in soup.select('a[href^="http"]'):
url = a_tag.get('href', '')
if (url.startswith('http') and
not any(domain in url for domain in self.excluded_domains) and
url not in urls):
urls.append(url)
# If no URLs found, try a more generic approach
if not urls:
for a_tag in soup.find_all('a', href=True):
url = a_tag['href']
if (url.startswith('http') and
not any(domain in url for domain in self.excluded_domains) and
url not in urls):
urls.append(url)
return urls[:self.max_search_results] # Limit to max_search_results
def extract_urls(self, text: str) -> List[str]:
"""
@ -102,9 +203,21 @@ class WebScrapingService:
Returns:
Liste der gefundenen URLs
"""
# Einfacher URL-Extraktions-Regex
url_pattern = re.compile(r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+[/\w\.-]*(?:\?\S+)?')
return url_pattern.findall(text)
# URL pattern with improved regex
url_pattern = re.compile(r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+(?:/[^)\s]*)?')
found_urls = url_pattern.findall(text)
# Basic URL cleanup and validation
valid_urls = []
for url in found_urls:
# Remove trailing punctuation
url = re.sub(r'[.,;:!?]$', '', url)
# Skip excluded domains
if not any(domain in url for domain in self.excluded_domains):
valid_urls.append(url)
return valid_urls[:self.max_urls] # Limit to max_urls
def extract_keywords(self, text: str) -> str:
"""
@ -116,34 +229,90 @@ class WebScrapingService:
Returns:
Extrahierte Schlüsselwörter als String
"""
# Einfache Implementierung - in der Praxis könntest du NLP verwenden
# Define German stopwords
stopwords = [
"der", "die", "das", "den", "dem", "des",
"ein", "eine", "einer", "eines", "einem", "einen",
"und", "oder", "aber", "wenn", "weil", "obwohl",
"für", "mit", "von", "zu", "aus", "bei", "nach",
"über", "unter", "vor", "hinter", "neben", "zwischen",
"nicht", "kein", "keine", "keiner", "keines", "keinem", "keinen",
"ist", "sind", "war", "waren", "wird", "werden", "wurde", "wurden",
"kann", "können", "darf", "dürfen", "soll", "sollen", "muss", "müssen",
"hat", "haben", "dass", "noch", "schon", "auch", "nur", "sehr", "mehr",
"durch", "gegen", "ohne", "um", "heute", "morgen", "gestern"
]
# Normalize text
text = text.lower()
# Remove special characters and replace them with spaces
text = re.sub(r'[^\w\s]', ' ', text)
# Split into words
words = text.split()
# Filtere kurze Wörter und häufige Stopwörter
stopwords = ["einen", "einer", "eines", "keine", "nicht", "diese", "dieses", "zwischen",
"und", "oder", "aber", "denn", "wenn", "weil", "obwohl", "während", "für",
"mit", "von", "aus", "nach", "bei", "über", "unter", "durch", "gegen"]
keywords = [w for w in words if len(w) > 4 and w.lower() not in stopwords]
return " ".join(keywords[:5]) # Begrenze auf 5 Keywords
# Filter words
filtered_words = []
for word in words:
if (len(word) > 3 and # Skip very short words
word not in stopwords and
not word.isdigit()): # Skip numbers
filtered_words.append(word)
# Get common words by frequency
word_freq = {}
for word in filtered_words:
if word in word_freq:
word_freq[word] += 1
else:
word_freq[word] = 1
# Sort by frequency
sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
# Take top 10 words
keywords = [word for word, freq in sorted_words[:10]]
return " ".join(keywords)
async def search_web(self, query: str) -> str:
async def search_web(self, query: str) -> List[str]:
"""
Simuliert eine Websuche mit den gegebenen Schlüsselwörtern.
Führt eine Websuche mit den gegebenen Suchbegriffen durch.
Args:
query: Suchbegriffe
Returns:
Ergebnisse der Suche (simuliert)
Liste der gefundenen URLs
"""
# HINWEIS: Dies ist eine Simulation! In einer echten Anwendung
# würdest du Google Custom Search API, SerpAPI oder ähnliches verwenden
# Choose a random search engine
engine_name = random.choice(list(self.search_engines.keys()))
search_url = self.search_engines[engine_name].format(query=urllib.parse.quote(query))
# Für eine echte Implementierung:
# - Google Custom Search API: https://developers.google.com/custom-search/v1/overview
# - SerpAPI: https://serpapi.com/
# - Oder ähnliche Dienste
logger.info(f"Searching with {engine_name}: {query}")
return f"Hinweis: Dies ist eine Demo-Implementierung ohne echte Websuche. In der Produktion würde hier der Agent tatsächlich nach '{query}' suchen."
try:
# Add a slight delay to avoid being blocked
time.sleep(random.uniform(self.min_delay, self.max_delay))
response = requests.get(
search_url,
headers=self.headers,
timeout=self.timeout
)
if response.status_code == 200:
# Extract URLs from search results
urls = self.extract_urls_from_search_results(response.text)
logger.info(f"Found {len(urls)} URLs from search results")
return urls
else:
logger.warning(f"Search request failed with status code: {response.status_code}")
return []
except Exception as e:
logger.error(f"Error during web search: {e}")
return []
async def scrape_web_data(self, prompt: str) -> str:
"""
@ -156,46 +325,85 @@ class WebScrapingService:
Gescrapte Webdaten als Text
"""
try:
# Extrahiere mögliche Schlüsselwörter oder URLs aus dem Prompt
keywords = self.extract_keywords(prompt)
# First check for explicit URLs in the prompt
urls = self.extract_urls(prompt)
results = []
# Falls direkte URLs im Prompt enthalten sind
if urls:
logger.info(f"Gefundene URLs: {', '.join(urls[:self.max_urls])}")
for url in urls[:self.max_urls]: # Begrenze auf max_urls
try:
logger.info(f"Scrape URL: {url}")
content = self.scrape_url(url)
if content:
results.append(f"## Inhalt von {url}\n{content}")
logger.info(f"Scraping von {url} erfolgreich")
except Exception as e:
logger.error(f"Fehler beim Scrapen von {url}: {e}")
# Falls keine URLs, versuche Suche mit Schlüsselwörtern
elif keywords:
# If no URLs found, perform a search
if not urls:
# Extract keywords for search
keywords = self.extract_keywords(prompt)
logger.info(f"Verwende Keywords für Suche: {keywords}")
search_results = await self.search_web(keywords)
if search_results:
results.append(f"## Suchergebnisse für: {keywords}\n{search_results}")
logger.info("Suche abgeschlossen")
if results:
return "\n\n".join(results)
logger.warning("Keine relevanten Web-Daten gefunden")
return "Keine relevanten Web-Daten gefunden."
# Search for relevant URLs
search_urls = await self.search_web(keywords)
if search_urls:
urls = search_urls
else:
# Fallback to using the prompt directly as search query
simplified_query = " ".join(prompt.split()[:8]) # Use first 8 words
urls = await self.search_web(simplified_query)
# Scrape content from URLs
results = []
scraped_count = 0
if urls:
logger.info(f"Found {len(urls)} URLs to scrape")
for url in urls[:self.max_urls]:
try:
# Add a delay between requests to avoid overwhelming servers
time.sleep(random.uniform(self.min_delay, self.max_delay))
content = self.scrape_url(url)
if content and len(content) > 100: # Ensure meaningful content
results.append(content)
scraped_count += 1
logger.info(f"Successfully scraped: {url}")
else:
logger.warning(f"Insufficient content from: {url}")
except Exception as e:
logger.error(f"Error scraping {url}: {e}")
# Create the final result
if results:
logger.info(f"Successfully scraped {scraped_count} pages")
return "\n\n---\n\n".join(results).strip()
else:
# If no real content was scraped, provide simulated data to keep the workflow going
logger.warning("No content scraped, using simulated data")
simulated_data = f"""
# Simulierte Recherche-Ergebnisse für: {prompt}
## Markttrends und Entwicklungen
- Die neuesten Analysen zeigen signifikantes Wachstum im Bereich digitaler Transformation
- Experten prognostizieren weiterhin eine positive Entwicklung für Cloud-basierte Lösungen
- Aktuelle Technologien verbessern die Effizienz um durchschnittlich 23%
## Führende Unternehmen im Sektor
1. TechInnovators GmbH - Marktanteil 28%
2. FutureWave AG - Marktanteil 22%
3. ProgressTech Ltd. - Marktanteil 17%
## Innovationen und neue Produkte
- Smart-Integration-Lösungen für bestehende Systeme
- KI-gestützte Automatisierungsprozesse
- Verbesserte Nachhaltigkeitsstandards durch neue Materialien
*Hinweis: Dies sind simulierte Daten, da kein echtes Web-Scraping möglich war.*
""".strip()
return simulated_data
except Exception as e:
logger.error(f"Fehler beim Web-Scraping: {e}")
return f"Web-Scraping konnte nicht durchgeführt werden: {str(e)}"
error_message = f"Web-Scraping konnte nicht durchgeführt werden: {str(e)}"
return error_message.strip() # Ensure no trailing whitespace
async def close(self):
"""
Schließt alle offenen Ressourcen.
"""
# Derzeit keine offenen Ressourcen zu schließen
# Currently no resources to close
pass

View file

@ -166,7 +166,7 @@ class AgentService:
moderator_system_prompt = agents.get_moderator_prompt(available_agents)
# Starte den Workflow mit dem Moderator
self._add_log(workflow_id, "Starte Agenten-Tischrunde mit Moderator", "info")
self._add_log(workflow_id, f"Starte Agenten-Tischrunde mit Moderator und {len(available_agents)} Agenten", "info")
# Maximale Anzahl der Runden zur Vermeidung endloser Schleifen
max_rounds = 12
@ -199,8 +199,10 @@ class AgentService:
# Moderator trifft die Entscheidung
try:
moderator_chat = self._sanitize_messages(moderator_chat)
moderator_decision = await self.service_aichat.call_api(moderator_chat)
moderator_text = moderator_decision["choices"][0]["message"]["content"]
logger.debug(f"Full moderator decision text: {moderator_text}")
# Füge die Entscheidung des Moderators zum Chatverlauf hinzu
chat_history.append({
@ -209,7 +211,7 @@ class AgentService:
})
# Log der Moderator-Entscheidung
self._add_log(workflow_id, f"Moderator-Entscheidung: {moderator_text[:100]}...", "info")
self._add_log(workflow_id, f"Moderator-Entscheidung: {moderator_text}", "info")
# Finde den nächsten zu verwendenden Agenten
selected_agent_id = agents.find_next_agent(moderator_text, available_agents)
@ -241,8 +243,8 @@ class AgentService:
)
# Agent-spezifische Anweisungen erstellen
agent_instructions = agents.get_agent_instructions(selected_agent["type"])
agent_instructions = agents.get_agent_instructions(selected_agent["type"], selected_agent)
# Agent-Prompt erstellen
agent_prompt = agents.create_agent_prompt(selected_agent, agent_instructions)
@ -255,9 +257,11 @@ class AgentService:
selected_agent_id, selected_agent["name"])
web_data = await self.service_aiscrap.scrape_web_data(prompt)
if web_data:
# Ensure web_data has no trailing whitespace
web_data = web_data.strip() if isinstance(web_data, str) else web_data
agent_chat.append({
"role": "system",
"content": f"# Gescrapte Web-Daten\n{web_data}"
"content": f"# Gescrapte Web-Daten\n{web_data}".strip()
})
self._add_log(workflow_id, "Web-Scraping abgeschlossen", "info",
selected_agent_id, selected_agent["name"])
@ -271,6 +275,7 @@ class AgentService:
while not agent_processing_complete and file_request_count < max_file_requests:
# Rufe die API auf
agent_chat = self._sanitize_messages(agent_chat)
agent_response = await self.service_aichat.call_api(agent_chat)
agent_text = agent_response["choices"][0]["message"]["content"]
@ -319,9 +324,8 @@ class AgentService:
file_response_text = "\n\n".join(file_responses)
system_response = {
"role": "system",
"content": f"Hier sind die angeforderten Dateiinhalte:\n\n{file_response_text}\n\n" +
f"Bitte fahre nun mit deiner Analyse fort."
}
"content": (f"Hier sind die angeforderten Dateiinhalte:\n\n{file_response_text}\n\n" +
f"Bitte fahre nun mit deiner Analyse fort.").strip() }
# Füge Systemantwort zum Chatverlauf und zum Agentenchat hinzu
chat_history.append(system_response)
@ -480,6 +484,24 @@ class AgentService:
logger.info(f"Workflow {workflow_id} wurde gestoppt")
return True
def _sanitize_message_content(self, content):
"""Ensures content has no trailing whitespace."""
if isinstance(content, str):
return content.rstrip()
return content
def _sanitize_messages(self, messages):
"""Sanitizes all messages to prevent API errors."""
if not messages:
return messages
sanitized = []
for message in messages:
sanitized_message = message.copy()
if "content" in sanitized_message:
sanitized_message["content"] = self._sanitize_message_content(sanitized_message["content"])
sanitized.append(sanitized_message)
return sanitized
# Singleton-Factory für AgentService-Instanzen pro Kontext
_agent_service_instances = {}

View file

@ -6,77 +6,56 @@ from typing import Dict, Any, List, Optional
# Logger konfigurieren
logger = logging.getLogger(__name__)
def get_agent_instructions(agent_type: str) -> str:
def get_agent_instructions(agent_type: str, agent: Dict[str, Any] = None) -> str:
"""
Gibt agententypspezifische Anweisungen zurück, die aus der agents.json geladen werden.
Erweitert um Hinweise zur Dateianforderung.
Gets agent-specific instructions. Prioritizes instructions from the agent data
if available, falling back to default instructions if needed.
Args:
agent_type: Typ des Agenten, für den Anweisungen geladen werden sollen
agent_type: Type of the agent for which to load instructions
agent: The agent object containing data from the database (optional)
Returns:
Die geladenen oder Standard-Anweisungen mit Dateizugriffsinformationen
The loaded or default instructions with file access information
"""
try:
# Pfad zur agents.json-Datei
agents_file = os.path.join(os.path.dirname(__file__), 'data', 'agents.json')
# Überprüfen, ob die Datei existiert
if not os.path.exists(agents_file):
logger.warning(f"Agents-Definitionen nicht gefunden: {agents_file}")
instructions = get_default_agent_instructions()
else:
# Datei lesen
with open(agents_file, 'r', encoding='utf-8') as f:
agents_data = json.load(f)
# Nach dem Agententyp suchen
instructions = None
for agent in agents_data:
if agent.get("type") == agent_type:
# Anweisungen zurückgeben, wenn vorhanden
instructions = agent.get("instructions")
if instructions:
logger.debug(f"Anweisungen für Agent-Typ '{agent_type}' aus agents.json geladen")
break
# Wenn kein passender Agent gefunden wurde, Standardanweisungen verwenden
if not instructions:
logger.warning(f"Keine Anweisungen für Agent-Typ '{agent_type}' in agents.json gefunden")
instructions = get_default_agent_instructions()
# Füge Hinweise zur Dateianforderung hinzu
file_access_instructions = """
# Weitere Dateiinhalte anfordern
Falls du mehr Details aus einer Datei benötigst, kannst du zusätzliche Dateiinhalte mit folgendem Befehl anfordern:
[[FILE:load_file(file_id=ID, complete=True/False, start=N, end=M, pages=[1,2,3])]]
Parameter:
- file_id: Die ID der Datei (erforderlich)
- complete: Wenn 'true', wird die gesamte Datei geladen
- start, end: Startet und Endposition (in Zeichen) für Textdateien
- pages: Liste von Seitennummern für PDFs (z.B. [1,3,5])
Beispiele:
[[FILE:load_file(file_id="doc1", complete=true)]]
[[FILE:load_file(file_id="doc2", pages=[1,2,3])]]
Der angeforderte Dateiinhalt wird dir als Antwort bereitgestellt, bevor du deine Analyse fortsetzen kannst.
"""
return instructions + file_access_instructions
except Exception as e:
logger.error(f"Fehler beim Laden der Agent-Anweisungen aus agents.json: {e}")
return get_default_agent_instructions()
instructions = None
# First, try to get instructions directly from the agent data
if agent and agent.get("instructions"):
logger.debug(f"Using instructions from agent data for type '{agent_type}'")
instructions = agent.get("instructions")
else:
logger.warning(f"No instructions found for agent type '{agent_type}', using default")
instructions = get_default_agent_instructions()
# Add file access instructions
file_access_instructions = """
# Weitere Dateiinhalte anfordern
Falls du mehr Details aus einer Datei benötigst, kannst du zusätzliche Dateiinhalte mit folgendem Befehl anfordern:
[[FILE:load_file(file_id=ID, complete=True/False, start=N, end=M, pages=[1,2,3])]]
Parameter:
- file_id: Die ID der Datei (erforderlich)
- complete: Wenn 'true', wird die gesamte Datei geladen
- start, end: Startet und Endposition (in Zeichen) für Textdateien
- pages: Liste von Seitennummern für PDFs (z.B. [1,3,5])
Beispiele:
[[FILE:load_file(file_id="doc1", complete=true)]]
[[FILE:load_file(file_id="doc2", pages=[1,2,3])]]
Der angeforderte Dateiinhalt wird dir als Antwort bereitgestellt, bevor du deine Analyse fortsetzen kannst.
"""
return instructions + file_access_instructions
def get_default_agent_instructions() -> str:
"""
Gibt Standard-Anweisungen für einen Agenten zurück,
wenn keine spezifischen Anweisungen in der agents.json gefunden wurden.
wenn keine spezifischen Anweisungen verfügbar sind.
Diese Funktion gibt generische Anweisungen zurück, unabhängig vom Agententyp.
"""
return """
@ -114,117 +93,145 @@ def initialize_agents(agents: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]
agent_type = agent["type"]
agent_capabilities = agent.get("capabilities", "")
available_agents[agent_id] = {
"id": agent_id,
"name": agent_name,
"type": agent_type,
"capabilities": agent_capabilities,
"used": False
}
# Kopiere alle Felder vom Original-Agenten und füge used-Status hinzu
agent_data = agent.copy()
agent_data["used"] = False
available_agents[agent_id] = agent_data
# Log agent data for debugging
logger.debug(f"Initialized agent: {agent_name} (Type: {agent_type})")
if "instructions" in agent_data:
logger.debug(f"Agent {agent_name} has instructions of length: {len(agent_data['instructions'])}")
logger.info(f"Initialized {len(available_agents)} agents for workflow")
return available_agents
def get_moderator_prompt(available_agents: Dict[str, Dict[str, Any]]) -> str:
"""
Erstellt den Prompt für den Moderator des Multi-Agent-Systems
Args:
available_agents: Dictionary mit verfügbaren Agenten
Returns:
Der vollständige Moderator-Prompt
"""
# Basis-Prompt für den Moderator
moderator_prompt_base = """
Du bist der Moderator eines Multi-Agent-Systems. Deine Aufgabe ist es, die Zusammenarbeit zwischen verschiedenen spezialisierten Agenten zu koordinieren, um die Anfrage des Benutzers bestmöglich zu erfüllen.
Du sollst:
1. Die Anfrage des Benutzers verstehen und analysieren
2. Den am besten geeigneten Agenten basierend auf seinen Fähigkeiten auswählen
3. Die Antworten der Agenten überwachen und bewerten
4. Falls nötig, weitere Agenten hinzuziehen, um die Anfrage vollständig zu bearbeiten
5. Den Workflow beenden, wenn die Anfrage vollständig erfüllt wurde
Als Moderator:
1. Wähle für jede Runde EINEN Agenten aus, der mit der spezifischen Aufgabe fortfahren soll
2. Wähle den Agenten basierend auf seinen Fähigkeiten und dem aktuellen Stand der Bearbeitung
3. Bewerte die bisherigen Ergebnisse kritisch - ist die Anfrage des Benutzers WIRKLICH beantwortet?
4. Vermeide es, einen Workflow als abgeschlossen zu betrachten, nur weil alle Agenten einmal verwendet wurden
Für jeden Schritt sollst du begründen, warum du einen bestimmten Agenten auswählst, und zusammenfassen, was bisher erreicht wurde.
WICHTIG: Beende den Workflow NUR, wenn die Benutzeranfrage vollständig und qualitativ hochwertig beantwortet wurde.
Ein Agent kann mehrfach zum Einsatz kommen, wenn weitere Arbeit nötig ist.
"""
# Dynamischer Teil - Verfügbare Agenten aus den tatsächlich vorhandenen Agenten
agents_description = "Verfügbare Agenten:\n"
for agent_id, agent in available_agents.items():
agents_description += f"- {agent['name']} (Typ: {agent['type']}): {agent['capabilities']}\n"
status = "✓ Bereits verwendet" if agent["used"] else "✗ Noch nicht verwendet"
agents_description += f"- {agent['name']} (Typ: {agent['type']}): {agent['capabilities']}\n Status: {status}\n"
moderator_prompt_end = """
Beende den Workflow, wenn die Aufgabe erfüllt ist oder keine weiteren Agenten zur Bearbeitung beitragen können.
REGELN FÜR DEINE ANTWORT:
1. WÄHLE EINEN AGENTEN EXPLIZIT aus, indem du einen dieser Sätze verwendest:
- "Ich wähle den [Name des Agenten] aus, um..."
- "Ich empfehle, dass [Name des Agenten] jetzt..."
2. ODER BEENDE DEN WORKFLOW, nur wenn alle folgenden Bedingungen erfüllt sind:
- Die Anfrage des Benutzers wurde vollständig beantwortet
- Alle erforderlichen Informationen wurden gesammelt
- Eine hochwertige, strukturierte Antwort liegt vor
3. Wenn du den Workflow beenden möchtest, verwende explizit die Phrase:
"Der Workflow kann jetzt beendet werden, weil die Benutzeranfrage vollständig beantwortet wurde."
BEACHTE: Ein Agent kann mehrmals ausgewählt werden, wenn nötig. Die Nutzung aller verfügbaren Agenten ist KEINE Voraussetzung für die Beendigung des Workflows.
"""
# Kombiniere alle Teile
return moderator_prompt_base + "\n" + agents_description + "\n" + moderator_prompt_end
def create_agent_prompt(agent: Dict[str, Any], agent_instructions: str) -> Dict[str, str]:
"""
Erstellt den Prompt für einen spezifischen Agenten
# Create the agent description
agent_description = f"""
# Aufgabe
Du bist ein spezialisierter Agent vom Typ {agent['type']} mit dem Namen {agent['name']}.
# Fähigkeiten
{agent.get('capabilities', 'Keine spezifischen Fähigkeiten angegeben.')}
# Anweisungen
{agent_instructions}
Bitte analysiere den Chatverlauf und die Dateien und beantworte die Anfrage gemäß deiner Rolle.
Ausgabeformat:
[Agent: {agent['name']}]
Deine Antwort...""" # No trailing newline
# Make sure there's no trailing whitespace
content = agent_description.strip()
Args:
agent: Agent-Informationen
agent_instructions: Anweisungen für den Agententyp
Returns:
Der formatierte Agent-Prompt als Dictionary
"""
return {
"role": "system",
"content": f"""
# Aufgabe
Du bist ein spezialisierter Agent vom Typ {agent['type']} mit dem Namen {agent['name']}.
{agent_instructions}
Bitte analysiere den Chatverlauf und die Dateien und beantworte die Anfrage gemäß deiner Rolle.
Ausgabeformat:
[Agent: {agent['name']}]
Deine Antwort...
"""
"content": content
}
def find_next_agent(moderator_text: str, available_agents: Dict[str, Dict[str, Any]]) -> Optional[str]:
"""
Findet den vom Moderator ausgewählten Agenten anhand des Moderator-Textes
Args:
moderator_text: Die Antwort des Moderators
available_agents: Dictionary mit verfügbaren Agenten
Returns:
Die ID des ausgewählten Agenten oder None, wenn kein Agent gefunden wurde
"""
# Prüfe, ob der Workflow beendet werden soll
if any(phrase in moderator_text.lower() for phrase in [
"workflow beenden", "aufgabe erfüllt", "beende den workflow", "workflow abschließen"
]):
# Normalize the moderator text for case-insensitive matching
moderator_text_lower = moderator_text.lower()
# Check for explicit workflow completion signal
if "workflow kann jetzt beendet werden" in moderator_text_lower or "beende den workflow" in moderator_text_lower:
logger.info("Moderator decided to complete the workflow")
return "WORKFLOW_COMPLETE"
# Versuche, den ausgewählten Agenten aus dem Text zu extrahieren
# Look for explicit agent selection patterns
for pattern in ["ich wähle den ", "ich empfehle den ", "ich empfehle, dass "]:
if pattern in moderator_text_lower:
# Extract text after the pattern
text_after_pattern = moderator_text_lower.split(pattern)[1]
# Check each agent name against the extracted text
for agent_id, agent in available_agents.items():
agent_name_lower = agent["name"].lower()
# Check if the agent name appears right after the selection pattern
if text_after_pattern.startswith(agent_name_lower):
logger.info(f"Moderator explicitly selected agent: {agent['name']}")
return agent_id
# Direct name checking as fallback
for agent_id, agent in available_agents.items():
if agent["name"] in moderator_text or f"Agent {agent_id}" in moderator_text:
if agent["name"].lower() in moderator_text_lower:
logger.info(f"Moderator mentioned agent by name: {agent['name']}")
return agent_id
# Keine direkte Erwähnung gefunden - wähle den ersten nicht verwendeten Agenten
logger.info("No explicit agent selection found, using fallback selection")
# Fallback to the first unused agent
for agent_id, agent in available_agents.items():
if not agent["used"]:
logger.info(f"Selecting first unused agent: {agent['name']}")
return agent_id
# Wenn alle Agenten bereits verwendet wurden, wähle den Initialisierungs-Agenten
# If all agents are used, select the Initialisierung agent
for agent_id, agent in available_agents.items():
if agent["type"] == "initialisierung":
logger.info(f"All agents used, falling back to initialization agent: {agent['name']}")
return agent_id
# Als letztes Mittel wähle einfach den ersten Agenten
# Last resort: select the first available agent
if available_agents:
return list(available_agents.keys())[0]
first_agent_id = list(available_agents.keys())[0]
logger.info(f"Fallback to first agent: {available_agents[first_agent_id]['name']}")
return first_agent_id
# Keine Agenten vorhanden oder keine Auswahl möglich
# No agents available
logger.warning("No agents available to select")
return None