diff --git a/app.py b/app.py index 5caa64f8..a86f0d37 100644 --- a/app.py +++ b/app.py @@ -309,6 +309,9 @@ app.include_router(connectionsRouter) from modules.routes.routeWorkflows import router as workflowRouter app.include_router(workflowRouter) +from modules.routes.routeChatPlayground import router as chatPlaygroundRouter +app.include_router(chatPlaygroundRouter) + from modules.routes.routeSecurityLocal import router as localRouter app.include_router(localRouter) diff --git a/modules/features/chatPlayground/mainChatPlayground.py b/modules/features/chatPlayground/mainChatPlayground.py index 13eba835..07d43043 100644 --- a/modules/features/chatPlayground/mainChatPlayground.py +++ b/modules/features/chatPlayground/mainChatPlayground.py @@ -8,12 +8,24 @@ from modules.shared.timezoneUtils import get_utc_timestamp logger = logging.getLogger(__name__) -async def chatStart(interfaceChat, currentUser: User, userInput: UserInputRequest, workflowId: Optional[str] = None) -> ChatWorkflow: - """Starts a new chat or continues an existing one, then launches processing asynchronously.""" +async def chatStart(interfaceChat, currentUser: User, userInput: UserInputRequest, workflowId: Optional[str] = None, workflowMode: str = "Actionplan") -> ChatWorkflow: + """ + Starts a new chat or continues an existing one, then launches processing asynchronously. + + Args: + interfaceChat: Chat interface instance + currentUser: Current user + userInput: User input request + workflowId: Optional workflow ID to continue existing workflow + workflowMode: "Actionplan" for traditional task planning, "React" for iterative react-style processing + + Example usage for React mode: + workflow = await chatStart(interfaceChat, currentUser, userInput, workflowMode="React") + """ try: from modules.workflows.workflowManager import WorkflowManager workflowManager = WorkflowManager(interfaceChat, currentUser) - return await workflowManager.workflowStart(userInput, workflowId) + return await workflowManager.workflowStart(userInput, workflowId, workflowMode) except Exception as e: logger.error(f"Error starting chat: {str(e)}") raise diff --git a/modules/interfaces/interfaceChatObjects.py b/modules/interfaces/interfaceChatObjects.py index 1b0a2af5..a79b2284 100644 --- a/modules/interfaces/interfaceChatObjects.py +++ b/modules/interfaces/interfaceChatObjects.py @@ -270,7 +270,9 @@ class ChatObjects: logs=[], messages=[], stats=None, - mandateId=created.get("mandateId", self.currentUser.mandateId) + mandateId=created.get("mandateId", self.currentUser.mandateId), + workflowMode=created.get("workflowMode", "Actionplan"), + maxSteps=created.get("maxSteps", 1) ) def updateWorkflow(self, workflowId: str, workflowData: Dict[str, Any]) -> ChatWorkflow: @@ -885,6 +887,58 @@ class ChatObjects: stats.sort(key=lambda x: x.get("created_at", ""), reverse=True) return ChatStat(**stats[0]) + def updateWorkflowStats(self, workflowId: str, bytesSent: int = 0, bytesReceived: int = 0, tokenCount: int = 0) -> None: + """ + Updates workflow statistics in the database. + + Args: + workflowId: ID of the workflow to update + bytesSent: Bytes sent (incremental) + bytesReceived: Bytes received (incremental) + tokenCount: Token count (incremental, default 0) + """ + try: + # Check workflow access first + workflow = self.getWorkflow(workflowId) + if not workflow: + logger.warning(f"No access to workflow {workflowId} for stats update") + return + + if not self._canModify(ChatWorkflow, workflowId): + logger.warning(f"No permission to modify workflow {workflowId} for stats update") + return + + # Get existing stats or create new ones + existing_stats = self.getWorkflowStats(workflowId) + + if existing_stats: + # Update existing stats + updated_stats = { + "bytesSent": (existing_stats.bytesSent or 0) + bytesSent, + "bytesReceived": (existing_stats.bytesReceived or 0) + bytesReceived, + "tokenCount": (existing_stats.tokenCount or 0) + tokenCount, + "lastUpdated": get_utc_timestamp() + } + + # Update the stats record + self.db.recordModify(ChatStat, existing_stats.id, updated_stats) + else: + # Create new stats record + new_stats = { + "workflowId": workflowId, + "bytesSent": bytesSent, + "bytesReceived": bytesReceived, + "tokenCount": tokenCount, + "lastUpdated": get_utc_timestamp() + } + + self.db.recordCreate(ChatStat, new_stats) + + logger.debug(f"Updated workflow stats for {workflowId}: +{bytesSent} sent, +{bytesReceived} received, +{tokenCount} tokens") + + except Exception as e: + logger.error(f"Error updating workflow stats for {workflowId}: {str(e)}") + def getUnifiedChatData(self, workflowId: str, afterTimestamp: Optional[float] = None) -> Dict[str, Any]: """ Returns unified chat data (messages, logs, stats) for a workflow in chronological order. diff --git a/modules/routes/routeChatPlayground.py b/modules/routes/routeChatPlayground.py index 24bc91a3..186e65a8 100644 --- a/modules/routes/routeChatPlayground.py +++ b/modules/routes/routeChatPlayground.py @@ -44,19 +44,23 @@ def getServiceChat(currentUser: User): async def start_workflow( request: Request, workflowId: Optional[str] = Query(None, description="Optional ID of the workflow to continue"), + workflowMode: str = Query("Actionplan", description="Workflow mode: 'Actionplan' or 'React'"), userInput: UserInputRequest = Body(...), currentUser: User = Depends(getCurrentUser) ) -> ChatWorkflow: """ Starts a new workflow or continues an existing one. Corresponds to State 1 in the state machine documentation. + + Args: + workflowMode: "Actionplan" for traditional task planning, "React" for iterative react-style processing """ try: # Get service center interfaceChat = getServiceChat(currentUser) # Start or continue workflow using playground controller - workflow = await chatStart(interfaceChat, currentUser, userInput, workflowId) + workflow = await chatStart(interfaceChat, currentUser, userInput, workflowId, workflowMode) return workflow diff --git a/modules/routes/routeSecurityAdmin.py b/modules/routes/routeSecurityAdmin.py index 2b509a0e..a7203965 100644 --- a/modules/routes/routeSecurityAdmin.py +++ b/modules/routes/routeSecurityAdmin.py @@ -13,7 +13,7 @@ logger = logging.getLogger(__name__) router = APIRouter( prefix="/api/admin", - tags=["Admin"], + tags=["Security Administration"], responses={ 404: {"description": "Not found"}, 400: {"description": "Bad request"}, @@ -248,9 +248,145 @@ async def list_databases( currentUser: User = Depends(getCurrentUser) ) -> Dict[str, Any]: _ensure_admin_scope(currentUser) - # For safety, expose only configured database name - db_name = APP_CONFIG.get("DB_DATABASE") or APP_CONFIG.get("DB_NAME") or "poweron" - return {"databases": [db_name]} + + # Get database names from configuration for each interface + databases = [] + + # App database (interfaceAppObjects.py) + app_db = APP_CONFIG.get("DB_APP_DATABASE") + if app_db: + databases.append(app_db) + + # Chat database (interfaceChatObjects.py) + chat_db = APP_CONFIG.get("DB_CHAT_DATABASE") + if chat_db: + databases.append(chat_db) + + # Management database (interfaceComponentObjects.py) + management_db = APP_CONFIG.get("DB_MANAGEMENT_DATABASE") + if management_db: + databases.append(management_db) + + # Fallback to default if no databases configured + if not databases: + databases = ["poweron"] + + return {"databases": databases} + + +@router.get("/databases/{database_name}/tables") +@limiter.limit("30/minute") +async def get_database_tables( + request: Request, + database_name: str, + currentUser: User = Depends(getCurrentUser) +) -> Dict[str, Any]: + _ensure_admin_scope(currentUser) + + # Get all configured database names + configured_dbs = [] + app_db = APP_CONFIG.get("DB_APP_DATABASE") + if app_db: + configured_dbs.append(app_db) + chat_db = APP_CONFIG.get("DB_CHAT_DATABASE") + if chat_db: + configured_dbs.append(chat_db) + management_db = APP_CONFIG.get("DB_MANAGEMENT_DATABASE") + if management_db: + configured_dbs.append(management_db) + + if not configured_dbs: + configured_dbs = ["poweron"] + + if database_name not in configured_dbs: + raise HTTPException(status_code=400, detail=f"Invalid database name. Available databases: {configured_dbs}") + + try: + # Use the appropriate interface based on database name + if database_name == app_db: + appInterface = getRootInterface() + tables = appInterface.db.getTables() + elif database_name == chat_db: + from modules.interfaces.interfaceChatObjects import getInterface as getChatInterface + chatInterface = getChatInterface(currentUser) + tables = chatInterface.db.getTables() + elif database_name == management_db: + from modules.interfaces.interfaceComponentObjects import getInterface as getComponentInterface + componentInterface = getComponentInterface(currentUser) + tables = componentInterface.db.getTables() + else: + raise HTTPException(status_code=400, detail="Database not found") + + return {"tables": tables} + except Exception as e: + logger.error(f"Error getting database tables: {str(e)}") + raise HTTPException(status_code=500, detail="Failed to get database tables") + + +@router.post("/databases/{database_name}/tables/{table_name}/drop") +@limiter.limit("10/minute") +async def drop_table( + request: Request, + database_name: str, + table_name: str, + currentUser: User = Depends(getCurrentUser), + payload: Dict[str, Any] = Body(...) +) -> Dict[str, Any]: + _ensure_admin_scope(currentUser) + + # Get all configured database names + configured_dbs = [] + app_db = APP_CONFIG.get("DB_APP_DATABASE") + if app_db: + configured_dbs.append(app_db) + chat_db = APP_CONFIG.get("DB_CHAT_DATABASE") + if chat_db: + configured_dbs.append(chat_db) + management_db = APP_CONFIG.get("DB_MANAGEMENT_DATABASE") + if management_db: + configured_dbs.append(management_db) + + if not configured_dbs: + configured_dbs = ["poweron"] + + if database_name not in configured_dbs: + raise HTTPException(status_code=400, detail=f"Invalid database name. Available databases: {configured_dbs}") + + try: + # Use the appropriate interface based on database name + if database_name == app_db: + interface = getRootInterface() + elif database_name == chat_db: + from modules.interfaces.interfaceChatObjects import getInterface as getChatInterface + interface = getChatInterface(currentUser) + elif database_name == management_db: + from modules.interfaces.interfaceComponentObjects import getInterface as getComponentInterface + interface = getComponentInterface(currentUser) + else: + raise HTTPException(status_code=400, detail="Database not found") + + conn = interface.db.connection + with conn.cursor() as cursor: + # Check if table exists + cursor.execute(""" + SELECT table_name FROM information_schema.tables + WHERE table_schema = 'public' AND table_name = %s + """, (table_name,)) + if not cursor.fetchone(): + raise HTTPException(status_code=404, detail="Table not found") + + # Drop the table + cursor.execute(f'DROP TABLE IF EXISTS "{table_name}" CASCADE') + conn.commit() + logger.warning(f"Admin drop_table executed by {currentUser.id}: dropped table '{table_name}' from database '{database_name}'") + return {"message": f"Table '{table_name}' dropped successfully from database '{database_name}'"} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error dropping table: {str(e)}") + if 'interface' in locals() and interface and interface.db and interface.db.connection: + interface.db.connection.rollback() + raise HTTPException(status_code=500, detail="Failed to drop table") @router.post("/databases/drop") @@ -262,13 +398,39 @@ async def drop_database( ) -> Dict[str, Any]: _ensure_admin_scope(currentUser) db_name = payload.get("database") - configured_db = APP_CONFIG.get("DB_DATABASE") or APP_CONFIG.get("DB_NAME") or "poweron" - if not db_name or db_name != configured_db: - raise HTTPException(status_code=400, detail="Invalid database name") + + # Get all configured database names + configured_dbs = [] + app_db = APP_CONFIG.get("DB_APP_DATABASE") + if app_db: + configured_dbs.append(app_db) + chat_db = APP_CONFIG.get("DB_CHAT_DATABASE") + if chat_db: + configured_dbs.append(chat_db) + management_db = APP_CONFIG.get("DB_MANAGEMENT_DATABASE") + if management_db: + configured_dbs.append(management_db) + + if not configured_dbs: + configured_dbs = ["poweron"] + + if not db_name or db_name not in configured_dbs: + raise HTTPException(status_code=400, detail=f"Invalid database name. Available databases: {configured_dbs}") try: - appInterface = getRootInterface() - conn = appInterface.db.connection + # Use the appropriate interface based on database name + if db_name == app_db: + interface = getRootInterface() + elif db_name == chat_db: + from modules.interfaces.interfaceChatObjects import getInterface as getChatInterface + interface = getChatInterface(currentUser) + elif db_name == management_db: + from modules.interfaces.interfaceComponentObjects import getInterface as getComponentInterface + interface = getComponentInterface(currentUser) + else: + raise HTTPException(status_code=400, detail="Database not found") + + conn = interface.db.connection with conn.cursor() as cursor: # Drop all user tables (public schema) except system table cursor.execute(""" @@ -281,12 +443,12 @@ async def drop_database( cursor.execute(f'DROP TABLE IF EXISTS "{tbl}" CASCADE') dropped.append(tbl) conn.commit() - logger.warning(f"Admin drop_database executed by {currentUser.id}: dropped tables: {dropped}") + logger.warning(f"Admin drop_database executed by {currentUser.id}: dropped tables from '{db_name}': {dropped}") return {"droppedTables": dropped} except Exception as e: logger.error(f"Error dropping database tables: {str(e)}") - if appInterface and appInterface.db and appInterface.db.connection: - appInterface.db.connection.rollback() + if 'interface' in locals() and interface and interface.db and interface.db.connection: + interface.db.connection.rollback() raise HTTPException(status_code=500, detail="Failed to drop database tables") diff --git a/modules/services/serviceCenter.py b/modules/services/serviceCenter.py index 85b04ed4..0b999772 100644 --- a/modules/services/serviceCenter.py +++ b/modules/services/serviceCenter.py @@ -47,10 +47,10 @@ class ServiceCenter: self._discoverMethods() def _discoverMethods(self): - """Dynamically discover all method classes and their actions in modules.methods package""" + """Dynamically discover all method classes and their actions in modules methods package""" try: # Import the methods package - methodsPackage = importlib.import_module('modules.methods') + methodsPackage = importlib.import_module('modules.workflows.methods') # Discover all modules in the package for _, name, isPkg in pkgutil.iter_modules(methodsPackage.__path__): diff --git a/modules/workflows/_transfer/handlingTasks.py b/modules/workflows/_transfer/handlingTasks.py index 5346e2d1..7e64be58 100644 --- a/modules/workflows/_transfer/handlingTasks.py +++ b/modules/workflows/_transfer/handlingTasks.py @@ -567,7 +567,10 @@ class HandlingTasks: state = TaskExecutionState(task_step) # React mode path - check workflow mode instead of context - if isinstance(context, TaskContext) and hasattr(context, 'workflow') and context.workflow and getattr(context.workflow, 'workflowMode', 'Actionplan') == 'React': + workflow_mode = getattr(context.workflow, 'workflowMode', 'Actionplan') if context.workflow else 'Actionplan' + logger.info(f"Task execution - workflow mode: {workflow_mode}") + if isinstance(context, TaskContext) and hasattr(context, 'workflow') and context.workflow and workflow_mode == 'React': + logger.info(f"Using React mode execution with max_steps: {getattr(context.workflow, 'maxSteps', 5)}") state.max_steps = max(1, int(getattr(context.workflow, 'maxSteps', 5))) step = 1 last_review_dict = None @@ -579,6 +582,7 @@ class HandlingTasks: try: t0 = time.time() selection = await self.plan_select(context) + logger.info(f"React step {step}: Selected action: {selection}") result = await self.act_execute(context, selection, task_step, workflow, step) observation = self.observe_build(result) # Attach deterministic label for clarity @@ -630,6 +634,10 @@ class HandlingTasks: feedback=feedback, error=None if success else feedback ) + else: + # Actionplan mode execution + logger.info(f"Using Actionplan mode execution") + retry_context = context max_retries = state.max_retries for attempt in range(max_retries): diff --git a/modules/workflows/_transfer/promptFactory.py b/modules/workflows/_transfer/promptFactory.py index 3cf3f5b5..38686c57 100644 --- a/modules/workflows/_transfer/promptFactory.py +++ b/modules/workflows/_transfer/promptFactory.py @@ -887,9 +887,17 @@ def createActionParameterPrompt(context: TaskContext, selected_action: Dict[str, method = selected_action.get('method', '') if selected_action else '' name = selected_action.get('name', '') if selected_action else '' available_docs = _getAvailableDocuments(context.workflow) if context and context.workflow else "No documents available" + + # Get action signature from service center + action_signature = "" + if service and hasattr(service, 'methods') and method in service.methods: + method_instance = service.methods[method]['instance'] + action_signature = method_instance.getActionSignature(name) + return f"""Provide only the required parameters for this action. SELECTED ACTION: {method}.{name} +ACTION SIGNATURE: {action_signature} OBJECTIVE: {context.task_step.objective if context and context.task_step else ''} AVAILABLE DOCUMENTS: {available_docs} USER LANGUAGE: {user_language} @@ -899,6 +907,8 @@ RULES: - Include user language if relevant. - Reference documents only by exact labels available. - Avoid unnecessary fields; host applies defaults. +- Use the ACTION SIGNATURE above to understand what parameters are required. +- Convert the objective into appropriate parameter values as needed. RESPONSE FORMAT (JSON only): {{"parameters":{{}}}} diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py index 0d34b5b9..7caf958a 100644 --- a/modules/workflows/workflowManager.py +++ b/modules/workflows/workflowManager.py @@ -21,10 +21,14 @@ class WorkflowManager: self.chatInterface = chatInterface self.currentUser = currentUser self.handlingTasks = None - - async def workflowStart(self, userInput: UserInputRequest, workflowId: Optional[str] = None) -> ChatWorkflow: + + # Exported functions + + async def workflowStart(self, userInput: UserInputRequest, workflowId: Optional[str] = None, workflowMode: str = "Actionplan") -> ChatWorkflow: """Starts a new workflow or continues an existing one, then launches processing.""" try: + # Debug log to check workflowMode parameter + logger.info(f"WorkflowManager received workflowMode: {workflowMode}") currentTime = get_utc_timestamp() if workflowId: @@ -80,6 +84,8 @@ class WorkflowManager: "totalActions": 0, "mandateId": self.chatInterface.mandateId, "messageIds": [], + "workflowMode": workflowMode, + "maxSteps": 5 if workflowMode == "React" else 1, # Set maxSteps for React mode "stats": { "processingTime": None, "tokenCount": None, @@ -91,6 +97,8 @@ class WorkflowManager: } workflow = self.chatInterface.createWorkflow(workflowData) + logger.info(f"Created workflow with mode: {getattr(workflow, 'workflowMode', 'NOT_SET')}") + logger.info(f"Workflow data passed: {workflowData.get('workflowMode', 'NOT_IN_DATA')}") workflow.currentRound = 1 self.chatInterface.updateWorkflow(workflow.id, {"currentRound": 1}) self.chatInterface.updateWorkflowStats(workflow.id, bytesSent=0, bytesReceived=0) @@ -127,7 +135,9 @@ class WorkflowManager: except Exception as e: logger.error(f"Error stopping workflow: {str(e)}") raise - + + # Main processor + async def _workflowProcess(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> None: """Process a workflow with user input""" try: @@ -143,7 +153,9 @@ class WorkflowManager: except Exception as e: self._handleWorkflowError(workflow, e) - + + # Helper functions + async def _sendFirstMessage(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> ChatMessage: """Send first message to start workflow""" try: @@ -205,7 +217,9 @@ class WorkflowManager: task_plan = await handling.generateTaskPlan(userInput.prompt, workflow) if not task_plan or not task_plan.tasks: raise Exception("No tasks generated in task plan.") - logger.info(f"Executing workflow mode={getattr(workflow, 'workflowMode', 'Actionplan')} with {len(task_plan.tasks)} tasks") + workflow_mode = getattr(workflow, 'workflowMode', 'Actionplan') + logger.info(f"Workflow object attributes: {workflow.__dict__ if hasattr(workflow, '__dict__') else 'No __dict__'}") + logger.info(f"Executing workflow mode={workflow_mode} with {len(task_plan.tasks)} tasks") return task_plan async def _executeTasks(self, task_plan, workflow: ChatWorkflow) -> WorkflowResult: