gateway/modules/workflows/automation2/clickupTaskUpdateMerge.py

174 lines
5.7 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# Merge clickup.updateTask node parameter taskUpdateEntries into taskUpdate JSON.
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
def _unwrap_value(v: Any) -> Any:
if isinstance(v, dict) and v.get("type") == "value" and "value" in v:
return v.get("value")
return v
def _unwrap_dynamic(v: Any) -> Any:
return _unwrap_value(v)
def _parse_int_list(val: Any) -> List[int]:
if val is None:
return []
val = _unwrap_value(val)
if isinstance(val, str) and val.strip():
try:
parsed = json.loads(val)
if isinstance(parsed, list):
return [int(x) for x in parsed if x is not None and str(x).strip() != ""]
except (json.JSONDecodeError, ValueError, TypeError):
return []
if isinstance(val, list):
out: List[int] = []
for x in val:
if x is None or (isinstance(x, str) and not x.strip()):
continue
try:
out.append(int(x))
except (ValueError, TypeError):
continue
return out
return []
def _parse_due_date_ms(v: Any) -> Optional[int]:
v = _unwrap_value(v)
if v is None or v == "":
return None
if isinstance(v, str) and len(v) >= 10 and v[4] == "-" and v[7] == "-":
try:
dt = datetime.strptime(v[:10], "%Y-%m-%d").replace(tzinfo=timezone.utc)
return int(dt.timestamp() * 1000)
except ValueError:
pass
try:
i = int(float(v))
return i if i > 0 else None
except (ValueError, TypeError):
return None
def _parse_time_estimate_hours_to_ms(v: Any) -> Optional[int]:
v = _unwrap_value(v)
if v is None or v == "":
return None
try:
h = float(v)
if h < 0:
return None
return int(round(h * 3600 * 1000))
except (ValueError, TypeError):
return None
def merge_clickup_task_update_entries(resolved_params: Dict[str, Any]) -> None:
"""
Pop taskUpdateEntries from resolved_params and merge into taskUpdate (dict or JSON string).
Existing taskUpdate (advanced JSON) is the base; entry rows override by key.
"""
entries = resolved_params.pop("taskUpdateEntries", None)
json_raw = resolved_params.get("taskUpdate")
base: Dict[str, Any] = {}
if isinstance(json_raw, str) and json_raw.strip():
try:
parsed = json.loads(json_raw)
if isinstance(parsed, dict):
base = dict(parsed)
except json.JSONDecodeError:
logger.warning("clickup.updateTask: taskUpdate is not valid JSON, ignoring base")
elif isinstance(json_raw, dict):
base = dict(json_raw)
if not isinstance(entries, list) or not entries:
if not base and json_raw not in (None, "", {}):
resolved_params["taskUpdate"] = json_raw
elif base:
resolved_params["taskUpdate"] = json.dumps(base, ensure_ascii=False)
return
overlay: Dict[str, Any] = {}
custom_rows: List[Dict[str, Any]] = []
for row in entries:
if not isinstance(row, dict):
continue
fk = row.get("fieldKey") or row.get("field")
if fk is None:
continue
fk = str(fk).strip()
val = _unwrap_dynamic(row.get("value"))
if fk == "custom_field":
cfid = _unwrap_dynamic(row.get("customFieldId"))
if not cfid or not str(cfid).strip():
continue
if val is None or val == "":
continue
custom_rows.append({"id": str(cfid).strip(), "value": val})
continue
if fk == "name" and val is not None and str(val).strip():
overlay["name"] = str(val).strip()
elif fk == "description":
overlay["description"] = "" if val is None else str(val)
elif fk == "status" and val is not None and str(val).strip():
overlay["status"] = str(val).strip()
elif fk == "priority":
if val is None or val == "":
continue
try:
pi = int(float(val))
if 1 <= pi <= 4:
overlay["priority"] = pi
except (ValueError, TypeError):
pass
elif fk == "due_date":
dms = _parse_due_date_ms(val)
if dms is not None:
overlay["due_date"] = dms
elif fk == "time_estimate_h":
tms = _parse_time_estimate_hours_to_ms(val)
if tms is not None:
overlay["time_estimate"] = tms
elif fk == "time_estimate_ms":
if val is None or val == "":
continue
try:
tms = int(float(val))
if tms > 0:
overlay["time_estimate"] = tms
except (ValueError, TypeError):
pass
elif fk == "assignees":
ids = _parse_int_list(val)
if ids:
overlay["assignees"] = ids
else:
logger.debug("clickup.updateTask: unknown fieldKey %s", fk)
merged = {**base, **overlay}
if custom_rows:
by_id: Dict[str, Dict[str, Any]] = {}
existing = merged.get("custom_fields")
if isinstance(existing, list):
for x in existing:
if isinstance(x, dict) and x.get("id") is not None:
by_id[str(x["id"])] = x
for x in custom_rows:
by_id[str(x["id"])] = x
merged["custom_fields"] = list(by_id.values())
resolved_params["taskUpdate"] = json.dumps(merged, ensure_ascii=False) if merged else ""