method sharepoint fix site

This commit is contained in:
ValueOn AG 2025-09-05 08:54:22 +02:00
parent a1ebcac588
commit 1ff4248346

View file

@ -503,102 +503,202 @@ class MethodSharepoint(MethodBase):
found_documents = [] found_documents = []
all_sites_searched = [] all_sites_searched = []
# Use simple approach like test file - no complex filtering # Handle different search approaches based on search type
site_scoped_sites = sites if searchType == "folders" and fileQuery and fileQuery.strip() != "" and fileQuery.strip() != "*":
# Use unified search for folders - this is global and searches all sites
try:
import json
for site in site_scoped_sites: # Use Microsoft Graph Search API syntax (simple term search only)
site_id = site["id"] terms = [t for t in fileQuery.split() if t.strip()]
site_name = site["displayName"]
site_url = site["webUrl"]
logger.info(f"Searching in site: {site_name} ({site_url})") if len(terms) > 1:
# Multiple terms: search for ALL terms (AND) - more specific results
query_string = " AND ".join(terms)
else:
# Single term: search for the term
query_string = terms[0] if terms else fileQuery
logger.info(f"Using unified search for folders: {query_string}")
# Use Microsoft Graph API for this specific site payload = {
# Handle empty or wildcard queries "requests": [
if not fileQuery or fileQuery.strip() == "" or fileQuery.strip() == "*": {
# For wildcard/empty queries, list all items in the drive "entityTypes": ["driveItem"],
endpoint = f"sites/{site_id}/drive/root/children" "query": {"queryString": query_string},
else: "from": 0,
# For specific queries, use different approaches based on search type "size": 50
if searchType == "folders":
# Use Microsoft Graph unified search endpoint: POST /search/query
# This approach works reliably for finding folders
try:
import json
# Use Microsoft Graph Search API syntax (simple term search only)
terms = [t for t in fileQuery.split() if t.strip()]
if len(terms) > 1:
# Multiple terms: search for ALL terms (AND) - more specific results
query_string = " AND ".join(terms)
else:
# Single term: search for the term
query_string = terms[0] if terms else fileQuery
logger.info(f"Using search query for folders: {query_string}")
payload = {
"requests": [
{
"entityTypes": ["driveItem"],
"query": {"queryString": query_string},
"from": 0,
"size": 50
}
]
} }
logger.info(f"Using unified search API for folders with queryString: {query_string}") ]
}
logger.info(f"Using unified search API for folders with queryString: {query_string}")
# Use global search endpoint (site-specific search not available) # Use global search endpoint (site-specific search not available)
unified_result = await self._makeGraphApiCall( unified_result = await self._makeGraphApiCall(
connection["accessToken"], connection["accessToken"],
"search/query", "search/query",
method="POST", method="POST",
data=json.dumps(payload).encode("utf-8") data=json.dumps(payload).encode("utf-8")
) )
if "error" in unified_result: if "error" in unified_result:
logger.warning(f"Unified search failed for site {site_name}: {unified_result['error']}") logger.warning(f"Unified search failed: {unified_result['error']}")
items = [] items = []
else:
# Flatten hits -> driveItem resources
items = []
for container in (unified_result.get("value", []) or []):
for hits_container in (container.get("hitsContainers", []) or []):
for hit in (hits_container.get("hits", []) or []):
resource = hit.get("resource")
if resource:
items.append(resource)
logger.info(f"Unified search returned {len(items)} items (pre-filter)")
# Apply our improved folder detection logic
folder_items = []
for item in items:
resource = item
# Use the same detection logic as our test
is_folder = False
if 'folder' in resource:
is_folder = True
else: else:
# Flatten hits -> driveItem resources # Try to detect by URL pattern or other indicators
items = [] web_url = resource.get('webUrl', '')
for container in (unified_result.get("value", []) or []): name = resource.get('name', '')
for hits_container in (container.get("hitsContainers", []) or []):
for hit in (hits_container.get("hits", []) or []):
resource = hit.get("resource")
if resource:
items.append(resource)
logger.info(f"Unified search returned {len(items)} items (pre-filter)") # Check if URL has no file extension and looks like a folder path
if '.' not in name and ('/' in web_url or '\\' in web_url):
is_folder = True
# Apply our improved folder detection logic if is_folder:
folder_items = [] folder_items.append(item)
for item in items:
resource = item
# Use the same detection logic as our test items = folder_items
is_folder = False logger.info(f"Filtered to {len(items)} folders using improved detection logic")
if 'folder' in resource:
is_folder = True
else:
# Try to detect by URL pattern or other indicators
web_url = resource.get('webUrl', '')
name = resource.get('name', '')
# Check if URL has no file extension and looks like a folder path # Process unified search results - extract site information from webUrl
if '.' not in name and ('/' in web_url or '\\' in web_url): for item in items:
is_folder = True item_name = item.get("name", "")
web_url = item.get("webUrl", "")
if is_folder: # Extract site information from webUrl
folder_items.append(item) site_name = "Unknown Site"
site_id = "unknown"
items = folder_items if web_url and '/sites/' in web_url:
logger.info(f"Filtered to {len(items)} folders using improved detection logic") try:
# Extract site name from URL: https://pcuster.sharepoint.com/sites/SiteName/...
url_parts = web_url.split('/sites/')
if len(url_parts) > 1:
site_path = url_parts[1].split('/')[0]
# Find matching site from discovered sites
# First try to match by site name (URL path)
for site in sites:
if site.get("name") == site_path:
site_name = site.get("displayName", site_path)
site_id = site.get("id", "unknown")
break
else:
# If no match by name, try to match by displayName
for site in sites:
if site.get("displayName") == site_path:
site_name = site.get("displayName", site_path)
site_id = site.get("id", "unknown")
break
else:
# If no exact match, use the site path as site name
site_name = site_path
# Try to find a site with similar name
for site in sites:
if site_path.lower() in site.get("name", "").lower() or site_path.lower() in site.get("displayName", "").lower():
site_name = site.get("displayName", site_path)
site_id = site.get("id", "unknown")
break
except Exception as e:
logger.warning(f"Error extracting site info from URL {web_url}: {e}")
except Exception as e: # Use improved folder detection logic
logger.error(f"Error performing unified folder search: {str(e)}") is_folder = False
items = [] if 'folder' in item:
is_folder = True
else:
# Try to detect by URL pattern or other indicators
name = item.get('name', '')
# Check if URL has no file extension and looks like a folder path
if '.' not in name and ('/' in web_url or '\\' in web_url):
is_folder = True
item_type = "folder" if is_folder else "file"
item_path = item.get("parentReference", {}).get("path", "")
logger.debug(f"Processing {item_type}: '{item_name}' at path: '{item_path}'")
# Simple filtering like test file - just check search type
if searchType == "files" and is_folder:
continue # Skip folders when searching for files
elif searchType == "folders" and not is_folder:
continue # Skip files when searching for folders
# Simple approach like test file - no complex filtering
logger.debug(f"Item '{item_name}' found - adding to results")
# Create result with full path information for proper action chaining
parent_path = item.get("parentReference", {}).get("path", "")
# Extract the full SharePoint path from webUrl or parentReference
full_path = ""
if web_url:
# Extract path from webUrl: https://pcuster.sharepoint.com/sites/SSSRESYNachfolge/Freigegebene%20Dokumente/General/Eskalation%20LogObject/Druckersteuerung
if '/sites/' in web_url:
path_part = web_url.split('/sites/')[1]
# Decode URL encoding and convert to backslash format
import urllib.parse
decoded_path = urllib.parse.unquote(path_part)
full_path = "\\" + decoded_path.replace('/', '\\')
elif parent_path:
# Use parentReference path if available
full_path = parent_path.replace('/', '\\')
doc_info = {
"id": item.get("id"),
"name": item.get("name"),
"type": "folder" if is_folder else "file",
"siteName": site_name,
"siteId": site_id,
"webUrl": web_url,
"fullPath": full_path,
"parentPath": parent_path
}
found_documents.append(doc_info)
logger.info(f"Found {len(found_documents)} documents from unified search")
except Exception as e:
logger.error(f"Error performing unified folder search: {str(e)}")
# Fallback to site-by-site search
pass
# If no unified search was performed or it failed, fall back to site-by-site search
if not found_documents:
# Use simple approach like test file - no complex filtering
site_scoped_sites = sites
for site in site_scoped_sites:
site_id = site["id"]
site_name = site["displayName"]
site_url = site["webUrl"]
logger.info(f"Searching in site: {site_name} ({site_url})")
# Use Microsoft Graph API for this specific site
# Handle empty or wildcard queries
if not fileQuery or fileQuery.strip() == "" or fileQuery.strip() == "*":
# For wildcard/empty queries, list all items in the drive
endpoint = f"sites/{site_id}/drive/root/children"
else: else:
# For files, use regular search API # For files, use regular search API
search_query = fileQuery.replace("'", "''") # Escape single quotes for OData search_query = fileQuery.replace("'", "''") # Escape single quotes for OData
@ -613,77 +713,78 @@ class MethodSharepoint(MethodBase):
# Process search results for this site (files) # Process search results for this site (files)
items = search_result.get("value", []) items = search_result.get("value", [])
logger.info(f"Retrieved {len(items)} items from site {site_name}") logger.info(f"Retrieved {len(items)} items from site {site_name}")
site_documents = []
for item in items: site_documents = []
item_name = item.get("name", "")
# Use improved folder detection logic for item in items:
is_folder = False item_name = item.get("name", "")
if 'folder' in item:
is_folder = True
else:
# Try to detect by URL pattern or other indicators
web_url = item.get('webUrl', '')
name = item.get('name', '')
# Check if URL has no file extension and looks like a folder path # Use improved folder detection logic
if '.' not in name and ('/' in web_url or '\\' in web_url): is_folder = False
if 'folder' in item:
is_folder = True is_folder = True
else:
# Try to detect by URL pattern or other indicators
web_url = item.get('webUrl', '')
name = item.get('name', '')
item_type = "folder" if is_folder else "file" # Check if URL has no file extension and looks like a folder path
item_path = item.get("parentReference", {}).get("path", "") if '.' not in name and ('/' in web_url or '\\' in web_url):
logger.debug(f"Processing {item_type}: '{item_name}' at path: '{item_path}'") is_folder = True
# Simple filtering like test file - just check search type item_type = "folder" if is_folder else "file"
if searchType == "files" and is_folder: item_path = item.get("parentReference", {}).get("path", "")
continue # Skip folders when searching for files logger.debug(f"Processing {item_type}: '{item_name}' at path: '{item_path}'")
elif searchType == "folders" and not is_folder:
continue # Skip files when searching for folders
# Simple approach like test file - no complex filtering # Simple filtering like test file - just check search type
logger.debug(f"Item '{item_name}' found - adding to results") if searchType == "files" and is_folder:
continue # Skip folders when searching for files
elif searchType == "folders" and not is_folder:
continue # Skip files when searching for folders
# Create result with full path information for proper action chaining # Simple approach like test file - no complex filtering
web_url = item.get("webUrl", "") logger.debug(f"Item '{item_name}' found - adding to results")
parent_path = item.get("parentReference", {}).get("path", "")
# Extract the full SharePoint path from webUrl or parentReference # Create result with full path information for proper action chaining
full_path = "" web_url = item.get("webUrl", "")
if web_url: parent_path = item.get("parentReference", {}).get("path", "")
# Extract path from webUrl: https://pcuster.sharepoint.com/sites/SSSRESYNachfolge/Freigegebene%20Dokumente/General/Eskalation%20LogObject/Druckersteuerung
if '/sites/' in web_url:
path_part = web_url.split('/sites/')[1]
# Decode URL encoding and convert to backslash format
import urllib.parse
decoded_path = urllib.parse.unquote(path_part)
full_path = "\\" + decoded_path.replace('/', '\\')
elif parent_path:
# Use parentReference path if available
full_path = parent_path.replace('/', '\\')
doc_info = { # Extract the full SharePoint path from webUrl or parentReference
"id": item.get("id"), full_path = ""
"name": item.get("name"), if web_url:
"type": "folder" if is_folder else "file", # Extract path from webUrl: https://pcuster.sharepoint.com/sites/SSSRESYNachfolge/Freigegebene%20Dokumente/General/Eskalation%20LogObject/Druckersteuerung
if '/sites/' in web_url:
path_part = web_url.split('/sites/')[1]
# Decode URL encoding and convert to backslash format
import urllib.parse
decoded_path = urllib.parse.unquote(path_part)
full_path = "\\" + decoded_path.replace('/', '\\')
elif parent_path:
# Use parentReference path if available
full_path = parent_path.replace('/', '\\')
doc_info = {
"id": item.get("id"),
"name": item.get("name"),
"type": "folder" if is_folder else "file",
"siteName": site_name,
"siteId": site_id,
"webUrl": web_url,
"fullPath": full_path,
"parentPath": parent_path
}
site_documents.append(doc_info)
found_documents.extend(site_documents)
all_sites_searched.append({
"siteName": site_name, "siteName": site_name,
"siteUrl": site_url,
"siteId": site_id, "siteId": site_id,
"webUrl": web_url, "documentsFound": len(site_documents)
"fullPath": full_path, })
"parentPath": parent_path
}
site_documents.append(doc_info) logger.info(f"Found {len(site_documents)} documents in site {site_name}")
found_documents.extend(site_documents)
all_sites_searched.append({
"siteName": site_name,
"siteUrl": site_url,
"siteId": site_id,
"documentsFound": len(site_documents)
})
logger.info(f"Found {len(site_documents)} documents in site {site_name}")
# Limit total results to maxResults # Limit total results to maxResults
if len(found_documents) > maxResults: if len(found_documents) > maxResults: