459 lines
18 KiB
Python
459 lines
18 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Consolidated Workflow Scheduler.
|
|
Replaces subAutomation2Schedule with v1-style incremental sync patterns:
|
|
- eventId tracking on AutoWorkflow for change detection
|
|
- replaceExisting=True for idempotent re-registration
|
|
- active check before execution
|
|
- Capped execution log
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Any, Dict
|
|
|
|
from modules.shared.eventManagement import eventManager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_main_loop = None
|
|
|
|
JOB_ID_PREFIX = "graphicalEditor."
|
|
_CALLBACK_NAME = "graphicalEditor.workflow.changed"
|
|
|
|
|
|
def _setMainLoop(loop) -> None:
|
|
global _main_loop
|
|
_main_loop = loop
|
|
|
|
|
|
class WorkflowScheduler:
|
|
"""Consolidated scheduler with v1 incremental sync patterns."""
|
|
|
|
def __init__(self):
|
|
self._eventUser = None
|
|
self._registered: Dict[str, str] = {}
|
|
|
|
def start(self, eventUser) -> bool:
|
|
"""Start scheduler: sync workflows, register callback for changes."""
|
|
if not eventUser:
|
|
logger.warning("WorkflowScheduler: No event user provided, skipping")
|
|
return False
|
|
|
|
self._eventUser = eventUser
|
|
|
|
try:
|
|
eventManager.start()
|
|
self._syncScheduledWorkflows()
|
|
logger.info("WorkflowScheduler: initial sync complete")
|
|
|
|
self._delayedSync()
|
|
|
|
from modules.shared.callbackRegistry import callbackRegistry
|
|
callbackRegistry.register(_CALLBACK_NAME, self._onWorkflowChanged)
|
|
logger.info("WorkflowScheduler: callback registered for %s", _CALLBACK_NAME)
|
|
except Exception as e:
|
|
logger.error("WorkflowScheduler: Failed to start: %s", e)
|
|
return False
|
|
|
|
return True
|
|
|
|
def stop(self) -> bool:
|
|
"""Remove all scheduled workflow jobs."""
|
|
try:
|
|
self._removeAllJobs()
|
|
logger.info("WorkflowScheduler: all jobs removed")
|
|
except Exception as e:
|
|
logger.warning("WorkflowScheduler: error during stop: %s", e)
|
|
return True
|
|
|
|
def _syncScheduledWorkflows(self) -> Dict[str, Any]:
|
|
"""
|
|
Incremental sync: only re-register jobs whose eventId has changed.
|
|
Uses AutoWorkflow.eventId for change detection (v1 pattern).
|
|
"""
|
|
from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getAllWorkflowsForScheduling
|
|
from modules.workflows.automation2.scheduleCron import parse_cron_to_kwargs
|
|
|
|
items = getAllWorkflowsForScheduling()
|
|
logger.info("WorkflowScheduler: found %d workflow(s) with trigger.schedule+cron", len(items))
|
|
|
|
newRegistered: Dict[str, str] = {}
|
|
activeWorkflowIds = set()
|
|
|
|
for item in items:
|
|
workflowId = item.get("workflowId")
|
|
if not workflowId:
|
|
continue
|
|
|
|
activeWorkflowIds.add(workflowId)
|
|
cron = item.get("cron")
|
|
mandateId = item.get("mandateId")
|
|
instanceId = item.get("featureInstanceId")
|
|
|
|
if not instanceId or not cron:
|
|
continue
|
|
|
|
jobId = f"{JOB_ID_PREFIX}{workflowId}"
|
|
entryPointId = item.get("entryPointId")
|
|
workflow = item.get("workflow") or {}
|
|
|
|
asyncHandler = self._createHandler(
|
|
workflowId=workflowId,
|
|
mandateId=mandateId,
|
|
instanceId=instanceId,
|
|
entryPointId=entryPointId,
|
|
workflow=workflow,
|
|
)
|
|
|
|
def _makeSyncWrapper(handler):
|
|
def syncWrapper():
|
|
loop = _main_loop
|
|
if loop and loop.is_running():
|
|
loop.call_soon_threadsafe(
|
|
lambda: asyncio.ensure_future(handler(), loop=loop)
|
|
)
|
|
else:
|
|
try:
|
|
asyncio.run(handler())
|
|
except RuntimeError:
|
|
logger.warning("WorkflowScheduler: could not run handler, no event loop")
|
|
return syncWrapper
|
|
|
|
syncWrapper = _makeSyncWrapper(asyncHandler)
|
|
|
|
intervalSeconds = _cronToIntervalSeconds(cron)
|
|
if intervalSeconds is not None:
|
|
eventManager.registerInterval(
|
|
jobId=jobId,
|
|
func=syncWrapper,
|
|
seconds=intervalSeconds,
|
|
replaceExisting=True,
|
|
)
|
|
else:
|
|
try:
|
|
cronKwargs = parse_cron_to_kwargs(cron)
|
|
eventManager.registerCron(
|
|
jobId=jobId,
|
|
func=syncWrapper,
|
|
cronKwargs=cronKwargs,
|
|
replaceExisting=True,
|
|
)
|
|
except ValueError as e:
|
|
logger.warning("Workflow %s: invalid cron %r: %s", workflowId, cron, e)
|
|
continue
|
|
|
|
newRegistered[workflowId] = jobId
|
|
mode = "interval" if intervalSeconds is not None else "cron"
|
|
logger.info(
|
|
"WorkflowScheduler: registered %s for workflow %s (%s=%s)",
|
|
jobId, workflowId, mode,
|
|
intervalSeconds if intervalSeconds is not None else cron,
|
|
)
|
|
|
|
self._updateEventId(workflow, workflowId, jobId)
|
|
|
|
staleIds = set(self._registered.keys()) - activeWorkflowIds
|
|
for wfId in staleIds:
|
|
oldJobId = self._registered[wfId]
|
|
try:
|
|
eventManager.remove(oldJobId)
|
|
logger.info("WorkflowScheduler: removed stale job %s", oldJobId)
|
|
except Exception:
|
|
pass
|
|
|
|
self._registered = newRegistered
|
|
return {"synced": len(newRegistered), "workflowsFound": len(items)}
|
|
|
|
def _updateEventId(self, workflow: Dict, workflowId: str, jobId: str) -> None:
|
|
"""Update AutoWorkflow.eventId for incremental sync tracking (v1 pattern)."""
|
|
currentEventId = workflow.get("eventId")
|
|
if currentEventId != jobId:
|
|
try:
|
|
from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface
|
|
from modules.interfaces.interfaceDbApp import getRootInterface
|
|
root = getRootInterface()
|
|
eventUser = root.getUserByUsername("event") if root else self._eventUser
|
|
if not eventUser:
|
|
return
|
|
mandateId = workflow.get("mandateId", "")
|
|
instanceId = workflow.get("featureInstanceId", "")
|
|
iface = getGraphicalEditorInterface(eventUser, mandateId, instanceId)
|
|
iface.updateWorkflow(workflowId, {"eventId": jobId})
|
|
except Exception as e:
|
|
logger.debug("WorkflowScheduler: could not update eventId for %s: %s", workflowId, e)
|
|
|
|
def _createHandler(
|
|
self,
|
|
workflowId: str,
|
|
mandateId: str,
|
|
instanceId: str,
|
|
entryPointId: str,
|
|
workflow: Dict[str, Any],
|
|
):
|
|
"""Create async handler for scheduled workflow execution with active-check."""
|
|
eventUser = self._eventUser
|
|
|
|
async def handler():
|
|
logger.info("WorkflowScheduler: CRON FIRED for workflow %s", workflowId)
|
|
try:
|
|
if not eventUser:
|
|
logger.error("WorkflowScheduler: event user not available")
|
|
return
|
|
|
|
from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface
|
|
from modules.features.graphicalEditor.mainGraphicalEditor import getGraphicalEditorServices
|
|
from modules.workflows.automation2.executionEngine import executeGraph
|
|
from modules.workflows.processing.shared.methodDiscovery import discoverMethods
|
|
from modules.features.graphicalEditor.entryPoints import find_invocation
|
|
from modules.workflows.automation2.runEnvelope import default_run_envelope, normalize_run_envelope
|
|
|
|
iface = getGraphicalEditorInterface(eventUser, mandateId, instanceId)
|
|
wf = iface.getWorkflow(workflowId)
|
|
if not wf or not wf.get("graph"):
|
|
logger.warning("WorkflowScheduler: workflow %s not found or no graph", workflowId)
|
|
return
|
|
if not wf.get("active", True):
|
|
logger.info("WorkflowScheduler: workflow %s inactive, skipping", workflowId)
|
|
return
|
|
|
|
inv = find_invocation(wf, entryPointId)
|
|
if inv and (inv.get("kind") != "schedule" or not inv.get("enabled", True)):
|
|
logger.info("WorkflowScheduler: entry point %s disabled for workflow %s", entryPointId, workflowId)
|
|
return
|
|
|
|
services = getGraphicalEditorServices(
|
|
eventUser,
|
|
mandateId=mandateId,
|
|
featureInstanceId=instanceId,
|
|
)
|
|
discoverMethods(services)
|
|
|
|
title = (inv or {}).get("title") or {}
|
|
label = ""
|
|
if isinstance(title, dict):
|
|
label = title.get("en") or title.get("de") or ""
|
|
elif isinstance(title, str):
|
|
label = title
|
|
|
|
runEnv = default_run_envelope(
|
|
"schedule",
|
|
entry_point_id=entryPointId,
|
|
entry_point_label=label or None,
|
|
)
|
|
runEnv = normalize_run_envelope(runEnv, user_id=str(eventUser.id) if eventUser else None)
|
|
|
|
result = await executeGraph(
|
|
graph=wf["graph"],
|
|
services=services,
|
|
workflowId=workflowId,
|
|
instanceId=instanceId,
|
|
userId=None,
|
|
mandateId=mandateId,
|
|
automation2_interface=iface,
|
|
run_envelope=runEnv,
|
|
)
|
|
logger.info(
|
|
"WorkflowScheduler: executed workflow %s success=%s paused=%s",
|
|
workflowId, result.get("success"), result.get("paused"),
|
|
)
|
|
except Exception as e:
|
|
logger.exception("WorkflowScheduler: failed to execute workflow %s: %s", workflowId, e)
|
|
|
|
return handler
|
|
|
|
def _delayedSync(self) -> None:
|
|
"""Delayed sync (5s) in case DB was not ready at startup."""
|
|
import threading
|
|
|
|
eventUser = self._eventUser
|
|
|
|
def _run():
|
|
import time
|
|
time.sleep(5)
|
|
try:
|
|
self._syncScheduledWorkflows()
|
|
logger.info("WorkflowScheduler: delayed sync done")
|
|
except Exception as e:
|
|
logger.warning("WorkflowScheduler: delayed sync failed: %s", e)
|
|
|
|
t = threading.Thread(target=_run, daemon=True)
|
|
t.start()
|
|
|
|
def _onWorkflowChanged(self, _context=None) -> None:
|
|
"""Callback when a workflow is created/updated/deleted."""
|
|
try:
|
|
self._syncScheduledWorkflows()
|
|
logger.debug("WorkflowScheduler: re-synced after workflow change")
|
|
except Exception as e:
|
|
logger.warning("WorkflowScheduler: re-sync failed: %s", e)
|
|
|
|
def _removeAllJobs(self) -> None:
|
|
"""Remove all registered workflow schedule jobs."""
|
|
if not eventManager.scheduler:
|
|
return
|
|
for job in list(eventManager.scheduler.get_jobs()):
|
|
jid = job.id if hasattr(job, "id") else str(job)
|
|
if jid.startswith(JOB_ID_PREFIX):
|
|
try:
|
|
eventManager.remove(jid)
|
|
except Exception as e:
|
|
logger.debug("Could not remove job %s: %s", jid, e)
|
|
|
|
|
|
def _cronToIntervalSeconds(cron: str):
|
|
"""If cron represents a simple interval, return seconds. Otherwise None."""
|
|
if not cron or not isinstance(cron, str):
|
|
return None
|
|
parts = cron.strip().split()
|
|
if len(parts) == 5:
|
|
minute, hour, day, month, dow = parts
|
|
second = "0"
|
|
elif len(parts) == 6:
|
|
second, minute, hour, day, month, dow = parts
|
|
else:
|
|
return None
|
|
if minute.startswith("*/") and hour == "*" and day == "*" and month == "*" and dow == "*":
|
|
n = int(minute[2:]) if minute[2:].isdigit() else 0
|
|
if n > 0:
|
|
return n * 60
|
|
if minute == "*" and hour == "*" and day == "*" and month == "*" and dow == "*" and second == "0":
|
|
return 60
|
|
if minute == "0" and hour.startswith("*/") and day == "*" and month == "*" and dow == "*":
|
|
n = int(hour[2:]) if hour[2:].isdigit() else 0
|
|
if n > 0:
|
|
return n * 3600
|
|
if len(parts) == 6 and second.startswith("*/") and minute == "*" and hour == "*" and day == "*" and month == "*" and dow in ("*", "?"):
|
|
n = int(second[2:]) if second[2:].isdigit() else 0
|
|
if n > 0:
|
|
return n
|
|
return None
|
|
|
|
|
|
def _notifyRunFailed(workflowId: str, runId: str, error: str, mandateId: str = None, workflowLabel: str = None) -> None:
|
|
"""Notify on workflow run failure: emit event, create in-app notification, trigger email subscription."""
|
|
try:
|
|
eventManager.emit("graphicalEditor.run.failed", {
|
|
"workflowId": workflowId,
|
|
"runId": runId,
|
|
"error": error,
|
|
"mandateId": mandateId,
|
|
})
|
|
logger.info("Emitted run.failed event for run %s (workflow %s)", runId, workflowId)
|
|
except Exception as e:
|
|
logger.warning("Failed to emit run.failed event: %s", e)
|
|
|
|
_createRunFailedNotification(workflowId, runId, error, mandateId, workflowLabel)
|
|
_triggerRunFailedSubscription(workflowId, runId, error, mandateId, workflowLabel)
|
|
|
|
|
|
def _createRunFailedNotification(
|
|
workflowId: str, runId: str, error: str, mandateId: str = None, workflowLabel: str = None
|
|
) -> None:
|
|
"""Create in-app notification for the workflow creator."""
|
|
try:
|
|
from modules.interfaces.interfaceDbApp import getRootInterface
|
|
from modules.datamodels.datamodelNotification import UserNotification, NotificationType, NotificationStatus
|
|
|
|
rootInterface = getRootInterface()
|
|
if not rootInterface:
|
|
return
|
|
|
|
from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface
|
|
eventUser = rootInterface.getUserByUsername("event")
|
|
if not eventUser:
|
|
return
|
|
|
|
iface = getGraphicalEditorInterface(eventUser, mandateId or "", "")
|
|
wf = iface.getWorkflow(workflowId)
|
|
if not wf:
|
|
return
|
|
|
|
creatorId = wf.get("sysCreatedBy") if isinstance(wf, dict) else getattr(wf, "sysCreatedBy", None)
|
|
if not creatorId:
|
|
return
|
|
|
|
label = workflowLabel or (wf.get("label") if isinstance(wf, dict) else getattr(wf, "label", ""))
|
|
notification = UserNotification(
|
|
userId=creatorId,
|
|
type=NotificationType.SYSTEM,
|
|
status=NotificationStatus.UNREAD,
|
|
title="Workflow fehlgeschlagen",
|
|
message=f"Workflow '{label or workflowId}' ist fehlgeschlagen: {error[:200]}",
|
|
referenceType="AutoRun",
|
|
referenceId=runId,
|
|
icon="alert-triangle",
|
|
)
|
|
rootInterface.db.recordCreate(
|
|
model_class=UserNotification,
|
|
record=notification.model_dump(),
|
|
)
|
|
logger.info("Created in-app notification for user %s (run %s)", creatorId, runId)
|
|
except Exception as e:
|
|
logger.warning("Failed to create in-app run.failed notification: %s", e)
|
|
|
|
|
|
def _triggerRunFailedSubscription(
|
|
workflowId: str, runId: str, error: str, mandateId: str = None, workflowLabel: str = None
|
|
) -> None:
|
|
"""Trigger the messaging subscription for run failures (email notifications)."""
|
|
try:
|
|
from modules.serviceCenter import getService
|
|
from modules.serviceCenter.context import ServiceCenterContext
|
|
from modules.interfaces.interfaceDbApp import getRootInterface
|
|
from modules.datamodels.datamodelMessaging import MessagingEventParameters
|
|
|
|
rootInterface = getRootInterface()
|
|
if not rootInterface:
|
|
return
|
|
eventUser = rootInterface.getUserByUsername("event")
|
|
if not eventUser:
|
|
return
|
|
|
|
ctx = ServiceCenterContext(
|
|
user=eventUser,
|
|
mandate_id=mandateId or "",
|
|
feature_instance_id="",
|
|
feature_code="graphicalEditor",
|
|
)
|
|
messagingService = getService("messaging", ctx)
|
|
|
|
subscriptionId = "GraphicalEditorRunFailed"
|
|
eventParams = MessagingEventParameters(triggerData={
|
|
"workflowId": workflowId,
|
|
"workflowLabel": workflowLabel or workflowId,
|
|
"runId": runId,
|
|
"error": error,
|
|
"mandateId": mandateId or "",
|
|
})
|
|
result = messagingService.executeSubscription(subscriptionId, eventParams)
|
|
logger.info(
|
|
"Triggered run.failed subscription: sent=%d success=%s",
|
|
result.messagesSent, result.success,
|
|
)
|
|
except FileNotFoundError:
|
|
logger.debug("Subscription function GraphicalEditorRunFailed not found (not yet registered)")
|
|
except ValueError as e:
|
|
logger.debug("Subscription GraphicalEditorRunFailed: %s", e)
|
|
except Exception as e:
|
|
logger.warning("Failed to trigger run.failed subscription: %s", e)
|
|
|
|
|
|
# Module-level singleton
|
|
_scheduler = WorkflowScheduler()
|
|
|
|
|
|
def start(eventUser) -> bool:
|
|
"""Start the consolidated workflow scheduler."""
|
|
return _scheduler.start(eventUser)
|
|
|
|
|
|
def stop() -> bool:
|
|
"""Stop the consolidated workflow scheduler."""
|
|
return _scheduler.stop()
|
|
|
|
|
|
def setMainLoop(loop) -> None:
|
|
"""Set the main event loop for thread-bridge."""
|
|
_setMainLoop(loop)
|