gateway/modules/connectors/connectorTicketsClickup.py
2025-09-24 23:18:10 +02:00

145 lines
6.3 KiB
Python

"""ClickUp connector for CRUD operations (compatible with TicketInterface)."""
from dataclasses import dataclass
from typing import Optional
import logging
import aiohttp
from modules.interfaces.interfaceTicketModel import (
TicketBase,
TicketFieldAttribute,
Task,
)
logger = logging.getLogger(__name__)
class ConnectorTicketClickup(TicketBase):
def __init__(
self,
*,
apiToken: str,
teamId: str,
listId: Optional[str] = None,
apiUrl: str = "https://api.clickup.com/api/v2",
) -> None:
self.apiToken = apiToken
self.teamId = teamId
self.listId = listId
self.apiUrl = apiUrl
def _headers(self) -> dict:
return {
"Authorization": self.apiToken,
"Content-Type": "application/json",
}
async def read_attributes(self) -> list[TicketFieldAttribute]:
"""Fetch field attributes. Uses list custom fields if listId provided; else basic fields."""
attributes: list[TicketFieldAttribute] = []
try:
async with aiohttp.ClientSession() as session:
if self.listId:
url = f"{self.apiUrl}/list/{self.listId}/field"
async with session.get(url, headers=self._headers()) as response:
if response.status != 200:
logger.warning(f"ClickUp fields fetch status: {response.status}")
else:
data = await response.json()
for field in data.get("fields", []):
fieldId = field.get("id")
fieldName = field.get("name", fieldId)
if fieldId:
attributes.append(TicketFieldAttribute(fieldName=fieldName, field=fieldId))
# Add common top-level fields
core_fields = [
("ID", "id"),
("Name", "name"),
("Status", "status.status"),
("Assignees", "assignees"),
("DateCreated", "date_created"),
("DueDate", "due_date"),
]
for name, fid in core_fields:
attributes.append(TicketFieldAttribute(fieldName=name, field=fid))
except Exception as e:
logger.error(f"ClickUp read_attributes error: {e}")
return attributes
async def read_tasks(self, *, limit: int = 0) -> list[Task]:
"""Read tasks from ClickUp, always returning full task records.
If list_id is set, read from that list; otherwise read from team.
"""
tasks: list[Task] = []
try:
async with aiohttp.ClientSession() as session:
page = 0
pageSize = 100
while True:
if self.listId:
url = f"{self.apiUrl}/list/{self.listId}/task?subtasks=true&page={page}&order_by=created&reverse=true"
else:
# Team-level search for open tasks
url = f"{self.apiUrl}/team/{self.teamId}/task?subtasks=true&page={page}&order_by=created&reverse=true"
# Request with parameters to include all fields where possible
async with session.get(url, headers=self._headers()) as response:
if response.status != 200:
errorText = await response.text()
logger.error(f"ClickUp read_tasks failed: {response.status} {errorText}")
break
data = await response.json()
items = data.get("tasks", [])
for item in items:
tasks.append(Task(data=item))
if limit and len(tasks) >= limit:
return tasks
if len(items) < pageSize:
break
page += 1
except Exception as e:
logger.error(f"ClickUp read_tasks error: {e}")
return tasks
async def write_tasks(self, tasklist: list[Task]) -> None:
"""Update tasks in ClickUp. Expects Task.data to contain {'ID' or 'id' or 'task_id', 'fields': {...}}"""
try:
async with aiohttp.ClientSession() as session:
for task in tasklist:
data = task.data
taskId = data.get("ID") or data.get("id") or data.get("task_id")
fields = data.get("fields", {})
if not taskId or not isinstance(fields, dict) or not fields:
continue
# Map generic fields to ClickUp payload
payload: dict = {}
for fieldId, value in fields.items():
# Heuristics: map common field ids
if fieldId in ("name", "summary"):
payload["name"] = value
elif fieldId in ("status",):
payload["status"] = value
elif fieldId.startswith("customfield_") or fieldId.startswith("cf_"):
# ClickUp custom fields need separate endpoint; attempt inline update if supported
if "custom_fields" not in payload:
payload["custom_fields"] = []
payload["custom_fields"].append({"id": fieldId, "value": value})
else:
# Best-effort assign to description for unknown text fields
if isinstance(value, str) and value:
payload.setdefault("description", value)
url = f"{self.apiUrl}/task/{taskId}"
async with session.put(url, headers=self._headers(), json=payload) as response:
if response.status not in (200, 204):
err = await response.text()
logger.error(f"ClickUp update failed for {taskId}: {response.status} {err}")
except Exception as e:
logger.error(f"ClickUp write_tasks error: {e}")