120 lines
4.2 KiB
Python
120 lines
4.2 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Short-lived signed tickets for OAuth data-connection popups.
|
|
|
|
The UI authenticates API calls with a Bearer token in localStorage, but
|
|
``window.open(authUrl)`` cannot send that header. Cross-origin httpOnly cookies
|
|
are unreliable in int/prod (UI on poweron-center.net, API on poweron.swiss).
|
|
Login popups work without a session because ``/auth/login`` is public; connect
|
|
popups hit ``/auth/connect``, which used to require ``getCurrentUser``.
|
|
|
|
Flow: POST ``/api/connections/{id}/connect`` (Bearer-authenticated) issues a
|
|
ticket; the popup opens ``/auth/connect?connectTicket=...`` which validates the
|
|
ticket instead of cookies.
|
|
"""
|
|
|
|
import time
|
|
from typing import Any, Dict, Tuple
|
|
|
|
from fastapi import HTTPException, status
|
|
from jose import JWTError, jwt as jose_jwt
|
|
|
|
from modules.auth.jwtService import ALGORITHM, SECRET_KEY
|
|
from modules.datamodels.datamodelUam import AuthAuthority, User, UserConnection
|
|
from modules.interfaces.interfaceDbApp import getInterface, getRootInterface
|
|
from modules.shared.i18nRegistry import apiRouteContext
|
|
|
|
_msg = apiRouteContext("oauthConnectTicket")
|
|
|
|
_CONNECT_TICKET_TTL_SEC = 600
|
|
|
|
# OAuth providers sometimes redirect to the API root if the app redirect URL omits the path.
|
|
OAUTH_FLOW_CALLBACK_PATHS: Dict[str, str] = {
|
|
"clickup_connect": "/api/clickup/auth/connect/callback",
|
|
"msft_connect": "/api/msft/auth/connect/callback",
|
|
"google_connect": "/api/google/auth/connect/callback",
|
|
}
|
|
|
|
|
|
def oauth_callback_redirect_path(state: str) -> str | None:
|
|
"""Map connect-ticket JWT (OAuth ``state`` param) to the correct callback route."""
|
|
try:
|
|
data = jose_jwt.decode(state, SECRET_KEY, algorithms=[ALGORITHM])
|
|
except JWTError:
|
|
return None
|
|
flow = data.get("flow")
|
|
if not isinstance(flow, str):
|
|
return None
|
|
return OAUTH_FLOW_CALLBACK_PATHS.get(flow)
|
|
|
|
|
|
def issue_connect_ticket(flow: str, connection_id: str, user_id: str) -> str:
|
|
"""Issue a short-lived JWT for starting a data-connection OAuth popup."""
|
|
body = {
|
|
"flow": flow,
|
|
"connectionId": connection_id,
|
|
"userId": str(user_id),
|
|
"exp": int(time.time()) + _CONNECT_TICKET_TTL_SEC,
|
|
}
|
|
return jose_jwt.encode(body, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
|
|
def parse_connect_ticket(ticket: str, expected_flow: str) -> Dict[str, Any]:
|
|
"""Validate connect ticket signature, expiry, and flow."""
|
|
try:
|
|
data = jose_jwt.decode(ticket, SECRET_KEY, algorithms=[ALGORITHM])
|
|
except JWTError as e:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=_msg("Invalid or expired connect ticket"),
|
|
) from e
|
|
if data.get("flow") != expected_flow:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=_msg("Invalid connect ticket flow"),
|
|
)
|
|
connection_id = data.get("connectionId")
|
|
user_id = data.get("userId")
|
|
if not connection_id or not user_id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=_msg("Incomplete connect ticket"),
|
|
)
|
|
return data
|
|
|
|
|
|
def resolve_connect_context(
|
|
connect_ticket: str,
|
|
connection_id: str,
|
|
expected_flow: str,
|
|
authority: AuthAuthority,
|
|
) -> Tuple[User, UserConnection]:
|
|
"""Validate ticket and return the user + connection for OAuth redirect."""
|
|
state = parse_connect_ticket(connect_ticket, expected_flow)
|
|
if state.get("connectionId") != connection_id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=_msg("Connection ID does not match connect ticket"),
|
|
)
|
|
|
|
root = getRootInterface()
|
|
user = root.getUser(state["userId"])
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=_msg("User not found"),
|
|
)
|
|
|
|
interface = getInterface(user)
|
|
connection = None
|
|
for conn in interface.getUserConnections(user.id):
|
|
if conn.id == connection_id and conn.authority == authority:
|
|
connection = conn
|
|
break
|
|
if not connection:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=_msg("Connection not found"),
|
|
)
|
|
return user, connection
|