platform-core/modules/features/trustee/handlerTrusteeAccounting.py
ValueOn AG 4a60086c80
Some checks failed
Deploy Plattform-Core (Int) / test (push) Failing after 15s
Deploy Plattform-Core (Int) / deploy (push) Has been skipped
cp adapted to 2026 poweron
2026-06-09 09:53:31 +02:00

371 lines
15 KiB
Python

# Copyright (c) 2026 PowerOn AG
# All rights reserved.
"""
Business logic for Trustee accounting integration endpoints.
Extracted from routeFeatureTrustee.py for maintainability.
"""
import json
import logging
import time
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Mask returned in place of secret config values by getConfigMasked; a
# submitted value equal to this mask is skipped when saveAccountingConfig
# merges an update into the stored config.
_CONFIG_PLACEHOLDER = "***"
class SaveAccountingConfigBody(BaseModel):
    """Request body for saving accounting config."""

    # Connector identifier; defaults to an empty string when omitted.
    connectorType: str = ""
    # Label saved together with the config; defaults to an empty string.
    displayLabel: str = ""
    # Plain connector settings as submitted; a fresh dict per instance.
    config: Dict[str, Any] = Field(
        default_factory=dict,
        description="Connector credentials (e.g. clientName, apiKey)",
    )
def getConfigMasked(connectorType: str, plainConfig: Dict[str, Any]) -> Dict[str, str]:
    """Return a display-safe copy of a connector config for GET responses.

    Values of fields that the connector declares as secret are replaced by
    ``_CONFIG_PLACEHOLDER``. All other values are converted to strings, with
    ``None`` rendered as an empty string.

    When no connector is registered for ``connectorType`` the set of secret
    fields cannot be determined, so every non-empty value is masked instead
    of being returned in plain text.

    Args:
        connectorType: Connector identifier looked up in the accounting registry.
        plainConfig: Decrypted config mapping; ``None`` or empty yields ``{}``.

    Returns:
        A mapping with the same keys as ``plainConfig`` and string values only.
    """
    from .accounting.accountingRegistry import getAccountingRegistry

    configItems = (plainConfig or {}).items()
    connector = getAccountingRegistry().getConnector(connectorType)
    if not connector:
        # Fail closed: without the connector's field list a secret cannot be
        # told apart from a harmless value. The placeholder is skipped when an
        # update is merged on save, so echoing it back does not overwrite
        # stored values.
        logger.warning(
            "Accounting config masking: no connector registered for type %r, masking all values",
            connectorType,
        )
        return {
            key: "" if value is None or value == "" else _CONFIG_PLACEHOLDER
            for key, value in configItems
        }

    secretKeys = {field.key for field in connector.getRequiredConfigFields() if field.secret}
    masked: Dict[str, str] = {}
    for key, value in configItems:
        if key in secretKeys:
            masked[key] = _CONFIG_PLACEHOLDER
        elif value is None:
            masked[key] = ""
        else:
            masked[key] = value if isinstance(value, str) else str(value)
    return masked
async def refreshChartSilently(interface, instanceId: str) -> None:
    """Refresh the chart-of-accounts cache without ever propagating an error.

    The refresh is delegated to ``AccountingBridge.refreshChartOfAccounts``.
    Any exception raised along the way (including a failing import of the
    bridge) is logged as a warning and swallowed, so callers can treat the
    refresh as optional.

    Args:
        interface: Trustee interface object handed to the bridge.
        instanceId: Feature instance whose chart cache is refreshed.
    """
    try:
        from .accounting.accountingBridge import AccountingBridge

        charts = await AccountingBridge(interface).refreshChartOfAccounts(instanceId)
        logger.info(f"Chart cache refreshed: {len(charts)} entries for instance {instanceId}")
    except Exception as e:
        # Deliberately non-fatal: the caller's operation has already succeeded.
        logger.warning(f"Chart cache refresh failed (non-critical): {e}")
def readAccountingConfig(interface, instanceId: str) -> Dict[str, Any]:
    """Return the active accounting config of an instance with secrets masked.

    Args:
        interface: Trustee interface object; its ``db.getRecordset`` is queried.
        instanceId: Feature instance whose active config is read.

    Returns:
        ``{"configured": False}`` when no active config record exists.
        Otherwise the first matching record without keys starting with ``_``
        and without ``encryptedConfig``, extended by ``configured=True`` and
        ``configMasked`` (the decrypted config passed through
        ``getConfigMasked``). ``configMasked`` is ``{}`` when nothing is
        stored or when the stored value cannot be decrypted or parsed.
    """
    from .datamodelFeatureTrustee import TrusteeAccountingConfig
    from modules.shared.configuration import decryptValue

    records = interface.db.getRecordset(
        TrusteeAccountingConfig,
        recordFilter={"featureInstanceId": instanceId, "isActive": True},
    )
    if not records:
        return {"configured": False}

    record = {k: v for k, v in records[0].items() if not k.startswith("_")}
    # The encrypted blob must never reach the response.
    encryptedConfig = record.pop("encryptedConfig", None)
    record["configured"] = True
    record["configMasked"] = {}
    if encryptedConfig:
        try:
            plain = json.loads(decryptValue(encryptedConfig, keyName="accountingConfig"))
            record["configMasked"] = getConfigMasked(record.get("connectorType", ""), plain)
        except Exception as readErr:
            # Stay best-effort (empty mask) but leave a trace. Only the
            # exception type is logged so no credential material can end up
            # in the log.
            logger.warning(
                "Accounting config read: stored config for instance %s could not be decrypted or parsed (%s)",
                instanceId,
                type(readErr).__name__,
            )
    return record
def _mergeSubmittedConfig(storedConfig: Dict[str, Any], submittedConfig: Dict[str, Any]) -> Dict[str, Any]:
    """Overlay submitted values onto the stored config, skipping blanks and the mask placeholder."""
    merged = dict(storedConfig)
    for key, value in submittedConfig.items():
        if value is None:
            continue
        text = str(value).strip()
        # A blank value or the echoed mask means "keep what is stored".
        if text and text != _CONFIG_PLACEHOLDER:
            merged[key] = value
    return merged


