gateway/modules/workflows/methods/methodClickup/actions/create_task.py

213 lines
7 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from modules.datamodels.datamodelChat import ActionDocument, ActionResult
from ..helpers.pathparse import parse_team_and_list
logger = logging.getLogger(__name__)
def _as_str(v: Any) -> str:
if v is None:
return ""
return str(v).strip()
def _parse_custom_field_values(parameters: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Build ClickUp custom_fields array from customFieldValues map (field id -> value)."""
raw = parameters.get("customFieldValues")
if raw is None:
return []
if isinstance(raw, str) and raw.strip():
try:
raw = json.loads(raw)
except json.JSONDecodeError:
return []
if not isinstance(raw, dict):
return []
out: List[Dict[str, Any]] = []
for fid, val in raw.items():
if val is None or val == "":
continue
if isinstance(val, dict) and val.get("type") in ("ref", "value"):
continue
out.append({"id": str(fid), "value": val})
return out
def _unwrap_value(v: Any) -> Any:
if isinstance(v, dict) and v.get("type") == "value" and "value" in v:
return v.get("value")
return v
def _parse_int_list(val: Any) -> List[int]:
if val is None:
return []
val = _unwrap_value(val)
if isinstance(val, str) and val.strip():
try:
parsed = json.loads(val)
if isinstance(parsed, list):
return [int(x) for x in parsed if x is not None and str(x).strip() != ""]
except (json.JSONDecodeError, ValueError, TypeError):
return []
if isinstance(val, list):
out: List[int] = []
for x in val:
if x is None or (isinstance(x, str) and not x.strip()):
continue
try:
out.append(int(x))
except (ValueError, TypeError):
continue
return out
return []
def _optional_positive_int(v: Any) -> Optional[int]:
v = _unwrap_value(v)
if v is None or v == "":
return None
try:
i = int(float(v))
return i if i > 0 else None
except (ValueError, TypeError):
return None
def _parse_due_date_ms(v: Any) -> Optional[int]:
"""Accept Unix ms or ISO date string (YYYY-MM-DD) from form payload."""
v = _unwrap_value(v)
if v is None or v == "":
return None
if isinstance(v, str) and len(v) >= 10 and v[4] == "-" and v[7] == "-":
try:
dt = datetime.strptime(v[:10], "%Y-%m-%d").replace(tzinfo=timezone.utc)
return int(dt.timestamp() * 1000)
except ValueError:
pass
try:
i = int(float(v))
return i if i > 0 else None
except (ValueError, TypeError):
return None
def _parse_time_estimate_hours_to_ms(v: Any) -> Optional[int]:
v = _unwrap_value(v)
if v is None or v == "":
return None
try:
h = float(v)
if h < 0:
return None
return int(round(h * 3600 * 1000))
except (ValueError, TypeError):
return None
def _apply_standard_task_fields(body: Dict[str, Any], parameters: Dict[str, Any]) -> None:
"""Map first-class node params to ClickUp POST /list/{{id}}/task body (before taskFields merge)."""
ts = _as_str(parameters.get("taskStatus") or parameters.get("clickupStatus"))
if ts:
body["status"] = ts
pr = parameters.get("taskPriority")
pr = _unwrap_value(pr)
if pr is not None and pr != "":
try:
pi = int(float(pr))
if 1 <= pi <= 4:
body["priority"] = pi
except (ValueError, TypeError):
pass
dd = parameters.get("taskDueDateMs")
dms = _parse_due_date_ms(dd)
if dms is not None:
body["due_date"] = dms
assignees = _parse_int_list(parameters.get("taskAssigneeIds"))
if assignees:
body["assignees"] = assignees
teh = parameters.get("taskTimeEstimateHours")
tem_h = _parse_time_estimate_hours_to_ms(teh)
if tem_h is not None:
body["time_estimate"] = tem_h
else:
te = parameters.get("taskTimeEstimateMs")
tem = _optional_positive_int(te)
if tem is not None:
body["time_estimate"] = tem
def _merge_custom_fields(body: Dict[str, Any], items: List[Dict[str, Any]]) -> None:
if not items:
return
existing = body.get("custom_fields")
if isinstance(existing, list) and existing:
by_id: Dict[str, Dict[str, Any]] = {}
for x in existing:
if isinstance(x, dict) and x.get("id") is not None:
by_id[str(x["id"])] = x
for item in items:
by_id[str(item["id"])] = item
body["custom_fields"] = list(by_id.values())
else:
body["custom_fields"] = items
async def create_task(self, parameters: Dict[str, Any]) -> ActionResult:
connection_reference = parameters.get("connectionReference")
list_id = (parameters.get("listId") or "").strip()
path_query = (parameters.get("pathQuery") or parameters.get("path") or "").strip()
name = _as_str(parameters.get("name"))
description = _as_str(parameters.get("description"))
if not connection_reference:
return ActionResult.isFailure(error="connectionReference is required")
if not list_id and path_query:
_t, lid = parse_team_and_list(path_query)
list_id = lid or list_id
if not list_id:
return ActionResult.isFailure(error="listId or path /team/{teamId}/list/{listId} is required")
if not name:
return ActionResult.isFailure(error="name is required")
conn = self.connection.get_clickup_connection(connection_reference)
if not conn:
return ActionResult.isFailure(error="No valid ClickUp connection")
body: Dict[str, Any] = {"name": name}
if description:
body["description"] = description
_apply_standard_task_fields(body, parameters)
extra = parameters.get("taskFields")
if isinstance(extra, str) and extra.strip():
try:
parsed = json.loads(extra)
if isinstance(parsed, dict):
body.update(parsed)
except json.JSONDecodeError:
return ActionResult.isFailure(error="taskFields must be valid JSON object")
elif isinstance(extra, dict):
body.update(extra)
cf_items = _parse_custom_field_values(parameters)
if cf_items:
_merge_custom_fields(body, cf_items)
data = await self.services.clickup.createTask(list_id, body)
if isinstance(data, dict) and data.get("error"):
return ActionResult.isFailure(error=str(data.get("error")) + (data.get("body") or ""))
doc = ActionDocument(
documentName="clickup_create_task.json",
documentData=json.dumps(data, ensure_ascii=False, indent=2),
mimeType="application/json",
validationMetadata={"actionType": "clickup.createTask", "listId": list_id},
)
return ActionResult.isSuccess(documents=[doc])