# gateway/modules/shared/eventManagement.py
# 2025-10-31 00:28:09 +01:00
#
# 120 lines
# 3.6 KiB
# Python

import logging
from typing import Callable, Optional, Dict, Any
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from zoneinfo import ZoneInfo
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class EventManagement:
    """
    Generic event scheduler wrapper around APScheduler's AsyncIOScheduler.

    Features:
    - start/stop lifecycle
    - register timed events with either cron or interval style
    - remove events by id

    The underlying scheduler is created lazily the first time the
    ``scheduler`` property is read, using the timezone given to the
    constructor.
    """

    def __init__(self, timezone: str = "Europe/Zurich"):
        """
        Store the timezone; no scheduler is created yet.

        Args:
            timezone: Timezone key passed to ``ZoneInfo`` (e.g. "Europe/Zurich").
        """
        self._timezone = ZoneInfo(timezone)
        self._scheduler: Optional[AsyncIOScheduler] = None

    @property
    def scheduler(self) -> "AsyncIOScheduler":
        """Return the wrapped scheduler, creating it on first access."""
        if self._scheduler is None:
            self._scheduler = AsyncIOScheduler(timezone=self._timezone)
        return self._scheduler

    def start(self) -> None:
        """Start the scheduler unless it is already running."""
        if not self.scheduler.running:
            self.scheduler.start()
            logger.info("EventManagement scheduler started")

    def stop(self) -> None:
        """
        Shut the scheduler down if one exists and is running.

        ``shutdown`` is called with ``wait=False``. Any exception raised by
        the shutdown is logged and not propagated.
        """
        # Read the private attribute directly so stopping never creates a
        # scheduler as a side effect of the lazy property.
        if self._scheduler and self._scheduler.running:
            try:
                self._scheduler.shutdown(wait=False)
                logger.info("EventManagement scheduler stopped")
            except Exception as exc:
                logger.error(f"Error stopping scheduler: {exc}")

    def registerCron(
        self,
        jobId: str,
        func: Callable,
        *,
        cronKwargs: Optional[Dict[str, Any]] = None,
        replaceExisting: bool = True,
        coalesce: bool = True,
        maxInstances: int = 1,
        misfireGraceTime: int = 1800,
        **kwargs: Any,
    ) -> None:
        """
        Register a job using CronTrigger. Provide cron fields as keyword args, e.g.:
            cronKwargs={"minute": "0,20,40"}

        Args:
            jobId: Identifier passed to ``add_job`` as ``id``.
            func: Callable to schedule.
            cronKwargs: Keyword arguments for ``CronTrigger``. A "timezone"
                entry overrides the instance timezone.
            replaceExisting: Forwarded to ``add_job`` as ``replace_existing``.
            coalesce: Forwarded to ``add_job`` as ``coalesce``.
            maxInstances: Forwarded to ``add_job`` as ``max_instances``.
            misfireGraceTime: Forwarded to ``add_job`` as ``misfire_grace_time``.
            **kwargs: Any further keyword arguments for ``add_job``.
        """
        # Merge instead of passing timezone= next to **cronKwargs: a caller
        # supplied "timezone" would otherwise raise a duplicate-keyword TypeError.
        triggerArgs: Dict[str, Any] = {"timezone": self._timezone, **(cronKwargs or {})}
        trigger = CronTrigger(**triggerArgs)
        self.scheduler.add_job(
            func,
            trigger,
            id=jobId,
            replace_existing=replaceExisting,
            coalesce=coalesce,
            max_instances=maxInstances,
            misfire_grace_time=misfireGraceTime,
            **kwargs,
        )
        logger.info(f"Registered cron job '{jobId}' with args {cronKwargs}")

    @staticmethod
    def _intervalFields(
        seconds: Optional[int],
        minutes: Optional[int],
        hours: Optional[int],
    ) -> Dict[str, int]:
        """
        Turn optional interval components into IntervalTrigger keyword args.

        Components left as None become 0 so they can be handed to
        IntervalTrigger, which builds a duration from them and does not
        accept None.

        Raises:
            TypeError: If all three components are None.
        """
        if seconds is None and minutes is None and hours is None:
            raise TypeError(
                "registerInterval requires at least one of seconds, minutes or hours"
            )
        return {
            "seconds": 0 if seconds is None else seconds,
            "minutes": 0 if minutes is None else minutes,
            "hours": 0 if hours is None else hours,
        }

    def registerInterval(
        self,
        jobId: str,
        func: Callable,
        *,
        seconds: Optional[int] = None,
        minutes: Optional[int] = None,
        hours: Optional[int] = None,
        replaceExisting: bool = True,
        coalesce: bool = True,
        maxInstances: int = 1,
        misfireGraceTime: int = 1800,
        **kwargs: Any,
    ) -> None:
        """
        Register a job using IntervalTrigger.

        Args:
            jobId: Identifier passed to ``add_job`` as ``id``.
            func: Callable to schedule.
            seconds: Seconds component of the interval; None counts as 0.
            minutes: Minutes component of the interval; None counts as 0.
            hours: Hours component of the interval; None counts as 0.
            replaceExisting: Forwarded to ``add_job`` as ``replace_existing``.
            coalesce: Forwarded to ``add_job`` as ``coalesce``.
            maxInstances: Forwarded to ``add_job`` as ``max_instances``.
            misfireGraceTime: Forwarded to ``add_job`` as ``misfire_grace_time``.
            **kwargs: Any further keyword arguments for ``add_job``.

        Raises:
            TypeError: If none of seconds, minutes or hours is given.
        """
        # Validate and normalise before touching the scheduler, so a bad call
        # neither creates the lazy scheduler nor registers anything.
        intervalArgs = self._intervalFields(seconds, minutes, hours)
        trigger = IntervalTrigger(timezone=self._timezone, **intervalArgs)
        self.scheduler.add_job(
            func,
            trigger,
            id=jobId,
            replace_existing=replaceExisting,
            coalesce=coalesce,
            max_instances=maxInstances,
            misfire_grace_time=misfireGraceTime,
            **kwargs,
        )
        logger.info(
            f"Registered interval job '{jobId}' (h={hours}, m={minutes}, s={seconds})"
        )

    def remove(self, jobId: str) -> None:
        """
        Remove the job with the given id.

        Any exception from ``remove_job`` is logged as a warning and not
        propagated.
        """
        try:
            self.scheduler.remove_job(jobId)
            logger.info(f"Removed job '{jobId}'")
        except Exception as exc:
            logger.warning(f"Could not remove job '{jobId}': {exc}")
# Shared module-level instance for easy import and reuse. It is built with the
# default timezone ("Europe/Zurich"), which ZoneInfo resolves at import time.
# No scheduler is created or started here; that happens on first use.
eventManager = EventManagement()