""" Routes for local security and authentication. """ from fastapi import APIRouter, HTTPException, status, Depends, Request, Response, Body from fastapi.security import OAuth2PasswordRequestForm import logging from typing import Dict, Any, Optional from datetime import datetime, timedelta from fastapi.responses import JSONResponse, HTMLResponse, RedirectResponse from pydantic import BaseModel # Import auth modules from modules.security.auth import createAccessToken, getCurrentUser, limiter from modules.interfaces.interfaceAppObjects import getInterface, getRootInterface from modules.interfaces.interfaceAppModel import User, UserInDB, AuthAuthority, UserPrivilege, Token from modules.shared.attributeUtils import ModelMixin # Configure logger logger = logging.getLogger(__name__) # Create router for Local Security endpoints router = APIRouter( prefix="/api/local", tags=["Security Local"], responses={ 404: {"description": "Not found"}, 400: {"description": "Bad request"}, 401: {"description": "Unauthorized"}, 403: {"description": "Forbidden"}, 500: {"description": "Internal server error"} } ) @router.post("/login") @limiter.limit("5/minute") async def login( request: Request, formData: OAuth2PasswordRequestForm = Depends(), ) -> Dict[str, Any]: """Get access token for local user authentication""" try: # Validate CSRF token csrf_token = request.headers.get("X-CSRF-Token") if not csrf_token: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="CSRF token missing" ) # Get gateway interface with root privileges for authentication rootInterface = getRootInterface() # Get default mandate ID defaultMandateId = rootInterface.getInitialId("mandates") if not defaultMandateId: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="No default mandate found" ) # Set the mandate ID on the interface rootInterface.mandateId = defaultMandateId # Authenticate user user = rootInterface.authenticateLocalUser( username=formData.username, password=formData.password ) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid username or password", headers={"WWW-Authenticate": "Bearer"}, ) # Create token data token_data = { "sub": user.username, "mandateId": str(user.mandateId), "userId": str(user.id), "authenticationAuthority": AuthAuthority.LOCAL } # Create access token access_token, expires_at = createAccessToken(token_data) if not access_token: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create access token" ) # Get user-specific interface for token operations userInterface = getInterface(user) # Create token token = Token( userId=user.id, authority=AuthAuthority.LOCAL, tokenAccess=access_token, tokenType="bearer", expiresAt=expires_at.timestamp() ) # Save access token userInterface.saveAccessToken(token) # Create response data response_data = { "type": "local_auth_success", "access_token": access_token, "token_data": token.dict() } return response_data except ValueError as e: # Handle authentication errors error_msg = str(e) logger.warning(f"Authentication failed for user {formData.username}: {error_msg}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=error_msg, headers={"WWW-Authenticate": "Bearer"}, ) except Exception as e: # Handle other errors error_msg = f"Login failed: {str(e)}" logger.error(f"Unexpected error during login for user {formData.username}: {error_msg}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=error_msg ) @router.post("/register", response_model=User) @limiter.limit("10/minute") async def register_user( request: Request, userData: User = Body(...), password: str = Body(..., embed=True) ) -> User: """Register a new local user.""" try: # Get gateway interface with root privileges since this is a public endpoint appInterface = getRootInterface() # Get default mandate ID defaultMandateId = appInterface.getInitialId("mandates") if not defaultMandateId: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="No default mandate found" ) # Set the mandate ID on the interface appInterface.mandateId = defaultMandateId # Create user with local authentication user = appInterface.createUser( username=userData.username, password=password, email=userData.email, fullName=userData.fullName, language=userData.language, enabled=userData.enabled, privilege=userData.privilege, authenticationAuthority=AuthAuthority.LOCAL ) if not user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Failed to register user" ) return user except ValueError as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=str(e) ) except Exception as e: logger.error(f"Error registering user: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to register user: {str(e)}" ) @router.get("/me", response_model=User) @limiter.limit("30/minute") async def read_user_me( request: Request, currentUser: User = Depends(getCurrentUser) ) -> User: """Get current user info""" try: return currentUser except Exception as e: logger.error(f"Error getting user me: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to get current user: {str(e)}" ) @router.post("/logout") @limiter.limit("30/minute") async def logout(request: Request, currentUser: User = Depends(getCurrentUser)) -> JSONResponse: """Logout from local authentication""" try: # Get user interface with current user context appInterface = getInterface(currentUser) # Note: JWT tokens are stateless, so no server-side cleanup needed # The client should discard the JWT token on logout return JSONResponse({ "message": "Successfully logged out" }) except Exception as e: logger.error(f"Error during logout: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Logout failed: {str(e)}" ) @router.get("/available") @limiter.limit("10/minute") async def check_username_availability( request: Request, username: str, authenticationAuthority: str = "local" ) -> Dict[str, Any]: """Check if a username is available for registration.""" try: # Get root interface appInterface = getRootInterface() # Use the interface's method to check availability result = appInterface.checkUsernameAvailability({ "username": username, "authenticationAuthority": authenticationAuthority }) return { "username": username, "authenticationAuthority": authenticationAuthority, "available": result["available"], "message": result["message"] } except Exception as e: logger.error(f"Error checking username availability: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to check username availability: {str(e)}" )