From 6060d74eb28590ec4635eefef734e3053b1f0775 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Sun, 2 Nov 2025 19:53:36 +0100
Subject: [PATCH] issues fixing in chat workflow
---
.../chatPlayground/mainChatPlayground.py | 9 +-
modules/interfaces/interfaceDbChatObjects.py | 2 +-
modules/routes/routeChatPlayground.py | 12 +-
modules/routes/routeSecurityLocal.py | 7 +-
.../serviceWorkflow/mainServiceWorkflow.py | 24 ++--
modules/shared/attributeUtils.py | 18 +++
.../processing/modes/modeActionplan.py | 2 +-
.../workflows/processing/workflowProcessor.py | 2 +-
modules/workflows/workflowManager.py | 132 +++++++++---------
9 files changed, 111 insertions(+), 97 deletions(-)
diff --git a/modules/features/chatPlayground/mainChatPlayground.py b/modules/features/chatPlayground/mainChatPlayground.py
index 8420cb41..fc148e56 100644
--- a/modules/features/chatPlayground/mainChatPlayground.py
+++ b/modules/features/chatPlayground/mainChatPlayground.py
@@ -8,30 +8,29 @@ from modules.services import getInterface as getServices
logger = logging.getLogger(__name__)
-async def chatStart(interfaceDbChat, currentUser: User, userInput: UserInputRequest, workflowId: Optional[str] = None, workflowMode: WorkflowModeEnum = WorkflowModeEnum.WORKFLOW_ACTIONPLAN) -> ChatWorkflow:
+async def chatStart(currentUser: User, userInput: UserInputRequest, workflowMode: WorkflowModeEnum, workflowId: Optional[str] = None) -> ChatWorkflow:
"""
Starts a new chat or continues an existing one, then launches processing asynchronously.
Args:
- interfaceDbChat: Chat interface instance
currentUser: Current user
userInput: User input request
workflowId: Optional workflow ID to continue existing workflow
workflowMode: "Actionplan" for traditional task planning, "Dynamic" for iterative dynamic-style processing, "Template" for template-based processing
Example usage for Dynamic mode:
- workflow = await chatStart(interfaceDbChat, currentUser, userInput, workflowMode=WorkflowModeEnum.WORKFLOW_DYNAMIC)
+ workflow = await chatStart(currentUser, userInput, workflowMode=WorkflowModeEnum.WORKFLOW_DYNAMIC)
"""
try:
services = getServices(currentUser, None)
workflowManager = WorkflowManager(services)
- workflow = await workflowManager.workflowStart(userInput, workflowId, workflowMode)
+ workflow = await workflowManager.workflowStart(userInput, workflowMode, workflowId)
return workflow
except Exception as e:
logger.error(f"Error starting chat: {str(e)}")
raise
-async def chatStop(interfaceDbChat, currentUser: User, workflowId: str) -> ChatWorkflow:
+async def chatStop(currentUser: User, workflowId: str) -> ChatWorkflow:
"""Stops a running chat."""
try:
services = getServices(currentUser, None)
diff --git a/modules/interfaces/interfaceDbChatObjects.py b/modules/interfaces/interfaceDbChatObjects.py
index 87f0bf5b..66927037 100644
--- a/modules/interfaces/interfaceDbChatObjects.py
+++ b/modules/interfaces/interfaceDbChatObjects.py
@@ -368,7 +368,7 @@ class ChatObjects:
messages=[],
stats=[],
mandateId=created.get("mandateId", self.currentUser.mandateId),
- workflowMode=created.get("workflowMode", WorkflowModeEnum.WORKFLOW_DYNAMIC),
+ workflowMode=created["workflowMode"],
maxSteps=created.get("maxSteps", 1)
)
diff --git a/modules/routes/routeChatPlayground.py b/modules/routes/routeChatPlayground.py
index d376a12d..8f23f7fd 100644
--- a/modules/routes/routeChatPlayground.py
+++ b/modules/routes/routeChatPlayground.py
@@ -39,7 +39,7 @@ def getServiceChat(currentUser: User):
async def start_workflow(
request: Request,
workflowId: Optional[str] = Query(None, description="Optional ID of the workflow to continue"),
- workflowMode: WorkflowModeEnum = Query(WorkflowModeEnum.WORKFLOW_DYNAMIC, description="Workflow mode: 'Actionplan', 'Dynamic', or 'Template'"),
+ workflowMode: WorkflowModeEnum = Query(..., description="Workflow mode: 'Actionplan', 'Dynamic', or 'Template' (mandatory)"),
userInput: UserInputRequest = Body(...),
currentUser: User = Depends(getCurrentUser)
) -> ChatWorkflow:
@@ -51,11 +51,8 @@ async def start_workflow(
workflowMode: "Actionplan" for traditional task planning, "Dynamic" for iterative dynamic-style processing, "Template" for template-based processing
"""
try:
- # Get service center
- interfaceDbChat = getServiceChat(currentUser)
-
# Start or continue workflow using playground controller
- workflow = await chatStart(interfaceDbChat, currentUser, userInput, workflowId, workflowMode)
+ workflow = await chatStart(currentUser, userInput, workflowMode, workflowId)
return workflow
@@ -76,11 +73,8 @@ async def stop_workflow(
) -> ChatWorkflow:
"""Stops a running workflow."""
try:
- # Get service center
- interfaceDbChat = getServiceChat(currentUser)
-
# Stop workflow using playground controller
- workflow = await chatStop(interfaceDbChat, currentUser, workflowId)
+ workflow = await chatStop(currentUser, workflowId)
return workflow
diff --git a/modules/routes/routeSecurityLocal.py b/modules/routes/routeSecurityLocal.py
index 0a2fff71..7b08ceed 100644
--- a/modules/routes/routeSecurityLocal.py
+++ b/modules/routes/routeSecurityLocal.py
@@ -156,6 +156,10 @@ async def login(
error_msg = str(e)
logger.warning(f"Authentication failed for user {formData.username}: {error_msg}")
+ # Check if user is disabled and provide specific message
+ if error_msg == "User is disabled":
+ error_msg = "Your account is disabled. Please send an email to p.motsch@valueon.ch to get access to the PowerOn center."
+
# Log failed login attempt
try:
from modules.shared.auditLogger import audit_logger
@@ -209,6 +213,7 @@ async def register_user(
# Create user with local authentication
# Set safe default privilege level for new registrations
+ # New users are disabled by default and require admin approval
from modules.datamodels.datamodelUam import UserPrivilege
user = appInterface.createUser(
username=userData.username,
@@ -216,7 +221,7 @@ async def register_user(
email=userData.email,
fullName=userData.fullName,
language=userData.language,
- enabled=userData.enabled,
+ enabled=False, # New users are disabled by default
privilege=UserPrivilege.USER, # Always set to USER for new registrations
authenticationAuthority=AuthAuthority.LOCAL
)
diff --git a/modules/services/serviceWorkflow/mainServiceWorkflow.py b/modules/services/serviceWorkflow/mainServiceWorkflow.py
index a38be0c7..4dad14bf 100644
--- a/modules/services/serviceWorkflow/mainServiceWorkflow.py
+++ b/modules/services/serviceWorkflow/mainServiceWorkflow.py
@@ -375,7 +375,7 @@ class WorkflowService:
logger.error(f"Error getting workflow context: {str(e)}")
return {'currentRound': 0, 'currentTask': 0, 'currentAction': 0}
- def setWorkflowContext(self, round_number: int = None, task_number: int = None, action_number: int = None):
+ def setWorkflowContext(self, roundNumber: int = None, taskNumber: int = None, actionNumber: int = None):
"""Set current workflow context for document generation and routing"""
try:
workflow = self.services.currentWorkflow
@@ -383,15 +383,15 @@ class WorkflowService:
# Prepare update data
update_data = {}
- if round_number is not None:
- workflow.currentRound = round_number
- update_data["currentRound"] = round_number
- if task_number is not None:
- workflow.currentTask = task_number
- update_data["currentTask"] = task_number
- if action_number is not None:
- workflow.currentAction = action_number
- update_data["currentAction"] = action_number
+ if roundNumber is not None:
+ workflow.currentRound = roundNumber
+ update_data["currentRound"] = roundNumber
+ if taskNumber is not None:
+ workflow.currentTask = taskNumber
+ update_data["currentTask"] = taskNumber
+ if actionNumber is not None:
+ workflow.currentAction = actionNumber
+ update_data["currentAction"] = actionNumber
# Persist changes to database if any updates were made
if update_data:
@@ -863,8 +863,8 @@ class WorkflowService:
logger.error(f"Error getting connection reference list: {str(e)}")
return []
- def setWorkflowContext(self, workflow):
- """Set the current workflow context for this service"""
+ def setCurrentWorkflow(self, workflow):
+ """Set the current workflow reference for this service"""
self.workflow = workflow
# Reset progress logger for new workflow
self._progressLogger = None
diff --git a/modules/shared/attributeUtils.py b/modules/shared/attributeUtils.py
index 1dc58779..b88a94e7 100644
--- a/modules/shared/attributeUtils.py
+++ b/modules/shared/attributeUtils.py
@@ -176,6 +176,23 @@ def getModelAttributeDefinitions(modelClass: Type[BaseModel] = None, userLanguag
else str(field.annotation)
)
)
+
+ # Extract default value from field
+ # In Pydantic v2, FieldInfo has a 'default' attribute
+ field_default = None
+ if hasattr(field_info, 'default'):
+ default_value = field_info.default
+ # Handle default_factory (callable that generates default)
+ if hasattr(field_info, 'default_factory') and callable(field_info.default_factory):
+ # Don't call it here - it's meant to be called per-instance
+ # Instead, store a marker that indicates it exists
+ field_default = None # Frontend should use first option or specific logic
+ elif default_value is not ...: # Ellipsis means no default
+ # Convert enum to its value if it's an enum
+ if hasattr(default_value, 'value'):
+ field_default = default_value.value
+ else:
+ field_default = default_value
attributes.append(
{
@@ -192,6 +209,7 @@ def getModelAttributeDefinitions(modelClass: Type[BaseModel] = None, userLanguag
"order": len(attributes),
"readonly": frontend_readonly,
"options": frontend_options,
+ "default": field_default,
}
)
diff --git a/modules/workflows/processing/modes/modeActionplan.py b/modules/workflows/processing/modes/modeActionplan.py
index e2406324..b9820ffc 100644
--- a/modules/workflows/processing/modes/modeActionplan.py
+++ b/modules/workflows/processing/modes/modeActionplan.py
@@ -258,7 +258,7 @@ class ActionplanMode(BaseMode):
# Update workflow context for this task
if taskIndex is not None:
- self.services.workflow.setWorkflowContext(task_number=taskIndex)
+ self.services.workflow.setWorkflowContext(taskNumber=taskIndex)
# Create task start message
await self.messageCreator.createTaskStartMessage(taskStep, workflow, taskIndex, totalTasks)
diff --git a/modules/workflows/processing/workflowProcessor.py b/modules/workflows/processing/workflowProcessor.py
index 36027d9e..5884c5bc 100644
--- a/modules/workflows/processing/workflowProcessor.py
+++ b/modules/workflows/processing/workflowProcessor.py
@@ -22,7 +22,7 @@ class WorkflowProcessor:
def __init__(self, services, workflow=None):
self.services = services
self.workflow = workflow
- self.mode = self._createMode(workflow.workflowMode if workflow else WorkflowModeEnum.WORKFLOW_DYNAMIC)
+ self.mode = self._createMode(workflow.workflowMode)
def _createMode(self, workflowMode: WorkflowModeEnum) -> BaseMode:
"""Create the appropriate mode implementation based on workflow mode"""
diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py
index 9123d102..de53e875 100644
--- a/modules/workflows/workflowManager.py
+++ b/modules/workflows/workflowManager.py
@@ -26,7 +26,7 @@ class WorkflowManager:
# Exported functions
- async def workflowStart(self, userInput: UserInputRequest, workflowId: Optional[str] = None, workflowMode: WorkflowModeEnum = WorkflowModeEnum.WORKFLOW_DYNAMIC) -> ChatWorkflow:
+ async def workflowStart(self, userInput: UserInputRequest, workflowMode: WorkflowModeEnum, workflowId: Optional[str] = None) -> ChatWorkflow:
"""Starts a new workflow or continues an existing one, then launches processing."""
try:
# Debug log to check workflowMode parameter
@@ -142,7 +142,7 @@ class WorkflowManager:
self.services.currentUserPrompt = userInput.prompt
# Update the workflow service with the current workflow context
- self.services.workflow.setWorkflowContext(workflow)
+ self.services.workflow.setCurrentWorkflow(workflow)
self.workflowProcessor = WorkflowProcessor(self.services, workflow)
await self._sendFirstMessage(userInput, workflow)
@@ -165,10 +165,8 @@ class WorkflowManager:
# Create initial message using interface
# For first user message, include round info in the user context label
- round_num = workflow.currentRound
- task_num = 0
- action_num = 0
- context_label = f"round{round_num}_usercontext"
+ roundNum = workflow.currentRound
+ contextLabel = f"round{roundNum}_usercontext"
messageData = {
"workflowId": workflow.id,
@@ -177,7 +175,7 @@ class WorkflowManager:
"status": "first",
"sequenceNr": 1,
"publishedAt": self.services.utils.timestampGetUtc(),
- "documentsLabel": context_label,
+ "documentsLabel": contextLabel,
"documents": [],
# Add workflow context fields
"roundNumber": workflow.currentRound,
@@ -189,7 +187,7 @@ class WorkflowManager:
}
# Analyze the user's input to detect language, normalize request, extract intent, and offload bulky context into documents
- created_docs = []
+ createdDocs = []
try:
analyzerPrompt = (
@@ -271,28 +269,28 @@ class WorkflowManager:
mimeType = (mime or "text/plain").strip()
# Neutralize content before storing if neutralization is enabled
- content_bytes = content.encode('utf-8')
- content_bytes = await self._neutralizeContentIfEnabled(content_bytes, mimeType)
+ contentBytes = content.encode('utf-8')
+ contentBytes = await self._neutralizeContentIfEnabled(contentBytes, mimeType)
# Create file in component storage
- file_item = self.services.interfaceDbComponent.createFile(
+ fileItem = self.services.interfaceDbComponent.createFile(
name=fileName,
mimeType=mimeType,
- content=content_bytes
+ content=contentBytes
)
# Persist file data
- self.services.interfaceDbComponent.createFileData(file_item.id, content_bytes)
-
+ self.services.interfaceDbComponent.createFileData(fileItem.id, contentBytes)
+
# Collect file info
- file_info = self.services.workflow.getFileInfo(file_item.id)
+ fileInfo = self.services.workflow.getFileInfo(fileItem.id)
from modules.datamodels.datamodelChat import ChatDocument
doc = ChatDocument(
- fileId=file_item.id,
- fileName=file_info.get("fileName", fileName) if file_info else fileName,
- fileSize=file_info.get("size", len(content_bytes)) if file_info else len(content_bytes),
- mimeType=file_info.get("mimeType", mimeType) if file_info else mimeType
+ fileId=fileItem.id,
+ fileName=fileInfo.get("fileName", fileName) if fileInfo else fileName,
+ fileSize=fileInfo.get("size", len(contentBytes)) if fileInfo else len(contentBytes),
+ mimeType=fileInfo.get("mimeType", mimeType) if fileInfo else mimeType
)
- created_docs.append(doc)
+ createdDocs.append(doc)
except Exception:
continue
except Exception as e:
@@ -301,14 +299,14 @@ class WorkflowManager:
# Process user-uploaded documents (fileIds) and combine with context documents
if userInput.listFileId:
try:
- user_docs = await self._processFileIds(userInput.listFileId, None)
- if user_docs:
- created_docs.extend(user_docs)
+ userDocs = await self._processFileIds(userInput.listFileId, None)
+ if userDocs:
+ createdDocs.extend(userDocs)
except Exception as e:
logger.warning(f"Failed to process user fileIds: {e}")
# Finally, persist and bind the first message with combined documents (context + user)
- self.services.workflow.storeMessageWithDocuments(workflow, messageData, created_docs)
+ self.services.workflow.storeMessageWithDocuments(workflow, messageData, createdDocs)
except Exception as e:
logger.error(f"Error sending first message: {str(e)}")
@@ -318,33 +316,33 @@ class WorkflowManager:
"""Generate task plan for workflow execution"""
handling = self.workflowProcessor
# Generate task plan first (shared for both modes)
- task_plan = await handling.generateTaskPlan(userInput.prompt, workflow)
- if not task_plan or not task_plan.tasks:
+ taskPlan = await handling.generateTaskPlan(userInput.prompt, workflow)
+ if not taskPlan or not taskPlan.tasks:
raise Exception("No tasks generated in task plan.")
- workflow_mode = getattr(workflow, 'workflowMode', WorkflowModeEnum.WORKFLOW_DYNAMIC)
+ workflowMode = getattr(workflow, 'workflowMode')
logger.info(f"Workflow object attributes: {workflow.__dict__ if hasattr(workflow, '__dict__') else 'No __dict__'}")
- logger.info(f"Executing workflow mode={workflow_mode} with {len(task_plan.tasks)} tasks")
- return task_plan
+ logger.info(f"Executing workflow mode={workflowMode} with {len(taskPlan.tasks)} tasks")
+ return taskPlan
- async def _executeTasks(self, task_plan, workflow: ChatWorkflow) -> None:
+ async def _executeTasks(self, taskPlan, workflow: ChatWorkflow) -> None:
"""Execute all tasks in the task plan and update workflow status."""
handling = self.workflowProcessor
- total_tasks = len(task_plan.tasks)
- all_task_results: List = []
- previous_results: List[str] = []
+ totalTasks = len(taskPlan.tasks)
+ allTaskResults: List = []
+ previousResults: List[str] = []
- for idx, task_step in enumerate(task_plan.tasks):
- current_task_index = idx + 1
- logger.info(f"Task {current_task_index}/{total_tasks}: {task_step.objective}")
+ for idx, taskStep in enumerate(taskPlan.tasks):
+ currentTaskIndex = idx + 1
+ logger.info(f"Task {currentTaskIndex}/{totalTasks}: {taskStep.objective}")
# Build TaskContext (mode-specific behavior is inside WorkflowProcessor)
- task_context = TaskContext(
- taskStep=task_step,
+ taskContext = TaskContext(
+ taskStep=taskStep,
workflow=workflow,
workflowId=workflow.id,
availableDocuments=None,
availableConnections=None,
- previousResults=previous_results,
+ previousResults=previousResults,
previousHandover=None,
improvements=[],
retryCount=0,
@@ -361,15 +359,15 @@ class WorkflowManager:
}
)
- task_result = await handling.executeTask(task_step, workflow, task_context, current_task_index, total_tasks)
- handover_data = await handling.prepareTaskHandover(task_step, [], task_result, workflow)
- all_task_results.append({
- 'task_step': task_step,
- 'task_result': task_result,
- 'handover_data': handover_data
+ taskResult = await handling.executeTask(taskStep, workflow, taskContext, currentTaskIndex, totalTasks)
+ handoverData = await handling.prepareTaskHandover(taskStep, [], taskResult, workflow)
+ allTaskResults.append({
+ 'taskStep': taskStep,
+ 'taskResult': taskResult,
+ 'handoverData': handoverData
})
- if task_result.success and task_result.feedback:
- previous_results.append(task_result.feedback)
+ if taskResult.success and taskResult.feedback:
+ previousResults.append(taskResult.feedback)
# Mark workflow as completed; error/stop cases update status elsewhere
workflow.status = "completed"
@@ -384,7 +382,7 @@ class WorkflowManager:
logger.info(f"Workflow {workflow.id} was stopped during result processing")
# Create final stopped message
- stopped_message = {
+ stoppedMessage = {
"workflowId": workflow.id,
"role": "assistant",
"message": "🛑 Workflow stopped by user",
@@ -401,7 +399,7 @@ class WorkflowManager:
"taskProgress": "stopped",
"actionProgress": "stopped"
}
- self.services.workflow.storeMessageWithDocuments(workflow, stopped_message, [])
+ self.services.workflow.storeMessageWithDocuments(workflow, stoppedMessage, [])
# Update workflow status to stopped
workflow.status = "stopped"
@@ -431,7 +429,7 @@ class WorkflowManager:
"taskProgress": "stopped",
"actionProgress": "stopped"
}
- self.services.workflow.storeMessageWithDocuments(workflow, stopped_message, [])
+ self.services.workflow.storeMessageWithDocuments(workflow, stoppedMessage, [])
# Update workflow status to stopped
workflow.status = "stopped"
@@ -453,7 +451,7 @@ class WorkflowManager:
return
elif workflow.status == 'failed':
# Create error message
- error_message = {
+ errorMessage = {
"workflowId": workflow.id,
"role": "assistant",
"message": f"Workflow failed: {'Unknown error'}",
@@ -470,7 +468,7 @@ class WorkflowManager:
"taskProgress": "fail",
"actionProgress": "fail"
}
- self.services.workflow.storeMessageWithDocuments(workflow, error_message, [])
+ self.services.workflow.storeMessageWithDocuments(workflow, errorMessage, [])
# Update workflow status to failed
workflow.status = "failed"
@@ -514,7 +512,7 @@ class WorkflowManager:
"taskProgress": "fail",
"actionProgress": "fail"
}
- self.services.workflow.storeMessageWithDocuments(workflow, error_message, [])
+ self.services.workflow.storeMessageWithDocuments(workflow, errorMessage, [])
# Update workflow status to failed
workflow.status = "failed"
@@ -587,12 +585,12 @@ class WorkflowManager:
self.workflowProcessor._checkWorkflowStopped(workflow)
# Count messages by role
- user_messages = [msg for msg in workflow.messages if msg.role == 'user']
- assistant_messages = [msg for msg in workflow.messages if msg.role == 'assistant']
+ userMessages = [msg for msg in workflow.messages if msg.role == 'user']
+ assistantMessages = [msg for msg in workflow.messages if msg.role == 'assistant']
# Generate summary feedback
feedback = f"Workflow completed.\n\n"
- feedback += f"Processed {len(user_messages)} user inputs and generated {len(assistant_messages)} responses.\n"
+ feedback += f"Processed {len(userMessages)} user inputs and generated {len(assistantMessages)} responses.\n"
# Add final status
if workflow.status == "completed":
@@ -724,45 +722,45 @@ class WorkflowManager:
"""Set user language for the service center"""
self.services.user.language = language
- async def _neutralizeContentIfEnabled(self, content_bytes: bytes, mimeType: str) -> bytes:
+ async def _neutralizeContentIfEnabled(self, contentBytes: bytes, mimeType: str) -> bytes:
"""Neutralize content if neutralization is enabled in user settings"""
try:
# Check if neutralization is enabled
config = self.services.neutralization.getConfig()
if not config or not config.enabled:
- return content_bytes
+ return contentBytes
# Decode content to text for neutralization
try:
- text_content = content_bytes.decode('utf-8')
+ textContent = contentBytes.decode('utf-8')
except UnicodeDecodeError:
# Try alternative encodings
for enc in ['latin-1', 'cp1252', 'iso-8859-1']:
try:
- text_content = content_bytes.decode(enc)
+ textContent = contentBytes.decode(enc)
break
except UnicodeDecodeError:
continue
else:
# If unable to decode, return original bytes (binary content)
logger.debug(f"Unable to decode content for neutralization, skipping: {mimeType}")
- return content_bytes
+ return contentBytes
# Neutralize the text content
# Note: The neutralization service should use names from config when processing
- result = self.services.neutralization.processText(text_content)
+ result = self.services.neutralization.processText(textContent)
if result and 'neutralized_text' in result:
- neutralized_text = result['neutralized_text']
+ neutralizedText = result['neutralized_text']
# Encode back to bytes using the same encoding
try:
- return neutralized_text.encode('utf-8')
+ return neutralizedText.encode('utf-8')
except Exception as e:
logger.warning(f"Error encoding neutralized text: {str(e)}")
- return content_bytes
+ return contentBytes
else:
logger.warning("Neutralization did not return neutralized_text")
- return content_bytes
+ return contentBytes
except Exception as e:
logger.error(f"Error during content neutralization: {str(e)}")
# Return original content on error
- return content_bytes
+ return contentBytes