"""Power BI Query Tool for LangGraph.

This tool provides DAX query capabilities for Power BI datasets
via the Power BI REST API. Only read-only queries are allowed.
"""

import functools
import logging
from typing import Annotated

import anyio
import httpx
from langchain_core.tools import tool
from msal import ConfidentialClientApplication, SerializableTokenCache

from modules.shared.configuration import APP_CONFIG

logger = logging.getLogger(__name__)


# Configuration constants - encapsulated in this file
POWERBI_DATASET_ID = APP_CONFIG.get("VALUEON_POWERBI_DATASET_ID", "")
POWERBI_CLIENT_ID = APP_CONFIG.get("VALUEON_POWERBI_CLIENT_ID", "")
POWERBI_CLIENT_SECRET = APP_CONFIG.get("VALUEON_POWERBI_CLIENT_SECRET", "")
POWERBI_TENANT_ID = APP_CONFIG.get("VALUEON_POWERBI_TENANT_ID", "")
POWERBI_BASE_URL = "https://api.powerbi.com/v1.0/myorg"
POWERBI_AUTHORITY_BASE = "https://login.microsoftonline.com"
POWERBI_SCOPE = ["https://analysis.windows.net/powerbi/api/.default"]

# Limit results to prevent excessive context usage
MAX_ROWS_LIMIT = 100


def _validate_environment() -> tuple[bool, str]:
    """Validate that all required configuration values are set.

    Returns:
        A tuple of (is_valid, error_message). ``error_message`` is empty
        when the configuration is complete.
    """
    # Report the REAL APP_CONFIG keys (VALUEON_POWERBI_*) so that the error
    # is actionable: the previous message named keys that are never read.
    required = {
        "VALUEON_POWERBI_DATASET_ID": POWERBI_DATASET_ID,
        "VALUEON_POWERBI_CLIENT_ID": POWERBI_CLIENT_ID,
        "VALUEON_POWERBI_CLIENT_SECRET": POWERBI_CLIENT_SECRET,
        "VALUEON_POWERBI_TENANT_ID": POWERBI_TENANT_ID,
    }
    missing = [key for key, value in required.items() if not value]

    if missing:
        return False, f"Missing required environment variables: {', '.join(missing)}"

    return True, ""


def _validate_dax_query(*, dax_query: str) -> tuple[bool, str]:
    """Validate that the query is a valid DAX query.

    Args:
        dax_query: The DAX query to validate

    Returns:
        A tuple of (is_valid, error_message)
    """
    # Remove leading/trailing whitespace
    normalized_query = dax_query.strip()

    if not normalized_query:
        return False, "Query cannot be empty"

    # DAX queries typically start with EVALUATE, DEFINE, or are table expressions.
    # DAX is read-only by nature, so we deliberately stay lenient and only
    # perform structural sanity checks.

    # Check for minimum length
    if len(normalized_query) < 5:
        return False, "Query is too short to be valid"

    return True, ""


def _get_access_token_sync(
    *,
    tenant_id: str,
    client_id: str,
    client_secret: str,
    authority_base: str = POWERBI_AUTHORITY_BASE,
    cache: SerializableTokenCache | None = None,
) -> str:
    """Get Power BI access token using MSAL (synchronous).

    Args:
        tenant_id: Azure AD tenant ID
        client_id: Application client ID
        client_secret: Application client secret
        authority_base: Azure AD authority base URL
        cache: Optional token cache for reuse

    Returns:
        Access token string

    Raises:
        RuntimeError: If token acquisition fails
    """
    authority = f"{authority_base}/{tenant_id}"

    app = ConfidentialClientApplication(
        client_id=client_id,
        authority=authority,
        client_credential=client_secret,
        token_cache=cache,
    )

    # Try cache first; fall back to client credentials
    result = app.acquire_token_silent(
        POWERBI_SCOPE, account=None
    ) or app.acquire_token_for_client(scopes=POWERBI_SCOPE)

    if "access_token" not in result:
        raise RuntimeError(
            f"MSAL token error: {result.get('error')} - {result.get('error_description')}"
        )

    return result["access_token"]


async def _get_access_token_async(
    *,
    tenant_id: str,
    client_id: str,
    client_secret: str,
    **kwargs,
) -> str:
    """Get Power BI access token using MSAL (asynchronous).

    Args:
        tenant_id: Azure AD tenant ID
        client_id: Application client ID
        client_secret: Application client secret
        **kwargs: Additional arguments for _get_access_token_sync

    Returns:
        Access token string
    """
    # Create a partial function with arguments pre-filled
    func = functools.partial(
        _get_access_token_sync,
        tenant_id=tenant_id,
        client_id=client_id,
        client_secret=client_secret,
        **kwargs,
    )
    # Offload the blocking MSAL HTTP call to a worker thread so the
    # event loop is not stalled while the token request is in flight.
    return await anyio.to_thread.run_sync(func)


async def _execute_dax_query(
    *, dax_query: str, dataset_id: str, access_token: str
) -> dict:
    """Execute a DAX query against Power BI dataset.

    Args:
        dax_query: The DAX query to execute
        dataset_id: Power BI dataset ID
        access_token: Access token for authentication

    Returns:
        Dictionary containing query results with ``columns`` and ``rows`` keys

    Raises:
        RuntimeError: If query execution fails or the response shape is unexpected
    """
    url = f"{POWERBI_BASE_URL}/datasets/{dataset_id}/executeQueries"

    body = {
        "queries": [{"query": dax_query}],
        "serializerSettings": {"includeNulls": True},
    }

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }

    async with httpx.AsyncClient(timeout=60.0) as client:
        resp = await client.post(url, headers=headers, json=body)

    if resp.status_code != 200:
        raise RuntimeError(
            f"Power BI executeQueries failed: {resp.status_code} - {resp.text}"
        )

    payload = resp.json()

    try:
        rows = payload["results"][0]["tables"][0]["rows"]
    except (KeyError, IndexError) as e:
        raise RuntimeError("Unexpected executeQueries response structure") from e

    # The API returns rows as dicts; derive column names from the first row
    # (an empty result set carries no column metadata here).
    columns = list(rows[0].keys()) if rows else []

    return {"columns": columns, "rows": rows}


def _strip_table_qualifier(*, column_name: str) -> str:
    """Strip table qualifier from column name.

    Power BI often returns columns as 'Table[Column]'. This strips to 'Column'.

    Args:
        column_name: The column name to process

    Returns:
        Processed column name
    """
    if "[" in column_name and column_name.endswith("]"):
        return column_name.split("[", 1)[1][:-1]
    return column_name


def _format_results(*, columns: list[str], rows: list[dict], max_rows: int) -> str:
    """Format query results into a readable string.

    Args:
        columns: List of column names
        rows: List of row data (as dictionaries)
        max_rows: Maximum number of rows to display

    Returns:
        Formatted string representation of the results
    """
    total_rows = len(rows)

    if total_rows == 0:
        return "Query executed successfully but returned no results."

    # Strip table qualifiers for display; keep originals for dict lookups.
    clean_columns = [_strip_table_qualifier(column_name=col) for col in columns]

    # Limit rows to max_rows
    display_rows = rows[:max_rows]
    truncated = total_rows > max_rows

    # Calculate column widths over header and all displayed values
    col_widths = [len(str(col)) for col in clean_columns]
    for row in display_rows:
        for i, col in enumerate(columns):
            value = row.get(col, "")
            col_widths[i] = max(col_widths[i], len(str(value)))

    # Build header
    header_parts = []
    for col, width in zip(clean_columns, col_widths):
        header_parts.append(str(col).ljust(width))
    header = " | ".join(header_parts)
    separator = "-" * len(header)

    # Build rows
    row_lines = []
    for row in display_rows:
        row_parts = []
        for col, width in zip(columns, col_widths):
            value = row.get(col, "")
            row_parts.append(str(value).ljust(width))
        row_lines.append(" | ".join(row_parts))

    # Combine all parts
    result_parts = [
        f"Query returned {total_rows} row(s):",
    ]

    if truncated:
        result_parts.append(
            f"(Results limited to {max_rows} rows for context efficiency)\n"
        )
    else:
        result_parts.append("")

    result_parts.extend([header, separator, "\n".join(row_lines)])

    return "\n".join(result_parts)


@tool
async def query_powerbi_data(
    dax_query: Annotated[str, "The DAX query to execute against the Power BI dataset"],
) -> str:
    """Execute a DAX query against the Power BI dataset to access warehouse inventory data.

    This tool provides access to a Power BI table called 'data_full' which contains
    articles available in the warehouse of the user. Use DAX (Data Analysis Expressions)
    queries to retrieve and analyze this inventory data.

    Available table:
    - 'data_full': Contains warehouse inventory articles and their details

    Common query patterns:
    - View all data: EVALUATE 'data_full'
    - With filter: EVALUATE FILTER('data_full', [Column] = "Value")
    - Top N rows: EVALUATE TOPN(10, 'data_full', [Column], DESC)
    - Calculated: EVALUATE SUMMARIZE('data_full', [Column1], "Total", SUM([Column2]))

    Results are limited to 100 rows maximum for efficiency.

    Args:
        dax_query: The DAX query to execute (e.g., "EVALUATE 'data_full'")

    Returns:
        A formatted string containing the query results with columns and rows
    """
    try:
        # Validate environment configuration
        is_valid_env, error_msg = _validate_environment()
        if not is_valid_env:
            logger.error("Environment validation failed: %s", error_msg)
            return f"Configuration Error: {error_msg}"

        # Validate the query
        is_valid_query, error_msg = _validate_dax_query(dax_query=dax_query)
        if not is_valid_query:
            logger.warning("Invalid query attempt: %s...", dax_query[:100])
            return f"Query Validation Error: {error_msg}"

        logger.info("Executing Power BI query: %s...", dax_query[:100])

        # Get access token
        access_token = await _get_access_token_async(
            tenant_id=POWERBI_TENANT_ID,
            client_id=POWERBI_CLIENT_ID,
            client_secret=POWERBI_CLIENT_SECRET,
        )

        # Execute the query
        result = await _execute_dax_query(
            dax_query=dax_query,
            dataset_id=POWERBI_DATASET_ID,
            access_token=access_token,
        )

        # Format and return results
        formatted_output = _format_results(
            columns=result["columns"], rows=result["rows"], max_rows=MAX_ROWS_LIMIT
        )

        logger.info(
            "Query completed successfully, returned %d row(s)", len(result["rows"])
        )
        return formatted_output

    except RuntimeError as e:
        # Expected failure modes (auth, HTTP, response shape) raise RuntimeError.
        logger.exception("Runtime error in query_powerbi_data tool")
        return f"Error executing query: {e}"
    except Exception as e:
        # Tool boundary: never propagate — return the error as the tool output.
        logger.exception("Unexpected error in query_powerbi_data tool")
        return f"Unexpected error: {e}"