# gateway/modules/workflows/methods/methodSharepoint/actions/getExpensesFromPdf.py
# 2026-02-21 00:56:53 +01:00
#
# 836 lines
# 35 KiB
# Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Action to extract expenses from PDF documents in SharePoint and save to TrusteePosition.
Process:
1. Read PDF files from SharePoint folder (max 50 files per execution)
2. FOR EACH PDF document:
a. AI call to extract expense data in CSV format
b. If 0 records: move to "error" folder
c. Validate/calculate VAT, complete valuta/transactionDateTime
d. Save all records to TrusteePosition
e. Move document to "processed" subfolder with timestamp prefix
"""
import logging
import time
import json
import csv
import io
import asyncio
from datetime import datetime, UTC
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
logger = logging.getLogger(__name__)
# Configuration
MAX_FILES_PER_EXECUTION = 50
MAX_CONCURRENT_AI_TASKS = 10 # Limit concurrent AI calls to avoid rate limits
ALLOWED_TAGS = ["customer", "meeting", "license", "subscription", "fuel", "food", "material"]
RATE_LIMIT_WAIT_SECONDS = 60
async def getExpensesFromPdf(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    Extract expenses from PDF documents in SharePoint and save to TrusteePosition.

    Pipeline (per run):
      1. Validate parameters and set up the Microsoft/SharePoint connection.
      2. Resolve the SharePoint site and list PDF files in the target folder
         (capped at MAX_FILES_PER_EXECUTION per run).
      3. Process the PDFs concurrently (bounded by MAX_CONCURRENT_AI_TASKS):
         AI extraction -> validation/enrichment -> save to TrusteePosition ->
         move the source file to a "processed" (success) or "error" subfolder.
      4. Return a JSON summary document with per-file results.

    Parameters (in `parameters`):
        connectionReference (str): Microsoft connection label.
        sharepointFolder (str): SharePoint folder path
            (e.g., /sites/MySite/Documents/Expenses).
        featureInstanceId (str): Feature instance ID for TrusteePosition.
        prompt (str): AI prompt for content extraction.
        parentOperationId (str, optional): Parent progress-log operation id.

    Returns:
        ActionResult with success status and a JSON processing summary.
    """
    operationId: Optional[str] = None
    processedDocuments: List[Dict[str, Any]] = []
    skippedDocuments: List[Dict[str, Any]] = []
    errorDocuments: List[Dict[str, Any]] = []
    totalPositions = 0
    try:
        # Initialize progress tracking. operationId embeds the workflow id and a
        # timestamp so concurrent runs do not collide in the progress log.
        workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
        operationId = f"sharepoint_expenses_{workflowId}_{int(time.time())}"
        parentOperationId = parameters.get('parentOperationId')
        self.services.chat.progressLogStart(
            operationId,
            "Extract Expenses from PDF",
            "SharePoint PDF Processing",
            "Initializing expense extraction",
            parentOperationId=parentOperationId
        )
        # Extract and validate parameters; every early return closes the
        # progress log so the operation is not left dangling in the UI.
        connectionReference = parameters.get("connectionReference")
        sharepointFolder = parameters.get("sharepointFolder")
        featureInstanceId = parameters.get("featureInstanceId")
        prompt = parameters.get("prompt")
        if not connectionReference:
            self.services.chat.progressLogFinish(operationId, False)
            return ActionResult.isFailure(error="connectionReference is required")
        if not sharepointFolder:
            self.services.chat.progressLogFinish(operationId, False)
            return ActionResult.isFailure(error="sharepointFolder is required")
        if not featureInstanceId:
            self.services.chat.progressLogFinish(operationId, False)
            return ActionResult.isFailure(error="featureInstanceId is required")
        if not prompt:
            self.services.chat.progressLogFinish(operationId, False)
            return ActionResult.isFailure(error="prompt is required")
        # Get Microsoft connection
        self.services.chat.progressLogUpdate(operationId, 0.05, "Getting Microsoft connection")
        connection = self.connection.getMicrosoftConnection(connectionReference)
        if not connection:
            self.services.chat.progressLogFinish(operationId, False)
            return ActionResult.isFailure(error="No valid Microsoft connection found")
        # Set access token for SharePoint service
        if not self.services.sharepoint.setAccessTokenFromConnection(connection):
            self.services.chat.progressLogFinish(operationId, False)
            return ActionResult.isFailure(error="Failed to set SharePoint access token")
        # Find site and folder info
        self.services.chat.progressLogUpdate(operationId, 0.1, "Resolving SharePoint site")
        siteInfo, folderPath = await _resolveSiteAndFolder(self, sharepointFolder)
        if not siteInfo:
            self.services.chat.progressLogFinish(operationId, False)
            return ActionResult.isFailure(error=f"Could not resolve SharePoint site from path: {sharepointFolder}")
        siteId = siteInfo.get("id")
        # List PDF files in folder
        self.services.chat.progressLogUpdate(operationId, 0.15, "Finding PDF files in folder")
        pdfFiles = await _listPdfFilesInFolder(self, siteId, folderPath)
        if not pdfFiles:
            # An empty folder is a successful no-op, not an error.
            self.services.chat.progressLogFinish(operationId, True)
            return ActionResult.isSuccess(
                documents=[ActionDocument(
                    documentName="expense_extraction_result.json",
                    documentData=json.dumps({
                        "status": "no_documents",
                        "message": "No PDF files found in the specified folder",
                        "folder": sharepointFolder
                    }, indent=2),
                    mimeType="application/json",
                    validationMetadata={"actionType": "sharepoint.getExpensesFromPdf"}
                )]
            )
        # Limit files per run; the remainder is reported so the caller can
        # schedule another execution.
        originalFileCount = len(pdfFiles)
        if originalFileCount > MAX_FILES_PER_EXECUTION:
            logger.warning(f"Found {originalFileCount} PDFs, limiting to {MAX_FILES_PER_EXECUTION}")
            pdfFiles = pdfFiles[:MAX_FILES_PER_EXECUTION]
        totalFiles = len(pdfFiles)
        # 70% of the progress bar (0.2 -> 0.9) is divided across the files.
        progressPerFile = 0.7 / totalFiles
        # Get Trustee interface
        from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface
        trusteeInterface = getTrusteeInterface(
            self.services.user,
            mandateId=self.services.mandateId,
            featureInstanceId=featureInstanceId
        )
        # Process PDFs in parallel with semaphore to limit concurrent AI calls
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_AI_TASKS)
        # Single-element list so the nested coroutine can mutate the counter
        # without a `nonlocal` declaration.
        completedCount = [0]
        async def processSinglePdf(idx: int, pdfFile: Dict[str, Any]) -> Dict[str, Any]:
            """
            Process one PDF end-to-end. Returns a result dict whose "type" key
            is "processed", "skipped", or "error".
            """
            fileName = pdfFile.get("name", f"file_{idx}")
            fileId = pdfFile.get("id")
            async with semaphore:
                # Progress counter is incremented when a worker acquires a
                # slot (i.e. at start of processing, not completion).
                completedCount[0] += 1
                currentProgress = 0.2 + (completedCount[0] * progressPerFile)
                self.services.chat.progressLogUpdate(
                    operationId,
                    min(currentProgress, 0.9),
                    f"Processing {completedCount[0]}/{totalFiles}: {fileName}"
                )
                try:
                    # Download PDF content
                    fileContent = await self.services.sharepoint.downloadFile(siteId, fileId)
                    if not fileContent:
                        await _moveToErrorFolder(self, siteId, folderPath, fileName)
                        return {"type": "error", "file": fileName, "error": "Failed to download", "movedTo": "error/"}
                    # AI call to extract expense data (this is the bottleneck - parallelized)
                    aiResult = await _extractExpensesWithAi(self.services, fileContent, fileName, prompt, featureInstanceId)
                    if not aiResult.get("success"):
                        await _moveToErrorFolder(self, siteId, folderPath, fileName)
                        return {"type": "error", "file": fileName, "error": aiResult.get("error", "AI extraction failed"), "movedTo": "error/"}
                    records = aiResult.get("records", [])
                    # Rebind fileId from the SharePoint drive-item id to the
                    # Files-table id returned by the AI step; the drive id is
                    # no longer needed (download already happened) and the DB
                    # id is what the TrusteeDocument reference requires.
                    fileId = aiResult.get("fileId")
                    # Zero extracted records counts as "skipped" but the file
                    # is still routed to error/ for manual inspection.
                    if not records:
                        logger.warning(f"Document {fileName}: No records extracted, moving to error folder")
                        await _moveToErrorFolder(self, siteId, folderPath, fileName)
                        return {"type": "skipped", "file": fileName, "reason": "No expense records extracted", "movedTo": "error/"}
                    # Validate and enrich records
                    validatedRecords = _validateAndEnrichRecords(records, fileName)
                    # Save to TrusteePosition and create Document + Position-Document links
                    savedCount = _saveToTrusteePosition(
                        trusteeInterface,
                        validatedRecords,
                        featureInstanceId,
                        self.services.mandateId,
                        fileId=fileId,
                        fileName=fileName,
                        sourceLocation=sharepointFolder
                    )
                    # Move document to "processed" subfolder; the timestamp
                    # prefix keeps repeated imports of same-named files unique.
                    timestamp = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
                    newFileName = f"{timestamp}_{fileName}"
                    moveSuccess = await _moveToProcessedFolder(self, siteId, folderPath, fileName, newFileName)
                    return {
                        "type": "processed",
                        "file": fileName,
                        "newLocation": f"processed/{newFileName}" if moveSuccess else "move_failed",
                        "recordsExtracted": len(validatedRecords),
                        "recordsSaved": savedCount
                    }
                except Exception as e:
                    errorMsg = str(e)
                    logger.error(f"Error processing {fileName}: {errorMsg}")
                    # Back off when the error looks like throttling.
                    # NOTE(review): after the wait the file is still moved to
                    # error/ rather than retried - confirm this is intended.
                    if "429" in errorMsg or "throttl" in errorMsg.lower():
                        logger.warning(f"Rate limit hit, waiting {RATE_LIMIT_WAIT_SECONDS} seconds")
                        await asyncio.sleep(RATE_LIMIT_WAIT_SECONDS)
                    await _moveToErrorFolder(self, siteId, folderPath, fileName)
                    return {"type": "error", "file": fileName, "error": errorMsg, "movedTo": "error/"}
        # Execute all PDF processing tasks in parallel (limited by semaphore).
        # return_exceptions=True keeps one failure from cancelling the rest.
        logger.info(f"Starting parallel processing of {totalFiles} PDFs (max {MAX_CONCURRENT_AI_TASKS} concurrent)")
        tasks = [processSinglePdf(idx, pdfFile) for idx, pdfFile in enumerate(pdfFiles)]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Collect results; raw exceptions surfaced by gather carry no file
        # name, hence "unknown".
        for result in results:
            if isinstance(result, Exception):
                errorDocuments.append({"file": "unknown", "error": str(result), "movedTo": "error/"})
            elif result.get("type") == "processed":
                processedDocuments.append(result)
                totalPositions += result.get("recordsSaved", 0)
            elif result.get("type") == "skipped":
                skippedDocuments.append(result)
            elif result.get("type") == "error":
                errorDocuments.append(result)
        # Create result summary
        self.services.chat.progressLogUpdate(operationId, 0.95, "Creating result summary")
        remainingFiles = max(0, originalFileCount - MAX_FILES_PER_EXECUTION)
        resultSummary = {
            "status": "completed",
            "folder": sharepointFolder,
            "featureInstanceId": featureInstanceId,
            "summary": {
                "totalFilesFound": originalFileCount,
                "filesProcessedThisRun": totalFiles,
                "remainingFiles": remainingFiles,
                "successfulDocuments": len(processedDocuments),
                "skippedDocuments": len(skippedDocuments),
                "errorDocuments": len(errorDocuments),
                "totalPositionsSaved": totalPositions
            },
            "processedDocuments": processedDocuments,
            "skippedDocuments": skippedDocuments,
            "errorDocuments": errorDocuments
        }
        if remainingFiles > 0:
            resultSummary["note"] = f"{remainingFiles} files remaining for next execution"
        self.services.chat.progressLogFinish(operationId, True)
        return ActionResult.isSuccess(
            documents=[ActionDocument(
                documentName="expense_extraction_result.json",
                documentData=json.dumps(resultSummary, indent=2),
                mimeType="application/json",
                validationMetadata={
                    "actionType": "sharepoint.getExpensesFromPdf",
                    "sharepointFolder": sharepointFolder,
                    "featureInstanceId": featureInstanceId,
                    "totalPositions": totalPositions
                }
            )]
        )
    except Exception as e:
        logger.error(f"Error in getExpensesFromPdf: {str(e)}")
        # operationId is None if the failure happened before progress tracking
        # was initialized.
        if operationId:
            self.services.chat.progressLogFinish(operationId, False)
        return ActionResult.isFailure(error=str(e))
async def _resolveSiteAndFolder(self, sharepointFolder: str) -> tuple:
    """
    Resolve the SharePoint site and the folder path within it.

    Args:
        self: Action context providing `siteDiscovery`.
        sharepointFolder: Path like "/sites/SiteName/Folder/Sub".

    Returns:
        (siteInfo, folderPath): the first site returned by site discovery and
        the path inside the site ("" when the input does not follow the
        "/sites/<name>/<folder>" convention), or (None, None) when no site
        could be resolved or an error occurred.
    """
    try:
        # Derive the in-site folder path only for the "/sites/<name>/..."
        # form; any other shape resolves against the site root ("").
        folderPath = ""
        if sharepointFolder.startswith('/sites/'):
            parts = sharepointFolder[7:].split('/', 1)  # strip '/sites/' prefix
            if len(parts) > 1:
                folderPath = parts[1]
        # Single discovery call. The original issued the identical query a
        # second time as a "fallback" when the first returned no sites, which
        # could never produce a different result; it also computed an unused
        # siteName and tested the always-true `len(parts) >= 1`.
        sites, _ = await self.siteDiscovery.resolveSitesFromPathQuery(sharepointFolder)
        if sites:
            return sites[0], folderPath
        return None, None
    except Exception as e:
        logger.error(f"Error resolving site and folder: {str(e)}")
        return None, None
