gateway/modules/methods/methodSharepoint.py
2025-09-04 16:46:56 +02:00

1519 lines
No EOL
83 KiB
Python

"""
SharePoint operations method module.
Handles SharePoint document operations using the SharePoint service.
"""
import logging
import json
import re
from typing import Dict, Any, List, Optional
from datetime import datetime, UTC
import base64
from urllib.parse import urlparse
import aiohttp
import asyncio
from modules.chat.methodBase import MethodBase, action
from modules.interfaces.interfaceChatModel import ActionResult
from modules.shared.timezoneUtils import get_utc_timestamp
logger = logging.getLogger(__name__)
class MethodSharepoint(MethodBase):
"""SharePoint operations methods."""
def __init__(self, service):
    """
    Set up the SharePoint method module.

    Parameters:
        service: Service object forwarded unchanged to MethodBase.__init__.
    """
    super().__init__(service)
    # Identification of this method module (assigned left to right)
    self.name, self.description = "sharepoint", "SharePoint operations methods"
def _format_timestamp_for_filename(self) -> str:
"""Format current timestamp as YYYYMMDD-hhmmss for filenames."""
return datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
def _getMicrosoftConnection(self, connectionReference: str) -> Optional[Dict[str, Any]]:
"""Get Microsoft connection from connection reference"""
try:
userConnection = self.service.getUserConnectionFromConnectionReference(connectionReference)
if not userConnection:
logger.warning(f"No user connection found for reference: {connectionReference}")
return None
if userConnection.authority.value != "msft":
logger.warning(f"Connection {userConnection.id} is not Microsoft (authority: {userConnection.authority.value})")
return None
# Check if connection is active or pending (pending means OAuth in progress)
if userConnection.status.value not in ["active", "pending"]:
logger.warning(f"Connection {userConnection.id} status is not active/pending: {userConnection.status.value}")
return None
# Get the token for this specific connection
token = self.service.interfaceApp.getConnectionToken(userConnection.id)
if not token:
logger.warning(f"No token found for connection {userConnection.id}")
return None
# Check if token is expired
if hasattr(token, 'expiresAt') and token.expiresAt:
current_time = get_utc_timestamp()
if current_time > token.expiresAt:
logger.warning(f"Token for connection {userConnection.id} is expired (expiresAt: {token.expiresAt}, current: {current_time})")
return None
logger.info(f"Successfully retrieved Microsoft connection: {userConnection.id}, status: {userConnection.status.value}, externalId: {userConnection.externalId}")
return {
"id": userConnection.id,
"userConnection": userConnection,
"accessToken": token.tokenAccess,
"refreshToken": token.tokenRefresh,
"scopes": ["Sites.ReadWrite.All", "Files.ReadWrite.All", "User.Read"] # SharePoint scopes
}
except Exception as e:
logger.error(f"Error getting Microsoft connection: {str(e)}")
return None
async def _discoverSharePointSites(self, access_token: str) -> List[Dict[str, Any]]:
"""
Discover all SharePoint sites accessible to the user via Microsoft Graph API
Parameters:
access_token (str): Microsoft Graph access token
Returns:
List[Dict[str, Any]]: List of SharePoint site information
"""
try:
# Query Microsoft Graph to get all sites the user has access to
endpoint = "sites?search=*"
result = await self._makeGraphApiCall(access_token, endpoint)
if "error" in result:
logger.error(f"Error discovering SharePoint sites: {result['error']}")
return []
sites = result.get("value", [])
logger.info(f"Discovered {len(sites)} SharePoint sites")
# Process and return site information
processed_sites = []
for site in sites:
site_info = {
"id": site.get("id"),
"displayName": site.get("displayName"),
"name": site.get("name"),
"webUrl": site.get("webUrl"),
"description": site.get("description"),
"createdDateTime": site.get("createdDateTime"),
"lastModifiedDateTime": site.get("lastModifiedDateTime")
}
processed_sites.append(site_info)
logger.debug(f"Site: {site_info['displayName']} - {site_info['webUrl']}")
return processed_sites
except Exception as e:
logger.error(f"Error discovering SharePoint sites: {str(e)}")
return []
def _filter_sites_by_hint(self, sites: List[Dict[str, Any]], site_hint: str) -> List[Dict[str, Any]]:
"""Filter discovered sites by a human-entered site hint (case-insensitive substring)."""
try:
if not site_hint:
return sites
hint = site_hint.strip().lower()
filtered: List[Dict[str, Any]] = []
for site in sites:
name = (site.get("displayName") or "").lower()
web_url = (site.get("webUrl") or "").lower()
if hint in name or hint in web_url:
filtered.append(site)
return filtered if filtered else sites
except Exception as e:
logger.error(f"Error filtering sites by hint '{site_hint}': {str(e)}")
return sites
def _parse_site_scoped_path(self, path_query: str) -> Optional[Dict[str, str]]:
"""
Parse a site-scoped path of the form:
/site:KM LayerFinance/Documents/Work or /site:KM LayerFinance/Shared Documents/Work
Returns dict with keys: siteName, innerPath (no leading slash) on success, else None.
"""
try:
if not path_query or not path_query.startswith('/'):
return None
# expected syntax prefix
prefix = '/site:'
if not path_query.startswith(prefix):
return None
remainder = path_query[len(prefix):]
# split once on the next '/'
if '/' not in remainder:
return None
site_name, inner = remainder.split('/', 1)
site_name = site_name.strip()
inner_path = inner.strip()
if not site_name or not inner_path:
return None
return {"siteName": site_name, "innerPath": inner_path}
except Exception as e:
logger.error(f"Error parsing site-scoped path '{path_query}': {str(e)}")
return None
def _parseSearchQuery(self, searchQuery: str) -> tuple[str, str, str, dict]:
"""
Parse searchQuery to extract path, search terms, search type, and search options.
Parameters:
searchQuery (str): Enhanced search query with options:
- "budget" -> pathQuery="*", fileQuery="budget", searchType="all", options={}
- "/Documents:budget" -> pathQuery="/Documents", fileQuery="budget", searchType="all", options={}
- "files:budget" -> pathQuery="*", fileQuery="budget", searchType="files", options={}
- "folders:DELTA" -> pathQuery="*", fileQuery="DELTA", searchType="folders", options={}
- "exact:\"Operations 2025\"" -> exact phrase matching
- "regex:^Operations.*2025$" -> regex pattern matching
- "case:DELTA" -> case-sensitive search
- "and:DELTA AND 2025 Mars AND Group" -> all AND terms must be present
Returns:
tuple[str, str, str, dict]: (pathQuery, fileQuery, searchType, searchOptions)
"""
try:
if not searchQuery or not searchQuery.strip() or searchQuery.strip() == "*":
return "*", "*", "all", {}
searchQuery = searchQuery.strip()
searchOptions = {}
# Check for search type specification (files:, folders:, all:) FIRST
searchType = "all" # Default
if searchQuery.startswith(("files:", "folders:", "all:")):
type_parts = searchQuery.split(':', 1)
searchType = type_parts[0].strip()
searchQuery = type_parts[1].strip()
# Extract optional site hint tokens: support "site=Name" or leading "site:Name"
def _extract_site_hint(q: str) -> tuple[str, Optional[str]]:
try:
q_strip = q.strip()
# Leading form: site:KM LayerFinance ...
if q_strip.lower().startswith("site:"):
after = q_strip[5:].lstrip()
# site name until next space or end
if ' ' in after:
site_name, rest = after.split(' ', 1)
else:
site_name, rest = after, ''
return rest.strip(), site_name.strip()
# Inline key=value form anywhere
m = re.search(r"\bsite=([^;\s]+)", q_strip, flags=re.IGNORECASE)
if m:
site_name = m.group(1).strip()
# remove the token from query
q_new = re.sub(r"\bsite=[^;\s]+;?", "", q_strip, flags=re.IGNORECASE).strip()
return q_new, site_name
except Exception:
pass
return q, None
searchQuery, extracted_site = _extract_site_hint(searchQuery)
if extracted_site:
searchOptions["site_hint"] = extracted_site
logger.info(f"Extracted site hint: '{extracted_site}'")
# Extract name="..." if present (for quoted multi-word names)
name_match = re.search(r"name=\"([^\"]+)\"", searchQuery)
if name_match:
searchQuery = name_match.group(1)
logger.info(f"Extracted name from quotes: '{searchQuery}'")
# Check for search mode specification (exact:, regex:, case:, and:)
if searchQuery.startswith(("exact:", "regex:", "case:", "and:")):
mode_parts = searchQuery.split(':', 1)
mode = mode_parts[0].strip()
searchQuery = mode_parts[1].strip()
if mode == "exact":
searchOptions["exact_match"] = True
# Remove quotes if present
if searchQuery.startswith('"') and searchQuery.endswith('"'):
searchQuery = searchQuery[1:-1]
elif mode == "regex":
searchOptions["regex_match"] = True
elif mode == "case":
searchOptions["case_sensitive"] = True
elif mode == "and":
searchOptions["and_terms"] = True
# Check if it contains path:search format
if ':' in searchQuery:
parts = searchQuery.split(':', 1) # Split only on first colon
path_part = parts[0].strip()
search_part = parts[1].strip()
# Handle path part
if not path_part or path_part == "*":
pathQuery = "*"
elif path_part.startswith('/'):
pathQuery = path_part
else:
pathQuery = f"/Documents/{path_part}"
# Handle search part
if not search_part or search_part == "*":
fileQuery = "*"
else:
fileQuery = search_part
# Use search_part as fileQuery (name extraction already handled above)
return pathQuery, fileQuery, searchType, searchOptions
# No colon - check if it looks like a path
elif searchQuery.startswith('/'):
# It's a path only
return searchQuery, "*", searchType, searchOptions
else:
# It's a search term only
return "*", searchQuery, searchType, searchOptions
except Exception as e:
logger.error(f"Error parsing searchQuery '{searchQuery}': {str(e)}")
return "*", "*", "all", {}
def _resolvePathQuery(self, pathQuery: str) -> List[str]:
"""
Resolve pathQuery into a list of search paths for SharePoint operations.
Parameters:
pathQuery (str): Query string that can contain:
- Direct paths (e.g., "/Documents/Project1")
- Wildcards (e.g., "/Documents/*")
- Multiple paths separated by semicolons (e.g., "/Docs; /Files")
- Relative paths (e.g., "Project1" -> resolved to default folder)
- Empty string or "*" for global search
Returns:
List[str]: List of resolved paths
"""
try:
if not pathQuery or not pathQuery.strip() or pathQuery.strip() == "*":
return ["*"] # Global search across all sites
# Split by semicolon to handle multiple paths
raw_paths = [path.strip() for path in pathQuery.split(';') if path.strip()]
resolved_paths = []
for raw_path in raw_paths:
# Handle wildcards - return as-is
if '*' in raw_path:
resolved_paths.append(raw_path)
# Handle absolute paths
elif raw_path.startswith('/'):
resolved_paths.append(raw_path)
# Handle relative paths - prepend default folder
else:
resolved_paths.append(f"/Documents/{raw_path}")
# Remove duplicates while preserving order
seen = set()
unique_paths = []
for path in resolved_paths:
if path not in seen:
seen.add(path)
unique_paths.append(path)
logger.info(f"Resolved pathQuery '{pathQuery}' to {len(unique_paths)} paths: {unique_paths}")
return unique_paths
except Exception as e:
logger.error(f"Error resolving pathQuery '{pathQuery}': {str(e)}")
return ["*"] # Fallback to global search
def _parseSiteUrl(self, siteUrl: str) -> Dict[str, str]:
"""Parse SharePoint site URL to extract hostname and site path"""
try:
parsed = urlparse(siteUrl)
hostname = parsed.hostname
path = parsed.path.strip('/')
return {
"hostname": hostname,
"sitePath": path
}
except Exception as e:
logger.error(f"Error parsing site URL {siteUrl}: {str(e)}")
return {"hostname": "", "sitePath": ""}
async def _makeGraphApiCall(self, access_token: str, endpoint: str, method: str = "GET", data: bytes = None) -> Dict[str, Any]:
"""Make a Microsoft Graph API call with timeout and detailed logging"""
try:
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json" if data and method != "PUT" else "application/octet-stream" if data else "application/json"
}
url = f"https://graph.microsoft.com/v1.0/{endpoint}"
logger.info(f"Making Graph API call: {method} {url}")
# Set timeout to 30 seconds
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
if method == "GET":
logger.debug(f"Starting GET request to {url}")
async with session.get(url, headers=headers) as response:
logger.info(f"Graph API response: {response.status}")
if response.status == 200:
result = await response.json()
logger.debug(f"Graph API success: {len(str(result))} characters response")
return result
else:
error_text = await response.text()
logger.error(f"Graph API call failed: {response.status} - {error_text}")
return {"error": f"API call failed: {response.status} - {error_text}"}
elif method == "PUT":
logger.debug(f"Starting PUT request to {url}")
async with session.put(url, headers=headers, data=data) as response:
logger.info(f"Graph API response: {response.status}")
if response.status in [200, 201]:
result = await response.json()
logger.debug(f"Graph API success: {len(str(result))} characters response")
return result
else:
error_text = await response.text()
logger.error(f"Graph API call failed: {response.status} - {error_text}")
return {"error": f"API call failed: {response.status} - {error_text}"}
elif method == "POST":
logger.debug(f"Starting POST request to {url}")
async with session.post(url, headers=headers, data=data) as response:
logger.info(f"Graph API response: {response.status}")
if response.status in [200, 201]:
result = await response.json()
logger.debug(f"Graph API success: {len(str(result))} characters response")
return result
else:
error_text = await response.text()
logger.error(f"Graph API call failed: {response.status} - {error_text}")
return {"error": f"API call failed: {response.status} - {error_text}"}
except asyncio.TimeoutError:
logger.error(f"Graph API call timed out after 30 seconds: {endpoint}")
return {"error": f"API call timed out after 30 seconds: {endpoint}"}
except Exception as e:
logger.error(f"Error making Graph API call: {str(e)}")
return {"error": f"Error making Graph API call: {str(e)}"}
async def _getSiteId(self, access_token: str, hostname: str, site_path: str) -> str:
"""Get SharePoint site ID from hostname and site path"""
try:
endpoint = f"sites/{hostname}:/{site_path}"
result = await self._makeGraphApiCall(access_token, endpoint)
if "error" in result:
logger.error(f"Error getting site ID: {result['error']}")
return ""
return result.get("id", "")
except Exception as e:
logger.error(f"Error getting site ID: {str(e)}")
return ""
@action
async def findDocumentPath(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    Find documents by searching their content, names, or metadata across all accessible SharePoint sites
    Parameters:
    connectionReference (str): Reference to the Microsoft connection
    site (str, optional): SharePoint site name or hint to search within (e.g., "SSS", "KM XYZ"). If not provided, searches all accessible sites
    searchQuery (str): [path:][type:][mode:]query - Enhanced search syntax:
    - "budget", "/Documents:budget", "files:budget", "folders:DELTA", "*"
    - "exact:\"Operations 2025\"" - exact phrase matching
    - "regex:^Operations.*2025$" - regex pattern matching
    - "case:DELTA" - case-sensitive search
    - "and:DELTA AND 2025 Mars AND Group" - all terms must be present
    - "folders:and:DELTA AND 2025 Mars AND Group" - combined options
    - Site hint support: "folders:site=KM LayerFinance;name=Work" or "folders:site:KM LayerFinance Work"
    - For quoted names: "folders:site=KM;name=\"page staten\""
    - For folder search: words like "part1 part2" will search for folders containing BOTH terms
    Note: For storage locations, use "folders:" prefix. When searching for folders, multiple words are treated as search terms that must all appear in the folder name or path.
    Site hints help narrow search to specific SharePoint sites for better accuracy.
    resultDocument (str, optional): JSON result document from previous findDocumentPath action to refine search
    searchScope (str, optional): Search scope - options: "all" (default), "documents" (files only), "pages" (SharePoint pages only)
    maxResults (int, optional): Maximum number of results to return (default: 100)
    expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description
    """
    try:
        connectionReference = parameters.get("connectionReference")
        site = parameters.get("site")
        searchQuery = parameters.get("searchQuery", "*")
        resultDocument = parameters.get("resultDocument")
        searchScope = parameters.get("searchScope", "all")
        expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])

        # Tolerate a non-integer or negative maxResults instead of failing
        # on the result-limit comparison further down
        try:
            maxResults = int(parameters.get("maxResults", 100))
        except (TypeError, ValueError):
            maxResults = 100
        if maxResults < 0:
            maxResults = 100

        if not connectionReference:
            return ActionResult.isFailure(error="Connection reference is required")

        # A previous result document is validated (and logged) but does not
        # alter the search itself
        if resultDocument:
            previous_error = self._findPathCheckPreviousResult(resultDocument)
            if previous_error:
                return ActionResult.isFailure(error=previous_error)

        # Parse searchQuery to extract path, search terms, search type, and options
        pathQuery, fileQuery, searchType, searchOptions = self._parseSearchQuery(searchQuery)
        # NOTE(review): the path part of searchQuery is parsed but is not
        # used to scope any of the Graph requests below.
        if pathQuery != "*":
            logger.warning(f"Path scope '{pathQuery}' is not applied to the search; whole sites are searched")

        connection = self._getMicrosoftConnection(connectionReference)
        if not connection:
            return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference")
        access_token = connection["accessToken"]

        # Discover all SharePoint sites accessible to the user
        sites = await self._discoverSharePointSites(access_token)
        if not sites:
            return ActionResult.isFailure(error="No SharePoint sites found or accessible")

        # Filter sites by site parameter if provided
        if site:
            sites = self._filter_sites_by_hint(sites, site)
            logger.info(f"Filtered sites by site parameter: '{site}' -> {len(sites)} sites")
            if not sites:
                return ActionResult.isFailure(error=f"No SharePoint sites found matching '{site}'")

        try:
            found_documents: List[Dict[str, Any]] = []
            site_scoped_sites = sites
            strict_folder_name: Optional[str] = None

            if searchOptions.get("site_hint"):
                # Explicit site hint from the search query
                site_scoped_sites = self._filter_sites_by_hint(sites, searchOptions["site_hint"])
                logger.info(f"Filtered sites by explicit site hint: '{searchOptions['site_hint']}' -> {len(site_scoped_sites)} sites")
            elif searchType == "folders" and fileQuery and ' ' in fileQuery and not searchOptions.get("regex_match"):
                # Heuristic for "<siteHint> <folderName>": the last token is
                # the folder name, the preceding tokens form the site hint.
                # NOTE(review): the folder search still requires ALL tokens
                # in the folder name while strict_folder_name requires the
                # name to equal the last token - confirm this combination
                # is intended.
                tokens = [t for t in fileQuery.split(' ') if t]
                if len(tokens) >= 2:
                    strict_folder_name = tokens[-1]
                    site_hint = ' '.join(tokens[:-1])
                    site_scoped_sites = self._filter_sites_by_hint(sites, site_hint)
                    logger.info(f"Filtered sites by heuristic site hint: '{site_hint}' -> {len(site_scoped_sites)} sites")

            is_wildcard = not fileQuery or fileQuery.strip() in ("", "*")

            for current_site in site_scoped_sites:
                site_id = current_site["id"]
                site_name = current_site["displayName"]
                site_url = current_site["webUrl"]
                logger.info(f"Searching in site: {site_name} ({site_url})")

                if not is_wildcard and searchType == "folders":
                    # Folder search via the unified search endpoint
                    items = await self._findPathSearchFolders(access_token, current_site, fileQuery)
                else:
                    if is_wildcard:
                        # Wildcard/empty query: list all items in the drive root
                        endpoint = f"sites/{site_id}/drive/root/children"
                    else:
                        search_query = fileQuery.replace("'", "''")  # Escape single quotes for OData
                        endpoint = f"sites/{site_id}/drive/root/search(q='{search_query}')"
                        logger.info(f"Using search API for files with query: '{search_query}'")
                    search_result = await self._makeGraphApiCall(access_token, endpoint)
                    if "error" in search_result:
                        logger.warning(f"Search failed for site {site_name}: {search_result['error']}")
                        continue
                    items = search_result.get("value", [])
                logger.info(f"Retrieved {len(items)} items from site {site_name}")

                site_documents = []
                for item in items:
                    item_name = item.get("name") or ""
                    is_folder = "folder" in item
                    is_file = "file" in item
                    item_type = "folder" if is_folder else "file"
                    item_path = item.get("parentReference", {}).get("path", "")
                    logger.debug(f"Processing {item_type}: '{item_name}' at path: '{item_path}'")

                    # Filter by search scope if specified
                    if searchScope == "documents" and is_folder:
                        logger.debug(f"Skipping folder '{item_name}' due to documents scope")
                        continue
                    if searchScope == "pages" and is_file and not item["file"].get("mimeType", "").startswith("text/html"):
                        logger.debug(f"Skipping file '{item_name}' due to pages scope")
                        continue

                    # Filter by search type (files, folders, all)
                    if searchType == "files" and is_folder:
                        logger.debug(f"Skipping folder '{item_name}' due to files search type")
                        continue
                    if searchType == "folders" and is_file:
                        logger.debug(f"Skipping file '{item_name}' due to folders search type")
                        continue

                    # Name filtering for non-folder searches (folder hits
                    # were already filtered by the folder search itself)
                    if not is_wildcard and searchType != "folders":
                        if not self._findPathNameMatches(item_name, fileQuery, searchOptions):
                            continue

                    # Strict folder name: exact (case-insensitive) match on folders
                    if strict_folder_name and is_folder:
                        if item_name.strip().lower() != strict_folder_name.lower():
                            logger.debug(f"Skipping folder '{item_name}' - doesn't match strict name '{strict_folder_name}'")
                            continue

                    logger.debug(f"Item '{item_name}' passed all filters - adding to results")
                    # Minimal result with only essential reference information
                    site_documents.append({
                        "id": item.get("id"),
                        "name": item.get("name"),
                        "type": item_type,
                        "siteName": site_name,
                        "siteId": site_id
                    })

                found_documents.extend(site_documents)
                logger.info(f"Found {len(site_documents)} documents in site {site_name}")

                # Results are cut to maxResults below, so further sites
                # could not contribute anything
                if len(found_documents) >= maxResults:
                    break

            # Limit total results to maxResults
            if len(found_documents) > maxResults:
                found_documents = found_documents[:maxResults]
                logger.info(f"Limited results to {maxResults} items")

            result_data = {
                "searchQuery": searchQuery,
                "totalResults": len(found_documents),
                "maxResults": maxResults,
                "foundDocuments": found_documents,
                "timestamp": get_utc_timestamp()
            }
        except Exception as e:
            logger.error(f"Error searching SharePoint: {str(e)}")
            return ActionResult.isFailure(error=str(e))

        # Determine output format based on expected formats
        output_extension = ".json"  # Default
        output_mime_type = "application/json"  # Default
        if expectedDocumentFormats:
            # Use the first expected format
            expected_format = expectedDocumentFormats[0]
            output_extension = expected_format.get("extension", ".json")
            output_mime_type = expected_format.get("mimeType", "application/json")
            logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
        else:
            logger.info("No expected format specified, using default .json format")

        return ActionResult(
            success=True,
            documents=[
                {
                    "documentName": f"sharepoint_find_path_{self._format_timestamp_for_filename()}{output_extension}",
                    "documentData": result_data,
                    "mimeType": output_mime_type
                }
            ]
        )
    except Exception as e:
        logger.error(f"Error finding document path: {str(e)}")
        return ActionResult.isFailure(error=str(e))

def _findPathCheckPreviousResult(self, resultDocument: str) -> Optional[str]:
    """Validate a previous findDocumentPath result reference; return an error message or None."""
    try:
        # Resolve the reference label to get the actual document list
        document_list = self.service.getChatDocumentsFromDocumentList([resultDocument])
        if not document_list:
            return f"No document list found for reference: {resultDocument}"
        # The first document's content should be the JSON result
        file_data = self.service.getFileData(document_list[0].fileId)
        if not file_data:
            return f"No file data found for document: {resultDocument}"
        previous_documents = json.loads(file_data).get("foundDocuments", [])
        if previous_documents:
            logger.info(f"Refining search using {len(previous_documents)} documents from previous result")
        return None
    except json.JSONDecodeError as e:
        return f"Invalid JSON in resultDocument: {str(e)}"
    except Exception as e:
        return f"Error resolving resultDocument reference: {str(e)}"

async def _findPathSearchFolders(self, access_token: str, site: Dict[str, Any], fileQuery: str) -> List[Dict[str, Any]]:
    """Run the unified folder search (POST search/query) scoped to one site; return the hit resources ([] on failure)."""
    site_id = site["id"]
    site_name = site["displayName"]
    site_url = site["webUrl"]
    try:
        # Discover the site's drives to build the path scopes of the query
        drives_resp = await self._makeGraphApiCall(access_token, f"sites/{site_id}/drives")
        path_filters = []
        if "error" not in drives_resp:
            for drv in (drives_resp.get("value", []) or []):
                drive_url = (drv.get("webUrl") or "").rstrip('/')
                if drive_url:
                    # Drives without a webUrl are skipped: they would
                    # otherwise yield the meaningless filter path:"/"
                    path_filters.append(f"path:\"{drive_url}/\"")
        if not path_filters:
            # Fall back to the site root if no drives were found
            scoped_path = site_url.rstrip('/') + '/'
            path_filters = [f"path:\"{scoped_path}\""]

        terms = [t for t in fileQuery.split() if t.strip()]
        if len(terms) > 1:
            # Broad OR query first; narrowed to ALL terms after the search
            name_filter = "(" + " OR ".join(f"foldername:*{t}*" for t in terms) + ")"
        else:
            single_term = terms[0] if terms else fileQuery
            name_filter = f"foldername:*{single_term}*"

        # The path scope keeps hits inside the site they are attributed to;
        # without it every site would run the same tenant-wide query
        scope_filter = "(" + " OR ".join(path_filters) + ")"
        query_string = f"isFolder:true AND {name_filter} AND {scope_filter}"
        logger.info(f"Using unified search API for folders with queryString: {query_string}")

        payload = {
            "requests": [
                {
                    "entityTypes": ["driveItem"],
                    "query": {"queryString": query_string},
                    "from": 0,
                    "size": 50
                }
            ]
        }
        logger.debug(f"Payload: {json.dumps(payload, indent=2)}")
        unified_result = await self._makeGraphApiCall(
            access_token,
            "search/query",
            method="POST",
            data=json.dumps(payload).encode("utf-8")
        )
        logger.debug(f"Unified search response: {json.dumps(unified_result, indent=2)}")
        if "error" in unified_result:
            logger.warning(f"Unified search failed for site {site_name}: {unified_result['error']}")
            return []

        # Flatten value -> hitsContainers -> hits -> resource
        items = []
        for container in (unified_result.get("value", []) or []):
            for hits_container in (container.get("hitsContainers", []) or []):
                for hit in (hits_container.get("hits", []) or []):
                    resource = hit.get("resource")
                    if resource:
                        items.append(resource)
        logger.info(f"Unified search returned {len(items)} items (pre-filter)")

        if len(terms) > 1:
            # Keep only folders whose name contains ALL search terms
            lowered_terms = [term.lower() for term in terms]
            items = [
                item for item in items
                if all(term in (item.get("name") or "").lower() for term in lowered_terms)
            ]
            logger.info(f"Post-filtered to {len(items)} items containing all terms: {terms}")
        return items
    except Exception as e:
        logger.error(f"Error performing unified folder search: {str(e)}")
        return []

def _findPathNameMatches(self, item_name: str, fileQuery: str, searchOptions: Dict[str, Any]) -> bool:
    """Return True when item_name satisfies fileQuery under the given search options."""
    case_sensitive = bool(searchOptions.get("case_sensitive"))

    if searchOptions.get("exact_match"):
        # Exact phrase matching
        if case_sensitive:
            return fileQuery in item_name
        return fileQuery.lower() in item_name.lower()

    if searchOptions.get("regex_match"):
        # Regex pattern matching; an invalid pattern raises re.error to the caller
        flags = 0 if case_sensitive else re.IGNORECASE
        return re.search(fileQuery, item_name, flags) is not None

    # and_terms splits on " AND "; the default splits on whitespace.
    # Either way ALL resulting terms must occur in the name.
    raw_terms = fileQuery.split(" AND ") if searchOptions.get("and_terms") else fileQuery.split()
    terms = [term.strip() for term in raw_terms if term.strip()]
    haystack = item_name
    if not case_sensitive:
        haystack = item_name.lower()
        terms = [term.lower() for term in terms]
    return all(term in haystack for term in terms)
@action
async def readDocuments(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    Read documents from SharePoint across all accessible sites
    Parameters:
        documentList (str): Reference to the document list to read
        connectionReference (str): Reference to the Microsoft connection
        pathQuery (str): Path query to locate documents (e.g., "/Documents/Project1", "*" for all sites)
        resultDocument (str, optional): JSON result document from findDocumentPath action (alternative to pathQuery)
        includeMetadata (bool, optional): Whether to include metadata (default: True)
        expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description
    """
    # NOTE(review): the docstring text above is kept verbatim because it is
    # presumably consumed by the action framework - confirm before rewording.
    #
    # Returns an ActionResult. On success it carries exactly one document whose
    # "documentData" holds a "readResults" list with one entry per chat
    # document: either the SharePoint details (plus text content for readable
    # MIME types) or an "error" entry. Any unexpected exception is turned into
    # a failed ActionResult; nothing is raised to the caller.

    # MIME types (besides text/*) whose content is fetched as text.
    readable_mime_types = ("application/json", "application/xml", "application/javascript")

    try:
        documentList = parameters.get("documentList")
        connectionReference = parameters.get("connectionReference")
        pathQuery = parameters.get("pathQuery", "*")
        resultDocument = parameters.get("resultDocument")
        includeMetadata = parameters.get("includeMetadata", True)
        expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])

        if not documentList or not connectionReference:
            return ActionResult.isFailure(error="Document list reference and connection reference are required")

        # If resultDocument is provided, take the first folder ID it lists as pathQuery.
        if resultDocument:
            try:
                # Resolve the reference label to get the actual document list
                document_list = self.service.getChatDocumentsFromDocumentList([resultDocument])
                if not document_list:
                    return ActionResult.isFailure(error=f"No document list found for reference: {resultDocument}")

                # The first document's content is expected to be the JSON result
                first_document = document_list[0]
                file_data = self.service.getFileData(first_document.fileId)
                if not file_data:
                    return ActionResult.isFailure(error=f"No file data found for document: {resultDocument}")

                found_documents = json.loads(file_data).get("foundDocuments", [])
                folder_ids = [doc.get("id") for doc in found_documents if doc.get("type") == "folder"]
                if not folder_ids:
                    return ActionResult.isFailure(error="No folders found in resultDocument")

                pathQuery = folder_ids[0]
                logger.info(f"Using folder ID from resultDocument: {pathQuery}")
            except json.JSONDecodeError as e:
                return ActionResult.isFailure(error=f"Invalid JSON in resultDocument: {str(e)}")
            except Exception as e:
                return ActionResult.isFailure(error=f"Error resolving resultDocument reference: {str(e)}")

        # The service expects a list of references, so wrap a single string.
        if isinstance(documentList, str):
            documentList = [documentList]
        chatDocuments = self.service.getChatDocumentsFromDocumentList(documentList)
        if not chatDocuments:
            return ActionResult.isFailure(error="No documents found for the provided reference")

        connection = self._getMicrosoftConnection(connectionReference)
        if not connection:
            return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference")
        access_token = connection["accessToken"]

        # Discover all SharePoint sites accessible to the user
        sites = await self._discoverSharePointSites(access_token)
        if not sites:
            return ActionResult.isFailure(error="No SharePoint sites found or accessible")

        # NOTE(review): the resolved paths are not applied to the per-site
        # search below (files are matched by name only); the call is kept so
        # that any validation performed by the helper still runs - confirm intent.
        search_paths = self._resolvePathQuery(pathQuery)

        # Look each chat document up by name, site by site; the first exact
        # name match whose metadata can be fetched wins.
        read_results = []
        for chatDocument in chatDocuments:
            try:
                fileId = chatDocument.fileId
                fileName = chatDocument.fileName
                # Escape single quotes for OData
                search_query = fileName.replace("'", "''")

                file_found = False
                for site in sites:
                    site_id = site["id"]
                    site_name = site["displayName"]
                    site_url = site["webUrl"]

                    endpoint = f"sites/{site_id}/drive/root/search(q='{search_query}')"
                    search_result = await self._makeGraphApiCall(access_token, endpoint)
                    if "error" in search_result:
                        continue

                    for item in search_result.get("value", []):
                        # Search is fuzzy; only accept an exact name match.
                        if item.get("name") != fileName:
                            continue

                        file_id = item.get("id")
                        file_endpoint = f"sites/{site_id}/drive/items/{file_id}"
                        file_info_result = await self._makeGraphApiCall(access_token, file_endpoint)
                        if "error" in file_info_result:
                            continue

                        # The "file" facet (and its mimeType) can be absent or
                        # null, e.g. for a folder that shares the name.
                        file_facet = file_info_result.get("file") or {}
                        mime_type = file_facet.get("mimeType") or ""

                        result_item = {
                            "fileId": fileId,
                            "fileName": fileName,
                            "sharepointFileId": file_id,
                            "siteName": site_name,
                            "siteUrl": site_url,
                            "size": file_info_result.get("size", 0),
                            "createdDateTime": file_info_result.get("createdDateTime"),
                            "lastModifiedDateTime": file_info_result.get("lastModifiedDateTime"),
                            "webUrl": file_info_result.get("webUrl")
                        }

                        if includeMetadata:
                            result_item["metadata"] = {
                                "mimeType": file_facet.get("mimeType"),
                                "downloadUrl": file_info_result.get("@microsoft.graph.downloadUrl"),
                                "createdBy": file_info_result.get("createdBy", {}),
                                "lastModifiedBy": file_info_result.get("lastModifiedBy", {}),
                                "parentReference": file_info_result.get("parentReference", {})
                            }

                        # Only textual formats are downloaded; everything else
                        # gets a placeholder instead of content.
                        if mime_type.startswith("text/") or mime_type in readable_mime_types:
                            content_endpoint = f"sites/{site_id}/drive/items/{file_id}/content"
                            try:
                                async with aiohttp.ClientSession() as session:
                                    headers = {"Authorization": f"Bearer {connection['accessToken']}"}
                                    async with session.get(f"https://graph.microsoft.com/v1.0/{content_endpoint}", headers=headers) as response:
                                        if response.status == 200:
                                            result_item["content"] = await response.text()
                                        else:
                                            result_item["content"] = f"Could not download content: HTTP {response.status}"
                            except Exception as e:
                                # Best effort: a failed download is reported
                                # inside the item rather than failing the read.
                                result_item["content"] = f"Error downloading content: {str(e)}"
                        else:
                            result_item["content"] = f"Binary file type ({mime_type}) - content not retrieved"

                        read_results.append(result_item)
                        file_found = True
                        break

                    if file_found:
                        break

                if not file_found:
                    read_results.append({
                        "fileId": fileId,
                        "fileName": fileName,
                        "error": "File not found in any accessible SharePoint site",
                        "content": None
                    })
            except Exception as e:
                logger.error(f"Error reading document {chatDocument.fileName}: {str(e)}")
                read_results.append({
                    "fileId": chatDocument.fileId,
                    "fileName": chatDocument.fileName,
                    "error": str(e),
                    "content": None
                })

        result_data = {
            "connectionReference": connectionReference,
            "pathQuery": pathQuery,
            "documentList": documentList,
            "includeMetadata": includeMetadata,
            "sitesSearched": len(sites),
            "readResults": read_results,
            "connection": {
                "id": connection["id"],
                "authority": "microsoft",
                "reference": connectionReference
            },
            "timestamp": get_utc_timestamp()
        }

        # Determine output format based on expected formats (first entry wins)
        output_extension = ".json"  # Default
        output_mime_type = "application/json"  # Default
        if expectedDocumentFormats:
            expected_format = expectedDocumentFormats[0]
            output_extension = expected_format.get("extension", ".json")
            output_mime_type = expected_format.get("mimeType", "application/json")
            logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
        else:
            logger.info("No expected format specified, using default .json format")

        return ActionResult(
            success=True,
            documents=[
                {
                    "documentName": f"sharepoint_documents_{self._format_timestamp_for_filename()}{output_extension}",
                    "documentData": result_data,
                    "mimeType": output_mime_type
                }
            ]
        )
    except Exception as e:
        logger.error(f"Error reading SharePoint documents: {str(e)}")
        return ActionResult(
            success=False,
            error=str(e)
        )
