gateway/tests/unit/services/test_bootstrap_gdrive.py
2026-04-29 14:39:40 +02:00

225 lines
7.2 KiB
Python

#!/usr/bin/env python3
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Bootstrap Google Drive tests with a fake adapter + knowledge service.
Verifies:
- Drive walk traverses root → subfolders, respecting `maxDepth`.
- Every file triggers `requestIngestion` with `sourceKind="gdrive_item"`.
- Duplicate runs (same modifiedTime revision) report `skippedDuplicate`.
- Provenance carries `authority="google"` and the Drive file id.
- Recency filter skips files older than `maxAgeDays`.
"""
import asyncio
import os
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from typing import Any, Dict, List, Optional
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../.."))
from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncGdrive import (
bootstrapGdrive,
GdriveBootstrapLimits,
_syntheticFileId,
)
@dataclass
class _ExtEntry:
    """Test double for a single entry returned by the adapter's ``browse``.

    Only ``name`` and ``path`` are required; every other field has a default
    so the fake adapter can build folders and files with the same type.
    """

    name: str
    path: str
    isFolder: bool = False
    size: Optional[int] = None
    mimeType: Optional[str] = None
    # Annotated Optional because the default really is None (the original
    # annotation claimed a plain Dict). The fake adapter in this file always
    # passes a dict with "id" and "modifiedTime" keys.
    metadata: Optional[Dict[str, Any]] = None
def _today_iso(offsetDays: int = 0) -> str:
    """Return the current UTC time shifted by *offsetDays* days.

    The result is formatted as ``YYYY-MM-DDTHH:MM:SSZ``. Negative offsets
    produce timestamps in the past.
    """
    shifted = datetime.now(timezone.utc) + timedelta(days=offsetDays)
    return shifted.strftime("%Y-%m-%dT%H:%M:%SZ")
class _FakeDriveAdapter:
    """In-memory substitute for the Drive adapter used by the bootstrap tests.

    Offers the two coroutines the tests rely on, ``browse`` and ``download``,
    over a fixed tree:

        root ("/", "" or "root") -> f1.txt, f2.txt, Subfolder (path "/sub_id")
        "/sub_id"                -> f3.txt

    Every path handed to ``download`` is appended to ``self.downloaded``.
    """

    def __init__(self, recent_only: bool = True):
        # Both timestamps are computed once, at construction time.
        self._recent = _today_iso(0)
        self._old = _today_iso(-400)
        # When False, f2.txt is reported with the 400-day-old timestamp.
        self._recent_only = recent_only
        self.downloaded: List[str] = []

    def _textFile(self, fileId: str, modifiedTime: str):
        """Build the ``text/plain`` entry ``<fileId>.txt`` located at ``/<fileId>``."""
        return _ExtEntry(
            name=f"{fileId}.txt",
            path=f"/{fileId}",
            size=20,
            mimeType="text/plain",
            metadata={"id": fileId, "modifiedTime": modifiedTime},
        )

    async def browse(self, path: str, filter=None, limit=None):
        """Return the entries directly under *path*; unknown paths yield ``[]``.

        ``filter`` and ``limit`` are accepted but ignored.
        """
        if path == "/sub_id":
            return [self._textFile("f3", self._recent)]
        if path not in ("/", "", "root"):
            return []

        f2Modified = self._recent if self._recent_only else self._old
        subfolder = _ExtEntry(
            name="Subfolder",
            path="/sub_id",
            isFolder=True,
            mimeType="application/vnd.google-apps.folder",
            metadata={"id": "sub_id", "modifiedTime": self._recent},
        )
        return [
            self._textFile("f1", self._recent),
            self._textFile("f2", f2Modified),
            subfolder,
        ]

    async def download(self, path: str) -> bytes:
        """Record *path* and return it, UTF-8 encoded, as the file content."""
        self.downloaded.append(path)
        return path.encode("utf-8")
class _FakeKnowledgeService:
    """Knowledge-service stub that records ingestion jobs.

    ``calls`` keeps every job passed to ``requestIngestion`` in call order.
    Jobs whose ``sourceId`` is in ``duplicateIds`` are answered with status
    ``"duplicate"``; all others with ``"indexed"``.
    """

    def __init__(self, duplicateIds=None):
        # A falsy argument (None or an empty collection) becomes a fresh set.
        self._duplicateIds = duplicateIds if duplicateIds else set()
        self.calls: List[SimpleNamespace] = []

    async def requestIngestion(self, job):
        """Record *job* and return a canned result object for it."""
        self.calls.append(job)
        if job.sourceId in self._duplicateIds:
            outcome = "duplicate"
        else:
            outcome = "indexed"
        return SimpleNamespace(
            jobId=f"{job.sourceKind}:{job.sourceId}",
            status=outcome,
            contentHash="h",
            fileId=job.sourceId,
            index=None,
            error=None,
        )
def _fakeRunExtraction(data, name, mime, options):
    """Extraction stub: wrap the payload in a single text part.

    ``bytes`` input is decoded as UTF-8; anything else goes through ``str()``.
    ``name``, ``mime`` and ``options`` are accepted but ignored.
    """
    if isinstance(data, bytes):
        text = data.decode("utf-8")
    else:
        text = str(data)
    onlyPart = SimpleNamespace(
        id="p1",
        data=text,
        typeGroup="text",
        label="page:1",
        metadata={"pageIndex": 0},
    )
    return SimpleNamespace(parts=[onlyPart])
def test_bootstrap_walks_drive_and_subfolders():
    """All three files (two in root, one in the subfolder) are ingested.

    With no age limit, each file must be requested for ingestion and
    downloaded exactly once, and none may be reported as a duplicate.
    """
    drive = _FakeDriveAdapter()
    sink = _FakeKnowledgeService()
    conn = SimpleNamespace(mandateId="m1", userId="u1")

    async def _bootstrap():
        return await bootstrapGdrive(
            connectionId="c1",
            adapter=drive,
            connection=conn,
            knowledgeService=sink,
            runExtractionFn=_fakeRunExtraction,
            limits=GdriveBootstrapLimits(maxAgeDays=None),
        )

    summary = asyncio.run(_bootstrap())

    assert len(sink.calls) == 3
    seenIds = {call.sourceId for call in sink.calls}
    assert seenIds == {_syntheticFileId("c1", fid) for fid in ("f1", "f2", "f3")}
    assert summary["indexed"] == 3
    assert summary["skippedDuplicate"] == 0
    # Root files first, then the subfolder's file.
    assert drive.downloaded == ["/f1", "/f2", "/f3"]
def test_bootstrap_reports_duplicates_on_second_run():
    """Files the knowledge service already knows are counted as duplicates.

    A second run is simulated by making the fake service answer "duplicate"
    for all three synthetic file ids.
    """
    alreadyIngested = {_syntheticFileId("c1", fid) for fid in ("f1", "f2", "f3")}
    drive = _FakeDriveAdapter()
    sink = _FakeKnowledgeService(duplicateIds=alreadyIngested)
    conn = SimpleNamespace(mandateId="m1", userId="u1")

    async def _bootstrap():
        return await bootstrapGdrive(
            connectionId="c1",
            adapter=drive,
            connection=conn,
            knowledgeService=sink,
            runExtractionFn=_fakeRunExtraction,
            limits=GdriveBootstrapLimits(maxAgeDays=None),
        )

    summary = asyncio.run(_bootstrap())

    assert summary["indexed"] == 0
    assert summary["skippedDuplicate"] == 3
def test_bootstrap_skips_files_older_than_maxAgeDays():
    """A file modified 400 days ago is filtered out by a 180-day limit."""
    # recent_only=False makes the fake adapter report f2 as 400 days old.
    drive = _FakeDriveAdapter(recent_only=False)
    sink = _FakeKnowledgeService()
    conn = SimpleNamespace(mandateId="m1", userId="u1")

    async def _bootstrap():
        return await bootstrapGdrive(
            connectionId="c1",
            adapter=drive,
            connection=conn,
            knowledgeService=sink,
            runExtractionFn=_fakeRunExtraction,
            limits=GdriveBootstrapLimits(maxAgeDays=180),
        )

    summary = asyncio.run(_bootstrap())

    # f1 and f3 are recent and get indexed.
    assert summary["indexed"] == 2
    # f2 is the only file rejected by the age policy.
    assert summary["skippedPolicy"] == 1
def test_bootstrap_passes_connection_provenance():
    """Every ingestion job carries the connection's identity and provenance.

    Checks the source kind, the mandate id taken from the connection, the
    provenance fields (connection id, authority, service) and that a
    content version is set.
    """
    adapter = _FakeDriveAdapter()
    knowledge = _FakeKnowledgeService()
    connection = SimpleNamespace(mandateId="m1", userId="u1")

    async def _run():
        return await bootstrapGdrive(
            connectionId="c1",
            adapter=adapter,
            connection=connection,
            knowledgeService=knowledge,
            runExtractionFn=_fakeRunExtraction,
            limits=GdriveBootstrapLimits(maxAgeDays=None),
        )

    asyncio.run(_run())

    # Without this guard the loop below would pass vacuously if the
    # bootstrap submitted no jobs at all.
    assert knowledge.calls, "bootstrapGdrive did not request any ingestion"

    # NOTE(review): the module docstring also promises the Drive file id in
    # provenance; the key name is not visible here, so it is not asserted.
    for job in knowledge.calls:
        assert job.sourceKind == "gdrive_item"
        assert job.mandateId == "m1"
        assert job.provenance["connectionId"] == "c1"
        assert job.provenance["authority"] == "google"
        assert job.provenance["service"] == "drive"
        assert job.contentVersion  # modifiedTime ISO string
if __name__ == "__main__":
    # Allows running the file directly, without pytest: the tests execute in
    # declaration order and the first failing assertion aborts the run.
    for _case in (
        test_bootstrap_walks_drive_and_subfolders,
        test_bootstrap_reports_duplicates_on_second_run,
        test_bootstrap_skips_files_older_than_maxAgeDays,
        test_bootstrap_passes_connection_provenance,
    ):
        _case()
    print("OK — bootstrapGdrive tests passed")