improve bank extraction retry + auto-matching for bank documents

Made-with: Cursor
This commit is contained in:
patrick-motsch 2026-03-01 21:54:56 +01:00
parent 565ad62c39
commit a6be2b90e0
2 changed files with 245 additions and 1 deletions

View file

@ -215,6 +215,48 @@ def _parseCsvToRecords(csvContent: str) -> List[Dict[str, Any]]:
return records
def _estimateBankTransactionLineCount(rawText: str) -> int:
"""Estimate how many transaction rows exist in bank statement OCR text."""
import re
lines = (rawText or "").splitlines()
datePattern = re.compile(r"\b(\d{2}[./-]\d{2}[./-]\d{2,4}|\d{4}-\d{2}-\d{2})\b")
amountPattern = re.compile(r"[-+]?\d{1,3}(?:[ '\u00A0]\d{3})*(?:[.,]\d{2})\b")
candidateCount = 0
for line in lines:
stripped = (line or "").strip()
if len(stripped) < 8:
continue
if datePattern.search(stripped) and amountPattern.search(stripped):
candidateCount += 1
return candidateCount
def _buildBankDocumentRetryPrompt(expenseList: str, bankList: str, expectedRows: int) -> str:
"""Build a stricter retry prompt to force full bank-row extraction."""
return (
"Du hast vorher zu wenige Buchungen extrahiert. "
"Extrahiere JETZT ALLE Transaktionszeilen aus dem Bankauszug vollstaendig. "
f"Erwartete Groessenordnung: mindestens {max(2, expectedRows)} Zeilen. "
"WICHTIG: Eine Transaktionszeile = genau ein Record. "
"Niemals Zeilen zusammenfassen, niemals nur die erste oder eine Beispielzeile liefern. "
"Wenn Details fehlen, trotzdem Record erzeugen und fehlende Felder als null setzen. "
"Return JSON: {\"records\": [{...}]}. "
"Jeder Record hat diese Felder:\n"
"- documentType: immer \"bank_document\"\n"
"- valuta (YYYY-MM-DD), transactionDateTime (unix seconds, optional)\n"
"- company (Gegenpartei)\n"
"- desc (vollstaendige Details der Zeile inkl. Referenz/Mitteilung)\n"
"- bookingAmount, bookingCurrency\n"
f"- debitAccountNumber (NUR Kontonummer aus: {expenseList})\n"
f"- creditAccountNumber (NUR Kontonummer aus: {bankList})\n"
"- bookingReference, payeeIban, payeeName, paymentReference\n"
"Kein MwSt bei Bankauszuegen."
)
async def _extractWithAi(
self,
chatDocumentId: str,
@ -293,6 +335,44 @@ async def _extractWithAi(
records = _parseStructuredRecords(raw2)
logger.info("Phase 2 result: documentType=%s, records=%d, raw2_length=%d", documentType, len(records), len(raw2))
# Failsafe for bank statements: retry with stricter prompt if extraction is likely incomplete.
if documentType == "BANK_DOCUMENT":
estimatedRows = _estimateBankTransactionLineCount(rawText)
likelyIncomplete = (
estimatedRows >= 3
and (
len(records) <= 1
or len(records) < max(2, estimatedRows // 2)
)
)
if likelyIncomplete:
retryPrompt = _buildBankDocumentRetryPrompt(expenseList, bankList, estimatedRows)
retryFullPrompt = f"{retryPrompt}\n\nDOKUMENT-TEXT:\n{rawText}"
retryRequest = AiCallRequest(
prompt=retryFullPrompt,
context="",
options=AiCallOptions(resultFormat="json"),
)
retryResponse = await self.services.ai.callAi(retryRequest)
retryRaw = (retryResponse.content or "").strip() if hasattr(retryResponse, "content") else ""
retryRecords = _parseStructuredRecords(retryRaw)
if len(retryRecords) > len(records):
records = retryRecords
logger.info(
"Bank statement retry improved extraction: records=%d -> %d (estimatedRows=%d, file=%s)",
len(_parseStructuredRecords(raw2)),
len(records),
estimatedRows,
fileName,
)
else:
logger.warning(
"Bank statement extraction may be incomplete: records=%d, estimatedRows=%d, file=%s",
len(records),
estimatedRows,
fileName,
)
if records and (not documentType or documentType == "UNKNOWN"):
documentType = "EXPENSE_RECEIPT"

View file

@ -10,6 +10,7 @@ Output: one ActionDocument with JSON { positionIds, documentIds } for chaining t
import json
import logging
from datetime import datetime
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
@ -53,6 +54,121 @@ def _cleanStr(value, default=None) -> Optional[str]:
return s if s else default
def _normaliseRef(value: Any) -> Optional[str]:
"""Normalise payment references for robust matching."""
raw = _cleanStr(value)
if not raw:
return None
import re
return re.sub(r"[^A-Z0-9]", "", raw.upper()) or None
def _parseIsoDate(value: Any) -> Optional[datetime]:
"""Parse YYYY-MM-DD date for proximity scoring."""
raw = _cleanStr(value)
if not raw:
return None
try:
return datetime.strptime(raw[:10], "%Y-%m-%d")
except ValueError:
return None
def _normaliseAmount(value: Any) -> float:
"""Use absolute rounded amount, since bank lines are often signed."""
return round(abs(_parseFloat(value)), 2)
def _normaliseCompany(value: Any) -> Optional[str]:
"""Normalise company names for approximate matching."""
raw = _cleanStr(value)
if not raw:
return None
import re
cleaned = re.sub(r"[^A-Z0-9]", "", raw.upper())
return cleaned or None
def _findBestBankMatch(
bankPosition: Dict[str, Any],
candidatePositions: List[Dict[str, Any]],
alreadyMatchedIds: set,
) -> Optional[Dict[str, Any]]:
"""Find best invoice/expense position for one bank position."""
bankRef = _normaliseRef(bankPosition.get("paymentReference") or bankPosition.get("bookingReference"))
bankAmount = _normaliseAmount(bankPosition.get("bookingAmount"))
bankIban = _normaliseRef(bankPosition.get("payeeIban"))
bankDate = _parseIsoDate(bankPosition.get("valuta"))
bankCompany = _normaliseCompany(bankPosition.get("company"))
bestScore = 0
bestCandidate = None
for candidate in candidatePositions:
candidateId = candidate.get("id")
if not candidateId or candidateId in alreadyMatchedIds:
continue
if candidate.get("bankDocumentId"):
continue
if (candidate.get("documentType") or "").lower().strip() == "bank_document":
continue
score = 0
candidateRef = _normaliseRef(candidate.get("paymentReference") or candidate.get("bookingReference"))
candidateAmount = _normaliseAmount(candidate.get("bookingAmount"))
candidateIban = _normaliseRef(candidate.get("payeeIban"))
candidateDate = _parseIsoDate(candidate.get("valuta"))
candidateCompany = _normaliseCompany(candidate.get("company"))
# Strongest signal: structured payment reference / invoice reference match.
if bankRef and candidateRef and bankRef == candidateRef:
score += 100
# Amount must usually match; use tolerance for minor rounding differences.
if abs(candidateAmount - bankAmount) <= 0.05:
score += 40
# IBAN is a strong supporting signal.
if bankIban and candidateIban and bankIban == candidateIban:
score += 25
# Small date difference increases confidence.
if bankDate and candidateDate:
dayDiff = abs((bankDate - candidateDate).days)
if dayDiff <= 3:
score += 20
elif dayDiff <= 14:
score += 10
# Company/party comparison helps when no structured reference is present.
if bankCompany and candidateCompany and (
bankCompany in candidateCompany or candidateCompany in bankCompany
):
score += 15
# If no reference exists, require stronger secondary evidence.
minScore = 45 if bankRef else 65
if score >= minScore and score > bestScore:
bestScore = score
bestCandidate = candidate
return bestCandidate
def _safeRecordFromModel(modelOrDict: Any) -> Dict[str, Any]:
"""Convert Pydantic/dict object to plain dict for matching."""
if not modelOrDict:
return {}
if isinstance(modelOrDict, dict):
return modelOrDict
if hasattr(modelOrDict, "model_dump"):
return modelOrDict.model_dump()
try:
return dict(modelOrDict)
except Exception:
return {}
def _recordToPosition(record: Dict[str, Any], documentId: Optional[str], featureInstanceId: str, mandateId: str, documentType: Optional[str] = None) -> Dict[str, Any]:
"""Map extraction record to TrusteePosition payload."""
recDocType = _cleanStr(record.get("documentType")) or documentType
@ -119,6 +235,7 @@ async def processDocuments(self, parameters: Dict[str, Any]) -> ActionResult:
allPositionIds = []
allDocumentIds = []
autoMatchedPositionIds = []
for chatDoc in chatDocuments:
rawBytes = self.services.chat.getFileData(chatDoc.fileId)
@ -150,13 +267,60 @@ async def processDocuments(self, parameters: Dict[str, Any]) -> ActionResult:
allDocumentIds.append(trusteeDoc.id)
docTypeLower = (documentType or "unknown").lower()
createdPositions = []
for record in records:
posPayload = _recordToPosition(record, trusteeDoc.id, featureInstanceId, self.services.mandateId, documentType=docTypeLower)
pos = trusteeInterface.createPosition(posPayload)
if pos:
allPositionIds.append(pos.id)
createdPositions.append(pos)
payload = {"positionIds": allPositionIds, "documentIds": allDocumentIds}
# Auto-link bank statement lines to existing invoice/expense positions.
if docTypeLower == "bank_document" and createdPositions:
try:
from modules.features.trustee.datamodelFeatureTrustee import TrusteePosition
candidatesRaw = trusteeInterface.db.getRecordset(
TrusteePosition,
recordFilter={"featureInstanceId": featureInstanceId},
)
candidatePositions = [_safeRecordFromModel(c) for c in (candidatesRaw or [])]
matchedInThisBankDoc = set()
for createdPos in createdPositions:
bankPosition = _safeRecordFromModel(createdPos)
if not bankPosition:
continue
matchCandidate = _findBestBankMatch(
bankPosition=bankPosition,
candidatePositions=candidatePositions,
alreadyMatchedIds=matchedInThisBankDoc,
)
if not matchCandidate:
continue
matchedId = matchCandidate.get("id")
if not matchedId:
continue
updated = trusteeInterface.updatePosition(
matchedId,
{
"bankDocumentId": trusteeDoc.id,
},
)
if updated:
matchedInThisBankDoc.add(matchedId)
autoMatchedPositionIds.append(matchedId)
except Exception:
logger.exception("Automatic bank-document matching failed for documentId=%s", trusteeDoc.id)
payload = {
"positionIds": allPositionIds,
"documentIds": allDocumentIds,
"autoMatchedPositionIds": autoMatchedPositionIds,
}
return ActionResult.isSuccess(
documents=[
ActionDocument(