gateway/modules/features/automation/interfaceFeatureAutomation.py
2026-02-03 23:42:27 +01:00

655 lines
28 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Interface for Automation feature - manages AutomationDefinition and AutomationTemplate.
Uses the PostgreSQL connector for data access with user/mandate filtering.
"""
import logging
import uuid
import math
import asyncio
from typing import Dict, Any, List, Optional, Union
from modules.security.rbac import RbacClass
from modules.datamodels.datamodelRbac import AccessRuleContext
from modules.datamodels.datamodelUam import AccessLevel, User
from modules.features.automation.datamodelFeatureAutomation import AutomationDefinition, AutomationTemplate
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResult
from modules.interfaces.interfaceRbac import getRecordsetWithRBAC, buildDataObjectKey
from modules.shared.configuration import APP_CONFIG
# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
# Cache of AutomationObjects instances, filled and read by getInterface().
# Keys are built there from mandate ID, feature instance ID and user ID.
_automationInterfaces = {}
class AutomationObjects:
"""
Interface for Automation database operations.
Manages AutomationDefinition and AutomationTemplate with RBAC support.
"""
def __init__(self, currentUser: User, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
    """
    Bind the interface to a user context, open the database and set up RBAC.

    Args:
        currentUser: User the interface acts for. When falsy, userId is None.
        mandateId: Optional mandate ID stored on the instance.
        featureInstanceId: Optional feature instance ID stored on the instance.
    """
    self.currentUser = currentUser
    self.mandateId = mandateId
    self.featureInstanceId = featureInstanceId
    self.userId = None if not currentUser else currentUser.id

    # The connector has to exist before RBAC can be attached to it.
    self._initializeDatabase()

    # The same connector is handed to RBAC as both its db and its dbApp.
    # NOTE(review): the original comment mentioned "poweron_app", but
    # _initializeDatabase() connects to "poweron_automation" - confirm intent.
    connector = self.db
    self.rbac = RbacClass(connector, dbApp=connector)

    # Make the connector aware of the acting user.
    connector.updateContext(self.userId)
def _initializeDatabase(self):
    """
    Create the database connector from APP_CONFIG and initialize it.

    Host, user, password and port are read from APP_CONFIG; the database name
    is fixed to "poweron_automation". The connector is stored on self.db.
    """
    connectionSettings = {
        "dbHost": APP_CONFIG.get("DB_HOST", "_no_config_default_data"),
        "dbDatabase": "poweron_automation",
        "dbUser": APP_CONFIG.get("DB_USER"),
        "dbPassword": APP_CONFIG.get("DB_PASSWORD_SECRET"),
        # Port falls back to 5432 when not configured.
        "dbPort": int(APP_CONFIG.get("DB_PORT", 5432)),
        "userId": self.userId,
    }
    self.db = DatabaseConnector(**connectionSettings)

    # Let the connector run its own system initialization.
    self.db.initDbSystem()
    logger.debug(f"Automation database initialized for user {self.userId}")
def setUserContext(self, currentUser: User, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
    """
    Re-bind this interface to another user/mandate/feature-instance context.

    Args:
        currentUser: User the interface should act for from now on.
        mandateId: Optional mandate ID to store.
        featureInstanceId: Optional feature instance ID to store.
    """
    self.currentUser, self.mandateId, self.featureInstanceId = currentUser, mandateId, featureInstanceId
    self.userId = None if not currentUser else currentUser.id

    # Connectors without updateContext() are simply left alone.
    if not hasattr(self.db, 'updateContext'):
        return
    self.db.updateContext(self.userId)
def checkRbacPermission(self, model, action: str, recordId: str = None) -> bool:
    """
    Decide whether the current user may perform an action on a model.

    Args:
        model: Model class; its __name__ is used to build the RBAC object key.
        action: Name of the permission attribute to read (callers in this file
            pass "create", "update" and "delete").
        recordId: Optional record ID, only consulted for the MY access level.

    Returns:
        True for access levels ALL and GROUP. For MY, True when no recordId is
        given or the record is not found, otherwise whether the record's
        _createdBy equals the current user ID. False for any other level.
    """
    permissions = self.rbac.getUserPermissions(
        user=self.currentUser,
        context=AccessRuleContext.DATA,
        item=buildDataObjectKey(model.__name__),
    )
    # Unknown actions are treated as "no access".
    grantedLevel = getattr(permissions, action, AccessLevel.NONE)

    if grantedLevel == AccessLevel.ALL or grantedLevel == AccessLevel.GROUP:
        return True

    if grantedLevel == AccessLevel.MY:
        matches = self.db.getRecordset(model, {"id": recordId}) if recordId else None
        if matches:
            # Ownership check against the record's creator.
            return matches[0].get("_createdBy") == self.userId
        # NOTE(review): a recordId that matches nothing is also allowed here.
        return True

    return False
# =========================================================================
# AutomationDefinition CRUD methods
# =========================================================================
def _computeAutomationStatus(self, automation: Dict[str, Any]) -> str:
"""Compute status field based on eventId presence"""
eventId = automation.get("eventId")
return "Running" if eventId else "Idle"
def _enrichAutomationsWithUserAndMandate(self, automations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Batch enrich automations with user names and mandate names for display.
Uses AppObjects interface to fetch users and mandates with proper access control.
"""
if not automations:
return automations
from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
# Collect all unique user IDs and mandate IDs
userIds = set()
mandateIds = set()
for automation in automations:
createdBy = automation.get("_createdBy")
if createdBy:
userIds.add(createdBy)
mandateId = automation.get("mandateId")
if mandateId:
mandateIds.add(mandateId)
# Use AppObjects interface to fetch users (respects access control)
appInterface = getAppInterface(self.currentUser)
usersMap = {}
if userIds:
for userId in userIds:
user = appInterface.getUser(userId)
if user:
usersMap[userId] = user.username or user.email or userId
# Use AppObjects interface to fetch mandates (respects access control)
mandatesMap = {}
if mandateIds:
for mandateId in mandateIds:
mandate = appInterface.getMandate(mandateId)
if mandate:
mandatesMap[mandateId] = mandate.name or mandateId
# Enrich each automation with the fetched data
for automation in automations:
createdBy = automation.get("_createdBy")
if createdBy:
automation["_createdByUserName"] = usersMap.get(createdBy, createdBy)
else:
automation["_createdByUserName"] = "-"
mandateId = automation.get("mandateId")
if mandateId:
automation["mandateName"] = mandatesMap.get(mandateId, mandateId)
else:
automation["mandateName"] = "-"
return automations
def _enrichAutomationWithUserAndMandate(self, automation: Dict[str, Any]) -> Dict[str, Any]:
    """
    Add creator and mandate display names to one automation.

    Thin wrapper around _enrichAutomationsWithUserAndMandate; prefer the batch
    method when several automations need enriching.
    """
    enriched = self._enrichAutomationsWithUserAndMandate([automation])
    return enriched[0]
