Merge pull request #12 from valueonag/fix/preprocessing-limits
feat: Implement `max_batches` safety limit for Power BI data fetching…
This commit is contained in:
commit
3a28fbd5e6
4 changed files with 52 additions and 5 deletions
|
|
@ -17,6 +17,7 @@ class PowerBIReader:
|
||||||
group_by_columns: list[str] = None
|
group_by_columns: list[str] = None
|
||||||
batch_size: int = 10000
|
batch_size: int = 10000
|
||||||
order_by_column: str | None = None
|
order_by_column: str | None = None
|
||||||
|
max_batches: int = 100
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def create(
|
async def create(
|
||||||
|
|
@ -29,6 +30,7 @@ class PowerBIReader:
|
||||||
group_by_columns: list[str] = None,
|
group_by_columns: list[str] = None,
|
||||||
batch_size: int = 10000,
|
batch_size: int = 10000,
|
||||||
order_by_column: str | None = None,
|
order_by_column: str | None = None,
|
||||||
|
max_batches: int = 100,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) -> "PowerBIReader":
|
) -> "PowerBIReader":
|
||||||
return cls(
|
return cls(
|
||||||
|
|
@ -39,6 +41,7 @@ class PowerBIReader:
|
||||||
group_by_columns=group_by_columns or [],
|
group_by_columns=group_by_columns or [],
|
||||||
batch_size=batch_size,
|
batch_size=batch_size,
|
||||||
order_by_column=order_by_column,
|
order_by_column=order_by_column,
|
||||||
|
max_batches=max_batches,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -83,7 +86,29 @@ class PowerBIReader:
|
||||||
)
|
)
|
||||||
return f"EVALUATE SUMMARIZECOLUMNS({group_cols}, {measure_clauses})"
|
return f"EVALUATE SUMMARIZECOLUMNS({group_cols}, {measure_clauses})"
|
||||||
|
|
||||||
def _dax_query_batch(self, last_value: str | int | None = None) -> str:
|
@staticmethod
|
||||||
|
def _is_numeric_value(value: str | int | float | None) -> bool:
|
||||||
|
"""Check if a value is numeric (including numeric strings).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
value: The value to check.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if the value is numeric or a string that represents a number.
|
||||||
|
"""
|
||||||
|
if value is None:
|
||||||
|
return False
|
||||||
|
if isinstance(value, (int, float)):
|
||||||
|
return True
|
||||||
|
if isinstance(value, str):
|
||||||
|
try:
|
||||||
|
float(value)
|
||||||
|
return True
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _dax_query_batch(self, last_value: str | int | float | None = None) -> str:
|
||||||
"""Generate a batched DAX query using TOPN and keyset pagination.
|
"""Generate a batched DAX query using TOPN and keyset pagination.
|
||||||
|
|
||||||
Uses ORDER BY with the order_by_column for deterministic ordering,
|
Uses ORDER BY with the order_by_column for deterministic ordering,
|
||||||
|
|
@ -107,11 +132,14 @@ class PowerBIReader:
|
||||||
)
|
)
|
||||||
|
|
||||||
# Subsequent batches: filter rows where order_col > last_value
|
# Subsequent batches: filter rows where order_col > last_value
|
||||||
# Handle string vs numeric values
|
# IMPORTANT: Detect if value is numeric (even if stored as string)
|
||||||
if isinstance(last_value, str):
|
# to ensure proper numeric comparison in DAX, not string comparison
|
||||||
filter_value = f'"{last_value}"'
|
if self._is_numeric_value(last_value):
|
||||||
|
# Use numeric literal (no quotes) for proper numeric comparison
|
||||||
|
filter_value = str(float(last_value) if '.' in str(last_value) else int(last_value))
|
||||||
else:
|
else:
|
||||||
filter_value = str(last_value)
|
# Genuine string value - quote it for DAX
|
||||||
|
filter_value = f'"{last_value}"'
|
||||||
|
|
||||||
return (
|
return (
|
||||||
f"EVALUATE TOPN({self.batch_size}, "
|
f"EVALUATE TOPN({self.batch_size}, "
|
||||||
|
|
@ -188,6 +216,17 @@ class PowerBIReader:
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
batch_num += 1
|
batch_num += 1
|
||||||
|
|
||||||
|
# Safety limit to prevent runaway requests
|
||||||
|
if batch_num > self.max_batches:
|
||||||
|
import logging
|
||||||
|
logging.warning(
|
||||||
|
f"Reached max_batches limit ({self.max_batches}) for table "
|
||||||
|
f"'{self.table_name}'. Stopping batch fetch. "
|
||||||
|
f"Total rows fetched so far: {sum(len(df) for df in all_dfs)}"
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
dax_query = self._dax_query_batch(last_value)
|
dax_query = self._dax_query_batch(last_value)
|
||||||
df = await self._execute_query(dax_query)
|
df = await self._execute_query(dax_query)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -151,6 +151,7 @@ class TableConfig:
|
||||||
group_by_columns: List[str] = field(default_factory=list)
|
group_by_columns: List[str] = field(default_factory=list)
|
||||||
batch_size: int = 10000
|
batch_size: int = 10000
|
||||||
order_by_column: str | None = None
|
order_by_column: str | None = None
|
||||||
|
max_batches: int = 100
|
||||||
steps: List[Dict[str, Any]] = field(default_factory=list)
|
steps: List[Dict[str, Any]] = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -263,6 +264,7 @@ class Preprocessor:
|
||||||
group_by_columns=table_data.get("group_by_columns", []),
|
group_by_columns=table_data.get("group_by_columns", []),
|
||||||
batch_size=table_data.get("batch_size", 10000),
|
batch_size=table_data.get("batch_size", 10000),
|
||||||
order_by_column=table_data.get("order_by_column"),
|
order_by_column=table_data.get("order_by_column"),
|
||||||
|
max_batches=table_data.get("max_batches", 100),
|
||||||
steps=table_data.get("steps", []),
|
steps=table_data.get("steps", []),
|
||||||
)
|
)
|
||||||
table_configs.append(table_config)
|
table_configs.append(table_config)
|
||||||
|
|
|
||||||
|
|
@ -51,6 +51,11 @@ class TableConfigSchema(BaseModel):
|
||||||
description="Column to order by for batch fetching. Required for batching to work.",
|
description="Column to order by for batch fetching. Required for batching to work.",
|
||||||
example="I_ID",
|
example="I_ID",
|
||||||
)
|
)
|
||||||
|
max_batches: int = Field(
|
||||||
|
default=100,
|
||||||
|
description="Maximum number of batches to fetch. Safety limit to prevent runaway requests.",
|
||||||
|
example=100,
|
||||||
|
)
|
||||||
steps: List[Dict[str, Any]] = Field(
|
steps: List[Dict[str, Any]] = Field(
|
||||||
default_factory=list,
|
default_factory=list,
|
||||||
description="List of preprocessing steps to apply",
|
description="List of preprocessing steps to apply",
|
||||||
|
|
|
||||||
|
|
@ -155,6 +155,7 @@ class DataProcessorService:
|
||||||
group_by_columns=table_config.group_by_columns,
|
group_by_columns=table_config.group_by_columns,
|
||||||
batch_size=table_config.batch_size,
|
batch_size=table_config.batch_size,
|
||||||
order_by_column=table_config.order_by_column,
|
order_by_column=table_config.order_by_column,
|
||||||
|
max_batches=table_config.max_batches,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Step 2: Read data from Power BI
|
# Step 2: Read data from Power BI
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue