""" Routes for local security and authentication. """ from fastapi import APIRouter, HTTPException, status, Depends, Request, Response, Body from fastapi.security import OAuth2PasswordRequestForm import logging from typing import Dict, Any from datetime import datetime from fastapi.responses import JSONResponse, HTMLResponse, RedirectResponse import uuid from jose import jwt # Import auth modules from modules.security.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM from modules.security.jwtService import createAccessToken, createRefreshToken, setAccessTokenCookie, setRefreshTokenCookie, clearAccessTokenCookie, clearRefreshTokenCookie from modules.interfaces.interfaceDbAppObjects import getInterface, getRootInterface from modules.datamodels.datamodelUam import User, UserInDB, AuthAuthority, UserPrivilege from modules.datamodels.datamodelSecurity import Token # Configure logger logger = logging.getLogger(__name__) # Create router for Local Security endpoints router = APIRouter( prefix="/api/local", tags=["Security Local"], responses={ 404: {"description": "Not found"}, 400: {"description": "Bad request"}, 401: {"description": "Unauthorized"}, 403: {"description": "Forbidden"}, 500: {"description": "Internal server error"} } ) @router.post("/login") @limiter.limit("30/minute") async def login( request: Request, response: Response, formData: OAuth2PasswordRequestForm = Depends(), ) -> Dict[str, Any]: """Get access token for local user authentication""" try: # Validate CSRF token csrf_token = request.headers.get("X-CSRF-Token") if not csrf_token: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="CSRF token missing" ) # Get gateway interface with root privileges for authentication rootInterface = getRootInterface() # Get default mandate ID from modules.datamodels.datamodelUam import Mandate defaultMandateId = rootInterface.getInitialId(Mandate) if not defaultMandateId: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="No default mandate found" ) # Set the mandate ID on the interface rootInterface.mandateId = defaultMandateId # Authenticate user user = rootInterface.authenticateLocalUser( username=formData.username, password=formData.password ) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid username or password", headers={"WWW-Authenticate": "Bearer"}, ) # Create token data token_data = { "sub": user.username, "mandateId": str(user.mandateId), "userId": str(user.id), "authenticationAuthority": AuthAuthority.LOCAL } # Create session id and include in token claims for session-scoped logout session_id = str(uuid.uuid4()) token_data["sid"] = session_id # Create access token + set cookie access_token, _access_expires = createAccessToken(token_data) setAccessTokenCookie(response, access_token) # Create refresh token + set cookie refresh_token, _refresh_expires = createRefreshToken(token_data) setRefreshTokenCookie(response, refresh_token) # Get expiration time for response try: payload = jwt.decode(access_token, SECRET_KEY, algorithms=[ALGORITHM]) expires_at = datetime.fromtimestamp(payload.get("exp")) except Exception as e: logger.error(f"Failed to decode access token: {str(e)}") raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to finalize token") # Get user-specific interface for token operations userInterface = getInterface(user) # Get jti from already decoded payload jti = payload.get("jti") # Create token token = Token( id=jti, userId=user.id, authority=AuthAuthority.LOCAL, tokenAccess=access_token, tokenType="bearer", expiresAt=expires_at.timestamp(), sessionId=session_id, mandateId=str(user.mandateId) ) # Save access token userInterface.saveAccessToken(token) # Log successful login try: from modules.shared.auditLogger import audit_logger audit_logger.logUserAccess( userId=str(user.id), mandateId=str(user.mandateId), action="login", successInfo="local_auth_success" ) except Exception: # Don't fail if audit logging fails pass # Create response data (tokens are now in httpOnly cookies) response_data = { "type": "local_auth_success", "message": "Login successful - tokens set in httpOnly cookies", "authenticationAuthority": "local", "expires_at": expires_at.isoformat() } return response_data except ValueError as e: # Handle authentication errors error_msg = str(e) logger.warning(f"Authentication failed for user {formData.username}: {error_msg}") # Check if user is disabled and provide specific message if error_msg == "User is disabled": error_msg = "Your account is disabled. Please send an email to p.motsch@valueon.ch to get access to the PowerOn center." # Log failed login attempt try: from modules.shared.auditLogger import audit_logger audit_logger.logUserAccess( userId="unknown", mandateId="unknown", action="login", successInfo=f"failed: {error_msg}" ) except Exception: # Don't fail if audit logging fails pass raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=error_msg, headers={"WWW-Authenticate": "Bearer"}, ) except Exception as e: # Handle other errors error_msg = f"Login failed: {str(e)}" logger.error(f"Unexpected error during login for user {formData.username}: {error_msg}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=error_msg ) @router.post("/register", response_model=User) @limiter.limit("10/minute") async def register_user( request: Request, userData: User = Body(...), password: str = Body(..., embed=True) ) -> User: """Register a new local user.""" try: # Get gateway interface with root privileges since this is a public endpoint appInterface = getRootInterface() # Get default mandate ID from modules.datamodels.datamodelUam import Mandate defaultMandateId = appInterface.getInitialId(Mandate) if not defaultMandateId: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="No default mandate found" ) # Set the mandate ID on the interface appInterface.mandateId = defaultMandateId # Create user with local authentication # Set safe default privilege level for new registrations # New users are disabled by default and require admin approval from modules.datamodels.datamodelUam import UserPrivilege user = appInterface.createUser( username=userData.username, password=password, email=userData.email, fullName=userData.fullName, language=userData.language, enabled=False, # New users are disabled by default privilege=UserPrivilege.USER, # Always set to USER for new registrations authenticationAuthority=AuthAuthority.LOCAL ) if not user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Failed to register user" ) return user except ValueError as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=str(e) ) except Exception as e: logger.error(f"Error registering user: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to register user: {str(e)}" ) @router.get("/me", response_model=User) @limiter.limit("30/minute") async def read_user_me( request: Request, currentUser: User = Depends(getCurrentUser) ) -> User: """Get current user info""" try: return currentUser except Exception as e: logger.error(f"Error getting user me: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to get current user: {str(e)}" ) @router.post("/refresh") @limiter.limit("60/minute") async def refresh_token( request: Request, response: Response ) -> Dict[str, Any]: """Refresh access token using refresh token from cookie""" try: # Get refresh token from cookie refresh_token = request.cookies.get('refresh_token') if not refresh_token: raise HTTPException(status_code=401, detail="No refresh token found") # Validate refresh token try: payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM]) if payload.get("type") != "refresh": raise HTTPException(status_code=401, detail="Invalid refresh token type") except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail="Refresh token expired") except jwt.JWTError: raise HTTPException(status_code=401, detail="Invalid refresh token") # Get user information from refresh token payload user_id = payload.get("userId") if not user_id: raise HTTPException(status_code=401, detail="Invalid refresh token - missing user ID") # Get user from database using the user ID from refresh token try: app_interface = getRootInterface() current_user = app_interface.getUser(user_id) if not current_user: raise HTTPException(status_code=401, detail="User not found") except Exception as e: logger.error(f"Failed to get user from database: {str(e)}") raise HTTPException(status_code=500, detail="Failed to validate user") # Create new token data token_data = { "sub": current_user.username, "mandateId": str(current_user.mandateId), "userId": str(current_user.id), "authenticationAuthority": current_user.authenticationAuthority } # Create new access token + set cookie access_token, _expires = createAccessToken(token_data) setAccessTokenCookie(response, access_token) # Get expiration time try: payload = jwt.decode(access_token, SECRET_KEY, algorithms=[ALGORITHM]) expires_at = datetime.fromtimestamp(payload.get("exp")) except Exception as e: logger.error(f"Failed to decode new access token: {str(e)}") raise HTTPException(status_code=500, detail="Failed to create new token") return { "type": "token_refresh_success", "message": "Token refreshed successfully", "expires_at": expires_at.isoformat() } except HTTPException as e: # If it's a 503 error (service unavailable due to missing token table), return it as-is if e.status_code == 503: raise # For other HTTP exceptions, re-raise them raise except Exception as e: logger.error(f"Token refresh error: {str(e)}") raise HTTPException(status_code=500, detail="Token refresh failed") @router.post("/logout") @limiter.limit("30/minute") async def logout(request: Request, response: Response, currentUser: User = Depends(getCurrentUser)) -> JSONResponse: """Logout from local authentication""" try: # Get user interface with current user context appInterface = getInterface(currentUser) # Get token from cookie or Authorization header token = request.cookies.get('auth_token') if not token: auth_header = request.headers.get("Authorization") if auth_header and auth_header.lower().startswith("bearer "): token = auth_header.split(" ", 1)[1].strip() if not token: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="No token found") try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) session_id = payload.get("sid") or payload.get("sessionId") jti = payload.get("jti") except Exception as e: logger.error(f"Failed to decode JWT on logout: {str(e)}") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid token") revoked = 0 if session_id: revoked = appInterface.revokeTokensBySessionId(session_id, currentUser.id, AuthAuthority.LOCAL, revokedBy=currentUser.id, reason="logout") elif jti: appInterface.revokeTokenById(jti, revokedBy=currentUser.id, reason="logout") revoked = 1 # Log successful logout try: from modules.shared.auditLogger import audit_logger audit_logger.logUserAccess( userId=str(currentUser.id), mandateId=str(currentUser.mandateId), action="logout", successInfo=f"revoked_tokens: {revoked}" ) except Exception: # Don't fail if audit logging fails pass # Create the JSON response first json_response = JSONResponse({ "message": "Successfully logged out - cookies cleared", "revokedTokens": revoked }) # Clear httpOnly cookies on the response we're actually returning clearAccessTokenCookie(json_response) clearRefreshTokenCookie(json_response) return json_response except Exception as e: logger.error(f"Error during logout: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Logout failed: {str(e)}" ) @router.get("/available") @limiter.limit("10/minute") async def check_username_availability( request: Request, username: str, authenticationAuthority: str = "local" ) -> Dict[str, Any]: """Check if a username is available for registration.""" try: # Get root interface appInterface = getRootInterface() # Use the interface's method to check availability result = appInterface.checkUsernameAvailability({ "username": username, "authenticationAuthority": authenticationAuthority }) return { "username": username, "authenticationAuthority": authenticationAuthority, "available": result["available"], "message": result["message"] } except Exception as e: logger.error(f"Error checking username availability: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to check username availability: {str(e)}" )