16 KiB
Gateway Preprocessing - Datenschnittstellen (Datenmodell)
Übersicht
Dieses Projekt bietet zwei Hauptservices mit klar definierten Datenschnittstellen:
- Data Processor Service (
/dataprocessor) - Lädt Daten aus Power BI, verarbeitet sie und speichert sie in SQLite - Data Query Service (
/dataquery) - Ermöglicht SQL-Abfragen auf die gespeicherten Daten
graph LR
A[Power BI Dataset] -->|Data Processor| B[SQLite Database]
B -->|Data Query| C[Client/API Consumer]
style A fill:#f9f,stroke:#333,stroke-width:2px
style B fill:#bbf,stroke:#333,stroke-width:2px
style C fill:#bfb,stroke:#333,stroke-width:2px
Data Processor Service
Der Data Processor Service verarbeitet Daten aus Power BI und speichert sie in einer lokalen SQLite-Datenbank. Die Schemas befinden sich in src/dataprocessor/schemas.py.
Datenmodell-Hierarchie
classDiagram
class PreprocessingConfigSchema {
+List~TableConfigSchema~ tables
}
class TableConfigSchema {
+str name
+str powerbi_table_name
+List~str~ measures
+List~str~ group_by_columns
+List~Dict~ steps
}
class UpdateDbWithConfigResponse {
+bool success
+int tables_processed
+List~str~ warnings
}
class UpdateDbResponse {
+bool success
}
PreprocessingConfigSchema "1" *-- "1..*" TableConfigSchema : enthält
1. TableConfigSchema
Definiert die Konfiguration für eine einzelne Tabelle, die aus Power BI gelesen und verarbeitet werden soll.
Felder:
| Feld | Typ | Beschreibung | Beispiel |
|---|---|---|---|
name |
str |
Name der Tabelle in der lokalen SQLite-Datenbank | "ProductData" |
powerbi_table_name |
str |
Name der Quelltabelle im Power BI Dataset | "products_raw" |
measures |
List[str] |
Liste von Power BI Measures, die abgerufen werden sollen (optional) | ["EP in CHF", "Gesamtbetrag"] |
group_by_columns |
List[str] |
Spalten für Gruppierung bei Measures (triggert SUMMARIZECOLUMNS) | ["m_Artikel", "Kategorie"] |
steps |
List[Dict[str, Any]] |
Liste von Preprocessing-Schritten | siehe unten |
Unterstützte Preprocessing-Schritte:
graph TD
A[Preprocessing Steps] --> B[keep]
A --> C[fillna]
A --> D[to_numeric]
A --> E[dropna]
B --> B1["columns: List[str]<br/>Behält nur angegebene Spalten"]
C --> C1["column: str, value: Any<br/>Füllt fehlende Werte"]
D --> D1["column: str, errors: str<br/>Konvertiert zu numerisch"]
E --> E1["subset: List[str]<br/>Entfernt Zeilen mit NaN"]
style A fill:#f96,stroke:#333,stroke-width:3px
style B fill:#9cf,stroke:#333,stroke-width:2px
style C fill:#9cf,stroke:#333,stroke-width:2px
style D fill:#9cf,stroke:#333,stroke-width:2px
style E fill:#9cf,stroke:#333,stroke-width:2px
Beispiel:
{
"name": "ProductData",
"powerbi_table_name": "products_raw",
"measures": ["Total Sales", "Average Price"],
"group_by_columns": ["Category", "Supplier"],
"steps": [
{
"keep": {
"columns": ["ProductID", "ProductName", "Supplier", "Stock", "Unit", "Price"]
}
},
{
"fillna": {
"column": "Supplier",
"value": "Unknown"
}
},
{
"to_numeric": {
"column": "Price",
"errors": "coerce"
}
},
{
"dropna": {
"subset": ["ProductID", "ProductName", "Price"]
}
}
]
}
2. PreprocessingConfigSchema
Komplette Konfiguration für einen Preprocessing-Request. Kann mehrere Tabellen gleichzeitig verarbeiten.
Felder:
| Feld | Typ | Beschreibung | Validierung |
|---|---|---|---|
tables |
List[TableConfigSchema] |
Liste von Tabellenkonfigurationen | min. 1 Tabelle |
Vollständiges Request-Beispiel:
{
"tables": [
{
"name": "ProductData",
"powerbi_table_name": "products_raw",
"steps": [
{
"keep": {
"columns": ["ProductID", "ProductName", "Supplier", "Stock", "Unit", "Price"]
}
},
{
"fillna": {
"column": "Supplier",
"value": "Unknown"
}
},
{
"to_numeric": {
"column": "Price",
"errors": "coerce"
}
},
{
"dropna": {
"subset": ["ProductID", "ProductName", "Stock", "Unit", "Price"]
}
}
]
},
{
"name": "CustomerData",
"powerbi_table_name": "customers_raw",
"steps": [
{
"keep": {
"columns": ["CustomerID", "Name", "Email", "Region"]
}
},
{
"fillna": {
"column": "Region",
"value": "Unknown"
}
}
]
}
]
}
3. UpdateDbResponse
Einfache Response für Standard-DB-Updates (YAML-basiert).
Felder:
| Feld | Typ | Beschreibung |
|---|---|---|
success |
bool |
Gibt an, ob das Update erfolgreich war |
Beispiel:
{
"success": true
}
4. UpdateDbWithConfigResponse
Detaillierte Response für JSON-basierte DB-Updates mit Konfiguration.
Felder:
| Feld | Typ | Beschreibung |
|---|---|---|
success |
bool |
Gibt an, ob das Update erfolgreich war |
tables_processed |
int |
Anzahl der erfolgreich verarbeiteten Tabellen |
warnings |
List[str] |
Liste von Warnungen während der Verarbeitung |
Beispiel:
{
"success": true,
"tables_processed": 2,
"warnings": [
"Table 'ProductData': 3 rows dropped due to missing values",
"Table 'ProductData': Column 'Price' had 5 non-numeric values coerced to NaN"
]
}
Preprocessing-Workflow
sequenceDiagram
participant Client
participant API
participant PowerBI
participant Preprocessor
participant SQLite
Client->>API: POST /dataprocessor/update-db-with-config
Note over Client,API: PreprocessingConfigSchema
loop Für jede Tabelle
API->>PowerBI: Daten abrufen (powerbi_table_name)
PowerBI-->>API: Raw DataFrame
API->>Preprocessor: Preprocessing-Steps anwenden
Preprocessor->>Preprocessor: keep → fillna → to_numeric → dropna
Preprocessor-->>API: Verarbeiteter DataFrame
API->>SQLite: Tabelle speichern (name)
end
API-->>Client: UpdateDbWithConfigResponse
Note over Client,API: success, tables_processed, warnings
Data Query Service
Der Data Query Service ermöglicht SQL-Abfragen auf die gespeicherte SQLite-Datenbank. Die Schemas befinden sich in src/dataquery/schemas.py.
Datenmodell-Hierarchie
classDiagram
class SqlQueryRequest {
+str query
}
class SqlQueryResponse {
+bool success
+List~Dict~ data
+List~str~ columns
+int row_count
+Optional~str~ message
}
class DatabaseSchemaResponse {
+bool success
+List~TableInfo~ tables
+int table_count
}
class TableInfo {
+str name
+int row_count
}
class TableSchemaResponse {
+bool success
+str table_name
+List~ColumnInfo~ columns
+int row_count
+List~Dict~ sample_data
}
class ColumnInfo {
+str name
+str type
+bool nullable
+bool primary_key
}
DatabaseSchemaResponse "1" *-- "*" TableInfo : enthält
TableSchemaResponse "1" *-- "*" ColumnInfo : enthält
1. SqlQueryRequest
Request-Schema für SQL-Abfragen.
Felder:
| Feld | Typ | Beschreibung | Validierung |
|---|---|---|---|
query |
str |
SQL-Query (nur SELECT erlaubt) | min. 1 Zeichen |
Beispiel:
{
"query": "SELECT ProductID, ProductName, Price FROM ProductData WHERE Price > 10 ORDER BY Price DESC LIMIT 100"
}
2. SqlQueryResponse
Response-Schema für SQL-Query-Ergebnisse.
Felder:
| Feld | Typ | Beschreibung |
|---|---|---|
success |
bool |
Gibt an, ob die Query erfolgreich ausgeführt wurde |
data |
List[Dict[str, Any]] |
Query-Ergebnisse als Liste von Dictionaries |
columns |
List[str] |
Spaltennamen im Ergebnis |
row_count |
int |
Anzahl der zurückgegebenen Zeilen |
message |
Optional[str] |
Zusätzliche Informationen oder Fehlermeldung |
Beispiel:
{
"success": true,
"columns": ["ProductID", "ProductName", "Price"],
"row_count": 3,
"data": [
{
"ProductID": 42,
"ProductName": "Premium Widget",
"Price": 99.99
},
{
"ProductID": 15,
"ProductName": "Deluxe Gadget",
"Price": 79.50
},
{
"ProductID": 8,
"ProductName": "Standard Tool",
"Price": 45.00
}
],
"message": null
}
3. TableInfo
Informationen über eine Datenbanktabelle.
Felder:
| Feld | Typ | Beschreibung |
|---|---|---|
name |
str |
Tabellenname |
row_count |
int |
Anzahl der Zeilen in der Tabelle |
4. DatabaseSchemaResponse
Response-Schema für Datenbank-Schema-Informationen.
Felder:
| Feld | Typ | Beschreibung |
|---|---|---|
success |
bool |
Gibt an, ob die Schema-Abfrage erfolgreich war |
tables |
List[TableInfo] |
Liste aller Tabellen in der Datenbank |
table_count |
int |
Gesamtanzahl der Tabellen |
Beispiel:
{
"success": true,
"tables": [
{
"name": "ProductData",
"row_count": 1523
},
{
"name": "CustomerData",
"row_count": 847
},
{
"name": "OrderData",
"row_count": 3421
}
],
"table_count": 3
}
5. ColumnInfo
Informationen über eine Tabellenspalte.
Felder:
| Feld | Typ | Beschreibung |
|---|---|---|
name |
str |
Spaltenname |
type |
str |
Datentyp der Spalte |
nullable |
bool |
Ob die Spalte NULL-Werte enthalten kann |
primary_key |
bool |
Ob die Spalte ein Primary Key ist |
6. TableSchemaResponse
Response-Schema für detaillierte Tabellenstruktur-Informationen.
Felder:
| Feld | Typ | Beschreibung |
|---|---|---|
success |
bool |
Gibt an, ob die Schema-Abfrage erfolgreich war |
table_name |
str |
Name der Tabelle |
columns |
List[ColumnInfo] |
Liste aller Spalten in der Tabelle |
row_count |
int |
Anzahl der Zeilen in der Tabelle |
sample_data |
List[Dict[str, Any]] |
Beispieldaten (bis zu 5 Zeilen) |
Beispiel:
{
"success": true,
"table_name": "ProductData",
"columns": [
{
"name": "ProductID",
"type": "INTEGER",
"nullable": false,
"primary_key": true
},
{
"name": "ProductName",
"type": "TEXT",
"nullable": false,
"primary_key": false
},
{
"name": "Supplier",
"type": "TEXT",
"nullable": true,
"primary_key": false
},
{
"name": "Price",
"type": "REAL",
"nullable": true,
"primary_key": false
}
],
"row_count": 1523,
"sample_data": [
{
"ProductID": 1,
"ProductName": "Widget A",
"Supplier": "Acme Corp",
"Price": 12.50
},
{
"ProductID": 2,
"ProductName": "Gadget B",
"Supplier": "TechCo",
"Price": 24.99
},
{
"ProductID": 3,
"ProductName": "Tool C",
"Supplier": "Unknown",
"Price": 8.75
}
]
}
Query-Workflow
sequenceDiagram
participant Client
participant API
participant SQLite
rect rgb(200, 220, 250)
Note over Client,SQLite: Szenario 1: SQL Query ausführen
Client->>API: POST /dataquery/query
Note over Client,API: SqlQueryRequest
API->>SQLite: SELECT ausführen
SQLite-->>API: Ergebnisse
API-->>Client: SqlQueryResponse
Note over Client,API: data, columns, row_count
end
rect rgb(220, 250, 220)
Note over Client,SQLite: Szenario 2: Datenbank-Schema abrufen
Client->>API: GET /dataquery/schema
API->>SQLite: Tabellen auflisten
SQLite-->>API: Tabellenliste
API-->>Client: DatabaseSchemaResponse
Note over Client,API: tables, table_count
end
rect rgb(250, 220, 220)
Note over Client,SQLite: Szenario 3: Tabellenstruktur abrufen
Client->>API: GET /dataquery/table/{table_name}
API->>SQLite: Spalten + Sample Data abrufen
SQLite-->>API: Struktur + Daten
API-->>Client: TableSchemaResponse
Note over Client,API: columns, sample_data
end
Gesamtarchitektur
graph TB
subgraph "External Data Source"
PBI[Power BI Dataset]
end
subgraph "Gateway Preprocessing API"
subgraph "Data Processor Service"
DP_Router[Router]
DP_Service[Service]
DP_Schemas[Schemas]
DP_Domain[Domain Layer]
end
subgraph "Data Query Service"
DQ_Router[Router]
DQ_Service[Service]
DQ_Schemas[Schemas]
end
end
subgraph "Local Storage"
DB[(SQLite Database)]
end
subgraph "Clients"
Client1[Web App]
Client2[BI Tool]
Client3[API Consumer]
end
PBI -->|Power BI Reader| DP_Domain
DP_Router --> DP_Service
DP_Service --> DP_Domain
DP_Domain -->|Save| DB
DQ_Router --> DQ_Service
DQ_Service -->|Query| DB
Client1 -->|POST /dataprocessor/update-db-with-config| DP_Router
Client2 -->|POST /dataquery/query| DQ_Router
Client3 -->|GET /dataquery/schema| DQ_Router
style PBI fill:#f9f,stroke:#333,stroke-width:2px
style DB fill:#bbf,stroke:#333,stroke-width:3px
style DP_Schemas fill:#ffa,stroke:#333,stroke-width:2px
style DQ_Schemas fill:#ffa,stroke:#333,stroke-width:2px
API-Endpunkte Übersicht
Data Processor Endpoints
| Methode | Endpoint | Request Schema | Response Schema | Beschreibung |
|---|---|---|---|---|
| POST | /dataprocessor/update-db |
- | UpdateDbResponse |
Aktualisiert DB mit YAML-Config |
| POST | /dataprocessor/update-db-with-config |
PreprocessingConfigSchema |
UpdateDbWithConfigResponse |
Aktualisiert DB mit JSON-Config |
Data Query Endpoints
| Methode | Endpoint | Request Schema | Response Schema | Beschreibung |
|---|---|---|---|---|
| POST | /dataquery/query |
SqlQueryRequest |
SqlQueryResponse |
Führt SQL-Query aus |
| GET | /dataquery/schema |
- | DatabaseSchemaResponse |
Gibt DB-Schema zurück |
| GET | /dataquery/table/{table_name} |
- | TableSchemaResponse |
Gibt Tabellenstruktur zurück |
Typische Use Cases
Use Case 1: Daten aus Power BI laden und verarbeiten
flowchart LR
A[Client] -->|1. POST PreprocessingConfigSchema| B[API]
B -->|2. Daten abrufen| C[Power BI]
C -->|3. Raw Data| B
B -->|4. Preprocessing| B
B -->|5. Speichern| D[(SQLite)]
B -->|6. UpdateDbWithConfigResponse| A
style A fill:#bfb,stroke:#333,stroke-width:2px
style C fill:#f9f,stroke:#333,stroke-width:2px
style D fill:#bbf,stroke:#333,stroke-width:2px
Use Case 2: Daten abfragen
flowchart LR
A[Client] -->|1. POST SqlQueryRequest| B[API]
B -->|2. SELECT Query| C[(SQLite)]
C -->|3. Ergebnisse| B
B -->|4. SqlQueryResponse| A
style A fill:#bfb,stroke:#333,stroke-width:2px
style C fill:#bbf,stroke:#333,stroke-width:2px
Use Case 3: Datenbank-Struktur erkunden
flowchart TD
A[Client] -->|1. GET /schema| B[API]
B -->|2. DatabaseSchemaResponse| A
A -->|3. GET /table/ProductData| B
B -->|4. TableSchemaResponse| A
A -->|5. POST SqlQueryRequest| B
B -->|6. SqlQueryResponse| A
style A fill:#bfb,stroke:#333,stroke-width:2px
Validierung und Fehlerbehandlung
Alle Schemas verwenden Pydantic für automatische Validierung:
- Typsicherheit: Alle Felder werden auf korrekte Typen geprüft
- Required Fields: Pflichtfelder müssen vorhanden sein
- Min/Max Validierung: z.B.
min_items=1fürtables - String Validierung: z.B.
min_length=1für SQL-Queries - Custom Validation: Zusätzliche Business-Logik in den Services
Bei Validierungsfehlern gibt die API einen HTTP 422 (Unprocessable Entity) mit detaillierten Fehlermeldungen zurück.