gateway/modules/routes/routeDataAutomation.py

242 lines
8.4 KiB
Python

"""
Automation routes for the backend API.
Implements the endpoints for automation definition management.
"""
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response, Query
from typing import List, Dict, Any, Optional
from fastapi import status
import logging
import json
# Import interfaces and models
from modules.interfaces.interfaceDbChatObjects import getInterface as getChatInterface
from modules.security.auth import getCurrentUser, limiter
from modules.datamodels.datamodelChat import AutomationDefinition, ChatWorkflow
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata
from modules.shared.attributeUtils import getModelAttributeDefinitions
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Attribute definitions of the AutomationDefinition model, computed once at import
# time and served as-is by the attributes endpoint.
automationAttributes = getModelAttributeDefinitions(AutomationDefinition)

# Router grouping every automation endpoint under /api/automations. The shared
# error-response descriptions are built from (status code, description) pairs.
router = APIRouter(
    prefix="/api/automations",
    tags=["Manage Automations"],
    responses={
        code: {"description": description}
        for code, description in (
            (404, "Not found"),
            (400, "Bad request"),
            (401, "Unauthorized"),
            (403, "Forbidden"),
            (500, "Internal server error"),
        )
    },
)
@router.get("", response_model=PaginatedResponse[AutomationDefinition])
@limiter.limit("30/minute")
async def get_automations(
    request: Request,
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
    currentUser = Depends(getCurrentUser)
) -> PaginatedResponse[AutomationDefinition]:
    """
    Get automation definitions with optional pagination, sorting, and filtering.

    Query Parameters:
    - pagination: JSON-encoded PaginationParams object, or None for no pagination.
      A falsy JSON value (null, {}, [], 0, "") is treated as "no pagination".

    Returns:
    - A JSONResponse with the keys "items" and "pagination". "pagination" is None
      when no pagination was requested.

    Raises:
    - HTTPException 400 if the pagination parameter is not valid JSON, is not a
      JSON object, or is rejected by PaginationParams.
    - HTTPException 500 for any other failure.
    """
    try:
        # Parse pagination parameter
        paginationParams = None
        if pagination:
            try:
                paginationDict = json.loads(pagination)
                if paginationDict:
                    # Valid JSON that is not an object (e.g. 5 or [1]) cannot be
                    # unpacked with **; reject it as a client error instead of
                    # letting the TypeError surface as a 500.
                    if not isinstance(paginationDict, dict):
                        raise ValueError("pagination must be a JSON object")
                    paginationParams = PaginationParams(**paginationDict)
            except (ValueError, TypeError) as e:
                # json.JSONDecodeError is a ValueError subclass, so it is covered here.
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid pagination parameter: {str(e)}"
                ) from e

        chatInterface = getChatInterface(currentUser)
        result = chatInterface.getAllAutomationDefinitions(pagination=paginationParams)

        # If pagination was requested, result is PaginatedResult
        # If no pagination, result is List[Dict]
        # Note: Using JSONResponse to bypass Pydantic validation which would filter out _createdBy
        # The enriched fields (_createdByUserName, mandateName) are not in the Pydantic model
        from fastapi.responses import JSONResponse

        if paginationParams:
            response_data = {
                "items": result.items,
                "pagination": {
                    "currentPage": paginationParams.page,
                    "pageSize": paginationParams.pageSize,
                    "totalItems": result.totalItems,
                    "totalPages": result.totalPages,
                    "sort": paginationParams.sort,
                    "filters": paginationParams.filters
                }
            }
        else:
            response_data = {
                "items": result,
                "pagination": None
            }
        return JSONResponse(content=response_data)
    except HTTPException:
        # Already carries the intended status code (e.g. the 400 above).
        raise
    except Exception as e:
        # logger.exception keeps the same message and adds the traceback.
        logger.exception("Error getting automations: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Error getting automations: {str(e)}"
        ) from e
@router.post("", response_model=AutomationDefinition)
@limiter.limit("10/minute")
async def create_automation(
    request: Request,
    automation: AutomationDefinition,
    currentUser = Depends(getCurrentUser)
) -> AutomationDefinition:
    """
    Create a new automation definition.

    The request body is dumped to a dict, passed to the chat interface's
    createAutomationDefinition, and the returned record is wrapped in an
    AutomationDefinition again.

    Raises:
    - HTTPException 403 if the interface raises PermissionError.
    - HTTPException 500 for any other failure.
    """
    try:
        chatInterface = getChatInterface(currentUser)
        automationData = automation.model_dump()
        created = chatInterface.createAutomationDefinition(automationData)
        return AutomationDefinition(**created)
    except HTTPException:
        raise
    except PermissionError as e:
        # Same 403 mapping the update and delete endpoints use; without it a
        # permission failure would be reported as a generic 500.
        raise HTTPException(
            status_code=403,
            detail=str(e)
        ) from e
    except Exception as e:
        # logger.exception keeps the same message and adds the traceback.
        logger.exception("Error creating automation: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Error creating automation: {str(e)}"
        ) from e
