540 lines
No EOL
18 KiB
Python
540 lines
No EOL
18 KiB
Python
from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, Body, Query, status
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import JSONResponse, Response
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
|
|
|
|
import uvicorn
|
|
from typing import List, Dict, Any, Optional
|
|
import uuid
|
|
import os
|
|
import json
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
|
|
# Import interfaces
|
|
from interface_gateway import get_gateway_interface
|
|
from interface_lucydom import get_lucydom_interface
|
|
from interface_agentservice import get_agent_service
|
|
|
|
# Import auth module
|
|
from auth import (
|
|
create_access_token,
|
|
get_current_user,
|
|
get_current_active_user,
|
|
get_user_context,
|
|
ACCESS_TOKEN_EXPIRE_MINUTES
|
|
)
|
|
|
|
# Import models (restructured)
|
|
from model_gateway import User, UserInDB, Token, Mandate
|
|
from model_lucydom import (
|
|
Workspace,
|
|
Agent,
|
|
DataObject,
|
|
Prompt,
|
|
WorkflowRequest,
|
|
WorkflowResponse,
|
|
LogEntry,
|
|
Result
|
|
)
|
|
|
|
|
|
# Konfiguration des Loggers
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
handlers=[logging.StreamHandler()]
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
app = FastAPI(title="PowerOn | Data Platform API", description="Backend-API für die Multi-Agent Platform von ValueOn AG")
|
|
|
|
# CORS-Konfiguration für Frontend-Anfragen
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["http://localhost:8080","https://poweron-gateway-fsahaxgbfee8djea.germanywestcentral-01.azurewebsites.net"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
# API - ENDPUNKTE
|
|
|
|
# Statischer Folder für Frontend
|
|
os.makedirs(os.path.join(os.getcwd(), "static"), exist_ok=True)
|
|
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
|
|
# Generelle Elements
|
|
@app.get("/", tags=["General"])
|
|
async def root():
|
|
"""API-Statusendpunkt"""
|
|
return {"status": "online", "message": "Data Platform API ist aktiv"}
|
|
|
|
@app.get("/api/test", tags=["General"])
|
|
async def get_test():
|
|
return "OK 1.0"
|
|
|
|
# Token-Endpunkt für Login
|
|
@app.post("/token", response_model=Token)
|
|
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
|
|
# Gateway-Interface ohne Kontext initialisieren
|
|
gateway = get_gateway_interface()
|
|
|
|
# Benutzer authentifizieren
|
|
user = gateway.authenticate_user(form_data.username, form_data.password)
|
|
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Ungültiger Benutzername oder Passwort",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
# Token mit Mandanten-ID erstellen
|
|
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
access_token = create_access_token(
|
|
data={
|
|
"sub": user["username"],
|
|
"mandate_id": user["mandate_id"]
|
|
},
|
|
expires_delta=access_token_expires
|
|
)
|
|
|
|
return {"access_token": access_token, "token_type": "bearer"}
|
|
|
|
|
|
# Benutzerregistrierung
|
|
@app.post("/api/users/register", response_model=User)
|
|
async def register_user(user_data: dict = Body(...), current_user: Dict[str, Any] = Depends(get_current_active_user)):
|
|
"""Neuen Benutzer registrieren"""
|
|
# Nur Benutzer des gleichen Mandanten dürfen erstellt werden
|
|
mandate_id, user_id = await get_user_context(current_user)
|
|
|
|
# Gateway-Interface mit Benutzerkontext initialisieren
|
|
gateway = get_gateway_interface(mandate_id, user_id)
|
|
|
|
if "username" not in user_data or "password" not in user_data:
|
|
raise HTTPException(status_code=400, detail="Benutzername und Passwort erforderlich")
|
|
|
|
try:
|
|
new_user = gateway.create_user(
|
|
username=user_data["username"],
|
|
password=user_data["password"],
|
|
email=user_data.get("email"),
|
|
full_name=user_data.get("full_name"),
|
|
language=user_data.get("language", "de")
|
|
)
|
|
return new_user
|
|
except ValueError as e:
|
|
raise HTTPException(status_code=400, detail=str(e))
|
|
|
|
|
|
# Benutzerinfo abrufen
|
|
@app.get("/api/users/me", response_model=Dict[str, Any])
|
|
async def read_users_me(current_user: Dict[str, Any] = Depends(get_current_active_user)):
|
|
return current_user
|
|
|
|
|
|
# Workspace-Endpunkte
|
|
@app.get("/api/workspaces", tags=["Workspaces"], response_model=List[Dict[str, Any]])
|
|
async def get_workspaces(current_user: Dict[str, Any] = Depends(get_current_active_user)):
|
|
"""Alle verfügbaren Workspaces abrufen"""
|
|
mandate_id, user_id = await get_user_context(current_user)
|
|
|
|
# LucyDOM-Interface mit Benutzerkontext initialisieren
|
|
lucy_interface = get_lucydom_interface(mandate_id, user_id)
|
|
|
|
return lucy_interface.get_all_workspaces()
|
|
|
|
|
|
@app.get("/api/workspaces/{workspace_id}", tags=["Workspaces"], response_model=Dict[str, Any])
|
|
async def get_workspace(
|
|
workspace_id: int,
|
|
current_user: Dict[str, Any] = Depends(get_current_active_user)
|
|
):
|
|
"""Einen bestimmten Workspace mit allen Details abrufen"""
|
|
mandate_id, user_id = await get_user_context(current_user)
|
|
|
|
# LucyDOM-Interface mit Benutzerkontext initialisieren
|
|
lucy_interface = get_lucydom_interface(mandate_id, user_id)
|
|
|
|
workspace = lucy_interface.get_workspace(workspace_id)
|
|
if not workspace:
|
|
raise HTTPException(status_code=404, detail=f"Workspace mit ID {workspace_id} nicht gefunden")
|
|
|
|
return workspace
|
|
|
|
|
|
@app.post("/api/workspaces", tags=["Workspaces"], response_model=Dict[str, Any])
|
|
async def create_workspace(
|
|
workspace: Dict[str, Any] = Body(...),
|
|
current_user: Dict[str, Any] = Depends(get_current_active_user)
|
|
):
|
|
"""Einen neuen Workspace erstellen"""
|
|
mandate_id, user_id = await get_user_context(current_user)
|
|
|
|
# LucyDOM-Interface mit Benutzerkontext initialisieren
|
|
lucy_interface = get_lucydom_interface(mandate_id, user_id)
|
|
|
|
new_workspace = lucy_interface.create_workspace(name=workspace.get("name", "Neuer Workspace"))
|
|
return new_workspace
|
|
|
|
|
|
# Agenten-Endpunkte
|
|
@app.get("/api/agents", tags=["Agents"], response_model=List[Dict[str, Any]])
|
|
async def get_agents(
|
|
workspace_id: Optional[int] = Query(None),
|
|
current_user: Dict[str, Any] = Depends(get_current_active_user)
|
|
):
|
|
"""Alle Agenten oder Agenten eines bestimmten Workspaces abrufen"""
|
|
mandate_id, user_id = await get_user_context(current_user)
|
|
|
|
# LucyDOM-Interface mit Benutzerkontext initialisieren
|
|
lucy_interface = get_lucydom_interface(mandate_id, user_id)
|
|
|
|
if workspace_id:
|
|
return lucy_interface.get_agents_by_workspace(workspace_id)
|
|
|
|
return lucy_interface.get_all_agents()
|
|
|
|
|
|
@app.post("/api/agents", tags=["Agents"], response_model=Dict[str, Any])
|
|
async def create_agent(
|
|
agent: Dict[str, Any] = Body(...),
|
|
current_user: Dict[str, Any] = Depends(get_current_active_user)
|
|
):
|
|
"""Einen neuen Agenten erstellen"""
|
|
mandate_id, user_id = await get_user_context(current_user)
|
|
|
|
# LucyDOM-Interface mit Benutzerkontext initialisieren
|
|
lucy_interface = get_lucydom_interface(mandate_id, user_id)
|
|
|
|
workspace_id = agent.get("workspace_id")
|
|
|
|
if not workspace_id:
|
|
raise HTTPException(status_code=400, detail="workspace_id ist erforderlich")
|
|
|
|
# Workspace existiert?
|
|
workspace = lucy_interface.get_workspace(workspace_id)
|
|
if not workspace:
|
|
raise HTTPException(status_code=404, detail=f"Workspace mit ID {workspace_id} nicht gefunden")
|
|
|
|
new_agent = lucy_interface.create_agent(
|
|
name=agent.get("name", "Neuer Agent"),
|
|
agent_type=agent.get("type", "generic"),
|
|
workspace_id=workspace_id,
|
|
capabilities=agent.get("capabilities", []),
|
|
description=agent.get("description", "")
|
|
)
|
|
|
|
return new_agent
|
|
|
|
|
|
# Datei-Endpunkte
|
|
@app.get("/api/files", tags=["Files"], response_model=List[Dict[str, Any]])
|
|
async def get_files(current_user: Dict[str, Any] = Depends(get_current_active_user)):
|
|
"""Alle verfügbaren Dateien abrufen"""
|
|
mandate_id, user_id = await get_user_context(current_user)
|
|
|
|
# LucyDOM-Interface mit Benutzerkontext initialisieren
|
|
lucy_interface = get_lucydom_interface(mandate_id, user_id)
|
|
|
|
return lucy_interface.get_all_files()
|
|
|
|
|
|
@app.post("/api/files/upload", tags=["Files"])
|
|
async def upload_file(
|
|
file: UploadFile = File(...),
|
|
current_user: Dict[str, Any] = Depends(get_current_active_user)
|
|
):
|
|
"""Eine Datei hochladen"""
|
|
mandate_id, user_id = await get_user_context(current_user)
|
|
|
|
# LucyDOM-Interface mit Benutzerkontext initialisieren
|
|
lucy_interface = get_lucydom_interface(mandate_id, user_id)
|
|
|
|
try:
|
|
# Generiere eine eindeutige ID für die Datei
|
|
file_id = str(uuid.uuid4())
|
|
file_ext = os.path.splitext(file.filename)[1]
|
|
file_path = os.path.join(get_agent_service(mandate_id, user_id).upload_dir, f"{file_id}{file_ext}")
|
|
|
|
# Datei speichern
|
|
with open(file_path, "wb") as f:
|
|
content = await file.read()
|
|
f.write(content)
|
|
|
|
# Dateityp bestimmen
|
|
file_type = "image" if file.content_type and "image" in file.content_type else "document"
|
|
|
|
# In Datenbank speichern
|
|
new_file = lucy_interface.create_file(
|
|
name=file.filename,
|
|
file_type=file_type,
|
|
content_type=file.content_type,
|
|
size=os.path.getsize(file_path),
|
|
path=file_path
|
|
)
|
|
|
|
return {
|
|
"id": new_file["id"],
|
|
"name": new_file["name"],
|
|
"type": new_file["type"],
|
|
"size": f"{new_file['size'] / (1024 * 1024):.1f} MB",
|
|
"upload_date": new_file.get("upload_date")
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Hochladen der Datei: {e}")
|
|
raise HTTPException(status_code=500, detail=f"Fehler beim Hochladen der Datei: {str(e)}")
|
|
|
|
|
|
@app.delete("/api/files/{file_id}", tags=["Files"])
|
|
async def delete_file(
|
|
file_id: int,
|
|
current_user: Dict[str, Any] = Depends(get_current_active_user)
|
|
):
|
|
"""Löscht eine Datei"""
|
|
mandate_id, user_id = await get_user_context(current_user)
|
|
|
|
# LucyDOM-Interface mit Benutzerkontext initialisieren
|
|
lucy_interface = get_lucydom_interface(mandate_id, user_id)
|
|
|
|
try:
|
|
# Hole die Datei aus der Datenbank
|
|
file = lucy_interface.get_file(file_id)
|
|
if not file:
|
|
raise HTTPException(status_code=404, detail=f"Datei mit ID {file_id} nicht gefunden")
|
|
|
|
# Prüfe, ob die physische Datei existiert
|
|
if "path" in file and os.path.exists(file["path"]):
|
|
try:
|
|
# Versuche, die physische Datei zu löschen
|
|
os.remove(file["path"])
|
|
except Exception as e:
|
|
logger.warning(f"Konnte physische Datei nicht löschen: {e}")
|
|
|
|
# Lösche die Datei aus der Datenbank
|
|
success = lucy_interface.delete_file(file_id)
|
|
|
|
if not success:
|
|
raise HTTPException(status_code=500, detail="Fehler beim Löschen der Datei aus der Datenbank")
|
|
|
|
return {"success": True, "message": f"Datei '{file.get('name', 'unbekannt')}' wurde gelöscht"}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Löschen der Datei: {e}")
|
|
raise HTTPException(status_code=500, detail=f"Fehler beim Löschen der Datei: {str(e)}")
|
|
|
|
|
|
# Prompt-Endpunkte
|
|
@app.get("/api/prompts", tags=["Prompts"], response_model=List[Dict[str, Any]])
|
|
async def get_prompts(
|
|
workspace_id: Optional[int] = Query(None),
|
|
current_user: Dict[str, Any] = Depends(get_current_active_user)
|
|
):
|
|
"""Alle Prompts oder Prompts eines bestimmten Workspaces abrufen"""
|
|
mandate_id, user_id = await get_user_context(current_user)
|
|
|
|
# LucyDOM-Interface mit Benutzerkontext initialisieren
|
|
lucy_interface = get_lucydom_interface(mandate_id, user_id)
|
|
|
|
if workspace_id:
|
|
return lucy_interface.get_prompts_by_workspace(workspace_id)
|
|
|
|
return lucy_interface.get_all_prompts()
|
|
|
|
|
|
@app.post("/api/prompts", tags=["Prompts"], response_model=Dict[str, Any])
|
|
async def create_prompt(
|
|
prompt: Dict[str, Any] = Body(...),
|
|
current_user: Dict[str, Any] = Depends(get_current_active_user)
|
|
):
|
|
"""Einen neuen Prompt erstellen"""
|
|
mandate_id, user_id = await get_user_context(current_user)
|
|
|
|
# LucyDOM-Interface mit Benutzerkontext initialisieren
|
|
lucy_interface = get_lucydom_interface(mandate_id, user_id)
|
|
|
|
workspace_id = prompt.get("workspace_id")
|
|
|
|
if not workspace_id:
|
|
raise HTTPException(status_code=400, detail="workspace_id ist erforderlich")
|
|
|
|
# Workspace existiert?
|
|
workspace = lucy_interface.get_workspace(workspace_id)
|
|
if not workspace:
|
|
raise HTTPException(status_code=404, detail=f"Workspace mit ID {workspace_id} nicht gefunden")
|
|
|
|
new_prompt = lucy_interface.create_prompt(
|
|
content=prompt.get("content", ""),
|
|
workspace_id=workspace_id
|
|
)
|
|
|
|
return new_prompt
|
|
|
|
|
|
# Workflow-Endpunkte
|
|
@app.post("/api/workflow/run", tags=["Workflow"], response_model=Dict[str, Any])
|
|
async def run_workflow(
|
|
workflow_request: WorkflowRequest = Body(...),
|
|
current_user: Dict[str, Any] = Depends(get_current_active_user)
|
|
):
|
|
"""Führt einen Workflow mit den ausgewählten Agenten und Dateien aus"""
|
|
mandate_id, user_id = await get_user_context(current_user)
|
|
|
|
# LucyDOM-Interface mit Benutzerkontext initialisieren
|
|
lucy_interface = get_lucydom_interface(mandate_id, user_id)
|
|
|
|
# AgentService mit Benutzerkontext initialisieren
|
|
agent_service = get_agent_service(mandate_id, user_id)
|
|
|
|
workspace = lucy_interface.get_workspace(workflow_request.workspace_id)
|
|
if not workspace:
|
|
raise HTTPException(status_code=404, detail=f"Workspace mit ID {workflow_request.workspace_id} nicht gefunden")
|
|
|
|
# Prüfen, ob Dateien existieren
|
|
files = []
|
|
for file_id in workflow_request.files:
|
|
file = lucy_interface.get_file(file_id)
|
|
if not file:
|
|
raise HTTPException(status_code=404, detail=f"Datei mit ID {file_id} nicht gefunden")
|
|
files.append(file)
|
|
|
|
# Prüfen, ob Agenten existieren
|
|
agents = []
|
|
for agent_id in workflow_request.agents:
|
|
agent = lucy_interface.get_agent(agent_id)
|
|
if not agent:
|
|
raise HTTPException(status_code=404, detail=f"Agent mit ID {agent_id} nicht gefunden")
|
|
agents.append(agent)
|
|
|
|
# Workflow ID generieren
|
|
workflow_id = str(uuid.uuid4())
|
|
|
|
# Workflow starten (asynchron)
|
|
workflow_task = asyncio.create_task(
|
|
agent_service.execute_workflow(
|
|
workflow_id,
|
|
workflow_request.prompt,
|
|
agents,
|
|
files
|
|
)
|
|
)
|
|
|
|
# Sofort eine Antwort zurückgeben
|
|
return {
|
|
"workflow_id": workflow_id,
|
|
"status": "running",
|
|
"message": "Workflow wurde gestartet"
|
|
}
|
|
|
|
|
|
@app.get("/api/workflow/{workflow_id}/status", tags=["Workflow"])
|
|
async def get_workflow_status(
|
|
workflow_id: str,
|
|
current_user: Dict[str, Any] = Depends(get_current_active_user)
|
|
):
|
|
"""Status eines laufenden Workflows abrufen"""
|
|
mandate_id, user_id = await get_user_context(current_user)
|
|
|
|
# AgentService mit Benutzerkontext initialisieren
|
|
agent_service = get_agent_service(mandate_id, user_id)
|
|
|
|
status = agent_service.get_workflow_status(workflow_id)
|
|
if not status:
|
|
raise HTTPException(status_code=404, detail=f"Workflow mit ID {workflow_id} nicht gefunden")
|
|
|
|
return status
|
|
|
|
|
|
@app.get("/api/workflow/{workflow_id}/logs", tags=["Workflow"], response_model=List[Dict[str, Any]])
|
|
async def get_workflow_logs(
|
|
workflow_id: str,
|
|
current_user: Dict[str, Any] = Depends(get_current_active_user)
|
|
):
|
|
"""Protokolle eines Workflows abrufen"""
|
|
mandate_id, user_id = await get_user_context(current_user)
|
|
|
|
# AgentService mit Benutzerkontext initialisieren
|
|
agent_service = get_agent_service(mandate_id, user_id)
|
|
|
|
logs = agent_service.get_workflow_logs(workflow_id)
|
|
if logs is None:
|
|
raise HTTPException(status_code=404, detail=f"Workflow mit ID {workflow_id} nicht gefunden")
|
|
|
|
return logs
|
|
|
|
|
|
@app.get("/api/workflow/{workflow_id}/results", tags=["Workflow"], response_model=List[Dict[str, Any]])
|
|
async def get_workflow_results(
|
|
workflow_id: str,
|
|
current_user: Dict[str, Any] = Depends(get_current_active_user)
|
|
):
|
|
"""Ergebnisse eines Workflows abrufen"""
|
|
mandate_id, user_id = await get_user_context(current_user)
|
|
|
|
# AgentService mit Benutzerkontext initialisieren
|
|
agent_service = get_agent_service(mandate_id, user_id)
|
|
|
|
results = agent_service.get_workflow_results(workflow_id)
|
|
if results is None:
|
|
raise HTTPException(status_code=404, detail=f"Workflow mit ID {workflow_id} nicht gefunden")
|
|
|
|
return results
|
|
|
|
|
|
# Mandanten-Endpunkte (neu)
|
|
@app.get("/api/mandates", tags=["Mandates"], response_model=List[Dict[str, Any]])
|
|
async def get_mandates(current_user: Dict[str, Any] = Depends(get_current_active_user)):
|
|
"""Alle verfügbaren Mandanten abrufen (nur für Admin-Benutzer)"""
|
|
mandate_id, user_id = await get_user_context(current_user)
|
|
|
|
# Gateway-Interface mit Benutzerkontext initialisieren
|
|
gateway = get_gateway_interface(mandate_id, user_id)
|
|
|
|
# TODO: Hier sollte eine Berechtigungsprüfung erfolgen
|
|
|
|
return gateway.get_all_mandates()
|
|
|
|
|
|
@app.post("/api/mandates", tags=["Mandates"], response_model=Dict[str, Any])
|
|
async def create_mandate(
|
|
mandate: Dict[str, Any] = Body(...),
|
|
current_user: Dict[str, Any] = Depends(get_current_active_user)
|
|
):
|
|
"""Einen neuen Mandanten erstellen (nur für Admin-Benutzer)"""
|
|
mandate_id, user_id = await get_user_context(current_user)
|
|
|
|
# Gateway-Interface mit Benutzerkontext initialisieren
|
|
gateway = get_gateway_interface(mandate_id, user_id)
|
|
|
|
# TODO: Hier sollte eine Berechtigungsprüfung erfolgen
|
|
|
|
new_mandate = gateway.create_mandate(
|
|
name=mandate.get("name", "Neuer Mandant"),
|
|
language=mandate.get("language", "de")
|
|
)
|
|
|
|
return new_mandate
|
|
|
|
|
|
# Event handler beim Herunterfahren
|
|
@app.on_event("shutdown")
|
|
async def shutdown_event():
|
|
"""Führt Aufräumarbeiten beim Herunterfahren der Anwendung durch"""
|
|
# Holen des AgentService ohne Kontext (für Aufräumarbeiten)
|
|
agent_service = get_agent_service()
|
|
|
|
# HTTP-Client des AgentService schließen
|
|
await agent_service.close()
|
|
|
|
logger.info("Anwendung wurde heruntergefahren")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True) |