async def _listPdfFilesInFolder(self, siteId: str, folderPath: str) -> List[Dict[str, Any]]:
    """
    Return metadata for every PDF file directly inside the given drive folder.

    Args:
        self: Action context providing `apiClient`.
        siteId: Graph site id.
        folderPath: Folder path relative to the drive root; empty or "/" lists
            the root itself.

    Returns:
        List of dicts with id/name/size/webUrl/lastModifiedDateTime for each
        item whose name ends in ".pdf" (case-insensitive) and that is a file;
        [] on any Graph error or exception.
    """
    try:
        import urllib.parse
        # Root listing and folder-relative listing use different endpoint shapes.
        if not folderPath or folderPath == "/":
            endpoint = f"sites/{siteId}/drive/root/children"
        else:
            relativePath = urllib.parse.quote(folderPath.strip('/'), safe='/')
            endpoint = f"sites/{siteId}/drive/root:/{relativePath}:/children"
        response = await self.apiClient.makeGraphApiCall(endpoint)
        if "error" in response:
            logger.error(f"Error listing folder: {response['error']}")
            return []
        # Keep only real files (folders lack the "file" facet) named *.pdf.
        pdfFiles = [
            {
                "id": entry.get("id"),
                "name": entry.get("name", ""),
                "size": entry.get("size", 0),
                "webUrl": entry.get("webUrl"),
                "lastModifiedDateTime": entry.get("lastModifiedDateTime")
            }
            for entry in response.get("value", [])
            if entry.get("name", "").lower().endswith('.pdf') and "file" in entry
        ]
        logger.info(f"Found {len(pdfFiles)} PDF files in folder")
        return pdfFiles
    except Exception as e:
        logger.error(f"Error listing PDF files: {str(e)}")
        return []
