From 1019cb7a6511240c19148374bb17311f998983e2 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 23 Sep 2025 00:36:24 +0200
Subject: [PATCH] CHAT 2.0 - Iterative mode
---
app.py | 3 +
.../chatPlayground/mainChatPlayground.py | 18 +-
modules/interfaces/interfaceChatObjects.py | 56 +++++-
modules/routes/routeChatPlayground.py | 6 +-
modules/routes/routeSecurityAdmin.py | 186 ++++++++++++++++--
modules/services/serviceCenter.py | 4 +-
modules/workflows/_transfer/handlingTasks.py | 10 +-
modules/workflows/_transfer/promptFactory.py | 10 +
modules/workflows/workflowManager.py | 24 ++-
9 files changed, 292 insertions(+), 25 deletions(-)
diff --git a/app.py b/app.py
index 5caa64f8..a86f0d37 100644
--- a/app.py
+++ b/app.py
@@ -309,6 +309,9 @@ app.include_router(connectionsRouter)
from modules.routes.routeWorkflows import router as workflowRouter
app.include_router(workflowRouter)
+from modules.routes.routeChatPlayground import router as chatPlaygroundRouter
+app.include_router(chatPlaygroundRouter)
+
from modules.routes.routeSecurityLocal import router as localRouter
app.include_router(localRouter)
diff --git a/modules/features/chatPlayground/mainChatPlayground.py b/modules/features/chatPlayground/mainChatPlayground.py
index 13eba835..07d43043 100644
--- a/modules/features/chatPlayground/mainChatPlayground.py
+++ b/modules/features/chatPlayground/mainChatPlayground.py
@@ -8,12 +8,24 @@ from modules.shared.timezoneUtils import get_utc_timestamp
logger = logging.getLogger(__name__)
-async def chatStart(interfaceChat, currentUser: User, userInput: UserInputRequest, workflowId: Optional[str] = None) -> ChatWorkflow:
- """Starts a new chat or continues an existing one, then launches processing asynchronously."""
+async def chatStart(interfaceChat, currentUser: User, userInput: UserInputRequest, workflowId: Optional[str] = None, workflowMode: str = "Actionplan") -> ChatWorkflow:
+ """
+ Starts a new chat or continues an existing one, then launches processing asynchronously.
+
+ Args:
+ interfaceChat: Chat interface instance
+ currentUser: Current user
+ userInput: User input request
+ workflowId: Optional workflow ID to continue existing workflow
+ workflowMode: "Actionplan" for traditional task planning, "React" for iterative react-style processing
+
+ Example usage for React mode:
+ workflow = await chatStart(interfaceChat, currentUser, userInput, workflowMode="React")
+ """
try:
from modules.workflows.workflowManager import WorkflowManager
workflowManager = WorkflowManager(interfaceChat, currentUser)
- return await workflowManager.workflowStart(userInput, workflowId)
+ return await workflowManager.workflowStart(userInput, workflowId, workflowMode)
except Exception as e:
logger.error(f"Error starting chat: {str(e)}")
raise
diff --git a/modules/interfaces/interfaceChatObjects.py b/modules/interfaces/interfaceChatObjects.py
index 1b0a2af5..a79b2284 100644
--- a/modules/interfaces/interfaceChatObjects.py
+++ b/modules/interfaces/interfaceChatObjects.py
@@ -270,7 +270,9 @@ class ChatObjects:
logs=[],
messages=[],
stats=None,
- mandateId=created.get("mandateId", self.currentUser.mandateId)
+ mandateId=created.get("mandateId", self.currentUser.mandateId),
+ workflowMode=created.get("workflowMode", "Actionplan"),
+ maxSteps=created.get("maxSteps", 1)
)
def updateWorkflow(self, workflowId: str, workflowData: Dict[str, Any]) -> ChatWorkflow:
@@ -885,6 +887,58 @@ class ChatObjects:
stats.sort(key=lambda x: x.get("created_at", ""), reverse=True)
return ChatStat(**stats[0])
+ def updateWorkflowStats(self, workflowId: str, bytesSent: int = 0, bytesReceived: int = 0, tokenCount: int = 0) -> None:
+ """
+ Updates workflow statistics in the database.
+
+ Args:
+ workflowId: ID of the workflow to update
+ bytesSent: Bytes sent (incremental)
+ bytesReceived: Bytes received (incremental)
+ tokenCount: Token count (incremental, default 0)
+ """
+ try:
+ # Check workflow access first
+ workflow = self.getWorkflow(workflowId)
+ if not workflow:
+ logger.warning(f"No access to workflow {workflowId} for stats update")
+ return
+
+ if not self._canModify(ChatWorkflow, workflowId):
+ logger.warning(f"No permission to modify workflow {workflowId} for stats update")
+ return
+
+ # Get existing stats or create new ones
+ existing_stats = self.getWorkflowStats(workflowId)
+
+ if existing_stats:
+ # Update existing stats
+ updated_stats = {
+ "bytesSent": (existing_stats.bytesSent or 0) + bytesSent,
+ "bytesReceived": (existing_stats.bytesReceived or 0) + bytesReceived,
+ "tokenCount": (existing_stats.tokenCount or 0) + tokenCount,
+ "lastUpdated": get_utc_timestamp()
+ }
+
+ # Update the stats record
+ self.db.recordModify(ChatStat, existing_stats.id, updated_stats)
+ else:
+ # Create new stats record
+ new_stats = {
+ "workflowId": workflowId,
+ "bytesSent": bytesSent,
+ "bytesReceived": bytesReceived,
+ "tokenCount": tokenCount,
+ "lastUpdated": get_utc_timestamp()
+ }
+
+ self.db.recordCreate(ChatStat, new_stats)
+
+ logger.debug(f"Updated workflow stats for {workflowId}: +{bytesSent} sent, +{bytesReceived} received, +{tokenCount} tokens")
+
+ except Exception as e:
+ logger.error(f"Error updating workflow stats for {workflowId}: {str(e)}")
+
def getUnifiedChatData(self, workflowId: str, afterTimestamp: Optional[float] = None) -> Dict[str, Any]:
"""
Returns unified chat data (messages, logs, stats) for a workflow in chronological order.
diff --git a/modules/routes/routeChatPlayground.py b/modules/routes/routeChatPlayground.py
index 24bc91a3..186e65a8 100644
--- a/modules/routes/routeChatPlayground.py
+++ b/modules/routes/routeChatPlayground.py
@@ -44,19 +44,23 @@ def getServiceChat(currentUser: User):
async def start_workflow(
request: Request,
workflowId: Optional[str] = Query(None, description="Optional ID of the workflow to continue"),
+ workflowMode: str = Query("Actionplan", description="Workflow mode: 'Actionplan' or 'React'"),
userInput: UserInputRequest = Body(...),
currentUser: User = Depends(getCurrentUser)
) -> ChatWorkflow:
"""
Starts a new workflow or continues an existing one.
Corresponds to State 1 in the state machine documentation.
+
+ Args:
+ workflowMode: "Actionplan" for traditional task planning, "React" for iterative react-style processing
"""
try:
# Get service center
interfaceChat = getServiceChat(currentUser)
# Start or continue workflow using playground controller
- workflow = await chatStart(interfaceChat, currentUser, userInput, workflowId)
+ workflow = await chatStart(interfaceChat, currentUser, userInput, workflowId, workflowMode)
return workflow
diff --git a/modules/routes/routeSecurityAdmin.py b/modules/routes/routeSecurityAdmin.py
index 2b509a0e..a7203965 100644
--- a/modules/routes/routeSecurityAdmin.py
+++ b/modules/routes/routeSecurityAdmin.py
@@ -13,7 +13,7 @@ logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/api/admin",
- tags=["Admin"],
+ tags=["Security Administration"],
responses={
404: {"description": "Not found"},
400: {"description": "Bad request"},
@@ -248,9 +248,145 @@ async def list_databases(
currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
_ensure_admin_scope(currentUser)
- # For safety, expose only configured database name
- db_name = APP_CONFIG.get("DB_DATABASE") or APP_CONFIG.get("DB_NAME") or "poweron"
- return {"databases": [db_name]}
+
+ # Get database names from configuration for each interface
+ databases = []
+
+ # App database (interfaceAppObjects.py)
+ app_db = APP_CONFIG.get("DB_APP_DATABASE")
+ if app_db:
+ databases.append(app_db)
+
+ # Chat database (interfaceChatObjects.py)
+ chat_db = APP_CONFIG.get("DB_CHAT_DATABASE")
+ if chat_db:
+ databases.append(chat_db)
+
+ # Management database (interfaceComponentObjects.py)
+ management_db = APP_CONFIG.get("DB_MANAGEMENT_DATABASE")
+ if management_db:
+ databases.append(management_db)
+
+ # Fallback to default if no databases configured
+ if not databases:
+ databases = ["poweron"]
+
+ return {"databases": databases}
+
+
+@router.get("/databases/{database_name}/tables")
+@limiter.limit("30/minute")
+async def get_database_tables(
+ request: Request,
+ database_name: str,
+ currentUser: User = Depends(getCurrentUser)
+) -> Dict[str, Any]:
+ _ensure_admin_scope(currentUser)
+
+ # Get all configured database names
+ configured_dbs = []
+ app_db = APP_CONFIG.get("DB_APP_DATABASE")
+ if app_db:
+ configured_dbs.append(app_db)
+ chat_db = APP_CONFIG.get("DB_CHAT_DATABASE")
+ if chat_db:
+ configured_dbs.append(chat_db)
+ management_db = APP_CONFIG.get("DB_MANAGEMENT_DATABASE")
+ if management_db:
+ configured_dbs.append(management_db)
+
+ if not configured_dbs:
+ configured_dbs = ["poweron"]
+
+ if database_name not in configured_dbs:
+ raise HTTPException(status_code=400, detail=f"Invalid database name. Available databases: {configured_dbs}")
+
+ try:
+ # Use the appropriate interface based on database name
+ if database_name == app_db:
+ appInterface = getRootInterface()
+ tables = appInterface.db.getTables()
+ elif database_name == chat_db:
+ from modules.interfaces.interfaceChatObjects import getInterface as getChatInterface
+ chatInterface = getChatInterface(currentUser)
+ tables = chatInterface.db.getTables()
+ elif database_name == management_db:
+ from modules.interfaces.interfaceComponentObjects import getInterface as getComponentInterface
+ componentInterface = getComponentInterface(currentUser)
+ tables = componentInterface.db.getTables()
+ else:
+ raise HTTPException(status_code=400, detail="Database not found")
+
+ return {"tables": tables}
+ except Exception as e:
+ logger.error(f"Error getting database tables: {str(e)}")
+ raise HTTPException(status_code=500, detail="Failed to get database tables")
+
+
+@router.post("/databases/{database_name}/tables/{table_name}/drop")
+@limiter.limit("10/minute")
+async def drop_table(
+ request: Request,
+ database_name: str,
+ table_name: str,
+ currentUser: User = Depends(getCurrentUser),
+ payload: Dict[str, Any] = Body(...)
+) -> Dict[str, Any]:
+ _ensure_admin_scope(currentUser)
+
+ # Get all configured database names
+ configured_dbs = []
+ app_db = APP_CONFIG.get("DB_APP_DATABASE")
+ if app_db:
+ configured_dbs.append(app_db)
+ chat_db = APP_CONFIG.get("DB_CHAT_DATABASE")
+ if chat_db:
+ configured_dbs.append(chat_db)
+ management_db = APP_CONFIG.get("DB_MANAGEMENT_DATABASE")
+ if management_db:
+ configured_dbs.append(management_db)
+
+ if not configured_dbs:
+ configured_dbs = ["poweron"]
+
+ if database_name not in configured_dbs:
+ raise HTTPException(status_code=400, detail=f"Invalid database name. Available databases: {configured_dbs}")
+
+ try:
+ # Use the appropriate interface based on database name
+ if database_name == app_db:
+ interface = getRootInterface()
+ elif database_name == chat_db:
+ from modules.interfaces.interfaceChatObjects import getInterface as getChatInterface
+ interface = getChatInterface(currentUser)
+ elif database_name == management_db:
+ from modules.interfaces.interfaceComponentObjects import getInterface as getComponentInterface
+ interface = getComponentInterface(currentUser)
+ else:
+ raise HTTPException(status_code=400, detail="Database not found")
+
+ conn = interface.db.connection
+ with conn.cursor() as cursor:
+ # Check if table exists
+ cursor.execute("""
+ SELECT table_name FROM information_schema.tables
+ WHERE table_schema = 'public' AND table_name = %s
+ """, (table_name,))
+ if not cursor.fetchone():
+ raise HTTPException(status_code=404, detail="Table not found")
+
+ # Drop the table
+ cursor.execute(f'DROP TABLE IF EXISTS "{table_name}" CASCADE')
+ conn.commit()
+ logger.warning(f"Admin drop_table executed by {currentUser.id}: dropped table '{table_name}' from database '{database_name}'")
+ return {"message": f"Table '{table_name}' dropped successfully from database '{database_name}'"}
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Error dropping table: {str(e)}")
+ if 'interface' in locals() and interface and interface.db and interface.db.connection:
+ interface.db.connection.rollback()
+ raise HTTPException(status_code=500, detail="Failed to drop table")
@router.post("/databases/drop")
@@ -262,13 +398,39 @@ async def drop_database(
) -> Dict[str, Any]:
_ensure_admin_scope(currentUser)
db_name = payload.get("database")
- configured_db = APP_CONFIG.get("DB_DATABASE") or APP_CONFIG.get("DB_NAME") or "poweron"
- if not db_name or db_name != configured_db:
- raise HTTPException(status_code=400, detail="Invalid database name")
+
+ # Get all configured database names
+ configured_dbs = []
+ app_db = APP_CONFIG.get("DB_APP_DATABASE")
+ if app_db:
+ configured_dbs.append(app_db)
+ chat_db = APP_CONFIG.get("DB_CHAT_DATABASE")
+ if chat_db:
+ configured_dbs.append(chat_db)
+ management_db = APP_CONFIG.get("DB_MANAGEMENT_DATABASE")
+ if management_db:
+ configured_dbs.append(management_db)
+
+ if not configured_dbs:
+ configured_dbs = ["poweron"]
+
+ if not db_name or db_name not in configured_dbs:
+ raise HTTPException(status_code=400, detail=f"Invalid database name. Available databases: {configured_dbs}")
try:
- appInterface = getRootInterface()
- conn = appInterface.db.connection
+ # Use the appropriate interface based on database name
+ if db_name == app_db:
+ interface = getRootInterface()
+ elif db_name == chat_db:
+ from modules.interfaces.interfaceChatObjects import getInterface as getChatInterface
+ interface = getChatInterface(currentUser)
+ elif db_name == management_db:
+ from modules.interfaces.interfaceComponentObjects import getInterface as getComponentInterface
+ interface = getComponentInterface(currentUser)
+ else:
+ raise HTTPException(status_code=400, detail="Database not found")
+
+ conn = interface.db.connection
with conn.cursor() as cursor:
# Drop all user tables (public schema) except system table
cursor.execute("""
@@ -281,12 +443,12 @@ async def drop_database(
cursor.execute(f'DROP TABLE IF EXISTS "{tbl}" CASCADE')
dropped.append(tbl)
conn.commit()
- logger.warning(f"Admin drop_database executed by {currentUser.id}: dropped tables: {dropped}")
+ logger.warning(f"Admin drop_database executed by {currentUser.id}: dropped tables from '{db_name}': {dropped}")
return {"droppedTables": dropped}
except Exception as e:
logger.error(f"Error dropping database tables: {str(e)}")
- if appInterface and appInterface.db and appInterface.db.connection:
- appInterface.db.connection.rollback()
+ if 'interface' in locals() and interface and interface.db and interface.db.connection:
+ interface.db.connection.rollback()
raise HTTPException(status_code=500, detail="Failed to drop database tables")
diff --git a/modules/services/serviceCenter.py b/modules/services/serviceCenter.py
index 85b04ed4..0b999772 100644
--- a/modules/services/serviceCenter.py
+++ b/modules/services/serviceCenter.py
@@ -47,10 +47,10 @@ class ServiceCenter:
self._discoverMethods()
def _discoverMethods(self):
- """Dynamically discover all method classes and their actions in modules.methods package"""
+ """Dynamically discover all method classes and their actions in modules methods package"""
try:
# Import the methods package
- methodsPackage = importlib.import_module('modules.methods')
+ methodsPackage = importlib.import_module('modules.workflows.methods')
# Discover all modules in the package
for _, name, isPkg in pkgutil.iter_modules(methodsPackage.__path__):
diff --git a/modules/workflows/_transfer/handlingTasks.py b/modules/workflows/_transfer/handlingTasks.py
index 5346e2d1..7e64be58 100644
--- a/modules/workflows/_transfer/handlingTasks.py
+++ b/modules/workflows/_transfer/handlingTasks.py
@@ -567,7 +567,10 @@ class HandlingTasks:
state = TaskExecutionState(task_step)
# React mode path - check workflow mode instead of context
- if isinstance(context, TaskContext) and hasattr(context, 'workflow') and context.workflow and getattr(context.workflow, 'workflowMode', 'Actionplan') == 'React':
+ workflow_mode = getattr(context.workflow, 'workflowMode', 'Actionplan') if context.workflow else 'Actionplan'
+ logger.info(f"Task execution - workflow mode: {workflow_mode}")
+ if isinstance(context, TaskContext) and hasattr(context, 'workflow') and context.workflow and workflow_mode == 'React':
+ logger.info(f"Using React mode execution with max_steps: {getattr(context.workflow, 'maxSteps', 5)}")
state.max_steps = max(1, int(getattr(context.workflow, 'maxSteps', 5)))
step = 1
last_review_dict = None
@@ -579,6 +582,7 @@ class HandlingTasks:
try:
t0 = time.time()
selection = await self.plan_select(context)
+ logger.info(f"React step {step}: Selected action: {selection}")
result = await self.act_execute(context, selection, task_step, workflow, step)
observation = self.observe_build(result)
# Attach deterministic label for clarity
@@ -630,6 +634,10 @@ class HandlingTasks:
feedback=feedback,
error=None if success else feedback
)
+ else:
+ # Actionplan mode execution
+ logger.info(f"Using Actionplan mode execution")
+
retry_context = context
max_retries = state.max_retries
for attempt in range(max_retries):
diff --git a/modules/workflows/_transfer/promptFactory.py b/modules/workflows/_transfer/promptFactory.py
index 3cf3f5b5..38686c57 100644
--- a/modules/workflows/_transfer/promptFactory.py
+++ b/modules/workflows/_transfer/promptFactory.py
@@ -887,9 +887,17 @@ def createActionParameterPrompt(context: TaskContext, selected_action: Dict[str,
method = selected_action.get('method', '') if selected_action else ''
name = selected_action.get('name', '') if selected_action else ''
available_docs = _getAvailableDocuments(context.workflow) if context and context.workflow else "No documents available"
+
+ # Get action signature from service center
+ action_signature = ""
+ if service and hasattr(service, 'methods') and method in service.methods:
+ method_instance = service.methods[method]['instance']
+ action_signature = method_instance.getActionSignature(name)
+
return f"""Provide only the required parameters for this action.
SELECTED ACTION: {method}.{name}
+ACTION SIGNATURE: {action_signature}
OBJECTIVE: {context.task_step.objective if context and context.task_step else ''}
AVAILABLE DOCUMENTS: {available_docs}
USER LANGUAGE: {user_language}
@@ -899,6 +907,8 @@ RULES:
- Include user language if relevant.
- Reference documents only by exact labels available.
- Avoid unnecessary fields; host applies defaults.
+- Use the ACTION SIGNATURE above to understand what parameters are required.
+- Convert the objective into appropriate parameter values as needed.
RESPONSE FORMAT (JSON only):
{{"parameters":{{}}}}
diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py
index 0d34b5b9..7caf958a 100644
--- a/modules/workflows/workflowManager.py
+++ b/modules/workflows/workflowManager.py
@@ -21,10 +21,14 @@ class WorkflowManager:
self.chatInterface = chatInterface
self.currentUser = currentUser
self.handlingTasks = None
-
- async def workflowStart(self, userInput: UserInputRequest, workflowId: Optional[str] = None) -> ChatWorkflow:
+
+ # Exported functions
+
+ async def workflowStart(self, userInput: UserInputRequest, workflowId: Optional[str] = None, workflowMode: str = "Actionplan") -> ChatWorkflow:
"""Starts a new workflow or continues an existing one, then launches processing."""
try:
+ # Debug log to check workflowMode parameter
+ logger.info(f"WorkflowManager received workflowMode: {workflowMode}")
currentTime = get_utc_timestamp()
if workflowId:
@@ -80,6 +84,8 @@ class WorkflowManager:
"totalActions": 0,
"mandateId": self.chatInterface.mandateId,
"messageIds": [],
+ "workflowMode": workflowMode,
+ "maxSteps": 5 if workflowMode == "React" else 1, # Set maxSteps for React mode
"stats": {
"processingTime": None,
"tokenCount": None,
@@ -91,6 +97,8 @@ class WorkflowManager:
}
workflow = self.chatInterface.createWorkflow(workflowData)
+ logger.info(f"Created workflow with mode: {getattr(workflow, 'workflowMode', 'NOT_SET')}")
+ logger.info(f"Workflow data passed: {workflowData.get('workflowMode', 'NOT_IN_DATA')}")
workflow.currentRound = 1
self.chatInterface.updateWorkflow(workflow.id, {"currentRound": 1})
self.chatInterface.updateWorkflowStats(workflow.id, bytesSent=0, bytesReceived=0)
@@ -127,7 +135,9 @@ class WorkflowManager:
except Exception as e:
logger.error(f"Error stopping workflow: {str(e)}")
raise
-
+
+ # Main processor
+
async def _workflowProcess(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> None:
"""Process a workflow with user input"""
try:
@@ -143,7 +153,9 @@ class WorkflowManager:
except Exception as e:
self._handleWorkflowError(workflow, e)
-
+
+ # Helper functions
+
async def _sendFirstMessage(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> ChatMessage:
"""Send first message to start workflow"""
try:
@@ -205,7 +217,9 @@ class WorkflowManager:
task_plan = await handling.generateTaskPlan(userInput.prompt, workflow)
if not task_plan or not task_plan.tasks:
raise Exception("No tasks generated in task plan.")
- logger.info(f"Executing workflow mode={getattr(workflow, 'workflowMode', 'Actionplan')} with {len(task_plan.tasks)} tasks")
+ workflow_mode = getattr(workflow, 'workflowMode', 'Actionplan')
+ logger.info(f"Workflow object attributes: {workflow.__dict__ if hasattr(workflow, '__dict__') else 'No __dict__'}")
+ logger.info(f"Executing workflow mode={workflow_mode} with {len(task_plan.tasks)} tasks")
return task_plan
async def _executeTasks(self, task_plan, workflow: ChatWorkflow) -> WorkflowResult: