# Copyright (c) 2025 Patrick Motsch # All rights reserved. """Infomaniak OAuth for data connections (UserConnection + Token). Pure DATA_CONNECTION flow -- Infomaniak is NOT a login authority for PowerOn. """ from fastapi import APIRouter, HTTPException, Request, status, Depends, Query from fastapi.responses import HTMLResponse, RedirectResponse import logging import json import time from typing import Dict, Any from urllib.parse import urlencode import httpx from jose import jwt as jose_jwt from jose import JWTError from modules.shared.configuration import APP_CONFIG from modules.interfaces.interfaceDbApp import getInterface, getRootInterface from modules.datamodels.datamodelUam import AuthAuthority, User, ConnectionStatus, UserConnection from modules.datamodels.datamodelSecurity import Token, TokenPurpose from modules.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM from modules.auth.oauthProviderConfig import infomaniakDataScopes from modules.shared.timeUtils import createExpirationTimestamp, getUtcTimestamp, parseTimestamp from modules.shared.i18nRegistry import apiRouteContext routeApiMsg = apiRouteContext("routeSecurityInfomaniak") logger = logging.getLogger(__name__) _FLOW_CONNECT = "infomaniak_connect" INFOMANIAK_AUTHORIZE_URL = "https://login.infomaniak.com/authorize" INFOMANIAK_TOKEN_URL = "https://login.infomaniak.com/token" INFOMANIAK_API_BASE = "https://api.infomaniak.com" CLIENT_ID = APP_CONFIG.get("Service_INFOMANIAK_DATA_CLIENT_ID") CLIENT_SECRET = APP_CONFIG.get("Service_INFOMANIAK_DATA_CLIENT_SECRET") REDIRECT_URI = APP_CONFIG.get("Service_INFOMANIAK_OAUTH_REDIRECT_URI") def _issue_oauth_state(claims: Dict[str, Any]) -> str: body = {**claims, "exp": int(time.time()) + 600} return jose_jwt.encode(body, SECRET_KEY, algorithm=ALGORITHM) def _parse_oauth_state(state: str) -> Dict[str, Any]: try: return jose_jwt.decode(state, SECRET_KEY, algorithms=[ALGORITHM]) except JWTError as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid OAuth state: {e}" ) from e def _require_infomaniak_config(): if not CLIENT_ID or not CLIENT_SECRET or not REDIRECT_URI: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=routeApiMsg( "Infomaniak OAuth is not configured " "(Service_INFOMANIAK_DATA_CLIENT_ID, Service_INFOMANIAK_DATA_CLIENT_SECRET, " "Service_INFOMANIAK_OAUTH_REDIRECT_URI)" ), ) router = APIRouter( prefix="/api/infomaniak", tags=["Security Infomaniak"], responses={ 404: {"description": "Not found"}, 400: {"description": "Bad request"}, 401: {"description": "Unauthorized"}, 500: {"description": "Internal server error"}, }, ) @router.get("/auth/connect") @limiter.limit("5/minute") def auth_connect( request: Request, connectionId: str = Query(..., description="UserConnection id"), currentUser: User = Depends(getCurrentUser), ) -> RedirectResponse: """Start Infomaniak OAuth for an existing connection (requires gateway session).""" try: _require_infomaniak_config() interface = getInterface(currentUser) connections = interface.getUserConnections(currentUser.id) connection = None for conn in connections: if conn.id == connectionId and conn.authority == AuthAuthority.INFOMANIAK: connection = conn break if not connection: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=routeApiMsg("Infomaniak connection not found"), ) state_jwt = _issue_oauth_state( { "flow": _FLOW_CONNECT, "connectionId": connectionId, "userId": str(currentUser.id), } ) query = urlencode( { "client_id": CLIENT_ID, "response_type": "code", "access_type": "offline", "redirect_uri": REDIRECT_URI, "scope": " ".join(infomaniakDataScopes), "state": state_jwt, } ) auth_url = f"{INFOMANIAK_AUTHORIZE_URL}?{query}" return RedirectResponse(auth_url) except HTTPException: raise except Exception as e: logger.error(f"Error initiating Infomaniak connect: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to initiate Infomaniak connect: {str(e)}", ) @router.get("/auth/connect/callback") async def auth_connect_callback( code: str = Query(...), state: str = Query(...), ) -> HTMLResponse: """OAuth callback for Infomaniak data connection.""" state_data = _parse_oauth_state(state) if state_data.get("flow") != _FLOW_CONNECT: raise HTTPException( status_code=400, detail=routeApiMsg("Invalid OAuth flow for this callback") ) connection_id = state_data.get("connectionId") user_id = state_data.get("userId") if not connection_id or not user_id: raise HTTPException( status_code=400, detail=routeApiMsg("Missing connection or user in OAuth state") ) _require_infomaniak_config() async with httpx.AsyncClient() as client: token_resp = await client.post( INFOMANIAK_TOKEN_URL, data={ "grant_type": "authorization_code", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "code": code, "redirect_uri": REDIRECT_URI, }, headers={"Content-Type": "application/x-www-form-urlencoded"}, timeout=30.0, ) if token_resp.status_code != 200: logger.error( f"Infomaniak token exchange failed: {token_resp.status_code} {token_resp.text}" ) return HTMLResponse( content=f"

