329 lines
15 KiB
Python
329 lines
15 KiB
Python
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Main workflow service - handles workflow execution, scheduling, and chat playground operations.

Combines functionality from:
- mainAutomation.py: Automation workflow execution and scheduling
- mainChatPlayground.py: Chat playground workflow start/stop operations
"""
|
|
|
|
import logging
|
|
import json
|
|
from typing import Dict, Any, Optional
|
|
|
|
from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, WorkflowModeEnum
|
|
from modules.features.automation.datamodelFeatureAutomation import AutomationDefinition
|
|
from modules.datamodels.datamodelUam import User
|
|
from modules.shared.timeUtils import getUtcTimestamp
|
|
from modules.shared.eventManagement import eventManager
|
|
from modules.features.automation.mainAutomation import getAutomationServices
|
|
from modules.workflows.workflowManager import WorkflowManager
|
|
from .subAutomationUtils import parseScheduleToCron, planToPrompt, replacePlaceholders
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def chatStart(currentUser: User, userInput: UserInputRequest, workflowMode: WorkflowModeEnum, workflowId: Optional[str] = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None, featureCode: Optional[str] = None, services=None) -> ChatWorkflow:
    """
    Start a new chat workflow, or continue an existing one, via WorkflowManager.

    Args:
        currentUser: Current user (not read inside this function)
        userInput: User input request forwarded to the workflow manager
        workflowMode: Workflow mode (Dynamic, Automation, etc.)
        workflowId: Optional workflow ID to continue an existing workflow
        mandateId: Mandate ID (not read inside this function)
        featureInstanceId: Feature instance ID (not read inside this function)
        featureCode: Feature code (e.g., 'chatplayground', 'automation'); stored on services when set
        services: Pre-built service hub from the calling feature (required)

    Returns:
        The ChatWorkflow returned by WorkflowManager.workflowStart

    Raises:
        ValueError: If services is None
        Exception: Any error raised while starting is logged and re-raised unchanged
    """
    if services is None:
        raise ValueError("services is required: each feature must pass its own service hub (e.g. getChatplaygroundServices, getAutomationServices)")

    try:
        # Expose the provider filter on the service hub for model selection
        providerFilter = getattr(userInput, 'allowedProviders', None)
        if providerFilter:
            services.allowedProviders = providerFilter
            logger.info(f"AI provider filter active: {providerFilter}")

        # Expose the feature code on the service hub (for billing)
        if featureCode:
            services.featureCode = featureCode

        manager = WorkflowManager(services)
        return await manager.workflowStart(userInput, workflowMode, workflowId)
    except Exception as e:
        logger.error(f"Error starting chat: {str(e)}")
        raise
|
|
|
|
async def chatStop(currentUser: User, workflowId: str, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None, featureCode: Optional[str] = None, services=None) -> ChatWorkflow:
    """
    Stop a running chat workflow via WorkflowManager.

    Args:
        currentUser: Current user (not read inside this function)
        workflowId: ID of the workflow to stop
        mandateId: Mandate ID (not read inside this function)
        featureInstanceId: Feature instance ID (not read inside this function)
        featureCode: Feature code; stored on services when set
        services: Service hub from the owning feature (required)

    Returns:
        The ChatWorkflow returned by WorkflowManager.workflowStop

    Raises:
        ValueError: If services is None
        Exception: Any error raised while stopping is logged and re-raised unchanged
    """
    if services is None:
        raise ValueError("services is required: each feature must pass its own service hub (e.g. getChatplaygroundServices, getAutomationServices)")

    try:
        if featureCode:
            services.featureCode = featureCode

        manager = WorkflowManager(services)
        stoppedWorkflow = await manager.workflowStop(workflowId)
        return stoppedWorkflow
    except Exception as e:
        logger.error(f"Error stopping chat: {str(e)}")
        raise
|
|
|
|
|
|
async def executeAutomation(automationId: str, automation, creatorUser: User, services) -> ChatWorkflow:
    """Execute automation workflow with the creator user's context.

    The automation object and creatorUser are resolved by the caller (handler)
    using the SysAdmin eventUser. This function does NOT re-load them.

    Steps: substitute placeholders into the template to obtain the plan JSON,
    wrap the plan in a UserInputRequest, start the workflow through chatStart
    using a service hub built for the creator, rename the workflow, and save an
    execution log entry on the automation (also on failure).

    Args:
        automationId: ID of automation to execute
        automation: Pre-loaded automation object (with system fields like _createdBy)
        creatorUser: The user who created the automation (workflow runs in this context)
        services: Services instance (used to save the execution log)

    Returns:
        ChatWorkflow instance created by automation execution

    Raises:
        ValueError: If the automation lacks mandateId/featureInstanceId, or the
            template is not valid JSON after placeholder replacement
        Exception: Any other failure is recorded in the execution log and re-raised
    """
    executionStartTime = getUtcTimestamp()
    executionLog = {
        "timestamp": executionStartTime,
        "workflowId": None,
        "status": "running",
        "messages": []
    }

    try:
        executionLog["messages"].append(f"Started execution at {executionStartTime}")

        # Store allowed providers from automation in services context
        allowedProviders = getattr(automation, 'allowedProviders', None)
        if allowedProviders:
            services.allowedProviders = allowedProviders
            logger.debug(f"Automation {automationId} restricted to providers: {allowedProviders}")

        # Context comes EXCLUSIVELY from the automation definition
        automationMandateId = str(automation.mandateId) if automation.mandateId is not None else None
        automationFeatureInstanceId = str(automation.featureInstanceId) if automation.featureInstanceId is not None else None

        if not automationMandateId or not automationFeatureInstanceId:
            raise ValueError(f"Automation {automationId} missing mandateId or featureInstanceId")

        logger.info(f"Executing automation {automationId} as user {creatorUser.id} with mandateId={automationMandateId}, featureInstanceId={automationFeatureInstanceId}")

        # 1. Replace placeholders in template to generate plan
        template = automation.template or ""
        placeholders = automation.placeholders or {}
        planJson = replacePlaceholders(template, placeholders)
        try:
            plan = json.loads(planJson)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse plan JSON after placeholder replacement: {str(e)}")
            logger.error(f"Template: {template[:500]}...")
            logger.error(f"Placeholders: {placeholders}")
            logger.error(f"Generated planJson (first 1000 chars): {planJson[:1000]}")
            logger.error(f"Error position: line {e.lineno}, column {e.colno}, char {e.pos}")
            if e.pos is not None:
                start = max(0, e.pos - 100)
                end = min(len(planJson), e.pos + 100)
                logger.error(f"Context around error: ...{planJson[start:end]}...")
            # Chain the decode error so the original traceback is preserved
            raise ValueError(f"Invalid JSON after placeholder replacement: {str(e)}") from e
        executionLog["messages"].append("Template placeholders replaced successfully")
        executionLog["messages"].append(f"Using creator user: {creatorUser.id}")

        # 2. Create UserInputRequest from plan
        # Embed plan as JSON comment so TemplateMode can extract it
        promptText = planToPrompt(plan)
        planJsonStr = json.dumps(plan)
        promptWithPlan = f"{promptText}\n\n<!--TEMPLATE_PLAN_START-->\n{planJsonStr}\n<!--TEMPLATE_PLAN_END-->"

        userInput = UserInputRequest(
            prompt=promptWithPlan,
            listFileId=[],
            userLanguage=creatorUser.language or "en"
        )

        executionLog["messages"].append("Starting workflow execution")

        # 3. Start workflow using chatStart with creator's context
        # mandateId and featureInstanceId come from the automation definition
        # Each feature must pass its own services - no fallback
        creatorServices = getAutomationServices(
            creatorUser,
            mandateId=automationMandateId,
            featureInstanceId=automationFeatureInstanceId,
        )
        if allowedProviders:
            # The workflow runs on creatorServices, not on `services`, and the
            # generated userInput carries no provider list, so the restriction
            # must be set on this hub to take effect.
            creatorServices.allowedProviders = allowedProviders

        workflow = await chatStart(
            currentUser=creatorUser,
            userInput=userInput,
            workflowMode=WorkflowModeEnum.WORKFLOW_AUTOMATION,
            workflowId=None,
            mandateId=automationMandateId,
            featureInstanceId=automationFeatureInstanceId,
            featureCode='automation',
            services=creatorServices,
        )

        executionLog["workflowId"] = workflow.id
        executionLog["status"] = "completed"
        executionLog["messages"].append(f"Workflow {workflow.id} started successfully")
        logger.info(f"Started workflow {workflow.id} with plan containing {len(plan.get('tasks', []))} tasks (plan embedded in userInput)")

        # Set workflow name with "automated" prefix — use creatorServices from chatStart
        automationLabel = automation.label or "Unknown Automation"
        workflowName = f"automated: {automationLabel}"
        creatorServices.interfaceDbChat.updateWorkflow(workflow.id, {"name": workflowName})
        logger.info(f"Set workflow {workflow.id} name to: {workflowName}")

        # Save execution log (bypasses RBAC — system operation, not a user edit)
        _persistExecutionLog(services, automationId, automation, executionLog)

        return workflow
    except Exception as e:
        # Log error to execution log
        executionLog["status"] = "error"
        executionLog["messages"].append(f"Error: {str(e)}")

        # Save execution log even on error (bypasses RBAC — system operation).
        # A failure to save must not mask the original error, so it is only logged.
        try:
            _persistExecutionLog(services, automationId, automation, executionLog)
        except Exception as logError:
            logger.error(f"Error saving execution log: {str(logError)}")

        raise


def _persistExecutionLog(services, automationId: str, automation, executionLog: Dict[str, Any], maxEntries: int = 50) -> None:
    """Append executionLog to the automation's log history and save the newest maxEntries entries."""
    # Copy so the automation object passed in by the caller is not mutated
    history = list(getattr(automation, 'executionLogs', None) or [])
    history.append(executionLog)
    # Keep only the most recent executions
    services.interfaceDbAutomation._saveExecutionLog(automationId, history[-maxEntries:])
