diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..4ff550d --- /dev/null +++ b/.env.example @@ -0,0 +1,27 @@ +# Preprocessor Configuration +# Path to the preprocessor configuration YAML file +PP_CONFIG_PATH="/path/to/your/pp-config.yaml" + +# API Keys +# API key for the preprocessor service +PP_API_KEY="your-preprocessor-api-key-here" + +# API key for database endpoint access +DB_ENDPOINT_API_KEY="your-database-endpoint-api-key-here" + +# Database Configuration +# Path to the SQLite database file +DB_URL="/path/to/your/database.db" + +# Power BI Configuration +# Power BI dataset identifier +POWERBI_DATASET_ID="your-powerbi-dataset-id" + +# Power BI client ID for Azure AD authentication +POWERBI_CLIENT_ID="your-powerbi-client-id" + +# Power BI client secret for Azure AD authentication +POWERBI_CLIENT_SECRET="your-powerbi-client-secret" + +# Power BI tenant ID (Azure AD tenant) +POWERBI_TENANT_ID="your-powerbi-tenant-id" diff --git a/.gitignore b/.gitignore index 303cc54..9343a49 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,9 @@ wheels/ # Environment variables .env + +# Data files +data/ + +# System files +.DS_Store \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 546d703..6db03c8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,6 +7,7 @@ requires-python = ">=3.13" dependencies = [ "aiosqlite>=0.20.0", "fastapi>=0.117.1", + "greenlet>=3.2.4", "httpx>=0.28.1", "msal>=1.34.0", "pandas>=2.3.2", diff --git a/src/dataprocessor/domain/preprocessor.py b/src/dataprocessor/domain/preprocessor.py index 5209b07..4b0e48b 100644 --- a/src/dataprocessor/domain/preprocessor.py +++ b/src/dataprocessor/domain/preprocessor.py @@ -142,22 +142,31 @@ REGISTRY: Dict[ @dataclass -class Preprocessor: +class TableConfig: + """Configuration for a single table.""" + + name: str + powerbi_table_name: str steps: List[Dict[str, Any]] = field(default_factory=list) + + +@dataclass +class Preprocessor: + table_configs: List[TableConfig] = field(default_factory=list) last_report: List[str] = field(default_factory=list) @classmethod async def create(cls, config_path: str) -> "Preprocessor": """Create a Preprocessor instance from a YAML configuration file. - Loads preprocessing steps from a YAML configuration file and creates - a new Preprocessor instance with those steps. + Loads table configurations from a YAML configuration file and creates + a new Preprocessor instance with those configurations. Args: - config_path: Path to the YAML configuration file containing preprocessing steps. + config_path: Path to the YAML configuration file containing table configurations. Returns: - A new Preprocessor instance configured with the steps from the file. + A new Preprocessor instance configured with tables from the file. Raises: FileNotFoundError: If the configuration file does not exist. @@ -168,10 +177,37 @@ class Preprocessor: """ path = Path(config_path) cfg = yaml.safe_load(path.read_text(encoding="utf-8")) or {} - return cls(steps=cfg.get("steps", [])) - async def preprocess(self, df: pd.DataFrame) -> pd.DataFrame: - """Apply all configured preprocessing steps to a DataFrame. + # Parse table configurations + table_configs = [] + for table_data in cfg.get("tables", []): + table_config = TableConfig( + name=table_data.get("name", ""), + powerbi_table_name=table_data.get("powerbi_table_name", ""), + steps=table_data.get("steps", []), + ) + table_configs.append(table_config) + + return cls(table_configs=table_configs) + + def get_table_configs(self) -> List[TableConfig]: + """Get all table configurations. + + Returns: + List of TableConfig objects containing table configurations. + + Example: + >>> preprocessor = await Preprocessor.create("config.yaml") + >>> configs = preprocessor.get_table_configs() + >>> for config in configs: + ... print(config.name, config.powerbi_table_name) + """ + return self.table_configs + + async def preprocess( + self, df: pd.DataFrame, *, steps: List[Dict[str, Any]] + ) -> pd.DataFrame: + """Apply preprocessing steps to a DataFrame. Executes each preprocessing step in sequence on the provided DataFrame. Each step is looked up in the REGISTRY and applied with its parameters. @@ -180,18 +216,20 @@ class Preprocessor: Args: df: The input DataFrame to preprocess. + steps: List of preprocessing steps to apply. Returns: The preprocessed DataFrame after applying all configured steps. Example: >>> preprocessor = await Preprocessor.create("config.yaml") + >>> table_config = preprocessor.get_table_configs()[0] >>> df = pd.DataFrame({'A': [1, 2, None], 'B': ['x', 'y', 'z']}) - >>> result = await preprocessor.preprocess(df) + >>> result = await preprocessor.preprocess(df, steps=table_config.steps) >>> print(preprocessor.last_report) # Check for any warnings """ report: List[str] = [] - for step in self.steps: + for step in steps: name, params = next(iter(step.items())) fn = REGISTRY.get(name) if not fn: diff --git a/src/dataprocessor/service.py b/src/dataprocessor/service.py index ac9082a..dcf7931 100644 --- a/src/dataprocessor/service.py +++ b/src/dataprocessor/service.py @@ -12,33 +12,28 @@ from src.dataprocessor.domain.sqlite_datasaver import SQLiteDataSaver class DataProcessorService: """Service class for data processing operations.""" - power_bi_reader: PowerBIReader = None preprocessor: Preprocessor = None data_saver: BaseDataSaver = None + access_token: str = None @classmethod async def create(cls) -> "DataProcessorService": """Create a new instance of DataProcessorService.""" instance = cls() - instance.power_bi_reader = await cls._create_powerbi_reader() + instance.access_token = await cls._get_access_token() instance.preprocessor = await cls._create_preprocessor() instance.data_saver = await SQLiteDataSaver.create(settings.DB_PATH) return instance @staticmethod - async def _create_powerbi_reader() -> PowerBIReader: - """Create and initialize the PowerBIReader.""" + async def _get_access_token() -> str: + """Get Power BI access token.""" access_token = await PowerBIReader._get_access_token_async( tenant_id=settings.POWERBI_TENANT_ID, client_id=settings.POWERBI_CLIENT_ID, client_secret=settings.POWERBI_CLIENT_SECRET, ) - power_bi_reader = await PowerBIReader.create( - dataset_id=settings.POWERBI_DATASET_ID, - access_token=access_token, - table_name=settings.POWERBI_TABLE_NAME, - ) - return power_bi_reader + return access_token @staticmethod async def _create_preprocessor() -> Preprocessor: @@ -47,16 +42,33 @@ class DataProcessorService: return preprocessor async def update_database(self): - """Placeholder method for updating the database.""" - # Step 1: Read data from Power BI - df = await self.power_bi_reader.read_data() - if df.empty: - raise RuntimeError("No data read from Power BI.") - # Step 2: Preprocess the data - df = await self.preprocessor.preprocess(df) - if df.empty: - raise RuntimeError("No data returned from preprocessing.") - # Step 3: Update the local SQLite database - await self.data_saver.save_table( - df, self.power_bi_reader.table_name, overwrite=True - ) + """Update the database by processing all configured tables.""" + table_configs = self.preprocessor.get_table_configs() + + if not table_configs: + raise RuntimeError("No table configurations found in preprocessing config.") + + for table_config in table_configs: + # Step 1: Create PowerBIReader for this table + power_bi_reader = await PowerBIReader.create( + dataset_id=settings.POWERBI_DATASET_ID, + access_token=self.access_token, + table_name=table_config.powerbi_table_name, + ) + + # Step 2: Read data from Power BI + df = await power_bi_reader.read_data() + if df.empty: + raise RuntimeError( + f"No data read from Power BI for table '{table_config.name}'." + ) + + # Step 3: Preprocess the data using this table's steps + df = await self.preprocessor.preprocess(df, steps=table_config.steps) + if df.empty: + raise RuntimeError( + f"No data returned from preprocessing for table '{table_config.name}'." + ) + + # Step 4: Update the local SQLite database + await self.data_saver.save_table(df, table_config.name, overwrite=True) diff --git a/src/pp-config.yaml b/src/pp-config.yaml index 62981e0..1dee930 100644 --- a/src/pp-config.yaml +++ b/src/pp-config.yaml @@ -1,29 +1,32 @@ version: 1 -steps: - - keep: - columns: - [ - "Artikelkürzel", - "Artikelnummer", - "Artikelbezeichnung", - "Lieferant", - "Ist-Bestand", - "Einheit", - "EP in CHF", - ] - - fillna: - column: "Lieferant" - value: "Unbekannt" - - to_numeric: - column: "EP in CHF" - errors: "coerce" - - dropna: - subset: - [ - "Artikelkürzel", - "Artikelnummer", - "Artikelbezeichnung", - "Ist-Bestand", - "Einheit", - "EP in CHF", - ] +tables: + - name: "Data" + powerbi_table_name: "data_full" + steps: + - keep: + columns: + [ + "Artikelkürzel", + "Artikelnummer", + "Artikelbezeichnung", + "Lieferant", + "Ist-Bestand", + "Einheit", + "EP in CHF", + ] + - fillna: + column: "Lieferant" + value: "Unbekannt" + - to_numeric: + column: "EP in CHF" + errors: "coerce" + - dropna: + subset: + [ + "Artikelkürzel", + "Artikelnummer", + "Artikelbezeichnung", + "Ist-Bestand", + "Einheit", + "EP in CHF", + ] diff --git a/src/settings.py b/src/settings.py index 5ea5512..4778f55 100644 --- a/src/settings.py +++ b/src/settings.py @@ -59,11 +59,6 @@ class Settings(BaseSettings): ..., description="Power BI Dataset ID to read data from." ) - # Power BI Table Name. - POWERBI_TABLE_NAME: str = Field( - ..., description="Power BI Table name to read data from." - ) - # Power BI Tenant ID. POWERBI_TENANT_ID: str = Field( ..., description="Azure AD Tenant ID for Power BI authentication." diff --git a/tests/dataprocessor/domain/test_powerbi_reader.py b/tests/dataprocessor/domain/test_powerbi_reader.py index bdd938f..69d6680 100644 --- a/tests/dataprocessor/domain/test_powerbi_reader.py +++ b/tests/dataprocessor/domain/test_powerbi_reader.py @@ -3,14 +3,29 @@ import pytest from src.dataprocessor.domain.powerbi_reader import PowerBIReader +from src.dataprocessor.domain.preprocessor import Preprocessor from src.settings import settings @pytest.mark.asyncio async def test_read_data_prints_dataframe_info() -> None: """Test PowerBIReader.read_data() and print DataFrame info for development.""" + # Load preprocessor config to get table configurations + print("\nLoading table configurations...") + preprocessor = await Preprocessor.create(settings.PP_CONFIG_PATH) + table_configs = preprocessor.get_table_configs() + + if not table_configs: + pytest.skip("No table configurations found in preprocessing config.") + + # Use the first table configuration for testing + table_config = table_configs[0] + print( + f"✓ Using table configuration: {table_config.name} -> {table_config.powerbi_table_name}" + ) + # Get access token - print("\nGetting access token...") + print("Getting access token...") access_token = await PowerBIReader._get_access_token_async( tenant_id=settings.POWERBI_TENANT_ID, client_id=settings.POWERBI_CLIENT_ID, @@ -23,9 +38,9 @@ async def test_read_data_prints_dataframe_info() -> None: reader = await PowerBIReader.create( dataset_id=settings.POWERBI_DATASET_ID, access_token=access_token, - table_name=settings.POWERBI_TABLE_NAME, + table_name=table_config.powerbi_table_name, ) - print(f"✓ Reader created for table: {settings.POWERBI_TABLE_NAME}") + print(f"✓ Reader created for table: {table_config.powerbi_table_name}") # Call read_data() once print("Fetching data from Power BI...") diff --git a/uv.lock b/uv.lock index 2e1e780..393e9a2 100644 --- a/uv.lock +++ b/uv.lock @@ -691,6 +691,7 @@ source = { virtual = "." } dependencies = [ { name = "aiosqlite" }, { name = "fastapi" }, + { name = "greenlet" }, { name = "httpx" }, { name = "msal" }, { name = "pandas" }, @@ -707,6 +708,7 @@ dependencies = [ requires-dist = [ { name = "aiosqlite", specifier = ">=0.20.0" }, { name = "fastapi", specifier = ">=0.117.1" }, + { name = "greenlet", specifier = ">=3.2.4" }, { name = "httpx", specifier = ">=0.28.1" }, { name = "msal", specifier = ">=1.34.0" }, { name = "pandas", specifier = ">=2.3.2" },