from typing import Any, Optional from datetime import datetime, timezone # Module-level factory to create TicketInterface by connector type async def createTicketInterfaceByType( *, taskSyncDefinition: dict, connectorType: str, connectorParams: dict, ) -> "TicketInterface": connectorTypeLower = (connectorType or "").strip().lower() if connectorTypeLower == "jira": from modules.connectors.connectorTicketsJira import ConnectorTicketJira connector_ticket = ConnectorTicketJira(**connectorParams) elif connectorTypeLower == "clickup": from modules.connectors.connectorTicketsClickup import ConnectorTicketClickup # ClickUp does not require async factory; instantiate directly connector_ticket = ConnectorTicketClickup(**connectorParams) else: raise ValueError(f"Unsupported connector_type: {connectorType}") return TicketInterface( connector_ticket=connector_ticket, task_sync_definition=taskSyncDefinition, ) class TicketInterface: def __init__(self, *, connector_ticket, task_sync_definition: dict): self.connector_ticket = connector_ticket self.task_sync_definition = task_sync_definition async def exportTicketsAsList(self) -> list[dict]: tickets: list[dict] = await self.connector_ticket.readTasks(limit=0) transformed_tasks = self._transformTicketRecords(tickets, includePut=True) # Return plain dictionaries filtered by presence of ID rows: list[dict] = [] for task in transformed_tasks: if isinstance(task, dict) and task.get("ID"): rows.append(task) return rows async def importListToTickets(self, records: list[dict]) -> None: updates: list[dict] = [] for row in records: task_id = row.get("ID") if not task_id: continue fields = {} for field_name, field_config in self.task_sync_definition.items(): if field_config[0] == "put": field_path = field_config[1] value = row.get(field_name, "") if len(field_path) >= 2 and field_path[0] == "fields": field_id = field_path[1] fields[field_id] = value if fields: updates.append({"ID": task_id, "fields": fields}) if updates: await self.connector_ticket.writeTasks(updates) def _transformTicketRecords( self, tasks: list[dict], includePut: bool = False ) -> list[dict]: """Transforms tasks according to the task_sync_definition.""" transformed_tasks: list[dict] = [] for task in tasks: transformed_data = {} # Process each field in the sync definition for field_name, field_config in self.task_sync_definition.items(): direction = field_config[0] field_path = field_config[1] # Get the right fields if direction == "get" or includePut: value = self._extractFieldValue(task, field_path, field_name) transformed_data[field_name] = value transformed_tasks.append(transformed_data) return transformed_tasks def _extractFieldValue(self, issue_data: dict, field_path: list[str], field_name: str = None) -> Any: """Extract field value from ticket data using field path.""" value = issue_data try: for key in field_path: if value is not None: value = value[key] if value is None: return None # Handle complex objects that have a 'value' field (like custom field options) if isinstance(value, dict) and "value" in value: value = value["value"] # Handle lists of objects with 'value' fields elif ( isinstance(value, list) and len(value) > 0 and isinstance(value[0], dict) and "value" in value[0] ): value = value[0]["value"] # Apply date formatting for date fields if field_name and self._isDateField(field_name): value = self._formatDateForExcel(value) return value except (KeyError, TypeError): return None def _formatDateForExcel(self, date_value: Any) -> Optional[str]: """Format date value for Excel export. Handles various date formats and converts them to a consistent format suitable for Excel display. Args: date_value: Date value from Tickets (string, datetime, or None) Returns: Formatted date string or None if invalid/empty """ if not date_value: return None try: # Handle ISO 8601 strings (JIRA format: 2025-09-16T12:33:10.044+0200) if isinstance(date_value, str): # Parse ISO format with timezone if 'T' in date_value and ('+' in date_value or 'Z' in date_value): dt = datetime.fromisoformat(date_value.replace('Z', '+00:00')) # Convert to UTC for consistency if dt.tzinfo: dt = dt.astimezone(timezone.utc) return dt.strftime('%Y-%m-%d %H:%M:%S UTC') # Handle simple date strings elif len(date_value) == 10 and date_value.count('-') == 2: dt = datetime.strptime(date_value, '%Y-%m-%d') return dt.strftime('%Y-%m-%d') else: # Try to parse as datetime dt = datetime.fromisoformat(date_value) return dt.strftime('%Y-%m-%d %H:%M:%S') # Handle datetime objects elif isinstance(date_value, datetime): if date_value.tzinfo: dt = date_value.astimezone(timezone.utc) else: dt = date_value return dt.strftime('%Y-%m-%d %H:%M:%S UTC') return str(date_value) except (ValueError, TypeError) as e: # Log error but don't fail the sync return str(date_value) if date_value else None def _isDateField(self, field_name: str) -> bool: """Check if a field is a date field based on its name. Args: field_name: Name of the field Returns: True if field is likely a date field """ date_keywords = ['date', 'time', 'created', 'updated', 'due', 'deadline'] return any(keyword in field_name.lower() for keyword in date_keywords) def _filterEmptyRecords(self, records: list[dict]) -> list[dict]: return [row for row in records if isinstance(row, dict) and row.get("ID")]