gateway/modules/interfaces/interfaceTicketObjects.py
2025-09-24 23:18:10 +02:00

186 lines
7.2 KiB
Python

from typing import Any, Optional
from datetime import datetime, timezone
from modules.interfaces.interfaceTicketModel import TicketBase, Task
# Module-level factory to create TicketInterface by connector type
async def createTicketInterfaceByType(
    *,
    taskSyncDefinition: dict,
    connectorType: str,
    connectorParams: dict,
) -> "TicketInterface":
    """Build a TicketInterface backed by the connector named by ``connectorType``.

    The connector module is imported inside the matching branch, so only the
    selected backend is imported by this call.

    Args:
        taskSyncDefinition: Sync definition passed to TicketInterface unchanged.
        connectorType: Connector name, matched case-insensitively after
            stripping surrounding whitespace. Supported: "jira", "clickup".
        connectorParams: Keyword arguments forwarded to the connector
            constructor. ``None`` is treated as "no arguments".

    Returns:
        A TicketInterface wrapping the newly created connector.

    Raises:
        ValueError: If ``connectorType`` does not name a supported connector.
    """
    connectorTypeLower = (connectorType or "").strip().lower()
    # Same None-tolerance as connectorType above: ``**None`` would raise TypeError.
    params = connectorParams or {}

    if connectorTypeLower == "jira":
        from modules.connectors.connectorTicketsJira import ConnectorTicketJira

        connector_ticket = ConnectorTicketJira(**params)
    elif connectorTypeLower == "clickup":
        from modules.connectors.connectorTicketsClickup import ConnectorTicketClickup

        # ClickUp does not require async factory; instantiate directly
        connector_ticket = ConnectorTicketClickup(**params)
    else:
        raise ValueError(f"Unsupported connector_type: {connectorType}")

    return TicketInterface(
        connector_ticket=connector_ticket,
        task_sync_definition=taskSyncDefinition,
    )
