From 9bca05dea24c1ae50e94eb339ce0bc68cc8e8f5f Mon Sep 17 00:00:00 2001 From: Ida Dittrich Date: Tue, 23 Dec 2025 13:04:46 +0100 Subject: [PATCH] finished preprocessing batching --- README.md | 667 +++++++++++++++++++++ src/dataprocessor/domain/powerbi_reader.py | 113 +++- 2 files changed, 760 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index e69de29..66c1a21 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,667 @@ +# Gateway Preprocessing - Datenschnittstellen (Datenmodell) + +## Übersicht + +Dieses Projekt bietet zwei Hauptservices mit klar definierten Datenschnittstellen: + +1. **Data Processor Service** (`/dataprocessor`) - Lädt Daten aus Power BI, verarbeitet sie und speichert sie in SQLite +2. **Data Query Service** (`/dataquery`) - Ermöglicht SQL-Abfragen auf die gespeicherten Daten + +```mermaid +graph LR + A[Power BI Dataset] -->|Data Processor| B[SQLite Database] + B -->|Data Query| C[Client/API Consumer] + + style A fill:#f9f,stroke:#333,stroke-width:2px + style B fill:#bbf,stroke:#333,stroke-width:2px + style C fill:#bfb,stroke:#333,stroke-width:2px +``` + +--- + +## Data Processor Service + +Der Data Processor Service verarbeitet Daten aus Power BI und speichert sie in einer lokalen SQLite-Datenbank. Die Schemas befinden sich in `src/dataprocessor/schemas.py`. + +### Datenmodell-Hierarchie + +```mermaid +classDiagram + class PreprocessingConfigSchema { + +List~TableConfigSchema~ tables + } + + class TableConfigSchema { + +str name + +str powerbi_table_name + +List~str~ measures + +List~str~ group_by_columns + +List~Dict~ steps + } + + class UpdateDbWithConfigResponse { + +bool success + +int tables_processed + +List~str~ warnings + } + + class UpdateDbResponse { + +bool success + } + + PreprocessingConfigSchema "1" *-- "1..*" TableConfigSchema : enthält +``` + +### 1. TableConfigSchema + +Definiert die Konfiguration für eine einzelne Tabelle, die aus Power BI gelesen und verarbeitet werden soll. + +**Felder:** + +| Feld | Typ | Beschreibung | Beispiel | +|------|-----|--------------|----------| +| `name` | `str` | Name der Tabelle in der lokalen SQLite-Datenbank | `"ProductData"` | +| `powerbi_table_name` | `str` | Name der Quelltabelle im Power BI Dataset | `"products_raw"` | +| `measures` | `List[str]` | Liste von Power BI Measures, die abgerufen werden sollen (optional) | `["EP in CHF", "Gesamtbetrag"]` | +| `group_by_columns` | `List[str]` | Spalten für Gruppierung bei Measures (triggert SUMMARIZECOLUMNS) | `["m_Artikel", "Kategorie"]` | +| `steps` | `List[Dict[str, Any]]` | Liste von Preprocessing-Schritten | siehe unten | + +**Unterstützte Preprocessing-Schritte:** + +```mermaid +graph TD + A[Preprocessing Steps] --> B[keep] + A --> C[fillna] + A --> D[to_numeric] + A --> E[dropna] + + B --> B1["columns: List[str]
Behält nur angegebene Spalten"] + C --> C1["column: str, value: Any
Füllt fehlende Werte"] + D --> D1["column: str, errors: str
Konvertiert zu numerisch"] + E --> E1["subset: List[str]
Entfernt Zeilen mit NaN"] + + style A fill:#f96,stroke:#333,stroke-width:3px + style B fill:#9cf,stroke:#333,stroke-width:2px + style C fill:#9cf,stroke:#333,stroke-width:2px + style D fill:#9cf,stroke:#333,stroke-width:2px + style E fill:#9cf,stroke:#333,stroke-width:2px +``` + +**Beispiel:** + +```json +{ + "name": "ProductData", + "powerbi_table_name": "products_raw", + "measures": ["Total Sales", "Average Price"], + "group_by_columns": ["Category", "Supplier"], + "steps": [ + { + "keep": { + "columns": ["ProductID", "ProductName", "Supplier", "Stock", "Unit", "Price"] + } + }, + { + "fillna": { + "column": "Supplier", + "value": "Unknown" + } + }, + { + "to_numeric": { + "column": "Price", + "errors": "coerce" + } + }, + { + "dropna": { + "subset": ["ProductID", "ProductName", "Price"] + } + } + ] +} +``` + +### 2. PreprocessingConfigSchema + +Komplette Konfiguration für einen Preprocessing-Request. Kann mehrere Tabellen gleichzeitig verarbeiten. + +**Felder:** + +| Feld | Typ | Beschreibung | Validierung | +|------|-----|--------------|-------------| +| `tables` | `List[TableConfigSchema]` | Liste von Tabellenkonfigurationen | min. 1 Tabelle | + +**Vollständiges Request-Beispiel:** + +```json +{ + "tables": [ + { + "name": "ProductData", + "powerbi_table_name": "products_raw", + "steps": [ + { + "keep": { + "columns": ["ProductID", "ProductName", "Supplier", "Stock", "Unit", "Price"] + } + }, + { + "fillna": { + "column": "Supplier", + "value": "Unknown" + } + }, + { + "to_numeric": { + "column": "Price", + "errors": "coerce" + } + }, + { + "dropna": { + "subset": ["ProductID", "ProductName", "Stock", "Unit", "Price"] + } + } + ] + }, + { + "name": "CustomerData", + "powerbi_table_name": "customers_raw", + "steps": [ + { + "keep": { + "columns": ["CustomerID", "Name", "Email", "Region"] + } + }, + { + "fillna": { + "column": "Region", + "value": "Unknown" + } + } + ] + } + ] +} +``` + +### 3. UpdateDbResponse + +Einfache Response für Standard-DB-Updates (YAML-basiert). + +**Felder:** + +| Feld | Typ | Beschreibung | +|------|-----|--------------| +| `success` | `bool` | Gibt an, ob das Update erfolgreich war | + +**Beispiel:** + +```json +{ + "success": true +} +``` + +### 4. UpdateDbWithConfigResponse + +Detaillierte Response für JSON-basierte DB-Updates mit Konfiguration. + +**Felder:** + +| Feld | Typ | Beschreibung | +|------|-----|--------------| +| `success` | `bool` | Gibt an, ob das Update erfolgreich war | +| `tables_processed` | `int` | Anzahl der erfolgreich verarbeiteten Tabellen | +| `warnings` | `List[str]` | Liste von Warnungen während der Verarbeitung | + +**Beispiel:** + +```json +{ + "success": true, + "tables_processed": 2, + "warnings": [ + "Table 'ProductData': 3 rows dropped due to missing values", + "Table 'ProductData': Column 'Price' had 5 non-numeric values coerced to NaN" + ] +} +``` + +### Preprocessing-Workflow + +```mermaid +sequenceDiagram + participant Client + participant API + participant PowerBI + participant Preprocessor + participant SQLite + + Client->>API: POST /dataprocessor/update-db-with-config + Note over Client,API: PreprocessingConfigSchema + + loop Für jede Tabelle + API->>PowerBI: Daten abrufen (powerbi_table_name) + PowerBI-->>API: Raw DataFrame + API->>Preprocessor: Preprocessing-Steps anwenden + Preprocessor->>Preprocessor: keep → fillna → to_numeric → dropna + Preprocessor-->>API: Verarbeiteter DataFrame + API->>SQLite: Tabelle speichern (name) + end + + API-->>Client: UpdateDbWithConfigResponse + Note over Client,API: success, tables_processed, warnings +``` + +--- + +## Data Query Service + +Der Data Query Service ermöglicht SQL-Abfragen auf die gespeicherte SQLite-Datenbank. Die Schemas befinden sich in `src/dataquery/schemas.py`. + +### Datenmodell-Hierarchie + +```mermaid +classDiagram + class SqlQueryRequest { + +str query + } + + class SqlQueryResponse { + +bool success + +List~Dict~ data + +List~str~ columns + +int row_count + +Optional~str~ message + } + + class DatabaseSchemaResponse { + +bool success + +List~TableInfo~ tables + +int table_count + } + + class TableInfo { + +str name + +int row_count + } + + class TableSchemaResponse { + +bool success + +str table_name + +List~ColumnInfo~ columns + +int row_count + +List~Dict~ sample_data + } + + class ColumnInfo { + +str name + +str type + +bool nullable + +bool primary_key + } + + DatabaseSchemaResponse "1" *-- "*" TableInfo : enthält + TableSchemaResponse "1" *-- "*" ColumnInfo : enthält +``` + +### 1. SqlQueryRequest + +Request-Schema für SQL-Abfragen. + +**Felder:** + +| Feld | Typ | Beschreibung | Validierung | +|------|-----|--------------|-------------| +| `query` | `str` | SQL-Query (nur SELECT erlaubt) | min. 1 Zeichen | + +**Beispiel:** + +```json +{ + "query": "SELECT ProductID, ProductName, Price FROM ProductData WHERE Price > 10 ORDER BY Price DESC LIMIT 100" +} +``` + +### 2. SqlQueryResponse + +Response-Schema für SQL-Query-Ergebnisse. + +**Felder:** + +| Feld | Typ | Beschreibung | +|------|-----|--------------| +| `success` | `bool` | Gibt an, ob die Query erfolgreich ausgeführt wurde | +| `data` | `List[Dict[str, Any]]` | Query-Ergebnisse als Liste von Dictionaries | +| `columns` | `List[str]` | Spaltennamen im Ergebnis | +| `row_count` | `int` | Anzahl der zurückgegebenen Zeilen | +| `message` | `Optional[str]` | Zusätzliche Informationen oder Fehlermeldung | + +**Beispiel:** + +```json +{ + "success": true, + "columns": ["ProductID", "ProductName", "Price"], + "row_count": 3, + "data": [ + { + "ProductID": 42, + "ProductName": "Premium Widget", + "Price": 99.99 + }, + { + "ProductID": 15, + "ProductName": "Deluxe Gadget", + "Price": 79.50 + }, + { + "ProductID": 8, + "ProductName": "Standard Tool", + "Price": 45.00 + } + ], + "message": null +} +``` + +### 3. TableInfo + +Informationen über eine Datenbanktabelle. + +**Felder:** + +| Feld | Typ | Beschreibung | +|------|-----|--------------| +| `name` | `str` | Tabellenname | +| `row_count` | `int` | Anzahl der Zeilen in der Tabelle | + +### 4. DatabaseSchemaResponse + +Response-Schema für Datenbank-Schema-Informationen. + +**Felder:** + +| Feld | Typ | Beschreibung | +|------|-----|--------------| +| `success` | `bool` | Gibt an, ob die Schema-Abfrage erfolgreich war | +| `tables` | `List[TableInfo]` | Liste aller Tabellen in der Datenbank | +| `table_count` | `int` | Gesamtanzahl der Tabellen | + +**Beispiel:** + +```json +{ + "success": true, + "tables": [ + { + "name": "ProductData", + "row_count": 1523 + }, + { + "name": "CustomerData", + "row_count": 847 + }, + { + "name": "OrderData", + "row_count": 3421 + } + ], + "table_count": 3 +} +``` + +### 5. ColumnInfo + +Informationen über eine Tabellenspalte. + +**Felder:** + +| Feld | Typ | Beschreibung | +|------|-----|--------------| +| `name` | `str` | Spaltenname | +| `type` | `str` | Datentyp der Spalte | +| `nullable` | `bool` | Ob die Spalte NULL-Werte enthalten kann | +| `primary_key` | `bool` | Ob die Spalte ein Primary Key ist | + +### 6. TableSchemaResponse + +Response-Schema für detaillierte Tabellenstruktur-Informationen. + +**Felder:** + +| Feld | Typ | Beschreibung | +|------|-----|--------------| +| `success` | `bool` | Gibt an, ob die Schema-Abfrage erfolgreich war | +| `table_name` | `str` | Name der Tabelle | +| `columns` | `List[ColumnInfo]` | Liste aller Spalten in der Tabelle | +| `row_count` | `int` | Anzahl der Zeilen in der Tabelle | +| `sample_data` | `List[Dict[str, Any]]` | Beispieldaten (bis zu 5 Zeilen) | + +**Beispiel:** + +```json +{ + "success": true, + "table_name": "ProductData", + "columns": [ + { + "name": "ProductID", + "type": "INTEGER", + "nullable": false, + "primary_key": true + }, + { + "name": "ProductName", + "type": "TEXT", + "nullable": false, + "primary_key": false + }, + { + "name": "Supplier", + "type": "TEXT", + "nullable": true, + "primary_key": false + }, + { + "name": "Price", + "type": "REAL", + "nullable": true, + "primary_key": false + } + ], + "row_count": 1523, + "sample_data": [ + { + "ProductID": 1, + "ProductName": "Widget A", + "Supplier": "Acme Corp", + "Price": 12.50 + }, + { + "ProductID": 2, + "ProductName": "Gadget B", + "Supplier": "TechCo", + "Price": 24.99 + }, + { + "ProductID": 3, + "ProductName": "Tool C", + "Supplier": "Unknown", + "Price": 8.75 + } + ] +} +``` + +### Query-Workflow + +```mermaid +sequenceDiagram + participant Client + participant API + participant SQLite + + rect rgb(200, 220, 250) + Note over Client,SQLite: Szenario 1: SQL Query ausführen + Client->>API: POST /dataquery/query + Note over Client,API: SqlQueryRequest + API->>SQLite: SELECT ausführen + SQLite-->>API: Ergebnisse + API-->>Client: SqlQueryResponse + Note over Client,API: data, columns, row_count + end + + rect rgb(220, 250, 220) + Note over Client,SQLite: Szenario 2: Datenbank-Schema abrufen + Client->>API: GET /dataquery/schema + API->>SQLite: Tabellen auflisten + SQLite-->>API: Tabellenliste + API-->>Client: DatabaseSchemaResponse + Note over Client,API: tables, table_count + end + + rect rgb(250, 220, 220) + Note over Client,SQLite: Szenario 3: Tabellenstruktur abrufen + Client->>API: GET /dataquery/table/{table_name} + API->>SQLite: Spalten + Sample Data abrufen + SQLite-->>API: Struktur + Daten + API-->>Client: TableSchemaResponse + Note over Client,API: columns, sample_data + end +``` + +--- + +## Gesamtarchitektur + +```mermaid +graph TB + subgraph "External Data Source" + PBI[Power BI Dataset] + end + + subgraph "Gateway Preprocessing API" + subgraph "Data Processor Service" + DP_Router[Router] + DP_Service[Service] + DP_Schemas[Schemas] + DP_Domain[Domain Layer] + end + + subgraph "Data Query Service" + DQ_Router[Router] + DQ_Service[Service] + DQ_Schemas[Schemas] + end + end + + subgraph "Local Storage" + DB[(SQLite Database)] + end + + subgraph "Clients" + Client1[Web App] + Client2[BI Tool] + Client3[API Consumer] + end + + PBI -->|Power BI Reader| DP_Domain + DP_Router --> DP_Service + DP_Service --> DP_Domain + DP_Domain -->|Save| DB + + DQ_Router --> DQ_Service + DQ_Service -->|Query| DB + + Client1 -->|POST /dataprocessor/update-db-with-config| DP_Router + Client2 -->|POST /dataquery/query| DQ_Router + Client3 -->|GET /dataquery/schema| DQ_Router + + style PBI fill:#f9f,stroke:#333,stroke-width:2px + style DB fill:#bbf,stroke:#333,stroke-width:3px + style DP_Schemas fill:#ffa,stroke:#333,stroke-width:2px + style DQ_Schemas fill:#ffa,stroke:#333,stroke-width:2px +``` + +--- + +## API-Endpunkte Übersicht + +### Data Processor Endpoints + +| Methode | Endpoint | Request Schema | Response Schema | Beschreibung | +|---------|----------|----------------|-----------------|--------------| +| POST | `/dataprocessor/update-db` | - | `UpdateDbResponse` | Aktualisiert DB mit YAML-Config | +| POST | `/dataprocessor/update-db-with-config` | `PreprocessingConfigSchema` | `UpdateDbWithConfigResponse` | Aktualisiert DB mit JSON-Config | + +### Data Query Endpoints + +| Methode | Endpoint | Request Schema | Response Schema | Beschreibung | +|---------|----------|----------------|-----------------|--------------| +| POST | `/dataquery/query` | `SqlQueryRequest` | `SqlQueryResponse` | Führt SQL-Query aus | +| GET | `/dataquery/schema` | - | `DatabaseSchemaResponse` | Gibt DB-Schema zurück | +| GET | `/dataquery/table/{table_name}` | - | `TableSchemaResponse` | Gibt Tabellenstruktur zurück | + +--- + +## Typische Use Cases + +### Use Case 1: Daten aus Power BI laden und verarbeiten + +```mermaid +flowchart LR + A[Client] -->|1. POST PreprocessingConfigSchema| B[API] + B -->|2. Daten abrufen| C[Power BI] + C -->|3. Raw Data| B + B -->|4. Preprocessing| B + B -->|5. Speichern| D[(SQLite)] + B -->|6. UpdateDbWithConfigResponse| A + + style A fill:#bfb,stroke:#333,stroke-width:2px + style C fill:#f9f,stroke:#333,stroke-width:2px + style D fill:#bbf,stroke:#333,stroke-width:2px +``` + +### Use Case 2: Daten abfragen + +```mermaid +flowchart LR + A[Client] -->|1. POST SqlQueryRequest| B[API] + B -->|2. SELECT Query| C[(SQLite)] + C -->|3. Ergebnisse| B + B -->|4. SqlQueryResponse| A + + style A fill:#bfb,stroke:#333,stroke-width:2px + style C fill:#bbf,stroke:#333,stroke-width:2px +``` + +### Use Case 3: Datenbank-Struktur erkunden + +```mermaid +flowchart TD + A[Client] -->|1. GET /schema| B[API] + B -->|2. DatabaseSchemaResponse| A + A -->|3. GET /table/ProductData| B + B -->|4. TableSchemaResponse| A + A -->|5. POST SqlQueryRequest| B + B -->|6. SqlQueryResponse| A + + style A fill:#bfb,stroke:#333,stroke-width:2px +``` + +--- + +## Validierung und Fehlerbehandlung + +Alle Schemas verwenden Pydantic für automatische Validierung: + +- **Typsicherheit**: Alle Felder werden auf korrekte Typen geprüft +- **Required Fields**: Pflichtfelder müssen vorhanden sein +- **Min/Max Validierung**: z.B. `min_items=1` für `tables` +- **String Validierung**: z.B. `min_length=1` für SQL-Queries +- **Custom Validation**: Zusätzliche Business-Logik in den Services + +Bei Validierungsfehlern gibt die API einen HTTP 422 (Unprocessable Entity) mit detaillierten Fehlermeldungen zurück. diff --git a/src/dataprocessor/domain/powerbi_reader.py b/src/dataprocessor/domain/powerbi_reader.py index 74af88a..b585305 100644 --- a/src/dataprocessor/domain/powerbi_reader.py +++ b/src/dataprocessor/domain/powerbi_reader.py @@ -4,6 +4,7 @@ from dataclasses import dataclass from src.settings import settings import pandas as pd import httpx +import re @dataclass @@ -63,8 +64,10 @@ class PowerBIReader: Returns: DAX query string to execute against Power BI. """ + # Remove XML/HTML tags from table name (e.g., 'Artikel' -> 'Artikel') + cleaned_table_name = re.sub(r'<[^>]+>', '', self.table_name).strip() # Escape single quotes in table names per DAX rules - safe_table = self.table_name.replace("'", "''") + safe_table = cleaned_table_name.replace("'", "''") # Case 1: No measures - simple table evaluation if not self.measures: @@ -121,7 +124,10 @@ class PowerBIReader: Returns: DAX query string for fetching the next batch. """ - safe_table = self.table_name.replace("'", "''") + # Remove XML/HTML tags from table name (e.g., 'Artikel' -> 'Artikel') + cleaned_table_name = re.sub(r'<[^>]+>', '', self.table_name).strip() + # Escape single quotes in table names per DAX rules + safe_table = cleaned_table_name.replace("'", "''") order_col = self.order_by_column if last_value is None: @@ -132,20 +138,27 @@ class PowerBIReader: ) # Subsequent batches: filter rows where order_col > last_value - # IMPORTANT: Detect if value is numeric (even if stored as string) - # to ensure proper numeric comparison in DAX, not string comparison + # IMPORTANT: Handle type conversion to avoid DAX type mismatch errors + # Use a type-safe approach: convert column using IF/ISNUMBER to handle both text and numeric columns if self._is_numeric_value(last_value): - # Use numeric literal (no quotes) for proper numeric comparison - filter_value = str(float(last_value) if '.' in str(last_value) else int(last_value)) + numeric_val = float(last_value) if '.' in str(last_value) else int(last_value) + # Use IF to check if column is numeric, if so use it directly, otherwise convert with VALUE() + # This handles both text columns (VALUE converts) and numeric columns (use directly) + return ( + f"EVALUATE TOPN({self.batch_size}, " + f"FILTER('{safe_table}', " + f"NOT(ISBLANK('{safe_table}'[{order_col}])) && " + f"IF(ISNUMBER('{safe_table}'[{order_col}]), '{safe_table}'[{order_col}], VALUE('{safe_table}'[{order_col}])) > {numeric_val}), " + f"IF(ISNUMBER('{safe_table}'[{order_col}]), '{safe_table}'[{order_col}], VALUE('{safe_table}'[{order_col}])), ASC)" + ) else: - # Genuine string value - quote it for DAX - filter_value = f'"{last_value}"' - - return ( - f"EVALUATE TOPN({self.batch_size}, " - f"FILTER('{safe_table}', '{safe_table}'[{order_col}] > {filter_value}), " - f"'{safe_table}'[{order_col}], ASC)" - ) + # String comparison - escape quotes in the value + escaped_value = str(last_value).replace('"', '""') + return ( + f"EVALUATE TOPN({self.batch_size}, " + f"FILTER('{safe_table}', '{safe_table}'[{order_col}] > \"{escaped_value}\"), " + f"'{safe_table}'[{order_col}], ASC)" + ) async def _execute_query(self, dax_query: str) -> pd.DataFrame: """Execute a DAX query and return the results as a DataFrame. @@ -156,6 +169,11 @@ class PowerBIReader: Returns: DataFrame containing the query results. """ + # Log the DAX query for debugging + import logging + logger = logging.getLogger(__name__) + logger.info(f"Executing DAX query: {dax_query}") + url = f"{self.base_url}/datasets/{self.dataset_id}/executeQueries" body = { "queries": [{"query": dax_query}], @@ -171,6 +189,7 @@ class PowerBIReader: resp = await client.post(url, headers=headers, json=body) if resp.status_code != 200: + logger.error(f"DAX query failed: {dax_query}") raise RuntimeError( f"Power BI executeQueries failed: {resp.status_code} - {resp.text}" ) @@ -210,20 +229,23 @@ class PowerBIReader: return await self._execute_query(self._dax_query()) # Batch fetching with keyset pagination + import logging + logger = logging.getLogger(__name__) + all_dfs: list[pd.DataFrame] = [] last_value: str | int | None = None batch_num = 0 + total_rows = 0 while True: batch_num += 1 # Safety limit to prevent runaway requests if batch_num > self.max_batches: - import logging - logging.warning( + logger.warning( f"Reached max_batches limit ({self.max_batches}) for table " f"'{self.table_name}'. Stopping batch fetch. " - f"Total rows fetched so far: {sum(len(df) for df in all_dfs)}" + f"Total rows fetched so far: {total_rows}" ) break @@ -231,24 +253,75 @@ class PowerBIReader: df = await self._execute_query(dax_query) if df.empty: - # No more data to fetch + logger.info(f"Batch {batch_num}: Empty result, stopping fetch") break + batch_rows = len(df) + total_rows += batch_rows + + # Log batch info + if self.order_by_column in df.columns: + min_val = df[self.order_by_column].min() + max_val = df[self.order_by_column].max() + logger.info( + f"Batch {batch_num}: Fetched {batch_rows} rows, " + f"{self.order_by_column} range: {min_val} to {max_val}, " + f"Total so far: {total_rows}" + ) + else: + logger.info(f"Batch {batch_num}: Fetched {batch_rows} rows, Total so far: {total_rows}") + all_dfs.append(df) - # Get the last value for the next batch - new_last_value = df[self.order_by_column].iloc[-1] + # Get the maximum value for the next batch (not just the last row) + # This ensures we don't skip rows when there are multiple rows per ID + # Using max() instead of iloc[-1] because rows are ordered by I_ID, + # but if there are multiple rows with the same I_ID, the last row might + # have a smaller I_ID than the maximum in the batch + max_value = df[self.order_by_column].max() + + # Ensure numeric values are preserved as numeric (not converted to string) + # Check if the column is numeric and convert the value accordingly + if pd.api.types.is_numeric_dtype(df[self.order_by_column].dtype): + # Column is numeric - ensure value is numeric + if pd.isna(max_value): + logger.info(f"Batch {batch_num}: Max value is NaN, stopping") + break # Can't continue with NaN + # Convert to appropriate numeric type + if pd.api.types.is_integer_dtype(df[self.order_by_column].dtype): + new_last_value = int(max_value) + else: + new_last_value = float(max_value) + else: + new_last_value = max_value # Safety check: if last_value didn't change, we're stuck in a loop if new_last_value == last_value: + logger.warning( + f"Batch {batch_num}: last_value didn't change ({last_value}), " + f"stopping to avoid infinite loop" + ) break last_value = new_last_value + + logger.info(f"Finished fetching batches for table '{self.table_name}': {batch_num} batches, {total_rows} total rows") if not all_dfs: return pd.DataFrame() result = pd.concat(all_dfs, ignore_index=True) + + # Log statistics + if self.order_by_column and self.order_by_column in result.columns: + unique_ids = result[self.order_by_column].nunique() + total_rows_final = len(result) + logger.info( + f"Table '{self.table_name}': {total_rows_final} total rows, " + f"{unique_ids} unique {self.order_by_column} values, " + f"average {total_rows_final / unique_ids:.2f} rows per {self.order_by_column}" + ) + return result @staticmethod