Merge pull request #46 from valueonag/int

delta sync issue resolved: pagination and new POST search endpoint
This commit is contained in:
ValueOn AG 2025-09-16 17:45:57 +02:00 committed by GitHub
commit 7bc7357732
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 10605 additions and 47 deletions

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -1,8 +1,10 @@
 """Jira connector for CRUD operations."""
 from dataclasses import dataclass
+import os
 import logging
 import aiohttp
+import asyncio
 import json
 from modules.interfaces.interfaceTicketModel import (
@@ -49,18 +51,27 @@ class ConnectorTicketJira(TicketBase):
         Returns:
             list[TicketFieldAttribute]: List of field attributes with names and IDs
         """
-        jql_query = f"project={self.project_code} AND issuetype={self.issue_type}"
+        # Build JQL dynamically; allow empty or '*' issue_type to mean "all types"
+        if self.issue_type and self.issue_type != "*":
+            jql_query = f"project={self.project_code} AND issuetype={self.issue_type}"
+        else:
+            jql_query = f"project={self.project_code}"

-        # Prepare the request URL and parameters (use new search endpoint)
+        # Prepare the request URL (use JQL search endpoint)
         url = f"{self.jira_url}/rest/api/3/search/jql"
-        params = {"jql": jql_query, "maxResults": 1, "expand": "names"}

         # Prepare authentication
         auth = aiohttp.BasicAuth(self.jira_username, self.jira_api_token)

         try:
             async with aiohttp.ClientSession() as session:
-                async with session.get(url, params=params, auth=auth) as response:
+                headers = {"Content-Type": "application/json"}
+                payload = {
+                    "jql": jql_query,
+                    "maxResults": 1,
+                    "fields": ["summary", "status", "assignee", "created", "updated", "priority", "issuetype", "project", "customfield_10168", "customfield_10067", "customfield_10065"]
+                }
+                async with session.post(url, json=payload, auth=auth, headers=headers) as response:
                     if response.status != 200:
                         error_text = await response.text()
                         logger.error(
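Reviewer note: the hunk above swaps the old GET-style search for the newer POST /rest/api/3/search/jql endpoint, which takes its JQL and field list in a JSON body. A minimal self-contained sketch of that request shape (URL, credentials, and JQL below are placeholders, not values from this repo):

import aiohttp
import asyncio

async def fetch_one_issue(jira_url: str, user: str, token: str, jql: str) -> dict:
    # The enhanced search endpoint expects a JSON body instead of query params
    url = f"{jira_url}/rest/api/3/search/jql"
    payload = {"jql": jql, "maxResults": 1, "fields": ["summary", "status"]}
    auth = aiohttp.BasicAuth(user, token)
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=payload, auth=auth) as resp:
            resp.raise_for_status()
            return await resp.json()

# Placeholder usage:
# asyncio.run(fetch_one_issue("https://example.atlassian.net", "me@example.com", "api-token", "project=ABC"))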
@@ -150,13 +161,19 @@ class ConnectorTicketJira(TicketBase):
         Returns:
             list[Task]: List of tasks with their data
         """
-        jql_query = f"project={self.project_code} AND issuetype={self.issue_type}"
+        # Build JQL dynamically; allow empty or '*' issue_type to mean "all types"
+        if self.issue_type and self.issue_type != "*":
+            jql_query = f"project={self.project_code} AND issuetype={self.issue_type}"
+        else:
+            jql_query = f"project={self.project_code}"

-        # Initialize variables for pagination
-        start_at = 0
-        max_results = 50
-        total = 1  # Initialize with a value greater than 0 to enter the loop
+        # Initialize variables for pagination (cursor-based /search/jql)
+        max_results = 100
+        next_page_token: str | None = None
         tasks = []
+        page_counter = 0
+        max_pages_safety_cap = 1000
+        seen_issue_ids: set[str] = set()

         # Prepare authentication
         auth = aiohttp.BasicAuth(self.jira_username, self.jira_api_token)
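The same JQL-building rule now appears in both methods. As a quick illustration of the intended wildcard behavior (a standalone sketch, not code from this PR):

def build_jql(project_code: str, issue_type: str | None) -> str:
    # Empty or '*' issue_type means "all types": omit the issuetype clause
    if issue_type and issue_type != "*":
        return f"project={project_code} AND issuetype={issue_type}"
    return f"project={project_code}"

assert build_jql("ABC", "Task") == "project=ABC AND issuetype=Task"
assert build_jql("ABC", "*") == "project=ABC"
assert build_jql("ABC", "") == "project=ABC"

Extracting such a helper would also remove the duplication between the two methods, though the PR keeps the logic inline.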
@@ -164,18 +181,25 @@ class ConnectorTicketJira(TicketBase):
         try:
             async with aiohttp.ClientSession() as session:
-                while start_at < total and (limit == 0 or len(tasks) < limit):
-                    # Prepare request parameters
-                    params = {
+                while True:
+                    # Prepare request payload for JQL search with cursor-based pagination
+                    # According to Jira API docs, BOTH jql AND nextPageToken should be included in subsequent requests
+                    payload = {
                         "jql": jql_query,
-                        "startAt": start_at,
                         "maxResults": max_results,
+                        "fields": ["summary", "status", "assignee", "created", "updated", "priority", "issuetype", "project", "customfield_10168", "customfield_10067", "customfield_10065"]
                     }
+                    if next_page_token:
+                        # For subsequent pages, include BOTH jql and nextPageToken
+                        payload["nextPageToken"] = next_page_token
                     headers = {"Content-Type": "application/json"}
-                    async with session.get(
-                        url, params=params, auth=auth, headers=headers
+                    # Debug: log the payload being sent
+                    logger.debug(f"JIRA request payload: {json.dumps(payload, indent=2)}")
+                    async with session.post(
+                        url, json=payload, auth=auth, headers=headers
                     ) as response:
                         if response.status != 200:
                             error_text = await response.text()
@@ -185,23 +209,74 @@ class ConnectorTicketJira(TicketBase):
                             break

                         data = await response.json()
+
+                        # Debug: dump raw JIRA response per page
+                        try:
+                            debug_dir = os.path.join(os.getcwd(), "jira_debug")
+                            os.makedirs(debug_dir, exist_ok=True)
+                            token_suffix = next_page_token or "0"
+                            safe_suffix = token_suffix[:16].replace("/", "_")
+                            debug_file = os.path.join(debug_dir, f"search_response_{safe_suffix}.json")
+                            with open(debug_file, "w", encoding="utf-8") as f:
+                                json.dump(data, f, ensure_ascii=False, indent=2)
+                        except Exception as _e:
+                            logger.debug(f"Failed to write JIRA debug file: {str(_e)}")
+
+                        # Handle cursor-based pagination response
                         issues = data.get("issues", [])
-                        total = data.get("total", 0)
+                        is_last = data.get("isLast", False)
+                        current_next_page_token = data.get("nextPageToken")
+
+                        # Debug: log pagination info
+                        logger.debug(f"Pagination info - Issues: {len(issues)}, isLast: {is_last}, nextPageToken: {current_next_page_token[:50] if current_next_page_token else 'None'}...")

+                        new_items_added = 0
                         for issue in issues:
                             # Store the raw JIRA issue data directly
                             # This matches what the reference implementation expects
+                            issue_id = issue.get("id") or issue.get("key")
+                            if issue_id and issue_id in seen_issue_ids:
+                                continue
+                            if issue_id:
+                                seen_issue_ids.add(issue_id)
                             task = Task(data=issue)
                             tasks.append(task)
+                            new_items_added += 1

                             # Check limit
                             if limit > 0 and len(tasks) >= limit:
                                 break

-                        start_at += max_results
                         logger.debug(f"Issues packages reading: {len(tasks)}")

-        logger.info(f"JIRA issues read: {len(tasks)}")
+                        # Stop conditions
+                        # 1) No issues returned
+                        if len(issues) == 0:
+                            break
+                        # 1b) No new items added (duplicate page) -> prevent endless loop
+                        if new_items_added == 0:
+                            logger.warning("Pagination returned duplicate page; stopping to prevent loop")
+                            break
+                        # 2) Cursor-based pagination says last page
+                        if is_last:
+                            break
+                        # 3) Safety cap to avoid endless loops
+                        page_counter += 1
+                        if page_counter >= max_pages_safety_cap:
+                            logger.warning("Stopping pagination due to safety cap")
+                            break
+                        # 4) Continue to next page if we have a nextPageToken
+                        if not current_next_page_token:
+                            logger.warning("No nextPageToken available, stopping pagination")
+                            break
+                        # Update the token for the next iteration
+                        next_page_token = current_next_page_token
+                        # Add a small delay to avoid token expiration issues
+                        await asyncio.sleep(0.1)
+
+            logger.info(f"JIRA issues read: {len(tasks)} (cursor-based pagination)")

             return tasks

         except aiohttp.ClientError as e:
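Stripped of the debug dumps, dedup bookkeeping, and safety cap, the cursor-based pagination pattern the new loop implements is roughly the following (a sketch against the /search/jql contract; function and parameter names are placeholders):

import aiohttp

async def iter_issues(session: aiohttp.ClientSession, url: str, jql: str,
                      auth: aiohttp.BasicAuth, page_size: int = 100):
    """Yield issues page by page until Jira signals the last page."""
    token = None
    while True:
        payload = {"jql": jql, "maxResults": page_size}
        if token:
            payload["nextPageToken"] = token  # resend jql along with the cursor
        async with session.post(url, json=payload, auth=auth) as resp:
            resp.raise_for_status()
            data = await resp.json()
        for issue in data.get("issues", []):
            yield issue
        token = data.get("nextPageToken")
        if data.get("isLast", False) or not token:
            break

Consumed as "async for issue in iter_issues(...)". The extra checks in the PR's loop (duplicate-page detection, page cap) guard against a server that keeps returning the same cursor.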
@@ -246,8 +321,40 @@ class ConnectorTicketJira(TicketBase):
                 logger.debug(f"No fields to update for task {task_id}")
                 continue

+            # Convert ADF fields to proper format
+            processed_fields = {}
+            for field_id, field_value in fields.items():
+                if field_id == "customfield_10168":
+                    # Convert to ADF format for paragraph fields
+                    if isinstance(field_value, str) and field_value.strip():
+                        processed_fields[field_id] = {
+                            "type": "doc",
+                            "version": 1,
+                            "content": [
+                                {
+                                    "type": "paragraph",
+                                    "content": [
+                                        {
+                                            "type": "text",
+                                            "text": field_value
+                                        }
+                                    ]
+                                }
+                            ]
+                        }
+                    else:
+                        # Skip empty ADF fields
+                        logger.debug(f"Skipping empty ADF field {field_id} for task {task_id}")
+                        continue
+                else:
+                    processed_fields[field_id] = field_value
+
+            if not processed_fields:
+                logger.debug(f"No valid fields to update for task {task_id}")
+                continue
+
             # Prepare update data
-            update_data = {"fields": fields}
+            update_data = {"fields": processed_fields}

             # Make the update request
             url = f"{self.jira_url}/rest/api/3/issue/{task_id}"
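The nested literal above builds a minimal Atlassian Document Format (ADF) document, which Jira Cloud expects for rich-text fields. The same shape expressed as a small helper (the helper name is ours, not from the PR):

def text_to_adf(text: str) -> dict:
    """Wrap plain text in a single-paragraph ADF document."""
    return {
        "type": "doc",
        "version": 1,
        "content": [
            {"type": "paragraph", "content": [{"type": "text", "text": text}]}
        ],
    }

# processed_fields["customfield_10168"] = text_to_adf(field_value)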


@@ -87,9 +87,18 @@ class TicketSharepointSyncInterface:
         audit_log.append("Step 2: Transforming JIRA data...")
         transformed_tasks = self._transform_tasks(tickets, include_put=True)
         jira_data = [task.data for task in transformed_tasks]
-        # Remove empty records and those without an ID to avoid blank rows
+        before_count = len(jira_data)
+        # Remove records without an ID to avoid blank rows
         jira_data = self._filter_empty_records(jira_data)
-        audit_log.append(f"JIRA issues transformed: {len(jira_data)}")
+        after_count = len(jira_data)
+        audit_log.append(f"JIRA issues transformed: {before_count}")
+        audit_log.append(f"JIRA issues after ID filter: {after_count}")
+        # Log a sample of IDs to diagnose empty export issues
+        try:
+            sample_ids = [str(row.get("ID")) for row in jira_data[:5]]
+            audit_log.append(f"Sample IDs: {', '.join(sample_ids)}")
+        except Exception:
+            pass
         audit_log.append("")

         # 3. Create JIRA export file in audit folder
@@ -480,7 +489,7 @@
         # 7. Create Excel with 4-row structure and write to SharePoint
         audit_log.append("Step 7: Writing updated Excel to SharePoint...")
-        # Ensure no empty records are written
+        # Ensure no records without ID are written
         merged_data = self._filter_empty_records(merged_data)
         excel_content = self._create_excel_content(merged_data, existing_headers)
         await self.connector_sharepoint.upload_file(
@@ -726,35 +735,14 @@
         return None

     def _filter_empty_records(self, records: list[dict]) -> list[dict]:
-        """Remove records that are effectively empty or missing an ID.
+        """Remove records that are missing an ID.

-        - Drop rows with no 'ID'
-        - Drop rows where all mapped fields are empty/None/''
+        Purposefully only filter by presence of 'ID' to avoid dropping
+        valid rows with many empty optional fields.
         """
         filtered: list[dict] = []
-        field_names = set(self.task_sync_definition.keys())
         for row in records:
-            if not isinstance(row, dict):
-                continue
-            # Require ID
-            task_id = row.get("ID")
-            if not task_id:
-                continue
-            # Check if all mapped fields are empty
-            non_empty = False
-            for name in field_names:
-                val = row.get(name)
-                if val is None:
-                    continue
-                if isinstance(val, str) and val.strip() == "":
-                    continue
-                # Consider dict/list values as non-empty if they have content
-                if isinstance(val, (list, dict)):
-                    if len(val) == 0:
-                        continue
-                non_empty = True
-                break
-            if non_empty:
+            if isinstance(row, dict) and row.get("ID"):
                 filtered.append(row)
         return filtered
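For clarity, the simplified filter keeps any dict with a truthy "ID", no matter how many other fields are empty (illustrative data only):

rows = [
    {"ID": "ABC-1", "Summary": ""},  # kept: has an ID, even with empty optional fields
    {"ID": None, "Summary": "x"},    # dropped: ID missing/falsy
    "not-a-dict",                    # dropped: not a dict
]
# self._filter_empty_records(rows) -> [{"ID": "ABC-1", "Summary": ""}]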