diff --git a/modules/features/trustee/accounting/accountingBridge.py b/modules/features/trustee/accounting/accountingBridge.py
index 664166c0..1e6a9f78 100644
--- a/modules/features/trustee/accounting/accountingBridge.py
+++ b/modules/features/trustee/accounting/accountingBridge.py
@@ -133,17 +133,98 @@ class AccountingBridge:
return SyncResult(success=False, errorMessage=f"Position {positionId} not found")
position = posRecords[0]
- # Duplicate check
+ # Build booking once (for push; externalDocumentIds filled after document upload)
+ booking = self._buildBookingFromPosition(position)
+
+ # 1) First: ensure all documents are in RMA (upload or duplicate); collect Beleg-IDs for linking
+ documentIds = []
+ for key in ("documentId", "bankDocumentId"):
+ docId = position.get(key)
+ if docId:
+ documentIds.append(docId)
+ if documentIds:
+ from modules.features.trustee.datamodelFeatureTrustee import TrusteeDocument as TrusteeDocumentModel
+ logger.info("Accounting sync: positionId=%s, syncing %s document(s) to RMA ...", positionId, len(documentIds))
+ belegIds = []
+ belegLabels = []
+ for documentId in documentIds:
+ doc = self._trusteeInterface.getDocument(documentId)
+ if not doc:
+ continue
+ fileName = getattr(doc, "documentName", None) or "beleg.pdf"
+ existingBelegId = getattr(doc, "externalBelegId", None)
+ if existingBelegId:
+ logger.info("Accounting sync: document %s already has belegId=%s, skipping upload", documentId, existingBelegId)
+ belegIds.append(existingBelegId)
+ belegLabels.append(fileName)
+ continue
+ docData = self._trusteeInterface.getDocumentData(documentId)
+ if docData is None:
+ continue
+ mimeType = getattr(doc, "documentMimeType", None) or "application/pdf"
+ uploadResult = await connector.uploadDocument(
+ plainConfig,
+ fileName=fileName,
+ fileContent=docData,
+ mimeType=mimeType,
+ comment=booking.reference,
+ )
+ if not uploadResult.success:
+ errMsg = f"Dokument konnte nicht nach RMA hochgeladen werden: {uploadResult.errorMessage}"
+ logger.error(
+ "Accounting sync failed (document upload): positionId=%s, documentId=%s, error=%s",
+ positionId, documentId, uploadResult.errorMessage,
+ )
+ return SyncResult(success=False, errorMessage=errMsg)
+ belegId = uploadResult.externalId
+ if belegId:
+ self._trusteeInterface.db.recordModify(TrusteeDocumentModel, documentId, {"externalBelegId": belegId})
+ logger.info("Accounting sync: document uploaded & belegId=%s stored on document %s", belegId, documentId)
+ else:
+ logger.info("Accounting sync: document uploaded but no belegId in response (409 duplicate?), fileName=%s", fileName)
+ belegIds.append(belegId)
+ belegLabels.append(fileName)
+ if belegIds or belegLabels:
+ booking.externalDocumentIds = belegIds
+ booking.externalDocumentLabels = belegLabels
+ logger.info("Accounting sync: positionId=%s, document sync done, pushing GL booking (POST /gl) ...", positionId)
+
+ # Duplicate check: if locally marked as synced, verify with Buha system
+ accountingSyncId = position.get("accountingSyncId")
existingSyncs = self._trusteeInterface.db.getRecordset(
TrusteeAccountingSync,
recordFilter={"positionId": positionId, "connectorType": connectorType, "syncStatus": "synced"},
)
- if existingSyncs:
- return SyncResult(success=False, errorMessage="Position already synced to this system")
+ if accountingSyncId or existingSyncs:
+ checkResult = await connector.isBookingSynced(plainConfig, booking)
+ if checkResult.success:
+ logger.info(
+ "Accounting sync skipped (verified in Buha): positionId=%s, reference=%s",
+ positionId, booking.reference,
+ )
+ return SyncResult(success=False, errorMessage="Position already synced to this system")
+ # Not found in Buha (e.g. deleted there): clear local records and re-push
+ logger.info(
+ "Accounting sync: reference %s not found in Buha (deleted?), clearing local records and re-pushing positionId=%s",
+ booking.reference, positionId,
+ )
+ if accountingSyncId:
+ self._trusteeInterface.db.recordModify(TrusteePosition, positionId, {"accountingSyncId": None})
+ for rec in existingSyncs:
+ rid = rec.get("id")
+ if rid:
+ self._trusteeInterface.db.recordDelete(TrusteeAccountingSync, rid)
- # Build and push
- booking = self._buildBookingFromPosition(position)
+ # 2) Then: push booking (with reference to document IDs so RMA can link)
+ if not documentIds:
+ logger.info("Accounting sync: positionId=%s, no documents, pushing GL booking (POST /gl) ...", positionId)
result = await connector.pushBooking(plainConfig, booking)
+ if not result.success:
+ logger.error(
+ "Accounting sync failed: positionId=%s, error=%s",
+ positionId,
+ result.errorMessage or "unknown",
+ )
# Save sync record
import uuid
@@ -163,13 +244,24 @@ class AccountingBridge:
}
self._trusteeInterface.db.recordCreate(TrusteeAccountingSync, syncRecord)
+ # Write back external ID to position (source of truth for sync check)
+ if result.success and result.externalId:
+ self._trusteeInterface.db.recordModify(
+ TrusteePosition, positionId, {"accountingSyncId": result.externalId}
+ )
+
# Update last sync on config record
if configRecord:
from modules.features.trustee.datamodelFeatureTrustee import TrusteeAccountingConfig
- self._trusteeInterface.db.recordModify(TrusteeAccountingConfig, configRecord["id"], {
+ updatePayload = {
"lastSyncAt": time.time(),
"lastSyncStatus": "success" if result.success else "error",
- })
+ }
+ if result.success:
+ updatePayload["lastSyncErrorMessage"] = None
+ else:
+ updatePayload["lastSyncErrorMessage"] = result.errorMessage or "Sync failed"
+ self._trusteeInterface.db.recordModify(TrusteeAccountingConfig, configRecord["id"], updatePayload)
return result
diff --git a/modules/features/trustee/accounting/accountingConnectorBase.py b/modules/features/trustee/accounting/accountingConnectorBase.py
index 1a427e21..775a07b6 100644
--- a/modules/features/trustee/accounting/accountingConnectorBase.py
+++ b/modules/features/trustee/accounting/accountingConnectorBase.py
@@ -28,6 +28,8 @@ class AccountingBooking(BaseModel):
bookingDate: str
description: str = ""
lines: List[AccountingBookingLine] = []
+ externalDocumentIds: Optional[List[str]] = None # e.g. RMA Beleg-IDs, sent before booking for linking
+ externalDocumentLabels: Optional[List[str]] = None # display names for links (e.g. file names), one per id
class AccountingChart(BaseModel):
@@ -91,6 +93,19 @@ class BaseAccountingConnector(ABC):
async def getBookingStatus(self, config: Dict[str, Any], externalId: str) -> SyncResult:
"""Query the status of a previously pushed booking."""
+ async def getBookingByExternalId(self, config: Dict[str, Any], externalId: str) -> SyncResult:
+ """Fetch the booking in the external system by its external ID (UUID).
+ success=True: record exists. success=False: not found or error (e.g. deleted in Buha).
+ Override in connectors that support exact lookup; default = not supported."""
+ return SyncResult(success=False, errorMessage="Lookup by external ID not supported by this connector")
+
+ async def isBookingSynced(self, config: Dict[str, Any], booking: AccountingBooking) -> SyncResult:
+ """Check with the external system if this booking already exists.
+ success=True: booking exists in external system (do not push again).
+ success=False: not found or error (allow push).
+ Default: success=True (trust local sync record; override in connectors that can verify via API, e.g. RMA)."""
+ return SyncResult(success=True)
+
async def pushInvoice(self, config: Dict[str, Any], invoice: Dict[str, Any]) -> SyncResult:
"""Push an invoice. Override in connectors that support it."""
return SyncResult(success=False, errorMessage="Not supported by this connector")
@@ -103,6 +118,13 @@ class BaseAccountingConnector(ABC):
"""Load the vendor list. Override in connectors that support it."""
return []
- async def uploadDocument(self, config: Dict[str, Any], fileName: str, fileContent: bytes, mimeType: str = "application/pdf") -> SyncResult:
- """Upload a document/receipt. Override in connectors that support it."""
+ async def uploadDocument(
+ self,
+ config: Dict[str, Any],
+ fileName: str,
+ fileContent: bytes,
+ mimeType: str = "application/pdf",
+ comment: Optional[str] = None,
+ ) -> SyncResult:
+ """Upload a document/receipt (e.g. beleg). comment can link to booking reference. Override in connectors that support it."""
return SyncResult(success=False, errorMessage="Document upload not supported by this connector")
diff --git a/modules/features/trustee/accounting/connectors/accountingConnectorRma.py b/modules/features/trustee/accounting/connectors/accountingConnectorRma.py
index 30aeff39..e55cfe40 100644
--- a/modules/features/trustee/accounting/connectors/accountingConnectorRma.py
+++ b/modules/features/trustee/accounting/connectors/accountingConnectorRma.py
@@ -7,8 +7,12 @@ Auth: Static API key via X-RMA-KEY header.
Base URL: https://service.runmyaccounts.com/api/latest/clients/{clientName}/
"""
+import asyncio
+import json
import logging
-from typing import List, Dict, Any
+import re
+from datetime import datetime
+from typing import List, Dict, Any, Optional
import aiohttp
@@ -59,7 +63,7 @@ class AccountingConnectorRma(BaseAccountingConnector):
apiKey = config.get("apiKey", "")
return {
"Authorization": f"Bearer {apiKey}",
- "Accept": "application/json",
+ "Accept": "application/json, application/xml, */*", # RMA may return XML on error (e.g. 406)
"Content-Type": "application/json",
}
@@ -106,34 +110,97 @@ class AccountingConnectorRma(BaseAccountingConnector):
return []
async def pushBooking(self, config: Dict[str, Any], booking: AccountingBooking) -> SyncResult:
- """Push a GL batch booking to RMA."""
+ """Push a GL batch booking to RMA. API expects request body = batch object (no wrapper); only non-zero amount per line."""
try:
- entries = []
- for line in booking.lines:
- entry = {
- "accno": line.accountNumber,
- "transdate": booking.bookingDate,
- "reference": booking.reference,
- "description": line.description or booking.description,
- }
- if line.debitAmount > 0:
- entry["debit"] = line.debitAmount
- if line.creditAmount > 0:
- entry["credit"] = line.creditAmount
- if line.taxCode:
- entry["tax_code"] = line.taxCode
- entries.append(entry)
+ if not booking.lines:
+ return SyncResult(success=False, errorMessage="Missing transactions in batch (no booking lines)")
- payload = {"gl_batch": {"entry": entries}}
+ glTransactions = []
+ for line in booking.lines:
+ t = {"accno": line.accountNumber}
+ if line.debitAmount and line.debitAmount > 0:
+ t["debit_amount"] = line.debitAmount
+ if line.creditAmount and line.creditAmount > 0:
+ t["credit_amount"] = line.creditAmount
+ if line.description:
+ t["memo"] = (line.description or "")[:500]
+ glTransactions.append(t)
+
+ transdate = booking.bookingDate or ""
+ if transdate and "T" not in str(transdate):
+ transdate = f"{transdate}T00:00:00.000+00:00"
+ batchNumber = (booking.reference or "").strip()[:32]
+ if not batchNumber:
+ batchNumber = "GL-" + (booking.reference or str(id(booking)))[:24]
+ rawDesc = (booking.description or "").strip()
+ externalDocIds = getattr(booking, "externalDocumentIds", None) or []
+ externalDocLabels = getattr(booking, "externalDocumentLabels", None) or []
+ if externalDocIds or externalDocLabels:
+ clientSegment = (config.get("clientId") or config.get("clientName") or "").strip()
+ docParts = []
+ maxI = max(len(externalDocIds), len(externalDocLabels))
+ for i in range(min(maxI, 10)):
+ label = (externalDocLabels[i] if i < len(externalDocLabels) else "Rechnung") or "Rechnung"
+ belegId = externalDocIds[i] if i < len(externalDocIds) else None
+ if belegId:
+ docUrl = f"https://my.runmyaccounts.com/phoenix-core/api/clients/{clientSegment}/workflow/l0doc/file/{belegId}"
+                        docParts.append(f'<a href="{docUrl}">{label}</a>')
+ else:
+ docParts.append(label)
+ erfDate = datetime.utcnow().strftime("%d.%m.%Y")
+ linkSuffix = " (" + ", ".join(docParts) + ", erf. " + erfDate + ")"
+ shortDesc = (rawDesc[:80] + "…") if len(rawDesc) > 80 else rawDesc
+ description = (shortDesc + linkSuffix).strip()[:500]
+ else:
+ description = rawDesc[:500]
+ payload = {
+ "batch_number": batchNumber,
+ "transdate": transdate,
+ "description": description,
+ "currency": "CHF",
+ "exchangerate": 1.0,
+ "gl_transactions": {"gl_transaction": glTransactions},
+ }
+ if rawDesc and len(rawDesc) > 80:
+ payload["notes"] = rawDesc[:2000]
async with aiohttp.ClientSession() as session:
url = self._buildUrl(config, "gl")
async with session.post(url, headers=self._buildHeaders(config), json=payload, timeout=aiohttp.ClientTimeout(total=30)) as resp:
body = await resp.text()
if resp.status in (200, 201, 204):
- return SyncResult(success=True, rawResponse={"status": resp.status, "body": body[:500]})
- return SyncResult(success=False, errorMessage=f"HTTP {resp.status}: {body[:300]}")
+ externalId = None
+ try:
+ data = json.loads(body) if body.strip() else {}
+ if isinstance(data, dict):
+ externalId = (
+ data.get("id")
+ or data.get("batch_id")
+ or data.get("entry_id")
+ or (data.get("gl_batch") or {}).get("id")
+ )
+ if externalId is not None:
+ externalId = str(externalId).strip()
+ if not externalId:
+ externalId = batchNumber
+ except Exception:
+ externalId = batchNumber
+ return SyncResult(
+ success=True,
+ externalId=externalId or batchNumber,
+ rawResponse={"status": resp.status, "body": body[:500]},
+ )
+ errMsg = f"HTTP {resp.status}: {body[:300]}"
+ logger.error("RMA pushBooking failed: status=%s, body=%s", resp.status, body[:500])
+ return SyncResult(success=False, errorMessage=errMsg)
+ except asyncio.TimeoutError:
+ logger.warning("RMA pushBooking timeout (30s)")
+ return SyncResult(
+ success=False,
+ errorMessage="Verbindung zum Buchhaltungssystem hat zu lange gedauert (Timeout). Bitte später erneut versuchen.",
+ )
except Exception as e:
+ logger.exception("RMA pushBooking error")
return SyncResult(success=False, errorMessage=str(e))
async def getBookingStatus(self, config: Dict[str, Any], externalId: str) -> SyncResult:
@@ -147,6 +214,67 @@ class AccountingConnectorRma(BaseAccountingConnector):
except Exception as e:
return SyncResult(success=False, errorMessage=str(e))
+ async def getBookingByExternalId(self, config: Dict[str, Any], externalId: str) -> SyncResult:
+ """Check if the GL batch/transaction still exists in RMA by external ID (id or batch_number)."""
+ if not externalId or not str(externalId).strip():
+ return SyncResult(success=False, errorMessage="Not found")
+ try:
+ async with aiohttp.ClientSession() as session:
+ url = self._buildUrl(config, f"gl/{externalId.strip()}")
+ async with session.get(
+ url, headers=self._buildHeaders(config), timeout=aiohttp.ClientTimeout(total=15)
+ ) as resp:
+ if resp.status == 200:
+ return SyncResult(success=True, externalId=externalId)
+ if resp.status == 404:
+ return SyncResult(success=False, errorMessage="Not found")
+ body = await resp.text()
+ return SyncResult(success=False, errorMessage=f"HTTP {resp.status}: {body[:200]}")
+ except Exception as e:
+ logger.debug("RMA getBookingByExternalId failed: %s", e)
+ return SyncResult(success=False, errorMessage=str(e))
+
+ async def isBookingSynced(self, config: Dict[str, Any], booking: AccountingBooking) -> SyncResult:
+ """Check if booking exists in RMA by searching transactions for reference match.
+ Uses GET charts/{accno}/transactions with date filter. success=True = booking exists."""
+ if not booking.lines:
+ return SyncResult(success=True)
+ ref = (booking.reference or "").strip()[:32]
+ if not ref:
+ return SyncResult(success=True)
+ fromDate = (booking.bookingDate or "").split("T")[0].strip()[:10]
+ if not fromDate or len(fromDate) < 10:
+ return SyncResult(success=True)
+ accountNumbers = list({ln.accountNumber for ln in booking.lines if ln.accountNumber})
+ if not accountNumbers:
+ return SyncResult(success=True)
+ try:
+ async with aiohttp.ClientSession() as session:
+ for accno in accountNumbers:
+ url = self._buildUrl(config, f"charts/{accno}/transactions")
+ params = {"from_date": fromDate, "to_date": fromDate}
+ async with session.get(
+ url, headers=self._buildHeaders(config), params=params, timeout=aiohttp.ClientTimeout(total=15)
+ ) as resp:
+ if resp.status != 200:
+ continue
+ data = await resp.json()
+ transactions = data.get("transaction") if isinstance(data, dict) else []
+ if isinstance(data, list):
+ transactions = data
+ if not isinstance(transactions, list):
+ transactions = [transactions] if isinstance(transactions, dict) else []
+ for t in transactions:
+ if isinstance(t, dict) and (t.get("reference") or "").strip() == ref:
+ return SyncResult(success=True)
+ return SyncResult(success=False, errorMessage="Reference not found in RMA transactions")
+ except asyncio.TimeoutError:
+ logger.debug("RMA isBookingSynced timeout – trust local")
+ return SyncResult(success=True)
+ except Exception as e:
+ logger.debug("RMA isBookingSynced error: %s – trust local", e)
+ return SyncResult(success=True)
+
async def pushInvoice(self, config: Dict[str, Any], invoice: Dict[str, Any]) -> SyncResult:
try:
async with aiohttp.ClientSession() as session:
@@ -185,20 +313,127 @@ class AccountingConnectorRma(BaseAccountingConnector):
logger.error(f"RMA getVendors error: {e}")
return []
- async def uploadDocument(self, config: Dict[str, Any], fileName: str, fileContent: bytes, mimeType: str = "application/pdf") -> SyncResult:
- """Upload a receipt via POST /belege (multipart/form-data)."""
+ async def _findBelegByFilename(self, config: Dict[str, Any], session: aiohttp.ClientSession, fileName: str) -> Optional[str]:
+ """Try GET /belege (undocumented) to find an existing beleg by filename."""
+ try:
+ url = self._buildUrl(config, "belege")
+ async with session.get(url, headers=self._buildHeaders(config), timeout=aiohttp.ClientTimeout(total=15)) as resp:
+ body = await resp.text()
+ logger.info("RMA GET /belege: status=%s, body=%s", resp.status, body[:1000])
+ if resp.status != 200:
+ return None
+ try:
+ data = json.loads(body)
+ except Exception:
+ data = None
+ if isinstance(data, list):
+ items = data
+ elif isinstance(data, dict):
+ items = data.get("beleg_upload") or data.get("belege") or data.get("row") or []
+ if isinstance(items, dict):
+ items = [items]
+ else:
+                # Try XML: extract all <id> + <file_name> pairs
+                ids = re.findall(r"<id>([^<]+)</id>", body)
+                names = re.findall(r"<file_name>([^<]+)</file_name>", body)
+ for bid, fname in zip(ids, names):
+ if fileName.lower() in fname.lower():
+ logger.info("RMA GET /belege: matched filename=%s → belegId=%s", fname, bid)
+ return bid.strip()
+ return None
+ if not isinstance(items, list):
+ return None
+ for item in items:
+ if not isinstance(item, dict):
+ continue
+ fname = item.get("file_name") or item.get("fileName") or ""
+ if fileName.lower() in fname.lower():
+ bid = item.get("id")
+ if bid:
+ logger.info("RMA GET /belege: matched filename=%s → belegId=%s", fname, bid)
+ return str(bid).strip()
+ except Exception as e:
+ logger.debug("RMA _findBelegByFilename failed: %s", e)
+ return None
+
+ def _parseExistingBelegId(self, body: str) -> Optional[str]:
+ """Try to extract the existing Beleg-ID from a 409 duplicate response. Tries multiple patterns."""
+ if not (body or "").strip():
+ return None
+ # Pattern: "id":12345 or "id":"12345" in JSON
+ match = re.search(r'"id"\s*:\s*"?(\d+)"?', body)
+ if match:
+ return match.group(1).strip()
+        # Pattern: <id>12345</id> in XML
+        match = re.search(r"<id>([^<]+)</id>", body)
+ if match:
+ return match.group(1).strip()
+ # Pattern: numeric id in URL like /file/12345
+ match = re.search(r"/file/(\d+)", body)
+ if match:
+ return match.group(1).strip()
+ return None
+
+ def _parseBelegIdFromResponse(self, body: str) -> Optional[str]:
+ """Extract Beleg-ID from RMA POST /belege response (XML or JSON)."""
+ if not (body or "").strip():
+ return None
+ try:
+ data = json.loads(body)
+ if isinstance(data, dict):
+ bid = data.get("id") or (data.get("beleg_upload") or {}).get("id")
+ if bid is not None:
+ return str(bid).strip()
+ except Exception:
+ pass
+ match = re.search(r"([^<]+)", body)
+ if match:
+ return match.group(1).strip()
+ return None
+
+ async def uploadDocument(
+ self,
+ config: Dict[str, Any],
+ fileName: str,
+ fileContent: bytes,
+ mimeType: str = "application/pdf",
+ comment: Optional[str] = None,
+ ) -> SyncResult:
+ """Upload a receipt via POST /belege (multipart/form-data). Returns SyncResult with externalId = Beleg-ID when present."""
try:
formData = aiohttp.FormData()
formData.add_field("Filedata", fileContent, filename=fileName, content_type=mimeType)
+ if comment:
+ formData.add_field("comment", comment[:500])
headers = self._buildHeaders(config)
headers.pop("Content-Type", None) # let aiohttp set multipart boundary
+ headers["Accept"] = "*/*" # RMA may return XML on error; avoid 406 Not Acceptable
async with aiohttp.ClientSession() as session:
url = self._buildUrl(config, "belege")
async with session.post(url, headers=headers, data=formData, timeout=aiohttp.ClientTimeout(total=60)) as resp:
body = await resp.text()
+ belegId = self._parseBelegIdFromResponse(body)
if resp.status in (200, 201):
- return SyncResult(success=True, rawResponse={"body": body[:500]})
- return SyncResult(success=False, errorMessage=f"HTTP {resp.status}: {body[:300]}")
+ return SyncResult(
+ success=True,
+ externalId=belegId,
+ rawResponse={"body": body[:500]},
+ )
+ if resp.status == 409:
+ logger.info("RMA uploadDocument 409 (duplicate), body=%s", body[:500])
+ if not belegId:
+ belegId = self._parseExistingBelegId(body)
+ if not belegId:
+ belegId = await self._findBelegByFilename(config, session, fileName)
+ return SyncResult(
+ success=True,
+ externalId=belegId,
+ rawResponse={"body": body[:500]},
+ )
+ errMsg = f"HTTP {resp.status}: {body[:300]}"
+ logger.error("RMA uploadDocument failed: status=%s, body=%s", resp.status, body[:500])
+ return SyncResult(success=False, errorMessage=errMsg)
except Exception as e:
+ logger.exception("RMA uploadDocument error")
return SyncResult(success=False, errorMessage=str(e))
diff --git a/modules/features/trustee/datamodelFeatureTrustee.py b/modules/features/trustee/datamodelFeatureTrustee.py
index 09643735..2f630464 100644
--- a/modules/features/trustee/datamodelFeatureTrustee.py
+++ b/modules/features/trustee/datamodelFeatureTrustee.py
@@ -2,6 +2,7 @@
# All rights reserved.
"""Trustee models: TrusteeOrganisation, TrusteeRole, TrusteeAccess, TrusteeContract, TrusteeDocument, TrusteePosition."""
+from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field
from modules.shared.attributeUtils import registerModelLabels
@@ -278,6 +279,16 @@ registerModelLabels(
)
+class TrusteeDocumentTypeEnum(str, Enum):
+ """Document type for trustee documents (expense extraction, ingest, sync)."""
+ INVOICE = "invoice"
+ EXPENSE_RECEIPT = "expense_receipt"
+ BANK_DOCUMENT = "bank_document"
+ CONTRACT = "contract"
+ UNKNOWN = "unknown"
+ AUTO = "auto"
+
+
class TrusteeDocument(BaseModel):
"""Contains document references for bookings.
@@ -362,6 +373,25 @@ class TrusteeDocument(BaseModel):
"frontend_hidden": True
}
)
+ documentType: Optional[str] = Field(
+ default=None,
+ description="Document type (e.g. invoice, expense_receipt, bank_document, contract); use TrusteeDocumentTypeEnum values",
+ json_schema_extra={
+ "frontend_type": "text",
+ "frontend_readonly": False,
+ "frontend_required": False
+ }
+ )
+ externalBelegId: Optional[str] = Field(
+ default=None,
+ description="External Beleg-ID in accounting system (e.g. RMA); set on first successful upload, reused on re-sync",
+ json_schema_extra={
+ "frontend_type": "text",
+ "frontend_readonly": True,
+ "frontend_required": False,
+ "frontend_hidden": True
+ }
+ )
# System attributes are automatically set by DatabaseConnector
@@ -377,6 +407,8 @@ registerModelLabels(
"sourceLocation": {"en": "Source Location", "fr": "Emplacement source", "de": "Quellort"},
"mandateId": {"en": "Mandate", "fr": "Mandat", "de": "Mandat"},
"featureInstanceId": {"en": "Feature Instance", "fr": "Instance de fonctionnalité", "de": "Feature-Instanz"},
+ "documentType": {"en": "Document Type", "fr": "Type de document", "de": "Dokumenttyp"},
+ "externalBelegId": {"en": "Beleg ID (Accounting)", "fr": "ID Beleg (Comptabilité)", "de": "Beleg-ID (Buchhaltung)"},
},
)
@@ -384,7 +416,7 @@ registerModelLabels(
class TrusteePosition(BaseModel):
"""Contains booking positions (expense entries).
- Each position references exactly one source document via documentId (1:N relationship).
+ A position can have up to two document references: documentId (Beleg) and bankDocumentId (Bank-Referenz).
One document (e.g. bank statement) can generate many positions.
"""
id: str = Field(
@@ -398,7 +430,17 @@ class TrusteePosition(BaseModel):
)
documentId: Optional[str] = Field(
default=None,
- description="Reference to TrusteeDocument.id (source document that generated this position)",
+ description="Reference to TrusteeDocument.id (Beleg / primary document)",
+ json_schema_extra={
+ "frontend_type": "select",
+ "frontend_readonly": False,
+ "frontend_required": False,
+ "frontend_options": "/api/trustee/{instanceId}/documents/options"
+ }
+ )
+ bankDocumentId: Optional[str] = Field(
+ default=None,
+ description="Reference to TrusteeDocument.id (Bank-Referenz / second document)",
json_schema_extra={
"frontend_type": "select",
"frontend_readonly": False,
@@ -582,7 +624,17 @@ class TrusteePosition(BaseModel):
"frontend_hidden": True
}
)
-
+ accountingSyncId: Optional[str] = Field(
+ default=None,
+ description="External ID (UUID) of the synced record in the accounting system; set by sync, used for duplicate check",
+ json_schema_extra={
+ "frontend_type": "text",
+ "frontend_readonly": True,
+ "frontend_required": False,
+ "frontend_hidden": True
+ }
+ )
+
# Allow extra fields like _createdAt from database
model_config = {"extra": "allow"}
@@ -593,6 +645,7 @@ registerModelLabels(
{
"id": {"en": "ID", "fr": "ID", "de": "ID"},
"documentId": {"en": "Document", "fr": "Document", "de": "Dokument"},
+ "bankDocumentId": {"en": "Bank Reference", "fr": "Référence bancaire", "de": "Bank-Referenz"},
"valuta": {"en": "Value Date", "fr": "Date de valeur", "de": "Valutadatum"},
"transactionDateTime": {"en": "Transaction Date/Time", "fr": "Date/Heure de transaction", "de": "Transaktionszeitpunkt"},
"company": {"en": "Company", "fr": "Entreprise", "de": "Firma"},
@@ -611,6 +664,7 @@ registerModelLabels(
"bookingReference": {"en": "Booking Reference", "fr": "Référence de réservation", "de": "Buchungsreferenz"},
"mandateId": {"en": "Mandate", "fr": "Mandat", "de": "Mandat"},
"featureInstanceId": {"en": "Feature Instance", "fr": "Instance de fonctionnalité", "de": "Feature-Instanz"},
+ "accountingSyncId": {"en": "Accounting Sync ID", "fr": "ID sync comptabilité", "de": "Buha-Sync-ID"},
},
)
@@ -629,6 +683,7 @@ class TrusteeAccountingConfig(BaseModel):
isActive: bool = Field(default=True)
lastSyncAt: Optional[float] = Field(default=None, description="Timestamp of last sync attempt")
lastSyncStatus: Optional[str] = Field(default=None, description="Last sync result: success, error, partial")
+ lastSyncErrorMessage: Optional[str] = Field(default=None, description="Error message when lastSyncStatus is error")
mandateId: Optional[str] = Field(default=None)
@@ -643,6 +698,7 @@ registerModelLabels(
"isActive": {"en": "Active", "fr": "Actif", "de": "Aktiv"},
"lastSyncAt": {"en": "Last Sync", "fr": "Dernière sync.", "de": "Letzte Synchronisation"},
"lastSyncStatus": {"en": "Status", "fr": "Statut", "de": "Status"},
+ "lastSyncErrorMessage": {"en": "Error", "fr": "Erreur", "de": "Fehlermeldung"},
"mandateId": {"en": "Mandate", "fr": "Mandat", "de": "Mandat"},
},
)
diff --git a/modules/features/trustee/interfaceFeatureTrustee.py b/modules/features/trustee/interfaceFeatureTrustee.py
index 9ead992d..4c97cc4d 100644
--- a/modules/features/trustee/interfaceFeatureTrustee.py
+++ b/modules/features/trustee/interfaceFeatureTrustee.py
@@ -1121,12 +1121,13 @@ class TrusteeObjects:
logger.warning(f"User {self.userId} lacks permission to delete document")
return False
- # Clear documentId on positions that reference this document
- positions = self.db.getRecordset(TrusteePosition, recordFilter={"documentId": documentId})
- for pos in positions:
- posId = pos.get("id")
- if posId:
- self.db.recordModify(TrusteePosition, posId, {"documentId": None})
+ # Clear documentId or bankDocumentId on positions that reference this document
+ for field in ("documentId", "bankDocumentId"):
+ positions = self.db.getRecordset(TrusteePosition, recordFilter={field: documentId})
+ for pos in positions:
+ posId = pos.get("id")
+ if posId:
+ self.db.recordModify(TrusteePosition, posId, {field: None})
return self.db.recordDelete(TrusteeDocument, documentId)
diff --git a/modules/features/trustee/mainTrustee.py b/modules/features/trustee/mainTrustee.py
index 10d0a95b..1fa1948f 100644
--- a/modules/features/trustee/mainTrustee.py
+++ b/modules/features/trustee/mainTrustee.py
@@ -38,6 +38,11 @@ UI_OBJECTS = [
"label": {"en": "Expense Import", "de": "Spesen Import", "fr": "Import de dépenses"},
"meta": {"area": "expense-import"}
},
+ {
+ "objectKey": "ui.feature.trustee.scan-upload",
+ "label": {"en": "Scan / Upload", "de": "Scannen / Hochladen", "fr": "Scanner / Téléverser"},
+ "meta": {"area": "scan-upload"}
+ },
{
"objectKey": "ui.feature.trustee.settings",
"label": {"en": "Accounting Settings", "de": "Buchhaltungs-Einstellungen", "fr": "Paramètres comptables"},
@@ -189,6 +194,7 @@ TEMPLATE_ROLES = [
{"context": "UI", "item": "ui.feature.trustee.positions", "view": True},
{"context": "UI", "item": "ui.feature.trustee.documents", "view": True},
{"context": "UI", "item": "ui.feature.trustee.expense-import", "view": True},
+ {"context": "UI", "item": "ui.feature.trustee.scan-upload", "view": True},
# Own records only (MY level)
{"context": "DATA", "item": "data.feature.trustee.TrusteePosition", "view": True, "read": "m", "create": "m", "update": "m", "delete": "n"},
{"context": "DATA", "item": "data.feature.trustee.TrusteeDocument", "view": True, "read": "m", "create": "m", "update": "m", "delete": "n"},
diff --git a/modules/features/trustee/routeFeatureTrustee.py b/modules/features/trustee/routeFeatureTrustee.py
index cab6eecb..2161f719 100644
--- a/modules/features/trustee/routeFeatureTrustee.py
+++ b/modules/features/trustee/routeFeatureTrustee.py
@@ -1378,6 +1378,13 @@ async def sync_positions_to_accounting(
raise HTTPException(status_code=400, detail="positionIds required")
results = await bridge.pushBatchToAccounting(instanceId, positionIds)
+ failed = [r for r in results if not r.success]
+ if failed:
+ logger.warning(
+ "Accounting sync had %s failure(s): %s",
+ len(failed),
+ "; ".join(r.errorMessage or "unknown" for r in failed[:3]),
+ )
return {
"total": len(results),
"success": sum(1 for r in results if r.success),
@@ -1400,6 +1407,12 @@ async def sync_single_position_to_accounting(
from .accounting.accountingBridge import AccountingBridge
bridge = AccountingBridge(interface)
result = await bridge.pushPositionToAccounting(instanceId, positionId)
+ if not result.success:
+ logger.warning(
+ "Accounting sync failed for positionId=%s: %s",
+ positionId,
+ result.errorMessage or "unknown",
+ )
return result.model_dump()
diff --git a/modules/services/serviceSharepoint/mainServiceSharepoint.py b/modules/services/serviceSharepoint/mainServiceSharepoint.py
index dc8717a5..62276a79 100644
--- a/modules/services/serviceSharepoint/mainServiceSharepoint.py
+++ b/modules/services/serviceSharepoint/mainServiceSharepoint.py
@@ -119,6 +119,15 @@ class SharepointService:
error_text = await response.text()
logger.error(f"Graph API call failed: {response.status} - {error_text}")
return {"error": f"API call failed: {response.status} - {error_text}"}
+
+ elif method == "DELETE":
+ async with session.delete(url, headers=headers) as response:
+ if response.status in [200, 204]:
+ return {}
+ else:
+ error_text = await response.text()
+ logger.error(f"Graph API call failed: {response.status} - {error_text}")
+ return {"error": f"API call failed: {response.status} - {error_text}"}
except asyncio.TimeoutError:
logger.error(f"Graph API call timed out after 30 seconds: {endpoint}")
@@ -476,6 +485,22 @@ class SharepointService:
raise Exception(f"Source file not found (404): {sourcePath} - {errorMsg}")
else:
raise Exception(f"Error copying file: {errorMsg}")
+
+ async def deleteFile(self, siteId: str, itemId: str) -> bool:
+ """Delete a file (or folder) from SharePoint by item ID. Returns True on success."""
+ try:
+ if not siteId or not itemId:
+ logger.warning("deleteFile: siteId and itemId are required")
+ return False
+ endpoint = f"sites/{siteId}/drive/items/{itemId}"
+ result = await self._makeGraphApiCall(endpoint, method="DELETE")
+ if result and "error" in result:
+ logger.warning(f"deleteFile failed: {result.get('error')}")
+ return False
+ return True
+ except Exception as e:
+ logger.error(f"Error deleting file: {str(e)}")
+ return False
async def downloadFileByPath(self, siteId: str, filePath: str) -> Optional[bytes]:
"""Download a file by its path within a site."""
diff --git a/modules/workflows/methods/methodSharepoint/actions/__init__.py b/modules/workflows/methods/methodSharepoint/actions/__init__.py
index d59d29aa..6975f8af 100644
--- a/modules/workflows/methods/methodSharepoint/actions/__init__.py
+++ b/modules/workflows/methods/methodSharepoint/actions/__init__.py
@@ -13,7 +13,6 @@ from .findSiteByUrl import findSiteByUrl
from .downloadFileByPath import downloadFileByPath
from .copyFile import copyFile
from .uploadFile import uploadFile
-from .getExpensesFromPdf import getExpensesFromPdf
__all__ = [
'findDocumentPath',
@@ -25,6 +24,5 @@ __all__ = [
'downloadFileByPath',
'copyFile',
'uploadFile',
- 'getExpensesFromPdf',
]
diff --git a/modules/workflows/methods/methodSharepoint/actions/getExpensesFromPdf.py b/modules/workflows/methods/methodSharepoint/actions/getExpensesFromPdf.py
deleted file mode 100644
index 9c873bb2..00000000
--- a/modules/workflows/methods/methodSharepoint/actions/getExpensesFromPdf.py
+++ /dev/null
@@ -1,836 +0,0 @@
-# Copyright (c) 2025 Patrick Motsch
-# All rights reserved.
-
-"""
-Action to extract expenses from PDF documents in SharePoint and save to TrusteePosition.
-
-Process:
-1. Read PDF files from SharePoint folder (max 50 files per execution)
-2. FOR EACH PDF document:
- a. AI call to extract expense data in CSV format
- b. If 0 records: move to "error" folder
- c. Validate/calculate VAT, complete valuta/transactionDateTime
- d. Save all records to TrusteePosition
- e. Move document to "processed" subfolder with timestamp prefix
-"""
-
-import logging
-import time
-import json
-import csv
-import io
-import asyncio
-from datetime import datetime, UTC
-from typing import Dict, Any, List, Optional
-from modules.datamodels.datamodelChat import ActionResult, ActionDocument
-from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
-
-logger = logging.getLogger(__name__)
-
-# Configuration
-MAX_FILES_PER_EXECUTION = 50
-MAX_CONCURRENT_AI_TASKS = 10 # Limit concurrent AI calls to avoid rate limits
-ALLOWED_TAGS = ["customer", "meeting", "license", "subscription", "fuel", "food", "material"]
-RATE_LIMIT_WAIT_SECONDS = 60
-
-
-async def getExpensesFromPdf(self, parameters: Dict[str, Any]) -> ActionResult:
- """
- Extract expenses from PDF documents in SharePoint and save to TrusteePosition.
-
- Parameters:
- - connectionReference (str): Microsoft connection label
- - sharepointFolder (str): SharePoint folder path (e.g., /sites/MySite/Documents/Expenses)
- - featureInstanceId (str): Feature instance ID for TrusteePosition
- - prompt (str): AI prompt for content extraction
-
- Returns:
- ActionResult with success status and processing summary
- """
- operationId = None
- processedDocuments = []
- skippedDocuments = []
- errorDocuments = []
- totalPositions = 0
-
- try:
- # Initialize progress tracking
- workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
- operationId = f"sharepoint_expenses_{workflowId}_{int(time.time())}"
-
- parentOperationId = parameters.get('parentOperationId')
- self.services.chat.progressLogStart(
- operationId,
- "Extract Expenses from PDF",
- "SharePoint PDF Processing",
- "Initializing expense extraction",
- parentOperationId=parentOperationId
- )
-
- # Extract and validate parameters
- connectionReference = parameters.get("connectionReference")
- sharepointFolder = parameters.get("sharepointFolder")
- featureInstanceId = parameters.get("featureInstanceId")
- prompt = parameters.get("prompt")
-
- if not connectionReference:
- self.services.chat.progressLogFinish(operationId, False)
- return ActionResult.isFailure(error="connectionReference is required")
- if not sharepointFolder:
- self.services.chat.progressLogFinish(operationId, False)
- return ActionResult.isFailure(error="sharepointFolder is required")
- if not featureInstanceId:
- self.services.chat.progressLogFinish(operationId, False)
- return ActionResult.isFailure(error="featureInstanceId is required")
- if not prompt:
- self.services.chat.progressLogFinish(operationId, False)
- return ActionResult.isFailure(error="prompt is required")
-
- # Get Microsoft connection
- self.services.chat.progressLogUpdate(operationId, 0.05, "Getting Microsoft connection")
- connection = self.connection.getMicrosoftConnection(connectionReference)
- if not connection:
- self.services.chat.progressLogFinish(operationId, False)
- return ActionResult.isFailure(error="No valid Microsoft connection found")
-
- # Set access token for SharePoint service
- if not self.services.sharepoint.setAccessTokenFromConnection(connection):
- self.services.chat.progressLogFinish(operationId, False)
- return ActionResult.isFailure(error="Failed to set SharePoint access token")
-
- # Find site and folder info
- self.services.chat.progressLogUpdate(operationId, 0.1, "Resolving SharePoint site")
- siteInfo, folderPath = await _resolveSiteAndFolder(self, sharepointFolder)
- if not siteInfo:
- self.services.chat.progressLogFinish(operationId, False)
- return ActionResult.isFailure(error=f"Could not resolve SharePoint site from path: {sharepointFolder}")
-
- siteId = siteInfo.get("id")
-
- # List PDF files in folder
- self.services.chat.progressLogUpdate(operationId, 0.15, "Finding PDF files in folder")
- pdfFiles = await _listPdfFilesInFolder(self, siteId, folderPath)
-
- if not pdfFiles:
- self.services.chat.progressLogFinish(operationId, True)
- return ActionResult.isSuccess(
- documents=[ActionDocument(
- documentName="expense_extraction_result.json",
- documentData=json.dumps({
- "status": "no_documents",
- "message": "No PDF files found in the specified folder",
- "folder": sharepointFolder
- }, indent=2),
- mimeType="application/json",
- validationMetadata={"actionType": "sharepoint.getExpensesFromPdf"}
- )]
- )
-
- # Limit files
- originalFileCount = len(pdfFiles)
- if originalFileCount > MAX_FILES_PER_EXECUTION:
- logger.warning(f"Found {originalFileCount} PDFs, limiting to {MAX_FILES_PER_EXECUTION}")
- pdfFiles = pdfFiles[:MAX_FILES_PER_EXECUTION]
-
- totalFiles = len(pdfFiles)
- progressPerFile = 0.7 / totalFiles
-
- # Get Trustee interface
- from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface
- trusteeInterface = getTrusteeInterface(
- self.services.user,
- mandateId=self.services.mandateId,
- featureInstanceId=featureInstanceId
- )
-
- # Process PDFs in parallel with semaphore to limit concurrent AI calls
- semaphore = asyncio.Semaphore(MAX_CONCURRENT_AI_TASKS)
- completedCount = [0] # Use list for mutable reference in closure
-
- async def processSinglePdf(idx: int, pdfFile: Dict[str, Any]) -> Dict[str, Any]:
- """Process a single PDF document. Returns result dict."""
- fileName = pdfFile.get("name", f"file_{idx}")
- fileId = pdfFile.get("id")
-
- async with semaphore:
- # Update progress (thread-safe via asyncio)
- completedCount[0] += 1
- currentProgress = 0.2 + (completedCount[0] * progressPerFile)
- self.services.chat.progressLogUpdate(
- operationId,
- min(currentProgress, 0.9),
- f"Processing {completedCount[0]}/{totalFiles}: {fileName}"
- )
-
- try:
- # Download PDF content
- fileContent = await self.services.sharepoint.downloadFile(siteId, fileId)
- if not fileContent:
- await _moveToErrorFolder(self, siteId, folderPath, fileName)
- return {"type": "error", "file": fileName, "error": "Failed to download", "movedTo": "error/"}
-
- # AI call to extract expense data (this is the bottleneck - parallelized)
- aiResult = await _extractExpensesWithAi(self.services, fileContent, fileName, prompt, featureInstanceId)
-
- if not aiResult.get("success"):
- await _moveToErrorFolder(self, siteId, folderPath, fileName)
- return {"type": "error", "file": fileName, "error": aiResult.get("error", "AI extraction failed"), "movedTo": "error/"}
-
- records = aiResult.get("records", [])
- fileId = aiResult.get("fileId")
-
- # Check for empty records
- if not records:
- logger.warning(f"Document {fileName}: No records extracted, moving to error folder")
- await _moveToErrorFolder(self, siteId, folderPath, fileName)
- return {"type": "skipped", "file": fileName, "reason": "No expense records extracted", "movedTo": "error/"}
-
- # Validate and enrich records
- validatedRecords = _validateAndEnrichRecords(records, fileName)
-
- # Save to TrusteePosition and create Document + Position-Document links
- savedCount = _saveToTrusteePosition(
- trusteeInterface,
- validatedRecords,
- featureInstanceId,
- self.services.mandateId,
- fileId=fileId,
- fileName=fileName,
- sourceLocation=sharepointFolder
- )
-
- # Move document to "processed" subfolder
- timestamp = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
- newFileName = f"{timestamp}_{fileName}"
-
- moveSuccess = await _moveToProcessedFolder(self, siteId, folderPath, fileName, newFileName)
-
- return {
- "type": "processed",
- "file": fileName,
- "newLocation": f"processed/{newFileName}" if moveSuccess else "move_failed",
- "recordsExtracted": len(validatedRecords),
- "recordsSaved": savedCount
- }
-
- except Exception as e:
- errorMsg = str(e)
- logger.error(f"Error processing {fileName}: {errorMsg}")
-
- # Handle rate limit
- if "429" in errorMsg or "throttl" in errorMsg.lower():
- logger.warning(f"Rate limit hit, waiting {RATE_LIMIT_WAIT_SECONDS} seconds")
- await asyncio.sleep(RATE_LIMIT_WAIT_SECONDS)
-
- await _moveToErrorFolder(self, siteId, folderPath, fileName)
- return {"type": "error", "file": fileName, "error": errorMsg, "movedTo": "error/"}
-
- # Execute all PDF processing tasks in parallel (limited by semaphore)
- logger.info(f"Starting parallel processing of {totalFiles} PDFs (max {MAX_CONCURRENT_AI_TASKS} concurrent)")
- tasks = [processSinglePdf(idx, pdfFile) for idx, pdfFile in enumerate(pdfFiles)]
- results = await asyncio.gather(*tasks, return_exceptions=True)
-
- # Collect results
- for result in results:
- if isinstance(result, Exception):
- errorDocuments.append({"file": "unknown", "error": str(result), "movedTo": "error/"})
- elif result.get("type") == "processed":
- processedDocuments.append(result)
- totalPositions += result.get("recordsSaved", 0)
- elif result.get("type") == "skipped":
- skippedDocuments.append(result)
- elif result.get("type") == "error":
- errorDocuments.append(result)
-
- # Create result summary
- self.services.chat.progressLogUpdate(operationId, 0.95, "Creating result summary")
-
- remainingFiles = max(0, originalFileCount - MAX_FILES_PER_EXECUTION)
-
- resultSummary = {
- "status": "completed",
- "folder": sharepointFolder,
- "featureInstanceId": featureInstanceId,
- "summary": {
- "totalFilesFound": originalFileCount,
- "filesProcessedThisRun": totalFiles,
- "remainingFiles": remainingFiles,
- "successfulDocuments": len(processedDocuments),
- "skippedDocuments": len(skippedDocuments),
- "errorDocuments": len(errorDocuments),
- "totalPositionsSaved": totalPositions
- },
- "processedDocuments": processedDocuments,
- "skippedDocuments": skippedDocuments,
- "errorDocuments": errorDocuments
- }
-
- if remainingFiles > 0:
- resultSummary["note"] = f"{remainingFiles} files remaining for next execution"
-
- self.services.chat.progressLogFinish(operationId, True)
-
- return ActionResult.isSuccess(
- documents=[ActionDocument(
- documentName="expense_extraction_result.json",
- documentData=json.dumps(resultSummary, indent=2),
- mimeType="application/json",
- validationMetadata={
- "actionType": "sharepoint.getExpensesFromPdf",
- "sharepointFolder": sharepointFolder,
- "featureInstanceId": featureInstanceId,
- "totalPositions": totalPositions
- }
- )]
- )
-
- except Exception as e:
- logger.error(f"Error in getExpensesFromPdf: {str(e)}")
- if operationId:
- self.services.chat.progressLogFinish(operationId, False)
- return ActionResult.isFailure(error=str(e))
-
-
-async def _resolveSiteAndFolder(self, sharepointFolder: str) -> tuple:
- """Resolve SharePoint site and folder path from the given path."""
- try:
- # Parse path format: /sites/SiteName/FolderPath
- if sharepointFolder.startswith('/sites/'):
- parts = sharepointFolder[7:].split('/', 1) # Remove '/sites/' prefix
- if len(parts) >= 1:
- siteName = parts[0]
- folderPath = parts[1] if len(parts) > 1 else ""
-
- # Try to find site by name
- sites, _ = await self.siteDiscovery.resolveSitesFromPathQuery(sharepointFolder)
- if sites:
- return sites[0], folderPath
-
- # Fallback: try to resolve via siteDiscovery
- sites, _ = await self.siteDiscovery.resolveSitesFromPathQuery(sharepointFolder)
- if sites:
- return sites[0], ""
-
- return None, None
-
- except Exception as e:
- logger.error(f"Error resolving site and folder: {str(e)}")
- return None, None
-
-
-async def _listPdfFilesInFolder(self, siteId: str, folderPath: str) -> List[Dict[str, Any]]:
- """List PDF files in the given folder."""
- try:
- import urllib.parse
-
- # Build endpoint
- if not folderPath or folderPath == "/":
- endpoint = f"sites/{siteId}/drive/root/children"
- else:
- cleanPath = folderPath.strip('/')
- encodedPath = urllib.parse.quote(cleanPath, safe='/')
- endpoint = f"sites/{siteId}/drive/root:/{encodedPath}:/children"
-
- result = await self.apiClient.makeGraphApiCall(endpoint)
-
- if "error" in result:
- logger.error(f"Error listing folder: {result['error']}")
- return []
-
- items = result.get("value", [])
-
- # Filter for PDF files only
- pdfFiles = []
- for item in items:
- name = item.get("name", "")
- if name.lower().endswith('.pdf') and "file" in item:
- pdfFiles.append({
- "id": item.get("id"),
- "name": name,
- "size": item.get("size", 0),
- "webUrl": item.get("webUrl"),
- "lastModifiedDateTime": item.get("lastModifiedDateTime")
- })
-
- logger.info(f"Found {len(pdfFiles)} PDF files in folder")
- return pdfFiles
-
- except Exception as e:
- logger.error(f"Error listing PDF files: {str(e)}")
- return []
-
-
-async def _extractExpensesWithAi(services, fileContent: bytes, fileName: str, prompt: str, featureInstanceId: str) -> Dict[str, Any]:
- """
- Call AI service to extract expense data from PDF content.
- Uses the full AI service pipeline which handles:
- - Document extraction (text + images)
- - Intent analysis
- - Chunking for large documents
- - Vision processing for images
- """
- try:
- import uuid
-
- # Ensure AI is initialized
- await services.ai.ensureAiObjectsInitialized()
-
- # Step 1: Store file temporarily in database so AI service can access it
- from modules.interfaces.interfaceDbManagement import getInterface as getDbInterface
- from modules.datamodels.datamodelChat import ChatDocument
- from modules.datamodels.datamodelDocref import DocumentReferenceList
-
- dbInterface = getDbInterface(services.user, mandateId=services.mandateId, featureInstanceId=featureInstanceId)
-
- # Create file record
- fileItem = dbInterface.createFile(
- name=fileName,
- mimeType="application/pdf",
- content=fileContent
- )
-
- # Store file data
- dbInterface.createFileData(fileItem.id, fileContent)
-
- logger.info(f"Stored PDF {fileName} ({len(fileContent)} bytes) with fileId: {fileItem.id}")
-
- # Step 2: Create ChatDocument referencing the file
- documentId = str(uuid.uuid4())
- chatDocument = ChatDocument(
- id=documentId,
- mandateId=services.mandateId or "",
- featureInstanceId=featureInstanceId or "",
- messageId="", # Will be set when attached to message
- fileId=fileItem.id,
- fileName=fileName,
- fileSize=len(fileContent),
- mimeType="application/pdf"
- )
-
- # Step 3: Create a proper message with the document attached to the workflow
- # This ensures getChatDocumentsFromDocumentList can find the document via workflow.messages
- messageData = {
- "id": f"msg_expense_import_{str(uuid.uuid4())[:8]}",
- "documentsLabel": f"expense_pdf_{fileName}",
- "role": "user",
- "status": "step",
- "message": f"PDF document for expense extraction: {fileName}"
- }
-
- # Use storeMessageWithDocuments to properly create message + document and sync with workflow
- createdMessage = services.chat.storeMessageWithDocuments(
- services.workflow,
- messageData,
- [chatDocument.model_dump()]
- )
-
- # Update documentId to match the created document's actual ID
- if createdMessage and createdMessage.documents:
- documentId = createdMessage.documents[0].id
-
- logger.info(f"Created message {createdMessage.id} with ChatDocument {documentId} for AI processing")
-
- # Step 4: Create DocumentReferenceList for AI service
- from modules.datamodels.datamodelDocref import DocumentItemReference
- documentList = DocumentReferenceList(
- references=[
- DocumentItemReference(
- documentId=documentId,
- fileName=fileName
- )
- ]
- )
-
- # Step 5: Call AI with documentList - let AI service handle everything
- # (extraction, intent analysis, chunking, image processing)
- # Use DATA_GENERATE (same path as ai.process) which handles chunking correctly
- options = AiCallOptions(
- resultFormat="csv",
- operationType=OperationTypeEnum.DATA_GENERATE
- )
-
- aiResponse = await services.ai.callAiContent(
- prompt=prompt,
- options=options,
- documentList=documentList,
- contentParts=None, # Let AI service extract from documents
- outputFormat="csv",
- generationIntent="extract" # Signal this is extraction, not document generation
- )
-
- if not aiResponse:
- return {"success": False, "error": "AI returned empty response"}
-
- # Get CSV from rendered documents (not from content - that's the internal structure)
- if not aiResponse.documents or len(aiResponse.documents) == 0:
- return {"success": False, "error": "AI returned no documents"}
-
- # Get the CSV content from the first document
- csvDocument = aiResponse.documents[0]
- csvContent = csvDocument.documentData
-
- # documentData is bytes, decode to string
- if isinstance(csvContent, bytes):
- csvContent = csvContent.decode('utf-8')
-
- logger.info(f"Retrieved CSV content ({len(csvContent)} chars) from rendered document: {csvDocument.documentName}")
-
- records = _parseCsvToRecords(csvContent)
-
- # Return fileId so it can be used to create TrusteeDocument reference
- return {"success": True, "records": records, "fileId": fileItem.id}
-
- except Exception as e:
- logger.error(f"AI extraction error for {fileName}: {str(e)}")
- return {"success": False, "error": str(e)}
-
-
-def _parseCsvToRecords(csvContent: str) -> List[Dict[str, Any]]:
- """Parse CSV content to list of expense records."""
- records = []
- try:
- content = csvContent.strip()
-
- # Clean up CSV content - remove markdown code blocks if present
- if content.startswith("```"):
- lines = content.split('\n')
- # Remove first and last line if they're code block markers
- if lines[0].startswith("```"):
- lines = lines[1:]
- if lines and lines[-1].strip() == "```":
- lines = lines[:-1]
- content = '\n'.join(lines)
-
- reader = csv.DictReader(io.StringIO(content))
- for row in reader:
- # Clean up keys (remove whitespace)
- cleanedRow = {k.strip(): v.strip() if isinstance(v, str) else v for k, v in row.items()}
- records.append(cleanedRow)
-
- logger.info(f"Parsed {len(records)} records from CSV content")
-
- except Exception as e:
- logger.error(f"Error parsing CSV: {str(e)}")
-
- return records
-
-
-def _validateAndEnrichRecords(records: List[Dict[str, Any]], sourceFileName: str) -> List[Dict[str, Any]]:
- """
- Validate and enrich expense records:
- 1. Calculate/correct VAT amount
- 2. Complete valuta/transactionDateTime if one is missing
- 3. Validate tags
- """
- enrichedRecords = []
-
- for record in records:
- enriched = record.copy()
-
- # VAT calculation/validation
- vatPercentage = _parseFloat(record.get("vatPercentage", 0))
- vatAmount = _parseFloat(record.get("vatAmount", 0))
- bookingAmount = _parseFloat(record.get("bookingAmount", 0))
-
- if vatPercentage > 0 and bookingAmount > 0:
- # Calculate expected VAT amount (VAT is included in bookingAmount)
- expectedVat = bookingAmount * vatPercentage / (100 + vatPercentage)
-
- # If vatAmount is missing or significantly different, recalculate
- if vatAmount == 0 or abs(vatAmount - expectedVat) > 0.01:
- enriched["vatAmount"] = round(expectedVat, 2)
- logger.debug(f"VAT amount corrected: {vatAmount} -> {enriched['vatAmount']}")
-
- # Valuta / transactionDateTime completion
- valuta = record.get("valuta")
- transactionDateTime = record.get("transactionDateTime")
-
- if valuta and not transactionDateTime:
- try:
- dt = datetime.strptime(str(valuta).strip(), "%Y-%m-%d")
- enriched["transactionDateTime"] = dt.replace(hour=12).timestamp()
- except:
- pass
- elif transactionDateTime and not valuta:
- try:
- ts = float(transactionDateTime)
- dt = datetime.fromtimestamp(ts, UTC)
- enriched["valuta"] = dt.strftime("%Y-%m-%d")
- except:
- pass
-
- # Validate tags
- tags = record.get("tags", "")
- if tags:
- tagList = [t.strip().lower() for t in str(tags).split(",")]
- validTags = [t for t in tagList if t in ALLOWED_TAGS]
- enriched["tags"] = ",".join(validTags)
-
- # Store source file info in description
- existingDesc = record.get("desc", "")
- if sourceFileName and sourceFileName not in str(existingDesc):
- enriched["desc"] = f"[Source: {sourceFileName}]\n{existingDesc}"
-
- enrichedRecords.append(enriched)
-
- return enrichedRecords
-
-
-def _parseFloat(value) -> float:
- """Safely parse float value."""
- try:
- if value is None or value == "":
- return 0.0
- return float(value)
- except (ValueError, TypeError):
- return 0.0
-
-
-def _saveToTrusteePosition(
- trusteeInterface,
- records: List[Dict[str, Any]],
- featureInstanceId: str,
- mandateId: str,
- fileId: Optional[str] = None,
- fileName: Optional[str] = None,
- sourceLocation: Optional[str] = None
-) -> int:
- """
- Save validated records to TrusteePosition table.
- Also creates TrusteeDocument (referencing the source file) and links positions to it.
-
- Args:
- trusteeInterface: Trustee interface instance
- records: List of expense records to save
- featureInstanceId: Feature instance ID
- mandateId: Mandate ID
- fileId: Optional file ID from central Files table (source PDF)
- fileName: Optional file name
- sourceLocation: Optional source location (e.g., SharePoint path)
-
- Returns:
- Number of positions saved
- """
- savedCount = 0
- savedPositionIds = []
-
- # Step 1: Create TrusteeDocument referencing the source file
- documentId = None
- if fileId and fileName:
- try:
- document = trusteeInterface.createDocument({
- "fileId": fileId,
- "documentName": fileName,
- "documentMimeType": "application/pdf",
- "sourceType": "sharepoint",
- "sourceLocation": sourceLocation
- })
- if document:
- documentId = document.id
- logger.info(f"Created TrusteeDocument {documentId} referencing file {fileId}")
- else:
- logger.warning(f"Failed to create TrusteeDocument for file {fileId}")
- except Exception as e:
- logger.error(f"Error creating TrusteeDocument: {str(e)}")
-
- # Step 2: Save positions with direct documentId reference and accounting fields
- for record in records:
- try:
- position = {
- "documentId": documentId,
- "valuta": record.get("valuta"),
- "transactionDateTime": record.get("transactionDateTime"),
- "company": record.get("company", ""),
- "desc": record.get("desc", ""),
- "tags": record.get("tags", ""),
- "bookingCurrency": record.get("bookingCurrency", "CHF"),
- "bookingAmount": _parseFloat(record.get("bookingAmount", 0)),
- "originalCurrency": record.get("originalCurrency") or record.get("bookingCurrency", "CHF"),
- "originalAmount": _parseFloat(record.get("originalAmount", 0)) or _parseFloat(record.get("bookingAmount", 0)),
- "vatPercentage": _parseFloat(record.get("vatPercentage", 0)),
- "vatAmount": _parseFloat(record.get("vatAmount", 0)),
- "debitAccountNumber": record.get("debitAccountNumber") or None,
- "creditAccountNumber": record.get("creditAccountNumber") or None,
- "taxCode": record.get("taxCode") or None,
- "costCenter": record.get("costCenter") or None,
- "bookingReference": record.get("bookingReference") or None,
- "featureInstanceId": featureInstanceId,
- "mandateId": mandateId
- }
-
- result = trusteeInterface.createPosition(position)
- if result:
- savedCount += 1
- savedPositionIds.append(result.id)
- logger.debug(f"Saved position: {position.get('company')} - {position.get('bookingAmount')}")
-
- except Exception as e:
- logger.error(f"Failed to save position: {str(e)}")
-
- # Step 3: Auto-sync to accounting system if configured
- if savedCount > 0 and savedPositionIds:
- try:
- from modules.features.trustee.accounting.accountingBridge import AccountingBridge
- bridge = AccountingBridge(trusteeInterface)
- configRecord = await bridge.getActiveConfig(featureInstanceId)
- if configRecord:
- syncResults = await bridge.pushBatchToAccounting(featureInstanceId, savedPositionIds)
- syncedCount = sum(1 for r in syncResults if r.success)
- logger.info(f"Auto-synced {syncedCount}/{len(savedPositionIds)} positions to accounting system")
- except Exception as e:
- logger.warning(f"Accounting auto-sync skipped (non-critical): {e}")
-
- return savedCount
-
-
-async def _ensureFolderExists(self, siteId: str, folderPath: str) -> bool:
- """Create folder if it doesn't exist."""
- try:
- import urllib.parse
-
- # Check if folder exists
- cleanPath = folderPath.strip('/')
- encodedPath = urllib.parse.quote(cleanPath, safe='/')
- checkEndpoint = f"sites/{siteId}/drive/root:/{encodedPath}"
-
- result = await self.apiClient.makeGraphApiCall(checkEndpoint)
-
- if "error" not in result:
- return True # Folder exists
-
- # Create folder - need to create parent first if nested
- pathParts = cleanPath.split('/')
- currentPath = ""
-
- for part in pathParts:
- parentPath = currentPath if currentPath else "root"
- currentPath = f"{currentPath}/{part}" if currentPath else part
-
- # Check if this level exists
- checkPath = urllib.parse.quote(currentPath, safe='/')
- checkResult = await self.apiClient.makeGraphApiCall(f"sites/{siteId}/drive/root:/{checkPath}")
-
- if "error" in checkResult:
- # Create this folder
- if parentPath == "root":
- createEndpoint = f"sites/{siteId}/drive/root/children"
- else:
- encodedParent = urllib.parse.quote(parentPath, safe='/')
- createEndpoint = f"sites/{siteId}/drive/root:/{encodedParent}:/children"
-
- createData = json.dumps({
- "name": part,
- "folder": {},
- "@microsoft.graph.conflictBehavior": "fail"
- }).encode('utf-8')
-
- createResult = await self.apiClient.makeGraphApiCall(createEndpoint, method="POST", data=createData)
-
- if "error" in createResult:
- logger.warning(f"Failed to create folder {part}: {createResult['error']}")
- return False
-
- logger.info(f"Created folder: {currentPath}")
-
- return True
-
- except Exception as e:
- logger.error(f"Failed to ensure folder exists: {str(e)}")
- return False
-
-
-async def _moveToProcessedFolder(self, siteId: str, sourceFolderPath: str, sourceFileName: str, destFileName: str) -> bool:
- """Move processed PDF to 'processed' subfolder."""
- try:
- # Build processed folder path
- cleanSource = sourceFolderPath.strip('/')
- processedFolder = f"{cleanSource}/processed" if cleanSource else "processed"
-
- # Ensure processed folder exists
- await _ensureFolderExists(self, siteId, processedFolder)
-
- # Copy file to new location
- await self.services.sharepoint.copyFileAsync(
- siteId=siteId,
- sourceFolder=cleanSource if cleanSource else "/",
- sourceFile=sourceFileName,
- destFolder=processedFolder,
- destFile=destFileName
- )
-
- # Delete original file
- await _deleteFile(self, siteId, sourceFolderPath, sourceFileName)
-
- logger.info(f"Moved {sourceFileName} to processed/{destFileName}")
- return True
-
- except Exception as e:
- logger.error(f"Failed to move file to processed: {str(e)}")
- return False
-
-
-async def _moveToErrorFolder(self, siteId: str, sourceFolderPath: str, sourceFileName: str) -> bool:
- """Move failed PDF to 'error' subfolder (filename unchanged)."""
- try:
- # Build error folder path
- cleanSource = sourceFolderPath.strip('/')
- errorFolder = f"{cleanSource}/error" if cleanSource else "error"
-
- # Ensure error folder exists
- await _ensureFolderExists(self, siteId, errorFolder)
-
- # Copy file to error folder (keep original name)
- await self.services.sharepoint.copyFileAsync(
- siteId=siteId,
- sourceFolder=cleanSource if cleanSource else "/",
- sourceFile=sourceFileName,
- destFolder=errorFolder,
- destFile=sourceFileName # Same filename
- )
-
- # Delete original file
- await _deleteFile(self, siteId, sourceFolderPath, sourceFileName)
-
- logger.info(f"Moved {sourceFileName} to error/")
- return True
-
- except Exception as e:
- logger.error(f"Failed to move file to error folder: {str(e)}")
- return False
-
-
-async def _deleteFile(self, siteId: str, folderPath: str, fileName: str) -> bool:
- """Delete file from SharePoint."""
- try:
- import urllib.parse
-
- cleanPath = folderPath.strip('/')
- filePath = f"{cleanPath}/{fileName}" if cleanPath else fileName
- encodedPath = urllib.parse.quote(filePath, safe='/')
-
- endpoint = f"sites/{siteId}/drive/root:/{encodedPath}"
-
- # Get file ID first
- fileInfo = await self.apiClient.makeGraphApiCall(endpoint)
- if "error" in fileInfo:
- logger.warning(f"File not found for deletion: {filePath}")
- return False
-
- fileId = fileInfo.get("id")
- if not fileId:
- return False
-
- # Delete by ID using apiClient
- deleteEndpoint = f"sites/{siteId}/drive/items/{fileId}"
- result = await self.apiClient.makeGraphApiCall(deleteEndpoint, method="DELETE")
-
- if "error" in result:
- logger.warning(f"Delete failed: {result['error']}")
- return False
-
- logger.debug(f"Deleted file: {filePath}")
- return True
-
- except Exception as e:
- logger.error(f"Failed to delete file: {str(e)}")
- return False
diff --git a/modules/workflows/methods/methodSharepoint/methodSharepoint.py b/modules/workflows/methods/methodSharepoint/methodSharepoint.py
index 9ca4c3e5..d187e438 100644
--- a/modules/workflows/methods/methodSharepoint/methodSharepoint.py
+++ b/modules/workflows/methods/methodSharepoint/methodSharepoint.py
@@ -28,8 +28,6 @@ from .actions.findSiteByUrl import findSiteByUrl
from .actions.downloadFileByPath import downloadFileByPath
from .actions.copyFile import copyFile
from .actions.uploadFile import uploadFile
-from .actions.getExpensesFromPdf import getExpensesFromPdf
-
logger = logging.getLogger(__name__)
class MethodSharepoint(MethodBase):
@@ -378,42 +376,6 @@ class MethodSharepoint(MethodBase):
)
},
execute=uploadFile.__get__(self, self.__class__)
- ),
- "getExpensesFromPdf": WorkflowActionDefinition(
- actionId="sharepoint.getExpensesFromPdf",
- description="Extract expenses from PDF documents in SharePoint folder and save to TrusteePosition",
- dynamicMode=False, # Not for dynamic workflow
- parameters={
- "connectionReference": WorkflowActionParameter(
- name="connectionReference",
- type="str",
- frontendType=FrontendType.USER_CONNECTION,
- required=True,
- description="Microsoft connection label for SharePoint access"
- ),
- "sharepointFolder": WorkflowActionParameter(
- name="sharepointFolder",
- type="str",
- frontendType=FrontendType.TEXT,
- required=True,
- description="SharePoint folder path containing PDF expense documents (e.g., /sites/MySite/Documents/Expenses)"
- ),
- "featureInstanceId": WorkflowActionParameter(
- name="featureInstanceId",
- type="str",
- frontendType=FrontendType.TEXT,
- required=True,
- description="Feature Instance ID for the Trustee feature where positions will be stored"
- ),
- "prompt": WorkflowActionParameter(
- name="prompt",
- type="str",
- frontendType=FrontendType.TEXTAREA,
- required=True,
- description="AI prompt for extracting expense data from PDF content"
- )
- },
- execute=getExpensesFromPdf.__get__(self, self.__class__)
)
}
@@ -430,5 +392,4 @@ class MethodSharepoint(MethodBase):
self.downloadFileByPath = downloadFileByPath.__get__(self, self.__class__)
self.copyFile = copyFile.__get__(self, self.__class__)
self.uploadFile = uploadFile.__get__(self, self.__class__)
- self.getExpensesFromPdf = getExpensesFromPdf.__get__(self, self.__class__)
diff --git a/modules/workflows/methods/methodTrustee/__init__.py b/modules/workflows/methods/methodTrustee/__init__.py
new file mode 100644
index 00000000..fa7acc95
--- /dev/null
+++ b/modules/workflows/methods/methodTrustee/__init__.py
@@ -0,0 +1,7 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""Trustee document and expense workflow method (extract, process, sync to accounting)."""
+
+from .methodTrustee import MethodTrustee
+
+__all__ = ["MethodTrustee"]
diff --git a/modules/workflows/methods/methodTrustee/actions/extractFromFiles.py b/modules/workflows/methods/methodTrustee/actions/extractFromFiles.py
new file mode 100644
index 00000000..fe2379bd
--- /dev/null
+++ b/modules/workflows/methods/methodTrustee/actions/extractFromFiles.py
@@ -0,0 +1,304 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""
+Extract document type and structured data from files (PDF, JPG).
+Input: fileIds (list) OR connectionReference + sharepointFolder.
+Output: ActionResult with one ActionDocument per file: { documentType, extractedData, fileId, fileName }, resultLabel.
+"""
+
+import asyncio
+import json
+import logging
+import uuid
+import csv
+import io
+from datetime import datetime, timezone
+from typing import Dict, Any, List, Optional
+
+from modules.datamodels.datamodelChat import ActionResult, ActionDocument, ChatDocument
+from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference
+from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
+
+logger = logging.getLogger(__name__)
+
+ALLOWED_EXTENSIONS = (".pdf", ".jpg", ".jpeg")
+MAX_FILES = 50
+
+
+def _parseCsvToRecords(csvContent: str) -> List[Dict[str, Any]]:
+ """Parse CSV content to list of expense records."""
+ records = []
+ try:
+ content = (csvContent or "").strip()
+ if content.startswith("```"):
+ lines = content.split("\n")
+ if lines and lines[0].startswith("```"):
+ lines = lines[1:]
+ if lines and lines[-1].strip() == "```":
+ lines = lines[:-1]
+ content = "\n".join(lines)
+ reader = csv.DictReader(io.StringIO(content))
+ for row in reader:
+ cleaned = {k.strip(): (v.strip() if isinstance(v, str) else v) for k, v in row.items()}
+ records.append(cleaned)
+ except Exception as e:
+ logger.warning(f"Parse CSV: {e}")
+ return records
+
+
+async def _extractWithAi(self, chatDocumentId: str, fileId: str, fileName: str, mimeType: str, prompt: str, featureInstanceId: str) -> Dict[str, Any]:
+ """Run AI extraction on one file; return { documentType, extractedData (records), fileId, fileName }."""
+ await self.services.ai.ensureAiObjectsInitialized()
+ from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference
+
+ docList = DocumentReferenceList(
+ references=[DocumentItemReference(documentId=chatDocumentId, fileName=fileName)]
+ )
+ # Prefer JSON for documentType + records in one response; fallback to CSV
+ options = AiCallOptions(resultFormat="json", operationType=OperationTypeEnum.DATA_GENERATE)
+ try:
+ aiResponse = await self.services.ai.callAiContent(
+ prompt=prompt or "Extract document type (one of: INVOICE, EXPENSE_RECEIPT, BANK_DOCUMENT, CONTRACT, UNKNOWN) and expense/position records. Return JSON: {\"documentType\": \"...\", \"records\": [{...}]}.",
+ options=options,
+ documentList=docList,
+ contentParts=None,
+ outputFormat="json",
+ generationIntent="extract",
+ )
+ except Exception:
+ options = AiCallOptions(resultFormat="csv", operationType=OperationTypeEnum.DATA_GENERATE)
+ aiResponse = await self.services.ai.callAiContent(
+ prompt=prompt or "Extract expense data from this document. Return CSV with columns: company, desc, valuta, bookingAmount, bookingCurrency, vatPercentage, vatAmount, tags.",
+ options=options,
+ documentList=docList,
+ contentParts=None,
+ outputFormat="csv",
+ generationIntent="extract",
+ )
+
+ if not aiResponse or not aiResponse.documents:
+ return {"documentType": "UNKNOWN", "extractedData": [], "fileId": fileId, "fileName": fileName}
+
+ doc = aiResponse.documents[0]
+ raw = doc.documentData
+ if isinstance(raw, bytes):
+ raw = raw.decode("utf-8")
+
+ documentType = "UNKNOWN"
+ records = []
+
+ # Try JSON first
+ try:
+ if raw.strip().startswith("{"):
+ data = json.loads(raw)
+ documentType = (data.get("documentType") or "UNKNOWN").upper().replace(" ", "_")
+ records = data.get("records") or data.get("extractedData") or []
+ except Exception:
+ pass
+
+ # Fallback: CSV
+ if not records and raw:
+ records = _parseCsvToRecords(raw)
+        if records and documentType == "UNKNOWN":
+ documentType = "EXPENSE_RECEIPT"
+
+ return {"documentType": documentType, "extractedData": records, "fileId": fileId, "fileName": fileName} # fileId from caller for result
+
+
+async def _extractOne(
+ self,
+ f: Dict[str, Any],
+ fileIdToChatDocId: Dict[str, str],
+ prompt: str,
+ featureInstanceId: str,
+) -> ActionDocument:
+ """Run extraction for one file; returns success or error ActionDocument (never raises)."""
+ chatDocId = fileIdToChatDocId.get(f["fileId"])
+ if not chatDocId:
+ return ActionDocument(
+ documentName=(f.get("fileName") or "error") + ".json",
+ documentData=json.dumps({
+ "documentType": "UNKNOWN",
+ "extractedData": [],
+ "fileId": f["fileId"],
+ "fileName": f.get("fileName"),
+ "error": "No ChatDocument id for file",
+ }),
+ mimeType="application/json",
+ )
+ try:
+ out = await _extractWithAi(
+ self, chatDocId, f["fileId"], f["fileName"], f["mimeType"], prompt, featureInstanceId
+ )
+ return ActionDocument(
+            documentName=(f.get("fileName") or "extract") + ".json",
+ documentData=json.dumps(out),
+ mimeType="application/json",
+ )
+ except Exception as e:
+ logger.exception(f"Extract failed for {f.get('fileName')}")
+ return ActionDocument(
+ documentName=(f.get("fileName") or "error") + ".json",
+ documentData=json.dumps({
+ "documentType": "UNKNOWN",
+ "extractedData": [],
+ "fileId": f["fileId"],
+ "fileName": f.get("fileName"),
+ "error": str(e),
+ }),
+ mimeType="application/json",
+ )
+
+
+async def extractFromFiles(self, parameters: Dict[str, Any]) -> ActionResult:
+ """
+ Extract document type and data from files.
+ Either fileIds (list of file IDs already in DB) or connectionReference + sharepointFolder (list PDF/JPG, download, store in DB).
+ Returns one ActionDocument per file with documentData = JSON { documentType, extractedData, fileId, fileName }.
+ """
+ fileIds = parameters.get("fileIds") or []
+ connectionReference = parameters.get("connectionReference")
+ sharepointFolder = parameters.get("sharepointFolder")
+ featureInstanceId = parameters.get("featureInstanceId") or getattr(self.services, "featureInstanceId", None)
+ prompt = parameters.get("prompt") or ""
+
+ if not featureInstanceId:
+ return ActionResult.isFailure(error="featureInstanceId is required")
+
+ filesToProcess = [] # list of { fileId, fileName, mimeType }
+ sharepointMoveInfo: List[Optional[Dict[str, Any]]] = [] # one entry per file; None if not from SharePoint
+
+ if fileIds:
+ from modules.interfaces.interfaceDbManagement import getInterface as getDbInterface
+ db = getDbInterface(self.services.user, mandateId=self.services.mandateId, featureInstanceId=featureInstanceId)
+ for fid in (fileIds if isinstance(fileIds, list) else [fileIds]):
+ if not fid:
+ continue
+ rec = db.getFile(fid) if hasattr(db, "getFile") else None
+ if rec:
+ fileId = rec.id if hasattr(rec, "id") else rec.get("id", fid)
+                fileName = getattr(rec, "fileName", None) or (rec.get("fileName", rec.get("name", "document")) if isinstance(rec, dict) else "document")
+                mimeType = getattr(rec, "mimeType", None) or (rec.get("mimeType", "application/octet-stream") if isinstance(rec, dict) else "application/octet-stream")
+ filesToProcess.append({"fileId": fileId, "fileName": fileName, "mimeType": mimeType})
+ else:
+ filesToProcess.append({"fileId": fid, "fileName": "document", "mimeType": "application/octet-stream"})
+ sharepointMoveInfo.append(None)
+ elif connectionReference and sharepointFolder:
+ userConn = self.services.chat.getUserConnectionFromConnectionReference(connectionReference)
+ if not userConn:
+ return ActionResult.isFailure(error="No Microsoft connection for connectionReference")
+ if not self.services.sharepoint.setAccessTokenFromConnection(userConn):
+ return ActionResult.isFailure(error="Failed to set SharePoint access token")
+ sites = await self.services.sharepoint.resolveSitesFromPathQuery(sharepointFolder)
+ if not sites:
+ return ActionResult.isFailure(error="No SharePoint site found for path")
+ siteId = sites[0].get("id")
+ if not siteId:
+ return ActionResult.isFailure(error="SharePoint site has no id")
+ parsed = self.services.sharepoint.extractSiteFromStandardPath(sharepointFolder)
+ folderPath = (parsed.get("innerPath") or "").strip() if parsed else ""
+ items = await self.services.sharepoint.listFolderContents(siteId, folderPath) or []
+ from modules.interfaces.interfaceDbManagement import getInterface as getDbInterface
+ db = getDbInterface(self.services.user, mandateId=self.services.mandateId, featureInstanceId=featureInstanceId)
+ for item in items[:MAX_FILES]:
+ if item.get("type") != "file":
+ continue
+ name = (item.get("name") or "").lower()
+ if not any(name.endswith(ext) for ext in ALLOWED_EXTENSIONS):
+ continue
+ content = await self.services.sharepoint.downloadFile(siteId, item.get("id"))
+ if not content:
+ continue
+ mime = "application/pdf" if name.endswith(".pdf") else "image/jpeg"
+ fileItem = db.createFile(name=item.get("name", "file"), mimeType=mime, content=content)
+ if fileItem:
+ db.createFileData(fileItem.id, content)
+ filesToProcess.append({"fileId": fileItem.id, "fileName": item.get("name", "file"), "mimeType": mime})
+ sharepointMoveInfo.append({
+ "siteId": siteId,
+ "folderPath": folderPath,
+ "fileName": item.get("name", "file"),
+ "itemId": item.get("id"),
+ })
+ else:
+ return ActionResult.isFailure(error="Provide fileIds or connectionReference + sharepointFolder")
+
+ if not filesToProcess:
+ return ActionResult.isSuccess(documents=[])
+
+ # Attach all files as ChatDocuments to the workflow so AI can resolve them
+ chatDocDumps = []
+ for f in filesToProcess:
+ chatDoc = ChatDocument(
+ id=str(uuid.uuid4()),
+ mandateId=self.services.mandateId or "",
+ featureInstanceId=featureInstanceId or "",
+ messageId="",
+ fileId=f["fileId"],
+ fileName=f["fileName"],
+ fileSize=0,
+ mimeType=f["mimeType"],
+ )
+ chatDocDumps.append(chatDoc.model_dump())
+ messageData = {
+ "id": f"msg_extract_{uuid.uuid4().hex[:12]}",
+ "documentsLabel": "extract_files",
+ "role": "user",
+ "status": "step",
+ "message": f"Extract from {len(filesToProcess)} file(s)",
+ }
+ createdMessage = self.services.chat.storeMessageWithDocuments(
+ self.services.workflow,
+ messageData,
+ chatDocDumps,
+ )
+ if not createdMessage or not createdMessage.documents:
+ return ActionResult.isFailure(error="Failed to attach documents to workflow")
+ # Map fileId -> ChatDocument id for AI reference
+ fileIdToChatDocId = {}
+ for i, f in enumerate(filesToProcess):
+ if i < len(createdMessage.documents):
+ fileIdToChatDocId[f["fileId"]] = createdMessage.documents[i].id
+
+ # Parallel extraction (all files at once)
+ tasks = [
+ _extractOne(self, f, fileIdToChatDocId, prompt, featureInstanceId)
+ for f in filesToProcess
+ ]
+ resultDocuments = list(await asyncio.gather(*tasks))
+
+ # Move SharePoint files to processed/ or error/ (parallel)
+ if sharepointMoveInfo and len(sharepointMoveInfo) == len(resultDocuments):
+ sharepoint = self.services.sharepoint
+
+ async def _moveOneFile(moveInfo: Dict[str, Any], resultDoc: ActionDocument) -> None:
+ try:
+ raw = resultDoc.documentData
+ data = json.loads(raw) if isinstance(raw, str) else raw
+ hasError = "error" in data or not data.get("extractedData")
+ destSub = "error" if hasError else "processed"
+ folderPath = (moveInfo.get("folderPath") or "").strip().rstrip("/")
+ destFolder = f"{folderPath}/{destSub}".strip("/") if folderPath else destSub
+ sourceFolder = folderPath or ""
+ fileName = moveInfo.get("fileName") or "file"
+ destFile = (
+ f"{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}_{fileName}"
+ if not hasError
+ else fileName
+ )
+ await sharepoint.copyFileAsync(
+ moveInfo["siteId"], sourceFolder, fileName, destFolder, destFile
+ )
+ await sharepoint.deleteFile(moveInfo["siteId"], moveInfo["itemId"])
+ except Exception as e:
+ logger.warning(f"Move SharePoint file failed for {moveInfo.get('fileName', '?')}: {e}")
+
+ moveTasks = [
+ _moveOneFile(sharepointMoveInfo[i], resultDocuments[i])
+ for i in range(len(sharepointMoveInfo))
+ if sharepointMoveInfo[i] is not None
+ ]
+ if moveTasks:
+ await asyncio.gather(*moveTasks, return_exceptions=True)
+
+ return ActionResult.isSuccess(documents=resultDocuments)
diff --git a/modules/workflows/methods/methodTrustee/actions/processDocuments.py b/modules/workflows/methods/methodTrustee/actions/processDocuments.py
new file mode 100644
index 00000000..f5c2a868
--- /dev/null
+++ b/modules/workflows/methods/methodTrustee/actions/processDocuments.py
@@ -0,0 +1,134 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""
+Process extracted documents: create TrusteeDocument + TrusteePosition from extraction JSON.
+Input: documentList (reference to extractFromFiles result).
+Each document is JSON with documentType, extractedData, fileId, fileName.
+extractedData is a list of expense/position records.
+Output: one ActionDocument with JSON { positionIds, documentIds } for chaining to syncToAccounting.
+"""
+
+import json
+import logging
+from typing import Dict, Any, List, Optional
+
+from modules.datamodels.datamodelChat import ActionResult, ActionDocument
+from modules.datamodels.datamodelDocref import DocumentReferenceList
+
+logger = logging.getLogger(__name__)
+
+
+def _parseFloat(value) -> float:
+ try:
+ if value is None or value == "":
+ return 0.0
+ return float(value)
+ except (ValueError, TypeError):
+ return 0.0
+
+
+def _recordToPosition(record: Dict[str, Any], documentId: Optional[str], featureInstanceId: str, mandateId: str) -> Dict[str, Any]:
+ """Map extraction record to TrusteePosition payload."""
+ return {
+ "documentId": documentId,
+ "valuta": record.get("valuta"),
+ "transactionDateTime": record.get("transactionDateTime"),
+ "company": record.get("company", ""),
+ "desc": record.get("desc", ""),
+ "tags": record.get("tags", ""),
+ "bookingCurrency": record.get("bookingCurrency", "CHF"),
+ "bookingAmount": _parseFloat(record.get("bookingAmount", 0)),
+ "originalCurrency": record.get("originalCurrency") or record.get("bookingCurrency", "CHF"),
+ "originalAmount": _parseFloat(record.get("originalAmount", 0)) or _parseFloat(record.get("bookingAmount", 0)),
+ "vatPercentage": _parseFloat(record.get("vatPercentage", 0)),
+ "vatAmount": _parseFloat(record.get("vatAmount", 0)),
+ "debitAccountNumber": record.get("debitAccountNumber") or None,
+ "creditAccountNumber": record.get("creditAccountNumber") or None,
+ "taxCode": record.get("taxCode") or None,
+ "costCenter": record.get("costCenter") or None,
+ "bookingReference": record.get("bookingReference") or None,
+ "featureInstanceId": featureInstanceId,
+ "mandateId": mandateId,
+ }
+
+
+async def processDocuments(self, parameters: Dict[str, Any]) -> ActionResult:
+ """
+ Resolve documentList to ChatDocuments, load extraction JSON per document,
+ create TrusteeDocument (with documentType) + TrusteePosition(s), return one JSON document with positionIds/documentIds.
+ """
+ documentListParam = parameters.get("documentList")
+ featureInstanceId = parameters.get("featureInstanceId") or (getattr(self.services, "featureInstanceId", None))
+
+ if not documentListParam:
+ return ActionResult.isFailure(error="documentList is required (reference to extractFromFiles result)")
+ if not featureInstanceId:
+ return ActionResult.isFailure(error="featureInstanceId is required")
+
+ try:
+ docList = DocumentReferenceList.from_string_list(
+ documentListParam if isinstance(documentListParam, list) else [documentListParam]
+ )
+ chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(docList)
+ if not chatDocuments:
+ return ActionResult.isFailure(error="No documents found for documentList")
+
+ from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface
+
+ trusteeInterface = getTrusteeInterface(
+ self.services.user,
+ mandateId=self.services.mandateId,
+ featureInstanceId=featureInstanceId
+ )
+
+ allPositionIds = []
+ allDocumentIds = []
+
+ for chatDoc in chatDocuments:
+ rawBytes = self.services.chat.getFileData(chatDoc.fileId)
+ if not rawBytes:
+ logger.warning(f"Could not load file {chatDoc.fileId}, skipping")
+ continue
+ content = rawBytes.decode("utf-8") if isinstance(rawBytes, bytes) else rawBytes
+ data = json.loads(content) if isinstance(content, str) else content
+ documentType = data.get("documentType")
+ extractedData = data.get("extractedData")
+ fileId = data.get("fileId") or chatDoc.fileId
+ fileName = data.get("fileName") or chatDoc.fileName or "document"
+
+ records = extractedData if isinstance(extractedData, list) else [extractedData] if extractedData else []
+ if not records:
+ continue
+
+ docPayload = {
+ "fileId": fileId,
+ "documentName": fileName,
+ "documentMimeType": chatDoc.mimeType or "application/octet-stream",
+ "sourceType": "workflow",
+ "documentType": documentType,
+ }
+ trusteeDoc = trusteeInterface.createDocument(docPayload)
+ if not trusteeDoc:
+ logger.warning(f"Failed to create TrusteeDocument for {fileName}")
+ continue
+ allDocumentIds.append(trusteeDoc.id)
+
+ for record in records:
+ posPayload = _recordToPosition(record, trusteeDoc.id, featureInstanceId, self.services.mandateId)
+ pos = trusteeInterface.createPosition(posPayload)
+ if pos:
+ allPositionIds.append(pos.id)
+
+ payload = {"positionIds": allPositionIds, "documentIds": allDocumentIds}
+ return ActionResult.isSuccess(
+ documents=[
+ ActionDocument(
+ documentName="process_documents_result.json",
+ documentData=json.dumps(payload),
+ mimeType="application/json",
+ )
+ ]
+ )
+ except Exception as e:
+ logger.exception("processDocuments failed")
+ return ActionResult.isFailure(error=str(e))
diff --git a/modules/workflows/methods/methodTrustee/actions/syncToAccounting.py b/modules/workflows/methods/methodTrustee/actions/syncToAccounting.py
new file mode 100644
index 00000000..4633f32e
--- /dev/null
+++ b/modules/workflows/methods/methodTrustee/actions/syncToAccounting.py
@@ -0,0 +1,72 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""
+Sync trustee positions to accounting (Buha).
+Input: featureInstanceId, documentList (reference to processDocuments result message).
+Reads positionIds from the document and calls AccountingBridge.pushBatchToAccounting.
+"""
+
+import json
+import logging
+from typing import Dict, Any, List
+
+from modules.datamodels.datamodelChat import ActionResult, ActionDocument
+from modules.datamodels.datamodelDocref import DocumentReferenceList
+
+logger = logging.getLogger(__name__)
+
+
+async def syncToAccounting(self, parameters: Dict[str, Any]) -> ActionResult:
+ """
+ Push trustee positions to the configured accounting system.
+ documentList must reference the message from processDocuments (one document with JSON { positionIds, documentIds }).
+ """
+ featureInstanceId = parameters.get("featureInstanceId") or (self.services.featureInstanceId if hasattr(self.services, "featureInstanceId") else None)
+ documentListParam = parameters.get("documentList")
+
+ if not featureInstanceId:
+ return ActionResult.isFailure(error="featureInstanceId is required")
+ if not documentListParam:
+ return ActionResult.isFailure(error="documentList is required (reference to processDocuments result)")
+
+ try:
+ docList = DocumentReferenceList.from_string_list(
+ documentListParam if isinstance(documentListParam, list) else [documentListParam]
+ )
+ chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(docList)
+ if not chatDocuments:
+ return ActionResult.isFailure(error="No document found for documentList; ensure processDocuments ran before this action")
+
+ # Expect one document (JSON with positionIds, documentIds)
+ doc = chatDocuments[0]
+ rawBytes = self.services.chat.getFileData(doc.fileId)
+ if not rawBytes:
+ return ActionResult.isFailure(error=f"Could not load document content for fileId={doc.fileId}")
+
+ content = rawBytes.decode("utf-8") if isinstance(rawBytes, bytes) else rawBytes
+ data = json.loads(content) if isinstance(content, str) else content
+ positionIds = data.get("positionIds") or []
+ if not positionIds:
+ return ActionResult.isSuccess(documents=[
+            ActionDocument(documentName="sync_result.json", documentData=json.dumps({"pushed": 0, "message": "No positionIds in document"}), mimeType="application/json")
+ ])
+
+ from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface
+ from modules.features.trustee.accounting.accountingBridge import AccountingBridge
+
+ trusteeInterface = getTrusteeInterface(
+ self.services.user,
+ mandateId=self.services.mandateId,
+ featureInstanceId=featureInstanceId
+ )
+ bridge = AccountingBridge(trusteeInterface)
+ results = await bridge.pushBatchToAccounting(featureInstanceId, positionIds)
+ successCount = sum(1 for r in results if r.success)
+ summary = {"pushed": successCount, "total": len(positionIds), "results": [{"positionId": pid, "success": r.success, "error": getattr(r, "errorMessage", None)} for pid, r in zip(positionIds, results)]}
+
+ return ActionResult.isSuccess(documents=[
+            ActionDocument(documentName="sync_result.json", documentData=json.dumps(summary), mimeType="application/json")
+ ])
+ except Exception as e:
+ logger.exception("syncToAccounting failed")
+ return ActionResult.isFailure(error=str(e))
diff --git a/modules/workflows/methods/methodTrustee/methodTrustee.py b/modules/workflows/methods/methodTrustee/methodTrustee.py
new file mode 100644
index 00000000..fefeaa52
--- /dev/null
+++ b/modules/workflows/methods/methodTrustee/methodTrustee.py
@@ -0,0 +1,120 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""
+Trustee document workflow method: extract from files, process to positions, sync to accounting.
+"""
+
+import logging
+from modules.workflows.methods.methodBase import MethodBase
+from modules.datamodels.datamodelWorkflowActions import WorkflowActionDefinition, WorkflowActionParameter
+from modules.shared.frontendTypes import FrontendType
+
+from .actions.extractFromFiles import extractFromFiles
+from .actions.processDocuments import processDocuments
+from .actions.syncToAccounting import syncToAccounting
+
+logger = logging.getLogger(__name__)
+
+
+class MethodTrustee(MethodBase):
+ """Trustee document and expense workflow: extract, process, sync to accounting."""
+
+ def __init__(self, services):
+ super().__init__(services)
+ self.name = "trustee"
+ self.description = "Trustee document extraction, processing and accounting sync"
+
+ self._actions = {
+ "extractFromFiles": WorkflowActionDefinition(
+ actionId="trustee.extractFromFiles",
+ description="Extract document type and data from PDF/JPG (fileIds or SharePoint folder)",
+ dynamicMode=False,
+ parameters={
+ "fileIds": WorkflowActionParameter(
+ name="fileIds",
+ type="list",
+ frontendType=FrontendType.JSON,
+ required=False,
+ description="List of file IDs already in DB (alternative to connectionReference + sharepointFolder)",
+ ),
+ "connectionReference": WorkflowActionParameter(
+ name="connectionReference",
+ type="str",
+ frontendType=FrontendType.USER_CONNECTION,
+ required=False,
+ description="Microsoft connection for SharePoint (use with sharepointFolder)",
+ ),
+ "sharepointFolder": WorkflowActionParameter(
+ name="sharepointFolder",
+ type="str",
+ frontendType=FrontendType.TEXT,
+ required=False,
+ description="SharePoint folder path (e.g. /sites/MySite/Documents/Expenses)",
+ ),
+ "featureInstanceId": WorkflowActionParameter(
+ name="featureInstanceId",
+ type="str",
+ frontendType=FrontendType.TEXT,
+ required=True,
+ description="Trustee feature instance ID",
+ ),
+ "prompt": WorkflowActionParameter(
+ name="prompt",
+ type="str",
+ frontendType=FrontendType.TEXTAREA,
+ required=False,
+ description="AI prompt for extraction (optional)",
+ ),
+ },
+ execute=extractFromFiles.__get__(self, self.__class__),
+ ),
+ "processDocuments": WorkflowActionDefinition(
+ actionId="trustee.processDocuments",
+ description="Create TrusteeDocument + TrusteePosition from extraction result (documentList from previous action)",
+ dynamicMode=False,
+ parameters={
+ "documentList": WorkflowActionParameter(
+ name="documentList",
+ type="list",
+ frontendType=FrontendType.DOCUMENT_REFERENCE,
+ required=True,
+ description="Reference to extractFromFiles result (e.g. docList:messageId:extract_result)",
+ ),
+ "featureInstanceId": WorkflowActionParameter(
+ name="featureInstanceId",
+ type="str",
+ frontendType=FrontendType.TEXT,
+ required=True,
+ description="Trustee feature instance ID",
+ ),
+ },
+ execute=processDocuments.__get__(self, self.__class__),
+ ),
+ "syncToAccounting": WorkflowActionDefinition(
+ actionId="trustee.syncToAccounting",
+ description="Push trustee positions to accounting (documentList = processDocuments result)",
+ dynamicMode=False,
+ parameters={
+ "documentList": WorkflowActionParameter(
+ name="documentList",
+ type="list",
+ frontendType=FrontendType.DOCUMENT_REFERENCE,
+ required=True,
+ description="Reference to processDocuments result message",
+ ),
+ "featureInstanceId": WorkflowActionParameter(
+ name="featureInstanceId",
+ type="str",
+ frontendType=FrontendType.TEXT,
+ required=True,
+ description="Trustee feature instance ID",
+ ),
+ },
+ execute=syncToAccounting.__get__(self, self.__class__),
+ ),
+ }
+ self._validateActions()
+
+ self.extractFromFiles = extractFromFiles.__get__(self, self.__class__)
+ self.processDocuments = processDocuments.__get__(self, self.__class__)
+ self.syncToAccounting = syncToAccounting.__get__(self, self.__class__)
diff --git a/modules/workflows/processing/modes/modeAutomation.py b/modules/workflows/processing/modes/modeAutomation.py
index e3131939..4e3c7853 100644
--- a/modules/workflows/processing/modes/modeAutomation.py
+++ b/modules/workflows/processing/modes/modeAutomation.py
@@ -210,9 +210,17 @@ class AutomationMode(BaseMode):
logger.info(f"Task {taskIndex} has {totalActions} actions to execute")
- # Execute all actions sequentially
+ # Execute all actions sequentially; after each, persist result and inject documentList for next
actionResults = []
+ lastPersistedMessageId = None
+ lastPersistedResultLabel = None
for actionIdx, action in enumerate(actions):
+ # Inject documentList from previous action's persisted message (chaining)
+ if actionIdx > 0 and lastPersistedMessageId and lastPersistedResultLabel:
+ if not isinstance(action.execParameters, dict):
+ action.execParameters = {}
+                action.execParameters.setdefault("documentList", [f"docList:{lastPersistedMessageId}:{lastPersistedResultLabel}"])
+
# Check workflow status before each action
checkWorkflowStopped(self.services)
@@ -252,6 +260,34 @@ class AutomationMode(BaseMode):
if result.success:
logger.info(f"Action {actionNumber} completed successfully")
+ # Persist this action's result so next action can reference it via documentList
+ if getattr(self, "processor", None) and result.documents:
+ try:
+ from modules.datamodels.datamodelWorkflow import TaskResult as WorkflowTaskResult
+ resultLabel = action.execResultLabel or f"action_{actionNumber}_result"
+ actionResultWithLabel = ActionResult(
+ success=result.success,
+ documents=result.documents,
+ error=result.error,
+ resultLabel=resultLabel,
+ )
+ syntheticTaskResult = WorkflowTaskResult(
+ taskId=taskStep.id,
+ actionResult=actionResultWithLabel,
+ )
+ chatMessage = await self.processor.persistTaskResult(
+ syntheticTaskResult, workflow, context
+ )
+ if chatMessage and getattr(chatMessage, "id", None):
+ lastPersistedMessageId = chatMessage.id
+ lastPersistedResultLabel = getattr(
+ chatMessage, "documentsLabel", None
+ ) or resultLabel
+ logger.info(
+ f"Persisted action {actionNumber} result: docList:{lastPersistedMessageId}:{lastPersistedResultLabel}"
+ )
+ except Exception as e:
+ logger.warning(f"Per-action persist failed (chaining may break): {e}")
else:
logger.warning(f"Action {actionNumber} failed: {result.error}")
diff --git a/modules/workflows/processing/workflowProcessor.py b/modules/workflows/processing/workflowProcessor.py
index a78a2270..3547008a 100644
--- a/modules/workflows/processing/workflowProcessor.py
+++ b/modules/workflows/processing/workflowProcessor.py
@@ -28,6 +28,7 @@ class WorkflowProcessor:
def __init__(self, services):
self.services = services
self.mode = self._createMode(services.workflow.workflowMode)
+ self.mode.processor = self # So mode can call persistTaskResult for per-action chaining
self.workflow = services.workflow
self.workflowExecOperationId = None # Will be set by workflowManager for task hierarchy