Compare commits
24 commits
feat/file-
...
main
| Author | SHA1 | Date |
|---|---|---|
| | 8b2ac4d467 | |
| | 21d84a099c | |
| | b9e73a728d | |
| | 9bca05dea2 | |
| | 3a28fbd5e6 | |
| | c2f11b92fd | |
| | f5f8dfcb80 | |
| | af6150e2fb | |
| | 5b8daa4e49 | |
| | 3fbb41b980 | |
| | 471ad42912 | |
| | dde61f447d | |
| | e7dd3ea999 | |
| | d9deb09d1b | |
| | 837e0c1d29 | |
| | 40fc3db398 | |
| | 9fb20a6cda | |
| | 9247e88dc3 | |
| | 91bffe884d | |
| | 21c83fbbac | |
| | 8391681743 | |
| | 2dc215e197 | |
| | 0dca581300 | |
| | 30f4b0e781 | |
12 changed files with 1107 additions and 87 deletions
70
.github/workflows/main_poweron-preprocessing.yml
vendored
|
|
@ -1,70 +0,0 @@
name: Build and deploy Python app to Azure Web App - poweron-preprocessing

on:
  push:
    branches: [main]
  workflow_dispatch:

jobs:
  build:
    runs-on: ubuntu-latest
    permissions:
      contents: read
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      # ---------- BACKEND / PYTHON ----------
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.13"

      - name: Install uv
        uses: astral-sh/setup-uv@v6

      # ---------- ARTIFACT ----------
      - name: Upload artifact for deployment jobs
        uses: actions/upload-artifact@v4
        with:
          name: python-app
          path: |
            .
            !venv/
            !.venv/
            !.git/
            !.cache/

  deploy:
    runs-on: ubuntu-latest
    needs: build
    permissions:
      id-token: write # <-- required for OIDC
      contents: read

    steps:
      - name: Download artifact from build job
        uses: actions/download-artifact@v4
        with:
          name: python-app

      - name: Login to Azure (OIDC)
        uses: azure/login@v2
        with:
          client-id: ${{ secrets.AZUREAPPSERVICE_CLIENTID_AA4B9998A69E4C5C8FDF357E3FEAADD5 }}
          tenant-id: ${{ secrets.AZUREAPPSERVICE_TENANTID_CC57AD1F29D44DDA960AE3EAC6D2C27A }}
          subscription-id: ${{ secrets.AZUREAPPSERVICE_SUBSCRIPTIONID_4CD1D97C506D403E8E284466DB4E7898 }}

      # Runtime env files (required in your setup)
      - name: Set productive environment (runtime)
        run: |
          set -euo pipefail
          cp env_prod.env .env

      - name: Deploy to Azure Web App
        uses: azure/webapps-deploy@v3
        with:
          app-name: "poweron-preprocessing"
          slot-name: "Production"
          package: . # reuse the downloaded artifact folder
          # no publish-profile here; it will use the az login from azure/login
667
README.md
|
|
@ -0,0 +1,667 @@
# Gateway Preprocessing - Data Interfaces (Data Model)

## Overview

This project provides two main services with clearly defined data interfaces:

1. **Data Processor Service** (`/dataprocessor`) - Loads data from Power BI, processes it, and stores it in SQLite
2. **Data Query Service** (`/dataquery`) - Allows SQL queries against the stored data

```mermaid
graph LR
    A[Power BI Dataset] -->|Data Processor| B[SQLite Database]
    B -->|Data Query| C[Client/API Consumer]

    style A fill:#f9f,stroke:#333,stroke-width:2px
    style B fill:#bbf,stroke:#333,stroke-width:2px
    style C fill:#bfb,stroke:#333,stroke-width:2px
```

---

## Data Processor Service

The Data Processor Service processes data from Power BI and stores it in a local SQLite database. The schemas are defined in `src/dataprocessor/schemas.py`.

### Data Model Hierarchy

```mermaid
classDiagram
    class PreprocessingConfigSchema {
        +List~TableConfigSchema~ tables
    }

    class TableConfigSchema {
        +str name
        +str powerbi_table_name
        +List~str~ measures
        +List~str~ group_by_columns
        +List~Dict~ steps
    }

    class UpdateDbWithConfigResponse {
        +bool success
        +int tables_processed
        +List~str~ warnings
    }

    class UpdateDbResponse {
        +bool success
    }

    PreprocessingConfigSchema "1" *-- "1..*" TableConfigSchema : contains
```

### 1. TableConfigSchema

Defines the configuration for a single table that is read from Power BI and processed.

**Fields:**

| Field | Type | Description | Example |
|------|-----|--------------|----------|
| `name` | `str` | Name of the table in the local SQLite database | `"ProductData"` |
| `powerbi_table_name` | `str` | Name of the source table in the Power BI dataset | `"products_raw"` |
| `measures` | `List[str]` | List of Power BI measures to fetch (optional) | `["EP in CHF", "Gesamtbetrag"]` |
| `group_by_columns` | `List[str]` | Columns to group by when measures are requested (triggers a `SUMMARIZECOLUMNS` query) | `["m_Artikel", "Kategorie"]` |
| `steps` | `List[Dict[str, Any]]` | List of preprocessing steps | see below |
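
How `measures` and `group_by_columns` select the DAX query can be sketched as follows. This is a standalone approximation of the `_dax_query` method that appears in this change set's `PowerBIReader` diff; the function name `build_dax_query` is hypothetical and not part of the project. Note that, following that logic, `group_by_columns` has no effect when no measures are given.

```python
import re


def build_dax_query(table_name, measures=None, group_by_columns=None):
    """Return a DAX query for one table configuration (illustrative helper)."""
    # Strip XML/HTML-like tags, then escape single quotes per DAX rules.
    cleaned = re.sub(r"<[^>]+>", "", table_name).strip()
    safe_table = cleaned.replace("'", "''")
    measures = measures or []
    group_by_columns = group_by_columns or []

    # Case 1: no measures, evaluate the whole table.
    if not measures:
        return f"EVALUATE '{safe_table}'"

    measure_clauses = ", ".join(f'"{m}", [{m}]' for m in measures)

    # Case 2: measures without grouping, add them as extra columns.
    if not group_by_columns:
        return f"EVALUATE ADDCOLUMNS('{safe_table}', {measure_clauses})"

    # Case 3: measures with grouping, aggregate per group.
    group_cols = ", ".join(f"'{safe_table}'[{c}]" for c in group_by_columns)
    return f"EVALUATE SUMMARIZECOLUMNS({group_cols}, {measure_clauses})"


print(build_dax_query("products_raw", ["Total Sales"], ["Category"]))
```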

**Supported preprocessing steps:**

```mermaid
graph TD
    A[Preprocessing Steps] --> B[keep]
    A --> C[fillna]
    A --> D[to_numeric]
    A --> E[dropna]

    B --> B1["columns: List[str]<br/>Keeps only the listed columns"]
    C --> C1["column: str, value: Any<br/>Fills missing values"]
    D --> D1["column: str, errors: str<br/>Converts to numeric"]
    E --> E1["subset: List[str]<br/>Drops rows containing NaN"]

    style A fill:#f96,stroke:#333,stroke-width:3px
    style B fill:#9cf,stroke:#333,stroke-width:2px
    style C fill:#9cf,stroke:#333,stroke-width:2px
    style D fill:#9cf,stroke:#333,stroke-width:2px
    style E fill:#9cf,stroke:#333,stroke-width:2px
```

**Example:**

```json
{
  "name": "ProductData",
  "powerbi_table_name": "products_raw",
  "measures": ["Total Sales", "Average Price"],
  "group_by_columns": ["Category", "Supplier"],
  "steps": [
    {
      "keep": {
        "columns": ["ProductID", "ProductName", "Supplier", "Stock", "Unit", "Price"]
      }
    },
    {
      "fillna": {
        "column": "Supplier",
        "value": "Unknown"
      }
    },
    {
      "to_numeric": {
        "column": "Price",
        "errors": "coerce"
      }
    },
    {
      "dropna": {
        "subset": ["ProductID", "ProductName", "Price"]
      }
    }
  ]
}
```
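
To make the semantics of the four steps concrete, here is a minimal sketch that applies them to a list of dict rows using only the standard library. It is not the project's implementation (the step names suggest the service works on pandas DataFrames); `apply_steps` and `_to_number` are hypothetical names, missing values are modelled as `None`, and `"errors": "coerce"` is assumed to turn unparseable values into missing ones.

```python
def _to_number(value, errors):
    """Convert a value to float; with errors='coerce', bad values become None."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        if errors == "coerce":
            return None
        raise


def apply_steps(rows, steps):
    """Apply keep / fillna / to_numeric / dropna steps in the given order."""
    for step in steps:
        # Each step is a single-key dict: {"<operation>": {<arguments>}}.
        ((operation, args),) = step.items()
        if operation == "keep":
            rows = [{c: row.get(c) for c in args["columns"]} for row in rows]
        elif operation == "fillna":
            col, value = args["column"], args["value"]
            rows = [
                {**row, col: value if row.get(col) is None else row[col]}
                for row in rows
            ]
        elif operation == "to_numeric":
            col, errors = args["column"], args.get("errors", "raise")
            rows = [{**row, col: _to_number(row.get(col), errors)} for row in rows]
        elif operation == "dropna":
            subset = args["subset"]
            rows = [r for r in rows if all(r.get(c) is not None for c in subset)]
        else:
            raise ValueError(f"unsupported step: {operation}")
    return rows
```

Because the steps run in list order, placing `dropna` after `to_numeric` also removes rows whose value could not be converted.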

### 2. PreprocessingConfigSchema

Complete configuration for one preprocessing request. It can process several tables in a single request.

**Fields:**

| Field | Type | Description | Validation |
|------|-----|--------------|-------------|
| `tables` | `List[TableConfigSchema]` | List of table configurations | at least 1 table |

**Complete request example:**

```json
{
  "tables": [
    {
      "name": "ProductData",
      "powerbi_table_name": "products_raw",
      "steps": [
        {
          "keep": {
            "columns": ["ProductID", "ProductName", "Supplier", "Stock", "Unit", "Price"]
          }
        },
        {
          "fillna": {
            "column": "Supplier",
            "value": "Unknown"
          }
        },
        {
          "to_numeric": {
            "column": "Price",
            "errors": "coerce"
          }
        },
        {
          "dropna": {
            "subset": ["ProductID", "ProductName", "Stock", "Unit", "Price"]
          }
        }
      ]
    },
    {
      "name": "CustomerData",
      "powerbi_table_name": "customers_raw",
      "steps": [
        {
          "keep": {
            "columns": ["CustomerID", "Name", "Email", "Region"]
          }
        },
        {
          "fillna": {
            "column": "Region",
            "value": "Unknown"
          }
        }
      ]
    }
  ]
}
```

### 3. UpdateDbResponse

Simple response for standard DB updates (YAML-based).

**Fields:**

| Field | Type | Description |
|------|-----|--------------|
| `success` | `bool` | Indicates whether the update succeeded |

**Example:**

```json
{
  "success": true
}
```

### 4. UpdateDbWithConfigResponse

Detailed response for JSON-based DB updates with a configuration.

**Fields:**

| Field | Type | Description |
|------|-----|--------------|
| `success` | `bool` | Indicates whether the update succeeded |
| `tables_processed` | `int` | Number of tables processed successfully |
| `warnings` | `List[str]` | Warnings raised during processing |

**Example:**

```json
{
  "success": true,
  "tables_processed": 2,
  "warnings": [
    "Table 'ProductData': 3 rows dropped due to missing values",
    "Table 'ProductData': Column 'Price' had 5 non-numeric values coerced to NaN"
  ]
}
```

### Preprocessing Workflow

```mermaid
sequenceDiagram
    participant Client
    participant API
    participant PowerBI
    participant Preprocessor
    participant SQLite

    Client->>API: POST /dataprocessor/update-db-with-config
    Note over Client,API: PreprocessingConfigSchema

    loop For each table
        API->>PowerBI: Fetch data (powerbi_table_name)
        PowerBI-->>API: Raw DataFrame
        API->>Preprocessor: Apply preprocessing steps
        Preprocessor->>Preprocessor: keep → fillna → to_numeric → dropna
        Preprocessor-->>API: Processed DataFrame
        API->>SQLite: Save table (name)
    end

    API-->>Client: UpdateDbWithConfigResponse
    Note over Client,API: success, tables_processed, warnings
```

---

## Data Query Service

The Data Query Service allows SQL queries against the stored SQLite database. The schemas are defined in `src/dataquery/schemas.py`.

### Data Model Hierarchy

```mermaid
classDiagram
    class SqlQueryRequest {
        +str query
    }

    class SqlQueryResponse {
        +bool success
        +List~Dict~ data
        +List~str~ columns
        +int row_count
        +Optional~str~ message
    }

    class DatabaseSchemaResponse {
        +bool success
        +List~TableInfo~ tables
        +int table_count
    }

    class TableInfo {
        +str name
        +int row_count
    }

    class TableSchemaResponse {
        +bool success
        +str table_name
        +List~ColumnInfo~ columns
        +int row_count
        +List~Dict~ sample_data
    }

    class ColumnInfo {
        +str name
        +str type
        +bool nullable
        +bool primary_key
    }

    DatabaseSchemaResponse "1" *-- "*" TableInfo : contains
    TableSchemaResponse "1" *-- "*" ColumnInfo : contains
```

### 1. SqlQueryRequest

Request schema for SQL queries.

**Fields:**

| Field | Type | Description | Validation |
|------|-----|--------------|-------------|
| `query` | `str` | SQL query (only SELECT is allowed) | at least 1 character |

**Example:**

```json
{
  "query": "SELECT ProductID, ProductName, Price FROM ProductData WHERE Price > 10 ORDER BY Price DESC LIMIT 100"
}
```

### 2. SqlQueryResponse

Response schema for SQL query results.

**Fields:**

| Field | Type | Description |
|------|-----|--------------|
| `success` | `bool` | Indicates whether the query ran successfully |
| `data` | `List[Dict[str, Any]]` | Query results as a list of dictionaries |
| `columns` | `List[str]` | Column names in the result |
| `row_count` | `int` | Number of rows returned |
| `message` | `Optional[str]` | Additional information or an error message |

**Example:**

```json
{
  "success": true,
  "columns": ["ProductID", "ProductName", "Price"],
  "row_count": 3,
  "data": [
    {
      "ProductID": 42,
      "ProductName": "Premium Widget",
      "Price": 99.99
    },
    {
      "ProductID": 15,
      "ProductName": "Deluxe Gadget",
      "Price": 79.50
    },
    {
      "ProductID": 8,
      "ProductName": "Standard Tool",
      "Price": 45.00
    }
  ],
  "message": null
}
```
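
The request/response pair can be illustrated with the standard-library `sqlite3` module. This is only a sketch: the README does not show how the service enforces the SELECT-only rule, so the prefix check below is an assumption (and not a complete safeguard on its own), and `run_select` is a hypothetical name.

```python
import sqlite3


def run_select(conn, query):
    """Run a SELECT and shape the result like SqlQueryResponse (sketch)."""
    empty = {"success": False, "data": [], "columns": [], "row_count": 0}
    # Assumed rule: reject anything that does not start with SELECT.
    if not query.strip().lower().startswith("select"):
        return {**empty, "message": "Only SELECT statements are allowed"}
    try:
        cursor = conn.execute(query)
    except sqlite3.Error as exc:
        return {**empty, "message": str(exc)}
    # cursor.description holds one tuple per result column; item 0 is the name.
    columns = [description[0] for description in cursor.description]
    data = [dict(zip(columns, row)) for row in cursor.fetchall()]
    return {
        "success": True,
        "data": data,
        "columns": columns,
        "row_count": len(data),
        "message": None,
    }
```

Opening the database read-only would be a sturdier way to block writes than inspecting the query text; the check here only mirrors the rule stated in the field table.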

### 3. TableInfo

Information about a database table.

**Fields:**

| Field | Type | Description |
|------|-----|--------------|
| `name` | `str` | Table name |
| `row_count` | `int` | Number of rows in the table |

### 4. DatabaseSchemaResponse

Response schema for database schema information.

**Fields:**

| Field | Type | Description |
|------|-----|--------------|
| `success` | `bool` | Indicates whether the schema request succeeded |
| `tables` | `List[TableInfo]` | List of all tables in the database |
| `table_count` | `int` | Total number of tables |

**Example:**

```json
{
  "success": true,
  "tables": [
    {
      "name": "ProductData",
      "row_count": 1523
    },
    {
      "name": "CustomerData",
      "row_count": 847
    },
    {
      "name": "OrderData",
      "row_count": 3421
    }
  ],
  "table_count": 3
}
```
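
One way such a response could be assembled from SQLite is to read the table names from `sqlite_master` and count the rows of each table. The sketch below assumes exactly that; `database_schema` is a hypothetical helper, not the project's service code.

```python
import sqlite3


def database_schema(conn):
    """Build a DatabaseSchemaResponse-shaped dict from a SQLite connection."""
    # List user tables; names beginning with "sqlite" + one character are skipped,
    # which covers SQLite's internal sqlite_* tables.
    names = [
        row[0]
        for row in conn.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
        )
    ]
    tables = []
    for name in names:
        # Identifiers cannot be bound as parameters, so quote the name instead.
        quoted = '"' + name.replace('"', '""') + '"'
        (row_count,) = conn.execute(f"SELECT COUNT(*) FROM {quoted}").fetchone()
        tables.append({"name": name, "row_count": row_count})
    return {"success": True, "tables": tables, "table_count": len(tables)}
```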

### 5. ColumnInfo

Information about a table column.

**Fields:**

| Field | Type | Description |
|------|-----|--------------|
| `name` | `str` | Column name |
| `type` | `str` | Data type of the column |
| `nullable` | `bool` | Whether the column can contain NULL values |
| `primary_key` | `bool` | Whether the column is a primary key |

### 6. TableSchemaResponse

Response schema for detailed table structure information.

**Fields:**

| Field | Type | Description |
|------|-----|--------------|
| `success` | `bool` | Indicates whether the schema request succeeded |
| `table_name` | `str` | Name of the table |
| `columns` | `List[ColumnInfo]` | List of all columns in the table |
| `row_count` | `int` | Number of rows in the table |
| `sample_data` | `List[Dict[str, Any]]` | Sample data (up to 5 rows) |

**Example:**

```json
{
  "success": true,
  "table_name": "ProductData",
  "columns": [
    {
      "name": "ProductID",
      "type": "INTEGER",
      "nullable": false,
      "primary_key": true
    },
    {
      "name": "ProductName",
      "type": "TEXT",
      "nullable": false,
      "primary_key": false
    },
    {
      "name": "Supplier",
      "type": "TEXT",
      "nullable": true,
      "primary_key": false
    },
    {
      "name": "Price",
      "type": "REAL",
      "nullable": true,
      "primary_key": false
    }
  ],
  "row_count": 1523,
  "sample_data": [
    {
      "ProductID": 1,
      "ProductName": "Widget A",
      "Supplier": "Acme Corp",
      "Price": 12.50
    },
    {
      "ProductID": 2,
      "ProductName": "Gadget B",
      "Supplier": "TechCo",
      "Price": 24.99
    },
    {
      "ProductID": 3,
      "ProductName": "Tool C",
      "Supplier": "Unknown",
      "Price": 8.75
    }
  ]
}
```
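
The column details map naturally onto SQLite's `PRAGMA table_info`, which returns one row per column as `(cid, name, type, notnull, dflt_value, pk)`. The following sketch assumes the response is built that way; `table_schema` and its `sample_size` parameter are hypothetical.

```python
import sqlite3


def table_schema(conn, table_name, sample_size=5):
    """Build a TableSchemaResponse-shaped dict for one table (sketch)."""
    quoted = '"' + table_name.replace('"', '""') + '"'
    info = conn.execute(f"PRAGMA table_info({quoted})").fetchall()
    if not info:
        # PRAGMA table_info yields no rows for an unknown table.
        return {"success": False, "table_name": table_name, "columns": [],
                "row_count": 0, "sample_data": []}
    columns = [
        {"name": name, "type": col_type, "nullable": not notnull,
         "primary_key": pk > 0}
        for _cid, name, col_type, notnull, _default, pk in info
    ]
    (row_count,) = conn.execute(f"SELECT COUNT(*) FROM {quoted}").fetchone()
    cursor = conn.execute(f"SELECT * FROM {quoted} LIMIT ?", (sample_size,))
    names = [description[0] for description in cursor.description]
    sample_data = [dict(zip(names, row)) for row in cursor.fetchall()]
    return {"success": True, "table_name": table_name, "columns": columns,
            "row_count": row_count, "sample_data": sample_data}
```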

### Query Workflow

```mermaid
sequenceDiagram
    participant Client
    participant API
    participant SQLite

    rect rgb(200, 220, 250)
        Note over Client,SQLite: Scenario 1: Run a SQL query
        Client->>API: POST /dataquery/query
        Note over Client,API: SqlQueryRequest
        API->>SQLite: Run SELECT
        SQLite-->>API: Results
        API-->>Client: SqlQueryResponse
        Note over Client,API: data, columns, row_count
    end

    rect rgb(220, 250, 220)
        Note over Client,SQLite: Scenario 2: Fetch the database schema
        Client->>API: GET /dataquery/schema
        API->>SQLite: List tables
        SQLite-->>API: Table list
        API-->>Client: DatabaseSchemaResponse
        Note over Client,API: tables, table_count
    end

    rect rgb(250, 220, 220)
        Note over Client,SQLite: Scenario 3: Fetch a table's structure
        Client->>API: GET /dataquery/table/{table_name}
        API->>SQLite: Fetch columns + sample data
        SQLite-->>API: Structure + data
        API-->>Client: TableSchemaResponse
        Note over Client,API: columns, sample_data
    end
```

---

## Overall Architecture

```mermaid
graph TB
    subgraph "External Data Source"
        PBI[Power BI Dataset]
    end

    subgraph "Gateway Preprocessing API"
        subgraph "Data Processor Service"
            DP_Router[Router]
            DP_Service[Service]
            DP_Schemas[Schemas]
            DP_Domain[Domain Layer]
        end

        subgraph "Data Query Service"
            DQ_Router[Router]
            DQ_Service[Service]
            DQ_Schemas[Schemas]
        end
    end

    subgraph "Local Storage"
        DB[(SQLite Database)]
    end

    subgraph "Clients"
        Client1[Web App]
        Client2[BI Tool]
        Client3[API Consumer]
    end

    PBI -->|Power BI Reader| DP_Domain
    DP_Router --> DP_Service
    DP_Service --> DP_Domain
    DP_Domain -->|Save| DB

    DQ_Router --> DQ_Service
    DQ_Service -->|Query| DB

    Client1 -->|POST /dataprocessor/update-db-with-config| DP_Router
    Client2 -->|POST /dataquery/query| DQ_Router
    Client3 -->|GET /dataquery/schema| DQ_Router

    style PBI fill:#f9f,stroke:#333,stroke-width:2px
    style DB fill:#bbf,stroke:#333,stroke-width:3px
    style DP_Schemas fill:#ffa,stroke:#333,stroke-width:2px
    style DQ_Schemas fill:#ffa,stroke:#333,stroke-width:2px
```

---

## API Endpoints Overview

### Data Processor Endpoints

| Method | Endpoint | Request Schema | Response Schema | Description |
|---------|----------|----------------|-----------------|--------------|
| POST | `/dataprocessor/update-db` | - | `UpdateDbResponse` | Updates the DB using the YAML config |
| POST | `/dataprocessor/update-db-with-config` | `PreprocessingConfigSchema` | `UpdateDbWithConfigResponse` | Updates the DB using a JSON config |

### Data Query Endpoints

| Method | Endpoint | Request Schema | Response Schema | Description |
|---------|----------|----------------|-----------------|--------------|
| POST | `/dataquery/query` | `SqlQueryRequest` | `SqlQueryResponse` | Runs a SQL query |
| GET | `/dataquery/schema` | - | `DatabaseSchemaResponse` | Returns the DB schema |
| GET | `/dataquery/table/{table_name}` | - | `TableSchemaResponse` | Returns the table structure |

---

## Typical Use Cases

### Use Case 1: Load and process data from Power BI

```mermaid
flowchart LR
    A[Client] -->|1. POST PreprocessingConfigSchema| B[API]
    B -->|2. Fetch data| C[Power BI]
    C -->|3. Raw Data| B
    B -->|4. Preprocessing| B
    B -->|5. Save| D[(SQLite)]
    B -->|6. UpdateDbWithConfigResponse| A

    style A fill:#bfb,stroke:#333,stroke-width:2px
    style C fill:#f9f,stroke:#333,stroke-width:2px
    style D fill:#bbf,stroke:#333,stroke-width:2px
```

### Use Case 2: Query data

```mermaid
flowchart LR
    A[Client] -->|1. POST SqlQueryRequest| B[API]
    B -->|2. SELECT Query| C[(SQLite)]
    C -->|3. Results| B
    B -->|4. SqlQueryResponse| A

    style A fill:#bfb,stroke:#333,stroke-width:2px
    style C fill:#bbf,stroke:#333,stroke-width:2px
```

### Use Case 3: Explore the database structure

```mermaid
flowchart TD
    A[Client] -->|1. GET /schema| B[API]
    B -->|2. DatabaseSchemaResponse| A
    A -->|3. GET /table/ProductData| B
    B -->|4. TableSchemaResponse| A
    A -->|5. POST SqlQueryRequest| B
    B -->|6. SqlQueryResponse| A

    style A fill:#bfb,stroke:#333,stroke-width:2px
```

---

## Validation and Error Handling

All schemas use Pydantic for automatic validation:

- **Type safety**: Every field is checked against its declared type
- **Required fields**: Required fields must be present
- **Min/max validation**: e.g. `min_items=1` for `tables`
- **String validation**: e.g. `min_length=1` for SQL queries
- **Custom validation**: Additional business logic in the services

On validation errors the API returns HTTP 422 (Unprocessable Entity) with detailed error messages.
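
The two named constraints can be shown without Pydantic. The sketch below is a plain-Python stand-in, not the project's schema code: `validate_config` is a hypothetical function, the error shape only loosely imitates a 422 response body, and treating `name` and `powerbi_table_name` as required strings is an assumption based on the request examples.

```python
def validate_config(payload):
    """Return a list of error dicts; an empty list means the payload is valid."""
    errors = []
    tables = payload.get("tables")
    # Mirrors the "at least 1 table" rule on `tables`.
    if not isinstance(tables, list) or len(tables) < 1:
        errors.append({"loc": ["tables"], "msg": "at least 1 table is required"})
        return errors
    for index, table in enumerate(tables):
        if not isinstance(table, dict):
            errors.append({"loc": ["tables", index], "msg": "object required"})
            continue
        # Assumed required string fields of each table configuration.
        for field in ("name", "powerbi_table_name"):
            if not isinstance(table.get(field), str):
                errors.append(
                    {"loc": ["tables", index, field], "msg": "string required"}
                )
    return errors


# A caller would answer with HTTP 422 whenever the list is non-empty.
print(validate_config({"tables": []}))
```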
@ -1,8 +0,0 @@
|
||||||
PP_CONFIG_PATH="src/pp-config.yaml"
|
|
||||||
PP_API_KEY="kj823u90209mj020394jp2msakhfkjashjkf"
|
|
||||||
DB_ENDPOINT_API_KEY="ouho02j0rj2oijroi3rj2oijro23jr0990"
|
|
||||||
POWERBI_DATASET_ID="0e72b1f1-3d32-4caa-bc1a-e523b6232343"
|
|
||||||
POWERBI_CLIENT_ID="9f6fa2cf-3fe1-4ed5-a430-bb7e408c0d87"
|
|
||||||
POWERBI_CLIENT_SECRET="Vdy8Q~Bm2_5ooy-pgYtEgvA9-LjRN2HiXFw6Ody0"
|
|
||||||
POWERBI_TENANT_ID="6a51aaeb-2467-4186-9504-2a05aedc591f"
|
|
||||||
DATA_DIR="/home/data"
|
|
||||||
|
|
@ -4,6 +4,7 @@ from dataclasses import dataclass
|
||||||
from src.settings import settings
|
from src.settings import settings
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import httpx
|
import httpx
|
||||||
|
import re
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
|
@ -13,31 +14,169 @@ class PowerBIReader:
|
||||||
table_name: str
|
table_name: str
|
||||||
base_url: str = settings.POWERBI_BASE_URL
|
base_url: str = settings.POWERBI_BASE_URL
|
||||||
include_nulls: bool = True
|
include_nulls: bool = True
|
||||||
|
measures: list[str] = None
|
||||||
|
group_by_columns: list[str] = None
|
||||||
|
batch_size: int = 10000
|
||||||
|
order_by_column: str | None = None
|
||||||
|
max_batches: int = 100
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def create(
|
async def create(
|
||||||
cls, dataset_id: str, access_token: str, table_name: str, **kwargs
|
cls,
|
||||||
):
|
*,
|
||||||
|
dataset_id: str,
|
||||||
|
access_token: str,
|
||||||
|
table_name: str,
|
||||||
|
measures: list[str] = None,
|
||||||
|
group_by_columns: list[str] = None,
|
||||||
|
batch_size: int = 10000,
|
||||||
|
order_by_column: str | None = None,
|
||||||
|
max_batches: int = 100,
|
||||||
|
**kwargs,
|
||||||
|
) -> "PowerBIReader":
|
||||||
return cls(
|
return cls(
|
||||||
dataset_id=dataset_id,
|
dataset_id=dataset_id,
|
||||||
access_token=access_token,
|
access_token=access_token,
|
||||||
table_name=table_name,
|
table_name=table_name,
|
||||||
|
measures=measures or [],
|
||||||
|
group_by_columns=group_by_columns or [],
|
||||||
|
batch_size=batch_size,
|
||||||
|
order_by_column=order_by_column,
|
||||||
|
max_batches=max_batches,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _dax_query(self) -> str:
|
def _dax_query(self) -> str:
|
||||||
# Escape single quotes in table names per DAX rules
|
"""Generate DAX query based on configuration.
|
||||||
safe = self.table_name.replace("'", "''")
|
|
||||||
return f"EVALUATE '{safe}'"
|
|
||||||
|
|
||||||
async def read_data(self) -> pd.DataFrame:
|
Generates different DAX queries depending on whether measures and/or
|
||||||
|
group_by_columns are specified:
|
||||||
|
|
||||||
|
1. No measures: EVALUATE 'TableName'
|
||||||
|
Returns all physical/calculated columns from the table.
|
||||||
|
|
||||||
|
2. Measures only: EVALUATE ADDCOLUMNS('TableName', "Measure1", [Measure1], ...)
|
||||||
|
Returns all columns plus the specified measures.
|
||||||
|
|
||||||
|
3. Measures + group_by_columns: EVALUATE SUMMARIZECOLUMNS('Table'[Col1], ..., "Measure1", [Measure1], ...)
|
||||||
|
Returns aggregated measures grouped by the specified columns.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
DAX query string to execute against Power BI.
|
||||||
"""
|
"""
|
||||||
Calls Power BI REST API: POST /datasets/{datasetId}/executeQueries
|
# Remove XML/HTML tags from table name (e.g., '<oii>Artikel</oii>' -> 'Artikel')
|
||||||
with DAX: EVALUATE 'TableName' and returns a DataFrame.
|
cleaned_table_name = re.sub(r'<[^>]+>', '', self.table_name).strip()
|
||||||
|
# Escape single quotes in table names per DAX rules
|
||||||
|
safe_table = cleaned_table_name.replace("'", "''")
|
||||||
|
|
||||||
|
# Case 1: No measures - simple table evaluation
|
||||||
|
if not self.measures:
|
||||||
|
return f"EVALUATE '{safe_table}'"
|
||||||
|
|
||||||
|
# Case 2: Measures without grouping - use ADDCOLUMNS
|
||||||
|
if not self.group_by_columns:
|
||||||
|
measure_clauses = ", ".join(
|
||||||
|
[f'"{measure}", [{measure}]' for measure in self.measures]
|
||||||
|
)
|
||||||
|
return f"EVALUATE ADDCOLUMNS('{safe_table}', {measure_clauses})"
|
||||||
|
|
||||||
|
# Case 3: Measures with grouping - use SUMMARIZECOLUMNS
|
||||||
|
group_cols = ", ".join(
|
||||||
|
[f"'{safe_table}'[{col}]" for col in self.group_by_columns]
|
||||||
|
)
|
||||||
|
measure_clauses = ", ".join(
|
||||||
|
[f'"{measure}", [{measure}]' for measure in self.measures]
|
||||||
|
)
|
||||||
|
return f"EVALUATE SUMMARIZECOLUMNS({group_cols}, {measure_clauses})"
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _is_numeric_value(value: str | int | float | None) -> bool:
|
||||||
|
"""Check if a value is numeric (including numeric strings).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
value: The value to check.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if the value is numeric or a string that represents a number.
|
||||||
"""
|
"""
|
||||||
|
if value is None:
|
||||||
|
return False
|
||||||
|
if isinstance(value, (int, float)):
|
||||||
|
return True
|
||||||
|
if isinstance(value, str):
|
||||||
|
try:
|
||||||
|
float(value)
|
||||||
|
return True
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _dax_query_batch(self, last_value: str | int | float | None = None) -> str:
|
||||||
|
"""Generate a batched DAX query using TOPN and keyset pagination.
|
||||||
|
|
||||||
|
Uses ORDER BY with the order_by_column for deterministic ordering,
|
||||||
|
and FILTER to skip already-fetched rows based on the last seen value.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
last_value: The last value of order_by_column from the previous batch.
|
||||||
|
None for the first batch.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
DAX query string for fetching the next batch.
|
||||||
|
"""
|
||||||
|
# Remove XML/HTML tags from table name (e.g., '<oii>Artikel</oii>' -> 'Artikel')
|
||||||
|
cleaned_table_name = re.sub(r'<[^>]+>', '', self.table_name).strip()
|
||||||
|
# Escape single quotes in table names per DAX rules
|
||||||
|
safe_table = cleaned_table_name.replace("'", "''")
|
||||||
|
order_col = self.order_by_column
|
||||||
|
|
||||||
|
if last_value is None:
|
||||||
|
# First batch: just use TOPN with ORDER BY
|
||||||
|
return (
|
||||||
|
f"EVALUATE TOPN({self.batch_size}, '{safe_table}', "
|
||||||
|
f"'{safe_table}'[{order_col}], ASC)"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Subsequent batches: filter rows where order_col > last_value
|
||||||
|
# IMPORTANT: Handle type conversion to avoid DAX type mismatch errors
|
||||||
|
# Use a type-safe approach: convert column using IF/ISNUMBER to handle both text and numeric columns
|
||||||
|
if self._is_numeric_value(last_value):
|
||||||
|
numeric_val = float(last_value) if '.' in str(last_value) else int(last_value)
|
||||||
|
# Use IF to check if column is numeric, if so use it directly, otherwise convert with VALUE()
|
||||||
|
# This handles both text columns (VALUE converts) and numeric columns (use directly)
|
||||||
|
return (
|
||||||
|
f"EVALUATE TOPN({self.batch_size}, "
|
||||||
|
f"FILTER('{safe_table}', "
|
||||||
|
f"NOT(ISBLANK('{safe_table}'[{order_col}])) && "
|
||||||
|
f"IF(ISNUMBER('{safe_table}'[{order_col}]), '{safe_table}'[{order_col}], VALUE('{safe_table}'[{order_col}])) > {numeric_val}), "
|
||||||
|
f"IF(ISNUMBER('{safe_table}'[{order_col}]), '{safe_table}'[{order_col}], VALUE('{safe_table}'[{order_col}])), ASC)"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# String comparison - escape quotes in the value
|
||||||
|
escaped_value = str(last_value).replace('"', '""')
|
||||||
|
return (
|
||||||
|
f"EVALUATE TOPN({self.batch_size}, "
|
||||||
|
f"FILTER('{safe_table}', '{safe_table}'[{order_col}] > \"{escaped_value}\"), "
|
||||||
|
f"'{safe_table}'[{order_col}], ASC)"
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _execute_query(self, dax_query: str) -> pd.DataFrame:
|
||||||
|
"""Execute a DAX query and return the results as a DataFrame.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
dax_query: The DAX query string to execute.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
DataFrame containing the query results.
|
||||||
|
"""
|
||||||
|
# Log the DAX query for debugging
|
||||||
|
import logging
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
logger.info(f"Executing DAX query: {dax_query}")
|
||||||
|
|
||||||
url = f"{self.base_url}/datasets/{self.dataset_id}/executeQueries"
|
url = f"{self.base_url}/datasets/{self.dataset_id}/executeQueries"
|
||||||
body = {
|
body = {
|
||||||
"queries": [{"query": self._dax_query()}],
|
"queries": [{"query": dax_query}],
|
||||||
"serializerSettings": {"includeNulls": self.include_nulls},
|
"serializerSettings": {"includeNulls": self.include_nulls},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -50,6 +189,7 @@ class PowerBIReader:
|
||||||
resp = await client.post(url, headers=headers, json=body)
|
resp = await client.post(url, headers=headers, json=body)
|
||||||
|
|
||||||
if resp.status_code != 200:
|
if resp.status_code != 200:
|
||||||
|
logger.error(f"DAX query failed: {dax_query}")
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f"Power BI executeQueries failed: {resp.status_code} - {resp.text}"
|
f"Power BI executeQueries failed: {resp.status_code} - {resp.text}"
|
||||||
)
|
)
|
||||||
|
|
@ -74,6 +214,116 @@ class PowerBIReader:
|
||||||
df.columns = [_strip_qual(c) for c in df.columns]
|
df.columns = [_strip_qual(c) for c in df.columns]
|
||||||
return df
|
return df
|
||||||
|
|
||||||
|
async def read_data(self) -> pd.DataFrame:
|
||||||
|
"""Fetch data from Power BI, using batching if order_by_column is set.
|
||||||
|
|
||||||
|
If order_by_column is configured, fetches data in batches using
|
||||||
|
keyset pagination to stay under the Power BI executeQueries limit of 100,000 rows or 1,000,000 values per query.
|
||||||
|
Otherwise, fetches all data in a single query (legacy behavior).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
DataFrame containing all fetched data.
|
||||||
|
"""
|
||||||
|
# Legacy behavior: no batching if order_by_column not set
|
||||||
|
if not self.order_by_column:
|
||||||
|
return await self._execute_query(self._dax_query())
|
||||||
|
|
||||||
|
# Batch fetching with keyset pagination
|
||||||
|
import logging
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
all_dfs: list[pd.DataFrame] = []
|
||||||
|
last_value: str | int | float | None = None
|
||||||
|
batch_num = 0
|
||||||
|
total_rows = 0
|
||||||
|
|
||||||
|
while True:
|
||||||
|
batch_num += 1
|
||||||
|
|
||||||
|
# Safety limit to prevent runaway requests
|
||||||
|
if batch_num > self.max_batches:
|
||||||
|
logger.warning(
|
||||||
|
f"Reached max_batches limit ({self.max_batches}) for table "
|
||||||
|
f"'{self.table_name}'. Stopping batch fetch. "
|
||||||
|
f"Total rows fetched so far: {total_rows}"
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
|
dax_query = self._dax_query_batch(last_value)
|
||||||
|
df = await self._execute_query(dax_query)
|
||||||
|
|
||||||
|
if df.empty:
|
||||||
|
logger.info(f"Batch {batch_num}: Empty result, stopping fetch")
|
||||||
|
break
|
||||||
|
|
||||||
|
batch_rows = len(df)
|
||||||
|
total_rows += batch_rows
|
||||||
|
|
||||||
|
# Log batch info
|
||||||
|
if self.order_by_column in df.columns:
|
||||||
|
min_val = df[self.order_by_column].min()
|
||||||
|
max_val = df[self.order_by_column].max()
|
||||||
|
logger.info(
|
||||||
|
f"Batch {batch_num}: Fetched {batch_rows} rows, "
|
||||||
|
f"{self.order_by_column} range: {min_val} to {max_val}, "
|
||||||
|
f"Total so far: {total_rows}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.info(f"Batch {batch_num}: Fetched {batch_rows} rows, Total so far: {total_rows}")
|
||||||
|
|
||||||
|
all_dfs.append(df)
|
||||||
|
|
||||||
|
# Get the maximum value for the next batch (not just the last row)
|
||||||
|
# This ensures we don't skip rows when there are multiple rows per ID
|
||||||
|
# Use max() instead of iloc[-1]: DAX TOPN does not guarantee the order of
|
||||||
|
# the rows it returns, so the last row of the batch is not necessarily
|
||||||
|
# the one with the largest order_by_column value
|
||||||
|
max_value = df[self.order_by_column].max()
|
||||||
|
|
||||||
|
# Ensure numeric values are preserved as numeric (not converted to string)
|
||||||
|
# Check if the column is numeric and convert the value accordingly
|
||||||
|
if pd.api.types.is_numeric_dtype(df[self.order_by_column].dtype):
|
||||||
|
# Column is numeric - ensure value is numeric
|
||||||
|
if pd.isna(max_value):
|
||||||
|
logger.info(f"Batch {batch_num}: Max value is NaN, stopping")
|
||||||
|
break # Can't continue with NaN
|
||||||
|
# Convert to appropriate numeric type
|
||||||
|
if pd.api.types.is_integer_dtype(df[self.order_by_column].dtype):
|
||||||
|
new_last_value = int(max_value)
|
||||||
|
else:
|
||||||
|
new_last_value = float(max_value)
|
||||||
|
else:
|
||||||
|
new_last_value = max_value
|
||||||
|
|
||||||
|
# Safety check: if last_value didn't change, we're stuck in a loop
|
||||||
|
if new_last_value == last_value:
|
||||||
|
logger.warning(
|
||||||
|
f"Batch {batch_num}: last_value didn't change ({last_value}), "
|
||||||
|
"stopping to avoid infinite loop"
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
|
last_value = new_last_value
|
||||||
|
|
||||||
|
logger.info(f"Finished fetching batches for table '{self.table_name}': {batch_num} batches, {total_rows} total rows")
|
||||||
|
|
||||||
|
if not all_dfs:
|
||||||
|
return pd.DataFrame()
|
||||||
|
|
||||||
|
result = pd.concat(all_dfs, ignore_index=True)
|
||||||
|
|
||||||
|
# Log statistics
|
||||||
|
if self.order_by_column and self.order_by_column in result.columns:
|
||||||
|
unique_ids = result[self.order_by_column].nunique()
|
||||||
|
total_rows_final = len(result)
|
||||||
|
logger.info(
|
||||||
|
f"Table '{self.table_name}': {total_rows_final} total rows, "
|
||||||
|
f"{unique_ids} unique {self.order_by_column} values, "
|
||||||
|
f"average {total_rows_final / max(unique_ids, 1):.2f} rows per {self.order_by_column}"
|
||||||
|
)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_access_token_sync(
|
def _get_access_token_sync(
|
||||||
tenant_id: str,
|
tenant_id: str,
|
||||||
|
|
|
||||||
|
|
@ -147,6 +147,11 @@ class TableConfig:
|
||||||
|
|
||||||
name: str
|
name: str
|
||||||
powerbi_table_name: str
|
powerbi_table_name: str
|
||||||
|
measures: List[str] = field(default_factory=list)
|
||||||
|
group_by_columns: List[str] = field(default_factory=list)
|
||||||
|
batch_size: int = 10000
|
||||||
|
order_by_column: str | None = None
|
||||||
|
max_batches: int = 100
|
||||||
steps: List[Dict[str, Any]] = field(default_factory=list)
|
steps: List[Dict[str, Any]] = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -184,6 +189,8 @@ class Preprocessor:
|
||||||
table_config = TableConfig(
|
table_config = TableConfig(
|
||||||
name=table_data.get("name", ""),
|
name=table_data.get("name", ""),
|
||||||
powerbi_table_name=table_data.get("powerbi_table_name", ""),
|
powerbi_table_name=table_data.get("powerbi_table_name", ""),
|
||||||
|
measures=table_data.get("measures", []),
|
||||||
|
group_by_columns=table_data.get("group_by_columns", []),
|
||||||
steps=table_data.get("steps", []),
|
steps=table_data.get("steps", []),
|
||||||
)
|
)
|
||||||
table_configs.append(table_config)
|
table_configs.append(table_config)
|
||||||
|
|
@ -253,6 +260,11 @@ class Preprocessor:
|
||||||
table_config = TableConfig(
|
table_config = TableConfig(
|
||||||
name=table_data.get("name", ""),
|
name=table_data.get("name", ""),
|
||||||
powerbi_table_name=table_data.get("powerbi_table_name", ""),
|
powerbi_table_name=table_data.get("powerbi_table_name", ""),
|
||||||
|
measures=table_data.get("measures", []),
|
||||||
|
group_by_columns=table_data.get("group_by_columns", []),
|
||||||
|
batch_size=table_data.get("batch_size", 10000),
|
||||||
|
order_by_column=table_data.get("order_by_column"),
|
||||||
|
max_batches=table_data.get("max_batches", 100),
|
||||||
steps=table_data.get("steps", []),
|
steps=table_data.get("steps", []),
|
||||||
)
|
)
|
||||||
table_configs.append(table_config)
|
table_configs.append(table_config)
|
||||||
|
|
|
||||||
|
|
@ -50,6 +50,53 @@ async def update_db_with_config(
|
||||||
3. **Saves to local database**: The processed data is saved to the local SQLite
|
3. **Saves to local database**: The processed data is saved to the local SQLite
|
||||||
database with the specified table name.
|
database with the specified table name.
|
||||||
|
|
||||||
|
## Power BI Measures Support
|
||||||
|
|
||||||
|
In addition to retrieving physical/calculated columns from Power BI tables, you can
|
||||||
|
now retrieve Power BI measures using the optional `measures` and `group_by_columns`
|
||||||
|
fields in your table configuration.
|
||||||
|
|
||||||
|
### Retrieving Measures
|
||||||
|
|
||||||
|
Power BI measures are calculated values that live only in the model and are computed
|
||||||
|
at query time. To retrieve them alongside your table data, add a `measures` array
|
||||||
|
to your table configuration:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"name": "Einkaufspreis",
|
||||||
|
"powerbi_table_name": "Einkaufspreis",
|
||||||
|
"measures": ["EP in CHF", "Gesamtbetrag in CHF"],
|
||||||
|
"steps": [...]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
This uses the DAX ADDCOLUMNS pattern: `EVALUATE ADDCOLUMNS('TableName', "MeasureName", [MeasureName], ...)`
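
As a rough illustration of that pattern, the sketch below assembles an ADDCOLUMNS query from a table name and a list of measures. The helper name `build_addcolumns_query` and its escaping rules are assumptions made for this example; the reader's actual query builder may differ.

```python
def build_addcolumns_query(table_name: str, measures: list[str]) -> str:
    # Hypothetical helper, not part of the service: builds
    # EVALUATE ADDCOLUMNS('Table', "Measure", [Measure], ...).
    # Single quotes inside a quoted table name are doubled in DAX.
    safe_table = table_name.replace("'", "''")
    parts = []
    for measure in measures:
        # The output column label is a DAX string literal (double quotes doubled).
        label = measure.replace('"', '""')
        # The measure reference uses brackets (a closing bracket is doubled).
        ref = measure.replace("]", "]]")
        parts.append(f'"{label}", [{ref}]')
    return f"EVALUATE ADDCOLUMNS('{safe_table}', {', '.join(parts)})"


print(build_addcolumns_query("Einkaufspreis", ["EP in CHF", "Gesamtbetrag in CHF"]))
```

Each measure contributes a pair of arguments: the name of the new column and the bracketed measure reference that is evaluated per row.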
|
||||||
|
|
||||||
|
### Grouping Measures
|
||||||
|
|
||||||
|
If your measures need to be aggregated by specific columns, add the `group_by_columns`
|
||||||
|
field. This is useful when measures are defined with aggregation functions:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"name": "Einkaufspreis_Aggregated",
|
||||||
|
"powerbi_table_name": "Einkaufspreis",
|
||||||
|
"measures": ["EP in CHF", "Gesamtbetrag in CHF"],
|
||||||
|
"group_by_columns": ["m_Artikel"],
|
||||||
|
"steps": [...]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
This uses the DAX SUMMARIZECOLUMNS pattern: `EVALUATE SUMMARIZECOLUMNS('Table'[Column], "MeasureName", [MeasureName], ...)`
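
A minimal sketch of how such a grouped query could be put together is shown below. The helper name `build_summarizecolumns_query` is hypothetical, and it assumes every group-by column belongs to the same table as `powerbi_table_name`.

```python
def build_summarizecolumns_query(
    table_name: str, group_by_columns: list[str], measures: list[str]
) -> str:
    # Hypothetical helper, not part of the service: builds
    # EVALUATE SUMMARIZECOLUMNS('Table'[Column], ..., "Measure", [Measure], ...).
    safe_table = table_name.replace("'", "''")
    args = []
    for column in group_by_columns:
        # Group-by columns are fully qualified: 'Table'[Column].
        safe_column = column.replace("]", "]]")
        args.append(f"'{safe_table}'[{safe_column}]")
    for measure in measures:
        # Each measure adds a name/expression pair after the group-by columns.
        label = measure.replace('"', '""')
        ref = measure.replace("]", "]]")
        args.append(f'"{label}", [{ref}]')
    return f"EVALUATE SUMMARIZECOLUMNS({', '.join(args)})"


print(build_summarizecolumns_query("Einkaufspreis", ["m_Artikel"], ["EP in CHF"]))
```

The group-by columns come first and the measure pairs follow, so the result has one row per distinct combination of the grouping columns.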
|
||||||
|
|
||||||
|
### Measure Name Formatting
|
||||||
|
|
||||||
|
- Measure names with spaces are automatically handled (e.g., "EP in CHF" becomes `[EP in CHF]` in DAX)
|
||||||
|
- If `measures` is empty or not provided, the standard table evaluation is used
|
||||||
|
- If `measures` is provided without `group_by_columns`, ADDCOLUMNS is used
|
||||||
|
- If both `measures` and `group_by_columns` are provided, SUMMARIZECOLUMNS is used
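
The selection rules above can be summarized in a few lines. This is only a sketch of the decision logic under the rules as listed; the function name `select_query_pattern` and the returned labels are assumptions for illustration.

```python
def select_query_pattern(measures: list[str], group_by_columns: list[str]) -> str:
    # Hypothetical helper: returns a label for the DAX pattern the rules pick.
    if not measures:
        # No measures requested: plain table evaluation, even if
        # group_by_columns happens to be set.
        return "TABLE"
    if group_by_columns:
        # Measures plus grouping columns: aggregate per group.
        return "SUMMARIZECOLUMNS"
    # Measures only: evaluate them row by row alongside the table columns.
    return "ADDCOLUMNS"


print(select_query_pattern(["EP in CHF"], []))
print(select_query_pattern(["EP in CHF"], ["m_Artikel"]))
```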
|
||||||
|
|
||||||
## Available Preprocessing Steps
|
## Available Preprocessing Steps
|
||||||
|
|
||||||
The following preprocessing steps are supported. Each step is specified as a
|
The following preprocessing steps are supported. Each step is specified as a
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,8 @@ class TableConfigSchema(BaseModel):
|
||||||
Attributes:
|
Attributes:
|
||||||
name: The name to use for the table in the local SQLite database
|
name: The name to use for the table in the local SQLite database
|
||||||
powerbi_table_name: The name of the source table in Power BI dataset
|
powerbi_table_name: The name of the source table in Power BI dataset
|
||||||
|
measures: Optional list of Power BI measures to retrieve
|
||||||
|
group_by_columns: Optional list of columns to group by when retrieving measures
|
||||||
steps: List of preprocessing steps to apply to the table data
|
steps: List of preprocessing steps to apply to the table data
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
@ -29,6 +31,31 @@ class TableConfigSchema(BaseModel):
|
||||||
description="Name of the table in the Power BI dataset",
|
description="Name of the table in the Power BI dataset",
|
||||||
example="data_full",
|
example="data_full",
|
||||||
)
|
)
|
||||||
|
measures: List[str] = Field(
|
||||||
|
default_factory=list,
|
||||||
|
description="List of Power BI measure names to retrieve",
|
||||||
|
example=["EP in CHF", "Gesamtbetrag in CHF"],
|
||||||
|
)
|
||||||
|
group_by_columns: List[str] = Field(
|
||||||
|
default_factory=list,
|
||||||
|
description="Columns to group by when retrieving measures (triggers SUMMARIZECOLUMNS)",
|
||||||
|
example=["m_Artikel"],
|
||||||
|
)
|
||||||
|
batch_size: int = Field(
|
||||||
|
default=10000,
|
||||||
|
description="Number of rows to fetch per batch (for large tables)",
|
||||||
|
example=10000,
|
||||||
|
)
|
||||||
|
order_by_column: str | None = Field(
|
||||||
|
default=None,
|
||||||
|
description="Column to order by for batch fetching. Required for batching to work.",
|
||||||
|
example="I_ID",
|
||||||
|
)
|
||||||
|
max_batches: int = Field(
|
||||||
|
default=100,
|
||||||
|
description="Maximum number of batches to fetch. Safety limit to prevent runaway requests.",
|
||||||
|
example=100,
|
||||||
|
)
|
||||||
steps: List[Dict[str, Any]] = Field(
|
steps: List[Dict[str, Any]] = Field(
|
||||||
default_factory=list,
|
default_factory=list,
|
||||||
description="List of preprocessing steps to apply",
|
description="List of preprocessing steps to apply",
|
||||||
|
|
|
||||||
|
|
@ -54,6 +54,10 @@ class DataProcessorService:
|
||||||
dataset_id=settings.POWERBI_DATASET_ID,
|
dataset_id=settings.POWERBI_DATASET_ID,
|
||||||
access_token=self.access_token,
|
access_token=self.access_token,
|
||||||
table_name=table_config.powerbi_table_name,
|
table_name=table_config.powerbi_table_name,
|
||||||
|
measures=table_config.measures,
|
||||||
|
group_by_columns=table_config.group_by_columns,
|
||||||
|
batch_size=getattr(table_config, 'batch_size', 10000),
|
||||||
|
order_by_column=getattr(table_config, 'order_by_column', None),
|
||||||
)
|
)
|
||||||
|
|
||||||
# Step 2: Read data from Power BI
|
# Step 2: Read data from Power BI
|
||||||
|
|
@ -147,6 +151,11 @@ class DataProcessorService:
|
||||||
dataset_id=settings.POWERBI_DATASET_ID,
|
dataset_id=settings.POWERBI_DATASET_ID,
|
||||||
access_token=self.access_token,
|
access_token=self.access_token,
|
||||||
table_name=table_config.powerbi_table_name,
|
table_name=table_config.powerbi_table_name,
|
||||||
|
measures=table_config.measures,
|
||||||
|
group_by_columns=table_config.group_by_columns,
|
||||||
|
batch_size=table_config.batch_size,
|
||||||
|
order_by_column=table_config.order_by_column,
|
||||||
|
max_batches=table_config.max_batches,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Step 2: Read data from Power BI
|
# Step 2: Read data from Power BI
|
||||||
|
|
|
||||||
|
|
@ -71,6 +71,53 @@ class DataQueryService:
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def _enforce_query_limit(self, *, query: str) -> str:
|
||||||
|
"""Enforce maximum row limit on query.
|
||||||
|
|
||||||
|
If query has LIMIT > SQL_ROW_LIMIT, replace with SQL_ROW_LIMIT.
|
||||||
|
If query has no LIMIT, append LIMIT SQL_ROW_LIMIT.
|
||||||
|
If query has LIMIT <= SQL_ROW_LIMIT, keep as is.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
query: The SQL query to enforce limit on.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Query with enforced limit.
|
||||||
|
"""
|
||||||
|
max_limit = settings.SQL_ROW_LIMIT
|
||||||
|
|
||||||
|
# Strip trailing semicolons and whitespace to prevent multi-statement errors
|
||||||
|
query = query.rstrip("; \t\n\r")
|
||||||
|
|
||||||
|
# Remove comments and normalize whitespace for parsing
|
||||||
|
cleaned_query = re.sub(r"--.*$", "", query, flags=re.MULTILINE)
|
||||||
|
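cleaned_query = re.sub(r"/\*.*?\*/", " ", cleaned_query, flags=re.DOTALL).rstrip("; \t\n\r")
# Continue with the comment-free text: a trailing comment could otherwise hide an
# existing LIMIT from the replacement below, or swallow the LIMIT that gets appended
query = cleaned_query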
|
||||||
|
|
||||||
|
# Look for LIMIT clause (case insensitive)
|
||||||
|
# Pattern matches: LIMIT <number> or LIMIT <number> OFFSET <number>
|
||||||
|
limit_pattern = r"\bLIMIT\s+(\d+)(\s+OFFSET\s+\d+)?\s*$"
|
||||||
|
match = re.search(limit_pattern, cleaned_query, re.IGNORECASE)
|
||||||
|
|
||||||
|
if match:
|
||||||
|
# Extract the current limit value
|
||||||
|
current_limit = int(match.group(1))
|
||||||
|
|
||||||
|
if current_limit > max_limit:
|
||||||
|
# Replace with max_limit while preserving OFFSET if present
|
||||||
|
offset_clause = match.group(2) or ""
|
||||||
|
# Find the position in the original query to replace
|
||||||
|
# Use the original query to preserve formatting
|
||||||
|
original_match = re.search(limit_pattern, query, re.IGNORECASE)
|
||||||
|
if original_match:
|
||||||
|
new_limit_clause = f"LIMIT {max_limit}{offset_clause}"
|
||||||
|
query = query[: original_match.start()] + new_limit_clause
|
||||||
|
# If current_limit <= max_limit, keep query as is
|
||||||
|
else:
|
||||||
|
# No LIMIT clause found, append one
|
||||||
|
query = f"{query.rstrip()} LIMIT {max_limit}"
|
||||||
|
|
||||||
|
return query
|
||||||
|
|
||||||
async def execute_query(self, *, query: str) -> SqlQueryResponse:
|
async def execute_query(self, *, query: str) -> SqlQueryResponse:
|
||||||
"""Execute a SELECT query and return the results.
|
"""Execute a SELECT query and return the results.
|
||||||
|
|
||||||
|
|
@ -90,6 +137,9 @@ class DataQueryService:
|
||||||
message="Only SELECT queries are allowed",
|
message="Only SELECT queries are allowed",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Enforce row limit on the query
|
||||||
|
query = self._enforce_query_limit(query=query)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with self.engine.begin() as conn:
|
async with self.engine.begin() as conn:
|
||||||
result = await conn.execute(text(query))
|
result = await conn.execute(text(query))
|
||||||
|
|
|
||||||
|
|
@ -30,3 +30,30 @@ tables:
|
||||||
"Einheit",
|
"Einheit",
|
||||||
"EP in CHF",
|
"EP in CHF",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# Example: Retrieving Power BI measures with ADDCOLUMNS
|
||||||
|
# Uncomment to retrieve measures alongside all table columns
|
||||||
|
# - name: "Einkaufspreis_With_Measures"
|
||||||
|
# powerbi_table_name: "Einkaufspreis"
|
||||||
|
# measures:
|
||||||
|
# - "EP in CHF"
|
||||||
|
# - "Gesamtbetrag in CHF"
|
||||||
|
# steps:
|
||||||
|
# - to_numeric:
|
||||||
|
# column: "EP_CHF"
|
||||||
|
# errors: "coerce"
|
||||||
|
# - dropna:
|
||||||
|
# subset: ["EP_CHF"]
|
||||||
|
|
||||||
|
# Example: Retrieving aggregated measures with SUMMARIZECOLUMNS
|
||||||
|
# Uncomment to retrieve measures grouped by specific columns
|
||||||
|
# - name: "Einkaufspreis_Aggregated"
|
||||||
|
# powerbi_table_name: "Einkaufspreis"
|
||||||
|
# measures:
|
||||||
|
# - "EP in CHF"
|
||||||
|
# - "Gesamtbetrag in CHF"
|
||||||
|
# group_by_columns:
|
||||||
|
# - "m_Artikel"
|
||||||
|
# steps:
|
||||||
|
# - dropna:
|
||||||
|
# subset: ["m_Artikel"]
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,14 @@ class Settings(BaseSettings):
|
||||||
description="Path to the SQLite database.",
|
description="Path to the SQLite database.",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# --- Database Query Settings ---
|
||||||
|
|
||||||
|
# Maximum number of rows to return from SQL queries.
|
||||||
|
SQL_ROW_LIMIT: int = Field(
|
||||||
|
default=50,
|
||||||
|
description="Maximum number of rows to return from SQL queries. Defaults to 50.",
|
||||||
|
)
|
||||||
|
|
||||||
# --- API Keys ---
|
# --- API Keys ---
|
||||||
|
|
||||||
# Preprocessor API key to access this app.
|
# Preprocessor API key to access this app.
|
||||||
|
|
|
||||||
1
tests/dataquery/__init__.py
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
"""Data query tests."""
|
||||||