gateway/modules/workflows/methods/methodSharepoint.py.old
2025-12-17 10:45:09 +01:00

2840 lines
No EOL
147 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
SharePoint operations method module.
Handles SharePoint document operations using the SharePoint service.
"""
import asyncio
import json
import logging
import re
import urllib
import urllib.parse  # explicit: `import urllib` alone does not guarantee the `parse` submodule is loaded
from datetime import datetime, UTC, timedelta, timezone
from typing import Dict, Any, List, Optional

import aiohttp

from modules.workflows.methods.methodBase import MethodBase, action
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
logger = logging.getLogger(__name__)
class MethodSharepoint(MethodBase):
"""SharePoint operations methods."""
def __init__(self, services):
    """Initialize the SharePoint method module with the shared service container."""
    super().__init__(services)
    self.description = "SharePoint operations methods"
    self.name = "sharepoint"
def _format_timestamp_for_filename(self) -> str:
"""Format current timestamp as YYYYMMDD-hhmmss for filenames."""
return datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
def _getMicrosoftConnection(self, connectionReference: str) -> Optional[Dict[str, Any]]:
    """Resolve a connection reference to a Microsoft connection and configure the SharePoint service.

    Returns a dict holding the connection id, the UserConnection object and the
    SharePoint scopes, or None when the reference cannot be resolved, the
    connection is not Microsoft, its status is unusable, or the service could
    not be configured with it.
    """
    try:
        conn = self.services.chat.getUserConnectionFromConnectionReference(connectionReference)
        if not conn:
            logger.warning(f"No user connection found for reference: {connectionReference}")
            return None
        if conn.authority.value != "msft":
            logger.warning(f"Connection {conn.id} is not Microsoft (authority: {conn.authority.value})")
            return None
        # "pending" is acceptable: it means the OAuth flow is still in progress
        if conn.status.value not in ["active", "pending"]:
            logger.warning(f"Connection {conn.id} status is not active/pending: {conn.status.value}")
            return None
        # Hand the access token over to the SharePoint service
        if not self.services.sharepoint.setAccessTokenFromConnection(conn):
            logger.warning(f"Failed to configure SharePoint service with connection {conn.id}")
            return None
        logger.info(f"Successfully configured SharePoint service with Microsoft connection: {conn.id}, status: {conn.status.value}, externalId: {conn.externalId}")
        return {
            "id": conn.id,
            "userConnection": conn,
            "scopes": ["Sites.ReadWrite.All", "Files.ReadWrite.All", "User.Read"]  # SharePoint scopes
        }
    except Exception as e:
        logger.error(f"Error getting Microsoft connection: {str(e)}")
        return None
async def _discoverSharePointSites(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
    """Enumerate the SharePoint sites the user can access via the Microsoft Graph API.

    Parameters:
        limit (Optional[int]): optional cap on the number of sites returned
            (useful when only e.g. the hostname is needed).

    Returns:
        List[Dict[str, Any]]: site dicts with id/displayName/name/webUrl/description
        and created/modified timestamps; empty list on any error.
    """
    try:
        # Ask Graph for every site the user can see; push the cap server-side when given
        endpoint = "sites?search=*" if not limit else f"sites?search=*&$top={limit}"
        result = await self._makeGraphApiCall(endpoint)
        if "error" in result:
            logger.error(f"Error discovering SharePoint sites: {result['error']}")
            return []
        sites = result.get("value", [])
        if limit:
            sites = sites[:limit]  # defensive client-side cap in addition to $top
        logger.info(f"Discovered {len(sites)} SharePoint sites" + (f" (limited to {limit})" if limit else ""))
        fields = ("id", "displayName", "name", "webUrl", "description",
                  "createdDateTime", "lastModifiedDateTime")
        processed = []
        for site in sites:
            info = {key: site.get(key) for key in fields}
            processed.append(info)
            logger.debug(f"Site: {info['displayName']} - {info['webUrl']}")
        return processed
    except Exception as e:
        logger.error(f"Error discovering SharePoint sites: {str(e)}")
        return []
def _extractHostnameFromWebUrl(self, webUrl: str) -> Optional[str]:
"""Extract hostname from SharePoint webUrl (e.g., https://pcuster.sharepoint.com)"""
try:
if not webUrl:
return None
parsed = urllib.parse.urlparse(webUrl)
return parsed.hostname
except Exception as e:
logger.error(f"Error extracting hostname from webUrl '{webUrl}': {str(e)}")
return None
def _extractSiteFromStandardPath(self, pathQuery: str) -> Optional[Dict[str, str]]:
    """Parse a Microsoft-standard server-relative path into its site components.

    Thin wrapper that delegates to the SharePoint service.
    """
    service = self.services.sharepoint
    return service.extractSiteFromStandardPath(pathQuery)
async def _getSiteByStandardPath(self, sitePath: str) -> Optional[Dict[str, Any]]:
    """Look up a SharePoint site directly by its Microsoft-standard path.

    Thin wrapper that delegates to the SharePoint service.
    """
    service = self.services.sharepoint
    return await service.getSiteByStandardPath(sitePath)
def _filterSitesByHint(self, sites: List[Dict[str, Any]], siteHint: str) -> List[Dict[str, Any]]:
    """Narrow a list of discovered sites to those matching a human-entered hint.

    Thin wrapper that delegates to the SharePoint service.
    """
    service = self.services.sharepoint
    return service.filterSitesByHint(sites, siteHint)
def _parseSearchQuery(self, searchQuery: str) -> tuple[str, str, str, dict]:
"""
Parse searchQuery to extract path, search terms, search type, and search options.
CRITICAL: NEVER convert words to paths! Words stay as search terms.
- "root document lesson" → fileQuery="root document lesson" (NOT "/root/document/lesson")
- "root, gose" → fileQuery="root, gose" (NOT "/root/gose")
- "druckersteuerung eskalation logobject" → fileQuery="druckersteuerung eskalation logobject"
Parameters:
searchQuery (str): Enhanced search query with options:
- "budget" -> pathQuery="*", fileQuery="budget", searchType="all", options={}
- "root document lesson" -> pathQuery="*", fileQuery="root document lesson", searchType="all", options={}
- "root, gose" -> pathQuery="*", fileQuery="root, gose", searchType="all", options={}
- "/Documents:budget" -> pathQuery="/Documents", fileQuery="budget", searchType="all", options={}
- "files:budget" -> pathQuery="*", fileQuery="budget", searchType="files", options={}
- "folders:DELTA" -> pathQuery="*", fileQuery="DELTA", searchType="folders", options={}
- "exact:\"Operations 2025\"" -> exact phrase matching
- "regex:^Operations.*2025$" -> regex pattern matching
- "case:DELTA" -> case-sensitive search
- "and:DELTA AND 2025 Mars AND Group" -> all AND terms must be present
Returns:
tuple[str, str, str, dict]: (pathQuery, fileQuery, searchType, searchOptions)
"""
try:
if not searchQuery or not searchQuery.strip() or searchQuery.strip() == "*":
return "*", "*", "all", {}
searchQuery = searchQuery.strip()
searchOptions = {}
# CRITICAL: Do NOT convert space-separated or comma-separated words to paths!
# "root document lesson" should stay as "root document lesson", NOT "/root/document/lesson"
# "root, gose" should stay as "root, gose", NOT "/root/gose"
# Check for search type specification (files:, folders:, all:) FIRST
searchType = "all" # Default
if searchQuery.startswith(("files:", "folders:", "all:")):
typeParts = searchQuery.split(':', 1)
searchType = typeParts[0].strip()
searchQuery = typeParts[1].strip()
# Extract optional site hint tokens: support "site=Name" or leading "site:Name"
def _extractSiteHint(q: str) -> tuple[str, Optional[str]]:
try:
qStrip = q.strip()
# Leading form: site:KM LayerFinance ...
if qStrip.lower().startswith("site:"):
after = qStrip[5:].lstrip()
# site name until next space or end
if ' ' in after:
siteName, rest = after.split(' ', 1)
else:
siteName, rest = after, ''
return rest.strip(), siteName.strip()
# Inline key=value form anywhere
m = re.search(r"\bsite=([^;\s]+)", qStrip, flags=re.IGNORECASE)
if m:
siteName = m.group(1).strip()
# remove the token from query
qNew = re.sub(r"\bsite=[^;\s]+;?", "", qStrip, flags=re.IGNORECASE).strip()
return qNew, siteName
except Exception:
pass
return q, None
searchQuery, extractedSite = _extractSiteHint(searchQuery)
if extractedSite:
searchOptions["site_hint"] = extractedSite
logger.info(f"Extracted site hint: '{extractedSite}'")
# Extract name="..." if present (for quoted multi-word names)
nameMatch = re.search(r"name=\"([^\"]+)\"", searchQuery)
if nameMatch:
searchQuery = nameMatch.group(1)
logger.info(f"Extracted name from quotes: '{searchQuery}'")
# Check for search mode specification (exact:, regex:, case:, and:)
if searchQuery.startswith(("exact:", "regex:", "case:", "and:")):
modeParts = searchQuery.split(':', 1)
mode = modeParts[0].strip()
searchQuery = modeParts[1].strip()
if mode == "exact":
searchOptions["exact_match"] = True
# Remove quotes if present
if searchQuery.startswith('"') and searchQuery.endswith('"'):
searchQuery = searchQuery[1:-1]
elif mode == "regex":
searchOptions["regex_match"] = True
elif mode == "case":
searchOptions["case_sensitive"] = True
elif mode == "and":
searchOptions["and_terms"] = True
# Check if it contains path:search format
# Microsoft-standard paths: /sites/SiteName/Path:files:.pdf
if ':' in searchQuery:
# For Microsoft-standard paths (/sites/...), find the colon that separates path from search
if searchQuery.startswith('/sites/'):
# Find the colon that separates path from search (after the full path)
# Look for pattern: /sites/SiteName/Path/...:files:.pdf
# We need to find the colon that's followed by search type or file extension
colonPositions = []
for i, char in enumerate(searchQuery):
if char == ':':
colonPositions.append(i)
# If we have colons, find the one that's followed by search type or file extension
splitPos = None
if colonPositions:
for pos in colonPositions:
afterColon = searchQuery[pos+1:pos+10].strip().lower()
# Check if this colon is followed by search type or looks like a file extension
if afterColon.startswith(('files:', 'folders:', 'all:', '.')) or afterColon == '':
splitPos = pos
break
# If no clear split found, use the last colon
if splitPos is None and colonPositions:
splitPos = colonPositions[-1]
if splitPos:
pathPart = searchQuery[:splitPos].strip()
searchPart = searchQuery[splitPos+1:].strip()
else:
# Fallback: split on first colon
parts = searchQuery.split(':', 1)
pathPart = parts[0].strip()
searchPart = parts[1].strip()
else:
# Regular path:search format - split on first colon
parts = searchQuery.split(':', 1)
pathPart = parts[0].strip()
searchPart = parts[1].strip()
# Check if searchPart starts with search type (files:, folders:, all:)
if searchPart.startswith(("files:", "folders:", "all:")):
typeParts = searchPart.split(':', 1)
searchType = typeParts[0].strip() # Update searchType
searchPart = typeParts[1].strip() if len(typeParts) > 1 else ""
# Handle path part
if not pathPart or pathPart == "*":
pathQuery = "*"
elif pathPart.startswith('/'):
pathQuery = pathPart
else:
pathQuery = f"/Documents/{pathPart}"
# Handle search part
if not searchPart or searchPart == "*":
fileQuery = "*"
else:
fileQuery = searchPart
return pathQuery, fileQuery, searchType, searchOptions
# No colon - check if it looks like a path
elif searchQuery.startswith('/'):
# It's a path only
return searchQuery, "*", searchType, searchOptions
else:
# It's a search term only - keep words as-is, do NOT convert to paths
# "root document lesson" stays as "root document lesson"
# "root, gose" stays as "root, gose"
return "*", searchQuery, searchType, searchOptions
except Exception as e:
logger.error(f"Error parsing searchQuery '{searchQuery}': {str(e)}")
raise ValueError(f"Failed to parse searchQuery '{searchQuery}': {str(e)}")
def _resolvePathQuery(self, pathQuery: str) -> List[str]:
"""
Resolve pathQuery into a list of search paths for SharePoint operations.
Parameters:
pathQuery (str): Query string that can contain:
- Direct paths (e.g., "/Documents/Project1")
- Wildcards (e.g., "/Documents/*")
- Multiple paths separated by semicolons (e.g., "/Docs; /Files")
- Single word relative paths (e.g., "Project1" -> resolved to default folder)
- Empty string or "*" for global search
- Space-separated words are treated as search terms, NOT folder paths
Returns:
List[str]: List of resolved paths
"""
try:
if not pathQuery or not pathQuery.strip() or pathQuery.strip() == "*":
return ["*"] # Global search across all sites
# Split by semicolon to handle multiple paths
rawPaths = [path.strip() for path in pathQuery.split(';') if path.strip()]
resolvedPaths = []
for rawPath in rawPaths:
# Handle wildcards - return as-is
if '*' in rawPath:
resolvedPaths.append(rawPath)
# Handle absolute paths
elif rawPath.startswith('/'):
resolvedPaths.append(rawPath)
# Handle single word relative paths - prepend default folder
# BUT NOT space-separated words (those are search terms, not paths)
elif ' ' not in rawPath:
resolvedPaths.append(f"/Documents/{rawPath}")
else:
# Check if this looks like a path (has path separators) or search terms
if '\\' in rawPath or '/' in rawPath:
# This looks like a path with spaces in folder names - treat as valid path
resolvedPaths.append(rawPath)
logger.info(f"Path with spaces '{rawPath}' treated as valid folder path")
else:
# Space-separated words without path separators are search terms
# Return as "*" to search globally
logger.info(f"Space-separated words '{rawPath}' treated as search terms, not folder path")
resolvedPaths.append("*")
# Remove duplicates while preserving order
seen = set()
uniquePaths = []
for path in resolvedPaths:
if path not in seen:
seen.add(path)
uniquePaths.append(path)
logger.info(f"Resolved pathQuery '{pathQuery}' to {len(uniquePaths)} paths: {uniquePaths}")
return uniquePaths
except Exception as e:
logger.error(f"Error resolving pathQuery '{pathQuery}': {str(e)}")
raise ValueError(f"Failed to resolve pathQuery '{pathQuery}': {str(e)}")
def _parseSiteUrl(self, siteUrl: str) -> Dict[str, str]:
"""Parse SharePoint site URL to extract hostname and site path"""
try:
parsed = urllib.parse.urlparse(siteUrl)
hostname = parsed.hostname
path = parsed.path.strip('/')
return {
"hostname": hostname,
"sitePath": path
}
except Exception as e:
logger.error(f"Error parsing site URL {siteUrl}: {str(e)}")
return {"hostname": "", "sitePath": ""}
def _cleanSearchQuery(self, query: str) -> str:
"""
Clean search query to make it compatible with Graph API KQL syntax.
Removes path-like syntax and invalid KQL constructs.
Parameters:
query (str): Raw search query that may contain paths and invalid syntax
Returns:
str: Cleaned query suitable for Graph API search endpoint
"""
if not query or not query.strip():
return ""
query = query.strip()
# Handle patterns like: "Company Share/Freigegebene Dokumente/.../expenses:files:.pdf"
# Extract the search term and file extension
# First, extract file extension if present (format: :files:.pdf or just .pdf at the end)
fileExtension = ""
if ':files:' in query.lower() or ':folders:' in query.lower():
# Extract extension after the type filter
extMatch = re.search(r':(?:files|folders):(\.\w+)', query, re.IGNORECASE)
if extMatch:
fileExtension = extMatch.group(1)
# Remove the type filter part
query = re.sub(r':(?:files|folders):\.?\w*', '', query, flags=re.IGNORECASE)
elif query.endswith(('.pdf', '.doc', '.docx', '.xls', '.xlsx', '.txt', '.csv', '.ppt', '.pptx')):
# Extract extension from end
extMatch = re.search(r'(\.\w+)$', query)
if extMatch:
fileExtension = extMatch.group(1)
query = query[:-len(fileExtension)]
# Extract search term: get the last segment after the last slash (filename part)
queryNormalized = query.replace('\\', '/')
if '/' in queryNormalized:
# Extract the last segment (usually the filename/search term)
lastSegment = queryNormalized.split('/')[-1]
# Remove any remaining colons or type filters
if ':' in lastSegment:
lastSegment = lastSegment.split(':')[0]
searchTerm = lastSegment.strip()
else:
# No path separators, use the query as-is but remove type filters
if ':' in query:
searchTerm = query.split(':')[0].strip()
else:
searchTerm = query.strip()
# Remove any remaining type filters or invalid syntax
searchTerm = re.sub(r':(?:files|folders|all):?', '', searchTerm, flags=re.IGNORECASE)
searchTerm = searchTerm.strip()
# If we have a file extension, include it in the search term
# Note: Graph API search endpoint may not support filetype: syntax
# So we include the extension as part of the search term or filter results after
if fileExtension:
extWithoutDot = fileExtension.lstrip('.')
# Try simple approach: add extension as search term
# If this doesn't work, we'll filter results after search
if searchTerm:
# Include extension in search - Graph API will search in filename
searchTerm = f"{searchTerm} {extWithoutDot}"
else:
searchTerm = extWithoutDot
# Final cleanup: remove any remaining invalid characters for KQL
# Keep alphanumeric, spaces, hyphens, underscores, dots, and common search operators
searchTerm = re.sub(r'[^\w\s\-\.\*]', ' ', searchTerm)
searchTerm = ' '.join(searchTerm.split()) # Normalize whitespace
return searchTerm if searchTerm else "*"
async def _makeGraphApiCall(self, endpoint: str, method: str = "GET", data: bytes = None) -> Dict[str, Any]:
    """Make a Microsoft Graph API call with a 30-second timeout and detailed logging.

    Parameters:
        endpoint (str): path relative to https://graph.microsoft.com/v1.0/.
        method (str): "GET", "PUT" or "POST".
        data (bytes): optional request body.

    Returns:
        Dict[str, Any]: the parsed JSON response, or {"error": ...} on failure.
        NOTE: an unsupported HTTP method now returns an error dict (previously
        the function fell through and implicitly returned None).
    """
    try:
        if not hasattr(self.services, 'sharepoint') or not self.services.sharepoint._target.accessToken:
            return {"error": "SharePoint service not configured with access token"}
        headers = {
            "Authorization": f"Bearer {self.services.sharepoint._target.accessToken}",
            # PUT bodies are raw file bytes; everything else is JSON
            "Content-Type": "application/json" if data and method != "PUT" else "application/octet-stream" if data else "application/json"
        }
        url = f"https://graph.microsoft.com/v1.0/{endpoint}"
        logger.info(f"Making Graph API call: {method} {url}")
        # Per-method success codes (Graph returns 201 on resource creation)
        successStatuses = {"GET": (200,), "PUT": (200, 201), "POST": (200, 201)}
        if method not in successStatuses:
            logger.error(f"Unsupported HTTP method for Graph API call: {method}")
            return {"error": f"Unsupported HTTP method: {method}"}
        # Set timeout to 30 seconds
        timeout = aiohttp.ClientTimeout(total=30)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            logger.debug(f"Starting {method} request to {url}")
            requestKwargs = {"headers": headers}
            if method in ("PUT", "POST"):
                requestKwargs["data"] = data
            # session.request consolidates the previously triplicated GET/PUT/POST branches
            async with session.request(method, url, **requestKwargs) as response:
                logger.info(f"Graph API response: {response.status}")
                if response.status in successStatuses[method]:
                    result = await response.json()
                    logger.debug(f"Graph API success: {len(str(result))} characters response")
                    return result
                errorText = await response.text()
                logger.error(f"Graph API call failed: {response.status} - {errorText}")
                return {"error": f"API call failed: {response.status} - {errorText}"}
    except asyncio.TimeoutError:
        logger.error(f"Graph API call timed out after 30 seconds: {endpoint}")
        return {"error": f"API call timed out after 30 seconds: {endpoint}"}
    except Exception as e:
        logger.error(f"Error making Graph API call: {str(e)}")
        return {"error": f"Error making Graph API call: {str(e)}"}
async def _getSiteId(self, hostname: str, sitePath: str) -> str:
    """Resolve a SharePoint site ID from its hostname and site path.

    Returns an empty string when the lookup fails.
    """
    try:
        lookup = await self._makeGraphApiCall(f"sites/{hostname}:/{sitePath}")
        if "error" in lookup:
            logger.error(f"Error getting site ID: {lookup['error']}")
            return ""
        return lookup.get("id", "")
    except Exception as e:
        logger.error(f"Error getting site ID: {str(e)}")
        return ""
async def _parseDocumentListForFoundDocuments(self, documentList: Any) -> tuple[Optional[List[Dict[str, Any]]], Optional[List[Dict[str, Any]]], Optional[str]]:
    """
    Parse documentList to extract foundDocuments and site information.

    Accepts a plain string, a list of strings, or a DocumentReferenceList.
    Loads the first referenced chat document's file data and interprets it as
    JSON produced by findDocumentPath ("foundDocuments") or by listDocuments
    ("listResults", which is converted to the foundDocuments shape).

    Parameters:
        documentList: Document list (can be list, DocumentReferenceList, or string)

    Returns:
        tuple: (foundDocuments, sites, errorMessage)
            - foundDocuments: List of found documents from findDocumentPath result
            - sites: List of site dictionaries with id, displayName, webUrl
            - errorMessage: Error message if parsing failed, None otherwise
        Note: (None, None, None) means "nothing parseable, but not an error"
        (e.g. the referenced document is a regular file rather than a search result).
    """
    try:
        # Normalize a bare string reference into a single-element list
        if isinstance(documentList, str):
            documentList = [documentList]
        # Resolve documentList to get actual documents
        from modules.datamodels.datamodelDocref import DocumentReferenceList
        if isinstance(documentList, DocumentReferenceList):
            docRefList = documentList
        elif isinstance(documentList, list):
            docRefList = DocumentReferenceList.from_string_list(documentList)
        else:
            # Unknown input type: continue with an empty reference list
            docRefList = DocumentReferenceList(references=[])
        chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(docRefList)
        if not chatDocuments:
            return None, None, "No documents found for the provided document list"
        # Only the first referenced document is inspected for result payloads
        firstDocument = chatDocuments[0]
        fileData = self.services.chat.getFileData(firstDocument.fileId)
        if not fileData:
            return None, None, None  # No fileData, but not an error (might be regular file)
        try:
            resultData = json.loads(fileData)
            foundDocuments = resultData.get("foundDocuments", [])
            # If no foundDocuments, check if it's a listDocuments result (has listResults)
            if not foundDocuments and "listResults" in resultData:
                logger.info(f"documentList contains listResults from listDocuments, converting to foundDocuments format")
                listResults = resultData.get("listResults", [])
                foundDocuments = []
                siteIdFromList = None
                siteNameFromList = None
                for listResult in listResults:
                    siteResults = listResult.get("siteResults", [])
                    for siteResult in siteResults:
                        items = siteResult.get("items", [])
                        # Extract site info from first item if available
                        if items and not siteIdFromList:
                            siteNameFromList = items[0].get("siteName")
                        for item in items:
                            # Convert listDocuments item format to foundDocuments format
                            # (only files are carried over; folders are skipped)
                            if item.get("type") == "file":
                                foundDoc = {
                                    "id": item.get("id"),
                                    "name": item.get("name"),
                                    "type": "file",
                                    "siteName": item.get("siteName"),
                                    "siteId": None,  # Will be determined from site discovery
                                    "webUrl": item.get("webUrl"),
                                    "fullPath": item.get("webUrl", ""),
                                    "parentPath": item.get("parentPath", "")
                                }
                                foundDocuments.append(foundDoc)
                # Discover sites to get siteId if we have siteName
                if foundDocuments and siteNameFromList and not siteIdFromList:
                    logger.info(f"Discovering sites to find siteId for '{siteNameFromList}'")
                    allSites = await self._discoverSharePointSites()
                    matchingSites = self._filterSitesByHint(allSites, siteNameFromList)
                    if matchingSites:
                        siteIdFromList = matchingSites[0].get("id")
                        # Update all foundDocuments with siteId
                        for doc in foundDocuments:
                            doc["siteId"] = siteIdFromList
                        logger.info(f"Found siteId '{siteIdFromList}' for site '{siteNameFromList}'")
                logger.info(f"Converted {len(foundDocuments)} files from listResults format")
            if not foundDocuments:
                return None, None, None  # No foundDocuments, but not an error
            # Extract site information from foundDocuments (first entry is authoritative)
            firstDoc = foundDocuments[0]
            siteName = firstDoc.get("siteName")
            siteId = firstDoc.get("siteId")
            # If siteId is missing (from listDocuments conversion), discover sites to find it
            if siteName and not siteId:
                logger.info(f"Site ID missing, discovering sites to find siteId for '{siteName}'")
                allSites = await self._discoverSharePointSites()
                matchingSites = self._filterSitesByHint(allSites, siteName)
                if matchingSites:
                    siteId = matchingSites[0].get("id")
                    logger.info(f"Found siteId '{siteId}' for site '{siteName}'")
            sites = None
            if siteName and siteId:
                sites = [{
                    "id": siteId,
                    "displayName": siteName,
                    "webUrl": firstDoc.get("webUrl", "")
                }]
                logger.info(f"Using specific site from documentList: {siteName} (ID: {siteId})")
            elif siteName:
                # Try to get site by name
                allSites = await self._discoverSharePointSites()
                matchingSites = self._filterSitesByHint(allSites, siteName)
                if matchingSites:
                    sites = [{
                        "id": matchingSites[0].get("id"),
                        "displayName": siteName,
                        "webUrl": matchingSites[0].get("webUrl", "")
                    }]
                    logger.info(f"Found site by name: {siteName} (ID: {sites[0]['id']})")
                else:
                    return None, None, f"Site '{siteName}' not found. Cannot determine target site."
            else:
                return None, None, "Site information missing from documentList. Cannot determine target site."
            return foundDocuments, sites, None
        except json.JSONDecodeError as e:
            return None, None, f"Invalid JSON in documentList: {str(e)}"
        except Exception as e:
            return None, None, f"Error processing documentList: {str(e)}"
    except Exception as e:
        logger.error(f"Error parsing documentList: {str(e)}")
        return None, None, f"Error parsing documentList: {str(e)}"
async def _resolveSitesFromPathQuery(self, pathQuery: str) -> tuple[List[Dict[str, Any]], Optional[str]]:
    """Validate a pathQuery and resolve it to candidate SharePoint sites.

    Parameters:
        pathQuery (str): path query string.

    Returns:
        tuple: (sites, errorMessage) — the resolved site dicts and None on
        success, otherwise an empty list plus a human-readable error message.
    """
    try:
        service = self.services.sharepoint
        # Reject malformed queries before touching the Graph API
        isValid, errorMsg = service.validatePathQuery(pathQuery)
        if not isValid:
            return [], errorMsg
        sites = await service.resolveSitesFromPathQuery(pathQuery)
        if sites:
            return sites, None
        return [], "No SharePoint sites found or accessible"
    except Exception as e:
        logger.error(f"Error resolving sites from pathQuery '{pathQuery}': {str(e)}")
        return [], f"Error resolving sites from pathQuery: {str(e)}"
@action
async def findDocumentPath(self, parameters: Dict[str, Any]) -> ActionResult:
"""
GENERAL:
- Purpose: Find documents and folders by name/path across sites.
- Input requirements: connectionReference (required); searchQuery (required); optional site, maxResults.
- Output format: JSON with found items and paths.
Parameters:
- connectionReference (str, required): Microsoft connection label.
- site (str, optional): Site hint.
- searchQuery (str, required): Search terms or path.
- maxResults (int, optional): Maximum items to return. Default: 1000.
"""
import time
operationId = None
try:
# Init progress logger
workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
operationId = f"sharepoint_find_{workflowId}_{int(time.time())}"
# Start progress tracking
parentOperationId = parameters.get('parentOperationId')
self.services.chat.progressLogStart(
operationId,
"Find Document Path",
"SharePoint Search",
f"Query: {parameters.get('searchQuery', '*')}",
parentOperationId=parentOperationId
)
connectionReference = parameters.get("connectionReference")
site = parameters.get("site")
searchQuery = parameters.get("searchQuery", "*")
maxResults = parameters.get("maxResults", 1000)
if not connectionReference:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="Connection reference is required")
# Parse searchQuery to extract path, search terms, search type, and options
pathQuery, fileQuery, searchType, searchOptions = self._parseSearchQuery(searchQuery)
logger.debug(f"Parsed searchQuery '{searchQuery}' -> pathQuery='{pathQuery}', fileQuery='{fileQuery}', searchType='{searchType}'")
self.services.chat.progressLogUpdate(operationId, 0.2, "Getting Microsoft connection")
connection = self._getMicrosoftConnection(connectionReference)
if not connection:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference")
# Extract site name from pathQuery if it contains Microsoft-standard path (/sites/SiteName/...)
siteFromPath = None
directSite = None
if pathQuery and pathQuery.startswith('/sites/'):
parsedPath = self._extractSiteFromStandardPath(pathQuery)
if parsedPath:
siteFromPath = parsedPath.get("siteName")
logger.info(f"Extracted site from Microsoft-standard pathQuery '{pathQuery}': '{siteFromPath}'")
# Try to get site directly by path (optimization - no need to load all 60 sites)
directSite = await self._getSiteByStandardPath(siteFromPath)
if directSite:
logger.info(f"Got site directly by standard path - no need to discover all sites")
sites = [directSite]
else:
logger.warning(f"Could not get site directly, falling back to site discovery")
directSite = None
else:
logger.warning(f"Failed to parse site from standard pathQuery '{pathQuery}'")
# If we didn't get the site directly, use discovery and filtering
if not directSite:
# Determine which site hint to use (priority: site parameter > site from pathQuery > site_hint from searchOptions)
siteHintToUse = site or siteFromPath or searchOptions.get("site_hint")
# Discover SharePoint sites - use targeted approach when site hint is available
self.services.chat.progressLogUpdate(operationId, 0.3, "Discovering SharePoint sites")
if siteHintToUse:
# When site hint is available, discover all sites first, then filter
allSites = await self._discoverSharePointSites()
if not allSites:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="No SharePoint sites found or accessible")
sites = self._filterSitesByHint(allSites, siteHintToUse)
logger.info(f"Filtered sites by site hint '{siteHintToUse}' -> {len(sites)} sites")
if not sites:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error=f"No SharePoint sites found matching '{siteHintToUse}'")
else:
# No site hint - discover all sites
sites = await self._discoverSharePointSites()
if not sites:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="No SharePoint sites found or accessible")
# Resolve path query into search paths
searchPaths = self._resolvePathQuery(pathQuery)
self.services.chat.progressLogUpdate(operationId, 0.5, f"Searching across {len(sites)} site(s)")
try:
# Search across all discovered sites
foundDocuments = []
allSitesSearched = []
# Handle different search approaches based on search type
if searchType == "folders" and fileQuery and fileQuery.strip() != "" and fileQuery.strip() != "*":
# Use unified search for folders - this is global and searches all sites
try:
# Use Microsoft Graph Search API syntax (simple term search only)
terms = [t for t in fileQuery.split() if t.strip()]
if len(terms) > 1:
# Multiple terms: search for ALL terms (AND) - more specific results
queryString = " AND ".join(terms)
else:
# Single term: search for the term
queryString = terms[0] if terms else fileQuery
logger.info(f"Using unified search for folders: {queryString}")
payload = {
"requests": [
{
"entityTypes": ["driveItem"],
"query": {"queryString": queryString},
"from": 0,
"size": 50
}
]
}
logger.info(f"Using unified search API for folders with queryString: {queryString}")
# Use global search endpoint (site-specific search not available)
unifiedResult = await self._makeGraphApiCall(
"search/query",
method="POST",
data=json.dumps(payload).encode("utf-8")
)
if "error" in unifiedResult:
logger.warning(f"Unified search failed: {unifiedResult['error']}")
items = []
else:
# Flatten hits -> driveItem resources
items = []
for container in (unifiedResult.get("value", []) or []):
for hitsContainer in (container.get("hitsContainers", []) or []):
for hit in (hitsContainer.get("hits", []) or []):
resource = hit.get("resource")
if resource:
items.append(resource)
logger.info(f"Unified search returned {len(items)} items (pre-filter)")
# Apply our improved folder detection logic
folderItems = []
for item in items:
resource = item
# Use the same detection logic as our test
isFolder = self.services.sharepoint.detectFolderType(resource)
if isFolder:
folderItems.append(item)
items = folderItems
logger.info(f"Filtered to {len(items)} folders using improved detection logic")
# Process unified search results - extract site information from webUrl
for item in items:
itemName = item.get("name", "")
webUrl = item.get("webUrl", "")
# Extract site information from webUrl
siteName = "Unknown Site"
siteId = "unknown"
if webUrl and '/sites/' in webUrl:
try:
# Extract site name from URL: https://pcuster.sharepoint.com/sites/SiteName/...
urlParts = webUrl.split('/sites/')
if len(urlParts) > 1:
sitePath = urlParts[1].split('/')[0]
# Find matching site from discovered sites
# First try to match by site name (URL path)
for site in sites:
if site.get("name") == sitePath:
siteName = site.get("displayName", sitePath)
siteId = site.get("id", "unknown")
break
else:
# If no match by name, try to match by displayName
for site in sites:
if site.get("displayName") == sitePath:
siteName = site.get("displayName", sitePath)
siteId = site.get("id", "unknown")
break
else:
# If no exact match, use the site path as site name
siteName = sitePath
# Try to find a site with similar name
for site in sites:
if sitePath.lower() in site.get("name", "").lower() or sitePath.lower() in site.get("displayName", "").lower():
siteName = site.get("displayName", sitePath)
siteId = site.get("id", "unknown")
break
except Exception as e:
logger.warning(f"Error extracting site info from URL {webUrl}: {e}")
# Use improved folder detection logic
isFolder = self.services.sharepoint.detectFolderType(item)
itemType = "folder" if isFolder else "file"
itemPath = item.get("parentReference", {}).get("path", "")
logger.debug(f"Processing {itemType}: '{itemName}' at path: '{itemPath}'")
# Simple filtering like test file - just check search type
if searchType == "files" and isFolder:
continue # Skip folders when searching for files
elif searchType == "folders" and not isFolder:
continue # Skip files when searching for folders
# Simple approach like test file - no complex filtering
logger.debug(f"Item '{itemName}' found - adding to results")
# Create result with full path information for proper action chaining
parentPath = item.get("parentReference", {}).get("path", "")
# Extract the full SharePoint path from webUrl or parentReference
fullPath = ""
if webUrl:
# Extract path from webUrl: https://pcuster.sharepoint.com/sites/SSSRESYNachfolge/Freigegebene%20Dokumente/General/Eskalation%20LogObject/Druckersteuerung
if '/sites/' in webUrl:
pathPart = webUrl.split('/sites/')[1]
# Decode URL encoding and convert to backslash format
decodedPath = urllib.parse.unquote(pathPart)
fullPath = "\\" + decodedPath.replace('/', '\\')
elif parentPath:
# Use parentReference path if available
fullPath = parentPath.replace('/', '\\')
docInfo = {
"id": item.get("id"),
"name": item.get("name"),
"type": "folder" if isFolder else "file",
"siteName": siteName,
"siteId": siteId,
"webUrl": webUrl,
"fullPath": fullPath,
"parentPath": parentPath
}
foundDocuments.append(docInfo)
logger.info(f"Found {len(foundDocuments)} documents from unified search")
except Exception as e:
logger.error(f"Error performing unified folder search: {str(e)}")
# Fallback to site-by-site search
pass
# If no unified search was performed or it failed, fall back to site-by-site search
if not foundDocuments:
# Use simple approach like test file - no complex filtering
siteScopedSites = sites
for site in siteScopedSites:
siteId = site["id"]
siteName = site["displayName"]
siteUrl = site["webUrl"]
logger.info(f"Searching in site: {siteName} ({siteUrl})")
# Check if pathQuery contains a specific folder path (not just /sites/SiteName)
folderPath = None
if pathQuery and pathQuery.startswith('/sites/'):
parsedPath = self._extractSiteFromStandardPath(pathQuery)
if parsedPath:
innerPath = parsedPath.get("innerPath", "")
if innerPath and innerPath.strip():
# Remove leading slash if present
folderPath = innerPath.lstrip('/')
# Generic approach: Try to find the folder, if it fails, remove first segment
# This works for all languages because we test the actual API response
# In SharePoint Graph API, /drive/root already points to the default document library,
# so library names in paths should be removed
pathSegments = [s for s in folderPath.split('/') if s.strip()]
if len(pathSegments) > 1:
# Try with first segment removed (first segment is likely the document library)
testPath = '/'.join(pathSegments[1:])
# Quick test: try to get folder info (this is fast and doesn't require full search)
testEndpoint = f"sites/{siteId}/drive/root:/{urllib.parse.quote(testPath, safe='')}:"
testResult = await self._makeGraphApiCall(testEndpoint)
if testResult and "error" not in testResult:
# Path without first segment works - first segment was likely the document library
folderPath = testPath
logger.info(f"Removed document library name '{pathSegments[0]}' from folder path (tested via API)")
else:
# Keep original path - first segment is not a document library
logger.info(f"Keeping original folder path '{folderPath}' (first segment is not a document library)")
elif len(pathSegments) == 1:
# Only one segment - likely the document library itself, use root
folderPath = None
logger.info(f"Only one segment '{pathSegments[0]}' found, likely document library - using root")
if folderPath:
logger.info(f"Extracted folder path from pathQuery: '{folderPath}'")
else:
logger.info(f"Folder path resolved to root (only document library in path)")
# Use Microsoft Graph API for this specific site
# Handle empty or wildcard queries
if not fileQuery or fileQuery.strip() == "" or fileQuery.strip() == "*":
# For wildcard/empty queries, list all items
if folderPath:
# List items in specific folder
encodedPath = urllib.parse.quote(folderPath, safe='')
endpoint = f"sites/{siteId}/drive/root:/{encodedPath}:/children"
logger.info(f"Listing items in folder: '{folderPath}'")
else:
# List all items in the drive root
endpoint = f"sites/{siteId}/drive/root/children"
# Make the API call to list items
listResult = await self._makeGraphApiCall(endpoint)
if "error" in listResult:
logger.warning(f"List failed for site {siteName}: {listResult['error']}")
continue
# Process list results for this site
items = listResult.get("value", [])
logger.info(f"Retrieved {len(items)} items from site {siteName}")
else:
# For files, use regular search API
# Clean the query: remove path-like syntax and invalid KQL syntax
searchQuery = self._cleanSearchQuery(fileQuery)
# URL-encode the query parameter
encodedQuery = urllib.parse.quote(searchQuery, safe='')
if folderPath:
# Search in specific folder
encodedPath = urllib.parse.quote(folderPath, safe='')
endpoint = f"sites/{siteId}/drive/root:/{encodedPath}:/search(q='{encodedQuery}')"
logger.info(f"Searching in folder '{folderPath}' with query: '{searchQuery}' (encoded: '{encodedQuery}')")
else:
# Search in drive root
endpoint = f"sites/{siteId}/drive/root/search(q='{encodedQuery}')"
logger.info(f"Using search API for files with query: '{searchQuery}' (encoded: '{encodedQuery}')")
# Make the search API call (files)
searchResult = await self._makeGraphApiCall(endpoint)
if "error" in searchResult:
logger.warning(f"Search failed for site {siteName}: {searchResult['error']}")
continue
# Process search results for this site (files)
items = searchResult.get("value", [])
logger.info(f"Retrieved {len(items)} items from site {siteName}")
siteDocuments = []
for item in items:
itemName = item.get("name", "")
# Use improved folder detection logic
isFolder = self.services.sharepoint.detectFolderType(item)
itemType = "folder" if isFolder else "file"
itemPath = item.get("parentReference", {}).get("path", "")
logger.debug(f"Processing {itemType}: '{itemName}' at path: '{itemPath}'")
# Simple filtering like test file - just check search type
if searchType == "files" and isFolder:
continue # Skip folders when searching for files
elif searchType == "folders" and not isFolder:
continue # Skip files when searching for folders
# Simple approach like test file - no complex filtering
logger.debug(f"Item '{itemName}' found - adding to results")
# Create result with full path information for proper action chaining
webUrl = item.get("webUrl", "")
parentPath = item.get("parentReference", {}).get("path", "")
# Extract the full SharePoint path from webUrl or parentReference
fullPath = ""
if webUrl:
# Extract path from webUrl: https://pcuster.sharepoint.com/sites/SSSRESYNachfolge/Freigegebene%20Dokumente/General/Eskalation%20LogObject/Druckersteuerung
if '/sites/' in webUrl:
pathPart = webUrl.split('/sites/')[1]
# Decode URL encoding and convert to backslash format
decodedPath = urllib.parse.unquote(pathPart)
fullPath = "\\" + decodedPath.replace('/', '\\')
elif parentPath:
# Use parentReference path if available
fullPath = parentPath.replace('/', '\\')
docInfo = {
"id": item.get("id"),
"name": item.get("name"),
"type": "folder" if isFolder else "file",
"siteName": siteName,
"siteId": siteId,
"webUrl": webUrl,
"fullPath": fullPath,
"parentPath": parentPath
}
siteDocuments.append(docInfo)
foundDocuments.extend(siteDocuments)
allSitesSearched.append({
"siteName": siteName,
"siteUrl": siteUrl,
"siteId": siteId,
"documentsFound": len(siteDocuments)
})
logger.info(f"Found {len(siteDocuments)} documents in site {siteName}")
# Limit total results to maxResults
if len(foundDocuments) > maxResults:
foundDocuments = foundDocuments[:maxResults]
logger.info(f"Limited results to {maxResults} items")
self.services.chat.progressLogUpdate(operationId, 0.9, f"Found {len(foundDocuments)} document(s)")
resultData = {
"searchQuery": searchQuery,
"totalResults": len(foundDocuments),
"maxResults": maxResults,
"foundDocuments": foundDocuments,
"timestamp": self.services.utils.timestampGetUtc()
}
except Exception as e:
logger.error(f"Error searching SharePoint: {str(e)}")
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error=str(e))
# Use default JSON format for output
outputExtension = ".json" # Default
outputMimeType = "application/json" # Default
validationMetadata = {
"actionType": "sharepoint.findDocumentPath",
"searchQuery": searchQuery,
"maxResults": maxResults,
"totalResults": len(foundDocuments),
"hasResults": len(foundDocuments) > 0
}
self.services.chat.progressLogFinish(operationId, True)
return ActionResult(
success=True,
documents=[
ActionDocument(
documentName=f"sharepoint_find_path_{self._format_timestamp_for_filename()}{outputExtension}",
documentData=json.dumps(resultData, indent=2),
mimeType=outputMimeType,
validationMetadata=validationMetadata
)
]
)
except Exception as e:
logger.error(f"Error finding document path: {str(e)}")
if operationId:
try:
self.services.chat.progressLogFinish(operationId, False)
except:
pass
return ActionResult.isFailure(error=str(e))
    @action
    async def readDocuments(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        GENERAL:
        - Purpose: Read documents from SharePoint and extract content/metadata.
        - Input requirements: connectionReference (required); documentList or pathQuery (required); includeMetadata (optional).
        - Output format: Standardized ActionDocument format (documentName, documentData, mimeType).
        - Binary files (PDFs, etc.) are Base64-encoded in documentData.
        - Text files are stored as plain text in documentData.
        - Returns ActionResult with documents list for template processing.
        Parameters:
        - connectionReference (str, required): Microsoft connection label.
        - documentList (list, optional): Document list reference(s) containing findDocumentPath result.
        - pathQuery (str, optional): Direct path query if no documentList (e.g., /sites/SiteName/FolderPath).
        - includeMetadata (bool, optional): Include metadata. Default: True.
        Returns:
        - ActionResult with documents: List[ActionDocument] where each ActionDocument contains:
        - documentName: File name
        - documentData: Base64-encoded content (binary files) or plain text (text files)
        - mimeType: MIME type (e.g., application/pdf, text/plain)
        Notes:
        - Files that fail to download are logged and skipped, not raised; the action
          fails only if no file could be read at all.
        - Only the first entry of the resolved sites list is used as the read target.
        """
        import time
        operationId = None
        try:
            # Init progress logger
            workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
            operationId = f"sharepoint_read_{workflowId}_{int(time.time())}"
            # Start progress tracking
            parentOperationId = parameters.get('parentOperationId')
            self.services.chat.progressLogStart(
                operationId,
                "Read Documents",
                "SharePoint Document Reading",
                "Processing document list",
                parentOperationId=parentOperationId
            )
            documentList = parameters.get("documentList")
            # "*" is the "not provided" sentinel for pathQuery; treated like empty below
            pathQuery = parameters.get("pathQuery", "*")
            connectionReference = parameters.get("connectionReference")
            includeMetadata = parameters.get("includeMetadata", True)
            # Validate connection reference
            if not connectionReference:
                if operationId:
                    self.services.chat.progressLogFinish(operationId, False)
                return ActionResult.isFailure(error="Connection reference is required")
            # Require either documentList or pathQuery
            if not documentList and (not pathQuery or pathQuery.strip() == "" or pathQuery.strip() == "*"):
                if operationId:
                    self.services.chat.progressLogFinish(operationId, False)
                return ActionResult.isFailure(error="Either documentList or pathQuery is required")
            # Get connection first
            self.services.chat.progressLogUpdate(operationId, 0.2, "Getting Microsoft connection")
            connection = self._getMicrosoftConnection(connectionReference)
            if not connection:
                if operationId:
                    self.services.chat.progressLogFinish(operationId, False)
                return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference")
            # Parse documentList to extract foundDocuments and site information
            sharePointFileIds = None
            sites = None
            if documentList:
                foundDocuments, sites, errorMsg = await self._parseDocumentListForFoundDocuments(documentList)
                if errorMsg:
                    if operationId:
                        self.services.chat.progressLogFinish(operationId, False)
                    return ActionResult.isFailure(error=errorMsg)
                if foundDocuments:
                    # Extract SharePoint file IDs from foundDocuments (folders are ignored)
                    sharePointFileIds = [doc.get("id") for doc in foundDocuments if doc.get("type") == "file"]
                    if not sharePointFileIds:
                        if operationId:
                            self.services.chat.progressLogFinish(operationId, False)
                        return ActionResult.isFailure(error="No files found in documentList from findDocumentPath result")
                    logger.info(f"Extracted {len(sharePointFileIds)} SharePoint file IDs from documentList")
            # If we have SharePoint file IDs from documentList (findDocumentPath result), read them directly
            if sharePointFileIds and sites:
                # Read SharePoint files directly using their IDs
                readResults = []
                # All file IDs are resolved against the first site only
                siteId = sites[0]['id']
                self.services.chat.progressLogUpdate(operationId, 0.5, f"Reading {len(sharePointFileIds)} file(s) from SharePoint")
                for idx, fileId in enumerate(sharePointFileIds):
                    try:
                        # Progress ramps from 0.5 to 0.8 across the file list
                        self.services.chat.progressLogUpdate(operationId, 0.5 + (idx * 0.3 / len(sharePointFileIds)), f"Reading file {idx + 1}/{len(sharePointFileIds)}")
                        # Get file info from SharePoint
                        endpoint = f"sites/{siteId}/drive/items/{fileId}"
                        fileInfo = await self._makeGraphApiCall(endpoint)
                        if "error" in fileInfo:
                            logger.warning(f"Failed to get file info for {fileId}: {fileInfo['error']}")
                            continue
                        # Get file content using SharePoint service (handles binary data correctly)
                        fileName = fileInfo.get("name", f"file_{fileId}")
                        fileContent = await self.services.sharepoint.downloadFile(siteId, fileId)
                        # Create result document
                        resultItem = {
                            "fileId": fileId,
                            "fileName": fileName,
                            "sharepointFileId": fileId,
                            "siteName": sites[0]['displayName'],
                            "siteUrl": sites[0]['webUrl'],
                            "size": fileInfo.get("size", 0),
                            "createdDateTime": fileInfo.get("createdDateTime"),
                            "lastModifiedDateTime": fileInfo.get("lastModifiedDateTime"),
                            "webUrl": fileInfo.get("webUrl")
                        }
                        # Add content if available
                        if fileContent:
                            resultItem["content"] = fileContent
                        # Add metadata if requested
                        if includeMetadata:
                            resultItem["metadata"] = {
                                "mimeType": fileInfo.get("file", {}).get("mimeType"),
                                "downloadUrl": fileInfo.get("@microsoft.graph.downloadUrl"),
                                "createdBy": fileInfo.get("createdBy", {}),
                                "lastModifiedBy": fileInfo.get("lastModifiedBy", {}),
                                "parentReference": fileInfo.get("parentReference", {})
                            }
                        readResults.append(resultItem)
                    except Exception as e:
                        # Best-effort per file: log and continue with the remaining IDs
                        logger.error(f"Error reading file {fileId}: {str(e)}")
                        continue
                if not readResults:
                    self.services.chat.progressLogFinish(operationId, False)
                    return ActionResult.isFailure(error="No files could be read from documentList")
                # Convert read results to ActionDocument objects
                # IMPORTANT: For binary files (PDFs), store Base64-encoded content directly in documentData
                # The system will create FileData and ChatDocument automatically
                self.services.chat.progressLogUpdate(operationId, 0.8, f"Processing {len(readResults)} document(s)")
                from modules.datamodels.datamodelChat import ActionDocument
                import base64
                actionDocuments = []
                for resultItem in readResults:
                    fileContent = resultItem.get("content")
                    fileName = resultItem.get("fileName", f"file_{resultItem.get('fileId')}")
                    # Determine MIME type from metadata or file extension
                    mimeType = "application/octet-stream"
                    if resultItem.get("metadata", {}).get("mimeType"):
                        mimeType = resultItem["metadata"]["mimeType"]
                    elif fileName:
                        if fileName.endswith('.pdf'):
                            mimeType = "application/pdf"
                        elif fileName.endswith('.txt'):
                            mimeType = "text/plain"
                        elif fileName.endswith('.json'):
                            mimeType = "application/json"
                    # For binary files (PDFs, etc.), store Base64-encoded content directly
                    # The GenerationService will detect PDF mimeType and handle base64 decoding
                    if fileContent and isinstance(fileContent, bytes):
                        # Encode binary content as Base64 string
                        base64Content = base64.b64encode(fileContent).decode('utf-8')
                        validationMetadata = {
                            "actionType": "sharepoint.readDocuments",
                            "fileName": fileName,
                            "sharepointFileId": resultItem.get("sharepointFileId"),
                            "siteName": resultItem.get("siteName"),
                            "mimeType": mimeType,
                            "contentType": "binary",
                            "size": len(fileContent),
                            "includeMetadata": includeMetadata
                        }
                        actionDoc = ActionDocument(
                            documentName=fileName,
                            documentData=base64Content,  # Base64 string for binary files
                            mimeType=mimeType,
                            validationMetadata=validationMetadata
                        )
                        actionDocuments.append(actionDoc)
                        logger.info(f"Stored binary file {fileName} ({len(fileContent)} bytes) as Base64 in ActionDocument")
                    elif fileContent:
                        # Text content - store directly in documentData
                        validationMetadata = {
                            "actionType": "sharepoint.readDocuments",
                            "fileName": fileName,
                            "sharepointFileId": resultItem.get("sharepointFileId"),
                            "siteName": resultItem.get("siteName"),
                            "mimeType": mimeType,
                            "contentType": "text",
                            "includeMetadata": includeMetadata
                        }
                        actionDoc = ActionDocument(
                            documentName=fileName,
                            documentData=fileContent if isinstance(fileContent, str) else str(fileContent),
                            mimeType=mimeType,
                            validationMetadata=validationMetadata
                        )
                        actionDocuments.append(actionDoc)
                    else:
                        # No content - store metadata only (documentData becomes a JSON summary)
                        docData = {
                            "fileName": fileName,
                            "sharepointFileId": resultItem.get("sharepointFileId"),
                            "siteName": resultItem.get("siteName"),
                            "siteUrl": resultItem.get("siteUrl"),
                            "size": resultItem.get("size"),
                            "createdDateTime": resultItem.get("createdDateTime"),
                            "lastModifiedDateTime": resultItem.get("lastModifiedDateTime"),
                            "webUrl": resultItem.get("webUrl")
                        }
                        if resultItem.get("metadata"):
                            docData["metadata"] = resultItem["metadata"]
                        validationMetadata = {
                            "actionType": "sharepoint.readDocuments",
                            "fileName": fileName,
                            "sharepointFileId": resultItem.get("sharepointFileId"),
                            "siteName": resultItem.get("siteName"),
                            "mimeType": mimeType,
                            "contentType": "metadata_only",
                            "includeMetadata": includeMetadata
                        }
                        actionDoc = ActionDocument(
                            documentName=fileName,
                            documentData=json.dumps(docData, indent=2),
                            mimeType=mimeType,
                            validationMetadata=validationMetadata
                        )
                        actionDocuments.append(actionDoc)
                # Return success with action documents
                self.services.chat.progressLogUpdate(operationId, 0.9, f"Read {len(actionDocuments)} document(s)")
                self.services.chat.progressLogFinish(operationId, True)
                return ActionResult.isSuccess(documents=actionDocuments)
            # If no sites from documentList, try pathQuery fallback
            # NOTE(review): this branch only resolves sites; it does not read any files,
            # so control falls through to the error returns below - confirm intent.
            if not sites and pathQuery and pathQuery.strip() != "" and pathQuery.strip() != "*":
                sites, errorMsg = await self._resolveSitesFromPathQuery(pathQuery)
                if errorMsg:
                    if operationId:
                        self.services.chat.progressLogFinish(operationId, False)
                    return ActionResult.isFailure(error=errorMsg)
            # If still no sites, return error
            if not sites:
                if operationId:
                    self.services.chat.progressLogFinish(operationId, False)
                return ActionResult.isFailure(error="Either documentList must contain findDocumentPath result with file information, or pathQuery must be provided. Use findDocumentPath first to get file paths, or provide pathQuery directly.")
            # This should never be reached if logic above is correct
            if operationId:
                self.services.chat.progressLogFinish(operationId, False)
            return ActionResult.isFailure(error="Unexpected error: could not process documentList or pathQuery")
        except Exception as e:
            logger.error(f"Error reading SharePoint documents: {str(e)}")
            if operationId:
                try:
                    self.services.chat.progressLogFinish(operationId, False)
                except:
                    pass  # Don't fail on progress logging errors
            return ActionResult(
                success=False,
                error=str(e)
            )
@action
async def uploadDocument(self, parameters: Dict[str, Any]) -> ActionResult:
"""
GENERAL:
- Purpose: Upload documents to SharePoint. Only to choose this action with a connectionReference
- Input requirements: connectionReference (required); documentList (required); pathQuery (optional).
- Output format: JSON with upload status and file info.
Parameters:
- connectionReference (str, required): Microsoft connection label.
- documentList (list, required): Document reference(s) to upload. File names are taken from the documents.
- pathQuery (str, optional): Direct upload target path if documentList doesn't contain findDocumentPath result (e.g., /sites/SiteName/FolderPath).
"""
import time
operationId = None
try:
# Init progress logger
workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
operationId = f"sharepoint_upload_{workflowId}_{int(time.time())}"
# Start progress tracking
parentOperationId = parameters.get('parentOperationId')
self.services.chat.progressLogStart(
operationId,
"Upload Document",
"SharePoint Upload",
"Processing document list",
parentOperationId=parentOperationId
)
connectionReference = parameters.get("connectionReference")
documentList = parameters.get("documentList")
pathQuery = parameters.get("pathQuery")
if isinstance(documentList, str):
documentList = [documentList]
if not connectionReference:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="Connection reference is required")
if not documentList:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="Document list is required")
# Parse documentList to extract folder path and site information
uploadPath, sites, filesToUpload, errorMsg = await self._parseDocumentListForFolder(documentList)
if errorMsg:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error=errorMsg)
# If no folder path found from documentList, use pathQuery if provided
if not uploadPath and pathQuery and pathQuery.strip() != "" and pathQuery.strip() != "*":
uploadPath = pathQuery
logger.info(f"Using pathQuery for upload path: {uploadPath}")
# Resolve sites from pathQuery
sites, errorMsg = await self._resolveSitesFromPathQuery(pathQuery)
if errorMsg:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error=errorMsg)
# Validate required parameters
if not uploadPath:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="Either documentList must contain findDocumentPath result with folder information, or pathQuery must be provided. Use findDocumentPath first to get upload folder, or provide pathQuery directly.")
if not sites:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="Site information missing. Cannot determine target site for upload.")
if not filesToUpload:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="No files to upload found in documentList.")
# Get connection
self.services.chat.progressLogUpdate(operationId, 0.3, "Getting Microsoft connection")
connection = self._getMicrosoftConnection(connectionReference)
if not connection:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference")
# Process upload paths
uploadPaths = []
if uploadPath.startswith('01PPXICCB') or uploadPath.startswith('01'):
# It's a folder ID - use it directly
uploadPaths = [uploadPath]
logger.info(f"Using folder ID directly for upload: {uploadPath}")
else:
# It's a path - resolve it normally
uploadPaths = self._resolvePathQuery(uploadPath)
# Process each document upload
uploadResults = []
# Extract file names from documents
fileNames = [doc.fileName for doc in filesToUpload]
logger.info(f"Using file names from documentList: {fileNames}")
self.services.chat.progressLogUpdate(operationId, 0.5, f"Uploading {len(filesToUpload)} document(s)")
# Process upload paths
# Process each document upload
uploadResults = []
# Extract file names from documents
fileNames = [doc.fileName for doc in filesToUpload]
logger.info(f"Using file names from documentList: {fileNames}")
self.services.chat.progressLogUpdate(operationId, 0.5, f"Uploading {len(filesToUpload)} document(s)")
for i, (chatDocument, fileName) in enumerate(zip(filesToUpload, fileNames)):
try:
fileId = chatDocument.fileId
fileData = self.services.chat.getFileData(fileId)
if not fileData:
logger.warning(f"File data not found for fileId: {fileId}")
uploadResults.append({
"fileName": fileName,
"fileId": fileId,
"error": "File data not found",
"uploadStatus": "failed"
})
continue
# Upload to the first available site (or could be made configurable)
uploadSuccessful = False
for site in sites:
siteId = site["id"]
siteName = site["displayName"]
siteUrl = site["webUrl"]
# Use the first upload path or default to Documents
uploadPath = uploadPaths[0] if uploadPaths else "/Documents"
# Handle wildcard paths - replace with default Documents folder
if uploadPath == "*":
uploadPath = "/Documents"
logger.warning(f"Wildcard path '*' detected, using default '/Documents' folder for upload")
# Check if uploadPath is a folder ID or a regular path
if uploadPath.startswith('01PPXICCB') or uploadPath.startswith('01'):
# It's a folder ID - use the folder-specific upload endpoint
uploadEndpoint = f"sites/{siteId}/drive/items/{uploadPath}:/{fileName}:/content"
logger.info(f"Using folder ID upload endpoint: {uploadEndpoint}")
else:
# It's a regular path - use the root-based upload endpoint
uploadPath = uploadPath.rstrip('/') + '/' + fileName
uploadPathClean = uploadPath.lstrip('/')
uploadEndpoint = f"sites/{siteId}/drive/root:/{uploadPathClean}:/content"
logger.info(f"Using path-based upload endpoint: {uploadEndpoint}")
# Upload endpoint for small files (< 4MB)
if len(fileData) < 4 * 1024 * 1024: # 4MB
# Upload the file
uploadResult = await self._makeGraphApiCall(
uploadEndpoint,
method="PUT",
data=fileData
)
if "error" not in uploadResult:
uploadResults.append({
"fileName": fileName,
"fileId": fileId,
"uploadStatus": "success",
"siteName": siteName,
"siteUrl": siteUrl,
"uploadPath": uploadPath,
"uploadEndpoint": uploadEndpoint,
"sharepointFileId": uploadResult.get("id"),
"webUrl": uploadResult.get("webUrl"),
"size": uploadResult.get("size"),
"createdDateTime": uploadResult.get("createdDateTime")
})
uploadSuccessful = True
break
else:
logger.warning(f"Upload failed to site {siteName}: {uploadResult['error']}")
else:
# For large files, we would need to implement resumable upload
logger.warning(f"File too large ({len(fileData)} bytes) for site {siteName}")
continue
if not uploadSuccessful:
uploadResults.append({
"fileName": fileName,
"fileId": fileId,
"error": f"File too large ({len(fileData)} bytes) or upload failed to all sites. Files larger than 4MB require resumable upload (not implemented).",
"uploadStatus": "failed"
})
except Exception as e:
logger.error(f"Error uploading document {fileName}: {str(e)}")
uploadResults.append({
"fileName": fileName,
"fileId": fileId,
"error": str(e),
"uploadStatus": "failed"
})
# Update progress for each file
self.services.chat.progressLogUpdate(operationId, 0.5 + (i * 0.4 / len(filesToUpload)), f"Uploaded {i + 1}/{len(filesToUpload)} file(s)")
# Create result data
resultData = {
"connectionReference": connectionReference,
"uploadPath": uploadPath,
"documentList": documentList,
"fileNames": fileNames,
"sitesAvailable": len(sites),
"uploadResults": uploadResults,
"connection": {
"id": connection["id"],
"authority": "microsoft",
"reference": connectionReference
},
"timestamp": self.services.utils.timestampGetUtc()
}
# Use default JSON format for output
outputExtension = ".json" # Default
outputMimeType = "application/json" # Default
validationMetadata = {
"actionType": "sharepoint.uploadDocument",
"connectionReference": connectionReference,
"uploadPath": uploadPath,
"fileNames": fileNames,
"uploadCount": len(uploadResults),
"successfulUploads": len([r for r in uploadResults if r.get("uploadStatus") == "success"]),
"failedUploads": len([r for r in uploadResults if r.get("uploadStatus") == "failed"])
}
successfulUploads = len([r for r in uploadResults if r.get("uploadStatus") == "success"])
self.services.chat.progressLogUpdate(operationId, 0.9, f"Uploaded {successfulUploads}/{len(uploadResults)} file(s)")
self.services.chat.progressLogFinish(operationId, successfulUploads > 0)
return ActionResult(
success=True,
documents=[
ActionDocument(
documentName=f"sharepoint_upload_{self._format_timestamp_for_filename()}{outputExtension}",
documentData=json.dumps(resultData, indent=2),
mimeType=outputMimeType,
validationMetadata=validationMetadata
)
]
)
except Exception as e:
logger.error(f"Error uploading to SharePoint: {str(e)}")
if operationId:
try:
self.services.chat.progressLogFinish(operationId, False)
except:
pass
return ActionResult(
success=False,
error=str(e)
)
@action
async def listDocuments(self, parameters: Dict[str, Any]) -> ActionResult:
"""
GENERAL:
- Purpose: List documents and folders in SharePoint paths across sites.
- Input requirements: connectionReference (required); documentList (required); includeSubfolders (optional).
- Output format: JSON with folder items and metadata.
Parameters:
- connectionReference (str, required): Microsoft connection label.
- documentList (list, required): Document list reference(s) containing findDocumentPath result.
- includeSubfolders (bool, optional): Include one level of subfolders. Default: False.
"""
import time
operationId = None
try:
# Init progress logger
workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
operationId = f"sharepoint_list_{workflowId}_{int(time.time())}"
# Start progress tracking
parentOperationId = parameters.get('parentOperationId')
self.services.chat.progressLogStart(
operationId,
"List Documents",
"SharePoint Listing",
"Processing document list",
parentOperationId=parentOperationId
)
connectionReference = parameters.get("connectionReference")
documentList = parameters.get("documentList")
pathQuery = parameters.get("pathQuery", "*")
if isinstance(documentList, str):
documentList = [documentList]
includeSubfolders = parameters.get("includeSubfolders", False) # Default to False for better UX
if not connectionReference:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="Connection reference is required")
# Require either documentList or pathQuery
if not documentList and (not pathQuery or pathQuery.strip() == "" or pathQuery.strip() == "*"):
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="Either documentList or pathQuery is required")
# Parse documentList to extract folder path and site information
listQuery, sites, _, errorMsg = await self._parseDocumentListForFolder(documentList)
if errorMsg:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error=errorMsg)
# If no folder path found from documentList, use pathQuery if provided
if not listQuery and pathQuery and pathQuery.strip() != "" and pathQuery.strip() != "*":
listQuery = pathQuery
logger.info(f"Using pathQuery for list query: {listQuery}")
# Resolve sites from pathQuery
sites, errorMsg = await self._resolveSitesFromPathQuery(pathQuery)
if errorMsg:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error=errorMsg)
# Validate required parameters
if not listQuery:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="Either documentList must contain findDocumentPath result with folder information, or pathQuery must be provided. Use findDocumentPath first to get folder path, or provide pathQuery directly.")
if not sites:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="Site information missing. Cannot determine target site for list operation.")
# Get connection
self.services.chat.progressLogUpdate(operationId, 0.2, "Getting Microsoft connection")
connection = self._getMicrosoftConnection(connectionReference)
if not connection:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference")
logger.info(f"Starting SharePoint listDocuments for listQuery: {listQuery}")
logger.debug(f"Connection ID: {connection['id']}")
self.services.chat.progressLogUpdate(operationId, 0.3, "Processing folder path")
# Parse listQuery to extract path, search terms, search type, and options
pathQuery, fileQuery, searchType, searchOptions = self._parseSearchQuery(listQuery)
# Check if listQuery is a folder ID (starts with 01PPXICCB...)
if listQuery.startswith('01PPXICCB') or listQuery.startswith('01'):
# Direct folder ID - use it directly
folderPaths = [listQuery]
logger.info(f"Using direct folder ID: {listQuery}")
else:
# Remove site prefix from pathQuery before resolving (it's only for site filtering)
pathQueryForResolve = pathQuery
# Microsoft-standard path: /sites/SiteName/Path -> /Path
if pathQuery.startswith('/sites/'):
parsedPath = self._extractSiteFromStandardPath(pathQuery)
if parsedPath:
innerPath = parsedPath.get("innerPath", "")
pathQueryForResolve = '/' + innerPath if innerPath else '/'
else:
pathQueryForResolve = '/'
# Remove first path segment if it looks like a document library name
# In SharePoint Graph API, /drive/root already points to the default document library,
# so library names in paths should be removed
# Generic approach: if path has multiple segments, store original for fallback
pathSegments = [s for s in pathQueryForResolve.split('/') if s.strip()]
if len(pathSegments) > 1:
# Path has multiple segments - first might be a library name
# Store original for potential fallback
originalPath = pathQueryForResolve
# Try without first segment (assuming it's a library name)
pathQueryForResolve = '/' + '/'.join(pathSegments[1:])
logger.info(f"Removed first path segment (potential library name), path changed from '{originalPath}' to '{pathQueryForResolve}'")
elif len(pathSegments) == 1:
# Only one segment - if it's a common library-like name, use root
firstSegmentLower = pathSegments[0].lower()
libraryIndicators = ['document', 'dokument', 'shared', 'freigegeben', 'library', 'bibliothek']
if any(indicator in firstSegmentLower for indicator in libraryIndicators):
pathQueryForResolve = '/'
logger.info(f"First segment '{pathSegments[0]}' appears to be a library name, using root")
# Resolve path query into folder paths
folderPaths = self._resolvePathQuery(pathQueryForResolve)
logger.info(f"Resolved folder paths: {folderPaths}")
# Process each folder path across all sites
listResults = []
self.services.chat.progressLogUpdate(operationId, 0.5, f"Listing {len(folderPaths)} folder(s) across {len(sites)} site(s)")
for folderPath in folderPaths:
try:
folderResults = []
for site in sites:
siteId = site["id"]
siteName = site["displayName"]
siteUrl = site["webUrl"]
logger.info(f"Listing folder {folderPath} in site: {siteName}")
# Determine the endpoint based on folder path
if folderPath in ["/", ""] or folderPath == "*":
# Root folder
endpoint = f"sites/{siteId}/drive/root/children"
elif folderPath.startswith('01PPXICCB') or folderPath.startswith('01'):
# Direct folder ID
endpoint = f"sites/{siteId}/drive/items/{folderPath}/children"
else:
# Specific folder path - remove leading slash if present and URL encode
folderPathClean = folderPath.lstrip('/')
# URL encode the path for Graph API (spaces and special characters need encoding)
folderPathEncoded = urllib.parse.quote(folderPathClean, safe='/')
endpoint = f"sites/{siteId}/drive/root:/{folderPathEncoded}:/children"
# Make the API call to list folder contents
apiResult = await self._makeGraphApiCall(endpoint)
if "error" in apiResult:
logger.warning(f"Failed to list folder {folderPath} in site {siteName}: {apiResult['error']}")
continue
# Process the results
items = apiResult.get("value", [])
processedItems = []
for item in items:
# Use improved folder detection logic
isFolder = self.services.sharepoint.detectFolderType(item)
itemInfo = {
"id": item.get("id"),
"name": item.get("name"),
"size": item.get("size", 0),
"createdDateTime": item.get("createdDateTime"),
"lastModifiedDateTime": item.get("lastModifiedDateTime"),
"webUrl": item.get("webUrl"),
"type": "folder" if isFolder else "file",
"siteName": siteName,
"siteUrl": siteUrl
}
# Add file-specific information
if "file" in item:
itemInfo.update({
"mimeType": item["file"].get("mimeType"),
"downloadUrl": item.get("@microsoft.graph.downloadUrl")
})
# Add folder-specific information
if "folder" in item:
itemInfo.update({
"childCount": item["folder"].get("childCount", 0)
})
processedItems.append(itemInfo)
# If include subfolders is enabled, get ONLY direct subfolder contents (1 level deep only)
if includeSubfolders:
folderItems = [item for item in processedItems if item['type'] == 'folder']
logger.info(f"Including subfolders - processing {len(folderItems)} folders")
subfolderCount = 0
maxSubfolders = 10 # Limit to prevent infinite loops
for item in processedItems[:]: # Use slice to avoid modifying list during iteration
if item["type"] == "folder" and subfolderCount < maxSubfolders:
subfolderCount += 1
subfolderPath = f"{folderPath.rstrip('/')}/{item['name']}"
subfolderEndpoint = f"sites/{siteId}/drive/items/{item['id']}/children"
logger.debug(f"Getting contents of subfolder: {item['name']}")
subfolderResult = await self._makeGraphApiCall(subfolderEndpoint)
if "error" not in subfolderResult:
subfolderItems = subfolderResult.get("value", [])
logger.debug(f"Found {len(subfolderItems)} items in subfolder {item['name']}")
for subfolderItem in subfolderItems:
# Use improved folder detection logic for subfolder items
subfolderIsFolder = self.services.sharepoint.detectFolderType(subfolderItem)
# Only add files and direct subfolders, NO RECURSION
subfolderItemInfo = {
"id": subfolderItem.get("id"),
"name": subfolderItem.get("name"),
"size": subfolderItem.get("size", 0),
"createdDateTime": subfolderItem.get("createdDateTime"),
"lastModifiedDateTime": subfolderItem.get("lastModifiedDateTime"),
"webUrl": subfolderItem.get("webUrl"),
"type": "folder" if subfolderIsFolder else "file",
"parentPath": subfolderPath,
"siteName": siteName,
"siteUrl": siteUrl
}
if "file" in subfolderItem:
subfolderItemInfo.update({
"mimeType": subfolderItem["file"].get("mimeType"),
"downloadUrl": subfolderItem.get("@microsoft.graph.downloadUrl")
})
processedItems.append(subfolderItemInfo)
else:
logger.warning(f"Failed to get contents of subfolder {item['name']}: {subfolderResult.get('error')}")
elif subfolderCount >= maxSubfolders:
logger.warning(f"Reached maximum subfolder limit ({maxSubfolders}), skipping remaining folders")
break
logger.info(f"Processed {subfolderCount} subfolders, total items: {len(processedItems)}")
folderResults.append({
"siteName": siteName,
"siteUrl": siteUrl,
"itemCount": len(processedItems),
"items": processedItems
})
listResults.append({
"folderPath": folderPath,
"sitesProcessed": len(folderResults),
"siteResults": folderResults
})
except Exception as e:
logger.error(f"Error listing folder {folderPath}: {str(e)}")
listResults.append({
"folderPath": folderPath,
"error": str(e),
"siteResults": []
})
totalItems = sum(len(result.get("siteResults", [])) for result in listResults)
self.services.chat.progressLogUpdate(operationId, 0.9, f"Found {totalItems} item(s)")
# Create result data
resultData = {
"pathQuery": listQuery,
"includeSubfolders": includeSubfolders,
"sitesSearched": len(sites),
"listResults": listResults,
"timestamp": self.services.utils.timestampGetUtc()
}
# Use default JSON format for output
outputExtension = ".json" # Default
outputMimeType = "application/json" # Default
validationMetadata = {
"actionType": "sharepoint.listDocuments",
"pathQuery": listQuery,
"includeSubfolders": includeSubfolders,
"sitesSearched": len(sites),
"folderCount": len(listResults),
"totalItems": totalItems
}
self.services.chat.progressLogFinish(operationId, True)
return ActionResult(
success=True,
documents=[
ActionDocument(
documentName=f"sharepoint_document_list_{self._format_timestamp_for_filename()}{outputExtension}",
documentData=json.dumps(resultData, indent=2),
mimeType=outputMimeType,
validationMetadata=validationMetadata
)
]
)
except Exception as e:
logger.error(f"Error listing SharePoint documents: {str(e)}")
if operationId:
try:
self.services.chat.progressLogFinish(operationId, False)
except:
pass
return ActionResult(
success=False,
error=str(e)
)
@action
async def analyzeFolderUsage(self, parameters: Dict[str, Any]) -> ActionResult:
"""
GENERAL:
- Purpose: Analyze usage intensity of folders and files in SharePoint.
- Input requirements: connectionReference (required); documentList (required); optional startDateTime, endDateTime, interval.
- Output format: JSON with usage analytics grouped by time intervals.
Parameters:
- connectionReference (str, required): Microsoft connection label.
- documentList (list, required): Document list reference(s) containing findDocumentPath result.
- startDateTime (str, optional): Start date/time in ISO format (e.g., "2025-11-01T00:00:00Z"). Default: 30 days ago.
- endDateTime (str, optional): End date/time in ISO format (e.g., "2025-11-30T23:59:59Z"). Default: current time.
- interval (str, optional): Time interval for grouping activities. Options: "day", "week", "month". Default: "day".
"""
import time
operationId = None
try:
# Init progress logger
workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
operationId = f"sharepoint_usage_{workflowId}_{int(time.time())}"
# Start progress tracking
parentOperationId = parameters.get('parentOperationId')
self.services.chat.progressLogStart(
operationId,
"Analyze Folder Usage",
"SharePoint Analytics",
"Processing document list",
parentOperationId=parentOperationId
)
connectionReference = parameters.get("connectionReference")
documentList = parameters.get("documentList")
pathQuery = parameters.get("pathQuery")
if isinstance(documentList, str):
documentList = [documentList]
startDateTime = parameters.get("startDateTime")
endDateTime = parameters.get("endDateTime")
interval = parameters.get("interval", "day")
if not connectionReference:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="Connection reference is required")
# Require either documentList or pathQuery
if not documentList and (not pathQuery or pathQuery.strip() == "" or pathQuery.strip() == "*"):
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="Either documentList or pathQuery is required")
# Resolve folder/item information from documentList or pathQuery
siteId = None
driveId = None
itemId = None
folderPath = None
folderName = None
if documentList:
foundDocuments, sites, errorMsg = await self._parseDocumentListForFoundDocuments(documentList)
if errorMsg:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error=errorMsg)
if not foundDocuments:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="No documents found in documentList")
# Get siteId from first document (all should be from same site)
firstItem = foundDocuments[0]
siteId = firstItem.get("siteId")
if not siteId:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="Site ID missing from documentList")
# Get drive ID (needed for analytics)
driveId = await self.services.sharepoint.getDriveId(siteId)
if not driveId:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="Could not determine drive ID for the site")
# If no items from documentList, try pathQuery fallback
if not foundDocuments and pathQuery and pathQuery.strip() != "" and pathQuery.strip() != "*":
sites, errorMsg = await self._resolveSitesFromPathQuery(pathQuery)
if errorMsg:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error=errorMsg)
if sites:
siteId = sites[0].get("id")
# Parse pathQuery to find the folder/item
pathQueryParsed, fileQuery, searchType, searchOptions = self._parseSearchQuery(pathQuery)
# Extract folder path from pathQuery
folderPath = '/'
if pathQueryParsed and pathQueryParsed.startswith('/sites/'):
parsedPath = self._extractSiteFromStandardPath(pathQueryParsed)
if parsedPath:
innerPath = parsedPath.get("innerPath", "")
folderPath = '/' + innerPath if innerPath else '/'
elif pathQueryParsed:
folderPath = pathQueryParsed
# Get drive ID
driveId = await self.services.sharepoint.getDriveId(siteId)
if not driveId:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="Could not determine drive ID for the site")
# Get folder/item by path
folderInfo = await self.services.sharepoint.getFolderByPath(siteId, folderPath.lstrip('/'))
if not folderInfo:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error=f"Folder or file not found at path: {folderPath}")
# Add pathQuery item to foundDocuments for processing
foundDocuments = [{
"id": folderInfo.get("id"),
"name": folderInfo.get("name", ""),
"type": "folder" if folderInfo.get("folder") else "file",
"siteId": siteId,
"fullPath": folderPath,
"webUrl": folderInfo.get("webUrl", "")
}]
if not siteId or not driveId:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="Either documentList must contain findDocumentPath result with folder information, or pathQuery must be provided. Use findDocumentPath first to get folder path, or provide pathQuery directly.")
self.services.chat.progressLogUpdate(operationId, 0.2, "Getting Microsoft connection")
# Get Microsoft connection
connection = self._getMicrosoftConnection(connectionReference)
if not connection:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference")
# Set access token
if not self.services.sharepoint.setAccessTokenFromConnection(connection):
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="Failed to set SharePoint access token")
# Process all items from documentList or pathQuery
# IMPORTANT: Only analyze FOLDERS, not files (action is "analyzeFolderUsage")
itemsToAnalyze = []
if foundDocuments:
for item in foundDocuments:
itemId = item.get("id")
itemType = item.get("type", "").lower()
# Only process folders, skip files and site-level items
if itemId and itemType == "folder":
itemsToAnalyze.append({
"id": itemId,
"name": item.get("name", ""),
"type": itemType,
"path": item.get("fullPath", ""),
"webUrl": item.get("webUrl", "")
})
if not itemsToAnalyze:
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return ActionResult.isFailure(error="No valid folders found in documentList to analyze. Note: This action only analyzes folders, not files.")
self.services.chat.progressLogUpdate(operationId, 0.4, f"Analyzing {len(itemsToAnalyze)} folder(s)")
# Analyze each item
allAnalytics = []
totalActivities = 0
uniqueUsers = set()
activityTypes = {}
# Compute actual date range values (getFolderUsageAnalytics will set defaults if None)
# We need to compute them here to store in output, since getFolderUsageAnalytics modifies them
actualStartDateTime = startDateTime
actualEndDateTime = endDateTime
if not actualEndDateTime:
actualEndDateTime = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
if not actualStartDateTime:
startDate = datetime.now(timezone.utc) - timedelta(days=30)
actualStartDateTime = startDate.isoformat().replace('+00:00', 'Z')
for idx, item in enumerate(itemsToAnalyze):
progress = 0.4 + (idx / len(itemsToAnalyze)) * 0.5
self.services.chat.progressLogUpdate(operationId, progress, f"Analyzing folder {item['name']} ({idx+1}/{len(itemsToAnalyze)})")
# Get usage analytics for this folder
analyticsResult = await self.services.sharepoint.getFolderUsageAnalytics(
siteId=siteId,
driveId=driveId,
itemId=item["id"],
startDateTime=startDateTime,
endDateTime=endDateTime,
interval=interval
)
if "error" in analyticsResult:
logger.warning(f"Failed to get analytics for item {item['name']} ({item['id']}): {analyticsResult['error']}")
# Continue with other items even if one fails
itemAnalytics = {
"itemId": item["id"],
"itemName": item["name"],
"itemType": item["type"],
"itemPath": item["path"],
"error": analyticsResult.get("error", "Unknown error")
}
else:
# Process analytics for this item
itemActivities = 0
itemUsers = set()
itemActivityTypes = {}
if "value" in analyticsResult:
for intervalData in analyticsResult["value"]:
activities = intervalData.get("activities", [])
for activity in activities:
itemActivities += 1
totalActivities += 1
action = activity.get("action", {})
actionType = action.get("verb", "unknown")
itemActivityTypes[actionType] = itemActivityTypes.get(actionType, 0) + 1
activityTypes[actionType] = activityTypes.get(actionType, 0) + 1
actor = activity.get("actor", {})
userPrincipalName = actor.get("userPrincipalName", "")
if userPrincipalName:
itemUsers.add(userPrincipalName)
uniqueUsers.add(userPrincipalName)
itemAnalytics = {
"itemId": item["id"],
"itemName": item["name"],
"itemType": item["type"],
"itemPath": item["path"],
"webUrl": item["webUrl"],
"analytics": analyticsResult,
"summary": {
"totalActivities": itemActivities,
"uniqueUsers": len(itemUsers),
"activityTypes": itemActivityTypes
}
}
# Include note if analytics are not available
if "note" in analyticsResult:
itemAnalytics["note"] = analyticsResult["note"]
allAnalytics.append(itemAnalytics)
self.services.chat.progressLogUpdate(operationId, 0.9, "Processing analytics data")
# Process and format analytics data
resultData = {
"siteId": siteId,
"driveId": driveId,
"startDateTime": actualStartDateTime, # Store computed date range (not None)
"endDateTime": actualEndDateTime, # Store computed date range (not None)
"interval": interval,
"itemsAnalyzed": len(itemsToAnalyze),
"foldersAnalyzed": len([item for item in allAnalytics if item.get("itemType") == "folder"]),
"items": allAnalytics,
"summary": {
"totalActivities": totalActivities,
"uniqueUsers": len(uniqueUsers),
"activityTypes": activityTypes
},
"note": f"Analyzed {len(itemsToAnalyze)} folder(s) from {actualStartDateTime} to {actualEndDateTime}. " +
f"Found {totalActivities} total activities across {len(uniqueUsers)} unique user(s)." +
(f" Note: {len([item for item in allAnalytics if 'error' in item])} folder(s) had errors or no analytics data available." if any('error' in item for item in allAnalytics) else ""),
"timestamp": self.services.utils.timestampGetUtc()
}
self.services.chat.progressLogUpdate(operationId, 0.95, f"Found {totalActivities} total activities across {len(itemsToAnalyze)} folder(s)")
validationMetadata = {
"actionType": "sharepoint.analyzeFolderUsage",
"itemsAnalyzed": len(itemsToAnalyze),
"interval": interval,
"totalActivities": totalActivities,
"uniqueUsers": len(uniqueUsers)
}
self.services.chat.progressLogFinish(operationId, True)
return ActionResult(
success=True,
documents=[
ActionDocument(
documentName=f"sharepoint_usage_analysis_{self._format_timestamp_for_filename()}.json",
documentData=json.dumps(resultData, indent=2),
mimeType="application/json",
validationMetadata=validationMetadata
)
]
)
except Exception as e:
logger.error(f"Error analyzing folder usage: {str(e)}")
if operationId:
try:
self.services.chat.progressLogFinish(operationId, False)
except:
pass
return ActionResult(
success=False,
error=str(e)
)
@action
async def findSiteByUrl(self, parameters: Dict[str, Any]) -> ActionResult:
"""
Find SharePoint site by hostname and site path.
Parameters:
- connectionReference (str, required): Microsoft connection label.
- hostname (str, required): SharePoint hostname (e.g., "example.sharepoint.com")
- sitePath (str, required): Site path (e.g., "SteeringBPM" or "/sites/SteeringBPM")
Returns:
- ActionResult with ActionDocument containing site information (id, displayName, name, webUrl)
"""
try:
connectionReference = parameters.get("connectionReference")
if not connectionReference:
return ActionResult.isFailure(error="connectionReference parameter is required")
hostname = parameters.get("hostname")
if not hostname:
return ActionResult.isFailure(error="hostname parameter is required")
sitePath = parameters.get("sitePath")
if not sitePath:
return ActionResult.isFailure(error="sitePath parameter is required")
# Get Microsoft connection
connection = self._getMicrosoftConnection(connectionReference)
if not connection:
return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference")
# Find site by URL
siteInfo = await self.services.sharepoint.findSiteByUrl(
hostname=hostname,
sitePath=sitePath
)
if not siteInfo:
return ActionResult.isFailure(error=f"Site not found: {hostname}:/sites/{sitePath}")
logger.info(f"Found SharePoint site: {siteInfo.get('displayName')} (ID: {siteInfo.get('id')})")
# Generate filename
workflowContext = self.services.chat.getWorkflowContext() if hasattr(self.services, 'chat') else None
filename = self._generateMeaningfulFileName(
"sharepoint_site",
"json",
workflowContext,
"findSiteByUrl"
)
validationMetadata = self._createValidationMetadata(
"findSiteByUrl",
hostname=hostname,
sitePath=sitePath,
siteId=siteInfo.get("id")
)
document = ActionDocument(
documentName=filename,
documentData=json.dumps(siteInfo, indent=2),
mimeType="application/json",
validationMetadata=validationMetadata
)
return ActionResult.isSuccess(documents=[document])
except Exception as e:
errorMsg = f"Error finding SharePoint site: {str(e)}"
logger.error(errorMsg)
return ActionResult.isFailure(error=errorMsg)
@action
async def downloadFileByPath(self, parameters: Dict[str, Any]) -> ActionResult:
"""
Download file from SharePoint by exact file path.
Parameters:
- connectionReference (str, required): Microsoft connection label.
- siteId (str, required): SharePoint site ID (from findSiteByUrl result) or document reference containing site info
- filePath (str, required): Full file path relative to site root (e.g., "/General/50 Docs hosted by SELISE/file.xlsx")
Returns:
- ActionResult with ActionDocument containing file content as base64-encoded bytes
"""
try:
connectionReference = parameters.get("connectionReference")
if not connectionReference:
return ActionResult.isFailure(error="connectionReference parameter is required")
siteIdParam = parameters.get("siteId")
if not siteIdParam:
return ActionResult.isFailure(error="siteId parameter is required")
filePath = parameters.get("filePath")
if not filePath:
return ActionResult.isFailure(error="filePath parameter is required")
# Extract siteId from document if it's a reference
siteId = None
if isinstance(siteIdParam, str):
# Try to parse from document reference
from modules.datamodels.datamodelDocref import DocumentReferenceList
try:
docList = DocumentReferenceList.from_string_list([siteIdParam])
chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(docList)
if chatDocuments and len(chatDocuments) > 0:
siteInfoJson = json.loads(chatDocuments[0].documentData)
siteId = siteInfoJson.get("id")
except:
pass
if not siteId:
# Assume it's the site ID directly
siteId = siteIdParam
else:
siteId = siteIdParam
if not siteId:
return ActionResult.isFailure(error="Could not extract siteId from parameter")
# Get Microsoft connection
connection = self._getMicrosoftConnection(connectionReference)
if not connection:
return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference")
# Download file
fileContent = await self.services.sharepoint.downloadFileByPath(
siteId=siteId,
filePath=filePath
)
if fileContent is None:
return ActionResult.isFailure(error=f"File not found or could not be downloaded: {filePath}")
logger.info(f"Downloaded file from SharePoint: {filePath} ({len(fileContent)} bytes)")
# Generate filename from filePath
import os
fileName = os.path.basename(filePath) or "downloaded_file"
workflowContext = self.services.chat.getWorkflowContext() if hasattr(self.services, 'chat') else None
filename = self._generateMeaningfulFileName(
fileName.split('.')[0] if '.' in fileName else fileName,
fileName.split('.')[-1] if '.' in fileName else "bin",
workflowContext,
"downloadFileByPath"
)
# Encode as base64
import base64
fileBase64 = base64.b64encode(fileContent).decode('utf-8')
validationMetadata = self._createValidationMetadata(
"downloadFileByPath",
siteId=siteId,
filePath=filePath,
fileSize=len(fileContent)
)
document = ActionDocument(
documentName=filename,
documentData=fileBase64,
mimeType="application/octet-stream",
validationMetadata=validationMetadata
)
return ActionResult.isSuccess(documents=[document])
except Exception as e:
errorMsg = f"Error downloading file from SharePoint: {str(e)}"
logger.error(errorMsg)
return ActionResult.isFailure(error=errorMsg)
@action
async def copyFile(self, parameters: Dict[str, Any]) -> ActionResult:
"""
Copy file within SharePoint.
Parameters:
- connectionReference (str, required): Microsoft connection label.
- siteId (str, required): SharePoint site ID (from findSiteByUrl result) or document reference containing site info
- sourceFolder (str, required): Source folder path relative to site root
- sourceFile (str, required): Source file name
- destFolder (str, required): Destination folder path relative to site root
- destFile (str, required): Destination file name
Returns:
- ActionResult with ActionDocument containing copy result
"""
try:
connectionReference = parameters.get("connectionReference")
if not connectionReference:
return ActionResult.isFailure(error="connectionReference parameter is required")
siteIdParam = parameters.get("siteId")
if not siteIdParam:
return ActionResult.isFailure(error="siteId parameter is required")
sourceFolder = parameters.get("sourceFolder")
if not sourceFolder:
return ActionResult.isFailure(error="sourceFolder parameter is required")
sourceFile = parameters.get("sourceFile")
if not sourceFile:
return ActionResult.isFailure(error="sourceFile parameter is required")
destFolder = parameters.get("destFolder")
if not destFolder:
return ActionResult.isFailure(error="destFolder parameter is required")
destFile = parameters.get("destFile")
if not destFile:
return ActionResult.isFailure(error="destFile parameter is required")
# Extract siteId from document if it's a reference
siteId = None
if isinstance(siteIdParam, str):
from modules.datamodels.datamodelDocref import DocumentReferenceList
try:
docList = DocumentReferenceList.from_string_list([siteIdParam])
chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(docList)
if chatDocuments and len(chatDocuments) > 0:
siteInfoJson = json.loads(chatDocuments[0].documentData)
siteId = siteInfoJson.get("id")
except:
pass
if not siteId:
siteId = siteIdParam
else:
siteId = siteIdParam
if not siteId:
return ActionResult.isFailure(error="Could not extract siteId from parameter")
# Get Microsoft connection
connection = self._getMicrosoftConnection(connectionReference)
if not connection:
return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference")
# Copy file
await self.services.sharepoint.copyFileAsync(
siteId=siteId,
sourceFolder=sourceFolder,
sourceFile=sourceFile,
destFolder=destFolder,
destFile=destFile
)
logger.info(f"Copied file in SharePoint: {sourceFolder}/{sourceFile} -> {destFolder}/{destFile}")
# Generate filename
workflowContext = self.services.chat.getWorkflowContext() if hasattr(self.services, 'chat') else None
filename = self._generateMeaningfulFileName(
"file_copy_result",
"json",
workflowContext,
"copyFile"
)
result = {
"success": True,
"siteId": siteId,
"sourcePath": f"{sourceFolder}/{sourceFile}",
"destPath": f"{destFolder}/{destFile}"
}
validationMetadata = self._createValidationMetadata(
"copyFile",
siteId=siteId,
sourcePath=f"{sourceFolder}/{sourceFile}",
destPath=f"{destFolder}/{destFile}"
)
document = ActionDocument(
documentName=filename,
documentData=json.dumps(result, indent=2),
mimeType="application/json",
validationMetadata=validationMetadata
)
return ActionResult.isSuccess(documents=[document])
except Exception as e:
# Handle file not found gracefully
if "itemNotFound" in str(e) or "404" in str(e):
logger.warning(f"File not found for copy: {parameters.get('sourceFolder')}/{parameters.get('sourceFile')}")
# Return success with skipped status
workflowContext = self.services.chat.getWorkflowContext() if hasattr(self.services, 'chat') else None
filename = self._generateMeaningfulFileName(
"file_copy_result",
"json",
workflowContext,
"copyFile"
)
result = {
"success": True,
"skipped": True,
"reason": "File not found (may not exist yet)"
}
validationMetadata = self._createValidationMetadata(
"copyFile",
skipped=True
)
document = ActionDocument(
documentName=filename,
documentData=json.dumps(result, indent=2),
mimeType="application/json",
validationMetadata=validationMetadata
)
return ActionResult.isSuccess(documents=[document])
errorMsg = f"Error copying file in SharePoint: {str(e)}"
logger.error(errorMsg)
return ActionResult.isFailure(error=errorMsg)
@action
async def uploadFile(self, parameters: Dict[str, Any]) -> ActionResult:
"""
Upload raw file content (bytes) to SharePoint.
Parameters:
- connectionReference (str, required): Microsoft connection label.
- siteId (str, required): SharePoint site ID (from findSiteByUrl result) or document reference containing site info
- folderPath (str, required): Folder path relative to site root
- fileName (str, required): File name
- content (str, required): Document reference containing file content as base64-encoded bytes
Returns:
- ActionResult with ActionDocument containing upload result
"""
try:
connectionReference = parameters.get("connectionReference")
if not connectionReference:
return ActionResult.isFailure(error="connectionReference parameter is required")
siteIdParam = parameters.get("siteId")
if not siteIdParam:
return ActionResult.isFailure(error="siteId parameter is required")
folderPath = parameters.get("folderPath")
if not folderPath:
return ActionResult.isFailure(error="folderPath parameter is required")
fileName = parameters.get("fileName")
if not fileName:
return ActionResult.isFailure(error="fileName parameter is required")
contentParam = parameters.get("content")
if not contentParam:
return ActionResult.isFailure(error="content parameter is required")
# Extract siteId from document if it's a reference
siteId = None
if isinstance(siteIdParam, str):
from modules.datamodels.datamodelDocref import DocumentReferenceList
try:
docList = DocumentReferenceList.from_string_list([siteIdParam])
chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(docList)
if chatDocuments and len(chatDocuments) > 0:
siteInfoJson = json.loads(chatDocuments[0].documentData)
siteId = siteInfoJson.get("id")
except:
pass
if not siteId:
siteId = siteIdParam
else:
siteId = siteIdParam
if not siteId:
return ActionResult.isFailure(error="Could not extract siteId from parameter")
# Get file content from document
from modules.datamodels.datamodelDocref import DocumentReferenceList
docList = DocumentReferenceList.from_string_list([contentParam] if isinstance(contentParam, str) else contentParam)
chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(docList)
if not chatDocuments or len(chatDocuments) == 0:
return ActionResult.isFailure(error="Could not get file content from document reference")
fileContentBase64 = chatDocuments[0].documentData
# Decode base64
import base64
try:
fileContent = base64.b64decode(fileContentBase64)
except Exception as e:
return ActionResult.isFailure(error=f"Could not decode base64 file content: {str(e)}")
# Get Microsoft connection
connection = self._getMicrosoftConnection(connectionReference)
if not connection:
return ActionResult.isFailure(error="No valid Microsoft connection found for the provided connection reference")
# Upload file
uploadResult = await self.services.sharepoint.uploadFile(
siteId=siteId,
folderPath=folderPath,
fileName=fileName,
content=fileContent
)
if "error" in uploadResult:
return ActionResult.isFailure(error=f"Upload failed: {uploadResult['error']}")
logger.info(f"Uploaded file to SharePoint: {folderPath}/{fileName} ({len(fileContent)} bytes)")
# Generate filename
workflowContext = self.services.chat.getWorkflowContext() if hasattr(self.services, 'chat') else None
filename = self._generateMeaningfulFileName(
"file_upload_result",
"json",
workflowContext,
"uploadFile"
)
result = {
"success": True,
"siteId": siteId,
"filePath": f"{folderPath}/{fileName}",
"fileSize": len(fileContent),
"uploadResult": uploadResult
}
validationMetadata = self._createValidationMetadata(
"uploadFile",
siteId=siteId,
filePath=f"{folderPath}/{fileName}",
fileSize=len(fileContent)
)
document = ActionDocument(
documentName=filename,
documentData=json.dumps(result, indent=2),
mimeType="application/json",
validationMetadata=validationMetadata
)
return ActionResult.isSuccess(documents=[document])
except Exception as e:
errorMsg = f"Error uploading file to SharePoint: {str(e)}"
logger.error(errorMsg)
return ActionResult.isFailure(error=errorMsg)