# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
|
|
"""Connection-lifecycle consumer bridging OAuth events to ingestion jobs.

Subscribes to the `connection.established` and `connection.revoked` events
emitted by the OAuth callbacks / connection management routes and dispatches:

- `connection.established` -> enqueue a `connection.bootstrap` BackgroundJob
  that walks the connector and ingests all reachable items via
  KnowledgeService.requestIngestion (file-like or virtual documents).
- `connection.revoked` -> run the knowledge purge synchronously (the DB-level
  delete that `KnowledgeService.purgeConnection` wraps) so the knowledge
  corpus releases the data before the UI confirms the revoke.

The consumer is registered once at process boot (see `app.py` lifespan).
It intentionally does NOT hold a per-user service context; each callback
creates whatever context it needs from the UserConnection row itself.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Any, Dict, Optional
|
|
|
|
from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface
|
|
from modules.shared.callbackRegistry import callbackRegistry
|
|
from modules.serviceCenter.services.serviceBackgroundJobs import (
|
|
registerJobHandler,
|
|
startJob,
|
|
)
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Background-job type enqueued by `_onConnectionEstablished` and served by
# `_bootstrapJobHandler` (wired up in `registerKnowledgeIngestionConsumer`).
BOOTSTRAP_JOB_TYPE = "connection.bootstrap"

# Flipped to True by `registerKnowledgeIngestionConsumer` once the callbacks
# and the job handler are registered, so repeated calls become no-ops.
_registered = False
|
|
|
|
|
|
# Strong references to in-flight enqueue tasks. The event loop only keeps weak
# references to tasks, so a fire-and-forget task that nothing else holds may be
# garbage-collected before it finishes. Entries remove themselves when done.
_pendingEnqueueTasks = set()


def _onConnectionEstablished(
    *,
    connectionId: str,
    authority: str,
    userId: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Enqueue a `connection.bootstrap` job for a freshly connected UserConnection.

    Args:
        connectionId: Identifier of the connection; the event is ignored
            (with a warning) when it is empty.
        authority: Provider key of the connection. It is lower-cased before
            being stored in the job payload; `None`/empty becomes `""`.
        userId: Optional user identifier; stored in the payload and passed to
            `startJob` as `triggeredBy`.
        **kwargs: Additional event fields; accepted and ignored.

    The enqueue is best-effort: a failure inside `startJob` is logged and
    never raised to the caller.

    Scheduling:
        * If an event loop is running in the calling thread, the enqueue is
          scheduled on it as a task and this function returns immediately.
        * Otherwise the enqueue is executed to completion on a temporary loop
          via `asyncio.run` before returning.
    """
    if not connectionId:
        logger.warning("connection.established without connectionId; ignoring")
        return

    payload: Dict[str, Any] = {
        "connectionId": connectionId,
        "authority": (authority or "").lower(),
        "userId": userId,
    }
    logger.info(
        "ingestion.connection.bootstrap.queued connectionId=%s authority=%s",
        connectionId, authority,
        extra={
            "event": "ingestion.connection.bootstrap.queued",
            "connectionId": connectionId,
            "authority": authority,
        },
    )

    async def _enqueue() -> None:
        # Swallow-and-log: the caller of this callback must not fail because
        # the bootstrap job could not be queued.
        try:
            await startJob(
                BOOTSTRAP_JOB_TYPE,
                payload,
                triggeredBy=userId,
            )
        except Exception as exc:
            logger.error(
                "ingestion.connection.bootstrap.enqueue_failed connectionId=%s error=%s",
                connectionId, exc, exc_info=True,
            )

    try:
        # get_running_loop() raises RuntimeError when no loop is running in
        # this thread; unlike the deprecated get_event_loop() it never creates
        # a loop implicitly.
        loop = asyncio.get_running_loop()
    except RuntimeError:
        asyncio.run(_enqueue())
        return

    task = loop.create_task(_enqueue())
    # Hold a reference until the task completes so it cannot be collected
    # mid-flight; the done-callback drops it again.
    _pendingEnqueueTasks.add(task)
    task.add_done_callback(_pendingEnqueueTasks.discard)
