chore: add multi-table support
This commit is contained in:
parent
97c503e727
commit
70c5f3a6b0
5 changed files with 132 additions and 69 deletions
|
|
@ -142,22 +142,31 @@ REGISTRY: Dict[
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Preprocessor:
|
class TableConfig:
|
||||||
|
"""Configuration for a single table."""
|
||||||
|
|
||||||
|
name: str
|
||||||
|
powerbi_table_name: str
|
||||||
steps: List[Dict[str, Any]] = field(default_factory=list)
|
steps: List[Dict[str, Any]] = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Preprocessor:
|
||||||
|
table_configs: List[TableConfig] = field(default_factory=list)
|
||||||
last_report: List[str] = field(default_factory=list)
|
last_report: List[str] = field(default_factory=list)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def create(cls, config_path: str) -> "Preprocessor":
|
async def create(cls, config_path: str) -> "Preprocessor":
|
||||||
"""Create a Preprocessor instance from a YAML configuration file.
|
"""Create a Preprocessor instance from a YAML configuration file.
|
||||||
|
|
||||||
Loads preprocessing steps from a YAML configuration file and creates
|
Loads table configurations from a YAML configuration file and creates
|
||||||
a new Preprocessor instance with those steps.
|
a new Preprocessor instance with those configurations.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
config_path: Path to the YAML configuration file containing preprocessing steps.
|
config_path: Path to the YAML configuration file containing table configurations.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
A new Preprocessor instance configured with the steps from the file.
|
A new Preprocessor instance configured with tables from the file.
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
FileNotFoundError: If the configuration file does not exist.
|
FileNotFoundError: If the configuration file does not exist.
|
||||||
|
|
@ -168,10 +177,37 @@ class Preprocessor:
|
||||||
"""
|
"""
|
||||||
path = Path(config_path)
|
path = Path(config_path)
|
||||||
cfg = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
|
cfg = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
|
||||||
return cls(steps=cfg.get("steps", []))
|
|
||||||
|
|
||||||
async def preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
|
# Parse table configurations
|
||||||
"""Apply all configured preprocessing steps to a DataFrame.
|
table_configs = []
|
||||||
|
for table_data in cfg.get("tables", []):
|
||||||
|
table_config = TableConfig(
|
||||||
|
name=table_data.get("name", ""),
|
||||||
|
powerbi_table_name=table_data.get("powerbi_table_name", ""),
|
||||||
|
steps=table_data.get("steps", []),
|
||||||
|
)
|
||||||
|
table_configs.append(table_config)
|
||||||
|
|
||||||
|
return cls(table_configs=table_configs)
|
||||||
|
|
||||||
|
def get_table_configs(self) -> List[TableConfig]:
|
||||||
|
"""Get all table configurations.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of TableConfig objects containing table configurations.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
>>> preprocessor = await Preprocessor.create("config.yaml")
|
||||||
|
>>> configs = preprocessor.get_table_configs()
|
||||||
|
>>> for config in configs:
|
||||||
|
... print(config.name, config.powerbi_table_name)
|
||||||
|
"""
|
||||||
|
return self.table_configs
|
||||||
|
|
||||||
|
async def preprocess(
|
||||||
|
self, df: pd.DataFrame, *, steps: List[Dict[str, Any]]
|
||||||
|
) -> pd.DataFrame:
|
||||||
|
"""Apply preprocessing steps to a DataFrame.
|
||||||
|
|
||||||
Executes each preprocessing step in sequence on the provided DataFrame.
|
Executes each preprocessing step in sequence on the provided DataFrame.
|
||||||
Each step is looked up in the REGISTRY and applied with its parameters.
|
Each step is looked up in the REGISTRY and applied with its parameters.
|
||||||
|
|
@ -180,18 +216,20 @@ class Preprocessor:
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
df: The input DataFrame to preprocess.
|
df: The input DataFrame to preprocess.
|
||||||
|
steps: List of preprocessing steps to apply.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
The preprocessed DataFrame after applying all configured steps.
|
The preprocessed DataFrame after applying all configured steps.
|
||||||
|
|
||||||
Example:
|
Example:
|
||||||
>>> preprocessor = await Preprocessor.create("config.yaml")
|
>>> preprocessor = await Preprocessor.create("config.yaml")
|
||||||
|
>>> table_config = preprocessor.get_table_configs()[0]
|
||||||
>>> df = pd.DataFrame({'A': [1, 2, None], 'B': ['x', 'y', 'z']})
|
>>> df = pd.DataFrame({'A': [1, 2, None], 'B': ['x', 'y', 'z']})
|
||||||
>>> result = await preprocessor.preprocess(df)
|
>>> result = await preprocessor.preprocess(df, steps=table_config.steps)
|
||||||
>>> print(preprocessor.last_report) # Check for any warnings
|
>>> print(preprocessor.last_report) # Check for any warnings
|
||||||
"""
|
"""
|
||||||
report: List[str] = []
|
report: List[str] = []
|
||||||
for step in self.steps:
|
for step in steps:
|
||||||
name, params = next(iter(step.items()))
|
name, params = next(iter(step.items()))
|
||||||
fn = REGISTRY.get(name)
|
fn = REGISTRY.get(name)
|
||||||
if not fn:
|
if not fn:
|
||||||
|
|
|
||||||
|
|
@ -12,33 +12,28 @@ from src.dataprocessor.domain.sqlite_datasaver import SQLiteDataSaver
|
||||||
class DataProcessorService:
|
class DataProcessorService:
|
||||||
"""Service class for data processing operations."""
|
"""Service class for data processing operations."""
|
||||||
|
|
||||||
power_bi_reader: PowerBIReader = None
|
|
||||||
preprocessor: Preprocessor = None
|
preprocessor: Preprocessor = None
|
||||||
data_saver: BaseDataSaver = None
|
data_saver: BaseDataSaver = None
|
||||||
|
access_token: str = None
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def create(cls) -> "DataProcessorService":
|
async def create(cls) -> "DataProcessorService":
|
||||||
"""Create a new instance of DataProcessorService."""
|
"""Create a new instance of DataProcessorService."""
|
||||||
instance = cls()
|
instance = cls()
|
||||||
instance.power_bi_reader = await cls._create_powerbi_reader()
|
instance.access_token = await cls._get_access_token()
|
||||||
instance.preprocessor = await cls._create_preprocessor()
|
instance.preprocessor = await cls._create_preprocessor()
|
||||||
instance.data_saver = await SQLiteDataSaver.create(settings.DB_PATH)
|
instance.data_saver = await SQLiteDataSaver.create(settings.DB_PATH)
|
||||||
return instance
|
return instance
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def _create_powerbi_reader() -> PowerBIReader:
|
async def _get_access_token() -> str:
|
||||||
"""Create and initialize the PowerBIReader."""
|
"""Get Power BI access token."""
|
||||||
access_token = await PowerBIReader._get_access_token_async(
|
access_token = await PowerBIReader._get_access_token_async(
|
||||||
tenant_id=settings.POWERBI_TENANT_ID,
|
tenant_id=settings.POWERBI_TENANT_ID,
|
||||||
client_id=settings.POWERBI_CLIENT_ID,
|
client_id=settings.POWERBI_CLIENT_ID,
|
||||||
client_secret=settings.POWERBI_CLIENT_SECRET,
|
client_secret=settings.POWERBI_CLIENT_SECRET,
|
||||||
)
|
)
|
||||||
power_bi_reader = await PowerBIReader.create(
|
return access_token
|
||||||
dataset_id=settings.POWERBI_DATASET_ID,
|
|
||||||
access_token=access_token,
|
|
||||||
table_name=settings.POWERBI_TABLE_NAME,
|
|
||||||
)
|
|
||||||
return power_bi_reader
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def _create_preprocessor() -> Preprocessor:
|
async def _create_preprocessor() -> Preprocessor:
|
||||||
|
|
@ -47,16 +42,33 @@ class DataProcessorService:
|
||||||
return preprocessor
|
return preprocessor
|
||||||
|
|
||||||
async def update_database(self):
|
async def update_database(self):
|
||||||
"""Placeholder method for updating the database."""
|
"""Update the database by processing all configured tables."""
|
||||||
# Step 1: Read data from Power BI
|
table_configs = self.preprocessor.get_table_configs()
|
||||||
df = await self.power_bi_reader.read_data()
|
|
||||||
if df.empty:
|
if not table_configs:
|
||||||
raise RuntimeError("No data read from Power BI.")
|
raise RuntimeError("No table configurations found in preprocessing config.")
|
||||||
# Step 2: Preprocess the data
|
|
||||||
df = await self.preprocessor.preprocess(df)
|
for table_config in table_configs:
|
||||||
if df.empty:
|
# Step 1: Create PowerBIReader for this table
|
||||||
raise RuntimeError("No data returned from preprocessing.")
|
power_bi_reader = await PowerBIReader.create(
|
||||||
# Step 3: Update the local SQLite database
|
dataset_id=settings.POWERBI_DATASET_ID,
|
||||||
await self.data_saver.save_table(
|
access_token=self.access_token,
|
||||||
df, self.power_bi_reader.table_name, overwrite=True
|
table_name=table_config.powerbi_table_name,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Step 2: Read data from Power BI
|
||||||
|
df = await power_bi_reader.read_data()
|
||||||
|
if df.empty:
|
||||||
|
raise RuntimeError(
|
||||||
|
f"No data read from Power BI for table '{table_config.name}'."
|
||||||
|
)
|
||||||
|
|
||||||
|
# Step 3: Preprocess the data using this table's steps
|
||||||
|
df = await self.preprocessor.preprocess(df, steps=table_config.steps)
|
||||||
|
if df.empty:
|
||||||
|
raise RuntimeError(
|
||||||
|
f"No data returned from preprocessing for table '{table_config.name}'."
|
||||||
|
)
|
||||||
|
|
||||||
|
# Step 4: Update the local SQLite database
|
||||||
|
await self.data_saver.save_table(df, table_config.name, overwrite=True)
|
||||||
|
|
|
||||||
|
|
@ -1,29 +1,32 @@
|
||||||
version: 1
|
version: 1
|
||||||
steps:
|
tables:
|
||||||
- keep:
|
- name: "inventory_table"
|
||||||
columns:
|
powerbi_table_name: "InventoryData"
|
||||||
[
|
steps:
|
||||||
"Artikelkürzel",
|
- keep:
|
||||||
"Artikelnummer",
|
columns:
|
||||||
"Artikelbezeichnung",
|
[
|
||||||
"Lieferant",
|
"Artikelkürzel",
|
||||||
"Ist-Bestand",
|
"Artikelnummer",
|
||||||
"Einheit",
|
"Artikelbezeichnung",
|
||||||
"EP in CHF",
|
"Lieferant",
|
||||||
]
|
"Ist-Bestand",
|
||||||
- fillna:
|
"Einheit",
|
||||||
column: "Lieferant"
|
"EP in CHF",
|
||||||
value: "Unbekannt"
|
]
|
||||||
- to_numeric:
|
- fillna:
|
||||||
column: "EP in CHF"
|
column: "Lieferant"
|
||||||
errors: "coerce"
|
value: "Unbekannt"
|
||||||
- dropna:
|
- to_numeric:
|
||||||
subset:
|
column: "EP in CHF"
|
||||||
[
|
errors: "coerce"
|
||||||
"Artikelkürzel",
|
- dropna:
|
||||||
"Artikelnummer",
|
subset:
|
||||||
"Artikelbezeichnung",
|
[
|
||||||
"Ist-Bestand",
|
"Artikelkürzel",
|
||||||
"Einheit",
|
"Artikelnummer",
|
||||||
"EP in CHF",
|
"Artikelbezeichnung",
|
||||||
]
|
"Ist-Bestand",
|
||||||
|
"Einheit",
|
||||||
|
"EP in CHF",
|
||||||
|
]
|
||||||
|
|
|
||||||
|
|
@ -59,11 +59,6 @@ class Settings(BaseSettings):
|
||||||
..., description="Power BI Dataset ID to read data from."
|
..., description="Power BI Dataset ID to read data from."
|
||||||
)
|
)
|
||||||
|
|
||||||
# Power BI Table Name.
|
|
||||||
POWERBI_TABLE_NAME: str = Field(
|
|
||||||
..., description="Power BI Table name to read data from."
|
|
||||||
)
|
|
||||||
|
|
||||||
# Power BI Tenant ID.
|
# Power BI Tenant ID.
|
||||||
POWERBI_TENANT_ID: str = Field(
|
POWERBI_TENANT_ID: str = Field(
|
||||||
..., description="Azure AD Tenant ID for Power BI authentication."
|
..., description="Azure AD Tenant ID for Power BI authentication."
|
||||||
|
|
|
||||||
|
|
@ -3,14 +3,29 @@
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from src.dataprocessor.domain.powerbi_reader import PowerBIReader
|
from src.dataprocessor.domain.powerbi_reader import PowerBIReader
|
||||||
|
from src.dataprocessor.domain.preprocessor import Preprocessor
|
||||||
from src.settings import settings
|
from src.settings import settings
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_read_data_prints_dataframe_info() -> None:
|
async def test_read_data_prints_dataframe_info() -> None:
|
||||||
"""Test PowerBIReader.read_data() and print DataFrame info for development."""
|
"""Test PowerBIReader.read_data() and print DataFrame info for development."""
|
||||||
|
# Load preprocessor config to get table configurations
|
||||||
|
print("\nLoading table configurations...")
|
||||||
|
preprocessor = await Preprocessor.create(settings.PP_CONFIG_PATH)
|
||||||
|
table_configs = preprocessor.get_table_configs()
|
||||||
|
|
||||||
|
if not table_configs:
|
||||||
|
pytest.skip("No table configurations found in preprocessing config.")
|
||||||
|
|
||||||
|
# Use the first table configuration for testing
|
||||||
|
table_config = table_configs[0]
|
||||||
|
print(
|
||||||
|
f"✓ Using table configuration: {table_config.name} -> {table_config.powerbi_table_name}"
|
||||||
|
)
|
||||||
|
|
||||||
# Get access token
|
# Get access token
|
||||||
print("\nGetting access token...")
|
print("Getting access token...")
|
||||||
access_token = await PowerBIReader._get_access_token_async(
|
access_token = await PowerBIReader._get_access_token_async(
|
||||||
tenant_id=settings.POWERBI_TENANT_ID,
|
tenant_id=settings.POWERBI_TENANT_ID,
|
||||||
client_id=settings.POWERBI_CLIENT_ID,
|
client_id=settings.POWERBI_CLIENT_ID,
|
||||||
|
|
@ -23,9 +38,9 @@ async def test_read_data_prints_dataframe_info() -> None:
|
||||||
reader = await PowerBIReader.create(
|
reader = await PowerBIReader.create(
|
||||||
dataset_id=settings.POWERBI_DATASET_ID,
|
dataset_id=settings.POWERBI_DATASET_ID,
|
||||||
access_token=access_token,
|
access_token=access_token,
|
||||||
table_name=settings.POWERBI_TABLE_NAME,
|
table_name=table_config.powerbi_table_name,
|
||||||
)
|
)
|
||||||
print(f"✓ Reader created for table: {settings.POWERBI_TABLE_NAME}")
|
print(f"✓ Reader created for table: {table_config.powerbi_table_name}")
|
||||||
|
|
||||||
# Call read_data() once
|
# Call read_data() once
|
||||||
print("Fetching data from Power BI...")
|
print("Fetching data from Power BI...")
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue