service-preprocessing/tests/dataprocessor/domain/test_powerbi_reader.py
2025-10-13 10:27:52 +02:00

50 lines
1.6 KiB
Python

"""Simple development test for PowerBIReader.read_data()"""
import pytest
from src.dataprocessor.domain.powerbi_reader import PowerBIReader
from src.settings import settings
@pytest.mark.asyncio
async def test_read_data_prints_dataframe_info() -> None:
    """Run PowerBIReader.read_data() once and dump the result for inspection.

    The test obtains an access token through
    ``PowerBIReader._get_access_token_async`` using the ``POWERBI_*`` values
    from ``settings``, builds a reader with ``PowerBIReader.create``, awaits
    ``read_data()`` a single time, and prints the shape, column names, dtypes
    and first five rows of the returned DataFrame. Progress and diagnostics
    go to stdout (pytest captures stdout unless invoked with ``-s``).

    Only two minimal checks are asserted: the result is not ``None`` and it
    has at least one column.
    """
    rule = "=" * 80  # horizontal separator reused by every banner below

    # Step 1: authenticate. Credentials are read from settings only after the
    # progress message has been emitted.
    print("\nGetting access token...")
    credentials = {
        "tenant_id": settings.POWERBI_TENANT_ID,
        "client_id": settings.POWERBI_CLIENT_ID,
        "client_secret": settings.POWERBI_CLIENT_SECRET,
    }
    token = await PowerBIReader._get_access_token_async(**credentials)
    print("✓ Access token acquired")

    # Step 2: build the reader for the configured dataset/table.
    print("Creating PowerBIReader instance...")
    powerbi_reader = await PowerBIReader.create(
        dataset_id=settings.POWERBI_DATASET_ID,
        access_token=token,
        table_name=settings.POWERBI_TABLE_NAME,
    )
    print(f"✓ Reader created for table: {settings.POWERBI_TABLE_NAME}")

    # Step 3: a single read_data() call is all this test exercises.
    print("Fetching data from Power BI...")
    frame = await powerbi_reader.read_data()
    print("✓ Data fetched successfully")

    # Step 4: structural summary of the DataFrame.
    print(rule, "DATAFRAME INFO", rule, sep="\n")
    print(f"\nShape: {frame.shape}")
    print(f"Columns: {frame.columns.tolist()}")
    print(f"\nData Types:\n{frame.dtypes}")

    # Step 5: preview of the first rows.
    print(f"\n{rule}", "DATAFRAME HEAD (5 rows)", rule, sep="\n")
    print(frame.head(5))
    print("\n")

    # Minimal sanity checks so the run counts as a real test.
    assert frame is not None
    assert len(frame.columns) > 0