gateway/modules/features/trustee/accounting/accountingDataSync.py
2026-04-26 18:11:42 +02:00

795 lines
34 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Orchestrates importing accounting data from external systems into TrusteeData* tables.
Flow per phase:
1. async fetch from external system (HTTP, awaits cleanly on the event loop)
2. await asyncio.to_thread(...) for the DB write phase, so the heavy
synchronous psycopg2 calls do NOT block the FastAPI event loop
3. inside the worker thread we use bulk delete / bulk insert (single
transaction per phase) instead of N+1 single-row operations
Why this matters: a typical accounting sync is ~10k-100k rows. The old
implementation called ``recordCreate`` row-by-row on the event loop, which
froze every other request (chat, health-check, etc.) for minutes. See
``local/notes/changelog.txt`` for the original incident analysis.
"""
import asyncio
import json as _json
import logging
import os
import time
from collections import defaultdict
from datetime import datetime as _dt, timezone as _tz
from pathlib import Path
from typing import Callable, Dict, Any, List, Optional, Type
from .accountingConnectorBase import BaseAccountingConnector
from .accountingRegistry import getAccountingRegistry
logger = logging.getLogger(__name__)
_HEARTBEAT_EVERY = 500
def _isoDateToTimestamp(raw: Any) -> Optional[float]:
"""Convert an ISO date string (``YYYY-MM-DD`` or datetime) to a UTC
midnight unix timestamp. Returns ``None`` only when *raw* is
falsy/None. Raises ``ValueError`` for non-empty but unparseable
values so import errors are never silently swallowed.
"""
if raw is None or raw == "":
return None
s = str(raw).split("T")[0].strip()[:10]
if not s:
return None
try:
return _dt.strptime(s, "%Y-%m-%d").replace(tzinfo=_tz.utc).timestamp()
except ValueError:
raise ValueError(f"Cannot parse bookingDate '{raw}' as YYYY-MM-DD")
def _isIncomeStatementAccount(accountNumber: str) -> bool:
"""Swiss KMU-Kontenrahmen heuristic: 1xxx + 2xxx -> balance sheet
(cumulative carry-over across years); 3xxx..9xxx -> income statement
(reset to 0 at fiscal-year start). Used by the local fallback only;
when a connector returns balances, those values are used verbatim.
"""
a = (accountNumber or "").strip()
if not a or not a[0].isdigit():
return False
return a[0] not in ("1", "2")
def _resolveBalanceYears(
dateFrom: Optional[str],
dateTo: Optional[str],
oldestBookingDate: Optional[str],
newestBookingDate: Optional[str],
) -> List[int]:
"""Derive the list of years for which the connector should compute balances.
Prefers the ``dateFrom``/``dateTo`` import window the user requested. Falls
back to the actual oldest/newest booking date observed in the imported
journal (so e.g. a `dateTo=None` import still produces balances for every
year that has data). If nothing is known, returns the current year as a
sensible default.
"""
def _yearOf(s: Optional[str]) -> Optional[int]:
if not s:
return None
try:
return int(str(s)[:4])
except (TypeError, ValueError):
return None
fromYear = _yearOf(dateFrom) or _yearOf(oldestBookingDate)
toYear = _yearOf(dateTo) or _yearOf(newestBookingDate)
if fromYear is None and toYear is None:
return [time.gmtime().tm_year]
if fromYear is None:
fromYear = toYear
if toYear is None:
toYear = fromYear
if toYear < fromYear:
fromYear, toYear = toYear, fromYear
return list(range(fromYear, toYear + 1))
def _balanceModelToRow(b: Any, scope: Dict[str, Any]) -> Dict[str, Any]:
"""Map an ``AccountingPeriodBalance`` (or compatible dict) to a DB row."""
if isinstance(b, dict):
get = b.get
else:
get = lambda k, default=None: getattr(b, k, default)
return {
"accountNumber": str(get("accountNumber", "") or ""),
"periodYear": int(get("periodYear", 0) or 0),
"periodMonth": int(get("periodMonth", 0) or 0),
"openingBalance": round(float(get("openingBalance", 0) or 0), 2),
"debitTotal": round(float(get("debitTotal", 0) or 0), 2),
"creditTotal": round(float(get("creditTotal", 0) or 0), 2),
"closingBalance": round(float(get("closingBalance", 0) or 0), 2),
"currency": str(get("currency", "CHF") or "CHF"),
**scope,
}
def _isDebugDumpEnabled() -> bool:
"""Whether to write raw connector payloads to disk for offline inspection.
Controlled by ``APP_DEBUG_ACCOUNTING_SYNC_ENABLED``. Default False so that
INT/PROD never spend disk/IO/RAM on dumping 7-figure JSON files. Mirrors
the existing ``APP_DEBUG_CHAT_WORKFLOW_ENABLED`` pattern.
"""
try:
from modules.shared.configuration import APP_CONFIG
raw = APP_CONFIG.get("APP_DEBUG_ACCOUNTING_SYNC_ENABLED", False)
if isinstance(raw, bool):
return raw
return str(raw).strip().lower() in {"true", "1", "yes", "on"}
except Exception:
return False
def _resolveDebugDumpDir() -> Optional[Path]:
    """Resolve (and create) the debug dump directory.

    Returns ``None`` when dumping is disabled, no directory is configured,
    or the path could not be created.
    """
    if not _isDebugDumpEnabled():
        return None
    try:
        from modules.shared.configuration import APP_CONFIG
        configured = APP_CONFIG.get("APP_DEBUG_ACCOUNTING_SYNC_DIR", None)
        if not configured:
            return None
        dumpPath = Path(str(configured))
        if not dumpPath.is_absolute():
            # Relative paths are anchored at the gateway root, four levels
            # above this module.
            dumpPath = Path(__file__).resolve().parents[4] / configured
        dumpPath.mkdir(parents=True, exist_ok=True)
        return dumpPath
    except Exception as ex:
        logger.warning(f"Could not resolve debug dump dir: {ex}")
        return None
def _dumpSyncData(tag: str, rows: list) -> None:
    """Dump raw connector rows to a timestamped JSON file for offline
    inspection. No-op unless ``APP_DEBUG_ACCOUNTING_SYNC_ENABLED`` is true
    AND the configured directory resolves."""
    targetDir = _resolveDebugDumpDir()
    if targetDir is None:
        return
    try:
        stamp = time.strftime("%Y%m%d-%H%M%S")
        outFile = targetDir / f"{stamp}_{tag}.json"
        payload: list = []
        for row in rows:
            if isinstance(row, dict):
                payload.append(row)
            elif hasattr(row, "__dict__"):
                # Model objects: keep public attributes only.
                payload.append({k: v for k, v in row.__dict__.items() if not k.startswith("_")})
            else:
                payload.append(str(row))
        with open(outFile, "w", encoding="utf-8") as fh:
            _json.dump({"count": len(payload), "rows": payload}, fh, ensure_ascii=False, indent=2, default=str)
        logger.info(f"Debug sync dump: {outFile.name} ({len(payload)} rows)")
    except Exception as e:
        # Debug dumping is best-effort; never let it break the import.
        logger.warning(f"Failed to write debug sync dump for {tag}: {e}")
class AccountingDataSync:
    """Imports accounting data (read-only) from an external system into local TrusteeData* tables."""
    def __init__(self, trusteeInterface):
        # Gateway interface; only its ``.db`` connector is used by this class.
        self._if = trusteeInterface
        # Registry resolving connectorType strings to connector instances.
        self._registry = getAccountingRegistry()
async def importData(
    self,
    featureInstanceId: str,
    mandateId: str,
    dateFrom: Optional[str] = None,
    dateTo: Optional[str] = None,
    progressCb: Optional[Callable[[int, Optional[str]], None]] = None,
) -> Dict[str, Any]:
    """Run a full data import for a feature instance.
    Returns a summary dict with counts per entity and any errors. All heavy
    DB work is offloaded to a worker thread via ``asyncio.to_thread`` so
    the event loop remains responsive for other requests.
    ``progressCb(percent, message)`` -- if provided, called at every phase
    boundary so the UI poll on ``GET /api/jobs/{jobId}`` shows real
    movement instead of jumping from 10 % to 100 %. Safe to omit.
    """
    def _progress(pct: int, msg: str) -> None:
        # Best-effort reporting: a broken callback must never abort the import.
        if progressCb is None:
            return
        try:
            progressCb(pct, msg)
        except Exception as ex:
            logger.warning(f"progressCb failed at {pct}%: {ex}")
    # Imported lazily (function scope) to avoid import cycles at module load.
    from modules.features.trustee.datamodelFeatureTrustee import (
        TrusteeAccountingConfig,
        TrusteeDataAccount,
        TrusteeDataJournalEntry,
        TrusteeDataJournalLine,
        TrusteeDataContact,
        TrusteeDataAccountBalance,
    )
    from modules.shared.configuration import decryptValue
    summary: Dict[str, Any] = {
        "accounts": 0,
        "journalEntries": 0,
        "journalLines": 0,
        "contacts": 0,
        "accountBalances": 0,
        "oldestBookingDate": None,
        "newestBookingDate": None,
        "errors": [],
        "startedAt": time.time(),
    }
    # Load the (single) active config for this feature instance; bail out
    # early with a descriptive error instead of raising.
    cfgRecords = self._if.db.getRecordset(
        TrusteeAccountingConfig,
        recordFilter={"featureInstanceId": featureInstanceId, "isActive": True},
    )
    if not cfgRecords:
        summary["errors"].append("No active accounting configuration found")
        return summary
    cfgRecord = cfgRecords[0]
    connectorType = cfgRecord.get("connectorType", "")
    encryptedConfig = cfgRecord.get("encryptedConfig", "")
    try:
        # Connector credentials are stored encrypted at rest.
        plainJson = decryptValue(encryptedConfig)
        connConfig = _json.loads(plainJson) if plainJson else {}
    except Exception as e:
        summary["errors"].append(f"Failed to decrypt config: {e}")
        return summary
    connector = self._registry.getConnector(connectorType)
    if not connector:
        summary["errors"].append(f"Unknown connector type: {connectorType}")
        return summary
    scope = {"featureInstanceId": featureInstanceId, "mandateId": mandateId}
    logger.info(
        f"AccountingDataSync starting for {featureInstanceId}, "
        f"connector={connectorType}, dateFrom={dateFrom}, dateTo={dateTo}"
    )
    fetchedAccountNumbers: list = []
    # ---- Phase 1: Chart of accounts ----
    # Progress budget: 15-30 %. The fetch (15 %) is usually a single
    # snappy API call; the persist step (30 %) is bulk-insert and finishes
    # in <100 ms even for thousands of rows.
    # NOTE: each phase catches its own exceptions so a failure in one phase
    # does not abort the later ones; errors accumulate in the summary.
    try:
        _progress(15, "Lade Kontenplan...")
        charts = await connector.getChartOfAccounts(connConfig)
        _dumpSyncData("accounts", charts)
        fetchedAccountNumbers = [acc.accountNumber for acc in charts if acc.accountNumber]
        _progress(25, f"Speichere {len(charts)} Konten...")
        written = await asyncio.to_thread(
            self._persistAccounts, charts, scope, featureInstanceId, TrusteeDataAccount
        )
        summary["accounts"] = written
        _progress(30, f"{written} Konten gespeichert.")
    except Exception as e:
        logger.error(f"Import accounts failed: {e}", exc_info=True)
        summary["errors"].append(f"Accounts: {e}")
    # ---- Phase 2: Journal entries + lines ----
    # Progress budget: 35-65 %. Usually the longest phase; the external
    # API often paginates per account, so the fetch alone can take 30+ s.
    try:
        _progress(35, "Lade Journaleintraege vom Buchhaltungssystem...")
        rawEntries = await connector.getJournalEntries(
            connConfig,
            dateFrom=dateFrom,
            dateTo=dateTo,
            accountNumbers=fetchedAccountNumbers or None,
        )
        _dumpSyncData("journalEntries", rawEntries)
        _progress(60, f"Speichere {len(rawEntries)} Buchungssaetze...")
        entriesCount, linesCount, oldestDate, newestDate = await asyncio.to_thread(
            self._persistJournal, rawEntries, scope, featureInstanceId,
            TrusteeDataJournalEntry, TrusteeDataJournalLine,
        )
        summary["journalEntries"] = entriesCount
        summary["journalLines"] = linesCount
        summary["oldestBookingDate"] = oldestDate
        summary["newestBookingDate"] = newestDate
        _progress(65, f"{entriesCount} Saetze + {linesCount} Buchungszeilen gespeichert.")
    except Exception as e:
        logger.error(f"Import journal entries failed: {e}", exc_info=True)
        summary["errors"].append(f"Journal entries: {e}")
    # ---- Phase 3: Contacts (customers + vendors) ----
    # Progress budget: 70-85 %. Two quick API calls + one bulk-insert.
    try:
        _progress(70, "Lade Kunden...")
        customers = await connector.getCustomers(connConfig)
        _dumpSyncData("customers", customers)
        _progress(78, "Lade Lieferanten...")
        vendors = await connector.getVendors(connConfig)
        _dumpSyncData("vendors", vendors)
        _progress(82, f"Speichere {len(customers) + len(vendors)} Kontakte...")
        contactCount = await asyncio.to_thread(
            self._persistContacts, customers, vendors, scope,
            featureInstanceId, TrusteeDataContact,
        )
        summary["contacts"] = contactCount
        _progress(85, f"{contactCount} Kontakte gespeichert.")
    except Exception as e:
        logger.error(f"Import contacts failed: {e}", exc_info=True)
        summary["errors"].append(f"Contacts: {e}")
    # ---- Phase 4: Account balances ----
    # Progress budget: 88-95 %. Connector first (RMA -> /gl/saldo, Bexio
    # & Abacus -> aggregated journal). On empty/failed connector output
    # we fall back to a *correct* cumulative aggregation from the
    # journal lines we just persisted.
    connectorBalances: list = []
    balanceSource = "local-fallback"
    try:
        _progress(88, "Lade Kontensaldi vom Buchhaltungssystem...")
        balanceYears = _resolveBalanceYears(
            dateFrom, dateTo, summary.get("oldestBookingDate"), summary.get("newestBookingDate")
        )
        connectorBalances = await connector.getAccountBalances(
            connConfig,
            years=balanceYears,
            accountNumbers=fetchedAccountNumbers or None,
        )
        _dumpSyncData("accountBalances", connectorBalances)
        if connectorBalances:
            balanceSource = "connector"
    except Exception as e:
        logger.warning(f"Connector getAccountBalances failed, will use local fallback: {e}", exc_info=True)
        summary["errors"].append(f"Balances connector: {e}")
    try:
        _progress(92, "Speichere Kontensaldi...")
        balanceCount = await asyncio.to_thread(
            self._persistBalances, featureInstanceId, mandateId,
            TrusteeDataJournalEntry, TrusteeDataJournalLine, TrusteeDataAccountBalance,
            connectorBalances, balanceSource,
        )
        summary["accountBalances"] = balanceCount
        _progress(95, f"{balanceCount} Saldi gespeichert (source={balanceSource}).")
    except Exception as e:
        logger.error(f"Persist balances failed: {e}", exc_info=True)
        summary["errors"].append(f"Balances: {e}")
    # Persist lastSync* bookkeeping on the config record. Done in two
    # separate writes so a failure of the extended fields (e.g. schema not
    # yet migrated) does not lose the core status fields.
    cfgId = cfgRecord.get("id")
    if cfgId:
        corePayload = {
            "lastSyncAt": time.time(),
            "lastSyncStatus": "success" if not summary["errors"] else "partial",
            "lastSyncErrorMessage": "; ".join(summary["errors"])[:500] if summary["errors"] else None,
        }
        try:
            self._if.db.recordModify(TrusteeAccountingConfig, cfgId, corePayload)
        except Exception as coreErr:
            logger.exception(f"AccountingDataSync: failed to write core lastSync* fields for cfg {cfgId}: {coreErr}")
            summary["errors"].append(f"Persist lastSync core: {coreErr}")
        extPayload = {
            "lastSyncDateFrom": _isoDateToTimestamp(dateFrom),
            "lastSyncDateTo": _isoDateToTimestamp(dateTo),
            "lastSyncCounts": {
                "accounts": int(summary.get("accounts", 0)),
                "journalEntries": int(summary.get("journalEntries", 0)),
                "journalLines": int(summary.get("journalLines", 0)),
                "contacts": int(summary.get("contacts", 0)),
                "accountBalances": int(summary.get("accountBalances", 0)),
                # Actual oldest/newest booking date observed in the
                # imported journal entries. Lets the user verify that the
                # full requested window was returned by the source system.
                "oldestBookingDate": summary.get("oldestBookingDate"),
                "newestBookingDate": summary.get("newestBookingDate"),
            },
        }
        try:
            self._if.db.recordModify(TrusteeAccountingConfig, cfgId, extPayload)
        except Exception as extErr:
            logger.exception(f"AccountingDataSync: failed to write extended lastSync* fields for cfg {cfgId}: {extErr}")
            summary["errors"].append(f"Persist lastSync ext: {extErr}")
    summary["finishedAt"] = time.time()
    summary["durationSeconds"] = round(summary["finishedAt"] - summary["startedAt"], 1)
    logger.info(
        f"AccountingDataSync completed for {featureInstanceId}: "
        f"{summary['accounts']} accounts, {summary['journalEntries']} entries, "
        f"{summary['journalLines']} lines, {summary['contacts']} contacts, "
        f"{summary['accountBalances']} balances, {len(summary['errors'])} errors, "
        f"{summary['durationSeconds']}s"
    )
    return summary
# ===== Sync persistence helpers (run inside asyncio.to_thread) =====
def _persistAccounts(self, charts: list, scope: Dict[str, Any],
                     featureInstanceId: str, modelAccount: Type) -> int:
    """Replace the full chart of accounts for one feature instance.

    Clears all existing rows, then bulk-inserts the fetched chart.
    Returns the number of rows written. Runs inside a worker thread.
    """
    startedAt = time.time()
    self._bulkClear(modelAccount, featureInstanceId)
    accountRows: List[Dict[str, Any]] = []
    for acc in charts:
        row = {
            "accountNumber": acc.accountNumber,
            "label": acc.label,
            "accountType": acc.accountType or "",
            "currency": "CHF",
            "isActive": True,
        }
        row.update(scope)
        accountRows.append(row)
    written = self._bulkCreate(modelAccount, accountRows)
    logger.info(f"Persisted {written} accounts for {featureInstanceId} in {time.time() - startedAt:.1f}s")
    return written
def _persistJournal(self, rawEntries: list, scope: Dict[str, Any],
                    featureInstanceId: str, modelEntry: Type, modelLine: Type) -> tuple:
    """Replace journal entries + journal lines for one feature instance.

    Entry ids are minted locally (uuid4) and referenced by the line rows,
    so one bulk insert per table persists everything. Runs inside a
    worker thread.

    Returns ``(entriesCount, linesCount, oldestBookingDate, newestBookingDate)``
    where the date strings are ISO ``YYYY-MM-DD`` (or ``None`` if no entries).
    """
    import uuid as _uuid
    startedAt = time.time()
    self._bulkClear(modelEntry, featureInstanceId)
    self._bulkClear(modelLine, featureInstanceId)
    entryRows: List[Dict[str, Any]] = []
    lineRows: List[Dict[str, Any]] = []
    oldestDate: Optional[str] = None
    newestDate: Optional[str] = None
    for raw in rawEntries:
        entryId = str(_uuid.uuid4())
        rawDate = raw.get("bookingDate")
        bookingTs = _isoDateToTimestamp(rawDate)
        if rawDate:
            # Track the observed date window for the import summary.
            isoDay = str(rawDate).split("T")[0][:10]
            if isoDay:
                oldestDate = isoDay if (oldestDate is None or isoDay < oldestDate) else oldestDate
                newestDate = isoDay if (newestDate is None or isoDay > newestDate) else newestDate
        entryRow = {
            "id": entryId,
            "externalId": raw.get("externalId"),
            "bookingDate": bookingTs,
            "reference": raw.get("reference"),
            "description": raw.get("description", ""),
            "currency": raw.get("currency", "CHF"),
            "totalAmount": float(raw.get("totalAmount", 0)),
        }
        entryRow.update(scope)
        entryRows.append(entryRow)
        for line in (raw.get("lines") or []):
            lineRow = {
                "journalEntryId": entryId,
                "accountNumber": line.get("accountNumber", ""),
                "debitAmount": float(line.get("debitAmount", 0)),
                "creditAmount": float(line.get("creditAmount", 0)),
                "currency": line.get("currency", "CHF"),
                "taxCode": line.get("taxCode"),
                "costCenter": line.get("costCenter"),
                "description": line.get("description", ""),
            }
            lineRow.update(scope)
            lineRows.append(lineRow)
        if len(entryRows) % _HEARTBEAT_EVERY == 0:
            # Heartbeat so long builds remain visible in the logs.
            logger.info(
                f"Journal build progress: {len(entryRows)}/{len(rawEntries)} entries, "
                f"{len(lineRows)} lines so far"
            )
    entriesCount = self._bulkCreate(modelEntry, entryRows)
    linesCount = self._bulkCreate(modelLine, lineRows)
    logger.info(
        f"Persisted {entriesCount} entries + {linesCount} lines for "
        f"{featureInstanceId} in {time.time() - startedAt:.1f}s "
        f"(window: {oldestDate or '?'} .. {newestDate or '?'})"
    )
    return entriesCount, linesCount, oldestDate, newestDate
def _persistContacts(self, customers: list, vendors: list, scope: Dict[str, Any],
                     featureInstanceId: str, modelContact: Type) -> int:
    """Replace all contacts (customers + vendors) for one feature instance.
    Runs inside a worker thread; returns the number of rows written."""
    startedAt = time.time()
    self._bulkClear(modelContact, featureInstanceId)
    contactRows = [self._mapContact(c, "customer", scope) for c in customers]
    contactRows += [self._mapContact(v, "vendor", scope) for v in vendors]
    written = self._bulkCreate(modelContact, contactRows)
    logger.info(f"Persisted {written} contacts for {featureInstanceId} in {time.time() - startedAt:.1f}s")
    return written
def _persistBalances(
    self,
    featureInstanceId: str,
    mandateId: str,
    modelEntry: Type,
    modelLine: Type,
    modelBalance: Type,
    connectorBalances: list,
    source: str,
) -> int:
    """Persist per-(account, period) balances into ``TrusteeDataAccountBalance``.

    With connector-supplied balances (``source="connector"``) those rows
    are written with their ``openingBalance``/``closingBalance`` verbatim;
    zero ``debitTotal``/``creditTotal`` fields are enriched from the
    already-imported journal lines (e.g. RMA's ``/gl/saldo`` only returns
    a net balance).

    Without connector balances (``source="local-fallback"``) the
    just-persisted journal lines are aggregated into **cumulative**
    balances instead.
    """
    startedAt = time.time()
    self._bulkClear(modelBalance, featureInstanceId)
    scope = {"featureInstanceId": featureInstanceId, "mandateId": mandateId}
    if connectorBalances:
        rows = [_balanceModelToRow(b, scope) for b in connectorBalances]
        movements = self._aggregateJournalMovements(featureInstanceId, modelEntry, modelLine)
        if movements:
            self._enrichRowsWithMovements(rows, movements)
    else:
        rows = self._buildLocalBalanceFallback(featureInstanceId, modelEntry, modelLine, scope)
    n = self._bulkCreate(modelBalance, rows)
    logger.info(
        f"Persisted {n} balances for {featureInstanceId} in {time.time() - startedAt:.1f}s "
        f"(source={source})"
    )
    return n
def _aggregateJournalMovements(
self,
featureInstanceId: str,
modelEntry: Type,
modelLine: Type,
) -> Dict[tuple, Dict[str, float]]:
"""Aggregate debit/credit movements per ``(accountNumber, year, month)``
from the already-persisted journal lines.
Returns ``{(accNo, year, month): {"debit": float, "credit": float}}``.
Used by both the local-fallback balance builder and the connector-balance
enrichment (RMA's ``/gl/saldo`` delivers net balance but no debit/credit
breakdown).
"""
entries = self._if.db.getRecordset(
modelEntry, recordFilter={"featureInstanceId": featureInstanceId},
) or []
entryDates: Dict[str, str] = {}
for e in entries:
eid = e.get("id") if isinstance(e, dict) else getattr(e, "id", None)
bdate = e.get("bookingDate") if isinstance(e, dict) else getattr(e, "bookingDate", None)
if eid and bdate:
entryDates[eid] = bdate
lines = self._if.db.getRecordset(
modelLine, recordFilter={"featureInstanceId": featureInstanceId},
) or []
movements: Dict[tuple, Dict[str, float]] = defaultdict(lambda: {"debit": 0.0, "credit": 0.0})
for ln in lines:
if isinstance(ln, dict):
jeid = ln.get("journalEntryId", "")
accNo = ln.get("accountNumber", "")
debit = float(ln.get("debitAmount", 0))
credit = float(ln.get("creditAmount", 0))
else:
jeid = getattr(ln, "journalEntryId", "")
accNo = getattr(ln, "accountNumber", "")
debit = float(getattr(ln, "debitAmount", 0))
credit = float(getattr(ln, "creditAmount", 0))
bdate = entryDates.get(jeid)
if not accNo or not bdate:
continue
try:
dt = _dt.fromtimestamp(float(bdate), tz=_tz.utc)
year = dt.year
month = dt.month
except (ValueError, TypeError, OSError):
continue
movements[(accNo, year, month)]["debit"] += debit
movements[(accNo, year, month)]["credit"] += credit
return movements
@staticmethod
def _enrichRowsWithMovements(
rows: List[Dict[str, Any]],
movements: Dict[tuple, Dict[str, float]],
) -> None:
"""Patch ``debitTotal`` / ``creditTotal`` on balance rows from journal movements.
For monthly rows: use the exact month's movement.
For annual rows (``periodMonth=0``): sum all 12 months of that year+account.
Only overwrites if the existing value is 0 (connector didn't provide it).
"""
for row in rows:
if row.get("debitTotal", 0) != 0 or row.get("creditTotal", 0) != 0:
continue
accNo = row.get("accountNumber", "")
year = row.get("periodYear", 0)
month = row.get("periodMonth", 0)
if month > 0:
mov = movements.get((accNo, year, month))
if mov:
row["debitTotal"] = round(mov["debit"], 2)
row["creditTotal"] = round(mov["credit"], 2)
else:
yearDebit = 0.0
yearCredit = 0.0
for m in range(1, 13):
mov = movements.get((accNo, year, m))
if mov:
yearDebit += mov["debit"]
yearCredit += mov["credit"]
if yearDebit or yearCredit:
row["debitTotal"] = round(yearDebit, 2)
row["creditTotal"] = round(yearCredit, 2)
def _buildLocalBalanceFallback(
    self,
    featureInstanceId: str,
    modelEntry: Type,
    modelLine: Type,
    scope: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """Aggregate ``TrusteeDataJournalLine`` rows into cumulative period balances.
    Returns rows ready for ``_bulkCreate``. Walks every account
    chronologically through all years observed in the journal so the
    cumulative balance and per-period opening are exact (within the
    bounds of the imported window).
    """
    movements = self._aggregateJournalMovements(featureInstanceId, modelEntry, modelLine)
    observedYears: set = set()
    observedAccounts: set = set()
    for (accNo, year, month) in movements:
        observedYears.add(year)
        observedAccounts.add(accNo)
    if not observedYears or not observedAccounts:
        return []
    sortedYears = sorted(observedYears)
    rows: List[Dict[str, Any]] = []
    for accNo in sorted(observedAccounts):
        # Income-statement accounts (3xxx..9xxx heuristic) reset their
        # opening to 0 each fiscal year; balance-sheet accounts carry the
        # prior year's closing balance forward.
        isER = _isIncomeStatementAccount(accNo)
        cumulativeOpeningOfYear = 0.0
        for year in sortedYears:
            yearOpening = 0.0 if isER else cumulativeOpeningOfYear
            running = yearOpening
            yearDebit = 0.0
            yearCredit = 0.0
            # Walk all 12 months (even empty ones) so `running` stays an
            # exact cumulative balance across gaps in the data.
            for month in range(1, 13):
                opening = running
                mov = movements.get((accNo, year, month), {"debit": 0.0, "credit": 0.0})
                running = opening + mov["debit"] - mov["credit"]
                yearDebit += mov["debit"]
                yearCredit += mov["credit"]
                # Skip all-zero months to keep the table compact; the
                # annual row (periodMonth=0) below is always written.
                if mov["debit"] == 0 and mov["credit"] == 0 and opening == 0 and running == 0:
                    continue
                rows.append({
                    "accountNumber": accNo,
                    "periodYear": year,
                    "periodMonth": month,
                    "openingBalance": round(opening, 2),
                    "debitTotal": round(mov["debit"], 2),
                    "creditTotal": round(mov["credit"], 2),
                    "closingBalance": round(running, 2),
                    "currency": "CHF",
                    **scope,
                })
            # Annual summary row: periodMonth=0 by convention.
            rows.append({
                "accountNumber": accNo,
                "periodYear": year,
                "periodMonth": 0,
                "openingBalance": round(yearOpening, 2),
                "debitTotal": round(yearDebit, 2),
                "creditTotal": round(yearCredit, 2),
                "closingBalance": round(running, 2),
                "currency": "CHF",
                **scope,
            })
            # Carry year-end closing into the next year (balance-sheet only).
            cumulativeOpeningOfYear = running
    return rows
# ===== Low-level bulk helpers =====
def _bulkClear(self, model: Type, featureInstanceId: str) -> int:
"""Delete every row for this feature instance in a single statement."""
try:
return self._if.db.recordDeleteWhere(
model, {"featureInstanceId": featureInstanceId}
)
except AttributeError:
# Backwards-compatible path if the connector hasn't been upgraded
# yet. Logs a warning so we notice in dev/CI.
logger.warning(
"DatabaseConnector.recordDeleteWhere missing — falling back to slow per-row delete for %s",
model.__name__,
)
records = self._if.db.getRecordset(
model, recordFilter={"featureInstanceId": featureInstanceId}
) or []
count = 0
for r in records:
rid = r.get("id") if isinstance(r, dict) else getattr(r, "id", None)
if rid:
try:
self._if.db.recordDelete(model, rid)
count += 1
except Exception:
pass
return count
def _bulkCreate(self, model: Type, rows: List[Dict[str, Any]]) -> int:
"""Insert all rows in a single transaction. Falls back to per-row
insert only if the connector lacks ``recordCreateBulk`` (legacy)."""
if not rows:
return 0
try:
return self._if.db.recordCreateBulk(model, rows)
except AttributeError:
logger.warning(
"DatabaseConnector.recordCreateBulk missing — falling back to slow per-row insert for %s",
model.__name__,
)
n = 0
for i, r in enumerate(rows, start=1):
try:
self._if.db.recordCreate(model, r)
n += 1
except Exception as ex:
logger.warning(f"Per-row insert failed at {i}/{len(rows)}: {ex}")
if i % _HEARTBEAT_EVERY == 0:
logger.info(f"Per-row insert progress: {i}/{len(rows)} rows ({model.__name__})")
return n
# ===== Field helpers =====
@staticmethod
def _safeStr(val: Any) -> str:
"""Convert a value to a safe string for DB storage, collapsing nested dicts/lists."""
if val is None:
return ""
if isinstance(val, (dict, list)):
return ""
return str(val)
def _mapContact(self, raw: Dict[str, Any], contactType: str, scope: Dict[str, Any]) -> Dict[str, Any]:
"""Extract contact fields from a raw API dict, handling varying field names across connectors."""
s = self._safeStr
return {
"externalId": s(raw.get("id") or raw.get("Id") or raw.get("customer_nr") or raw.get("vendor_nr") or ""),
"contactType": contactType,
"contactNumber": s(
raw.get("customernumber") or raw.get("customer_nr")
or raw.get("vendornumber") or raw.get("vendor_nr")
or raw.get("nr") or raw.get("ContactNumber")
or raw.get("id") or ""
),
"name": s(raw.get("name") or raw.get("Name") or raw.get("name_1") or ""),
"address": s(raw.get("addr1") or raw.get("address") or raw.get("Address") or ""),
"zip": s(raw.get("zipcode") or raw.get("postcode") or raw.get("Zip") or raw.get("zip") or ""),
"city": s(raw.get("city") or raw.get("City") or ""),
"country": s(raw.get("country") or raw.get("country_id") or raw.get("Country") or ""),
"email": s(raw.get("email") or raw.get("mail") or raw.get("Email") or ""),
"phone": s(raw.get("phone") or raw.get("phone_fixed") or raw.get("Phone") or ""),
"vatNumber": s(raw.get("vat_identifier") or raw.get("vatNumber") or ""),
**scope,
}