gateway/modules/serviceCenter/resolver.py
2026-03-06 14:03:18 +01:00

170 lines
5.2 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Service Center Resolver.
Resolution logic, dependency injection, and optional legacy fallback.
"""
import importlib
import logging
from typing import Any, Callable, Dict, Optional, Set
from modules.serviceCenter.context import ServiceCenterContext
from modules.serviceCenter.registry import CORE_SERVICES, IMPORTABLE_SERVICES
logger = logging.getLogger(__name__)
# Type for get_service callable passed to services
GetServiceFunc = Callable[[str], Any]
def _make_context_id(ctx: ServiceCenterContext) -> str:
"""Create a stable cache key from context."""
return f"{id(ctx.user)}_{ctx.mandate_id or ''}_{ctx.feature_instance_id or ''}"
def _load_service_class(module_path: str, class_name: str):
"""Load service class from module."""
module = importlib.import_module(module_path)
return getattr(module, class_name)
def _create_legacy_hub(ctx: ServiceCenterContext) -> Any:
"""Create legacy Services instance for fallback when service not yet migrated."""
from modules.services import getInterface
return getInterface(
ctx.user,
workflow=ctx.workflow,
mandateId=ctx.mandate_id,
featureInstanceId=ctx.feature_instance_id,
)
def _get_from_legacy(legacy_hub: Any, key: str) -> Any:
"""Map service key to legacy hub attribute (for fallback when service center module fails)."""
key_to_attr = {
"utils": "utils",
"security": "security",
"streaming": "streaming",
"ticket": "ticket",
"messaging": "messaging",
"billing": "billing",
"sharepoint": "sharepoint",
"chat": "chat",
"extraction": "extraction",
"generation": "generation",
"ai": "ai",
"web": "web",
"neutralization": "neutralization",
}
attr = key_to_attr.get(key)
if attr and hasattr(legacy_hub, attr):
return getattr(legacy_hub, attr)
return None
def resolve(
key: str,
context: ServiceCenterContext,
cache: Dict[str, Any],
resolving: Set[str],
legacy_hub: Optional[Any] = None,
) -> Any:
"""
Resolve a service by key. Uses cache, resolves dependencies recursively.
Falls back to legacy_hub if service module cannot be loaded.
"""
cache_key = f"{_make_context_id(context)}_{key}"
if cache_key in cache:
return cache[cache_key]
if key in resolving:
raise RuntimeError(f"Circular dependency detected for service: {key}")
def get_service(dep_key: str) -> Any:
return resolve(dep_key, context, cache, resolving, legacy_hub)
# Try core first
if key in CORE_SERVICES:
spec = CORE_SERVICES[key]
try:
cls = _load_service_class(spec["module"], spec["class"])
resolving.add(key)
try:
for dep in spec.get("dependencies", []):
get_service(dep)
finally:
resolving.discard(key)
instance = cls(context, get_service)
cache[cache_key] = instance
return instance
except (ImportError, ModuleNotFoundError, AttributeError) as e:
logger.debug(f"Could not load core service '{key}' from service center: {e}")
if legacy_hub:
fallback = _get_from_legacy(legacy_hub, key)
if fallback is not None:
cache[cache_key] = fallback
return fallback
raise
# Try importable
if key in IMPORTABLE_SERVICES:
spec = IMPORTABLE_SERVICES[key]
try:
cls = _load_service_class(spec["module"], spec["class"])
resolving.add(key)
try:
for dep in spec.get("dependencies", []):
get_service(dep)
finally:
resolving.discard(key)
instance = cls(context, get_service)
cache[cache_key] = instance
return instance
except (ImportError, ModuleNotFoundError, AttributeError) as e:
logger.debug(f"Could not load importable service '{key}' from service center: {e}")
if legacy_hub:
fallback = _get_from_legacy(legacy_hub, key)
if fallback is not None:
cache[cache_key] = fallback
return fallback
raise
if legacy_hub:
fallback = _get_from_legacy(legacy_hub, key)
if fallback is not None:
cache[cache_key] = fallback
return fallback
raise KeyError(f"Unknown service: {key}")
# Module-level cache for service instances (per context)
_resolution_cache: Dict[str, Any] = {}
_cache_lock: Optional[Any] = None
try:
from threading import Lock
_cache_lock = Lock()
except ImportError:
pass
def get_resolution_cache() -> Dict[str, Any]:
"""Get the module-level resolution cache (for preWarm/clear)."""
return _resolution_cache
def clear_cache() -> None:
"""Clear the resolution cache."""
lock = _cache_lock if _cache_lock is not None else _DummyLock()
with lock:
_resolution_cache.clear()
class _DummyLock:
def __enter__(self):
return self
def __exit__(self, *args):
pass