|
|
|
|
|
|
def _onConnectionRevoked(
    *,
    connectionId: str,
    authority: Optional[str] = None,
    userId: Optional[str] = None,
    reason: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Purge the knowledge index of a revoked connection, inline.

    The purge runs synchronously in the caller, so it has finished (or failed
    and been logged) by the time this callback returns.

    Args:
        connectionId: Identifier of the revoked connection; the event is
            ignored (with a warning) when it is empty.
        authority: Optional provider key; only used for logging.
        userId: Accepted for signature compatibility; not used here.
        reason: Optional revoke reason; only used for logging.
        **kwargs: Additional event fields; accepted and ignored.

    Errors raised by the purge are logged and swallowed, never propagated.
    """
    if not connectionId:
        logger.warning("connection.revoked without connectionId; ignoring")
        return

    try:
        # Purge lives on the DB interface to avoid ServiceCenter/user-context
        # plumbing here; the service method is a thin wrapper on top of this.
        result = getKnowledgeInterface(None).deleteFileContentIndexByConnectionId(connectionId)
    except Exception as exc:
        logger.error(
            "ingestion.connection.purged.failed connectionId=%s error=%s",
            connectionId, exc, exc_info=True,
        )
        return

    # The purge already succeeded at this point; a missing/empty result must
    # not turn the success log into an exception raised at the dispatcher.
    stats = result or {}
    indexRows = stats.get("indexRows") or 0
    chunks = stats.get("chunks") or 0

    logger.info(
        "ingestion.connection.purged connectionId=%s authority=%s reason=%s rows=%d chunks=%d",
        connectionId, authority, reason,
        indexRows, chunks,
        extra={
            "event": "ingestion.connection.purged",
            "connectionId": connectionId,
            "authority": authority,
            "reason": reason,
            "indexRows": indexRows,
            "chunks": chunks,
        },
    )
|
|
|
|
|
|
async def _bootstrapJobHandler(
    job: Dict[str, Any],
    progressCb,
) -> Dict[str, Any]:
    """Run the `connection.bootstrap` job by dispatching on the payload authority.

    Args:
        job: Job record; `job["payload"]` must contain `connectionId` and may
            contain `authority` (compared case-insensitively).
        progressCb: Callable invoked as `progressCb(percent, message)`; it is
            also handed to every sub-bootstrap.

    Returns:
        A dict with `connectionId` and `authority` plus one entry per
        sub-bootstrap:
          * `msft`    -> `sharepoint`, `outlook` (run concurrently)
          * `google`  -> `drive`, `gmail` (run concurrently)
          * `clickup` -> `clickup`
        A sub-bootstrap that failed inside a concurrent branch is reported as
        `{"error": "<message>"}` instead of failing the whole job. Any other
        authority yields `{"skipped": True, "reason": "unsupported_authority"}`.

    Raises:
        ValueError: If the payload carries no `connectionId`.
    """
    payload = job.get("payload") or {}
    connectionId = payload.get("connectionId")
    authority = (payload.get("authority") or "").lower()
    if not connectionId:
        raise ValueError("connection.bootstrap requires payload.connectionId")

    progressCb(5, f"resolving {authority} connection")

    def _normalize(res: Any, label: str) -> Dict[str, Any]:
        # gather(return_exceptions=True) can hand back asyncio.CancelledError,
        # which derives from BaseException (not Exception) since Python 3.8.
        # Checking BaseException keeps exception objects out of the job result.
        if isinstance(res, BaseException):
            logger.error(
                "ingestion.connection.bootstrap.failed part=%s connectionId=%s error=%s",
                label, connectionId, res, exc_info=res,
            )
            # Some exceptions stringify to ""; fall back to the class name so
            # the error entry is never empty.
            return {"error": str(res) or type(res).__name__}
        return res or {}

    # Connector modules are imported inside their branch, so only the modules
    # of the requested authority are loaded.
    if authority == "msft":
        from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncSharepoint import (
            bootstrapSharepoint,
        )
        from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncOutlook import (
            bootstrapOutlook,
        )

        progressCb(10, "sharepoint + outlook")
        spResult, olResult = await asyncio.gather(
            bootstrapSharepoint(connectionId=connectionId, progressCb=progressCb),
            bootstrapOutlook(connectionId=connectionId, progressCb=progressCb),
            return_exceptions=True,
        )
        return {
            "connectionId": connectionId,
            "authority": authority,
            "sharepoint": _normalize(spResult, "sharepoint"),
            "outlook": _normalize(olResult, "outlook"),
        }

    if authority == "google":
        from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncGdrive import (
            bootstrapGdrive,
        )
        from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncGmail import (
            bootstrapGmail,
        )

        progressCb(10, "drive + gmail")
        gdResult, gmResult = await asyncio.gather(
            bootstrapGdrive(connectionId=connectionId, progressCb=progressCb),
            bootstrapGmail(connectionId=connectionId, progressCb=progressCb),
            return_exceptions=True,
        )
        return {
            "connectionId": connectionId,
            "authority": authority,
            "drive": _normalize(gdResult, "gdrive"),
            "gmail": _normalize(gmResult, "gmail"),
        }

    if authority == "clickup":
        from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncClickup import (
            bootstrapClickup,
        )

        progressCb(10, "clickup tasks")
        # NOTE(review): unlike the gathered branches above, an exception raised
        # here propagates out of the handler instead of being folded into an
        # {"error": ...} entry - confirm this difference is intended.
        cuResult = await bootstrapClickup(connectionId=connectionId, progressCb=progressCb)
        return {
            "connectionId": connectionId,
            "authority": authority,
            "clickup": _normalize(cuResult, "clickup"),
        }

    logger.info(
        "ingestion.connection.bootstrap.skipped reason=unsupported_authority authority=%s connectionId=%s",
        authority, connectionId,
        extra={
            "event": "ingestion.connection.bootstrap.skipped",
            "authority": authority,
            "connectionId": connectionId,
            "reason": "unsupported_authority",
        },
    )
    return {
        "connectionId": connectionId,
        "authority": authority,
        "skipped": True,
        "reason": "unsupported_authority",
    }
|
|
|
|
|
|
def registerKnowledgeIngestionConsumer() -> None:
    """Wire up the connection callbacks and the bootstrap job handler once.

    The first call subscribes `_onConnectionEstablished` and
    `_onConnectionRevoked` to their events, registers `_bootstrapJobHandler`
    for `BOOTSTRAP_JOB_TYPE`, and sets the module-level `_registered` flag.
    Every later call sees the flag and does nothing.
    """
    global _registered
    if not _registered:
        subscriptions = (
            ("connection.established", _onConnectionEstablished),
            ("connection.revoked", _onConnectionRevoked),
        )
        for eventName, subscriber in subscriptions:
            callbackRegistry.register(eventName, subscriber)
        registerJobHandler(BOOTSTRAP_JOB_TYPE, _bootstrapJobHandler)
        # Only mark as registered after every registration call has returned.
        _registered = True
        logger.info("KnowledgeIngestionConsumer registered (established/revoked + %s handler)", BOOTSTRAP_JOB_TYPE)
|