gateway/modules/workflows/methods/methodSharepoint/actions/getExpensesFromPdf.py
2026-01-26 12:39:00 +01:00

745 lines
30 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Action to extract expenses from PDF documents in SharePoint and save to TrusteePosition.
Process:
1. Read PDF files from SharePoint folder (max 50 files per execution)
2. FOR EACH PDF document:
a. AI call to extract expense data in CSV format
b. If 0 records: move to "error" folder
c. Validate/calculate VAT, complete valuta/transactionDateTime
d. Save all records to TrusteePosition
e. Move document to "processed" subfolder with timestamp prefix
"""
import logging
import time
import json
import csv
import io
import asyncio
from datetime import datetime, UTC
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
logger = logging.getLogger(__name__)
# Configuration
# Upper bound on the number of PDFs handled in one run; surplus files are
# reported as remaining and left for a later execution.
MAX_FILES_PER_EXECUTION = 50
# Whitelist of tag values; extracted tags not in this list are dropped when
# records are validated.
ALLOWED_TAGS = ["customer", "meeting", "license", "subscription", "fuel", "food", "material"]
# Pause (seconds) applied after an error that looks like rate limiting
# ("429" / "throttl" in the message) before processing continues.
RATE_LIMIT_WAIT_SECONDS = 60
async def getExpensesFromPdf(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    Extract expenses from PDF documents in SharePoint and save to TrusteePosition.

    Parameters:
    - connectionReference (str): Microsoft connection label
    - sharepointFolder (str): SharePoint folder path (e.g., /sites/MySite/Documents/Expenses)
    - featureInstanceId (str): Feature instance ID for TrusteePosition
    - prompt (str): AI prompt for content extraction
    - parentOperationId (str, optional): Parent operation for progress logging

    Returns:
    ActionResult with success status and processing summary. The summary
    document lists processed, skipped and error documents; each skipped/error
    entry carries "movedTo", which is "error/" when the file was actually
    moved and "move_failed" otherwise.
    """
    operationId = None
    processedDocuments = []
    skippedDocuments = []
    errorDocuments = []
    totalPositions = 0

    def _fail(message: str) -> ActionResult:
        """Close the progress log as failed and build the failure result."""
        self.services.chat.progressLogFinish(operationId, False)
        return ActionResult.isFailure(error=message)

    try:
        # Initialize progress tracking
        workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
        operationId = f"sharepoint_expenses_{workflowId}_{int(time.time())}"
        parentOperationId = parameters.get('parentOperationId')
        self.services.chat.progressLogStart(
            operationId,
            "Extract Expenses from PDF",
            "SharePoint PDF Processing",
            "Initializing expense extraction",
            parentOperationId=parentOperationId
        )

        # Extract and validate parameters (checked in this order, first miss wins)
        for requiredName in ("connectionReference", "sharepointFolder", "featureInstanceId", "prompt"):
            if not parameters.get(requiredName):
                return _fail(f"{requiredName} is required")
        connectionReference = parameters.get("connectionReference")
        sharepointFolder = parameters.get("sharepointFolder")
        featureInstanceId = parameters.get("featureInstanceId")
        prompt = parameters.get("prompt")

        # Get Microsoft connection
        self.services.chat.progressLogUpdate(operationId, 0.05, "Getting Microsoft connection")
        connection = self.connection.getMicrosoftConnection(connectionReference)
        if not connection:
            return _fail("No valid Microsoft connection found")

        # Find site and folder info
        self.services.chat.progressLogUpdate(operationId, 0.1, "Resolving SharePoint site")
        siteInfo, folderPath = await _resolveSiteAndFolder(self, sharepointFolder)
        if not siteInfo:
            return _fail(f"Could not resolve SharePoint site from path: {sharepointFolder}")
        siteId = siteInfo.get("id")

        async def _quarantine(fileName: str, bucket: list, reasonKey: str, reasonText: str) -> None:
            """Move a file to the error folder and record the real outcome of that move."""
            moved = await _moveToErrorFolder(self, siteId, folderPath, fileName)
            bucket.append({
                "file": fileName,
                reasonKey: reasonText,
                # Mirror the "move_failed" convention used for processed files
                # instead of claiming a move that did not happen.
                "movedTo": "error/" if moved else "move_failed"
            })

        # List PDF files in folder
        self.services.chat.progressLogUpdate(operationId, 0.15, "Finding PDF files in folder")
        pdfFiles = await _listPdfFilesInFolder(self, siteId, folderPath)
        if not pdfFiles:
            self.services.chat.progressLogFinish(operationId, True)
            return ActionResult.isSuccess(
                documents=[ActionDocument(
                    documentName="expense_extraction_result.json",
                    documentData=json.dumps({
                        "status": "no_documents",
                        "message": "No PDF files found in the specified folder",
                        "folder": sharepointFolder
                    }, indent=2),
                    mimeType="application/json",
                    validationMetadata={"actionType": "sharepoint.getExpensesFromPdf"}
                )]
            )

        # Limit files
        originalFileCount = len(pdfFiles)
        if originalFileCount > MAX_FILES_PER_EXECUTION:
            logger.warning(f"Found {originalFileCount} PDFs, limiting to {MAX_FILES_PER_EXECUTION}")
            pdfFiles = pdfFiles[:MAX_FILES_PER_EXECUTION]
        totalFiles = len(pdfFiles)
        # The per-file loop covers the 0.2 .. 0.9 span of the progress bar.
        progressPerFile = 0.7 / totalFiles

        # Get Trustee interface
        from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface
        trusteeInterface = getTrusteeInterface(
            self.services.user,
            mandateId=self.services.mandateId,
            featureInstanceId=featureInstanceId
        )

        # Process each PDF
        for idx, pdfFile in enumerate(pdfFiles):
            currentProgress = 0.2 + (idx * progressPerFile)
            fileName = pdfFile.get("name", f"file_{idx}")
            fileId = pdfFile.get("id")
            self.services.chat.progressLogUpdate(
                operationId,
                currentProgress,
                f"Processing {idx + 1}/{totalFiles}: {fileName}"
            )
            try:
                # Download PDF content
                fileContent = await self.services.sharepoint.downloadFile(siteId, fileId)
                if not fileContent:
                    await _quarantine(fileName, errorDocuments, "error", "Failed to download")
                    continue

                # AI call to extract expense data
                aiResult = await _extractExpensesWithAi(self.services, fileContent, fileName, prompt, featureInstanceId)
                if not aiResult.get("success"):
                    await _quarantine(
                        fileName, errorDocuments, "error", aiResult.get("error", "AI extraction failed")
                    )
                    continue

                # Check for empty records
                records = aiResult.get("records", [])
                if not records:
                    logger.warning(f"Document {fileName}: No records extracted, moving to error folder")
                    await _quarantine(fileName, skippedDocuments, "reason", "No expense records extracted")
                    continue

                # Validate and enrich records, then save to TrusteePosition
                validatedRecords = _validateAndEnrichRecords(records, fileName)
                savedCount = _saveToTrusteePosition(trusteeInterface, validatedRecords, featureInstanceId, self.services.mandateId)
                totalPositions += savedCount

                # Nothing was persisted: filing the PDF under "processed" would
                # hide the loss, so treat the document as failed instead.
                if validatedRecords and savedCount == 0:
                    logger.warning(f"Document {fileName}: no records could be saved, moving to error folder")
                    await _quarantine(
                        fileName,
                        errorDocuments,
                        "error",
                        f"None of the {len(validatedRecords)} extracted records could be saved"
                    )
                    continue

                # Move document to "processed" subfolder
                timestamp = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
                newFileName = f"{timestamp}_{fileName}"
                moveSuccess = await _moveToProcessedFolder(self, siteId, folderPath, fileName, newFileName)
                processedDocuments.append({
                    "file": fileName,
                    "newLocation": f"processed/{newFileName}" if moveSuccess else "move_failed",
                    "recordsExtracted": len(validatedRecords),
                    "recordsSaved": savedCount
                })
            except Exception as e:
                errorMsg = str(e)
                logger.error(f"Error processing {fileName}: {errorMsg}", exc_info=True)
                # Handle rate limit: back off before touching SharePoint again
                if "429" in errorMsg or "throttl" in errorMsg.lower():
                    logger.warning(f"Rate limit hit, waiting {RATE_LIMIT_WAIT_SECONDS} seconds")
                    await asyncio.sleep(RATE_LIMIT_WAIT_SECONDS)
                await _quarantine(fileName, errorDocuments, "error", errorMsg)

        # Create result summary
        self.services.chat.progressLogUpdate(operationId, 0.95, "Creating result summary")
        remainingFiles = max(0, originalFileCount - MAX_FILES_PER_EXECUTION)
        resultSummary = {
            "status": "completed",
            "folder": sharepointFolder,
            "featureInstanceId": featureInstanceId,
            "summary": {
                "totalFilesFound": originalFileCount,
                "filesProcessedThisRun": totalFiles,
                "remainingFiles": remainingFiles,
                "successfulDocuments": len(processedDocuments),
                "skippedDocuments": len(skippedDocuments),
                "errorDocuments": len(errorDocuments),
                "totalPositionsSaved": totalPositions
            },
            "processedDocuments": processedDocuments,
            "skippedDocuments": skippedDocuments,
            "errorDocuments": errorDocuments
        }
        if remainingFiles > 0:
            resultSummary["note"] = f"{remainingFiles} files remaining for next execution"

        self.services.chat.progressLogFinish(operationId, True)
        return ActionResult.isSuccess(
            documents=[ActionDocument(
                documentName="expense_extraction_result.json",
                documentData=json.dumps(resultSummary, indent=2),
                mimeType="application/json",
                validationMetadata={
                    "actionType": "sharepoint.getExpensesFromPdf",
                    "sharepointFolder": sharepointFolder,
                    "featureInstanceId": featureInstanceId,
                    "totalPositions": totalPositions
                }
            )]
        )
    except Exception as e:
        logger.error(f"Error in getExpensesFromPdf: {str(e)}")
        if operationId:
            self.services.chat.progressLogFinish(operationId, False)
        return ActionResult.isFailure(error=str(e))
