From 1ff4248346c71411fd1dd351c8ef4bb6db03eb65 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Fri, 5 Sep 2025 08:54:22 +0200
Subject: [PATCH] methodSharepoint: fix site search
---
modules/methods/methodSharepoint.py | 407 +++++++++++++++++-----------
1 file changed, 254 insertions(+), 153 deletions(-)
diff --git a/modules/methods/methodSharepoint.py b/modules/methods/methodSharepoint.py
index 19af4c1e..21decac0 100644
--- a/modules/methods/methodSharepoint.py
+++ b/modules/methods/methodSharepoint.py
@@ -503,102 +503,202 @@ class MethodSharepoint(MethodBase):
found_documents = []
all_sites_searched = []
- # Use simple approach like test file - no complex filtering
- site_scoped_sites = sites
+ # Handle different search approaches based on search type
+ if searchType == "folders" and fileQuery and fileQuery.strip() != "" and fileQuery.strip() != "*":
+ # Use unified search for folders - this is global and searches all sites
+ try:
+ import json
+
+ # Use Microsoft Graph Search API syntax (simple term search only)
+ terms = [t for t in fileQuery.split() if t.strip()]
+
+ if len(terms) > 1:
+ # Multiple terms: search for ALL terms (AND) - more specific results
+ query_string = " AND ".join(terms)
+ else:
+ # Single term: search for the term
+ query_string = terms[0] if terms else fileQuery
+ logger.info(f"Using unified search for folders: {query_string}")
- for site in site_scoped_sites:
- site_id = site["id"]
- site_name = site["displayName"]
- site_url = site["webUrl"]
-
- logger.info(f"Searching in site: {site_name} ({site_url})")
-
- # Use Microsoft Graph API for this specific site
- # Handle empty or wildcard queries
- if not fileQuery or fileQuery.strip() == "" or fileQuery.strip() == "*":
- # For wildcard/empty queries, list all items in the drive
- endpoint = f"sites/{site_id}/drive/root/children"
- else:
- # For specific queries, use different approaches based on search type
- if searchType == "folders":
- # Use Microsoft Graph unified search endpoint: POST /search/query
- # This approach works reliably for finding folders
- try:
- import json
-
- # Use Microsoft Graph Search API syntax (simple term search only)
- terms = [t for t in fileQuery.split() if t.strip()]
-
- if len(terms) > 1:
- # Multiple terms: search for ALL terms (AND) - more specific results
- query_string = " AND ".join(terms)
- else:
- # Single term: search for the term
- query_string = terms[0] if terms else fileQuery
- logger.info(f"Using search query for folders: {query_string}")
-
- payload = {
- "requests": [
- {
- "entityTypes": ["driveItem"],
- "query": {"queryString": query_string},
- "from": 0,
- "size": 50
- }
- ]
+ payload = {
+ "requests": [
+ {
+ "entityTypes": ["driveItem"],
+ "query": {"queryString": query_string},
+ "from": 0,
+ "size": 50
}
- logger.info(f"Using unified search API for folders with queryString: {query_string}")
+ ]
+ }
+ logger.info(f"Using unified search API for folders with queryString: {query_string}")
+
+ # Use global search endpoint (site-specific search not available)
+ unified_result = await self._makeGraphApiCall(
+ connection["accessToken"],
+ "search/query",
+ method="POST",
+ data=json.dumps(payload).encode("utf-8")
+ )
+
+ if "error" in unified_result:
+ logger.warning(f"Unified search failed: {unified_result['error']}")
+ items = []
+ else:
+ # Flatten hits -> driveItem resources
+ items = []
+ for container in (unified_result.get("value", []) or []):
+ for hits_container in (container.get("hitsContainers", []) or []):
+ for hit in (hits_container.get("hits", []) or []):
+ resource = hit.get("resource")
+ if resource:
+ items.append(resource)
+
+ logger.info(f"Unified search returned {len(items)} items (pre-filter)")
+
+ # Apply our improved folder detection logic
+ folder_items = []
+ for item in items:
+ resource = item
- # Use global search endpoint (site-specific search not available)
- unified_result = await self._makeGraphApiCall(
- connection["accessToken"],
- "search/query",
- method="POST",
- data=json.dumps(payload).encode("utf-8")
- )
-
- if "error" in unified_result:
- logger.warning(f"Unified search failed for site {site_name}: {unified_result['error']}")
- items = []
+ # Use the same detection logic as our test
+ is_folder = False
+ if 'folder' in resource:
+ is_folder = True
else:
- # Flatten hits -> driveItem resources
- items = []
- for container in (unified_result.get("value", []) or []):
- for hits_container in (container.get("hitsContainers", []) or []):
- for hit in (hits_container.get("hits", []) or []):
- resource = hit.get("resource")
- if resource:
- items.append(resource)
+ # Try to detect by URL pattern or other indicators
+ web_url = resource.get('webUrl', '')
+ name = resource.get('name', '')
- logger.info(f"Unified search returned {len(items)} items (pre-filter)")
-
- # Apply our improved folder detection logic
- folder_items = []
- for item in items:
- resource = item
-
- # Use the same detection logic as our test
- is_folder = False
- if 'folder' in resource:
- is_folder = True
- else:
- # Try to detect by URL pattern or other indicators
- web_url = resource.get('webUrl', '')
- name = resource.get('name', '')
-
- # Check if URL has no file extension and looks like a folder path
- if '.' not in name and ('/' in web_url or '\\' in web_url):
- is_folder = True
-
- if is_folder:
- folder_items.append(item)
-
- items = folder_items
- logger.info(f"Filtered to {len(items)} folders using improved detection logic")
+ # Check if URL has no file extension and looks like a folder path
+ if '.' not in name and ('/' in web_url or '\\' in web_url):
+ is_folder = True
- except Exception as e:
- logger.error(f"Error performing unified folder search: {str(e)}")
- items = []
+ if is_folder:
+ folder_items.append(item)
+
+ items = folder_items
+ logger.info(f"Filtered to {len(items)} folders using improved detection logic")
+
+ # Process unified search results - extract site information from webUrl
+ for item in items:
+ item_name = item.get("name", "")
+ web_url = item.get("webUrl", "")
+
+ # Extract site information from webUrl
+ site_name = "Unknown Site"
+ site_id = "unknown"
+
+ if web_url and '/sites/' in web_url:
+ try:
+ # Extract site name from URL: https://pcuster.sharepoint.com/sites/SiteName/...
+ url_parts = web_url.split('/sites/')
+ if len(url_parts) > 1:
+ site_path = url_parts[1].split('/')[0]
+ # Find matching site from discovered sites
+ # First try to match by site name (URL path)
+ for site in sites:
+ if site.get("name") == site_path:
+ site_name = site.get("displayName", site_path)
+ site_id = site.get("id", "unknown")
+ break
+ else:
+ # If no match by name, try to match by displayName
+ for site in sites:
+ if site.get("displayName") == site_path:
+ site_name = site.get("displayName", site_path)
+ site_id = site.get("id", "unknown")
+ break
+ else:
+ # If no exact match, use the site path as site name
+ site_name = site_path
+ # Try to find a site with similar name
+ for site in sites:
+ if site_path.lower() in site.get("name", "").lower() or site_path.lower() in site.get("displayName", "").lower():
+ site_name = site.get("displayName", site_path)
+ site_id = site.get("id", "unknown")
+ break
+ except Exception as e:
+ logger.warning(f"Error extracting site info from URL {web_url}: {e}")
+
+ # Use improved folder detection logic
+ is_folder = False
+ if 'folder' in item:
+ is_folder = True
+ else:
+ # Try to detect by URL pattern or other indicators
+ name = item.get('name', '')
+
+ # Check if URL has no file extension and looks like a folder path
+ if '.' not in name and ('/' in web_url or '\\' in web_url):
+ is_folder = True
+
+ item_type = "folder" if is_folder else "file"
+ item_path = item.get("parentReference", {}).get("path", "")
+ logger.debug(f"Processing {item_type}: '{item_name}' at path: '{item_path}'")
+
+ # Simple filtering like test file - just check search type
+ if searchType == "files" and is_folder:
+ continue # Skip folders when searching for files
+ elif searchType == "folders" and not is_folder:
+ continue # Skip files when searching for folders
+
+ # Simple approach like test file - no complex filtering
+ logger.debug(f"Item '{item_name}' found - adding to results")
+
+ # Create result with full path information for proper action chaining
+ parent_path = item.get("parentReference", {}).get("path", "")
+
+ # Extract the full SharePoint path from webUrl or parentReference
+ full_path = ""
+ if web_url:
+ # Extract path from webUrl: https://pcuster.sharepoint.com/sites/SSSRESYNachfolge/Freigegebene%20Dokumente/General/Eskalation%20LogObject/Druckersteuerung
+ if '/sites/' in web_url:
+ path_part = web_url.split('/sites/')[1]
+ # Decode URL encoding and convert to backslash format
+ import urllib.parse
+ decoded_path = urllib.parse.unquote(path_part)
+ full_path = "\\" + decoded_path.replace('/', '\\')
+ elif parent_path:
+ # Use parentReference path if available
+ full_path = parent_path.replace('/', '\\')
+
+ doc_info = {
+ "id": item.get("id"),
+ "name": item.get("name"),
+ "type": "folder" if is_folder else "file",
+ "siteName": site_name,
+ "siteId": site_id,
+ "webUrl": web_url,
+ "fullPath": full_path,
+ "parentPath": parent_path
+ }
+
+ found_documents.append(doc_info)
+
+ logger.info(f"Found {len(found_documents)} documents from unified search")
+
+ except Exception as e:
+ logger.error(f"Error performing unified folder search: {str(e)}")
+ # Fallback to site-by-site search
+ pass
+
+ # If no unified search was performed or it failed, fall back to site-by-site search
+ if not found_documents:
+ # Use simple approach like test file - no complex filtering
+ site_scoped_sites = sites
+
+ for site in site_scoped_sites:
+ site_id = site["id"]
+ site_name = site["displayName"]
+ site_url = site["webUrl"]
+
+ logger.info(f"Searching in site: {site_name} ({site_url})")
+
+ # Use Microsoft Graph API for this specific site
+ # Handle empty or wildcard queries
+ if not fileQuery or fileQuery.strip() == "" or fileQuery.strip() == "*":
+ # For wildcard/empty queries, list all items in the drive
+ endpoint = f"sites/{site_id}/drive/root/children"
else:
# For files, use regular search API
search_query = fileQuery.replace("'", "''") # Escape single quotes for OData
@@ -613,77 +713,78 @@ class MethodSharepoint(MethodBase):
# Process search results for this site (files)
items = search_result.get("value", [])
logger.info(f"Retrieved {len(items)} items from site {site_name}")
- site_documents = []
-
- for item in items:
- item_name = item.get("name", "")
- # Use improved folder detection logic
- is_folder = False
- if 'folder' in item:
- is_folder = True
- else:
- # Try to detect by URL pattern or other indicators
- web_url = item.get('webUrl', '')
- name = item.get('name', '')
+ site_documents = []
+
+ for item in items:
+ item_name = item.get("name", "")
- # Check if URL has no file extension and looks like a folder path
- if '.' not in name and ('/' in web_url or '\\' in web_url):
+ # Use improved folder detection logic
+ is_folder = False
+ if 'folder' in item:
is_folder = True
-
- item_type = "folder" if is_folder else "file"
- item_path = item.get("parentReference", {}).get("path", "")
- logger.debug(f"Processing {item_type}: '{item_name}' at path: '{item_path}'")
-
- # Simple filtering like test file - just check search type
- if searchType == "files" and is_folder:
- continue # Skip folders when searching for files
- elif searchType == "folders" and not is_folder:
- continue # Skip files when searching for folders
-
- # Simple approach like test file - no complex filtering
- logger.debug(f"Item '{item_name}' found - adding to results")
+ else:
+ # Try to detect by URL pattern or other indicators
+ web_url = item.get('webUrl', '')
+ name = item.get('name', '')
+
+ # Check if URL has no file extension and looks like a folder path
+ if '.' not in name and ('/' in web_url or '\\' in web_url):
+ is_folder = True
+
+ item_type = "folder" if is_folder else "file"
+ item_path = item.get("parentReference", {}).get("path", "")
+ logger.debug(f"Processing {item_type}: '{item_name}' at path: '{item_path}'")
+
+ # Simple filtering like test file - just check search type
+ if searchType == "files" and is_folder:
+ continue # Skip folders when searching for files
+ elif searchType == "folders" and not is_folder:
+ continue # Skip files when searching for folders
+
+ # Simple approach like test file - no complex filtering
+ logger.debug(f"Item '{item_name}' found - adding to results")
- # Create result with full path information for proper action chaining
- web_url = item.get("webUrl", "")
- parent_path = item.get("parentReference", {}).get("path", "")
+ # Create result with full path information for proper action chaining
+ web_url = item.get("webUrl", "")
+ parent_path = item.get("parentReference", {}).get("path", "")
+
+ # Extract the full SharePoint path from webUrl or parentReference
+ full_path = ""
+ if web_url:
+ # Extract path from webUrl: https://pcuster.sharepoint.com/sites/SSSRESYNachfolge/Freigegebene%20Dokumente/General/Eskalation%20LogObject/Druckersteuerung
+ if '/sites/' in web_url:
+ path_part = web_url.split('/sites/')[1]
+ # Decode URL encoding and convert to backslash format
+ import urllib.parse
+ decoded_path = urllib.parse.unquote(path_part)
+ full_path = "\\" + decoded_path.replace('/', '\\')
+ elif parent_path:
+ # Use parentReference path if available
+ full_path = parent_path.replace('/', '\\')
+
+ doc_info = {
+ "id": item.get("id"),
+ "name": item.get("name"),
+ "type": "folder" if is_folder else "file",
+ "siteName": site_name,
+ "siteId": site_id,
+ "webUrl": web_url,
+ "fullPath": full_path,
+ "parentPath": parent_path
+ }
+
+ site_documents.append(doc_info)
- # Extract the full SharePoint path from webUrl or parentReference
- full_path = ""
- if web_url:
- # Extract path from webUrl: https://pcuster.sharepoint.com/sites/SSSRESYNachfolge/Freigegebene%20Dokumente/General/Eskalation%20LogObject/Druckersteuerung
- if '/sites/' in web_url:
- path_part = web_url.split('/sites/')[1]
- # Decode URL encoding and convert to backslash format
- import urllib.parse
- decoded_path = urllib.parse.unquote(path_part)
- full_path = "\\" + decoded_path.replace('/', '\\')
- elif parent_path:
- # Use parentReference path if available
- full_path = parent_path.replace('/', '\\')
-
- doc_info = {
- "id": item.get("id"),
- "name": item.get("name"),
- "type": "folder" if is_folder else "file",
+ found_documents.extend(site_documents)
+ all_sites_searched.append({
"siteName": site_name,
+ "siteUrl": site_url,
"siteId": site_id,
- "webUrl": web_url,
- "fullPath": full_path,
- "parentPath": parent_path
- }
+ "documentsFound": len(site_documents)
+ })
- site_documents.append(doc_info)
-
- found_documents.extend(site_documents)
- all_sites_searched.append({
- "siteName": site_name,
- "siteUrl": site_url,
- "siteId": site_id,
- "documentsFound": len(site_documents)
- })
-
- logger.info(f"Found {len(site_documents)} documents in site {site_name}")
+ logger.info(f"Found {len(site_documents)} documents in site {site_name}")
# Limit total results to maxResults
if len(found_documents) > maxResults: