gateway/modules/shared/eventManagement.py

145 lines
4.7 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import asyncio
import logging
from typing import Callable, Optional, Dict, Any
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from zoneinfo import ZoneInfo
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class EventManagement:
    """
    Generic event scheduler wrapper around APScheduler's AsyncIOScheduler.

    Features:
    - start/stop lifecycle
    - register timed events with either cron or interval style
    - remove events by id

    The underlying scheduler is built lazily on first access to ``scheduler``
    so that an event loop supplied through ``set_event_loop`` (or detected in
    ``start``) can be passed to its constructor.
    """

    def __init__(self, timezone: str = "Europe/Zurich"):
        """
        Store the timezone; no scheduler is created yet.

        Args:
            timezone: Timezone key handed to ``zoneinfo.ZoneInfo``. It is used
                for the scheduler and for every trigger registered here.
        """
        self._timezone = ZoneInfo(timezone)
        self._scheduler: Optional[AsyncIOScheduler] = None
        self._event_loop: Optional[asyncio.AbstractEventLoop] = None

    def set_event_loop(self, loop: asyncio.AbstractEventLoop) -> None:
        """
        Set the event loop for the scheduler (call from lifespan before start).

        The loop is only passed to the scheduler when it is constructed, so
        calling this after the scheduler already exists cannot change that
        scheduler; a warning is logged in that case.
        """
        if self._scheduler is not None:
            logger.warning(
                "EventManagement: set_event_loop() called after the scheduler "
                "was created; the loop is not applied to the existing scheduler"
            )
        self._event_loop = loop

    @property
    def scheduler(self) -> "AsyncIOScheduler":
        """Return the underlying scheduler, creating it on first access."""
        if self._scheduler is None:
            options: Dict[str, Any] = {"timezone": self._timezone}
            # Only pass an explicit loop when one is known.
            if self._event_loop is not None:
                options["event_loop"] = self._event_loop
            self._scheduler = AsyncIOScheduler(**options)
        return self._scheduler

    def start(self) -> None:
        """
        Start the scheduler if it is not already running.

        When no loop was set explicitly, the currently running asyncio loop
        (if any) is recorded first so a scheduler created here receives it.
        """
        if self._event_loop is None:
            try:
                self._event_loop = asyncio.get_running_loop()
            except RuntimeError:
                # Not called from inside a running loop; continue without an
                # explicit loop instead of failing.
                logger.debug(
                    "EventManagement: no running event loop found; "
                    "not setting an explicit loop"
                )
            else:
                logger.debug("EventManagement: using get_running_loop()")
        scheduler = self.scheduler
        if not scheduler.running:
            scheduler.start()
            logger.info("EventManagement scheduler started")

    def stop(self) -> None:
        """
        Shut the scheduler down without waiting for running jobs.

        Does nothing when no scheduler exists or it is not running. Errors
        raised by the shutdown are logged and not propagated.
        """
        scheduler = self._scheduler
        if scheduler is None or not scheduler.running:
            return
        try:
            scheduler.shutdown(wait=False)
            logger.info("EventManagement scheduler stopped")
        except Exception as exc:
            logger.error("Error stopping scheduler: %s", exc)

    def _add_job(
        self,
        jobId: str,
        func: Callable,
        trigger: Any,
        *,
        replaceExisting: bool,
        coalesce: bool,
        maxInstances: int,
        misfireGraceTime: int,
        jobOptions: Dict[str, Any],
    ) -> None:
        """Add ``func`` to the scheduler under ``jobId`` with the given trigger."""
        self.scheduler.add_job(
            func,
            trigger,
            id=jobId,
            replace_existing=replaceExisting,
            coalesce=coalesce,
            max_instances=maxInstances,
            misfire_grace_time=misfireGraceTime,
            **jobOptions,
        )

    def registerCron(
        self,
        jobId: str,
        func: Callable,
        *,
        cronKwargs: Optional[Dict[str, Any]] = None,
        replaceExisting: bool = True,
        coalesce: bool = True,
        maxInstances: int = 1,
        misfireGraceTime: int = 1800,
        **kwargs: Any,
    ) -> None:
        """
        Register a job using CronTrigger. Provide cron fields as keyword args, e.g.:
            cronKwargs={"minute": "0,20,40"}

        Args:
            jobId: Identifier of the job.
            func: Callable to schedule.
            cronKwargs: Keyword arguments for ``CronTrigger``; ``None`` is
                treated as no fields.
            replaceExisting: Forwarded to ``add_job`` as ``replace_existing``.
            coalesce: Forwarded to ``add_job`` as ``coalesce``.
            maxInstances: Forwarded to ``add_job`` as ``max_instances``.
            misfireGraceTime: Forwarded to ``add_job`` as
                ``misfire_grace_time``.
            **kwargs: Any further keyword arguments for ``add_job``.
        """
        trigger = CronTrigger(timezone=self._timezone, **(cronKwargs or {}))
        self._add_job(
            jobId,
            func,
            trigger,
            replaceExisting=replaceExisting,
            coalesce=coalesce,
            maxInstances=maxInstances,
            misfireGraceTime=misfireGraceTime,
            jobOptions=kwargs,
        )
        logger.info("Registered cron job '%s' with args %s", jobId, cronKwargs)

    def registerInterval(
        self,
        jobId: str,
        func: Callable,
        *,
        seconds: Optional[int] = None,
        minutes: Optional[int] = None,
        hours: Optional[int] = None,
        replaceExisting: bool = True,
        coalesce: bool = True,
        maxInstances: int = 1,
        misfireGraceTime: int = 1800,
        **kwargs: Any,
    ) -> None:
        """
        Register a job using IntervalTrigger.
        Only passes non-None interval components (IntervalTrigger fails on None).

        Args:
            jobId: Identifier of the job.
            func: Callable to schedule.
            seconds, minutes, hours: Interval components; at least one must be
                given.
            replaceExisting: Forwarded to ``add_job`` as ``replace_existing``.
            coalesce: Forwarded to ``add_job`` as ``coalesce``.
            maxInstances: Forwarded to ``add_job`` as ``max_instances``.
            misfireGraceTime: Forwarded to ``add_job`` as
                ``misfire_grace_time``.
            **kwargs: Any further keyword arguments for ``add_job``.

        Raises:
            ValueError: If seconds, minutes and hours are all None.
        """
        candidates = (("seconds", seconds), ("minutes", minutes), ("hours", hours))
        interval_parts = {
            name: value for name, value in candidates if value is not None
        }
        # Validate before touching the scheduler so a bad call has no side effects.
        if not interval_parts:
            raise ValueError("At least one of seconds, minutes, hours must be provided")
        trigger = IntervalTrigger(timezone=self._timezone, **interval_parts)
        self._add_job(
            jobId,
            func,
            trigger,
            replaceExisting=replaceExisting,
            coalesce=coalesce,
            maxInstances=maxInstances,
            misfireGraceTime=misfireGraceTime,
            jobOptions=kwargs,
        )
        logger.info(
            "Registered interval job '%s' (h=%s, m=%s, s=%s)",
            jobId,
            hours,
            minutes,
            seconds,
        )

    def remove(self, jobId: str) -> None:
        """
        Remove the job with the given id; failures are logged, never raised.

        If no scheduler has been created yet there is nothing to remove, so
        the call returns without building one as a side effect.
        """
        if self._scheduler is None:
            logger.debug(
                "Could not remove job '%s': scheduler not initialized", jobId
            )
            return
        try:
            self._scheduler.remove_job(jobId)
        except Exception as exc:
            logger.debug("Could not remove job '%s': %s", jobId, exc)
        else:
            logger.info("Removed job '%s'", jobId)
# Singleton instance for easy import and reuse.
# Created at import time with the default timezone ("Europe/Zurich"); the
# underlying scheduler is only built on first access to `scheduler`.
eventManager = EventManagement()