1112 lines
No EOL
57 KiB
Python
1112 lines
No EOL
57 KiB
Python
"""
|
|
SharePoint operations method module.
|
|
Handles SharePoint document operations using the SharePoint service.
|
|
"""
|
|
|
|
import logging
|
|
import json
|
|
import re
|
|
from typing import Dict, Any, List, Optional
|
|
from datetime import datetime, UTC
|
|
import base64
|
|
from urllib.parse import urlparse
|
|
import aiohttp
|
|
import asyncio
|
|
|
|
from modules.chat.methodBase import MethodBase, action
|
|
from modules.interfaces.interfaceChatModel import ActionResult
|
|
from modules.shared.timezoneUtils import get_utc_timestamp
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class MethodSharepoint(MethodBase):
|
|
"""SharePoint operations methods."""
|
|
|
|
def __init__(self, service):
    """Initialise the SharePoint method group.

    Parameters:
        service: Service object handed unchanged to ``MethodBase.__init__``.
    """
    super().__init__(service)
    # Identification attributes are assigned after the base initialiser has run.
    self.name, self.description = "sharepoint", "SharePoint operations methods"
|
|
|
|
def _format_timestamp_for_filename(self) -> str:
|
|
"""Format current timestamp as YYYYMMDD-hhmmss for filenames."""
|
|
return datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
|
|
|
|
def _getMicrosoftConnection(self, connectionReference: str) -> Optional[Dict[str, Any]]:
    """Look up the Microsoft connection behind a connection reference.

    The connection is rejected (``None`` is returned and a warning is logged)
    when no user connection exists for the reference, when its authority is
    not ``"msft"``, when its status is neither ``"active"`` nor ``"pending"``,
    when no token is stored for it, or when the token carries an ``expiresAt``
    value that lies before the current UTC timestamp.

    Parameters:
        connectionReference (str): Reference resolved through the service.

    Returns:
        Optional[Dict[str, Any]]: Dict with ``id``, ``userConnection``,
        ``accessToken``, ``refreshToken`` and ``scopes``; ``None`` on any
        rejection or unexpected error.
    """
    try:
        conn = self.service.getUserConnectionFromConnectionReference(connectionReference)
        if not conn:
            logger.warning(f"No user connection found for reference: {connectionReference}")
            return None

        if conn.authority.value != "msft":
            logger.warning(f"Connection {conn.id} is not Microsoft (authority: {conn.authority.value})")
            return None

        # "pending" is accepted as well: it means the OAuth flow is still in progress.
        if conn.status.value not in ("active", "pending"):
            logger.warning(f"Connection {conn.id} status is not active/pending: {conn.status.value}")
            return None

        # The token is stored per connection.
        token = self.service.interfaceApp.getConnectionToken(conn.id)
        if not token:
            logger.warning(f"No token found for connection {conn.id}")
            return None

        # Tokens without an expiry value are not checked for expiry.
        expires_at = getattr(token, 'expiresAt', None)
        if expires_at:
            now = get_utc_timestamp()
            if now > expires_at:
                logger.warning(f"Token for connection {conn.id} is expired (expiresAt: {expires_at}, current: {now})")
                return None

        logger.info(f"Successfully retrieved Microsoft connection: {conn.id}, status: {conn.status.value}, externalId: {conn.externalId}")

        sharepoint_scopes = ["Sites.ReadWrite.All", "Files.ReadWrite.All", "User.Read"]
        return {
            "id": conn.id,
            "userConnection": conn,
            "accessToken": token.tokenAccess,
            "refreshToken": token.tokenRefresh,
            "scopes": sharepoint_scopes,
        }
    except Exception as e:
        logger.error(f"Error getting Microsoft connection: {str(e)}")
        return None
|
|
|
|
async def _discoverSharePointSites(self, access_token: str) -> List[Dict[str, Any]]:
    """
    Discover all SharePoint sites accessible to the user via Microsoft Graph API

    The first request is ``sites?search=*``. When a response carries an
    ``@odata.nextLink`` the following page is requested as well, so sites
    beyond the first page are no longer dropped silently.

    Parameters:
        access_token (str): Microsoft Graph access token

    Returns:
        List[Dict[str, Any]]: One dict per site with the keys ``id``,
        ``displayName``, ``name``, ``webUrl``, ``description``,
        ``createdDateTime`` and ``lastModifiedDateTime``. Empty when the
        first request fails or an unexpected error occurs; if a later page
        fails, the sites collected so far are returned.
    """
    # _makeGraphApiCall expects an endpoint relative to this base URL, while
    # @odata.nextLink is absolute, so the prefix is stripped before re-use.
    graph_base = "https://graph.microsoft.com/v1.0/"
    # Upper bound on pages so a misbehaving nextLink chain cannot loop forever.
    max_pages = 50
    try:
        endpoint = "sites?search=*"
        sites = []
        for page_number in range(max_pages):
            result = await self._makeGraphApiCall(access_token, endpoint)

            if "error" in result:
                logger.error(f"Error discovering SharePoint sites: {result['error']}")
                if page_number == 0:
                    return []
                # Keep what earlier pages already delivered.
                break

            sites.extend(result.get("value", []))

            next_link = result.get("@odata.nextLink")
            if not next_link:
                break
            if not next_link.startswith(graph_base):
                logger.warning(f"Ignoring unexpected nextLink while discovering sites: {next_link}")
                break
            endpoint = next_link[len(graph_base):]
        else:
            logger.warning(f"Stopped site discovery after {max_pages} pages; more sites may exist")

        logger.info(f"Discovered {len(sites)} SharePoint sites")

        # Reduce every site to the fields the callers use.
        processed_sites = []
        for site in sites:
            site_info = {
                "id": site.get("id"),
                "displayName": site.get("displayName"),
                "name": site.get("name"),
                "webUrl": site.get("webUrl"),
                "description": site.get("description"),
                "createdDateTime": site.get("createdDateTime"),
                "lastModifiedDateTime": site.get("lastModifiedDateTime"),
            }
            processed_sites.append(site_info)
            logger.debug(f"Site: {site_info['displayName']} - {site_info['webUrl']}")

        return processed_sites

    except Exception as e:
        logger.error(f"Error discovering SharePoint sites: {str(e)}")
        return []
