feat: app-scheduler ausgebaut um nachts bestehende connections zu indexieren
This commit is contained in:
parent
4a840e9e6e
commit
ce671f61b6
2 changed files with 107 additions and 1 deletion
|
|
@ -1281,6 +1281,28 @@ class AppObjects:
|
|||
logger.error(f"Error getting user connections: {str(e)}")
|
||||
return []
|
||||
|
||||
def getActiveKnowledgeConnections(self) -> List[UserConnection]:
    """Return all UserConnections with knowledgeIngestionEnabled=True and status=active.

    Consumed by the daily re-sync scheduler to decide which connections must
    be re-indexed. Rows that fail model validation are logged and skipped
    rather than aborting the whole fetch; any lower-level failure yields an
    empty list so the scheduler can proceed as a no-op.

    Returns:
        List of validated UserConnection instances (possibly empty).
    """
    try:
        # NOTE(review): assumes getRecordset returns dicts and/or model
        # instances — both shapes are handled below; confirm against the
        # db layer's contract.
        recordFilter = {
            "knowledgeIngestionEnabled": True,
            "status": ConnectionStatus.ACTIVE.value,
        }
        rows = self.db.getRecordset(UserConnection, recordFilter=recordFilter)

        result: List[UserConnection] = []
        for row in rows or []:
            try:
                if isinstance(row, dict):
                    result.append(UserConnection.model_validate(row))
                else:
                    result.append(row)
            except Exception as _e:
                # Best-effort per row: a single malformed record must not
                # prevent the remaining connections from being returned.
                logger.warning(f"getActiveKnowledgeConnections: could not parse row: {_e}")
        return result
    except Exception as e:
        logger.error(f"getActiveKnowledgeConnections failed: {e}")
        return []
|
||||
|
||||
def getUserConnectionById(self, connectionId: str) -> Optional[UserConnection]:
|
||||
"""Get a single UserConnection by ID or by reference string (connection:authority:username)."""
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -238,6 +238,89 @@ async def _bootstrapJobHandler(
|
|||
}
|
||||
|
||||
|
||||
async def _scheduledDailyResync() -> None:
    """Enqueue a connection.bootstrap job for every active knowledge connection.

    Runs once per day (default 2 AM Europe/Zurich). Each job re-walks the
    connector and hands new / changed items to KnowledgeService.requestIngestion.
    Unchanged items are deduplicated by content-hash and skipped automatically.

    A failure to enqueue one connection is logged and counted as skipped;
    the loop continues with the remaining connections.
    """
    try:
        # Imported lazily: the interface module is only needed at run time
        # and importing it at module load could create a cycle.
        from modules.interfaces.interfaceDbApp import getRootInterface

        rootInterface = getRootInterface()
        connections = rootInterface.getActiveKnowledgeConnections()
    except Exception as exc:
        logger.error("knowledge.daily_resync: could not load connections: %s", exc, exc_info=True)
        return

    if not connections:
        logger.info("knowledge.daily_resync: no active knowledge connections — nothing to do")
        return

    logger.info(
        "knowledge.daily_resync: enqueuing bootstrap for %d connection(s)",
        len(connections),
        extra={"event": "knowledge.daily_resync.started", "count": len(connections)},
    )

    enqueued = 0
    skipped = 0
    for conn in connections:
        connectionId = str(conn.id)
        # Authority may be an Enum (take .value) or already a plain string.
        authority = conn.authority.value if hasattr(conn.authority, "value") else str(conn.authority)
        userId = str(conn.userId)
        payload: Dict[str, Any] = {
            "connectionId": connectionId,
            "authority": authority.lower(),
            "userId": userId,
        }
        try:
            await startJob(
                BOOTSTRAP_JOB_TYPE,
                payload,
                triggeredBy="scheduler.daily_resync",
            )
            enqueued += 1
            logger.debug(
                "knowledge.daily_resync: queued connectionId=%s authority=%s",
                connectionId, authority,
            )
        except Exception as exc:
            skipped += 1
            # FIX: include exc_info=True for parity with the load-failure log
            # above — without the traceback, enqueue failures were nearly
            # impossible to diagnose from the logs.
            logger.error(
                "knowledge.daily_resync: failed to enqueue connectionId=%s: %s",
                connectionId, exc,
                exc_info=True,
            )

    logger.info(
        "knowledge.daily_resync: done — enqueued=%d skipped=%d",
        enqueued, skipped,
        extra={"event": "knowledge.daily_resync.done", "enqueued": enqueued, "skipped": skipped},
    )
|
||||
|
||||
|
||||
def registerDailyResyncScheduler(*, hour: int = 2, minute: int = 0) -> None:
    """Register the daily knowledge re-sync cron job. Idempotent.

    Args:
        hour: Hour of day to run (0–23, default 2 → 2 AM Europe/Zurich).
        minute: Minute within the hour (0–59, default 0).

    Raises:
        ValueError: If hour or minute is outside its documented range.
    """
    # FIX: enforce the documented ranges up front. Previously an out-of-range
    # value produced an invalid cron spec whose failure the blanket
    # try/except below silently downgraded to a warning.
    if not 0 <= hour <= 23:
        raise ValueError(f"hour must be in 0..23, got {hour}")
    if not 0 <= minute <= 59:
        raise ValueError(f"minute must be in 0..59, got {minute}")

    try:
        # Imported lazily so a missing/misconfigured event manager only
        # disables the scheduler instead of breaking module import.
        from modules.shared.eventManagement import eventManager

        eventManager.registerCron(
            jobId="knowledge.daily_resync",
            func=_scheduledDailyResync,
            cronKwargs={"hour": str(hour), "minute": str(minute)},
        )
        logger.info(
            "knowledge.daily_resync scheduler registered (daily %02d:%02d Europe/Zurich)",
            hour, minute,
        )
    except Exception as exc:
        # Registration is best-effort: the app must still start without it.
        logger.warning("knowledge.daily_resync scheduler registration failed (non-critical): %s", exc)
|
||||
|
||||
|
||||
def registerKnowledgeIngestionConsumer() -> None:
|
||||
"""Register callback subscribers + background job handler. Idempotent."""
|
||||
global _registered
|
||||
|
|
@ -246,5 +329,6 @@ def registerKnowledgeIngestionConsumer() -> None:
|
|||
callbackRegistry.register("connection.established", _onConnectionEstablished)
|
||||
callbackRegistry.register("connection.revoked", _onConnectionRevoked)
|
||||
registerJobHandler(BOOTSTRAP_JOB_TYPE, _bootstrapJobHandler)
|
||||
registerDailyResyncScheduler()
|
||||
_registered = True
|
||||
logger.info("KnowledgeIngestionConsumer registered (established/revoked + %s handler)", BOOTSTRAP_JOB_TYPE)
|
||||
logger.info("KnowledgeIngestionConsumer registered (established/revoked + %s handler + daily resync)", BOOTSTRAP_JOB_TYPE)
|
||||
|
|
|
|||
Loading…
Reference in a new issue