gateway/modules/connectors/connectorTicketsClickup.py
2025-09-25 16:59:44 +02:00

141 lines
6.2 KiB
Python

"""ClickUp connector for CRUD operations (compatible with TicketInterface).
This module defines its own minimal abstractions to avoid coupling.
"""
from typing import Optional
import logging
import aiohttp
from modules.datamodels.datamodelTickets import TicketBase, TicketFieldAttribute
logger = logging.getLogger(__name__)
class ConnectorTicketClickup(TicketBase):
def __init__(
self,
*,
apiToken: str,
teamId: str,
listId: Optional[str] = None,
apiUrl: str = "https://api.clickup.com/api/v2",
) -> None:
self.apiToken = apiToken
self.teamId = teamId
self.listId = listId
self.apiUrl = apiUrl
def _headers(self) -> dict:
return {
"Authorization": self.apiToken,
"Content-Type": "application/json",
}
async def read_attributes(self) -> list[TicketFieldAttribute]:
"""Fetch field attributes. Uses list custom fields if listId provided; else basic fields."""
attributes: list[TicketFieldAttribute] = []
try:
async with aiohttp.ClientSession() as session:
if self.listId:
url = f"{self.apiUrl}/list/{self.listId}/field"
async with session.get(url, headers=self._headers()) as response:
if response.status != 200:
logger.warning(f"ClickUp fields fetch status: {response.status}")
else:
data = await response.json()
for field in data.get("fields", []):
fieldId = field.get("id")
fieldName = field.get("name", fieldId)
if fieldId:
attributes.append(TicketFieldAttribute(fieldName=fieldName, field=fieldId))
# Add common top-level fields
core_fields = [
("ID", "id"),
("Name", "name"),
("Status", "status.status"),
("Assignees", "assignees"),
("DateCreated", "date_created"),
("DueDate", "due_date"),
]
for name, fid in core_fields:
attributes.append(TicketFieldAttribute(fieldName=name, field=fid))
except Exception as e:
logger.error(f"ClickUp read_attributes error: {e}")
return attributes
async def read_tasks(self, *, limit: int = 0) -> list[dict]:
"""Read tasks from ClickUp, always returning full task records.
If list_id is set, read from that list; otherwise read from team.
"""
tasks: list[dict] = []
try:
async with aiohttp.ClientSession() as session:
page = 0
pageSize = 100
while True:
if self.listId:
url = f"{self.apiUrl}/list/{self.listId}/task?subtasks=true&page={page}&order_by=created&reverse=true"
else:
# Team-level search for open tasks
url = f"{self.apiUrl}/team/{self.teamId}/task?subtasks=true&page={page}&order_by=created&reverse=true"
# Request with parameters to include all fields where possible
async with session.get(url, headers=self._headers()) as response:
if response.status != 200:
errorText = await response.text()
logger.error(f"ClickUp read_tasks failed: {response.status} {errorText}")
break
data = await response.json()
items = data.get("tasks", [])
for item in items:
tasks.append(item)
if limit and len(tasks) >= limit:
return tasks
if len(items) < pageSize:
break
page += 1
except Exception as e:
logger.error(f"ClickUp read_tasks error: {e}")
return tasks
async def write_tasks(self, tasklist: list[dict]) -> None:
"""Update tasks in ClickUp. Expects each item to contain {'ID' or 'id' or 'task_id', 'fields': {...}}"""
try:
async with aiohttp.ClientSession() as session:
for data in tasklist:
taskId = data.get("ID") or data.get("id") or data.get("task_id")
fields = data.get("fields", {})
if not taskId or not isinstance(fields, dict) or not fields:
continue
# Map generic fields to ClickUp payload
payload: dict = {}
for fieldId, value in fields.items():
# Heuristics: map common field ids
if fieldId in ("name", "summary"):
payload["name"] = value
elif fieldId in ("status",):
payload["status"] = value
elif fieldId.startswith("customfield_") or fieldId.startswith("cf_"):
# ClickUp custom fields need separate endpoint; attempt inline update if supported
if "custom_fields" not in payload:
payload["custom_fields"] = []
payload["custom_fields"].append({"id": fieldId, "value": value})
else:
# Best-effort assign to description for unknown text fields
if isinstance(value, str) and value:
payload.setdefault("description", value)
url = f"{self.apiUrl}/task/{taskId}"
async with session.put(url, headers=self._headers(), json=payload) as response:
if response.status not in (200, 204):
err = await response.text()
logger.error(f"ClickUp update failed for {taskId}: {response.status} {err}")
except Exception as e:
logger.error(f"ClickUp write_tasks error: {e}")