227 lines
9.6 KiB
Python
227 lines
9.6 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Admin automation events routes for the backend API.
|
|
Sysadmin-only endpoints for viewing and controlling scheduler events.
|
|
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends, Path, Request, Response
|
|
from typing import List, Dict, Any
|
|
from fastapi import status
|
|
import logging
|
|
|
|
# Import interfaces and models from feature containers
|
|
import modules.features.automation.interfaceFeatureAutomation as interfaceAutomation
|
|
from modules.auth import limiter, getRequestContext, requireSysAdminRole, RequestContext
|
|
from modules.datamodels.datamodelUam import User
|
|
|
|
# Configure logger
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Create router for admin automation events endpoints
|
|
router = APIRouter(
|
|
prefix="/api/admin/automation-events",
|
|
tags=["Admin Automation Events"],
|
|
responses={
|
|
404: {"description": "Not found"},
|
|
400: {"description": "Bad request"},
|
|
401: {"description": "Unauthorized"},
|
|
403: {"description": "Forbidden - Sysadmin only"},
|
|
500: {"description": "Internal server error"}
|
|
}
|
|
)
|
|
|
|
@router.get("")
|
|
@limiter.limit("30/minute")
|
|
def get_all_automation_events(
|
|
request: Request,
|
|
currentUser: User = Depends(requireSysAdminRole)
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get all active scheduler jobs (sysadmin only).
|
|
Each job is enriched with context from its automation definition
|
|
(name, mandate, feature instance, creator) for readability.
|
|
"""
|
|
try:
|
|
from modules.shared.eventManagement import eventManager
|
|
from modules.interfaces.interfaceDbApp import getRootInterface
|
|
from modules.services import getInterface as getServices
|
|
|
|
if not eventManager.scheduler:
|
|
return []
|
|
|
|
# 1. Collect all scheduler jobs
|
|
jobs = []
|
|
automationIds = []
|
|
for job in eventManager.scheduler.get_jobs():
|
|
if job.id.startswith("automation."):
|
|
automationId = job.id.replace("automation.", "")
|
|
automationIds.append(automationId)
|
|
jobs.append({
|
|
"eventId": job.id,
|
|
"automationId": automationId,
|
|
"nextRunTime": str(job.next_run_time) if job.next_run_time else None,
|
|
"trigger": str(job.trigger) if job.trigger else None,
|
|
"name": "",
|
|
"createdBy": "",
|
|
"mandate": "",
|
|
"featureInstance": ""
|
|
})
|
|
|
|
# 2. Enrich with context from automation definitions
|
|
if jobs:
|
|
try:
|
|
rootInterface = getRootInterface()
|
|
eventUser = rootInterface.getUserByUsername("event")
|
|
if eventUser:
|
|
services = getServices(currentUser, None)
|
|
allAutomations = services.interfaceDbAutomation.getAllAutomationDefinitionsWithRBAC(eventUser)
|
|
|
|
# Build lookup by automation ID
|
|
automationLookup = {}
|
|
for a in allAutomations:
|
|
aId = a.get("id", "") if isinstance(a, dict) else getattr(a, "id", "")
|
|
automationLookup[aId] = a
|
|
|
|
# Caches for resolving UUIDs to names
|
|
_userCache = {}
|
|
_mandateCache = {}
|
|
_featureCache = {}
|
|
|
|
def _resolveUsername(userId):
|
|
if not userId:
|
|
return ""
|
|
if userId not in _userCache:
|
|
try:
|
|
user = rootInterface.getUser(userId)
|
|
_userCache[userId] = user.username if user else userId[:8]
|
|
except Exception:
|
|
_userCache[userId] = userId[:8]
|
|
return _userCache[userId]
|
|
|
|
def _resolveMandateLabel(mandateId):
|
|
if not mandateId:
|
|
return ""
|
|
if mandateId not in _mandateCache:
|
|
try:
|
|
mandate = rootInterface.getMandate(mandateId)
|
|
_mandateCache[mandateId] = getattr(mandate, "label", None) or mandateId[:8]
|
|
except Exception:
|
|
_mandateCache[mandateId] = mandateId[:8]
|
|
return _mandateCache[mandateId]
|
|
|
|
def _resolveFeatureLabel(featureInstanceId):
|
|
if not featureInstanceId:
|
|
return ""
|
|
if featureInstanceId not in _featureCache:
|
|
try:
|
|
instance = rootInterface.getFeatureInstance(featureInstanceId)
|
|
_featureCache[featureInstanceId] = getattr(instance, "label", None) or getattr(instance, "featureCode", None) or featureInstanceId[:8]
|
|
except Exception:
|
|
_featureCache[featureInstanceId] = featureInstanceId[:8]
|
|
return _featureCache[featureInstanceId]
|
|
|
|
# Enrich each job
|
|
for job in jobs:
|
|
automation = automationLookup.get(job["automationId"])
|
|
if automation:
|
|
if isinstance(automation, dict):
|
|
job["name"] = automation.get("label", "")
|
|
job["createdBy"] = _resolveUsername(automation.get("_createdBy", ""))
|
|
job["mandate"] = _resolveMandateLabel(automation.get("mandateId", ""))
|
|
job["featureInstance"] = _resolveFeatureLabel(automation.get("featureInstanceId", ""))
|
|
else:
|
|
job["name"] = getattr(automation, "label", "")
|
|
job["createdBy"] = _resolveUsername(getattr(automation, "_createdBy", ""))
|
|
job["mandate"] = _resolveMandateLabel(getattr(automation, "mandateId", ""))
|
|
job["featureInstance"] = _resolveFeatureLabel(getattr(automation, "featureInstanceId", ""))
|
|
else:
|
|
job["name"] = "(orphaned)"
|
|
except Exception as e:
|
|
logger.warning(f"Could not enrich automation events with context: {e}")
|
|
|
|
return jobs
|
|
except Exception as e:
|
|
logger.error(f"Error getting automation events: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error getting automation events: {str(e)}"
|
|
)
|
|
|
|
@router.post("/sync")
|
|
@limiter.limit("5/minute")
|
|
async def sync_all_automation_events(
|
|
request: Request,
|
|
currentUser: User = Depends(requireSysAdminRole)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Manually trigger sync for all automations (sysadmin only).
|
|
This will register/remove events based on active flags.
|
|
"""
|
|
try:
|
|
from modules.interfaces.interfaceDbApp import getRootInterface
|
|
from modules.workflows.automation import syncAutomationEvents
|
|
|
|
# Get event user for sync operation (routes can import from interfaces)
|
|
rootInterface = getRootInterface()
|
|
eventUser = rootInterface.getUserByUsername("event")
|
|
if not eventUser:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail="Event user not available"
|
|
)
|
|
|
|
from modules.services import getInterface as getServices
|
|
services = getServices(currentUser, None)
|
|
result = syncAutomationEvents(services, eventUser)
|
|
return {
|
|
"success": True,
|
|
"synced": result.get("synced", 0),
|
|
"events": result.get("events", {})
|
|
}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error syncing automation events: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error syncing automation events: {str(e)}"
|
|
)
|
|
|
|
@router.post("/{eventId}/remove")
|
|
@limiter.limit("10/minute")
|
|
def remove_event(
|
|
request: Request,
|
|
eventId: str = Path(..., description="Event ID to remove"),
|
|
currentUser: User = Depends(requireSysAdminRole)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Remove a scheduler job (sysadmin only).
|
|
Removes the job from the scheduler and clears the eventId on the automation definition.
|
|
Does NOT delete the automation definition itself.
|
|
"""
|
|
try:
|
|
from modules.shared.eventManagement import eventManager
|
|
|
|
# Remove scheduler job
|
|
eventManager.remove(eventId)
|
|
|
|
# Clear eventId on the automation definition (so it can be re-synced later)
|
|
if eventId.startswith("automation."):
|
|
automationId = eventId.replace("automation.", "")
|
|
automationInterface = interfaceAutomation.getInterface(currentUser)
|
|
automation = automationInterface.getAutomationDefinition(automationId)
|
|
if automation and getattr(automation, "eventId", None) == eventId:
|
|
automationInterface.updateAutomationDefinition(automationId, {"eventId": None})
|
|
|
|
return {
|
|
"success": True,
|
|
"eventId": eventId,
|
|
"message": f"Event {eventId} removed successfully"
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error removing event: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error removing event: {str(e)}"
|
|
)
|