291 lines
15 KiB
Python
291 lines
15 KiB
Python
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.

"""
Document Parsing helper for SharePoint operations.

Handles parsing of document lists and extracting found documents and site information.
"""
|
|
|
|
import logging
|
|
import json
|
|
from typing import Dict, Any, List, Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class DocumentParsingHelper:
    """Helper for parsing document lists and extracting document information.

    Two stored result shapes are understood (both read from chat-document
    file data):
      * findDocumentPath results, which carry a ``foundDocuments`` list, and
      * listDocuments results, which carry ``listResults`` and are converted
        into the ``foundDocuments`` format on the fly.
    """

    def __init__(self, methodInstance):
        """
        Initialize document parsing helper.

        Args:
            methodInstance: Instance of MethodSharepoint (for access to services)
        """
        self.method = methodInstance
        self.services = methodInstance.services

    def _resolveDocRefList(self, documentList: Any):
        """Normalize documentList into a DocumentReferenceList.

        Accepts a single string, a list of strings, or an existing
        DocumentReferenceList; anything else yields an empty reference list.
        """
        # Imported lazily (as in the original code) to keep module import light
        # and avoid potential import cycles.
        from modules.datamodels.datamodelDocref import DocumentReferenceList

        if isinstance(documentList, str):
            documentList = [documentList]
        if isinstance(documentList, DocumentReferenceList):
            return documentList
        if isinstance(documentList, list):
            return DocumentReferenceList.from_string_list(documentList)
        return DocumentReferenceList(references=[])

    @staticmethod
    def _unwrapNestedData(resultData: Any) -> Any:
        """Unwrap a nested ``documentData.data`` payload if present.

        Some results wrap the actual JSON document as a *string* under
        ``documentData.data``; if that inner string parses as JSON it replaces
        the outer structure, otherwise the outer structure is returned as-is.
        """
        if isinstance(resultData, dict) and isinstance(resultData.get("documentData"), dict):
            innerData = resultData["documentData"].get("data")
            if innerData and isinstance(innerData, str):
                try:
                    resultData = json.loads(innerData)
                    logger.debug("Parsed nested documentData.data structure")
                except json.JSONDecodeError:
                    logger.debug("documentData.data is not valid JSON, using as-is")
        return resultData

    async def _matchSiteByName(self, siteName: str) -> Optional[Dict[str, Any]]:
        """Discover SharePoint sites and return the first match for siteName.

        Returns:
            The first matching site dict, or None if no site matches.
        """
        allSites = await self.method.siteDiscovery.discoverSharePointSites()
        matchingSites = self.method.siteDiscovery.filterSitesByHint(allSites, siteName)
        return matchingSites[0] if matchingSites else None

    async def _convertListResultsToFoundDocuments(self, resultData: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Convert a listDocuments result into the foundDocuments format.

        Files found under ``listResults[*].siteResults[*].items`` are mapped to
        foundDocuments entries; their siteId is resolved afterwards through
        site discovery based on the first seen siteName.
        """
        logger.info("documentList contains listResults from listDocuments, converting to foundDocuments format")
        foundDocuments: List[Dict[str, Any]] = []
        siteNameFromList: Optional[str] = None

        for listResult in resultData.get("listResults", []):
            for siteResult in listResult.get("siteResults", []):
                items = siteResult.get("items", [])
                # Keep the site name of the FIRST non-empty result only.
                # (Bugfix: the original guard tested siteIdFromList -- which is
                # never set inside this loop -- so the name was overwritten by
                # every subsequent siteResult.)
                if items and not siteNameFromList:
                    siteNameFromList = items[0].get("siteName")
                for item in items:
                    if item.get("type") != "file":
                        continue  # only files are converted
                    foundDocuments.append({
                        "id": item.get("id"),
                        "name": item.get("name"),
                        "type": "file",
                        "siteName": item.get("siteName"),
                        "siteId": None,  # resolved below via site discovery
                        "webUrl": item.get("webUrl"),
                        "fullPath": item.get("webUrl", ""),
                        "parentPath": item.get("parentPath", ""),
                    })

        if foundDocuments and siteNameFromList:
            logger.info("Discovering sites to find siteId for '%s'", siteNameFromList)
            site = await self._matchSiteByName(siteNameFromList)
            if site:
                siteId = site.get("id")
                # Propagate the discovered siteId to every converted document.
                for doc in foundDocuments:
                    doc["siteId"] = siteId
                logger.info("Found siteId '%s' for site '%s'", siteId, siteNameFromList)

        logger.info("Converted %d files from listResults format", len(foundDocuments))
        return foundDocuments

    async def parseDocumentListForFoundDocuments(self, documentList: Any) -> tuple[Optional[List[Dict[str, Any]]], Optional[List[Dict[str, Any]]], Optional[str]]:
        """
        Parse documentList to extract foundDocuments and site information.

        Parameters:
            documentList: Document list (can be list, DocumentReferenceList, or string)

        Returns:
            tuple: (foundDocuments, sites, errorMessage)
            - foundDocuments: List of found documents from findDocumentPath result
            - sites: List of site dictionaries with id, displayName, webUrl
            - errorMessage: Error message if parsing failed, None otherwise
        """
        try:
            docRefList = self._resolveDocRefList(documentList)

            chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(docRefList)
            if not chatDocuments:
                return None, None, "No documents found for the provided document list"

            fileData = self.services.chat.getFileData(chatDocuments[0].fileId)
            if not fileData:
                # No fileData, but not an error (might be a regular file).
                return None, None, None

            try:
                resultData = self._unwrapNestedData(json.loads(fileData))

                foundDocuments = resultData.get("foundDocuments", [])

                # If no foundDocuments, check for a listDocuments result
                # (has listResults) and convert it.
                if not foundDocuments and "listResults" in resultData:
                    foundDocuments = await self._convertListResultsToFoundDocuments(resultData)

                if not foundDocuments:
                    return None, None, None  # no foundDocuments, but not an error

                # Extract site information from the first found document.
                firstDoc = foundDocuments[0]
                siteName = firstDoc.get("siteName")
                siteId = firstDoc.get("siteId")

                if siteName and not siteId:
                    # siteId can be missing after a listDocuments conversion;
                    # resolve it through site discovery. (The original code
                    # retried the same discovery a second time before failing;
                    # one attempt is sufficient.)
                    logger.info("Site ID missing, discovering sites to find siteId for '%s'", siteName)
                    site = await self._matchSiteByName(siteName)
                    if not site:
                        return None, None, f"Site '{siteName}' not found. Cannot determine target site."
                    siteId = site.get("id")
                    logger.info("Found siteId '%s' for site '%s'", siteId, siteName)

                if not siteName:
                    return None, None, "Site information missing from documentList. Cannot determine target site."

                sites = [{
                    "id": siteId,
                    "displayName": siteName,
                    "webUrl": firstDoc.get("webUrl", ""),
                }]
                logger.info("Using specific site from documentList: %s (ID: %s)", siteName, siteId)

                return foundDocuments, sites, None

            except json.JSONDecodeError as e:
                return None, None, f"Invalid JSON in documentList: {str(e)}"
            except Exception as e:
                return None, None, f"Error processing documentList: {str(e)}"

        except Exception as e:
            logger.error("Error parsing documentList: %s", e)
            return None, None, f"Error parsing documentList: {str(e)}"

    async def parseDocumentListForFolder(self, documentList: Any) -> tuple[Optional[str], Optional[List[Dict[str, Any]]], Optional[List], Optional[str]]:
        """
        Parse documentList to extract folder path, site information, and files to upload.

        Parameters:
            documentList: Document list (can be list, DocumentReferenceList, or string)

        Returns:
            tuple: (folderPath, sites, filesToUpload, errorMessage)
            - folderPath: Folder path from findDocumentPath result (or None)
            - sites: List of site dictionaries with id, displayName, webUrl
            - filesToUpload: List of ChatDocument objects to upload (or None)
            - errorMessage: Error message if parsing failed, None otherwise
        """
        try:
            docRefList = self._resolveDocRefList(documentList)

            chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(docRefList)
            if not chatDocuments:
                return None, None, None, "No documents found for the provided document list"

            fileData = self.services.chat.getFileData(chatDocuments[0].fileId)
            if not fileData:
                # No fileData - treat as regular files to upload.
                return None, None, chatDocuments, None

            if isinstance(fileData, bytes):
                try:
                    fileData = fileData.decode('utf-8')
                except UnicodeDecodeError:
                    # Binary file (xlsx, pdf, ...) cannot be a findDocumentPath
                    # result - treat as a regular file to upload.
                    logger.debug("File is binary (not UTF-8 decodable), treating as regular file to upload")
                    return None, None, chatDocuments, None

            try:
                resultData = self._unwrapNestedData(json.loads(fileData))
            except json.JSONDecodeError:
                # Not a findDocumentPath result - regular files to upload.
                return None, None, chatDocuments, None

            folderPath: Optional[str] = None
            sites: Optional[List[Dict[str, Any]]] = None

            foundDocuments = resultData.get("foundDocuments", [])
            if foundDocuments:
                firstDoc = foundDocuments[0]

                # Folder path is the parent path of the first found document.
                parentPath = firstDoc.get("parentPath", "")
                if parentPath:
                    folderPath = parentPath

                siteName = firstDoc.get("siteName")
                siteId = firstDoc.get("siteId")

                if siteName and siteId:
                    sites = [{
                        "id": siteId,
                        "displayName": siteName,
                        "webUrl": firstDoc.get("webUrl", ""),
                    }]
                elif siteName:
                    # Discover sites to find the missing siteId; a failed
                    # discovery simply leaves sites as None (as before).
                    site = await self._matchSiteByName(siteName)
                    if site:
                        sites = [{
                            "id": site.get("id"),
                            "displayName": siteName,
                            "webUrl": site.get("webUrl", ""),
                        }]

            # For uploadDocument: filesToUpload are the chatDocuments
            # themselves (they contain the files to upload).
            return folderPath, sites, chatDocuments, None

        except Exception as e:
            logger.error("Error parsing documentList for folder: %s", e)
            return None, None, None, f"Error parsing documentList for folder: {str(e)}"