feat: yaml-free preprocessing
This commit is contained in:
parent
833df786a8
commit
56869051df
5 changed files with 481 additions and 2 deletions
|
|
@ -1,5 +1,9 @@
|
|||
# Preprocessor Configuration
|
||||
# Path to the preprocessor configuration YAML file (relative to project root)
|
||||
# OPTIONAL: Only if you want to use /api/v1/dataprocessor/update-db endpoint
|
||||
# Otherwise you can directly pass preprocessor config in the request body
|
||||
# at /api/v1/dataprocessor/update-db-with-config
|
||||
# Example YAML: pp-config.yaml in the src/ directory
|
||||
PP_CONFIG_PATH="src/pp-config.yaml"
|
||||
|
||||
# API Keys
|
||||
|
|
|
|||
|
|
@ -190,6 +190,75 @@ class Preprocessor:
|
|||
|
||||
return cls(table_configs=table_configs)
|
||||
|
||||
@classmethod
|
||||
async def create_from_config(cls, *, config: Dict[str, Any]) -> "Preprocessor":
|
||||
"""Create a Preprocessor instance from a configuration dictionary.
|
||||
|
||||
This method allows creating a Preprocessor without relying on a YAML file,
|
||||
enabling dynamic configuration via API requests with JSON payloads.
|
||||
|
||||
The configuration dictionary should follow the same structure as the YAML
|
||||
configuration file, containing a "tables" key with a list of table
|
||||
configurations.
|
||||
|
||||
Args:
|
||||
config: Dictionary containing preprocessing configuration with structure:
|
||||
{
|
||||
"tables": [
|
||||
{
|
||||
"name": str,
|
||||
"powerbi_table_name": str,
|
||||
"steps": [
|
||||
{step_type: {param1: value1, ...}},
|
||||
...
|
||||
]
|
||||
},
|
||||
...
|
||||
]
|
||||
}
|
||||
|
||||
Returns:
|
||||
A new Preprocessor instance configured with the provided tables.
|
||||
|
||||
Raises:
|
||||
ValueError: If the configuration dictionary is invalid or missing required keys.
|
||||
|
||||
Example:
|
||||
>>> config = {
|
||||
... "tables": [
|
||||
... {
|
||||
... "name": "Sales",
|
||||
... "powerbi_table_name": "sales_raw",
|
||||
... "steps": [
|
||||
... {"keep": {"columns": ["Product", "Amount"]}},
|
||||
... {"fillna": {"column": "Amount", "value": 0}}
|
||||
... ]
|
||||
... }
|
||||
... ]
|
||||
... }
|
||||
>>> preprocessor = await Preprocessor.create_from_config(config=config)
|
||||
"""
|
||||
if not isinstance(config, dict):
|
||||
raise ValueError("Configuration must be a dictionary")
|
||||
|
||||
if "tables" not in config:
|
||||
raise ValueError("Configuration must contain a 'tables' key")
|
||||
|
||||
# Parse table configurations
|
||||
table_configs = []
|
||||
for table_data in config.get("tables", []):
|
||||
if not isinstance(table_data, dict):
|
||||
raise ValueError("Each table configuration must be a dictionary")
|
||||
|
||||
table_config = TableConfig(
|
||||
name=table_data.get("name", ""),
|
||||
powerbi_table_name=table_data.get("powerbi_table_name", ""),
|
||||
steps=table_data.get("steps", []),
|
||||
)
|
||||
table_configs.append(table_config)
|
||||
|
||||
return cls(table_configs=table_configs)
|
||||
|
||||
def get_table_configs(self) -> List[TableConfig]:
|
||||
"""Get all table configurations.
|
||||
|
||||
|
|
|
|||
|
|
@ -2,10 +2,14 @@
|
|||
|
||||
# Set up router
|
||||
import logging
|
||||
from fastapi import APIRouter
|
||||
from fastapi import APIRouter, Body
|
||||
from fastapi import Security
|
||||
|
||||
from src.dataprocessor.schemas import UpdateDbResponse
|
||||
from src.dataprocessor.schemas import (
|
||||
UpdateDbResponse,
|
||||
PreprocessingConfigSchema,
|
||||
UpdateDbWithConfigResponse,
|
||||
)
|
||||
from src.dependencies import require_pp_api_key
|
||||
from src.dataprocessor.service import DataProcessorService
|
||||
|
||||
|
|
@ -21,3 +25,171 @@ async def update_db(_: None = Security(require_pp_api_key)) -> UpdateDbResponse:
|
|||
service = await DataProcessorService.create()
|
||||
await service.update_database()
|
||||
return UpdateDbResponse(success=True)
|
||||
|
||||
|
||||
@router.post("/update-db-with-config")
|
||||
async def update_db_with_config(
|
||||
config: PreprocessingConfigSchema = Body(...),
|
||||
_: None = Security(require_pp_api_key),
|
||||
) -> UpdateDbWithConfigResponse:
|
||||
"""Update the database using a JSON configuration instead of a YAML file.
|
||||
|
||||
This endpoint provides a flexible, dynamic way to preprocess and update your
|
||||
database without relying on a static YAML configuration file. You can specify
|
||||
the entire preprocessing pipeline in the request body, allowing for on-demand
|
||||
custom preprocessing operations.
|
||||
|
||||
## How It Works
|
||||
|
||||
1. **Reads data from Power BI**: For each table specified in the configuration,
|
||||
data is fetched from the corresponding Power BI dataset table.
|
||||
|
||||
2. **Applies preprocessing steps**: Each table configuration includes a list of
|
||||
preprocessing steps that are applied sequentially to transform the data.
|
||||
|
||||
3. **Saves to local database**: The processed data is saved to the local SQLite
|
||||
database with the specified table name.
|
||||
|
||||
## Available Preprocessing Steps
|
||||
|
||||
The following preprocessing steps are supported. Each step is specified as a
|
||||
dictionary with the step name as the key and its parameters as the value:
|
||||
|
||||
### keep
|
||||
Keep only specified columns from the DataFrame.
|
||||
- **Parameters**:
|
||||
- `columns` (List[str]): List of column names to retain
|
||||
- **Example**:
|
||||
```json
|
||||
{"keep": {"columns": ["ProductID", "ProductName", "Price"]}}
|
||||
```
|
||||
|
||||
### fillna
|
||||
Fill missing (NaN) values in a specified column.
|
||||
- **Parameters**:
|
||||
- `column` (str): Name of the column to fill
|
||||
- `value` (Any): Value to use for filling NaN entries
|
||||
- **Example**:
|
||||
```json
|
||||
{"fillna": {"column": "Supplier", "value": "Unknown"}}
|
||||
```
|
||||
|
||||
### to_numeric
|
||||
Convert a column to numeric type.
|
||||
- **Parameters**:
|
||||
- `column` (str): Name of the column to convert
|
||||
- `errors` (str, optional): How to handle conversion errors
|
||||
- "coerce": Convert invalid values to NaN
|
||||
- "raise": Raise an exception for invalid values
|
||||
- "ignore": Leave invalid values unchanged
|
||||
- **Example**:
|
||||
```json
|
||||
{"to_numeric": {"column": "Price", "errors": "coerce"}}
|
||||
```
|
||||
|
||||
### dropna
|
||||
Drop rows that contain missing values in specified columns.
|
||||
- **Parameters**:
|
||||
- `subset` (List[str]): List of column names to check for NaN values
|
||||
- **Example**:
|
||||
```json
|
||||
{"dropna": {"subset": ["ProductID", "Price"]}}
|
||||
```
|
||||
|
||||
## Request Body Structure
|
||||
|
||||
The request body must contain a `tables` array, where each table object specifies:
|
||||
- `name`: The name for the table in the local SQLite database
|
||||
- `powerbi_table_name`: The source table name in the Power BI dataset
|
||||
- `steps`: An ordered list of preprocessing steps to apply
|
||||
|
||||
## Complete Example
|
||||
|
||||
```json
|
||||
{
|
||||
"tables": [
|
||||
{
|
||||
"name": "ProductData",
|
||||
"powerbi_table_name": "products_raw",
|
||||
"steps": [
|
||||
{
|
||||
"keep": {
|
||||
"columns": [
|
||||
"ProductID",
|
||||
"ProductName",
|
||||
"Supplier",
|
||||
"Stock",
|
||||
"Unit",
|
||||
"Price"
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"fillna": {
|
||||
"column": "Supplier",
|
||||
"value": "Unknown"
|
||||
}
|
||||
},
|
||||
{
|
||||
"to_numeric": {
|
||||
"column": "Price",
|
||||
"errors": "coerce"
|
||||
}
|
||||
},
|
||||
{
|
||||
"dropna": {
|
||||
"subset": [
|
||||
"ProductID",
|
||||
"ProductName",
|
||||
"Stock",
|
||||
"Unit",
|
||||
"Price"
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
## Differences from /update-db Endpoint
|
||||
|
||||
- **No YAML file dependency**: Configuration is provided directly in the request
|
||||
- **Dynamic configuration**: Different preprocessing pipelines can be specified per request
|
||||
- **Warnings included**: Response includes any warnings encountered during preprocessing
|
||||
- **Table count**: Response includes the number of tables successfully processed
|
||||
|
||||
## Response
|
||||
|
||||
Returns a `UpdateDbWithConfigResponse` containing:
|
||||
- `success`: Boolean indicating if the operation was successful
|
||||
- `tables_processed`: Number of tables that were successfully processed
|
||||
- `warnings`: List of any warnings encountered during preprocessing (e.g., missing columns)
|
||||
|
||||
## Error Handling
|
||||
|
||||
The endpoint will raise an error if:
|
||||
- The configuration structure is invalid
|
||||
- No table configurations are provided
|
||||
- No data is returned from Power BI for a specified table
|
||||
- Preprocessing results in an empty DataFrame
|
||||
- Database save operations fail
|
||||
|
||||
Args:
|
||||
config: The preprocessing configuration specifying tables and their processing steps
|
||||
_: Security dependency requiring valid API key
|
||||
|
||||
Returns:
|
||||
UpdateDbWithConfigResponse with operation status, table count, and any warnings
|
||||
|
||||
Raises:
|
||||
HTTPException: If authentication fails or processing errors occur
|
||||
"""
|
||||
service = await DataProcessorService.create()
|
||||
tables_processed, warnings = await service.update_database_with_config(
|
||||
config=config.model_dump()
|
||||
)
|
||||
return UpdateDbWithConfigResponse(
|
||||
success=True, tables_processed=tables_processed, warnings=warnings
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
"""Schemas for the data processor service."""
|
||||
|
||||
from typing import Any, Dict, List
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
|
|
@ -7,3 +8,135 @@ class UpdateDbResponse(BaseModel):
|
|||
success: bool = Field(
|
||||
..., description="Indicates if the database update was successful."
|
||||
)
|
||||
|
||||
|
||||
class TableConfigSchema(BaseModel):
|
||||
"""Schema for a single table configuration.
|
||||
|
||||
Defines how a table should be read from Power BI and preprocessed.
|
||||
|
||||
Attributes:
|
||||
name: The name to use for the table in the local SQLite database
|
||||
powerbi_table_name: The name of the source table in Power BI dataset
|
||||
steps: List of preprocessing steps to apply to the table data
|
||||
"""
|
||||
|
||||
name: str = Field(
|
||||
..., description="Name for the table in the local database", example="Data"
|
||||
)
|
||||
powerbi_table_name: str = Field(
|
||||
...,
|
||||
description="Name of the table in the Power BI dataset",
|
||||
example="data_full",
|
||||
)
|
||||
steps: List[Dict[str, Any]] = Field(
|
||||
default_factory=list,
|
||||
description="List of preprocessing steps to apply",
|
||||
example=[
|
||||
{"keep": {"columns": ["col1", "col2"]}},
|
||||
{"fillna": {"column": "col1", "value": "Unknown"}},
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class PreprocessingConfigSchema(BaseModel):
|
||||
"""Schema for the complete preprocessing configuration.
|
||||
|
||||
This schema defines the structure for JSON-based preprocessing configuration,
|
||||
replacing the need for a YAML configuration file. It allows dynamic
|
||||
configuration of table preprocessing operations per API request.
|
||||
|
||||
The configuration supports multiple tables, each with its own set of
|
||||
preprocessing steps. Available preprocessing steps include:
|
||||
|
||||
- **keep**: Keep only specified columns
|
||||
- Parameters: columns (List[str])
|
||||
- Example: {"keep": {"columns": ["Name", "Price", "Quantity"]}}
|
||||
|
||||
- **fillna**: Fill missing values in a column
|
||||
- Parameters: column (str), value (Any)
|
||||
- Example: {"fillna": {"column": "Supplier", "value": "Unknown"}}
|
||||
|
||||
- **to_numeric**: Convert a column to numeric type
|
||||
- Parameters: column (str), errors (str, optional)
|
||||
- Example: {"to_numeric": {"column": "Price", "errors": "coerce"}}
|
||||
|
||||
- **dropna**: Drop rows with missing values in specified columns
|
||||
- Parameters: subset (List[str])
|
||||
- Example: {"dropna": {"subset": ["Name", "Price"]}}
|
||||
|
||||
Attributes:
|
||||
tables: List of table configurations to process
|
||||
|
||||
Example Request Body:
|
||||
{
|
||||
"tables": [
|
||||
{
|
||||
"name": "ProductData",
|
||||
"powerbi_table_name": "products_raw",
|
||||
"steps": [
|
||||
{
|
||||
"keep": {
|
||||
"columns": [
|
||||
"ProductID",
|
||||
"ProductName",
|
||||
"Supplier",
|
||||
"Stock",
|
||||
"Unit",
|
||||
"Price"
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"fillna": {
|
||||
"column": "Supplier",
|
||||
"value": "Unknown"
|
||||
}
|
||||
},
|
||||
{
|
||||
"to_numeric": {
|
||||
"column": "Price",
|
||||
"errors": "coerce"
|
||||
}
|
||||
},
|
||||
{
|
||||
"dropna": {
|
||||
"subset": [
|
||||
"ProductID",
|
||||
"ProductName",
|
||||
"Stock",
|
||||
"Unit",
|
||||
"Price"
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
|
||||
tables: List[TableConfigSchema] = Field(
|
||||
..., description="List of table configurations to process", min_items=1
|
||||
)
|
||||
|
||||
|
||||
class UpdateDbWithConfigResponse(BaseModel):
|
||||
"""Response schema for the JSON-based database update endpoint.
|
||||
|
||||
Attributes:
|
||||
success: Indicates if the database update was successful
|
||||
tables_processed: Number of tables that were processed
|
||||
warnings: List of any warnings encountered during preprocessing
|
||||
"""
|
||||
|
||||
success: bool = Field(
|
||||
..., description="Indicates if the database update was successful"
|
||||
)
|
||||
tables_processed: int = Field(
|
||||
..., description="Number of tables that were successfully processed"
|
||||
)
|
||||
warnings: List[str] = Field(
|
||||
default_factory=list,
|
||||
description="List of warnings encountered during preprocessing",
|
||||
)
|
||||
|
|
|
|||
|
|
@ -72,3 +72,104 @@ class DataProcessorService:
|
|||
|
||||
# Step 4: Update the local SQLite database
|
||||
await self.data_saver.save_table(df, table_config.name, overwrite=True)
|
||||
|
||||
async def update_database_with_config(
|
||||
self, *, config: dict
|
||||
) -> tuple[int, list[str]]:
|
||||
"""Update the database using a provided JSON configuration.
|
||||
|
||||
This method processes tables based on a configuration provided directly
|
||||
as a dictionary (typically from a JSON request body), without relying
|
||||
on a YAML configuration file. This enables dynamic, on-demand preprocessing
|
||||
with custom configurations per API request.
|
||||
|
||||
The method follows the same workflow as update_database():
|
||||
1. Reads data from Power BI for each configured table
|
||||
2. Applies the specified preprocessing steps
|
||||
3. Saves the processed data to the local SQLite database
|
||||
|
||||
Args:
|
||||
config: Dictionary containing preprocessing configuration with structure:
|
||||
{
|
||||
"tables": [
|
||||
{
|
||||
"name": str,
|
||||
"powerbi_table_name": str,
|
||||
"steps": [
|
||||
{step_type: {param1: value1, ...}},
|
||||
...
|
||||
]
|
||||
},
|
||||
...
|
||||
]
|
||||
}
|
||||
|
||||
Returns:
|
||||
A tuple containing:
|
||||
- int: Number of tables successfully processed
|
||||
- List[str]: List of warnings collected during preprocessing
|
||||
|
||||
Raises:
|
||||
RuntimeError: If no table configurations are found or if data processing fails.
|
||||
ValueError: If the configuration structure is invalid.
|
||||
|
||||
Example:
|
||||
>>> config = {
|
||||
... "tables": [
|
||||
... {
|
||||
... "name": "Sales",
|
||||
... "powerbi_table_name": "sales_raw",
|
||||
... "steps": [
|
||||
... {"keep": {"columns": ["Product", "Amount"]}},
|
||||
... {"dropna": {"subset": ["Product", "Amount"]}}
|
||||
... ]
|
||||
... }
|
||||
... ]
|
||||
... }
|
||||
>>> service = await DataProcessorService.create()
|
||||
>>> tables_processed, warnings = await service.update_database_with_config(
|
||||
... config=config
|
||||
... )
|
||||
"""
|
||||
# Create a Preprocessor from the provided configuration
|
||||
preprocessor = await Preprocessor.create_from_config(config=config)
|
||||
table_configs = preprocessor.get_table_configs()
|
||||
|
||||
if not table_configs:
|
||||
raise RuntimeError("No table configurations found in provided config.")
|
||||
|
||||
all_warnings = []
|
||||
tables_processed = 0
|
||||
|
||||
for table_config in table_configs:
|
||||
# Step 1: Create PowerBIReader for this table
|
||||
power_bi_reader = await PowerBIReader.create(
|
||||
dataset_id=settings.POWERBI_DATASET_ID,
|
||||
access_token=self.access_token,
|
||||
table_name=table_config.powerbi_table_name,
|
||||
)
|
||||
|
||||
# Step 2: Read data from Power BI
|
||||
df = await power_bi_reader.read_data()
|
||||
if df.empty:
|
||||
raise RuntimeError(
|
||||
f"No data read from Power BI for table '{table_config.name}'."
|
||||
)
|
||||
|
||||
# Step 3: Preprocess the data using this table's steps
|
||||
df = await preprocessor.preprocess(df, steps=table_config.steps)
|
||||
if df.empty:
|
||||
raise RuntimeError(
|
||||
f"No data returned from preprocessing for table '{table_config.name}'."
|
||||
)
|
||||
|
||||
# Collect any warnings from preprocessing
|
||||
if preprocessor.last_report:
|
||||
all_warnings.extend(preprocessor.last_report)
|
||||
|
||||
# Step 4: Update the local SQLite database
|
||||
await self.data_saver.save_table(df, table_config.name, overwrite=True)
|
||||
|
||||
tables_processed += 1
|
||||
|
||||
return tables_processed, all_warnings
|
||||
|
|
|
|||
Loading…
Reference in a new issue