From c2f11b92fd89ae5654279b0d92863086d1638804 Mon Sep 17 00:00:00 2001 From: Christopher Gondek Date: Tue, 23 Dec 2025 11:28:55 +0100 Subject: [PATCH] feat: Implement `max_batches` safety limit for Power BI data fetching and refine DAX batch query type handling for numeric and string values. --- src/dataprocessor/domain/powerbi_reader.py | 49 +++++++++++++++++++--- src/dataprocessor/domain/preprocessor.py | 2 + src/dataprocessor/schemas.py | 5 +++ src/dataprocessor/service.py | 1 + 4 files changed, 52 insertions(+), 5 deletions(-) diff --git a/src/dataprocessor/domain/powerbi_reader.py b/src/dataprocessor/domain/powerbi_reader.py index dc4366d..74af88a 100644 --- a/src/dataprocessor/domain/powerbi_reader.py +++ b/src/dataprocessor/domain/powerbi_reader.py @@ -17,6 +17,7 @@ class PowerBIReader: group_by_columns: list[str] = None batch_size: int = 10000 order_by_column: str | None = None + max_batches: int = 100 @classmethod async def create( @@ -29,6 +30,7 @@ class PowerBIReader: group_by_columns: list[str] = None, batch_size: int = 10000, order_by_column: str | None = None, + max_batches: int = 100, **kwargs, ) -> "PowerBIReader": return cls( @@ -39,6 +41,7 @@ class PowerBIReader: group_by_columns=group_by_columns or [], batch_size=batch_size, order_by_column=order_by_column, + max_batches=max_batches, **kwargs, ) @@ -83,7 +86,29 @@ class PowerBIReader: ) return f"EVALUATE SUMMARIZECOLUMNS({group_cols}, {measure_clauses})" - def _dax_query_batch(self, last_value: str | int | None = None) -> str: + @staticmethod + def _is_numeric_value(value: str | int | float | None) -> bool: + """Check if a value is numeric (including numeric strings). + + Args: + value: The value to check. + + Returns: + True if the value is numeric or a string that represents a number. + """ + if value is None: + return False + if isinstance(value, (int, float)): + return True + if isinstance(value, str): + try: + float(value) + return True + except ValueError: + return False + return False + + def _dax_query_batch(self, last_value: str | int | float | None = None) -> str: """Generate a batched DAX query using TOPN and keyset pagination. Uses ORDER BY with the order_by_column for deterministic ordering, @@ -107,11 +132,14 @@ class PowerBIReader: ) # Subsequent batches: filter rows where order_col > last_value - # Handle string vs numeric values - if isinstance(last_value, str): - filter_value = f'"{last_value}"' + # IMPORTANT: Detect if value is numeric (even if stored as string) + # to ensure proper numeric comparison in DAX, not string comparison + if self._is_numeric_value(last_value): + # Use numeric literal (no quotes) for proper numeric comparison + filter_value = str(float(last_value) if '.' in str(last_value) else int(last_value)) else: - filter_value = str(last_value) + # Genuine string value - quote it for DAX + filter_value = f'"{last_value}"' return ( f"EVALUATE TOPN({self.batch_size}, " @@ -188,6 +216,17 @@ class PowerBIReader: while True: batch_num += 1 + + # Safety limit to prevent runaway requests + if batch_num > self.max_batches: + import logging + logging.warning( + f"Reached max_batches limit ({self.max_batches}) for table " + f"'{self.table_name}'. Stopping batch fetch. " + f"Total rows fetched so far: {sum(len(df) for df in all_dfs)}" + ) + break + dax_query = self._dax_query_batch(last_value) df = await self._execute_query(dax_query) diff --git a/src/dataprocessor/domain/preprocessor.py b/src/dataprocessor/domain/preprocessor.py index 93e725c..929721c 100644 --- a/src/dataprocessor/domain/preprocessor.py +++ b/src/dataprocessor/domain/preprocessor.py @@ -151,6 +151,7 @@ class TableConfig: group_by_columns: List[str] = field(default_factory=list) batch_size: int = 10000 order_by_column: str | None = None + max_batches: int = 100 steps: List[Dict[str, Any]] = field(default_factory=list) @@ -263,6 +264,7 @@ class Preprocessor: group_by_columns=table_data.get("group_by_columns", []), batch_size=table_data.get("batch_size", 10000), order_by_column=table_data.get("order_by_column"), + max_batches=table_data.get("max_batches", 100), steps=table_data.get("steps", []), ) table_configs.append(table_config) diff --git a/src/dataprocessor/schemas.py b/src/dataprocessor/schemas.py index 797f1d0..c4bf787 100644 --- a/src/dataprocessor/schemas.py +++ b/src/dataprocessor/schemas.py @@ -51,6 +51,11 @@ class TableConfigSchema(BaseModel): description="Column to order by for batch fetching. Required for batching to work.", example="I_ID", ) + max_batches: int = Field( + default=100, + description="Maximum number of batches to fetch. Safety limit to prevent runaway requests.", + example=100, + ) steps: List[Dict[str, Any]] = Field( default_factory=list, description="List of preprocessing steps to apply", diff --git a/src/dataprocessor/service.py b/src/dataprocessor/service.py index 6a58760..0daeb4b 100644 --- a/src/dataprocessor/service.py +++ b/src/dataprocessor/service.py @@ -155,6 +155,7 @@ class DataProcessorService: group_by_columns=table_config.group_by_columns, batch_size=table_config.batch_size, order_by_column=table_config.order_by_column, + max_batches=table_config.max_batches, ) # Step 2: Read data from Power BI