# Copyright (c) 2025 Patrick Motsch
# Merge clickup.updateTask node parameter taskUpdateEntries into taskUpdate JSON.

import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

# Errors raised by int()/float()/round() on unusable input. OverflowError is
# included because infinite values ("inf", "1e999") make int()/round() raise it.
_NUMERIC_ERRORS = (ValueError, TypeError, OverflowError)


def _unwrap_value(v: Any) -> Any:
    """Return the payload of a ``{"type": "value", "value": ...}`` dict, else *v* unchanged."""
    if isinstance(v, dict) and v.get("type") == "value" and "value" in v:
        return v["value"]
    return v


def _unwrap_dynamic(v: Any) -> Any:
    """Delegate to :func:`_unwrap_value`; used for the values of entry rows."""
    return _unwrap_value(v)


def _coerce_int_items(items: List[Any]) -> List[int]:
    """Convert *items* to ints, skipping ``None``, blank strings and non-integer items."""
    result: List[int] = []
    for item in items:
        if item is None or (isinstance(item, str) and not item.strip()):
            continue
        try:
            result.append(int(item))
        except _NUMERIC_ERRORS:
            # One bad element must not discard the remaining ids.
            continue
    return result


def _parse_int_list(val: Any) -> List[int]:
    """Parse *val* into a list of ints.

    Accepted shapes (after unwrapping a ``{"type": "value"}`` wrapper):
    a list, a JSON string holding a list or a single number, a
    comma-separated string such as ``"1, 2"``, or a single int/float.
    Anything else, and every element that cannot be converted, is ignored.
    """
    val = _unwrap_value(val)
    if isinstance(val, str):
        text = val.strip()
        if not text:
            return []
        try:
            val = json.loads(text)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            if text[0] in "[{":
                # Looks like JSON but is malformed: do not guess at fragments.
                return []
            val = text.split(",")
    if isinstance(val, bool):
        # bool is an int subclass; a bare true/false is not an id.
        return []
    if isinstance(val, (int, float)):
        val = [val]
    if not isinstance(val, list):
        return []
    return _coerce_int_items(val)


def _parse_positive_int(v: Any) -> Optional[int]:
    """Return ``int(float(v))`` when it is strictly positive, else ``None``."""
    try:
        number = int(float(v))
    except _NUMERIC_ERRORS:
        return None
    return number if number > 0 else None


def _parse_due_date_ms(v: Any) -> Optional[int]:
    """Convert a due date to epoch milliseconds.

    A string starting with ``YYYY-MM-DD`` is read as midnight UTC of that day
    (anything after the first ten characters is ignored). Any other value is
    treated as a number and returned as an int when strictly positive.
    Returns ``None`` for empty or unusable input.
    """
    v = _unwrap_value(v)
    if v is None or v == "":
        return None
    if isinstance(v, str) and len(v) >= 10 and v[4] == "-" and v[7] == "-":
        try:
            day = datetime.strptime(v[:10], "%Y-%m-%d").replace(tzinfo=timezone.utc)
            return int(day.timestamp() * 1000)
        except ValueError:
            # Not a real calendar date; fall through to the numeric attempt.
            pass
    return _parse_positive_int(v)


def _parse_time_estimate_hours_to_ms(v: Any) -> Optional[int]:
    """Convert a non-negative number of hours to milliseconds, or ``None`` if unusable."""
    v = _unwrap_value(v)
    if v is None or v == "":
        return None
    try:
        hours = float(v)
        if hours < 0:
            return None
        return int(round(hours * 3600 * 1000))
    except _NUMERIC_ERRORS:
        return None


def _parse_priority(v: Any) -> Optional[int]:
    """Return *v* as an int when it lies in the range 1..4, else ``None``."""
    if v is None or v == "":
        return None
    try:
        level = int(float(v))
    except _NUMERIC_ERRORS:
        return None
    return level if 1 <= level <= 4 else None


def _load_base_update(json_raw: Any) -> Dict[str, Any]:
    """Return a shallow copy of the existing taskUpdate as a dict (``{}`` if unusable)."""
    if isinstance(json_raw, dict):
        return dict(json_raw)
    if isinstance(json_raw, str) and json_raw.strip():
        try:
            parsed = json.loads(json_raw)
        except json.JSONDecodeError:
            logger.warning("clickup.updateTask: taskUpdate is not valid JSON, ignoring base")
            return {}
        if isinstance(parsed, dict):
            return dict(parsed)
    return {}


