# gateway/modules/features/automation/mainAutomation.py
#
# Listing metadata: 287 lines, 12 KiB, Python

"""
Main automation service - handles automation workflow execution and scheduling.
Moved from interfaces/interfaceDbChatObjects.py to follow proper architectural separation.
"""
import logging
import json
from typing import Dict, Any
from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, WorkflowModeEnum, AutomationDefinition
from modules.shared.timeUtils import getUtcTimestamp
from modules.shared.eventManagement import eventManager
from modules.services import getInterface as getServices
from modules.features.chatPlayground.mainChatPlayground import chatStart
from .subAutomationUtils import parseScheduleToCron, planToPrompt, replacePlaceholders
logger = logging.getLogger(__name__)
# Maximum number of execution log entries kept per automation
_MAX_EXECUTION_LOGS = 50


def _appendExecutionLog(chatInterface, automationId: str, automation, executionLog: Dict[str, Any]) -> None:
    """Append one execution log entry to an automation and persist the capped history.

    Args:
        chatInterface: Interface whose ``db.recordModify`` is used to persist the logs
        automationId: ID of the automation record to update
        automation: Loaded automation record (read via ``.get("executionLogs")``)
        executionLog: Log entry of the execution that just finished or failed
    """
    # The stored value may be missing or None; copy it so the loaded record
    # is not mutated in place.
    executionLogs = list(automation.get("executionLogs") or [])
    executionLogs.append(executionLog)
    # Keep only the most recent entries
    executionLogs = executionLogs[-_MAX_EXECUTION_LOGS:]
    chatInterface.db.recordModify(
        AutomationDefinition,
        automationId,
        {"executionLogs": executionLogs}
    )


