fix: chat-playground and automation feature routes unification

This commit is contained in:
Ida Dittrich 2026-03-09 10:53:23 +01:00
parent d7636d72c4
commit 22e70aa878
4 changed files with 974 additions and 879 deletions

View file

@ -17,14 +17,16 @@ from modules.features.automation.interfaceFeatureAutomation import getInterface
from modules.features.automation.mainAutomation import getAutomationServices
from modules.auth import limiter, getRequestContext, RequestContext
from modules.features.automation.datamodelFeatureAutomation import AutomationDefinition, AutomationTemplate
from modules.datamodels.datamodelChat import ChatWorkflow
from modules.datamodels.datamodelChat import ChatWorkflow, ChatMessage, ChatLog
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict
from modules.shared.attributeUtils import getModelAttributeDefinitions
from modules.interfaces import interfaceDbChat
# Configure logger
logger = logging.getLogger(__name__)
# Model attributes for AutomationDefinition
# Model attributes for AutomationDefinition and ChatWorkflow
automationAttributes = getModelAttributeDefinitions(AutomationDefinition)
workflowAttributes = getModelAttributeDefinitions(ChatWorkflow)
# Create router for automation endpoints
router = APIRouter(
@ -231,6 +233,496 @@ def get_available_actions(
)
# -----------------------------------------------------------------------------
# Workflow routes under /{instanceId}/workflows/ (instance-scoped, same as chatplayground)
# -----------------------------------------------------------------------------
def _validateAutomationInstanceAccess(instanceId: str, context: RequestContext) -> Optional[str]:
"""Validate user has access to the automation feature instance. Returns mandateId."""
from modules.interfaces.interfaceDbApp import getRootInterface
rootInterface = getRootInterface()
instance = rootInterface.getFeatureInstance(instanceId)
if not instance:
raise HTTPException(status_code=404, detail=f"Feature instance {instanceId} not found")
featureAccess = rootInterface.getFeatureAccess(str(context.user.id), instanceId)
if not featureAccess or not featureAccess.enabled:
raise HTTPException(status_code=403, detail="Access denied to this feature instance")
return str(instance.mandateId) if instance.mandateId else None
def _getAutomationServiceChat(context: RequestContext, featureInstanceId: str = None, mandateId: str = None):
"""Get chat interface with feature instance context for workflows."""
return interfaceDbChat.getInterface(
context.user,
mandateId=mandateId or (str(context.mandateId) if context.mandateId else None),
featureInstanceId=featureInstanceId
)
@router.get("/{instanceId}/workflows", response_model=PaginatedResponse[ChatWorkflow])
@limiter.limit("120/minute")
def get_automation_workflows(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
page: int = Query(1, ge=1, description="Page number (legacy)"),
pageSize: int = Query(20, ge=1, le=100, description="Items per page (legacy)"),
context: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[ChatWorkflow]:
"""Get all workflows for this automation feature instance."""
try:
mandateId = _validateAutomationInstanceAccess(instanceId, context)
chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId, mandateId=mandateId)
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
if paginationDict:
paginationDict = normalize_pagination_dict(paginationDict)
paginationParams = PaginationParams(**paginationDict)
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(status_code=400, detail=f"Invalid pagination parameter: {str(e)}")
else:
paginationParams = PaginationParams(page=page, pageSize=pageSize)
result = chatInterface.getWorkflows(pagination=paginationParams)
if paginationParams:
return PaginatedResponse(
items=result.items,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=result.totalItems,
totalPages=result.totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
return PaginatedResponse(items=result, pagination=None)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting automation workflows: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Error getting workflows: {str(e)}")
# Workflow attributes (ChatWorkflow model)
@router.get("/{instanceId}/workflows/attributes", response_model=Dict[str, Any])
@limiter.limit("120/minute")
def get_automation_workflow_attributes(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Get attribute definitions for ChatWorkflow model."""
_validateAutomationInstanceAccess(instanceId, context)
return {"attributes": workflowAttributes}
# Actions (must be before /{workflowId} to avoid path conflict)
@router.get("/{instanceId}/workflows/actions", response_model=Dict[str, Any])
@limiter.limit("120/minute")
def get_automation_workflow_actions(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Get all available workflow actions."""
try:
mandateId = _validateAutomationInstanceAccess(instanceId, context)
services = getAutomationServices(context.user, mandateId=mandateId, featureInstanceId=instanceId)
from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods
discoverMethods(services)
allActions = []
for methodName, methodInfo in methods.items():
if methodName.startswith('Method'):
continue
methodInstance = methodInfo['instance']
for actionName, actionInfo in methodInstance.actions.items():
allActions.append({
"module": methodInstance.name,
"actionId": f"{methodInstance.name}.{actionName}",
"name": actionName,
"description": actionInfo.get('description', ''),
"parameters": actionInfo.get('parameters', {})
})
return {"actions": allActions}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting actions: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to get actions: {str(e)}")
@router.get("/{instanceId}/workflows/actions/{method}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
def get_automation_method_actions(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
method: str = Path(..., description="Method name"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Get actions for a specific method."""
try:
_validateAutomationInstanceAccess(instanceId, context)
services = getAutomationServices(context.user, mandateId=str(context.mandateId) if context.mandateId else None, featureInstanceId=instanceId)
from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods
discoverMethods(services)
methodInstance = None
for mn, mi in methods.items():
if mi['instance'].name == method:
methodInstance = mi['instance']
break
if not methodInstance:
raise HTTPException(status_code=404, detail=f"Method '{method}' not found")
actions = [{"actionId": f"{methodInstance.name}.{an}", "name": an, "description": ai.get('description', ''), "parameters": ai.get('parameters', {})}
for an, ai in methodInstance.actions.items()]
return {"module": methodInstance.name, "description": methodInstance.description, "actions": actions}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting actions for {method}: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to get actions: {str(e)}")
@router.get("/{instanceId}/workflows/actions/{method}/{action}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
def get_automation_action_schema(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
method: str = Path(..., description="Method name"),
action: str = Path(..., description="Action name"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Get action schema for a specific action."""
try:
_validateAutomationInstanceAccess(instanceId, context)
services = getAutomationServices(context.user, mandateId=str(context.mandateId) if context.mandateId else None, featureInstanceId=instanceId)
from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods
discoverMethods(services)
methodInstance = None
for mn, mi in methods.items():
if mi['instance'].name == method:
methodInstance = mi['instance']
break
if not methodInstance:
raise HTTPException(status_code=404, detail=f"Method '{method}' not found")
if action not in methodInstance.actions:
raise HTTPException(status_code=404, detail=f"Action '{action}' not found in method '{method}'")
ai = methodInstance.actions[action]
return {"method": methodInstance.name, "action": action, "actionId": f"{methodInstance.name}.{action}",
"description": ai.get('description', ''), "parameters": ai.get('parameters', {})}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting action schema: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to get action schema: {str(e)}")
@router.get("/{instanceId}/workflows/{workflowId}", response_model=ChatWorkflow)
@limiter.limit("120/minute")
def get_automation_workflow(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
workflowId: str = Path(..., description="Workflow ID"),
context: RequestContext = Depends(getRequestContext)
) -> ChatWorkflow:
"""Get workflow by ID."""
try:
_validateAutomationInstanceAccess(instanceId, context)
chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId)
workflow = chatInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(status_code=404, detail="Workflow not found")
return workflow
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting workflow: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to get workflow: {str(e)}")
@router.put("/{instanceId}/workflows/{workflowId}", response_model=ChatWorkflow)
@limiter.limit("120/minute")
def update_automation_workflow(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
workflowId: str = Path(..., description="Workflow ID"),
workflowData: Dict[str, Any] = Body(...),
context: RequestContext = Depends(getRequestContext)
) -> ChatWorkflow:
"""Update workflow by ID."""
try:
_validateAutomationInstanceAccess(instanceId, context)
chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId)
workflow = chatInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(status_code=404, detail="Workflow not found")
if not chatInterface.checkRbacPermission(ChatWorkflow, "update", workflowId):
raise HTTPException(status_code=403, detail="You don't have permission to update this workflow")
updated = chatInterface.updateWorkflow(workflowId, workflowData)
if not updated:
raise HTTPException(status_code=500, detail="Failed to update workflow")
return updated
except HTTPException:
raise
except Exception as e:
logger.error(f"Error updating workflow: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to update workflow: {str(e)}")
@router.delete("/{instanceId}/workflows/{workflowId}")
@limiter.limit("120/minute")
def delete_automation_workflow(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
workflowId: str = Path(..., description="Workflow ID"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Delete workflow and associated data."""
try:
_validateAutomationInstanceAccess(instanceId, context)
chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId)
workflow = chatInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found")
if not chatInterface.checkRbacPermission(ChatWorkflow, "delete", workflowId):
raise HTTPException(status_code=403, detail="You don't have permission to delete this workflow")
success = chatInterface.deleteWorkflow(workflowId)
if not success:
raise HTTPException(status_code=500, detail="Failed to delete workflow")
return {"id": workflowId, "message": "Workflow and associated data deleted successfully"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deleting workflow: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Error deleting workflow: {str(e)}")
@router.get("/{instanceId}/workflows/{workflowId}/status", response_model=ChatWorkflow)
@limiter.limit("120/minute")
def get_automation_workflow_status(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
workflowId: str = Path(..., description="Workflow ID"),
context: RequestContext = Depends(getRequestContext)
) -> ChatWorkflow:
"""Get workflow status."""
try:
_validateAutomationInstanceAccess(instanceId, context)
chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId)
workflow = chatInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found")
return workflow
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting workflow status: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Error getting workflow status: {str(e)}")
@router.get("/{instanceId}/workflows/{workflowId}/logs", response_model=PaginatedResponse[ChatLog])
@limiter.limit("120/minute")
def get_automation_workflow_logs(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
workflowId: str = Path(..., description="Workflow ID"),
logId: Optional[str] = Query(None),
pagination: Optional[str] = Query(None),
context: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[ChatLog]:
"""Get workflow logs."""
try:
_validateAutomationInstanceAccess(instanceId, context)
chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId)
workflow = chatInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found")
paginationParams = None
if pagination:
try:
pd = json.loads(pagination)
if pd:
pd = normalize_pagination_dict(pd)
paginationParams = PaginationParams(**pd)
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(status_code=400, detail=f"Invalid pagination: {str(e)}")
result = chatInterface.getLogs(workflowId, pagination=paginationParams)
if logId:
allLogs = result.items if paginationParams else result
idx = next((i for i, log in enumerate(allLogs) if log.id == logId), -1)
if idx >= 0:
return PaginatedResponse(items=allLogs[idx + 1:], pagination=None)
if paginationParams:
return PaginatedResponse(items=result.items, pagination=PaginationMetadata(
currentPage=paginationParams.page, pageSize=paginationParams.pageSize,
totalItems=result.totalItems, totalPages=result.totalPages,
sort=paginationParams.sort, filters=paginationParams.filters))
return PaginatedResponse(items=result, pagination=None)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting workflow logs: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Error getting workflow logs: {str(e)}")
@router.get("/{instanceId}/workflows/{workflowId}/messages", response_model=PaginatedResponse[ChatMessage])
@limiter.limit("120/minute")
def get_automation_workflow_messages(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
workflowId: str = Path(..., description="Workflow ID"),
messageId: Optional[str] = Query(None),
pagination: Optional[str] = Query(None),
context: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[ChatMessage]:
"""Get workflow messages."""
try:
_validateAutomationInstanceAccess(instanceId, context)
chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId)
workflow = chatInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found")
paginationParams = None
if pagination:
try:
pd = json.loads(pagination)
if pd:
pd = normalize_pagination_dict(pd)
paginationParams = PaginationParams(**pd)
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(status_code=400, detail=f"Invalid pagination: {str(e)}")
result = chatInterface.getMessages(workflowId, pagination=paginationParams)
if messageId:
allMsgs = result.items if paginationParams else result
idx = next((i for i, m in enumerate(allMsgs) if m.id == messageId), -1)
if idx >= 0:
return PaginatedResponse(items=allMsgs[idx + 1:], pagination=None)
if paginationParams:
return PaginatedResponse(items=result.items, pagination=PaginationMetadata(
currentPage=paginationParams.page, pageSize=paginationParams.pageSize,
totalItems=result.totalItems, totalPages=result.totalPages,
sort=paginationParams.sort, filters=paginationParams.filters))
return PaginatedResponse(items=result, pagination=None)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting workflow messages: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Error getting workflow messages: {str(e)}")
@router.delete("/{instanceId}/workflows/{workflowId}/messages/{messageId}")
@limiter.limit("120/minute")
def delete_automation_workflow_message(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
workflowId: str = Path(..., description="Workflow ID"),
messageId: str = Path(..., description="Message ID"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Delete message from workflow."""
try:
_validateAutomationInstanceAccess(instanceId, context)
chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId)
workflow = chatInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found")
success = chatInterface.deleteMessage(workflowId, messageId)
if not success:
raise HTTPException(status_code=404, detail=f"Message {messageId} not found")
return {"workflowId": workflowId, "messageId": messageId, "message": "Message deleted successfully"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deleting message: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Error deleting message: {str(e)}")
@router.delete("/{instanceId}/workflows/{workflowId}/messages/{messageId}/files/{fileId}")
@limiter.limit("120/minute")
def delete_automation_file_from_message(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
workflowId: str = Path(..., description="Workflow ID"),
messageId: str = Path(..., description="Message ID"),
fileId: str = Path(..., description="File ID"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Delete file from message."""
try:
_validateAutomationInstanceAccess(instanceId, context)
chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId)
workflow = chatInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found")
success = chatInterface.deleteFileFromMessage(workflowId, messageId, fileId)
if not success:
raise HTTPException(status_code=404, detail=f"File {fileId} not found")
return {"workflowId": workflowId, "messageId": messageId, "fileId": fileId, "message": "File reference deleted successfully"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deleting file: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Error deleting file: {str(e)}")
@router.get("/{instanceId}/workflows/{workflowId}/chatData")
@limiter.limit("120/minute")
def get_automation_workflow_chat_data(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
workflowId: str = Path(..., description="Workflow ID"),
afterTimestamp: Optional[float] = Query(None),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Get unified chat data for workflow."""
try:
_validateAutomationInstanceAccess(instanceId, context)
chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId)
workflow = chatInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found")
return chatInterface.getUnifiedChatData(workflowId, afterTimestamp)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting chat data: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Error getting chat data: {str(e)}")
@router.post("/{instanceId}/workflows/{workflowId}/stop", response_model=ChatWorkflow)
@limiter.limit("30/minute")
async def stop_automation_workflow(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
workflowId: str = Path(..., description="Workflow ID"),
context: RequestContext = Depends(getRequestContext)
) -> ChatWorkflow:
"""Stop a running automation workflow. Uses instance-scoped services."""
try:
from modules.workflows.automation import chatStop
mandateId = _validateAutomationInstanceAccess(instanceId, context)
services = getAutomationServices(
context.user,
mandateId=mandateId,
featureInstanceId=instanceId,
)
services.featureCode = "automation"
return await chatStop(
context.user,
workflowId,
mandateId=mandateId,
featureInstanceId=instanceId,
featureCode="automation",
services=services,
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error stopping automation workflow: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/{automationId}", response_model=AutomationDefinition)
@limiter.limit("30/minute")
def get_automation(
@ -379,7 +871,7 @@ async def execute_automation_route(
if not automation:
raise ValueError(f"Automation {automationId} not found")
from modules.workflows.automation import executeAutomation
from modules.workflows.automation import executeAutomation, chatStop
workflow = await executeAutomation(automationId, automation, context.user, services)
return workflow
except HTTPException:

View file

@ -5,9 +5,10 @@ Chat Playground Feature Routes.
Implements the endpoints for chat playground workflow management as a feature.
"""
import json
import logging
from typing import Optional, Dict, Any
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Request
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Request, status
# Import auth modules
from modules.auth import limiter, getRequestContext, RequestContext
@ -16,15 +17,31 @@ from modules.auth import limiter, getRequestContext, RequestContext
from modules.interfaces import interfaceDbChat
# Import models
from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, WorkflowModeEnum
from modules.datamodels.datamodelChat import (
ChatWorkflow,
ChatMessage,
ChatLog,
UserInputRequest,
WorkflowModeEnum,
)
from modules.datamodels.datamodelPagination import (
PaginationParams,
PaginatedResponse,
PaginationMetadata,
normalize_pagination_dict,
)
# Import workflow control functions
from modules.workflows.automation import chatStart, chatStop
from modules.features.chatplayground.mainChatplayground import getChatplaygroundServices
from modules.shared.attributeUtils import getModelAttributeDefinitions
# Configure logger
logger = logging.getLogger(__name__)
# Model attributes for ChatWorkflow (workflow attributes endpoint)
workflowAttributes = getModelAttributeDefinitions(ChatWorkflow)
# Create router for chat playground feature endpoints
router = APIRouter(
prefix="/api/chatplayground",
@ -130,8 +147,8 @@ async def start_workflow(
)
# Stop workflow endpoint
@router.post("/{instanceId}/{workflowId}/stop", response_model=ChatWorkflow)
# Stop workflow endpoint (under /workflows/{workflowId}/ for consistency)
@router.post("/{instanceId}/workflows/{workflowId}/stop", response_model=ChatWorkflow)
@limiter.limit("120/minute")
async def stop_workflow(
request: Request,
@ -174,8 +191,8 @@ async def stop_workflow(
)
# Unified Chat Data Endpoint for Polling
@router.get("/{instanceId}/{workflowId}/chatData")
# Unified Chat Data Endpoint for Polling (under /workflows/{workflowId}/ for consistency)
@router.get("/{instanceId}/workflows/{workflowId}/chatData")
@limiter.limit("120/minute")
def get_workflow_chat_data(
request: Request,
@ -218,18 +235,32 @@ def get_workflow_chat_data(
)
# Get workflow attributes (ChatWorkflow model)
@router.get("/{instanceId}/workflows/attributes", response_model=Dict[str, Any])
@limiter.limit("120/minute")
def get_workflow_attributes(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Get attribute definitions for ChatWorkflow model."""
_validateInstanceAccess(instanceId, context)
return {"attributes": workflowAttributes}
# Get workflows for this instance
@router.get("/{instanceId}/workflows")
@router.get("/{instanceId}/workflows", response_model=PaginatedResponse[ChatWorkflow])
@limiter.limit("120/minute")
def get_workflows(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
page: int = Query(1, ge=1, description="Page number"),
pageSize: int = Query(20, ge=1, le=100, description="Items per page"),
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
page: int = Query(1, ge=1, description="Page number (legacy)"),
pageSize: int = Query(20, ge=1, le=100, description="Items per page (legacy)"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
) -> PaginatedResponse[ChatWorkflow]:
"""
Get all workflows for this feature instance.
Get all workflows for this feature instance with optional pagination.
"""
try:
# Validate access
@ -238,13 +269,38 @@ def get_workflows(
# Get service with feature instance context
chatInterface = _getServiceChat(context, featureInstanceId=instanceId)
# Get workflows with pagination
from modules.datamodels.datamodelPagination import PaginationParams
pagination = PaginationParams(page=page, pageSize=pageSize)
# Parse pagination parameter
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
if paginationDict:
paginationDict = normalize_pagination_dict(paginationDict)
paginationParams = PaginationParams(**paginationDict)
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=400,
detail=f"Invalid pagination parameter: {str(e)}"
)
else:
paginationParams = PaginationParams(page=page, pageSize=pageSize)
result = chatInterface.getWorkflows(pagination=pagination)
result = chatInterface.getWorkflows(pagination=paginationParams)
return result
if paginationParams:
return PaginatedResponse(
items=result.items,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=result.totalItems,
totalPages=result.totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
else:
return PaginatedResponse(items=result, pagination=None)
except HTTPException:
raise
@ -254,3 +310,410 @@ def get_workflows(
status_code=500,
detail=f"Error getting workflows: {str(e)}"
)
# Action Discovery Endpoints (must be before /{workflowId} to avoid path conflict)
@router.get("/{instanceId}/workflows/actions", response_model=Dict[str, Any])
@limiter.limit("120/minute")
def get_all_workflow_actions(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Get all available workflow actions for the current user (filtered by RBAC)."""
try:
mandateId = _validateInstanceAccess(instanceId, context)
services = getChatplaygroundServices(
context.user,
mandateId=mandateId,
featureInstanceId=instanceId,
)
from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods
discoverMethods(services)
allActions = []
for methodName, methodInfo in methods.items():
if methodName.startswith('Method'):
continue
methodInstance = methodInfo['instance']
methodActions = methodInstance.actions
for actionName, actionInfo in methodActions.items():
actionResponse = {
"module": methodInstance.name,
"actionId": f"{methodInstance.name}.{actionName}",
"name": actionName,
"description": actionInfo.get('description', ''),
"parameters": actionInfo.get('parameters', {})
}
allActions.append(actionResponse)
return {"actions": allActions}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting all actions: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to get actions: {str(e)}")
@router.get("/{instanceId}/workflows/actions/{method}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
def get_method_workflow_actions(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
method: str = Path(..., description="Method name (e.g., 'outlook', 'sharepoint')"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Get all available actions for a specific method."""
try:
mandateId = _validateInstanceAccess(instanceId, context)
services = getChatplaygroundServices(
context.user,
mandateId=mandateId,
featureInstanceId=instanceId,
)
from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods
discoverMethods(services)
methodInstance = None
for methodName, methodInfo in methods.items():
if methodInfo['instance'].name == method:
methodInstance = methodInfo['instance']
break
if not methodInstance:
raise HTTPException(status_code=404, detail=f"Method '{method}' not found")
actions = []
for actionName, actionInfo in methodInstance.actions.items():
actionResponse = {
"actionId": f"{methodInstance.name}.{actionName}",
"name": actionName,
"description": actionInfo.get('description', ''),
"parameters": actionInfo.get('parameters', {})
}
actions.append(actionResponse)
return {"module": methodInstance.name, "description": methodInstance.description, "actions": actions}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting actions for method {method}: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to get actions for method {method}: {str(e)}")
@router.get("/{instanceId}/workflows/actions/{method}/{action}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
def get_action_schema(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
method: str = Path(..., description="Method name (e.g., 'outlook', 'sharepoint')"),
action: str = Path(..., description="Action name (e.g., 'readEmails', 'uploadDocument')"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Get action schema with parameter definitions for a specific action."""
try:
mandateId = _validateInstanceAccess(instanceId, context)
services = getChatplaygroundServices(
context.user,
mandateId=mandateId,
featureInstanceId=instanceId,
)
from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods
discoverMethods(services)
methodInstance = None
for methodName, methodInfo in methods.items():
if methodInfo['instance'].name == method:
methodInstance = methodInfo['instance']
break
if not methodInstance:
raise HTTPException(status_code=404, detail=f"Method '{method}' not found")
methodActions = methodInstance.actions
if action not in methodActions:
raise HTTPException(status_code=404, detail=f"Action '{action}' not found in method '{method}'")
actionInfo = methodActions[action]
return {
"method": methodInstance.name,
"action": action,
"actionId": f"{methodInstance.name}.{action}",
"description": actionInfo.get('description', ''),
"parameters": actionInfo.get('parameters', {})
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting action schema for {method}.{action}: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to get action schema: {str(e)}")
# Get single workflow by ID
@router.get("/{instanceId}/workflows/{workflowId}", response_model=ChatWorkflow)
@limiter.limit("120/minute")
def get_workflow(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
workflowId: str = Path(..., description="ID of the workflow"),
context: RequestContext = Depends(getRequestContext)
) -> ChatWorkflow:
"""Get workflow by ID."""
try:
_validateInstanceAccess(instanceId, context)
chatInterface = _getServiceChat(context, featureInstanceId=instanceId)
workflow = chatInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(status_code=404, detail="Workflow not found")
return workflow
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting workflow: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to get workflow: {str(e)}")
# Update workflow
@router.put("/{instanceId}/workflows/{workflowId}", response_model=ChatWorkflow)
@limiter.limit("120/minute")
def update_workflow(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
workflowId: str = Path(..., description="ID of the workflow to update"),
workflowData: Dict[str, Any] = Body(...),
context: RequestContext = Depends(getRequestContext)
) -> ChatWorkflow:
"""Update workflow by ID."""
try:
_validateInstanceAccess(instanceId, context)
chatInterface = _getServiceChat(context, featureInstanceId=instanceId)
workflow = chatInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(status_code=404, detail="Workflow not found")
if not chatInterface.checkRbacPermission(ChatWorkflow, "update", workflowId):
raise HTTPException(status_code=403, detail="You don't have permission to update this workflow")
updatedWorkflow = chatInterface.updateWorkflow(workflowId, workflowData)
if not updatedWorkflow:
raise HTTPException(status_code=500, detail="Failed to update workflow")
return updatedWorkflow
except HTTPException:
raise
except Exception as e:
logger.error(f"Error updating workflow: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to update workflow: {str(e)}")
# Delete workflow
@router.delete("/{instanceId}/workflows/{workflowId}")
@limiter.limit("120/minute")
def delete_workflow(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
workflowId: str = Path(..., description="ID of the workflow to delete"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Deletes a workflow and its associated data."""
try:
_validateInstanceAccess(instanceId, context)
chatInterface = _getServiceChat(context, featureInstanceId=instanceId)
workflow = chatInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found")
if not chatInterface.checkRbacPermission(ChatWorkflow, "delete", workflowId):
raise HTTPException(status_code=403, detail="You don't have permission to delete this workflow")
success = chatInterface.deleteWorkflow(workflowId)
if not success:
raise HTTPException(status_code=500, detail="Failed to delete workflow")
return {"id": workflowId, "message": "Workflow and associated data deleted successfully"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deleting workflow: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Error deleting workflow: {str(e)}")
# Get workflow status
@router.get("/{instanceId}/workflows/{workflowId}/status", response_model=ChatWorkflow)
@limiter.limit("120/minute")
def get_workflow_status(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
workflowId: str = Path(..., description="ID of the workflow"),
context: RequestContext = Depends(getRequestContext)
) -> ChatWorkflow:
"""Get the current status of a workflow."""
try:
_validateInstanceAccess(instanceId, context)
chatInterface = _getServiceChat(context, featureInstanceId=instanceId)
workflow = chatInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found")
return workflow
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting workflow status: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Error getting workflow status: {str(e)}")
# Get workflow logs
@router.get("/{instanceId}/workflows/{workflowId}/logs", response_model=PaginatedResponse[ChatLog])
@limiter.limit("120/minute")
def get_workflow_logs(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
workflowId: str = Path(..., description="ID of the workflow"),
logId: Optional[str] = Query(None, description="Optional log ID for selective data transfer"),
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
context: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[ChatLog]:
"""Get logs for a workflow with optional pagination."""
try:
_validateInstanceAccess(instanceId, context)
chatInterface = _getServiceChat(context, featureInstanceId=instanceId)
workflow = chatInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found")
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
if paginationDict:
paginationDict = normalize_pagination_dict(paginationDict)
paginationParams = PaginationParams(**paginationDict)
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(status_code=400, detail=f"Invalid pagination parameter: {str(e)}")
result = chatInterface.getLogs(workflowId, pagination=paginationParams)
if logId:
allLogs = result.items if paginationParams else result
logIndex = next((i for i, log in enumerate(allLogs) if log.id == logId), -1)
if logIndex >= 0:
filteredLogs = allLogs[logIndex + 1:]
return PaginatedResponse(items=filteredLogs, pagination=None)
if paginationParams:
return PaginatedResponse(
items=result.items,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=result.totalItems,
totalPages=result.totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
return PaginatedResponse(items=result, pagination=None)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting workflow logs: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Error getting workflow logs: {str(e)}")
# Get workflow messages
@router.get("/{instanceId}/workflows/{workflowId}/messages", response_model=PaginatedResponse[ChatMessage])
@limiter.limit("120/minute")
def get_workflow_messages(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
workflowId: str = Path(..., description="ID of the workflow"),
messageId: Optional[str] = Query(None, description="Optional message ID for selective data transfer"),
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
context: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[ChatMessage]:
"""Get messages for a workflow with optional pagination."""
try:
_validateInstanceAccess(instanceId, context)
chatInterface = _getServiceChat(context, featureInstanceId=instanceId)
workflow = chatInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found")
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
if paginationDict:
paginationDict = normalize_pagination_dict(paginationDict)
paginationParams = PaginationParams(**paginationDict)
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(status_code=400, detail=f"Invalid pagination parameter: {str(e)}")
result = chatInterface.getMessages(workflowId, pagination=paginationParams)
if messageId:
allMessages = result.items if paginationParams else result
messageIndex = next((i for i, msg in enumerate(allMessages) if msg.id == messageId), -1)
if messageIndex >= 0:
filteredMessages = allMessages[messageIndex + 1:]
return PaginatedResponse(items=filteredMessages, pagination=None)
if paginationParams:
return PaginatedResponse(
items=result.items,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=result.totalItems,
totalPages=result.totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
return PaginatedResponse(items=result, pagination=None)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting workflow messages: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Error getting workflow messages: {str(e)}")
# Delete message from workflow
@router.delete("/{instanceId}/workflows/{workflowId}/messages/{messageId}")
@limiter.limit("120/minute")
def delete_workflow_message(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
workflowId: str = Path(..., description="ID of the workflow"),
messageId: str = Path(..., description="ID of the message to delete"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Delete a message from a workflow."""
try:
_validateInstanceAccess(instanceId, context)
chatInterface = _getServiceChat(context, featureInstanceId=instanceId)
workflow = chatInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found")
success = chatInterface.deleteMessage(workflowId, messageId)
if not success:
raise HTTPException(status_code=404, detail=f"Message with ID {messageId} not found in workflow {workflowId}")
return {"workflowId": workflowId, "messageId": messageId, "message": "Message deleted successfully"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deleting message: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Error deleting message: {str(e)}")
# Delete file from message
@router.delete("/{instanceId}/workflows/{workflowId}/messages/{messageId}/files/{fileId}")
@limiter.limit("120/minute")
def delete_file_from_message(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
workflowId: str = Path(..., description="ID of the workflow"),
messageId: str = Path(..., description="ID of the message"),
fileId: str = Path(..., description="ID of the file to delete"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Delete a file reference from a message in a workflow."""
try:
_validateInstanceAccess(instanceId, context)
chatInterface = _getServiceChat(context, featureInstanceId=instanceId)
workflow = chatInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found")
success = chatInterface.deleteFileFromMessage(workflowId, messageId, fileId)
if not success:
raise HTTPException(status_code=404, detail=f"File with ID {fileId} not found in message {messageId}")
return {"workflowId": workflowId, "messageId": messageId, "fileId": fileId, "message": "File reference deleted successfully"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deleting file reference: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Error deleting file reference: {str(e)}")

View file

@ -1,857 +0,0 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Workflow routes for the backend API.
Implements the endpoints for workflow management according to the state machine.
"""
import logging
import json
from typing import List, Dict, Any, Optional
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Response, status, Request
# Import auth modules
from modules.auth import limiter, getCurrentUser
from modules.auth.authentication import getRequestContext, RequestContext
# Import interfaces from feature containers
import modules.interfaces.interfaceDbChat as interfaceDbChat
from modules.interfaces.interfaceDbChat import getInterface
from modules.interfaces.interfaceRbac import getRecordsetWithRBAC
# Import models from feature containers
from modules.datamodels.datamodelChat import (
ChatWorkflow,
ChatMessage,
ChatLog,
ChatStat,
ChatDocument
)
from modules.shared.attributeUtils import getModelAttributeDefinitions, AttributeResponse
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict
# Configure logger
logger = logging.getLogger(__name__)
# Model attributes for ChatWorkflow
workflowAttributes = getModelAttributeDefinitions(ChatWorkflow)
# Create router for workflow endpoints
router = APIRouter(
prefix="/api/workflows",
tags=["Workflow"],
responses={404: {"description": "Not found"}}
)
def getServiceChat(ctx: RequestContext):
# Workflows are not feature-instance-specific — only mandateId is needed for RBAC.
# Passing featureInstanceId would add a SQL WHERE filter that excludes workflows
# created by other features (e.g., automation vs chatbot).
return interfaceDbChat.getInterface(ctx.user, mandateId=ctx.mandateId)
# Consolidated endpoint for getting all workflows
@router.get("/", response_model=PaginatedResponse[ChatWorkflow])
@limiter.limit("120/minute")
def get_workflows(
request: Request,
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
ctx: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[ChatWorkflow]:
"""
Get workflows with optional pagination, sorting, and filtering.
Query Parameters:
- pagination: JSON-encoded PaginationParams object, or None for no pagination
Examples:
- GET /api/workflows/ (no pagination - returns all workflows)
- GET /api/workflows/?pagination={"page":1,"pageSize":10,"sort":[]}
"""
try:
# Parse pagination parameter
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
if paginationDict:
# Normalize pagination dict (handles top-level "search" field)
paginationDict = normalize_pagination_dict(paginationDict)
paginationParams = PaginationParams(**paginationDict)
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=400,
detail=f"Invalid pagination parameter: {str(e)}"
)
appInterface = getServiceChat(ctx)
result = appInterface.getWorkflows(pagination=paginationParams)
# If pagination was requested, result is PaginatedResult with items as dicts
# If no pagination, result is List[Dict]
if paginationParams:
workflows = result.items
totalItems = result.totalItems
totalPages = result.totalPages
else:
workflows = result
totalItems = len(result)
totalPages = 1
if paginationParams:
return PaginatedResponse(
items=workflows,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=totalItems,
totalPages=totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
else:
return PaginatedResponse(
items=workflows,
pagination=None
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting workflows: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get workflows: {str(e)}"
)
@router.get("/{workflowId}", response_model=ChatWorkflow)
@limiter.limit("120/minute")
def get_workflow(
request: Request,
workflowId: str = Path(..., description="ID of the workflow"),
ctx: RequestContext = Depends(getRequestContext)
) -> ChatWorkflow:
"""Get workflow by ID"""
try:
# Get workflow interface with current user context
workflowInterface = getServiceChat(ctx)
# Get workflow
workflow = workflowInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Workflow not found"
)
return workflow
except Exception as e:
logger.error(f"Error getting workflow: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get workflow: {str(e)}"
)
@router.put("/{workflowId}", response_model=ChatWorkflow)
@limiter.limit("120/minute")
def update_workflow(
request: Request,
workflowId: str = Path(..., description="ID of the workflow to update"),
workflowData: Dict[str, Any] = Body(...),
ctx: RequestContext = Depends(getRequestContext)
) -> ChatWorkflow:
"""Update workflow by ID"""
try:
# Get workflow interface with current user context
workflowInterface = getServiceChat(ctx)
# Get workflow using interface method to check permissions
workflow = workflowInterface.getWorkflow(workflowId)
if not workflow:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Workflow not found"
)
# Check if user has permission to update using RBAC
if not workflowInterface.checkRbacPermission(ChatWorkflow, "update", workflowId):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to update this workflow"
)
# Update workflow
updatedWorkflow = workflowInterface.updateWorkflow(workflowId, workflowData)
if not updatedWorkflow:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to update workflow"
)
return updatedWorkflow
except HTTPException:
raise
except Exception as e:
logger.error(f"Error updating workflow: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to update workflow: {str(e)}"
)
# API Endpoint for workflow status
@router.get("/{workflowId}/status", response_model=ChatWorkflow)
@limiter.limit("120/minute")
def get_workflow_status(
request: Request,
workflowId: str = Path(..., description="ID of the workflow"),
ctx: RequestContext = Depends(getRequestContext)
) -> ChatWorkflow:
"""Get the current status of a workflow."""
try:
# Get service center
interfaceDbChat = getServiceChat(ctx)
# Retrieve workflow
workflow = interfaceDbChat.getWorkflow(workflowId)
if not workflow:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Workflow with ID {workflowId} not found"
)
return workflow
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting workflow status: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error getting workflow status: {str(e)}"
)
# API Endpoint for stopping a workflow
@router.post("/{workflowId}/stop", response_model=ChatWorkflow)
@limiter.limit("120/minute")
async def stop_workflow(
request: Request,
workflowId: str = Path(..., description="ID of the workflow to stop"),
ctx: RequestContext = Depends(getRequestContext)
) -> ChatWorkflow:
"""
Stop a running workflow.
This is a general endpoint that can be used by any feature to stop a workflow.
"""
try:
from modules.workflows.automation import chatStop
# Get the workflow first to get mandateId
interfaceChatDb = getServiceChat(ctx)
workflow = interfaceChatDb.getWorkflow(workflowId)
if not workflow:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Workflow with ID {workflowId} not found"
)
mandateId = workflow.get("mandateId") if isinstance(workflow, dict) else getattr(workflow, "mandateId", None)
# Stop the workflow
stoppedWorkflow = await chatStop(ctx.user, workflowId, mandateId=mandateId)
return stoppedWorkflow
except HTTPException:
raise
except Exception as e:
logger.error(f"Error stopping workflow: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error stopping workflow: {str(e)}"
)
# API Endpoint for workflow logs with selective data transfer
@router.get("/{workflowId}/logs", response_model=PaginatedResponse[ChatLog])
@limiter.limit("120/minute")
def get_workflow_logs(
request: Request,
workflowId: str = Path(..., description="ID of the workflow"),
logId: Optional[str] = Query(None, description="Optional log ID to get only newer logs (legacy selective data transfer)"),
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
ctx: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[ChatLog]:
"""
Get logs for a workflow with optional pagination, sorting, and filtering.
Also supports legacy selective data transfer via logId parameter.
Query Parameters:
- logId: Optional log ID for selective data transfer (returns only logs after this ID)
- pagination: JSON-encoded PaginationParams object, or None for no pagination
"""
try:
# Parse pagination parameter
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
if paginationDict:
# Normalize pagination dict (handles top-level "search" field)
paginationDict = normalize_pagination_dict(paginationDict)
paginationParams = PaginationParams(**paginationDict)
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=400,
detail=f"Invalid pagination parameter: {str(e)}"
)
# Get service center
interfaceDbChat = getServiceChat(ctx)
# Verify workflow exists
workflow = interfaceDbChat.getWorkflow(workflowId)
if not workflow:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Workflow with ID {workflowId} not found"
)
# Get logs with optional pagination
result = interfaceDbChat.getLogs(workflowId, pagination=paginationParams)
# Handle legacy selective data transfer if logId is provided (takes precedence over pagination)
if logId:
# If pagination was requested, result is PaginatedResult, otherwise List[ChatLog]
allLogs = result.items if paginationParams else result
# Find the index of the log with the given ID
logIndex = next((i for i, log in enumerate(allLogs) if log.id == logId), -1)
if logIndex >= 0:
# Return only logs after the specified log
filteredLogs = allLogs[logIndex + 1:]
return PaginatedResponse(
items=filteredLogs,
pagination=None
)
# If pagination was requested, result is PaginatedResult
# If no pagination, result is List[ChatLog]
if paginationParams:
return PaginatedResponse(
items=result.items,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=result.totalItems,
totalPages=result.totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
else:
return PaginatedResponse(
items=result,
pagination=None
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting workflow logs: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error getting workflow logs: {str(e)}"
)
# API Endpoint for workflow messages with selective data transfer
@router.get("/{workflowId}/messages", response_model=PaginatedResponse[ChatMessage])
@limiter.limit("120/minute")
def get_workflow_messages(
request: Request,
workflowId: str = Path(..., description="ID of the workflow"),
messageId: Optional[str] = Query(None, description="Optional message ID to get only newer messages (legacy selective data transfer)"),
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
ctx: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[ChatMessage]:
"""
Get messages for a workflow with optional pagination, sorting, and filtering.
Also supports legacy selective data transfer via messageId parameter.
Query Parameters:
- messageId: Optional message ID for selective data transfer (returns only messages after this ID)
- pagination: JSON-encoded PaginationParams object, or None for no pagination
"""
try:
# Parse pagination parameter
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
if paginationDict:
# Normalize pagination dict (handles top-level "search" field)
paginationDict = normalize_pagination_dict(paginationDict)
paginationParams = PaginationParams(**paginationDict)
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=400,
detail=f"Invalid pagination parameter: {str(e)}"
)
# Get service center
interfaceDbChat = getServiceChat(ctx)
# Verify workflow exists
workflow = interfaceDbChat.getWorkflow(workflowId)
if not workflow:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Workflow with ID {workflowId} not found"
)
# Get messages with optional pagination
result = interfaceDbChat.getMessages(workflowId, pagination=paginationParams)
# Handle legacy selective data transfer if messageId is provided (takes precedence over pagination)
if messageId:
# If pagination was requested, result is PaginatedResult, otherwise List[ChatMessage]
allMessages = result.items if paginationParams else result
# Find the index of the message with the given ID
messageIndex = next((i for i, msg in enumerate(allMessages) if msg.id == messageId), -1)
if messageIndex >= 0:
# Return only messages after the specified message
filteredMessages = allMessages[messageIndex + 1:]
return PaginatedResponse(
items=filteredMessages,
pagination=None
)
# If pagination was requested, result is PaginatedResult
# If no pagination, result is List[ChatMessage]
if paginationParams:
return PaginatedResponse(
items=result.items,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=result.totalItems,
totalPages=result.totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
else:
return PaginatedResponse(
items=result,
pagination=None
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting workflow messages: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error getting workflow messages: {str(e)}"
)
# State 11: Workflow Reset/Deletion endpoint
@router.delete("/{workflowId}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
def delete_workflow(
request: Request,
workflowId: str = Path(..., description="ID of the workflow to delete"),
ctx: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Deletes a workflow and its associated data."""
try:
# Get service center
interfaceDbChat = getServiceChat(ctx)
# Check workflow access and permission using RBAC
workflows = getRecordsetWithRBAC(
interfaceDbChat.db,
ChatWorkflow,
ctx.user,
recordFilter={"id": workflowId},
mandateId=ctx.mandateId
)
if not workflows:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Workflow with ID {workflowId} not found"
)
workflow_data = workflows[0]
# Check if user has permission to delete using RBAC
if not interfaceDbChat.checkRbacPermission(ChatWorkflow, "delete", workflowId):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to delete this workflow"
)
# Delete workflow
success = interfaceDbChat.deleteWorkflow(workflowId)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to delete workflow"
)
return {
"id": workflowId,
"message": "Workflow and associated data deleted successfully"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deleting workflow: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error deleting workflow: {str(e)}"
)
# Document Management Endpoints
@router.delete("/{workflowId}/messages/{messageId}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
def delete_workflow_message(
request: Request,
workflowId: str = Path(..., description="ID of the workflow"),
messageId: str = Path(..., description="ID of the message to delete"),
ctx: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Delete a message from a workflow."""
try:
# Get service center
interfaceDbChat = getServiceChat(ctx)
# Verify workflow exists
workflow = interfaceDbChat.getWorkflow(workflowId)
if not workflow:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Workflow with ID {workflowId} not found"
)
# Delete the message
success = interfaceDbChat.deleteMessage(workflowId, messageId)
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Message with ID {messageId} not found in workflow {workflowId}"
)
# Messages are stored in ChatMessage table linked by workflowId --
# no messageIds list on ChatWorkflow to update.
return {
"workflowId": workflowId,
"messageId": messageId,
"message": "Message deleted successfully"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deleting message: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error deleting message: {str(e)}"
)
@router.delete("/{workflowId}/messages/{messageId}/files/{fileId}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
def delete_file_from_message(
request: Request,
workflowId: str = Path(..., description="ID of the workflow"),
messageId: str = Path(..., description="ID of the message"),
fileId: str = Path(..., description="ID of the file to delete"),
ctx: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Delete a file reference from a message in a workflow."""
try:
# Get service center
interfaceDbChat = getServiceChat(ctx)
# Verify workflow exists
workflow = interfaceDbChat.getWorkflow(workflowId)
if not workflow:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Workflow with ID {workflowId} not found"
)
# Delete file reference from message
success = interfaceDbChat.deleteFileFromMessage(workflowId, messageId, fileId)
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"File with ID {fileId} not found in message {messageId}"
)
return {
"workflowId": workflowId,
"messageId": messageId,
"fileId": fileId,
"message": "File reference deleted successfully"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deleting file reference: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error deleting file reference: {str(e)}"
)
# Action Discovery Endpoints
@router.get("/actions", response_model=Dict[str, Any])
@limiter.limit("120/minute")
def get_all_actions(
request: Request,
currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
"""
Get all available workflow actions for the current user (filtered by RBAC).
Returns:
- Dictionary with actions grouped by module, filtered by RBAC permissions
Example response:
{
"actions": [
{
"module": "outlook",
"actionId": "outlook.readEmails",
"name": "readEmails",
"description": "Read emails and metadata from a mailbox folder",
"parameters": {...}
},
...
]
}
"""
try:
from modules.services import getInterface as getServices
from modules.workflows.processing.shared.methodDiscovery import discoverMethods
# Get services and discover methods
services = getServices(currentUser, None)
discoverMethods(services)
# Import methods catalog
from modules.workflows.processing.shared.methodDiscovery import methods
# Collect all actions from all methods
allActions = []
for methodName, methodInfo in methods.items():
# Skip duplicate entries (same method stored with full and short name)
if methodName.startswith('Method'):
continue
methodInstance = methodInfo['instance']
methodActions = methodInstance.actions
for actionName, actionInfo in methodActions.items():
# Build action response
actionResponse = {
"module": methodInstance.name,
"actionId": f"{methodInstance.name}.{actionName}",
"name": actionName,
"description": actionInfo.get('description', ''),
"parameters": actionInfo.get('parameters', {})
}
allActions.append(actionResponse)
return {
"actions": allActions
}
except Exception as e:
logger.error(f"Error getting all actions: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get actions: {str(e)}"
)
@router.get("/actions/{method}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
def get_method_actions(
request: Request,
method: str = Path(..., description="Method name (e.g., 'outlook', 'sharepoint')"),
currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
"""
Get all available actions for a specific method (filtered by RBAC).
Path Parameters:
- method: Method name (e.g., 'outlook', 'sharepoint', 'ai')
Returns:
- Dictionary with actions for the specified method
Example response:
{
"module": "outlook",
"actions": [
{
"actionId": "outlook.readEmails",
"name": "readEmails",
"description": "Read emails and metadata from a mailbox folder",
"parameters": {...}
},
...
]
}
"""
try:
from modules.services import getInterface as getServices
from modules.workflows.processing.shared.methodDiscovery import discoverMethods
# Get services and discover methods
services = getServices(currentUser, None)
discoverMethods(services)
# Import methods catalog
from modules.workflows.processing.shared.methodDiscovery import methods
# Find method instance
methodInstance = None
for methodName, methodInfo in methods.items():
if methodInfo['instance'].name == method:
methodInstance = methodInfo['instance']
break
if not methodInstance:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Method '{method}' not found"
)
# Collect actions for this method
actions = []
methodActions = methodInstance.actions
for actionName, actionInfo in methodActions.items():
actionResponse = {
"actionId": f"{methodInstance.name}.{actionName}",
"name": actionName,
"description": actionInfo.get('description', ''),
"parameters": actionInfo.get('parameters', {})
}
actions.append(actionResponse)
return {
"module": methodInstance.name,
"description": methodInstance.description,
"actions": actions
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting actions for method {method}: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get actions for method {method}: {str(e)}"
)
@router.get("/actions/{method}/{action}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
def get_action_schema(
request: Request,
method: str = Path(..., description="Method name (e.g., 'outlook', 'sharepoint')"),
action: str = Path(..., description="Action name (e.g., 'readEmails', 'uploadDocument')"),
currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
"""
Get action schema with parameter definitions for a specific action.
Path Parameters:
- method: Method name (e.g., 'outlook', 'sharepoint', 'ai')
- action: Action name (e.g., 'readEmails', 'uploadDocument')
Returns:
- Action schema with full parameter definitions
Example response:
{
"method": "outlook",
"action": "readEmails",
"actionId": "outlook.readEmails",
"description": "Read emails and metadata from a mailbox folder",
"parameters": {
"connectionReference": {
"name": "connectionReference",
"type": "str",
"frontendType": "userConnection",
"frontendOptions": "user.connection",
"required": true,
"description": "Microsoft connection label"
},
...
}
}
"""
try:
from modules.services import getInterface as getServices
from modules.workflows.processing.shared.methodDiscovery import discoverMethods
# Get services and discover methods
services = getServices(currentUser, None)
discoverMethods(services)
# Import methods catalog
from modules.workflows.processing.shared.methodDiscovery import methods
# Find method instance
methodInstance = None
for methodName, methodInfo in methods.items():
if methodInfo['instance'].name == method:
methodInstance = methodInfo['instance']
break
if not methodInstance:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Method '{method}' not found"
)
# Get action
methodActions = methodInstance.actions
if action not in methodActions:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Action '{action}' not found in method '{method}'"
)
actionInfo = methodActions[action]
return {
"method": methodInstance.name,
"action": action,
"actionId": f"{methodInstance.name}.{action}",
"description": actionInfo.get('description', ''),
"parameters": actionInfo.get('parameters', {})
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting action schema for {method}.{action}: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get action schema: {str(e)}"
)

View file

@ -52,9 +52,6 @@ _MUST_STAY_ASYNC: Dict[str, Set[str]] = {
"modules/routes/routeDataFiles.py": {
"upload_file", # await file.read()
},
"modules/routes/routeDataWorkflows.py": {
"stop_workflow", # await chatStop(...)
},
# These files have many genuinely async routes (httpx, external APIs) -- keep ALL async:
"modules/routes/routeRealEstate.py": "__ALL__",
"modules/routes/routeSharepoint.py": "__ALL__",