gateway/modules/features/trustee/mainTrustee.py
2026-01-24 02:06:49 +01:00

401 lines
15 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Trustee Feature Container - Main Module.
Handles feature initialization and RBAC catalog registration.
"""
import logging
from typing import Dict, List, Any
logger = logging.getLogger(__name__)
# Feature metadata: code, localized label and icon returned by getFeatureDefinition()
FEATURE_CODE = "trustee"
FEATURE_LABEL = {
    "en": "Trustee",
    "de": "Treuhand",
    "fr": "Fiduciaire",
}
FEATURE_ICON = "mdi-briefcase"
# UI objects for the RBAC catalog.
# Every entry has a unique "objectKey", a localized "label" (en/de/fr) and a
# "meta" dict naming the UI area and, for tabs, the element inside that area.
UI_OBJECTS = [
    {
        "objectKey": "ui.feature.trustee.organisations",
        "label": {
            "en": "Organisations",
            "de": "Organisationen",
            "fr": "Organisations",
        },
        "meta": {"area": "organisations"},
    },
    {
        "objectKey": "ui.feature.trustee.contracts",
        "label": {
            "en": "Contracts",
            "de": "Verträge",
            "fr": "Contrats",
        },
        "meta": {"area": "contracts"},
    },
    {
        "objectKey": "ui.feature.trustee.contracts.tab.documents",
        "label": {
            "en": "Contract Documents",
            "de": "Vertragsdokumente",
            "fr": "Documents contractuels",
        },
        "meta": {"area": "contracts", "element": "tab.documents"},
    },
    {
        "objectKey": "ui.feature.trustee.contracts.tab.positions",
        "label": {
            "en": "Contract Positions",
            "de": "Vertragspositionen",
            "fr": "Positions contractuelles",
        },
        "meta": {"area": "contracts", "element": "tab.positions"},
    },
    {
        "objectKey": "ui.feature.trustee.access",
        "label": {
            "en": "Access Management",
            "de": "Zugriffsverwaltung",
            "fr": "Gestion des accès",
        },
        "meta": {"area": "access"},
    },
    {
        "objectKey": "ui.feature.trustee.roles",
        "label": {
            "en": "Roles",
            "de": "Rollen",
            "fr": "Rôles",
        },
        "meta": {"area": "roles"},
    },
]
# Resource objects for the RBAC catalog.
# Every entry has a unique "objectKey", a localized "label" (en/de/fr) and a
# "meta" dict holding the endpoint path template and the HTTP method.
RESOURCE_OBJECTS = [
    # Organisations
    {
        "objectKey": "resource.feature.trustee.organisations.create",
        "label": {
            "en": "Create Organisation",
            "de": "Organisation erstellen",
            "fr": "Créer organisation",
        },
        "meta": {
            "endpoint": "/api/trustee/{instanceId}/organisations",
            "method": "POST",
        },
    },
    {
        "objectKey": "resource.feature.trustee.organisations.update",
        "label": {
            "en": "Update Organisation",
            "de": "Organisation aktualisieren",
            "fr": "Modifier organisation",
        },
        "meta": {
            "endpoint": "/api/trustee/{instanceId}/organisations/{orgId}",
            "method": "PUT",
        },
    },
    {
        "objectKey": "resource.feature.trustee.organisations.delete",
        "label": {
            "en": "Delete Organisation",
            "de": "Organisation löschen",
            "fr": "Supprimer organisation",
        },
        "meta": {
            "endpoint": "/api/trustee/{instanceId}/organisations/{orgId}",
            "method": "DELETE",
        },
    },
    # Contracts
    {
        "objectKey": "resource.feature.trustee.contracts.create",
        "label": {
            "en": "Create Contract",
            "de": "Vertrag erstellen",
            "fr": "Créer contrat",
        },
        "meta": {
            "endpoint": "/api/trustee/{instanceId}/contracts",
            "method": "POST",
        },
    },
    {
        "objectKey": "resource.feature.trustee.contracts.update",
        "label": {
            "en": "Update Contract",
            "de": "Vertrag aktualisieren",
            "fr": "Modifier contrat",
        },
        "meta": {
            "endpoint": "/api/trustee/{instanceId}/contracts/{contractId}",
            "method": "PUT",
        },
    },
    {
        "objectKey": "resource.feature.trustee.contracts.delete",
        "label": {
            "en": "Delete Contract",
            "de": "Vertrag löschen",
            "fr": "Supprimer contrat",
        },
        "meta": {
            "endpoint": "/api/trustee/{instanceId}/contracts/{contractId}",
            "method": "DELETE",
        },
    },
    # Documents
    {
        "objectKey": "resource.feature.trustee.documents.create",
        "label": {
            "en": "Upload Document",
            "de": "Dokument hochladen",
            "fr": "Télécharger document",
        },
        "meta": {
            "endpoint": "/api/trustee/{instanceId}/documents",
            "method": "POST",
        },
    },
    {
        "objectKey": "resource.feature.trustee.documents.delete",
        "label": {
            "en": "Delete Document",
            "de": "Dokument löschen",
            "fr": "Supprimer document",
        },
        "meta": {
            "endpoint": "/api/trustee/{instanceId}/documents/{documentId}",
            "method": "DELETE",
        },
    },
    # Positions
    {
        "objectKey": "resource.feature.trustee.positions.create",
        "label": {
            "en": "Create Position",
            "de": "Position erstellen",
            "fr": "Créer position",
        },
        "meta": {
            "endpoint": "/api/trustee/{instanceId}/positions",
            "method": "POST",
        },
    },
    {
        "objectKey": "resource.feature.trustee.positions.delete",
        "label": {
            "en": "Delete Position",
            "de": "Position löschen",
            "fr": "Supprimer position",
        },
        "meta": {
            "endpoint": "/api/trustee/{instanceId}/positions/{positionId}",
            "method": "DELETE",
        },
    },
]
# Template roles for this feature, each with its default AccessRules.
# Every role carries one UI rule and one DATA rule; "item": None means the rule
# is not bound to a single item.
# NOTE(review): the level letters presumably mean a=all, g=group, m=my/own,
# n=none - confirm against the AccessRule data model.
TEMPLATE_ROLES = [
    {
        "roleLabel": "trustee-admin",
        "description": {
            "en": "Trustee Administrator - Full access to all trustee data and settings",
            "de": "Treuhand-Administrator - Vollzugriff auf alle Treuhand-Daten und Einstellungen",
            "fr": "Administrateur fiduciaire - Accès complet aux données et paramètres fiduciaires",
        },
        "accessRules": [
            # UI: everything visible
            {"context": "UI", "item": None, "view": True},
            # DATA: level "a" for every operation
            {
                "context": "DATA",
                "item": None,
                "view": True,
                "read": "a",
                "create": "a",
                "update": "a",
                "delete": "a",
            },
        ],
    },
    {
        "roleLabel": "trustee-accountant",
        "description": {
            "en": "Trustee Accountant - Manage accounting and financial data",
            "de": "Treuhand-Buchhalter - Buchhaltungs- und Finanzdaten verwalten",
            "fr": "Comptable fiduciaire - Gérer les données comptables et financières",
        },
        "accessRules": [
            # UI: everything visible
            {"context": "UI", "item": None, "view": True},
            # DATA: group level ("g") for every operation
            {
                "context": "DATA",
                "item": None,
                "view": True,
                "read": "g",
                "create": "g",
                "update": "g",
                "delete": "g",
            },
        ],
    },
    {
        "roleLabel": "trustee-client",
        "description": {
            "en": "Trustee Client - View own accounting data and documents",
            "de": "Treuhand-Kunde - Eigene Buchhaltungsdaten und Dokumente einsehen",
            "fr": "Client fiduciaire - Consulter ses propres données comptables et documents",
        },
        "accessRules": [
            # UI: everything visible
            {"context": "UI", "item": None, "view": True},
            # DATA: own records only (MY level, "m"); delete is set to "n"
            {
                "context": "DATA",
                "item": None,
                "view": True,
                "read": "m",
                "create": "m",
                "update": "m",
                "delete": "n",
            },
        ],
    },
]
def getFeatureDefinition() -> Dict[str, Any]:
    """
    Build the feature definition used for registration.

    Returns:
        A new dict with the keys "code", "label" and "icon", filled from the
        module-level feature metadata constants.
    """
    definition: Dict[str, Any] = {}
    definition["code"] = FEATURE_CODE
    definition["label"] = FEATURE_LABEL
    definition["icon"] = FEATURE_ICON
    return definition
def getUiObjects() -> List[Dict[str, Any]]:
    """
    Provide the UI objects for RBAC catalog registration.

    Returns:
        The module-level UI_OBJECTS list itself (not a copy).
    """
    uiCatalog = UI_OBJECTS
    return uiCatalog
def getResourceObjects() -> List[Dict[str, Any]]:
    """
    Provide the resource objects for RBAC catalog registration.

    Returns:
        The module-level RESOURCE_OBJECTS list itself (not a copy).
    """
    resourceCatalog = RESOURCE_OBJECTS
    return resourceCatalog
def getTemplateRoles() -> List[Dict[str, Any]]:
    """
    Provide the template roles defined for this feature.

    Returns:
        The module-level TEMPLATE_ROLES list itself (not a copy).
    """
    roleTemplates = TEMPLATE_ROLES
    return roleTemplates
def registerFeature(catalogService) -> bool:
    """
    Register this feature's RBAC objects in the catalog.

    Registers every entry of UI_OBJECTS and RESOURCE_OBJECTS with the catalog
    service and then syncs the template roles (with AccessRules) to the
    database.

    Args:
        catalogService: The RBAC catalog service instance. It is called via
            registerUiObject() and registerResourceObject() with the keyword
            arguments featureCode, objectKey, label and meta.

    Returns:
        True if registration was successful, False if any step raised an
        exception. The exception is logged (with traceback) and not
        propagated.
    """
    try:
        # Register UI objects
        for uiObj in UI_OBJECTS:
            catalogService.registerUiObject(
                featureCode=FEATURE_CODE,
                objectKey=uiObj["objectKey"],
                label=uiObj["label"],
                meta=uiObj.get("meta"),
            )

        # Register Resource objects
        for resObj in RESOURCE_OBJECTS:
            catalogService.registerResourceObject(
                featureCode=FEATURE_CODE,
                objectKey=resObj["objectKey"],
                label=resObj["label"],
                meta=resObj.get("meta"),
            )

        # Sync template roles to database (with AccessRules)
        _syncTemplateRolesToDb()

        logger.info(f"Feature '{FEATURE_CODE}' registered {len(UI_OBJECTS)} UI objects and {len(RESOURCE_OBJECTS)} resource objects")
        return True
    except Exception as e:
        # Registration is best-effort: report failure to the caller instead of
        # raising. logger.exception logs at ERROR level with the same message
        # and appends the traceback, so the failing call stays diagnosable.
        logger.exception(f"Failed to register feature '{FEATURE_CODE}': {e}")
        return False
def _syncTemplateRolesToDb() -> int:
    """
    Sync template roles and their AccessRules to the database.

    Creates global template roles (mandateId=None) if they don't exist and
    ensures the AccessRules listed in TEMPLATE_ROLES for every template role.
    Afterwards instance-specific roles that have no AccessRules are repaired
    from their template roles, including template roles created in this run.

    Returns:
        Number of template roles created. 0 is also returned when the sync
        fails; the error is logged (with traceback) and not propagated.
    """
    try:
        from modules.interfaces.interfaceDbApp import getRootInterface
        from modules.datamodels.datamodelRbac import Role

        rootInterface = getRootInterface()
        db = rootInterface.db

        # Get existing template roles for this feature
        existingRoles = db.getRecordset(
            Role,
            recordFilter={"featureCode": FEATURE_CODE, "mandateId": None},
        )
        # roleLabel -> template role ID; extended below with newly created roles
        templateRoleIds = {r.get("roleLabel"): r.get("id") for r in existingRoles}

        createdCount = 0
        for roleTemplate in TEMPLATE_ROLES:
            roleLabel = roleTemplate["roleLabel"]
            ruleTemplates = roleTemplate.get("accessRules", [])

            if roleLabel in templateRoleIds:
                roleId = templateRoleIds[roleLabel]
                logger.debug(f"Template role '{roleLabel}' already exists with ID {roleId}")
                # Ensure AccessRules exist for this role
                _ensureAccessRulesForRole(db, roleId, ruleTemplates)
                continue

            # Create new template role
            newRole = Role(
                roleLabel=roleLabel,
                description=roleTemplate.get("description", {}),
                featureCode=FEATURE_CODE,
                mandateId=None,  # Global template
                featureInstanceId=None,
                isSystemRole=False,
            )
            createdRole = db.recordCreate(Role, newRole.model_dump())
            roleId = createdRole.get("id")

            # Create AccessRules for this role
            _ensureAccessRulesForRole(db, roleId, ruleTemplates)

            # Record the new template role so the repair step below can also
            # serve instance roles that match it (previously those had to wait
            # for the next sync run).
            templateRoleIds[roleLabel] = roleId

            logger.info(f"Created template role '{roleLabel}' with ID {roleId}")
            createdCount += 1

        if createdCount > 0:
            logger.info(f"Feature '{FEATURE_CODE}': Created {createdCount} template roles")

        # Repair instance-specific roles that are missing AccessRules
        _repairInstanceRolesAccessRules(db, templateRoleIds)

        return createdCount
    except Exception as e:
        # Best-effort sync: never let a DB problem abort feature registration,
        # but keep the traceback in the log.
        logger.exception(f"Error syncing template roles for feature '{FEATURE_CODE}': {e}")
        return 0
def _repairInstanceRolesAccessRules(db, templateRoleLabels: Dict[str, str]) -> int:
    """
    Repair instance-specific roles by copying AccessRules from their template roles.

    This ensures instance roles created before AccessRules were defined get updated.
    Only instance roles (mandateId is not None) of this feature that currently
    have no AccessRules at all are touched; roles that already have at least
    one rule are left unchanged.

    Args:
        db: Database connector
        templateRoleLabels: Dict mapping roleLabel to template role ID

    Returns:
        Number of instance roles repaired
    """
    from modules.datamodels.datamodelRbac import Role, AccessRule

    repairedCount = 0

    # Rules of a template role do not change while this function runs, so they
    # are fetched once per template role ID and reused for every instance role.
    templateRulesById: Dict[str, Any] = {}

    # Get all instance-specific roles for this feature (mandateId is NOT None)
    allRoles = db.getRecordset(Role, recordFilter={"featureCode": FEATURE_CODE})
    instanceRoles = [r for r in allRoles if r.get("mandateId") is not None]

    for instanceRole in instanceRoles:
        roleLabel = instanceRole.get("roleLabel")
        instanceRoleId = instanceRole.get("id")

        # Find matching template role
        templateRoleId = templateRoleLabels.get(roleLabel)
        if not templateRoleId:
            continue

        # Check if instance role has AccessRules
        existingRules = db.getRecordset(AccessRule, recordFilter={"roleId": instanceRoleId})
        if existingRules:
            continue  # Already has rules, skip

        # Copy AccessRules from template role (looked up lazily, then cached)
        if templateRoleId not in templateRulesById:
            templateRulesById[templateRoleId] = db.getRecordset(
                AccessRule, recordFilter={"roleId": templateRoleId}
            )
        templateRules = templateRulesById[templateRoleId]
        if not templateRules:
            continue  # Template has no rules

        for rule in templateRules:
            newRule = AccessRule(
                roleId=instanceRoleId,
                context=rule.get("context"),
                item=rule.get("item"),
                view=rule.get("view", False),
                read=rule.get("read"),
                create=rule.get("create"),
                update=rule.get("update"),
                delete=rule.get("delete"),
            )
            db.recordCreate(AccessRule, newRule.model_dump())

        logger.info(f"Repaired instance role '{roleLabel}' (ID: {instanceRoleId}): copied {len(templateRules)} AccessRules from template")
        repairedCount += 1

    if repairedCount > 0:
        logger.info(f"Feature '{FEATURE_CODE}': Repaired {repairedCount} instance roles with missing AccessRules")

    return repairedCount
def _ensureAccessRulesForRole(db, roleId: str, ruleTemplates: List[Dict[str, Any]]) -> int:
    """
    Ensure AccessRules exist for a role based on templates.

    A rule is identified by its (context, item) pair. Templates whose pair is
    already present for the role are skipped; existing rules are never
    modified or deleted. A pair that occurs more than once in ruleTemplates
    is created only once.

    Args:
        db: Database connector
        roleId: Role ID
        ruleTemplates: List of rule templates (dicts read via the keys
            "context", "item", "view", "read", "create", "update", "delete";
            "context" defaults to "UI", "view" to False)

    Returns:
        Number of rules created
    """
    from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext

    def _signature(context, item):
        # Compare contexts by their plain value so that an enum member and its
        # string form produce the same signature.
        # NOTE(review): assumes stored contexts come back either as the string
        # or as an enum member whose .value is that string - confirm against
        # the DB connector / AccessRuleContext definition.
        return (getattr(context, "value", context), item)

    # Get existing rules for this role
    existingRules = db.getRecordset(AccessRule, recordFilter={"roleId": roleId})

    # Signatures of rules that already exist (or were created below)
    knownSignatures = {
        _signature(rule.get("context"), rule.get("item")) for rule in existingRules
    }

    createdCount = 0
    for template in ruleTemplates:
        context = template.get("context", "UI")
        item = template.get("item")

        sig = _signature(context, item)
        if sig in knownSignatures:
            continue

        # Map context string to enum; unknown values are passed through as-is
        if context == "UI":
            contextEnum = AccessRuleContext.UI
        elif context == "DATA":
            contextEnum = AccessRuleContext.DATA
        elif context == "RESOURCE":
            contextEnum = AccessRuleContext.RESOURCE
        else:
            contextEnum = context

        newRule = AccessRule(
            roleId=roleId,
            context=contextEnum,
            item=item,
            view=template.get("view", False),
            read=template.get("read"),
            create=template.get("create"),
            update=template.get("update"),
            delete=template.get("delete"),
        )
        db.recordCreate(AccessRule, newRule.model_dump())

        # Remember the new rule so a repeated template entry in the same call
        # does not create a duplicate.
        knownSignatures.add(sig)
        createdCount += 1

    if createdCount > 0:
        logger.debug(f"Created {createdCount} AccessRules for role {roleId}")

    return createdCount