From 60d50622048adb58f2266fb61abb43185cc86d58 Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Sun, 26 Apr 2026 18:13:11 +0200 Subject: [PATCH] fix model registration race locker --- modules/aicore/aicoreModelRegistry.py | 94 +++++++++++++++------------ 1 file changed, 54 insertions(+), 40 deletions(-) diff --git a/modules/aicore/aicoreModelRegistry.py b/modules/aicore/aicoreModelRegistry.py index f05745ac..1c50651d 100644 --- a/modules/aicore/aicoreModelRegistry.py +++ b/modules/aicore/aicoreModelRegistry.py @@ -38,6 +38,31 @@ class ModelRegistry: self._getAvailableModelsCache: Dict[Tuple[str, int], Tuple[List[AiModel], float]] = {} # (user_id, rbac_id) -> (models, ts) self._getAvailableModelsCacheTtl: float = 30.0 # seconds + def _addModelToDict(self, model: AiModel, connectorType: str, target: Dict[str, AiModel]): + """Add model to a dict, tolerating benign re-adds from the same connector.""" + if model.displayName in target: + existing = target[model.displayName] + if existing.name == model.name and existing.connectorType == model.connectorType: + logger.debug(f"Skipping duplicate model '{model.displayName}' from same connector {connectorType}") + return + raise ValueError( + f"displayName conflict '{model.displayName}': " + f"existing name='{existing.name}' (connector: {existing.connectorType}), " + f"new name='{model.name}' (connector: {connectorType})" + ) + + if TESTING_MAX_TOKENS_OVERRIDE is not None and model.maxTokens > TESTING_MAX_TOKENS_OVERRIDE: + originalMaxTokens = model.maxTokens + model.maxTokens = TESTING_MAX_TOKENS_OVERRIDE + logger.debug(f"TESTING: Overrode maxTokens for {model.displayName}: {originalMaxTokens} -> {TESTING_MAX_TOKENS_OVERRIDE}") + + target[model.displayName] = model + logger.debug(f"Registered model: {model.displayName} (name: {model.name}) from {connectorType}") + + def _addModel(self, model: AiModel, connectorType: str): + """Convenience wrapper for adding to self._models.""" + self._addModelToDict(model, connectorType, self._models) + def registerConnector(self, connector: BaseConnectorAi): """Register a connector and collect its models.""" connectorType = connector.getConnectorType() @@ -102,51 +127,40 @@ class ModelRegistry: self._connectorsInitialized = True def refreshModels(self, force: bool = False): - """Refresh models from all registered connectors.""" - import time - + """Refresh models from all registered connectors. Thread-safe via _refreshLock.""" self.ensureConnectorsRegistered() currentTime = time.time() - - # Check if refresh is needed - if (not force and - self._lastRefresh is not None and + + if (not force and + self._lastRefresh is not None and currentTime - self._lastRefresh < self._refreshInterval): return - - logger.info("Refreshing model registry...") - - # Clear existing models - self._models.clear() - - # Re-register all connectors - for connector in self._connectors.values(): - try: - connector.clearCache() # Clear connector cache - models = connector.getCachedModels() - for model in models: - # Validate displayName uniqueness - if model.displayName in self._models: - existingModel = self._models[model.displayName] - errorMsg = f"Duplicate displayName '{model.displayName}' detected! Existing model: displayName='{existingModel.displayName}', name='{existingModel.name}' (connector: {existingModel.connectorType}), New model: displayName='{model.displayName}', name='{model.name}' (connector: {connector.getConnectorType()}). displayName must be unique." - logger.error(errorMsg) - raise ValueError(errorMsg) - - # TODO TESTING: Override maxTokens if testing override is enabled - if TESTING_MAX_TOKENS_OVERRIDE is not None and model.maxTokens > TESTING_MAX_TOKENS_OVERRIDE: - originalMaxTokens = model.maxTokens - model.maxTokens = TESTING_MAX_TOKENS_OVERRIDE - logger.debug(f"TESTING: Overrode maxTokens for {model.displayName}: {originalMaxTokens} -> {TESTING_MAX_TOKENS_OVERRIDE}") - - # Use displayName as the key (must be unique) - self._models[model.displayName] = model - except Exception as e: - logger.error(f"Failed to refresh models from {connector.getConnectorType()}: {e}") - raise - - self._lastRefresh = currentTime - logger.info(f"Model registry refreshed: {len(self._models)} models available") + + if not self._refreshLock.acquire(blocking=False): + logger.debug("refreshModels already running in another thread, skipping") + return + + try: + logger.info("Refreshing model registry...") + newModels: Dict[str, AiModel] = {} + + for connector in self._connectors.values(): + connectorType = connector.getConnectorType() + try: + connector.clearCache() + models = connector.getCachedModels() + for model in models: + self._addModelToDict(model, connectorType, newModels) + except Exception as e: + logger.error(f"Failed to refresh models from {connectorType}: {e}") + raise + + self._models = newModels + self._lastRefresh = time.time() + logger.info(f"Model registry refreshed: {len(self._models)} models available") + finally: + self._refreshLock.release() def getModel(self, displayName: str) -> Optional[AiModel]: """Get a specific model by displayName (displayName must be unique)."""