diff --git a/README.md b/README.md
index e69de29..66c1a21 100644
--- a/README.md
+++ b/README.md
@@ -0,0 +1,667 @@
+# Gateway Preprocessing - Data Interfaces (Data Model)
+
+## Overview
+
+This project provides two main services with clearly defined data interfaces:
+
+1. **Data Processor Service** (`/dataprocessor`) - Loads data from Power BI, processes it, and stores it in SQLite
+2. **Data Query Service** (`/dataquery`) - Allows SQL queries against the stored data
+
+```mermaid
+graph LR
+ A[Power BI Dataset] -->|Data Processor| B[SQLite Database]
+ B -->|Data Query| C[Client/API Consumer]
+
+ style A fill:#f9f,stroke:#333,stroke-width:2px
+ style B fill:#bbf,stroke:#333,stroke-width:2px
+ style C fill:#bfb,stroke:#333,stroke-width:2px
+```
+
+---
+
+## Data Processor Service
+
+The Data Processor Service processes data from Power BI and stores it in a local SQLite database. The schemas are defined in `src/dataprocessor/schemas.py`.
+
+### Data Model Hierarchy
+
+```mermaid
+classDiagram
+ class PreprocessingConfigSchema {
+ +List~TableConfigSchema~ tables
+ }
+
+ class TableConfigSchema {
+ +str name
+ +str powerbi_table_name
+ +List~str~ measures
+ +List~str~ group_by_columns
+ +List~Dict~ steps
+ }
+
+ class UpdateDbWithConfigResponse {
+ +bool success
+ +int tables_processed
+ +List~str~ warnings
+ }
+
+ class UpdateDbResponse {
+ +bool success
+ }
+
+ PreprocessingConfigSchema "1" *-- "1..*" TableConfigSchema : contains
+```
+
+### 1. TableConfigSchema
+
+Defines the configuration for a single table that is read from Power BI and processed.
+
+**Fields:**
+
+| Field | Type | Description | Example |
+|-------|------|-------------|---------|
+| `name` | `str` | Name of the table in the local SQLite database | `"ProductData"` |
+| `powerbi_table_name` | `str` | Name of the source table in the Power BI dataset | `"products_raw"` |
+| `measures` | `List[str]` | List of Power BI measures to fetch (optional) | `["EP in CHF", "Gesamtbetrag"]` |
+| `group_by_columns` | `List[str]` | Columns to group by when measures are used (triggers SUMMARIZECOLUMNS) | `["m_Artikel", "Kategorie"]` |
+| `steps` | `List[Dict[str, Any]]` | List of preprocessing steps | see below |
+
+**Supported preprocessing steps:**
+
+```mermaid
+graph TD
+ A[Preprocessing Steps] --> B[keep]
+ A --> C[fillna]
+ A --> D[to_numeric]
+ A --> E[dropna]
+
+ B --> B1["columns: List[str]<br/>Keeps only the listed columns"]
+ C --> C1["column: str, value: Any<br/>Fills missing values"]
+ D --> D1["column: str, errors: str<br/>Converts to numeric"]
+ E --> E1["subset: List[str]<br/>Drops rows with NaN"]
+
+ style A fill:#f96,stroke:#333,stroke-width:3px
+ style B fill:#9cf,stroke:#333,stroke-width:2px
+ style C fill:#9cf,stroke:#333,stroke-width:2px
+ style D fill:#9cf,stroke:#333,stroke-width:2px
+ style E fill:#9cf,stroke:#333,stroke-width:2px
+```
+
+**Example:**
+
+```json
+{
+ "name": "ProductData",
+ "powerbi_table_name": "products_raw",
+ "measures": ["Total Sales", "Average Price"],
+ "group_by_columns": ["Category", "Supplier"],
+ "steps": [
+ {
+ "keep": {
+ "columns": ["ProductID", "ProductName", "Supplier", "Stock", "Unit", "Price"]
+ }
+ },
+ {
+ "fillna": {
+ "column": "Supplier",
+ "value": "Unknown"
+ }
+ },
+ {
+ "to_numeric": {
+ "column": "Price",
+ "errors": "coerce"
+ }
+ },
+ {
+ "dropna": {
+ "subset": ["ProductID", "ProductName", "Price"]
+ }
+ }
+ ]
+}
+```
+
+### 2. PreprocessingConfigSchema
+
+Complete configuration for a preprocessing request. A single request can cover multiple tables.
+
+**Fields:**
+
+| Field | Type | Description | Validation |
+|-------|------|-------------|------------|
+| `tables` | `List[TableConfigSchema]` | List of table configurations | at least 1 table |
+
+**Full request example:**
+
+```json
+{
+ "tables": [
+ {
+ "name": "ProductData",
+ "powerbi_table_name": "products_raw",
+ "steps": [
+ {
+ "keep": {
+ "columns": ["ProductID", "ProductName", "Supplier", "Stock", "Unit", "Price"]
+ }
+ },
+ {
+ "fillna": {
+ "column": "Supplier",
+ "value": "Unknown"
+ }
+ },
+ {
+ "to_numeric": {
+ "column": "Price",
+ "errors": "coerce"
+ }
+ },
+ {
+ "dropna": {
+ "subset": ["ProductID", "ProductName", "Stock", "Unit", "Price"]
+ }
+ }
+ ]
+ },
+ {
+ "name": "CustomerData",
+ "powerbi_table_name": "customers_raw",
+ "steps": [
+ {
+ "keep": {
+ "columns": ["CustomerID", "Name", "Email", "Region"]
+ }
+ },
+ {
+ "fillna": {
+ "column": "Region",
+ "value": "Unknown"
+ }
+ }
+ ]
+ }
+ ]
+}
+```
+
+### 3. UpdateDbResponse
+
+Simple response for standard DB updates (YAML-based).
+
+**Fields:**
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `success` | `bool` | Indicates whether the update succeeded |
+
+**Example:**
+
+```json
+{
+ "success": true
+}
+```
+
+### 4. UpdateDbWithConfigResponse
+
+Detailed response for JSON-based DB updates with a configuration.
+
+**Fields:**
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `success` | `bool` | Indicates whether the update succeeded |
+| `tables_processed` | `int` | Number of tables processed successfully |
+| `warnings` | `List[str]` | List of warnings raised during processing |
+
+**Example:**
+
+```json
+{
+ "success": true,
+ "tables_processed": 2,
+ "warnings": [
+ "Table 'ProductData': 3 rows dropped due to missing values",
+ "Table 'ProductData': Column 'Price' had 5 non-numeric values coerced to NaN"
+ ]
+}
+```
+
+### Preprocessing Workflow
+
+```mermaid
+sequenceDiagram
+ participant Client
+ participant API
+ participant PowerBI
+ participant Preprocessor
+ participant SQLite
+
+ Client->>API: POST /dataprocessor/update-db-with-config
+ Note over Client,API: PreprocessingConfigSchema
+
+ loop For each table
+ API->>PowerBI: Fetch data (powerbi_table_name)
+ PowerBI-->>API: Raw DataFrame
+ API->>Preprocessor: Apply preprocessing steps
+ Preprocessor->>Preprocessor: keep → fillna → to_numeric → dropna
+ Preprocessor-->>API: Processed DataFrame
+ API->>SQLite: Save table (name)
+ end
+
+ API-->>Client: UpdateDbWithConfigResponse
+ Note over Client,API: success, tables_processed, warnings
+```
+
+---
+
+## Data Query Service
+
+The Data Query Service allows SQL queries against the stored SQLite database. The schemas are defined in `src/dataquery/schemas.py`.
+
+### Data Model Hierarchy
+
+```mermaid
+classDiagram
+ class SqlQueryRequest {
+ +str query
+ }
+
+ class SqlQueryResponse {
+ +bool success
+ +List~Dict~ data
+ +List~str~ columns
+ +int row_count
+ +Optional~str~ message
+ }
+
+ class DatabaseSchemaResponse {
+ +bool success
+ +List~TableInfo~ tables
+ +int table_count
+ }
+
+ class TableInfo {
+ +str name
+ +int row_count
+ }
+
+ class TableSchemaResponse {
+ +bool success
+ +str table_name
+ +List~ColumnInfo~ columns
+ +int row_count
+ +List~Dict~ sample_data
+ }
+
+ class ColumnInfo {
+ +str name
+ +str type
+ +bool nullable
+ +bool primary_key
+ }
+
+ DatabaseSchemaResponse "1" *-- "*" TableInfo : contains
+ TableSchemaResponse "1" *-- "*" ColumnInfo : contains
+```
+
+### 1. SqlQueryRequest
+
+Request schema for SQL queries.
+
+**Fields:**
+
+| Field | Type | Description | Validation |
+|-------|------|-------------|------------|
+| `query` | `str` | SQL query (only SELECT is allowed) | at least 1 character |
+
+**Example:**
+
+```json
+{
+ "query": "SELECT ProductID, ProductName, Price FROM ProductData WHERE Price > 10 ORDER BY Price DESC LIMIT 100"
+}
+```
+
+### 2. SqlQueryResponse
+
+Response schema for SQL query results.
+
+**Fields:**
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `success` | `bool` | Indicates whether the query executed successfully |
+| `data` | `List[Dict[str, Any]]` | Query results as a list of dictionaries |
+| `columns` | `List[str]` | Column names in the result |
+| `row_count` | `int` | Number of rows returned |
+| `message` | `Optional[str]` | Additional information or an error message |
+
+**Example:**
+
+```json
+{
+ "success": true,
+ "columns": ["ProductID", "ProductName", "Price"],
+ "row_count": 3,
+ "data": [
+ {
+ "ProductID": 42,
+ "ProductName": "Premium Widget",
+ "Price": 99.99
+ },
+ {
+ "ProductID": 15,
+ "ProductName": "Deluxe Gadget",
+ "Price": 79.50
+ },
+ {
+ "ProductID": 8,
+ "ProductName": "Standard Tool",
+ "Price": 45.00
+ }
+ ],
+ "message": null
+}
+```
+
+### 3. TableInfo
+
+Information about a database table.
+
+**Fields:**
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `name` | `str` | Table name |
+| `row_count` | `int` | Number of rows in the table |
+
+### 4. DatabaseSchemaResponse
+
+Response schema for database schema information.
+
+**Fields:**
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `success` | `bool` | Indicates whether the schema request succeeded |
+| `tables` | `List[TableInfo]` | List of all tables in the database |
+| `table_count` | `int` | Total number of tables |
+
+**Example:**
+
+```json
+{
+ "success": true,
+ "tables": [
+ {
+ "name": "ProductData",
+ "row_count": 1523
+ },
+ {
+ "name": "CustomerData",
+ "row_count": 847
+ },
+ {
+ "name": "OrderData",
+ "row_count": 3421
+ }
+ ],
+ "table_count": 3
+}
+```
+
+### 5. ColumnInfo
+
+Information about a table column.
+
+**Fields:**
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `name` | `str` | Column name |
+| `type` | `str` | Data type of the column |
+| `nullable` | `bool` | Whether the column can contain NULL values |
+| `primary_key` | `bool` | Whether the column is a primary key |
+
+### 6. TableSchemaResponse
+
+Response schema for detailed table structure information.
+
+**Fields:**
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `success` | `bool` | Indicates whether the schema request succeeded |
+| `table_name` | `str` | Name of the table |
+| `columns` | `List[ColumnInfo]` | List of all columns in the table |
+| `row_count` | `int` | Number of rows in the table |
+| `sample_data` | `List[Dict[str, Any]]` | Sample rows (up to 5) |
+
+**Example:**
+
+```json
+{
+ "success": true,
+ "table_name": "ProductData",
+ "columns": [
+ {
+ "name": "ProductID",
+ "type": "INTEGER",
+ "nullable": false,
+ "primary_key": true
+ },
+ {
+ "name": "ProductName",
+ "type": "TEXT",
+ "nullable": false,
+ "primary_key": false
+ },
+ {
+ "name": "Supplier",
+ "type": "TEXT",
+ "nullable": true,
+ "primary_key": false
+ },
+ {
+ "name": "Price",
+ "type": "REAL",
+ "nullable": true,
+ "primary_key": false
+ }
+ ],
+ "row_count": 1523,
+ "sample_data": [
+ {
+ "ProductID": 1,
+ "ProductName": "Widget A",
+ "Supplier": "Acme Corp",
+ "Price": 12.50
+ },
+ {
+ "ProductID": 2,
+ "ProductName": "Gadget B",
+ "Supplier": "TechCo",
+ "Price": 24.99
+ },
+ {
+ "ProductID": 3,
+ "ProductName": "Tool C",
+ "Supplier": "Unknown",
+ "Price": 8.75
+ }
+ ]
+}
+```
+
+### Query Workflow
+
+```mermaid
+sequenceDiagram
+ participant Client
+ participant API
+ participant SQLite
+
+ rect rgb(200, 220, 250)
+ Note over Client,SQLite: Scenario 1: Run a SQL query
+ Client->>API: POST /dataquery/query
+ Note over Client,API: SqlQueryRequest
+ API->>SQLite: Execute SELECT
+ SQLite-->>API: Results
+ API-->>Client: SqlQueryResponse
+ Note over Client,API: data, columns, row_count
+ end
+
+ rect rgb(220, 250, 220)
+ Note over Client,SQLite: Scenario 2: Fetch the database schema
+ Client->>API: GET /dataquery/schema
+ API->>SQLite: List tables
+ SQLite-->>API: Table list
+ API-->>Client: DatabaseSchemaResponse
+ Note over Client,API: tables, table_count
+ end
+
+ rect rgb(250, 220, 220)
+ Note over Client,SQLite: Scenario 3: Fetch a table's structure
+ Client->>API: GET /dataquery/table/{table_name}
+ API->>SQLite: Fetch columns + sample data
+ SQLite-->>API: Structure + data
+ API-->>Client: TableSchemaResponse
+ Note over Client,API: columns, sample_data
+ end
+```
+
+---
+
+## Overall Architecture
+
+```mermaid
+graph TB
+ subgraph "External Data Source"
+ PBI[Power BI Dataset]
+ end
+
+ subgraph "Gateway Preprocessing API"
+ subgraph "Data Processor Service"
+ DP_Router[Router]
+ DP_Service[Service]
+ DP_Schemas[Schemas]
+ DP_Domain[Domain Layer]
+ end
+
+ subgraph "Data Query Service"
+ DQ_Router[Router]
+ DQ_Service[Service]
+ DQ_Schemas[Schemas]
+ end
+ end
+
+ subgraph "Local Storage"
+ DB[(SQLite Database)]
+ end
+
+ subgraph "Clients"
+ Client1[Web App]
+ Client2[BI Tool]
+ Client3[API Consumer]
+ end
+
+ PBI -->|Power BI Reader| DP_Domain
+ DP_Router --> DP_Service
+ DP_Service --> DP_Domain
+ DP_Domain -->|Save| DB
+
+ DQ_Router --> DQ_Service
+ DQ_Service -->|Query| DB
+
+ Client1 -->|POST /dataprocessor/update-db-with-config| DP_Router
+ Client2 -->|POST /dataquery/query| DQ_Router
+ Client3 -->|GET /dataquery/schema| DQ_Router
+
+ style PBI fill:#f9f,stroke:#333,stroke-width:2px
+ style DB fill:#bbf,stroke:#333,stroke-width:3px
+ style DP_Schemas fill:#ffa,stroke:#333,stroke-width:2px
+ style DQ_Schemas fill:#ffa,stroke:#333,stroke-width:2px
+```
+
+---
+
+## API Endpoint Overview
+
+### Data Processor Endpoints
+
+| Method | Endpoint | Request Schema | Response Schema | Description |
+|--------|----------|----------------|-----------------|-------------|
+| POST | `/dataprocessor/update-db` | - | `UpdateDbResponse` | Updates the DB using the YAML config |
+| POST | `/dataprocessor/update-db-with-config` | `PreprocessingConfigSchema` | `UpdateDbWithConfigResponse` | Updates the DB using a JSON config |
+
+### Data Query Endpoints
+
+| Method | Endpoint | Request Schema | Response Schema | Description |
+|--------|----------|----------------|-----------------|-------------|
+| POST | `/dataquery/query` | `SqlQueryRequest` | `SqlQueryResponse` | Executes a SQL query |
+| GET | `/dataquery/schema` | - | `DatabaseSchemaResponse` | Returns the DB schema |
+| GET | `/dataquery/table/{table_name}` | - | `TableSchemaResponse` | Returns the table structure |
+
+---
+
+## Typical Use Cases
+
+### Use Case 1: Load and process data from Power BI
+
+```mermaid
+flowchart LR
+ A[Client] -->|1. POST PreprocessingConfigSchema| B[API]
+ B -->|2. Fetch data| C[Power BI]
+ C -->|3. Raw Data| B
+ B -->|4. Preprocessing| B
+ B -->|5. Save| D[(SQLite)]
+ B -->|6. UpdateDbWithConfigResponse| A
+
+ style A fill:#bfb,stroke:#333,stroke-width:2px
+ style C fill:#f9f,stroke:#333,stroke-width:2px
+ style D fill:#bbf,stroke:#333,stroke-width:2px
+```
+
+### Use Case 2: Query data
+
+```mermaid
+flowchart LR
+ A[Client] -->|1. POST SqlQueryRequest| B[API]
+ B -->|2. SELECT Query| C[(SQLite)]
+ C -->|3. Results| B
+ B -->|4. SqlQueryResponse| A
+
+ style A fill:#bfb,stroke:#333,stroke-width:2px
+ style C fill:#bbf,stroke:#333,stroke-width:2px
+```
+
+### Use Case 3: Explore the database structure
+
+```mermaid
+flowchart TD
+ A[Client] -->|1. GET /dataquery/schema| B[API]
+ B -->|2. DatabaseSchemaResponse| A
+ A -->|3. GET /dataquery/table/ProductData| B
+ B -->|4. TableSchemaResponse| A
+ A -->|5. POST SqlQueryRequest| B
+ B -->|6. SqlQueryResponse| A
+
+ style A fill:#bfb,stroke:#333,stroke-width:2px
+```
+
+---
+
+## Validation and Error Handling
+
+All schemas use Pydantic for automatic validation:
+
+- **Type safety**: All fields are checked for correct types
+- **Required fields**: Mandatory fields must be present
+- **Min/max validation**: e.g. `min_items=1` for `tables`
+- **String validation**: e.g. `min_length=1` for SQL queries
+- **Custom validation**: Additional business logic in the services
+
+On validation errors, the API returns HTTP 422 (Unprocessable Entity) with detailed error messages.
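
The README documents the `query` field as SELECT-only and mentions additional business logic in the services, but it does not show how that rule is enforced. The block below is a minimal sketch of one possible service-level check, using only the standard-library `sqlite3` module. The function name `run_select`, the helper `_failure`, and the simple prefix check are assumptions for illustration and are not taken from the project; the returned dictionary mirrors the documented `SqlQueryResponse` fields.

```python
import sqlite3
from typing import Any


def _failure(message: str) -> dict[str, Any]:
    # Hypothetical helper: an unsuccessful response in the SqlQueryResponse shape.
    return {"success": False, "data": [], "columns": [], "row_count": 0, "message": message}


def run_select(conn: sqlite3.Connection, query: str) -> dict[str, Any]:
    """Run a query only if it looks like a SELECT (illustrative check, not the project's)."""
    stripped = query.strip()
    # Assumption: a plain prefix check; the real service may validate differently.
    if not stripped.lower().startswith("select"):
        return _failure("Only SELECT statements are allowed")
    try:
        cursor = conn.execute(stripped)
    except (sqlite3.Error, sqlite3.Warning) as exc:
        return _failure(str(exc))
    columns = [col[0] for col in cursor.description]
    rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
    return {"success": True, "data": rows, "columns": columns,
            "row_count": len(rows), "message": None}


# In-memory demo data, loosely modelled on the README's ProductData example.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ProductData (ProductID INTEGER, ProductName TEXT, Price REAL)")
conn.executemany(
    "INSERT INTO ProductData VALUES (?, ?, ?)",
    [(8, "Standard Tool", 45.0), (42, "Premium Widget", 99.99)],
)

ok = run_select(conn, "SELECT ProductID, Price FROM ProductData WHERE Price > 50")
rejected = run_select(conn, "DROP TABLE ProductData")
```

A prefix check like this is easy to read but also rejects valid read-only statements such as `WITH ... SELECT`. Opening the database read-only (for example `sqlite3.connect("file:data.db?mode=ro", uri=True)`, where `data.db` is a placeholder path) is a sturdier complement, since SQLite itself then refuses writes.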
diff --git a/src/dataprocessor/domain/powerbi_reader.py b/src/dataprocessor/domain/powerbi_reader.py
index 74af88a..b585305 100644
--- a/src/dataprocessor/domain/powerbi_reader.py
+++ b/src/dataprocessor/domain/powerbi_reader.py
@@ -4,6 +4,7 @@ from dataclasses import dataclass
from src.settings import settings
import pandas as pd
import httpx
+import re
@dataclass
@@ -63,8 +64,10 @@ class PowerBIReader:
Returns:
DAX query string to execute against Power BI.
"""
+ # Strip XML/HTML tags from the table name so that only the bare name (e.g., 'Artikel') remains
+ cleaned_table_name = re.sub(r'<[^>]+>', '', self.table_name).strip()
# Escape single quotes in table names per DAX rules
- safe_table = self.table_name.replace("'", "''")
+ safe_table = cleaned_table_name.replace("'", "''")
# Case 1: No measures - simple table evaluation
if not self.measures:
@@ -121,7 +124,10 @@ class PowerBIReader:
Returns:
DAX query string for fetching the next batch.
"""
- safe_table = self.table_name.replace("'", "''")
+ # Strip XML/HTML tags from the table name so that only the bare name (e.g., 'Artikel') remains
+ cleaned_table_name = re.sub(r'<[^>]+>', '', self.table_name).strip()
+ # Escape single quotes in table names per DAX rules
+ safe_table = cleaned_table_name.replace("'", "''")
order_col = self.order_by_column
if last_value is None:
@@ -132,20 +138,27 @@ class PowerBIReader:
)
# Subsequent batches: filter rows where order_col > last_value
- # IMPORTANT: Detect if value is numeric (even if stored as string)
- # to ensure proper numeric comparison in DAX, not string comparison
+ # IMPORTANT: Handle type conversion to avoid DAX type mismatch errors
+ # Use a type-safe approach: convert column using IF/ISNUMBER to handle both text and numeric columns
if self._is_numeric_value(last_value):
- # Use numeric literal (no quotes) for proper numeric comparison
- filter_value = str(float(last_value) if '.' in str(last_value) else int(last_value))
+ numeric_val = float(last_value) if '.' in str(last_value) else int(last_value)
+ # Use IF to check if column is numeric, if so use it directly, otherwise convert with VALUE()
+ # This handles both text columns (VALUE converts) and numeric columns (use directly)
+ return (
+ f"EVALUATE TOPN({self.batch_size}, "
+ f"FILTER('{safe_table}', "
+ f"NOT(ISBLANK('{safe_table}'[{order_col}])) && "
+ f"IF(ISNUMBER('{safe_table}'[{order_col}]), '{safe_table}'[{order_col}], VALUE('{safe_table}'[{order_col}])) > {numeric_val}), "
+ f"IF(ISNUMBER('{safe_table}'[{order_col}]), '{safe_table}'[{order_col}], VALUE('{safe_table}'[{order_col}])), ASC)"
+ )
else:
- # Genuine string value - quote it for DAX
- filter_value = f'"{last_value}"'
-
- return (
- f"EVALUATE TOPN({self.batch_size}, "
- f"FILTER('{safe_table}', '{safe_table}'[{order_col}] > {filter_value}), "
- f"'{safe_table}'[{order_col}], ASC)"
- )
+ # String comparison - escape quotes in the value
+ escaped_value = str(last_value).replace('"', '""')
+ return (
+ f"EVALUATE TOPN({self.batch_size}, "
+ f"FILTER('{safe_table}', '{safe_table}'[{order_col}] > \"{escaped_value}\"), "
+ f"'{safe_table}'[{order_col}], ASC)"
+ )
async def _execute_query(self, dax_query: str) -> pd.DataFrame:
"""Execute a DAX query and return the results as a DataFrame.
@@ -156,6 +169,11 @@ class PowerBIReader:
Returns:
DataFrame containing the query results.
"""
+ # Log the DAX query for debugging
+ import logging
+ logger = logging.getLogger(__name__)
+ logger.info(f"Executing DAX query: {dax_query}")
+
url = f"{self.base_url}/datasets/{self.dataset_id}/executeQueries"
body = {
"queries": [{"query": dax_query}],
@@ -171,6 +189,7 @@ class PowerBIReader:
resp = await client.post(url, headers=headers, json=body)
if resp.status_code != 200:
+ logger.error(f"DAX query failed: {dax_query}")
raise RuntimeError(
f"Power BI executeQueries failed: {resp.status_code} - {resp.text}"
)
@@ -210,20 +229,23 @@ class PowerBIReader:
return await self._execute_query(self._dax_query())
# Batch fetching with keyset pagination
+ import logging
+ logger = logging.getLogger(__name__)
+
all_dfs: list[pd.DataFrame] = []
last_value: str | int | None = None
batch_num = 0
+ total_rows = 0
while True:
batch_num += 1
# Safety limit to prevent runaway requests
if batch_num > self.max_batches:
- import logging
- logging.warning(
+ logger.warning(
f"Reached max_batches limit ({self.max_batches}) for table "
f"'{self.table_name}'. Stopping batch fetch. "
- f"Total rows fetched so far: {sum(len(df) for df in all_dfs)}"
+ f"Total rows fetched so far: {total_rows}"
)
break
@@ -231,24 +253,75 @@ class PowerBIReader:
df = await self._execute_query(dax_query)
if df.empty:
- # No more data to fetch
+ logger.info(f"Batch {batch_num}: Empty result, stopping fetch")
break
+ batch_rows = len(df)
+ total_rows += batch_rows
+
+ # Log batch info
+ if self.order_by_column in df.columns:
+ min_val = df[self.order_by_column].min()
+ max_val = df[self.order_by_column].max()
+ logger.info(
+ f"Batch {batch_num}: Fetched {batch_rows} rows, "
+ f"{self.order_by_column} range: {min_val} to {max_val}, "
+ f"Total so far: {total_rows}"
+ )
+ else:
+ logger.info(f"Batch {batch_num}: Fetched {batch_rows} rows, Total so far: {total_rows}")
+
all_dfs.append(df)
- # Get the last value for the next batch
- new_last_value = df[self.order_by_column].iloc[-1]
+ # Use the maximum value in the batch as the cursor for the next batch,
+ # not just the last row. DAX TOPN does not guarantee the order of the
+ # rows it returns, so the last row of the DataFrame is not necessarily
+ # the one with the largest order_by_column value; relying on iloc[-1]
+ # could set the cursor too low and re-fetch rows in the next batch.
+ max_value = df[self.order_by_column].max()
+
+ # Ensure numeric values are preserved as numeric (not converted to string)
+ # Check if the column is numeric and convert the value accordingly
+ if pd.api.types.is_numeric_dtype(df[self.order_by_column].dtype):
+ # Column is numeric - ensure value is numeric
+ if pd.isna(max_value):
+ logger.info(f"Batch {batch_num}: Max value is NaN, stopping")
+ break # Can't continue with NaN
+ # Convert to appropriate numeric type
+ if pd.api.types.is_integer_dtype(df[self.order_by_column].dtype):
+ new_last_value = int(max_value)
+ else:
+ new_last_value = float(max_value)
+ else:
+ new_last_value = max_value
# Safety check: if last_value didn't change, we're stuck in a loop
if new_last_value == last_value:
+ logger.warning(
+ f"Batch {batch_num}: last_value didn't change ({last_value}), "
+ f"stopping to avoid infinite loop"
+ )
break
last_value = new_last_value
+
+ logger.info(f"Finished fetching batches for table '{self.table_name}': {batch_num} batches, {total_rows} total rows")
if not all_dfs:
return pd.DataFrame()
result = pd.concat(all_dfs, ignore_index=True)
+
+ # Log statistics
+ if self.order_by_column and self.order_by_column in result.columns:
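+ unique_ids = result[self.order_by_column].nunique()
+ total_rows_final = len(result)
+ # nunique() ignores NaN, so it can be 0 even for a non-empty result;
+ # guard the average to avoid a ZeroDivisionError while logging
+ avg_rows = total_rows_final / unique_ids if unique_ids else 0.0
+ logger.info(
+ f"Table '{self.table_name}': {total_rows_final} total rows, "
+ f"{unique_ids} unique {self.order_by_column} values, "
+ f"average {avg_rows:.2f} rows per {self.order_by_column}"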
+ )
+
return result
@staticmethod