async def _resolveSiteAndFolder(self, sharepointFolder: str) -> tuple:
    """
    Resolve the SharePoint site and the folder path inside it.

    For paths of the form ``/sites/<SiteName>/<FolderPath>`` the folder path
    is everything after the site name; for any other path it is "".
    The site itself is looked up once via
    ``self.siteDiscovery.resolveSitesFromPathQuery``.

    Returns:
    (site, folderPath) using the first site found, or (None, None) when no
    site matches or the lookup raises.
    """
    try:
        folderPath = ""
        sitesPrefix = "/sites/"
        if sharepointFolder.startswith(sitesPrefix):
            # "<SiteName>/<FolderPath>" -> keep only the part after the site name
            _, _, folderPath = sharepointFolder[len(sitesPrefix):].partition("/")

        # One lookup serves both path formats; repeating the identical query
        # after an empty result only costs an extra round trip.
        sites, _ = await self.siteDiscovery.resolveSitesFromPathQuery(sharepointFolder)
        if sites:
            return sites[0], folderPath
        return None, None
    except Exception as e:
        logger.error(f"Error resolving site and folder: {str(e)}")
        return None, None
async def _listPdfFilesInFolder(self, siteId: str, folderPath: str) -> List[Dict[str, Any]]:
    """
    Return the PDF files located directly in a drive folder.

    An empty path or "/" addresses the drive root. Each result is a dict with
    id, name, size, webUrl and lastModifiedDateTime. Only entries that carry a
    "file" facet and whose name ends in ".pdf" (case-insensitive) are kept.
    Any API error or exception yields an empty list.
    """
    try:
        from urllib.parse import quote

        addressesRoot = (not folderPath) or folderPath == "/"
        if addressesRoot:
            endpoint = f"sites/{siteId}/drive/root/children"
        else:
            encodedPath = quote(folderPath.strip('/'), safe='/')
            endpoint = f"sites/{siteId}/drive/root:/{encodedPath}:/children"

        listing = await self.apiClient.makeGraphApiCall(endpoint)
        if "error" in listing:
            logger.error(f"Error listing folder: {listing['error']}")
            return []

        # The name is bound in the filter so the projection can reuse it.
        pdfFiles = [
            {
                "id": entry.get("id"),
                "name": name,
                "size": entry.get("size", 0),
                "webUrl": entry.get("webUrl"),
                "lastModifiedDateTime": entry.get("lastModifiedDateTime")
            }
            for entry in listing.get("value", [])
            if (name := entry.get("name", "")).lower().endswith('.pdf') and "file" in entry
        ]
        logger.info(f"Found {len(pdfFiles)} PDF files in folder")
        return pdfFiles
    except Exception as e:
        logger.error(f"Error listing PDF files: {str(e)}")
        return []