async def saveAccountingConfig(
    interface, instanceId: str, mandateId: str, body: "SaveAccountingConfigBody"
) -> Optional[Dict[str, Any]]:
    """Create or update the accounting config of an instance.

    If a config record already exists for the instance it is updated and
    marked active. Submitted config values are merged into the stored
    (decrypted) config, where blank values and values equal to
    ``_CONFIG_PLACEHOLDER`` leave the stored value untouched. If no record
    exists, a new one is created from the submitted config. In both cases the
    config is stored encrypted and the chart-of-accounts cache is refreshed on
    a best-effort basis afterwards.

    Args:
        interface: Trustee interface object providing ``db``.
        instanceId: Feature instance the config belongs to.
        mandateId: Mandate stored on a newly created record.
        body: Request payload with ``connectorType``, ``displayLabel``, ``config``.

    Returns:
        ``{"message": ..., "id": ...}`` on success, or ``None`` when no record
        exists yet and no config values were submitted (the route handler is
        expected to answer with HTTP 400).
    """
    import uuid as _uuid
    from .datamodelFeatureTrustee import TrusteeAccountingConfig
    from modules.shared.configuration import encryptValue, decryptValue

    plainConfig = body.config if isinstance(body.config, dict) else {}
    if not plainConfig and body.connectorType:
        logger.warning("Accounting config save: config is empty (credentials will not be stored or updated)")
    else:
        # Only the key names are logged, never the values.
        logger.info(
            "Accounting config save: instanceId=%s connectorType=%s configKeys=%s",
            instanceId, body.connectorType, list(plainConfig.keys())
        )

    existing = interface.db.getRecordset(TrusteeAccountingConfig, recordFilter={"featureInstanceId": instanceId})
    if existing:
        configId = existing[0].get("id")
        updatePayload = {
            "connectorType": body.connectorType or "",
            "displayLabel": body.displayLabel or "",
            "isActive": True,
        }
        if plainConfig:
            storedConfig: Dict[str, Any] = {}
            existingEnc = existing[0].get("encryptedConfig") or ""
            if existingEnc:
                try:
                    decoded = json.loads(decryptValue(existingEnc, keyName="accountingConfig"))
                except Exception as decryptErr:
                    # Without the stored values the merge starts from scratch,
                    # so previously saved keys that are not resubmitted are
                    # lost. Make that visible instead of failing silently.
                    logger.warning(
                        "Accounting config save: stored config for instance %s could not be decrypted or parsed (%s); "
                        "merging into an empty config",
                        instanceId,
                        type(decryptErr).__name__,
                    )
                else:
                    if isinstance(decoded, dict):
                        storedConfig = decoded
                    else:
                        logger.warning(
                            "Accounting config save: stored config for instance %s is not a JSON object; "
                            "merging into an empty config",
                            instanceId,
                        )
            merged = _mergeSubmittedConfig(storedConfig, plainConfig)
            updatePayload["encryptedConfig"] = encryptValue(json.dumps(merged), keyName="accountingConfig")
        interface.db.recordModify(TrusteeAccountingConfig, configId, updatePayload)
        await refreshChartSilently(interface, instanceId)
        return {"message": "Accounting config updated", "id": configId}

    if not plainConfig:
        return None  # Signal to route handler: raise 400

    encryptedConfig = encryptValue(json.dumps(plainConfig), keyName="accountingConfig")
    configRecord = {
        "id": str(_uuid.uuid4()),
        "featureInstanceId": instanceId,
        "connectorType": body.connectorType or "",
        "displayLabel": body.displayLabel or "",
        "encryptedConfig": encryptedConfig,
        "isActive": True,
        "mandateId": mandateId,
    }
    interface.db.recordCreate(TrusteeAccountingConfig, configRecord)
    await refreshChartSilently(interface, instanceId)
    return {"message": "Accounting config created", "id": configRecord["id"]}
def getImportStatus(interface, instanceId: str) -> Dict[str, Any]:
    """Report how many imported TrusteeData* rows exist for an instance.

    Args:
        interface: Trustee interface object; its ``db.getRecordset`` is queried.
        instanceId: Feature instance to report on.

    Returns:
        Row counts keyed ``accounts``, ``journalEntries``, ``journalLines``,
        ``contacts`` and ``accountBalances``. When an active accounting config
        exists, its ``lastSync*`` fields are copied into the result as well.
    """
    from .datamodelFeatureTrustee import (
        TrusteeDataAccount, TrusteeDataJournalEntry, TrusteeDataJournalLine,
        TrusteeDataContact, TrusteeDataAccountBalance, TrusteeAccountingConfig,
    )

    instanceFilter = {"featureInstanceId": instanceId}
    countedModels = (
        ("accounts", TrusteeDataAccount),
        ("journalEntries", TrusteeDataJournalEntry),
        ("journalLines", TrusteeDataJournalLine),
        ("contacts", TrusteeDataContact),
        ("accountBalances", TrusteeDataAccountBalance),
    )
    status: Dict[str, Any] = {}
    for label, model in countedModels:
        rows = interface.db.getRecordset(model, recordFilter=instanceFilter)
        status[label] = len(rows or [])

    activeConfigs = interface.db.getRecordset(
        TrusteeAccountingConfig,
        recordFilter={"featureInstanceId": instanceId, "isActive": True},
    )
    if not activeConfigs:
        return status

    activeConfig = activeConfigs[0]
    syncMarkers = (
        "lastSyncAt",
        "lastSyncStatus",
        "lastSyncErrorMessage",
        "lastSyncDateFrom",
        "lastSyncDateTo",
        "lastSyncCounts",
    )
    for marker in syncMarkers:
        status[marker] = activeConfig.get(marker)
    return status
