diff --git a/app.py b/app.py index 72c94ab5..282775ad 100644 --- a/app.py +++ b/app.py @@ -4,6 +4,7 @@ os.environ["NUMEXPR_MAX_THREADS"] = "12" from fastapi import FastAPI, HTTPException, Depends, Body, status, Response from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager +from zoneinfo import ZoneInfo import logging from logging.handlers import RotatingFileHandler @@ -11,6 +12,8 @@ from datetime import timedelta import pathlib from modules.shared.configuration import APP_CONFIG +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger def initLogging(): """Initialize logging with configuration from APP_CONFIG""" @@ -147,10 +150,43 @@ async def lifespan(app: FastAPI): from modules.interfaces.interfaceAppObjects import getRootInterface getRootInterface() + # Setup APScheduler for JIRA sync + scheduler = AsyncIOScheduler(timezone=ZoneInfo("Europe/Zurich")) + try: + from modules.workflow.managerSyncDelta import perform_sync_jira_delta_group + # Schedule hourly sync at minute 0 + scheduler.add_job( + perform_sync_jira_delta_group, + CronTrigger(minute="0"), + id="jira_delta_group_sync", + replace_existing=True, + coalesce=True, + max_instances=1, + misfire_grace_time=1800, + ) + scheduler.start() + logger.info("APScheduler started (jira_delta_group_sync hourly)") + + # Run initial sync on startup (non-blocking failure) + try: + logger.info("Running initial JIRA sync on app startup...") + await perform_sync_jira_delta_group() + logger.info("Initial JIRA sync completed successfully") + except Exception as e: + logger.error(f"Initial JIRA sync failed: {str(e)}") + except Exception as e: + logger.error(f"Failed to initialize scheduler or JIRA sync: {str(e)}") + yield # Shutdown logic logger.info("Application has been shut down") + try: + if 'scheduler' in locals() and scheduler.running: + scheduler.shutdown(wait=False) + logger.info("APScheduler stopped") + except Exception as e: + logger.error(f"Error shutting down scheduler: {str(e)}") # START APP app = FastAPI( @@ -212,6 +248,3 @@ app.include_router(msftRouter) from modules.routes.routeSecurityGoogle import router as googleRouter app.include_router(googleRouter) - -from modules.routes.routeJira import router as jiraRouter -app.include_router(jiraRouter) \ No newline at end of file diff --git a/modules/connectors/connectorSharepoint.py b/modules/connectors/connectorSharepoint.py index b5eaa703..89bdffbe 100644 --- a/modules/connectors/connectorSharepoint.py +++ b/modules/connectors/connectorSharepoint.py @@ -1,180 +1,443 @@ -"""Connector for CRUD sharepoint operations.""" +"""Connector for SharePoint operations using Microsoft Graph API.""" +import logging +import json +import aiohttp import asyncio -from concurrent.futures import ThreadPoolExecutor -from dataclasses import dataclass -from datetime import datetime -from io import BytesIO -from typing import Optional -from office365.sharepoint.client_context import ClientContext -from office365.sharepoint.files.file import File +from typing import Dict, Any, List, Optional +from datetime import datetime, UTC + +logger = logging.getLogger(__name__) -@dataclass class ConnectorSharepoint: - ctx: ClientContext - - @classmethod - async def create(cls, ctx: ClientContext) -> "ConnectorSharepoint": - """Creates an instance of the Sharepoint connector. - - Params: - ctx: The ClientContext instance. - - Returns: - ConnectorSharepoint: An instance of the Sharepoint connector. + """SharePoint connector using Microsoft Graph API for reliable authentication.""" + + def __init__(self, access_token: str): + """Initialize with access token. + + Args: + access_token: Microsoft Graph access token """ - return cls(ctx=ctx) - - @classmethod - def get_client_context_from_username_password( - cls, site_url: str, username: str, password: str - ) -> ClientContext: - """Creates a ClientContext instance from username and password. - - Params: - site_url: The URL of the SharePoint site. - username: The username for authentication. - password: The password for authentication. - - Returns: - ClientContext: An instance of the ClientContext. - """ - return ClientContext(site_url).with_user_credentials(username, password) - - @classmethod - def get_client_context_from_app( - cls, site_url: str, client_id: str, client_secret: str - ) -> ClientContext: - """Creates a ClientContext instance from client ID and client secret. - - Params: - site_url: The URL of the SharePoint site. - client_id: The client ID for authentication. - client_secret: The client secret for authentication. - - Returns: - ClientContext: An instance of the ClientContext. - """ - return ClientContext(site_url).with_client_credentials( - client_id=client_id, client_secret=client_secret - ) - - def copy_file( - self, *, source_folder: str, source_file: str, dest_folder: str, dest_file: str - ) -> bool: - """Copy a file from one SharePoint location to another. - - Params: - source_folder: Source folder path (server-relative) - source_file: Source file name - dest_folder: Destination folder path (server-relative) - dest_file: Destination file name - - Returns: - bool: True if successful, False otherwise - """ - source_path = f"{source_folder.rstrip('/')}/{source_file}" - dest_path = f"{dest_folder.rstrip('/')}/{dest_file}" - - source_file_obj = self.ctx.web.get_file_by_server_relative_url(source_path) - source_file_obj.copyto(dest_path).execute_query() - return True - - async def copy_file_async( - self, *, source_folder: str, source_file: str, dest_folder: str, dest_file: str - ) -> bool: - """Copy a file from one SharePoint location to another (async version). - - Params: - source_folder: Source folder path (server-relative) - source_file: Source file name - dest_folder: Destination folder path (server-relative) - dest_file: Destination file name - - Returns: - bool: True if successful, False otherwise - """ - loop = asyncio.get_event_loop() - with ThreadPoolExecutor() as executor: - return await loop.run_in_executor( - executor, - lambda: self.copy_file( - source_folder=source_folder, - source_file=source_file, - dest_folder=dest_folder, - dest_file=dest_file, - ), + self.access_token = access_token + self.base_url = "https://graph.microsoft.com/v1.0" + + async def _make_graph_api_call(self, endpoint: str, method: str = "GET", data: bytes = None) -> Dict[str, Any]: + """Make a Microsoft Graph API call with proper error handling.""" + try: + headers = { + "Authorization": f"Bearer {self.access_token}", + "Content-Type": "application/json" if data and method != "PUT" else "application/octet-stream" if data else "application/json" + } + + # Remove leading slash from endpoint to avoid double slash + clean_endpoint = endpoint.lstrip('/') + url = f"{self.base_url}/{clean_endpoint}" + logger.debug(f"Making Graph API call: {method} {url}") + + timeout = aiohttp.ClientTimeout(total=30) + + async with aiohttp.ClientSession(timeout=timeout) as session: + if method == "GET": + async with session.get(url, headers=headers) as response: + if response.status == 200: + return await response.json() + else: + error_text = await response.text() + logger.error(f"Graph API call failed: {response.status} - {error_text}") + return {"error": f"API call failed: {response.status} - {error_text}"} + + elif method == "PUT": + async with session.put(url, headers=headers, data=data) as response: + if response.status in [200, 201]: + return await response.json() + else: + error_text = await response.text() + logger.error(f"Graph API call failed: {response.status} - {error_text}") + return {"error": f"API call failed: {response.status} - {error_text}"} + + elif method == "POST": + async with session.post(url, headers=headers, data=data) as response: + if response.status in [200, 201]: + return await response.json() + else: + error_text = await response.text() + logger.error(f"Graph API call failed: {response.status} - {error_text}") + return {"error": f"API call failed: {response.status} - {error_text}"} + + except asyncio.TimeoutError: + logger.error(f"Graph API call timed out after 30 seconds: {endpoint}") + return {"error": f"API call timed out after 30 seconds: {endpoint}"} + except Exception as e: + logger.error(f"Error making Graph API call: {str(e)}") + return {"error": f"Error making Graph API call: {str(e)}"} + + async def discover_sites(self) -> List[Dict[str, Any]]: + """Discover all SharePoint sites accessible to the user.""" + try: + result = await self._make_graph_api_call("sites?search=*") + + if "error" in result: + logger.error(f"Error discovering SharePoint sites: {result['error']}") + return [] + + sites = result.get("value", []) + logger.info(f"Discovered {len(sites)} SharePoint sites") + + processed_sites = [] + for site in sites: + site_info = { + "id": site.get("id"), + "displayName": site.get("displayName"), + "name": site.get("name"), + "webUrl": site.get("webUrl"), + "description": site.get("description"), + "createdDateTime": site.get("createdDateTime"), + "lastModifiedDateTime": site.get("lastModifiedDateTime") + } + processed_sites.append(site_info) + logger.debug(f"Site: {site_info['displayName']} - {site_info['webUrl']}") + + return processed_sites + + except Exception as e: + logger.error(f"Error discovering SharePoint sites: {str(e)}") + return [] + + async def find_site_by_name(self, site_name: str) -> Optional[Dict[str, Any]]: + """Find a specific SharePoint site by name using direct Graph API call.""" + try: + # Try to get the site directly by name using Graph API + endpoint = f"sites/{site_name}" + result = await self._make_graph_api_call(endpoint) + + if result and "error" not in result: + site_info = { + "id": result.get("id"), + "displayName": result.get("displayName"), + "name": result.get("name"), + "webUrl": result.get("webUrl"), + "description": result.get("description"), + "createdDateTime": result.get("createdDateTime"), + "lastModifiedDateTime": result.get("lastModifiedDateTime") + } + logger.info(f"Found site directly: {site_info['displayName']} - {site_info['webUrl']}") + return site_info + + except Exception as e: + logger.debug(f"Direct site lookup failed for '{site_name}': {str(e)}") + + # Fallback to discovery if direct lookup fails + logger.info(f"Direct lookup failed, trying discovery for site: {site_name}") + sites = await self.discover_sites() + if not sites: + logger.warning("No sites discovered") + return None + + logger.info(f"Discovered {len(sites)} SharePoint sites:") + for site in sites: + logger.info(f" - {site.get('displayName', 'Unknown')} (ID: {site.get('id', 'Unknown')})") + + # Try exact match first + for site in sites: + if site.get("displayName", "").strip().lower() == site_name.strip().lower(): + logger.info(f"Found exact match: {site.get('displayName')}") + return site + + # Try partial match + for site in sites: + if site_name.lower() in site.get("displayName", "").lower(): + logger.info(f"Found partial match: {site.get('displayName')}") + return site + + logger.warning(f"No site found matching: {site_name}") + return None + + async def find_site_by_web_url(self, web_url: str) -> Optional[Dict[str, Any]]: + """Find a SharePoint site using its web URL (useful for guest sites).""" + try: + # Use the web URL format: sites/{hostname}:/sites/{site-path} + # Extract hostname and site path from the web URL + if not web_url.startswith("https://"): + web_url = f"https://{web_url}" + + # Parse the URL to extract hostname and site path + from urllib.parse import urlparse + parsed = urlparse(web_url) + hostname = parsed.hostname + path_parts = parsed.path.strip('/').split('/') + + if len(path_parts) >= 2 and path_parts[0] == 'sites': + site_path = '/'.join(path_parts[1:]) # Everything after 'sites/' + else: + logger.error(f"Invalid SharePoint URL format: {web_url}") + return None + + endpoint = f"sites/{hostname}:/sites/{site_path}" + logger.debug(f"Trying web URL format: {endpoint}") + + result = await self._make_graph_api_call(endpoint) + + if result and "error" not in result: + site_info = { + "id": result.get("id"), + "displayName": result.get("displayName"), + "name": result.get("name"), + "webUrl": result.get("webUrl"), + "description": result.get("description"), + "createdDateTime": result.get("createdDateTime"), + "lastModifiedDateTime": result.get("lastModifiedDateTime") + } + logger.info(f"Found site by web URL: {site_info['displayName']} - {site_info['webUrl']} (ID: {site_info['id']})") + return site_info + else: + logger.warning(f"Site not found using web URL: {web_url}") + return None + + except Exception as e: + logger.error(f"Error finding site by web URL: {str(e)}") + return None + + async def find_site_by_url(self, hostname: str, site_path: str) -> Optional[Dict[str, Any]]: + """Find a SharePoint site using the site URL format.""" + try: + # For guest sites, try different URL formats + url_formats = [ + f"sites/{hostname}:/sites/{site_path}", # Standard format + f"sites/{hostname}:/sites/{site_path}/", # With trailing slash + f"sites/{hostname}:/sites/{site_path.lower()}", # Lowercase + f"sites/{hostname}:/sites/{site_path.lower()}/", # Lowercase with slash + ] + + for endpoint in url_formats: + logger.debug(f"Trying URL format: {endpoint}") + result = await self._make_graph_api_call(endpoint) + + if result and "error" not in result: + site_info = { + "id": result.get("id"), + "displayName": result.get("displayName"), + "name": result.get("name"), + "webUrl": result.get("webUrl"), + "description": result.get("description"), + "createdDateTime": result.get("createdDateTime"), + "lastModifiedDateTime": result.get("lastModifiedDateTime") + } + logger.info(f"Found site by URL: {site_info['displayName']} - {site_info['webUrl']} (ID: {site_info['id']})") + return site_info + else: + logger.debug(f"URL format failed: {endpoint} - {result.get('error', 'Unknown error')}") + + logger.warning(f"Site not found using any URL format for: {hostname}:/sites/{site_path}") + return None + + except Exception as e: + logger.error(f"Error finding site by URL: {str(e)}") + return None + + async def get_folder_by_path(self, site_id: str, folder_path: str) -> Optional[Dict[str, Any]]: + """Get folder information by path within a site.""" + try: + # Clean the path + clean_path = folder_path.lstrip('/') + endpoint = f"sites/{site_id}/drive/root:/{clean_path}" + + result = await self._make_graph_api_call(endpoint) + + if "error" in result: + logger.warning(f"Folder not found at path {folder_path}: {result['error']}") + return None + + return result + + except Exception as e: + logger.error(f"Error getting folder by path: {str(e)}") + return None + + async def upload_file(self, site_id: str, folder_path: str, file_name: str, content: bytes) -> Dict[str, Any]: + """Upload a file to SharePoint.""" + try: + # Clean the path + clean_path = folder_path.lstrip('/') + upload_path = f"{clean_path.rstrip('/')}/{file_name}" + endpoint = f"sites/{site_id}/drive/root:/{upload_path}:/content" + + logger.info(f"Uploading file to: {endpoint}") + + result = await self._make_graph_api_call(endpoint, method="PUT", data=content) + + if "error" in result: + logger.error(f"Upload failed: {result['error']}") + return result + + logger.info(f"File uploaded successfully: {file_name}") + return result + + except Exception as e: + logger.error(f"Error uploading file: {str(e)}") + return {"error": f"Error uploading file: {str(e)}"} + + async def download_file(self, site_id: str, file_id: str) -> Optional[bytes]: + """Download a file from SharePoint.""" + try: + endpoint = f"sites/{site_id}/drive/items/{file_id}/content" + + headers = {"Authorization": f"Bearer {self.access_token}"} + timeout = aiohttp.ClientTimeout(total=30) + + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.get(f"{self.base_url}/{endpoint}", headers=headers) as response: + if response.status == 200: + return await response.read() + else: + logger.error(f"Download failed: {response.status}") + return None + + except Exception as e: + logger.error(f"Error downloading file: {str(e)}") + return None + + async def list_folder_contents(self, site_id: str, folder_path: str = "") -> List[Dict[str, Any]]: + """List contents of a folder.""" + try: + if not folder_path or folder_path == "/": + endpoint = f"sites/{site_id}/drive/root/children" + else: + clean_path = folder_path.lstrip('/') + endpoint = f"sites/{site_id}/drive/root:/{clean_path}:/children" + + result = await self._make_graph_api_call(endpoint) + + if "error" in result: + logger.warning(f"Failed to list folder contents: {result['error']}") + return None + + items = result.get("value", []) + processed_items = [] + + for item in items: + # Determine if it's a folder or file + is_folder = 'folder' in item + + item_info = { + "id": item.get("id"), + "name": item.get("name"), + "type": "folder" if is_folder else "file", + "size": item.get("size", 0), + "createdDateTime": item.get("createdDateTime"), + "lastModifiedDateTime": item.get("lastModifiedDateTime"), + "webUrl": item.get("webUrl") + } + + if "file" in item: + item_info["mimeType"] = item["file"].get("mimeType") + item_info["downloadUrl"] = item.get("@microsoft.graph.downloadUrl") + + if "folder" in item: + item_info["childCount"] = item["folder"].get("childCount", 0) + + processed_items.append(item_info) + + return processed_items + + except Exception as e: + logger.error(f"Error listing folder contents: {str(e)}") + return [] + + async def search_files(self, site_id: str, query: str) -> List[Dict[str, Any]]: + """Search for files in a site.""" + try: + search_query = query.replace("'", "''") # Escape single quotes for OData + endpoint = f"sites/{site_id}/drive/root/search(q='{search_query}')" + + result = await self._make_graph_api_call(endpoint) + + if "error" in result: + logger.warning(f"Search failed: {result['error']}") + return [] + + items = result.get("value", []) + processed_items = [] + + for item in items: + is_folder = 'folder' in item + + item_info = { + "id": item.get("id"), + "name": item.get("name"), + "type": "folder" if is_folder else "file", + "size": item.get("size", 0), + "createdDateTime": item.get("createdDateTime"), + "lastModifiedDateTime": item.get("lastModifiedDateTime"), + "webUrl": item.get("webUrl"), + "parentPath": item.get("parentReference", {}).get("path", "") + } + + if "file" in item: + item_info["mimeType"] = item["file"].get("mimeType") + item_info["downloadUrl"] = item.get("@microsoft.graph.downloadUrl") + + processed_items.append(item_info) + + return processed_items + + except Exception as e: + logger.error(f"Error searching files: {str(e)}") + return [] + + async def copy_file_async(self, site_id: str, source_folder: str, source_file: str, dest_folder: str, dest_file: str) -> None: + """Copy a file from source to destination folder (like original synchronizer).""" + try: + # First, download the source file + source_path = f"{source_folder}/{source_file}" + file_content = await self.download_file_by_path(site_id=site_id, file_path=source_path) + + if not file_content: + raise Exception(f"Failed to download source file: {source_path}") + + # Upload to destination + await self.upload_file( + site_id=site_id, + folder_path=dest_folder, + file_name=dest_file, + content=file_content ) + + logger.info(f"File copied: {source_file} -> {dest_file}") + + except Exception as e: + logger.error(f"Error copying file: {str(e)}") + raise + + async def download_file_by_path(self, site_id: str, file_path: str) -> Optional[bytes]: + """Download a file by its path within a site.""" + try: + # Clean the path + clean_path = file_path.strip('/') + endpoint = f"sites/{site_id}/drive/root:/{clean_path}:/content" + + # Use direct HTTP call for file downloads (binary content) + headers = { + "Authorization": f"Bearer {self.access_token}", + } + + # Remove leading slash from endpoint to avoid double slash + clean_endpoint = endpoint.lstrip('/') + url = f"{self.base_url}/{clean_endpoint}" + logger.debug(f"Downloading file: GET {url}") + + timeout = aiohttp.ClientTimeout(total=30) + + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.get(url, headers=headers) as response: + if response.status == 200: + return await response.read() + else: + error_text = await response.text() + logger.error(f"File download failed: {response.status} - {error_text}") + return None + + except Exception as e: + logger.error(f"Error downloading file by path: {str(e)}") + return None - def read_file(self, *, folder_path: str, file_name: str) -> bytes: - """Read a file from SharePoint and return its content as bytes. - - Params: - folder_path: Folder path (server-relative) - file_name: File name - - Returns: - bytes: File content as bytes - """ - file_path = f"{folder_path.rstrip('/')}/{file_name}" - response = File.open_binary(self.ctx, file_path) - return response.content - - async def read_file_async(self, *, folder_path: str, file_name: str) -> bytes: - """Read a file from SharePoint and return its content as bytes (async version). - - Params: - folder_path: Folder path (server-relative) - file_name: File name - - Returns: - bytes: File content as bytes - """ - loop = asyncio.get_event_loop() - with ThreadPoolExecutor() as executor: - return await loop.run_in_executor( - executor, - lambda: self.read_file(folder_path=folder_path, file_name=file_name), - ) - - def overwrite_file( - self, *, folder_path: str, file_name: str, content: bytes - ) -> bool: - """Write content to a SharePoint file, overwriting if it exists. - - Params: - folder_path: Target folder path (server-relative) - file_name: Target file name - content: File content as bytes - - Returns: - bool: True if successful, False otherwise - """ - target_folder = self.ctx.web.get_folder_by_server_relative_url(folder_path) - target_folder.upload_file(file_name, content).execute_query() - return True - - async def overwrite_file_async( - self, *, folder_path: str, file_name: str, content: bytes - ) -> bool: - """Write content to a SharePoint file, overwriting if it exists (async version). - - Params: - folder_path: Target folder path (server-relative) - file_name: Target file name - content: File content as bytes - - Returns: - bool: True if successful, False otherwise - """ - loop = asyncio.get_event_loop() - with ThreadPoolExecutor() as executor: - return await loop.run_in_executor( - executor, - lambda: self.overwrite_file( - folder_path=folder_path, - file_name=file_name, - content=content, - ), - ) diff --git a/modules/interfaces/interfaceTicketObjects.py b/modules/interfaces/interfaceTicketObjects.py index 8d46e20f..991c9da0 100644 --- a/modules/interfaces/interfaceTicketObjects.py +++ b/modules/interfaces/interfaceTicketObjects.py @@ -6,8 +6,7 @@ from modules.shared.timezoneUtils import get_utc_now from modules.connectors.connectorSharepoint import ConnectorSharepoint -from modules.interfaces.interfaceTicketModel import TicketBase -from modules.interfaces.interfaceTicketModel import Task +from modules.interfaces.interfaceTicketModel import TicketBase, Task @dataclass(slots=True) @@ -19,6 +18,7 @@ class TicketSharepointSyncInterface: sync_file: str backup_folder: str audit_folder: str + site_id: str # Keep for compatibility but not used with REST API @classmethod async def create( @@ -30,6 +30,7 @@ class TicketSharepointSyncInterface: sync_file: str, backup_folder: str, audit_folder: str, + site_id: str, ) -> "TicketSharepointSyncInterface": return cls( connector_ticket=connector_ticket, @@ -39,6 +40,7 @@ class TicketSharepointSyncInterface: sync_file=sync_file, backup_folder=backup_folder, audit_folder=audit_folder, + site_id=site_id, ) async def create_backup(self): @@ -47,6 +49,7 @@ class TicketSharepointSyncInterface: backup_filename = f"backup_{timestamp}_{self.sync_file}" await self.connector_sharepoint.copy_file_async( + site_id=self.site_id, source_folder=self.sync_folder, source_file=self.sync_file, dest_folder=self.backup_folder, @@ -83,8 +86,10 @@ class TicketSharepointSyncInterface: try: timestamp = get_utc_now().strftime("%Y%m%d_%H%M%S") jira_export_filename = f"jira_export_{timestamp}.csv" - jira_export_content = self._create_csv_content(jira_data) - await self.connector_sharepoint.overwrite_file_async( + # Use default headers for JIRA export + jira_export_content = self._create_csv_content(jira_data, {"header1": "JIRA Export", "header2": "Raw Data"}) + await self.connector_sharepoint.upload_file( + site_id=self.site_id, folder_path=self.audit_folder, file_name=jira_export_filename, content=jira_export_content, @@ -111,18 +116,35 @@ class TicketSharepointSyncInterface: audit_log.append("Step 5: Reading existing CSV file...") existing_data = [] existing_file_found = False + existing_headers = {"header1": "", "header2": ""} try: - csv_content = await self.connector_sharepoint.read_file_async( - folder_path=self.sync_folder, file_name=self.sync_file + file_path = f"{self.sync_folder}/{self.sync_file}" + csv_content = await self.connector_sharepoint.download_file_by_path( + site_id=self.site_id, file_path=file_path ) + + # Read the first two lines to get headers + csv_lines = csv_content.decode('utf-8').split('\n') + if len(csv_lines) >= 2: + # Store the raw first two lines as headers (preserving original formatting) + existing_headers["header1"] = csv_lines[0].rstrip('\r\n') + existing_headers["header2"] = csv_lines[1].rstrip('\r\n') + + # Try to read with robust CSV parsing (skip first 2 rows) df_existing = pd.read_csv( - BytesIO(csv_content), skiprows=2 - ) # Skip header rows + BytesIO(csv_content), + skiprows=2, + quoting=1, # QUOTE_ALL + escapechar='\\', + on_bad_lines='skip', # Skip malformed lines + engine='python' # More robust parsing + ) existing_data = df_existing.to_dict("records") existing_file_found = True audit_log.append( f"Existing CSV file found with {len(existing_data)} records" ) + audit_log.append(f"Preserved headers: Header1='{existing_headers['header1']}', Header2='{existing_headers['header2']}'") except Exception as e: audit_log.append(f"No existing CSV file found or read error: {str(e)}") audit_log.append("") @@ -149,8 +171,9 @@ class TicketSharepointSyncInterface: # 7. Create CSV with 4-row structure and write to SharePoint audit_log.append("Step 7: Writing updated CSV to SharePoint...") - csv_content = self._create_csv_content(merged_data) - await self.connector_sharepoint.overwrite_file_async( + csv_content = self._create_csv_content(merged_data, existing_headers) + await self.connector_sharepoint.upload_file( + site_id=self.site_id, folder_path=self.sync_folder, file_name=self.sync_file, content=csv_content, @@ -196,10 +219,19 @@ class TicketSharepointSyncInterface: # 1. Read CSV file from SharePoint audit_log.append("Step 1: Reading CSV file from SharePoint...") try: - csv_content = await self.connector_sharepoint.read_file_async( - folder_path=self.sync_folder, file_name=self.sync_file + file_path = f"{self.sync_folder}/{self.sync_file}" + csv_content = await self.connector_sharepoint.download_file_by_path( + site_id=self.site_id, file_path=file_path + ) + # Try to read with robust CSV parsing + df = pd.read_csv( + BytesIO(csv_content), + skiprows=2, + quoting=1, # QUOTE_ALL + escapechar='\\', + on_bad_lines='skip', # Skip malformed lines + engine='python' # More robust parsing ) - df = pd.read_csv(BytesIO(csv_content), skiprows=2) # Skip header rows csv_data = df.to_dict("records") audit_log.append( f"CSV file read successfully with {len(csv_data)} records" @@ -495,34 +527,71 @@ class TicketSharepointSyncInterface: # Convert audit log to bytes audit_content = "\n".join(audit_log).encode("utf-8") + # Debug logging + import logging + logger = logging.getLogger(__name__) + logger.debug(f"Writing audit log to folder: {self.audit_folder}, file: {audit_filename}") + # Write to SharePoint - await self.connector_sharepoint.overwrite_file_async( + await self.connector_sharepoint.upload_file( + site_id=self.site_id, folder_path=self.audit_folder, file_name=audit_filename, content=audit_content, ) + logger.debug("Audit log written successfully") except Exception as e: # If audit logging fails, we don't want to break the main sync process # Just log the error (this could be enhanced with fallback logging) import logging logger = logging.getLogger(__name__) logger.warning(f"Failed to write audit log: {str(e)}") + logger.warning(f"Audit folder: {self.audit_folder}") + logger.warning(f"Operation type: {operation_type}") + import traceback + logger.warning(f"Traceback: {traceback.format_exc()}") - def _create_csv_content(self, data: list[dict]) -> bytes: + def _create_csv_content(self, data: list[dict], existing_headers: dict = None) -> bytes: """Create CSV content with 4-row structure matching reference code.""" + # Get current timestamp for header + timestamp = get_utc_now().strftime("%Y-%m-%d %H:%M:%S UTC") + + # Use existing headers if provided, otherwise use defaults + if existing_headers is None: + existing_headers = {"header1": "Header 1", "header2": "Header 2"} + if not data: # Build an empty table with the expected columns from schema cols = list(self.task_sync_definition.keys()) df = pd.DataFrame(columns=cols) - # Row 1 & 2: keep your current banner lines - header_row1 = pd.DataFrame( - [["Header 1"] + [""] * (len(cols) - 1)], columns=cols - ) - header_row2 = pd.DataFrame( - [["Header 2"] + [""] * (len(cols) - 1)], columns=cols - ) + # Parse existing headers to extract individual columns + import csv as csv_module + header1_text = existing_headers.get("header1", "Header 1") + header2_text = existing_headers.get("header2", "Header 2") + + # Parse the existing header rows + header1_reader = csv_module.reader([header1_text]) + header2_reader = csv_module.reader([header2_text]) + header1_row = next(header1_reader, []) + header2_row = next(header2_reader, []) + + # Row 1: Use existing header1 or default + if len(header1_row) >= len(cols): + header_row1_data = header1_row[:len(cols)] + else: + header_row1_data = header1_row + [""] * (len(cols) - len(header1_row)) + header_row1 = pd.DataFrame([header_row1_data], columns=cols) + + # Row 2: Use existing header2 and add timestamp to second column + if len(header2_row) >= len(cols): + header_row2_data = header2_row[:len(cols)] + else: + header_row2_data = header2_row + [""] * (len(cols) - len(header2_row)) + if len(header_row2_data) > 1: + header_row2_data[1] = timestamp + header_row2 = pd.DataFrame([header_row2_data], columns=cols) # Row 3: table headers table_headers = pd.DataFrame([cols], columns=cols) @@ -531,7 +600,7 @@ class TicketSharepointSyncInterface: [header_row1, header_row2, table_headers, df], ignore_index=True ) csv_text = StringIO() - final_df.to_csv(csv_text, index=False, header=False) + final_df.to_csv(csv_text, index=False, header=False, quoting=1, escapechar='\\') return csv_text.getvalue().encode("utf-8") # Create DataFrame from data @@ -542,16 +611,38 @@ class TicketSharepointSyncInterface: df[column] = df[column].astype("object") df[column] = df[column].fillna("") - # Create the 4-row structure - # Row 1: Static header row 1 - header_row1 = pd.DataFrame( - [["Header 1"] + [""] * (len(df.columns) - 1)], columns=df.columns - ) + # Clean data: replace actual line breaks with \n and escape quotes + for column in df.columns: + df[column] = df[column].astype(str).str.replace('\n', '\\n', regex=False) + df[column] = df[column].str.replace('"', '""', regex=False) - # Row 2: Static header row 2 with strict compatibility - header_row2 = pd.DataFrame( - [["Header 2"] + [""] * (len(df.columns) - 1)], columns=df.columns - ) + # Create the 4-row structure + # Parse existing headers to extract individual columns + import csv as csv_module + header1_text = existing_headers.get("header1", "Header 1") + header2_text = existing_headers.get("header2", "Header 2") + + # Parse the existing header rows + header1_reader = csv_module.reader([header1_text]) + header2_reader = csv_module.reader([header2_text]) + header1_row = next(header1_reader, []) + header2_row = next(header2_reader, []) + + # Row 1: Use existing header1 or default + if len(header1_row) >= len(df.columns): + header_row1_data = header1_row[:len(df.columns)] + else: + header_row1_data = header1_row + [""] * (len(df.columns) - len(header1_row)) + header_row1 = pd.DataFrame([header_row1_data], columns=df.columns) + + # Row 2: Use existing header2 and add timestamp to second column + if len(header2_row) >= len(df.columns): + header_row2_data = header2_row[:len(df.columns)] + else: + header_row2_data = header2_row + [""] * (len(df.columns) - len(header2_row)) + if len(header_row2_data) > 1: + header_row2_data[1] = timestamp + header_row2 = pd.DataFrame([header_row2_data], columns=df.columns) # Row 3: Table headers (column names) table_headers = pd.DataFrame([df.columns.tolist()], columns=df.columns) @@ -561,7 +652,7 @@ class TicketSharepointSyncInterface: [header_row1, header_row2, table_headers, df], ignore_index=True ) - # Convert to CSV bytes (write text, then encode) + # Convert to CSV bytes with proper quoting for fields containing special characters csv_text = StringIO() - final_df.to_csv(csv_text, index=False, header=False) + final_df.to_csv(csv_text, index=False, header=False, quoting=1, escapechar='\\') return csv_text.getvalue().encode("utf-8") diff --git a/modules/routes/routeJira.py b/modules/routes/routeJira.py deleted file mode 100644 index 3e4038aa..00000000 --- a/modules/routes/routeJira.py +++ /dev/null @@ -1,141 +0,0 @@ -# Configure logger -import logging -from fastapi import APIRouter, FastAPI -from contextlib import asynccontextmanager -from zoneinfo import ZoneInfo - - -from modules.connectors.connectorTicketJira import ConnectorTicketJira -from modules.connectors.connectorSharepoint import ConnectorSharepoint -from modules.interfaces.interfaceTicketObjects import TicketSharepointSyncInterface - -from apscheduler.schedulers.asyncio import AsyncIOScheduler -from apscheduler.triggers.cron import CronTrigger - - -logger = logging.getLogger(__name__) - - -scheduler = AsyncIOScheduler(timezone=ZoneInfo("Europe/Zurich")) - - -@asynccontextmanager -async def router_lifespan(app: FastAPI): - # start scheduler when this router is mounted - scheduler.add_job( - perform_sync_jira_delta_group, - CronTrigger(minute="0"), # run at the top of every hour - id="jira_delta_group_sync", - replace_existing=True, - coalesce=True, - max_instances=1, - misfire_grace_time=1800, - ) - scheduler.start() - logger.info("APScheduler started (jira_delta_group_sync hourly)") - try: - yield - finally: - if scheduler.running: - scheduler.shutdown(wait=False) - logger.info("APScheduler stopped") - - -router = APIRouter( - prefix="/api/jira", - tags=["JIRA Sync"], - lifespan=router_lifespan, -) - - -@router.post("/sync/delta-group") -async def sync_jira_delta_group(): - """Endpoint to trigger JIRA-SharePoint sync for Delta Group project.""" - - logger.info("Received request to sync JIRA Delta Group project") - await perform_sync_jira_delta_group() - - # Return a response - return {"status": "Sync completed"} - - -async def perform_sync_jira_delta_group(): - logger.info("Syncing Jira issues for Delta Group...") - - # Sharepoint connection parameters - sharepoint_client_id = None - sharepoint_client_secret = None - sharepoint_site_url = None - - # Jira connection parameters - jira_username = "ONHOLD - TASK - p.motsch@valueon.ch" - jira_api_token = "ATATT3xFfGF0d973nNb3R1wTDI4lesmJfJAmooS-4cYMJTyLfwYv4himrE6yyCxyX3aSMfl34NHcm2fAXeFXrLHUzJx0RQVUBonCFnlgexjLQTgS5BoCbSO7dwAVjlcHZZkArHbooCUaRwJ15n6AHkm-nwdjLQ3Z74TFnKKUZC4uhuh3Aj-MuX8=2D7124FA" - jira_url = "https://deltasecurity.atlassian.net" - project_code = "DCS" - issue_type = "Task" - - # Basic validation (credentials will be added later) - if not all([sharepoint_client_id, sharepoint_client_secret, sharepoint_site_url]): - raise ValueError("SharePoint credentials not configured") - - if not all([jira_username, jira_api_token]): - raise ValueError("JIRA credentials not configured") - - # Define the task sync definition - task_sync_definition = { - # key=excel-header, [get:jira>excel | put: excel>jira, jira-xml-field-list] - "ID": ["get", ["key"]], - "Module Category": ["get", ["fields", "customfield_10058", "value"]], - "Summary": ["get", ["fields", "summary"]], - "Description": ["get", ["fields", "description"]], - "References": ["get", ["fields", "customfield_10066"]], - "Priority": ["get", ["fields", "priority", "name"]], - "Issue Status": ["get", ["fields", "customfield_10062"]], - "Assignee": ["get", ["fields", "assignee", "displayName"]], - "Issue Created": ["get", ["fields", "created"]], - "Due Date": ["get", ["fields", "duedate"]], - "DELTA Comments": ["get", ["fields", "customfield_10060"]], - "SELISE Ticket References": ["put", ["fields", "customfield_10067"]], - "SELISE Status Values": ["put", ["fields", "customfield_10065"]], - "SELISE Comments": ["put", ["fields", "customfield_10064"]], - } - - # SharePoint file configuration - sync_folder = "/sites//Shared Documents/TicketSync" - sync_file = "delta_group_selise_ticket_exchange_list.csv" - backup_folder = "/sites//Shared Documents/TicketSync/Backups" - audit_folder = "/sites//Shared Documents/TicketSync/AuditLogs" - - # Create the jira connector instance - jira_connector = await ConnectorTicketJira.create( - jira_username=jira_username, - jira_api_token=jira_api_token, - jira_url=jira_url, - project_code=project_code, - issue_type=issue_type, - ) - - # Create the sharepoint connector instance - ctx = ConnectorSharepoint.get_client_context_from_app( - site_url=sharepoint_site_url, - client_id=sharepoint_client_id, - client_secret=sharepoint_client_secret, - ) - sharepoint_connector = await ConnectorSharepoint.create(ctx=ctx) - - # Create the sync interface instance - sync_interface = await TicketSharepointSyncInterface.create( - connector_ticket=jira_connector, - connector_sharepoint=sharepoint_connector, - task_sync_definition=task_sync_definition, - sync_folder=sync_folder, - sync_file=sync_file, - backup_folder=backup_folder, - audit_folder=audit_folder, - ) - - # Sync from JIRA to CSV in Sharepoint - await sync_interface.sync_from_jira_to_csv() - - # Sync from CSV in Sharepoint to JIRA - await sync_interface.sync_from_csv_to_jira() diff --git a/modules/workflow/managerSyncDelta.py b/modules/workflow/managerSyncDelta.py new file mode 100644 index 00000000..e6a0ce56 --- /dev/null +++ b/modules/workflow/managerSyncDelta.py @@ -0,0 +1,231 @@ +""" +Delta Group JIRA-SharePoint Sync Manager + +This module handles the synchronization of JIRA tickets to SharePoint using the new +Graph API-based connector architecture. +""" + +import logging +import csv +import io +from datetime import datetime, UTC +from typing import Dict, Any, List, Optional +from modules.connectors.connectorSharepoint import ConnectorSharepoint +from modules.connectors.connectorTicketJira import ConnectorTicketJira +from modules.interfaces.interfaceAppObjects import getRootInterface +from modules.interfaces.interfaceAppModel import UserInDB +from modules.interfaces.interfaceTicketObjects import TicketSharepointSyncInterface +from modules.shared.timezoneUtils import get_utc_timestamp +from modules.shared.configuration import APP_CONFIG + +logger = logging.getLogger(__name__) + +# Get environment type from configuration +APP_ENV_TYPE = APP_CONFIG.get("APP_ENV_TYPE", "dev") + + +class ManagerSyncDelta: + """Manages JIRA to SharePoint synchronization for Delta Group.""" + #SHAREPOINT_SITE_ID = "02830618-4029-4dc8-8d3d-f5168f282249" + #SHAREPOINT_SITE_NAME = "SteeringBPM" + #SHAREPOINT_MAIN_FOLDER = "/sites/SteeringBPM/Freigegebene Dokumente/General/50 Docs hosted by SELISE" + #SHAREPOINT_BACKUP_FOLDER = "/sites/SteeringBPM/Freigegebene Dokumente/General/50 Docs hosted by SELISE/SyncHistory" + #SHAREPOINT_AUDIT_FOLDER = "/sites/SteeringBPM/Freigegebene Dokumente/General/50 Docs hosted by SELISE/SyncHistory" + + # SharePoint site constants using hostname + site path (resolve real site ID at runtime) + SHAREPOINT_HOSTNAME = "pcuster.sharepoint.com" + SHAREPOINT_SITE_PATH = "KM.DELTAG.20968511411" + SHAREPOINT_SITE_NAME = "KM.DELTAG.20968511411" + # Drive-relative (document library) paths, not server-relative "/sites/..." + # Note: Default library name is "Shared Documents" in Graph + SHAREPOINT_MAIN_FOLDER = "1_Arbeitsbereich" + SHAREPOINT_BACKUP_FOLDER = "1_Arbeitsbereich/SyncHistory" + SHAREPOINT_AUDIT_FOLDER = "1_Arbeitsbereich/SyncHistory" + + # Fixed filename for the main CSV file (like original synchronizer) + SYNC_FILE_NAME = "DELTAgroup x SELISE Ticket Exchange List.csv" + + # JIRA connection parameters (hardcoded for Delta Group) + JIRA_USERNAME = "p.motsch@valueon.ch" + JIRA_API_TOKEN = "ATATT3xFfGF0d973nNb3R1wTDI4lesmJfJAmooS-4cYMJTyLfwYv4himrE6yyCxyX3aSMfl34NHcm2fAXeFXrLHUzJx0RQVUBonCFnlgexjLQTgS5BoCbSO7dwAVjlcHZZkArHbooCUaRwJ15n6AHkm-nwdjLQ3Z74TFnKKUZC4uhuh3Aj-MuX8=2D7124FA" + JIRA_URL = "https://deltasecurity.atlassian.net" + JIRA_PROJECT_CODE = "DCS" + JIRA_ISSUE_TYPE = "Task" + + # Task sync definition for field mapping (like original synchronizer) + TASK_SYNC_DEFINITION = { + "ID": ["get", ["key"]], + "Summary": ["get", ["fields", "summary"]], + "Status": ["get", ["fields", "status", "name"]], + "Assignee": ["get", ["fields", "assignee", "displayName"]], + "Reporter": ["get", ["fields", "reporter", "displayName"]], + "Created": ["get", ["fields", "created"]], + "Updated": ["get", ["fields", "updated"]], + "Priority": ["get", ["fields", "priority", "name"]], + "IssueType": ["get", ["fields", "issuetype", "name"]], + "Project": ["get", ["fields", "project", "name"]], + "Description": ["get", ["fields", "description"]], + } + + def __init__(self): + """Initialize the sync manager with hardcoded Delta Group credentials.""" + self.root_interface = getRootInterface() + self.jira_connector = None + self.sharepoint_connector = None + self.target_site = None + + async def initialize_connectors(self) -> bool: + """Initialize JIRA and SharePoint connectors.""" + try: + logger.info("Initializing JIRA connector with hardcoded credentials") + + # Initialize JIRA connector using class constants + self.jira_connector = await ConnectorTicketJira.create( + jira_username=self.JIRA_USERNAME, + jira_api_token=self.JIRA_API_TOKEN, + jira_url=self.JIRA_URL, + project_code=self.JIRA_PROJECT_CODE, + issue_type=self.JIRA_ISSUE_TYPE + ) + + # Use the current logged-in user from root interface + activeUser = self.root_interface.currentUser + if not activeUser: + logger.error("No current user available - SharePoint connection required") + return False + + logger.info(f"Using current user for SharePoint: {activeUser.id}") + + # Get SharePoint connection for this user + user_connections = self.root_interface.getUserConnections(activeUser.id) + sharepoint_connection = None + + for connection in user_connections: + if connection.authority == "msft": + sharepoint_connection = connection + break + + if not sharepoint_connection: + logger.error("No SharePoint connection found for Delta Group user") + return False + + logger.info(f"Found SharePoint connection: {sharepoint_connection.id}") + + # Get SharePoint token for this connection + sharepoint_token = self.root_interface.getConnectionToken(sharepoint_connection.id) + if not sharepoint_token: + logger.error("No SharePoint token found for Delta Group user connection") + return False + + logger.info(f"Found SharePoint token: {sharepoint_token.id}") + + # Initialize SharePoint connector with Graph API + self.sharepoint_connector = ConnectorSharepoint(access_token=sharepoint_token.tokenAccess) + + # Resolve the site by hostname + site path to get the real site ID + logger.info( + f"Resolving site ID via hostname+path: {self.SHAREPOINT_HOSTNAME}:/sites/{self.SHAREPOINT_SITE_PATH}" + ) + resolved = await self.sharepoint_connector.find_site_by_url( + hostname=self.SHAREPOINT_HOSTNAME, + site_path=self.SHAREPOINT_SITE_PATH + ) + + if not resolved: + logger.error( + f"Failed to resolve site. Hostname: {self.SHAREPOINT_HOSTNAME}, Path: {self.SHAREPOINT_SITE_PATH}" + ) + return False + + self.target_site = { + "id": resolved.get("id"), + "displayName": resolved.get("displayName", self.SHAREPOINT_SITE_NAME), + "name": resolved.get("name", self.SHAREPOINT_SITE_NAME) + } + + # Test site access by listing root of the drive + logger.info("Testing site access using resolved site ID...") + test_result = await self.sharepoint_connector.list_folder_contents( + site_id=self.target_site["id"], + folder_path="" + ) + + if test_result is not None: + logger.info( + f"Site access confirmed: {self.target_site['displayName']} (ID: {self.target_site['id']})" + ) + else: + logger.error("Could not access site drive - check permissions") + return False + + return True + + except Exception as e: + logger.error(f"Error initializing connectors: {str(e)}") + return False + + async def sync_jira_to_sharepoint(self) -> bool: + """Perform the main JIRA to SharePoint synchronization using sophisticated sync logic.""" + try: + logger.info("Starting JIRA to SharePoint synchronization") + + # Initialize connectors + if not await self.initialize_connectors(): + logger.error("Failed to initialize connectors") + return False + + # Create the sophisticated sync interface + sync_interface = await TicketSharepointSyncInterface.create( + connector_ticket=self.jira_connector, + connector_sharepoint=self.sharepoint_connector, + task_sync_definition=self.TASK_SYNC_DEFINITION, + sync_folder=self.SHAREPOINT_MAIN_FOLDER, + sync_file=self.SYNC_FILE_NAME, + backup_folder=self.SHAREPOINT_BACKUP_FOLDER, + audit_folder=self.SHAREPOINT_AUDIT_FOLDER, + site_id=self.target_site['id'] + ) + + # Perform the sophisticated sync + logger.info("Performing sophisticated JIRA to CSV sync...") + await sync_interface.sync_from_jira_to_csv() + + logger.info("JIRA to SharePoint synchronization completed successfully") + return True + + except Exception as e: + logger.error(f"Error during JIRA to SharePoint synchronization: {str(e)}") + return False + + + +# Global sync function for use in app.py +async def perform_sync_jira_delta_group() -> bool: + """Perform JIRA to SharePoint synchronization for Delta Group. + + This function is called by the scheduler and can be used independently. + + Returns: + bool: True if synchronization was successful, False otherwise + """ + try: + if APP_ENV_TYPE != "prod": + logger.info("JIRA to SharePoint synchronization: TASK to run only in PROD") + return True + + logger.info("Starting Delta Group JIRA sync...") + + + sync_manager = ManagerSyncDelta() + success = await sync_manager.sync_jira_to_sharepoint() + + if success: + logger.info("Delta Group JIRA sync completed successfully") + else: + logger.error("Delta Group JIRA sync failed") + + return success + + except Exception as e: + logger.error(f"Error in perform_sync_jira_delta_group: {str(e)}") + return False