From 56869051dfe96b43941273aa0d7f947fd073c08e Mon Sep 17 00:00:00 2001 From: Christopher Gondek Date: Mon, 27 Oct 2025 16:56:04 +0100 Subject: [PATCH] feat: yaml-free preprocessing --- .env.example | 4 + src/dataprocessor/domain/preprocessor.py | 69 +++++++++ src/dataprocessor/router.py | 176 ++++++++++++++++++++++- src/dataprocessor/schemas.py | 133 +++++++++++++++++ src/dataprocessor/service.py | 101 +++++++++++++ 5 files changed, 481 insertions(+), 2 deletions(-) diff --git a/.env.example b/.env.example index 4cc3816..fc6ab3c 100644 --- a/.env.example +++ b/.env.example @@ -1,5 +1,9 @@ # Preprocessor Configuration # Path to the preprocessor configuration YAML file (relative to project root) +# OPTIONAL: Only if you want to use /api/v1/dataprocessor/update-db endpoint +# Otherwise you can directly pass preprocessor config in the request body +# at /api/v1/dataprocessor/update-db-with-config +# Example YAML: pp-config.yaml in the src/ directory PP_CONFIG_PATH="src/pp-config.yaml" # API Keys diff --git a/src/dataprocessor/domain/preprocessor.py b/src/dataprocessor/domain/preprocessor.py index 4b0e48b..fad50c8 100644 --- a/src/dataprocessor/domain/preprocessor.py +++ b/src/dataprocessor/domain/preprocessor.py @@ -190,6 +190,75 @@ class Preprocessor: return cls(table_configs=table_configs) + @classmethod + async def create_from_config(cls, *, config: Dict[str, Any]) -> "Preprocessor": + """Create a Preprocessor instance from a configuration dictionary. + + This method allows creating a Preprocessor without relying on a YAML file, + enabling dynamic configuration via API requests with JSON payloads. + + The configuration dictionary should follow the same structure as the YAML + configuration file, containing a "tables" key with a list of table + configurations. + + Args: + config: Dictionary containing preprocessing configuration with structure: + { + "tables": [ + { + "name": str, + "powerbi_table_name": str, + "steps": [ + {step_type: {param1: value1, ...}}, + ... + ] + }, + ... + ] + } + + Returns: + A new Preprocessor instance configured with the provided tables. + + Raises: + ValueError: If the configuration dictionary is invalid or missing required keys. + + Example: + >>> config = { + ... "tables": [ + ... { + ... "name": "Sales", + ... "powerbi_table_name": "sales_raw", + ... "steps": [ + ... {"keep": {"columns": ["Product", "Amount"]}}, + ... {"fillna": {"column": "Amount", "value": 0}} + ... ] + ... } + ... ] + ... } + >>> preprocessor = await Preprocessor.create_from_config(config=config) + """ + if not isinstance(config, dict): + raise ValueError("Configuration must be a dictionary") + + if "tables" not in config: + raise ValueError("Configuration must contain a 'tables' key") + + # Parse table configurations + table_configs = [] + for table_data in config.get("tables", []): + if not isinstance(table_data, dict): + raise ValueError("Each table configuration must be a dictionary") + + table_config = TableConfig( + name=table_data.get("name", ""), + powerbi_table_name=table_data.get("powerbi_table_name", ""), + steps=table_data.get("steps", []), + ) + table_configs.append(table_config) + + return cls(table_configs=table_configs) + def get_table_configs(self) -> List[TableConfig]: """Get all table configurations. diff --git a/src/dataprocessor/router.py b/src/dataprocessor/router.py index 2c63480..05fcd03 100644 --- a/src/dataprocessor/router.py +++ b/src/dataprocessor/router.py @@ -2,10 +2,14 @@ # Set up router import logging -from fastapi import APIRouter +from fastapi import APIRouter, Body from fastapi import Security -from src.dataprocessor.schemas import UpdateDbResponse +from src.dataprocessor.schemas import ( + UpdateDbResponse, + PreprocessingConfigSchema, + UpdateDbWithConfigResponse, +) from src.dependencies import require_pp_api_key from src.dataprocessor.service import DataProcessorService @@ -21,3 +25,171 @@ async def update_db(_: None = Security(require_pp_api_key)) -> UpdateDbResponse: service = await DataProcessorService.create() await service.update_database() return UpdateDbResponse(success=True) + + +@router.post("/update-db-with-config") +async def update_db_with_config( + config: PreprocessingConfigSchema = Body(...), + _: None = Security(require_pp_api_key), +) -> UpdateDbWithConfigResponse: + """Update the database using a JSON configuration instead of a YAML file. + + This endpoint provides a flexible, dynamic way to preprocess and update your + database without relying on a static YAML configuration file. You can specify + the entire preprocessing pipeline in the request body, allowing for on-demand + custom preprocessing operations. + + ## How It Works + + 1. **Reads data from Power BI**: For each table specified in the configuration, + data is fetched from the corresponding Power BI dataset table. + + 2. **Applies preprocessing steps**: Each table configuration includes a list of + preprocessing steps that are applied sequentially to transform the data. + + 3. **Saves to local database**: The processed data is saved to the local SQLite + database with the specified table name. + + ## Available Preprocessing Steps + + The following preprocessing steps are supported. Each step is specified as a + dictionary with the step name as the key and its parameters as the value: + + ### keep + Keep only specified columns from the DataFrame. + - **Parameters**: + - `columns` (List[str]): List of column names to retain + - **Example**: + ```json + {"keep": {"columns": ["ProductID", "ProductName", "Price"]}} + ``` + + ### fillna + Fill missing (NaN) values in a specified column. + - **Parameters**: + - `column` (str): Name of the column to fill + - `value` (Any): Value to use for filling NaN entries + - **Example**: + ```json + {"fillna": {"column": "Supplier", "value": "Unknown"}} + ``` + + ### to_numeric + Convert a column to numeric type. + - **Parameters**: + - `column` (str): Name of the column to convert + - `errors` (str, optional): How to handle conversion errors + - "coerce": Convert invalid values to NaN + - "raise": Raise an exception for invalid values + - "ignore": Leave invalid values unchanged + - **Example**: + ```json + {"to_numeric": {"column": "Price", "errors": "coerce"}} + ``` + + ### dropna + Drop rows that contain missing values in specified columns. + - **Parameters**: + - `subset` (List[str]): List of column names to check for NaN values + - **Example**: + ```json + {"dropna": {"subset": ["ProductID", "Price"]}} + ``` + + ## Request Body Structure + + The request body must contain a `tables` array, where each table object specifies: + - `name`: The name for the table in the local SQLite database + - `powerbi_table_name`: The source table name in the Power BI dataset + - `steps`: An ordered list of preprocessing steps to apply + + ## Complete Example + + ```json + { + "tables": [ + { + "name": "ProductData", + "powerbi_table_name": "products_raw", + "steps": [ + { + "keep": { + "columns": [ + "ProductID", + "ProductName", + "Supplier", + "Stock", + "Unit", + "Price" + ] + } + }, + { + "fillna": { + "column": "Supplier", + "value": "Unknown" + } + }, + { + "to_numeric": { + "column": "Price", + "errors": "coerce" + } + }, + { + "dropna": { + "subset": [ + "ProductID", + "ProductName", + "Stock", + "Unit", + "Price" + ] + } + } + ] + } + ] + } + ``` + + ## Differences from /update-db Endpoint + + - **No YAML file dependency**: Configuration is provided directly in the request + - **Dynamic configuration**: Different preprocessing pipelines can be specified per request + - **Warnings included**: Response includes any warnings encountered during preprocessing + - **Table count**: Response includes the number of tables successfully processed + + ## Response + + Returns a `UpdateDbWithConfigResponse` containing: + - `success`: Boolean indicating if the operation was successful + - `tables_processed`: Number of tables that were successfully processed + - `warnings`: List of any warnings encountered during preprocessing (e.g., missing columns) + + ## Error Handling + + The endpoint will raise an error if: + - The configuration structure is invalid + - No table configurations are provided + - No data is returned from Power BI for a specified table + - Preprocessing results in an empty DataFrame + - Database save operations fail + + Args: + config: The preprocessing configuration specifying tables and their processing steps + _: Security dependency requiring valid API key + + Returns: + UpdateDbWithConfigResponse with operation status, table count, and any warnings + + Raises: + HTTPException: If authentication fails or processing errors occur + """ + service = await DataProcessorService.create() + tables_processed, warnings = await service.update_database_with_config( + config=config.model_dump() + ) + return UpdateDbWithConfigResponse( + success=True, tables_processed=tables_processed, warnings=warnings + ) diff --git a/src/dataprocessor/schemas.py b/src/dataprocessor/schemas.py index 1586576..0955ca4 100644 --- a/src/dataprocessor/schemas.py +++ b/src/dataprocessor/schemas.py @@ -1,5 +1,6 @@ """Schemas for the data processor service.""" +from typing import Any, Dict, List from pydantic import BaseModel, Field @@ -7,3 +8,135 @@ class UpdateDbResponse(BaseModel): success: bool = Field( ..., description="Indicates if the database update was successful." ) + + +class TableConfigSchema(BaseModel): + """Schema for a single table configuration. + + Defines how a table should be read from Power BI and preprocessed. + + Attributes: + name: The name to use for the table in the local SQLite database + powerbi_table_name: The name of the source table in Power BI dataset + steps: List of preprocessing steps to apply to the table data + """ + + name: str = Field( + ..., description="Name for the table in the local database", example="Data" + ) + powerbi_table_name: str = Field( + ..., + description="Name of the table in the Power BI dataset", + example="data_full", + ) + steps: List[Dict[str, Any]] = Field( + default_factory=list, + description="List of preprocessing steps to apply", + example=[ + {"keep": {"columns": ["col1", "col2"]}}, + {"fillna": {"column": "col1", "value": "Unknown"}}, + ], + ) + + +class PreprocessingConfigSchema(BaseModel): + """Schema for the complete preprocessing configuration. + + This schema defines the structure for JSON-based preprocessing configuration, + replacing the need for a YAML configuration file. It allows dynamic + configuration of table preprocessing operations per API request. + + The configuration supports multiple tables, each with its own set of + preprocessing steps. Available preprocessing steps include: + + - **keep**: Keep only specified columns + - Parameters: columns (List[str]) + - Example: {"keep": {"columns": ["Name", "Price", "Quantity"]}} + + - **fillna**: Fill missing values in a column + - Parameters: column (str), value (Any) + - Example: {"fillna": {"column": "Supplier", "value": "Unknown"}} + + - **to_numeric**: Convert a column to numeric type + - Parameters: column (str), errors (str, optional) + - Example: {"to_numeric": {"column": "Price", "errors": "coerce"}} + + - **dropna**: Drop rows with missing values in specified columns + - Parameters: subset (List[str]) + - Example: {"dropna": {"subset": ["Name", "Price"]}} + + Attributes: + tables: List of table configurations to process + + Example Request Body: + { + "tables": [ + { + "name": "ProductData", + "powerbi_table_name": "products_raw", + "steps": [ + { + "keep": { + "columns": [ + "ProductID", + "ProductName", + "Supplier", + "Stock", + "Unit", + "Price" + ] + } + }, + { + "fillna": { + "column": "Supplier", + "value": "Unknown" + } + }, + { + "to_numeric": { + "column": "Price", + "errors": "coerce" + } + }, + { + "dropna": { + "subset": [ + "ProductID", + "ProductName", + "Stock", + "Unit", + "Price" + ] + } + } + ] + } + ] + } + """ + + tables: List[TableConfigSchema] = Field( + ..., description="List of table configurations to process", min_items=1 + ) + + +class UpdateDbWithConfigResponse(BaseModel): + """Response schema for the JSON-based database update endpoint. + + Attributes: + success: Indicates if the database update was successful + tables_processed: Number of tables that were processed + warnings: List of any warnings encountered during preprocessing + """ + + success: bool = Field( + ..., description="Indicates if the database update was successful" + ) + tables_processed: int = Field( + ..., description="Number of tables that were successfully processed" + ) + warnings: List[str] = Field( + default_factory=list, + description="List of warnings encountered during preprocessing", + ) diff --git a/src/dataprocessor/service.py b/src/dataprocessor/service.py index dcf7931..b526b59 100644 --- a/src/dataprocessor/service.py +++ b/src/dataprocessor/service.py @@ -72,3 +72,104 @@ class DataProcessorService: # Step 4: Update the local SQLite database await self.data_saver.save_table(df, table_config.name, overwrite=True) + + async def update_database_with_config( + self, *, config: dict + ) -> tuple[int, list[str]]: + """Update the database using a provided JSON configuration. + + This method processes tables based on a configuration provided directly + as a dictionary (typically from a JSON request body), without relying + on a YAML configuration file. This enables dynamic, on-demand preprocessing + with custom configurations per API request. + + The method follows the same workflow as update_database(): + 1. Reads data from Power BI for each configured table + 2. Applies the specified preprocessing steps + 3. Saves the processed data to the local SQLite database + + Args: + config: Dictionary containing preprocessing configuration with structure: + { + "tables": [ + { + "name": str, + "powerbi_table_name": str, + "steps": [ + {step_type: {param1: value1, ...}}, + ... + ] + }, + ... + ] + } + + Returns: + A tuple containing: + - int: Number of tables successfully processed + - List[str]: List of warnings collected during preprocessing + + Raises: + RuntimeError: If no table configurations are found or if data processing fails. + ValueError: If the configuration structure is invalid. + + Example: + >>> config = { + ... "tables": [ + ... { + ... "name": "Sales", + ... "powerbi_table_name": "sales_raw", + ... "steps": [ + ... {"keep": {"columns": ["Product", "Amount"]}}, + ... {"dropna": {"subset": ["Product", "Amount"]}} + ... ] + ... } + ... ] + ... } + >>> service = await DataProcessorService.create() + >>> tables_processed, warnings = await service.update_database_with_config( + ... config=config + ... ) + """ + # Create a Preprocessor from the provided configuration + preprocessor = await Preprocessor.create_from_config(config=config) + table_configs = preprocessor.get_table_configs() + + if not table_configs: + raise RuntimeError("No table configurations found in provided config.") + + all_warnings = [] + tables_processed = 0 + + for table_config in table_configs: + # Step 1: Create PowerBIReader for this table + power_bi_reader = await PowerBIReader.create( + dataset_id=settings.POWERBI_DATASET_ID, + access_token=self.access_token, + table_name=table_config.powerbi_table_name, + ) + + # Step 2: Read data from Power BI + df = await power_bi_reader.read_data() + if df.empty: + raise RuntimeError( + f"No data read from Power BI for table '{table_config.name}'." + ) + + # Step 3: Preprocess the data using this table's steps + df = await preprocessor.preprocess(df, steps=table_config.steps) + if df.empty: + raise RuntimeError( + f"No data returned from preprocessing for table '{table_config.name}'." + ) + + # Collect any warnings from preprocessing + if preprocessor.last_report: + all_warnings.extend(preprocessor.last_report) + + # Step 4: Update the local SQLite database + await self.data_saver.save_table(df, table_config.name, overwrite=True) + + tables_processed += 1 + + return tables_processed, all_warnings