gateway/modules/security/rbacCatalog.py
2026-01-25 03:01:01 +01:00

192 lines
8.2 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
RBAC Catalog Service.
Central registry for Feature RBAC objects (UI and RESOURCE).
Feature-Container register their RBAC objects via mainXxx.py at startup.
"""
import logging
from typing import Dict, List, Any, Optional
from threading import Lock
logger = logging.getLogger(__name__)
class RbacCatalogService:
"""
Central RBAC Catalog for Feature UI and RESOURCE objects.
Singleton service that stores all registered RBAC objects from feature containers.
"""
_instance = None
_lock = Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._uiObjects: Dict[str, Dict[str, Any]] = {}
self._resourceObjects: Dict[str, Dict[str, Any]] = {}
self._dataObjects: Dict[str, Dict[str, Any]] = {} # DATA objects (tables/entities)
self._featureDefinitions: Dict[str, Dict[str, Any]] = {}
self._templateRoles: Dict[str, List[Dict[str, Any]]] = {}
self._initialized = True
logger.info("RBAC Catalog Service initialized")
def registerUiObject(self, featureCode: str, objectKey: str, label: Dict[str, str], meta: Optional[Dict[str, Any]] = None) -> bool:
"""Register a UI object for a feature."""
try:
self._uiObjects[objectKey] = {"objectKey": objectKey, "featureCode": featureCode, "label": label, "meta": meta or {}, "type": "UI"}
return True
except Exception as e:
logger.error(f"Failed to register UI object {objectKey}: {e}")
return False
def registerResourceObject(self, featureCode: str, objectKey: str, label: Dict[str, str], meta: Optional[Dict[str, Any]] = None) -> bool:
"""Register a RESOURCE object for a feature."""
try:
self._resourceObjects[objectKey] = {"objectKey": objectKey, "featureCode": featureCode, "label": label, "meta": meta or {}, "type": "RESOURCE"}
return True
except Exception as e:
logger.error(f"Failed to register RESOURCE object {objectKey}: {e}")
return False
def registerDataObject(self, featureCode: str, objectKey: str, label: Dict[str, str], meta: Optional[Dict[str, Any]] = None) -> bool:
"""
Register a DATA object (table/entity) for a feature.
Args:
featureCode: Feature code (e.g., "trustee", "system")
objectKey: Dot-notation key (e.g., "data.feature.trustee.TrusteeContract")
label: Multilingual label dict
meta: Optional metadata (e.g., table name, fields list)
"""
try:
self._dataObjects[objectKey] = {
"objectKey": objectKey,
"featureCode": featureCode,
"label": label,
"meta": meta or {},
"type": "DATA"
}
return True
except Exception as e:
logger.error(f"Failed to register DATA object {objectKey}: {e}")
return False
def registerFeatureDefinition(self, featureCode: str, label: Dict[str, str], icon: str) -> bool:
"""Register a feature definition."""
try:
self._featureDefinitions[featureCode] = {"code": featureCode, "label": label, "icon": icon}
return True
except Exception as e:
logger.error(f"Failed to register feature definition {featureCode}: {e}")
return False
def registerTemplateRoles(self, featureCode: str, roles: List[Dict[str, Any]]) -> bool:
"""Register template roles for a feature."""
try:
self._templateRoles[featureCode] = roles
return True
except Exception as e:
logger.error(f"Failed to register template roles for {featureCode}: {e}")
return False
def getUiObjects(self, featureCode: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get all UI objects, optionally filtered by feature."""
if featureCode:
return [obj for obj in self._uiObjects.values() if obj["featureCode"] == featureCode]
return list(self._uiObjects.values())
def getResourceObjects(self, featureCode: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get all RESOURCE objects, optionally filtered by feature."""
if featureCode:
return [obj for obj in self._resourceObjects.values() if obj["featureCode"] == featureCode]
return list(self._resourceObjects.values())
def getDataObjects(self, featureCode: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get all DATA objects (tables/entities), optionally filtered by feature."""
if featureCode:
return [obj for obj in self._dataObjects.values() if obj["featureCode"] == featureCode]
return list(self._dataObjects.values())
def getAllObjects(self, featureCode: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get all RBAC objects (UI + RESOURCE + DATA), optionally filtered by feature."""
return self.getUiObjects(featureCode) + self.getResourceObjects(featureCode) + self.getDataObjects(featureCode)
def getAllCatalogObjects(self, featureCode: Optional[str] = None) -> Dict[str, List[Dict[str, Any]]]:
"""Get all catalog objects grouped by type (DATA, UI, RESOURCE)."""
return {
"DATA": self.getDataObjects(featureCode),
"UI": self.getUiObjects(featureCode),
"RESOURCE": self.getResourceObjects(featureCode)
}
def getFeatureDefinitions(self) -> List[Dict[str, Any]]:
"""Get all registered feature definitions."""
return list(self._featureDefinitions.values())
def getFeatureDefinition(self, featureCode: str) -> Optional[Dict[str, Any]]:
"""Get a specific feature definition."""
return self._featureDefinitions.get(featureCode)
def getTemplateRoles(self, featureCode: str) -> List[Dict[str, Any]]:
"""Get template roles for a feature."""
return self._templateRoles.get(featureCode, [])
def getAllTemplateRoles(self) -> Dict[str, List[Dict[str, Any]]]:
"""Get all template roles grouped by feature."""
return self._templateRoles.copy()
def getRegisteredFeatures(self) -> List[str]:
"""Get list of all registered feature codes."""
return list(self._featureDefinitions.keys())
def unregisterFeature(self, featureCode: str) -> bool:
"""Unregister all objects for a feature."""
try:
for key in [k for k, v in self._uiObjects.items() if v["featureCode"] == featureCode]:
del self._uiObjects[key]
for key in [k for k, v in self._resourceObjects.items() if v["featureCode"] == featureCode]:
del self._resourceObjects[key]
for key in [k for k, v in self._dataObjects.items() if v["featureCode"] == featureCode]:
del self._dataObjects[key]
self._featureDefinitions.pop(featureCode, None)
self._templateRoles.pop(featureCode, None)
logger.info(f"Unregistered feature: {featureCode}")
return True
except Exception as e:
logger.error(f"Failed to unregister feature {featureCode}: {e}")
return False
def getCatalogStats(self) -> Dict[str, Any]:
"""Get statistics about the catalog."""
return {
"features": len(self._featureDefinitions),
"uiObjects": len(self._uiObjects),
"resourceObjects": len(self._resourceObjects),
"dataObjects": len(self._dataObjects),
"templateRoles": sum(len(roles) for roles in self._templateRoles.values())
}
# Singleton accessor
_catalogService: Optional[RbacCatalogService] = None
def getCatalogService() -> RbacCatalogService:
"""Get the singleton RBAC Catalog Service instance."""
global _catalogService
if _catalogService is None:
_catalogService = RbacCatalogService()
return _catalogService