# gateway/modules/routes/routeSecurityLocal.py
# 2025-05-27 14:32:08 +02:00
#
# 203 lines
# 6.6 KiB
# Python

"""
Routes for local security and authentication.
"""
from fastapi import APIRouter, HTTPException, status, Depends, Request
from fastapi.security import OAuth2PasswordRequestForm
import logging
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
from fastapi.responses import JSONResponse
# Import auth modules
from modules.security.auth import createAccessToken, getCurrentUser, limiter
from modules.interfaces.serviceAppClass import getInterface
from modules.interfaces.serviceAppModel import User, AuthAuthority
from modules.interfaces.serviceAppTokens import LocalToken, saveToken
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Router grouping the local-authentication endpoints under the /api/local prefix.
router = APIRouter(
    prefix="/api/local",
    tags=["Security Local"],
    # Response descriptions declared once here for the routes of this router.
    responses={
        404: {"description": "Not found"},
        400: {"description": "Bad request"},
        401: {"description": "Unauthorized"},
        403: {"description": "Forbidden"},
        500: {"description": "Internal server error"},
    },
)
@router.post("/login")
@limiter.limit("5/minute")
async def login(
    request: Request,
    formData: OAuth2PasswordRequestForm = Depends(),
):
    """Authenticate a local user and return a bearer access token.

    Args:
        request: Incoming HTTP request; its headers are read for
            ``X-CSRF-Token``.
        formData: OAuth2 password form providing ``username`` and
            ``password``.

    Returns:
        A dict with the keys ``type`` (``"local_auth_success"``),
        ``access_token`` and ``token_data`` (``access_token``,
        ``token_type`` and ``expires_at`` as a timestamp).

    Raises:
        HTTPException: 403 when the ``X-CSRF-Token`` header is missing or
            empty; 401 when authentication yields no user or raises
            ``ValueError``; 500 when no access token could be created or any
            other error occurs.
    """
    try:
        # NOTE(review): only the presence of the header is checked in this
        # function; its value is not compared against anything here. Confirm
        # that the token is verified elsewhere.
        csrf_token = request.headers.get("X-CSRF-Token")
        if not csrf_token:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="CSRF token missing",
            )

        appInterface = getInterface()

        user = appInterface.authenticateLocalUser(
            username=formData.username,
            password=formData.password,
        )
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid username or password",
                headers={"WWW-Authenticate": "Bearer"},
            )

        # Claims handed to createAccessToken.
        token_claims = {
            "sub": user.username,
            "mandateId": str(user.mandateId),
            "userId": str(user.id),
            "authenticationAuthority": AuthAuthority.LOCAL,
        }

        access_token, expires_at = createAccessToken(token_claims)
        if not access_token:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to create access token",
            )

        # Token record that is both persisted and echoed back to the caller.
        token_data = {
            "access_token": access_token,
            "token_type": "bearer",
            "expires_at": expires_at.timestamp(),
        }
        saveToken(appInterface, "Local", token_data)

        return {
            "type": "local_auth_success",
            "access_token": access_token,
            "token_data": token_data,
        }
    except HTTPException:
        # The 401/403/500 responses raised deliberately above must reach the
        # client unchanged; without this clause the generic handler below
        # would rewrap them as a 500 "Login failed" error.
        raise
    except ValueError as e:
        # ValueError is treated as an authentication failure.
        error_msg = str(e)
        logger.warning(f"Authentication failed for user {formData.username}: {error_msg}")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=error_msg,
            headers={"WWW-Authenticate": "Bearer"},
        ) from e
    except Exception as e:
        # NOTE(review): the exception text is returned to the client in the
        # response detail; consider whether that exposes internal details.
        error_msg = f"Login failed: {str(e)}"
        logger.exception(f"Unexpected error during login for user {formData.username}: {error_msg}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=error_msg,
        ) from e
@router.post("/register", response_model=User)
async def register_user(userData: User):
    """Register a new local user under the default mandate.

    Args:
        userData: Submitted user data; ``username``, ``password`` and
            ``email`` are read from it. Its ``mandateId`` is ignored in favour
            of the default mandate.

    Returns:
        The user object returned by ``createUser``.

    Raises:
        HTTPException: 500 when no default mandate id is available or an
            unexpected error occurs; 400 when ``createUser`` returns a falsy
            value or raises ``ValueError``.
    """
    try:
        appInterface = getInterface()

        defaultMandateId = appInterface.getInitialId("mandates")
        if not defaultMandateId:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="No default mandate found",
            )

        user = appInterface.createUser(
            username=userData.username,
            password=userData.password,
            email=userData.email,
            mandateId=defaultMandateId,  # Use default mandate instead of userData.mandateId
            authenticationAuthority=AuthAuthority.LOCAL,
        )
        if not user:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Failed to register user",
            )

        return user
    except HTTPException:
        # Keep the status codes raised deliberately above; without this
        # clause the generic handler below would turn the 400 into a 500.
        raise
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e
    except Exception as e:
        logger.exception(f"Error registering user: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to register user: {str(e)}",
        ) from e
@router.get("/me", response_model=Dict[str, Any])
@limiter.limit("30/minute")
async def read_user_me(currentUser: Dict[str, Any] = Depends(getCurrentUser)):
    """Return the current user as resolved by the ``getCurrentUser`` dependency.

    The dependency result is passed through unchanged.
    """
    # NOTE(review): if ``limiter`` is a slowapi Limiter, decorated endpoints
    # need a ``request`` parameter (as ``login`` has) - confirm.
    return currentUser
@router.post("/logout")
@limiter.limit("30/minute")
async def logout(currentUser: Dict[str, Any] = Depends(getCurrentUser)):
    """Log the current user out by revoking all of their sessions.

    Returns:
        A ``JSONResponse`` carrying a success message.

    Raises:
        HTTPException: 500 when anything fails while revoking the sessions.
    """
    try:
        sessionService = getInterface()
        # The user is identified by the "id" entry of the dependency result.
        sessionService.revokeAllUserSessions(currentUser.get("id"))
        payload = {"message": "Successfully logged out"}
        return JSONResponse(payload)
    except Exception as err:
        reason = str(err)
        logger.error(f"Error during logout: {reason}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Logout failed: {reason}",
        )
@router.get("/available", response_model=Dict[str, Any])
async def check_username_availability(
    username: str,
    authenticationAuthority: str = "local"
):
    """Report whether a username can still be registered.

    Args:
        username: Username to look up.
        authenticationAuthority: Authority to check against; defaults to
            ``"local"``.

    Returns:
        Whatever ``checkUsernameAvailability`` of the gateway interface
        returns for the given arguments.

    Raises:
        HTTPException: 500 when the lookup fails for any reason.
    """
    try:
        gateway = getInterface()
        availability = gateway.checkUsernameAvailability(username, authenticationAuthority)
        return availability
    except Exception as err:
        reason = str(err)
        logger.error(f"Error checking username availability: {reason}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to check username availability: {reason}",
        )