service-preprocessing/src/dataprocessor/domain/powerbi_reader.py
2025-09-26 17:07:42 +02:00

121 lines
3.8 KiB
Python

from dataclasses import dataclass, field
import functools

import anyio  # comes with FastAPI via Starlette/AnyIO
import httpx
from msal import ConfidentialClientApplication, SerializableTokenCache
import pandas as pd

from src.settings import settings
@dataclass
class PowerBIReader:
    """Read a whole Power BI dataset table into a pandas DataFrame.

    The query is sent to the Power BI REST ``executeQueries`` endpoint as
    DAX ``EVALUATE '<table_name>'``. Static helpers acquire an app-only
    access token via MSAL's client-credentials flow.

    Attributes:
        dataset_id: Dataset id inserted into the request URL.
        access_token: Bearer token for the ``Authorization`` header
            (excluded from ``repr`` so it does not leak into logs).
        table_name: Name of the table to evaluate.
        base_url: REST API root. Defaults to ``settings.POWERBI_BASE_URL``,
            which is read once, when this class is defined.
        include_nulls: Sent as ``serializerSettings.includeNulls``.
        request_timeout: httpx timeout, in seconds, for the query request.
    """

    dataset_id: str
    access_token: str = field(repr=False)
    table_name: str
    base_url: str = settings.POWERBI_BASE_URL
    include_nulls: bool = True
    request_timeout: float = 60.0

    @classmethod
    async def create(
        cls, dataset_id: str, access_token: str, table_name: str, **kwargs
    ) -> "PowerBIReader":
        """Build a reader; extra keyword arguments set the optional fields.

        Declared ``async`` so callers can ``await`` it; it performs no I/O.
        """
        return cls(
            dataset_id=dataset_id,
            access_token=access_token,
            table_name=table_name,
            **kwargs,
        )

    def _dax_query(self) -> str:
        """Return the DAX query ``EVALUATE '<table_name>'``."""
        # Inside a quoted DAX table name, a literal ' is written as ''.
        safe = self.table_name.replace("'", "''")
        return f"EVALUATE '{safe}'"

    @staticmethod
    def _strip_qualifier(col: str) -> str:
        """Map a ``Table[Column]`` result key to ``Column``; pass others through."""
        if "[" in col and col.endswith("]"):
            return col.split("[", 1)[1][:-1]
        return col

    @staticmethod
    def _raise_on_error(node: object) -> None:
        """Raise RuntimeError if *node* is a dict carrying a truthy ``error``."""
        error = node.get("error") if isinstance(node, dict) else None
        if error:
            raise RuntimeError(f"Power BI executeQueries returned an error: {error}")

    @staticmethod
    def _extract_rows(payload: object) -> list:
        """Return ``results[0].tables[0].rows`` from an executeQueries payload.

        Raises:
            RuntimeError: if the payload, the first query result, or the first
                table carries an ``error`` object, or if the payload does not
                have the expected shape.
        """
        try:
            # The service can report failures inside a 200 body; surface them
            # instead of reporting a generic structure mismatch.
            PowerBIReader._raise_on_error(payload)
            result = payload["results"][0]
            PowerBIReader._raise_on_error(result)
            table = result["tables"][0]
            PowerBIReader._raise_on_error(table)
            return table["rows"]
        except (KeyError, IndexError, TypeError) as e:
            raise RuntimeError("Unexpected executeQueries response structure") from e

    async def read_data(self) -> pd.DataFrame:
        """Evaluate ``table_name`` in the dataset and return its rows.

        Calls ``POST {base_url}/datasets/{dataset_id}/executeQueries`` with
        DAX ``EVALUATE 'TableName'``.

        Returns:
            A DataFrame built from the returned row dicts, with ``Table[...]``
            qualifiers stripped from the column names. An empty DataFrame is
            returned when no rows come back.

        Raises:
            RuntimeError: on a non-200 status, an error reported in the
                response body, or an unexpected response shape.
        """
        url = f"{self.base_url}/datasets/{self.dataset_id}/executeQueries"
        body = {
            "queries": [{"query": self._dax_query()}],
            "serializerSettings": {"includeNulls": self.include_nulls},
        }
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json",
        }
        async with httpx.AsyncClient(timeout=self.request_timeout) as client:
            resp = await client.post(url, headers=headers, json=body)
        if resp.status_code != 200:
            raise RuntimeError(
                f"Power BI executeQueries failed: {resp.status_code} - {resp.text}"
            )
        rows = self._extract_rows(resp.json())
        if not rows:
            return pd.DataFrame()
        df = pd.DataFrame(rows)
        # Columns often come back as "Table[Column]"; keep only the column part.
        df.columns = [self._strip_qualifier(c) for c in df.columns]
        return df

    @staticmethod
    def _get_access_token_sync(
        tenant_id: str,
        client_id: str,
        client_secret: str,
        *,
        authority_base: str = settings.POWERBI_AUTHORITY_BASE,
        cache: SerializableTokenCache | None = None,
    ) -> str:
        """Acquire an app-only Power BI access token (client-credentials flow).

        This call blocks: MSAL may make network requests.

        Args:
            tenant_id: Tenant id, appended to ``authority_base``.
            client_id: Application (client) id.
            client_secret: Client secret for the application.
            authority_base: Authority root. Defaults to
                ``settings.POWERBI_AUTHORITY_BASE``.
            cache: Optional MSAL token cache. A new application object is built
                on every call, so tokens are only reused across calls when a
                shared cache is passed in.

        Returns:
            The access token string.

        Raises:
            RuntimeError: if MSAL's result contains no ``access_token``.
        """
        scopes = ["https://analysis.windows.net/powerbi/api/.default"]
        app = ConfidentialClientApplication(
            client_id=client_id,
            authority=f"{authority_base}/{tenant_id}",
            client_credential=client_secret,
            token_cache=cache,
        )
        # Try the cache first, then fall back to a client-credentials request.
        # NOTE(review): per the MSAL Python docs, since 1.23
        # acquire_token_for_client consults the cache itself, and account=None
        # makes acquire_token_silent a no-op. The silent call is kept for
        # older MSAL versions.
        result = app.acquire_token_silent(
            scopes, account=None
        ) or app.acquire_token_for_client(scopes=scopes)
        if "access_token" not in result:
            raise RuntimeError(
                f"MSAL token error: {result.get('error')} - {result.get('error_description')}"
            )
        return result["access_token"]

    @staticmethod
    async def _get_access_token_async(
        tenant_id: str,
        client_id: str,
        client_secret: str,
        **kwargs,
    ) -> str:
        """Async wrapper for ``_get_access_token_sync``.

        Runs the blocking MSAL call in a worker thread. Keyword arguments
        (``authority_base``, ``cache``) are forwarded to the sync helper.
        """
        # anyio.to_thread.run_sync forwards only positional args to the
        # callable; its own keyword parameters are thread options. Bind the
        # caller's kwargs with partial so they reach the sync helper.
        token_fn = functools.partial(PowerBIReader._get_access_token_sync, **kwargs)
        return await anyio.to_thread.run_sync(
            token_fn,
            tenant_id,
            client_id,
            client_secret,
        )