async def executeAutomation(automationId: str, chatInterface) -> ChatWorkflow:
    """Execute an automation workflow immediately with placeholder replacement.

    The automation template is turned into a plan by replacing its placeholders,
    the plan is embedded into the prompt of a UserInputRequest, and the workflow
    is started via chatStart as the user stored in the automation's
    ``_createdBy`` field. An execution log entry is appended to the automation
    on success and (best effort) on failure.

    Args:
        automationId: ID of automation to execute
        chatInterface: ChatObjects interface instance for data access

    Returns:
        ChatWorkflow instance returned by ``chatInterface.updateWorkflow`` after
        the workflow was started and renamed

    Raises:
        ValueError: If the automation is not found, the generated plan is not
            valid JSON, the automation has no creator, or the creator user
            cannot be loaded
        Exception: Any other error is recorded in the execution log and re-raised
    """
    executionStartTime = getUtcTimestamp()
    executionLog = {
        "timestamp": executionStartTime,
        "workflowId": None,
        "status": "running",
        "messages": []
    }
    try:
        # 1. Load automation definition
        automation = chatInterface.getAutomationDefinition(automationId)
        if not automation:
            raise ValueError(f"Automation {automationId} not found")
        executionLog["messages"].append(f"Started execution at {executionStartTime}")

        # 2. Replace placeholders in template to generate plan
        template = automation.get("template", "")
        placeholders = automation.get("placeholders", {})
        planJson = replacePlaceholders(template, placeholders)
        try:
            plan = json.loads(planJson)
        except json.JSONDecodeError as e:
            # Log as much context as possible: the failure usually originates
            # from a placeholder value that breaks the JSON structure.
            logger.error(f"Failed to parse plan JSON after placeholder replacement: {str(e)}")
            logger.error(f"Template: {template[:500]}...")
            logger.error(f"Placeholders: {placeholders}")
            logger.error(f"Generated planJson (first 1000 chars): {planJson[:1000]}")
            logger.error(f"Error position: line {e.lineno}, column {e.colno}, char {e.pos}")
            if e.pos:
                start = max(0, e.pos - 100)
                end = min(len(planJson), e.pos + 100)
                logger.error(f"Context around error: ...{planJson[start:end]}...")
            # Chain the original decode error so the traceback keeps the root cause
            raise ValueError(f"Invalid JSON after placeholder replacement: {str(e)}") from e
        executionLog["messages"].append("Template placeholders replaced successfully")

        # 3. Get user who created automation
        creatorUserId = automation.get("_createdBy")
        # CRITICAL: Automation MUST run as creator user only, or fail
        if not creatorUserId:
            errorMsg = f"Automation {automationId} has no creator user (_createdBy field missing). Cannot execute automation."
            logger.error(errorMsg)
            executionLog["messages"].append(errorMsg)
            raise ValueError(errorMsg)
        # Get user from database using services
        services = getServices(chatInterface.currentUser, None)
        creatorUser = services.interfaceDbApp.getUser(creatorUserId)
        if not creatorUser:
            raise ValueError(f"Creator user {creatorUserId} not found")
        executionLog["messages"].append(f"Using creator user: {creatorUserId}")

        # 4. Create UserInputRequest from plan
        promptText = planToPrompt(plan)
        planJsonStr = json.dumps(plan)
        # Embed plan between marker comments so TemplateMode can extract it
        promptWithPlan = f"{promptText}\n\n<!--TEMPLATE_PLAN_START-->\n{planJsonStr}\n<!--TEMPLATE_PLAN_END-->"
        userInput = UserInputRequest(
            prompt=promptWithPlan,
            listFileId=[],
            userLanguage=creatorUser.language or "en"
        )
        executionLog["messages"].append("Starting workflow execution")

        # 5. Start workflow using chatStart
        workflow = await chatStart(
            currentUser=creatorUser,
            userInput=userInput,
            workflowMode=WorkflowModeEnum.WORKFLOW_AUTOMATION,
            workflowId=None
        )
        executionLog["workflowId"] = workflow.id
        executionLog["status"] = "completed"
        executionLog["messages"].append(f"Workflow {workflow.id} started successfully")
        logger.info(f"Started workflow {workflow.id} with plan containing {len(plan.get('tasks', []))} tasks (plan embedded in userInput)")

        # Set workflow name with "automated" prefix
        automationLabel = automation.get("label", "Unknown Automation")
        workflowName = f"automated: {automationLabel}"
        workflow = chatInterface.updateWorkflow(workflow.id, {"name": workflowName})
        logger.info(f"Set workflow {workflow.id} name to: {workflowName}")

        # Update automation with execution log
        _appendExecutionLog(chatInterface, automationId, automation, executionLog)
        return workflow
    except Exception as e:
        # Record the failure in the execution log
        executionLog["status"] = "error"
        executionLog["messages"].append(f"Error: {str(e)}")
        # Persist the log even on error; reload the automation because the
        # failure may have happened before (or while) it was loaded above.
        try:
            automation = chatInterface.getAutomationDefinition(automationId)
            if automation:
                _appendExecutionLog(chatInterface, automationId, automation, executionLog)
        except Exception as logError:
            # Best effort only: never mask the original error with a logging failure
            logger.error(f"Error saving execution log: {str(logError)}")
        raise
