# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Trustee Feature Container - Main Module.

Handles feature initialization and RBAC catalog registration.
"""
|
|
|
|
import logging
|
|
from typing import Dict, List, Any
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Feature metadata
|
|
FEATURE_CODE = "trustee"
|
|
FEATURE_LABEL = {"en": "Trustee", "de": "Treuhand", "fr": "Fiduciaire"}
|
|
FEATURE_ICON = "mdi-briefcase"
|
|
|
|
# UI Objects for RBAC catalog
|
|
# Note: organisations and contracts removed - feature instance = organisation
|
|
UI_OBJECTS = [
|
|
{
|
|
"objectKey": "ui.feature.trustee.dashboard",
|
|
"label": {"en": "Dashboard", "de": "Dashboard", "fr": "Tableau de bord"},
|
|
"meta": {"area": "dashboard"}
|
|
},
|
|
{
|
|
"objectKey": "ui.feature.trustee.positions",
|
|
"label": {"en": "Positions", "de": "Positionen", "fr": "Positions"},
|
|
"meta": {"area": "positions"}
|
|
},
|
|
{
|
|
"objectKey": "ui.feature.trustee.documents",
|
|
"label": {"en": "Documents", "de": "Dokumente", "fr": "Documents"},
|
|
"meta": {"area": "documents"}
|
|
},
|
|
{
|
|
"objectKey": "ui.feature.trustee.position-documents",
|
|
"label": {"en": "Position Documents", "de": "Positions-Dokumente", "fr": "Documents de position"},
|
|
"meta": {"area": "position-documents"}
|
|
},
|
|
{
|
|
"objectKey": "ui.feature.trustee.expense-import",
|
|
"label": {"en": "Expense Import", "de": "Spesen Import", "fr": "Import de dépenses"},
|
|
"meta": {"area": "expense-import"}
|
|
},
|
|
{
|
|
"objectKey": "ui.feature.trustee.instance-roles",
|
|
"label": {"en": "Instance Roles & Permissions", "de": "Instanz-Rollen & Berechtigungen", "fr": "Rôles et permissions d'instance"},
|
|
"meta": {"area": "admin", "admin_only": True}
|
|
},
|
|
]
|
|
|
|
# DATA Objects for RBAC catalog (tables/entities)
|
|
# Used for AccessRules on data-level permissions
|
|
DATA_OBJECTS = [
|
|
{
|
|
"objectKey": "data.feature.trustee.TrusteePosition",
|
|
"label": {"en": "Position", "de": "Position", "fr": "Position"},
|
|
"meta": {"table": "TrusteePosition", "fields": ["id", "label", "description", "organisationId"]}
|
|
},
|
|
{
|
|
"objectKey": "data.feature.trustee.TrusteeDocument",
|
|
"label": {"en": "Document", "de": "Dokument", "fr": "Document"},
|
|
"meta": {"table": "TrusteeDocument", "fields": ["id", "filename", "mimeType", "fileSize", "uploadDate"]}
|
|
},
|
|
{
|
|
"objectKey": "data.feature.trustee.TrusteePositionDocument",
|
|
"label": {"en": "Position-Document Assignment", "de": "Position-Dokument Zuordnung", "fr": "Assignation Position-Document"},
|
|
"meta": {"table": "TrusteePositionDocument", "fields": ["id", "positionId", "documentId"]}
|
|
},
|
|
{
|
|
"objectKey": "data.feature.trustee.*",
|
|
"label": {"en": "All Trustee Data", "de": "Alle Treuhand-Daten", "fr": "Toutes les données fiduciaires"},
|
|
"meta": {"wildcard": True, "description": "Wildcard for all trustee data tables"}
|
|
},
|
|
]
|
|
|
|
# Resource Objects for RBAC catalog
|
|
# Note: organisations and contracts removed - feature instance = organisation
|
|
RESOURCE_OBJECTS = [
|
|
{
|
|
"objectKey": "resource.feature.trustee.documents.create",
|
|
"label": {"en": "Upload Document", "de": "Dokument hochladen", "fr": "Télécharger document"},
|
|
"meta": {"endpoint": "/api/trustee/{instanceId}/documents", "method": "POST"}
|
|
},
|
|
{
|
|
"objectKey": "resource.feature.trustee.documents.update",
|
|
"label": {"en": "Update Document", "de": "Dokument aktualisieren", "fr": "Modifier document"},
|
|
"meta": {"endpoint": "/api/trustee/{instanceId}/documents/{documentId}", "method": "PUT"}
|
|
},
|
|
{
|
|
"objectKey": "resource.feature.trustee.documents.delete",
|
|
"label": {"en": "Delete Document", "de": "Dokument löschen", "fr": "Supprimer document"},
|
|
"meta": {"endpoint": "/api/trustee/{instanceId}/documents/{documentId}", "method": "DELETE"}
|
|
},
|
|
{
|
|
"objectKey": "resource.feature.trustee.positions.create",
|
|
"label": {"en": "Create Position", "de": "Position erstellen", "fr": "Créer position"},
|
|
"meta": {"endpoint": "/api/trustee/{instanceId}/positions", "method": "POST"}
|
|
},
|
|
{
|
|
"objectKey": "resource.feature.trustee.positions.update",
|
|
"label": {"en": "Update Position", "de": "Position aktualisieren", "fr": "Modifier position"},
|
|
"meta": {"endpoint": "/api/trustee/{instanceId}/positions/{positionId}", "method": "PUT"}
|
|
},
|
|
{
|
|
"objectKey": "resource.feature.trustee.positions.delete",
|
|
"label": {"en": "Delete Position", "de": "Position löschen", "fr": "Supprimer position"},
|
|
"meta": {"endpoint": "/api/trustee/{instanceId}/positions/{positionId}", "method": "DELETE"}
|
|
},
|
|
{
|
|
"objectKey": "resource.feature.trustee.instance-roles.manage",
|
|
"label": {"en": "Manage Instance Roles", "de": "Instanz-Rollen verwalten", "fr": "Gérer les rôles d'instance"},
|
|
"meta": {"endpoint": "/api/trustee/{instanceId}/instance-roles", "method": "ALL", "admin_only": True}
|
|
},
|
|
]
|
|
|
|
# Template roles for this feature with AccessRules
|
|
# Each role defines default UI and DATA permissions
|
|
# Note: UI item=None means ALL views, specific items restrict to named views
|
|
# IMPORTANT: item uses vollqualifizierte ObjectKeys (gemäss Navigation-API-Konzept)
|
|
TEMPLATE_ROLES = [
|
|
{
|
|
"roleLabel": "trustee-admin",
|
|
"description": {
|
|
"en": "Trustee Administrator - Full access to all trustee data and settings",
|
|
"de": "Treuhand-Administrator - Vollzugriff auf alle Treuhand-Daten und Einstellungen",
|
|
"fr": "Administrateur fiduciaire - Accès complet aux données et paramètres fiduciaires"
|
|
},
|
|
"accessRules": [
|
|
# Full UI access (all views including admin views)
|
|
{"context": "UI", "item": None, "view": True},
|
|
# Full DATA access
|
|
{"context": "DATA", "item": None, "view": True, "read": "a", "create": "a", "update": "a", "delete": "a"},
|
|
# Admin resource: manage instance roles
|
|
{"context": "RESOURCE", "item": "resource.feature.trustee.instance-roles.manage", "view": True},
|
|
]
|
|
},
|
|
{
|
|
"roleLabel": "trustee-accountant",
|
|
"description": {
|
|
"en": "Trustee Accountant - Manage accounting and financial data",
|
|
"de": "Treuhand-Buchhalter - Buchhaltungs- und Finanzdaten verwalten",
|
|
"fr": "Comptable fiduciaire - Gérer les données comptables et financières"
|
|
},
|
|
"accessRules": [
|
|
# UI access to main views (not admin views) - vollqualifizierte ObjectKeys
|
|
{"context": "UI", "item": "ui.feature.trustee.dashboard", "view": True},
|
|
{"context": "UI", "item": "ui.feature.trustee.positions", "view": True},
|
|
{"context": "UI", "item": "ui.feature.trustee.documents", "view": True},
|
|
{"context": "UI", "item": "ui.feature.trustee.position-documents", "view": True},
|
|
{"context": "UI", "item": "ui.feature.trustee.expense-import", "view": True},
|
|
# Group-level DATA access
|
|
{"context": "DATA", "item": None, "view": True, "read": "g", "create": "g", "update": "g", "delete": "g"},
|
|
]
|
|
},
|
|
{
|
|
"roleLabel": "trustee-client",
|
|
"description": {
|
|
"en": "Trustee Client - View own accounting data and documents",
|
|
"de": "Treuhand-Kunde - Eigene Buchhaltungsdaten und Dokumente einsehen",
|
|
"fr": "Client fiduciaire - Consulter ses propres données comptables et documents"
|
|
},
|
|
"accessRules": [
|
|
# UI access to main views only (read-only focus) - vollqualifizierte ObjectKeys
|
|
{"context": "UI", "item": "ui.feature.trustee.dashboard", "view": True},
|
|
{"context": "UI", "item": "ui.feature.trustee.positions", "view": True},
|
|
{"context": "UI", "item": "ui.feature.trustee.documents", "view": True},
|
|
{"context": "UI", "item": "ui.feature.trustee.position-documents", "view": True},
|
|
# Own records only (MY level) - explizite Regeln pro Tabelle
|
|
{"context": "DATA", "item": "data.feature.trustee.TrusteePosition", "view": True, "read": "m", "create": "m", "update": "m", "delete": "n"},
|
|
{"context": "DATA", "item": "data.feature.trustee.TrusteeDocument", "view": True, "read": "m", "create": "m", "update": "m", "delete": "n"},
|
|
{"context": "DATA", "item": "data.feature.trustee.TrusteePositionDocument", "view": True, "read": "m", "create": "m", "update": "m", "delete": "n"},
|
|
]
|
|
},
|
|
]
|
|
|
|
|
|
def getFeatureDefinition() -> Dict[str, Any]:
|
|
"""Return the feature definition for registration."""
|
|
return {
|
|
"code": FEATURE_CODE,
|
|
"label": FEATURE_LABEL,
|
|
"icon": FEATURE_ICON
|
|
}
|
|
|
|
|
|
def getUiObjects() -> List[Dict[str, Any]]:
|
|
"""Return UI objects for RBAC catalog registration."""
|
|
return UI_OBJECTS
|
|
|
|
|
|
def getResourceObjects() -> List[Dict[str, Any]]:
|
|
"""Return resource objects for RBAC catalog registration."""
|
|
return RESOURCE_OBJECTS
|
|
|
|
|
|
def getTemplateRoles() -> List[Dict[str, Any]]:
|
|
"""Return template roles for this feature."""
|
|
return TEMPLATE_ROLES
|
|
|
|
|
|
def getDataObjects() -> List[Dict[str, Any]]:
|
|
"""Return DATA objects for RBAC catalog registration."""
|
|
return DATA_OBJECTS
|
|
|
|
|
|
def registerFeature(catalogService) -> bool:
|
|
"""
|
|
Register this feature's RBAC objects in the catalog.
|
|
|
|
Args:
|
|
catalogService: The RBAC catalog service instance
|
|
|
|
Returns:
|
|
True if registration was successful
|
|
"""
|
|
try:
|
|
# Register UI objects
|
|
for uiObj in UI_OBJECTS:
|
|
catalogService.registerUiObject(
|
|
featureCode=FEATURE_CODE,
|
|
objectKey=uiObj["objectKey"],
|
|
label=uiObj["label"],
|
|
meta=uiObj.get("meta")
|
|
)
|
|
|
|
# Register Resource objects
|
|
for resObj in RESOURCE_OBJECTS:
|
|
catalogService.registerResourceObject(
|
|
featureCode=FEATURE_CODE,
|
|
objectKey=resObj["objectKey"],
|
|
label=resObj["label"],
|
|
meta=resObj.get("meta")
|
|
)
|
|
|
|
# Register DATA objects (tables/entities)
|
|
for dataObj in DATA_OBJECTS:
|
|
catalogService.registerDataObject(
|
|
featureCode=FEATURE_CODE,
|
|
objectKey=dataObj["objectKey"],
|
|
label=dataObj["label"],
|
|
meta=dataObj.get("meta")
|
|
)
|
|
|
|
# Sync template roles to database (with AccessRules)
|
|
_syncTemplateRolesToDb()
|
|
|
|
logger.info(f"Feature '{FEATURE_CODE}' registered {len(UI_OBJECTS)} UI, {len(RESOURCE_OBJECTS)} resource, {len(DATA_OBJECTS)} data objects")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to register feature '{FEATURE_CODE}': {e}")
|
|
return False
|
|
|
|
|
|
def _syncTemplateRolesToDb() -> int:
|
|
"""
|
|
Sync template roles and their AccessRules to the database.
|
|
Creates global template roles (mandateId=None) if they don't exist.
|
|
|
|
Returns:
|
|
Number of roles created/updated
|
|
"""
|
|
try:
|
|
from modules.interfaces.interfaceDbApp import getRootInterface
|
|
from modules.datamodels.datamodelRbac import Role, AccessRule, AccessRuleContext
|
|
|
|
rootInterface = getRootInterface()
|
|
db = rootInterface.db
|
|
|
|
# Get existing template roles for this feature
|
|
existingRoles = db.getRecordset(
|
|
Role,
|
|
recordFilter={"featureCode": FEATURE_CODE, "mandateId": None}
|
|
)
|
|
existingRoleLabels = {r.get("roleLabel"): r.get("id") for r in existingRoles}
|
|
|
|
createdCount = 0
|
|
for roleTemplate in TEMPLATE_ROLES:
|
|
roleLabel = roleTemplate["roleLabel"]
|
|
|
|
if roleLabel in existingRoleLabels:
|
|
roleId = existingRoleLabels[roleLabel]
|
|
logger.debug(f"Template role '{roleLabel}' already exists with ID {roleId}")
|
|
|
|
# Ensure AccessRules exist for this role
|
|
_ensureAccessRulesForRole(db, roleId, roleTemplate.get("accessRules", []))
|
|
else:
|
|
# Create new template role
|
|
newRole = Role(
|
|
roleLabel=roleLabel,
|
|
description=roleTemplate.get("description", {}),
|
|
featureCode=FEATURE_CODE,
|
|
mandateId=None, # Global template
|
|
featureInstanceId=None,
|
|
isSystemRole=False
|
|
)
|
|
createdRole = db.recordCreate(Role, newRole.model_dump())
|
|
roleId = createdRole.get("id")
|
|
|
|
# Create AccessRules for this role
|
|
_ensureAccessRulesForRole(db, roleId, roleTemplate.get("accessRules", []))
|
|
|
|
logger.info(f"Created template role '{roleLabel}' with ID {roleId}")
|
|
createdCount += 1
|
|
|
|
if createdCount > 0:
|
|
logger.info(f"Feature '{FEATURE_CODE}': Created {createdCount} template roles")
|
|
|
|
# Repair instance-specific roles that are missing AccessRules
|
|
_repairInstanceRolesAccessRules(db, existingRoleLabels)
|
|
|
|
return createdCount
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error syncing template roles for feature '{FEATURE_CODE}': {e}")
|
|
return 0
|
|
|
|
|
|
def _repairInstanceRolesAccessRules(db, templateRoleLabels: Dict[str, str]) -> int:
|
|
"""
|
|
Repair instance-specific roles by copying AccessRules from their template roles.
|
|
This ensures instance roles created before AccessRules were defined get updated.
|
|
|
|
Args:
|
|
db: Database connector
|
|
templateRoleLabels: Dict mapping roleLabel to template role ID
|
|
|
|
Returns:
|
|
Number of instance roles repaired
|
|
"""
|
|
from modules.datamodels.datamodelRbac import Role, AccessRule, AccessRuleContext
|
|
|
|
repairedCount = 0
|
|
|
|
# Get all instance-specific roles for this feature (mandateId is NOT None)
|
|
allRoles = db.getRecordset(Role, recordFilter={"featureCode": FEATURE_CODE})
|
|
instanceRoles = [r for r in allRoles if r.get("mandateId") is not None]
|
|
|
|
for instanceRole in instanceRoles:
|
|
roleLabel = instanceRole.get("roleLabel")
|
|
instanceRoleId = instanceRole.get("id")
|
|
|
|
# Find matching template role
|
|
templateRoleId = templateRoleLabels.get(roleLabel)
|
|
if not templateRoleId:
|
|
continue
|
|
|
|
# Check if instance role has AccessRules
|
|
existingRules = db.getRecordset(AccessRule, recordFilter={"roleId": instanceRoleId})
|
|
if existingRules:
|
|
continue # Already has rules, skip
|
|
|
|
# Copy AccessRules from template role
|
|
templateRules = db.getRecordset(AccessRule, recordFilter={"roleId": templateRoleId})
|
|
if not templateRules:
|
|
continue # Template has no rules
|
|
|
|
for rule in templateRules:
|
|
newRule = AccessRule(
|
|
roleId=instanceRoleId,
|
|
context=rule.get("context"),
|
|
item=rule.get("item"),
|
|
view=rule.get("view", False),
|
|
read=rule.get("read"),
|
|
create=rule.get("create"),
|
|
update=rule.get("update"),
|
|
delete=rule.get("delete"),
|
|
)
|
|
db.recordCreate(AccessRule, newRule.model_dump())
|
|
|
|
logger.info(f"Repaired instance role '{roleLabel}' (ID: {instanceRoleId}): copied {len(templateRules)} AccessRules from template")
|
|
repairedCount += 1
|
|
|
|
if repairedCount > 0:
|
|
logger.info(f"Feature '{FEATURE_CODE}': Repaired {repairedCount} instance roles with missing AccessRules")
|
|
|
|
return repairedCount
|
|
|
|
|
|
def _ensureAccessRulesForRole(db, roleId: str, ruleTemplates: List[Dict[str, Any]]) -> int:
|
|
"""
|
|
Ensure AccessRules exist for a role based on templates.
|
|
|
|
Args:
|
|
db: Database connector
|
|
roleId: Role ID
|
|
ruleTemplates: List of rule templates
|
|
|
|
Returns:
|
|
Number of rules created
|
|
"""
|
|
from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext
|
|
|
|
# Get existing rules for this role
|
|
existingRules = db.getRecordset(AccessRule, recordFilter={"roleId": roleId})
|
|
|
|
# Create a set of existing rule signatures to avoid duplicates
|
|
existingSignatures = set()
|
|
for rule in existingRules:
|
|
sig = (rule.get("context"), rule.get("item"))
|
|
existingSignatures.add(sig)
|
|
|
|
createdCount = 0
|
|
for template in ruleTemplates:
|
|
context = template.get("context", "UI")
|
|
item = template.get("item")
|
|
sig = (context, item)
|
|
|
|
if sig in existingSignatures:
|
|
continue
|
|
|
|
# Map context string to enum
|
|
if context == "UI":
|
|
contextEnum = AccessRuleContext.UI
|
|
elif context == "DATA":
|
|
contextEnum = AccessRuleContext.DATA
|
|
elif context == "RESOURCE":
|
|
contextEnum = AccessRuleContext.RESOURCE
|
|
else:
|
|
contextEnum = context
|
|
|
|
newRule = AccessRule(
|
|
roleId=roleId,
|
|
context=contextEnum,
|
|
item=item,
|
|
view=template.get("view", False),
|
|
read=template.get("read"),
|
|
create=template.get("create"),
|
|
update=template.get("update"),
|
|
delete=template.get("delete"),
|
|
)
|
|
db.recordCreate(AccessRule, newRule.model_dump())
|
|
createdCount += 1
|
|
|
|
if createdCount > 0:
|
|
logger.debug(f"Created {createdCount} AccessRules for role {roleId}")
|
|
|
|
return createdCount
|