# Copyright (c) 2025 Patrick Motsch # All rights reserved. """Shared helpers for ingestion walkers (timeouts, per-item logging). Walkers (sharepoint, gdrive, gmail, outlook, clickup, kdrive) all face the same risks: - A single `adapter.download()` call can hang on the network for hours. - A single `runExtraction()` call can hang on a corrupt PDF/Office doc inside a sync extractor library, blocking the asyncio loop. - A single `requestIngestion()` call can stall on the embedding API. Without timeouts, one bad item freezes the whole bootstrap job and we end up with "Job stuck at 10% for 10h" zombies. These helpers wrap each phase in `asyncio.wait_for`. Sync extraction runs on a worker thread so the loop stays responsive. Every wrapped call also emits a short start/done log line, so when something hangs we know the exact item that caused it (path, size, mime). """ from __future__ import annotations import asyncio import logging from typing import Any, Awaitable, Callable, Optional logger = logging.getLogger(__name__) DOWNLOAD_TIMEOUT_S = 60 EXTRACTION_TIMEOUT_S = 90 INGEST_TIMEOUT_S = 60 class WalkerTimeout(Exception): """Raised when a walker phase exceeds its timeout budget.""" async def downloadWithTimeout( awaitable: Awaitable[Any], *, label: str, timeoutSeconds: int = DOWNLOAD_TIMEOUT_S, ) -> Any: """Run a download awaitable with a hard timeout. `label` is a short human-readable identifier (typically the external path) used in log messages so we can pinpoint the offending item in case of a hang or timeout. """ logger.info("walker.download.start %s timeout=%ds", label, timeoutSeconds) try: result = await asyncio.wait_for(awaitable, timeout=timeoutSeconds) logger.debug("walker.download.done %s", label) return result except asyncio.TimeoutError as ex: logger.warning("walker.download.timeout %s after %ds", label, timeoutSeconds) raise WalkerTimeout(f"download timeout after {timeoutSeconds}s: {label}") from ex async def extractWithTimeout( syncFn: Callable[..., Any], *args: Any, label: str, timeoutSeconds: int = EXTRACTION_TIMEOUT_S, ) -> Any: """Run a synchronous extraction function on a worker thread with timeout. Sync extractors (PDF, OCR, MS Office) cannot be cancelled cleanly from asyncio; `wait_for` only protects the awaiter. The underlying thread may keep running until the process exits — but at least the walker proceeds to the next item instead of freezing forever. """ logger.info("walker.extract.start %s timeout=%ds", label, timeoutSeconds) try: result = await asyncio.wait_for( asyncio.to_thread(syncFn, *args), timeout=timeoutSeconds, ) logger.debug("walker.extract.done %s", label) return result except asyncio.TimeoutError as ex: logger.warning("walker.extract.timeout %s after %ds", label, timeoutSeconds) raise WalkerTimeout(f"extract timeout after {timeoutSeconds}s: {label}") from ex async def ingestWithTimeout( awaitable: Awaitable[Any], *, label: str, timeoutSeconds: int = INGEST_TIMEOUT_S, ) -> Any: """Run an ingestion request with a hard timeout.""" logger.debug("walker.ingest.start %s timeout=%ds", label, timeoutSeconds) try: result = await asyncio.wait_for(awaitable, timeout=timeoutSeconds) logger.debug("walker.ingest.done %s", label) return result except asyncio.TimeoutError as ex: logger.warning("walker.ingest.timeout %s after %ds", label, timeoutSeconds) raise WalkerTimeout(f"ingest timeout after {timeoutSeconds}s: {label}") from ex def logItemStart(service: str, label: str, *, sizeBytes: Optional[int] = None, mime: Optional[str] = None) -> None: """Log that processing of one item is about to begin. When the worker hangs, the LAST `walker.item.start` line in the log points to the exact item that caused the freeze. This is the single most valuable diagnostic for stuck-job triage. """ parts = [f"walker.item.start service={service} path={label}"] if sizeBytes is not None: parts.append(f"size={sizeBytes}") if mime: parts.append(f"mime={mime}") logger.info(" ".join(parts))