|
|
|
|
|
|
def syncAutomationEvents(services, eventUser) -> Dict[str, Any]:
    """Sync scheduler with all active automations.

    Active automations with a schedule are (re-)registered as cron jobs under
    the job ID "automation.<id>". Inactive automations that still reference an
    event are removed from the scheduler and their eventId is cleared. An
    inactive automation is unscheduled regardless of whether its schedule is
    present or parseable.

    Args:
        services: Services instance for data access
        eventUser: System-level event user for accessing automations

    Returns:
        Dictionary with sync results: "synced" (number of registered events)
        and "events" (mapping of automation ID to event ID)
    """
    # Get all automation definitions filtered by RBAC
    filtered = services.interfaceDbAutomation.getAllAutomationDefinitionsWithRBAC(eventUser)

    registeredEvents = {}

    for automation in filtered:
        # Handle both dict and object access patterns
        if isinstance(automation, dict):
            automationId = automation.get('id')
            isActive = automation.get('active', False)
            currentEventId = automation.get('eventId')
            schedule = automation.get('schedule')
        else:
            automationId = automation.id
            isActive = getattr(automation, 'active', False)
            currentEventId = getattr(automation, 'eventId', None)
            schedule = getattr(automation, 'schedule', None)

        try:
            if not isActive:
                # Unschedule first: removing a job does not need a valid schedule,
                # so a missing/invalid schedule must not leave a stale job behind.
                if currentEventId:
                    try:
                        eventManager.remove(currentEventId)
                        services.interfaceDbAutomation.updateAutomationDefinition(
                            automationId,
                            {"eventId": None}
                        )
                    except Exception as e:
                        logger.warning(f"Error removing event {currentEventId}: {str(e)}")
                continue

            if not schedule:
                logger.warning(f"Automation {automationId} has no schedule, skipping")
                continue

            # Parse schedule to cron kwargs
            cronKwargs = parseScheduleToCron(schedule)

            newEventId = f"automation.{automationId}"
            handler = createAutomationEventHandler(automationId, eventUser)

            # Register with replaceExisting=True (replaces any old event with the same ID)
            eventManager.registerCron(
                jobId=newEventId,
                func=handler,
                cronKwargs=cronKwargs,
                replaceExisting=True
            )

            # Update automation with new eventId
            if currentEventId != newEventId:
                services.interfaceDbAutomation.updateAutomationDefinition(
                    automationId,
                    {"eventId": newEventId}
                )

            registeredEvents[automationId] = newEventId
        except Exception as e:
            # One broken automation must not abort the sync of the others
            logger.error(f"Error syncing automation {automationId}: {str(e)}")

    return {
        "synced": len(registeredEvents),
        "events": registeredEvents
    }
