# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""ClickUp API routes — lists and tasks (connection-scoped). OAuth lives under /api/clickup/auth/* in routeSecurityClickup."""
|
|
|
|
import logging
|
|
from typing import Any, Dict, Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Path, Query, Request, status
|
|
from modules.auth import getCurrentUser, limiter
|
|
from modules.datamodels.datamodelUam import AuthAuthority, User, UserConnection
|
|
from modules.interfaces.interfaceDbApp import getInterface
|
|
from modules.serviceHub import getInterface as getServices
|
|
from modules.shared.i18nRegistry import apiRouteContext
|
|
|
|
# Translator for user-facing API messages emitted by this route module.
routeApiMsg = apiRouteContext("routeClickup")

logger = logging.getLogger(__name__)

# Router for all connection-scoped ClickUp endpoints; the shared error
# responses below are attached to every route registered on it.
router = APIRouter(
    prefix="/api/clickup",
    tags=["ClickUp"],
    responses={
        code: {"description": text}
        for code, text in (
            (404, "Not found"),
            (400, "Bad request"),
            (401, "Unauthorized"),
            (500, "Internal server error"),
        )
    },
)
|
|
|
|
|
|
def _getUserConnection(interface, connection_id: str, user_id: str) -> Optional[UserConnection]:
    """Look up one of a user's connections by its id.

    Args:
        interface: Object exposing ``getUserConnections(user_id)``.
        connection_id: Value compared against each connection's ``id``.
        user_id: Owner whose connections are searched.

    Returns:
        The first connection whose ``id`` equals ``connection_id``, or
        ``None`` when nothing matches or the lookup itself raised.
    """
    try:
        return next(
            (conn for conn in interface.getUserConnections(user_id) if conn.id == connection_id),
            None,
        )
    except Exception as e:
        # Best-effort lookup: callers treat None as "not found". Log with the
        # traceback so a failing backend is distinguishable from a missing id.
        logger.exception("Error getting user connection: %s", e)
        return None
|
|
|
|
|
|
def _clickup_connection_or_404(interface, connection_id: str, user_id: str) -> UserConnection:
    """Return the user's connection, insisting that it is a ClickUp one.

    Raises:
        HTTPException: 404 when ``_getUserConnection`` yields nothing,
            400 when the connection's authority is not ClickUp.
    """
    found = _getUserConnection(interface, connection_id, user_id)
    if not found:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=routeApiMsg("Connection not found"))

    # The authority may be an enum member or something else; normalise it to text.
    if hasattr(found.authority, "value"):
        authority_name = found.authority.value
    else:
        authority_name = str(found.authority)

    if authority_name.lower() == AuthAuthority.CLICKUP.value:
        return found

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=routeApiMsg("Connection is not a ClickUp connection"),
    )
|
|
|
|
|
|
def _svc_for_connection(current_user: User, connection: UserConnection):
    """Return the ClickUp service after loading the connection's access token.

    Raises:
        HTTPException: 401 when ``setAccessTokenFromConnection`` reports failure.
    """
    hub = getServices(current_user, None)
    if hub.clickup.setAccessTokenFromConnection(connection):
        return hub.clickup

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=routeApiMsg("Failed to set ClickUp access token"),
    )
|
|
|
|
|
|
@router.get("/{connectionId}/teams/{teamId}", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def get_team(
    request: Request,
    connectionId: str = Path(...),
    teamId: str = Path(...),
    currentUser: User = Depends(getCurrentUser),
) -> Dict[str, Any]:
    """Fetch details of a ClickUp workspace/team, including its members (e.g. for assignee pickers)."""
    # Resolve and validate the caller's connection, then bind its token to the service.
    connection = _clickup_connection_or_404(getInterface(currentUser), connectionId, currentUser.id)
    clickup = _svc_for_connection(currentUser, connection)
    team = await clickup.getTeam(teamId)
    return team
|
|
|
|
|
|
@router.get("/{connectionId}/lists/{listId}", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def get_list(
    request: Request,
    connectionId: str = Path(...),
    listId: str = Path(...),
    currentUser: User = Depends(getCurrentUser),
) -> Dict[str, Any]:
    """Fetch a single ClickUp list through the given connection."""
    # Resolve and validate the caller's connection, then bind its token to the service.
    connection = _clickup_connection_or_404(getInterface(currentUser), connectionId, currentUser.id)
    clickup = _svc_for_connection(currentUser, connection)
    list_payload = await clickup.getList(listId)
    return list_payload
|
|
|
|
|
|
@router.get("/{connectionId}/lists/{listId}/fields", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def get_list_fields(
    request: Request,
    connectionId: str = Path(...),
    listId: str = Path(...),
    currentUser: User = Depends(getCurrentUser),
) -> Dict[str, Any]:
    """Fetch the fields of a ClickUp list through the given connection."""
    # Resolve and validate the caller's connection, then bind its token to the service.
    connection = _clickup_connection_or_404(getInterface(currentUser), connectionId, currentUser.id)
    clickup = _svc_for_connection(currentUser, connection)
    fields_payload = await clickup.getListFields(listId)
    return fields_payload
|
|
|
|
|
|
@router.get("/{connectionId}/lists/{listId}/tasks", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def get_list_tasks(
    request: Request,
    connectionId: str = Path(...),
    listId: str = Path(...),
    # Reject negative page indexes at the API boundary instead of forwarding them.
    page: int = Query(0, ge=0),
    include_closed: bool = Query(False),
    currentUser: User = Depends(getCurrentUser),
) -> Dict[str, Any]:
    """Fetch one page of tasks from a ClickUp list.

    ``page`` must be zero or greater; ``include_closed`` is forwarded unchanged
    to the service call.
    """
    interface = getInterface(currentUser)
    # 404 / 400 when the connection is missing or not a ClickUp connection.
    conn = _clickup_connection_or_404(interface, connectionId, currentUser.id)
    # 401 when the connection's access token cannot be applied.
    cu = _svc_for_connection(currentUser, conn)
    return await cu.getTasksInList(listId, page=page, include_closed=include_closed)
|
|
|
|
|
|
@router.get("/{connectionId}/tasks/{taskId}", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def get_task(
    request: Request,
    connectionId: str = Path(...),
    taskId: str = Path(...),
    currentUser: User = Depends(getCurrentUser),
) -> Dict[str, Any]:
    """Fetch a single ClickUp task through the given connection."""
    # Resolve and validate the caller's connection, then bind its token to the service.
    connection = _clickup_connection_or_404(getInterface(currentUser), connectionId, currentUser.id)
    clickup = _svc_for_connection(currentUser, connection)
    task_payload = await clickup.getTask(taskId)
    return task_payload
|