"""Jira connector for CRUD operations.""" from dataclasses import dataclass import os import logging import aiohttp import asyncio import json from modules.interfaces.interfaceTicketModel import ( TicketBase, TicketFieldAttribute, Task, ) logger = logging.getLogger(__name__) @dataclass class ConnectorTicketJira(TicketBase): jira_username: str jira_api_token: str jira_url: str project_code: str issue_type: str @classmethod async def create( cls, *, jira_username: str, jira_api_token: str, jira_url: str, project_code: str, issue_type: str, ): return ConnectorTicketJira( jira_username=jira_username, jira_api_token=jira_api_token, jira_url=jira_url, project_code=project_code, issue_type=issue_type, ) async def read_attributes(self) -> list[TicketFieldAttribute]: """ Read field attributes from Jira by querying for a single issue and extracting the field mappings. Returns: list[TicketFieldAttribute]: List of field attributes with names and IDs """ # Build JQL dynamically; allow empty or '*' issue_type to mean "all types" if self.issue_type and self.issue_type != "*": jql_query = f"project={self.project_code} AND issuetype={self.issue_type}" else: jql_query = f"project={self.project_code}" # Prepare the request URL (use JQL search endpoint) url = f"{self.jira_url}/rest/api/3/search/jql" # Prepare authentication auth = aiohttp.BasicAuth(self.jira_username, self.jira_api_token) try: async with aiohttp.ClientSession() as session: headers = {"Content-Type": "application/json"} payload = { "jql": jql_query, "maxResults": 1 # Don't specify fields to get all available fields } async with session.post(url, json=payload, auth=auth, headers=headers) as response: if response.status != 200: error_text = await response.text() logger.error( f"Jira API request failed with status {response.status}: {error_text}" ) raise Exception( f"Jira API request failed with status {response.status}" ) data = await response.json() # Extract issues and field names issues = data.get("issues", []) field_names = data.get("names", {}) # If no issues or fields are present, fall back to the fields API if not issues or not issues[0].get("fields"): logger.warning( "No issue fields returned by search; falling back to /rest/api/3/field" ) return await self._read_all_fields_via_fields_api() # Extract field attributes from the first issue attributes = [] issue = issues[0] fields = issue.get("fields", {}) for field_id, value in fields.items(): field_name = field_names.get(field_id, field_id) attributes.append( TicketFieldAttribute(field_name=field_name, field=field_id) ) logger.info( f"Successfully retrieved {len(attributes)} field attributes from Jira" ) return attributes except aiohttp.ClientError as e: logger.error(f"HTTP client error while fetching Jira attributes: {str(e)}") raise Exception(f"Failed to connect to Jira: {str(e)}") except json.JSONDecodeError as e: logger.error(f"Failed to parse Jira API response: {str(e)}") raise Exception(f"Invalid response from Jira API: {str(e)}") except Exception as e: logger.error(f"Unexpected error while fetching Jira attributes: {str(e)}") raise async def _read_all_fields_via_fields_api(self) -> list[TicketFieldAttribute]: """Fallback: use Jira fields API to list all fields with id->name mapping.""" auth = aiohttp.BasicAuth(self.jira_username, self.jira_api_token) url = f"{self.jira_url}/rest/api/3/field" try: async with aiohttp.ClientSession() as session: async with session.get(url, auth=auth) as response: if response.status != 200: error_text = await response.text() logger.error( f"Jira fields API failed with status {response.status}: {error_text}" ) return [] data = await response.json() attributes: list[TicketFieldAttribute] = [] for field in data: field_id = field.get("id") field_name = field.get("name", field_id) if field_id: attributes.append( TicketFieldAttribute(field_name=field_name, field=field_id) ) logger.info( f"Successfully retrieved {len(attributes)} field attributes via fields API" ) return attributes except Exception as e: logger.error(f"Error while calling fields API: {str(e)}") return [] async def read_tasks(self, *, limit: int = 0) -> list[Task]: """ Read tasks from Jira with pagination support. Args: limit: Maximum number of tasks to retrieve. 0 means no limit. Returns: list[Task]: List of tasks with their data """ # Build JQL dynamically; allow empty or '*' issue_type to mean "all types" if self.issue_type and self.issue_type != "*": jql_query = f"project={self.project_code} AND issuetype={self.issue_type}" else: jql_query = f"project={self.project_code}" # Initialize variables for pagination (cursor-based /search/jql) max_results = 100 next_page_token: str | None = None tasks = [] page_counter = 0 max_pages_safety_cap = 1000 seen_issue_ids: set[str] = set() # Prepare authentication auth = aiohttp.BasicAuth(self.jira_username, self.jira_api_token) url = f"{self.jira_url}/rest/api/3/search/jql" try: async with aiohttp.ClientSession() as session: while True: # Prepare request payload for JQL search with cursor-based pagination # According to Jira API docs, BOTH jql AND nextPageToken should be included in subsequent requests payload = { "jql": jql_query, "maxResults": max_results, "fields": ["*all"] # Get all fields } if next_page_token: # For subsequent pages, include BOTH jql and nextPageToken payload["nextPageToken"] = next_page_token headers = {"Content-Type": "application/json"} # Debug: log the payload being sent logger.debug(f"JIRA request payload: {json.dumps(payload, indent=2)}") async with session.post( url, json=payload, auth=auth, headers=headers ) as response: if response.status != 200: error_text = await response.text() logger.error( f"Failed to fetch tasks from Jira. Status code: {response.status}, Response: {error_text}" ) break data = await response.json() # Handle cursor-based pagination response issues = data.get("issues", []) is_last = data.get("isLast", False) current_next_page_token = data.get("nextPageToken") # Debug: log pagination info logger.debug(f"Pagination info - Issues: {len(issues)}, isLast: {is_last}, nextPageToken: {current_next_page_token[:50] if current_next_page_token else 'None'}...") new_items_added = 0 for issue in issues: # Store the raw JIRA issue data directly # This matches what the reference implementation expects issue_id = issue.get("id") or issue.get("key") if issue_id and issue_id in seen_issue_ids: continue if issue_id: seen_issue_ids.add(issue_id) task = Task(data=issue) tasks.append(task) new_items_added += 1 # Check limit if limit > 0 and len(tasks) >= limit: break logger.debug(f"Issues packages reading: {len(tasks)}") # Stop conditions # 1) No issues returned if len(issues) == 0: break # 1b) No new items added (duplicate page) -> prevent endless loop if new_items_added == 0: logger.warning("Pagination returned duplicate page; stopping to prevent loop") break # 2) Cursor-based pagination says last page if is_last: break # 3) Safety cap to avoid endless loops page_counter += 1 if page_counter >= max_pages_safety_cap: logger.warning("Stopping pagination due to safety cap") break # 4) Continue to next page if we have a nextPageToken if not current_next_page_token: logger.warning("No nextPageToken available, stopping pagination") break # Update the token for the next iteration next_page_token = current_next_page_token # Add a small delay to avoid token expiration issues await asyncio.sleep(0.1) logger.info(f"JIRA issues read: {len(tasks)} (cursor-based pagination)") return tasks except aiohttp.ClientError as e: logger.error(f"HTTP client error while fetching Jira tasks: {str(e)}") raise Exception(f"Failed to connect to Jira: {str(e)}") except json.JSONDecodeError as e: logger.error(f"Failed to parse Jira API response: {str(e)}") raise Exception(f"Invalid response from Jira API: {str(e)}") except Exception as e: logger.error(f"Unexpected error while fetching Jira tasks: {str(e)}") raise async def write_tasks(self, tasklist: list[Task]) -> None: """ Write/update tasks to Jira. Args: tasklist: List of Task objects containing task data to update """ headers = {"Accept": "application/json", "Content-Type": "application/json"} auth = aiohttp.BasicAuth(self.jira_username, self.jira_api_token) try: async with aiohttp.ClientSession() as session: for task in tasklist: task_data = task.data task_id = ( task_data.get("ID") or task_data.get("id") or task_data.get("key") ) if not task_id: logger.warning("Task missing ID or key, skipping update") continue # Extract fields to update from task data # The task data should contain the field updates in a "fields" key fields = task_data.get("fields", {}) if not fields: logger.debug(f"No fields to update for task {task_id}") continue # Convert ADF fields to proper format processed_fields = {} for field_id, field_value in fields.items(): if field_id == "customfield_10168": # Convert to ADF format for paragraph fields if isinstance(field_value, str) and field_value.strip(): processed_fields[field_id] = { "type": "doc", "version": 1, "content": [ { "type": "paragraph", "content": [ { "type": "text", "text": field_value } ] } ] } else: # Skip empty ADF fields logger.debug(f"Skipping empty ADF field {field_id} for task {task_id}") continue else: processed_fields[field_id] = field_value if not processed_fields: logger.debug(f"No valid fields to update for task {task_id}") continue # Prepare update data update_data = {"fields": processed_fields} # Make the update request url = f"{self.jira_url}/rest/api/3/issue/{task_id}" async with session.put( url, json=update_data, headers=headers, auth=auth ) as response: if response.status == 204: logger.info(f"JIRA task {task_id} updated successfully.") else: error_text = await response.text() logger.error( f"JIRA failed to update task {task_id}: {response.status} - {error_text}" ) except aiohttp.ClientError as e: logger.error(f"HTTP client error while updating Jira tasks: {str(e)}") raise Exception(f"Failed to connect to Jira: {str(e)}") except Exception as e: logger.error(f"Unexpected error while updating Jira tasks: {str(e)}") raise