gateway/modules/workflows/automation/mainWorkflow.py
2026-02-10 09:55:51 +01:00

326 lines
14 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Main workflow service - handles workflow execution, scheduling, and chat playground operations.
Combines functionality from:
- mainAutomation.py: Automation workflow execution and scheduling
- mainChatPlayground.py: Chat playground workflow start/stop operations
"""
import logging
import json
from typing import Dict, Any, Optional
from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, WorkflowModeEnum
from modules.features.automation.datamodelFeatureAutomation import AutomationDefinition
from modules.datamodels.datamodelUam import User
from modules.shared.timeUtils import getUtcTimestamp
from modules.shared.eventManagement import eventManager
from modules.services import getInterface as getServices
from modules.workflows.workflowManager import WorkflowManager
from .subAutomationUtils import parseScheduleToCron, planToPrompt, replacePlaceholders
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
async def chatStart(
    currentUser: User,
    userInput: UserInputRequest,
    workflowMode: WorkflowModeEnum,
    workflowId: Optional[str] = None,
    mandateId: Optional[str] = None,
    featureInstanceId: Optional[str] = None,
    featureCode: Optional[str] = None,
) -> ChatWorkflow:
    """
    Start a new chat workflow or continue an existing one.

    Builds a services context for the user, copies optional per-request
    settings onto it and delegates to ``WorkflowManager.workflowStart``.

    Args:
        currentUser: User the services context is created for
        userInput: User input request; a truthy ``allowedProviders`` attribute,
            when present, is stored on the services context
        workflowMode: Workflow mode passed through to the workflow manager
        workflowId: Optional workflow ID to continue an existing workflow
        mandateId: Mandate ID passed to the services factory
        featureInstanceId: Feature instance ID passed to the services factory
        featureCode: Optional feature code stored on the services context
            (e.g., 'chatplayground', 'automation')

    Returns:
        Whatever ``WorkflowManager.workflowStart`` returns.

    Raises:
        Exception: Any failure is logged and then re-raised unchanged.
    """
    try:
        services = getServices(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId)

        # Provider restriction is optional: absent and empty are treated alike.
        providerFilter = getattr(userInput, 'allowedProviders', None)
        if providerFilter:
            services.allowedProviders = providerFilter
            logger.info(f"AI provider filter active: {providerFilter}")

        # Only overwrite the feature code when the caller supplied one.
        if featureCode:
            services.featureCode = featureCode

        return await WorkflowManager(services).workflowStart(userInput, workflowMode, workflowId)
    except Exception as e:
        logger.error(f"Error starting chat: {str(e)}")
        raise
async def chatStop(
    currentUser: User,
    workflowId: str,
    mandateId: Optional[str] = None,
    featureInstanceId: Optional[str] = None,
) -> ChatWorkflow:
    """
    Stop a running chat workflow.

    Args:
        currentUser: User the services context is created for
        workflowId: ID of the workflow to stop
        mandateId: Mandate ID passed to the services factory
        featureInstanceId: Feature instance ID passed to the services factory;
            when truthy, the services feature code is set to 'chatplayground'

    Returns:
        Whatever ``WorkflowManager.workflowStop`` returns.

    Raises:
        Exception: Any failure is logged and then re-raised unchanged.
    """
    try:
        services = getServices(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId)
        if featureInstanceId:
            services.featureCode = 'chatplayground'

        manager = WorkflowManager(services)
        stoppedWorkflow = await manager.workflowStop(workflowId)
        return stoppedWorkflow
    except Exception as e:
        logger.error(f"Error stopping chat: {str(e)}")
        raise
# Maximum number of execution log entries kept per automation.
_MAX_EXECUTION_LOGS = 50


def _parseAutomationPlan(template: str, placeholders: Any) -> Any:
    """Fill the template placeholders and parse the result as a JSON plan.

    Args:
        template: Plan template text containing placeholders
        placeholders: Placeholder values handed to ``replacePlaceholders``

    Returns:
        The object produced by ``json.loads`` on the substituted template.

    Raises:
        ValueError: If the substituted template is not valid JSON. The original
            ``json.JSONDecodeError`` is chained as the cause.
    """
    planJson = replacePlaceholders(template, placeholders)
    try:
        return json.loads(planJson)
    except json.JSONDecodeError as e:
        # Dump enough context to diagnose a broken template/placeholder combination.
        logger.error(f"Failed to parse plan JSON after placeholder replacement: {str(e)}")
        logger.error(f"Template: {template[:500]}...")
        logger.error(f"Placeholders: {placeholders}")
        logger.error(f"Generated planJson (first 1000 chars): {planJson[:1000]}")
        logger.error(f"Error position: line {e.lineno}, column {e.colno}, char {e.pos}")
        if e.pos:
            start = max(0, e.pos - 100)
            end = min(len(planJson), e.pos + 100)
            logger.error(f"Context around error: ...{planJson[start:end]}...")
        raise ValueError(f"Invalid JSON after placeholder replacement: {str(e)}") from e


def _persistExecutionLog(services, automation, automationId: str, executionLog: Dict[str, Any]) -> None:
    """Append one execution log entry to the automation's history and save it.

    The history is copied from ``automation.executionLogs`` (treated as empty
    when missing or falsy) and trimmed to the newest ``_MAX_EXECUTION_LOGS``
    entries before saving.
    """
    history = list(getattr(automation, 'executionLogs', None) or [])
    history.append(executionLog)
    # Keep only the most recent entries.
    if len(history) > _MAX_EXECUTION_LOGS:
        history = history[-_MAX_EXECUTION_LOGS:]
    services.interfaceDbAutomation._saveExecutionLog(automationId, history)


async def executeAutomation(automationId: str, automation, creatorUser: User, services) -> ChatWorkflow:
    """Execute automation workflow with the creator user's context.

    The automation object and creatorUser are supplied by the caller; this
    function does NOT re-load them.

    Args:
        automationId: ID of automation to execute
        automation: Pre-loaded automation object; read attributes are
            allowedProviders, mandateId, featureInstanceId, template,
            placeholders, label and executionLogs
        creatorUser: User the workflow is started as
        services: Services instance; used for interfaceDbChat and
            interfaceDbAutomation, and mutated with allowedProviders when the
            automation restricts providers

    Returns:
        ChatWorkflow instance returned by chatStart

    Raises:
        ValueError: If the template does not yield valid JSON after
            placeholder replacement.
        Exception: Any other failure is recorded in the execution log
            (status "error") and re-raised unchanged.
    """
    executionStartTime = getUtcTimestamp()
    executionLog = {
        "timestamp": executionStartTime,
        "workflowId": None,
        "status": "running",
        "messages": []
    }
    try:
        executionLog["messages"].append(f"Started execution at {executionStartTime}")

        # Store allowed providers from automation in services context
        if hasattr(automation, 'allowedProviders') and automation.allowedProviders:
            services.allowedProviders = automation.allowedProviders
            logger.debug(f"Automation {automationId} restricted to providers: {automation.allowedProviders}")

        # Context comes EXCLUSIVELY from the automation definition.
        # NOTE(review): str() turns a missing (None) ID into the literal "None" —
        # confirm both IDs are always set on stored automations.
        automationMandateId = str(automation.mandateId)
        automationFeatureInstanceId = str(automation.featureInstanceId)
        logger.info(f"Executing automation {automationId} as user {creatorUser.id} with mandateId={automationMandateId}, featureInstanceId={automationFeatureInstanceId}")

        # 1. Replace placeholders in template to generate plan
        template = automation.template or ""
        placeholders = automation.placeholders or {}
        plan = _parseAutomationPlan(template, placeholders)
        executionLog["messages"].append("Template placeholders replaced successfully")
        executionLog["messages"].append(f"Using creator user: {creatorUser.id}")

        # 2. Create UserInputRequest from plan.
        # The plan JSON is embedded between marker comments after the prompt text.
        promptText = planToPrompt(plan)
        planJsonStr = json.dumps(plan)
        promptWithPlan = f"{promptText}\n\n<!--TEMPLATE_PLAN_START-->\n{planJsonStr}\n<!--TEMPLATE_PLAN_END-->"
        userInput = UserInputRequest(
            prompt=promptWithPlan,
            listFileId=[],
            userLanguage=creatorUser.language or "en"
        )
        executionLog["messages"].append("Starting workflow execution")

        # 3. Start workflow using chatStart with creator's context;
        # mandateId and featureInstanceId come from the automation definition.
        workflow = await chatStart(
            currentUser=creatorUser,
            userInput=userInput,
            workflowMode=WorkflowModeEnum.WORKFLOW_AUTOMATION,
            workflowId=None,
            mandateId=automationMandateId,
            featureInstanceId=automationFeatureInstanceId,
            featureCode='automation'
        )
        # Status is set as soon as chatStart has returned a workflow.
        executionLog["workflowId"] = workflow.id
        executionLog["status"] = "completed"
        executionLog["messages"].append(f"Workflow {workflow.id} started successfully")
        logger.info(f"Started workflow {workflow.id} with plan containing {len(plan.get('tasks', []))} tasks (plan embedded in userInput)")

        # Set workflow name with "automated" prefix
        automationLabel = automation.label or "Unknown Automation"
        workflowName = f"automated: {automationLabel}"
        services.interfaceDbChat.updateWorkflow(workflow.id, {"name": workflowName})
        logger.info(f"Set workflow {workflow.id} name to: {workflowName}")

        # Save execution log for the successful run
        _persistExecutionLog(services, automation, automationId, executionLog)
        return workflow
    except Exception as e:
        # Record the failure in the execution log
        executionLog["status"] = "error"
        executionLog["messages"].append(f"Error: {str(e)}")
        # Saving the log is best-effort here: a failure to save must not mask
        # the original exception, so it is only logged.
        try:
            _persistExecutionLog(services, automation, automationId, executionLog)
        except Exception as logError:
            logger.error(f"Error saving execution log: {str(logError)}")
        raise
def syncAutomationEvents(services, eventUser) -> Dict[str, Any]:
    """Sync scheduler with the automation definitions visible to eventUser.

    Active automations with a schedule get a cron job registered under the ID
    ``automation.<automationId>``; inactive automations have their stored
    event removed. A schedule is only required (and parsed) for active
    automations, so an inactive automation with a missing or invalid schedule
    still gets its stored event removed.

    Args:
        services: Services instance; interfaceDbAutomation is used for reads and updates
        eventUser: User passed to the RBAC-filtered query and captured by the
            created event handlers

    Returns:
        Dictionary with "synced" (number of registered events) and "events"
        (mapping of automation ID to event ID)
    """
    automations = services.interfaceDbAutomation.getAllAutomationDefinitionsWithRBAC(eventUser)
    registeredEvents = {}
    for automation in automations:
        # Handle both dict and object access patterns
        if isinstance(automation, dict):
            automationId = automation.get('id')
            isActive = automation.get('active', False)
            currentEventId = automation.get('eventId')
            schedule = automation.get('schedule')
        else:
            automationId = automation.id
            isActive = getattr(automation, 'active', False)
            currentEventId = getattr(automation, 'eventId', None)
            schedule = getattr(automation, 'schedule', None)

        try:
            if not isActive:
                # Deactivated: drop the stored event. No schedule is needed for this.
                if currentEventId:
                    try:
                        eventManager.remove(currentEventId)
                        services.interfaceDbAutomation.updateAutomationDefinition(
                            automationId,
                            {"eventId": None}
                        )
                    except Exception as e:
                        logger.warning(f"Error removing event {currentEventId}: {str(e)}")
                continue

            if not schedule:
                logger.warning(f"Automation {automationId} has no schedule, skipping")
                continue

            # Parse before touching the scheduler, so an invalid schedule
            # leaves any existing event in place.
            cronKwargs = parseScheduleToCron(schedule)

            # Remove existing event if present (handles schedule changes)
            if currentEventId:
                try:
                    eventManager.remove(currentEventId)
                except Exception as e:
                    logger.warning(f"Error removing old event {currentEventId}: {str(e)}")

            newEventId = f"automation.{automationId}"
            handler = createAutomationEventHandler(automationId, eventUser)
            eventManager.registerCron(
                jobId=newEventId,
                func=handler,
                cronKwargs=cronKwargs,
                replaceExisting=True
            )

            # Write the event ID back only when it actually changed
            if currentEventId != newEventId:
                services.interfaceDbAutomation.updateAutomationDefinition(
                    automationId,
                    {"eventId": newEventId}
                )
            registeredEvents[automationId] = newEventId
        except Exception as e:
            # One failing automation must not abort the sync of the others
            logger.error(f"Error syncing automation {automationId}: {str(e)}")

    return {
        "synced": len(registeredEvents),
        "events": registeredEvents
    }
def createAutomationEventHandler(automationId: str, eventUser):
    """Build the scheduled-execution handler for one automation.

    Args:
        automationId: ID of the automation the handler executes
        eventUser: User captured in the closure; used to create the services
            context that loads the automation and its creator

    Returns:
        Async, argument-less handler function. The handler never raises: every
        failure is logged and swallowed.
    """
    async def handler():
        try:
            if not eventUser:
                logger.error("Event user not available for automation execution")
                return

            # Services context built from the event user, without a mandate.
            sysServices = getServices(eventUser, None)
            definition = sysServices.interfaceDbAutomation.getAutomationDefinition(
                automationId,
                includeSystemFields=True,
            )

            # The definition is re-read on every run, so a deleted or
            # deactivated automation is skipped.
            isRunnable = bool(definition) and getattr(definition, "active", False)
            if not isRunnable:
                logger.warning(f"Automation {automationId} not found or not active, skipping execution")
                return

            # The creator's ID is read from the _createdBy system field.
            ownerId = getattr(definition, "_createdBy", None)
            if not ownerId:
                logger.error(f"Automation {automationId} has no creator user (_createdBy missing)")
                return

            owner = sysServices.interfaceDbApp.getUser(ownerId)
            if not owner:
                logger.error(f"Creator user {ownerId} not found for automation {automationId}")
                return

            # Hand over the already-loaded definition and creator.
            await executeAutomation(automationId, definition, owner, sysServices)
            logger.info(f"Successfully executed automation {automationId} as user {ownerId}")
        except Exception as e:
            logger.error(f"Error executing automation {automationId}: {str(e)}")

    return handler