async def _extractExpensesWithAi(services, fileContent: bytes, fileName: str, prompt: str, featureInstanceId: str) -> Dict[str, Any]:
    """
    Call the AI service to extract expense data from PDF content.

    Uses the full AI service pipeline, which handles document extraction
    (text + images), intent analysis, chunking for large documents, and
    vision processing for images. Because that pipeline locates documents via
    workflow messages, the PDF is first persisted to the Files table and
    attached to a synthetic workflow message.

    Args:
        services: Service container (ai, chat, workflow, user, mandateId).
        fileContent: Raw PDF bytes.
        fileName: Original file name (for logging and document metadata).
        prompt: Extraction prompt; the AI is asked to return CSV.
        featureInstanceId: Feature instance the stored file belongs to.

    Returns:
        {"success": True, "records": [...], "fileId": <Files-table id>} on
        success; {"success": False, "error": <message>} on any failure.
    """
    try:
        import uuid
        # Ensure AI is initialized
        await services.ai.ensureAiObjectsInitialized()
        # Step 1: Store file temporarily in database so AI service can access it
        from modules.interfaces.interfaceDbManagement import getInterface as getDbInterface
        from modules.datamodels.datamodelChat import ChatDocument
        from modules.datamodels.datamodelDocref import DocumentReferenceList
        dbInterface = getDbInterface(services.user, mandateId=services.mandateId, featureInstanceId=featureInstanceId)
        # Create file record
        fileItem = dbInterface.createFile(
            name=fileName,
            mimeType="application/pdf",
            content=fileContent
        )
        # Store file data.
        # NOTE(review): fileContent is passed both to createFile above and to
        # createFileData here - presumably one creates metadata and the other
        # writes the blob; confirm this is not a duplicate write.
        dbInterface.createFileData(fileItem.id, fileContent)
        logger.info(f"Stored PDF {fileName} ({len(fileContent)} bytes) with fileId: {fileItem.id}")
        # Step 2: Create ChatDocument referencing the file
        documentId = str(uuid.uuid4())
        chatDocument = ChatDocument(
            id=documentId,
            mandateId=services.mandateId or "",
            featureInstanceId=featureInstanceId or "",
            messageId="",  # Will be set when attached to message
            fileId=fileItem.id,
            fileName=fileName,
            fileSize=len(fileContent),
            mimeType="application/pdf"
        )
        # Step 3: Create a proper message with the document attached to the workflow.
        # This ensures getChatDocumentsFromDocumentList can find the document via workflow.messages.
        messageData = {
            "id": f"msg_expense_import_{str(uuid.uuid4())[:8]}",
            "documentsLabel": f"expense_pdf_{fileName}",
            "role": "user",
            "status": "step",
            "message": f"PDF document for expense extraction: {fileName}"
        }
        # Use storeMessageWithDocuments to properly create message + document and sync with workflow
        createdMessage = services.chat.storeMessageWithDocuments(
            services.workflow,
            messageData,
            [chatDocument.model_dump()]
        )
        # The store call may assign its own document id; use the actual one so
        # the reference below resolves.
        if createdMessage and createdMessage.documents:
            documentId = createdMessage.documents[0].id
            logger.info(f"Created message {createdMessage.id} with ChatDocument {documentId} for AI processing")
        # Step 4: Create DocumentReferenceList for AI service
        from modules.datamodels.datamodelDocref import DocumentItemReference
        documentList = DocumentReferenceList(
            references=[
                DocumentItemReference(
                    documentId=documentId,
                    fileName=fileName
                )
            ]
        )
        # Step 5: Call AI with documentList - let AI service handle everything
        # (extraction, intent analysis, chunking, image processing).
        # Use DATA_GENERATE (same path as ai.process) which handles chunking correctly.
        options = AiCallOptions(
            resultFormat="csv",
            operationType=OperationTypeEnum.DATA_GENERATE
        )
        aiResponse = await services.ai.callAiContent(
            prompt=prompt,
            options=options,
            documentList=documentList,
            contentParts=None,  # Let AI service extract from documents
            outputFormat="csv",
            generationIntent="extract"  # Signal this is extraction, not document generation
        )
        if not aiResponse:
            return {"success": False, "error": "AI returned empty response"}
        # Get CSV from rendered documents (not from content - that's the internal structure)
        if not aiResponse.documents or len(aiResponse.documents) == 0:
            return {"success": False, "error": "AI returned no documents"}
        # Get the CSV content from the first document
        csvDocument = aiResponse.documents[0]
        csvContent = csvDocument.documentData
        # documentData is bytes, decode to string
        if isinstance(csvContent, bytes):
            csvContent = csvContent.decode('utf-8')
        logger.info(f"Retrieved CSV content ({len(csvContent)} chars) from rendered document: {csvDocument.documentName}")
        records = _parseCsvToRecords(csvContent)
        # Return fileId so it can be used to create TrusteeDocument reference
        return {"success": True, "records": records, "fileId": fileItem.id}
    except Exception as e:
        logger.error(f"AI extraction error for {fileName}: {str(e)}")
        return {"success": False, "error": str(e)}
