Merge pull request #3 from valueonag/feat/file-free-config

feat: yaml-free preprocessing
This commit is contained in:
Christopher Gondek 2025-11-03 07:56:54 +01:00 committed by GitHub
commit 30f4b0e781
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 481 additions and 2 deletions

View file

@ -1,5 +1,9 @@
# Preprocessor Configuration
# Path to the preprocessor configuration YAML file (relative to project root)
# OPTIONAL: Only required if you use the /api/v1/dataprocessor/update-db endpoint.
# Otherwise, pass the preprocessor config directly in the request body of
# /api/v1/dataprocessor/update-db-with-config.
# Example YAML: pp-config.yaml in the src/ directory
PP_CONFIG_PATH="src/pp-config.yaml"
# API Keys

View file

@ -190,6 +190,75 @@ class Preprocessor:
return cls(table_configs=table_configs)
@classmethod
async def create_from_config(cls, *, config: Dict[str, Any]) -> "Preprocessor":
    """Create a Preprocessor instance from a configuration dictionary.

    This alternate constructor builds the table configurations from an
    in-memory dictionary (for example a parsed JSON request body) instead
    of reading them from a YAML file.

    Args:
        config: Dictionary containing preprocessing configuration with structure:
            {
                "tables": [
                    {
                        "name": str,
                        "powerbi_table_name": str,
                        "steps": [
                            {step_type: {param1: value1, ...}},
                            ...
                        ]
                    },
                    ...
                ]
            }
            "name" and "powerbi_table_name" are required, non-empty strings.
            "steps" is optional and defaults to an empty list.

    Returns:
        A new Preprocessor instance configured with the provided tables.
        An empty "tables" list yields an instance without table configurations.

    Raises:
        ValueError: If ``config`` is not a dictionary, lacks the "tables" key,
            "tables" is not a list/tuple, a table entry is not a dictionary,
            a required key is missing or not a non-empty string, or "steps"
            is not a list/tuple.

    Example:
        >>> config = {
        ...     "tables": [
        ...         {
        ...             "name": "Sales",
        ...             "powerbi_table_name": "sales_raw",
        ...             "steps": [
        ...                 {"keep": {"columns": ["Product", "Amount"]}},
        ...                 {"fillna": {"column": "Amount", "value": 0}}
        ...             ]
        ...         }
        ...     ]
        ... }
        >>> preprocessor = await Preprocessor.create_from_config(config=config)
    """
    if not isinstance(config, dict):
        raise ValueError("Configuration must be a dictionary")
    if "tables" not in config:
        raise ValueError("Configuration must contain a 'tables' key")

    tables = config["tables"]
    # Reject None / scalars up front so callers get the documented ValueError
    # instead of a TypeError from iterating a non-iterable value.
    if not isinstance(tables, (list, tuple)):
        raise ValueError("Configuration 'tables' must be a list")

    table_configs = []
    for index, table_data in enumerate(tables):
        if not isinstance(table_data, dict):
            raise ValueError("Each table configuration must be a dictionary")

        # Required keys: fail loudly rather than silently substituting "".
        for key in ("name", "powerbi_table_name"):
            value = table_data.get(key)
            if not isinstance(value, str) or not value.strip():
                raise ValueError(
                    f"Table configuration at index {index} must contain a "
                    f"non-empty string '{key}'"
                )

        steps = table_data.get("steps", [])
        if not isinstance(steps, (list, tuple)):
            raise ValueError(
                f"Table configuration at index {index}: 'steps' must be a list"
            )

        table_configs.append(
            TableConfig(
                name=table_data["name"],
                powerbi_table_name=table_data["powerbi_table_name"],
                steps=steps,
            )
        )

    return cls(table_configs=table_configs)