async def _extractExpensesWithAi(services, fileContent: bytes, fileName: str, prompt: str, featureInstanceId: str) -> Dict[str, Any]:
    """
    Run the AI extraction for one PDF and return the parsed expense records.

    The PDF is stored through the DB interface, attached to the workflow as a
    message with one document, and then handed to ``services.ai.callAiContent``
    as a document reference with CSV as the requested output format.

    Returns:
    {"success": True, "records": [...]} on success, otherwise
    {"success": False, "error": "<message>"} (empty AI response or any exception).
    """
    try:
        import uuid

        # Ensure AI is initialized
        await services.ai.ensureAiObjectsInitialized()

        # Step 1: persist the PDF so the AI service can read it
        from modules.interfaces.interfaceDbManagement import getInterface as getDbInterface
        from modules.datamodels.datamodelChat import ChatDocument
        from modules.datamodels.datamodelDocref import DocumentReferenceList
        db = getDbInterface(services.user, mandateId=services.mandateId, featureInstanceId=featureInstanceId)
        storedFile = db.createFile(
            name=fileName,
            mimeType="application/pdf",
            content=fileContent
        )
        db.createFileData(storedFile.id, fileContent)
        logger.info(f"Stored PDF {fileName} ({len(fileContent)} bytes) with fileId: {storedFile.id}")

        # Step 2: describe the stored file as a ChatDocument
        documentId = str(uuid.uuid4())
        pdfDocument = ChatDocument(
            id=documentId,
            mandateId=services.mandateId or "",
            featureInstanceId=featureInstanceId or "",
            messageId="",  # Will be set when attached to message
            fileId=storedFile.id,
            fileName=fileName,
            fileSize=len(fileContent),
            mimeType="application/pdf"
        )

        # Step 3: attach the document to the workflow through a message, so
        # the document can be found via workflow.messages
        carrierMessage = {
            "id": f"msg_expense_import_{str(uuid.uuid4())[:8]}",
            "documentsLabel": f"expense_pdf_{fileName}",
            "role": "user",
            "status": "step",
            "message": f"PDF document for expense extraction: {fileName}"
        }
        storedMessage = services.chat.storeMessageWithDocuments(
            services.workflow,
            carrierMessage,
            [pdfDocument.model_dump()]
        )
        # Prefer the ID the store actually assigned to the document
        if storedMessage and storedMessage.documents:
            documentId = storedMessage.documents[0].id
            logger.info(f"Created message {storedMessage.id} with ChatDocument {documentId} for AI processing")

        # Step 4: reference the document for the AI service
        from modules.datamodels.datamodelDocref import DocumentItemReference
        pdfReference = DocumentItemReference(
            documentId=documentId,
            fileName=fileName
        )
        documentList = DocumentReferenceList(references=[pdfReference])

        # Step 5: let the AI service do extraction, chunking and image handling
        callOptions = AiCallOptions(
            resultFormat="csv",
            operationType=OperationTypeEnum.DATA_EXTRACT
        )
        aiResponse = await services.ai.callAiContent(
            prompt=prompt,
            options=callOptions,
            documentList=documentList,
            contentParts=None,  # Let AI service extract from documents
            outputFormat="csv"
        )
        if not aiResponse or not aiResponse.content:
            return {"success": False, "error": "AI returned empty response"}

        # Parse CSV response
        return {"success": True, "records": _parseCsvToRecords(aiResponse.content)}
    except Exception as e:
        logger.error(f"AI extraction error for {fileName}: {str(e)}")
        return {"success": False, "error": str(e)}
