# Copyright (c) 2026 PowerOn AG
# All rights reserved.
"""
Routes for Google authentication — split Auth app vs Data app.

The "Auth" OAuth client is used for signing users in (minimal scopes); the
"Data" OAuth client is used for UserConnection data access.

See wiki: concepts/OAuth-Auth-vs-Data-Connection-Konzept.md
"""

from fastapi import APIRouter, HTTPException, Request, Response, status, Depends, Query
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
import html
import logging
import json
import time
import uuid
from typing import Dict, Any, Optional
from requests_oauthlib import OAuth2Session
import httpx
from jose import jwt as jose_jwt
from jose import JWTError

from modules.shared.configuration import APP_CONFIG
from modules.interfaces.interfaceDbApp import getInterface, getRootInterface
from modules.datamodels.datamodelUam import AuthAuthority, User, Mandate, ConnectionStatus, UserConnection
from modules.datamodels.datamodelSecurity import Token, TokenPurpose
from modules.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM
from modules.auth.oauthConnectTicket import resolve_connect_context
from modules.auth import (
    createAccessToken,
    setAccessTokenCookie,
    createRefreshToken,
    setRefreshTokenCookie,
    clearAccessTokenCookie,
    clearRefreshTokenCookie,
)
from modules.auth.tokenManager import TokenManager
from modules.auth.oauthProviderConfig import googleAuthScopes, googleDataScopes
from modules.shared.timeUtils import createExpirationTimestamp, getUtcTimestamp, parseTimestamp
from modules.shared.i18nRegistry import apiRouteContext

routeApiMsg = apiRouteContext("routeSecurityGoogle")

logger = logging.getLogger(__name__)

# Values of the "flow" claim carried in the OAuth state; each callback only
# accepts the flow it belongs to.
_FLOW_LOGIN = "google_login"
_FLOW_CONNECT = "google_connect"


async def verify_google_token(access_token: str) -> Dict[str, Any]:
    """Verify a Google access token and return token info including scopes.

    Queries Google's ``tokeninfo`` endpoint with the given token.

    Args:
        access_token: The Google OAuth access token to check.

    Returns:
        On HTTP 200: ``{"valid": True, "token_info", "scopes", "expires_in",
        "user_id", "email"}`` where ``scopes`` is a list of strings.
        Otherwise ``{"valid": False, "error": ...}`` (plus ``"details"`` with
        the response body for non-200 answers). This function does not raise:
        any exception is logged and reported as ``valid: False``.
    """
    try:
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }
        async with httpx.AsyncClient() as client:
            response = await client.get(
                "https://www.googleapis.com/oauth2/v1/tokeninfo",
                headers=headers,
                params={"access_token": access_token},
            )

        if response.status_code != 200:
            return {
                "valid": False,
                "error": f"HTTP {response.status_code}",
                "details": response.text,
            }

        token_info = response.json()
        return {
            "valid": True,
            "token_info": token_info,
            # split() without a separator also tolerates repeated blanks and
            # yields [] for a missing/empty scope string.
            "scopes": (token_info.get("scope") or "").split(),
            # "or 0" keeps an explicit null from breaking int().
            "expires_in": int(token_info.get("expires_in") or 0),
            "user_id": token_info.get("user_id"),
            "email": token_info.get("email"),
        }
    except Exception as e:
        logger.error("Error verifying Google token: %s", e)
        return {"valid": False, "error": str(e)}


def _issue_oauth_state(claims: Dict[str, Any]) -> str:
    """Encode ``claims`` as a signed JWT to be used as the OAuth ``state``.

    An ``exp`` claim 600 seconds in the future is added; the token is signed
    with the module's ``SECRET_KEY`` / ``ALGORITHM``.
    """
    body = {**claims, "exp": int(time.time()) + 600}
    return jose_jwt.encode(body, SECRET_KEY, algorithm=ALGORITHM)


def _parse_oauth_state(state: str) -> Dict[str, Any]:
    """Decode an OAuth ``state`` JWT and return its claims.

    Raises:
        HTTPException: 400 when the JWT cannot be decoded/verified.
    """
    try:
        return jose_jwt.decode(state, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid OAuth state: {e}"
        ) from e