|
|
|
|
def _parseSearchQuery(self, searchQuery: str) -> tuple[str, str, str, dict]:
    """
    Parse searchQuery to extract path, search terms, search type, and search options.

    Parameters:
        searchQuery (str): Enhanced search query with options:
            - "budget" -> pathQuery="*", fileQuery="budget", searchType="all", options={}
            - "/Documents:budget" -> pathQuery="/Documents", fileQuery="budget", searchType="all", options={}
            - "files:budget" -> pathQuery="*", fileQuery="budget", searchType="files", options={}
            - "folders:DELTA" -> pathQuery="*", fileQuery="DELTA", searchType="folders", options={}
            - "exact:\"Operations 2025\"" -> exact phrase matching
            - "regex:^Operations.*2025$" -> regex pattern matching
            - "case:DELTA" -> case-sensitive search
            - "and:DELTA AND 2025 Mars AND Group" -> all AND terms must be present
            - "case:exact:Delta" -> mode prefixes may be chained

    Parsing rules:
        - An optional type prefix (files:, folders:, all:) is read first.
        - Any number of mode prefixes (exact:, regex:, case:, and:) follow.
          A quoted exact phrase ends prefix parsing, so its content is
          never re-interpreted.
        - For plain/case/and queries the text before the first remaining
          colon is the path; a relative path is placed under "/Documents".
        - For exact and regex queries a colon only separates a path when
          the part before it is "*" or starts with "/". Otherwise the colon
          belongs to the phrase or pattern (e.g. "regex:(?:a|b)").
        - An empty search part is normalised to "*".

    Returns:
        tuple[str, str, str, dict]: (pathQuery, fileQuery, searchType, searchOptions).
        ("*", "*", "all", {}) is returned for empty input and on any error.
    """
    type_prefixes = ("files:", "folders:", "all:")
    mode_flags = {
        "exact": "exact_match",
        "regex": "regex_match",
        "case": "case_sensitive",
        "and": "and_terms",
    }
    mode_prefixes = tuple(f"{mode}:" for mode in mode_flags)

    try:
        if not searchQuery or not searchQuery.strip() or searchQuery.strip() == "*":
            return "*", "*", "all", {}

        query = searchQuery.strip()
        searchOptions = {}

        # Search type specification (files:, folders:, all:)
        searchType = "all"  # Default
        if query.startswith(type_prefixes):
            prefix, _, query = query.partition(":")
            searchType = prefix.strip()
            query = query.strip()

        # Search mode specification; prefixes may be chained (e.g. "case:regex:...")
        while query.startswith(mode_prefixes):
            mode, _, query = query.partition(":")
            query = query.strip()
            searchOptions[mode_flags[mode]] = True
            if mode == "exact" and query.startswith('"') and query.endswith('"'):
                # The quoted phrase is literal text: drop the quotes and stop
                # looking for further prefixes inside it.
                query = query[1:-1]
                break

        literal_mode = bool(searchOptions.get("exact_match") or searchOptions.get("regex_match"))

        pathQuery = "*"
        fileQuery = query

        if ":" in query:
            path_part, _, search_part = query.partition(":")
            path_part = path_part.strip()
            search_part = search_part.strip()
            explicit_path = path_part == "*" or path_part.startswith("/")

            # In exact/regex mode a colon is usually part of the phrase or
            # pattern ("(?:...)", "Time: 10"), so only an explicit path splits.
            if explicit_path or not literal_mode:
                if not path_part or path_part == "*":
                    pathQuery = "*"
                elif path_part.startswith("/"):
                    pathQuery = path_part
                else:
                    pathQuery = f"/Documents/{path_part}"
                fileQuery = search_part
        elif query.startswith("/"):
            # It's a path only
            pathQuery, fileQuery = query, "*"

        if not fileQuery:
            fileQuery = "*"

        return pathQuery, fileQuery, searchType, searchOptions

    except Exception as e:
        logger.error(f"Error parsing searchQuery '{searchQuery}': {str(e)}")
        return "*", "*", "all", {}
|
|
|
|
def _resolvePathQuery(self, pathQuery: str) -> List[str]:
    """
    Turn a pathQuery string into the list of paths to operate on.

    Parameters:
        pathQuery (str): Semicolon-separated path expressions. Each entry is
            handled on its own:
            - entries containing "*" are kept unchanged
            - entries starting with "/" are kept unchanged
            - any other entry is placed below "/Documents/"
            An empty query or "*" means a global search.

    Returns:
        List[str]: Resolved paths without duplicates, in first-seen order.
        ["*"] for a global search and also when an error occurs.
    """
    try:
        trimmed = pathQuery.strip() if pathQuery else ""
        if trimmed in ("", "*"):
            return ["*"]  # Global search across all sites

        # Split on ';' and drop blank entries.
        segments = (piece.strip() for piece in pathQuery.split(';'))
        resolved_paths = [
            entry if ('*' in entry or entry.startswith('/')) else f"/Documents/{entry}"
            for entry in segments
            if entry
        ]

        # dict keys keep insertion order, which removes duplicates stably.
        unique_paths = list(dict.fromkeys(resolved_paths))

        logger.info(f"Resolved pathQuery '{pathQuery}' to {len(unique_paths)} paths: {unique_paths}")
        return unique_paths

    except Exception as e:
        logger.error(f"Error resolving pathQuery '{pathQuery}': {str(e)}")
        return ["*"]  # Fallback to global search
