diff --git a/app.py b/app.py index d4b469ff..a167503c 100644 --- a/app.py +++ b/app.py @@ -424,3 +424,9 @@ app.include_router(adminSecurityRouter) from modules.routes.routeSharepoint import router as sharepointRouter app.include_router(sharepointRouter) +from modules.routes.routeDataAutomation import router as automationRouter +app.include_router(automationRouter) + +from modules.routes.routeAdminAutomationEvents import router as adminAutomationEventsRouter +app.include_router(adminAutomationEventsRouter) + diff --git a/modules/datamodels/datamodelChat.py b/modules/datamodels/datamodelChat.py index cdeebfed..437eacb1 100644 --- a/modules/datamodels/datamodelChat.py +++ b/modules/datamodels/datamodelChat.py @@ -266,7 +266,7 @@ registerModelLabels( class WorkflowModeEnum(str, Enum): WORKFLOW_ACTIONPLAN = "Actionplan" WORKFLOW_DYNAMIC = "Dynamic" - WORKFLOW_TEMPLATE = "Template" + WORKFLOW_AUTOMATION = "Automation" registerModelLabels( @@ -275,7 +275,7 @@ registerModelLabels( { "WORKFLOW_ACTIONPLAN": {"en": "Actionplan", "fr": "Actionplan"}, "WORKFLOW_DYNAMIC": {"en": "Dynamic", "fr": "Dynamique"}, - "WORKFLOW_TEMPLATE": {"en": "Template", "fr": "Modèle"}, + "WORKFLOW_AUTOMATION": {"en": "Automation", "fr": "Automatisation"}, }, ) @@ -405,8 +405,8 @@ class ChatWorkflow(BaseModel): "label": {"en": "Dynamic", "fr": "Dynamique"}, }, { - "value": WorkflowModeEnum.WORKFLOW_TEMPLATE.value, - "label": {"en": "Template", "fr": "Modèle"}, + "value": WorkflowModeEnum.WORKFLOW_AUTOMATION.value, + "label": {"en": "Automation", "fr": "Automatisation"}, }, ], ) @@ -1010,3 +1010,81 @@ registerModelLabels( "placeholders": {"en": "Placeholders", "fr": "Espaces réservés"}, }, ) + + +class AutomationDefinition(BaseModel): + id: str = Field( + default_factory=lambda: str(uuid.uuid4()), + description="Primary key", + frontend_type="text", + frontend_readonly=True, + frontend_required=False + ) + mandateId: str = Field( + description="Mandate ID", + frontend_type="text", + frontend_readonly=True, + frontend_required=False + ) + label: str = Field( + description="User-friendly name", + frontend_type="text", + frontend_required=True + ) + schedule: str = Field( + description="Cron schedule pattern", + frontend_type="select", + frontend_options=[ + {"value": "0 */4 * * *", "label": {"en": "Every 4 hours", "fr": "Toutes les 4 heures"}}, + {"value": "0 22 * * *", "label": {"en": "Daily at 22:00", "fr": "Quotidien à 22:00"}}, + {"value": "0 10 * * 1", "label": {"en": "Weekly Monday 10:00", "fr": "Hebdomadaire lundi 10:00"}} + ], + frontend_required=True + ) + template: str = Field( + description="JSON template with placeholders (format: {{KEY:PLACEHOLDER_NAME}})", + frontend_type="textarea", + frontend_required=True + ) + placeholders: Dict[str, str] = Field( + default_factory=dict, + description="Dictionary of placeholder key/value pairs (e.g., {'connectionName': 'MyConnection', 'sharepointFolderNameSource': '/folder/path', 'webResearchUrl': 'https://...', 'webResearchPrompt': '...', 'documentPrompt': '...'})", + frontend_type="text" + ) + active: bool = Field( + default=False, + description="Whether automation should be launched in event handler", + frontend_type="checkbox", + frontend_required=False + ) + eventId: Optional[str] = Field( + None, + description="Event ID from event management (None if not registered)", + frontend_type="text", + frontend_readonly=True, + frontend_required=False + ) + status: Optional[str] = Field( + None, + description="Status: 'active' if event is registered, 'inactive' if not (computed, readonly)", + frontend_type="text", + frontend_readonly=True, + frontend_required=False + ) + + +registerModelLabels( + "AutomationDefinition", + {"en": "Automation Definition", "fr": "Définition d'automatisation"}, + { + "id": {"en": "ID", "fr": "ID"}, + "mandateId": {"en": "Mandate ID", "fr": "ID du mandat"}, + "label": {"en": "Label", "fr": "Libellé"}, + "schedule": {"en": "Schedule", "fr": "Planification"}, + "template": {"en": "Template", "fr": "Modèle"}, + "placeholders": {"en": "Placeholders", "fr": "Espaces réservés"}, + "active": {"en": "Active", "fr": "Actif"}, + "eventId": {"en": "Event ID", "fr": "ID de l'événement"}, + "status": {"en": "Status", "fr": "Statut"}, + }, +) diff --git a/modules/features/featuresLifecycle.py b/modules/features/featuresLifecycle.py index 85650c14..1f09a592 100644 --- a/modules/features/featuresLifecycle.py +++ b/modules/features/featuresLifecycle.py @@ -13,6 +13,18 @@ async def start() -> None: from modules.features.syncDelta import mainSyncDelta mainSyncDelta.startSyncManager(eventUser) + # Feature Automation Events + if eventUser: + try: + from modules.interfaces.interfaceDbChatObjects import getInterface as getChatInterface + chatInterface = getChatInterface(eventUser) + if hasattr(chatInterface, 'syncAutomationEvents'): + await chatInterface.syncAutomationEvents() + logger.info("Automation events synced on startup") + except Exception as e: + logger.error(f"Error syncing automation events on startup: {str(e)}") + # Don't fail startup if automation sync fails + # Feature ... return True diff --git a/modules/interfaces/interfaceDbChatAccess.py b/modules/interfaces/interfaceDbChatAccess.py index 3facab4d..4662484a 100644 --- a/modules/interfaces/interfaceDbChatAccess.py +++ b/modules/interfaces/interfaceDbChatAccess.py @@ -5,7 +5,7 @@ Handles user access management and permission checks. from typing import Dict, Any, List, Optional from modules.datamodels.datamodelUam import User, UserPrivilege -from modules.datamodels.datamodelChat import ChatWorkflow +from modules.datamodels.datamodelChat import ChatWorkflow, AutomationDefinition class ChatAccess: """ @@ -41,7 +41,13 @@ class ChatAccess: filtered_records = [] # Apply filtering based on privilege - if userPrivilege == UserPrivilege.SYSADMIN: + if table_name == "AutomationDefinition": + # Users see only their own automation definitions + filtered_records = [ + r for r in recordset + if r.get("mandateId","-") == self.mandateId and r.get("_createdBy") == self.userId + ] + elif userPrivilege == UserPrivilege.SYSADMIN: filtered_records = recordset # System admins see all records elif userPrivilege == UserPrivilege.ADMIN: # Admins see records in their mandate @@ -68,6 +74,10 @@ class ChatAccess: record["_hideView"] = False # Everyone can view record["_hideEdit"] = not self.canModify(ChatWorkflow, record.get("workflowId")) record["_hideDelete"] = not self.canModify(ChatWorkflow, record.get("workflowId")) + elif table_name == "AutomationDefinition": + record["_hideView"] = False # Everyone can view + record["_hideEdit"] = not self.canModify(AutomationDefinition, record_id) + record["_hideDelete"] = not self.canModify(AutomationDefinition, record_id) else: # Default access control for other tables record["_hideView"] = False diff --git a/modules/interfaces/interfaceDbChatObjects.py b/modules/interfaces/interfaceDbChatObjects.py index 66927037..e6cb9aa0 100644 --- a/modules/interfaces/interfaceDbChatObjects.py +++ b/modules/interfaces/interfaceDbChatObjects.py @@ -18,8 +18,11 @@ from modules.datamodels.datamodelChat import ( ChatLog, ChatMessage, ChatWorkflow, - WorkflowModeEnum + WorkflowModeEnum, + AutomationDefinition, + UserInputRequest ) +import json from modules.datamodels.datamodelUam import User # DYNAMIC PART: Connectors to the Interface @@ -1207,6 +1210,361 @@ class ChatObjects: return {"items": items} + # ===== Automation Methods ===== + + def _computeAutomationStatus(self, automation: Dict[str, Any]) -> str: + """Compute status field based on eventId presence""" + eventId = automation.get("eventId") + return "active" if eventId else "inactive" + + def getAllAutomationDefinitions(self, pagination: Optional[PaginationParams] = None) -> Union[List[Dict[str, Any]], PaginatedResult]: + """ + Returns automation definitions based on user access level. + Supports optional pagination, sorting, and filtering. + Computes status field for each automation. + """ + allAutomations = self.db.getRecordset(AutomationDefinition) + filteredAutomations = self._uam(AutomationDefinition, allAutomations) + + # Compute status for each automation + for automation in filteredAutomations: + automation["status"] = self._computeAutomationStatus(automation) + + # If no pagination requested, return all items + if pagination is None: + return filteredAutomations + + # Apply filtering (if filters provided) + if pagination.filters: + filteredAutomations = self._applyFilters(filteredAutomations, pagination.filters) + + # Apply sorting (in order of sortFields) + if pagination.sort: + filteredAutomations = self._applySorting(filteredAutomations, pagination.sort) + + # Count total items after filters + totalItems = len(filteredAutomations) + totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0 + + # Apply pagination (skip/limit) + startIdx = (pagination.page - 1) * pagination.pageSize + endIdx = startIdx + pagination.pageSize + pagedAutomations = filteredAutomations[startIdx:endIdx] + + return PaginatedResult( + items=pagedAutomations, + totalItems=totalItems, + totalPages=totalPages + ) + + def getAutomationDefinition(self, automationId: str) -> Optional[Dict[str, Any]]: + """Returns an automation definition by ID if user has access, with computed status.""" + try: + automations = self.db.getRecordset(AutomationDefinition, recordFilter={"id": automationId}) + filtered = self._uam(AutomationDefinition, automations) + + if not filtered: + return None + + automation = filtered[0] + automation["status"] = self._computeAutomationStatus(automation) + return automation + except Exception as e: + logger.error(f"Error getting automation definition: {str(e)}") + return None + + def createAutomationDefinition(self, automationData: Dict[str, Any]) -> Dict[str, Any]: + """Creates a new automation definition, then triggers sync.""" + try: + # Ensure ID is present + if "id" not in automationData or not automationData["id"]: + automationData["id"] = str(uuid.uuid4()) + + # Ensure mandateId is set + if "mandateId" not in automationData: + automationData["mandateId"] = self.mandateId + + # Use generic field separation + simpleFields, objectFields = self._separateObjectFields(AutomationDefinition, automationData) + + # Create automation in database + createdAutomation = self.db.recordCreate(AutomationDefinition, simpleFields) + + # Compute status + createdAutomation["status"] = self._computeAutomationStatus(createdAutomation) + + # Trigger sync (async, don't wait) + asyncio.create_task(self.syncAutomationEvents()) + + return createdAutomation + except Exception as e: + logger.error(f"Error creating automation definition: {str(e)}") + raise + + def updateAutomationDefinition(self, automationId: str, automationData: Dict[str, Any]) -> Dict[str, Any]: + """Updates an automation definition, then triggers sync.""" + try: + # Check access + existing = self.getAutomationDefinition(automationId) + if not existing: + raise PermissionError(f"No access to automation {automationId}") + + if not self._canModify(AutomationDefinition, automationId): + raise PermissionError(f"No permission to modify automation {automationId}") + + # Use generic field separation + simpleFields, objectFields = self._separateObjectFields(AutomationDefinition, automationData) + + # Update automation in database + updatedAutomation = self.db.recordModify(AutomationDefinition, automationId, simpleFields) + + # Compute status + updatedAutomation["status"] = self._computeAutomationStatus(updatedAutomation) + + # Trigger sync (async, don't wait) + asyncio.create_task(self.syncAutomationEvents()) + + return updatedAutomation + except Exception as e: + logger.error(f"Error updating automation definition: {str(e)}") + raise + + def deleteAutomationDefinition(self, automationId: str) -> bool: + """Deletes an automation definition, then triggers sync.""" + try: + # Check access + existing = self.getAutomationDefinition(automationId) + if not existing: + raise PermissionError(f"No access to automation {automationId}") + + if not self._canModify(AutomationDefinition, automationId): + raise PermissionError(f"No permission to delete automation {automationId}") + + # Remove event if exists + if existing.get("eventId"): + from modules.shared.eventManagement import eventManager + try: + eventManager.remove(existing["eventId"]) + except Exception as e: + logger.warning(f"Error removing event {existing['eventId']}: {str(e)}") + + # Delete automation from database + self.db.recordDelete(AutomationDefinition, automationId) + + # Trigger sync (async, don't wait) + asyncio.create_task(self.syncAutomationEvents()) + + return True + except Exception as e: + logger.error(f"Error deleting automation definition: {str(e)}") + raise + + def _replacePlaceholders(self, template: str, placeholders: Dict[str, str]) -> str: + """Replace placeholders in template with actual values. Placeholder format: {{KEY:PLACEHOLDER_NAME}}""" + result = template + for placeholderName, value in placeholders.items(): + pattern = f"{{{{KEY:{placeholderName}}}}}" + result = result.replace(pattern, str(value)) + return result + + def _parseScheduleToCron(self, schedule: str) -> Dict[str, Any]: + """Parse schedule string to cron kwargs for APScheduler""" + parts = schedule.split() + if len(parts) != 5: + raise ValueError(f"Invalid schedule format: {schedule}") + + return { + "minute": parts[0], + "hour": parts[1], + "day": parts[2], + "month": parts[3], + "day_of_week": parts[4] + } + + async def executeAutomation(self, automationId: str) -> ChatWorkflow: + """Execute automation workflow immediately (test mode) with placeholder replacement""" + # 1. Load automation definition + automation = self.getAutomationDefinition(automationId) + if not automation: + raise ValueError(f"Automation {automationId} not found") + + # 2. Replace placeholders in template to generate plan + template = automation.get("template", "") + placeholders = automation.get("placeholders", {}) + planJson = self._replacePlaceholders(template, placeholders) + plan = json.loads(planJson) + + # 3. Get user who created automation + creator_user_id = automation.get("_createdBy") + if not creator_user_id: + raise ValueError(f"Automation {automationId} has no creator user") + + # Get user from database + from modules.interfaces.interfaceDbAppObjects import getInterface as getAppInterface + appInterface = getAppInterface(self.currentUser) + creator_user = appInterface.getUser(creator_user_id) + if not creator_user: + raise ValueError(f"Creator user {creator_user_id} not found") + + # 4. Create UserInputRequest from plan + # Embed plan JSON in prompt for TemplateMode to extract + promptText = self._planToPrompt(plan) + planJson = json.dumps(plan) + # Embed plan as JSON comment so TemplateMode can extract it + promptWithPlan = f"{promptText}\n\n\n{planJson}\n" + + userInput = UserInputRequest( + prompt=promptWithPlan, + listFileId=[], + userLanguage=creator_user.language or "en" + ) + + # 5. Start workflow using chatStart + from modules.features.chatPlayground.mainChatPlayground import chatStart + + workflow = await chatStart( + currentUser=creator_user, + userInput=userInput, + workflowMode=WorkflowModeEnum.WORKFLOW_AUTOMATION, + workflowId=None + ) + + # Also store plan in module-level cache as backup (keyed by workflow ID) + from modules.workflows.processing.modes import modeAutomation + if not hasattr(modeAutomation, '_templatePlanCache'): + modeAutomation._templatePlanCache = {} + modeAutomation._templatePlanCache[workflow.id] = plan + logger.info(f"Stored template plan for workflow {workflow.id} (cache + prompt) with {len(plan.get('tasks', []))} tasks") + + return workflow + + def _planToPrompt(self, plan: Dict) -> str: + """Convert plan structure to prompt string for workflow execution""" + return plan.get("userMessage", plan.get("overview", "Execute automation workflow")) + + async def syncAutomationEvents(self) -> Dict[str, Any]: + """Automation event handler - syncs scheduler with all active automations.""" + from modules.shared.eventManagement import eventManager + + # Get all automation definitions (for current mandate) + allAutomations = self.db.getRecordset(AutomationDefinition) + filtered = self._uam(AutomationDefinition, allAutomations) + + registered_events = {} + + for automation in filtered: + automation_id = automation.get("id") + is_active = automation.get("active", False) + current_event_id = automation.get("eventId") + schedule = automation.get("schedule") + + if not schedule: + logger.warning(f"Automation {automation_id} has no schedule, skipping") + continue + + try: + # Parse schedule to cron kwargs + cron_kwargs = self._parseScheduleToCron(schedule) + + if is_active: + # Remove existing event if present (handles schedule changes) + if current_event_id: + try: + eventManager.remove(current_event_id) + except Exception as e: + logger.warning(f"Error removing old event {current_event_id}: {str(e)}") + + # Register new event + new_event_id = f"automation.{automation_id}" + + # Create event handler function + handler = self._createAutomationEventHandler(automation_id) + + # Register cron job + eventManager.registerCron( + jobId=new_event_id, + func=handler, + cronKwargs=cron_kwargs, + replaceExisting=True + ) + + # Update automation with new eventId + if current_event_id != new_event_id: + self.db.recordModify( + AutomationDefinition, + automation_id, + {"eventId": new_event_id} + ) + + registered_events[automation_id] = new_event_id + else: + # Remove event if exists + if current_event_id: + try: + eventManager.remove(current_event_id) + self.db.recordModify( + AutomationDefinition, + automation_id, + {"eventId": None} + ) + except Exception as e: + logger.warning(f"Error removing event {current_event_id}: {str(e)}") + except Exception as e: + logger.error(f"Error syncing automation {automation_id}: {str(e)}") + + return { + "synced": len(registered_events), + "events": registered_events + } + + def _createAutomationEventHandler(self, automationId: str): + """Create event handler function for a specific automation""" + async def handler(): + try: + # Get event user to access automation (event user can access all automations) + from modules.interfaces.interfaceDbAppObjects import getRootInterface + from modules.interfaces.interfaceDbAppObjects import getInterface as getAppInterface + from modules.interfaces.interfaceDbChatObjects import getInterface as getChatInterface + + rootInterface = getRootInterface() + eventUser = rootInterface.getUserByUsername("event") + + if not eventUser: + logger.error("Could not get event user for automation execution") + return + + # Create ChatObjects interface for event user (to access automation) + eventInterface = getChatInterface(eventUser) + + # Load automation using event user context + automation = eventInterface.getAutomationDefinition(automationId) + if not automation or not automation.get("active"): + logger.warning(f"Automation {automationId} not found or not active, skipping execution") + return + + # Get creator user + creator_user_id = automation.get("_createdBy") + if not creator_user_id: + logger.error(f"Automation {automationId} has no creator user") + return + + # Get creator user from database + appInterface = getAppInterface(eventUser) + creator_user = appInterface.getUser(creator_user_id) + if not creator_user: + logger.error(f"Creator user {creator_user_id} not found for automation {automationId}") + return + + # Create ChatObjects interface for creator user + creatorInterface = getChatInterface(creator_user) + + # Execute automation with creator user's context + await creatorInterface.executeAutomation(automationId) + logger.info(f"Successfully executed automation {automationId} as user {creator_user_id}") + except Exception as e: + logger.error(f"Error executing automation {automationId}: {str(e)}") + + return handler def getInterface(currentUser: Optional[User] = None) -> 'ChatObjects': diff --git a/modules/routes/routeAdminAutomationEvents.py b/modules/routes/routeAdminAutomationEvents.py new file mode 100644 index 00000000..bb4a233f --- /dev/null +++ b/modules/routes/routeAdminAutomationEvents.py @@ -0,0 +1,150 @@ +""" +Admin automation events routes for the backend API. +Sysadmin-only endpoints for viewing and controlling automation events. +""" + +from fastapi import APIRouter, HTTPException, Depends, Path, Request, Response +from typing import List, Dict, Any +from fastapi import status +import logging + +# Import interfaces and models +import modules.interfaces.interfaceDbChatObjects as interfaceDbChatObjects +from modules.security.auth import getCurrentUser, limiter +from modules.datamodels.datamodelUam import User, UserPrivilege + +# Configure logger +logger = logging.getLogger(__name__) + +# Create router for admin automation events endpoints +router = APIRouter( + prefix="/api/admin/automation-events", + tags=["Admin Automation Events"], + responses={ + 404: {"description": "Not found"}, + 400: {"description": "Bad request"}, + 401: {"description": "Unauthorized"}, + 403: {"description": "Forbidden - Sysadmin only"}, + 500: {"description": "Internal server error"} + } +) + +def requireSysadmin(currentUser: User): + """Require sysadmin privilege""" + if currentUser.privilege != UserPrivilege.SYSADMIN: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Sysadmin privilege required" + ) + +@router.get("") +@limiter.limit("30/minute") +async def get_all_automation_events( + request: Request, + currentUser: User = Depends(getCurrentUser) +) -> List[Dict[str, Any]]: + """ + Get all automation events across all mandates (sysadmin only). + Returns list of all registered events with their automation IDs and schedules. + """ + requireSysadmin(currentUser) + + try: + from modules.shared.eventManagement import eventManager + + # Get all jobs from scheduler + jobs = [] + if eventManager.scheduler: + for job in eventManager.scheduler.get_jobs(): + if job.id.startswith("automation."): + automation_id = job.id.replace("automation.", "") + jobs.append({ + "eventId": job.id, + "automationId": automation_id, + "nextRunTime": str(job.next_run_time) if job.next_run_time else None, + "trigger": str(job.trigger) if job.trigger else None + }) + + return jobs + except Exception as e: + logger.error(f"Error getting automation events: {str(e)}") + raise HTTPException( + status_code=500, + detail=f"Error getting automation events: {str(e)}" + ) + +@router.post("/sync") +@limiter.limit("5/minute") +async def sync_all_automation_events( + request: Request, + currentUser: User = Depends(getCurrentUser) +) -> Dict[str, Any]: + """ + Manually trigger sync for all automations (sysadmin only). + This will register/remove events based on active flags. + """ + requireSysadmin(currentUser) + + try: + chatInterface = interfaceDbChatObjects.getInterface(currentUser) + + if not hasattr(chatInterface, 'syncAutomationEvents'): + raise HTTPException( + status_code=501, + detail="Automation methods not available" + ) + + result = await chatInterface.syncAutomationEvents() + return { + "success": True, + "synced": result.get("synced", 0), + "events": result.get("events", {}) + } + except HTTPException: + raise + except Exception as e: + logger.error(f"Error syncing automation events: {str(e)}") + raise HTTPException( + status_code=500, + detail=f"Error syncing automation events: {str(e)}" + ) + +@router.post("/{eventId}/remove") +@limiter.limit("10/minute") +async def remove_event( + request: Request, + eventId: str = Path(..., description="Event ID to remove"), + currentUser: User = Depends(getCurrentUser) +) -> Dict[str, Any]: + """ + Manually remove a specific event from scheduler (sysadmin only). + Used for debugging and manual event cleanup. + """ + requireSysadmin(currentUser) + + try: + from modules.shared.eventManagement import eventManager + + # Remove event + eventManager.remove(eventId) + + # Update automation's eventId if it exists + if eventId.startswith("automation."): + automation_id = eventId.replace("automation.", "") + chatInterface = interfaceDbChatObjects.getInterface(currentUser) + automation = chatInterface.getAutomationDefinition(automation_id) + if automation and automation.get("eventId") == eventId: + chatInterface.updateAutomationDefinition(automation_id, {"eventId": None}) + + return { + "success": True, + "eventId": eventId, + "message": f"Event {eventId} removed successfully" + } + except Exception as e: + logger.error(f"Error removing event: {str(e)}") + raise HTTPException( + status_code=500, + detail=f"Error removing event: {str(e)}" + ) + diff --git a/modules/routes/routeDataAutomation.py b/modules/routes/routeDataAutomation.py new file mode 100644 index 00000000..7e268c0b --- /dev/null +++ b/modules/routes/routeDataAutomation.py @@ -0,0 +1,236 @@ +""" +Automation routes for the backend API. +Implements the endpoints for automation definition management. +""" + +from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response, Query +from typing import List, Dict, Any, Optional +from fastapi import status +import logging +import json + +# Import interfaces and models +from modules.interfaces.interfaceDbChatObjects import getInterface as getChatInterface +from modules.security.auth import getCurrentUser, limiter +from modules.datamodels.datamodelChat import AutomationDefinition, ChatWorkflow +from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata +from modules.shared.attributeUtils import getModelAttributeDefinitions + +# Configure logger +logger = logging.getLogger(__name__) + +# Model attributes for AutomationDefinition +automationAttributes = getModelAttributeDefinitions(AutomationDefinition) + +# Create router for automation endpoints +router = APIRouter( + prefix="/api/automations", + tags=["Manage Automations"], + responses={ + 404: {"description": "Not found"}, + 400: {"description": "Bad request"}, + 401: {"description": "Unauthorized"}, + 403: {"description": "Forbidden"}, + 500: {"description": "Internal server error"} + } +) + +@router.get("", response_model=PaginatedResponse[AutomationDefinition]) +@limiter.limit("30/minute") +async def get_automations( + request: Request, + pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), + currentUser = Depends(getCurrentUser) +) -> PaginatedResponse[AutomationDefinition]: + """ + Get automation definitions with optional pagination, sorting, and filtering. + + Query Parameters: + - pagination: JSON-encoded PaginationParams object, or None for no pagination + """ + try: + # Parse pagination parameter + paginationParams = None + if pagination: + try: + paginationDict = json.loads(pagination) + paginationParams = PaginationParams(**paginationDict) if paginationDict else None + except (json.JSONDecodeError, ValueError) as e: + raise HTTPException( + status_code=400, + detail=f"Invalid pagination parameter: {str(e)}" + ) + + chatInterface = getChatInterface(currentUser) + result = chatInterface.getAllAutomationDefinitions(pagination=paginationParams) + + # If pagination was requested, result is PaginatedResult + # If no pagination, result is List[Dict] + if paginationParams: + return PaginatedResponse( + items=result.items, + pagination=PaginationMetadata( + currentPage=paginationParams.page, + pageSize=paginationParams.pageSize, + totalItems=result.totalItems, + totalPages=result.totalPages, + sort=paginationParams.sort, + filters=paginationParams.filters + ) + ) + else: + return PaginatedResponse( + items=result, + pagination=None + ) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting automations: {str(e)}") + raise HTTPException( + status_code=500, + detail=f"Error getting automations: {str(e)}" + ) + +@router.post("", response_model=AutomationDefinition) +@limiter.limit("10/minute") +async def create_automation( + request: Request, + automation: AutomationDefinition, + currentUser = Depends(getCurrentUser) +) -> AutomationDefinition: + """Create a new automation definition""" + try: + chatInterface = getChatInterface(currentUser) + automationData = automation.model_dump() + created = chatInterface.createAutomationDefinition(automationData) + return AutomationDefinition(**created) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error creating automation: {str(e)}") + raise HTTPException( + status_code=500, + detail=f"Error creating automation: {str(e)}" + ) + +@router.get("/{automationId}", response_model=AutomationDefinition) +@limiter.limit("30/minute") +async def get_automation( + request: Request, + automationId: str = Path(..., description="Automation ID"), + currentUser = Depends(getCurrentUser) +) -> AutomationDefinition: + """Get a single automation definition by ID""" + try: + chatInterface = getChatInterface(currentUser) + automation = chatInterface.getAutomationDefinition(automationId) + if not automation: + raise HTTPException( + status_code=404, + detail=f"Automation {automationId} not found" + ) + + return AutomationDefinition(**automation) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting automation: {str(e)}") + raise HTTPException( + status_code=500, + detail=f"Error getting automation: {str(e)}" + ) + +@router.put("/{automationId}", response_model=AutomationDefinition) +@limiter.limit("10/minute") +async def update_automation( + request: Request, + automationId: str = Path(..., description="Automation ID"), + automation: AutomationDefinition = Body(...), + currentUser = Depends(getCurrentUser) +) -> AutomationDefinition: + """Update an automation definition""" + try: + chatInterface = getChatInterface(currentUser) + automationData = automation.model_dump() + updated = chatInterface.updateAutomationDefinition(automationId, automationData) + return AutomationDefinition(**updated) + except HTTPException: + raise + except PermissionError as e: + raise HTTPException( + status_code=403, + detail=str(e) + ) + except Exception as e: + logger.error(f"Error updating automation: {str(e)}") + raise HTTPException( + status_code=500, + detail=f"Error updating automation: {str(e)}" + ) + +@router.delete("/{automationId}") +@limiter.limit("10/minute") +async def delete_automation( + request: Request, + automationId: str = Path(..., description="Automation ID"), + currentUser = Depends(getCurrentUser) +) -> Response: + """Delete an automation definition""" + try: + chatInterface = getChatInterface(currentUser) + success = chatInterface.deleteAutomationDefinition(automationId) + if success: + return Response(status_code=204) + else: + raise HTTPException( + status_code=500, + detail="Failed to delete automation" + ) + except HTTPException: + raise + except PermissionError as e: + raise HTTPException( + status_code=403, + detail=str(e) + ) + except Exception as e: + logger.error(f"Error deleting automation: {str(e)}") + raise HTTPException( + status_code=500, + detail=f"Error deleting automation: {str(e)}" + ) + +@router.post("/{automationId}/execute", response_model=ChatWorkflow) +@limiter.limit("5/minute") +async def execute_automation( + request: Request, + automationId: str = Path(..., description="Automation ID"), + currentUser = Depends(getCurrentUser) +) -> ChatWorkflow: + """Execute an automation immediately (test mode)""" + try: + chatInterface = getChatInterface(currentUser) + workflow = await chatInterface.executeAutomation(automationId) + return workflow + except HTTPException: + raise + except ValueError as e: + raise HTTPException( + status_code=404, + detail=str(e) + ) + except Exception as e: + logger.error(f"Error executing automation: {str(e)}") + raise HTTPException( + status_code=500, + detail=f"Error executing automation: {str(e)}" + ) + +@router.get("/attributes", response_model=Dict[str, Any]) +async def get_automation_attributes( + request: Request +) -> Dict[str, Any]: + """Get attribute definitions for AutomationDefinition model""" + return {"attributes": automationAttributes} + diff --git a/modules/workflows/processing/modes/modeAutomation.py b/modules/workflows/processing/modes/modeAutomation.py new file mode 100644 index 00000000..f8415efa --- /dev/null +++ b/modules/workflows/processing/modes/modeAutomation.py @@ -0,0 +1,430 @@ +# modeAutomation.py +# Template mode implementation for workflows with predefined action plans + +import json +import logging +import uuid +from typing import List, Dict, Any, Optional +from modules.datamodels.datamodelChat import ( + TaskStep, TaskContext, TaskResult, ActionItem, TaskStatus, + TaskPlan, ActionResult +) +from modules.datamodels.datamodelChat import ChatWorkflow +from modules.workflows.processing.modes.modeBase import BaseMode + +logger = logging.getLogger(__name__) + +class TemplateMode(BaseMode): + """Template mode implementation - executes workflows from predefined plans""" + + def __init__(self, services, workflow): + super().__init__(services, workflow) + # Store action lists for each task (mapped by task ID) + self.taskActionMap: Dict[str, List[Dict[str, Any]]] = {} + logger.info("TemplateMode initialized - will use predefined plan from workflow") + + async def generateTaskPlan(self, userInput: str, workflow: ChatWorkflow) -> TaskPlan: + """ + Generate task plan from stored template plan (no AI planning needed). + The plan is stored in module-level cache by executeAutomation. + """ + try: + # Get plan from module-level cache (stored by executeAutomation) + templatePlan = None + if hasattr(self, '_templatePlanCache') and self._templatePlanCache: + templatePlan = self._templatePlanCache.get(workflow.id) + + # Try module-level cache + if not templatePlan: + try: + from modules.workflows.processing.modes import modeAutomation + if hasattr(modeAutomation, '_templatePlanCache'): + templatePlan = modeAutomation._templatePlanCache.get(workflow.id) + if templatePlan: + logger.info(f"Retrieved template plan from module cache for workflow {workflow.id}") + except Exception as e: + logger.warning(f"Could not access module cache: {str(e)}") + + if not templatePlan: + # Fallback: Extract from prompt (embedded as JSON comment) + try: + # Look for embedded plan in prompt (between and ) + startMarker = "" + endMarker = "" + startIdx = userInput.find(startMarker) + endIdx = userInput.find(endMarker) + + if startIdx >= 0 and endIdx > startIdx: + planJson = userInput[startIdx + len(startMarker):endIdx].strip() + templatePlan = json.loads(planJson) + logger.info("Extracted template plan from embedded JSON in prompt") + elif '{' in userInput and '"tasks"' in userInput: + # Try parsing entire userInput as JSON (fallback) + jsonStart = userInput.find('{') + jsonEnd = userInput.rfind('}') + 1 + if jsonStart >= 0 and jsonEnd > jsonStart: + templatePlan = json.loads(userInput[jsonStart:jsonEnd]) + logger.info("Parsed template plan from userInput JSON (fallback)") + else: + raise ValueError("No template plan found in cache or prompt") + except (json.JSONDecodeError, ValueError) as e: + logger.error(f"Could not parse template plan: {str(e)}") + raise ValueError(f"Template mode requires a predefined plan, but none was found: {str(e)}") + + logger.info(f"Using template plan with {len(templatePlan.get('tasks', []))} tasks") + + # Convert plan tasks to TaskStep objects + tasks = [] + for i, taskDict in enumerate(templatePlan.get('tasks', [])): + if not isinstance(taskDict, dict): + logger.warning(f"Skipping invalid task {i+1}: not a dictionary") + continue + + # Store actionList for this task (before converting to TaskStep) + taskId = taskDict.get('id', f"task_{i+1}") + actionList = taskDict.get('actionList', []) + if actionList: + self.taskActionMap[taskId] = actionList + logger.info(f"Stored {len(actionList)} actions for task {taskId}") + + # Remove actionList from taskDict (TaskStep doesn't have this field) + taskDictWithoutActions = {k: v for k, v in taskDict.items() if k != 'actionList'} + + try: + task = TaskStep(**taskDictWithoutActions) + tasks.append(task) + except Exception as e: + logger.warning(f"Skipping invalid task {i+1}: {str(e)}") + continue + + if not tasks: + raise ValueError("No valid tasks found in template plan") + + taskPlan = TaskPlan( + overview=templatePlan.get('overview', 'Automated workflow execution'), + tasks=tasks, + userMessage=templatePlan.get('userMessage', '') + ) + + logger.info(f"Generated task plan from template with {len(tasks)} tasks") + + # Clean up cache after retrieving plan (prevent memory leaks) + try: + from modules.workflows.processing.modes import modeAutomation + if hasattr(modeAutomation, '_templatePlanCache') and workflow.id in modeAutomation._templatePlanCache: + del modeAutomation._templatePlanCache[workflow.id] + logger.debug(f"Cleaned up template plan cache for workflow {workflow.id}") + except Exception as e: + logger.warning(f"Could not clean up template plan cache: {str(e)}") + + return taskPlan + + except Exception as e: + logger.error(f"Error generating task plan from template: {str(e)}") + raise + + async def generateActionItems(self, taskStep: TaskStep, workflow: ChatWorkflow, + previousResults: List = None, enhancedContext: TaskContext = None) -> List[ActionItem]: + """ + Generate actions from predefined actionList in template plan. + No AI planning needed - actions are already defined in the plan. + """ + try: + taskId = taskStep.id + + # Get actionList from stored map + actionList = self.taskActionMap.get(taskId, []) + + if not actionList: + logger.warning(f"No actionList found for task {taskId} in template plan") + return [] + + logger.info(f"Generating {len(actionList)} actions for task {taskId} from template") + + # Convert actionList to ActionItem objects + actionItems = [] + for i, actionDict in enumerate(actionList): + if not isinstance(actionDict, dict): + logger.warning(f"Skipping invalid action {i+1} in task {taskId}: not a dictionary") + continue + + # Extract action fields + execMethod = actionDict.get('execMethod', '') + execAction = actionDict.get('execAction', '') + execParameters = actionDict.get('execParameters', {}) + execResultLabel = actionDict.get('execResultLabel', '') + + if not execMethod or not execAction: + logger.warning(f"Skipping invalid action {i+1} in task {taskId}: missing execMethod or execAction") + continue + + # Create ActionItem + actionItem = self._createActionItem({ + "id": f"action_{uuid.uuid4()}", + "execMethod": execMethod, + "execAction": execAction, + "execParameters": execParameters, + "execResultLabel": execResultLabel, + "expectedDocumentFormats": actionDict.get('expectedDocumentFormats', None), + "status": TaskStatus.PENDING, + "userMessage": actionDict.get('userMessage', None) + }) + + if actionItem: + actionItems.append(actionItem) + else: + logger.warning(f"Failed to create ActionItem for action {i+1} in task {taskId}") + + logger.info(f"Generated {len(actionItems)} action items for task {taskId}") + return actionItems + + except Exception as e: + logger.error(f"Error generating action items from template: {str(e)}") + return [] + + async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext, + taskIndex: int = None, totalTasks: int = None) -> TaskResult: + """ + Execute task using Template mode - executes predefined actions directly. + Similar to ActionplanMode but without AI planning or review phases. + """ + logger.info(f"=== STARTING TASK {taskIndex or '?'}: {taskStep.objective} ===") + + try: + # Check workflow status + self._checkWorkflowStopped(workflow) + + # Update workflow before executing task + if taskIndex is not None: + self._updateWorkflowBeforeExecutingTask(taskIndex) + self.services.workflow.setWorkflowContext(taskNumber=taskIndex) + + # Create task start message + await self.messageCreator.createTaskStartMessage(taskStep, workflow, taskIndex, totalTasks) + + # Generate actions from template plan + actions = await self.generateActionItems(taskStep, workflow, + previousResults=context.previousResults if context else None, + enhancedContext=context) + + if not actions: + logger.error(f"No actions found for task {taskIndex}, aborting") + return TaskResult( + taskId=taskStep.id, + status=TaskStatus.FAILED, + success=False, + feedback="No actions defined in template plan for this task", + error="No actions found in template plan" + ) + + # Update workflow after action planning + totalActions = len(actions) + self._updateWorkflowAfterActionPlanning(totalActions) + self._setWorkflowTotals(totalActions=totalActions) + + logger.info(f"Task {taskIndex} has {totalActions} actions to execute") + + # Execute all actions sequentially + actionResults = [] + for actionIdx, action in enumerate(actions): + # Check workflow status before each action + self._checkWorkflowStopped(workflow) + + # Update workflow before executing action + actionNumber = actionIdx + 1 + self._updateWorkflowBeforeExecutingAction(actionNumber) + + logger.info(f"Task {taskIndex} - Starting action {actionNumber}/{totalActions}: {action.execMethod}.{action.execAction}") + + # Create action start message + actionStartMessage = { + "workflowId": workflow.id, + "role": "assistant", + "message": f"⚡ **Action {actionNumber}** (Method {action.execMethod}.{action.execAction})", + "status": "step", + "sequenceNr": len(workflow.messages) + 1, + "publishedAt": self.services.utils.timestampGetUtc(), + "documentsLabel": f"action_{actionNumber}_start", + "documents": [], + "actionProgress": "running", + "roundNumber": workflow.currentRound, + "taskNumber": taskIndex, + "actionNumber": actionNumber + } + + # Add user-friendly message if available + if action.userMessage: + actionStartMessage["message"] += f"\n\n💬 {action.userMessage}" + + self.services.workflow.storeMessageWithDocuments(workflow, actionStartMessage, []) + + # Execute action + result = await self.actionExecutor.executeSingleAction( + action, workflow, taskStep, taskIndex, actionNumber, totalActions + ) + actionResults.append(result) + + if result.success: + logger.info(f"Action {actionNumber} completed successfully") + else: + logger.warning(f"Action {actionNumber} failed: {result.error}") + + # Check if all actions succeeded + allSucceeded = all(r.success for r in actionResults) + failedCount = sum(1 for r in actionResults if not r.success) + + if allSucceeded: + logger.info(f"=== TASK {taskIndex or '?'} COMPLETED SUCCESSFULLY: {taskStep.objective} ===") + + # Create task completion message + await self.messageCreator.createTaskCompletionMessage( + taskStep, workflow, taskIndex, totalTasks, None + ) + + return TaskResult( + taskId=taskStep.id, + status=TaskStatus.COMPLETED, + success=True, + feedback=f"Task completed successfully with {totalActions} actions", + error=None + ) + else: + logger.error(f"=== TASK {taskIndex or '?'} FAILED: {taskStep.objective} ({failedCount}/{totalActions} actions failed) ===") + + errorMessages = [r.error for r in actionResults if r.error] + errorSummary = "; ".join(errorMessages[:3]) # Limit to first 3 errors + + await self.messageCreator.createErrorMessage( + taskStep, workflow, taskIndex, errorSummary + ) + + return TaskResult( + taskId=taskStep.id, + status=TaskStatus.FAILED, + success=False, + feedback=f"Task failed: {failedCount} out of {totalActions} actions failed", + error=errorSummary + ) + + except Exception as e: + logger.error(f"Error executing task {taskIndex}: {str(e)}") + await self.messageCreator.createErrorMessage(taskStep, workflow, taskIndex, str(e)) + + return TaskResult( + taskId=taskStep.id, + status=TaskStatus.FAILED, + success=False, + feedback="Task execution failed", + error=str(e) + ) + + def _createActionItem(self, actionData: Dict[str, Any]) -> Optional[ActionItem]: + """Create ActionItem from action data""" + try: + import uuid + from datetime import datetime, timezone + + # Ensure ID is present + if "id" not in actionData or not actionData["id"]: + actionData["id"] = f"action_{uuid.uuid4()}" + + # Ensure required fields + if "status" not in actionData: + actionData["status"] = TaskStatus.PENDING + + if "execMethod" not in actionData: + logger.error("execMethod is required for task action") + return None + + if "execAction" not in actionData: + logger.error("execAction is required for task action") + return None + + if "execParameters" not in actionData: + actionData["execParameters"] = {} + + # Use generic field separation based on ActionItem model + simpleFields, objectFields = self.services.interfaceDbChat._separateObjectFields(ActionItem, actionData) + + # Create action in database + createdAction = self.services.interfaceDbChat.db.recordCreate(ActionItem, simpleFields) + + # Convert to ActionItem model + return ActionItem( + id=createdAction["id"], + execMethod=createdAction["execMethod"], + execAction=createdAction["execAction"], + execParameters=createdAction.get("execParameters", {}), + execResultLabel=createdAction.get("execResultLabel"), + expectedDocumentFormats=createdAction.get("expectedDocumentFormats"), + status=createdAction.get("status", TaskStatus.PENDING), + error=createdAction.get("error"), + retryCount=createdAction.get("retryCount", 0), + retryMax=createdAction.get("retryMax", 3), + processingTime=createdAction.get("processingTime"), + timestamp=float(createdAction.get("timestamp", self.services.utils.timestampGetUtc())), + result=createdAction.get("result"), + userMessage=createdAction.get("userMessage") + ) + + except Exception as e: + logger.error(f"Error creating task action: {str(e)}") + return None + + def _updateWorkflowBeforeExecutingTask(self, taskNumber: int): + """Update workflow object before executing a task""" + try: + updateData = { + "currentTask": taskNumber, + "currentAction": 0, + "totalActions": 0 + } + + self.workflow.currentTask = taskNumber + self.workflow.currentAction = 0 + self.workflow.totalActions = 0 + + self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData) + logger.info(f"Updated workflow {self.workflow.id} before executing task {taskNumber}") + except Exception as e: + logger.error(f"Error updating workflow before executing task: {str(e)}") + + def _updateWorkflowAfterActionPlanning(self, totalActions: int): + """Update workflow object after action planning""" + try: + updateData = {"totalActions": totalActions} + self.workflow.totalActions = totalActions + self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData) + logger.info(f"Updated workflow {self.workflow.id} after action planning: {updateData}") + except Exception as e: + logger.error(f"Error updating workflow after action planning: {str(e)}") + + def _updateWorkflowBeforeExecutingAction(self, actionNumber: int): + """Update workflow object before executing an action""" + try: + updateData = {"currentAction": actionNumber} + self.workflow.currentAction = actionNumber + self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData) + logger.info(f"Updated workflow {self.workflow.id} before executing action {actionNumber}") + except Exception as e: + logger.error(f"Error updating workflow before executing action: {str(e)}") + + def _setWorkflowTotals(self, totalTasks: int = None, totalActions: int = None): + """Set total counts for workflow progress tracking""" + try: + updateData = {} + + if totalTasks is not None: + self.workflow.totalTasks = totalTasks + updateData["totalTasks"] = totalTasks + + if totalActions is not None: + self.workflow.totalActions = totalActions + updateData["totalActions"] = totalActions + + if updateData: + self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData) + logger.info(f"Updated workflow {self.workflow.id} totals: {updateData}") + except Exception as e: + logger.error(f"Error setting workflow totals: {str(e)}") + diff --git a/modules/workflows/processing/modes/modeTemplate.py b/modules/workflows/processing/modes/modeTemplate.py deleted file mode 100644 index 6b476a2d..00000000 --- a/modules/workflows/processing/modes/modeTemplate.py +++ /dev/null @@ -1,41 +0,0 @@ -# modeTemplate.py -# Template mode implementation for workflows (placeholder for future use) - -import logging -from typing import List, Dict, Any -from modules.datamodels.datamodelChat import ( - TaskStep, TaskContext, TaskResult, ActionItem, TaskStatus -) -from modules.datamodels.datamodelChat import ChatWorkflow -from modules.workflows.processing.modes.modeBase import BaseMode - -logger = logging.getLogger(__name__) - -class TemplateMode(BaseMode): - """Template mode implementation - placeholder for future template-based workflow execution""" - - def __init__(self, services, workflow): - super().__init__(services, workflow) - logger.warning("TemplateMode is not yet implemented - using placeholder implementation") - - async def generateActionItems(self, taskStep: TaskStep, workflow: ChatWorkflow, - previousResults: List = None, enhancedContext: TaskContext = None) -> List[ActionItem]: - """Generate actions for a given task step using template-based approach""" - logger.warning("TemplateMode.generateActionItems not yet implemented") - # TODO: Implement template-based action generation - return [] - - async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext, - taskIndex: int = None, totalTasks: int = None) -> TaskResult: - """Execute task using Template mode - placeholder implementation""" - logger.warning("TemplateMode.executeTask not yet implemented - returning placeholder result") - - # Return placeholder task result - return TaskResult( - taskId=taskStep.id, - status=TaskStatus.COMPLETED, - success=False, - feedback="Template mode is not yet implemented", - error="Template mode is not yet implemented" - ) - diff --git a/modules/workflows/processing/shared/automationTemplateInitial.json b/modules/workflows/processing/shared/automationTemplateInitial.json new file mode 100644 index 00000000..7a7fec8b --- /dev/null +++ b/modules/workflows/processing/shared/automationTemplateInitial.json @@ -0,0 +1,80 @@ +{ + "overview": "Automated workflow: Web research, SharePoint data extraction, and document generation", + "userMessage": "Execute automated workflow: research web, extract SharePoint data, and generate document", + "tasks": [ + { + "id": "task_1", + "objective": "Perform web research using provided URL and prompt to gather information", + "dependencies": [], + "successCriteria": [ + "Web research completed successfully", + "Research results saved as web_research_response" + ], + "estimatedComplexity": "medium", + "userMessage": "Researching web for information", + "actionList": [ + { + "execMethod": "ai", + "execAction": "webResearch", + "execParameters": { + "prompt": "{{KEY:webResearchPrompt}}", + "list(url)": ["{{KEY:webResearchUrl}}"] + }, + "execResultLabel": "web_research_response" + } + ] + }, + { + "id": "task_2", + "objective": "Extract data from files in SharePoint directory using provided folder path and prompt", + "dependencies": ["task_1"], + "successCriteria": [ + "SharePoint files read successfully", + "Data extracted and saved as sharepoint_data" + ], + "estimatedComplexity": "medium", + "userMessage": "Extracting data from SharePoint files", + "actionList": [ + { + "execMethod": "sharepoint", + "execAction": "readDocuments", + "execParameters": { + "connectionReference": "{{KEY:connectionName}}", + "pathQuery": "{{KEY:sharepointFolderNameSource}}", + "documentList": [], + "includeMetadata": true + }, + "execResultLabel": "sharepoint_data" + } + ] + }, + { + "id": "task_3", + "objective": "Generate document using web research results and SharePoint data with provided prompt", + "dependencies": ["task_1", "task_2"], + "successCriteria": [ + "Document generated successfully", + "Document combines web research and SharePoint data", + "Document saved as result_data" + ], + "estimatedComplexity": "high", + "userMessage": "Generating final document", + "actionList": [ + { + "execMethod": "ai", + "execAction": "process", + "execParameters": { + "prompt": "{{KEY:documentPrompt}}", + "documentList": [ + "web_research_response", + "sharepoint_data" + ], + "outputExtension": ".docx" + }, + "execResultLabel": "result_data" + } + ] + } + ] +} + diff --git a/modules/workflows/processing/workflowProcessor.py b/modules/workflows/processing/workflowProcessor.py index 5884c5bc..e91f6afc 100644 --- a/modules/workflows/processing/workflowProcessor.py +++ b/modules/workflows/processing/workflowProcessor.py @@ -8,7 +8,7 @@ from modules.datamodels.datamodelChat import ChatWorkflow, WorkflowModeEnum from modules.workflows.processing.modes.modeBase import BaseMode from modules.workflows.processing.modes.modeActionplan import ActionplanMode from modules.workflows.processing.modes.modeDynamic import DynamicMode -from modules.workflows.processing.modes.modeTemplate import TemplateMode +from modules.workflows.processing.modes.modeAutomation import AutomationMode logger = logging.getLogger(__name__) @@ -30,8 +30,8 @@ class WorkflowProcessor: return DynamicMode(self.services, self.workflow) elif workflowMode == WorkflowModeEnum.WORKFLOW_ACTIONPLAN: return ActionplanMode(self.services, self.workflow) - elif workflowMode == WorkflowModeEnum.WORKFLOW_TEMPLATE: - return TemplateMode(self.services, self.workflow) + elif workflowMode == WorkflowModeEnum.WORKFLOW_AUTOMATION: + return AutomationMode(self.services, self.workflow) else: raise ValueError(f"Invalid workflow mode: {workflowMode}") def _checkWorkflowStopped(self, workflow):