gateway/modules/workflows/methods/methodTrustee/actions/extractFromFiles.py
2026-04-17 11:50:24 +02:00

604 lines
28 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Extract document type and structured data from files (PDF, JPG).
Input: fileIds (list) OR connectionReference + sharepointFolder.
Output: ActionResult with one ActionDocument per file: { documentType, extractedData, fileId, fileName } (plus an "error" entry when extraction of that file failed), resultLabel.
"""
import asyncio
import json
import logging
import uuid
import csv
import io
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional, Tuple
from modules.datamodels.datamodelChat import ActionResult, ActionDocument, ChatDocument, ChatMessage
from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference
from modules.datamodels.datamodelAi import AiCallOptions, AiCallRequest, OperationTypeEnum
from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import SubscriptionInactiveException as _SubscriptionInactiveException
from modules.serviceCenter.services.serviceBilling.mainServiceBilling import BillingContextError as _BillingContextError
logger = logging.getLogger(__name__)
# File-name suffixes accepted when scanning a SharePoint folder (matched against the lower-cased name).
ALLOWED_EXTENSIONS = (".pdf", ".jpg", ".jpeg")
# Cap applied to the SharePoint folder listing processed by extractFromFiles.
MAX_FILES = 50
# Phase 1a: Pure OCR / text extraction (no JSON, plain text only)
_OCR_PROMPT = "Extract ALL readable text from this document. Return ONLY the plain text, nothing else."
# Phase 1b: Classification (text-only, lightweight).
# The model is asked for a bare type name; _parseDocumentType normalises the reply.
# The per-type hints are written in German to match the expected documents.
_CLASSIFY_PROMPT = (
"Classify this document text into one of these types. "
"Return ONLY the type name, nothing else.\n"
"EXPENSE_RECEIPT: Quittungen, Tankbelege, Kassenzettel\n"
"BANK_DOCUMENT: Bankauszuege, Kontoauszuege mit Transaktionslisten\n"
"INVOICE: Rechnungen mit Rechnungsnummer und Faelligkeitsdatum\n"
"CONTRACT: Vertraege\n"
"UNKNOWN: Falls unklar"
)
# Phase 2: Type-specific structuring prompts (placeholders: {expenseList}, {bankList}).
# These are str.format() templates filled in by _buildStructuringPrompt. Literal JSON
# braces are doubled ({{ }}) so that only {expenseList} and {bankList} are substituted.
# EXPENSE_RECEIPT: one record per expense line item.
_PROMPT_EXPENSE_RECEIPT = (
"Extrahiere aus dem folgenden Beleg eine Buchung pro Ausgabeposition. "
"Return JSON: {{\"records\": [{{...}}]}}. "
"Jeder Record hat diese Felder:\n"
"- documentType: immer \"expense_receipt\"\n"
"- valuta (YYYY-MM-DD), transactionDateTime (unix seconds, numeric)\n"
"- company: vollstaendiger Firmenname inkl. Rechtsform\n"
"- desc: AUSFUEHRLICHE Beschreibung — alle Positionen/Artikel, Mengen, Einzelpreise, "
"Adresse des Geschaefts, Belegnummer, evtl. Kassennummer. "
"Fuer die Nachbearbeitung muessen alle relevanten Details im desc stehen.\n"
"- bookingCurrency, bookingAmount, originalCurrency, originalAmount\n"
"- vatPercentage, vatAmount\n"
"- debitAccountNumber (NUR Kontonummer, z.B. \"6200\", aus: {expenseList})\n"
"- creditAccountNumber (NUR Kontonummer, z.B. \"1020\", aus: {bankList})\n"
"- tags, taxCode, costCenter, bookingReference\n"
"- payeeIban, payeeName, payeeBic: falls Zahlungsdaten auf dem Beleg stehen\n"
"- paymentReference: QR-Referenz / ESR-Nummer / Mitteilung, falls vorhanden\n"
"- dueDate (YYYY-MM-DD): Zahlungsfrist, falls angegeben\n"
"WICHTIG: transactionDateTime muss eine ZAHL sein (z.B. 1737417600), niemals '21.01.2026'. "
"Felder ohne Wert als null."
)
# BANK_DOCUMENT: one record per transaction line; no VAT fields are requested.
_PROMPT_BANK_DOCUMENT = (
"Extrahiere aus dem folgenden Bankauszug eine Buchung pro Transaktionszeile. "
"Return JSON: {{\"records\": [{{...}}]}}. "
"Jeder Record hat diese Felder:\n"
"- documentType: immer \"bank_document\"\n"
"- valuta (YYYY-MM-DD), transactionDateTime (unix seconds, optional)\n"
"- company: Gegenpartei (vollstaendiger Name)\n"
"- desc: AUSFUEHRLICH — Zahlungsreferenz, Mitteilung, Verwendungszweck, alle Details der Transaktionszeile. "
"Wenn mehrere Referenzen/Mitteilungen vorhanden sind, alle angeben.\n"
"- bookingAmount, bookingCurrency\n"
"- debitAccountNumber (NUR Kontonummer aus: {expenseList})\n"
"- creditAccountNumber (NUR Kontonummer aus: {bankList})\n"
"- bookingReference\n"
"- payeeIban: IBAN der Gegenpartei, falls sichtbar\n"
"- payeeName: Name des Kontoinhabers der Gegenpartei\n"
"- paymentReference: Referenznummer der Transaktion\n"
"Kein MwSt bei Bankauszuegen. Felder ohne Wert als null."
)
# INVOICE: exactly one record per invoice, with emphasis on payment data
# (payee IBAN/BIC, QR/ESR/SCOR reference, due date).
_PROMPT_INVOICE = (
"Extrahiere aus der folgenden Rechnung genau eine Buchung. "
"Return JSON: {{\"records\": [{{...}}]}}. "
"Der Record hat diese Felder:\n"
"- documentType: immer \"invoice\"\n"
"- valuta (Rechnungsdatum, YYYY-MM-DD)\n"
"- transactionDateTime (unix seconds als Zahl)\n"
"- company: Kreditor — vollstaendiger Firmenname inkl. Rechtsform und Adresse\n"
"- desc: AUSFUEHRLICHE Rechnungsdetails — alle Positionen mit Einzelpreisen und Mengen, "
"Rechnungsnummer, Kundennummer, Lieferadresse, besondere Bedingungen. "
"Alle Informationen die fuer die Folgebearbeitung (Zahlung, Kontrolle, Verbuchung) relevant sind.\n"
"- bookingAmount (Totalbetrag), bookingCurrency\n"
"- vatPercentage, vatAmount\n"
"- debitAccountNumber (NUR Kontonummer aus: {expenseList})\n"
"- creditAccountNumber (NUR Kontonummer aus: {bankList})\n"
"- bookingReference: Rechnungsnummer\n"
"- taxCode, costCenter\n"
"ZAHLUNGSDATEN (sehr wichtig, haeufig im QR-Code oder Einzahlungsschein):\n"
"- payeeIban: IBAN des Zahlungsempfaengers\n"
"- payeeName: Kontoinhaber / Bankname des Empfaengers\n"
"- payeeBic: BIC/SWIFT-Code, falls vorhanden\n"
"- paymentReference: Strukturierte Referenz — QR-Referenz (26-27 Stellen), "
"ESR-Referenznummer, SCOR-Referenz oder unstrukturierte Mitteilung. "
"Alle Referenzen vollstaendig uebernehmen.\n"
"- dueDate (YYYY-MM-DD): Zahlungsfrist / Faelligkeitsdatum\n"
"Formatregeln: valuta und dueDate nur YYYY-MM-DD; transactionDateTime nur unix seconds als Zahl. "
"Felder ohne Wert als null."
)
# Every other classification (CONTRACT, UNKNOWN, ...): the model is asked to set
# documentType per record itself.
_PROMPT_FALLBACK = (
"Extrahiere aus dem folgenden Dokument Buchungsdaten. "
"Return JSON: {{\"records\": [{{...}}]}}. "
"Jeder Record hat diese Felder:\n"
"- documentType: Art des Dokuments (\"invoice\", \"expense_receipt\", \"bank_document\" oder \"unknown\")\n"
"- valuta (YYYY-MM-DD), transactionDateTime (unix seconds, numeric)\n"
"- company: vollstaendiger Firmenname\n"
"- desc: AUSFUEHRLICHE Beschreibung — alle Details des Dokuments, Positionen, Referenzen, "
"Adressen, Bedingungen. Nicht nur ein Stichwort sondern alle relevanten Informationen.\n"
"- bookingCurrency, bookingAmount, originalCurrency, originalAmount\n"
"- vatPercentage, vatAmount\n"
"- debitAccountNumber (NUR Kontonummer, z.B. \"6200\", aus: {expenseList})\n"
"- creditAccountNumber (NUR Kontonummer, z.B. \"1020\", aus: {bankList})\n"
"- tags, taxCode, costCenter, bookingReference\n"
"- payeeIban, payeeName, payeeBic: Zahlungsdaten, falls vorhanden\n"
"- paymentReference: QR-Referenz / ESR / SCOR / Mitteilung\n"
"- dueDate (YYYY-MM-DD): Zahlungsfrist, falls vorhanden\n"
"WICHTIG: transactionDateTime muss eine ZAHL sein, niemals DD.MM.YYYY. "
"Felder ohne Wert als null."
)
def _parseDocumentType(raw: str) -> str:
    """Parse the classification reply (expected: a bare type name) into a document type.

    The reply is trimmed, upper-cased, spaces become underscores and quotes are removed.
    An exact match wins. Otherwise the known type name that occurs EARLIEST in the
    reply is returned. The previous implementation iterated a set of strings, whose
    order varies between interpreter runs, so a reply mentioning two types was
    classified nondeterministically.

    Args:
        raw: Raw model output; ``None`` or empty is treated as unknown.

    Returns:
        One of "EXPENSE_RECEIPT", "BANK_DOCUMENT", "INVOICE", "CONTRACT", "UNKNOWN".
    """
    validTypes = ("EXPENSE_RECEIPT", "BANK_DOCUMENT", "INVOICE", "CONTRACT", "UNKNOWN")
    cleaned = (raw or "").strip().upper().replace(" ", "_").replace('"', "").replace("'", "")
    if cleaned in validTypes:
        return cleaned
    # (position, name) pairs; no type name is a prefix of another, so positions never tie.
    hits = [(cleaned.find(t), t) for t in validTypes if t in cleaned]
    if not hits:
        return "UNKNOWN"
    return min(hits)[1]
def _buildStructuringPrompt(documentType: str, expenseList: str, bankList: str) -> str:
    """Return the phase 2 structuring prompt for *documentType* with account lists filled in.

    Empty account lists are replaced by generic example accounts. Any type without a
    dedicated template (CONTRACT, UNKNOWN, unrecognised values) uses the fallback prompt.
    """
    templatesByType = {
        "EXPENSE_RECEIPT": _PROMPT_EXPENSE_RECEIPT,
        "BANK_DOCUMENT": _PROMPT_BANK_DOCUMENT,
        "INVOICE": _PROMPT_INVOICE,
    }
    normalizedType = (documentType or "UNKNOWN").upper().replace(" ", "_")
    template = templatesByType.get(normalizedType, _PROMPT_FALLBACK)
    return template.format(
        expenseList=expenseList or "6200 Fahrzeugaufwand, 6000 Materialaufwand",
        bankList=bankList or "1020 Bank",
    )
async def _getAccountLists(self, featureInstanceId: str) -> Tuple[str, str]:
    """Load expense and bank account lists from the connected accounting system for prompts.

    The expense list holds at most 50 accounts. The bank list holds at most 10 asset
    accounts whose number starts with "10", and falls back to "1020 Bank" when none match.

    Args:
        featureInstanceId: Trustee feature instance whose accounting connection is queried.

    Returns:
        (expenseList, bankList) as comma-separated "<number> <label>" strings, or
        ("", "") when no expense accounts are configured or loading fails.
    """
    try:
        from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface
        from modules.features.trustee.accounting.accountingBridge import AccountingBridge

        trusteeInterface = getTrusteeInterface(
            self.services.user,
            mandateId=self.services.mandateId,
            featureInstanceId=featureInstanceId,
        )
        bridge = AccountingBridge(trusteeInterface)
        # NOTE(review): relies on the bridge's private _getExpenseAccounts helper.
        expenseAccounts = await bridge._getExpenseAccounts(featureInstanceId)
        assetAccounts = await bridge.getChartOfAccounts(featureInstanceId, accountType="asset")
    except Exception as e:
        # Best-effort: _buildStructuringPrompt substitutes example accounts for empty lists.
        logger.debug("Could not load chart of accounts for prompt: %s", e)
        return ("", "")
    if not expenseAccounts:
        return ("", "")
    expenseList = ", ".join(f"{a.accountNumber} {a.label}" for a in expenseAccounts[:50])
    # Tolerate a missing asset list and non-string account numbers (e.g. int or None).
    # Both used to raise here, outside the try block, despite the "empty on error" contract.
    bankAccounts = [
        a for a in (assetAccounts or [])
        if str(getattr(a, "accountNumber", None) or "").startswith("10")
    ]
    bankList = ", ".join(f"{a.accountNumber} {a.label}" for a in bankAccounts[:10]) if bankAccounts else "1020 Bank"
    return (expenseList, bankList)
def _parseStructuredRecords(raw: str) -> List[Dict[str, Any]]:
    """Parse the phase 2 AI response (JSON with records, or CSV) into a list of record dicts.

    Accepted JSON shapes:
      - {"records": [...]} or {"extractedData": [...]}
      - a single record object under either key
      - a bare top-level array of records

    Previously a top-level array raised AttributeError on ``data.get``. That error was
    swallowed and the JSON text was then re-parsed as CSV, producing garbage records.
    Text that is not valid JSON is still handed to the CSV parser. Non-dict items are
    dropped.

    Args:
        raw: Raw model output (may include code fences or surrounding prose).

    Returns:
        List of record dicts; empty when nothing usable was found.
    """
    from modules.shared.jsonUtils import stripCodeFences, extractFirstBalancedJson

    cleaned = extractFirstBalancedJson(stripCodeFences((raw or "").strip()))
    try:
        data = json.loads(cleaned)
    except (TypeError, ValueError):  # json.JSONDecodeError subclasses ValueError
        # Not JSON: the model may have answered in CSV instead.
        return _parseCsvToRecords(cleaned) if cleaned else []
    if isinstance(data, list):
        records: Any = data
    elif isinstance(data, dict):
        records = data.get("records") or data.get("extractedData") or []
        if isinstance(records, dict):
            records = [records]
    else:
        records = []
    if not isinstance(records, list):
        return []
    return [record for record in records if isinstance(record, dict)]
def _parseCsvToRecords(csvContent: str) -> List[Dict[str, Any]]:
    """Turn CSV text (optionally wrapped in a Markdown code fence) into row dicts.

    Header names and string cells are whitespace-stripped. Columns without a header
    name are dropped, including surplus cells, which csv.DictReader stores under
    ``None``. Any failure is logged as a warning, and the rows parsed up to that point
    are returned.
    """
    parsedRows: List[Dict[str, Any]] = []
    try:
        text = (csvContent or "").strip()
        if text.startswith("```"):
            # The first line is the opening fence (e.g. ```csv); the closing fence is optional.
            fenceLines = text.split("\n")[1:]
            if fenceLines and fenceLines[-1].strip() == "```":
                fenceLines.pop()
            text = "\n".join(fenceLines)
        for row in csv.DictReader(io.StringIO(text)):
            parsedRows.append({
                key.strip(): (value.strip() if isinstance(value, str) else value)
                for key, value in row.items()
                if key
            })
    except Exception as e:
        logger.warning(f"Parse CSV: {e}")
    return parsedRows
def _estimateBankTransactionLineCount(rawText: str) -> int:
    """Estimate how many transaction rows exist in bank statement OCR text.

    A line counts when it is at least 8 characters long, contains a date, and still
    contains a money amount once the dates are removed. Removing the dates first
    matters: a dotted date such as "31.01.2026" satisfies the amount pattern on its
    own ("31.01"). Without that step, headers like "Kontoauszug vom 31.01.2026" were
    counted as transactions, inflating the estimate and triggering needless retries.

    Args:
        rawText: OCR output; ``None`` is treated as empty.

    Returns:
        Number of candidate transaction lines (0 for empty input).
    """
    import re

    datePattern = re.compile(r"\b(\d{2}[./-]\d{2}[./-]\d{2,4}|\d{4}-\d{2}-\d{2})\b")
    amountPattern = re.compile(r"[-+]?\d{1,3}(?:[ '\u00A0]\d{3})*(?:[.,]\d{2})\b")
    candidateCount = 0
    for line in (rawText or "").splitlines():
        stripped = line.strip()
        if len(stripped) < 8 or not datePattern.search(stripped):
            continue
        withoutDates = datePattern.sub(" ", stripped)
        if amountPattern.search(withoutDates):
            candidateCount += 1
    return candidateCount
def _buildBankDocumentRetryPrompt(expenseList: str, bankList: str, expectedRows: int) -> str:
    """Build a stricter retry prompt to force full bank-row extraction.

    Empty account lists fall back to the same example accounts _buildStructuringPrompt
    uses, so the retry never asks the model to pick an account from an empty list
    ("aus: )"). The requested minimum row count is at least 2.

    Args:
        expenseList: Comma-separated expense accounts, or "" when unavailable.
        bankList: Comma-separated bank accounts, or "" when unavailable.
        expectedRows: Estimated number of transaction lines; ``None`` counts as 0.

    Returns:
        The complete retry prompt (OCR text is appended by the caller).
    """
    expenseList = expenseList or "6200 Fahrzeugaufwand, 6000 Materialaufwand"
    bankList = bankList or "1020 Bank"
    minimumRows = max(2, expectedRows or 0)
    return (
        "Du hast vorher zu wenige Buchungen extrahiert. "
        "Extrahiere JETZT ALLE Transaktionszeilen aus dem Bankauszug vollstaendig. "
        f"Erwartete Groessenordnung: mindestens {minimumRows} Zeilen. "
        "WICHTIG: Eine Transaktionszeile = genau ein Record. "
        "Niemals Zeilen zusammenfassen, niemals nur die erste oder eine Beispielzeile liefern. "
        "Wenn Details fehlen, trotzdem Record erzeugen und fehlende Felder als null setzen. "
        "Return JSON: {\"records\": [{...}]}. "
        "Jeder Record hat diese Felder:\n"
        "- documentType: immer \"bank_document\"\n"
        "- valuta (YYYY-MM-DD), transactionDateTime (unix seconds, optional)\n"
        "- company (Gegenpartei)\n"
        "- desc (vollstaendige Details der Zeile inkl. Referenz/Mitteilung)\n"
        "- bookingAmount, bookingCurrency\n"
        f"- debitAccountNumber (NUR Kontonummer aus: {expenseList})\n"
        f"- creditAccountNumber (NUR Kontonummer aus: {bankList})\n"
        "- bookingReference, payeeIban, payeeName, paymentReference\n"
        "Kein MwSt bei Bankauszuegen."
    )
def _writeDebugSafe(self, content: str, label: str) -> None:
    """Write a debug artefact via services.utils.writeDebugFile without ever raising.

    Debug output is best-effort; a failing writer must not abort the extraction.
    """
    try:
        self.services.utils.writeDebugFile(content, label)
    except Exception as e:
        logger.debug("writeDebugFile(%s) failed: %s", label, e)


def _aiResponseText(response: Any) -> str:
    """Return the stripped ``content`` of an AI call response, or "" when it has none."""
    return (getattr(response, "content", None) or "").strip()


async def _retryIncompleteBankExtraction(
    self,
    rawText: str,
    records: List[Dict[str, Any]],
    expenseList: str,
    bankList: str,
    fileName: str,
) -> List[Dict[str, Any]]:
    """Re-run bank-statement structuring with a stricter prompt when rows look missing.

    The retry triggers when the OCR text shows at least 3 transaction-like lines and
    fewer than max(2, estimatedRows // 2) records were extracted. The retry result is
    kept only if it has more records. The retry is best-effort: if the call fails, the
    phase 2 records are kept instead of being lost. Billing/subscription errors still
    propagate.

    Returns:
        The better of the phase 2 records and the retry records.
    """
    estimatedRows = _estimateBankTransactionLineCount(rawText)
    # max(2, ...) >= 2, so this also covers the "at most one record" case.
    likelyIncomplete = estimatedRows >= 3 and len(records) < max(2, estimatedRows // 2)
    if not likelyIncomplete:
        return records

    retryPrompt = _buildBankDocumentRetryPrompt(expenseList, bankList, estimatedRows)
    retryRequest = AiCallRequest(
        prompt=f"{retryPrompt}\n\nDOKUMENT-TEXT:\n{rawText}",
        context="",
        options=AiCallOptions(resultFormat="json"),
    )
    try:
        retryResponse = await self.services.ai.callAi(retryRequest)
    except (_SubscriptionInactiveException, _BillingContextError):
        raise
    except Exception as e:
        logger.warning(
            "Bank statement retry failed for %s: %s (keeping %d record(s) from phase 2)",
            fileName, e, len(records),
        )
        return records

    retryRaw = _aiResponseText(retryResponse)
    _writeDebugSafe(self, retryRaw or "(empty)", "trustee_structuring_retry_response")
    retryRecords = _parseStructuredRecords(retryRaw)
    if len(retryRecords) > len(records):
        logger.info(
            "Bank statement retry improved extraction: records=%d -> %d (estimatedRows=%d, file=%s)",
            len(records),
            len(retryRecords),
            estimatedRows,
            fileName,
        )
        return retryRecords
    logger.warning(
        "Bank statement extraction may be incomplete: records=%d, estimatedRows=%d, file=%s",
        len(records),
        estimatedRows,
        fileName,
    )
    return records


async def _extractWithAi(
    self,
    chatDocumentId: str,
    fileId: str,
    fileName: str,
    mimeType: str,
    expenseList: str,
    bankList: str,
    featureInstanceId: str,
) -> Dict[str, Any]:
    """3-step extraction: (1a) OCR/text via Vision AI, (1b) classify text, (2) structure by type.

    Bank statements get an additional stricter structuring pass when the record count
    looks too low compared with the transaction-like lines in the OCR text.

    Args:
        chatDocumentId: Id of the ChatDocument through which the AI layer resolves the file.
        fileId: Echoed into the result.
        fileName: Echoed into the result and used in log messages.
        mimeType: Currently unused; kept for interface compatibility.
        expenseList: Expense accounts injected into the structuring prompts.
        bankList: Bank accounts injected into the structuring prompts.
        featureInstanceId: Currently unused; kept for interface compatibility.

    Returns:
        {"documentType", "extractedData", "fileId", "fileName"}. documentType is
        "UNKNOWN" with no records when OCR yields no text.

    Raises:
        Whatever the AI service raises for the OCR, classification or phase 2 calls;
        _extractOne turns these into error documents.
    """

    def _result(docType: str, records: List[Dict[str, Any]]) -> Dict[str, Any]:
        return {"documentType": docType, "extractedData": records, "fileId": fileId, "fileName": fileName}

    await self.services.ai.ensureAiObjectsInitialized()
    docList = DocumentReferenceList(
        references=[DocumentItemReference(documentId=chatDocumentId, fileName=fileName)]
    )

    # --- Step 1a: Pure text extraction (Vision AI for images, text extraction for text PDFs) ---
    _writeDebugSafe(self, _OCR_PROMPT, "trustee_ocr_prompt")
    ocrOptions = AiCallOptions(resultFormat="text", operationType=OperationTypeEnum.DATA_EXTRACT)
    ocrResponse = await self.services.ai.callAiContent(
        prompt=_OCR_PROMPT,
        options=ocrOptions,
        documentList=docList,
        contentParts=None,
        outputFormat="txt",
        generationIntent="extract",
    )
    if not ocrResponse or not ocrResponse.documents:
        return _result("UNKNOWN", [])
    rawText = ocrResponse.documents[0].documentData
    if isinstance(rawText, bytes):
        # One undecodable byte should not fail the whole file.
        rawText = rawText.decode("utf-8", errors="replace")
    rawText = (rawText or "").strip()
    _writeDebugSafe(self, rawText[:5000] if rawText else "(empty)", "trustee_ocr_result")
    if not rawText:
        return _result("UNKNOWN", [])

    # --- Step 1b: Classify the extracted text (lightweight text-only call, no Vision AI) ---
    classifyPrompt = f"{_CLASSIFY_PROMPT}\n\nTEXT:\n{rawText[:3000]}"
    classifyRequest = AiCallRequest(prompt=classifyPrompt, context="", options=AiCallOptions(resultFormat="text"))
    classifyResponse = await self.services.ai.callAi(classifyRequest)
    documentType = _parseDocumentType(getattr(classifyResponse, "content", None) or "")
    logger.info("Document classified: type=%s, rawText_length=%d, file=%s", documentType, len(rawText), fileName)

    # --- Step 2: Type-specific structuring of the OCR text ---
    structuringPrompt = _buildStructuringPrompt(documentType, expenseList, bankList)
    _writeDebugSafe(self, structuringPrompt, "trustee_structuring_prompt")
    phase2Request = AiCallRequest(
        prompt=f"{structuringPrompt}\n\nDOKUMENT-TEXT:\n{rawText}",
        context="",
        options=AiCallOptions(resultFormat="json"),
    )
    phase2Response = await self.services.ai.callAi(phase2Request)
    raw2 = _aiResponseText(phase2Response)
    _writeDebugSafe(self, raw2 or "(empty)", "trustee_structuring_response")
    records = _parseStructuredRecords(raw2)
    logger.info("Phase 2 result: documentType=%s, records=%d, raw2_length=%d", documentType, len(records), len(raw2))

    # Failsafe for bank statements: retry with a stricter prompt if extraction looks incomplete.
    if documentType == "BANK_DOCUMENT":
        records = await _retryIncompleteBankExtraction(self, rawText, records, expenseList, bankList, fileName)

    # Records found for an unclassified document are reported as an expense receipt.
    if records and (not documentType or documentType == "UNKNOWN"):
        documentType = "EXPENSE_RECEIPT"
    return _result(documentType or "UNKNOWN", records)
def _extractionErrorDocument(f: Dict[str, Any], message: str) -> ActionDocument:
    """Build the JSON ActionDocument reported for a file whose extraction failed."""
    return ActionDocument(
        documentName=(f.get("fileName") or "error") + ".json",
        documentData=json.dumps({
            "documentType": "UNKNOWN",
            "extractedData": [],
            "fileId": f.get("fileId"),
            "fileName": f.get("fileName"),
            "error": message,
        }),
        mimeType="application/json",
    )


async def _extractOne(
    self,
    f: Dict[str, Any],
    fileIdToChatDocId: Dict[str, str],
    expenseList: str,
    bankList: str,
    featureInstanceId: str,
) -> ActionDocument:
    """Run extraction for one file and wrap the outcome in a JSON ActionDocument.

    Args:
        f: File descriptor with "fileId", "fileName" and "mimeType".
        fileIdToChatDocId: Maps file ids to the ChatDocument ids the AI layer resolves.
        expenseList, bankList: Account lists for the structuring prompts.
        featureInstanceId: Passed through to _extractWithAi.

    Returns:
        A success document (the _extractWithAi result), or an error document carrying
        an "error" key when the file has no ChatDocument or extraction fails.

    Raises:
        _SubscriptionInactiveException, _BillingContextError: re-raised so the whole
        action stops. All other errors become error documents.
    """
    chatDocId = fileIdToChatDocId.get(f.get("fileId"))
    if not chatDocId:
        return _extractionErrorDocument(f, "No ChatDocument id for file")
    try:
        out = await _extractWithAi(
            self, chatDocId, f["fileId"], f["fileName"], f["mimeType"], expenseList, bankList, featureInstanceId
        )
        # `or` (not a .get default) so a None file name does not turn a successful
        # extraction into an error document via TypeError.
        return ActionDocument(
            documentName=(f.get("fileName") or "extract") + ".json",
            documentData=json.dumps(out),
            mimeType="application/json",
        )
    except (_SubscriptionInactiveException, _BillingContextError):
        raise
    except Exception as e:
        logger.exception("Extract failed for %s", f.get("fileName"))
        return _extractionErrorDocument(f, str(e))
def _recordField(rec: Any, names: Tuple[str, ...], default: Any) -> Any:
    """Return the first truthy field among *names* of a DB record, else *default*.

    Works for attribute-style objects and for mappings with ``.get``. The old inline
    code called ``rec.get`` on objects whose attribute was merely empty, which raised
    AttributeError.
    """
    for name in names:
        value = getattr(rec, name, None)
        if not value and callable(getattr(rec, "get", None)):
            value = rec.get(name)
        if value:
            return value
    return default


def _resolveFilesFromIds(self, fileIds: Any, featureInstanceId: str) -> List[Dict[str, Any]]:
    """Look up already-stored files by id. Unknown ids are kept with placeholder metadata."""
    from modules.interfaces.interfaceDbManagement import getInterface as getDbInterface

    db = getDbInterface(self.services.user, mandateId=self.services.mandateId, featureInstanceId=featureInstanceId)
    idList = fileIds if isinstance(fileIds, (list, tuple)) else [fileIds]
    files: List[Dict[str, Any]] = []
    for fid in idList:
        if not fid:
            continue
        rec = db.getFile(fid) if hasattr(db, "getFile") else None
        if rec:
            files.append({
                "fileId": _recordField(rec, ("id",), fid),
                "fileName": _recordField(rec, ("fileName", "name"), "document"),
                "mimeType": _recordField(rec, ("mimeType",), "application/octet-stream"),
            })
        else:
            files.append({"fileId": fid, "fileName": "document", "mimeType": "application/octet-stream"})
    return files


async def _downloadFilesFromSharepoint(
    self,
    connectionReference: Any,
    sharepointFolder: str,
    featureInstanceId: str,
) -> Tuple[List[Dict[str, Any]], List[Optional[Dict[str, Any]]], Optional[str]]:
    """Download up to MAX_FILES PDF/JPG files from a SharePoint folder and store them in the DB.

    Returns:
        (files, moveInfos, error). On success *error* is None and moveInfos[i] describes
        where files[i] came from. On failure *error* holds the message and both lists
        are empty.
    """
    sharepoint = self.services.sharepoint
    userConn = self.services.chat.getUserConnectionFromConnectionReference(connectionReference)
    if not userConn:
        return [], [], "No Microsoft connection for connectionReference"
    if not sharepoint.setAccessTokenFromConnection(userConn):
        return [], [], "Failed to set SharePoint access token"
    sites = await sharepoint.resolveSitesFromPathQuery(sharepointFolder)
    if not sites:
        return [], [], "No SharePoint site found for path"
    siteId = sites[0].get("id")
    if not siteId:
        return [], [], "SharePoint site has no id"
    parsed = sharepoint.extractSiteFromStandardPath(sharepointFolder)
    folderPath = (parsed.get("innerPath") or "").strip() if parsed else ""
    items = await sharepoint.listFolderContents(siteId, folderPath) or []
    # Filter BEFORE applying MAX_FILES so sub-folders and unsupported files do not use up the quota.
    candidates = [
        item for item in items
        if item.get("type") == "file" and (item.get("name") or "").lower().endswith(ALLOWED_EXTENSIONS)
    ][:MAX_FILES]

    from modules.interfaces.interfaceDbManagement import getInterface as getDbInterface

    db = getDbInterface(self.services.user, mandateId=self.services.mandateId, featureInstanceId=featureInstanceId)
    files: List[Dict[str, Any]] = []
    moveInfos: List[Optional[Dict[str, Any]]] = []
    for item in candidates:
        content = await sharepoint.downloadFile(siteId, item.get("id"))
        if not content:
            continue
        displayName = item.get("name", "file")
        mime = "application/pdf" if (item.get("name") or "").lower().endswith(".pdf") else "image/jpeg"
        fileItem = db.createFile(name=displayName, mimeType=mime, content=content)
        if not fileItem:
            continue
        db.createFileData(fileItem.id, content)
        files.append({"fileId": fileItem.id, "fileName": displayName, "mimeType": mime})
        moveInfos.append({
            "siteId": siteId,
            "folderPath": folderPath,
            "fileName": displayName,
            "itemId": item.get("id"),
        })
    return files, moveInfos, None


def _attachChatDocuments(
    self,
    filesToProcess: List[Dict[str, Any]],
    featureInstanceId: str,
) -> Optional[Dict[str, str]]:
    """Register files as ChatDocuments so the AI layer can resolve them via DocumentReferenceList.

    Inside the graph engine there is no real ChatWorkflow (workflow.id is empty or
    "transient-..."). In that case the in-memory ChatDocuments are injected into the
    placeholder workflow's messages instead of going through storeMessageWithDocuments.

    Returns:
        fileId -> ChatDocument id, or None when storing the message failed.
    """
    chatDocs = [
        ChatDocument(
            id=str(uuid.uuid4()),
            mandateId=self.services.mandateId or "",
            featureInstanceId=featureInstanceId or "",
            messageId="",
            fileId=f["fileId"],
            fileName=f["fileName"],
            fileSize=0,
            mimeType=f["mimeType"],
        )
        for f in filesToProcess
    ]
    messageText = f"Extract from {len(filesToProcess)} file(s)"
    workflow = self.services.workflow
    wfId = getattr(workflow, "id", None) or ""
    hasRealWorkflow = workflow is not None and bool(wfId) and not str(wfId).startswith("transient-")
    if hasRealWorkflow:
        messageData = {
            "id": f"msg_extract_{uuid.uuid4().hex[:12]}",
            "documentsLabel": "extract_files",
            "role": "user",
            "status": "step",
            "message": messageText,
        }
        createdMessage = self.services.chat.storeMessageWithDocuments(
            workflow, messageData, [d.model_dump() for d in chatDocs],
        )
        if not createdMessage or not createdMessage.documents:
            return None
        # NOTE(review): assumes stored documents come back in submission order.
        return {f["fileId"]: doc.id for f, doc in zip(filesToProcess, createdMessage.documents)}

    # Graph-engine path: getChatDocumentsFromDocumentList finds them via workflow.messages.
    placeholderMsg = ChatMessage(
        id=f"msg_extract_{uuid.uuid4().hex[:12]}",
        workflowId=getattr(workflow, "id", None) or "transient",
        documentsLabel="extract_files",
        role="user",
        status="step",
        message=messageText,
        documents=chatDocs,
    )
    if workflow is not None and hasattr(workflow, "messages"):
        workflow.messages.append(placeholderMsg)
    return {f["fileId"]: doc.id for f, doc in zip(filesToProcess, chatDocs)}


async def _moveProcessedSharepointFiles(
    self,
    moveInfos: List[Optional[Dict[str, Any]]],
    resultDocuments: List[ActionDocument],
) -> None:
    """Move successfully extracted SharePoint files into a timestamped "processed" sub-folder.

    Files whose result carries an "error" key or no extracted records are left in place.
    Every move is best-effort: failures are logged and never propagate.
    """
    sharepoint = self.services.sharepoint

    async def _moveOne(moveInfo: Dict[str, Any], resultDoc: ActionDocument) -> None:
        try:
            raw = resultDoc.documentData
            data = json.loads(raw) if isinstance(raw, str) else raw
            if not isinstance(data, dict) or "error" in data or not data.get("extractedData"):
                logger.info("Extraction failed for %s — leaving file in place", moveInfo.get("fileName", "?"))
                return
            folderPath = (moveInfo.get("folderPath") or "").strip().rstrip("/")
            destFolder = f"{folderPath}/processed".strip("/") if folderPath else "processed"
            fileName = moveInfo.get("fileName") or "file"
            destFile = f"{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}_{fileName}"
            # Copy first; the original is deleted only if the copy succeeded.
            await sharepoint.copyFileAsync(moveInfo["siteId"], folderPath, fileName, destFolder, destFile)
            await sharepoint.deleteFile(moveInfo["siteId"], moveInfo["itemId"])
        except Exception as e:
            logger.warning("Move SharePoint file failed for %s: %s", moveInfo.get("fileName", "?"), e)

    moveTasks = [
        _moveOne(info, doc)
        for info, doc in zip(moveInfos, resultDocuments)
        if info is not None
    ]
    if moveTasks:
        await asyncio.gather(*moveTasks, return_exceptions=True)


async def extractFromFiles(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    Extract document type and data from files.

    Either fileIds (list of file IDs already in DB) or connectionReference + sharepointFolder
    (list PDF/JPG, download, store in DB). Returns one ActionDocument per file with
    documentData = JSON { documentType, extractedData, fileId, fileName } (plus "error" on
    failure). Successfully extracted SharePoint files are moved into a "processed" sub-folder.
    """
    fileIds = parameters.get("fileIds") or []
    connectionReference = parameters.get("connectionReference")
    sharepointFolder = parameters.get("sharepointFolder")
    featureInstanceId = parameters.get("featureInstanceId") or getattr(self.services, "featureInstanceId", None)
    if not featureInstanceId:
        return ActionResult.isFailure(error="featureInstanceId is required")

    # One move-info entry per file; None when the file did not come from SharePoint.
    sharepointMoveInfo: List[Optional[Dict[str, Any]]]
    if fileIds:
        filesToProcess = _resolveFilesFromIds(self, fileIds, featureInstanceId)
        sharepointMoveInfo = [None] * len(filesToProcess)
    elif connectionReference and sharepointFolder:
        filesToProcess, sharepointMoveInfo, error = await _downloadFilesFromSharepoint(
            self, connectionReference, sharepointFolder, featureInstanceId
        )
        if error:
            return ActionResult.isFailure(error=error)
    else:
        return ActionResult.isFailure(error="Provide fileIds or connectionReference + sharepointFolder")
    if not filesToProcess:
        return ActionResult.isSuccess(documents=[])

    fileIdToChatDocId = _attachChatDocuments(self, filesToProcess, featureInstanceId)
    if fileIdToChatDocId is None:
        return ActionResult.isFailure(error="Failed to attach documents to workflow")

    expenseList, bankList = await _getAccountLists(self, featureInstanceId)
    # Parallel extraction of all files (OCR -> classify -> structure per file).
    resultDocuments = list(await asyncio.gather(*(
        _extractOne(self, f, fileIdToChatDocId, expenseList, bankList, featureInstanceId)
        for f in filesToProcess
    )))

    if len(sharepointMoveInfo) == len(resultDocuments):
        await _moveProcessedSharepointFiles(self, sharepointMoveInfo, resultDocuments)
    return ActionResult.isSuccess(documents=resultDocuments)