334 lines
12 KiB
Python
334 lines
12 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
||
# All rights reserved.
|
||
"""Connection-lifecycle consumer bridging OAuth events to ingestion jobs.
|
||
|
||
Subscribes to `connection.established` and `connection.revoked` callbacks
|
||
emitted by the OAuth callbacks / connection management routes and dispatches:
|
||
|
||
- `connection.established` -> enqueue a `connection.bootstrap` BackgroundJob
|
||
that walks the connector and ingests all reachable items via
|
||
KnowledgeService.requestIngestion (file-like or virtual documents).
|
||
- `connection.revoked` -> run `KnowledgeService.purgeConnection` synchronously
|
||
so the knowledge corpus releases the data before the UI confirms the revoke.
|
||
|
||
The consumer is registered once at process boot (see `app.py` lifespan).
|
||
It intentionally does NOT hold a per-user service context; each callback
|
||
creates whatever context it needs from the UserConnection row itself.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import logging
|
||
from typing import Any, Dict, Optional
|
||
|
||
from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface
|
||
from modules.shared.callbackRegistry import callbackRegistry
|
||
from modules.serviceCenter.services.serviceBackgroundJobs import (
|
||
registerJobHandler,
|
||
startJob,
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Job type used both when enqueuing bootstrap work via startJob() and when
# registering _bootstrapJobHandler via registerJobHandler().
BOOTSTRAP_JOB_TYPE = "connection.bootstrap"
|
||
|
||
# Flipped to True by registerKnowledgeIngestionConsumer() after its first
# successful run so that later calls return without registering again.
_registered = False
|
||
|
||
|
||
# Strong references to in-flight enqueue tasks. The event loop itself only
# keeps weak references to tasks, so a fire-and-forget task that nobody else
# references may be garbage-collected before it has finished.
_pendingEnqueueTasks = set()


def _onConnectionEstablished(
    *,
    connectionId: str,
    authority: str,
    userId: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Enqueue a `connection.bootstrap` job for a freshly connected UserConnection.

    The enqueue is fire-and-forget: any exception raised by `startJob` is
    logged and never propagates to the caller.

    Args:
        connectionId: Id of the connection. A falsy value is logged as a
            warning and the call returns without enqueuing anything.
        authority: Provider identifier; lower-cased (with a falsy value
            treated as "") before it is stored in the job payload.
        userId: Stored in the job payload and passed to `startJob` as
            `triggeredBy`.
        **kwargs: Additional callback arguments; ignored.
    """
    if not connectionId:
        logger.warning("connection.established without connectionId; ignoring")
        return

    payload: Dict[str, Any] = {
        "connectionId": connectionId,
        "authority": (authority or "").lower(),
        "userId": userId,
    }
    logger.info(
        "ingestion.connection.bootstrap.queued connectionId=%s authority=%s",
        connectionId, authority,
        extra={
            "event": "ingestion.connection.bootstrap.queued",
            "connectionId": connectionId,
            "authority": authority,
        },
    )

    async def _enqueue() -> None:
        try:
            await startJob(
                BOOTSTRAP_JOB_TYPE,
                payload,
                triggeredBy=userId,
            )
        except Exception as exc:
            logger.error(
                "ingestion.connection.bootstrap.enqueue_failed connectionId=%s error=%s",
                connectionId, exc, exc_info=True,
            )

    # asyncio.get_event_loop() is deprecated when no loop is running and
    # raises on newer Python versions; ask for the running loop explicitly.
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread (plain synchronous caller):
        # drive the enqueue to completion on a temporary loop.
        asyncio.run(_enqueue())
        return

    # Called from inside the event loop: schedule without blocking, but keep
    # a strong reference until the task is done so it cannot be collected.
    task = loop.create_task(_enqueue())
    _pendingEnqueueTasks.add(task)
    task.add_done_callback(_pendingEnqueueTasks.discard)
