From e313055de640ba9e25c4e498e29a482eae81d80b Mon Sep 17 00:00:00 2001 From: patrick-motsch Date: Wed, 18 Feb 2026 21:39:37 +0100 Subject: [PATCH] log admin --- app.py | 3 + modules/routes/routeAdminLogs.py | 143 +++++++++++++++++++++++++++++++ modules/system/mainSystem.py | 10 +++ 3 files changed, 156 insertions(+) create mode 100644 modules/routes/routeAdminLogs.py diff --git a/app.py b/app.py index fe597989..1eaf50a4 100644 --- a/app.py +++ b/app.py @@ -525,6 +525,9 @@ app.include_router(sharepointRouter) from modules.routes.routeAdminAutomationEvents import router as adminAutomationEventsRouter app.include_router(adminAutomationEventsRouter) +from modules.routes.routeAdminLogs import router as adminLogsRouter +app.include_router(adminLogsRouter) + from modules.routes.routeAdminRbacRules import router as rbacAdminRulesRouter app.include_router(rbacAdminRulesRouter) diff --git a/modules/routes/routeAdminLogs.py b/modules/routes/routeAdminLogs.py new file mode 100644 index 00000000..065eba9c --- /dev/null +++ b/modules/routes/routeAdminLogs.py @@ -0,0 +1,143 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Admin log viewer routes for the backend API. +Sysadmin-only endpoint for viewing gateway application logs. +Reads from the daily rotating log files (log_app_YYYYMMDD.log). +""" + +import os +import glob +import logging +from datetime import datetime +from fastapi import APIRouter, HTTPException, Depends, Request, Query +from fastapi.responses import PlainTextResponse +from modules.auth import limiter, requireSysAdminRole +from modules.shared.configuration import APP_CONFIG +from modules.datamodels.datamodelUam import User + +logger = logging.getLogger(__name__) + +router = APIRouter( + prefix="/api/admin/logs", + tags=["Admin Logs"], + responses={ + 401: {"description": "Unauthorized"}, + 403: {"description": "Forbidden - Sysadmin only"}, + 500: {"description": "Internal server error"}, + }, +) + + +def _getLogDir() -> str: + """Resolve the configured log directory (same logic as app.py initLogging).""" + logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./") + if not os.path.isabs(logDir): + gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + logDir = os.path.join(gatewayDir, logDir) + return logDir + + +def _getLogFiles() -> list[str]: + """Return all log_app_*.log files sorted by name descending (newest first).""" + logDir = _getLogDir() + pattern = os.path.join(logDir, "log_app_*.log") + files = glob.glob(pattern) + files.sort(reverse=True) + return files + + +def _readLastNLines(filePath: str, n: int) -> list[str]: + """Read last n lines from a file efficiently.""" + lines = [] + try: + with open(filePath, "r", encoding="utf-8", errors="replace") as f: + lines = f.readlines() + except Exception as e: + logger.warning(f"Failed to read log file {filePath}: {e}") + return lines[-n:] if len(lines) > n else lines + + +@router.get("") +@limiter.limit("30/minute") +def getLogEntries( + request: Request, + count: int = Query(default=200, ge=1, le=50000, description="Number of log entries to return"), + currentUser: User = Depends(requireSysAdminRole), +) -> dict: + """ + Get the last N log entries from the gateway log files. + Merges across multiple daily log files if needed. + """ + try: + logFiles = _getLogFiles() + + if not logFiles: + return {"lines": [], "totalCollected": 0, "logDir": _getLogDir()} + + collectedLines: list[str] = [] + remaining = count + + for filePath in logFiles: + if remaining <= 0: + break + + lines = _readLastNLines(filePath, remaining) + collectedLines = lines + collectedLines + remaining = count - len(collectedLines) + + finalLines = collectedLines[-count:] if len(collectedLines) > count else collectedLines + + return { + "lines": [line.rstrip("\n").rstrip("\r") for line in finalLines], + "totalCollected": len(finalLines), + "logDir": _getLogDir(), + } + + except Exception as e: + logger.error(f"Failed to read log files: {e}") + raise HTTPException(status_code=500, detail=f"Failed to read log files: {str(e)}") + + +@router.get("/download") +@limiter.limit("10/minute") +def downloadLog( + request: Request, + count: int = Query(default=1000, ge=1, le=100000, description="Number of log entries to download"), + currentUser: User = Depends(requireSysAdminRole), +) -> PlainTextResponse: + """ + Download the last N log entries as a plain text file. + """ + try: + logFiles = _getLogFiles() + + if not logFiles: + return PlainTextResponse("No log files found.", media_type="text/plain") + + collectedLines: list[str] = [] + remaining = count + + for filePath in logFiles: + if remaining <= 0: + break + + lines = _readLastNLines(filePath, remaining) + collectedLines = lines + collectedLines + remaining = count - len(collectedLines) + + finalLines = collectedLines[-count:] if len(collectedLines) > count else collectedLines + content = "".join(finalLines) + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + + return PlainTextResponse( + content=content, + media_type="text/plain", + headers={ + "Content-Disposition": f'attachment; filename="gateway_log_{timestamp}.log"', + }, + ) + + except Exception as e: + logger.error(f"Failed to download log files: {e}") + raise HTTPException(status_code=500, detail=f"Failed to download log files: {str(e)}") diff --git a/modules/system/mainSystem.py b/modules/system/mainSystem.py index 137bf17b..f8b6edf2 100644 --- a/modules/system/mainSystem.py +++ b/modules/system/mainSystem.py @@ -223,6 +223,16 @@ NAVIGATION_SECTIONS = [ "adminOnly": True, "sysAdminOnly": True, }, + { + "id": "admin-logs", + "objectKey": "ui.admin.logs", + "label": {"en": "Logs", "de": "Logs", "fr": "Logs"}, + "icon": "FaFileAlt", + "path": "/admin/logs", + "order": 70, + "adminOnly": True, + "sysAdminOnly": True, + }, ], }, ]