fixes sync and ai tree

This commit is contained in:
patrick-motsch 2026-02-22 22:34:07 +01:00
parent 6b11d66766
commit f35a90e428
7 changed files with 351 additions and 160 deletions

View file

@ -5,6 +5,7 @@
Encapsulates: config loading -> connector resolution -> duplicate check -> push -> sync record. Encapsulates: config loading -> connector resolution -> duplicate check -> push -> sync record.
""" """
import json
import logging import logging
import time import time
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
@ -109,19 +110,26 @@ class AccountingBridge:
lines=lines, lines=lines,
) )
async def pushPositionToAccounting(self, featureInstanceId: str, positionId: str) -> SyncResult: async def pushPositionToAccounting(
self,
featureInstanceId: str,
positionId: str,
_resolvedConnector=None,
_resolvedPlainConfig=None,
_resolvedConfigRecord=None,
) -> SyncResult:
"""Push a single position to the configured accounting system. """Push a single position to the configured accounting system.
1. Load config and connector Optional _resolved* params allow pushBatchToAccounting to pass a pre-resolved
2. Load position data connector/config so we don't decrypt per position (avoids rate-limit).
3. Check for existing successful sync (duplicate guard)
4. Build AccountingBooking
5. Push via connector
6. Create TrusteeAccountingSync record
""" """
from modules.features.trustee.datamodelFeatureTrustee import TrusteePosition, TrusteeAccountingSync from modules.features.trustee.datamodelFeatureTrustee import TrusteePosition, TrusteeAccountingSync
connector, plainConfig, configRecord = await self._resolveConnectorAndConfig(featureInstanceId) connector = _resolvedConnector
plainConfig = _resolvedPlainConfig
configRecord = _resolvedConfigRecord
if not connector or not plainConfig:
connector, plainConfig, configRecord = await self._resolveConnectorAndConfig(featureInstanceId)
if not connector or not plainConfig: if not connector or not plainConfig:
return SyncResult(success=False, errorMessage="No active accounting configuration found") return SyncResult(success=False, errorMessage="No active accounting configuration found")
@ -269,19 +277,62 @@ class AccountingBridge:
return result return result
async def pushBatchToAccounting(self, featureInstanceId: str, positionIds: List[str]) -> List[SyncResult]: async def pushBatchToAccounting(self, featureInstanceId: str, positionIds: List[str]) -> List[SyncResult]:
"""Push multiple positions sequentially.""" """Push multiple positions sequentially. Resolves connector/config once to avoid decrypt rate-limit."""
connector, plainConfig, configRecord = await self._resolveConnectorAndConfig(featureInstanceId)
if not connector or not plainConfig:
return [SyncResult(success=False, errorMessage="No active accounting configuration found") for _ in positionIds]
results = [] results = []
for positionId in positionIds: for positionId in positionIds:
result = await self.pushPositionToAccounting(featureInstanceId, positionId) result = await self.pushPositionToAccounting(
featureInstanceId, positionId,
_resolvedConnector=connector, _resolvedPlainConfig=plainConfig, _resolvedConfigRecord=configRecord,
)
results.append(result) results.append(result)
return results return results
async def getChartOfAccounts(self, featureInstanceId: str, accountType: Optional[str] = None) -> List[AccountingChart]: async def refreshChartOfAccounts(self, featureInstanceId: str) -> List[AccountingChart]:
"""Load the chart of accounts from the configured external system. Optional filter by accountType.""" """Fetch the full chart of accounts from the external system and cache it locally on TrusteeAccountingConfig."""
connector, plainConfig, _ = await self._resolveConnectorAndConfig(featureInstanceId) from modules.features.trustee.datamodelFeatureTrustee import TrusteeAccountingConfig
if not connector or not plainConfig:
connector, plainConfig, configRecord = await self._resolveConnectorAndConfig(featureInstanceId)
if not connector or not plainConfig or not configRecord:
logger.warning("refreshChartOfAccounts: no connector/config — nothing to cache")
return [] return []
charts = await connector.getChartOfAccounts(plainConfig, accountType=accountType)
charts = await connector.getChartOfAccounts(plainConfig)
serialised = json.dumps([{"accountNumber": c.accountNumber, "label": c.label, "accountType": c.accountType or ""} for c in charts], ensure_ascii=False)
self._trusteeInterface.db.recordModify(TrusteeAccountingConfig, configRecord["id"], {
"cachedChartOfAccounts": serialised,
"chartCachedAt": time.time(),
})
logger.info(f"Cached {len(charts)} chart-of-accounts entries for instance {featureInstanceId}")
return charts
def _readCachedCharts(self, configRecord: Dict[str, Any]) -> List[AccountingChart]:
"""Deserialise the cached chart-of-accounts JSON from a config record. Returns [] on any error."""
raw = configRecord.get("cachedChartOfAccounts")
if not raw:
return []
try:
items = json.loads(raw) if isinstance(raw, str) else raw
return [AccountingChart(accountNumber=i["accountNumber"], label=i["label"], accountType=i.get("accountType", "")) for i in items]
except Exception as e:
logger.debug("Could not deserialise cached chart: %s", e)
return []
async def getChartOfAccounts(self, featureInstanceId: str, accountType: Optional[str] = None) -> List[AccountingChart]:
"""Return chart of accounts — from local cache if available, otherwise fetch externally and cache."""
configRecord = await self.getActiveConfig(featureInstanceId)
if not configRecord:
return []
charts = self._readCachedCharts(configRecord)
if charts:
logger.debug(f"Using cached chart of accounts ({len(charts)} entries) for instance {featureInstanceId}")
else:
logger.info(f"No cached chart — fetching live for instance {featureInstanceId}")
charts = await self.refreshChartOfAccounts(featureInstanceId)
if accountType: if accountType:
charts = [c for c in charts if c.accountType == accountType] charts = [c for c in charts if c.accountType == accountType]
return charts return charts

