""" Authentication module for backend API. Handles JWT-based authentication, token generation, and user context. """ from datetime import datetime, timedelta, timezone from typing import Optional, Dict, Any, Tuple from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt import logging from modules.interfaces.gatewayInterface import getGatewayInterface from modules.shared.configuration import APP_CONFIG # Get Config Data SECRET_KEY = APP_CONFIG.get("APP_JWT_SECRET_SECRET") ALGORITHM = APP_CONFIG.get("Auth_ALGORITHM") ACCESS_TOKEN_EXPIRE_MINUTES = int(APP_CONFIG.get("APP_TOKEN_EXPIRY")) # OAuth2 Setup oauth2Scheme = OAuth2PasswordBearer(tokenUrl="token") # Logger logger = logging.getLogger(__name__) def createAccessToken(data: dict, expiresDelta: Optional[timedelta] = None) -> str: """ Creates a JWT Access Token. Args: data: Data to encode (usually user ID or username) expiresDelta: Validity duration of the token (optional) Returns: JWT Token as string """ toEncode = data.copy() if expiresDelta: expire = datetime.now(timezone.utc) + expiresDelta else: expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) toEncode.update({"exp": expire}) encodedJwt = jwt.encode(toEncode, SECRET_KEY, algorithm=ALGORITHM) return encodedJwt async def getCurrentUser(token: str = Depends(oauth2Scheme)) -> Dict[str, Any]: """ Extracts and validates the current user from the JWT token. Args: token: JWT Token from the Authorization header Returns: User data Raises: HTTPException: For invalid token or user """ credentialsException = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: # Decode token payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) # Extract username from token username: str = payload.get("sub") if username is None: raise credentialsException # Extract mandate ID from token (if present) mandateId: int = payload.get("mandateId", 1) # Default: Root mandate except JWTError: logger.warning("Invalid JWT Token") raise credentialsException # Initialize Gateway Interface without context gateway = getGatewayInterface() # Retrieve user from database user = gateway.getUserByUsername(username) if user is None: logger.warning(f"User {username} not found") raise credentialsException if user.get("disabled", False): logger.warning(f"User {username} is disabled") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User is disabled") return user async def getCurrentActiveUser(currentUser: Dict[str, Any] = Depends(getCurrentUser)) -> Dict[str, Any]: """ Ensures that the user is active. Args: currentUser: Current user data Returns: User data Raises: HTTPException: If the user is disabled """ if currentUser.get("disabled", False): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User is disabled") return currentUser async def getUserContext(currentUser: Dict[str, Any]) -> Tuple[int, int]: """ Extracts the mandate ID and user ID from the current user. Enhanced with better logging. Args: currentUser: The current user Returns: Tuple of (mandateId, userId) """ # Default values defaultMandateId = 0 defaultUserId = 0 # Extract mandateId mandateId = currentUser.get("mandateId", None) if mandateId is None: logger.warning(f"No mandateId found in currentUser, using default: {defaultMandateId}") mandateId = defaultMandateId else: try: mandateId = int(mandateId) except (ValueError, TypeError): logger.error(f"Invalid mandateId value: {mandateId}, using default: {defaultMandateId}") mandateId = defaultMandateId # Extract userId userId = currentUser.get("id", None) if userId is None: logger.warning(f"No userId found in currentUser, using default: {defaultUserId}") userId = defaultUserId else: try: userId = int(userId) except (ValueError, TypeError): logger.error(f"Invalid userId value: {userId}, using default: {defaultUserId}") userId = defaultUserId return mandateId, userId