def _parseCsvToRecords(csvContent: str) -> List[Dict[str, Any]]:
    """
    Parse AI-generated CSV content into a list of expense record dicts.

    Tolerates markdown code fences around the CSV and ragged rows: extra
    unnamed columns (exposed by DictReader under the key ``None``) are now
    dropped instead of raising AttributeError on ``None.strip()``, which
    previously aborted the parse and silently discarded all remaining rows.

    Args:
        csvContent: Raw CSV text, optionally wrapped in ``` fences.

    Returns:
        One dict per data row; keys and string values are whitespace-stripped.
        On a reader-level failure the rows parsed so far are returned.
    """
    records = []
    try:
        content = csvContent.strip()
        # Clean up CSV content - remove markdown code blocks if present
        if content.startswith("```"):
            lines = content.split('\n')
            # Remove first and last line if they're code block markers
            if lines[0].startswith("```"):
                lines = lines[1:]
            if lines and lines[-1].strip() == "```":
                lines = lines[:-1]
            content = '\n'.join(lines)
        reader = csv.DictReader(io.StringIO(content))
        for row in reader:
            # Clean up keys (remove whitespace); skip the None key DictReader
            # uses for values beyond the header width.
            cleanedRow = {
                k.strip(): v.strip() if isinstance(v, str) else v
                for k, v in row.items()
                if k is not None
            }
            records.append(cleanedRow)
        logger.info(f"Parsed {len(records)} records from CSV content")
    except Exception as e:
        logger.error(f"Error parsing CSV: {str(e)}")
    return records
