fix: resolve datasource labels on reload, faster shutdown
All checks were successful
Deploy Plattform-Core (Int) / test (push) Successful in 1m3s
Deploy Plattform-Core (Int) / deploy (push) Successful in 9s

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
ValueOn AG 2026-06-04 23:56:58 +02:00
parent 76753f6037
commit 74f6f35ad4
3 changed files with 59 additions and 13 deletions

26
app.py
View file

@ -435,8 +435,19 @@ async def lifespan(app: FastAPI):
# --- Shutdown sequence (protected against CancelledError) --- # --- Shutdown sequence (protected against CancelledError) ---
try: try:
# 1. Signal DB layer to abort in-flight borrow waits immediately. # 1. Drain SSE queues and cancel agent tasks FIRST so that open
# This MUST happen first so that sync worker threads stuck in # streaming connections break out of their queue.get() loop
# immediately. Without this, uvicorn waits for the SSE generators
# to finish (up to 120 s keepalive timeout) before the rest of
# the shutdown can proceed.
try:
from modules.serviceCenter.core.serviceStreaming.eventManager import get_event_manager as _getStreamingEM
_getStreamingEM().shutdown()
except Exception as e:
logger.warning(f"Streaming EventManager shutdown failed: {e}")
# 2. Signal DB layer to abort in-flight borrow waits immediately.
# This MUST happen early so that sync worker threads stuck in
# _acquireConn (30 s poll loop) bail out within one backoff tick # _acquireConn (30 s poll loop) bail out within one backoff tick
# instead of blocking process exit for the full borrow timeout. # instead of blocking process exit for the full borrow timeout.
try: try:
@ -445,10 +456,10 @@ async def lifespan(app: FastAPI):
except Exception as e: except Exception as e:
logger.warning(f"Closing DB connection pools failed: {e}") logger.warning(f"Closing DB connection pools failed: {e}")
# 2. Stop scheduler (removes all pending cron/interval jobs) # 3. Stop scheduler (removes all pending cron/interval jobs)
eventManager.stop() eventManager.stop()
# 3. Stop Feature Containers (Plug&Play) # 4. Stop Feature Containers (Plug&Play)
try: try:
mainModules = loadFeatureMainModules() mainModules = loadFeatureMainModules()
for featureName, module in mainModules.items(): for featureName, module in mainModules.items():
@ -461,13 +472,6 @@ async def lifespan(app: FastAPI):
except Exception as e: except Exception as e:
logger.warning(f"Could not shutdown feature containers: {e}") logger.warning(f"Could not shutdown feature containers: {e}")
# 4. Cancel all pending streaming EventManager tasks (cleanup sleeps, agent tasks)
try:
from modules.serviceCenter.core.serviceStreaming.eventManager import get_event_manager as _getStreamingEM
_getStreamingEM().shutdown()
except Exception as e:
logger.warning(f"Streaming EventManager shutdown failed: {e}")
# 5. Close shared HTTP sessions (ResilientHttp) to avoid TCP keepalive hang # 5. Close shared HTTP sessions (ResilientHttp) to avoid TCP keepalive hang
try: try:
from modules.connectors._httpResilience import closeAllResilientHttp from modules.connectors._httpResilience import closeAllResilientHttp

View file

@ -1163,10 +1163,42 @@ async def getWorkspaceMessages(
attachedFdsIds = list(getattr(wf, "attachedFeatureDataSourceIds", None) or []) attachedFdsIds = list(getattr(wf, "attachedFeatureDataSourceIds", None) or [])
except Exception as e: except Exception as e:
logger.debug(f"getWorkspaceMessages: cannot read attachments for {workflowId}: {e}") logger.debug(f"getWorkspaceMessages: cannot read attachments for {workflowId}: {e}")
attachedDsLabels: Dict[str, str] = {}
attachedFdsLabels: Dict[str, str] = {}
if attachedDsIds or attachedFdsIds:
from modules.interfaces.interfaceDbApp import getRootInterface
rootIf = getRootInterface()
if attachedDsIds:
from modules.datamodels.datamodelDataSource import DataSource as _DS
for dsId in attachedDsIds:
try:
records = rootIf.db.getRecordset(_DS, recordFilter={"id": dsId})
if records:
lbl = records[0].get("label") or records[0].get("path") or ""
if lbl:
attachedDsLabels[dsId] = str(lbl)
except Exception:
pass
if attachedFdsIds:
from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource as _FDS
for fdsId in attachedFdsIds:
try:
records = rootIf.db.getRecordset(_FDS, recordFilter={"id": fdsId})
if records:
tbl = records[0].get("tableName") or ""
lbl = records[0].get("label") or tbl
if lbl:
attachedFdsLabels[fdsId] = str(lbl)
except Exception:
pass
return JSONResponse({ return JSONResponse({
"messages": items, "messages": items,
"attachedDataSourceIds": attachedDsIds, "attachedDataSourceIds": attachedDsIds,
"attachedFeatureDataSourceIds": attachedFdsIds, "attachedFeatureDataSourceIds": attachedFdsIds,
"attachedDataSourceLabels": attachedDsLabels,
"attachedFeatureDataSourceLabels": attachedFdsLabels,
}) })

View file

@ -182,7 +182,17 @@ class EventManager:
self._cleanup_tasks[workflow_id] = task self._cleanup_tasks[workflow_id] = task
def shutdown(self) -> None: def shutdown(self) -> None:
"""Cancel all pending cleanup and agent tasks for fast process exit.""" """Cancel all pending cleanup and agent tasks for fast process exit.
Injects ``None`` sentinels into every live queue so that SSE generators
(which block on ``queue.get()``) break out of their loop immediately
instead of waiting up to the keepalive timeout.
"""
for _wfId, q in list(self._queues.items()):
try:
q.put_nowait(None)
except Exception:
pass
for wfId, task in list(self._cleanup_tasks.items()): for wfId, task in list(self._cleanup_tasks.items()):
if not task.done(): if not task.done():
task.cancel() task.cancel()
@ -192,7 +202,7 @@ class EventManager:
task.cancel() task.cancel()
self._agent_tasks.clear() self._agent_tasks.clear()
self._queues.clear() self._queues.clear()
logger.info("EventManager shutdown: all tasks cancelled") logger.info("EventManager shutdown: all tasks cancelled, queues drained")
# Global event manager instance # Global event manager instance