158 lines
5.4 KiB
Python
158 lines
5.4 KiB
Python
"""
|
|
Admin automation events routes for the backend API.
|
|
Sysadmin-only endpoints for viewing and controlling automation events.
|
|
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends, Path, Request, Response
|
|
from typing import List, Dict, Any
|
|
from fastapi import status
|
|
import logging
|
|
|
|
# Import interfaces and models
|
|
import modules.interfaces.interfaceDbChatObjects as interfaceDbChatObjects
|
|
from modules.auth import getCurrentUser, limiter
|
|
from modules.datamodels.datamodelUam import User
|
|
|
|
# Configure logger
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Create router for admin automation events endpoints
|
|
router = APIRouter(
|
|
prefix="/api/admin/automation-events",
|
|
tags=["Admin Automation Events"],
|
|
responses={
|
|
404: {"description": "Not found"},
|
|
400: {"description": "Bad request"},
|
|
401: {"description": "Unauthorized"},
|
|
403: {"description": "Forbidden - Sysadmin only"},
|
|
500: {"description": "Internal server error"}
|
|
}
|
|
)
|
|
|
|
def requireSysadmin(currentUser: User):
|
|
"""Require sysadmin role"""
|
|
if "sysadmin" not in (currentUser.roleLabels or []):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Sysadmin role required"
|
|
)
|
|
|
|
@router.get("")
|
|
@limiter.limit("30/minute")
|
|
async def get_all_automation_events(
|
|
request: Request,
|
|
currentUser: User = Depends(getCurrentUser)
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get all automation events across all mandates (sysadmin only).
|
|
Returns list of all registered events with their automation IDs and schedules.
|
|
"""
|
|
requireSysadmin(currentUser)
|
|
|
|
try:
|
|
from modules.shared.eventManagement import eventManager
|
|
|
|
# Get all jobs from scheduler
|
|
jobs = []
|
|
if eventManager.scheduler:
|
|
for job in eventManager.scheduler.get_jobs():
|
|
if job.id.startswith("automation."):
|
|
automation_id = job.id.replace("automation.", "")
|
|
jobs.append({
|
|
"eventId": job.id,
|
|
"automationId": automation_id,
|
|
"nextRunTime": str(job.next_run_time) if job.next_run_time else None,
|
|
"trigger": str(job.trigger) if job.trigger else None
|
|
})
|
|
|
|
return jobs
|
|
except Exception as e:
|
|
logger.error(f"Error getting automation events: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error getting automation events: {str(e)}"
|
|
)
|
|
|
|
@router.post("/sync")
|
|
@limiter.limit("5/minute")
|
|
async def sync_all_automation_events(
|
|
request: Request,
|
|
currentUser: User = Depends(getCurrentUser)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Manually trigger sync for all automations (sysadmin only).
|
|
This will register/remove events based on active flags.
|
|
"""
|
|
requireSysadmin(currentUser)
|
|
|
|
try:
|
|
from modules.interfaces.interfaceDbChatObjects import getInterface as getChatInterface
|
|
from modules.interfaces.interfaceDbAppObjects import getRootInterface
|
|
from modules.features.workflow import syncAutomationEvents
|
|
|
|
chatInterface = getChatInterface(currentUser)
|
|
# Get event user for sync operation (routes can import from interfaces)
|
|
rootInterface = getRootInterface()
|
|
eventUser = rootInterface.getUserByUsername("event")
|
|
if not eventUser:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail="Event user not available"
|
|
)
|
|
|
|
from modules.services import getInterface as getServices
|
|
services = getServices(currentUser, None)
|
|
result = await syncAutomationEvents(services, eventUser)
|
|
return {
|
|
"success": True,
|
|
"synced": result.get("synced", 0),
|
|
"events": result.get("events", {})
|
|
}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error syncing automation events: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error syncing automation events: {str(e)}"
|
|
)
|
|
|
|
@router.post("/{eventId}/remove")
|
|
@limiter.limit("10/minute")
|
|
async def remove_event(
|
|
request: Request,
|
|
eventId: str = Path(..., description="Event ID to remove"),
|
|
currentUser: User = Depends(getCurrentUser)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Manually remove a specific event from scheduler (sysadmin only).
|
|
Used for debugging and manual event cleanup.
|
|
"""
|
|
requireSysadmin(currentUser)
|
|
|
|
try:
|
|
from modules.shared.eventManagement import eventManager
|
|
|
|
# Remove event
|
|
eventManager.remove(eventId)
|
|
|
|
# Update automation's eventId if it exists
|
|
if eventId.startswith("automation."):
|
|
automation_id = eventId.replace("automation.", "")
|
|
chatInterface = interfaceDbChatObjects.getInterface(currentUser)
|
|
automation = chatInterface.getAutomationDefinition(automation_id)
|
|
if automation and automation.get("eventId") == eventId:
|
|
chatInterface.updateAutomationDefinition(automation_id, {"eventId": None})
|
|
|
|
return {
|
|
"success": True,
|
|
"eventId": eventId,
|
|
"message": f"Event {eventId} removed successfully"
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error removing event: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error removing event: {str(e)}"
|
|
)
|
|
|