gateway/backend/app.py
2025-03-15 19:53:40 +01:00

312 lines
No EOL
10 KiB
Python

import uvicorn
from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, Body, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, Response
from fastapi.staticfiles import StaticFiles
from typing import List, Dict, Any, Optional
import uuid
import os
import json
import asyncio
from datetime import datetime
import logging
from models import (
Workspace,
Agent,
DataObject,
Prompt,
WorkflowRequest,
WorkflowResponse,
LogEntry,
Result
)
from database import get_db, Database
from agent_service import AgentService, get_agent_service
# Konfiguration des Loggers
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
app = FastAPI(title="Data Platform API", description="Backend-API für die Multi-Agent Datenplattform")
# CORS-Konfiguration für Frontend-Anfragen
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In Produktion einschränken
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Pfad zum Webparts-Verzeichnis
WEBPARTS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "webparts")
# Verzeichnis für hochgeladene Dateien erstellen
UPLOAD_DIR = os.path.join(os.getcwd(), "uploads")
os.makedirs(UPLOAD_DIR, exist_ok=True)
# Komponenten des Frontends
app.mount("/webparts", StaticFiles(directory="webparts"), name="webparts")
@app.get("/", tags=["General"])
async def root():
"""API-Statusendpunkt"""
return {"status": "online", "message": "Data Platform API ist aktiv"}
# Test Element
@app.get("/api/test", tags=["Test"])
async def get_test():
return "OK 1.0"
# Workspace-Endpunkte
@app.get("/api/workspaces", tags=["Workspaces"], response_model=List[Workspace])
async def get_workspaces(db: Database = Depends(get_db)):
"""Alle verfügbaren Workspaces abrufen"""
return db.get_all_workspaces()
@app.get("/api/workspaces/{workspace_id}", tags=["Workspaces"], response_model=Workspace)
async def get_workspace(workspace_id: str, db: Database = Depends(get_db)):
"""Einen bestimmten Workspace mit allen Details abrufen"""
workspace = db.get_workspace(workspace_id)
if not workspace:
raise HTTPException(status_code=404, detail=f"Workspace mit ID {workspace_id} nicht gefunden")
return workspace
@app.post("/api/workspaces", tags=["Workspaces"], response_model=Workspace)
async def create_workspace(workspace: Dict[str, Any] = Body(...), db: Database = Depends(get_db)):
"""Einen neuen Workspace erstellen"""
workspace_id = str(uuid.uuid4())
workspace_data = {
"id": workspace_id,
"name": workspace.get("name", "Neuer Workspace"),
"created_at": datetime.now().isoformat(),
"prompts": [],
"agents": [],
"dataObjectReferences": []
}
new_workspace = db.create_workspace(workspace_data)
return new_workspace
# Agenten-Endpunkte
@app.get("/api/agents", tags=["Agents"], response_model=List[Agent])
async def get_agents(workspace_id: Optional[str] = Query(None), db: Database = Depends(get_db)):
"""Alle Agenten oder Agenten eines bestimmten Workspaces abrufen"""
if workspace_id:
return db.get_agents_by_workspace(workspace_id)
return db.get_all_agents()
@app.post("/api/agents", tags=["Agents"], response_model=Agent)
async def create_agent(
agent: Dict[str, Any] = Body(...),
db: Database = Depends(get_db)
):
"""Einen neuen Agenten erstellen"""
agent_id = str(uuid.uuid4())
workspace_id = agent.get("workspace_id")
if not workspace_id:
raise HTTPException(status_code=400, detail="workspace_id ist erforderlich")
# Workspace existiert?
workspace = db.get_workspace(workspace_id)
if not workspace:
raise HTTPException(status_code=404, detail=f"Workspace mit ID {workspace_id} nicht gefunden")
agent_data = {
"id": agent_id,
"name": agent.get("name", "Neuer Agent"),
"type": agent.get("type", "generic"),
"capabilities": agent.get("capabilities", []),
"description": agent.get("description", "")
}
new_agent = db.create_agent(agent_data, workspace_id)
return new_agent
# Datei-Endpunkte
@app.get("/api/files", tags=["Files"], response_model=List[DataObject])
async def get_files(db: Database = Depends(get_db)):
"""Alle verfügbaren Dateien abrufen"""
return db.get_all_files()
@app.post("/api/files/upload", tags=["Files"])
async def upload_file(
file: UploadFile = File(...),
db: Database = Depends(get_db)
):
"""Eine Datei hochladen"""
try:
file_id = str(uuid.uuid4())
file_ext = os.path.splitext(file.filename)[1]
file_path = os.path.join(UPLOAD_DIR, f"{file_id}{file_ext}")
# Datei speichern
with open(file_path, "wb") as f:
content = await file.read()
f.write(content)
# Dateityp bestimmen
file_type = "image" if file.content_type and "image" in file.content_type else "document"
# In Datenbank speichern
file_data = {
"id": file_id,
"name": file.filename,
"type": file_type,
"path": file_path,
"content_type": file.content_type,
"size": os.path.getsize(file_path),
"upload_date": datetime.now().isoformat()
}
new_file = db.create_file(file_data)
return {
"id": new_file["id"],
"name": new_file["name"],
"type": new_file["type"],
"size": f"{new_file['size'] / (1024 * 1024):.1f} MB",
"upload_date": new_file["upload_date"]
}
except Exception as e:
logger.error(f"Fehler beim Hochladen der Datei: {e}")
raise HTTPException(status_code=500, detail=f"Fehler beim Hochladen der Datei: {str(e)}")
# Prompt-Endpunkte
@app.get("/api/prompts", tags=["Prompts"], response_model=List[Prompt])
async def get_prompts(workspace_id: Optional[str] = Query(None), db: Database = Depends(get_db)):
"""Alle Prompts oder Prompts eines bestimmten Workspaces abrufen"""
if workspace_id:
return db.get_prompts_by_workspace(workspace_id)
return db.get_all_prompts()
@app.post("/api/prompts", tags=["Prompts"], response_model=Prompt)
async def create_prompt(
prompt: Dict[str, Any] = Body(...),
db: Database = Depends(get_db)
):
"""Einen neuen Prompt erstellen"""
prompt_id = str(uuid.uuid4())
workspace_id = prompt.get("workspace_id")
if not workspace_id:
raise HTTPException(status_code=400, detail="workspace_id ist erforderlich")
# Workspace existiert?
workspace = db.get_workspace(workspace_id)
if not workspace:
raise HTTPException(status_code=404, detail=f"Workspace mit ID {workspace_id} nicht gefunden")
prompt_data = {
"id": prompt_id,
"content": prompt.get("content", ""),
"created_at": datetime.now().isoformat()
}
new_prompt = db.create_prompt(prompt_data, workspace_id)
return new_prompt
# Workflow-Endpunkte
@app.post("/api/workflow/run", tags=["Workflow"], response_model=WorkflowResponse)
async def run_workflow(
workflow_request: WorkflowRequest,
db: Database = Depends(get_db),
agent_service: AgentService = Depends(get_agent_service)
):
"""Führt einen Workflow mit den ausgewählten Agenten und Dateien aus"""
workspace = db.get_workspace(workflow_request.workspace_id)
if not workspace:
raise HTTPException(status_code=404, detail=f"Workspace mit ID {workflow_request.workspace_id} nicht gefunden")
# Prüfen, ob Dateien existieren
files = []
for file_id in workflow_request.files:
file = db.get_file(file_id)
if not file:
raise HTTPException(status_code=404, detail=f"Datei mit ID {file_id} nicht gefunden")
files.append(file)
# Prüfen, ob Agenten existieren
agents = []
for agent_id in workflow_request.agents:
agent = db.get_agent(agent_id)
if not agent:
raise HTTPException(status_code=404, detail=f"Agent mit ID {agent_id} nicht gefunden")
agents.append(agent)
# Workflow ID generieren
workflow_id = str(uuid.uuid4())
# Workflow starten (asynchron)
workflow_task = asyncio.create_task(
agent_service.execute_workflow(
workflow_id,
workflow_request.prompt,
agents,
files
)
)
# Sofort eine Antwort zurückgeben
return WorkflowResponse(
workflow_id=workflow_id,
status="running",
message="Workflow wurde gestartet"
)
@app.get("/api/workflow/{workflow_id}/status", tags=["Workflow"])
async def get_workflow_status(
workflow_id: str,
agent_service: AgentService = Depends(get_agent_service)
):
"""Status eines laufenden Workflows abrufen"""
status = agent_service.get_workflow_status(workflow_id)
if not status:
raise HTTPException(status_code=404, detail=f"Workflow mit ID {workflow_id} nicht gefunden")
return status
@app.get("/api/workflow/{workflow_id}/logs", tags=["Workflow"], response_model=List[LogEntry])
async def get_workflow_logs(
workflow_id: str,
agent_service: AgentService = Depends(get_agent_service)
):
"""Protokolle eines Workflows abrufen"""
logs = agent_service.get_workflow_logs(workflow_id)
if logs is None:
raise HTTPException(status_code=404, detail=f"Workflow mit ID {workflow_id} nicht gefunden")
return logs
@app.get("/api/workflow/{workflow_id}/results", tags=["Workflow"], response_model=List[Result])
async def get_workflow_results(
workflow_id: str,
agent_service: AgentService = Depends(get_agent_service)
):
"""Ergebnisse eines Workflows abrufen"""
results = agent_service.get_workflow_results(workflow_id)
if results is None:
raise HTTPException(status_code=404, detail=f"Workflow mit ID {workflow_id} nicht gefunden")
return results
if __name__ == "__main__":
uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)