def getAllAutomationDefinitions(self, pagination: Optional[PaginationParams] = None) -> Union[List[Dict[str, Any]], PaginatedResult]:
    """
    Return the automation definitions visible to the current user.

    Every record gets a computed "status", a non-None "executionLogs" list and
    creator/mandate display names. Filtering, sorting and paging are done in
    memory on the enriched records.

    Args:
        pagination: Optional paging parameters. When None, the full list is returned.

    Returns:
        A list of automation dictionaries without pagination, otherwise a
        PaginatedResult with the requested page, totalItems and totalPages.
    """
    records = getRecordsetWithRBAC(self.db, AutomationDefinition, self.currentUser)

    for record in records:
        record["status"] = self._computeAutomationStatus(record)
        # Consumers expect a list here, never None.
        if record.get("executionLogs") is None:
            record["executionLogs"] = []

    self._enrichAutomationsWithUserAndMandate(records)

    if pagination is None:
        return records

    if pagination.filters:
        records = self._applyFilters(records, pagination.filters)
    if pagination.sort:
        records = self._applySorting(records, pagination.sort)

    # Totals are computed after filtering so they describe the filtered set.
    totalItems = len(records)
    totalPages = 0 if totalItems <= 0 else math.ceil(totalItems / pagination.pageSize)

    # Pages are 1-based.
    firstIndex = (pagination.page - 1) * pagination.pageSize
    lastIndex = firstIndex + pagination.pageSize

    return PaginatedResult(
        items=records[firstIndex:lastIndex],
        totalItems=totalItems,
        totalPages=totalPages,
    )
