# gateway/modules/features/automation2/emailPoller.py

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Background email poller for automation2.
Checks paused runs waiting for email (email.checkEmail node) and resumes when a new matching email arrives.
"""
import asyncio
import json
import logging
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
# Job ID for scheduler
POLLER_JOB_ID = "automation2_email_poller"
POLL_INTERVAL_MINUTES = 2
async def _pollEmailWaits(eventUser) -> None:
    """
    Poll for new emails for runs waiting on email.checkEmail.

    Uses eventUser for DB access; loads the owner user for each run to call
    readEmails with that user's credentials. Stops the poller job entirely
    when no runs are waiting anymore.
    """
    try:
        # Imports are deferred so the scheduler job does not pay the import
        # cost (or risk import cycles) at module load time.
        from modules.features.automation2.interfaceFeatureAutomation2 import getAutomation2Interface
        from modules.features.automation2.mainAutomation2 import getAutomation2Services
        from modules.workflows.automation2.executionEngine import executeGraph
        from modules.workflows.processing.shared.methodDiscovery import discoverMethods
        from modules.interfaces.interfaceDbApp import getRootInterface

        root = getRootInterface()
        if not root:
            logger.warning("Email poller: root interface not available")
            return

        # Use eventUser - getRunsWaitingForEmail queries by status only, so an
        # unscoped interface (empty mandate/instance) is sufficient here.
        a2 = getAutomation2Interface(eventUser, mandateId="", featureInstanceId="")
        runs = a2.getRunsWaitingForEmail()
        if not runs:
            # No workflows waiting for email - stop the poller job.
            stop(eventUser)
            return
        logger.info("Automation2 email poller: checking %d run(s) waiting for email", len(runs))

        for run in runs:
            run_id = run.get("id")
            workflow_id = run.get("workflowId")
            context = run.get("context") or {}
            wait_config = context.get("waitConfig") or {}
            node_id = run.get("currentNodeId") or context.get("waitConfig", {}).get("_nodeId")
            owner_id = context.get("ownerId")
            mandate_id = context.get("mandateId")
            instance_id = context.get("instanceId")
            # ISO-8601 UTC timestamp of the last poll (None on first poll).
            last_checked = context.get("lastCheckedAt")
            if not owner_id or not mandate_id or not instance_id or not workflow_id or not node_id:
                logger.warning("Email wait run %s missing ownerId/mandateId/instanceId/workflowId/nodeId - skipping", run_id)
                continue

            # First poll: use pausedAt (or now - 5 min) as baseline so we don't
            # miss emails that arrived between pause and first poll.
            if last_checked is None:
                paused_at = context.get("pausedAt")
                if paused_at:
                    baseline = paused_at
                else:
                    # Fallback: look back 5 minutes for runs created before
                    # the pausedAt field existed.
                    baseline = (datetime.now(timezone.utc) - timedelta(minutes=5)).strftime("%Y-%m-%dT%H:%M:%SZ")
                last_checked = baseline

            # Load owner user (root interface has broad access).
            owner = root.getUser(owner_id) if hasattr(root, "getUser") else None
            if not owner:
                logger.warning("Email wait run %s: owner user %s not found", run_id, owner_id)
                continue

            # Get workflow (need a scoped interface for this mandate/instance).
            a2_scoped = getAutomation2Interface(eventUser, mandateId=mandate_id, featureInstanceId=instance_id)
            wf = a2_scoped.getWorkflow(workflow_id)
            if not wf or not wf.get("graph"):
                logger.warning("Email wait run %s: workflow %s not found or has no graph", run_id, workflow_id)
                continue

            # Only process runs paused at email.checkEmail; email.searchEmail
            # never waits (it searches everything immediately), so a run paused
            # there indicates an inconsistent state and is skipped.
            nodes = (wf.get("graph") or {}).get("nodes") or []
            paused_node = next((n for n in nodes if n.get("id") == node_id), None)
            if paused_node and paused_node.get("type") == "email.searchEmail":
                logger.warning("Email wait run %s: paused at email.searchEmail (should not wait) skipping", run_id)
                continue

            services = getAutomation2Services(owner, mandateId=mandate_id, featureInstanceId=instance_id)
            discoverMethods(services)

            # Build the fetch filter with a receivedDateTime clause so only
            # emails received at or after the baseline (new emails) come back.
            base_filter = wait_config.get("filter") or ""
            dt_filter = f"receivedDateTime ge {last_checked}"
            combined_filter = f"({base_filter}) and {dt_filter}" if base_filter else dt_filter
            logger.debug("Email wait run %s: fetch filter (new emails only) %s", run_id, combined_filter)

            from modules.workflows.processing.core.actionExecutor import ActionExecutor
            executor = ActionExecutor(services)
            params = {
                "connectionReference": wait_config.get("connectionReference"),
                "folder": wait_config.get("folder", "Inbox"),
                # Cap the page size at 50 regardless of the configured limit.
                "limit": min(int(wait_config.get("limit", 10)), 50),
                "filter": combined_filter,
            }
            try:
                result = await executor.executeAction("outlook", "readEmails", params)
            except Exception as e:
                logger.warning("Email wait run %s: readEmails failed: %s", run_id, e)
                continue

            # readEmails always returns 1 document (JSON wrapper); determine the
            # actual email count from metadata, falling back to parsing the body.
            email_count = 0
            if result and result.documents:
                doc = result.documents[0]
                meta = getattr(doc, "validationMetadata", None)
                if not meta and isinstance(doc, dict):
                    meta = doc.get("validationMetadata")
                if meta and isinstance(meta, dict):
                    email_count = int(meta.get("emailCount", 0))
                else:
                    try:
                        data = json.loads(getattr(doc, "documentData", "") or "{}")
                        email_count = len(data.get("emails", {}).get("emails", []))
                    except Exception:
                        pass
            if not result or not result.success or email_count == 0:
                # No new emails - persist lastCheckedAt so the next poll uses
                # this moment as its baseline.
                now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
                ctx = dict(context)
                ctx["lastCheckedAt"] = now_iso
                a2_scoped.updateRun(run_id, context=ctx)
                continue

            # Only pass NEW emails (receivedDateTime >= last_checked) downstream;
            # re-filter here as a safeguard on top of the fetch-time filter.
            doc = result.documents[0]
            raw_data = json.loads(getattr(doc, "documentData", "") or "{}")
            emails_data = raw_data.get("emails", {})
            all_emails = emails_data.get("emails", [])
            new_emails = [
                e for e in all_emails
                if (e.get("receivedDateTime") or "") >= last_checked
            ]
            if not new_emails:
                # Everything fetched was old - persist the new baseline and move on.
                now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
                ctx = dict(context)
                ctx["lastCheckedAt"] = now_iso
                a2_scoped.updateRun(run_id, context=ctx)
                continue

            # Rebuild the document with only the new emails for downstream nodes.
            result_data = dict(raw_data)
            result_data["emails"] = dict(emails_data)
            result_data["emails"]["emails"] = new_emails
            result_data["emails"]["count"] = len(new_emails)
            from modules.datamodels.datamodelChat import ActionDocument
            filtered_doc = ActionDocument(
                documentName=getattr(doc, "documentName", "outlook_emails.json"),
                documentData=json.dumps(result_data, indent=2),
                mimeType=getattr(doc, "mimeType", "application/json"),
                validationMetadata={**(getattr(doc, "validationMetadata") or {}), "emailCount": len(new_emails)},
            )

            # Build the node output in the same format as ActionNodeExecutor
            # produces for readEmails, so downstream nodes see no difference.
            node_output = {
                "success": result.success,
                "error": result.error,
                "documents": [filtered_doc.model_dump() if hasattr(filtered_doc, "model_dump") else filtered_doc],
                "data": result.model_dump() if hasattr(result, "model_dump") else {"success": result.success, "error": result.error},
            }

            # Update lastCheckedAt and mark the run running BEFORE resuming, so
            # a concurrent poll does not pick this run up again.
            now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
            ctx = dict(context)
            ctx["lastCheckedAt"] = now_iso
            a2_scoped.updateRun(run_id, status="running", context=ctx)

            node_outputs = dict(run.get("nodeOutputs") or {})
            node_outputs[node_id] = node_output
            logger.info("Email wait run %s: found new email, resuming from node %s", run_id, node_id)
            resume_result = await executeGraph(
                graph=wf["graph"],
                services=services,
                workflowId=workflow_id,
                instanceId=instance_id,
                userId=owner_id,
                mandateId=mandate_id,
                automation2_interface=a2_scoped,
                initialNodeOutputs=node_outputs,
                startAfterNodeId=node_id,
                runId=run_id,
            )
            if resume_result.get("success"):
                logger.info("Email wait run %s: completed successfully", run_id)
            elif resume_result.get("paused"):
                logger.info("Email wait run %s: paused again (e.g. human task)", run_id)
            else:
                logger.warning("Email wait run %s: failed: %s", run_id, resume_result.get("error"))
    except Exception as e:
        logger.exception("Email poller failed: %s", e)
# Strong references to in-flight poll tasks. asyncio's event loop only keeps a
# weak reference to tasks created with create_task(), so without holding one
# ourselves a scheduled poll could be garbage-collected before it finishes
# (documented pitfall in the asyncio.create_task docs).
_PENDING_POLL_TASKS: set = set()


def _runPollSync(ev_user):
    """Sync job for scheduler - runs async poll. Thread-safe for both main loop and worker threads.

    When called from inside a running event loop, the poll is scheduled as a
    task (non-blocking); when called from a plain worker thread, it runs to
    completion in a fresh event loop.
    """
    try:
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop (worker thread) - run blocking in a new loop.
            asyncio.run(_pollEmailWaits(ev_user))
        else:
            # Already in an event loop - schedule, don't block. Keep a strong
            # reference until the task completes so it is not GC'd mid-run.
            task = loop.create_task(_pollEmailWaits(ev_user))
            _PENDING_POLL_TASKS.add(task)
            task.add_done_callback(_PENDING_POLL_TASKS.discard)
    except Exception as e:
        logger.exception("Automation2 email poller job failed: %s", e)
def ensureRunning(eventUser) -> bool:
    """Ensure the poller is registered, starting it if necessary.

    Called whenever a run pauses on an email.checkEmail node. Delegates to
    start(), which is idempotent with respect to the scheduler job ID.
    """
    started = start(eventUser)
    return started
def start(eventUser) -> bool:
    """Register the email poller interval job.

    Returns True when the job was registered and an immediate first poll was
    kicked off; False when no eventUser is available or registration failed.
    """
    if not eventUser:
        logger.warning("Automation2 email poller: no eventUser, not registering")
        return False
    try:
        from modules.shared.eventManagement import eventManager

        def job_func():
            # Sync wrapper - APScheduler may run jobs in a thread pool where
            # an async callable would not be awaited.
            _runPollSync(eventUser)

        eventManager.registerInterval(
            POLLER_JOB_ID,
            job_func,
            seconds=0,
            minutes=POLL_INTERVAL_MINUTES,
            hours=0,
        )
        logger.info("Automation2 email poller started (interval=%s min)", POLL_INTERVAL_MINUTES)
        # Fire one poll right away instead of waiting a full interval.
        _runPollSync(eventUser)
        return True
    except Exception as e:
        logger.error("Failed to register automation2 email poller: %s", e)
        return False
def stop(eventUser) -> bool:
    """Remove the email poller job.

    Best-effort: always returns True, even when removal fails (e.g. the job
    was never registered); failures are only logged.
    """
    try:
        from modules.shared.eventManagement import eventManager
        eventManager.remove(POLLER_JOB_ID)
        logger.info("Automation2 email poller removed")
    except Exception as e:
        logger.warning("Error removing automation2 email poller: %s", e)
    return True