gateway/modules/workflows/processing/modes/modeBase.py
2026-04-10 12:33:27 +02:00

120 lines
5.6 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
# modeBase.py
# Abstract base class for workflow modes
from abc import ABC, abstractmethod
import uuid
import logging
from typing import List, Dict, Any, Optional
from modules.datamodels.datamodelChat import TaskStep, TaskContext, ChatTaskResult, ActionItem, TaskStatus
from modules.datamodels.datamodelChat import ChatWorkflow
from modules.workflows.processing.core.taskPlanner import TaskPlanner
from modules.workflows.processing.core.actionExecutor import ActionExecutor
from modules.workflows.processing.core.messageCreator import MessageCreator
from modules.workflows.processing.core.validator import WorkflowValidator
from modules.shared.timeUtils import parseTimestamp
logger = logging.getLogger(__name__)
class BaseMode(ABC):
"""Abstract base class for workflow execution modes"""
def __init__(self, services):
self.services = services
self.taskPlanner = TaskPlanner(services)
self.actionExecutor = ActionExecutor(services)
self.messageCreator = MessageCreator(services)
self.validator = WorkflowValidator(services)
@abstractmethod
async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext) -> ChatTaskResult:
"""Execute a task step - must be implemented by concrete modes"""
pass
@abstractmethod
async def generateActionItems(self, taskStep: TaskStep, workflow: ChatWorkflow,
previousResults: List = None, enhancedContext: TaskContext = None) -> List[ActionItem]:
"""Generate actions for a task step - must be implemented by concrete modes"""
pass
async def generateTaskPlan(self, userInput: str, workflow: ChatWorkflow):
"""Generate task plan - common to all modes"""
return await self.taskPlanner.generateTaskPlan(userInput, workflow)
async def createTaskPlanMessage(self, taskPlan, workflow: ChatWorkflow):
"""Create task plan message - common to all modes"""
return await self.messageCreator.createTaskPlanMessage(taskPlan, workflow)
def _createActionItem(self, actionData: Dict[str, Any]) -> Optional[ActionItem]:
"""Create an ActionItem from action data, persist to DB, and return the model instance"""
try:
if "id" not in actionData or not actionData["id"]:
actionData["id"] = f"action_{uuid.uuid4()}"
if "status" not in actionData:
actionData["status"] = TaskStatus.PENDING
if "execMethod" not in actionData:
logger.error("execMethod is required for task action")
return None
if "execAction" not in actionData:
logger.error("execAction is required for task action")
return None
if "execParameters" not in actionData:
actionData["execParameters"] = {}
simpleFields, objectFields = self.services.interfaceDbChat._separateObjectFields(ActionItem, actionData)
createdAction = self.services.interfaceDbChat.db.recordCreate(ActionItem, simpleFields)
return ActionItem(
id=createdAction["id"],
execMethod=createdAction["execMethod"],
execAction=createdAction["execAction"],
execParameters=createdAction.get("execParameters", {}),
execResultLabel=createdAction.get("execResultLabel"),
expectedDocumentFormats=createdAction.get("expectedDocumentFormats"),
status=createdAction.get("status", TaskStatus.PENDING),
error=createdAction.get("error"),
retryCount=createdAction.get("retryCount", 0),
retryMax=createdAction.get("retryMax", 3),
processingTime=createdAction.get("processingTime"),
timestamp=parseTimestamp(createdAction.get("timestamp"), default=self.services.utils.timestampGetUtc()),
result=createdAction.get("result"),
userMessage=createdAction.get("userMessage")
)
except Exception as e:
logger.error(f"Error creating task action: {str(e)}")
return None
def _updateWorkflowBeforeExecutingTask(self, taskNumber: int):
"""Update workflow state before executing a task"""
try:
workflow = self.services.workflow
updateData = {
"currentTask": taskNumber,
"currentAction": 0,
"totalActions": 0
}
workflow.currentTask = taskNumber
workflow.currentAction = 0
workflow.totalActions = 0
self.services.interfaceDbChat.updateWorkflow(workflow.id, updateData)
logger.info(f"Updated workflow {workflow.id} before executing task {taskNumber}")
except Exception as e:
logger.error(f"Error updating workflow before executing task: {str(e)}")
def _updateWorkflowBeforeExecutingAction(self, actionNumber: int):
"""Update workflow state before executing an action"""
try:
workflow = self.services.workflow
updateData = {"currentAction": actionNumber}
workflow.currentAction = actionNumber
self.services.interfaceDbChat.updateWorkflow(workflow.id, updateData)
logger.info(f"Updated workflow {workflow.id} before executing action {actionNumber}")
except Exception as e:
logger.error(f"Error updating workflow before executing action: {str(e)}")