Merge pull request #11 from valueonag/fix/preprocessing-limits
feat: Add batching and keyset pagination to Power BI data fetching us…
This commit is contained in:
commit
f5f8dfcb80
4 changed files with 114 additions and 5 deletions
|
|
@ -15,6 +15,8 @@ class PowerBIReader:
|
||||||
include_nulls: bool = True
|
include_nulls: bool = True
|
||||||
measures: list[str] = None
|
measures: list[str] = None
|
||||||
group_by_columns: list[str] = None
|
group_by_columns: list[str] = None
|
||||||
|
batch_size: int = 10000
|
||||||
|
order_by_column: str | None = None
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def create(
|
async def create(
|
||||||
|
|
@ -25,14 +27,18 @@ class PowerBIReader:
|
||||||
table_name: str,
|
table_name: str,
|
||||||
measures: list[str] = None,
|
measures: list[str] = None,
|
||||||
group_by_columns: list[str] = None,
|
group_by_columns: list[str] = None,
|
||||||
|
batch_size: int = 10000,
|
||||||
|
order_by_column: str | None = None,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
):
|
) -> "PowerBIReader":
|
||||||
return cls(
|
return cls(
|
||||||
dataset_id=dataset_id,
|
dataset_id=dataset_id,
|
||||||
access_token=access_token,
|
access_token=access_token,
|
||||||
table_name=table_name,
|
table_name=table_name,
|
||||||
measures=measures or [],
|
measures=measures or [],
|
||||||
group_by_columns=group_by_columns or [],
|
group_by_columns=group_by_columns or [],
|
||||||
|
batch_size=batch_size,
|
||||||
|
order_by_column=order_by_column,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -77,14 +83,54 @@ class PowerBIReader:
|
||||||
)
|
)
|
||||||
return f"EVALUATE SUMMARIZECOLUMNS({group_cols}, {measure_clauses})"
|
return f"EVALUATE SUMMARIZECOLUMNS({group_cols}, {measure_clauses})"
|
||||||
|
|
||||||
async def read_data(self) -> pd.DataFrame:
|
def _dax_query_batch(self, last_value: str | int | None = None) -> str:
|
||||||
|
"""Generate a batched DAX query using TOPN and keyset pagination.
|
||||||
|
|
||||||
|
Uses ORDER BY with the order_by_column for deterministic ordering,
|
||||||
|
and FILTER to skip already-fetched rows based on the last seen value.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
last_value: The last value of order_by_column from the previous batch.
|
||||||
|
None for the first batch.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
DAX query string for fetching the next batch.
|
||||||
"""
|
"""
|
||||||
Calls Power BI REST API: POST /datasets/{datasetId}/executeQueries
|
safe_table = self.table_name.replace("'", "''")
|
||||||
with DAX: EVALUATE 'TableName' and returns a DataFrame.
|
order_col = self.order_by_column
|
||||||
|
|
||||||
|
if last_value is None:
|
||||||
|
# First batch: just use TOPN with ORDER BY
|
||||||
|
return (
|
||||||
|
f"EVALUATE TOPN({self.batch_size}, '{safe_table}', "
|
||||||
|
f"'{safe_table}'[{order_col}], ASC)"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Subsequent batches: filter rows where order_col > last_value
|
||||||
|
# Handle string vs numeric values
|
||||||
|
if isinstance(last_value, str):
|
||||||
|
filter_value = f'"{last_value}"'
|
||||||
|
else:
|
||||||
|
filter_value = str(last_value)
|
||||||
|
|
||||||
|
return (
|
||||||
|
f"EVALUATE TOPN({self.batch_size}, "
|
||||||
|
f"FILTER('{safe_table}', '{safe_table}'[{order_col}] > {filter_value}), "
|
||||||
|
f"'{safe_table}'[{order_col}], ASC)"
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _execute_query(self, dax_query: str) -> pd.DataFrame:
|
||||||
|
"""Execute a DAX query and return the results as a DataFrame.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
dax_query: The DAX query string to execute.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
DataFrame containing the query results.
|
||||||
"""
|
"""
|
||||||
url = f"{self.base_url}/datasets/{self.dataset_id}/executeQueries"
|
url = f"{self.base_url}/datasets/{self.dataset_id}/executeQueries"
|
||||||
body = {
|
body = {
|
||||||
"queries": [{"query": self._dax_query()}],
|
"queries": [{"query": dax_query}],
|
||||||
"serializerSettings": {"includeNulls": self.include_nulls},
|
"serializerSettings": {"includeNulls": self.include_nulls},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -121,6 +167,51 @@ class PowerBIReader:
|
||||||
df.columns = [_strip_qual(c) for c in df.columns]
|
df.columns = [_strip_qual(c) for c in df.columns]
|
||||||
return df
|
return df
|
||||||
|
|
||||||
|
async def read_data(self) -> pd.DataFrame:
    """Fetch data from Power BI, batching when order_by_column is set.

    Without ``order_by_column`` the whole result is fetched with the single
    query built by ``_dax_query()`` (legacy behavior). With it, batches are
    requested through ``_dax_query_batch()`` until the source is exhausted,
    passing the largest key of each batch as the cursor for the next one.

    Returns:
        DataFrame with all fetched rows concatenated (fresh 0..n-1 index),
        or an empty DataFrame when the batched path returns no rows.

    Raises:
        KeyError: If a batch result does not contain ``order_by_column``.
    """
    # Legacy behavior: no batching if order_by_column is not set.
    if not self.order_by_column:
        return await self._execute_query(self._dax_query())

    key_column = self.order_by_column
    batches: list[pd.DataFrame] = []
    last_value = None

    while True:
        df = await self._execute_query(self._dax_query_batch(last_value))
        if df.empty:
            # No more data to fetch.
            break

        if key_column not in df.columns:
            raise KeyError(
                f"order_by_column {key_column!r} not found in result columns "
                f"{list(df.columns)}"
            )

        batches.append(df)

        # A batch smaller than the requested size means the source is
        # exhausted, so the extra round trip for an empty batch is skipped.
        if len(df) < self.batch_size:
            break

        # The cursor must be the largest key in the batch. The row order of
        # the response is not relied on, since TOPN itself does not promise
        # sorted output; null keys are ignored.
        keys = df[key_column].dropna()
        if keys.empty:
            # Only null keys came back: there is nothing to paginate on.
            break
        new_last_value = keys.max()

        # Safety check: a cursor that does not advance would loop forever.
        if new_last_value == last_value:
            break
        last_value = new_last_value

    if not batches:
        return pd.DataFrame()
    return pd.concat(batches, ignore_index=True)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_access_token_sync(
|
def _get_access_token_sync(
|
||||||
tenant_id: str,
|
tenant_id: str,
|
||||||
|
|
|
||||||
|
|
@ -149,6 +149,8 @@ class TableConfig:
|
||||||
powerbi_table_name: str
|
powerbi_table_name: str
|
||||||
measures: List[str] = field(default_factory=list)
|
measures: List[str] = field(default_factory=list)
|
||||||
group_by_columns: List[str] = field(default_factory=list)
|
group_by_columns: List[str] = field(default_factory=list)
|
||||||
|
batch_size: int = 10000
|
||||||
|
order_by_column: str | None = None
|
||||||
steps: List[Dict[str, Any]] = field(default_factory=list)
|
steps: List[Dict[str, Any]] = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -259,6 +261,8 @@ class Preprocessor:
|
||||||
powerbi_table_name=table_data.get("powerbi_table_name", ""),
|
powerbi_table_name=table_data.get("powerbi_table_name", ""),
|
||||||
measures=table_data.get("measures", []),
|
measures=table_data.get("measures", []),
|
||||||
group_by_columns=table_data.get("group_by_columns", []),
|
group_by_columns=table_data.get("group_by_columns", []),
|
||||||
|
batch_size=table_data.get("batch_size", 10000),
|
||||||
|
order_by_column=table_data.get("order_by_column"),
|
||||||
steps=table_data.get("steps", []),
|
steps=table_data.get("steps", []),
|
||||||
)
|
)
|
||||||
table_configs.append(table_config)
|
table_configs.append(table_config)
|
||||||
|
|
|
||||||
|
|
@ -41,6 +41,16 @@ class TableConfigSchema(BaseModel):
|
||||||
description="Columns to group by when retrieving measures (triggers SUMMARIZECOLUMNS)",
|
description="Columns to group by when retrieving measures (triggers SUMMARIZECOLUMNS)",
|
||||||
example=["m_Artikel"],
|
example=["m_Artikel"],
|
||||||
)
|
)
|
||||||
|
batch_size: int = Field(
|
||||||
|
default=10000,
|
||||||
|
description="Number of rows to fetch per batch (for large tables)",
|
||||||
|
example=10000,
|
||||||
|
)
|
||||||
|
order_by_column: str | None = Field(
|
||||||
|
default=None,
|
||||||
|
description="Column to order by for batch fetching. Required for batching to work.",
|
||||||
|
example="I_ID",
|
||||||
|
)
|
||||||
steps: List[Dict[str, Any]] = Field(
|
steps: List[Dict[str, Any]] = Field(
|
||||||
default_factory=list,
|
default_factory=list,
|
||||||
description="List of preprocessing steps to apply",
|
description="List of preprocessing steps to apply",
|
||||||
|
|
|
||||||
|
|
@ -56,6 +56,8 @@ class DataProcessorService:
|
||||||
table_name=table_config.powerbi_table_name,
|
table_name=table_config.powerbi_table_name,
|
||||||
measures=table_config.measures,
|
measures=table_config.measures,
|
||||||
group_by_columns=table_config.group_by_columns,
|
group_by_columns=table_config.group_by_columns,
|
||||||
|
batch_size=getattr(table_config, 'batch_size', 10000),
|
||||||
|
order_by_column=getattr(table_config, 'order_by_column', None),
|
||||||
)
|
)
|
||||||
|
|
||||||
# Step 2: Read data from Power BI
|
# Step 2: Read data from Power BI
|
||||||
|
|
@ -151,6 +153,8 @@ class DataProcessorService:
|
||||||
table_name=table_config.powerbi_table_name,
|
table_name=table_config.powerbi_table_name,
|
||||||
measures=table_config.measures,
|
measures=table_config.measures,
|
||||||
group_by_columns=table_config.group_by_columns,
|
group_by_columns=table_config.group_by_columns,
|
||||||
|
batch_size=table_config.batch_size,
|
||||||
|
order_by_column=table_config.order_by_column,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Step 2: Read data from Power BI
|
# Step 2: Read data from Power BI
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue