From 30d0a8f70c02f767827239bb14f375ae54442ea6 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Mon, 22 Sep 2025 23:34:47 +0200
Subject: [PATCH] Full refactored workflow and features
---
app.py | 6 +-
modules/chat/managerChat.py | 115 ----
.../chatPlayground/mainChatPlayground.py | 29 +
.../mainNeutralizePlayground.py} | 2 +-
.../mainSyncDelta.py} | 0
modules/interfaces/interfaceAiCalls.py | 2 +-
modules/interfaces/interfaceAppObjects.py | 2 +-
modules/interfaces/interfaceChatModel.py | 92 ++-
modules/interfaces/interfaceChatObjects.py | 237 +------
modules/routes/routeAdmin.py | 219 -------
modules/routes/routeChatPlayground.py | 132 ++++
modules/routes/routeDataNeutralization.py | 2 +-
modules/routes/routeWorkflows.py | 96 +--
modules/{chat => services}/serviceCenter.py | 8 +-
.../serviceDocument}/documentExtraction.py | 4 +-
.../serviceDocument}/documentGeneration.py | 2 +-
.../serviceDocument}/documentUtility.py | 0
.../serviceNeutralization}/neutralizer.py | 12 +-
.../serviceNeutralization}/readme.md | 0
.../serviceNeutralization}/subParseString.py | 2 +-
.../serviceNeutralization}/subPatterns.py | 0
.../subProcessBinary.py | 0
.../subProcessCommon.py | 0
.../serviceNeutralization}/subProcessList.py | 8 +-
.../serviceNeutralization}/subProcessText.py | 2 +-
.../_transfer}/executionState.py | 29 +-
.../_transfer}/handlingTasks.py | 168 ++++-
.../_transfer}/promptFactory.py | 96 ++-
modules/{ => workflows}/methods/methodAi.py | 2 +-
modules/{ => workflows}/methods/methodBase.py | 0
.../{ => workflows}/methods/methodDocument.py | 2 +-
.../{ => workflows}/methods/methodOutlook.py | 2 +-
.../methods/methodSharepoint.py | 2 +-
modules/{ => workflows}/methods/methodWeb.py | 2 +-
.../workflowManager.py} | 585 +++++++++++-------
tests/methods/test_method_web.py | 2 +-
tool_stats_durations_from_log.py | 2 +-
37 files changed, 924 insertions(+), 940 deletions(-)
delete mode 100644 modules/chat/managerChat.py
create mode 100644 modules/features/chatPlayground/mainChatPlayground.py
rename modules/features/{featureNeutralizePlayground.py => neutralizePlayground/mainNeutralizePlayground.py} (99%)
rename modules/features/{featureSyncDelta.py => syncDelta/mainSyncDelta.py} (100%)
create mode 100644 modules/routes/routeChatPlayground.py
rename modules/{chat => services}/serviceCenter.py (99%)
rename modules/{chat/documents => services/serviceDocument}/documentExtraction.py (99%)
rename modules/{chat/documents => services/serviceDocument}/documentGeneration.py (99%)
rename modules/{chat/documents => services/serviceDocument}/documentUtility.py (100%)
rename modules/{neutralizer => services/serviceNeutralization}/neutralizer.py (87%)
rename modules/{neutralizer => services/serviceNeutralization}/readme.md (100%)
rename modules/{neutralizer => services/serviceNeutralization}/subParseString.py (98%)
rename modules/{neutralizer => services/serviceNeutralization}/subPatterns.py (100%)
rename modules/{neutralizer => services/serviceNeutralization}/subProcessBinary.py (100%)
rename modules/{neutralizer => services/serviceNeutralization}/subProcessCommon.py (100%)
rename modules/{neutralizer => services/serviceNeutralization}/subProcessList.py (96%)
rename modules/{neutralizer => services/serviceNeutralization}/subProcessText.py (97%)
rename modules/{chat/handling => workflows/_transfer}/executionState.py (64%)
rename modules/{chat/handling => workflows/_transfer}/handlingTasks.py (89%)
rename modules/{chat/handling => workflows/_transfer}/promptFactory.py (91%)
rename modules/{ => workflows}/methods/methodAi.py (99%)
rename modules/{ => workflows}/methods/methodBase.py (100%)
rename modules/{ => workflows}/methods/methodDocument.py (99%)
rename modules/{ => workflows}/methods/methodOutlook.py (99%)
rename modules/{ => workflows}/methods/methodSharepoint.py (99%)
rename modules/{ => workflows}/methods/methodWeb.py (99%)
rename modules/{features/featureChatPlayground.py => workflows/workflowManager.py} (55%)
diff --git a/app.py b/app.py
index 02e05076..5caa64f8 100644
--- a/app.py
+++ b/app.py
@@ -205,14 +205,10 @@ async def lifespan(app: FastAPI):
# Startup logic
logger.info("Application is starting up")
- # Initialize root interface to ensure database is properly set up
- from modules.interfaces.interfaceAppObjects import getRootInterface
- getRootInterface()
-
# Setup APScheduler for JIRA sync
scheduler = AsyncIOScheduler(timezone=ZoneInfo("Europe/Zurich"))
try:
- from modules.features.featureSyncDelta import perform_sync_jira_delta_group
+ from modules.features.syncDelta.mainSyncDelta import perform_sync_jira_delta_group
# Schedule sync every 20 minutes (at minutes 00, 20, 40)
scheduler.add_job(
perform_sync_jira_delta_group,
diff --git a/modules/chat/managerChat.py b/modules/chat/managerChat.py
deleted file mode 100644
index 882d46e3..00000000
--- a/modules/chat/managerChat.py
+++ /dev/null
@@ -1,115 +0,0 @@
-import logging
-from typing import Dict, Any, List
-from modules.interfaces.interfaceAppModel import User
-from modules.interfaces.interfaceChatModel import ChatWorkflow, UserInputRequest, TaskStep, TaskAction, ActionResult, ReviewResult, TaskPlan, WorkflowResult, TaskContext
-from modules.interfaces.interfaceChatObjects import ChatObjects
-from modules.chat.handling.handlingTasks import HandlingTasks, WorkflowStoppedException
-
-logger = logging.getLogger(__name__)
-
-# ===== STATE MANAGEMENT AND VALIDATION CLASSES =====
-
-class ChatManager:
- """Chat manager with improved AI integration and method handling"""
-
- def __init__(self, currentUser: User, chatInterface: ChatObjects):
- self.currentUser = currentUser
- self.chatInterface = chatInterface
- self.workflow: ChatWorkflow = None
- self.handlingTasks: HandlingTasks = None
-
- async def initialize(self, workflow: ChatWorkflow) -> None:
- """Initialize chat manager with workflow"""
- self.workflow = workflow
- self.handlingTasks = HandlingTasks(self.chatInterface, self.currentUser, self.workflow)
-
-
- async def executeUnifiedWorkflow(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> WorkflowResult:
- """Unified Workflow Execution"""
- try:
- logger.info(f"Starting unified workflow execution for workflow {workflow.id}")
-
- # Phase 1: High-Level Task Planning
- logger.info("Phase 1: Generating task plan")
- task_plan = await self.handlingTasks.generateTaskPlan(userInput.prompt, workflow)
- if not task_plan or not task_plan.tasks:
- raise Exception("No tasks generated in task plan.")
-
- # Phase 2-5: For each task, execute and get results
- total_tasks = len(task_plan.tasks)
- logger.info(f"Phase 2: Executing {total_tasks} tasks")
- all_task_results = []
- previous_results = []
- for idx, task_step in enumerate(task_plan.tasks):
- # Pass task index to executeTask method
- current_task_index = idx + 1
-
- logger.info(f"Task {idx+1}/{total_tasks}: {task_step.objective}")
-
- # Create proper context object for this task
- task_context = TaskContext(
- task_step=task_step,
- workflow=workflow,
- workflow_id=workflow.id,
- available_documents=None,
- available_connections=None,
- previous_results=previous_results,
- previous_handover=None,
- improvements=[],
- retry_count=0,
- previous_action_results=[],
- previous_review_result=None,
- is_regeneration=False,
- failure_patterns=[],
- failed_actions=[],
- successful_actions=[],
- criteria_progress={
- 'met_criteria': set(),
- 'unmet_criteria': set(),
- 'attempt_history': []
- }
- )
-
- # Execute task (this handles action generation, execution, and review internally)
- task_result = await self.handlingTasks.executeTask(task_step, workflow, task_context, current_task_index, total_tasks)
- # Handover
- handover_data = await self.handlingTasks.prepareTaskHandover(task_step, [], task_result, workflow)
- # Collect results
- all_task_results.append({
- 'task_step': task_step,
- 'task_result': task_result,
- 'handover_data': handover_data
- })
- # Update previous results for next task
- if task_result.success and task_result.feedback:
- previous_results.append(task_result.feedback)
-
- # Final workflow result
- workflow_result = WorkflowResult(
- status="completed",
- completed_tasks=len(all_task_results),
- total_tasks=len(task_plan.tasks),
- execution_time=0.0, # TODO: Calculate actual execution time
- final_results_count=len(all_task_results)
- )
- logger.info(f"Unified workflow execution completed successfully for workflow {workflow.id}")
- return workflow_result
- except WorkflowStoppedException:
- logger.info(f"Workflow {workflow.id} was stopped by user")
- return WorkflowResult(
- status="stopped",
- completed_tasks=0,
- total_tasks=0,
- execution_time=0.0,
- final_results_count=0
- )
- except Exception as e:
- logger.error(f"Error in executeUnifiedWorkflow: {str(e)}")
- return WorkflowResult(
- status="failed",
- completed_tasks=0,
- total_tasks=0,
- execution_time=0.0,
- final_results_count=0,
- error=str(e)
- )
diff --git a/modules/features/chatPlayground/mainChatPlayground.py b/modules/features/chatPlayground/mainChatPlayground.py
new file mode 100644
index 00000000..13eba835
--- /dev/null
+++ b/modules/features/chatPlayground/mainChatPlayground.py
@@ -0,0 +1,29 @@
+import logging
+import asyncio
+from typing import Optional
+
+from modules.interfaces.interfaceAppModel import User
+from modules.interfaces.interfaceChatModel import ChatWorkflow, UserInputRequest
+from modules.shared.timezoneUtils import get_utc_timestamp
+
+logger = logging.getLogger(__name__)
+
+async def chatStart(interfaceChat, currentUser: User, userInput: UserInputRequest, workflowId: Optional[str] = None) -> ChatWorkflow:
+ """Starts a new chat or continues an existing one, then launches processing asynchronously."""
+ try:
+ from modules.workflows.workflowManager import WorkflowManager
+ workflowManager = WorkflowManager(interfaceChat, currentUser)
+ return await workflowManager.workflowStart(userInput, workflowId)
+ except Exception as e:
+ logger.error(f"Error starting chat: {str(e)}")
+ raise
+
+async def chatStop(interfaceChat, currentUser: User, workflowId: str) -> ChatWorkflow:
+ """Stops a running chat."""
+ try:
+ from modules.workflows.workflowManager import WorkflowManager
+ workflowManager = WorkflowManager(interfaceChat, currentUser)
+ return await workflowManager.workflowStop(workflowId)
+ except Exception as e:
+ logger.error(f"Error stopping chat: {str(e)}")
+ raise
diff --git a/modules/features/featureNeutralizePlayground.py b/modules/features/neutralizePlayground/mainNeutralizePlayground.py
similarity index 99%
rename from modules/features/featureNeutralizePlayground.py
rename to modules/features/neutralizePlayground/mainNeutralizePlayground.py
index e5c75a37..877ca8aa 100644
--- a/modules/features/featureNeutralizePlayground.py
+++ b/modules/features/neutralizePlayground/mainNeutralizePlayground.py
@@ -13,7 +13,7 @@ import mimetypes
from modules.interfaces.interfaceAppObjects import getInterface
from modules.interfaces.interfaceAppModel import User, DataNeutraliserConfig, DataNeutralizerAttributes
-from modules.neutralizer.neutralizer import DataAnonymizer
+from modules.services.serviceNeutralization.neutralizer import DataAnonymizer
from modules.shared.timezoneUtils import get_utc_timestamp
logger = logging.getLogger(__name__)
diff --git a/modules/features/featureSyncDelta.py b/modules/features/syncDelta/mainSyncDelta.py
similarity index 100%
rename from modules/features/featureSyncDelta.py
rename to modules/features/syncDelta/mainSyncDelta.py
diff --git a/modules/interfaces/interfaceAiCalls.py b/modules/interfaces/interfaceAiCalls.py
index f0bb67b4..6f0de9c9 100644
--- a/modules/interfaces/interfaceAiCalls.py
+++ b/modules/interfaces/interfaceAiCalls.py
@@ -2,7 +2,7 @@ import logging
from typing import Dict, Any, List, Union, Optional
from modules.connectors.connectorAiOpenai import AiOpenai, ContextLengthExceededException
from modules.connectors.connectorAiAnthropic import AiAnthropic
-from modules.chat.documents.documentExtraction import DocumentExtraction
+from modules.services.serviceDocument.documentExtraction import DocumentExtraction
from modules.interfaces.interfaceChatModel import ChatDocument
logger = logging.getLogger(__name__)
diff --git a/modules/interfaces/interfaceAppObjects.py b/modules/interfaces/interfaceAppObjects.py
index ed8fdca1..ccd471f1 100644
--- a/modules/interfaces/interfaceAppObjects.py
+++ b/modules/interfaces/interfaceAppObjects.py
@@ -1141,7 +1141,7 @@ class AppObjects:
def neutralizeText(self, text: str, file_id: Optional[str] = None) -> Dict[str, Any]:
"""Neutralize text content and store attribute mappings"""
try:
- from modules.neutralizer.neutralizer import DataAnonymizer
+ from modules.services.serviceNeutralization.neutralizer import DataAnonymizer
# Get neutralization configuration to extract namesToParse
config = self.getNeutralizationConfig()
diff --git a/modules/interfaces/interfaceChatModel.py b/modules/interfaces/interfaceChatModel.py
index ed71963a..9ead0fb2 100644
--- a/modules/interfaces/interfaceChatModel.py
+++ b/modules/interfaces/interfaceChatModel.py
@@ -80,6 +80,70 @@ register_model_labels(
}
)
+# ===== Minimal ReAct-style Workflow Models =====
+
+class ActionSelection(BaseModel, ModelMixin):
+ """Model for selecting exactly one action in a step"""
+ method: str = Field(description="Method to execute (e.g., web, document, ai)")
+ name: str = Field(description="Action name within the method (e.g., search, extract)")
+
+register_model_labels(
+ "ActionSelection",
+ {"en": "Action Selection", "fr": "Sélection d'action"},
+ {
+ "method": {"en": "Method", "fr": "Méthode"},
+ "name": {"en": "Action Name", "fr": "Nom de l'action"}
+ }
+)
+
+class ActionParameters(BaseModel, ModelMixin):
+ """Model for specifying only the parameters for the selected action"""
+ parameters: Dict[str, Any] = Field(default_factory=dict, description="Parameters to execute the selected action")
+
+register_model_labels(
+ "ActionParameters",
+ {"en": "Action Parameters", "fr": "Paramètres d'action"},
+ {
+ "parameters": {"en": "Parameters", "fr": "Paramètres"}
+ }
+)
+
+class ObservationPreview(BaseModel, ModelMixin):
+ """Compact preview item for observations"""
+ name: str = Field(description="Document name or URL label")
+ mime: str = Field(description="MIME type or kind")
+ snippet: str = Field(description="Short snippet or summary")
+
+register_model_labels(
+ "ObservationPreview",
+ {"en": "Observation Preview", "fr": "Aperçu d'observation"},
+ {
+ "name": {"en": "Name", "fr": "Nom"},
+ "mime": {"en": "MIME", "fr": "MIME"},
+ "snippet": {"en": "Snippet", "fr": "Extrait"}
+ }
+)
+
+class Observation(BaseModel, ModelMixin):
+ """Compact observation returned to the model after each action"""
+ success: bool = Field(description="Action execution success flag")
+ resultLabel: str = Field(description="Deterministic label for produced documents")
+ documentsCount: int = Field(description="Number of produced documents")
+ previews: List[ObservationPreview] = Field(default_factory=list, description="Compact previews of outputs")
+ notes: List[str] = Field(default_factory=list, description="Short notes or key facts")
+
+register_model_labels(
+ "Observation",
+ {"en": "Observation", "fr": "Observation"},
+ {
+ "success": {"en": "Success", "fr": "Succès"},
+ "resultLabel": {"en": "Result Label", "fr": "Étiquette du résultat"},
+ "documentsCount": {"en": "Documents Count", "fr": "Nombre de documents"},
+ "previews": {"en": "Previews", "fr": "Aperçus"},
+ "notes": {"en": "Notes", "fr": "Notes"}
+ }
+)
+
# ===== Base Enums and Simple Models =====
class TaskStatus(str, Enum):
@@ -630,6 +694,25 @@ class ChatWorkflow(BaseModel, ModelMixin):
frontend_readonly=True,
frontend_required=False
)
+ # Workflow mode selection (e.g., Actionplan, React)
+ workflowMode: str = Field(
+ default="Actionplan",
+ description="Workflow mode selector",
+ frontend_type="select",
+ frontend_readonly=False,
+ frontend_required=False,
+ frontend_options=[
+ {"value": "Actionplan", "label": {"en": "Action Plan", "fr": "Plan d'actions"}},
+ {"value": "React", "label": {"en": "React", "fr": "Réactif"}}
+ ]
+ )
+ maxSteps: int = Field(
+ default=5,
+ description="Maximum number of iterations in react mode",
+ frontend_type="integer",
+ frontend_readonly=False,
+ frontend_required=False
+ )
# Register labels for ChatWorkflow
register_model_labels(
@@ -650,11 +733,13 @@ register_model_labels(
"logs": {"en": "Logs", "fr": "Journaux"},
"messages": {"en": "Messages", "fr": "Messages"},
"stats": {"en": "Statistics", "fr": "Statistiques"},
- "tasks": {"en": "Tasks", "fr": "Tâches"}
+ "tasks": {"en": "Tasks", "fr": "Tâches"},
+ "workflowMode": {"en": "Workflow Mode", "fr": "Mode de workflow"},
+ "maxSteps": {"en": "Max Steps", "fr": "Étapes max"}
}
)
-# ====== WORKFLOW SUPPORT MODELS (for managerChat.py compatibility) ======
+# ====== WORKFLOW SUPPORT MODELS ======
class TaskStep(BaseModel, ModelMixin):
id: str
@@ -763,6 +848,9 @@ class TaskContext(BaseModel, ModelMixin):
# Criteria progress tracking for retries
criteria_progress: Optional[dict] = None
+ # Iterative loop controls (moved to ChatWorkflow.workflowMode and ChatWorkflow.maxSteps)
+ # reactMode and maxSteps are now controlled at the workflow level
+
def getDocumentReferences(self) -> List[str]:
"""Get all available document references from previous handover"""
docs = []
diff --git a/modules/interfaces/interfaceChatObjects.py b/modules/interfaces/interfaceChatObjects.py
index 7b6806da..1b0a2af5 100644
--- a/modules/interfaces/interfaceChatObjects.py
+++ b/modules/interfaces/interfaceChatObjects.py
@@ -748,10 +748,9 @@ class ChatObjects:
except Exception as e:
logger.error(f"Error removing file {fileId} from message {messageId}: {str(e)}")
return False
-
# Document methods
-
+
def getDocuments(self, messageId: str) -> List[ChatDocument]:
"""Returns documents for a message from normalized table."""
try:
@@ -910,7 +909,7 @@ class ChatObjects:
msg_timestamp = msg.get("publishedAt", get_utc_timestamp())
if afterTimestamp is not None and msg_timestamp <= afterTimestamp:
continue
-
+
# Load documents for each message
documents = self.getDocuments(msg["id"])
@@ -952,7 +951,7 @@ class ChatObjects:
log_timestamp = log.get("timestamp", get_utc_timestamp())
if afterTimestamp is not None and log_timestamp <= afterTimestamp:
continue
-
+
chat_log = ChatLog(**log)
items.append({
"type": "log",
@@ -967,7 +966,7 @@ class ChatObjects:
stat_timestamp = stat.get("_createdAt", get_utc_timestamp())
if afterTimestamp is not None and stat_timestamp <= afterTimestamp:
continue
-
+
chat_stat = ChatStat(**stat)
items.append({
"type": "stat",
@@ -980,234 +979,6 @@ class ChatObjects:
return {"items": items}
- def updateWorkflowStats(self, workflowId: str, bytesSent: int = 0, bytesReceived: int = 0) -> bool:
- """Updates workflow statistics during execution with incremental values."""
- try:
- # Get current workflow
- workflow = self.getWorkflow(workflowId)
- if not workflow:
- logger.error(f"Workflow {workflowId} not found for stats update")
- return False
-
- if not self._canModify(ChatWorkflow, workflowId):
- logger.error(f"No permission to update workflow {workflowId} stats")
- return False
-
- # Get current stats from normalized table
- currentStats = self.getWorkflowStats(workflowId)
- if currentStats:
- current_bytes_sent = currentStats.bytesSent or 0
- current_bytes_received = currentStats.bytesReceived or 0
- current_processing_time = currentStats.processingTime or 0
- else:
- current_bytes_sent = 0
- current_bytes_received = 0
- current_processing_time = 0
-
- # Calculate processing time as duration since workflow start
- if workflow and workflow.startedAt:
- try:
- start_time = int(float(workflow.startedAt))
- current_time = int(get_utc_timestamp())
- processing_time = current_time - start_time
-
- # Ensure processing time is reasonable
- if processing_time < 0:
- processing_time = 0
- elif processing_time > 86400 * 365: # More than 1 year
- processing_time = 0
- except Exception as e:
- logger.warning(f"Error calculating processing time: {str(e)}")
- processing_time = current_processing_time
- else:
- processing_time = current_processing_time
-
- # Update stats with incremental values
- new_bytes_sent = current_bytes_sent + bytesSent
- new_bytes_received = current_bytes_received + bytesReceived
- new_token_count = new_bytes_sent + new_bytes_received
-
- # Create or update stats record in normalized table
- stats_record = {
- "workflowId": workflowId,
- "processingTime": processing_time,
- "tokenCount": new_token_count,
- "bytesSent": new_bytes_sent,
- "bytesReceived": new_bytes_received,
- "successRate": None,
- "errorCount": None
- }
-
- # Create new stats record
- self.db.recordCreate(ChatStat, stats_record)
-
-
- return True
-
- except Exception as e:
- logger.error(f"Error updating workflow stats: {str(e)}")
- return False
-
-
- # Workflow Actions
-
- async def workflowStart(self, currentUser: User, userInput: UserInputRequest, workflowId: Optional[str] = None) -> ChatWorkflow:
- """
- Starts a new workflow or continues an existing one.
-
- Args:
- userInput: The user input request containing workflow initialization data
- workflowId: Optional ID of an existing workflow to continue
-
- Returns:
- ChatWorkflow object representing the started/continued workflow
- """
- try:
- # Get current timestamp
- currentTime = get_utc_timestamp()
-
- if workflowId:
- # Continue existing workflow - load complete state including messages
- workflow = self.getWorkflow(workflowId)
- if not workflow:
- raise ValueError(f"Workflow {workflowId} not found")
-
- # Check if workflow is currently running and stop it first
- if workflow.status == "running":
- logger.info(f"Stopping running workflow {workflowId} before processing new prompt")
-
- # Stop the running workflow
- workflow.status = "stopped"
- workflow.lastActivity = currentTime
- self.updateWorkflow(workflowId, {
- "status": "stopped",
- "lastActivity": currentTime
- })
-
- # Add log entry for workflow stop
- self.createLog({
- "workflowId": workflowId,
- "message": "Workflow stopped for new prompt",
- "type": "info",
- "status": "stopped",
- "progress": 100
- })
-
- # Wait a moment for any running processes to detect the stop
- await asyncio.sleep(0.1)
-
- # Update workflow - increment round for existing workflows
- newRound = workflow.currentRound + 1
- self.updateWorkflow(workflowId, {
- "status": "running", # Set status back to running for resumed workflows
- "lastActivity": currentTime,
- "currentRound": newRound
- })
-
- # Reload workflow object to get updated currentRound from database
- workflow = self.getWorkflow(workflowId)
- if not workflow:
- raise ValueError(f"Failed to reload workflow {workflowId} after update")
-
- # Add log entry for workflow resumption
- self.createLog({
- "workflowId": workflowId,
- "message": f"Workflow resumed (round {workflow.currentRound})",
- "type": "info",
- "status": "running",
- "progress": 0
- })
-
- else:
- # Create new workflow
- workflowData = {
- "name": "New Workflow", # Default name since UserInputRequest doesn't have a name field
- "status": "running",
- "startedAt": currentTime,
- "lastActivity": currentTime,
- "currentRound": 0, # Default value, will be set to 1 in workflowStart()
- "currentTask": 0,
- "currentAction": 0,
- "totalTasks": 0,
- "totalActions": 0,
- "mandateId": self.mandateId,
- "messageIds": [],
- "stats": {
- "processingTime": None,
- "tokenCount": None,
- "bytesSent": None,
- "bytesReceived": None,
- "successRate": None,
- "errorCount": None
- }
- }
-
- # Create workflow
- workflow = self.createWorkflow(workflowData)
-
- # Set currentRound to 1 for new workflows
- workflow.currentRound = 1
- self.updateWorkflow(workflow.id, {"currentRound": 1})
-
- # Initialize stats for the new workflow
- self.updateWorkflowStats(workflow.id, bytesSent=0, bytesReceived=0)
-
- # Remove the 'Workflow started' log entry
-
- # Start workflow processing
- from modules.features.featureChatPlayground import WorkflowManager
- workflowManager = WorkflowManager(self, currentUser)
-
- # Start the workflow processing asynchronously
- # The workflow will be updated with progress data during execution
- asyncio.create_task(workflowManager.workflowProcess(userInput, workflow))
-
- return workflow
-
- except Exception as e:
- logger.error(f"Error starting workflow: {str(e)}")
- raise
-
- async def workflowStop(self, workflowId: str) -> ChatWorkflow:
- """
- Stops a running workflow (State 8: Workflow Stopped).
-
- Args:
- workflowId: ID of the workflow to stop
-
- Returns:
- Updated ChatWorkflow object
- """
- try:
- # Load workflow state
- workflow = self.getWorkflow(workflowId)
- if not workflow:
- raise ValueError(f"Workflow {workflowId} not found")
-
- # Update workflow status
- workflow.status = "stopped"
- workflow.lastActivity = get_utc_timestamp()
-
- # Update in database
- self.updateWorkflow(workflowId, {
- "status": "stopped",
- "lastActivity": workflow.lastActivity
- })
-
- # Add log entry
- self.createLog({
- "workflowId": workflowId,
- "message": "Workflow stopped",
- "type": "warning",
- "status": "stopped",
- "progress": 100
- })
-
- return workflow
-
- except Exception as e:
- logger.error(f"Error stopping workflow: {str(e)}")
- raise
def getInterface(currentUser: Optional[User] = None) -> 'ChatObjects':
"""
diff --git a/modules/routes/routeAdmin.py b/modules/routes/routeAdmin.py
index 60663598..4ddfcf84 100644
--- a/modules/routes/routeAdmin.py
+++ b/modules/routes/routeAdmin.py
@@ -12,8 +12,6 @@ from modules.shared.configuration import APP_CONFIG
from modules.security.auth import limiter, getCurrentUser
from modules.interfaces.interfaceAppModel import User
from modules.interfaces.interfaceAppObjects import getRootInterface
-from modules.interfaces.interfaceChatObjects import getInterface as getChatInterface
-from modules.interfaces.interfaceComponentObjects import getInterface as getComponentInterface
# Static folder setup - using absolute path from app root
baseDir = FilePath(__file__).parent.parent.parent # Go up to gateway root
@@ -31,43 +29,6 @@ router = APIRouter(
# Mount static files
router.mount("/static", StaticFiles(directory=str(staticFolder), html=True), name="static")
-def get_interface_for_database(database_name: str, currentUser: User):
- """
- Get the appropriate interface based on database name.
-
- Args:
- database_name: Name of the database
- currentUser: Current user for interface initialization
-
- Returns:
- Interface object for the specified database
-
- Raises:
- HTTPException: If database name is unknown or interface cannot be created
- """
- # Get database names from configuration
- appDbName = APP_CONFIG.get("DB_APP_DATABASE")
- chatDbName = APP_CONFIG.get("DB_CHAT_DATABASE")
- managementDbName = APP_CONFIG.get("DB_MANAGEMENT_DATABASE")
-
- if not appDbName:
- raise HTTPException(status_code=500, detail="DB_APP_DATABASE configuration is required")
-
- # Map database names to their corresponding interfaces
- if database_name == appDbName:
- return getRootInterface()
- elif chatDbName and database_name == chatDbName:
- return getChatInterface(currentUser)
- elif managementDbName and database_name == managementDbName:
- return getComponentInterface(currentUser)
- else:
- available_dbs = [appDbName]
- if chatDbName:
- available_dbs.append(chatDbName)
- if managementDbName:
- available_dbs.append(managementDbName)
- raise HTTPException(status_code=400, detail=f"Unknown database. Available: {', '.join(available_dbs)}")
-
@router.get("/")
@limiter.limit("30/minute")
async def root(request: Request) -> Dict[str, str]:
@@ -117,183 +78,3 @@ async def options_route(request: Request, fullPath: str) -> Response:
async def favicon(request: Request) -> FileResponse:
return FileResponse(str(staticFolder / "favicon.ico"), media_type="image/x-icon")
-# ----------------------
-# Log Management
-# ----------------------
-
-@router.get("/api/logs/app")
-@limiter.limit("10/minute")
-async def download_app_log(request: Request, currentUser: User = Depends(getCurrentUser)) -> FileResponse:
- """Download the current day's application log file"""
- # Check if user has admin privileges
- if not hasattr(currentUser, 'privilege') or currentUser.privilege not in ('admin', 'sysadmin'):
- raise HTTPException(status_code=403, detail="Admin privileges required")
-
- # Get log directory from config
- logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR")
- if not logDir:
- raise HTTPException(status_code=500, detail="APP_LOGGING_LOG_DIR configuration is required")
-
- if not os.path.isabs(logDir):
- # If relative path, make it relative to the gateway directory
- gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- logDir = os.path.join(gatewayDir, logDir)
-
- # Get current date for log file
- today = datetime.now().strftime("%Y%m%d")
- logFile = os.path.join(logDir, f"log_app_{today}.log")
-
- if not os.path.exists(logFile):
- raise HTTPException(status_code=404, detail=f"Application log file for today not found: {logFile}")
-
- return FileResponse(
- path=logFile,
- filename=f"log_app_{today}.log",
- media_type="text/plain"
- )
-
-@router.get("/api/logs/audit")
-@limiter.limit("10/minute")
-async def download_audit_log(request: Request, currentUser: User = Depends(getCurrentUser)) -> FileResponse:
- """Download the current day's audit log file"""
- # Check if user has admin privileges
- if not hasattr(currentUser, 'privilege') or currentUser.privilege not in ('admin', 'sysadmin'):
- raise HTTPException(status_code=403, detail="Admin privileges required")
-
- # Get log directory from config
- logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR")
- if not logDir:
- raise HTTPException(status_code=500, detail="APP_LOGGING_LOG_DIR configuration is required")
-
- if not os.path.isabs(logDir):
- # If relative path, make it relative to the gateway directory
- gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- logDir = os.path.join(gatewayDir, logDir)
-
- # Get current date for log file
- today = datetime.now().strftime("%Y%m%d")
- logFile = os.path.join(logDir, f"log_audit_{today}.log")
-
- if not os.path.exists(logFile):
- raise HTTPException(status_code=404, detail=f"Audit log file for today not found: {logFile}")
-
- return FileResponse(
- path=logFile,
- filename=f"log_audit_{today}.log",
- media_type="text/plain"
- )
-
-# ----------------------
-# Database Management
-# ----------------------
-
-@router.get("/api/databases")
-@limiter.limit("10/minute")
-async def list_databases(request: Request, currentUser: User = Depends(getCurrentUser)) -> Dict[str, Any]:
- """List available databases"""
- # Check if user has admin privileges
- if not hasattr(currentUser, 'privilege') or currentUser.privilege not in ('admin', 'sysadmin'):
- raise HTTPException(status_code=403, detail="Admin privileges required")
-
- try:
- # Get configured database names from configuration
- databases = []
-
- # App database - required configuration
- appDb = APP_CONFIG.get("DB_APP_DATABASE")
- if not appDb:
- raise HTTPException(status_code=500, detail="DB_APP_DATABASE configuration is required")
- databases.append(appDb)
-
- # Chat database - optional configuration
- chatDb = APP_CONFIG.get("DB_CHAT_DATABASE")
- if chatDb and chatDb not in databases:
- databases.append(chatDb)
-
- # Management database - optional configuration
- managementDb = APP_CONFIG.get("DB_MANAGEMENT_DATABASE")
- if managementDb and managementDb not in databases:
- databases.append(managementDb)
-
- return {"databases": databases}
- except HTTPException:
- raise
- except Exception as e:
- logger.error(f"Error listing databases: {e}")
- raise HTTPException(status_code=500, detail="Failed to list databases")
-
-@router.get("/api/databases/{database_name}/tables")
-@limiter.limit("10/minute")
-async def list_tables(
- request: Request,
- database_name: str,
- currentUser: User = Depends(getCurrentUser)
-) -> Dict[str, Any]:
- """List tables in a specific database"""
- # Check if user has admin privileges
- if not hasattr(currentUser, 'privilege') or currentUser.privilege not in ('admin', 'sysadmin'):
- raise HTTPException(status_code=403, detail="Admin privileges required")
-
- try:
- # Get the appropriate interface based on database name
- interface = get_interface_for_database(database_name, currentUser)
-
- # Check if interface and database connection exist
- if not interface or not interface.db:
- raise HTTPException(status_code=500, detail="Database interface not available")
-
- # Get tables from database
- tables = interface.db.getTables()
-
- return {"database": database_name, "tables": tables}
- except HTTPException:
- raise
- except Exception as e:
- logger.error(f"Error listing tables for database {database_name}: {e}")
- raise HTTPException(status_code=500, detail=f"Failed to list tables for database {database_name}")
-
-@router.post("/api/databases/{database_name}/tables/drop")
-@limiter.limit("5/minute")
-async def drop_table(
- request: Request,
- database_name: str,
- currentUser: User = Depends(getCurrentUser),
- payload: Dict[str, Any] = Body(...)
-) -> Dict[str, Any]:
- """Drop a specific table from a database"""
- # Check if user has admin privileges
- if not hasattr(currentUser, 'privilege') or currentUser.privilege not in ('admin', 'sysadmin'):
- raise HTTPException(status_code=403, detail="Admin privileges required")
-
- table_name = payload.get("table")
- if not table_name:
- raise HTTPException(status_code=400, detail="Table name is required")
-
- try:
- # Get the appropriate interface based on database name
- interface = get_interface_for_database(database_name, currentUser)
-
- # Check if interface and database connection exist
- if not interface or not interface.db:
- raise HTTPException(status_code=500, detail="Database interface not available")
-
- # Check if table exists
- tables = interface.db.getTables()
- if table_name not in tables:
- raise HTTPException(status_code=404, detail=f"Table '{table_name}' not found in database '{database_name}'")
-
- # Drop the table
- with interface.db.connection.cursor() as cursor:
- cursor.execute(f'DROP TABLE IF EXISTS "{table_name}" CASCADE')
- interface.db.connection.commit()
-
- logger.warning(f"Admin drop_table executed by {currentUser.id}: dropped table '{table_name}' from database '{database_name}'")
- return {"message": f"Table '{table_name}' dropped successfully from database '{database_name}'"}
-
- except HTTPException:
- raise
- except Exception as e:
- logger.error(f"Error dropping table {table_name} from database {database_name}: {e}")
- if 'interface' in locals() and interface.db.connection:
- interface.db.connection.rollback()
- raise HTTPException(status_code=500, detail=f"Failed to drop table '{table_name}' from database '{database_name}'")
diff --git a/modules/routes/routeChatPlayground.py b/modules/routes/routeChatPlayground.py
new file mode 100644
index 00000000..24bc91a3
--- /dev/null
+++ b/modules/routes/routeChatPlayground.py
@@ -0,0 +1,132 @@
+"""
+Chat Playground routes for the backend API.
+Implements the endpoints for chat playground workflow management.
+"""
+
+import logging
+from typing import Optional, Dict, Any
+from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Request
+from datetime import datetime
+
+# Import auth modules
+from modules.security.auth import limiter, getCurrentUser
+
+# Import interfaces
+import modules.interfaces.interfaceChatObjects as interfaceChatObjects
+from modules.interfaces.interfaceChatObjects import getInterface
+
+# Import models
+from modules.interfaces.interfaceChatModel import (
+ ChatWorkflow,
+ UserInputRequest
+)
+from modules.interfaces.interfaceAppModel import User
+
+# Import workflow control functions
+from modules.features.chatPlayground.mainChatPlayground import chatStart, chatStop
+
+# Configure logger
+logger = logging.getLogger(__name__)
+
+# Create router for chat playground endpoints
+router = APIRouter(
+ prefix="/api/chat/playground",
+ tags=["Chat Playground"],
+ responses={404: {"description": "Not found"}}
+)
+
+def getServiceChat(currentUser: User):
+ return interfaceChatObjects.getInterface(currentUser)
+
+# Workflow start endpoint
+@router.post("/start", response_model=ChatWorkflow)
+@limiter.limit("120/minute")
+async def start_workflow(
+ request: Request,
+ workflowId: Optional[str] = Query(None, description="Optional ID of the workflow to continue"),
+ userInput: UserInputRequest = Body(...),
+ currentUser: User = Depends(getCurrentUser)
+) -> ChatWorkflow:
+ """
+ Starts a new workflow or continues an existing one.
+ Corresponds to State 1 in the state machine documentation.
+ """
+ try:
+ # Get service center
+ interfaceChat = getServiceChat(currentUser)
+
+ # Start or continue workflow using playground controller
+ workflow = await chatStart(interfaceChat, currentUser, userInput, workflowId)
+
+ return workflow
+
+ except Exception as e:
+ logger.error(f"Error in start_workflow: {str(e)}")
+ raise HTTPException(
+ status_code=500,
+ detail=str(e)
+ )
+
+# State 8: Workflow Stopped endpoint
+@router.post("/{workflowId}/stop", response_model=ChatWorkflow)
+@limiter.limit("120/minute")
+async def stop_workflow(
+ request: Request,
+ workflowId: str = Path(..., description="ID of the workflow to stop"),
+ currentUser: User = Depends(getCurrentUser)
+) -> ChatWorkflow:
+ """Stops a running workflow."""
+ try:
+ # Get service center
+ interfaceChat = getServiceChat(currentUser)
+
+ # Stop workflow using playground controller
+ workflow = await chatStop(interfaceChat, currentUser, workflowId)
+
+ return workflow
+
+ except Exception as e:
+ logger.error(f"Error in stop_workflow: {str(e)}")
+ raise HTTPException(
+ status_code=500,
+ detail=str(e)
+ )
+
+# Unified Chat Data Endpoint for Polling
+@router.get("/{workflowId}/chatData")
+@limiter.limit("120/minute")
+async def get_workflow_chat_data(
+ request: Request,
+ workflowId: str = Path(..., description="ID of the workflow"),
+ afterTimestamp: Optional[float] = Query(None, description="Unix timestamp to get data after"),
+ currentUser: User = Depends(getCurrentUser)
+) -> Dict[str, Any]:
+ """
+ Get unified chat data (messages, logs, stats) for a workflow with timestamp-based selective data transfer.
+ Returns all data types in chronological order based on _createdAt timestamp.
+ """
+ try:
+ # Get service center
+ interfaceChat = getServiceChat(currentUser)
+
+ # Verify workflow exists
+ workflow = interfaceChat.getWorkflow(workflowId)
+ if not workflow:
+ raise HTTPException(
+ status_code=404,
+ detail=f"Workflow with ID {workflowId} not found"
+ )
+
+ # Get unified chat data using the new method
+ chatData = interfaceChat.getUnifiedChatData(workflowId, afterTimestamp)
+
+ return chatData
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Error getting unified chat data: {str(e)}", exc_info=True)
+ raise HTTPException(
+ status_code=500,
+ detail=f"Error getting unified chat data: {str(e)}"
+ )
diff --git a/modules/routes/routeDataNeutralization.py b/modules/routes/routeDataNeutralization.py
index 697c6f1c..939c4422 100644
--- a/modules/routes/routeDataNeutralization.py
+++ b/modules/routes/routeDataNeutralization.py
@@ -7,7 +7,7 @@ from modules.security.auth import limiter, getCurrentUser
# Import interfaces
from modules.interfaces.interfaceAppModel import User, DataNeutraliserConfig, DataNeutralizerAttributes
-from modules.features.featureNeutralizePlayground import NeutralizationService
+from modules.features.neutralizePlayground.mainNeutralizePlayground import NeutralizationService
# Configure logger
logger = logging.getLogger(__name__)
diff --git a/modules/routes/routeWorkflows.py b/modules/routes/routeWorkflows.py
index fe70e347..7b9dd8f9 100644
--- a/modules/routes/routeWorkflows.py
+++ b/modules/routes/routeWorkflows.py
@@ -24,13 +24,13 @@ from modules.interfaces.interfaceChatModel import (
ChatMessage,
ChatLog,
ChatStat,
- ChatDocument,
- UserInputRequest
+ ChatDocument
)
from modules.shared.attributeUtils import getModelAttributeDefinitions, AttributeResponse
from modules.interfaces.interfaceAppModel import User
from modules.shared.timezoneUtils import get_utc_timestamp
+
# Configure logger
logger = logging.getLogger(__name__)
@@ -276,59 +276,6 @@ async def get_workflow_messages(
detail=f"Error getting workflow messages: {str(e)}"
)
-# State 1: Workflow Initialization endpoint
-@router.post("/start", response_model=ChatWorkflow)
-@limiter.limit("120/minute")
-async def start_workflow(
- request: Request,
- workflowId: Optional[str] = Query(None, description="Optional ID of the workflow to continue"),
- userInput: UserInputRequest = Body(...),
- currentUser: User = Depends(getCurrentUser)
-) -> ChatWorkflow:
- """
- Starts a new workflow or continues an existing one.
- Corresponds to State 1 in the state machine documentation.
- """
- try:
- # Get service center
- interfaceChat = getServiceChat(currentUser)
-
- # Start or continue workflow using ChatObjects
- workflow = await interfaceChat.workflowStart(currentUser, userInput, workflowId)
-
- return workflow
-
- except Exception as e:
- logger.error(f"Error in start_workflow: {str(e)}")
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=str(e)
- )
-
-# State 8: Workflow Stopped endpoint
-@router.post("/{workflowId}/stop", response_model=ChatWorkflow)
-@limiter.limit("120/minute")
-async def stop_workflow(
- request: Request,
- workflowId: str = Path(..., description="ID of the workflow to stop"),
- currentUser: User = Depends(getCurrentUser)
-) -> ChatWorkflow:
- """Stops a running workflow."""
- try:
- # Get service center
- interfaceChat = getServiceChat(currentUser)
-
- # Stop workflow using ChatObjects
- workflow = await interfaceChat.workflowStop(workflowId)
-
- return workflow
-
- except Exception as e:
- logger.error(f"Error in stop_workflow: {str(e)}")
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=str(e)
- )
# State 11: Workflow Reset/Deletion endpoint
@router.delete("/{workflowId}", response_model=Dict[str, Any])
@@ -383,45 +330,6 @@ async def delete_workflow(
)
-# Unified Chat Data Endpoint for Polling
-@router.get("/{workflowId}/chatData")
-@limiter.limit("120/minute")
-async def get_workflow_chat_data(
- request: Request,
- workflowId: str = Path(..., description="ID of the workflow"),
- afterTimestamp: Optional[float] = Query(None, description="Unix timestamp to get data after"),
- currentUser: User = Depends(getCurrentUser)
-) -> Dict[str, Any]:
- """
- Get unified chat data (messages, logs, stats) for a workflow with timestamp-based selective data transfer.
- Returns all data types in chronological order based on _createdAt timestamp.
- """
- try:
- # Get service center
- interfaceChat = getServiceChat(currentUser)
-
- # Verify workflow exists
- workflow = interfaceChat.getWorkflow(workflowId)
- if not workflow:
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"Workflow with ID {workflowId} not found"
- )
-
- # Get unified chat data using the new method
- chatData = interfaceChat.getUnifiedChatData(workflowId, afterTimestamp)
-
- return chatData
-
- except HTTPException:
- raise
- except Exception as e:
- logger.error(f"Error getting unified chat data: {str(e)}", exc_info=True)
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"Error getting unified chat data: {str(e)}"
- )
-
# Document Management Endpoints
@router.delete("/{workflowId}/messages/{messageId}", response_model=Dict[str, Any])
diff --git a/modules/chat/serviceCenter.py b/modules/services/serviceCenter.py
similarity index 99%
rename from modules/chat/serviceCenter.py
rename to modules/services/serviceCenter.py
index 55648ead..85b04ed4 100644
--- a/modules/chat/serviceCenter.py
+++ b/modules/services/serviceCenter.py
@@ -13,9 +13,9 @@ from modules.interfaces.interfaceChatObjects import getInterface as getChatObjec
from modules.interfaces.interfaceChatModel import ActionResult
from modules.interfaces.interfaceComponentObjects import getInterface as getComponentObjects
from modules.interfaces.interfaceAppObjects import getInterface as getAppObjects
-from modules.chat.documents.documentExtraction import DocumentExtraction
-from modules.chat.documents.documentUtility import getFileExtension, getMimeTypeFromExtension, detectContentTypeFromData
-from modules.methods.methodBase import MethodBase
+from modules.services.serviceDocument.documentExtraction import DocumentExtraction
+from modules.services.serviceDocument.documentUtility import getFileExtension, getMimeTypeFromExtension, detectContentTypeFromData
+from modules.workflows.methods.methodBase import MethodBase
from modules.shared.timezoneUtils import get_utc_timestamp
import uuid
@@ -57,7 +57,7 @@ class ServiceCenter:
if not isPkg and name.startswith('method'):
try:
# Import the module
- module = importlib.import_module(f'modules.methods.{name}')
+ module = importlib.import_module(f'modules.workflows.methods.{name}')
# Find all classes in the module that inherit from MethodBase
for itemName, item in inspect.getmembers(module):
diff --git a/modules/chat/documents/documentExtraction.py b/modules/services/serviceDocument/documentExtraction.py
similarity index 99%
rename from modules/chat/documents/documentExtraction.py
rename to modules/services/serviceDocument/documentExtraction.py
index b6165b9e..0a73e46a 100644
--- a/modules/chat/documents/documentExtraction.py
+++ b/modules/services/serviceDocument/documentExtraction.py
@@ -9,7 +9,7 @@ from pathlib import Path
import xml.etree.ElementTree as ET
from bs4 import BeautifulSoup
import uuid
-from modules.chat.documents.documentUtility import (
+from modules.services.serviceDocument.documentUtility import (
getFileExtension,
getMimeTypeFromExtension,
detectMimeTypeFromContent,
@@ -22,7 +22,7 @@ from modules.interfaces.interfaceChatModel import (
ContentItem,
ContentMetadata
)
-from modules.neutralizer.neutralizer import DataAnonymizer
+from modules.services.serviceNeutralization.neutralizer import DataAnonymizer
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
diff --git a/modules/chat/documents/documentGeneration.py b/modules/services/serviceDocument/documentGeneration.py
similarity index 99%
rename from modules/chat/documents/documentGeneration.py
rename to modules/services/serviceDocument/documentGeneration.py
index 2d844ed3..16ab7b16 100644
--- a/modules/chat/documents/documentGeneration.py
+++ b/modules/services/serviceDocument/documentGeneration.py
@@ -3,7 +3,7 @@ from typing import Any, Dict, List, Optional
from datetime import datetime, UTC
import re
from modules.shared.timezoneUtils import get_utc_timestamp
-from modules.chat.documents.documentUtility import (
+from modules.services.serviceDocument.documentUtility import (
getFileExtension,
getMimeTypeFromExtension,
detectMimeTypeFromContent,
diff --git a/modules/chat/documents/documentUtility.py b/modules/services/serviceDocument/documentUtility.py
similarity index 100%
rename from modules/chat/documents/documentUtility.py
rename to modules/services/serviceDocument/documentUtility.py
diff --git a/modules/neutralizer/neutralizer.py b/modules/services/serviceNeutralization/neutralizer.py
similarity index 87%
rename from modules/neutralizer/neutralizer.py
rename to modules/services/serviceNeutralization/neutralizer.py
index f8677465..e284ae00 100644
--- a/modules/neutralizer/neutralizer.py
+++ b/modules/services/serviceNeutralization/neutralizer.py
@@ -8,12 +8,12 @@ import logging
from typing import Dict, List, Any
# Import all necessary classes and functions
-from modules.neutralizer.subProcessCommon import ProcessResult, CommonUtils
-from modules.neutralizer.subProcessText import TextProcessor, PlainText
-from modules.neutralizer.subProcessList import ListProcessor, TableData
-from modules.neutralizer.subProcessBinary import BinaryProcessor, BinaryData
-from modules.neutralizer.subParseString import StringParser
-from modules.neutralizer.subPatterns import Pattern, HeaderPatterns, DataPatterns, TextTablePatterns
+from modules.services.serviceNeutralization.subProcessCommon import ProcessResult, CommonUtils
+from modules.services.serviceNeutralization.subProcessText import TextProcessor, PlainText
+from modules.services.serviceNeutralization.subProcessList import ListProcessor, TableData
+from modules.services.serviceNeutralization.subProcessBinary import BinaryProcessor, BinaryData
+from modules.services.serviceNeutralization.subParseString import StringParser
+from modules.services.serviceNeutralization.subPatterns import Pattern, HeaderPatterns, DataPatterns, TextTablePatterns
# Configure logging
logger = logging.getLogger(__name__)
diff --git a/modules/neutralizer/readme.md b/modules/services/serviceNeutralization/readme.md
similarity index 100%
rename from modules/neutralizer/readme.md
rename to modules/services/serviceNeutralization/readme.md
diff --git a/modules/neutralizer/subParseString.py b/modules/services/serviceNeutralization/subParseString.py
similarity index 98%
rename from modules/neutralizer/subParseString.py
rename to modules/services/serviceNeutralization/subParseString.py
index a2b39333..fd9f54cc 100644
--- a/modules/neutralizer/subParseString.py
+++ b/modules/services/serviceNeutralization/subParseString.py
@@ -6,7 +6,7 @@ Handles pattern matching and replacement for emails, phones, addresses, IDs and
import re
import uuid
from typing import Dict, List, Tuple, Any
-from modules.neutralizer.subPatterns import DataPatterns, find_patterns_in_text
+from modules.services.serviceNeutralization.subPatterns import DataPatterns, find_patterns_in_text
class StringParser:
"""Handles string parsing and replacement operations"""
diff --git a/modules/neutralizer/subPatterns.py b/modules/services/serviceNeutralization/subPatterns.py
similarity index 100%
rename from modules/neutralizer/subPatterns.py
rename to modules/services/serviceNeutralization/subPatterns.py
diff --git a/modules/neutralizer/subProcessBinary.py b/modules/services/serviceNeutralization/subProcessBinary.py
similarity index 100%
rename from modules/neutralizer/subProcessBinary.py
rename to modules/services/serviceNeutralization/subProcessBinary.py
diff --git a/modules/neutralizer/subProcessCommon.py b/modules/services/serviceNeutralization/subProcessCommon.py
similarity index 100%
rename from modules/neutralizer/subProcessCommon.py
rename to modules/services/serviceNeutralization/subProcessCommon.py
diff --git a/modules/neutralizer/subProcessList.py b/modules/services/serviceNeutralization/subProcessList.py
similarity index 96%
rename from modules/neutralizer/subProcessList.py
rename to modules/services/serviceNeutralization/subProcessList.py
index 58981333..e4ac91f7 100644
--- a/modules/neutralizer/subProcessList.py
+++ b/modules/services/serviceNeutralization/subProcessList.py
@@ -9,8 +9,8 @@ import xml.etree.ElementTree as ET
from typing import Dict, List, Any, Union
from dataclasses import dataclass
from io import StringIO
-from modules.neutralizer.subParseString import StringParser
-from modules.neutralizer.subPatterns import get_pattern_for_header, HeaderPatterns
+from modules.services.serviceNeutralization.subParseString import StringParser
+from modules.services.serviceNeutralization.subPatterns import get_pattern_for_header, HeaderPatterns
@dataclass
class TableData:
@@ -156,7 +156,7 @@ class ListProcessor:
processed_attrs[attr_name] = self.string_parser.mapping[attr_value]
else:
# Check if attribute value matches any data patterns
- from modules.neutralizer.subPatterns import find_patterns_in_text, DataPatterns
+ from modules.services.serviceNeutralization.subPatterns import find_patterns_in_text, DataPatterns
matches = find_patterns_in_text(attr_value, DataPatterns.patterns)
if matches:
pattern_name = matches[0][0]
@@ -191,7 +191,7 @@ class ListProcessor:
# Skip if already a placeholder
if not self.string_parser.is_placeholder(text):
# Check if text matches any patterns
- from modules.neutralizer.subPatterns import find_patterns_in_text, DataPatterns
+ from modules.services.serviceNeutralization.subPatterns import find_patterns_in_text, DataPatterns
pattern_matches = find_patterns_in_text(text, DataPatterns.patterns)
if pattern_matches:
diff --git a/modules/neutralizer/subProcessText.py b/modules/services/serviceNeutralization/subProcessText.py
similarity index 97%
rename from modules/neutralizer/subProcessText.py
rename to modules/services/serviceNeutralization/subProcessText.py
index c9ad872f..20dfe291 100644
--- a/modules/neutralizer/subProcessText.py
+++ b/modules/services/serviceNeutralization/subProcessText.py
@@ -5,7 +5,7 @@ Handles plain text processing without header information
from typing import Dict, List, Any
from dataclasses import dataclass
-from modules.neutralizer.subParseString import StringParser
+from modules.services.serviceNeutralization.subParseString import StringParser
@dataclass
class PlainText:
diff --git a/modules/chat/handling/executionState.py b/modules/workflows/_transfer/executionState.py
similarity index 64%
rename from modules/chat/handling/executionState.py
rename to modules/workflows/_transfer/executionState.py
index 1f806745..1d9b1963 100644
--- a/modules/chat/handling/executionState.py
+++ b/modules/workflows/_transfer/executionState.py
@@ -1,5 +1,5 @@
# executionState.py
-# Contains all execution state management logic extracted from managerChat.py
+# Contains all execution state management logic
import logging
from typing import List
@@ -18,6 +18,9 @@ class TaskExecutionState:
self.current_action_index = 0
self.retry_count = 0
self.max_retries = 3
+ # Iterative loop (react mode)
+ self.current_step = 0
+ self.max_steps = 5
def addSuccessfulAction(self, action_result: ActionResult):
"""Add a successful action to the state"""
@@ -52,4 +55,26 @@ class TaskExecutionState:
patterns.append("format_issues")
elif "permission" in error or "access denied" in error:
patterns.append("permission_issues")
- return list(set(patterns))
\ No newline at end of file
+ return list(set(patterns))
+
+def should_continue(observation, review=None, current_step: int = 0, max_steps: int = 5) -> bool:
+ """Helper to decide if the iterative loop should continue
+ - Stop if review indicates 'stop' or success criteria are met
+ - Stop on failure with no retry path
+ - Stop if max steps reached
+ """
+ try:
+ if current_step >= max_steps:
+ return False
+ if review and isinstance(review, dict):
+ decision = review.get('decision') or review.get('status')
+ if decision in ('stop', 'success'):
+ return False
+ # If observation exists but indicates hard failure with no documents repeatedly
+ if observation and isinstance(observation, dict):
+ if observation.get('success') is False and observation.get('documentsCount', 0) == 0:
+ # allow next step once; the caller can cap by max_steps
+ return True
+ return True
+ except Exception:
+ return False
\ No newline at end of file
diff --git a/modules/chat/handling/handlingTasks.py b/modules/workflows/_transfer/handlingTasks.py
similarity index 89%
rename from modules/chat/handling/handlingTasks.py
rename to modules/workflows/_transfer/handlingTasks.py
index 98cca8bb..5346e2d1 100644
--- a/modules/chat/handling/handlingTasks.py
+++ b/modules/workflows/_transfer/handlingTasks.py
@@ -12,13 +12,16 @@ from modules.interfaces.interfaceChatModel import (
)
from modules.interfaces.interfaceAppObjects import getInterface as getAppObjects
from modules.shared.timezoneUtils import get_utc_timestamp
-from modules.chat.handling.executionState import TaskExecutionState
-from modules.chat.handling.promptFactory import (
+from modules.workflows._transfer.executionState import TaskExecutionState
+from modules.workflows._transfer.promptFactory import (
createTaskPlanningPrompt,
createActionDefinitionPrompt,
- createResultReviewPrompt
+ createResultReviewPrompt,
+ createActionSelectionPrompt,
+ createActionParameterPrompt,
+ createRefinementPrompt
)
-from modules.chat.documents.documentGeneration import DocumentGenerator
+from modules.services.serviceDocument.documentGeneration import DocumentGenerator
import uuid
logger = logging.getLogger(__name__)
@@ -32,7 +35,7 @@ class HandlingTasks:
self.chatInterface = chatInterface
self.currentUser = currentUser
self.workflow = workflow
- from modules.chat.serviceCenter import ServiceCenter
+ from modules.services.serviceCenter import ServiceCenter
self.service = ServiceCenter(currentUser, workflow)
self.documentGenerator = DocumentGenerator(self.service)
@@ -430,8 +433,95 @@ class HandlingTasks:
logger.error(f"Error in generateTaskActions: {str(e)}")
return []
+ # ===== React-mode iterative functions =====
+
+ async def plan_select(self, context: TaskContext) -> Dict[str, Any]:
+ """Plan: select exactly one action. Returns {"action": {method, name}}"""
+ prompt = createActionSelectionPrompt(context, self.service)
+ self.service.writeTraceLog("React Plan Selection Prompt", prompt)
+ response = await self.service.callAiTextAdvanced(prompt)
+ self.service.writeTraceLog("React Plan Selection Response", response)
+ json_start = response.find('{') if response else -1
+ json_end = response.rfind('}') + 1 if response else 0
+ if json_start == -1 or json_end == 0:
+ raise ValueError("No JSON in selection response")
+ selection = json.loads(response[json_start:json_end])
+ if 'action' not in selection or not isinstance(selection['action'], dict):
+ raise ValueError("Selection missing 'action'")
+ return selection
+
+ async def act_execute(self, context: TaskContext, selection: Dict[str, Any], task_step: TaskStep, workflow, step_index: int) -> ActionResult:
+ """Act: request minimal parameters then execute selected action."""
+ action = selection.get('action', {})
+ params_prompt = createActionParameterPrompt(context, action, self.service)
+ self.service.writeTraceLog("React Parameters Prompt", params_prompt)
+ params_resp = await self.service.callAiTextAdvanced(params_prompt)
+ self.service.writeTraceLog("React Parameters Response", params_resp)
+ js = params_resp[params_resp.find('{'):params_resp.rfind('}')+1] if params_resp else '{}'
+ try:
+ param_obj = json.loads(js)
+ except Exception:
+ param_obj = {"parameters": {}}
+ parameters = param_obj.get('parameters', {}) if isinstance(param_obj, dict) else {}
+
+ # Apply minimal defaults in-code (language)
+ if 'language' not in parameters and hasattr(self.service, 'user') and getattr(self.service.user, 'language', None):
+ parameters['language'] = self.service.user.language
+
+ # Build a synthetic TaskAction for execution routing and labels
+ current_round = getattr(self.workflow, 'currentRound', 0)
+ current_task = getattr(self.workflow, 'currentTask', 0)
+ result_label = f"round{current_round}_task{current_task}_action{step_index}_results"
+ task_action = self.createTaskAction({
+ "execMethod": action.get('method', ''),
+ "execAction": action.get('name', ''),
+ "execParameters": parameters,
+ "execResultLabel": result_label,
+ "status": TaskStatus.PENDING
+ })
+ # Execute using existing single action flow
+ return await self.executeSingleAction(task_action, workflow, task_step, current_task, step_index, 1)
+
+ def observe_build(self, action_result: ActionResult) -> Dict[str, Any]:
+ """Observe: build compact observation object from ActionResult"""
+ previews = []
+ if action_result and action_result.documents:
+ for doc in action_result.documents[:5]:
+ name = getattr(doc, 'documentName', '')
+ mime = getattr(doc, 'mimeType', '')
+ snippet = ''
+ data = getattr(doc, 'documentData', None)
+ if isinstance(data, str):
+ snippet = data[:200]
+ elif isinstance(data, dict):
+ snippet = str(data)[:200]
+ previews.append({"name": name, "mime": mime, "snippet": snippet})
+ observation = {
+ "success": bool(action_result.success),
+ "resultLabel": action_result.resultLabel or "",
+ "documentsCount": len(action_result.documents) if action_result.documents else 0,
+ "previews": previews,
+ "notes": []
+ }
+ return observation
+
+ async def refine_decide(self, context: TaskContext, observation: Dict[str, Any]) -> Dict[str, Any]:
+ """Refine: decide continue or stop, with reason"""
+ prompt = createRefinementPrompt(context, observation)
+ self.service.writeTraceLog("React Refinement Prompt", prompt)
+ resp = await self.service.callAiTextAdvanced(prompt)
+ self.service.writeTraceLog("React Refinement Response", resp)
+ js = resp[resp.find('{'):resp.rfind('}')+1] if resp else '{}'
+ try:
+ decision = json.loads(js)
+ except Exception:
+ decision = {"decision": "continue", "reason": "default"}
+ return decision
+
async def executeTask(self, task_step, workflow, context, task_index=None, total_tasks=None) -> TaskResult:
- """Execute all actions for a task step, with state management and retries."""
+ """Execute all actions for a task step, with state management and retries.
+ When workflow.workflowMode is 'React', run compact plan–act–observe–refine loop.
+ """
logger.info(f"=== STARTING TASK {task_index or '?'}: {task_step.objective} ===")
# PHASE 4: Update workflow object before executing task
@@ -476,6 +566,70 @@ class HandlingTasks:
logger.info(f"Task start message created for task {task_index}")
state = TaskExecutionState(task_step)
+ # React mode path - check workflow mode instead of context
+ if isinstance(context, TaskContext) and hasattr(context, 'workflow') and context.workflow and getattr(context.workflow, 'workflowMode', 'Actionplan') == 'React':
+ state.max_steps = max(1, int(getattr(context.workflow, 'maxSteps', 5)))
+ step = 1
+ last_review_dict = None
+ while step <= state.max_steps:
+ self._checkWorkflowStopped()
+ # Update workflow[currentAction] for UI
+ self.updateWorkflowBeforeExecutingAction(step)
+ self.service.setWorkflowContext(action_number=step)
+ try:
+ t0 = time.time()
+ selection = await self.plan_select(context)
+ result = await self.act_execute(context, selection, task_step, workflow, step)
+ observation = self.observe_build(result)
+ # Attach deterministic label for clarity
+ observation['resultLabel'] = result.resultLabel
+ decision = await self.refine_decide(context, observation)
+ # Telemetry: simple duration per step
+ duration = time.time() - t0
+ self.chatInterface.createLog({
+ "workflowId": workflow.id,
+ "message": f"react_step_duration_sec={duration:.3f}",
+ "type": "info"
+ })
+ last_review_dict = decision
+ # Simple messaging per iteration
+ msg = {
+ "workflowId": workflow.id,
+ "role": "assistant",
+ "message": f"🔁 Step {step}/{state.max_steps}: {selection.get('action',{}).get('method','')}.{selection.get('action',{}).get('name','')} → {'✅' if result.success else '❌'}",
+ "status": "step",
+ "sequenceNr": len(workflow.messages) + 1,
+ "publishedAt": get_utc_timestamp(),
+ "documentsLabel": observation.get('resultLabel'),
+ "documents": [],
+ "roundNumber": workflow.currentRound,
+ "taskNumber": task_index,
+ "actionNumber": step,
+ "actionProgress": "success" if result.success else "fail"
+ }
+ self.chatInterface.createMessage(msg)
+ except Exception as e:
+ logger.error(f"React step {step} error: {e}")
+ break
+
+ from modules.workflows._transfer.executionState import should_continue
+ if not should_continue(observation, last_review_dict, step, state.max_steps):
+ break
+ step += 1
+
+ # Summarize task result for react mode
+ status = TaskStatus.COMPLETED
+ success = True
+ feedback = last_review_dict.get('reason') if isinstance(last_review_dict, dict) else 'Completed'
+ if isinstance(last_review_dict, dict) and last_review_dict.get('decision') == 'stop':
+ success = True
+ return TaskResult(
+ taskId=task_step.id,
+ status=status,
+ success=success,
+ feedback=feedback,
+ error=None if success else feedback
+ )
retry_context = context
max_retries = state.max_retries
for attempt in range(max_retries):
@@ -1511,4 +1665,4 @@ class HandlingTasks:
logger.info("Workflow reset for new session - all values set to initial state and updated in database")
except Exception as e:
- logger.error(f"Error resetting workflow for new session: {str(e)}")
\ No newline at end of file
+ logger.error(f"Error resetting workflow for new session: {str(e)}")
diff --git a/modules/chat/handling/promptFactory.py b/modules/workflows/_transfer/promptFactory.py
similarity index 91%
rename from modules/chat/handling/promptFactory.py
rename to modules/workflows/_transfer/promptFactory.py
index c15979e2..3cf3f5b5 100644
--- a/modules/chat/handling/promptFactory.py
+++ b/modules/workflows/_transfer/promptFactory.py
@@ -1,16 +1,16 @@
# promptFactory.py
-# Contains all prompt creation functions extracted from managerChat.py
+# Contains all prompt creation functions
import json
import logging
from typing import Any, Dict, List
from modules.interfaces.interfaceChatModel import TaskContext, ReviewContext
-from modules.chat.documents.documentUtility import getFileExtension
+from modules.services.serviceDocument.documentUtility import getFileExtension
# Set up logger
logger = logging.getLogger(__name__)
-# Prompt creation helpers extracted from managerChat.py
+# Prompt creation helpers
def _getAvailableDocuments(workflow) -> str:
"""
@@ -831,3 +831,93 @@ USER LANGUAGE: {user_language} - All user messages must be generated in this lan
NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
return prompt
+
+# ===== New compact prompts for React-style workflow =====
+
+def _build_tiny_catalog(service) -> str:
+ """Return minimal tool catalog: method -> { action -> [paramNames] }"""
+ try:
+ method_signatures = service.getMethodsList()
+ except Exception:
+ method_signatures = []
+ catalog: Dict[str, Dict[str, List[str]]] = {}
+ for sig in method_signatures:
+ if '.' not in sig or '(' not in sig or ')' not in sig:
+ continue
+ method, rest = sig.split('.', 1)
+ action = rest.split('(')[0]
+ params_str = rest[rest.find('(')+1:rest.find(')')].strip()
+ param_names = []
+ if params_str:
+ for p in params_str.split(','):
+ name = p.strip().split(':')[0].split('=')[0].strip()
+ if name:
+ param_names.append(name)
+ catalog.setdefault(method, {})[action] = param_names
+ return json.dumps(catalog, separators=(',', ':'), ensure_ascii=False)
+
+def createActionSelectionPrompt(context: TaskContext, service) -> str:
+ """Prompt that returns exactly one action selection: {"action":{"method":"..","name":".."}}"""
+ user_language = service.user.language if service and service.user else 'en'
+ tiny_catalog = _build_tiny_catalog(service)
+ objective = context.task_step.objective if context and context.task_step else ''
+ available_docs = _getAvailableDocuments(context.workflow) if context and context.workflow else "No documents available"
+ return f"""Select exactly one action to advance the task.
+
+OBJECTIVE: {objective}
+AVAILABLE DOCUMENTS: {available_docs}
+USER LANGUAGE: {user_language}
+
+MINIMAL TOOL CATALOG (method -> action -> [parameterNames]):
+{tiny_catalog}
+
+BUSINESS RULES:
+- Pick exactly one action per step.
+- Derive choice from objective and success criteria.
+- Prefer user language.
+- Keep it minimal; avoid provider specifics.
+
+RESPONSE FORMAT (JSON only):
+{{"action":{{"method":"web","name":"search"}}}}
+"""
+
+def createActionParameterPrompt(context: TaskContext, selected_action: Dict[str, str], service=None) -> str:
+ """Prompt that returns only parameters for the selected action: {"parameters":{...}}"""
+ user_language = service.user.language if service and service.user else 'en'
+ method = selected_action.get('method', '') if selected_action else ''
+ name = selected_action.get('name', '') if selected_action else ''
+ available_docs = _getAvailableDocuments(context.workflow) if context and context.workflow else "No documents available"
+ return f"""Provide only the required parameters for this action.
+
+SELECTED ACTION: {method}.{name}
+OBJECTIVE: {context.task_step.objective if context and context.task_step else ''}
+AVAILABLE DOCUMENTS: {available_docs}
+USER LANGUAGE: {user_language}
+
+RULES:
+- Return only the parameters object.
+- Include user language if relevant.
+- Reference documents only by exact labels available.
+- Avoid unnecessary fields; host applies defaults.
+
+RESPONSE FORMAT (JSON only):
+{{"parameters":{{}}}}
+"""
+
+def createRefinementPrompt(context: TaskContext, observation: Dict[str, Any]) -> str:
+ """Prompt that decides to continue or stop based on observation: {"decision":"continue|stop","reason":".."} """
+ user_language = context.workflow.messages[-1].role if False else (getattr(context.workflow, 'user_language', None) or (getattr(context.workflow, 'language', None))) # not used, keep minimal
+ objective = context.task_step.objective if context and context.task_step else ''
+ return f"""Decide next step based on observation.
+
+OBJECTIVE: {objective}
+OBSERVATION:
+{json.dumps(observation, ensure_ascii=False)}
+
+RULES:
+- If criteria are met or no further action helps, decide stop.
+- Else decide continue.
+
+RESPONSE FORMAT (JSON only):
+{{"decision":"continue","reason":"Need more data"}}
+"""
\ No newline at end of file
diff --git a/modules/methods/methodAi.py b/modules/workflows/methods/methodAi.py
similarity index 99%
rename from modules/methods/methodAi.py
rename to modules/workflows/methods/methodAi.py
index f947db83..ffd9d58e 100644
--- a/modules/methods/methodAi.py
+++ b/modules/workflows/methods/methodAi.py
@@ -7,7 +7,7 @@ import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, UTC
-from modules.methods.methodBase import MethodBase, action
+from modules.workflows.methods.methodBase import MethodBase, action
from modules.interfaces.interfaceChatModel import ActionResult
from modules.shared.timezoneUtils import get_utc_timestamp
diff --git a/modules/methods/methodBase.py b/modules/workflows/methods/methodBase.py
similarity index 100%
rename from modules/methods/methodBase.py
rename to modules/workflows/methods/methodBase.py
diff --git a/modules/methods/methodDocument.py b/modules/workflows/methods/methodDocument.py
similarity index 99%
rename from modules/methods/methodDocument.py
rename to modules/workflows/methods/methodDocument.py
index 54f45cb9..23fc8b10 100644
--- a/modules/methods/methodDocument.py
+++ b/modules/workflows/methods/methodDocument.py
@@ -8,7 +8,7 @@ import os
from typing import Dict, Any, List, Optional
from datetime import datetime, UTC
-from modules.methods.methodBase import MethodBase, action
+from modules.workflows.methods.methodBase import MethodBase, action
from modules.interfaces.interfaceChatModel import ActionResult
from modules.shared.timezoneUtils import get_utc_timestamp
diff --git a/modules/methods/methodOutlook.py b/modules/workflows/methods/methodOutlook.py
similarity index 99%
rename from modules/methods/methodOutlook.py
rename to modules/workflows/methods/methodOutlook.py
index 658b3982..6e2c6440 100644
--- a/modules/methods/methodOutlook.py
+++ b/modules/workflows/methods/methodOutlook.py
@@ -81,7 +81,7 @@ from datetime import datetime, UTC
import json
import uuid
-from modules.methods.methodBase import MethodBase, action
+from modules.workflows.methods.methodBase import MethodBase, action
from modules.interfaces.interfaceChatModel import ActionResult
from modules.interfaces.interfaceAppModel import ConnectionStatus
from modules.shared.timezoneUtils import get_utc_timestamp
diff --git a/modules/methods/methodSharepoint.py b/modules/workflows/methods/methodSharepoint.py
similarity index 99%
rename from modules/methods/methodSharepoint.py
rename to modules/workflows/methods/methodSharepoint.py
index bcb92e0b..d474992e 100644
--- a/modules/methods/methodSharepoint.py
+++ b/modules/workflows/methods/methodSharepoint.py
@@ -13,7 +13,7 @@ from urllib.parse import urlparse
import aiohttp
import asyncio
-from modules.methods.methodBase import MethodBase, action
+from modules.workflows.methods.methodBase import MethodBase, action
from modules.interfaces.interfaceChatModel import ActionResult
from modules.shared.timezoneUtils import get_utc_timestamp
diff --git a/modules/methods/methodWeb.py b/modules/workflows/methods/methodWeb.py
similarity index 99%
rename from modules/methods/methodWeb.py
rename to modules/workflows/methods/methodWeb.py
index 409b7151..0de6d26c 100644
--- a/modules/methods/methodWeb.py
+++ b/modules/workflows/methods/methodWeb.py
@@ -2,7 +2,7 @@ import logging
import csv
import io
from typing import Any, Dict
-from modules.methods.methodBase import MethodBase, action
+from modules.workflows.methods.methodBase import MethodBase, action
from modules.interfaces.interfaceChatModel import ActionResult, ActionDocument
from modules.interfaces.interfaceWebObjects import WebInterface
from modules.interfaces.interfaceWebModel import (
diff --git a/modules/features/featureChatPlayground.py b/modules/workflows/workflowManager.py
similarity index 55%
rename from modules/features/featureChatPlayground.py
rename to modules/workflows/workflowManager.py
index 85d8c2d4..0d34b5b9 100644
--- a/modules/features/featureChatPlayground.py
+++ b/modules/workflows/workflowManager.py
@@ -8,8 +8,7 @@ from modules.interfaces.interfaceAppObjects import User
from modules.interfaces.interfaceChatModel import (UserInputRequest, ChatMessage, ChatWorkflow, TaskItem, TaskStatus)
from modules.interfaces.interfaceChatObjects import ChatObjects
-from modules.chat.managerChat import ChatManager
-from modules.chat.handling.handlingTasks import WorkflowStoppedException
+from modules.workflows._transfer.handlingTasks import HandlingTasks, WorkflowStoppedException
from modules.interfaces.interfaceChatModel import WorkflowResult
from modules.shared.timezoneUtils import get_utc_timestamp
@@ -20,125 +19,135 @@ class WorkflowManager:
def __init__(self, chatInterface: ChatObjects, currentUser: User):
self.chatInterface = chatInterface
- self.chatManager = ChatManager(currentUser, chatInterface)
self.currentUser = currentUser
+ self.handlingTasks = None
- async def workflowProcess(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> None:
- """Process a workflow with user input using unified workflow phases"""
+ async def workflowStart(self, userInput: UserInputRequest, workflowId: Optional[str] = None) -> ChatWorkflow:
+ """Starts a new workflow or continues an existing one, then launches processing."""
try:
- # Initialize chat manager
- await self.chatManager.initialize(workflow)
-
- # Set user language
- self.chatManager.handlingTasks.service.setUserLanguage(userInput.userLanguage)
-
- # Send first message
- message = await self._sendFirstMessage(userInput, workflow)
-
- # Execute unified workflow
- workflow_result = await self.chatManager.executeUnifiedWorkflow(userInput, workflow)
-
- # Process workflow results
- await self._processWorkflowResults(workflow, workflow_result, message)
-
- # Only send last message for successful workflows
- # Stopped/failed workflows get their final messages in _processWorkflowResults
- if workflow_result.status == 'success':
- await self._sendLastMessage(workflow)
-
- except WorkflowStoppedException:
- logger.info("Workflow stopped by user")
- # Update workflow status to stopped
+ currentTime = get_utc_timestamp()
+
+ if workflowId:
+ workflow = self.chatInterface.getWorkflow(workflowId)
+ if not workflow:
+ raise ValueError(f"Workflow {workflowId} not found")
+
+ if workflow.status == "running":
+ logger.info(f"Stopping running workflow {workflowId} before processing new prompt")
+ workflow.status = "stopped"
+ workflow.lastActivity = currentTime
+ self.chatInterface.updateWorkflow(workflowId, {
+ "status": "stopped",
+ "lastActivity": currentTime
+ })
+ self.chatInterface.createLog({
+ "workflowId": workflowId,
+ "message": "Workflow stopped for new prompt",
+ "type": "info",
+ "status": "stopped",
+ "progress": 100
+ })
+ await asyncio.sleep(0.1)
+
+ newRound = workflow.currentRound + 1
+ self.chatInterface.updateWorkflow(workflowId, {
+ "status": "running",
+ "lastActivity": currentTime,
+ "currentRound": newRound
+ })
+
+ workflow = self.chatInterface.getWorkflow(workflowId)
+ if not workflow:
+ raise ValueError(f"Failed to reload workflow {workflowId} after update")
+
+ self.chatInterface.createLog({
+ "workflowId": workflowId,
+ "message": f"Workflow resumed (round {workflow.currentRound})",
+ "type": "info",
+ "status": "running",
+ "progress": 0
+ })
+ else:
+ workflowData = {
+ "name": "New Workflow",
+ "status": "running",
+ "startedAt": currentTime,
+ "lastActivity": currentTime,
+ "currentRound": 0,
+ "currentTask": 0,
+ "currentAction": 0,
+ "totalTasks": 0,
+ "totalActions": 0,
+ "mandateId": self.chatInterface.mandateId,
+ "messageIds": [],
+ "stats": {
+ "processingTime": None,
+ "tokenCount": None,
+ "bytesSent": None,
+ "bytesReceived": None,
+ "successRate": None,
+ "errorCount": None
+ }
+ }
+
+ workflow = self.chatInterface.createWorkflow(workflowData)
+ workflow.currentRound = 1
+ self.chatInterface.updateWorkflow(workflow.id, {"currentRound": 1})
+ self.chatInterface.updateWorkflowStats(workflow.id, bytesSent=0, bytesReceived=0)
+
+ # Start workflow processing asynchronously
+ asyncio.create_task(self._workflowProcess(userInput, workflow))
+
+ return workflow
+ except Exception as e:
+ logger.error(f"Error starting workflow: {str(e)}")
+ raise
+
+ async def workflowStop(self, workflowId: str) -> ChatWorkflow:
+ """Stops a running workflow."""
+ try:
+ workflow = self.chatInterface.getWorkflow(workflowId)
+ if not workflow:
+ raise ValueError(f"Workflow {workflowId} not found")
+
workflow.status = "stopped"
workflow.lastActivity = get_utc_timestamp()
- self.chatInterface.updateWorkflow(workflow.id, {
+ self.chatInterface.updateWorkflow(workflowId, {
"status": "stopped",
- "lastActivity": workflow.lastActivity,
- "totalTasks": workflow.totalTasks,
- "totalActions": workflow.totalActions
+ "lastActivity": workflow.lastActivity
})
-
- # Create final stopped message
- stopped_message = {
- "workflowId": workflow.id,
- "role": "assistant",
- "message": "🛑 Workflow stopped by user",
- "status": "last",
- "sequenceNr": len(workflow.messages) + 1,
- "publishedAt": get_utc_timestamp(),
- "documentsLabel": "workflow_stopped",
- "documents": [],
- # Add workflow context fields
- "roundNumber": workflow.currentRound,
- "taskNumber": 0,
- "actionNumber": 0,
- # Add progress status
- "taskProgress": "pending",
- "actionProgress": "pending"
- }
- message = self.chatInterface.createMessage(stopped_message)
- if message:
- workflow.messages.append(message)
-
- # Add log entry
self.chatInterface.createLog({
- "workflowId": workflow.id,
- "message": "Workflow stopped by user",
+ "workflowId": workflowId,
+ "message": "Workflow stopped",
"type": "warning",
"status": "stopped",
"progress": 100
})
+ return workflow
+ except Exception as e:
+ logger.error(f"Error stopping workflow: {str(e)}")
+ raise
+
+ async def _workflowProcess(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> None:
+ """Process a workflow with user input"""
+ try:
+ self.handlingTasks = HandlingTasks(self.chatInterface, self.currentUser, workflow)
+ self.handlingTasks.service.setUserLanguage(userInput.userLanguage)
+ message = await self._sendFirstMessage(userInput, workflow)
+ task_plan = await self._planTasks(userInput, workflow)
+ workflow_result = await self._executeTasks(task_plan, workflow)
+ await self._processWorkflowResults(workflow, workflow_result, message)
+
+ except WorkflowStoppedException:
+ self._handleWorkflowStop(workflow)
except Exception as e:
- logger.error(f"Workflow processing error: {str(e)}")
-
- # Update workflow status to failed
- workflow.status = "failed"
- workflow.lastActivity = get_utc_timestamp()
- self.chatInterface.updateWorkflow(workflow.id, {
- "status": "failed",
- "lastActivity": workflow.lastActivity,
- "totalTasks": workflow.totalTasks,
- "totalActions": workflow.totalActions
- })
-
- # Create error message
- error_message = {
- "workflowId": workflow.id,
- "role": "assistant",
- "message": f"Workflow processing failed: {str(e)}",
- "status": "last",
- "sequenceNr": len(workflow.messages) + 1,
- "publishedAt": get_utc_timestamp(),
- "documentsLabel": "workflow_error",
- "documents": [],
- # Add workflow context fields
- "roundNumber": workflow.currentRound,
- "taskNumber": 0,
- "actionNumber": 0,
- # Add progress status
- "taskProgress": "fail",
- "actionProgress": "fail"
- }
- message = self.chatInterface.createMessage(error_message)
- if message:
- workflow.messages.append(message)
-
- # Add error log entry
- self.chatInterface.createLog({
- "workflowId": workflow.id,
- "message": f"Workflow failed: {str(e)}",
- "type": "error",
- "status": "failed",
- "progress": 100
- })
-
- raise
+ self._handleWorkflowError(workflow, e)
async def _sendFirstMessage(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> ChatMessage:
"""Send first message to start workflow"""
try:
- self.chatManager.handlingTasks._checkWorkflowStopped()
+ self.handlingTasks._checkWorkflowStopped()
# Create initial message using interface
# Generate the correct documentsLabel that matches what getDocumentReferenceString will create
@@ -171,12 +180,12 @@ class WorkflowManager:
workflow.messages.append(message)
# Clear trace log for new workflow session
- self.chatManager.handlingTasks.service.clearTraceLog()
+ self.handlingTasks.service.clearTraceLog()
# Add documents if any, now with messageId
if userInput.listFileId:
# Process file IDs and add to message data
- documents = await self.chatManager.handlingTasks.service.processFileIds(userInput.listFileId, message.id)
+ documents = await self.handlingTasks.service.processFileIds(userInput.listFileId, message.id)
message.documents = documents
# Update the message with documents in database
self.chatInterface.updateMessage(message.id, {"documents": [doc.to_dict() for doc in documents]})
@@ -188,97 +197,76 @@ class WorkflowManager:
except Exception as e:
logger.error(f"Error sending first message: {str(e)}")
raise
-
- async def _generateWorkflowFeedback(self, workflow: ChatWorkflow) -> str:
- """Generate feedback message for workflow completion"""
- try:
- self.chatManager.handlingTasks._checkWorkflowStopped()
-
- # Count messages by role
- user_messages = [msg for msg in workflow.messages if msg.role == 'user']
- assistant_messages = [msg for msg in workflow.messages if msg.role == 'assistant']
-
- # Generate summary feedback
- feedback = f"Workflow completed.\n\n"
- feedback += f"Processed {len(user_messages)} user inputs and generated {len(assistant_messages)} responses.\n"
-
- # Add final status
- if workflow.status == "completed":
- feedback += "All tasks completed successfully."
- elif workflow.status == "partial":
- feedback += "Some tasks completed with partial success."
- else:
- feedback += f"Workflow status: {workflow.status}"
-
- return feedback
-
- except Exception as e:
- logger.error(f"Error generating workflow feedback: {str(e)}")
- return "Workflow processing completed."
- async def _sendLastMessage(self, workflow: ChatWorkflow) -> None:
- """Send last message to complete workflow (only for successful workflows)"""
- try:
- # Safety check: ensure this is only called for successful workflows
- if workflow.status in ['stopped', 'failed']:
- logger.warning(f"Attempted to send last message for {workflow.status} workflow {workflow.id}")
- return
-
- # Generate feedback
- feedback = await self._generateWorkflowFeedback(workflow)
-
- # Create last message using interface
- messageData = {
- "workflowId": workflow.id,
- "role": "assistant",
- "message": feedback,
- "status": "last",
- "sequenceNr": len(workflow.messages) + 1,
- "publishedAt": get_utc_timestamp(),
- "documentsLabel": "workflow_feedback",
- "documents": [],
- # Add workflow context fields
- "roundNumber": workflow.currentRound,
- "taskNumber": 0,
- "actionNumber": 0,
- # Add progress status
- "taskProgress": "success",
- "actionProgress": "success"
- }
-
- # Create message using interface
- message = self.chatInterface.createMessage(messageData)
- if message:
- workflow.messages.append(message)
-
- # Update workflow status to completed
- workflow.status = "completed"
- workflow.lastActivity = get_utc_timestamp()
-
- # Update workflow in database
- self.chatInterface.updateWorkflow(workflow.id, {
- "status": "completed",
- "lastActivity": workflow.lastActivity
+ async def _planTasks(self, userInput: UserInputRequest, workflow: ChatWorkflow):
+ """Generate task plan for workflow execution"""
+ handling = self.handlingTasks
+ # Generate task plan first (shared for both modes)
+ task_plan = await handling.generateTaskPlan(userInput.prompt, workflow)
+ if not task_plan or not task_plan.tasks:
+ raise Exception("No tasks generated in task plan.")
+ logger.info(f"Executing workflow mode={getattr(workflow, 'workflowMode', 'Actionplan')} with {len(task_plan.tasks)} tasks")
+ return task_plan
+
+ async def _executeTasks(self, task_plan, workflow: ChatWorkflow) -> WorkflowResult:
+ """Execute all tasks in the task plan"""
+ handling = self.handlingTasks
+ total_tasks = len(task_plan.tasks)
+ all_task_results: List = []
+ previous_results: List[str] = []
+
+ for idx, task_step in enumerate(task_plan.tasks):
+ current_task_index = idx + 1
+ logger.info(f"Task {current_task_index}/{total_tasks}: {task_step.objective}")
+
+ # Build TaskContext (mode-specific behavior is inside HandlingTasks)
+ from modules.interfaces.interfaceChatModel import TaskContext
+ task_context = TaskContext(
+ task_step=task_step,
+ workflow=workflow,
+ workflow_id=workflow.id,
+ available_documents=None,
+ available_connections=None,
+ previous_results=previous_results,
+ previous_handover=None,
+ improvements=[],
+ retry_count=0,
+ previous_action_results=[],
+ previous_review_result=None,
+ is_regeneration=False,
+ failure_patterns=[],
+ failed_actions=[],
+ successful_actions=[],
+ criteria_progress={
+ 'met_criteria': set(),
+ 'unmet_criteria': set(),
+ 'attempt_history': []
+ }
+ )
+
+ task_result = await handling.executeTask(task_step, workflow, task_context, current_task_index, total_tasks)
+ handover_data = await handling.prepareTaskHandover(task_step, [], task_result, workflow)
+ all_task_results.append({
+ 'task_step': task_step,
+ 'task_result': task_result,
+ 'handover_data': handover_data
})
-
- # Add completion log entry
- self.chatInterface.createLog({
- "workflowId": workflow.id,
- "message": "Workflow completed",
- "type": "success",
- "status": "completed",
- "progress": 100
- })
-
- except Exception as e:
- logger.error(f"Error sending last message: {str(e)}")
- raise
+ if task_result.success and task_result.feedback:
+ previous_results.append(task_result.feedback)
+
+ return WorkflowResult(
+ status="completed",
+ completed_tasks=len(all_task_results),
+ total_tasks=total_tasks,
+ execution_time=0.0,
+ final_results_count=len(all_task_results)
+ )
async def _processWorkflowResults(self, workflow: ChatWorkflow, workflow_result: WorkflowResult, initial_message: ChatMessage) -> None:
"""Process workflow results and create appropriate messages"""
try:
try:
- self.chatManager.handlingTasks._checkWorkflowStopped()
+ self.handlingTasks._checkWorkflowStopped()
except WorkflowStoppedException:
logger.info(f"Workflow {workflow.id} was stopped during result processing")
@@ -398,47 +386,8 @@ class WorkflowManager:
})
return
- # For successful workflows, create a simple completion message
- summary_message = {
- "workflowId": workflow.id,
- "role": "assistant",
- "message": f"Workflow completed successfully.",
- "status": "last",
- "sequenceNr": len(workflow.messages) + 1,
- "publishedAt": get_utc_timestamp(),
- "documentsLabel": "workflow_completion",
- "documents": [],
- # Add workflow context fields
- "roundNumber": workflow.currentRound,
- "taskNumber": 0,
- "actionNumber": 0,
- # Add progress status
- "taskProgress": "success",
- "actionProgress": "success"
- }
-
- message = self.chatInterface.createMessage(summary_message)
- if message:
- workflow.messages.append(message)
-
- # Update workflow status to completed for successful workflows
- workflow.status = "completed"
- workflow.lastActivity = get_utc_timestamp()
- self.chatInterface.updateWorkflow(workflow.id, {
- "status": "completed",
- "lastActivity": workflow.lastActivity,
- "totalTasks": workflow.totalTasks,
- "totalActions": workflow.totalActions
- })
-
- # Add completion log entry
- self.chatInterface.createLog({
- "workflowId": workflow.id,
- "message": "Workflow completed successfully",
- "type": "success",
- "status": "completed",
- "progress": 100
- })
+ # For successful workflows, send detailed completion message
+ await self._sendLastMessage(workflow)
except Exception as e:
logger.error(f"Error processing workflow results: {str(e)}")
@@ -474,3 +423,179 @@ class WorkflowManager:
"totalActions": workflow.totalActions
})
+ async def _sendLastMessage(self, workflow: ChatWorkflow) -> None:
+ """Send last message to complete workflow (only for successful workflows)"""
+ try:
+ # Safety check: ensure this is only called for successful workflows
+ if workflow.status in ['stopped', 'failed']:
+ logger.warning(f"Attempted to send last message for {workflow.status} workflow {workflow.id}")
+ return
+
+ # Generate feedback
+ feedback = await self._generateWorkflowFeedback(workflow)
+
+ # Create last message using interface
+ messageData = {
+ "workflowId": workflow.id,
+ "role": "assistant",
+ "message": feedback,
+ "status": "last",
+ "sequenceNr": len(workflow.messages) + 1,
+ "publishedAt": get_utc_timestamp(),
+ "documentsLabel": "workflow_feedback",
+ "documents": [],
+ # Add workflow context fields
+ "roundNumber": workflow.currentRound,
+ "taskNumber": 0,
+ "actionNumber": 0,
+ # Add progress status
+ "taskProgress": "success",
+ "actionProgress": "success"
+ }
+
+ # Create message using interface
+ message = self.chatInterface.createMessage(messageData)
+ if message:
+ workflow.messages.append(message)
+
+ # Update workflow status to completed
+ workflow.status = "completed"
+ workflow.lastActivity = get_utc_timestamp()
+
+ # Update workflow in database
+ self.chatInterface.updateWorkflow(workflow.id, {
+ "status": "completed",
+ "lastActivity": workflow.lastActivity
+ })
+
+ # Add completion log entry
+ self.chatInterface.createLog({
+ "workflowId": workflow.id,
+ "message": "Workflow completed",
+ "type": "success",
+ "status": "completed",
+ "progress": 100
+ })
+
+ except Exception as e:
+ logger.error(f"Error sending last message: {str(e)}")
+ raise
+
+ async def _generateWorkflowFeedback(self, workflow: ChatWorkflow) -> str:
+ """Generate feedback message for workflow completion"""
+ try:
+ self.handlingTasks._checkWorkflowStopped()
+
+ # Count messages by role
+ user_messages = [msg for msg in workflow.messages if msg.role == 'user']
+ assistant_messages = [msg for msg in workflow.messages if msg.role == 'assistant']
+
+ # Generate summary feedback
+ feedback = f"Workflow completed.\n\n"
+ feedback += f"Processed {len(user_messages)} user inputs and generated {len(assistant_messages)} responses.\n"
+
+ # Add final status
+ if workflow.status == "completed":
+ feedback += "All tasks completed successfully."
+ elif workflow.status == "partial":
+ feedback += "Some tasks completed with partial success."
+ else:
+ feedback += f"Workflow status: {workflow.status}"
+
+ return feedback
+
+ except Exception as e:
+ logger.error(f"Error generating workflow feedback: {str(e)}")
+ return "Workflow processing completed."
+
+ def _handleWorkflowStop(self, workflow: ChatWorkflow) -> None:
+ """Handle workflow stop exception"""
+ logger.info("Workflow stopped by user")
+
+ # Update workflow status to stopped
+ workflow.status = "stopped"
+ workflow.lastActivity = get_utc_timestamp()
+ self.chatInterface.updateWorkflow(workflow.id, {
+ "status": "stopped",
+ "lastActivity": workflow.lastActivity,
+ "totalTasks": workflow.totalTasks,
+ "totalActions": workflow.totalActions
+ })
+
+ # Create final stopped message
+ stopped_message = {
+ "workflowId": workflow.id,
+ "role": "assistant",
+ "message": "🛑 Workflow stopped by user",
+ "status": "last",
+ "sequenceNr": len(workflow.messages) + 1,
+ "publishedAt": get_utc_timestamp(),
+ "documentsLabel": "workflow_stopped",
+ "documents": [],
+ # Add workflow context fields
+ "roundNumber": workflow.currentRound,
+ "taskNumber": 0,
+ "actionNumber": 0,
+ # Add progress status
+ "taskProgress": "pending",
+ "actionProgress": "pending"
+ }
+ message = self.chatInterface.createMessage(stopped_message)
+ if message:
+ workflow.messages.append(message)
+
+ # Add log entry
+ self.chatInterface.createLog({
+ "workflowId": workflow.id,
+ "message": "Workflow stopped by user",
+ "type": "warning",
+ "status": "stopped",
+ "progress": 100
+ })
+
+ def _handleWorkflowError(self, workflow: ChatWorkflow, error: Exception) -> None:
+ """Handle workflow error exception"""
+ logger.error(f"Workflow processing error: {str(error)}")
+
+ # Update workflow status to failed
+ workflow.status = "failed"
+ workflow.lastActivity = get_utc_timestamp()
+ self.chatInterface.updateWorkflow(workflow.id, {
+ "status": "failed",
+ "lastActivity": workflow.lastActivity,
+ "totalTasks": workflow.totalTasks,
+ "totalActions": workflow.totalActions
+ })
+
+ # Create error message
+ error_message = {
+ "workflowId": workflow.id,
+ "role": "assistant",
+ "message": f"Workflow processing failed: {str(error)}",
+ "status": "last",
+ "sequenceNr": len(workflow.messages) + 1,
+ "publishedAt": get_utc_timestamp(),
+ "documentsLabel": "workflow_error",
+ "documents": [],
+ # Add workflow context fields
+ "roundNumber": workflow.currentRound,
+ "taskNumber": 0,
+ "actionNumber": 0,
+ # Add progress status
+ "taskProgress": "fail",
+ "actionProgress": "fail"
+ }
+ message = self.chatInterface.createMessage(error_message)
+ if message:
+ workflow.messages.append(message)
+
+ # Add error log entry
+ self.chatInterface.createLog({
+ "workflowId": workflow.id,
+ "message": f"Workflow failed: {str(error)}",
+ "type": "error",
+ "status": "failed",
+ "progress": 100
+ })
+
+ raise
diff --git a/tests/methods/test_method_web.py b/tests/methods/test_method_web.py
index 27344ab3..0d1509e2 100644
--- a/tests/methods/test_method_web.py
+++ b/tests/methods/test_method_web.py
@@ -5,7 +5,7 @@ import logging
import pytest
from unittest.mock import patch
-from modules.methods.methodWeb import MethodWeb
+from modules.workflows.methods.methodWeb import MethodWeb
from tests.fixtures.tavily_responses import (
RESPONSE_SEARCH_HOW_OLD_IS_EARTH_NO_ANSWER,
RESPONSE_EXTRACT_HOW_OLD_IS_EARTH_NO_ANSWER,
diff --git a/tool_stats_durations_from_log.py b/tool_stats_durations_from_log.py
index 103cd5be..483af2d2 100644
--- a/tool_stats_durations_from_log.py
+++ b/tool_stats_durations_from_log.py
@@ -13,7 +13,7 @@ def parse_line(line: str) -> Tuple[Optional[str], Optional[str], Optional[dateti
Extract (logger, function, timestamp) from a log line.
Expected format examples (single line):
- 2025-09-18 16:35:04 - INFO - modules.chat.handling.handlingTasks - Task 1 - Starting action 3/4 - D:\\Athi\\...\\handlingTasks.py:572 - executeTask
+ 2025-09-18 16:35:04 - INFO - modules.workflows._transfer.handlingTasks - Task 1 - Starting action 3/4 - D:\\Athi\\...\\handlingTasks.py:572 - executeTask
Returns (logger, function, timestamp_dt) or (None, None, None) if not matched.
"""