diff --git a/app.py b/app.py index 02e05076..5caa64f8 100644 --- a/app.py +++ b/app.py @@ -205,14 +205,10 @@ async def lifespan(app: FastAPI): # Startup logic logger.info("Application is starting up") - # Initialize root interface to ensure database is properly set up - from modules.interfaces.interfaceAppObjects import getRootInterface - getRootInterface() - # Setup APScheduler for JIRA sync scheduler = AsyncIOScheduler(timezone=ZoneInfo("Europe/Zurich")) try: - from modules.features.featureSyncDelta import perform_sync_jira_delta_group + from modules.features.syncDelta.mainSyncDelta import perform_sync_jira_delta_group # Schedule sync every 20 minutes (at minutes 00, 20, 40) scheduler.add_job( perform_sync_jira_delta_group, diff --git a/modules/chat/managerChat.py b/modules/chat/managerChat.py deleted file mode 100644 index 882d46e3..00000000 --- a/modules/chat/managerChat.py +++ /dev/null @@ -1,115 +0,0 @@ -import logging -from typing import Dict, Any, List -from modules.interfaces.interfaceAppModel import User -from modules.interfaces.interfaceChatModel import ChatWorkflow, UserInputRequest, TaskStep, TaskAction, ActionResult, ReviewResult, TaskPlan, WorkflowResult, TaskContext -from modules.interfaces.interfaceChatObjects import ChatObjects -from modules.chat.handling.handlingTasks import HandlingTasks, WorkflowStoppedException - -logger = logging.getLogger(__name__) - -# ===== STATE MANAGEMENT AND VALIDATION CLASSES ===== - -class ChatManager: - """Chat manager with improved AI integration and method handling""" - - def __init__(self, currentUser: User, chatInterface: ChatObjects): - self.currentUser = currentUser - self.chatInterface = chatInterface - self.workflow: ChatWorkflow = None - self.handlingTasks: HandlingTasks = None - - async def initialize(self, workflow: ChatWorkflow) -> None: - """Initialize chat manager with workflow""" - self.workflow = workflow - self.handlingTasks = HandlingTasks(self.chatInterface, self.currentUser, self.workflow) - - - async def executeUnifiedWorkflow(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> WorkflowResult: - """Unified Workflow Execution""" - try: - logger.info(f"Starting unified workflow execution for workflow {workflow.id}") - - # Phase 1: High-Level Task Planning - logger.info("Phase 1: Generating task plan") - task_plan = await self.handlingTasks.generateTaskPlan(userInput.prompt, workflow) - if not task_plan or not task_plan.tasks: - raise Exception("No tasks generated in task plan.") - - # Phase 2-5: For each task, execute and get results - total_tasks = len(task_plan.tasks) - logger.info(f"Phase 2: Executing {total_tasks} tasks") - all_task_results = [] - previous_results = [] - for idx, task_step in enumerate(task_plan.tasks): - # Pass task index to executeTask method - current_task_index = idx + 1 - - logger.info(f"Task {idx+1}/{total_tasks}: {task_step.objective}") - - # Create proper context object for this task - task_context = TaskContext( - task_step=task_step, - workflow=workflow, - workflow_id=workflow.id, - available_documents=None, - available_connections=None, - previous_results=previous_results, - previous_handover=None, - improvements=[], - retry_count=0, - previous_action_results=[], - previous_review_result=None, - is_regeneration=False, - failure_patterns=[], - failed_actions=[], - successful_actions=[], - criteria_progress={ - 'met_criteria': set(), - 'unmet_criteria': set(), - 'attempt_history': [] - } - ) - - # Execute task (this handles action generation, execution, and review internally) - task_result = await self.handlingTasks.executeTask(task_step, workflow, task_context, current_task_index, total_tasks) - # Handover - handover_data = await self.handlingTasks.prepareTaskHandover(task_step, [], task_result, workflow) - # Collect results - all_task_results.append({ - 'task_step': task_step, - 'task_result': task_result, - 'handover_data': handover_data - }) - # Update previous results for next task - if task_result.success and task_result.feedback: - previous_results.append(task_result.feedback) - - # Final workflow result - workflow_result = WorkflowResult( - status="completed", - completed_tasks=len(all_task_results), - total_tasks=len(task_plan.tasks), - execution_time=0.0, # TODO: Calculate actual execution time - final_results_count=len(all_task_results) - ) - logger.info(f"Unified workflow execution completed successfully for workflow {workflow.id}") - return workflow_result - except WorkflowStoppedException: - logger.info(f"Workflow {workflow.id} was stopped by user") - return WorkflowResult( - status="stopped", - completed_tasks=0, - total_tasks=0, - execution_time=0.0, - final_results_count=0 - ) - except Exception as e: - logger.error(f"Error in executeUnifiedWorkflow: {str(e)}") - return WorkflowResult( - status="failed", - completed_tasks=0, - total_tasks=0, - execution_time=0.0, - final_results_count=0, - error=str(e) - ) diff --git a/modules/features/chatPlayground/mainChatPlayground.py b/modules/features/chatPlayground/mainChatPlayground.py new file mode 100644 index 00000000..13eba835 --- /dev/null +++ b/modules/features/chatPlayground/mainChatPlayground.py @@ -0,0 +1,29 @@ +import logging +import asyncio +from typing import Optional + +from modules.interfaces.interfaceAppModel import User +from modules.interfaces.interfaceChatModel import ChatWorkflow, UserInputRequest +from modules.shared.timezoneUtils import get_utc_timestamp + +logger = logging.getLogger(__name__) + +async def chatStart(interfaceChat, currentUser: User, userInput: UserInputRequest, workflowId: Optional[str] = None) -> ChatWorkflow: + """Starts a new chat or continues an existing one, then launches processing asynchronously.""" + try: + from modules.workflows.workflowManager import WorkflowManager + workflowManager = WorkflowManager(interfaceChat, currentUser) + return await workflowManager.workflowStart(userInput, workflowId) + except Exception as e: + logger.error(f"Error starting chat: {str(e)}") + raise + +async def chatStop(interfaceChat, currentUser: User, workflowId: str) -> ChatWorkflow: + """Stops a running chat.""" + try: + from modules.workflows.workflowManager import WorkflowManager + workflowManager = WorkflowManager(interfaceChat, currentUser) + return await workflowManager.workflowStop(workflowId) + except Exception as e: + logger.error(f"Error stopping chat: {str(e)}") + raise diff --git a/modules/features/featureNeutralizePlayground.py b/modules/features/neutralizePlayground/mainNeutralizePlayground.py similarity index 99% rename from modules/features/featureNeutralizePlayground.py rename to modules/features/neutralizePlayground/mainNeutralizePlayground.py index e5c75a37..877ca8aa 100644 --- a/modules/features/featureNeutralizePlayground.py +++ b/modules/features/neutralizePlayground/mainNeutralizePlayground.py @@ -13,7 +13,7 @@ import mimetypes from modules.interfaces.interfaceAppObjects import getInterface from modules.interfaces.interfaceAppModel import User, DataNeutraliserConfig, DataNeutralizerAttributes -from modules.neutralizer.neutralizer import DataAnonymizer +from modules.services.serviceNeutralization.neutralizer import DataAnonymizer from modules.shared.timezoneUtils import get_utc_timestamp logger = logging.getLogger(__name__) diff --git a/modules/features/featureSyncDelta.py b/modules/features/syncDelta/mainSyncDelta.py similarity index 100% rename from modules/features/featureSyncDelta.py rename to modules/features/syncDelta/mainSyncDelta.py diff --git a/modules/interfaces/interfaceAiCalls.py b/modules/interfaces/interfaceAiCalls.py index f0bb67b4..6f0de9c9 100644 --- a/modules/interfaces/interfaceAiCalls.py +++ b/modules/interfaces/interfaceAiCalls.py @@ -2,7 +2,7 @@ import logging from typing import Dict, Any, List, Union, Optional from modules.connectors.connectorAiOpenai import AiOpenai, ContextLengthExceededException from modules.connectors.connectorAiAnthropic import AiAnthropic -from modules.chat.documents.documentExtraction import DocumentExtraction +from modules.services.serviceDocument.documentExtraction import DocumentExtraction from modules.interfaces.interfaceChatModel import ChatDocument logger = logging.getLogger(__name__) diff --git a/modules/interfaces/interfaceAppObjects.py b/modules/interfaces/interfaceAppObjects.py index ed8fdca1..ccd471f1 100644 --- a/modules/interfaces/interfaceAppObjects.py +++ b/modules/interfaces/interfaceAppObjects.py @@ -1141,7 +1141,7 @@ class AppObjects: def neutralizeText(self, text: str, file_id: Optional[str] = None) -> Dict[str, Any]: """Neutralize text content and store attribute mappings""" try: - from modules.neutralizer.neutralizer import DataAnonymizer + from modules.services.serviceNeutralization.neutralizer import DataAnonymizer # Get neutralization configuration to extract namesToParse config = self.getNeutralizationConfig() diff --git a/modules/interfaces/interfaceChatModel.py b/modules/interfaces/interfaceChatModel.py index ed71963a..9ead0fb2 100644 --- a/modules/interfaces/interfaceChatModel.py +++ b/modules/interfaces/interfaceChatModel.py @@ -80,6 +80,70 @@ register_model_labels( } ) +# ===== Minimal ReAct-style Workflow Models ===== + +class ActionSelection(BaseModel, ModelMixin): + """Model for selecting exactly one action in a step""" + method: str = Field(description="Method to execute (e.g., web, document, ai)") + name: str = Field(description="Action name within the method (e.g., search, extract)") + +register_model_labels( + "ActionSelection", + {"en": "Action Selection", "fr": "Sélection d'action"}, + { + "method": {"en": "Method", "fr": "Méthode"}, + "name": {"en": "Action Name", "fr": "Nom de l'action"} + } +) + +class ActionParameters(BaseModel, ModelMixin): + """Model for specifying only the parameters for the selected action""" + parameters: Dict[str, Any] = Field(default_factory=dict, description="Parameters to execute the selected action") + +register_model_labels( + "ActionParameters", + {"en": "Action Parameters", "fr": "Paramètres d'action"}, + { + "parameters": {"en": "Parameters", "fr": "Paramètres"} + } +) + +class ObservationPreview(BaseModel, ModelMixin): + """Compact preview item for observations""" + name: str = Field(description="Document name or URL label") + mime: str = Field(description="MIME type or kind") + snippet: str = Field(description="Short snippet or summary") + +register_model_labels( + "ObservationPreview", + {"en": "Observation Preview", "fr": "Aperçu d'observation"}, + { + "name": {"en": "Name", "fr": "Nom"}, + "mime": {"en": "MIME", "fr": "MIME"}, + "snippet": {"en": "Snippet", "fr": "Extrait"} + } +) + +class Observation(BaseModel, ModelMixin): + """Compact observation returned to the model after each action""" + success: bool = Field(description="Action execution success flag") + resultLabel: str = Field(description="Deterministic label for produced documents") + documentsCount: int = Field(description="Number of produced documents") + previews: List[ObservationPreview] = Field(default_factory=list, description="Compact previews of outputs") + notes: List[str] = Field(default_factory=list, description="Short notes or key facts") + +register_model_labels( + "Observation", + {"en": "Observation", "fr": "Observation"}, + { + "success": {"en": "Success", "fr": "Succès"}, + "resultLabel": {"en": "Result Label", "fr": "Étiquette du résultat"}, + "documentsCount": {"en": "Documents Count", "fr": "Nombre de documents"}, + "previews": {"en": "Previews", "fr": "Aperçus"}, + "notes": {"en": "Notes", "fr": "Notes"} + } +) + # ===== Base Enums and Simple Models ===== class TaskStatus(str, Enum): @@ -630,6 +694,25 @@ class ChatWorkflow(BaseModel, ModelMixin): frontend_readonly=True, frontend_required=False ) + # Workflow mode selection (e.g., Actionplan, React) + workflowMode: str = Field( + default="Actionplan", + description="Workflow mode selector", + frontend_type="select", + frontend_readonly=False, + frontend_required=False, + frontend_options=[ + {"value": "Actionplan", "label": {"en": "Action Plan", "fr": "Plan d'actions"}}, + {"value": "React", "label": {"en": "React", "fr": "Réactif"}} + ] + ) + maxSteps: int = Field( + default=5, + description="Maximum number of iterations in react mode", + frontend_type="integer", + frontend_readonly=False, + frontend_required=False + ) # Register labels for ChatWorkflow register_model_labels( @@ -650,11 +733,13 @@ register_model_labels( "logs": {"en": "Logs", "fr": "Journaux"}, "messages": {"en": "Messages", "fr": "Messages"}, "stats": {"en": "Statistics", "fr": "Statistiques"}, - "tasks": {"en": "Tasks", "fr": "Tâches"} + "tasks": {"en": "Tasks", "fr": "Tâches"}, + "workflowMode": {"en": "Workflow Mode", "fr": "Mode de workflow"}, + "maxSteps": {"en": "Max Steps", "fr": "Étapes max"} } ) -# ====== WORKFLOW SUPPORT MODELS (for managerChat.py compatibility) ====== +# ====== WORKFLOW SUPPORT MODELS ====== class TaskStep(BaseModel, ModelMixin): id: str @@ -763,6 +848,9 @@ class TaskContext(BaseModel, ModelMixin): # Criteria progress tracking for retries criteria_progress: Optional[dict] = None + # Iterative loop controls (moved to ChatWorkflow.workflowMode and ChatWorkflow.maxSteps) + # reactMode and maxSteps are now controlled at the workflow level + def getDocumentReferences(self) -> List[str]: """Get all available document references from previous handover""" docs = [] diff --git a/modules/interfaces/interfaceChatObjects.py b/modules/interfaces/interfaceChatObjects.py index 7b6806da..1b0a2af5 100644 --- a/modules/interfaces/interfaceChatObjects.py +++ b/modules/interfaces/interfaceChatObjects.py @@ -748,10 +748,9 @@ class ChatObjects: except Exception as e: logger.error(f"Error removing file {fileId} from message {messageId}: {str(e)}") return False - # Document methods - + def getDocuments(self, messageId: str) -> List[ChatDocument]: """Returns documents for a message from normalized table.""" try: @@ -910,7 +909,7 @@ class ChatObjects: msg_timestamp = msg.get("publishedAt", get_utc_timestamp()) if afterTimestamp is not None and msg_timestamp <= afterTimestamp: continue - + # Load documents for each message documents = self.getDocuments(msg["id"]) @@ -952,7 +951,7 @@ class ChatObjects: log_timestamp = log.get("timestamp", get_utc_timestamp()) if afterTimestamp is not None and log_timestamp <= afterTimestamp: continue - + chat_log = ChatLog(**log) items.append({ "type": "log", @@ -967,7 +966,7 @@ class ChatObjects: stat_timestamp = stat.get("_createdAt", get_utc_timestamp()) if afterTimestamp is not None and stat_timestamp <= afterTimestamp: continue - + chat_stat = ChatStat(**stat) items.append({ "type": "stat", @@ -980,234 +979,6 @@ class ChatObjects: return {"items": items} - def updateWorkflowStats(self, workflowId: str, bytesSent: int = 0, bytesReceived: int = 0) -> bool: - """Updates workflow statistics during execution with incremental values.""" - try: - # Get current workflow - workflow = self.getWorkflow(workflowId) - if not workflow: - logger.error(f"Workflow {workflowId} not found for stats update") - return False - - if not self._canModify(ChatWorkflow, workflowId): - logger.error(f"No permission to update workflow {workflowId} stats") - return False - - # Get current stats from normalized table - currentStats = self.getWorkflowStats(workflowId) - if currentStats: - current_bytes_sent = currentStats.bytesSent or 0 - current_bytes_received = currentStats.bytesReceived or 0 - current_processing_time = currentStats.processingTime or 0 - else: - current_bytes_sent = 0 - current_bytes_received = 0 - current_processing_time = 0 - - # Calculate processing time as duration since workflow start - if workflow and workflow.startedAt: - try: - start_time = int(float(workflow.startedAt)) - current_time = int(get_utc_timestamp()) - processing_time = current_time - start_time - - # Ensure processing time is reasonable - if processing_time < 0: - processing_time = 0 - elif processing_time > 86400 * 365: # More than 1 year - processing_time = 0 - except Exception as e: - logger.warning(f"Error calculating processing time: {str(e)}") - processing_time = current_processing_time - else: - processing_time = current_processing_time - - # Update stats with incremental values - new_bytes_sent = current_bytes_sent + bytesSent - new_bytes_received = current_bytes_received + bytesReceived - new_token_count = new_bytes_sent + new_bytes_received - - # Create or update stats record in normalized table - stats_record = { - "workflowId": workflowId, - "processingTime": processing_time, - "tokenCount": new_token_count, - "bytesSent": new_bytes_sent, - "bytesReceived": new_bytes_received, - "successRate": None, - "errorCount": None - } - - # Create new stats record - self.db.recordCreate(ChatStat, stats_record) - - - return True - - except Exception as e: - logger.error(f"Error updating workflow stats: {str(e)}") - return False - - - # Workflow Actions - - async def workflowStart(self, currentUser: User, userInput: UserInputRequest, workflowId: Optional[str] = None) -> ChatWorkflow: - """ - Starts a new workflow or continues an existing one. - - Args: - userInput: The user input request containing workflow initialization data - workflowId: Optional ID of an existing workflow to continue - - Returns: - ChatWorkflow object representing the started/continued workflow - """ - try: - # Get current timestamp - currentTime = get_utc_timestamp() - - if workflowId: - # Continue existing workflow - load complete state including messages - workflow = self.getWorkflow(workflowId) - if not workflow: - raise ValueError(f"Workflow {workflowId} not found") - - # Check if workflow is currently running and stop it first - if workflow.status == "running": - logger.info(f"Stopping running workflow {workflowId} before processing new prompt") - - # Stop the running workflow - workflow.status = "stopped" - workflow.lastActivity = currentTime - self.updateWorkflow(workflowId, { - "status": "stopped", - "lastActivity": currentTime - }) - - # Add log entry for workflow stop - self.createLog({ - "workflowId": workflowId, - "message": "Workflow stopped for new prompt", - "type": "info", - "status": "stopped", - "progress": 100 - }) - - # Wait a moment for any running processes to detect the stop - await asyncio.sleep(0.1) - - # Update workflow - increment round for existing workflows - newRound = workflow.currentRound + 1 - self.updateWorkflow(workflowId, { - "status": "running", # Set status back to running for resumed workflows - "lastActivity": currentTime, - "currentRound": newRound - }) - - # Reload workflow object to get updated currentRound from database - workflow = self.getWorkflow(workflowId) - if not workflow: - raise ValueError(f"Failed to reload workflow {workflowId} after update") - - # Add log entry for workflow resumption - self.createLog({ - "workflowId": workflowId, - "message": f"Workflow resumed (round {workflow.currentRound})", - "type": "info", - "status": "running", - "progress": 0 - }) - - else: - # Create new workflow - workflowData = { - "name": "New Workflow", # Default name since UserInputRequest doesn't have a name field - "status": "running", - "startedAt": currentTime, - "lastActivity": currentTime, - "currentRound": 0, # Default value, will be set to 1 in workflowStart() - "currentTask": 0, - "currentAction": 0, - "totalTasks": 0, - "totalActions": 0, - "mandateId": self.mandateId, - "messageIds": [], - "stats": { - "processingTime": None, - "tokenCount": None, - "bytesSent": None, - "bytesReceived": None, - "successRate": None, - "errorCount": None - } - } - - # Create workflow - workflow = self.createWorkflow(workflowData) - - # Set currentRound to 1 for new workflows - workflow.currentRound = 1 - self.updateWorkflow(workflow.id, {"currentRound": 1}) - - # Initialize stats for the new workflow - self.updateWorkflowStats(workflow.id, bytesSent=0, bytesReceived=0) - - # Remove the 'Workflow started' log entry - - # Start workflow processing - from modules.features.featureChatPlayground import WorkflowManager - workflowManager = WorkflowManager(self, currentUser) - - # Start the workflow processing asynchronously - # The workflow will be updated with progress data during execution - asyncio.create_task(workflowManager.workflowProcess(userInput, workflow)) - - return workflow - - except Exception as e: - logger.error(f"Error starting workflow: {str(e)}") - raise - - async def workflowStop(self, workflowId: str) -> ChatWorkflow: - """ - Stops a running workflow (State 8: Workflow Stopped). - - Args: - workflowId: ID of the workflow to stop - - Returns: - Updated ChatWorkflow object - """ - try: - # Load workflow state - workflow = self.getWorkflow(workflowId) - if not workflow: - raise ValueError(f"Workflow {workflowId} not found") - - # Update workflow status - workflow.status = "stopped" - workflow.lastActivity = get_utc_timestamp() - - # Update in database - self.updateWorkflow(workflowId, { - "status": "stopped", - "lastActivity": workflow.lastActivity - }) - - # Add log entry - self.createLog({ - "workflowId": workflowId, - "message": "Workflow stopped", - "type": "warning", - "status": "stopped", - "progress": 100 - }) - - return workflow - - except Exception as e: - logger.error(f"Error stopping workflow: {str(e)}") - raise def getInterface(currentUser: Optional[User] = None) -> 'ChatObjects': """ diff --git a/modules/routes/routeAdmin.py b/modules/routes/routeAdmin.py index 60663598..4ddfcf84 100644 --- a/modules/routes/routeAdmin.py +++ b/modules/routes/routeAdmin.py @@ -12,8 +12,6 @@ from modules.shared.configuration import APP_CONFIG from modules.security.auth import limiter, getCurrentUser from modules.interfaces.interfaceAppModel import User from modules.interfaces.interfaceAppObjects import getRootInterface -from modules.interfaces.interfaceChatObjects import getInterface as getChatInterface -from modules.interfaces.interfaceComponentObjects import getInterface as getComponentInterface # Static folder setup - using absolute path from app root baseDir = FilePath(__file__).parent.parent.parent # Go up to gateway root @@ -31,43 +29,6 @@ router = APIRouter( # Mount static files router.mount("/static", StaticFiles(directory=str(staticFolder), html=True), name="static") -def get_interface_for_database(database_name: str, currentUser: User): - """ - Get the appropriate interface based on database name. - - Args: - database_name: Name of the database - currentUser: Current user for interface initialization - - Returns: - Interface object for the specified database - - Raises: - HTTPException: If database name is unknown or interface cannot be created - """ - # Get database names from configuration - appDbName = APP_CONFIG.get("DB_APP_DATABASE") - chatDbName = APP_CONFIG.get("DB_CHAT_DATABASE") - managementDbName = APP_CONFIG.get("DB_MANAGEMENT_DATABASE") - - if not appDbName: - raise HTTPException(status_code=500, detail="DB_APP_DATABASE configuration is required") - - # Map database names to their corresponding interfaces - if database_name == appDbName: - return getRootInterface() - elif chatDbName and database_name == chatDbName: - return getChatInterface(currentUser) - elif managementDbName and database_name == managementDbName: - return getComponentInterface(currentUser) - else: - available_dbs = [appDbName] - if chatDbName: - available_dbs.append(chatDbName) - if managementDbName: - available_dbs.append(managementDbName) - raise HTTPException(status_code=400, detail=f"Unknown database. Available: {', '.join(available_dbs)}") - @router.get("/") @limiter.limit("30/minute") async def root(request: Request) -> Dict[str, str]: @@ -117,183 +78,3 @@ async def options_route(request: Request, fullPath: str) -> Response: async def favicon(request: Request) -> FileResponse: return FileResponse(str(staticFolder / "favicon.ico"), media_type="image/x-icon") -# ---------------------- -# Log Management -# ---------------------- - -@router.get("/api/logs/app") -@limiter.limit("10/minute") -async def download_app_log(request: Request, currentUser: User = Depends(getCurrentUser)) -> FileResponse: - """Download the current day's application log file""" - # Check if user has admin privileges - if not hasattr(currentUser, 'privilege') or currentUser.privilege not in ('admin', 'sysadmin'): - raise HTTPException(status_code=403, detail="Admin privileges required") - - # Get log directory from config - logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR") - if not logDir: - raise HTTPException(status_code=500, detail="APP_LOGGING_LOG_DIR configuration is required") - - if not os.path.isabs(logDir): - # If relative path, make it relative to the gateway directory - gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - logDir = os.path.join(gatewayDir, logDir) - - # Get current date for log file - today = datetime.now().strftime("%Y%m%d") - logFile = os.path.join(logDir, f"log_app_{today}.log") - - if not os.path.exists(logFile): - raise HTTPException(status_code=404, detail=f"Application log file for today not found: {logFile}") - - return FileResponse( - path=logFile, - filename=f"log_app_{today}.log", - media_type="text/plain" - ) - -@router.get("/api/logs/audit") -@limiter.limit("10/minute") -async def download_audit_log(request: Request, currentUser: User = Depends(getCurrentUser)) -> FileResponse: - """Download the current day's audit log file""" - # Check if user has admin privileges - if not hasattr(currentUser, 'privilege') or currentUser.privilege not in ('admin', 'sysadmin'): - raise HTTPException(status_code=403, detail="Admin privileges required") - - # Get log directory from config - logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR") - if not logDir: - raise HTTPException(status_code=500, detail="APP_LOGGING_LOG_DIR configuration is required") - - if not os.path.isabs(logDir): - # If relative path, make it relative to the gateway directory - gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - logDir = os.path.join(gatewayDir, logDir) - - # Get current date for log file - today = datetime.now().strftime("%Y%m%d") - logFile = os.path.join(logDir, f"log_audit_{today}.log") - - if not os.path.exists(logFile): - raise HTTPException(status_code=404, detail=f"Audit log file for today not found: {logFile}") - - return FileResponse( - path=logFile, - filename=f"log_audit_{today}.log", - media_type="text/plain" - ) - -# ---------------------- -# Database Management -# ---------------------- - -@router.get("/api/databases") -@limiter.limit("10/minute") -async def list_databases(request: Request, currentUser: User = Depends(getCurrentUser)) -> Dict[str, Any]: - """List available databases""" - # Check if user has admin privileges - if not hasattr(currentUser, 'privilege') or currentUser.privilege not in ('admin', 'sysadmin'): - raise HTTPException(status_code=403, detail="Admin privileges required") - - try: - # Get configured database names from configuration - databases = [] - - # App database - required configuration - appDb = APP_CONFIG.get("DB_APP_DATABASE") - if not appDb: - raise HTTPException(status_code=500, detail="DB_APP_DATABASE configuration is required") - databases.append(appDb) - - # Chat database - optional configuration - chatDb = APP_CONFIG.get("DB_CHAT_DATABASE") - if chatDb and chatDb not in databases: - databases.append(chatDb) - - # Management database - optional configuration - managementDb = APP_CONFIG.get("DB_MANAGEMENT_DATABASE") - if managementDb and managementDb not in databases: - databases.append(managementDb) - - return {"databases": databases} - except HTTPException: - raise - except Exception as e: - logger.error(f"Error listing databases: {e}") - raise HTTPException(status_code=500, detail="Failed to list databases") - -@router.get("/api/databases/{database_name}/tables") -@limiter.limit("10/minute") -async def list_tables( - request: Request, - database_name: str, - currentUser: User = Depends(getCurrentUser) -) -> Dict[str, Any]: - """List tables in a specific database""" - # Check if user has admin privileges - if not hasattr(currentUser, 'privilege') or currentUser.privilege not in ('admin', 'sysadmin'): - raise HTTPException(status_code=403, detail="Admin privileges required") - - try: - # Get the appropriate interface based on database name - interface = get_interface_for_database(database_name, currentUser) - - # Check if interface and database connection exist - if not interface or not interface.db: - raise HTTPException(status_code=500, detail="Database interface not available") - - # Get tables from database - tables = interface.db.getTables() - - return {"database": database_name, "tables": tables} - except HTTPException: - raise - except Exception as e: - logger.error(f"Error listing tables for database {database_name}: {e}") - raise HTTPException(status_code=500, detail=f"Failed to list tables for database {database_name}") - -@router.post("/api/databases/{database_name}/tables/drop") -@limiter.limit("5/minute") -async def drop_table( - request: Request, - database_name: str, - currentUser: User = Depends(getCurrentUser), - payload: Dict[str, Any] = Body(...) -) -> Dict[str, Any]: - """Drop a specific table from a database""" - # Check if user has admin privileges - if not hasattr(currentUser, 'privilege') or currentUser.privilege not in ('admin', 'sysadmin'): - raise HTTPException(status_code=403, detail="Admin privileges required") - - table_name = payload.get("table") - if not table_name: - raise HTTPException(status_code=400, detail="Table name is required") - - try: - # Get the appropriate interface based on database name - interface = get_interface_for_database(database_name, currentUser) - - # Check if interface and database connection exist - if not interface or not interface.db: - raise HTTPException(status_code=500, detail="Database interface not available") - - # Check if table exists - tables = interface.db.getTables() - if table_name not in tables: - raise HTTPException(status_code=404, detail=f"Table '{table_name}' not found in database '{database_name}'") - - # Drop the table - with interface.db.connection.cursor() as cursor: - cursor.execute(f'DROP TABLE IF EXISTS "{table_name}" CASCADE') - interface.db.connection.commit() - - logger.warning(f"Admin drop_table executed by {currentUser.id}: dropped table '{table_name}' from database '{database_name}'") - return {"message": f"Table '{table_name}' dropped successfully from database '{database_name}'"} - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error dropping table {table_name} from database {database_name}: {e}") - if 'interface' in locals() and interface.db.connection: - interface.db.connection.rollback() - raise HTTPException(status_code=500, detail=f"Failed to drop table '{table_name}' from database '{database_name}'") diff --git a/modules/routes/routeChatPlayground.py b/modules/routes/routeChatPlayground.py new file mode 100644 index 00000000..24bc91a3 --- /dev/null +++ b/modules/routes/routeChatPlayground.py @@ -0,0 +1,132 @@ +""" +Chat Playground routes for the backend API. +Implements the endpoints for chat playground workflow management. +""" + +import logging +from typing import Optional, Dict, Any +from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Request +from datetime import datetime + +# Import auth modules +from modules.security.auth import limiter, getCurrentUser + +# Import interfaces +import modules.interfaces.interfaceChatObjects as interfaceChatObjects +from modules.interfaces.interfaceChatObjects import getInterface + +# Import models +from modules.interfaces.interfaceChatModel import ( + ChatWorkflow, + UserInputRequest +) +from modules.interfaces.interfaceAppModel import User + +# Import workflow control functions +from modules.features.chatPlayground.mainChatPlayground import chatStart, chatStop + +# Configure logger +logger = logging.getLogger(__name__) + +# Create router for chat playground endpoints +router = APIRouter( + prefix="/api/chat/playground", + tags=["Chat Playground"], + responses={404: {"description": "Not found"}} +) + +def getServiceChat(currentUser: User): + return interfaceChatObjects.getInterface(currentUser) + +# Workflow start endpoint +@router.post("/start", response_model=ChatWorkflow) +@limiter.limit("120/minute") +async def start_workflow( + request: Request, + workflowId: Optional[str] = Query(None, description="Optional ID of the workflow to continue"), + userInput: UserInputRequest = Body(...), + currentUser: User = Depends(getCurrentUser) +) -> ChatWorkflow: + """ + Starts a new workflow or continues an existing one. + Corresponds to State 1 in the state machine documentation. + """ + try: + # Get service center + interfaceChat = getServiceChat(currentUser) + + # Start or continue workflow using playground controller + workflow = await chatStart(interfaceChat, currentUser, userInput, workflowId) + + return workflow + + except Exception as e: + logger.error(f"Error in start_workflow: {str(e)}") + raise HTTPException( + status_code=500, + detail=str(e) + ) + +# State 8: Workflow Stopped endpoint +@router.post("/{workflowId}/stop", response_model=ChatWorkflow) +@limiter.limit("120/minute") +async def stop_workflow( + request: Request, + workflowId: str = Path(..., description="ID of the workflow to stop"), + currentUser: User = Depends(getCurrentUser) +) -> ChatWorkflow: + """Stops a running workflow.""" + try: + # Get service center + interfaceChat = getServiceChat(currentUser) + + # Stop workflow using playground controller + workflow = await chatStop(interfaceChat, currentUser, workflowId) + + return workflow + + except Exception as e: + logger.error(f"Error in stop_workflow: {str(e)}") + raise HTTPException( + status_code=500, + detail=str(e) + ) + +# Unified Chat Data Endpoint for Polling +@router.get("/{workflowId}/chatData") +@limiter.limit("120/minute") +async def get_workflow_chat_data( + request: Request, + workflowId: str = Path(..., description="ID of the workflow"), + afterTimestamp: Optional[float] = Query(None, description="Unix timestamp to get data after"), + currentUser: User = Depends(getCurrentUser) +) -> Dict[str, Any]: + """ + Get unified chat data (messages, logs, stats) for a workflow with timestamp-based selective data transfer. + Returns all data types in chronological order based on _createdAt timestamp. + """ + try: + # Get service center + interfaceChat = getServiceChat(currentUser) + + # Verify workflow exists + workflow = interfaceChat.getWorkflow(workflowId) + if not workflow: + raise HTTPException( + status_code=404, + detail=f"Workflow with ID {workflowId} not found" + ) + + # Get unified chat data using the new method + chatData = interfaceChat.getUnifiedChatData(workflowId, afterTimestamp) + + return chatData + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting unified chat data: {str(e)}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Error getting unified chat data: {str(e)}" + ) diff --git a/modules/routes/routeDataNeutralization.py b/modules/routes/routeDataNeutralization.py index 697c6f1c..939c4422 100644 --- a/modules/routes/routeDataNeutralization.py +++ b/modules/routes/routeDataNeutralization.py @@ -7,7 +7,7 @@ from modules.security.auth import limiter, getCurrentUser # Import interfaces from modules.interfaces.interfaceAppModel import User, DataNeutraliserConfig, DataNeutralizerAttributes -from modules.features.featureNeutralizePlayground import NeutralizationService +from modules.features.neutralizePlayground.mainNeutralizePlayground import NeutralizationService # Configure logger logger = logging.getLogger(__name__) diff --git a/modules/routes/routeWorkflows.py b/modules/routes/routeWorkflows.py index fe70e347..7b9dd8f9 100644 --- a/modules/routes/routeWorkflows.py +++ b/modules/routes/routeWorkflows.py @@ -24,13 +24,13 @@ from modules.interfaces.interfaceChatModel import ( ChatMessage, ChatLog, ChatStat, - ChatDocument, - UserInputRequest + ChatDocument ) from modules.shared.attributeUtils import getModelAttributeDefinitions, AttributeResponse from modules.interfaces.interfaceAppModel import User from modules.shared.timezoneUtils import get_utc_timestamp + # Configure logger logger = logging.getLogger(__name__) @@ -276,59 +276,6 @@ async def get_workflow_messages( detail=f"Error getting workflow messages: {str(e)}" ) -# State 1: Workflow Initialization endpoint -@router.post("/start", response_model=ChatWorkflow) -@limiter.limit("120/minute") -async def start_workflow( - request: Request, - workflowId: Optional[str] = Query(None, description="Optional ID of the workflow to continue"), - userInput: UserInputRequest = Body(...), - currentUser: User = Depends(getCurrentUser) -) -> ChatWorkflow: - """ - Starts a new workflow or continues an existing one. - Corresponds to State 1 in the state machine documentation. - """ - try: - # Get service center - interfaceChat = getServiceChat(currentUser) - - # Start or continue workflow using ChatObjects - workflow = await interfaceChat.workflowStart(currentUser, userInput, workflowId) - - return workflow - - except Exception as e: - logger.error(f"Error in start_workflow: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=str(e) - ) - -# State 8: Workflow Stopped endpoint -@router.post("/{workflowId}/stop", response_model=ChatWorkflow) -@limiter.limit("120/minute") -async def stop_workflow( - request: Request, - workflowId: str = Path(..., description="ID of the workflow to stop"), - currentUser: User = Depends(getCurrentUser) -) -> ChatWorkflow: - """Stops a running workflow.""" - try: - # Get service center - interfaceChat = getServiceChat(currentUser) - - # Stop workflow using ChatObjects - workflow = await interfaceChat.workflowStop(workflowId) - - return workflow - - except Exception as e: - logger.error(f"Error in stop_workflow: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=str(e) - ) # State 11: Workflow Reset/Deletion endpoint @router.delete("/{workflowId}", response_model=Dict[str, Any]) @@ -383,45 +330,6 @@ async def delete_workflow( ) -# Unified Chat Data Endpoint for Polling -@router.get("/{workflowId}/chatData") -@limiter.limit("120/minute") -async def get_workflow_chat_data( - request: Request, - workflowId: str = Path(..., description="ID of the workflow"), - afterTimestamp: Optional[float] = Query(None, description="Unix timestamp to get data after"), - currentUser: User = Depends(getCurrentUser) -) -> Dict[str, Any]: - """ - Get unified chat data (messages, logs, stats) for a workflow with timestamp-based selective data transfer. - Returns all data types in chronological order based on _createdAt timestamp. - """ - try: - # Get service center - interfaceChat = getServiceChat(currentUser) - - # Verify workflow exists - workflow = interfaceChat.getWorkflow(workflowId) - if not workflow: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Workflow with ID {workflowId} not found" - ) - - # Get unified chat data using the new method - chatData = interfaceChat.getUnifiedChatData(workflowId, afterTimestamp) - - return chatData - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting unified chat data: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error getting unified chat data: {str(e)}" - ) - # Document Management Endpoints @router.delete("/{workflowId}/messages/{messageId}", response_model=Dict[str, Any]) diff --git a/modules/chat/serviceCenter.py b/modules/services/serviceCenter.py similarity index 99% rename from modules/chat/serviceCenter.py rename to modules/services/serviceCenter.py index 55648ead..85b04ed4 100644 --- a/modules/chat/serviceCenter.py +++ b/modules/services/serviceCenter.py @@ -13,9 +13,9 @@ from modules.interfaces.interfaceChatObjects import getInterface as getChatObjec from modules.interfaces.interfaceChatModel import ActionResult from modules.interfaces.interfaceComponentObjects import getInterface as getComponentObjects from modules.interfaces.interfaceAppObjects import getInterface as getAppObjects -from modules.chat.documents.documentExtraction import DocumentExtraction -from modules.chat.documents.documentUtility import getFileExtension, getMimeTypeFromExtension, detectContentTypeFromData -from modules.methods.methodBase import MethodBase +from modules.services.serviceDocument.documentExtraction import DocumentExtraction +from modules.services.serviceDocument.documentUtility import getFileExtension, getMimeTypeFromExtension, detectContentTypeFromData +from modules.workflows.methods.methodBase import MethodBase from modules.shared.timezoneUtils import get_utc_timestamp import uuid @@ -57,7 +57,7 @@ class ServiceCenter: if not isPkg and name.startswith('method'): try: # Import the module - module = importlib.import_module(f'modules.methods.{name}') + module = importlib.import_module(f'modules.workflows.methods.{name}') # Find all classes in the module that inherit from MethodBase for itemName, item in inspect.getmembers(module): diff --git a/modules/chat/documents/documentExtraction.py b/modules/services/serviceDocument/documentExtraction.py similarity index 99% rename from modules/chat/documents/documentExtraction.py rename to modules/services/serviceDocument/documentExtraction.py index b6165b9e..0a73e46a 100644 --- a/modules/chat/documents/documentExtraction.py +++ b/modules/services/serviceDocument/documentExtraction.py @@ -9,7 +9,7 @@ from pathlib import Path import xml.etree.ElementTree as ET from bs4 import BeautifulSoup import uuid -from modules.chat.documents.documentUtility import ( +from modules.services.serviceDocument.documentUtility import ( getFileExtension, getMimeTypeFromExtension, detectMimeTypeFromContent, @@ -22,7 +22,7 @@ from modules.interfaces.interfaceChatModel import ( ContentItem, ContentMetadata ) -from modules.neutralizer.neutralizer import DataAnonymizer +from modules.services.serviceNeutralization.neutralizer import DataAnonymizer from modules.shared.configuration import APP_CONFIG logger = logging.getLogger(__name__) diff --git a/modules/chat/documents/documentGeneration.py b/modules/services/serviceDocument/documentGeneration.py similarity index 99% rename from modules/chat/documents/documentGeneration.py rename to modules/services/serviceDocument/documentGeneration.py index 2d844ed3..16ab7b16 100644 --- a/modules/chat/documents/documentGeneration.py +++ b/modules/services/serviceDocument/documentGeneration.py @@ -3,7 +3,7 @@ from typing import Any, Dict, List, Optional from datetime import datetime, UTC import re from modules.shared.timezoneUtils import get_utc_timestamp -from modules.chat.documents.documentUtility import ( +from modules.services.serviceDocument.documentUtility import ( getFileExtension, getMimeTypeFromExtension, detectMimeTypeFromContent, diff --git a/modules/chat/documents/documentUtility.py b/modules/services/serviceDocument/documentUtility.py similarity index 100% rename from modules/chat/documents/documentUtility.py rename to modules/services/serviceDocument/documentUtility.py diff --git a/modules/neutralizer/neutralizer.py b/modules/services/serviceNeutralization/neutralizer.py similarity index 87% rename from modules/neutralizer/neutralizer.py rename to modules/services/serviceNeutralization/neutralizer.py index f8677465..e284ae00 100644 --- a/modules/neutralizer/neutralizer.py +++ b/modules/services/serviceNeutralization/neutralizer.py @@ -8,12 +8,12 @@ import logging from typing import Dict, List, Any # Import all necessary classes and functions -from modules.neutralizer.subProcessCommon import ProcessResult, CommonUtils -from modules.neutralizer.subProcessText import TextProcessor, PlainText -from modules.neutralizer.subProcessList import ListProcessor, TableData -from modules.neutralizer.subProcessBinary import BinaryProcessor, BinaryData -from modules.neutralizer.subParseString import StringParser -from modules.neutralizer.subPatterns import Pattern, HeaderPatterns, DataPatterns, TextTablePatterns +from modules.services.serviceNeutralization.subProcessCommon import ProcessResult, CommonUtils +from modules.services.serviceNeutralization.subProcessText import TextProcessor, PlainText +from modules.services.serviceNeutralization.subProcessList import ListProcessor, TableData +from modules.services.serviceNeutralization.subProcessBinary import BinaryProcessor, BinaryData +from modules.services.serviceNeutralization.subParseString import StringParser +from modules.services.serviceNeutralization.subPatterns import Pattern, HeaderPatterns, DataPatterns, TextTablePatterns # Configure logging logger = logging.getLogger(__name__) diff --git a/modules/neutralizer/readme.md b/modules/services/serviceNeutralization/readme.md similarity index 100% rename from modules/neutralizer/readme.md rename to modules/services/serviceNeutralization/readme.md diff --git a/modules/neutralizer/subParseString.py b/modules/services/serviceNeutralization/subParseString.py similarity index 98% rename from modules/neutralizer/subParseString.py rename to modules/services/serviceNeutralization/subParseString.py index a2b39333..fd9f54cc 100644 --- a/modules/neutralizer/subParseString.py +++ b/modules/services/serviceNeutralization/subParseString.py @@ -6,7 +6,7 @@ Handles pattern matching and replacement for emails, phones, addresses, IDs and import re import uuid from typing import Dict, List, Tuple, Any -from modules.neutralizer.subPatterns import DataPatterns, find_patterns_in_text +from modules.services.serviceNeutralization.subPatterns import DataPatterns, find_patterns_in_text class StringParser: """Handles string parsing and replacement operations""" diff --git a/modules/neutralizer/subPatterns.py b/modules/services/serviceNeutralization/subPatterns.py similarity index 100% rename from modules/neutralizer/subPatterns.py rename to modules/services/serviceNeutralization/subPatterns.py diff --git a/modules/neutralizer/subProcessBinary.py b/modules/services/serviceNeutralization/subProcessBinary.py similarity index 100% rename from modules/neutralizer/subProcessBinary.py rename to modules/services/serviceNeutralization/subProcessBinary.py diff --git a/modules/neutralizer/subProcessCommon.py b/modules/services/serviceNeutralization/subProcessCommon.py similarity index 100% rename from modules/neutralizer/subProcessCommon.py rename to modules/services/serviceNeutralization/subProcessCommon.py diff --git a/modules/neutralizer/subProcessList.py b/modules/services/serviceNeutralization/subProcessList.py similarity index 96% rename from modules/neutralizer/subProcessList.py rename to modules/services/serviceNeutralization/subProcessList.py index 58981333..e4ac91f7 100644 --- a/modules/neutralizer/subProcessList.py +++ b/modules/services/serviceNeutralization/subProcessList.py @@ -9,8 +9,8 @@ import xml.etree.ElementTree as ET from typing import Dict, List, Any, Union from dataclasses import dataclass from io import StringIO -from modules.neutralizer.subParseString import StringParser -from modules.neutralizer.subPatterns import get_pattern_for_header, HeaderPatterns +from modules.services.serviceNeutralization.subParseString import StringParser +from modules.services.serviceNeutralization.subPatterns import get_pattern_for_header, HeaderPatterns @dataclass class TableData: @@ -156,7 +156,7 @@ class ListProcessor: processed_attrs[attr_name] = self.string_parser.mapping[attr_value] else: # Check if attribute value matches any data patterns - from modules.neutralizer.subPatterns import find_patterns_in_text, DataPatterns + from modules.services.serviceNeutralization.subPatterns import find_patterns_in_text, DataPatterns matches = find_patterns_in_text(attr_value, DataPatterns.patterns) if matches: pattern_name = matches[0][0] @@ -191,7 +191,7 @@ class ListProcessor: # Skip if already a placeholder if not self.string_parser.is_placeholder(text): # Check if text matches any patterns - from modules.neutralizer.subPatterns import find_patterns_in_text, DataPatterns + from modules.services.serviceNeutralization.subPatterns import find_patterns_in_text, DataPatterns pattern_matches = find_patterns_in_text(text, DataPatterns.patterns) if pattern_matches: diff --git a/modules/neutralizer/subProcessText.py b/modules/services/serviceNeutralization/subProcessText.py similarity index 97% rename from modules/neutralizer/subProcessText.py rename to modules/services/serviceNeutralization/subProcessText.py index c9ad872f..20dfe291 100644 --- a/modules/neutralizer/subProcessText.py +++ b/modules/services/serviceNeutralization/subProcessText.py @@ -5,7 +5,7 @@ Handles plain text processing without header information from typing import Dict, List, Any from dataclasses import dataclass -from modules.neutralizer.subParseString import StringParser +from modules.services.serviceNeutralization.subParseString import StringParser @dataclass class PlainText: diff --git a/modules/chat/handling/executionState.py b/modules/workflows/_transfer/executionState.py similarity index 64% rename from modules/chat/handling/executionState.py rename to modules/workflows/_transfer/executionState.py index 1f806745..1d9b1963 100644 --- a/modules/chat/handling/executionState.py +++ b/modules/workflows/_transfer/executionState.py @@ -1,5 +1,5 @@ # executionState.py -# Contains all execution state management logic extracted from managerChat.py +# Contains all execution state management logic import logging from typing import List @@ -18,6 +18,9 @@ class TaskExecutionState: self.current_action_index = 0 self.retry_count = 0 self.max_retries = 3 + # Iterative loop (react mode) + self.current_step = 0 + self.max_steps = 5 def addSuccessfulAction(self, action_result: ActionResult): """Add a successful action to the state""" @@ -52,4 +55,26 @@ class TaskExecutionState: patterns.append("format_issues") elif "permission" in error or "access denied" in error: patterns.append("permission_issues") - return list(set(patterns)) \ No newline at end of file + return list(set(patterns)) + +def should_continue(observation, review=None, current_step: int = 0, max_steps: int = 5) -> bool: + """Helper to decide if the iterative loop should continue + - Stop if review indicates 'stop' or success criteria are met + - Stop on failure with no retry path + - Stop if max steps reached + """ + try: + if current_step >= max_steps: + return False + if review and isinstance(review, dict): + decision = review.get('decision') or review.get('status') + if decision in ('stop', 'success'): + return False + # If observation exists but indicates hard failure with no documents repeatedly + if observation and isinstance(observation, dict): + if observation.get('success') is False and observation.get('documentsCount', 0) == 0: + # allow next step once; the caller can cap by max_steps + return True + return True + except Exception: + return False \ No newline at end of file diff --git a/modules/chat/handling/handlingTasks.py b/modules/workflows/_transfer/handlingTasks.py similarity index 89% rename from modules/chat/handling/handlingTasks.py rename to modules/workflows/_transfer/handlingTasks.py index 98cca8bb..5346e2d1 100644 --- a/modules/chat/handling/handlingTasks.py +++ b/modules/workflows/_transfer/handlingTasks.py @@ -12,13 +12,16 @@ from modules.interfaces.interfaceChatModel import ( ) from modules.interfaces.interfaceAppObjects import getInterface as getAppObjects from modules.shared.timezoneUtils import get_utc_timestamp -from modules.chat.handling.executionState import TaskExecutionState -from modules.chat.handling.promptFactory import ( +from modules.workflows._transfer.executionState import TaskExecutionState +from modules.workflows._transfer.promptFactory import ( createTaskPlanningPrompt, createActionDefinitionPrompt, - createResultReviewPrompt + createResultReviewPrompt, + createActionSelectionPrompt, + createActionParameterPrompt, + createRefinementPrompt ) -from modules.chat.documents.documentGeneration import DocumentGenerator +from modules.services.serviceDocument.documentGeneration import DocumentGenerator import uuid logger = logging.getLogger(__name__) @@ -32,7 +35,7 @@ class HandlingTasks: self.chatInterface = chatInterface self.currentUser = currentUser self.workflow = workflow - from modules.chat.serviceCenter import ServiceCenter + from modules.services.serviceCenter import ServiceCenter self.service = ServiceCenter(currentUser, workflow) self.documentGenerator = DocumentGenerator(self.service) @@ -430,8 +433,95 @@ class HandlingTasks: logger.error(f"Error in generateTaskActions: {str(e)}") return [] + # ===== React-mode iterative functions ===== + + async def plan_select(self, context: TaskContext) -> Dict[str, Any]: + """Plan: select exactly one action. Returns {"action": {method, name}}""" + prompt = createActionSelectionPrompt(context, self.service) + self.service.writeTraceLog("React Plan Selection Prompt", prompt) + response = await self.service.callAiTextAdvanced(prompt) + self.service.writeTraceLog("React Plan Selection Response", response) + json_start = response.find('{') if response else -1 + json_end = response.rfind('}') + 1 if response else 0 + if json_start == -1 or json_end == 0: + raise ValueError("No JSON in selection response") + selection = json.loads(response[json_start:json_end]) + if 'action' not in selection or not isinstance(selection['action'], dict): + raise ValueError("Selection missing 'action'") + return selection + + async def act_execute(self, context: TaskContext, selection: Dict[str, Any], task_step: TaskStep, workflow, step_index: int) -> ActionResult: + """Act: request minimal parameters then execute selected action.""" + action = selection.get('action', {}) + params_prompt = createActionParameterPrompt(context, action, self.service) + self.service.writeTraceLog("React Parameters Prompt", params_prompt) + params_resp = await self.service.callAiTextAdvanced(params_prompt) + self.service.writeTraceLog("React Parameters Response", params_resp) + js = params_resp[params_resp.find('{'):params_resp.rfind('}')+1] if params_resp else '{}' + try: + param_obj = json.loads(js) + except Exception: + param_obj = {"parameters": {}} + parameters = param_obj.get('parameters', {}) if isinstance(param_obj, dict) else {} + + # Apply minimal defaults in-code (language) + if 'language' not in parameters and hasattr(self.service, 'user') and getattr(self.service.user, 'language', None): + parameters['language'] = self.service.user.language + + # Build a synthetic TaskAction for execution routing and labels + current_round = getattr(self.workflow, 'currentRound', 0) + current_task = getattr(self.workflow, 'currentTask', 0) + result_label = f"round{current_round}_task{current_task}_action{step_index}_results" + task_action = self.createTaskAction({ + "execMethod": action.get('method', ''), + "execAction": action.get('name', ''), + "execParameters": parameters, + "execResultLabel": result_label, + "status": TaskStatus.PENDING + }) + # Execute using existing single action flow + return await self.executeSingleAction(task_action, workflow, task_step, current_task, step_index, 1) + + def observe_build(self, action_result: ActionResult) -> Dict[str, Any]: + """Observe: build compact observation object from ActionResult""" + previews = [] + if action_result and action_result.documents: + for doc in action_result.documents[:5]: + name = getattr(doc, 'documentName', '') + mime = getattr(doc, 'mimeType', '') + snippet = '' + data = getattr(doc, 'documentData', None) + if isinstance(data, str): + snippet = data[:200] + elif isinstance(data, dict): + snippet = str(data)[:200] + previews.append({"name": name, "mime": mime, "snippet": snippet}) + observation = { + "success": bool(action_result.success), + "resultLabel": action_result.resultLabel or "", + "documentsCount": len(action_result.documents) if action_result.documents else 0, + "previews": previews, + "notes": [] + } + return observation + + async def refine_decide(self, context: TaskContext, observation: Dict[str, Any]) -> Dict[str, Any]: + """Refine: decide continue or stop, with reason""" + prompt = createRefinementPrompt(context, observation) + self.service.writeTraceLog("React Refinement Prompt", prompt) + resp = await self.service.callAiTextAdvanced(prompt) + self.service.writeTraceLog("React Refinement Response", resp) + js = resp[resp.find('{'):resp.rfind('}')+1] if resp else '{}' + try: + decision = json.loads(js) + except Exception: + decision = {"decision": "continue", "reason": "default"} + return decision + async def executeTask(self, task_step, workflow, context, task_index=None, total_tasks=None) -> TaskResult: - """Execute all actions for a task step, with state management and retries.""" + """Execute all actions for a task step, with state management and retries. + When workflow.workflowMode is 'React', run compact plan–act–observe–refine loop. + """ logger.info(f"=== STARTING TASK {task_index or '?'}: {task_step.objective} ===") # PHASE 4: Update workflow object before executing task @@ -476,6 +566,70 @@ class HandlingTasks: logger.info(f"Task start message created for task {task_index}") state = TaskExecutionState(task_step) + # React mode path - check workflow mode instead of context + if isinstance(context, TaskContext) and hasattr(context, 'workflow') and context.workflow and getattr(context.workflow, 'workflowMode', 'Actionplan') == 'React': + state.max_steps = max(1, int(getattr(context.workflow, 'maxSteps', 5))) + step = 1 + last_review_dict = None + while step <= state.max_steps: + self._checkWorkflowStopped() + # Update workflow[currentAction] for UI + self.updateWorkflowBeforeExecutingAction(step) + self.service.setWorkflowContext(action_number=step) + try: + t0 = time.time() + selection = await self.plan_select(context) + result = await self.act_execute(context, selection, task_step, workflow, step) + observation = self.observe_build(result) + # Attach deterministic label for clarity + observation['resultLabel'] = result.resultLabel + decision = await self.refine_decide(context, observation) + # Telemetry: simple duration per step + duration = time.time() - t0 + self.chatInterface.createLog({ + "workflowId": workflow.id, + "message": f"react_step_duration_sec={duration:.3f}", + "type": "info" + }) + last_review_dict = decision + # Simple messaging per iteration + msg = { + "workflowId": workflow.id, + "role": "assistant", + "message": f"🔁 Step {step}/{state.max_steps}: {selection.get('action',{}).get('method','')}.{selection.get('action',{}).get('name','')} → {'✅' if result.success else '❌'}", + "status": "step", + "sequenceNr": len(workflow.messages) + 1, + "publishedAt": get_utc_timestamp(), + "documentsLabel": observation.get('resultLabel'), + "documents": [], + "roundNumber": workflow.currentRound, + "taskNumber": task_index, + "actionNumber": step, + "actionProgress": "success" if result.success else "fail" + } + self.chatInterface.createMessage(msg) + except Exception as e: + logger.error(f"React step {step} error: {e}") + break + + from modules.workflows._transfer.executionState import should_continue + if not should_continue(observation, last_review_dict, step, state.max_steps): + break + step += 1 + + # Summarize task result for react mode + status = TaskStatus.COMPLETED + success = True + feedback = last_review_dict.get('reason') if isinstance(last_review_dict, dict) else 'Completed' + if isinstance(last_review_dict, dict) and last_review_dict.get('decision') == 'stop': + success = True + return TaskResult( + taskId=task_step.id, + status=status, + success=success, + feedback=feedback, + error=None if success else feedback + ) retry_context = context max_retries = state.max_retries for attempt in range(max_retries): @@ -1511,4 +1665,4 @@ class HandlingTasks: logger.info("Workflow reset for new session - all values set to initial state and updated in database") except Exception as e: - logger.error(f"Error resetting workflow for new session: {str(e)}") \ No newline at end of file + logger.error(f"Error resetting workflow for new session: {str(e)}") diff --git a/modules/chat/handling/promptFactory.py b/modules/workflows/_transfer/promptFactory.py similarity index 91% rename from modules/chat/handling/promptFactory.py rename to modules/workflows/_transfer/promptFactory.py index c15979e2..3cf3f5b5 100644 --- a/modules/chat/handling/promptFactory.py +++ b/modules/workflows/_transfer/promptFactory.py @@ -1,16 +1,16 @@ # promptFactory.py -# Contains all prompt creation functions extracted from managerChat.py +# Contains all prompt creation functions import json import logging from typing import Any, Dict, List from modules.interfaces.interfaceChatModel import TaskContext, ReviewContext -from modules.chat.documents.documentUtility import getFileExtension +from modules.services.serviceDocument.documentUtility import getFileExtension # Set up logger logger = logging.getLogger(__name__) -# Prompt creation helpers extracted from managerChat.py +# Prompt creation helpers def _getAvailableDocuments(workflow) -> str: """ @@ -831,3 +831,93 @@ USER LANGUAGE: {user_language} - All user messages must be generated in this lan NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" return prompt + +# ===== New compact prompts for React-style workflow ===== + +def _build_tiny_catalog(service) -> str: + """Return minimal tool catalog: method -> { action -> [paramNames] }""" + try: + method_signatures = service.getMethodsList() + except Exception: + method_signatures = [] + catalog: Dict[str, Dict[str, List[str]]] = {} + for sig in method_signatures: + if '.' not in sig or '(' not in sig or ')' not in sig: + continue + method, rest = sig.split('.', 1) + action = rest.split('(')[0] + params_str = rest[rest.find('(')+1:rest.find(')')].strip() + param_names = [] + if params_str: + for p in params_str.split(','): + name = p.strip().split(':')[0].split('=')[0].strip() + if name: + param_names.append(name) + catalog.setdefault(method, {})[action] = param_names + return json.dumps(catalog, separators=(',', ':'), ensure_ascii=False) + +def createActionSelectionPrompt(context: TaskContext, service) -> str: + """Prompt that returns exactly one action selection: {"action":{"method":"..","name":".."}}""" + user_language = service.user.language if service and service.user else 'en' + tiny_catalog = _build_tiny_catalog(service) + objective = context.task_step.objective if context and context.task_step else '' + available_docs = _getAvailableDocuments(context.workflow) if context and context.workflow else "No documents available" + return f"""Select exactly one action to advance the task. + +OBJECTIVE: {objective} +AVAILABLE DOCUMENTS: {available_docs} +USER LANGUAGE: {user_language} + +MINIMAL TOOL CATALOG (method -> action -> [parameterNames]): +{tiny_catalog} + +BUSINESS RULES: +- Pick exactly one action per step. +- Derive choice from objective and success criteria. +- Prefer user language. +- Keep it minimal; avoid provider specifics. + +RESPONSE FORMAT (JSON only): +{{"action":{{"method":"web","name":"search"}}}} +""" + +def createActionParameterPrompt(context: TaskContext, selected_action: Dict[str, str], service=None) -> str: + """Prompt that returns only parameters for the selected action: {"parameters":{...}}""" + user_language = service.user.language if service and service.user else 'en' + method = selected_action.get('method', '') if selected_action else '' + name = selected_action.get('name', '') if selected_action else '' + available_docs = _getAvailableDocuments(context.workflow) if context and context.workflow else "No documents available" + return f"""Provide only the required parameters for this action. + +SELECTED ACTION: {method}.{name} +OBJECTIVE: {context.task_step.objective if context and context.task_step else ''} +AVAILABLE DOCUMENTS: {available_docs} +USER LANGUAGE: {user_language} + +RULES: +- Return only the parameters object. +- Include user language if relevant. +- Reference documents only by exact labels available. +- Avoid unnecessary fields; host applies defaults. + +RESPONSE FORMAT (JSON only): +{{"parameters":{{}}}} +""" + +def createRefinementPrompt(context: TaskContext, observation: Dict[str, Any]) -> str: + """Prompt that decides to continue or stop based on observation: {"decision":"continue|stop","reason":".."} """ + user_language = context.workflow.messages[-1].role if False else (getattr(context.workflow, 'user_language', None) or (getattr(context.workflow, 'language', None))) # not used, keep minimal + objective = context.task_step.objective if context and context.task_step else '' + return f"""Decide next step based on observation. + +OBJECTIVE: {objective} +OBSERVATION: +{json.dumps(observation, ensure_ascii=False)} + +RULES: +- If criteria are met or no further action helps, decide stop. +- Else decide continue. + +RESPONSE FORMAT (JSON only): +{{"decision":"continue","reason":"Need more data"}} +""" \ No newline at end of file diff --git a/modules/methods/methodAi.py b/modules/workflows/methods/methodAi.py similarity index 99% rename from modules/methods/methodAi.py rename to modules/workflows/methods/methodAi.py index f947db83..ffd9d58e 100644 --- a/modules/methods/methodAi.py +++ b/modules/workflows/methods/methodAi.py @@ -7,7 +7,7 @@ import logging from typing import Dict, Any, List, Optional from datetime import datetime, UTC -from modules.methods.methodBase import MethodBase, action +from modules.workflows.methods.methodBase import MethodBase, action from modules.interfaces.interfaceChatModel import ActionResult from modules.shared.timezoneUtils import get_utc_timestamp diff --git a/modules/methods/methodBase.py b/modules/workflows/methods/methodBase.py similarity index 100% rename from modules/methods/methodBase.py rename to modules/workflows/methods/methodBase.py diff --git a/modules/methods/methodDocument.py b/modules/workflows/methods/methodDocument.py similarity index 99% rename from modules/methods/methodDocument.py rename to modules/workflows/methods/methodDocument.py index 54f45cb9..23fc8b10 100644 --- a/modules/methods/methodDocument.py +++ b/modules/workflows/methods/methodDocument.py @@ -8,7 +8,7 @@ import os from typing import Dict, Any, List, Optional from datetime import datetime, UTC -from modules.methods.methodBase import MethodBase, action +from modules.workflows.methods.methodBase import MethodBase, action from modules.interfaces.interfaceChatModel import ActionResult from modules.shared.timezoneUtils import get_utc_timestamp diff --git a/modules/methods/methodOutlook.py b/modules/workflows/methods/methodOutlook.py similarity index 99% rename from modules/methods/methodOutlook.py rename to modules/workflows/methods/methodOutlook.py index 658b3982..6e2c6440 100644 --- a/modules/methods/methodOutlook.py +++ b/modules/workflows/methods/methodOutlook.py @@ -81,7 +81,7 @@ from datetime import datetime, UTC import json import uuid -from modules.methods.methodBase import MethodBase, action +from modules.workflows.methods.methodBase import MethodBase, action from modules.interfaces.interfaceChatModel import ActionResult from modules.interfaces.interfaceAppModel import ConnectionStatus from modules.shared.timezoneUtils import get_utc_timestamp diff --git a/modules/methods/methodSharepoint.py b/modules/workflows/methods/methodSharepoint.py similarity index 99% rename from modules/methods/methodSharepoint.py rename to modules/workflows/methods/methodSharepoint.py index bcb92e0b..d474992e 100644 --- a/modules/methods/methodSharepoint.py +++ b/modules/workflows/methods/methodSharepoint.py @@ -13,7 +13,7 @@ from urllib.parse import urlparse import aiohttp import asyncio -from modules.methods.methodBase import MethodBase, action +from modules.workflows.methods.methodBase import MethodBase, action from modules.interfaces.interfaceChatModel import ActionResult from modules.shared.timezoneUtils import get_utc_timestamp diff --git a/modules/methods/methodWeb.py b/modules/workflows/methods/methodWeb.py similarity index 99% rename from modules/methods/methodWeb.py rename to modules/workflows/methods/methodWeb.py index 409b7151..0de6d26c 100644 --- a/modules/methods/methodWeb.py +++ b/modules/workflows/methods/methodWeb.py @@ -2,7 +2,7 @@ import logging import csv import io from typing import Any, Dict -from modules.methods.methodBase import MethodBase, action +from modules.workflows.methods.methodBase import MethodBase, action from modules.interfaces.interfaceChatModel import ActionResult, ActionDocument from modules.interfaces.interfaceWebObjects import WebInterface from modules.interfaces.interfaceWebModel import ( diff --git a/modules/features/featureChatPlayground.py b/modules/workflows/workflowManager.py similarity index 55% rename from modules/features/featureChatPlayground.py rename to modules/workflows/workflowManager.py index 85d8c2d4..0d34b5b9 100644 --- a/modules/features/featureChatPlayground.py +++ b/modules/workflows/workflowManager.py @@ -8,8 +8,7 @@ from modules.interfaces.interfaceAppObjects import User from modules.interfaces.interfaceChatModel import (UserInputRequest, ChatMessage, ChatWorkflow, TaskItem, TaskStatus) from modules.interfaces.interfaceChatObjects import ChatObjects -from modules.chat.managerChat import ChatManager -from modules.chat.handling.handlingTasks import WorkflowStoppedException +from modules.workflows._transfer.handlingTasks import HandlingTasks, WorkflowStoppedException from modules.interfaces.interfaceChatModel import WorkflowResult from modules.shared.timezoneUtils import get_utc_timestamp @@ -20,125 +19,135 @@ class WorkflowManager: def __init__(self, chatInterface: ChatObjects, currentUser: User): self.chatInterface = chatInterface - self.chatManager = ChatManager(currentUser, chatInterface) self.currentUser = currentUser + self.handlingTasks = None - async def workflowProcess(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> None: - """Process a workflow with user input using unified workflow phases""" + async def workflowStart(self, userInput: UserInputRequest, workflowId: Optional[str] = None) -> ChatWorkflow: + """Starts a new workflow or continues an existing one, then launches processing.""" try: - # Initialize chat manager - await self.chatManager.initialize(workflow) - - # Set user language - self.chatManager.handlingTasks.service.setUserLanguage(userInput.userLanguage) - - # Send first message - message = await self._sendFirstMessage(userInput, workflow) - - # Execute unified workflow - workflow_result = await self.chatManager.executeUnifiedWorkflow(userInput, workflow) - - # Process workflow results - await self._processWorkflowResults(workflow, workflow_result, message) - - # Only send last message for successful workflows - # Stopped/failed workflows get their final messages in _processWorkflowResults - if workflow_result.status == 'success': - await self._sendLastMessage(workflow) - - except WorkflowStoppedException: - logger.info("Workflow stopped by user") - # Update workflow status to stopped + currentTime = get_utc_timestamp() + + if workflowId: + workflow = self.chatInterface.getWorkflow(workflowId) + if not workflow: + raise ValueError(f"Workflow {workflowId} not found") + + if workflow.status == "running": + logger.info(f"Stopping running workflow {workflowId} before processing new prompt") + workflow.status = "stopped" + workflow.lastActivity = currentTime + self.chatInterface.updateWorkflow(workflowId, { + "status": "stopped", + "lastActivity": currentTime + }) + self.chatInterface.createLog({ + "workflowId": workflowId, + "message": "Workflow stopped for new prompt", + "type": "info", + "status": "stopped", + "progress": 100 + }) + await asyncio.sleep(0.1) + + newRound = workflow.currentRound + 1 + self.chatInterface.updateWorkflow(workflowId, { + "status": "running", + "lastActivity": currentTime, + "currentRound": newRound + }) + + workflow = self.chatInterface.getWorkflow(workflowId) + if not workflow: + raise ValueError(f"Failed to reload workflow {workflowId} after update") + + self.chatInterface.createLog({ + "workflowId": workflowId, + "message": f"Workflow resumed (round {workflow.currentRound})", + "type": "info", + "status": "running", + "progress": 0 + }) + else: + workflowData = { + "name": "New Workflow", + "status": "running", + "startedAt": currentTime, + "lastActivity": currentTime, + "currentRound": 0, + "currentTask": 0, + "currentAction": 0, + "totalTasks": 0, + "totalActions": 0, + "mandateId": self.chatInterface.mandateId, + "messageIds": [], + "stats": { + "processingTime": None, + "tokenCount": None, + "bytesSent": None, + "bytesReceived": None, + "successRate": None, + "errorCount": None + } + } + + workflow = self.chatInterface.createWorkflow(workflowData) + workflow.currentRound = 1 + self.chatInterface.updateWorkflow(workflow.id, {"currentRound": 1}) + self.chatInterface.updateWorkflowStats(workflow.id, bytesSent=0, bytesReceived=0) + + # Start workflow processing asynchronously + asyncio.create_task(self._workflowProcess(userInput, workflow)) + + return workflow + except Exception as e: + logger.error(f"Error starting workflow: {str(e)}") + raise + + async def workflowStop(self, workflowId: str) -> ChatWorkflow: + """Stops a running workflow.""" + try: + workflow = self.chatInterface.getWorkflow(workflowId) + if not workflow: + raise ValueError(f"Workflow {workflowId} not found") + workflow.status = "stopped" workflow.lastActivity = get_utc_timestamp() - self.chatInterface.updateWorkflow(workflow.id, { + self.chatInterface.updateWorkflow(workflowId, { "status": "stopped", - "lastActivity": workflow.lastActivity, - "totalTasks": workflow.totalTasks, - "totalActions": workflow.totalActions + "lastActivity": workflow.lastActivity }) - - # Create final stopped message - stopped_message = { - "workflowId": workflow.id, - "role": "assistant", - "message": "🛑 Workflow stopped by user", - "status": "last", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": get_utc_timestamp(), - "documentsLabel": "workflow_stopped", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "pending", - "actionProgress": "pending" - } - message = self.chatInterface.createMessage(stopped_message) - if message: - workflow.messages.append(message) - - # Add log entry self.chatInterface.createLog({ - "workflowId": workflow.id, - "message": "Workflow stopped by user", + "workflowId": workflowId, + "message": "Workflow stopped", "type": "warning", "status": "stopped", "progress": 100 }) + return workflow + except Exception as e: + logger.error(f"Error stopping workflow: {str(e)}") + raise + + async def _workflowProcess(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> None: + """Process a workflow with user input""" + try: + self.handlingTasks = HandlingTasks(self.chatInterface, self.currentUser, workflow) + self.handlingTasks.service.setUserLanguage(userInput.userLanguage) + message = await self._sendFirstMessage(userInput, workflow) + task_plan = await self._planTasks(userInput, workflow) + workflow_result = await self._executeTasks(task_plan, workflow) + await self._processWorkflowResults(workflow, workflow_result, message) + + except WorkflowStoppedException: + self._handleWorkflowStop(workflow) except Exception as e: - logger.error(f"Workflow processing error: {str(e)}") - - # Update workflow status to failed - workflow.status = "failed" - workflow.lastActivity = get_utc_timestamp() - self.chatInterface.updateWorkflow(workflow.id, { - "status": "failed", - "lastActivity": workflow.lastActivity, - "totalTasks": workflow.totalTasks, - "totalActions": workflow.totalActions - }) - - # Create error message - error_message = { - "workflowId": workflow.id, - "role": "assistant", - "message": f"Workflow processing failed: {str(e)}", - "status": "last", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": get_utc_timestamp(), - "documentsLabel": "workflow_error", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "fail", - "actionProgress": "fail" - } - message = self.chatInterface.createMessage(error_message) - if message: - workflow.messages.append(message) - - # Add error log entry - self.chatInterface.createLog({ - "workflowId": workflow.id, - "message": f"Workflow failed: {str(e)}", - "type": "error", - "status": "failed", - "progress": 100 - }) - - raise + self._handleWorkflowError(workflow, e) async def _sendFirstMessage(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> ChatMessage: """Send first message to start workflow""" try: - self.chatManager.handlingTasks._checkWorkflowStopped() + self.handlingTasks._checkWorkflowStopped() # Create initial message using interface # Generate the correct documentsLabel that matches what getDocumentReferenceString will create @@ -171,12 +180,12 @@ class WorkflowManager: workflow.messages.append(message) # Clear trace log for new workflow session - self.chatManager.handlingTasks.service.clearTraceLog() + self.handlingTasks.service.clearTraceLog() # Add documents if any, now with messageId if userInput.listFileId: # Process file IDs and add to message data - documents = await self.chatManager.handlingTasks.service.processFileIds(userInput.listFileId, message.id) + documents = await self.handlingTasks.service.processFileIds(userInput.listFileId, message.id) message.documents = documents # Update the message with documents in database self.chatInterface.updateMessage(message.id, {"documents": [doc.to_dict() for doc in documents]}) @@ -188,97 +197,76 @@ class WorkflowManager: except Exception as e: logger.error(f"Error sending first message: {str(e)}") raise - - async def _generateWorkflowFeedback(self, workflow: ChatWorkflow) -> str: - """Generate feedback message for workflow completion""" - try: - self.chatManager.handlingTasks._checkWorkflowStopped() - - # Count messages by role - user_messages = [msg for msg in workflow.messages if msg.role == 'user'] - assistant_messages = [msg for msg in workflow.messages if msg.role == 'assistant'] - - # Generate summary feedback - feedback = f"Workflow completed.\n\n" - feedback += f"Processed {len(user_messages)} user inputs and generated {len(assistant_messages)} responses.\n" - - # Add final status - if workflow.status == "completed": - feedback += "All tasks completed successfully." - elif workflow.status == "partial": - feedback += "Some tasks completed with partial success." - else: - feedback += f"Workflow status: {workflow.status}" - - return feedback - - except Exception as e: - logger.error(f"Error generating workflow feedback: {str(e)}") - return "Workflow processing completed." - async def _sendLastMessage(self, workflow: ChatWorkflow) -> None: - """Send last message to complete workflow (only for successful workflows)""" - try: - # Safety check: ensure this is only called for successful workflows - if workflow.status in ['stopped', 'failed']: - logger.warning(f"Attempted to send last message for {workflow.status} workflow {workflow.id}") - return - - # Generate feedback - feedback = await self._generateWorkflowFeedback(workflow) - - # Create last message using interface - messageData = { - "workflowId": workflow.id, - "role": "assistant", - "message": feedback, - "status": "last", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": get_utc_timestamp(), - "documentsLabel": "workflow_feedback", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "success", - "actionProgress": "success" - } - - # Create message using interface - message = self.chatInterface.createMessage(messageData) - if message: - workflow.messages.append(message) - - # Update workflow status to completed - workflow.status = "completed" - workflow.lastActivity = get_utc_timestamp() - - # Update workflow in database - self.chatInterface.updateWorkflow(workflow.id, { - "status": "completed", - "lastActivity": workflow.lastActivity + async def _planTasks(self, userInput: UserInputRequest, workflow: ChatWorkflow): + """Generate task plan for workflow execution""" + handling = self.handlingTasks + # Generate task plan first (shared for both modes) + task_plan = await handling.generateTaskPlan(userInput.prompt, workflow) + if not task_plan or not task_plan.tasks: + raise Exception("No tasks generated in task plan.") + logger.info(f"Executing workflow mode={getattr(workflow, 'workflowMode', 'Actionplan')} with {len(task_plan.tasks)} tasks") + return task_plan + + async def _executeTasks(self, task_plan, workflow: ChatWorkflow) -> WorkflowResult: + """Execute all tasks in the task plan""" + handling = self.handlingTasks + total_tasks = len(task_plan.tasks) + all_task_results: List = [] + previous_results: List[str] = [] + + for idx, task_step in enumerate(task_plan.tasks): + current_task_index = idx + 1 + logger.info(f"Task {current_task_index}/{total_tasks}: {task_step.objective}") + + # Build TaskContext (mode-specific behavior is inside HandlingTasks) + from modules.interfaces.interfaceChatModel import TaskContext + task_context = TaskContext( + task_step=task_step, + workflow=workflow, + workflow_id=workflow.id, + available_documents=None, + available_connections=None, + previous_results=previous_results, + previous_handover=None, + improvements=[], + retry_count=0, + previous_action_results=[], + previous_review_result=None, + is_regeneration=False, + failure_patterns=[], + failed_actions=[], + successful_actions=[], + criteria_progress={ + 'met_criteria': set(), + 'unmet_criteria': set(), + 'attempt_history': [] + } + ) + + task_result = await handling.executeTask(task_step, workflow, task_context, current_task_index, total_tasks) + handover_data = await handling.prepareTaskHandover(task_step, [], task_result, workflow) + all_task_results.append({ + 'task_step': task_step, + 'task_result': task_result, + 'handover_data': handover_data }) - - # Add completion log entry - self.chatInterface.createLog({ - "workflowId": workflow.id, - "message": "Workflow completed", - "type": "success", - "status": "completed", - "progress": 100 - }) - - except Exception as e: - logger.error(f"Error sending last message: {str(e)}") - raise + if task_result.success and task_result.feedback: + previous_results.append(task_result.feedback) + + return WorkflowResult( + status="completed", + completed_tasks=len(all_task_results), + total_tasks=total_tasks, + execution_time=0.0, + final_results_count=len(all_task_results) + ) async def _processWorkflowResults(self, workflow: ChatWorkflow, workflow_result: WorkflowResult, initial_message: ChatMessage) -> None: """Process workflow results and create appropriate messages""" try: try: - self.chatManager.handlingTasks._checkWorkflowStopped() + self.handlingTasks._checkWorkflowStopped() except WorkflowStoppedException: logger.info(f"Workflow {workflow.id} was stopped during result processing") @@ -398,47 +386,8 @@ class WorkflowManager: }) return - # For successful workflows, create a simple completion message - summary_message = { - "workflowId": workflow.id, - "role": "assistant", - "message": f"Workflow completed successfully.", - "status": "last", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": get_utc_timestamp(), - "documentsLabel": "workflow_completion", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "success", - "actionProgress": "success" - } - - message = self.chatInterface.createMessage(summary_message) - if message: - workflow.messages.append(message) - - # Update workflow status to completed for successful workflows - workflow.status = "completed" - workflow.lastActivity = get_utc_timestamp() - self.chatInterface.updateWorkflow(workflow.id, { - "status": "completed", - "lastActivity": workflow.lastActivity, - "totalTasks": workflow.totalTasks, - "totalActions": workflow.totalActions - }) - - # Add completion log entry - self.chatInterface.createLog({ - "workflowId": workflow.id, - "message": "Workflow completed successfully", - "type": "success", - "status": "completed", - "progress": 100 - }) + # For successful workflows, send detailed completion message + await self._sendLastMessage(workflow) except Exception as e: logger.error(f"Error processing workflow results: {str(e)}") @@ -474,3 +423,179 @@ class WorkflowManager: "totalActions": workflow.totalActions }) + async def _sendLastMessage(self, workflow: ChatWorkflow) -> None: + """Send last message to complete workflow (only for successful workflows)""" + try: + # Safety check: ensure this is only called for successful workflows + if workflow.status in ['stopped', 'failed']: + logger.warning(f"Attempted to send last message for {workflow.status} workflow {workflow.id}") + return + + # Generate feedback + feedback = await self._generateWorkflowFeedback(workflow) + + # Create last message using interface + messageData = { + "workflowId": workflow.id, + "role": "assistant", + "message": feedback, + "status": "last", + "sequenceNr": len(workflow.messages) + 1, + "publishedAt": get_utc_timestamp(), + "documentsLabel": "workflow_feedback", + "documents": [], + # Add workflow context fields + "roundNumber": workflow.currentRound, + "taskNumber": 0, + "actionNumber": 0, + # Add progress status + "taskProgress": "success", + "actionProgress": "success" + } + + # Create message using interface + message = self.chatInterface.createMessage(messageData) + if message: + workflow.messages.append(message) + + # Update workflow status to completed + workflow.status = "completed" + workflow.lastActivity = get_utc_timestamp() + + # Update workflow in database + self.chatInterface.updateWorkflow(workflow.id, { + "status": "completed", + "lastActivity": workflow.lastActivity + }) + + # Add completion log entry + self.chatInterface.createLog({ + "workflowId": workflow.id, + "message": "Workflow completed", + "type": "success", + "status": "completed", + "progress": 100 + }) + + except Exception as e: + logger.error(f"Error sending last message: {str(e)}") + raise + + async def _generateWorkflowFeedback(self, workflow: ChatWorkflow) -> str: + """Generate feedback message for workflow completion""" + try: + self.handlingTasks._checkWorkflowStopped() + + # Count messages by role + user_messages = [msg for msg in workflow.messages if msg.role == 'user'] + assistant_messages = [msg for msg in workflow.messages if msg.role == 'assistant'] + + # Generate summary feedback + feedback = f"Workflow completed.\n\n" + feedback += f"Processed {len(user_messages)} user inputs and generated {len(assistant_messages)} responses.\n" + + # Add final status + if workflow.status == "completed": + feedback += "All tasks completed successfully." + elif workflow.status == "partial": + feedback += "Some tasks completed with partial success." + else: + feedback += f"Workflow status: {workflow.status}" + + return feedback + + except Exception as e: + logger.error(f"Error generating workflow feedback: {str(e)}") + return "Workflow processing completed." + + def _handleWorkflowStop(self, workflow: ChatWorkflow) -> None: + """Handle workflow stop exception""" + logger.info("Workflow stopped by user") + + # Update workflow status to stopped + workflow.status = "stopped" + workflow.lastActivity = get_utc_timestamp() + self.chatInterface.updateWorkflow(workflow.id, { + "status": "stopped", + "lastActivity": workflow.lastActivity, + "totalTasks": workflow.totalTasks, + "totalActions": workflow.totalActions + }) + + # Create final stopped message + stopped_message = { + "workflowId": workflow.id, + "role": "assistant", + "message": "🛑 Workflow stopped by user", + "status": "last", + "sequenceNr": len(workflow.messages) + 1, + "publishedAt": get_utc_timestamp(), + "documentsLabel": "workflow_stopped", + "documents": [], + # Add workflow context fields + "roundNumber": workflow.currentRound, + "taskNumber": 0, + "actionNumber": 0, + # Add progress status + "taskProgress": "pending", + "actionProgress": "pending" + } + message = self.chatInterface.createMessage(stopped_message) + if message: + workflow.messages.append(message) + + # Add log entry + self.chatInterface.createLog({ + "workflowId": workflow.id, + "message": "Workflow stopped by user", + "type": "warning", + "status": "stopped", + "progress": 100 + }) + + def _handleWorkflowError(self, workflow: ChatWorkflow, error: Exception) -> None: + """Handle workflow error exception""" + logger.error(f"Workflow processing error: {str(error)}") + + # Update workflow status to failed + workflow.status = "failed" + workflow.lastActivity = get_utc_timestamp() + self.chatInterface.updateWorkflow(workflow.id, { + "status": "failed", + "lastActivity": workflow.lastActivity, + "totalTasks": workflow.totalTasks, + "totalActions": workflow.totalActions + }) + + # Create error message + error_message = { + "workflowId": workflow.id, + "role": "assistant", + "message": f"Workflow processing failed: {str(error)}", + "status": "last", + "sequenceNr": len(workflow.messages) + 1, + "publishedAt": get_utc_timestamp(), + "documentsLabel": "workflow_error", + "documents": [], + # Add workflow context fields + "roundNumber": workflow.currentRound, + "taskNumber": 0, + "actionNumber": 0, + # Add progress status + "taskProgress": "fail", + "actionProgress": "fail" + } + message = self.chatInterface.createMessage(error_message) + if message: + workflow.messages.append(message) + + # Add error log entry + self.chatInterface.createLog({ + "workflowId": workflow.id, + "message": f"Workflow failed: {str(error)}", + "type": "error", + "status": "failed", + "progress": 100 + }) + + raise diff --git a/tests/methods/test_method_web.py b/tests/methods/test_method_web.py index 27344ab3..0d1509e2 100644 --- a/tests/methods/test_method_web.py +++ b/tests/methods/test_method_web.py @@ -5,7 +5,7 @@ import logging import pytest from unittest.mock import patch -from modules.methods.methodWeb import MethodWeb +from modules.workflows.methods.methodWeb import MethodWeb from tests.fixtures.tavily_responses import ( RESPONSE_SEARCH_HOW_OLD_IS_EARTH_NO_ANSWER, RESPONSE_EXTRACT_HOW_OLD_IS_EARTH_NO_ANSWER, diff --git a/tool_stats_durations_from_log.py b/tool_stats_durations_from_log.py index 103cd5be..483af2d2 100644 --- a/tool_stats_durations_from_log.py +++ b/tool_stats_durations_from_log.py @@ -13,7 +13,7 @@ def parse_line(line: str) -> Tuple[Optional[str], Optional[str], Optional[dateti Extract (logger, function, timestamp) from a log line. Expected format examples (single line): - 2025-09-18 16:35:04 - INFO - modules.chat.handling.handlingTasks - Task 1 - Starting action 3/4 - D:\\Athi\\...\\handlingTasks.py:572 - executeTask + 2025-09-18 16:35:04 - INFO - modules.workflows._transfer.handlingTasks - Task 1 - Starting action 3/4 - D:\\Athi\\...\\handlingTasks.py:572 - executeTask Returns (logger, function, timestamp_dt) or (None, None, None) if not matched. """