# Provenance (reconstructed from a whitespace-collapsed git patch):
#   commit  ce671f61b6c4862b4709a3f40a373658eac114a1
#   author  Ida, Wed, 29 Apr 2026 14:26:32 +0200
#   subject feat: app scheduler extended to index existing connections at night
#   stat    2 files changed, 107 insertions(+), 1 deletion(-)
#   file    modules/interfaces/interfaceDbApp.py (index 04ae82ff..e7c7e6be), hunk @@ -1281,6 +1281,28 @@
#
# The function below is a method of class AppObjects. In the target file it sits
# between getUserConnections' error handler and getUserConnectionById; it is shown
# dedented here because the enclosing class body is not part of the patch.

def getActiveKnowledgeConnections(self) -> List[UserConnection]:
    """Fetch every UserConnection that is active and has knowledge ingestion enabled.

    The daily re-sync scheduler calls this to decide which connections to
    re-index. The query goes through ``self.db.getRecordset`` filtered on
    ``knowledgeIngestionEnabled=True`` and ``status=ConnectionStatus.ACTIVE.value``.

    Rows that arrive as dicts are validated into ``UserConnection`` models;
    rows of any other type are passed through unchanged. A row that fails
    validation is logged as a warning and left out of the result.

    Returns:
        The matching connections. An empty list is returned when nothing
        matches or when the query itself fails (the failure is logged as an
        error instead of being raised).
    """
    try:
        activeFilter = {
            "knowledgeIngestionEnabled": True,
            "status": ConnectionStatus.ACTIVE.value,
        }
        records = self.db.getRecordset(UserConnection, recordFilter=activeFilter)

        connections: List[UserConnection] = []
        for record in records or []:
            try:
                # Only raw dict rows need parsing; anything else is taken as-is.
                if isinstance(record, dict):
                    record = UserConnection.model_validate(record)
                connections.append(record)
            except Exception as _e:
                logger.warning(f"getActiveKnowledgeConnections: could not parse row: {_e}")
        return connections
    except Exception as e:
        # Best-effort contract: callers get an empty list, never an exception.
        logger.error(f"getActiveKnowledgeConnections failed: {e}")
        return []
# Provenance (reconstructed from a whitespace-collapsed git patch):
#   file  modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py
#         (index e27e2d29..97ac61d5), hunks @@ -238,6 +238,89 @@ and @@ -246,5 +329,6 @@
# The two functions below are inserted after _bootstrapJobHandler and before
# registerKnowledgeIngestionConsumer.