def wipeImportedData(interface, instanceId: str) -> Dict[str, Any]:
    """Remove every imported TrusteeData* row of an instance and clear its sync markers.

    A failing delete on one table is logged and counted as zero removed rows;
    the remaining tables are still processed. Afterwards the ``lastSync*``
    fields of the active accounting config (if any) are set to ``None`` and
    the feature query cache for the instance is cleared.

    Args:
        interface: Trustee interface object providing ``db``.
        instanceId: Feature instance whose imported data is wiped.

    Returns:
        Dict with ``removed`` (per-table counts), ``totalRemoved``,
        ``cacheCleared`` and ``featureInstanceId``.
    """
    from .datamodelFeatureTrustee import (
        TrusteeDataAccount, TrusteeDataJournalEntry, TrusteeDataJournalLine,
        TrusteeDataContact, TrusteeDataAccountBalance, TrusteeAccountingConfig,
    )
    from modules.serviceCenter.services.serviceAgent.coreTools._featureSubAgentTools import clearFeatureQueryCache

    wipeTargets = (
        ("accounts", TrusteeDataAccount),
        ("journalEntries", TrusteeDataJournalEntry),
        ("journalLines", TrusteeDataJournalLine),
        ("contacts", TrusteeDataContact),
        ("accountBalances", TrusteeDataAccountBalance),
    )
    removed: Dict[str, int] = {}
    for tableName, model in wipeTargets:
        try:
            deleted = interface.db.recordDeleteWhere(model, {"featureInstanceId": instanceId})
            removed[tableName] = int(deleted or 0)
        except Exception as ex:
            logger.warning("wipeImportedData: failed for %s: %s", tableName, ex)
            removed[tableName] = 0

    activeConfigs = interface.db.getRecordset(
        TrusteeAccountingConfig,
        recordFilter={"featureInstanceId": instanceId, "isActive": True},
    )
    cfgId = activeConfigs[0].get("id") if activeConfigs else None
    if cfgId:
        resetMarkers = dict.fromkeys(
            (
                "lastSyncAt",
                "lastSyncStatus",
                "lastSyncErrorMessage",
                "lastSyncDateFrom",
                "lastSyncDateTo",
                "lastSyncCounts",
            ),
            None,
        )
        try:
            interface.db.recordModify(TrusteeAccountingConfig, cfgId, resetMarkers)
        except Exception as ex:
            logger.warning("wipeImportedData: failed to reset lastSync* on cfg %s: %s", cfgId, ex)

    cacheCleared = clearFeatureQueryCache(instanceId)
    logger.info("wipeImportedData instance=%s removed=%s cacheCleared=%s", instanceId, removed, cacheCleared)
    return {
        "removed": removed,
        "totalRemoved": sum(removed.values()),
        "cacheCleared": cacheCleared,
        "featureInstanceId": instanceId,
    }