|
|
|
|
def _parseSiteUrl(self, siteUrl: str) -> Dict[str, str]:
    """Parse SharePoint site URL to extract hostname and site path

    Parameters:
        siteUrl (str): Site URL such as
            "https://contoso.sharepoint.com/sites/Team". A URL without a
            scheme ("contoso.sharepoint.com/sites/Team") is accepted too.

    Returns:
        Dict[str, str]: {"hostname": ..., "sitePath": ...}. sitePath has no
        leading or trailing slash. Both values are empty strings when the
        input is empty, not a string, or cannot be parsed; the values are
        never None.
    """
    try:
        candidate = siteUrl.strip() if isinstance(siteUrl, str) else ""
        if not candidate:
            return {"hostname": "", "sitePath": ""}

        # Without a scheme urlparse() treats the host as part of the path,
        # which left hostname as None and put the host into sitePath.
        if "://" not in candidate:
            candidate = f"https://{candidate.lstrip('/')}"

        parsed = urlparse(candidate)
        return {
            "hostname": parsed.hostname or "",
            "sitePath": parsed.path.strip('/'),
        }
    except ValueError as e:
        # urlparse and the hostname accessor raise ValueError on malformed
        # input such as an unbalanced IPv6 bracket.
        logger.error(f"Error parsing site URL {siteUrl}: {str(e)}")
        return {"hostname": "", "sitePath": ""}
|
|
|
|
async def _makeGraphApiCall(self, access_token: str, endpoint: str, method: str = "GET", data: bytes = None) -> Dict[str, Any]:
    """Make a Microsoft Graph API call with timeout and detailed logging

    Parameters:
        access_token (str): Bearer token sent in the Authorization header.
        endpoint (str): Path appended to "https://graph.microsoft.com/v1.0/".
        method (str): "GET", "PUT" or "POST" (case-insensitive).
        data (bytes, optional): Request body for PUT/POST. GET never sends
            a body.

    Returns:
        Dict[str, Any]: The decoded JSON body on success (200 for GET,
        200 or 201 for PUT/POST). In every other case a dict with a single
        "error" key is returned: unexpected status, timeout, unsupported
        method, or any other exception. The method never returns None, so
        callers can always test ``"error" in result``.
    """
    # Status codes treated as success, per supported verb.
    success_statuses = {"GET": (200,), "PUT": (200, 201), "POST": (200, 201)}
    timeout_seconds = 30

    try:
        verb = (method or "GET").upper()
        accepted = success_statuses.get(verb)
        if accepted is None:
            # Previously an unknown verb fell through and returned None.
            logger.error(f"Unsupported HTTP method for Graph API call: {method}")
            return {"error": f"Unsupported HTTP method: {method}"}

        # A PUT body is a raw file upload; everything else is JSON.
        content_type = "application/octet-stream" if (data and verb == "PUT") else "application/json"
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": content_type,
        }

        url = f"https://graph.microsoft.com/v1.0/{endpoint}"
        logger.info(f"Making Graph API call: {verb} {url}")

        request_kwargs = {"headers": headers}
        if verb != "GET":
            request_kwargs["data"] = data

        timeout = aiohttp.ClientTimeout(total=timeout_seconds)

        async with aiohttp.ClientSession(timeout=timeout) as session:
            logger.debug(f"Starting {verb} request to {url}")
            async with session.request(verb, url, **request_kwargs) as response:
                logger.info(f"Graph API response: {response.status}")
                if response.status in accepted:
                    result = await response.json()
                    logger.debug(f"Graph API success: {len(str(result))} characters response")
                    return result

                error_text = await response.text()
                logger.error(f"Graph API call failed: {response.status} - {error_text}")
                return {"error": f"API call failed: {response.status} - {error_text}"}

    except asyncio.TimeoutError:
        logger.error(f"Graph API call timed out after {timeout_seconds} seconds: {endpoint}")
        return {"error": f"API call timed out after {timeout_seconds} seconds: {endpoint}"}
    except Exception as e:
        logger.error(f"Error making Graph API call: {str(e)}")
        return {"error": f"Error making Graph API call: {str(e)}"}
|
|
|
|
async def _getSiteId(self, access_token: str, hostname: str, site_path: str) -> str:
    """Get SharePoint site ID from hostname and site path

    Parameters:
        access_token (str): Microsoft Graph access token.
        hostname (str): SharePoint host, e.g. "contoso.sharepoint.com".
        site_path (str): Server-relative site path, e.g. "sites/Team".
            Leading/trailing slashes are ignored. When empty, the site is
            addressed by hostname alone ("sites/{hostname}") instead of the
            malformed "sites/{hostname}:/".

    Returns:
        str: The site ID, or "" when the hostname is empty, the Graph call
        reports an error, or the response has no "id".
    """
    try:
        host = (hostname or "").strip()
        if not host:
            logger.error("Error getting site ID: hostname is empty")
            return ""

        relative_path = (site_path or "").strip().strip('/')
        if relative_path:
            endpoint = f"sites/{host}:/{relative_path}"
        else:
            endpoint = f"sites/{host}"

        result = await self._makeGraphApiCall(access_token, endpoint)

        if "error" in result:
            logger.error(f"Error getting site ID: {result['error']}")
            return ""

        return result.get("id", "")
    except Exception as e:
        logger.error(f"Error getting site ID: {str(e)}")
        return ""
