Workflow Automation Feature - Analysis & Implementation Proposal

Overview

This document analyzes the requirements for the "Workflow Automation" feature, records the answers to the clarifying questions raised during analysis, and proposes an implementation.


What I Understand

1. Core Concept

  • Users can define static workflows with placeholders
  • Workflows can be tested manually
  • Workflows can be launched as scheduled background events
  • Each user manages their own automation definitions

2. UI Components Required

  • AutomationDefinition: FormGeneric view for CRUD operations + execute + status display
  • AutomationItemEdit: FormGeneric editor to edit placeholders of a record (simple form for placeholder values)
  • Admin Event Control Module: Sysadmin module in admin section to view and control all running events (for debugging and control)

3. Backend Components Required

  • Pydantic Model: AutomationDefinition in datamodelChat.py
  • Interface Methods: CRUD operations + execute + automation event handler (syncs with scheduler)
  • Route Endpoints: Standard REST endpoints following existing patterns
  • Automation Event Handler: Called on CUD operations and app start to sync scheduler with active automations

4. Model Fields Required

  • label: User-friendly name
  • schedule: Cron pattern string that the user selects from a list of predefined schedule options
  • template: JSON string with placeholders (format: {{KEY:PLACEHOLDER_NAME}})
  • plan: Auto-generated from template every time (NOT stored, derived on-demand)
  • placeholders: Dictionary of placeholder key/value pairs (e.g., {'connectionName': 'MyConnection', 'webResearchUrl': 'https://...'})
  • active: Boolean flag indicating if automation should be launched in event handler
  • eventId: Event ID from event management system (None if not registered, readonly)
  • status: Computed field (readonly, for UI) - indicates if event is registered and running

4.1 Initial Template

The initial template JSON is stored in gateway/modules/workflows/processing/shared/automationTemplateInitial.json and provides a default workflow structure with three tasks:

  1. Web research (action 1) → web_research_response
  2. SharePoint data extraction (action 2) → sharepoint_data
  3. Document generation (action 3) using both previous results → result_data

This template includes placeholders that should be defined in the placeholders dictionary:

  • connectionName ({{KEY:connectionName}}) - SharePoint connection name
  • sharepointFolderNameSource ({{KEY:sharepointFolderNameSource}}) - SharePoint folder path
  • webResearchUrl ({{KEY:webResearchUrl}}) - URL for web research
  • webResearchPrompt ({{KEY:webResearchPrompt}}) - Prompt for web research
  • documentPrompt ({{KEY:documentPrompt}}) - Prompt for document generation
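
A template can reference a placeholder that has no entry in the placeholders dictionary. The following is a minimal sketch of how such gaps could be detected before a run. The function names, the regular expression, and the example template fragment are illustrative assumptions, not part of the existing codebase; the sketch also assumes placeholder names consist only of letters, digits, and underscores.

```python
import json
import re
from typing import Dict, List

# Assumption: placeholder names contain only letters, digits and underscores.
PLACEHOLDER_PATTERN = re.compile(r"\{\{KEY:([A-Za-z0-9_]+)\}\}")


def findPlaceholderKeys(template: str) -> List[str]:
    """Return the distinct placeholder names used in a template, in order of first use."""
    seen: List[str] = []
    for name in PLACEHOLDER_PATTERN.findall(template):
        if name not in seen:
            seen.append(name)
    return seen


def findMissingPlaceholders(template: str, placeholders: Dict[str, str]) -> List[str]:
    """Return placeholder names the template uses but the dictionary does not define."""
    return [name for name in findPlaceholderKeys(template) if name not in placeholders]


# Hypothetical template fragment, not the real automationTemplateInitial.json
exampleTemplate = json.dumps({
    "execParameters": {
        "url": "{{KEY:webResearchUrl}}",
        "prompt": "{{KEY:webResearchPrompt}}",
    }
})
examplePlaceholders = {"webResearchUrl": "https://example.com"}

missing = findMissingPlaceholders(exampleTemplate, examplePlaceholders)
print(missing)
```

A check like this could run when an automation is saved, so that a missing value is reported to the user instead of surfacing during a scheduled run.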

Clarified Requirements

1. Schedule Field Structure

Answer: B - A list of schedule options that the user can choose from.

Implementation: Select dropdown with predefined patterns:

schedule: str = Field(
    description="Schedule pattern",
    frontend_type="select",
    frontend_options=[
        {"value": "0 */4 * * *", "label": {"en": "Every 4 hours", "fr": "Toutes les 4 heures"}},
        {"value": "0 22 * * *", "label": {"en": "Daily at 22:00", "fr": "Quotidien à 22:00"}},
        {"value": "0 10 * * 1", "label": {"en": "Weekly Monday 10:00", "fr": "Hebdomadaire lundi 10:00"}}
    ]
)

2. Template and Plan JSON Structure

Answer: Yes, use the structure of TaskPlan from datamodelChat.py.

Placeholder Format: {{KEY:PLACEHOLDER_NAME}}

Example structure:

{
  "overview": "Brief description",
  "userMessage": "User-friendly message",
  "tasks": [
    {
      "id": "task_1",
      "objective": "Clear objective",
      "actionList": [
        {
          "execMethod": "web",
          "execAction": "research",
          "execParameters": {
            "url": "{{KEY:webResearchUrl}}",
            "prompt": "{{KEY:webResearchPrompt}}"
          }
        }
      ]
    }
  ]
}

3. Placeholder Replacement Mechanism

Answer: Placeholder values are set in the automation definition. Replacement happens when plan is created from template.

Implementation:

  • Placeholder key/value pairs are stored in the placeholders dictionary
  • Replacement happens automatically when generating plan from template (every time plan is needed)
  • The generated plan is not modified afterwards; it is regenerated from the template on each execution or event trigger

4. Active Flag vs EventStart/EventStop

Answer: Replace eventStart/eventStop with an active boolean field.

Implementation:

  • active (bool): Indicates if automation should be launched in event handler
  • Automation Event Handler:
    • Triggered on any CUD operation (Create/Update/Delete) of AutomationDefinition
    • Triggered on app start
    • Checks all automations and syncs scheduler:
      • If active=True and eventId=None: Register new event
      • If active=True and eventId exists: Remove old event, register new (in case schedule changed)
      • If active=False and eventId exists: Remove event from scheduler, set eventId=None
  • Execute: Run workflow immediately (one-time execution, test mode) - does NOT affect active flag or scheduler
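
The sync rules above reduce to a small decision table over the active flag and eventId. The sketch below expresses that table as a pure function so it can be tested without a scheduler. The function names and the action labels ("register", "replace", "remove", "none") are illustrative assumptions.

```python
from typing import Any, Dict, List, Optional


def decideSyncAction(active: bool, eventId: Optional[str]) -> str:
    """Map the (active, eventId) state of one automation to a scheduler action."""
    if active:
        # Active automations are always (re-)registered; an existing event is replaced
        # so that a changed schedule takes effect.
        return "replace" if eventId else "register"
    # Inactive automations only need work if an event is still registered.
    return "remove" if eventId else "none"


def planSync(automations: List[Dict[str, Any]]) -> Dict[str, str]:
    """Return {automationId: action} for a list of automation records."""
    return {
        a["id"]: decideSyncAction(bool(a.get("active", False)), a.get("eventId"))
        for a in automations
    }


examplePlan = planSync([
    {"id": "a1", "active": True, "eventId": None},
    {"id": "a2", "active": True, "eventId": "automation.a2"},
    {"id": "a3", "active": False, "eventId": "automation.a3"},
    {"id": "a4", "active": False, "eventId": None},
])
print(examplePlan)
```

Keeping the decision separate from the scheduler calls makes the fourth case (inactive and not registered, nothing to do) explicit.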

5. AutomationItemEdit Module

Answer: Simple formGeneric editor to edit placeholders of a record.

Implementation:

  • FormGeneric-based editor for placeholder values
  • Allows editing the placeholder values stored in the automation definition
  • Shows available placeholders and their current values

6. Event Execution Context

Answer: Use the user who created the automation (_createdBy), stored in the automation definition.

Implementation: When event fires, load automation, get _createdBy userId, create user context, and execute workflow.

7. Database Storage

Answer: Yes, store in database. Implement in existing interfaceDbChatObjects.

Implementation: Follow existing patterns in interfaceDbChatObjects.py for CRUD operations.

8. Plan vs Template Relationship

Answer: Plan to be auto-generated from template every time when executing/starting.

Implementation:

  • plan is NOT stored in database
  • Generated on-demand from template + placeholder values
  • Regenerated each time workflow is executed (manual or scheduled)

📋 Proposed Implementation Solution

Phase 1: Data Model & Access Control

1.1 Pydantic Model (datamodelChat.py)

class AutomationDefinition(BaseModel):
    id: str = Field(
        default_factory=lambda: str(uuid.uuid4()),
        description="Primary key",
        frontend_type="text",
        frontend_readonly=True,
        frontend_required=False
    )
    mandateId: str = Field(
        description="Mandate ID",
        frontend_type="text",
        frontend_readonly=True,
        frontend_required=False
    )
    label: str = Field(
        description="User-friendly name",
        frontend_type="text",
        frontend_required=True
    )
    schedule: str = Field(
        description="Cron schedule pattern",
        frontend_type="select",
        frontend_options=[
            {"value": "0 */4 * * *", "label": {"en": "Every 4 hours", "fr": "Toutes les 4 heures"}},
            {"value": "0 22 * * *", "label": {"en": "Daily at 22:00", "fr": "Quotidien à 22:00"}},
            {"value": "0 10 * * 1", "label": {"en": "Weekly Monday 10:00", "fr": "Hebdomadaire lundi 10:00"}}
        ],
        frontend_required=True
    )
    template: str = Field(
        description="JSON template with placeholders (format: {{KEY:PLACEHOLDER_NAME}})",
        frontend_type="textarea",
        frontend_required=True
    )
    placeholders: Dict[str, str] = Field(
        default_factory=dict,
        description="Dictionary of placeholder key/value pairs (e.g., {'connectionName': 'MyConnection', 'sharepointFolderNameSource': '/folder/path', 'webResearchUrl': 'https://...', 'webResearchPrompt': '...', 'documentPrompt': '...'})",
        frontend_type="text"
    )
    active: bool = Field(
        default=False,
        description="Whether automation should be launched in event handler",
        frontend_type="checkbox",
        frontend_required=False
    )
    eventId: Optional[str] = Field(
        None,
        description="Event ID from event management (None if not registered)",
        frontend_type="text",
        frontend_readonly=True,
        frontend_required=False
    )
    status: Optional[str] = Field(
        None,
        description="Status: 'active' if event is registered, 'inactive' if not (computed, readonly)",
        frontend_type="text",
        frontend_readonly=True,
        frontend_required=False
    )

Note:

  • plan is NOT stored - it's generated on-demand from template + placeholders dictionary
  • status is computed for UI display (based on eventId presence)
  • placeholders dictionary stores all placeholder key/value pairs (e.g., {'connectionName': 'MyConnection', 'webResearchUrl': 'https://...'})

1.2 Access Control

Add to interfaceDbChatAccess.py:

elif table_name == "AutomationDefinition":
    # Users see only their own automation definitions
    filtered_records = [
        r for r in recordset 
        if r.get("mandateId","-") == self.mandateId and r.get("_createdBy") == self.userId
    ]

1.3 Status Field Computation

The status field is computed when retrieving automation definitions:

  • If eventId is not None and not empty: status = "active" (event registered)
  • If eventId is None or empty: status = "inactive" (no event registered)
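
The two rules above can be sketched as a small helper applied to each record before it is returned. The names computeStatus and withComputedStatus are hypothetical; the actual get methods may attach the field differently.

```python
from typing import Any, Dict, Optional


def computeStatus(eventId: Optional[str]) -> str:
    """Return "active" when an event is registered, otherwise "inactive"."""
    # Both None and the empty string count as "no event registered".
    return "active" if eventId else "inactive"


def withComputedStatus(record: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the record with the computed status field attached."""
    enriched = dict(record)
    enriched["status"] = computeStatus(record.get("eventId"))
    return enriched


example = withComputedStatus({"id": "a1", "eventId": "automation.a1"})
print(example["status"])
```

Returning a copy keeps the stored record free of the computed field, which matches the intent that status is for display only.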

Phase 2: Interface Methods

2.1 Add to interfaceDbChatObjects.py

class ChatObjects:
    # CRUD methods
    def getAllAutomationDefinitions(self, pagination: Optional[PaginationParams] = None) -> Union[List[Dict], PaginatedResult]:
        """Get all automation definitions for current user, with computed status field"""
        
    def getAutomationDefinition(self, automationId: str) -> Optional[Dict]:
        """Get single automation definition with computed status"""
        
    def createAutomationDefinition(self, automation: Dict) -> Dict:
        """Create new automation definition, then trigger automation event handler"""
        
    def updateAutomationDefinition(self, automationId: str, automationData: Dict) -> Dict:
        """Update automation definition, then trigger automation event handler"""
        
    def deleteAutomationDefinition(self, automationId: str) -> bool:
        """Delete automation definition, then trigger automation event handler"""
    
    # Action methods
    async def executeAutomation(self, automationId: str) -> ChatWorkflow:
        """Execute automation immediately (test mode) - generates plan from template"""
        
    # Automation Event Handler (called on CUD operations and app start)
    async def syncAutomationEvents(self) -> Dict[str, Any]:
        """
        Automation event handler - syncs scheduler with all active automations.
        Called:
        - After any CUD operation on AutomationDefinition
        - On app start
        Logic:
        - For each automation with active=True:
          - If eventId is None: register new event
          - If eventId exists: remove old event, register new (handles schedule changes)
        - For each automation with active=False:
          - If eventId exists: remove event, set eventId=None
        """

2.2 Automation Event Handler Implementation

async def syncAutomationEvents(self) -> Dict[str, Any]:
    """Sync event scheduler with automation definitions"""
    from modules.shared.eventManagement import eventManager
    
    # Get all automation definitions (for current mandate)
    automations = self.db.getRecordset(AutomationDefinition)
    filtered = self._uam(AutomationDefinition, automations)
    
    registered_events = {}
    
    for automation in filtered:
        automation_id = automation.get("id")
        is_active = automation.get("active", False)
        current_event_id = automation.get("eventId")
        schedule = automation.get("schedule")
        
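        if is_active:
            # Parse schedule to cron kwargs (only needed when an event is registered,
            # so an invalid schedule on an inactive automation cannot abort the sync)
            cron_kwargs = self._parseScheduleToCron(schedule)
            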
            # Remove existing event if present (handles schedule changes)
            if current_event_id:
                eventManager.remove(current_event_id)
            
            # Register new event
            new_event_id = f"automation.{automation_id}"
            eventManager.registerCron(
                jobId=new_event_id,
                func=self._createAutomationEventHandler(automation_id),
                cronKwargs=cron_kwargs,
                replaceExisting=True
            )
            
            # Update automation with new eventId
            self.db.updateRecord(AutomationDefinition, automation_id, {"eventId": new_event_id})
            registered_events[automation_id] = new_event_id
        else:
            # Remove event if exists
            if current_event_id:
                eventManager.remove(current_event_id)
                self.db.updateRecord(AutomationDefinition, automation_id, {"eventId": None})
    
    return {
        "synced": len(registered_events),
        "events": registered_events
    }
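
One case the loop above does not visibly cover is a deleted automation: its record no longer exists, so the loop never sees its eventId. The following is a minimal sketch of how leftover events could be detected. It assumes, hypothetically, that the event manager can list the job IDs it currently holds and that the automation list passed in is unfiltered; neither is confirmed by the existing code, and the function name is illustrative.

```python
from typing import Any, Dict, Iterable, List

# Matches the jobId format used when registering: f"automation.{automation_id}"
EVENT_PREFIX = "automation."


def findOrphanedEventIds(
    registeredJobIds: Iterable[str],
    automations: List[Dict[str, Any]],
) -> List[str]:
    """Return automation job IDs in the scheduler that have no matching automation record."""
    knownIds = {f"{EVENT_PREFIX}{a['id']}" for a in automations}
    return sorted(
        jobId
        for jobId in registeredJobIds
        # Only consider jobs created by this feature; leave other jobs untouched.
        if jobId.startswith(EVENT_PREFIX) and jobId not in knownIds
    )


orphans = findOrphanedEventIds(
    ["automation.a1", "automation.deleted", "other.job"],
    [{"id": "a1", "active": True}],
)
print(orphans)
```

Each returned ID could then be passed to eventManager.remove, either during the sync or directly in the delete method.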

2.3 Event Handler Function Factory

def _createAutomationEventHandler(self, automationId: str):
    """Create event handler function for a specific automation"""
    async def handler():
        # Load automation
        automation = self.getAutomationDefinition(automationId)
        if not automation or not automation.get("active"):
            return
        
        # Get user who created automation
        creator_user_id = automation.get("_createdBy")
        creator_user = self._getUserById(creator_user_id)
        
        # Execute workflow
        await self.executeAutomation(automationId)
    
    return handler

Phase 3: Route Endpoints

3.1 Create routeDataAutomation.py

router = APIRouter(prefix="/api/automations", tags=["Automation"])

@router.get("", response_model=PaginatedResponse[AutomationDefinition])
async def get_automations(...): # List with pagination, includes computed status

@router.post("", response_model=AutomationDefinition)
async def create_automation(...): # Create, then triggers syncAutomationEvents

@router.get("/{automationId}", response_model=AutomationDefinition)
async def get_automation(...): # Get one with computed status

@router.put("/{automationId}", response_model=AutomationDefinition)
async def update_automation(...): # Update, then triggers syncAutomationEvents

@router.delete("/{automationId}")
async def delete_automation(...): # Delete, then triggers syncAutomationEvents

@router.post("/{automationId}/execute")
async def execute_automation(...): # Execute immediately (test mode)

3.2 Create routeAdminAutomationEvents.py (Sysadmin Module)

router = APIRouter(prefix="/api/admin/automation-events", tags=["Admin Automation Events"])

@router.get("", response_model=List[Dict])
async def get_all_automation_events(...): # Get all events across all mandates (sysadmin only)

@router.post("/sync")
async def sync_all_automation_events(...): # Manually trigger sync for all automations (sysadmin only)

@router.post("/{eventId}/remove")
async def remove_event(...): # Manually remove event from scheduler (sysadmin only)

Phase 4: Placeholder Replacement & Execution

4.1 Placeholder Replacement Logic

def replacePlaceholders(template: str, automation: Dict) -> str:
    """
    Replace placeholders in template with actual values from automation.
    
    Placeholder format: {{KEY:PLACEHOLDER_NAME}}
    Values come from automation.placeholders dictionary
    """
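    import json
    
    result = template
    placeholders = automation.get("placeholders", {})
    
    # Replace all {{KEY:PLACEHOLDER_NAME}} patterns
    for placeholderName, value in placeholders.items():
        pattern = f"{{{{KEY:{placeholderName}}}}}"
        # Placeholders sit inside JSON string values, so JSON-escape the value
        # (quotes, backslashes, newlines) to keep the resulting plan valid JSON
        escapedValue = json.dumps(str(value))[1:-1]
        result = result.replace(pattern, escapedValue)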
    
    return result

4.2 Execution Function

async def executeAutomation(self, automationId: str) -> ChatWorkflow:
    """Execute automation workflow with placeholder replacement"""
    # 1. Load automation definition
    automation = self.getAutomationDefinition(automationId)
    if not automation:
        raise ValueError(f"Automation {automationId} not found")
    
    # 2. Replace placeholders in template to generate plan
    planJson = replacePlaceholders(automation["template"], automation)
    plan = json.loads(planJson)
    
    # 3. Get user who created automation (scheduled runs execute as this user)
    creator_user_id = automation.get("_createdBy")
    creator_user = self._getUserById(creator_user_id)
    
    # 4. Create UserInputRequest from plan
    # Convert plan to UserInputRequest format
    userInput = UserInputRequest(
        prompt=self._planToPrompt(plan),
        listFileId=[],
        userLanguage=creator_user.language
    )
    
    # 5. Start workflow using WorkflowManager
    from modules.features.chatPlayground.mainChatPlayground import chatStart
    from modules.datamodels.datamodelChat import WorkflowModeEnum
    
    workflow = await chatStart(
        currentUser=creator_user,
        userInput=userInput,
        workflowMode=WorkflowModeEnum.WORKFLOW_TEMPLATE,
        workflowId=None
    )
    
    return workflow

def _planToPrompt(self, plan: Dict) -> str:
    """Convert plan structure to prompt string for workflow execution"""
    # Extract user message or generate from tasks
    return plan.get("userMessage", plan.get("overview", "Execute automation workflow"))

4.3 Schedule Parsing

def _parseScheduleToCron(self, schedule: str) -> Dict[str, Any]:
    """Parse a 5-field cron string into cron kwargs for APScheduler"""
    # Example: "0 */4 * * *" (every 4 hours) parses to
    # {"minute": "0", "hour": "*/4", "day": "*", "month": "*", "day_of_week": "*"}
    parts = schedule.split()
    if len(parts) != 5:
        raise ValueError(f"Invalid schedule format: {schedule}")
    
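    # APScheduler numbers weekdays from 0 = Monday, whereas cron uses 0 (or 7) = Sunday.
    # Assuming these kwargs reach APScheduler's CronTrigger unchanged, translate a plain
    # numeric weekday to its name so that "0 10 * * 1" still means Monday.
    weekday_names = ["sun", "mon", "tue", "wed", "thu", "fri", "sat", "sun"]
    day_of_week = parts[4]
    if day_of_week.isdigit() and int(day_of_week) <= 7:
        day_of_week = weekday_names[int(day_of_week)]
    
    return {
        "minute": parts[0],
        "hour": parts[1],
        "day": parts[2],
        "month": parts[3],
        "day_of_week": day_of_week
    }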

4.4 App Start Integration

In app startup (e.g., app.py or main startup file):

async def on_app_start():
    """Called when app starts"""
    from modules.interfaces.interfaceDbChatObjects import getInterface
    from modules.datamodels.datamodelUam import User
    
    # Get event user or system user
    eventUser = getEventUser()  # Get system/event user
    interface = getInterface(eventUser)
    
    # Sync all automation events
    await interface.syncAutomationEvents()

Phase 5: UI Integration

5.1 FormGeneric Module Configuration

// frontend_agents/public/js/modules/formAutomationDefinition.js
const automationConfig = {
    entityType: "automation",
    apiEndpoint: {
        get: () => api.get("/api/automations"),
        create: (data) => api.post("/api/automations", data),
        update: (id, data) => api.put(`/api/automations/${id}`, data),
        delete: (id) => api.delete(`/api/automations/${id}`)
    },
    customActions: [
        {
            label: "Execute",
            action: async (item) => {
                await api.post(`/api/automations/${item.id}/execute`);
            }
        }
    ],
    // Status field is displayed in table (readonly, computed)
    // active field is shown as checkbox in form
};

5.2 AutomationItemEdit Module

// frontend_agents/public/js/modules/formAutomationItemEdit.js
// Simple formGeneric editor for placeholder values
const automationItemEditConfig = {
    entityType: "automation",
    entityId: null, // Set dynamically when editing
    fields: [
        // Dynamically generated from automation.placeholders dictionary keys
        // Each placeholder key becomes a text input field
        // Values stored directly in automation.placeholders[placeholderName]
    ]
};

  • FormGeneric-based editor for editing placeholder values
  • Dynamically generates form fields from automation.placeholders dictionary keys
  • Shows placeholder names and allows editing their values
  • Values stored directly in automation.placeholders dictionary (key/value pairs)

5.3 Admin Event Control Module

// frontend_agents/public/js/modules/adminAutomationEvents.js
// Sysadmin module in admin section of navigator
const adminEventsConfig = {
    title: "Automation Events",
    apiEndpoint: {
        get: () => api.get("/api/admin/automation-events"),
        sync: () => api.post("/api/admin/automation-events/sync"),
        remove: (eventId) => api.post(`/api/admin/automation-events/${eventId}/remove`)
    },
    // Shows all registered events across all mandates
    // Allows manual sync and event removal for debugging
};

  • Shows all running automation events (sysadmin only)
  • Displays event ID, automation ID, schedule, status
  • Allows manual sync of all events
  • Allows manual removal of specific events (for debugging)
  • Located in admin section of navigator

🔍 Additional Considerations

Error Handling

  • What happens if placeholder values are missing?
  • What if template JSON is invalid?
  • What if workflow execution fails?
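
One possible answer to the last two questions is to wrap each run so that a failure becomes a result record instead of an unhandled exception in the scheduler. The sketch below is an assumption about how that could look; runAutomationSafely, the logger name, and the shape of the result record are hypothetical.

```python
import asyncio
import json
import logging
from typing import Any, Awaitable, Callable, Dict

logger = logging.getLogger("automation")


async def runAutomationSafely(
    automationId: str,
    run: Callable[[], Awaitable[Any]],
) -> Dict[str, Any]:
    """Run one automation and convert any failure into a result record."""
    try:
        result = await run()
        return {"automationId": automationId, "ok": True, "result": result, "error": None}
    except json.JSONDecodeError as exc:
        # Raised when the template (after placeholder replacement) is not valid JSON.
        logger.error("Automation %s: invalid template JSON: %s", automationId, exc)
        return {"automationId": automationId, "ok": False, "result": None,
                "error": f"invalid template JSON: {exc.msg}"}
    except Exception as exc:
        # Any other failure is logged with its traceback and reported, not re-raised.
        logger.exception("Automation %s failed", automationId)
        return {"automationId": automationId, "ok": False, "result": None, "error": str(exc)}


async def _invalidTemplateRun() -> Any:
    # Stand-in for a run whose template cannot be parsed.
    return json.loads("{not json")


outcome = asyncio.run(runAutomationSafely("a1", _invalidTemplateRun))
print(outcome["ok"], outcome["error"])
```

The event handler could call a wrapper like this around executeAutomation, and the returned record could also feed the monitoring items listed further down.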

Validation

  • Validate cron schedule format
  • Validate template JSON structure
  • Validate placeholder names match defined list
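
A minimal sketch of the first two checks, assuming the schedule must be one of the predefined dropdown values and that a template is a JSON object with a tasks list, as in the example structure earlier in this document. The function name and the wording of the messages are illustrative.

```python
import json
from typing import List

# The predefined patterns offered in the schedule dropdown.
ALLOWED_SCHEDULES = {"0 */4 * * *", "0 22 * * *", "0 10 * * 1"}


def validateAutomation(schedule: str, template: str) -> List[str]:
    """Return a list of problems; an empty list means the input passed these checks."""
    problems: List[str] = []

    if len(schedule.split()) != 5:
        problems.append("schedule must have exactly 5 cron fields")
    elif schedule not in ALLOWED_SCHEDULES:
        problems.append("schedule is not one of the predefined options")

    # Placeholders sit inside string values, so the raw template should already parse.
    try:
        parsed = json.loads(template)
    except json.JSONDecodeError as exc:
        problems.append(f"template is not valid JSON: {exc.msg}")
        return problems

    if not isinstance(parsed, dict) or not isinstance(parsed.get("tasks"), list):
        problems.append("template must be a JSON object with a 'tasks' list")
    return problems


print(validateAutomation("0 22 * * *", '{"tasks": []}'))
print(validateAutomation("every day", "{"))
```

Returning all problems at once, rather than raising on the first, lets the form show every issue in a single round trip.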

Security

  • Ensure users can only execute their own automations
  • Validate placeholder values (sanitize inputs)
  • Prevent injection attacks in placeholder replacement
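
To illustrate the injection concern: because placeholders are substituted into a JSON string before parsing, a value containing a double quote can add keys to the plan. The sketch below contrasts plain substitution with JSON-escaped substitution. The template fragment and the hostile value are made up for the illustration.

```python
import json

# Hypothetical template fragment with a single placeholder.
template = '{"execParameters": {"url": "{{KEY:webResearchUrl}}"}}'

# A value that closes the string early and smuggles in an extra key.
hostileValue = 'https://example.com", "injected": "yes'

# Plain substitution: the value is pasted into the JSON text unchanged.
naiveJson = template.replace("{{KEY:webResearchUrl}}", hostileValue)

# Escaped substitution: json.dumps escapes the quotes; [1:-1] drops the outer quotes
# because the placeholder already sits inside a quoted JSON string.
escapedJson = template.replace("{{KEY:webResearchUrl}}", json.dumps(hostileValue)[1:-1])

naiveParams = json.loads(naiveJson)["execParameters"]
escapedParams = json.loads(escapedJson)["execParameters"]

print(sorted(naiveParams.keys()))
print(sorted(escapedParams.keys()))
```

With plain substitution the parsed parameters gain an extra key; with escaping the whole value stays inside the url string. Escaping only protects the JSON structure, so values that later reach a URL fetch or a prompt still need their own validation.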

Monitoring

  • Log automation executions
  • Track success/failure rates
  • Monitor event scheduler status

📝 Implementation Checklist

Phase 1: Data Model

  • Create AutomationDefinition Pydantic model in datamodelChat.py
  • Add active boolean field (replaces eventStart/eventStop)
  • Add status computed field for UI
  • Add placeholders dict field (key/value pairs)
  • Add registerModelLabels for translations
  • Update access control in interfaceDbChatAccess.py

Phase 2: Interface Methods

  • Add CRUD methods to interfaceDbChatObjects.py
  • Add executeAutomation method
  • Add syncAutomationEvents method (automation event handler)
  • Add replacePlaceholders helper function
  • Add _parseScheduleToCron helper function
  • Add status computation in get methods

Phase 3: Route Endpoints

  • Create routeDataAutomation.py with CRUD + execute endpoints
  • Add sync trigger after CUD operations
  • Create routeAdminAutomationEvents.py for sysadmin module
  • Register routes in app.py

Phase 4: Event Integration

  • Implement syncAutomationEvents logic
  • Create event handler factory function
  • Add app startup integration
  • Test event registration/removal

Phase 5: UI Components

  • Create formAutomationDefinition.js module
  • Create formAutomationItemEdit.js module
  • Create adminAutomationEvents.js module (sysadmin)
  • Add to navigator admin section

Phase 6: Testing

  • Test CRUD operations
  • Test placeholder replacement
  • Test execute endpoint
  • Test active flag changes trigger sync
  • Test event registration/removal
  • Test app startup sync
  • Test sysadmin module

All Questions Answered

  1. Schedule Format: B - List of schedule options (select dropdown)
  2. Template Structure: Use TaskPlan structure with {{KEY:PLACEHOLDER_NAME}} format
  3. Placeholder Source: Values stored in placeholders dictionary (key/value pairs) in automation definition
  4. Plan Storage: Auto-generated from template every time (not stored)
  5. Execute Behavior: Execute runs workflow immediately (test mode)
  6. Active Flag: Boolean active field, sync triggered on CUD operations and app start
  7. User Context: Use _createdBy user for scheduled executions
  8. Database Storage: Implement in interfaceDbChatObjects
  9. Placeholder Format: {{KEY:PLACEHOLDER_NAME}}
  10. Status Field: Computed readonly field showing if event is registered

Next Steps

Ready to implement:

  1. Pydantic model with all clarified fields
  2. Interface methods with event sync logic
  3. Route endpoints with CUD + execute
  4. Placeholder replacement with {{KEY:NAME}} format
  5. Event management integration with active flag
  6. UI modules for automation management
  7. Admin module for event control