def _parseCsvToRecords(csvContent: str) -> List[Dict[str, Any]]:
    """
    Parse CSV text (optionally wrapped in a markdown code fence) into records.

    The first line is the header. Header names and string values are
    whitespace-stripped. Rows shorter than the header get "" for the missing
    columns; cells beyond the header are discarded. Empty or missing input
    yields an empty list. Parsing errors are logged and the records read so
    far are returned.
    """
    records: List[Dict[str, Any]] = []
    try:
        if not csvContent:
            return records
        # A BOM would otherwise end up inside the first header name.
        content = csvContent.strip().lstrip("\ufeff")
        if not content:
            return records

        # Clean up CSV content - remove markdown code block markers if present
        if content.startswith("```"):
            lines = content.split('\n')[1:]
            if lines and lines[-1].strip() == "```":
                lines = lines[:-1]
            content = '\n'.join(lines)

        for row in csv.DictReader(io.StringIO(content)):
            cleanedRow = {}
            for key, value in row.items():
                if key is None:
                    # DictReader collects surplus cells under the None key;
                    # they have no column name, so they cannot be mapped.
                    continue
                if value is None:
                    # Short row: DictReader fills missing columns with None.
                    value = ""
                cleanedRow[key.strip()] = value.strip() if isinstance(value, str) else value
            records.append(cleanedRow)
    except Exception as e:
        logger.error(f"Error parsing CSV: {str(e)}")
    return records
