Merge pull request #53 from valueonag/feat/power-bi-tool

feat: add valueon powerbi tool
This commit is contained in:
ValueOn AG 2025-10-15 12:35:11 +02:00 committed by GitHub
commit e8e3b0c0db
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -0,0 +1,362 @@
"""Power BI Query Tool for LangGraph.
This tool provides DAX query capabilities for Power BI datasets
via the Power BI REST API. Only read-only queries are allowed.
"""
import logging
import asyncio
import os
import re
import functools
from typing import Annotated
import anyio
import httpx
from langchain_core.tools import tool
from msal import ConfidentialClientApplication, SerializableTokenCache
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
# Configuration constants - encapsulated in this file
POWERBI_DATASET_ID = APP_CONFIG.get("VALUEON_POWERBI_DATASET_ID", "")
POWERBI_CLIENT_ID = APP_CONFIG.get("VALUEON_POWERBI_CLIENT_ID", "")
POWERBI_CLIENT_SECRET = APP_CONFIG.get("VALUEON_POWERBI_CLIENT_SECRET", "")
POWERBI_TENANT_ID = APP_CONFIG.get("VALUEON_POWERBI_TENANT_ID", "")
POWERBI_BASE_URL = "https://api.powerbi.com/v1.0/myorg"
POWERBI_AUTHORITY_BASE = "https://login.microsoftonline.com"
POWERBI_SCOPE = ["https://analysis.windows.net/powerbi/api/.default"]
# Limit results to prevent excessive context usage
MAX_ROWS_LIMIT = 100
def _validate_environment() -> tuple[bool, str]:
"""Validate that all required environment variables are set.
Returns:
A tuple of (is_valid, error_message)
"""
missing = []
if not POWERBI_DATASET_ID:
missing.append("POWERBI_DATASET_ID")
if not POWERBI_CLIENT_ID:
missing.append("POWERBI_CLIENT_ID")
if not POWERBI_CLIENT_SECRET:
missing.append("POWERBI_CLIENT_SECRET")
if not POWERBI_TENANT_ID:
missing.append("POWERBI_TENANT_ID")
if missing:
return False, f"Missing required environment variables: {', '.join(missing)}"
return True, ""
def _validate_dax_query(*, dax_query: str) -> tuple[bool, str]:
"""Validate that the query is a valid DAX query.
Args:
dax_query: The DAX query to validate
Returns:
A tuple of (is_valid, error_message)
"""
# Remove leading/trailing whitespace
normalized_query = dax_query.strip()
if not normalized_query:
return False, "Query cannot be empty"
# DAX queries typically start with EVALUATE, DEFINE, or are table expressions
# We'll be lenient and just check it's not trying to do something dangerous
# DAX is read-only by nature, but we validate structure
# Check for minimum length
if len(normalized_query) < 5:
return False, "Query is too short to be valid"
return True, ""
def _get_access_token_sync(
*,
tenant_id: str,
client_id: str,
client_secret: str,
authority_base: str = POWERBI_AUTHORITY_BASE,
cache: SerializableTokenCache | None = None,
) -> str:
"""Get Power BI access token using MSAL (synchronous).
Args:
tenant_id: Azure AD tenant ID
client_id: Application client ID
client_secret: Application client secret
authority_base: Azure AD authority base URL
cache: Optional token cache for reuse
Returns:
Access token string
Raises:
RuntimeError: If token acquisition fails
"""
authority = f"{authority_base}/{tenant_id}"
app = ConfidentialClientApplication(
client_id=client_id,
authority=authority,
client_credential=client_secret,
token_cache=cache,
)
# Try cache first; fall back to client credentials
result = app.acquire_token_silent(
POWERBI_SCOPE, account=None
) or app.acquire_token_for_client(scopes=POWERBI_SCOPE)
if "access_token" not in result:
raise RuntimeError(
f"MSAL token error: {result.get('error')} - {result.get('error_description')}"
)
return result["access_token"]
async def _get_access_token_async(
*,
tenant_id: str,
client_id: str,
client_secret: str,
**kwargs,
) -> str:
"""Get Power BI access token using MSAL (asynchronous).
Args:
tenant_id: Azure AD tenant ID
client_id: Application client ID
client_secret: Application client secret
**kwargs: Additional arguments for _get_access_token_sync
Returns:
Access token string
"""
# Create a partial function with arguments pre-filled
func = functools.partial(
_get_access_token_sync,
tenant_id=tenant_id,
client_id=client_id,
client_secret=client_secret,
**kwargs,
)
# Offload the blocking MSAL HTTP call to a worker thread
return await anyio.to_thread.run_sync(func)
async def _execute_dax_query(
*, dax_query: str, dataset_id: str, access_token: str
) -> dict:
"""Execute a DAX query against Power BI dataset.
Args:
dax_query: The DAX query to execute
dataset_id: Power BI dataset ID
access_token: Access token for authentication
Returns:
Dictionary containing query results
Raises:
RuntimeError: If query execution fails
"""
url = f"{POWERBI_BASE_URL}/datasets/{dataset_id}/executeQueries"
body = {
"queries": [{"query": dax_query}],
"serializerSettings": {"includeNulls": True},
}
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
}
async with httpx.AsyncClient(timeout=60.0) as client:
resp = await client.post(url, headers=headers, json=body)
if resp.status_code != 200:
raise RuntimeError(
f"Power BI executeQueries failed: {resp.status_code} - {resp.text}"
)
payload = resp.json()
try:
rows = payload["results"][0]["tables"][0]["rows"]
except (KeyError, IndexError) as e:
raise RuntimeError("Unexpected executeQueries response structure") from e
# Extract column names from the first row if available
if rows:
columns = list(rows[0].keys())
else:
columns = []
return {"columns": columns, "rows": rows}
def _strip_table_qualifier(*, column_name: str) -> str:
"""Strip table qualifier from column name.
Power BI often returns columns as 'Table[Column]'. This strips to 'Column'.
Args:
column_name: The column name to process
Returns:
Processed column name
"""
if "[" in column_name and column_name.endswith("]"):
return column_name.split("[", 1)[1][:-1]
return column_name
def _format_results(*, columns: list[str], rows: list[dict], max_rows: int) -> str:
"""Format query results into a readable string.
Args:
columns: List of column names
rows: List of row data (as dictionaries)
max_rows: Maximum number of rows to display
Returns:
Formatted string representation of the results
"""
total_rows = len(rows)
if total_rows == 0:
return "Query executed successfully but returned no results."
# Strip table qualifiers from column names
clean_columns = [_strip_table_qualifier(column_name=col) for col in columns]
# Limit rows to max_rows
display_rows = rows[:max_rows]
truncated = total_rows > max_rows
# Calculate column widths
col_widths = [len(str(col)) for col in clean_columns]
for row in display_rows:
for i, col in enumerate(columns):
value = row.get(col, "")
col_widths[i] = max(col_widths[i], len(str(value)))
# Build header
header_parts = []
for col, width in zip(clean_columns, col_widths):
header_parts.append(str(col).ljust(width))
header = " | ".join(header_parts)
separator = "-" * len(header)
# Build rows
row_lines = []
for row in display_rows:
row_parts = []
for col, width in zip(columns, col_widths):
value = row.get(col, "")
row_parts.append(str(value).ljust(width))
row_lines.append(" | ".join(row_parts))
# Combine all parts
result_parts = [
f"Query returned {total_rows} row(s):",
]
if truncated:
result_parts.append(
f"(Results limited to {max_rows} rows for context efficiency)\n"
)
else:
result_parts.append("")
result_parts.extend([header, separator, "\n".join(row_lines)])
return "\n".join(result_parts)
@tool
async def query_powerbi_data(
dax_query: Annotated[str, "The DAX query to execute against the Power BI dataset"],
) -> str:
"""Execute a DAX query against the Power BI dataset to access warehouse inventory data.
This tool provides access to a Power BI table called 'data_full' which contains
articles available in the warehouse of the user. Use DAX (Data Analysis Expressions)
queries to retrieve and analyze this inventory data.
Available table:
- 'data_full': Contains warehouse inventory articles and their details
Common query patterns:
- View all data: EVALUATE 'data_full'
- With filter: EVALUATE FILTER('data_full', [Column] = "Value")
- Top N rows: EVALUATE TOPN(10, 'data_full', [Column], DESC)
- Calculated: EVALUATE SUMMARIZE('data_full', [Column1], "Total", SUM([Column2]))
Results are limited to 100 rows maximum for efficiency.
Args:
dax_query: The DAX query to execute (e.g., "EVALUATE 'data_full'")
Returns:
A formatted string containing the query results with columns and rows
"""
try:
# Validate environment configuration
is_valid_env, error_msg = _validate_environment()
if not is_valid_env:
logger.error(f"Environment validation failed: {error_msg}")
return f"Configuration Error: {error_msg}"
# Validate the query
is_valid_query, error_msg = _validate_dax_query(dax_query=dax_query)
if not is_valid_query:
logger.warning(f"Invalid query attempt: {dax_query[:100]}...")
return f"Query Validation Error: {error_msg}"
logger.info(f"Executing Power BI query: {dax_query[:100]}...")
# Get access token
access_token = await _get_access_token_async(
tenant_id=POWERBI_TENANT_ID,
client_id=POWERBI_CLIENT_ID,
client_secret=POWERBI_CLIENT_SECRET,
)
# Execute the query
result = await _execute_dax_query(
dax_query=dax_query,
dataset_id=POWERBI_DATASET_ID,
access_token=access_token,
)
# Format and return results
formatted_output = _format_results(
columns=result["columns"], rows=result["rows"], max_rows=MAX_ROWS_LIMIT
)
logger.info(
f"Query completed successfully, returned {len(result['rows'])} row(s)"
)
return formatted_output
except RuntimeError as e:
logger.error(f"Runtime error in query_powerbi_data tool: {str(e)}")
return f"Error executing query: {str(e)}"
except Exception as e:
logger.error(f"Unexpected error in query_powerbi_data tool: {str(e)}")
return f"Unexpected error: {str(e)}"