# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Main workflow service - handles workflow execution, scheduling, and chat playground operations.

Combines functionality from:
- mainAutomation.py: Automation workflow execution and scheduling
- mainChatPlayground.py: Chat playground workflow start/stop operations
"""

import logging
import json
from typing import Dict, Any, Optional

from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, WorkflowModeEnum
from modules.features.automation.datamodelFeatureAutomation import AutomationDefinition
from modules.datamodels.datamodelUam import User
from modules.shared.timeUtils import getUtcTimestamp
from modules.shared.eventManagement import eventManager
from modules.features.automation.mainAutomation import getAutomationServices
from modules.workflows.workflowManager import WorkflowManager
from .subAutomationUtils import parseScheduleToCron, planToPrompt, replacePlaceholders

logger = logging.getLogger(__name__)

# Maximum number of execution log entries kept per automation.
_MAX_EXECUTION_LOGS = 50


async def chatStart(currentUser: User, userInput: UserInputRequest, workflowMode: WorkflowModeEnum, workflowId: Optional[str] = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None, featureCode: Optional[str] = None) -> ChatWorkflow:
    """
    Starts a new chat or continues an existing one, then launches processing asynchronously.

    Args:
        currentUser: Current user
        userInput: User input request
        workflowMode: Workflow mode (Dynamic, Automation, etc.)
        workflowId: Optional workflow ID to continue existing workflow
        mandateId: Mandate ID (required for billing)
        featureInstanceId: Feature instance ID (required for billing)
        featureCode: Feature code (e.g., 'chatplayground', 'automation')

    Returns:
        The ChatWorkflow returned by WorkflowManager.workflowStart.

    Raises:
        Exception: Any error raised while building services or starting the
            workflow is logged and re-raised unchanged.
    """
    try:
        services = getAutomationServices(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId)

        # Store allowedProviders in services context for model selection
        allowedProviders = getattr(userInput, 'allowedProviders', None)
        if allowedProviders:
            services.allowedProviders = allowedProviders
            logger.info(f"AI provider filter active: {allowedProviders}")

        # Store feature code in services (for billing)
        if featureCode:
            services.featureCode = featureCode

        workflowManager = WorkflowManager(services)
        workflow = await workflowManager.workflowStart(userInput, workflowMode, workflowId)
        return workflow
    except Exception as e:
        logger.error(f"Error starting chat: {str(e)}")
        raise


async def chatStop(currentUser: User, workflowId: str, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None, featureCode: Optional[str] = None) -> ChatWorkflow:
    """Stops a running chat.

    Args:
        currentUser: Current user
        workflowId: ID of the workflow to stop
        mandateId: Optional mandate ID passed to the services factory
        featureInstanceId: Optional feature instance ID passed to the services factory
        featureCode: Optional feature code stored on the services instance

    Returns:
        The ChatWorkflow returned by WorkflowManager.workflowStop.

    Raises:
        Exception: Any error is logged and re-raised unchanged.
    """
    try:
        services = getAutomationServices(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId)
        if featureCode:
            services.featureCode = featureCode
        workflowManager = WorkflowManager(services)
        return await workflowManager.workflowStop(workflowId)
    except Exception as e:
        logger.error(f"Error stopping chat: {str(e)}")
        raise


def _saveExecutionLogEntry(services, automationId: str, automation, executionLog: Dict[str, Any]) -> None:
    """Append one execution log entry to the automation's log history and persist it.

    The existing history is copied (the automation object is not mutated), the
    new entry is appended, and only the newest _MAX_EXECUTION_LOGS entries are
    passed to services.interfaceDbAutomation._saveExecutionLog.
    """
    executionLogs = list(getattr(automation, 'executionLogs', None) or [])
    executionLogs.append(executionLog)
    # Keep only the most recent entries
    if len(executionLogs) > _MAX_EXECUTION_LOGS:
        executionLogs = executionLogs[-_MAX_EXECUTION_LOGS:]
    services.interfaceDbAutomation._saveExecutionLog(automationId, executionLogs)


async def executeAutomation(automationId: str, automation, creatorUser: User, services) -> ChatWorkflow:
    """Execute automation workflow with the creator user's context.

    The automation object and creatorUser are resolved by the caller (handler)
    using the SysAdmin eventUser. This function does NOT re-load them.

    Args:
        automationId: ID of automation to execute
        automation: Pre-loaded automation object (with system fields like _createdBy)
        creatorUser: The user who created the automation (workflow runs in this context)
        services: Services instance (used for interfaceDbApp etc.)

    Returns:
        ChatWorkflow instance created by automation execution

    Raises:
        ValueError: If the automation has no mandateId/featureInstanceId, or if
            the template is not valid JSON after placeholder replacement.
        Exception: Any other error is recorded in the execution log
            (best effort) and re-raised.
    """
    executionStartTime = getUtcTimestamp()
    executionLog = {
        "timestamp": executionStartTime,
        "workflowId": None,
        "status": "running",
        "messages": []
    }

    try:
        executionLog["messages"].append(f"Started execution at {executionStartTime}")

        # Store allowed providers from automation in services context
        allowedProviders = getattr(automation, 'allowedProviders', None)
        if allowedProviders:
            services.allowedProviders = allowedProviders
            logger.debug(f"Automation {automationId} restricted to providers: {allowedProviders}")

        # Context comes EXCLUSIVELY from the automation definition
        automationMandateId = str(automation.mandateId) if automation.mandateId is not None else None
        automationFeatureInstanceId = str(automation.featureInstanceId) if automation.featureInstanceId is not None else None

        if not automationMandateId or not automationFeatureInstanceId:
            raise ValueError(f"Automation {automationId} missing mandateId or featureInstanceId")

        logger.info(f"Executing automation {automationId} as user {creatorUser.id} with mandateId={automationMandateId}, featureInstanceId={automationFeatureInstanceId}")

        # 1. Replace placeholders in template to generate plan
        template = automation.template or ""
        placeholders = automation.placeholders or {}
        planJson = replacePlaceholders(template, placeholders)
        try:
            plan = json.loads(planJson)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse plan JSON after placeholder replacement: {str(e)}")
            logger.error(f"Template: {template[:500]}...")
            logger.error(f"Placeholders: {placeholders}")
            logger.error(f"Generated planJson (first 1000 chars): {planJson[:1000]}")
            logger.error(f"Error position: line {e.lineno}, column {e.colno}, char {e.pos}")
            if e.pos is not None:
                start = max(0, e.pos - 100)
                end = min(len(planJson), e.pos + 100)
                logger.error(f"Context around error: ...{planJson[start:end]}...")
            # Chain explicitly so the original decode error stays attached as __cause__
            raise ValueError(f"Invalid JSON after placeholder replacement: {str(e)}") from e

        executionLog["messages"].append("Template placeholders replaced successfully")
        executionLog["messages"].append(f"Using creator user: {creatorUser.id}")

        # 2. Create UserInputRequest from plan
        # Embed plan JSON in prompt for TemplateMode to extract
        promptText = planToPrompt(plan)
        planJsonStr = json.dumps(plan)
        # NOTE(review): the original comment says the plan is embedded "as JSON
        # comment", but the string below contains no comment delimiters - only
        # newlines around the JSON. Confirm this matches what TemplateMode parses.
        promptWithPlan = f"{promptText}\n\n\n{planJsonStr}\n"

        userInput = UserInputRequest(
            prompt=promptWithPlan,
            listFileId=[],
            userLanguage=creatorUser.language or "en"
        )

        executionLog["messages"].append("Starting workflow execution")

        # 3. Start workflow using chatStart with creator's context
        # mandateId and featureInstanceId come from the automation definition
        workflow = await chatStart(
            currentUser=creatorUser,
            userInput=userInput,
            workflowMode=WorkflowModeEnum.WORKFLOW_AUTOMATION,
            workflowId=None,
            mandateId=automationMandateId,
            featureInstanceId=automationFeatureInstanceId,
            featureCode='automation'
        )

        executionLog["workflowId"] = workflow.id
        executionLog["status"] = "completed"
        executionLog["messages"].append(f"Workflow {workflow.id} started successfully")

        logger.info(f"Started workflow {workflow.id} with plan containing {len(plan.get('tasks', []))} tasks (plan embedded in userInput)")

        # Set workflow name with "automated" prefix — use creatorUser's Services
        # (services parameter is eventServices with eventUser context, must use creatorUser context)
        creatorServices = getAutomationServices(creatorUser, mandateId=automationMandateId, featureInstanceId=automationFeatureInstanceId)
        automationLabel = automation.label or "Unknown Automation"
        workflowName = f"automated: {automationLabel}"
        creatorServices.interfaceDbChat.updateWorkflow(workflow.id, {"name": workflowName})
        logger.info(f"Set workflow {workflow.id} name to: {workflowName}")

        # Save execution log (bypasses RBAC — system operation, not a user edit)
        _saveExecutionLogEntry(services, automationId, automation, executionLog)

        return workflow
    except Exception as e:
        # Log error to execution log
        executionLog["status"] = "error"
        executionLog["messages"].append(f"Error: {str(e)}")

        # Save execution log even on error (bypasses RBAC — system operation)
        # Use the automation object already passed in (no re-load needed)
        try:
            _saveExecutionLogEntry(services, automationId, automation, executionLog)
        except Exception as logError:
            # Best effort: a failure to persist the log must not mask the original error
            logger.error(f"Error saving execution log: {str(logError)}")

        raise


def syncAutomationEvents(services, eventUser) -> Dict[str, Any]:
    """Sync scheduler with all active automations.

    All operations (DB reads, scheduler registration) are synchronous.

    Active automations with a schedule are (re-)registered as cron jobs under
    the ID "automation.<automationId>". Inactive automations have their stored
    event removed, regardless of whether their schedule is present or parsable.
    Errors for a single automation are logged and do not abort the sync.

    Args:
        services: Services instance for data access
        eventUser: System-level event user for accessing automations

    Returns:
        Dictionary with sync results (synced count and event IDs)
    """
    # Get all automation definitions filtered by RBAC (for current mandate)
    filtered = services.interfaceDbAutomation.getAllAutomationDefinitionsWithRBAC(eventUser)

    registeredEvents = {}

    for automation in filtered:
        # Handle both dict and object access patterns
        if isinstance(automation, dict):
            automationId = automation.get('id')
            isActive = automation.get('active', False)
            currentEventId = automation.get('eventId')
            schedule = automation.get('schedule')
        else:
            automationId = automation.id
            isActive = getattr(automation, 'active', False)
            currentEventId = getattr(automation, 'eventId', None)
            schedule = getattr(automation, 'schedule', None)

        if not isActive:
            # Inactive: remove event if exists. This is done before any schedule
            # validation so a missing or invalid schedule cannot leave a stale
            # job registered for a deactivated automation.
            if currentEventId:
                try:
                    eventManager.remove(currentEventId)
                    services.interfaceDbAutomation.updateAutomationDefinition(
                        automationId,
                        {"eventId": None}
                    )
                except Exception as e:
                    logger.warning(f"Error removing event {currentEventId}: {str(e)}")
            continue

        if not schedule:
            logger.warning(f"Automation {automationId} has no schedule, skipping")
            continue

        try:
            # Parse schedule to cron kwargs
            cronKwargs = parseScheduleToCron(schedule)

            newEventId = f"automation.{automationId}"
            handler = createAutomationEventHandler(automationId, eventUser)

            # Register with replaceExisting=True (atomically replaces old event)
            eventManager.registerCron(
                jobId=newEventId,
                func=handler,
                cronKwargs=cronKwargs,
                replaceExisting=True
            )

            # Update automation with new eventId
            if currentEventId != newEventId:
                services.interfaceDbAutomation.updateAutomationDefinition(
                    automationId,
                    {"eventId": newEventId}
                )

            registeredEvents[automationId] = newEventId
        except Exception as e:
            logger.error(f"Error syncing automation {automationId}: {str(e)}")

    return {
        "synced": len(registeredEvents),
        "events": registeredEvents
    }


def createAutomationEventHandler(automationId: str, eventUser):
    """Create event handler function for a specific automation.

    Args:
        automationId: ID of automation to create handler for
        eventUser: System-level event user for accessing automations (captured in closure)

    Returns:
        Async handler function for scheduled automation execution. The handler
        takes no arguments, returns None, and logs (rather than raises) any
        error that occurs during execution.
    """
    async def handler():
        try:
            if not eventUser:
                logger.error("Event user not available for automation execution")
                return

            # Load automation using SysAdmin eventUser (has unrestricted access)
            eventServices = getAutomationServices(eventUser, mandateId=None, featureInstanceId=None)

            automation = eventServices.interfaceDbAutomation.getAutomationDefinition(automationId, includeSystemFields=True)
            if not automation or not getattr(automation, "active", False):
                logger.warning(f"Automation {automationId} not found or not active, skipping execution")
                return

            # Get creator user ID from automation's _createdBy system field
            creatorUserId = getattr(automation, "_createdBy", None)
            if not creatorUserId:
                logger.error(f"Automation {automationId} has no creator user (_createdBy missing)")
                return

            # Get creator user from database (using SysAdmin access)
            creatorUser = eventServices.interfaceDbApp.getUser(creatorUserId)
            if not creatorUser:
                logger.error(f"Creator user {creatorUserId} not found for automation {automationId}")
                return

            # Execute automation — pass automation object and creatorUser directly
            # No re-load needed in executeAutomation
            await executeAutomation(automationId, automation, creatorUser, eventServices)
            logger.info(f"Successfully executed automation {automationId} as user {creatorUserId}")
        except Exception as e:
            # Swallow after logging so a failing run does not propagate into the scheduler
            logger.error(f"Error executing automation {automationId}: {str(e)}")

    return handler