# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import asyncio
import logging
from typing import Any, Callable, Dict, Optional
from zoneinfo import ZoneInfo

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger

logger = logging.getLogger(__name__)


class EventManagement:
    """
    Generic event scheduler wrapper around APScheduler's AsyncIOScheduler.

    Features:
    - start/stop lifecycle
    - register timed events with either cron or interval style
    - remove events by id

    The underlying scheduler is created lazily on first access of
    ``scheduler``; any method that registers or removes a job triggers that
    creation.
    """

    def __init__(self, timezone: str = "Europe/Zurich"):
        """
        Store the configuration; no scheduler is created here.

        Args:
            timezone: Time zone key handed to ``ZoneInfo``. The resulting
                zone is passed to the scheduler and to every trigger built
                by this class.
        """
        self._timezone = ZoneInfo(timezone)
        self._scheduler: Optional[AsyncIOScheduler] = None
        self._event_loop: Optional[asyncio.AbstractEventLoop] = None

    def set_event_loop(self, loop: asyncio.AbstractEventLoop) -> None:
        """
        Set the event loop for the scheduler (call from lifespan before start).

        The loop is only handed to the scheduler when the scheduler is
        constructed. If a job was registered earlier, the scheduler already
        exists and will not pick up ``loop``; a warning is logged so this is
        not silently ignored.
        """
        if self._scheduler is not None:
            logger.warning(
                "EventManagement: scheduler was already created; "
                "the event loop set now will not be passed to it"
            )
        self._event_loop = loop

    @property
    def scheduler(self) -> AsyncIOScheduler:
        """Return the scheduler, constructing it on first access."""
        if self._scheduler is None:
            options: Dict[str, Any] = {"timezone": self._timezone}
            # Only pass an explicit loop when one is known; otherwise the
            # scheduler is constructed without the ``event_loop`` option.
            if self._event_loop is not None:
                options["event_loop"] = self._event_loop
            self._scheduler = AsyncIOScheduler(**options)
        return self._scheduler

    def start(self) -> None:
        """
        Start the scheduler if it is not already running.

        When no loop was set via ``set_event_loop``, the currently running
        asyncio loop is used if there is one; if none is running, the
        scheduler is created without an explicit loop.
        """
        if self._event_loop is None:
            try:
                self._event_loop = asyncio.get_running_loop()
                logger.debug("EventManagement: using get_running_loop()")
            except RuntimeError:
                # No running loop in this thread: deliberately fall through
                # and construct the scheduler without an explicit loop.
                pass
        if not self.scheduler.running:
            self.scheduler.start()
            logger.info("EventManagement scheduler started")

    def stop(self) -> None:
        """
        Shut the scheduler down without waiting for running jobs.

        Does nothing when the scheduler was never created or is not running.
        Errors raised during shutdown are logged and not propagated.
        """
        if self._scheduler and self._scheduler.running:
            try:
                self._scheduler.shutdown(wait=False)
                logger.info("EventManagement scheduler stopped")
            except Exception as exc:
                # Best-effort shutdown: report the failure, never raise.
                logger.error("Error stopping scheduler: %s", exc)

    def _add_job(
        self,
        jobId: str,
        func: Callable[..., Any],
        trigger: Any,
        *,
        replaceExisting: bool,
        coalesce: bool,
        maxInstances: int,
        misfireGraceTime: int,
        jobKwargs: Dict[str, Any],
    ) -> None:
        """Map this class's camelCase options onto ``scheduler.add_job``."""
        self.scheduler.add_job(
            func,
            trigger,
            id=jobId,
            replace_existing=replaceExisting,
            coalesce=coalesce,
            max_instances=maxInstances,
            misfire_grace_time=misfireGraceTime,
            **jobKwargs,
        )

    def registerCron(
        self,
        jobId: str,
        func: Callable[..., Any],
        *,
        cronKwargs: Optional[Dict[str, Any]] = None,
        replaceExisting: bool = True,
        coalesce: bool = True,
        maxInstances: int = 1,
        misfireGraceTime: int = 1800,
        **kwargs: Any,
    ) -> None:
        """
        Register a job using CronTrigger.

        Provide cron fields as keyword args, e.g.:
            cronKwargs={"minute": "0,20,40"}

        Args:
            jobId: Job id passed to ``add_job`` as ``id``.
            func: Callable to schedule.
            cronKwargs: Keyword arguments for ``CronTrigger``; ``None`` is
                treated as an empty dict. The trigger always receives this
                instance's time zone.
            replaceExisting: Forwarded as ``replace_existing``.
            coalesce: Forwarded as ``coalesce``.
            maxInstances: Forwarded as ``max_instances``.
            misfireGraceTime: Forwarded as ``misfire_grace_time``.
            **kwargs: Any further keyword arguments for ``add_job``.
        """
        trigger = CronTrigger(timezone=self._timezone, **(cronKwargs or {}))
        self._add_job(
            jobId,
            func,
            trigger,
            replaceExisting=replaceExisting,
            coalesce=coalesce,
            maxInstances=maxInstances,
            misfireGraceTime=misfireGraceTime,
            jobKwargs=kwargs,
        )
        logger.info("Registered cron job '%s' with args %s", jobId, cronKwargs)

    def registerInterval(
        self,
        jobId: str,
        func: Callable[..., Any],
        *,
        seconds: Optional[int] = None,
        minutes: Optional[int] = None,
        hours: Optional[int] = None,
        replaceExisting: bool = True,
        coalesce: bool = True,
        maxInstances: int = 1,
        misfireGraceTime: int = 1800,
        **kwargs: Any,
    ) -> None:
        """
        Register a job using IntervalTrigger.

        Only passes non-None interval components (IntervalTrigger fails on None).

        Args:
            jobId: Job id passed to ``add_job`` as ``id``.
            func: Callable to schedule.
            seconds: Seconds component of the interval, if any.
            minutes: Minutes component of the interval, if any.
            hours: Hours component of the interval, if any.
            replaceExisting: Forwarded as ``replace_existing``.
            coalesce: Forwarded as ``coalesce``.
            maxInstances: Forwarded as ``max_instances``.
            misfireGraceTime: Forwarded as ``misfire_grace_time``.
            **kwargs: Any further keyword arguments for ``add_job``.

        Raises:
            ValueError: If seconds, minutes and hours are all ``None``.
        """
        candidates = {"seconds": seconds, "minutes": minutes, "hours": hours}
        interval = {
            unit: amount for unit, amount in candidates.items() if amount is not None
        }
        # Validate on the interval components themselves instead of on the
        # size of a dict that also carries unrelated keys such as timezone.
        if not interval:
            raise ValueError("At least one of seconds, minutes, hours must be provided")

        trigger = IntervalTrigger(timezone=self._timezone, **interval)
        self._add_job(
            jobId,
            func,
            trigger,
            replaceExisting=replaceExisting,
            coalesce=coalesce,
            maxInstances=maxInstances,
            misfireGraceTime=misfireGraceTime,
            jobKwargs=kwargs,
        )
        logger.info(
            "Registered interval job '%s' (h=%s, m=%s, s=%s)",
            jobId,
            hours,
            minutes,
            seconds,
        )

    def remove(self, jobId: str) -> None:
        """
        Remove the job with the given id, best-effort.

        Any error raised by the scheduler (for instance when the job cannot
        be removed) is logged at debug level and not propagated.
        """
        try:
            self.scheduler.remove_job(jobId)
            logger.info("Removed job '%s'", jobId)
        except Exception as exc:
            # Deliberately tolerant: callers may remove jobs unconditionally.
            logger.debug("Could not remove job '%s': %s", jobId, exc)


# Singleton instance for easy import and reuse
eventManager = EventManagement()