def _applyFilters(self, items: List[Dict], filters: Dict[str, Any]) -> List[Dict]:
"""Apply filters to a list of items."""
if not filters:
return items
filtered = []
for item in items:
match = True
for key, value in filters.items():
itemValue = item.get(key)
if isinstance(value, str) and isinstance(itemValue, str):
if value.lower() not in itemValue.lower():
match = False
break
elif itemValue != value:
match = False
break
if match:
filtered.append(item)
return filtered
def _applySorting(self, items: List[Dict], sortFields: List[Dict]) -> List[Dict]:
"""Apply sorting to a list of items."""
if not sortFields:
return items
for sortField in reversed(sortFields):
field = sortField.get("field", "")
direction = sortField.get("direction", "asc")
reverse = direction.lower() == "desc"
items = sorted(items, key=lambda x: x.get(field, ""), reverse=reverse)
return items
def getAutomationDefinition(self, automationId: str, includeSystemFields: bool = False) -> Optional[AutomationDefinition]:
    """
    Fetch one automation definition the current user may see.

    The record gets a computed "status", a non-None "executionLogs" list and
    creator/mandate display names before it is returned.

    Args:
        automationId: ID of the automation to get.
        includeSystemFields: When True, return a plain attribute object that
            carries every field of the record, including the underscore-prefixed
            system fields. When False (default), return an AutomationDefinition
            built without the underscore-prefixed fields.

    Returns:
        The automation, or None when nothing matches or any error occurs
        (errors are logged, not raised).
    """
    try:
        matches = getRecordsetWithRBAC(
            self.db,
            AutomationDefinition,
            self.currentUser,
            recordFilter={"id": automationId},
        )
        if not matches:
            return None

        record = matches[0]
        record["status"] = self._computeAutomationStatus(record)
        # Consumers expect a list here, never None.
        if record.get("executionLogs") is None:
            record["executionLogs"] = []

        self._enrichAutomationWithUserAndMandate(record)

        if not includeSystemFields:
            # Public view: drop underscore-prefixed metadata before validation.
            publicFields = {name: value for name, value in record.items() if not name.startswith("_")}
            return AutomationDefinition(**publicFields)

        # Internal view: expose every field as an attribute so getattr() works.
        class AutomationWithSystemFields:
            def __init__(self, data):
                for key, value in data.items():
                    setattr(self, key, value)

        return AutomationWithSystemFields(record)
    except Exception as e:
        logger.error(f"Error getting automation definition: {str(e)}")
        return None