router = APIRouter(
    prefix="/api/google",
    tags=["Security Google"],
    responses={
        404: {"description": "Not found"},
        400: {"description": "Bad request"},
        401: {"description": "Unauthorized"},
        403: {"description": "Forbidden"},
        500: {"description": "Internal server error"},
    },
)

# OAuth client used for login ("Auth app").
AUTH_CLIENT_ID = APP_CONFIG.get("Service_GOOGLE_AUTH_CLIENT_ID")
AUTH_CLIENT_SECRET = APP_CONFIG.get("Service_GOOGLE_AUTH_CLIENT_SECRET")
AUTH_REDIRECT_URI = APP_CONFIG.get("Service_GOOGLE_AUTH_REDIRECT_URI")

# OAuth client used for data connections ("Data app").
DATA_CLIENT_ID = APP_CONFIG.get("Service_GOOGLE_DATA_CLIENT_ID")
DATA_CLIENT_SECRET = APP_CONFIG.get("Service_GOOGLE_DATA_CLIENT_SECRET")
DATA_REDIRECT_URI = APP_CONFIG.get("Service_GOOGLE_DATA_REDIRECT_URI")


def _require_google_auth_config():
    """Raise HTTP 500 unless all ``Service_GOOGLE_AUTH_*`` settings are set."""
    if not AUTH_CLIENT_ID or not AUTH_CLIENT_SECRET or not AUTH_REDIRECT_URI:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=routeApiMsg("Google Auth OAuth is not configured (Service_GOOGLE_AUTH_*)"),
        )


def _require_google_data_config():
    """Raise HTTP 500 unless all ``Service_GOOGLE_DATA_*`` settings are set."""
    if not DATA_CLIENT_ID or not DATA_CLIENT_SECRET or not DATA_REDIRECT_URI:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=routeApiMsg("Google Data OAuth is not configured (Service_GOOGLE_DATA_*)"),
        )


@router.get("/auth/login")
@limiter.limit("5/minute")
def auth_login(request: Request) -> RedirectResponse:
    """Start Google login (Auth app — minimal scopes).

    Issues a signed state carrying the login flow marker and redirects the
    browser to Google's authorization endpoint.

    Raises:
        HTTPException: 500 when the Auth app is not configured or building the
            authorization URL fails.
    """
    try:
        _require_google_auth_config()
        state_jwt = _issue_oauth_state({"flow": _FLOW_LOGIN})
        oauth = OAuth2Session(
            client_id=AUTH_CLIENT_ID,
            redirect_uri=AUTH_REDIRECT_URI,
            scope=googleAuthScopes,
        )
        auth_url, _ = oauth.authorization_url(
            "https://accounts.google.com/o/oauth2/auth",
            state=state_jwt,
            access_type="online",
            prompt="consent select_account",
        )
        return RedirectResponse(auth_url)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error initiating Google auth login: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to initiate Google login: {str(e)}",
        )


