From 2eeaf35990d1217b13e2cf9cbd7c8d92a1c0e1cb Mon Sep 17 00:00:00 2001 From: patrick-motsch Date: Sun, 22 Feb 2026 00:07:33 +0100 Subject: [PATCH] sync trustee feature with rma --- .../trustee/accounting/accountingBridge.py | 106 ++- .../accounting/accountingConnectorBase.py | 26 +- .../connectors/accountingConnectorRma.py | 285 +++++- .../trustee/datamodelFeatureTrustee.py | 62 +- .../trustee/interfaceFeatureTrustee.py | 13 +- modules/features/trustee/mainTrustee.py | 6 + .../features/trustee/routeFeatureTrustee.py | 13 + .../mainServiceSharepoint.py | 25 + .../methodSharepoint/actions/__init__.py | 2 - .../actions/getExpensesFromPdf.py | 836 ------------------ .../methodSharepoint/methodSharepoint.py | 39 - .../methods/methodTrustee/__init__.py | 7 + .../methodTrustee/actions/extractFromFiles.py | 304 +++++++ .../methodTrustee/actions/processDocuments.py | 134 +++ .../methodTrustee/actions/syncToAccounting.py | 72 ++ .../methods/methodTrustee/methodTrustee.py | 120 +++ .../processing/modes/modeAutomation.py | 38 +- .../workflows/processing/workflowProcessor.py | 1 + 18 files changed, 1168 insertions(+), 921 deletions(-) delete mode 100644 modules/workflows/methods/methodSharepoint/actions/getExpensesFromPdf.py create mode 100644 modules/workflows/methods/methodTrustee/__init__.py create mode 100644 modules/workflows/methods/methodTrustee/actions/extractFromFiles.py create mode 100644 modules/workflows/methods/methodTrustee/actions/processDocuments.py create mode 100644 modules/workflows/methods/methodTrustee/actions/syncToAccounting.py create mode 100644 modules/workflows/methods/methodTrustee/methodTrustee.py diff --git a/modules/features/trustee/accounting/accountingBridge.py b/modules/features/trustee/accounting/accountingBridge.py index 664166c0..1e6a9f78 100644 --- a/modules/features/trustee/accounting/accountingBridge.py +++ b/modules/features/trustee/accounting/accountingBridge.py @@ -133,17 +133,98 @@ class AccountingBridge: return SyncResult(success=False, errorMessage=f"Position {positionId} not found") position = posRecords[0] - # Duplicate check + # Build booking once (for push; externalDocumentIds filled after document upload) + booking = self._buildBookingFromPosition(position) + + # 1) First: ensure all documents are in RMA (upload or duplicate); collect Beleg-IDs for linking + documentIds = [] + for key in ("documentId", "bankDocumentId"): + docId = position.get(key) + if docId: + documentIds.append(docId) + if documentIds: + from modules.features.trustee.datamodelFeatureTrustee import TrusteeDocument as TrusteeDocumentModel + logger.info("Accounting sync: positionId=%s, syncing %s document(s) to RMA ...", positionId, len(documentIds)) + belegIds = [] + belegLabels = [] + for documentId in documentIds: + doc = self._trusteeInterface.getDocument(documentId) + if not doc: + continue + fileName = getattr(doc, "documentName", None) or "beleg.pdf" + existingBelegId = getattr(doc, "externalBelegId", None) + if existingBelegId: + logger.info("Accounting sync: document %s already has belegId=%s, skipping upload", documentId, existingBelegId) + belegIds.append(existingBelegId) + belegLabels.append(fileName) + continue + docData = self._trusteeInterface.getDocumentData(documentId) + if docData is None: + continue + mimeType = getattr(doc, "documentMimeType", None) or "application/pdf" + uploadResult = await connector.uploadDocument( + plainConfig, + fileName=fileName, + fileContent=docData, + mimeType=mimeType, + comment=booking.reference, + ) + if not uploadResult.success: + errMsg = f"Dokument konnte nicht nach RMA hochgeladen werden: {uploadResult.errorMessage}" + logger.error( + "Accounting sync failed (document upload): positionId=%s, documentId=%s, error=%s", + positionId, documentId, uploadResult.errorMessage, + ) + return SyncResult(success=False, errorMessage=errMsg) + belegId = uploadResult.externalId + if belegId: + self._trusteeInterface.db.recordModify(TrusteeDocumentModel, documentId, {"externalBelegId": belegId}) + logger.info("Accounting sync: document uploaded & belegId=%s stored on document %s", belegId, documentId) + else: + logger.info("Accounting sync: document uploaded but no belegId in response (409 duplicate?), fileName=%s", fileName) + belegIds.append(belegId) + belegLabels.append(fileName) + if belegIds or belegLabels: + booking.externalDocumentIds = belegIds + booking.externalDocumentLabels = belegLabels + logger.info("Accounting sync: positionId=%s, document sync done, pushing GL booking (POST /gl) ...", positionId) + + # Duplicate check: if locally marked as synced, verify with Buha system + accountingSyncId = position.get("accountingSyncId") existingSyncs = self._trusteeInterface.db.getRecordset( TrusteeAccountingSync, recordFilter={"positionId": positionId, "connectorType": connectorType, "syncStatus": "synced"}, ) - if existingSyncs: - return SyncResult(success=False, errorMessage="Position already synced to this system") + if accountingSyncId or existingSyncs: + checkResult = await connector.isBookingSynced(plainConfig, booking) + if checkResult.success: + logger.info( + "Accounting sync skipped (verified in Buha): positionId=%s, reference=%s", + positionId, booking.reference, + ) + return SyncResult(success=False, errorMessage="Position already synced to this system") + # Not found in Buha (e.g. deleted there): clear local records and re-push + logger.info( + "Accounting sync: reference %s not found in Buha (deleted?), clearing local records and re-pushing positionId=%s", + booking.reference, positionId, + ) + if accountingSyncId: + self._trusteeInterface.db.recordModify(TrusteePosition, positionId, {"accountingSyncId": None}) + for rec in existingSyncs: + rid = rec.get("id") + if rid: + self._trusteeInterface.db.recordDelete(TrusteeAccountingSync, rid) - # Build and push - booking = self._buildBookingFromPosition(position) + # 2) Then: push booking (with reference to document IDs so RMA can link) + if not documentIds: + logger.info("Accounting sync: positionId=%s, no documents, pushing GL booking (POST /gl) ...", positionId) result = await connector.pushBooking(plainConfig, booking) + if not result.success: + logger.error( + "Accounting sync failed: positionId=%s, error=%s", + positionId, + result.errorMessage or "unknown", + ) # Save sync record import uuid @@ -163,13 +244,24 @@ class AccountingBridge: } self._trusteeInterface.db.recordCreate(TrusteeAccountingSync, syncRecord) + # Write back external ID to position (source of truth for sync check) + if result.success and result.externalId: + self._trusteeInterface.db.recordModify( + TrusteePosition, positionId, {"accountingSyncId": result.externalId} + ) + # Update last sync on config record if configRecord: from modules.features.trustee.datamodelFeatureTrustee import TrusteeAccountingConfig - self._trusteeInterface.db.recordModify(TrusteeAccountingConfig, configRecord["id"], { + updatePayload = { "lastSyncAt": time.time(), "lastSyncStatus": "success" if result.success else "error", - }) + } + if result.success: + updatePayload["lastSyncErrorMessage"] = None + else: + updatePayload["lastSyncErrorMessage"] = result.errorMessage or "Sync failed" + self._trusteeInterface.db.recordModify(TrusteeAccountingConfig, configRecord["id"], updatePayload) return result diff --git a/modules/features/trustee/accounting/accountingConnectorBase.py b/modules/features/trustee/accounting/accountingConnectorBase.py index 1a427e21..775a07b6 100644 --- a/modules/features/trustee/accounting/accountingConnectorBase.py +++ b/modules/features/trustee/accounting/accountingConnectorBase.py @@ -28,6 +28,8 @@ class AccountingBooking(BaseModel): bookingDate: str description: str = "" lines: List[AccountingBookingLine] = [] + externalDocumentIds: Optional[List[str]] = None # e.g. RMA Beleg-IDs, sent before booking for linking + externalDocumentLabels: Optional[List[str]] = None # display names for links (e.g. file names), one per id class AccountingChart(BaseModel): @@ -91,6 +93,19 @@ class BaseAccountingConnector(ABC): async def getBookingStatus(self, config: Dict[str, Any], externalId: str) -> SyncResult: """Query the status of a previously pushed booking.""" + async def getBookingByExternalId(self, config: Dict[str, Any], externalId: str) -> SyncResult: + """Fetch the booking in the external system by its external ID (UUID). + success=True: record exists. success=False: not found or error (e.g. deleted in Buha). + Override in connectors that support exact lookup; default = not supported.""" + return SyncResult(success=False, errorMessage="Lookup by external ID not supported by this connector") + + async def isBookingSynced(self, config: Dict[str, Any], booking: AccountingBooking) -> SyncResult: + """Check with the external system if this booking already exists. + success=True: booking exists in external system (do not push again). + success=False: not found or error (allow push). + Default: success=True (trust local sync record; override in connectors that can verify via API, e.g. RMA).""" + return SyncResult(success=True) + async def pushInvoice(self, config: Dict[str, Any], invoice: Dict[str, Any]) -> SyncResult: """Push an invoice. Override in connectors that support it.""" return SyncResult(success=False, errorMessage="Not supported by this connector") @@ -103,6 +118,13 @@ class BaseAccountingConnector(ABC): """Load the vendor list. Override in connectors that support it.""" return [] - async def uploadDocument(self, config: Dict[str, Any], fileName: str, fileContent: bytes, mimeType: str = "application/pdf") -> SyncResult: - """Upload a document/receipt. Override in connectors that support it.""" + async def uploadDocument( + self, + config: Dict[str, Any], + fileName: str, + fileContent: bytes, + mimeType: str = "application/pdf", + comment: Optional[str] = None, + ) -> SyncResult: + """Upload a document/receipt (e.g. beleg). comment can link to booking reference. Override in connectors that support it.""" return SyncResult(success=False, errorMessage="Document upload not supported by this connector") diff --git a/modules/features/trustee/accounting/connectors/accountingConnectorRma.py b/modules/features/trustee/accounting/connectors/accountingConnectorRma.py index 30aeff39..e55cfe40 100644 --- a/modules/features/trustee/accounting/connectors/accountingConnectorRma.py +++ b/modules/features/trustee/accounting/connectors/accountingConnectorRma.py @@ -7,8 +7,12 @@ Auth: Static API key via X-RMA-KEY header. Base URL: https://service.runmyaccounts.com/api/latest/clients/{clientName}/ """ +import asyncio +import json import logging -from typing import List, Dict, Any +import re +from datetime import datetime +from typing import List, Dict, Any, Optional import aiohttp @@ -59,7 +63,7 @@ class AccountingConnectorRma(BaseAccountingConnector): apiKey = config.get("apiKey", "") return { "Authorization": f"Bearer {apiKey}", - "Accept": "application/json", + "Accept": "application/json, application/xml, */*", # RMA may return XML on error (e.g. 406) "Content-Type": "application/json", } @@ -106,34 +110,97 @@ class AccountingConnectorRma(BaseAccountingConnector): return [] async def pushBooking(self, config: Dict[str, Any], booking: AccountingBooking) -> SyncResult: - """Push a GL batch booking to RMA.""" + """Push a GL batch booking to RMA. API expects request body = batch object (no wrapper); only non-zero amount per line.""" try: - entries = [] - for line in booking.lines: - entry = { - "accno": line.accountNumber, - "transdate": booking.bookingDate, - "reference": booking.reference, - "description": line.description or booking.description, - } - if line.debitAmount > 0: - entry["debit"] = line.debitAmount - if line.creditAmount > 0: - entry["credit"] = line.creditAmount - if line.taxCode: - entry["tax_code"] = line.taxCode - entries.append(entry) + if not booking.lines: + return SyncResult(success=False, errorMessage="Missing transactions in batch (no booking lines)") - payload = {"gl_batch": {"entry": entries}} + glTransactions = [] + for line in booking.lines: + t = {"accno": line.accountNumber} + if line.debitAmount and line.debitAmount > 0: + t["debit_amount"] = line.debitAmount + if line.creditAmount and line.creditAmount > 0: + t["credit_amount"] = line.creditAmount + if line.description: + t["memo"] = (line.description or "")[:500] + glTransactions.append(t) + + transdate = booking.bookingDate or "" + if transdate and "T" not in str(transdate): + transdate = f"{transdate}T00:00:00.000+00:00" + batchNumber = (booking.reference or "").strip()[:32] + if not batchNumber: + batchNumber = "GL-" + (booking.reference or str(id(booking)))[:24] + rawDesc = (booking.description or "").strip() + externalDocIds = getattr(booking, "externalDocumentIds", None) or [] + externalDocLabels = getattr(booking, "externalDocumentLabels", None) or [] + if externalDocIds or externalDocLabels: + clientSegment = (config.get("clientId") or config.get("clientName") or "").strip() + docParts = [] + maxI = max(len(externalDocIds), len(externalDocLabels)) + for i in range(min(maxI, 10)): + label = (externalDocLabels[i] if i < len(externalDocLabels) else "Rechnung") or "Rechnung" + belegId = externalDocIds[i] if i < len(externalDocIds) else None + if belegId: + docUrl = f"https://my.runmyaccounts.com/phoenix-core/api/clients/{clientSegment}/workflow/l0doc/file/{belegId}" + docParts.append(f'{label}') + else: + docParts.append(label) + erfDate = datetime.utcnow().strftime("%d.%m.%Y") + linkSuffix = " (" + ", ".join(docParts) + ", erf. " + erfDate + ")" + shortDesc = (rawDesc[:80] + "…") if len(rawDesc) > 80 else rawDesc + description = (shortDesc + linkSuffix).strip()[:500] + else: + description = rawDesc[:500] + payload = { + "batch_number": batchNumber, + "transdate": transdate, + "description": description, + "currency": "CHF", + "exchangerate": 1.0, + "gl_transactions": {"gl_transaction": glTransactions}, + } + if rawDesc and len(rawDesc) > 80: + payload["notes"] = rawDesc[:2000] async with aiohttp.ClientSession() as session: url = self._buildUrl(config, "gl") async with session.post(url, headers=self._buildHeaders(config), json=payload, timeout=aiohttp.ClientTimeout(total=30)) as resp: body = await resp.text() if resp.status in (200, 201, 204): - return SyncResult(success=True, rawResponse={"status": resp.status, "body": body[:500]}) - return SyncResult(success=False, errorMessage=f"HTTP {resp.status}: {body[:300]}") + externalId = None + try: + data = json.loads(body) if body.strip() else {} + if isinstance(data, dict): + externalId = ( + data.get("id") + or data.get("batch_id") + or data.get("entry_id") + or (data.get("gl_batch") or {}).get("id") + ) + if externalId is not None: + externalId = str(externalId).strip() + if not externalId: + externalId = batchNumber + except Exception: + externalId = batchNumber + return SyncResult( + success=True, + externalId=externalId or batchNumber, + rawResponse={"status": resp.status, "body": body[:500]}, + ) + errMsg = f"HTTP {resp.status}: {body[:300]}" + logger.error("RMA pushBooking failed: status=%s, body=%s", resp.status, body[:500]) + return SyncResult(success=False, errorMessage=errMsg) + except asyncio.TimeoutError: + logger.warning("RMA pushBooking timeout (30s)") + return SyncResult( + success=False, + errorMessage="Verbindung zum Buchhaltungssystem hat zu lange gedauert (Timeout). Bitte später erneut versuchen.", + ) except Exception as e: + logger.exception("RMA pushBooking error") return SyncResult(success=False, errorMessage=str(e)) async def getBookingStatus(self, config: Dict[str, Any], externalId: str) -> SyncResult: @@ -147,6 +214,67 @@ class AccountingConnectorRma(BaseAccountingConnector): except Exception as e: return SyncResult(success=False, errorMessage=str(e)) + async def getBookingByExternalId(self, config: Dict[str, Any], externalId: str) -> SyncResult: + """Check if the GL batch/transaction still exists in RMA by external ID (id or batch_number).""" + if not externalId or not str(externalId).strip(): + return SyncResult(success=False, errorMessage="Not found") + try: + async with aiohttp.ClientSession() as session: + url = self._buildUrl(config, f"gl/{externalId.strip()}") + async with session.get( + url, headers=self._buildHeaders(config), timeout=aiohttp.ClientTimeout(total=15) + ) as resp: + if resp.status == 200: + return SyncResult(success=True, externalId=externalId) + if resp.status == 404: + return SyncResult(success=False, errorMessage="Not found") + body = await resp.text() + return SyncResult(success=False, errorMessage=f"HTTP {resp.status}: {body[:200]}") + except Exception as e: + logger.debug("RMA getBookingByExternalId failed: %s", e) + return SyncResult(success=False, errorMessage=str(e)) + + async def isBookingSynced(self, config: Dict[str, Any], booking: AccountingBooking) -> SyncResult: + """Check if booking exists in RMA by searching transactions for reference match. + Uses GET charts/{accno}/transactions with date filter. success=True = booking exists.""" + if not booking.lines: + return SyncResult(success=True) + ref = (booking.reference or "").strip()[:32] + if not ref: + return SyncResult(success=True) + fromDate = (booking.bookingDate or "").split("T")[0].strip()[:10] + if not fromDate or len(fromDate) < 10: + return SyncResult(success=True) + accountNumbers = list({ln.accountNumber for ln in booking.lines if ln.accountNumber}) + if not accountNumbers: + return SyncResult(success=True) + try: + async with aiohttp.ClientSession() as session: + for accno in accountNumbers: + url = self._buildUrl(config, f"charts/{accno}/transactions") + params = {"from_date": fromDate, "to_date": fromDate} + async with session.get( + url, headers=self._buildHeaders(config), params=params, timeout=aiohttp.ClientTimeout(total=15) + ) as resp: + if resp.status != 200: + continue + data = await resp.json() + transactions = data.get("transaction") if isinstance(data, dict) else [] + if isinstance(data, list): + transactions = data + if not isinstance(transactions, list): + transactions = [transactions] if isinstance(transactions, dict) else [] + for t in transactions: + if isinstance(t, dict) and (t.get("reference") or "").strip() == ref: + return SyncResult(success=True) + return SyncResult(success=False, errorMessage="Reference not found in RMA transactions") + except asyncio.TimeoutError: + logger.debug("RMA isBookingSynced timeout – trust local") + return SyncResult(success=True) + except Exception as e: + logger.debug("RMA isBookingSynced error: %s – trust local", e) + return SyncResult(success=True) + async def pushInvoice(self, config: Dict[str, Any], invoice: Dict[str, Any]) -> SyncResult: try: async with aiohttp.ClientSession() as session: @@ -185,20 +313,127 @@ class AccountingConnectorRma(BaseAccountingConnector): logger.error(f"RMA getVendors error: {e}") return [] - async def uploadDocument(self, config: Dict[str, Any], fileName: str, fileContent: bytes, mimeType: str = "application/pdf") -> SyncResult: - """Upload a receipt via POST /belege (multipart/form-data).""" + async def _findBelegByFilename(self, config: Dict[str, Any], session: aiohttp.ClientSession, fileName: str) -> Optional[str]: + """Try GET /belege (undocumented) to find an existing beleg by filename.""" + try: + url = self._buildUrl(config, "belege") + async with session.get(url, headers=self._buildHeaders(config), timeout=aiohttp.ClientTimeout(total=15)) as resp: + body = await resp.text() + logger.info("RMA GET /belege: status=%s, body=%s", resp.status, body[:1000]) + if resp.status != 200: + return None + try: + data = json.loads(body) + except Exception: + data = None + if isinstance(data, list): + items = data + elif isinstance(data, dict): + items = data.get("beleg_upload") or data.get("belege") or data.get("row") or [] + if isinstance(items, dict): + items = [items] + else: + # Try XML: extract all + pairs + ids = re.findall(r"([^<]+)", body) + names = re.findall(r"([^<]+)", body) + for bid, fname in zip(ids, names): + if fileName.lower() in fname.lower(): + logger.info("RMA GET /belege: matched filename=%s → belegId=%s", fname, bid) + return bid.strip() + return None + if not isinstance(items, list): + return None + for item in items: + if not isinstance(item, dict): + continue + fname = item.get("file_name") or item.get("fileName") or "" + if fileName.lower() in fname.lower(): + bid = item.get("id") + if bid: + logger.info("RMA GET /belege: matched filename=%s → belegId=%s", fname, bid) + return str(bid).strip() + except Exception as e: + logger.debug("RMA _findBelegByFilename failed: %s", e) + return None + + def _parseExistingBelegId(self, body: str) -> Optional[str]: + """Try to extract the existing Beleg-ID from a 409 duplicate response. Tries multiple patterns.""" + if not (body or "").strip(): + return None + # Pattern: "id":12345 or "id":"12345" in JSON + match = re.search(r'"id"\s*:\s*"?(\d+)"?', body) + if match: + return match.group(1).strip() + # Pattern: 12345 in XML + match = re.search(r"([^<]+)", body) + if match: + return match.group(1).strip() + # Pattern: numeric id in URL like /file/12345 + match = re.search(r"/file/(\d+)", body) + if match: + return match.group(1).strip() + return None + + def _parseBelegIdFromResponse(self, body: str) -> Optional[str]: + """Extract Beleg-ID from RMA POST /belege response (XML or JSON).""" + if not (body or "").strip(): + return None + try: + data = json.loads(body) + if isinstance(data, dict): + bid = data.get("id") or (data.get("beleg_upload") or {}).get("id") + if bid is not None: + return str(bid).strip() + except Exception: + pass + match = re.search(r"([^<]+)", body) + if match: + return match.group(1).strip() + return None + + async def uploadDocument( + self, + config: Dict[str, Any], + fileName: str, + fileContent: bytes, + mimeType: str = "application/pdf", + comment: Optional[str] = None, + ) -> SyncResult: + """Upload a receipt via POST /belege (multipart/form-data). Returns SyncResult with externalId = Beleg-ID when present.""" try: formData = aiohttp.FormData() formData.add_field("Filedata", fileContent, filename=fileName, content_type=mimeType) + if comment: + formData.add_field("comment", comment[:500]) headers = self._buildHeaders(config) headers.pop("Content-Type", None) # let aiohttp set multipart boundary + headers["Accept"] = "*/*" # RMA may return XML on error; avoid 406 Not Acceptable async with aiohttp.ClientSession() as session: url = self._buildUrl(config, "belege") async with session.post(url, headers=headers, data=formData, timeout=aiohttp.ClientTimeout(total=60)) as resp: body = await resp.text() + belegId = self._parseBelegIdFromResponse(body) if resp.status in (200, 201): - return SyncResult(success=True, rawResponse={"body": body[:500]}) - return SyncResult(success=False, errorMessage=f"HTTP {resp.status}: {body[:300]}") + return SyncResult( + success=True, + externalId=belegId, + rawResponse={"body": body[:500]}, + ) + if resp.status == 409: + logger.info("RMA uploadDocument 409 (duplicate), body=%s", body[:500]) + if not belegId: + belegId = self._parseExistingBelegId(body) + if not belegId: + belegId = await self._findBelegByFilename(config, session, fileName) + return SyncResult( + success=True, + externalId=belegId, + rawResponse={"body": body[:500]}, + ) + errMsg = f"HTTP {resp.status}: {body[:300]}" + logger.error("RMA uploadDocument failed: status=%s, body=%s", resp.status, body[:500]) + return SyncResult(success=False, errorMessage=errMsg) except Exception as e: + logger.exception("RMA uploadDocument error") return SyncResult(success=False, errorMessage=str(e)) diff --git a/modules/features/trustee/datamodelFeatureTrustee.py b/modules/features/trustee/datamodelFeatureTrustee.py index 09643735..2f630464 100644 --- a/modules/features/trustee/datamodelFeatureTrustee.py +++ b/modules/features/trustee/datamodelFeatureTrustee.py @@ -2,6 +2,7 @@ # All rights reserved. """Trustee models: TrusteeOrganisation, TrusteeRole, TrusteeAccess, TrusteeContract, TrusteeDocument, TrusteePosition.""" +from enum import Enum from typing import Optional from pydantic import BaseModel, Field from modules.shared.attributeUtils import registerModelLabels @@ -278,6 +279,16 @@ registerModelLabels( ) +class TrusteeDocumentTypeEnum(str, Enum): + """Document type for trustee documents (expense extraction, ingest, sync).""" + INVOICE = "invoice" + EXPENSE_RECEIPT = "expense_receipt" + BANK_DOCUMENT = "bank_document" + CONTRACT = "contract" + UNKNOWN = "unknown" + AUTO = "auto" + + class TrusteeDocument(BaseModel): """Contains document references for bookings. @@ -362,6 +373,25 @@ class TrusteeDocument(BaseModel): "frontend_hidden": True } ) + documentType: Optional[str] = Field( + default=None, + description="Document type (e.g. invoice, expense_receipt, bank_document, contract); use TrusteeDocumentTypeEnum values", + json_schema_extra={ + "frontend_type": "text", + "frontend_readonly": False, + "frontend_required": False + } + ) + externalBelegId: Optional[str] = Field( + default=None, + description="External Beleg-ID in accounting system (e.g. RMA); set on first successful upload, reused on re-sync", + json_schema_extra={ + "frontend_type": "text", + "frontend_readonly": True, + "frontend_required": False, + "frontend_hidden": True + } + ) # System attributes are automatically set by DatabaseConnector @@ -377,6 +407,8 @@ registerModelLabels( "sourceLocation": {"en": "Source Location", "fr": "Emplacement source", "de": "Quellort"}, "mandateId": {"en": "Mandate", "fr": "Mandat", "de": "Mandat"}, "featureInstanceId": {"en": "Feature Instance", "fr": "Instance de fonctionnalité", "de": "Feature-Instanz"}, + "documentType": {"en": "Document Type", "fr": "Type de document", "de": "Dokumenttyp"}, + "externalBelegId": {"en": "Beleg ID (Accounting)", "fr": "ID Beleg (Comptabilité)", "de": "Beleg-ID (Buchhaltung)"}, }, ) @@ -384,7 +416,7 @@ registerModelLabels( class TrusteePosition(BaseModel): """Contains booking positions (expense entries). - Each position references exactly one source document via documentId (1:N relationship). + A position can have up to two document references: documentId (Beleg) and bankDocumentId (Bank-Referenz). One document (e.g. bank statement) can generate many positions. """ id: str = Field( @@ -398,7 +430,17 @@ class TrusteePosition(BaseModel): ) documentId: Optional[str] = Field( default=None, - description="Reference to TrusteeDocument.id (source document that generated this position)", + description="Reference to TrusteeDocument.id (Beleg / primary document)", + json_schema_extra={ + "frontend_type": "select", + "frontend_readonly": False, + "frontend_required": False, + "frontend_options": "/api/trustee/{instanceId}/documents/options" + } + ) + bankDocumentId: Optional[str] = Field( + default=None, + description="Reference to TrusteeDocument.id (Bank-Referenz / second document)", json_schema_extra={ "frontend_type": "select", "frontend_readonly": False, @@ -582,7 +624,17 @@ class TrusteePosition(BaseModel): "frontend_hidden": True } ) - + accountingSyncId: Optional[str] = Field( + default=None, + description="External ID (UUID) of the synced record in the accounting system; set by sync, used for duplicate check", + json_schema_extra={ + "frontend_type": "text", + "frontend_readonly": True, + "frontend_required": False, + "frontend_hidden": True + } + ) + # Allow extra fields like _createdAt from database model_config = {"extra": "allow"} @@ -593,6 +645,7 @@ registerModelLabels( { "id": {"en": "ID", "fr": "ID", "de": "ID"}, "documentId": {"en": "Document", "fr": "Document", "de": "Dokument"}, + "bankDocumentId": {"en": "Bank Reference", "fr": "Référence bancaire", "de": "Bank-Referenz"}, "valuta": {"en": "Value Date", "fr": "Date de valeur", "de": "Valutadatum"}, "transactionDateTime": {"en": "Transaction Date/Time", "fr": "Date/Heure de transaction", "de": "Transaktionszeitpunkt"}, "company": {"en": "Company", "fr": "Entreprise", "de": "Firma"}, @@ -611,6 +664,7 @@ registerModelLabels( "bookingReference": {"en": "Booking Reference", "fr": "Référence de réservation", "de": "Buchungsreferenz"}, "mandateId": {"en": "Mandate", "fr": "Mandat", "de": "Mandat"}, "featureInstanceId": {"en": "Feature Instance", "fr": "Instance de fonctionnalité", "de": "Feature-Instanz"}, + "accountingSyncId": {"en": "Accounting Sync ID", "fr": "ID sync comptabilité", "de": "Buha-Sync-ID"}, }, ) @@ -629,6 +683,7 @@ class TrusteeAccountingConfig(BaseModel): isActive: bool = Field(default=True) lastSyncAt: Optional[float] = Field(default=None, description="Timestamp of last sync attempt") lastSyncStatus: Optional[str] = Field(default=None, description="Last sync result: success, error, partial") + lastSyncErrorMessage: Optional[str] = Field(default=None, description="Error message when lastSyncStatus is error") mandateId: Optional[str] = Field(default=None) @@ -643,6 +698,7 @@ registerModelLabels( "isActive": {"en": "Active", "fr": "Actif", "de": "Aktiv"}, "lastSyncAt": {"en": "Last Sync", "fr": "Dernière sync.", "de": "Letzte Synchronisation"}, "lastSyncStatus": {"en": "Status", "fr": "Statut", "de": "Status"}, + "lastSyncErrorMessage": {"en": "Error", "fr": "Erreur", "de": "Fehlermeldung"}, "mandateId": {"en": "Mandate", "fr": "Mandat", "de": "Mandat"}, }, ) diff --git a/modules/features/trustee/interfaceFeatureTrustee.py b/modules/features/trustee/interfaceFeatureTrustee.py index 9ead992d..4c97cc4d 100644 --- a/modules/features/trustee/interfaceFeatureTrustee.py +++ b/modules/features/trustee/interfaceFeatureTrustee.py @@ -1121,12 +1121,13 @@ class TrusteeObjects: logger.warning(f"User {self.userId} lacks permission to delete document") return False - # Clear documentId on positions that reference this document - positions = self.db.getRecordset(TrusteePosition, recordFilter={"documentId": documentId}) - for pos in positions: - posId = pos.get("id") - if posId: - self.db.recordModify(TrusteePosition, posId, {"documentId": None}) + # Clear documentId or bankDocumentId on positions that reference this document + for field in ("documentId", "bankDocumentId"): + positions = self.db.getRecordset(TrusteePosition, recordFilter={field: documentId}) + for pos in positions: + posId = pos.get("id") + if posId: + self.db.recordModify(TrusteePosition, posId, {field: None}) return self.db.recordDelete(TrusteeDocument, documentId) diff --git a/modules/features/trustee/mainTrustee.py b/modules/features/trustee/mainTrustee.py index 10d0a95b..1fa1948f 100644 --- a/modules/features/trustee/mainTrustee.py +++ b/modules/features/trustee/mainTrustee.py @@ -38,6 +38,11 @@ UI_OBJECTS = [ "label": {"en": "Expense Import", "de": "Spesen Import", "fr": "Import de dépenses"}, "meta": {"area": "expense-import"} }, + { + "objectKey": "ui.feature.trustee.scan-upload", + "label": {"en": "Scan / Upload", "de": "Scannen / Hochladen", "fr": "Scanner / Téléverser"}, + "meta": {"area": "scan-upload"} + }, { "objectKey": "ui.feature.trustee.settings", "label": {"en": "Accounting Settings", "de": "Buchhaltungs-Einstellungen", "fr": "Paramètres comptables"}, @@ -189,6 +194,7 @@ TEMPLATE_ROLES = [ {"context": "UI", "item": "ui.feature.trustee.positions", "view": True}, {"context": "UI", "item": "ui.feature.trustee.documents", "view": True}, {"context": "UI", "item": "ui.feature.trustee.expense-import", "view": True}, + {"context": "UI", "item": "ui.feature.trustee.scan-upload", "view": True}, # Own records only (MY level) {"context": "DATA", "item": "data.feature.trustee.TrusteePosition", "view": True, "read": "m", "create": "m", "update": "m", "delete": "n"}, {"context": "DATA", "item": "data.feature.trustee.TrusteeDocument", "view": True, "read": "m", "create": "m", "update": "m", "delete": "n"}, diff --git a/modules/features/trustee/routeFeatureTrustee.py b/modules/features/trustee/routeFeatureTrustee.py index cab6eecb..2161f719 100644 --- a/modules/features/trustee/routeFeatureTrustee.py +++ b/modules/features/trustee/routeFeatureTrustee.py @@ -1378,6 +1378,13 @@ async def sync_positions_to_accounting( raise HTTPException(status_code=400, detail="positionIds required") results = await bridge.pushBatchToAccounting(instanceId, positionIds) + failed = [r for r in results if not r.success] + if failed: + logger.warning( + "Accounting sync had %s failure(s): %s", + len(failed), + "; ".join(r.errorMessage or "unknown" for r in failed[:3]), + ) return { "total": len(results), "success": sum(1 for r in results if r.success), @@ -1400,6 +1407,12 @@ async def sync_single_position_to_accounting( from .accounting.accountingBridge import AccountingBridge bridge = AccountingBridge(interface) result = await bridge.pushPositionToAccounting(instanceId, positionId) + if not result.success: + logger.warning( + "Accounting sync failed for positionId=%s: %s", + positionId, + result.errorMessage or "unknown", + ) return result.model_dump() diff --git a/modules/services/serviceSharepoint/mainServiceSharepoint.py b/modules/services/serviceSharepoint/mainServiceSharepoint.py index dc8717a5..62276a79 100644 --- a/modules/services/serviceSharepoint/mainServiceSharepoint.py +++ b/modules/services/serviceSharepoint/mainServiceSharepoint.py @@ -119,6 +119,15 @@ class SharepointService: error_text = await response.text() logger.error(f"Graph API call failed: {response.status} - {error_text}") return {"error": f"API call failed: {response.status} - {error_text}"} + + elif method == "DELETE": + async with session.delete(url, headers=headers) as response: + if response.status in [200, 204]: + return {} + else: + error_text = await response.text() + logger.error(f"Graph API call failed: {response.status} - {error_text}") + return {"error": f"API call failed: {response.status} - {error_text}"} except asyncio.TimeoutError: logger.error(f"Graph API call timed out after 30 seconds: {endpoint}") @@ -476,6 +485,22 @@ class SharepointService: raise Exception(f"Source file not found (404): {sourcePath} - {errorMsg}") else: raise Exception(f"Error copying file: {errorMsg}") + + async def deleteFile(self, siteId: str, itemId: str) -> bool: + """Delete a file (or folder) from SharePoint by item ID. Returns True on success.""" + try: + if not siteId or not itemId: + logger.warning("deleteFile: siteId and itemId are required") + return False + endpoint = f"sites/{siteId}/drive/items/{itemId}" + result = await self._makeGraphApiCall(endpoint, method="DELETE") + if result and "error" in result: + logger.warning(f"deleteFile failed: {result.get('error')}") + return False + return True + except Exception as e: + logger.error(f"Error deleting file: {str(e)}") + return False async def downloadFileByPath(self, siteId: str, filePath: str) -> Optional[bytes]: """Download a file by its path within a site.""" diff --git a/modules/workflows/methods/methodSharepoint/actions/__init__.py b/modules/workflows/methods/methodSharepoint/actions/__init__.py index d59d29aa..6975f8af 100644 --- a/modules/workflows/methods/methodSharepoint/actions/__init__.py +++ b/modules/workflows/methods/methodSharepoint/actions/__init__.py @@ -13,7 +13,6 @@ from .findSiteByUrl import findSiteByUrl from .downloadFileByPath import downloadFileByPath from .copyFile import copyFile from .uploadFile import uploadFile -from .getExpensesFromPdf import getExpensesFromPdf __all__ = [ 'findDocumentPath', @@ -25,6 +24,5 @@ __all__ = [ 'downloadFileByPath', 'copyFile', 'uploadFile', - 'getExpensesFromPdf', ] diff --git a/modules/workflows/methods/methodSharepoint/actions/getExpensesFromPdf.py b/modules/workflows/methods/methodSharepoint/actions/getExpensesFromPdf.py deleted file mode 100644 index 9c873bb2..00000000 --- a/modules/workflows/methods/methodSharepoint/actions/getExpensesFromPdf.py +++ /dev/null @@ -1,836 +0,0 @@ -# Copyright (c) 2025 Patrick Motsch -# All rights reserved. - -""" -Action to extract expenses from PDF documents in SharePoint and save to TrusteePosition. - -Process: -1. Read PDF files from SharePoint folder (max 50 files per execution) -2. FOR EACH PDF document: - a. AI call to extract expense data in CSV format - b. If 0 records: move to "error" folder - c. Validate/calculate VAT, complete valuta/transactionDateTime - d. Save all records to TrusteePosition - e. Move document to "processed" subfolder with timestamp prefix -""" - -import logging -import time -import json -import csv -import io -import asyncio -from datetime import datetime, UTC -from typing import Dict, Any, List, Optional -from modules.datamodels.datamodelChat import ActionResult, ActionDocument -from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum - -logger = logging.getLogger(__name__) - -# Configuration -MAX_FILES_PER_EXECUTION = 50 -MAX_CONCURRENT_AI_TASKS = 10 # Limit concurrent AI calls to avoid rate limits -ALLOWED_TAGS = ["customer", "meeting", "license", "subscription", "fuel", "food", "material"] -RATE_LIMIT_WAIT_SECONDS = 60 - - -async def getExpensesFromPdf(self, parameters: Dict[str, Any]) -> ActionResult: - """ - Extract expenses from PDF documents in SharePoint and save to TrusteePosition. - - Parameters: - - connectionReference (str): Microsoft connection label - - sharepointFolder (str): SharePoint folder path (e.g., /sites/MySite/Documents/Expenses) - - featureInstanceId (str): Feature instance ID for TrusteePosition - - prompt (str): AI prompt for content extraction - - Returns: - ActionResult with success status and processing summary - """ - operationId = None - processedDocuments = [] - skippedDocuments = [] - errorDocuments = [] - totalPositions = 0 - - try: - # Initialize progress tracking - workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" - operationId = f"sharepoint_expenses_{workflowId}_{int(time.time())}" - - parentOperationId = parameters.get('parentOperationId') - self.services.chat.progressLogStart( - operationId, - "Extract Expenses from PDF", - "SharePoint PDF Processing", - "Initializing expense extraction", - parentOperationId=parentOperationId - ) - - # Extract and validate parameters - connectionReference = parameters.get("connectionReference") - sharepointFolder = parameters.get("sharepointFolder") - featureInstanceId = parameters.get("featureInstanceId") - prompt = parameters.get("prompt") - - if not connectionReference: - self.services.chat.progressLogFinish(operationId, False) - return ActionResult.isFailure(error="connectionReference is required") - if not sharepointFolder: - self.services.chat.progressLogFinish(operationId, False) - return ActionResult.isFailure(error="sharepointFolder is required") - if not featureInstanceId: - self.services.chat.progressLogFinish(operationId, False) - return ActionResult.isFailure(error="featureInstanceId is required") - if not prompt: - self.services.chat.progressLogFinish(operationId, False) - return ActionResult.isFailure(error="prompt is required") - - # Get Microsoft connection - self.services.chat.progressLogUpdate(operationId, 0.05, "Getting Microsoft connection") - connection = self.connection.getMicrosoftConnection(connectionReference) - if not connection: - self.services.chat.progressLogFinish(operationId, False) - return ActionResult.isFailure(error="No valid Microsoft connection found") - - # Set access token for SharePoint service - if not self.services.sharepoint.setAccessTokenFromConnection(connection): - self.services.chat.progressLogFinish(operationId, False) - return ActionResult.isFailure(error="Failed to set SharePoint access token") - - # Find site and folder info - self.services.chat.progressLogUpdate(operationId, 0.1, "Resolving SharePoint site") - siteInfo, folderPath = await _resolveSiteAndFolder(self, sharepointFolder) - if not siteInfo: - self.services.chat.progressLogFinish(operationId, False) - return ActionResult.isFailure(error=f"Could not resolve SharePoint site from path: {sharepointFolder}") - - siteId = siteInfo.get("id") - - # List PDF files in folder - self.services.chat.progressLogUpdate(operationId, 0.15, "Finding PDF files in folder") - pdfFiles = await _listPdfFilesInFolder(self, siteId, folderPath) - - if not pdfFiles: - self.services.chat.progressLogFinish(operationId, True) - return ActionResult.isSuccess( - documents=[ActionDocument( - documentName="expense_extraction_result.json", - documentData=json.dumps({ - "status": "no_documents", - "message": "No PDF files found in the specified folder", - "folder": sharepointFolder - }, indent=2), - mimeType="application/json", - validationMetadata={"actionType": "sharepoint.getExpensesFromPdf"} - )] - ) - - # Limit files - originalFileCount = len(pdfFiles) - if originalFileCount > MAX_FILES_PER_EXECUTION: - logger.warning(f"Found {originalFileCount} PDFs, limiting to {MAX_FILES_PER_EXECUTION}") - pdfFiles = pdfFiles[:MAX_FILES_PER_EXECUTION] - - totalFiles = len(pdfFiles) - progressPerFile = 0.7 / totalFiles - - # Get Trustee interface - from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface - trusteeInterface = getTrusteeInterface( - self.services.user, - mandateId=self.services.mandateId, - featureInstanceId=featureInstanceId - ) - - # Process PDFs in parallel with semaphore to limit concurrent AI calls - semaphore = asyncio.Semaphore(MAX_CONCURRENT_AI_TASKS) - completedCount = [0] # Use list for mutable reference in closure - - async def processSinglePdf(idx: int, pdfFile: Dict[str, Any]) -> Dict[str, Any]: - """Process a single PDF document. Returns result dict.""" - fileName = pdfFile.get("name", f"file_{idx}") - fileId = pdfFile.get("id") - - async with semaphore: - # Update progress (thread-safe via asyncio) - completedCount[0] += 1 - currentProgress = 0.2 + (completedCount[0] * progressPerFile) - self.services.chat.progressLogUpdate( - operationId, - min(currentProgress, 0.9), - f"Processing {completedCount[0]}/{totalFiles}: {fileName}" - ) - - try: - # Download PDF content - fileContent = await self.services.sharepoint.downloadFile(siteId, fileId) - if not fileContent: - await _moveToErrorFolder(self, siteId, folderPath, fileName) - return {"type": "error", "file": fileName, "error": "Failed to download", "movedTo": "error/"} - - # AI call to extract expense data (this is the bottleneck - parallelized) - aiResult = await _extractExpensesWithAi(self.services, fileContent, fileName, prompt, featureInstanceId) - - if not aiResult.get("success"): - await _moveToErrorFolder(self, siteId, folderPath, fileName) - return {"type": "error", "file": fileName, "error": aiResult.get("error", "AI extraction failed"), "movedTo": "error/"} - - records = aiResult.get("records", []) - fileId = aiResult.get("fileId") - - # Check for empty records - if not records: - logger.warning(f"Document {fileName}: No records extracted, moving to error folder") - await _moveToErrorFolder(self, siteId, folderPath, fileName) - return {"type": "skipped", "file": fileName, "reason": "No expense records extracted", "movedTo": "error/"} - - # Validate and enrich records - validatedRecords = _validateAndEnrichRecords(records, fileName) - - # Save to TrusteePosition and create Document + Position-Document links - savedCount = _saveToTrusteePosition( - trusteeInterface, - validatedRecords, - featureInstanceId, - self.services.mandateId, - fileId=fileId, - fileName=fileName, - sourceLocation=sharepointFolder - ) - - # Move document to "processed" subfolder - timestamp = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") - newFileName = f"{timestamp}_{fileName}" - - moveSuccess = await _moveToProcessedFolder(self, siteId, folderPath, fileName, newFileName) - - return { - "type": "processed", - "file": fileName, - "newLocation": f"processed/{newFileName}" if moveSuccess else "move_failed", - "recordsExtracted": len(validatedRecords), - "recordsSaved": savedCount - } - - except Exception as e: - errorMsg = str(e) - logger.error(f"Error processing {fileName}: {errorMsg}") - - # Handle rate limit - if "429" in errorMsg or "throttl" in errorMsg.lower(): - logger.warning(f"Rate limit hit, waiting {RATE_LIMIT_WAIT_SECONDS} seconds") - await asyncio.sleep(RATE_LIMIT_WAIT_SECONDS) - - await _moveToErrorFolder(self, siteId, folderPath, fileName) - return {"type": "error", "file": fileName, "error": errorMsg, "movedTo": "error/"} - - # Execute all PDF processing tasks in parallel (limited by semaphore) - logger.info(f"Starting parallel processing of {totalFiles} PDFs (max {MAX_CONCURRENT_AI_TASKS} concurrent)") - tasks = [processSinglePdf(idx, pdfFile) for idx, pdfFile in enumerate(pdfFiles)] - results = await asyncio.gather(*tasks, return_exceptions=True) - - # Collect results - for result in results: - if isinstance(result, Exception): - errorDocuments.append({"file": "unknown", "error": str(result), "movedTo": "error/"}) - elif result.get("type") == "processed": - processedDocuments.append(result) - totalPositions += result.get("recordsSaved", 0) - elif result.get("type") == "skipped": - skippedDocuments.append(result) - elif result.get("type") == "error": - errorDocuments.append(result) - - # Create result summary - self.services.chat.progressLogUpdate(operationId, 0.95, "Creating result summary") - - remainingFiles = max(0, originalFileCount - MAX_FILES_PER_EXECUTION) - - resultSummary = { - "status": "completed", - "folder": sharepointFolder, - "featureInstanceId": featureInstanceId, - "summary": { - "totalFilesFound": originalFileCount, - "filesProcessedThisRun": totalFiles, - "remainingFiles": remainingFiles, - "successfulDocuments": len(processedDocuments), - "skippedDocuments": len(skippedDocuments), - "errorDocuments": len(errorDocuments), - "totalPositionsSaved": totalPositions - }, - "processedDocuments": processedDocuments, - "skippedDocuments": skippedDocuments, - "errorDocuments": errorDocuments - } - - if remainingFiles > 0: - resultSummary["note"] = f"{remainingFiles} files remaining for next execution" - - self.services.chat.progressLogFinish(operationId, True) - - return ActionResult.isSuccess( - documents=[ActionDocument( - documentName="expense_extraction_result.json", - documentData=json.dumps(resultSummary, indent=2), - mimeType="application/json", - validationMetadata={ - "actionType": "sharepoint.getExpensesFromPdf", - "sharepointFolder": sharepointFolder, - "featureInstanceId": featureInstanceId, - "totalPositions": totalPositions - } - )] - ) - - except Exception as e: - logger.error(f"Error in getExpensesFromPdf: {str(e)}") - if operationId: - self.services.chat.progressLogFinish(operationId, False) - return ActionResult.isFailure(error=str(e)) - - -async def _resolveSiteAndFolder(self, sharepointFolder: str) -> tuple: - """Resolve SharePoint site and folder path from the given path.""" - try: - # Parse path format: /sites/SiteName/FolderPath - if sharepointFolder.startswith('/sites/'): - parts = sharepointFolder[7:].split('/', 1) # Remove '/sites/' prefix - if len(parts) >= 1: - siteName = parts[0] - folderPath = parts[1] if len(parts) > 1 else "" - - # Try to find site by name - sites, _ = await self.siteDiscovery.resolveSitesFromPathQuery(sharepointFolder) - if sites: - return sites[0], folderPath - - # Fallback: try to resolve via siteDiscovery - sites, _ = await self.siteDiscovery.resolveSitesFromPathQuery(sharepointFolder) - if sites: - return sites[0], "" - - return None, None - - except Exception as e: - logger.error(f"Error resolving site and folder: {str(e)}") - return None, None - - -async def _listPdfFilesInFolder(self, siteId: str, folderPath: str) -> List[Dict[str, Any]]: - """List PDF files in the given folder.""" - try: - import urllib.parse - - # Build endpoint - if not folderPath or folderPath == "/": - endpoint = f"sites/{siteId}/drive/root/children" - else: - cleanPath = folderPath.strip('/') - encodedPath = urllib.parse.quote(cleanPath, safe='/') - endpoint = f"sites/{siteId}/drive/root:/{encodedPath}:/children" - - result = await self.apiClient.makeGraphApiCall(endpoint) - - if "error" in result: - logger.error(f"Error listing folder: {result['error']}") - return [] - - items = result.get("value", []) - - # Filter for PDF files only - pdfFiles = [] - for item in items: - name = item.get("name", "") - if name.lower().endswith('.pdf') and "file" in item: - pdfFiles.append({ - "id": item.get("id"), - "name": name, - "size": item.get("size", 0), - "webUrl": item.get("webUrl"), - "lastModifiedDateTime": item.get("lastModifiedDateTime") - }) - - logger.info(f"Found {len(pdfFiles)} PDF files in folder") - return pdfFiles - - except Exception as e: - logger.error(f"Error listing PDF files: {str(e)}") - return [] - - -async def _extractExpensesWithAi(services, fileContent: bytes, fileName: str, prompt: str, featureInstanceId: str) -> Dict[str, Any]: - """ - Call AI service to extract expense data from PDF content. - Uses the full AI service pipeline which handles: - - Document extraction (text + images) - - Intent analysis - - Chunking for large documents - - Vision processing for images - """ - try: - import uuid - - # Ensure AI is initialized - await services.ai.ensureAiObjectsInitialized() - - # Step 1: Store file temporarily in database so AI service can access it - from modules.interfaces.interfaceDbManagement import getInterface as getDbInterface - from modules.datamodels.datamodelChat import ChatDocument - from modules.datamodels.datamodelDocref import DocumentReferenceList - - dbInterface = getDbInterface(services.user, mandateId=services.mandateId, featureInstanceId=featureInstanceId) - - # Create file record - fileItem = dbInterface.createFile( - name=fileName, - mimeType="application/pdf", - content=fileContent - ) - - # Store file data - dbInterface.createFileData(fileItem.id, fileContent) - - logger.info(f"Stored PDF {fileName} ({len(fileContent)} bytes) with fileId: {fileItem.id}") - - # Step 2: Create ChatDocument referencing the file - documentId = str(uuid.uuid4()) - chatDocument = ChatDocument( - id=documentId, - mandateId=services.mandateId or "", - featureInstanceId=featureInstanceId or "", - messageId="", # Will be set when attached to message - fileId=fileItem.id, - fileName=fileName, - fileSize=len(fileContent), - mimeType="application/pdf" - ) - - # Step 3: Create a proper message with the document attached to the workflow - # This ensures getChatDocumentsFromDocumentList can find the document via workflow.messages - messageData = { - "id": f"msg_expense_import_{str(uuid.uuid4())[:8]}", - "documentsLabel": f"expense_pdf_{fileName}", - "role": "user", - "status": "step", - "message": f"PDF document for expense extraction: {fileName}" - } - - # Use storeMessageWithDocuments to properly create message + document and sync with workflow - createdMessage = services.chat.storeMessageWithDocuments( - services.workflow, - messageData, - [chatDocument.model_dump()] - ) - - # Update documentId to match the created document's actual ID - if createdMessage and createdMessage.documents: - documentId = createdMessage.documents[0].id - - logger.info(f"Created message {createdMessage.id} with ChatDocument {documentId} for AI processing") - - # Step 4: Create DocumentReferenceList for AI service - from modules.datamodels.datamodelDocref import DocumentItemReference - documentList = DocumentReferenceList( - references=[ - DocumentItemReference( - documentId=documentId, - fileName=fileName - ) - ] - ) - - # Step 5: Call AI with documentList - let AI service handle everything - # (extraction, intent analysis, chunking, image processing) - # Use DATA_GENERATE (same path as ai.process) which handles chunking correctly - options = AiCallOptions( - resultFormat="csv", - operationType=OperationTypeEnum.DATA_GENERATE - ) - - aiResponse = await services.ai.callAiContent( - prompt=prompt, - options=options, - documentList=documentList, - contentParts=None, # Let AI service extract from documents - outputFormat="csv", - generationIntent="extract" # Signal this is extraction, not document generation - ) - - if not aiResponse: - return {"success": False, "error": "AI returned empty response"} - - # Get CSV from rendered documents (not from content - that's the internal structure) - if not aiResponse.documents or len(aiResponse.documents) == 0: - return {"success": False, "error": "AI returned no documents"} - - # Get the CSV content from the first document - csvDocument = aiResponse.documents[0] - csvContent = csvDocument.documentData - - # documentData is bytes, decode to string - if isinstance(csvContent, bytes): - csvContent = csvContent.decode('utf-8') - - logger.info(f"Retrieved CSV content ({len(csvContent)} chars) from rendered document: {csvDocument.documentName}") - - records = _parseCsvToRecords(csvContent) - - # Return fileId so it can be used to create TrusteeDocument reference - return {"success": True, "records": records, "fileId": fileItem.id} - - except Exception as e: - logger.error(f"AI extraction error for {fileName}: {str(e)}") - return {"success": False, "error": str(e)} - - -def _parseCsvToRecords(csvContent: str) -> List[Dict[str, Any]]: - """Parse CSV content to list of expense records.""" - records = [] - try: - content = csvContent.strip() - - # Clean up CSV content - remove markdown code blocks if present - if content.startswith("```"): - lines = content.split('\n') - # Remove first and last line if they're code block markers - if lines[0].startswith("```"): - lines = lines[1:] - if lines and lines[-1].strip() == "```": - lines = lines[:-1] - content = '\n'.join(lines) - - reader = csv.DictReader(io.StringIO(content)) - for row in reader: - # Clean up keys (remove whitespace) - cleanedRow = {k.strip(): v.strip() if isinstance(v, str) else v for k, v in row.items()} - records.append(cleanedRow) - - logger.info(f"Parsed {len(records)} records from CSV content") - - except Exception as e: - logger.error(f"Error parsing CSV: {str(e)}") - - return records - - -def _validateAndEnrichRecords(records: List[Dict[str, Any]], sourceFileName: str) -> List[Dict[str, Any]]: - """ - Validate and enrich expense records: - 1. Calculate/correct VAT amount - 2. Complete valuta/transactionDateTime if one is missing - 3. Validate tags - """ - enrichedRecords = [] - - for record in records: - enriched = record.copy() - - # VAT calculation/validation - vatPercentage = _parseFloat(record.get("vatPercentage", 0)) - vatAmount = _parseFloat(record.get("vatAmount", 0)) - bookingAmount = _parseFloat(record.get("bookingAmount", 0)) - - if vatPercentage > 0 and bookingAmount > 0: - # Calculate expected VAT amount (VAT is included in bookingAmount) - expectedVat = bookingAmount * vatPercentage / (100 + vatPercentage) - - # If vatAmount is missing or significantly different, recalculate - if vatAmount == 0 or abs(vatAmount - expectedVat) > 0.01: - enriched["vatAmount"] = round(expectedVat, 2) - logger.debug(f"VAT amount corrected: {vatAmount} -> {enriched['vatAmount']}") - - # Valuta / transactionDateTime completion - valuta = record.get("valuta") - transactionDateTime = record.get("transactionDateTime") - - if valuta and not transactionDateTime: - try: - dt = datetime.strptime(str(valuta).strip(), "%Y-%m-%d") - enriched["transactionDateTime"] = dt.replace(hour=12).timestamp() - except: - pass - elif transactionDateTime and not valuta: - try: - ts = float(transactionDateTime) - dt = datetime.fromtimestamp(ts, UTC) - enriched["valuta"] = dt.strftime("%Y-%m-%d") - except: - pass - - # Validate tags - tags = record.get("tags", "") - if tags: - tagList = [t.strip().lower() for t in str(tags).split(",")] - validTags = [t for t in tagList if t in ALLOWED_TAGS] - enriched["tags"] = ",".join(validTags) - - # Store source file info in description - existingDesc = record.get("desc", "") - if sourceFileName and sourceFileName not in str(existingDesc): - enriched["desc"] = f"[Source: {sourceFileName}]\n{existingDesc}" - - enrichedRecords.append(enriched) - - return enrichedRecords - - -def _parseFloat(value) -> float: - """Safely parse float value.""" - try: - if value is None or value == "": - return 0.0 - return float(value) - except (ValueError, TypeError): - return 0.0 - - -def _saveToTrusteePosition( - trusteeInterface, - records: List[Dict[str, Any]], - featureInstanceId: str, - mandateId: str, - fileId: Optional[str] = None, - fileName: Optional[str] = None, - sourceLocation: Optional[str] = None -) -> int: - """ - Save validated records to TrusteePosition table. - Also creates TrusteeDocument (referencing the source file) and links positions to it. - - Args: - trusteeInterface: Trustee interface instance - records: List of expense records to save - featureInstanceId: Feature instance ID - mandateId: Mandate ID - fileId: Optional file ID from central Files table (source PDF) - fileName: Optional file name - sourceLocation: Optional source location (e.g., SharePoint path) - - Returns: - Number of positions saved - """ - savedCount = 0 - savedPositionIds = [] - - # Step 1: Create TrusteeDocument referencing the source file - documentId = None - if fileId and fileName: - try: - document = trusteeInterface.createDocument({ - "fileId": fileId, - "documentName": fileName, - "documentMimeType": "application/pdf", - "sourceType": "sharepoint", - "sourceLocation": sourceLocation - }) - if document: - documentId = document.id - logger.info(f"Created TrusteeDocument {documentId} referencing file {fileId}") - else: - logger.warning(f"Failed to create TrusteeDocument for file {fileId}") - except Exception as e: - logger.error(f"Error creating TrusteeDocument: {str(e)}") - - # Step 2: Save positions with direct documentId reference and accounting fields - for record in records: - try: - position = { - "documentId": documentId, - "valuta": record.get("valuta"), - "transactionDateTime": record.get("transactionDateTime"), - "company": record.get("company", ""), - "desc": record.get("desc", ""), - "tags": record.get("tags", ""), - "bookingCurrency": record.get("bookingCurrency", "CHF"), - "bookingAmount": _parseFloat(record.get("bookingAmount", 0)), - "originalCurrency": record.get("originalCurrency") or record.get("bookingCurrency", "CHF"), - "originalAmount": _parseFloat(record.get("originalAmount", 0)) or _parseFloat(record.get("bookingAmount", 0)), - "vatPercentage": _parseFloat(record.get("vatPercentage", 0)), - "vatAmount": _parseFloat(record.get("vatAmount", 0)), - "debitAccountNumber": record.get("debitAccountNumber") or None, - "creditAccountNumber": record.get("creditAccountNumber") or None, - "taxCode": record.get("taxCode") or None, - "costCenter": record.get("costCenter") or None, - "bookingReference": record.get("bookingReference") or None, - "featureInstanceId": featureInstanceId, - "mandateId": mandateId - } - - result = trusteeInterface.createPosition(position) - if result: - savedCount += 1 - savedPositionIds.append(result.id) - logger.debug(f"Saved position: {position.get('company')} - {position.get('bookingAmount')}") - - except Exception as e: - logger.error(f"Failed to save position: {str(e)}") - - # Step 3: Auto-sync to accounting system if configured - if savedCount > 0 and savedPositionIds: - try: - from modules.features.trustee.accounting.accountingBridge import AccountingBridge - bridge = AccountingBridge(trusteeInterface) - configRecord = await bridge.getActiveConfig(featureInstanceId) - if configRecord: - syncResults = await bridge.pushBatchToAccounting(featureInstanceId, savedPositionIds) - syncedCount = sum(1 for r in syncResults if r.success) - logger.info(f"Auto-synced {syncedCount}/{len(savedPositionIds)} positions to accounting system") - except Exception as e: - logger.warning(f"Accounting auto-sync skipped (non-critical): {e}") - - return savedCount - - -async def _ensureFolderExists(self, siteId: str, folderPath: str) -> bool: - """Create folder if it doesn't exist.""" - try: - import urllib.parse - - # Check if folder exists - cleanPath = folderPath.strip('/') - encodedPath = urllib.parse.quote(cleanPath, safe='/') - checkEndpoint = f"sites/{siteId}/drive/root:/{encodedPath}" - - result = await self.apiClient.makeGraphApiCall(checkEndpoint) - - if "error" not in result: - return True # Folder exists - - # Create folder - need to create parent first if nested - pathParts = cleanPath.split('/') - currentPath = "" - - for part in pathParts: - parentPath = currentPath if currentPath else "root" - currentPath = f"{currentPath}/{part}" if currentPath else part - - # Check if this level exists - checkPath = urllib.parse.quote(currentPath, safe='/') - checkResult = await self.apiClient.makeGraphApiCall(f"sites/{siteId}/drive/root:/{checkPath}") - - if "error" in checkResult: - # Create this folder - if parentPath == "root": - createEndpoint = f"sites/{siteId}/drive/root/children" - else: - encodedParent = urllib.parse.quote(parentPath, safe='/') - createEndpoint = f"sites/{siteId}/drive/root:/{encodedParent}:/children" - - createData = json.dumps({ - "name": part, - "folder": {}, - "@microsoft.graph.conflictBehavior": "fail" - }).encode('utf-8') - - createResult = await self.apiClient.makeGraphApiCall(createEndpoint, method="POST", data=createData) - - if "error" in createResult: - logger.warning(f"Failed to create folder {part}: {createResult['error']}") - return False - - logger.info(f"Created folder: {currentPath}") - - return True - - except Exception as e: - logger.error(f"Failed to ensure folder exists: {str(e)}") - return False - - -async def _moveToProcessedFolder(self, siteId: str, sourceFolderPath: str, sourceFileName: str, destFileName: str) -> bool: - """Move processed PDF to 'processed' subfolder.""" - try: - # Build processed folder path - cleanSource = sourceFolderPath.strip('/') - processedFolder = f"{cleanSource}/processed" if cleanSource else "processed" - - # Ensure processed folder exists - await _ensureFolderExists(self, siteId, processedFolder) - - # Copy file to new location - await self.services.sharepoint.copyFileAsync( - siteId=siteId, - sourceFolder=cleanSource if cleanSource else "/", - sourceFile=sourceFileName, - destFolder=processedFolder, - destFile=destFileName - ) - - # Delete original file - await _deleteFile(self, siteId, sourceFolderPath, sourceFileName) - - logger.info(f"Moved {sourceFileName} to processed/{destFileName}") - return True - - except Exception as e: - logger.error(f"Failed to move file to processed: {str(e)}") - return False - - -async def _moveToErrorFolder(self, siteId: str, sourceFolderPath: str, sourceFileName: str) -> bool: - """Move failed PDF to 'error' subfolder (filename unchanged).""" - try: - # Build error folder path - cleanSource = sourceFolderPath.strip('/') - errorFolder = f"{cleanSource}/error" if cleanSource else "error" - - # Ensure error folder exists - await _ensureFolderExists(self, siteId, errorFolder) - - # Copy file to error folder (keep original name) - await self.services.sharepoint.copyFileAsync( - siteId=siteId, - sourceFolder=cleanSource if cleanSource else "/", - sourceFile=sourceFileName, - destFolder=errorFolder, - destFile=sourceFileName # Same filename - ) - - # Delete original file - await _deleteFile(self, siteId, sourceFolderPath, sourceFileName) - - logger.info(f"Moved {sourceFileName} to error/") - return True - - except Exception as e: - logger.error(f"Failed to move file to error folder: {str(e)}") - return False - - -async def _deleteFile(self, siteId: str, folderPath: str, fileName: str) -> bool: - """Delete file from SharePoint.""" - try: - import urllib.parse - - cleanPath = folderPath.strip('/') - filePath = f"{cleanPath}/{fileName}" if cleanPath else fileName - encodedPath = urllib.parse.quote(filePath, safe='/') - - endpoint = f"sites/{siteId}/drive/root:/{encodedPath}" - - # Get file ID first - fileInfo = await self.apiClient.makeGraphApiCall(endpoint) - if "error" in fileInfo: - logger.warning(f"File not found for deletion: {filePath}") - return False - - fileId = fileInfo.get("id") - if not fileId: - return False - - # Delete by ID using apiClient - deleteEndpoint = f"sites/{siteId}/drive/items/{fileId}" - result = await self.apiClient.makeGraphApiCall(deleteEndpoint, method="DELETE") - - if "error" in result: - logger.warning(f"Delete failed: {result['error']}") - return False - - logger.debug(f"Deleted file: {filePath}") - return True - - except Exception as e: - logger.error(f"Failed to delete file: {str(e)}") - return False diff --git a/modules/workflows/methods/methodSharepoint/methodSharepoint.py b/modules/workflows/methods/methodSharepoint/methodSharepoint.py index 9ca4c3e5..d187e438 100644 --- a/modules/workflows/methods/methodSharepoint/methodSharepoint.py +++ b/modules/workflows/methods/methodSharepoint/methodSharepoint.py @@ -28,8 +28,6 @@ from .actions.findSiteByUrl import findSiteByUrl from .actions.downloadFileByPath import downloadFileByPath from .actions.copyFile import copyFile from .actions.uploadFile import uploadFile -from .actions.getExpensesFromPdf import getExpensesFromPdf - logger = logging.getLogger(__name__) class MethodSharepoint(MethodBase): @@ -378,42 +376,6 @@ class MethodSharepoint(MethodBase): ) }, execute=uploadFile.__get__(self, self.__class__) - ), - "getExpensesFromPdf": WorkflowActionDefinition( - actionId="sharepoint.getExpensesFromPdf", - description="Extract expenses from PDF documents in SharePoint folder and save to TrusteePosition", - dynamicMode=False, # Not for dynamic workflow - parameters={ - "connectionReference": WorkflowActionParameter( - name="connectionReference", - type="str", - frontendType=FrontendType.USER_CONNECTION, - required=True, - description="Microsoft connection label for SharePoint access" - ), - "sharepointFolder": WorkflowActionParameter( - name="sharepointFolder", - type="str", - frontendType=FrontendType.TEXT, - required=True, - description="SharePoint folder path containing PDF expense documents (e.g., /sites/MySite/Documents/Expenses)" - ), - "featureInstanceId": WorkflowActionParameter( - name="featureInstanceId", - type="str", - frontendType=FrontendType.TEXT, - required=True, - description="Feature Instance ID for the Trustee feature where positions will be stored" - ), - "prompt": WorkflowActionParameter( - name="prompt", - type="str", - frontendType=FrontendType.TEXTAREA, - required=True, - description="AI prompt for extracting expense data from PDF content" - ) - }, - execute=getExpensesFromPdf.__get__(self, self.__class__) ) } @@ -430,5 +392,4 @@ class MethodSharepoint(MethodBase): self.downloadFileByPath = downloadFileByPath.__get__(self, self.__class__) self.copyFile = copyFile.__get__(self, self.__class__) self.uploadFile = uploadFile.__get__(self, self.__class__) - self.getExpensesFromPdf = getExpensesFromPdf.__get__(self, self.__class__) diff --git a/modules/workflows/methods/methodTrustee/__init__.py b/modules/workflows/methods/methodTrustee/__init__.py new file mode 100644 index 00000000..fa7acc95 --- /dev/null +++ b/modules/workflows/methods/methodTrustee/__init__.py @@ -0,0 +1,7 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Trustee document and expense workflow method (extract, process, sync to accounting).""" + +from .methodTrustee import MethodTrustee + +__all__ = ["MethodTrustee"] diff --git a/modules/workflows/methods/methodTrustee/actions/extractFromFiles.py b/modules/workflows/methods/methodTrustee/actions/extractFromFiles.py new file mode 100644 index 00000000..fe2379bd --- /dev/null +++ b/modules/workflows/methods/methodTrustee/actions/extractFromFiles.py @@ -0,0 +1,304 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Extract document type and structured data from files (PDF, JPG). +Input: fileIds (list) OR connectionReference + sharepointFolder. +Output: ActionResult with one ActionDocument per file: { documentType, extractedData, fileId, fileName }, resultLabel. +""" + +import asyncio +import json +import logging +import uuid +import csv +import io +from datetime import datetime, timezone +from typing import Dict, Any, List, Optional + +from modules.datamodels.datamodelChat import ActionResult, ActionDocument, ChatDocument +from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference +from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum + +logger = logging.getLogger(__name__) + +ALLOWED_EXTENSIONS = (".pdf", ".jpg", ".jpeg") +MAX_FILES = 50 + + +def _parseCsvToRecords(csvContent: str) -> List[Dict[str, Any]]: + """Parse CSV content to list of expense records.""" + records = [] + try: + content = (csvContent or "").strip() + if content.startswith("```"): + lines = content.split("\n") + if lines and lines[0].startswith("```"): + lines = lines[1:] + if lines and lines[-1].strip() == "```": + lines = lines[:-1] + content = "\n".join(lines) + reader = csv.DictReader(io.StringIO(content)) + for row in reader: + cleaned = {k.strip(): (v.strip() if isinstance(v, str) else v) for k, v in row.items()} + records.append(cleaned) + except Exception as e: + logger.warning(f"Parse CSV: {e}") + return records + + +async def _extractWithAi(self, chatDocumentId: str, fileId: str, fileName: str, mimeType: str, prompt: str, featureInstanceId: str) -> Dict[str, Any]: + """Run AI extraction on one file; return { documentType, extractedData (records), fileId, fileName }.""" + await self.services.ai.ensureAiObjectsInitialized() + from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference + + docList = DocumentReferenceList( + references=[DocumentItemReference(documentId=chatDocumentId, fileName=fileName)] + ) + # Prefer JSON for documentType + records in one response; fallback to CSV + options = AiCallOptions(resultFormat="json", operationType=OperationTypeEnum.DATA_GENERATE) + try: + aiResponse = await self.services.ai.callAiContent( + prompt=prompt or "Extract document type (one of: INVOICE, EXPENSE_RECEIPT, BANK_DOCUMENT, CONTRACT, UNKNOWN) and expense/position records. Return JSON: {\"documentType\": \"...\", \"records\": [{...}]}.", + options=options, + documentList=docList, + contentParts=None, + outputFormat="json", + generationIntent="extract", + ) + except Exception: + options = AiCallOptions(resultFormat="csv", operationType=OperationTypeEnum.DATA_GENERATE) + aiResponse = await self.services.ai.callAiContent( + prompt=prompt or "Extract expense data from this document. Return CSV with columns: company, desc, valuta, bookingAmount, bookingCurrency, vatPercentage, vatAmount, tags.", + options=options, + documentList=docList, + contentParts=None, + outputFormat="csv", + generationIntent="extract", + ) + + if not aiResponse or not aiResponse.documents: + return {"documentType": "UNKNOWN", "extractedData": [], "fileId": fileId, "fileName": fileName} + + doc = aiResponse.documents[0] + raw = doc.documentData + if isinstance(raw, bytes): + raw = raw.decode("utf-8") + + documentType = "UNKNOWN" + records = [] + + # Try JSON first + try: + if raw.strip().startswith("{"): + data = json.loads(raw) + documentType = (data.get("documentType") or "UNKNOWN").upper().replace(" ", "_") + records = data.get("records") or data.get("extractedData") or [] + except Exception: + pass + + # Fallback: CSV + if not records and raw: + records = _parseCsvToRecords(raw) + if records and not documentType or documentType == "UNKNOWN": + documentType = "EXPENSE_RECEIPT" + + return {"documentType": documentType, "extractedData": records, "fileId": fileId, "fileName": fileName} # fileId from caller for result + + +async def _extractOne( + self, + f: Dict[str, Any], + fileIdToChatDocId: Dict[str, str], + prompt: str, + featureInstanceId: str, +) -> ActionDocument: + """Run extraction for one file; returns success or error ActionDocument (never raises).""" + chatDocId = fileIdToChatDocId.get(f["fileId"]) + if not chatDocId: + return ActionDocument( + documentName=(f.get("fileName") or "error") + ".json", + documentData=json.dumps({ + "documentType": "UNKNOWN", + "extractedData": [], + "fileId": f["fileId"], + "fileName": f.get("fileName"), + "error": "No ChatDocument id for file", + }), + mimeType="application/json", + ) + try: + out = await _extractWithAi( + self, chatDocId, f["fileId"], f["fileName"], f["mimeType"], prompt, featureInstanceId + ) + return ActionDocument( + documentName=f.get("fileName", "extract") + ".json", + documentData=json.dumps(out), + mimeType="application/json", + ) + except Exception as e: + logger.exception(f"Extract failed for {f.get('fileName')}") + return ActionDocument( + documentName=(f.get("fileName") or "error") + ".json", + documentData=json.dumps({ + "documentType": "UNKNOWN", + "extractedData": [], + "fileId": f["fileId"], + "fileName": f.get("fileName"), + "error": str(e), + }), + mimeType="application/json", + ) + + +async def extractFromFiles(self, parameters: Dict[str, Any]) -> ActionResult: + """ + Extract document type and data from files. + Either fileIds (list of file IDs already in DB) or connectionReference + sharepointFolder (list PDF/JPG, download, store in DB). + Returns one ActionDocument per file with documentData = JSON { documentType, extractedData, fileId, fileName }. + """ + fileIds = parameters.get("fileIds") or [] + connectionReference = parameters.get("connectionReference") + sharepointFolder = parameters.get("sharepointFolder") + featureInstanceId = parameters.get("featureInstanceId") or getattr(self.services, "featureInstanceId", None) + prompt = parameters.get("prompt") or "" + + if not featureInstanceId: + return ActionResult.isFailure(error="featureInstanceId is required") + + filesToProcess = [] # list of { fileId, fileName, mimeType } + sharepointMoveInfo: List[Optional[Dict[str, Any]]] = [] # one entry per file; None if not from SharePoint + + if fileIds: + from modules.interfaces.interfaceDbManagement import getInterface as getDbInterface + db = getDbInterface(self.services.user, mandateId=self.services.mandateId, featureInstanceId=featureInstanceId) + for fid in (fileIds if isinstance(fileIds, list) else [fileIds]): + if not fid: + continue + rec = db.getFile(fid) if hasattr(db, "getFile") else None + if rec: + fileId = rec.id if hasattr(rec, "id") else rec.get("id", fid) + fileName = getattr(rec, "fileName", None) or rec.get("fileName", rec.get("name", "document")) + mimeType = getattr(rec, "mimeType", None) or rec.get("mimeType", "application/octet-stream") + filesToProcess.append({"fileId": fileId, "fileName": fileName, "mimeType": mimeType}) + else: + filesToProcess.append({"fileId": fid, "fileName": "document", "mimeType": "application/octet-stream"}) + sharepointMoveInfo.append(None) + elif connectionReference and sharepointFolder: + userConn = self.services.chat.getUserConnectionFromConnectionReference(connectionReference) + if not userConn: + return ActionResult.isFailure(error="No Microsoft connection for connectionReference") + if not self.services.sharepoint.setAccessTokenFromConnection(userConn): + return ActionResult.isFailure(error="Failed to set SharePoint access token") + sites = await self.services.sharepoint.resolveSitesFromPathQuery(sharepointFolder) + if not sites: + return ActionResult.isFailure(error="No SharePoint site found for path") + siteId = sites[0].get("id") + if not siteId: + return ActionResult.isFailure(error="SharePoint site has no id") + parsed = self.services.sharepoint.extractSiteFromStandardPath(sharepointFolder) + folderPath = (parsed.get("innerPath") or "").strip() if parsed else "" + items = await self.services.sharepoint.listFolderContents(siteId, folderPath) or [] + from modules.interfaces.interfaceDbManagement import getInterface as getDbInterface + db = getDbInterface(self.services.user, mandateId=self.services.mandateId, featureInstanceId=featureInstanceId) + for item in items[:MAX_FILES]: + if item.get("type") != "file": + continue + name = (item.get("name") or "").lower() + if not any(name.endswith(ext) for ext in ALLOWED_EXTENSIONS): + continue + content = await self.services.sharepoint.downloadFile(siteId, item.get("id")) + if not content: + continue + mime = "application/pdf" if name.endswith(".pdf") else "image/jpeg" + fileItem = db.createFile(name=item.get("name", "file"), mimeType=mime, content=content) + if fileItem: + db.createFileData(fileItem.id, content) + filesToProcess.append({"fileId": fileItem.id, "fileName": item.get("name", "file"), "mimeType": mime}) + sharepointMoveInfo.append({ + "siteId": siteId, + "folderPath": folderPath, + "fileName": item.get("name", "file"), + "itemId": item.get("id"), + }) + else: + return ActionResult.isFailure(error="Provide fileIds or connectionReference + sharepointFolder") + + if not filesToProcess: + return ActionResult.isSuccess(documents=[]) + + # Attach all files as ChatDocuments to the workflow so AI can resolve them + chatDocDumps = [] + for f in filesToProcess: + chatDoc = ChatDocument( + id=str(uuid.uuid4()), + mandateId=self.services.mandateId or "", + featureInstanceId=featureInstanceId or "", + messageId="", + fileId=f["fileId"], + fileName=f["fileName"], + fileSize=0, + mimeType=f["mimeType"], + ) + chatDocDumps.append(chatDoc.model_dump()) + messageData = { + "id": f"msg_extract_{uuid.uuid4().hex[:12]}", + "documentsLabel": "extract_files", + "role": "user", + "status": "step", + "message": f"Extract from {len(filesToProcess)} file(s)", + } + createdMessage = self.services.chat.storeMessageWithDocuments( + self.services.workflow, + messageData, + chatDocDumps, + ) + if not createdMessage or not createdMessage.documents: + return ActionResult.isFailure(error="Failed to attach documents to workflow") + # Map fileId -> ChatDocument id for AI reference + fileIdToChatDocId = {} + for i, f in enumerate(filesToProcess): + if i < len(createdMessage.documents): + fileIdToChatDocId[f["fileId"]] = createdMessage.documents[i].id + + # Parallel extraction (all files at once) + tasks = [ + _extractOne(self, f, fileIdToChatDocId, prompt, featureInstanceId) + for f in filesToProcess + ] + resultDocuments = list(await asyncio.gather(*tasks)) + + # Move SharePoint files to processed/ or error/ (parallel) + if sharepointMoveInfo and len(sharepointMoveInfo) == len(resultDocuments): + sharepoint = self.services.sharepoint + + async def _moveOneFile(moveInfo: Dict[str, Any], resultDoc: ActionDocument) -> None: + try: + raw = resultDoc.documentData + data = json.loads(raw) if isinstance(raw, str) else raw + hasError = "error" in data or not data.get("extractedData") + destSub = "error" if hasError else "processed" + folderPath = (moveInfo.get("folderPath") or "").strip().rstrip("/") + destFolder = f"{folderPath}/{destSub}".strip("/") if folderPath else destSub + sourceFolder = folderPath or "" + fileName = moveInfo.get("fileName") or "file" + destFile = ( + f"{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}_{fileName}" + if not hasError + else fileName + ) + await sharepoint.copyFileAsync( + moveInfo["siteId"], sourceFolder, fileName, destFolder, destFile + ) + await sharepoint.deleteFile(moveInfo["siteId"], moveInfo["itemId"]) + except Exception as e: + logger.warning(f"Move SharePoint file failed for {moveInfo.get('fileName', '?')}: {e}") + + moveTasks = [ + _moveOneFile(sharepointMoveInfo[i], resultDocuments[i]) + for i in range(len(sharepointMoveInfo)) + if sharepointMoveInfo[i] is not None + ] + if moveTasks: + await asyncio.gather(*moveTasks, return_exceptions=True) + + return ActionResult.isSuccess(documents=resultDocuments) diff --git a/modules/workflows/methods/methodTrustee/actions/processDocuments.py b/modules/workflows/methods/methodTrustee/actions/processDocuments.py new file mode 100644 index 00000000..f5c2a868 --- /dev/null +++ b/modules/workflows/methods/methodTrustee/actions/processDocuments.py @@ -0,0 +1,134 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Process extracted documents: create TrusteeDocument + TrusteePosition from extraction JSON. +Input: documentList (reference to extractFromFiles result). +Each document is JSON with documentType, extractedData, fileId, fileName. +extractedData is a list of expense/position records. +Output: one ActionDocument with JSON { positionIds, documentIds } for chaining to syncToAccounting. +""" + +import json +import logging +from typing import Dict, Any, List, Optional + +from modules.datamodels.datamodelChat import ActionResult, ActionDocument +from modules.datamodels.datamodelDocref import DocumentReferenceList + +logger = logging.getLogger(__name__) + + +def _parseFloat(value) -> float: + try: + if value is None or value == "": + return 0.0 + return float(value) + except (ValueError, TypeError): + return 0.0 + + +def _recordToPosition(record: Dict[str, Any], documentId: Optional[str], featureInstanceId: str, mandateId: str) -> Dict[str, Any]: + """Map extraction record to TrusteePosition payload.""" + return { + "documentId": documentId, + "valuta": record.get("valuta"), + "transactionDateTime": record.get("transactionDateTime"), + "company": record.get("company", ""), + "desc": record.get("desc", ""), + "tags": record.get("tags", ""), + "bookingCurrency": record.get("bookingCurrency", "CHF"), + "bookingAmount": _parseFloat(record.get("bookingAmount", 0)), + "originalCurrency": record.get("originalCurrency") or record.get("bookingCurrency", "CHF"), + "originalAmount": _parseFloat(record.get("originalAmount", 0)) or _parseFloat(record.get("bookingAmount", 0)), + "vatPercentage": _parseFloat(record.get("vatPercentage", 0)), + "vatAmount": _parseFloat(record.get("vatAmount", 0)), + "debitAccountNumber": record.get("debitAccountNumber") or None, + "creditAccountNumber": record.get("creditAccountNumber") or None, + "taxCode": record.get("taxCode") or None, + "costCenter": record.get("costCenter") or None, + "bookingReference": record.get("bookingReference") or None, + "featureInstanceId": featureInstanceId, + "mandateId": mandateId, + } + + +async def processDocuments(self, parameters: Dict[str, Any]) -> ActionResult: + """ + Resolve documentList to ChatDocuments, load extraction JSON per document, + create TrusteeDocument (with documentType) + TrusteePosition(s), return one JSON document with positionIds/documentIds. + """ + documentListParam = parameters.get("documentList") + featureInstanceId = parameters.get("featureInstanceId") or (getattr(self.services, "featureInstanceId", None)) + + if not documentListParam: + return ActionResult.isFailure(error="documentList is required (reference to extractFromFiles result)") + if not featureInstanceId: + return ActionResult.isFailure(error="featureInstanceId is required") + + try: + docList = DocumentReferenceList.from_string_list( + documentListParam if isinstance(documentListParam, list) else [documentListParam] + ) + chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(docList) + if not chatDocuments: + return ActionResult.isFailure(error="No documents found for documentList") + + from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface + + trusteeInterface = getTrusteeInterface( + self.services.user, + mandateId=self.services.mandateId, + featureInstanceId=featureInstanceId + ) + + allPositionIds = [] + allDocumentIds = [] + + for chatDoc in chatDocuments: + rawBytes = self.services.chat.getFileData(chatDoc.fileId) + if not rawBytes: + logger.warning(f"Could not load file {chatDoc.fileId}, skipping") + continue + content = rawBytes.decode("utf-8") if isinstance(rawBytes, bytes) else rawBytes + data = json.loads(content) if isinstance(content, str) else content + documentType = data.get("documentType") + extractedData = data.get("extractedData") + fileId = data.get("fileId") or chatDoc.fileId + fileName = data.get("fileName") or chatDoc.fileName or "document" + + records = extractedData if isinstance(extractedData, list) else [extractedData] if extractedData else [] + if not records: + continue + + docPayload = { + "fileId": fileId, + "documentName": fileName, + "documentMimeType": chatDoc.mimeType or "application/octet-stream", + "sourceType": "workflow", + "documentType": documentType, + } + trusteeDoc = trusteeInterface.createDocument(docPayload) + if not trusteeDoc: + logger.warning(f"Failed to create TrusteeDocument for {fileName}") + continue + allDocumentIds.append(trusteeDoc.id) + + for record in records: + posPayload = _recordToPosition(record, trusteeDoc.id, featureInstanceId, self.services.mandateId) + pos = trusteeInterface.createPosition(posPayload) + if pos: + allPositionIds.append(pos.id) + + payload = {"positionIds": allPositionIds, "documentIds": allDocumentIds} + return ActionResult.isSuccess( + documents=[ + ActionDocument( + documentName="process_documents_result.json", + documentData=json.dumps(payload), + mimeType="application/json", + ) + ] + ) + except Exception as e: + logger.exception("processDocuments failed") + return ActionResult.isFailure(error=str(e)) diff --git a/modules/workflows/methods/methodTrustee/actions/syncToAccounting.py b/modules/workflows/methods/methodTrustee/actions/syncToAccounting.py new file mode 100644 index 00000000..4633f32e --- /dev/null +++ b/modules/workflows/methods/methodTrustee/actions/syncToAccounting.py @@ -0,0 +1,72 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Sync trustee positions to accounting (Buha). +Input: featureInstanceId, documentList (reference to processDocuments result message). +Reads positionIds from the document and calls AccountingBridge.pushBatchToAccounting. +""" + +import json +import logging +from typing import Dict, Any, List + +from modules.datamodels.datamodelChat import ActionResult, ActionDocument +from modules.datamodels.datamodelDocref import DocumentReferenceList + +logger = logging.getLogger(__name__) + + +async def syncToAccounting(self, parameters: Dict[str, Any]) -> ActionResult: + """ + Push trustee positions to the configured accounting system. + documentList must reference the message from processDocuments (one document with JSON { positionIds, documentIds }). + """ + featureInstanceId = parameters.get("featureInstanceId") or (self.services.featureInstanceId if hasattr(self.services, "featureInstanceId") else None) + documentListParam = parameters.get("documentList") + + if not featureInstanceId: + return ActionResult.isFailure(error="featureInstanceId is required") + if not documentListParam: + return ActionResult.isFailure(error="documentList is required (reference to processDocuments result)") + + try: + docList = DocumentReferenceList.from_string_list( + documentListParam if isinstance(documentListParam, list) else [documentListParam] + ) + chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(docList) + if not chatDocuments: + return ActionResult.isFailure(error="No document found for documentList; ensure processDocuments ran before this action") + + # Expect one document (JSON with positionIds, documentIds) + doc = chatDocuments[0] + rawBytes = self.services.chat.getFileData(doc.fileId) + if not rawBytes: + return ActionResult.isFailure(error=f"Could not load document content for fileId={doc.fileId}") + + content = rawBytes.decode("utf-8") if isinstance(rawBytes, bytes) else rawBytes + data = json.loads(content) if isinstance(content, str) else content + positionIds = data.get("positionIds") or [] + if not positionIds: + return ActionResult.isSuccess(documents=[ + ActionDocument(documentName="sync_result", documentData=json.dumps({"pushed": 0, "message": "No positionIds in document"}), mimeType="application/json") + ]) + + from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface + from modules.features.trustee.accounting.accountingBridge import AccountingBridge + + trusteeInterface = getTrusteeInterface( + self.services.user, + mandateId=self.services.mandateId, + featureInstanceId=featureInstanceId + ) + bridge = AccountingBridge(trusteeInterface) + results = await bridge.pushBatchToAccounting(featureInstanceId, positionIds) + successCount = sum(1 for r in results if r.success) + summary = {"pushed": successCount, "total": len(positionIds), "results": [{"positionId": pid, "success": r.success, "error": getattr(r, "errorMessage", None)} for pid, r in zip(positionIds, results)]} + + return ActionResult.isSuccess(documents=[ + ActionDocument(documentName="sync_result", documentData=json.dumps(summary), mimeType="application/json") + ]) + except Exception as e: + logger.exception("syncToAccounting failed") + return ActionResult.isFailure(error=str(e)) diff --git a/modules/workflows/methods/methodTrustee/methodTrustee.py b/modules/workflows/methods/methodTrustee/methodTrustee.py new file mode 100644 index 00000000..fefeaa52 --- /dev/null +++ b/modules/workflows/methods/methodTrustee/methodTrustee.py @@ -0,0 +1,120 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Trustee document workflow method: extract from files, process to positions, sync to accounting. +""" + +import logging +from modules.workflows.methods.methodBase import MethodBase +from modules.datamodels.datamodelWorkflowActions import WorkflowActionDefinition, WorkflowActionParameter +from modules.shared.frontendTypes import FrontendType + +from .actions.extractFromFiles import extractFromFiles +from .actions.processDocuments import processDocuments +from .actions.syncToAccounting import syncToAccounting + +logger = logging.getLogger(__name__) + + +class MethodTrustee(MethodBase): + """Trustee document and expense workflow: extract, process, sync to accounting.""" + + def __init__(self, services): + super().__init__(services) + self.name = "trustee" + self.description = "Trustee document extraction, processing and accounting sync" + + self._actions = { + "extractFromFiles": WorkflowActionDefinition( + actionId="trustee.extractFromFiles", + description="Extract document type and data from PDF/JPG (fileIds or SharePoint folder)", + dynamicMode=False, + parameters={ + "fileIds": WorkflowActionParameter( + name="fileIds", + type="list", + frontendType=FrontendType.JSON, + required=False, + description="List of file IDs already in DB (alternative to connectionReference + sharepointFolder)", + ), + "connectionReference": WorkflowActionParameter( + name="connectionReference", + type="str", + frontendType=FrontendType.USER_CONNECTION, + required=False, + description="Microsoft connection for SharePoint (use with sharepointFolder)", + ), + "sharepointFolder": WorkflowActionParameter( + name="sharepointFolder", + type="str", + frontendType=FrontendType.TEXT, + required=False, + description="SharePoint folder path (e.g. /sites/MySite/Documents/Expenses)", + ), + "featureInstanceId": WorkflowActionParameter( + name="featureInstanceId", + type="str", + frontendType=FrontendType.TEXT, + required=True, + description="Trustee feature instance ID", + ), + "prompt": WorkflowActionParameter( + name="prompt", + type="str", + frontendType=FrontendType.TEXTAREA, + required=False, + description="AI prompt for extraction (optional)", + ), + }, + execute=extractFromFiles.__get__(self, self.__class__), + ), + "processDocuments": WorkflowActionDefinition( + actionId="trustee.processDocuments", + description="Create TrusteeDocument + TrusteePosition from extraction result (documentList from previous action)", + dynamicMode=False, + parameters={ + "documentList": WorkflowActionParameter( + name="documentList", + type="list", + frontendType=FrontendType.DOCUMENT_REFERENCE, + required=True, + description="Reference to extractFromFiles result (e.g. docList:messageId:extract_result)", + ), + "featureInstanceId": WorkflowActionParameter( + name="featureInstanceId", + type="str", + frontendType=FrontendType.TEXT, + required=True, + description="Trustee feature instance ID", + ), + }, + execute=processDocuments.__get__(self, self.__class__), + ), + "syncToAccounting": WorkflowActionDefinition( + actionId="trustee.syncToAccounting", + description="Push trustee positions to accounting (documentList = processDocuments result)", + dynamicMode=False, + parameters={ + "documentList": WorkflowActionParameter( + name="documentList", + type="list", + frontendType=FrontendType.DOCUMENT_REFERENCE, + required=True, + description="Reference to processDocuments result message", + ), + "featureInstanceId": WorkflowActionParameter( + name="featureInstanceId", + type="str", + frontendType=FrontendType.TEXT, + required=True, + description="Trustee feature instance ID", + ), + }, + execute=syncToAccounting.__get__(self, self.__class__), + ), + } + self._validateActions() + + self.extractFromFiles = extractFromFiles.__get__(self, self.__class__) + self.processDocuments = processDocuments.__get__(self, self.__class__) + self.syncToAccounting = syncToAccounting.__get__(self, self.__class__) diff --git a/modules/workflows/processing/modes/modeAutomation.py b/modules/workflows/processing/modes/modeAutomation.py index e3131939..4e3c7853 100644 --- a/modules/workflows/processing/modes/modeAutomation.py +++ b/modules/workflows/processing/modes/modeAutomation.py @@ -210,9 +210,17 @@ class AutomationMode(BaseMode): logger.info(f"Task {taskIndex} has {totalActions} actions to execute") - # Execute all actions sequentially + # Execute all actions sequentially; after each, persist result and inject documentList for next actionResults = [] + lastPersistedMessageId = None + lastPersistedResultLabel = None for actionIdx, action in enumerate(actions): + # Inject documentList from previous action's persisted message (chaining) + if actionIdx > 0 and lastPersistedMessageId and lastPersistedResultLabel: + if not isinstance(action.execParameters, dict): + action.execParameters = {} + action.execParameters["documentList"] = [f"docList:{lastPersistedMessageId}:{lastPersistedResultLabel}"] + # Check workflow status before each action checkWorkflowStopped(self.services) @@ -252,6 +260,34 @@ class AutomationMode(BaseMode): if result.success: logger.info(f"Action {actionNumber} completed successfully") + # Persist this action's result so next action can reference it via documentList + if getattr(self, "processor", None) and result.documents: + try: + from modules.datamodels.datamodelWorkflow import TaskResult as WorkflowTaskResult + resultLabel = action.execResultLabel or f"action_{actionNumber}_result" + actionResultWithLabel = ActionResult( + success=result.success, + documents=result.documents, + error=result.error, + resultLabel=resultLabel, + ) + syntheticTaskResult = WorkflowTaskResult( + taskId=taskStep.id, + actionResult=actionResultWithLabel, + ) + chatMessage = await self.processor.persistTaskResult( + syntheticTaskResult, workflow, context + ) + if chatMessage and getattr(chatMessage, "id", None): + lastPersistedMessageId = chatMessage.id + lastPersistedResultLabel = getattr( + chatMessage, "documentsLabel", None + ) or resultLabel + logger.info( + f"Persisted action {actionNumber} result: docList:{lastPersistedMessageId}:{lastPersistedResultLabel}" + ) + except Exception as e: + logger.warning(f"Per-action persist failed (chaining may break): {e}") else: logger.warning(f"Action {actionNumber} failed: {result.error}") diff --git a/modules/workflows/processing/workflowProcessor.py b/modules/workflows/processing/workflowProcessor.py index a78a2270..3547008a 100644 --- a/modules/workflows/processing/workflowProcessor.py +++ b/modules/workflows/processing/workflowProcessor.py @@ -28,6 +28,7 @@ class WorkflowProcessor: def __init__(self, services): self.services = services self.mode = self._createMode(services.workflow.workflowMode) + self.mode.processor = self # So mode can call persistTaskResult for per-action chaining self.workflow = services.workflow self.workflowExecOperationId = None # Will be set by workflowManager for task hierarchy