gateway/modules/features/chatAlthaus/mainChatAlthaus.py
2025-11-10 16:51:28 +01:00

200 lines
7.1 KiB
Python

"""
Chat Althaus Data Scheduler
This module handles scheduled data updates for Althaus preprocessing service.
Triggers daily at 01:00 UTC to update the database with configuration.
"""
import logging
import aiohttp
from modules.services import getInterface as getServices
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
# Configuration
ALTHAUS_ENDPOINT = "https://poweron-althaus-preprocess-prod-e3fegaatc7faency.switzerlandnorth-01.azurewebsites.net/api/v1/dataprocessor/update-db-with-config"
# JSON configuration for Althaus data processing
ALTHAUS_CONFIG_JSON = {
"tables": [
{
"name": "Artikel",
"powerbi_table_name": "Artikel",
"steps": [
{
"keep": {
"columns": [
"I_ID",
"Artikelbeschrieb",
"Artikelbezeichnung",
"Artikelgruppe",
"Artikelkategorie",
"Artikelkürzel",
"Artikelnummer",
"Einheit",
"Gesperrt",
"Keywords",
"Lieferant",
"Warengruppe"
]
}
},
{
"fillna": {
"column": "Lieferant",
"value": "Unbekannt"
}
}
]
},
{
"name": "Einkaufspreis",
"powerbi_table_name": "Einkaufspreis",
"steps": [
{
"to_numeric": {
"column": "EP_CHF",
"errors": "coerce"
}
},
{
"dropna": {
"subset": ["EP_CHF"]
}
}
]
}
]
}
# Global manager instance
_dataScheduler = None
class ManagerChatAlthaus:
"""Manages scheduled data updates for Althaus preprocessing service."""
def __init__(self, eventUser=None):
self.eventUser = eventUser
self.services = None
self.appEnvType = "dev"
try:
if not eventUser:
logger.error("Event user not found - Althaus scheduler requires event user")
else:
self.services = getServices(eventUser, None)
self.appEnvType = self.services.utils.configGet("APP_ENV_TYPE", "dev")
logger.info(f"ChatAlthaus manager initialized for env: {self.appEnvType}")
except Exception as e:
logger.error(f"Initialization error in ManagerChatAlthaus.__init__: {e}")
async def updateDatabaseWithConfig(self) -> bool:
"""Update the Althaus database with configuration."""
try:
logger.info("Starting Althaus database update with config")
# Get authorization secret from config
authSecret = APP_CONFIG.get("PREPROCESS_ALTHAUS_CHAT_SECRET")
if not authSecret:
logger.error("PREPROCESS_ALTHAUS_CHAT_SECRET not found in config")
return False
# Prepare headers with authorization
headers = {
"X-PP-API-Key": authSecret,
"Content-Type": "application/json"
}
# Make POST request
timeout = aiohttp.ClientTimeout(total=60)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(
ALTHAUS_ENDPOINT,
headers=headers,
json=ALTHAUS_CONFIG_JSON
) as response:
if response.status in [200, 201]:
responseText = await response.text()
logger.info(f"Althaus database update successful: {response.status}")
logger.debug(f"Response: {responseText}")
return True
else:
errorText = await response.text()
logger.error(f"Althaus database update failed: {response.status} - {errorText}")
return False
except Exception as e:
logger.error(f"Error during Althaus database update: {str(e)}")
return False
async def performDataUpdate(eventUser) -> bool:
"""Perform Althaus data update.
This function is called by the scheduler and can be used independently.
Args:
eventUser: Event user to use for the update
Returns:
bool: True if update was successful, False otherwise
"""
try:
logger.info("Starting Althaus data update...")
if not eventUser:
logger.error("Event user not provided - cannot perform data update")
return False
manager = ManagerChatAlthaus(eventUser)
success = await manager.updateDatabaseWithConfig()
if success:
logger.info("Althaus data update completed successfully")
else:
logger.error("Althaus data update failed")
return success
except Exception as e:
logger.error(f"Error in performing Althaus data update: {str(e)}")
return False
def startDataScheduler(eventUser):
"""Initialize the global data scheduler with the eventUser."""
global _dataScheduler
if _dataScheduler is None:
_dataScheduler = ManagerChatAlthaus(eventUser)
logger.info("Global Althaus data scheduler initialized with eventUser")
try:
# Register scheduled job - daily at 01:00 UTC
if _dataScheduler.services:
_dataScheduler.services.utils.eventRegisterCron(
job_id="chatAlthaus.updateData",
func=scheduledDataUpdate,
cron_kwargs={"hour": "1", "minute": "0"},
replace_existing=True,
coalesce=True,
max_instances=1,
misfire_grace_time=3600, # 1 hour grace time for daily jobs
)
logger.info("Registered Althaus data scheduler (daily at 01:00 UTC)")
else:
logger.error("Services not available - cannot register scheduler")
except Exception as e:
logger.error(f"Failed to register scheduler for Althaus data update: {str(e)}")
return _dataScheduler
async def scheduledDataUpdate():
"""Scheduled data update function that uses the global scheduler."""
try:
global _dataScheduler
if _dataScheduler and _dataScheduler.eventUser:
return await performDataUpdate(_dataScheduler.eventUser)
else:
logger.error("Data scheduler not properly initialized - no eventUser")
return False
except Exception as e:
logger.error(f"Error in scheduled data update: {str(e)}")
return False