gateway/modules/features/neutralization/mainNeutralization.py
2026-04-10 12:33:27 +02:00

231 lines
8.2 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Neutralizer Feature Container - Main Module.
Handles feature initialization and RBAC catalog registration.
"""
import logging
from typing import Dict, List, Any
logger = logging.getLogger(__name__)
# Feature metadata
FEATURE_CODE = "neutralization"
FEATURE_LABEL = "Neutralisierung"
FEATURE_ICON = "mdi-shield-check"
# UI Objects for RBAC catalog
UI_OBJECTS = [
{
"objectKey": "ui.feature.neutralization.playground",
"label": "Spielwiese",
"meta": {"area": "playground"}
}
]
# Resource Objects for RBAC catalog
RESOURCE_OBJECTS = [
{
"objectKey": "resource.feature.neutralization.process.text",
"label": "Text verarbeiten",
"meta": {"endpoint": "/api/neutralization/process/text", "method": "POST"}
},
{
"objectKey": "resource.feature.neutralization.process.files",
"label": "Dateien verarbeiten",
"meta": {"endpoint": "/api/neutralization/process/files", "method": "POST"}
},
{
"objectKey": "resource.feature.neutralization.config.update",
"label": "Konfiguration aktualisieren",
"meta": {"endpoint": "/api/neutralization/config", "method": "PUT"}
},
]
# Template roles for this feature
TEMPLATE_ROLES = [
{
"roleLabel": "neutralization-viewer",
"description": "Neutralisierungs-Betrachter - Neutralisierungsdaten einsehen (nur lesen)",
"accessRules": [
{"context": "UI", "item": "ui.feature.neutralization.playground", "view": True},
{"context": "DATA", "item": None, "view": True, "read": "m", "create": "n", "update": "n", "delete": "n"},
],
},
{
"roleLabel": "neutralization-user",
"description": "Neutralisierungs-Benutzer - Neutralisierungstools nutzen und eigene Daten verwalten",
"accessRules": [
{"context": "UI", "item": "ui.feature.neutralization.playground", "view": True},
{"context": "UI", "item": "ui.feature.neutralization.attributes", "view": True},
{"context": "DATA", "item": None, "view": True, "read": "m", "create": "m", "update": "m", "delete": "n"},
],
},
{
"roleLabel": "neutralization-admin",
"description": "Neutralisierungs-Administrator - Vollzugriff auf Neutralisierungs-Einstellungen und Daten",
"accessRules": [
{"context": "UI", "item": None, "view": True},
{"context": "DATA", "item": None, "view": True, "read": "a", "create": "a", "update": "a", "delete": "a"},
],
},
{
"roleLabel": "neutralization-analyst",
"description": "Neutralisierungs-Analyst - Neutralisierungsdaten analysieren und verarbeiten",
"accessRules": [
{"context": "UI", "item": "ui.feature.neutralization.playground", "view": True},
{"context": "UI", "item": "ui.feature.neutralization.attributes", "view": True},
{"context": "DATA", "item": None, "view": True, "read": "g", "create": "n", "update": "n", "delete": "n"},
],
},
]
def getFeatureDefinition() -> Dict[str, Any]:
"""Return the feature definition for registration."""
return {
"code": FEATURE_CODE,
"label": FEATURE_LABEL,
"icon": FEATURE_ICON
}
def getUiObjects() -> List[Dict[str, Any]]:
"""Return UI objects for RBAC catalog registration."""
return UI_OBJECTS
def getResourceObjects() -> List[Dict[str, Any]]:
"""Return resource objects for RBAC catalog registration."""
return RESOURCE_OBJECTS
def getTemplateRoles() -> List[Dict[str, Any]]:
"""Return template roles for this feature."""
return TEMPLATE_ROLES
def registerFeature(catalogService) -> bool:
"""Register this feature's RBAC objects in the catalog."""
try:
for uiObj in UI_OBJECTS:
catalogService.registerUiObject(
featureCode=FEATURE_CODE,
objectKey=uiObj["objectKey"],
label=uiObj["label"],
meta=uiObj.get("meta")
)
for resObj in RESOURCE_OBJECTS:
catalogService.registerResourceObject(
featureCode=FEATURE_CODE,
objectKey=resObj["objectKey"],
label=resObj["label"],
meta=resObj.get("meta")
)
# Sync template roles to database
_syncTemplateRolesToDb()
logger.info(f"Feature '{FEATURE_CODE}' registered {len(UI_OBJECTS)} UI objects and {len(RESOURCE_OBJECTS)} resource objects")
return True
except Exception as e:
logger.error(f"Failed to register feature '{FEATURE_CODE}': {e}")
return False
def _syncTemplateRolesToDb() -> int:
"""
Sync template roles and their AccessRules to the database.
Creates global template roles (mandateId=None) if they don't exist.
Returns:
Number of roles created
"""
try:
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.datamodels.datamodelRbac import Role, AccessRule, AccessRuleContext
from modules.datamodels.datamodelUtils import coerce_text_multilingual
rootInterface = getRootInterface()
existingRoles = rootInterface.getRolesByFeatureCode(FEATURE_CODE)
templateRoles = [r for r in existingRoles if r.mandateId is None]
existingRoleLabels = {r.roleLabel: str(r.id) for r in templateRoles}
createdCount = 0
for roleTemplate in TEMPLATE_ROLES:
roleLabel = roleTemplate["roleLabel"]
if roleLabel in existingRoleLabels:
roleId = existingRoleLabels[roleLabel]
_ensureAccessRulesForRole(rootInterface, roleId, roleTemplate.get("accessRules", []))
else:
newRole = Role(
roleLabel=roleLabel,
description=coerce_text_multilingual(roleTemplate.get("description", {})),
featureCode=FEATURE_CODE,
mandateId=None,
featureInstanceId=None,
isSystemRole=False
)
createdRole = rootInterface.db.recordCreate(Role, newRole.model_dump())
roleId = createdRole.get("id")
_ensureAccessRulesForRole(rootInterface, roleId, roleTemplate.get("accessRules", []))
logger.info(f"Created template role '{roleLabel}' with ID {roleId}")
createdCount += 1
if createdCount > 0:
logger.info(f"Feature '{FEATURE_CODE}': Created {createdCount} template roles")
return createdCount
except Exception as e:
logger.error(f"Error syncing template roles for feature '{FEATURE_CODE}': {e}")
return 0
def _ensureAccessRulesForRole(rootInterface, roleId: str, ruleTemplates: List[Dict[str, Any]]) -> int:
"""Ensure AccessRules exist for a role based on templates."""
from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext
existingRules = rootInterface.getAccessRulesByRole(roleId)
existingSignatures = set()
for rule in existingRules:
sig = (rule.context.value if rule.context else None, rule.item)
existingSignatures.add(sig)
createdCount = 0
for template in ruleTemplates:
context = template.get("context", "UI")
item = template.get("item")
sig = (context, item)
if sig in existingSignatures:
continue
if context == "UI":
contextEnum = AccessRuleContext.UI
elif context == "DATA":
contextEnum = AccessRuleContext.DATA
elif context == "RESOURCE":
contextEnum = AccessRuleContext.RESOURCE
else:
contextEnum = context
newRule = AccessRule(
roleId=roleId,
context=contextEnum,
item=item,
view=template.get("view", False),
read=template.get("read"),
create=template.get("create"),
update=template.get("update"),
delete=template.get("delete"),
)
rootInterface.db.recordCreate(AccessRule, newRule.model_dump())
createdCount += 1
return createdCount