def createAutomationDefinition(self, automationData: Dict[str, Any]) -> AutomationDefinition:
    """
    Create a new automation definition and schedule the change notification.

    Missing values are filled into automationData in place: a fresh UUID for
    "id", the context mandate (or, as fallback, the first mandate returned by
    the root connector) for "mandateId", and the context feature instance for
    "featureInstanceId".

    The change notification is fire-and-forget. It is only scheduled when an
    event loop is running in the calling thread; otherwise it is skipped with a
    warning, because the record has already been written and failing the call
    at that point would misreport a successful create.

    Args:
        automationData: Field values for the new automation (mutated).

    Returns:
        The created automation as an AutomationDefinition, without
        underscore-prefixed system fields.

    Raises:
        Exception: Any error raised while creating the record is logged and re-raised.
    """
    try:
        # Ensure ID is present
        if not automationData.get("id"):
            automationData["id"] = str(uuid.uuid4())

        # Ensure mandateId and featureInstanceId are set for proper data isolation
        if not automationData.get("mandateId"):
            # Use request context mandateId, or fall back to Root mandate
            effectiveMandateId = self.mandateId
            if not effectiveMandateId:
                # Fall back to Root mandate (first mandate in system)
                try:
                    from modules.datamodels.datamodelUam import Mandate
                    from modules.security.rootAccess import getRootDbAppConnector
                    dbAppConn = getRootDbAppConnector()
                    allMandates = dbAppConn.getRecordset(Mandate)
                    if allMandates:
                        effectiveMandateId = allMandates[0].get("id")
                        logger.debug(f"createAutomationDefinition: Using Root mandate {effectiveMandateId}")
                except Exception as e:
                    logger.warning(f"Could not get Root mandate: {e}")
            automationData["mandateId"] = effectiveMandateId
        if "featureInstanceId" not in automationData:
            automationData["featureInstanceId"] = self.featureInstanceId

        # Ensure database connector has correct userId context
        if not self.userId:
            logger.error(f"createAutomationDefinition: userId is not set! Cannot set _createdBy. currentUser={self.currentUser}")
        elif hasattr(self.db, 'updateContext'):
            try:
                self.db.updateContext(self.userId)
                logger.debug(f"createAutomationDefinition: Updated database context with userId={self.userId}")
            except Exception as e:
                logger.warning(f"Could not update database context: {e}")

        # Create automation in database
        createdAutomation = self.db.recordCreate(AutomationDefinition, automationData)

        # Compute status
        createdAutomation["status"] = self._computeAutomationStatus(createdAutomation)
        # Ensure executionLogs is always a list, not None
        if createdAutomation.get("executionLogs") is None:
            createdAutomation["executionLogs"] = []

        # Trigger automation change callback (async, don't wait).
        # asyncio.create_task() raises RuntimeError without a running loop, so
        # probe for the loop first and only then create the coroutine.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            logger.warning("createAutomationDefinition: no running event loop, automation change notification skipped")
        else:
            notifyTask = loop.create_task(self._notifyAutomationChanged())
            # The loop only keeps weak references to tasks; hold a strong one
            # until the task finishes so it cannot be garbage-collected early.
            pendingTasks = self.__dict__.setdefault("_pendingNotifyTasks", set())
            pendingTasks.add(notifyTask)
            notifyTask.add_done_callback(pendingTasks.discard)

        # Clean metadata fields and return Pydantic model
        cleanedRecord = {k: v for k, v in createdAutomation.items() if not k.startswith("_")}
        return AutomationDefinition(**cleanedRecord)
    except Exception as e:
        logger.error(f"Error creating automation definition: {str(e)}")
        raise
def updateAutomationDefinition(self, automationId: str, automationData: Dict[str, Any]) -> AutomationDefinition:
    """
    Update an automation definition and schedule the change notification.

    The change notification is fire-and-forget. It is only scheduled when an
    event loop is running in the calling thread; otherwise it is skipped with a
    warning, because the record has already been modified and failing the call
    at that point would misreport a successful update.

    Args:
        automationId: ID of the automation to update.
        automationData: Field values to write.

    Returns:
        The updated automation as an AutomationDefinition, without
        underscore-prefixed system fields.

    Raises:
        PermissionError: If the automation is not accessible to the current
            user or the user lacks the "update" permission.
        Exception: Any other error is logged and re-raised.
    """
    try:
        # Check access
        existing = self.getAutomationDefinition(automationId)
        if not existing:
            raise PermissionError(f"No access to automation {automationId}")
        if not self.checkRbacPermission(AutomationDefinition, "update", automationId):
            raise PermissionError(f"No permission to modify automation {automationId}")

        # Update automation in database
        updatedAutomation = self.db.recordModify(AutomationDefinition, automationId, automationData)

        # Compute status
        updatedAutomation["status"] = self._computeAutomationStatus(updatedAutomation)
        # Ensure executionLogs is always a list, not None
        if updatedAutomation.get("executionLogs") is None:
            updatedAutomation["executionLogs"] = []

        # Trigger automation change callback (async, don't wait).
        # asyncio.create_task() raises RuntimeError without a running loop, so
        # probe for the loop first and only then create the coroutine.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            logger.warning("updateAutomationDefinition: no running event loop, automation change notification skipped")
        else:
            notifyTask = loop.create_task(self._notifyAutomationChanged())
            # The loop only keeps weak references to tasks; hold a strong one
            # until the task finishes so it cannot be garbage-collected early.
            pendingTasks = self.__dict__.setdefault("_pendingNotifyTasks", set())
            pendingTasks.add(notifyTask)
            notifyTask.add_done_callback(pendingTasks.discard)

        # Clean metadata fields and return Pydantic model
        cleanedRecord = {k: v for k, v in updatedAutomation.items() if not k.startswith("_")}
        return AutomationDefinition(**cleanedRecord)
    except Exception as e:
        logger.error(f"Error updating automation definition: {str(e)}")
        raise