def _validateAndEnrichRecords(records: List[Dict[str, Any]], sourceFileName: str) -> List[Dict[str, Any]]:
"""
Validate and enrich expense records:
1. Calculate/correct VAT amount
2. Complete valuta/transactionDateTime if one is missing
3. Validate tags
"""
enrichedRecords = []
for record in records:
enriched = record.copy()
# VAT calculation/validation
vatPercentage = _parseFloat(record.get("vatPercentage", 0))
vatAmount = _parseFloat(record.get("vatAmount", 0))
bookingAmount = _parseFloat(record.get("bookingAmount", 0))
if vatPercentage > 0 and bookingAmount > 0:
# Calculate expected VAT amount (VAT is included in bookingAmount)
expectedVat = bookingAmount * vatPercentage / (100 + vatPercentage)
# If vatAmount is missing or significantly different, recalculate
if vatAmount == 0 or abs(vatAmount - expectedVat) > 0.01:
enriched["vatAmount"] = round(expectedVat, 2)
logger.debug(f"VAT amount corrected: {vatAmount} -> {enriched['vatAmount']}")
# Valuta / transactionDateTime completion
valuta = record.get("valuta")
transactionDateTime = record.get("transactionDateTime")
if valuta and not transactionDateTime:
try:
dt = datetime.strptime(str(valuta).strip(), "%Y-%m-%d")
enriched["transactionDateTime"] = dt.replace(hour=12).timestamp()
except:
pass
elif transactionDateTime and not valuta:
try:
ts = float(transactionDateTime)
dt = datetime.fromtimestamp(ts, UTC)
enriched["valuta"] = dt.strftime("%Y-%m-%d")
except:
pass
# Validate tags
tags = record.get("tags", "")
if tags:
tagList = [t.strip().lower() for t in str(tags).split(",")]
validTags = [t for t in tagList if t in ALLOWED_TAGS]
enriched["tags"] = ",".join(validTags)
# Store source file info in description
existingDesc = record.get("desc", "")
if sourceFileName and sourceFileName not in str(existingDesc):
enriched["desc"] = f"[Source: {sourceFileName}]\n{existingDesc}"
enrichedRecords.append(enriched)
return enrichedRecords
def _parseFloat(value) -> float:
    """
    Safely parse a numeric value, returning 0.0 when it cannot be used.

    Besides plain numbers and numeric strings this accepts a trailing percent
    sign ("8.1%") and apostrophe thousands separators ("1'234.50").
    None, "", unparseable text and non-finite results (nan/inf) all yield 0.0.
    """
    import math

    if value is None or value == "":
        return 0.0
    if isinstance(value, str):
        # Drop apostrophe group separators (ASCII and typographic)
        text = value.strip().replace("'", "").replace("\u2019", "")
        if text.endswith("%"):
            text = text[:-1].strip()
        value = text
    try:
        number = float(value)
    except (ValueError, TypeError, OverflowError):
        return 0.0
    # float() happily parses "nan"/"inf"; those must not reach amounts.
    return number if math.isfinite(number) else 0.0
