141 lines
6.2 KiB
Python
141 lines
6.2 KiB
Python
"""ClickUp connector for CRUD operations (compatible with TicketInterface).

This module defines its own minimal abstractions to avoid coupling.
"""
|
|
|
|
from typing import Optional
|
|
import logging
|
|
import aiohttp
|
|
from modules.datamodels.datamodelTickets import TicketBase, TicketFieldAttribute
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ConnectorTicketClickup(TicketBase):
    """Ticket connector backed by the ClickUp REST API.

    Reads field attributes and tasks, and pushes task updates. Every public
    method is best-effort: HTTP and transport failures are logged and
    swallowed instead of being raised to the caller.
    """

    def __init__(
        self,
        *,
        apiToken: str,
        teamId: str,
        listId: Optional[str] = None,
        apiUrl: str = "https://api.clickup.com/api/v2",
    ) -> None:
        """Store the connection settings; no network access happens here.

        Args:
            apiToken: Value sent verbatim in the ``Authorization`` header.
            teamId: Team id used for task reads when no list is configured.
            listId: Optional list id; when set, reads are scoped to this list.
            apiUrl: Base URL that all endpoint paths are appended to.
        """
        self.apiToken = apiToken
        self.teamId = teamId
        self.listId = listId
        self.apiUrl = apiUrl

    def _headers(self) -> dict:
        """Return the HTTP headers sent with every request.

        The token is passed as-is in ``Authorization`` (no scheme prefix).
        """
        return {
            "Authorization": self.apiToken,
            "Content-Type": "application/json",
        }

    async def read_attributes(self) -> list[TicketFieldAttribute]:
        """Return the field attributes available on tasks.

        When ``listId`` is set, the list's custom fields are fetched and
        placed first. A fixed set of core task fields is always appended,
        including when the custom-field request fails.

        Returns:
            Custom-field attributes (possibly none) followed by the core ones.
        """
        attributes: list[TicketFieldAttribute] = []

        if self.listId:
            url = f"{self.apiUrl}/list/{self.listId}/field"
            try:
                async with aiohttp.ClientSession() as session, session.get(
                    url, headers=self._headers()
                ) as response:
                    if response.status != 200:
                        logger.warning(f"ClickUp fields fetch status: {response.status}")
                    else:
                        data = await response.json()
                        # "fields" may be missing or null; both mean "no custom fields".
                        for field in data.get("fields") or []:
                            fieldId = field.get("id")
                            if not fieldId:
                                continue
                            # Fall back to the id when the name is missing, null or empty.
                            fieldName = field.get("name") or fieldId
                            attributes.append(
                                TicketFieldAttribute(fieldName=fieldName, field=fieldId)
                            )
            except Exception as e:
                # Best-effort: keep whatever was collected and still add core fields.
                logger.error(f"ClickUp read_attributes error: {e}")

        # Common top-level task fields: (display name, path into the task record).
        coreFields = (
            ("ID", "id"),
            ("Name", "name"),
            ("Status", "status.status"),
            ("Assignees", "assignees"),
            ("DateCreated", "date_created"),
            ("DueDate", "due_date"),
        )
        attributes.extend(
            TicketFieldAttribute(fieldName=name, field=fid) for name, fid in coreFields
        )
        return attributes

    async def read_tasks(self, *, limit: int = 0) -> list[dict]:
        """Read tasks from ClickUp, returning the task records as received.

        Tasks are read from the configured list when ``listId`` is set,
        otherwise from the team. Pages are requested newest-first until a
        short page, an error response, or ``limit`` is reached.

        Args:
            limit: Maximum number of tasks to return. Zero, ``None`` or a
                negative value means "no limit".

        Returns:
            The tasks collected so far; on failure this may be a partial or
            empty list (the error is logged, not raised).
        """
        tasks: list[dict] = []
        # A page with fewer items than this is treated as the last page.
        pageSize = 100
        scope = f"list/{self.listId}" if self.listId else f"team/{self.teamId}"
        hasLimit = bool(limit) and limit > 0

        try:
            async with aiohttp.ClientSession() as session:
                page = 0
                while True:
                    url = f"{self.apiUrl}/{scope}/task?subtasks=true&page={page}&order_by=created&reverse=true"
                    async with session.get(url, headers=self._headers()) as response:
                        if response.status != 200:
                            errorText = await response.text()
                            logger.error(f"ClickUp read_tasks failed: {response.status} {errorText}")
                            break
                        data = await response.json()

                    # "tasks" may be missing or null; both mean an empty page.
                    items = data.get("tasks") or []
                    for item in items:
                        tasks.append(item)
                        if hasLimit and len(tasks) >= limit:
                            return tasks

                    if len(items) < pageSize:
                        break
                    page += 1
        except Exception as e:
            logger.error(f"ClickUp read_tasks error: {e}")
        return tasks

    async def write_tasks(self, tasklist: list[dict]) -> None:
        """Update tasks in ClickUp.

        Each item is expected to look like
        ``{'ID' or 'id' or 'task_id': <task id>, 'fields': {...}}``. Items
        without a task id or without a non-empty ``fields`` dict are skipped,
        as are items whose fields map to nothing that can be sent.

        A failure on one task is logged and does not prevent the remaining
        tasks from being processed.

        Args:
            tasklist: Update descriptions, one per task.
        """
        try:
            async with aiohttp.ClientSession() as session:
                for entry in tasklist:
                    try:
                        await self._update_task(session, entry)
                    except Exception as e:
                        # Isolate the failure so the rest of the batch still runs.
                        logger.error(f"ClickUp write_tasks error: {e}")
        except Exception as e:
            logger.error(f"ClickUp write_tasks error: {e}")

    async def _update_task(self, session, entry: dict) -> None:
        """Send the update for a single ``write_tasks`` item, if it has one."""
        taskId = entry.get("ID") or entry.get("id") or entry.get("task_id")
        fields = entry.get("fields", {})
        if not taskId or not isinstance(fields, dict) or not fields:
            return

        payload = self._build_update_payload(fields)
        if not payload:
            # Nothing mappable: avoid an empty PUT that cannot change the task.
            logger.debug(f"ClickUp update skipped for {taskId}: no supported fields")
            return

        url = f"{self.apiUrl}/task/{taskId}"
        async with session.put(url, headers=self._headers(), json=payload) as response:
            if response.status not in (200, 204):
                err = await response.text()
                logger.error(f"ClickUp update failed for {taskId}: {response.status} {err}")

    @staticmethod
    def _build_update_payload(fields: dict) -> dict:
        """Map generic field ids onto a ClickUp task-update payload.

        Mapping rules:
            * ``name`` / ``summary`` -> ``name``
            * ``status`` -> ``status``
            * ids starting with ``customfield_`` or ``cf_`` -> appended to
              ``custom_fields`` as ``{"id": <id>, "value": <value>}``
            * anything else: the first non-empty string value becomes
              ``description``; other values are ignored.

        Returns:
            The payload dict; empty when no field could be mapped.
        """
        payload: dict = {}
        for fieldId, value in fields.items():
            if fieldId in ("name", "summary"):
                payload["name"] = value
            elif fieldId == "status":
                payload["status"] = value
            elif isinstance(fieldId, str) and fieldId.startswith(("customfield_", "cf_")):
                # NOTE(review): ClickUp documents a separate "Set Custom Field
                # Value" endpoint; confirm the task update endpoint honours
                # inline ``custom_fields`` and prefixed ids - TODO verify.
                payload.setdefault("custom_fields", []).append(
                    {"id": fieldId, "value": value}
                )
            elif isinstance(value, str) and value:
                # Best-effort: first unknown text field wins as the description.
                payload.setdefault("description", value)
        return payload
|
|
|
|
|