""" SharePoint operations method module. Handles SharePoint document operations using the SharePoint service. """ import logging import json import re from typing import Dict, Any, List, Optional from datetime import datetime, UTC import base64 from urllib.parse import urlparse import aiohttp import asyncio from modules.chat.methodBase import MethodBase, action from modules.interfaces.interfaceChatModel import ActionResult from modules.shared.timezoneUtils import get_utc_timestamp logger = logging.getLogger(__name__) class MethodSharepoint(MethodBase): """SharePoint operations methods.""" def __init__(self, service): super().__init__(service) self.name = "sharepoint" self.description = "SharePoint operations methods" def _format_timestamp_for_filename(self) -> str: """Format current timestamp as YYYYMMDD-hhmmss for filenames.""" return datetime.now(UTC).strftime("%Y%m%d-%H%M%S") def _getMicrosoftConnection(self, connectionReference: str) -> Optional[Dict[str, Any]]: """Get Microsoft connection from connection reference""" try: userConnection = self.service.getUserConnectionFromConnectionReference(connectionReference) if not userConnection: logger.warning(f"No user connection found for reference: {connectionReference}") return None if userConnection.authority.value != "msft": logger.warning(f"Connection {userConnection.id} is not Microsoft (authority: {userConnection.authority.value})") return None # Check if connection is active or pending (pending means OAuth in progress) if userConnection.status.value not in ["active", "pending"]: logger.warning(f"Connection {userConnection.id} status is not active/pending: {userConnection.status.value}") return None # Get the token for this specific connection token = self.service.interfaceApp.getConnectionToken(userConnection.id) if not token: logger.warning(f"No token found for connection {userConnection.id}") return None # Check if token is expired if hasattr(token, 'expiresAt') and token.expiresAt: current_time = get_utc_timestamp() if current_time > token.expiresAt: logger.warning(f"Token for connection {userConnection.id} is expired (expiresAt: {token.expiresAt}, current: {current_time})") return None logger.info(f"Successfully retrieved Microsoft connection: {userConnection.id}, status: {userConnection.status.value}, externalId: {userConnection.externalId}") return { "id": userConnection.id, "userConnection": userConnection, "accessToken": token.tokenAccess, "refreshToken": token.tokenRefresh, "scopes": ["Sites.ReadWrite.All", "Files.ReadWrite.All", "User.Read"] # SharePoint scopes } except Exception as e: logger.error(f"Error getting Microsoft connection: {str(e)}") return None async def _discoverSharePointSites(self, access_token: str) -> List[Dict[str, Any]]: """ Discover all SharePoint sites accessible to the user via Microsoft Graph API Parameters: access_token (str): Microsoft Graph access token Returns: List[Dict[str, Any]]: List of SharePoint site information """ try: # Query Microsoft Graph to get all sites the user has access to endpoint = "sites?search=*" result = await self._makeGraphApiCall(access_token, endpoint) if "error" in result: logger.error(f"Error discovering SharePoint sites: {result['error']}") return [] sites = result.get("value", []) logger.info(f"Discovered {len(sites)} SharePoint sites") # Process and return site information processed_sites = [] for site in sites: site_info = { "id": site.get("id"), "displayName": site.get("displayName"), "name": site.get("name"), "webUrl": site.get("webUrl"), "description": site.get("description"), "createdDateTime": site.get("createdDateTime"), "lastModifiedDateTime": site.get("lastModifiedDateTime") } processed_sites.append(site_info) logger.debug(f"Site: {site_info['displayName']} - {site_info['webUrl']}") return processed_sites except Exception as e: logger.error(f"Error discovering SharePoint sites: {str(e)}") return [] def _parseSearchQuery(self, searchQuery: str) -> tuple[str, str, str, dict]: """ Parse searchQuery to extract path, search terms, search type, and search options. Parameters: searchQuery (str): Enhanced search query with options: - "budget" -> pathQuery="*", fileQuery="budget", searchType="all", options={} - "/Documents:budget" -> pathQuery="/Documents", fileQuery="budget", searchType="all", options={} - "files:budget" -> pathQuery="*", fileQuery="budget", searchType="files", options={} - "folders:DELTA" -> pathQuery="*", fileQuery="DELTA", searchType="folders", options={} - "exact:\"Operations 2025\"" -> exact phrase matching - "regex:^Operations.*2025$" -> regex pattern matching - "case:DELTA" -> case-sensitive search - "and:DELTA AND 2025 Mars AND Group" -> all AND terms must be present Returns: tuple[str, str, str, dict]: (pathQuery, fileQuery, searchType, searchOptions) """ try: if not searchQuery or not searchQuery.strip() or searchQuery.strip() == "*": return "*", "*", "all", {} searchQuery = searchQuery.strip() searchOptions = {} # Check for search type specification (files:, folders:, all:) searchType = "all" # Default if searchQuery.startswith(("files:", "folders:", "all:")): type_parts = searchQuery.split(':', 1) searchType = type_parts[0].strip() searchQuery = type_parts[1].strip() # Check for search mode specification (exact:, regex:, case:, and:) if searchQuery.startswith(("exact:", "regex:", "case:", "and:")): mode_parts = searchQuery.split(':', 1) mode = mode_parts[0].strip() searchQuery = mode_parts[1].strip() if mode == "exact": searchOptions["exact_match"] = True # Remove quotes if present if searchQuery.startswith('"') and searchQuery.endswith('"'): searchQuery = searchQuery[1:-1] elif mode == "regex": searchOptions["regex_match"] = True elif mode == "case": searchOptions["case_sensitive"] = True elif mode == "and": searchOptions["and_terms"] = True # Check if it contains path:search format if ':' in searchQuery: parts = searchQuery.split(':', 1) # Split only on first colon path_part = parts[0].strip() search_part = parts[1].strip() # Handle path part if not path_part or path_part == "*": pathQuery = "*" elif path_part.startswith('/'): pathQuery = path_part else: pathQuery = f"/Documents/{path_part}" # Handle search part if not search_part or search_part == "*": fileQuery = "*" else: fileQuery = search_part return pathQuery, fileQuery, searchType, searchOptions # No colon - check if it looks like a path elif searchQuery.startswith('/'): # It's a path only return searchQuery, "*", searchType, searchOptions else: # It's a search term only return "*", searchQuery, searchType, searchOptions except Exception as e: logger.error(f"Error parsing searchQuery '{searchQuery}': {str(e)}") return "*", "*", "all", {} def _resolvePathQuery(self, pathQuery: str) -> List[str]: """ Resolve pathQuery into a list of search paths for SharePoint operations. Parameters: pathQuery (str): Query string that can contain: - Direct paths (e.g., "/Documents/Project1") - Wildcards (e.g., "/Documents/*") - Multiple paths separated by semicolons (e.g., "/Docs; /Files") - Relative paths (e.g., "Project1" -> resolved to default folder) - Empty string or "*" for global search Returns: List[str]: List of resolved paths """ try: if not pathQuery or not pathQuery.strip() or pathQuery.strip() == "*": return ["*"] # Global search across all sites # Split by semicolon to handle multiple paths raw_paths = [path.strip() for path in pathQuery.split(';') if path.strip()] resolved_paths = [] for raw_path in raw_paths: # Handle wildcards - return as-is if '*' in raw_path: resolved_paths.append(raw_path) # Handle absolute paths elif raw_path.startswith('/'): resolved_paths.append(raw_path) # Handle relative paths - prepend default folder else: resolved_paths.append(f"/Documents/{raw_path}") # Remove duplicates while preserving order seen = set() unique_paths = [] for path in resolved_paths: if path not in seen: seen.add(path) unique_paths.append(path) logger.info(f"Resolved pathQuery '{pathQuery}' to {len(unique_paths)} paths: {unique_paths}") return unique_paths except Exception as e: logger.error(f"Error resolving pathQuery '{pathQuery}': {str(e)}") return ["*"] # Fallback to global search def _parseSiteUrl(self, siteUrl: str) -> Dict[str, str]: """Parse SharePoint site URL to extract hostname and site path""" try: parsed = urlparse(siteUrl) hostname = parsed.hostname path = parsed.path.strip('/') return { "hostname": hostname, "sitePath": path } except Exception as e: logger.error(f"Error parsing site URL {siteUrl}: {str(e)}") return {"hostname": "", "sitePath": ""} async def _makeGraphApiCall(self, access_token: str, endpoint: str, method: str = "GET", data: bytes = None) -> Dict[str, Any]: """Make a Microsoft Graph API call with timeout and detailed logging""" try: headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" if data and method != "PUT" else "application/octet-stream" if data else "application/json" } url = f"https://graph.microsoft.com/v1.0/{endpoint}" logger.info(f"Making Graph API call: {method} {url}") # Set timeout to 30 seconds timeout = aiohttp.ClientTimeout(total=30) async with aiohttp.ClientSession(timeout=timeout) as session: if method == "GET": logger.debug(f"Starting GET request to {url}") async with session.get(url, headers=headers) as response: logger.info(f"Graph API response: {response.status}") if response.status == 200: result = await response.json() logger.debug(f"Graph API success: {len(str(result))} characters response") return result else: error_text = await response.text() logger.error(f"Graph API call failed: {response.status} - {error_text}") return {"error": f"API call failed: {response.status} - {error_text}"} elif method == "PUT": logger.debug(f"Starting PUT request to {url}") async with session.put(url, headers=headers, data=data) as response: logger.info(f"Graph API response: {response.status}") if response.status in [200, 201]: result = await response.json() logger.debug(f"Graph API success: {len(str(result))} characters response") return result else: error_text = await response.text() logger.error(f"Graph API call failed: {response.status} - {error_text}") return {"error": f"API call failed: {response.status} - {error_text}"} elif method == "POST": logger.debug(f"Starting POST request to {url}") async with session.post(url, headers=headers, data=data) as response: logger.info(f"Graph API response: {response.status}") if response.status in [200, 201]: result = await response.json() logger.debug(f"Graph API success: {len(str(result))} characters response") return result else: error_text = await response.text() logger.error(f"Graph API call failed: {response.status} - {error_text}") return {"error": f"API call failed: {response.status} - {error_text}"} except asyncio.TimeoutError: logger.error(f"Graph API call timed out after 30 seconds: {endpoint}") return {"error": f"API call timed out after 30 seconds: {endpoint}"} except Exception as e: logger.error(f"Error making Graph API call: {str(e)}") return {"error": f"Error making Graph API call: {str(e)}"} async def _getSiteId(self, access_token: str, hostname: str, site_path: str) -> str: """Get SharePoint site ID from hostname and site path""" try: endpoint = f"sites/{hostname}:/{site_path}" result = await self._makeGraphApiCall(access_token, endpoint) if "error" in result: logger.error(f"Error getting site ID: {result['error']}") return "" return result.get("id", "") except Exception as e: logger.error(f"Error getting site ID: {str(e)}") return "" @action async def findDocumentPath(self, parameters: Dict[str, Any]) -> ActionResult: """ Find documents by searching their content, names, or metadata across all accessible SharePoint sites Parameters: connectionReference (str): Reference to the Microsoft connection searchQuery (str): [path:][type:][mode:]query - Enhanced search syntax: - "budget", "/Documents:budget", "files:budget", "folders:DELTA", "*" - "exact:\"Operations 2025\"" - exact phrase matching - "regex:^Operations.*2025$" - regex pattern matching - "case:DELTA" - case-sensitive search - "and:DELTA AND 2025 Mars AND Group" - all terms must be present - "folders:and:DELTA AND 2025 Mars AND Group" - combined options Note: For storage locations, use "folders:" prefix. All search terms must be present by default. resultDocument (str, optional): JSON result document from previous findDocumentPath action to refine search searchScope (str, optional): Search scope - options: "all" (default), "documents" (files only), "pages" (SharePoint pages only) maxResults (int, optional): Maximum number of results to return (default: 100) expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description """ try: connectionReference = parameters.get("connectionReference") searchQuery = parameters.get("searchQuery", "*") resultDocument = parameters.get("resultDocument") searchScope = parameters.get("searchScope", "all") maxResults = parameters.get("maxResults", 100) expectedDocumentFormats = parameters.get("expectedDocumentFormats", []) if not connectionReference: return ActionResult.isFailure(error="Connection reference is required") # If resultDocument is provided, extract site information to refine search if resultDocument: try: import json # Resolve the reference label to get the actual document list document_list = self.service.getChatDocumentsFromDocumentList([resultDocument]) if not document_list or len(document_list) == 0: return ActionResult.isFailure(error=f"No document list found for reference: {resultDocument}") # Get the first document's content (which should be the JSON) first_document = document_list[0] file_data = self.service.getFileData(first_document.fileId) if not file_data: return ActionResult.isFailure(error=f"No file data found for document: {resultDocument}") # Parse the JSON content result_data = json.loads(file_data) found_documents = result_data.get("foundDocuments", []) # Extract site information from the result for context if found_documents: # Use the site information from the previous search to refine current search # This could be used to limit search to specific sites or add context logger.info(f"Refining search using {len(found_documents)} documents from previous result") except json.JSONDecodeError as e: return ActionResult.isFailure(error=f"Invalid JSON in resultDocument: {str(e)}") except Exception as e: return ActionResult.isFailure(error=f"Error resolving resultDocument reference: {str(e)}") # Parse searchQuery to extract path, search terms, search type, and options pathQuery, fileQuery, searchType, searchOptions = self._parseSearchQuery(searchQuery) connection = self._getMicrosoftConnection(connectionReference) if not connection: return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference") # Discover all SharePoint sites accessible to the user sites = await self._discoverSharePointSites(connection["accessToken"]) if not sites: return ActionResult.isFailure(error="No SharePoint sites found or accessible") # Resolve path query into search paths search_paths = self._resolvePathQuery(pathQuery) try: # Search across all discovered sites found_documents = [] all_sites_searched = [] for site in sites: site_id = site["id"] site_name = site["displayName"] site_url = site["webUrl"] logger.info(f"Searching in site: {site_name} ({site_url})") # Use Microsoft Graph search API for this specific site # Handle empty or wildcard queries if not fileQuery or fileQuery.strip() == "" or fileQuery.strip() == "*": # For wildcard/empty queries, list all items in the drive endpoint = f"sites/{site_id}/drive/root/children" else: # For specific queries, use search API search_query = fileQuery.replace("'", "''") # Escape single quotes for OData endpoint = f"sites/{site_id}/drive/root/search(q='{search_query}')" # Make the search API call search_result = await self._makeGraphApiCall(connection["accessToken"], endpoint) if "error" in search_result: logger.warning(f"Search failed for site {site_name}: {search_result['error']}") continue # Process search results for this site items = search_result.get("value", []) site_documents = [] for item in items: # Filter by search scope if specified if searchScope == "documents" and "folder" in item: continue elif searchScope == "pages" and "file" in item and not item["file"].get("mimeType", "").startswith("text/html"): continue # Filter by search type (files, folders, all) if searchType == "files" and "folder" in item: continue elif searchType == "folders" and "file" in item: continue # Enhanced post-filtering based on search options item_name = item.get("name", "") if fileQuery != "*" and fileQuery.strip(): # Apply different filtering based on search options if searchOptions.get("exact_match"): # Exact phrase matching if searchOptions.get("case_sensitive"): if fileQuery not in item_name: continue else: if fileQuery.lower() not in item_name.lower(): continue elif searchOptions.get("regex_match"): # Regex pattern matching import re flags = 0 if searchOptions.get("case_sensitive") else re.IGNORECASE if not re.search(fileQuery, item_name, flags): continue elif searchOptions.get("and_terms"): # AND terms mode: Split by " AND " and ensure ALL terms are present search_name = item_name.lower() if not searchOptions.get("case_sensitive") else item_name and_terms = [term.strip() for term in fileQuery.split(" AND ") if term.strip()] and_terms = [term.lower() if not searchOptions.get("case_sensitive") else term for term in and_terms] if not all(term in search_name for term in and_terms): continue # Skip this item if not all AND terms match else: # Default: ALL search terms must be present (space-separated) search_name = item_name.lower() if not searchOptions.get("case_sensitive") else item_name search_terms = [term.strip().lower() if not searchOptions.get("case_sensitive") else term.strip() for term in fileQuery.split() if term.strip()] if not all(term in search_name for term in search_terms): continue # Skip this item if not all terms match # Create minimal result with only essential reference information doc_info = { "id": item.get("id"), "name": item.get("name"), "type": "folder" if "folder" in item else "file", "siteName": site_name, "siteId": site_id } site_documents.append(doc_info) found_documents.extend(site_documents) all_sites_searched.append({ "siteName": site_name, "siteUrl": site_url, "siteId": site_id, "documentsFound": len(site_documents) }) logger.info(f"Found {len(site_documents)} documents in site {site_name}") # Limit total results to maxResults if len(found_documents) > maxResults: found_documents = found_documents[:maxResults] logger.info(f"Limited results to {maxResults} items") result_data = { "searchQuery": searchQuery, "totalResults": len(found_documents), "maxResults": maxResults, "foundDocuments": found_documents, "timestamp": get_utc_timestamp() } except Exception as e: logger.error(f"Error searching SharePoint: {str(e)}") return ActionResult.isFailure(error=str(e)) # Determine output format based on expected formats output_extension = ".json" # Default output_mime_type = "application/json" # Default if expectedDocumentFormats and len(expectedDocumentFormats) > 0: # Use the first expected format expected_format = expectedDocumentFormats[0] output_extension = expected_format.get("extension", ".json") output_mime_type = expected_format.get("mimeType", "application/json") logger.info(f"Using expected format: {output_extension} ({output_mime_type})") else: logger.info("No expected format specified, using default .json format") return ActionResult( success=True, documents=[ { "documentName": f"sharepoint_find_path_{self._format_timestamp_for_filename()}{output_extension}", "documentData": result_data, "mimeType": output_mime_type } ] ) except Exception as e: logger.error(f"Error finding document path: {str(e)}") return ActionResult.isFailure(error=str(e)) @action async def readDocuments(self, parameters: Dict[str, Any]) -> ActionResult: """ Read documents from SharePoint across all accessible sites Parameters: documentList (str): Reference to the document list to read connectionReference (str): Reference to the Microsoft connection pathQuery (str): Path query to locate documents (e.g., "/Documents/Project1", "*" for all sites) resultDocument (str, optional): JSON result document from findDocumentPath action (alternative to pathQuery) includeMetadata (bool, optional): Whether to include metadata (default: True) expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description """ try: documentList = parameters.get("documentList") connectionReference = parameters.get("connectionReference") pathQuery = parameters.get("pathQuery", "*") resultDocument = parameters.get("resultDocument") includeMetadata = parameters.get("includeMetadata", True) expectedDocumentFormats = parameters.get("expectedDocumentFormats", []) if not documentList or not connectionReference: return ActionResult.isFailure(error="Document list reference and connection reference are required") # If resultDocument is provided, extract folder IDs from it if resultDocument: try: import json # Resolve the reference label to get the actual document list document_list = self.service.getChatDocumentsFromDocumentList([resultDocument]) if not document_list or len(document_list) == 0: return ActionResult.isFailure(error=f"No document list found for reference: {resultDocument}") # Get the first document's content (which should be the JSON) first_document = document_list[0] file_data = self.service.getFileData(first_document.fileId) if not file_data: return ActionResult.isFailure(error=f"No file data found for document: {resultDocument}") # Parse the JSON content result_data = json.loads(file_data) found_documents = result_data.get("foundDocuments", []) # Extract folder IDs from the result folder_ids = [] for doc in found_documents: if doc.get("type") == "folder": folder_ids.append(doc.get("id")) if folder_ids: # Use the first folder ID found as pathQuery pathQuery = folder_ids[0] logger.info(f"Using folder ID from resultDocument: {pathQuery}") else: return ActionResult.isFailure(error="No folders found in resultDocument") except json.JSONDecodeError as e: return ActionResult.isFailure(error=f"Invalid JSON in resultDocument: {str(e)}") except Exception as e: return ActionResult.isFailure(error=f"Error resolving resultDocument reference: {str(e)}") # Get documents from reference - ensure documentList is a list, not a string if isinstance(documentList, str): documentList = [documentList] # Convert string to list chatDocuments = self.service.getChatDocumentsFromDocumentList(documentList) if not chatDocuments: return ActionResult.isFailure(error="No documents found for the provided reference") connection = self._getMicrosoftConnection(connectionReference) if not connection: return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference") # Discover all SharePoint sites accessible to the user sites = await self._discoverSharePointSites(connection["accessToken"]) if not sites: return ActionResult.isFailure(error="No SharePoint sites found or accessible") # Resolve path query into search paths search_paths = self._resolvePathQuery(pathQuery) # Process each chat document across all sites read_results = [] for i, chatDocument in enumerate(chatDocuments): try: fileId = chatDocument.fileId fileName = chatDocument.fileName # Search for this file across all sites file_found = False for site in sites: site_id = site["id"] site_name = site["displayName"] site_url = site["webUrl"] # Try to find the file by name in this site search_query = fileName.replace("'", "''") # Escape single quotes for OData endpoint = f"sites/{site_id}/drive/root/search(q='{search_query}')" search_result = await self._makeGraphApiCall(connection["accessToken"], endpoint) if "error" in search_result: continue items = search_result.get("value", []) for item in items: if item.get("name") == fileName: # Found the file, get its details file_id = item.get("id") file_endpoint = f"sites/{site_id}/drive/items/{file_id}" # Get file metadata file_info_result = await self._makeGraphApiCall(connection["accessToken"], file_endpoint) if "error" in file_info_result: continue # Build result with metadata result_item = { "fileId": fileId, "fileName": fileName, "sharepointFileId": file_id, "siteName": site_name, "siteUrl": site_url, "size": file_info_result.get("size", 0), "createdDateTime": file_info_result.get("createdDateTime"), "lastModifiedDateTime": file_info_result.get("lastModifiedDateTime"), "webUrl": file_info_result.get("webUrl") } # Add metadata if requested if includeMetadata: result_item["metadata"] = { "mimeType": file_info_result.get("file", {}).get("mimeType"), "downloadUrl": file_info_result.get("@microsoft.graph.downloadUrl"), "createdBy": file_info_result.get("createdBy", {}), "lastModifiedBy": file_info_result.get("lastModifiedBy", {}), "parentReference": file_info_result.get("parentReference", {}) } # Get file content if it's a readable format mime_type = file_info_result.get("file", {}).get("mimeType", "") if mime_type.startswith("text/") or mime_type in [ "application/json", "application/xml", "application/javascript" ]: # Download the file content content_endpoint = f"sites/{site_id}/drive/items/{file_id}/content" # For content download, we need to handle binary data try: async with aiohttp.ClientSession() as session: headers = {"Authorization": f"Bearer {connection['accessToken']}"} async with session.get(f"https://graph.microsoft.com/v1.0/{content_endpoint}", headers=headers) as response: if response.status == 200: content = await response.text() result_item["content"] = content else: result_item["content"] = f"Could not download content: HTTP {response.status}" except Exception as e: result_item["content"] = f"Error downloading content: {str(e)}" else: result_item["content"] = f"Binary file type ({mime_type}) - content not retrieved" read_results.append(result_item) file_found = True break if file_found: break if not file_found: read_results.append({ "fileId": fileId, "fileName": fileName, "error": "File not found in any accessible SharePoint site", "content": None }) except Exception as e: logger.error(f"Error reading document {chatDocument.fileName}: {str(e)}") read_results.append({ "fileId": chatDocument.fileId, "fileName": chatDocument.fileName, "error": str(e), "content": None }) result_data = { "connectionReference": connectionReference, "pathQuery": pathQuery, "documentList": documentList, "includeMetadata": includeMetadata, "sitesSearched": len(sites), "readResults": read_results, "connection": { "id": connection["id"], "authority": "microsoft", "reference": connectionReference }, "timestamp": get_utc_timestamp() } # Determine output format based on expected formats output_extension = ".json" # Default output_mime_type = "application/json" # Default if expectedDocumentFormats and len(expectedDocumentFormats) > 0: # Use the first expected format expected_format = expectedDocumentFormats[0] output_extension = expected_format.get("extension", ".json") output_mime_type = expected_format.get("mimeType", "application/json") logger.info(f"Using expected format: {output_extension} ({output_mime_type})") else: logger.info("No expected format specified, using default .json format") return ActionResult( success=True, documents=[ { "documentName": f"sharepoint_documents_{self._format_timestamp_for_filename()}{output_extension}", "documentData": result_data, "mimeType": output_mime_type } ] ) except Exception as e: logger.error(f"Error reading SharePoint documents: {str(e)}") return ActionResult( success=False, error=str(e) ) @action async def uploadDocument(self, parameters: Dict[str, Any]) -> ActionResult: """ Upload documents to SharePoint across accessible sites Parameters: connectionReference (str): Reference to the Microsoft connection pathQuery (str): Path query where to upload documents (e.g., "/Documents/Project1", "*" for default location) documentList (str): Reference to the document list to upload fileNames (List[str]): List of names for the uploaded files resultDocument (str, optional): JSON result document from findDocumentPath action (alternative to pathQuery) expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description """ try: connectionReference = parameters.get("connectionReference") pathQuery = parameters.get("pathQuery", "/Documents") documentList = parameters.get("documentList") fileNames = parameters.get("fileNames") resultDocument = parameters.get("resultDocument") expectedDocumentFormats = parameters.get("expectedDocumentFormats", []) if not connectionReference or not documentList or not fileNames: return ActionResult.isFailure(error="Connection reference, document list, and file names are required") # If resultDocument is provided, extract folder IDs from it if resultDocument: try: import json # Resolve the reference label to get the actual document list document_list = self.service.getChatDocumentsFromDocumentList([resultDocument]) if not document_list or len(document_list) == 0: return ActionResult.isFailure(error=f"No document list found for reference: {resultDocument}") # Get the first document's content (which should be the JSON) first_document = document_list[0] file_data = self.service.getFileData(first_document.fileId) if not file_data: return ActionResult.isFailure(error=f"No file data found for document: {resultDocument}") # Parse the JSON content result_data = json.loads(file_data) found_documents = result_data.get("foundDocuments", []) # Extract folder IDs from the result folder_ids = [] for doc in found_documents: if doc.get("type") == "folder": folder_ids.append(doc.get("id")) if folder_ids: # Use the first folder ID found as pathQuery pathQuery = folder_ids[0] logger.info(f"Using folder ID from resultDocument: {pathQuery}") else: return ActionResult.isFailure(error="No folders found in resultDocument") except json.JSONDecodeError as e: return ActionResult.isFailure(error=f"Invalid JSON in resultDocument: {str(e)}") except Exception as e: return ActionResult.isFailure(error=f"Error resolving resultDocument reference: {str(e)}") # Get Microsoft connection connection = self._getMicrosoftConnection(connectionReference) if not connection: return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference") # Get documents from reference - ensure documentList is a list, not a string if isinstance(documentList, str): documentList = [documentList] # Convert string to list chatDocuments = self.service.getChatDocumentsFromDocumentList(documentList) if not chatDocuments: return ActionResult.isFailure(error="No documents found for the provided reference") # Discover all SharePoint sites accessible to the user sites = await self._discoverSharePointSites(connection["accessToken"]) if not sites: return ActionResult.isFailure(error="No SharePoint sites found or accessible") # Resolve path query into upload paths upload_paths = self._resolvePathQuery(pathQuery) # Process each document upload upload_results = [] for i, (chatDocument, fileName) in enumerate(zip(chatDocuments, fileNames)): try: fileId = chatDocument.fileId file_data = self.service.getFileData(fileId) if not file_data: logger.warning(f"File data not found for fileId: {fileId}") upload_results.append({ "fileName": fileName, "fileId": fileId, "error": "File data not found", "uploadStatus": "failed" }) continue # Upload to the first available site (or could be made configurable) upload_successful = False for site in sites: site_id = site["id"] site_name = site["displayName"] site_url = site["webUrl"] # Use the first upload path or default to Documents upload_path = upload_paths[0] if upload_paths else "/Documents" upload_path = upload_path.rstrip('/') + '/' + fileName upload_path_clean = upload_path.lstrip('/') # Upload endpoint for small files (< 4MB) if len(file_data) < 4 * 1024 * 1024: # 4MB upload_endpoint = f"sites/{site_id}/drive/root:/{upload_path_clean}:/content" # Upload the file upload_result = await self._makeGraphApiCall( connection["accessToken"], upload_endpoint, method="PUT", data=file_data ) if "error" not in upload_result: upload_results.append({ "fileName": fileName, "fileId": fileId, "uploadStatus": "success", "siteName": site_name, "siteUrl": site_url, "uploadPath": upload_path, "sharepointFileId": upload_result.get("id"), "webUrl": upload_result.get("webUrl"), "size": upload_result.get("size"), "createdDateTime": upload_result.get("createdDateTime") }) upload_successful = True break else: logger.warning(f"Upload failed to site {site_name}: {upload_result['error']}") else: # For large files, we would need to implement resumable upload logger.warning(f"File too large ({len(file_data)} bytes) for site {site_name}") continue if not upload_successful: upload_results.append({ "fileName": fileName, "fileId": fileId, "error": f"File too large ({len(file_data)} bytes) or upload failed to all sites. Files larger than 4MB require resumable upload (not implemented).", "uploadStatus": "failed" }) except Exception as e: logger.error(f"Error uploading document {fileName}: {str(e)}") upload_results.append({ "fileName": fileName, "fileId": fileId, "error": str(e), "uploadStatus": "failed" }) # Create result data result_data = { "connectionReference": connectionReference, "pathQuery": pathQuery, "documentList": documentList, "fileNames": fileNames, "sitesAvailable": len(sites), "uploadResults": upload_results, "connection": { "id": connection["id"], "authority": "microsoft", "reference": connectionReference }, "timestamp": get_utc_timestamp() } # Determine output format based on expected formats output_extension = ".json" # Default output_mime_type = "application/json" # Default if expectedDocumentFormats and len(expectedDocumentFormats) > 0: # Use the first expected format expected_format = expectedDocumentFormats[0] output_extension = expected_format.get("extension", ".json") output_mime_type = expected_format.get("mimeType", "application/json") logger.info(f"Using expected format: {output_extension} ({output_mime_type})") else: logger.info("No expected format specified, using default .json format") return ActionResult( success=True, documents=[ { "documentName": f"sharepoint_upload_{self._format_timestamp_for_filename()}{output_extension}", "documentData": result_data, "mimeType": output_mime_type } ] ) except Exception as e: logger.error(f"Error uploading to SharePoint: {str(e)}") return ActionResult( success=False, error=str(e) ) @action async def listDocuments(self, parameters: Dict[str, Any]) -> ActionResult: """ List documents in SharePoint folders across accessible sites Parameters: connectionReference (str): Reference to the Microsoft connection searchQuery (str): [path:][type:][mode:]query - "Test Plan", "folders:Test Plan", "/Documents", "*" Note: Use "folders:Name" to search for folders anywhere, not "path:/Name" which looks only in root resultDocument (str, optional): JSON result document from findDocumentPath action (alternative to searchQuery) includeSubfolders (bool, optional): Whether to include subfolders (default: False) expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description """ try: connectionReference = parameters.get("connectionReference") searchQuery = parameters.get("searchQuery", "*") resultDocument = parameters.get("resultDocument") includeSubfolders = parameters.get("includeSubfolders", False) # Default to False for better UX expectedDocumentFormats = parameters.get("expectedDocumentFormats", []) if not connectionReference: return ActionResult.isFailure(error="Connection reference is required") # If resultDocument is provided, resolve the reference and extract folder IDs from it if resultDocument: try: import json # Resolve the reference label to get the actual document list document_list = self.service.getChatDocumentsFromDocumentList([resultDocument]) if not document_list or len(document_list) == 0: return ActionResult.isFailure(error=f"No document list found for reference: {resultDocument}") # Get the first document's content (which should be the JSON) first_document = document_list[0] logger.info(f"Document fileId: {first_document.fileId}, fileName: {first_document.fileName}") file_data = self.service.getFileData(first_document.fileId) if not file_data: return ActionResult.isFailure(error=f"No file data found for document: {resultDocument} (fileId: {first_document.fileId})") logger.info(f"File data length: {len(file_data) if file_data else 0}") # Parse the JSON content result_data = json.loads(file_data) found_documents = result_data.get("foundDocuments", []) # Extract folder IDs from the result folder_ids = [] for doc in found_documents: if doc.get("type") == "folder": folder_ids.append(doc.get("id")) if folder_ids: # Use the first folder ID found searchQuery = folder_ids[0] logger.info(f"Using folder ID from resultDocument: {searchQuery}") else: return ActionResult.isFailure(error="No folders found in resultDocument") except json.JSONDecodeError as e: return ActionResult.isFailure(error=f"Invalid JSON in resultDocument: {str(e)}") except Exception as e: return ActionResult.isFailure(error=f"Error resolving resultDocument reference: {str(e)}") # Get Microsoft connection connection = self._getMicrosoftConnection(connectionReference) if not connection: return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference") logger.info(f"Starting SharePoint listDocuments for searchQuery: {searchQuery}") logger.debug(f"Connection ID: {connection['id']}") # Parse searchQuery to extract path, search terms, search type, and options pathQuery, fileQuery, searchType, searchOptions = self._parseSearchQuery(searchQuery) # Discover all SharePoint sites accessible to the user sites = await self._discoverSharePointSites(connection["accessToken"]) if not sites: return ActionResult.isFailure(error="No SharePoint sites found or accessible") # Check if searchQuery is a folder ID (starts with 01PPXICCB...) if searchQuery.startswith('01PPXICCB') or searchQuery.startswith('01'): # Direct folder ID - use it directly folder_paths = [searchQuery] logger.info(f"Using direct folder ID: {searchQuery}") else: # Resolve path query into folder paths folder_paths = self._resolvePathQuery(pathQuery) logger.info(f"Resolved folder paths: {folder_paths}") # Process each folder path across all sites list_results = [] for folderPath in folder_paths: try: folder_results = [] for site in sites: site_id = site["id"] site_name = site["displayName"] site_url = site["webUrl"] logger.info(f"Listing folder {folderPath} in site: {site_name}") # Determine the endpoint based on folder path if folderPath in ["/", ""] or folderPath == "*": # Root folder endpoint = f"sites/{site_id}/drive/root/children" elif folderPath.startswith('01PPXICCB') or folderPath.startswith('01'): # Direct folder ID endpoint = f"sites/{site_id}/drive/items/{folderPath}/children" else: # Specific folder path - remove leading slash if present folder_path_clean = folderPath.lstrip('/') endpoint = f"sites/{site_id}/drive/root:/{folder_path_clean}:/children" # Make the API call to list folder contents api_result = await self._makeGraphApiCall(connection["accessToken"], endpoint) if "error" in api_result: logger.warning(f"Failed to list folder {folderPath} in site {site_name}: {api_result['error']}") continue # Process the results items = api_result.get("value", []) processed_items = [] for item in items: item_info = { "id": item.get("id"), "name": item.get("name"), "size": item.get("size", 0), "createdDateTime": item.get("createdDateTime"), "lastModifiedDateTime": item.get("lastModifiedDateTime"), "webUrl": item.get("webUrl"), "type": "folder" if "folder" in item else "file", "siteName": site_name, "siteUrl": site_url } # Add file-specific information if "file" in item: item_info.update({ "mimeType": item["file"].get("mimeType"), "downloadUrl": item.get("@microsoft.graph.downloadUrl") }) # Add folder-specific information if "folder" in item: item_info.update({ "childCount": item["folder"].get("childCount", 0) }) processed_items.append(item_info) # If include subfolders is enabled, get ONLY direct subfolder contents (1 level deep only) if includeSubfolders: logger.info(f"Including subfolders - processing {len([item for item in processed_items if item['type'] == 'folder'])} folders") subfolder_count = 0 max_subfolders = 10 # Limit to prevent infinite loops for item in processed_items[:]: # Use slice to avoid modifying list during iteration if item["type"] == "folder" and subfolder_count < max_subfolders: subfolder_count += 1 subfolder_path = f"{folderPath.rstrip('/')}/{item['name']}" subfolder_endpoint = f"sites/{site_id}/drive/items/{item['id']}/children" logger.debug(f"Getting contents of subfolder: {item['name']}") subfolder_result = await self._makeGraphApiCall(connection["accessToken"], subfolder_endpoint) if "error" not in subfolder_result: subfolder_items = subfolder_result.get("value", []) logger.debug(f"Found {len(subfolder_items)} items in subfolder {item['name']}") for subfolder_item in subfolder_items: # Only add files and direct subfolders, NO RECURSION subfolder_item_info = { "id": subfolder_item.get("id"), "name": subfolder_item.get("name"), "size": subfolder_item.get("size", 0), "createdDateTime": subfolder_item.get("createdDateTime"), "lastModifiedDateTime": subfolder_item.get("lastModifiedDateTime"), "webUrl": subfolder_item.get("webUrl"), "type": "folder" if "folder" in subfolder_item else "file", "parentPath": subfolder_path, "siteName": site_name, "siteUrl": site_url } if "file" in subfolder_item: subfolder_item_info.update({ "mimeType": subfolder_item["file"].get("mimeType"), "downloadUrl": subfolder_item.get("@microsoft.graph.downloadUrl") }) processed_items.append(subfolder_item_info) else: logger.warning(f"Failed to get contents of subfolder {item['name']}: {subfolder_result.get('error')}") elif subfolder_count >= max_subfolders: logger.warning(f"Reached maximum subfolder limit ({max_subfolders}), skipping remaining folders") break logger.info(f"Processed {subfolder_count} subfolders, total items: {len(processed_items)}") folder_results.append({ "siteName": site_name, "siteUrl": site_url, "itemCount": len(processed_items), "items": processed_items }) list_results.append({ "folderPath": folderPath, "sitesProcessed": len(folder_results), "siteResults": folder_results }) except Exception as e: logger.error(f"Error listing folder {folderPath}: {str(e)}") list_results.append({ "folderPath": folderPath, "error": str(e), "siteResults": [] }) # Create result data result_data = { "searchQuery": searchQuery, "includeSubfolders": includeSubfolders, "sitesSearched": len(sites), "listResults": list_results, "timestamp": get_utc_timestamp() } # Determine output format based on expected formats output_extension = ".json" # Default output_mime_type = "application/json" # Default if expectedDocumentFormats and len(expectedDocumentFormats) > 0: # Use the first expected format expected_format = expectedDocumentFormats[0] output_extension = expected_format.get("extension", ".json") output_mime_type = expected_format.get("mimeType", "application/json") logger.info(f"Using expected format: {output_extension} ({output_mime_type})") else: logger.info("No expected format specified, using default .json format") return ActionResult( success=True, documents=[ { "documentName": f"sharepoint_document_list_{self._format_timestamp_for_filename()}{output_extension}", "documentData": result_data, "mimeType": output_mime_type } ] ) except Exception as e: logger.error(f"Error listing SharePoint documents: {str(e)}") return ActionResult( success=False, error=str(e) )