def deleteAutomationDefinition(self, automationId: str) -> bool:
    """
    Delete an automation definition and schedule the change notification.

    The change notification is fire-and-forget. It is only scheduled when an
    event loop is running in the calling thread; otherwise it is skipped with a
    warning, because the record has already been deleted and failing the call
    at that point would misreport a successful delete.

    Args:
        automationId: ID of the automation to delete.

    Returns:
        True once the record has been deleted.

    Raises:
        PermissionError: If the automation is not accessible to the current
            user or the user lacks the "delete" permission.
        Exception: Any other error is logged and re-raised.
    """
    try:
        # Check access
        existing = self.getAutomationDefinition(automationId)
        if not existing:
            raise PermissionError(f"No access to automation {automationId}")
        if not self.checkRbacPermission(AutomationDefinition, "delete", automationId):
            raise PermissionError(f"No permission to delete automation {automationId}")

        # Delete automation from database
        self.db.recordDelete(AutomationDefinition, automationId)

        # Trigger automation change callback (async, don't wait).
        # asyncio.create_task() raises RuntimeError without a running loop, so
        # probe for the loop first and only then create the coroutine.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            logger.warning("deleteAutomationDefinition: no running event loop, automation change notification skipped")
        else:
            notifyTask = loop.create_task(self._notifyAutomationChanged())
            # The loop only keeps weak references to tasks; hold a strong one
            # until the task finishes so it cannot be garbage-collected early.
            pendingTasks = self.__dict__.setdefault("_pendingNotifyTasks", set())
            pendingTasks.add(notifyTask)
            notifyTask.add_done_callback(pendingTasks.discard)

        return True
    except Exception as e:
        logger.error(f"Error deleting automation definition: {str(e)}")
        raise
def getAllAutomationDefinitionsWithRBAC(self, user: User) -> List[Dict[str, Any]]:
    """
    Return the automation definitions visible to an arbitrary user.

    Wraps getRecordsetWithRBAC() so callers do not need access to the
    connector. Unlike getAllAutomationDefinitions(), the records are returned
    without computed status or display-name enrichment.

    Args:
        user: User whose RBAC rules are applied.

    Returns:
        List of automation definition dictionaries.
    """
    visibleRecords = getRecordsetWithRBAC(self.db, AutomationDefinition, user)
    return visibleRecords
# =========================================================================
# AutomationTemplate CRUD methods
# =========================================================================
def getAllAutomationTemplates(self, pagination: Optional[PaginationParams] = None) -> Union[List[Dict[str, Any]], PaginatedResult]:
    """
    Return the automation templates visible to the current user.

    Templates are enriched with the creator's display name. Filtering, sorting
    and paging are done in memory on the enriched records.

    Args:
        pagination: Optional paging parameters. When None, the full list is returned.

    Returns:
        A list of template dictionaries without pagination, otherwise a
        PaginatedResult with the requested page, totalItems and totalPages.
    """
    templates = getRecordsetWithRBAC(self.db, AutomationTemplate, self.currentUser)
    self._enrichTemplatesWithUserName(templates)

    if pagination is None:
        return templates

    if pagination.filters:
        templates = self._applyFilters(templates, pagination.filters)
    if pagination.sort:
        templates = self._applySorting(templates, pagination.sort)

    # Totals are computed after filtering so they describe the filtered set.
    totalItems = len(templates)
    totalPages = 0 if totalItems <= 0 else math.ceil(totalItems / pagination.pageSize)

    # Pages are 1-based.
    firstIndex = (pagination.page - 1) * pagination.pageSize
    lastIndex = firstIndex + pagination.pageSize

    return PaginatedResult(
        items=templates[firstIndex:lastIndex],
        totalItems=totalItems,
        totalPages=totalPages,
    )
