# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Authentication module for backend API. Handles JWT-based authentication, token generation, and user context. """ from typing import Optional, Dict, Any, Tuple from fastapi import Depends, HTTPException, status, Request, Response from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from jose import JWTError, jwt import logging from slowapi import Limiter from slowapi.util import get_remote_address from modules.shared.configuration import APP_CONFIG from modules.security.rootAccess import getRootDbAppConnector, getRootUser from modules.interfaces.interfaceDbAppObjects import getInterface from modules.datamodels.datamodelUam import User, AuthAuthority from modules.datamodels.datamodelSecurity import Token # Get Config Data SECRET_KEY = APP_CONFIG.get("APP_JWT_KEY_SECRET") ALGORITHM = APP_CONFIG.get("Auth_ALGORITHM") ACCESS_TOKEN_EXPIRE_MINUTES = int(APP_CONFIG.get("APP_TOKEN_EXPIRY")) REFRESH_TOKEN_EXPIRE_DAYS = int(APP_CONFIG.get("APP_REFRESH_TOKEN_EXPIRY", "7")) # Cookie-based Authentication Setup class CookieAuth(HTTPBearer): """Cookie-based authentication that checks httpOnly cookies first, then Authorization header""" def __init__(self, auto_error: bool = True): super().__init__(auto_error=auto_error) async def __call__(self, request: Request) -> Optional[str]: # 1. Check httpOnly cookie first (preferred method) token = request.cookies.get('auth_token') if token: return token # 2. Fallback to Authorization header for API calls authorization = request.headers.get("Authorization") if authorization and authorization.startswith("Bearer "): return authorization.split(" ")[1] if self.auto_error: raise HTTPException(status_code=401, detail="Not authenticated") return None # Initialize cookie-based auth cookieAuth = CookieAuth(auto_error=False) # Rate Limiter limiter = Limiter(key_func=get_remote_address) # Logger logger = logging.getLogger(__name__) # Note: JWT creation and cookie helpers moved to modules.auth.jwtService def _getUserBase(token: str = Depends(cookieAuth)) -> User: """ Extracts and validates the current user from the JWT token. Args: token: JWT Token from the Authorization header Returns: User model instance Raises: HTTPException: For invalid token or user """ credentialsException = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", headers={"WWW-Authenticate": "Bearer"}, ) # Guard: token may be None or malformed when cookie/header is missing or bad if not token or not isinstance(token, str): logger.warning("Missing JWT Token (no cookie/header)") raise credentialsException # Basic JWT format check (header.payload.signature) try: if token.count(".") != 2: logger.warning("Malformed JWT token format") raise credentialsException except Exception: # If anything odd happens while checking format, treat as invalid creds raise credentialsException try: # Decode token payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) # Extract username from token username: str = payload.get("sub") if username is None: raise credentialsException # Extract mandate ID and user ID from token mandateId: str = payload.get("mandateId") userId: str = payload.get("userId") authority: str = payload.get("authenticationAuthority") tokenId: Optional[str] = payload.get("jti") sessionId: Optional[str] = payload.get("sid") or payload.get("sessionId") if not mandateId or not userId: logger.error(f"Missing context in token: mandateId={mandateId}, userId={userId}") raise credentialsException except JWTError: logger.warning("Invalid JWT Token") raise credentialsException # Get root user and interface for database access rootUser = getRootUser() appInterface = getInterface(rootUser) # Retrieve user from database user = appInterface.getUserByUsername(username) if user is None: logger.warning(f"User {username} not found") raise credentialsException # Check if user is enabled if not user.enabled: logger.warning(f"User {username} is disabled") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User is disabled") # Ensure the user has the correct context if str(user.mandateId) != str(mandateId) or str(user.id) != str(userId): logger.error(f"User context mismatch: token(mandateId={mandateId}, userId={userId}) vs user(mandateId={user.mandateId}, id={user.id})") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User context has changed. Please log in again.", headers={"WWW-Authenticate": "Bearer"}, ) # For LOCAL gateway JWTs, enforce DB-backed token validity and revocation try: # Normalize authority to string for comparison normalized_authority = (str(authority).lower() if authority is not None else None) # If we have a token id, check if a corresponding DB token exists for local authority db_tokens = [] if tokenId: try: dbApp = getRootDbAppConnector() db_tokens = dbApp.getRecordset( Token, recordFilter={"id": tokenId} ) except Exception as e: # Check if this is a table not found error (token table was deleted) if "does not exist" in str(e).lower() or "relation" in str(e).lower(): logger.error("Token table does not exist - database may have been reset") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Authentication service temporarily unavailable. Please contact administrator." ) db_tokens = [] if db_tokens: # There is a server record for this token; enforce status and context when local db_token = db_tokens[0] token_authority = str(db_token.get("authority", "")).lower() if token_authority == str(AuthAuthority.LOCAL.value): # Must be active and match user/session/mandate active_token = appInterface.findActiveTokenById( tokenId=tokenId, userId=user.id, authority=AuthAuthority.LOCAL, sessionId=sessionId, mandateId=str(mandateId) if mandateId else None, ) if not active_token: logger.info( f"Local JWT db record not active/valid: jti={tokenId}, userId={user.id}, mandateId={mandateId}, sessionId={sessionId}" ) raise credentialsException else: # No DB record for this token. If the claim says local (or missing/unknown), require DB record. if normalized_authority in (None, "", str(AuthAuthority.LOCAL.value)): logger.info("Local JWT without server record or missing authority claim") raise credentialsException except HTTPException: raise except Exception as e: logger.error(f"Error during local token validation: {str(e)}") raise credentialsException return user def getCurrentUser(currentUser: User = Depends(_getUserBase)) -> User: """Get current active user with additional validation.""" # Check if current user is enabled if not currentUser.enabled: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="User is disabled" ) return currentUser