# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Connection-lifecycle consumer bridging OAuth events to ingestion jobs.

Subscribes to `connection.established` and `connection.revoked` callbacks emitted
by the OAuth callbacks / connection management routes and dispatches:

- `connection.established` -> enqueue a `connection.bootstrap` BackgroundJob that
  walks the connector and ingests all reachable items via
  KnowledgeService.requestIngestion (file-like or virtual documents).
- `connection.revoked` -> run `KnowledgeService.purgeConnection` synchronously so
  the knowledge corpus releases the data before the UI confirms the revoke.

The consumer is registered once at process boot (see `app.py` lifespan). It
intentionally does NOT hold a per-user service context; each callback creates
whatever context it needs from the UserConnection row itself.
"""

from __future__ import annotations

import asyncio
import logging
from typing import Any, Callable, Dict, Optional, Set

from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface
from modules.shared.callbackRegistry import callbackRegistry
from modules.serviceCenter.services.serviceBackgroundJobs import (
    registerJobHandler,
    startJob,
)

logger = logging.getLogger(__name__)

BOOTSTRAP_JOB_TYPE = "connection.bootstrap"

# Guards registerKnowledgeIngestionConsumer() against double registration.
_registered = False

# Strong references to in-flight enqueue tasks. The event loop itself only keeps
# weak references to tasks, so a task nobody else references may be garbage
# collected before it finishes; entries are removed by a done-callback.
_pendingEnqueueTasks: Set[asyncio.Task[None]] = set()


def _onConnectionEstablished(
    *,
    connectionId: str,
    authority: str,
    userId: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Fire-and-forget bootstrap enqueue for a freshly connected UserConnection.

    Args:
        connectionId: Id of the connection that was established. A falsy value
            is logged as a warning and the event is ignored.
        authority: Authority name; lower-cased before it is put into the job
            payload (the log line keeps the value as received).
        userId: Optional user id, stored in the payload and passed to
            ``startJob`` as ``triggeredBy``.
        **kwargs: Additional callback arguments; accepted and ignored.

    Failures of ``startJob`` are logged and never propagate to the caller.
    """
    if not connectionId:
        logger.warning("connection.established without connectionId; ignoring")
        return

    payload: Dict[str, Any] = {
        "connectionId": connectionId,
        "authority": (authority or "").lower(),
        "userId": userId,
    }
    logger.info(
        "ingestion.connection.bootstrap.queued connectionId=%s authority=%s",
        connectionId,
        authority,
        extra={
            "event": "ingestion.connection.bootstrap.queued",
            "connectionId": connectionId,
            "authority": authority,
        },
    )

    async def _enqueue() -> None:
        # Best-effort: an enqueue failure must not break the emitting route.
        try:
            await startJob(
                BOOTSTRAP_JOB_TYPE,
                payload,
                triggeredBy=userId,
            )
        except Exception as exc:
            logger.error(
                "ingestion.connection.bootstrap.enqueue_failed connectionId=%s error=%s",
                connectionId,
                exc,
                exc_info=True,
            )

    # get_running_loop() replaces the deprecated "get_event_loop() without a
    # running loop" pattern and never creates a loop implicitly.
    try:
        runningLoop: Optional[asyncio.AbstractEventLoop] = asyncio.get_running_loop()
    except RuntimeError:
        runningLoop = None

    if runningLoop is None:
        # Synchronous caller (no loop running in this thread): drive the
        # enqueue to completion on a temporary loop.
        asyncio.run(_enqueue())
        return

    # Called from inside the event loop: schedule and return immediately, but
    # keep a strong reference so the task cannot be collected mid-flight.
    task = runningLoop.create_task(_enqueue())
    _pendingEnqueueTasks.add(task)
    task.add_done_callback(_pendingEnqueueTasks.discard)


