gateway/modules/workflows/automation/mainWorkflow.py

321 lines
15 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Main workflow service - handles workflow execution, scheduling, and chat playground operations.
Combines functionality from:
- mainAutomation.py: Automation workflow execution and scheduling
- mainChatPlayground.py: Chat playground workflow start/stop operations
"""
import logging
import json
from typing import Dict, Any, Optional
from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, WorkflowModeEnum
from modules.features.automation.datamodelFeatureAutomation import AutomationDefinition
from modules.datamodels.datamodelUam import User
from modules.shared.timeUtils import getUtcTimestamp
from modules.shared.eventManagement import eventManager
from modules.features.automation.mainAutomation import getAutomationServices
from modules.workflows.workflowManager import WorkflowManager
from .subAutomationUtils import parseScheduleToCron, planToPrompt, replacePlaceholders
logger = logging.getLogger(__name__)
async def chatStart(currentUser: User, userInput: UserInputRequest, workflowMode: WorkflowModeEnum, workflowId: Optional[str] = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None, featureCode: Optional[str] = None) -> ChatWorkflow:
"""
Starts a new chat or continues an existing one, then launches processing asynchronously.
Args:
currentUser: Current user
userInput: User input request
workflowId: Optional workflow ID to continue existing workflow
workflowMode: Workflow mode (Dynamic, Automation, etc.)
mandateId: Mandate ID (required for billing)
featureInstanceId: Feature instance ID (required for billing)
featureCode: Feature code (e.g., 'chatplayground', 'automation')
"""
try:
services = getAutomationServices(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId)
# Store allowedProviders in services context for model selection
if hasattr(userInput, 'allowedProviders') and userInput.allowedProviders:
services.allowedProviders = userInput.allowedProviders
logger.info(f"AI provider filter active: {userInput.allowedProviders}")
# Store feature code in services (for billing)
if featureCode:
services.featureCode = featureCode
workflowManager = WorkflowManager(services)
workflow = await workflowManager.workflowStart(userInput, workflowMode, workflowId)
return workflow
except Exception as e:
logger.error(f"Error starting chat: {str(e)}")
raise
async def chatStop(currentUser: User, workflowId: str, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None, featureCode: Optional[str] = None) -> ChatWorkflow:
"""Stops a running chat."""
try:
services = getAutomationServices(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId)
if featureCode:
services.featureCode = featureCode
workflowManager = WorkflowManager(services)
return await workflowManager.workflowStop(workflowId)
except Exception as e:
logger.error(f"Error stopping chat: {str(e)}")
raise
async def executeAutomation(automationId: str, automation, creatorUser: User, services) -> ChatWorkflow:
"""Execute automation workflow with the creator user's context.
The automation object and creatorUser are resolved by the caller (handler)
using the SysAdmin eventUser. This function does NOT re-load them.
Args:
automationId: ID of automation to execute
automation: Pre-loaded automation object (with system fields like _createdBy)
creatorUser: The user who created the automation (workflow runs in this context)
services: Services instance (used for interfaceDbApp etc.)
Returns:
ChatWorkflow instance created by automation execution
"""
executionStartTime = getUtcTimestamp()
executionLog = {
"timestamp": executionStartTime,
"workflowId": None,
"status": "running",
"messages": []
}
try:
executionLog["messages"].append(f"Started execution at {executionStartTime}")
# Store allowed providers from automation in services context
if hasattr(automation, 'allowedProviders') and automation.allowedProviders:
services.allowedProviders = automation.allowedProviders
logger.debug(f"Automation {automationId} restricted to providers: {automation.allowedProviders}")
# Context comes EXCLUSIVELY from the automation definition
automationMandateId = str(automation.mandateId) if automation.mandateId is not None else None
automationFeatureInstanceId = str(automation.featureInstanceId) if automation.featureInstanceId is not None else None
if not automationMandateId or not automationFeatureInstanceId:
raise ValueError(f"Automation {automationId} missing mandateId or featureInstanceId")
logger.info(f"Executing automation {automationId} as user {creatorUser.id} with mandateId={automationMandateId}, featureInstanceId={automationFeatureInstanceId}")
# 1. Replace placeholders in template to generate plan
template = automation.template or ""
placeholders = automation.placeholders or {}
planJson = replacePlaceholders(template, placeholders)
try:
plan = json.loads(planJson)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse plan JSON after placeholder replacement: {str(e)}")
logger.error(f"Template: {template[:500]}...")
logger.error(f"Placeholders: {placeholders}")
logger.error(f"Generated planJson (first 1000 chars): {planJson[:1000]}")
logger.error(f"Error position: line {e.lineno}, column {e.colno}, char {e.pos}")
if e.pos is not None:
start = max(0, e.pos - 100)
end = min(len(planJson), e.pos + 100)
logger.error(f"Context around error: ...{planJson[start:end]}...")
raise ValueError(f"Invalid JSON after placeholder replacement: {str(e)}")
executionLog["messages"].append("Template placeholders replaced successfully")
executionLog["messages"].append(f"Using creator user: {creatorUser.id}")
# 2. Create UserInputRequest from plan
# Embed plan JSON in prompt for TemplateMode to extract
promptText = planToPrompt(plan)
planJsonStr = json.dumps(plan)
# Embed plan as JSON comment so TemplateMode can extract it
promptWithPlan = f"{promptText}\n\n<!--TEMPLATE_PLAN_START-->\n{planJsonStr}\n<!--TEMPLATE_PLAN_END-->"
userInput = UserInputRequest(
prompt=promptWithPlan,
listFileId=[],
userLanguage=creatorUser.language or "en"
)
executionLog["messages"].append("Starting workflow execution")
# 3. Start workflow using chatStart with creator's context
# mandateId and featureInstanceId come from the automation definition
workflow = await chatStart(
currentUser=creatorUser,
userInput=userInput,
workflowMode=WorkflowModeEnum.WORKFLOW_AUTOMATION,
workflowId=None,
mandateId=automationMandateId,
featureInstanceId=automationFeatureInstanceId,
featureCode='automation'
)
executionLog["workflowId"] = workflow.id
executionLog["status"] = "completed"
executionLog["messages"].append(f"Workflow {workflow.id} started successfully")
logger.info(f"Started workflow {workflow.id} with plan containing {len(plan.get('tasks', []))} tasks (plan embedded in userInput)")
# Set workflow name with "automated" prefix — use creatorUser's Services
# (services parameter is eventServices with eventUser context, must use creatorUser context)
creatorServices = getAutomationServices(creatorUser, mandateId=automationMandateId, featureInstanceId=automationFeatureInstanceId)
automationLabel = automation.label or "Unknown Automation"
workflowName = f"automated: {automationLabel}"
creatorServices.interfaceDbChat.updateWorkflow(workflow.id, {"name": workflowName})
logger.info(f"Set workflow {workflow.id} name to: {workflowName}")
# Save execution log (bypasses RBAC — system operation, not a user edit)
executionLogs = list(automation.executionLogs or [])
executionLogs.append(executionLog)
# Keep only last 50 executions
if len(executionLogs) > 50:
executionLogs = executionLogs[-50:]
services.interfaceDbAutomation._saveExecutionLog(automationId, executionLogs)
return workflow
except Exception as e:
# Log error to execution log
executionLog["status"] = "error"
executionLog["messages"].append(f"Error: {str(e)}")
# Save execution log even on error (bypasses RBAC — system operation)
# Use the automation object already passed in (no re-load needed)
try:
executionLogs = list(getattr(automation, 'executionLogs', None) or [])
executionLogs.append(executionLog)
if len(executionLogs) > 50:
executionLogs = executionLogs[-50:]
services.interfaceDbAutomation._saveExecutionLog(automationId, executionLogs)
except Exception as logError:
logger.error(f"Error saving execution log: {str(logError)}")
raise
def syncAutomationEvents(services, eventUser) -> Dict[str, Any]:
"""Sync scheduler with all active automations.
All operations (DB reads, scheduler registration) are synchronous.
Args:
services: Services instance for data access
eventUser: System-level event user for accessing automations
Returns:
Dictionary with sync results (synced count and event IDs)
"""
# Get all automation definitions filtered by RBAC (for current mandate)
filtered = services.interfaceDbAutomation.getAllAutomationDefinitionsWithRBAC(eventUser)
registeredEvents = {}
for automation in filtered:
# Handle both dict and object access patterns
if isinstance(automation, dict):
automationId = automation.get('id')
isActive = automation.get('active', False)
currentEventId = automation.get('eventId')
schedule = automation.get('schedule')
else:
automationId = automation.id
isActive = automation.active if hasattr(automation, 'active') else False
currentEventId = automation.eventId if hasattr(automation, 'eventId') else None
schedule = automation.schedule if hasattr(automation, 'schedule') else None
if not schedule:
logger.warning(f"Automation {automationId} has no schedule, skipping")
continue
try:
# Parse schedule to cron kwargs
cronKwargs = parseScheduleToCron(schedule)
if isActive:
newEventId = f"automation.{automationId}"
handler = createAutomationEventHandler(automationId, eventUser)
# Register with replaceExisting=True (atomically replaces old event)
eventManager.registerCron(
jobId=newEventId,
func=handler,
cronKwargs=cronKwargs,
replaceExisting=True
)
# Update automation with new eventId
if currentEventId != newEventId:
services.interfaceDbAutomation.updateAutomationDefinition(
automationId,
{"eventId": newEventId}
)
registeredEvents[automationId] = newEventId
else:
# Remove event if exists
if currentEventId:
try:
eventManager.remove(currentEventId)
services.interfaceDbAutomation.updateAutomationDefinition(
automationId,
{"eventId": None}
)
except Exception as e:
logger.warning(f"Error removing event {currentEventId}: {str(e)}")
except Exception as e:
logger.error(f"Error syncing automation {automationId}: {str(e)}")
return {
"synced": len(registeredEvents),
"events": registeredEvents
}
def createAutomationEventHandler(automationId: str, eventUser):
"""Create event handler function for a specific automation.
Args:
automationId: ID of automation to create handler for
eventUser: System-level event user for accessing automations (captured in closure)
Returns:
Async handler function for scheduled automation execution
"""
async def handler():
try:
if not eventUser:
logger.error("Event user not available for automation execution")
return
# Load automation using SysAdmin eventUser (has unrestricted access)
eventServices = getAutomationServices(eventUser, mandateId=None, featureInstanceId=None)
automation = eventServices.interfaceDbAutomation.getAutomationDefinition(automationId, includeSystemFields=True)
if not automation or not getattr(automation, "active", False):
logger.warning(f"Automation {automationId} not found or not active, skipping execution")
return
# Get creator user ID from automation's _createdBy system field
creatorUserId = getattr(automation, "_createdBy", None)
if not creatorUserId:
logger.error(f"Automation {automationId} has no creator user (_createdBy missing)")
return
# Get creator user from database (using SysAdmin access)
creatorUser = eventServices.interfaceDbApp.getUser(creatorUserId)
if not creatorUser:
logger.error(f"Creator user {creatorUserId} not found for automation {automationId}")
return
# Execute automation — pass automation object and creatorUser directly
# No re-load needed in executeAutomation
await executeAutomation(automationId, automation, creatorUser, eventServices)
logger.info(f"Successfully executed automation {automationId} as user {creatorUserId}")
except Exception as e:
logger.error(f"Error executing automation {automationId}: {str(e)}")
return handler