@router.get("/auth/login/callback")
async def auth_login_callback(
    code: str, state: str, request: Request, response: Response
) -> HTMLResponse:
    """OAuth callback for Google Auth app (login only).

    Steps: validate the state, exchange ``code`` for a Google access token,
    verify it, load the Google profile, look up or create the local user,
    apply the MFA gate, and finally issue the gateway session (access and
    refresh cookies plus a persisted ``Token`` record).

    Args:
        code: Authorization code returned by Google.
        state: Signed state previously issued by ``auth_login``.
        request: Incoming request (unused here, part of the route signature).
        response: Response placeholder (unused here, part of the signature).

    Returns:
        An ``HTMLResponse``: a failure page (400), an MFA page, or the
        success page with the session cookies set.

    Raises:
        HTTPException: 400 for an invalid state/flow or a Google profile
            without email; 500 when the Auth app is not configured or the
            Google user info request fails.
    """
    # NOTE(review): the HTML response bodies in this handler contain no markup
    # or script in this file — confirm the templates were not lost upstream.
    # Locals such as mfaType, pendingToken, extraFields, token_dict and
    # isNewUser are presumably consumed by those templates; they are kept.
    state_data = _parse_oauth_state(state)
    if state_data.get("flow") != _FLOW_LOGIN:
        raise HTTPException(status_code=400, detail=routeApiMsg("Invalid OAuth flow for this callback"))

    _require_google_auth_config()
    oauth = OAuth2Session(client_id=AUTH_CLIENT_ID, redirect_uri=AUTH_REDIRECT_URI)
    # fetch_token performs blocking HTTP I/O; run it in the threadpool so the
    # event loop is not stalled while Google answers.
    token_data = await run_in_threadpool(
        oauth.fetch_token,
        "https://oauth2.googleapis.com/token",
        client_secret=AUTH_CLIENT_SECRET,
        code=code,
        include_client_id=True,
    )

    access_token = token_data.get("access_token")
    if not access_token:
        return HTMLResponse(
            content="\n\nAuthentication Failed\n\nNo access token.\n\n",
            status_code=400,
        )

    token_verification = await verify_google_token(access_token)
    if not token_verification.get("valid"):
        # The error text originates from an HTTP response/exception; escape it
        # before embedding it in an HTML document.
        safe_error = html.escape(str(token_verification.get("error")))
        return HTMLResponse(
            content=f"\n\nAuthentication Failed\n\n{safe_error}\n\n",
            status_code=400,
        )

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
    async with httpx.AsyncClient() as client:
        user_info_response = await client.get(
            "https://www.googleapis.com/oauth2/v2/userinfo", headers=headers
        )
    if user_info_response.status_code != 200:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=routeApiMsg("Failed to get user info from Google"),
        )
    user_info = user_info_response.json()

    # The email doubles as the local username; without it we would look up or
    # create a user with username=None.
    email = user_info.get("email")
    if not email:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=routeApiMsg("Google account did not provide an email address"),
        )

    rootInterface = getRootInterface()
    isNewUser = False
    user = rootInterface.getUserByUsername(email)
    if not user:
        user = rootInterface.createUser(
            username=email,
            email=email,
            fullName=user_info.get("name"),
            authenticationAuthority=AuthAuthority.GOOGLE,
            externalId=user_info.get("id"),
            externalUsername=email,
            externalEmail=email,
            addExternalIdentityConnection=False,
        )
        isNewUser = True

    # --- MFA gate --------------------------------------------------------
    from modules.auth.mfaService import isMfaRequired as _isMfaRequired
    from modules.routes.routeMfa import createMfaPendingToken

    userRecord = rootInterface._getUserForAuthentication(user.username)
    userMandates = rootInterface.getUserMandates(str(user.id))
    _mandateIds = [um.mandateId for um in (userMandates or []) if getattr(um, "enabled", True)]
    _mandateObjs = []
    for _mid in _mandateIds:
        try:
            _recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": _mid})
            if _recs:
                _mandateObjs.append(Mandate.model_validate(dict(_recs[0])))
        except Exception as mandateErr:
            # Still best-effort, but a mandate that cannot be loaded is not
            # passed to the MFA decision below, so make the failure visible.
            logger.warning("Could not load mandate %s for MFA evaluation: %s", _mid, mandateErr)

    mfaRequired = _isMfaRequired(user, userMandates=userMandates, mandates=_mandateObjs)
    hasMfaSetup = bool(userRecord and userRecord.get("mfaSecret") and getattr(user, "mfaEnabled", False))

    if mfaRequired or hasMfaSetup:
        _sid = str(uuid.uuid4())
        pendingToken = createMfaPendingToken(
            userId=str(user.id),
            username=user.username,
            authority=AuthAuthority.GOOGLE.value,
            sessionId=_sid,
        )
        if hasMfaSetup:
            mfaType = "mfa_required"
            extraFields = ""
        else:
            mfaType = "mfa_setup_required"
            from modules.auth.mfaService import generateSetup as _generateSetup

            existingSecret = userRecord.get("mfaSecret") if userRecord else None
            if existingSecret:
                # A secret is stored but MFA is not enabled yet: rebuild the
                # provisioning URI from it instead of generating a new secret.
                from modules.auth.mfaService import decryptSecret, buildTotp, getMfaIssuer

                _plain = decryptSecret(existingSecret, userId=str(user.id))
                _uri = buildTotp(_plain).provisioning_uri(name=user.username, issuer_name=getMfaIssuer())
                setupResult = {"provisioningUri": _uri}
            else:
                setupResult = _generateSetup(userId=str(user.id), username=user.username)
                rootInterface.updateUser(str(user.id), {"mfaSecret": setupResult["encryptedSecret"]})
            extraFields = f", provisioningUri: {json.dumps(setupResult['provisioningUri'])}"

        return HTMLResponse(
            content=f""" MFA Required """
        )
    # --- end MFA gate -----------------------------------------------------

    jwt_token_data = {
        "sub": user.username,
        "userId": str(user.id),
        "authenticationAuthority": AuthAuthority.GOOGLE.value,
    }
    jwt_token, jwt_expires_at = createAccessToken(jwt_token_data)
    refresh_token, _refresh_expires = createRefreshToken(jwt_token_data)

    # Read back the "jti" claim of the freshly issued access token; it is used
    # as the id of the persisted Token record.
    payload = jose_jwt.decode(jwt_token, SECRET_KEY, algorithms=[ALGORITHM])
    jti = payload.get("jti")

    token = Token(
        id=jti,
        userId=user.id,
        authority=AuthAuthority.GOOGLE,
        tokenPurpose=TokenPurpose.AUTH_SESSION,
        tokenAccess=jwt_token,
        tokenRefresh="",
        tokenType="bearer",
        expiresAt=jwt_expires_at.timestamp(),
        createdAt=getUtcTimestamp(),
    )
    appInterface = getInterface(user)
    appInterface.saveAccessToken(token)

    # Activate PENDING subscriptions on first login
    try:
        rootInterface._activatePendingSubscriptions(str(user.id))
    except Exception as subErr:
        logger.error(f"Error activating subscriptions on Google login: {subErr}")

    token_dict = token.model_dump()
    html_response = HTMLResponse(
        content=f""" Authentication Successful """
    )
    setAccessTokenCookie(html_response, jwt_token, expiresDelta=None)
    setRefreshTokenCookie(html_response, refresh_token)
    return html_response
