317 lines
13 KiB
Python
317 lines
13 KiB
Python
"""
|
|
Main workflow service - handles workflow execution, scheduling, and chat playground operations.
|
|
|
|
Combines functionality from:
|
|
- mainAutomation.py: Automation workflow execution and scheduling
|
|
- mainChatPlayground.py: Chat playground workflow start/stop operations
|
|
"""
|
|
|
|
import logging
|
|
import json
|
|
from typing import Dict, Any, Optional
|
|
|
|
from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, WorkflowModeEnum, AutomationDefinition
|
|
from modules.datamodels.datamodelUam import User
|
|
from modules.shared.timeUtils import getUtcTimestamp
|
|
from modules.shared.eventManagement import eventManager
|
|
from modules.services import getInterface as getServices
|
|
from modules.workflows.workflowManager import WorkflowManager
|
|
from .subAutomationUtils import parseScheduleToCron, planToPrompt, replacePlaceholders
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def chatStart(currentUser: User, userInput: UserInputRequest, workflowMode: WorkflowModeEnum, workflowId: Optional[str] = None) -> ChatWorkflow:
    """
    Start a new chat workflow or continue an existing one.

    Builds the services interface for ``currentUser``, wraps it in a
    ``WorkflowManager`` and delegates to ``WorkflowManager.workflowStart``.

    Args:
        currentUser: User the services interface is created for.
        userInput: User input request forwarded to the workflow manager.
        workflowMode: Workflow mode forwarded to the workflow manager, e.g.
            WorkflowModeEnum.WORKFLOW_DYNAMIC or WorkflowModeEnum.WORKFLOW_AUTOMATION.
        workflowId: Optional workflow ID to continue an existing workflow.

    Returns:
        The workflow object returned by ``WorkflowManager.workflowStart``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.

    Example usage for Dynamic mode:
        workflow = await chatStart(currentUser, userInput, workflowMode=WorkflowModeEnum.WORKFLOW_DYNAMIC)
    """
    try:
        manager = WorkflowManager(getServices(currentUser, None))
        return await manager.workflowStart(userInput, workflowMode, workflowId)
    except Exception as startError:
        # Log at this boundary, then let the caller decide how to react.
        logger.error(f"Error starting chat: {str(startError)}")
        raise
|
|
|
|
async def chatStop(currentUser: User, workflowId: str) -> ChatWorkflow:
    """
    Stop a running chat workflow.

    Args:
        currentUser: User the services interface is created for.
        workflowId: ID of the workflow to stop.

    Returns:
        The workflow object returned by ``WorkflowManager.workflowStop``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        manager = WorkflowManager(getServices(currentUser, None))
        stoppedWorkflow = await manager.workflowStop(workflowId)
        return stoppedWorkflow
    except Exception as stopError:
        # Log at this boundary, then let the caller decide how to react.
        logger.error(f"Error stopping chat: {str(stopError)}")
        raise
|
|
|
|
|
|
# Maximum number of execution log entries kept per automation definition.
_MAX_EXECUTION_LOGS = 50


def _appendExecutionLog(services, automationId: str, automation, executionLog: Dict[str, Any]) -> None:
    """Append ``executionLog`` to the automation's stored logs, keeping only the newest entries."""
    # ``or []`` also covers an "executionLogs" key that is present but None;
    # the copy avoids mutating the list held by the loaded definition.
    executionLogs = list(automation.get("executionLogs") or [])
    executionLogs.append(executionLog)
    services.interfaceDbChat.updateAutomationDefinition(
        automationId,
        {"executionLogs": executionLogs[-_MAX_EXECUTION_LOGS:]}
    )


def _loadPlan(template, placeholders):
    """Replace placeholders in ``template`` and parse the result as JSON, logging diagnostics on failure."""
    planJson = replacePlaceholders(template, placeholders)
    try:
        return json.loads(planJson)
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse plan JSON after placeholder replacement: {str(e)}")
        logger.error(f"Template: {template[:500]}...")
        logger.error(f"Placeholders: {placeholders}")
        logger.error(f"Generated planJson (first 1000 chars): {planJson[:1000]}")
        logger.error(f"Error position: line {e.lineno}, column {e.colno}, char {e.pos}")
        # Always show the surrounding text, including for an error at offset 0.
        start = max(0, e.pos - 100)
        end = min(len(planJson), e.pos + 100)
        logger.error(f"Context around error: ...{planJson[start:end]}...")
        # Chain the original decode error so the root cause stays in the traceback.
        raise ValueError(f"Invalid JSON after placeholder replacement: {str(e)}") from e