def exportAccountingData(interface, instanceId: str, mandateId: str) -> Dict[str, Any]:
    """Collect all imported TrusteeData* records of an instance into one export payload.

    Args:
        interface: Trustee interface object; its ``db.getRecordset`` is queried.
        instanceId: Feature instance whose data is exported.
        mandateId: Mandate identifier echoed into the payload.

    Returns:
        Dict with ``exportedAt`` (``time.time()`` at the moment of export),
        ``featureInstanceId``, ``mandateId``, ``syncInfo`` (connector type and
        last sync time/status of the active config, or ``{}`` without one)
        and ``tables`` (records per model name).
    """
    from .datamodelFeatureTrustee import (
        TrusteeDataAccount, TrusteeDataJournalEntry, TrusteeDataJournalLine,
        TrusteeDataContact, TrusteeDataAccountBalance, TrusteeAccountingConfig,
    )

    instanceFilter = {"featureInstanceId": instanceId}
    exportedModels = (
        ("TrusteeDataAccount", TrusteeDataAccount),
        ("TrusteeDataJournalEntry", TrusteeDataJournalEntry),
        ("TrusteeDataJournalLine", TrusteeDataJournalLine),
        ("TrusteeDataContact", TrusteeDataContact),
        ("TrusteeDataAccountBalance", TrusteeDataAccountBalance),
    )
    tables: Dict[str, Any] = {
        tableName: interface.db.getRecordset(model, recordFilter=instanceFilter) or []
        for tableName, model in exportedModels
    }

    activeConfigs = interface.db.getRecordset(
        TrusteeAccountingConfig,
        recordFilter={"featureInstanceId": instanceId, "isActive": True},
    )
    if activeConfigs:
        activeConfig = activeConfigs[0]
        syncInfo = {
            "connectorType": activeConfig.get("connectorType", ""),
            "lastSyncAt": activeConfig.get("lastSyncAt"),
            "lastSyncStatus": activeConfig.get("lastSyncStatus", ""),
        }
    else:
        syncInfo = {}

    return {
        "exportedAt": time.time(),
        "featureInstanceId": instanceId,
        "mandateId": mandateId,
        "syncInfo": syncInfo,
        "tables": tables,
    }
# ---------------------------------------------------------------------------
# Background Job Handlers
# ---------------------------------------------------------------------------
# Job type under which accountingPushJobHandler is registered.
TRUSTEE_ACCOUNTING_PUSH_JOB_TYPE = "trusteeAccountingPush"
# Job type under which accountingSyncJobHandler is registered.
TRUSTEE_ACCOUNTING_SYNC_JOB_TYPE = "trusteeAccountingSync"
def _partitionPushResults(results):
    """Split unsuccessful push results into 'already synced' skips and real failures in one pass."""
    skipped, failed = [], []
    for result in results:
        if result.success:
            continue
        if result.errorMessage and "already synced" in result.errorMessage:
            skipped.append(result)
        else:
            failed.append(result)
    return skipped, failed


async def accountingPushJobHandler(job: Dict[str, Any], progressCb) -> Dict[str, Any]:
    """BackgroundJob handler: push a batch of positions to the external accounting system.

    The connector and its config are resolved once and reused for every
    position. Positions are pushed one after another and progress is reported
    after each of them.

    Args:
        job: Job record. ``featureInstanceId`` and ``mandateId`` are required;
            ``payload["positionIds"]`` lists the positions to push.
        progressCb: Callable invoked as
            ``progressCb(percent, messageKey=..., messageParams=...)``.

    Returns:
        Summary dict with ``total``, ``success``, ``skipped`` (results whose
        error message contains "already synced"), ``errors`` and ``results``
        (one ``model_dump()`` per position). An empty position list yields an
        all-zero summary without any progress callback.

    Raises:
        KeyError: If ``job`` lacks ``featureInstanceId`` or ``mandateId``.
        Exception: Whatever resolving the connector/config raises, after it
            has been logged and progress has been set to 100.
    """
    instanceId = job["featureInstanceId"]
    mandateId = job["mandateId"]
    payload = job.get("payload") or {}
    positionIds: List[str] = list(payload.get("positionIds") or [])
    if not positionIds:
        # Nothing to push: answer before importing and building the bridge stack.
        return {"total": 0, "success": 0, "skipped": 0, "errors": 0, "results": []}

    from modules.security.rootAccess import getRootUser
    from .accounting.accountingBridge import AccountingBridge, SyncResult
    from .interfaceFeatureTrustee import getInterface

    rootUser = getRootUser()
    interface = getInterface(rootUser, mandateId=mandateId, featureInstanceId=instanceId)
    bridge = AccountingBridge(interface)
    total = len(positionIds)
    progressCb(
        2,
        messageKey="Sync wird vorbereitet ({total} Position(en))...",
        messageParams={"total": total},
    )

    try:
        connector, plainConfig, configRecord = await bridge._resolveConnectorAndConfig(instanceId)
    except Exception:
        logger.exception("Accounting push: failed to resolve connector/config")
        progressCb(100, messageKey="Verbindungsaufbau fehlgeschlagen.")
        # Bare raise re-raises the active exception with its original traceback.
        raise

    if not connector or not plainConfig:
        # Report every requested position as failed so the caller sees one result per id.
        results = [
            SyncResult(success=False, errorMessage="No active accounting configuration found")
            for _ in positionIds
        ]
        progressCb(100, messageKey="Keine aktive Buchhaltungs-Konfiguration gefunden.")
        return {
            "total": len(results),
            "success": 0,
            "skipped": 0,
            "errors": len(results),
            "results": [r.model_dump() for r in results],
        }

    results = []
    for index, positionId in enumerate(positionIds, start=1):
        result = await bridge.pushPositionToAccounting(
            instanceId,
            positionId,
            _resolvedConnector=connector,
            _resolvedPlainConfig=plainConfig,
            _resolvedConfigRecord=configRecord,
        )
        results.append(result)
        # Per-position progress is mapped onto the 5..95 range.
        pct = 5 + int(90 * index / total)
        progressCb(
            pct,
            messageKey="Position {index}/{total} verarbeitet",
            messageParams={"index": index, "total": total},
        )

    skipped, failed = _partitionPushResults(results)
    if skipped:
        logger.info("Accounting sync: %s position(s) already synced, skipped", len(skipped))
    if failed:
        # Only the first three messages are logged to keep the line short.
        logger.warning(
            "Accounting sync had %s failure(s): %s",
            len(failed),
            "; ".join(r.errorMessage or "unknown" for r in failed[:3]),
        )

    progressCb(100, messageKey="Sync abgeschlossen.")
    return {
        "total": len(results),
        "success": sum(1 for r in results if r.success),
        "skipped": len(skipped),
        "errors": len(failed),
        "results": [r.model_dump() for r in results],
    }
