From 1ff4248346c71411fd1dd351c8ef4bb6db03eb65 Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Fri, 5 Sep 2025 08:54:22 +0200 Subject: [PATCH] method sharepoinjt fix site --- modules/methods/methodSharepoint.py | 407 +++++++++++++++++----------- 1 file changed, 254 insertions(+), 153 deletions(-) diff --git a/modules/methods/methodSharepoint.py b/modules/methods/methodSharepoint.py index 19af4c1e..21decac0 100644 --- a/modules/methods/methodSharepoint.py +++ b/modules/methods/methodSharepoint.py @@ -503,102 +503,202 @@ class MethodSharepoint(MethodBase): found_documents = [] all_sites_searched = [] - # Use simple approach like test file - no complex filtering - site_scoped_sites = sites + # Handle different search approaches based on search type + if searchType == "folders" and fileQuery and fileQuery.strip() != "" and fileQuery.strip() != "*": + # Use unified search for folders - this is global and searches all sites + try: + import json + + # Use Microsoft Graph Search API syntax (simple term search only) + terms = [t for t in fileQuery.split() if t.strip()] + + if len(terms) > 1: + # Multiple terms: search for ALL terms (AND) - more specific results + query_string = " AND ".join(terms) + else: + # Single term: search for the term + query_string = terms[0] if terms else fileQuery + logger.info(f"Using unified search for folders: {query_string}") - for site in site_scoped_sites: - site_id = site["id"] - site_name = site["displayName"] - site_url = site["webUrl"] - - logger.info(f"Searching in site: {site_name} ({site_url})") - - # Use Microsoft Graph API for this specific site - # Handle empty or wildcard queries - if not fileQuery or fileQuery.strip() == "" or fileQuery.strip() == "*": - # For wildcard/empty queries, list all items in the drive - endpoint = f"sites/{site_id}/drive/root/children" - else: - # For specific queries, use different approaches based on search type - if searchType == "folders": - # Use Microsoft Graph unified search endpoint: POST /search/query - # This approach works reliably for finding folders - try: - import json - - # Use Microsoft Graph Search API syntax (simple term search only) - terms = [t for t in fileQuery.split() if t.strip()] - - if len(terms) > 1: - # Multiple terms: search for ALL terms (AND) - more specific results - query_string = " AND ".join(terms) - else: - # Single term: search for the term - query_string = terms[0] if terms else fileQuery - logger.info(f"Using search query for folders: {query_string}") - - payload = { - "requests": [ - { - "entityTypes": ["driveItem"], - "query": {"queryString": query_string}, - "from": 0, - "size": 50 - } - ] + payload = { + "requests": [ + { + "entityTypes": ["driveItem"], + "query": {"queryString": query_string}, + "from": 0, + "size": 50 } - logger.info(f"Using unified search API for folders with queryString: {query_string}") + ] + } + logger.info(f"Using unified search API for folders with queryString: {query_string}") + + # Use global search endpoint (site-specific search not available) + unified_result = await self._makeGraphApiCall( + connection["accessToken"], + "search/query", + method="POST", + data=json.dumps(payload).encode("utf-8") + ) + + if "error" in unified_result: + logger.warning(f"Unified search failed: {unified_result['error']}") + items = [] + else: + # Flatten hits -> driveItem resources + items = [] + for container in (unified_result.get("value", []) or []): + for hits_container in (container.get("hitsContainers", []) or []): + for hit in (hits_container.get("hits", []) or []): + resource = hit.get("resource") + if resource: + items.append(resource) + + logger.info(f"Unified search returned {len(items)} items (pre-filter)") + + # Apply our improved folder detection logic + folder_items = [] + for item in items: + resource = item - # Use global search endpoint (site-specific search not available) - unified_result = await self._makeGraphApiCall( - connection["accessToken"], - "search/query", - method="POST", - data=json.dumps(payload).encode("utf-8") - ) - - if "error" in unified_result: - logger.warning(f"Unified search failed for site {site_name}: {unified_result['error']}") - items = [] + # Use the same detection logic as our test + is_folder = False + if 'folder' in resource: + is_folder = True else: - # Flatten hits -> driveItem resources - items = [] - for container in (unified_result.get("value", []) or []): - for hits_container in (container.get("hitsContainers", []) or []): - for hit in (hits_container.get("hits", []) or []): - resource = hit.get("resource") - if resource: - items.append(resource) + # Try to detect by URL pattern or other indicators + web_url = resource.get('webUrl', '') + name = resource.get('name', '') - logger.info(f"Unified search returned {len(items)} items (pre-filter)") - - # Apply our improved folder detection logic - folder_items = [] - for item in items: - resource = item - - # Use the same detection logic as our test - is_folder = False - if 'folder' in resource: - is_folder = True - else: - # Try to detect by URL pattern or other indicators - web_url = resource.get('webUrl', '') - name = resource.get('name', '') - - # Check if URL has no file extension and looks like a folder path - if '.' not in name and ('/' in web_url or '\\' in web_url): - is_folder = True - - if is_folder: - folder_items.append(item) - - items = folder_items - logger.info(f"Filtered to {len(items)} folders using improved detection logic") + # Check if URL has no file extension and looks like a folder path + if '.' not in name and ('/' in web_url or '\\' in web_url): + is_folder = True - except Exception as e: - logger.error(f"Error performing unified folder search: {str(e)}") - items = [] + if is_folder: + folder_items.append(item) + + items = folder_items + logger.info(f"Filtered to {len(items)} folders using improved detection logic") + + # Process unified search results - extract site information from webUrl + for item in items: + item_name = item.get("name", "") + web_url = item.get("webUrl", "") + + # Extract site information from webUrl + site_name = "Unknown Site" + site_id = "unknown" + + if web_url and '/sites/' in web_url: + try: + # Extract site name from URL: https://pcuster.sharepoint.com/sites/SiteName/... + url_parts = web_url.split('/sites/') + if len(url_parts) > 1: + site_path = url_parts[1].split('/')[0] + # Find matching site from discovered sites + # First try to match by site name (URL path) + for site in sites: + if site.get("name") == site_path: + site_name = site.get("displayName", site_path) + site_id = site.get("id", "unknown") + break + else: + # If no match by name, try to match by displayName + for site in sites: + if site.get("displayName") == site_path: + site_name = site.get("displayName", site_path) + site_id = site.get("id", "unknown") + break + else: + # If no exact match, use the site path as site name + site_name = site_path + # Try to find a site with similar name + for site in sites: + if site_path.lower() in site.get("name", "").lower() or site_path.lower() in site.get("displayName", "").lower(): + site_name = site.get("displayName", site_path) + site_id = site.get("id", "unknown") + break + except Exception as e: + logger.warning(f"Error extracting site info from URL {web_url}: {e}") + + # Use improved folder detection logic + is_folder = False + if 'folder' in item: + is_folder = True + else: + # Try to detect by URL pattern or other indicators + name = item.get('name', '') + + # Check if URL has no file extension and looks like a folder path + if '.' not in name and ('/' in web_url or '\\' in web_url): + is_folder = True + + item_type = "folder" if is_folder else "file" + item_path = item.get("parentReference", {}).get("path", "") + logger.debug(f"Processing {item_type}: '{item_name}' at path: '{item_path}'") + + # Simple filtering like test file - just check search type + if searchType == "files" and is_folder: + continue # Skip folders when searching for files + elif searchType == "folders" and not is_folder: + continue # Skip files when searching for folders + + # Simple approach like test file - no complex filtering + logger.debug(f"Item '{item_name}' found - adding to results") + + # Create result with full path information for proper action chaining + parent_path = item.get("parentReference", {}).get("path", "") + + # Extract the full SharePoint path from webUrl or parentReference + full_path = "" + if web_url: + # Extract path from webUrl: https://pcuster.sharepoint.com/sites/SSSRESYNachfolge/Freigegebene%20Dokumente/General/Eskalation%20LogObject/Druckersteuerung + if '/sites/' in web_url: + path_part = web_url.split('/sites/')[1] + # Decode URL encoding and convert to backslash format + import urllib.parse + decoded_path = urllib.parse.unquote(path_part) + full_path = "\\" + decoded_path.replace('/', '\\') + elif parent_path: + # Use parentReference path if available + full_path = parent_path.replace('/', '\\') + + doc_info = { + "id": item.get("id"), + "name": item.get("name"), + "type": "folder" if is_folder else "file", + "siteName": site_name, + "siteId": site_id, + "webUrl": web_url, + "fullPath": full_path, + "parentPath": parent_path + } + + found_documents.append(doc_info) + + logger.info(f"Found {len(found_documents)} documents from unified search") + + except Exception as e: + logger.error(f"Error performing unified folder search: {str(e)}") + # Fallback to site-by-site search + pass + + # If no unified search was performed or it failed, fall back to site-by-site search + if not found_documents: + # Use simple approach like test file - no complex filtering + site_scoped_sites = sites + + for site in site_scoped_sites: + site_id = site["id"] + site_name = site["displayName"] + site_url = site["webUrl"] + + logger.info(f"Searching in site: {site_name} ({site_url})") + + # Use Microsoft Graph API for this specific site + # Handle empty or wildcard queries + if not fileQuery or fileQuery.strip() == "" or fileQuery.strip() == "*": + # For wildcard/empty queries, list all items in the drive + endpoint = f"sites/{site_id}/drive/root/children" else: # For files, use regular search API search_query = fileQuery.replace("'", "''") # Escape single quotes for OData @@ -613,77 +713,78 @@ class MethodSharepoint(MethodBase): # Process search results for this site (files) items = search_result.get("value", []) logger.info(f"Retrieved {len(items)} items from site {site_name}") - site_documents = [] - - for item in items: - item_name = item.get("name", "") - # Use improved folder detection logic - is_folder = False - if 'folder' in item: - is_folder = True - else: - # Try to detect by URL pattern or other indicators - web_url = item.get('webUrl', '') - name = item.get('name', '') + site_documents = [] + + for item in items: + item_name = item.get("name", "") - # Check if URL has no file extension and looks like a folder path - if '.' not in name and ('/' in web_url or '\\' in web_url): + # Use improved folder detection logic + is_folder = False + if 'folder' in item: is_folder = True - - item_type = "folder" if is_folder else "file" - item_path = item.get("parentReference", {}).get("path", "") - logger.debug(f"Processing {item_type}: '{item_name}' at path: '{item_path}'") - - # Simple filtering like test file - just check search type - if searchType == "files" and is_folder: - continue # Skip folders when searching for files - elif searchType == "folders" and not is_folder: - continue # Skip files when searching for folders - - # Simple approach like test file - no complex filtering - logger.debug(f"Item '{item_name}' found - adding to results") + else: + # Try to detect by URL pattern or other indicators + web_url = item.get('webUrl', '') + name = item.get('name', '') + + # Check if URL has no file extension and looks like a folder path + if '.' not in name and ('/' in web_url or '\\' in web_url): + is_folder = True + + item_type = "folder" if is_folder else "file" + item_path = item.get("parentReference", {}).get("path", "") + logger.debug(f"Processing {item_type}: '{item_name}' at path: '{item_path}'") + + # Simple filtering like test file - just check search type + if searchType == "files" and is_folder: + continue # Skip folders when searching for files + elif searchType == "folders" and not is_folder: + continue # Skip files when searching for folders + + # Simple approach like test file - no complex filtering + logger.debug(f"Item '{item_name}' found - adding to results") - # Create result with full path information for proper action chaining - web_url = item.get("webUrl", "") - parent_path = item.get("parentReference", {}).get("path", "") + # Create result with full path information for proper action chaining + web_url = item.get("webUrl", "") + parent_path = item.get("parentReference", {}).get("path", "") + + # Extract the full SharePoint path from webUrl or parentReference + full_path = "" + if web_url: + # Extract path from webUrl: https://pcuster.sharepoint.com/sites/SSSRESYNachfolge/Freigegebene%20Dokumente/General/Eskalation%20LogObject/Druckersteuerung + if '/sites/' in web_url: + path_part = web_url.split('/sites/')[1] + # Decode URL encoding and convert to backslash format + import urllib.parse + decoded_path = urllib.parse.unquote(path_part) + full_path = "\\" + decoded_path.replace('/', '\\') + elif parent_path: + # Use parentReference path if available + full_path = parent_path.replace('/', '\\') + + doc_info = { + "id": item.get("id"), + "name": item.get("name"), + "type": "folder" if is_folder else "file", + "siteName": site_name, + "siteId": site_id, + "webUrl": web_url, + "fullPath": full_path, + "parentPath": parent_path + } + + site_documents.append(doc_info) - # Extract the full SharePoint path from webUrl or parentReference - full_path = "" - if web_url: - # Extract path from webUrl: https://pcuster.sharepoint.com/sites/SSSRESYNachfolge/Freigegebene%20Dokumente/General/Eskalation%20LogObject/Druckersteuerung - if '/sites/' in web_url: - path_part = web_url.split('/sites/')[1] - # Decode URL encoding and convert to backslash format - import urllib.parse - decoded_path = urllib.parse.unquote(path_part) - full_path = "\\" + decoded_path.replace('/', '\\') - elif parent_path: - # Use parentReference path if available - full_path = parent_path.replace('/', '\\') - - doc_info = { - "id": item.get("id"), - "name": item.get("name"), - "type": "folder" if is_folder else "file", + found_documents.extend(site_documents) + all_sites_searched.append({ "siteName": site_name, + "siteUrl": site_url, "siteId": site_id, - "webUrl": web_url, - "fullPath": full_path, - "parentPath": parent_path - } + "documentsFound": len(site_documents) + }) - site_documents.append(doc_info) - - found_documents.extend(site_documents) - all_sites_searched.append({ - "siteName": site_name, - "siteUrl": site_url, - "siteId": site_id, - "documentsFound": len(site_documents) - }) - - logger.info(f"Found {len(site_documents)} documents in site {site_name}") + logger.info(f"Found {len(site_documents)} documents in site {site_name}") # Limit total results to maxResults if len(found_documents) > maxResults: