256 lines
8.6 KiB
Python
256 lines
8.6 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Chat Playground Feature Routes.
|
|
Implements the endpoints for chat playground workflow management as a feature.
|
|
"""
|
|
|
|
import logging
|
|
from typing import Optional, Dict, Any
|
|
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Request
|
|
|
|
# Import auth modules
|
|
from modules.auth import limiter, getRequestContext, RequestContext
|
|
|
|
# Import interfaces
|
|
from modules.interfaces import interfaceDbChat
|
|
|
|
# Import models
|
|
from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, WorkflowModeEnum
|
|
|
|
# Import workflow control functions
|
|
from modules.workflows.automation import chatStart, chatStop
|
|
from modules.features.chatplayground.mainChatplayground import getChatplaygroundServices
|
|
|
|
# Configure logger
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Create router for chat playground feature endpoints
|
|
router = APIRouter(
|
|
prefix="/api/chatplayground",
|
|
tags=["Chat Playground Feature"],
|
|
responses={404: {"description": "Not found"}}
|
|
)
|
|
|
|
|
|
def _getServiceChat(context: RequestContext, featureInstanceId: str = None):
|
|
"""Get chat interface with feature instance context."""
|
|
return interfaceDbChat.getInterface(
|
|
context.user,
|
|
mandateId=str(context.mandateId) if context.mandateId else None,
|
|
featureInstanceId=featureInstanceId
|
|
)
|
|
|
|
|
|
def _validateInstanceAccess(instanceId: str, context: RequestContext) -> str:
|
|
"""
|
|
Validate that user has access to the feature instance.
|
|
|
|
Args:
|
|
instanceId: Feature instance ID
|
|
context: Request context
|
|
|
|
Returns:
|
|
mandateId for the instance
|
|
|
|
Raises:
|
|
HTTPException if access is denied
|
|
"""
|
|
from modules.interfaces.interfaceDbApp import getRootInterface
|
|
|
|
rootInterface = getRootInterface()
|
|
|
|
# Get feature instance (Pydantic model)
|
|
instance = rootInterface.getFeatureInstance(instanceId)
|
|
if not instance:
|
|
raise HTTPException(status_code=404, detail=f"Feature instance {instanceId} not found")
|
|
|
|
# Check user has access to this instance using interface method
|
|
featureAccess = rootInterface.getFeatureAccess(str(context.user.id), instanceId)
|
|
|
|
if not featureAccess or not featureAccess.enabled:
|
|
raise HTTPException(status_code=403, detail="Access denied to this feature instance")
|
|
|
|
return str(instance.mandateId) if instance.mandateId else None
|
|
|
|
|
|
# Workflow start endpoint
|
|
@router.post("/{instanceId}/start", response_model=ChatWorkflow)
|
|
@limiter.limit("120/minute")
|
|
async def start_workflow(
|
|
request: Request,
|
|
instanceId: str = Path(..., description="Feature instance ID"),
|
|
workflowId: Optional[str] = Query(None, description="Optional ID of the workflow to continue"),
|
|
workflowMode: WorkflowModeEnum = Query(..., description="Workflow mode: 'Dynamic' or 'Automation' (mandatory)"),
|
|
userInput: UserInputRequest = Body(...),
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> ChatWorkflow:
|
|
"""
|
|
Starts a new workflow or continues an existing one.
|
|
|
|
Args:
|
|
instanceId: Feature instance ID
|
|
workflowMode: "Dynamic" for iterative dynamic-style processing, "Automation" for automated workflow execution
|
|
"""
|
|
try:
|
|
# Validate access and get mandate ID
|
|
mandateId = _validateInstanceAccess(instanceId, context)
|
|
|
|
# Get chatplayground services from service center (not automation)
|
|
services = getChatplaygroundServices(
|
|
context.user,
|
|
mandateId=mandateId,
|
|
featureInstanceId=instanceId,
|
|
)
|
|
services.featureCode = 'chatplayground'
|
|
if hasattr(userInput, 'allowedProviders') and userInput.allowedProviders:
|
|
services.allowedProviders = userInput.allowedProviders
|
|
|
|
# Start or continue workflow
|
|
workflow = await chatStart(
|
|
context.user,
|
|
userInput,
|
|
workflowMode,
|
|
workflowId,
|
|
mandateId=mandateId,
|
|
featureInstanceId=instanceId,
|
|
featureCode='chatplayground',
|
|
services=services,
|
|
)
|
|
|
|
return workflow
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error in start_workflow: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=str(e)
|
|
)
|
|
|
|
|
|
# Stop workflow endpoint
|
|
@router.post("/{instanceId}/{workflowId}/stop", response_model=ChatWorkflow)
|
|
@limiter.limit("120/minute")
|
|
async def stop_workflow(
|
|
request: Request,
|
|
instanceId: str = Path(..., description="Feature instance ID"),
|
|
workflowId: str = Path(..., description="ID of the workflow to stop"),
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> ChatWorkflow:
|
|
"""Stops a running workflow."""
|
|
try:
|
|
# Validate access and get mandate ID
|
|
mandateId = _validateInstanceAccess(instanceId, context)
|
|
|
|
# Get chatplayground services from service center (not automation)
|
|
services = getChatplaygroundServices(
|
|
context.user,
|
|
mandateId=mandateId,
|
|
featureInstanceId=instanceId,
|
|
)
|
|
services.featureCode = 'chatplayground'
|
|
|
|
# Stop workflow (pass featureInstanceId for proper RBAC filtering)
|
|
workflow = await chatStop(
|
|
context.user,
|
|
workflowId,
|
|
mandateId=mandateId,
|
|
featureInstanceId=instanceId,
|
|
featureCode='chatplayground',
|
|
services=services,
|
|
)
|
|
|
|
return workflow
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error in stop_workflow: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=str(e)
|
|
)
|
|
|
|
|
|
# Unified Chat Data Endpoint for Polling
|
|
@router.get("/{instanceId}/{workflowId}/chatData")
|
|
@limiter.limit("120/minute")
|
|
def get_workflow_chat_data(
|
|
request: Request,
|
|
instanceId: str = Path(..., description="Feature instance ID"),
|
|
workflowId: str = Path(..., description="ID of the workflow"),
|
|
afterTimestamp: Optional[float] = Query(None, description="Unix timestamp to get data after"),
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Get unified chat data (messages, logs, stats) for a workflow with timestamp-based selective data transfer.
|
|
Returns all data types in chronological order based on _createdAt timestamp.
|
|
"""
|
|
try:
|
|
# Validate access
|
|
_validateInstanceAccess(instanceId, context)
|
|
|
|
# Get service with feature instance context
|
|
chatInterface = _getServiceChat(context, featureInstanceId=instanceId)
|
|
|
|
# Verify workflow exists
|
|
workflow = chatInterface.getWorkflow(workflowId)
|
|
if not workflow:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=f"Workflow with ID {workflowId} not found"
|
|
)
|
|
|
|
# Get unified chat data
|
|
chatData = chatInterface.getUnifiedChatData(workflowId, afterTimestamp)
|
|
|
|
return chatData
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting unified chat data: {str(e)}", exc_info=True)
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error getting unified chat data: {str(e)}"
|
|
)
|
|
|
|
|
|
# Get workflows for this instance
|
|
@router.get("/{instanceId}/workflows")
|
|
@limiter.limit("120/minute")
|
|
def get_workflows(
|
|
request: Request,
|
|
instanceId: str = Path(..., description="Feature instance ID"),
|
|
page: int = Query(1, ge=1, description="Page number"),
|
|
pageSize: int = Query(20, ge=1, le=100, description="Items per page"),
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Get all workflows for this feature instance.
|
|
"""
|
|
try:
|
|
# Validate access
|
|
_validateInstanceAccess(instanceId, context)
|
|
|
|
# Get service with feature instance context
|
|
chatInterface = _getServiceChat(context, featureInstanceId=instanceId)
|
|
|
|
# Get workflows with pagination
|
|
from modules.datamodels.datamodelPagination import PaginationParams
|
|
pagination = PaginationParams(page=page, pageSize=pageSize)
|
|
|
|
result = chatInterface.getWorkflows(pagination=pagination)
|
|
|
|
return result
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting workflows: {str(e)}", exc_info=True)
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error getting workflows: {str(e)}"
|
|
)
|