# Copyright (c) 2026 PowerOn AG # All rights reserved. """ Routes for Google authentication — split Auth app vs Data app. See wiki: concepts/OAuth-Auth-vs-Data-Connection-Konzept.md """ from fastapi import APIRouter, HTTPException, Request, Response, status, Depends, Query from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse import logging import json import time import uuid from typing import Dict, Any, Optional from requests_oauthlib import OAuth2Session import httpx from jose import jwt as jose_jwt from jose import JWTError from modules.shared.configuration import APP_CONFIG from modules.interfaces.interfaceDbApp import getInterface, getRootInterface from modules.datamodels.datamodelUam import AuthAuthority, User, Mandate, ConnectionStatus, UserConnection from modules.datamodels.datamodelSecurity import Token, TokenPurpose from modules.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM from modules.auth.oauthConnectTicket import resolve_connect_context from modules.auth import ( createAccessToken, setAccessTokenCookie, createRefreshToken, setRefreshTokenCookie, clearAccessTokenCookie, clearRefreshTokenCookie, ) from modules.auth.tokenManager import TokenManager from modules.auth.oauthProviderConfig import googleAuthScopes, googleDataScopes from modules.shared.timeUtils import createExpirationTimestamp, getUtcTimestamp, parseTimestamp from modules.shared.i18nRegistry import apiRouteContext routeApiMsg = apiRouteContext("routeSecurityGoogle") logger = logging.getLogger(__name__) _FLOW_LOGIN = "google_login" _FLOW_CONNECT = "google_connect" async def verify_google_token(access_token: str) -> Dict[str, Any]: """Verify Google access token and return token info including scopes.""" try: headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json", } async with httpx.AsyncClient() as client: response = await client.get( "https://www.googleapis.com/oauth2/v1/tokeninfo", headers=headers, params={"access_token": access_token}, ) if response.status_code == 200: token_info = response.json() return { "valid": True, "token_info": token_info, "scopes": token_info.get("scope", "").split(" ") if token_info.get("scope") else [], "expires_in": int(token_info.get("expires_in", 0)), "user_id": token_info.get("user_id"), "email": token_info.get("email"), } return { "valid": False, "error": f"HTTP {response.status_code}", "details": response.text, } except Exception as e: logger.error(f"Error verifying Google token: {str(e)}") return {"valid": False, "error": str(e)} def _issue_oauth_state(claims: Dict[str, Any]) -> str: body = {**claims, "exp": int(time.time()) + 600} return jose_jwt.encode(body, SECRET_KEY, algorithm=ALGORITHM) def _parse_oauth_state(state: str) -> Dict[str, Any]: try: return jose_jwt.decode(state, SECRET_KEY, algorithms=[ALGORITHM]) except JWTError as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid OAuth state: {e}" ) from e router = APIRouter( prefix="/api/google", tags=["Security Google"], responses={ 404: {"description": "Not found"}, 400: {"description": "Bad request"}, 401: {"description": "Unauthorized"}, 403: {"description": "Forbidden"}, 500: {"description": "Internal server error"}, }, ) AUTH_CLIENT_ID = APP_CONFIG.get("Service_GOOGLE_AUTH_CLIENT_ID") AUTH_CLIENT_SECRET = APP_CONFIG.get("Service_GOOGLE_AUTH_CLIENT_SECRET") AUTH_REDIRECT_URI = APP_CONFIG.get("Service_GOOGLE_AUTH_REDIRECT_URI") DATA_CLIENT_ID = APP_CONFIG.get("Service_GOOGLE_DATA_CLIENT_ID") DATA_CLIENT_SECRET = APP_CONFIG.get("Service_GOOGLE_DATA_CLIENT_SECRET") DATA_REDIRECT_URI = APP_CONFIG.get("Service_GOOGLE_DATA_REDIRECT_URI") def _require_google_auth_config(): if not AUTH_CLIENT_ID or not AUTH_CLIENT_SECRET or not AUTH_REDIRECT_URI: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=routeApiMsg("Google Auth OAuth is not configured (Service_GOOGLE_AUTH_*)"), ) def _require_google_data_config(): if not DATA_CLIENT_ID or not DATA_CLIENT_SECRET or not DATA_REDIRECT_URI: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=routeApiMsg("Google Data OAuth is not configured (Service_GOOGLE_DATA_*)"), ) @router.get("/auth/login") @limiter.limit("5/minute") def auth_login(request: Request) -> RedirectResponse: """Start Google login (Auth app — minimal scopes).""" try: _require_google_auth_config() state_jwt = _issue_oauth_state({"flow": _FLOW_LOGIN}) oauth = OAuth2Session( client_id=AUTH_CLIENT_ID, redirect_uri=AUTH_REDIRECT_URI, scope=googleAuthScopes, ) auth_url, _ = oauth.authorization_url( "https://accounts.google.com/o/oauth2/auth", state=state_jwt, access_type="online", prompt="consent select_account", ) return RedirectResponse(auth_url) except HTTPException: raise except Exception as e: logger.error(f"Error initiating Google auth login: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to initiate Google login: {str(e)}", ) @router.get("/auth/login/callback") async def auth_login_callback( code: str, state: str, request: Request, response: Response ) -> HTMLResponse: """OAuth callback for Google Auth app (login only).""" state_data = _parse_oauth_state(state) if state_data.get("flow") != _FLOW_LOGIN: raise HTTPException(status_code=400, detail=routeApiMsg("Invalid OAuth flow for this callback")) _require_google_auth_config() oauth = OAuth2Session(client_id=AUTH_CLIENT_ID, redirect_uri=AUTH_REDIRECT_URI) token_data = oauth.fetch_token( "https://oauth2.googleapis.com/token", client_secret=AUTH_CLIENT_SECRET, code=code, include_client_id=True, ) access_token = token_data.get("access_token") if not access_token: return HTMLResponse( content="
No access token.
", status_code=400, ) token_verification = await verify_google_token(access_token) if not token_verification.get("valid"): return HTMLResponse( content=f"{token_verification.get('error')}
", status_code=400, ) headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json", } async with httpx.AsyncClient() as client: user_info_response = await client.get( "https://www.googleapis.com/oauth2/v2/userinfo", headers=headers ) if user_info_response.status_code != 200: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=routeApiMsg("Failed to get user info from Google"), ) user_info = user_info_response.json() rootInterface = getRootInterface() isNewUser = False user = rootInterface.getUserByUsername(user_info.get("email")) if not user: user = rootInterface.createUser( username=user_info.get("email"), email=user_info.get("email"), fullName=user_info.get("name"), authenticationAuthority=AuthAuthority.GOOGLE, externalId=user_info.get("id"), externalUsername=user_info.get("email"), externalEmail=user_info.get("email"), addExternalIdentityConnection=False, ) isNewUser = True # --- MFA gate -------------------------------------------------------- from modules.auth.mfaService import isMfaRequired as _isMfaRequired from modules.routes.routeMfa import createMfaPendingToken userRecord = rootInterface._getUserForAuthentication(user.username) userMandates = rootInterface.getUserMandates(str(user.id)) _mandateIds = [um.mandateId for um in (userMandates or []) if getattr(um, "enabled", True)] _mandateObjs = [] for _mid in _mandateIds: try: _recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": _mid}) if _recs: _mandateObjs.append(Mandate.model_validate(dict(_recs[0]))) except Exception: pass mfaRequired = _isMfaRequired(user, userMandates=userMandates, mandates=_mandateObjs) hasMfaSetup = bool(userRecord and userRecord.get("mfaSecret") and getattr(user, "mfaEnabled", False)) if mfaRequired or hasMfaSetup: _sid = str(uuid.uuid4()) pendingToken = createMfaPendingToken( userId=str(user.id), username=user.username, authority=AuthAuthority.GOOGLE.value, sessionId=_sid, ) if hasMfaSetup: mfaType = "mfa_required" extraFields = "" else: mfaType = "mfa_setup_required" from modules.auth.mfaService import generateSetup as _generateSetup existingSecret = userRecord.get("mfaSecret") if userRecord else None if existingSecret: from modules.auth.mfaService import decryptSecret, buildTotp, getMfaIssuer _plain = decryptSecret(existingSecret, userId=str(user.id)) _uri = buildTotp(_plain).provisioning_uri(name=user.username, issuer_name=getMfaIssuer()) setupResult = {"provisioningUri": _uri} else: setupResult = _generateSetup(userId=str(user.id), username=user.username) rootInterface.updateUser(str(user.id), {"mfaSecret": setupResult["encryptedSecret"]}) extraFields = f", provisioningUri: {json.dumps(setupResult['provisioningUri'])}" return HTMLResponse( content=f"""No access token.
", status_code=400, ) token_verification = await verify_google_token(token_response["access_token"]) if not token_verification.get("valid"): return HTMLResponse( content=f"{token_verification.get('error')}
", status_code=400, ) headers = { "Authorization": f"Bearer {token_response['access_token']}", "Content-Type": "application/json", } async with httpx.AsyncClient() as client: user_info_response = await client.get( "https://www.googleapis.com/oauth2/v2/userinfo", headers=headers ) if user_info_response.status_code != 200: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=routeApiMsg("Failed to get user info from Google"), ) user_info = user_info_response.json() rootInterface = getRootInterface() user = rootInterface.getUser(user_id) if not user: return HTMLResponse( content=f""" """, status_code=404, ) interface = getInterface(user) connections = interface.getUserConnections(user_id) connection = None for conn in connections: if conn.id == connection_id: connection = conn break if not connection: return HTMLResponse( content=f""" """, status_code=404, ) try: connection.status = ConnectionStatus.ACTIVE connection.lastChecked = getUtcTimestamp() connection.expiresAt = getUtcTimestamp() + token_response.get("expires_in", 0) connection.externalId = user_info.get("id") connection.externalUsername = user_info.get("email") connection.externalEmail = user_info.get("email") granted_scopes_list = ( granted_scopes if isinstance(granted_scopes, list) else (granted_scopes.split(" ") if granted_scopes else googleDataScopes) ) connection.grantedScopes = granted_scopes_list rootInterface.db.recordModify(UserConnection, connection_id, connection.model_dump()) token = Token( userId=user.id, authority=AuthAuthority.GOOGLE, connectionId=connection_id, tokenPurpose=TokenPurpose.DATA_CONNECTION, tokenAccess=token_response["access_token"], tokenRefresh=token_response.get("refresh_token", ""), tokenType=token_response.get("token_type", "bearer"), expiresAt=createExpirationTimestamp(token_response.get("expires_in", 0)), createdAt=getUtcTimestamp(), ) interface.saveConnectionToken(token) try: from modules.shared.callbackRegistry import callbackRegistry if connection.knowledgeIngestionEnabled: callbackRegistry.trigger( "connection.established", connectionId=connection.id, authority=str(getattr(connection.authority, "value", connection.authority) or "google"), userId=str(user.id), ) else: logger.info( "ingestion.connection.bootstrap.skipped — knowledge ingestion disabled by user", extra={ "event": "ingestion.connection.bootstrap.skipped", "connectionId": connection.id, "authority": "google", "reason": "consent_disabled", }, ) except Exception as _cbErr: logger.warning("connection.established callback failed for %s: %s", connection.id, _cbErr) return HTMLResponse( content=f"""