|
|
|
|
@action
async def findDocumentPath(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    Find documents by searching their content, names, or metadata across all accessible SharePoint sites

    Parameters:
        connectionReference (str): Reference to the Microsoft connection
        searchQuery (str): [path:][type:][mode:]query - Enhanced search syntax:
            - "budget", "/Documents:budget", "files:budget", "folders:DELTA", "*"
            - "exact:\"Operations 2025\"" - exact phrase matching
            - "regex:^Operations.*2025$" - regex pattern matching
            - "case:DELTA" - case-sensitive search
            - "and:DELTA AND 2025 Mars AND Group" - all terms must be present
            - "folders:and:DELTA AND 2025 Mars AND Group" - combined options
            Note: For storage locations, use "folders:" prefix. All search terms must be present by default.
        searchScope (str, optional): Search scope - options: "all" (default), "documents" (files only), "pages" (SharePoint pages only)
        maxResults (int, optional): Maximum number of results to return (default: 100)
        expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description
    """
    try:
        connectionReference = parameters.get("connectionReference")
        searchQuery = parameters.get("searchQuery", "*")
        searchScope = parameters.get("searchScope", "all")
        maxResults = parameters.get("maxResults", 100)
        expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])

        if not connectionReference:
            return ActionResult.isFailure(error="Connection reference is required")

        # Parse searchQuery to extract path, search terms, search type, and options
        # NOTE(review): pathQuery is parsed but never applied to the Graph
        # request or to the results; the search always covers the whole
        # default drive of every site. Confirm whether that is intended.
        pathQuery, fileQuery, searchType, searchOptions = self._parseSearchQuery(searchQuery)

        # Empty and "*" queries list the drive root instead of searching.
        has_name_filter = bool(fileQuery and fileQuery.strip() and fileQuery.strip() != "*")

        # Compile the regex once, before any network call, so an invalid
        # pattern fails fast with a clear message instead of per item.
        compiled_regex = None
        if has_name_filter and searchOptions.get("regex_match") and not searchOptions.get("exact_match"):
            flags = 0 if searchOptions.get("case_sensitive") else re.IGNORECASE
            try:
                compiled_regex = re.compile(fileQuery, flags)
            except re.error as e:
                logger.error(f"Invalid regex pattern '{fileQuery}': {str(e)}")
                return ActionResult.isFailure(error=f"Invalid regex pattern '{fileQuery}': {str(e)}")

        connection = self._getMicrosoftConnection(connectionReference)
        if not connection:
            return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference")

        # Discover all SharePoint sites accessible to the user
        sites = await self._discoverSharePointSites(connection["accessToken"])
        if not sites:
            return ActionResult.isFailure(error="No SharePoint sites found or accessible")

        try:
            # Search across all discovered sites
            found_documents = []

            for site in sites:
                # Results are cut to maxResults below, so further sites
                # cannot contribute once the limit has been reached.
                if len(found_documents) >= maxResults:
                    logger.info(f"Reached maxResults ({maxResults}); skipping remaining sites")
                    break

                site_id = site["id"]
                site_name = site["displayName"]
                site_url = site["webUrl"]

                logger.info(f"Searching in site: {site_name} ({site_url})")

                if has_name_filter:
                    # For specific queries, use search API
                    search_query = fileQuery.replace("'", "''")  # Escape single quotes for OData
                    endpoint = f"sites/{site_id}/drive/root/search(q='{search_query}')"
                else:
                    # For wildcard/empty queries, list all items in the drive
                    endpoint = f"sites/{site_id}/drive/root/children"

                search_result = await self._makeGraphApiCall(connection["accessToken"], endpoint)

                if "error" in search_result:
                    logger.warning(f"Search failed for site {site_name}: {search_result['error']}")
                    continue

                site_documents = []

                for item in search_result.get("value", []):
                    is_folder = "folder" in item
                    is_file = "file" in item

                    # Filter by search scope if specified
                    if searchScope == "documents" and is_folder:
                        continue
                    if searchScope == "pages" and is_file:
                        # A missing/None mimeType counts as "not a page".
                        mime_type = item["file"].get("mimeType") or ""
                        if not mime_type.startswith("text/html"):
                            continue

                    # Filter by search type (files, folders, all)
                    if searchType == "files" and is_folder:
                        continue
                    if searchType == "folders" and is_file:
                        continue

                    # Post-filter on the item name according to the search options
                    item_name = item.get("name") or ""
                    if has_name_filter and not self._matchesFileQuery(item_name, fileQuery, searchOptions, compiled_regex):
                        continue

                    # Create minimal result with only essential reference information
                    site_documents.append({
                        "id": item.get("id"),
                        "name": item.get("name"),
                        "type": "folder" if is_folder else "file",
                        "siteName": site_name,
                        "siteId": site_id
                    })

                found_documents.extend(site_documents)
                logger.info(f"Found {len(site_documents)} documents in site {site_name}")

            # Limit total results to maxResults
            if len(found_documents) > maxResults:
                found_documents = found_documents[:maxResults]
                logger.info(f"Limited results to {maxResults} items")

            result_data = {
                "searchQuery": searchQuery,
                "totalResults": len(found_documents),
                "maxResults": maxResults,
                "foundDocuments": found_documents,
                "timestamp": get_utc_timestamp()
            }

        except Exception as e:
            logger.error(f"Error searching SharePoint: {str(e)}")
            return ActionResult.isFailure(error=str(e))

        # Determine output format based on expected formats
        output_extension = ".json"  # Default
        output_mime_type = "application/json"  # Default

        if expectedDocumentFormats and len(expectedDocumentFormats) > 0:
            # Use the first expected format
            expected_format = expectedDocumentFormats[0]
            output_extension = expected_format.get("extension", ".json")
            output_mime_type = expected_format.get("mimeType", "application/json")
            logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
        else:
            logger.info("No expected format specified, using default .json format")

        return ActionResult(
            success=True,
            documents=[
                {
                    "documentName": f"sharepoint_find_path_{self._format_timestamp_for_filename()}{output_extension}",
                    "documentData": result_data,
                    "mimeType": output_mime_type
                }
            ]
        )

    except Exception as e:
        logger.error(f"Error finding document path: {str(e)}")
        return ActionResult.isFailure(error=str(e))

@staticmethod
def _matchesFileQuery(item_name: str, fileQuery: str, searchOptions: dict, compiled_regex=None) -> bool:
    """Return True when item_name satisfies fileQuery under searchOptions.

    The modes are checked in this order:
        - exact_match: fileQuery must occur as a substring
        - regex_match: compiled_regex must find a match in item_name
        - and_terms: every term separated by " AND " must occur
        - default: every whitespace-separated term must occur
    Comparisons ignore case unless searchOptions["case_sensitive"] is set;
    for regex mode the case handling is part of compiled_regex.
    """
    case_sensitive = bool(searchOptions.get("case_sensitive"))

    if searchOptions.get("exact_match"):
        if case_sensitive:
            return fileQuery in item_name
        return fileQuery.lower() in item_name.lower()

    if searchOptions.get("regex_match"):
        return compiled_regex.search(item_name) is not None

    haystack = item_name if case_sensitive else item_name.lower()
    raw_terms = fileQuery.split(" AND ") if searchOptions.get("and_terms") else fileQuery.split()
    terms = [term.strip() for term in raw_terms if term.strip()]
    if not case_sensitive:
        terms = [term.lower() for term in terms]
    return all(term in haystack for term in terms)
|
|
|
|
@action
async def readDocuments(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    Read documents from SharePoint across all accessible sites

    Parameters:
        documentList (str): Reference to the document list to read
        connectionReference (str): Reference to the Microsoft connection
        pathQuery (str): Path query to locate documents (e.g., "/Documents/Project1", "*" for all sites)
        includeMetadata (bool, optional): Whether to include metadata (default: True)
        expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description
    """
    try:
        documentList = parameters.get("documentList")
        connectionReference = parameters.get("connectionReference")
        pathQuery = parameters.get("pathQuery", "*")
        includeMetadata = parameters.get("includeMetadata", True)
        expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])

        if not documentList or not connectionReference:
            return ActionResult.isFailure(error="Document list reference and connection reference are required")

        # Get documents from reference - ensure documentList is a list, not a string
        if isinstance(documentList, str):
            documentList = [documentList]  # Convert string to list
        chatDocuments = self.service.getChatDocumentsFromDocumentList(documentList)

        if not chatDocuments:
            return ActionResult.isFailure(error="No documents found for the provided reference")

        connection = self._getMicrosoftConnection(connectionReference)
        if not connection:
            return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference")

        access_token = connection["accessToken"]

        # Discover all SharePoint sites accessible to the user
        sites = await self._discoverSharePointSites(access_token)
        if not sites:
            return ActionResult.isFailure(error="No SharePoint sites found or accessible")

        # NOTE(review): pathQuery is echoed in the result but does not
        # restrict where a file is looked up; every site is searched by
        # file name. Confirm whether path filtering is still planned.

        # MIME types (besides text/*) whose content is downloaded as text.
        readable_mime_types = ("application/json", "application/xml", "application/javascript")

        # Process each chat document across all sites
        read_results = []

        for chatDocument in chatDocuments:
            # Read the identifiers outside the try block so the error record
            # in the except handler can always be built.
            fileId = getattr(chatDocument, "fileId", None)
            fileName = getattr(chatDocument, "fileName", None)

            try:
                if not fileName:
                    read_results.append({
                        "fileId": fileId,
                        "fileName": fileName,
                        "error": "Document has no file name",
                        "content": None
                    })
                    continue

                # Search for this file across all sites
                file_found = False
                search_query = fileName.replace("'", "''")  # Escape single quotes for OData

                for site in sites:
                    site_id = site["id"]
                    site_name = site["displayName"]
                    site_url = site["webUrl"]

                    # Try to find the file by name in this site
                    endpoint = f"sites/{site_id}/drive/root/search(q='{search_query}')"
                    search_result = await self._makeGraphApiCall(access_token, endpoint)

                    if "error" in search_result:
                        continue

                    for item in search_result.get("value", []):
                        # The search is fuzzy; only an exact name match counts.
                        if item.get("name") != fileName:
                            continue

                        # Found the file, get its details
                        file_id = item.get("id")
                        file_endpoint = f"sites/{site_id}/drive/items/{file_id}"
                        file_info_result = await self._makeGraphApiCall(access_token, file_endpoint)

                        if "error" in file_info_result:
                            continue

                        file_facet = file_info_result.get("file") or {}
                        # mimeType may be absent or None; treat both as "".
                        mime_type = file_facet.get("mimeType") or ""

                        result_item = {
                            "fileId": fileId,
                            "fileName": fileName,
                            "sharepointFileId": file_id,
                            "siteName": site_name,
                            "siteUrl": site_url,
                            "size": file_info_result.get("size", 0),
                            "createdDateTime": file_info_result.get("createdDateTime"),
                            "lastModifiedDateTime": file_info_result.get("lastModifiedDateTime"),
                            "webUrl": file_info_result.get("webUrl")
                        }

                        # Add metadata if requested
                        if includeMetadata:
                            result_item["metadata"] = {
                                "mimeType": file_facet.get("mimeType"),
                                "downloadUrl": file_info_result.get("@microsoft.graph.downloadUrl"),
                                "createdBy": file_info_result.get("createdBy", {}),
                                "lastModifiedBy": file_info_result.get("lastModifiedBy", {}),
                                "parentReference": file_info_result.get("parentReference", {})
                            }

                        # Get file content if it's a readable format
                        if mime_type.startswith("text/") or mime_type in readable_mime_types:
                            content_endpoint = f"sites/{site_id}/drive/items/{file_id}/content"
                            result_item["content"] = await self._downloadTextContent(access_token, content_endpoint)
                        else:
                            result_item["content"] = f"Binary file type ({mime_type}) - content not retrieved"

                        read_results.append(result_item)
                        file_found = True
                        break

                    if file_found:
                        break

                if not file_found:
                    read_results.append({
                        "fileId": fileId,
                        "fileName": fileName,
                        "error": "File not found in any accessible SharePoint site",
                        "content": None
                    })

            except Exception as e:
                logger.error(f"Error reading document {fileName}: {str(e)}")
                read_results.append({
                    "fileId": fileId,
                    "fileName": fileName,
                    "error": str(e),
                    "content": None
                })

        result_data = {
            "connectionReference": connectionReference,
            "pathQuery": pathQuery,
            "documentList": documentList,
            "includeMetadata": includeMetadata,
            "sitesSearched": len(sites),
            "readResults": read_results,
            "connection": {
                "id": connection["id"],
                "authority": "microsoft",
                "reference": connectionReference
            },
            "timestamp": get_utc_timestamp()
        }

        # Determine output format based on expected formats
        output_extension = ".json"  # Default
        output_mime_type = "application/json"  # Default

        if expectedDocumentFormats and len(expectedDocumentFormats) > 0:
            # Use the first expected format
            expected_format = expectedDocumentFormats[0]
            output_extension = expected_format.get("extension", ".json")
            output_mime_type = expected_format.get("mimeType", "application/json")
            logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
        else:
            logger.info("No expected format specified, using default .json format")

        return ActionResult(
            success=True,
            documents=[
                {
                    "documentName": f"sharepoint_documents_{self._format_timestamp_for_filename()}{output_extension}",
                    "documentData": result_data,
                    "mimeType": output_mime_type
                }
            ]
        )
    except Exception as e:
        logger.error(f"Error reading SharePoint documents: {str(e)}")
        return ActionResult(
            success=False,
            error=str(e)
        )

