600 lines
28 KiB
Python
600 lines
28 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
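Extract the document type and structured booking data from files (PDF, JPG/JPEG).

Input: either fileIds (list of files already stored in the DB) OR connectionReference + sharepointFolder
(PDF/JPG files in the folder are downloaded and stored in the DB first; after a successful extraction they
are moved into a "processed" subfolder). A featureInstanceId is required, taken from the parameters or the
services context.

Output: ActionResult with one JSON ActionDocument per file: { documentType, extractedData, fileId, fileName }
(plus "error" when extraction failed), resultLabel.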
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import uuid
|
|
import csv
|
|
import io
|
|
from datetime import datetime, timezone
|
|
from typing import Dict, Any, List, Optional, Tuple
|
|
|
|
from modules.datamodels.datamodelChat import ActionResult, ActionDocument, ChatDocument, ChatMessage
|
|
from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference
|
|
from modules.datamodels.datamodelAi import AiCallOptions, AiCallRequest, OperationTypeEnum
|
|
|
|
logger: logging.Logger = logging.getLogger(__name__)

# File extensions accepted from a SharePoint folder (matched against the lower-cased name).
ALLOWED_EXTENSIONS: Tuple[str, ...] = (".pdf", ".jpg", ".jpeg")

# Cap applied when collecting files from a SharePoint folder.
MAX_FILES: int = 50
|
|
|
|
# Phase 1a: pure OCR / text extraction. The model is asked for plain text only (no JSON);
# the text is then used by the classification and structuring phases.
_OCR_PROMPT = "Extract ALL readable text from this document. Return ONLY the plain text, nothing else."

# Phase 1b: lightweight, text-only classification. The model is asked to reply with a single
# type name, which _parseDocumentType normalises. The German type descriptions are part of
# the prompt sent to the model and must stay unchanged.
_CLASSIFY_PROMPT = (
    "Classify this document text into one of these types. "
    "Return ONLY the type name, nothing else.\n"
    "EXPENSE_RECEIPT: Quittungen, Tankbelege, Kassenzettel\n"
    "BANK_DOCUMENT: Bankauszuege, Kontoauszuege mit Transaktionslisten\n"
    "INVOICE: Rechnungen mit Rechnungsnummer und Faelligkeitsdatum\n"
    "CONTRACT: Vertraege\n"
    "UNKNOWN: Falls unklar"
)
|
|
|
|
# Phase 2: type-specific structuring prompts. These are str.format templates with the
# placeholders {expenseList} and {bankList}. Literal JSON braces are therefore doubled
# ({{ }}) so that .format() leaves a single brace in the final prompt.
# The prompt text is sent to the model verbatim; do not reformat the strings.

# Expense receipts: one record per expense line item.
_PROMPT_EXPENSE_RECEIPT = (
    "Extrahiere aus dem folgenden Beleg eine Buchung pro Ausgabeposition. "
    "Return JSON: {{\"records\": [{{...}}]}}. "
    "Jeder Record hat diese Felder:\n"
    "- documentType: immer \"expense_receipt\"\n"
    "- valuta (YYYY-MM-DD), transactionDateTime (unix seconds, numeric)\n"
    "- company: vollstaendiger Firmenname inkl. Rechtsform\n"
    "- desc: AUSFUEHRLICHE Beschreibung — alle Positionen/Artikel, Mengen, Einzelpreise, "
    "Adresse des Geschaefts, Belegnummer, evtl. Kassennummer. "
    "Fuer die Nachbearbeitung muessen alle relevanten Details im desc stehen.\n"
    "- bookingCurrency, bookingAmount, originalCurrency, originalAmount\n"
    "- vatPercentage, vatAmount\n"
    "- debitAccountNumber (NUR Kontonummer, z.B. \"6200\", aus: {expenseList})\n"
    "- creditAccountNumber (NUR Kontonummer, z.B. \"1020\", aus: {bankList})\n"
    "- tags, taxCode, costCenter, bookingReference\n"
    "- payeeIban, payeeName, payeeBic: falls Zahlungsdaten auf dem Beleg stehen\n"
    "- paymentReference: QR-Referenz / ESR-Nummer / Mitteilung, falls vorhanden\n"
    "- dueDate (YYYY-MM-DD): Zahlungsfrist, falls angegeben\n"
    "WICHTIG: transactionDateTime muss eine ZAHL sein (z.B. 1737417600), niemals '21.01.2026'. "
    "Felder ohne Wert als null."
)

# Bank statements: one record per transaction line; no VAT fields.
_PROMPT_BANK_DOCUMENT = (
    "Extrahiere aus dem folgenden Bankauszug eine Buchung pro Transaktionszeile. "
    "Return JSON: {{\"records\": [{{...}}]}}. "
    "Jeder Record hat diese Felder:\n"
    "- documentType: immer \"bank_document\"\n"
    "- valuta (YYYY-MM-DD), transactionDateTime (unix seconds, optional)\n"
    "- company: Gegenpartei (vollstaendiger Name)\n"
    "- desc: AUSFUEHRLICH — Zahlungsreferenz, Mitteilung, Verwendungszweck, alle Details der Transaktionszeile. "
    "Wenn mehrere Referenzen/Mitteilungen vorhanden sind, alle angeben.\n"
    "- bookingAmount, bookingCurrency\n"
    "- debitAccountNumber (NUR Kontonummer aus: {expenseList})\n"
    "- creditAccountNumber (NUR Kontonummer aus: {bankList})\n"
    "- bookingReference\n"
    "- payeeIban: IBAN der Gegenpartei, falls sichtbar\n"
    "- payeeName: Name des Kontoinhabers der Gegenpartei\n"
    "- paymentReference: Referenznummer der Transaktion\n"
    "Kein MwSt bei Bankauszuegen. Felder ohne Wert als null."
)

# Invoices: exactly one record, with emphasis on payment data (IBAN, QR/ESR/SCOR reference, due date).
_PROMPT_INVOICE = (
    "Extrahiere aus der folgenden Rechnung genau eine Buchung. "
    "Return JSON: {{\"records\": [{{...}}]}}. "
    "Der Record hat diese Felder:\n"
    "- documentType: immer \"invoice\"\n"
    "- valuta (Rechnungsdatum, YYYY-MM-DD)\n"
    "- transactionDateTime (unix seconds als Zahl)\n"
    "- company: Kreditor — vollstaendiger Firmenname inkl. Rechtsform und Adresse\n"
    "- desc: AUSFUEHRLICHE Rechnungsdetails — alle Positionen mit Einzelpreisen und Mengen, "
    "Rechnungsnummer, Kundennummer, Lieferadresse, besondere Bedingungen. "
    "Alle Informationen die fuer die Folgebearbeitung (Zahlung, Kontrolle, Verbuchung) relevant sind.\n"
    "- bookingAmount (Totalbetrag), bookingCurrency\n"
    "- vatPercentage, vatAmount\n"
    "- debitAccountNumber (NUR Kontonummer aus: {expenseList})\n"
    "- creditAccountNumber (NUR Kontonummer aus: {bankList})\n"
    "- bookingReference: Rechnungsnummer\n"
    "- taxCode, costCenter\n"
    "ZAHLUNGSDATEN (sehr wichtig, haeufig im QR-Code oder Einzahlungsschein):\n"
    "- payeeIban: IBAN des Zahlungsempfaengers\n"
    "- payeeName: Kontoinhaber / Bankname des Empfaengers\n"
    "- payeeBic: BIC/SWIFT-Code, falls vorhanden\n"
    "- paymentReference: Strukturierte Referenz — QR-Referenz (26-27 Stellen), "
    "ESR-Referenznummer, SCOR-Referenz oder unstrukturierte Mitteilung. "
    "Alle Referenzen vollstaendig uebernehmen.\n"
    "- dueDate (YYYY-MM-DD): Zahlungsfrist / Faelligkeitsdatum\n"
    "Formatregeln: valuta und dueDate nur YYYY-MM-DD; transactionDateTime nur unix seconds als Zahl. "
    "Felder ohne Wert als null."
)

# Generic prompt, used by _buildStructuringPrompt for every type without a dedicated template
# (CONTRACT, UNKNOWN, ...). The model is asked to set documentType per record itself.
_PROMPT_FALLBACK = (
    "Extrahiere aus dem folgenden Dokument Buchungsdaten. "
    "Return JSON: {{\"records\": [{{...}}]}}. "
    "Jeder Record hat diese Felder:\n"
    "- documentType: Art des Dokuments (\"invoice\", \"expense_receipt\", \"bank_document\" oder \"unknown\")\n"
    "- valuta (YYYY-MM-DD), transactionDateTime (unix seconds, numeric)\n"
    "- company: vollstaendiger Firmenname\n"
    "- desc: AUSFUEHRLICHE Beschreibung — alle Details des Dokuments, Positionen, Referenzen, "
    "Adressen, Bedingungen. Nicht nur ein Stichwort sondern alle relevanten Informationen.\n"
    "- bookingCurrency, bookingAmount, originalCurrency, originalAmount\n"
    "- vatPercentage, vatAmount\n"
    "- debitAccountNumber (NUR Kontonummer, z.B. \"6200\", aus: {expenseList})\n"
    "- creditAccountNumber (NUR Kontonummer, z.B. \"1020\", aus: {bankList})\n"
    "- tags, taxCode, costCenter, bookingReference\n"
    "- payeeIban, payeeName, payeeBic: Zahlungsdaten, falls vorhanden\n"
    "- paymentReference: QR-Referenz / ESR / SCOR / Mitteilung\n"
    "- dueDate (YYYY-MM-DD): Zahlungsfrist, falls vorhanden\n"
    "WICHTIG: transactionDateTime muss eine ZAHL sein, niemals DD.MM.YYYY. "
    "Felder ohne Wert als null."
)
|
|
|
|
|
|
def _parseDocumentType(raw: str) -> str:
|
|
"""Parse classification response (plain type name). Returns normalised document type."""
|
|
_VALID_TYPES = {"EXPENSE_RECEIPT", "BANK_DOCUMENT", "INVOICE", "CONTRACT", "UNKNOWN"}
|
|
cleaned = (raw or "").strip().upper().replace(" ", "_").replace('"', "").replace("'", "")
|
|
for t in _VALID_TYPES:
|
|
if t in cleaned:
|
|
return t
|
|
return "UNKNOWN"
|
|
|
|
|
|
def _buildStructuringPrompt(documentType: str, expenseList: str, bankList: str) -> str:
    """Return the phase-2 structuring prompt for *documentType*.

    The account lists are substituted into the chosen template. Empty lists fall
    back to a small built-in default so the prompt never offers an empty choice.
    Types without a dedicated template (CONTRACT, UNKNOWN, anything else) get the
    generic fallback prompt. The type is compared upper-cased with spaces turned
    into underscores.
    """
    templatesByType = {
        "EXPENSE_RECEIPT": _PROMPT_EXPENSE_RECEIPT,
        "BANK_DOCUMENT": _PROMPT_BANK_DOCUMENT,
        "INVOICE": _PROMPT_INVOICE,
    }
    normalisedType = (documentType or "UNKNOWN").upper().replace(" ", "_")
    template = templatesByType.get(normalisedType, _PROMPT_FALLBACK)
    return template.format(
        expenseList=expenseList or "6200 Fahrzeugaufwand, 6000 Materialaufwand",
        bankList=bankList or "1020 Bank",
    )
|
|
|
|
|
|
async def _getAccountLists(self, featureInstanceId: str) -> Tuple[str, str]:
    """Load expense and bank account lists from the connected accounting system.

    The lists are rendered as ``"<number> <label>, ..."`` strings for the
    structuring prompts. At most 50 expense accounts are included. Up to 10 bank
    accounts are included; these are the asset accounts whose number starts with "10".

    Args:
        featureInstanceId: Trustee feature instance whose chart of accounts is read.

    Returns:
        ``(expenseList, bankList)``. Returns ``("", "")`` when loading fails
        (logged at debug level) or no expense accounts exist. ``bankList`` falls
        back to ``"1020 Bank"`` when no matching asset account exists.
    """
    try:
        from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface
        from modules.features.trustee.accounting.accountingBridge import AccountingBridge

        trusteeInterface = getTrusteeInterface(
            self.services.user,
            mandateId=self.services.mandateId,
            featureInstanceId=featureInstanceId,
        )
        bridge = AccountingBridge(trusteeInterface)
        # NOTE(review): relies on the bridge's private _getExpenseAccounts helper.
        expenseAccounts = await bridge._getExpenseAccounts(featureInstanceId)
        assetAccounts = await bridge.getChartOfAccounts(featureInstanceId, accountType="asset")
    except Exception as e:
        logger.debug("Could not load chart of accounts for prompt: %s", e)
        return ("", "")

    if not expenseAccounts:
        return ("", "")

    def _accountNumber(account: Any) -> str:
        # str(): account numbers may not be strings; None must not break .startswith().
        return str(getattr(account, "accountNumber", None) or "")

    def _describe(account: Any) -> str:
        # "<number> <label>", skipping missing parts instead of rendering "None".
        parts = (getattr(account, "accountNumber", None), getattr(account, "label", None))
        return " ".join(str(p) for p in parts if p not in (None, ""))

    expenseList = ", ".join(_describe(a) for a in list(expenseAccounts)[:50])
    # assetAccounts may be None/empty; previously this raised outside the try block.
    bankAccounts = [a for a in (assetAccounts or []) if _accountNumber(a).startswith("10")]
    bankList = ", ".join(_describe(a) for a in bankAccounts[:10]) if bankAccounts else "1020 Bank"
    return (expenseList, bankList)
|
|
|
|
|
|
def _parseStructuredRecords(raw: str) -> List[Dict[str, Any]]:
    """Parse a phase-2 AI response into a list of record dicts.

    Code fences are stripped and the first balanced JSON value is isolated. The
    following shapes are accepted:
      - ``{"records": [...]}`` or ``{"extractedData": [...]}``
      - a single record object under one of those keys
      - a bare JSON list of records (if extractFirstBalancedJson yields one)
    Text that is not valid JSON is parsed as CSV instead. Entries that are not
    dicts are dropped.

    Returns:
        List of record dicts; ``[]`` when nothing usable is found.
    """
    from modules.shared.jsonUtils import stripCodeFences, extractFirstBalancedJson

    cleaned = extractFirstBalancedJson(stripCodeFences((raw or "").strip()))
    if not cleaned:
        return []

    try:
        data = json.loads(cleaned)
    except (ValueError, TypeError):
        # Not JSON: the model may have answered in CSV.
        return _parseCsvToRecords(cleaned) if isinstance(cleaned, str) else []

    # Previously a top-level list raised AttributeError on .get(), which was
    # swallowed and led to the JSON text being mis-parsed as CSV.
    if isinstance(data, dict):
        records = data.get("records") or data.get("extractedData") or []
    else:
        records = data
    if isinstance(records, dict):
        records = [records]
    if not isinstance(records, list):
        return []
    return [r for r in records if isinstance(r, dict)]
|
|
|
|
|
|
def _parseCsvToRecords(csvContent: str) -> List[Dict[str, Any]]:
|
|
"""Parse CSV content to list of expense records."""
|
|
records = []
|
|
try:
|
|
content = (csvContent or "").strip()
|
|
if content.startswith("```"):
|
|
lines = content.split("\n")
|
|
if lines and lines[0].startswith("```"):
|
|
lines = lines[1:]
|
|
if lines and lines[-1].strip() == "```":
|
|
lines = lines[:-1]
|
|
content = "\n".join(lines)
|
|
reader = csv.DictReader(io.StringIO(content))
|
|
for row in reader:
|
|
cleaned = {(k.strip() if k else k): (v.strip() if isinstance(v, str) else v) for k, v in row.items() if k}
|
|
records.append(cleaned)
|
|
except Exception as e:
|
|
logger.warning(f"Parse CSV: {e}")
|
|
return records
|
|
|
|
|
|
def _estimateBankTransactionLineCount(rawText: str) -> int:
|
|
"""Estimate how many transaction rows exist in bank statement OCR text."""
|
|
import re
|
|
|
|
lines = (rawText or "").splitlines()
|
|
datePattern = re.compile(r"\b(\d{2}[./-]\d{2}[./-]\d{2,4}|\d{4}-\d{2}-\d{2})\b")
|
|
amountPattern = re.compile(r"[-+]?\d{1,3}(?:[ '\u00A0]\d{3})*(?:[.,]\d{2})\b")
|
|
|
|
candidateCount = 0
|
|
for line in lines:
|
|
stripped = (line or "").strip()
|
|
if len(stripped) < 8:
|
|
continue
|
|
if datePattern.search(stripped) and amountPattern.search(stripped):
|
|
candidateCount += 1
|
|
|
|
return candidateCount
|
|
|
|
|
|
def _buildBankDocumentRetryPrompt(expenseList: str, bankList: str, expectedRows: int) -> str:
|
|
"""Build a stricter retry prompt to force full bank-row extraction."""
|
|
return (
|
|
"Du hast vorher zu wenige Buchungen extrahiert. "
|
|
"Extrahiere JETZT ALLE Transaktionszeilen aus dem Bankauszug vollstaendig. "
|
|
f"Erwartete Groessenordnung: mindestens {max(2, expectedRows)} Zeilen. "
|
|
"WICHTIG: Eine Transaktionszeile = genau ein Record. "
|
|
"Niemals Zeilen zusammenfassen, niemals nur die erste oder eine Beispielzeile liefern. "
|
|
"Wenn Details fehlen, trotzdem Record erzeugen und fehlende Felder als null setzen. "
|
|
"Return JSON: {\"records\": [{...}]}. "
|
|
"Jeder Record hat diese Felder:\n"
|
|
"- documentType: immer \"bank_document\"\n"
|
|
"- valuta (YYYY-MM-DD), transactionDateTime (unix seconds, optional)\n"
|
|
"- company (Gegenpartei)\n"
|
|
"- desc (vollstaendige Details der Zeile inkl. Referenz/Mitteilung)\n"
|
|
"- bookingAmount, bookingCurrency\n"
|
|
f"- debitAccountNumber (NUR Kontonummer aus: {expenseList})\n"
|
|
f"- creditAccountNumber (NUR Kontonummer aus: {bankList})\n"
|
|
"- bookingReference, payeeIban, payeeName, paymentReference\n"
|
|
"Kein MwSt bei Bankauszuegen."
|
|
)
|
|
|
|
|
|
def _writeDebugSafe(self, content: str, label: str) -> None:
    """Best-effort debug dump via ``self.services.utils.writeDebugFile``; never raises."""
    try:
        self.services.utils.writeDebugFile(content, label)
    except Exception as e:
        logger.debug("writeDebugFile(%s) failed: %s", label, e)


def _responseText(response: Any) -> str:
    """Return the stripped ``content`` of an AI response, or "" when it has none."""
    return (getattr(response, "content", None) or "").strip()


async def _retryBankExtractionIfIncomplete(
    self,
    records: List[Dict[str, Any]],
    rawText: str,
    expenseList: str,
    bankList: str,
    fileName: str,
) -> List[Dict[str, Any]]:
    """Re-run bank-statement structuring with a stricter prompt when too few rows came back.

    The retry runs when the OCR text appears to hold at least 3 transaction rows
    and fewer than ``max(2, estimatedRows // 2)`` records were extracted. The
    retry result replaces *records* only if it contains more rows. A failing
    retry call is logged, and the original records are kept.
    """
    estimatedRows = _estimateBankTransactionLineCount(rawText)
    initialCount = len(records)
    # "len(records) <= 1" is implied by this test because max(2, ...) >= 2.
    if estimatedRows < 3 or initialCount >= max(2, estimatedRows // 2):
        return records

    retryPrompt = _buildBankDocumentRetryPrompt(expenseList, bankList, estimatedRows)
    try:
        retryResponse = await self.services.ai.callAi(AiCallRequest(
            prompt=f"{retryPrompt}\n\nDOKUMENT-TEXT:\n{rawText}",
            context="",
            options=AiCallOptions(resultFormat="json"),
        ))
    except Exception as e:
        # The retry is only a failsafe; its failure must not discard the phase-2 records.
        logger.warning(
            "Bank statement retry failed (keeping %d records, file=%s): %s", initialCount, fileName, e
        )
        return records

    retryRecords = _parseStructuredRecords(_responseText(retryResponse))
    if len(retryRecords) > initialCount:
        logger.info(
            "Bank statement retry improved extraction: records=%d -> %d (estimatedRows=%d, file=%s)",
            initialCount,
            len(retryRecords),
            estimatedRows,
            fileName,
        )
        return retryRecords

    logger.warning(
        "Bank statement extraction may be incomplete: records=%d, estimatedRows=%d, file=%s",
        initialCount,
        estimatedRows,
        fileName,
    )
    return records


async def _extractWithAi(
    self,
    chatDocumentId: str,
    fileId: str,
    fileName: str,
    mimeType: str,
    expenseList: str,
    bankList: str,
    featureInstanceId: str,
) -> Dict[str, Any]:
    """Extract the document type and booking records from one attached file.

    Steps:
      1a. Text extraction via ``callAiContent`` on the referenced ChatDocument.
      1b. Text-only classification of the first 3000 characters.
      2.  A type-specific structuring prompt on the full text, returning JSON records.
      For BANK_DOCUMENT, a stricter retry is issued when the record count looks
      too low compared with the estimated number of transaction rows.

    Args:
        chatDocumentId: Id of the ChatDocument that references the file.
        fileId: Echoed into the result.
        fileName: Used in the document reference, in log messages and in the result.
        mimeType: Currently unused here.
        expenseList: Account list injected into the prompts.
        bankList: Account list injected into the prompts.
        featureInstanceId: Currently unused here.

    Returns:
        ``{"documentType", "extractedData", "fileId", "fileName"}``. When no text
        can be extracted, documentType is "UNKNOWN" and extractedData is empty.

    Raises:
        Whatever the text-extraction or phase-2 AI calls raise. Classification
        and the bank retry are best-effort.
    """
    emptyResult = {"documentType": "UNKNOWN", "extractedData": [], "fileId": fileId, "fileName": fileName}

    await self.services.ai.ensureAiObjectsInitialized()

    docList = DocumentReferenceList(
        references=[DocumentItemReference(documentId=chatDocumentId, fileName=fileName)]
    )

    # --- Step 1a: plain text extraction ---
    _writeDebugSafe(self, _OCR_PROMPT, "trustee_ocr_prompt")
    ocrResponse = await self.services.ai.callAiContent(
        prompt=_OCR_PROMPT,
        options=AiCallOptions(resultFormat="text", operationType=OperationTypeEnum.DATA_EXTRACT),
        documentList=docList,
        contentParts=None,
        outputFormat="txt",
        generationIntent="extract",
    )
    if not ocrResponse or not ocrResponse.documents:
        return emptyResult

    rawText = ocrResponse.documents[0].documentData
    if isinstance(rawText, bytes):
        # errors="replace": one invalid byte in OCR output should not abort the file.
        rawText = rawText.decode("utf-8", errors="replace")
    rawText = (rawText or "").strip()
    _writeDebugSafe(self, rawText[:5000] if rawText else "(empty)", "trustee_ocr_result")
    if not rawText:
        return emptyResult

    # --- Step 1b: classify the extracted text (text-only call) ---
    try:
        classifyResponse = await self.services.ai.callAi(AiCallRequest(
            prompt=f"{_CLASSIFY_PROMPT}\n\nTEXT:\n{rawText[:3000]}",
            context="",
            options=AiCallOptions(resultFormat="text"),
        ))
        documentType = _parseDocumentType(getattr(classifyResponse, "content", None) or "")
    except Exception as e:
        # Classification only selects the prompt; the generic fallback prompt can
        # still structure the document, so do not fail the whole file here.
        logger.warning("Classification failed for %s, using UNKNOWN: %s", fileName, e)
        documentType = "UNKNOWN"
    logger.info("Document classified: type=%s, rawText_length=%d, file=%s", documentType, len(rawText), fileName)

    # --- Step 2: type-specific structuring ---
    structuringPrompt = _buildStructuringPrompt(documentType, expenseList, bankList)
    _writeDebugSafe(self, structuringPrompt, "trustee_structuring_prompt")
    phase2Response = await self.services.ai.callAi(AiCallRequest(
        prompt=f"{structuringPrompt}\n\nDOKUMENT-TEXT:\n{rawText}",
        context="",
        options=AiCallOptions(resultFormat="json"),
    ))
    raw2 = _responseText(phase2Response)
    _writeDebugSafe(self, raw2 or "(empty)", "trustee_structuring_response")
    records = _parseStructuredRecords(raw2)
    logger.info("Phase 2 result: documentType=%s, records=%d, raw2_length=%d", documentType, len(records), len(raw2))

    # Failsafe for bank statements: retry with a stricter prompt if extraction looks incomplete.
    if documentType == "BANK_DOCUMENT":
        records = await _retryBankExtractionIfIncomplete(self, records, rawText, expenseList, bankList, fileName)

    # NOTE(review): unclassified documents that yield records are reported as
    # EXPENSE_RECEIPT, even though the fallback prompt asks for documentType per record.
    if records and (not documentType or documentType == "UNKNOWN"):
        documentType = "EXPENSE_RECEIPT"

    return {"documentType": documentType or "UNKNOWN", "extractedData": records, "fileId": fileId, "fileName": fileName}
|
|
|
|
|
|
def _errorActionDocument(f: Dict[str, Any], error: str) -> ActionDocument:
    """Build the JSON ActionDocument reported for a file whose extraction failed."""
    return ActionDocument(
        documentName=(f.get("fileName") or "error") + ".json",
        documentData=json.dumps({
            "documentType": "UNKNOWN",
            "extractedData": [],
            "fileId": f["fileId"],
            "fileName": f.get("fileName"),
            "error": error,
        }),
        mimeType="application/json",
    )


async def _extractOne(
    self,
    f: Dict[str, Any],
    fileIdToChatDocId: Dict[str, str],
    expenseList: str,
    bankList: str,
    featureInstanceId: str,
) -> ActionDocument:
    """Run extraction for one file and wrap the outcome in a JSON ActionDocument.

    Args:
        f: File entry with keys "fileId", "fileName" and "mimeType".
        fileIdToChatDocId: Maps fileId to the ChatDocument id used as the AI reference.
        expenseList: Account list forwarded to _extractWithAi.
        bankList: Account list forwarded to _extractWithAi.
        featureInstanceId: Forwarded to _extractWithAi.

    Returns:
        On success, an ActionDocument whose data is the _extractWithAi result.
        Otherwise an error ActionDocument with an "error" key. Extraction
        exceptions are caught and logged; ``f["fileId"]`` is assumed present.
    """
    chatDocId = fileIdToChatDocId.get(f["fileId"])
    if not chatDocId:
        return _errorActionDocument(f, "No ChatDocument id for file")
    try:
        out = await _extractWithAi(
            self, chatDocId, f["fileId"], f["fileName"], f["mimeType"], expenseList, bankList, featureInstanceId
        )
        return ActionDocument(
            # `or` (not a .get default) also covers a None/empty fileName.
            documentName=(f.get("fileName") or "extract") + ".json",
            documentData=json.dumps(out),
            mimeType="application/json",
        )
    except Exception as e:
        logger.exception("Extract failed for %s", f.get("fileName"))
        # Some exceptions stringify to ""; keep the error field informative.
        return _errorActionDocument(f, str(e) or type(e).__name__)
|
|
|
|
|
|
def _fileEntryFromRecord(rec: Any, fid: str) -> Dict[str, Any]:
    """Build a ``{fileId, fileName, mimeType}`` entry from a DB file record (object or dict)."""
    def _field(name: str) -> Any:
        # Object attribute first, then dict key; never calls .get() on a non-dict.
        value = getattr(rec, name, None)
        if not value and isinstance(rec, dict):
            value = rec.get(name)
        return value

    return {
        "fileId": _field("id") or fid,
        "fileName": _field("fileName") or _field("name") or "document",
        "mimeType": _field("mimeType") or "application/octet-stream",
    }


def _collectFilesFromIds(self, fileIds: Any, featureInstanceId: str) -> List[Dict[str, Any]]:
    """Resolve file ids (a list or a single id) to file entries via the DB interface.

    Falsy ids are skipped. Ids the DB does not return a record for are kept with
    placeholder name and mime type.
    """
    from modules.interfaces.interfaceDbManagement import getInterface as getDbInterface

    db = getDbInterface(self.services.user, mandateId=self.services.mandateId, featureInstanceId=featureInstanceId)
    files: List[Dict[str, Any]] = []
    for fid in (fileIds if isinstance(fileIds, list) else [fileIds]):
        if not fid:
            continue
        rec = db.getFile(fid) if hasattr(db, "getFile") else None
        if rec:
            files.append(_fileEntryFromRecord(rec, fid))
        else:
            files.append({"fileId": fid, "fileName": "document", "mimeType": "application/octet-stream"})
    return files


async def _collectSharepointFiles(
    self,
    connectionReference: Any,
    sharepointFolder: str,
    featureInstanceId: str,
) -> Tuple[Optional[str], List[Dict[str, Any]], List[Optional[Dict[str, Any]]]]:
    """Download PDF/JPG files from a SharePoint folder and store them in the DB.

    At most MAX_FILES eligible files are collected. Folders and other file types
    do not count towards the limit.

    Returns:
        ``(error, files, moveInfo)``. *error* is a failure message, or None on
        success. *moveInfo* holds one entry per collected file, with the data
        needed to move it after extraction.
    """
    sharepoint = self.services.sharepoint
    userConn = self.services.chat.getUserConnectionFromConnectionReference(connectionReference)
    if not userConn:
        return ("No Microsoft connection for connectionReference", [], [])
    if not sharepoint.setAccessTokenFromConnection(userConn):
        return ("Failed to set SharePoint access token", [], [])
    sites = await sharepoint.resolveSitesFromPathQuery(sharepointFolder)
    if not sites:
        return ("No SharePoint site found for path", [], [])
    siteId = sites[0].get("id")
    if not siteId:
        return ("SharePoint site has no id", [], [])
    parsed = sharepoint.extractSiteFromStandardPath(sharepointFolder)
    folderPath = (parsed.get("innerPath") or "").strip() if parsed else ""
    items = await sharepoint.listFolderContents(siteId, folderPath) or []

    from modules.interfaces.interfaceDbManagement import getInterface as getDbInterface

    db = getDbInterface(self.services.user, mandateId=self.services.mandateId, featureInstanceId=featureInstanceId)

    files: List[Dict[str, Any]] = []
    moveInfo: List[Optional[Dict[str, Any]]] = []
    for item in items:
        # Cap accepted files, not raw folder entries: slicing items first let
        # subfolders (e.g. processed/) and other files use up the budget.
        if len(files) >= MAX_FILES:
            break
        if item.get("type") != "file":
            continue
        name = (item.get("name") or "").lower()
        if not name.endswith(ALLOWED_EXTENSIONS):
            continue
        content = await sharepoint.downloadFile(siteId, item.get("id"))
        if not content:
            continue
        displayName = item.get("name", "file")
        mime = "application/pdf" if name.endswith(".pdf") else "image/jpeg"
        fileItem = db.createFile(name=displayName, mimeType=mime, content=content)
        if not fileItem:
            continue
        db.createFileData(fileItem.id, content)
        files.append({"fileId": fileItem.id, "fileName": displayName, "mimeType": mime})
        moveInfo.append({
            "siteId": siteId,
            "folderPath": folderPath,
            "fileName": displayName,
            "itemId": item.get("id"),
        })
    return (None, files, moveInfo)


def _attachChatDocuments(self, filesToProcess: List[Dict[str, Any]], featureInstanceId: str) -> Optional[Dict[str, str]]:
    """Register the files as ChatDocuments so the AI layer can resolve them via DocumentReferenceList.

    With a persisted workflow, the documents are stored via storeMessageWithDocuments.
    Inside the graph engine (no workflow id, or an id starting with "transient-"),
    in-memory ChatDocuments are appended to the placeholder workflow's messages instead.

    Returns:
        Mapping of fileId to ChatDocument id, or None when storing the message failed.
    """
    chatDocs = [
        ChatDocument(
            id=str(uuid.uuid4()),
            mandateId=self.services.mandateId or "",
            featureInstanceId=featureInstanceId or "",
            messageId="",
            fileId=f["fileId"],
            fileName=f["fileName"],
            fileSize=0,
            mimeType=f["mimeType"],
        )
        for f in filesToProcess
    ]
    messageText = f"Extract from {len(filesToProcess)} file(s)"

    workflow = self.services.workflow
    wfId = getattr(workflow, "id", None) or ""
    hasRealWorkflow = workflow is not None and bool(wfId) and not str(wfId).startswith("transient-")

    if hasRealWorkflow:
        messageData = {
            "id": f"msg_extract_{uuid.uuid4().hex[:12]}",
            "documentsLabel": "extract_files",
            "role": "user",
            "status": "step",
            "message": messageText,
        }
        createdMessage = self.services.chat.storeMessageWithDocuments(
            workflow, messageData, [d.model_dump() for d in chatDocs],
        )
        if not createdMessage or not createdMessage.documents:
            return None
        # NOTE(review): assumes stored documents keep the input order.
        return {
            f["fileId"]: createdMessage.documents[i].id
            for i, f in enumerate(filesToProcess)
            if i < len(createdMessage.documents)
        }

    # Graph-engine path: inject the documents into the placeholder workflow so
    # getChatDocumentsFromDocumentList can find them via workflow.messages.
    placeholderMsg = ChatMessage(
        id=f"msg_extract_{uuid.uuid4().hex[:12]}",
        workflowId=getattr(workflow, "id", None) or "transient",
        documentsLabel="extract_files",
        role="user",
        status="step",
        message=messageText,
        documents=chatDocs,
    )
    if workflow is not None and hasattr(workflow, "messages"):
        workflow.messages.append(placeholderMsg)
    return {f["fileId"]: doc.id for f, doc in zip(filesToProcess, chatDocs)}


async def _moveProcessedSharepointFiles(
    self,
    sharepointMoveInfo: List[Optional[Dict[str, Any]]],
    resultDocuments: List[ActionDocument],
) -> None:
    """Move successfully extracted SharePoint files into a ``processed`` subfolder (in parallel).

    A file is copied under a UTC-timestamped name and the original is then
    deleted. This happens only when its result has no "error" key and has
    non-empty extractedData; otherwise the file is left in place (there is no
    error folder). Entries whose move info is None are skipped. Failures are
    only logged.
    """
    if not sharepointMoveInfo or len(sharepointMoveInfo) != len(resultDocuments):
        return
    sharepoint = self.services.sharepoint

    async def _moveOneFile(moveInfo: Dict[str, Any], resultDoc: ActionDocument) -> None:
        try:
            raw = resultDoc.documentData
            data = json.loads(raw) if isinstance(raw, str) else raw
            if "error" in data or not data.get("extractedData"):
                logger.info("Extraction failed for %s — leaving file in place", moveInfo.get("fileName", "?"))
                return
            folderPath = (moveInfo.get("folderPath") or "").strip().rstrip("/")
            destFolder = f"{folderPath}/processed".strip("/") if folderPath else "processed"
            fileName = moveInfo.get("fileName") or "file"
            destFile = f"{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}_{fileName}"
            await sharepoint.copyFileAsync(moveInfo["siteId"], folderPath or "", fileName, destFolder, destFile)
            # Delete only after a successful copy so a failed copy never loses the file.
            await sharepoint.deleteFile(moveInfo["siteId"], moveInfo["itemId"])
        except Exception as e:
            logger.warning("Move SharePoint file failed for %s: %s", moveInfo.get("fileName", "?"), e)

    moveTasks = [
        _moveOneFile(moveInfo, resultDoc)
        for moveInfo, resultDoc in zip(sharepointMoveInfo, resultDocuments)
        if moveInfo is not None
    ]
    if moveTasks:
        await asyncio.gather(*moveTasks, return_exceptions=True)


async def extractFromFiles(self, parameters: Dict[str, Any]) -> ActionResult:
    """Extract document type and booking data from files.

    Sources (the first one provided wins):
      - ``fileIds``: a list (or a single id) of files already stored in the DB.
      - ``connectionReference`` + ``sharepointFolder``: PDF/JPG files in the folder
        are downloaded and stored in the DB. After a successful extraction they
        are moved into a ``processed`` subfolder.

    ``featureInstanceId`` is taken from *parameters* or ``self.services`` and is required.

    Returns:
        An ActionResult with one ActionDocument per file. Each document's data is
        JSON ``{documentType, extractedData, fileId, fileName}``, plus ``error``
        on failure. A failure result is returned for missing parameters,
        SharePoint access problems, or when the documents cannot be attached to
        the workflow.
    """
    fileIds = parameters.get("fileIds") or []
    connectionReference = parameters.get("connectionReference")
    sharepointFolder = parameters.get("sharepointFolder")
    featureInstanceId = parameters.get("featureInstanceId") or getattr(self.services, "featureInstanceId", None)

    if not featureInstanceId:
        return ActionResult.isFailure(error="featureInstanceId is required")

    # One move-info entry per file; None for files that do not come from SharePoint.
    if fileIds:
        filesToProcess = _collectFilesFromIds(self, fileIds, featureInstanceId)
        sharepointMoveInfo: List[Optional[Dict[str, Any]]] = [None] * len(filesToProcess)
    elif connectionReference and sharepointFolder:
        error, filesToProcess, sharepointMoveInfo = await _collectSharepointFiles(
            self, connectionReference, sharepointFolder, featureInstanceId
        )
        if error:
            return ActionResult.isFailure(error=error)
    else:
        return ActionResult.isFailure(error="Provide fileIds or connectionReference + sharepointFolder")

    if not filesToProcess:
        return ActionResult.isSuccess(documents=[])

    fileIdToChatDocId = _attachChatDocuments(self, filesToProcess, featureInstanceId)
    if fileIdToChatDocId is None:
        return ActionResult.isFailure(error="Failed to attach documents to workflow")

    expenseList, bankList = await _getAccountLists(self, featureInstanceId)

    # Parallel extraction: every file runs concurrently; _extractOne never raises for extraction errors.
    resultDocuments = list(await asyncio.gather(*(
        _extractOne(self, f, fileIdToChatDocId, expenseList, bankList, featureInstanceId)
        for f in filesToProcess
    )))

    await _moveProcessedSharepointFiles(self, sharepointMoveInfo, resultDocuments)

    return ActionResult.isSuccess(documents=resultDocuments)
|