gateway/modules/workflows/methods/methodClickup/actions/search_tasks.py

221 lines
7.6 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import json
import logging
import re
from typing import Any, Dict, List, Optional
from modules.datamodels.datamodelChat import ActionDocument, ActionResult
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Maximum number of description characters kept in a slimmed task before it is truncated.
_DESC_MAX = 4000
# Upper bound on how many list pages a single list-scoped search will scan.
_MAX_LIST_PAGES = 50
# Matches the ASCII hyphen-minus and the code points U+2010 through U+2015.
_DASHES = re.compile(r"[\u2010-\u2015\-]")
def _norm_title(s: str) -> str:
    """Return *s* normalized for title comparison.

    The text is stripped and lowercased, every character matched by ``_DASHES``
    becomes a plain ``-``, and each run of whitespace becomes a single space.
    A falsy *s* yields an empty string.
    """
    lowered = (s or "").strip().lower()
    with_plain_dashes = _DASHES.sub("-", lowered)
    return re.sub(r"\s+", " ", with_plain_dashes)
def _title_contains_query(name: str, query: str) -> bool:
    """Tell whether *query* occurs in the task title *name*.

    An empty query matches every title. Otherwise the normalized query is looked
    up in the normalized title first; if that fails, a plain case-insensitive
    substring check on the raw strings decides.
    """
    if not query:
        return True
    normalized_name = _norm_title(name)
    normalized_query = _norm_title(query)
    return normalized_query in normalized_name or query.lower() in (name or "").lower()
def _task_text_for_broad(t: Dict[str, Any]) -> str:
parts: List[str] = []
if t.get("name"):
parts.append(str(t["name"]))
d = t.get("description") or t.get("text_content") or t.get("textcontent") or ""
if d:
parts.append(str(d))
return " ".join(parts).lower()
def _task_matches_query(t: Dict[str, Any], query: str, *, match_name_only: bool) -> bool:
    """Decide whether task *t* matches *query*.

    An empty query matches everything. With ``match_name_only`` the decision is
    delegated to the title check; otherwise the lowercased query must be a
    substring of the task's combined name/body text.
    """
    if not query:
        return True
    if not match_name_only:
        return query.lower() in _task_text_for_broad(t)
    return _title_contains_query(t.get("name") or "", query)
