Merge pull request #8 from valueonag/feat/powerbi-measures
feat: support powerbi measures
This commit is contained in:
commit
e7dd3ea999
6 changed files with 146 additions and 3 deletions
|
|
@ -13,22 +13,69 @@ class PowerBIReader:
|
||||||
table_name: str
|
table_name: str
|
||||||
base_url: str = settings.POWERBI_BASE_URL
|
base_url: str = settings.POWERBI_BASE_URL
|
||||||
include_nulls: bool = True
|
include_nulls: bool = True
|
||||||
|
measures: list[str] = None
|
||||||
|
group_by_columns: list[str] = None
|
||||||
|
|
||||||
@classmethod
async def create(
    cls,
    *,
    dataset_id: str,
    access_token: str,
    table_name: str,
    measures: list[str] = None,
    group_by_columns: list[str] = None,
    **kwargs,
):
    """Build an instance of ``cls`` from keyword-only arguments.

    Args:
        dataset_id: Passed through to the constructor as ``dataset_id``.
        access_token: Passed through to the constructor as ``access_token``.
        table_name: Passed through to the constructor as ``table_name``.
        measures: Optional measure names. ``None`` (or any empty value) is
            normalized to an empty list.
        group_by_columns: Optional grouping column names. ``None`` (or any
            empty value) is normalized to an empty list.
        **kwargs: Any further keyword arguments, forwarded unchanged to the
            constructor.

    Returns:
        The object returned by ``cls(...)``.
    """
    return cls(
        dataset_id=dataset_id,
        access_token=access_token,
        table_name=table_name,
        # Copy instead of aliasing: the new instance must not share a mutable
        # list with the caller's configuration object.
        measures=list(measures or []),
        group_by_columns=list(group_by_columns or []),
        **kwargs,
    )
|
||||||
|
|
||||||
def _dax_query(self) -> str:
    """Generate the DAX query for the configured table, measures and grouping.

    The query shape depends on ``self.measures`` and ``self.group_by_columns``:

    1. No measures: ``EVALUATE 'TableName'``
       (``group_by_columns`` is ignored in this case).

    2. Measures only:
       ``EVALUATE ADDCOLUMNS('TableName', "Measure1", [Measure1], ...)``

    3. Measures + group_by_columns:
       ``EVALUATE SUMMARIZECOLUMNS('Table'[Col1], ..., "Measure1", [Measure1], ...)``

    Names are escaped for the DAX context they are placed in: ``'`` is doubled
    inside the quoted table name, ``]`` is doubled inside bracketed measure and
    column references, and ``"`` is doubled inside the string label that names
    each measure's output column.

    Returns:
        DAX query string.
    """
    # Escape single quotes in table names per DAX rules
    safe_table = self.table_name.replace("'", "''")

    # Case 1: No measures (None or empty) - simple table evaluation
    if not self.measures:
        return f"EVALUATE '{safe_table}'"

    # Both remaining cases use the same '"Label", [Measure]' pairs, so build
    # them once. The label is a DAX string literal (double the quotes); the
    # reference is a bracketed identifier (double the closing bracket).
    measure_clauses = ", ".join(
        '"{label}", [{ref}]'.format(
            label=measure.replace('"', '""'),
            ref=measure.replace("]", "]]"),
        )
        for measure in self.measures
    )

    # Case 2: Measures without grouping - use ADDCOLUMNS
    if not self.group_by_columns:
        return f"EVALUATE ADDCOLUMNS('{safe_table}', {measure_clauses})"

    # Case 3: Measures with grouping - use SUMMARIZECOLUMNS
    group_cols = ", ".join(
        "'{table}'[{col}]".format(table=safe_table, col=col.replace("]", "]]"))
        for col in self.group_by_columns
    )
    return f"EVALUATE SUMMARIZECOLUMNS({group_cols}, {measure_clauses})"
