import logging
from typing import Any, Callable, Dict, Optional
from zoneinfo import ZoneInfo

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger

logger = logging.getLogger(__name__)


class EventManagement:
    """
    Generic event scheduler wrapper around APScheduler's AsyncIOScheduler.

    Features:
    - start/stop lifecycle
    - register timed events with either cron or interval style
    - remove events by id

    The underlying scheduler is created lazily on first access to
    ``scheduler`` and is configured with the timezone given at construction.
    """

    def __init__(self, timezone: str = "Europe/Zurich"):
        """
        Args:
            timezone: IANA timezone name, resolved through ``ZoneInfo``; it is
                applied to the scheduler and to every trigger created here.
        """
        self._timezone = ZoneInfo(timezone)
        # Built on demand by the ``scheduler`` property.
        self._scheduler: Optional[AsyncIOScheduler] = None

    @property
    def scheduler(self) -> AsyncIOScheduler:
        """Return the wrapped scheduler, creating it on first access."""
        if self._scheduler is None:
            self._scheduler = AsyncIOScheduler(timezone=self._timezone)
        return self._scheduler

    def start(self) -> None:
        """Start the scheduler unless it is already running."""
        if self.scheduler.running:
            return
        self.scheduler.start()
        logger.info("EventManagement scheduler started")

    def stop(self) -> None:
        """
        Shut the scheduler down without waiting for running jobs.

        Does nothing when the scheduler was never created or is not running.
        Shutdown errors are logged (with traceback) and not re-raised, so
        stopping stays best-effort.
        """
        if self._scheduler is None or not self._scheduler.running:
            return
        try:
            self._scheduler.shutdown(wait=False)
        except Exception as exc:
            logger.exception("Error stopping scheduler: %s", exc)
        else:
            logger.info("EventManagement scheduler stopped")

    def registerCron(
        self,
        jobId: str,
        func: Callable,
        *,
        cronKwargs: Optional[Dict[str, Any]] = None,
        replaceExisting: bool = True,
        coalesce: bool = True,
        maxInstances: int = 1,
        misfireGraceTime: int = 1800,
        **kwargs: Any,
    ) -> None:
        """
        Register a job using CronTrigger.

        Provide cron fields as keyword args, e.g.:
            cronKwargs={"minute": "0,20,40"}

        Args:
            jobId: Identifier the job is stored under (``id`` in APScheduler).
            func: Callable to run when the trigger fires.
            cronKwargs: Keyword arguments for ``CronTrigger``; ``None`` means
                no cron fields are passed.
            replaceExisting: Forwarded as ``replace_existing``.
            coalesce: Forwarded as ``coalesce``.
            maxInstances: Forwarded as ``max_instances``.
            misfireGraceTime: Forwarded as ``misfire_grace_time``.
            **kwargs: Any further keyword arguments for ``add_job``.
        """
        trigger = CronTrigger(timezone=self._timezone, **(cronKwargs or {}))
        self.scheduler.add_job(
            func,
            trigger,
            id=jobId,
            replace_existing=replaceExisting,
            coalesce=coalesce,
            max_instances=maxInstances,
            misfire_grace_time=misfireGraceTime,
            **kwargs,
        )
        logger.info("Registered cron job '%s' with args %s", jobId, cronKwargs)

    def registerInterval(
        self,
        jobId: str,
        func: Callable,
        *,
        seconds: Optional[int] = None,
        minutes: Optional[int] = None,
        hours: Optional[int] = None,
        replaceExisting: bool = True,
        coalesce: bool = True,
        maxInstances: int = 1,
        misfireGraceTime: int = 1800,
        **kwargs: Any,
    ) -> None:
        """
        Register a job using IntervalTrigger.

        Any subset of ``seconds``, ``minutes`` and ``hours`` may be given;
        components left as ``None`` count as zero.

        Args:
            jobId: Identifier the job is stored under (``id`` in APScheduler).
            func: Callable to run on every interval.
            seconds: Seconds component of the interval.
            minutes: Minutes component of the interval.
            hours: Hours component of the interval.
            replaceExisting: Forwarded as ``replace_existing``.
            coalesce: Forwarded as ``coalesce``.
            maxInstances: Forwarded as ``max_instances``.
            misfireGraceTime: Forwarded as ``misfire_grace_time``.
            **kwargs: Any further keyword arguments for ``add_job``.

        Raises:
            TypeError: If none of ``seconds``, ``minutes`` or ``hours`` is
                provided.
        """
        if seconds is None and minutes is None and hours is None:
            raise TypeError(
                "registerInterval() requires at least one of "
                "'seconds', 'minutes' or 'hours'"
            )

        # IntervalTrigger hands these straight to datetime.timedelta, which
        # rejects None, so omitted components must be turned into 0 first.
        intervalSeconds = 0 if seconds is None else seconds
        intervalMinutes = 0 if minutes is None else minutes
        intervalHours = 0 if hours is None else hours

        trigger = IntervalTrigger(
            seconds=intervalSeconds,
            minutes=intervalMinutes,
            hours=intervalHours,
            timezone=self._timezone,
        )
        self.scheduler.add_job(
            func,
            trigger,
            id=jobId,
            replace_existing=replaceExisting,
            coalesce=coalesce,
            max_instances=maxInstances,
            misfire_grace_time=misfireGraceTime,
            **kwargs,
        )
        logger.info(
            "Registered interval job '%s' (h=%s, m=%s, s=%s)",
            jobId,
            intervalHours,
            intervalMinutes,
            intervalSeconds,
        )

    def remove(self, jobId: str) -> None:
        """
        Remove the job stored under ``jobId``.

        Failures (for example an unknown id) are logged as a warning and not
        raised, so removal is best-effort.
        """
        try:
            self.scheduler.remove_job(jobId)
        except Exception as exc:
            logger.warning("Could not remove job '%s': %s", jobId, exc)
        else:
            logger.info("Removed job '%s'", jobId)


# Singleton instance for easy import and reuse
eventManager = EventManagement()