# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Service Center Resolver.

Resolution logic, dependency injection, and optional legacy fallback.
"""

import importlib
import logging
from typing import Any, Callable, Dict, Optional, Set

from modules.serviceCenter.context import ServiceCenterContext
from modules.serviceCenter.registry import CORE_SERVICES, IMPORTABLE_SERVICES

logger = logging.getLogger(__name__)

# Type for get_service callable passed to services
GetServiceFunc = Callable[[str], Any]

# Service key -> attribute name on the legacy hub. Currently an identity
# mapping; kept as a dict so a key can be pointed at a differently named
# legacy attribute without touching the lookup code.
_LEGACY_KEY_TO_ATTR: Dict[str, str] = {
    "utils": "utils",
    "security": "security",
    "streaming": "streaming",
    "ticket": "ticket",
    "messaging": "messaging",
    "billing": "billing",
    "sharepoint": "sharepoint",
    "chat": "chat",
    "extraction": "extraction",
    "generation": "generation",
    "ai": "ai",
    "web": "web",
    "neutralization": "neutralization",
}


def _make_context_id(ctx: ServiceCenterContext) -> str:
    """Create a cache key prefix from the context.

    The key combines the identity of ``ctx.user`` with the mandate and
    feature-instance ids (empty string when those are falsy).

    NOTE(review): ``id()`` is only unique while the user object is alive, so
    entries cached for a garbage-collected user object could be matched by a
    later object that reuses the same address. Confirm that callers clear the
    cache or keep user objects alive for as long as entries are cached.
    """
    return f"{id(ctx.user)}_{ctx.mandate_id or ''}_{ctx.feature_instance_id or ''}"


def _load_service_class(module_path: str, class_name: str) -> Any:
    """Import ``module_path`` and return its attribute ``class_name``.

    Raises:
        ImportError: if the module cannot be imported.
        AttributeError: if the module has no attribute ``class_name``.
    """
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


def _create_legacy_hub(ctx: ServiceCenterContext) -> Any:
    """Create legacy Services instance for fallback when service not yet migrated."""
    # Imported lazily so this module does not import the legacy package at
    # import time.
    from modules.services import getInterface

    return getInterface(
        ctx.user,
        workflow=ctx.workflow,
        mandateId=ctx.mandate_id,
        featureInstanceId=ctx.feature_instance_id,
    )


def _get_from_legacy(legacy_hub: Any, key: str) -> Any:
    """Map a service key to the matching legacy hub attribute.

    Used as a fallback when the service center module for ``key`` fails.

    Returns:
        The legacy attribute, or ``None`` when there is no hub, the key has
        no legacy mapping, or the hub lacks the attribute.
    """
    if legacy_hub is None:
        return None
    attr = _LEGACY_KEY_TO_ATTR.get(key)
    if not attr:
        return None
    return getattr(legacy_hub, attr, None)


def _build_service(
    spec: Dict[str, Any],
    key: str,
    context: ServiceCenterContext,
    resolving: Set[str],
    get_service: GetServiceFunc,
) -> Any:
    """Load the class described by ``spec``, resolve its dependencies, instantiate it.

    ``key`` stays in ``resolving`` for the whole build, including the
    constructor call, so a cycle that goes through a constructor calling
    ``get_service`` is reported as a circular dependency instead of recursing
    until ``RecursionError``.
    """
    cls = _load_service_class(spec["module"], spec["class"])
    resolving.add(key)
    try:
        # Resolve declared dependencies first so they are cached before the
        # service itself is constructed.
        for dep in spec.get("dependencies", []):
            get_service(dep)
        return cls(context, get_service)
    finally:
        resolving.discard(key)


def resolve(
    key: str,
    context: ServiceCenterContext,
    cache: Dict[str, Any],
    resolving: Set[str],
    legacy_hub: Optional[Any] = None,
) -> Any:
    """
    Resolve a service by key. Uses cache, resolves dependencies recursively.
    Falls back to legacy_hub if service module cannot be loaded.

    Args:
        key: Service key; looked up in CORE_SERVICES first, then
            IMPORTABLE_SERVICES.
        context: Context handed to the service constructor and used to build
            the cache key.
        cache: Mapping of cache key to resolved instance; updated in place.
        resolving: Keys currently being built; used for cycle detection and
            updated in place.
        legacy_hub: Optional legacy services object used as a fallback.

    Returns:
        The cached, newly created, or legacy-provided service.

    Raises:
        RuntimeError: if ``key`` is requested again while it is being built.
        ImportError, AttributeError: if building the service fails and no
            legacy fallback is available.
        KeyError: if ``key`` is in neither registry and has no legacy fallback.
    """
    cache_key = f"{_make_context_id(context)}_{key}"
    if cache_key in cache:
        return cache[cache_key]

    if key in resolving:
        raise RuntimeError(f"Circular dependency detected for service: {key}")

    def get_service(dep_key: str) -> Any:
        return resolve(dep_key, context, cache, resolving, legacy_hub)

    # Core services take precedence over importable ones; a key found in the
    # core registry is never looked up in the importable registry.
    if key in CORE_SERVICES:
        spec, kind = CORE_SERVICES[key], "core"
    elif key in IMPORTABLE_SERVICES:
        spec, kind = IMPORTABLE_SERVICES[key], "importable"
    else:
        spec, kind = None, ""

    if spec is not None:
        try:
            instance = _build_service(spec, key, context, resolving, get_service)
        except (ImportError, AttributeError) as exc:
            # ModuleNotFoundError is a subclass of ImportError, so it is
            # covered here as well.
            logger.debug(
                "Could not load %s service '%s' from service center: %s",
                kind,
                key,
                exc,
            )
            fallback = _get_from_legacy(legacy_hub, key)
            if fallback is None:
                raise
            cache[cache_key] = fallback
            return fallback
        cache[cache_key] = instance
        return instance

    # Not registered in the service center: only the legacy hub can serve it.
    fallback = _get_from_legacy(legacy_hub, key)
    if fallback is not None:
        cache[cache_key] = fallback
        return fallback

    raise KeyError(f"Unknown service: {key}")


class _DummyLock:
    """No-op context manager used when no real lock is available."""

    def __enter__(self):
        return self

    def __exit__(self, *args):
        pass


# Module-level cache for service instances (per context)
_resolution_cache: Dict[str, Any] = {}
_cache_lock: Optional[Any] = None
try:
    from threading import Lock

    _cache_lock = Lock()
except ImportError:
    # Deliberate best effort: without threading, clear_cache() uses _DummyLock.
    pass


def get_resolution_cache() -> Dict[str, Any]:
    """Get the module-level resolution cache (for preWarm/clear)."""
    return _resolution_cache


def clear_cache() -> None:
    """Clear the resolution cache while holding the module lock (if any)."""
    lock = _cache_lock if _cache_lock is not None else _DummyLock()
    with lock:
        _resolution_cache.clear()