237 lines
No EOL
8.3 KiB
Python
237 lines
No EOL
8.3 KiB
Python
import os
|
|
import logging
|
|
from typing import Dict, Any, List, Optional, Union
|
|
import importlib
|
|
from connector_db_json import JSONDatabaseConnector
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class LucyDOMInterface:
|
|
"""
|
|
Interface zur LucyDOM-Datenbank.
|
|
Verwendet den JSON-Konnektor für den Datenzugriff.
|
|
"""
|
|
|
|
def __init__(self, mandate_id: int, user_id: int):
|
|
"""
|
|
Initialisiert das LucyDOM-Interface mit Mandanten- und Benutzerkontext.
|
|
|
|
Args:
|
|
mandate_id: ID des aktuellen Mandanten
|
|
user_id: ID des aktuellen Benutzers
|
|
"""
|
|
self.mandate_id = mandate_id
|
|
self.user_id = user_id
|
|
|
|
# Datenverzeichnis
|
|
self.data_folder = "data_lucydom"
|
|
os.makedirs(self.data_folder, exist_ok=True)
|
|
|
|
# Datenmodell-Modul importieren
|
|
try:
|
|
self.model_module = importlib.import_module("model_lucydom")
|
|
logger.info("model_lucydom erfolgreich importiert")
|
|
except ImportError as e:
|
|
logger.error(f"Fehler beim Importieren von model_lucydom: {e}")
|
|
raise
|
|
|
|
# Konnektor erstellen
|
|
self.db = JSONDatabaseConnector(
|
|
db_folder=self.data_folder,
|
|
mandate_id=mandate_id,
|
|
user_id=user_id
|
|
)
|
|
|
|
# Datenbank initialisieren, falls nötig
|
|
self._initialize_database()
|
|
|
|
def _initialize_database(self):
|
|
"""
|
|
Initialisiert die Datenbank mit minimalen Objekten,
|
|
falls sie noch nicht existiert.
|
|
"""
|
|
# Prüfe, ob die Tabelle "workspaces" existiert
|
|
workspaces = self.db.get_recordset("workspaces")
|
|
|
|
# Wenn keine Workspaces existieren, erstelle den Default Workspace
|
|
if not workspaces:
|
|
logger.info("Erstelle Default Workspace")
|
|
|
|
# Erstelle den Default Workspace
|
|
default_workspace = {
|
|
"id": 1,
|
|
"mandate_id": self.mandate_id,
|
|
"user_id": self.user_id,
|
|
"name": "Default Workspace",
|
|
"created_at": self._get_current_timestamp()
|
|
}
|
|
|
|
self.db.record_create("workspaces", default_workspace)
|
|
logger.info("Default Workspace wurde erstellt")
|
|
|
|
def _get_current_timestamp(self) -> str:
|
|
"""Gibt den aktuellen Zeitstempel im ISO-Format zurück"""
|
|
from datetime import datetime
|
|
return datetime.now().isoformat()
|
|
|
|
# Workspace-Methoden
|
|
|
|
def get_all_workspaces(self) -> List[Dict[str, Any]]:
|
|
"""Gibt alle Workspaces des aktuellen Mandanten zurück"""
|
|
return self.db.get_recordset("workspaces")
|
|
|
|
def get_workspace(self, workspace_id: int) -> Optional[Dict[str, Any]]:
|
|
"""Gibt einen Workspace anhand seiner ID zurück"""
|
|
workspaces = self.db.get_recordset("workspaces", record_filter={"id": workspace_id})
|
|
if workspaces:
|
|
return workspaces[0]
|
|
return None
|
|
|
|
def create_workspace(self, name: str) -> Dict[str, Any]:
|
|
"""Erstellt einen neuen Workspace"""
|
|
# Bestimme die nächste ID
|
|
workspaces = self.db.get_recordset("workspaces")
|
|
next_id = 1
|
|
if workspaces:
|
|
next_id = max(workspace["id"] for workspace in workspaces) + 1
|
|
|
|
workspace_data = {
|
|
"id": next_id,
|
|
"mandate_id": self.mandate_id,
|
|
"user_id": self.user_id,
|
|
"name": name,
|
|
"created_at": self._get_current_timestamp()
|
|
}
|
|
|
|
return self.db.record_create("workspaces", workspace_data)
|
|
|
|
# Agent-Methoden
|
|
|
|
def get_all_agents(self) -> List[Dict[str, Any]]:
|
|
"""Gibt alle Agenten des aktuellen Mandanten zurück"""
|
|
return self.db.get_recordset("agents")
|
|
|
|
def get_agents_by_workspace(self, workspace_id: int) -> List[Dict[str, Any]]:
|
|
"""Gibt alle Agenten eines Workspaces zurück"""
|
|
return self.db.get_recordset("agents", record_filter={"workspace_id": workspace_id})
|
|
|
|
def get_agent(self, agent_id: int) -> Optional[Dict[str, Any]]:
|
|
"""Gibt einen Agenten anhand seiner ID zurück"""
|
|
agents = self.db.get_recordset("agents", record_filter={"id": agent_id})
|
|
if agents:
|
|
return agents[0]
|
|
return None
|
|
|
|
def create_agent(self, name: str, agent_type: str, workspace_id: int,
|
|
capabilities: List[str] = None, description: str = None) -> Dict[str, Any]:
|
|
"""Erstellt einen neuen Agenten"""
|
|
# Bestimme die nächste ID
|
|
agents = self.db.get_recordset("agents")
|
|
next_id = 1
|
|
if agents:
|
|
next_id = max(agent["id"] for agent in agents) + 1
|
|
|
|
agent_data = {
|
|
"id": next_id,
|
|
"mandate_id": self.mandate_id,
|
|
"user_id": self.user_id,
|
|
"name": name,
|
|
"type": agent_type,
|
|
"workspace_id": workspace_id,
|
|
"capabilities": capabilities or [],
|
|
"description": description
|
|
}
|
|
|
|
return self.db.record_create("agents", agent_data)
|
|
|
|
# Datei-Methoden
|
|
|
|
def get_all_files(self) -> List[Dict[str, Any]]:
|
|
"""Gibt alle Dateien des aktuellen Mandanten zurück"""
|
|
return self.db.get_recordset("files")
|
|
|
|
def get_file(self, file_id: int) -> Optional[Dict[str, Any]]:
|
|
"""Gibt eine Datei anhand ihrer ID zurück"""
|
|
files = self.db.get_recordset("files", record_filter={"id": file_id})
|
|
if files:
|
|
return files[0]
|
|
return None
|
|
|
|
def create_file(self, name: str, file_type: str, content_type: str = None,
|
|
size: int = None, path: str = None) -> Dict[str, Any]:
|
|
"""Erstellt einen neuen Dateieintrag"""
|
|
# Bestimme die nächste ID
|
|
files = self.db.get_recordset("files")
|
|
next_id = 1
|
|
if files:
|
|
next_id = max(file["id"] for file in files) + 1
|
|
|
|
file_data = {
|
|
"id": next_id,
|
|
"mandate_id": self.mandate_id,
|
|
"user_id": self.user_id,
|
|
"name": name,
|
|
"type": file_type,
|
|
"content_type": content_type,
|
|
"size": size,
|
|
"path": path,
|
|
"upload_date": self._get_current_timestamp()
|
|
}
|
|
|
|
return self.db.record_create("files", file_data)
|
|
|
|
def delete_file(self, file_id: int) -> bool:
|
|
"""Löscht eine Datei aus der Datenbank"""
|
|
return self.db.record_delete("files", file_id)
|
|
|
|
# Prompt-Methoden
|
|
|
|
def get_all_prompts(self) -> List[Dict[str, Any]]:
|
|
"""Gibt alle Prompts des aktuellen Mandanten zurück"""
|
|
return self.db.get_recordset("prompts")
|
|
|
|
def get_prompts_by_workspace(self, workspace_id: int) -> List[Dict[str, Any]]:
|
|
"""Gibt alle Prompts eines Workspaces zurück"""
|
|
return self.db.get_recordset("prompts", record_filter={"workspace_id": workspace_id})
|
|
|
|
def get_prompt(self, prompt_id: int) -> Optional[Dict[str, Any]]:
|
|
"""Gibt einen Prompt anhand seiner ID zurück"""
|
|
prompts = self.db.get_recordset("prompts", record_filter={"id": prompt_id})
|
|
if prompts:
|
|
return prompts[0]
|
|
return None
|
|
|
|
def create_prompt(self, content: str, workspace_id: int) -> Dict[str, Any]:
|
|
"""Erstellt einen neuen Prompt"""
|
|
# Bestimme die nächste ID
|
|
prompts = self.db.get_recordset("prompts")
|
|
next_id = 1
|
|
if prompts:
|
|
next_id = max(prompt["id"] for prompt in prompts) + 1
|
|
|
|
prompt_data = {
|
|
"id": next_id,
|
|
"mandate_id": self.mandate_id,
|
|
"user_id": self.user_id,
|
|
"content": content,
|
|
"workspace_id": workspace_id,
|
|
"created_at": self._get_current_timestamp()
|
|
}
|
|
|
|
return self.db.record_create("prompts", prompt_data)
|
|
|
|
|
|
# Singleton-Factory für LucyDOMInterface-Instanzen pro Kontext
|
|
_lucydom_interfaces = {}
|
|
|
|
def get_lucydom_interface(mandate_id: int, user_id: int) -> LucyDOMInterface:
|
|
"""
|
|
Gibt eine LucyDOMInterface-Instanz für den angegebenen Kontext zurück.
|
|
Wiederverwendet bestehende Instanzen.
|
|
"""
|
|
context_key = f"{mandate_id}_{user_id}"
|
|
if context_key not in _lucydom_interfaces:
|
|
_lucydom_interfaces[context_key] = LucyDOMInterface(mandate_id, user_id)
|
|
return _lucydom_interfaces[context_key] |