# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Shared helpers for ingestion walkers (timeouts, per-item logging).

Walkers (sharepoint, gdrive, gmail, outlook, clickup, kdrive) all face the
same risks:

- A single `adapter.download()` call can hang on the network for hours.
- A single `runExtraction()` call can hang on a corrupt PDF/Office doc inside
  a sync extractor library, blocking the asyncio loop.
- A single `requestIngestion()` call can stall on the embedding API.

Without timeouts, one bad item freezes the whole bootstrap job and we end
up with "Job stuck at 10% for 10h" zombies.

These helpers wrap each phase in `asyncio.wait_for`. Sync extraction runs
on a worker thread so the loop stays responsive. Every wrapped call also
emits start/done log lines at DEBUG so normal INFO logs stay quiet; for
stuck-job triage, enable DEBUG for this module — the last
``walker.item.start`` before a hang still pinpoints the item (path, size, mime).
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Any, Awaitable, Callable, Optional
|
|
|
|
# Module-level logger; the helpers in this file log through it.
logger = logging.getLogger(__name__)

# Default per-phase timeout budgets, in seconds. Each one is the default
# `timeoutSeconds` of the matching *WithTimeout helper in this module.
DOWNLOAD_TIMEOUT_S = 60
EXTRACTION_TIMEOUT_S = 90
INGEST_TIMEOUT_S = 60
|
|
|
|
|
|
class WalkerTimeout(Exception):
    """Raised when a walker phase exceeds its timeout budget.

    The *WithTimeout helpers in this module raise it with a message that
    names the phase, the budget in seconds and the item label.
    """
|
|
|
|
|
|
async def downloadWithTimeout(
    awaitable: Awaitable[Any],
    *,
    label: str,
    timeoutSeconds: float = DOWNLOAD_TIMEOUT_S,
) -> Any:
    """Await a download, giving up once the timeout budget is spent.

    Args:
        awaitable: The pending download to await.
        label: Short human-readable identifier (typically the external path)
            used in log messages so the offending item can be pinpointed in
            case of a hang or timeout.
        timeoutSeconds: Budget in seconds, whole or fractional.

    Returns:
        Whatever `awaitable` resolves to.

    Raises:
        WalkerTimeout: If the budget is exceeded. An ``asyncio.TimeoutError``
            raised by `awaitable` itself is reported the same way.
    """
    # %s (not %d) so a fractional budget is logged exactly as it appears in
    # the WalkerTimeout message instead of being truncated to an integer.
    logger.debug("walker.download.start %s timeout=%ss", label, timeoutSeconds)
    try:
        result = await asyncio.wait_for(awaitable, timeout=timeoutSeconds)
    except asyncio.TimeoutError as ex:
        logger.warning("walker.download.timeout %s after %ss", label, timeoutSeconds)
        raise WalkerTimeout(f"download timeout after {timeoutSeconds}s: {label}") from ex
    logger.debug("walker.download.done %s", label)
    return result
|
|
|
|
|
|
async def extractWithTimeout(
    syncFn: Callable[..., Any],
    *args: Any,
    label: str,
    timeoutSeconds: float = EXTRACTION_TIMEOUT_S,
) -> Any:
    """Run a synchronous extraction function on a worker thread with timeout.

    Sync extractors (PDF, OCR, MS Office) cannot be cancelled cleanly from
    asyncio; `wait_for` only protects the awaiter. After a timeout the
    underlying thread may keep running until the process exits — but at
    least the walker proceeds to the next item instead of freezing forever.

    Args:
        syncFn: Blocking callable to execute off the event loop.
        *args: Positional arguments forwarded to `syncFn`.
        label: Short human-readable identifier of the item, used in log
            messages and in the timeout error.
        timeoutSeconds: Budget in seconds, whole or fractional.

    Returns:
        Whatever ``syncFn(*args)`` returns.

    Raises:
        WalkerTimeout: If the budget is exceeded. An ``asyncio.TimeoutError``
            raised by `syncFn` itself is reported the same way.
    """
    # %s (not %d) so a fractional budget is logged exactly as it appears in
    # the WalkerTimeout message instead of being truncated to an integer.
    logger.debug("walker.extract.start %s timeout=%ss", label, timeoutSeconds)
    try:
        result = await asyncio.wait_for(
            asyncio.to_thread(syncFn, *args),
            timeout=timeoutSeconds,
        )
    except asyncio.TimeoutError as ex:
        logger.warning("walker.extract.timeout %s after %ss", label, timeoutSeconds)
        raise WalkerTimeout(f"extract timeout after {timeoutSeconds}s: {label}") from ex
    logger.debug("walker.extract.done %s", label)
    return result
|
|
|
|
|
|
async def ingestWithTimeout(
    awaitable: Awaitable[Any],
    *,
    label: str,
    timeoutSeconds: float = INGEST_TIMEOUT_S,
) -> Any:
    """Await an ingestion request, giving up once the timeout budget is spent.

    Args:
        awaitable: The pending ingestion request to await.
        label: Short human-readable identifier of the item, used in log
            messages and in the timeout error.
        timeoutSeconds: Budget in seconds, whole or fractional.

    Returns:
        Whatever `awaitable` resolves to.

    Raises:
        WalkerTimeout: If the budget is exceeded. An ``asyncio.TimeoutError``
            raised by `awaitable` itself is reported the same way.
    """
    # %s (not %d) so a fractional budget is logged exactly as it appears in
    # the WalkerTimeout message instead of being truncated to an integer.
    logger.debug("walker.ingest.start %s timeout=%ss", label, timeoutSeconds)
    try:
        result = await asyncio.wait_for(awaitable, timeout=timeoutSeconds)
    except asyncio.TimeoutError as ex:
        logger.warning("walker.ingest.timeout %s after %ss", label, timeoutSeconds)
        raise WalkerTimeout(f"ingest timeout after {timeoutSeconds}s: {label}") from ex
    logger.debug("walker.ingest.done %s", label)
    return result
|
|
|
|
|
|
def logItemStart(
    service: str,
    label: str,
    *,
    sizeBytes: Optional[int] = None,
    mime: Optional[str] = None,
) -> None:
    """Log that processing of one item is about to begin (DEBUG).

    When the worker hangs, the LAST `walker.item.start` line in the log
    points to the exact item that caused the freeze. Enable DEBUG for this
    module during triage.

    Args:
        service: Value logged as ``service=``.
        label: Value logged as ``path=``.
        sizeBytes: Logged as ``size=`` unless it is None (0 is logged).
        mime: Logged as ``mime=`` unless it is None or empty.
    """
    # Called once per item: skip the string building entirely when DEBUG is
    # off. `logger.debug` would discard the message under the same condition.
    if not logger.isEnabledFor(logging.DEBUG):
        return

    parts = [f"walker.item.start service={service} path={label}"]
    if sizeBytes is not None:
        parts.append(f"size={sizeBytes}")
    if mime:
        parts.append(f"mime={mime}")
    logger.debug(" ".join(parts))
|