# gateway/app.py
# NOTE: The lines below were web-UI export chrome (snapshot of 2026-03-22
# 18:20:31 +01:00 — 622 lines, 24 KiB, "Raw Blame History", ambiguous-Unicode
# warning). They were converted into this comment so the file stays valid
# Python; no executable content was removed.
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import os
import sys
import unicodedata
from urllib.parse import quote_plus
os.environ["NUMEXPR_MAX_THREADS"] = "12"
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer
from contextlib import asynccontextmanager
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime
from modules.shared.configuration import APP_CONFIG
from modules.shared.eventManagement import eventManager
from modules.workflows.automation import subAutomationSchedule
from modules.features.automation2.emailPoller import start as startAutomation2EmailPoller
from modules.features.automation2.emailPoller import stop as stopAutomation2EmailPoller
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.system.registry import loadFeatureMainModules
class DailyRotatingFileHandler(RotatingFileHandler):
    """
    Size-based rotating file handler that additionally rolls over to a fresh
    file whenever the calendar date changes.

    The active file name embeds the current date as ``{prefix}_{YYYYMMDD}.log``
    inside ``logDir``; the switch happens lazily on the first ``emit`` after
    midnight.
    """

    def __init__(
        self, logDir, filenamePrefix, maxBytes=10485760, backupCount=5, **kwargs
    ):
        self.logDir = logDir
        self.filenamePrefix = filenamePrefix
        self.currentDate = None
        self.currentFile = None
        # Resolve today's target file before the parent opens any stream.
        self._updateFileIfNeeded()
        super().__init__(
            self.currentFile, maxBytes=maxBytes, backupCount=backupCount, **kwargs
        )

    def _updateFileIfNeeded(self):
        """Recompute the target log file; return True only when it changed."""
        # During interpreter shutdown module globals may already be cleared,
        # leaving `datetime` as None - bail out quietly in that case.
        if datetime is None:
            return False
        today = datetime.now().strftime("%Y%m%d")
        if self.currentDate == today:
            return False
        self.currentDate = today
        candidate = os.path.join(self.logDir, f"{self.filenamePrefix}_{today}.log")
        if candidate == self.currentFile:
            return False
        self.currentFile = candidate
        return True

    def emit(self, record):
        """Write one record, reopening the stream first if the date rolled over."""
        if self._updateFileIfNeeded():
            # Close the previous day's stream and point the parent handler at
            # the new file before delegating the actual write.
            if self.stream:
                self.stream.close()
                self.stream = None
            self.baseFilename = self.currentFile
            if not self.delay:
                self.stream = self._open()
        super().emit(record)
