# Copyright (c) 2026 PowerOn AG
# All rights reserved.
"""
Short-lived signed tickets for OAuth data-connection popups.

The UI authenticates API calls with a Bearer token in localStorage, but
``window.open(authUrl)`` cannot send that header. Cross-origin httpOnly cookies
are unreliable in cross-origin setups (UI and API on different subdomains).
Login popups work without a session because ``/auth/login`` is public; connect
popups hit ``/auth/connect``, which used to require ``getCurrentUser``.

Flow: POST ``/api/connections/{id}/connect`` (Bearer-authenticated) issues a
ticket; the popup opens ``/auth/connect?connectTicket=...`` which validates the
ticket instead of cookies.
"""

import time
from typing import Any, Dict, Tuple

from fastapi import HTTPException, status
from jose import JWTError, jwt as jose_jwt

from modules.auth.jwtService import ALGORITHM, SECRET_KEY
from modules.datamodels.datamodelUam import AuthAuthority, User, UserConnection
from modules.interfaces.interfaceDbApp import getInterface, getRootInterface
from modules.shared.i18nRegistry import apiRouteContext

_msg = apiRouteContext("oauthConnectTicket")

# Ticket lifetime in seconds (10 minutes), added to the issue time as ``exp``.
_CONNECT_TICKET_TTL_SEC = 600


def issue_connect_ticket(flow: str, connection_id: str, user_id: str) -> str:
    """Issue a short-lived JWT for starting a data-connection OAuth popup.

    Args:
        flow: Flow identifier stored in the ticket; ``parse_connect_ticket``
            later compares it against its ``expected_flow`` argument.
        connection_id: Connection the ticket is bound to (``connectionId`` claim).
        user_id: User the ticket is bound to; stored as a string in ``userId``.

    Returns:
        The encoded JWT, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    body = {
        "flow": flow,
        "connectionId": connection_id,
        "userId": str(user_id),
        "exp": int(time.time()) + _CONNECT_TICKET_TTL_SEC,
    }
    return jose_jwt.encode(body, SECRET_KEY, algorithm=ALGORITHM)


def parse_connect_ticket(ticket: str, expected_flow: str) -> Dict[str, Any]:
    """Validate connect ticket signature, expiry, and flow.

    Args:
        ticket: Encoded JWT as produced by ``issue_connect_ticket``.
        expected_flow: Value the ticket's ``flow`` claim must equal.

    Returns:
        The decoded claims; ``connectionId`` and ``userId`` are guaranteed to
        be present and truthy.

    Raises:
        HTTPException: 400 when the ticket is missing, cannot be decoded or
            verified (including expiry or a missing ``exp`` claim), carries a
            different flow, or lacks ``connectionId`` / ``userId``.
    """
    # Reject missing / non-text tickets up front so they produce the same 400
    # as any other malformed ticket instead of an unhandled error while decoding.
    if not ticket or not isinstance(ticket, (str, bytes)):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=_msg("Invalid or expired connect ticket"),
        )

    try:
        data = jose_jwt.decode(
            ticket,
            SECRET_KEY,
            algorithms=[ALGORITHM],
            # A ticket must always carry an expiry; without this option a signed
            # token lacking ``exp`` would be accepted indefinitely.
            options={"require_exp": True},
        )
    except JWTError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=_msg("Invalid or expired connect ticket"),
        ) from e

    if data.get("flow") != expected_flow:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=_msg("Invalid connect ticket flow"),
        )

    connection_id = data.get("connectionId")
    user_id = data.get("userId")
    if not connection_id or not user_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=_msg("Incomplete connect ticket"),
        )
    return data


def resolve_connect_context(
    connect_ticket: str,
    connection_id: str,
    expected_flow: str,
    authority: AuthAuthority,
) -> Tuple[User, UserConnection]:
    """Validate ticket and return the user + connection for OAuth redirect.

    Args:
        connect_ticket: Encoded ticket received by the popup endpoint.
        connection_id: Connection ID supplied with the request; must equal the
            ``connectionId`` claim inside the ticket.
        expected_flow: Flow the ticket must have been issued for.
        authority: Authority the matched connection must have.

    Returns:
        ``(user, connection)`` for the user named in the ticket and that user's
        connection matching both ``connection_id`` and ``authority``.

    Raises:
        HTTPException: 400 for an invalid ticket or a connection ID that does
            not match the ticket; 404 when the user or a matching connection
            cannot be found.
    """
    state = parse_connect_ticket(connect_ticket, expected_flow)
    if state.get("connectionId") != connection_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=_msg("Connection ID does not match connect ticket"),
        )

    # The popup request carries no session, so the user is loaded through the
    # root interface using the ID taken from the verified ticket.
    root = getRootInterface()
    user = root.getUser(state["userId"])
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=_msg("User not found"),
        )

    interface = getInterface(user)
    # First connection of this user matching both the ID and the authority.
    connection = next(
        (
            conn
            for conn in interface.getUserConnections(user.id)
            if conn.id == connection_id and conn.authority == authority
        ),
        None,
    )
    if not connection:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=_msg("Connection not found"),
        )
    return user, connection