# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Infomaniak Personal-Access-Token onboarding for data connections.

Infomaniak does NOT support OAuth scopes for kDrive/kSuite data access. The
user must create a Personal Access Token (PAT) at
https://manager.infomaniak.com/v3/ng/accounts/token/list with the API scopes:

- ``accounts``           -> account discovery (REQUIRED for kDrive)
- ``drive``              -> kDrive (active adapter)
- ``workspace:calendar`` -> Calendar (active adapter)
- ``workspace:contact``  -> Contacts (active adapter)
- ``workspace:mail``     -> Mail (adapter pending; scope reserved)

Validation strategy
-------------------
The submit endpoint validates the PAT in three deterministic steps, each
addressing exactly one scope:

1. ``resolveAccessibleAccountIds(pat)`` -> ``GET /1/accounts`` proves the
   ``accounts`` scope is on the PAT. Without this scope, kDrive cannot
   enumerate the owning account_ids (a standalone or free-tier kDrive lives
   on a *different* account_id than its kSuite counterpart, so the kSuite
   account_id from PIM is not enough).
2. ``resolveOwnerIdentity(pat)`` -> PIM Calendar (preferred) or PIM Contacts
   (fallback) yields the user's display name + their kSuite account_id, used
   purely for connection labelling. This also proves that at least one of
   ``workspace:calendar`` / ``workspace:contact`` is on the PAT (the
   connection would otherwise be blank in the UI).
3. ``GET /2/drive?account_id={firstAccountId}`` is the final scope probe --
   200 means the ``drive`` scope is present. 401/403 means the scope is
   missing.

