221 lines
7.6 KiB
Python
221 lines
7.6 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from modules.datamodels.datamodelChat import ActionDocument, ActionResult
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_DESC_MAX = 4000
|
|
_MAX_LIST_PAGES = 50
|
|
|
|
_DASHES = re.compile(r"[\u2010-\u2015\-]")
|
|
|
|
|
|
def _norm_title(s: str) -> str:
|
|
"""Lowercase, unify hyphens/dashes, collapse spaces (helps full-title matches)."""
|
|
t = (s or "").strip().lower()
|
|
t = _DASHES.sub("-", t)
|
|
t = re.sub(r"\s+", " ", t)
|
|
return t
|
|
|
|
|
|
def _title_contains_query(name: str, query: str) -> bool:
|
|
if not query:
|
|
return True
|
|
n = _norm_title(name)
|
|
q = _norm_title(query)
|
|
if q in n:
|
|
return True
|
|
return query.lower() in (name or "").lower()
|
|
|
|
|
|
def _task_text_for_broad(t: Dict[str, Any]) -> str:
|
|
parts: List[str] = []
|
|
if t.get("name"):
|
|
parts.append(str(t["name"]))
|
|
d = t.get("description") or t.get("text_content") or t.get("textcontent") or ""
|
|
if d:
|
|
parts.append(str(d))
|
|
return " ".join(parts).lower()
|
|
|
|
|
|
def _task_matches_query(t: Dict[str, Any], query: str, *, match_name_only: bool) -> bool:
|
|
if not query:
|
|
return True
|
|
if match_name_only:
|
|
return _title_contains_query((t.get("name") or ""), query)
|
|
return query.lower() in _task_text_for_broad(t)
|
|
|
|
|
|
def _pick(d: Dict[str, Any], *keys: str, default: Any = None) -> Any:
|
|
for k in keys:
|
|
if k in d and d[k] is not None:
|
|
return d[k]
|
|
return default
|
|
|
|
|
|
def _slim_custom_field(cf: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
|
"""Only include custom fields that have a value (omit null noise)."""
|
|
val = cf.get("value")
|
|
if val is None or val == "":
|
|
return None
|
|
return {
|
|
"id": cf.get("id"),
|
|
"name": cf.get("name"),
|
|
"type": cf.get("type"),
|
|
"value": val,
|
|
}
|
|
|
|
|
|
def _slim_clickup_task(t: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Usable automation payload — not the full ClickUp API mirror (no nested typeconfig)."""
|
|
status = t.get("status")
|
|
if not isinstance(status, dict):
|
|
status = {}
|
|
li = t.get("list")
|
|
if not isinstance(li, dict):
|
|
li = {}
|
|
|
|
desc = _pick(t, "description", "Description", default="") or ""
|
|
if len(desc) > _DESC_MAX:
|
|
desc = desc[:_DESC_MAX] + "…(truncated)"
|
|
|
|
assignees: List[Dict[str, Any]] = []
|
|
for a in t.get("assignees") or []:
|
|
if isinstance(a, dict):
|
|
assignees.append(
|
|
{
|
|
"id": a.get("id"),
|
|
"username": a.get("username"),
|
|
"email": a.get("email"),
|
|
}
|
|
)
|
|
|
|
cfs = t.get("custom_fields") or t.get("customfields") or []
|
|
slim_cf: List[Dict[str, Any]] = []
|
|
for cf in cfs:
|
|
if isinstance(cf, dict):
|
|
row = _slim_custom_field(cf)
|
|
if row is not None:
|
|
slim_cf.append(row)
|
|
|
|
out: Dict[str, Any] = {
|
|
"id": t.get("id"),
|
|
"name": t.get("name"),
|
|
"text_content": _pick(t, "text_content", "textcontent"),
|
|
"description": desc,
|
|
"status": status.get("status"),
|
|
"url": t.get("url"),
|
|
"list": {"id": li.get("id"), "name": li.get("name")} if li else None,
|
|
"date_created": _pick(t, "date_created", "datecreated"),
|
|
"date_updated": _pick(t, "date_updated", "dateupdated"),
|
|
"due_date": _pick(t, "due_date", "duedate"),
|
|
}
|
|
if assignees:
|
|
out["assignees"] = assignees
|
|
if slim_cf:
|
|
out["custom_fields"] = slim_cf
|
|
pr = t.get("priority")
|
|
if pr is not None:
|
|
out["priority"] = pr
|
|
return out
|
|
|
|
|
|
def _slim_search_payload(data: Dict[str, Any]) -> Dict[str, Any]:
|
|
tasks = data.get("tasks") or []
|
|
slim_tasks = [_slim_clickup_task(t) if isinstance(t, dict) else t for t in tasks]
|
|
out: Dict[str, Any] = {k: v for k, v in data.items() if k != "tasks"}
|
|
out["tasks"] = slim_tasks
|
|
out["_nyla"] = {
|
|
"slim": True,
|
|
"hint": "Set fullTaskData=true for raw ClickUp API objects.",
|
|
}
|
|
return out
|
|
|
|
|
|
async def search_tasks(self, parameters: Dict[str, Any]) -> ActionResult:
|
|
connection_reference = parameters.get("connectionReference")
|
|
team_id = (parameters.get("teamId") or "").strip()
|
|
query = (parameters.get("query") or parameters.get("searchQuery") or "").strip()
|
|
list_id_filter = (parameters.get("listId") or "").strip()
|
|
if not connection_reference:
|
|
return ActionResult.isFailure(error="connectionReference is required")
|
|
if not team_id:
|
|
return ActionResult.isFailure(error="teamId is required (workspace id from ClickUp)")
|
|
if not query:
|
|
return ActionResult.isFailure(error="query is required")
|
|
|
|
conn = self.connection.get_clickup_connection(connection_reference)
|
|
if not conn:
|
|
return ActionResult.isFailure(error="No valid ClickUp connection")
|
|
|
|
full_task_data = bool(parameters.get("fullTaskData") or parameters.get("fullPayload"))
|
|
if "matchNameOnly" in parameters:
|
|
match_name_only = bool(parameters.get("matchNameOnly"))
|
|
elif "matchTitle" in parameters:
|
|
match_name_only = bool(parameters.get("matchTitle"))
|
|
else:
|
|
match_name_only = True
|
|
|
|
page = int(parameters.get("page") or 0)
|
|
include_closed = bool(parameters.get("includeClosed", False))
|
|
|
|
if list_id_filter:
|
|
# List API: scan pages in this list and match locally (team search does not scope to one table).
|
|
filtered_tasks: List[Dict[str, Any]] = []
|
|
p = page
|
|
while p < page + _MAX_LIST_PAGES:
|
|
batch = await self.services.clickup.getTasksInList(
|
|
list_id_filter, page=p, include_closed=include_closed, subtasks=True
|
|
)
|
|
if isinstance(batch, dict) and batch.get("error"):
|
|
return ActionResult.isFailure(error=str(batch.get("error")) + (batch.get("body") or ""))
|
|
tasks = batch.get("tasks") or []
|
|
last = bool(batch.get("last_page") or batch.get("lastpage"))
|
|
for t in tasks:
|
|
if isinstance(t, dict) and _task_matches_query(t, query, match_name_only=match_name_only):
|
|
filtered_tasks.append(t)
|
|
if last or not tasks:
|
|
break
|
|
p += 1
|
|
data: Dict[str, Any] = {"tasks": filtered_tasks, "lastpage": True}
|
|
search_mode = "list"
|
|
else:
|
|
data = await self.services.clickup.searchTeamTasks(team_id, query=query, page=page)
|
|
if isinstance(data, dict) and data.get("error"):
|
|
return ActionResult.isFailure(error=str(data.get("error")) + (data.get("body") or ""))
|
|
|
|
if match_name_only and isinstance(data, dict):
|
|
tasks = data.get("tasks") or []
|
|
filtered = [
|
|
t
|
|
for t in tasks
|
|
if isinstance(t, dict) and _title_contains_query((t.get("name") or ""), query)
|
|
]
|
|
data = {**data, "tasks": filtered}
|
|
search_mode = "team"
|
|
|
|
if isinstance(data, dict) and not full_task_data:
|
|
data = _slim_search_payload(data)
|
|
|
|
doc = ActionDocument(
|
|
documentName="clickup_search_tasks.json",
|
|
documentData=json.dumps(data, ensure_ascii=False, indent=2),
|
|
mimeType="application/json",
|
|
validationMetadata={
|
|
"actionType": "clickup.searchTasks",
|
|
"teamId": team_id,
|
|
"query": query,
|
|
"slim": not full_task_data,
|
|
"matchNameOnly": match_name_only,
|
|
"searchMode": search_mode,
|
|
"listId": list_id_filter or None,
|
|
"includeClosed": include_closed if list_id_filter else None,
|
|
},
|
|
)
|
|
return ActionResult.isSuccess(documents=[doc])
|