def _validateAndEnrichRecords(records: List[Dict[str, Any]], sourceFileName: str) -> List[Dict[str, Any]]:
"""
Validate and enrich expense records:
1. Calculate/correct VAT amount
2. Complete valuta/transactionDateTime if one is missing
3. Validate tags
"""
enrichedRecords = []
for record in records:
enriched = record.copy()
# VAT calculation/validation
vatPercentage = _parseFloat(record.get("vatPercentage", 0))
vatAmount = _parseFloat(record.get("vatAmount", 0))
bookingAmount = _parseFloat(record.get("bookingAmount", 0))
if vatPercentage > 0 and bookingAmount > 0:
# Calculate expected VAT amount (VAT is included in bookingAmount)
expectedVat = bookingAmount * vatPercentage / (100 + vatPercentage)
# If vatAmount is missing or significantly different, recalculate
if vatAmount == 0 or abs(vatAmount - expectedVat) > 0.01:
enriched["vatAmount"] = round(expectedVat, 2)
logger.debug(f"VAT amount corrected: {vatAmount} -> {enriched['vatAmount']}")
# Valuta / transactionDateTime completion
valuta = record.get("valuta")
transactionDateTime = record.get("transactionDateTime")
if valuta and not transactionDateTime:
try:
dt = datetime.strptime(str(valuta).strip(), "%Y-%m-%d")
enriched["transactionDateTime"] = dt.replace(hour=12).timestamp()
except:
pass
elif transactionDateTime and not valuta:
try:
ts = float(transactionDateTime)
dt = datetime.fromtimestamp(ts, UTC)
enriched["valuta"] = dt.strftime("%Y-%m-%d")
except:
pass
# Validate tags
tags = record.get("tags", "")
if tags:
tagList = [t.strip().lower() for t in str(tags).split(",")]
validTags = [t for t in tagList if t in ALLOWED_TAGS]
enriched["tags"] = ",".join(validTags)
# Store source file info in description
existingDesc = record.get("desc", "")
if sourceFileName and sourceFileName not in str(existingDesc):
enriched["desc"] = f"[Source: {sourceFileName}]\n{existingDesc}"
enrichedRecords.append(enriched)
return enrichedRecords
def _parseFloat(value) -> float:
"""Safely parse float value."""
try:
if value is None or value == "":
return 0.0
return float(value)
except (ValueError, TypeError):
return 0.0
def _saveToTrusteePosition(
    trusteeInterface,
    records: List[Dict[str, Any]],
    featureInstanceId: str,
    mandateId: str,
    fileId: Optional[str] = None,
    fileName: Optional[str] = None,
    sourceLocation: Optional[str] = None
) -> int:
    """
    Save validated records to the TrusteePosition table.

    Also creates a TrusteeDocument (referencing the source file), links the
    positions to it, and schedules a best-effort push to the accounting
    system when one is configured.

    Bug fix: the original Step 3 used `await` inside this synchronous
    function - a SyntaxError that prevented the module from importing. The
    accounting sync now runs through _scheduleAccountingSync(), which
    schedules it as a background task on the running event loop (or executes
    it with asyncio.run() outside a loop), so this function stays synchronous
    for its existing callers.

    Args:
        trusteeInterface: Trustee interface instance.
        records: List of expense records to save.
        featureInstanceId: Feature instance ID.
        mandateId: Mandate ID.
        fileId: Optional file ID from central Files table (source PDF).
        fileName: Optional file name.
        sourceLocation: Optional source location (e.g., SharePoint path).

    Returns:
        Number of positions saved.
    """
    savedCount = 0
    savedPositionIds = []
    # Step 1: Create TrusteeDocument referencing the source file (best-effort;
    # positions are still saved with documentId=None when this fails)
    documentId = None
    if fileId and fileName:
        try:
            document = trusteeInterface.createDocument({
                "fileId": fileId,
                "documentName": fileName,
                "documentMimeType": "application/pdf",
                "sourceType": "sharepoint",
                "sourceLocation": sourceLocation
            })
            if document:
                documentId = document.id
                logger.info(f"Created TrusteeDocument {documentId} referencing file {fileId}")
            else:
                logger.warning(f"Failed to create TrusteeDocument for file {fileId}")
        except Exception as e:
            logger.error(f"Error creating TrusteeDocument: {str(e)}")
    # Step 2: Save positions with direct documentId reference and accounting
    # fields; a failure on one record does not abort the rest
    for record in records:
        try:
            position = {
                "documentId": documentId,
                "valuta": record.get("valuta"),
                "transactionDateTime": record.get("transactionDateTime"),
                "company": record.get("company", ""),
                "desc": record.get("desc", ""),
                "tags": record.get("tags", ""),
                "bookingCurrency": record.get("bookingCurrency", "CHF"),
                "bookingAmount": _parseFloat(record.get("bookingAmount", 0)),
                "originalCurrency": record.get("originalCurrency") or record.get("bookingCurrency", "CHF"),
                "originalAmount": _parseFloat(record.get("originalAmount", 0)) or _parseFloat(record.get("bookingAmount", 0)),
                "vatPercentage": _parseFloat(record.get("vatPercentage", 0)),
                "vatAmount": _parseFloat(record.get("vatAmount", 0)),
                "debitAccountNumber": record.get("debitAccountNumber") or None,
                "creditAccountNumber": record.get("creditAccountNumber") or None,
                "taxCode": record.get("taxCode") or None,
                "costCenter": record.get("costCenter") or None,
                "bookingReference": record.get("bookingReference") or None,
                "featureInstanceId": featureInstanceId,
                "mandateId": mandateId
            }
            result = trusteeInterface.createPosition(position)
            if result:
                savedCount += 1
                savedPositionIds.append(result.id)
                logger.debug(f"Saved position: {position.get('company')} - {position.get('bookingAmount')}")
        except Exception as e:
            logger.error(f"Failed to save position: {str(e)}")
    # Step 3: Auto-sync to accounting system if configured (non-blocking)
    if savedCount > 0 and savedPositionIds:
        _scheduleAccountingSync(trusteeInterface, featureInstanceId, savedPositionIds)
    return savedCount