class TicketInterface:
    """Move ticket data between a ticket connector and flat record dicts.

    The mapping is driven by ``task_sync_definition``: each key is a record
    field name and each value is a sequence whose first item is the direction
    ("get" or "put") and whose second item is the key path into the ticket data.
    """

    # Substrings that mark a field name as date-like (matched case-insensitively).
    _DATE_KEYWORDS = ('date', 'time', 'created', 'updated', 'due', 'deadline')

    def __init__(self, *, connector_ticket: "TicketBase", task_sync_definition: dict):
        """Store the connector and the sync definition.

        Args:
            connector_ticket: Connector used to read and write tasks.
            task_sync_definition: Field name -> [direction, path] mapping.
        """
        self.connector_ticket = connector_ticket
        self.task_sync_definition = task_sync_definition

    async def exportTicketsAsList(self) -> list[dict]:
        """Read tasks from the connector and return them as flat records.

        Both "get" and "put" fields are exported. Records without a truthy
        "ID" are dropped.
        """
        # limit=0 is passed through as-is; presumably "no limit" -- confirm
        # against the connector implementation.
        tickets = await self.connector_ticket.read_tasks(limit=0)
        transformed_tasks = self._transformTasks(tickets, includePut=True)
        return self._filterEmptyRecords([task.data for task in transformed_tasks])

    async def importListToTickets(self, records: list[dict]) -> None:
        """Write the "put" fields of each record back through the connector.

        Rows without a truthy "ID" are skipped. For every "put" field whose
        path starts with ["fields", <field_id>], the row's value (empty string
        when the row lacks that column) is sent under <field_id>. The
        connector is only called when at least one update was built.
        """
        updates: list[Task] = []
        for row in records:
            task_id = row.get("ID")
            if not task_id:
                continue

            fields = {}
            for field_name, field_config in self.task_sync_definition.items():
                if field_config[0] != "put":
                    continue
                field_path = field_config[1]
                if len(field_path) >= 2 and field_path[0] == "fields":
                    fields[field_path[1]] = row.get(field_name, "")

            if fields:
                updates.append(Task(data={"ID": task_id, "fields": fields}))

        if updates:
            await self.connector_ticket.write_tasks(updates)

    def _transformTasks(
        self, tasks: "list[Task]", includePut: bool = False
    ) -> "list[Task]":
        """Project each task onto the fields named in the sync definition.

        Args:
            tasks: Tasks whose ``data`` holds the raw ticket payload.
            includePut: When True, "put" fields are extracted as well;
                otherwise only "get" fields are.

        Returns:
            One new Task per input task, with ``data`` keyed by field name.
        """
        transformed_tasks = []
        for task in tasks:
            transformed_data = {}
            for field_name, field_config in self.task_sync_definition.items():
                direction = field_config[0]
                field_path = field_config[1]
                if direction == "get" or includePut:
                    transformed_data[field_name] = self._extractFieldValue(
                        task.data, field_path, field_name
                    )
            transformed_tasks.append(Task(data=transformed_data))
        return transformed_tasks

    def _extractFieldValue(
        self, issue_data: dict, field_path: list[str], field_name: Optional[str] = None
    ) -> Any:
        """Extract a field value from ticket data by following ``field_path``.

        Dicts carrying a "value" key, and non-empty lists whose first element
        is such a dict, are unwrapped to that value. When ``field_name`` looks
        like a date field the result is passed through ``_formatDateForExcel``.

        Returns:
            The extracted value, or None when the path cannot be followed.
        """
        value = issue_data
        try:
            for key in field_path:
                if value is None:
                    return None
                value = value[key]
            if value is None:
                return None

            # Complex objects that have a 'value' field (like custom field options)
            if isinstance(value, dict) and "value" in value:
                value = value["value"]
            # Lists of such objects: only the first element's value is used
            elif (
                isinstance(value, list)
                and value
                and isinstance(value[0], dict)
                and "value" in value[0]
            ):
                value = value[0]["value"]

            if field_name and self._isDateField(field_name):
                value = self._formatDateForExcel(value)
            return value
        except (KeyError, IndexError, TypeError):
            # Best effort: a path that does not match the payload yields None.
            return None

    @staticmethod
    def _parseIsoDateTime(text: str) -> datetime:
        """Parse an ISO 8601 date-time string, raising ValueError if unparseable."""
        normalized = text
        if normalized.endswith('Z'):
            normalized = normalized[:-1] + '+00:00'
        try:
            return datetime.fromisoformat(normalized)
        except ValueError:
            pass
        # Python < 3.11 fromisoformat() rejects offsets without a colon
        # (e.g. 2025-09-16T12:33:10.044+0200); strptime's %z accepts both forms.
        for fmt in ('%Y-%m-%dT%H:%M:%S.%f%z', '%Y-%m-%dT%H:%M:%S%z'):
            try:
                return datetime.strptime(normalized, fmt)
            except ValueError:
                continue
        raise ValueError(f"Unrecognized date/time format: {text!r}")

    def _formatDateForExcel(self, date_value: Any) -> Optional[str]:
        """Format a date value for Excel export.

        Timezone-aware inputs (any offset sign, or a trailing 'Z') are
        converted to UTC and suffixed with " UTC". Naive date-time strings are
        formatted without a suffix, and plain YYYY-MM-DD strings stay date-only.

        Args:
            date_value: Date value from tickets (string, datetime, or None).

        Returns:
            The formatted string, None for empty input, or ``str(date_value)``
            when the value cannot be interpreted as a date.
        """
        if not date_value:
            return None
        try:
            if isinstance(date_value, str):
                # Simple date strings carry no time component
                if len(date_value) == 10 and date_value.count('-') == 2:
                    day = datetime.strptime(date_value, '%Y-%m-%d')
                    return day.strftime('%Y-%m-%d')
                dt = self._parseIsoDateTime(date_value)
                if dt.tzinfo is not None:
                    # Convert to UTC for consistency regardless of offset sign
                    return dt.astimezone(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
                return dt.strftime('%Y-%m-%d %H:%M:%S')
            if isinstance(date_value, datetime):
                # NOTE: naive datetimes are labelled UTC without any conversion.
                dt = date_value.astimezone(timezone.utc) if date_value.tzinfo else date_value
                return dt.strftime('%Y-%m-%d %H:%M:%S UTC')
            return str(date_value)
        except (ValueError, TypeError):
            # Don't fail the sync on an unparseable value; export it as-is.
            return str(date_value)

    def _isDateField(self, field_name: str) -> bool:
        """Check if a field is a date field based on its name.

        Args:
            field_name: Name of the field.

        Returns:
            True if the lower-cased name contains any date keyword.
        """
        lowered = field_name.lower()
        return any(keyword in lowered for keyword in self._DATE_KEYWORDS)

    def _filterEmptyRecords(self, records: list[dict]) -> list[dict]:
        """Remove records that are missing an ID.

        Purposefully only filter by presence of 'ID' to avoid dropping
        valid rows with many empty optional fields.
        """
        return [row for row in records if isinstance(row, dict) and row.get("ID")]