Mail has no separate probe: its scope is recorded in ``grantedScopes`` so a
future adapter can pick it up without re-issuing the token.
"""

from fastapi import APIRouter, HTTPException, Request, status, Depends, Path, Body
import logging
from typing import Dict, Any
import hashlib
import httpx

from modules.interfaces.interfaceDbApp import getInterface
from modules.datamodels.datamodelUam import AuthAuthority, User, ConnectionStatus, UserConnection
from modules.datamodels.datamodelSecurity import Token, TokenPurpose
from modules.auth import getCurrentUser, limiter
from modules.shared.timeUtils import getUtcTimestamp, createExpirationTimestamp
from modules.shared.i18nRegistry import apiRouteContext
from modules.connectors.providerInfomaniak.connectorInfomaniak import (
    resolveOwnerIdentity,
    resolveAccessibleAccountIds,
    InfomaniakIdentityError,
)

routeApiMsg = apiRouteContext("routeSecurityInfomaniak")

logger = logging.getLogger(__name__)

INFOMANIAK_API_BASE = "https://api.infomaniak.com"

# Infomaniak PATs do not expire unless the user sets an explicit lifetime in
# the Manager (up to 30 years). We persist a 10-year horizon so the central
# tokenStatus helper does not flag the connection as "no token". Mirrors
# ClickUp.
_INFOMANIAK_TOKEN_EXPIRES_IN_SEC = 10 * 365 * 24 * 3600

router = APIRouter(
    prefix="/api/infomaniak",
    tags=["Security Infomaniak"],
    responses={
        404: {"description": "Not found"},
        400: {"description": "Bad request"},
        401: {"description": "Unauthorized"},
        500: {"description": "Internal server error"},
    },
)


async def _probeDriveScope(client: httpx.AsyncClient, pat: str, accountId: int) -> None:
    """Confirm the ``drive`` scope is on the PAT.

    Issues ``GET /2/drive?account_id={accountId}`` -- a clean 200 means both
    the ``drive`` scope is present and the resolved ``account_id`` is correct.
    401/403 means the scope is missing; anything else means Infomaniak is
    misbehaving and we refuse to persist.

    Args:
        client: Open ``httpx.AsyncClient`` used to send the probe request.
        pat: Personal Access Token, sent as a ``Bearer`` authorization header.
        accountId: Account id placed in the ``account_id`` query parameter.

    Raises:
        HTTPException: 502 when the request fails with ``httpx.HTTPError`` or
            the response status is neither 200 nor 401/403; 400 when the
            response status is 401 or 403.
    """
    url = f"{INFOMANIAK_API_BASE}/2/drive?account_id={accountId}"
    try:
        resp = await client.get(
            url,
            headers={"Authorization": f"Bearer {pat}", "Accept": "application/json"},
        )
    except httpx.HTTPError as e:
        logger.error(f"Infomaniak drive-probe network error ({url}): {e}")
        # Chain the transport error so the original cause stays in tracebacks.
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail=routeApiMsg("Could not reach Infomaniak to validate the token"),
        ) from e

    if resp.status_code == 200:
        return

    if resp.status_code in (401, 403):
        # Only the first 200 characters of the response body are logged.
        logger.warning(
            f"Infomaniak drive-probe rejected PAT ({url}): "
            f"{resp.status_code} {resp.text[:200]}"
        )
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=routeApiMsg(
                "Token rejected by Infomaniak (missing scope 'drive'). "
                "Required scopes: 'drive' (kDrive) and 'workspace:calendar' "
                "(or 'workspace:contact'). Recommended for upcoming "
                "services: 'workspace:mail'."
            ),
        )

    logger.error(
        f"Infomaniak drive-probe unexpected response ({url}): "
        f"{resp.status_code} {resp.text[:200]}"
    )
    raise HTTPException(
        status_code=status.HTTP_502_BAD_GATEWAY,
        detail=routeApiMsg(
            "Infomaniak drive-probe returned an unexpected response."
        ),
    )


@router.post("/connections/{connectionId}/token")
@limiter.limit("10/minute")
async def submit_infomaniak_token(
    request: Request,
    connectionId: str = Path(..., description="UserConnection id"),
    body: Dict[str, Any] = Body(..., description="{ 'token': '' }"),
    currentUser: User = Depends(getCurrentUser),
) -> Dict[str, Any]:
    """Validate and persist an Infomaniak Personal Access Token (PAT).

    Body: { "token": "" }

    Validation order (all three must succeed before persisting):

    1. ``resolveAccessibleAccountIds(pat)`` -> proves the ``accounts`` scope
       is on the PAT (required for kDrive account discovery).
    2. ``resolveOwnerIdentity(pat)`` -> display name + kSuite account_id for
       the connection UI label.
    3. ``/2/drive?account_id=`` -> proves the ``drive`` scope is on the PAT.

    No data derived from the PAT is stored as adapter state -- both account
    list and owner identity are re-resolved lazily by the adapters at request
    time.
    """
    pat = (body or {}).get("token")
    if not isinstance(pat, str) or not pat.strip():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=routeApiMsg("Missing 'token' in request body"),
        )
    pat = pat.strip()

    interface = getInterface(currentUser)

    # Only a connection of the current user with the Infomaniak authority is
    # eligible; anything else is reported as "not found".
    connection = None
    for conn in interface.getUserConnections(currentUser.id):
        if conn.id == connectionId and conn.authority == AuthAuthority.INFOMANIAK:
            connection = conn
            break
    if not connection:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=routeApiMsg("Infomaniak connection not found"),
        )

    # Step 1: account discovery (``accounts`` scope).
    try:
        accountIds = await resolveAccessibleAccountIds(pat)
    except InfomaniakIdentityError as e:
        logger.warning(
            f"Infomaniak token submit for connection {connectionId} could not "
            f"list accounts: {e}"
        )
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=routeApiMsg(
                "Token rejected by Infomaniak (missing scope 'accounts'). "
                "kDrive needs the 'accounts' scope to discover the owning "
                "Infomaniak account. Required scopes: 'accounts', 'drive', "
                "'workspace:calendar', 'workspace:contact'."
            ),
        ) from e

    # The drive probe below indexes ``accountIds[0]``; an empty result would
    # otherwise raise IndexError and surface as an unhandled 500.
    # NOTE(review): whether resolveAccessibleAccountIds can actually return an
    # empty list is not visible from this file -- the guard is defensive.
    if not accountIds:
        logger.warning(
            f"Infomaniak token submit for connection {connectionId} returned "
            f"no accessible accounts"
        )
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=routeApiMsg(
                "No Infomaniak account is accessible with this token."
            ),
        )

    # Step 2: owner identity (display name + kSuite account id).
    try:
        identity = await resolveOwnerIdentity(pat)
    except InfomaniakIdentityError as e:
        logger.warning(
            f"Infomaniak token submit for connection {connectionId} could not "
            f"resolve owner identity: {e}"
        )
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=routeApiMsg(
                "Could not derive your Infomaniak account from the token. "
                "Please ensure the PAT carries 'workspace:calendar' or "
                "'workspace:contact' so we can identify your account."
            ),
        ) from e

    # Step 3: drive-scope probe against the first accessible account.
    async with httpx.AsyncClient(timeout=15.0, follow_redirects=False) as client:
        await _probeDriveScope(client, pat, accountIds[0])

    # Short, non-reversible fingerprint of the PAT; used only as a fallback
    # label when the identity lookup yields no display name.
    tokenFingerprint = "pat-" + hashlib.sha256(pat.encode("utf-8")).hexdigest()[:8]
    username = identity["displayName"] or f"infomaniak-{tokenFingerprint}"
    expiresAt = createExpirationTimestamp(_INFOMANIAK_TOKEN_EXPIRES_IN_SEC)

    try:
        connection.status = ConnectionStatus.ACTIVE
        connection.lastChecked = getUtcTimestamp()
        connection.expiresAt = expiresAt
        connection.externalId = str(identity["accountId"])
        connection.externalUsername = username
        # All five scopes are recorded, including ``workspace:mail`` which has
        # no probe of its own.
        connection.grantedScopes = [
            "accounts",
            "drive",
            "workspace:mail",
            "workspace:calendar",
            "workspace:contact",
        ]
        interface.db.recordModify(UserConnection, connectionId, connection.model_dump())

        token = Token(
            userId=currentUser.id,
            authority=AuthAuthority.INFOMANIAK,
            connectionId=connectionId,
            tokenPurpose=TokenPurpose.DATA_CONNECTION,
            tokenAccess=pat,
            tokenRefresh=None,
            tokenType="bearer",
            expiresAt=expiresAt,
            createdAt=getUtcTimestamp(),
        )
        interface.saveConnectionToken(token)

        logger.info(
            f"Infomaniak PAT stored for connection {connectionId} "
            f"(user {currentUser.id}, externalUsername={username}, "
            f"kSuiteAccountId={identity['accountId']}, "
            f"accessibleAccounts={accountIds})"
        )

        return {
            "id": connection.id,
            "status": "connected",
            "type": "infomaniak",
            "externalUsername": username,
            "externalEmail": None,
            "lastChecked": connection.lastChecked,
        }
    except HTTPException:
        # Let deliberate HTTP errors pass through untouched.
        raise
    except Exception as e:
        logger.error(
            f"Error persisting Infomaniak token for connection {connectionId}: {e}",
            exc_info=True,
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=routeApiMsg("Failed to store Infomaniak token"),
        ) from e