# gateway/modules/workflows/methods/methodClickup/actions/get_task.py
#
# 40 lines
# 1.5 KiB
# Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import json
import logging
from typing import Any, Dict
from modules.datamodels.datamodelChat import ActionDocument, ActionResult
from ..helpers.pathparse import parse_task_id
logger = logging.getLogger(__name__)
async def get_task(self, parameters: Dict[str, Any]) -> ActionResult:
    """Fetch one ClickUp task and return it as a JSON document.

    Keys read from ``parameters``:
        connectionReference: Required. Resolved through
            ``self.connection.get_clickup_connection``.
        taskId: Task identifier. Non-string values are converted with
            ``str`` before whitespace is stripped.
        path / pathQuery: Optional fallback. Only consulted when ``taskId``
            is empty; the id is then extracted with ``parse_task_id``.

    Returns:
        ``ActionResult.isSuccess`` with a single ``ActionDocument`` named
        ``clickup_task_{taskId}.json`` containing the service response as
        indented JSON, or ``ActionResult.isFailure`` when a required input
        is missing, the connection cannot be resolved, or the service
        response is a dict with a truthy ``"error"`` entry.
    """
    connection_reference = parameters.get("connectionReference")
    # Coerce to str before stripping so a non-string value (e.g. a numeric
    # id coming from JSON) does not raise AttributeError on .strip().
    task_id = str(parameters.get("taskId") or "").strip()
    path_hint = str(parameters.get("path") or parameters.get("pathQuery") or "").strip()

    if not connection_reference:
        return ActionResult.isFailure(error="connectionReference is required")

    # An explicit taskId wins; the path is only parsed as a fallback.
    if not task_id and path_hint:
        task_id = parse_task_id(path_hint) or ""
    if not task_id:
        return ActionResult.isFailure(error="taskId is required (or path ending in .../task/{id})")

    # NOTE(review): conn is only checked for truthiness and is not passed to
    # the service call below. Presumably resolving it prepares the ClickUp
    # service as a side effect; confirm against get_clickup_connection.
    conn = self.connection.get_clickup_connection(connection_reference)
    if not conn:
        return ActionResult.isFailure(error="No valid ClickUp connection")

    data = await self.services.clickup.getTask(task_id)
    if isinstance(data, dict) and data.get("error"):
        # The body may be a parsed JSON structure rather than text; convert it
        # so building the message cannot raise TypeError and hide the error.
        body = data.get("body") or ""
        if not isinstance(body, str):
            body = str(body)
        return ActionResult.isFailure(error=str(data.get("error")) + body)

    doc = ActionDocument(
        documentName=f"clickup_task_{task_id}.json",
        documentData=json.dumps(data, ensure_ascii=False, indent=2),
        mimeType="application/json",
        validationMetadata={"actionType": "clickup.getTask", "taskId": task_id},
    )
    return ActionResult.isSuccess(documents=[doc])