gateway/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py

250 lines
9 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Connection-lifecycle consumer bridging OAuth events to ingestion jobs.
Subscribes to `connection.established` and `connection.revoked` callbacks
emitted by the OAuth callbacks / connection management routes and dispatches:
- `connection.established` -> enqueue a `connection.bootstrap` BackgroundJob
that walks the connector and ingests all reachable items via
KnowledgeService.requestIngestion (file-like or virtual documents).
- `connection.revoked` -> purge the connection's knowledge index synchronously (directly via the knowledge DB interface, which `KnowledgeService.purgeConnection` wraps)
so the knowledge corpus releases the data before the UI confirms the revoke.
The consumer is registered once at process boot (see `app.py` lifespan).
It intentionally does NOT hold a per-user service context; each callback
creates whatever context it needs from the UserConnection row itself.
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any, Dict, Optional
from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface
from modules.shared.callbackRegistry import callbackRegistry
from modules.serviceCenter.services.serviceBackgroundJobs import (
registerJobHandler,
startJob,
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Job type used both when enqueueing via startJob and when registering the
# bootstrap handler via registerJobHandler.
BOOTSTRAP_JOB_TYPE = "connection.bootstrap"
# Set to True by registerKnowledgeIngestionConsumer once registration has run,
# so that repeated calls become no-ops.
_registered = False
# Strong references to in-flight enqueue tasks. The event loop only keeps weak
# references to tasks, so an otherwise unreferenced fire-and-forget task may be
# garbage collected before it has finished.
_pendingEnqueueTasks: set = set()


def _onConnectionEstablished(
    *,
    connectionId: str,
    authority: str,
    userId: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Enqueue a bootstrap job for a freshly connected UserConnection.

    When called from a thread with a running event loop the enqueue is
    scheduled as a task on that loop and this function returns immediately
    (fire-and-forget). When no loop is running, the enqueue is executed to
    completion on a temporary loop via `asyncio.run` before returning.

    Args:
        connectionId: Id of the connection; a falsy value is logged and ignored.
        authority: Authority name; stored lower-cased in the job payload.
        userId: Optional user id; stored in the payload and passed to
            `startJob` as `triggeredBy`.
        **kwargs: Additional callback arguments; accepted and ignored.

    Enqueue failures are logged and never raised to the caller.
    """
    if not connectionId:
        logger.warning("connection.established without connectionId; ignoring")
        return

    payload: Dict[str, Any] = {
        "connectionId": connectionId,
        "authority": (authority or "").lower(),
        "userId": userId,
    }
    logger.info(
        "ingestion.connection.bootstrap.queued connectionId=%s authority=%s",
        connectionId, authority,
        extra={
            "event": "ingestion.connection.bootstrap.queued",
            "connectionId": connectionId,
            "authority": authority,
        },
    )

    async def _enqueue() -> None:
        try:
            await startJob(
                BOOTSTRAP_JOB_TYPE,
                payload,
                triggeredBy=userId,
            )
        except Exception as exc:
            logger.error(
                "ingestion.connection.bootstrap.enqueue_failed connectionId=%s error=%s",
                connectionId, exc, exc_info=True,
            )

    # Only the loop lookup is guarded: a RuntimeError raised while scheduling
    # must not trigger a second enqueue attempt.
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread (synchronous caller).
        asyncio.run(_enqueue())
        return

    task = loop.create_task(_enqueue())
    _pendingEnqueueTasks.add(task)
    task.add_done_callback(_pendingEnqueueTasks.discard)
def _onConnectionRevoked(
    *,
    connectionId: str,
    authority: Optional[str] = None,
    userId: Optional[str] = None,
    reason: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Purge the knowledge index of a revoked connection, synchronously.

    A falsy `connectionId` is logged and ignored. A failing purge is logged
    with its traceback and swallowed; a successful purge is logged together
    with the `indexRows` / `chunks` counters reported by the DB interface.
    `userId` and any extra keyword arguments are accepted but not used.
    """
    if not connectionId:
        logger.warning("connection.revoked without connectionId; ignoring")
        return

    try:
        # The purge is called on the DB interface directly so that no
        # ServiceCenter / user context has to be built here; the service
        # method is a thin wrapper around this same call.
        knowledgeDb = getKnowledgeInterface(None)
        result = knowledgeDb.deleteFileContentIndexByConnectionId(connectionId)
    except Exception as exc:
        logger.error(
            "ingestion.connection.purged.failed connectionId=%s error=%s",
            connectionId, exc, exc_info=True,
        )
    else:
        purgedRows = result.get("indexRows", 0)
        purgedChunks = result.get("chunks", 0)
        logFields = {
            "event": "ingestion.connection.purged",
            "connectionId": connectionId,
            "authority": authority,
            "reason": reason,
            "indexRows": purgedRows,
            "chunks": purgedChunks,
        }
        logger.info(
            "ingestion.connection.purged connectionId=%s authority=%s reason=%s rows=%d chunks=%d",
            connectionId, authority, reason, purgedRows, purgedChunks,
            extra=logFields,
        )
async def _bootstrapJobHandler(
    job: Dict[str, Any],
    progressCb,
) -> Dict[str, Any]:
    """Dispatch a bootstrap job to the walkers of its authority.

    Args:
        job: Background job record. `job["payload"]` must contain
            `connectionId` and may contain `authority` (lower-cased here).
        progressCb: Called as `progressCb(percent, message)` and forwarded to
            every walker.

    Returns:
        A dict with `connectionId` and `authority`, plus either one entry per
        walker (its result, or `{"error": ...}` when a gathered walker raised)
        or `skipped` / `reason` when no walker was run.

    Raises:
        ValueError: If the payload carries no `connectionId`.
    """
    payload = job.get("payload") or {}
    connectionId = payload.get("connectionId")
    authority = (payload.get("authority") or "").lower()
    if not connectionId:
        raise ValueError("connection.bootstrap requires payload.connectionId")
    progressCb(5, f"resolving {authority} connection")

    # Defensive consent check: if the connection has since disabled knowledge
    # ingestion (e.g. user toggled the setting after the job was enqueued),
    # skip all walkers.
    try:
        from modules.interfaces.interfaceDbApp import getRootInterface
        _root = getRootInterface()
        _conn = _root.getUserConnectionById(connectionId)
        if _conn and not getattr(_conn, "knowledgeIngestionEnabled", True):
            logger.info(
                "ingestion.connection.bootstrap.skipped — consent disabled connectionId=%s",
                connectionId,
                extra={
                    "event": "ingestion.connection.bootstrap.skipped",
                    "connectionId": connectionId,
                    "authority": authority,
                    "reason": "consent_disabled",
                },
            )
            return {"connectionId": connectionId, "authority": authority, "skipped": True, "reason": "consent_disabled"}
    except Exception as _guardErr:
        # The guard stays best-effort (ingestion proceeds), but a consent check
        # that could not be evaluated must be visible in the logs.
        logger.warning(
            "Could not load connection for consent guard: %s", _guardErr, exc_info=True,
        )

    def _normalize(res: Any, label: str) -> Dict[str, Any]:
        # gather(return_exceptions=True) can also hand back BaseException
        # subclasses such as asyncio.CancelledError, so test for BaseException
        # rather than Exception; otherwise the exception object itself would
        # be returned as the part's "result".
        if isinstance(res, BaseException):
            logger.error(
                "ingestion.connection.bootstrap.failed part=%s connectionId=%s error=%s",
                label, connectionId, res, exc_info=res,
            )
            # Some exceptions (e.g. TimeoutError()) have an empty message;
            # fall back to the type name so the error entry is never blank.
            return {"error": str(res) or type(res).__name__}
        return res or {}

    if authority == "msft":
        from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncSharepoint import (
            bootstrapSharepoint,
        )
        from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncOutlook import (
            bootstrapOutlook,
        )
        progressCb(10, "sharepoint + outlook")
        spResult, olResult = await asyncio.gather(
            bootstrapSharepoint(connectionId=connectionId, progressCb=progressCb),
            bootstrapOutlook(connectionId=connectionId, progressCb=progressCb),
            return_exceptions=True,
        )
        return {
            "connectionId": connectionId,
            "authority": authority,
            "sharepoint": _normalize(spResult, "sharepoint"),
            "outlook": _normalize(olResult, "outlook"),
        }

    if authority == "google":
        from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncGdrive import (
            bootstrapGdrive,
        )
        from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncGmail import (
            bootstrapGmail,
        )
        progressCb(10, "drive + gmail")
        gdResult, gmResult = await asyncio.gather(
            bootstrapGdrive(connectionId=connectionId, progressCb=progressCb),
            bootstrapGmail(connectionId=connectionId, progressCb=progressCb),
            return_exceptions=True,
        )
        return {
            "connectionId": connectionId,
            "authority": authority,
            "drive": _normalize(gdResult, "gdrive"),
            "gmail": _normalize(gmResult, "gmail"),
        }

    if authority == "clickup":
        from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncClickup import (
            bootstrapClickup,
        )
        progressCb(10, "clickup tasks")
        # NOTE(review): unlike the gathered branches above, an exception raised
        # here propagates to the caller instead of being folded into the
        # result; _normalize only guards against a falsy return value.
        cuResult = await bootstrapClickup(connectionId=connectionId, progressCb=progressCb)
        return {
            "connectionId": connectionId,
            "authority": authority,
            "clickup": _normalize(cuResult, "clickup"),
        }

    logger.info(
        "ingestion.connection.bootstrap.skipped reason=unsupported_authority authority=%s connectionId=%s",
        authority, connectionId,
        extra={
            "event": "ingestion.connection.bootstrap.skipped",
            "authority": authority,
            "connectionId": connectionId,
            "reason": "unsupported_authority",
        },
    )
    return {
        "connectionId": connectionId,
        "authority": authority,
        "skipped": True,
        "reason": "unsupported_authority",
    }
def registerKnowledgeIngestionConsumer() -> None:
    """Wire up the lifecycle callbacks and the bootstrap job handler.

    The first call subscribes to `connection.established` and
    `connection.revoked`, registers the handler for `BOOTSTRAP_JOB_TYPE`, and
    sets the module-level `_registered` flag. Later calls see the flag and do
    nothing.
    """
    global _registered
    if not _registered:
        subscriptions = (
            ("connection.established", _onConnectionEstablished),
            ("connection.revoked", _onConnectionRevoked),
        )
        for eventName, subscriber in subscriptions:
            callbackRegistry.register(eventName, subscriber)
        registerJobHandler(BOOTSTRAP_JOB_TYPE, _bootstrapJobHandler)
        _registered = True
        logger.info("KnowledgeIngestionConsumer registered (established/revoked + %s handler)", BOOTSTRAP_JOB_TYPE)