@action
async def uploadDocument(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    Upload documents to SharePoint across accessible sites
    Parameters:
        connectionReference (str): Reference to the Microsoft connection
        sitePath (str): REQUIRED - Specific SharePoint path where to upload documents. Must be a valid SharePoint path format:
            - For direct upload: "/site:<Site Name>/<Library>/<Folder Path>" (e.g., "/site:KM XYZ/Documents/Work")
            - If user provides words like "word1 word2", the system MUST call findDocumentPath first to locate the actual folder path, the result then to give to this parameter
        documentList (str): Reference to the document list to upload
        fileNames (List[str]): List of names for the uploaded files
        resultDocument (str, optional): JSON result document from findDocumentPath action (alternative to sitePath)
        expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description
    """
    # NOTE(review): the docstring text above is kept verbatim because it is
    # presumably consumed by the action framework - confirm before rewording.
    #
    # Returns an ActionResult. On success it carries exactly one document whose
    # "documentData" holds an "uploadResults" list with one entry per chat
    # document ("uploadStatus" is "success" or "failed"). Any unexpected
    # exception is turned into a failed ActionResult; nothing is raised.

    # Only the single-request upload is implemented; this is the size limit
    # this method applies to it.
    max_simple_upload_bytes = 4 * 1024 * 1024  # 4MB

    try:
        connectionReference = parameters.get("connectionReference")
        sitePath = parameters.get("sitePath", "/Documents")
        documentList = parameters.get("documentList")
        fileNames = parameters.get("fileNames")
        resultDocument = parameters.get("resultDocument")
        expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])

        if not connectionReference or not documentList or not fileNames:
            return ActionResult.isFailure(error="Connection reference, document list, and file names are required")

        # A single name passed as a plain string must not be paired with the
        # documents character by character.
        fileNames = [fileNames] if isinstance(fileNames, str) else list(fileNames)

        # If resultDocument is provided, take the first folder ID it lists as sitePath.
        if resultDocument:
            try:
                # Resolve the reference label to get the actual document list
                document_list = self.service.getChatDocumentsFromDocumentList([resultDocument])
                if not document_list:
                    return ActionResult.isFailure(error=f"No document list found for reference: {resultDocument}")

                # The first document's content is expected to be the JSON result
                first_document = document_list[0]
                file_data = self.service.getFileData(first_document.fileId)
                if not file_data:
                    return ActionResult.isFailure(error=f"No file data found for document: {resultDocument}")

                found_documents = json.loads(file_data).get("foundDocuments", [])
                folder_ids = [doc.get("id") for doc in found_documents if doc.get("type") == "folder"]
                if not folder_ids:
                    return ActionResult.isFailure(error="No folders found in resultDocument")

                sitePath = folder_ids[0]
                logger.info(f"Using folder ID from resultDocument: {sitePath}")
            except json.JSONDecodeError as e:
                return ActionResult.isFailure(error=f"Invalid JSON in resultDocument: {str(e)}")
            except Exception as e:
                return ActionResult.isFailure(error=f"Error resolving resultDocument reference: {str(e)}")

        # Get Microsoft connection
        connection = self._getMicrosoftConnection(connectionReference)
        if not connection:
            return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference")
        access_token = connection["accessToken"]

        # The service expects a list of references, so wrap a single string.
        if isinstance(documentList, str):
            documentList = [documentList]
        chatDocuments = self.service.getChatDocumentsFromDocumentList(documentList)
        if not chatDocuments:
            return ActionResult.isFailure(error="No documents found for the provided reference")

        # Discover all SharePoint sites accessible to the user
        sites = await self._discoverSharePointSites(access_token)
        if not sites:
            return ActionResult.isFailure(error="No SharePoint sites found or accessible")

        if not resultDocument:
            # Enforce site-scoped path usage when sitePath is given directly.
            if not sitePath or not sitePath.startswith('/'):
                return ActionResult.isFailure(error="sitePath must start with '/' and include site name with syntax /site:<Site Display Name>/... e.g. /site:KM LayerFinance/Documents/Work")
            # Anything that is not a recognised path prefix is treated as search terms.
            if not sitePath.startswith(('/site:', '/Documents', '/Shared Documents')):
                return ActionResult.isFailure(error=f"Invalid sitePath '{sitePath}'. This appears to be search terms, not a valid SharePoint path. Use findDocumentPath action first to search for folders, then use the returned folder path as sitePath.")

            parsed = self._parse_site_scoped_path(sitePath)
            if not parsed:
                return ActionResult.isFailure(error="Invalid sitePath. Use /site:<Site Display Name>/<Library or Folder Path>")

            # Prefer an exact (case-insensitive) displayName match, otherwise
            # fall back to the first candidate returned by the hint filter.
            candidate_sites = self._filter_sites_by_hint(sites, parsed["siteName"])
            wanted_name = parsed["siteName"].strip().lower()
            exact = [s for s in candidate_sites if (s.get("displayName") or "").strip().lower() == wanted_name]
            selected_site = exact[0] if exact else (candidate_sites[0] if candidate_sites else None)
            if not selected_site:
                return ActionResult.isFailure(error=f"SharePoint site '{parsed['siteName']}' not found or not accessible")

            # Use the inner path portion as the actual upload target path
            upload_paths = [f"/{parsed['innerPath'].lstrip('/')}"]
            sites = [selected_site]
        else:
            # NOTE(review): here sitePath is a folder ID taken from
            # resultDocument, yet it is later used as a path segment under
            # drive/root - verify that _resolvePathQuery maps it to a real path.
            upload_paths = self._resolvePathQuery(sitePath)

        if len(fileNames) != len(chatDocuments):
            logger.warning(f"Got {len(fileNames)} file names for {len(chatDocuments)} documents; unmatched entries are not uploaded")

        # Process each document upload
        upload_results = []
        for chatDocument, fileName in zip(chatDocuments, fileNames):
            # Bound before the try so the except handler can always report it.
            fileId = None
            try:
                fileId = chatDocument.fileId
                file_data = self.service.getFileData(fileId)
                if not file_data:
                    logger.warning(f"File data not found for fileId: {fileId}")
                    upload_results.append({
                        "fileName": fileName,
                        "fileId": fileId,
                        "error": "File data not found",
                        "uploadStatus": "failed"
                    })
                    continue

                # The size does not depend on the site, so decide once instead
                # of once per site.
                file_size = len(file_data)
                if file_size >= max_simple_upload_bytes:
                    logger.warning(f"File too large ({file_size} bytes) for simple upload: {fileName}")
                    upload_results.append({
                        "fileName": fileName,
                        "fileId": fileId,
                        "error": f"File too large ({file_size} bytes). Files larger than 4MB require resumable upload (not implemented).",
                        "uploadStatus": "failed"
                    })
                    continue

                # Use the first upload path or default to Documents
                upload_path = upload_paths[0] if upload_paths else "/Documents"
                upload_path = upload_path.rstrip('/') + '/' + fileName
                upload_path_clean = upload_path.lstrip('/')

                # Try the sites in order and stop at the first success.
                upload_successful = False
                last_error = None
                for site in sites:
                    site_id = site["id"]
                    site_name = site["displayName"]
                    site_url = site["webUrl"]

                    upload_endpoint = f"sites/{site_id}/drive/root:/{upload_path_clean}:/content"
                    upload_result = await self._makeGraphApiCall(
                        access_token,
                        upload_endpoint,
                        method="PUT",
                        data=file_data
                    )
                    if "error" in upload_result:
                        last_error = upload_result["error"]
                        logger.warning(f"Upload failed to site {site_name}: {upload_result['error']}")
                        continue

                    upload_results.append({
                        "fileName": fileName,
                        "fileId": fileId,
                        "uploadStatus": "success",
                        "siteName": site_name,
                        "siteUrl": site_url,
                        "uploadPath": upload_path,
                        "sharepointFileId": upload_result.get("id"),
                        "webUrl": upload_result.get("webUrl"),
                        "size": upload_result.get("size"),
                        "createdDateTime": upload_result.get("createdDateTime")
                    })
                    upload_successful = True
                    break

                if not upload_successful:
                    upload_results.append({
                        "fileName": fileName,
                        "fileId": fileId,
                        "error": f"Upload failed to all sites: {last_error}",
                        "uploadStatus": "failed"
                    })
            except Exception as e:
                logger.error(f"Error uploading document {fileName}: {str(e)}")
                upload_results.append({
                    "fileName": fileName,
                    "fileId": fileId,
                    "error": str(e),
                    "uploadStatus": "failed"
                })

        # Documents beyond the end of fileNames were never attempted; report
        # them instead of dropping them silently.
        for chatDocument in chatDocuments[len(fileNames):]:
            upload_results.append({
                "fileName": getattr(chatDocument, "fileName", None),
                "fileId": getattr(chatDocument, "fileId", None),
                "error": "No file name provided for this document",
                "uploadStatus": "failed"
            })

        # Create result data
        result_data = {
            "connectionReference": connectionReference,
            "sitePath": sitePath,
            "documentList": documentList,
            "fileNames": fileNames,
            "sitesAvailable": len(sites),
            "uploadResults": upload_results,
            "connection": {
                "id": connection["id"],
                "authority": "microsoft",
                "reference": connectionReference
            },
            "timestamp": get_utc_timestamp()
        }

        # Determine output format based on expected formats (first entry wins)
        output_extension = ".json"  # Default
        output_mime_type = "application/json"  # Default
        if expectedDocumentFormats:
            expected_format = expectedDocumentFormats[0]
            output_extension = expected_format.get("extension", ".json")
            output_mime_type = expected_format.get("mimeType", "application/json")
            logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
        else:
            logger.info("No expected format specified, using default .json format")

        return ActionResult(
            success=True,
            documents=[
                {
                    "documentName": f"sharepoint_upload_{self._format_timestamp_for_filename()}{output_extension}",
                    "documentData": result_data,
                    "mimeType": output_mime_type
                }
            ]
        )
    except Exception as e:
        logger.error(f"Error uploading to SharePoint: {str(e)}")
        return ActionResult(
            success=False,
            error=str(e)
        )
@action
async def listDocuments(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    List documents in SharePoint folders across accessible sites
    Parameters:
        connectionReference (str): Reference to the Microsoft connection
        searchQuery (str): [path:][type:][mode:]query - "Test Plan", "folders:Test Plan", "/Documents", "*"
            Note: Use "folders:Name" to search for folders anywhere, not "path:/Name" which looks only in root
        resultDocument (str, optional): JSON result document from findDocumentPath action (alternative to searchQuery)
        includeSubfolders (bool, optional): Whether to include subfolders (default: False)
        expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description
    """
    # NOTE(review): the docstring text above is kept verbatim because it is
    # presumably consumed by the action framework - confirm before rewording.
    #
    # Returns an ActionResult. On success it carries exactly one document whose
    # "documentData" holds a "listResults" list with one entry per folder path,
    # each grouping the listed items per site. Any unexpected exception is
    # turned into a failed ActionResult; nothing is raised to the caller.

    # Upper bound on how many subfolders are expanded per site listing.
    max_subfolders = 10

    def looks_like_item_id(value):
        # Every value starting with "01" is treated as a drive item ID.
        # NOTE(review): a plain query that happens to start with "01" is
        # therefore also treated as an ID - confirm this is acceptable.
        return value.startswith('01')

    def build_item_info(raw_item, site_name, site_url, parent_path=None):
        # Flatten one Graph drive item into the dict shape used in the result.
        info = {
            "id": raw_item.get("id"),
            "name": raw_item.get("name"),
            "size": raw_item.get("size", 0),
            "createdDateTime": raw_item.get("createdDateTime"),
            "lastModifiedDateTime": raw_item.get("lastModifiedDateTime"),
            "webUrl": raw_item.get("webUrl"),
            "type": "folder" if "folder" in raw_item else "file"
        }
        # Only items found inside an expanded subfolder carry a parent path.
        if parent_path is not None:
            info["parentPath"] = parent_path
        info["siteName"] = site_name
        info["siteUrl"] = site_url
        # Add file-specific information
        if "file" in raw_item:
            info.update({
                "mimeType": raw_item["file"].get("mimeType"),
                "downloadUrl": raw_item.get("@microsoft.graph.downloadUrl")
            })
        # Add folder-specific information
        if "folder" in raw_item:
            info.update({
                "childCount": raw_item["folder"].get("childCount", 0)
            })
        return info

    try:
        connectionReference = parameters.get("connectionReference")
        searchQuery = parameters.get("searchQuery", "*")
        resultDocument = parameters.get("resultDocument")
        includeSubfolders = parameters.get("includeSubfolders", False)  # Default to False for better UX
        expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])

        if not connectionReference:
            return ActionResult.isFailure(error="Connection reference is required")

        # If resultDocument is provided, take the first folder ID it lists as searchQuery.
        if resultDocument:
            try:
                # Resolve the reference label to get the actual document list
                document_list = self.service.getChatDocumentsFromDocumentList([resultDocument])
                if not document_list:
                    return ActionResult.isFailure(error=f"No document list found for reference: {resultDocument}")

                # The first document's content is expected to be the JSON result
                first_document = document_list[0]
                logger.info(f"Document fileId: {first_document.fileId}, fileName: {first_document.fileName}")
                file_data = self.service.getFileData(first_document.fileId)
                if not file_data:
                    return ActionResult.isFailure(error=f"No file data found for document: {resultDocument} (fileId: {first_document.fileId})")
                logger.info(f"File data length: {len(file_data) if file_data else 0}")

                found_documents = json.loads(file_data).get("foundDocuments", [])
                folder_ids = [doc.get("id") for doc in found_documents if doc.get("type") == "folder"]
                if not folder_ids:
                    return ActionResult.isFailure(error="No folders found in resultDocument")

                searchQuery = folder_ids[0]
                logger.info(f"Using folder ID from resultDocument: {searchQuery}")
            except json.JSONDecodeError as e:
                return ActionResult.isFailure(error=f"Invalid JSON in resultDocument: {str(e)}")
            except Exception as e:
                return ActionResult.isFailure(error=f"Error resolving resultDocument reference: {str(e)}")

        # Get Microsoft connection
        connection = self._getMicrosoftConnection(connectionReference)
        if not connection:
            return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference")
        access_token = connection["accessToken"]
        logger.info(f"Starting SharePoint listDocuments for searchQuery: {searchQuery}")
        logger.debug(f"Connection ID: {connection['id']}")

        # Only the path component of the parsed query is used for listing.
        pathQuery, _fileQuery, _searchType, _searchOptions = self._parseSearchQuery(searchQuery)

        # Discover all SharePoint sites accessible to the user
        sites = await self._discoverSharePointSites(access_token)
        if not sites:
            return ActionResult.isFailure(error="No SharePoint sites found or accessible")

        if looks_like_item_id(searchQuery):
            # Direct folder ID - use it directly
            folder_paths = [searchQuery]
            logger.info(f"Using direct folder ID: {searchQuery}")
        else:
            # Resolve path query into folder paths
            folder_paths = self._resolvePathQuery(pathQuery)
            logger.info(f"Resolved folder paths: {folder_paths}")

        # Process each folder path across all sites
        list_results = []
        for folderPath in folder_paths:
            try:
                folder_results = []
                for site in sites:
                    site_id = site["id"]
                    site_name = site["displayName"]
                    site_url = site["webUrl"]
                    logger.info(f"Listing folder {folderPath} in site: {site_name}")

                    # Determine the endpoint based on folder path
                    if folderPath in ["/", ""] or folderPath == "*":
                        # Root folder
                        endpoint = f"sites/{site_id}/drive/root/children"
                    elif looks_like_item_id(folderPath):
                        # Direct folder ID
                        endpoint = f"sites/{site_id}/drive/items/{folderPath}/children"
                    else:
                        # Specific folder path - remove leading slash if present
                        folder_path_clean = folderPath.lstrip('/')
                        endpoint = f"sites/{site_id}/drive/root:/{folder_path_clean}:/children"

                    # A site that cannot list this folder is skipped, not fatal.
                    api_result = await self._makeGraphApiCall(access_token, endpoint)
                    if "error" in api_result:
                        logger.warning(f"Failed to list folder {folderPath} in site {site_name}: {api_result['error']}")
                        continue

                    processed_items = [
                        build_item_info(item, site_name, site_url)
                        for item in api_result.get("value", [])
                    ]

                    # Expand ONLY the direct subfolders (1 level deep, no recursion).
                    if includeSubfolders:
                        folder_count = sum(1 for entry in processed_items if entry['type'] == 'folder')
                        logger.info(f"Including subfolders - processing {folder_count} folders")
                        subfolder_count = 0
                        # Iterate over a copy: children are appended to
                        # processed_items while looping.
                        for item in processed_items[:]:
                            if item["type"] == "folder" and subfolder_count < max_subfolders:
                                subfolder_count += 1
                                subfolder_path = f"{folderPath.rstrip('/')}/{item['name']}"
                                subfolder_endpoint = f"sites/{site_id}/drive/items/{item['id']}/children"
                                logger.debug(f"Getting contents of subfolder: {item['name']}")
                                subfolder_result = await self._makeGraphApiCall(access_token, subfolder_endpoint)
                                if "error" in subfolder_result:
                                    logger.warning(f"Failed to get contents of subfolder {item['name']}: {subfolder_result.get('error')}")
                                    continue
                                subfolder_items = subfolder_result.get("value", [])
                                logger.debug(f"Found {len(subfolder_items)} items in subfolder {item['name']}")
                                processed_items.extend(
                                    build_item_info(child, site_name, site_url, parent_path=subfolder_path)
                                    for child in subfolder_items
                                )
                            elif subfolder_count >= max_subfolders:
                                logger.warning(f"Reached maximum subfolder limit ({max_subfolders}), skipping remaining folders")
                                break
                        logger.info(f"Processed {subfolder_count} subfolders, total items: {len(processed_items)}")

                    folder_results.append({
                        "siteName": site_name,
                        "siteUrl": site_url,
                        "itemCount": len(processed_items),
                        "items": processed_items
                    })

                list_results.append({
                    "folderPath": folderPath,
                    "sitesProcessed": len(folder_results),
                    "siteResults": folder_results
                })
            except Exception as e:
                logger.error(f"Error listing folder {folderPath}: {str(e)}")
                list_results.append({
                    "folderPath": folderPath,
                    "error": str(e),
                    "siteResults": []
                })

        # Create result data
        result_data = {
            "searchQuery": searchQuery,
            "includeSubfolders": includeSubfolders,
            "sitesSearched": len(sites),
            "listResults": list_results,
            "timestamp": get_utc_timestamp()
        }

        # Determine output format based on expected formats (first entry wins)
        output_extension = ".json"  # Default
        output_mime_type = "application/json"  # Default
        if expectedDocumentFormats:
            expected_format = expectedDocumentFormats[0]
            output_extension = expected_format.get("extension", ".json")
            output_mime_type = expected_format.get("mimeType", "application/json")
            logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
        else:
            logger.info("No expected format specified, using default .json format")

        return ActionResult(
            success=True,
            documents=[
                {
                    "documentName": f"sharepoint_document_list_{self._format_timestamp_for_filename()}{output_extension}",
                    "documentData": result_data,
                    "mimeType": output_mime_type
                }
            ]
        )
    except Exception as e:
        logger.error(f"Error listing SharePoint documents: {str(e)}")
        return ActionResult(
            success=False,
            error=str(e)
        )