def _saveToTrusteePosition(trusteeInterface, records: List[Dict[str, Any]], featureInstanceId: str, mandateId: str) -> int:
    """
    Save validated records through ``trusteeInterface.createPosition``.

    Empty or missing currency fields fall back to "CHF" (bookingCurrency) and
    to the booking currency (originalCurrency); a missing or zero
    originalAmount falls back to bookingAmount. A record that fails to save is
    logged and skipped.

    Returns:
    Number of records for which createPosition returned a truthy result.
    """
    savedCount = 0
    for record in records:
        try:
            # CSV rows carry every column, so an absent value arrives as ""
            # (or None) rather than a missing key - `or` covers both.
            bookingCurrency = record.get("bookingCurrency") or "CHF"
            bookingAmount = _parseFloat(record.get("bookingAmount", 0))
            position = {
                "valuta": record.get("valuta"),
                "transactionDateTime": record.get("transactionDateTime"),
                "company": record.get("company") or "",
                "desc": record.get("desc") or "",
                "tags": record.get("tags") or "",
                "bookingCurrency": bookingCurrency,
                "bookingAmount": bookingAmount,
                "originalCurrency": record.get("originalCurrency") or bookingCurrency,
                "originalAmount": _parseFloat(record.get("originalAmount", 0)) or bookingAmount,
                "vatPercentage": _parseFloat(record.get("vatPercentage", 0)),
                "vatAmount": _parseFloat(record.get("vatAmount", 0)),
                "featureInstanceId": featureInstanceId,
                "mandateId": mandateId
            }
            result = trusteeInterface.createPosition(position)
            if result:
                savedCount += 1
                logger.debug(f"Saved position: {position.get('company')} - {position.get('bookingAmount')}")
        except Exception as e:
            logger.error(f"Failed to save position: {str(e)}")
    return savedCount
async def _ensureFolderExists(self, siteId: str, folderPath: str) -> bool:
    """
    Create a drive folder (and any missing parent folders) if it doesn't exist.

    The whole path is probed first; only when that fails is each level
    checked and created from the top down.

    Returns:
    True when the folder exists afterwards (or the path is empty, i.e. the
    drive root), False when a level could not be created or an exception
    occurred.
    """
    try:
        import urllib.parse

        driveRoot = f"sites/{siteId}/drive/root"
        # Ignore empty segments produced by leading/trailing/double slashes
        segments = [segment for segment in folderPath.strip('/').split('/') if segment]
        if not segments:
            return True  # The drive root itself always exists

        # Check if folder exists
        encodedPath = urllib.parse.quote('/'.join(segments), safe='/')
        result = await self.apiClient.makeGraphApiCall(f"{driveRoot}:/{encodedPath}")
        if "error" not in result:
            return True  # Folder exists

        # Create folder - need to create parent first if nested
        currentPath = ""
        for part in segments:
            # An empty parent means "directly under the drive root". Using the
            # path text as a marker would misplace children of a folder that
            # is itself named "root".
            parentPath = currentPath
            currentPath = f"{parentPath}/{part}" if parentPath else part

            # Check if this level exists
            checkPath = urllib.parse.quote(currentPath, safe='/')
            checkResult = await self.apiClient.makeGraphApiCall(f"{driveRoot}:/{checkPath}")
            if "error" not in checkResult:
                continue

            # Create this folder
            if parentPath:
                encodedParent = urllib.parse.quote(parentPath, safe='/')
                createEndpoint = f"{driveRoot}:/{encodedParent}:/children"
            else:
                createEndpoint = f"{driveRoot}/children"
            createData = json.dumps({
                "name": part,
                "folder": {},
                "@microsoft.graph.conflictBehavior": "fail"
            }).encode('utf-8')
            createResult = await self.apiClient.makeGraphApiCall(createEndpoint, method="POST", data=createData)
            if "error" in createResult:
                logger.warning(f"Failed to create folder {part}: {createResult['error']}")
                return False
            logger.info(f"Created folder: {currentPath}")
        return True
    except Exception as e:
        logger.error(f"Failed to ensure folder exists: {str(e)}")
        return False