async def _downloadTextContent(self, access_token: str, content_endpoint: str) -> str:
    """Download a drive item's content as text, or return a message describing the failure.

    Uses the same 30 second total timeout as the other Graph calls in this
    class. Never raises: HTTP errors, timeouts and other exceptions are
    turned into a human-readable string that is returned as the content.
    """
    timeout_seconds = 30
    try:
        timeout = aiohttp.ClientTimeout(total=timeout_seconds)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            headers = {"Authorization": f"Bearer {access_token}"}
            async with session.get(f"https://graph.microsoft.com/v1.0/{content_endpoint}", headers=headers) as response:
                if response.status == 200:
                    return await response.text()
                return f"Could not download content: HTTP {response.status}"
    except asyncio.TimeoutError:
        # str() of a TimeoutError is empty, so spell the reason out.
        return f"Error downloading content: request timed out after {timeout_seconds} seconds"
    except Exception as e:
        return f"Error downloading content: {str(e)}"
|
|
|
|
@action
async def uploadDocument(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    Upload documents to SharePoint across accessible sites

    Parameters:
        connectionReference (str): Reference to the Microsoft connection
        pathQuery (str): Path query where to upload documents (e.g., "/Documents/Project1", "*" for default location)
        documentList (str): Reference to the document list to upload
        fileNames (List[str]): List of names for the uploaded files
        expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description
    """
    # Files at or above this size need a resumable upload session, which is
    # not implemented here.
    max_simple_upload_bytes = 4 * 1024 * 1024  # 4MB
    default_upload_folder = "/Documents"

    try:
        connectionReference = parameters.get("connectionReference")
        pathQuery = parameters.get("pathQuery", "/Documents")
        documentList = parameters.get("documentList")
        fileNames = parameters.get("fileNames")
        expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])

        if not connectionReference or not documentList or not fileNames:
            return ActionResult.isFailure(error="Connection reference, document list, and file names are required")

        # A single name passed as a plain string would otherwise be paired
        # with the documents character by character.
        if isinstance(fileNames, str):
            fileNames = [fileNames]

        # Get Microsoft connection
        connection = self._getMicrosoftConnection(connectionReference)
        if not connection:
            return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference")

        # Get documents from reference - ensure documentList is a list, not a string
        if isinstance(documentList, str):
            documentList = [documentList]  # Convert string to list
        chatDocuments = self.service.getChatDocumentsFromDocumentList(documentList)
        if not chatDocuments:
            return ActionResult.isFailure(error="No documents found for the provided reference")

        # Discover all SharePoint sites accessible to the user
        sites = await self._discoverSharePointSites(connection["accessToken"])
        if not sites:
            return ActionResult.isFailure(error="No SharePoint sites found or accessible")

        # Resolve path query into upload paths; only the first one is used.
        upload_paths = self._resolvePathQuery(pathQuery)
        base_path = upload_paths[0] if upload_paths else default_upload_folder
        if "*" in base_path:
            # "*" is documented as "default location"; a wildcard cannot be
            # used literally as a destination folder.
            logger.info(f"Upload path '{base_path}' contains a wildcard, using default folder {default_upload_folder}")
            base_path = default_upload_folder

        if len(fileNames) != len(chatDocuments):
            logger.warning(f"Number of file names ({len(fileNames)}) does not match number of documents ({len(chatDocuments)})")

        # Process each document upload
        upload_results = []

        for index, chatDocument in enumerate(chatDocuments):
            # Bound before the try block so the except handler never sees an
            # unbound or stale value from a previous iteration.
            fileId = getattr(chatDocument, "fileId", None)

            if index >= len(fileNames):
                # Report the document instead of dropping it silently.
                upload_results.append({
                    "fileName": getattr(chatDocument, "fileName", None),
                    "fileId": fileId,
                    "error": "No target file name provided for this document",
                    "uploadStatus": "failed"
                })
                continue

            fileName = fileNames[index]

            try:
                file_data = self.service.getFileData(fileId)

                if not file_data:
                    logger.warning(f"File data not found for fileId: {fileId}")
                    upload_results.append({
                        "fileName": fileName,
                        "fileId": fileId,
                        "error": "File data not found",
                        "uploadStatus": "failed"
                    })
                    continue

                # The size limit does not depend on the site, so check it once.
                if len(file_data) >= max_simple_upload_bytes:
                    logger.warning(f"File too large ({len(file_data)} bytes) for simple upload: {fileName}")
                    upload_results.append({
                        "fileName": fileName,
                        "fileId": fileId,
                        "error": f"File too large ({len(file_data)} bytes). Files larger than 4MB require resumable upload (not implemented).",
                        "uploadStatus": "failed"
                    })
                    continue

                upload_path = base_path.rstrip('/') + '/' + fileName
                upload_path_clean = upload_path.lstrip('/')

                # Sites are tried in discovery order; the first successful
                # upload wins and the remaining sites are skipped.
                upload_successful = False

                for site in sites:
                    site_id = site["id"]
                    site_name = site["displayName"]
                    site_url = site["webUrl"]

                    upload_endpoint = f"sites/{site_id}/drive/root:/{upload_path_clean}:/content"

                    upload_result = await self._makeGraphApiCall(
                        connection["accessToken"],
                        upload_endpoint,
                        method="PUT",
                        data=file_data
                    )

                    if "error" in upload_result:
                        logger.warning(f"Upload failed to site {site_name}: {upload_result['error']}")
                        continue

                    upload_results.append({
                        "fileName": fileName,
                        "fileId": fileId,
                        "uploadStatus": "success",
                        "siteName": site_name,
                        "siteUrl": site_url,
                        "uploadPath": upload_path,
                        "sharepointFileId": upload_result.get("id"),
                        "webUrl": upload_result.get("webUrl"),
                        "size": upload_result.get("size"),
                        "createdDateTime": upload_result.get("createdDateTime")
                    })
                    upload_successful = True
                    break

                if not upload_successful:
                    upload_results.append({
                        "fileName": fileName,
                        "fileId": fileId,
                        "error": "Upload failed to all accessible sites",
                        "uploadStatus": "failed"
                    })

            except Exception as e:
                logger.error(f"Error uploading document {fileName}: {str(e)}")
                upload_results.append({
                    "fileName": fileName,
                    "fileId": fileId,
                    "error": str(e),
                    "uploadStatus": "failed"
                })

        # Create result data
        result_data = {
            "connectionReference": connectionReference,
            "pathQuery": pathQuery,
            "documentList": documentList,
            "fileNames": fileNames,
            "sitesAvailable": len(sites),
            "uploadResults": upload_results,
            "connection": {
                "id": connection["id"],
                "authority": "microsoft",
                "reference": connectionReference
            },
            "timestamp": get_utc_timestamp()
        }

        # Determine output format based on expected formats
        output_extension = ".json"  # Default
        output_mime_type = "application/json"  # Default

        if expectedDocumentFormats and len(expectedDocumentFormats) > 0:
            # Use the first expected format
            expected_format = expectedDocumentFormats[0]
            output_extension = expected_format.get("extension", ".json")
            output_mime_type = expected_format.get("mimeType", "application/json")
            logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
        else:
            logger.info("No expected format specified, using default .json format")

        return ActionResult(
            success=True,
            documents=[
                {
                    "documentName": f"sharepoint_upload_{self._format_timestamp_for_filename()}{output_extension}",
                    "documentData": result_data,
                    "mimeType": output_mime_type
                }
            ]
        )

    except Exception as e:
        logger.error(f"Error uploading to SharePoint: {str(e)}")
        return ActionResult(
            success=False,
            error=str(e)
        )
|
|
|
|
@action
async def listDocuments(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    List documents in SharePoint folders across accessible sites

    Parameters:
        connectionReference (str): Reference to the Microsoft connection
        pathQuery (str): Path query to list folders (e.g., "/Documents", "/Shared Documents/Project1", "*" for all sites)
        includeSubfolders (bool, optional): Whether to include subfolders (default: False)
        expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description
    """
    # NOTE(review): the docstring above is kept in its original shape in case the
    # @action decorator exposes or parses it — confirm before restructuring it.
    #
    # Returns an ActionResult:
    #   - failure (ActionResult.isFailure) when the connection reference is missing,
    #     no Microsoft connection resolves, or no site is discovered;
    #   - failure (success=False) when an unexpected exception escapes;
    #   - success with one document whose "documentData" is the listing dict built
    #     below (one entry per resolved folder path, each with per-site results).
    # A site whose Graph call reports an error is logged and left out of that
    # folder's "siteResults"; an exception while handling a folder path is
    # recorded as an "error" entry for that path and the remaining paths continue.
    try:
        connectionReference = parameters.get("connectionReference")
        pathQuery = parameters.get("pathQuery", "*")
        includeSubfolders = parameters.get("includeSubfolders", False)  # Default to False for better UX
        expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])

        if not connectionReference:
            return ActionResult.isFailure(error="Connection reference is required")

        # Get Microsoft connection
        connection = self._getMicrosoftConnection(connectionReference)
        if not connection:
            return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference")

        logger.info(f"Starting SharePoint listDocuments for pathQuery: {pathQuery}")
        logger.debug(f"Connection ID: {connection['id']}")

        # The token is the same for every Graph call below; look it up once.
        access_token = connection["accessToken"]

        # Discover all SharePoint sites accessible to the user
        sites = await self._discoverSharePointSites(access_token)
        if not sites:
            return ActionResult.isFailure(error="No SharePoint sites found or accessible")

        # Resolve path query into folder paths
        folder_paths = self._resolvePathQuery(pathQuery)
        logger.info(f"Resolved folder paths: {folder_paths}")

        # Cap on how many folders per site get their children fetched, which
        # bounds the number of extra Graph calls when includeSubfolders is set.
        max_subfolders = 10

        # Process each folder path across all sites
        list_results = []

        for folderPath in folder_paths:
            try:
                folder_results = []

                # "/", "" and the wildcard "*" all address the drive root.
                is_root = folderPath in ("/", "", "*")
                # Prefix for the "parentPath" of expanded children. The wildcard
                # is normalised to the root so children read "/Folder" rather
                # than "*/Folder".
                parent_base = "" if is_root else folderPath.rstrip('/')

                for site in sites:
                    site_id = site["id"]
                    site_name = site["displayName"]
                    site_url = site["webUrl"]

                    logger.info(f"Listing folder {folderPath} in site: {site_name}")

                    # Determine the endpoint based on folder path
                    if is_root:
                        endpoint = f"sites/{site_id}/drive/root/children"
                    else:
                        # Specific folder - remove leading slash if present
                        folder_path_clean = folderPath.lstrip('/')
                        endpoint = f"sites/{site_id}/drive/root:/{folder_path_clean}:/children"

                    # Make the API call to list folder contents
                    api_result = await self._makeGraphApiCall(access_token, endpoint)

                    if "error" in api_result:
                        logger.warning(f"Failed to list folder {folderPath} in site {site_name}: {api_result['error']}")
                        continue

                    processed_items = [
                        self._describeDriveItem(entry, site_name, site_url)
                        for entry in api_result.get("value", [])
                    ]

                    # If include subfolders is enabled, get ONLY direct subfolder contents (1 level deep only)
                    if includeSubfolders:
                        # Snapshot the top-level folders first: children appended
                        # below must never be expanded themselves (no recursion).
                        folders = [entry for entry in processed_items if entry["type"] == "folder"]
                        logger.info(f"Including subfolders - processing {len(folders)} folders")

                        folders_to_expand = folders[:max_subfolders]
                        for folder in folders_to_expand:
                            subfolder_path = f"{parent_base}/{folder['name']}"
                            subfolder_endpoint = f"sites/{site_id}/drive/items/{folder['id']}/children"

                            logger.debug(f"Getting contents of subfolder: {folder['name']}")
                            subfolder_result = await self._makeGraphApiCall(access_token, subfolder_endpoint)

                            if "error" in subfolder_result:
                                logger.warning(f"Failed to get contents of subfolder {folder['name']}: {subfolder_result.get('error')}")
                                continue

                            subfolder_items = subfolder_result.get("value", [])
                            logger.debug(f"Found {len(subfolder_items)} items in subfolder {folder['name']}")

                            # Only add files and direct subfolders, NO RECURSION
                            processed_items.extend(
                                self._describeDriveItem(child, site_name, site_url, subfolder_path)
                                for child in subfolder_items
                            )

                        # Warn only when folders were actually left unexpanded.
                        if len(folders) > max_subfolders:
                            logger.warning(f"Reached maximum subfolder limit ({max_subfolders}), skipping remaining folders")

                        logger.info(f"Processed {len(folders_to_expand)} subfolders, total items: {len(processed_items)}")

                    folder_results.append({
                        "siteName": site_name,
                        "siteUrl": site_url,
                        "itemCount": len(processed_items),
                        "items": processed_items
                    })

                list_results.append({
                    "folderPath": folderPath,
                    "sitesProcessed": len(folder_results),
                    "siteResults": folder_results
                })

            except Exception as e:
                # Best effort per folder path: record the failure and carry on
                # with the remaining paths.
                logger.error(f"Error listing folder {folderPath}: {str(e)}")
                list_results.append({
                    "folderPath": folderPath,
                    "error": str(e),
                    "siteResults": []
                })

        # Create result data
        result_data = {
            "connectionReference": connectionReference,
            "pathQuery": pathQuery,
            "includeSubfolders": includeSubfolders,
            "sitesSearched": len(sites),
            "listResults": list_results,
            "connection": {
                "id": connection["id"],
                "authority": "microsoft",
                "reference": connectionReference
            },
            "timestamp": get_utc_timestamp()
        }

        # Determine output format based on expected formats
        output_extension = ".json"  # Default
        output_mime_type = "application/json"  # Default

        if expectedDocumentFormats:
            # Use the first expected format
            expected_format = expectedDocumentFormats[0]
            output_extension = expected_format.get("extension", ".json")
            output_mime_type = expected_format.get("mimeType", "application/json")
            logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
        else:
            logger.info("No expected format specified, using default .json format")

        return ActionResult(
            success=True,
            documents=[
                {
                    "documentName": f"sharepoint_document_list_{self._format_timestamp_for_filename()}{output_extension}",
                    "documentData": result_data,
                    "mimeType": output_mime_type
                }
            ]
        )

    except Exception as e:
        logger.error(f"Error listing SharePoint documents: {str(e)}")
        return ActionResult(
            success=False,
            error=str(e)
        )