def initLogging():
    """
    Initialize root logging from APP_CONFIG.

    Builds a console handler and/or a daily-rotating file handler (each
    toggled by config flags), attaches noise-suppression and unicode
    sanitizing filters to both, and silences chatty third-party loggers.
    Uses ``force=True`` so a repeated call fully reconfigures the root logger.
    """
    # Resolve the log level. An unknown or lower-cased name now falls back to
    # WARNING instead of raising AttributeError (or worse, resolving
    # `logging.info` the *function* for "info") at startup.
    logLevelName = APP_CONFIG.get("APP_LOGGING_LOG_LEVEL", "WARNING")
    logLevel = getattr(logging, str(logLevelName).upper(), logging.WARNING)
    # Resolve the log directory; relative paths are anchored at this file's folder.
    logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./")
    if not os.path.isabs(logDir):
        gatewayDir = os.path.dirname(os.path.abspath(__file__))
        logDir = os.path.join(gatewayDir, logDir)
    os.makedirs(logDir, exist_ok=True)
    dateFormat = APP_CONFIG.get("APP_LOGGING_DATE_FORMAT", "%Y-%m-%d %H:%M:%S")
    # Single-line console format.
    consoleFormatter = logging.Formatter(
        fmt="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
        datefmt=dateFormat,
    )
    # File format adds call-site details (path:line, function) but stays single-line.
    fileFormatter = logging.Formatter(
        fmt="%(asctime)s - %(levelname)s - %(name)s - %(message)s - %(pathname)s:%(lineno)d - %(funcName)s",
        datefmt=dateFormat,
    )

    class ChromeDevToolsFilter(logging.Filter):
        """Drop Chrome DevTools probe requests and index.html request noise."""

        def filter(self, record):
            return not (
                isinstance(record.msg, str)
                and (
                    ".well-known/appspecific/com.chrome.devtools.json" in record.msg
                    or "Request: /index.html" in record.msg
                )
            )

    class HttpcoreStarFilter(logging.Filter):
        """Drop all records from the httpcore logger and its sub-loggers."""

        def filter(self, record):
            return not (
                record.name == "httpcore" or record.name.startswith("httpcore.")
            )

    class HTTPDebugFilter(logging.Filter):
        """Drop low-level HTTP client debug chatter (httpx/httpcore/multipart)."""

        _PATTERNS = (
            "receive_response_body.started",
            "receive_response_body.complete",
            "response_closed.started",
            "_send_single_request",
            "httpcore.http11",
            "httpx._client",
            "HTTP Request",
            "multipart.multipart",
        )

        def filter(self, record):
            if isinstance(record.msg, str):
                return not any(pattern in record.msg for pattern in self._PATTERNS)
            return True

    class EmojiFilter(logging.Filter):
        """
        Strip emoji characters from messages to avoid UnicodeEncodeError on
        narrow console encodings (e.g. cp1252); other unicode is preserved.
        """

        # Code-point ranges treated as emoji when the character's unicode
        # category is "So" (Symbol, other).
        _EMOJI_RANGES = (
            (0x1F600, 0x1F64F),  # emoticons
            (0x1F300, 0x1F5FF),  # misc symbols & pictographs
            (0x1F680, 0x1F6FF),  # transport & map symbols
            (0x1F1E0, 0x1F1FF),  # regional indicator (flag) symbols
            (0x2600, 0x26FF),    # miscellaneous symbols
            (0x2700, 0x27BF),    # dingbats
        )

        @classmethod
        def _isEmoji(cls, char):
            # A character is removed only when it is BOTH category "So" AND
            # inside one of the known emoji ranges - same effective rule as
            # the previous and/or chain, without the ambiguous precedence.
            if unicodedata.category(char) != "So":
                return False
            codePoint = ord(char)
            return any(low <= codePoint <= high for low, high in cls._EMOJI_RANGES)

        def filter(self, record):
            if isinstance(record.msg, str):
                try:
                    record.msg = "".join(
                        char for char in record.msg if not self._isEmoji(char)
                    )
                except (TypeError, AttributeError):
                    # Edge cases during interpreter shutdown - keep msg as-is.
                    pass
            return True

    class UnicodeArrowFilter(logging.Filter):
        """Normalize arrows/angle quotes to ASCII for terminals like cp1252."""

        # One C-level pass via str.translate instead of chained .replace calls.
        _TRANSLATION = str.maketrans({
            "\u2192": "->",   # rightwards arrow
            "\u2190": "<-",   # leftwards arrow
            "\u2194": "<->",  # left right arrow
            "\u21D2": "=>",   # rightwards double arrow
            "\u21D0": "<=",   # leftwards double arrow
            "\u21D4": "<=>",  # left right double arrow
            "\u00AB": "<<",   # left-pointing double angle quotation mark
            "\u00BB": ">>",   # right-pointing double angle quotation mark
        })

        def filter(self, record):
            if isinstance(record.msg, str):
                record.msg = record.msg.translate(self._TRANSLATION)
            return True

    # Same filter chain on every handler.
    sharedFilters = (
        ChromeDevToolsFilter(),
        HttpcoreStarFilter(),
        HTTPDebugFilter(),
        EmojiFilter(),
        UnicodeArrowFilter(),
    )
    handlers = []
    # Console handler (enabled by default).
    if APP_CONFIG.get("APP_LOGGING_CONSOLE_ENABLED", True):
        consoleHandler = logging.StreamHandler()
        consoleHandler.setFormatter(consoleFormatter)
        for logFilter in sharedFilters:
            consoleHandler.addFilter(logFilter)
        handlers.append(consoleHandler)
    # Daily-rotating file handler (enabled by default).
    if APP_CONFIG.get("APP_LOGGING_FILE_ENABLED", True):
        rotationSize = int(
            APP_CONFIG.get("APP_LOGGING_ROTATION_SIZE", 10485760)
        )  # Default: 10MB
        backupCount = int(APP_CONFIG.get("APP_LOGGING_BACKUP_COUNT", 5))
        fileHandler = DailyRotatingFileHandler(
            logDir=logDir,
            filenamePrefix="log_app",
            maxBytes=rotationSize,
            backupCount=backupCount,
            encoding="utf-8",
        )
        fileHandler.setFormatter(fileFormatter)
        for logFilter in sharedFilters:
            fileHandler.addFilter(logFilter)
        handlers.append(fileHandler)
    # Configure the root logger; force=True replaces any prior configuration.
    logging.basicConfig(
        level=logLevel,
        format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
        datefmt=dateFormat,
        handlers=handlers,
        force=True,
    )
    # Pin noisy third-party libraries to WARNING regardless of the root level.
    noisyLoggers = [
        "httpx",
        "httpcore",
        "urllib3",
        "asyncio",
        "fastapi.security.oauth2",
        "msal",
        "azure.core.pipeline.policies.http_logging_policy",
    ]
    for loggerName in noisyLoggers:
        logging.getLogger(loggerName).setLevel(logging.WARNING)
    # Log the resulting configuration for operator visibility.
    logger = logging.getLogger(__name__)
    logger.info(f"Logging initialized with level {logLevelName}")
    logger.info(f"Log directory: {logDir}")
    if APP_CONFIG.get("APP_LOGGING_FILE_ENABLED", True):
        today = datetime.now().strftime("%Y%m%d")
        appLogFile = os.path.join(logDir, f"log_app_{today}.log")
        logger.info(f"Application log file: {appLogFile} (auto-switches daily)")
    else:
        logger.info("Application log file: disabled")
    logger.info(
        f"Console logging: {'enabled' if APP_CONFIG.get('APP_LOGGING_CONSOLE_ENABLED', True) else 'disabled'}"
    )
