No description
Find a file
ValueOn AG b9e73a728d
Some checks failed
Deploy althaus-preprocessing / build (push) Failing after 23s
Deploy althaus-preprocessing / deploy (push) Has been skipped
Deploy poweron-preprocessing / build (push) Failing after 2s
Deploy poweron-preprocessing / deploy (push) Has been skipped
refactor: migrate to Forgejo workflows, remove GitHub Actions
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-24 02:34:22 +02:00
.forgejo/workflows refactor: migrate to Forgejo workflows, remove GitHub Actions 2026-05-24 02:34:22 +02:00
src finished preprocessing batching 2025-12-23 13:04:46 +01:00
tests chore: enforce query row limit 2025-11-05 11:02:24 +01:00
.env.example feat: yaml-free preprocessing 2025-10-27 16:56:04 +01:00
.gitignore chore: final touches for prod 2025-10-13 16:10:50 +02:00
.python-version feat: basic project setup 2025-09-24 12:15:26 +02:00
app.py fix: try fix for data dir in Azure 2025-10-15 11:24:56 +02:00
pyproject.toml chore: final touches for prod 2025-10-13 16:10:50 +02:00
README.md finished preprocessing batching 2025-12-23 13:04:46 +01:00
requirements.txt chore: prepare deployment 2025-10-15 10:33:20 +02:00
startup.txt chore: prepare deployment 2025-10-15 10:33:20 +02:00
uv.lock chore: final touches for prod 2025-10-13 16:10:50 +02:00

Gateway Preprocessing - Datenschnittstellen (Datenmodell)

Übersicht

Dieses Projekt bietet zwei Hauptservices mit klar definierten Datenschnittstellen:

  1. Data Processor Service (/dataprocessor) - Lädt Daten aus Power BI, verarbeitet sie und speichert sie in SQLite
  2. Data Query Service (/dataquery) - Ermöglicht SQL-Abfragen auf die gespeicherten Daten
graph LR
    A[Power BI Dataset] -->|Data Processor| B[SQLite Database]
    B -->|Data Query| C[Client/API Consumer]
    
    style A fill:#f9f,stroke:#333,stroke-width:2px
    style B fill:#bbf,stroke:#333,stroke-width:2px
    style C fill:#bfb,stroke:#333,stroke-width:2px

Data Processor Service

Der Data Processor Service verarbeitet Daten aus Power BI und speichert sie in einer lokalen SQLite-Datenbank. Die Schemas befinden sich in src/dataprocessor/schemas.py.

Datenmodell-Hierarchie

classDiagram
    class PreprocessingConfigSchema {
        +List~TableConfigSchema~ tables
    }
    
    class TableConfigSchema {
        +str name
        +str powerbi_table_name
        +List~str~ measures
        +List~str~ group_by_columns
        +List~Dict~ steps
    }
    
    class UpdateDbWithConfigResponse {
        +bool success
        +int tables_processed
        +List~str~ warnings
    }
    
    class UpdateDbResponse {
        +bool success
    }
    
    PreprocessingConfigSchema "1" *-- "1..*" TableConfigSchema : enthält

1. TableConfigSchema

Definiert die Konfiguration für eine einzelne Tabelle, die aus Power BI gelesen und verarbeitet werden soll.

Felder:

Feld Typ Beschreibung Beispiel
name str Name der Tabelle in der lokalen SQLite-Datenbank "ProductData"
powerbi_table_name str Name der Quelltabelle im Power BI Dataset "products_raw"
measures List[str] Liste von Power BI Measures, die abgerufen werden sollen (optional) ["EP in CHF", "Gesamtbetrag"]
group_by_columns List[str] Spalten für Gruppierung bei Measures (triggert SUMMARIZECOLUMNS) ["m_Artikel", "Kategorie"]
steps List[Dict[str, Any]] Liste von Preprocessing-Schritten siehe unten

Unterstützte Preprocessing-Schritte:

graph TD
    A[Preprocessing Steps] --> B[keep]
    A --> C[fillna]
    A --> D[to_numeric]
    A --> E[dropna]
    
    B --> B1["columns: List[str]<br/>Behält nur angegebene Spalten"]
    C --> C1["column: str, value: Any<br/>Füllt fehlende Werte"]
    D --> D1["column: str, errors: str<br/>Konvertiert zu numerisch"]
    E --> E1["subset: List[str]<br/>Entfernt Zeilen mit NaN"]
    
    style A fill:#f96,stroke:#333,stroke-width:3px
    style B fill:#9cf,stroke:#333,stroke-width:2px
    style C fill:#9cf,stroke:#333,stroke-width:2px
    style D fill:#9cf,stroke:#333,stroke-width:2px
    style E fill:#9cf,stroke:#333,stroke-width:2px

Beispiel:

