platform-core/app.py
ValueOn AG ce612ffcfc
All checks were successful
Deploy Plattform-Core (Int) / test (push) Successful in 1m0s
Deploy Plattform-Core (Int) / deploy (push) Successful in 9s
import referencing fixes
2026-06-08 14:46:52 +02:00

895 lines
No EOL
35 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import os
import sys
import unicodedata
from urllib.parse import quote_plus
os.environ["NUMEXPR_MAX_THREADS"] = "12"
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer
from contextlib import asynccontextmanager
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime
from modules.shared.configuration import APP_CONFIG
from modules.shared.eventManagement import eventManager
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.system.registry import loadFeatureMainModules
class DailyRotatingFileHandler(RotatingFileHandler):
"""
A rotating file handler that automatically switches to a new file when the date changes.
The log file name includes the current date and switches at midnight.
"""
def __init__(
self, logDir, filenamePrefix, maxBytes=10485760, backupCount=5, **kwargs
):
self.logDir = logDir
self.filenamePrefix = filenamePrefix
self.currentDate = None
self.currentFile = None
# Initialize with today's file
self._updateFileIfNeeded()
# Call parent constructor with current file
super().__init__(
self.currentFile, maxBytes=maxBytes, backupCount=backupCount, **kwargs
)
def _updateFileIfNeeded(self):
"""Update the log file if the date has changed"""
# Guard against interpreter shutdown when datetime may be None
if datetime is None:
return False
today = datetime.now().strftime("%Y%m%d")
if self.currentDate != today:
self.currentDate = today
newFile = os.path.join(self.logDir, f"{self.filenamePrefix}_{today}.log")
if self.currentFile != newFile:
self.currentFile = newFile
return True
return False
def doRollover(self):
"""Size-based rollover that tolerates Windows file locks."""
try:
super().doRollover()
except PermissionError:
pass
def emit(self, record):
"""Emit a log record, switching files if date has changed"""
# Check if we need to switch to a new file
if self._updateFileIfNeeded():
# Close current file and open new one
if self.stream:
self.stream.close()
self.stream = None
# Update the baseFilename for the parent class
self.baseFilename = self.currentFile
# Reopen the stream
if not self.delay:
self.stream = self._open()
# Call parent emit method
super().emit(record)
def initLogging():
"""Initialize logging with configuration from APP_CONFIG"""
# Get log level from config (default to INFO if not found)
logLevelName = APP_CONFIG.get("APP_LOGGING_LOG_LEVEL", "WARNING")
logLevel = getattr(logging, logLevelName)
# Get log directory from config
logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./")
if not os.path.isabs(logDir):
# If relative path, make it relative to the gateway directory
gatewayDir = os.path.dirname(os.path.abspath(__file__))
logDir = os.path.join(gatewayDir, logDir)
# Ensure log directory exists
os.makedirs(logDir, exist_ok=True)
# Create formatters - using single line format
consoleFormatter = logging.Formatter(
fmt="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
datefmt=APP_CONFIG.get("APP_LOGGING_DATE_FORMAT", "%Y-%m-%d %H:%M:%S"),
)
# File formatter with more detailed error information but still single line
fileFormatter = logging.Formatter(
fmt="%(asctime)s - %(levelname)s - %(name)s - %(message)s - %(pathname)s:%(lineno)d - %(funcName)s",
datefmt=APP_CONFIG.get("APP_LOGGING_DATE_FORMAT", "%Y-%m-%d %H:%M:%S"),
)
# Add filter to exclude Chrome DevTools requests
class ChromeDevToolsFilter(logging.Filter):
def filter(self, record):
return not (
isinstance(record.msg, str)
and (
".well-known/appspecific/com.chrome.devtools.json" in record.msg
or "Request: /index.html" in record.msg
)
)
# Add filter to exclude all httpcore loggers (including sub-loggers)
class HttpcoreStarFilter(logging.Filter):
def filter(self, record):
return not (
record.name == "httpcore" or record.name.startswith("httpcore.")
)
# Add filter to exclude HTTP debug messages
class HTTPDebugFilter(logging.Filter):
def filter(self, record):
if isinstance(record.msg, str):
# Filter out HTTP debug messages
http_debug_patterns = [
"receive_response_body.started",
"receive_response_body.complete",
"response_closed.started",
"_send_single_request",
"httpcore.http11",
"httpx._client",
"HTTP Request",
"multipart.multipart",
]
return not any(pattern in record.msg for pattern in http_debug_patterns)
return True
# Add filter to remove emojis from log messages to prevent Unicode encoding errors
class EmojiFilter(logging.Filter):
def filter(self, record):
if isinstance(record.msg, str):
# Remove only emojis, preserve other Unicode characters like quotes
# Guard against None characters during shutdown
try:
record.msg = "".join(
char
for char in record.msg
if char is not None and unicodedata.category(char) != "So"
or (char is not None and not (
0x1F600 <= ord(char) <= 0x1F64F
or 0x1F300 <= ord(char) <= 0x1F5FF
or 0x1F680 <= ord(char) <= 0x1F6FF
or 0x1F1E0 <= ord(char) <= 0x1F1FF
or 0x2600 <= ord(char) <= 0x26FF
or 0x2700 <= ord(char) <= 0x27BF
))
)
except (TypeError, AttributeError):
# Handle edge cases during shutdown
pass
return True
# Add filter to normalize problematic unicode (e.g., arrows) to ASCII for terminals like cp1252
class UnicodeArrowFilter(logging.Filter):
def filter(self, record):
if isinstance(record.msg, str):
translation_map = {
"\u2192": "->", # rightwards arrow
"\u2190": "<-", # leftwards arrow
"\u2194": "<->", # left right arrow
"\u21D2": "=>", # rightwards double arrow
"\u21D0": "<=", # leftwards double arrow
"\u21D4": "<=>", # left right double arrow
"\u00AB": "<<", # left-pointing double angle quotation mark
"\u00BB": ">>", # right-pointing double angle quotation mark
}
for u, ascii_eq in translation_map.items():
record.msg = record.msg.replace(u, ascii_eq)
return True
# Configure handlers based on config
handlers = []
# Add console handler if enabled
if APP_CONFIG.get("APP_LOGGING_CONSOLE_ENABLED", True):
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(consoleFormatter)
consoleHandler.addFilter(ChromeDevToolsFilter())
consoleHandler.addFilter(HttpcoreStarFilter())
consoleHandler.addFilter(HTTPDebugFilter())
consoleHandler.addFilter(EmojiFilter())
consoleHandler.addFilter(UnicodeArrowFilter())
handlers.append(consoleHandler)
# Add file handler if enabled
if APP_CONFIG.get("APP_LOGGING_FILE_ENABLED", True):
# Create daily application log file with automatic date switching
rotationSize = int(
APP_CONFIG.get("APP_LOGGING_ROTATION_SIZE", 10485760)
) # Default: 10MB
backupCount = int(APP_CONFIG.get("APP_LOGGING_BACKUP_COUNT", 5))
fileHandler = DailyRotatingFileHandler(
logDir=logDir,
filenamePrefix="log_app",
maxBytes=rotationSize,
backupCount=backupCount,
encoding="utf-8",
)
fileHandler.setFormatter(fileFormatter)
fileHandler.addFilter(ChromeDevToolsFilter())
fileHandler.addFilter(HttpcoreStarFilter())
fileHandler.addFilter(HTTPDebugFilter())
fileHandler.addFilter(EmojiFilter())
fileHandler.addFilter(UnicodeArrowFilter())
handlers.append(fileHandler)
# Configure the root logger
logging.basicConfig(
level=logLevel,
format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
datefmt=APP_CONFIG.get("APP_LOGGING_DATE_FORMAT", "%Y-%m-%d %H:%M:%S"),
handlers=handlers,
force=True, # Force reconfiguration of the root logger
)
# Silence noisy third-party libraries - use the same level as the root logger
noisyLoggers = [
"httpx",
"httpcore",
"urllib3",
"asyncio",
"fastapi.security.oauth2",
"msal",
"azure.core.pipeline.policies.http_logging_policy",
"stripe",
"apscheduler",
]
for loggerName in noisyLoggers:
logging.getLogger(loggerName).setLevel(logging.WARNING)
# Log the current logging configuration
logger = logging.getLogger(__name__)
logger.info(f"Logging initialized with level {logLevelName}")
logger.info(f"Log directory: {logDir}")
if APP_CONFIG.get("APP_LOGGING_FILE_ENABLED", True):
today = datetime.now().strftime("%Y%m%d")
appLogFile = os.path.join(logDir, f"log_app_{today}.log")
logger.info(f"Application log file: {appLogFile} (auto-switches daily)")
else:
logger.info("Application log file: disabled")
logger.info(
f"Console logging: {'enabled' if APP_CONFIG.get('APP_LOGGING_CONSOLE_ENABLED', True) else 'disabled'}"
)
def makeSqlalchemyDbUrl() -> str:
host = APP_CONFIG.get("SQLALCHEMY_DB_HOST", "localhost")
port = APP_CONFIG.get("SQLALCHEMY_DB_PORT", "5432")
db = APP_CONFIG.get("SQLALCHEMY_DB_DATABASE", "project_gateway")
user = APP_CONFIG.get("SQLALCHEMY_DB_USER", "postgres")
pwd = quote_plus(APP_CONFIG.get("SQLALCHEMY_DB_PASSWORD_SECRET", ""))
# On Windows, prefer asyncpg to avoid psycopg + ProactorEventLoop incompatibility
if sys.platform == "win32":
return f"postgresql+asyncpg://{user}:{pwd}@{host}:{port}/{db}"
return f"postgresql+psycopg://{user}:{pwd}@{host}:{port}/{db}"
# Initialize logging
initLogging()
logger = logging.getLogger(__name__)
instanceLabel = APP_CONFIG.get("APP_ENV_LABEL")
# Pre-warm AI connectors on process load (before lifespan). Critical for AI/agent latency.
try:
import modules.aicore.aicoreModelRegistry # noqa: F401
logger.info("AI connectors pre-warm (app load) triggered")
except Exception as e:
logging.getLogger(__name__).warning(f"AI pre-warm at app load failed: {e}")
# Define lifespan context manager for application startup/shutdown events
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("Application is starting up")
# Validate FK metadata on all Pydantic models (fail-fast, no silent fallbacks)
from modules.dbHelpers.fkRegistry import validateFkTargets
fkErrors = validateFkTargets()
if fkErrors:
for err in fkErrors:
logger.error("FK metadata validation: %s", err)
raise SystemExit(f"FK metadata validation failed ({len(fkErrors)} error(s)) — fix datamodels before starting")
# AI connectors already pre-warmed at module-load via _eager_prewarm() in aicoreModelRegistry.
# Bootstrap database if needed (creates initial users, mandates, roles, etc.)
# This must happen before getting root interface
from modules.security.rootAccess import getRootDbAppConnector
from modules.interfaces.interfaceBootstrap import initBootstrap
rootDb = getRootDbAppConnector()
try:
initBootstrap(rootDb)
logger.info("Bootstrap check completed")
except Exception as e:
logger.warning(f"Bootstrap check failed (may already be initialized): {str(e)}")
# Register all feature definitions in RBAC catalog (for /api/features/ endpoint)
try:
from modules.security.rbacCatalog import getCatalogService
from modules.system.registry import registerAllFeaturesInCatalog, syncCatalogFeaturesToDb
catalogService = getCatalogService()
registerAllFeaturesInCatalog(catalogService)
logger.info("Feature catalog registration completed")
# Register service center RBAC objects (Composition Root — avoids system→serviceCenter import)
try:
from modules.serviceCenter import registerServiceObjects
registerServiceObjects(catalogService)
except Exception as e:
logger.warning(f"Service center RBAC registration failed: {e}")
# Persist the in-memory feature registry into the Feature DB-table so
# the FeatureInstance.featureCode FK has real targets. Without this
# every FeatureInstance row would be flagged as orphan by the
# SysAdmin DB-health scan (cf. interfaceFeatures.upsertFeature).
try:
syncCatalogFeaturesToDb(catalogService)
except Exception as e:
logger.error(f"Feature DB sync failed: {e}")
except Exception as e:
logger.error(f"Feature catalog registration failed: {e}")
# Sync gateway i18n registry to DB and load translation cache
try:
from modules.system.i18nBootSync import syncRegistryToDb, loadCache
from modules.serviceCenter.registry import IMPORTABLE_SERVICES
serviceLabels = [svc.get("label") for svc in IMPORTABLE_SERVICES.values()]
accountingLabels = []
try:
from modules.features.trustee.accounting.accountingRegistry import getAccountingRegistry
registry = getAccountingRegistry()
for connectorType, connector in (registry._connectors or {}).items():
for field in connector.getRequiredConfigFields():
label = getattr(field, "label", "") or ""
if label:
accountingLabels.append({"label": label, "connectorType": connectorType})
except Exception:
pass
await syncRegistryToDb(serviceLabels=serviceLabels, accountingLabels=accountingLabels)
await loadCache()
logger.info("i18n registry sync + cache load completed")
except Exception as e:
logger.warning(f"i18n registry sync failed (non-critical): {e}")
# Pre-warm service center modules (avoids first-request import latency)
try:
from modules.serviceCenter import preWarm
preWarm()
logger.info("Service center pre-warm completed")
except Exception as e:
logger.warning(f"Service center pre-warm failed (non-critical): {e}")
# Get event user for feature lifecycle (system-level user for background operations)
rootInterface = getRootInterface()
eventUser = rootInterface.getUserByUsername("event")
if not eventUser:
logger.error("Could not get event user - some features may not start properly")
# --- Init Feature Containers (Plug&Play) ---
try:
mainModules = loadFeatureMainModules()
for featureName, module in mainModules.items():
if hasattr(module, "onStart"):
try:
await module.onStart(eventUser)
logger.info(f"Feature '{featureName}' started")
except Exception as e:
logger.error(f"Feature '{featureName}' failed to start: {e}")
except Exception as e:
logger.warning(f"Could not initialize feature containers: {e}")
# Bootstrap Stripe prices for paid plans (composition root — upward import allowed here)
try:
from modules.serviceCenter.services.serviceSubscription.stripeBootstrap import bootstrapStripePrices
bootstrapStripePrices()
except Exception as e:
logger.error(f"Stripe price bootstrap failed: {e}")
# Bootstrap MIME map into ComponentObjects (composition root — upward import allowed here)
try:
from modules.serviceCenter.services.serviceExtraction.subRegistry import ExtractorRegistry
from modules.interfaces.interfaceDbManagement import ComponentObjects
_mimeRegistry = ExtractorRegistry()
_extensionToMime = _mimeRegistry.getExtensionToMimeMap()
_textMimes: set = set()
_seen: set = set()
for _ext in _mimeRegistry._map.values():
_eid = id(_ext)
if _eid in _seen:
continue
_seen.add(_eid)
_mimes = _ext.getSupportedMimeTypes()
if any(m.startswith("text/") for m in _mimes):
_textMimes.update(_mimes)
_textMimes.update({"application/json", "application/xml", "application/javascript", "application/sql", "application/x-yaml", "application/x-toml"})
ComponentObjects.setMimeMap(_extensionToMime, _textMimes)
except Exception as e:
logger.warning(f"MIME map bootstrap failed: {e}")
# --- Init Managers ---
import asyncio
try:
main_loop = asyncio.get_running_loop()
eventManager.set_event_loop(main_loop)
from modules.workflowAutomation.scheduler.mainScheduler import setMainLoop as setSchedulerMainLoop, setOnRunFailedCallback
setSchedulerMainLoop(main_loop)
# Inject run-failed notification callback (Composition Root — avoids workflows→serviceCenter import)
def _onRunFailed(workflowId, runId, error, mandateId=None, workflowLabel=None):
from modules.serviceCenter import getService
from modules.serviceCenter.context import ServiceCenterContext
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.datamodels.datamodelMessaging import MessagingEventParameters
rootInterface = getRootInterface()
if not rootInterface:
return
eventUser = rootInterface.getUserByUsername("event")
if not eventUser:
return
ctx = ServiceCenterContext(
user=eventUser,
mandate_id=mandateId or "",
feature_instance_id="",
feature_code="workflowAutomation",
)
messagingService = getService("messaging", ctx)
subscriptionId = "WorkflowAutomationRunFailed"
eventParams = MessagingEventParameters(triggerData={
"workflowId": workflowId,
"workflowLabel": workflowLabel or workflowId,
"runId": runId,
"error": error,
"mandateId": mandateId or "",
})
messagingService.executeSubscription(subscriptionId, eventParams)
setOnRunFailedCallback(_onRunFailed)
# Suppress noisy ConnectionResetError from ProactorEventLoop on Windows
# when clients (browsers) close connections abruptly. This is a known
# asyncio issue on Windows: https://bugs.python.org/issue39010
def _suppressClientDisconnect(loop, ctx):
exc = ctx.get("exception")
if isinstance(exc, ConnectionResetError):
return
if isinstance(exc, ConnectionAbortedError):
return
loop.default_exception_handler(ctx)
main_loop.set_exception_handler(_suppressClientDisconnect)
except RuntimeError:
pass
eventManager.start()
# --- WorkflowAutomation: Scheduler boot (System-Lifespan, not Feature-onStart) ---
try:
from modules.workflowAutomation.scheduler.mainScheduler import start as _startWorkflowScheduler
_startWorkflowScheduler(eventUser)
logger.info("WorkflowAutomation scheduler started (system lifespan)")
except Exception as e:
logger.error(f"WorkflowAutomation scheduler failed to start: {e}")
# Register audit log cleanup scheduler
from modules.dbHelpers.auditLogger import registerAuditLogCleanupScheduler
registerAuditLogCleanupScheduler()
# Register enterprise subscription auto-renewal scheduler
from modules.serviceCenter.services.serviceSubscription.enterpriseRenewalScheduler import registerEnterpriseRenewalScheduler
registerEnterpriseRenewalScheduler()
# Recover background jobs that were RUNNING when the previous worker died
try:
from modules.serviceCenter.services.serviceBackgroundJobs.mainBackgroundJobService import (
recoverInterruptedJobs,
registerZombieKillerScheduler,
)
recoverInterruptedJobs()
registerZombieKillerScheduler(intervalMinutes=5)
except Exception as e:
logger.warning(f"BackgroundJob recovery failed (non-critical): {e}")
# Subscribe knowledge ingestion to connection lifecycle events so OAuth
# connect/disconnect reliably trigger bootstrap/purge.
try:
from modules.serviceCenter.services.serviceKnowledge.subConnectorIngestConsumer import (
registerKnowledgeIngestionConsumer,
)
registerKnowledgeIngestionConsumer()
# Side-effect import: registers all walker progress message keys
# in the i18n registry so `syncRegistryToDb` picks them up.
from modules.serviceCenter.services.serviceKnowledge import _progressMessages # noqa: F401
except Exception as e:
logger.warning(f"KnowledgeIngestionConsumer registration failed (non-critical): {e}")
# Install force-exit handler AFTER uvicorn has registered its own SIGINT
# handler. Uvicorn's default timeout-graceful-shutdown is None (wait
# forever), so frontend polling keep-alive connections block the process.
# This wraps uvicorn's handler: on Ctrl+C, start a 3s timer that calls
# os._exit() if the graceful shutdown hasn't completed by then.
import signal as _sig
import threading as _thr
_prevSigint = _sig.getsignal(_sig.SIGINT)
def _onSigint(signum, frame):
_t = _thr.Timer(3.0, lambda: os._exit(0))
_t.daemon = True
_t.start()
if callable(_prevSigint) and _prevSigint not in (_sig.SIG_DFL, _sig.SIG_IGN):
_prevSigint(signum, frame)
else:
raise KeyboardInterrupt
_sig.signal(_sig.SIGINT, _onSigint)
yield
# --- Shutdown sequence (protected against CancelledError) ---
try:
# 1. Drain SSE queues and cancel agent tasks FIRST so that open
# streaming connections break out of their queue.get() loop
# immediately. Without this, uvicorn waits for the SSE generators
# to finish (up to 120 s keepalive timeout) before the rest of
# the shutdown can proceed.
try:
from modules.shared.eventManager import get_event_manager as _getStreamingEM
_getStreamingEM().shutdown()
except Exception as e:
logger.warning(f"Streaming EventManager shutdown failed: {e}")
# 2. Signal DB layer to abort in-flight borrow waits immediately.
# This MUST happen early so that sync worker threads stuck in
# _acquireConn (30 s poll loop) bail out within one backoff tick
# instead of blocking process exit for the full borrow timeout.
try:
from modules.connectors.connectorDbPostgre import closeAllPools
closeAllPools()
except Exception as e:
logger.warning(f"Closing DB connection pools failed: {e}")
# 3. Stop scheduler (removes all pending cron/interval jobs)
eventManager.stop()
# 3.5 Stop WorkflowAutomation scheduler + email poller (System-Lifespan)
try:
from modules.workflowAutomation.scheduler.mainScheduler import stop as _stopWorkflowScheduler
_stopWorkflowScheduler()
except Exception as e:
logger.warning(f"WorkflowAutomation scheduler stop failed: {e}")
try:
from modules.workflowAutomation.scheduler.emailPoller import stop as _stopEmailPoller
_stopEmailPoller(eventUser)
except Exception as e:
logger.warning(f"Email poller stop failed: {e}")
# 4. Stop Feature Containers (Plug&Play)
try:
mainModules = loadFeatureMainModules()
for featureName, module in mainModules.items():
if hasattr(module, "onStop"):
try:
await module.onStop(eventUser)
logger.info(f"Feature '{featureName}' stopped")
except Exception as e:
logger.error(f"Feature '{featureName}' failed to stop: {e}")
except Exception as e:
logger.warning(f"Could not shutdown feature containers: {e}")
# 5. Close shared HTTP sessions (ResilientHttp) to avoid TCP keepalive hang
try:
from modules.shared.httpResilience import closeAllResilientHttp
await closeAllResilientHttp()
except Exception as e:
logger.warning(f"Closing HTTP sessions failed: {e}")
logger.info("Application has been shut down")
except asyncio.CancelledError:
logger.info("Shutdown interrupted (CancelledError) -- resources released")
# Custom function to generate readable operation IDs for Swagger UI
# Uses snake_case function names directly instead of auto-generated IDs
def _generateOperationId(route) -> str:
"""Generate operation ID from route function name (snake_case)."""
if hasattr(route, "endpoint") and hasattr(route.endpoint, "__name__"):
return route.endpoint.__name__
return route.name if route.name else "unknown"
# START APP
app = FastAPI(
title="PowerOn AG | Workflow Engine",
description=f"API for dynamic SaaS platforms ({instanceLabel})",
lifespan=lifespan,
swagger_ui_init_oauth={
"usePkceWithAuthorizationCodeGrant": True,
},
generate_unique_id_function=_generateOperationId,
)
# Configure OpenAPI security scheme for Swagger UI
# This adds the "Authorize" button to the /docs page
securityScheme = HTTPBearer()
app.openapi_schema = None # Reset schema to regenerate with security
def customOpenapi():
if app.openapi_schema:
return app.openapi_schema
from fastapi.openapi.utils import get_openapi
openapiSchema = get_openapi(
title=app.title,
version="1.0.0",
description=app.description,
routes=app.routes,
)
# Add security scheme definition
openapiSchema["components"]["securitySchemes"] = {
"BearerAuth": {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT",
"description": "Enter your JWT token (obtained from login endpoint or browser cookies)",
}
}
# Apply security globally to all endpoints
# Individual endpoints can override this if needed
openapiSchema["security"] = [{"BearerAuth": []}]
app.openapi_schema = openapiSchema
return app.openapi_schema
app.openapi = customOpenapi
# Parse CORS origins from environment variable
def getAllowedOrigins():
originsStr = APP_CONFIG.get("APP_ALLOWED_ORIGINS", "http://localhost:8080")
# Split by comma and strip whitespace
origins = [origin.strip() for origin in originsStr.split(",")]
logger.info(f"CORS allowed origins: {origins}")
return origins
# CORS origin regex pattern for wildcard subdomain support
# Matches all subdomains of poweron.swiss and poweron-center.net
CORS_ORIGIN_REGEX = r"https://.*\.(poweron\.swiss|poweron-center\.net)"
# SlowAPI rate limiter initialization
from modules.auth import limiter
from slowapi.errors import RateLimitExceeded
from slowapi import _rate_limit_exceeded_handler
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
async def _insufficientBalanceHandler(request: Request, exc: Exception):
"""HTTP 402 with structured billing hint."""
payload = exc.toClientDict() if hasattr(exc, "toClientDict") else {"error": "INSUFFICIENT_BALANCE", "message": str(exc)}
return JSONResponse(status_code=402, content={"detail": payload})
from modules.serviceCenter.services.serviceBilling.mainServiceBilling import (
InsufficientBalanceException,
)
app.add_exception_handler(InsufficientBalanceException, _insufficientBalanceHandler)
# CSRF protection middleware
from modules.auth import CSRFMiddleware
from modules.auth import (
TokenRefreshMiddleware,
ProactiveTokenRefreshMiddleware,
)
# Per-request context middleware: language (Accept-Language) + user timezone (X-User-Timezone).
# Both are written into ContextVars and consumed by t() / resolveText() and getRequestNow()
# without having to thread them through every call site.
from modules.shared.i18nRegistry import setLanguage, normalizePrimaryLanguageTag
from modules.shared.timeUtils import setRequestTimezone
@app.middleware("http")
async def _requestContextMiddleware(request: Request, call_next):
acceptLang = request.headers.get("Accept-Language", "")
lang = normalizePrimaryLanguageTag(acceptLang, "de")
setLanguage(lang)
setRequestTimezone(request.headers.get("X-User-Timezone", ""))
return await call_next(request)
app.add_middleware(CSRFMiddleware)
# Token refresh middleware (silent refresh for expired OAuth tokens)
app.add_middleware(TokenRefreshMiddleware, enabled=True)
# Proactive token refresh middleware (refresh tokens before they expire)
app.add_middleware(
ProactiveTokenRefreshMiddleware, enabled=True, check_interval_minutes=5
)
# CORS must be registered LAST so it wraps the whole stack: every response (errors, CSRF 403,
# rate limits) still gets Access-Control-Allow-Origin for browser cross-origin calls.
app.add_middleware(
CORSMiddleware,
allow_origins=getAllowedOrigins(),
allow_origin_regex=CORS_ORIGIN_REGEX,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"],
allow_headers=["*"],
expose_headers=["*"],
max_age=86400,
)
# Include all routers
from modules.routes.routeAdmin import router as generalRouter
app.include_router(generalRouter)
from modules.routes.routeAttributes import router as attributesRouter
app.include_router(attributesRouter)
from modules.routes.routeDataMandates import router as mandateRouter
app.include_router(mandateRouter)
from modules.routes.routeDataUsers import router as userRouter
app.include_router(userRouter)
from modules.routes.routeDataFiles import router as fileRouter
app.include_router(fileRouter)
from modules.routes.routeDataSources import router as dataSourceRouter
app.include_router(dataSourceRouter)
from modules.routes.routeUdb import router as udbRouter
app.include_router(udbRouter)
from modules.routes.routeDataPrompts import router as promptRouter
app.include_router(promptRouter)
from modules.routes.routeDataConnections import router as connectionsRouter
app.include_router(connectionsRouter)
from modules.routes.routeRagInventory import router as ragInventoryRouter
app.include_router(ragInventoryRouter)
from modules.routes.routeTableViews import router as tableViewsRouter
app.include_router(tableViewsRouter)
from modules.routes.routeSecurityLocal import router as localRouter
app.include_router(localRouter)
from modules.routes.routeMfa import router as mfaRouter
app.include_router(mfaRouter)
from modules.routes.routeSecurityMsft import router as msftRouter
app.include_router(msftRouter)
from modules.routes.routeSecurityGoogle import router as googleRouter
app.include_router(googleRouter)
from modules.routes.routeSecurityClickup import router as clickupRouter
app.include_router(clickupRouter)
from modules.routes.routeSecurityInfomaniak import router as infomaniakRouter
app.include_router(infomaniakRouter)
from modules.routes.routeClickup import router as clickupApiRouter
app.include_router(clickupApiRouter)
from modules.routes.routeVoiceGoogle import router as voiceGoogleRouter
app.include_router(voiceGoogleRouter)
from modules.routes.routeVoiceUser import router as voiceUserRouter
app.include_router(voiceUserRouter)
from modules.routes.routeSharepoint import router as sharepointRouter
app.include_router(sharepointRouter)
from modules.routes.routeAudit import router as auditRouter
app.include_router(auditRouter)
from modules.routes.routeAdminLogs import router as adminLogsRouter
app.include_router(adminLogsRouter)
from modules.routes.routeAdminRbacRules import router as rbacAdminRulesRouter
app.include_router(rbacAdminRulesRouter)
from modules.routes.routeAdminFeatures import router as featuresAdminRouter
app.include_router(featuresAdminRouter)
from modules.routes.routeStore import router as storeRouter
app.include_router(storeRouter)
from modules.routes.routeInvitations import router as invitationsRouter
app.include_router(invitationsRouter)
from modules.routes.routeNotifications import router as notificationsRouter
app.include_router(notificationsRouter)
from modules.routes.routeI18n import router as i18nRouter
app.include_router(i18nRouter)
from modules.routes.routeAdminUserAccessOverview import router as userAccessOverviewRouter
app.include_router(userAccessOverviewRouter)
from modules.routes.routeAdminDemoConfig import router as demoConfigRouter
app.include_router(demoConfigRouter)
from modules.routes.routeAdminDatabaseHealth import router as adminDatabaseHealthRouter
app.include_router(adminDatabaseHealthRouter)
from modules.routes.routeGdpr import router as gdprRouter
app.include_router(gdprRouter)
from modules.routes.routeBilling import router as billingRouter
app.include_router(billingRouter)
from modules.routes.routeSubscription import router as subscriptionRouter
app.include_router(subscriptionRouter)
from modules.routes.routeJobs import router as jobsRouter
app.include_router(jobsRouter)
# ============================================================================
# SYSTEM ROUTES (Navigation, etc.)
# ============================================================================
from modules.routes.routeSystem import router as systemRouter, navigationRouter
app.include_router(systemRouter)
app.include_router(navigationRouter)
from modules.routes.routeWorkflowAutomation import router as workflowAutomationRouter
app.include_router(workflowAutomationRouter)
# ============================================================================
# PLUG&PLAY FEATURE ROUTERS
# Dynamically load routers from feature containers in modules/features/
# ============================================================================
from modules.system.registry import loadFeatureRouters
featureLoadResults = loadFeatureRouters(app)
logger.info(f"Feature router load results: {featureLoadResults}")
if __name__ == "__main__":
port = int(os.environ.get("PORT", 8000))
try:
import gunicorn.app.wsgiapp # type: ignore[import-untyped] # noqa: F401
import subprocess
import sys
subprocess.run([
sys.executable, "-m", "gunicorn", "app:app",
"--bind", f"0.0.0.0:{port}",
"--timeout", "600",
"--worker-class", "uvicorn.workers.UvicornWorker",
"--workers", "1",
], check=True)
except ImportError:
import uvicorn
uvicorn.run("app:app", host="0.0.0.0", port=port, workers=1, timeout_graceful_shutdown=2)