From f35a90e4282d613540da26d71a921adc6576a0b3 Mon Sep 17 00:00:00 2001 From: patrick-motsch Date: Sun, 22 Feb 2026 22:34:07 +0100 Subject: [PATCH] fixes sync and ai tree --- .../trustee/accounting/accountingBridge.py | 81 ++++-- .../connectors/accountingConnectorRma.py | 33 ++- .../trustee/datamodelFeatureTrustee.py | 4 + .../features/trustee/routeFeatureTrustee.py | 35 ++- modules/services/serviceAi/mainServiceAi.py | 102 ++++---- .../methodTrustee/actions/extractFromFiles.py | 232 +++++++++++------- .../methodTrustee/actions/processDocuments.py | 24 +- 7 files changed, 351 insertions(+), 160 deletions(-) diff --git a/modules/features/trustee/accounting/accountingBridge.py b/modules/features/trustee/accounting/accountingBridge.py index fc79159e..b91cd83e 100644 --- a/modules/features/trustee/accounting/accountingBridge.py +++ b/modules/features/trustee/accounting/accountingBridge.py @@ -5,6 +5,7 @@ Encapsulates: config loading -> connector resolution -> duplicate check -> push -> sync record. """ +import json import logging import time from typing import List, Dict, Any, Optional @@ -109,19 +110,26 @@ class AccountingBridge: lines=lines, ) - async def pushPositionToAccounting(self, featureInstanceId: str, positionId: str) -> SyncResult: + async def pushPositionToAccounting( + self, + featureInstanceId: str, + positionId: str, + _resolvedConnector=None, + _resolvedPlainConfig=None, + _resolvedConfigRecord=None, + ) -> SyncResult: """Push a single position to the configured accounting system. - 1. Load config and connector - 2. Load position data - 3. Check for existing successful sync (duplicate guard) - 4. Build AccountingBooking - 5. Push via connector - 6. Create TrusteeAccountingSync record + Optional _resolved* params allow pushBatchToAccounting to pass a pre-resolved + connector/config so we don't decrypt per position (avoids rate-limit). """ from modules.features.trustee.datamodelFeatureTrustee import TrusteePosition, TrusteeAccountingSync - connector, plainConfig, configRecord = await self._resolveConnectorAndConfig(featureInstanceId) + connector = _resolvedConnector + plainConfig = _resolvedPlainConfig + configRecord = _resolvedConfigRecord + if not connector or not plainConfig: + connector, plainConfig, configRecord = await self._resolveConnectorAndConfig(featureInstanceId) if not connector or not plainConfig: return SyncResult(success=False, errorMessage="No active accounting configuration found") @@ -269,19 +277,62 @@ class AccountingBridge: return result async def pushBatchToAccounting(self, featureInstanceId: str, positionIds: List[str]) -> List[SyncResult]: - """Push multiple positions sequentially.""" + """Push multiple positions sequentially. Resolves connector/config once to avoid decrypt rate-limit.""" + connector, plainConfig, configRecord = await self._resolveConnectorAndConfig(featureInstanceId) + if not connector or not plainConfig: + return [SyncResult(success=False, errorMessage="No active accounting configuration found") for _ in positionIds] results = [] for positionId in positionIds: - result = await self.pushPositionToAccounting(featureInstanceId, positionId) + result = await self.pushPositionToAccounting( + featureInstanceId, positionId, + _resolvedConnector=connector, _resolvedPlainConfig=plainConfig, _resolvedConfigRecord=configRecord, + ) results.append(result) return results - async def getChartOfAccounts(self, featureInstanceId: str, accountType: Optional[str] = None) -> List[AccountingChart]: - """Load the chart of accounts from the configured external system. Optional filter by accountType.""" - connector, plainConfig, _ = await self._resolveConnectorAndConfig(featureInstanceId) - if not connector or not plainConfig: + async def refreshChartOfAccounts(self, featureInstanceId: str) -> List[AccountingChart]: + """Fetch the full chart of accounts from the external system and cache it locally on TrusteeAccountingConfig.""" + from modules.features.trustee.datamodelFeatureTrustee import TrusteeAccountingConfig + + connector, plainConfig, configRecord = await self._resolveConnectorAndConfig(featureInstanceId) + if not connector or not plainConfig or not configRecord: + logger.warning("refreshChartOfAccounts: no connector/config — nothing to cache") return [] - charts = await connector.getChartOfAccounts(plainConfig, accountType=accountType) + + charts = await connector.getChartOfAccounts(plainConfig) + serialised = json.dumps([{"accountNumber": c.accountNumber, "label": c.label, "accountType": c.accountType or ""} for c in charts], ensure_ascii=False) + self._trusteeInterface.db.recordModify(TrusteeAccountingConfig, configRecord["id"], { + "cachedChartOfAccounts": serialised, + "chartCachedAt": time.time(), + }) + logger.info(f"Cached {len(charts)} chart-of-accounts entries for instance {featureInstanceId}") + return charts + + def _readCachedCharts(self, configRecord: Dict[str, Any]) -> List[AccountingChart]: + """Deserialise the cached chart-of-accounts JSON from a config record. Returns [] on any error.""" + raw = configRecord.get("cachedChartOfAccounts") + if not raw: + return [] + try: + items = json.loads(raw) if isinstance(raw, str) else raw + return [AccountingChart(accountNumber=i["accountNumber"], label=i["label"], accountType=i.get("accountType", "")) for i in items] + except Exception as e: + logger.debug("Could not deserialise cached chart: %s", e) + return [] + + async def getChartOfAccounts(self, featureInstanceId: str, accountType: Optional[str] = None) -> List[AccountingChart]: + """Return chart of accounts — from local cache if available, otherwise fetch externally and cache.""" + configRecord = await self.getActiveConfig(featureInstanceId) + if not configRecord: + return [] + + charts = self._readCachedCharts(configRecord) + if charts: + logger.debug(f"Using cached chart of accounts ({len(charts)} entries) for instance {featureInstanceId}") + else: + logger.info(f"No cached chart — fetching live for instance {featureInstanceId}") + charts = await self.refreshChartOfAccounts(featureInstanceId) + if accountType: charts = [c for c in charts if c.accountType == accountType] return charts diff --git a/modules/features/trustee/accounting/connectors/accountingConnectorRma.py b/modules/features/trustee/accounting/connectors/accountingConnectorRma.py index 9d9fbf2f..b836b03c 100644 --- a/modules/features/trustee/accounting/connectors/accountingConnectorRma.py +++ b/modules/features/trustee/accounting/connectors/accountingConnectorRma.py @@ -87,26 +87,47 @@ class AccountingConnectorRma(BaseAccountingConnector): except Exception as e: return SyncResult(success=False, errorMessage=str(e)) + def _rmaLinkToAccountType(self, link: str) -> str: + """Map RMA chart 'link' (e.g. AP_amount, AR_amount, AR_paid:AP_paid) to our accountType.""" + if not link: + return "" + linkUpper = link.upper() + if "AP_AMOUNT" in linkUpper: + return "expense" + if "AR_AMOUNT" in linkUpper: + return "revenue" + if "AR_PAID" in linkUpper or "AP_PAID" in linkUpper: + return "asset" + if "AR_TAX" in linkUpper or "AP_TAX" in linkUpper: + return "liability" + if linkUpper in ("AR", "AP"): + return "asset" + return link + async def getChartOfAccounts(self, config: Dict[str, Any], accountType: Optional[str] = None) -> List[AccountingChart]: + """RMA API 'type' filter expects RMA values (AP_amount, AR_amount, etc.), not 'expense'. Fetch full chart and filter client-side.""" try: - params = {} - if accountType: - params["type"] = accountType async with aiohttp.ClientSession() as session: url = self._buildUrl(config, "charts") - async with session.get(url, headers=self._buildHeaders(config), params=params, timeout=aiohttp.ClientTimeout(total=30)) as resp: + async with session.get(url, headers=self._buildHeaders(config), timeout=aiohttp.ClientTimeout(total=30)) as resp: if resp.status != 200: - logger.error(f"RMA charts failed: HTTP {resp.status}") + body = await resp.text() + logger.error(f"RMA charts failed: HTTP {resp.status} - {body[:200]}") return [] data = await resp.json() charts = [] items = data if isinstance(data, list) else data.get("chart", data.get("row", [])) + if not isinstance(items, list): + items = [] for item in items: if isinstance(item, dict): accNo = str(item.get("accno", item.get("account_number", ""))) label = str(item.get("description", item.get("label", ""))) - chartType = item.get("charttype") or item.get("category") or item.get("link") or "" + rmaLink = item.get("link") or "" + chartType = item.get("charttype") or item.get("category") or "" + if not chartType and rmaLink: + chartType = self._rmaLinkToAccountType(rmaLink) if not chartType and accNo: firstDigit = accNo[0] if accNo else "" chartType = { diff --git a/modules/features/trustee/datamodelFeatureTrustee.py b/modules/features/trustee/datamodelFeatureTrustee.py index 2f630464..8b335036 100644 --- a/modules/features/trustee/datamodelFeatureTrustee.py +++ b/modules/features/trustee/datamodelFeatureTrustee.py @@ -684,6 +684,8 @@ class TrusteeAccountingConfig(BaseModel): lastSyncAt: Optional[float] = Field(default=None, description="Timestamp of last sync attempt") lastSyncStatus: Optional[str] = Field(default=None, description="Last sync result: success, error, partial") lastSyncErrorMessage: Optional[str] = Field(default=None, description="Error message when lastSyncStatus is error") + cachedChartOfAccounts: Optional[str] = Field(default=None, description="JSON-serialised chart of accounts cache (list of {accountNumber, label, accountType})") + chartCachedAt: Optional[float] = Field(default=None, description="Timestamp when cachedChartOfAccounts was last refreshed") mandateId: Optional[str] = Field(default=None) @@ -699,6 +701,8 @@ registerModelLabels( "lastSyncAt": {"en": "Last Sync", "fr": "Dernière sync.", "de": "Letzte Synchronisation"}, "lastSyncStatus": {"en": "Status", "fr": "Statut", "de": "Status"}, "lastSyncErrorMessage": {"en": "Error", "fr": "Erreur", "de": "Fehlermeldung"}, + "cachedChartOfAccounts": {"en": "Cached Chart", "de": "Cached Kontoplan", "fr": "Plan comptable en cache"}, + "chartCachedAt": {"en": "Chart Cached At", "de": "Kontoplan-Cache-Zeitpunkt", "fr": "Horodatage cache plan comptable"}, "mandateId": {"en": "Mandate", "fr": "Mandat", "de": "Mandat"}, }, ) diff --git a/modules/features/trustee/routeFeatureTrustee.py b/modules/features/trustee/routeFeatureTrustee.py index 673063eb..82feb22f 100644 --- a/modules/features/trustee/routeFeatureTrustee.py +++ b/modules/features/trustee/routeFeatureTrustee.py @@ -1236,7 +1236,7 @@ class SaveAccountingConfigBody(BaseModel): @router.post("/{instanceId}/accounting/config", status_code=201) @limiter.limit("5/minute") -def save_accounting_config( +async def save_accounting_config( request: Request, instanceId: str = Path(..., description="Feature Instance ID"), body: SaveAccountingConfigBody = Body(...), @@ -1288,6 +1288,7 @@ def save_accounting_config( merged[k] = v updatePayload["encryptedConfig"] = encryptValue(json.dumps(merged), keyName="accountingConfig") interface.db.recordModify(TrusteeAccountingConfig, configId, updatePayload) + await _refreshChartSilently(interface, instanceId) return {"message": "Accounting config updated", "id": configId} if not plainConfig: @@ -1307,6 +1308,7 @@ def save_accounting_config( "mandateId": mandateId, } interface.db.recordCreate(TrusteeAccountingConfig, configRecord) + await _refreshChartSilently(interface, instanceId) return {"message": "Accounting config created", "id": configRecord["id"]} @@ -1317,12 +1319,14 @@ async def test_accounting_connection( instanceId: str = Path(..., description="Feature Instance ID"), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: - """Test the connection to the configured accounting system.""" + """Test the connection to the configured accounting system. On success, refreshes the local chart-of-accounts cache.""" mandateId = _validateInstanceAccess(instanceId, context) interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId) from .accounting.accountingBridge import AccountingBridge bridge = AccountingBridge(interface) result = await bridge.testConnection(instanceId) + if result.success: + await _refreshChartSilently(interface, instanceId) return result.model_dump() @@ -1360,6 +1364,33 @@ async def get_chart_of_accounts( return [c.model_dump() for c in charts] +async def _refreshChartSilently(interface, instanceId: str) -> None: + """Best-effort chart-of-accounts cache refresh. Logs but does not raise on failure.""" + try: + from .accounting.accountingBridge import AccountingBridge + bridge = AccountingBridge(interface) + charts = await bridge.refreshChartOfAccounts(instanceId) + logger.info(f"Chart cache refreshed: {len(charts)} entries for instance {instanceId}") + except Exception as e: + logger.warning(f"Chart cache refresh failed (non-critical): {e}") + + +@router.post("/{instanceId}/accounting/refresh-chart") +@limiter.limit("5/minute") +async def refresh_chart_of_accounts( + request: Request, + instanceId: str = Path(..., description="Feature Instance ID"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Explicitly refresh the locally cached chart of accounts from the external system.""" + mandateId = _validateInstanceAccess(instanceId, context) + interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId) + from .accounting.accountingBridge import AccountingBridge + bridge = AccountingBridge(interface) + charts = await bridge.refreshChartOfAccounts(instanceId) + return {"message": f"Chart of accounts refreshed: {len(charts)} entries", "count": len(charts)} + + @router.post("/{instanceId}/accounting/sync") @limiter.limit("5/minute") async def sync_positions_to_accounting( diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py index 3537fe43..420c910d 100644 --- a/modules/services/serviceAi/mainServiceAi.py +++ b/modules/services/serviceAi/mainServiceAi.py @@ -6,7 +6,7 @@ import re import time import base64 from typing import Dict, Any, List, Optional, Tuple -from modules.datamodels.datamodelChat import PromptPlaceholder, ChatDocument +from modules.datamodels.datamodelChat import PromptPlaceholder, ChatDocument, WorkflowModeEnum from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum from modules.datamodels.datamodelExtraction import ContentPart, DocumentIntent @@ -1283,8 +1283,12 @@ Respond with ONLY a JSON object in this exact format: parentOperationId: Optional[str] ) -> AiResponse: """ - Handle DATA_EXTRACT: Extract content from documents (no AI), then process with AI. - This is the original flow: extract all documents first, then process contentParts with AI. + Handle DATA_EXTRACT: Extract content from documents, then process with AI. + + - AUTOMATION mode: No intent analysis. The passed prompt is used as extractionPrompt + for every document and for the final AI call (exact prompt preserved). + - DYNAMIC mode: Intent analysis (clarifyDocumentIntents) runs first; extraction and + processing use the intents and AI-derived extractionPrompt. """ import time @@ -1332,14 +1336,28 @@ Respond with ONLY a JSON object in this exact format: documents = filteredDocuments # Use filtered list - # Step 2: Clarify document intents (if not provided) - REQUIRED for all documents + # Step 2: Document intents – AUTOMATION uses exact prompt; DYNAMIC uses intent analysis if not documentIntents and documents: - documentIntents = await self.clarifyDocumentIntents( - documents, - prompt, - {"outputFormat": outputFormat}, - extractOperationId - ) + workflowMode = getattr(self.services.workflow, "workflowMode", None) if self.services.workflow else None + if workflowMode == WorkflowModeEnum.WORKFLOW_AUTOMATION: + # Automation: no intent AI call – use the given prompt as extractionPrompt for every document + documentIntents = [ + DocumentIntent( + documentId=doc.id, + intents=["extract"], + extractionPrompt=prompt, + reasoning="Automation mode: use exact prompt from action", + ) + for doc in documents + ] + logger.debug("DATA_EXTRACT in AUTOMATION mode: using exact prompt for all documents (no intent analysis)") + else: + documentIntents = await self.clarifyDocumentIntents( + documents, + prompt, + {"outputFormat": outputFormat}, + extractOperationId + ) # Step 3: Extract and prepare content (NO AI - pure extraction) - REQUIRED for all documents if documents: @@ -1359,51 +1377,33 @@ Respond with ONLY a JSON object in this exact format: contentParts = preparedContentParts - # Step 4: Process extracted contentParts with AI (simple text processing, no structure generation) + # Step 4: Process contentParts with AI via ExtractionService + # Always use processContentPartsWithAi – it handles text vs image parts correctly: + # - Text parts → text models (with chunking if needed) + # - Image parts → Vision AI (proper image_url content blocks) + # No manual contentText concatenation or token estimation needed. if not contentParts: raise ValueError("No content extracted from documents") - # Use simple AI call to process extracted content - # Prepare content for AI processing - contentText = "\n\n".join([ - f"[Document: {part.metadata.get('documentName', 'Unknown')}]\n{part.data}" - for part in contentParts - if part.data - ]) + # Filter out empty content parts (e.g. PDF container with 0 bytes) that would + # produce garbage AI responses and pollute the merged result. + nonEmptyParts = [p for p in contentParts if p.data and len(p.data.strip()) > 0] + if not nonEmptyParts: + raise ValueError("No non-empty content parts to process") - # Check content size and use chunking if needed - # Conservative estimate: 2 bytes per token, 80% of model limit for safety - contentSizeBytes = len(contentText.encode('utf-8')) - promptSizeBytes = len(prompt.encode('utf-8')) - totalSizeBytes = contentSizeBytes + promptSizeBytes - estimatedTokens = totalSizeBytes / 2 # Conservative: 2 bytes per token - - # Get max model context (use Claude's 200k as reference, 80% = 160k tokens) - maxSafeTokens = 160000 - - if estimatedTokens > maxSafeTokens: - # Content too large - use chunking via ExtractionService - logger.warning(f"Content too large for single AI call: ~{estimatedTokens:.0f} tokens (limit: {maxSafeTokens}). Using chunked processing.") - - # Use ExtractionService for chunked processing - extractionService = self.services.extraction - aiResponse = await extractionService.processContentPartsWithPrompt( - contentParts=contentParts, - prompt=prompt, - aiObjects=self.aiObjects, - options=options, - operationId=extractOperationId, - parentOperationId=parentOperationId - ) - else: - # Content fits - use single AI call - aiRequest = AiCallRequest( - prompt=f"{prompt}\n\nExtracted Content:\n{contentText}", - context="", - options=options - ) - - aiResponse = await self.callAi(aiRequest) + self.services.utils.writeDebugFile(prompt, "data_extract_prompt") + extractionService = self.services.extraction + aiRequest = AiCallRequest( + prompt=prompt, + context="", + options=options, + contentParts=nonEmptyParts, + ) + aiResponse = await extractionService.processContentPartsWithAi( + aiRequest, self.aiObjects + ) + _respText = aiResponse.content if isinstance(aiResponse.content, str) else (aiResponse.content.decode("utf-8", errors="replace") if aiResponse.content else "") + self.services.utils.writeDebugFile(_respText, "data_extract_response") # Create response document resultDocument = DocumentData( diff --git a/modules/workflows/methods/methodTrustee/actions/extractFromFiles.py b/modules/workflows/methods/methodTrustee/actions/extractFromFiles.py index ac4e8fbb..fe3473f9 100644 --- a/modules/workflows/methods/methodTrustee/actions/extractFromFiles.py +++ b/modules/workflows/methods/methodTrustee/actions/extractFromFiles.py @@ -13,30 +13,93 @@ import uuid import csv import io from datetime import datetime, timezone -from typing import Dict, Any, List, Optional +from typing import Dict, Any, List, Optional, Tuple from modules.datamodels.datamodelChat import ActionResult, ActionDocument, ChatDocument from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference -from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum +from modules.datamodels.datamodelAi import AiCallOptions, AiCallRequest, OperationTypeEnum logger = logging.getLogger(__name__) ALLOWED_EXTENSIONS = (".pdf", ".jpg", ".jpeg") MAX_FILES = 50 -_DEFAULT_PROMPT_FALLBACK = ( - 'Extract document type (one of: INVOICE, EXPENSE_RECEIPT, BANK_DOCUMENT, CONTRACT, UNKNOWN) ' - 'and expense/position records. Return JSON: {"documentType": "...", "records": [{...}]}. ' - 'Each record must have: valuta (YYYY-MM-DD), transactionDateTime (unix seconds), company, desc (full extracted text), ' - 'tags (from: customer, meeting, license, subscription, fuel, food, material), ' - 'bookingCurrency, bookingAmount, originalCurrency, originalAmount, vatPercentage, vatAmount, ' - 'debitAccountNumber (Soll-Konto nach Schweizer KMU-Kontenrahmen, z.B. 6200 Fahrzeugaufwand, 6000 Materialaufwand), ' - 'creditAccountNumber (Haben-Konto, z.B. 1020 Bank), taxCode, costCenter, bookingReference.' +# Phase 1: Extract all text + classify document type (one step) +_CLASSIFICATION_PROMPT = ( + "Extract ALL text from this document verbatim. Then identify the document type.\n" + 'Return JSON: {"documentType": "EXPENSE_RECEIPT"|"BANK_DOCUMENT"|"INVOICE"|"CONTRACT"|"UNKNOWN", ' + '"rawText": ""}\n' + "EXPENSE_RECEIPT: Quittungen, Tankbelege, Kassenzettel\n" + "BANK_DOCUMENT: Bankauszuege, Kontoauszuege mit Transaktionslisten\n" + "INVOICE: Rechnungen mit Rechnungsnummer und Faelligkeitsdatum\n" + "CONTRACT: Vertraege\n" + "UNKNOWN: Falls unklar" +) + +# Phase 2: Type-specific structuring prompts (placeholders: {expenseList}, {bankList}) +_PROMPT_EXPENSE_RECEIPT = ( + "Extrahiere aus dem folgenden Dokument eine Buchung pro Ausgabeposition. " + "Return JSON: {{\"records\": [{{...}}]}}. Jeder Record: valuta (YYYY-MM-DD), transactionDateTime (unix seconds), company, desc, " + "bookingCurrency, bookingAmount, originalCurrency, originalAmount, vatPercentage, vatAmount, " + "debitAccountNumber (NUR die Kontonummer, z.B. \"6200\", aus: {expenseList}), " + "creditAccountNumber (NUR die Kontonummer, z.B. \"1020\", aus: {bankList}), tags, taxCode, costCenter, bookingReference." +) +_PROMPT_BANK_DOCUMENT = ( + "Extrahiere aus dem folgenden Bankauszug eine Buchung pro Transaktionszeile. " + "Return JSON: {{\"records\": [{{...}}]}}. Jeder Record: valuta, company (Gegenpartei), desc (Zahlungsreferenz), " + "bookingAmount, bookingCurrency, " + "debitAccountNumber (NUR die Kontonummer aus: {expenseList}), creditAccountNumber (NUR die Kontonummer aus: {bankList}), bookingReference. " + "Kein MwSt bei Bankauszuegen. transactionDateTime optional." +) +_PROMPT_INVOICE = ( + "Extrahiere aus der folgenden Rechnung genau eine Buchung. " + "Return JSON: {{\"records\": [{{...}}]}}. Record: valuta (Rechnungsdatum), company (Kreditor), desc (Rechnungsdetails), " + "bookingAmount, bookingCurrency, vatPercentage, vatAmount, " + "debitAccountNumber (NUR die Kontonummer aus: {expenseList}), creditAccountNumber (NUR die Kontonummer aus: {bankList}), " + "bookingReference (Rechnungsnummer), transactionDateTime, taxCode, costCenter." +) +_PROMPT_FALLBACK = ( + "Extrahiere aus dem folgenden Dokument Buchungsdaten. " + "Return JSON: {{\"records\": [{{...}}]}}. Jeder Record: valuta (YYYY-MM-DD), transactionDateTime (unix seconds), company, desc, " + "bookingCurrency, bookingAmount, originalCurrency, originalAmount, vatPercentage, vatAmount, " + "debitAccountNumber (NUR die Kontonummer, z.B. \"6200\", aus: {expenseList}), " + "creditAccountNumber (NUR die Kontonummer, z.B. \"1020\", aus: {bankList}), tags, taxCode, costCenter, bookingReference." ) -async def _buildDefaultPromptWithAccounts(self, featureInstanceId: str) -> str: - """Build extraction prompt with real expense accounts from the connected accounting system.""" +def _parseClassificationResult(raw: str) -> Tuple[str, str]: + """Parse phase 1 AI response: {documentType, rawText}. Returns (documentType, rawText).""" + from modules.shared.jsonUtils import stripCodeFences, extractFirstBalancedJson + + documentType = "UNKNOWN" + rawText = "" + cleaned = extractFirstBalancedJson(stripCodeFences((raw or "").strip())) + try: + data = json.loads(cleaned) + documentType = (data.get("documentType") or "UNKNOWN").strip().upper().replace(" ", "_") + rawText = (data.get("rawText") or data.get("raw_text") or "").strip() + except Exception as e: + logger.debug("Parse classification result: %s", e) + return (documentType, rawText) + + +def _buildStructuringPrompt(documentType: str, expenseList: str, bankList: str) -> str: + """Build phase 2 prompt for the given document type, with account lists injected.""" + expenseList = expenseList or "6200 Fahrzeugaufwand, 6000 Materialaufwand" + bankList = bankList or "1020 Bank" + docType = (documentType or "UNKNOWN").upper().replace(" ", "_") + if docType == "EXPENSE_RECEIPT": + return _PROMPT_EXPENSE_RECEIPT.format(expenseList=expenseList, bankList=bankList) + if docType == "BANK_DOCUMENT": + return _PROMPT_BANK_DOCUMENT.format(expenseList=expenseList, bankList=bankList) + if docType == "INVOICE": + return _PROMPT_INVOICE.format(expenseList=expenseList, bankList=bankList) + return _PROMPT_FALLBACK.format(expenseList=expenseList, bankList=bankList) + + +async def _getAccountLists(self, featureInstanceId: str) -> Tuple[str, str]: + """Load expense and bank account lists from the connected accounting system for use in prompts. + Returns (expenseList, bankList). Empty strings if not configured or on error.""" try: from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface from modules.features.trustee.accounting.accountingBridge import AccountingBridge @@ -50,25 +113,30 @@ async def _buildDefaultPromptWithAccounts(self, featureInstanceId: str) -> str: assetAccounts = await bridge.getChartOfAccounts(featureInstanceId, accountType="asset") except Exception as e: logger.debug("Could not load chart of accounts for prompt: %s", e) - return "" + return ("", "") if not expenseAccounts: - return "" + return ("", "") expenseList = ", ".join(f"{a.accountNumber} {a.label}" for a in expenseAccounts[:50]) bankAccounts = [a for a in assetAccounts if a.accountNumber.startswith("10")] bankList = ", ".join(f"{a.accountNumber} {a.label}" for a in bankAccounts[:10]) if bankAccounts else "1020 Bank" + return (expenseList, bankList) - return ( - 'Extract document type (one of: INVOICE, EXPENSE_RECEIPT, BANK_DOCUMENT, CONTRACT, UNKNOWN) ' - 'and expense/position records. Return JSON: {"documentType": "...", "records": [{...}]}. ' - 'Each record must have: valuta (YYYY-MM-DD), transactionDateTime (unix seconds), company, desc (full extracted text), ' - 'tags (from: customer, meeting, license, subscription, fuel, food, material), ' - 'bookingCurrency, bookingAmount, originalCurrency, originalAmount, vatPercentage, vatAmount, ' - f'debitAccountNumber (Soll-Konto, verwende eines der folgenden Aufwandkonten: {expenseList}), ' - f'creditAccountNumber (Haben-Konto, verwende eines der folgenden Konten: {bankList}), ' - 'taxCode, costCenter, bookingReference.' - ) + +def _parseStructuredRecords(raw: str) -> List[Dict[str, Any]]: + """Parse phase 2 AI response (JSON with records or CSV) into list of record dicts.""" + from modules.shared.jsonUtils import stripCodeFences, extractFirstBalancedJson + + records: List[Dict[str, Any]] = [] + cleaned = extractFirstBalancedJson(stripCodeFences((raw or "").strip())) + try: + data = json.loads(cleaned) + records = data.get("records") or data.get("extractedData") or [] + except Exception: + if cleaned: + records = _parseCsvToRecords(cleaned) + return records if isinstance(records, list) else [] def _parseCsvToRecords(csvContent: str) -> List[Dict[str, Any]]: @@ -85,26 +153,40 @@ def _parseCsvToRecords(csvContent: str) -> List[Dict[str, Any]]: content = "\n".join(lines) reader = csv.DictReader(io.StringIO(content)) for row in reader: - cleaned = {k.strip(): (v.strip() if isinstance(v, str) else v) for k, v in row.items()} + cleaned = {(k.strip() if k else k): (v.strip() if isinstance(v, str) else v) for k, v in row.items() if k} records.append(cleaned) except Exception as e: logger.warning(f"Parse CSV: {e}") return records -async def _extractWithAi(self, chatDocumentId: str, fileId: str, fileName: str, mimeType: str, prompt: str, featureInstanceId: str) -> Dict[str, Any]: - """Run AI extraction on one file; return { documentType, extractedData (records), fileId, fileName }.""" +async def _extractWithAi( + self, + chatDocumentId: str, + fileId: str, + fileName: str, + mimeType: str, + expenseList: str, + bankList: str, + featureInstanceId: str, +) -> Dict[str, Any]: + """Run 2-phase AI extraction: (1) classify + full text, (2) structure by type. Returns { documentType, extractedData, fileId, fileName }.""" await self.services.ai.ensureAiObjectsInitialized() from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference docList = DocumentReferenceList( references=[DocumentItemReference(documentId=chatDocumentId, fileName=fileName)] ) - # Prefer JSON for documentType + records in one response; fallback to CSV - options = AiCallOptions(resultFormat="json", operationType=OperationTypeEnum.DATA_GENERATE) + try: - aiResponse = await self.services.ai.callAiContent( - prompt=prompt or _DEFAULT_PROMPT_FALLBACK, + self.services.utils.writeDebugFile(_CLASSIFICATION_PROMPT, "trustee_classification_prompt") + except Exception: + pass + + options = AiCallOptions(resultFormat="json", operationType=OperationTypeEnum.DATA_EXTRACT) + try: + phase1Response = await self.services.ai.callAiContent( + prompt=_CLASSIFICATION_PROMPT, options=options, documentList=docList, contentParts=None, @@ -112,9 +194,9 @@ async def _extractWithAi(self, chatDocumentId: str, fileId: str, fileName: str, generationIntent="extract", ) except Exception: - options = AiCallOptions(resultFormat="csv", operationType=OperationTypeEnum.DATA_GENERATE) - aiResponse = await self.services.ai.callAiContent( - prompt=prompt or _DEFAULT_PROMPT_FALLBACK, + options = AiCallOptions(resultFormat="csv", operationType=OperationTypeEnum.DATA_EXTRACT) + phase1Response = await self.services.ai.callAiContent( + prompt=_CLASSIFICATION_PROMPT, options=options, documentList=docList, contentParts=None, @@ -122,63 +204,50 @@ async def _extractWithAi(self, chatDocumentId: str, fileId: str, fileName: str, generationIntent="extract", ) - if not aiResponse or not aiResponse.documents: + if not phase1Response or not phase1Response.documents: return {"documentType": "UNKNOWN", "extractedData": [], "fileId": fileId, "fileName": fileName} - doc = aiResponse.documents[0] - raw = doc.documentData - if isinstance(raw, bytes): - raw = raw.decode("utf-8") + raw1 = phase1Response.documents[0].documentData + if isinstance(raw1, bytes): + raw1 = raw1.decode("utf-8") + documentType, rawText = _parseClassificationResult(raw1 or "") - documentType = "UNKNOWN" - records = [] + if not rawText: + return {"documentType": documentType or "UNKNOWN", "extractedData": [], "fileId": fileId, "fileName": fileName} - # Try JSON first + structuringPrompt = _buildStructuringPrompt(documentType, expenseList, bankList) try: - if raw.strip().startswith("{"): - data = json.loads(raw) - # Direct format: {"documentType": "...", "records": [...]} - if "records" in data or "extractedData" in data: - documentType = (data.get("documentType") or "UNKNOWN").upper().replace(" ", "_") - records = data.get("records") or data.get("extractedData") or [] - # Wrapped in document structure: {"documents": [{"sections": [{"elements": [{"content": {"code": "..."}}]}]}]} - elif "documents" in data: - for doc in data.get("documents", []): - for section in doc.get("sections", []): - for elem in section.get("elements", []): - code = (elem.get("content") or {}).get("code") - if code and isinstance(code, str): - try: - inner = json.loads(code) - if isinstance(inner, dict) and ("records" in inner or "documentType" in inner): - documentType = (inner.get("documentType") or "UNKNOWN").upper().replace(" ", "_") - records = inner.get("records") or inner.get("extractedData") or [] - break - except Exception: - pass - if records: - break - if records: - break - elif "documentType" in data: - documentType = (data.get("documentType") or "UNKNOWN").upper().replace(" ", "_") + self.services.utils.writeDebugFile(structuringPrompt, "trustee_structuring_prompt") except Exception: pass - # Fallback: CSV - if not records and raw: - records = _parseCsvToRecords(raw) - if records and not documentType or documentType == "UNKNOWN": - documentType = "EXPENSE_RECEIPT" + fullPrompt = f"{structuringPrompt}\n\nDOKUMENT-TEXT:\n{rawText}" + phase2Request = AiCallRequest( + prompt=fullPrompt, + context="", + options=AiCallOptions(resultFormat="json"), + ) + phase2Response = await self.services.ai.callAi(phase2Request) + raw2 = (phase2Response.content or "").strip() if hasattr(phase2Response, "content") else "" + try: + self.services.utils.writeDebugFile(raw2 or "(empty)", "trustee_structuring_response") + except Exception: + pass + records = _parseStructuredRecords(raw2) + logger.info("Phase 2 result: documentType=%s, records=%d, raw2_length=%d", documentType, len(records), len(raw2)) - return {"documentType": documentType, "extractedData": records, "fileId": fileId, "fileName": fileName} # fileId from caller for result + if records and (not documentType or documentType == "UNKNOWN"): + documentType = "EXPENSE_RECEIPT" + + return {"documentType": documentType or "UNKNOWN", "extractedData": records, "fileId": fileId, "fileName": fileName} async def _extractOne( self, f: Dict[str, Any], fileIdToChatDocId: Dict[str, str], - prompt: str, + expenseList: str, + bankList: str, featureInstanceId: str, ) -> ActionDocument: """Run extraction for one file; returns success or error ActionDocument (never raises).""" @@ -197,7 +266,7 @@ async def _extractOne( ) try: out = await _extractWithAi( - self, chatDocId, f["fileId"], f["fileName"], f["mimeType"], prompt, featureInstanceId + self, chatDocId, f["fileId"], f["fileName"], f["mimeType"], expenseList, bankList, featureInstanceId ) return ActionDocument( documentName=f.get("fileName", "extract") + ".json", @@ -229,7 +298,6 @@ async def extractFromFiles(self, parameters: Dict[str, Any]) -> ActionResult: connectionReference = parameters.get("connectionReference") sharepointFolder = parameters.get("sharepointFolder") featureInstanceId = parameters.get("featureInstanceId") or getattr(self.services, "featureInstanceId", None) - prompt = parameters.get("prompt") or "" if not featureInstanceId: return ActionResult.isFailure(error="featureInstanceId is required") @@ -329,13 +397,11 @@ async def extractFromFiles(self, parameters: Dict[str, Any]) -> ActionResult: if i < len(createdMessage.documents): fileIdToChatDocId[f["fileId"]] = createdMessage.documents[i].id - # Load expense accounts from accounting system for AI prompt (if configured) - if not prompt: - prompt = await _buildDefaultPromptWithAccounts(self, featureInstanceId) + expenseList, bankList = await _getAccountLists(self, featureInstanceId) - # Parallel extraction (all files at once) + # Parallel extraction (all files at once, 2-phase: classify + structure) tasks = [ - _extractOne(self, f, fileIdToChatDocId, prompt, featureInstanceId) + _extractOne(self, f, fileIdToChatDocId, expenseList, bankList, featureInstanceId) for f in filesToProcess ] resultDocuments = list(await asyncio.gather(*tasks)) diff --git a/modules/workflows/methods/methodTrustee/actions/processDocuments.py b/modules/workflows/methods/methodTrustee/actions/processDocuments.py index f5c2a868..c00dda6d 100644 --- a/modules/workflows/methods/methodTrustee/actions/processDocuments.py +++ b/modules/workflows/methods/methodTrustee/actions/processDocuments.py @@ -27,6 +27,24 @@ def _parseFloat(value) -> float: return 0.0 +def _extractAccountNumber(value) -> Optional[str]: + """Extract the leading numeric account number from AI output like '6200 Fahrzeugaufwand' -> '6200'.""" + if not value or not isinstance(value, str): + return None + import re + match = re.match(r"(\d+)", value.strip()) + return match.group(1) if match else value.strip() or None + + +def _normaliseTags(value) -> str: + """Convert tags from various formats to a clean comma-separated string.""" + if not value: + return "" + if isinstance(value, list): + return ", ".join(str(t) for t in value if t) + return str(value) + + def _recordToPosition(record: Dict[str, Any], documentId: Optional[str], featureInstanceId: str, mandateId: str) -> Dict[str, Any]: """Map extraction record to TrusteePosition payload.""" return { @@ -35,15 +53,15 @@ def _recordToPosition(record: Dict[str, Any], documentId: Optional[str], feature "transactionDateTime": record.get("transactionDateTime"), "company": record.get("company", ""), "desc": record.get("desc", ""), - "tags": record.get("tags", ""), + "tags": _normaliseTags(record.get("tags")), "bookingCurrency": record.get("bookingCurrency", "CHF"), "bookingAmount": _parseFloat(record.get("bookingAmount", 0)), "originalCurrency": record.get("originalCurrency") or record.get("bookingCurrency", "CHF"), "originalAmount": _parseFloat(record.get("originalAmount", 0)) or _parseFloat(record.get("bookingAmount", 0)), "vatPercentage": _parseFloat(record.get("vatPercentage", 0)), "vatAmount": _parseFloat(record.get("vatAmount", 0)), - "debitAccountNumber": record.get("debitAccountNumber") or None, - "creditAccountNumber": record.get("creditAccountNumber") or None, + "debitAccountNumber": _extractAccountNumber(record.get("debitAccountNumber")), + "creditAccountNumber": _extractAccountNumber(record.get("creditAccountNumber")), "taxCode": record.get("taxCode") or None, "costCenter": record.get("costCenter") or None, "bookingReference": record.get("bookingReference") or None,