def _enrichTemplatesWithUserName(self, templates: List[Dict[str, Any]]) -> None:
"""Batch enrich templates with creator user names."""
if not templates:
return
# Collect unique user IDs
userIds = set()
for template in templates:
createdBy = template.get("_createdBy")
if createdBy:
userIds.add(createdBy)
if not userIds:
return
# Batch fetch users
try:
from modules.datamodels.datamodelUam import UserInDB
from modules.security.rootAccess import getRootDbAppConnector
dbAppConn = getRootDbAppConnector()
userNameMap = {}
for userId in userIds:
users = dbAppConn.getRecordset(UserInDB, {"id": userId})
if users:
user = users[0]
fullName = f"{user.get('firstName', '')} {user.get('lastName', '')}".strip()
userNameMap[userId] = fullName or user.get("email", "Unknown")
# Apply to templates
for template in templates:
createdBy = template.get("_createdBy")
if createdBy and createdBy in userNameMap:
template["_createdByUserName"] = userNameMap[createdBy]
except Exception as e:
logger.warning(f"Could not enrich templates with user names: {e}")
def getAutomationTemplate(self, templateId: str) -> Optional[Dict[str, Any]]:
    """
    Fetch one automation template the current user may see.

    Args:
        templateId: ID of the template to get.

    Returns:
        The template dictionary enriched with the creator's display name, or
        None when nothing matches or any error occurs (errors are logged, not raised).
    """
    try:
        matches = getRecordsetWithRBAC(
            self.db,
            AutomationTemplate,
            self.currentUser,
            recordFilter={"id": templateId},
        )
        if matches:
            found = matches[0]
            self._enrichTemplatesWithUserName([found])
            return found
        return None
    except Exception as e:
        logger.error(f"Error getting automation template: {str(e)}")
        return None
def createAutomationTemplate(self, templateData: Dict[str, Any]) -> Dict[str, Any]:
    """
    Create a new automation template.

    templateData is adjusted in place: a fresh UUID is assigned when "id" is
    missing or empty, and a dict-valued "template" is serialized to a JSON
    string. The data is validated through AutomationTemplate before it is stored.

    Args:
        templateData: Field values for the new template (mutated).

    Returns:
        The record returned by the database connector.

    Raises:
        PermissionError: If the current user lacks the "create" permission.
        Exception: Any other error (including validation) is logged and re-raised.
    """
    try:
        if not templateData.get("id"):
            templateData["id"] = str(uuid.uuid4())

        mayCreate = self.checkRbacPermission(AutomationTemplate, "create")
        if not mayCreate:
            raise PermissionError("No permission to create template")

        # Refresh the connector's user context; failure here is not fatal.
        if self.userId and hasattr(self.db, 'updateContext'):
            try:
                self.db.updateContext(self.userId)
            except Exception as e:
                logger.warning(f"Could not update database context: {e}")

        # The frontend may send the template as parsed JSON; store it as text.
        rawTemplate = templateData.get("template")
        if isinstance(rawTemplate, dict):
            import json
            templateData["template"] = json.dumps(rawTemplate)

        # Validation converts nested dict fields into their model types.
        validatedTemplate = AutomationTemplate(**templateData)
        payload = validatedTemplate.model_dump()
        return self.db.recordCreate(AutomationTemplate, payload)
    except Exception as e:
        logger.error(f"Error creating automation template: {str(e)}")
        raise
