Compare commits
35 commits
feat/multi
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| | 8b2ac4d467 | | |
| | 21d84a099c | | |
| | b9e73a728d | | |
| | 9bca05dea2 | | |
| | 3a28fbd5e6 | | |
| | c2f11b92fd | | |
| | f5f8dfcb80 | | |
| | af6150e2fb | | |
| | 5b8daa4e49 | | |
| | 3fbb41b980 | | |
| | 471ad42912 | | |
| | dde61f447d | | |
| | e7dd3ea999 | | |
| | d9deb09d1b | | |
| | 837e0c1d29 | | |
| | 40fc3db398 | | |
| | 9fb20a6cda | | |
| | 9247e88dc3 | | |
| | 91bffe884d | | |
| | 21c83fbbac | | |
| | 8391681743 | | |
| | 2dc215e197 | | |
| | 0dca581300 | | |
| | 30f4b0e781 | | |
| | 56869051df | | |
| | 833df786a8 | | |
| | 1fe6a06a5f | | |
| | b3193ee56d | | |
| | 845636a424 | | |
| | 40a92ccafd | | |
| | 187127458e | | |
| | 2567e8e513 | | |
| | 9a86921dd1 | | |
| | a77069e9bb | | |
| | d417c84d77 | | |
17 changed files with 1689 additions and 27 deletions
13
.env.example
13
.env.example
|
|
@ -1,6 +1,10 @@
|
|||
# Preprocessor Configuration
|
||||
# Path to the preprocessor configuration YAML file
|
||||
PP_CONFIG_PATH="/path/to/your/pp-config.yaml"
|
||||
# Path to the preprocessor configuration YAML file (relative to project root)
|
||||
# OPTIONAL: Only if you want to use /api/v1/dataprocessor/update-db endpoint
|
||||
# Otherwise you can directly pass preprocessor config in the request body
|
||||
# at /api/v1/dataprocessor/update-db-with-config
|
||||
# Example YAML: pp-config.yaml in the src/ directory
|
||||
PP_CONFIG_PATH="src/pp-config.yaml"
|
||||
|
||||
# API Keys
|
||||
# API key for the preprocessor service
|
||||
|
|
@ -10,8 +14,9 @@ PP_API_KEY="your-preprocessor-api-key-here"
|
|||
DB_ENDPOINT_API_KEY="your-database-endpoint-api-key-here"
|
||||
|
||||
# Database Configuration
|
||||
# Path to the SQLite database file
|
||||
DB_URL="/path/to/your/database.db"
|
||||
# Note: DB_PATH is automatically set based on DATA_DIR environment variable
|
||||
# Local: defaults to data/database.sqlite
|
||||
# Azure: set DATA_DIR=/home/_data in Azure App Settings
|
||||
|
||||
# Power BI Configuration
|
||||
# Power BI dataset identifier
|
||||
|
|
|
|||
667
README.md
667
README.md
|
|
@ -0,0 +1,667 @@
|
|||
# Gateway Preprocessing - Datenschnittstellen (Datenmodell)
|
||||
|
||||
## Übersicht
|
||||
|
||||
Dieses Projekt bietet zwei Hauptservices mit klar definierten Datenschnittstellen:
|
||||
|
||||
1. **Data Processor Service** (`/dataprocessor`) - Lädt Daten aus Power BI, verarbeitet sie und speichert sie in SQLite
|
||||
2. **Data Query Service** (`/dataquery`) - Ermöglicht SQL-Abfragen auf die gespeicherten Daten
|
||||
|
||||
```mermaid
|
||||
graph LR
|
||||
A[Power BI Dataset] -->|Data Processor| B[SQLite Database]
|
||||
B -->|Data Query| C[Client/API Consumer]
|
||||
|
||||
style A fill:#f9f,stroke:#333,stroke-width:2px
|
||||
style B fill:#bbf,stroke:#333,stroke-width:2px
|
||||
style C fill:#bfb,stroke:#333,stroke-width:2px
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Data Processor Service
|
||||
|
||||
Der Data Processor Service verarbeitet Daten aus Power BI und speichert sie in einer lokalen SQLite-Datenbank. Die Schemas befinden sich in `src/dataprocessor/schemas.py`.
|
||||
|
||||
### Datenmodell-Hierarchie
|
||||
|
||||
```mermaid
|
||||
classDiagram
|
||||
class PreprocessingConfigSchema {
|
||||
+List~TableConfigSchema~ tables
|
||||
}
|
||||
|
||||
class TableConfigSchema {
|
||||
+str name
|
||||
+str powerbi_table_name
|
||||
+List~str~ measures
|
||||
+List~str~ group_by_columns
|
||||
+List~Dict~ steps
|
||||
}
|
||||
|
||||
class UpdateDbWithConfigResponse {
|
||||
+bool success
|
||||
+int tables_processed
|
||||
+List~str~ warnings
|
||||
}
|
||||
|
||||
class UpdateDbResponse {
|
||||
+bool success
|
||||
}
|
||||
|
||||
PreprocessingConfigSchema "1" *-- "1..*" TableConfigSchema : enthält
|
||||
```
|
||||
|
||||
### 1. TableConfigSchema
|
||||
|
||||
Definiert die Konfiguration für eine einzelne Tabelle, die aus Power BI gelesen und verarbeitet werden soll.
|
||||
|
||||
**Felder:**
|
||||
|
||||
| Feld | Typ | Beschreibung | Beispiel |
|
||||
|------|-----|--------------|----------|
|
||||
| `name` | `str` | Name der Tabelle in der lokalen SQLite-Datenbank | `"ProductData"` |
|
||||
| `powerbi_table_name` | `str` | Name der Quelltabelle im Power BI Dataset | `"products_raw"` |
|
||||
| `measures` | `List[str]` | Liste von Power BI Measures, die abgerufen werden sollen (optional) | `["EP in CHF", "Gesamtbetrag"]` |
|
||||
| `group_by_columns` | `List[str]` | Spalten für Gruppierung bei Measures (triggert SUMMARIZECOLUMNS) | `["m_Artikel", "Kategorie"]` |
|
||||
| `steps` | `List[Dict[str, Any]]` | Liste von Preprocessing-Schritten | siehe unten |
|
||||
|
||||
**Unterstützte Preprocessing-Schritte:**
|
||||
|
||||
```mermaid
|
||||
graph TD
|
||||
A[Preprocessing Steps] --> B[keep]
|
||||
A --> C[fillna]
|
||||
A --> D[to_numeric]
|
||||
A --> E[dropna]
|
||||
|
||||
B --> B1["columns: List[str]<br/>Behält nur angegebene Spalten"]
|
||||
C --> C1["column: str, value: Any<br/>Füllt fehlende Werte"]
|
||||
D --> D1["column: str, errors: str<br/>Konvertiert zu numerisch"]
|
||||
E --> E1["subset: List[str]<br/>Entfernt Zeilen mit NaN"]
|
||||
|
||||
style A fill:#f96,stroke:#333,stroke-width:3px
|
||||
style B fill:#9cf,stroke:#333,stroke-width:2px
|
||||
style C fill:#9cf,stroke:#333,stroke-width:2px
|
||||
style D fill:#9cf,stroke:#333,stroke-width:2px
|
||||
style E fill:#9cf,stroke:#333,stroke-width:2px
|
||||
```
|
||||
|
||||
**Beispiel:**
|
||||
|
||||
```json
|
||||
{
|
||||
"name": "ProductData",
|
||||
"powerbi_table_name": "products_raw",
|
||||
"measures": ["Total Sales", "Average Price"],
|
||||
"group_by_columns": ["Category", "Supplier"],
|
||||
"steps": [
|
||||
{
|
||||
"keep": {
|
||||
"columns": ["ProductID", "ProductName", "Supplier", "Stock", "Unit", "Price"]
|
||||
}
|
||||
},
|
||||
{
|
||||
"fillna": {
|
||||
"column": "Supplier",
|
||||
"value": "Unknown"
|
||||
}
|
||||
},
|
||||
{
|
||||
"to_numeric": {
|
||||
"column": "Price",
|
||||
"errors": "coerce"
|
||||
}
|
||||
},
|
||||
{
|
||||
"dropna": {
|
||||
"subset": ["ProductID", "ProductName", "Price"]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### 2. PreprocessingConfigSchema
|
||||
|
||||
Komplette Konfiguration für einen Preprocessing-Request. Kann mehrere Tabellen gleichzeitig verarbeiten.
|
||||
|
||||
**Felder:**
|
||||
|
||||
| Feld | Typ | Beschreibung | Validierung |
|
||||
|------|-----|--------------|-------------|
|
||||
| `tables` | `List[TableConfigSchema]` | Liste von Tabellenkonfigurationen | min. 1 Tabelle |
|
||||
|
||||
**Vollständiges Request-Beispiel:**
|
||||
|
||||
```json
|
||||
{
|
||||
"tables": [
|
||||
{
|
||||
"name": "ProductData",
|
||||
"powerbi_table_name": "products_raw",
|
||||
"steps": [
|
||||
{
|
||||
"keep": {
|
||||
"columns": ["ProductID", "ProductName", "Supplier", "Stock", "Unit", "Price"]
|
||||
}
|
||||
},
|
||||
{
|
||||
"fillna": {
|
||||
"column": "Supplier",
|
||||
"value": "Unknown"
|
||||
}
|
||||
},
|
||||
{
|
||||
"to_numeric": {
|
||||
"column": "Price",
|
||||
"errors": "coerce"
|
||||
}
|
||||
},
|
||||
{
|
||||
"dropna": {
|
||||
"subset": ["ProductID", "ProductName", "Stock", "Unit", "Price"]
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "CustomerData",
|
||||
"powerbi_table_name": "customers_raw",
|
||||
"steps": [
|
||||
{
|
||||
"keep": {
|
||||
"columns": ["CustomerID", "Name", "Email", "Region"]
|
||||
}
|
||||
},
|
||||
{
|
||||
"fillna": {
|
||||
"column": "Region",
|
||||
"value": "Unknown"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### 3. UpdateDbResponse
|
||||
|
||||
Einfache Response für Standard-DB-Updates (YAML-basiert).
|
||||
|
||||
**Felder:**
|
||||
|
||||
| Feld | Typ | Beschreibung |
|
||||
|------|-----|--------------|
|
||||
| `success` | `bool` | Gibt an, ob das Update erfolgreich war |
|
||||
|
||||
**Beispiel:**
|
||||
|
||||
```json
|
||||
{
|
||||
"success": true
|
||||
}
|
||||
```
|
||||
|
||||
### 4. UpdateDbWithConfigResponse
|
||||
|
||||
Detaillierte Response für JSON-basierte DB-Updates mit Konfiguration.
|
||||
|
||||
**Felder:**
|
||||
|
||||
| Feld | Typ | Beschreibung |
|
||||
|------|-----|--------------|
|
||||
| `success` | `bool` | Gibt an, ob das Update erfolgreich war |
|
||||
| `tables_processed` | `int` | Anzahl der erfolgreich verarbeiteten Tabellen |
|
||||
| `warnings` | `List[str]` | Liste von Warnungen während der Verarbeitung |
|
||||
|
||||
**Beispiel:**
|
||||
|
||||
```json
|
||||
{
|
||||
"success": true,
|
||||
"tables_processed": 2,
|
||||
"warnings": [
|
||||
"Table 'ProductData': 3 rows dropped due to missing values",
|
||||
"Table 'ProductData': Column 'Price' had 5 non-numeric values coerced to NaN"
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### Preprocessing-Workflow
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
participant Client
|
||||
participant API
|
||||
participant PowerBI
|
||||
participant Preprocessor
|
||||
participant SQLite
|
||||
|
||||
Client->>API: POST /dataprocessor/update-db-with-config
|
||||
Note over Client,API: PreprocessingConfigSchema
|
||||
|
||||
loop Für jede Tabelle
|
||||
API->>PowerBI: Daten abrufen (powerbi_table_name)
|
||||
PowerBI-->>API: Raw DataFrame
|
||||
API->>Preprocessor: Preprocessing-Steps anwenden
|
||||
Preprocessor->>Preprocessor: keep → fillna → to_numeric → dropna
|
||||
Preprocessor-->>API: Verarbeiteter DataFrame
|
||||
API->>SQLite: Tabelle speichern (name)
|
||||
end
|
||||
|
||||
API-->>Client: UpdateDbWithConfigResponse
|
||||
Note over Client,API: success, tables_processed, warnings
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Data Query Service
|
||||
|
||||
Der Data Query Service ermöglicht SQL-Abfragen auf die gespeicherte SQLite-Datenbank. Die Schemas befinden sich in `src/dataquery/schemas.py`.
|
||||
|
||||
### Datenmodell-Hierarchie
|
||||
|
||||
```mermaid
|
||||
classDiagram
|
||||
class SqlQueryRequest {
|
||||
+str query
|
||||
}
|
||||
|
||||
class SqlQueryResponse {
|
||||
+bool success
|
||||
+List~Dict~ data
|
||||
+List~str~ columns
|
||||
+int row_count
|
||||
+Optional~str~ message
|
||||
}
|
||||
|
||||
class DatabaseSchemaResponse {
|
||||
+bool success
|
||||
+List~TableInfo~ tables
|
||||
+int table_count
|
||||
}
|
||||
|
||||
class TableInfo {
|
||||
+str name
|
||||
+int row_count
|
||||
}
|
||||
|
||||
class TableSchemaResponse {
|
||||
+bool success
|
||||
+str table_name
|
||||
+List~ColumnInfo~ columns
|
||||
+int row_count
|
||||
+List~Dict~ sample_data
|
||||
}
|
||||
|
||||
class ColumnInfo {
|
||||
+str name
|
||||
+str type
|
||||
+bool nullable
|
||||
+bool primary_key
|
||||
}
|
||||
|
||||
DatabaseSchemaResponse "1" *-- "*" TableInfo : enthält
|
||||
TableSchemaResponse "1" *-- "*" ColumnInfo : enthält
|
||||
```
|
||||
|
||||
### 1. SqlQueryRequest
|
||||
|
||||
Request-Schema für SQL-Abfragen.
|
||||
|
||||
**Felder:**
|
||||
|
||||
| Feld | Typ | Beschreibung | Validierung |
|
||||
|------|-----|--------------|-------------|
|
||||
| `query` | `str` | SQL-Query (nur SELECT erlaubt) | min. 1 Zeichen |
|
||||
|
||||
**Beispiel:**
|
||||
|
||||
```json
|
||||
{
|
||||
"query": "SELECT ProductID, ProductName, Price FROM ProductData WHERE Price > 10 ORDER BY Price DESC LIMIT 100"
|
||||
}
|
||||
```
|
||||
|
||||
### 2. SqlQueryResponse
|
||||
|
||||
Response-Schema für SQL-Query-Ergebnisse.
|
||||
|
||||
**Felder:**
|
||||
|
||||
| Feld | Typ | Beschreibung |
|
||||
|------|-----|--------------|
|
||||
| `success` | `bool` | Gibt an, ob die Query erfolgreich ausgeführt wurde |
|
||||
| `data` | `List[Dict[str, Any]]` | Query-Ergebnisse als Liste von Dictionaries |
|
||||
| `columns` | `List[str]` | Spaltennamen im Ergebnis |
|
||||
| `row_count` | `int` | Anzahl der zurückgegebenen Zeilen |
|
||||
| `message` | `Optional[str]` | Zusätzliche Informationen oder Fehlermeldung |
|
||||
|
||||
**Beispiel:**
|
||||
|
||||
```json
|
||||
{
|
||||
"success": true,
|
||||
"columns": ["ProductID", "ProductName", "Price"],
|
||||
"row_count": 3,
|
||||
"data": [
|
||||
{
|
||||
"ProductID": 42,
|
||||
"ProductName": "Premium Widget",
|
||||
"Price": 99.99
|
||||
},
|
||||
{
|
||||
"ProductID": 15,
|
||||
"ProductName": "Deluxe Gadget",
|
||||
"Price": 79.50
|
||||
},
|
||||
{
|
||||
"ProductID": 8,
|
||||
"ProductName": "Standard Tool",
|
||||
"Price": 45.00
|
||||
}
|
||||
],
|
||||
"message": null
|
||||
}
|
||||
```
|
||||
|
||||
### 3. TableInfo
|
||||
|
||||
Informationen über eine Datenbanktabelle.
|
||||
|
||||
**Felder:**
|
||||
|
||||
| Feld | Typ | Beschreibung |
|
||||
|------|-----|--------------|
|
||||
| `name` | `str` | Tabellenname |
|
||||
| `row_count` | `int` | Anzahl der Zeilen in der Tabelle |
|
||||
|
||||
### 4. DatabaseSchemaResponse
|
||||
|
||||
Response-Schema für Datenbank-Schema-Informationen.
|
||||
|
||||
**Felder:**
|
||||
|
||||
| Feld | Typ | Beschreibung |
|
||||
|------|-----|--------------|
|
||||
| `success` | `bool` | Gibt an, ob die Schema-Abfrage erfolgreich war |
|
||||
| `tables` | `List[TableInfo]` | Liste aller Tabellen in der Datenbank |
|
||||
| `table_count` | `int` | Gesamtanzahl der Tabellen |
|
||||
|
||||
**Beispiel:**
|
||||
|
||||
```json
|
||||
{
|
||||
"success": true,
|
||||
"tables": [
|
||||
{
|
||||
"name": "ProductData",
|
||||
"row_count": 1523
|
||||
},
|
||||
{
|
||||
"name": "CustomerData",
|
||||
"row_count": 847
|
||||
},
|
||||
{
|
||||
"name": "OrderData",
|
||||
"row_count": 3421
|
||||
}
|
||||
],
|
||||
"table_count": 3
|
||||
}
|
||||
```
|
||||
|
||||
### 5. ColumnInfo
|
||||
|
||||
Informationen über eine Tabellenspalte.
|
||||
|
||||
**Felder:**
|
||||
|
||||
| Feld | Typ | Beschreibung |
|
||||
|------|-----|--------------|
|
||||
| `name` | `str` | Spaltenname |
|
||||
| `type` | `str` | Datentyp der Spalte |
|
||||
| `nullable` | `bool` | Ob die Spalte NULL-Werte enthalten kann |
|
||||
| `primary_key` | `bool` | Ob die Spalte ein Primary Key ist |
|
||||
|
||||
### 6. TableSchemaResponse
|
||||
|
||||
Response-Schema für detaillierte Tabellenstruktur-Informationen.
|
||||
|
||||
**Felder:**
|
||||
|
||||
| Feld | Typ | Beschreibung |
|
||||
|------|-----|--------------|
|
||||
| `success` | `bool` | Gibt an, ob die Schema-Abfrage erfolgreich war |
|
||||
| `table_name` | `str` | Name der Tabelle |
|
||||
| `columns` | `List[ColumnInfo]` | Liste aller Spalten in der Tabelle |
|
||||
| `row_count` | `int` | Anzahl der Zeilen in der Tabelle |
|
||||
| `sample_data` | `List[Dict[str, Any]]` | Beispieldaten (bis zu 5 Zeilen) |
|
||||
|
||||
**Beispiel:**
|
||||
|
||||
```json
|
||||
{
|
||||
"success": true,
|
||||
"table_name": "ProductData",
|
||||
"columns": [
|
||||
{
|
||||
"name": "ProductID",
|
||||
"type": "INTEGER",
|
||||
"nullable": false,
|
||||
"primary_key": true
|
||||
},
|
||||
{
|
||||
"name": "ProductName",
|
||||
"type": "TEXT",
|
||||
"nullable": false,
|
||||
"primary_key": false
|
||||
},
|
||||
{
|
||||
"name": "Supplier",
|
||||
"type": "TEXT",
|
||||
"nullable": true,
|
||||
"primary_key": false
|
||||
},
|
||||
{
|
||||
"name": "Price",
|
||||
"type": "REAL",
|
||||
"nullable": true,
|
||||
"primary_key": false
|
||||
}
|
||||
],
|
||||
"row_count": 1523,
|
||||
"sample_data": [
|
||||
{
|
||||
"ProductID": 1,
|
||||
"ProductName": "Widget A",
|
||||
"Supplier": "Acme Corp",
|
||||
"Price": 12.50
|
||||
},
|
||||
{
|
||||
"ProductID": 2,
|
||||
"ProductName": "Gadget B",
|
||||
"Supplier": "TechCo",
|
||||
"Price": 24.99
|
||||
},
|
||||
{
|
||||
"ProductID": 3,
|
||||
"ProductName": "Tool C",
|
||||
"Supplier": "Unknown",
|
||||
"Price": 8.75
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### Query-Workflow
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
participant Client
|
||||
participant API
|
||||
participant SQLite
|
||||
|
||||
rect rgb(200, 220, 250)
|
||||
Note over Client,SQLite: Szenario 1: SQL Query ausführen
|
||||
Client->>API: POST /dataquery/query
|
||||
Note over Client,API: SqlQueryRequest
|
||||
API->>SQLite: SELECT ausführen
|
||||
SQLite-->>API: Ergebnisse
|
||||
API-->>Client: SqlQueryResponse
|
||||
Note over Client,API: data, columns, row_count
|
||||
end
|
||||
|
||||
rect rgb(220, 250, 220)
|
||||
Note over Client,SQLite: Szenario 2: Datenbank-Schema abrufen
|
||||
Client->>API: GET /dataquery/schema
|
||||
API->>SQLite: Tabellen auflisten
|
||||
SQLite-->>API: Tabellenliste
|
||||
API-->>Client: DatabaseSchemaResponse
|
||||
Note over Client,API: tables, table_count
|
||||
end
|
||||
|
||||
rect rgb(250, 220, 220)
|
||||
Note over Client,SQLite: Szenario 3: Tabellenstruktur abrufen
|
||||
Client->>API: GET /dataquery/table/{table_name}
|
||||
API->>SQLite: Spalten + Sample Data abrufen
|
||||
SQLite-->>API: Struktur + Daten
|
||||
API-->>Client: TableSchemaResponse
|
||||
Note over Client,API: columns, sample_data
|
||||
end
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Gesamtarchitektur
|
||||
|
||||
```mermaid
|
||||
graph TB
|
||||
subgraph "External Data Source"
|
||||
PBI[Power BI Dataset]
|
||||
end
|
||||
|
||||
subgraph "Gateway Preprocessing API"
|
||||
subgraph "Data Processor Service"
|
||||
DP_Router[Router]
|
||||
DP_Service[Service]
|
||||
DP_Schemas[Schemas]
|
||||
DP_Domain[Domain Layer]
|
||||
end
|
||||
|
||||
subgraph "Data Query Service"
|
||||
DQ_Router[Router]
|
||||
DQ_Service[Service]
|
||||
DQ_Schemas[Schemas]
|
||||
end
|
||||
end
|
||||
|
||||
subgraph "Local Storage"
|
||||
DB[(SQLite Database)]
|
||||
end
|
||||
|
||||
subgraph "Clients"
|
||||
Client1[Web App]
|
||||
Client2[BI Tool]
|
||||
Client3[API Consumer]
|
||||
end
|
||||
|
||||
PBI -->|Power BI Reader| DP_Domain
|
||||
DP_Router --> DP_Service
|
||||
DP_Service --> DP_Domain
|
||||
DP_Domain -->|Save| DB
|
||||
|
||||
DQ_Router --> DQ_Service
|
||||
DQ_Service -->|Query| DB
|
||||
|
||||
Client1 -->|POST /dataprocessor/update-db-with-config| DP_Router
|
||||
Client2 -->|POST /dataquery/query| DQ_Router
|
||||
Client3 -->|GET /dataquery/schema| DQ_Router
|
||||
|
||||
style PBI fill:#f9f,stroke:#333,stroke-width:2px
|
||||
style DB fill:#bbf,stroke:#333,stroke-width:3px
|
||||
style DP_Schemas fill:#ffa,stroke:#333,stroke-width:2px
|
||||
style DQ_Schemas fill:#ffa,stroke:#333,stroke-width:2px
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## API-Endpunkte Übersicht
|
||||
|
||||
### Data Processor Endpoints
|
||||
|
||||
| Methode | Endpoint | Request Schema | Response Schema | Beschreibung |
|
||||
|---------|----------|----------------|-----------------|--------------|
|
||||
| POST | `/dataprocessor/update-db` | - | `UpdateDbResponse` | Aktualisiert DB mit YAML-Config |
|
||||
| POST | `/dataprocessor/update-db-with-config` | `PreprocessingConfigSchema` | `UpdateDbWithConfigResponse` | Aktualisiert DB mit JSON-Config |
|
||||
|
||||
### Data Query Endpoints
|
||||
|
||||
| Methode | Endpoint | Request Schema | Response Schema | Beschreibung |
|
||||
|---------|----------|----------------|-----------------|--------------|
|
||||
| POST | `/dataquery/query` | `SqlQueryRequest` | `SqlQueryResponse` | Führt SQL-Query aus |
|
||||
| GET | `/dataquery/schema` | - | `DatabaseSchemaResponse` | Gibt DB-Schema zurück |
|
||||
| GET | `/dataquery/table/{table_name}` | - | `TableSchemaResponse` | Gibt Tabellenstruktur zurück |
|
||||
|
||||
---
|
||||
|
||||
## Typische Use Cases
|
||||
|
||||
### Use Case 1: Daten aus Power BI laden und verarbeiten
|
||||
|
||||
```mermaid
|
||||
flowchart LR
|
||||
A[Client] -->|1. POST PreprocessingConfigSchema| B[API]
|
||||
B -->|2. Daten abrufen| C[Power BI]
|
||||
C -->|3. Raw Data| B
|
||||
B -->|4. Preprocessing| B
|
||||
B -->|5. Speichern| D[(SQLite)]
|
||||
B -->|6. UpdateDbWithConfigResponse| A
|
||||
|
||||
style A fill:#bfb,stroke:#333,stroke-width:2px
|
||||
style C fill:#f9f,stroke:#333,stroke-width:2px
|
||||
style D fill:#bbf,stroke:#333,stroke-width:2px
|
||||
```
|
||||
|
||||
### Use Case 2: Daten abfragen
|
||||
|
||||
```mermaid
|
||||
flowchart LR
|
||||
A[Client] -->|1. POST SqlQueryRequest| B[API]
|
||||
B -->|2. SELECT Query| C[(SQLite)]
|
||||
C -->|3. Ergebnisse| B
|
||||
B -->|4. SqlQueryResponse| A
|
||||
|
||||
style A fill:#bfb,stroke:#333,stroke-width:2px
|
||||
style C fill:#bbf,stroke:#333,stroke-width:2px
|
||||
```
|
||||
|
||||
### Use Case 3: Datenbank-Struktur erkunden
|
||||
|
||||
```mermaid
|
||||
flowchart TD
|
||||
A[Client] -->|1. GET /schema| B[API]
|
||||
B -->|2. DatabaseSchemaResponse| A
|
||||
A -->|3. GET /table/ProductData| B
|
||||
B -->|4. TableSchemaResponse| A
|
||||
A -->|5. POST SqlQueryRequest| B
|
||||
B -->|6. SqlQueryResponse| A
|
||||
|
||||
style A fill:#bfb,stroke:#333,stroke-width:2px
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Validierung und Fehlerbehandlung
|
||||
|
||||
Alle Schemas verwenden Pydantic für automatische Validierung:
|
||||
|
||||
- **Typsicherheit**: Alle Felder werden auf korrekte Typen geprüft
|
||||
- **Required Fields**: Pflichtfelder müssen vorhanden sein
|
||||
- **Min/Max Validierung**: z.B. `min_items=1` für `tables`
|
||||
- **String Validierung**: z.B. `min_length=1` für SQL-Queries
|
||||
- **Custom Validation**: Zusätzliche Business-Logik in den Services
|
||||
|
||||
Bei Validierungsfehlern gibt die API einen HTTP 422 (Unprocessable Entity) mit detaillierten Fehlermeldungen zurück.
|
||||
23
app.py
Normal file
23
app.py
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
#!/usr/bin/env python3
"""Azure entry point for the PowerOn Preprocessing App.

Prepares the import path and the data directory, then re-exports the
application object defined in ``src.main``.
"""

import os
import sys

# Make the directory that contains this file importable so the ``src``
# package resolves. The path is made absolute so the entry does not depend
# on the working directory when ``__file__`` happens to be relative.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# Create the data directory (taken from DATA_DIR, falling back to /home/data).
# NOTE(review): .env.example in the same change set tells Azure users to set
# DATA_DIR=/home/_data, which differs from this fallback - confirm which
# location is intended.
data_dir = os.environ.get("DATA_DIR", "/home/data")
os.makedirs(data_dir, exist_ok=True)
print(f"Data directory ready: {data_dir}")

# Import the actual application. Kept below the sys.path setup on purpose.
from src.main import app  # noqa: E402

# The import above already binds ``app`` at module level, which is what the
# 'app:app' target refers to. ``application`` is an additional alias for the
# same object.
application = app
|
||||
42
requirements.txt
Normal file
42
requirements.txt
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
aiosqlite==0.21.0
|
||||
annotated-types==0.7.0
|
||||
anyio==4.11.0
|
||||
certifi==2025.8.3
|
||||
cffi==2.0.0
|
||||
charset-normalizer==3.4.3
|
||||
click==8.3.0
|
||||
cryptography==46.0.1
|
||||
fastapi==0.117.1
|
||||
greenlet==3.2.4
|
||||
h11==0.16.0
|
||||
httpcore==1.0.9
|
||||
httpx==0.28.1
|
||||
idna==3.10
|
||||
iniconfig==2.1.0
|
||||
msal==1.34.0
|
||||
numpy==2.3.3
|
||||
packaging==25.0
|
||||
pandas==2.3.2
|
||||
pluggy==1.6.0
|
||||
pycparser==2.23
|
||||
pydantic==2.11.9
|
||||
pydantic-core==2.33.2
|
||||
pydantic-settings==2.10.1
|
||||
pygments==2.19.2
|
||||
pyjwt==2.10.1
|
||||
pytest==8.4.2
|
||||
pytest-asyncio==1.2.0
|
||||
python-dateutil==2.9.0.post0
|
||||
python-dotenv==1.1.1
|
||||
pytz==2025.2
|
||||
pyyaml==6.0.2
|
||||
requests==2.32.5
|
||||
six==1.17.0
|
||||
sniffio==1.3.1
|
||||
sqlalchemy==2.0.43
|
||||
starlette==0.48.0
|
||||
typing-extensions==4.15.0
|
||||
typing-inspection==0.4.1
|
||||
tzdata==2025.2
|
||||
urllib3==2.5.0
|
||||
uvicorn==0.37.0
|
||||
|
|
@ -4,6 +4,7 @@ from dataclasses import dataclass
|
|||
from src.settings import settings
|
||||
import pandas as pd
|
||||
import httpx
|
||||
import re
|
||||
|
||||
|
||||
@dataclass
|
||||
|
|
@ -13,31 +14,169 @@ class PowerBIReader:
|
|||
table_name: str
|
||||
base_url: str = settings.POWERBI_BASE_URL
|
||||
include_nulls: bool = True
|
||||
measures: list[str] = None
|
||||
group_by_columns: list[str] = None
|
||||
batch_size: int = 10000
|
||||
order_by_column: str | None = None
|
||||
max_batches: int = 100
|
||||
|
||||
@classmethod
|
||||
async def create(
|
||||
cls, dataset_id: str, access_token: str, table_name: str, **kwargs
|
||||
):
|
||||
cls,
|
||||
*,
|
||||
dataset_id: str,
|
||||
access_token: str,
|
||||
table_name: str,
|
||||
measures: list[str] = None,
|
||||
group_by_columns: list[str] = None,
|
||||
batch_size: int = 10000,
|
||||
order_by_column: str | None = None,
|
||||
max_batches: int = 100,
|
||||
**kwargs,
|
||||
) -> "PowerBIReader":
|
||||
return cls(
|
||||
dataset_id=dataset_id,
|
||||
access_token=access_token,
|
||||
table_name=table_name,
|
||||
measures=measures or [],
|
||||
group_by_columns=group_by_columns or [],
|
||||
batch_size=batch_size,
|
||||
order_by_column=order_by_column,
|
||||
max_batches=max_batches,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def _dax_query(self) -> str:
|
||||
# Escape single quotes in table names per DAX rules
|
||||
safe = self.table_name.replace("'", "''")
|
||||
return f"EVALUATE '{safe}'"
|
||||
"""Generate DAX query based on configuration.
|
||||
|
||||
async def read_data(self) -> pd.DataFrame:
|
||||
Generates different DAX queries depending on whether measures and/or
|
||||
group_by_columns are specified:
|
||||
|
||||
1. No measures: EVALUATE 'TableName'
|
||||
Returns all physical/calculated columns from the table.
|
||||
|
||||
2. Measures only: EVALUATE ADDCOLUMNS('TableName', "Measure1", [Measure1], ...)
|
||||
Returns all columns plus the specified measures.
|
||||
|
||||
3. Measures + group_by_columns: EVALUATE SUMMARIZECOLUMNS('Table'[Col1], ..., "Measure1", [Measure1], ...)
|
||||
Returns aggregated measures grouped by the specified columns.
|
||||
|
||||
Returns:
|
||||
DAX query string to execute against Power BI.
|
||||
"""
|
||||
Calls Power BI REST API: POST /datasets/{datasetId}/executeQueries
|
||||
with DAX: EVALUATE 'TableName' and returns a DataFrame.
|
||||
# Remove XML/HTML tags from table name (e.g., '<oii>Artikel</oii>' -> 'Artikel')
|
||||
cleaned_table_name = re.sub(r'<[^>]+>', '', self.table_name).strip()
|
||||
# Escape single quotes in table names per DAX rules
|
||||
safe_table = cleaned_table_name.replace("'", "''")
|
||||
|
||||
# Case 1: No measures - simple table evaluation
|
||||
if not self.measures:
|
||||
return f"EVALUATE '{safe_table}'"
|
||||
|
||||
# Case 2: Measures without grouping - use ADDCOLUMNS
|
||||
if not self.group_by_columns:
|
||||
measure_clauses = ", ".join(
|
||||
[f'"{measure}", [{measure}]' for measure in self.measures]
|
||||
)
|
||||
return f"EVALUATE ADDCOLUMNS('{safe_table}', {measure_clauses})"
|
||||
|
||||
# Case 3: Measures with grouping - use SUMMARIZECOLUMNS
|
||||
group_cols = ", ".join(
|
||||
[f"'{safe_table}'[{col}]" for col in self.group_by_columns]
|
||||
)
|
||||
measure_clauses = ", ".join(
|
||||
[f'"{measure}", [{measure}]' for measure in self.measures]
|
||||
)
|
||||
return f"EVALUATE SUMMARIZECOLUMNS({group_cols}, {measure_clauses})"
|
||||
|
||||
@staticmethod
|
||||
def _is_numeric_value(value: str | int | float | None) -> bool:
|
||||
"""Check if a value is numeric (including numeric strings).
|
||||
|
||||
Args:
|
||||
value: The value to check.
|
||||
|
||||
Returns:
|
||||
True if the value is numeric or a string that represents a number.
|
||||
"""
|
||||
if value is None:
|
||||
return False
|
||||
if isinstance(value, (int, float)):
|
||||
return True
|
||||
if isinstance(value, str):
|
||||
try:
|
||||
float(value)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
return False
|
||||
|
||||
def _dax_query_batch(self, last_value: str | int | float | None = None) -> str:
|
||||
"""Generate a batched DAX query using TOPN and keyset pagination.
|
||||
|
||||
Uses ORDER BY with the order_by_column for deterministic ordering,
|
||||
and FILTER to skip already-fetched rows based on the last seen value.
|
||||
|
||||
Args:
|
||||
last_value: The last value of order_by_column from the previous batch.
|
||||
None for the first batch.
|
||||
|
||||
Returns:
|
||||
DAX query string for fetching the next batch.
|
||||
"""
|
||||
# Remove XML/HTML tags from table name (e.g., '<oii>Artikel</oii>' -> 'Artikel')
|
||||
cleaned_table_name = re.sub(r'<[^>]+>', '', self.table_name).strip()
|
||||
# Escape single quotes in table names per DAX rules
|
||||
safe_table = cleaned_table_name.replace("'", "''")
|
||||
order_col = self.order_by_column
|
||||
|
||||
if last_value is None:
|
||||
# First batch: just use TOPN with ORDER BY
|
||||
return (
|
||||
f"EVALUATE TOPN({self.batch_size}, '{safe_table}', "
|
||||
f"'{safe_table}'[{order_col}], ASC)"
|
||||
)
|
||||
|
||||
# Subsequent batches: filter rows where order_col > last_value
|
||||
# IMPORTANT: Handle type conversion to avoid DAX type mismatch errors
|
||||
# Use a type-safe approach: convert column using IF/ISNUMBER to handle both text and numeric columns
|
||||
if self._is_numeric_value(last_value):
|
||||
numeric_val = float(last_value) if '.' in str(last_value) else int(last_value)
|
||||
# Use IF to check if column is numeric, if so use it directly, otherwise convert with VALUE()
|
||||
# This handles both text columns (VALUE converts) and numeric columns (use directly)
|
||||
return (
|
||||
f"EVALUATE TOPN({self.batch_size}, "
|
||||
f"FILTER('{safe_table}', "
|
||||
f"NOT(ISBLANK('{safe_table}'[{order_col}])) && "
|
||||
f"IF(ISNUMBER('{safe_table}'[{order_col}]), '{safe_table}'[{order_col}], VALUE('{safe_table}'[{order_col}])) > {numeric_val}), "
|
||||
f"IF(ISNUMBER('{safe_table}'[{order_col}]), '{safe_table}'[{order_col}], VALUE('{safe_table}'[{order_col}])), ASC)"
|
||||
)
|
||||
else:
|
||||
# String comparison - escape quotes in the value
|
||||
escaped_value = str(last_value).replace('"', '""')
|
||||
return (
|
||||
f"EVALUATE TOPN({self.batch_size}, "
|
||||
f"FILTER('{safe_table}', '{safe_table}'[{order_col}] > \"{escaped_value}\"), "
|
||||
f"'{safe_table}'[{order_col}], ASC)"
|
||||
)
|
||||
|
||||
async def _execute_query(self, dax_query: str) -> pd.DataFrame:
|
||||
"""Execute a DAX query and return the results as a DataFrame.
|
||||
|
||||
Args:
|
||||
dax_query: The DAX query string to execute.
|
||||
|
||||
Returns:
|
||||
DataFrame containing the query results.
|
||||
"""
|
||||
# Log the DAX query for debugging
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.info(f"Executing DAX query: {dax_query}")
|
||||
|
||||
url = f"{self.base_url}/datasets/{self.dataset_id}/executeQueries"
|
||||
body = {
|
||||
"queries": [{"query": self._dax_query()}],
|
||||
"queries": [{"query": dax_query}],
|
||||
"serializerSettings": {"includeNulls": self.include_nulls},
|
||||
}
|
||||
|
||||
|
|
@ -50,6 +189,7 @@ class PowerBIReader:
|
|||
resp = await client.post(url, headers=headers, json=body)
|
||||
|
||||
if resp.status_code != 200:
|
||||
logger.error(f"DAX query failed: {dax_query}")
|
||||
raise RuntimeError(
|
||||
f"Power BI executeQueries failed: {resp.status_code} - {resp.text}"
|
||||
)
|
||||
|
|
@ -74,6 +214,116 @@ class PowerBIReader:
|
|||
df.columns = [_strip_qual(c) for c in df.columns]
|
||||
return df
|
||||
|
||||
async def read_data(self) -> pd.DataFrame:
    """Fetch the table from Power BI, batching when ``order_by_column`` is set.

    Without ``order_by_column`` the whole table is requested with a single
    query (legacy behavior). With it, rows are fetched in batches using
    keyset pagination: every batch query is built by ``_dax_query_batch``
    from the largest ``order_by_column`` value seen in the previous batch,
    until an empty batch is returned or ``max_batches`` is reached.

    Returns:
        DataFrame containing all fetched rows; an empty DataFrame when the
        first batch is already empty.

    Raises:
        KeyError: If a batch result does not contain ``order_by_column``,
            because pagination cannot advance without it.
    """
    # Legacy behavior: no batching if order_by_column not set
    if not self.order_by_column:
        return await self._execute_query(self._dax_query())

    # Batch fetching with keyset pagination
    import logging

    logger = logging.getLogger(__name__)

    order_col = self.order_by_column
    all_dfs: list[pd.DataFrame] = []
    last_value: str | int | float | None = None
    total_rows = 0

    for batch_num in range(1, self.max_batches + 1):
        df = await self._execute_query(self._dax_query_batch(last_value))

        if df.empty:
            logger.info(f"Batch {batch_num}: Empty result, stopping fetch")
            break

        if order_col not in df.columns:
            # Without the key column there is no cursor for the next batch;
            # fail with an explanation instead of a bare KeyError.
            raise KeyError(
                f"order_by_column '{order_col}' not found in Power BI result "
                f"columns {list(df.columns)} for table '{self.table_name}'"
            )

        all_dfs.append(df)
        batch_rows = len(df)
        total_rows += batch_rows

        keys = df[order_col]
        min_val = keys.min()
        # Use the batch maximum rather than the last row as the cursor: the
        # rows of a batch are not assumed to arrive sorted by the key.
        max_value = keys.max()
        logger.info(
            f"Batch {batch_num}: Fetched {batch_rows} rows, "
            f"{order_col} range: {min_val} to {max_value}, "
            f"Total so far: {total_rows}"
        )

        # A missing maximum (column holds only NaN/None) cannot serve as a
        # cursor, whatever the column dtype is.
        if pd.isna(max_value):
            logger.info(f"Batch {batch_num}: Max value is NaN, stopping")
            break

        # Keep numeric cursors as plain Python numbers (not strings or
        # numpy scalars) before handing them to the query builder.
        if pd.api.types.is_integer_dtype(keys.dtype):
            new_last_value = int(max_value)
        elif pd.api.types.is_numeric_dtype(keys.dtype):
            new_last_value = float(max_value)
        else:
            new_last_value = max_value

        # Safety check: if the cursor did not move, the next query would
        # return the same rows again.
        if new_last_value == last_value:
            logger.warning(
                f"Batch {batch_num}: last_value didn't change ({last_value}), "
                f"stopping to avoid infinite loop"
            )
            break

        last_value = new_last_value
    else:
        # Loop ran out of batches without hitting a stop condition.
        logger.warning(
            f"Reached max_batches limit ({self.max_batches}) for table "
            f"'{self.table_name}'. Stopping batch fetch. "
            f"Total rows fetched so far: {total_rows}"
        )

    # Report the number of batches that actually contributed rows.
    logger.info(
        f"Finished fetching batches for table '{self.table_name}': "
        f"{len(all_dfs)} batches, {total_rows} total rows"
    )

    if not all_dfs:
        return pd.DataFrame()

    result = pd.concat(all_dfs, ignore_index=True)

    # Log statistics
    unique_ids = result[order_col].nunique()
    total_rows_final = len(result)
    if unique_ids:
        logger.info(
            f"Table '{self.table_name}': {total_rows_final} total rows, "
            f"{unique_ids} unique {order_col} values, "
            f"average {total_rows_final / unique_ids:.2f} rows per {order_col}"
        )
    else:
        # nunique() ignores NaN, so it is 0 when every key is missing;
        # skip the average instead of dividing by zero.
        logger.info(
            f"Table '{self.table_name}': {total_rows_final} total rows, "
            f"0 unique {order_col} values"
        )

    return result
|
||||
|
||||
@staticmethod
|
||||
def _get_access_token_sync(
|
||||
tenant_id: str,
|
||||
|
|
|
|||
|
|
@ -147,6 +147,11 @@ class TableConfig:
|
|||
|
||||
name: str
|
||||
powerbi_table_name: str
|
||||
measures: List[str] = field(default_factory=list)
|
||||
group_by_columns: List[str] = field(default_factory=list)
|
||||
batch_size: int = 10000
|
||||
order_by_column: str | None = None
|
||||
max_batches: int = 100
|
||||
steps: List[Dict[str, Any]] = field(default_factory=list)
|
||||
|
||||
|
||||
|
|
@ -184,6 +189,82 @@ class Preprocessor:
|
|||
table_config = TableConfig(
|
||||
name=table_data.get("name", ""),
|
||||
powerbi_table_name=table_data.get("powerbi_table_name", ""),
|
||||
measures=table_data.get("measures", []),
|
||||
group_by_columns=table_data.get("group_by_columns", []),
|
||||
steps=table_data.get("steps", []),
|
||||
)
|
||||
table_configs.append(table_config)
|
||||
|
||||
return cls(table_configs=table_configs)
|
||||
|
||||
@classmethod
|
||||
async def create_from_config(cls, *, config: Dict[str, Any]) -> "Preprocessor":
|
||||
"""Create a Preprocessor instance from a configuration dictionary.
|
||||
|
||||
This method allows creating a Preprocessor without relying on a YAML file,
|
||||
enabling dynamic configuration via API requests with JSON payloads.
|
||||
|
||||
The configuration dictionary should follow the same structure as the YAML
|
||||
configuration file, containing a "tables" key with a list of table
|
||||
configurations.
|
||||
|
||||
Args:
|
||||
config: Dictionary containing preprocessing configuration with structure:
|
||||
{
|
||||
"tables": [
|
||||
{
|
||||
"name": str,
|
||||
"powerbi_table_name": str,
|
||||
"steps": [
|
||||
{step_type: {param1: value1, ...}},
|
||||
...
|
||||
]
|
||||
},
|
||||
...
|
||||
]
|
||||
}
|
||||
|
||||
Returns:
|
||||
A new Preprocessor instance configured with the provided tables.
|
||||
|
||||
Raises:
|
||||
ValueError: If the configuration dictionary is invalid or missing required keys.
|
||||
|
||||
Example:
|
||||
>>> config = {
|
||||
... "tables": [
|
||||
... {
|
||||
... "name": "Sales",
|
||||
... "powerbi_table_name": "sales_raw",
|
||||
... "steps": [
|
||||
... {"keep": {"columns": ["Product", "Amount"]}},
|
||||
... {"fillna": {"column": "Amount", "value": 0}}
|
||||
... ]
|
||||
... }
|
||||
... ]
|
||||
... }
|
||||
>>> preprocessor = await Preprocessor.create_from_config(config=config)
|
||||
"""
|
||||
if not isinstance(config, dict):
|
||||
raise ValueError("Configuration must be a dictionary")
|
||||
|
||||
if "tables" not in config:
|
||||
raise ValueError("Configuration must contain a 'tables' key")
|
||||
|
||||
# Parse table configurations
|
||||
table_configs = []
|
||||
for table_data in config.get("tables", []):
|
||||
if not isinstance(table_data, dict):
|
||||
raise ValueError("Each table configuration must be a dictionary")
|
||||
|
||||
table_config = TableConfig(
|
||||
name=table_data.get("name", ""),
|
||||
powerbi_table_name=table_data.get("powerbi_table_name", ""),
|
||||
measures=table_data.get("measures", []),
|
||||
group_by_columns=table_data.get("group_by_columns", []),
|
||||
batch_size=table_data.get("batch_size", 10000),
|
||||
order_by_column=table_data.get("order_by_column"),
|
||||
max_batches=table_data.get("max_batches", 100),
|
||||
steps=table_data.get("steps", []),
|
||||
)
|
||||
table_configs.append(table_config)
|
||||
|
|
|
|||
|
|
@ -4,7 +4,9 @@ import pandas as pd
|
|||
import logging
|
||||
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine
|
||||
from sqlalchemy.pool import NullPool
|
||||
|
||||
from src.dataprocessor.domain.base_datasaver import BaseDataSaver
|
||||
|
||||
|
|
@ -21,9 +23,15 @@ class SQLiteDataSaver(BaseDataSaver):
|
|||
@classmethod
|
||||
async def create(cls, db_path: str) -> "SQLiteDataSaver":
|
||||
"""Create a new instance of DataSaver."""
|
||||
# Build the full db url
|
||||
db_url = f"sqlite+aiosqlite:///{db_path}"
|
||||
engine = create_async_engine(db_url, future=True)
|
||||
# Ensure the directory exists
|
||||
db_file = Path(db_path)
|
||||
db_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Build the full db url with absolute path (4 slashes total)
|
||||
db_url = f"sqlite+aiosqlite:///{db_file.as_posix()}"
|
||||
|
||||
# Use NullPool to avoid connection pooling issues with SQLite
|
||||
engine = create_async_engine(db_url, poolclass=NullPool, future=True)
|
||||
return cls(db_url=db_url, engine=engine)
|
||||
|
||||
async def save_table(
|
||||
|
|
|
|||
|
|
@ -2,10 +2,14 @@
|
|||
|
||||
# Set up router
|
||||
import logging
|
||||
from fastapi import APIRouter
|
||||
from fastapi import Depends
|
||||
from fastapi import APIRouter, Body
|
||||
from fastapi import Security
|
||||
|
||||
from src.dataprocessor.schemas import UpdateDbResponse
|
||||
from src.dataprocessor.schemas import (
|
||||
UpdateDbResponse,
|
||||
PreprocessingConfigSchema,
|
||||
UpdateDbWithConfigResponse,
|
||||
)
|
||||
from src.dependencies import require_pp_api_key
|
||||
from src.dataprocessor.service import DataProcessorService
|
||||
|
||||
|
|
@ -16,8 +20,223 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
|
||||
@router.post("/update-db")
|
||||
async def update_db(*, _: None = Depends(require_pp_api_key)) -> UpdateDbResponse:
|
||||
async def update_db(_: None = Security(require_pp_api_key)) -> UpdateDbResponse:
|
||||
"""Endpoint to update the AI-database."""
|
||||
service = await DataProcessorService.create()
|
||||
await service.update_database()
|
||||
return UpdateDbResponse(success=True)
|
||||
|
||||
|
||||
@router.post("/update-db-with-config")
async def update_db_with_config(
    config: PreprocessingConfigSchema = Body(...),
    _: None = Security(require_pp_api_key),
) -> UpdateDbWithConfigResponse:
    """Update the database using a JSON configuration instead of a YAML file.

    This endpoint provides a flexible, dynamic way to preprocess and update your
    database without relying on a static YAML configuration file. You can specify
    the entire preprocessing pipeline in the request body, allowing for on-demand
    custom preprocessing operations.

    ## How It Works

    1. **Reads data from Power BI**: For each table specified in the configuration,
       data is fetched from the corresponding Power BI dataset table.

    2. **Applies preprocessing steps**: Each table configuration includes a list of
       preprocessing steps that are applied sequentially to transform the data.

    3. **Saves to local database**: The processed data is saved to the local SQLite
       database with the specified table name.

    ## Power BI Measures Support

    In addition to retrieving physical/calculated columns from Power BI tables, you can
    now retrieve Power BI measures using the optional `measures` and `group_by_columns`
    fields in your table configuration.

    ### Retrieving Measures

    Power BI measures are calculated values that live only in the model and are computed
    at query time. To retrieve them alongside your table data, add a `measures` array
    to your table configuration:

    ```json
    {
      "name": "Einkaufspreis",
      "powerbi_table_name": "Einkaufspreis",
      "measures": ["EP in CHF", "Gesamtbetrag in CHF"],
      "steps": [...]
    }
    ```

    This uses the DAX ADDCOLUMNS pattern: `EVALUATE ADDCOLUMNS('TableName', "MeasureName", [MeasureName], ...)`

    ### Grouping Measures

    If your measures need to be aggregated by specific columns, add the `group_by_columns`
    field. This is useful when measures are defined with aggregation functions:

    ```json
    {
      "name": "Einkaufspreis_Aggregated",
      "powerbi_table_name": "Einkaufspreis",
      "measures": ["EP in CHF", "Gesamtbetrag in CHF"],
      "group_by_columns": ["m_Artikel"],
      "steps": [...]
    }
    ```

    This uses the DAX SUMMARIZECOLUMNS pattern: `EVALUATE SUMMARIZECOLUMNS('Table'[Column], "MeasureName", [MeasureName], ...)`

    ### Measure Name Formatting

    - Measure names with spaces are automatically handled (e.g., "EP in CHF" becomes `[EP in CHF]` in DAX)
    - If `measures` is empty or not provided, the standard table evaluation is used
    - If `measures` is provided without `group_by_columns`, ADDCOLUMNS is used
    - If both `measures` and `group_by_columns` are provided, SUMMARIZECOLUMNS is used

    ## Available Preprocessing Steps

    The following preprocessing steps are supported. Each step is specified as a
    dictionary with the step name as the key and its parameters as the value:

    ### keep
    Keep only specified columns from the DataFrame.
    - **Parameters**:
      - `columns` (List[str]): List of column names to retain
    - **Example**:
      ```json
      {"keep": {"columns": ["ProductID", "ProductName", "Price"]}}
      ```

    ### fillna
    Fill missing (NaN) values in a specified column.
    - **Parameters**:
      - `column` (str): Name of the column to fill
      - `value` (Any): Value to use for filling NaN entries
    - **Example**:
      ```json
      {"fillna": {"column": "Supplier", "value": "Unknown"}}
      ```

    ### to_numeric
    Convert a column to numeric type.
    - **Parameters**:
      - `column` (str): Name of the column to convert
      - `errors` (str, optional): How to handle conversion errors
        - "coerce": Convert invalid values to NaN
        - "raise": Raise an exception for invalid values
        - "ignore": Leave invalid values unchanged
    - **Example**:
      ```json
      {"to_numeric": {"column": "Price", "errors": "coerce"}}
      ```

    ### dropna
    Drop rows that contain missing values in specified columns.
    - **Parameters**:
      - `subset` (List[str]): List of column names to check for NaN values
    - **Example**:
      ```json
      {"dropna": {"subset": ["ProductID", "Price"]}}
      ```

    ## Request Body Structure

    The request body must contain a `tables` array, where each table object specifies:
    - `name`: The name for the table in the local SQLite database
    - `powerbi_table_name`: The source table name in the Power BI dataset
    - `steps`: An ordered list of preprocessing steps to apply

    Each table object may additionally specify the optional fields `measures`,
    `group_by_columns`, `batch_size`, `order_by_column` and `max_batches`.

    ## Complete Example

    ```json
    {
      "tables": [
        {
          "name": "ProductData",
          "powerbi_table_name": "products_raw",
          "steps": [
            {
              "keep": {
                "columns": [
                  "ProductID",
                  "ProductName",
                  "Supplier",
                  "Stock",
                  "Unit",
                  "Price"
                ]
              }
            },
            {
              "fillna": {
                "column": "Supplier",
                "value": "Unknown"
              }
            },
            {
              "to_numeric": {
                "column": "Price",
                "errors": "coerce"
              }
            },
            {
              "dropna": {
                "subset": [
                  "ProductID",
                  "ProductName",
                  "Stock",
                  "Unit",
                  "Price"
                ]
              }
            }
          ]
        }
      ]
    }
    ```

    ## Differences from /update-db Endpoint

    - **No YAML file dependency**: Configuration is provided directly in the request
    - **Dynamic configuration**: Different preprocessing pipelines can be specified per request
    - **Warnings included**: Response includes any warnings encountered during preprocessing
    - **Table count**: Response includes the number of tables successfully processed

    ## Response

    Returns a `UpdateDbWithConfigResponse` containing:
    - `success`: Boolean indicating if the operation was successful
    - `tables_processed`: Number of tables that were successfully processed
    - `warnings`: List of any warnings encountered during preprocessing (e.g., missing columns)

    ## Error Handling

    The endpoint will raise an error if:
    - The configuration structure is invalid
    - No table configurations are provided
    - No data is returned from Power BI for a specified table
    - Preprocessing results in an empty DataFrame
    - Database save operations fail

    Args:
        config: The preprocessing configuration specifying tables and their processing steps
        _: Security dependency requiring valid API key

    Returns:
        UpdateDbWithConfigResponse with operation status, table count, and any warnings

    Raises:
        HTTPException: Presumably raised by the `require_pp_api_key` dependency
            when the API key is rejected (confirm in `src.dependencies`).

    Note:
        This function does not catch or translate exceptions itself, so any
        error raised by the service call propagates unchanged.
    """
    service = await DataProcessorService.create()
    # The service takes a plain dict, so the validated Pydantic model is
    # dumped before being handed over.
    tables_processed, warnings = await service.update_database_with_config(
        config=config.model_dump()
    )
    return UpdateDbWithConfigResponse(
        success=True, tables_processed=tables_processed, warnings=warnings
    )
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
"""Schemas for the data processor service."""
|
||||
|
||||
from typing import Any, Dict, List
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
|
|
@ -7,3 +8,162 @@ class UpdateDbResponse(BaseModel):
|
|||
success: bool = Field(
|
||||
..., description="Indicates if the database update was successful."
|
||||
)
|
||||
|
||||
|
||||
class TableConfigSchema(BaseModel):
    """Schema for a single table configuration.

    Defines how a table should be read from Power BI and preprocessed.

    Attributes:
        name: The name to use for the table in the local SQLite database
        powerbi_table_name: The name of the source table in Power BI dataset
        measures: Optional list of Power BI measures to retrieve
        group_by_columns: Optional list of columns to group by when retrieving measures
        batch_size: Number of rows to fetch per batch; must be positive
        order_by_column: Column to order by for batch fetching; batching is
            only used when this is set
        max_batches: Upper bound on the number of batches; must be positive
        steps: List of preprocessing steps to apply to the table data
    """

    # Pydantic v2 takes example values through `examples` (a list of example
    # values); the singular `example=` keyword is a deprecated extra argument.
    name: str = Field(
        ..., description="Name for the table in the local database", examples=["Data"]
    )
    powerbi_table_name: str = Field(
        ...,
        description="Name of the table in the Power BI dataset",
        examples=["data_full"],
    )
    measures: List[str] = Field(
        default_factory=list,
        description="List of Power BI measure names to retrieve",
        examples=[["EP in CHF", "Gesamtbetrag in CHF"]],
    )
    group_by_columns: List[str] = Field(
        default_factory=list,
        description="Columns to group by when retrieving measures (triggers SUMMARIZECOLUMNS)",
        examples=[["m_Artikel"]],
    )
    # Non-positive values can never fetch a row, so reject them at validation
    # time instead of failing later in the Power BI fetch.
    batch_size: int = Field(
        default=10000,
        gt=0,
        description="Number of rows to fetch per batch (for large tables)",
        examples=[10000],
    )
    order_by_column: str | None = Field(
        default=None,
        description="Column to order by for batch fetching. Required for batching to work.",
        examples=["I_ID"],
    )
    max_batches: int = Field(
        default=100,
        gt=0,
        description="Maximum number of batches to fetch. Safety limit to prevent runaway requests.",
        examples=[100],
    )
    steps: List[Dict[str, Any]] = Field(
        default_factory=list,
        description="List of preprocessing steps to apply",
        examples=[
            [
                {"keep": {"columns": ["col1", "col2"]}},
                {"fillna": {"column": "col1", "value": "Unknown"}},
            ]
        ],
    )
|
||||
|
||||
|
||||
class PreprocessingConfigSchema(BaseModel):
    """Schema for the complete preprocessing configuration.

    This schema defines the structure for JSON-based preprocessing configuration,
    replacing the need for a YAML configuration file. It allows dynamic
    configuration of table preprocessing operations per API request.

    The configuration supports multiple tables, each with its own set of
    preprocessing steps. Available preprocessing steps include:

    - **keep**: Keep only specified columns
        - Parameters: columns (List[str])
        - Example: {"keep": {"columns": ["Name", "Price", "Quantity"]}}

    - **fillna**: Fill missing values in a column
        - Parameters: column (str), value (Any)
        - Example: {"fillna": {"column": "Supplier", "value": "Unknown"}}

    - **to_numeric**: Convert a column to numeric type
        - Parameters: column (str), errors (str, optional)
        - Example: {"to_numeric": {"column": "Price", "errors": "coerce"}}

    - **dropna**: Drop rows with missing values in specified columns
        - Parameters: subset (List[str])
        - Example: {"dropna": {"subset": ["Name", "Price"]}}

    Attributes:
        tables: List of table configurations to process (at least one)

    Example Request Body:
        {
            "tables": [
                {
                    "name": "ProductData",
                    "powerbi_table_name": "products_raw",
                    "steps": [
                        {
                            "keep": {
                                "columns": [
                                    "ProductID",
                                    "ProductName",
                                    "Supplier",
                                    "Stock",
                                    "Unit",
                                    "Price"
                                ]
                            }
                        },
                        {
                            "fillna": {
                                "column": "Supplier",
                                "value": "Unknown"
                            }
                        },
                        {
                            "to_numeric": {
                                "column": "Price",
                                "errors": "coerce"
                            }
                        },
                        {
                            "dropna": {
                                "subset": [
                                    "ProductID",
                                    "ProductName",
                                    "Stock",
                                    "Unit",
                                    "Price"
                                ]
                            }
                        }
                    ]
                }
            ]
        }
    """

    # `min_length` is the Pydantic v2 spelling of the list-size constraint;
    # `min_items` is the deprecated v1 name for the same rule.
    tables: List[TableConfigSchema] = Field(
        ..., description="List of table configurations to process", min_length=1
    )
|
||||
|
||||
|
||||
class UpdateDbWithConfigResponse(BaseModel):
    """Response schema for the JSON-based database update endpoint.

    Attributes:
        success: Indicates if the database update was successful
        tables_processed: Number of tables that were processed
        warnings: List of any warnings encountered during preprocessing
    """

    # Required: the caller must always state the overall outcome.
    success: bool = Field(
        default=...,
        description="Indicates if the database update was successful",
    )
    # Required: count of tables that went through the whole pipeline.
    tables_processed: int = Field(
        default=...,
        description="Number of tables that were successfully processed",
    )
    # Optional: a fresh empty list is created per instance when omitted.
    warnings: List[str] = Field(
        description="List of warnings encountered during preprocessing",
        default_factory=list,
    )
|
||||
|
|
|
|||
|
|
@ -54,6 +54,10 @@ class DataProcessorService:
|
|||
dataset_id=settings.POWERBI_DATASET_ID,
|
||||
access_token=self.access_token,
|
||||
table_name=table_config.powerbi_table_name,
|
||||
measures=table_config.measures,
|
||||
group_by_columns=table_config.group_by_columns,
|
||||
batch_size=getattr(table_config, 'batch_size', 10000),
|
||||
order_by_column=getattr(table_config, 'order_by_column', None),
|
||||
)
|
||||
|
||||
# Step 2: Read data from Power BI
|
||||
|
|
@ -72,3 +76,109 @@ class DataProcessorService:
|
|||
|
||||
# Step 4: Update the local SQLite database
|
||||
await self.data_saver.save_table(df, table_config.name, overwrite=True)
|
||||
|
||||
async def update_database_with_config(
    self, *, config: dict
) -> tuple[int, list[str]]:
    """Run the read / preprocess / save pipeline from an in-memory config.

    The configuration is passed as a dictionary (typically a parsed JSON
    request body) and has the shape::

        {
            "tables": [
                {
                    "name": str,
                    "powerbi_table_name": str,
                    "steps": [{step_type: {param: value, ...}}, ...],
                },
                ...
            ]
        }

    For every configured table, in order:

    1. a ``PowerBIReader`` is created from the table's settings,
    2. the data is read from Power BI,
    3. the table's preprocessing steps are applied,
    4. the result is saved under the table's ``name`` with ``overwrite=True``.

    Args:
        config: Preprocessing configuration dictionary as described above.

    Returns:
        A ``(tables_processed, warnings)`` tuple: the number of tables that
        completed all four steps, and the warnings gathered from
        ``preprocessor.last_report`` after each table.

    Raises:
        RuntimeError: If the config yields no table configurations, if
            Power BI returns no rows for a table, or if preprocessing leaves
            a table empty.
    """
    preprocessor = await Preprocessor.create_from_config(config=config)
    tables = preprocessor.get_table_configs()

    # Nothing to do is treated as an error rather than a silent success.
    if not tables:
        raise RuntimeError("No table configurations found in provided config.")

    collected_warnings = []
    processed_count = 0

    for processed_count, table in enumerate(tables, start=1):
        # Step 1: build a reader carrying this table's fetch settings.
        reader = await PowerBIReader.create(
            dataset_id=settings.POWERBI_DATASET_ID,
            access_token=self.access_token,
            table_name=table.powerbi_table_name,
            measures=table.measures,
            group_by_columns=table.group_by_columns,
            batch_size=table.batch_size,
            order_by_column=table.order_by_column,
            max_batches=table.max_batches,
        )

        # Step 2: pull the raw rows; an empty table aborts the whole run.
        frame = await reader.read_data()
        if frame.empty:
            raise RuntimeError(
                f"No data read from Power BI for table '{table.name}'."
            )

        # Step 3: apply this table's own preprocessing steps.
        frame = await preprocessor.preprocess(frame, steps=table.steps)
        if frame.empty:
            raise RuntimeError(
                f"No data returned from preprocessing for table '{table.name}'."
            )

        # Keep whatever the preprocessor reported for this table.
        report = preprocessor.last_report
        if report:
            collected_warnings.extend(report)

        # Step 4: replace the table in the local SQLite database.
        await self.data_saver.save_table(frame, table.name, overwrite=True)

    return processed_count, collected_warnings
|
||||
|
|
|
|||
|
|
@ -1,7 +1,9 @@
|
|||
"""Router for data query endpoints."""
|
||||
|
||||
import logging
|
||||
from fastapi import APIRouter, Depends, Path
|
||||
from fastapi import APIRouter
|
||||
from fastapi import Path
|
||||
from fastapi import Security
|
||||
|
||||
from src.dataquery.schemas import (
|
||||
DatabaseSchemaResponse,
|
||||
|
|
@ -21,7 +23,7 @@ logger = logging.getLogger(__name__)
|
|||
@router.post("/query", response_model=SqlQueryResponse)
|
||||
async def execute_sql_query(
|
||||
request: SqlQueryRequest,
|
||||
_: None = Depends(require_db_api_key),
|
||||
_: None = Security(require_db_api_key),
|
||||
) -> SqlQueryResponse:
|
||||
"""Execute a SELECT SQL query against the database.
|
||||
|
||||
|
|
@ -44,7 +46,7 @@ async def execute_sql_query(
|
|||
|
||||
@router.get("/schema", response_model=DatabaseSchemaResponse)
|
||||
async def get_database_schema(
|
||||
_: None = Depends(require_db_api_key),
|
||||
_: None = Security(require_db_api_key),
|
||||
) -> DatabaseSchemaResponse:
|
||||
"""Get information about all tables in the database.
|
||||
|
||||
|
|
@ -64,7 +66,7 @@ async def get_database_schema(
|
|||
@router.get("/schema/{table_name}", response_model=TableSchemaResponse)
|
||||
async def get_table_schema(
|
||||
table_name: str = Path(..., description="Name of the table to inspect"),
|
||||
_: None = Depends(require_db_api_key),
|
||||
_: None = Security(require_db_api_key),
|
||||
) -> TableSchemaResponse:
|
||||
"""Get detailed information about a specific table.
|
||||
|
||||
|
|
|
|||
|
|
@ -71,6 +71,53 @@ class DataQueryService:
|
|||
|
||||
return True
|
||||
|
||||
def _enforce_query_limit(self, *, query: str) -> str:
    """Enforce the maximum row limit (``settings.SQL_ROW_LIMIT``) on a query.

    SQL comments are removed first, so a trailing comment can neither hide
    an oversized LIMIT from the rewrite nor swallow an appended LIMIT.
    Quoted strings and identifiers are left untouched while doing so. Then:

    - a trailing ``LIMIT n [OFFSET m]`` or ``LIMIT m, n`` with ``n`` above the
      maximum has ``n`` replaced by the maximum (the offset is preserved);
    - a trailing LIMIT at or below the maximum is kept as is;
    - a query without a trailing LIMIT gets ``LIMIT <maximum>`` appended.

    Args:
        query: The SQL query to enforce the limit on.

    Returns:
        The comment-free query with the row limit enforced.
    """
    max_limit = settings.SQL_ROW_LIMIT

    # Quoted text is matched first and kept verbatim, so "--" or "/*" inside
    # a string literal or quoted identifier is not mistaken for a comment.
    # An unterminated block comment runs to the end of the input.
    comment_or_quoted = re.compile(
        r"""('(?:[^']|'')*'|"(?:[^"]|"")*"|`[^`]*`|\[[^\]]*\])"""
        r"""|(--[^\n]*|/\*.*?(?:\*/|\Z))""",
        re.DOTALL,
    )
    # A comment becomes a single space so neighbouring tokens stay separated.
    query = comment_or_quoted.sub(lambda m: m.group(1) or " ", query)

    # Strip trailing semicolons and whitespace to prevent multi-statement errors
    query = query.rstrip("; \t\n\r")

    # Trailing LIMIT clause (case insensitive), matching either
    #   LIMIT <count> [OFFSET <offset>]   or   LIMIT <offset>, <count>
    limit_pattern = re.compile(
        r"\bLIMIT\s+(?:(?P<comma_offset>\d+)\s*,\s*)?(?P<count>\d+)"
        r"(?P<offset>\s+OFFSET\s+\d+)?\s*$",
        re.IGNORECASE,
    )
    match = limit_pattern.search(query)

    if match is None:
        # No LIMIT clause found, append one
        return f"{query} LIMIT {max_limit}"

    if int(match.group("count")) <= max_limit:
        # Already within bounds, keep query as is
        return query

    # Replace only the row count; keep whichever offset form was used.
    comma_offset = match.group("comma_offset")
    leading = f"{comma_offset}, " if comma_offset is not None else ""
    trailing = match.group("offset") or ""
    return f"{query[:match.start()]}LIMIT {leading}{max_limit}{trailing}"
|
||||
|
||||
async def execute_query(self, *, query: str) -> SqlQueryResponse:
|
||||
"""Execute a SELECT query and return the results.
|
||||
|
||||
|
|
@ -90,6 +137,9 @@ class DataQueryService:
|
|||
message="Only SELECT queries are allowed",
|
||||
)
|
||||
|
||||
# Enforce row limit on the query
|
||||
query = self._enforce_query_limit(query=query)
|
||||
|
||||
try:
|
||||
async with self.engine.begin() as conn:
|
||||
result = await conn.execute(text(query))
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
"""Dependencies for FastAPI routers."""
|
||||
|
||||
from fastapi import HTTPException
|
||||
from fastapi import Security
|
||||
from fastapi import status
|
||||
from fastapi.security import APIKeyHeader
|
||||
from fastapi.security.base import SecurityBase
|
||||
|
|
@ -12,10 +13,11 @@ from src.settings import settings
|
|||
api_key_header: SecurityBase = APIKeyHeader(
|
||||
name="X-PP-API-Key",
|
||||
description="API key for preprocessor access",
|
||||
scheme_name="PreprocessorKey",
|
||||
)
|
||||
|
||||
|
||||
def require_pp_api_key(*, api_key: str = api_key_header) -> None:
|
||||
def require_pp_api_key(api_key: str = Security(api_key_header)) -> None:
|
||||
"""Validate the preprocessor API key.
|
||||
|
||||
Args:
|
||||
|
|
@ -36,10 +38,11 @@ def require_pp_api_key(*, api_key: str = api_key_header) -> None:
|
|||
db_api_key_header: SecurityBase = APIKeyHeader(
|
||||
name="X-DB-API-Key",
|
||||
description="API key for database query access",
|
||||
scheme_name="DatabaseKey",
|
||||
)
|
||||
|
||||
|
||||
def require_db_api_key(*, api_key: str = db_api_key_header) -> None:
|
||||
def require_db_api_key(api_key: str = Security(db_api_key_header)) -> None:
|
||||
"""Validate the database API key.
|
||||
|
||||
Args:
|
||||
|
|
|
|||
|
|
@ -30,3 +30,30 @@ tables:
|
|||
"Einheit",
|
||||
"EP in CHF",
|
||||
]
|
||||
|
||||
# Example: Retrieving Power BI measures with ADDCOLUMNS
|
||||
# Uncomment to retrieve measures alongside all table columns
|
||||
# - name: "Einkaufspreis_With_Measures"
|
||||
# powerbi_table_name: "Einkaufspreis"
|
||||
# measures:
|
||||
# - "EP in CHF"
|
||||
# - "Gesamtbetrag in CHF"
|
||||
# steps:
|
||||
# - to_numeric:
|
||||
# column: "EP_CHF"
|
||||
# errors: "coerce"
|
||||
# - dropna:
|
||||
# subset: ["EP_CHF"]
|
||||
|
||||
# Example: Retrieving aggregated measures with SUMMARIZECOLUMNS
|
||||
# Uncomment to retrieve measures grouped by specific columns
|
||||
# - name: "Einkaufspreis_Aggregated"
|
||||
# powerbi_table_name: "Einkaufspreis"
|
||||
# measures:
|
||||
# - "EP in CHF"
|
||||
# - "Gesamtbetrag in CHF"
|
||||
# group_by_columns:
|
||||
# - "m_Artikel"
|
||||
# steps:
|
||||
# - dropna:
|
||||
# subset: ["m_Artikel"]
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
"""General application settings."""
|
||||
|
||||
import os
|
||||
from pydantic import Field
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
|
||||
|
|
@ -21,8 +22,20 @@ class Settings(BaseSettings):
|
|||
|
||||
# SQLite database file path.
|
||||
# For an in-memory database, use ":memory:" (not persistent).
|
||||
# The directory is taken from the DATA_DIR environment variable (falls back to "data"), e.g. for Azure persistent storage.
|
||||
DB_PATH: str = Field(
|
||||
"data/database.sqlite", description="Path to the SQLite database."
|
||||
default_factory=lambda: os.path.join(
|
||||
os.environ.get("DATA_DIR", "data"), "database.sqlite"
|
||||
),
|
||||
description="Path to the SQLite database.",
|
||||
)
|
||||
|
||||
# --- Database Query Settings ---
|
||||
|
||||
# Maximum number of rows to return from SQL queries.
|
||||
SQL_ROW_LIMIT: int = Field(
|
||||
default=50,
|
||||
description="Maximum number of rows to return from SQL queries. Defaults to 50.",
|
||||
)
|
||||
|
||||
# --- API Keys ---
|
||||
|
|
|
|||
1
startup.txt
Normal file
1
startup.txt
Normal file
|
|
@ -0,0 +1 @@
|
|||
uvicorn app:app --host 0.0.0.0 --port 8000
|
||||
1
tests/dataquery/__init__.py
Normal file
1
tests/dataquery/__init__.py
Normal file
|
|
@ -0,0 +1 @@
|
|||
"""Data query tests."""
|
||||
Loading…
Reference in a new issue