Merge branch 'int'
All checks were successful
Deploy Plattform-Core / test (push) Successful in 46s
Deploy Plattform-Core / deploy (push) Successful in 4s

This commit is contained in:
ValueOn AG 2026-06-04 23:57:18 +02:00
commit e006d85302
3 changed files with 59 additions and 13 deletions

26
app.py
View file

@ -435,8 +435,19 @@ async def lifespan(app: FastAPI):
# --- Shutdown sequence (protected against CancelledError) ---
try:
# 1. Signal DB layer to abort in-flight borrow waits immediately.
# This MUST happen first so that sync worker threads stuck in
# 1. Drain SSE queues and cancel agent tasks FIRST so that open
# streaming connections break out of their queue.get() loop
# immediately. Without this, uvicorn waits for the SSE generators
# to finish (up to 120 s keepalive timeout) before the rest of
# the shutdown can proceed.
try:
from modules.serviceCenter.core.serviceStreaming.eventManager import get_event_manager as _getStreamingEM
_getStreamingEM().shutdown()
except Exception as e:
logger.warning(f"Streaming EventManager shutdown failed: {e}")
# 2. Signal DB layer to abort in-flight borrow waits immediately.
# This MUST happen early so that sync worker threads stuck in
# _acquireConn (30 s poll loop) bail out within one backoff tick
# instead of blocking process exit for the full borrow timeout.
try:
@ -445,10 +456,10 @@ async def lifespan(app: FastAPI):
except Exception as e:
logger.warning(f"Closing DB connection pools failed: {e}")
# 2. Stop scheduler (removes all pending cron/interval jobs)
# 3. Stop scheduler (removes all pending cron/interval jobs)
eventManager.stop()
# 3. Stop Feature Containers (Plug&Play)
# 4. Stop Feature Containers (Plug&Play)
try:
mainModules = loadFeatureMainModules()
for featureName, module in mainModules.items():
@ -461,13 +472,6 @@ async def lifespan(app: FastAPI):
except Exception as e:
logger.warning(f"Could not shutdown feature containers: {e}")
# 4. Cancel all pending streaming EventManager tasks (cleanup sleeps, agent tasks)
try:
from modules.serviceCenter.core.serviceStreaming.eventManager import get_event_manager as _getStreamingEM
_getStreamingEM().shutdown()
except Exception as e:
logger.warning(f"Streaming EventManager shutdown failed: {e}")
# 5. Close shared HTTP sessions (ResilientHttp) to avoid TCP keepalive hang
try:
from modules.connectors._httpResilience import closeAllResilientHttp

View file

@ -1163,10 +1163,42 @@ async def getWorkspaceMessages(
attachedFdsIds = list(getattr(wf, "attachedFeatureDataSourceIds", None) or [])
except Exception as e:
logger.debug(f"getWorkspaceMessages: cannot read attachments for {workflowId}: {e}")
attachedDsLabels: Dict[str, str] = {}
attachedFdsLabels: Dict[str, str] = {}
if attachedDsIds or attachedFdsIds:
from modules.interfaces.interfaceDbApp import getRootInterface
rootIf = getRootInterface()
if attachedDsIds:
from modules.datamodels.datamodelDataSource import DataSource as _DS
for dsId in attachedDsIds:
try:
records = rootIf.db.getRecordset(_DS, recordFilter={"id": dsId})
if records:
lbl = records[0].get("label") or records[0].get("path") or ""
if lbl:
attachedDsLabels[dsId] = str(lbl)
except Exception:
pass
if attachedFdsIds:
from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource as _FDS
for fdsId in attachedFdsIds:
try:
records = rootIf.db.getRecordset(_FDS, recordFilter={"id": fdsId})
if records:
tbl = records[0].get("tableName") or ""
lbl = records[0].get("label") or tbl
if lbl:
attachedFdsLabels[fdsId] = str(lbl)
except Exception:
pass
return JSONResponse({
"messages": items,
"attachedDataSourceIds": attachedDsIds,
"attachedFeatureDataSourceIds": attachedFdsIds,
"attachedDataSourceLabels": attachedDsLabels,
"attachedFeatureDataSourceLabels": attachedFdsLabels,
})

View file

@ -182,7 +182,17 @@ class EventManager:
self._cleanup_tasks[workflow_id] = task
def shutdown(self) -> None:
"""Cancel all pending cleanup and agent tasks for fast process exit."""
"""Cancel all pending cleanup and agent tasks for fast process exit.
Injects ``None`` sentinels into every live queue so that SSE generators
(which block on ``queue.get()``) break out of their loop immediately
instead of waiting up to the keepalive timeout.
"""
for _wfId, q in list(self._queues.items()):
try:
q.put_nowait(None)
except Exception:
pass
for wfId, task in list(self._cleanup_tasks.items()):
if not task.done():
task.cancel()
@ -192,7 +202,7 @@ class EventManager:
task.cancel()
self._agent_tasks.clear()
self._queues.clear()
logger.info("EventManager shutdown: all tasks cancelled")
logger.info("EventManager shutdown: all tasks cancelled, queues drained")
# Global event manager instance