def makeSqlalchemyDbUrl() -> str:
    """
    Build the SQLAlchemy PostgreSQL URL from APP_CONFIG connection settings.

    The password is URL-quoted so special characters survive the DSN.
    """
    user = APP_CONFIG.get("SQLALCHEMY_DB_USER", "postgres")
    pwd = quote_plus(APP_CONFIG.get("SQLALCHEMY_DB_PASSWORD_SECRET", ""))
    host = APP_CONFIG.get("SQLALCHEMY_DB_HOST", "localhost")
    port = APP_CONFIG.get("SQLALCHEMY_DB_PORT", "5432")
    db = APP_CONFIG.get("SQLALCHEMY_DB_DATABASE", "project_gateway")
    # Windows: use asyncpg, since psycopg is incompatible with the
    # ProactorEventLoop; everywhere else keep psycopg.
    driver = "asyncpg" if sys.platform == "win32" else "psycopg"
    return f"postgresql+{driver}://{user}:{pwd}@{host}:{port}/{db}"
# Initialize logging first so every subsequent import/step can log.
initLogging()
logger = logging.getLogger(__name__)
# Environment label (e.g. dev/prod) interpolated into the API description below.
instanceLabel = APP_CONFIG.get("APP_ENV_LABEL")
# Pre-warm AI connectors on process load (before lifespan). Critical for chatbot latency.
try:
    import modules.aicore.aicoreModelRegistry  # noqa: F401 - import side effect triggers pre-warm
    logger.info("AI connectors pre-warm (app load) triggered")
except Exception as e:
    # NOTE(review): re-fetches the logger instead of using `logger` above -
    # presumably defensive; `logger` is already bound at this point.
    logging.getLogger(__name__).warning(f"AI pre-warm at app load failed: {e}")
