from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, Body, Query, status from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, Response from fastapi.staticfiles import StaticFiles from fastapi.security import OAuth2PasswordRequestForm import uvicorn from typing import List, Dict, Any, Optional import uuid import os import json import asyncio from datetime import datetime import logging from database import get_db, Database from agent_service import AgentService, get_agent_service from datetime import timedelta from typing import Dict from models import ( Workspace, Agent, DataObject, Prompt, WorkflowRequest, WorkflowResponse, LogEntry, Result ) from auth import ( ACCESS_TOKEN_EXPIRE_MINUTES, authenticate_user, create_access_token, fake_users_db, get_current_user ) # Konfiguration des Loggers logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler()] ) logger = logging.getLogger(__name__) app = FastAPI(title="PowerOn | Data Platform API", description="Backend-API für die Multi-Agent Platform von ValueOn AG") # CORS-Konfiguration für Frontend-Anfragen app.add_middleware( CORSMiddleware, allow_origins=["http://localhost:8080"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Pfad zum Webparts-Verzeichnis WEBPARTS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "webparts") # Verzeichnis für hochgeladene Dateien erstellen UPLOAD_DIR = os.path.join(os.getcwd(), "uploads") os.makedirs(UPLOAD_DIR, exist_ok=True) # Komponenten des Frontends app.mount("/static", StaticFiles(directory="static"), name="static") # API - ENDPUNKTE @app.get("/", tags=["General"]) async def root(): """API-Statusendpunkt""" return {"status": "online", "message": "Data Platform API ist aktiv"} # Test Element @app.get("/api/test", tags=["Test"]) async def get_test(): return "OK 1.0" # Token-Endpunkt @app.post("/token", response_model=Dict[str, str]) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Ungültige Zugangsdaten", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user["username"]}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"} # Geschützter Endpunkt (Beispiel) @app.get("/api/users/me") async def read_users_me(current_user = Depends(get_current_user)): return current_user # Workspace-Endpunkte @app.get("/api/workspaces", tags=["Workspaces"], response_model=List[Workspace]) async def get_workspaces( current_user = Depends(get_current_user), # <-- Authentifizierung hinzugefügt db: Database = Depends(get_db) ): """Alle verfügbaren Workspaces abrufen""" return db.get_all_workspaces() @app.get("/api/workspaces/{workspace_id}", tags=["Workspaces"], response_model=Workspace) async def get_workspace(workspace_id: str, db: Database = Depends(get_db)): """Einen bestimmten Workspace mit allen Details abrufen""" workspace = db.get_workspace(workspace_id) if not workspace: raise HTTPException(status_code=404, detail=f"Workspace mit ID {workspace_id} nicht gefunden") return workspace @app.post("/api/workspaces", tags=["Workspaces"], response_model=Workspace) async def create_workspace(workspace: Dict[str, Any] = Body(...), db: Database = Depends(get_db)): """Einen neuen Workspace erstellen""" workspace_id = str(uuid.uuid4()) workspace_data = { "id": workspace_id, "name": workspace.get("name", "Neuer Workspace"), "created_at": datetime.now().isoformat(), "prompts": [], "agents": [], "dataObjectReferences": [] } new_workspace = db.create_workspace(workspace_data) return new_workspace # Agenten-Endpunkte @app.get("/api/agents", tags=["Agents"], response_model=List[Agent]) async def get_agents(workspace_id: Optional[str] = Query(None), db: Database = Depends(get_db)): """Alle Agenten oder Agenten eines bestimmten Workspaces abrufen""" if workspace_id: return db.get_agents_by_workspace(workspace_id) return db.get_all_agents() @app.post("/api/agents", tags=["Agents"], response_model=Agent) async def create_agent( agent: Dict[str, Any] = Body(...), db: Database = Depends(get_db) ): """Einen neuen Agenten erstellen""" agent_id = str(uuid.uuid4()) workspace_id = agent.get("workspace_id") if not workspace_id: raise HTTPException(status_code=400, detail="workspace_id ist erforderlich") # Workspace existiert? workspace = db.get_workspace(workspace_id) if not workspace: raise HTTPException(status_code=404, detail=f"Workspace mit ID {workspace_id} nicht gefunden") agent_data = { "id": agent_id, "name": agent.get("name", "Neuer Agent"), "type": agent.get("type", "generic"), "capabilities": agent.get("capabilities", []), "description": agent.get("description", "") } new_agent = db.create_agent(agent_data, workspace_id) return new_agent # Datei-Endpunkte @app.get("/api/files", tags=["Files"], response_model=List[DataObject]) async def get_files(db: Database = Depends(get_db)): """Alle verfügbaren Dateien abrufen""" return db.get_all_files() @app.post("/api/files/upload", tags=["Files"]) async def upload_file( file: UploadFile = File(...), db: Database = Depends(get_db) ): """Eine Datei hochladen""" try: file_id = str(uuid.uuid4()) file_ext = os.path.splitext(file.filename)[1] file_path = os.path.join(UPLOAD_DIR, f"{file_id}{file_ext}") # Datei speichern with open(file_path, "wb") as f: content = await file.read() f.write(content) # Dateityp bestimmen file_type = "image" if file.content_type and "image" in file.content_type else "document" # In Datenbank speichern file_data = { "id": file_id, "name": file.filename, "type": file_type, "path": file_path, "content_type": file.content_type, "size": os.path.getsize(file_path), "upload_date": datetime.now().isoformat() } new_file = db.create_file(file_data) return { "id": new_file["id"], "name": new_file["name"], "type": new_file["type"], "size": f"{new_file['size'] / (1024 * 1024):.1f} MB", "upload_date": new_file["upload_date"] } except Exception as e: logger.error(f"Fehler beim Hochladen der Datei: {e}") raise HTTPException(status_code=500, detail=f"Fehler beim Hochladen der Datei: {str(e)}") @app.delete("/api/files/{file_id}", tags=["Files"]) async def delete_file( file_id: str, current_user = Depends(get_current_user), db: Database = Depends(get_db) ): """Löscht eine Datei""" try: # Hole die Datei aus der Datenbank file = db.get_file(file_id) if not file: raise HTTPException(status_code=404, detail=f"Datei mit ID {file_id} nicht gefunden") # Prüfe, ob die physische Datei existiert if "path" in file and os.path.exists(file["path"]): try: # Versuche, die physische Datei zu löschen os.remove(file["path"]) except Exception as e: logger.warning(f"Konnte physische Datei nicht löschen: {e}") # Lösche die Datei aus der Datenbank success = db.delete_file(file_id) if not success: raise HTTPException(status_code=500, detail="Fehler beim Löschen der Datei aus der Datenbank") return {"success": True, "message": f"Datei '{file.get('name', 'unbekannt')}' wurde gelöscht"} except HTTPException: raise except Exception as e: logger.error(f"Fehler beim Löschen der Datei: {e}") raise HTTPException(status_code=500, detail=f"Fehler beim Löschen der Datei: {str(e)}") # Prompt-Endpunkte @app.get("/api/prompts", tags=["Prompts"], response_model=List[Prompt]) async def get_prompts(workspace_id: Optional[str] = Query(None), db: Database = Depends(get_db)): """Alle Prompts oder Prompts eines bestimmten Workspaces abrufen""" if workspace_id: return db.get_prompts_by_workspace(workspace_id) return db.get_all_prompts() @app.post("/api/prompts", tags=["Prompts"], response_model=Prompt) async def create_prompt( prompt: Dict[str, Any] = Body(...), db: Database = Depends(get_db) ): """Einen neuen Prompt erstellen""" prompt_id = str(uuid.uuid4()) workspace_id = prompt.get("workspace_id") if not workspace_id: raise HTTPException(status_code=400, detail="workspace_id ist erforderlich") # Workspace existiert? workspace = db.get_workspace(workspace_id) if not workspace: raise HTTPException(status_code=404, detail=f"Workspace mit ID {workspace_id} nicht gefunden") prompt_data = { "id": prompt_id, "content": prompt.get("content", ""), "created_at": datetime.now().isoformat() } new_prompt = db.create_prompt(prompt_data, workspace_id) return new_prompt # Workflow-Endpunkte @app.post("/api/workflow/run", tags=["Workflow"], response_model=WorkflowResponse) async def run_workflow( workflow_request: WorkflowRequest, db: Database = Depends(get_db), agent_service: AgentService = Depends(get_agent_service) ): """Führt einen Workflow mit den ausgewählten Agenten und Dateien aus""" workspace = db.get_workspace(workflow_request.workspace_id) if not workspace: raise HTTPException(status_code=404, detail=f"Workspace mit ID {workflow_request.workspace_id} nicht gefunden") # Prüfen, ob Dateien existieren files = [] for file_id in workflow_request.files: file = db.get_file(file_id) if not file: raise HTTPException(status_code=404, detail=f"Datei mit ID {file_id} nicht gefunden") files.append(file) # Prüfen, ob Agenten existieren agents = [] for agent_id in workflow_request.agents: agent = db.get_agent(agent_id) if not agent: raise HTTPException(status_code=404, detail=f"Agent mit ID {agent_id} nicht gefunden") agents.append(agent) # Workflow ID generieren workflow_id = str(uuid.uuid4()) # Workflow starten (asynchron) workflow_task = asyncio.create_task( agent_service.execute_workflow( workflow_id, workflow_request.prompt, agents, files ) ) # Sofort eine Antwort zurückgeben return WorkflowResponse( workflow_id=workflow_id, status="running", message="Workflow wurde gestartet" ) @app.get("/api/workflow/{workflow_id}/status", tags=["Workflow"]) async def get_workflow_status( workflow_id: str, agent_service: AgentService = Depends(get_agent_service) ): """Status eines laufenden Workflows abrufen""" status = agent_service.get_workflow_status(workflow_id) if not status: raise HTTPException(status_code=404, detail=f"Workflow mit ID {workflow_id} nicht gefunden") return status @app.get("/api/workflow/{workflow_id}/logs", tags=["Workflow"], response_model=List[LogEntry]) async def get_workflow_logs( workflow_id: str, agent_service: AgentService = Depends(get_agent_service) ): """Protokolle eines Workflows abrufen""" logs = agent_service.get_workflow_logs(workflow_id) if logs is None: raise HTTPException(status_code=404, detail=f"Workflow mit ID {workflow_id} nicht gefunden") return logs @app.get("/api/workflow/{workflow_id}/results", tags=["Workflow"], response_model=List[Result]) async def get_workflow_results( workflow_id: str, agent_service: AgentService = Depends(get_agent_service) ): """Ergebnisse eines Workflows abrufen""" results = agent_service.get_workflow_results(workflow_id) if results is None: raise HTTPException(status_code=404, detail=f"Workflow mit ID {workflow_id} nicht gefunden") return results if __name__ == "__main__": uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)