async def executeAutomation(automationId: str, services) -> ChatWorkflow:
    """Execute automation workflow immediately (test mode) with placeholder replacement.

    Steps: load the automation definition, turn its template into a plan,
    resolve the creator user from ``_createdBy``, start the workflow through
    ``chatStart`` in WORKFLOW_AUTOMATION mode, rename the workflow and store
    an execution log entry on the automation definition.

    Args:
        automationId: ID of automation to execute
        services: Services instance for data access

    Returns:
        The workflow object returned by ``interfaceDbChat.updateWorkflow``
        after the workflow was started and renamed.

    Raises:
        ValueError: If the automation is not found, the generated plan is not
            valid JSON, the ``_createdBy`` field is missing, or the creator
            user is not found.
        Exception: Any other failure is re-raised after an "error" execution
            log entry has been written (best effort).
    """
    executionStartTime = getUtcTimestamp()
    executionLog = {
        "timestamp": executionStartTime,
        "workflowId": None,
        "status": "running",
        "messages": []
    }

    try:
        # 1. Load automation definition
        automation = services.interfaceDbChat.getAutomationDefinition(automationId)
        if not automation:
            raise ValueError(f"Automation {automationId} not found")

        executionLog["messages"].append(f"Started execution at {executionStartTime}")

        # 2. Replace placeholders in template to generate plan
        template = automation.get("template", "")
        placeholders = automation.get("placeholders", {})
        plan = _loadPlan(template, placeholders)
        executionLog["messages"].append("Template placeholders replaced successfully")

        # 3. Get user who created automation
        creatorUserId = automation.get("_createdBy")

        # CRITICAL: Automation MUST run as creator user only, or fail
        if not creatorUserId:
            errorMsg = f"Automation {automationId} has no creator user (_createdBy field missing). Cannot execute automation."
            logger.error(errorMsg)
            executionLog["messages"].append(errorMsg)
            raise ValueError(errorMsg)

        # Get user from database using services
        creatorUser = services.interfaceDbApp.getUser(creatorUserId)
        if not creatorUser:
            raise ValueError(f"Creator user {creatorUserId} not found")

        executionLog["messages"].append(f"Using creator user: {creatorUserId}")

        # 4. Create UserInputRequest from plan
        # Embed plan as JSON comment so TemplateMode can extract it
        promptText = planToPrompt(plan)
        planJsonStr = json.dumps(plan)
        promptWithPlan = f"{promptText}\n\n<!--TEMPLATE_PLAN_START-->\n{planJsonStr}\n<!--TEMPLATE_PLAN_END-->"

        userInput = UserInputRequest(
            prompt=promptWithPlan,
            listFileId=[],
            userLanguage=creatorUser.language or "en"
        )

        executionLog["messages"].append("Starting workflow execution")

        # 5. Start workflow using chatStart
        workflow = await chatStart(
            currentUser=creatorUser,
            userInput=userInput,
            workflowMode=WorkflowModeEnum.WORKFLOW_AUTOMATION,
            workflowId=None
        )

        executionLog["workflowId"] = workflow.id
        executionLog["status"] = "completed"
        executionLog["messages"].append(f"Workflow {workflow.id} started successfully")
        logger.info(f"Started workflow {workflow.id} with plan containing {len(plan.get('tasks', []))} tasks (plan embedded in userInput)")

        # Set workflow name with "automated" prefix
        automationLabel = automation.get("label", "Unknown Automation")
        workflowName = f"automated: {automationLabel}"
        workflow = services.interfaceDbChat.updateWorkflow(workflow.id, {"name": workflowName})
        logger.info(f"Set workflow {workflow.id} name to: {workflowName}")

        # Update automation with execution log
        _appendExecutionLog(services, automationId, automation, executionLog)

        return workflow
    except Exception as e:
        # Log error to execution log
        executionLog["status"] = "error"
        executionLog["messages"].append(f"Error: {str(e)}")

        # Update automation with execution log even on error. This is best
        # effort: a failure here is logged and must not mask the original error.
        try:
            automation = services.interfaceDbChat.getAutomationDefinition(automationId)
            if automation:
                _appendExecutionLog(services, automationId, automation, executionLog)
        except Exception as logError:
            logger.error(f"Error saving execution log: {str(logError)}")

        raise