View file

@ -87,26 +87,47 @@ class AccountingConnectorRma(BaseAccountingConnector):
except Exception as e: except Exception as e:
return SyncResult(success=False, errorMessage=str(e)) return SyncResult(success=False, errorMessage=str(e))
def _rmaLinkToAccountType(self, link: str) -> str:
"""Map RMA chart 'link' (e.g. AP_amount, AR_amount, AR_paid:AP_paid) to our accountType."""
if not link:
return ""
linkUpper = link.upper()
if "AP_AMOUNT" in linkUpper:
return "expense"
if "AR_AMOUNT" in linkUpper:
return "revenue"
if "AR_PAID" in linkUpper or "AP_PAID" in linkUpper:
return "asset"
if "AR_TAX" in linkUpper or "AP_TAX" in linkUpper:
return "liability"
if linkUpper in ("AR", "AP"):
return "asset"
return link
async def getChartOfAccounts(self, config: Dict[str, Any], accountType: Optional[str] = None) -> List[AccountingChart]: async def getChartOfAccounts(self, config: Dict[str, Any], accountType: Optional[str] = None) -> List[AccountingChart]:
"""RMA API 'type' filter expects RMA values (AP_amount, AR_amount, etc.), not 'expense'. Fetch full chart and filter client-side."""
try: try:
params = {}
if accountType:
params["type"] = accountType
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
url = self._buildUrl(config, "charts") url = self._buildUrl(config, "charts")
async with session.get(url, headers=self._buildHeaders(config), params=params, timeout=aiohttp.ClientTimeout(total=30)) as resp: async with session.get(url, headers=self._buildHeaders(config), timeout=aiohttp.ClientTimeout(total=30)) as resp:
if resp.status != 200: if resp.status != 200:
logger.error(f"RMA charts failed: HTTP {resp.status}") body = await resp.text()
logger.error(f"RMA charts failed: HTTP {resp.status} - {body[:200]}")
return [] return []
data = await resp.json() data = await resp.json()
charts = [] charts = []
items = data if isinstance(data, list) else data.get("chart", data.get("row", [])) items = data if isinstance(data, list) else data.get("chart", data.get("row", []))
if not isinstance(items, list):
items = []
for item in items: for item in items:
if isinstance(item, dict): if isinstance(item, dict):
accNo = str(item.get("accno", item.get("account_number", ""))) accNo = str(item.get("accno", item.get("account_number", "")))
label = str(item.get("description", item.get("label", ""))) label = str(item.get("description", item.get("label", "")))
chartType = item.get("charttype") or item.get("category") or item.get("link") or "" rmaLink = item.get("link") or ""
chartType = item.get("charttype") or item.get("category") or ""
if not chartType and rmaLink:
chartType = self._rmaLinkToAccountType(rmaLink)
if not chartType and accNo: if not chartType and accNo:
firstDigit = accNo[0] if accNo else "" firstDigit = accNo[0] if accNo else ""
chartType = { chartType = {

View file

@ -684,6 +684,8 @@ class TrusteeAccountingConfig(BaseModel):
lastSyncAt: Optional[float] = Field(default=None, description="Timestamp of last sync attempt") lastSyncAt: Optional[float] = Field(default=None, description="Timestamp of last sync attempt")
lastSyncStatus: Optional[str] = Field(default=None, description="Last sync result: success, error, partial") lastSyncStatus: Optional[str] = Field(default=None, description="Last sync result: success, error, partial")
lastSyncErrorMessage: Optional[str] = Field(default=None, description="Error message when lastSyncStatus is error") lastSyncErrorMessage: Optional[str] = Field(default=None, description="Error message when lastSyncStatus is error")
cachedChartOfAccounts: Optional[str] = Field(default=None, description="JSON-serialised chart of accounts cache (list of {accountNumber, label, accountType})")
chartCachedAt: Optional[float] = Field(default=None, description="Timestamp when cachedChartOfAccounts was last refreshed")
mandateId: Optional[str] = Field(default=None) mandateId: Optional[str] = Field(default=None)
@ -699,6 +701,8 @@ registerModelLabels(
"lastSyncAt": {"en": "Last Sync", "fr": "Dernière sync.", "de": "Letzte Synchronisation"}, "lastSyncAt": {"en": "Last Sync", "fr": "Dernière sync.", "de": "Letzte Synchronisation"},
"lastSyncStatus": {"en": "Status", "fr": "Statut", "de": "Status"}, "lastSyncStatus": {"en": "Status", "fr": "Statut", "de": "Status"},
"lastSyncErrorMessage": {"en": "Error", "fr": "Erreur", "de": "Fehlermeldung"}, "lastSyncErrorMessage": {"en": "Error", "fr": "Erreur", "de": "Fehlermeldung"},
"cachedChartOfAccounts": {"en": "Cached Chart", "de": "Cached Kontoplan", "fr": "Plan comptable en cache"},
"chartCachedAt": {"en": "Chart Cached At", "de": "Kontoplan-Cache-Zeitpunkt", "fr": "Horodatage cache plan comptable"},
"mandateId": {"en": "Mandate", "fr": "Mandat", "de": "Mandat"}, "mandateId": {"en": "Mandate", "fr": "Mandat", "de": "Mandat"},
}, },
) )