def _apply_entry(
    row: Any,
    overlay: Dict[str, Any],
    custom_rows: List[Dict[str, Any]],
) -> None:
    """Translate one entry row into *overlay* or *custom_rows*; unusable rows are skipped."""
    if not isinstance(row, dict):
        return
    fk = row.get("fieldKey") or row.get("field")
    if fk is None:
        return
    fk = str(fk).strip()
    val = _unwrap_dynamic(row.get("value"))

    if fk == "custom_field":
        cfid = _unwrap_dynamic(row.get("customFieldId"))
        if not cfid or not str(cfid).strip():
            return
        if val is None or val == "":
            return
        custom_rows.append({"id": str(cfid).strip(), "value": val})
    elif fk in ("name", "status"):
        # Blank values are ignored (and no longer logged as an unknown key).
        text = "" if val is None else str(val).strip()
        if text:
            overlay[fk] = text
    elif fk == "description":
        # Unlike name/status, an empty description is written through.
        overlay["description"] = "" if val is None else str(val)
    elif fk == "priority":
        priority = _parse_priority(val)
        if priority is not None:
            overlay["priority"] = priority
    elif fk == "due_date":
        due_ms = _parse_due_date_ms(val)
        if due_ms is not None:
            overlay["due_date"] = due_ms
    elif fk == "time_estimate_h":
        estimate_ms = _parse_time_estimate_hours_to_ms(val)
        if estimate_ms is not None:
            overlay["time_estimate"] = estimate_ms
    elif fk == "time_estimate_ms":
        estimate_ms = _parse_positive_int(val)
        if estimate_ms is not None:
            overlay["time_estimate"] = estimate_ms
    elif fk == "assignees":
        ids = _parse_int_list(val)
        if ids:
            overlay["assignees"] = ids
    else:
        logger.debug("clickup.updateTask: unknown fieldKey %s", fk)


def merge_clickup_task_update_entries(resolved_params: Dict[str, Any]) -> None:
    """
    Pop taskUpdateEntries from resolved_params and merge into taskUpdate (dict or JSON string).
    Existing taskUpdate (advanced JSON) is the base; entry rows override by key.

    Each entry row is a dict with ``fieldKey`` (or ``field``) and ``value``;
    ``custom_field`` rows additionally carry ``customFieldId``. Recognised
    keys are name, description, status, priority, due_date, time_estimate_h,
    time_estimate_ms, assignees and custom_field. Rows with unusable values
    are skipped instead of raising.

    ``resolved_params`` is modified in place:

    * With no entries, a parseable base is re-serialised to a JSON string and
      anything else is left as it was.
    * With entries, ``taskUpdate`` becomes the JSON string of the merged
      object, or ``""`` when the merge result is empty.
    """
    entries = resolved_params.pop("taskUpdateEntries", None)
    json_raw = resolved_params.get("taskUpdate")
    base = _load_base_update(json_raw)

    if not isinstance(entries, list) or not entries:
        if base:
            resolved_params["taskUpdate"] = json.dumps(base, ensure_ascii=False)
        elif json_raw not in (None, "", {}):
            # Unparseable or non-dict input is passed through untouched.
            resolved_params["taskUpdate"] = json_raw
        return

    overlay: Dict[str, Any] = {}
    custom_rows: List[Dict[str, Any]] = []
    for row in entries:
        _apply_entry(row, overlay, custom_rows)

    merged = {**base, **overlay}
    if custom_rows:
        # Index by id so an entry row replaces a base custom field with the
        # same id while keeping that field's original position.
        by_id: Dict[str, Dict[str, Any]] = {}
        existing = merged.get("custom_fields")
        if isinstance(existing, list):
            for field in existing:
                if isinstance(field, dict) and field.get("id") is not None:
                    by_id[str(field["id"])] = field
        for field in custom_rows:
            by_id[str(field["id"])] = field
        merged["custom_fields"] = list(by_id.values())

    resolved_params["taskUpdate"] = json.dumps(merged, ensure_ascii=False) if merged else ""