service-preprocessing/src/dataprocessor/service.py
2025-11-05 08:34:19 +01:00

179 lines
7 KiB
Python

"""Main service module for data processing."""
from dataclasses import dataclass
from src.settings import settings
from src.dataprocessor.domain.powerbi_reader import PowerBIReader
from src.dataprocessor.domain.preprocessor import Preprocessor
from src.dataprocessor.domain.base_datasaver import BaseDataSaver
from src.dataprocessor.domain.sqlite_datasaver import SQLiteDataSaver
@dataclass
class DataProcessorService:
"""Service class for data processing operations."""
preprocessor: Preprocessor = None
data_saver: BaseDataSaver = None
access_token: str = None
@classmethod
async def create(cls) -> "DataProcessorService":
"""Create a new instance of DataProcessorService."""
instance = cls()
instance.access_token = await cls._get_access_token()
instance.preprocessor = await cls._create_preprocessor()
instance.data_saver = await SQLiteDataSaver.create(settings.DB_PATH)
return instance
@staticmethod
async def _get_access_token() -> str:
"""Get Power BI access token."""
access_token = await PowerBIReader._get_access_token_async(
tenant_id=settings.POWERBI_TENANT_ID,
client_id=settings.POWERBI_CLIENT_ID,
client_secret=settings.POWERBI_CLIENT_SECRET,
)
return access_token
@staticmethod
async def _create_preprocessor() -> Preprocessor:
"""Create and initialize the Preprocessor."""
preprocessor = await Preprocessor.create(settings.PP_CONFIG_PATH)
return preprocessor
async def update_database(self):
"""Update the database by processing all configured tables."""
table_configs = self.preprocessor.get_table_configs()
if not table_configs:
raise RuntimeError("No table configurations found in preprocessing config.")
for table_config in table_configs:
# Step 1: Create PowerBIReader for this table
power_bi_reader = await PowerBIReader.create(
dataset_id=settings.POWERBI_DATASET_ID,
access_token=self.access_token,
table_name=table_config.powerbi_table_name,
measures=table_config.measures,
group_by_columns=table_config.group_by_columns,
)
# Step 2: Read data from Power BI
df = await power_bi_reader.read_data()
if df.empty:
raise RuntimeError(
f"No data read from Power BI for table '{table_config.name}'."
)
# Step 3: Preprocess the data using this table's steps
df = await self.preprocessor.preprocess(df, steps=table_config.steps)
if df.empty:
raise RuntimeError(
f"No data returned from preprocessing for table '{table_config.name}'."
)
# Step 4: Update the local SQLite database
await self.data_saver.save_table(df, table_config.name, overwrite=True)
async def update_database_with_config(
self, *, config: dict
) -> tuple[int, list[str]]:
"""Update the database using a provided JSON configuration.
This method processes tables based on a configuration provided directly
as a dictionary (typically from a JSON request body), without relying
on a YAML configuration file. This enables dynamic, on-demand preprocessing
with custom configurations per API request.
The method follows the same workflow as update_database():
1. Reads data from Power BI for each configured table
2. Applies the specified preprocessing steps
3. Saves the processed data to the local SQLite database
Args:
config: Dictionary containing preprocessing configuration with structure:
{
"tables": [
{
"name": str,
"powerbi_table_name": str,
"steps": [
{step_type: {param1: value1, ...}},
...
]
},
...
]
}
Returns:
A tuple containing:
- int: Number of tables successfully processed
- List[str]: List of warnings collected during preprocessing
Raises:
RuntimeError: If no table configurations are found or if data processing fails.
ValueError: If the configuration structure is invalid.
Example:
>>> config = {
... "tables": [
... {
... "name": "Sales",
... "powerbi_table_name": "sales_raw",
... "steps": [
... {"keep": {"columns": ["Product", "Amount"]}},
... {"dropna": {"subset": ["Product", "Amount"]}}
... ]
... }
... ]
... }
>>> service = await DataProcessorService.create()
>>> tables_processed, warnings = await service.update_database_with_config(
... config=config
... )
"""
# Create a Preprocessor from the provided configuration
preprocessor = await Preprocessor.create_from_config(config=config)
table_configs = preprocessor.get_table_configs()
if not table_configs:
raise RuntimeError("No table configurations found in provided config.")
all_warnings = []
tables_processed = 0
for table_config in table_configs:
# Step 1: Create PowerBIReader for this table
power_bi_reader = await PowerBIReader.create(
dataset_id=settings.POWERBI_DATASET_ID,
access_token=self.access_token,
table_name=table_config.powerbi_table_name,
measures=table_config.measures,
group_by_columns=table_config.group_by_columns,
)
# Step 2: Read data from Power BI
df = await power_bi_reader.read_data()
if df.empty:
raise RuntimeError(
f"No data read from Power BI for table '{table_config.name}'."
)
# Step 3: Preprocess the data using this table's steps
df = await preprocessor.preprocess(df, steps=table_config.steps)
if df.empty:
raise RuntimeError(
f"No data returned from preprocessing for table '{table_config.name}'."
)
# Collect any warnings from preprocessing
if preprocessor.last_report:
all_warnings.extend(preprocessor.last_report)
# Step 4: Update the local SQLite database
await self.data_saver.save_table(df, table_config.name, overwrite=True)
tables_processed += 1
return tables_processed, all_warnings