View file

@ -1236,7 +1236,7 @@ class SaveAccountingConfigBody(BaseModel):
@router.post("/{instanceId}/accounting/config", status_code=201) @router.post("/{instanceId}/accounting/config", status_code=201)
@limiter.limit("5/minute") @limiter.limit("5/minute")
def save_accounting_config( async def save_accounting_config(
request: Request, request: Request,
instanceId: str = Path(..., description="Feature Instance ID"), instanceId: str = Path(..., description="Feature Instance ID"),
body: SaveAccountingConfigBody = Body(...), body: SaveAccountingConfigBody = Body(...),
@ -1288,6 +1288,7 @@ def save_accounting_config(
merged[k] = v merged[k] = v
updatePayload["encryptedConfig"] = encryptValue(json.dumps(merged), keyName="accountingConfig") updatePayload["encryptedConfig"] = encryptValue(json.dumps(merged), keyName="accountingConfig")
interface.db.recordModify(TrusteeAccountingConfig, configId, updatePayload) interface.db.recordModify(TrusteeAccountingConfig, configId, updatePayload)
await _refreshChartSilently(interface, instanceId)
return {"message": "Accounting config updated", "id": configId} return {"message": "Accounting config updated", "id": configId}
if not plainConfig: if not plainConfig:
@ -1307,6 +1308,7 @@ def save_accounting_config(
"mandateId": mandateId, "mandateId": mandateId,
} }
interface.db.recordCreate(TrusteeAccountingConfig, configRecord) interface.db.recordCreate(TrusteeAccountingConfig, configRecord)
await _refreshChartSilently(interface, instanceId)
return {"message": "Accounting config created", "id": configRecord["id"]} return {"message": "Accounting config created", "id": configRecord["id"]}
@ -1317,12 +1319,14 @@ async def test_accounting_connection(
instanceId: str = Path(..., description="Feature Instance ID"), instanceId: str = Path(..., description="Feature Instance ID"),
context: RequestContext = Depends(getRequestContext) context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Test the connection to the configured accounting system.""" """Test the connection to the configured accounting system. On success, refreshes the local chart-of-accounts cache."""
mandateId = _validateInstanceAccess(instanceId, context) mandateId = _validateInstanceAccess(instanceId, context)
interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId) interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
from .accounting.accountingBridge import AccountingBridge from .accounting.accountingBridge import AccountingBridge
bridge = AccountingBridge(interface) bridge = AccountingBridge(interface)
result = await bridge.testConnection(instanceId) result = await bridge.testConnection(instanceId)
if result.success:
await _refreshChartSilently(interface, instanceId)
return result.model_dump() return result.model_dump()
@ -1360,6 +1364,33 @@ async def get_chart_of_accounts(
return [c.model_dump() for c in charts] return [c.model_dump() for c in charts]
async def _refreshChartSilently(interface, instanceId: str) -> None:
"""Best-effort chart-of-accounts cache refresh. Logs but does not raise on failure."""
try:
from .accounting.accountingBridge import AccountingBridge
bridge = AccountingBridge(interface)
charts = await bridge.refreshChartOfAccounts(instanceId)
logger.info(f"Chart cache refreshed: {len(charts)} entries for instance {instanceId}")
except Exception as e:
logger.warning(f"Chart cache refresh failed (non-critical): {e}")
@router.post("/{instanceId}/accounting/refresh-chart")
@limiter.limit("5/minute")
async def refresh_chart_of_accounts(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Explicitly refresh the locally cached chart of accounts from the external system."""
mandateId = _validateInstanceAccess(instanceId, context)
interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
from .accounting.accountingBridge import AccountingBridge
bridge = AccountingBridge(interface)
charts = await bridge.refreshChartOfAccounts(instanceId)
return {"message": f"Chart of accounts refreshed: {len(charts)} entries", "count": len(charts)}
@router.post("/{instanceId}/accounting/sync") @router.post("/{instanceId}/accounting/sync")
@limiter.limit("5/minute") @limiter.limit("5/minute")
async def sync_positions_to_accounting( async def sync_positions_to_accounting(

View file

@ -6,7 +6,7 @@ import re
import time import time
import base64 import base64
from typing import Dict, Any, List, Optional, Tuple from typing import Dict, Any, List, Optional, Tuple
from modules.datamodels.datamodelChat import PromptPlaceholder, ChatDocument from modules.datamodels.datamodelChat import PromptPlaceholder, ChatDocument, WorkflowModeEnum
from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum
from modules.datamodels.datamodelExtraction import ContentPart, DocumentIntent from modules.datamodels.datamodelExtraction import ContentPart, DocumentIntent
@ -1283,8 +1283,12 @@ Respond with ONLY a JSON object in this exact format:
parentOperationId: Optional[str] parentOperationId: Optional[str]
) -> AiResponse: ) -> AiResponse:
""" """
Handle DATA_EXTRACT: Extract content from documents (no AI), then process with AI. Handle DATA_EXTRACT: Extract content from documents, then process with AI.
This is the original flow: extract all documents first, then process contentParts with AI.
- AUTOMATION mode: No intent analysis. The passed prompt is used as extractionPrompt
for every document and for the final AI call (exact prompt preserved).
- DYNAMIC mode: Intent analysis (clarifyDocumentIntents) runs first; extraction and
processing use the intents and AI-derived extractionPrompt.
""" """
import time import time
@ -1332,14 +1336,28 @@ Respond with ONLY a JSON object in this exact format:
documents = filteredDocuments # Use filtered list documents = filteredDocuments # Use filtered list
# Step 2: Clarify document intents (if not provided) - REQUIRED for all documents # Step 2: Document intents AUTOMATION uses exact prompt; DYNAMIC uses intent analysis
if not documentIntents and documents: if not documentIntents and documents:
documentIntents = await self.clarifyDocumentIntents( workflowMode = getattr(self.services.workflow, "workflowMode", None) if self.services.workflow else None
documents, if workflowMode == WorkflowModeEnum.WORKFLOW_AUTOMATION:
prompt, # Automation: no intent AI call use the given prompt as extractionPrompt for every document
{"outputFormat": outputFormat}, documentIntents = [
extractOperationId DocumentIntent(
) documentId=doc.id,
intents=["extract"],
extractionPrompt=prompt,
reasoning="Automation mode: use exact prompt from action",
)
for doc in documents
]
logger.debug("DATA_EXTRACT in AUTOMATION mode: using exact prompt for all documents (no intent analysis)")
else:
documentIntents = await self.clarifyDocumentIntents(
documents,
prompt,
{"outputFormat": outputFormat},
extractOperationId
)
# Step 3: Extract and prepare content (NO AI - pure extraction) - REQUIRED for all documents # Step 3: Extract and prepare content (NO AI - pure extraction) - REQUIRED for all documents
if documents: if documents:
@ -1359,51 +1377,33 @@ Respond with ONLY a JSON object in this exact format:
contentParts = preparedContentParts contentParts = preparedContentParts
# Step 4: Process extracted contentParts with AI (simple text processing, no structure generation) # Step 4: Process contentParts with AI via ExtractionService
# Always use processContentPartsWithAi it handles text vs image parts correctly:
# - Text parts → text models (with chunking if needed)
# - Image parts → Vision AI (proper image_url content blocks)
# No manual contentText concatenation or token estimation needed.
if not contentParts: if not contentParts:
raise ValueError("No content extracted from documents") raise ValueError("No content extracted from documents")
# Use simple AI call to process extracted content # Filter out empty content parts (e.g. PDF container with 0 bytes) that would
# Prepare content for AI processing # produce garbage AI responses and pollute the merged result.
contentText = "\n\n".join([ nonEmptyParts = [p for p in contentParts if p.data and len(p.data.strip()) > 0]
f"[Document: {part.metadata.get('documentName', 'Unknown')}]\n{part.data}" if not nonEmptyParts:
for part in contentParts raise ValueError("No non-empty content parts to process")
if part.data
])
# Check content size and use chunking if needed self.services.utils.writeDebugFile(prompt, "data_extract_prompt")
# Conservative estimate: 2 bytes per token, 80% of model limit for safety extractionService = self.services.extraction
contentSizeBytes = len(contentText.encode('utf-8')) aiRequest = AiCallRequest(
promptSizeBytes = len(prompt.encode('utf-8')) prompt=prompt,
totalSizeBytes = contentSizeBytes + promptSizeBytes context="",
estimatedTokens = totalSizeBytes / 2 # Conservative: 2 bytes per token options=options,
contentParts=nonEmptyParts,
# Get max model context (use Claude's 200k as reference, 80% = 160k tokens) )
maxSafeTokens = 160000 aiResponse = await extractionService.processContentPartsWithAi(
aiRequest, self.aiObjects
if estimatedTokens > maxSafeTokens: )
# Content too large - use chunking via ExtractionService _respText = aiResponse.content if isinstance(aiResponse.content, str) else (aiResponse.content.decode("utf-8", errors="replace") if aiResponse.content else "")
logger.warning(f"Content too large for single AI call: ~{estimatedTokens:.0f} tokens (limit: {maxSafeTokens}). Using chunked processing.") self.services.utils.writeDebugFile(_respText, "data_extract_response")
# Use ExtractionService for chunked processing
extractionService = self.services.extraction
aiResponse = await extractionService.processContentPartsWithPrompt(
contentParts=contentParts,
prompt=prompt,
aiObjects=self.aiObjects,
options=options,
operationId=extractOperationId,
parentOperationId=parentOperationId
)
else:
# Content fits - use single AI call
aiRequest = AiCallRequest(
prompt=f"{prompt}\n\nExtracted Content:\n{contentText}",
context="",
options=options
)
aiResponse = await self.callAi(aiRequest)
# Create response document # Create response document
resultDocument = DocumentData( resultDocument = DocumentData(

View file

@ -13,30 +13,93 @@ import uuid
import csv import csv
import io import io
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional, Tuple
from modules.datamodels.datamodelChat import ActionResult, ActionDocument, ChatDocument from modules.datamodels.datamodelChat import ActionResult, ActionDocument, ChatDocument
from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum from modules.datamodels.datamodelAi import AiCallOptions, AiCallRequest, OperationTypeEnum
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
ALLOWED_EXTENSIONS = (".pdf", ".jpg", ".jpeg") ALLOWED_EXTENSIONS = (".pdf", ".jpg", ".jpeg")
MAX_FILES = 50 MAX_FILES = 50
_DEFAULT_PROMPT_FALLBACK = ( # Phase 1: Extract all text + classify document type (one step)
'Extract document type (one of: INVOICE, EXPENSE_RECEIPT, BANK_DOCUMENT, CONTRACT, UNKNOWN) ' _CLASSIFICATION_PROMPT = (
'and expense/position records. Return JSON: {"documentType": "...", "records": [{...}]}. ' "Extract ALL text from this document verbatim. Then identify the document type.\n"
'Each record must have: valuta (YYYY-MM-DD), transactionDateTime (unix seconds), company, desc (full extracted text), ' 'Return JSON: {"documentType": "EXPENSE_RECEIPT"|"BANK_DOCUMENT"|"INVOICE"|"CONTRACT"|"UNKNOWN", '
'tags (from: customer, meeting, license, subscription, fuel, food, material), ' '"rawText": "<complete extracted text>"}\n'
'bookingCurrency, bookingAmount, originalCurrency, originalAmount, vatPercentage, vatAmount, ' "EXPENSE_RECEIPT: Quittungen, Tankbelege, Kassenzettel\n"
'debitAccountNumber (Soll-Konto nach Schweizer KMU-Kontenrahmen, z.B. 6200 Fahrzeugaufwand, 6000 Materialaufwand), ' "BANK_DOCUMENT: Bankauszuege, Kontoauszuege mit Transaktionslisten\n"
'creditAccountNumber (Haben-Konto, z.B. 1020 Bank), taxCode, costCenter, bookingReference.' "INVOICE: Rechnungen mit Rechnungsnummer und Faelligkeitsdatum\n"
"CONTRACT: Vertraege\n"
"UNKNOWN: Falls unklar"
)
# Phase 2: Type-specific structuring prompts (placeholders: {expenseList}, {bankList})
_PROMPT_EXPENSE_RECEIPT = (
"Extrahiere aus dem folgenden Dokument eine Buchung pro Ausgabeposition. "
"Return JSON: {{\"records\": [{{...}}]}}. Jeder Record: valuta (YYYY-MM-DD), transactionDateTime (unix seconds), company, desc, "
"bookingCurrency, bookingAmount, originalCurrency, originalAmount, vatPercentage, vatAmount, "
"debitAccountNumber (NUR die Kontonummer, z.B. \"6200\", aus: {expenseList}), "
"creditAccountNumber (NUR die Kontonummer, z.B. \"1020\", aus: {bankList}), tags, taxCode, costCenter, bookingReference."
)
_PROMPT_BANK_DOCUMENT = (
"Extrahiere aus dem folgenden Bankauszug eine Buchung pro Transaktionszeile. "
"Return JSON: {{\"records\": [{{...}}]}}. Jeder Record: valuta, company (Gegenpartei), desc (Zahlungsreferenz), "
"bookingAmount, bookingCurrency, "
"debitAccountNumber (NUR die Kontonummer aus: {expenseList}), creditAccountNumber (NUR die Kontonummer aus: {bankList}), bookingReference. "
"Kein MwSt bei Bankauszuegen. transactionDateTime optional."
)
_PROMPT_INVOICE = (
"Extrahiere aus der folgenden Rechnung genau eine Buchung. "
"Return JSON: {{\"records\": [{{...}}]}}. Record: valuta (Rechnungsdatum), company (Kreditor), desc (Rechnungsdetails), "
"bookingAmount, bookingCurrency, vatPercentage, vatAmount, "
"debitAccountNumber (NUR die Kontonummer aus: {expenseList}), creditAccountNumber (NUR die Kontonummer aus: {bankList}), "
"bookingReference (Rechnungsnummer), transactionDateTime, taxCode, costCenter."
)
_PROMPT_FALLBACK = (
"Extrahiere aus dem folgenden Dokument Buchungsdaten. "
"Return JSON: {{\"records\": [{{...}}]}}. Jeder Record: valuta (YYYY-MM-DD), transactionDateTime (unix seconds), company, desc, "
"bookingCurrency, bookingAmount, originalCurrency, originalAmount, vatPercentage, vatAmount, "
"debitAccountNumber (NUR die Kontonummer, z.B. \"6200\", aus: {expenseList}), "
"creditAccountNumber (NUR die Kontonummer, z.B. \"1020\", aus: {bankList}), tags, taxCode, costCenter, bookingReference."
) )
async def _buildDefaultPromptWithAccounts(self, featureInstanceId: str) -> str: def _parseClassificationResult(raw: str) -> Tuple[str, str]:
"""Build extraction prompt with real expense accounts from the connected accounting system.""" """Parse phase 1 AI response: {documentType, rawText}. Returns (documentType, rawText)."""
from modules.shared.jsonUtils import stripCodeFences, extractFirstBalancedJson
documentType = "UNKNOWN"
rawText = ""
cleaned = extractFirstBalancedJson(stripCodeFences((raw or "").strip()))
try:
data = json.loads(cleaned)
documentType = (data.get("documentType") or "UNKNOWN").strip().upper().replace(" ", "_")
rawText = (data.get("rawText") or data.get("raw_text") or "").strip()
except Exception as e:
logger.debug("Parse classification result: %s", e)
return (documentType, rawText)
def _buildStructuringPrompt(documentType: str, expenseList: str, bankList: str) -> str:
"""Build phase 2 prompt for the given document type, with account lists injected."""
expenseList = expenseList or "6200 Fahrzeugaufwand, 6000 Materialaufwand"
bankList = bankList or "1020 Bank"
docType = (documentType or "UNKNOWN").upper().replace(" ", "_")
if docType == "EXPENSE_RECEIPT":
return _PROMPT_EXPENSE_RECEIPT.format(expenseList=expenseList, bankList=bankList)
if docType == "BANK_DOCUMENT":
return _PROMPT_BANK_DOCUMENT.format(expenseList=expenseList, bankList=bankList)
if docType == "INVOICE":
return _PROMPT_INVOICE.format(expenseList=expenseList, bankList=bankList)
return _PROMPT_FALLBACK.format(expenseList=expenseList, bankList=bankList)
async def _getAccountLists(self, featureInstanceId: str) -> Tuple[str, str]:
"""Load expense and bank account lists from the connected accounting system for use in prompts.
Returns (expenseList, bankList). Empty strings if not configured or on error."""
try: try:
from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface
from modules.features.trustee.accounting.accountingBridge import AccountingBridge from modules.features.trustee.accounting.accountingBridge import AccountingBridge
@ -50,25 +113,30 @@ async def _buildDefaultPromptWithAccounts(self, featureInstanceId: str) -> str:
assetAccounts = await bridge.getChartOfAccounts(featureInstanceId, accountType="asset") assetAccounts = await bridge.getChartOfAccounts(featureInstanceId, accountType="asset")
except Exception as e: except Exception as e:
logger.debug("Could not load chart of accounts for prompt: %s", e) logger.debug("Could not load chart of accounts for prompt: %s", e)
return "" return ("", "")
if not expenseAccounts: if not expenseAccounts:
return "" return ("", "")
expenseList = ", ".join(f"{a.accountNumber} {a.label}" for a in expenseAccounts[:50]) expenseList = ", ".join(f"{a.accountNumber} {a.label}" for a in expenseAccounts[:50])
bankAccounts = [a for a in assetAccounts if a.accountNumber.startswith("10")] bankAccounts = [a for a in assetAccounts if a.accountNumber.startswith("10")]
bankList = ", ".join(f"{a.accountNumber} {a.label}" for a in bankAccounts[:10]) if bankAccounts else "1020 Bank" bankList = ", ".join(f"{a.accountNumber} {a.label}" for a in bankAccounts[:10]) if bankAccounts else "1020 Bank"
return (expenseList, bankList)
return (
'Extract document type (one of: INVOICE, EXPENSE_RECEIPT, BANK_DOCUMENT, CONTRACT, UNKNOWN) ' def _parseStructuredRecords(raw: str) -> List[Dict[str, Any]]:
'and expense/position records. Return JSON: {"documentType": "...", "records": [{...}]}. ' """Parse phase 2 AI response (JSON with records or CSV) into list of record dicts."""
'Each record must have: valuta (YYYY-MM-DD), transactionDateTime (unix seconds), company, desc (full extracted text), ' from modules.shared.jsonUtils import stripCodeFences, extractFirstBalancedJson
'tags (from: customer, meeting, license, subscription, fuel, food, material), '
'bookingCurrency, bookingAmount, originalCurrency, originalAmount, vatPercentage, vatAmount, ' records: List[Dict[str, Any]] = []
f'debitAccountNumber (Soll-Konto, verwende eines der folgenden Aufwandkonten: {expenseList}), ' cleaned = extractFirstBalancedJson(stripCodeFences((raw or "").strip()))
f'creditAccountNumber (Haben-Konto, verwende eines der folgenden Konten: {bankList}), ' try:
'taxCode, costCenter, bookingReference.' data = json.loads(cleaned)
) records = data.get("records") or data.get("extractedData") or []
except Exception:
if cleaned:
records = _parseCsvToRecords(cleaned)
return records if isinstance(records, list) else []
def _parseCsvToRecords(csvContent: str) -> List[Dict[str, Any]]: def _parseCsvToRecords(csvContent: str) -> List[Dict[str, Any]]:
@ -85,26 +153,40 @@ def _parseCsvToRecords(csvContent: str) -> List[Dict[str, Any]]:
content = "\n".join(lines) content = "\n".join(lines)
reader = csv.DictReader(io.StringIO(content)) reader = csv.DictReader(io.StringIO(content))
for row in reader: for row in reader:
cleaned = {k.strip(): (v.strip() if isinstance(v, str) else v) for k, v in row.items()} cleaned = {(k.strip() if k else k): (v.strip() if isinstance(v, str) else v) for k, v in row.items() if k}
records.append(cleaned) records.append(cleaned)
except Exception as e: except Exception as e:
logger.warning(f"Parse CSV: {e}") logger.warning(f"Parse CSV: {e}")
return records return records
async def _extractWithAi(self, chatDocumentId: str, fileId: str, fileName: str, mimeType: str, prompt: str, featureInstanceId: str) -> Dict[str, Any]: async def _extractWithAi(
"""Run AI extraction on one file; return { documentType, extractedData (records), fileId, fileName }.""" self,
chatDocumentId: str,
fileId: str,
fileName: str,
mimeType: str,
expenseList: str,
bankList: str,
featureInstanceId: str,
) -> Dict[str, Any]:
"""Run 2-phase AI extraction: (1) classify + full text, (2) structure by type. Returns { documentType, extractedData, fileId, fileName }."""
await self.services.ai.ensureAiObjectsInitialized() await self.services.ai.ensureAiObjectsInitialized()
from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference
docList = DocumentReferenceList( docList = DocumentReferenceList(
references=[DocumentItemReference(documentId=chatDocumentId, fileName=fileName)] references=[DocumentItemReference(documentId=chatDocumentId, fileName=fileName)]
) )
# Prefer JSON for documentType + records in one response; fallback to CSV
options = AiCallOptions(resultFormat="json", operationType=OperationTypeEnum.DATA_GENERATE)
try: try:
aiResponse = await self.services.ai.callAiContent( self.services.utils.writeDebugFile(_CLASSIFICATION_PROMPT, "trustee_classification_prompt")
prompt=prompt or _DEFAULT_PROMPT_FALLBACK, except Exception:
pass
options = AiCallOptions(resultFormat="json", operationType=OperationTypeEnum.DATA_EXTRACT)
try:
phase1Response = await self.services.ai.callAiContent(
prompt=_CLASSIFICATION_PROMPT,
options=options, options=options,
documentList=docList, documentList=docList,
contentParts=None, contentParts=None,
@ -112,9 +194,9 @@ async def _extractWithAi(self, chatDocumentId: str, fileId: str, fileName: str,
generationIntent="extract", generationIntent="extract",
) )
except Exception: except Exception:
options = AiCallOptions(resultFormat="csv", operationType=OperationTypeEnum.DATA_GENERATE) options = AiCallOptions(resultFormat="csv", operationType=OperationTypeEnum.DATA_EXTRACT)
aiResponse = await self.services.ai.callAiContent( phase1Response = await self.services.ai.callAiContent(
prompt=prompt or _DEFAULT_PROMPT_FALLBACK, prompt=_CLASSIFICATION_PROMPT,
options=options, options=options,
documentList=docList, documentList=docList,
contentParts=None, contentParts=None,
@ -122,63 +204,50 @@ async def _extractWithAi(self, chatDocumentId: str, fileId: str, fileName: str,
generationIntent="extract", generationIntent="extract",
) )
if not aiResponse or not aiResponse.documents: if not phase1Response or not phase1Response.documents:
return {"documentType": "UNKNOWN", "extractedData": [], "fileId": fileId, "fileName": fileName} return {"documentType": "UNKNOWN", "extractedData": [], "fileId": fileId, "fileName": fileName}
doc = aiResponse.documents[0] raw1 = phase1Response.documents[0].documentData
raw = doc.documentData if isinstance(raw1, bytes):
if isinstance(raw, bytes): raw1 = raw1.decode("utf-8")
raw = raw.decode("utf-8") documentType, rawText = _parseClassificationResult(raw1 or "")
documentType = "UNKNOWN" if not rawText:
records = [] return {"documentType": documentType or "UNKNOWN", "extractedData": [], "fileId": fileId, "fileName": fileName}
# Try JSON first structuringPrompt = _buildStructuringPrompt(documentType, expenseList, bankList)
try: try:
if raw.strip().startswith("{"): self.services.utils.writeDebugFile(structuringPrompt, "trustee_structuring_prompt")
data = json.loads(raw)
# Direct format: {"documentType": "...", "records": [...]}
if "records" in data or "extractedData" in data:
documentType = (data.get("documentType") or "UNKNOWN").upper().replace(" ", "_")
records = data.get("records") or data.get("extractedData") or []
# Wrapped in document structure: {"documents": [{"sections": [{"elements": [{"content": {"code": "..."}}]}]}]}
elif "documents" in data:
for doc in data.get("documents", []):
for section in doc.get("sections", []):
for elem in section.get("elements", []):
code = (elem.get("content") or {}).get("code")
if code and isinstance(code, str):
try:
inner = json.loads(code)
if isinstance(inner, dict) and ("records" in inner or "documentType" in inner):
documentType = (inner.get("documentType") or "UNKNOWN").upper().replace(" ", "_")
records = inner.get("records") or inner.get("extractedData") or []
break
except Exception:
pass
if records:
break
if records:
break
elif "documentType" in data:
documentType = (data.get("documentType") or "UNKNOWN").upper().replace(" ", "_")
except Exception: except Exception:
pass pass
# Fallback: CSV fullPrompt = f"{structuringPrompt}\n\nDOKUMENT-TEXT:\n{rawText}"
if not records and raw: phase2Request = AiCallRequest(
records = _parseCsvToRecords(raw) prompt=fullPrompt,
if records and not documentType or documentType == "UNKNOWN": context="",
documentType = "EXPENSE_RECEIPT" options=AiCallOptions(resultFormat="json"),
)
phase2Response = await self.services.ai.callAi(phase2Request)
raw2 = (phase2Response.content or "").strip() if hasattr(phase2Response, "content") else ""
try:
self.services.utils.writeDebugFile(raw2 or "(empty)", "trustee_structuring_response")
except Exception:
pass
records = _parseStructuredRecords(raw2)
logger.info("Phase 2 result: documentType=%s, records=%d, raw2_length=%d", documentType, len(records), len(raw2))
return {"documentType": documentType, "extractedData": records, "fileId": fileId, "fileName": fileName} # fileId from caller for result if records and (not documentType or documentType == "UNKNOWN"):
documentType = "EXPENSE_RECEIPT"
return {"documentType": documentType or "UNKNOWN", "extractedData": records, "fileId": fileId, "fileName": fileName}
async def _extractOne( async def _extractOne(
self, self,
f: Dict[str, Any], f: Dict[str, Any],
fileIdToChatDocId: Dict[str, str], fileIdToChatDocId: Dict[str, str],
prompt: str, expenseList: str,
bankList: str,
featureInstanceId: str, featureInstanceId: str,
) -> ActionDocument: ) -> ActionDocument:
"""Run extraction for one file; returns success or error ActionDocument (never raises).""" """Run extraction for one file; returns success or error ActionDocument (never raises)."""
@ -197,7 +266,7 @@ async def _extractOne(
) )
try: try:
out = await _extractWithAi( out = await _extractWithAi(
self, chatDocId, f["fileId"], f["fileName"], f["mimeType"], prompt, featureInstanceId self, chatDocId, f["fileId"], f["fileName"], f["mimeType"], expenseList, bankList, featureInstanceId
) )
return ActionDocument( return ActionDocument(
documentName=f.get("fileName", "extract") + ".json", documentName=f.get("fileName", "extract") + ".json",
@ -229,7 +298,6 @@ async def extractFromFiles(self, parameters: Dict[str, Any]) -> ActionResult:
connectionReference = parameters.get("connectionReference") connectionReference = parameters.get("connectionReference")
sharepointFolder = parameters.get("sharepointFolder") sharepointFolder = parameters.get("sharepointFolder")
featureInstanceId = parameters.get("featureInstanceId") or getattr(self.services, "featureInstanceId", None) featureInstanceId = parameters.get("featureInstanceId") or getattr(self.services, "featureInstanceId", None)
prompt = parameters.get("prompt") or ""
if not featureInstanceId: if not featureInstanceId:
return ActionResult.isFailure(error="featureInstanceId is required") return ActionResult.isFailure(error="featureInstanceId is required")
@ -329,13 +397,11 @@ async def extractFromFiles(self, parameters: Dict[str, Any]) -> ActionResult:
if i < len(createdMessage.documents): if i < len(createdMessage.documents):
fileIdToChatDocId[f["fileId"]] = createdMessage.documents[i].id fileIdToChatDocId[f["fileId"]] = createdMessage.documents[i].id
# Load expense accounts from accounting system for AI prompt (if configured) expenseList, bankList = await _getAccountLists(self, featureInstanceId)
if not prompt:
prompt = await _buildDefaultPromptWithAccounts(self, featureInstanceId)
# Parallel extraction (all files at once) # Parallel extraction (all files at once, 2-phase: classify + structure)
tasks = [ tasks = [
_extractOne(self, f, fileIdToChatDocId, prompt, featureInstanceId) _extractOne(self, f, fileIdToChatDocId, expenseList, bankList, featureInstanceId)
for f in filesToProcess for f in filesToProcess
] ]
resultDocuments = list(await asyncio.gather(*tasks)) resultDocuments = list(await asyncio.gather(*tasks))

View file

@ -27,6 +27,24 @@ def _parseFloat(value) -> float:
return 0.0 return 0.0
def _extractAccountNumber(value) -> Optional[str]:
"""Extract the leading numeric account number from AI output like '6200 Fahrzeugaufwand' -> '6200'."""
if not value or not isinstance(value, str):
return None
import re
match = re.match(r"(\d+)", value.strip())
return match.group(1) if match else value.strip() or None
def _normaliseTags(value) -> str:
"""Convert tags from various formats to a clean comma-separated string."""
if not value:
return ""
if isinstance(value, list):
return ", ".join(str(t) for t in value if t)
return str(value)
def _recordToPosition(record: Dict[str, Any], documentId: Optional[str], featureInstanceId: str, mandateId: str) -> Dict[str, Any]: def _recordToPosition(record: Dict[str, Any], documentId: Optional[str], featureInstanceId: str, mandateId: str) -> Dict[str, Any]:
"""Map extraction record to TrusteePosition payload.""" """Map extraction record to TrusteePosition payload."""
return { return {
@ -35,15 +53,15 @@ def _recordToPosition(record: Dict[str, Any], documentId: Optional[str], feature
"transactionDateTime": record.get("transactionDateTime"), "transactionDateTime": record.get("transactionDateTime"),
"company": record.get("company", ""), "company": record.get("company", ""),
"desc": record.get("desc", ""), "desc": record.get("desc", ""),
"tags": record.get("tags", ""), "tags": _normaliseTags(record.get("tags")),
"bookingCurrency": record.get("bookingCurrency", "CHF"), "bookingCurrency": record.get("bookingCurrency", "CHF"),
"bookingAmount": _parseFloat(record.get("bookingAmount", 0)), "bookingAmount": _parseFloat(record.get("bookingAmount", 0)),
"originalCurrency": record.get("originalCurrency") or record.get("bookingCurrency", "CHF"), "originalCurrency": record.get("originalCurrency") or record.get("bookingCurrency", "CHF"),
"originalAmount": _parseFloat(record.get("originalAmount", 0)) or _parseFloat(record.get("bookingAmount", 0)), "originalAmount": _parseFloat(record.get("originalAmount", 0)) or _parseFloat(record.get("bookingAmount", 0)),
"vatPercentage": _parseFloat(record.get("vatPercentage", 0)), "vatPercentage": _parseFloat(record.get("vatPercentage", 0)),
"vatAmount": _parseFloat(record.get("vatAmount", 0)), "vatAmount": _parseFloat(record.get("vatAmount", 0)),
"debitAccountNumber": record.get("debitAccountNumber") or None, "debitAccountNumber": _extractAccountNumber(record.get("debitAccountNumber")),
"creditAccountNumber": record.get("creditAccountNumber") or None, "creditAccountNumber": _extractAccountNumber(record.get("creditAccountNumber")),
"taxCode": record.get("taxCode") or None, "taxCode": record.get("taxCode") or None,
"costCenter": record.get("costCenter") or None, "costCenter": record.get("costCenter") or None,
"bookingReference": record.get("bookingReference") or None, "bookingReference": record.get("bookingReference") or None,