515 lines
18 KiB
Python
515 lines
18 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
System Routes - Navigation and system-level API endpoints.

Navigation API concept:
- The gateway is the single source of truth for navigation
- The UI renders only what it receives (no permission logic in the UI)
- No icons in the API response - the UI maps them itself via uiComponent
- Blocks instead of sections, with an order on every level
|
|
"""
|
|
|
|
import logging
|
|
from typing import Dict, List, Any, Optional
|
|
from fastapi import APIRouter, Depends, Request, Query
|
|
from slowapi import Limiter
|
|
from slowapi.util import get_remote_address
|
|
|
|
from modules.auth.authentication import getRequestContext, RequestContext
|
|
from modules.system.mainSystem import NAVIGATION_SECTIONS, _objectKeyToUiComponent
|
|
from modules.interfaces.interfaceDbApp import getRootInterface
|
|
from modules.interfaces.interfaceFeatures import getFeatureInterface
|
|
from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext
|
|
from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole, FeatureAccess, FeatureAccessRole
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Rate limiter keyed by the remote address of the requesting client.
limiter = Limiter(key_func=get_remote_address)

# Main system router (for other system endpoints if needed)
router = APIRouter(prefix="/api/system", tags=["System"])

# Navigation router; serves /api/navigation (per the Navigation API concept)
navigationRouter = APIRouter(prefix="/api", tags=["Navigation"])
|
|
|
|
|
|
def _getUserRoleIds(userId: str) -> List[str]:
    """Collect the distinct role IDs a user holds across all enabled mandates.

    Args:
        userId: ID of the user whose ``UserMandate`` records are looked up.

    Returns:
        Role IDs in first-seen order, without duplicates.
    """
    rootInterface = getRootInterface()
    userMandates = rootInterface.db.getRecordset(
        UserMandate,
        recordFilter={"userId": userId, "enabled": True},
    )

    # A dict preserves insertion order and detects duplicates in O(1), unlike
    # repeated ``in`` checks against a growing list.
    uniqueRoleIds: Dict[str, None] = {}
    for userMandate in userMandates:
        for roleId in rootInterface.getRoleIdsForUserMandate(userMandate.get("id")):
            uniqueRoleIds.setdefault(roleId, None)

    return list(uniqueRoleIds)
|
|
|
|
|
|
def _checkUiPermission(roleIds: List[str], objectKey: str) -> bool:
    """Check whether any of the given roles may view the UI object.

    For every role, the ``AccessRule`` records with context ``"UI"`` are
    loaded. Only rules whose ``view`` flag is truthy can grant access. Such a
    rule grants access when:

    * its ``item`` is ``None`` (global rule covering all UI objects),
    * its ``item`` equals ``objectKey``, or
    * its ``item`` is a wildcard such as ``ui.system.*`` and ``objectKey`` is
      ``ui.system`` itself or lies below it (``ui.system.playground``).

    Args:
        roleIds: Role IDs to evaluate; an empty list never grants access.
        objectKey: Full key of the UI object, e.g. ``ui.system.home``.

    Returns:
        True as soon as one matching rule is found, otherwise False.
    """
    if not roleIds:
        return False

    rootInterface = getRootInterface()

    for roleId in roleIds:
        rules = rootInterface.db.getRecordset(
            AccessRule,
            recordFilter={"roleId": roleId, "context": "UI"},
        )

        for rule in rules:
            if not rule.get("view", False):
                continue

            ruleItem = rule.get("item")

            # Global rule (item=None) or exact match.
            if ruleItem is None or ruleItem == objectKey:
                return True

            if ruleItem.endswith(".*"):
                namespace = ruleItem[:-2]
                # A bare ".*" has an empty namespace and keeps matching
                # every key, as it did before.
                if not namespace:
                    return True
                # Require a dot boundary so "ui.system.*" does not leak
                # access to sibling namespaces such as "ui.systemx.secret".
                if objectKey == namespace or objectKey.startswith(namespace + "."):
                    return True

    return False
|
|
|
|
|
|
# =============================================================================
# Navigation API (per the Navigation API concept)
# Endpoint: GET /api/navigation
# =============================================================================
|
|
|
|
def _getFeatureUiObjects(featureCode: str) -> List[Dict[str, Any]]:
    """
    Load the ``UI_OBJECTS`` list exported by a feature's main module.

    Only the feature codes ``trustee`` and ``realestate`` are known. Any other
    code is logged as a warning, and a failing import is logged as an error;
    both cases yield an empty list.
    """
    # Reject unknown codes up front so the import section stays flat.
    if featureCode not in ("trustee", "realestate"):
        logger.warning(f"Unknown feature code: {featureCode}")
        return []

    try:
        # Imported lazily, only for the feature that was requested.
        if featureCode == "trustee":
            from modules.features.trustee.mainTrustee import UI_OBJECTS as uiObjects
        else:
            from modules.features.realEstate.mainRealEstate import UI_OBJECTS as uiObjects
    except ImportError as e:
        logger.error(f"Failed to import UI_OBJECTS for feature {featureCode}: {e}")
        return []

    return uiObjects
|
|
|
|
|
|
def _normalizeLabel(label: Any, fallback: str) -> Dict[str, Any]:
    """Coerce a label (plain dict or model-like object) into a language -> text dict."""
    if hasattr(label, "model_dump"):
        return label.model_dump()
    if hasattr(label, "dict"):
        return label.dict()
    if isinstance(label, dict):
        return label
    # Last resort: read the languages as attributes, defaulting to the fallback.
    return {"de": getattr(label, "de", fallback), "en": getattr(label, "en", fallback)}


def _pickLocalizedLabel(labels: Dict[str, Any], language: str, fallback: str) -> str:
    """Return the text for ``language``, else English, else ``fallback``.

    Empty and ``None`` entries count as missing, so a dumped model that carries
    ``{"fr": None}`` falls through to the next candidate instead of yielding None.
    """
    return labels.get(language) or labels.get("en") or fallback


def _buildInstanceViews(
    instance: Any,
    mandateId: str,
    permissions: Dict[str, Any],
    language: str,
    isSysAdmin: bool
) -> List[Dict[str, Any]]:
    """Build the permitted view entries of one feature instance, ordered 10, 20, ..."""
    canViewAll = isSysAdmin or bool(permissions.get("_all"))
    isAdmin = isSysAdmin or bool(permissions.get("isAdmin", False))

    views: List[Dict[str, Any]] = []
    for uiObj in _getFeatureUiObjects(instance.featureCode):
        objectKey = uiObj.get("objectKey", "")

        # Permissions are keyed by the full objectKey.
        if not canViewAll and not permissions.get(objectKey, False):
            continue

        # Views flagged admin_only are hidden from non-admins.
        meta = uiObj.get("meta") or {}
        if meta.get("admin_only") and not isAdmin:
            continue

        # The last objectKey segment names the view in component and path.
        viewName = objectKey.split(".")[-1] if objectKey else ""
        labels = _normalizeLabel(uiObj.get("label"), viewName)

        views.append({
            "uiComponent": f"page.feature.{instance.featureCode}.{viewName}",
            "uiLabel": _pickLocalizedLabel(labels, language, viewName),
            "uiPath": f"/mandates/{mandateId}/{instance.featureCode}/{instance.id}/{viewName}",
            # Only visible views consume an order slot.
            "order": (len(views) + 1) * 10,
            "objectKey": objectKey,
        })

    return views


def _buildDynamicBlock(
    userId: str,
    language: str,
    isSysAdmin: bool
) -> Optional[Dict[str, Any]]:
    """
    Build the dynamic features block: mandates -> features -> instances -> views.

    Feature accesses and feature instances that are not enabled are skipped.
    Mandates are ordered 10, 20, ... in the order they are first encountered;
    features and instances all carry order 10.

    Args:
        userId: ID of the user the navigation is built for.
        language: Language code used to pick labels (falls back to English,
            then to the feature code / view name).
        isSysAdmin: When True, no view is filtered out by permissions.

    Returns:
        The block dict (``type`` "dynamic", ``id`` "features"), or None when
        the user has no usable feature instances or when building fails (the
        error is logged with its traceback).
    """
    try:
        rootInterface = getRootInterface()
        featureInterface = getFeatureInterface(rootInterface.db)

        featureAccesses = rootInterface.getFeatureAccessesForUser(userId)
        if not featureAccesses:
            return None

        mandatesMap: Dict[str, Dict[str, Any]] = {}
        # Keyed by (mandateId, featureCode); a tuple cannot collide the way a
        # joined "mandateId_featureCode" string could.
        featuresMap: Dict[Any, Dict[str, Any]] = {}

        mandateOrder = 10
        for access in featureAccesses:
            if not access.enabled:
                continue

            instance = featureInterface.getFeatureInstance(str(access.featureInstanceId))
            if not instance or not instance.enabled:
                continue

            mandateId = str(instance.mandateId)
            featureCode = instance.featureCode

            if mandateId not in mandatesMap:
                mandate = rootInterface.getMandate(mandateId)
                mandateName = mandate.name if mandate and hasattr(mandate, "name") else mandateId
                mandatesMap[mandateId] = {
                    "id": mandateId,
                    "uiLabel": mandateName,
                    "order": mandateOrder,
                    "features": [],
                }
                mandateOrder += 10

            featureKey = (mandateId, featureCode)
            if featureKey not in featuresMap:
                feature = featureInterface.getFeature(featureCode)
                rawLabel = feature.label if feature and hasattr(feature, "label") else None
                featureEntry = {
                    "uiComponent": f"feature.{featureCode}",
                    "uiLabel": _pickLocalizedLabel(
                        _normalizeLabel(rawLabel, featureCode), language, featureCode
                    ),
                    "order": 10,
                    "instances": [],
                }
                featuresMap[featureKey] = featureEntry
                # Attach right away; features keep first-seen order per mandate.
                mandatesMap[mandateId]["features"].append(featureEntry)

            permissions = _getInstanceViewPermissions(
                rootInterface, userId, str(instance.id), isSysAdmin
            )
            featuresMap[featureKey]["instances"].append({
                "id": str(instance.id),
                "uiLabel": instance.label,
                "order": 10,
                "views": _buildInstanceViews(instance, mandateId, permissions, language, isSysAdmin),
            })

        if not mandatesMap:
            return None

        for mandate in mandatesMap.values():
            mandate["features"].sort(key=lambda f: f["order"])

        mandatesList = sorted(mandatesMap.values(), key=lambda m: m["order"])

        return {
            "type": "dynamic",
            "id": "features",
            "title": "MEINE FEATURES",
            "order": 15,  # Between system (10) and workflows (20)
            "mandates": mandatesList,
        }

    except Exception as e:
        # Best effort: navigation still renders without the dynamic block,
        # but the traceback is kept so the failure can be diagnosed.
        logger.error(f"Error building dynamic block: {e}", exc_info=True)
        return None
|
|
|
|
|
|
def _getInstanceViewPermissions(
    rootInterface,
    userId: str,
    instanceId: str,
    isSysAdmin: bool
) -> Dict[str, Any]:
    """
    Get the view permissions of a user within one feature instance.

    The result always contains ``"_all"`` (True when a UI rule with
    ``item=None`` and a truthy ``view`` flag exists) and ``"isAdmin"`` (True
    when one of the user's roles has "admin" in its ``roleLabel``). Every
    other key is the full objectKey of a viewable UI object (e.g.
    ``ui.feature.trustee.dashboard``) mapped to True.

    Args:
        rootInterface: Interface whose ``db.getRecordset`` is used for lookups.
        userId: ID of the user.
        instanceId: ID of the feature instance.
        isSysAdmin: Short-circuits to full access without any lookup.

    Returns:
        The permissions dict. On an unexpected error the permissions gathered
        so far are returned and the error is logged as a warning.
    """
    if isSysAdmin:
        return {"_all": True, "isAdmin": True}

    permissions: Dict[str, Any] = {"_all": False, "isAdmin": False}

    try:
        # Kept inside the try so a failing import degrades to "no permissions"
        # like any other lookup error.
        from modules.datamodels.datamodelRbac import Role

        featureAccesses = rootInterface.db.getRecordset(
            FeatureAccess,
            recordFilter={"userId": userId, "featureInstanceId": instanceId},
        )
        if not featureAccesses:
            return permissions

        # Resolve role IDs through the FeatureAccessRole junction records of
        # the first matching FeatureAccess.
        featureAccessRoles = rootInterface.db.getRecordset(
            FeatureAccessRole,
            recordFilter={"featureAccessId": featureAccesses[0].get("id")},
        )
        roleIds = [far.get("roleId") for far in featureAccessRoles]
        if not roleIds:
            return permissions

        # NOTE(review): admin detection is a substring match on the role label.
        for roleId in roleIds:
            roles = rootInterface.db.getRecordset(Role, recordFilter={"id": roleId})
            # ``or ""`` guards against a stored None label, which previously
            # raised and silently dropped every permission of the user.
            if roles and "admin" in (roles[0].get("roleLabel") or "").lower():
                permissions["isAdmin"] = True
                break

        # NOTE(review): wildcard items (e.g. "ui.feature.trustee.*") are stored
        # verbatim here; callers that look up exact objectKeys will not match them.
        for roleId in roleIds:
            accessRules = rootInterface.db.getRecordset(
                AccessRule,
                recordFilter={"roleId": roleId, "context": "UI"},
            )

            logger.debug(f"_getInstanceViewPermissions: roleId={roleId}, UI rules count={len(accessRules)}")

            for rule in accessRules:
                if not rule.get("view", False):
                    continue

                item = rule.get("item")
                logger.debug(f"_getInstanceViewPermissions: rule item={item}, view={rule.get('view')}")

                if item is None:
                    # item=None means all views
                    permissions["_all"] = True
                else:
                    permissions[item] = True

        logger.debug(f"_getInstanceViewPermissions: final permissions={permissions}")
        return permissions

    except Exception as e:
        # A failure here hides navigation entries from the user, so it must be
        # visible at the default log level.
        logger.warning(f"Error getting instance view permissions: {e}", exc_info=True)
        return permissions
|
|
|
|
|
|
def _buildStaticBlocks(
    language: str,
    isSysAdmin: bool,
    roleIds: List[str],
    hasGlobalPermission: bool
) -> List[Dict[str, Any]]:
    """
    Turn NAVIGATION_SECTIONS into static navigation blocks.

    Sections and items flagged ``adminOnly`` are dropped for non-sysadmins.
    The remaining items are kept when they are public, when the caller is a
    sysadmin, when the caller has global UI permission, or when one of the
    caller's roles may view the item's objectKey. Sections without any
    visible item are omitted; items are sorted by their order.
    """

    def isVisible(navItem: Dict[str, Any]) -> bool:
        # Admin-only entries are hidden from everyone but sysadmins.
        if navItem.get("adminOnly") and not isSysAdmin:
            return False
        # The role lookup comes last, so it only runs when nothing cheaper
        # has already decided visibility.
        return bool(
            navItem.get("public")
            or isSysAdmin
            or hasGlobalPermission
            or _checkUiPermission(roleIds, navItem["objectKey"])
        )

    staticBlocks: List[Dict[str, Any]] = []

    for section in NAVIGATION_SECTIONS:
        if section.get("adminOnly") and not isSysAdmin:
            continue

        visibleItems = [
            _formatBlockItem(navItem, language)
            for navItem in section.get("items", [])
            if isVisible(navItem)
        ]
        if not visibleItems:
            continue

        sectionId = section["id"]
        titles = section["title"]
        englishOrId = titles.get("en", sectionId)

        staticBlocks.append({
            "type": "static",
            "id": sectionId,
            "title": titles.get(language, englishOrId),
            "order": section.get("order", 50),
            "items": sorted(visibleItems, key=lambda entry: entry["order"]),
        })

    return staticBlocks
|
|
|
|
|
|
def _formatBlockItem(item: Dict[str, Any], language: str) -> Dict[str, Any]:
    """
    Convert a navigation item definition into its API response shape.

    The result carries uiComponent (derived from the objectKey), uiLabel
    (label in ``language``, else English, else the item id), uiPath, order
    (default 50) and objectKey. No icon is included; the UI maps icons via
    uiComponent.
    """
    objectKey = item["objectKey"]
    formatted: Dict[str, Any] = {"uiComponent": _objectKeyToUiComponent(objectKey)}

    labels = item["label"]
    englishOrId = labels.get("en", item["id"])
    formatted["uiLabel"] = labels.get(language, englishOrId)

    formatted["uiPath"] = item["path"]
    formatted["order"] = item.get("order", 50)
    formatted["objectKey"] = objectKey
    return formatted
|
|
|
|
|
|
@navigationRouter.get("/navigation")
@limiter.limit("60/minute")
async def get_navigation(
    request: Request,
    language: str = Query("de", description="Language for labels (en, de, fr)"),
    reqContext: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Get the unified navigation structure as a list of blocks.

    Single source of truth for navigation - the UI renders only what it receives.

    Endpoint: GET /api/navigation

    Block order:
    - System (10)
    - Dynamic/Features (15) - only if user has feature instances
    - Workflows (20)
    - Basisdaten (30)
    - Migrate (40)
    - Administration (200)

    Response format:
    {
        "language": "de",
        "blocks": [
            {
                "type": "static",
                "id": "system",
                "title": "SYSTEM",
                "order": 10,
                "items": [
                    {
                        "uiComponent": "page.system.home",
                        "uiLabel": "Übersicht",
                        "uiPath": "/",
                        "order": 10,
                        "objectKey": "ui.system.home"
                    }
                ]
            },
            {
                "type": "dynamic",
                "id": "features",
                "title": "MEINE FEATURES",
                "order": 15,
                "mandates": [...]
            }
        ]
    }

    On failure the error is logged and a response with an empty block list
    plus an "error" message is returned instead of raising.
    """
    try:
        isSysAdmin = reqContext.isSysAdmin
        currentUser = reqContext.user
        userId = str(currentUser.id) if currentUser else None

        # Sysadmins bypass role checks, so their roles are never loaded.
        roleIds = _getUserRoleIds(userId) if (userId and not isSysAdmin) else []

        # The "_global_check" key is only granted by a global UI rule.
        hasGlobalPermission = isSysAdmin or (
            bool(roleIds) and _checkUiPermission(roleIds, "_global_check")
        )

        # Static blocks come from NAVIGATION_SECTIONS.
        navBlocks = _buildStaticBlocks(language, isSysAdmin, roleIds, hasGlobalPermission)

        # The dynamic features block exists only for an identified user
        # with feature instances.
        dynamicBlock = _buildDynamicBlock(userId, language, isSysAdmin) if userId else None
        if dynamicBlock:
            navBlocks.append(dynamicBlock)

        navBlocks.sort(key=lambda block: block["order"])

        return {
            "language": language,
            "blocks": navBlocks,
        }

    except Exception as e:
        logger.error(f"Error getting navigation: {e}")
        return {
            "language": language,
            "blocks": [],
            "error": str(e),
        }
|