gateway/modules/features/automation/interfaceFeatureAutomation.py
2026-02-12 00:34:17 +01:00

860 lines
38 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Interface for Automation feature - manages AutomationDefinition and AutomationTemplate.
Uses the PostgreSQL connector for data access with user/mandate filtering.
"""
import logging
import uuid
import math
from typing import Dict, Any, List, Optional, Union
from modules.security.rbac import RbacClass
from modules.datamodels.datamodelRbac import AccessRuleContext
from modules.datamodels.datamodelUam import AccessLevel, User
from modules.features.automation.datamodelFeatureAutomation import AutomationDefinition, AutomationTemplate
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResult
from modules.interfaces.interfaceRbac import getRecordsetWithRBAC, buildDataObjectKey
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
# Singleton factory for Automation instances
_automationInterfaces = {}
class AutomationObjects:
"""
Interface for Automation database operations.
Manages AutomationDefinition and AutomationTemplate with RBAC support.
"""
def __init__(self, currentUser: User, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
self.currentUser = currentUser
self.mandateId = mandateId
self.featureInstanceId = featureInstanceId
self.userId = currentUser.id if currentUser else None
# Initialize database with proper configuration
self._initializeDatabase()
# Initialize RBAC - AccessRules are in poweron_app, not poweron_automation!
from modules.security.rootAccess import getRootDbAppConnector
dbApp = getRootDbAppConnector()
self.rbac = RbacClass(self.db, dbApp=dbApp)
# Update database context
self.db.updateContext(self.userId)
def _initializeDatabase(self):
"""Initializes the database connection with proper configuration."""
# Get configuration values
dbHost = APP_CONFIG.get("DB_HOST", "_no_config_default_data")
dbDatabase = "poweron_automation"
dbUser = APP_CONFIG.get("DB_USER")
dbPassword = APP_CONFIG.get("DB_PASSWORD_SECRET")
dbPort = int(APP_CONFIG.get("DB_PORT", 5432))
# Create database connector with full configuration
self.db = DatabaseConnector(
dbHost=dbHost,
dbDatabase=dbDatabase,
dbUser=dbUser,
dbPassword=dbPassword,
dbPort=dbPort,
userId=self.userId,
)
logger.debug(f"Automation database initialized for user {self.userId}")
def setUserContext(self, currentUser: User, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
"""Update user context for the interface."""
self.currentUser = currentUser
self.mandateId = mandateId
self.featureInstanceId = featureInstanceId
self.userId = currentUser.id if currentUser else None
if hasattr(self.db, 'updateContext'):
self.db.updateContext(self.userId)
def checkRbacPermission(self, model, action: str, recordId: str = None) -> bool:
"""Check RBAC permission for a specific action on a model."""
objectKey = buildDataObjectKey(model.__name__)
permissions = self.rbac.getUserPermissions(
user=self.currentUser,
context=AccessRuleContext.DATA,
item=objectKey,
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
accessLevel = getattr(permissions, action, AccessLevel.NONE)
if accessLevel == AccessLevel.ALL:
return True
elif accessLevel == AccessLevel.GROUP:
return True
elif accessLevel == AccessLevel.MY:
if recordId:
record = self.db.getRecordset(model, recordFilter={"id": recordId})
if record:
return record[0].get("_createdBy") == self.userId
else:
return False # Record not found = no access
return True # No recordId needed (e.g., for CREATE)
return False
# =========================================================================
# AutomationDefinition CRUD methods
# =========================================================================
def _computeAutomationStatus(self, automation: Dict[str, Any]) -> str:
"""Compute status field based on eventId presence"""
eventId = automation.get("eventId")
return "Running" if eventId else "Idle"
def _enrichAutomationsWithUserAndMandate(self, automations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Batch enrich automations with user names, mandate names and feature instance labels.
Uses direct DB lookup (no RBAC) because this is purely cosmetic enrichment —
the user already has RBAC-verified access to the automations themselves.
"""
if not automations:
return automations
# Collect all unique IDs
userIds = set()
mandateIds = set()
featureInstanceIds = set()
for automation in automations:
createdBy = automation.get("_createdBy")
if createdBy:
userIds.add(createdBy)
mandateId = automation.get("mandateId")
if mandateId:
mandateIds.add(mandateId)
featureInstanceId = automation.get("featureInstanceId")
if featureInstanceId:
featureInstanceIds.add(featureInstanceId)
# Use root DB connector for display-only lookups (no RBAC needed)
usersMap = {}
mandatesMap = {}
featureInstancesMap = {}
try:
from modules.datamodels.datamodelUam import UserInDB, Mandate
from modules.datamodels.datamodelFeatures import FeatureInstance
from modules.security.rootAccess import getRootDbAppConnector
dbAppConn = getRootDbAppConnector()
# Batch fetch user display names
if userIds:
for userId in userIds:
users = dbAppConn.getRecordset(UserInDB, recordFilter={"id": userId})
if users:
user = users[0]
displayName = user.get("fullName") or user.get("username") or user.get("email") or None
if displayName:
usersMap[userId] = displayName
# Batch fetch mandate display names
if mandateIds:
for mandateId in mandateIds:
mandates = dbAppConn.getRecordset(Mandate, recordFilter={"id": mandateId})
if mandates:
label = mandates[0].get("label") or mandates[0].get("name") or None
if label:
mandatesMap[mandateId] = label
# Batch fetch feature instance labels
if featureInstanceIds:
for fiId in featureInstanceIds:
instances = dbAppConn.getRecordset(FeatureInstance, recordFilter={"id": fiId})
if instances:
fi = instances[0]
label = fi.get("label") or fi.get("featureCode") or None
if label:
featureInstancesMap[fiId] = label
except Exception as e:
logger.warning(f"Could not enrich automations with display names: {e}")
# Enrich each automation with the fetched data
# SECURITY: Never show a fallback name — if lookup fails, show empty string
for automation in automations:
createdBy = automation.get("_createdBy")
automation["_createdByUserName"] = usersMap.get(createdBy, "") if createdBy else ""
mandateId = automation.get("mandateId")
automation["mandateName"] = mandatesMap.get(mandateId, "") if mandateId else ""
featureInstanceId = automation.get("featureInstanceId")
automation["featureInstanceName"] = featureInstancesMap.get(featureInstanceId, "") if featureInstanceId else ""
return automations
def _enrichAutomationWithUserAndMandate(self, automation: Dict[str, Any]) -> Dict[str, Any]:
"""
Enrich a single automation with user name and mandate name for display.
For multiple automations, use _enrichAutomationsWithUserAndMandate for better performance.
"""
return self._enrichAutomationsWithUserAndMandate([automation])[0]
def getAllAutomationDefinitions(self, pagination: Optional[PaginationParams] = None) -> Union[List[Dict[str, Any]], PaginatedResult]:
"""
Returns automation definitions based on user access level.
Supports optional pagination, sorting, and filtering.
Computes status field for each automation.
"""
# AutomationDefinitions can belong to any feature instance within a mandate.
# Filter by mandateId only — not by featureInstanceId — to show all definitions across features.
filteredAutomations = getRecordsetWithRBAC(
self.db,
AutomationDefinition,
self.currentUser,
mandateId=self.mandateId
)
# Compute status for each automation and normalize executionLogs
for automation in filteredAutomations:
automation["status"] = self._computeAutomationStatus(automation)
# Ensure executionLogs is always a list, not None
if automation.get("executionLogs") is None:
automation["executionLogs"] = []
# Batch enrich with user and mandate names
self._enrichAutomationsWithUserAndMandate(filteredAutomations)
# If no pagination requested, return all items
if pagination is None:
return filteredAutomations
# Apply filtering (if filters provided)
if pagination.filters:
filteredAutomations = self._applyFilters(filteredAutomations, pagination.filters)
# Apply sorting (in order of sortFields)
if pagination.sort:
filteredAutomations = self._applySorting(filteredAutomations, pagination.sort)
# Count total items after filters
totalItems = len(filteredAutomations)
totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0
# Apply pagination (skip/limit)
startIdx = (pagination.page - 1) * pagination.pageSize
endIdx = startIdx + pagination.pageSize
pagedAutomations = filteredAutomations[startIdx:endIdx]
return PaginatedResult(
items=pagedAutomations,
totalItems=totalItems,
totalPages=totalPages
)
def _applyFilters(self, items: List[Dict], filters: Dict[str, Any]) -> List[Dict]:
"""Apply filters to a list of items."""
if not filters:
return items
filtered = []
for item in items:
match = True
for key, value in filters.items():
itemValue = item.get(key)
if isinstance(value, str) and isinstance(itemValue, str):
if value.lower() not in itemValue.lower():
match = False
break
elif itemValue != value:
match = False
break
if match:
filtered.append(item)
return filtered
def _applySorting(self, items: List[Dict], sortFields: List[Dict]) -> List[Dict]:
"""Apply sorting to a list of items."""
if not sortFields:
return items
for sortField in reversed(sortFields):
field = sortField.get("field", "")
direction = sortField.get("direction", "asc")
reverse = direction.lower() == "desc"
items = sorted(items, key=lambda x: x.get(field, ""), reverse=reverse)
return items
def getAutomationDefinition(self, automationId: str, includeSystemFields: bool = False) -> Optional[AutomationDefinition]:
"""Returns an automation definition by ID if user has access, with computed status.
Args:
automationId: ID of the automation to get
includeSystemFields: If True, returns raw dict with system fields (_createdBy, etc).
If False (default), returns Pydantic model without system fields.
"""
try:
# AutomationDefinitions can belong to any feature instance within a mandate.
# Filter by mandateId only — not by featureInstanceId.
filtered = getRecordsetWithRBAC(
self.db,
AutomationDefinition,
self.currentUser,
recordFilter={"id": automationId},
mandateId=self.mandateId
)
if not filtered:
return None
automation = filtered[0]
automation["status"] = self._computeAutomationStatus(automation)
# Ensure executionLogs is always a list, not None
if automation.get("executionLogs") is None:
automation["executionLogs"] = []
# Enrich with user and mandate names
self._enrichAutomationWithUserAndMandate(automation)
# For internal use (execution), return raw dict with system fields
if includeSystemFields:
# Return as simple namespace object so getattr works
class AutomationWithSystemFields:
def __init__(self, data):
for key, value in data.items():
setattr(self, key, value)
return AutomationWithSystemFields(automation)
# Clean metadata fields and return Pydantic model
cleanedRecord = {k: v for k, v in automation.items() if not k.startswith("_")}
return AutomationDefinition(**cleanedRecord)
except Exception as e:
logger.error(f"Error getting automation definition: {str(e)}")
return None
def createAutomationDefinition(self, automationData: Dict[str, Any]) -> AutomationDefinition:
"""Creates a new automation definition, then triggers sync."""
try:
# Ensure ID is present
if "id" not in automationData or not automationData["id"]:
automationData["id"] = str(uuid.uuid4())
# Ensure mandateId and featureInstanceId are set for proper data isolation
if "mandateId" not in automationData or not automationData.get("mandateId"):
# Use request context mandateId, or fall back to Root mandate
effectiveMandateId = self.mandateId
if not effectiveMandateId:
# Fall back to Root mandate (first mandate in system)
try:
from modules.datamodels.datamodelUam import Mandate
from modules.security.rootAccess import getRootDbAppConnector
dbAppConn = getRootDbAppConnector()
allMandates = dbAppConn.getRecordset(Mandate)
if allMandates:
effectiveMandateId = allMandates[0].get("id")
logger.debug(f"createAutomationDefinition: Using Root mandate {effectiveMandateId}")
except Exception as e:
logger.warning(f"Could not get Root mandate: {e}")
automationData["mandateId"] = effectiveMandateId
if "featureInstanceId" not in automationData:
automationData["featureInstanceId"] = self.featureInstanceId
# Ensure database connector has correct userId context
if not self.userId:
logger.error(f"createAutomationDefinition: userId is not set! Cannot set _createdBy. currentUser={self.currentUser}")
elif hasattr(self.db, 'updateContext'):
try:
self.db.updateContext(self.userId)
logger.debug(f"createAutomationDefinition: Updated database context with userId={self.userId}")
except Exception as e:
logger.warning(f"Could not update database context: {e}")
# Create automation in database
createdAutomation = self.db.recordCreate(AutomationDefinition, automationData)
# Compute status
createdAutomation["status"] = self._computeAutomationStatus(createdAutomation)
# Ensure executionLogs is always a list, not None
if createdAutomation.get("executionLogs") is None:
createdAutomation["executionLogs"] = []
# Trigger automation change callback
self._notifyAutomationChanged()
# Clean metadata fields and return Pydantic model
cleanedRecord = {k: v for k, v in createdAutomation.items() if not k.startswith("_")}
return AutomationDefinition(**cleanedRecord)
except Exception as e:
logger.error(f"Error creating automation definition: {str(e)}")
raise
def _saveExecutionLog(self, automationId: str, executionLogs: List[Dict[str, Any]]) -> None:
"""
Save execution logs to an automation definition WITHOUT RBAC check.
This is a system-level operation: when a user executes an automation,
the execution log must be saved regardless of whether the user has
'update' permission on the AutomationDefinition. The user already
proved they have execute/read access by loading the automation.
"""
try:
self.db.recordModify(AutomationDefinition, automationId, {"executionLogs": executionLogs})
logger.debug(f"Saved execution log for automation {automationId}")
except Exception as e:
logger.warning(f"Could not save execution log for automation {automationId}: {e}")
def updateAutomationDefinition(self, automationId: str, automationData: Dict[str, Any]) -> AutomationDefinition:
"""Updates an automation definition, then triggers sync."""
try:
# Check access
existing = self.getAutomationDefinition(automationId)
if not existing:
raise PermissionError(f"No access to automation {automationId}")
if not self.checkRbacPermission(AutomationDefinition, "update", automationId):
raise PermissionError(f"No permission to modify automation {automationId}")
# If deactivating: immediately remove scheduler job (don't rely on async callback)
isBeingDeactivated = "active" in automationData and not automationData["active"]
if isBeingDeactivated:
existingEventId = getattr(existing, "eventId", None) if not isinstance(existing, dict) else existing.get("eventId")
if existingEventId:
try:
from modules.shared.eventManagement import eventManager
eventManager.remove(existingEventId)
logger.info(f"Removed scheduler job {existingEventId} (automation deactivated)")
except Exception as e:
logger.warning(f"Could not remove scheduler job {existingEventId}: {e}")
automationData["eventId"] = None
# Update automation in database
updatedAutomation = self.db.recordModify(AutomationDefinition, automationId, automationData)
# Compute status
updatedAutomation["status"] = self._computeAutomationStatus(updatedAutomation)
# Ensure executionLogs is always a list, not None
if updatedAutomation.get("executionLogs") is None:
updatedAutomation["executionLogs"] = []
# Trigger automation change callback
self._notifyAutomationChanged()
# Clean metadata fields and return Pydantic model
cleanedRecord = {k: v for k, v in updatedAutomation.items() if not k.startswith("_")}
return AutomationDefinition(**cleanedRecord)
except Exception as e:
logger.error(f"Error updating automation definition: {str(e)}")
raise
def deleteAutomationDefinition(self, automationId: str) -> bool:
"""Deletes an automation definition, then triggers sync."""
try:
# Check access
existing = self.getAutomationDefinition(automationId)
if not existing:
raise PermissionError(f"No access to automation {automationId}")
if not self.checkRbacPermission(AutomationDefinition, "delete", automationId):
raise PermissionError(f"No permission to delete automation {automationId}")
# Delete automation from database
self.db.recordDelete(AutomationDefinition, automationId)
# Trigger automation change callback
self._notifyAutomationChanged()
return True
except Exception as e:
logger.error(f"Error deleting automation definition: {str(e)}")
raise
def getAllAutomationDefinitionsWithRBAC(self, user: User) -> List[Dict[str, Any]]:
"""
Get all automation definitions filtered by RBAC for a specific user.
This method encapsulates getRecordsetWithRBAC() to avoid exposing the connector.
Args:
user: User object for RBAC filtering
Returns:
List of automation definition dictionaries filtered by RBAC
"""
return getRecordsetWithRBAC(
self.db,
AutomationDefinition,
user,
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
# =========================================================================
# AutomationTemplate CRUD methods
# =========================================================================
def getAllAutomationTemplates(self, pagination: Optional[PaginationParams] = None) -> Union[List[Dict[str, Any]], PaginatedResult]:
"""
Returns automation templates: system templates + instance templates for current instance.
System templates (isSystem=True) are always included (read-only for non-SysAdmin).
Instance templates (featureInstanceId matches) are included with RBAC filtering.
"""
# 1. System templates — always visible to all users
systemTemplates = self.db.getRecordset(
AutomationTemplate,
recordFilter={"isSystem": True}
)
# 2. Instance templates — scoped to current feature instance, RBAC-filtered
instanceTemplates = []
if self.featureInstanceId:
allInstanceTemplates = self.db.getRecordset(
AutomationTemplate,
recordFilter={"featureInstanceId": self.featureInstanceId, "isSystem": False}
)
# Apply RBAC filtering on instance templates
for t in allInstanceTemplates:
instanceTemplates.append(t)
# Combine: system first, then instance
filteredTemplates = systemTemplates + instanceTemplates
# Enrich with user names
self._enrichTemplatesWithUserName(filteredTemplates)
# If no pagination requested, return all items
if pagination is None:
return filteredTemplates
# Apply filtering (if filters provided)
if pagination.filters:
filteredTemplates = self._applyFilters(filteredTemplates, pagination.filters)
# Apply sorting (in order of sortFields)
if pagination.sort:
filteredTemplates = self._applySorting(filteredTemplates, pagination.sort)
# Count total items after filters
totalItems = len(filteredTemplates)
totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0
# Apply pagination (skip/limit)
startIdx = (pagination.page - 1) * pagination.pageSize
endIdx = startIdx + pagination.pageSize
pagedTemplates = filteredTemplates[startIdx:endIdx]
return PaginatedResult(
items=pagedTemplates,
totalItems=totalItems,
totalPages=totalPages
)
def _enrichTemplatesWithUserName(self, templates: List[Dict[str, Any]]) -> None:
"""Batch enrich templates with creator user names."""
if not templates:
return
# Collect unique user IDs
userIds = set()
for template in templates:
createdBy = template.get("_createdBy")
if createdBy:
userIds.add(createdBy)
if not userIds:
return
# Batch fetch users
try:
from modules.datamodels.datamodelUam import UserInDB
from modules.security.rootAccess import getRootDbAppConnector
dbAppConn = getRootDbAppConnector()
userNameMap = {}
for userId in userIds:
users = dbAppConn.getRecordset(UserInDB, recordFilter={"id": userId})
if users:
user = users[0]
displayName = user.get("fullName") or user.get("username") or user.get("email") or None
if displayName:
userNameMap[userId] = displayName
# Apply to templates — SECURITY: no fallback, empty if not found
for template in templates:
createdBy = template.get("_createdBy")
template["_createdByUserName"] = userNameMap.get(createdBy, "") if createdBy else ""
except Exception as e:
logger.warning(f"Could not enrich templates with user names: {e}")
def getAutomationTemplate(self, templateId: str) -> Optional[Dict[str, Any]]:
"""Returns an automation template by ID (system templates always accessible, instance templates scoped)."""
try:
records = self.db.getRecordset(
AutomationTemplate,
recordFilter={"id": templateId}
)
if not records:
return None
template = records[0]
# System templates are readable by everyone
if template.get("isSystem"):
self._enrichTemplatesWithUserName([template])
return template
# Instance templates: must belong to current feature instance
templateInstanceId = template.get("featureInstanceId")
if templateInstanceId and self.featureInstanceId and str(templateInstanceId) != str(self.featureInstanceId):
return None # Not in this instance
self._enrichTemplatesWithUserName([template])
return template
except Exception as e:
logger.error(f"Error getting automation template: {str(e)}")
return None
def createAutomationTemplate(self, templateData: Dict[str, Any], isSysAdmin: bool = False) -> Dict[str, Any]:
"""Creates a new automation template.
System templates (isSystem=True) can only be created by SysAdmin.
Instance templates get featureInstanceId from context.
"""
try:
# Ensure ID is present
if "id" not in templateData or not templateData["id"]:
templateData["id"] = str(uuid.uuid4())
# System template protection
if templateData.get("isSystem") and not isSysAdmin:
raise PermissionError("Only SysAdmin can create system templates")
# Set featureInstanceId for non-system templates
if not templateData.get("isSystem"):
templateData["featureInstanceId"] = self.featureInstanceId
templateData["isSystem"] = False
# RBAC check (for non-system templates)
if not isSysAdmin and not self.checkRbacPermission(AutomationTemplate, "create"):
raise PermissionError("No permission to create template")
# Ensure database connector has correct userId context
if self.userId and hasattr(self.db, 'updateContext'):
try:
self.db.updateContext(self.userId)
except Exception as e:
logger.warning(f"Could not update database context: {e}")
# Convert template field to string if it's a dict (frontend may send parsed JSON)
if "template" in templateData and isinstance(templateData["template"], dict):
import json
templateData["template"] = json.dumps(templateData["template"])
# Validate through Pydantic model to ensure proper type conversion
validatedTemplate = AutomationTemplate(**templateData)
# Create template in database using model_dump for proper serialization
createdTemplate = self.db.recordCreate(AutomationTemplate, validatedTemplate.model_dump())
return createdTemplate
except Exception as e:
logger.error(f"Error creating automation template: {str(e)}")
raise
def updateAutomationTemplate(self, templateId: str, templateData: Dict[str, Any], isSysAdmin: bool = False) -> Dict[str, Any]:
"""Updates an automation template.
System templates can only be updated by SysAdmin.
"""
try:
# Check access
existing = self.getAutomationTemplate(templateId)
if not existing:
raise PermissionError(f"No access to template {templateId}")
# System template protection
if existing.get("isSystem") and not isSysAdmin:
raise PermissionError("Only SysAdmin can modify system templates")
if not isSysAdmin and not self.checkRbacPermission(AutomationTemplate, "update", templateId):
raise PermissionError(f"No permission to modify template {templateId}")
# Prevent changing isSystem/featureInstanceId
templateData.pop("isSystem", None)
templateData.pop("featureInstanceId", None)
# Convert template field to string if it's a dict (frontend may send parsed JSON)
if "template" in templateData and isinstance(templateData["template"], dict):
import json
templateData["template"] = json.dumps(templateData["template"])
# Merge existing data with update data for partial updates
mergedData = {**existing, **templateData}
mergedData["id"] = templateId # Ensure ID is preserved
# Validate through Pydantic model to ensure proper type conversion
validatedTemplate = AutomationTemplate(**mergedData)
# Update template in database using model_dump for proper serialization
updatedTemplate = self.db.recordModify(AutomationTemplate, templateId, validatedTemplate.model_dump())
return updatedTemplate
except Exception as e:
logger.error(f"Error updating automation template: {str(e)}")
raise
def deleteAutomationTemplate(self, templateId: str, isSysAdmin: bool = False) -> bool:
"""Deletes an automation template.
System templates can only be deleted by SysAdmin.
"""
try:
# Check access
existing = self.getAutomationTemplate(templateId)
if not existing:
return False
# System template protection
if existing.get("isSystem") and not isSysAdmin:
raise PermissionError("Only SysAdmin can delete system templates")
if not isSysAdmin and not self.checkRbacPermission(AutomationTemplate, "delete", templateId):
raise PermissionError(f"No permission to delete template {templateId}")
# Delete template from database
self.db.recordDelete(AutomationTemplate, templateId)
return True
except Exception as e:
logger.error(f"Error deleting automation template: {str(e)}")
raise
def duplicateAutomationTemplate(self, templateId: str) -> Dict[str, Any]:
"""Duplicates a template into the current feature instance.
Creates a copy with new ID, isSystem=False, featureInstanceId from context.
Works for both system and instance templates.
"""
try:
existing = self.getAutomationTemplate(templateId)
if not existing:
raise PermissionError(f"Template {templateId} not found")
# RBAC check for creating templates
if not self.checkRbacPermission(AutomationTemplate, "create"):
raise PermissionError("No permission to create templates")
# Build duplicate data
duplicateData = {
"id": str(uuid.uuid4()),
"label": existing.get("label", {}),
"overview": existing.get("overview"),
"template": existing.get("template", ""),
"isSystem": False,
"featureInstanceId": self.featureInstanceId,
}
# Append "(Kopie)" to label
label = duplicateData["label"]
if isinstance(label, dict):
for lang in label:
if label[lang]:
label[lang] = f"{label[lang]} (Kopie)"
# Ensure database connector has correct userId context
if self.userId and hasattr(self.db, 'updateContext'):
self.db.updateContext(self.userId)
validatedTemplate = AutomationTemplate(**duplicateData)
createdTemplate = self.db.recordCreate(AutomationTemplate, validatedTemplate.model_dump())
logger.info(f"Duplicated template {templateId} -> {duplicateData['id']}")
return createdTemplate
except Exception as e:
logger.error(f"Error duplicating template: {str(e)}")
raise
def duplicateAutomationDefinition(self, definitionId: str) -> Dict[str, Any]:
"""Duplicates an automation definition within the same feature instance.
Creates a copy with new ID, active=False, no eventId.
"""
try:
existing = self.getAutomationDefinition(definitionId)
if not existing:
raise PermissionError(f"Definition {definitionId} not found")
# RBAC check for creating definitions
if not self.checkRbacPermission(AutomationDefinition, "create"):
raise PermissionError("No permission to create definitions")
# Build duplicate data
duplicateData = {
"id": str(uuid.uuid4()),
"mandateId": existing.get("mandateId"),
"featureInstanceId": existing.get("featureInstanceId"),
"label": f"{existing.get('label', '')} (Kopie)",
"schedule": existing.get("schedule", ""),
"template": existing.get("template", ""),
"placeholders": existing.get("placeholders", {}),
"active": False,
"eventId": None,
"status": None,
"executionLogs": [],
"allowedProviders": existing.get("allowedProviders", []),
}
# Ensure database connector has correct userId context
if self.userId and hasattr(self.db, 'updateContext'):
self.db.updateContext(self.userId)
validatedDefinition = AutomationDefinition(**duplicateData)
createdDefinition = self.db.recordCreate(AutomationDefinition, validatedDefinition.model_dump())
logger.info(f"Duplicated definition {definitionId} -> {duplicateData['id']}")
return createdDefinition
except Exception as e:
logger.error(f"Error duplicating definition: {str(e)}")
raise
def _notifyAutomationChanged(self):
"""Notify registered callbacks about automation changes (decoupled from features).
Sync-safe: works from both sync and async contexts."""
try:
from modules.shared.callbackRegistry import callbackRegistry
# Trigger callbacks without knowing which features are listening
callbackRegistry.trigger('automation.changed', self)
except Exception as e:
logger.error(f"Error notifying automation change: {str(e)}")
def getInterface(currentUser: Optional[User] = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None) -> 'AutomationObjects':
"""
Returns an AutomationObjects instance for the current user.
Handles initialization of database and records.
Args:
currentUser: The authenticated user
mandateId: The mandate ID from RequestContext (X-Mandate-Id header).
featureInstanceId: The feature instance ID from RequestContext (X-Feature-Instance-Id header).
"""
if not currentUser:
raise ValueError("Invalid user context: user is required")
effectiveMandateId = str(mandateId) if mandateId else None
effectiveFeatureInstanceId = str(featureInstanceId) if featureInstanceId else None
# Create context key including featureInstanceId for proper isolation
contextKey = f"automation_{effectiveMandateId}_{effectiveFeatureInstanceId}_{currentUser.id}"
# Create new instance if not exists
if contextKey not in _automationInterfaces:
_automationInterfaces[contextKey] = AutomationObjects(currentUser, mandateId=effectiveMandateId, featureInstanceId=effectiveFeatureInstanceId)
else:
# Update user context if needed
_automationInterfaces[contextKey].setUserContext(currentUser, mandateId=effectiveMandateId, featureInstanceId=effectiveFeatureInstanceId)
return _automationInterfaces[contextKey]