From 74f6f35ad46d41728c19f0b2071be42dd3c1516c Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Thu, 4 Jun 2026 23:56:58 +0200 Subject: [PATCH] fix: resolve datasource labels on reload, faster shutdown Co-authored-by: Cursor --- app.py | 26 ++++++++------- .../workspace/routeFeatureWorkspace.py | 32 +++++++++++++++++++ .../core/serviceStreaming/eventManager.py | 14 ++++++-- 3 files changed, 59 insertions(+), 13 deletions(-) diff --git a/app.py b/app.py index 07bdf98f..11068505 100644 --- a/app.py +++ b/app.py @@ -435,8 +435,19 @@ async def lifespan(app: FastAPI): # --- Shutdown sequence (protected against CancelledError) --- try: - # 1. Signal DB layer to abort in-flight borrow waits immediately. - # This MUST happen first so that sync worker threads stuck in + # 1. Drain SSE queues and cancel agent tasks FIRST so that open + # streaming connections break out of their queue.get() loop + # immediately. Without this, uvicorn waits for the SSE generators + # to finish (up to 120 s keepalive timeout) before the rest of + # the shutdown can proceed. + try: + from modules.serviceCenter.core.serviceStreaming.eventManager import get_event_manager as _getStreamingEM + _getStreamingEM().shutdown() + except Exception as e: + logger.warning(f"Streaming EventManager shutdown failed: {e}") + + # 2. Signal DB layer to abort in-flight borrow waits immediately. + # This MUST happen early so that sync worker threads stuck in # _acquireConn (30 s poll loop) bail out within one backoff tick # instead of blocking process exit for the full borrow timeout. try: @@ -445,10 +456,10 @@ async def lifespan(app: FastAPI): except Exception as e: logger.warning(f"Closing DB connection pools failed: {e}") - # 2. Stop scheduler (removes all pending cron/interval jobs) + # 3. Stop scheduler (removes all pending cron/interval jobs) eventManager.stop() - # 3. Stop Feature Containers (Plug&Play) + # 4. Stop Feature Containers (Plug&Play) try: mainModules = loadFeatureMainModules() for featureName, module in mainModules.items(): @@ -461,13 +472,6 @@ async def lifespan(app: FastAPI): except Exception as e: logger.warning(f"Could not shutdown feature containers: {e}") - # 4. Cancel all pending streaming EventManager tasks (cleanup sleeps, agent tasks) - try: - from modules.serviceCenter.core.serviceStreaming.eventManager import get_event_manager as _getStreamingEM - _getStreamingEM().shutdown() - except Exception as e: - logger.warning(f"Streaming EventManager shutdown failed: {e}") - # 5. Close shared HTTP sessions (ResilientHttp) to avoid TCP keepalive hang try: from modules.connectors._httpResilience import closeAllResilientHttp diff --git a/modules/features/workspace/routeFeatureWorkspace.py b/modules/features/workspace/routeFeatureWorkspace.py index 6b701432..55526c55 100644 --- a/modules/features/workspace/routeFeatureWorkspace.py +++ b/modules/features/workspace/routeFeatureWorkspace.py @@ -1163,10 +1163,42 @@ async def getWorkspaceMessages( attachedFdsIds = list(getattr(wf, "attachedFeatureDataSourceIds", None) or []) except Exception as e: logger.debug(f"getWorkspaceMessages: cannot read attachments for {workflowId}: {e}") + + attachedDsLabels: Dict[str, str] = {} + attachedFdsLabels: Dict[str, str] = {} + if attachedDsIds or attachedFdsIds: + from modules.interfaces.interfaceDbApp import getRootInterface + rootIf = getRootInterface() + if attachedDsIds: + from modules.datamodels.datamodelDataSource import DataSource as _DS + for dsId in attachedDsIds: + try: + records = rootIf.db.getRecordset(_DS, recordFilter={"id": dsId}) + if records: + lbl = records[0].get("label") or records[0].get("path") or "" + if lbl: + attachedDsLabels[dsId] = str(lbl) + except Exception: + pass + if attachedFdsIds: + from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource as _FDS + for fdsId in attachedFdsIds: + try: + records = rootIf.db.getRecordset(_FDS, recordFilter={"id": fdsId}) + if records: + tbl = records[0].get("tableName") or "" + lbl = records[0].get("label") or tbl + if lbl: + attachedFdsLabels[fdsId] = str(lbl) + except Exception: + pass + return JSONResponse({ "messages": items, "attachedDataSourceIds": attachedDsIds, "attachedFeatureDataSourceIds": attachedFdsIds, + "attachedDataSourceLabels": attachedDsLabels, + "attachedFeatureDataSourceLabels": attachedFdsLabels, }) diff --git a/modules/serviceCenter/core/serviceStreaming/eventManager.py b/modules/serviceCenter/core/serviceStreaming/eventManager.py index bc1fb3c6..180430eb 100644 --- a/modules/serviceCenter/core/serviceStreaming/eventManager.py +++ b/modules/serviceCenter/core/serviceStreaming/eventManager.py @@ -182,7 +182,17 @@ class EventManager: self._cleanup_tasks[workflow_id] = task def shutdown(self) -> None: - """Cancel all pending cleanup and agent tasks for fast process exit.""" + """Cancel all pending cleanup and agent tasks for fast process exit. + + Injects ``None`` sentinels into every live queue so that SSE generators + (which block on ``queue.get()``) break out of their loop immediately + instead of waiting up to the keepalive timeout. + """ + for _wfId, q in list(self._queues.items()): + try: + q.put_nowait(None) + except Exception: + pass for wfId, task in list(self._cleanup_tasks.items()): if not task.done(): task.cancel() @@ -192,7 +202,7 @@ class EventManager: task.cancel() self._agent_tasks.clear() self._queues.clear() - logger.info("EventManager shutdown: all tasks cancelled") + logger.info("EventManager shutdown: all tasks cancelled, queues drained") # Global event manager instance