# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Chat Playground Feature Routes. Implements the endpoints for chat playground workflow management as a feature. """ import json import logging from typing import Optional, Dict, Any from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Request, status # Import auth modules from modules.auth import limiter, getRequestContext, RequestContext # Import interfaces from modules.interfaces import interfaceDbChat # Import models from modules.datamodels.datamodelChat import ( ChatWorkflow, ChatMessage, ChatLog, UserInputRequest, WorkflowModeEnum, ) from modules.datamodels.datamodelPagination import ( PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict, ) # Import workflow control functions from modules.workflows.automation import chatStart, chatStop from modules.features.chatplayground.mainChatplayground import getChatplaygroundServices from modules.shared.attributeUtils import getModelAttributeDefinitions # Configure logger logger = logging.getLogger(__name__) # Model attributes for ChatWorkflow (workflow attributes endpoint) workflowAttributes = getModelAttributeDefinitions(ChatWorkflow) # Create router for chat playground feature endpoints router = APIRouter( prefix="/api/chatplayground", tags=["Chat Playground Feature"], responses={404: {"description": "Not found"}} ) def _getServiceChat(context: RequestContext, featureInstanceId: str = None): """Get chat interface with feature instance context.""" return interfaceDbChat.getInterface( context.user, mandateId=str(context.mandateId) if context.mandateId else None, featureInstanceId=featureInstanceId ) def _validateInstanceAccess(instanceId: str, context: RequestContext) -> str: """ Validate that user has access to the feature instance. Args: instanceId: Feature instance ID context: Request context Returns: mandateId for the instance Raises: HTTPException if access is denied """ from modules.interfaces.interfaceDbApp import getRootInterface rootInterface = getRootInterface() # Get feature instance (Pydantic model) instance = rootInterface.getFeatureInstance(instanceId) if not instance: raise HTTPException(status_code=404, detail=f"Feature instance {instanceId} not found") # Check user has access to this instance using interface method featureAccess = rootInterface.getFeatureAccess(str(context.user.id), instanceId) if not featureAccess or not featureAccess.enabled: raise HTTPException(status_code=403, detail="Access denied to this feature instance") return str(instance.mandateId) if instance.mandateId else None # Workflow start endpoint @router.post("/{instanceId}/start", response_model=ChatWorkflow) @limiter.limit("120/minute") async def start_workflow( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: Optional[str] = Query(None, description="Optional ID of the workflow to continue"), workflowMode: WorkflowModeEnum = Query(..., description="Workflow mode: 'Dynamic' or 'Automation' (mandatory)"), userInput: UserInputRequest = Body(...), context: RequestContext = Depends(getRequestContext) ) -> ChatWorkflow: """ Starts a new workflow or continues an existing one. Args: instanceId: Feature instance ID workflowMode: "Dynamic" for iterative dynamic-style processing, "Automation" for automated workflow execution """ try: # Validate access and get mandate ID mandateId = _validateInstanceAccess(instanceId, context) # Get chatplayground services from service center (not automation) services = getChatplaygroundServices( context.user, mandateId=mandateId, featureInstanceId=instanceId, ) services.featureCode = 'chatplayground' if hasattr(userInput, 'allowedProviders') and userInput.allowedProviders: services.allowedProviders = userInput.allowedProviders # Start or continue workflow workflow = await chatStart( context.user, userInput, workflowMode, workflowId, mandateId=mandateId, featureInstanceId=instanceId, featureCode='chatplayground', services=services, ) return workflow except HTTPException: raise except Exception as e: logger.error(f"Error in start_workflow: {str(e)}") raise HTTPException( status_code=500, detail=str(e) ) # Stop workflow endpoint (under /workflows/{workflowId}/ for consistency) @router.post("/{instanceId}/workflows/{workflowId}/stop", response_model=ChatWorkflow) @limiter.limit("120/minute") async def stop_workflow( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="ID of the workflow to stop"), context: RequestContext = Depends(getRequestContext) ) -> ChatWorkflow: """Stops a running workflow.""" try: # Validate access and get mandate ID mandateId = _validateInstanceAccess(instanceId, context) # Get chatplayground services from service center (not automation) services = getChatplaygroundServices( context.user, mandateId=mandateId, featureInstanceId=instanceId, ) services.featureCode = 'chatplayground' # Stop workflow (pass featureInstanceId for proper RBAC filtering) workflow = await chatStop( context.user, workflowId, mandateId=mandateId, featureInstanceId=instanceId, featureCode='chatplayground', services=services, ) return workflow except HTTPException: raise except Exception as e: logger.error(f"Error in stop_workflow: {str(e)}") raise HTTPException( status_code=500, detail=str(e) ) # Unified Chat Data Endpoint for Polling (under /workflows/{workflowId}/ for consistency) @router.get("/{instanceId}/workflows/{workflowId}/chatData") @limiter.limit("120/minute") def get_workflow_chat_data( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="ID of the workflow"), afterTimestamp: Optional[float] = Query(None, description="Unix timestamp to get data after"), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: """ Get unified chat data (messages, logs, stats) for a workflow with timestamp-based selective data transfer. Returns all data types in chronological order based on _createdAt timestamp. """ try: # Validate access _validateInstanceAccess(instanceId, context) # Get service with feature instance context chatInterface = _getServiceChat(context, featureInstanceId=instanceId) # Verify workflow exists workflow = chatInterface.getWorkflow(workflowId) if not workflow: raise HTTPException( status_code=404, detail=f"Workflow with ID {workflowId} not found" ) # Get unified chat data chatData = chatInterface.getUnifiedChatData(workflowId, afterTimestamp) return chatData except HTTPException: raise except Exception as e: logger.error(f"Error getting unified chat data: {str(e)}", exc_info=True) raise HTTPException( status_code=500, detail=f"Error getting unified chat data: {str(e)}" ) # Get workflow attributes (ChatWorkflow model) @router.get("/{instanceId}/workflows/attributes", response_model=Dict[str, Any]) @limiter.limit("120/minute") def get_workflow_attributes( request: Request, instanceId: str = Path(..., description="Feature instance ID"), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: """Get attribute definitions for ChatWorkflow model.""" _validateInstanceAccess(instanceId, context) return {"attributes": workflowAttributes} # Get workflows for this instance @router.get("/{instanceId}/workflows", response_model=PaginatedResponse[ChatWorkflow]) @limiter.limit("120/minute") def get_workflows( request: Request, instanceId: str = Path(..., description="Feature instance ID"), pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), page: int = Query(1, ge=1, description="Page number (legacy)"), pageSize: int = Query(20, ge=1, le=100, description="Items per page (legacy)"), context: RequestContext = Depends(getRequestContext) ) -> PaginatedResponse[ChatWorkflow]: """ Get all workflows for this feature instance with optional pagination. """ try: # Validate access _validateInstanceAccess(instanceId, context) # Get service with feature instance context chatInterface = _getServiceChat(context, featureInstanceId=instanceId) # Parse pagination parameter paginationParams = None if pagination: try: paginationDict = json.loads(pagination) if paginationDict: paginationDict = normalize_pagination_dict(paginationDict) paginationParams = PaginationParams(**paginationDict) except (json.JSONDecodeError, ValueError) as e: raise HTTPException( status_code=400, detail=f"Invalid pagination parameter: {str(e)}" ) else: paginationParams = PaginationParams(page=page, pageSize=pageSize) result = chatInterface.getWorkflows(pagination=paginationParams) if paginationParams: return PaginatedResponse( items=result.items, pagination=PaginationMetadata( currentPage=paginationParams.page, pageSize=paginationParams.pageSize, totalItems=result.totalItems, totalPages=result.totalPages, sort=paginationParams.sort, filters=paginationParams.filters ) ) else: return PaginatedResponse(items=result, pagination=None) except HTTPException: raise except Exception as e: logger.error(f"Error getting workflows: {str(e)}", exc_info=True) raise HTTPException( status_code=500, detail=f"Error getting workflows: {str(e)}" ) # Action Discovery Endpoints (must be before /{workflowId} to avoid path conflict) @router.get("/{instanceId}/workflows/actions", response_model=Dict[str, Any]) @limiter.limit("120/minute") def get_all_workflow_actions( request: Request, instanceId: str = Path(..., description="Feature instance ID"), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: """Get all available workflow actions for the current user (filtered by RBAC).""" try: mandateId = _validateInstanceAccess(instanceId, context) services = getChatplaygroundServices( context.user, mandateId=mandateId, featureInstanceId=instanceId, ) from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods discoverMethods(services) allActions = [] for methodName, methodInfo in methods.items(): if methodName.startswith('Method'): continue methodInstance = methodInfo['instance'] methodActions = methodInstance.actions for actionName, actionInfo in methodActions.items(): actionResponse = { "module": methodInstance.name, "actionId": f"{methodInstance.name}.{actionName}", "name": actionName, "description": actionInfo.get('description', ''), "parameters": actionInfo.get('parameters', {}) } allActions.append(actionResponse) return {"actions": allActions} except HTTPException: raise except Exception as e: logger.error(f"Error getting all actions: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to get actions: {str(e)}") @router.get("/{instanceId}/workflows/actions/{method}", response_model=Dict[str, Any]) @limiter.limit("120/minute") def get_method_workflow_actions( request: Request, instanceId: str = Path(..., description="Feature instance ID"), method: str = Path(..., description="Method name (e.g., 'outlook', 'sharepoint')"), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: """Get all available actions for a specific method.""" try: mandateId = _validateInstanceAccess(instanceId, context) services = getChatplaygroundServices( context.user, mandateId=mandateId, featureInstanceId=instanceId, ) from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods discoverMethods(services) methodInstance = None for methodName, methodInfo in methods.items(): if methodInfo['instance'].name == method: methodInstance = methodInfo['instance'] break if not methodInstance: raise HTTPException(status_code=404, detail=f"Method '{method}' not found") actions = [] for actionName, actionInfo in methodInstance.actions.items(): actionResponse = { "actionId": f"{methodInstance.name}.{actionName}", "name": actionName, "description": actionInfo.get('description', ''), "parameters": actionInfo.get('parameters', {}) } actions.append(actionResponse) return {"module": methodInstance.name, "description": methodInstance.description, "actions": actions} except HTTPException: raise except Exception as e: logger.error(f"Error getting actions for method {method}: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to get actions for method {method}: {str(e)}") @router.get("/{instanceId}/workflows/actions/{method}/{action}", response_model=Dict[str, Any]) @limiter.limit("120/minute") def get_action_schema( request: Request, instanceId: str = Path(..., description="Feature instance ID"), method: str = Path(..., description="Method name (e.g., 'outlook', 'sharepoint')"), action: str = Path(..., description="Action name (e.g., 'readEmails', 'uploadDocument')"), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: """Get action schema with parameter definitions for a specific action.""" try: mandateId = _validateInstanceAccess(instanceId, context) services = getChatplaygroundServices( context.user, mandateId=mandateId, featureInstanceId=instanceId, ) from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods discoverMethods(services) methodInstance = None for methodName, methodInfo in methods.items(): if methodInfo['instance'].name == method: methodInstance = methodInfo['instance'] break if not methodInstance: raise HTTPException(status_code=404, detail=f"Method '{method}' not found") methodActions = methodInstance.actions if action not in methodActions: raise HTTPException(status_code=404, detail=f"Action '{action}' not found in method '{method}'") actionInfo = methodActions[action] return { "method": methodInstance.name, "action": action, "actionId": f"{methodInstance.name}.{action}", "description": actionInfo.get('description', ''), "parameters": actionInfo.get('parameters', {}) } except HTTPException: raise except Exception as e: logger.error(f"Error getting action schema for {method}.{action}: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to get action schema: {str(e)}") # Get single workflow by ID @router.get("/{instanceId}/workflows/{workflowId}", response_model=ChatWorkflow) @limiter.limit("120/minute") def get_workflow( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="ID of the workflow"), context: RequestContext = Depends(getRequestContext) ) -> ChatWorkflow: """Get workflow by ID.""" try: _validateInstanceAccess(instanceId, context) chatInterface = _getServiceChat(context, featureInstanceId=instanceId) workflow = chatInterface.getWorkflow(workflowId) if not workflow: raise HTTPException(status_code=404, detail="Workflow not found") return workflow except HTTPException: raise except Exception as e: logger.error(f"Error getting workflow: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to get workflow: {str(e)}") # Update workflow @router.put("/{instanceId}/workflows/{workflowId}", response_model=ChatWorkflow) @limiter.limit("120/minute") def update_workflow( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="ID of the workflow to update"), workflowData: Dict[str, Any] = Body(...), context: RequestContext = Depends(getRequestContext) ) -> ChatWorkflow: """Update workflow by ID.""" try: _validateInstanceAccess(instanceId, context) chatInterface = _getServiceChat(context, featureInstanceId=instanceId) workflow = chatInterface.getWorkflow(workflowId) if not workflow: raise HTTPException(status_code=404, detail="Workflow not found") if not chatInterface.checkRbacPermission(ChatWorkflow, "update", workflowId): raise HTTPException(status_code=403, detail="You don't have permission to update this workflow") updatedWorkflow = chatInterface.updateWorkflow(workflowId, workflowData) if not updatedWorkflow: raise HTTPException(status_code=500, detail="Failed to update workflow") return updatedWorkflow except HTTPException: raise except Exception as e: logger.error(f"Error updating workflow: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to update workflow: {str(e)}") # Delete workflow @router.delete("/{instanceId}/workflows/{workflowId}") @limiter.limit("120/minute") def delete_workflow( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="ID of the workflow to delete"), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: """Deletes a workflow and its associated data.""" try: _validateInstanceAccess(instanceId, context) chatInterface = _getServiceChat(context, featureInstanceId=instanceId) workflow = chatInterface.getWorkflow(workflowId) if not workflow: raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found") if not chatInterface.checkRbacPermission(ChatWorkflow, "delete", workflowId): raise HTTPException(status_code=403, detail="You don't have permission to delete this workflow") success = chatInterface.deleteWorkflow(workflowId) if not success: raise HTTPException(status_code=500, detail="Failed to delete workflow") return {"id": workflowId, "message": "Workflow and associated data deleted successfully"} except HTTPException: raise except Exception as e: logger.error(f"Error deleting workflow: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"Error deleting workflow: {str(e)}") # Get workflow status @router.get("/{instanceId}/workflows/{workflowId}/status", response_model=ChatWorkflow) @limiter.limit("120/minute") def get_workflow_status( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="ID of the workflow"), context: RequestContext = Depends(getRequestContext) ) -> ChatWorkflow: """Get the current status of a workflow.""" try: _validateInstanceAccess(instanceId, context) chatInterface = _getServiceChat(context, featureInstanceId=instanceId) workflow = chatInterface.getWorkflow(workflowId) if not workflow: raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found") return workflow except HTTPException: raise except Exception as e: logger.error(f"Error getting workflow status: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"Error getting workflow status: {str(e)}") # Get workflow logs @router.get("/{instanceId}/workflows/{workflowId}/logs", response_model=PaginatedResponse[ChatLog]) @limiter.limit("120/minute") def get_workflow_logs( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="ID of the workflow"), logId: Optional[str] = Query(None, description="Optional log ID for selective data transfer"), pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), context: RequestContext = Depends(getRequestContext) ) -> PaginatedResponse[ChatLog]: """Get logs for a workflow with optional pagination.""" try: _validateInstanceAccess(instanceId, context) chatInterface = _getServiceChat(context, featureInstanceId=instanceId) workflow = chatInterface.getWorkflow(workflowId) if not workflow: raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found") paginationParams = None if pagination: try: paginationDict = json.loads(pagination) if paginationDict: paginationDict = normalize_pagination_dict(paginationDict) paginationParams = PaginationParams(**paginationDict) except (json.JSONDecodeError, ValueError) as e: raise HTTPException(status_code=400, detail=f"Invalid pagination parameter: {str(e)}") result = chatInterface.getLogs(workflowId, pagination=paginationParams) if logId: allLogs = result.items if paginationParams else result logIndex = next((i for i, log in enumerate(allLogs) if log.id == logId), -1) if logIndex >= 0: filteredLogs = allLogs[logIndex + 1:] return PaginatedResponse(items=filteredLogs, pagination=None) if paginationParams: return PaginatedResponse( items=result.items, pagination=PaginationMetadata( currentPage=paginationParams.page, pageSize=paginationParams.pageSize, totalItems=result.totalItems, totalPages=result.totalPages, sort=paginationParams.sort, filters=paginationParams.filters ) ) return PaginatedResponse(items=result, pagination=None) except HTTPException: raise except Exception as e: logger.error(f"Error getting workflow logs: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"Error getting workflow logs: {str(e)}") # Get workflow messages @router.get("/{instanceId}/workflows/{workflowId}/messages", response_model=PaginatedResponse[ChatMessage]) @limiter.limit("120/minute") def get_workflow_messages( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="ID of the workflow"), messageId: Optional[str] = Query(None, description="Optional message ID for selective data transfer"), pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), context: RequestContext = Depends(getRequestContext) ) -> PaginatedResponse[ChatMessage]: """Get messages for a workflow with optional pagination.""" try: _validateInstanceAccess(instanceId, context) chatInterface = _getServiceChat(context, featureInstanceId=instanceId) workflow = chatInterface.getWorkflow(workflowId) if not workflow: raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found") paginationParams = None if pagination: try: paginationDict = json.loads(pagination) if paginationDict: paginationDict = normalize_pagination_dict(paginationDict) paginationParams = PaginationParams(**paginationDict) except (json.JSONDecodeError, ValueError) as e: raise HTTPException(status_code=400, detail=f"Invalid pagination parameter: {str(e)}") result = chatInterface.getMessages(workflowId, pagination=paginationParams) if messageId: allMessages = result.items if paginationParams else result messageIndex = next((i for i, msg in enumerate(allMessages) if msg.id == messageId), -1) if messageIndex >= 0: filteredMessages = allMessages[messageIndex + 1:] return PaginatedResponse(items=filteredMessages, pagination=None) if paginationParams: return PaginatedResponse( items=result.items, pagination=PaginationMetadata( currentPage=paginationParams.page, pageSize=paginationParams.pageSize, totalItems=result.totalItems, totalPages=result.totalPages, sort=paginationParams.sort, filters=paginationParams.filters ) ) return PaginatedResponse(items=result, pagination=None) except HTTPException: raise except Exception as e: logger.error(f"Error getting workflow messages: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"Error getting workflow messages: {str(e)}") # Delete message from workflow @router.delete("/{instanceId}/workflows/{workflowId}/messages/{messageId}") @limiter.limit("120/minute") def delete_workflow_message( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="ID of the workflow"), messageId: str = Path(..., description="ID of the message to delete"), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: """Delete a message from a workflow.""" try: _validateInstanceAccess(instanceId, context) chatInterface = _getServiceChat(context, featureInstanceId=instanceId) workflow = chatInterface.getWorkflow(workflowId) if not workflow: raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found") success = chatInterface.deleteMessage(workflowId, messageId) if not success: raise HTTPException(status_code=404, detail=f"Message with ID {messageId} not found in workflow {workflowId}") return {"workflowId": workflowId, "messageId": messageId, "message": "Message deleted successfully"} except HTTPException: raise except Exception as e: logger.error(f"Error deleting message: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"Error deleting message: {str(e)}") # Delete file from message @router.delete("/{instanceId}/workflows/{workflowId}/messages/{messageId}/files/{fileId}") @limiter.limit("120/minute") def delete_file_from_message( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="ID of the workflow"), messageId: str = Path(..., description="ID of the message"), fileId: str = Path(..., description="ID of the file to delete"), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: """Delete a file reference from a message in a workflow.""" try: _validateInstanceAccess(instanceId, context) chatInterface = _getServiceChat(context, featureInstanceId=instanceId) workflow = chatInterface.getWorkflow(workflowId) if not workflow: raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found") success = chatInterface.deleteFileFromMessage(workflowId, messageId, fileId) if not success: raise HTTPException(status_code=404, detail=f"File with ID {fileId} not found in message {messageId}") return {"workflowId": workflowId, "messageId": messageId, "fileId": fileId, "message": "File reference deleted successfully"} except HTTPException: raise except Exception as e: logger.error(f"Error deleting file reference: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"Error deleting file reference: {str(e)}")