@router.get("/auth/connect")
@limiter.limit("5/minute")
def auth_connect(
    request: Request,
    connectionId: str = Query(..., description="UserConnection id"),
    connectTicket: str = Query(..., description="Short-lived ticket from POST /api/connections/{id}/connect"),
    reauth: Optional[int] = Query(0, description="If 1, force the consent screen so newly added scopes are granted"),
) -> RedirectResponse:
    """Start Google Data OAuth for an existing connection.

    Authenticated via ``connectTicket`` (issued by POST connect) so the popup
    works when the UI uses Bearer tokens in localStorage instead of cookies.
    The ticket itself is forwarded to Google as the OAuth ``state``.

    Google already defaults to ``prompt=consent`` here, but
    ``include_granted_scopes=true`` can cause newly added scopes (e.g.
    calendar.readonly, contacts.readonly) to be silently dropped on subsequent
    re-authorisations. With ``reauth=1`` we drop ``include_granted_scopes`` so
    Google re-issues a token strictly for the current scope list.

    Raises:
        HTTPException: whatever ``resolve_connect_context`` /
            ``_require_google_data_config`` raise, or 500 on any other error.
    """
    try:
        _require_google_data_config()
        _user, connection = resolve_connect_context(
            connectTicket, connectionId, _FLOW_CONNECT, AuthAuthority.GOOGLE
        )
        state_jwt = connectTicket

        oauth = OAuth2Session(
            client_id=DATA_CLIENT_ID,
            redirect_uri=DATA_REDIRECT_URI,
            scope=googleDataScopes,
        )
        extra_params: Dict[str, Any] = {
            "access_type": "offline",
            "state": state_jwt,
        }
        if not reauth:
            extra_params["include_granted_scopes"] = "true"

        # NOTE(review): nesting reconstructed from flattened source — a known
        # account gets prompt=consent, otherwise the account chooser is shown
        # as well. Confirm against the original layout.
        login_hint = connection.externalEmail or connection.externalUsername
        if login_hint:
            extra_params["login_hint"] = login_hint
            if "@" in login_hint:
                extra_params["hd"] = login_hint.split("@", 1)[1]
            extra_params["prompt"] = "consent"
        else:
            extra_params["prompt"] = "consent select_account"

        auth_url, _ = oauth.authorization_url(
            "https://accounts.google.com/o/oauth2/auth", **extra_params
        )
        return RedirectResponse(auth_url)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error initiating Google connect: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to initiate Google connect: {str(e)}",
        )


@router.get("/auth/connect/callback")
async def auth_connect_callback(
    code: str, state: str, request: Request, response: Response
) -> HTMLResponse:
    """OAuth callback for Google Data app (UserConnection).

    Validates the state, exchanges ``code`` for Google tokens, verifies the
    access token, loads the Google profile, then marks the referenced
    ``UserConnection`` active and stores a ``DATA_CONNECTION`` token for it.
    If Google returns no refresh token, the most recently created stored
    token's refresh token for this connection is reused.

    Args:
        code: Authorization code returned by Google.
        state: Signed state carrying ``flow``, ``connectionId`` and ``userId``.
        request: Incoming request (unused here, part of the route signature).
        response: Response placeholder (unused here, part of the signature).

    Returns:
        An ``HTMLResponse``: success page, or a failure page with status
        400 (token problems), 404 (user/connection missing) or 500 (update
        failed).

    Raises:
        HTTPException: 400 for an invalid state/flow or missing ids; 500 when
            the Data app is not configured or the user info request fails.
    """
    # Function-scope imports keep this handler self-contained.
    import html
    from fastapi.concurrency import run_in_threadpool

    # NOTE(review): the HTML response bodies in this handler contain no markup
    # or script in this file — confirm the templates were not lost upstream.
    state_data = _parse_oauth_state(state)
    if state_data.get("flow") != _FLOW_CONNECT:
        raise HTTPException(status_code=400, detail=routeApiMsg("Invalid OAuth flow for this callback"))

    connection_id = state_data.get("connectionId")
    user_id = state_data.get("userId")
    if not connection_id or not user_id:
        raise HTTPException(status_code=400, detail=routeApiMsg("Missing connection or user in OAuth state"))

    _require_google_data_config()
    oauth = OAuth2Session(client_id=DATA_CLIENT_ID, redirect_uri=DATA_REDIRECT_URI)
    # fetch_token performs blocking HTTP I/O; run it in the threadpool so the
    # event loop is not stalled while Google answers.
    token_data = await run_in_threadpool(
        oauth.fetch_token,
        "https://oauth2.googleapis.com/token",
        client_secret=DATA_CLIENT_SECRET,
        code=code,
        include_client_id=True,
    )

    granted_scopes = token_data.get("scope", "")
    # "or 0" keeps an explicit null from breaking the expiry arithmetic below.
    expires_in = token_data.get("expires_in") or 0
    token_response = {
        "access_token": token_data.get("access_token"),
        "refresh_token": token_data.get("refresh_token", ""),
        "token_type": token_data.get("token_type", "bearer"),
        "expires_in": expires_in,
    }

    if not token_response.get("refresh_token"):
        # No refresh token in this response: fall back to the refresh token of
        # the most recently created stored token for this connection.
        try:
            rootInterface = getRootInterface()
            existing_tokens = rootInterface.getTokensByConnectionIdAndAuthority(
                connection_id, AuthAuthority.GOOGLE
            )
            if existing_tokens:
                existing_tokens.sort(
                    key=lambda x: parseTimestamp(x.createdAt, default=0), reverse=True
                )
                token_response["refresh_token"] = existing_tokens[0].tokenRefresh or ""
        except Exception as lookupErr:
            # Best-effort: continue without a refresh token, but leave a trace.
            logger.warning(
                "Could not reuse stored refresh token for connection %s: %s",
                connection_id,
                lookupErr,
            )

    if not token_response.get("access_token"):
        return HTMLResponse(
            content="\n\nConnection Failed\n\nNo access token.\n\n",
            status_code=400,
        )

    token_verification = await verify_google_token(token_response["access_token"])
    if not token_verification.get("valid"):
        # The error text originates from an HTTP response/exception; escape it
        # before embedding it in an HTML document.
        safe_error = html.escape(str(token_verification.get("error")))
        return HTMLResponse(
            content=f"\n\nConnection Failed\n\n{safe_error}\n\n",
            status_code=400,
        )

    headers = {
        "Authorization": f"Bearer {token_response['access_token']}",
        "Content-Type": "application/json",
    }
    async with httpx.AsyncClient() as client:
        user_info_response = await client.get(
            "https://www.googleapis.com/oauth2/v2/userinfo", headers=headers
        )
    if user_info_response.status_code != 200:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=routeApiMsg("Failed to get user info from Google"),
        )
    user_info = user_info_response.json()

    rootInterface = getRootInterface()
    user = rootInterface.getUser(user_id)
    if not user:
        return HTMLResponse(
            content=f""" """,
            status_code=404,
        )

    interface = getInterface(user)
    connections = interface.getUserConnections(user_id)
    connection = next((conn for conn in connections if conn.id == connection_id), None)
    if not connection:
        return HTMLResponse(
            content=f""" """,
            status_code=404,
        )

    try:
        connection.status = ConnectionStatus.ACTIVE
        connection.lastChecked = getUtcTimestamp()
        connection.expiresAt = getUtcTimestamp() + expires_in
        connection.externalId = user_info.get("id")
        connection.externalUsername = user_info.get("email")
        connection.externalEmail = user_info.get("email")

        # The token response may carry scopes as a list or a space-separated
        # string; when absent, fall back to the scopes that were requested.
        if isinstance(granted_scopes, list):
            granted_scopes_list = granted_scopes
        elif granted_scopes:
            granted_scopes_list = granted_scopes.split(" ")
        else:
            granted_scopes_list = googleDataScopes
        connection.grantedScopes = granted_scopes_list

        rootInterface.db.recordModify(UserConnection, connection_id, connection.model_dump())

        token = Token(
            userId=user.id,
            authority=AuthAuthority.GOOGLE,
            connectionId=connection_id,
            tokenPurpose=TokenPurpose.DATA_CONNECTION,
            tokenAccess=token_response["access_token"],
            tokenRefresh=token_response.get("refresh_token", ""),
            tokenType=token_response.get("token_type", "bearer"),
            expiresAt=createExpirationTimestamp(expires_in),
            createdAt=getUtcTimestamp(),
        )
        interface.saveConnectionToken(token)

        # Notify listeners only when the user enabled knowledge ingestion; a
        # failing callback must not fail the connection itself.
        try:
            from modules.shared.callbackRegistry import callbackRegistry

            if connection.knowledgeIngestionEnabled:
                callbackRegistry.trigger(
                    "connection.established",
                    connectionId=connection.id,
                    authority=str(getattr(connection.authority, "value", connection.authority) or "google"),
                    userId=str(user.id),
                )
            else:
                logger.info(
                    "ingestion.connection.bootstrap.skipped — knowledge ingestion disabled by user",
                    extra={
                        "event": "ingestion.connection.bootstrap.skipped",
                        "connectionId": connection.id,
                        "authority": "google",
                        "reason": "consent_disabled",
                    },
                )
        except Exception as _cbErr:
            logger.warning("connection.established callback failed for %s: %s", connection.id, _cbErr)

        return HTMLResponse(
            content=f""" Connection Successful """
        )
    except Exception as e:
        logger.error(f"Error updating Google connection: {str(e)}", exc_info=True)
        return HTMLResponse(
            content=f""" """,
            status_code=500,
        )


