ready mvp3

ValueOn AG 2025-09-09 13:26:12 +02:00
parent 3525b72c2c
commit b7a7ebedcb
5 changed files with 827 additions and 350 deletions

app.py (39 changes)

@@ -4,6 +4,7 @@ os.environ["NUMEXPR_MAX_THREADS"] = "12"
from fastapi import FastAPI, HTTPException, Depends, Body, status, Response
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from zoneinfo import ZoneInfo
import logging
from logging.handlers import RotatingFileHandler
@@ -11,6 +12,8 @@ from datetime import timedelta
import pathlib
from modules.shared.configuration import APP_CONFIG
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
def initLogging():
"""Initialize logging with configuration from APP_CONFIG"""
@@ -147,10 +150,43 @@ async def lifespan(app: FastAPI):
from modules.interfaces.interfaceAppObjects import getRootInterface
getRootInterface()
# Setup APScheduler for JIRA sync
scheduler = AsyncIOScheduler(timezone=ZoneInfo("Europe/Zurich"))
try:
from modules.workflow.managerSyncDelta import perform_sync_jira_delta_group
# Schedule hourly sync at minute 0
scheduler.add_job(
perform_sync_jira_delta_group,
CronTrigger(minute="0"),
id="jira_delta_group_sync",
replace_existing=True,
coalesce=True,
max_instances=1,
misfire_grace_time=1800,
)
scheduler.start()
logger.info("APScheduler started (jira_delta_group_sync hourly)")
# Run initial sync on startup (failures are logged but do not block startup)
try:
logger.info("Running initial JIRA sync on app startup...")
await perform_sync_jira_delta_group()
logger.info("Initial JIRA sync completed successfully")
except Exception as e:
logger.error(f"Initial JIRA sync failed: {str(e)}")
except Exception as e:
logger.error(f"Failed to initialize scheduler or JIRA sync: {str(e)}")
yield
# Shutdown logic
logger.info("Application has been shut down")
try:
if 'scheduler' in locals() and scheduler.running:
scheduler.shutdown(wait=False)
logger.info("APScheduler stopped")
except Exception as e:
logger.error(f"Error shutting down scheduler: {str(e)}")
# START APP
app = FastAPI(
@@ -212,6 +248,3 @@ app.include_router(msftRouter)
from modules.routes.routeSecurityGoogle import router as googleRouter
app.include_router(googleRouter)
from modules.routes.routeJira import router as jiraRouter
app.include_router(jiraRouter)

modules/connectors/connectorSharepoint.py

@@ -1,180 +1,443 @@
"""Connector for CRUD sharepoint operations."""
"""Connector for SharePoint operations using Microsoft Graph API."""
import logging
import json
import aiohttp
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import datetime
from io import BytesIO
from typing import Optional
from office365.sharepoint.client_context import ClientContext
from office365.sharepoint.files.file import File
from typing import Dict, Any, List, Optional
from datetime import datetime, UTC
logger = logging.getLogger(__name__)
@dataclass
class ConnectorSharepoint:
ctx: ClientContext
@classmethod
async def create(cls, ctx: ClientContext) -> "ConnectorSharepoint":
"""Creates an instance of the Sharepoint connector.
Params:
ctx: The ClientContext instance.
Returns:
ConnectorSharepoint: An instance of the Sharepoint connector.
"""SharePoint connector using Microsoft Graph API for reliable authentication."""
def __init__(self, access_token: str):
"""Initialize with access token.
Args:
access_token: Microsoft Graph access token
"""
return cls(ctx=ctx)
@classmethod
def get_client_context_from_username_password(
cls, site_url: str, username: str, password: str
) -> ClientContext:
"""Creates a ClientContext instance from username and password.
Params:
site_url: The URL of the SharePoint site.
username: The username for authentication.
password: The password for authentication.
Returns:
ClientContext: An instance of the ClientContext.
"""
return ClientContext(site_url).with_user_credentials(username, password)
@classmethod
def get_client_context_from_app(
cls, site_url: str, client_id: str, client_secret: str
) -> ClientContext:
"""Creates a ClientContext instance from client ID and client secret.
Params:
site_url: The URL of the SharePoint site.
client_id: The client ID for authentication.
client_secret: The client secret for authentication.
Returns:
ClientContext: An instance of the ClientContext.
"""
return ClientContext(site_url).with_client_credentials(
client_id=client_id, client_secret=client_secret
)
def copy_file(
self, *, source_folder: str, source_file: str, dest_folder: str, dest_file: str
) -> bool:
"""Copy a file from one SharePoint location to another.
Params:
source_folder: Source folder path (server-relative)
source_file: Source file name
dest_folder: Destination folder path (server-relative)
dest_file: Destination file name
Returns:
bool: True if successful, False otherwise
"""
source_path = f"{source_folder.rstrip('/')}/{source_file}"
dest_path = f"{dest_folder.rstrip('/')}/{dest_file}"
source_file_obj = self.ctx.web.get_file_by_server_relative_url(source_path)
source_file_obj.copyto(dest_path).execute_query()
return True
async def copy_file_async(
self, *, source_folder: str, source_file: str, dest_folder: str, dest_file: str
) -> bool:
"""Copy a file from one SharePoint location to another (async version).
Params:
source_folder: Source folder path (server-relative)
source_file: Source file name
dest_folder: Destination folder path (server-relative)
dest_file: Destination file name
Returns:
bool: True if successful, False otherwise
"""
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
return await loop.run_in_executor(
executor,
lambda: self.copy_file(
source_folder=source_folder,
source_file=source_file,
dest_folder=dest_folder,
dest_file=dest_file,
),
self.access_token = access_token
self.base_url = "https://graph.microsoft.com/v1.0"
async def _make_graph_api_call(self, endpoint: str, method: str = "GET", data: Optional[bytes] = None) -> Dict[str, Any]:
"""Make a Microsoft Graph API call with proper error handling."""
try:
# Raw bytes on a PUT are file content; every other call sends and receives JSON
content_type = "application/octet-stream" if data and method == "PUT" else "application/json"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": content_type
}
# Remove leading slash from endpoint to avoid double slash
clean_endpoint = endpoint.lstrip('/')
url = f"{self.base_url}/{clean_endpoint}"
logger.debug(f"Making Graph API call: {method} {url}")
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
# One code path for GET/PUT/POST; Graph returns 200 or 201 on success
async with session.request(method, url, headers=headers, data=data) as response:
if response.status in (200, 201):
return await response.json()
error_text = await response.text()
logger.error(f"Graph API call failed: {response.status} - {error_text}")
return {"error": f"API call failed: {response.status} - {error_text}"}
except asyncio.TimeoutError:
logger.error(f"Graph API call timed out after 30 seconds: {endpoint}")
return {"error": f"API call timed out after 30 seconds: {endpoint}"}
except Exception as e:
logger.error(f"Error making Graph API call: {str(e)}")
return {"error": f"Error making Graph API call: {str(e)}"}
async def discover_sites(self) -> List[Dict[str, Any]]:
"""Discover all SharePoint sites accessible to the user."""
try:
result = await self._make_graph_api_call("sites?search=*")
if "error" in result:
logger.error(f"Error discovering SharePoint sites: {result['error']}")
return []
sites = result.get("value", [])
logger.info(f"Discovered {len(sites)} SharePoint sites")
processed_sites = []
for site in sites:
site_info = {
"id": site.get("id"),
"displayName": site.get("displayName"),
"name": site.get("name"),
"webUrl": site.get("webUrl"),
"description": site.get("description"),
"createdDateTime": site.get("createdDateTime"),
"lastModifiedDateTime": site.get("lastModifiedDateTime")
}
processed_sites.append(site_info)
logger.debug(f"Site: {site_info['displayName']} - {site_info['webUrl']}")
return processed_sites
except Exception as e:
logger.error(f"Error discovering SharePoint sites: {str(e)}")
return []
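A usage sketch for the connector (the token value is a placeholder; a real Graph token with Sites.Read.All or equivalent consent would come from the app's auth flow):

import asyncio

async def main():
    # hypothetical token; obtain a real one via the app's MSAL-based auth flow
    connector = ConnectorSharepoint(access_token="<graph-access-token>")
    for site in await connector.discover_sites():
        print(site["displayName"], "-", site["webUrl"])

asyncio.run(main())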
async def find_site_by_name(self, site_name: str) -> Optional[Dict[str, Any]]:
"""Find a specific SharePoint site by name using direct Graph API call."""
try:
# Try to get the site directly by name using Graph API
endpoint = f"sites/{site_name}"
result = await self._make_graph_api_call(endpoint)
if result and "error" not in result:
site_info = {
"id": result.get("id"),
"displayName": result.get("displayName"),
"name": result.get("name"),
"webUrl": result.get("webUrl"),
"description": result.get("description"),
"createdDateTime": result.get("createdDateTime"),
"lastModifiedDateTime": result.get("lastModifiedDateTime")
}
logger.info(f"Found site directly: {site_info['displayName']} - {site_info['webUrl']}")
return site_info
except Exception as e:
logger.debug(f"Direct site lookup failed for '{site_name}': {str(e)}")
# Fallback to discovery if direct lookup fails
logger.info(f"Direct lookup failed, trying discovery for site: {site_name}")
sites = await self.discover_sites()
if not sites:
logger.warning("No sites discovered")
return None
logger.info(f"Discovered {len(sites)} SharePoint sites:")
for site in sites:
logger.info(f" - {site.get('displayName', 'Unknown')} (ID: {site.get('id', 'Unknown')})")
# Try exact match first
for site in sites:
if site.get("displayName", "").strip().lower() == site_name.strip().lower():
logger.info(f"Found exact match: {site.get('displayName')}")
return site
# Try partial match
for site in sites:
if site_name.lower() in site.get("displayName", "").lower():
logger.info(f"Found partial match: {site.get('displayName')}")
return site
logger.warning(f"No site found matching: {site_name}")
return None
async def find_site_by_web_url(self, web_url: str) -> Optional[Dict[str, Any]]:
"""Find a SharePoint site using its web URL (useful for guest sites)."""
try:
# Use the web URL format: sites/{hostname}:/sites/{site-path}
# Extract hostname and site path from the web URL
if not web_url.startswith("https://"):
web_url = f"https://{web_url}"
# Parse the URL to extract hostname and site path
from urllib.parse import urlparse
parsed = urlparse(web_url)
hostname = parsed.hostname
path_parts = parsed.path.strip('/').split('/')
if len(path_parts) >= 2 and path_parts[0] == 'sites':
site_path = '/'.join(path_parts[1:]) # Everything after 'sites/'
else:
logger.error(f"Invalid SharePoint URL format: {web_url}")
return None
endpoint = f"sites/{hostname}:/sites/{site_path}"
logger.debug(f"Trying web URL format: {endpoint}")
result = await self._make_graph_api_call(endpoint)
if result and "error" not in result:
site_info = {
"id": result.get("id"),
"displayName": result.get("displayName"),
"name": result.get("name"),
"webUrl": result.get("webUrl"),
"description": result.get("description"),
"createdDateTime": result.get("createdDateTime"),
"lastModifiedDateTime": result.get("lastModifiedDateTime")
}
logger.info(f"Found site by web URL: {site_info['displayName']} - {site_info['webUrl']} (ID: {site_info['id']})")
return site_info
else:
logger.warning(f"Site not found using web URL: {web_url}")
return None
except Exception as e:
logger.error(f"Error finding site by web URL: {str(e)}")
return None
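The endpoint built here follows Graph's path-addressing form sites/{hostname}:/sites/{site-path}. A standalone illustration of the derivation (the URL is hypothetical):

from urllib.parse import urlparse

web_url = "https://contoso.sharepoint.com/sites/TeamSite"  # hypothetical site URL
parsed = urlparse(web_url)
site_path = "/".join(parsed.path.strip("/").split("/")[1:])
print(f"sites/{parsed.hostname}:/sites/{site_path}")
# -> sites/contoso.sharepoint.com:/sites/TeamSite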
async def find_site_by_url(self, hostname: str, site_path: str) -> Optional[Dict[str, Any]]:
"""Find a SharePoint site using the site URL format."""
try:
# For guest sites, try different URL formats
url_formats = [
f"sites/{hostname}:/sites/{site_path}", # Standard format
f"sites/{hostname}:/sites/{site_path}/", # With trailing slash
f"sites/{hostname}:/sites/{site_path.lower()}", # Lowercase
f"sites/{hostname}:/sites/{site_path.lower()}/", # Lowercase with slash
]
for endpoint in url_formats:
logger.debug(f"Trying URL format: {endpoint}")
result = await self._make_graph_api_call(endpoint)
if result and "error" not in result:
site_info = {
"id": result.get("id"),
"displayName": result.get("displayName"),
"name": result.get("name"),
"webUrl": result.get("webUrl"),
"description": result.get("description"),
"createdDateTime": result.get("createdDateTime"),
"lastModifiedDateTime": result.get("lastModifiedDateTime")
}
logger.info(f"Found site by URL: {site_info['displayName']} - {site_info['webUrl']} (ID: {site_info['id']})")
return site_info
else:
logger.debug(f"URL format failed: {endpoint} - {result.get('error', 'Unknown error')}")
logger.warning(f"Site not found using any URL format for: {hostname}:/sites/{site_path}")
return None
except Exception as e:
logger.error(f"Error finding site by URL: {str(e)}")
return None
async def get_folder_by_path(self, site_id: str, folder_path: str) -> Optional[Dict[str, Any]]:
"""Get folder information by path within a site."""
try:
# Clean the path
clean_path = folder_path.lstrip('/')
endpoint = f"sites/{site_id}/drive/root:/{clean_path}"
result = await self._make_graph_api_call(endpoint)
if "error" in result:
logger.warning(f"Folder not found at path {folder_path}: {result['error']}")
return None
return result
except Exception as e:
logger.error(f"Error getting folder by path: {str(e)}")
return None
async def upload_file(self, site_id: str, folder_path: str, file_name: str, content: bytes) -> Dict[str, Any]:
"""Upload a file to SharePoint."""
try:
# Clean the path
clean_path = folder_path.lstrip('/')
upload_path = f"{clean_path.rstrip('/')}/{file_name}"
endpoint = f"sites/{site_id}/drive/root:/{upload_path}:/content"
logger.info(f"Uploading file to: {endpoint}")
result = await self._make_graph_api_call(endpoint, method="PUT", data=content)
if "error" in result:
logger.error(f"Upload failed: {result['error']}")
return result
logger.info(f"File uploaded successfully: {file_name}")
return result
except Exception as e:
logger.error(f"Error uploading file: {str(e)}")
return {"error": f"Error uploading file: {str(e)}"}
async def download_file(self, site_id: str, file_id: str) -> Optional[bytes]:
"""Download a file from SharePoint."""
try:
endpoint = f"sites/{site_id}/drive/items/{file_id}/content"
headers = {"Authorization": f"Bearer {self.access_token}"}
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(f"{self.base_url}/{endpoint}", headers=headers) as response:
if response.status == 200:
return await response.read()
else:
logger.error(f"Download failed: {response.status}")
return None
except Exception as e:
logger.error(f"Error downloading file: {str(e)}")
return None
async def list_folder_contents(self, site_id: str, folder_path: str = "") -> Optional[List[Dict[str, Any]]]:
"""List contents of a folder. Returns None if the folder cannot be read."""
try:
if not folder_path or folder_path == "/":
endpoint = f"sites/{site_id}/drive/root/children"
else:
clean_path = folder_path.lstrip('/')
endpoint = f"sites/{site_id}/drive/root:/{clean_path}:/children"
result = await self._make_graph_api_call(endpoint)
if "error" in result:
logger.warning(f"Failed to list folder contents: {result['error']}")
return None
items = result.get("value", [])
processed_items = []
for item in items:
# Determine if it's a folder or file
is_folder = 'folder' in item
item_info = {
"id": item.get("id"),
"name": item.get("name"),
"type": "folder" if is_folder else "file",
"size": item.get("size", 0),
"createdDateTime": item.get("createdDateTime"),
"lastModifiedDateTime": item.get("lastModifiedDateTime"),
"webUrl": item.get("webUrl")
}
if "file" in item:
item_info["mimeType"] = item["file"].get("mimeType")
item_info["downloadUrl"] = item.get("@microsoft.graph.downloadUrl")
if "folder" in item:
item_info["childCount"] = item["folder"].get("childCount", 0)
processed_items.append(item_info)
return processed_items
except Exception as e:
logger.error(f"Error listing folder contents: {str(e)}")
return None
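A sketch of filtering the listing down to CSV files (site_id and folder are placeholders; None signals the folder could not be read):

# inside an async function:
items = await connector.list_folder_contents(site_id="<site-id>", folder_path="1_Arbeitsbereich")
if items is None:
    raise RuntimeError("folder not accessible")
csv_files = [i for i in items if i["type"] == "file" and i["name"].endswith(".csv")]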
async def search_files(self, site_id: str, query: str) -> List[Dict[str, Any]]:
"""Search for files in a site."""
try:
search_query = query.replace("'", "''") # Escape single quotes for OData
endpoint = f"sites/{site_id}/drive/root/search(q='{search_query}')"
result = await self._make_graph_api_call(endpoint)
if "error" in result:
logger.warning(f"Search failed: {result['error']}")
return []
items = result.get("value", [])
processed_items = []
for item in items:
is_folder = 'folder' in item
item_info = {
"id": item.get("id"),
"name": item.get("name"),
"type": "folder" if is_folder else "file",
"size": item.get("size", 0),
"createdDateTime": item.get("createdDateTime"),
"lastModifiedDateTime": item.get("lastModifiedDateTime"),
"webUrl": item.get("webUrl"),
"parentPath": item.get("parentReference", {}).get("path", "")
}
if "file" in item:
item_info["mimeType"] = item["file"].get("mimeType")
item_info["downloadUrl"] = item.get("@microsoft.graph.downloadUrl")
processed_items.append(item_info)
return processed_items
except Exception as e:
logger.error(f"Error searching files: {str(e)}")
return []
async def copy_file_async(self, site_id: str, source_folder: str, source_file: str, dest_folder: str, dest_file: str) -> None:
"""Copy a file from source to destination folder (like original synchronizer)."""
try:
# First, download the source file
source_path = f"{source_folder}/{source_file}"
file_content = await self.download_file_by_path(site_id=site_id, file_path=source_path)
if not file_content:
raise Exception(f"Failed to download source file: {source_path}")
# Upload to destination
await self.upload_file(
site_id=site_id,
folder_path=dest_folder,
file_name=dest_file,
content=file_content
)
logger.info(f"File copied: {source_file} -> {dest_file}")
except Exception as e:
logger.error(f"Error copying file: {str(e)}")
raise
async def download_file_by_path(self, site_id: str, file_path: str) -> Optional[bytes]:
"""Download a file by its path within a site."""
try:
# Clean the path
clean_path = file_path.strip('/')
endpoint = f"sites/{site_id}/drive/root:/{clean_path}:/content"
# Use direct HTTP call for file downloads (binary content)
headers = {
"Authorization": f"Bearer {self.access_token}",
}
# Remove leading slash from endpoint to avoid double slash
clean_endpoint = endpoint.lstrip('/')
url = f"{self.base_url}/{clean_endpoint}"
logger.debug(f"Downloading file: GET {url}")
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url, headers=headers) as response:
if response.status == 200:
return await response.read()
else:
error_text = await response.text()
logger.error(f"File download failed: {response.status} - {error_text}")
return None
except Exception as e:
logger.error(f"Error downloading file by path: {str(e)}")
return None
def read_file(self, *, folder_path: str, file_name: str) -> bytes:
"""Read a file from SharePoint and return its content as bytes.
Params:
folder_path: Folder path (server-relative)
file_name: File name
Returns:
bytes: File content as bytes
"""
file_path = f"{folder_path.rstrip('/')}/{file_name}"
response = File.open_binary(self.ctx, file_path)
return response.content
async def read_file_async(self, *, folder_path: str, file_name: str) -> bytes:
"""Read a file from SharePoint and return its content as bytes (async version).
Params:
folder_path: Folder path (server-relative)
file_name: File name
Returns:
bytes: File content as bytes
"""
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
return await loop.run_in_executor(
executor,
lambda: self.read_file(folder_path=folder_path, file_name=file_name),
)
def overwrite_file(
self, *, folder_path: str, file_name: str, content: bytes
) -> bool:
"""Write content to a SharePoint file, overwriting if it exists.
Params:
folder_path: Target folder path (server-relative)
file_name: Target file name
content: File content as bytes
Returns:
bool: True if successful, False otherwise
"""
target_folder = self.ctx.web.get_folder_by_server_relative_url(folder_path)
target_folder.upload_file(file_name, content).execute_query()
return True
async def overwrite_file_async(
self, *, folder_path: str, file_name: str, content: bytes
) -> bool:
"""Write content to a SharePoint file, overwriting if it exists (async version).
Params:
folder_path: Target folder path (server-relative)
file_name: Target file name
content: File content as bytes
Returns:
bool: True if successful, False otherwise
"""
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
return await loop.run_in_executor(
executor,
lambda: self.overwrite_file(
folder_path=folder_path,
file_name=file_name,
content=content,
),
)

modules/interfaces/interfaceTicketObjects.py

@@ -6,8 +6,7 @@ from modules.shared.timezoneUtils import get_utc_now
from modules.connectors.connectorSharepoint import ConnectorSharepoint
from modules.interfaces.interfaceTicketModel import TicketBase
from modules.interfaces.interfaceTicketModel import Task
from modules.interfaces.interfaceTicketModel import TicketBase, Task
@dataclass(slots=True)
@@ -19,6 +18,7 @@ class TicketSharepointSyncInterface:
sync_file: str
backup_folder: str
audit_folder: str
site_id: str # Microsoft Graph site ID used for all drive operations
@classmethod
async def create(
@@ -30,6 +30,7 @@
sync_file: str,
backup_folder: str,
audit_folder: str,
site_id: str,
) -> "TicketSharepointSyncInterface":
return cls(
connector_ticket=connector_ticket,
@@ -39,6 +40,7 @@
sync_file=sync_file,
backup_folder=backup_folder,
audit_folder=audit_folder,
site_id=site_id,
)
async def create_backup(self):
@@ -47,6 +49,7 @@
backup_filename = f"backup_{timestamp}_{self.sync_file}"
await self.connector_sharepoint.copy_file_async(
site_id=self.site_id,
source_folder=self.sync_folder,
source_file=self.sync_file,
dest_folder=self.backup_folder,
@@ -83,8 +86,10 @@
try:
timestamp = get_utc_now().strftime("%Y%m%d_%H%M%S")
jira_export_filename = f"jira_export_{timestamp}.csv"
jira_export_content = self._create_csv_content(jira_data)
await self.connector_sharepoint.overwrite_file_async(
# Use default headers for JIRA export
jira_export_content = self._create_csv_content(jira_data, {"header1": "JIRA Export", "header2": "Raw Data"})
await self.connector_sharepoint.upload_file(
site_id=self.site_id,
folder_path=self.audit_folder,
file_name=jira_export_filename,
content=jira_export_content,
@@ -111,18 +116,35 @@
audit_log.append("Step 5: Reading existing CSV file...")
existing_data = []
existing_file_found = False
existing_headers = {"header1": "", "header2": ""}
try:
csv_content = await self.connector_sharepoint.read_file_async(
folder_path=self.sync_folder, file_name=self.sync_file
file_path = f"{self.sync_folder}/{self.sync_file}"
csv_content = await self.connector_sharepoint.download_file_by_path(
site_id=self.site_id, file_path=file_path
)
# Read the first two lines to get headers
csv_lines = csv_content.decode('utf-8').split('\n')
if len(csv_lines) >= 2:
# Store the raw first two lines as headers (preserving original formatting)
existing_headers["header1"] = csv_lines[0].rstrip('\r\n')
existing_headers["header2"] = csv_lines[1].rstrip('\r\n')
# Try to read with robust CSV parsing (skip first 2 rows)
df_existing = pd.read_csv(
BytesIO(csv_content), skiprows=2
) # Skip header rows
BytesIO(csv_content),
skiprows=2,
quoting=1, # QUOTE_ALL
escapechar='\\',
on_bad_lines='skip', # Skip malformed lines
engine='python' # More robust parsing
)
existing_data = df_existing.to_dict("records")
existing_file_found = True
audit_log.append(
f"Existing CSV file found with {len(existing_data)} records"
)
audit_log.append(f"Preserved headers: Header1='{existing_headers['header1']}', Header2='{existing_headers['header2']}'")
except Exception as e:
audit_log.append(f"No existing CSV file found or read error: {str(e)}")
audit_log.append("")
@@ -149,8 +171,9 @@
# 7. Create CSV with 4-row structure and write to SharePoint
audit_log.append("Step 7: Writing updated CSV to SharePoint...")
csv_content = self._create_csv_content(merged_data)
await self.connector_sharepoint.overwrite_file_async(
csv_content = self._create_csv_content(merged_data, existing_headers)
await self.connector_sharepoint.upload_file(
site_id=self.site_id,
folder_path=self.sync_folder,
file_name=self.sync_file,
content=csv_content,
@@ -196,10 +219,19 @@
# 1. Read CSV file from SharePoint
audit_log.append("Step 1: Reading CSV file from SharePoint...")
try:
csv_content = await self.connector_sharepoint.read_file_async(
folder_path=self.sync_folder, file_name=self.sync_file
file_path = f"{self.sync_folder}/{self.sync_file}"
csv_content = await self.connector_sharepoint.download_file_by_path(
site_id=self.site_id, file_path=file_path
)
# Try to read with robust CSV parsing
df = pd.read_csv(
BytesIO(csv_content),
skiprows=2,
quoting=1, # QUOTE_ALL
escapechar='\\',
on_bad_lines='skip', # Skip malformed lines
engine='python' # More robust parsing
)
df = pd.read_csv(BytesIO(csv_content), skiprows=2) # Skip header rows
csv_data = df.to_dict("records")
audit_log.append(
f"CSV file read successfully with {len(csv_data)} records"
@@ -495,34 +527,71 @@
# Convert audit log to bytes
audit_content = "\n".join(audit_log).encode("utf-8")
# Debug logging
import logging
logger = logging.getLogger(__name__)
logger.debug(f"Writing audit log to folder: {self.audit_folder}, file: {audit_filename}")
# Write to SharePoint
await self.connector_sharepoint.overwrite_file_async(
await self.connector_sharepoint.upload_file(
site_id=self.site_id,
folder_path=self.audit_folder,
file_name=audit_filename,
content=audit_content,
)
logger.debug("Audit log written successfully")
except Exception as e:
# If audit logging fails, we don't want to break the main sync process
# Just log the error (this could be enhanced with fallback logging)
import logging
logger = logging.getLogger(__name__)
logger.warning(f"Failed to write audit log: {str(e)}")
logger.warning(f"Audit folder: {self.audit_folder}")
logger.warning(f"Operation type: {operation_type}")
import traceback
logger.warning(f"Traceback: {traceback.format_exc()}")
def _create_csv_content(self, data: list[dict]) -> bytes:
def _create_csv_content(self, data: list[dict], existing_headers: dict = None) -> bytes:
"""Create CSV content with 4-row structure matching reference code."""
# Get current timestamp for header
timestamp = get_utc_now().strftime("%Y-%m-%d %H:%M:%S UTC")
# Use existing headers if provided, otherwise use defaults
if existing_headers is None:
existing_headers = {"header1": "Header 1", "header2": "Header 2"}
if not data:
# Build an empty table with the expected columns from schema
cols = list(self.task_sync_definition.keys())
df = pd.DataFrame(columns=cols)
# Row 1 & 2: keep your current banner lines
header_row1 = pd.DataFrame(
[["Header 1"] + [""] * (len(cols) - 1)], columns=cols
)
header_row2 = pd.DataFrame(
[["Header 2"] + [""] * (len(cols) - 1)], columns=cols
)
# Parse existing headers to extract individual columns
import csv as csv_module
header1_text = existing_headers.get("header1", "Header 1")
header2_text = existing_headers.get("header2", "Header 2")
# Parse the existing header rows
header1_reader = csv_module.reader([header1_text])
header2_reader = csv_module.reader([header2_text])
header1_row = next(header1_reader, [])
header2_row = next(header2_reader, [])
# Row 1: Use existing header1 or default
if len(header1_row) >= len(cols):
header_row1_data = header1_row[:len(cols)]
else:
header_row1_data = header1_row + [""] * (len(cols) - len(header1_row))
header_row1 = pd.DataFrame([header_row1_data], columns=cols)
# Row 2: Use existing header2 and add timestamp to second column
if len(header2_row) >= len(cols):
header_row2_data = header2_row[:len(cols)]
else:
header_row2_data = header2_row + [""] * (len(cols) - len(header2_row))
if len(header_row2_data) > 1:
header_row2_data[1] = timestamp
header_row2 = pd.DataFrame([header_row2_data], columns=cols)
# Row 3: table headers
table_headers = pd.DataFrame([cols], columns=cols)
@@ -531,7 +600,7 @@
[header_row1, header_row2, table_headers, df], ignore_index=True
)
csv_text = StringIO()
final_df.to_csv(csv_text, index=False, header=False)
final_df.to_csv(csv_text, index=False, header=False, quoting=1, escapechar='\\')
return csv_text.getvalue().encode("utf-8")
# Create DataFrame from data
@@ -542,16 +611,38 @@
df[column] = df[column].astype("object")
df[column] = df[column].fillna("")
# Create the 4-row structure
# Row 1: Static header row 1
header_row1 = pd.DataFrame(
[["Header 1"] + [""] * (len(df.columns) - 1)], columns=df.columns
)
# Clean data: replace literal line breaks with "\n" so each record stays on one line;
# quote escaping is left to to_csv(quoting=QUOTE_ALL), which doubles embedded quotes itself
for column in df.columns:
df[column] = df[column].astype(str).str.replace('\n', '\\n', regex=False)
# Row 2: Static header row 2 with strict compatibility
header_row2 = pd.DataFrame(
[["Header 2"] + [""] * (len(df.columns) - 1)], columns=df.columns
)
# Create the 4-row structure
# Parse existing headers to extract individual columns
import csv as csv_module
header1_text = existing_headers.get("header1", "Header 1")
header2_text = existing_headers.get("header2", "Header 2")
# Parse the existing header rows
header1_reader = csv_module.reader([header1_text])
header2_reader = csv_module.reader([header2_text])
header1_row = next(header1_reader, [])
header2_row = next(header2_reader, [])
# Row 1: Use existing header1 or default
if len(header1_row) >= len(df.columns):
header_row1_data = header1_row[:len(df.columns)]
else:
header_row1_data = header1_row + [""] * (len(df.columns) - len(header1_row))
header_row1 = pd.DataFrame([header_row1_data], columns=df.columns)
# Row 2: Use existing header2 and add timestamp to second column
if len(header2_row) >= len(df.columns):
header_row2_data = header2_row[:len(df.columns)]
else:
header_row2_data = header2_row + [""] * (len(df.columns) - len(header2_row))
if len(header_row2_data) > 1:
header_row2_data[1] = timestamp
header_row2 = pd.DataFrame([header_row2_data], columns=df.columns)
# Row 3: Table headers (column names)
table_headers = pd.DataFrame([df.columns.tolist()], columns=df.columns)
@@ -561,7 +652,7 @@
[header_row1, header_row2, table_headers, df], ignore_index=True
)
# Convert to CSV bytes (write text, then encode)
# Convert to CSV bytes with proper quoting for fields containing special characters
csv_text = StringIO()
final_df.to_csv(csv_text, index=False, header=False)
final_df.to_csv(csv_text, index=False, header=False, quoting=1, escapechar='\\')
return csv_text.getvalue().encode("utf-8")
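The resulting file layout, sketched with the stdlib csv module for a hypothetical two-column schema (csv.QUOTE_ALL matches the quoting=1 used above; the timestamp value is illustrative):

import csv, io

buf = io.StringIO()
w = csv.writer(buf, quoting=csv.QUOTE_ALL)
w.writerow(["Header 1", ""])                         # row 1: preserved banner line
w.writerow(["Header 2", "2025-09-09 11:26:12 UTC"])  # row 2: banner plus sync timestamp
w.writerow(["ID", "Summary"])                        # row 3: column names
w.writerow(["DCS-1", "Example task"])                # row 4+: data records
print(buf.getvalue())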

modules/routes/routeJira.py (deleted)

@@ -1,141 +0,0 @@
# Configure logger
import logging
from fastapi import APIRouter, FastAPI
from contextlib import asynccontextmanager
from zoneinfo import ZoneInfo
from modules.connectors.connectorTicketJira import ConnectorTicketJira
from modules.connectors.connectorSharepoint import ConnectorSharepoint
from modules.interfaces.interfaceTicketObjects import TicketSharepointSyncInterface
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
logger = logging.getLogger(__name__)
scheduler = AsyncIOScheduler(timezone=ZoneInfo("Europe/Zurich"))
@asynccontextmanager
async def router_lifespan(app: FastAPI):
# start scheduler when this router is mounted
scheduler.add_job(
perform_sync_jira_delta_group,
CronTrigger(minute="0"), # run at the top of every hour
id="jira_delta_group_sync",
replace_existing=True,
coalesce=True,
max_instances=1,
misfire_grace_time=1800,
)
scheduler.start()
logger.info("APScheduler started (jira_delta_group_sync hourly)")
try:
yield
finally:
if scheduler.running:
scheduler.shutdown(wait=False)
logger.info("APScheduler stopped")
router = APIRouter(
prefix="/api/jira",
tags=["JIRA Sync"],
lifespan=router_lifespan,
)
@router.post("/sync/delta-group")
async def sync_jira_delta_group():
"""Endpoint to trigger JIRA-SharePoint sync for Delta Group project."""
logger.info("Received request to sync JIRA Delta Group project")
await perform_sync_jira_delta_group()
# Return a response
return {"status": "Sync completed"}
async def perform_sync_jira_delta_group():
logger.info("Syncing Jira issues for Delta Group...")
# Sharepoint connection parameters
sharepoint_client_id = None
sharepoint_client_secret = None
sharepoint_site_url = None
# Jira connection parameters
jira_username = "ONHOLD - TASK - p.motsch@valueon.ch"
jira_api_token = "ATATT3xFfGF0d973nNb3R1wTDI4lesmJfJAmooS-4cYMJTyLfwYv4himrE6yyCxyX3aSMfl34NHcm2fAXeFXrLHUzJx0RQVUBonCFnlgexjLQTgS5BoCbSO7dwAVjlcHZZkArHbooCUaRwJ15n6AHkm-nwdjLQ3Z74TFnKKUZC4uhuh3Aj-MuX8=2D7124FA"
jira_url = "https://deltasecurity.atlassian.net"
project_code = "DCS"
issue_type = "Task"
# Basic validation (credentials will be added later)
if not all([sharepoint_client_id, sharepoint_client_secret, sharepoint_site_url]):
raise ValueError("SharePoint credentials not configured")
if not all([jira_username, jira_api_token]):
raise ValueError("JIRA credentials not configured")
# Define the task sync definition
task_sync_definition = {
# key=excel-header, [get:jira>excel | put: excel>jira, jira-xml-field-list]
"ID": ["get", ["key"]],
"Module Category": ["get", ["fields", "customfield_10058", "value"]],
"Summary": ["get", ["fields", "summary"]],
"Description": ["get", ["fields", "description"]],
"References": ["get", ["fields", "customfield_10066"]],
"Priority": ["get", ["fields", "priority", "name"]],
"Issue Status": ["get", ["fields", "customfield_10062"]],
"Assignee": ["get", ["fields", "assignee", "displayName"]],
"Issue Created": ["get", ["fields", "created"]],
"Due Date": ["get", ["fields", "duedate"]],
"DELTA Comments": ["get", ["fields", "customfield_10060"]],
"SELISE Ticket References": ["put", ["fields", "customfield_10067"]],
"SELISE Status Values": ["put", ["fields", "customfield_10065"]],
"SELISE Comments": ["put", ["fields", "customfield_10064"]],
}
# SharePoint file configuration
sync_folder = "/sites/<YourSite>/Shared Documents/TicketSync"
sync_file = "delta_group_selise_ticket_exchange_list.csv"
backup_folder = "/sites/<YourSite>/Shared Documents/TicketSync/Backups"
audit_folder = "/sites/<YourSite>/Shared Documents/TicketSync/AuditLogs"
# Create the jira connector instance
jira_connector = await ConnectorTicketJira.create(
jira_username=jira_username,
jira_api_token=jira_api_token,
jira_url=jira_url,
project_code=project_code,
issue_type=issue_type,
)
# Create the sharepoint connector instance
ctx = ConnectorSharepoint.get_client_context_from_app(
site_url=sharepoint_site_url,
client_id=sharepoint_client_id,
client_secret=sharepoint_client_secret,
)
sharepoint_connector = await ConnectorSharepoint.create(ctx=ctx)
# Create the sync interface instance
sync_interface = await TicketSharepointSyncInterface.create(
connector_ticket=jira_connector,
connector_sharepoint=sharepoint_connector,
task_sync_definition=task_sync_definition,
sync_folder=sync_folder,
sync_file=sync_file,
backup_folder=backup_folder,
audit_folder=audit_folder,
)
# Sync from JIRA to CSV in Sharepoint
await sync_interface.sync_from_jira_to_csv()
# Sync from CSV in Sharepoint to JIRA
await sync_interface.sync_from_csv_to_jira()

modules/workflow/managerSyncDelta.py (new file)

@@ -0,0 +1,231 @@
"""
Delta Group JIRA-SharePoint Sync Manager
This module handles the synchronization of JIRA tickets to SharePoint using the new
Graph API-based connector architecture.
"""
import logging
import csv
import io
from datetime import datetime, UTC
from typing import Dict, Any, List, Optional
from modules.connectors.connectorSharepoint import ConnectorSharepoint
from modules.connectors.connectorTicketJira import ConnectorTicketJira
from modules.interfaces.interfaceAppObjects import getRootInterface
from modules.interfaces.interfaceAppModel import UserInDB
from modules.interfaces.interfaceTicketObjects import TicketSharepointSyncInterface
from modules.shared.timezoneUtils import get_utc_timestamp
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
# Get environment type from configuration
APP_ENV_TYPE = APP_CONFIG.get("APP_ENV_TYPE", "dev")
class ManagerSyncDelta:
"""Manages JIRA to SharePoint synchronization for Delta Group."""
#SHAREPOINT_SITE_ID = "02830618-4029-4dc8-8d3d-f5168f282249"
#SHAREPOINT_SITE_NAME = "SteeringBPM"
#SHAREPOINT_MAIN_FOLDER = "/sites/SteeringBPM/Freigegebene Dokumente/General/50 Docs hosted by SELISE"
#SHAREPOINT_BACKUP_FOLDER = "/sites/SteeringBPM/Freigegebene Dokumente/General/50 Docs hosted by SELISE/SyncHistory"
#SHAREPOINT_AUDIT_FOLDER = "/sites/SteeringBPM/Freigegebene Dokumente/General/50 Docs hosted by SELISE/SyncHistory"
# SharePoint site constants using hostname + site path (resolve real site ID at runtime)
SHAREPOINT_HOSTNAME = "pcuster.sharepoint.com"
SHAREPOINT_SITE_PATH = "KM.DELTAG.20968511411"
SHAREPOINT_SITE_NAME = "KM.DELTAG.20968511411"
# Drive-relative (document library) paths, not server-relative "/sites/..."
# Note: Default library name is "Shared Documents" in Graph
SHAREPOINT_MAIN_FOLDER = "1_Arbeitsbereich"
SHAREPOINT_BACKUP_FOLDER = "1_Arbeitsbereich/SyncHistory"
SHAREPOINT_AUDIT_FOLDER = "1_Arbeitsbereich/SyncHistory"
# Fixed filename for the main CSV file (like original synchronizer)
SYNC_FILE_NAME = "DELTAgroup x SELISE Ticket Exchange List.csv"
# JIRA connection parameters (hardcoded for Delta Group)
JIRA_USERNAME = "p.motsch@valueon.ch"
JIRA_API_TOKEN = "ATATT3xFfGF0d973nNb3R1wTDI4lesmJfJAmooS-4cYMJTyLfwYv4himrE6yyCxyX3aSMfl34NHcm2fAXeFXrLHUzJx0RQVUBonCFnlgexjLQTgS5BoCbSO7dwAVjlcHZZkArHbooCUaRwJ15n6AHkm-nwdjLQ3Z74TFnKKUZC4uhuh3Aj-MuX8=2D7124FA"
JIRA_URL = "https://deltasecurity.atlassian.net"
JIRA_PROJECT_CODE = "DCS"
JIRA_ISSUE_TYPE = "Task"
# Task sync definition for field mapping (like original synchronizer)
TASK_SYNC_DEFINITION = {
"ID": ["get", ["key"]],
"Summary": ["get", ["fields", "summary"]],
"Status": ["get", ["fields", "status", "name"]],
"Assignee": ["get", ["fields", "assignee", "displayName"]],
"Reporter": ["get", ["fields", "reporter", "displayName"]],
"Created": ["get", ["fields", "created"]],
"Updated": ["get", ["fields", "updated"]],
"Priority": ["get", ["fields", "priority", "name"]],
"IssueType": ["get", ["fields", "issuetype", "name"]],
"Project": ["get", ["fields", "project", "name"]],
"Description": ["get", ["fields", "description"]],
}
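Each mapping is [direction, json-path-into-the-raw-issue]. A hypothetical helper showing how a "get" path resolves against a JIRA issue payload:

def resolve_path(issue: dict, path: list) -> object:
    """Walk a nested dict along `path`; return None if any key is missing."""
    value = issue
    for key in path:
        if not isinstance(value, dict):
            return None
        value = value.get(key)
    return value

issue = {"key": "DCS-1", "fields": {"status": {"name": "Done"}}}
print(resolve_path(issue, ["fields", "status", "name"]))  # -> Done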
def __init__(self):
"""Initialize the sync manager with hardcoded Delta Group credentials."""
self.root_interface = getRootInterface()
self.jira_connector = None
self.sharepoint_connector = None
self.target_site = None
async def initialize_connectors(self) -> bool:
"""Initialize JIRA and SharePoint connectors."""
try:
logger.info("Initializing JIRA connector with hardcoded credentials")
# Initialize JIRA connector using class constants
self.jira_connector = await ConnectorTicketJira.create(
jira_username=self.JIRA_USERNAME,
jira_api_token=self.JIRA_API_TOKEN,
jira_url=self.JIRA_URL,
project_code=self.JIRA_PROJECT_CODE,
issue_type=self.JIRA_ISSUE_TYPE
)
# Use the current logged-in user from root interface
activeUser = self.root_interface.currentUser
if not activeUser:
logger.error("No current user available - SharePoint connection required")
return False
logger.info(f"Using current user for SharePoint: {activeUser.id}")
# Get SharePoint connection for this user
user_connections = self.root_interface.getUserConnections(activeUser.id)
sharepoint_connection = None
for connection in user_connections:
if connection.authority == "msft":
sharepoint_connection = connection
break
if not sharepoint_connection:
logger.error("No SharePoint connection found for Delta Group user")
return False
logger.info(f"Found SharePoint connection: {sharepoint_connection.id}")
# Get SharePoint token for this connection
sharepoint_token = self.root_interface.getConnectionToken(sharepoint_connection.id)
if not sharepoint_token:
logger.error("No SharePoint token found for Delta Group user connection")
return False
logger.info(f"Found SharePoint token: {sharepoint_token.id}")
# Initialize SharePoint connector with Graph API
self.sharepoint_connector = ConnectorSharepoint(access_token=sharepoint_token.tokenAccess)
# Resolve the site by hostname + site path to get the real site ID
logger.info(
f"Resolving site ID via hostname+path: {self.SHAREPOINT_HOSTNAME}:/sites/{self.SHAREPOINT_SITE_PATH}"
)
resolved = await self.sharepoint_connector.find_site_by_url(
hostname=self.SHAREPOINT_HOSTNAME,
site_path=self.SHAREPOINT_SITE_PATH
)
if not resolved:
logger.error(
f"Failed to resolve site. Hostname: {self.SHAREPOINT_HOSTNAME}, Path: {self.SHAREPOINT_SITE_PATH}"
)
return False
self.target_site = {
"id": resolved.get("id"),
"displayName": resolved.get("displayName", self.SHAREPOINT_SITE_NAME),
"name": resolved.get("name", self.SHAREPOINT_SITE_NAME)
}
# Test site access by listing root of the drive
logger.info("Testing site access using resolved site ID...")
test_result = await self.sharepoint_connector.list_folder_contents(
site_id=self.target_site["id"],
folder_path=""
)
if test_result is not None:
logger.info(
f"Site access confirmed: {self.target_site['displayName']} (ID: {self.target_site['id']})"
)
else:
logger.error("Could not access site drive - check permissions")
return False
return True
except Exception as e:
logger.error(f"Error initializing connectors: {str(e)}")
return False
async def sync_jira_to_sharepoint(self) -> bool:
"""Perform the main JIRA to SharePoint synchronization using sophisticated sync logic."""
try:
logger.info("Starting JIRA to SharePoint synchronization")
# Initialize connectors
if not await self.initialize_connectors():
logger.error("Failed to initialize connectors")
return False
# Create the sophisticated sync interface
sync_interface = await TicketSharepointSyncInterface.create(
connector_ticket=self.jira_connector,
connector_sharepoint=self.sharepoint_connector,
task_sync_definition=self.TASK_SYNC_DEFINITION,
sync_folder=self.SHAREPOINT_MAIN_FOLDER,
sync_file=self.SYNC_FILE_NAME,
backup_folder=self.SHAREPOINT_BACKUP_FOLDER,
audit_folder=self.SHAREPOINT_AUDIT_FOLDER,
site_id=self.target_site['id']
)
# Perform the sophisticated sync
logger.info("Performing sophisticated JIRA to CSV sync...")
await sync_interface.sync_from_jira_to_csv()
logger.info("JIRA to SharePoint synchronization completed successfully")
return True
except Exception as e:
logger.error(f"Error during JIRA to SharePoint synchronization: {str(e)}")
return False
# Global sync function for use in app.py
async def perform_sync_jira_delta_group() -> bool:
"""Perform JIRA to SharePoint synchronization for Delta Group.
This function is called by the scheduler and can be used independently.
Returns:
bool: True if synchronization was successful, False otherwise
"""
try:
if APP_ENV_TYPE != "prod":
logger.info("JIRA to SharePoint synchronization: TASK to run only in PROD")
return True
logger.info("Starting Delta Group JIRA sync...")
sync_manager = ManagerSyncDelta()
success = await sync_manager.sync_jira_to_sharepoint()
if success:
logger.info("Delta Group JIRA sync completed successfully")
else:
logger.error("Delta Group JIRA sync failed")
return success
except Exception as e:
logger.error(f"Error in perform_sync_jira_delta_group: {str(e)}")
return False
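A manual trigger for local testing (assumes the app context is initialized and APP_ENV_TYPE is "prod"; otherwise the function logs the skip and returns early):

import asyncio
from modules.workflow.managerSyncDelta import perform_sync_jira_delta_group

if __name__ == "__main__":
    ok = asyncio.run(perform_sync_jira_delta_group())
    print("sync succeeded" if ok else "sync failed")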