feat: Implement max_batches safety limit for Power BI data fetching and refine DAX batch query type handling for numeric and string values.
This commit is contained in:
parent
af6150e2fb
commit
c2f11b92fd
4 changed files with 52 additions and 5 deletions
|
|
@ -17,6 +17,7 @@ class PowerBIReader:
|
|||
group_by_columns: list[str] = None
|
||||
batch_size: int = 10000
|
||||
order_by_column: str | None = None
|
||||
max_batches: int = 100
|
||||
|
||||
@classmethod
|
||||
async def create(
|
||||
|
|
@ -29,6 +30,7 @@ class PowerBIReader:
|
|||
group_by_columns: list[str] = None,
|
||||
batch_size: int = 10000,
|
||||
order_by_column: str | None = None,
|
||||
max_batches: int = 100,
|
||||
**kwargs,
|
||||
) -> "PowerBIReader":
|
||||
return cls(
|
||||
|
|
@ -39,6 +41,7 @@ class PowerBIReader:
|
|||
group_by_columns=group_by_columns or [],
|
||||
batch_size=batch_size,
|
||||
order_by_column=order_by_column,
|
||||
max_batches=max_batches,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
|
@ -83,7 +86,29 @@ class PowerBIReader:
|
|||
)
|
||||
return f"EVALUATE SUMMARIZECOLUMNS({group_cols}, {measure_clauses})"
|
||||
|
||||
def _dax_query_batch(self, last_value: str | int | None = None) -> str:
|
||||
@staticmethod
|
||||
def _is_numeric_value(value: str | int | float | None) -> bool:
|
||||
"""Check if a value is numeric (including numeric strings).
|
||||
|
||||
Args:
|
||||
value: The value to check.
|
||||
|
||||
Returns:
|
||||
True if the value is numeric or a string that represents a number.
|
||||
"""
|
||||
if value is None:
|
||||
return False
|
||||
if isinstance(value, (int, float)):
|
||||
return True
|
||||
if isinstance(value, str):
|
||||
try:
|
||||
float(value)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
return False
|
||||
|
||||
def _dax_query_batch(self, last_value: str | int | float | None = None) -> str:
|
||||
"""Generate a batched DAX query using TOPN and keyset pagination.
|
||||
|
||||
Uses ORDER BY with the order_by_column for deterministic ordering,
|
||||
|
|
@ -107,11 +132,14 @@ class PowerBIReader:
|
|||
)
|
||||
|
||||
# Subsequent batches: filter rows where order_col > last_value
|
||||
# Handle string vs numeric values
|
||||
if isinstance(last_value, str):
|
||||
filter_value = f'"{last_value}"'
|
||||
# IMPORTANT: Detect if value is numeric (even if stored as string)
|
||||
# to ensure proper numeric comparison in DAX, not string comparison
|
||||
if self._is_numeric_value(last_value):
|
||||
# Use numeric literal (no quotes) for proper numeric comparison
|
||||
filter_value = str(float(last_value) if '.' in str(last_value) else int(last_value))
|
||||
else:
|
||||
filter_value = str(last_value)
|
||||
# Genuine string value - quote it for DAX
|
||||
filter_value = f'"{last_value}"'
|
||||
|
||||
return (
|
||||
f"EVALUATE TOPN({self.batch_size}, "
|
||||
|
|
@ -188,6 +216,17 @@ class PowerBIReader:
|
|||
|
||||
while True:
|
||||
batch_num += 1
|
||||
|
||||
# Safety limit to prevent runaway requests
|
||||
if batch_num > self.max_batches:
|
||||
import logging
|
||||
logging.warning(
|
||||
f"Reached max_batches limit ({self.max_batches}) for table "
|
||||
f"'{self.table_name}'. Stopping batch fetch. "
|
||||
f"Total rows fetched so far: {sum(len(df) for df in all_dfs)}"
|
||||
)
|
||||
break
|
||||
|
||||
dax_query = self._dax_query_batch(last_value)
|
||||
df = await self._execute_query(dax_query)
|
||||
|
||||
|
|
|
|||
|
|
@ -151,6 +151,7 @@ class TableConfig:
|
|||
group_by_columns: List[str] = field(default_factory=list)
|
||||
batch_size: int = 10000
|
||||
order_by_column: str | None = None
|
||||
max_batches: int = 100
|
||||
steps: List[Dict[str, Any]] = field(default_factory=list)
|
||||
|
||||
|
||||
|
|
@ -263,6 +264,7 @@ class Preprocessor:
|
|||
group_by_columns=table_data.get("group_by_columns", []),
|
||||
batch_size=table_data.get("batch_size", 10000),
|
||||
order_by_column=table_data.get("order_by_column"),
|
||||
max_batches=table_data.get("max_batches", 100),
|
||||
steps=table_data.get("steps", []),
|
||||
)
|
||||
table_configs.append(table_config)
|
||||
|
|
|
|||
|
|
@ -51,6 +51,11 @@ class TableConfigSchema(BaseModel):
|
|||
description="Column to order by for batch fetching. Required for batching to work.",
|
||||
example="I_ID",
|
||||
)
|
||||
max_batches: int = Field(
|
||||
default=100,
|
||||
description="Maximum number of batches to fetch. Safety limit to prevent runaway requests.",
|
||||
example=100,
|
||||
)
|
||||
steps: List[Dict[str, Any]] = Field(
|
||||
default_factory=list,
|
||||
description="List of preprocessing steps to apply",
|
||||
|
|
|
|||
|
|
@ -155,6 +155,7 @@ class DataProcessorService:
|
|||
group_by_columns=table_config.group_by_columns,
|
||||
batch_size=table_config.batch_size,
|
||||
order_by_column=table_config.order_by_column,
|
||||
max_batches=table_config.max_batches,
|
||||
)
|
||||
|
||||
# Step 2: Read data from Power BI
|
||||
|
|
|
|||
Loading…
Reference in a new issue