async def _syncPositionsToAccounting(trusteeInterface, featureInstanceId: str, positionIds: List[str]) -> None:
    """Best-effort push of freshly saved positions to the accounting system."""
    try:
        from modules.features.trustee.accounting.accountingBridge import AccountingBridge
        bridge = AccountingBridge(trusteeInterface)
        configRecord = await bridge.getActiveConfig(featureInstanceId)
        if configRecord:
            syncResults = await bridge.pushBatchToAccounting(featureInstanceId, positionIds)
            syncedCount = sum(1 for r in syncResults if r.success)
            logger.info(f"Auto-synced {syncedCount}/{len(positionIds)} positions to accounting system")
    except Exception as e:
        # Non-critical by design: an unreachable accounting system must not
        # fail the expense import itself.
        logger.warning(f"Accounting auto-sync skipped (non-critical): {e}")


def _scheduleAccountingSync(trusteeInterface, featureInstanceId: str, positionIds: List[str]) -> None:
    """
    Run the async accounting sync from synchronous code.

    When an event loop is already running (the normal case - the action
    handler is async), the sync is scheduled as a fire-and-forget task;
    otherwise it is executed to completion with asyncio.run().
    """
    coro = _syncPositionsToAccounting(trusteeInterface, featureInstanceId, positionIds)
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        asyncio.run(coro)
    else:
        loop.create_task(coro)
async def _ensureFolderExists(self, siteId: str, folderPath: str) -> bool:
    """
    Ensure the given folder path exists in the site's default drive.

    Probes the full path first; when missing, walks it segment by segment and
    creates each level that does not exist yet.

    Args:
        self: Action context providing `apiClient`.
        siteId: Graph site id.
        folderPath: Folder path relative to the drive root.

    Returns:
        True when the folder exists (or was created), False when any create
        call fails or an exception occurs.
    """
    try:
        import urllib.parse
        cleanPath = folderPath.strip('/')
        # Fast path: the full folder already exists.
        probe = await self.apiClient.makeGraphApiCall(
            f"sites/{siteId}/drive/root:/{urllib.parse.quote(cleanPath, safe='/')}"
        )
        if "error" not in probe:
            return True
        # Walk the path, creating whatever levels are missing.
        currentPath = ""
        for segment in cleanPath.split('/'):
            parentPath = currentPath or "root"
            currentPath = f"{currentPath}/{segment}" if currentPath else segment
            levelProbe = await self.apiClient.makeGraphApiCall(
                f"sites/{siteId}/drive/root:/{urllib.parse.quote(currentPath, safe='/')}"
            )
            if "error" not in levelProbe:
                continue  # this level already exists
            if parentPath == "root":
                createEndpoint = f"sites/{siteId}/drive/root/children"
            else:
                encodedParent = urllib.parse.quote(parentPath, safe='/')
                createEndpoint = f"sites/{siteId}/drive/root:/{encodedParent}:/children"
            payload = json.dumps({
                "name": segment,
                "folder": {},
                "@microsoft.graph.conflictBehavior": "fail"
            }).encode('utf-8')
            created = await self.apiClient.makeGraphApiCall(createEndpoint, method="POST", data=payload)
            if "error" in created:
                logger.warning(f"Failed to create folder {segment}: {created['error']}")
                return False
            logger.info(f"Created folder: {currentPath}")
        return True
    except Exception as e:
        logger.error(f"Failed to ensure folder exists: {str(e)}")
        return False
