gateway/modules/workflows/methods/methodTrustee/actions/processDocuments.py
patrick-motsch f8137e857a Add document type, payment data and detailed descriptions to trustee positions
Extend TrusteePosition with documentType, payeeIban, payeeName, payeeBic, paymentReference and dueDate fields. Overhaul AI extraction prompts to capture full document details in desc (line items, addresses, conditions), extract QR/ESR payment references and IBAN from invoices, and tag each record with its documentType. Sanitise and normalise all new fields on write.

Made-with: Cursor
2026-03-01 10:14:46 +01:00

171 lines
7.2 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Process extracted documents: create TrusteeDocument + TrusteePosition from extraction JSON.
Input: documentList (reference to extractFromFiles result).
Each document is JSON with documentType, extractedData, fileId, fileName.
extractedData is a list of expense/position records.
Output: one ActionDocument with JSON { positionIds, documentIds } for chaining to syncToAccounting.
"""
import json
import logging
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
from modules.datamodels.datamodelDocref import DocumentReferenceList
logger = logging.getLogger(__name__)
def _parseFloat(value) -> float:
try:
if value is None or value == "":
return 0.0
return float(value)
except (ValueError, TypeError):
return 0.0
def _extractAccountNumber(value) -> Optional[str]:
"""Extract the leading numeric account number from AI output like '6200 Fahrzeugaufwand' -> '6200'."""
if not value or not isinstance(value, str):
return None
import re
match = re.match(r"(\d+)", value.strip())
return match.group(1) if match else value.strip() or None
def _normaliseTags(value) -> str:
"""Convert tags from various formats to a clean comma-separated string."""
if not value:
return ""
if isinstance(value, list):
return ", ".join(str(t) for t in value if t)
return str(value)
def _cleanStr(value, default=None) -> Optional[str]:
"""Strip and return a non-empty string, else *default*."""
if not value:
return default
s = str(value).strip()
return s if s else default
def _recordToPosition(record: Dict[str, Any], documentId: Optional[str], featureInstanceId: str, mandateId: str, documentType: Optional[str] = None) -> Dict[str, Any]:
    """Map one extraction record to a TrusteePosition payload dict.

    Args:
        record: Extraction record (dict of fields read below).
        documentId: Id stored under ``documentId`` in the payload.
        featureInstanceId: Copied into the payload unchanged.
        mandateId: Copied into the payload unchanged.
        documentType: Fallback document type used when the record itself has no
            non-empty ``documentType``.

    Returns:
        The payload dict. Amounts are parsed with ``_parseFloat``, account
        numbers with ``_extractAccountNumber``, tags with ``_normaliseTags``
        and payment fields with ``_cleanStr``.
    """
    recDocType = _cleanStr(record.get("documentType")) or documentType
    if recDocType:
        recDocType = recDocType.lower().strip()

    # dict.get(key, default) only uses the default when the key is ABSENT; a key
    # holding an explicit JSON null would come through as None. Use `or` so that
    # null and "" fall back as well.
    bookingCurrency = record.get("bookingCurrency") or "CHF"
    bookingAmount = _parseFloat(record.get("bookingAmount"))

    return {
        "documentId": documentId,
        "documentType": recDocType,
        "valuta": record.get("valuta"),
        "transactionDateTime": record.get("transactionDateTime"),
        "company": record.get("company") or "",
        "desc": record.get("desc") or "",
        "tags": _normaliseTags(record.get("tags")),
        "bookingCurrency": bookingCurrency,
        "bookingAmount": bookingAmount,
        # Original currency/amount default to the booking values when missing.
        "originalCurrency": record.get("originalCurrency") or bookingCurrency,
        "originalAmount": _parseFloat(record.get("originalAmount")) or bookingAmount,
        "vatPercentage": _parseFloat(record.get("vatPercentage")),
        "vatAmount": _parseFloat(record.get("vatAmount")),
        "debitAccountNumber": _extractAccountNumber(record.get("debitAccountNumber")),
        "creditAccountNumber": _extractAccountNumber(record.get("creditAccountNumber")),
        "taxCode": record.get("taxCode") or None,
        "costCenter": record.get("costCenter") or None,
        "bookingReference": record.get("bookingReference") or None,
        "payeeIban": _cleanStr(record.get("payeeIban")),
        "payeeName": _cleanStr(record.get("payeeName")),
        "payeeBic": _cleanStr(record.get("payeeBic")),
        "paymentReference": _cleanStr(record.get("paymentReference")),
        "dueDate": _cleanStr(record.get("dueDate")),
        "featureInstanceId": featureInstanceId,
        "mandateId": mandateId,
    }
def _loadExtractionPayload(rawBytes) -> Optional[Dict[str, Any]]:
    """Decode one extraction file into its top-level JSON object, or None if malformed."""
    try:
        content = rawBytes.decode("utf-8") if isinstance(rawBytes, bytes) else rawBytes
        data = json.loads(content) if isinstance(content, str) else content
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    return data if isinstance(data, dict) else None


async def processDocuments(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    Resolve documentList to ChatDocuments, load extraction JSON per document,
    create TrusteeDocument (with documentType) + TrusteePosition(s), return one JSON document with positionIds/documentIds.

    Parameters read from *parameters*:
        documentList: reference (or list of references) to the extractFromFiles result; required.
        featureInstanceId: optional, falls back to ``self.services.featureInstanceId``.

    Per-document problems (file not loadable, malformed extraction JSON, no usable
    records, TrusteeDocument creation failed) are logged and the document is skipped
    so that one bad file does not abort a batch after earlier documents were already
    persisted. If malformed input leaves nothing created, a failure result is returned.
    Any other exception is logged and returned as a failure result.
    """
    documentListParam = parameters.get("documentList")
    featureInstanceId = parameters.get("featureInstanceId") or getattr(self.services, "featureInstanceId", None)
    if not documentListParam:
        return ActionResult.isFailure(error="documentList is required (reference to extractFromFiles result)")
    if not featureInstanceId:
        return ActionResult.isFailure(error="featureInstanceId is required")
    try:
        docList = DocumentReferenceList.from_string_list(
            documentListParam if isinstance(documentListParam, list) else [documentListParam]
        )
        chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(docList)
        if not chatDocuments:
            return ActionResult.isFailure(error="No documents found for documentList")

        # Function-scope import kept from the original.
        # NOTE(review): presumably avoids an import cycle — confirm before hoisting.
        from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface
        trusteeInterface = getTrusteeInterface(
            self.services.user,
            mandateId=self.services.mandateId,
            featureInstanceId=featureInstanceId
        )

        allPositionIds = []
        allDocumentIds = []
        malformedCount = 0
        for chatDoc in chatDocuments:
            rawBytes = self.services.chat.getFileData(chatDoc.fileId)
            if not rawBytes:
                logger.warning(f"Could not load file {chatDoc.fileId}, skipping")
                continue

            data = _loadExtractionPayload(rawBytes)
            if data is None:
                malformedCount += 1
                logger.warning(f"File {chatDoc.fileId} is not a valid extraction JSON object, skipping")
                continue

            documentType = data.get("documentType")
            extractedData = data.get("extractedData")
            fileId = data.get("fileId") or chatDoc.fileId
            fileName = data.get("fileName") or chatDoc.fileName or "document"

            # extractedData may be a list of records or a single record.
            if isinstance(extractedData, list):
                candidates = extractedData
            elif extractedData:
                candidates = [extractedData]
            else:
                candidates = []
            # Only dict records can be mapped to positions.
            records = [rec for rec in candidates if isinstance(rec, dict)]
            if len(records) != len(candidates):
                logger.warning(f"Ignoring {len(candidates) - len(records)} non-object record(s) in {fileName}")
            if not records:
                if candidates:
                    malformedCount += 1
                continue

            docPayload = {
                "fileId": fileId,
                "documentName": fileName,
                "documentMimeType": chatDoc.mimeType or "application/octet-stream",
                "sourceType": "workflow",
                "documentType": documentType,
            }
            trusteeDoc = trusteeInterface.createDocument(docPayload)
            if not trusteeDoc:
                logger.warning(f"Failed to create TrusteeDocument for {fileName}")
                continue
            allDocumentIds.append(trusteeDoc.id)

            # str() guards against a non-string documentType in the JSON.
            docTypeLower = str(documentType or "unknown").lower()
            for record in records:
                posPayload = _recordToPosition(record, trusteeDoc.id, featureInstanceId, self.services.mandateId, documentType=docTypeLower)
                pos = trusteeInterface.createPosition(posPayload)
                if pos:
                    allPositionIds.append(pos.id)

        # Malformed input used to fail the action; keep failing when it left nothing usable.
        if malformedCount and not allDocumentIds:
            return ActionResult.isFailure(
                error=f"No valid extraction documents found ({malformedCount} malformed)"
            )

        payload = {"positionIds": allPositionIds, "documentIds": allDocumentIds}
        return ActionResult.isSuccess(
            documents=[
                ActionDocument(
                    documentName="process_documents_result.json",
                    documentData=json.dumps(payload),
                    mimeType="application/json",
                )
            ]
        )
    except Exception as e:
        logger.exception("processDocuments failed")
        return ActionResult.isFailure(error=str(e))