async def _moveToProcessedFolder(self, siteId: str, sourceFolderPath: str, sourceFileName: str, destFileName: str) -> bool:
    """
    Move a processed PDF into the 'processed' subfolder under a new name.

    The move is a copy followed by a delete of the original.

    Returns:
    True only when the copy did not raise and the original was deleted.
    False when the original could not be deleted (the copy then already
    exists in 'processed') or when any step raised.
    """
    try:
        # Build processed folder path
        cleanSource = sourceFolderPath.strip('/')
        processedFolder = f"{cleanSource}/processed" if cleanSource else "processed"

        # Ensure processed folder exists; the copy is still attempted on
        # failure so its own error is what gets logged.
        if not await _ensureFolderExists(self, siteId, processedFolder):
            logger.warning(f"Could not ensure folder exists: {processedFolder}")

        # Copy file to new location
        await self.services.sharepoint.copyFileAsync(
            siteId=siteId,
            sourceFolder=cleanSource if cleanSource else "/",
            sourceFile=sourceFileName,
            destFolder=processedFolder,
            destFile=destFileName
        )

        # Delete original file. _deleteFile reports failure through its
        # return value, so it has to be checked: a leftover original would be
        # picked up again on the next run.
        deleted = await _deleteFile(self, siteId, sourceFolderPath, sourceFileName)
        if not deleted:
            logger.warning(f"Copied {sourceFileName} to processed/{destFileName} but could not delete the original")
            return False

        logger.info(f"Moved {sourceFileName} to processed/{destFileName}")
        return True
    except Exception as e:
        logger.error(f"Failed to move file to processed: {str(e)}")
        return False
async def _moveToErrorFolder(self, siteId: str, sourceFolderPath: str, sourceFileName: str) -> bool:
    """
    Move a failed PDF into the 'error' subfolder (filename unchanged).

    The move is a copy followed by a delete of the original.

    Returns:
    True only when the copy did not raise and the original was deleted.
    False when the original could not be deleted (the copy then already
    exists in 'error') or when any step raised.
    """
    try:
        # Build error folder path
        cleanSource = sourceFolderPath.strip('/')
        errorFolder = f"{cleanSource}/error" if cleanSource else "error"

        # Ensure error folder exists; the copy is still attempted on failure
        # so its own error is what gets logged.
        if not await _ensureFolderExists(self, siteId, errorFolder):
            logger.warning(f"Could not ensure folder exists: {errorFolder}")

        # Copy file to error folder (keep original name)
        await self.services.sharepoint.copyFileAsync(
            siteId=siteId,
            sourceFolder=cleanSource if cleanSource else "/",
            sourceFile=sourceFileName,
            destFolder=errorFolder,
            destFile=sourceFileName  # Same filename
        )

        # Delete original file. _deleteFile reports failure through its
        # return value, so it has to be checked: a leftover original would be
        # picked up again on the next run.
        deleted = await _deleteFile(self, siteId, sourceFolderPath, sourceFileName)
        if not deleted:
            logger.warning(f"Copied {sourceFileName} to error/ but could not delete the original")
            return False

        logger.info(f"Moved {sourceFileName} to error/")
        return True
    except Exception as e:
        logger.error(f"Failed to move file to error folder: {str(e)}")
        return False
async def _deleteFile(self, siteId: str, folderPath: str, fileName: str) -> bool:
    """
    Delete a file from a SharePoint drive.

    The file is first looked up by path to obtain its item ID, then removed
    with an HTTP DELETE against the Graph v1.0 endpoint using the SharePoint
    service's access token.

    Returns:
    True when the DELETE answered 200 or 204; False when the file is not
    found, has no ID, no access token is set, the DELETE answers with another
    status, or an exception occurs.
    """
    try:
        from urllib.parse import quote

        folder = folderPath.strip('/')
        filePath = f"{folder}/{fileName}" if folder else fileName

        # Get file ID first
        lookup = await self.apiClient.makeGraphApiCall(
            f"sites/{siteId}/drive/root:/{quote(filePath, safe='/')}"
        )
        if "error" in lookup:
            logger.warning(f"File not found for deletion: {filePath}")
            return False
        fileId = lookup.get("id")
        if not fileId:
            return False

        # The DELETE request is sent directly, so it needs the bearer token
        token = self.services.sharepoint.accessToken
        if token is None:
            logger.error("Access token not set for delete")
            return False

        import aiohttp
        url = f"https://graph.microsoft.com/v1.0/sites/{siteId}/drive/items/{fileId}"
        authHeaders = {"Authorization": f"Bearer {token}"}
        async with aiohttp.ClientSession() as session:
            async with session.delete(url, headers=authHeaders) as response:
                if response.status not in (200, 204):
                    errorText = await response.text()
                    logger.warning(f"Delete failed: {response.status} - {errorText}")
                    return False
                logger.debug(f"Deleted file: {filePath}")
                return True
    except Exception as e:
        logger.error(f"Failed to delete file: {str(e)}")
        return False