gateway/modules/features/trustee/accounting/accountingBridge.py
2026-02-19 00:31:32 +01:00

186 lines
8 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Accounting bridge: standardised interface between Trustee and external accounting systems.
Encapsulates: config loading -> connector resolution -> duplicate check -> push -> sync record.
"""
import logging
import time
from typing import List, Dict, Any, Optional
from .accountingConnectorBase import (
AccountingBooking,
AccountingBookingLine,
AccountingChart,
SyncResult,
)
from .accountingRegistry import _getAccountingRegistry
logger = logging.getLogger(__name__)
class AccountingBridge:
"""Routes accounting operations through the correct connector for a feature instance."""
def __init__(self, trusteeInterface):
self._trusteeInterface = trusteeInterface
self._registry = _getAccountingRegistry()
async def getActiveConfig(self, featureInstanceId: str) -> Optional[Dict[str, Any]]:
"""Load the active TrusteeAccountingConfig for a feature instance."""
from modules.features.trustee.datamodelFeatureTrustee import TrusteeAccountingConfig
records = self._trusteeInterface.db.getRecordset(
TrusteeAccountingConfig,
recordFilter={"featureInstanceId": featureInstanceId, "isActive": True},
)
if not records:
return None
record = records[0]
return {k: v for k, v in record.items() if not k.startswith("_")}
def _decryptConfig(self, encryptedConfig: str) -> Dict[str, Any]:
"""Decrypt the stored connector config JSON."""
from modules.shared.configuration import decryptValue
import json
try:
decrypted = decryptValue(encryptedConfig, keyName="accountingConfig")
return json.loads(decrypted) if isinstance(decrypted, str) else decrypted
except Exception as e:
logger.error(f"Failed to decrypt accounting config: {e}")
return {}
async def _resolveConnectorAndConfig(self, featureInstanceId: str):
"""Load config, decrypt, resolve connector. Returns (connector, plainConfig, accountingConfigRecord)."""
configRecord = await self.getActiveConfig(featureInstanceId)
if not configRecord:
return None, None, None
connectorType = configRecord.get("connectorType")
connector = self._registry.getConnector(connectorType)
if not connector:
logger.error(f"Accounting connector '{connectorType}' not found")
return None, None, configRecord
plainConfig = self._decryptConfig(configRecord.get("encryptedConfig", ""))
return connector, plainConfig, configRecord
def _buildBookingFromPosition(self, position: Dict[str, Any]) -> AccountingBooking:
"""Build a standardised AccountingBooking from a TrusteePosition record."""
lines = []
debitAccount = position.get("debitAccountNumber")
creditAccount = position.get("creditAccountNumber")
amount = abs(position.get("bookingAmount", 0))
if debitAccount:
lines.append(AccountingBookingLine(
accountNumber=debitAccount,
debitAmount=amount,
currency=position.get("bookingCurrency", "CHF"),
taxCode=position.get("taxCode"),
taxRate=position.get("vatPercentage"),
description=position.get("desc", ""),
costCenter=position.get("costCenter"),
reference=position.get("bookingReference"),
))
if creditAccount:
lines.append(AccountingBookingLine(
accountNumber=creditAccount,
creditAmount=amount,
currency=position.get("bookingCurrency", "CHF"),
description=position.get("desc", ""),
costCenter=position.get("costCenter"),
))
return AccountingBooking(
reference=position.get("bookingReference") or position.get("id", ""),
bookingDate=position.get("valuta") or "",
description=position.get("desc", ""),
lines=lines,
)
async def pushPositionToAccounting(self, featureInstanceId: str, positionId: str) -> SyncResult:
"""Push a single position to the configured accounting system.
1. Load config and connector
2. Load position data
3. Check for existing successful sync (duplicate guard)
4. Build AccountingBooking
5. Push via connector
6. Create TrusteeAccountingSync record
"""
from modules.features.trustee.datamodelFeatureTrustee import TrusteePosition, TrusteeAccountingSync
connector, plainConfig, configRecord = await self._resolveConnectorAndConfig(featureInstanceId)
if not connector or not plainConfig:
return SyncResult(success=False, errorMessage="No active accounting configuration found")
connectorType = configRecord.get("connectorType", "")
# Load position
posRecords = self._trusteeInterface.db.getRecordset(TrusteePosition, recordFilter={"id": positionId})
if not posRecords:
return SyncResult(success=False, errorMessage=f"Position {positionId} not found")
position = posRecords[0]
# Duplicate check
existingSyncs = self._trusteeInterface.db.getRecordset(
TrusteeAccountingSync,
recordFilter={"positionId": positionId, "connectorType": connectorType, "syncStatus": "synced"},
)
if existingSyncs:
return SyncResult(success=False, errorMessage="Position already synced to this system")
# Build and push
booking = self._buildBookingFromPosition(position)
result = await connector.pushBooking(plainConfig, booking)
# Save sync record
import uuid
syncRecord = {
"id": str(uuid.uuid4()),
"positionId": positionId,
"featureInstanceId": featureInstanceId,
"connectorType": connectorType,
"externalId": result.externalId,
"externalReference": result.externalReference,
"syncStatus": "synced" if result.success else "error",
"syncDirection": "push",
"syncedAt": time.time() if result.success else None,
"errorMessage": result.errorMessage,
"bookingPayload": booking.model_dump(),
"mandateId": self._trusteeInterface.mandateId,
}
self._trusteeInterface.db.recordCreate(TrusteeAccountingSync, syncRecord)
# Update last sync on config record
if configRecord:
from modules.features.trustee.datamodelFeatureTrustee import TrusteeAccountingConfig
self._trusteeInterface.db.recordModify(TrusteeAccountingConfig, configRecord["id"], {
"lastSyncAt": time.time(),
"lastSyncStatus": "success" if result.success else "error",
})
return result
async def pushBatchToAccounting(self, featureInstanceId: str, positionIds: List[str]) -> List[SyncResult]:
"""Push multiple positions sequentially."""
results = []
for positionId in positionIds:
result = await self.pushPositionToAccounting(featureInstanceId, positionId)
results.append(result)
return results
async def getChartOfAccounts(self, featureInstanceId: str) -> List[AccountingChart]:
"""Load the chart of accounts from the configured external system."""
connector, plainConfig, _ = await self._resolveConnectorAndConfig(featureInstanceId)
if not connector or not plainConfig:
return []
return await connector.getChartOfAccounts(plainConfig)
async def testConnection(self, featureInstanceId: str) -> SyncResult:
"""Test the connection with the configured accounting system."""
connector, plainConfig, _ = await self._resolveConnectorAndConfig(featureInstanceId)
if not connector or not plainConfig:
return SyncResult(success=False, errorMessage="No active accounting configuration found")
return await connector.testConnection(plainConfig)