def _pick(d: Dict[str, Any], *keys: str, default: Any = None) -> Any:
for k in keys:
if k in d and d[k] is not None:
return d[k]
return default
def _slim_custom_field(cf: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Only include custom fields that have a value (omit null noise)."""
val = cf.get("value")
if val is None or val == "":
return None
return {
"id": cf.get("id"),
"name": cf.get("name"),
"type": cf.get("type"),
"value": val,
}
def _slim_clickup_task(t: Dict[str, Any]) -> Dict[str, Any]:
    """Project a ClickUp task dict onto a compact payload.

    Always present: ``id``, ``name``, ``text_content``, ``description``
    (cut to ``_DESC_MAX`` characters plus a truncation marker), ``status``
    (the nested status name), ``url``, ``list`` (id/name, or None), and the
    created/updated/due dates. ``assignees``, ``custom_fields`` and
    ``priority`` are added only when they carry data.
    """
    raw_status = t.get("status")
    status_info = raw_status if isinstance(raw_status, dict) else {}
    raw_list = t.get("list")
    list_info = raw_list if isinstance(raw_list, dict) else {}

    description = _pick(t, "description", "Description", default="") or ""
    if len(description) > _DESC_MAX:
        description = description[:_DESC_MAX] + "…(truncated)"

    # Non-dict assignee entries are skipped.
    people: List[Dict[str, Any]] = [
        {
            "id": person.get("id"),
            "username": person.get("username"),
            "email": person.get("email"),
        }
        for person in (t.get("assignees") or [])
        if isinstance(person, dict)
    ]

    # Custom fields without a value are dropped by _slim_custom_field (returns None).
    raw_fields = t.get("custom_fields") or t.get("customfields") or []
    slimmed = (_slim_custom_field(field) for field in raw_fields if isinstance(field, dict))
    fields: List[Dict[str, Any]] = [row for row in slimmed if row is not None]

    slim_task: Dict[str, Any] = {
        "id": t.get("id"),
        "name": t.get("name"),
        "text_content": _pick(t, "text_content", "textcontent"),
        "description": description,
        "status": status_info.get("status"),
        "url": t.get("url"),
        "list": {"id": list_info.get("id"), "name": list_info.get("name")} if list_info else None,
        "date_created": _pick(t, "date_created", "datecreated"),
        "date_updated": _pick(t, "date_updated", "dateupdated"),
        "due_date": _pick(t, "due_date", "duedate"),
    }
    if people:
        slim_task["assignees"] = people
    if fields:
        slim_task["custom_fields"] = fields
    priority = t.get("priority")
    if priority is not None:
        slim_task["priority"] = priority
    return slim_task
def _slim_search_payload(data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of *data* whose ``tasks`` entries are slimmed.

    Dict tasks go through ``_slim_clickup_task``; any other entry is kept as is.
    All other keys are carried over, ``tasks`` is placed after them, and a
    ``_nyla`` marker records that the payload is slim.
    """
    slimmed = [
        _slim_clickup_task(item) if isinstance(item, dict) else item
        for item in (data.get("tasks") or [])
    ]
    payload: Dict[str, Any] = dict(data)
    # Remove and re-add so "tasks" ends up after the carried-over keys.
    payload.pop("tasks", None)
    payload["tasks"] = slimmed
    payload["_nyla"] = {
        "slim": True,
        "hint": "Set fullTaskData=true for raw ClickUp API objects.",
    }
    return payload
def _clickup_error_text(response: Dict[str, Any]) -> str:
    """Concatenate the ``error`` and optional ``body`` of an error response into one string."""
    body = response.get("body")
    # Formatting (rather than ``+``) keeps this safe when ``body`` is not a string.
    return f"{response.get('error')}{body if body else ''}"


async def search_tasks(self, parameters: Dict[str, Any]) -> ActionResult:
    """Search ClickUp tasks and return the result as a JSON document.

    Recognized ``parameters`` keys:
        connectionReference: required; passed to ``get_clickup_connection``.
        teamId: required workspace id (string or number).
        query / searchQuery: required search text.
        listId: optional; when set, pages of that list are scanned and matched
            locally instead of using the team search.
        fullTaskData / fullPayload: when truthy, tasks are returned unslimmed.
        matchNameOnly / matchTitle: match against the task title only
            (defaults to True when neither key is present).
        page: first page to read (defaults to 0).
        includeClosed: forwarded to the list API (defaults to False).

    Returns:
        A success ``ActionResult`` holding one ``clickup_search_tasks.json``
        document, or a failure ``ActionResult`` when a required parameter is
        missing, ``page`` is not an integer, no connection is available, or the
        ClickUp service reports an error.
    """
    connection_reference = parameters.get("connectionReference")
    # Ids and queries may arrive as numbers; coerce to str before stripping.
    team_id = str(parameters.get("teamId") or "").strip()
    query = str(parameters.get("query") or parameters.get("searchQuery") or "").strip()
    list_id_filter = str(parameters.get("listId") or "").strip()

    if not connection_reference:
        return ActionResult.isFailure(error="connectionReference is required")
    if not team_id:
        return ActionResult.isFailure(error="teamId is required (workspace id from ClickUp)")
    if not query:
        return ActionResult.isFailure(error="query is required")

    conn = self.connection.get_clickup_connection(connection_reference)
    if not conn:
        return ActionResult.isFailure(error="No valid ClickUp connection")

    full_task_data = bool(parameters.get("fullTaskData") or parameters.get("fullPayload"))
    if "matchNameOnly" in parameters:
        match_name_only = bool(parameters.get("matchNameOnly"))
    elif "matchTitle" in parameters:
        match_name_only = bool(parameters.get("matchTitle"))
    else:
        match_name_only = True

    try:
        page = int(parameters.get("page") or 0)
    except (TypeError, ValueError):
        # Report bad input the same way as the other parameter validations.
        return ActionResult.isFailure(error="page must be an integer")
    include_closed = bool(parameters.get("includeClosed", False))

    if list_id_filter:
        # List API: scan pages in this list and match locally
        # (team search does not scope to one list).
        filtered_tasks: List[Dict[str, Any]] = []
        for p in range(page, page + _MAX_LIST_PAGES):
            batch = await self.services.clickup.getTasksInList(
                list_id_filter, page=p, include_closed=include_closed, subtasks=True
            )
            if not isinstance(batch, dict):
                return ActionResult.isFailure(error="Unexpected response from ClickUp list API")
            if batch.get("error"):
                return ActionResult.isFailure(error=_clickup_error_text(batch))
            tasks = batch.get("tasks") or []
            filtered_tasks.extend(
                t
                for t in tasks
                if isinstance(t, dict) and _task_matches_query(t, query, match_name_only=match_name_only)
            )
            if not tasks or batch.get("last_page") or batch.get("lastpage"):
                break
        else:
            # Runs only when the loop was not left via ``break``: the page cap was
            # reached before the list reported its last page.
            logger.warning(
                "ClickUp list search stopped after %d pages (list %s); results may be incomplete",
                _MAX_LIST_PAGES,
                list_id_filter,
            )
        # NOTE: "lastpage" stays True for payload compatibility, even when the cap was hit.
        data: Dict[str, Any] = {"tasks": filtered_tasks, "lastpage": True}
        search_mode = "list"
    else:
        data = await self.services.clickup.searchTeamTasks(team_id, query=query, page=page)
        if isinstance(data, dict) and data.get("error"):
            return ActionResult.isFailure(error=_clickup_error_text(data))
        if match_name_only and isinstance(data, dict):
            # Keep only the tasks whose title contains the query.
            title_hits = [
                t
                for t in (data.get("tasks") or [])
                if isinstance(t, dict) and _title_contains_query(t.get("name") or "", query)
            ]
            data = {**data, "tasks": title_hits}
        search_mode = "team"

    if isinstance(data, dict) and not full_task_data:
        data = _slim_search_payload(data)

    doc = ActionDocument(
        documentName="clickup_search_tasks.json",
        documentData=json.dumps(data, ensure_ascii=False, indent=2),
        mimeType="application/json",
        validationMetadata={
            "actionType": "clickup.searchTasks",
            "teamId": team_id,
            "query": query,
            "slim": not full_task_data,
            "matchNameOnly": match_name_only,
            "searchMode": search_mode,
            "listId": list_id_filter or None,
            "includeClosed": include_closed if list_id_filter else None,
        },
    )
    return ActionResult.isSuccess(documents=[doc])