def updateAutomationTemplate(self, templateId: str, templateData: Dict[str, Any]) -> Dict[str, Any]:
    """
    Update an automation template, supporting partial updates.

    The stored template is merged with templateData (new values win), the ID
    is forced to templateId, and the merged data is validated through
    AutomationTemplate before it is written. A dict-valued "template" in
    templateData is serialized to a JSON string in place.

    Args:
        templateId: ID of the template to update.
        templateData: Field values to change (mutated when "template" is a dict).

    Returns:
        The record returned by the database connector.

    Raises:
        PermissionError: If the template is not accessible to the current user
            or the user lacks the "update" permission.
        Exception: Any other error (including validation) is logged and re-raised.
    """
    try:
        current = self.getAutomationTemplate(templateId)
        if not current:
            raise PermissionError(f"No access to template {templateId}")

        mayUpdate = self.checkRbacPermission(AutomationTemplate, "update", templateId)
        if not mayUpdate:
            raise PermissionError(f"No permission to modify template {templateId}")

        # The frontend may send the template as parsed JSON; store it as text.
        rawTemplate = templateData.get("template")
        if isinstance(rawTemplate, dict):
            import json
            templateData["template"] = json.dumps(rawTemplate)

        # Start from the stored record so untouched fields survive the update.
        mergedData = dict(current)
        mergedData.update(templateData)
        mergedData["id"] = templateId

        validatedTemplate = AutomationTemplate(**mergedData)
        payload = validatedTemplate.model_dump()
        return self.db.recordModify(AutomationTemplate, templateId, payload)
    except Exception as e:
        logger.error(f"Error updating automation template: {str(e)}")
        raise
def deleteAutomationTemplate(self, templateId: str) -> bool:
    """
    Delete an automation template.

    Args:
        templateId: ID of the template to delete.

    Returns:
        True once the template has been deleted, False when the template is
        not found or not accessible to the current user.

    Raises:
        PermissionError: If the user lacks the "delete" permission.
        Exception: Any other error is logged and re-raised.
    """
    try:
        # An inaccessible template is reported as "nothing deleted", not as an error.
        if not self.getAutomationTemplate(templateId):
            return False

        mayDelete = self.checkRbacPermission(AutomationTemplate, "delete", templateId)
        if not mayDelete:
            raise PermissionError(f"No permission to delete template {templateId}")

        self.db.recordDelete(AutomationTemplate, templateId)
    except Exception as e:
        logger.error(f"Error deleting automation template: {str(e)}")
        raise
    return True
async def _notifyAutomationChanged(self):
    """
    Trigger the 'automation.changed' callbacks with this interface as argument.

    The registry is imported lazily inside the method. Errors from the import
    or from the callbacks are logged and never propagate to the caller.
    """
    try:
        from modules.shared.callbackRegistry import callbackRegistry as registry
        # Listeners are unknown to this module; the registry fans the event out.
        await registry.trigger('automation.changed', self)
    except Exception as notifyError:
        logger.error(f"Error notifying automation change: {str(notifyError)}")
def getInterface(currentUser: Optional[User] = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None) -> 'AutomationObjects':
    """
    Return the cached AutomationObjects instance for a context, creating it on first use.

    Instances are cached per combination of mandate ID, feature instance ID and
    user ID. A cached instance has its user context refreshed before it is returned.

    Args:
        currentUser: The authenticated user (required).
        mandateId: The mandate ID from RequestContext (X-Mandate-Id header).
        featureInstanceId: The feature instance ID from RequestContext
            (X-Feature-Instance-Id header).

    Returns:
        The AutomationObjects instance bound to the given context.

    Raises:
        ValueError: If currentUser is missing.
    """
    if not currentUser:
        raise ValueError("Invalid user context: user is required")

    # Normalize IDs to strings; falsy values collapse to None.
    effectiveMandateId = None if not mandateId else str(mandateId)
    effectiveFeatureInstanceId = None if not featureInstanceId else str(featureInstanceId)

    # The feature instance is part of the key so instances stay isolated.
    contextKey = f"automation_{effectiveMandateId}_{effectiveFeatureInstanceId}_{currentUser.id}"

    if contextKey in _automationInterfaces:
        instance = _automationInterfaces[contextKey]
        instance.setUserContext(currentUser, mandateId=effectiveMandateId, featureInstanceId=effectiveFeatureInstanceId)
        return instance

    instance = AutomationObjects(currentUser, mandateId=effectiveMandateId, featureInstanceId=effectiveFeatureInstanceId)
    _automationInterfaces[contextKey] = instance
    return instance