# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Admin automation events routes for the backend API.
Sysadmin-only endpoints for viewing and controlling scheduler events.
"""

from fastapi import APIRouter, HTTPException, Depends, Path, Request, Response, Query
from typing import List, Dict, Any, Optional
from fastapi import status
import logging
import json
import math

# Import interfaces and models from feature containers
import modules.features.automation.interfaceFeatureAutomation as interfaceAutomation
from modules.auth import limiter, getRequestContext, requireSysAdminRole, RequestContext
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelPagination import PaginationParams, PaginationMetadata, normalize_pagination_dict
from modules.routes.routeDataUsers import _applyFiltersAndSort, _extractDistinctValues

# Configure logger
logger = logging.getLogger(__name__)

# Create router for admin automation events endpoints
router = APIRouter(
    prefix="/api/admin/automation-events",
    tags=["Admin Automation Events"],
    responses={
        404: {"description": "Not found"},
        400: {"description": "Bad request"},
        401: {"description": "Unauthorized"},
        403: {"description": "Forbidden - Sysadmin only"},
        500: {"description": "Internal server error"}
    }
)

# Scheduler job ids that belong to automations look like "automation.<automationId>".
_AUTOMATION_EVENT_PREFIX = "automation."


def _automationIdFromEventId(eventId: str) -> str:
    """Return the part of an ``automation.<id>`` event id that follows the prefix.

    Only the leading prefix is cut off. ``str.replace`` would also delete any
    later occurrence of ``"automation."`` inside the id and produce a wrong key.
    Callers must check ``eventId.startswith(_AUTOMATION_EVENT_PREFIX)`` first.
    """
    return eventId[len(_AUTOMATION_EVENT_PREFIX):]


def _readField(record: Any, key: str) -> Any:
    """Read ``key`` from a dict or an attribute-style object, defaulting to ``""``."""
    if isinstance(record, dict):
        return record.get(key, "")
    return getattr(record, key, "")


def _resolveCached(cache: Dict[str, str], key: Any, lookup) -> str:
    """Return the display value for ``key``, calling ``lookup`` at most once per key.

    A falsy key resolves to ``""``. If ``lookup`` raises, the first eight
    characters of the key are cached and returned instead.
    """
    if not key:
        return ""
    if key not in cache:
        try:
            cache[key] = lookup(key)
        except Exception:
            cache[key] = key[:8]
    return cache[key]


def _loadPaginationDict(rawPagination: str) -> Optional[Dict[str, Any]]:
    """Decode and normalize the JSON ``pagination`` query parameter.

    Returns:
        The result of ``normalize_pagination_dict`` for a non-empty JSON
        object, or ``None`` when the decoded value is falsy (``null``, ``{}``,
        ``[]``, ``0``, ``""``).

    Raises:
        ValueError: if the text is not valid JSON (``json.JSONDecodeError`` is
            a ``ValueError`` subclass) or decodes to a truthy non-object value.
    """
    paginationDict = json.loads(rawPagination)
    if not paginationDict:
        return None
    if not isinstance(paginationDict, dict):
        # Reject e.g. "[1]" or "5" here so the caller can answer with a client
        # error instead of failing later with an unrelated exception.
        raise ValueError("pagination must be a JSON object")
    return normalize_pagination_dict(paginationDict)


def _buildEnrichedAutomationEvents(currentUser: User) -> List[Dict[str, Any]]:
    """Build the full enriched automation events list.

    Collects every scheduler job whose id starts with ``automation.`` and, when
    the ``event`` user exists, fills in ``name``, ``createdBy``, ``mandate`` and
    ``featureInstance`` from the matching automation definition. Jobs without a
    matching definition get the name ``(orphaned)``.

    Enrichment is best-effort: any exception is logged as a warning and the
    jobs are returned with whatever fields were filled in up to that point.

    Args:
        currentUser: user passed to ``getAutomationServices``.

    Returns:
        One dict per automation job; an empty list when no scheduler exists.
    """
    from modules.shared.eventManagement import eventManager
    from modules.interfaces.interfaceDbApp import getRootInterface
    from modules.features.automation.mainAutomation import getAutomationServices

    if not eventManager.scheduler:
        return []

    jobs = []
    for job in eventManager.scheduler.get_jobs():
        if not job.id.startswith(_AUTOMATION_EVENT_PREFIX):
            continue
        jobs.append({
            "eventId": job.id,
            "id": job.id,
            "automationId": _automationIdFromEventId(job.id),
            "nextRunTime": str(job.next_run_time) if job.next_run_time else None,
            "trigger": str(job.trigger) if job.trigger else None,
            "name": "",
            "createdBy": "",
            "mandate": "",
            "featureInstance": "",
        })

    if not jobs:
        return jobs

    try:
        rootInterface = getRootInterface()
        eventUser = rootInterface.getUserByUsername("event")
        if eventUser:
            services = getAutomationServices(currentUser, mandateId=None, featureInstanceId=None)
            allAutomations = services.interfaceDbAutomation.getAllAutomationDefinitionsWithRBAC(eventUser)
            automationLookup = {_readField(a, "id"): a for a in allAutomations}

            # Per-call caches so each user / mandate / feature instance is
            # fetched only once even when many jobs reference it.
            userCache: Dict[str, str] = {}
            mandateCache: Dict[str, str] = {}
            featureCache: Dict[str, str] = {}

            def _lookupUsername(userId):
                user = rootInterface.getUser(userId)
                return user.username if user else userId[:8]

            def _lookupMandateLabel(mandateId):
                mandate = rootInterface.getMandate(mandateId)
                return getattr(mandate, "label", None) or mandateId[:8]

            def _lookupFeatureLabel(featureInstanceId):
                instance = rootInterface.getFeatureInstance(featureInstanceId)
                return (
                    getattr(instance, "label", None)
                    or getattr(instance, "featureCode", None)
                    or featureInstanceId[:8]
                )

            for job in jobs:
                automation = automationLookup.get(job["automationId"])
                if not automation:
                    job["name"] = "(orphaned)"
                    continue
                job["name"] = _readField(automation, "label")
                job["createdBy"] = _resolveCached(
                    userCache, _readField(automation, "_createdBy"), _lookupUsername
                )
                job["mandate"] = _resolveCached(
                    mandateCache, _readField(automation, "mandateId"), _lookupMandateLabel
                )
                job["featureInstance"] = _resolveCached(
                    featureCache, _readField(automation, "featureInstanceId"), _lookupFeatureLabel
                )
    except Exception as e:
        logger.warning(f"Could not enrich automation events with context: {e}")

    return jobs


@router.get("")
@limiter.limit("30/minute")
def get_all_automation_events(
    request: Request,
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams"),
    currentUser: User = Depends(requireSysAdminRole),
):
    """Get all active scheduler jobs with pagination support (sysadmin only).

    Args:
        request: incoming request (required by the rate limiter decorator).
        pagination: optional JSON text describing ``PaginationParams``.
        currentUser: user resolved by the ``requireSysAdminRole`` dependency.

    Returns:
        ``{"items": [...], "pagination": {...}}`` with one page of the
        filtered/sorted list when pagination parameters are given; otherwise
        the full enriched list with ``"pagination": None``.

    Raises:
        HTTPException: 400 when ``pagination`` cannot be parsed or validated,
            500 for any other failure.
    """
    try:
        paginationParams: Optional[PaginationParams] = None
        if pagination:
            try:
                paginationDict = _loadPaginationDict(pagination)
                if paginationDict is not None:
                    paginationParams = PaginationParams(**paginationDict)
            except (json.JSONDecodeError, ValueError) as e:
                raise HTTPException(status_code=400, detail=f"Invalid pagination parameter: {str(e)}")

        enriched = _buildEnrichedAutomationEvents(currentUser)
        filtered = _applyFiltersAndSort(enriched, paginationParams)

        if paginationParams:
            totalItems = len(filtered)
            totalPages = math.ceil(totalItems / paginationParams.pageSize) if totalItems > 0 else 0
            startIdx = (paginationParams.page - 1) * paginationParams.pageSize
            endIdx = startIdx + paginationParams.pageSize
            return {
                "items": filtered[startIdx:endIdx],
                "pagination": PaginationMetadata(
                    currentPage=paginationParams.page,
                    pageSize=paginationParams.pageSize,
                    totalItems=totalItems,
                    totalPages=totalPages,
                    sort=paginationParams.sort,
                    filters=paginationParams.filters,
                ).model_dump(),
            }

        return {"items": enriched, "pagination": None}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting automation events: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error getting automation events: {str(e)}")


@router.get("/filter-values")
@limiter.limit("60/minute")
def get_automation_event_filter_values(
    request: Request,
    column: str = Query(..., description="Column key"),
    pagination: Optional[str] = Query(None, description="JSON-encoded current filters"),
    currentUser: User = Depends(requireSysAdminRole),
):
    """Return distinct filter values for a column in automation events.

    The currently active filters are applied first, except the filter on
    ``column`` itself and any sort order. An unparsable ``pagination`` value is
    ignored (and logged), so the values are then computed over the unfiltered
    list.

    Args:
        request: incoming request (required by the rate limiter decorator).
        column: key of the column whose distinct values are requested.
        pagination: optional JSON text carrying the current filters.
        currentUser: user resolved by the ``requireSysAdminRole`` dependency.

    Raises:
        HTTPException: 500 when building or filtering the list fails.
    """
    try:
        crossFilterParams: Optional[PaginationParams] = None
        if pagination:
            try:
                paginationDict = _loadPaginationDict(pagination)
                if paginationDict is not None:
                    # "filters" may be present with a null value; treat that
                    # the same as a missing key instead of calling .pop on None.
                    filters = paginationDict.get("filters") or {}
                    filters.pop(column, None)
                    paginationDict["filters"] = filters
                    paginationDict.pop("sort", None)
                    crossFilterParams = PaginationParams(**paginationDict)
            except (json.JSONDecodeError, ValueError) as e:
                # Best effort: fall back to no cross-filtering, but leave a trace.
                logger.warning(f"Ignoring invalid pagination parameter for filter values: {e}")

        enriched = _buildEnrichedAutomationEvents(currentUser)
        crossFiltered = _applyFiltersAndSort(enriched, crossFilterParams)
        return _extractDistinctValues(crossFiltered, column)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting filter values: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/sync")
@limiter.limit("5/minute")
async def sync_all_automation_events(
    request: Request,
    currentUser: User = Depends(requireSysAdminRole)
) -> Dict[str, Any]:
    """
    Manually trigger sync for all automations (sysadmin only).
    This will register/remove events based on active flags.

    Returns:
        ``{"success": True, "synced": <count>, "events": <mapping>}`` built
        from the result of ``syncAutomationEvents``.

    Raises:
        HTTPException: 500 when the ``event`` user is missing or the sync fails.
    """
    try:
        from modules.interfaces.interfaceDbApp import getRootInterface
        from modules.workflows.automation import syncAutomationEvents

        # Get event user for sync operation (routes can import from interfaces)
        rootInterface = getRootInterface()
        eventUser = rootInterface.getUserByUsername("event")
        if not eventUser:
            raise HTTPException(
                status_code=500,
                detail="Event user not available"
            )

        from modules.features.automation.mainAutomation import getAutomationServices
        services = getAutomationServices(currentUser, mandateId=None, featureInstanceId=None)
        result = syncAutomationEvents(services, eventUser)
        return {
            "success": True,
            "synced": result.get("synced", 0),
            "events": result.get("events", {})
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error syncing automation events: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error syncing automation events: {str(e)}"
        )


@router.post("/{eventId}/remove")
@limiter.limit("10/minute")
def remove_event(
    request: Request,
    eventId: str = Path(..., description="Event ID to remove"),
    currentUser: User = Depends(requireSysAdminRole)
) -> Dict[str, Any]:
    """
    Remove a scheduler job (sysadmin only).
    Removes the job from the scheduler and clears the eventId on the automation definition.
    Does NOT delete the automation definition itself.

    Returns:
        ``{"success": True, "eventId": ..., "message": ...}``.

    Raises:
        HTTPException: 500 when removing the job or updating the definition fails.
    """
    try:
        from modules.shared.eventManagement import eventManager

        # Remove scheduler job
        eventManager.remove(eventId)

        # Clear eventId on the automation definition (so it can be re-synced later)
        if eventId.startswith(_AUTOMATION_EVENT_PREFIX):
            automationId = _automationIdFromEventId(eventId)
            automationInterface = interfaceAutomation.getInterface(currentUser)
            automation = automationInterface.getAutomationDefinition(automationId)
            # Only clear the reference when it still points at the removed job.
            if automation and getattr(automation, "eventId", None) == eventId:
                automationInterface.updateAutomationDefinition(automationId, {"eventId": None})

        return {
            "success": True,
            "eventId": eventId,
            "message": f"Event {eventId} removed successfully"
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error removing event: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error removing event: {str(e)}"
        )