From 837748dac9b57e6dd461905f1f4d0583402fa876 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Mon, 3 Nov 2025 00:14:02 +0100
Subject: [PATCH] implemented workflow automation
---
app.py | 6 +
modules/datamodels/datamodelChat.py | 86 +++-
modules/features/featuresLifecycle.py | 12 +
modules/interfaces/interfaceDbChatAccess.py | 14 +-
modules/interfaces/interfaceDbChatObjects.py | 360 ++++++++++++++-
modules/routes/routeAdminAutomationEvents.py | 150 ++++++
modules/routes/routeDataAutomation.py | 236 ++++++++++
.../processing/modes/modeAutomation.py | 430 ++++++++++++++++++
.../processing/modes/modeTemplate.py | 41 --
.../shared/automationTemplateInitial.json | 80 ++++
.../workflows/processing/workflowProcessor.py | 6 +-
11 files changed, 1370 insertions(+), 51 deletions(-)
create mode 100644 modules/routes/routeAdminAutomationEvents.py
create mode 100644 modules/routes/routeDataAutomation.py
create mode 100644 modules/workflows/processing/modes/modeAutomation.py
delete mode 100644 modules/workflows/processing/modes/modeTemplate.py
create mode 100644 modules/workflows/processing/shared/automationTemplateInitial.json
diff --git a/app.py b/app.py
index d4b469ff..a167503c 100644
--- a/app.py
+++ b/app.py
@@ -424,3 +424,9 @@ app.include_router(adminSecurityRouter)
from modules.routes.routeSharepoint import router as sharepointRouter
app.include_router(sharepointRouter)
+from modules.routes.routeDataAutomation import router as automationRouter
+app.include_router(automationRouter)
+
+from modules.routes.routeAdminAutomationEvents import router as adminAutomationEventsRouter
+app.include_router(adminAutomationEventsRouter)
+
diff --git a/modules/datamodels/datamodelChat.py b/modules/datamodels/datamodelChat.py
index cdeebfed..437eacb1 100644
--- a/modules/datamodels/datamodelChat.py
+++ b/modules/datamodels/datamodelChat.py
@@ -266,7 +266,7 @@ registerModelLabels(
class WorkflowModeEnum(str, Enum):
WORKFLOW_ACTIONPLAN = "Actionplan"
WORKFLOW_DYNAMIC = "Dynamic"
- WORKFLOW_TEMPLATE = "Template"
+ WORKFLOW_AUTOMATION = "Automation"
registerModelLabels(
@@ -275,7 +275,7 @@ registerModelLabels(
{
"WORKFLOW_ACTIONPLAN": {"en": "Actionplan", "fr": "Actionplan"},
"WORKFLOW_DYNAMIC": {"en": "Dynamic", "fr": "Dynamique"},
- "WORKFLOW_TEMPLATE": {"en": "Template", "fr": "Modèle"},
+ "WORKFLOW_AUTOMATION": {"en": "Automation", "fr": "Automatisation"},
},
)
@@ -405,8 +405,8 @@ class ChatWorkflow(BaseModel):
"label": {"en": "Dynamic", "fr": "Dynamique"},
},
{
- "value": WorkflowModeEnum.WORKFLOW_TEMPLATE.value,
- "label": {"en": "Template", "fr": "Modèle"},
+ "value": WorkflowModeEnum.WORKFLOW_AUTOMATION.value,
+ "label": {"en": "Automation", "fr": "Automatisation"},
},
],
)
@@ -1010,3 +1010,81 @@ registerModelLabels(
"placeholders": {"en": "Placeholders", "fr": "Espaces réservés"},
},
)
+
+
+class AutomationDefinition(BaseModel):
+ id: str = Field(
+ default_factory=lambda: str(uuid.uuid4()),
+ description="Primary key",
+ frontend_type="text",
+ frontend_readonly=True,
+ frontend_required=False
+ )
+ mandateId: str = Field(
+ description="Mandate ID",
+ frontend_type="text",
+ frontend_readonly=True,
+ frontend_required=False
+ )
+ label: str = Field(
+ description="User-friendly name",
+ frontend_type="text",
+ frontend_required=True
+ )
+ schedule: str = Field(
+ description="Cron schedule pattern",
+ frontend_type="select",
+ frontend_options=[
+ {"value": "0 */4 * * *", "label": {"en": "Every 4 hours", "fr": "Toutes les 4 heures"}},
+ {"value": "0 22 * * *", "label": {"en": "Daily at 22:00", "fr": "Quotidien à 22:00"}},
+ {"value": "0 10 * * 1", "label": {"en": "Weekly Monday 10:00", "fr": "Hebdomadaire lundi 10:00"}}
+ ],
+ frontend_required=True
+ )
+ template: str = Field(
+ description="JSON template with placeholders (format: {{KEY:PLACEHOLDER_NAME}})",
+ frontend_type="textarea",
+ frontend_required=True
+ )
+ placeholders: Dict[str, str] = Field(
+ default_factory=dict,
+ description="Dictionary of placeholder key/value pairs (e.g., {'connectionName': 'MyConnection', 'sharepointFolderNameSource': '/folder/path', 'webResearchUrl': 'https://...', 'webResearchPrompt': '...', 'documentPrompt': '...'})",
+ frontend_type="text"
+ )
+ active: bool = Field(
+ default=False,
+ description="Whether automation should be launched in event handler",
+ frontend_type="checkbox",
+ frontend_required=False
+ )
+ eventId: Optional[str] = Field(
+ None,
+ description="Event ID from event management (None if not registered)",
+ frontend_type="text",
+ frontend_readonly=True,
+ frontend_required=False
+ )
+ status: Optional[str] = Field(
+ None,
+ description="Status: 'active' if event is registered, 'inactive' if not (computed, readonly)",
+ frontend_type="text",
+ frontend_readonly=True,
+ frontend_required=False
+ )
+
+
+registerModelLabels(
+ "AutomationDefinition",
+ {"en": "Automation Definition", "fr": "Définition d'automatisation"},
+ {
+ "id": {"en": "ID", "fr": "ID"},
+ "mandateId": {"en": "Mandate ID", "fr": "ID du mandat"},
+ "label": {"en": "Label", "fr": "Libellé"},
+ "schedule": {"en": "Schedule", "fr": "Planification"},
+ "template": {"en": "Template", "fr": "Modèle"},
+ "placeholders": {"en": "Placeholders", "fr": "Espaces réservés"},
+ "active": {"en": "Active", "fr": "Actif"},
+ "eventId": {"en": "Event ID", "fr": "ID de l'événement"},
+ "status": {"en": "Status", "fr": "Statut"},
+ },
+)
diff --git a/modules/features/featuresLifecycle.py b/modules/features/featuresLifecycle.py
index 85650c14..1f09a592 100644
--- a/modules/features/featuresLifecycle.py
+++ b/modules/features/featuresLifecycle.py
@@ -13,6 +13,18 @@ async def start() -> None:
from modules.features.syncDelta import mainSyncDelta
mainSyncDelta.startSyncManager(eventUser)
+ # Feature Automation Events
+ if eventUser:
+ try:
+ from modules.interfaces.interfaceDbChatObjects import getInterface as getChatInterface
+ chatInterface = getChatInterface(eventUser)
+ if hasattr(chatInterface, 'syncAutomationEvents'):
+ await chatInterface.syncAutomationEvents()
+ logger.info("Automation events synced on startup")
+ except Exception as e:
+ logger.error(f"Error syncing automation events on startup: {str(e)}")
+ # Don't fail startup if automation sync fails
+
# Feature ...
return True
diff --git a/modules/interfaces/interfaceDbChatAccess.py b/modules/interfaces/interfaceDbChatAccess.py
index 3facab4d..4662484a 100644
--- a/modules/interfaces/interfaceDbChatAccess.py
+++ b/modules/interfaces/interfaceDbChatAccess.py
@@ -5,7 +5,7 @@ Handles user access management and permission checks.
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelUam import User, UserPrivilege
-from modules.datamodels.datamodelChat import ChatWorkflow
+from modules.datamodels.datamodelChat import ChatWorkflow, AutomationDefinition
class ChatAccess:
"""
@@ -41,7 +41,13 @@ class ChatAccess:
filtered_records = []
# Apply filtering based on privilege
- if userPrivilege == UserPrivilege.SYSADMIN:
+ if table_name == "AutomationDefinition":
+ # Users see only their own automation definitions
+ filtered_records = [
+ r for r in recordset
+ if r.get("mandateId","-") == self.mandateId and r.get("_createdBy") == self.userId
+ ]
+ elif userPrivilege == UserPrivilege.SYSADMIN:
filtered_records = recordset # System admins see all records
elif userPrivilege == UserPrivilege.ADMIN:
# Admins see records in their mandate
@@ -68,6 +74,10 @@ class ChatAccess:
record["_hideView"] = False # Everyone can view
record["_hideEdit"] = not self.canModify(ChatWorkflow, record.get("workflowId"))
record["_hideDelete"] = not self.canModify(ChatWorkflow, record.get("workflowId"))
+ elif table_name == "AutomationDefinition":
+ record["_hideView"] = False # Everyone can view
+ record["_hideEdit"] = not self.canModify(AutomationDefinition, record_id)
+ record["_hideDelete"] = not self.canModify(AutomationDefinition, record_id)
else:
# Default access control for other tables
record["_hideView"] = False
diff --git a/modules/interfaces/interfaceDbChatObjects.py b/modules/interfaces/interfaceDbChatObjects.py
index 66927037..e6cb9aa0 100644
--- a/modules/interfaces/interfaceDbChatObjects.py
+++ b/modules/interfaces/interfaceDbChatObjects.py
@@ -18,8 +18,11 @@ from modules.datamodels.datamodelChat import (
ChatLog,
ChatMessage,
ChatWorkflow,
- WorkflowModeEnum
+ WorkflowModeEnum,
+ AutomationDefinition,
+ UserInputRequest
)
+import json
from modules.datamodels.datamodelUam import User
# DYNAMIC PART: Connectors to the Interface
@@ -1207,6 +1210,361 @@ class ChatObjects:
return {"items": items}
+ # ===== Automation Methods =====
+
+ def _computeAutomationStatus(self, automation: Dict[str, Any]) -> str:
+ """Compute status field based on eventId presence"""
+ eventId = automation.get("eventId")
+ return "active" if eventId else "inactive"
+
+ def getAllAutomationDefinitions(self, pagination: Optional[PaginationParams] = None) -> Union[List[Dict[str, Any]], PaginatedResult]:
+ """
+ Returns automation definitions based on user access level.
+ Supports optional pagination, sorting, and filtering.
+ Computes status field for each automation.
+ """
+ allAutomations = self.db.getRecordset(AutomationDefinition)
+ filteredAutomations = self._uam(AutomationDefinition, allAutomations)
+
+ # Compute status for each automation
+ for automation in filteredAutomations:
+ automation["status"] = self._computeAutomationStatus(automation)
+
+ # If no pagination requested, return all items
+ if pagination is None:
+ return filteredAutomations
+
+ # Apply filtering (if filters provided)
+ if pagination.filters:
+ filteredAutomations = self._applyFilters(filteredAutomations, pagination.filters)
+
+ # Apply sorting (in order of sortFields)
+ if pagination.sort:
+ filteredAutomations = self._applySorting(filteredAutomations, pagination.sort)
+
+ # Count total items after filters
+ totalItems = len(filteredAutomations)
+ totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0
+
+ # Apply pagination (skip/limit)
+ startIdx = (pagination.page - 1) * pagination.pageSize
+ endIdx = startIdx + pagination.pageSize
+ pagedAutomations = filteredAutomations[startIdx:endIdx]
+
+ return PaginatedResult(
+ items=pagedAutomations,
+ totalItems=totalItems,
+ totalPages=totalPages
+ )
+
+ def getAutomationDefinition(self, automationId: str) -> Optional[Dict[str, Any]]:
+ """Returns an automation definition by ID if user has access, with computed status."""
+ try:
+ automations = self.db.getRecordset(AutomationDefinition, recordFilter={"id": automationId})
+ filtered = self._uam(AutomationDefinition, automations)
+
+ if not filtered:
+ return None
+
+ automation = filtered[0]
+ automation["status"] = self._computeAutomationStatus(automation)
+ return automation
+ except Exception as e:
+ logger.error(f"Error getting automation definition: {str(e)}")
+ return None
+
+ def createAutomationDefinition(self, automationData: Dict[str, Any]) -> Dict[str, Any]:
+ """Creates a new automation definition, then triggers sync."""
+ try:
+ # Ensure ID is present
+ if "id" not in automationData or not automationData["id"]:
+ automationData["id"] = str(uuid.uuid4())
+
+ # Ensure mandateId is set
+ if "mandateId" not in automationData:
+ automationData["mandateId"] = self.mandateId
+
+ # Use generic field separation
+ simpleFields, objectFields = self._separateObjectFields(AutomationDefinition, automationData)
+
+ # Create automation in database
+ createdAutomation = self.db.recordCreate(AutomationDefinition, simpleFields)
+
+ # Compute status
+ createdAutomation["status"] = self._computeAutomationStatus(createdAutomation)
+
+ # Trigger sync (async, don't wait)
+ asyncio.create_task(self.syncAutomationEvents())
+
+ return createdAutomation
+ except Exception as e:
+ logger.error(f"Error creating automation definition: {str(e)}")
+ raise
+
+ def updateAutomationDefinition(self, automationId: str, automationData: Dict[str, Any]) -> Dict[str, Any]:
+ """Updates an automation definition, then triggers sync."""
+ try:
+ # Check access
+ existing = self.getAutomationDefinition(automationId)
+ if not existing:
+ raise PermissionError(f"No access to automation {automationId}")
+
+ if not self._canModify(AutomationDefinition, automationId):
+ raise PermissionError(f"No permission to modify automation {automationId}")
+
+ # Use generic field separation
+ simpleFields, objectFields = self._separateObjectFields(AutomationDefinition, automationData)
+
+ # Update automation in database
+ updatedAutomation = self.db.recordModify(AutomationDefinition, automationId, simpleFields)
+
+ # Compute status
+ updatedAutomation["status"] = self._computeAutomationStatus(updatedAutomation)
+
+ # Trigger sync (async, don't wait)
+ asyncio.create_task(self.syncAutomationEvents())
+
+ return updatedAutomation
+ except Exception as e:
+ logger.error(f"Error updating automation definition: {str(e)}")
+ raise
+
+ def deleteAutomationDefinition(self, automationId: str) -> bool:
+ """Deletes an automation definition, then triggers sync."""
+ try:
+ # Check access
+ existing = self.getAutomationDefinition(automationId)
+ if not existing:
+ raise PermissionError(f"No access to automation {automationId}")
+
+ if not self._canModify(AutomationDefinition, automationId):
+ raise PermissionError(f"No permission to delete automation {automationId}")
+
+ # Remove event if exists
+ if existing.get("eventId"):
+ from modules.shared.eventManagement import eventManager
+ try:
+ eventManager.remove(existing["eventId"])
+ except Exception as e:
+ logger.warning(f"Error removing event {existing['eventId']}: {str(e)}")
+
+ # Delete automation from database
+ self.db.recordDelete(AutomationDefinition, automationId)
+
+ # Trigger sync (async, don't wait)
+ asyncio.create_task(self.syncAutomationEvents())
+
+ return True
+ except Exception as e:
+ logger.error(f"Error deleting automation definition: {str(e)}")
+ raise
+
+ def _replacePlaceholders(self, template: str, placeholders: Dict[str, str]) -> str:
+ """Replace placeholders in template with actual values. Placeholder format: {{KEY:PLACEHOLDER_NAME}}"""
+ result = template
+ for placeholderName, value in placeholders.items():
+ pattern = f"{{{{KEY:{placeholderName}}}}}"
+ result = result.replace(pattern, str(value))
+ return result
+
+ def _parseScheduleToCron(self, schedule: str) -> Dict[str, Any]:
+ """Parse schedule string to cron kwargs for APScheduler"""
+ parts = schedule.split()
+ if len(parts) != 5:
+ raise ValueError(f"Invalid schedule format: {schedule}")
+
+ return {
+ "minute": parts[0],
+ "hour": parts[1],
+ "day": parts[2],
+ "month": parts[3],
+ "day_of_week": parts[4]
+ }
+
+ async def executeAutomation(self, automationId: str) -> ChatWorkflow:
+ """Execute automation workflow immediately (test mode) with placeholder replacement"""
+ # 1. Load automation definition
+ automation = self.getAutomationDefinition(automationId)
+ if not automation:
+ raise ValueError(f"Automation {automationId} not found")
+
+ # 2. Replace placeholders in template to generate plan
+ template = automation.get("template", "")
+ placeholders = automation.get("placeholders", {})
+ planJson = self._replacePlaceholders(template, placeholders)
+ plan = json.loads(planJson)
+
+ # 3. Get user who created automation
+ creator_user_id = automation.get("_createdBy")
+ if not creator_user_id:
+ raise ValueError(f"Automation {automationId} has no creator user")
+
+ # Get user from database
+ from modules.interfaces.interfaceDbAppObjects import getInterface as getAppInterface
+ appInterface = getAppInterface(self.currentUser)
+ creator_user = appInterface.getUser(creator_user_id)
+ if not creator_user:
+ raise ValueError(f"Creator user {creator_user_id} not found")
+
+ # 4. Create UserInputRequest from plan
+ # Embed plan JSON in prompt for TemplateMode to extract
+ promptText = self._planToPrompt(plan)
+ planJson = json.dumps(plan)
+ # Embed plan as JSON comment so TemplateMode can extract it
+ promptWithPlan = f"{promptText}\n\n\n{planJson}\n"
+
+ userInput = UserInputRequest(
+ prompt=promptWithPlan,
+ listFileId=[],
+ userLanguage=creator_user.language or "en"
+ )
+
+ # 5. Start workflow using chatStart
+ from modules.features.chatPlayground.mainChatPlayground import chatStart
+
+ workflow = await chatStart(
+ currentUser=creator_user,
+ userInput=userInput,
+ workflowMode=WorkflowModeEnum.WORKFLOW_AUTOMATION,
+ workflowId=None
+ )
+
+ # Also store plan in module-level cache as backup (keyed by workflow ID)
+ from modules.workflows.processing.modes import modeAutomation
+ if not hasattr(modeAutomation, '_templatePlanCache'):
+ modeAutomation._templatePlanCache = {}
+ modeAutomation._templatePlanCache[workflow.id] = plan
+ logger.info(f"Stored template plan for workflow {workflow.id} (cache + prompt) with {len(plan.get('tasks', []))} tasks")
+
+ return workflow
+
+ def _planToPrompt(self, plan: Dict) -> str:
+ """Convert plan structure to prompt string for workflow execution"""
+ return plan.get("userMessage", plan.get("overview", "Execute automation workflow"))
+
+ async def syncAutomationEvents(self) -> Dict[str, Any]:
+ """Automation event handler - syncs scheduler with all active automations."""
+ from modules.shared.eventManagement import eventManager
+
+ # Get all automation definitions (for current mandate)
+ allAutomations = self.db.getRecordset(AutomationDefinition)
+ filtered = self._uam(AutomationDefinition, allAutomations)
+
+ registered_events = {}
+
+ for automation in filtered:
+ automation_id = automation.get("id")
+ is_active = automation.get("active", False)
+ current_event_id = automation.get("eventId")
+ schedule = automation.get("schedule")
+
+ if not schedule:
+ logger.warning(f"Automation {automation_id} has no schedule, skipping")
+ continue
+
+ try:
+ # Parse schedule to cron kwargs
+ cron_kwargs = self._parseScheduleToCron(schedule)
+
+ if is_active:
+ # Remove existing event if present (handles schedule changes)
+ if current_event_id:
+ try:
+ eventManager.remove(current_event_id)
+ except Exception as e:
+ logger.warning(f"Error removing old event {current_event_id}: {str(e)}")
+
+ # Register new event
+ new_event_id = f"automation.{automation_id}"
+
+ # Create event handler function
+ handler = self._createAutomationEventHandler(automation_id)
+
+ # Register cron job
+ eventManager.registerCron(
+ jobId=new_event_id,
+ func=handler,
+ cronKwargs=cron_kwargs,
+ replaceExisting=True
+ )
+
+ # Update automation with new eventId
+ if current_event_id != new_event_id:
+ self.db.recordModify(
+ AutomationDefinition,
+ automation_id,
+ {"eventId": new_event_id}
+ )
+
+ registered_events[automation_id] = new_event_id
+ else:
+ # Remove event if exists
+ if current_event_id:
+ try:
+ eventManager.remove(current_event_id)
+ self.db.recordModify(
+ AutomationDefinition,
+ automation_id,
+ {"eventId": None}
+ )
+ except Exception as e:
+ logger.warning(f"Error removing event {current_event_id}: {str(e)}")
+ except Exception as e:
+ logger.error(f"Error syncing automation {automation_id}: {str(e)}")
+
+ return {
+ "synced": len(registered_events),
+ "events": registered_events
+ }
+
+ def _createAutomationEventHandler(self, automationId: str):
+ """Create event handler function for a specific automation"""
+ async def handler():
+ try:
+ # Get event user to access automation (event user can access all automations)
+ from modules.interfaces.interfaceDbAppObjects import getRootInterface
+ from modules.interfaces.interfaceDbAppObjects import getInterface as getAppInterface
+ from modules.interfaces.interfaceDbChatObjects import getInterface as getChatInterface
+
+ rootInterface = getRootInterface()
+ eventUser = rootInterface.getUserByUsername("event")
+
+ if not eventUser:
+ logger.error("Could not get event user for automation execution")
+ return
+
+ # Create ChatObjects interface for event user (to access automation)
+ eventInterface = getChatInterface(eventUser)
+
+ # Load automation using event user context
+ automation = eventInterface.getAutomationDefinition(automationId)
+ if not automation or not automation.get("active"):
+ logger.warning(f"Automation {automationId} not found or not active, skipping execution")
+ return
+
+ # Get creator user
+ creator_user_id = automation.get("_createdBy")
+ if not creator_user_id:
+ logger.error(f"Automation {automationId} has no creator user")
+ return
+
+ # Get creator user from database
+ appInterface = getAppInterface(eventUser)
+ creator_user = appInterface.getUser(creator_user_id)
+ if not creator_user:
+ logger.error(f"Creator user {creator_user_id} not found for automation {automationId}")
+ return
+
+ # Create ChatObjects interface for creator user
+ creatorInterface = getChatInterface(creator_user)
+
+ # Execute automation with creator user's context
+ await creatorInterface.executeAutomation(automationId)
+ logger.info(f"Successfully executed automation {automationId} as user {creator_user_id}")
+ except Exception as e:
+ logger.error(f"Error executing automation {automationId}: {str(e)}")
+
+ return handler
def getInterface(currentUser: Optional[User] = None) -> 'ChatObjects':
diff --git a/modules/routes/routeAdminAutomationEvents.py b/modules/routes/routeAdminAutomationEvents.py
new file mode 100644
index 00000000..bb4a233f
--- /dev/null
+++ b/modules/routes/routeAdminAutomationEvents.py
@@ -0,0 +1,150 @@
+"""
+Admin automation events routes for the backend API.
+Sysadmin-only endpoints for viewing and controlling automation events.
+"""
+
+from fastapi import APIRouter, HTTPException, Depends, Path, Request, Response
+from typing import List, Dict, Any
+from fastapi import status
+import logging
+
+# Import interfaces and models
+import modules.interfaces.interfaceDbChatObjects as interfaceDbChatObjects
+from modules.security.auth import getCurrentUser, limiter
+from modules.datamodels.datamodelUam import User, UserPrivilege
+
+# Configure logger
+logger = logging.getLogger(__name__)
+
+# Create router for admin automation events endpoints
+router = APIRouter(
+ prefix="/api/admin/automation-events",
+ tags=["Admin Automation Events"],
+ responses={
+ 404: {"description": "Not found"},
+ 400: {"description": "Bad request"},
+ 401: {"description": "Unauthorized"},
+ 403: {"description": "Forbidden - Sysadmin only"},
+ 500: {"description": "Internal server error"}
+ }
+)
+
+def requireSysadmin(currentUser: User):
+ """Require sysadmin privilege"""
+ if currentUser.privilege != UserPrivilege.SYSADMIN:
+ raise HTTPException(
+ status_code=status.HTTP_403_FORBIDDEN,
+ detail="Sysadmin privilege required"
+ )
+
+@router.get("")
+@limiter.limit("30/minute")
+async def get_all_automation_events(
+ request: Request,
+ currentUser: User = Depends(getCurrentUser)
+) -> List[Dict[str, Any]]:
+ """
+ Get all automation events across all mandates (sysadmin only).
+ Returns list of all registered events with their automation IDs and schedules.
+ """
+ requireSysadmin(currentUser)
+
+ try:
+ from modules.shared.eventManagement import eventManager
+
+ # Get all jobs from scheduler
+ jobs = []
+ if eventManager.scheduler:
+ for job in eventManager.scheduler.get_jobs():
+ if job.id.startswith("automation."):
+ automation_id = job.id.replace("automation.", "")
+ jobs.append({
+ "eventId": job.id,
+ "automationId": automation_id,
+ "nextRunTime": str(job.next_run_time) if job.next_run_time else None,
+ "trigger": str(job.trigger) if job.trigger else None
+ })
+
+ return jobs
+ except Exception as e:
+ logger.error(f"Error getting automation events: {str(e)}")
+ raise HTTPException(
+ status_code=500,
+ detail=f"Error getting automation events: {str(e)}"
+ )
+
+@router.post("/sync")
+@limiter.limit("5/minute")
+async def sync_all_automation_events(
+ request: Request,
+ currentUser: User = Depends(getCurrentUser)
+) -> Dict[str, Any]:
+ """
+ Manually trigger sync for all automations (sysadmin only).
+ This will register/remove events based on active flags.
+ """
+ requireSysadmin(currentUser)
+
+ try:
+ chatInterface = interfaceDbChatObjects.getInterface(currentUser)
+
+ if not hasattr(chatInterface, 'syncAutomationEvents'):
+ raise HTTPException(
+ status_code=501,
+ detail="Automation methods not available"
+ )
+
+ result = await chatInterface.syncAutomationEvents()
+ return {
+ "success": True,
+ "synced": result.get("synced", 0),
+ "events": result.get("events", {})
+ }
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Error syncing automation events: {str(e)}")
+ raise HTTPException(
+ status_code=500,
+ detail=f"Error syncing automation events: {str(e)}"
+ )
+
+@router.post("/{eventId}/remove")
+@limiter.limit("10/minute")
+async def remove_event(
+ request: Request,
+ eventId: str = Path(..., description="Event ID to remove"),
+ currentUser: User = Depends(getCurrentUser)
+) -> Dict[str, Any]:
+ """
+ Manually remove a specific event from scheduler (sysadmin only).
+ Used for debugging and manual event cleanup.
+ """
+ requireSysadmin(currentUser)
+
+ try:
+ from modules.shared.eventManagement import eventManager
+
+ # Remove event
+ eventManager.remove(eventId)
+
+ # Update automation's eventId if it exists
+ if eventId.startswith("automation."):
+ automation_id = eventId.replace("automation.", "")
+ chatInterface = interfaceDbChatObjects.getInterface(currentUser)
+ automation = chatInterface.getAutomationDefinition(automation_id)
+ if automation and automation.get("eventId") == eventId:
+ chatInterface.updateAutomationDefinition(automation_id, {"eventId": None})
+
+ return {
+ "success": True,
+ "eventId": eventId,
+ "message": f"Event {eventId} removed successfully"
+ }
+ except Exception as e:
+ logger.error(f"Error removing event: {str(e)}")
+ raise HTTPException(
+ status_code=500,
+ detail=f"Error removing event: {str(e)}"
+ )
+
diff --git a/modules/routes/routeDataAutomation.py b/modules/routes/routeDataAutomation.py
new file mode 100644
index 00000000..7e268c0b
--- /dev/null
+++ b/modules/routes/routeDataAutomation.py
@@ -0,0 +1,236 @@
+"""
+Automation routes for the backend API.
+Implements the endpoints for automation definition management.
+"""
+
+from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response, Query
+from typing import List, Dict, Any, Optional
+from fastapi import status
+import logging
+import json
+
+# Import interfaces and models
+from modules.interfaces.interfaceDbChatObjects import getInterface as getChatInterface
+from modules.security.auth import getCurrentUser, limiter
+from modules.datamodels.datamodelChat import AutomationDefinition, ChatWorkflow
+from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata
+from modules.shared.attributeUtils import getModelAttributeDefinitions
+
+# Configure logger
+logger = logging.getLogger(__name__)
+
+# Model attributes for AutomationDefinition
+automationAttributes = getModelAttributeDefinitions(AutomationDefinition)
+
+# Create router for automation endpoints
+router = APIRouter(
+ prefix="/api/automations",
+ tags=["Manage Automations"],
+ responses={
+ 404: {"description": "Not found"},
+ 400: {"description": "Bad request"},
+ 401: {"description": "Unauthorized"},
+ 403: {"description": "Forbidden"},
+ 500: {"description": "Internal server error"}
+ }
+)
+
+@router.get("", response_model=PaginatedResponse[AutomationDefinition])
+@limiter.limit("30/minute")
+async def get_automations(
+ request: Request,
+ pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
+ currentUser = Depends(getCurrentUser)
+) -> PaginatedResponse[AutomationDefinition]:
+ """
+ Get automation definitions with optional pagination, sorting, and filtering.
+
+ Query Parameters:
+ - pagination: JSON-encoded PaginationParams object, or None for no pagination
+ """
+ try:
+ # Parse pagination parameter
+ paginationParams = None
+ if pagination:
+ try:
+ paginationDict = json.loads(pagination)
+ paginationParams = PaginationParams(**paginationDict) if paginationDict else None
+ except (json.JSONDecodeError, ValueError) as e:
+ raise HTTPException(
+ status_code=400,
+ detail=f"Invalid pagination parameter: {str(e)}"
+ )
+
+ chatInterface = getChatInterface(currentUser)
+ result = chatInterface.getAllAutomationDefinitions(pagination=paginationParams)
+
+ # If pagination was requested, result is PaginatedResult
+ # If no pagination, result is List[Dict]
+ if paginationParams:
+ return PaginatedResponse(
+ items=result.items,
+ pagination=PaginationMetadata(
+ currentPage=paginationParams.page,
+ pageSize=paginationParams.pageSize,
+ totalItems=result.totalItems,
+ totalPages=result.totalPages,
+ sort=paginationParams.sort,
+ filters=paginationParams.filters
+ )
+ )
+ else:
+ return PaginatedResponse(
+ items=result,
+ pagination=None
+ )
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Error getting automations: {str(e)}")
+ raise HTTPException(
+ status_code=500,
+ detail=f"Error getting automations: {str(e)}"
+ )
+
+@router.post("", response_model=AutomationDefinition)
+@limiter.limit("10/minute")
+async def create_automation(
+ request: Request,
+ automation: AutomationDefinition,
+ currentUser = Depends(getCurrentUser)
+) -> AutomationDefinition:
+ """Create a new automation definition"""
+ try:
+ chatInterface = getChatInterface(currentUser)
+ automationData = automation.model_dump()
+ created = chatInterface.createAutomationDefinition(automationData)
+ return AutomationDefinition(**created)
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Error creating automation: {str(e)}")
+ raise HTTPException(
+ status_code=500,
+ detail=f"Error creating automation: {str(e)}"
+ )
+
+@router.get("/{automationId}", response_model=AutomationDefinition)
+@limiter.limit("30/minute")
+async def get_automation(
+ request: Request,
+ automationId: str = Path(..., description="Automation ID"),
+ currentUser = Depends(getCurrentUser)
+) -> AutomationDefinition:
+ """Get a single automation definition by ID"""
+ try:
+ chatInterface = getChatInterface(currentUser)
+ automation = chatInterface.getAutomationDefinition(automationId)
+ if not automation:
+ raise HTTPException(
+ status_code=404,
+ detail=f"Automation {automationId} not found"
+ )
+
+ return AutomationDefinition(**automation)
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Error getting automation: {str(e)}")
+ raise HTTPException(
+ status_code=500,
+ detail=f"Error getting automation: {str(e)}"
+ )
+
+@router.put("/{automationId}", response_model=AutomationDefinition)
+@limiter.limit("10/minute")
+async def update_automation(
+ request: Request,
+ automationId: str = Path(..., description="Automation ID"),
+ automation: AutomationDefinition = Body(...),
+ currentUser = Depends(getCurrentUser)
+) -> AutomationDefinition:
+ """Update an automation definition"""
+ try:
+ chatInterface = getChatInterface(currentUser)
+ automationData = automation.model_dump()
+ updated = chatInterface.updateAutomationDefinition(automationId, automationData)
+ return AutomationDefinition(**updated)
+ except HTTPException:
+ raise
+ except PermissionError as e:
+ raise HTTPException(
+ status_code=403,
+ detail=str(e)
+ )
+ except Exception as e:
+ logger.error(f"Error updating automation: {str(e)}")
+ raise HTTPException(
+ status_code=500,
+ detail=f"Error updating automation: {str(e)}"
+ )
+
+@router.delete("/{automationId}")
+@limiter.limit("10/minute")
+async def delete_automation(
+ request: Request,
+ automationId: str = Path(..., description="Automation ID"),
+ currentUser = Depends(getCurrentUser)
+) -> Response:
+ """Delete an automation definition"""
+ try:
+ chatInterface = getChatInterface(currentUser)
+ success = chatInterface.deleteAutomationDefinition(automationId)
+ if success:
+ return Response(status_code=204)
+ else:
+ raise HTTPException(
+ status_code=500,
+ detail="Failed to delete automation"
+ )
+ except HTTPException:
+ raise
+ except PermissionError as e:
+ raise HTTPException(
+ status_code=403,
+ detail=str(e)
+ )
+ except Exception as e:
+ logger.error(f"Error deleting automation: {str(e)}")
+ raise HTTPException(
+ status_code=500,
+ detail=f"Error deleting automation: {str(e)}"
+ )
+
+@router.post("/{automationId}/execute", response_model=ChatWorkflow)
+@limiter.limit("5/minute")
+async def execute_automation(
+ request: Request,
+ automationId: str = Path(..., description="Automation ID"),
+ currentUser = Depends(getCurrentUser)
+) -> ChatWorkflow:
+ """Execute an automation immediately (test mode)"""
+ try:
+ chatInterface = getChatInterface(currentUser)
+ workflow = await chatInterface.executeAutomation(automationId)
+ return workflow
+ except HTTPException:
+ raise
+ except ValueError as e:
+ raise HTTPException(
+ status_code=404,
+ detail=str(e)
+ )
+ except Exception as e:
+ logger.error(f"Error executing automation: {str(e)}")
+ raise HTTPException(
+ status_code=500,
+ detail=f"Error executing automation: {str(e)}"
+ )
+
+@router.get("/attributes", response_model=Dict[str, Any])
+async def get_automation_attributes(
+ request: Request
+) -> Dict[str, Any]:
+ """Get attribute definitions for AutomationDefinition model"""
+ return {"attributes": automationAttributes}
+
diff --git a/modules/workflows/processing/modes/modeAutomation.py b/modules/workflows/processing/modes/modeAutomation.py
new file mode 100644
index 00000000..f8415efa
--- /dev/null
+++ b/modules/workflows/processing/modes/modeAutomation.py
@@ -0,0 +1,430 @@
+# modeAutomation.py
+# Template mode implementation for workflows with predefined action plans
+
+import json
+import logging
+import uuid
+from typing import List, Dict, Any, Optional
+from modules.datamodels.datamodelChat import (
+ TaskStep, TaskContext, TaskResult, ActionItem, TaskStatus,
+ TaskPlan, ActionResult
+)
+from modules.datamodels.datamodelChat import ChatWorkflow
+from modules.workflows.processing.modes.modeBase import BaseMode
+
+logger = logging.getLogger(__name__)
+
+class TemplateMode(BaseMode):
+ """Template mode implementation - executes workflows from predefined plans"""
+
+ def __init__(self, services, workflow):
+ super().__init__(services, workflow)
+ # Store action lists for each task (mapped by task ID)
+ self.taskActionMap: Dict[str, List[Dict[str, Any]]] = {}
+ logger.info("TemplateMode initialized - will use predefined plan from workflow")
+
+ async def generateTaskPlan(self, userInput: str, workflow: ChatWorkflow) -> TaskPlan:
+ """
+ Generate task plan from stored template plan (no AI planning needed).
+ The plan is stored in module-level cache by executeAutomation.
+ """
+ try:
+ # Get plan from module-level cache (stored by executeAutomation)
+ templatePlan = None
+ if hasattr(self, '_templatePlanCache') and self._templatePlanCache:
+ templatePlan = self._templatePlanCache.get(workflow.id)
+
+ # Try module-level cache
+ if not templatePlan:
+ try:
+ from modules.workflows.processing.modes import modeAutomation
+ if hasattr(modeAutomation, '_templatePlanCache'):
+ templatePlan = modeAutomation._templatePlanCache.get(workflow.id)
+ if templatePlan:
+ logger.info(f"Retrieved template plan from module cache for workflow {workflow.id}")
+ except Exception as e:
+ logger.warning(f"Could not access module cache: {str(e)}")
+
+ if not templatePlan:
+ # Fallback: Extract from prompt (embedded as JSON comment)
+ try:
+ # Look for embedded plan in prompt (between and )
+ startMarker = ""
+ endMarker = ""
+ startIdx = userInput.find(startMarker)
+ endIdx = userInput.find(endMarker)
+
+ if startIdx >= 0 and endIdx > startIdx:
+ planJson = userInput[startIdx + len(startMarker):endIdx].strip()
+ templatePlan = json.loads(planJson)
+ logger.info("Extracted template plan from embedded JSON in prompt")
+ elif '{' in userInput and '"tasks"' in userInput:
+ # Try parsing entire userInput as JSON (fallback)
+ jsonStart = userInput.find('{')
+ jsonEnd = userInput.rfind('}') + 1
+ if jsonStart >= 0 and jsonEnd > jsonStart:
+ templatePlan = json.loads(userInput[jsonStart:jsonEnd])
+ logger.info("Parsed template plan from userInput JSON (fallback)")
+ else:
+ raise ValueError("No template plan found in cache or prompt")
+ except (json.JSONDecodeError, ValueError) as e:
+ logger.error(f"Could not parse template plan: {str(e)}")
+ raise ValueError(f"Template mode requires a predefined plan, but none was found: {str(e)}")
+
+ logger.info(f"Using template plan with {len(templatePlan.get('tasks', []))} tasks")
+
+ # Convert plan tasks to TaskStep objects
+ tasks = []
+ for i, taskDict in enumerate(templatePlan.get('tasks', [])):
+ if not isinstance(taskDict, dict):
+ logger.warning(f"Skipping invalid task {i+1}: not a dictionary")
+ continue
+
+ # Store actionList for this task (before converting to TaskStep)
+ taskId = taskDict.get('id', f"task_{i+1}")
+ actionList = taskDict.get('actionList', [])
+ if actionList:
+ self.taskActionMap[taskId] = actionList
+ logger.info(f"Stored {len(actionList)} actions for task {taskId}")
+
+ # Remove actionList from taskDict (TaskStep doesn't have this field)
+ taskDictWithoutActions = {k: v for k, v in taskDict.items() if k != 'actionList'}
+
+ try:
+ task = TaskStep(**taskDictWithoutActions)
+ tasks.append(task)
+ except Exception as e:
+ logger.warning(f"Skipping invalid task {i+1}: {str(e)}")
+ continue
+
+ if not tasks:
+ raise ValueError("No valid tasks found in template plan")
+
+ taskPlan = TaskPlan(
+ overview=templatePlan.get('overview', 'Automated workflow execution'),
+ tasks=tasks,
+ userMessage=templatePlan.get('userMessage', '')
+ )
+
+ logger.info(f"Generated task plan from template with {len(tasks)} tasks")
+
+ # Clean up cache after retrieving plan (prevent memory leaks)
+ try:
+ from modules.workflows.processing.modes import modeAutomation
+ if hasattr(modeAutomation, '_templatePlanCache') and workflow.id in modeAutomation._templatePlanCache:
+ del modeAutomation._templatePlanCache[workflow.id]
+ logger.debug(f"Cleaned up template plan cache for workflow {workflow.id}")
+ except Exception as e:
+ logger.warning(f"Could not clean up template plan cache: {str(e)}")
+
+ return taskPlan
+
+ except Exception as e:
+ logger.error(f"Error generating task plan from template: {str(e)}")
+ raise
+
+ async def generateActionItems(self, taskStep: TaskStep, workflow: ChatWorkflow,
+ previousResults: List = None, enhancedContext: TaskContext = None) -> List[ActionItem]:
+ """
+ Generate actions from predefined actionList in template plan.
+ No AI planning needed - actions are already defined in the plan.
+ """
+ try:
+ taskId = taskStep.id
+
+ # Get actionList from stored map
+ actionList = self.taskActionMap.get(taskId, [])
+
+ if not actionList:
+ logger.warning(f"No actionList found for task {taskId} in template plan")
+ return []
+
+ logger.info(f"Generating {len(actionList)} actions for task {taskId} from template")
+
+ # Convert actionList to ActionItem objects
+ actionItems = []
+ for i, actionDict in enumerate(actionList):
+ if not isinstance(actionDict, dict):
+ logger.warning(f"Skipping invalid action {i+1} in task {taskId}: not a dictionary")
+ continue
+
+ # Extract action fields
+ execMethod = actionDict.get('execMethod', '')
+ execAction = actionDict.get('execAction', '')
+ execParameters = actionDict.get('execParameters', {})
+ execResultLabel = actionDict.get('execResultLabel', '')
+
+ if not execMethod or not execAction:
+ logger.warning(f"Skipping invalid action {i+1} in task {taskId}: missing execMethod or execAction")
+ continue
+
+ # Create ActionItem
+ actionItem = self._createActionItem({
+ "id": f"action_{uuid.uuid4()}",
+ "execMethod": execMethod,
+ "execAction": execAction,
+ "execParameters": execParameters,
+ "execResultLabel": execResultLabel,
+ "expectedDocumentFormats": actionDict.get('expectedDocumentFormats', None),
+ "status": TaskStatus.PENDING,
+ "userMessage": actionDict.get('userMessage', None)
+ })
+
+ if actionItem:
+ actionItems.append(actionItem)
+ else:
+ logger.warning(f"Failed to create ActionItem for action {i+1} in task {taskId}")
+
+ logger.info(f"Generated {len(actionItems)} action items for task {taskId}")
+ return actionItems
+
+ except Exception as e:
+ logger.error(f"Error generating action items from template: {str(e)}")
+ return []
+
+ async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext,
+ taskIndex: int = None, totalTasks: int = None) -> TaskResult:
+ """
+ Execute task using Template mode - executes predefined actions directly.
+ Similar to ActionplanMode but without AI planning or review phases.
+ """
+ logger.info(f"=== STARTING TASK {taskIndex or '?'}: {taskStep.objective} ===")
+
+ try:
+ # Check workflow status
+ self._checkWorkflowStopped(workflow)
+
+ # Update workflow before executing task
+ if taskIndex is not None:
+ self._updateWorkflowBeforeExecutingTask(taskIndex)
+ self.services.workflow.setWorkflowContext(taskNumber=taskIndex)
+
+ # Create task start message
+ await self.messageCreator.createTaskStartMessage(taskStep, workflow, taskIndex, totalTasks)
+
+ # Generate actions from template plan
+ actions = await self.generateActionItems(taskStep, workflow,
+ previousResults=context.previousResults if context else None,
+ enhancedContext=context)
+
+ if not actions:
+ logger.error(f"No actions found for task {taskIndex}, aborting")
+ return TaskResult(
+ taskId=taskStep.id,
+ status=TaskStatus.FAILED,
+ success=False,
+ feedback="No actions defined in template plan for this task",
+ error="No actions found in template plan"
+ )
+
+ # Update workflow after action planning
+ totalActions = len(actions)
+ self._updateWorkflowAfterActionPlanning(totalActions)
+ self._setWorkflowTotals(totalActions=totalActions)
+
+ logger.info(f"Task {taskIndex} has {totalActions} actions to execute")
+
+ # Execute all actions sequentially
+ actionResults = []
+ for actionIdx, action in enumerate(actions):
+ # Check workflow status before each action
+ self._checkWorkflowStopped(workflow)
+
+ # Update workflow before executing action
+ actionNumber = actionIdx + 1
+ self._updateWorkflowBeforeExecutingAction(actionNumber)
+
+ logger.info(f"Task {taskIndex} - Starting action {actionNumber}/{totalActions}: {action.execMethod}.{action.execAction}")
+
+ # Create action start message
+ actionStartMessage = {
+ "workflowId": workflow.id,
+ "role": "assistant",
+ "message": f"⚡ **Action {actionNumber}** (Method {action.execMethod}.{action.execAction})",
+ "status": "step",
+ "sequenceNr": len(workflow.messages) + 1,
+ "publishedAt": self.services.utils.timestampGetUtc(),
+ "documentsLabel": f"action_{actionNumber}_start",
+ "documents": [],
+ "actionProgress": "running",
+ "roundNumber": workflow.currentRound,
+ "taskNumber": taskIndex,
+ "actionNumber": actionNumber
+ }
+
+ # Add user-friendly message if available
+ if action.userMessage:
+ actionStartMessage["message"] += f"\n\n💬 {action.userMessage}"
+
+ self.services.workflow.storeMessageWithDocuments(workflow, actionStartMessage, [])
+
+ # Execute action
+ result = await self.actionExecutor.executeSingleAction(
+ action, workflow, taskStep, taskIndex, actionNumber, totalActions
+ )
+ actionResults.append(result)
+
+ if result.success:
+ logger.info(f"Action {actionNumber} completed successfully")
+ else:
+ logger.warning(f"Action {actionNumber} failed: {result.error}")
+
+ # Check if all actions succeeded
+ allSucceeded = all(r.success for r in actionResults)
+ failedCount = sum(1 for r in actionResults if not r.success)
+
+ if allSucceeded:
+ logger.info(f"=== TASK {taskIndex or '?'} COMPLETED SUCCESSFULLY: {taskStep.objective} ===")
+
+ # Create task completion message
+ await self.messageCreator.createTaskCompletionMessage(
+ taskStep, workflow, taskIndex, totalTasks, None
+ )
+
+ return TaskResult(
+ taskId=taskStep.id,
+ status=TaskStatus.COMPLETED,
+ success=True,
+ feedback=f"Task completed successfully with {totalActions} actions",
+ error=None
+ )
+ else:
+ logger.error(f"=== TASK {taskIndex or '?'} FAILED: {taskStep.objective} ({failedCount}/{totalActions} actions failed) ===")
+
+ errorMessages = [r.error for r in actionResults if r.error]
+ errorSummary = "; ".join(errorMessages[:3]) # Limit to first 3 errors
+
+ await self.messageCreator.createErrorMessage(
+ taskStep, workflow, taskIndex, errorSummary
+ )
+
+ return TaskResult(
+ taskId=taskStep.id,
+ status=TaskStatus.FAILED,
+ success=False,
+ feedback=f"Task failed: {failedCount} out of {totalActions} actions failed",
+ error=errorSummary
+ )
+
+ except Exception as e:
+ logger.error(f"Error executing task {taskIndex}: {str(e)}")
+ await self.messageCreator.createErrorMessage(taskStep, workflow, taskIndex, str(e))
+
+ return TaskResult(
+ taskId=taskStep.id,
+ status=TaskStatus.FAILED,
+ success=False,
+ feedback="Task execution failed",
+ error=str(e)
+ )
+
+ def _createActionItem(self, actionData: Dict[str, Any]) -> Optional[ActionItem]:
+ """Create ActionItem from action data"""
+ try:
+ import uuid
+ from datetime import datetime, timezone
+
+ # Ensure ID is present
+ if "id" not in actionData or not actionData["id"]:
+ actionData["id"] = f"action_{uuid.uuid4()}"
+
+ # Ensure required fields
+ if "status" not in actionData:
+ actionData["status"] = TaskStatus.PENDING
+
+ if "execMethod" not in actionData:
+ logger.error("execMethod is required for task action")
+ return None
+
+ if "execAction" not in actionData:
+ logger.error("execAction is required for task action")
+ return None
+
+ if "execParameters" not in actionData:
+ actionData["execParameters"] = {}
+
+ # Use generic field separation based on ActionItem model
+ simpleFields, objectFields = self.services.interfaceDbChat._separateObjectFields(ActionItem, actionData)
+
+ # Create action in database
+ createdAction = self.services.interfaceDbChat.db.recordCreate(ActionItem, simpleFields)
+
+ # Convert to ActionItem model
+ return ActionItem(
+ id=createdAction["id"],
+ execMethod=createdAction["execMethod"],
+ execAction=createdAction["execAction"],
+ execParameters=createdAction.get("execParameters", {}),
+ execResultLabel=createdAction.get("execResultLabel"),
+ expectedDocumentFormats=createdAction.get("expectedDocumentFormats"),
+ status=createdAction.get("status", TaskStatus.PENDING),
+ error=createdAction.get("error"),
+ retryCount=createdAction.get("retryCount", 0),
+ retryMax=createdAction.get("retryMax", 3),
+ processingTime=createdAction.get("processingTime"),
+ timestamp=float(createdAction.get("timestamp", self.services.utils.timestampGetUtc())),
+ result=createdAction.get("result"),
+ userMessage=createdAction.get("userMessage")
+ )
+
+ except Exception as e:
+ logger.error(f"Error creating task action: {str(e)}")
+ return None
+
+ def _updateWorkflowBeforeExecutingTask(self, taskNumber: int):
+ """Update workflow object before executing a task"""
+ try:
+ updateData = {
+ "currentTask": taskNumber,
+ "currentAction": 0,
+ "totalActions": 0
+ }
+
+ self.workflow.currentTask = taskNumber
+ self.workflow.currentAction = 0
+ self.workflow.totalActions = 0
+
+ self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData)
+ logger.info(f"Updated workflow {self.workflow.id} before executing task {taskNumber}")
+ except Exception as e:
+ logger.error(f"Error updating workflow before executing task: {str(e)}")
+
+ def _updateWorkflowAfterActionPlanning(self, totalActions: int):
+ """Update workflow object after action planning"""
+ try:
+ updateData = {"totalActions": totalActions}
+ self.workflow.totalActions = totalActions
+ self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData)
+ logger.info(f"Updated workflow {self.workflow.id} after action planning: {updateData}")
+ except Exception as e:
+ logger.error(f"Error updating workflow after action planning: {str(e)}")
+
+ def _updateWorkflowBeforeExecutingAction(self, actionNumber: int):
+ """Update workflow object before executing an action"""
+ try:
+ updateData = {"currentAction": actionNumber}
+ self.workflow.currentAction = actionNumber
+ self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData)
+ logger.info(f"Updated workflow {self.workflow.id} before executing action {actionNumber}")
+ except Exception as e:
+ logger.error(f"Error updating workflow before executing action: {str(e)}")
+
+ def _setWorkflowTotals(self, totalTasks: int = None, totalActions: int = None):
+ """Set total counts for workflow progress tracking"""
+ try:
+ updateData = {}
+
+ if totalTasks is not None:
+ self.workflow.totalTasks = totalTasks
+ updateData["totalTasks"] = totalTasks
+
+ if totalActions is not None:
+ self.workflow.totalActions = totalActions
+ updateData["totalActions"] = totalActions
+
+ if updateData:
+ self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData)
+ logger.info(f"Updated workflow {self.workflow.id} totals: {updateData}")
+ except Exception as e:
+ logger.error(f"Error setting workflow totals: {str(e)}")
+
diff --git a/modules/workflows/processing/modes/modeTemplate.py b/modules/workflows/processing/modes/modeTemplate.py
deleted file mode 100644
index 6b476a2d..00000000
--- a/modules/workflows/processing/modes/modeTemplate.py
+++ /dev/null
@@ -1,41 +0,0 @@
-# modeTemplate.py
-# Template mode implementation for workflows (placeholder for future use)
-
-import logging
-from typing import List, Dict, Any
-from modules.datamodels.datamodelChat import (
- TaskStep, TaskContext, TaskResult, ActionItem, TaskStatus
-)
-from modules.datamodels.datamodelChat import ChatWorkflow
-from modules.workflows.processing.modes.modeBase import BaseMode
-
-logger = logging.getLogger(__name__)
-
-class TemplateMode(BaseMode):
- """Template mode implementation - placeholder for future template-based workflow execution"""
-
- def __init__(self, services, workflow):
- super().__init__(services, workflow)
- logger.warning("TemplateMode is not yet implemented - using placeholder implementation")
-
- async def generateActionItems(self, taskStep: TaskStep, workflow: ChatWorkflow,
- previousResults: List = None, enhancedContext: TaskContext = None) -> List[ActionItem]:
- """Generate actions for a given task step using template-based approach"""
- logger.warning("TemplateMode.generateActionItems not yet implemented")
- # TODO: Implement template-based action generation
- return []
-
- async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext,
- taskIndex: int = None, totalTasks: int = None) -> TaskResult:
- """Execute task using Template mode - placeholder implementation"""
- logger.warning("TemplateMode.executeTask not yet implemented - returning placeholder result")
-
- # Return placeholder task result
- return TaskResult(
- taskId=taskStep.id,
- status=TaskStatus.COMPLETED,
- success=False,
- feedback="Template mode is not yet implemented",
- error="Template mode is not yet implemented"
- )
-
diff --git a/modules/workflows/processing/shared/automationTemplateInitial.json b/modules/workflows/processing/shared/automationTemplateInitial.json
new file mode 100644
index 00000000..7a7fec8b
--- /dev/null
+++ b/modules/workflows/processing/shared/automationTemplateInitial.json
@@ -0,0 +1,80 @@
+{
+ "overview": "Automated workflow: Web research, SharePoint data extraction, and document generation",
+ "userMessage": "Execute automated workflow: research web, extract SharePoint data, and generate document",
+ "tasks": [
+ {
+ "id": "task_1",
+ "objective": "Perform web research using provided URL and prompt to gather information",
+ "dependencies": [],
+ "successCriteria": [
+ "Web research completed successfully",
+ "Research results saved as web_research_response"
+ ],
+ "estimatedComplexity": "medium",
+ "userMessage": "Researching web for information",
+ "actionList": [
+ {
+ "execMethod": "ai",
+ "execAction": "webResearch",
+ "execParameters": {
+ "prompt": "{{KEY:webResearchPrompt}}",
+ "list(url)": ["{{KEY:webResearchUrl}}"]
+ },
+ "execResultLabel": "web_research_response"
+ }
+ ]
+ },
+ {
+ "id": "task_2",
+ "objective": "Extract data from files in SharePoint directory using provided folder path and prompt",
+ "dependencies": ["task_1"],
+ "successCriteria": [
+ "SharePoint files read successfully",
+ "Data extracted and saved as sharepoint_data"
+ ],
+ "estimatedComplexity": "medium",
+ "userMessage": "Extracting data from SharePoint files",
+ "actionList": [
+ {
+ "execMethod": "sharepoint",
+ "execAction": "readDocuments",
+ "execParameters": {
+ "connectionReference": "{{KEY:connectionName}}",
+ "pathQuery": "{{KEY:sharepointFolderNameSource}}",
+ "documentList": [],
+ "includeMetadata": true
+ },
+ "execResultLabel": "sharepoint_data"
+ }
+ ]
+ },
+ {
+ "id": "task_3",
+ "objective": "Generate document using web research results and SharePoint data with provided prompt",
+ "dependencies": ["task_1", "task_2"],
+ "successCriteria": [
+ "Document generated successfully",
+ "Document combines web research and SharePoint data",
+ "Document saved as result_data"
+ ],
+ "estimatedComplexity": "high",
+ "userMessage": "Generating final document",
+ "actionList": [
+ {
+ "execMethod": "ai",
+ "execAction": "process",
+ "execParameters": {
+ "prompt": "{{KEY:documentPrompt}}",
+ "documentList": [
+ "web_research_response",
+ "sharepoint_data"
+ ],
+ "outputExtension": ".docx"
+ },
+ "execResultLabel": "result_data"
+ }
+ ]
+ }
+ ]
+}
+
diff --git a/modules/workflows/processing/workflowProcessor.py b/modules/workflows/processing/workflowProcessor.py
index 5884c5bc..e91f6afc 100644
--- a/modules/workflows/processing/workflowProcessor.py
+++ b/modules/workflows/processing/workflowProcessor.py
@@ -8,7 +8,7 @@ from modules.datamodels.datamodelChat import ChatWorkflow, WorkflowModeEnum
from modules.workflows.processing.modes.modeBase import BaseMode
from modules.workflows.processing.modes.modeActionplan import ActionplanMode
from modules.workflows.processing.modes.modeDynamic import DynamicMode
-from modules.workflows.processing.modes.modeTemplate import TemplateMode
+from modules.workflows.processing.modes.modeAutomation import AutomationMode
logger = logging.getLogger(__name__)
@@ -30,8 +30,8 @@ class WorkflowProcessor:
return DynamicMode(self.services, self.workflow)
elif workflowMode == WorkflowModeEnum.WORKFLOW_ACTIONPLAN:
return ActionplanMode(self.services, self.workflow)
- elif workflowMode == WorkflowModeEnum.WORKFLOW_TEMPLATE:
- return TemplateMode(self.services, self.workflow)
+ elif workflowMode == WorkflowModeEnum.WORKFLOW_AUTOMATION:
+ return AutomationMode(self.services, self.workflow)
else:
raise ValueError(f"Invalid workflow mode: {workflowMode}")
def _checkWorkflowStopped(self, workflow):