# gateway/modules/interfaces/interfaceTicketObjects.py
# Last modified: 2025-09-05 11:58:25 +02:00 — 541 lines, 23 KiB, Python.
from dataclasses import dataclass
from io import BytesIO
from typing import Any
import pandas as pd
from modules.shared.timezoneUtils import get_utc_now
from modules.connectors.connectorSharepoint import ConnectorSharepoint
from modules.interfaces.interfaceTicketModel import TicketBase
from modules.interfaces.interfaceTicketModel import Task
@dataclass(slots=True)
class TicketSharepointSyncInterface:
    """Bidirectional sync between a ticket system (JIRA) and a CSV file on SharePoint.

    Fields marked "get" in ``task_sync_definition`` flow JIRA -> CSV; fields
    marked "put" flow CSV -> JIRA. Every sync run writes an audit log to
    ``audit_folder`` and backs up the sync file into ``backup_folder``.
    """

    # Ticket-system connector; must provide read_tasks() / write_tasks().
    connector_ticket: TicketBase
    # SharePoint connector used for all file reads/writes/copies.
    connector_sharepoint: ConnectorSharepoint
    # Maps CSV column name -> (direction, field_path), e.g.
    # {"Status": ("get", ["fields", "status"])} — direction is "get" or "put".
    task_sync_definition: dict
    # SharePoint folder containing the sync CSV.
    sync_folder: str
    # File name of the sync CSV inside sync_folder.
    sync_file: str
    # Folder receiving timestamped backups of the sync CSV.
    backup_folder: str
    # Folder receiving audit logs and raw JIRA export snapshots.
    audit_folder: str
