# Listing metadata from the original code viewer: Python, 121 lines, 3.8 KiB.
import functools
from dataclasses import dataclass, field

import anyio  # comes with FastAPI via Starlette/AnyIO
import httpx
import pandas as pd
from msal import ConfidentialClientApplication, SerializableTokenCache

from src.settings import settings


@dataclass
class PowerBIReader:
    """Read one table of a Power BI dataset into a pandas DataFrame.

    The table is fetched with the REST ``executeQueries`` endpoint using the
    DAX query ``EVALUATE '<table_name>'``. Helper static methods obtain a
    bearer token through MSAL's client-credentials flow.
    """

    # Identifier interpolated into the ``/datasets/{id}/executeQueries`` URL.
    dataset_id: str
    # Bearer token sent in the Authorization header. Excluded from the
    # generated ``repr`` so the secret cannot leak into logs or tracebacks.
    access_token: str = field(repr=False)
    # Table to evaluate; single quotes are escaped when the query is built.
    table_name: str
    # Prefix of the request URL.
    base_url: str = settings.POWERBI_BASE_URL
    # Sent as ``serializerSettings.includeNulls`` in the request body.
    include_nulls: bool = True

    @classmethod
    async def create(
        cls, dataset_id: str, access_token: str, table_name: str, **kwargs
    ) -> "PowerBIReader":
        """Build a reader, forwarding extra keyword arguments to the constructor.

        Declared ``async`` so callers ``await`` it; no I/O is performed here.
        """
        return cls(
            dataset_id=dataset_id,
            access_token=access_token,
            table_name=table_name,
            **kwargs,
        )

    def _dax_query(self) -> str:
        """Return the DAX statement that evaluates the whole table."""
        # Escape single quotes in table names per DAX rules
        safe = self.table_name.replace("'", "''")
        return f"EVALUATE '{safe}'"

    @staticmethod
    def _strip_qualifier(col: str) -> str:
        """Return ``Column`` for a ``Table[Column]`` name; other names pass through."""
        if "[" in col and col.endswith("]"):
            return col.split("[", 1)[1][:-1]
        return col

    async def read_data(self) -> pd.DataFrame:
        """
        Calls Power BI REST API: POST /datasets/{datasetId}/executeQueries
        with DAX: EVALUATE 'TableName' and returns a DataFrame.

        Returns:
            The rows of the first table of the first result, with
            ``Table[Column]`` headers reduced to ``Column``. An empty
            DataFrame when the table has no rows.

        Raises:
            RuntimeError: If the HTTP status is not 200, or the JSON payload
                does not have the ``results[0].tables[0].rows`` structure.
        """
        url = f"{self.base_url}/datasets/{self.dataset_id}/executeQueries"
        body = {
            "queries": [{"query": self._dax_query()}],
            "serializerSettings": {"includeNulls": self.include_nulls},
        }
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json",
        }

        async with httpx.AsyncClient(timeout=60) as client:
            resp = await client.post(url, headers=headers, json=body)

        if resp.status_code != 200:
            raise RuntimeError(
                f"Power BI executeQueries failed: {resp.status_code} - {resp.text}"
            )

        payload = resp.json()
        try:
            rows = payload["results"][0]["tables"][0]["rows"]
        except (KeyError, IndexError, TypeError) as e:
            # TypeError covers a JSON null or a wrongly typed node (e.g. a list
            # where an object is expected), which would otherwise escape as a
            # bare TypeError instead of the documented RuntimeError.
            raise RuntimeError("Unexpected executeQueries response structure") from e

        if not rows:
            return pd.DataFrame()

        df = pd.DataFrame(rows)
        # Columns often come back as "Table[Column]". Strip the qualifier.
        df.columns = [self._strip_qualifier(c) for c in df.columns]
        return df

    @staticmethod
    def _get_access_token_sync(
        tenant_id: str,
        client_id: str,
        client_secret: str,
        *,
        authority_base: str = settings.POWERBI_AUTHORITY_BASE,
        cache: SerializableTokenCache | None = None,
    ) -> str:
        """Acquire a Power BI access token with the client-credentials flow.

        This call blocks; use ``_get_access_token_async`` from async code.

        Args:
            tenant_id: Appended to ``authority_base`` to form the authority URL.
            client_id: Application (client) id passed to MSAL.
            client_secret: Client credential passed to MSAL.
            authority_base: Authority URL prefix.
            cache: Optional token cache handed to MSAL so tokens can be reused.

        Returns:
            The ``access_token`` value of the MSAL result.

        Raises:
            RuntimeError: If the MSAL result carries no ``access_token``.
        """
        scopes = ["https://analysis.windows.net/powerbi/api/.default"]

        app = ConfidentialClientApplication(
            client_id=client_id,
            authority=f"{authority_base}/{tenant_id}",
            client_credential=client_secret,
            token_cache=cache,  # pass a SerializableTokenCache to reuse tokens
        )

        # Try cache first; fall back to client credentials.
        # NOTE(review): newer MSAL releases reportedly treat account=None as a
        # no-op and let acquire_token_for_client consult the cache itself; the
        # silent call is kept so older releases still reuse cached tokens --
        # confirm against the pinned msal version.
        result = app.acquire_token_silent(
            scopes, account=None
        ) or app.acquire_token_for_client(scopes=scopes)

        if "access_token" not in result:
            raise RuntimeError(
                f"MSAL token error: {result.get('error')} - {result.get('error_description')}"
            )
        return result["access_token"]

    @staticmethod
    async def _get_access_token_async(
        tenant_id: str,
        client_id: str,
        client_secret: str,
        **kwargs,
    ) -> str:
        """Run ``_get_access_token_sync`` in a worker thread and await its token.

        ``kwargs`` (``authority_base``, ``cache``) are forwarded to the
        synchronous helper.
        """
        # anyio.to_thread.run_sync only passes positional arguments to the
        # target and reserves keyword arguments for its own options, so the
        # helper's keyword-only parameters must be bound with partial first.
        bound = functools.partial(
            PowerBIReader._get_access_token_sync,
            tenant_id,
            client_id,
            client_secret,
            **kwargs,
        )
        # Offload the blocking MSAL HTTP call to a worker thread
        return await anyio.to_thread.run_sync(bound)