{
  "name": "ProductData",
  "powerbi_table_name": "products_raw",
  "measures": ["Total Sales", "Average Price"],
  "group_by_columns": ["Category", "Supplier"],
  "steps": [
    {
      "keep": {
        "columns": ["ProductID", "ProductName", "Supplier", "Stock", "Unit", "Price"]
      }
    },
    {
      "fillna": {
        "column": "Supplier",
        "value": "Unknown"
      }
    },
    {
      "to_numeric": {
        "column": "Price",
        "errors": "coerce"
      }
    },
    {
      "dropna": {
        "subset": ["ProductID", "ProductName", "Price"]
      }
    }
  ]
}

2. PreprocessingConfigSchema

Komplette Konfiguration für einen Preprocessing-Request. Kann mehrere Tabellen gleichzeitig verarbeiten.

Felder:

Feld Typ Beschreibung Validierung
tables List[TableConfigSchema] Liste von Tabellenkonfigurationen min. 1 Tabelle

Vollständiges Request-Beispiel:

{
  "tables": [
    {
      "name": "ProductData",
      "powerbi_table_name": "products_raw",
      "steps": [
        {
          "keep": {
            "columns": ["ProductID", "ProductName", "Supplier", "Stock", "Unit", "Price"]
          }
        },
        {
          "fillna": {
            "column": "Supplier",
            "value": "Unknown"
          }
        },
        {
          "to_numeric": {
            "column": "Price",
            "errors": "coerce"
          }
        },
        {
          "dropna": {
            "subset": ["ProductID", "ProductName", "Stock", "Unit", "Price"]
          }
        }
      ]
    },
    {
      "name": "CustomerData",
      "powerbi_table_name": "customers_raw",
      "steps": [
        {
          "keep": {
            "columns": ["CustomerID", "Name", "Email", "Region"]
          }
        },
        {
          "fillna": {
            "column": "Region",
            "value": "Unknown"
          }
        }
      ]
    }
  ]
}

3. UpdateDbResponse

Einfache Response für Standard-DB-Updates (YAML-basiert).

Felder:

Feld Typ Beschreibung
success bool Gibt an, ob das Update erfolgreich war

Beispiel:

{
  "success": true
}

4. UpdateDbWithConfigResponse

Detaillierte Response für JSON-basierte DB-Updates mit Konfiguration.

Felder:

Feld Typ Beschreibung
success bool Gibt an, ob das Update erfolgreich war
tables_processed int Anzahl der erfolgreich verarbeiteten Tabellen
warnings List[str] Liste von Warnungen während der Verarbeitung

Beispiel:

{
  "success": true,
  "tables_processed": 2,
  "warnings": [
    "Table 'ProductData': 3 rows dropped due to missing values",
    "Table 'ProductData': Column 'Price' had 5 non-numeric values coerced to NaN"
  ]
}

Preprocessing-Workflow

sequenceDiagram
    participant Client
    participant API
    participant PowerBI
    participant Preprocessor
    participant SQLite

    Client->>API: POST /dataprocessor/update-db-with-config
    Note over Client,API: PreprocessingConfigSchema
    
    loop Für jede Tabelle
        API->>PowerBI: Daten abrufen (powerbi_table_name)
        PowerBI-->>API: Raw DataFrame
        API->>Preprocessor: Preprocessing-Steps anwenden
        Preprocessor->>Preprocessor: keep → fillna → to_numeric → dropna
        Preprocessor-->>API: Verarbeiteter DataFrame
        API->>SQLite: Tabelle speichern (name)
    end
    
    API-->>Client: UpdateDbWithConfigResponse
    Note over Client,API: success, tables_processed, warnings

Data Query Service

Der Data Query Service ermöglicht SQL-Abfragen auf die gespeicherte SQLite-Datenbank. Die Schemas befinden sich in src/dataquery/schemas.py.

Datenmodell-Hierarchie

classDiagram
    class SqlQueryRequest {
        +str query
    }
    
    class SqlQueryResponse {
        +bool success
        +List~Dict~ data
        +List~str~ columns
        +int row_count
        +Optional~str~ message
    }
    
    class DatabaseSchemaResponse {
        +bool success
        +List~TableInfo~ tables
        +int table_count
    }
    
    class TableInfo {
        +str name
        +int row_count
    }
    
    class TableSchemaResponse {
        +bool success
        +str table_name
        +List~ColumnInfo~ columns
        +int row_count
        +List~Dict~ sample_data
    }
    
    class ColumnInfo {
        +str name
        +str type
        +bool nullable
        +bool primary_key
    }
    
    DatabaseSchemaResponse "1" *-- "*" TableInfo : enthält
    TableSchemaResponse "1" *-- "*" ColumnInfo : enthält

1. SqlQueryRequest

Request-Schema für SQL-Abfragen.