|
|
|
|
|
|
def createAutomationEventHandler(automationId: str, eventUser):
    """Build the scheduled-execution callback for one automation.

    Args:
        automationId: ID of automation to create handler for
        eventUser: System-level event user for accessing automations (captured in closure)

    Returns:
        Async handler function taking no arguments. When invoked it loads the
        automation and its creator user, then runs executeAutomation. All
        exceptions are logged and swallowed, so the handler never raises.
    """
    async def handler():
        try:
            if not eventUser:
                logger.error("Event user not available for automation execution")
                return

            # Load automation using SysAdmin eventUser (has unrestricted access)
            sysServices = getAutomationServices(eventUser, mandateId=None, featureInstanceId=None)
            definition = sysServices.interfaceDbAutomation.getAutomationDefinition(automationId, includeSystemFields=True)
            if not (definition and getattr(definition, "active", False)):
                logger.warning(f"Automation {automationId} not found or not active, skipping execution")
                return

            # The creator's user ID lives in the _createdBy system field
            ownerId = getattr(definition, "_createdBy", None)
            if not ownerId:
                logger.error(f"Automation {automationId} has no creator user (_createdBy missing)")
                return

            # Resolve the creator user record (using SysAdmin access)
            owner = sysServices.interfaceDbApp.getUser(ownerId)
            if not owner:
                logger.error(f"Creator user {ownerId} not found for automation {automationId}")
                return

            # Hand the already-loaded automation and creator to executeAutomation
            await executeAutomation(automationId, definition, owner, sysServices)
            logger.info(f"Successfully executed automation {automationId} as user {ownerId}")
        except Exception as e:
            logger.error(f"Error executing automation {automationId}: {str(e)}")

    return handler
|
|
|