# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Admin automation execution logs routes.

SysAdmin-only endpoints for viewing consolidated automation execution history
across all mandates and feature instances.
"""

from fastapi import APIRouter, HTTPException, Depends, Request, Query
from typing import List, Dict, Any, Optional, Callable
import logging
import json
import math
import uuid

from modules.auth import limiter, requireSysAdminRole
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelPagination import PaginationParams, PaginationMetadata, normalize_pagination_dict
from modules.routes.routeDataUsers import _applyFiltersAndSort, _extractDistinctValues

logger = logging.getLogger(__name__)

router = APIRouter(
    prefix="/api/admin/automation-logs",
    tags=["Admin Automation Logs"],
    responses={
        401: {"description": "Unauthorized"},
        403: {"description": "Forbidden - Sysadmin only"},
        500: {"description": "Internal server error"},
    },
)


def _readField(record: Any, name: str, default: Any = None) -> Any:
    """Read ``name`` from a record that is either a dict or an attribute-style object."""
    if isinstance(record, dict):
        return record.get(name, default)
    return getattr(record, name, default)


def _resolveCached(cache: Dict[str, str], key: str, lookup: Callable[[str], Optional[str]]) -> str:
    """Return a display label for ``key``, memoized in ``cache``.

    Args:
        cache: Per-request memo mapping ids to already resolved labels.
        key: Id to resolve. An empty/None id resolves to "".
        lookup: Callable returning the label for an id, or a falsy value if
            none is available.

    Returns:
        The label from ``lookup``; the first 8 characters of the id when the
        lookup yields nothing or raises.
    """
    if not key:
        return ""
    if key not in cache:
        fallback = str(key)[:8]
        try:
            cache[key] = lookup(key) or fallback
        except Exception as e:
            # Best effort by design: an unresolvable id must not break the log listing.
            logger.debug("Could not resolve label for %s: %s", key, e)
            cache[key] = fallback
    return cache[key]


def _formatMessages(messages: Any) -> str:
    """Render a log's ``messages`` value as one "; "-separated string.

    Items are stringified so non-string entries do not raise, and a plain
    string is returned as-is instead of being joined character by character.
    """
    if not messages:
        return ""
    if isinstance(messages, str):
        return messages
    if isinstance(messages, (list, tuple)):
        return "; ".join(str(message) for message in messages)
    return str(messages)


def _parsePaginationDict(pagination: Optional[str]) -> Optional[Dict[str, Any]]:
    """Decode the JSON-encoded ``pagination`` query parameter.

    Returns:
        The dict produced by ``normalize_pagination_dict``, or None when the
        parameter is absent or decodes to an empty value.

    Raises:
        ValueError: If the value is not valid JSON (``json.JSONDecodeError`` is
            a ``ValueError``) or decodes to something other than a JSON object.
    """
    if not pagination:
        return None
    decoded = json.loads(pagination)
    if not decoded:
        return None
    if not isinstance(decoded, dict):
        raise ValueError("pagination must be a JSON object")
    return normalize_pagination_dict(decoded)


def _buildFlattenedExecutionLogs(currentUser: User) -> List[Dict[str, Any]]:
    """Flatten executionLogs from all AutomationDefinitions across all mandates.

    Called from a SysAdmin-only endpoint — bypasses RBAC, reads directly from DB.

    Args:
        currentUser: User on whose behalf the automation services are obtained.

    Returns:
        One dict per execution log entry with the keys ``id``, ``timestamp``,
        ``automationId``, ``automationLabel``, ``mandateName``,
        ``featureInstanceName``, ``executedBy``, ``status``, ``workflowId`` and
        ``messages``, sorted by ``timestamp`` descending. ``id`` is a fresh
        random UUID on every call, so it is not stable across requests.
    """
    from modules.interfaces.interfaceDbApp import getRootInterface
    from modules.features.automation.mainAutomation import getAutomationServices
    from modules.features.automation.datamodelFeatureAutomation import AutomationDefinition

    rootInterface = getRootInterface()
    services = getAutomationServices(currentUser, mandateId=None, featureInstanceId=None)
    allAutomations = services.interfaceDbAutomation.db.getRecordset(AutomationDefinition)

    # Per-call caches: many automations share the same user / mandate / feature instance.
    userCache: Dict[str, str] = {}
    mandateCache: Dict[str, str] = {}
    featureCache: Dict[str, str] = {}

    def _lookupUsername(userId: str) -> Optional[str]:
        return getattr(rootInterface.getUser(userId), "username", None)

    def _lookupMandateLabel(mandateId: str) -> Optional[str]:
        return getattr(rootInterface.getMandate(mandateId), "label", None)

    def _lookupFeatureLabel(featureInstanceId: str) -> Optional[str]:
        instance = rootInterface.getFeatureInstance(featureInstanceId)
        return getattr(instance, "label", None) or getattr(instance, "featureCode", None)

    flatLogs: List[Dict[str, Any]] = []
    for automation in allAutomations:
        automationId = _readField(automation, "id", "")
        automationLabel = _readField(automation, "label", "")
        mandateName = _resolveCached(
            mandateCache, _readField(automation, "mandateId", ""), _lookupMandateLabel
        )
        featureInstanceName = _resolveCached(
            featureCache, _readField(automation, "featureInstanceId", ""), _lookupFeatureLabel
        )
        # The automation's creator is reported as the executing user.
        executedByName = _resolveCached(
            userCache, _readField(automation, "sysCreatedBy", ""), _lookupUsername
        )

        for log in _readField(automation, "executionLogs") or []:
            flatLogs.append({
                "id": str(uuid.uuid4()),
                # A None/empty timestamp becomes 0 so the sort below cannot hit None.
                "timestamp": _readField(log, "timestamp", 0) or 0,
                "automationId": automationId,
                "automationLabel": automationLabel,
                "mandateName": mandateName,
                "featureInstanceName": featureInstanceName,
                "executedBy": executedByName,
                "status": _readField(log, "status", ""),
                "workflowId": _readField(log, "workflowId", ""),
                "messages": _formatMessages(_readField(log, "messages", [])),
            })

    flatLogs.sort(key=lambda row: row["timestamp"], reverse=True)
    return flatLogs


@router.get("")
@limiter.limit("30/minute")
def get_all_automation_logs(
    request: Request,
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams"),
    currentUser: User = Depends(requireSysAdminRole),
):
    """Get consolidated execution logs from all automations (sysadmin only).

    Args:
        request: Incoming request (needed by the rate limiter decorator).
        pagination: Optional JSON-encoded PaginationParams.
        currentUser: Authenticated user, injected via ``requireSysAdminRole``.

    Returns:
        ``{"items": [...], "pagination": {...}}`` for a paginated request,
        otherwise ``{"items": [...], "pagination": None}`` with all logs.

    Raises:
        HTTPException: 400 if ``pagination`` cannot be parsed into
            PaginationParams, 500 on any other failure.
    """
    try:
        paginationParams: Optional[PaginationParams] = None
        try:
            paginationDict = _parsePaginationDict(pagination)
            if paginationDict is not None:
                paginationParams = PaginationParams(**paginationDict)
        except (ValueError, TypeError) as e:
            # ValueError covers malformed JSON and model validation errors;
            # TypeError covers payloads of the wrong shape. Both are client errors.
            raise HTTPException(status_code=400, detail=f"Invalid pagination parameter: {str(e)}") from e

        logs = _buildFlattenedExecutionLogs(currentUser)
        filtered = _applyFiltersAndSort(logs, paginationParams)

        if paginationParams:
            totalItems = len(filtered)
            totalPages = math.ceil(totalItems / paginationParams.pageSize) if totalItems > 0 else 0
            startIdx = (paginationParams.page - 1) * paginationParams.pageSize
            endIdx = startIdx + paginationParams.pageSize
            return {
                "items": filtered[startIdx:endIdx],
                "pagination": PaginationMetadata(
                    currentPage=paginationParams.page,
                    pageSize=paginationParams.pageSize,
                    totalItems=totalItems,
                    totalPages=totalPages,
                    sort=paginationParams.sort,
                    filters=paginationParams.filters,
                ).model_dump(),
            }

        # No pagination requested: return the list built above in one piece.
        return {"items": logs, "pagination": None}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting automation logs: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error getting automation logs: {str(e)}")


@router.get("/filter-values")
@limiter.limit("60/minute")
def get_automation_log_filter_values(
    request: Request,
    column: str = Query(..., description="Column key"),
    pagination: Optional[str] = Query(None, description="JSON-encoded current filters"),
    currentUser: User = Depends(requireSysAdminRole),
):
    """Return distinct filter values for a column in automation logs.

    The filters currently active on the other columns are applied first; the
    filter on ``column`` itself and any sort order are dropped. An unparsable
    ``pagination`` value is ignored (logged as a warning) and the values are
    computed over all logs.

    Args:
        request: Incoming request (needed by the rate limiter decorator).
        column: Key of the column whose distinct values are requested.
        pagination: Optional JSON-encoded pagination object carrying the
            current filters.
        currentUser: Authenticated user, injected via ``requireSysAdminRole``.

    Raises:
        HTTPException: 500 on unexpected failures.
    """
    try:
        crossFilterParams: Optional[PaginationParams] = None
        try:
            paginationDict = _parsePaginationDict(pagination)
            if paginationDict is not None:
                # "filters" may be present but null; treat that like no filters.
                filters = paginationDict.get("filters") or {}
                if not isinstance(filters, dict):
                    raise ValueError("filters must be a JSON object")
                filters.pop(column, None)
                paginationDict["filters"] = filters
                paginationDict.pop("sort", None)
                crossFilterParams = PaginationParams(**paginationDict)
        except (ValueError, TypeError) as e:
            # Best effort: fall back to unfiltered values rather than failing the request.
            logger.warning("Ignoring invalid pagination parameter for filter values: %s", e)

        logs = _buildFlattenedExecutionLogs(currentUser)
        crossFiltered = _applyFiltersAndSort(logs, crossFilterParams)
        return _extractDistinctValues(crossFiltered, column)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting filter values: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))