@router.get("/me", response_model=User)
@limiter.limit("30/minute")
def get_current_user(
    request: Request,
    currentUser: User = Depends(getCurrentUser),
) -> User:
    """Return the user resolved by the ``getCurrentUser`` dependency."""
    return currentUser


@router.post("/logout")
@limiter.limit("10/minute")
def logout(
    request: Request,
    currentUser: User = Depends(getCurrentUser),
) -> JSONResponse:
    """
    End only the PowerOn gateway session. Does not revoke the Google account
    session in the browser.

    The session token is read from the ``auth_token`` cookie or, failing that,
    from a ``Bearer`` Authorization header. Tokens are revoked by session id
    when the JWT carries one, otherwise by ``jti``. Both auth cookies are
    cleared on the response.

    Raises:
        HTTPException: 400 when no token is supplied or it cannot be decoded;
            500 on any other failure.
    """
    try:
        appInterface = getInterface(currentUser)

        token = request.cookies.get("auth_token")
        if not token:
            auth_header = request.headers.get("Authorization")
            if auth_header and auth_header.lower().startswith("bearer "):
                token = auth_header.split(" ", 1)[1].strip()
        if not token:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=routeApiMsg("No token found"),
            )

        try:
            payload = jose_jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            session_id = payload.get("sid") or payload.get("sessionId")
            jti = payload.get("jti")
        except Exception as e:
            logger.error(f"Failed to decode JWT on Google logout: {str(e)}")
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=routeApiMsg("Invalid token"),
            )

        revoked = 0
        if session_id:
            revoked = appInterface.revokeTokensBySessionId(
                session_id,
                currentUser.id,
                AuthAuthority.GOOGLE,
                revokedBy=currentUser.id,
                reason="logout",
            )
        elif jti:
            appInterface.revokeTokenById(jti, revokedBy=currentUser.id, reason="logout")
            revoked = 1

        # Audit logging is best-effort and must never block the logout.
        try:
            from modules.dbHelpers.auditLogger import audit_logger

            audit_logger.logUserAccess(
                userId=str(currentUser.id),
                mandateId="system",
                action="logout",
                successInfo=f"google_gateway_logout revoked={revoked}",
                ipAddress=request.client.host if request.client else None,
                userAgent=request.headers.get("user-agent"),
                success=True,
            )
        except Exception as auditErr:
            logger.warning("Audit logging failed on Google logout: %s", auditErr)

        json_response = JSONResponse(
            {
                "message": "Successfully logged out from application (Google account stays signed in elsewhere)",
                "revokedTokens": revoked,
            }
        )
        clearAccessTokenCookie(json_response)
        clearRefreshTokenCookie(json_response)
        return json_response
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error during logout: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to logout: {str(e)}",
        )