|
||||||
|
|
||||||
async def read_data(self) -> pd.DataFrame:
|
async def read_data(self) -> pd.DataFrame:
|
||||||
"""
|
"""
|
||||||
|
|
|
||||||
|
|
@ -147,6 +147,8 @@ class TableConfig:
|
||||||
|
|
||||||
name: str
|
name: str
|
||||||
powerbi_table_name: str
|
powerbi_table_name: str
|
||||||
|
measures: List[str] = field(default_factory=list)
|
||||||
|
group_by_columns: List[str] = field(default_factory=list)
|
||||||
steps: List[Dict[str, Any]] = field(default_factory=list)
|
steps: List[Dict[str, Any]] = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -184,6 +186,8 @@ class Preprocessor:
|
||||||
table_config = TableConfig(
|
table_config = TableConfig(
|
||||||
name=table_data.get("name", ""),
|
name=table_data.get("name", ""),
|
||||||
powerbi_table_name=table_data.get("powerbi_table_name", ""),
|
powerbi_table_name=table_data.get("powerbi_table_name", ""),
|
||||||
|
measures=table_data.get("measures", []),
|
||||||
|
group_by_columns=table_data.get("group_by_columns", []),
|
||||||
steps=table_data.get("steps", []),
|
steps=table_data.get("steps", []),
|
||||||
)
|
)
|
||||||
table_configs.append(table_config)
|
table_configs.append(table_config)
|
||||||
|
|
@ -253,6 +257,8 @@ class Preprocessor:
|
||||||
table_config = TableConfig(
|
table_config = TableConfig(
|
||||||
name=table_data.get("name", ""),
|
name=table_data.get("name", ""),
|
||||||
powerbi_table_name=table_data.get("powerbi_table_name", ""),
|
powerbi_table_name=table_data.get("powerbi_table_name", ""),
|
||||||
|
measures=table_data.get("measures", []),
|
||||||
|
group_by_columns=table_data.get("group_by_columns", []),
|
||||||
steps=table_data.get("steps", []),
|
steps=table_data.get("steps", []),
|
||||||
)
|
)
|
||||||
table_configs.append(table_config)
|
table_configs.append(table_config)
|
||||||
|
|
|
||||||
|
|
@ -50,6 +50,53 @@ async def update_db_with_config(
|
||||||
3. **Saves to local database**: The processed data is saved to the local SQLite
|
3. **Saves to local database**: The processed data is saved to the local SQLite
|
||||||
database with the specified table name.
|
database with the specified table name.
|
||||||
|
|
||||||
|
## Power BI Measures Support
|
||||||
|
|
||||||
|
In addition to retrieving physical/calculated columns from Power BI tables, you can
|
||||||
|
now retrieve Power BI measures using the optional `measures` and `group_by_columns`
|
||||||
|
fields in your table configuration.
|
||||||
|
|
||||||
|
### Retrieving Measures
|
||||||
|
|
||||||
|
Power BI measures are calculated values that live only in the model and are computed
|
||||||
|
at query time. To retrieve them alongside your table data, add a `measures` array
|
||||||
|
to your table configuration:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"name": "Einkaufspreis",
|
||||||
|
"powerbi_table_name": "Einkaufspreis",
|
||||||
|
"measures": ["EP in CHF", "Gesamtbetrag in CHF"],
|
||||||
|
"steps": [...]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
This uses the DAX ADDCOLUMNS pattern: `EVALUATE ADDCOLUMNS('TableName', "MeasureName", [MeasureName], ...)`
|
||||||
|
|
||||||
|
### Grouping Measures
|
||||||
|
|
||||||
|
If your measures need to be aggregated by specific columns, add the `group_by_columns`
|
||||||
|
field. This is useful when measures are defined with aggregation functions:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"name": "Einkaufspreis_Aggregated",
|
||||||
|
"powerbi_table_name": "Einkaufspreis",
|
||||||
|
"measures": ["EP in CHF", "Gesamtbetrag in CHF"],
|
||||||
|
"group_by_columns": ["m_Artikel"],
|
||||||
|
"steps": [...]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
This uses the DAX SUMMARIZECOLUMNS pattern: `EVALUATE SUMMARIZECOLUMNS('Table'[Column], "MeasureName", [MeasureName], ...)`
|
||||||
|
|
||||||
|
### Measure Name Formatting
|
||||||
|
|
||||||
|
- Measure names with spaces are automatically handled (e.g., "EP in CHF" becomes `[EP in CHF]` in DAX)
|
||||||
|
- If `measures` is empty or not provided, the standard table evaluation is used
|
||||||
|
- If `measures` is provided without `group_by_columns`, ADDCOLUMNS is used
|
||||||
|
- If both `measures` and `group_by_columns` are provided, SUMMARIZECOLUMNS is used
|
||||||
|
|
||||||
## Available Preprocessing Steps
|
## Available Preprocessing Steps
|
||||||
|
|
||||||
The following preprocessing steps are supported. Each step is specified as a
|
The following preprocessing steps are supported. Each step is specified as a
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,8 @@ class TableConfigSchema(BaseModel):
|
||||||
Attributes:
|
Attributes:
|
||||||
name: The name to use for the table in the local SQLite database
|
name: The name to use for the table in the local SQLite database
|
||||||
powerbi_table_name: The name of the source table in Power BI dataset
|
powerbi_table_name: The name of the source table in Power BI dataset
|
||||||
|
measures: Optional list of Power BI measures to retrieve
|
||||||
|
group_by_columns: Optional list of columns to group by when retrieving measures
|
||||||
steps: List of preprocessing steps to apply to the table data
|
steps: List of preprocessing steps to apply to the table data
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
@ -29,6 +31,16 @@ class TableConfigSchema(BaseModel):
|
||||||
description="Name of the table in the Power BI dataset",
|
description="Name of the table in the Power BI dataset",
|
||||||
example="data_full",
|
example="data_full",
|
||||||
)
|
)
|
||||||
|
measures: List[str] = Field(
|
||||||
|
default_factory=list,
|
||||||
|
description="List of Power BI measure names to retrieve",
|
||||||
|
example=["EP in CHF", "Gesamtbetrag in CHF"],
|
||||||
|
)
|
||||||
|
group_by_columns: List[str] = Field(
|
||||||
|
default_factory=list,
|
||||||
|
description="Columns to group by when retrieving measures (triggers SUMMARIZECOLUMNS)",
|
||||||
|
example=["m_Artikel"],
|
||||||
|
)
|
||||||
steps: List[Dict[str, Any]] = Field(
|
steps: List[Dict[str, Any]] = Field(
|
||||||
default_factory=list,
|
default_factory=list,
|
||||||
description="List of preprocessing steps to apply",
|
description="List of preprocessing steps to apply",
|
||||||
|
|
|
||||||
|
|
@ -54,6 +54,8 @@ class DataProcessorService:
|
||||||
dataset_id=settings.POWERBI_DATASET_ID,
|
dataset_id=settings.POWERBI_DATASET_ID,
|
||||||
access_token=self.access_token,
|
access_token=self.access_token,
|
||||||
table_name=table_config.powerbi_table_name,
|
table_name=table_config.powerbi_table_name,
|
||||||
|
measures=table_config.measures,
|
||||||
|
group_by_columns=table_config.group_by_columns,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Step 2: Read data from Power BI
|
# Step 2: Read data from Power BI
|
||||||
|
|
@ -147,6 +149,8 @@ class DataProcessorService:
|
||||||
dataset_id=settings.POWERBI_DATASET_ID,
|
dataset_id=settings.POWERBI_DATASET_ID,
|
||||||
access_token=self.access_token,
|
access_token=self.access_token,
|
||||||
table_name=table_config.powerbi_table_name,
|
table_name=table_config.powerbi_table_name,
|
||||||
|
measures=table_config.measures,
|
||||||
|
group_by_columns=table_config.group_by_columns,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Step 2: Read data from Power BI
|
# Step 2: Read data from Power BI
|
||||||
|
|
|
||||||
|
|
@ -30,3 +30,30 @@ tables:
|
||||||
"Einheit",
|
"Einheit",
|
||||||
"EP in CHF",
|
"EP in CHF",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# Example: Retrieving Power BI measures with ADDCOLUMNS
|
||||||
|
# Uncomment to retrieve measures alongside all table columns
|
||||||
|
# - name: "Einkaufspreis_With_Measures"
|
||||||
|
# powerbi_table_name: "Einkaufspreis"
|
||||||
|
# measures:
|
||||||
|
# - "EP in CHF"
|
||||||
|
# - "Gesamtbetrag in CHF"
|
||||||
|
# steps:
|
||||||
|
# - to_numeric:
|
||||||
|
# column: "EP_CHF"
|
||||||
|
# errors: "coerce"
|
||||||
|
# - dropna:
|
||||||
|
# subset: ["EP_CHF"]
|
||||||
|
|
||||||
|
# Example: Retrieving aggregated measures with SUMMARIZECOLUMNS
|
||||||
|
# Uncomment to retrieve measures grouped by specific columns
|
||||||
|
# - name: "Einkaufspreis_Aggregated"
|
||||||
|
# powerbi_table_name: "Einkaufspreis"
|
||||||
|
# measures:
|
||||||
|
# - "EP in CHF"
|
||||||
|
# - "Gesamtbetrag in CHF"
|
||||||
|
# group_by_columns:
|
||||||
|
# - "m_Artikel"
|
||||||
|
# steps:
|
||||||
|
# - dropna:
|
||||||
|
# subset: ["m_Artikel"]
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue