import os import logging from typing import Dict, Any, List, Optional, Union import importlib from connector_db_json import JSONDatabaseConnector logger = logging.getLogger(__name__) class LucyDOMInterface: """ Interface zur LucyDOM-Datenbank. Verwendet den JSON-Konnektor für den Datenzugriff. """ def __init__(self, mandate_id: int, user_id: int): """ Initialisiert das LucyDOM-Interface mit Mandanten- und Benutzerkontext. Args: mandate_id: ID des aktuellen Mandanten user_id: ID des aktuellen Benutzers """ self.mandate_id = mandate_id self.user_id = user_id # Datenverzeichnis self.data_folder = "_database_lucydom" os.makedirs(self.data_folder, exist_ok=True) # Datenmodell-Modul importieren try: self.model_module = importlib.import_module("modules.lucydom_model") logger.info("lucydom_model erfolgreich importiert") except ImportError as e: logger.error(f"Fehler beim Importieren von lucydom_model: {e}") raise # Konnektor erstellen self.db = JSONDatabaseConnector( db_folder=self.data_folder, mandate_id=mandate_id, user_id=user_id ) # Datenbank initialisieren, falls nötig self._initialize_database() def _initialize_database(self): """ Initialisiert die Datenbank mit minimalen Objekten, falls sie noch nicht existiert. """ # Prüfe, ob die Tabelle "workspaces" existiert workspaces = self.db.get_recordset("workspaces") # Wenn keine Workspaces existieren, erstelle den Default Workspace if not workspaces: logger.info("Erstelle Default Workspace") # Erstelle den Default Workspace default_workspace = { "mandate_id": self.mandate_id, "user_id": self.user_id, "name": "Default Workspace", "created_at": self._get_current_timestamp() } created_workspace = self.db.record_create("workspaces", default_workspace) logger.info(f"Default Workspace wurde erstellt mit ID {created_workspace['id']}") def _get_current_timestamp(self) -> str: """Gibt den aktuellen Zeitstempel im ISO-Format zurück""" from datetime import datetime return datetime.now().isoformat() def get_initial_id(self, table: str) -> Optional[int]: """ Gibt die initiale ID für eine Tabelle zurück. Args: table: Name der Tabelle Returns: Die initiale ID oder None, wenn nicht vorhanden """ return self.db.get_initial_id(table) # Workspace-Methoden def get_all_workspaces(self) -> List[Dict[str, Any]]: """Gibt alle Workspaces des aktuellen Mandanten zurück""" return self.db.get_recordset("workspaces") def get_workspace(self, workspace_id: int) -> Optional[Dict[str, Any]]: """Gibt einen Workspace anhand seiner ID zurück""" workspaces = self.db.get_recordset("workspaces", record_filter={"id": workspace_id}) if workspaces: return workspaces[0] return None def create_workspace(self, name: str) -> Dict[str, Any]: """Erstellt einen neuen Workspace""" workspace_data = { "mandate_id": self.mandate_id, "user_id": self.user_id, "name": name, "created_at": self._get_current_timestamp() } return self.db.record_create("workspaces", workspace_data) def update_workspace(self, workspace_id: int, name: str) -> Dict[str, Any]: """ Aktualisiert einen vorhandenen Workspace Args: workspace_id: ID des zu aktualisierenden Workspaces name: Neuer Name des Workspaces Returns: Das aktualisierte Workspace-Objekt """ # Prüfen, ob der Workspace existiert workspace = self.get_workspace(workspace_id) if not workspace: return None # Daten für die Aktualisierung vorbereiten workspace_data = { "name": name } # Workspace aktualisieren return self.db.record_modify("workspaces", workspace_id, workspace_data) def delete_workspace(self, workspace_id: int) -> bool: """ Löscht einen Workspace aus der Datenbank Returns: True, wenn der Workspace erfolgreich gelöscht wurde, sonst False """ # Prüfen, ob es der initiale Workspace ist initial_workspace_id = self.get_initial_id("workspaces") if initial_workspace_id is not None and workspace_id == initial_workspace_id: logger.warning("Versuch, den Default Workspace zu löschen, wurde verhindert") return False return self.db.record_delete("workspaces", workspace_id) # Agent-Methoden def get_all_agents(self) -> List[Dict[str, Any]]: """Gibt alle Agenten des aktuellen Mandanten zurück""" return self.db.get_recordset("agents") def get_agents_by_workspace(self, workspace_id: int) -> List[Dict[str, Any]]: """Gibt alle Agenten eines Workspaces zurück""" return self.db.get_recordset("agents", record_filter={"workspace_id": workspace_id}) def get_agent(self, agent_id: int) -> Optional[Dict[str, Any]]: """Gibt einen Agenten anhand seiner ID zurück""" agents = self.db.get_recordset("agents", record_filter={"id": agent_id}) if agents: return agents[0] return None def create_agent(self, name: str, agent_type: str, workspace_id: int, capabilities: str = None, description: str = None) -> Dict[str, Any]: """Erstellt einen neuen Agenten""" agent_data = { "mandate_id": self.mandate_id, "user_id": self.user_id, "name": name, "type": agent_type, "workspace_id": workspace_id, "capabilities": capabilities, "description": description } return self.db.record_create("agents", agent_data) def update_agent(self, agent_id: int, name: str, agent_type: str, workspace_id: int, capabilities: str = None, description: str = None) -> Dict[str, Any]: """ Aktualisiert einen vorhandenen Agenten Args: agent_id: ID des zu aktualisierenden Agenten name: Neuer Name des Agenten agent_type: Neuer Typ des Agenten workspace_id: ID des Workspaces, zu dem der Agent gehört capabilities: Fähigkeiten des Agenten description: Beschreibung des Agenten Returns: Das aktualisierte Agenten-Objekt """ # Prüfen, ob der Agent existiert agent = self.get_agent(agent_id) if not agent: return None # Daten für die Aktualisierung vorbereiten agent_data = { "name": name, "type": agent_type, "workspace_id": workspace_id, "capabilities": capabilities if capabilities is not None else agent.get("capabilities"), "description": description if description is not None else agent.get("description") } # Agent aktualisieren updated_agent = self.db.record_modify("agents", agent_id, agent_data) return updated_agent def delete_agent(self, agent_id: int) -> bool: """ Löscht einen Agenten aus der Datenbank Args: agent_id: ID des zu löschenden Agenten Returns: True, wenn der Agent erfolgreich gelöscht wurde, sonst False """ return self.db.record_delete("agents", agent_id) # Datei-Methoden def get_all_files(self) -> List[Dict[str, Any]]: """Gibt alle Dateien des aktuellen Mandanten zurück""" return self.db.get_recordset("files") def get_file(self, file_id: int) -> Optional[Dict[str, Any]]: """Gibt eine Datei anhand ihrer ID zurück""" files = self.db.get_recordset("files", record_filter={"id": file_id}) if files: return files[0] return None def create_file(self, name: str, file_type: str, content_type: str = None, size: int = None, path: str = None) -> Dict[str, Any]: """Erstellt einen neuen Dateieintrag""" file_data = { "mandate_id": self.mandate_id, "user_id": self.user_id, "name": name, "type": file_type, "content_type": content_type, "size": size, "path": path, "upload_date": self._get_current_timestamp() } return self.db.record_create("files", file_data) def update_file(self, file_id: int, name: str = None, file_type: str = None, content_type: str = None, size: int = None, path: str = None) -> Dict[str, Any]: """ Aktualisiert eine vorhandene Datei Args: file_id: ID der zu aktualisierenden Datei name: Neuer Name der Datei file_type: Neuer Dateityp content_type: Neuer Content-Type size: Neue Dateigröße path: Neuer Dateipfad Returns: Das aktualisierte Datei-Objekt """ # Prüfen, ob die Datei existiert file = self.get_file(file_id) if not file: return None # Daten für die Aktualisierung vorbereiten file_data = {} if name is not None: file_data["name"] = name if file_type is not None: file_data["type"] = file_type if content_type is not None: file_data["content_type"] = content_type if size is not None: file_data["size"] = size if path is not None: file_data["path"] = path # Datei aktualisieren return self.db.record_modify("files", file_id, file_data) def delete_file(self, file_id: int) -> bool: """Löscht eine Datei aus der Datenbank""" return self.db.record_delete("files", file_id) # Prompt-Methoden def get_all_prompts(self) -> List[Dict[str, Any]]: """Gibt alle Prompts des aktuellen Mandanten zurück""" return self.db.get_recordset("prompts") def get_prompts_by_workspace(self, workspace_id: int) -> List[Dict[str, Any]]: """Gibt alle Prompts eines Workspaces zurück""" return self.db.get_recordset("prompts", record_filter={"workspace_id": workspace_id}) def get_prompt(self, prompt_id: int) -> Optional[Dict[str, Any]]: """Gibt einen Prompt anhand seiner ID zurück""" prompts = self.db.get_recordset("prompts", record_filter={"id": prompt_id}) if prompts: return prompts[0] return None def create_prompt(self, content: str, workspace_id: int) -> Dict[str, Any]: """Erstellt einen neuen Prompt""" prompt_data = { "mandate_id": self.mandate_id, "user_id": self.user_id, "content": content, "workspace_id": workspace_id, "created_at": self._get_current_timestamp() } return self.db.record_create("prompts", prompt_data) def update_prompt(self, prompt_id: int, content: str = None, workspace_id: int = None) -> Dict[str, Any]: """ Aktualisiert einen vorhandenen Prompt Args: prompt_id: ID des zu aktualisierenden Prompts content: Neuer Inhalt des Prompts workspace_id: Neue ID des Workspaces, zu dem der Prompt gehört Returns: Das aktualisierte Prompt-Objekt """ # Prüfen, ob der Prompt existiert prompt = self.get_prompt(prompt_id) if not prompt: return None # Daten für die Aktualisierung vorbereiten prompt_data = {} if content is not None: prompt_data["content"] = content if workspace_id is not None: prompt_data["workspace_id"] = workspace_id # Prompt aktualisieren return self.db.record_modify("prompts", prompt_id, prompt_data) def delete_prompt(self, prompt_id: int) -> bool: """ Löscht einen Prompt aus der Datenbank Args: prompt_id: ID des zu löschenden Prompts Returns: True, wenn der Prompt erfolgreich gelöscht wurde, sonst False """ return self.db.record_delete("prompts", prompt_id) # Singleton-Factory für LucyDOMInterface-Instanzen pro Kontext _lucydom_interfaces = {} def get_lucydom_interface(mandate_id: int = 0, user_id: int = 0) -> LucyDOMInterface: """ Gibt eine LucyDOMInterface-Instanz für den angegebenen Kontext zurück. Wiederverwendet bestehende Instanzen. """ context_key = f"{mandate_id}_{user_id}" if context_key not in _lucydom_interfaces: _lucydom_interfaces[context_key] = LucyDOMInterface(mandate_id, user_id) return _lucydom_interfaces[context_key] # Init get_lucydom_interface()