# Copyright (c) 2026 PowerOn AG
# All rights reserved.
"""Connector for SharePoint operations using Microsoft Graph API."""

import asyncio
import logging
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict, List, Optional
from urllib.parse import urlparse

import aiohttp

logger = logging.getLogger(__name__)

# Cache for discoverSites() to avoid hitting Graph API on every folder-options call
# (e.g. when UI loads site list).
# Key: token prefix (per user), Value: (expiry_ts, sites). TTL 5 minutes.
# NOTE(review): nothing in the code visible in this module reads or writes this
# cache; presumably it is used by callers or still has to be wired in - confirm.
_discoverSitesCache: Dict[str, tuple] = {}
_DISCOVER_SITES_TTL_SEC = 300

# HTTP status codes treated as success, per supported HTTP method.
_GRAPH_SUCCESS_STATUSES: Dict[str, tuple] = {
    "GET": (200,),
    "PUT": (200, 201),
    "POST": (200, 201),
    "DELETE": (200, 204),
}

# Fields copied from a Graph "site" resource into the dicts returned to callers.
_SITE_INFO_KEYS = (
    "id",
    "displayName",
    "name",
    "webUrl",
    "description",
    "createdDateTime",
    "lastModifiedDateTime",
)


class SharepointService:
    """SharePoint connector using Microsoft Graph API for reliable authentication.

    Most methods report failures through their return value (``None``, ``[]``,
    ``False`` or a dict with an ``"error"`` key) and log the problem instead of
    raising. ``uploadFile`` and ``copyFileAsync`` are the exceptions: they raise.
    """

    def __init__(self, context, get_service: Callable[[str], Any]):
        """Initialize SharePoint service without access token.

        Args:
            context: ServiceCenterContext with user, mandateId, etc.
            get_service: Service resolver for dependency injection (e.g. security)

        Use setAccessTokenFromConnection() method to configure the access token
        before making API calls.
        """
        self._context = context
        self._getService = get_service
        self.accessToken = None
        self.baseUrl = "https://graph.microsoft.com/v1.0"

    def setAccessTokenFromConnection(self, userConnection) -> bool:
        """Set access token from UserConnection.

        Args:
            userConnection: UserConnection object or dict containing token information

        Returns:
            bool: True if token was set successfully, False otherwise
        """
        try:
            if not userConnection:
                logger.error("UserConnection is required to set access token")
                return False

            # Handle both dict and UserConnection object
            if isinstance(userConnection, dict):
                connectionId = userConnection.get('id')
            else:
                connectionId = getattr(userConnection, 'id', None)

            if not connectionId:
                logger.error("UserConnection must have an 'id' field")
                return False

            # Get a fresh token for this specific connection via security service
            security = self._getService("security")
            if not security:
                logger.error("Security service not available for token access")
                return False

            token = security.getFreshToken(connectionId)
            if not token:
                logger.error(f"No token found for connection {connectionId}")
                return False

            self.accessToken = token.tokenAccess
            logger.info(f"Access token set for connection {connectionId}")
            return True
        except Exception as e:
            logger.error(f"Error setting access token: {str(e)}")
            return False

    @staticmethod
    def _buildSiteInfo(site: Dict[str, Any]) -> Dict[str, Any]:
        """Reduce a Graph site resource to the fields exposed by this service."""
        return {key: site.get(key) for key in _SITE_INFO_KEYS}

    async def _makeGraphApiCall(self, endpoint: str, method: str = "GET", data: bytes = None) -> Dict[str, Any]:
        """Make a Microsoft Graph API call with proper error handling.

        Args:
            endpoint: Path relative to ``baseUrl``; a leading slash is ignored.
            method: "GET", "PUT", "POST" or "DELETE" (case-insensitive).
            data: Request body; only sent for PUT and POST.

        Returns:
            The decoded JSON response (``{}`` for a successful DELETE), or a dict
            with a single ``"error"`` key on any failure. This method never
            raises and never returns ``None``.
        """
        try:
            if self.accessToken is None:
                logger.error("Access token is not set. Please call setAccessTokenFromConnection() before using the SharePoint service.")
                return {"error": "Access token is not set. Please call setAccessTokenFromConnection() before using the SharePoint service."}

            httpMethod = (method or "GET").upper()
            successStatuses = _GRAPH_SUCCESS_STATUSES.get(httpMethod)
            if successStatuses is None:
                # Callers test `"error" in result`, so an implicit None would crash them.
                logger.error(f"Unsupported HTTP method for Graph API call: {method}")
                return {"error": f"Unsupported HTTP method: {method}"}

            # PUT bodies are raw file content; everything else is sent as JSON.
            # `is not None` (not truthiness) so that empty files are still binary uploads.
            isBinaryUpload = data is not None and httpMethod == "PUT"
            headers = {
                "Authorization": f"Bearer {self.accessToken}",
                "Content-Type": "application/octet-stream" if isBinaryUpload else "application/json",
            }

            # Remove leading slash from endpoint to avoid double slash
            cleanEndpoint = endpoint.lstrip('/')
            url = f"{self.baseUrl}/{cleanEndpoint}"
            logger.debug(f"Making Graph API call: {httpMethod} {url}")

            requestKwargs: Dict[str, Any] = {"headers": headers}
            if httpMethod in ("PUT", "POST"):
                requestKwargs["data"] = data

            timeout = aiohttp.ClientTimeout(total=30)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.request(httpMethod, url, **requestKwargs) as response:
                    if response.status in successStatuses:
                        # A successful DELETE has no body worth decoding.
                        if httpMethod == "DELETE":
                            return {}
                        return await response.json()

                    error_text = await response.text()
                    logger.error(f"Graph API call failed: {response.status} - {error_text}")
                    return {"error": f"API call failed: {response.status} - {error_text}"}
        except asyncio.TimeoutError:
            logger.error(f"Graph API call timed out after 30 seconds: {endpoint}")
            return {"error": f"API call timed out after 30 seconds: {endpoint}"}
        except Exception as e:
            logger.error(f"Error making Graph API call: {str(e)}")
            return {"error": f"Error making Graph API call: {str(e)}"}

    async def discoverSites(self) -> List[Dict[str, Any]]:
        """Discover all SharePoint sites accessible to the user.

        Returns:
            One site-info dict per site returned by ``sites?search=*``;
            an empty list on any error.
        """
        try:
            result = await self._makeGraphApiCall("sites?search=*")
            if "error" in result:
                logger.error(f"Error discovering SharePoint sites: {result['error']}")
                return []

            sites = result.get("value", [])
            logger.info(f"Discovered {len(sites)} SharePoint sites")

            processedSites = []
            for site in sites:
                siteInfo = self._buildSiteInfo(site)
                processedSites.append(siteInfo)
                logger.debug(f"Site: {siteInfo['displayName']} - {siteInfo['webUrl']}")
            return processedSites
        except Exception as e:
            logger.error(f"Error discovering SharePoint sites: {str(e)}")
            return []

    async def findSiteByName(self, siteName: str) -> Optional[Dict[str, Any]]:
        """Find a specific SharePoint site by name using direct Graph API call.

        Tries ``sites/{siteName}`` first, then falls back to discovery with an
        exact (case-insensitive) displayName match followed by a substring match.

        Returns:
            The site-info dict, or None when nothing matches.
        """
        try:
            # Try to get the site directly by name using Graph API
            endpoint = f"sites/{siteName}"
            result = await self._makeGraphApiCall(endpoint)
            if result and "error" not in result:
                siteInfo = self._buildSiteInfo(result)
                logger.info(f"Found site directly: {siteInfo['displayName']} - {siteInfo['webUrl']}")
                return siteInfo
        except Exception as e:
            logger.debug(f"Direct site lookup failed for '{siteName}': {str(e)}")

        # Fallback to discovery if direct lookup fails
        logger.info(f"Direct lookup failed, trying discovery for site: {siteName}")
        sites = await self.discoverSites()
        if not sites:
            logger.warning("No sites discovered")
            return None

        logger.info(f"Discovered {len(sites)} SharePoint sites:")
        for site in sites:
            logger.info(f" - {site.get('displayName', 'Unknown')} (ID: {site.get('id', 'Unknown')})")

        # discoverSites() always stores the "displayName" key, possibly as None,
        # so a .get() default is not enough to guarantee a string here.
        wantedExact = siteName.strip().lower()
        wantedPartial = siteName.lower()

        # Try exact match first
        for site in sites:
            if (site.get("displayName") or "").strip().lower() == wantedExact:
                logger.info(f"Found exact match: {site.get('displayName')}")
                return site

        # Try partial match
        for site in sites:
            if wantedPartial in (site.get("displayName") or "").lower():
                logger.info(f"Found partial match: {site.get('displayName')}")
                return site

        logger.warning(f"No site found matching: {siteName}")
        return None

    async def findSiteByWebUrl(self, webUrl: str) -> Optional[Dict[str, Any]]:
        """Find a SharePoint site using its web URL (useful for guest sites).

        Args:
            webUrl: Site URL such as ``https://host/sites/name``; the scheme may
                be omitted (``https://`` is then assumed).

        Returns:
            The site-info dict, or None if the URL is not of the form
            ``.../sites/<path>`` or the lookup fails.
        """
        try:
            # Use the web URL format: sites/{hostname}:/sites/{site-path}
            # Only prepend a scheme when none is present, so http:// URLs still parse.
            if not webUrl.lower().startswith(("https://", "http://")):
                webUrl = f"https://{webUrl}"

            parsed = urlparse(webUrl)
            hostname = parsed.hostname
            pathParts = parsed.path.strip('/').split('/')

            if len(pathParts) >= 2 and pathParts[0] == 'sites':
                sitePath = '/'.join(pathParts[1:])  # Everything after 'sites/'
            else:
                logger.error(f"Invalid SharePoint URL format: {webUrl}")
                return None

            endpoint = f"sites/{hostname}:/sites/{sitePath}"
            logger.debug(f"Trying web URL format: {endpoint}")
            result = await self._makeGraphApiCall(endpoint)

            if result and "error" not in result:
                siteInfo = self._buildSiteInfo(result)
                logger.info(f"Found site by web URL: {siteInfo['displayName']} - {siteInfo['webUrl']} (ID: {siteInfo['id']})")
                return siteInfo

            logger.warning(f"Site not found using web URL: {webUrl}")
            return None
        except Exception as e:
            logger.error(f"Error finding site by web URL: {str(e)}")
            return None

    async def findSiteByUrl(self, hostname: str, sitePath: str) -> Optional[Dict[str, Any]]:
        """Find a SharePoint site using the site URL format.

        Tries several spellings of ``sites/{hostname}:/sites/{sitePath}`` (with and
        without trailing slash, original and lower case) and returns the first hit.

        Returns:
            The site-info dict, or None if no variant resolves.
        """
        try:
            # For guest sites, try different URL formats
            urlFormats = [
                f"sites/{hostname}:/sites/{sitePath}",  # Standard format
                f"sites/{hostname}:/sites/{sitePath}/",  # With trailing slash
                f"sites/{hostname}:/sites/{sitePath.lower()}",  # Lowercase
                f"sites/{hostname}:/sites/{sitePath.lower()}/",  # Lowercase with slash
            ]

            for endpoint in urlFormats:
                logger.debug(f"Trying URL format: {endpoint}")
                result = await self._makeGraphApiCall(endpoint)
                if result and "error" not in result:
                    siteInfo = self._buildSiteInfo(result)
                    logger.info(f"Found site by URL: {siteInfo['displayName']} - {siteInfo['webUrl']} (ID: {siteInfo['id']})")
                    return siteInfo
                logger.debug(f"URL format failed: {endpoint} - {result.get('error', 'Unknown error')}")

            logger.warning(f"Site not found using any URL format for: {hostname}:/sites/{sitePath}")
            return None
        except Exception as e:
            logger.error(f"Error finding site by URL: {str(e)}")
            return None

    async def getFolderByPath(self, siteId: str, folderPath: str) -> Optional[Dict[str, Any]]:
        """Get folder information by path within a site.

        Args:
            siteId: Graph site ID.
            folderPath: Path relative to the default drive root; empty or "/"
                addresses the root itself.

        Returns:
            The raw Graph driveItem dict, or None if the lookup fails.
        """
        try:
            # Strip both ends: a trailing slash would produce "root:/folder/".
            cleanPath = (folderPath or "").strip('/')

            # If path is empty, get root directly
            if not cleanPath:
                endpoint = f"sites/{siteId}/drive/root"
            else:
                endpoint = f"sites/{siteId}/drive/root:/{cleanPath}"

            result = await self._makeGraphApiCall(endpoint)
            if "error" in result:
                logger.warning(f"Folder not found at path {folderPath}: {result['error']}")
                return None
            return result
        except Exception as e:
            logger.error(f"Error getting folder by path: {str(e)}")
            return None

    async def uploadFile(self, siteId: str, folderPath: str, fileName: str, content: bytes) -> Dict[str, Any]:
        """Upload a file to SharePoint. Raises on failure.

        Args:
            siteId: Graph site ID.
            folderPath: Target folder relative to the default drive root;
                empty or "/" uploads into the root.
            fileName: Name of the file to create or replace.
            content: Raw file bytes.

        Returns:
            The Graph response dict for the uploaded item.

        Raises:
            Exception: If the Graph call reports an error.
        """
        cleanPath = (folderPath or "").strip('/')
        # Avoid "root://name" when uploading into the drive root.
        uploadPath = f"{cleanPath}/{fileName}" if cleanPath else fileName
        endpoint = f"sites/{siteId}/drive/root:/{uploadPath}:/content"
        logger.info(f"Uploading file to: {endpoint}")

        result = await self._makeGraphApiCall(endpoint, method="PUT", data=content)
        if "error" in result:
            raise Exception(f"Upload failed: {result['error']}")

        logger.info(f"File uploaded successfully: {fileName}")
        return result

    async def downloadFile(self, siteId: str, fileId: str) -> Optional[bytes]:
        """Download a file from SharePoint.

        Returns:
            The file bytes, or None if no token is set, the response status is
            not 200, or an exception occurs.
        """
        try:
            if self.accessToken is None:
                logger.error("Access token is not set. Please call setAccessTokenFromConnection() before using the SharePoint service.")
                return None

            endpoint = f"sites/{siteId}/drive/items/{fileId}/content"
            headers = {"Authorization": f"Bearer {self.accessToken}"}

            timeout = aiohttp.ClientTimeout(total=30)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(f"{self.baseUrl}/{endpoint}", headers=headers) as response:
                    if response.status == 200:
                        return await response.read()
                    logger.error(f"Download failed: {response.status}")
                    return None
        except Exception as e:
            logger.error(f"Error downloading file: {str(e)}")
            return None

    async def listFolderContents(self, siteId: str, folderPath: str = "") -> Optional[List[Dict[str, Any]]]:
        """List contents of a folder.

        Args:
            siteId: Graph site ID.
            folderPath: Folder relative to the default drive root; empty or "/"
                lists the root.

        Returns:
            A list of item dicts (id, name, type, size, timestamps, webUrl, plus
            mimeType/downloadUrl for files and childCount for folders).
            Returns None when the Graph call reports an error and ``[]`` when an
            unexpected exception occurs (kept as-is for existing callers).
        """
        try:
            # Strip both ends so "folder/" or "//" cannot yield "root:/folder/:/children".
            cleanPath = (folderPath or "").strip('/')
            if not cleanPath:
                endpoint = f"sites/{siteId}/drive/root/children"
            else:
                endpoint = f"sites/{siteId}/drive/root:/{cleanPath}:/children"

            result = await self._makeGraphApiCall(endpoint)
            if "error" in result:
                logger.warning(f"Failed to list folder contents: {result['error']}")
                return None

            items = result.get("value", [])
            processedItems = []
            for item in items:
                # Determine if it's a folder or file
                isFolder = 'folder' in item
                itemInfo = {
                    "id": item.get("id"),
                    "name": item.get("name"),
                    "type": "folder" if isFolder else "file",
                    "size": item.get("size", 0),
                    "createdDateTime": item.get("createdDateTime"),
                    "lastModifiedDateTime": item.get("lastModifiedDateTime"),
                    "webUrl": item.get("webUrl"),
                }
                if "file" in item:
                    itemInfo["mimeType"] = item["file"].get("mimeType")
                    itemInfo["downloadUrl"] = item.get("@microsoft.graph.downloadUrl")
                if isFolder:
                    itemInfo["childCount"] = item["folder"].get("childCount", 0)
                processedItems.append(itemInfo)
            return processedItems
        except Exception as e:
            logger.error(f"Error listing folder contents: {str(e)}")
            return []

    async def searchFiles(self, siteId: str, query: str) -> List[Dict[str, Any]]:
        """Search for files in a site.

        Returns:
            A list of item dicts (same common fields as listFolderContents plus
            ``parentPath``); an empty list on any error.
        """
        try:
            searchQuery = query.replace("'", "''")  # Escape single quotes for OData
            endpoint = f"sites/{siteId}/drive/root/search(q='{searchQuery}')"

            result = await self._makeGraphApiCall(endpoint)
            if "error" in result:
                logger.warning(f"Search failed: {result['error']}")
                return []

            items = result.get("value", [])
            processedItems = []
            for item in items:
                isFolder = 'folder' in item
                itemInfo = {
                    "id": item.get("id"),
                    "name": item.get("name"),
                    "type": "folder" if isFolder else "file",
                    "size": item.get("size", 0),
                    "createdDateTime": item.get("createdDateTime"),
                    "lastModifiedDateTime": item.get("lastModifiedDateTime"),
                    "webUrl": item.get("webUrl"),
                    "parentPath": item.get("parentReference", {}).get("path", ""),
                }
                if "file" in item:
                    itemInfo["mimeType"] = item["file"].get("mimeType")
                    itemInfo["downloadUrl"] = item.get("@microsoft.graph.downloadUrl")
                processedItems.append(itemInfo)
            return processedItems
        except Exception as e:
            logger.error(f"Error searching files: {str(e)}")
            return []

    async def copyFileAsync(self, siteId: str, sourceFolder: str, sourceFile: str, destFolder: str, destFile: str) -> None:
        """Copy a file from source to destination folder (like original synchronizer).

        The copy is implemented as download followed by upload.

        Raises:
            Exception: If the download or the upload fails; the original error is
                attached as the cause.
        """
        sourcePath = f"{sourceFolder}/{sourceFile}"
        try:
            # First, download the source file
            fileContent = await self.downloadFileByPath(siteId=siteId, filePath=sourcePath)
            # None means failure; b"" is a legitimate empty file and must be copied.
            if fileContent is None:
                raise Exception(f"Failed to download source file: {sourcePath}")

            # Upload to destination
            await self.uploadFile(
                siteId=siteId,
                folderPath=destFolder,
                fileName=destFile,
                content=fileContent,
            )
            logger.info(f"File copied: {sourceFile} -> {destFile}")
        except Exception as e:
            # Provide more specific error information
            errorMsg = str(e)
            if "itemNotFound" in errorMsg or "404" in errorMsg:
                raise Exception(f"Source file not found (404): {sourcePath} - {errorMsg}") from e
            raise Exception(f"Error copying file: {errorMsg}") from e

    async def deleteFile(self, siteId: str, itemId: str) -> bool:
        """Delete a file (or folder) from SharePoint by item ID. Returns True on success."""
        try:
            if not siteId or not itemId:
                logger.warning("deleteFile: siteId and itemId are required")
                return False

            endpoint = f"sites/{siteId}/drive/items/{itemId}"
            result = await self._makeGraphApiCall(endpoint, method="DELETE")
            if result and "error" in result:
                logger.warning(f"deleteFile failed: {result.get('error')}")
                return False
            return True
        except Exception as e:
            logger.error(f"Error deleting file: {str(e)}")
            return False

    async def downloadFileByPath(self, siteId: str, filePath: str) -> Optional[bytes]:
        """Download a file by its path within a site.

        Returns:
            The file bytes, or None if no token is set, the response status is
            not 200, or an exception occurs.
        """
        try:
            if self.accessToken is None:
                logger.error("Access token is not set. Please call setAccessTokenFromConnection() before using the SharePoint service.")
                return None

            # Clean the path
            cleanPath = filePath.strip('/')
            endpoint = f"sites/{siteId}/drive/root:/{cleanPath}:/content"

            # Use direct HTTP call for file downloads (binary content)
            headers = {
                "Authorization": f"Bearer {self.accessToken}",
            }

            # Remove leading slash from endpoint to avoid double slash
            cleanEndpoint = endpoint.lstrip('/')
            url = f"{self.baseUrl}/{cleanEndpoint}"
            logger.debug(f"Downloading file: GET {url}")

            timeout = aiohttp.ClientTimeout(total=30)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url, headers=headers) as response:
                    if response.status == 200:
                        return await response.read()
                    error_text = await response.text()
                    logger.error(f"File download failed: {response.status} - {error_text}")
                    return None
        except Exception as e:
            logger.error(f"Error downloading file by path: {str(e)}")
            return None

    async def _getItemById(self, siteId: str, driveId: str, itemId: str) -> Optional[Dict[str, Any]]:
        """Verify that an item exists by getting it by ID.

        Returns:
            The raw Graph item dict, or None if the lookup fails.
        """
        try:
            endpoint = f"sites/{siteId}/drives/{driveId}/items/{itemId}"
            result = await self._makeGraphApiCall(endpoint)
            if "error" in result:
                logger.warning(f"Item {itemId} not found: {result['error']}")
                return None
            return result
        except Exception as e:
            logger.warning(f"Error verifying item {itemId}: {str(e)}")
            return None

    async def _findDriveForItem(self, siteId: str, itemId: str) -> Optional[str]:
        """Find which drive contains a specific item by trying to get it from all drives.

        Returns:
            The ID of the first drive in which the item resolves, else None.
        """
        try:
            endpoint = f"sites/{siteId}/drives"
            drivesResult = await self._makeGraphApiCall(endpoint)
            if "error" in drivesResult:
                logger.warning(f"Could not get drives for site {siteId}: {drivesResult['error']}")
                return None

            drives = drivesResult.get("value", [])
            if not drives:
                logger.warning(f"No drives found for site {siteId}")
                return None

            for drive in drives:
                driveId = drive.get("id")
                if not driveId:
                    continue
                itemInfo = await self._getItemById(siteId, driveId, itemId)
                if itemInfo:
                    logger.info(f"Found item {itemId} in drive {drive.get('name', driveId)}")
                    return driveId

            logger.warning(f"Item {itemId} not found in any drive for site {siteId}")
            return None
        except Exception as e:
            logger.warning(f"Error finding drive for item {itemId}: {str(e)}")
            return None

    async def getFolderUsageAnalytics(self, siteId: str, driveId: str, itemId: str, startDateTime: Optional[str] = None, endDateTime: Optional[str] = None, interval: str = "day") -> Dict[str, Any]:
        """Get usage analytics for a folder or file.

        Args:
            siteId: Graph site ID.
            driveId: Drive expected to contain the item.
            itemId: Drive item ID.
            startDateTime: ISO timestamp; defaults to 30 days before now (UTC).
            endDateTime: ISO timestamp; defaults to now (UTC).
            interval: Aggregation interval passed through to Graph.

        Returns:
            The Graph ``getActivitiesByInterval`` response. On a 404 the item is
            looked up in the other drives of the site and the call retried; if
            the item exists but analytics still fail, ``{"value": [], "note": ...}``
            is returned. Otherwise the error dict is returned.
        """
        try:
            if not endDateTime:
                endDateTime = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
            if not startDateTime:
                startDate = datetime.now(timezone.utc) - timedelta(days=30)
                startDateTime = startDate.isoformat().replace('+00:00', 'Z')

            def buildEndpoint(targetDriveId: str) -> str:
                # Same query string for the first attempt and the retry.
                base = f"sites/{siteId}/drives/{targetDriveId}/items/{itemId}/getActivitiesByInterval"
                return base + f"?startDateTime={startDateTime}&endDateTime={endDateTime}&interval={interval}"

            result = await self._makeGraphApiCall(buildEndpoint(driveId))
            if "error" not in result:
                return result

            errorMsg = result.get('error', '')
            if not (isinstance(errorMsg, str) and '404' in errorMsg):
                return result

            # 404: either the item lives in another drive, or it has no analytics.
            itemInfo = await self._getItemById(siteId, driveId, itemId)
            if not itemInfo:
                correctDriveId = await self._findDriveForItem(siteId, itemId)
                if correctDriveId and correctDriveId != driveId:
                    result = await self._makeGraphApiCall(buildEndpoint(correctDriveId))
                    if "error" not in result:
                        return result
                    itemInfo = await self._getItemById(siteId, correctDriveId, itemId)

            if itemInfo:
                return {
                    "value": [],
                    "note": "No analytics data available for this item. The item exists but may not have activity data or analytics may not be supported for this item type."
                }
            return result
        except Exception as e:
            logger.error(f"Error getting folder usage analytics: {str(e)}")
            return {"error": f"Error getting folder usage analytics: {str(e)}"}

    async def getDriveId(self, siteId: str, driveName: Optional[str] = None) -> Optional[str]:
        """Get drive ID for a site.

        Args:
            siteId: Graph site ID.
            driveName: Drive name to match case-insensitively. When omitted, a
                drive named "Documents" or "Shared Documents" is preferred,
                otherwise the first drive is used.

        Returns:
            The drive ID, or None if nothing matches or the call fails.
        """
        try:
            endpoint = f"sites/{siteId}/drives"
            result = await self._makeGraphApiCall(endpoint)
            if "error" in result:
                logger.error(f"Error getting drives: {result['error']}")
                return None

            drives = result.get("value", [])

            if not driveName:
                for drive in drives:
                    if drive.get("name") in ("Documents", "Shared Documents"):
                        return drive.get("id")
                if drives:
                    return drives[0].get("id")
                return None

            wanted = driveName.lower()
            for drive in drives:
                if (drive.get("name") or "").lower() == wanted:
                    return drive.get("id")
            return None
        except Exception as e:
            logger.error(f"Error getting drive ID: {str(e)}")
            return None

    def extractSiteFromStandardPath(self, pathQuery: str) -> Optional[Dict[str, str]]:
        """
        Extract site name from Microsoft-standard server-relative path:
        /sites/company-share/Freigegebene Dokumente/...

        Returns dict with keys: siteName, innerPath (no leading slash) on success,
        else None. A path without a site name (e.g. "/sites/") yields None.
        """
        try:
            if not pathQuery or not pathQuery.startswith('/sites/'):
                return None

            remainder = pathQuery[len('/sites/'):]
            siteName, _, inner = remainder.partition('/')
            siteName = siteName.strip()
            if not siteName:
                return None
            return {"siteName": siteName, "innerPath": inner.strip()}
        except Exception as e:
            logger.error(f"Error extracting site from standard path '{pathQuery}': {str(e)}")
            return None

    def _isGraphSiteId(self, sitePath: str) -> bool:
        """Check if sitePath is a Graph API site ID (hostname,siteId,webId format with 2 commas)."""
        if not sitePath or sitePath.count(',') != 2:
            return False
        parts = sitePath.split(',')
        return len(parts) == 3 and all(p.strip() for p in parts)

    async def getSiteByStandardPath(self, sitePath: str, allSites: Optional[List[Dict[str, Any]]] = None) -> Optional[Dict[str, Any]]:
        """Get SharePoint site directly by Microsoft-standard path (/sites/SiteName) or by site ID.

        Args:
            sitePath: Site name (the part after ``/sites/``) or a composite Graph
                site ID (``host,siteId,webId``).
            allSites: Optional already-discovered sites; the hostname is taken
                from the first entry to avoid extra Graph calls.

        Returns:
            The site-info dict, or None if the site cannot be resolved.
        """
        try:
            # When sitePath is a Graph API site ID (host,siteId,webId), use sites/{id} directly
            if self._isGraphSiteId(sitePath):
                result = await self._makeGraphApiCall(f"sites/{sitePath}")
                if result and "error" not in result:
                    return self._buildSiteInfo(result)
                return None

            # Resolve the tenant hostname: known sites, then the root site, then discovery.
            hostname = None
            if allSites and len(allSites) > 0:
                webUrl = allSites[0].get("webUrl", "")
                hostname = urlparse(webUrl).hostname if webUrl else None

            if not hostname:
                rootSite = await self._makeGraphApiCall("sites/root")
                if rootSite and "webUrl" in rootSite and "error" not in rootSite:
                    hostname = urlparse(rootSite.get("webUrl") or "").hostname

            if not hostname:
                minimalSites = await self.discoverSites()
                if not minimalSites:
                    return None
                hostname = urlparse(minimalSites[0].get("webUrl") or "").hostname

            if not hostname:
                return None

            endpoint = f"sites/{hostname}:/sites/{sitePath}"
            result = await self._makeGraphApiCall(endpoint)
            if "error" in result:
                return None
            return self._buildSiteInfo(result)
        except Exception as e:
            logger.error(f"Error getting site by standard path '{sitePath}': {str(e)}")
            return None

    def filterSitesByHint(self, sites: List[Dict[str, Any]], siteHint: str) -> List[Dict[str, Any]]:
        """Filter discovered sites by a human-entered site hint (case-insensitive substring).

        The hint is matched against displayName and webUrl. If the hint is empty
        or nothing matches, the unfiltered list is returned.
        """
        try:
            if not siteHint:
                return sites

            hint = siteHint.strip().lower()
            filtered: List[Dict[str, Any]] = [
                site
                for site in sites
                if hint in (site.get("displayName") or "").lower()
                or hint in (site.get("webUrl") or "").lower()
            ]
            return filtered if filtered else sites
        except Exception as e:
            logger.error(f"Error filtering sites by hint '{siteHint}': {str(e)}")
            return sites

    async def resolveSitesFromPathQuery(self, pathQuery: str, allSites: Optional[List[Dict[str, Any]]] = None) -> List[Dict[str, Any]]:
        """Resolve sites from pathQuery. Handles both Microsoft-standard paths and regular paths.

        For ``/sites/<name>/...`` paths the site is first fetched directly; if
        that fails, discovered sites are matched by exact ID (when the name
        contains a comma) or by hint. Any other path returns all sites.

        Returns:
            The matching sites; an empty list if none are accessible or on error.
        """
        try:
            parsedPath = None
            if pathQuery.startswith('/sites/'):
                parsedPath = self.extractSiteFromStandardPath(pathQuery)

            if parsedPath:
                siteName = parsedPath.get("siteName")
                directSite = await self.getSiteByStandardPath(siteName, allSites)
                if directSite:
                    logger.info("Got site directly by standard path - no need to discover all sites")
                    return [directSite]
                logger.warning("Could not get site directly, falling back to site discovery")

            if not allSites:
                allSites = await self.discoverSites()
                if not allSites:
                    logger.warning("No SharePoint sites found or accessible")
                    return []

            if not parsedPath:
                return allSites

            siteName = parsedPath.get("siteName")

            # When siteName is Graph API composite ID (host,siteId,webId), match by exact id
            if siteName and ',' in siteName:
                exact = [s for s in allSites if s.get("id") == siteName]
                if exact:
                    logger.info(f"Resolved site by exact ID: {siteName}")
                    return exact
                logger.warning(f"No site found with exact ID '{siteName}'")
                return []

            sites = self.filterSitesByHint(allSites, siteName)
            if not sites:
                logger.warning(f"No SharePoint site found matching '{siteName}'")
                return []
            logger.info(f"Filtered to site(s) matching '{siteName}': {[s['displayName'] for s in sites]}")
            return sites
        except Exception as e:
            logger.error(f"Error resolving sites from pathQuery '{pathQuery}': {str(e)}")
            return []

    def validatePathQuery(self, pathQuery: str) -> tuple[bool, Optional[str]]:
        """Validate pathQuery format. Returns (isValid, errorMessage).

        A valid pathQuery is non-empty, not "*", starts with "/" and begins with
        one of the known prefixes (/sites/, /Documents, /Shared Documents and
        their lower-case forms). errorMessage is None when valid.
        """
        try:
            if not pathQuery or pathQuery.strip() == "" or pathQuery.strip() == "*":
                return False, "pathQuery cannot be empty or '*'"

            if not pathQuery.startswith('/'):
                return False, "pathQuery must start with '/' and include site name with Microsoft-standard syntax /sites//... e.g. /sites/company-share/Freigegebene Dokumente/Work"

            validPathPrefixes = ('/sites/', '/Documents', '/documents', '/Shared Documents', '/shared documents')
            if not pathQuery.startswith(validPathPrefixes):
                return False, f"Invalid pathQuery '{pathQuery}'. This appears to be search terms, not a valid SharePoint path. Use findDocumentPath action first to search for folders, then use the returned folder path as pathQuery."

            return True, None
        except Exception as e:
            logger.error(f"Error validating pathQuery '{pathQuery}': {str(e)}")
            return False, f"Error validating pathQuery: {str(e)}"

    def detectFolderType(self, item: Dict[str, Any]) -> bool:
        """Detect if an item is a folder using improved detection logic.

        An item is a folder if it carries a 'folder' facet, or - as a heuristic -
        if its name has no '.' and its webUrl contains a path separator.
        Returns False if detection itself fails.
        """
        try:
            if 'folder' in item:
                return True

            webUrl = item.get('webUrl', '')
            name = item.get('name', '')
            if '.' not in name and ('/' in webUrl or '\\' in webUrl):
                return True
            return False
        except Exception as e:
            logger.error(f"Error detecting folder type: {str(e)}")
            return False