platform-core/modules/serviceCenter/__init__.py
ValueOn AG 26dd8f6f3f
Some checks failed
Deploy Plattform-Core (Int) / test (push) Failing after 12s
Deploy Plattform-Core (Int) / deploy (push) Has been skipped
cleanup intra referencings in codebase
2026-06-09 07:05:06 +02:00

137 lines
3.8 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Service Center.
Central registry for core and importable services with per-feature resolution.
"""
import logging
from typing import Any, List, Optional
from modules.serviceCenter.context import ServiceCenterContext
from modules.serviceCenter.registry import (
CORE_SERVICES,
IMPORTABLE_SERVICES,
SERVICE_RBAC_OBJECTS,
)
from modules.serviceCenter.resolver import (
resolve,
getResolutionCache,
clearCache,
)
from modules.serviceCenter.services.serviceAgent.mainServiceAgent import ServicesBag
# Module-level logger, named after this package via __name__.
logger = logging.getLogger(__name__)
def getService(
    key: str,
    context: ServiceCenterContext,
) -> Any:
    """
    Resolve and return the service registered under ``key`` for ``context``.

    The lookup is delegated to ``resolve``, which receives the resolution
    cache returned by ``getResolutionCache()`` and a fresh, empty set created
    for this call.

    Args:
        key: Service key (e.g., "web", "extraction", "utils")
        context: ServiceCenterContext with user, mandateId, featureInstanceId, workflow

    Returns:
        Service instance
    """
    # A new empty set is handed to the resolver on every top-level call.
    return resolve(key, context, getResolutionCache(), set())
def preWarm(service_keys: Optional[List[str]] = None) -> None:
    """
    Pre-load service modules at startup to avoid first-request latency.

    Each key is looked up in CORE_SERVICES first and then in
    IMPORTABLE_SERVICES; keys found in neither registry are skipped. A failed
    import is logged at debug level and does not stop the remaining keys from
    being processed.

    Args:
        service_keys: Optional list of keys to preload. If None, preloads all
            registered services. An explicitly empty list preloads nothing.
    """
    import importlib

    # Only fall back to "everything" when the caller passed no list at all;
    # an empty list is a valid request to preload nothing.
    if service_keys is None:
        keys = list(CORE_SERVICES.keys()) + list(IMPORTABLE_SERVICES.keys())
    else:
        keys = service_keys

    for key in keys:
        spec = CORE_SERVICES.get(key) or IMPORTABLE_SERVICES.get(key)
        if not spec:
            continue
        try:
            importlib.import_module(spec["module"])
        except ImportError as e:
            # ModuleNotFoundError is a subclass of ImportError, so it is covered here.
            logger.debug(f"Could not pre-warm {key}: {e}")
        else:
            logger.debug(f"Pre-warmed service module: {key}")
def registerServiceObjects(catalogService) -> bool:
    """
    Register every entry of SERVICE_RBAC_OBJECTS in the catalog. Call at startup.

    Each entry is passed to ``catalogService.registerResourceObject`` under
    the feature code "system".

    Args:
        catalogService: Object exposing ``registerResourceObject``.

    Returns:
        True when all entries were registered, False if any exception was
        raised (the error is logged and registration stops at that entry).
    """
    try:
        for rbacObject in SERVICE_RBAC_OBJECTS:
            catalogService.registerResourceObject(
                featureCode="system",
                objectKey=rbacObject["objectKey"],
                label=rbacObject["label"],
                meta=rbacObject.get("meta"),
            )
        registeredCount = len(SERVICE_RBAC_OBJECTS)
        logger.info(f"Registered {registeredCount} service RBAC objects")
    except Exception as e:
        logger.error(f"Failed to register service RBAC objects: {e}")
        return False
    return True
def canAccessService(
    user,
    rbac,
    serviceKey: str,
    mandateId: Optional[str] = None,
    featureInstanceId: Optional[str] = None,
    allowWhenNoRbac: bool = True,
) -> bool:
    """
    Decide whether ``user`` may access the service identified by ``serviceKey``.

    Only services listed in IMPORTABLE_SERVICES with a non-empty "objectKey"
    can be granted; any other key is denied.

    Args:
        user: User object
        rbac: RbacClass instance (e.g. from interfaceDbApp.rbac)
        serviceKey: Service key (e.g., "web", "extraction")
        mandateId: Optional mandate context
        featureInstanceId: Optional feature instance context
        allowWhenNoRbac: Value returned when rbac is falsy (migration/default)

    Returns:
        The ``view`` flag of the user's permissions on the service's resource
        object, or False when the service is unknown, has no object key, or
        no permissions are returned.
    """
    # Without an RBAC backend the outcome is purely the caller's default.
    if not rbac:
        return allowWhenNoRbac

    if serviceKey not in IMPORTABLE_SERVICES:
        return False

    resourceKey = IMPORTABLE_SERVICES[serviceKey].get("objectKey")
    if not resourceKey:
        return False

    # Imported here rather than at module level, as in the original code.
    from modules.datamodels.datamodelRbac import AccessRuleContext

    granted = rbac.getUserPermissions(
        user,
        AccessRuleContext.RESOURCE,
        resourceKey,
        mandateId=mandateId,
        featureInstanceId=featureInstanceId,
    )
    if not granted:
        return False
    return granted.view
# Public API of this package: the names exported by a star-import.
__all__ = [
    "ServiceCenterContext",
    "ServicesBag",
    "getService",
    "preWarm",
    "clearCache",
    "registerServiceObjects",
    "canAccessService",
    "SERVICE_RBAC_OBJECTS",
    "CORE_SERVICES",
    "IMPORTABLE_SERVICES",
]