def _onConnectionRevoked(
    *,
    connectionId: str,
    authority: Optional[str] = None,
    userId: Optional[str] = None,
    reason: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Run the knowledge purge synchronously so UI feedback is authoritative.

    Args:
        connectionId: Id of the revoked connection. A falsy value is logged as
            a warning and the event is ignored.
        authority: Optional authority name; only used for logging.
        userId: Optional user id; accepted but not used here.
        reason: Optional revoke reason; only used for logging.
        **kwargs: Additional callback arguments; accepted and ignored.

    A failing purge is logged and swallowed; on success the ``indexRows`` and
    ``chunks`` counters of the purge result are logged.
    """
    if not connectionId:
        logger.warning("connection.revoked without connectionId; ignoring")
        return

    try:
        # Purge lives on the DB interface to avoid ServiceCenter/user-context
        # plumbing here; the service method is a thin wrapper on top of this.
        result = getKnowledgeInterface(None).deleteFileContentIndexByConnectionId(connectionId)
    except Exception as exc:
        logger.error(
            "ingestion.connection.purged.failed connectionId=%s error=%s",
            connectionId,
            exc,
            exc_info=True,
        )
        return

    # NOTE(review): the interface's return contract is not visible from this
    # module; tolerate an empty/None result instead of raising AttributeError
    # in the callback after the purge already succeeded.
    result = result or {}
    indexRows = result.get("indexRows", 0)
    chunks = result.get("chunks", 0)

    logger.info(
        "ingestion.connection.purged connectionId=%s authority=%s reason=%s rows=%d chunks=%d",
        connectionId,
        authority,
        reason,
        indexRows,
        chunks,
        extra={
            "event": "ingestion.connection.purged",
            "connectionId": connectionId,
            "authority": authority,
            "reason": reason,
            "indexRows": indexRows,
            "chunks": chunks,
        },
    )


async def _bootstrapJobHandler(
    job: Dict[str, Any],
    progressCb: Callable[[int, str], Any],
) -> Dict[str, Any]:
    """Dispatch bootstrap by authority.

    Each authority runs its own sub-bootstraps.

    Args:
        job: Background job record; ``job["payload"]`` must carry
            ``connectionId`` and may carry ``authority``.
        progressCb: Progress callback, invoked here as
            ``progressCb(percent, message)`` and handed on to the sub-bootstraps.

    Returns:
        For authority ``"msft"``: a dict with ``connectionId``, ``authority``
        and one entry each for ``sharepoint`` and ``outlook`` (the
        sub-bootstrap result, or ``{"error": "<message>"}`` if that part
        failed). For every other authority: a dict marking the job as skipped
        with reason ``"P1_pilot_scope"``.

    Raises:
        ValueError: If the payload has no ``connectionId``.
    """
    payload = job.get("payload") or {}
    connectionId = payload.get("connectionId")
    authority = (payload.get("authority") or "").lower()
    if not connectionId:
        raise ValueError("connection.bootstrap requires payload.connectionId")

    progressCb(5, f"resolving {authority} connection")

    if authority == "msft":
        # Imported lazily, only when an msft bootstrap actually runs.
        from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncSharepoint import (
            bootstrapSharepoint,
        )
        from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncOutlook import (
            bootstrapOutlook,
        )

        def _normalize(res: Any, label: str) -> Dict[str, Any]:
            # BaseException (not Exception): gather(return_exceptions=True) can
            # hand back asyncio.CancelledError, which is a BaseException since
            # Python 3.8 and must not leak into the job result as an object.
            if isinstance(res, BaseException):
                logger.error(
                    "ingestion.connection.bootstrap.failed part=%s connectionId=%s error=%s",
                    label,
                    connectionId,
                    res,
                    exc_info=res,
                )
                return {"error": str(res)}
            return res or {}

        progressCb(10, "sharepoint + outlook")
        # Run both parts concurrently; one failing part must not abort the other.
        spResult, olResult = await asyncio.gather(
            bootstrapSharepoint(connectionId=connectionId, progressCb=progressCb),
            bootstrapOutlook(connectionId=connectionId, progressCb=progressCb),
            return_exceptions=True,
        )

        return {
            "connectionId": connectionId,
            "authority": authority,
            "sharepoint": _normalize(spResult, "sharepoint"),
            "outlook": _normalize(olResult, "outlook"),
        }

    logger.info(
        "ingestion.connection.bootstrap.skipped reason=P1_pilot_scope authority=%s connectionId=%s",
        authority,
        connectionId,
        extra={
            "event": "ingestion.connection.bootstrap.skipped",
            "authority": authority,
            "connectionId": connectionId,
            "reason": "P1_pilot_scope",
        },
    )
    return {
        "connectionId": connectionId,
        "authority": authority,
        "skipped": True,
        "reason": "P1_pilot_scope",
    }


def registerKnowledgeIngestionConsumer() -> None:
    """Register callback subscribers + background job handler.

    Idempotent: the module-level ``_registered`` flag turns repeated calls
    into no-ops.
    """
    global _registered
    if _registered:
        return

    callbackRegistry.register("connection.established", _onConnectionEstablished)
    callbackRegistry.register("connection.revoked", _onConnectionRevoked)
    registerJobHandler(BOOTSTRAP_JOB_TYPE, _bootstrapJobHandler)
    _registered = True
    logger.info(
        "KnowledgeIngestionConsumer registered (established/revoked + %s handler)",
        BOOTSTRAP_JOB_TYPE,
    )