@classmethod
async def create(
cls,
connector_ticket: TicketBase,
connector_sharepoint: ConnectorSharepoint,
task_sync_definition: dict,
sync_folder: str,
sync_file: str,
backup_folder: str,
audit_folder: str,
) -> "TicketSharepointSyncInterface":
return cls(
connector_ticket=connector_ticket,
connector_sharepoint=connector_sharepoint,
task_sync_definition=task_sync_definition,
sync_folder=sync_folder,
sync_file=sync_file,
backup_folder=backup_folder,
audit_folder=audit_folder,
)
async def create_backup(self):
"""Creates a backup of the current sync file in the backup folder."""
timestamp = get_utc_now().strftime("%Y%m%d_%H%M%S")
backup_filename = f"backup_{timestamp}_{self.sync_file}"
await self.connector_sharepoint.copy_file_async(
source_folder=self.sync_folder,
source_file=self.sync_file,
dest_folder=self.backup_folder,
dest_file=backup_filename,
)
    async def sync_from_jira_to_csv(self):
        """Syncs tasks from JIRA to a CSV file in SharePoint.

        Pipeline: read JIRA -> transform "get" fields -> snapshot the raw
        export into the audit folder -> back up the current sync file ->
        merge with the existing CSV (non-"get" columns are preserved) ->
        overwrite the sync file. The audit log is written in all cases via
        the finally block, even when a step raises.
        """
        start_time = get_utc_now()
        # Human-readable trail, persisted to SharePoint in the finally block.
        audit_log = []
        audit_log.append("=== JIRA TO CSV SYNC STARTED ===")
        audit_log.append(f"Start Time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
        audit_log.append(f"Sync File: {self.sync_file}")
        audit_log.append(f"Sync Folder: {self.sync_folder}")
        audit_log.append("")
        try:
            # 1. Read JIRA tickets (limit=0 — presumably means "no limit";
            # TODO confirm against the connector's read_tasks contract).
            audit_log.append("Step 1: Reading JIRA tickets...")
            tickets = await self.connector_ticket.read_tasks(limit=0)
            audit_log.append(f"JIRA issues read: {len(tickets)}")
            audit_log.append("")
            # 2. Transform tasks according to task_sync_definition
            audit_log.append("Step 2: Transforming JIRA data...")
            transformed_tasks = self._transform_tasks(tickets)
            jira_data = [task.data for task in transformed_tasks]
            audit_log.append(f"JIRA issues transformed: {len(jira_data)}")
            audit_log.append("")
            # 3. Create JIRA export file in audit folder. Best-effort:
            # a failure here is logged but does not abort the sync.
            audit_log.append("Step 3: Creating JIRA export file...")
            try:
                timestamp = get_utc_now().strftime("%Y%m%d_%H%M%S")
                jira_export_filename = f"jira_export_{timestamp}.csv"
                jira_export_content = self._create_csv_content(jira_data)
                await self.connector_sharepoint.overwrite_file_async(
                    folder_path=self.audit_folder,
                    file_name=jira_export_filename,
                    content=jira_export_content,
                )
                audit_log.append(f"JIRA export file created: {jira_export_filename}")
            except Exception as e:
                audit_log.append(f"Failed to create JIRA export file: {str(e)}")
            audit_log.append("")
            # 4. Create backup of existing sync file (if it exists). Also
            # best-effort: the sync file may not exist on the first run.
            audit_log.append("Step 4: Creating backup...")
            # NOTE(review): backup_created is assigned but never read.
            backup_created = False
            try:
                await self.create_backup()
                backup_created = True
                audit_log.append("Backup created successfully")
            except Exception as e:
                audit_log.append(
                    f"Backup creation failed (file might not exist): {str(e)}"
                )
            audit_log.append("")
            # 5. Try to read existing CSV file from SharePoint. A missing file
            # is not fatal: the merge then simply emits all JIRA rows as new.
            audit_log.append("Step 5: Reading existing CSV file...")
            existing_data = []
            # NOTE(review): existing_file_found is assigned but never read.
            existing_file_found = False
            try:
                csv_content = await self.connector_sharepoint.read_file_async(
                    folder_path=self.sync_folder, file_name=self.sync_file
                )
                df_existing = pd.read_csv(
                    BytesIO(csv_content), skiprows=2
                )  # Skip the two static header rows; row 3 becomes the header
                existing_data = df_existing.to_dict("records")
                existing_file_found = True
                audit_log.append(
                    f"Existing CSV file found with {len(existing_data)} records"
                )
            except Exception as e:
                audit_log.append(f"No existing CSV file found or read error: {str(e)}")
            audit_log.append("")
            # 6. Merge JIRA data with existing data and track per-row changes.
            audit_log.append("Step 6: Merging JIRA data with existing data...")
            merged_data, change_details = self._merge_jira_with_existing_detailed(
                jira_data, existing_data
            )
            # Log merge summary counters
            audit_log.append(f"Total records after merge: {len(merged_data)}")
            audit_log.append(f"Records updated: {change_details['updated']}")
            audit_log.append(f"Records added: {change_details['added']}")
            audit_log.append(f"Records unchanged: {change_details['unchanged']}")
            audit_log.append("")
            # Log individual changes
            if change_details["changes"]:
                audit_log.append("DETAILED CHANGES:")
                for change in change_details["changes"]:
                    audit_log.append(f"- {change}")
                audit_log.append("")
            # 7. Create CSV with 4-row structure and write to SharePoint
            audit_log.append("Step 7: Writing updated CSV to SharePoint...")
            csv_content = self._create_csv_content(merged_data)
            await self.connector_sharepoint.overwrite_file_async(
                folder_path=self.sync_folder,
                file_name=self.sync_file,
                content=csv_content,
            )
            audit_log.append("CSV file successfully written to SharePoint")
            audit_log.append("")
            # Success summary
            end_time = get_utc_now()
            duration = (end_time - start_time).total_seconds()
            audit_log.append("=== SYNC COMPLETED SUCCESSFULLY ===")
            audit_log.append(f"End Time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
            audit_log.append(f"Duration: {duration:.2f} seconds")
            audit_log.append(f"Total JIRA issues processed: {len(jira_data)}")
            audit_log.append(f"Total records in final CSV: {len(merged_data)}")
        except Exception as e:
            # Record the failure in the audit trail, then re-raise so the
            # caller still sees the original exception.
            end_time = get_utc_now()
            duration = (end_time - start_time).total_seconds()
            audit_log.append("")
            audit_log.append("=== SYNC FAILED ===")
            audit_log.append(f"Error Time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
            audit_log.append(f"Duration before failure: {duration:.2f} seconds")
            audit_log.append(f"Error: {str(e)}")
            raise
        finally:
            # Write audit log to SharePoint (helper swallows its own errors).
            await self._write_audit_log(audit_log, "jira_to_csv")
    async def sync_from_csv_to_jira(self):
        """Syncs tasks from a CSV file in SharePoint to JIRA.

        Reads the sync CSV, compares its "put" fields against current JIRA
        values, and writes back only fields that differ AND are non-empty in
        the CSV (an emptied CSV cell therefore never clears a JIRA field).
        Returns early without raising if the CSV cannot be read; the audit
        log is written in all cases via the finally block.
        """
        start_time = get_utc_now()
        audit_log = []
        audit_log.append("=== CSV TO JIRA SYNC STARTED ===")
        audit_log.append(f"Start Time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
        audit_log.append(f"Sync File: {self.sync_file}")
        audit_log.append(f"Sync Folder: {self.sync_folder}")
        audit_log.append("")
        try:
            # 1. Read CSV file from SharePoint; without it there is nothing to
            # sync, so abort quietly (the finally block still writes the log).
            audit_log.append("Step 1: Reading CSV file from SharePoint...")
            try:
                csv_content = await self.connector_sharepoint.read_file_async(
                    folder_path=self.sync_folder, file_name=self.sync_file
                )
                df = pd.read_csv(BytesIO(csv_content), skiprows=2)  # Skip header rows
                csv_data = df.to_dict("records")
                audit_log.append(
                    f"CSV file read successfully with {len(csv_data)} records"
                )
            except Exception as e:
                audit_log.append(f"Failed to read CSV file: {str(e)}")
                audit_log.append("CSV to JIRA sync aborted - no file to process")
                return
            audit_log.append("")
            # 2. Read current JIRA data for comparison
            audit_log.append("Step 2: Reading current JIRA data for comparison...")
            try:
                current_jira_tasks = await self.connector_ticket.read_tasks(limit=0)
                current_jira_data = self._transform_tasks(current_jira_tasks)
                # NOTE(review): lookup keys come from the transformed "ID"
                # field, while CSV IDs come from pandas parsing — confirm both
                # sides agree on type (str vs int), else lookups silently miss.
                jira_lookup = {
                    task.data.get("ID"): task.data for task in current_jira_data
                }
                audit_log.append(f"Current JIRA data read: {len(jira_lookup)} tasks")
            except Exception as e:
                audit_log.append(f"Failed to read current JIRA data: {str(e)}")
                raise
            audit_log.append("")
            # 3. Detect actual changes in "put" fields
            audit_log.append("Step 3: Detecting changes in 'put' fields...")
            actual_changes = {}
            records_with_changes = 0
            total_changes = 0
            for row in csv_data:
                task_id = row.get("ID")
                # Rows without an ID, or unknown to JIRA, are skipped silently.
                if not task_id or task_id not in jira_lookup:
                    continue
                current_jira_task = jira_lookup[task_id]
                task_changes = {}
                # NOTE(review): _transform_tasks only maps "get" fields, so for
                # a "put" field current_jira_task.get(field_name) likely always
                # defaults to "" — non-empty CSV "put" values would then be
                # re-sent on every run. Verify this is intended.
                for field_name, field_config in self.task_sync_definition.items():
                    if field_config[0] == "put":  # Only process "put" fields
                        csv_value = row.get(field_name, "")
                        jira_value = current_jira_task.get(field_name, "")
                        # Convert None to empty string for comparison
                        csv_value = "" if csv_value is None else str(csv_value).strip()
                        jira_value = (
                            "" if jira_value is None else str(jira_value).strip()
                        )
                        # Only include if values are different and CSV has non-empty value
                        if csv_value != jira_value and csv_value:
                            task_changes[field_name] = csv_value
                if task_changes:
                    actual_changes[task_id] = task_changes
                    records_with_changes += 1
                    total_changes += len(task_changes)
            audit_log.append(f"Records with actual changes: {records_with_changes}")
            audit_log.append(f"Total field changes detected: {total_changes}")
            audit_log.append("")
            # Log detailed changes
            if actual_changes:
                audit_log.append("DETAILED CHANGES TO APPLY TO JIRA:")
                for task_id, changes in actual_changes.items():
                    change_list = [
                        f"{field}: '{value}'" for field, value in changes.items()
                    ]
                    audit_log.append(f"- Task ID {task_id}: {', '.join(change_list)}")
                audit_log.append("")
            # 4. Update JIRA tasks with actual changes
            if actual_changes:
                audit_log.append("Step 4: Updating JIRA tasks...")
                # Convert to Task objects for the connector
                tasks_to_update = []
                for task_id, changes in actual_changes.items():
                    # Create task data structure expected by JIRA connector
                    # Build the nested fields structure that JIRA expects
                    fields = {}
                    for field_name, new_value in changes.items():
                        # Map back to JIRA field structure using task_sync_definition
                        field_config = self.task_sync_definition[field_name]
                        field_path = field_config[1]
                        # Extract the JIRA field ID from the path.
                        # For "put" fields, the path is like ['fields', 'customfield_10067'];
                        # paths with any other shape are silently skipped.
                        if len(field_path) >= 2 and field_path[0] == "fields":
                            jira_field_id = field_path[1]
                            fields[jira_field_id] = new_value
                    if fields:
                        task_data = {"ID": task_id, "fields": fields}
                        task = Task(data=task_data)
                        tasks_to_update.append(task)
                # Write tasks back to JIRA
                try:
                    await self.connector_ticket.write_tasks(tasks_to_update)
                    audit_log.append(
                        f"Successfully updated {len(tasks_to_update)} JIRA tasks"
                    )
                except Exception as e:
                    audit_log.append(f"Failed to update JIRA tasks: {str(e)}")
                    raise
            else:
                audit_log.append("Step 4: No changes to apply to JIRA")
            audit_log.append("")
            # Success summary
            end_time = get_utc_now()
            duration = (end_time - start_time).total_seconds()
            audit_log.append("=== SYNC COMPLETED SUCCESSFULLY ===")
            audit_log.append(f"End Time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
            audit_log.append(f"Duration: {duration:.2f} seconds")
            audit_log.append(f"Total CSV records processed: {len(csv_data)}")
            audit_log.append(f"Records with actual changes: {records_with_changes}")
            audit_log.append(f"JIRA tasks updated: {len(actual_changes)}")
        except Exception as e:
            # Record the failure in the audit trail, then re-raise.
            end_time = get_utc_now()
            duration = (end_time - start_time).total_seconds()
            audit_log.append("")
            audit_log.append("=== SYNC FAILED ===")
            audit_log.append(f"Error Time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
            audit_log.append(f"Duration before failure: {duration:.2f} seconds")
            audit_log.append(f"Error: {str(e)}")
            raise
        finally:
            # Write audit log to SharePoint (helper swallows its own errors).
            await self._write_audit_log(audit_log, "csv_to_jira")
def _transform_tasks(self, tasks: list[Task]) -> list[Task]:
"""Transforms tasks according to the task_sync_definition."""
transformed_tasks = []
for task in tasks:
transformed_data = {}
# Process each field in the sync definition
for field_name, field_config in self.task_sync_definition.items():
direction = field_config[0] # "get" or "put"
field_path = field_config[1] # List of keys to navigate
# Only process "get" fields (JIRA → CSV)
if direction == "get":
# Extract value using the field path
value = self._extract_field_value(task.data, field_path)
transformed_data[field_name] = value
# Create new Task with transformed data
transformed_task = Task(data=transformed_data)
transformed_tasks.append(transformed_task)
return transformed_tasks
def _extract_field_value(self, issue_data: dict, field_path: list[str]) -> Any:
"""Extract field value from JIRA issue data using field path."""
value = issue_data
try:
for key in field_path:
if value is not None:
value = value[key]
if value is None:
return None
# Handle complex objects that have a 'value' field (like custom field options)
if isinstance(value, dict) and "value" in value:
value = value["value"]
# Handle lists of objects with 'value' fields
elif (
isinstance(value, list)
and len(value) > 0
and isinstance(value[0], dict)
and "value" in value[0]
):
value = value[0]["value"]
return value
except (KeyError, TypeError):
return None
def _merge_jira_with_existing(
self, jira_data: list[dict], existing_data: list[dict]
) -> list[dict]:
"""Merge JIRA data with existing CSV data, updating only 'get' fields."""
# Create a lookup for existing data by ID
existing_lookup = {row.get("ID"): row for row in existing_data if row.get("ID")}
merged_data = []
for jira_row in jira_data:
jira_id = jira_row.get("ID")
if jira_id and jira_id in existing_lookup:
# Update existing row with JIRA data (only 'get' fields)
existing_row = existing_lookup[jira_id].copy()
for field_name, field_config in self.task_sync_definition.items():
if field_config[0] == "get": # Only update 'get' fields
existing_row[field_name] = jira_row.get(field_name)
merged_data.append(existing_row)
# Remove from lookup to track processed items
del existing_lookup[jira_id]
else:
# New row from JIRA
merged_data.append(jira_row)
# Add any remaining existing rows that weren't in JIRA data
merged_data.extend(existing_lookup.values())
return merged_data
def _merge_jira_with_existing_detailed(
self, jira_data: list[dict], existing_data: list[dict]
) -> tuple[list[dict], dict]:
"""Merge JIRA data with existing CSV data and track detailed changes."""
# Create a lookup for existing data by ID
existing_lookup = {row.get("ID"): row for row in existing_data if row.get("ID")}
merged_data = []
changes = []
updated_count = 0
added_count = 0
unchanged_count = 0
for jira_row in jira_data:
jira_id = jira_row.get("ID")
if jira_id and jira_id in existing_lookup:
# Update existing row with JIRA data (only 'get' fields)
existing_row = existing_lookup[jira_id].copy()
row_changes = []
for field_name, field_config in self.task_sync_definition.items():
if field_config[0] == "get": # Only update 'get' fields
old_value = existing_row.get(field_name, "")
new_value = jira_row.get(field_name, "")
# Convert None to empty string for comparison
old_value = "" if old_value is None else str(old_value)
new_value = "" if new_value is None else str(new_value)
if old_value != new_value:
row_changes.append(
f"{field_name}: '{old_value}''{new_value}'"
)
existing_row[field_name] = jira_row.get(field_name)
merged_data.append(existing_row)
if row_changes:
updated_count += 1
changes.append(
f"Row ID {jira_id} updated: {', '.join(row_changes)}"
)
else:
unchanged_count += 1
# Remove from lookup to track processed items
del existing_lookup[jira_id]
else:
# New row from JIRA
merged_data.append(jira_row)
added_count += 1
changes.append(f"Row ID {jira_id} added as new record")
# Add any remaining existing rows that weren't in JIRA data
for remaining_row in existing_lookup.values():
merged_data.append(remaining_row)
unchanged_count += 1
change_details = {
"updated": updated_count,
"added": added_count,
"unchanged": unchanged_count,
"changes": changes,
}
return merged_data, change_details
async def _write_audit_log(self, audit_log: list[str], operation_type: str):
"""Write audit log to SharePoint."""
try:
timestamp = get_utc_now().strftime("%Y%m%d_%H%M%S")
audit_filename = f"audit_{operation_type}_{timestamp}.log"
# Convert audit log to bytes
audit_content = "\n".join(audit_log).encode("utf-8")
# Write to SharePoint
await self.connector_sharepoint.overwrite_file_async(
folder_path=self.audit_folder,
file_name=audit_filename,
content=audit_content,
)
except Exception as e:
# If audit logging fails, we don't want to break the main sync process
# Just log the error (this could be enhanced with fallback logging)
print(f"Failed to write audit log: {str(e)}")
def _create_csv_content(self, data: list[dict]) -> bytes:
"""Create CSV content with 4-row structure matching reference code."""
if not data:
return b""
# Create DataFrame from data
df = pd.DataFrame(data)
# Force all columns to be object (string) type to preserve empty cells
for column in df.columns:
df[column] = df[column].astype("object")
df[column] = df[column].fillna("")
# Create the 4-row structure
# Row 1: Static header row 1
header_row1 = pd.DataFrame(
[["Header 1"] + [""] * (len(df.columns) - 1)], columns=df.columns
)
# Row 2: Static header row 2 with timestamp
timestamp = get_utc_now().strftime("%Y-%m-%d %H:%M:%S")
header_row2 = pd.DataFrame(
[[f"{timestamp}"] + [""] * (len(df.columns) - 1)], columns=df.columns
)
# Row 3: Table headers (column names)
table_headers = pd.DataFrame([df.columns.tolist()], columns=df.columns)
# Concatenate all rows: header1 + header2 + table_headers + data
final_df = pd.concat(
[header_row1, header_row2, table_headers, df], ignore_index=True
)
# Convert to CSV bytes
csv_buffer = BytesIO()
final_df.to_csv(csv_buffer, index=False, header=False)
return csv_buffer.getvalue()