gateway/modules/shared/eventManagement.py
2026-02-09 12:49:35 +01:00

122 lines
3.6 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import logging
from typing import Callable, Optional, Dict, Any
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from zoneinfo import ZoneInfo
# Module-level logger, named after this module so log records can be filtered per module.
logger = logging.getLogger(__name__)
class EventManagement:
    """
    Generic event scheduler wrapper around APScheduler's AsyncIOScheduler.

    Features:
    - start/stop lifecycle
    - register timed events with either cron or interval style
    - remove events by id

    The underlying scheduler is created lazily on first access of
    ``scheduler``; constructing an instance only resolves the timezone.
    """

    def __init__(self, timezone: str = "Europe/Zurich"):
        """
        Args:
            timezone: Timezone name handed to ``ZoneInfo``; the resulting zone
                is used for the scheduler and for every trigger created here.
        """
        self._timezone = ZoneInfo(timezone)
        self._scheduler: Optional[AsyncIOScheduler] = None

    @property
    def scheduler(self) -> "AsyncIOScheduler":
        """Return the scheduler, creating it on first access."""
        if self._scheduler is None:
            self._scheduler = AsyncIOScheduler(timezone=self._timezone)
        return self._scheduler

    def start(self) -> None:
        """Start the scheduler unless it is already running."""
        if not self.scheduler.running:
            self.scheduler.start()
            logger.info("EventManagement scheduler started")

    def stop(self) -> None:
        """
        Shut the scheduler down without waiting for running jobs.

        Does nothing when the scheduler was never created or is not running.
        Errors raised during shutdown are logged and not propagated.
        """
        if self._scheduler and self._scheduler.running:
            try:
                self._scheduler.shutdown(wait=False)
                logger.info("EventManagement scheduler stopped")
            except Exception as exc:
                # Best-effort shutdown: report the problem but never raise.
                logger.error(f"Error stopping scheduler: {exc}")

    def registerCron(
        self,
        jobId: str,
        func: Callable,
        *,
        cronKwargs: Optional[Dict[str, Any]] = None,
        replaceExisting: bool = True,
        coalesce: bool = True,
        maxInstances: int = 1,
        misfireGraceTime: int = 1800,
        **kwargs: Any,
    ) -> None:
        """
        Register a job using CronTrigger.

        Args:
            jobId: Identifier passed to ``add_job`` as ``id``.
            func: Callable to schedule.
            cronKwargs: Cron fields forwarded to ``CronTrigger`` as keyword
                arguments, e.g. ``cronKwargs={"minute": "0,20,40"}``. ``None``
                is treated as an empty dict.
            replaceExisting: Forwarded to ``add_job`` as ``replace_existing``.
            coalesce: Forwarded to ``add_job`` as ``coalesce``.
            maxInstances: Forwarded to ``add_job`` as ``max_instances``.
            misfireGraceTime: Forwarded to ``add_job`` as ``misfire_grace_time``.
            **kwargs: Any further keyword arguments for ``add_job``.
        """
        trigger = CronTrigger(timezone=self._timezone, **(cronKwargs or {}))
        self.scheduler.add_job(
            func,
            trigger,
            id=jobId,
            replace_existing=replaceExisting,
            coalesce=coalesce,
            max_instances=maxInstances,
            misfire_grace_time=misfireGraceTime,
            **kwargs,
        )
        logger.info(f"Registered cron job '{jobId}' with args {cronKwargs}")

    @staticmethod
    def _intervalFields(
        seconds: Optional[int],
        minutes: Optional[int],
        hours: Optional[int],
    ) -> Dict[str, int]:
        """
        Return only the interval components that were actually supplied.

        Raises:
            ValueError: If all three components are ``None``.
        """
        candidates = {"seconds": seconds, "minutes": minutes, "hours": hours}
        fields = {
            name: value for name, value in candidates.items() if value is not None
        }
        if not fields:
            raise ValueError(
                "registerInterval requires at least one of seconds, minutes or hours"
            )
        return fields

    def registerInterval(
        self,
        jobId: str,
        func: Callable,
        *,
        seconds: Optional[int] = None,
        minutes: Optional[int] = None,
        hours: Optional[int] = None,
        replaceExisting: bool = True,
        coalesce: bool = True,
        maxInstances: int = 1,
        misfireGraceTime: int = 1800,
        **kwargs: Any,
    ) -> None:
        """
        Register a job using IntervalTrigger.

        Args:
            jobId: Identifier passed to ``add_job`` as ``id``.
            func: Callable to schedule.
            seconds: Seconds component of the interval (optional).
            minutes: Minutes component of the interval (optional).
            hours: Hours component of the interval (optional).
            replaceExisting: Forwarded to ``add_job`` as ``replace_existing``.
            coalesce: Forwarded to ``add_job`` as ``coalesce``.
            maxInstances: Forwarded to ``add_job`` as ``max_instances``.
            misfireGraceTime: Forwarded to ``add_job`` as ``misfire_grace_time``.
            **kwargs: Any further keyword arguments for ``add_job``.

        Raises:
            ValueError: If none of ``seconds``, ``minutes`` or ``hours`` is given.
        """
        # Omitted components must not reach IntervalTrigger as None: it builds
        # a timedelta from them, which rejects None with a TypeError. Dropping
        # them lets the trigger fall back to its own defaults.
        intervalFields = self._intervalFields(seconds, minutes, hours)
        trigger = IntervalTrigger(timezone=self._timezone, **intervalFields)
        self.scheduler.add_job(
            func,
            trigger,
            id=jobId,
            replace_existing=replaceExisting,
            coalesce=coalesce,
            max_instances=maxInstances,
            misfire_grace_time=misfireGraceTime,
            **kwargs,
        )
        logger.info(
            f"Registered interval job '{jobId}' (h={hours}, m={minutes}, s={seconds})"
        )

    def remove(self, jobId: str) -> None:
        """
        Remove the job with the given id.

        Failures (for example an unknown id) are logged at debug level and
        not raised, so calling this for a job that does not exist is harmless.
        """
        try:
            self.scheduler.remove_job(jobId)
            logger.info(f"Removed job '{jobId}'")
        except Exception as exc:
            # Deliberately best-effort: removal of a missing job is not an error.
            logger.debug(f"Could not remove job '{jobId}': {exc}")
# Singleton instance for easy import and reuse.
# Built with the default timezone ("Europe/Zurich"). The underlying scheduler is
# only created on first use, so importing this module does not start anything.
eventManager = EventManagement()