async def syncAutomationEvents(chatInterface, eventUser) -> Dict[str, Any]:
    """Automation event handler - syncs scheduler with all active automations.

    For every automation returned by the access filter:
      * inactive automations have their registered event removed (if any) and
        their ``eventId`` cleared, regardless of whether their schedule is
        present or parseable;
      * active automations without a schedule are skipped with a warning;
      * active automations with a schedule get a cron job registered under the
        id ``automation.<automationId>`` (replacing any previous job).

    Errors for a single automation are logged and do not abort the sync of the
    remaining automations.

    Args:
        chatInterface: ChatObjects interface instance for data access
        eventUser: System-level event user passed on to the created event handlers

    Returns:
        Dictionary with sync results: ``synced`` (number of registered events)
        and ``events`` (mapping of automation ID to event ID)
    """
    # Get all automation definitions and pass them through the interface's
    # filter (presumably user access management - confirm against _uam)
    allAutomations = chatInterface.db.getRecordset(AutomationDefinition)
    filtered = chatInterface._uam(AutomationDefinition, allAutomations)
    registeredEvents = {}
    for automation in filtered:
        automationId = automation.get("id")
        isActive = automation.get("active", False)
        currentEventId = automation.get("eventId")
        schedule = automation.get("schedule")
        try:
            if not isActive:
                # Deactivated automations must lose their scheduler event even
                # when the schedule is missing or can no longer be parsed.
                if currentEventId:
                    try:
                        eventManager.remove(currentEventId)
                        chatInterface.db.recordModify(
                            AutomationDefinition,
                            automationId,
                            {"eventId": None}
                        )
                    except Exception as e:
                        logger.warning(f"Error removing event {currentEventId}: {str(e)}")
                continue

            if not schedule:
                logger.warning(f"Automation {automationId} has no schedule, skipping")
                continue

            # Parse schedule to cron kwargs before touching the existing event,
            # so an invalid schedule does not unregister a working job
            cronKwargs = parseScheduleToCron(schedule)

            # Remove existing event if present (handles schedule changes)
            if currentEventId:
                try:
                    eventManager.remove(currentEventId)
                except Exception as e:
                    logger.warning(f"Error removing old event {currentEventId}: {str(e)}")

            # Register new event
            newEventId = f"automation.{automationId}"
            handler = createAutomationEventHandler(automationId, eventUser)
            eventManager.registerCron(
                jobId=newEventId,
                func=handler,
                cronKwargs=cronKwargs,
                replaceExisting=True
            )

            # Update automation with new eventId only when it actually changed
            if currentEventId != newEventId:
                chatInterface.db.recordModify(
                    AutomationDefinition,
                    automationId,
                    {"eventId": newEventId}
                )
            registeredEvents[automationId] = newEventId
        except Exception as e:
            logger.error(f"Error syncing automation {automationId}: {str(e)}")
    return {
        "synced": len(registeredEvents),
        "events": registeredEvents
    }
def createAutomationEventHandler(automationId: str, eventUser):
    """Create event handler function for a specific automation.

    Args:
        automationId: ID of automation to create handler for
        eventUser: System-level event user for accessing automations (captured in closure)

    Returns:
        Async handler function (no arguments) for scheduled automation execution.
        The handler never raises: every failure is logged and swallowed.
    """
    async def handler():
        try:
            if not eventUser:
                logger.error("Event user not available for automation execution")
                return

            # Services for the event user; used both to load the automation
            # and to look up the creator user
            eventServices = getServices(eventUser, None)

            # Re-check the automation at execution time: it may have been
            # deleted or deactivated since the handler was registered
            automation = eventServices.interfaceDbChat.getAutomationDefinition(automationId)
            if not automation or not automation.get("active"):
                logger.warning(f"Automation {automationId} not found or not active, skipping execution")
                return

            # Get creator user
            creatorUserId = automation.get("_createdBy")
            if not creatorUserId:
                logger.error(f"Automation {automationId} has no creator user")
                return
            creatorUser = eventServices.interfaceDbApp.getUser(creatorUserId)
            if not creatorUser:
                logger.error(f"Creator user {creatorUserId} not found for automation {automationId}")
                return

            # Execute automation with the creator user's context
            creatorServices = getServices(creatorUser, None)
            await executeAutomation(automationId, creatorServices.interfaceDbChat)
            logger.info(f"Successfully executed automation {automationId} as user {creatorUserId}")
        except Exception as e:
            # The exception is logged and not re-raised; include the traceback
            # so the failure can be diagnosed from the log.
            logger.error(f"Error executing automation {automationId}: {str(e)}", exc_info=True)
    return handler