Compare commits

31 commits

Author SHA1 Message Date
8b2ac4d467 chore: remove placeholder Azure workflows (no Infomaniak VM yet)
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-24 03:04:12 +02:00
21d84a099c fix: use full GitHub URLs for non-mirrored actions (azure, astral-sh)
Some checks failed
Deploy althaus-preprocessing / build (push) Failing after 55s
Deploy althaus-preprocessing / deploy (push) Has been skipped
Deploy poweron-preprocessing / build (push) Failing after 18s
Deploy poweron-preprocessing / deploy (push) Has been skipped
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-24 02:42:30 +02:00
b9e73a728d refactor: migrate to Forgejo workflows, remove GitHub Actions
Some checks failed
Deploy althaus-preprocessing / build (push) Failing after 23s
Deploy althaus-preprocessing / deploy (push) Has been skipped
Deploy poweron-preprocessing / build (push) Failing after 2s
Deploy poweron-preprocessing / deploy (push) Has been skipped
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-24 02:34:22 +02:00
9bca05dea2 finished preprocessing batching 2025-12-23 13:04:46 +01:00
idittrich-valueon
3a28fbd5e6
Merge pull request #12 from valueonag/fix/preprocessing-limits
feat: Implement `max_batches` safety limit for Power BI data fetching…
2025-12-23 11:29:37 +01:00
Christopher Gondek
c2f11b92fd feat: Implement max_batches safety limit for Power BI data fetching and refine DAX batch query type handling for numeric and string values. 2025-12-23 11:28:55 +01:00
idittrich-valueon
f5f8dfcb80
Merge pull request #11 from valueonag/fix/preprocessing-limits
feat: Add batching and keyset pagination to Power BI data fetching us…
2025-12-23 11:15:09 +01:00
Christopher Gondek
af6150e2fb feat: Add batching and keyset pagination to Power BI data fetching using batch_size and order_by_column parameters. 2025-12-23 11:14:29 +01:00
Christopher Gondek
5b8daa4e49
Merge pull request #10 from valueonag/chore/limit-row-count
fix: sanitize SQL queries by removing trailing semicolons
2025-11-05 11:24:39 +01:00
Christopher Gondek
3fbb41b980 fix: sanitize SQL queries by removing trailing semicolons 2025-11-05 11:23:54 +01:00
Christopher Gondek
471ad42912
Merge pull request #9 from valueonag/chore/limit-row-count
chore: enforce query row limit
2025-11-05 11:02:43 +01:00
Christopher Gondek
dde61f447d chore: enforce query row limit 2025-11-05 11:02:24 +01:00
Christopher Gondek
e7dd3ea999
Merge pull request #8 from valueonag/feat/powerbi-measures
feat: support powerbi measures
2025-11-05 08:36:01 +01:00
Christopher Gondek
d9deb09d1b feat: support powerbi measures 2025-11-05 08:34:19 +01:00
Christopher Gondek
837e0c1d29
Merge pull request #7 from valueonag/feat/update-althaus-yml
chore: update althaus deploy yml
2025-11-03 12:26:15 +01:00
Christopher Gondek
40fc3db398 chore: update althaus deploy yml 2025-11-03 12:19:58 +01:00
ValueOn AG
9fb20a6cda Add or update the Azure App Service build and deployment workflow config 2025-11-03 11:53:13 +01:00
Christopher Gondek
9247e88dc3
Merge pull request #6 from valueonag/feat/cleanup-gh-actions
chore: comments
2025-11-03 09:35:17 +01:00
Christopher Gondek
91bffe884d chore: comments 2025-11-03 09:34:48 +01:00
Christopher Gondek
21c83fbbac
Merge pull request #5 from valueonag/chore/remove-env-prod-file
fix: deployment
2025-11-03 08:28:37 +01:00
Christopher Gondek
8391681743 fix: deployment 2025-11-03 08:27:48 +01:00
Christopher Gondek
2dc215e197
Merge pull request #4 from valueonag/chore/remove-env-prod-file
chore: remove env prod file
2025-11-03 08:24:43 +01:00
Christopher Gondek
0dca581300 chore: remove env prod file 2025-11-03 08:24:24 +01:00
Christopher Gondek
30f4b0e781
Merge pull request #3 from valueonag/feat/file-free-config
feat: yaml-free preprocessing
2025-11-03 07:56:54 +01:00
Christopher Gondek
56869051df feat: yaml-free preprocessing 2025-10-27 16:56:04 +01:00
Christopher Gondek
833df786a8 fix: key schemas 2025-10-17 10:53:09 +02:00
Christopher Gondek
1fe6a06a5f fix: api key header 2025-10-17 10:30:56 +02:00
Christopher Gondek
b3193ee56d fix: datasaver for Azure 2025-10-15 11:42:44 +02:00
Christopher Gondek
845636a424 fix: try fix for data dir in Azure 2025-10-15 11:24:56 +02:00
Christopher Gondek
40a92ccafd chore: add app.py for azure deploy 2025-10-15 11:02:39 +02:00
Christopher Gondek
187127458e
Merge pull request #2 from valueonag/feat/deploy
chore: prepare deployment
2025-10-15 10:35:29 +02:00
17 changed files with 1646 additions and 105 deletions


@ -1,6 +1,10 @@
# Preprocessor Configuration
# Path to the preprocessor configuration YAML file
PP_CONFIG_PATH="/path/to/your/pp-config.yaml"
# Path to the preprocessor configuration YAML file (relative to project root)
# OPTIONAL: Only if you want to use /api/v1/dataprocessor/update-db endpoint
# Otherwise you can directly pass preprocessor config in the request body
# at /api/v1/dataprocessor/update-db-with-config
# Example YAML: pp-config.yaml in the src/ directory
PP_CONFIG_PATH="src/pp-config.yaml"
# API Keys
# API key for the preprocessor service
@ -10,8 +14,9 @@ PP_API_KEY="your-preprocessor-api-key-here"
DB_ENDPOINT_API_KEY="your-database-endpoint-api-key-here"
# Database Configuration
# Path to the SQLite database file
DB_URL="/path/to/your/database.db"
# Note: DB_PATH is automatically set based on DATA_DIR environment variable
# Local: defaults to data/database.sqlite
# Azure: set DATA_DIR=/home/_data in Azure App Settings
# Power BI Configuration
# Power BI dataset identifier


@ -1,70 +0,0 @@
name: Build and deploy Python app to Azure Web App - poweron-preprocessing
on:
  push:
    branches: [main]
  workflow_dispatch:
jobs:
  build:
    runs-on: ubuntu-latest
    permissions:
      contents: read
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      # ---------- BACKEND / PYTHON ----------
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.13"
      - name: Install uv
        uses: astral-sh/setup-uv@v6
      # ---------- ARTIFACT ----------
      - name: Upload artifact for deployment jobs
        uses: actions/upload-artifact@v4
        with:
          name: python-app
          path: |
            .
            !venv/
            !.venv/
            !.git/
            !.cache/
  deploy:
    runs-on: ubuntu-latest
    needs: build
    permissions:
      id-token: write # <-- required for OIDC
      contents: read
    steps:
      - name: Download artifact from build job
        uses: actions/download-artifact@v4
        with:
          name: python-app
      - name: Login to Azure (OIDC)
        uses: azure/login@v2
        with:
          client-id: ${{ secrets.AZUREAPPSERVICE_CLIENTID_AA4B9998A69E4C5C8FDF357E3FEAADD5 }}
          tenant-id: ${{ secrets.AZUREAPPSERVICE_TENANTID_CC57AD1F29D44DDA960AE3EAC6D2C27A }}
          subscription-id: ${{ secrets.AZUREAPPSERVICE_SUBSCRIPTIONID_4CD1D97C506D403E8E284466DB4E7898 }}
      # Runtime env files (required in your setup)
      - name: Set productive environment (runtime)
        run: |
          set -euo pipefail
          cp env_prod.env .env
      - name: Deploy to Azure Web App
        uses: azure/webapps-deploy@v3
        with:
          app-name: "poweron-preprocessing"
          slot-name: "Production"
          package: . # reuse the downloaded artifact folder
          # no publish-profile here; it will use the az login from azure/login

667
README.md

@ -0,0 +1,667 @@
# Gateway Preprocessing - Data Interfaces (Data Model)
## Overview
This project provides two main services with clearly defined data interfaces:
1. **Data Processor Service** (`/dataprocessor`) - Loads data from Power BI, processes it, and stores it in SQLite
2. **Data Query Service** (`/dataquery`) - Runs SQL queries against the stored data
```mermaid
graph LR
A[Power BI Dataset] -->|Data Processor| B[SQLite Database]
B -->|Data Query| C[Client/API Consumer]
style A fill:#f9f,stroke:#333,stroke-width:2px
style B fill:#bbf,stroke:#333,stroke-width:2px
style C fill:#bfb,stroke:#333,stroke-width:2px
```
---
## Data Processor Service
The Data Processor Service processes data from Power BI and stores it in a local SQLite database. The schemas are defined in `src/dataprocessor/schemas.py`.
### Data Model Hierarchy
```mermaid
classDiagram
class PreprocessingConfigSchema {
+List~TableConfigSchema~ tables
}
class TableConfigSchema {
+str name
+str powerbi_table_name
+List~str~ measures
+List~str~ group_by_columns
+List~Dict~ steps
}
class UpdateDbWithConfigResponse {
+bool success
+int tables_processed
+List~str~ warnings
}
class UpdateDbResponse {
+bool success
}
PreprocessingConfigSchema "1" *-- "1..*" TableConfigSchema : contains
```
### 1. TableConfigSchema
Defines the configuration for a single table that is read from Power BI and processed.
**Fields:**
| Field | Type | Description | Example |
|------|-----|--------------|----------|
| `name` | `str` | Name of the table in the local SQLite database | `"ProductData"` |
| `powerbi_table_name` | `str` | Name of the source table in the Power BI dataset | `"products_raw"` |
| `measures` | `List[str]` | Power BI measures to fetch (optional) | `["EP in CHF", "Gesamtbetrag"]` |
| `group_by_columns` | `List[str]` | Columns to group by when measures are set (triggers `SUMMARIZECOLUMNS`) | `["m_Artikel", "Kategorie"]` |
| `steps` | `List[Dict[str, Any]]` | List of preprocessing steps | see below |
**Supported preprocessing steps:**
```mermaid
graph TD
A[Preprocessing Steps] --> B[keep]
A --> C[fillna]
A --> D[to_numeric]
A --> E[dropna]
B --> B1["columns: List[str]<br/>Keeps only the listed columns"]
C --> C1["column: str, value: Any<br/>Fills missing values"]
D --> D1["column: str, errors: str<br/>Converts to numeric"]
E --> E1["subset: List[str]<br/>Drops rows containing NaN"]
style A fill:#f96,stroke:#333,stroke-width:3px
style B fill:#9cf,stroke:#333,stroke-width:2px
style C fill:#9cf,stroke:#333,stroke-width:2px
style D fill:#9cf,stroke:#333,stroke-width:2px
style E fill:#9cf,stroke:#333,stroke-width:2px
```
**Example:**
```json
{
"name": "ProductData",
"powerbi_table_name": "products_raw",
"measures": ["Total Sales", "Average Price"],
"group_by_columns": ["Category", "Supplier"],
"steps": [
{
"keep": {
"columns": ["ProductID", "ProductName", "Supplier", "Stock", "Unit", "Price"]
}
},
{
"fillna": {
"column": "Supplier",
"value": "Unknown"
}
},
{
"to_numeric": {
"column": "Price",
"errors": "coerce"
}
},
{
"dropna": {
"subset": ["ProductID", "ProductName", "Price"]
}
}
]
}
```
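
To make the semantics of the four step types concrete, here is a minimal sketch that applies a `steps` list in order. The project itself works on pandas DataFrames; this version deliberately uses plain lists of dicts as a dependency-free stand-in, so `apply_steps` and `_is_missing` are hypothetical helpers and edge-case behavior may differ from the real implementation.

```python
from typing import Any

Row = dict[str, Any]


def _is_missing(value: Any) -> bool:
    """Treat None and float NaN as missing, roughly like pandas does."""
    return value is None or (isinstance(value, float) and value != value)


def apply_steps(rows: list[Row], steps: list[dict[str, dict[str, Any]]]) -> list[Row]:
    """Apply keep / fillna / to_numeric / dropna steps in the order given."""
    for step in steps:
        # Each step is a single-key dict: {step_type: {param: value, ...}}
        ((step_type, params),) = step.items()
        if step_type == "keep":
            cols = params["columns"]
            rows = [{c: r.get(c) for c in cols} for r in rows]
        elif step_type == "fillna":
            col, fill = params["column"], params["value"]
            rows = [{**r, col: fill if _is_missing(r.get(col)) else r[col]} for r in rows]
        elif step_type == "to_numeric":
            col = params["column"]
            coerce = params.get("errors") == "coerce"
            converted = []
            for r in rows:
                try:
                    value = None if _is_missing(r.get(col)) else float(r[col])
                except (TypeError, ValueError):
                    if not coerce:
                        raise
                    value = None  # "coerce": unparseable values become missing
                converted.append({**r, col: value})
            rows = converted
        elif step_type == "dropna":
            subset = params["subset"]
            rows = [r for r in rows if not any(_is_missing(r.get(c)) for c in subset)]
        else:
            raise ValueError(f"Unsupported step type: {step_type}")
    return rows


rows = [
    {"ProductID": 1, "Supplier": None, "Price": "12.50", "Extra": "x"},
    {"ProductID": 2, "Supplier": "TechCo", "Price": "n/a", "Extra": "y"},
]
steps = [
    {"keep": {"columns": ["ProductID", "Supplier", "Price"]}},
    {"fillna": {"column": "Supplier", "value": "Unknown"}},
    {"to_numeric": {"column": "Price", "errors": "coerce"}},
    {"dropna": {"subset": ["ProductID", "Price"]}},
]
result = apply_steps(rows, steps)
```

Order matters: `to_numeric` with `errors: "coerce"` turns `"n/a"` into a missing value, and only the later `dropna` removes that row.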
### 2. PreprocessingConfigSchema
Complete configuration for a preprocessing request. A single request can process several tables.
**Fields:**
| Field | Type | Description | Validation |
|------|-----|--------------|-------------|
| `tables` | `List[TableConfigSchema]` | List of table configurations | at least 1 table |
**Full request example:**
```json
{
"tables": [
{
"name": "ProductData",
"powerbi_table_name": "products_raw",
"steps": [
{
"keep": {
"columns": ["ProductID", "ProductName", "Supplier", "Stock", "Unit", "Price"]
}
},
{
"fillna": {
"column": "Supplier",
"value": "Unknown"
}
},
{
"to_numeric": {
"column": "Price",
"errors": "coerce"
}
},
{
"dropna": {
"subset": ["ProductID", "ProductName", "Stock", "Unit", "Price"]
}
}
]
},
{
"name": "CustomerData",
"powerbi_table_name": "customers_raw",
"steps": [
{
"keep": {
"columns": ["CustomerID", "Name", "Email", "Region"]
}
},
{
"fillna": {
"column": "Region",
"value": "Unknown"
}
}
]
}
]
}
```
### 3. UpdateDbResponse
Simple response for standard (YAML-based) DB updates.
**Fields:**
| Field | Type | Description |
|------|-----|--------------|
| `success` | `bool` | Whether the update succeeded |
**Example:**
```json
{
"success": true
}
```
### 4. UpdateDbWithConfigResponse
Detailed response for JSON-based DB updates that pass the configuration in the request body.
**Fields:**
| Field | Type | Description |
|------|-----|--------------|
| `success` | `bool` | Whether the update succeeded |
| `tables_processed` | `int` | Number of tables processed successfully |
| `warnings` | `List[str]` | Warnings raised during processing |
**Example:**
```json
{
"success": true,
"tables_processed": 2,
"warnings": [
"Table 'ProductData': 3 rows dropped due to missing values",
"Table 'ProductData': Column 'Price' had 5 non-numeric values coerced to NaN"
]
}
```
### Preprocessing Workflow
```mermaid
sequenceDiagram
participant Client
participant API
participant PowerBI
participant Preprocessor
participant SQLite
Client->>API: POST /dataprocessor/update-db-with-config
Note over Client,API: PreprocessingConfigSchema
loop For each table
API->>PowerBI: Fetch data (powerbi_table_name)
PowerBI-->>API: Raw DataFrame
API->>Preprocessor: Apply preprocessing steps
Preprocessor->>Preprocessor: keep → fillna → to_numeric → dropna
Preprocessor-->>API: Processed DataFrame
API->>SQLite: Save table (name)
end
API-->>Client: UpdateDbWithConfigResponse
Note over Client,API: success, tables_processed, warnings
```
---
## Data Query Service
The Data Query Service runs SQL queries against the stored SQLite database. The schemas are defined in `src/dataquery/schemas.py`.
### Data Model Hierarchy
```mermaid
classDiagram
class SqlQueryRequest {
+str query
}
class SqlQueryResponse {
+bool success
+List~Dict~ data
+List~str~ columns
+int row_count
+Optional~str~ message
}
class DatabaseSchemaResponse {
+bool success
+List~TableInfo~ tables
+int table_count
}
class TableInfo {
+str name
+int row_count
}
class TableSchemaResponse {
+bool success
+str table_name
+List~ColumnInfo~ columns
+int row_count
+List~Dict~ sample_data
}
class ColumnInfo {
+str name
+str type
+bool nullable
+bool primary_key
}
DatabaseSchemaResponse "1" *-- "*" TableInfo : contains
TableSchemaResponse "1" *-- "*" ColumnInfo : contains
```
### 1. SqlQueryRequest
Request schema for SQL queries.
**Fields:**
| Field | Type | Description | Validation |
|------|-----|--------------|-------------|
| `query` | `str` | SQL query (only SELECT is allowed) | at least 1 character |
**Example:**
```json
{
"query": "SELECT ProductID, ProductName, Price FROM ProductData WHERE Price > 10 ORDER BY Price DESC LIMIT 100"
}
```
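
The SELECT-only rule, together with the trailing-semicolon cleanup and the row limit mentioned in the commit history, could look roughly like the sketch below. The actual service code is not part of this diff, so `sanitize_query`, `run_select`, and the `MAX_ROWS` value are assumptions made for illustration.

```python
import sqlite3
from typing import Any

MAX_ROWS = 1000  # hypothetical cap; the real limit is not shown in this diff


def sanitize_query(query: str) -> str:
    """Strip trailing semicolons and reject anything that is not a single SELECT."""
    cleaned = query.strip().rstrip(";").strip()
    if not cleaned.lower().startswith("select"):
        raise ValueError("Only SELECT statements are allowed")
    # Conservative: this also rejects semicolons inside string literals.
    if ";" in cleaned:
        raise ValueError("Multiple statements are not allowed")
    return cleaned


def run_select(conn: sqlite3.Connection, query: str) -> dict[str, Any]:
    """Run a sanitized SELECT and shape the result like SqlQueryResponse."""
    # Wrapping in a subquery applies the cap even if the query has its own LIMIT.
    limited = f"SELECT * FROM ({sanitize_query(query)}) LIMIT {MAX_ROWS}"
    cursor = conn.execute(limited)
    columns = [d[0] for d in cursor.description]
    data = [dict(zip(columns, row)) for row in cursor.fetchall()]
    return {
        "success": True,
        "columns": columns,
        "row_count": len(data),
        "data": data,
        "message": None,
    }


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ProductData (ProductID INTEGER, Price REAL)")
conn.executemany(
    "INSERT INTO ProductData VALUES (?, ?)", [(1, 5.0), (2, 20.0), (3, 45.0)]
)
response = run_select(conn, "SELECT ProductID, Price FROM ProductData WHERE Price > 10;")
```

A prefix check like this is deliberately strict (it also rejects `WITH ... SELECT`); opening the SQLite file read-only would be a sturdier second line of defense.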
### 2. SqlQueryResponse
Response schema for SQL query results.
**Fields:**
| Field | Type | Description |
|------|-----|--------------|
| `success` | `bool` | Whether the query ran successfully |
| `data` | `List[Dict[str, Any]]` | Query results as a list of dictionaries |
| `columns` | `List[str]` | Column names in the result |
| `row_count` | `int` | Number of rows returned |
| `message` | `Optional[str]` | Additional information or an error message |
**Example:**
```json
{
"success": true,
"columns": ["ProductID", "ProductName", "Price"],
"row_count": 3,
"data": [
{
"ProductID": 42,
"ProductName": "Premium Widget",
"Price": 99.99
},
{
"ProductID": 15,
"ProductName": "Deluxe Gadget",
"Price": 79.50
},
{
"ProductID": 8,
"ProductName": "Standard Tool",
"Price": 45.00
}
],
"message": null
}
```
### 3. TableInfo
Information about a database table.
**Fields:**
| Field | Type | Description |
|------|-----|--------------|
| `name` | `str` | Table name |
| `row_count` | `int` | Number of rows in the table |
### 4. DatabaseSchemaResponse
Response schema for database schema information.
**Fields:**
| Field | Type | Description |
|------|-----|--------------|
| `success` | `bool` | Whether the schema lookup succeeded |
| `tables` | `List[TableInfo]` | List of all tables in the database |
| `table_count` | `int` | Total number of tables |
**Example:**
```json
{
"success": true,
"tables": [
{
"name": "ProductData",
"row_count": 1523
},
{
"name": "CustomerData",
"row_count": 847
},
{
"name": "OrderData",
"row_count": 3421
}
],
"table_count": 3
}
```
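
A response of this shape can be assembled from SQLite's own catalog. The sketch below is one possible way to do it, not the code in `src/dataquery`; `database_schema` is a hypothetical function name.

```python
import sqlite3
from typing import Any


def database_schema(conn: sqlite3.Connection) -> dict[str, Any]:
    """List user tables with their row counts, shaped like DatabaseSchemaResponse."""
    names = [
        row[0]
        for row in conn.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
        )
    ]
    tables = []
    for name in names:
        # Identifiers cannot be bound as parameters, so quote them explicitly.
        quoted = '"' + name.replace('"', '""') + '"'
        (count,) = conn.execute(f"SELECT COUNT(*) FROM {quoted}").fetchone()
        tables.append({"name": name, "row_count": count})
    return {"success": True, "tables": tables, "table_count": len(tables)}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ProductData (ProductID INTEGER)")
conn.execute("CREATE TABLE CustomerData (CustomerID INTEGER)")
conn.executemany("INSERT INTO ProductData VALUES (?)", [(1,), (2,), (3,)])
schema = database_schema(conn)
```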
### 5. ColumnInfo
Information about a table column.
**Fields:**
| Field | Type | Description |
|------|-----|--------------|
| `name` | `str` | Column name |
| `type` | `str` | Data type of the column |
| `nullable` | `bool` | Whether the column may contain NULL values |
| `primary_key` | `bool` | Whether the column is a primary key |
### 6. TableSchemaResponse
Response schema for detailed table structure information.
**Fields:**
| Field | Type | Description |
|------|-----|--------------|
| `success` | `bool` | Whether the schema lookup succeeded |
| `table_name` | `str` | Name of the table |
| `columns` | `List[ColumnInfo]` | List of all columns in the table |
| `row_count` | `int` | Number of rows in the table |
| `sample_data` | `List[Dict[str, Any]]` | Sample rows (up to 5) |
**Example:**
```json
{
"success": true,
"table_name": "ProductData",
"columns": [
{
"name": "ProductID",
"type": "INTEGER",
"nullable": false,
"primary_key": true
},
{
"name": "ProductName",
"type": "TEXT",
"nullable": false,
"primary_key": false
},
{
"name": "Supplier",
"type": "TEXT",
"nullable": true,
"primary_key": false
},
{
"name": "Price",
"type": "REAL",
"nullable": true,
"primary_key": false
}
],
"row_count": 1523,
"sample_data": [
{
"ProductID": 1,
"ProductName": "Widget A",
"Supplier": "Acme Corp",
"Price": 12.50
},
{
"ProductID": 2,
"ProductName": "Gadget B",
"Supplier": "TechCo",
"Price": 24.99
},
{
"ProductID": 3,
"ProductName": "Tool C",
"Supplier": "Unknown",
"Price": 8.75
}
]
}
```
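
The `ColumnInfo` fields map closely onto what SQLite reports through `PRAGMA table_info`. The following sketch shows that mapping under the assumption that the service reads its metadata this way; `table_schema` is a hypothetical name and the real implementation may differ.

```python
import sqlite3
from typing import Any


def table_schema(
    conn: sqlite3.Connection, table_name: str, sample_rows: int = 5
) -> dict[str, Any]:
    """Describe one table, shaped like TableSchemaResponse."""
    quoted = '"' + table_name.replace('"', '""') + '"'
    # Each row is (cid, name, type, notnull, dflt_value, pk).
    info = conn.execute(f"PRAGMA table_info({quoted})").fetchall()
    if not info:
        raise LookupError(f"Unknown table: {table_name}")
    columns = [
        {"name": name, "type": col_type, "nullable": not notnull, "primary_key": pk > 0}
        for _cid, name, col_type, notnull, _default, pk in info
    ]
    (row_count,) = conn.execute(f"SELECT COUNT(*) FROM {quoted}").fetchone()
    cursor = conn.execute(f"SELECT * FROM {quoted} LIMIT ?", (sample_rows,))
    names = [d[0] for d in cursor.description]
    sample_data = [dict(zip(names, row)) for row in cursor.fetchall()]
    return {
        "success": True,
        "table_name": table_name,
        "columns": columns,
        "row_count": row_count,
        "sample_data": sample_data,
    }


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ProductData ("
    "ProductID INTEGER PRIMARY KEY, ProductName TEXT NOT NULL, Price REAL)"
)
conn.executemany(
    "INSERT INTO ProductData (ProductName, Price) VALUES (?, ?)",
    [(f"Item {i}", float(i)) for i in range(7)],
)
schema = table_schema(conn, "ProductData")
```

Note that `nullable` here only reflects a declared `NOT NULL` constraint, so a primary key column declared without one is reported as nullable.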
### Query Workflow
```mermaid
sequenceDiagram
participant Client
participant API
participant SQLite
rect rgb(200, 220, 250)
Note over Client,SQLite: Scenario 1: Run a SQL query
Client->>API: POST /dataquery/query
Note over Client,API: SqlQueryRequest
API->>SQLite: Run SELECT
SQLite-->>API: Results
API-->>Client: SqlQueryResponse
Note over Client,API: data, columns, row_count
end
rect rgb(220, 250, 220)
Note over Client,SQLite: Scenario 2: Fetch the database schema
Client->>API: GET /dataquery/schema
API->>SQLite: List tables
SQLite-->>API: Table list
API-->>Client: DatabaseSchemaResponse
Note over Client,API: tables, table_count
end
rect rgb(250, 220, 220)
Note over Client,SQLite: Scenario 3: Fetch the table structure
Client->>API: GET /dataquery/table/{table_name}
API->>SQLite: Fetch columns + sample data
SQLite-->>API: Structure + data
API-->>Client: TableSchemaResponse
Note over Client,API: columns, sample_data
end
```
---
## Overall Architecture
```mermaid
graph TB
subgraph "External Data Source"
PBI[Power BI Dataset]
end
subgraph "Gateway Preprocessing API"
subgraph "Data Processor Service"
DP_Router[Router]
DP_Service[Service]
DP_Schemas[Schemas]
DP_Domain[Domain Layer]
end
subgraph "Data Query Service"
DQ_Router[Router]
DQ_Service[Service]
DQ_Schemas[Schemas]
end
end
subgraph "Local Storage"
DB[(SQLite Database)]
end
subgraph "Clients"
Client1[Web App]
Client2[BI Tool]
Client3[API Consumer]
end
PBI -->|Power BI Reader| DP_Domain
DP_Router --> DP_Service
DP_Service --> DP_Domain
DP_Domain -->|Save| DB
DQ_Router --> DQ_Service
DQ_Service -->|Query| DB
Client1 -->|POST /dataprocessor/update-db-with-config| DP_Router
Client2 -->|POST /dataquery/query| DQ_Router
Client3 -->|GET /dataquery/schema| DQ_Router
style PBI fill:#f9f,stroke:#333,stroke-width:2px
style DB fill:#bbf,stroke:#333,stroke-width:3px
style DP_Schemas fill:#ffa,stroke:#333,stroke-width:2px
style DQ_Schemas fill:#ffa,stroke:#333,stroke-width:2px
```
---
## API Endpoint Overview
### Data Processor Endpoints
| Method | Endpoint | Request Schema | Response Schema | Description |
|---------|----------|----------------|-----------------|--------------|
| POST | `/dataprocessor/update-db` | - | `UpdateDbResponse` | Updates the DB using the YAML config |
| POST | `/dataprocessor/update-db-with-config` | `PreprocessingConfigSchema` | `UpdateDbWithConfigResponse` | Updates the DB using a JSON config |
### Data Query Endpoints
| Method | Endpoint | Request Schema | Response Schema | Description |
|---------|----------|----------------|-----------------|--------------|
| POST | `/dataquery/query` | `SqlQueryRequest` | `SqlQueryResponse` | Runs a SQL query |
| GET | `/dataquery/schema` | - | `DatabaseSchemaResponse` | Returns the DB schema |
| GET | `/dataquery/table/{table_name}` | - | `TableSchemaResponse` | Returns the table structure |
---
## Typical Use Cases
### Use Case 1: Load and process data from Power BI
```mermaid
flowchart LR
A[Client] -->|1. POST PreprocessingConfigSchema| B[API]
B -->|2. Fetch data| C[Power BI]
C -->|3. Raw Data| B
B -->|4. Preprocessing| B
B -->|5. Save| D[(SQLite)]
B -->|6. UpdateDbWithConfigResponse| A
style A fill:#bfb,stroke:#333,stroke-width:2px
style C fill:#f9f,stroke:#333,stroke-width:2px
style D fill:#bbf,stroke:#333,stroke-width:2px
```
### Use Case 2: Query data
```mermaid
flowchart LR
A[Client] -->|1. POST SqlQueryRequest| B[API]
B -->|2. SELECT Query| C[(SQLite)]
C -->|3. Results| B
B -->|4. SqlQueryResponse| A
style A fill:#bfb,stroke:#333,stroke-width:2px
style C fill:#bbf,stroke:#333,stroke-width:2px
```
### Use Case 3: Explore the database structure
```mermaid
flowchart TD
A[Client] -->|1. GET /schema| B[API]
B -->|2. DatabaseSchemaResponse| A
A -->|3. GET /table/ProductData| B
B -->|4. TableSchemaResponse| A
A -->|5. POST SqlQueryRequest| B
B -->|6. SqlQueryResponse| A
style A fill:#bfb,stroke:#333,stroke-width:2px
```
---
## Validation and Error Handling
All schemas use Pydantic for automatic validation:
- **Type safety**: Every field is checked against its declared type
- **Required fields**: Required fields must be present
- **Min/max validation**: e.g. `min_items=1` for `tables`
- **String validation**: e.g. `min_length=1` for SQL queries
- **Custom validation**: Additional business logic in the services
On validation errors, the API returns HTTP 422 (Unprocessable Entity) with detailed error messages.

23
app.py

@ -0,0 +1,23 @@
#!/usr/bin/env python3
"""
Azure entry point for PowerOn Preprocessing App.
This file redirects to the actual application in the src directory.
"""
import os
import sys
# Add current directory to Python path
sys.path.insert(0, os.path.dirname(__file__))
# Ensure DATA_DIR exists (Azure persistent storage at /home/data)
data_dir = os.environ.get("DATA_DIR", "/home/data")
os.makedirs(data_dir, exist_ok=True)
print(f"Data directory ready: {data_dir}")
# Import the actual application
from src.main import app
# For Azure App Service with uvicorn, expose the app at module level
# This allows uvicorn to find it as 'app:app'
application = app


@ -1,8 +0,0 @@
PP_CONFIG_PATH="/Users/christopher/Documents/Repos/vo-customer-preprocessor/src/pp-config.yaml"
PP_API_KEY="kj823u90209mj020394jp2msakhfkjashjkf"
DB_ENDPOINT_API_KEY="ouho02j0rj2oijroi3rj2oijro23jr0990"
DB_URL="/Users/christopher/Documents/Repos/vo-customer-preprocessor/data/data_althaus.db"
POWERBI_DATASET_ID="0e72b1f1-3d32-4caa-bc1a-e523b6232343"
POWERBI_CLIENT_ID="9f6fa2cf-3fe1-4ed5-a430-bb7e408c0d87"
POWERBI_CLIENT_SECRET="Vdy8Q~Bm2_5ooy-pgYtEgvA9-LjRN2HiXFw6Ody0"
POWERBI_TENANT_ID="6a51aaeb-2467-4186-9504-2a05aedc591f"


@ -4,6 +4,7 @@ from dataclasses import dataclass
from src.settings import settings
import pandas as pd
import httpx
import re
@dataclass
@ -13,31 +14,169 @@ class PowerBIReader:
table_name: str
base_url: str = settings.POWERBI_BASE_URL
include_nulls: bool = True
measures: list[str] = None
group_by_columns: list[str] = None
batch_size: int = 10000
order_by_column: str | None = None
max_batches: int = 100
@classmethod
async def create(
cls, dataset_id: str, access_token: str, table_name: str, **kwargs
):
cls,
*,
dataset_id: str,
access_token: str,
table_name: str,
measures: list[str] = None,
group_by_columns: list[str] = None,
batch_size: int = 10000,
order_by_column: str | None = None,
max_batches: int = 100,
**kwargs,
) -> "PowerBIReader":
return cls(
dataset_id=dataset_id,
access_token=access_token,
table_name=table_name,
measures=measures or [],
group_by_columns=group_by_columns or [],
batch_size=batch_size,
order_by_column=order_by_column,
max_batches=max_batches,
**kwargs,
)
def _dax_query(self) -> str:
# Escape single quotes in table names per DAX rules
safe = self.table_name.replace("'", "''")
return f"EVALUATE '{safe}'"
"""Generate DAX query based on configuration.
async def read_data(self) -> pd.DataFrame:
Generates different DAX queries depending on whether measures and/or
group_by_columns are specified:
1. No measures: EVALUATE 'TableName'
Returns all physical/calculated columns from the table.
2. Measures only: EVALUATE ADDCOLUMNS('TableName', "Measure1", [Measure1], ...)
Returns all columns plus the specified measures.
3. Measures + group_by_columns: EVALUATE SUMMARIZECOLUMNS('Table'[Col1], ..., "Measure1", [Measure1], ...)
Returns aggregated measures grouped by the specified columns.
Returns:
DAX query string to execute against Power BI.
"""
Calls Power BI REST API: POST /datasets/{datasetId}/executeQueries
with DAX: EVALUATE 'TableName' and returns a DataFrame.
# Remove XML/HTML tags from table name (e.g., '<oii>Artikel</oii>' -> 'Artikel')
cleaned_table_name = re.sub(r'<[^>]+>', '', self.table_name).strip()
# Escape single quotes in table names per DAX rules
safe_table = cleaned_table_name.replace("'", "''")
# Case 1: No measures - simple table evaluation
if not self.measures:
return f"EVALUATE '{safe_table}'"
# Case 2: Measures without grouping - use ADDCOLUMNS
if not self.group_by_columns:
measure_clauses = ", ".join(
[f'"{measure}", [{measure}]' for measure in self.measures]
)
return f"EVALUATE ADDCOLUMNS('{safe_table}', {measure_clauses})"
# Case 3: Measures with grouping - use SUMMARIZECOLUMNS
group_cols = ", ".join(
[f"'{safe_table}'[{col}]" for col in self.group_by_columns]
)
measure_clauses = ", ".join(
[f'"{measure}", [{measure}]' for measure in self.measures]
)
return f"EVALUATE SUMMARIZECOLUMNS({group_cols}, {measure_clauses})"
@staticmethod
def _is_numeric_value(value: str | int | float | None) -> bool:
"""Check if a value is numeric (including numeric strings).
Args:
value: The value to check.
Returns:
True if the value is numeric or a string that represents a number.
"""
if value is None:
return False
if isinstance(value, (int, float)):
return True
if isinstance(value, str):
try:
float(value)
return True
except ValueError:
return False
return False
def _dax_query_batch(self, last_value: str | int | float | None = None) -> str:
"""Generate a batched DAX query using TOPN and keyset pagination.
Uses ORDER BY with the order_by_column for deterministic ordering,
and FILTER to skip already-fetched rows based on the last seen value.
Args:
last_value: The last value of order_by_column from the previous batch.
None for the first batch.
Returns:
DAX query string for fetching the next batch.
"""
# Remove XML/HTML tags from table name (e.g., '<oii>Artikel</oii>' -> 'Artikel')
cleaned_table_name = re.sub(r'<[^>]+>', '', self.table_name).strip()
# Escape single quotes in table names per DAX rules
safe_table = cleaned_table_name.replace("'", "''")
order_col = self.order_by_column
if last_value is None:
# First batch: just use TOPN with ORDER BY
return (
f"EVALUATE TOPN({self.batch_size}, '{safe_table}', "
f"'{safe_table}'[{order_col}], ASC)"
)
# Subsequent batches: filter rows where order_col > last_value
# IMPORTANT: Handle type conversion to avoid DAX type mismatch errors
# Use a type-safe approach: convert column using IF/ISNUMBER to handle both text and numeric columns
if self._is_numeric_value(last_value):
numeric_val = float(last_value) if '.' in str(last_value) else int(last_value)
# Use IF to check if column is numeric, if so use it directly, otherwise convert with VALUE()
# This handles both text columns (VALUE converts) and numeric columns (use directly)
return (
f"EVALUATE TOPN({self.batch_size}, "
f"FILTER('{safe_table}', "
f"NOT(ISBLANK('{safe_table}'[{order_col}])) && "
f"IF(ISNUMBER('{safe_table}'[{order_col}]), '{safe_table}'[{order_col}], VALUE('{safe_table}'[{order_col}])) > {numeric_val}), "
f"IF(ISNUMBER('{safe_table}'[{order_col}]), '{safe_table}'[{order_col}], VALUE('{safe_table}'[{order_col}])), ASC)"
)
else:
# String comparison - escape quotes in the value
escaped_value = str(last_value).replace('"', '""')
return (
f"EVALUATE TOPN({self.batch_size}, "
f"FILTER('{safe_table}', '{safe_table}'[{order_col}] > \"{escaped_value}\"), "
f"'{safe_table}'[{order_col}], ASC)"
)
async def _execute_query(self, dax_query: str) -> pd.DataFrame:
"""Execute a DAX query and return the results as a DataFrame.
Args:
dax_query: The DAX query string to execute.
Returns:
DataFrame containing the query results.
"""
# Log the DAX query for debugging
import logging
logger = logging.getLogger(__name__)
logger.info(f"Executing DAX query: {dax_query}")
url = f"{self.base_url}/datasets/{self.dataset_id}/executeQueries"
body = {
"queries": [{"query": self._dax_query()}],
"queries": [{"query": dax_query}],
"serializerSettings": {"includeNulls": self.include_nulls},
}
@ -50,6 +189,7 @@ class PowerBIReader:
resp = await client.post(url, headers=headers, json=body)
if resp.status_code != 200:
logger.error(f"DAX query failed: {dax_query}")
raise RuntimeError(
f"Power BI executeQueries failed: {resp.status_code} - {resp.text}"
)
@ -74,6 +214,116 @@ class PowerBIReader:
df.columns = [_strip_qual(c) for c in df.columns]
return df
async def read_data(self) -> pd.DataFrame:
"""Fetch data from Power BI, using batching if order_by_column is set.
If order_by_column is configured, fetches data in batches using
keyset pagination to avoid the Power BI API's 1M value limit.
Otherwise, fetches all data in a single query (legacy behavior).
Returns:
DataFrame containing all fetched data.
"""
# Legacy behavior: no batching if order_by_column not set
if not self.order_by_column:
return await self._execute_query(self._dax_query())
# Batch fetching with keyset pagination
import logging
logger = logging.getLogger(__name__)
all_dfs: list[pd.DataFrame] = []
last_value: str | int | None = None
batch_num = 0
total_rows = 0
while True:
batch_num += 1
# Safety limit to prevent runaway requests
if batch_num > self.max_batches:
logger.warning(
f"Reached max_batches limit ({self.max_batches}) for table "
f"'{self.table_name}'. Stopping batch fetch. "
f"Total rows fetched so far: {total_rows}"
)
break
dax_query = self._dax_query_batch(last_value)
df = await self._execute_query(dax_query)
if df.empty:
logger.info(f"Batch {batch_num}: Empty result, stopping fetch")
break
batch_rows = len(df)
total_rows += batch_rows
# Log batch info
if self.order_by_column in df.columns:
min_val = df[self.order_by_column].min()
max_val = df[self.order_by_column].max()
logger.info(
f"Batch {batch_num}: Fetched {batch_rows} rows, "
f"{self.order_by_column} range: {min_val} to {max_val}, "
f"Total so far: {total_rows}"
)
else:
logger.info(f"Batch {batch_num}: Fetched {batch_rows} rows, Total so far: {total_rows}")
all_dfs.append(df)
# Get the maximum value for the next batch (not just the last row)
# This ensures we don't skip rows when there are multiple rows per ID
# Using max() instead of iloc[-1] because rows are ordered by I_ID,
# but if there are multiple rows with the same I_ID, the last row might
# have a smaller I_ID than the maximum in the batch
max_value = df[self.order_by_column].max()
# Ensure numeric values are preserved as numeric (not converted to string)
# Check if the column is numeric and convert the value accordingly
if pd.api.types.is_numeric_dtype(df[self.order_by_column].dtype):
# Column is numeric - ensure value is numeric
if pd.isna(max_value):
logger.info(f"Batch {batch_num}: Max value is NaN, stopping")
break # Can't continue with NaN
# Convert to appropriate numeric type
if pd.api.types.is_integer_dtype(df[self.order_by_column].dtype):
new_last_value = int(max_value)
else:
new_last_value = float(max_value)
else:
new_last_value = max_value
# Safety check: if last_value didn't change, we're stuck in a loop
if new_last_value == last_value:
logger.warning(
f"Batch {batch_num}: last_value didn't change ({last_value}), "
f"stopping to avoid infinite loop"
)
break
last_value = new_last_value
logger.info(f"Finished fetching batches for table '{self.table_name}': {batch_num} batches, {total_rows} total rows")
if not all_dfs:
return pd.DataFrame()
result = pd.concat(all_dfs, ignore_index=True)
# Log statistics
if self.order_by_column and self.order_by_column in result.columns:
unique_ids = result[self.order_by_column].nunique()
total_rows_final = len(result)
logger.info(
f"Table '{self.table_name}': {total_rows_final} total rows, "
f"{unique_ids} unique {self.order_by_column} values, "
f"average {total_rows_final / unique_ids:.2f} rows per {self.order_by_column}"
)
return result
@staticmethod
def _get_access_token_sync(
tenant_id: str,

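The batching loop above is a keyset-pagination loop with two guards (`max_batches` and a stuck-cursor check). A minimal, self-contained sketch of the same control flow follows. It assumes a strict "greater than the last value" cursor; `make_source` and `fetch_all` are hypothetical names, and `make_source` stands in for the real `_dax_query_batch` / `_execute_query` pair, whose query text is not shown in this diff.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, int]
FetchBatch = Callable[[Optional[int]], List[Row]]


def make_source(data: List[Row], key: str, batch_size: int) -> FetchBatch:
    """Hypothetical stand-in for one batch query against the dataset."""
    ordered = sorted(data, key=lambda row: row[key])

    def fetch_batch(last_value: Optional[int]) -> List[Row]:
        # Assumption: the cursor is a strict "greater than" filter.
        remaining = [r for r in ordered if last_value is None or r[key] > last_value]
        return remaining[:batch_size]

    return fetch_batch


def fetch_all(fetch_batch: FetchBatch, key: str, max_batches: int = 100) -> List[Row]:
    """Fetch batches until the source is empty, the cursor stalls, or the cap is hit."""
    rows: List[Row] = []
    last_value: Optional[int] = None
    for _ in range(max_batches):  # safety limit against runaway requests
        batch = fetch_batch(last_value)
        if not batch:
            break
        rows.extend(batch)
        new_last_value = max(row[key] for row in batch)
        if new_last_value == last_value:  # cursor did not advance
            break
        last_value = new_last_value
    return rows
```

One property of this sketch is worth checking against the real query: with a strict `>` cursor and a non-unique key, rows that share the boundary value of a batch are skipped unless the batch query returns all ties.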

@ -147,6 +147,11 @@ class TableConfig:
name: str
powerbi_table_name: str
measures: List[str] = field(default_factory=list)
group_by_columns: List[str] = field(default_factory=list)
batch_size: int = 10000
order_by_column: str | None = None
max_batches: int = 100
steps: List[Dict[str, Any]] = field(default_factory=list)
@ -184,6 +189,82 @@ class Preprocessor:
table_config = TableConfig(
name=table_data.get("name", ""),
powerbi_table_name=table_data.get("powerbi_table_name", ""),
measures=table_data.get("measures", []),
group_by_columns=table_data.get("group_by_columns", []),
steps=table_data.get("steps", []),
)
table_configs.append(table_config)
return cls(table_configs=table_configs)
@classmethod
async def create_from_config(cls, *, config: Dict[str, Any]) -> "Preprocessor":
"""Create a Preprocessor instance from a configuration dictionary.
This method allows creating a Preprocessor without relying on a YAML file,
enabling dynamic configuration via API requests with JSON payloads.
The configuration dictionary should follow the same structure as the YAML
configuration file, containing a "tables" key with a list of table
configurations.
Args:
config: Dictionary containing preprocessing configuration with structure:
{
"tables": [
{
"name": str,
"powerbi_table_name": str,
"steps": [
{step_type: {param1: value1, ...}},
...
]
},
...
]
}
Returns:
A new Preprocessor instance configured with the provided tables.
Raises:
ValueError: If the configuration dictionary is invalid or missing required keys.
Example:
>>> config = {
... "tables": [
... {
... "name": "Sales",
... "powerbi_table_name": "sales_raw",
... "steps": [
... {"keep": {"columns": ["Product", "Amount"]}},
... {"fillna": {"column": "Amount", "value": 0}}
... ]
... }
... ]
... }
>>> preprocessor = await Preprocessor.create_from_config(config=config)
"""
if not isinstance(config, dict):
raise ValueError("Configuration must be a dictionary")
if "tables" not in config:
raise ValueError("Configuration must contain a 'tables' key")
# Parse table configurations
table_configs = []
for table_data in config.get("tables", []):
if not isinstance(table_data, dict):
raise ValueError("Each table configuration must be a dictionary")
table_config = TableConfig(
name=table_data.get("name", ""),
powerbi_table_name=table_data.get("powerbi_table_name", ""),
measures=table_data.get("measures", []),
group_by_columns=table_data.get("group_by_columns", []),
batch_size=table_data.get("batch_size", 10000),
order_by_column=table_data.get("order_by_column"),
max_batches=table_data.get("max_batches", 100),
steps=table_data.get("steps", []),
)
table_configs.append(table_config)

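The parsing in `create_from_config` repeats the defaults (`10000`, `100`) that `TableConfig` already declares. A possible alternative, shown here only as a sketch, passes the optional keys through when they are present so the dataclass defaults remain the single source of truth. `parse_table_configs` and `_OPTIONAL_KEYS` are hypothetical names, and the `TableConfig` below is a local copy of the fields shown in the diff.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class TableConfig:
    name: str
    powerbi_table_name: str
    measures: List[str] = field(default_factory=list)
    group_by_columns: List[str] = field(default_factory=list)
    batch_size: int = 10000
    order_by_column: Optional[str] = None
    max_batches: int = 100
    steps: List[Dict[str, Any]] = field(default_factory=list)


_OPTIONAL_KEYS = (
    "measures", "group_by_columns", "batch_size",
    "order_by_column", "max_batches", "steps",
)


def parse_table_configs(config: Dict[str, Any]) -> List[TableConfig]:
    """Validate the top-level structure and build one TableConfig per table."""
    if not isinstance(config, dict):
        raise ValueError("Configuration must be a dictionary")
    if "tables" not in config:
        raise ValueError("Configuration must contain a 'tables' key")
    tables: List[TableConfig] = []
    for table_data in config["tables"]:
        if not isinstance(table_data, dict):
            raise ValueError("Each table configuration must be a dictionary")
        # Only forward keys that were actually provided; defaults come from the dataclass.
        optional = {k: table_data[k] for k in _OPTIONAL_KEYS if k in table_data}
        tables.append(
            TableConfig(
                name=table_data.get("name", ""),
                powerbi_table_name=table_data.get("powerbi_table_name", ""),
                **optional,
            )
        )
    return tables
```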

@ -4,7 +4,9 @@ import pandas as pd
import logging
from dataclasses import dataclass
from pathlib import Path
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine
from sqlalchemy.pool import NullPool
from src.dataprocessor.domain.base_datasaver import BaseDataSaver
@ -21,9 +23,15 @@ class SQLiteDataSaver(BaseDataSaver):
@classmethod
async def create(cls, db_path: str) -> "SQLiteDataSaver":
"""Create a new instance of DataSaver."""
- # Build the full db url
- db_url = f"sqlite+aiosqlite:///{db_path}"
- engine = create_async_engine(db_url, future=True)
+ # Ensure the directory exists
+ db_file = Path(db_path)
+ db_file.parent.mkdir(parents=True, exist_ok=True)
+ # Build the full db url: three slashes followed by the path, so an
+ # absolute POSIX path yields four slashes in total
+ db_url = f"sqlite+aiosqlite:///{db_file.as_posix()}"
+ # Use NullPool to avoid connection pooling issues with SQLite
+ engine = create_async_engine(db_url, poolclass=NullPool, future=True)
return cls(db_url=db_url, engine=engine)
async def save_table(

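The slash count in the SQLite URL depends on whether the path is relative or absolute. A small sketch of that rule follows; `build_sqlite_url` is a hypothetical helper, and `PurePosixPath` is used only so the example behaves the same on every operating system.

```python
from pathlib import PurePosixPath


def build_sqlite_url(db_path: str, driver: str = "aiosqlite") -> str:
    """Return a SQLAlchemy-style SQLite URL for the given file path."""
    # The scheme always ends in three slashes; the path is appended as-is.
    return f"sqlite+{driver}:///{PurePosixPath(db_path).as_posix()}"


relative_url = build_sqlite_url("data/database.sqlite")
absolute_url = build_sqlite_url("/srv/data/database.sqlite")
```

A relative path keeps three slashes and is resolved against the process working directory, while an absolute POSIX path contributes its own leading slash, which gives four.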

@ -2,10 +2,14 @@
# Set up router
import logging
- from fastapi import APIRouter
- from fastapi import Depends
+ from fastapi import APIRouter, Body
+ from fastapi import Security
- from src.dataprocessor.schemas import UpdateDbResponse
+ from src.dataprocessor.schemas import (
+ UpdateDbResponse,
+ PreprocessingConfigSchema,
+ UpdateDbWithConfigResponse,
+ )
from src.dependencies import require_pp_api_key
from src.dataprocessor.service import DataProcessorService
@ -16,8 +20,223 @@ logger = logging.getLogger(__name__)
@router.post("/update-db")
- async def update_db(*, _: None = Depends(require_pp_api_key)) -> UpdateDbResponse:
+ async def update_db(_: None = Security(require_pp_api_key)) -> UpdateDbResponse:
"""Endpoint to update the AI-database."""
service = await DataProcessorService.create()
await service.update_database()
return UpdateDbResponse(success=True)
@router.post("/update-db-with-config")
async def update_db_with_config(
config: PreprocessingConfigSchema = Body(...),
_: None = Security(require_pp_api_key),
) -> UpdateDbWithConfigResponse:
"""Update the database using a JSON configuration instead of a YAML file.
This endpoint provides a flexible, dynamic way to preprocess and update your
database without relying on a static YAML configuration file. You can specify
the entire preprocessing pipeline in the request body, allowing for on-demand
custom preprocessing operations.
## How It Works
1. **Reads data from Power BI**: For each table specified in the configuration,
data is fetched from the corresponding Power BI dataset table.
2. **Applies preprocessing steps**: Each table configuration includes a list of
preprocessing steps that are applied sequentially to transform the data.
3. **Saves to local database**: The processed data is saved to the local SQLite
database with the specified table name.
## Power BI Measures Support
In addition to retrieving physical/calculated columns from Power BI tables, you can
now retrieve Power BI measures using the optional `measures` and `group_by_columns`
fields in your table configuration.
### Retrieving Measures
Power BI measures are calculated values that live only in the model and are computed
at query time. To retrieve them alongside your table data, add a `measures` array
to your table configuration:
```json
{
"name": "Einkaufspreis",
"powerbi_table_name": "Einkaufspreis",
"measures": ["EP in CHF", "Gesamtbetrag in CHF"],
"steps": [...]
}
```
This uses the DAX ADDCOLUMNS pattern: `EVALUATE ADDCOLUMNS('TableName', "MeasureName", [MeasureName], ...)`
### Grouping Measures
If your measures need to be aggregated by specific columns, add the `group_by_columns`
field. This is useful when measures are defined with aggregation functions:
```json
{
"name": "Einkaufspreis_Aggregated",
"powerbi_table_name": "Einkaufspreis",
"measures": ["EP in CHF", "Gesamtbetrag in CHF"],
"group_by_columns": ["m_Artikel"],
"steps": [...]
}
```
This uses the DAX SUMMARIZECOLUMNS pattern: `EVALUATE SUMMARIZECOLUMNS('Table'[Column], "MeasureName", [MeasureName], ...)`
### Measure Name Formatting
- Measure names with spaces are automatically handled (e.g., "EP in CHF" becomes `[EP in CHF]` in DAX)
- If `measures` is empty or not provided, the standard table evaluation is used
- If `measures` is provided without `group_by_columns`, ADDCOLUMNS is used
- If both `measures` and `group_by_columns` are provided, SUMMARIZECOLUMNS is used
## Available Preprocessing Steps
The following preprocessing steps are supported. Each step is specified as a
dictionary with the step name as the key and its parameters as the value:
### keep
Keep only specified columns from the DataFrame.
- **Parameters**:
- `columns` (List[str]): List of column names to retain
- **Example**:
```json
{"keep": {"columns": ["ProductID", "ProductName", "Price"]}}
```
### fillna
Fill missing (NaN) values in a specified column.
- **Parameters**:
- `column` (str): Name of the column to fill
- `value` (Any): Value to use for filling NaN entries
- **Example**:
```json
{"fillna": {"column": "Supplier", "value": "Unknown"}}
```
### to_numeric
Convert a column to numeric type.
- **Parameters**:
- `column` (str): Name of the column to convert
- `errors` (str, optional): How to handle conversion errors
- "coerce": Convert invalid values to NaN
- "raise": Raise an exception for invalid values
- "ignore": Leave invalid values unchanged
- **Example**:
```json
{"to_numeric": {"column": "Price", "errors": "coerce"}}
```
### dropna
Drop rows that contain missing values in specified columns.
- **Parameters**:
- `subset` (List[str]): List of column names to check for NaN values
- **Example**:
```json
{"dropna": {"subset": ["ProductID", "Price"]}}
```
## Request Body Structure
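The request body must contain a `tables` array with at least one entry, where each table object specifies:
- `name`: The name for the table in the local SQLite database
- `powerbi_table_name`: The source table name in the Power BI dataset
- `measures` (optional): Power BI measures to retrieve alongside the table data
- `group_by_columns` (optional): Columns to group by when retrieving measures
- `batch_size` (optional, default 10000): Number of rows to fetch per batch
- `order_by_column` (optional): Column to order by for batch fetching; required for batching to work
- `max_batches` (optional, default 100): Safety limit on the number of batches fetched
- `steps`: An ordered list of preprocessing steps to apply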
## Complete Example
```json
{
"tables": [
{
"name": "ProductData",
"powerbi_table_name": "products_raw",
"steps": [
{
"keep": {
"columns": [
"ProductID",
"ProductName",
"Supplier",
"Stock",
"Unit",
"Price"
]
}
},
{
"fillna": {
"column": "Supplier",
"value": "Unknown"
}
},
{
"to_numeric": {
"column": "Price",
"errors": "coerce"
}
},
{
"dropna": {
"subset": [
"ProductID",
"ProductName",
"Stock",
"Unit",
"Price"
]
}
}
]
}
]
}
```
## Differences from /update-db Endpoint
- **No YAML file dependency**: Configuration is provided directly in the request
- **Dynamic configuration**: Different preprocessing pipelines can be specified per request
- **Warnings included**: Response includes any warnings encountered during preprocessing
- **Table count**: Response includes the number of tables successfully processed
## Response
Returns an `UpdateDbWithConfigResponse` containing:
- `success`: Boolean indicating if the operation was successful
- `tables_processed`: Number of tables that were successfully processed
- `warnings`: List of any warnings encountered during preprocessing (e.g., missing columns)
## Error Handling
The endpoint will raise an error if:
- The configuration structure is invalid
- No table configurations are provided
- No data is returned from Power BI for a specified table
- Preprocessing results in an empty DataFrame
- Database save operations fail
Args:
config: The preprocessing configuration specifying tables and their processing steps
_: Security dependency requiring valid API key
Returns:
UpdateDbWithConfigResponse with operation status, table count, and any warnings
Raises:
HTTPException: If authentication fails or processing errors occur
"""
service = await DataProcessorService.create()
tables_processed, warnings = await service.update_database_with_config(
config=config.model_dump()
)
return UpdateDbWithConfigResponse(
success=True, tables_processed=tables_processed, warnings=warnings
)

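The endpoint docstring describes three DAX shapes: plain table evaluation, ADDCOLUMNS, and SUMMARIZECOLUMNS. The selection rule can be sketched as below. `build_dax_query` is a hypothetical helper, not the reader's actual implementation; the plain `EVALUATE 'Table'` form for the no-measures case is an assumption, and the sketch does not escape quotes or brackets inside names.

```python
from typing import Sequence


def build_dax_query(
    table_name: str,
    measures: Sequence[str] = (),
    group_by_columns: Sequence[str] = (),
) -> str:
    """Pick the DAX pattern from the presence of measures and grouping columns."""
    table = f"'{table_name}'"
    if not measures:
        # Assumed form of the "standard table evaluation".
        return f"EVALUATE {table}"
    # Each measure becomes a ("Name", [Name]) pair; spaces in names are allowed.
    measure_args = ", ".join(f'"{m}", [{m}]' for m in measures)
    if group_by_columns:
        group_args = ", ".join(f"{table}[{c}]" for c in group_by_columns)
        return f"EVALUATE SUMMARIZECOLUMNS({group_args}, {measure_args})"
    return f"EVALUATE ADDCOLUMNS({table}, {measure_args})"
```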

@ -1,5 +1,6 @@
"""Schemas for the data processor service."""
from typing import Any, Dict, List
from pydantic import BaseModel, Field
@ -7,3 +8,162 @@ class UpdateDbResponse(BaseModel):
success: bool = Field(
..., description="Indicates if the database update was successful."
)
class TableConfigSchema(BaseModel):
"""Schema for a single table configuration.
Defines how a table should be read from Power BI and preprocessed.
Attributes:
name: The name to use for the table in the local SQLite database
powerbi_table_name: The name of the source table in Power BI dataset
measures: Optional list of Power BI measures to retrieve
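group_by_columns: Optional list of columns to group by when retrieving measures
batch_size: Number of rows to fetch per batch (default 10000)
order_by_column: Column to order by for batch fetching; batching requires it
max_batches: Maximum number of batches to fetch (default 100)
steps: List of preprocessing steps to apply to the table data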
"""
name: str = Field(
..., description="Name for the table in the local database", example="Data"
)
powerbi_table_name: str = Field(
...,
description="Name of the table in the Power BI dataset",
example="data_full",
)
measures: List[str] = Field(
default_factory=list,
description="List of Power BI measure names to retrieve",
example=["EP in CHF", "Gesamtbetrag in CHF"],
)
group_by_columns: List[str] = Field(
default_factory=list,
description="Columns to group by when retrieving measures (triggers SUMMARIZECOLUMNS)",
example=["m_Artikel"],
)
batch_size: int = Field(
default=10000,
description="Number of rows to fetch per batch (for large tables)",
example=10000,
)
order_by_column: str | None = Field(
default=None,
description="Column to order by for batch fetching. Required for batching to work.",
example="I_ID",
)
max_batches: int = Field(
default=100,
description="Maximum number of batches to fetch. Safety limit to prevent runaway requests.",
example=100,
)
steps: List[Dict[str, Any]] = Field(
default_factory=list,
description="List of preprocessing steps to apply",
example=[
{"keep": {"columns": ["col1", "col2"]}},
{"fillna": {"column": "col1", "value": "Unknown"}},
],
)
class PreprocessingConfigSchema(BaseModel):
"""Schema for the complete preprocessing configuration.
This schema defines the structure for JSON-based preprocessing configuration,
replacing the need for a YAML configuration file. It allows dynamic
configuration of table preprocessing operations per API request.
The configuration supports multiple tables, each with its own set of
preprocessing steps. Available preprocessing steps include:
- **keep**: Keep only specified columns
- Parameters: columns (List[str])
- Example: {"keep": {"columns": ["Name", "Price", "Quantity"]}}
- **fillna**: Fill missing values in a column
- Parameters: column (str), value (Any)
- Example: {"fillna": {"column": "Supplier", "value": "Unknown"}}
- **to_numeric**: Convert a column to numeric type
- Parameters: column (str), errors (str, optional)
- Example: {"to_numeric": {"column": "Price", "errors": "coerce"}}
- **dropna**: Drop rows with missing values in specified columns
- Parameters: subset (List[str])
- Example: {"dropna": {"subset": ["Name", "Price"]}}
Attributes:
tables: List of table configurations to process
Example Request Body:
{
"tables": [
{
"name": "ProductData",
"powerbi_table_name": "products_raw",
"steps": [
{
"keep": {
"columns": [
"ProductID",
"ProductName",
"Supplier",
"Stock",
"Unit",
"Price"
]
}
},
{
"fillna": {
"column": "Supplier",
"value": "Unknown"
}
},
{
"to_numeric": {
"column": "Price",
"errors": "coerce"
}
},
{
"dropna": {
"subset": [
"ProductID",
"ProductName",
"Stock",
"Unit",
"Price"
]
}
}
]
}
]
}
"""
tables: List[TableConfigSchema] = Field(
..., description="List of table configurations to process", min_length=1
)
class UpdateDbWithConfigResponse(BaseModel):
"""Response schema for the JSON-based database update endpoint.
Attributes:
success: Indicates if the database update was successful
tables_processed: Number of tables that were processed
warnings: List of any warnings encountered during preprocessing
"""
success: bool = Field(
..., description="Indicates if the database update was successful"
)
tables_processed: int = Field(
..., description="Number of tables that were successfully processed"
)
warnings: List[str] = Field(
default_factory=list,
description="List of warnings encountered during preprocessing",
)

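Each entry in `steps` is a single-key dictionary that maps a step name to its parameters, applied in order. The dispatch can be sketched as follows. To stay dependency-free, this sketch works on a list of plain dict rows with `None` as the missing value rather than on a pandas DataFrame, so it illustrates the step semantics only; `apply_steps` and `_to_number` are hypothetical names, and the `"ignore"` option of `to_numeric` is left out.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def _to_number(value: Any, errors: str) -> Optional[float]:
    """Convert one cell to float; invalid values become None when errors='coerce'."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        if errors == "coerce":
            return None
        raise


def apply_steps(rows: List[Row], steps: List[Dict[str, Dict[str, Any]]]) -> List[Row]:
    """Apply keep / fillna / to_numeric / dropna steps sequentially."""
    for step in steps:
        step_type, params = next(iter(step.items()))
        if step_type == "keep":
            rows = [{c: r.get(c) for c in params["columns"]} for r in rows]
        elif step_type == "fillna":
            col, value = params["column"], params["value"]
            rows = [{**r, col: value if r.get(col) is None else r[col]} for r in rows]
        elif step_type == "to_numeric":
            col, errors = params["column"], params.get("errors", "raise")
            rows = [{**r, col: _to_number(r.get(col), errors)} for r in rows]
        elif step_type == "dropna":
            rows = [r for r in rows if all(r.get(c) is not None for c in params["subset"])]
        else:
            raise ValueError(f"Unknown preprocessing step: {step_type}")
    return rows
```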

@ -54,6 +54,10 @@ class DataProcessorService:
dataset_id=settings.POWERBI_DATASET_ID,
access_token=self.access_token,
table_name=table_config.powerbi_table_name,
measures=table_config.measures,
group_by_columns=table_config.group_by_columns,
batch_size=getattr(table_config, 'batch_size', 10000),
order_by_column=getattr(table_config, 'order_by_column', None),
)
# Step 2: Read data from Power BI
@ -72,3 +76,109 @@ class DataProcessorService:
# Step 4: Update the local SQLite database
await self.data_saver.save_table(df, table_config.name, overwrite=True)
async def update_database_with_config(
self, *, config: dict
) -> tuple[int, list[str]]:
"""Update the database using a provided JSON configuration.
This method processes tables based on a configuration provided directly
as a dictionary (typically from a JSON request body), without relying
on a YAML configuration file. This enables dynamic, on-demand preprocessing
with custom configurations per API request.
The method follows the same workflow as update_database():
1. Reads data from Power BI for each configured table
2. Applies the specified preprocessing steps
3. Saves the processed data to the local SQLite database
Args:
config: Dictionary containing preprocessing configuration with structure:
{
"tables": [
{
"name": str,
"powerbi_table_name": str,
"steps": [
{step_type: {param1: value1, ...}},
...
]
},
...
]
}
Returns:
A tuple containing:
- int: Number of tables successfully processed
- List[str]: List of warnings collected during preprocessing
Raises:
RuntimeError: If no table configurations are found or if data processing fails.
ValueError: If the configuration structure is invalid.
Example:
>>> config = {
... "tables": [
... {
... "name": "Sales",
... "powerbi_table_name": "sales_raw",
... "steps": [
... {"keep": {"columns": ["Product", "Amount"]}},
... {"dropna": {"subset": ["Product", "Amount"]}}
... ]
... }
... ]
... }
>>> service = await DataProcessorService.create()
>>> tables_processed, warnings = await service.update_database_with_config(
... config=config
... )
"""
# Create a Preprocessor from the provided configuration
preprocessor = await Preprocessor.create_from_config(config=config)
table_configs = preprocessor.get_table_configs()
if not table_configs:
raise RuntimeError("No table configurations found in provided config.")
all_warnings = []
tables_processed = 0
for table_config in table_configs:
# Step 1: Create PowerBIReader for this table
power_bi_reader = await PowerBIReader.create(
dataset_id=settings.POWERBI_DATASET_ID,
access_token=self.access_token,
table_name=table_config.powerbi_table_name,
measures=table_config.measures,
group_by_columns=table_config.group_by_columns,
batch_size=table_config.batch_size,
order_by_column=table_config.order_by_column,
max_batches=table_config.max_batches,
)
# Step 2: Read data from Power BI
df = await power_bi_reader.read_data()
if df.empty:
raise RuntimeError(
f"No data read from Power BI for table '{table_config.name}'."
)
# Step 3: Preprocess the data using this table's steps
df = await preprocessor.preprocess(df, steps=table_config.steps)
if df.empty:
raise RuntimeError(
f"No data returned from preprocessing for table '{table_config.name}'."
)
# Collect any warnings from preprocessing
if preprocessor.last_report:
all_warnings.extend(preprocessor.last_report)
# Step 4: Update the local SQLite database
await self.data_saver.save_table(df, table_config.name, overwrite=True)
tables_processed += 1
return tables_processed, all_warnings


@ -1,7 +1,9 @@
"""Router for data query endpoints."""
import logging
- from fastapi import APIRouter, Depends, Path
+ from fastapi import APIRouter
+ from fastapi import Path
+ from fastapi import Security
from src.dataquery.schemas import (
DatabaseSchemaResponse,
@ -21,7 +23,7 @@ logger = logging.getLogger(__name__)
@router.post("/query", response_model=SqlQueryResponse)
async def execute_sql_query(
request: SqlQueryRequest,
- _: None = Depends(require_db_api_key),
+ _: None = Security(require_db_api_key),
) -> SqlQueryResponse:
"""Execute a SELECT SQL query against the database.
@ -44,7 +46,7 @@ async def execute_sql_query(
@router.get("/schema", response_model=DatabaseSchemaResponse)
async def get_database_schema(
- _: None = Depends(require_db_api_key),
+ _: None = Security(require_db_api_key),
) -> DatabaseSchemaResponse:
"""Get information about all tables in the database.
@ -64,7 +66,7 @@ async def get_database_schema(
@router.get("/schema/{table_name}", response_model=TableSchemaResponse)
async def get_table_schema(
table_name: str = Path(..., description="Name of the table to inspect"),
- _: None = Depends(require_db_api_key),
+ _: None = Security(require_db_api_key),
) -> TableSchemaResponse:
"""Get detailed information about a specific table.


@ -71,6 +71,53 @@ class DataQueryService:
return True
def _enforce_query_limit(self, *, query: str) -> str:
"""Enforce maximum row limit on query.
If query has LIMIT > SQL_ROW_LIMIT, replace with SQL_ROW_LIMIT.
If query has no LIMIT, append LIMIT SQL_ROW_LIMIT.
If query has LIMIT <= SQL_ROW_LIMIT, keep as is.
Args:
query: The SQL query to enforce limit on.
Returns:
Query with enforced limit.
"""
max_limit = settings.SQL_ROW_LIMIT
# Strip trailing semicolons and whitespace to prevent multi-statement errors
query = query.rstrip("; \t\n\r")
# Remove comments and normalize whitespace for parsing
cleaned_query = re.sub(r"--.*$", "", query, flags=re.MULTILINE)
cleaned_query = re.sub(r"/\*.*?\*/", "", cleaned_query, flags=re.DOTALL)
# Look for LIMIT clause (case insensitive)
# Pattern matches: LIMIT <number> or LIMIT <number> OFFSET <number>
limit_pattern = r"\bLIMIT\s+(\d+)(\s+OFFSET\s+\d+)?\s*$"
match = re.search(limit_pattern, cleaned_query, re.IGNORECASE)
if match:
# Extract the current limit value
current_limit = int(match.group(1))
if current_limit > max_limit:
# Replace with max_limit while preserving OFFSET if present
offset_clause = match.group(2) or ""
# Prefer the original query so its formatting is preserved
new_limit_clause = f"LIMIT {max_limit}{offset_clause}"
original_match = re.search(limit_pattern, query, re.IGNORECASE)
if original_match:
query = query[: original_match.start()] + new_limit_clause
else:
# A trailing comment hides the LIMIT clause from the end-anchored
# pattern, so fall back to the comment-stripped query
query = cleaned_query[: match.start()] + new_limit_clause
# If current_limit <= max_limit, keep query as is
else:
# No LIMIT clause found; append one on its own line so that a trailing
# "--" comment cannot swallow it
query = f"{query.rstrip()}\nLIMIT {max_limit}"
return query
async def execute_query(self, *, query: str) -> SqlQueryResponse:
"""Execute a SELECT query and return the results.
@ -90,6 +137,9 @@ class DataQueryService:
message="Only SELECT queries are allowed",
)
# Enforce row limit on the query
query = self._enforce_query_limit(query=query)
try:
async with self.engine.begin() as conn:
result = await conn.execute(text(query))

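The three cases in `_enforce_query_limit` (no LIMIT, LIMIT above the cap, LIMIT within the cap) can be exercised in isolation. The sketch below is a simplified standalone version that assumes the query contains no SQL comments; `enforce_limit` and its `max_limit` parameter are hypothetical stand-ins for the method and `settings.SQL_ROW_LIMIT`.

```python
import re

# Matches a trailing "LIMIT n" with an optional "OFFSET m", case-insensitively.
_LIMIT_RE = re.compile(r"\bLIMIT\s+(\d+)(\s+OFFSET\s+\d+)?\s*$", re.IGNORECASE)


def enforce_limit(query: str, max_limit: int = 50) -> str:
    """Cap the number of rows a SELECT may return (assumes no SQL comments)."""
    # Drop trailing semicolons and whitespace so nothing follows the LIMIT clause.
    query = query.rstrip("; \t\n\r")
    match = _LIMIT_RE.search(query)
    if match is None:
        return f"{query} LIMIT {max_limit}"
    if int(match.group(1)) <= max_limit:
        return query
    offset_clause = match.group(2) or ""
    return f"{query[: match.start()]}LIMIT {max_limit}{offset_clause}"
```

Because the pattern is anchored at the end of the text, a LIMIT inside a subquery is not rewritten; the outer query simply gets its own LIMIT appended.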

@ -1,6 +1,7 @@
"""Dependencies for FastAPI routers."""
from fastapi import HTTPException
from fastapi import Security
from fastapi import status
from fastapi.security import APIKeyHeader
from fastapi.security.base import SecurityBase
@ -12,10 +13,11 @@ from src.settings import settings
api_key_header: SecurityBase = APIKeyHeader(
name="X-PP-API-Key",
description="API key for preprocessor access",
scheme_name="PreprocessorKey",
)
- def require_pp_api_key(*, api_key: str = api_key_header) -> None:
+ def require_pp_api_key(api_key: str = Security(api_key_header)) -> None:
"""Validate the preprocessor API key.
Args:
@ -36,10 +38,11 @@ def require_pp_api_key(*, api_key: str = api_key_header) -> None:
db_api_key_header: SecurityBase = APIKeyHeader(
name="X-DB-API-Key",
description="API key for database query access",
scheme_name="DatabaseKey",
)
- def require_db_api_key(*, api_key: str = db_api_key_header) -> None:
+ def require_db_api_key(api_key: str = Security(db_api_key_header)) -> None:
"""Validate the database API key.
Args:


@ -30,3 +30,30 @@ tables:
"Einheit",
"EP in CHF",
]
# Example: Retrieving Power BI measures with ADDCOLUMNS
# Uncomment to retrieve measures alongside all table columns
# - name: "Einkaufspreis_With_Measures"
# powerbi_table_name: "Einkaufspreis"
# measures:
# - "EP in CHF"
# - "Gesamtbetrag in CHF"
# steps:
# - to_numeric:
# column: "EP_CHF"
# errors: "coerce"
# - dropna:
# subset: ["EP_CHF"]
# Example: Retrieving aggregated measures with SUMMARIZECOLUMNS
# Uncomment to retrieve measures grouped by specific columns
# - name: "Einkaufspreis_Aggregated"
# powerbi_table_name: "Einkaufspreis"
# measures:
# - "EP in CHF"
# - "Gesamtbetrag in CHF"
# group_by_columns:
# - "m_Artikel"
# steps:
# - dropna:
# subset: ["m_Artikel"]


@ -1,5 +1,6 @@
"""General application settings."""
import os
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
@ -21,8 +22,20 @@ class Settings(BaseSettings):
# SQLite database file path.
# For in memory, use ":memory:" (not persistent).
# Uses DATA_DIR environment variable for Azure persistent storage
DB_PATH: str = Field(
- "data/database.sqlite", description="Path to the SQLite database."
+ default_factory=lambda: os.path.join(
+ os.environ.get("DATA_DIR", "data"), "database.sqlite"
+ ),
+ description="Path to the SQLite database.",
)
# --- Database Query Settings ---
# Maximum number of rows to return from SQL queries.
SQL_ROW_LIMIT: int = Field(
default=50,
description="Maximum number of rows to return from SQL queries. Defaults to 50.",
)
# --- API Keys ---

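The new `DB_PATH` default is computed from `DATA_DIR` when the settings object is created. The resolution rule can be sketched on its own as below; `default_db_path` is a hypothetical helper that takes the environment as a parameter so it can be checked without touching the real process environment.

```python
import os
from typing import Mapping


def default_db_path(env: Mapping[str, str] = os.environ) -> str:
    """Return <DATA_DIR>/database.sqlite, falling back to the local 'data' directory."""
    return os.path.join(env.get("DATA_DIR", "data"), "database.sqlite")


local_path = default_db_path({})
mounted_path = default_db_path({"DATA_DIR": "/home/data"})
```

In the settings class this value is only a default, so an explicitly configured `DB_PATH` should still take precedence over it.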

@ -0,0 +1 @@
"""Data query tests."""