# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Automation routes for the backend API.
Implements the endpoints for automation definition management.
"""

from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response, Query
from typing import List, Dict, Any, Optional
from fastapi import status
from fastapi.responses import JSONResponse
import logging
import json

# Import interfaces and models
from modules.features.automation.interfaceFeatureAutomation import getInterface as getAutomationInterface
from modules.auth import limiter, getRequestContext, RequestContext
from modules.features.automation.datamodelFeatureAutomation import AutomationDefinition, AutomationTemplate
from modules.datamodels.datamodelChat import ChatWorkflow
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict
from modules.shared.attributeUtils import getModelAttributeDefinitions

# Configure logger
logger = logging.getLogger(__name__)

# Model attributes for AutomationDefinition
automationAttributes = getModelAttributeDefinitions(AutomationDefinition)

# Create router for automation endpoints
router = APIRouter(
    prefix="/api/automations",
    tags=["Manage Automations"],
    responses={
        404: {"description": "Not found"},
        400: {"description": "Bad request"},
        401: {"description": "Unauthorized"},
        403: {"description": "Forbidden"},
        500: {"description": "Internal server error"}
    }
)


# =============================================================================
# Shared private helpers
# =============================================================================

def _getInterface(context: RequestContext, *, withFeatureInstance: bool = False):
    """
    Build the automation interface for the requesting user.

    The mandate ID is always passed (stringified, or None when absent).
    The featureInstanceId keyword is only passed when withFeatureInstance is
    True, so callers that never supplied it keep relying on the interface's
    own default.
    """
    kwargs: Dict[str, Any] = {
        "mandateId": str(context.mandateId) if context.mandateId else None
    }
    if withFeatureInstance:
        kwargs["featureInstanceId"] = (
            str(context.featureInstanceId) if context.featureInstanceId else None
        )
    return getAutomationInterface(context.user, **kwargs)


def _parsePagination(pagination: Optional[str]) -> Optional[PaginationParams]:
    """
    Parse the JSON-encoded `pagination` query parameter.

    Returns None when the parameter is missing or decodes to an empty value.
    Raises HTTPException(400) when the value is not valid JSON, is not a JSON
    object, or cannot be turned into PaginationParams.
    """
    if not pagination:
        return None
    try:
        paginationDict = json.loads(pagination)
        if not paginationDict:
            return None
        # A JSON list/number/string would otherwise fail further down with a
        # TypeError/AttributeError and surface as a 500 instead of a 400.
        if not isinstance(paginationDict, dict):
            raise ValueError("pagination must be a JSON object")
        paginationDict = normalize_pagination_dict(paginationDict)
        return PaginationParams(**paginationDict)
    except (json.JSONDecodeError, ValueError, TypeError) as e:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid pagination parameter: {str(e)}"
        ) from e


def _buildListResponse(result: Any, paginationParams: Optional[PaginationParams]) -> JSONResponse:
    """
    Wrap a list result into the `{"items": ..., "pagination": ...}` envelope.

    With pagination, `result` is read via its items/totalItems/totalPages
    attributes; without pagination it is returned as-is under "items".
    A JSONResponse is used so the payload is not passed through the Pydantic
    response model (which would drop fields that are not part of the model).
    """
    if paginationParams:
        responseData = {
            "items": result.items,
            "pagination": {
                "currentPage": paginationParams.page,
                "pageSize": paginationParams.pageSize,
                "totalItems": result.totalItems,
                "totalPages": result.totalPages,
                "sort": paginationParams.sort,
                "filters": paginationParams.filters
            }
        }
    else:
        responseData = {
            "items": result,
            "pagination": None
        }
    return JSONResponse(content=responseData)


def _internalError(action: str, error: Exception) -> HTTPException:
    """
    Log an unexpected error (with traceback) and build the matching 500 response.

    Must be called from inside an `except` block so the traceback is available
    to logger.exception.
    """
    logger.exception("Error %s: %s", action, error)
    return HTTPException(
        status_code=500,
        detail=f"Error {action}: {str(error)}"
    )


def _describeAction(shortName: str, actionName: str, actionDef: Any) -> Dict[str, Any]:
    """
    Build the JSON-serializable description of one workflow action,
    including its parameter list and a copy/paste example snippet.
    """
    parametersDef = getattr(actionDef, 'parameters', None) or {}
    parameters = []
    exampleParams = {}

    for paramName, paramDef in parametersDef.items():
        frontendType = getattr(paramDef, 'frontendType', None)
        paramInfo = {
            "name": paramName,
            "type": getattr(paramDef, 'type', "Any"),
            "frontendType": frontendType.value if frontendType else "text",
            "required": getattr(paramDef, 'required', False),
            "default": getattr(paramDef, 'default', None),
            "description": getattr(paramDef, 'description', ""),
        }
        frontendOptions = getattr(paramDef, 'frontendOptions', None)
        if frontendOptions:
            paramInfo["frontendOptions"] = frontendOptions
        parameters.append(paramInfo)

        # Example value: required parameters always get a placeholder,
        # optional ones get their default when it is truthy.
        # NOTE(review): falsy defaults (0, False, "") also fall back to the
        # placeholder - confirm this is intended.
        placeholder = f"{{{{KEY:{paramName}}}}}"
        if getattr(paramDef, 'required', False):
            exampleParams[paramName] = placeholder
        else:
            exampleParams[paramName] = getattr(paramDef, 'default', None) or placeholder

    return {
        "method": shortName,
        "action": actionName,
        "actionId": getattr(actionDef, 'actionId', f"{shortName}.{actionName}"),
        "description": getattr(actionDef, 'description', ""),
        "category": getattr(actionDef, 'category', "general"),
        "parameters": parameters,
        "exampleJson": {
            "execMethod": shortName,
            "execAction": actionName,
            "execParameters": exampleParams,
            "execResultLabel": f"{shortName}_{actionName}_result"
        }
    }


# =============================================================================
# AutomationDefinition Routes
# =============================================================================

@router.get("", response_model=PaginatedResponse[AutomationDefinition])
@limiter.limit("30/minute")
def get_automations(
    request: Request,
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
    context: RequestContext = Depends(getRequestContext)
) -> JSONResponse:
    """
    Get automation definitions with optional pagination, sorting, and filtering.

    Query Parameters:
    - pagination: JSON-encoded PaginationParams object, or None for no pagination
    """
    try:
        paginationParams = _parsePagination(pagination)

        # AutomationDefinitions can belong to ANY feature instance within a mandate.
        # The list endpoint must show all definitions for the user's mandate,
        # not filter by a specific featureInstanceId.
        chatInterface = _getInterface(context)
        result = chatInterface.getAllAutomationDefinitions(pagination=paginationParams)

        # Note: a JSONResponse is returned to bypass Pydantic validation, which
        # would filter out _createdBy. The enriched fields (_createdByUserName,
        # mandateName) are not in the Pydantic model.
        return _buildListResponse(result, paginationParams)
    except HTTPException:
        raise
    except Exception as e:
        raise _internalError("getting automations", e) from e


@router.post("", response_model=AutomationDefinition)
@limiter.limit("10/minute")
def create_automation(
    request: Request,
    automation: AutomationDefinition,
    context: RequestContext = Depends(getRequestContext)
) -> AutomationDefinition:
    """Create a new automation definition"""
    try:
        chatInterface = _getInterface(context, withFeatureInstance=True)
        automationData = automation.model_dump()
        created = chatInterface.createAutomationDefinition(automationData)
        return created
    except HTTPException:
        raise
    except Exception as e:
        raise _internalError("creating automation", e) from e


@router.get("/attributes", response_model=Dict[str, Any])
def get_automation_attributes(
    request: Request
) -> Dict[str, Any]:
    """Get attribute definitions for AutomationDefinition model"""
    return {"attributes": automationAttributes}


@router.get("/actions")
@limiter.limit("30/minute")
def get_available_actions(
    request: Request,
    context: RequestContext = Depends(getRequestContext)
) -> JSONResponse:
    """
    Get available workflow actions for template editor.

    Returns action definitions with parameters and example JSON snippets.
    """
    try:
        from modules.workflows.processing.shared.methodDiscovery import methods, discoverMethods
        from modules.services import getInterface as getServices

        # Ensure methods are discovered (need a service center for discovery)
        if not methods:
            # Create a lightweight service center for method discovery
            services = getServices(context.user, mandateId=context.mandateId)
            discoverMethods(services)

        actionsList = []
        processedMethods = set()

        for methodName, methodInfo in methods.items():
            # Skip short name aliases - only process full class names (MethodXxx)
            if not methodName.startswith('Method'):
                continue

            shortName = methodName.replace('Method', '').lower()

            # Skip if already processed
            if shortName in processedMethods:
                continue
            processedMethods.add(shortName)

            methodInstance = methodInfo.get('instance')
            if not methodInstance:
                continue

            # Get actions from method instance
            for actionName, actionDef in methodInstance._actions.items():
                actionsList.append(_describeAction(shortName, actionName, actionDef))

        return JSONResponse(content={"actions": actionsList})
    except HTTPException:
        raise
    except Exception as e:
        raise _internalError("getting available actions", e) from e


@router.get("/{automationId}", response_model=AutomationDefinition)
@limiter.limit("30/minute")
def get_automation(
    request: Request,
    automationId: str = Path(..., description="Automation ID"),
    context: RequestContext = Depends(getRequestContext)
) -> AutomationDefinition:
    """Get a single automation definition by ID"""
    try:
        chatInterface = _getInterface(context)
        automation = chatInterface.getAutomationDefinition(automationId)
        if not automation:
            raise HTTPException(
                status_code=404,
                detail=f"Automation {automationId} not found"
            )
        return automation
    except HTTPException:
        raise
    except Exception as e:
        raise _internalError("getting automation", e) from e


@router.put("/{automationId}", response_model=AutomationDefinition)
@limiter.limit("10/minute")
def update_automation(
    request: Request,
    automationId: str = Path(..., description="Automation ID"),
    automation: AutomationDefinition = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> AutomationDefinition:
    """Update an automation definition"""
    try:
        chatInterface = _getInterface(context)
        automationData = automation.model_dump()
        updated = chatInterface.updateAutomationDefinition(automationId, automationData)
        return updated
    except HTTPException:
        raise
    except PermissionError as e:
        raise HTTPException(
            status_code=403,
            detail=str(e)
        ) from e
    except Exception as e:
        raise _internalError("updating automation", e) from e


@router.patch("/{automationId}/status")
@limiter.limit("30/minute")
def update_automation_status(
    request: Request,
    automationId: str = Path(..., description="Automation ID"),
    active: bool = Body(..., embed=True),
    context: RequestContext = Depends(getRequestContext)
) -> AutomationDefinition:
    """Update only the active status of an automation definition"""
    try:
        chatInterface = _getInterface(context)

        # Get existing automation
        automation = chatInterface.getAutomationDefinition(automationId)
        if not automation:
            raise HTTPException(
                status_code=404,
                detail=f"Automation {automationId} not found"
            )

        # Update only the active field; the interface may hand back either a
        # dict or a model instance.
        automationData = automation if isinstance(automation, dict) else automation.model_dump()
        automationData['active'] = active

        updated = chatInterface.updateAutomationDefinition(automationId, automationData)
        return updated
    except HTTPException:
        raise
    except PermissionError as e:
        raise HTTPException(
            status_code=403,
            detail=str(e)
        ) from e
    except Exception as e:
        raise _internalError("updating automation status", e) from e


@router.delete("/{automationId}")
@limiter.limit("10/minute")
def delete_automation(
    request: Request,
    automationId: str = Path(..., description="Automation ID"),
    context: RequestContext = Depends(getRequestContext)
) -> Response:
    """Delete an automation definition"""
    try:
        chatInterface = _getInterface(context)
        success = chatInterface.deleteAutomationDefinition(automationId)
        if success:
            return Response(status_code=204)
        raise HTTPException(
            status_code=500,
            detail="Failed to delete automation"
        )
    except HTTPException:
        raise
    except PermissionError as e:
        raise HTTPException(
            status_code=403,
            detail=str(e)
        ) from e
    except Exception as e:
        raise _internalError("deleting automation", e) from e


@router.post("/{automationId}/execute", response_model=ChatWorkflow)
@limiter.limit("5/minute")
async def execute_automation_route(
    request: Request,
    automationId: str = Path(..., description="Automation ID"),
    context: RequestContext = Depends(getRequestContext)
) -> ChatWorkflow:
    """Execute an automation immediately (test mode)"""
    try:
        from modules.services import getInterface as getServices
        services = getServices(
            context.user,
            mandateId=context.mandateId,
            featureInstanceId=context.featureInstanceId
        )

        # Load automation with current user's context (user has RBAC permissions via UI)
        automation = services.interfaceDbAutomation.getAutomationDefinition(
            automationId, includeSystemFields=True
        )
        if not automation:
            raise HTTPException(
                status_code=404,
                detail=f"Automation {automationId} not found"
            )

        from modules.workflows.automation import executeAutomation
        workflow = await executeAutomation(automationId, automation, context.user, services)
        return workflow
    except HTTPException:
        raise
    except ValueError as e:
        # Kept for backward compatibility: a ValueError raised during
        # execution is reported as 404, as before.
        raise HTTPException(
            status_code=404,
            detail=str(e)
        ) from e
    except Exception as e:
        raise _internalError("executing automation", e) from e


# =============================================================================
# AutomationTemplate Routes (DB-persisted)
# =============================================================================

# Separate router for /api/automation-templates
templateRouter = APIRouter(
    prefix="/api/automation-templates",
    tags=["Manage Automation Templates"],
    responses={
        404: {"description": "Not found"},
        400: {"description": "Bad request"},
        401: {"description": "Unauthorized"},
        403: {"description": "Forbidden"},
        500: {"description": "Internal server error"}
    }
)

# Model attributes for AutomationTemplate
templateAttributes = getModelAttributeDefinitions(AutomationTemplate)


@templateRouter.get("", response_model=PaginatedResponse[AutomationTemplate])
@limiter.limit("30/minute")
def get_db_templates(
    request: Request,
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
    context: RequestContext = Depends(getRequestContext)
) -> JSONResponse:
    """
    Get automation templates from database (RBAC-filtered, MY = own templates).

    Query Parameters:
    - pagination: JSON-encoded PaginationParams object, or None for no pagination
    """
    try:
        paginationParams = _parsePagination(pagination)
        chatInterface = _getInterface(context, withFeatureInstance=True)
        result = chatInterface.getAllAutomationTemplates(pagination=paginationParams)
        return _buildListResponse(result, paginationParams)
    except HTTPException:
        raise
    except Exception as e:
        raise _internalError("getting templates", e) from e


@templateRouter.get("/attributes", response_model=Dict[str, Any])
def get_template_attributes(
    request: Request
) -> Dict[str, Any]:
    """Get attribute definitions for AutomationTemplate model"""
    return {"attributes": templateAttributes}


@templateRouter.get("/{templateId}")
@limiter.limit("30/minute")
def get_db_template(
    request: Request,
    templateId: str = Path(..., description="Template ID"),
    context: RequestContext = Depends(getRequestContext)
) -> JSONResponse:
    """Get a single automation template by ID"""
    try:
        chatInterface = _getInterface(context, withFeatureInstance=True)
        template = chatInterface.getAutomationTemplate(templateId)
        if not template:
            raise HTTPException(
                status_code=404,
                detail=f"Template {templateId} not found"
            )
        return JSONResponse(content=template)
    except HTTPException:
        raise
    except Exception as e:
        raise _internalError("getting template", e) from e


@templateRouter.post("")
@limiter.limit("10/minute")
def create_db_template(
    request: Request,
    templateData: Dict[str, Any] = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> JSONResponse:
    """Create a new automation template"""
    try:
        chatInterface = _getInterface(context, withFeatureInstance=True)
        created = chatInterface.createAutomationTemplate(
            templateData, isSysAdmin=context.hasSysAdminRole
        )
        return JSONResponse(content=created)
    except HTTPException:
        raise
    except PermissionError as e:
        raise HTTPException(
            status_code=403,
            detail=str(e)
        ) from e
    except Exception as e:
        raise _internalError("creating template", e) from e


@templateRouter.put("/{templateId}")
@limiter.limit("10/minute")
def update_db_template(
    request: Request,
    templateId: str = Path(..., description="Template ID"),
    templateData: Dict[str, Any] = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> JSONResponse:
    """Update an automation template"""
    try:
        chatInterface = _getInterface(context, withFeatureInstance=True)
        updated = chatInterface.updateAutomationTemplate(
            templateId, templateData, isSysAdmin=context.hasSysAdminRole
        )
        return JSONResponse(content=updated)
    except HTTPException:
        raise
    except PermissionError as e:
        raise HTTPException(
            status_code=403,
            detail=str(e)
        ) from e
    except Exception as e:
        raise _internalError("updating template", e) from e


@templateRouter.delete("/{templateId}")
@limiter.limit("10/minute")
def delete_db_template(
    request: Request,
    templateId: str = Path(..., description="Template ID"),
    context: RequestContext = Depends(getRequestContext)
) -> Response:
    """Delete an automation template"""
    try:
        chatInterface = _getInterface(context, withFeatureInstance=True)
        success = chatInterface.deleteAutomationTemplate(
            templateId, isSysAdmin=context.hasSysAdminRole
        )
        if success:
            return Response(status_code=204)
        raise HTTPException(
            status_code=404,
            detail="Template not found or no permission"
        )
    except HTTPException:
        raise
    except PermissionError as e:
        raise HTTPException(
            status_code=403,
            detail=str(e)
        ) from e
    except Exception as e:
        raise _internalError("deleting template", e) from e


@templateRouter.post("/{templateId}/duplicate")
@limiter.limit("10/minute")
def duplicate_db_template(
    request: Request,
    templateId: str = Path(..., description="Template ID to duplicate"),
    context: RequestContext = Depends(getRequestContext)
) -> JSONResponse:
    """Duplicate a template into the current feature instance (system or instance template)."""
    try:
        chatInterface = _getInterface(context, withFeatureInstance=True)
        duplicated = chatInterface.duplicateAutomationTemplate(templateId)
        return JSONResponse(content=duplicated)
    except HTTPException:
        raise
    except PermissionError as e:
        raise HTTPException(
            status_code=403,
            detail=str(e)
        ) from e
    except Exception as e:
        raise _internalError("duplicating template", e) from e


@router.post("/{automationId}/duplicate")
@limiter.limit("10/minute")
def duplicate_automation(
    request: Request,
    automationId: str = Path(..., description="Automation definition ID to duplicate"),
    context: RequestContext = Depends(getRequestContext)
) -> JSONResponse:
    """Duplicate an automation definition within the same feature instance."""
    try:
        chatInterface = _getInterface(context, withFeatureInstance=True)
        duplicated = chatInterface.duplicateAutomationDefinition(automationId)
        return JSONResponse(content=duplicated)
    except HTTPException:
        raise
    except PermissionError as e:
        raise HTTPException(
            status_code=403,
            detail=str(e)
        ) from e
    except Exception as e:
        raise _internalError("duplicating automation", e) from e