Connection Failed

{token_resp.text}

", status_code=400, ) token_json = token_resp.json() access_token = token_json.get("access_token") refresh_token = token_json.get("refresh_token", "") expires_in = int(token_json.get("expires_in", 0)) granted_scopes = token_json.get("scope", "") if not access_token: return HTMLResponse( content="

Connection Failed

No access token.

", status_code=400, ) rootInterface = getRootInterface() if not refresh_token: try: existing_tokens = rootInterface.getTokensByConnectionIdAndAuthority( connection_id, AuthAuthority.INFOMANIAK ) if existing_tokens: existing_tokens.sort( key=lambda x: parseTimestamp(x.createdAt, default=0), reverse=True ) refresh_token = existing_tokens[0].tokenRefresh or "" except Exception: pass async with httpx.AsyncClient() as client: profile_resp = await client.get( f"{INFOMANIAK_API_BASE}/1/profile", headers={ "Authorization": f"Bearer {access_token}", "Accept": "application/json", }, timeout=30.0, ) if profile_resp.status_code != 200: logger.error( f"Infomaniak profile lookup failed: {profile_resp.status_code} {profile_resp.text}" ) return HTMLResponse( content="

Connection Failed

Could not load Infomaniak profile.

", status_code=400, ) profile_payload = profile_resp.json() profile = profile_payload.get("data") if isinstance(profile_payload, dict) else None profile = profile or {} user = rootInterface.getUser(user_id) if not user: return HTMLResponse( content=""" """, status_code=404, ) interface = getInterface(user) connections = interface.getUserConnections(user_id) connection = None for conn in connections: if conn.id == connection_id: connection = conn break if not connection: return HTMLResponse( content=""" """, status_code=404, ) ext_id = str(profile.get("id", "")) if profile.get("id") is not None else "" username = profile.get("login") or profile.get("email") or ext_id email = profile.get("email") expires_at = createExpirationTimestamp(expires_in) granted_scopes_list = ( granted_scopes if isinstance(granted_scopes, list) else (granted_scopes.split(" ") if granted_scopes else infomaniakDataScopes) ) try: connection.status = ConnectionStatus.ACTIVE connection.lastChecked = getUtcTimestamp() connection.expiresAt = expires_at connection.externalId = ext_id connection.externalUsername = username if email: connection.externalEmail = email connection.grantedScopes = granted_scopes_list rootInterface.db.recordModify(UserConnection, connection_id, connection.model_dump()) token = Token( userId=user.id, authority=AuthAuthority.INFOMANIAK, connectionId=connection_id, tokenPurpose=TokenPurpose.DATA_CONNECTION, tokenAccess=access_token, tokenRefresh=refresh_token, tokenType=token_json.get("token_type", "bearer"), expiresAt=expires_at, createdAt=getUtcTimestamp(), ) interface.saveConnectionToken(token) return HTMLResponse( content=f""" Connection Successful """ ) except Exception as e: logger.error(f"Error updating Infomaniak connection: {str(e)}", exc_info=True) return HTMLResponse( content=f""" """, status_code=500, )