# Define lifespan context manager for application startup/shutdown events
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    FastAPI lifespan: runs startup work before `yield`, shutdown work after.

    Startup order matters: AI pre-warm -> DB bootstrap -> RBAC catalog ->
    service-center pre-warm -> event user -> feature containers -> managers
    -> billing defaults. Most steps are individually wrapped in try/except so
    one failing subsystem does not prevent the app from serving.
    """
    logger.info("Application is starting up")
    # --- Pre-warm AI connectors FIRST (before any other startup work) ---
    # Avoids 48 s latency on first chatbot request; must run before first use.
    try:
        import modules.aicore.aicoreModelRegistry  # noqa: F401 - triggers eager pre-warm
        from modules.aicore.aicoreModelRegistry import modelRegistry
        modelRegistry.ensureConnectorsRegistered()
        modelRegistry.refreshModels(force=True)
        logger.info("AI connectors and model registry pre-warmed")
    except Exception as e:
        logger.warning(f"AI pre-warm failed: {e}")
    # Bootstrap database if needed (creates initial users, mandates, roles, etc.)
    # This must happen before getting root interface
    from modules.security.rootAccess import getRootDbAppConnector
    from modules.interfaces.interfaceBootstrap import initBootstrap
    rootDb = getRootDbAppConnector()
    try:
        initBootstrap(rootDb)
        logger.info("Bootstrap check completed")
    except Exception as e:
        logger.warning(f"Bootstrap check failed (may already be initialized): {str(e)}")
    # Register all feature definitions in RBAC catalog (for /api/features/ endpoint)
    try:
        from modules.security.rbacCatalog import getCatalogService
        from modules.system.registry import registerAllFeaturesInCatalog
        catalogService = getCatalogService()
        registerAllFeaturesInCatalog(catalogService)
        logger.info("Feature catalog registration completed")
    except Exception as e:
        logger.error(f"Feature catalog registration failed: {e}")
    # Pre-warm service center modules (avoids first-request import latency)
    try:
        from modules.serviceCenter import preWarm
        preWarm()
        logger.info("Service center pre-warm completed")
    except Exception as e:
        logger.warning(f"Service center pre-warm failed (non-critical): {e}")
    # Get event user for feature lifecycle (system-level user for background operations)
    rootInterface = getRootInterface()
    eventUser = rootInterface.getUserByUsername("event")
    if not eventUser:
        # Deliberately non-fatal: features that need the event user will log
        # their own errors below.
        logger.error("Could not get event user - some features may not start properly")
    # --- Init Feature Containers (Plug&Play) ---
    # Each feature's onStart failure is isolated so others still come up.
    try:
        mainModules = loadFeatureMainModules()
        for featureName, module in mainModules.items():
            if hasattr(module, "onStart"):
                try:
                    await module.onStart(eventUser)
                    logger.info(f"Feature '{featureName}' started")
                except Exception as e:
                    logger.error(f"Feature '{featureName}' failed to start: {e}")
    except Exception as e:
        logger.warning(f"Could not initialize feature containers: {e}")
    # --- Init Managers ---
    subAutomationSchedule.start(eventUser)  # Automation scheduler
    # Automation2 email poller: started on-demand when a run pauses for email.checkEmail
    eventManager.start()
    # Register audit log cleanup scheduler
    from modules.shared.auditLogger import registerAuditLogCleanupScheduler
    registerAuditLogCleanupScheduler()
    # Ensure billing settings and accounts exist for all mandates
    try:
        from modules.interfaces.interfaceDbBilling import _getRootInterface as getBillingRootInterface
        billingInterface = getBillingRootInterface()
        # Step 1: Ensure all mandates have billing settings (creates defaults if missing)
        settingsCreated = billingInterface.ensureAllMandateSettingsExist()
        if settingsCreated > 0:
            logger.info(f"Billing startup: Created {settingsCreated} missing mandate billing settings")
        # Step 2: Ensure all users have billing accounts (for PREPAY_USER mandates)
        accountsCreated = billingInterface.ensureAllUserAccountsExist()
        if accountsCreated > 0:
            logger.info(f"Billing startup: Created {accountsCreated} missing user accounts")
    except Exception as e:
        logger.warning(f"Failed to ensure billing settings/accounts (non-critical): {e}")
    # Application serves requests while suspended at this yield.
    yield
    # --- Stop Managers ---
    stopAutomation2EmailPoller(eventUser)  # Automation2 email poller (no-op if not running)
    eventManager.stop()
    subAutomationSchedule.stop(eventUser)  # Automation scheduler
    # --- Stop Feature Containers (Plug&Play) ---
    try:
        mainModules = loadFeatureMainModules()
        for featureName, module in mainModules.items():
            if hasattr(module, "onStop"):
                try:
                    await module.onStop(eventUser)
                    logger.info(f"Feature '{featureName}' stopped")
                except Exception as e:
                    logger.error(f"Feature '{featureName}' failed to stop: {e}")
    except Exception as e:
        logger.warning(f"Could not shutdown feature containers: {e}")
    logger.info("Application has been shut down")
# Custom function to generate readable operation IDs for Swagger UI
# Uses snake_case function names directly instead of auto-generated IDs
def _generateOperationId(route) -> str:
"""Generate operation ID from route function name (snake_case)."""
if hasattr(route, "endpoint") and hasattr(route.endpoint, "__name__"):
return route.endpoint.__name__
return route.name if route.name else "unknown"
# START APP
app = FastAPI(
    title="PowerOn AG | Workflow Engine",
    description=f"API for dynamic SaaS platforms ({instanceLabel})",
    lifespan=lifespan,
    swagger_ui_init_oauth={
        "usePkceWithAuthorizationCodeGrant": True,
    },
    generate_unique_id_function=_generateOperationId,
)
# Configure OpenAPI security scheme for Swagger UI
# This adds the "Authorize" button to the /docs page
securityScheme = HTTPBearer()
app.openapi_schema = None  # Reset cached schema so customOpenapi regenerates it with security
def customOpenapi():
    """
    Generate (and cache on app.openapi_schema) the OpenAPI schema with a
    global BearerAuth security scheme, enabling Swagger UI's Authorize button.
    """
    if app.openapi_schema:
        return app.openapi_schema
    from fastapi.openapi.utils import get_openapi
    openapiSchema = get_openapi(
        title=app.title,
        version="1.0.0",
        description=app.description,
        routes=app.routes,
    )
    # get_openapi can omit "components" entirely (e.g. no models/schemas);
    # setdefault avoids a KeyError in that case.
    openapiSchema.setdefault("components", {})["securitySchemes"] = {
        "BearerAuth": {
            "type": "http",
            "scheme": "bearer",
            "bearerFormat": "JWT",
            "description": "Enter your JWT token (obtained from login endpoint or browser cookies)",
        }
    }
    # Apply security globally to all endpoints
    # Individual endpoints can override this if needed
    openapiSchema["security"] = [{"BearerAuth": []}]
    app.openapi_schema = openapiSchema
    return app.openapi_schema
# Install the custom schema generator (result is cached in app.openapi_schema).
app.openapi = customOpenapi
# Parse CORS origins from environment variable
def getAllowedOrigins():
    """Parse APP_ALLOWED_ORIGINS (comma-separated) into the CORS origin list."""
    rawOrigins = APP_CONFIG.get("APP_ALLOWED_ORIGINS", "http://localhost:8080")
    # Trim surrounding whitespace from each comma-separated entry.
    origins = list(map(str.strip, rawOrigins.split(",")))
    logger.info(f"CORS allowed origins: {origins}")
    return origins
# CORS origin regex pattern for wildcard subdomain support
# Matches all subdomains of poweron.swiss and poweron-center.net
CORS_ORIGIN_REGEX = r"https://.*\.(poweron\.swiss|poweron-center\.net)"
# CORS configuration using environment variables
app.add_middleware(
    CORSMiddleware,
    allow_origins=getAllowedOrigins(),
    allow_origin_regex=CORS_ORIGIN_REGEX,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"],
    allow_headers=["*"],
    expose_headers=["*"],
    max_age=86400,  # Cache preflight responses for 24h to reduce OPTIONS traffic
)
# SlowAPI rate limiter initialization
from modules.auth import limiter
from slowapi.errors import RateLimitExceeded
from slowapi import _rate_limit_exceeded_handler
# The limiter instance must be reachable via app.state for slowapi decorators.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
async def _insufficientBalanceHandler(request: Request, exc: Exception):
    """HTTP 402 with structured billing hint (PREPAY_USER vs PREPAY_MANDATE)."""
    # Prefer the exception's own structured client payload when available.
    if hasattr(exc, "toClientDict"):
        payload = exc.toClientDict()
    else:
        payload = {"error": "INSUFFICIENT_BALANCE", "message": str(exc)}
    return JSONResponse(status_code=402, content={"detail": payload})
# Billing: map InsufficientBalanceException raised anywhere to HTTP 402.
from modules.serviceCenter.services.serviceBilling.mainServiceBilling import (
    InsufficientBalanceException,
)
app.add_exception_handler(InsufficientBalanceException, _insufficientBalanceHandler)
# CSRF protection middleware
from modules.auth import CSRFMiddleware
from modules.auth import (
    TokenRefreshMiddleware,
    ProactiveTokenRefreshMiddleware,
)
# NOTE(review): Starlette executes middleware in reverse registration order
# (last added runs first on a request) - confirm the intended ordering here.
app.add_middleware(CSRFMiddleware)
# Token refresh middleware (silent refresh for expired OAuth tokens)
app.add_middleware(TokenRefreshMiddleware, enabled=True)
# Proactive token refresh middleware (refresh tokens before they expire)
app.add_middleware(
    ProactiveTokenRefreshMiddleware, enabled=True, check_interval_minutes=5
)
# ============================================================================
# ROUTER REGISTRATION
# Each router is imported immediately before inclusion so a broken route
# module fails at a clearly attributable line during startup.
# ============================================================================
from modules.routes.routeAdmin import router as generalRouter
app.include_router(generalRouter)
from modules.routes.routeAttributes import router as attributesRouter
app.include_router(attributesRouter)
from modules.routes.routeDataMandates import router as mandateRouter
app.include_router(mandateRouter)
from modules.routes.routeDataUsers import router as userRouter
app.include_router(userRouter)
from modules.routes.routeDataFiles import router as fileRouter
app.include_router(fileRouter)
from modules.routes.routeDataPrompts import router as promptRouter
app.include_router(promptRouter)
from modules.routes.routeDataConnections import router as connectionsRouter
app.include_router(connectionsRouter)
from modules.routes.routeSecurityLocal import router as localRouter
app.include_router(localRouter)
from modules.routes.routeSecurityMsft import router as msftRouter
app.include_router(msftRouter)
from modules.routes.routeSecurityGoogle import router as googleRouter
app.include_router(googleRouter)
from modules.routes.routeVoiceGoogle import router as voiceGoogleRouter
app.include_router(voiceGoogleRouter)
from modules.routes.routeSecurityAdmin import router as adminSecurityRouter
app.include_router(adminSecurityRouter)
from modules.routes.routeSharepoint import router as sharepointRouter
app.include_router(sharepointRouter)
from modules.routes.routeAdminAutomationEvents import router as adminAutomationEventsRouter
app.include_router(adminAutomationEventsRouter)
from modules.routes.routeAdminLogs import router as adminLogsRouter
app.include_router(adminLogsRouter)
from modules.routes.routeAdminRbacRules import router as rbacAdminRulesRouter
app.include_router(rbacAdminRulesRouter)
from modules.routes.routeMessaging import router as messagingRouter
app.include_router(messagingRouter)
from modules.routes.routeAdminFeatures import router as featuresAdminRouter
app.include_router(featuresAdminRouter)
from modules.routes.routeStore import router as storeRouter
app.include_router(storeRouter)
from modules.routes.routeInvitations import router as invitationsRouter
app.include_router(invitationsRouter)
from modules.routes.routeNotifications import router as notificationsRouter
app.include_router(notificationsRouter)
from modules.routes.routeAdminRbacExport import router as rbacAdminExportRouter
app.include_router(rbacAdminExportRouter)
from modules.routes.routeAdminUserAccessOverview import router as userAccessOverviewRouter
app.include_router(userAccessOverviewRouter)
from modules.routes.routeGdpr import router as gdprRouter
app.include_router(gdprRouter)
from modules.routes.routeBilling import router as billingRouter
app.include_router(billingRouter)
# ============================================================================
# SYSTEM ROUTES (Navigation, etc.)
# ============================================================================
from modules.routes.routeSystem import router as systemRouter, navigationRouter
app.include_router(systemRouter)
app.include_router(navigationRouter)
# ============================================================================
# PLUG&PLAY FEATURE ROUTERS
# Dynamically load routers from feature containers in modules/features/
# ============================================================================
from modules.system.registry import loadFeatureRouters
featureLoadResults = loadFeatureRouters(app)
logger.info(f"Feature router load results: {featureLoadResults}")