platform-core/modules/auth/oauthConnectTicket.py
ValueOn AG fc6de11c37
Some checks failed
Deploy Plattform-Core (Int) / deploy (push) Blocked by required conditions
Deploy Plattform-Core (Int) / test (push) Has been cancelled
Remove poweron-center.net references, clean up demo tests
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-10 00:42:09 +02:00

101 lines
3.5 KiB
Python

# Copyright (c) 2026 PowerOn AG
# All rights reserved.
"""
Short-lived signed tickets for OAuth data-connection popups.
The UI authenticates API calls with a Bearer token in localStorage, but
``window.open(authUrl)`` cannot send that header, and httpOnly cookies are
unreliable in cross-origin setups (UI and API on different subdomains).
Login popups work without a session because ``/auth/login`` is public; connect
popups hit ``/auth/connect``, which used to require ``getCurrentUser``.
Flow: POST ``/api/connections/{id}/connect`` (Bearer-authenticated) issues a
ticket; the popup opens ``/auth/connect?connectTicket=...`` which validates the
ticket instead of cookies.
"""
import time
from typing import Any, Dict, Tuple
from fastapi import HTTPException, status
from jose import JWTError, jwt as jose_jwt
from modules.auth.jwtService import ALGORITHM, SECRET_KEY
from modules.datamodels.datamodelUam import AuthAuthority, User, UserConnection
from modules.interfaces.interfaceDbApp import getInterface, getRootInterface
from modules.shared.i18nRegistry import apiRouteContext
# Wrapper applied to every HTTPException ``detail`` string in this module.
# NOTE(review): presumably an i18n lookup scoped to this route context --
# confirm against modules.shared.i18nRegistry.
_msg = apiRouteContext("oauthConnectTicket")
# Ticket lifetime in seconds (10 minutes); added to the current time to form
# the ``exp`` claim when a ticket is issued.
_CONNECT_TICKET_TTL_SEC = 600
def issue_connect_ticket(flow: str, connection_id: str, user_id: str) -> str:
    """Create a signed, short-lived JWT that authorizes opening an OAuth popup.

    Args:
        flow: Name of the OAuth flow the ticket is valid for.
        connection_id: ID of the data connection being connected.
        user_id: ID of the user the ticket is issued to; stored as a string.

    Returns:
        The encoded JWT carrying ``flow``, ``connectionId``, ``userId`` and an
        ``exp`` claim set ``_CONNECT_TICKET_TTL_SEC`` seconds from now.
    """
    expires_at = int(time.time()) + _CONNECT_TICKET_TTL_SEC
    claims = {
        "flow": flow,
        "connectionId": connection_id,
        "userId": str(user_id),
        "exp": expires_at,
    }
    return jose_jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def parse_connect_ticket(ticket: str, expected_flow: str) -> Dict[str, Any]:
    """Validate connect ticket signature, expiry, and flow.

    Args:
        ticket: Encoded JWT as produced by ``issue_connect_ticket``.
        expected_flow: Flow name the ticket must have been issued for.

    Returns:
        The decoded claims. ``flow`` equals ``expected_flow`` and both
        ``connectionId`` and ``userId`` are present and non-empty.

    Raises:
        HTTPException: 400 if the ticket cannot be decoded (bad signature,
            expired, or missing ``exp``), if it was issued for a different
            flow, or if ``connectionId`` / ``userId`` is missing.
    """
    try:
        # Require the ``exp`` claim explicitly: jose only checks expiry when
        # the claim is present, so a correctly signed token without ``exp``
        # would otherwise be accepted forever. SECRET_KEY is imported from
        # jwtService and may sign other tokens as well.
        data = jose_jwt.decode(
            ticket,
            SECRET_KEY,
            algorithms=[ALGORITHM],
            options={"require_exp": True},
        )
    except JWTError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=_msg("Invalid or expired connect ticket"),
        ) from e

    # A ticket issued for one flow must not be usable to start another.
    if data.get("flow") != expected_flow:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=_msg("Invalid connect ticket flow"),
        )

    connection_id = data.get("connectionId")
    user_id = data.get("userId")
    if not connection_id or not user_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=_msg("Incomplete connect ticket"),
        )
    return data
def resolve_connect_context(
    connect_ticket: str,
    connection_id: str,
    expected_flow: str,
    authority: AuthAuthority,
) -> Tuple[User, UserConnection]:
    """Turn a connect ticket into the user and connection it was issued for.

    Args:
        connect_ticket: Encoded ticket received by the popup endpoint.
        connection_id: Connection ID supplied with the request; must equal
            the ``connectionId`` claim inside the ticket.
        expected_flow: Flow name the ticket must have been issued for.
        authority: Authority the matching connection must belong to.

    Returns:
        A ``(user, connection)`` tuple for the OAuth redirect.

    Raises:
        HTTPException: 400 if the ticket is invalid (see
            ``parse_connect_ticket``) or its connection ID differs from
            ``connection_id``; 404 if the user does not exist or has no
            connection with that ID and authority.
    """
    claims = parse_connect_ticket(connect_ticket, expected_flow)
    if claims.get("connectionId") != connection_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=_msg("Connection ID does not match connect ticket"),
        )

    # The request carries no session, so the user is loaded via the root
    # interface using the ID stored in the ticket.
    owner = getRootInterface().getUser(claims["userId"])
    if not owner:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=_msg("User not found"),
        )

    # First connection of this user that matches both the ID and the authority.
    candidates = getInterface(owner).getUserConnections(owner.id)
    match = next(
        (
            candidate
            for candidate in candidates
            if candidate.id == connection_id and candidate.authority == authority
        ),
        None,
    )
    if not match:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=_msg("Connection not found"),
        )
    return owner, match