Felder:

Feld Typ Beschreibung Validierung
query str SQL-Query (nur SELECT erlaubt) min. 1 Zeichen

Beispiel:

{
  "query": "SELECT ProductID, ProductName, Price FROM ProductData WHERE Price > 10 ORDER BY Price DESC LIMIT 100"
}

2. SqlQueryResponse

Response-Schema für SQL-Query-Ergebnisse.

Felder:

Feld Typ Beschreibung
success bool Gibt an, ob die Query erfolgreich ausgeführt wurde
data List[Dict[str, Any]] Query-Ergebnisse als Liste von Dictionaries
columns List[str] Spaltennamen im Ergebnis
row_count int Anzahl der zurückgegebenen Zeilen
message Optional[str] Zusätzliche Informationen oder Fehlermeldung

Beispiel:

{
  "success": true,
  "columns": ["ProductID", "ProductName", "Price"],
  "row_count": 3,
  "data": [
    {
      "ProductID": 42,
      "ProductName": "Premium Widget",
      "Price": 99.99
    },
    {
      "ProductID": 15,
      "ProductName": "Deluxe Gadget",
      "Price": 79.50
    },
    {
      "ProductID": 8,
      "ProductName": "Standard Tool",
      "Price": 45.00
    }
  ],
  "message": null
}

3. TableInfo

Informationen über eine Datenbanktabelle.

Felder:

Feld Typ Beschreibung
name str Tabellenname
row_count int Anzahl der Zeilen in der Tabelle

4. DatabaseSchemaResponse

Response-Schema für Datenbank-Schema-Informationen.

Felder:

Feld Typ Beschreibung
success bool Gibt an, ob die Schema-Abfrage erfolgreich war
tables List[TableInfo] Liste aller Tabellen in der Datenbank
table_count int Gesamtanzahl der Tabellen

Beispiel:

{
  "success": true,
  "tables": [
    {
      "name": "ProductData",
      "row_count": 1523
    },
    {
      "name": "CustomerData",
      "row_count": 847
    },
    {
      "name": "OrderData",
      "row_count": 3421
    }
  ],
  "table_count": 3
}

5. ColumnInfo

Informationen über eine Tabellenspalte.

Felder:

Feld Typ Beschreibung
name str Spaltenname
type str Datentyp der Spalte
nullable bool Ob die Spalte NULL-Werte enthalten kann
primary_key bool Ob die Spalte ein Primary Key ist

6. TableSchemaResponse

Response-Schema für detaillierte Tabellenstruktur-Informationen.

Felder:

Feld Typ Beschreibung
success bool Gibt an, ob die Schema-Abfrage erfolgreich war
table_name str Name der Tabelle
columns List[ColumnInfo] Liste aller Spalten in der Tabelle
row_count int Anzahl der Zeilen in der Tabelle
sample_data List[Dict[str, Any]] Beispieldaten (bis zu 5 Zeilen)

Beispiel:

{
  "success": true,
  "table_name": "ProductData",
  "columns": [
    {
      "name": "ProductID",
      "type": "INTEGER",
      "nullable": false,
      "primary_key": true
    },
    {
      "name": "ProductName",
      "type": "TEXT",
      "nullable": false,
      "primary_key": false
    },
    {
      "name": "Supplier",
      "type": "TEXT",
      "nullable": true,
      "primary_key": false
    },
    {
      "name": "Price",
      "type": "REAL",
      "nullable": true,
      "primary_key": false
    }
  ],
  "row_count": 1523,
  "sample_data": [
    {
      "ProductID": 1,
      "ProductName": "Widget A",
      "Supplier": "Acme Corp",
      "Price": 12.50
    },
    {
      "ProductID": 2,
      "ProductName": "Gadget B",
      "Supplier": "TechCo",
      "Price": 24.99
    },
    {
      "ProductID": 3,
      "ProductName": "Tool C",
      "Supplier": "Unknown",
      "Price": 8.75
    }
  ]
}

Query-Workflow

sequenceDiagram
    participant Client
    participant API
    participant SQLite

    rect rgb(200, 220, 250)
        Note over Client,SQLite: Szenario 1: SQL Query ausführen
        Client->>API: POST /dataquery/query
        Note over Client,API: SqlQueryRequest
        API->>SQLite: SELECT ausführen
        SQLite-->>API: Ergebnisse
        API-->>Client: SqlQueryResponse
        Note over Client,API: data, columns, row_count
    end

    rect rgb(220, 250, 220)
        Note over Client,SQLite: Szenario 2: Datenbank-Schema abrufen
        Client->>API: GET /dataquery/schema
        API->>SQLite: Tabellen auflisten
        SQLite-->>API: Tabellenliste
        API-->>Client: DatabaseSchemaResponse
        Note over Client,API: tables, table_count
    end

    rect rgb(250, 220, 220)
        Note over Client,SQLite: Szenario 3: Tabellenstruktur abrufen
        Client->>API: GET /dataquery/table/{table_name}
        API->>SQLite: Spalten + Sample Data abrufen
        SQLite-->>API: Struktur + Daten
        API-->>Client: TableSchemaResponse
        Note over Client,API: columns, sample_data
    end

