# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Background email poller for automation2.

Checks paused runs waiting for email (email.checkEmail node) and resumes them
when a new matching email arrives.
"""

import asyncio
import json
import logging
import re
import threading
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)

# Job ID for scheduler
POLLER_JOB_ID = "automation2_email_poller"
POLL_INTERVAL_MINUTES = 2

# Timestamp format used for lastCheckedAt and for the
# "receivedDateTime ge <timestamp>" filter (UTC, second precision).
_ISO_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

# Fractional-seconds suffix of any length; it is dropped before parsing because
# datetime.fromisoformat only accepts 3 or 6 fraction digits before Python 3.11.
_FRACTION_RE = re.compile(r"(\d{2}:\d{2}:\d{2})\.\d+")

# readEmails "limit": value used when unset/invalid, and the hard upper bound.
_DEFAULT_LIMIT = 10
_MAX_LIMIT = 50

# Held for the duration of one poll. Polls can be triggered by the scheduler,
# by the immediate run in start(), and by ensureRunning() (e.g. from a resumed
# run that pauses again); overlapping polls could resume the same run twice.
_POLL_LOCK = threading.Lock()

# Strong references to poll tasks scheduled on an already-running event loop.
# The loop keeps only weak references to tasks, so an unreferenced task can be
# garbage-collected before it finishes.
_BACKGROUND_TASKS: set = set()


def _utcNowIso() -> str:
    """Return the current UTC time formatted with ``_ISO_FORMAT``."""
    return datetime.now(timezone.utc).strftime(_ISO_FORMAT)


def _toUtcIso(value: Any) -> Optional[str]:
    """
    Normalize a timestamp to ``YYYY-MM-DDTHH:MM:SSZ`` in UTC.

    Accepts ``datetime`` objects and ISO-8601 strings with a ``Z`` suffix, an
    explicit offset, or no offset (naive values are treated as UTC).
    Fractional seconds are truncated, which only ever moves the value earlier.

    Returns:
        The normalized string, or None for empty / unparseable input.
    """
    if isinstance(value, datetime):
        parsed = value
    elif isinstance(value, str) and value.strip():
        text = _FRACTION_RE.sub(r"\1", value.strip())
        if text[-1] in "Zz":
            text = text[:-1] + "+00:00"
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None
    else:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc).strftime(_ISO_FORMAT)


def _resolveBaseline(context: Dict[str, Any]) -> str:
    """
    Return the timestamp from which emails count as "new" for a waiting run.

    Order of preference: ``lastCheckedAt`` (written by this poller), then
    ``pausedAt`` (so emails arriving between the pause and the first poll are
    not missed), then five minutes ago for runs created before ``pausedAt``
    was recorded. Values that cannot be parsed are skipped.
    """
    for key in ("lastCheckedAt", "pausedAt"):
        normalized = _toUtcIso(context.get(key))
        if normalized:
            return normalized
    return (datetime.now(timezone.utc) - timedelta(minutes=5)).strftime(_ISO_FORMAT)


def _clampLimit(value: Any, default: int = _DEFAULT_LIMIT, maximum: int = _MAX_LIMIT) -> int:
    """Coerce the configured readEmails limit to an int in ``[1, maximum]``; invalid values use ``default``."""
    try:
        limit = int(value)
    except (TypeError, ValueError, OverflowError):
        limit = default
    return max(1, min(limit, maximum))


def _docField(doc: Any, name: str, default: Any = None) -> Any:
    """Read ``name`` from an ActionDocument-like object or from a plain dict."""
    if isinstance(doc, dict):
        return doc.get(name, default)
    return getattr(doc, name, default)


def _parseEmailDocument(doc: Any) -> Tuple[Dict[str, Any], Dict[str, Any], List[Any]]:
    """
    Parse the JSON wrapper document produced by outlook.readEmails.

    Returns:
        ``(payload, emailsSection, emails)`` where ``emailsSection`` is
        ``payload["emails"]`` and ``emails`` is ``payload["emails"]["emails"]``.
        Missing sections are treated as empty.

    Raises:
        ValueError: documentData is not valid JSON or has an unexpected shape.
        TypeError: documentData is neither str nor bytes.
    """
    payload = json.loads(_docField(doc, "documentData") or "{}")
    if not isinstance(payload, dict):
        raise ValueError("readEmails documentData is not a JSON object")
    section = payload.get("emails") or {}
    if not isinstance(section, dict):
        raise ValueError("readEmails 'emails' section is not a JSON object")
    emails = section.get("emails") or []
    if not isinstance(emails, list):
        raise ValueError("readEmails 'emails.emails' is not a list")
    return payload, section, emails


def _filterNewEmails(emails: List[Any], baseline: str) -> List[Dict[str, Any]]:
    """
    Keep only emails whose ``receivedDateTime`` is at or after ``baseline``.

    The server-side filter already restricts by date; this is a client-side
    safeguard. Timestamps are normalized first so that differing ISO formats do
    not skew the string comparison. Non-dict entries are dropped.
    """
    fresh = []
    for email in emails:
        if not isinstance(email, dict):
            continue
        raw = email.get("receivedDateTime")
        received = _toUtcIso(raw) or (raw if isinstance(raw, str) else "")
        if received >= baseline:
            fresh.append(email)
    return fresh


def _saveLastChecked(a2Scoped, runId, context: Dict[str, Any], checkedAt: str, status: Optional[str] = None) -> None:
    """Persist ``lastCheckedAt`` (and optionally a new status) on a copy of the run context."""
    ctx = dict(context)
    ctx["lastCheckedAt"] = checkedAt
    if status is None:
        a2Scoped.updateRun(runId, context=ctx)
    else:
        a2Scoped.updateRun(runId, status=status, context=ctx)


async def _processRun(run: Dict[str, Any], eventUser, root) -> None:
    """
    Check one waiting run for new email and resume it when any arrived.

    Returns without resuming when the run is misconfigured, its owner or
    workflow cannot be loaded, readEmails fails, or no new email arrived.
    ``lastCheckedAt`` is advanced only after a successful readEmails call, to
    the time the query *started*, so neither a transient mail API failure nor
    an email arriving during the query causes emails to be skipped.
    """
    # Deferred project imports, as elsewhere in this module (presumably to avoid
    # import cycles at module load time).
    from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface as getAutomation2Interface
    from modules.features.graphicalEditor.mainGraphicalEditor import getGraphicalEditorServices as getAutomation2Services
    from modules.workflows.automation2.executionEngine import executeGraph
    from modules.workflows.processing.shared.methodDiscovery import discoverMethods
    from modules.workflows.processing.core.actionExecutor import ActionExecutor
    from modules.datamodels.datamodelChat import ActionDocument

    run_id = run.get("id")
    workflow_id = run.get("workflowId")
    context = run.get("context") or {}
    # "or {}" also covers a stored waitConfig of None.
    wait_config = context.get("waitConfig") or {}
    node_id = run.get("currentNodeId") or wait_config.get("_nodeId")
    owner_id = context.get("ownerId")
    mandate_id = context.get("mandateId")
    instance_id = context.get("instanceId")

    if not owner_id or not mandate_id or not instance_id or not workflow_id or not node_id:
        logger.warning("Email wait run %s missing ownerId/mandateId/instanceId/workflowId/nodeId - skipping", run_id)
        return

    baseline = _resolveBaseline(context)

    # Load owner user (root interface has broad access)
    owner = root.getUser(owner_id) if hasattr(root, "getUser") else None
    if not owner:
        logger.warning("Email wait run %s: owner user %s not found", run_id, owner_id)
        return

    # Get workflow (need scoped interface for mandate/instance)
    a2_scoped = getAutomation2Interface(eventUser, mandateId=mandate_id, featureInstanceId=instance_id)
    wf = a2_scoped.getWorkflow(workflow_id)
    if not wf or not wf.get("graph"):
        logger.warning("Email wait run %s: workflow %s not found or has no graph", run_id, workflow_id)
        return

    # Only process runs paused at email.checkEmail – searchEmail never waits, it searches all immediately
    nodes = (wf.get("graph") or {}).get("nodes") or []
    paused_node = next((n for n in nodes if n.get("id") == node_id), None)
    if paused_node and paused_node.get("type") == "email.searchEmail":
        logger.warning("Email wait run %s: paused at email.searchEmail (should not wait) – skipping", run_id)
        return

    services = getAutomation2Services(owner, mandateId=mandate_id, featureInstanceId=instance_id)
    discoverMethods(services)

    # Only emails received at or after the baseline count as new.
    base_filter = wait_config.get("filter") or ""
    dt_filter = f"receivedDateTime ge {baseline}"
    combined_filter = f"({base_filter}) and {dt_filter}" if base_filter else dt_filter
    logger.debug("Email wait run %s: fetch filter (new emails only) %s", run_id, combined_filter)

    executor = ActionExecutor(services)
    params = {
        "connectionReference": wait_config.get("connectionReference"),
        "folder": wait_config.get("folder", "Inbox"),
        "limit": _clampLimit(wait_config.get("limit", _DEFAULT_LIMIT)),
        "filter": combined_filter,
    }

    # Captured before the query so emails arriving while it runs fall into the next window.
    poll_started = _utcNowIso()
    try:
        result = await executor.executeAction("outlook", "readEmails", params)
    except Exception as e:
        logger.warning("Email wait run %s: readEmails failed: %s", run_id, e)
        return
    if not result or not result.success:
        # Keep the old baseline: advancing it here would skip any email that
        # arrived in this window.
        logger.warning("Email wait run %s: readEmails unsuccessful: %s", run_id, getattr(result, "error", None))
        return

    documents = getattr(result, "documents", None) or []
    if not documents:
        _saveLastChecked(a2_scoped, run_id, context, poll_started)
        return

    # readEmails returns one JSON wrapper document holding the email list.
    doc = documents[0]
    try:
        raw_data, emails_section, all_emails = _parseEmailDocument(doc)
    except (TypeError, ValueError) as e:
        logger.warning("Email wait run %s: could not parse readEmails output: %s", run_id, e)
        return

    new_emails = _filterNewEmails(all_emails, baseline)
    if not new_emails:
        _saveLastChecked(a2_scoped, run_id, context, poll_started)
        return

    # Rebuild the document with only new emails for downstream nodes.
    result_data = dict(raw_data)
    result_data["emails"] = {**emails_section, "emails": new_emails, "count": len(new_emails)}
    meta = _docField(doc, "validationMetadata")
    filtered_doc = ActionDocument(
        documentName=_docField(doc, "documentName") or "outlook_emails.json",
        documentData=json.dumps(result_data, indent=2),
        mimeType=_docField(doc, "mimeType") or "application/json",
        validationMetadata={**(meta if isinstance(meta, dict) else {}), "emailCount": len(new_emails)},
    )

    # Build node output in same format as ActionNodeExecutor for readEmails
    node_output = {
        "success": result.success,
        "error": result.error,
        "documents": [filtered_doc.model_dump() if hasattr(filtered_doc, "model_dump") else filtered_doc],
        "data": result.model_dump() if hasattr(result, "model_dump") else {"success": result.success, "error": result.error},
    }

    _saveLastChecked(a2_scoped, run_id, context, poll_started, status="running")

    node_outputs = dict(run.get("nodeOutputs") or {})
    node_outputs[node_id] = node_output

    logger.info("Email wait run %s: found new email, resuming from node %s", run_id, node_id)
    # NOTE(review): if executeGraph raises, the run is left with status "running";
    # since getRunsWaitingForEmail queries by status it is presumably not polled
    # again - confirm whether the engine resets the status on failure.
    resume_result = await executeGraph(
        graph=wf["graph"],
        services=services,
        workflowId=workflow_id,
        instanceId=instance_id,
        userId=owner_id,
        mandateId=mandate_id,
        automation2_interface=a2_scoped,
        initialNodeOutputs=node_outputs,
        startAfterNodeId=node_id,
        runId=run_id,
    ) or {}

    if resume_result.get("success"):
        logger.info("Email wait run %s: completed successfully", run_id)
    elif resume_result.get("paused"):
        logger.info("Email wait run %s: paused again (e.g. human task)", run_id)
    else:
        logger.warning("Email wait run %s: failed: %s", run_id, resume_result.get("error"))


async def _pollOnce(eventUser) -> None:
    """
    Run one poll over all runs waiting for email.

    Stops the poller when no runs are waiting. Each run is processed in
    isolation so one failing run does not prevent the others from being checked.
    """
    from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface as getAutomation2Interface
    from modules.interfaces.interfaceDbApp import getRootInterface

    root = getRootInterface()
    if not root:
        logger.warning("Email poller: root interface not available")
        return

    # Use eventUser - getRunsWaitingForEmail queries by status only
    a2 = getAutomation2Interface(eventUser, mandateId="", featureInstanceId="")
    runs = a2.getRunsWaitingForEmail()
    if not runs:
        # No workflows waiting for email - stop the poller
        stop(eventUser)
        # A run may have paused (and registered the job via ensureRunning) between
        # the query above and stop(); re-check so it is not left without a poller.
        if a2.getRunsWaitingForEmail():
            start(eventUser)
        return

    logger.info("Automation2 email poller: checking %d run(s) waiting for email", len(runs))
    for run in runs:
        run_id = run.get("id") if isinstance(run, dict) else None
        try:
            await _processRun(run, eventUser, root)
        except Exception as e:
            logger.exception("Email wait run %s: processing failed: %s", run_id, e)


async def _pollEmailWaits(eventUser) -> None:
    """
    Poll for new emails for runs waiting on email.checkEmail.

    Uses eventUser for DB access and loads the owner user of each run to call
    readEmails. Stops the poller when no runs are waiting. If another poll is
    still in progress this call returns immediately instead of overlapping it.
    Errors are logged, never raised.
    """
    if not _POLL_LOCK.acquire(blocking=False):
        logger.debug("Automation2 email poller: previous poll still in progress - skipping")
        return
    try:
        await _pollOnce(eventUser)
    except Exception as e:
        logger.exception("Email poller failed: %s", e)
    finally:
        _POLL_LOCK.release()


def _runPollSync(ev_user):
    """
    Sync entry point for the scheduler that runs one async poll.

    Inside a running event loop the poll is scheduled as a task (a reference is
    kept until it finishes) so the loop is not blocked; without a running loop
    (e.g. a scheduler worker thread) it runs to completion in a new loop.
    """
    try:
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop (worker thread) - run in new loop
            asyncio.run(_pollEmailWaits(ev_user))
            return
        # Already in event loop - schedule, don't block
        task = loop.create_task(_pollEmailWaits(ev_user))
        _BACKGROUND_TASKS.add(task)
        task.add_done_callback(_BACKGROUND_TASKS.discard)
    except Exception as e:
        logger.exception("Automation2 email poller job failed: %s", e)


def ensureRunning(eventUser) -> bool:
    """Start the poller if not already running. Called when a run pauses for email.checkEmail."""
    return start(eventUser)


def start(eventUser) -> bool:
    """
    Register the email poller interval job and run one poll immediately.

    Returns:
        True when the job was registered, False when eventUser is missing or
        registration failed.
    """
    if not eventUser:
        logger.warning("Automation2 email poller: no eventUser, not registering")
        return False
    try:
        from modules.shared.eventManagement import eventManager

        # Use sync wrapper - APScheduler may run jobs in thread pool where async doesn't work
        def job_func():
            _runPollSync(eventUser)

        eventManager.registerInterval(
            POLLER_JOB_ID,
            job_func,
            seconds=0,
            minutes=POLL_INTERVAL_MINUTES,
            hours=0,
        )
        logger.info("Automation2 email poller started (interval=%s min)", POLL_INTERVAL_MINUTES)
        # Run once immediately so we don't wait 2 minutes for the first check
        _runPollSync(eventUser)
        return True
    except Exception as e:
        logger.error("Failed to register automation2 email poller: %s", e)
        return False


def stop(eventUser) -> bool:
    """
    Remove the email poller job.

    Always returns True; errors raised while removing the job are logged as
    warnings. ``eventUser`` is accepted for interface symmetry and not used.
    """
    try:
        from modules.shared.eventManagement import eventManager

        eventManager.remove(POLLER_JOB_ID)
        logger.info("Automation2 email poller removed")
        return True
    except Exception as e:
        logger.warning("Error removing automation2 email poller: %s", e)
        return True