def get_table_configs(self) -> List[TableConfig]:
"""Get all table configurations.

View file

@ -2,10 +2,14 @@
# Set up router
import logging
from fastapi import APIRouter
from fastapi import APIRouter, Body
from fastapi import Security
from src.dataprocessor.schemas import UpdateDbResponse
from src.dataprocessor.schemas import (
UpdateDbResponse,
PreprocessingConfigSchema,
UpdateDbWithConfigResponse,
)
from src.dependencies import require_pp_api_key
from src.dataprocessor.service import DataProcessorService
@ -21,3 +25,171 @@ async def update_db(_: None = Security(require_pp_api_key)) -> UpdateDbResponse:
service = await DataProcessorService.create()
await service.update_database()
return UpdateDbResponse(success=True)
@router.post("/update-db-with-config")
async def update_db_with_config(
    config: PreprocessingConfigSchema = Body(...),
    _: None = Security(require_pp_api_key),
) -> UpdateDbWithConfigResponse:
    """Update the database from a JSON configuration rather than a YAML file.

    The whole preprocessing pipeline is supplied in the request body, so the
    database can be preprocessed and refreshed on demand without depending on
    a static YAML configuration file.

    ## How It Works

    1. **Reads data from Power BI**: For every table listed in the configuration,
       data is fetched from the corresponding Power BI dataset table.
    2. **Applies preprocessing steps**: Each table carries an ordered list of
       preprocessing steps which are applied one after another.
    3. **Saves to local database**: The processed data is written to the local
       SQLite database under the configured table name.

    ## Available Preprocessing Steps

    Every step is a dictionary whose single key is the step name and whose
    value holds the step parameters. The supported steps are:

    ### keep
    Keep only specified columns from the DataFrame.
    - **Parameters**:
      - `columns` (List[str]): List of column names to retain
    - **Example**:
    ```json
    {"keep": {"columns": ["ProductID", "ProductName", "Price"]}}
    ```

    ### fillna
    Fill missing (NaN) values in a specified column.
    - **Parameters**:
      - `column` (str): Name of the column to fill
      - `value` (Any): Value to use for filling NaN entries
    - **Example**:
    ```json
    {"fillna": {"column": "Supplier", "value": "Unknown"}}
    ```

    ### to_numeric
    Convert a column to numeric type.
    - **Parameters**:
      - `column` (str): Name of the column to convert
      - `errors` (str, optional): How to handle conversion errors
        - "coerce": Convert invalid values to NaN
        - "raise": Raise an exception for invalid values
        - "ignore": Leave invalid values unchanged
    - **Example**:
    ```json
    {"to_numeric": {"column": "Price", "errors": "coerce"}}
    ```

    ### dropna
    Drop rows that contain missing values in specified columns.
    - **Parameters**:
      - `subset` (List[str]): List of column names to check for NaN values
    - **Example**:
    ```json
    {"dropna": {"subset": ["ProductID", "Price"]}}
    ```

    ## Request Body Structure

    The request body must contain a `tables` array. Each table object specifies:
    - `name`: The name for the table in the local SQLite database
    - `powerbi_table_name`: The source table name in the Power BI dataset
    - `steps`: An ordered list of preprocessing steps to apply

    ## Complete Example

    ```json
    {
      "tables": [
        {
          "name": "ProductData",
          "powerbi_table_name": "products_raw",
          "steps": [
            {
              "keep": {
                "columns": [
                  "ProductID",
                  "ProductName",
                  "Supplier",
                  "Stock",
                  "Unit",
                  "Price"
                ]
              }
            },
            {
              "fillna": {
                "column": "Supplier",
                "value": "Unknown"
              }
            },
            {
              "to_numeric": {
                "column": "Price",
                "errors": "coerce"
              }
            },
            {
              "dropna": {
                "subset": [
                  "ProductID",
                  "ProductName",
                  "Stock",
                  "Unit",
                  "Price"
                ]
              }
            }
          ]
        }
      ]
    }
    ```

    ## Differences from /update-db Endpoint

    - **No YAML file dependency**: Configuration is provided directly in the request
    - **Dynamic configuration**: Different preprocessing pipelines can be specified per request
    - **Warnings included**: Response includes any warnings encountered during preprocessing
    - **Table count**: Response includes the number of tables successfully processed

    ## Response

    Returns a `UpdateDbWithConfigResponse` containing:
    - `success`: Boolean indicating if the operation was successful
    - `tables_processed`: Number of tables that were successfully processed
    - `warnings`: List of any warnings encountered during preprocessing (e.g., missing columns)

    ## Error Handling

    The endpoint will raise an error if:
    - The configuration structure is invalid
    - No table configurations are provided
    - No data is returned from Power BI for a specified table
    - Preprocessing results in an empty DataFrame
    - Database save operations fail

    Args:
        config: The preprocessing configuration specifying tables and their processing steps
        _: Security dependency requiring valid API key

    Returns:
        UpdateDbWithConfigResponse with operation status, table count, and any warnings

    Raises:
        HTTPException: Presumably raised by the API-key dependency when
            authentication fails (the dependency is defined outside this module).
        Exception: Errors raised by the service while processing are not
            translated by this handler and propagate unchanged.
    """
    processor = await DataProcessorService.create()

    # The service works on plain dictionaries, so unwrap the validated model.
    payload = config.model_dump()
    processed_count, collected_warnings = await processor.update_database_with_config(
        config=payload
    )

    return UpdateDbWithConfigResponse(
        success=True,
        tables_processed=processed_count,
        warnings=collected_warnings,
    )

View file

@ -1,5 +1,6 @@
"""Schemas for the data processor service."""
from typing import Any, Dict, List
from pydantic import BaseModel, Field
@ -7,3 +8,135 @@ class UpdateDbResponse(BaseModel):
success: bool = Field(
..., description="Indicates if the database update was successful."
)
class TableConfigSchema(BaseModel):
    """Schema for a single table configuration.

    Defines how a table should be read from Power BI and preprocessed.

    Attributes:
        name: The name to use for the table in the local SQLite database
        powerbi_table_name: The name of the source table in Power BI dataset
        steps: List of preprocessing steps to apply to the table data
    """

    # NOTE: Pydantic v2 deprecates arbitrary extra keyword arguments on Field
    # (such as ``example=``); ``examples`` is the supported parameter and takes
    # a list of example values.
    name: str = Field(
        ...,
        description="Name for the table in the local database",
        examples=["Data"],
    )
    powerbi_table_name: str = Field(
        ...,
        description="Name of the table in the Power BI dataset",
        examples=["data_full"],
    )
    steps: List[Dict[str, Any]] = Field(
        default_factory=list,
        description="List of preprocessing steps to apply",
        # One example value, which is itself a list of steps.
        examples=[
            [
                {"keep": {"columns": ["col1", "col2"]}},
                {"fillna": {"column": "col1", "value": "Unknown"}},
            ]
        ],
    )
class PreprocessingConfigSchema(BaseModel):
    """Schema for the complete preprocessing configuration.

    This schema defines the structure for JSON-based preprocessing configuration,
    replacing the need for a YAML configuration file. It allows dynamic
    configuration of table preprocessing operations per API request.

    The configuration supports multiple tables, each with its own set of
    preprocessing steps. Available preprocessing steps include:

    - **keep**: Keep only specified columns
        - Parameters: columns (List[str])
        - Example: {"keep": {"columns": ["Name", "Price", "Quantity"]}}
    - **fillna**: Fill missing values in a column
        - Parameters: column (str), value (Any)
        - Example: {"fillna": {"column": "Supplier", "value": "Unknown"}}
    - **to_numeric**: Convert a column to numeric type
        - Parameters: column (str), errors (str, optional)
        - Example: {"to_numeric": {"column": "Price", "errors": "coerce"}}
    - **dropna**: Drop rows with missing values in specified columns
        - Parameters: subset (List[str])
        - Example: {"dropna": {"subset": ["Name", "Price"]}}

    Attributes:
        tables: List of table configurations to process (at least one entry)

    Example Request Body:
        {
            "tables": [
                {
                    "name": "ProductData",
                    "powerbi_table_name": "products_raw",
                    "steps": [
                        {
                            "keep": {
                                "columns": [
                                    "ProductID",
                                    "ProductName",
                                    "Supplier",
                                    "Stock",
                                    "Unit",
                                    "Price"
                                ]
                            }
                        },
                        {
                            "fillna": {
                                "column": "Supplier",
                                "value": "Unknown"
                            }
                        },
                        {
                            "to_numeric": {
                                "column": "Price",
                                "errors": "coerce"
                            }
                        },
                        {
                            "dropna": {
                                "subset": [
                                    "ProductID",
                                    "ProductName",
                                    "Stock",
                                    "Unit",
                                    "Price"
                                ]
                            }
                        }
                    ]
                }
            ]
        }
    """

    # ``min_length`` is the Pydantic v2 name for the collection-size constraint;
    # the v1 alias ``min_items`` is deprecated and emits a warning.
    tables: List[TableConfigSchema] = Field(
        ...,
        description="List of table configurations to process",
        min_length=1,
    )
class UpdateDbWithConfigResponse(BaseModel):
    """Response schema for the JSON-based database update endpoint.

    Attributes:
        success: Indicates if the database update was successful
        tables_processed: Number of tables that were processed
        warnings: List of any warnings encountered during preprocessing
    """

    # Required: overall outcome flag.
    success: bool = Field(
        default=...,
        description="Indicates if the database update was successful",
    )
    # Required: count reported by the service.
    tables_processed: int = Field(
        default=...,
        description="Number of tables that were successfully processed",
    )
    # Optional: a fresh empty list is created per instance when omitted.
    warnings: list[str] = Field(
        description="List of warnings encountered during preprocessing",
        default_factory=list,
    )

View file

@ -72,3 +72,104 @@ class DataProcessorService:
# Step 4: Update the local SQLite database
await self.data_saver.save_table(df, table_config.name, overwrite=True)
async def update_database_with_config(
    self, *, config: dict
) -> tuple[int, list[str]]:
    """Run the read / preprocess / save cycle for an in-memory configuration.

    A Preprocessor is built from ``config`` via
    ``Preprocessor.create_from_config``. Then, for each of its table
    configurations in order:

    1. a PowerBIReader is created for the table's ``powerbi_table_name``
       and its data is read,
    2. the table's ``steps`` are applied through the preprocessor,
    3. the preprocessor's ``last_report`` (if non-empty) is appended to
       the collected warnings,
    4. the result is saved through ``self.data_saver`` under the table's
       ``name`` with ``overwrite=True``.

    Processing stops at the first failing table. Tables handled before
    the failure have already been saved.

    Args:
        config: Preprocessing configuration with the structure:
            {
                "tables": [
                    {
                        "name": str,
                        "powerbi_table_name": str,
                        "steps": [
                            {step_type: {param1: value1, ...}},
                            ...
                        ]
                    },
                    ...
                ]
            }

    Returns:
        A tuple ``(tables_processed, warnings)``:
            - int: Number of tables that were read, preprocessed and saved
            - list[str]: Warnings collected from the preprocessor reports

    Raises:
        RuntimeError: If the configuration yields no table configurations,
            if Power BI returns an empty frame for a table, or if
            preprocessing leaves a table empty.
        ValueError: If the configuration structure is rejected by
            ``Preprocessor.create_from_config``.

    Example:
        >>> config = {
        ...     "tables": [
        ...         {
        ...             "name": "Sales",
        ...             "powerbi_table_name": "sales_raw",
        ...             "steps": [
        ...                 {"keep": {"columns": ["Product", "Amount"]}},
        ...                 {"dropna": {"subset": ["Product", "Amount"]}}
        ...             ]
        ...         }
        ...     ]
        ... }
        >>> service = await DataProcessorService.create()
        >>> tables_processed, warnings = await service.update_database_with_config(
        ...     config=config
        ... )
    """
    pipeline = await Preprocessor.create_from_config(config=config)
    configured_tables = pipeline.get_table_configs()

    # Nothing to do is treated as an error, not as a successful no-op.
    if not configured_tables:
        raise RuntimeError("No table configurations found in provided config.")

    collected_warnings = []
    processed_count = 0

    for table_config in configured_tables:
        # Read: one reader per source table.
        reader = await PowerBIReader.create(
            dataset_id=settings.POWERBI_DATASET_ID,
            access_token=self.access_token,
            table_name=table_config.powerbi_table_name,
        )
        frame = await reader.read_data()
        if frame.empty:
            raise RuntimeError(
                f"No data read from Power BI for table '{table_config.name}'."
            )

        # Transform: apply only this table's steps.
        frame = await pipeline.preprocess(frame, steps=table_config.steps)
        if frame.empty:
            raise RuntimeError(
                f"No data returned from preprocessing for table '{table_config.name}'."
            )

        # Gather whatever the preprocessor reported for this run.
        report = pipeline.last_report
        if report:
            collected_warnings.extend(report)

        # Persist: replace any existing table of the same name.
        await self.data_saver.save_table(frame, table_config.name, overwrite=True)
        processed_count += 1

    return processed_count, collected_warnings