Compare commits

...

13 commits

Author SHA1 Message Date
8b2ac4d467 chore: remove placeholder Azure workflows (no Infomaniak VM yet)
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-24 03:04:12 +02:00
21d84a099c fix: use full GitHub URLs for non-mirrored actions (azure, astral-sh)
Some checks failed
Deploy althaus-preprocessing / build (push) Failing after 55s
Deploy althaus-preprocessing / deploy (push) Has been skipped
Deploy poweron-preprocessing / build (push) Failing after 18s
Deploy poweron-preprocessing / deploy (push) Has been skipped
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-24 02:42:30 +02:00
b9e73a728d refactor: migrate to Forgejo workflows, remove GitHub Actions
Some checks failed
Deploy althaus-preprocessing / build (push) Failing after 23s
Deploy althaus-preprocessing / deploy (push) Has been skipped
Deploy poweron-preprocessing / build (push) Failing after 2s
Deploy poweron-preprocessing / deploy (push) Has been skipped
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-24 02:34:22 +02:00
9bca05dea2 finished preprocessing batching 2025-12-23 13:04:46 +01:00
idittrich-valueon
3a28fbd5e6
Merge pull request #12 from valueonag/fix/preprocessing-limits
feat: Implement `max_batches` safety limit for Power BI data fetching…
2025-12-23 11:29:37 +01:00
Christopher Gondek
c2f11b92fd feat: Implement max_batches safety limit for Power BI data fetching and refine DAX batch query type handling for numeric and string values. 2025-12-23 11:28:55 +01:00
idittrich-valueon
f5f8dfcb80
Merge pull request #11 from valueonag/fix/preprocessing-limits
feat: Add batching and keyset pagination to Power BI data fetching us…
2025-12-23 11:15:09 +01:00
Christopher Gondek
af6150e2fb feat: Add batching and keyset pagination to Power BI data fetching using batch_size and order_by_column parameters. 2025-12-23 11:14:29 +01:00
Christopher Gondek
5b8daa4e49
Merge pull request #10 from valueonag/chore/limit-row-count
fix: sanitize SQL queries by removing trailing semicolons
2025-11-05 11:24:39 +01:00
Christopher Gondek
3fbb41b980 fix: sanitize SQL queries by removing trailing semicolons 2025-11-05 11:23:54 +01:00
Christopher Gondek
471ad42912
Merge pull request #9 from valueonag/chore/limit-row-count
chore: enforce query row limit
2025-11-05 11:02:43 +01:00
Christopher Gondek
dde61f447d chore: enforce query row limit 2025-11-05 11:02:24 +01:00
Christopher Gondek
e7dd3ea999
Merge pull request #8 from valueonag/feat/powerbi-measures
feat: support powerbi measures
2025-11-05 08:36:01 +01:00
10 changed files with 961 additions and 132 deletions


@ -1,63 +0,0 @@
name: Build and deploy Python app to Azure Web App - althaus-preprocessing
on:
  push:
    branches: [main]
  workflow_dispatch:
jobs:
  build:
    runs-on: ubuntu-latest
    permissions:
      contents: read
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      # ---------- BACKEND / PYTHON ----------
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.13"
      - name: Install uv
        uses: astral-sh/setup-uv@v6
      # ---------- ARTIFACT ----------
      - name: Upload artifact for deployment jobs
        uses: actions/upload-artifact@v4
        with:
          name: python-app
          path: |
            .
            !venv/
            !.venv/
            !.git/
            !.cache/
  deploy:
    runs-on: ubuntu-latest
    needs: build
    permissions:
      id-token: write
      contents: read
    steps:
      - name: Download artifact from build job
        uses: actions/download-artifact@v4
        with:
          name: python-app
      - name: Login to Azure (OIDC)
        uses: azure/login@v2
        with:
          client-id: ${{ secrets.AZUREAPPSERVICE_CLIENTID_3720212D35D047C38CEE2F365F1D81C7 }}
          tenant-id: ${{ secrets.AZUREAPPSERVICE_TENANTID_AC3FDD6C7376466AA44A309428618439 }}
          subscription-id: ${{ secrets.AZUREAPPSERVICE_SUBSCRIPTIONID_94A2088C685A46DFAE2BFBA50714B1DA }}
      - name: Deploy to Azure Web App
        uses: azure/webapps-deploy@v3
        with:
          app-name: "poweron-althaus-preprocess-prod"
          slot-name: "Production"
          package: . # reuse the downloaded artifact folder


@ -1,63 +0,0 @@
name: Build and deploy Python app to Azure Web App - poweron-preprocessing
on:
  push:
    branches: [main]
  workflow_dispatch:
jobs:
  build:
    runs-on: ubuntu-latest
    permissions:
      contents: read
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      # ---------- BACKEND / PYTHON ----------
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.13"
      - name: Install uv
        uses: astral-sh/setup-uv@v6
      # ---------- ARTIFACT ----------
      - name: Upload artifact for deployment jobs
        uses: actions/upload-artifact@v4
        with:
          name: python-app
          path: |
            .
            !venv/
            !.venv/
            !.git/
            !.cache/
  deploy:
    runs-on: ubuntu-latest
    needs: build
    permissions:
      id-token: write
      contents: read
    steps:
      - name: Download artifact from build job
        uses: actions/download-artifact@v4
        with:
          name: python-app
      - name: Login to Azure (OIDC)
        uses: azure/login@v2
        with:
          client-id: ${{ secrets.AZUREAPPSERVICE_CLIENTID_AA4B9998A69E4C5C8FDF357E3FEAADD5 }}
          tenant-id: ${{ secrets.AZUREAPPSERVICE_TENANTID_CC57AD1F29D44DDA960AE3EAC6D2C27A }}
          subscription-id: ${{ secrets.AZUREAPPSERVICE_SUBSCRIPTIONID_4CD1D97C506D403E8E284466DB4E7898 }}
      - name: Deploy to Azure Web App
        uses: azure/webapps-deploy@v3
        with:
          app-name: "poweron-preprocessing"
          slot-name: "Production"
          package: . # reuse the downloaded artifact folder

667
README.md

@ -0,0 +1,667 @@
# Gateway Preprocessing - Data Interfaces (Data Model)
## Overview
This project provides two main services with clearly defined data interfaces:
1. **Data Processor Service** (`/dataprocessor`) - Loads data from Power BI, processes it, and stores it in SQLite
2. **Data Query Service** (`/dataquery`) - Allows SQL queries against the stored data
```mermaid
graph LR
A[Power BI Dataset] -->|Data Processor| B[SQLite Database]
B -->|Data Query| C[Client/API Consumer]
style A fill:#f9f,stroke:#333,stroke-width:2px
style B fill:#bbf,stroke:#333,stroke-width:2px
style C fill:#bfb,stroke:#333,stroke-width:2px
```
---
## Data Processor Service
The Data Processor Service processes data from Power BI and stores it in a local SQLite database. The schemas are located in `src/dataprocessor/schemas.py`.
### Data Model Hierarchy
```mermaid
classDiagram
class PreprocessingConfigSchema {
+List~TableConfigSchema~ tables
}
class TableConfigSchema {
+str name
+str powerbi_table_name
+List~str~ measures
+List~str~ group_by_columns
+List~Dict~ steps
}
class UpdateDbWithConfigResponse {
+bool success
+int tables_processed
+List~str~ warnings
}
class UpdateDbResponse {
+bool success
}
PreprocessingConfigSchema "1" *-- "1..*" TableConfigSchema : contains
```
### 1. TableConfigSchema
Defines the configuration for a single table to be read from Power BI and processed.
**Fields:**
| Field | Type | Description | Example |
|------|-----|--------------|----------|
| `name` | `str` | Name of the table in the local SQLite database | `"ProductData"` |
| `powerbi_table_name` | `str` | Name of the source table in the Power BI dataset | `"products_raw"` |
| `measures` | `List[str]` | List of Power BI measures to retrieve (optional) | `["EP in CHF", "Gesamtbetrag"]` |
| `group_by_columns` | `List[str]` | Columns to group by when retrieving measures (triggers SUMMARIZECOLUMNS) | `["m_Artikel", "Kategorie"]` |
| `steps` | `List[Dict[str, Any]]` | List of preprocessing steps | see below |
**Supported preprocessing steps:**
```mermaid
graph TD
A[Preprocessing Steps] --> B[keep]
A --> C[fillna]
A --> D[to_numeric]
A --> E[dropna]
B --> B1["columns: List[str]<br/>Keeps only the listed columns"]
C --> C1["column: str, value: Any<br/>Fills missing values"]
D --> D1["column: str, errors: str<br/>Converts to numeric"]
E --> E1["subset: List[str]<br/>Drops rows with NaN"]
style A fill:#f96,stroke:#333,stroke-width:3px
style B fill:#9cf,stroke:#333,stroke-width:2px
style C fill:#9cf,stroke:#333,stroke-width:2px
style D fill:#9cf,stroke:#333,stroke-width:2px
style E fill:#9cf,stroke:#333,stroke-width:2px
```
**Example:**
```json
{
"name": "ProductData",
"powerbi_table_name": "products_raw",
"measures": ["Total Sales", "Average Price"],
"group_by_columns": ["Category", "Supplier"],
"steps": [
{
"keep": {
"columns": ["ProductID", "ProductName", "Supplier", "Stock", "Unit", "Price"]
}
},
{
"fillna": {
"column": "Supplier",
"value": "Unknown"
}
},
{
"to_numeric": {
"column": "Price",
"errors": "coerce"
}
},
{
"dropna": {
"subset": ["ProductID", "ProductName", "Price"]
}
}
]
}
```
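The semantics of the four steps can be illustrated with a small, dependency-free sketch. It works on a list of dictionaries instead of a pandas DataFrame, and the helper names `apply_steps` and `_is_missing` are hypothetical; the actual service may behave differently in edge cases.

```python
from typing import Any

Row = dict[str, Any]


def _is_missing(value: Any) -> bool:
    """Treat None and float NaN as missing values."""
    return value is None or (isinstance(value, float) and value != value)


def apply_steps(rows: list[Row], steps: list[dict[str, dict[str, Any]]]) -> list[Row]:
    """Apply keep / fillna / to_numeric / dropna in the order given."""
    for step in steps:
        # Each step is a single-key dict: {"<step name>": {<parameters>}}.
        name, params = next(iter(step.items()))
        if name == "keep":
            cols = params["columns"]
            rows = [{c: r.get(c) for c in cols} for r in rows]
        elif name == "fillna":
            col, fill = params["column"], params["value"]
            rows = [{**r, col: fill if _is_missing(r.get(col)) else r[col]} for r in rows]
        elif name == "to_numeric":
            col = params["column"]
            coerce = params.get("errors") == "coerce"
            converted = []
            for r in rows:
                raw = r.get(col)
                if _is_missing(raw):
                    value = None
                else:
                    try:
                        value = float(raw)
                    except (TypeError, ValueError):
                        if not coerce:
                            raise
                        value = None  # "coerce": unparsable values become missing
                converted.append({**r, col: value})
            rows = converted
        elif name == "dropna":
            subset = params["subset"]
            rows = [r for r in rows if not any(_is_missing(r.get(c)) for c in subset)]
        else:
            raise ValueError(f"Unsupported step: {name}")
    return rows


rows = [
    {"ProductID": 1, "ProductName": "Widget A", "Supplier": None, "Price": "12.50", "Extra": "x"},
    {"ProductID": 2, "ProductName": "Gadget B", "Supplier": "TechCo", "Price": "n/a", "Extra": "y"},
]
steps = [
    {"keep": {"columns": ["ProductID", "ProductName", "Supplier", "Price"]}},
    {"fillna": {"column": "Supplier", "value": "Unknown"}},
    {"to_numeric": {"column": "Price", "errors": "coerce"}},
    {"dropna": {"subset": ["ProductID", "ProductName", "Price"]}},
]
cleaned = apply_steps(rows, steps)
print(cleaned)
```

In this sample the second row is dropped because its `Price` cannot be parsed, which mirrors the kind of warning shown for `UpdateDbWithConfigResponse`.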
### 2. PreprocessingConfigSchema
Complete configuration for a preprocessing request. Can process multiple tables in a single request.
**Fields:**
| Field | Type | Description | Validation |
|------|-----|--------------|-------------|
| `tables` | `List[TableConfigSchema]` | List of table configurations | at least 1 table |
**Full request example:**
```json
{
"tables": [
{
"name": "ProductData",
"powerbi_table_name": "products_raw",
"steps": [
{
"keep": {
"columns": ["ProductID", "ProductName", "Supplier", "Stock", "Unit", "Price"]
}
},
{
"fillna": {
"column": "Supplier",
"value": "Unknown"
}
},
{
"to_numeric": {
"column": "Price",
"errors": "coerce"
}
},
{
"dropna": {
"subset": ["ProductID", "ProductName", "Stock", "Unit", "Price"]
}
}
]
},
{
"name": "CustomerData",
"powerbi_table_name": "customers_raw",
"steps": [
{
"keep": {
"columns": ["CustomerID", "Name", "Email", "Region"]
}
},
{
"fillna": {
"column": "Region",
"value": "Unknown"
}
}
]
}
]
}
```
### 3. UpdateDbResponse
Simple response for standard DB updates (YAML-based).
**Fields:**
| Field | Type | Description |
|------|-----|--------------|
| `success` | `bool` | Indicates whether the update succeeded |
**Example:**
```json
{
"success": true
}
```
### 4. UpdateDbWithConfigResponse
Detailed response for JSON-based DB updates with a configuration.
**Fields:**
| Field | Type | Description |
|------|-----|--------------|
| `success` | `bool` | Indicates whether the update succeeded |
| `tables_processed` | `int` | Number of tables processed successfully |
| `warnings` | `List[str]` | List of warnings raised during processing |
**Example:**
```json
{
"success": true,
"tables_processed": 2,
"warnings": [
"Table 'ProductData': 3 rows dropped due to missing values",
"Table 'ProductData': Column 'Price' had 5 non-numeric values coerced to NaN"
]
}
```
### Preprocessing Workflow
```mermaid
sequenceDiagram
participant Client
participant API
participant PowerBI
participant Preprocessor
participant SQLite
Client->>API: POST /dataprocessor/update-db-with-config
Note over Client,API: PreprocessingConfigSchema
loop For each table
API->>PowerBI: Fetch data (powerbi_table_name)
PowerBI-->>API: Raw DataFrame
API->>Preprocessor: Apply preprocessing steps
Preprocessor->>Preprocessor: keep → fillna → to_numeric → dropna
Preprocessor-->>API: Processed DataFrame
API->>SQLite: Save table (name)
end
API-->>Client: UpdateDbWithConfigResponse
Note over Client,API: success, tables_processed, warnings
```
---
## Data Query Service
The Data Query Service allows SQL queries against the stored SQLite database. The schemas are located in `src/dataquery/schemas.py`.
### Data Model Hierarchy
```mermaid
classDiagram
class SqlQueryRequest {
+str query
}
class SqlQueryResponse {
+bool success
+List~Dict~ data
+List~str~ columns
+int row_count
+Optional~str~ message
}
class DatabaseSchemaResponse {
+bool success
+List~TableInfo~ tables
+int table_count
}
class TableInfo {
+str name
+int row_count
}
class TableSchemaResponse {
+bool success
+str table_name
+List~ColumnInfo~ columns
+int row_count
+List~Dict~ sample_data
}
class ColumnInfo {
+str name
+str type
+bool nullable
+bool primary_key
}
DatabaseSchemaResponse "1" *-- "*" TableInfo : contains
TableSchemaResponse "1" *-- "*" ColumnInfo : contains
```
### 1. SqlQueryRequest
Request schema for SQL queries.
**Fields:**
| Field | Type | Description | Validation |
|------|-----|--------------|-------------|
| `query` | `str` | SQL query (SELECT only) | at least 1 character |
**Example:**
```json
{
"query": "SELECT ProductID, ProductName, Price FROM ProductData WHERE Price > 10 ORDER BY Price DESC LIMIT 100"
}
```
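A client could build such a request as sketched below, using only the standard library. The base URL is a placeholder, and any authentication header the deployment requires is omitted because its name is not documented here; both are assumptions to adapt.

```python
import json
import urllib.request

# Hypothetical base URL; replace with the address of the actual deployment.
BASE_URL = "http://localhost:8000"


def build_query_request(sql: str) -> urllib.request.Request:
    """Build (but do not send) a POST request carrying a SqlQueryRequest body."""
    payload = json.dumps({"query": sql}).encode("utf-8")
    return urllib.request.Request(
        url=f"{BASE_URL}/dataquery/query",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_query_request(
    "SELECT ProductID, ProductName, Price FROM ProductData WHERE Price > 10 LIMIT 100"
)
# Sending is left out so the sketch runs offline; passing `request` to
# urllib.request.urlopen() would perform the call.
print(request.get_method(), request.full_url)
```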
### 2. SqlQueryResponse
Response schema for SQL query results.
**Fields:**
| Field | Type | Description |
|------|-----|--------------|
| `success` | `bool` | Indicates whether the query executed successfully |
| `data` | `List[Dict[str, Any]]` | Query results as a list of dictionaries |
| `columns` | `List[str]` | Column names in the result |
| `row_count` | `int` | Number of rows returned |
| `message` | `Optional[str]` | Additional information or error message |
**Example:**
```json
{
"success": true,
"columns": ["ProductID", "ProductName", "Price"],
"row_count": 3,
"data": [
{
"ProductID": 42,
"ProductName": "Premium Widget",
"Price": 99.99
},
{
"ProductID": 15,
"ProductName": "Deluxe Gadget",
"Price": 79.50
},
{
"ProductID": 8,
"ProductName": "Standard Tool",
"Price": 45.00
}
],
"message": null
}
```
### 3. TableInfo
Information about a database table.
**Fields:**
| Field | Type | Description |
|------|-----|--------------|
| `name` | `str` | Table name |
| `row_count` | `int` | Number of rows in the table |
### 4. DatabaseSchemaResponse
Response schema for database schema information.
**Fields:**
| Field | Type | Description |
|------|-----|--------------|
| `success` | `bool` | Indicates whether the schema query succeeded |
| `tables` | `List[TableInfo]` | List of all tables in the database |
| `table_count` | `int` | Total number of tables |
**Example:**
```json
{
"success": true,
"tables": [
{
"name": "ProductData",
"row_count": 1523
},
{
"name": "CustomerData",
"row_count": 847
},
{
"name": "OrderData",
"row_count": 3421
}
],
"table_count": 3
}
```
### 5. ColumnInfo
Information about a table column.
**Fields:**
| Field | Type | Description |
|------|-----|--------------|
| `name` | `str` | Column name |
| `type` | `str` | Data type of the column |
| `nullable` | `bool` | Whether the column can contain NULL values |
| `primary_key` | `bool` | Whether the column is a primary key |
### 6. TableSchemaResponse
Response schema for detailed table structure information.
**Fields:**
| Field | Type | Description |
|------|-----|--------------|
| `success` | `bool` | Indicates whether the schema query succeeded |
| `table_name` | `str` | Name of the table |
| `columns` | `List[ColumnInfo]` | List of all columns in the table |
| `row_count` | `int` | Number of rows in the table |
| `sample_data` | `List[Dict[str, Any]]` | Sample data (up to 5 rows) |
**Example:**
```json
{
"success": true,
"table_name": "ProductData",
"columns": [
{
"name": "ProductID",
"type": "INTEGER",
"nullable": false,
"primary_key": true
},
{
"name": "ProductName",
"type": "TEXT",
"nullable": false,
"primary_key": false
},
{
"name": "Supplier",
"type": "TEXT",
"nullable": true,
"primary_key": false
},
{
"name": "Price",
"type": "REAL",
"nullable": true,
"primary_key": false
}
],
"row_count": 1523,
"sample_data": [
{
"ProductID": 1,
"ProductName": "Widget A",
"Supplier": "Acme Corp",
"Price": 12.50
},
{
"ProductID": 2,
"ProductName": "Gadget B",
"Supplier": "TechCo",
"Price": 24.99
},
{
"ProductID": 3,
"ProductName": "Tool C",
"Supplier": "Unknown",
"Price": 8.75
}
]
}
```
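One way the `ColumnInfo` fields could be derived from SQLite is via `PRAGMA table_info`, sketched below with the standard `sqlite3` module. The function name `describe_table` is hypothetical, and the service itself may obtain this information differently.

```python
import sqlite3


def describe_table(conn: sqlite3.Connection, table_name: str) -> list[dict]:
    """Return ColumnInfo-like dicts for one table using PRAGMA table_info."""
    # PRAGMA arguments cannot be bound as parameters, so quote the identifier.
    quoted = '"' + table_name.replace('"', '""') + '"'
    rows = conn.execute(f"PRAGMA table_info({quoted})").fetchall()
    # Each row is (cid, name, type, notnull, dflt_value, pk).
    return [
        {
            "name": name,
            "type": col_type,
            "nullable": not notnull,
            "primary_key": pk > 0,
        }
        for _cid, name, col_type, notnull, _default, pk in rows
    ]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ProductData ("
    "ProductID INTEGER NOT NULL PRIMARY KEY, "
    "ProductName TEXT NOT NULL, "
    "Supplier TEXT, "
    "Price REAL)"
)
columns = describe_table(conn, "ProductData")
for column in columns:
    print(column)
```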
### Query Workflow
```mermaid
sequenceDiagram
participant Client
participant API
participant SQLite
rect rgb(200, 220, 250)
Note over Client,SQLite: Scenario 1: Run a SQL query
Client->>API: POST /dataquery/query
Note over Client,API: SqlQueryRequest
API->>SQLite: Run SELECT
SQLite-->>API: Results
API-->>Client: SqlQueryResponse
Note over Client,API: data, columns, row_count
end
rect rgb(220, 250, 220)
Note over Client,SQLite: Scenario 2: Fetch the database schema
Client->>API: GET /dataquery/schema
API->>SQLite: List tables
SQLite-->>API: Table list
API-->>Client: DatabaseSchemaResponse
Note over Client,API: tables, table_count
end
rect rgb(250, 220, 220)
Note over Client,SQLite: Scenario 3: Fetch the table structure
Client->>API: GET /dataquery/table/{table_name}
API->>SQLite: Fetch columns + sample data
SQLite-->>API: Structure + data
API-->>Client: TableSchemaResponse
Note over Client,API: columns, sample_data
end
```
---
## Overall Architecture
```mermaid
graph TB
subgraph "External Data Source"
PBI[Power BI Dataset]
end
subgraph "Gateway Preprocessing API"
subgraph "Data Processor Service"
DP_Router[Router]
DP_Service[Service]
DP_Schemas[Schemas]
DP_Domain[Domain Layer]
end
subgraph "Data Query Service"
DQ_Router[Router]
DQ_Service[Service]
DQ_Schemas[Schemas]
end
end
subgraph "Local Storage"
DB[(SQLite Database)]
end
subgraph "Clients"
Client1[Web App]
Client2[BI Tool]
Client3[API Consumer]
end
PBI -->|Power BI Reader| DP_Domain
DP_Router --> DP_Service
DP_Service --> DP_Domain
DP_Domain -->|Save| DB
DQ_Router --> DQ_Service
DQ_Service -->|Query| DB
Client1 -->|POST /dataprocessor/update-db-with-config| DP_Router
Client2 -->|POST /dataquery/query| DQ_Router
Client3 -->|GET /dataquery/schema| DQ_Router
style PBI fill:#f9f,stroke:#333,stroke-width:2px
style DB fill:#bbf,stroke:#333,stroke-width:3px
style DP_Schemas fill:#ffa,stroke:#333,stroke-width:2px
style DQ_Schemas fill:#ffa,stroke:#333,stroke-width:2px
```
---
## API Endpoints Overview
### Data Processor Endpoints
| Method | Endpoint | Request Schema | Response Schema | Description |
|---------|----------|----------------|-----------------|--------------|
| POST | `/dataprocessor/update-db` | - | `UpdateDbResponse` | Updates the DB using the YAML config |
| POST | `/dataprocessor/update-db-with-config` | `PreprocessingConfigSchema` | `UpdateDbWithConfigResponse` | Updates the DB using a JSON config |
### Data Query Endpoints
| Method | Endpoint | Request Schema | Response Schema | Description |
|---------|----------|----------------|-----------------|--------------|
| POST | `/dataquery/query` | `SqlQueryRequest` | `SqlQueryResponse` | Executes a SQL query |
| GET | `/dataquery/schema` | - | `DatabaseSchemaResponse` | Returns the DB schema |
| GET | `/dataquery/table/{table_name}` | - | `TableSchemaResponse` | Returns the table structure |
---
## Typical Use Cases
### Use Case 1: Load and process data from Power BI
```mermaid
flowchart LR
A[Client] -->|1. POST PreprocessingConfigSchema| B[API]
B -->|2. Fetch data| C[Power BI]
C -->|3. Raw Data| B
B -->|4. Preprocessing| B
B -->|5. Save| D[(SQLite)]
B -->|6. UpdateDbWithConfigResponse| A
style A fill:#bfb,stroke:#333,stroke-width:2px
style C fill:#f9f,stroke:#333,stroke-width:2px
style D fill:#bbf,stroke:#333,stroke-width:2px
```
### Use Case 2: Query data
```mermaid
flowchart LR
A[Client] -->|1. POST SqlQueryRequest| B[API]
B -->|2. SELECT Query| C[(SQLite)]
C -->|3. Results| B
B -->|4. SqlQueryResponse| A
style A fill:#bfb,stroke:#333,stroke-width:2px
style C fill:#bbf,stroke:#333,stroke-width:2px
```
### Use Case 3: Explore the database structure
```mermaid
flowchart TD
A[Client] -->|1. GET /schema| B[API]
B -->|2. DatabaseSchemaResponse| A
A -->|3. GET /table/ProductData| B
B -->|4. TableSchemaResponse| A
A -->|5. POST SqlQueryRequest| B
B -->|6. SqlQueryResponse| A
style A fill:#bfb,stroke:#333,stroke-width:2px
```
---
## Validation and Error Handling
All schemas use Pydantic for automatic validation:
- **Type safety**: All fields are checked for correct types
- **Required fields**: Mandatory fields must be present
- **Min/max validation**: e.g. `min_items=1` for `tables`
- **String validation**: e.g. `min_length=1` for SQL queries
- **Custom validation**: Additional business logic in the services
On validation errors, the API returns HTTP 422 (Unprocessable Entity) with detailed error messages.


@ -4,6 +4,7 @@ from dataclasses import dataclass
 from src.settings import settings
 import pandas as pd
 import httpx
+import re
 @dataclass
@ -15,6 +16,9 @@ class PowerBIReader:
     include_nulls: bool = True
     measures: list[str] = None
     group_by_columns: list[str] = None
+    batch_size: int = 10000
+    order_by_column: str | None = None
+    max_batches: int = 100
     @classmethod
     async def create(
@ -25,14 +29,20 @@ class PowerBIReader:
         table_name: str,
         measures: list[str] = None,
         group_by_columns: list[str] = None,
+        batch_size: int = 10000,
+        order_by_column: str | None = None,
+        max_batches: int = 100,
         **kwargs,
-    ):
+    ) -> "PowerBIReader":
         return cls(
             dataset_id=dataset_id,
             access_token=access_token,
             table_name=table_name,
             measures=measures or [],
             group_by_columns=group_by_columns or [],
+            batch_size=batch_size,
+            order_by_column=order_by_column,
+            max_batches=max_batches,
             **kwargs,
         )
@ -54,8 +64,10 @@ class PowerBIReader:
         Returns:
             DAX query string to execute against Power BI.
         """
+        # Remove XML/HTML tags from table name (e.g., '<oii>Artikel</oii>' -> 'Artikel')
+        cleaned_table_name = re.sub(r'<[^>]+>', '', self.table_name).strip()
         # Escape single quotes in table names per DAX rules
-        safe_table = self.table_name.replace("'", "''")
+        safe_table = cleaned_table_name.replace("'", "''")
         # Case 1: No measures - simple table evaluation
         if not self.measures:
@ -77,14 +89,94 @@ class PowerBIReader:
         )
         return f"EVALUATE SUMMARIZECOLUMNS({group_cols}, {measure_clauses})"
-    async def read_data(self) -> pd.DataFrame:
+    @staticmethod
+    def _is_numeric_value(value: str | int | float | None) -> bool:
+        """Check if a value is numeric (including numeric strings).
+        Args:
+            value: The value to check.
+        Returns:
+            True if the value is numeric or a string that represents a number.
         """
-        Calls Power BI REST API: POST /datasets/{datasetId}/executeQueries
-        with DAX: EVALUATE 'TableName' and returns a DataFrame.
+        if value is None:
+            return False
+        if isinstance(value, (int, float)):
+            return True
+        if isinstance(value, str):
+            try:
+                float(value)
+                return True
+            except ValueError:
+                return False
+        return False
+    def _dax_query_batch(self, last_value: str | int | float | None = None) -> str:
+        """Generate a batched DAX query using TOPN and keyset pagination.
+        Uses ORDER BY with the order_by_column for deterministic ordering,
+        and FILTER to skip already-fetched rows based on the last seen value.
+        Args:
+            last_value: The last value of order_by_column from the previous batch.
+                None for the first batch.
+        Returns:
+            DAX query string for fetching the next batch.
         """
+        # Remove XML/HTML tags from table name (e.g., '<oii>Artikel</oii>' -> 'Artikel')
+        cleaned_table_name = re.sub(r'<[^>]+>', '', self.table_name).strip()
+        # Escape single quotes in table names per DAX rules
+        safe_table = cleaned_table_name.replace("'", "''")
+        order_col = self.order_by_column
+        if last_value is None:
+            # First batch: just use TOPN with ORDER BY
+            return (
+                f"EVALUATE TOPN({self.batch_size}, '{safe_table}', "
+                f"'{safe_table}'[{order_col}], ASC)"
+            )
+        # Subsequent batches: filter rows where order_col > last_value
+        # IMPORTANT: Handle type conversion to avoid DAX type mismatch errors
+        # Use a type-safe approach: convert column using IF/ISNUMBER to handle both text and numeric columns
+        if self._is_numeric_value(last_value):
+            numeric_val = float(last_value) if '.' in str(last_value) else int(last_value)
+            # Use IF to check if column is numeric, if so use it directly, otherwise convert with VALUE()
+            # This handles both text columns (VALUE converts) and numeric columns (use directly)
+            return (
+                f"EVALUATE TOPN({self.batch_size}, "
+                f"FILTER('{safe_table}', "
+                f"NOT(ISBLANK('{safe_table}'[{order_col}])) && "
+                f"IF(ISNUMBER('{safe_table}'[{order_col}]), '{safe_table}'[{order_col}], VALUE('{safe_table}'[{order_col}])) > {numeric_val}), "
+                f"IF(ISNUMBER('{safe_table}'[{order_col}]), '{safe_table}'[{order_col}], VALUE('{safe_table}'[{order_col}])), ASC)"
+            )
+        else:
+            # String comparison - escape quotes in the value
+            escaped_value = str(last_value).replace('"', '""')
+            return (
+                f"EVALUATE TOPN({self.batch_size}, "
+                f"FILTER('{safe_table}', '{safe_table}'[{order_col}] > \"{escaped_value}\"), "
+                f"'{safe_table}'[{order_col}], ASC)"
+            )
+    async def _execute_query(self, dax_query: str) -> pd.DataFrame:
+        """Execute a DAX query and return the results as a DataFrame.
+        Args:
+            dax_query: The DAX query string to execute.
+        Returns:
+            DataFrame containing the query results.
+        """
+        # Log the DAX query for debugging
+        import logging
+        logger = logging.getLogger(__name__)
+        logger.info(f"Executing DAX query: {dax_query}")
         url = f"{self.base_url}/datasets/{self.dataset_id}/executeQueries"
         body = {
-            "queries": [{"query": self._dax_query()}],
+            "queries": [{"query": dax_query}],
             "serializerSettings": {"includeNulls": self.include_nulls},
         }
@ -97,6 +189,7 @@ class PowerBIReader:
             resp = await client.post(url, headers=headers, json=body)
         if resp.status_code != 200:
+            logger.error(f"DAX query failed: {dax_query}")
             raise RuntimeError(
                 f"Power BI executeQueries failed: {resp.status_code} - {resp.text}"
             )
@ -121,6 +214,116 @@ class PowerBIReader:
         df.columns = [_strip_qual(c) for c in df.columns]
         return df
+    async def read_data(self) -> pd.DataFrame:
+        """Fetch data from Power BI, using batching if order_by_column is set.
+        If order_by_column is configured, fetches data in batches using
+        keyset pagination to avoid the Power BI API's 1M value limit.
+        Otherwise, fetches all data in a single query (legacy behavior).
+        Returns:
+            DataFrame containing all fetched data.
+        """
+        # Legacy behavior: no batching if order_by_column not set
+        if not self.order_by_column:
+            return await self._execute_query(self._dax_query())
+        # Batch fetching with keyset pagination
+        import logging
+        logger = logging.getLogger(__name__)
+        all_dfs: list[pd.DataFrame] = []
+        last_value: str | int | None = None
+        batch_num = 0
+        total_rows = 0
+        while True:
+            batch_num += 1
+            # Safety limit to prevent runaway requests
+            if batch_num > self.max_batches:
+                logger.warning(
+                    f"Reached max_batches limit ({self.max_batches}) for table "
+                    f"'{self.table_name}'. Stopping batch fetch. "
+                    f"Total rows fetched so far: {total_rows}"
+                )
+                break
+            dax_query = self._dax_query_batch(last_value)
+            df = await self._execute_query(dax_query)
+            if df.empty:
+                logger.info(f"Batch {batch_num}: Empty result, stopping fetch")
+                break
+            batch_rows = len(df)
+            total_rows += batch_rows
+            # Log batch info
+            if self.order_by_column in df.columns:
+                min_val = df[self.order_by_column].min()
+                max_val = df[self.order_by_column].max()
+                logger.info(
+                    f"Batch {batch_num}: Fetched {batch_rows} rows, "
+                    f"{self.order_by_column} range: {min_val} to {max_val}, "
+                    f"Total so far: {total_rows}"
+                )
+            else:
+                logger.info(f"Batch {batch_num}: Fetched {batch_rows} rows, Total so far: {total_rows}")
+            all_dfs.append(df)
+            # Get the maximum value for the next batch (not just the last row)
+            # This ensures we don't skip rows when there are multiple rows per ID
+            # Using max() instead of iloc[-1] because rows are ordered by I_ID,
+            # but if there are multiple rows with the same I_ID, the last row might
+            # have a smaller I_ID than the maximum in the batch
+            max_value = df[self.order_by_column].max()
+            # Ensure numeric values are preserved as numeric (not converted to string)
+            # Check if the column is numeric and convert the value accordingly
+            if pd.api.types.is_numeric_dtype(df[self.order_by_column].dtype):
+                # Column is numeric - ensure value is numeric
+                if pd.isna(max_value):
+                    logger.info(f"Batch {batch_num}: Max value is NaN, stopping")
+                    break # Can't continue with NaN
+                # Convert to appropriate numeric type
+                if pd.api.types.is_integer_dtype(df[self.order_by_column].dtype):
+                    new_last_value = int(max_value)
+                else:
+                    new_last_value = float(max_value)
+            else:
+                new_last_value = max_value
+            # Safety check: if last_value didn't change, we're stuck in a loop
+            if new_last_value == last_value:
+                logger.warning(
+                    f"Batch {batch_num}: last_value didn't change ({last_value}), "
+                    f"stopping to avoid infinite loop"
+                )
+                break
+            last_value = new_last_value
+        logger.info(f"Finished fetching batches for table '{self.table_name}': {batch_num} batches, {total_rows} total rows")
+        if not all_dfs:
+            return pd.DataFrame()
+        result = pd.concat(all_dfs, ignore_index=True)
+        # Log statistics
+        if self.order_by_column and self.order_by_column in result.columns:
+            unique_ids = result[self.order_by_column].nunique()
+            total_rows_final = len(result)
+            logger.info(
+                f"Table '{self.table_name}': {total_rows_final} total rows, "
+                f"{unique_ids} unique {self.order_by_column} values, "
+                f"average {total_rows_final / unique_ids:.2f} rows per {self.order_by_column}"
+            )
+        return result
     @staticmethod
     def _get_access_token_sync(
         tenant_id: str,


@ -149,6 +149,9 @@ class TableConfig:
     powerbi_table_name: str
     measures: List[str] = field(default_factory=list)
     group_by_columns: List[str] = field(default_factory=list)
+    batch_size: int = 10000
+    order_by_column: str | None = None
+    max_batches: int = 100
     steps: List[Dict[str, Any]] = field(default_factory=list)
@ -259,6 +262,9 @@ class Preprocessor:
                 powerbi_table_name=table_data.get("powerbi_table_name", ""),
                 measures=table_data.get("measures", []),
                 group_by_columns=table_data.get("group_by_columns", []),
+                batch_size=table_data.get("batch_size", 10000),
+                order_by_column=table_data.get("order_by_column"),
+                max_batches=table_data.get("max_batches", 100),
                 steps=table_data.get("steps", []),
             )
             table_configs.append(table_config)


@ -41,6 +41,21 @@ class TableConfigSchema(BaseModel):
         description="Columns to group by when retrieving measures (triggers SUMMARIZECOLUMNS)",
         example=["m_Artikel"],
     )
+    batch_size: int = Field(
+        default=10000,
+        description="Number of rows to fetch per batch (for large tables)",
+        example=10000,
+    )
+    order_by_column: str | None = Field(
+        default=None,
+        description="Column to order by for batch fetching. Required for batching to work.",
+        example="I_ID",
+    )
+    max_batches: int = Field(
+        default=100,
+        description="Maximum number of batches to fetch. Safety limit to prevent runaway requests.",
+        example=100,
+    )
     steps: List[Dict[str, Any]] = Field(
         default_factory=list,
         description="List of preprocessing steps to apply",


@ -56,6 +56,8 @@ class DataProcessorService:
                 table_name=table_config.powerbi_table_name,
                 measures=table_config.measures,
                 group_by_columns=table_config.group_by_columns,
+                batch_size=getattr(table_config, 'batch_size', 10000),
+                order_by_column=getattr(table_config, 'order_by_column', None),
             )
             # Step 2: Read data from Power BI
@ -151,6 +153,9 @@ class DataProcessorService:
                 table_name=table_config.powerbi_table_name,
                 measures=table_config.measures,
                 group_by_columns=table_config.group_by_columns,
+                batch_size=table_config.batch_size,
+                order_by_column=table_config.order_by_column,
+                max_batches=table_config.max_batches,
             )
             # Step 2: Read data from Power BI


@ -71,6 +71,53 @@ class DataQueryService:
         return True
+    def _enforce_query_limit(self, *, query: str) -> str:
+        """Enforce maximum row limit on query.
+        If query has LIMIT > SQL_ROW_LIMIT, replace with SQL_ROW_LIMIT.
+        If query has no LIMIT, append LIMIT SQL_ROW_LIMIT.
+        If query has LIMIT <= SQL_ROW_LIMIT, keep as is.
+        Args:
+            query: The SQL query to enforce limit on.
+        Returns:
+            Query with enforced limit.
+        """
+        max_limit = settings.SQL_ROW_LIMIT
+        # Strip trailing semicolons and whitespace to prevent multi-statement errors
+        query = query.rstrip("; \t\n\r")
+        # Remove comments and normalize whitespace for parsing
+        cleaned_query = re.sub(r"--.*$", "", query, flags=re.MULTILINE)
+        cleaned_query = re.sub(r"/\*.*?\*/", "", cleaned_query, flags=re.DOTALL)
+        # Look for LIMIT clause (case insensitive)
+        # Pattern matches: LIMIT <number> or LIMIT <number> OFFSET <number>
+        limit_pattern = r"\bLIMIT\s+(\d+)(\s+OFFSET\s+\d+)?\s*$"
+        match = re.search(limit_pattern, cleaned_query, re.IGNORECASE)
+        if match:
+            # Extract the current limit value
+            current_limit = int(match.group(1))
+            if current_limit > max_limit:
+                # Replace with max_limit while preserving OFFSET if present
+                offset_clause = match.group(2) or ""
+                # Find the position in the original query to replace
+                # Use the original query to preserve formatting
+                original_match = re.search(limit_pattern, query, re.IGNORECASE)
+                if original_match:
+                    new_limit_clause = f"LIMIT {max_limit}{offset_clause}"
+                    query = query[: original_match.start()] + new_limit_clause
+            # If current_limit <= max_limit, keep query as is
+        else:
+            # No LIMIT clause found, append one
+            query = f"{query.rstrip()} LIMIT {max_limit}"
+        return query
     async def execute_query(self, *, query: str) -> SqlQueryResponse:
         """Execute a SELECT query and return the results.
@ -90,6 +137,9 @@ class DataQueryService:
                 message="Only SELECT queries are allowed",
             )
+        # Enforce row limit on the query
+        query = self._enforce_query_limit(query=query)
         try:
             async with self.engine.begin() as conn:
                 result = await conn.execute(text(query))
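The effect of the row-limit enforcement is easiest to see on a few inputs. The sketch below restates the idea as a standalone function; `enforce_limit` and its `max_limit` parameter are hypothetical stand-ins for `_enforce_query_limit` and `settings.SQL_ROW_LIMIT`, and the comment-stripping step is left out for brevity.

```python
import re

# A trailing "LIMIT n", optionally followed by "OFFSET m".
LIMIT_PATTERN = re.compile(r"\bLIMIT\s+(\d+)(\s+OFFSET\s+\d+)?\s*$", re.IGNORECASE)


def enforce_limit(query: str, max_limit: int) -> str:
    """Cap a trailing LIMIT at max_limit, or append one if it is missing."""
    query = query.rstrip("; \t\n\r")  # drop trailing semicolons and whitespace
    match = LIMIT_PATTERN.search(query)
    if match is None:
        return f"{query} LIMIT {max_limit}"
    if int(match.group(1)) <= max_limit:
        return query
    offset = match.group(2) or ""
    return f"{query[:match.start()]}LIMIT {max_limit}{offset}"


for sql in (
    "SELECT * FROM ProductData;",
    "SELECT * FROM ProductData LIMIT 100 OFFSET 5",
    "select * from ProductData limit 10",
):
    print(enforce_limit(sql, 50))
```

One caveat that appears to apply to the pattern in the hunk as well: SQLite's `LIMIT <offset>, <count>` form is not matched, so such a query would get a second `LIMIT` appended and fail to parse.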


@ -30,6 +30,14 @@ class Settings(BaseSettings):
         description="Path to the SQLite database.",
     )
+    # --- Database Query Settings ---
+    # Maximum number of rows to return from SQL queries.
+    SQL_ROW_LIMIT: int = Field(
+        default=50,
+        description="Maximum number of rows to return from SQL queries. Defaults to 50.",
+    )
     # --- API Keys ---
     # Preprocessor API key to access this app.

@ -0,0 +1 @@
"""Data query tests."""