gateway/modules/workflows/scheduler/mainScheduler.py
2026-04-13 00:38:47 +02:00

465 lines
18 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Consolidated Workflow Scheduler.
Replaces subAutomation2Schedule with v1-style incremental sync patterns:
- eventId tracking on AutoWorkflow for change detection
- replaceExisting=True for idempotent re-registration
- active check before execution
- Capped execution log
"""
import asyncio
import logging
from typing import Any, Dict, Optional
from modules.shared.eventManagement import eventManager
from modules.shared.i18nRegistry import resolveText
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Event loop that scheduled async handlers are dispatched onto when it is running;
# stays None until _setMainLoop()/setMainLoop() is called.
_main_loop = None
# Prefix of every job id registered by this module (job id = prefix + workflowId).
JOB_ID_PREFIX = "graphicalEditor."
# Callback-registry name under which the scheduler registers its re-sync callback.
_CALLBACK_NAME = "graphicalEditor.workflow.changed"
def _setMainLoop(loop) -> None:
    """Store *loop* in the module-level ``_main_loop`` variable.

    The stored loop is read by the scheduler's sync wrappers when a job fires.
    """
    global _main_loop
    _main_loop = loop
class WorkflowScheduler:
    """Registers scheduled graphical-editor workflows as ``eventManager`` jobs.

    Each schedulable workflow gets one job whose id is
    ``JOB_ID_PREFIX + workflowId``. ``self._registered`` remembers the jobs
    created by the most recent sync so that the next sync can remove the ones
    that were not registered again.
    """

    def __init__(self):
        # User the scheduled runs execute as; assigned in start().
        self._eventUser = None
        # workflowId -> jobId for the jobs registered by the last sync.
        self._registered: Dict[str, str] = {}

    def start(self, eventUser) -> bool:
        """Start the scheduler: sync workflows and register the change callback.

        Args:
            eventUser: User object the scheduled workflows run as; must be truthy.

        Returns:
            True when every startup step succeeded. False when no event user
            was given or when any step raised (the error is logged).
        """
        if not eventUser:
            logger.warning("WorkflowScheduler: No event user provided, skipping")
            return False
        self._eventUser = eventUser
        try:
            eventManager.start()
            self._syncScheduledWorkflows()
            logger.info("WorkflowScheduler: initial sync complete")
            self._delayedSync()
            from modules.shared.callbackRegistry import callbackRegistry
            callbackRegistry.register(_CALLBACK_NAME, self._onWorkflowChanged)
            logger.info("WorkflowScheduler: callback registered for %s", _CALLBACK_NAME)
        except Exception as e:
            logger.error("WorkflowScheduler: Failed to start: %s", e)
            return False
        return True

    def stop(self) -> bool:
        """Remove all scheduled workflow jobs.

        Errors raised while removing jobs are logged and swallowed.

        Returns:
            Always True.
        """
        try:
            self._removeAllJobs()
            logger.info("WorkflowScheduler: all jobs removed")
        except Exception as e:
            logger.warning("WorkflowScheduler: error during stop: %s", e)
        return True

    @staticmethod
    def _makeSyncWrapper(handler):
        """Wrap the async *handler* in a plain callable suitable as a job function.

        When the module-level ``_main_loop`` is set and running, the handler is
        scheduled onto it via ``call_soon_threadsafe``. Otherwise the handler is
        run to completion with ``asyncio.run`` in the calling thread.
        """
        def syncWrapper():
            # Read the loop at fire time so a loop set after registration is used.
            loop = _main_loop
            if loop and loop.is_running():
                loop.call_soon_threadsafe(
                    lambda: asyncio.ensure_future(handler(), loop=loop)
                )
                return
            coro = handler()
            try:
                asyncio.run(coro)
            except RuntimeError:
                # asyncio.run() refused to start; close the coroutine so it does
                # not trigger a "never awaited" warning.
                coro.close()
                logger.warning("WorkflowScheduler: could not run handler, no event loop")
        return syncWrapper

    def _syncScheduledWorkflows(self) -> Dict[str, Any]:
        """Register a job for every schedulable workflow and drop obsolete jobs.

        Every item returned by ``getAllWorkflowsForScheduling()`` that has a
        workflow id, a feature instance id and a cron expression is registered
        with ``replaceExisting=True``: as an interval job when the cron is a
        simple interval, otherwise as a cron job. Items with an invalid cron
        are logged and skipped. Jobs tracked from the previous sync that were
        not registered again in this run are removed.

        Returns:
            ``{"synced": <jobs registered>, "workflowsFound": <items returned>}``.
        """
        from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getAllWorkflowsForScheduling
        from modules.workflows.automation2.scheduleCron import parse_cron_to_kwargs

        items = getAllWorkflowsForScheduling()
        logger.info("WorkflowScheduler: found %d workflow(s) with trigger.schedule+cron", len(items))
        newRegistered: Dict[str, str] = {}
        for item in items:
            workflowId = item.get("workflowId")
            if not workflowId:
                continue
            cron = item.get("cron")
            mandateId = item.get("mandateId")
            instanceId = item.get("featureInstanceId")
            if not instanceId or not cron:
                continue
            jobId = f"{JOB_ID_PREFIX}{workflowId}"
            entryPointId = item.get("entryPointId")
            workflow = item.get("workflow") or {}
            asyncHandler = self._createHandler(
                workflowId=workflowId,
                mandateId=mandateId,
                instanceId=instanceId,
                entryPointId=entryPointId,
                workflow=workflow,
            )
            syncWrapper = self._makeSyncWrapper(asyncHandler)
            intervalSeconds = _cronToIntervalSeconds(cron)
            if intervalSeconds is not None:
                eventManager.registerInterval(
                    jobId=jobId,
                    func=syncWrapper,
                    seconds=intervalSeconds,
                    replaceExisting=True,
                )
            else:
                try:
                    cronKwargs = parse_cron_to_kwargs(cron)
                    eventManager.registerCron(
                        jobId=jobId,
                        func=syncWrapper,
                        cronKwargs=cronKwargs,
                        replaceExisting=True,
                    )
                except ValueError as e:
                    logger.warning("Workflow %s: invalid cron %r: %s", workflowId, cron, e)
                    continue
            newRegistered[workflowId] = jobId
            mode = "interval" if intervalSeconds is not None else "cron"
            logger.info(
                "WorkflowScheduler: registered %s for workflow %s (%s=%s)",
                jobId, workflowId, mode,
                intervalSeconds if intervalSeconds is not None else cron,
            )
            self._updateEventId(workflow, workflowId, jobId)

        # A previously registered workflow is stale when it was NOT registered
        # again in this run. That includes workflows that are still listed but
        # were skipped (missing cron/instance, invalid cron); otherwise their old
        # job would keep firing while no longer being tracked.
        staleIds = set(self._registered) - set(newRegistered)
        for wfId in staleIds:
            oldJobId = self._registered[wfId]
            try:
                eventManager.remove(oldJobId)
                logger.info("WorkflowScheduler: removed stale job %s", oldJobId)
            except Exception as e:
                # Best effort: the job may already be gone.
                logger.debug("WorkflowScheduler: could not remove stale job %s: %s", oldJobId, e)
        self._registered = newRegistered
        return {"synced": len(newRegistered), "workflowsFound": len(items)}

    def _updateEventId(self, workflow: Dict, workflowId: str, jobId: str) -> None:
        """Persist *jobId* as the workflow's ``eventId`` when it differs.

        Nothing is written when ``workflow["eventId"]`` already equals *jobId*
        or when no event user can be resolved. Any error is logged at debug
        level and swallowed.
        """
        if workflow.get("eventId") == jobId:
            return
        try:
            from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface
            from modules.interfaces.interfaceDbApp import getRootInterface
            root = getRootInterface()
            # Prefer the "event" user from the root interface; fall back to the
            # user passed to start() when no root interface is available.
            eventUser = root.getUserByUsername("event") if root else self._eventUser
            if not eventUser:
                return
            mandateId = workflow.get("mandateId", "")
            instanceId = workflow.get("featureInstanceId", "")
            iface = getGraphicalEditorInterface(eventUser, mandateId, instanceId)
            iface.updateWorkflow(workflowId, {"eventId": jobId})
        except Exception as e:
            logger.debug("WorkflowScheduler: could not update eventId for %s: %s", workflowId, e)

    def _createHandler(
        self,
        workflowId: str,
        mandateId: str,
        instanceId: str,
        entryPointId: str,
        workflow: Dict[str, Any],
    ):
        """Build the async handler that executes one scheduled workflow.

        The handler reloads the workflow each time it fires and returns without
        executing when the workflow is missing or has no graph, is inactive, or
        the entry point found for *entryPointId* is not an enabled ``schedule``
        invocation. All exceptions are logged and swallowed.

        Args:
            workflowId: Id of the workflow to execute.
            mandateId: Mandate id passed to the editor interface and services.
            instanceId: Feature instance id passed to the interface and services.
            entryPointId: Id of the schedule entry point to look up.
            workflow: Workflow data from the sync item; not read by the handler.

        Returns:
            A coroutine function taking no arguments.
        """
        # Captured now; the handler keeps using the user that was set at sync time.
        eventUser = self._eventUser

        async def handler():
            logger.info("WorkflowScheduler: CRON FIRED for workflow %s", workflowId)
            try:
                if not eventUser:
                    logger.error("WorkflowScheduler: event user not available")
                    return
                from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface
                from modules.features.graphicalEditor.mainGraphicalEditor import getGraphicalEditorServices
                from modules.workflows.automation2.executionEngine import executeGraph
                from modules.workflows.processing.shared.methodDiscovery import discoverMethods
                from modules.features.graphicalEditor.entryPoints import find_invocation
                from modules.workflows.automation2.runEnvelope import default_run_envelope, normalize_run_envelope
                iface = getGraphicalEditorInterface(eventUser, mandateId, instanceId)
                wf = iface.getWorkflow(workflowId)
                if not wf or not wf.get("graph"):
                    logger.warning("WorkflowScheduler: workflow %s not found or no graph", workflowId)
                    return
                # Active check before execution; a missing flag counts as active.
                if not wf.get("active", True):
                    logger.info("WorkflowScheduler: workflow %s inactive, skipping", workflowId)
                    return
                inv = find_invocation(wf, entryPointId)
                if inv and (inv.get("kind") != "schedule" or not inv.get("enabled", True)):
                    logger.info("WorkflowScheduler: entry point %s disabled for workflow %s", entryPointId, workflowId)
                    return
                services = getGraphicalEditorServices(
                    eventUser,
                    mandateId=mandateId,
                    featureInstanceId=instanceId,
                )
                discoverMethods(services)
                title = (inv or {}).get("title") or {}
                requestLang: Optional[str] = getattr(eventUser, "language", None)
                label = resolveText(title, requestLang) if title else ""
                runEnv = default_run_envelope(
                    "schedule",
                    entry_point_id=entryPointId,
                    entry_point_label=label or None,
                )
                runEnv = normalize_run_envelope(runEnv, user_id=str(eventUser.id) if eventUser else None)
                _wfLabel = wf.get("label") if isinstance(wf, dict) else getattr(wf, "label", None)
                result = await executeGraph(
                    graph=wf["graph"],
                    services=services,
                    workflowId=workflowId,
                    instanceId=instanceId,
                    userId=None,
                    mandateId=mandateId,
                    automation2_interface=iface,
                    run_envelope=runEnv,
                    label=_wfLabel,
                )
                logger.info(
                    "WorkflowScheduler: executed workflow %s success=%s paused=%s",
                    workflowId, result.get("success"), result.get("paused"),
                )
            except Exception as e:
                logger.exception("WorkflowScheduler: failed to execute workflow %s: %s", workflowId, e)

        return handler

    def _delayedSync(self) -> None:
        """Run one more sync after 5 seconds on a daemon thread.

        Intended for the case where the DB was not ready at startup; a failure
        of the delayed sync is logged as a warning.
        """
        import threading

        def _run():
            import time
            time.sleep(5)
            try:
                self._syncScheduledWorkflows()
                logger.info("WorkflowScheduler: delayed sync done")
            except Exception as e:
                logger.warning("WorkflowScheduler: delayed sync failed: %s", e)

        threading.Thread(target=_run, daemon=True).start()

    def _onWorkflowChanged(self, _context=None) -> None:
        """Callback for workflow create/update/delete: re-sync the jobs.

        Args:
            _context: Ignored; accepted so the callback can be invoked with an argument.
        """
        try:
            self._syncScheduledWorkflows()
            logger.debug("WorkflowScheduler: re-synced after workflow change")
        except Exception as e:
            logger.warning("WorkflowScheduler: re-sync failed: %s", e)

    def _removeAllJobs(self) -> None:
        """Remove every job whose id starts with ``JOB_ID_PREFIX``.

        Does nothing when ``eventManager.scheduler`` is not set. Failures to
        remove an individual job are logged at debug level and skipped.
        """
        if not eventManager.scheduler:
            return
        # Iterate over a copy because jobs are removed while looping.
        for job in list(eventManager.scheduler.get_jobs()):
            jid = job.id if hasattr(job, "id") else str(job)
            if jid.startswith(JOB_ID_PREFIX):
                try:
                    eventManager.remove(jid)
                except Exception as e:
                    logger.debug("Could not remove job %s: %s", jid, e)
def _parseCronStep(field: str) -> Optional[int]:
    """Return n for a ``*/n`` cron field with a positive decimal n, else None."""
    if not field.startswith("*/"):
        return None
    digits = field[2:]
    # isdecimal() only accepts characters int() can parse; isdigit() would also
    # accept e.g. superscript digits and make int() raise ValueError.
    if not digits.isdecimal():
        return None
    step = int(digits)
    return step if step > 0 else None


def _cronToIntervalSeconds(cron: str) -> Optional[int]:
    """Return the interval in seconds when *cron* is a simple interval, else None.

    Accepts 5-field (minute hour day month dow) and 6-field (second first)
    expressions. Recognised patterns:

    - ``*/n * * * *``   -> n * 60   (seconds field, if present, must be ``0``)
    - ``* * * * *``     -> 60       (seconds field, if present, must be ``0``)
    - ``0 */n * * *``   -> n * 3600 (seconds field, if present, must be ``0``)
    - ``*/n * * * * *`` -> n        (6-field only; dow may be ``*`` or ``?``)

    Any other input, including non-strings, empty strings and malformed steps,
    yields None and never raises.
    """
    if not cron or not isinstance(cron, str):
        return None
    parts = cron.strip().split()
    if len(parts) == 5:
        minute, hour, day, month, dow = parts
        second = "0"
    elif len(parts) == 6:
        second, minute, hour, day, month, dow = parts
    else:
        return None

    dateIsWildcard = day == "*" and month == "*" and dow == "*"
    # Minute/hour intervals are only equivalent to the cron when it fires once
    # per matching minute, i.e. at second 0.
    firesOncePerMinute = second == "0"

    if firesOncePerMinute and hour == "*" and dateIsWildcard:
        if minute == "*":
            return 60
        step = _parseCronStep(minute)
        if step is not None:
            return step * 60

    if firesOncePerMinute and minute == "0" and dateIsWildcard:
        step = _parseCronStep(hour)
        if step is not None:
            return step * 3600

    if (
        len(parts) == 6
        and minute == "*"
        and hour == "*"
        and day == "*"
        and month == "*"
        and dow in ("*", "?")
    ):
        step = _parseCronStep(second)
        if step is not None:
            return step
    return None
def _notifyRunFailed(workflowId: str, runId: str, error: str, mandateId: str = None, workflowLabel: str = None) -> None:
    """Announce a failed workflow run on all notification channels.

    Emits the ``graphicalEditor.run.failed`` event (an emit failure is logged
    and does not stop the remaining steps), then creates the in-app
    notification and triggers the run-failed messaging subscription.
    """
    payload = {
        "workflowId": workflowId,
        "runId": runId,
        "error": error,
        "mandateId": mandateId,
    }
    try:
        eventManager.emit("graphicalEditor.run.failed", payload)
        logger.info("Emitted run.failed event for run %s (workflow %s)", runId, workflowId)
    except Exception as emitError:
        logger.warning("Failed to emit run.failed event: %s", emitError)

    # Both channels take the same arguments and are invoked in this order.
    for notify in (_createRunFailedNotification, _triggerRunFailedSubscription):
        notify(workflowId, runId, error, mandateId, workflowLabel)
def _createRunFailedNotification(
    workflowId: str, runId: str, error: str, mandateId: str = None, workflowLabel: str = None
) -> None:
    """Store an unread in-app notification for the user who created the workflow.

    Returns silently when the root interface, the "event" user, the workflow
    or its ``sysCreatedBy`` value is unavailable. Any exception is logged as a
    warning and swallowed.
    """
    try:
        from modules.interfaces.interfaceDbApp import getRootInterface
        from modules.datamodels.datamodelNotification import UserNotification, NotificationType, NotificationStatus

        rootInterface = getRootInterface()
        if not rootInterface:
            return

        from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface

        eventUser = rootInterface.getUserByUsername("event")
        if not eventUser:
            return

        editorInterface = getGraphicalEditorInterface(eventUser, mandateId or "", "")
        wf = editorInterface.getWorkflow(workflowId)
        if not wf:
            return

        def _readField(name, missing):
            # The workflow may be a dict or an object; read the field either way.
            if isinstance(wf, dict):
                return wf.get(name)
            return getattr(wf, name, missing)

        creatorId = _readField("sysCreatedBy", None)
        if not creatorId:
            return

        label = workflowLabel or _readField("label", "")
        shownName = label or workflowId
        # Only the first 200 characters of the error go into the message.
        errorExcerpt = error[:200]
        notification = UserNotification(
            userId=creatorId,
            type=NotificationType.SYSTEM,
            status=NotificationStatus.UNREAD,
            title="Workflow fehlgeschlagen",
            message=f"Workflow '{shownName}' ist fehlgeschlagen: {errorExcerpt}",
            referenceType="AutoRun",
            referenceId=runId,
            icon="alert-triangle",
        )
        rootInterface.db.recordCreate(
            model_class=UserNotification,
            record=notification.model_dump(),
        )
        logger.info("Created in-app notification for user %s (run %s)", creatorId, runId)
    except Exception as notifyError:
        logger.warning("Failed to create in-app run.failed notification: %s", notifyError)
def _triggerRunFailedSubscription(
    workflowId: str, runId: str, error: str, mandateId: str = None, workflowLabel: str = None
) -> None:
    """Execute the ``GraphicalEditorRunFailed`` messaging subscription.

    Runs as the "event" user; returns silently when the root interface or that
    user is unavailable. ``FileNotFoundError`` and ``ValueError`` are logged at
    debug level, any other exception as a warning; nothing is re-raised.
    """
    try:
        from modules.serviceCenter import getService
        from modules.serviceCenter.context import ServiceCenterContext
        from modules.interfaces.interfaceDbApp import getRootInterface
        from modules.datamodels.datamodelMessaging import MessagingEventParameters

        rootInterface = getRootInterface()
        if not rootInterface:
            return
        eventUser = rootInterface.getUserByUsername("event")
        if not eventUser:
            return

        safeMandateId = mandateId or ""
        ctx = ServiceCenterContext(
            user=eventUser,
            mandate_id=safeMandateId,
            feature_instance_id="",
            feature_code="graphicalEditor",
        )
        messagingService = getService("messaging", ctx)

        triggerData = {
            "workflowId": workflowId,
            "workflowLabel": workflowLabel or workflowId,
            "runId": runId,
            "error": error,
            "mandateId": safeMandateId,
        }
        outcome = messagingService.executeSubscription(
            "GraphicalEditorRunFailed",
            MessagingEventParameters(triggerData=triggerData),
        )
        logger.info(
            "Triggered run.failed subscription: sent=%d success=%s",
            outcome.messagesSent, outcome.success,
        )
    except FileNotFoundError:
        logger.debug("Subscription function GraphicalEditorRunFailed not found (not yet registered)")
    except ValueError as valueError:
        logger.debug("Subscription GraphicalEditorRunFailed: %s", valueError)
    except Exception as triggerError:
        logger.warning("Failed to trigger run.failed subscription: %s", triggerError)
# Module-level singleton: the one WorkflowScheduler instance that the
# module-level start()/stop()/syncNow() functions delegate to.
_scheduler = WorkflowScheduler()
def start(eventUser) -> bool:
    """Start the module-level scheduler for *eventUser* and report success."""
    started = _scheduler.start(eventUser)
    return started
def stop() -> bool:
    """Stop the module-level scheduler and return its result."""
    stopped = _scheduler.stop()
    return stopped
def syncNow() -> dict:
    """Run a sync on the module-level scheduler right away and return its summary.

    Used by the /schedule-sync endpoint.
    """
    summary = _scheduler._syncScheduledWorkflows()
    return summary
def setMainLoop(loop) -> None:
    """Public entry point for setting the event loop used by the thread bridge.

    Delegates to ``_setMainLoop``.
    """
    _setMainLoop(loop)
    return None