# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
RBAC Catalog Service.

Central in-memory registry for feature RBAC objects (UI, RESOURCE and DATA),
feature definitions and template roles.
Feature containers register their RBAC objects via mainXxx.py at startup.
"""

import logging
from typing import Dict, List, Any, Optional
from threading import Lock

logger = logging.getLogger(__name__)


class RbacCatalogService:
    """
    Central RBAC catalog for feature UI, RESOURCE and DATA objects.

    Singleton service: every call to ``RbacCatalogService()`` returns the same
    instance, which stores all RBAC objects, feature definitions and template
    roles registered by feature containers. All state lives in plain dicts
    keyed by object key (objects) or feature code (definitions, roles).
    """

    _instance = None
    _lock = Lock()

    def __new__(cls):
        # Check once without the lock (fast path) and once more while holding
        # it, so concurrent first calls create exactly one instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every RbacCatalogService() call; only the first
        # one may create the registries, otherwise registrations would be lost.
        if self._initialized:
            return
        # __new__ has already released the (non-reentrant) lock at this point.
        # Re-check under the lock so that two threads constructing the
        # singleton at the same time cannot both reset the registries.
        with self._lock:
            if self._initialized:
                return
            self._uiObjects: Dict[str, Dict[str, Any]] = {}
            self._resourceObjects: Dict[str, Dict[str, Any]] = {}
            self._dataObjects: Dict[str, Dict[str, Any]] = {}  # DATA objects (tables/entities)
            self._featureDefinitions: Dict[str, Dict[str, Any]] = {}
            self._templateRoles: Dict[str, List[Dict[str, Any]]] = {}
            self._initialized = True
        logger.info("RBAC Catalog Service initialized")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _registerObject(
        store: Dict[str, Dict[str, Any]],
        objectType: str,
        featureCode: str,
        objectKey: str,
        label: str,
        meta: Optional[Dict[str, Any]],
    ) -> bool:
        """Store one catalog entry of ``objectType`` in ``store``; return success."""
        try:
            store[objectKey] = {
                "objectKey": objectKey,
                "featureCode": featureCode,
                "label": label,
                "meta": meta or {},
                "type": objectType,
            }
            return True
        except Exception as e:
            # e.g. an unhashable objectKey; registration is best-effort.
            logger.error(f"Failed to register {objectType} object {objectKey}: {e}")
            return False

    @staticmethod
    def _filterByFeature(
        store: Dict[str, Dict[str, Any]], featureCode: Optional[str]
    ) -> List[Dict[str, Any]]:
        """Return the entries of ``store``, restricted to ``featureCode`` if truthy."""
        if featureCode:
            return [obj for obj in store.values() if obj["featureCode"] == featureCode]
        return list(store.values())

    @staticmethod
    def _removeFeatureEntries(store: Dict[str, Dict[str, Any]], featureCode: str) -> None:
        """Delete every entry of ``store`` that belongs to ``featureCode``."""
        # Collect the keys first: a dict must not change size while iterated.
        staleKeys = [key for key, obj in store.items() if obj["featureCode"] == featureCode]
        for key in staleKeys:
            del store[key]

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------

    def registerUiObject(self, featureCode: str, objectKey: str, label: str, meta: Optional[Dict[str, Any]] = None) -> bool:
        """Register a UI object for a feature.

        An existing entry with the same ``objectKey`` is overwritten.

        Returns:
            True on success, False if the entry could not be stored.
        """
        return self._registerObject(self._uiObjects, "UI", featureCode, objectKey, label, meta)

    def registerResourceObject(self, featureCode: str, objectKey: str, label: str, meta: Optional[Dict[str, Any]] = None) -> bool:
        """Register a RESOURCE object for a feature.

        An existing entry with the same ``objectKey`` is overwritten.

        Returns:
            True on success, False if the entry could not be stored.
        """
        return self._registerObject(self._resourceObjects, "RESOURCE", featureCode, objectKey, label, meta)

    def registerDataObject(self, featureCode: str, objectKey: str, label: str, meta: Optional[Dict[str, Any]] = None) -> bool:
        """
        Register a DATA object (table/entity) for a feature.

        An existing entry with the same ``objectKey`` is overwritten.

        Args:
            featureCode: Feature code (e.g., "trustee", "system")
            objectKey: Dot-notation key (e.g., "data.feature.trustee.TrusteeContract")
            label: German plaintext label (used as i18n key)
            meta: Optional metadata (e.g., table name, fields list)

        Returns:
            True on success, False if the entry could not be stored.
        """
        return self._registerObject(self._dataObjects, "DATA", featureCode, objectKey, label, meta)

    def registerFeatureDefinition(self, featureCode: str, label: str, icon: str) -> bool:
        """Register a feature definition (code, label, icon); return success."""
        try:
            self._featureDefinitions[featureCode] = {"code": featureCode, "label": label, "icon": icon}
            return True
        except Exception as e:
            logger.error(f"Failed to register feature definition {featureCode}: {e}")
            return False

    def registerTemplateRoles(self, featureCode: str, roles: List[Dict[str, Any]]) -> bool:
        """Register template roles for a feature; return success.

        The given list is stored as-is (not copied) and replaces any roles
        previously registered for ``featureCode``.
        """
        try:
            self._templateRoles[featureCode] = roles
            return True
        except Exception as e:
            logger.error(f"Failed to register template roles for {featureCode}: {e}")
            return False

    # ------------------------------------------------------------------
    # Lookup
    # ------------------------------------------------------------------

    def getUiObjects(self, featureCode: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get all UI objects, optionally filtered by feature."""
        return self._filterByFeature(self._uiObjects, featureCode)

    def getResourceObjects(self, featureCode: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get all RESOURCE objects, optionally filtered by feature."""
        return self._filterByFeature(self._resourceObjects, featureCode)

    def getDataObjects(self, featureCode: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get all DATA objects (tables/entities), optionally filtered by feature."""
        return self._filterByFeature(self._dataObjects, featureCode)

    def getAccessibleDataObjects(
        self,
        featureCode: str,
        rbacInstance,
        user,
        mandateId: str,
        featureInstanceId: str,
    ) -> List[Dict[str, Any]]:
        """Get DATA objects filtered by RBAC read permission for the user.

        An object is included when the permissions returned by
        ``rbacInstance.getUserPermissions`` have a truthy ``view`` or a
        ``read.value`` other than ``"n"``. If the permission check raises,
        the object is treated as not accessible and the failure is logged.

        Args:
            featureCode: Feature code to filter by
            rbacInstance: RbacClass instance for permission checks
            user: User object
            mandateId: Mandate scope
            featureInstanceId: Feature instance scope

        Returns:
            The accessible DATA object entries, in registration order.
        """
        # Imported at call time rather than at module level.
        # NOTE(review): presumably to avoid an import cycle - confirm.
        from modules.datamodels.datamodelRbac import AccessRuleContext

        accessible = []
        for obj in self.getDataObjects(featureCode):
            objectKey = obj.get("objectKey", "")
            try:
                perms = rbacInstance.getUserPermissions(
                    user=user,
                    context=AccessRuleContext.DATA,
                    item=objectKey,
                    mandateId=mandateId,
                    featureInstanceId=featureInstanceId,
                )
                if perms.view or perms.read.value != "n":
                    accessible.append(obj)
            except Exception as e:
                # Best-effort: a failing check hides the object instead of
                # breaking the whole listing, but it must not go unnoticed.
                logger.warning(f"RBAC permission check failed for DATA object {objectKey}: {e}")
        return accessible

    def getFeaturesWithDataObjects(self) -> List[str]:
        """Get feature codes that have at least one registered DATA object.

        The order of the returned codes is not defined.
        """
        return list({obj["featureCode"] for obj in self._dataObjects.values()})

    def getAllObjects(self, featureCode: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get all RBAC objects (UI + RESOURCE + DATA), optionally filtered by feature."""
        return self.getUiObjects(featureCode) + self.getResourceObjects(featureCode) + self.getDataObjects(featureCode)

    def getAllCatalogObjects(self, featureCode: Optional[str] = None) -> Dict[str, List[Dict[str, Any]]]:
        """Get all catalog objects grouped by type (DATA, UI, RESOURCE)."""
        return {
            "DATA": self.getDataObjects(featureCode),
            "UI": self.getUiObjects(featureCode),
            "RESOURCE": self.getResourceObjects(featureCode),
        }

    def getFeatureDefinitions(self) -> List[Dict[str, Any]]:
        """Get all registered feature definitions."""
        return list(self._featureDefinitions.values())

    def getFeatureDefinition(self, featureCode: str) -> Optional[Dict[str, Any]]:
        """Get a specific feature definition, or None if it is not registered."""
        return self._featureDefinitions.get(featureCode)

    def getTemplateRoles(self, featureCode: str) -> List[Dict[str, Any]]:
        """Get template roles for a feature (empty list if none are registered)."""
        return self._templateRoles.get(featureCode, [])

    def getAllTemplateRoles(self) -> Dict[str, List[Dict[str, Any]]]:
        """Get all template roles grouped by feature.

        Returns a shallow copy: the mapping is new, the role lists are shared.
        """
        return self._templateRoles.copy()

    def getRegisteredFeatures(self) -> List[str]:
        """Get list of all registered feature codes."""
        return list(self._featureDefinitions.keys())

    # ------------------------------------------------------------------
    # Maintenance
    # ------------------------------------------------------------------

    def unregisterFeature(self, featureCode: str) -> bool:
        """Unregister all objects, the definition and the template roles of a feature.

        Unknown feature codes are not an error.

        Returns:
            True on success, False if removal raised an exception.
        """
        try:
            for store in (self._uiObjects, self._resourceObjects, self._dataObjects):
                self._removeFeatureEntries(store, featureCode)
            self._featureDefinitions.pop(featureCode, None)
            self._templateRoles.pop(featureCode, None)
            logger.info(f"Unregistered feature: {featureCode}")
            return True
        except Exception as e:
            logger.error(f"Failed to unregister feature {featureCode}: {e}")
            return False

    def getCatalogStats(self) -> Dict[str, Any]:
        """Get statistics about the catalog (entry counts per registry)."""
        return {
            "features": len(self._featureDefinitions),
            "uiObjects": len(self._uiObjects),
            "resourceObjects": len(self._resourceObjects),
            "dataObjects": len(self._dataObjects),
            # Total number of roles across all features, not number of features.
            "templateRoles": sum(len(roles) for roles in self._templateRoles.values()),
        }


# Singleton accessor
_catalogService: Optional[RbacCatalogService] = None


def getCatalogService() -> RbacCatalogService:
    """Get the singleton RBAC Catalog Service instance."""
    global _catalogService
    if _catalogService is None:
        _catalogService = RbacCatalogService()
    return _catalogService