154 lines
5.4 KiB
Python
154 lines
5.4 KiB
Python
"""Schemas for the data processor service."""
|
|
|
|
from typing import Any, Dict, List
|
|
from pydantic import BaseModel, Field
|
|
|
|
|
|
class UpdateDbResponse(BaseModel):
    """Response schema reporting the outcome of a database update.

    Attributes:
        success: Indicates if the database update was successful
    """

    success: bool = Field(
        ..., description="Indicates if the database update was successful."
    )
|
|
|
|
|
|
class TableConfigSchema(BaseModel):
    """Configuration for one table.

    Describes how a table is read from Power BI and how it is preprocessed.

    Attributes:
        name: Name given to the table in the local SQLite database
        powerbi_table_name: Name of the source table in the Power BI dataset
        measures: Optional list of Power BI measures to retrieve
        group_by_columns: Optional list of columns to group by when
            retrieving measures
        steps: Preprocessing steps to apply to the table data
    """

    # Required fields: `...` marks them as having no default.
    name: str = Field(
        ...,
        description="Name for the table in the local database",
        example="Data",
    )
    powerbi_table_name: str = Field(
        ...,
        description="Name of the table in the Power BI dataset",
        example="data_full",
    )

    # Optional fields: each defaults to a fresh empty list.
    measures: List[str] = Field(
        description="List of Power BI measure names to retrieve",
        example=["EP in CHF", "Gesamtbetrag in CHF"],
        default_factory=list,
    )
    group_by_columns: List[str] = Field(
        description="Columns to group by when retrieving measures (triggers SUMMARIZECOLUMNS)",
        example=["m_Artikel"],
        default_factory=list,
    )
    steps: List[Dict[str, Any]] = Field(
        description="List of preprocessing steps to apply",
        example=[
            {"keep": {"columns": ["col1", "col2"]}},
            {"fillna": {"column": "col1", "value": "Unknown"}},
        ],
        default_factory=list,
    )
|
|
|
|
|
|
class PreprocessingConfigSchema(BaseModel):
    """Schema for the full preprocessing configuration.

    Describes the structure of the JSON-based preprocessing configuration,
    which replaces the need for a YAML configuration file and lets table
    preprocessing operations be configured dynamically per API request.

    Multiple tables are supported, each with its own set of preprocessing
    steps. The available preprocessing steps are:

    - **keep**: Keep only specified columns
      - Parameters: columns (List[str])
      - Example: {"keep": {"columns": ["Name", "Price", "Quantity"]}}

    - **fillna**: Fill missing values in a column
      - Parameters: column (str), value (Any)
      - Example: {"fillna": {"column": "Supplier", "value": "Unknown"}}

    - **to_numeric**: Convert a column to numeric type
      - Parameters: column (str), errors (str, optional)
      - Example: {"to_numeric": {"column": "Price", "errors": "coerce"}}

    - **dropna**: Drop rows with missing values in specified columns
      - Parameters: subset (List[str])
      - Example: {"dropna": {"subset": ["Name", "Price"]}}

    Attributes:
        tables: List of table configurations to process

    Example Request Body:
        {
            "tables": [
                {
                    "name": "ProductData",
                    "powerbi_table_name": "products_raw",
                    "steps": [
                        {
                            "keep": {
                                "columns": [
                                    "ProductID",
                                    "ProductName",
                                    "Supplier",
                                    "Stock",
                                    "Unit",
                                    "Price"
                                ]
                            }
                        },
                        {
                            "fillna": {
                                "column": "Supplier",
                                "value": "Unknown"
                            }
                        },
                        {
                            "to_numeric": {
                                "column": "Price",
                                "errors": "coerce"
                            }
                        },
                        {
                            "dropna": {
                                "subset": [
                                    "ProductID",
                                    "ProductName",
                                    "Stock",
                                    "Unit",
                                    "Price"
                                ]
                            }
                        }
                    ]
                }
            ]
        }
    """

    # Required; the `min_items=1` constraint asks for at least one table.
    tables: List[TableConfigSchema] = Field(
        ...,
        min_items=1,
        description="List of table configurations to process",
    )
|
|
|
|
|
|
class UpdateDbWithConfigResponse(BaseModel):
    """Response returned by the JSON-based database update endpoint.

    Attributes:
        success: Indicates if the database update was successful
        tables_processed: Number of tables that were successfully processed
        warnings: Warnings encountered during preprocessing
    """

    # Required result fields.
    success: bool = Field(
        ...,
        description="Indicates if the database update was successful",
    )
    tables_processed: int = Field(
        ...,
        description="Number of tables that were successfully processed",
    )

    # Optional; defaults to a fresh empty list.
    warnings: List[str] = Field(
        description="List of warnings encountered during preprocessing",
        default_factory=list,
    )
|