@staticmethod
def _describeDriveItem(item: Dict[str, Any], site_name: str, site_url: str, parent_path: Optional[str] = None) -> Dict[str, Any]:
    """Build the listing entry for one item returned by a Graph children call.

    Parameters:
        item: One element of the "value" list returned by the Graph call.
        site_name: Display name of the site the item was listed from.
        site_url: Web URL of that site.
        parent_path: Path of the expanded subfolder the item was found in;
            omitted (None) for items listed directly in the requested folder.

    Returns:
        A dict with id, name, size, timestamps, webUrl, type ("folder" when the
        item has a "folder" key, otherwise "file"), siteName and siteUrl, plus
        parentPath when given, mimeType/downloadUrl when the item has a "file"
        key, and childCount when it has a "folder" key.
    """
    is_folder = "folder" in item
    info = {
        "id": item.get("id"),
        "name": item.get("name"),
        "size": item.get("size", 0),
        "createdDateTime": item.get("createdDateTime"),
        "lastModifiedDateTime": item.get("lastModifiedDateTime"),
        "webUrl": item.get("webUrl"),
        "type": "folder" if is_folder else "file",
    }
    if parent_path is not None:
        info["parentPath"] = parent_path
    info["siteName"] = site_name
    info["siteUrl"] = site_url

    # Add file-specific information
    if "file" in item:
        info["mimeType"] = item["file"].get("mimeType")
        info["downloadUrl"] = item.get("@microsoft.graph.downloadUrl")

    # Add folder-specific information (also for folders found inside an
    # expanded subfolder, so callers can see they have unlisted children)
    if is_folder:
        info["childCount"] = item["folder"].get("childCount", 0)

    return info