diff --git a/src/dataprocessor/domain/powerbi_reader.py b/src/dataprocessor/domain/powerbi_reader.py index 87393db..dc4366d 100644 --- a/src/dataprocessor/domain/powerbi_reader.py +++ b/src/dataprocessor/domain/powerbi_reader.py @@ -15,6 +15,8 @@ class PowerBIReader: include_nulls: bool = True measures: list[str] = None group_by_columns: list[str] = None + batch_size: int = 10000 + order_by_column: str | None = None @classmethod async def create( @@ -25,14 +27,18 @@ class PowerBIReader: table_name: str, measures: list[str] = None, group_by_columns: list[str] = None, + batch_size: int = 10000, + order_by_column: str | None = None, **kwargs, - ): + ) -> "PowerBIReader": return cls( dataset_id=dataset_id, access_token=access_token, table_name=table_name, measures=measures or [], group_by_columns=group_by_columns or [], + batch_size=batch_size, + order_by_column=order_by_column, **kwargs, ) @@ -77,14 +83,54 @@ class PowerBIReader: ) return f"EVALUATE SUMMARIZECOLUMNS({group_cols}, {measure_clauses})" - async def read_data(self) -> pd.DataFrame: + def _dax_query_batch(self, last_value: str | int | None = None) -> str: + """Generate a batched DAX query using TOPN and keyset pagination. + + Uses ORDER BY with the order_by_column for deterministic ordering, + and FILTER to skip already-fetched rows based on the last seen value. + + Args: + last_value: The last value of order_by_column from the previous batch. + None for the first batch. + + Returns: + DAX query string for fetching the next batch. """ - Calls Power BI REST API: POST /datasets/{datasetId}/executeQueries - with DAX: EVALUATE 'TableName' and returns a DataFrame. + safe_table = self.table_name.replace("'", "''") + order_col = self.order_by_column + + if last_value is None: + # First batch: just use TOPN with ORDER BY + return ( + f"EVALUATE TOPN({self.batch_size}, '{safe_table}', " + f"'{safe_table}'[{order_col}], ASC)" + ) + + # Subsequent batches: filter rows where order_col > last_value + # Handle string vs numeric values + if isinstance(last_value, str): + filter_value = f'"{last_value}"' + else: + filter_value = str(last_value) + + return ( + f"EVALUATE TOPN({self.batch_size}, " + f"FILTER('{safe_table}', '{safe_table}'[{order_col}] > {filter_value}), " + f"'{safe_table}'[{order_col}], ASC)" + ) + + async def _execute_query(self, dax_query: str) -> pd.DataFrame: + """Execute a DAX query and return the results as a DataFrame. + + Args: + dax_query: The DAX query string to execute. + + Returns: + DataFrame containing the query results. """ url = f"{self.base_url}/datasets/{self.dataset_id}/executeQueries" body = { - "queries": [{"query": self._dax_query()}], + "queries": [{"query": dax_query}], "serializerSettings": {"includeNulls": self.include_nulls}, } @@ -121,6 +167,51 @@ class PowerBIReader: df.columns = [_strip_qual(c) for c in df.columns] return df + async def read_data(self) -> pd.DataFrame: + """Fetch data from Power BI, using batching if order_by_column is set. + + If order_by_column is configured, fetches data in batches using + keyset pagination to avoid the Power BI API's 1M value limit. + Otherwise, fetches all data in a single query (legacy behavior). + + Returns: + DataFrame containing all fetched data. + """ + # Legacy behavior: no batching if order_by_column not set + if not self.order_by_column: + return await self._execute_query(self._dax_query()) + + # Batch fetching with keyset pagination + all_dfs: list[pd.DataFrame] = [] + last_value: str | int | None = None + batch_num = 0 + + while True: + batch_num += 1 + dax_query = self._dax_query_batch(last_value) + df = await self._execute_query(dax_query) + + if df.empty: + # No more data to fetch + break + + all_dfs.append(df) + + # Get the last value for the next batch + new_last_value = df[self.order_by_column].iloc[-1] + + # Safety check: if last_value didn't change, we're stuck in a loop + if new_last_value == last_value: + break + + last_value = new_last_value + + if not all_dfs: + return pd.DataFrame() + + result = pd.concat(all_dfs, ignore_index=True) + return result + @staticmethod def _get_access_token_sync( tenant_id: str, diff --git a/src/dataprocessor/domain/preprocessor.py b/src/dataprocessor/domain/preprocessor.py index f67ac65..93e725c 100644 --- a/src/dataprocessor/domain/preprocessor.py +++ b/src/dataprocessor/domain/preprocessor.py @@ -149,6 +149,8 @@ class TableConfig: powerbi_table_name: str measures: List[str] = field(default_factory=list) group_by_columns: List[str] = field(default_factory=list) + batch_size: int = 10000 + order_by_column: str | None = None steps: List[Dict[str, Any]] = field(default_factory=list) @@ -259,6 +261,8 @@ class Preprocessor: powerbi_table_name=table_data.get("powerbi_table_name", ""), measures=table_data.get("measures", []), group_by_columns=table_data.get("group_by_columns", []), + batch_size=table_data.get("batch_size", 10000), + order_by_column=table_data.get("order_by_column"), steps=table_data.get("steps", []), ) table_configs.append(table_config) diff --git a/src/dataprocessor/schemas.py b/src/dataprocessor/schemas.py index 376e692..797f1d0 100644 --- a/src/dataprocessor/schemas.py +++ b/src/dataprocessor/schemas.py @@ -41,6 +41,16 @@ class TableConfigSchema(BaseModel): description="Columns to group by when retrieving measures (triggers SUMMARIZECOLUMNS)", example=["m_Artikel"], ) + batch_size: int = Field( + default=10000, + description="Number of rows to fetch per batch (for large tables)", + example=10000, + ) + order_by_column: str | None = Field( + default=None, + description="Column to order by for batch fetching. Required for batching to work.", + example="I_ID", + ) steps: List[Dict[str, Any]] = Field( default_factory=list, description="List of preprocessing steps to apply", diff --git a/src/dataprocessor/service.py b/src/dataprocessor/service.py index 998315e..6a58760 100644 --- a/src/dataprocessor/service.py +++ b/src/dataprocessor/service.py @@ -56,6 +56,8 @@ class DataProcessorService: table_name=table_config.powerbi_table_name, measures=table_config.measures, group_by_columns=table_config.group_by_columns, + batch_size=getattr(table_config, 'batch_size', 10000), + order_by_column=getattr(table_config, 'order_by_column', None), ) # Step 2: Read data from Power BI @@ -151,6 +153,8 @@ class DataProcessorService: table_name=table_config.powerbi_table_name, measures=table_config.measures, group_by_columns=table_config.group_by_columns, + batch_size=table_config.batch_size, + order_by_column=table_config.order_by_column, ) # Step 2: Read data from Power BI