async def accountingSyncJobHandler(job: Dict[str, Any], progressCb) -> Dict[str, Any]:
    """BackgroundJob handler: import accounting data from the external system.

    Args:
        job: Job record. ``featureInstanceId`` and ``mandateId`` are required;
            ``payload`` may carry ``dateFrom`` and ``dateTo``, which are
            forwarded to the import as given (``None`` when absent).
        progressCb: Progress callback; called here at 5, 10 and 100 percent
            and also handed to ``AccountingDataSync.importData``.

    Returns:
        The value returned by ``AccountingDataSync.importData``.
    """
    from modules.security.rootAccess import getRootUser
    from .accounting.accountingDataSync import AccountingDataSync
    from .interfaceFeatureTrustee import getInterface

    instanceId = job["featureInstanceId"]
    mandateId = job["mandateId"]
    jobPayload = job.get("payload") or {}

    rootUser = getRootUser()
    progressCb(5, messageKey="Initialisiere Import...")

    importer = AccountingDataSync(
        getInterface(rootUser, mandateId=mandateId, featureInstanceId=instanceId)
    )
    progressCb(10, messageKey="Verbinde mit Buchhaltungssystem...")

    importSummary = await importer.importData(
        featureInstanceId=instanceId,
        mandateId=mandateId,
        dateFrom=jobPayload.get("dateFrom"),
        dateTo=jobPayload.get("dateTo"),
        progressCb=progressCb,
    )
    progressCb(100, messageKey="Import abgeschlossen.")
    return importSummary
# Register both job handlers with the background-job service when this module
# is imported. Registration is best-effort: a failure (for example the service
# module not being importable) is logged and does not prevent the module from
# loading.
try:
    from modules.serviceCenter.services.serviceBackgroundJobs import registerJobHandler

    registerJobHandler(TRUSTEE_ACCOUNTING_PUSH_JOB_TYPE, accountingPushJobHandler)
    registerJobHandler(TRUSTEE_ACCOUNTING_SYNC_JOB_TYPE, accountingSyncJobHandler)
except Exception as _regErr:
    logger.warning("Failed to register accounting job handlers: %s", _regErr)