62 lines
2.4 KiB
Python
62 lines
2.4 KiB
Python
"""Main service module for data processing."""
|
|
|
|
from dataclasses import dataclass
|
|
from src.settings import settings
|
|
from src.dataprocessor.domain.powerbi_reader import PowerBIReader
|
|
from src.dataprocessor.domain.preprocessor import Preprocessor
|
|
from dataprocessor.domain.base_datasaver import BaseDataSaver
|
|
from src.dataprocessor.domain.sqlite_datasaver import SQLiteDataSaver
|
|
|
|
|
|
@dataclass
|
|
class DataProcessorService:
|
|
"""Service class for data processing operations."""
|
|
|
|
power_bi_reader: PowerBIReader = None
|
|
preprocessor: Preprocessor = None
|
|
data_saver: BaseDataSaver = None
|
|
|
|
@classmethod
|
|
async def create(cls) -> "DataProcessorService":
|
|
"""Create a new instance of DataProcessorService."""
|
|
instance = cls()
|
|
instance.power_bi_reader = await cls._create_powerbi_reader()
|
|
instance.preprocessor = await cls._create_preprocessor()
|
|
instance.data_saver = await SQLiteDataSaver.create(settings.DB_PATH)
|
|
return instance
|
|
|
|
@staticmethod
|
|
async def _create_powerbi_reader() -> PowerBIReader:
|
|
"""Create and initialize the PowerBIReader."""
|
|
access_token = await PowerBIReader._get_access_token_async(
|
|
tenant_id=settings.POWERBI_TENANT_ID,
|
|
client_id=settings.POWERBI_CLIENT_ID,
|
|
client_secret=settings.POWERBI_CLIENT_SECRET,
|
|
)
|
|
power_bi_reader = await PowerBIReader.create(
|
|
dataset_id=settings.POWERBI_DATASET_ID,
|
|
access_token=access_token,
|
|
table_name=settings.POWERBI_TABLE_NAME,
|
|
)
|
|
return power_bi_reader
|
|
|
|
@staticmethod
|
|
async def _create_preprocessor() -> Preprocessor:
|
|
"""Create and initialize the Preprocessor."""
|
|
preprocessor = await Preprocessor.create(settings.PP_CONFIG_PATH)
|
|
return preprocessor
|
|
|
|
async def update_database(self):
|
|
"""Placeholder method for updating the database."""
|
|
# Step 1: Read data from Power BI
|
|
df = await self.power_bi_reader.read_data()
|
|
if df.empty:
|
|
raise RuntimeError("No data read from Power BI.")
|
|
# Step 2: Preprocess the data
|
|
df = await self.preprocessor.preprocess(df)
|
|
if df.empty:
|
|
raise RuntimeError("No data returned from preprocessing.")
|
|
# Step 3: Update the local SQLite database
|
|
await self.data_saver.save_table(
|
|
df, self.power_bi_reader.table_name, overwrite=True
|
|
)
|