@router.post("/refresh")
@limiter.limit("10/minute")
async def refresh_token(
    request: Request,
    currentUser: User = Depends(getCurrentUser),
) -> Dict[str, Any]:
    """Obtain a fresh Google token for one of the user's Google connections.

    An optional JSON body ``{"connectionId": ...}`` selects the connection;
    without it the first Google connection of the user is used. The token is
    obtained through ``TokenManager().getFreshToken`` and the connection
    record is updated (expiry, last-checked time, ACTIVE status).

    Returns:
        ``{"message", "expires_at", "expires_in_seconds"}``.

    Raises:
        HTTPException: 404 when no matching connection or token exists; 500
            on any other failure.
    """
    try:
        appInterface = getInterface(currentUser)

        # The body is optional; anything unparsable is treated as "no body".
        try:
            payload = await request.json()
        except Exception:
            payload = {}
        requested_connection_id = (
            payload.get("connectionId") if isinstance(payload, dict) else None
        )

        connections = appInterface.getUserConnections(currentUser.id)
        google_connections = (
            conn for conn in connections if conn.authority == AuthAuthority.GOOGLE
        )
        if requested_connection_id:
            google_connection = next(
                (conn for conn in google_connections if conn.id == requested_connection_id),
                None,
            )
            if not google_connection:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=routeApiMsg("Requested Google connection not found for current user"),
                )
        else:
            google_connection = next(google_connections, None)
            if not google_connection:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=routeApiMsg("No Google connection found for current user"),
                )

        current_token = TokenManager().getFreshToken(google_connection.id)
        if not current_token:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=routeApiMsg("No Google token found for this connection"),
            )

        # Parse the expiry once; it feeds both the connection record and the
        # response payload.
        expiresAt = parseTimestamp(current_token.expiresAt)
        if expiresAt:
            google_connection.expiresAt = expiresAt
        google_connection.lastChecked = getUtcTimestamp()
        google_connection.status = ConnectionStatus.ACTIVE
        appInterface.db.recordModify(
            UserConnection, google_connection.id, google_connection.model_dump()
        )

        currentTime = getUtcTimestamp()
        expiresIn = int(expiresAt - currentTime) if expiresAt else 0
        return {
            "message": "Token refreshed successfully",
            "expires_at": expiresAt,
            "expires_in_seconds": expiresIn,
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error refreshing Google token: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to refresh token: {str(e)}",
        )