@router.get("/{automationId}", response_model=AutomationDefinition)
@limiter.limit("30/minute")
async def get_automation(
    request: Request,
    automationId: str = Path(..., description="Automation ID"),
    currentUser = Depends(getCurrentUser)
) -> AutomationDefinition:
    """
    Fetch one automation definition by its ID.

    Responds with 404 when the chat interface returns a falsy value for the ID.
    Any non-HTTP exception is logged and reported as a 500.
    """
    try:
        record = getChatInterface(currentUser).getAutomationDefinition(automationId)
        if record:
            return AutomationDefinition(**record)
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Automation {automationId} not found",
        )
    except HTTPException:
        # Let the 404 above (and any other HTTP error) pass through untouched.
        raise
    except Exception as e:
        # The log line and the response detail share the same text.
        message = f"Error getting automation: {e!s}"
        logger.error(message)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=message,
        )
@router.put("/{automationId}", response_model=AutomationDefinition)
@limiter.limit("10/minute")
async def update_automation(
    request: Request,
    automationId: str = Path(..., description="Automation ID"),
    automation: AutomationDefinition = Body(...),
    currentUser = Depends(getCurrentUser)
) -> AutomationDefinition:
    """
    Update the automation definition identified by automationId.

    The request body is dumped to a dict and handed to the chat interface; the
    returned record is wrapped in an AutomationDefinition. PermissionError is
    reported as 403, any other non-HTTP exception is logged and reported as 500.
    """
    try:
        service = getChatInterface(currentUser)
        payload = automation.model_dump()
        saved = service.updateAutomationDefinition(automationId, payload)
        return AutomationDefinition(**saved)
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))
    except HTTPException:
        raise
    except Exception as e:
        # The log line and the response detail share the same text.
        message = f"Error updating automation: {e!s}"
        logger.error(message)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=message,
        )
@router.delete("/{automationId}")
@limiter.limit("10/minute")
async def delete_automation(
    request: Request,
    automationId: str = Path(..., description="Automation ID"),
    currentUser = Depends(getCurrentUser)
) -> Response:
    """
    Delete the automation definition identified by automationId.

    Returns an empty 204 response when the chat interface reports success.
    A falsy result is reported as 500, PermissionError as 403, and any other
    non-HTTP exception is logged and reported as 500.
    """
    try:
        deleted = getChatInterface(currentUser).deleteAutomationDefinition(automationId)
        if not deleted:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to delete automation",
            )
        return Response(status_code=status.HTTP_204_NO_CONTENT)
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))
    except HTTPException:
        raise
    except Exception as e:
        # The log line and the response detail share the same text.
        message = f"Error deleting automation: {e!s}"
        logger.error(message)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=message,
        )
@router.post("/{automationId}/execute", response_model=ChatWorkflow)
@limiter.limit("5/minute")
async def execute_automation(
    request: Request,
    automationId: str = Path(..., description="Automation ID"),
    currentUser = Depends(getCurrentUser)
) -> ChatWorkflow:
    """
    Execute an automation immediately (test mode).

    Awaits the chat interface's executeAutomation and returns its result.
    ValueError is reported as 404; any other non-HTTP exception is logged and
    reported as 500.
    """
    try:
        return await getChatInterface(currentUser).executeAutomation(automationId)
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
    except HTTPException:
        raise
    except Exception as e:
        # The log line and the response detail share the same text.
        message = f"Error executing automation: {e!s}"
        logger.error(message)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=message,
        )
@router.get("/attributes", response_model=Dict[str, Any])
async def get_automation_attributes(
    request: Request
) -> Dict[str, Any]:
    """
    Get attribute definitions for AutomationDefinition model.

    Returns a dict with a single "attributes" key holding the module-level
    automationAttributes value.
    """
    return {"attributes": automationAttributes}


# Routes are matched in registration order. This route is declared after
# "/{automationId}", so without intervention GET /attributes would be captured by
# that parameterised route with automationId="attributes" and this handler would
# never run. The decorator above has just appended this route to router.routes;
# move it to the front so the static path is tried first.
router.routes.insert(0, router.routes.pop())