gateway/gwserver/routes/agent.py
2025-03-16 23:50:27 +01:00

91 lines
No EOL
2.9 KiB
Python

from fastapi import APIRouter, HTTPException, Depends, Body, Query, Path
from typing import List, Dict, Any, Optional
from fastapi import status
# Import auth module
from auth import get_current_active_user, get_user_context
# Import interfaces
from modules.lucydom_interface import get_lucydom_interface
# Create the router that groups all agent endpoints under /api/agents
router = APIRouter(
    tags=["Agents"],
    prefix="/api/agents",
    responses={404: {"description": "Not found"}},
)
@router.get("", response_model=List[Dict[str, Any]])
async def get_agents(
    workspace_id: Optional[int] = Query(None),
    current_user: Dict[str, Any] = Depends(get_current_active_user),
):
    """Return all agents, or only the agents of a given workspace.

    Args:
        workspace_id: Optional query parameter. When it is supplied, the
            lookup is restricted to that workspace; when it is omitted,
            all agents are returned.
        current_user: The authenticated user, injected through the
            ``get_current_active_user`` dependency.

    Returns:
        The result of ``get_agents_by_workspace`` or ``get_all_agents``
        on the LucyDOM interface (declared as a list of dicts via
        ``response_model``).
    """
    mandate_id, user_id = await get_user_context(current_user)
    # Initialise the LucyDOM interface with the user's context
    lucy_interface = get_lucydom_interface(mandate_id, user_id)
    # Compare against None instead of relying on truthiness: an explicit
    # workspace_id=0 must still filter rather than silently fall through
    # to "return every agent".
    if workspace_id is not None:
        return lucy_interface.get_agents_by_workspace(workspace_id)
    return lucy_interface.get_all_agents()
@router.post("", response_model=Dict[str, Any])
async def create_agent(
    agent: Dict[str, Any] = Body(...),
    current_user: Dict[str, Any] = Depends(get_current_active_user),
):
    """Create a new agent inside an existing workspace.

    Args:
        agent: Request body. ``workspace_id`` is read from it and must be
            truthy; ``name``, ``type``, ``capabilities`` and
            ``description`` are optional and fall back to defaults.
        current_user: The authenticated user, injected through the
            ``get_current_active_user`` dependency.

    Returns:
        The value returned by ``create_agent`` on the LucyDOM interface.

    Raises:
        HTTPException: 400 when ``workspace_id`` is missing or falsy,
            404 when the workspace lookup returns a falsy value.
    """
    mandate_id, user_id = await get_user_context(current_user)
    # Bind the LucyDOM interface to the calling user's context
    dom = get_lucydom_interface(mandate_id, user_id)

    workspace_id = agent.get("workspace_id")
    if not workspace_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="workspace_id ist erforderlich",
        )

    # The target workspace has to exist before an agent can be attached
    existing_workspace = dom.get_workspace(workspace_id)
    if not existing_workspace:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Workspace mit ID {workspace_id} nicht gefunden",
        )

    # Collect the creation arguments, applying the defaults for optional fields
    creation_args = {
        "name": agent.get("name", "Neuer Agent"),
        "agent_type": agent.get("type", "generic"),
        "workspace_id": workspace_id,
        "capabilities": agent.get("capabilities", []),
        "description": agent.get("description", ""),
    }
    return dom.create_agent(**creation_args)
@router.get("/{agent_id}", response_model=Dict[str, Any])
async def get_agent(
    agent_id: int,
    current_user: Dict[str, Any] = Depends(get_current_active_user),
):
    """Fetch a single agent by its ID.

    Args:
        agent_id: Path parameter identifying the agent.
        current_user: The authenticated user, injected through the
            ``get_current_active_user`` dependency.

    Returns:
        The value returned by ``get_agent`` on the LucyDOM interface.

    Raises:
        HTTPException: 404 when the lookup returns a falsy value.
    """
    mandate_id, user_id = await get_user_context(current_user)
    # Bind the LucyDOM interface to the calling user's context
    dom = get_lucydom_interface(mandate_id, user_id)

    found = dom.get_agent(agent_id)
    if found:
        return found

    # Nothing came back for this ID: report it as not found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Agent mit ID {agent_id} nicht gefunden",
    )