72 lines
2.3 KiB
Python
72 lines
2.3 KiB
Python
"""
|
|
JWT Service
|
|
Centralizes local JWT creation and cookie helpers.
|
|
"""
|
|
|
|
from datetime import timedelta
|
|
from typing import Optional, Tuple
|
|
from fastapi import Response
|
|
from jose import jwt
|
|
|
|
from modules.shared.configuration import APP_CONFIG
|
|
from modules.shared.timezoneUtils import get_utc_now
|
|
|
|
# Config
|
|
SECRET_KEY = APP_CONFIG.get("APP_JWT_KEY_SECRET")
|
|
ALGORITHM = APP_CONFIG.get("Auth_ALGORITHM")
|
|
ACCESS_TOKEN_EXPIRE_MINUTES = int(APP_CONFIG.get("APP_TOKEN_EXPIRY"))
|
|
REFRESH_TOKEN_EXPIRE_DAYS = int(APP_CONFIG.get("APP_REFRESH_TOKEN_EXPIRY", "7"))
|
|
|
|
|
|
def createAccessToken(data: dict, expiresDelta: Optional[timedelta] = None) -> Tuple[str, "datetime"]:
|
|
"""Create a JWT access token and return (token, expiresAt)."""
|
|
toEncode = data.copy()
|
|
if "jti" not in toEncode or not toEncode.get("jti"):
|
|
import uuid
|
|
toEncode["jti"] = str(uuid.uuid4())
|
|
|
|
expire = get_utc_now() + (expiresDelta if expiresDelta else timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
|
|
toEncode.update({"exp": expire})
|
|
encodedJwt = jwt.encode(toEncode, SECRET_KEY, algorithm=ALGORITHM)
|
|
return encodedJwt, expire
|
|
|
|
|
|
def createRefreshToken(data: dict) -> Tuple[str, "datetime"]:
|
|
"""Create a JWT refresh token and return (token, expiresAt)."""
|
|
toEncode = data.copy()
|
|
if "jti" not in toEncode or not toEncode.get("jti"):
|
|
import uuid
|
|
toEncode["jti"] = str(uuid.uuid4())
|
|
toEncode["type"] = "refresh"
|
|
|
|
expire = get_utc_now() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
|
|
toEncode.update({"exp": expire})
|
|
encodedJwt = jwt.encode(toEncode, SECRET_KEY, algorithm=ALGORITHM)
|
|
return encodedJwt, expire
|
|
|
|
|
|
def setAccessTokenCookie(response: Response, token: str, expiresDelta: Optional[timedelta] = None) -> None:
|
|
"""Set access token as httpOnly cookie."""
|
|
maxAge = int(expiresDelta.total_seconds()) if expiresDelta else ACCESS_TOKEN_EXPIRE_MINUTES * 60
|
|
response.set_cookie(
|
|
key="auth_token",
|
|
value=token,
|
|
httponly=True,
|
|
secure=True,
|
|
samesite="strict",
|
|
max_age=maxAge
|
|
)
|
|
|
|
|
|
def setRefreshTokenCookie(response: Response, token: str) -> None:
|
|
"""Set refresh token as httpOnly cookie."""
|
|
response.set_cookie(
|
|
key="refresh_token",
|
|
value=token,
|
|
httponly=True,
|
|
secure=True,
|
|
samesite="strict",
|
|
max_age=REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60
|
|
)
|
|
|
|
|