|
||
|
||
|
||
def _onConnectionRevoked(
    *,
    connectionId: str,
    authority: Optional[str] = None,
    userId: Optional[str] = None,
    reason: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Purge the knowledge index entries of a revoked connection, inline.

    The purge is a plain blocking call made before this callback returns.
    A failing purge is logged with its traceback and swallowed; the
    callback itself never raises because of it.

    Args:
        connectionId: Id of the revoked connection. A falsy value is logged
            as a warning and nothing is purged.
        authority: Only used for logging.
        userId: Accepted but not used by this callback.
        reason: Only used for logging.
        **kwargs: Additional callback arguments; ignored.
    """
    if not connectionId:
        logger.warning("connection.revoked without connectionId; ignoring")
        return

    try:
        # The purge is invoked directly on the DB interface (constructed with
        # None) so that no ServiceCenter / per-user context is needed here.
        knowledgeDb = getKnowledgeInterface(None)
        purged = knowledgeDb.deleteFileContentIndexByConnectionId(connectionId)
    except Exception as exc:
        logger.error(
            "ingestion.connection.purged.failed connectionId=%s error=%s",
            connectionId, exc, exc_info=True,
        )
    else:
        # Counters reported by the purge; missing keys are reported as 0.
        indexRows = purged.get("indexRows", 0)
        chunks = purged.get("chunks", 0)
        logger.info(
            "ingestion.connection.purged connectionId=%s authority=%s reason=%s rows=%d chunks=%d",
            connectionId, authority, reason,
            indexRows, chunks,
            extra={
                "event": "ingestion.connection.purged",
                "connectionId": connectionId,
                "authority": authority,
                "reason": reason,
                "indexRows": indexRows,
                "chunks": chunks,
            },
        )
|
||
|
||
|
||
async def _bootstrapJobHandler(
    job: Dict[str, Any],
    progressCb,
) -> Dict[str, Any]:
    """Run a `connection.bootstrap` job by routing on the payload's authority.

    Args:
        job: Job record; `job["payload"]` supplies `connectionId` and
            `authority` (the latter is lower-cased, missing means "").
        progressCb: Called as `progressCb(percent, message)` and forwarded to
            the individual bootstrap routines.

    Returns:
        A dict that always carries `connectionId` and `authority`, plus
        - `sharepoint` / `outlook` for authority "msft",
        - `drive` / `gmail` for authority "google",
        - `clickup` for authority "clickup",
        - `skipped=True` and a `reason` when consent is disabled or the
          authority is not one of the above.

    Raises:
        ValueError: If the payload carries no `connectionId`.
    """
    jobPayload = job.get("payload") or {}
    connectionId = jobPayload.get("connectionId")
    authority = (jobPayload.get("authority") or "").lower()
    if not connectionId:
        raise ValueError("connection.bootstrap requires payload.connectionId")

    # Fields shared by every result dict this handler returns.
    summary: Dict[str, Any] = {"connectionId": connectionId, "authority": authority}

    progressCb(5, f"resolving {authority} connection")

    # Consent guard: knowledge ingestion may have been switched off for this
    # connection after the job was queued. The guard is best-effort: if the
    # connection cannot be loaded, the bootstrap continues.
    try:
        from modules.interfaces.interfaceDbApp import getRootInterface

        connectionRow = getRootInterface().getUserConnectionById(connectionId)
        # A missing attribute counts as "enabled".
        if connectionRow and not getattr(connectionRow, "knowledgeIngestionEnabled", True):
            logger.info(
                "ingestion.connection.bootstrap.skipped — consent disabled connectionId=%s",
                connectionId,
                extra={
                    "event": "ingestion.connection.bootstrap.skipped",
                    "connectionId": connectionId,
                    "authority": authority,
                    "reason": "consent_disabled",
                },
            )
            return {**summary, "skipped": True, "reason": "consent_disabled"}
    except Exception as guardError:
        logger.debug("Could not load connection for consent guard: %s", guardError)

    def _asResultDict(outcome: Any, part: str) -> Dict[str, Any]:
        # Turn one sub-bootstrap outcome into a dict: an exception captured by
        # gather() is logged and reported as {"error": ...}; a falsy result
        # becomes an empty dict.
        if not isinstance(outcome, Exception):
            return outcome if outcome else {}
        logger.error(
            "ingestion.connection.bootstrap.failed part=%s connectionId=%s error=%s",
            part, connectionId, outcome, exc_info=outcome,
        )
        return {"error": str(outcome)}

    if authority == "msft":
        from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncSharepoint import (
            bootstrapSharepoint,
        )
        from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncOutlook import (
            bootstrapOutlook,
        )

        progressCb(10, "sharepoint + outlook")
        # Both parts run concurrently; a failure in one is captured as a
        # value instead of cancelling the other.
        sharepointOutcome, outlookOutcome = await asyncio.gather(
            bootstrapSharepoint(connectionId=connectionId, progressCb=progressCb),
            bootstrapOutlook(connectionId=connectionId, progressCb=progressCb),
            return_exceptions=True,
        )
        return {
            **summary,
            "sharepoint": _asResultDict(sharepointOutcome, "sharepoint"),
            "outlook": _asResultDict(outlookOutcome, "outlook"),
        }

    if authority == "google":
        from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncGdrive import (
            bootstrapGdrive,
        )
        from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncGmail import (
            bootstrapGmail,
        )

        progressCb(10, "drive + gmail")
        driveOutcome, gmailOutcome = await asyncio.gather(
            bootstrapGdrive(connectionId=connectionId, progressCb=progressCb),
            bootstrapGmail(connectionId=connectionId, progressCb=progressCb),
            return_exceptions=True,
        )
        return {
            **summary,
            "drive": _asResultDict(driveOutcome, "gdrive"),
            "gmail": _asResultDict(gmailOutcome, "gmail"),
        }

    if authority == "clickup":
        from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncClickup import (
            bootstrapClickup,
        )

        progressCb(10, "clickup tasks")
        # NOTE(review): unlike the gathered branches above, an exception from
        # bootstrapClickup propagates out of this handler rather than being
        # reported as {"error": ...} — confirm this asymmetry is intended.
        clickupOutcome = await bootstrapClickup(connectionId=connectionId, progressCb=progressCb)
        return {**summary, "clickup": _asResultDict(clickupOutcome, "clickup")}

    # Any other authority has no bootstrap routine here; report it as skipped.
    logger.info(
        "ingestion.connection.bootstrap.skipped reason=unsupported_authority authority=%s connectionId=%s",
        authority, connectionId,
        extra={
            "event": "ingestion.connection.bootstrap.skipped",
            "authority": authority,
            "connectionId": connectionId,
            "reason": "unsupported_authority",
        },
    )
    return {**summary, "skipped": True, "reason": "unsupported_authority"}
|
||
|
||
|
||
async def _scheduledDailyResync() -> None:
    """Enqueue a `connection.bootstrap` job for every active knowledge connection.

    Registered as a cron callback by `registerDailyResyncScheduler`. The
    connections come from `getRootInterface().getActiveKnowledgeConnections()`;
    if that lookup fails, the error is logged and the run ends. Jobs are
    enqueued one after another with `triggeredBy="scheduler.daily_resync"`.

    A connection that cannot be processed (malformed row or failing
    `startJob`) is logged with its traceback, counted as skipped, and does not
    stop the remaining connections from being enqueued.
    """
    try:
        from modules.interfaces.interfaceDbApp import getRootInterface

        rootInterface = getRootInterface()
        connections = rootInterface.getActiveKnowledgeConnections()
    except Exception as exc:
        logger.error("knowledge.daily_resync: could not load connections: %s", exc, exc_info=True)
        return

    if not connections:
        logger.info("knowledge.daily_resync: no active knowledge connections — nothing to do")
        return

    logger.info(
        "knowledge.daily_resync: enqueuing bootstrap for %d connection(s)",
        len(connections),
        extra={"event": "knowledge.daily_resync.started", "count": len(connections)},
    )

    enqueued = 0
    skipped = 0
    for conn in connections:
        # Bound before the try so the failure log can always reference it,
        # even when reading the id itself is what failed.
        connectionId = None
        try:
            connectionId = str(conn.id)
            # authority may be an enum (use its value) or a plain string.
            rawAuthority = conn.authority
            authority = rawAuthority.value if hasattr(rawAuthority, "value") else str(rawAuthority)
            payload: Dict[str, Any] = {
                "connectionId": connectionId,
                "authority": authority.lower(),
                "userId": str(conn.userId),
            }
            await startJob(
                BOOTSTRAP_JOB_TYPE,
                payload,
                triggeredBy="scheduler.daily_resync",
            )
        except Exception as exc:
            # Per-connection isolation: one bad row or failed enqueue must
            # not abort the whole resync run.
            skipped += 1
            logger.error(
                "knowledge.daily_resync: failed to enqueue connectionId=%s: %s",
                connectionId, exc, exc_info=True,
            )
            continue

        enqueued += 1
        logger.debug(
            "knowledge.daily_resync: queued connectionId=%s authority=%s",
            connectionId, authority,
        )

    logger.info(
        "knowledge.daily_resync: done — enqueued=%d skipped=%d",
        enqueued, skipped,
        extra={"event": "knowledge.daily_resync.done", "enqueued": enqueued, "skipped": skipped},
    )
|
||
|
||
|
||
def registerDailyResyncScheduler(*, hour: int = 2, minute: int = 0) -> None:
    """Register `_scheduledDailyResync` as the `knowledge.daily_resync` cron job.

    Registration is best-effort: any exception (including a failing import of
    the event manager) is logged as a warning and swallowed.

    NOTE(review): the log line names Europe/Zurich, but no time zone is passed
    to `registerCron` here — presumably the event manager applies it; confirm.
    Whether repeated registration is idempotent likewise depends on
    `eventManager.registerCron`.

    Args:
        hour: Hour of day at which to run (default 2).
        minute: Minute within that hour (default 0).
    """
    try:
        from modules.shared.eventManagement import eventManager

        # The cron fields are handed over as strings.
        cronSchedule = {"hour": str(hour), "minute": str(minute)}
        eventManager.registerCron(
            jobId="knowledge.daily_resync",
            func=_scheduledDailyResync,
            cronKwargs=cronSchedule,
        )
        logger.info(
            "knowledge.daily_resync scheduler registered (daily %02d:%02d Europe/Zurich)",
            hour, minute,
        )
    except Exception as registrationError:
        logger.warning(
            "knowledge.daily_resync scheduler registration failed (non-critical): %s",
            registrationError,
        )
|
||
|
||
|
||
def registerKnowledgeIngestionConsumer() -> None:
    """Wire up the consumer: lifecycle callbacks, job handler and daily resync.

    Only the first call does any work; once the module-level `_registered`
    flag is set, later calls return without registering anything again.
    """
    global _registered
    if not _registered:
        # Connection lifecycle events and the callbacks that consume them.
        subscriptions = (
            ("connection.established", _onConnectionEstablished),
            ("connection.revoked", _onConnectionRevoked),
        )
        for eventName, callback in subscriptions:
            callbackRegistry.register(eventName, callback)

        registerJobHandler(BOOTSTRAP_JOB_TYPE, _bootstrapJobHandler)
        registerDailyResyncScheduler()

        # Mark as done only after every registration step above has run.
        _registered = True
        logger.info("KnowledgeIngestionConsumer registered (established/revoked + %s handler + daily resync)", BOOTSTRAP_JOB_TYPE)
|