Gesamtarchitektur

graph TB
    subgraph "External Data Source"
        PBI[Power BI Dataset]
    end
    
    subgraph "Gateway Preprocessing API"
        subgraph "Data Processor Service"
            DP_Router[Router]
            DP_Service[Service]
            DP_Schemas[Schemas]
            DP_Domain[Domain Layer]
        end
        
        subgraph "Data Query Service"
            DQ_Router[Router]
            DQ_Service[Service]
            DQ_Schemas[Schemas]
        end
    end
    
    subgraph "Local Storage"
        DB[(SQLite Database)]
    end
    
    subgraph "Clients"
        Client1[Web App]
        Client2[BI Tool]
        Client3[API Consumer]
    end
    
    PBI -->|Power BI Reader| DP_Domain
    DP_Router --> DP_Service
    DP_Service --> DP_Domain
    DP_Domain -->|Save| DB
    
    DQ_Router --> DQ_Service
    DQ_Service -->|Query| DB
    
    Client1 -->|POST /dataprocessor/update-db-with-config| DP_Router
    Client2 -->|POST /dataquery/query| DQ_Router
    Client3 -->|GET /dataquery/schema| DQ_Router
    
    style PBI fill:#f9f,stroke:#333,stroke-width:2px
    style DB fill:#bbf,stroke:#333,stroke-width:3px
    style DP_Schemas fill:#ffa,stroke:#333,stroke-width:2px
    style DQ_Schemas fill:#ffa,stroke:#333,stroke-width:2px

API-Endpunkte Übersicht

Data Processor Endpoints

Methode Endpoint Request Schema Response Schema Beschreibung
POST /dataprocessor/update-db - UpdateDbResponse Aktualisiert DB mit YAML-Config
POST /dataprocessor/update-db-with-config PreprocessingConfigSchema UpdateDbWithConfigResponse Aktualisiert DB mit JSON-Config

Data Query Endpoints

Methode Endpoint Request Schema Response Schema Beschreibung
POST /dataquery/query SqlQueryRequest SqlQueryResponse Führt SQL-Query aus
GET /dataquery/schema - DatabaseSchemaResponse Gibt DB-Schema zurück
GET /dataquery/table/{table_name} - TableSchemaResponse Gibt Tabellenstruktur zurück

Typische Use Cases

Use Case 1: Daten aus Power BI laden und verarbeiten

flowchart LR
    A[Client] -->|1. POST PreprocessingConfigSchema| B[API]
    B -->|2. Daten abrufen| C[Power BI]
    C -->|3. Raw Data| B
    B -->|4. Preprocessing| B
    B -->|5. Speichern| D[(SQLite)]
    B -->|6. UpdateDbWithConfigResponse| A
    
    style A fill:#bfb,stroke:#333,stroke-width:2px
    style C fill:#f9f,stroke:#333,stroke-width:2px
    style D fill:#bbf,stroke:#333,stroke-width:2px

Use Case 2: Daten abfragen

flowchart LR
    A[Client] -->|1. POST SqlQueryRequest| B[API]
    B -->|2. SELECT Query| C[(SQLite)]
    C -->|3. Ergebnisse| B
    B -->|4. SqlQueryResponse| A
    
    style A fill:#bfb,stroke:#333,stroke-width:2px
    style C fill:#bbf,stroke:#333,stroke-width:2px

Use Case 3: Datenbank-Struktur erkunden

flowchart TD
    A[Client] -->|1. GET /schema| B[API]
    B -->|2. DatabaseSchemaResponse| A
    A -->|3. GET /table/ProductData| B
    B -->|4. TableSchemaResponse| A
    A -->|5. POST SqlQueryRequest| B
    B -->|6. SqlQueryResponse| A
    
    style A fill:#bfb,stroke:#333,stroke-width:2px

Validierung und Fehlerbehandlung

Alle Schemas verwenden Pydantic für automatische Validierung:

  • Typsicherheit: Alle Felder werden auf korrekte Typen geprüft
  • Required Fields: Pflichtfelder müssen vorhanden sein
  • Min/Max Validierung: z.B. min_items=1 für tables
  • String Validierung: z.B. min_length=1 für SQL-Queries
  • Custom Validation: Zusätzliche Business-Logik in den Services

Bei Validierungsfehlern gibt die API einen HTTP 422 (Unprocessable Entity) mit detaillierten Fehlermeldungen zurück.