|
|
|
|
|
|
async def syncAutomationEvents(services, eventUser) -> Dict[str, Any]:
    """Automation event handler - syncs scheduler with all active automations.

    For every automation definition returned by
    ``getAllAutomationDefinitionsWithRBAC(eventUser)``:

    - inactive: an existing event is removed and the stored ``eventId`` is
      cleared. This happens regardless of the schedule, so a deactivated
      automation with a missing or unparsable schedule is still unregistered.
    - active without schedule: a warning is logged and the automation is skipped.
    - active with schedule: the cron job ``automation.<id>`` is (re-)registered
      and the stored ``eventId`` is updated when it changed.

    A failure while syncing one automation is logged and does not stop the
    remaining automations from being processed.

    Args:
        services: Services instance for data access
        eventUser: System-level event user for accessing automations

    Returns:
        Dictionary with "synced" (number of registered events) and "events"
        (mapping of automation ID to event ID).
    """
    # Get all automation definitions filtered by RBAC (for current mandate)
    filtered = services.interfaceDbChat.getAllAutomationDefinitionsWithRBAC(eventUser)

    registeredEvents = {}

    for automation in filtered:
        automationId = automation.get("id")
        isActive = automation.get("active", False)
        currentEventId = automation.get("eventId")

        try:
            if not isActive:
                # Unregistering must not depend on the schedule being valid.
                if currentEventId:
                    try:
                        eventManager.remove(currentEventId)
                        services.interfaceDbChat.updateAutomationDefinition(
                            automationId,
                            {"eventId": None}
                        )
                    except Exception as e:
                        logger.warning(f"Error removing event {currentEventId}: {str(e)}")
                continue

            schedule = automation.get("schedule")
            if not schedule:
                logger.warning(f"Automation {automationId} has no schedule, skipping")
                continue

            # Parse schedule to cron kwargs
            cronKwargs = parseScheduleToCron(schedule)

            # Remove existing event if present (handles schedule changes)
            if currentEventId:
                try:
                    eventManager.remove(currentEventId)
                except Exception as e:
                    logger.warning(f"Error removing old event {currentEventId}: {str(e)}")

            # Register new event
            newEventId = f"automation.{automationId}"

            # Create event handler function
            handler = createAutomationEventHandler(automationId, eventUser)

            # Register cron job
            eventManager.registerCron(
                jobId=newEventId,
                func=handler,
                cronKwargs=cronKwargs,
                replaceExisting=True
            )

            # Update automation with new eventId
            if currentEventId != newEventId:
                services.interfaceDbChat.updateAutomationDefinition(
                    automationId,
                    {"eventId": newEventId}
                )

            registeredEvents[automationId] = newEventId
        except Exception as e:
            logger.error(f"Error syncing automation {automationId}: {str(e)}")

    return {
        "synced": len(registeredEvents),
        "events": registeredEvents
    }
|
|
|
|
|
|
def createAutomationEventHandler(automationId: str, eventUser):
    """Create event handler function for a specific automation.

    The returned coroutine function takes no arguments. When called it loads
    the automation with the event user's services, skips execution if the
    automation is missing or inactive, resolves the creator user from
    ``_createdBy`` and runs ``executeAutomation`` with services created for
    that creator user. The handler never raises: every failure is logged.

    Args:
        automationId: ID of automation to create handler for
        eventUser: System-level event user for accessing automations (captured in closure)

    Returns:
        Async handler function for scheduled automation execution
    """
    async def handler():
        try:
            if not eventUser:
                logger.error("Event user not available for automation execution")
                return

            # Get services for event user (provides access to interfaces).
            # Created once and reused for both the automation and the user lookup.
            eventServices = getServices(eventUser, None)

            # Load automation using event user context
            automation = eventServices.interfaceDbChat.getAutomationDefinition(automationId)
            if not automation or not automation.get("active"):
                logger.warning(f"Automation {automationId} not found or not active, skipping execution")
                return

            # Get creator user
            creatorUserId = automation.get("_createdBy")
            if not creatorUserId:
                logger.error(f"Automation {automationId} has no creator user")
                return

            # Get creator user from database using services
            creatorUser = eventServices.interfaceDbApp.getUser(creatorUserId)
            if not creatorUser:
                logger.error(f"Creator user {creatorUserId} not found for automation {automationId}")
                return

            # Get services for creator user (provides access to interfaces)
            creatorServices = getServices(creatorUser, None)

            # Execute automation with creator user's context
            await executeAutomation(automationId, creatorServices)
            logger.info(f"Successfully executed automation {automationId} as user {creatorUserId}")
        except Exception as e:
            # The exception is swallowed here, so keep the traceback in the
            # log; otherwise the failure cannot be diagnosed.
            logger.error(f"Error executing automation {automationId}: {str(e)}", exc_info=True)

    return handler
|
|
|