225 lines
8.2 KiB
Python
225 lines
8.2 KiB
Python
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Neutralizer Feature Container - Main Module.

Handles feature initialization and RBAC catalog registration.
"""
|
|
|
|
import logging
|
|
from typing import Dict, List, Any
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Feature metadata: the feature's code, its localized display label
# (keyed by language code) and its icon identifier.
FEATURE_CODE = "neutralization"
FEATURE_LABEL = {
    "en": "Neutralization",
    "de": "Neutralisierung",
    "fr": "Neutralisation",
}
FEATURE_ICON = "mdi-shield-check"
|
|
|
|
# UI objects this feature contributes to the RBAC catalog.
# Each entry carries a fully qualified object key, a localized label and
# optional metadata.
UI_OBJECTS = [
    {
        "objectKey": "ui.feature.neutralization.playground",
        "label": {
            "en": "Playground",
            "de": "Spielwiese",
            "fr": "Bac à sable",
        },
        "meta": {"area": "playground"},
    },
]
|
|
|
|
# Resource objects this feature contributes to the RBAC catalog.
# The "meta" entry of each object records the endpoint path and HTTP method
# associated with that resource key.
RESOURCE_OBJECTS = [
    {
        "objectKey": "resource.feature.neutralization.process.text",
        "label": {
            "en": "Process Text",
            "de": "Text verarbeiten",
            "fr": "Traiter texte",
        },
        "meta": {
            "endpoint": "/api/neutralization/process/text",
            "method": "POST",
        },
    },
    {
        "objectKey": "resource.feature.neutralization.process.files",
        "label": {
            "en": "Process Files",
            "de": "Dateien verarbeiten",
            "fr": "Traiter fichiers",
        },
        "meta": {
            "endpoint": "/api/neutralization/process/files",
            "method": "POST",
        },
    },
    {
        "objectKey": "resource.feature.neutralization.config.update",
        "label": {
            "en": "Update Config",
            "de": "Konfiguration aktualisieren",
            "fr": "Mettre à jour config",
        },
        "meta": {
            "endpoint": "/api/neutralization/config",
            "method": "PUT",
        },
    },
]
|
|
|
|
# Template roles for this feature.
# Each role has a label, a localized description and a list of access-rule
# templates. A rule template names a context ("UI" or "DATA" here), an
# optional item key (None is used for the rules described as "full" access)
# and the permission values.
# NOTE(review): the single-letter values "a" / "g" / "n" presumably stand for
# all / group / none - confirm against the AccessRule data model.
TEMPLATE_ROLES = [
    {
        "roleLabel": "neutralization-admin",
        "description": {
            "en": "Neutralization Administrator - Full access to neutralization settings and data",
            "de": "Neutralisierungs-Administrator - Vollzugriff auf Neutralisierungs-Einstellungen und Daten",
            "fr": "Administrateur neutralisation - Accès complet aux paramètres et données",
        },
        "accessRules": [
            # Full UI access (all views including admin views)
            {
                "context": "UI",
                "item": None,
                "view": True,
            },
            # Full DATA access
            {
                "context": "DATA",
                "item": None,
                "view": True,
                "read": "a",
                "create": "a",
                "update": "a",
                "delete": "a",
            },
        ],
    },
    {
        "roleLabel": "neutralization-analyst",
        "description": {
            "en": "Neutralization Analyst - Analyze and process neutralization data",
            "de": "Neutralisierungs-Analyst - Neutralisierungsdaten analysieren und verarbeiten",
            "fr": "Analyste neutralisation - Analyser et traiter les données de neutralisation",
        },
        "accessRules": [
            # UI access to specific views, addressed by fully qualified object keys
            {
                "context": "UI",
                "item": "ui.feature.neutralization.playground",
                "view": True,
            },
            # NOTE(review): no UI object with this key is declared in this
            # module's UI_OBJECTS - confirm it is registered elsewhere or
            # add it to the catalog.
            {
                "context": "UI",
                "item": "ui.feature.neutralization.attributes",
                "view": True,
            },
            # Group-level DATA access (read-only for sensitive config)
            {
                "context": "DATA",
                "item": None,
                "view": True,
                "read": "g",
                "create": "n",
                "update": "n",
                "delete": "n",
            },
        ],
    },
]
|
|
|
|
|
|
def getFeatureDefinition() -> Dict[str, Any]:
    """
    Build the feature definition used for registration.

    Returns:
        A new dict with the keys "code", "label" and "icon", taken from the
        module constants FEATURE_CODE, FEATURE_LABEL and FEATURE_ICON.
    """
    return dict(
        code=FEATURE_CODE,
        label=FEATURE_LABEL,
        icon=FEATURE_ICON,
    )
|
|
|
|
|
|
def getUiObjects() -> List[Dict[str, Any]]:
    """
    Return the UI objects for RBAC catalog registration.

    Returns:
        The module-level UI_OBJECTS list itself (not a copy).
    """
    uiCatalogEntries = UI_OBJECTS
    return uiCatalogEntries
|
|
|
|
|
|
def getResourceObjects() -> List[Dict[str, Any]]:
    """
    Return the resource objects for RBAC catalog registration.

    Returns:
        The module-level RESOURCE_OBJECTS list itself (not a copy).
    """
    resourceCatalogEntries = RESOURCE_OBJECTS
    return resourceCatalogEntries
|
|
|
|
|
|
def getTemplateRoles() -> List[Dict[str, Any]]:
    """
    Return the template roles defined for this feature.

    Returns:
        The module-level TEMPLATE_ROLES list itself (not a copy).
    """
    roleTemplates = TEMPLATE_ROLES
    return roleTemplates
|
|
|
|
|
|
def registerFeature(catalogService) -> bool:
    """
    Register this feature's RBAC objects in the catalog.

    Registers every entry of UI_OBJECTS via ``catalogService.registerUiObject``
    and every entry of RESOURCE_OBJECTS via
    ``catalogService.registerResourceObject``, then syncs the template roles
    to the database.

    Args:
        catalogService: Object providing ``registerUiObject`` and
            ``registerResourceObject``; both are called with the keyword
            arguments featureCode, objectKey, label and meta.

    Returns:
        True when all steps ran without raising, False when any exception
        was raised (the exception is logged with its traceback and not
        re-raised).
    """
    try:
        for uiObj in UI_OBJECTS:
            catalogService.registerUiObject(
                featureCode=FEATURE_CODE,
                objectKey=uiObj["objectKey"],
                label=uiObj["label"],
                meta=uiObj.get("meta"),
            )

        for resObj in RESOURCE_OBJECTS:
            catalogService.registerResourceObject(
                featureCode=FEATURE_CODE,
                objectKey=resObj["objectKey"],
                label=resObj["label"],
                meta=resObj.get("meta"),
            )

        # Sync template roles to database.
        # NOTE(review): the return value is ignored, and the sync helper
        # handles its own exceptions by returning 0, so a True result here
        # does not guarantee that the roles were synced.
        _syncTemplateRolesToDb()

        logger.info(
            "Feature '%s' registered %d UI objects and %d resource objects",
            FEATURE_CODE,
            len(UI_OBJECTS),
            len(RESOURCE_OBJECTS),
        )
        return True

    except Exception as e:
        # Deliberate catch-all at the registration boundary: a failing feature
        # reports False instead of propagating. logger.exception keeps the
        # traceback so the failure can be diagnosed.
        logger.exception("Failed to register feature '%s': %s", FEATURE_CODE, e)
        return False
|
|
|
|
|
|
def _syncTemplateRolesToDb() -> int:
    """
    Sync template roles and their AccessRules to the database.

    Creates global template roles (mandateId=None) if they don't exist.
    For roles that already exist, only missing AccessRules are added.

    Returns:
        Number of roles created. 0 is also returned when any exception
        occurs; the exception is logged with its traceback and not re-raised.
    """
    try:
        # Imported lazily inside the function, as in the rest of this module.
        from modules.interfaces.interfaceDbApp import getRootInterface
        from modules.datamodels.datamodelRbac import Role

        rootInterface = getRootInterface()

        # Map the labels of existing global template roles (mandateId is None)
        # to their IDs.
        existingRoles = rootInterface.getRolesByFeatureCode(FEATURE_CODE)
        existingRoleLabels = {
            r.roleLabel: str(r.id)
            for r in existingRoles
            if r.mandateId is None
        }

        createdCount = 0
        for roleTemplate in TEMPLATE_ROLES:
            roleLabel = roleTemplate["roleLabel"]
            ruleTemplates = roleTemplate.get("accessRules", [])

            if roleLabel in existingRoleLabels:
                # Role already present: only top up its access rules.
                _ensureAccessRulesForRole(rootInterface, existingRoleLabels[roleLabel], ruleTemplates)
                continue

            newRole = Role(
                roleLabel=roleLabel,
                description=roleTemplate.get("description", {}),
                featureCode=FEATURE_CODE,
                mandateId=None,
                featureInstanceId=None,
                isSystemRole=False,
            )
            createdRole = rootInterface.db.recordCreate(Role, newRole.model_dump())
            # NOTE(review): existing role IDs are passed on as str(id), but
            # this ID is passed on as returned by recordCreate - confirm both
            # forms are accepted downstream.
            roleId = createdRole.get("id")

            _ensureAccessRulesForRole(rootInterface, roleId, ruleTemplates)

            logger.info("Created template role '%s' with ID %s", roleLabel, roleId)
            createdCount += 1

        if createdCount > 0:
            logger.info("Feature '%s': Created %d template roles", FEATURE_CODE, createdCount)

        return createdCount

    except Exception as e:
        # Best-effort sync: failures are reported through the log only.
        logger.exception("Error syncing template roles for feature '%s': %s", FEATURE_CODE, e)
        return 0
|
|
|
|
|
|
def _ensureAccessRulesForRole(rootInterface, roleId: str, ruleTemplates: List[Dict[str, Any]]) -> int:
    """
    Ensure AccessRules exist for a role based on templates.

    A rule is identified by its (context, item) pair. A template whose pair
    already exists for the role is skipped; existing rules are never updated.

    Args:
        rootInterface: Interface providing ``getAccessRulesByRole`` and
            ``db.recordCreate``.
        roleId: ID of the role the rules belong to.
        ruleTemplates: Rule templates with the keys "context" (defaults to
            "UI"), "item", "view" (defaults to False), "read", "create",
            "update" and "delete".

    Returns:
        Number of AccessRules created.
    """
    from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext

    # Collect the (context, item) signatures that already exist for this role.
    # A context may be an enum member or a plain string, so fall back to the
    # raw value when there is no ``.value`` attribute.
    # NOTE(review): matching assumes the enum values equal the template
    # context strings ("UI", "DATA", "RESOURCE") - confirm in datamodelRbac.
    existingSignatures = set()
    for rule in rootInterface.getAccessRulesByRole(roleId):
        contextValue = getattr(rule.context, "value", rule.context) if rule.context else None
        existingSignatures.add((contextValue, rule.item))

    knownContexts = ("UI", "DATA", "RESOURCE")

    createdCount = 0
    for template in ruleTemplates:
        context = template.get("context", "UI")
        item = template.get("item")
        sig = (context, item)

        if sig in existingSignatures:
            continue

        # Known context names map to the enum member of the same name;
        # anything else is passed through unchanged.
        contextEnum = getattr(AccessRuleContext, context) if context in knownContexts else context

        newRule = AccessRule(
            roleId=roleId,
            context=contextEnum,
            item=item,
            view=template.get("view", False),
            read=template.get("read"),
            create=template.get("create"),
            update=template.get("update"),
            delete=template.get("delete"),
        )
        rootInterface.db.recordCreate(AccessRule, newRule.model_dump())

        # Remember the new signature so a duplicate template in the same call
        # does not create a second identical rule.
        existingSignatures.add(sig)
        createdCount += 1

    return createdCount
|