async def _scheduledDailyResync() -> None:
    """Enqueue one bootstrap job per active knowledge connection.

    Loads the connections via ``getRootInterface().getActiveKnowledgeConnections()``
    and calls ``startJob(BOOTSTRAP_JOB_TYPE, payload, triggeredBy="scheduler.daily_resync")``
    for each of them. The payload carries ``connectionId``, the lower-cased
    ``authority`` and ``userId``.

    Failures are isolated per connection: a connection whose attributes cannot
    be read, or whose job cannot be enqueued, is counted as skipped and logged,
    and the loop continues with the next one. If the connection list itself
    cannot be loaded the function logs the error and returns without raising.

    NOTE(review): per the original author's note, the bootstrap job re-walks
    the connector and unchanged items are skipped via content-hash
    deduplication. That logic lives in the job handler, which is not visible
    here, so confirm before relying on it.
    """
    try:
        # Imported lazily, as in the original (presumably to avoid an import
        # cycle at module load time; confirm).
        from modules.interfaces.interfaceDbApp import getRootInterface
        connections = getRootInterface().getActiveKnowledgeConnections()
    except Exception as exc:
        logger.error("knowledge.daily_resync: could not load connections: %s", exc, exc_info=True)
        return

    if not connections:
        logger.info("knowledge.daily_resync: no active knowledge connections — nothing to do")
        return

    logger.info(
        "knowledge.daily_resync: enqueuing bootstrap for %d connection(s)",
        len(connections),
        extra={"event": "knowledge.daily_resync.started", "count": len(connections)},
    )

    enqueued = 0
    skipped = 0
    for conn in connections:
        # Placeholder so the failure log can still be emitted when even the id
        # of a malformed connection object cannot be read.
        connectionId = "<unknown>"
        try:
            # Attribute access happens inside the try on purpose: one bad
            # connection object must not abort the re-sync of all the others.
            connectionId = str(conn.id)
            rawAuthority = conn.authority
            # authority may be an enum member or a plain string.
            authority = rawAuthority.value if hasattr(rawAuthority, "value") else str(rawAuthority)
            payload: Dict[str, Any] = {
                "connectionId": connectionId,
                "authority": authority.lower(),
                "userId": str(conn.userId),
            }
            await startJob(
                BOOTSTRAP_JOB_TYPE,
                payload,
                triggeredBy="scheduler.daily_resync",
            )
        except Exception as exc:
            skipped += 1
            logger.error(
                "knowledge.daily_resync: failed to enqueue connectionId=%s: %s",
                connectionId, exc,
                exc_info=True,
            )
            continue

        enqueued += 1
        logger.debug(
            "knowledge.daily_resync: queued connectionId=%s authority=%s",
            connectionId, authority,
        )

    logger.info(
        "knowledge.daily_resync: done — enqueued=%d skipped=%d",
        enqueued, skipped,
        extra={"event": "knowledge.daily_resync.done", "enqueued": enqueued, "skipped": skipped},
    )


def registerDailyResyncScheduler(*, hour: int = 2, minute: int = 0) -> None:
    """Register the daily knowledge re-sync cron job with the event manager.

    Registers ``_scheduledDailyResync`` under the job id
    ``"knowledge.daily_resync"`` via ``eventManager.registerCron``. Registration
    is best-effort: any exception (including a failed import of the event
    manager) is logged as a warning and swallowed.

    Args:
        hour: Hour of day to run (0-23, default 2).
        minute: Minute within the hour (default 0).

    NOTE(review): no timezone is passed in ``cronKwargs``; the "Europe/Zurich"
    wording in the log message presumably reflects the event manager's default
    timezone, so confirm. Whether a repeated call replaces or duplicates the job
    depends on ``registerCron``, which is not visible here.
    """
    try:
        from modules.shared.eventManagement import eventManager
        # The cron fields are handed over as strings, as in the original call.
        cronFields = {"hour": str(hour), "minute": str(minute)}
        eventManager.registerCron(
            jobId="knowledge.daily_resync",
            func=_scheduledDailyResync,
            cronKwargs=cronFields,
        )
        logger.info(
            "knowledge.daily_resync scheduler registered (daily %02d:%02d Europe/Zurich)",
            hour, minute,
        )
    except Exception as exc:
        logger.warning("knowledge.daily_resync scheduler registration failed (non-critical): %s", exc)


# Remaining patch hunk for registerKnowledgeIngestionConsumer, retained verbatim.
# The function is only partially visible in the patch (old-file lines 244-245
# fall between the two hunks), so it is not rewritten here.
#
#    def registerKnowledgeIngestionConsumer() -> None:
#        """Register callback subscribers + background job handler. Idempotent."""
#        global _registered
#   @@ -246,5 +329,6 @@ def registerKnowledgeIngestionConsumer() -> None:
#        callbackRegistry.register("connection.established", _onConnectionEstablished)
#        callbackRegistry.register("connection.revoked", _onConnectionRevoked)
#        registerJobHandler(BOOTSTRAP_JOB_TYPE, _bootstrapJobHandler)
#   +    registerDailyResyncScheduler()
#        _registered = True
#   -    logger.info("KnowledgeIngestionConsumer registered (established/revoked + %s handler)", BOOTSTRAP_JOB_TYPE)
#   +    logger.info("KnowledgeIngestionConsumer registered (established/revoked + %s handler + daily resync)", BOOTSTRAP_JOB_TYPE)