# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Main workflow service - handles workflow execution, scheduling, and chat playground operations.

Combines functionality from:
- mainAutomation.py: Automation workflow execution and scheduling
- mainChatPlayground.py: Chat playground workflow start/stop operations
"""

import logging
import json
from typing import Dict, Any, Optional

from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, WorkflowModeEnum
from modules.features.automation.datamodelFeatureAutomation import AutomationDefinition
from modules.datamodels.datamodelUam import User
from modules.shared.timeUtils import getUtcTimestamp
from modules.shared.eventManagement import eventManager
from modules.services import getInterface as getServices
from modules.workflows.workflowManager import WorkflowManager
from .subAutomationUtils import parseScheduleToCron, planToPrompt, replacePlaceholders

logger = logging.getLogger(__name__)

# Number of most recent execution log entries kept per automation.
_MAX_EXECUTION_LOGS = 50


async def chatStart(
    currentUser: User,
    userInput: UserInputRequest,
    workflowMode: WorkflowModeEnum,
    workflowId: Optional[str] = None,
    mandateId: Optional[str] = None,
    featureInstanceId: Optional[str] = None,
) -> ChatWorkflow:
    """
    Starts a new chat or continues an existing one, then launches processing asynchronously.

    Args:
        currentUser: Current user
        userInput: User input request
        workflowMode: "Dynamic" for iterative dynamic-style processing,
            "Automation" for automated workflow execution
        workflowId: Optional workflow ID to continue existing workflow
        mandateId: Mandate ID from request context (required for proper data isolation)
        featureInstanceId: Feature instance ID for context

    Returns:
        The workflow returned by WorkflowManager.workflowStart.

    Raises:
        Exception: Any error raised while starting the workflow is logged and re-raised.

    Example usage for Dynamic mode:
        workflow = await chatStart(currentUser, userInput,
                                   workflowMode=WorkflowModeEnum.WORKFLOW_DYNAMIC,
                                   mandateId=mandateId)
    """
    try:
        services = getServices(currentUser, mandateId=mandateId)

        # Store preferred providers in services context for billing/model selection.
        # Support both preferredProviders (list) and legacy preferredProvider (string).
        if hasattr(userInput, 'preferredProviders') and userInput.preferredProviders:
            services.preferredProviders = userInput.preferredProviders
            logger.debug(f"Using preferred providers: {userInput.preferredProviders}")
        elif hasattr(userInput, 'preferredProvider') and userInput.preferredProvider:
            services.preferredProviders = [userInput.preferredProvider]
            logger.debug(f"Using preferred provider (legacy): {userInput.preferredProvider}")

        # Store feature instance ID in services context
        if featureInstanceId:
            services.featureInstanceId = featureInstanceId
            services.featureCode = 'chatplayground'

        workflowManager = WorkflowManager(services)
        workflow = await workflowManager.workflowStart(userInput, workflowMode, workflowId)
        return workflow
    except Exception as e:
        logger.error(f"Error starting chat: {str(e)}")
        raise


async def chatStop(
    currentUser: User,
    workflowId: str,
    mandateId: Optional[str] = None,
    featureInstanceId: Optional[str] = None,
) -> ChatWorkflow:
    """Stops a running chat.

    Args:
        currentUser: Current user
        workflowId: ID of the workflow to stop
        mandateId: Mandate ID from request context
        featureInstanceId: Feature instance ID for context

    Returns:
        The workflow returned by WorkflowManager.workflowStop.

    Raises:
        Exception: Any error raised while stopping the workflow is logged and re-raised.
    """
    try:
        services = getServices(currentUser, mandateId=mandateId)

        # Store feature instance ID in services context for proper RBAC filtering
        if featureInstanceId:
            services.featureInstanceId = featureInstanceId
            services.featureCode = 'chatplayground'

        workflowManager = WorkflowManager(services)
        return await workflowManager.workflowStop(workflowId)
    except Exception as e:
        logger.error(f"Error stopping chat: {str(e)}")
        raise


def _appendExecutionLog(services, automationId: str, automation, executionLog: Dict[str, Any]) -> None:
    """Append one execution log entry to an automation and persist the trimmed history.

    Args:
        services: Services instance providing interfaceDbAutomation
        automationId: ID of the automation being updated
        automation: Loaded automation definition whose executionLogs are extended
        executionLog: Log entry to append
    """
    executionLogs = list(automation.executionLogs or [])
    executionLogs.append(executionLog)
    # Keep only the most recent entries
    executionLogs = executionLogs[-_MAX_EXECUTION_LOGS:]
    services.interfaceDbAutomation.updateAutomationDefinition(
        automationId,
        {"executionLogs": executionLogs}
    )


async def executeAutomation(automationId: str, services) -> ChatWorkflow:
    """Execute automation workflow immediately (test mode) with placeholder replacement.

    The automation template is expanded with its placeholders into a JSON plan,
    the plan is turned into a prompt, and a workflow is started on behalf of the
    user who created the automation. An execution log entry is stored on the
    automation for both successful and failed runs.

    Args:
        automationId: ID of automation to execute
        services: Services instance for data access

    Returns:
        ChatWorkflow instance created by automation execution

    Raises:
        ValueError: If the automation or its creator user cannot be found, the
            automation has no creator, or the expanded template is not valid JSON.
        Exception: Any other error is recorded in the execution log and re-raised.
    """
    executionStartTime = getUtcTimestamp()
    executionLog = {
        "timestamp": executionStartTime,
        "workflowId": None,
        "status": "running",
        "messages": []
    }

    try:
        # 1. Load automation definition (with system fields for _createdBy access)
        automation = services.interfaceDbAutomation.getAutomationDefinition(automationId, includeSystemFields=True)
        if not automation:
            raise ValueError(f"Automation {automationId} not found")

        executionLog["messages"].append(f"Started execution at {executionStartTime}")

        # NOTE(review): chatStart (step 5) obtains its own services via getServices;
        # confirm the attributes set on `services` below actually reach that instance.
        # Store allowed providers from automation in services context
        if hasattr(automation, 'allowedProviders') and automation.allowedProviders:
            services.allowedProviders = automation.allowedProviders
            logger.debug(f"Automation {automationId} restricted to providers: {automation.allowedProviders}")

        # Store feature context for billing
        services.featureCode = 'automation'

        # 2. Replace placeholders in template to generate plan
        template = automation.template or ""
        placeholders = automation.placeholders or {}
        planJson = replacePlaceholders(template, placeholders)
        try:
            plan = json.loads(planJson)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse plan JSON after placeholder replacement: {str(e)}")
            logger.error(f"Template: {template[:500]}...")
            logger.error(f"Placeholders: {placeholders}")
            logger.error(f"Generated planJson (first 1000 chars): {planJson[:1000]}")
            logger.error(f"Error position: line {e.lineno}, column {e.colno}, char {e.pos}")
            # Position 0 is a valid error offset, so test against None rather than truthiness.
            if e.pos is not None:
                start = max(0, e.pos - 100)
                end = min(len(planJson), e.pos + 100)
                logger.error(f"Context around error: ...{planJson[start:end]}...")
            raise ValueError(f"Invalid JSON after placeholder replacement: {str(e)}") from e

        executionLog["messages"].append("Template placeholders replaced successfully")

        # 3. Get user who created automation
        creatorUserId = getattr(automation, "_createdBy", None)

        # _createdBy is a system attribute - must be present
        if not creatorUserId:
            errorMsg = f"Automation {automationId} has no creator user (_createdBy field missing). Cannot execute automation."
            logger.error(errorMsg)
            executionLog["messages"].append(errorMsg)
            raise ValueError(errorMsg)

        # Get creator user from database
        creatorUser = services.interfaceDbApp.getUser(creatorUserId)
        if not creatorUser:
            raise ValueError(f"Creator user {creatorUserId} not found")

        executionLog["messages"].append(f"Using creator user: {creatorUserId}")

        # 4. Create UserInputRequest from plan
        # Embed plan JSON in prompt for TemplateMode to extract
        promptText = planToPrompt(plan)
        planJsonStr = json.dumps(plan)
        # NOTE(review): no delimiters around the embedded JSON are visible here;
        # confirm TemplateMode can locate the plan in this exact format.
        promptWithPlan = f"{promptText}\n\n\n{planJsonStr}\n"

        userInput = UserInputRequest(
            prompt=promptWithPlan,
            listFileId=[],
            userLanguage=creatorUser.language or "en"
        )

        executionLog["messages"].append("Starting workflow execution")

        # 5. Start workflow using chatStart
        # Forward the automation's mandate so the workflow is started with the same
        # mandate context the scheduler uses when it builds the creator's services.
        automationMandateId = getattr(automation, "mandateId", None)
        workflow = await chatStart(
            currentUser=creatorUser,
            userInput=userInput,
            workflowMode=WorkflowModeEnum.WORKFLOW_AUTOMATION,
            workflowId=None,
            mandateId=automationMandateId
        )

        executionLog["workflowId"] = workflow.id
        executionLog["status"] = "completed"
        executionLog["messages"].append(f"Workflow {workflow.id} started successfully")

        logger.info(f"Started workflow {workflow.id} with plan containing {len(plan.get('tasks', []))} tasks (plan embedded in userInput)")

        # Set workflow name with "automated" prefix
        automationLabel = automation.label or "Unknown Automation"
        workflowName = f"automated: {automationLabel}"
        workflow = services.interfaceDbChat.updateWorkflow(workflow.id, {"name": workflowName})
        logger.info(f"Set workflow {workflow.id} name to: {workflowName}")

        # Update automation with execution log
        _appendExecutionLog(services, automationId, automation, executionLog)

        return workflow
    except Exception as e:
        # Log error to execution log
        executionLog["status"] = "error"
        executionLog["messages"].append(f"Error: {str(e)}")

        # Update automation with execution log even on error; a failure to save
        # the log must not mask the original exception.
        try:
            automation = services.interfaceDbAutomation.getAutomationDefinition(automationId)
            if automation:
                _appendExecutionLog(services, automationId, automation, executionLog)
        except Exception as logError:
            logger.error(f"Error saving execution log: {str(logError)}")

        raise


async def syncAutomationEvents(services, eventUser) -> Dict[str, Any]:
    """Automation event handler - syncs scheduler with all active automations.

    Active automations with a schedule get a cron job registered under the ID
    "automation.<automationId>"; inactive automations have their stored event
    removed. Errors for a single automation are logged and do not stop the sync.

    Args:
        services: Services instance for data access
        eventUser: System-level event user for accessing automations

    Returns:
        Dictionary with sync results (synced count and event IDs)
    """
    # Get all automation definitions filtered by RBAC (for current mandate)
    filtered = services.interfaceDbAutomation.getAllAutomationDefinitionsWithRBAC(eventUser)

    registeredEvents = {}

    for automation in filtered:
        # Handle both dict and object access patterns
        if isinstance(automation, dict):
            automationId = automation.get('id')
            isActive = automation.get('active', False)
            currentEventId = automation.get('eventId')
            schedule = automation.get('schedule')
        else:
            automationId = automation.id
            isActive = getattr(automation, 'active', False)
            currentEventId = getattr(automation, 'eventId', None)
            schedule = getattr(automation, 'schedule', None)

        if not schedule:
            logger.warning(f"Automation {automationId} has no schedule, skipping")
            continue

        try:
            # Parse schedule to cron kwargs
            cronKwargs = parseScheduleToCron(schedule)

            if isActive:
                # Remove existing event if present (handles schedule changes)
                if currentEventId:
                    try:
                        eventManager.remove(currentEventId)
                    except Exception as e:
                        logger.warning(f"Error removing old event {currentEventId}: {str(e)}")

                # Register new event
                newEventId = f"automation.{automationId}"

                # Create event handler function
                handler = createAutomationEventHandler(automationId, eventUser)

                # Register cron job
                eventManager.registerCron(
                    jobId=newEventId,
                    func=handler,
                    cronKwargs=cronKwargs,
                    replaceExisting=True
                )

                # Update automation with new eventId
                if currentEventId != newEventId:
                    services.interfaceDbAutomation.updateAutomationDefinition(
                        automationId,
                        {"eventId": newEventId}
                    )

                registeredEvents[automationId] = newEventId
            else:
                # Remove event if exists
                if currentEventId:
                    try:
                        eventManager.remove(currentEventId)
                        services.interfaceDbAutomation.updateAutomationDefinition(
                            automationId,
                            {"eventId": None}
                        )
                    except Exception as e:
                        logger.warning(f"Error removing event {currentEventId}: {str(e)}")
        except Exception as e:
            logger.error(f"Error syncing automation {automationId}: {str(e)}")

    return {
        "synced": len(registeredEvents),
        "events": registeredEvents
    }


def createAutomationEventHandler(automationId: str, eventUser):
    """Create event handler function for a specific automation.

    Args:
        automationId: ID of automation to create handler for
        eventUser: System-level event user for accessing automations (captured in closure)

    Returns:
        Async handler function for scheduled automation execution
    """
    async def handler():
        """Run the automation as its creator; log failures instead of raising."""
        try:
            if not eventUser:
                logger.error("Event user not available for automation execution")
                return

            # Get services for event user (provides access to interfaces)
            eventServices = getServices(eventUser, None)

            # Load automation using event user context (with system fields for _createdBy access)
            automation = eventServices.interfaceDbAutomation.getAutomationDefinition(automationId, includeSystemFields=True)
            if not automation or not getattr(automation, "active", False):
                logger.warning(f"Automation {automationId} not found or not active, skipping execution")
                return

            # Get creator user
            creatorUserId = getattr(automation, "_createdBy", None)
            if not creatorUserId:
                logger.error(f"Automation {automationId} has no creator user")
                return

            # Get mandate context from automation definition
            automationMandateId = getattr(automation, "mandateId", None)

            # Get creator user from database (reuses the event user's services)
            creatorUser = eventServices.interfaceDbApp.getUser(creatorUserId)
            if not creatorUser:
                logger.error(f"Creator user {creatorUserId} not found for automation {automationId}")
                return

            # Get services for creator user WITH mandate context from automation
            creatorServices = getServices(creatorUser, automationMandateId)

            # Execute automation with creator user's context and mandate
            # executeAutomation is in same module, so we can call it directly
            await executeAutomation(automationId, creatorServices)

            logger.info(f"Successfully executed automation {automationId} as user {creatorUserId}")
        except Exception as e:
            # The scheduler gets no exception back, so keep the traceback in the log.
            logger.exception(f"Error executing automation {automationId}: {str(e)}")

    return handler