gateway/modules/workflows/methods/methodTrustee/actions/extractFromFiles.py
patrick-motsch 6b11d66766 fixes
2026-02-22 01:03:19 +01:00

377 lines
18 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Extract document type and structured data from files (PDF, JPG).
Input: fileIds (list) OR connectionReference + sharepointFolder.
Output: ActionResult with one ActionDocument per file: { documentType, extractedData, fileId, fileName }, resultLabel.
"""
import asyncio
import json
import logging
import uuid
import csv
import io
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelChat import ActionResult, ActionDocument, ChatDocument
from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
logger = logging.getLogger(__name__)
ALLOWED_EXTENSIONS = (".pdf", ".jpg", ".jpeg")
MAX_FILES = 50
_DEFAULT_PROMPT_FALLBACK = (
'Extract document type (one of: INVOICE, EXPENSE_RECEIPT, BANK_DOCUMENT, CONTRACT, UNKNOWN) '
'and expense/position records. Return JSON: {"documentType": "...", "records": [{...}]}. '
'Each record must have: valuta (YYYY-MM-DD), transactionDateTime (unix seconds), company, desc (full extracted text), '
'tags (from: customer, meeting, license, subscription, fuel, food, material), '
'bookingCurrency, bookingAmount, originalCurrency, originalAmount, vatPercentage, vatAmount, '
'debitAccountNumber (Soll-Konto nach Schweizer KMU-Kontenrahmen, z.B. 6200 Fahrzeugaufwand, 6000 Materialaufwand), '
'creditAccountNumber (Haben-Konto, z.B. 1020 Bank), taxCode, costCenter, bookingReference.'
)
async def _buildDefaultPromptWithAccounts(self, featureInstanceId: str) -> str:
"""Build extraction prompt with real expense accounts from the connected accounting system."""
try:
from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface
from modules.features.trustee.accounting.accountingBridge import AccountingBridge
trusteeInterface = getTrusteeInterface(
self.services.user,
mandateId=self.services.mandateId,
featureInstanceId=featureInstanceId,
)
bridge = AccountingBridge(trusteeInterface)
expenseAccounts = await bridge._getExpenseAccounts(featureInstanceId)
assetAccounts = await bridge.getChartOfAccounts(featureInstanceId, accountType="asset")
except Exception as e:
logger.debug("Could not load chart of accounts for prompt: %s", e)
return ""
if not expenseAccounts:
return ""
expenseList = ", ".join(f"{a.accountNumber} {a.label}" for a in expenseAccounts[:50])
bankAccounts = [a for a in assetAccounts if a.accountNumber.startswith("10")]
bankList = ", ".join(f"{a.accountNumber} {a.label}" for a in bankAccounts[:10]) if bankAccounts else "1020 Bank"
return (
'Extract document type (one of: INVOICE, EXPENSE_RECEIPT, BANK_DOCUMENT, CONTRACT, UNKNOWN) '
'and expense/position records. Return JSON: {"documentType": "...", "records": [{...}]}. '
'Each record must have: valuta (YYYY-MM-DD), transactionDateTime (unix seconds), company, desc (full extracted text), '
'tags (from: customer, meeting, license, subscription, fuel, food, material), '
'bookingCurrency, bookingAmount, originalCurrency, originalAmount, vatPercentage, vatAmount, '
f'debitAccountNumber (Soll-Konto, verwende eines der folgenden Aufwandkonten: {expenseList}), '
f'creditAccountNumber (Haben-Konto, verwende eines der folgenden Konten: {bankList}), '
'taxCode, costCenter, bookingReference.'
)
def _parseCsvToRecords(csvContent: str) -> List[Dict[str, Any]]:
"""Parse CSV content to list of expense records."""
records = []
try:
content = (csvContent or "").strip()
if content.startswith("```"):
lines = content.split("\n")
if lines and lines[0].startswith("```"):
lines = lines[1:]
if lines and lines[-1].strip() == "```":
lines = lines[:-1]
content = "\n".join(lines)
reader = csv.DictReader(io.StringIO(content))
for row in reader:
cleaned = {k.strip(): (v.strip() if isinstance(v, str) else v) for k, v in row.items()}
records.append(cleaned)
except Exception as e:
logger.warning(f"Parse CSV: {e}")
return records
async def _extractWithAi(self, chatDocumentId: str, fileId: str, fileName: str, mimeType: str, prompt: str, featureInstanceId: str) -> Dict[str, Any]:
"""Run AI extraction on one file; return { documentType, extractedData (records), fileId, fileName }."""
await self.services.ai.ensureAiObjectsInitialized()
from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference
docList = DocumentReferenceList(
references=[DocumentItemReference(documentId=chatDocumentId, fileName=fileName)]
)
# Prefer JSON for documentType + records in one response; fallback to CSV
options = AiCallOptions(resultFormat="json", operationType=OperationTypeEnum.DATA_GENERATE)
try:
aiResponse = await self.services.ai.callAiContent(
prompt=prompt or _DEFAULT_PROMPT_FALLBACK,
options=options,
documentList=docList,
contentParts=None,
outputFormat="json",
generationIntent="extract",
)
except Exception:
options = AiCallOptions(resultFormat="csv", operationType=OperationTypeEnum.DATA_GENERATE)
aiResponse = await self.services.ai.callAiContent(
prompt=prompt or _DEFAULT_PROMPT_FALLBACK,
options=options,
documentList=docList,
contentParts=None,
outputFormat="csv",
generationIntent="extract",
)
if not aiResponse or not aiResponse.documents:
return {"documentType": "UNKNOWN", "extractedData": [], "fileId": fileId, "fileName": fileName}
doc = aiResponse.documents[0]
raw = doc.documentData
if isinstance(raw, bytes):
raw = raw.decode("utf-8")
documentType = "UNKNOWN"
records = []
# Try JSON first
try:
if raw.strip().startswith("{"):
data = json.loads(raw)
# Direct format: {"documentType": "...", "records": [...]}
if "records" in data or "extractedData" in data:
documentType = (data.get("documentType") or "UNKNOWN").upper().replace(" ", "_")
records = data.get("records") or data.get("extractedData") or []
# Wrapped in document structure: {"documents": [{"sections": [{"elements": [{"content": {"code": "..."}}]}]}]}
elif "documents" in data:
for doc in data.get("documents", []):
for section in doc.get("sections", []):
for elem in section.get("elements", []):
code = (elem.get("content") or {}).get("code")
if code and isinstance(code, str):
try:
inner = json.loads(code)
if isinstance(inner, dict) and ("records" in inner or "documentType" in inner):
documentType = (inner.get("documentType") or "UNKNOWN").upper().replace(" ", "_")
records = inner.get("records") or inner.get("extractedData") or []
break
except Exception:
pass
if records:
break
if records:
break
elif "documentType" in data:
documentType = (data.get("documentType") or "UNKNOWN").upper().replace(" ", "_")
except Exception:
pass
# Fallback: CSV
if not records and raw:
records = _parseCsvToRecords(raw)
if records and not documentType or documentType == "UNKNOWN":
documentType = "EXPENSE_RECEIPT"
return {"documentType": documentType, "extractedData": records, "fileId": fileId, "fileName": fileName} # fileId from caller for result
async def _extractOne(
self,
f: Dict[str, Any],
fileIdToChatDocId: Dict[str, str],
prompt: str,
featureInstanceId: str,
) -> ActionDocument:
"""Run extraction for one file; returns success or error ActionDocument (never raises)."""
chatDocId = fileIdToChatDocId.get(f["fileId"])
if not chatDocId:
return ActionDocument(
documentName=(f.get("fileName") or "error") + ".json",
documentData=json.dumps({
"documentType": "UNKNOWN",
"extractedData": [],
"fileId": f["fileId"],
"fileName": f.get("fileName"),
"error": "No ChatDocument id for file",
}),
mimeType="application/json",
)
try:
out = await _extractWithAi(
self, chatDocId, f["fileId"], f["fileName"], f["mimeType"], prompt, featureInstanceId
)
return ActionDocument(
documentName=f.get("fileName", "extract") + ".json",
documentData=json.dumps(out),
mimeType="application/json",
)
except Exception as e:
logger.exception(f"Extract failed for {f.get('fileName')}")
return ActionDocument(
documentName=(f.get("fileName") or "error") + ".json",
documentData=json.dumps({
"documentType": "UNKNOWN",
"extractedData": [],
"fileId": f["fileId"],
"fileName": f.get("fileName"),
"error": str(e),
}),
mimeType="application/json",
)
async def extractFromFiles(self, parameters: Dict[str, Any]) -> ActionResult:
"""
Extract document type and data from files.
Either fileIds (list of file IDs already in DB) or connectionReference + sharepointFolder (list PDF/JPG, download, store in DB).
Returns one ActionDocument per file with documentData = JSON { documentType, extractedData, fileId, fileName }.
"""
fileIds = parameters.get("fileIds") or []
connectionReference = parameters.get("connectionReference")
sharepointFolder = parameters.get("sharepointFolder")
featureInstanceId = parameters.get("featureInstanceId") or getattr(self.services, "featureInstanceId", None)
prompt = parameters.get("prompt") or ""
if not featureInstanceId:
return ActionResult.isFailure(error="featureInstanceId is required")
filesToProcess = [] # list of { fileId, fileName, mimeType }
sharepointMoveInfo: List[Optional[Dict[str, Any]]] = [] # one entry per file; None if not from SharePoint
if fileIds:
from modules.interfaces.interfaceDbManagement import getInterface as getDbInterface
db = getDbInterface(self.services.user, mandateId=self.services.mandateId, featureInstanceId=featureInstanceId)
for fid in (fileIds if isinstance(fileIds, list) else [fileIds]):
if not fid:
continue
rec = db.getFile(fid) if hasattr(db, "getFile") else None
if rec:
fileId = rec.id if hasattr(rec, "id") else rec.get("id", fid)
fileName = getattr(rec, "fileName", None) or rec.get("fileName", rec.get("name", "document"))
mimeType = getattr(rec, "mimeType", None) or rec.get("mimeType", "application/octet-stream")
filesToProcess.append({"fileId": fileId, "fileName": fileName, "mimeType": mimeType})
else:
filesToProcess.append({"fileId": fid, "fileName": "document", "mimeType": "application/octet-stream"})
sharepointMoveInfo.append(None)
elif connectionReference and sharepointFolder:
userConn = self.services.chat.getUserConnectionFromConnectionReference(connectionReference)
if not userConn:
return ActionResult.isFailure(error="No Microsoft connection for connectionReference")
if not self.services.sharepoint.setAccessTokenFromConnection(userConn):
return ActionResult.isFailure(error="Failed to set SharePoint access token")
sites = await self.services.sharepoint.resolveSitesFromPathQuery(sharepointFolder)
if not sites:
return ActionResult.isFailure(error="No SharePoint site found for path")
siteId = sites[0].get("id")
if not siteId:
return ActionResult.isFailure(error="SharePoint site has no id")
parsed = self.services.sharepoint.extractSiteFromStandardPath(sharepointFolder)
folderPath = (parsed.get("innerPath") or "").strip() if parsed else ""
items = await self.services.sharepoint.listFolderContents(siteId, folderPath) or []
from modules.interfaces.interfaceDbManagement import getInterface as getDbInterface
db = getDbInterface(self.services.user, mandateId=self.services.mandateId, featureInstanceId=featureInstanceId)
for item in items[:MAX_FILES]:
if item.get("type") != "file":
continue
name = (item.get("name") or "").lower()
if not any(name.endswith(ext) for ext in ALLOWED_EXTENSIONS):
continue
content = await self.services.sharepoint.downloadFile(siteId, item.get("id"))
if not content:
continue
mime = "application/pdf" if name.endswith(".pdf") else "image/jpeg"
fileItem = db.createFile(name=item.get("name", "file"), mimeType=mime, content=content)
if fileItem:
db.createFileData(fileItem.id, content)
filesToProcess.append({"fileId": fileItem.id, "fileName": item.get("name", "file"), "mimeType": mime})
sharepointMoveInfo.append({
"siteId": siteId,
"folderPath": folderPath,
"fileName": item.get("name", "file"),
"itemId": item.get("id"),
})
else:
return ActionResult.isFailure(error="Provide fileIds or connectionReference + sharepointFolder")
if not filesToProcess:
return ActionResult.isSuccess(documents=[])
# Attach all files as ChatDocuments to the workflow so AI can resolve them
chatDocDumps = []
for f in filesToProcess:
chatDoc = ChatDocument(
id=str(uuid.uuid4()),
mandateId=self.services.mandateId or "",
featureInstanceId=featureInstanceId or "",
messageId="",
fileId=f["fileId"],
fileName=f["fileName"],
fileSize=0,
mimeType=f["mimeType"],
)
chatDocDumps.append(chatDoc.model_dump())
messageData = {
"id": f"msg_extract_{uuid.uuid4().hex[:12]}",
"documentsLabel": "extract_files",
"role": "user",
"status": "step",
"message": f"Extract from {len(filesToProcess)} file(s)",
}
createdMessage = self.services.chat.storeMessageWithDocuments(
self.services.workflow,
messageData,
chatDocDumps,
)
if not createdMessage or not createdMessage.documents:
return ActionResult.isFailure(error="Failed to attach documents to workflow")
# Map fileId -> ChatDocument id for AI reference
fileIdToChatDocId = {}
for i, f in enumerate(filesToProcess):
if i < len(createdMessage.documents):
fileIdToChatDocId[f["fileId"]] = createdMessage.documents[i].id
# Load expense accounts from accounting system for AI prompt (if configured)
if not prompt:
prompt = await _buildDefaultPromptWithAccounts(self, featureInstanceId)
# Parallel extraction (all files at once)
tasks = [
_extractOne(self, f, fileIdToChatDocId, prompt, featureInstanceId)
for f in filesToProcess
]
resultDocuments = list(await asyncio.gather(*tasks))
# Move SharePoint files to processed/ or error/ (parallel)
if sharepointMoveInfo and len(sharepointMoveInfo) == len(resultDocuments):
sharepoint = self.services.sharepoint
async def _moveOneFile(moveInfo: Dict[str, Any], resultDoc: ActionDocument) -> None:
try:
raw = resultDoc.documentData
data = json.loads(raw) if isinstance(raw, str) else raw
hasError = "error" in data or not data.get("extractedData")
destSub = "error" if hasError else "processed"
folderPath = (moveInfo.get("folderPath") or "").strip().rstrip("/")
destFolder = f"{folderPath}/{destSub}".strip("/") if folderPath else destSub
sourceFolder = folderPath or ""
fileName = moveInfo.get("fileName") or "file"
destFile = (
f"{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}_{fileName}"
if not hasError
else fileName
)
await sharepoint.copyFileAsync(
moveInfo["siteId"], sourceFolder, fileName, destFolder, destFile
)
await sharepoint.deleteFile(moveInfo["siteId"], moveInfo["itemId"])
except Exception as e:
logger.warning(f"Move SharePoint file failed for {moveInfo.get('fileName', '?')}: {e}")
moveTasks = [
_moveOneFile(sharepointMoveInfo[i], resultDocuments[i])
for i in range(len(sharepointMoveInfo))
if sharepointMoveInfo[i] is not None
]
if moveTasks:
await asyncio.gather(*moveTasks, return_exceptions=True)
return ActionResult.isSuccess(documents=resultDocuments)