gateway/tests/unit/services/test_bootstrap_clickup.py
2026-04-29 14:39:40 +02:00

203 lines
7.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Bootstrap ClickUp tests with a fake service + knowledge service.
Verifies:
- Teams → spaces → lists (folderless + folder-based) → tasks traversal.
- Each task produces a `requestIngestion` call with `sourceKind="clickup_task"`
and header + description content-objects.
- `date_updated` is forwarded as contentVersion → idempotency.
- Recency filter drops tasks older than `maxAgeDays`.
- maxWorkspaces / maxListsPerWorkspace / maxTasks caps are respected.
"""
import asyncio
import os
import sys
import time
from types import SimpleNamespace
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../.."))
from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncClickup import (
bootstrapClickup,
ClickupBootstrapLimits,
_syntheticTaskId,
)
def _nowMs(offsetDays: int = 0) -> str:
return str(int((time.time() + offsetDays * 86400) * 1000))
class _FakeClickupService:
"""Records API calls; serves a canned 1-team / 1-space / 1-list / 2-task layout."""
def __init__(self, taskCount=2, oldTask=False):
self._taskCount = taskCount
self._oldTask = oldTask # when True, the second task is 400 days old
self.calls = []
async def getAuthorizedTeams(self):
self.calls.append(("getAuthorizedTeams",))
return {"teams": [{"id": "team-1", "name": "Acme"}]}
async def getSpaces(self, team_id: str):
self.calls.append(("getSpaces", team_id))
return {"spaces": [{"id": "space-1", "name": "Engineering"}]}
async def getFolderlessLists(self, space_id: str):
self.calls.append(("getFolderlessLists", space_id))
return {"lists": [{"id": "list-1", "name": "Sprint 1"}]}
async def getFolders(self, space_id: str):
self.calls.append(("getFolders", space_id))
return {"folders": [{"id": "folder-1", "name": "Subproject"}]}
async def getListsInFolder(self, folder_id: str):
self.calls.append(("getListsInFolder", folder_id))
return {"lists": [{"id": "list-2", "name": "Sub-tasks"}]}
async def getTasksInList(self, list_id: str, *, page=0, include_closed=False, subtasks=True):
self.calls.append(("getTasksInList", list_id, page, include_closed))
if page > 0:
return {"tasks": []}
tasks = []
for i in range(self._taskCount):
tid = f"{list_id}-task-{i}"
offsetDays = -400 if (self._oldTask and i == 1) else 0
tasks.append({
"id": tid,
"name": f"Task {i} of {list_id}",
"description": f"Plain description for task {i}",
"text_content": f"Rich content for task {i}",
"status": {"status": "open" if i == 0 else "closed"},
"assignees": [{"username": "alice"}],
"tags": [{"name": "urgent"}],
"date_updated": _nowMs(offsetDays),
"date_created": _nowMs(-1),
"url": f"https://app.clickup.com/t/{tid}",
})
return {"tasks": tasks}
class _FakeKnowledgeService:
def __init__(self, duplicateIds=None):
self.calls = []
self._duplicates = duplicateIds or set()
async def requestIngestion(self, job):
self.calls.append(job)
status = "duplicate" if job.sourceId in self._duplicates else "indexed"
return SimpleNamespace(
jobId=job.sourceId, status=status, contentHash="h",
fileId=job.sourceId, index=None, error=None,
)
def _adapter(svc):
return SimpleNamespace(_svc=svc)
def test_bootstrap_walks_team_space_lists_and_tasks():
svc = _FakeClickupService(taskCount=2)
knowledge = _FakeKnowledgeService()
connection = SimpleNamespace(mandateId="m1", userId="u1")
async def _run():
return await bootstrapClickup(
connectionId="c1",
adapter=_adapter(svc),
connection=connection,
knowledgeService=knowledge,
limits=ClickupBootstrapLimits(maxAgeDays=None),
)
result = asyncio.run(_run())
# 2 lists (folderless list-1 + folder's list-2) × 2 tasks each = 4 tasks
assert result["indexed"] == 4
assert result["workspaces"] == 1
assert result["lists"] == 2
sourceIds = {c.sourceId for c in knowledge.calls}
assert len(sourceIds) == 4
for job in knowledge.calls:
assert job.sourceKind == "clickup_task"
assert job.mimeType == "application/vnd.clickup.task+json"
assert job.mandateId == "m1"
assert job.provenance["connectionId"] == "c1"
assert job.provenance["authority"] == "clickup"
assert job.provenance["teamId"] == "team-1"
assert job.contentVersion # numeric millisecond string
# At least the header content-object is present.
ids = [co["contentObjectId"] for co in job.contentObjects]
assert "header" in ids
def test_bootstrap_reports_duplicates_on_second_run():
svc = _FakeClickupService(taskCount=1)
duplicates = {
_syntheticTaskId("c1", "list-1-task-0"),
_syntheticTaskId("c1", "list-2-task-0"),
}
knowledge = _FakeKnowledgeService(duplicateIds=duplicates)
connection = SimpleNamespace(mandateId="m1", userId="u1")
async def _run():
return await bootstrapClickup(
connectionId="c1",
adapter=_adapter(svc),
connection=connection,
knowledgeService=knowledge,
limits=ClickupBootstrapLimits(maxAgeDays=None),
)
result = asyncio.run(_run())
assert result["indexed"] == 0
assert result["skippedDuplicate"] == 2
def test_bootstrap_skips_tasks_older_than_maxAgeDays():
svc = _FakeClickupService(taskCount=2, oldTask=True)
knowledge = _FakeKnowledgeService()
connection = SimpleNamespace(mandateId="m1", userId="u1")
async def _run():
return await bootstrapClickup(
connectionId="c1",
adapter=_adapter(svc),
connection=connection,
knowledgeService=knowledge,
limits=ClickupBootstrapLimits(maxAgeDays=180),
)
result = asyncio.run(_run())
# 2 lists × (1 recent + 1 skipped old) = 2 indexed + 2 skippedPolicy
assert result["indexed"] == 2
assert result["skippedPolicy"] == 2
def test_bootstrap_maxTasks_caps_ingestion():
svc = _FakeClickupService(taskCount=2)
knowledge = _FakeKnowledgeService()
connection = SimpleNamespace(mandateId="m1", userId="u1")
async def _run():
return await bootstrapClickup(
connectionId="c1",
adapter=_adapter(svc),
connection=connection,
knowledgeService=knowledge,
limits=ClickupBootstrapLimits(maxAgeDays=None, maxTasks=3),
)
result = asyncio.run(_run())
assert result["indexed"] == 3
if __name__ == "__main__":
test_bootstrap_walks_team_space_lists_and_tasks()
test_bootstrap_reports_duplicates_on_second_run()
test_bootstrap_skips_tasks_older_than_maxAgeDays()
test_bootstrap_maxTasks_caps_ingestion()
print("OK — bootstrapClickup tests passed")