async def _moveToProcessedFolder(self, siteId: str, sourceFolderPath: str, sourceFileName: str, destFileName: str) -> bool:
    """
    Move a successfully handled PDF into the 'processed' subfolder.

    The "move" is a copy under the new (timestamped) name followed by a
    delete of the original, since the folder is created on demand first.

    Returns:
        True on success, False when any step raises.
    """
    try:
        baseFolder = sourceFolderPath.strip('/')
        targetFolder = f"{baseFolder}/processed" if baseFolder else "processed"
        # Destination must exist before copying into it
        await _ensureFolderExists(self, siteId, targetFolder)
        await self.services.sharepoint.copyFileAsync(
            siteId=siteId,
            sourceFolder=baseFolder or "/",
            sourceFile=sourceFileName,
            destFolder=targetFolder,
            destFile=destFileName
        )
        # Remove the original so the file is not picked up again next run
        await _deleteFile(self, siteId, sourceFolderPath, sourceFileName)
        logger.info(f"Moved {sourceFileName} to processed/{destFileName}")
        return True
    except Exception as e:
        logger.error(f"Failed to move file to processed: {str(e)}")
        return False
async def _moveToErrorFolder(self, siteId: str, sourceFolderPath: str, sourceFileName: str) -> bool:
    """
    Move a failed PDF into the 'error' subfolder, keeping its original name.

    Implemented as copy + delete (SharePoint folder is created on demand).

    Returns:
        True on success, False when any step raises.
    """
    try:
        baseFolder = sourceFolderPath.strip('/')
        targetFolder = f"{baseFolder}/error" if baseFolder else "error"
        # Destination must exist before copying into it
        await _ensureFolderExists(self, siteId, targetFolder)
        await self.services.sharepoint.copyFileAsync(
            siteId=siteId,
            sourceFolder=baseFolder or "/",
            sourceFile=sourceFileName,
            destFolder=targetFolder,
            destFile=sourceFileName  # Same filename
        )
        # Remove the original so the file is not picked up again next run
        await _deleteFile(self, siteId, sourceFolderPath, sourceFileName)
        logger.info(f"Moved {sourceFileName} to error/")
        return True
    except Exception as e:
        logger.error(f"Failed to move file to error folder: {str(e)}")
        return False
async def _deleteFile(self, siteId: str, folderPath: str, fileName: str) -> bool:
    """
    Delete a file from the site drive.

    Resolves the drive-item id from the path first, then issues the DELETE
    against the item endpoint.

    Returns:
        True when deleted; False when the file is missing, has no id, the
        delete call errors, or an exception occurs.
    """
    try:
        import urllib.parse
        parentPath = folderPath.strip('/')
        filePath = f"{parentPath}/{fileName}" if parentPath else fileName
        itemEndpoint = f"sites/{siteId}/drive/root:/{urllib.parse.quote(filePath, safe='/')}"
        # Resolve the item id (deletion below works on ids, not paths)
        fileInfo = await self.apiClient.makeGraphApiCall(itemEndpoint)
        if "error" in fileInfo:
            logger.warning(f"File not found for deletion: {filePath}")
            return False
        itemId = fileInfo.get("id")
        if not itemId:
            return False
        outcome = await self.apiClient.makeGraphApiCall(
            f"sites/{siteId}/drive/items/{itemId}", method="DELETE"
        )
        if "error" in outcome:
            logger.warning(f"Delete failed: {outcome['error']}")
            return False
        logger.debug(f"Deleted file: {filePath}")
        return True
    except Exception as e:
        logger.error(f"Failed to delete file: {str(e)}")
        return False