From 5ef311a82eac83d835801bf03fc4af246f1409ea Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 21 Apr 2026 08:57:43 +0200
Subject: [PATCH 1/2] stripe fix
---
modules/interfaces/interfaceDbChat.py | 18 +++++++-
.../mainServiceSubscription.py | 45 +++++++++++++++++++
.../serviceSubscription/stripeBootstrap.py | 43 ++++++++++--------
3 files changed, 86 insertions(+), 20 deletions(-)
diff --git a/modules/interfaces/interfaceDbChat.py b/modules/interfaces/interfaceDbChat.py
index 3614d04b..be097263 100644
--- a/modules/interfaces/interfaceDbChat.py
+++ b/modules/interfaces/interfaceDbChat.py
@@ -219,6 +219,22 @@ class ChatObjects:
# Everything else is an object
return True
+ def _unwrapOptional(self, fieldType):
+ """Unwrap ``Optional[X]`` / ``Union[X, None]`` to ``X``.
+
+ The generic JSONB detection in ``_separateObjectFields`` checks
+ ``__origin__`` against ``(dict, list)``. For ``Optional[List[str]]``
+ the origin is ``Union``, so JSONB fields declared as ``Optional[...]``
+ would silently fall through to ``objectFields`` and be dropped on
+ write. Unwrapping the Optional first keeps the existing detection
+ intact while supporting nullable JSONB columns.
+ """
+ if getattr(fieldType, '__origin__', None) is Union:
+ nonNone = [a for a in getattr(fieldType, '__args__', ()) if a is not type(None)]
+ if len(nonNone) == 1:
+ return nonNone[0]
+ return fieldType
+
def _separateObjectFields(self, model_class, data: Dict[str, Any]) -> tuple[Dict[str, Any], Dict[str, Any]]:
"""Separate simple fields from object fields based on Pydantic model structure."""
simpleFields = {}
@@ -232,7 +248,7 @@ class ChatObjects:
if fieldName in modelFields:
fieldInfo = modelFields[fieldName]
# Pydantic v2 only
- fieldType = fieldInfo.annotation
+ fieldType = self._unwrapOptional(fieldInfo.annotation)
# Always route relational/object fields to object_fields for separate handling
# These fields are stored in separate normalized tables, not as JSONB
diff --git a/modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py b/modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py
index 8a2ff8d5..c9ba5f54 100644
--- a/modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py
+++ b/modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py
@@ -246,6 +246,30 @@ class SubscriptionService:
if not priceMapping or (not priceMapping.stripePriceIdUsers and not priceMapping.stripePriceIdInstances):
raise ValueError(f"Stripe Price IDs not provisioned for plan {plan.planKey}")
+ # Defense in depth: if either of the persisted Stripe Price IDs has been
+ # archived in Stripe in the meantime (e.g. another environment's bootstrap
+ # rotated them on a shared Stripe account), the upcoming
+ # ``checkout.Session.create`` would fail with "The price specified is
+ # inactive". Trigger a one-shot bootstrap re-run to rotate inactive prices,
+ # then reload the mapping. This is idempotent and cheap when nothing
+ # changed.
+ if not self._areStripePricesActive(stripe, priceMapping):
+ logger.warning(
+ "Stripe Price(s) for plan %s are no longer active in Stripe — "
+ "running bootstrap to rotate.", plan.planKey,
+ )
+ try:
+ from modules.serviceCenter.services.serviceSubscription.stripeBootstrap import bootstrapStripePrices
+ bootstrapStripePrices()
+ priceMapping = getStripePricesForPlan(plan.planKey)
+ except Exception as ex:
+ logger.error("Inline Stripe bootstrap failed for plan %s: %s", plan.planKey, ex)
+ if not priceMapping or not self._areStripePricesActive(stripe, priceMapping):
+ raise ValueError(
+ f"Stripe Price IDs for plan {plan.planKey} are inactive and "
+ "could not be rotated automatically."
+ )
+
stripeCustomerId = self._resolveStripeCustomer(mandateId)
if not stripeCustomerId:
raise ValueError(f"Could not resolve Stripe customer for mandate {mandateId}")
@@ -353,6 +377,27 @@ class SubscriptionService:
except Exception as e:
logger.error("Failed to clear stripeCustomerId for mandate %s: %s", mandateId, e)
+ def _areStripePricesActive(self, stripe, priceMapping) -> bool:
+ """Verify that every persisted Stripe Price ID for the plan is still
+ ``active`` in Stripe. ``stripe.Price.retrieve`` returns archived prices
+ too, so we must inspect the ``active`` flag explicitly. Returns True
+ only when ALL non-empty price IDs resolve to active prices."""
+ priceIds = [pid for pid in (
+ getattr(priceMapping, "stripePriceIdUsers", None),
+ getattr(priceMapping, "stripePriceIdInstances", None),
+ ) if pid]
+ if not priceIds:
+ return False
+ for pid in priceIds:
+ try:
+ price = stripe.Price.retrieve(pid)
+ if not bool(getattr(price, "active", False) if not isinstance(price, dict) else price.get("active")):
+ return False
+ except Exception as ex:
+ logger.warning("Stripe Price %s could not be retrieved: %s", pid, ex)
+ return False
+ return True
+
def _resolveStripeCustomer(self, mandateId: str) -> Optional[str]:
try:
from modules.interfaces.interfaceDbBilling import getInterface as getBillingInterface
diff --git a/modules/serviceCenter/services/serviceSubscription/stripeBootstrap.py b/modules/serviceCenter/services/serviceSubscription/stripeBootstrap.py
index d26ef50e..ce63a43d 100644
--- a/modules/serviceCenter/services/serviceSubscription/stripeBootstrap.py
+++ b/modules/serviceCenter/services/serviceSubscription/stripeBootstrap.py
@@ -124,16 +124,6 @@ def _findExistingStripePrice(
return None
-def _getStripePriceAmount(stripe, priceId: str) -> Optional[int]:
- """Retrieve the unit_amount (in Rappen) of an existing Stripe Price."""
- try:
- from modules.shared.stripeClient import stripeToDict
- price = stripeToDict(stripe.Price.retrieve(priceId))
- return price.get("unit_amount") if price else None
- except Exception:
- return None
-
-
def _reconcilePrice(
stripe,
productId: str,
@@ -143,25 +133,35 @@ def _reconcilePrice(
nickname: str,
intervalCount: int = 1,
) -> str:
- """If the stored Stripe Price has a different amount, create a new one and deactivate the old."""
+ """If the stored Stripe Price has a different amount, is no longer active,
+ or has a different recurring interval, create/find a new active one and
+ deactivate the old."""
from modules.shared.stripeClient import stripeToDict
expectedCents = int(round(expectedCHF * 100))
- actualCents = _getStripePriceAmount(stripe, oldPriceId)
+ actualCents: Optional[int] = None
matchesRecurring = False
+ isActive = False
+ retrieveFailed = False
try:
raw = stripe.Price.retrieve(oldPriceId)
pd = stripeToDict(raw)
+ actualCents = pd.get("unit_amount")
matchesRecurring = _recurringMatches(pd.get("recurring") or {}, interval, intervalCount)
- except Exception:
- pass
+ # Stripe.Price.retrieve returns archived prices too, so we MUST check
+ # `active` explicitly. Subscription.create rejects inactive prices with
+ # "The price specified is inactive. This field only accepts active prices."
+ isActive = bool(pd.get("active"))
+ except Exception as ex:
+ retrieveFailed = True
+ logger.warning("Could not retrieve Stripe Price %s: %s", oldPriceId, ex)
- if actualCents == expectedCents and matchesRecurring:
+ if not retrieveFailed and isActive and actualCents == expectedCents and matchesRecurring:
return oldPriceId
logger.warning(
- "Price drift or recurring mismatch for %s: Stripe amount=%s Rappen (expected %s). Rotating price.",
- oldPriceId, actualCents, expectedCents,
+ "Rotating Stripe Price %s on product %s: active=%s amount=%s (expected %s) recurringMatches=%s retrieveFailed=%s.",
+ oldPriceId, productId, isActive, actualCents, expectedCents, matchesRecurring, retrieveFailed,
)
existingMatch = _findExistingStripePrice(stripe, productId, expectedCents, interval, intervalCount)
@@ -221,8 +221,13 @@ def _archiveOtherRecurringPrices(
def _validateStripeIdsExist(stripe, mapping: StripePlanPrice) -> bool:
- """Quick check whether at least the stored product IDs still exist in Stripe.
- Returns False when running against a different Stripe account or after DB copy."""
+ """Quick check whether the stored Stripe product IDs still exist.
+
+ Returns False when running against a different Stripe account or after a
+ DB copy from another environment. Price-level validation (active flag,
+ drift) is handled by ``_reconcilePrice``; we don't fail here on archived
+ prices, otherwise we'd needlessly re-provision products on every rotation.
+ """
try:
if mapping.stripeProductIdUsers:
stripe.Product.retrieve(mapping.stripeProductIdUsers)
From be43876461ee727324669ba705cbc0d94e383a55 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 21 Apr 2026 10:45:14 +0200
Subject: [PATCH 2/2] fix critical trustee db sync
---
env_dev.env | 2 +
env_int.env | 2 +
env_prod.env | 2 +
modules/connectors/connectorDbPostgre.py | 186 +++++++
.../trustee/accounting/accountingDataSync.py | 486 ++++++++++++------
.../features/trustee/routeFeatureTrustee.py | 23 +-
modules/routes/routeI18n.py | 115 ++++-
modules/shared/i18nRegistry.py | 148 +++++-
8 files changed, 789 insertions(+), 175 deletions(-)
diff --git a/env_dev.env b/env_dev.env
index f8d5b999..4f1c7367 100644
--- a/env_dev.env
+++ b/env_dev.env
@@ -82,6 +82,8 @@ TEAMSBOT_BROWSER_BOT_URL = https://cae-poweron-shared.redwater-53d21339.switzerl
# Debug Configuration
APP_DEBUG_CHAT_WORKFLOW_ENABLED = True
APP_DEBUG_CHAT_WORKFLOW_DIR = D:/Athi/Local/Web/poweron/local/debug
+APP_DEBUG_ACCOUNTING_SYNC_ENABLED = True
+APP_DEBUG_ACCOUNTING_SYNC_DIR = D:/Athi/Local/Web/poweron/local/debug/sync
# Manadate Pre-Processing Servers
PREPROCESS_ALTHAUS_CHAT_SECRET = DEV_ENC:Z0FBQUFBQnBudkpGbEphQ3ZUMlFMQ2EwSGpoSE9NNzRJNTJtaGk1N0RGakdIYnVVeVFHZmF5OXB3QTVWLVNaZk9wNkhfQkZWRnVwRGRxem9iRzJIWXdpX1NIN2FwSExfT3c9PQ==
diff --git a/env_int.env b/env_int.env
index d11052fa..1d273920 100644
--- a/env_int.env
+++ b/env_int.env
@@ -80,6 +80,8 @@ TEAMSBOT_BROWSER_BOT_URL = https://cae-poweron-shared.redwater-53d21339.switzerl
# Debug Configuration
APP_DEBUG_CHAT_WORKFLOW_ENABLED = FALSE
APP_DEBUG_CHAT_WORKFLOW_DIR = ./test-chat
+APP_DEBUG_ACCOUNTING_SYNC_ENABLED = FALSE
+APP_DEBUG_ACCOUNTING_SYNC_DIR = ./debug/sync
# Manadate Pre-Processing Servers
PREPROCESS_ALTHAUS_CHAT_SECRET = INT_ENC:Z0FBQUFBQnBaSnM4UkNBelhvckxCQUVjZm94N3BZUDcxaEMyckE2dm1lRVhqODhrWU1SUjNXZ3dQZlVJOWhveXFkZXpobW5xT0NneGZ2SkNUblFmYXd0WTBYNTl3UmRnSWc9PQ==
diff --git a/env_prod.env b/env_prod.env
index 9e099bc2..555b9a95 100644
--- a/env_prod.env
+++ b/env_prod.env
@@ -81,6 +81,8 @@ TEAMSBOT_BROWSER_BOT_URL = https://cae-poweron-shared.redwater-53d21339.switzerl
# Debug Configuration
APP_DEBUG_CHAT_WORKFLOW_ENABLED = FALSE
APP_DEBUG_CHAT_WORKFLOW_DIR = ./test-chat
+APP_DEBUG_ACCOUNTING_SYNC_ENABLED = FALSE
+APP_DEBUG_ACCOUNTING_SYNC_DIR = ./debug/sync
# Manadate Pre-Processing Servers
PREPROCESS_ALTHAUS_CHAT_SECRET = PROD_ENC:Z0FBQUFBQnBaSnM4RVRmYW5IelNIbklTUDZIMEoycEN4ZFF0YUJoWWlUTUh2M0dhSXpYRXcwVkRGd1VieDNsYkdCRlpxMUR5Rjk1RDhPRkE5bmVtc2VDMURfLW9QNkxMVHN0M1JhbU9sa3JHWmdDZnlHS3BQRVBGTERVMHhXOVdDOWVqNkhfSUQyOHo=
diff --git a/modules/connectors/connectorDbPostgre.py b/modules/connectors/connectorDbPostgre.py
index c588c906..72098ff1 100644
--- a/modules/connectors/connectorDbPostgre.py
+++ b/modules/connectors/connectorDbPostgre.py
@@ -1396,6 +1396,192 @@ class DatabaseConnector:
self.connection.rollback()
return False
+ def recordCreateBulk(
+ self, model_class: type, records: List[Union[Dict[str, Any], BaseModel]]
+ ) -> int:
+ """Bulk-insert many records in a single transaction.
+
+ Use this instead of calling recordCreate() in a tight loop when importing
+ large datasets (>100 rows). Performance gain is roughly two orders of
+ magnitude because:
+ - one network round-trip via execute_values() instead of N
+ - one COMMIT instead of N
+ - initial ID is registered once for the whole batch instead of every row
+
+ Returns the number of rows successfully inserted. Caller is responsible
+ for catching exceptions; on any error the transaction is rolled back so
+ the table stays consistent (all-or-nothing).
+ """
+ if not records:
+ return 0
+
+ table = model_class.__name__
+ if not self._ensureTableExists(model_class):
+ raise ValueError(f"Table {table} does not exist")
+
+ fields = _get_model_fields(model_class)
+ columns = ["id"] + [f for f in fields.keys() if f != "id"]
+ modelFields = model_class.model_fields
+
+ effectiveUserId = _current_user_id.get()
+ if effectiveUserId is None:
+ effectiveUserId = self.userId
+ currentTime = getUtcTimestamp()
+
+ normalised: List[Dict[str, Any]] = []
+ for raw in records:
+ if isinstance(raw, BaseModel):
+ rec = raw.model_dump()
+ elif isinstance(raw, dict):
+ rec = raw.copy()
+ else:
+ raise ValueError("Bulk record must be a Pydantic model or dictionary")
+ if "id" not in rec or not rec["id"]:
+ rec["id"] = str(uuid.uuid4())
+ createdTs = rec.get("sysCreatedAt")
+ if createdTs is None or createdTs == 0 or createdTs == 0.0:
+ rec["sysCreatedAt"] = currentTime
+ if effectiveUserId:
+ rec["sysCreatedBy"] = effectiveUserId
+ elif not rec.get("sysCreatedBy") and effectiveUserId:
+ rec["sysCreatedBy"] = effectiveUserId
+ rec["sysModifiedAt"] = currentTime
+ if effectiveUserId:
+ rec["sysModifiedBy"] = effectiveUserId
+ normalised.append(rec)
+
+ rows = [self._coerceRowForInsert(rec, columns, fields, modelFields) for rec in normalised]
+
+ col_names = ", ".join([f'"{c}"' for c in columns])
+ updates = ", ".join(
+ [f'"{c}" = EXCLUDED."{c}"' for c in columns[1:]
+ if c not in ("sysCreatedAt", "sysCreatedBy")]
+ )
+ sql = (
+ f'INSERT INTO "{table}" ({col_names}) VALUES %s '
+ f'ON CONFLICT ("id") DO UPDATE SET {updates}'
+ )
+
+ try:
+ self._ensure_connection()
+ with self.connection.cursor() as cursor:
+ psycopg2.extras.execute_values(cursor, sql, rows, page_size=500)
+ self.connection.commit()
+ except Exception as e:
+ logger.error(f"Bulk insert into {table} failed (n={len(rows)}): {e}")
+ try:
+ self.connection.rollback()
+ except Exception:
+ pass
+ raise
+
+ if self.getInitialId(model_class) is None and normalised:
+ self._registerInitialId(table, normalised[0]["id"])
+ logger.info(f"Registered initial ID {normalised[0]['id']} for table {table}")
+
+ return len(rows)
+
+ def _coerceRowForInsert(
+ self,
+ record: Dict[str, Any],
+ columns: List[str],
+ fields: Dict[str, str],
+ modelFields: Dict[str, Any],
+ ) -> tuple:
+ """Convert one record dict to a positional tuple matching `columns`.
+
+ Mirrors the per-column coercion logic in `_save_record` so that bulk and
+ single inserts produce identical on-disk values (timestamps as floats,
+ enums as strings, vectors as pgvector text, JSONB as JSON strings).
+ """
+ import json as _json
+ out = []
+ for col in columns:
+ value = record.get(col)
+ if col in ("sysCreatedAt", "sysModifiedAt") and value is not None:
+ if isinstance(value, str):
+ try:
+ value = float(value)
+ except Exception:
+ pass
+ elif hasattr(value, "value"):
+ value = value.value
+ elif col in fields and _isVectorType(fields[col]) and value is not None:
+ if isinstance(value, list):
+ value = f"[{','.join(str(v) for v in value)}]"
+ elif col in fields and fields[col] == "JSONB" and value is not None:
+ if isinstance(value, (dict, list)):
+ value = _json.dumps(value)
+ elif isinstance(value, str):
+ try:
+ _json.loads(value)
+ except (ValueError, TypeError):
+ value = _json.dumps(value)
+ elif hasattr(value, "model_dump"):
+ value = _json.dumps(value.model_dump())
+ else:
+ value = _json.dumps(value)
+ out.append(value)
+ return tuple(out)
+
+ def recordDeleteWhere(
+ self, model_class: type, recordFilter: Dict[str, Any]
+ ) -> int:
+ """Delete all records matching a simple equality filter, in one statement.
+
+ Replaces the N+1 pattern `for r in getRecordset(...): recordDelete(r.id)`.
+ Returns the number of rows actually deleted. If the table holds the
+ initial ID and that row gets deleted, the initial ID registration is
+ cleared so the next insert can re-register a fresh one.
+ """
+ if not recordFilter:
+ raise ValueError("recordDeleteWhere requires a non-empty recordFilter (refusing to truncate)")
+
+ table = model_class.__name__
+ if not self._ensureTableExists(model_class):
+ return 0
+
+ fields = _get_model_fields(model_class)
+ clauses: List[str] = []
+ params: List[Any] = []
+ for key, val in recordFilter.items():
+ if key not in fields and key != "id":
+ raise ValueError(f"recordDeleteWhere: unknown column {table}.{key}")
+ clauses.append(f'"{key}" = %s')
+ params.append(val)
+ whereSql = " AND ".join(clauses)
+
+ initialId = self.getInitialId(model_class)
+ try:
+ self._ensure_connection()
+ with self.connection.cursor() as cursor:
+ if initialId is not None:
+ cursor.execute(
+ f'SELECT 1 FROM "{table}" WHERE "id" = %s AND ' + whereSql,
+ [initialId, *params],
+ )
+ initialIsAffected = cursor.fetchone() is not None
+ else:
+ initialIsAffected = False
+
+ cursor.execute(f'DELETE FROM "{table}" WHERE ' + whereSql, params)
+ deleted = cursor.rowcount or 0
+ self.connection.commit()
+ except Exception as e:
+ logger.error(f"Bulk delete from {table} failed (filter={recordFilter}): {e}")
+ try:
+ self.connection.rollback()
+ except Exception:
+ pass
+ raise
+
+ if deleted and initialIsAffected:
+ self._removeInitialId(table)
+ logger.info(f"Initial ID for table {table} cleared (bulk-delete removed it)")
+ if deleted:
+ logger.info(f"recordDeleteWhere: deleted {deleted} rows from {table} where {recordFilter}")
+ return deleted
+
def getInitialId(self, model_class: type) -> Optional[str]:
"""Returns the initial ID for a table."""
table = model_class.__name__
diff --git a/modules/features/trustee/accounting/accountingDataSync.py b/modules/features/trustee/accounting/accountingDataSync.py
index e422566f..a606c58a 100644
--- a/modules/features/trustee/accounting/accountingDataSync.py
+++ b/modules/features/trustee/accounting/accountingDataSync.py
@@ -2,16 +2,27 @@
# All rights reserved.
"""Orchestrates importing accounting data from external systems into TrusteeData* tables.
-Flow: load config → resolve connector → fetch data → clear old records → write new records → compute balances.
+Flow per phase:
+ 1. async fetch from external system (HTTP, awaits cleanly on the event loop)
+ 2. await asyncio.to_thread(...) for the DB write phase, so the heavy
+ synchronous psycopg2 calls do NOT block the FastAPI event loop
+ 3. inside the worker thread we use bulk delete / bulk insert (single
+ transaction per phase) instead of N+1 single-row operations
+
+Why this matters: a typical accounting sync is ~10k-100k rows. The old
+implementation called ``recordCreate`` row-by-row on the event loop, which
+froze every other request (chat, health-check, etc.) for minutes. See
+``local/notes/changelog.txt`` for the original incident analysis.
"""
+import asyncio
import json as _json
import logging
import os
import time
from collections import defaultdict
from pathlib import Path
-from typing import Dict, Any, List, Optional
+from typing import Callable, Dict, Any, List, Optional, Type
from .accountingConnectorBase import BaseAccountingConnector
from .accountingRegistry import _getAccountingRegistry
@@ -19,30 +30,57 @@ from .accountingRegistry import _getAccountingRegistry
logger = logging.getLogger(__name__)
-_DEBUG_SYNC_DIR = Path("D:/Athi/Local/Web/poweron/local/debug/sync")
+_HEARTBEAT_EVERY = 500
-def _debugSyncDir() -> Path:
- _DEBUG_SYNC_DIR.mkdir(parents=True, exist_ok=True)
- return _DEBUG_SYNC_DIR
+def _isDebugDumpEnabled() -> bool:
+ """Whether to write raw connector payloads to disk for offline inspection.
-
-def _isDebugEnabled() -> bool:
+ Controlled by ``APP_DEBUG_ACCOUNTING_SYNC_ENABLED``. Default False so that
+ INT/PROD never spend disk/IO/RAM on dumping 7-figure JSON files. Mirrors
+ the existing ``APP_DEBUG_CHAT_WORKFLOW_ENABLED`` pattern.
+ """
try:
from modules.shared.configuration import APP_CONFIG
- return APP_CONFIG.get("APP_LOGGING_FILE_ENABLED", False) is True or str(APP_CONFIG.get("APP_LOGGING_FILE_ENABLED", "")).lower() == "true"
+ raw = APP_CONFIG.get("APP_DEBUG_ACCOUNTING_SYNC_ENABLED", False)
+ if isinstance(raw, bool):
+ return raw
+ return str(raw).strip().lower() in {"true", "1", "yes", "on"}
except Exception:
return False
-def _dumpSyncData(tag: str, rows: list):
- """Write raw connector data to a timestamped JSON file in local/debug/sync/."""
- if not _isDebugEnabled():
+def _resolveDebugDumpDir() -> Optional[Path]:
+ """Resolve the debug dump directory. Returns None if dumping is disabled
+ or the path could not be created."""
+ if not _isDebugDumpEnabled():
+ return None
+ try:
+ from modules.shared.configuration import APP_CONFIG
+ configured = APP_CONFIG.get("APP_DEBUG_ACCOUNTING_SYNC_DIR", None)
+ if not configured:
+ return None
+ path = Path(str(configured))
+ if not path.is_absolute():
+ gatewayDir = Path(__file__).resolve().parents[4]
+ path = gatewayDir / configured
+ path.mkdir(parents=True, exist_ok=True)
+ return path
+ except Exception as ex:
+ logger.warning(f"Could not resolve debug dump dir: {ex}")
+ return None
+
+
+def _dumpSyncData(tag: str, rows: list) -> None:
+ """Write raw connector data to a timestamped JSON file. No-op unless
+ ``APP_DEBUG_ACCOUNTING_SYNC_ENABLED`` is true AND the configured dir
+ resolves."""
+ dumpDir = _resolveDebugDumpDir()
+ if dumpDir is None:
return
try:
- d = _debugSyncDir()
ts = time.strftime("%Y%m%d-%H%M%S")
- path = d / f"{ts}_{tag}.json"
+ path = dumpDir / f"{ts}_{tag}.json"
serializable = []
for r in rows:
if isinstance(r, dict):
@@ -71,11 +109,25 @@ class AccountingDataSync:
mandateId: str,
dateFrom: Optional[str] = None,
dateTo: Optional[str] = None,
+ progressCb: Optional[Callable[[int, Optional[str]], None]] = None,
) -> Dict[str, Any]:
"""Run a full data import for a feature instance.
- Returns a summary dict with counts per entity and any errors.
+ Returns a summary dict with counts per entity and any errors. All heavy
+ DB work is offloaded to a worker thread via ``asyncio.to_thread`` so
+ the event loop remains responsive for other requests.
+
+ ``progressCb(percent, message)`` -- if provided, called at every phase
+ boundary so the UI poll on ``GET /api/jobs/{jobId}`` shows real
+ movement instead of jumping from 10 % to 100 %. Safe to omit.
"""
+ def _progress(pct: int, msg: str) -> None:
+ if progressCb is None:
+ return
+ try:
+ progressCb(pct, msg)
+ except Exception as ex:
+ logger.warning(f"progressCb failed at {pct}%: {ex}")
from modules.features.trustee.datamodelFeatureTrustee import (
TrusteeAccountingConfig,
TrusteeDataAccount,
@@ -109,9 +161,8 @@ class AccountingDataSync:
encryptedConfig = cfgRecord.get("encryptedConfig", "")
try:
- import json
plainJson = decryptValue(encryptedConfig)
- connConfig = json.loads(plainJson) if plainJson else {}
+ connConfig = _json.loads(plainJson) if plainJson else {}
except Exception as e:
summary["errors"].append(f"Failed to decrypt config: {e}")
return summary
@@ -122,97 +173,87 @@ class AccountingDataSync:
return summary
scope = {"featureInstanceId": featureInstanceId, "mandateId": mandateId}
- logger.info(f"AccountingDataSync starting for {featureInstanceId}, connector={connectorType}, dateFrom={dateFrom}, dateTo={dateTo}")
+ logger.info(
+ f"AccountingDataSync starting for {featureInstanceId}, "
+ f"connector={connectorType}, dateFrom={dateFrom}, dateTo={dateTo}"
+ )
fetchedAccountNumbers: list = []
- # 1) Chart of accounts
+ # ---- Phase 1: Chart of accounts ----
+ # Progress budget: 15-30 %. The fetch (15 %) is usually a single
+ # snappy API call; the persist step (30 %) is bulk-insert and finishes
+ # in <100 ms even for thousands of rows.
try:
+ _progress(15, "Lade Kontenplan...")
charts = await connector.getChartOfAccounts(connConfig)
_dumpSyncData("accounts", charts)
fetchedAccountNumbers = [acc.accountNumber for acc in charts if acc.accountNumber]
- self._clearTable(TrusteeDataAccount, featureInstanceId)
- for acc in charts:
- self._if.db.recordCreate(TrusteeDataAccount, {
- "accountNumber": acc.accountNumber,
- "label": acc.label,
- "accountType": acc.accountType or "",
- "currency": "CHF",
- "isActive": True,
- **scope,
- })
- summary["accounts"] = len(charts)
+ _progress(25, f"Speichere {len(charts)} Konten...")
+ written = await asyncio.to_thread(
+ self._persistAccounts, charts, scope, featureInstanceId, TrusteeDataAccount
+ )
+ summary["accounts"] = written
+ _progress(30, f"{written} Konten gespeichert.")
except Exception as e:
logger.error(f"Import accounts failed: {e}", exc_info=True)
summary["errors"].append(f"Accounts: {e}")
- # 2) Journal entries + lines (pass already-fetched chart to avoid redundant API call)
+ # ---- Phase 2: Journal entries + lines ----
+ # Progress budget: 35-65 %. Usually the longest phase; the external
+ # API often paginates per account, so the fetch alone can take 30+ s.
try:
- rawEntries = await connector.getJournalEntries(connConfig, dateFrom=dateFrom, dateTo=dateTo, accountNumbers=fetchedAccountNumbers or None)
+ _progress(35, "Lade Journaleintraege vom Buchhaltungssystem...")
+ rawEntries = await connector.getJournalEntries(
+ connConfig,
+ dateFrom=dateFrom,
+ dateTo=dateTo,
+ accountNumbers=fetchedAccountNumbers or None,
+ )
_dumpSyncData("journalEntries", rawEntries)
- self._clearTable(TrusteeDataJournalEntry, featureInstanceId)
- self._clearTable(TrusteeDataJournalLine, featureInstanceId)
- lineCount = 0
- for raw in rawEntries:
- import uuid
- entryId = str(uuid.uuid4())
- self._if.db.recordCreate(TrusteeDataJournalEntry, {
- "id": entryId,
- "externalId": raw.get("externalId"),
- "bookingDate": raw.get("bookingDate"),
- "reference": raw.get("reference"),
- "description": raw.get("description", ""),
- "currency": raw.get("currency", "CHF"),
- "totalAmount": float(raw.get("totalAmount", 0)),
- **scope,
- })
- for line in (raw.get("lines") or []):
- self._if.db.recordCreate(TrusteeDataJournalLine, {
- "journalEntryId": entryId,
- "accountNumber": line.get("accountNumber", ""),
- "debitAmount": float(line.get("debitAmount", 0)),
- "creditAmount": float(line.get("creditAmount", 0)),
- "currency": line.get("currency", "CHF"),
- "taxCode": line.get("taxCode"),
- "costCenter": line.get("costCenter"),
- "description": line.get("description", ""),
- **scope,
- })
- lineCount += 1
- summary["journalEntries"] = len(rawEntries)
- summary["journalLines"] = lineCount
+ _progress(60, f"Speichere {len(rawEntries)} Buchungssaetze...")
+ entriesCount, linesCount = await asyncio.to_thread(
+ self._persistJournal, rawEntries, scope, featureInstanceId,
+ TrusteeDataJournalEntry, TrusteeDataJournalLine,
+ )
+ summary["journalEntries"] = entriesCount
+ summary["journalLines"] = linesCount
+ _progress(65, f"{entriesCount} Saetze + {linesCount} Buchungszeilen gespeichert.")
except Exception as e:
- logger.error(f"Import journal entries failed: {e}")
+ logger.error(f"Import journal entries failed: {e}", exc_info=True)
summary["errors"].append(f"Journal entries: {e}")
- # 3) Contacts (customers + vendors)
+ # ---- Phase 3: Contacts (customers + vendors) ----
+ # Progress budget: 70-85 %. Two quick API calls + one bulk-insert.
try:
- self._clearTable(TrusteeDataContact, featureInstanceId)
- contactCount = 0
-
+ _progress(70, "Lade Kunden...")
customers = await connector.getCustomers(connConfig)
_dumpSyncData("customers", customers)
- for c in customers:
- self._if.db.recordCreate(TrusteeDataContact, self._mapContact(c, "customer", scope))
- contactCount += 1
-
+ _progress(78, "Lade Lieferanten...")
vendors = await connector.getVendors(connConfig)
_dumpSyncData("vendors", vendors)
- for v in vendors:
- self._if.db.recordCreate(TrusteeDataContact, self._mapContact(v, "vendor", scope))
- contactCount += 1
-
+ _progress(82, f"Speichere {len(customers) + len(vendors)} Kontakte...")
+ contactCount = await asyncio.to_thread(
+ self._persistContacts, customers, vendors, scope,
+ featureInstanceId, TrusteeDataContact,
+ )
summary["contacts"] = contactCount
+ _progress(85, f"{contactCount} Kontakte gespeichert.")
except Exception as e:
logger.error(f"Import contacts failed: {e}", exc_info=True)
summary["errors"].append(f"Contacts: {e}")
- # 4) Compute account balances from journal lines
+ # ---- Phase 4: Compute account balances ----
+ # Progress budget: 90-95 %. Pure DB aggregation, no external calls.
try:
- self._clearTable(TrusteeDataAccountBalance, featureInstanceId)
- balanceCount = self._computeBalances(featureInstanceId, mandateId)
+ _progress(90, "Berechne Kontensaldi...")
+ balanceCount = await asyncio.to_thread(
+ self._persistBalances, featureInstanceId, mandateId,
+ TrusteeDataJournalEntry, TrusteeDataJournalLine, TrusteeDataAccountBalance,
+ )
summary["accountBalances"] = balanceCount
+ _progress(95, f"{balanceCount} Saldi berechnet.")
except Exception as e:
- logger.error(f"Compute balances failed: {e}")
+ logger.error(f"Compute balances failed: {e}", exc_info=True)
summary["errors"].append(f"Balances: {e}")
cfgId = cfgRecord.get("id")
@@ -255,6 +296,210 @@ class AccountingDataSync:
)
return summary
+ # ===== Sync persistence helpers (run inside asyncio.to_thread) =====
+
+ def _persistAccounts(self, charts: list, scope: Dict[str, Any],
+ featureInstanceId: str, modelAccount: Type) -> int:
+ """Bulk-replace chart of accounts for one feature instance."""
+ t0 = time.time()
+ self._bulkClear(modelAccount, featureInstanceId)
+ rows = [{
+ "accountNumber": acc.accountNumber,
+ "label": acc.label,
+ "accountType": acc.accountType or "",
+ "currency": "CHF",
+ "isActive": True,
+ **scope,
+ } for acc in charts]
+ n = self._bulkCreate(modelAccount, rows)
+ logger.info(f"Persisted {n} accounts for {featureInstanceId} in {time.time() - t0:.1f}s")
+ return n
+
+ def _persistJournal(self, rawEntries: list, scope: Dict[str, Any],
+ featureInstanceId: str, modelEntry: Type, modelLine: Type) -> tuple:
+ """Bulk-replace journal entries + journal lines for one feature instance.
+
+ We pre-build the line rows in memory keyed by the freshly minted entryId
+ so a single ``execute_values`` call can persist all of them.
+ """
+ import uuid as _uuid
+ t0 = time.time()
+ self._bulkClear(modelEntry, featureInstanceId)
+ self._bulkClear(modelLine, featureInstanceId)
+
+ entryRows: List[Dict[str, Any]] = []
+ lineRows: List[Dict[str, Any]] = []
+ for raw in rawEntries:
+ entryId = str(_uuid.uuid4())
+ entryRows.append({
+ "id": entryId,
+ "externalId": raw.get("externalId"),
+ "bookingDate": raw.get("bookingDate"),
+ "reference": raw.get("reference"),
+ "description": raw.get("description", ""),
+ "currency": raw.get("currency", "CHF"),
+ "totalAmount": float(raw.get("totalAmount", 0)),
+ **scope,
+ })
+ for line in (raw.get("lines") or []):
+ lineRows.append({
+ "journalEntryId": entryId,
+ "accountNumber": line.get("accountNumber", ""),
+ "debitAmount": float(line.get("debitAmount", 0)),
+ "creditAmount": float(line.get("creditAmount", 0)),
+ "currency": line.get("currency", "CHF"),
+ "taxCode": line.get("taxCode"),
+ "costCenter": line.get("costCenter"),
+ "description": line.get("description", ""),
+ **scope,
+ })
+ if len(entryRows) % _HEARTBEAT_EVERY == 0:
+ logger.info(
+ f"Journal build progress: {len(entryRows)}/{len(rawEntries)} entries, "
+ f"{len(lineRows)} lines so far"
+ )
+
+ entriesCount = self._bulkCreate(modelEntry, entryRows)
+ linesCount = self._bulkCreate(modelLine, lineRows)
+ logger.info(
+ f"Persisted {entriesCount} entries + {linesCount} lines for "
+ f"{featureInstanceId} in {time.time() - t0:.1f}s"
+ )
+ return entriesCount, linesCount
+
+ def _persistContacts(self, customers: list, vendors: list, scope: Dict[str, Any],
+ featureInstanceId: str, modelContact: Type) -> int:
+ """Bulk-replace contacts (customers + vendors) for one feature instance."""
+ t0 = time.time()
+ self._bulkClear(modelContact, featureInstanceId)
+ rows = [self._mapContact(c, "customer", scope) for c in customers]
+ rows += [self._mapContact(v, "vendor", scope) for v in vendors]
+ n = self._bulkCreate(modelContact, rows)
+ logger.info(f"Persisted {n} contacts for {featureInstanceId} in {time.time() - t0:.1f}s")
+ return n
+
+ def _persistBalances(self, featureInstanceId: str, mandateId: str,
+ modelEntry: Type, modelLine: Type, modelBalance: Type) -> int:
+ """Re-aggregate journal lines into monthly + annual balances."""
+ t0 = time.time()
+ self._bulkClear(modelBalance, featureInstanceId)
+
+ entries = self._if.db.getRecordset(
+ modelEntry, recordFilter={"featureInstanceId": featureInstanceId},
+ ) or []
+ entryDates: Dict[str, str] = {}
+ for e in entries:
+ eid = e.get("id") if isinstance(e, dict) else getattr(e, "id", None)
+ bdate = e.get("bookingDate") if isinstance(e, dict) else getattr(e, "bookingDate", None)
+ if eid and bdate:
+ entryDates[eid] = bdate
+
+ lines = self._if.db.getRecordset(
+ modelLine, recordFilter={"featureInstanceId": featureInstanceId},
+ ) or []
+
+ buckets: Dict[tuple, Dict[str, float]] = defaultdict(lambda: {"debit": 0.0, "credit": 0.0})
+ for ln in lines:
+ if isinstance(ln, dict):
+ jeid = ln.get("journalEntryId", "")
+ accNo = ln.get("accountNumber", "")
+ debit = float(ln.get("debitAmount", 0))
+ credit = float(ln.get("creditAmount", 0))
+ else:
+ jeid = getattr(ln, "journalEntryId", "")
+ accNo = getattr(ln, "accountNumber", "")
+ debit = float(getattr(ln, "debitAmount", 0))
+ credit = float(getattr(ln, "creditAmount", 0))
+
+ bdate = entryDates.get(jeid, "")
+ if not accNo or not bdate:
+ continue
+ parts = bdate.split("-")
+ if len(parts) < 2:
+ continue
+ try:
+ year = int(parts[0])
+ month = int(parts[1])
+ except ValueError:
+ continue
+
+ buckets[(accNo, year, month)]["debit"] += debit
+ buckets[(accNo, year, month)]["credit"] += credit
+ buckets[(accNo, year, 0)]["debit"] += debit
+ buckets[(accNo, year, 0)]["credit"] += credit
+
+ scope = {"featureInstanceId": featureInstanceId, "mandateId": mandateId}
+ rows = [{
+ "accountNumber": accNo,
+ "periodYear": year,
+ "periodMonth": month,
+ "openingBalance": 0.0,
+ "debitTotal": round(totals["debit"], 2),
+ "creditTotal": round(totals["credit"], 2),
+ "closingBalance": round(totals["debit"] - totals["credit"], 2),
+ "currency": "CHF",
+ **scope,
+ } for (accNo, year, month), totals in buckets.items()]
+ n = self._bulkCreate(modelBalance, rows)
+ logger.info(
+ f"Persisted {n} balances for {featureInstanceId} in {time.time() - t0:.1f}s"
+ )
+ return n
+
+ # ===== Low-level bulk helpers =====
+
+ def _bulkClear(self, model: Type, featureInstanceId: str) -> int:
+ """Delete every row for this feature instance in a single statement."""
+ try:
+ return self._if.db.recordDeleteWhere(
+ model, {"featureInstanceId": featureInstanceId}
+ )
+ except AttributeError:
+ # Backwards-compatible path if the connector hasn't been upgraded
+ # yet. Logs a warning so we notice in dev/CI.
+ logger.warning(
+ "DatabaseConnector.recordDeleteWhere missing — falling back to slow per-row delete for %s",
+ model.__name__,
+ )
+ records = self._if.db.getRecordset(
+ model, recordFilter={"featureInstanceId": featureInstanceId}
+ ) or []
+ count = 0
+ for r in records:
+ rid = r.get("id") if isinstance(r, dict) else getattr(r, "id", None)
+ if rid:
+ try:
+ self._if.db.recordDelete(model, rid)
+ count += 1
+ except Exception:
+ pass
+ return count
+
+ def _bulkCreate(self, model: Type, rows: List[Dict[str, Any]]) -> int:
+ """Insert all rows in a single transaction. Falls back to per-row
+ insert only if the connector lacks ``recordCreateBulk`` (legacy)."""
+ if not rows:
+ return 0
+ try:
+ return self._if.db.recordCreateBulk(model, rows)
+ except AttributeError:
+ logger.warning(
+ "DatabaseConnector.recordCreateBulk missing — falling back to slow per-row insert for %s",
+ model.__name__,
+ )
+ n = 0
+ for i, r in enumerate(rows, start=1):
+ try:
+ self._if.db.recordCreate(model, r)
+ n += 1
+ except Exception as ex:
+ logger.warning(f"Per-row insert failed at {i}/{len(rows)}: {ex}")
+ if i % _HEARTBEAT_EVERY == 0:
+ logger.info(f"Per-row insert progress: {i}/{len(rows)} rows ({model.__name__})")
+ return n
+
+ # ===== Field helpers =====
+
@staticmethod
def _safeStr(val: Any) -> str:
"""Convert a value to a safe string for DB storage, collapsing nested dicts/lists."""
@@ -286,84 +531,3 @@ class AccountingDataSync:
"vatNumber": s(raw.get("vat_identifier") or raw.get("vatNumber") or ""),
**scope,
}
-
- def _clearTable(self, model, featureInstanceId: str):
- """Delete all records for this feature instance from a TrusteeData* table."""
- records = self._if.db.getRecordset(model, recordFilter={"featureInstanceId": featureInstanceId})
- for r in (records or []):
- rid = r.get("id") if isinstance(r, dict) else getattr(r, "id", None)
- if rid:
- try:
- self._if.db.recordDelete(model, rid)
- except Exception:
- pass
-
- def _computeBalances(self, featureInstanceId: str, mandateId: str) -> int:
- """Aggregate journal lines into monthly + annual account balances."""
- from modules.features.trustee.datamodelFeatureTrustee import (
- TrusteeDataJournalEntry,
- TrusteeDataJournalLine,
- TrusteeDataAccountBalance,
- )
-
- entries = self._if.db.getRecordset(
- TrusteeDataJournalEntry,
- recordFilter={"featureInstanceId": featureInstanceId},
- ) or []
- entryDates = {}
- for e in entries:
- eid = e.get("id") if isinstance(e, dict) else getattr(e, "id", None)
- bdate = e.get("bookingDate") if isinstance(e, dict) else getattr(e, "bookingDate", None)
- if eid and bdate:
- entryDates[eid] = bdate
-
- lines = self._if.db.getRecordset(
- TrusteeDataJournalLine,
- recordFilter={"featureInstanceId": featureInstanceId},
- ) or []
-
- # key: (accountNumber, year, month)
- buckets: Dict[tuple, Dict[str, float]] = defaultdict(lambda: {"debit": 0.0, "credit": 0.0})
- for ln in lines:
- if isinstance(ln, dict):
- jeid = ln.get("journalEntryId", "")
- accNo = ln.get("accountNumber", "")
- debit = float(ln.get("debitAmount", 0))
- credit = float(ln.get("creditAmount", 0))
- else:
- jeid = getattr(ln, "journalEntryId", "")
- accNo = getattr(ln, "accountNumber", "")
- debit = float(getattr(ln, "debitAmount", 0))
- credit = float(getattr(ln, "creditAmount", 0))
-
- bdate = entryDates.get(jeid, "")
- if not accNo or not bdate:
- continue
- parts = bdate.split("-")
- if len(parts) < 2:
- continue
- year = int(parts[0])
- month = int(parts[1])
-
- buckets[(accNo, year, month)]["debit"] += debit
- buckets[(accNo, year, month)]["credit"] += credit
- buckets[(accNo, year, 0)]["debit"] += debit
- buckets[(accNo, year, 0)]["credit"] += credit
-
- count = 0
- scope = {"featureInstanceId": featureInstanceId, "mandateId": mandateId}
- for (accNo, year, month), totals in buckets.items():
- closing = totals["debit"] - totals["credit"]
- self._if.db.recordCreate(TrusteeDataAccountBalance, {
- "accountNumber": accNo,
- "periodYear": year,
- "periodMonth": month,
- "openingBalance": 0.0,
- "debitTotal": round(totals["debit"], 2),
- "creditTotal": round(totals["credit"], 2),
- "closingBalance": round(closing, 2),
- "currency": "CHF",
- **scope,
- })
- count += 1
- return count
diff --git a/modules/features/trustee/routeFeatureTrustee.py b/modules/features/trustee/routeFeatureTrustee.py
index 573d8420..3a6bfab0 100644
--- a/modules/features/trustee/routeFeatureTrustee.py
+++ b/modules/features/trustee/routeFeatureTrustee.py
@@ -1682,14 +1682,15 @@ async def _trusteeAccountingSyncJobHandler(job: Dict[str, Any], progressCb) -> D
progressCb(5, "Initialisiere Import...")
interface = getInterface(rootUser, mandateId=mandateId, featureInstanceId=instanceId)
sync = AccountingDataSync(interface)
- progressCb(10, "Lese Daten vom Buchhaltungssystem...")
+ progressCb(10, "Verbinde mit Buchhaltungssystem...")
result = await sync.importData(
featureInstanceId=instanceId,
mandateId=mandateId,
dateFrom=payload.get("dateFrom"),
dateTo=payload.get("dateTo"),
+ progressCb=progressCb,
)
- progressCb(100, "Import abgeschlossen")
+ progressCb(100, "Import abgeschlossen.")
return result
@@ -1902,6 +1903,20 @@ def _validateInstanceAdmin(instanceId: str, context: RequestContext) -> str:
return mandateId
+def _serializeRoleForApi(role) -> Dict[str, Any]:
+ """Dump a Role and resolve the multilingual ``description`` to a plain string.
+
+ The Role.description field is a ``TextMultilingual`` (``{xx, de, en, ...}``).
+ The frontend expects a plain string, so we resolve via the request language
+ here (same pattern as ``getQuickActions``). Without this the React tree
+ crashes with "Objects are not valid as a React child".
+ """
+ from modules.shared.i18nRegistry import resolveText
+ payload = role.model_dump()
+ payload["description"] = resolveText(payload.get("description"))
+ return payload
+
+
@router.get("/{instanceId}/instance-roles", response_model=PaginatedResponse)
@limiter.limit("30/minute")
def get_instance_roles(
@@ -1921,7 +1936,7 @@ def get_instance_roles(
roles = rootInterface.getRolesByFeatureCode("trustee", featureInstanceId=instanceId)
return PaginatedResponse(
- items=[r.model_dump() for r in roles],
+ items=[_serializeRoleForApi(r) for r in roles],
pagination=None
)
@@ -1947,7 +1962,7 @@ def get_instance_role(
if str(role.featureInstanceId) != instanceId:
raise HTTPException(status_code=404, detail=f"Role {roleId} not found in this instance")
- return role.model_dump()
+ return _serializeRoleForApi(role)
@router.get("/{instanceId}/instance-roles/{roleId}/rules", response_model=PaginatedResponse)
diff --git a/modules/routes/routeI18n.py b/modules/routes/routeI18n.py
index 7f230324..91fbd9fe 100644
--- a/modules/routes/routeI18n.py
+++ b/modules/routes/routeI18n.py
@@ -42,7 +42,11 @@ from modules.datamodels.datamodelNotification import NotificationType
from modules.interfaces.interfaceDbManagement import getInterface as getMgmtInterface
from modules.routes.routeNotifications import _createNotification
from modules.shared.configuration import APP_CONFIG
-from modules.shared.i18nRegistry import _loadCache as _reloadI18nCache, apiRouteContext
+from modules.shared.i18nRegistry import (
+ _enforceSourcePlaceholders,
+ _loadCache as _reloadI18nCache,
+ apiRouteContext,
+)
from modules.shared.timeUtils import getUtcTimestamp
routeApiMsg = apiRouteContext("routeI18n")
@@ -248,7 +252,12 @@ async def _translateBatch(
f"2. If the source is already in the target language, keep it (do not re-translate, "
f"do not paraphrase).\n"
f"3. KEEP the exact JSON keys from the input — do NOT translate or modify the keys.\n"
- f"4. KEEP placeholders like {{variable}}, {{count}}, %s, %(name)s exactly as they are.\n"
+ f"4. PLACEHOLDERS ARE SACRED. Tokens of the form {{name}}, {{count}}, "
+ f"{{konten}}, {{anyWord}}, %s, %(name)s, %d MUST be copied character-for-"
+ f"character into the translation, EVEN IF the name inside the curly braces "
+ f"looks like a German or English word. Never translate, rename, reorder, "
+ f"add, or remove placeholders. Example: '{{konten}} Konten' translated to "
+ f"English MUST stay '{{konten}} accounts' — NEVER '{{accounts}} accounts'.\n"
f"5. Preserve leading/trailing whitespace, punctuation and capitalisation pattern.\n"
f"6. Answer ONLY with a JSON object mapping source-key -> translated value in "
f"{targetLanguageLabel}. No markdown fences, no comments, no explanations.\n"
@@ -320,10 +329,31 @@ async def _translateBatch(
if batchIdx < totalBatches - 1:
await asyncio.sleep(_TRANSLATE_BATCH_PAUSE_S)
+ _enforcePlaceholdersOnBatch(result)
_matchCapitalization(keysToTranslate, result)
return result
+def _enforcePlaceholdersOnBatch(translations: Dict[str, str]) -> None:
+ """Ensure every translated value preserves the source key's placeholders.
+
+ See ``_enforceSourcePlaceholders`` for the detailed strategy. Mutates
+ ``translations`` in place; logs a warning per repaired key.
+ """
+ repaired = 0
+ for sourceKey, translatedValue in list(translations.items()):
+ fixed, changed = _enforceSourcePlaceholders(sourceKey, translatedValue)
+ if changed:
+ translations[sourceKey] = fixed
+ repaired += 1
+ logger.warning(
+ "i18n placeholder mismatch repaired: %r -> %r",
+ translatedValue, fixed,
+ )
+ if repaired:
+ logger.info("i18n batch: repaired placeholders in %d translations", repaired)
+
+
def _matchCapitalization(originals: Dict[str, str], translations: Dict[str, str]) -> None:
"""Ensure translations preserve the capitalisation pattern of the original key."""
for key, translated in translations.items():
@@ -857,6 +887,87 @@ async def sync_xx_master(
return result
+def _repairLanguageSetPlaceholders(db, code: str, userId: Optional[str]) -> dict:
+ """Persistently fix placeholder mismatches in one language set.
+
+ Walks every entry, runs ``_enforceSourcePlaceholders(key, value)`` and
+ persists any changed values back to the row. Only saves if at least one
+ entry was modified.
+ """
+ rows = db.getRecordset(UiLanguageSet, recordFilter={"id": code})
+ if not rows:
+ raise HTTPException(status_code=404, detail=routeApiMsg("Sprachset nicht gefunden"))
+ row = dict(rows[0])
+ entries = _rowEntries(row)
+ repaired: List[Dict[str, str]] = []
+ for entry in entries:
+ key = entry.get("key", "")
+ val = entry.get("value", "")
+ fixed, changed = _enforceSourcePlaceholders(key, val)
+ if changed:
+ repaired.append({"key": key, "before": val, "after": fixed})
+ entry["value"] = fixed
+
+ if repaired:
+ row["entries"] = entries
+ if "keys" in row:
+ del row["keys"]
+ row["sysModifiedAt"] = getUtcTimestamp()
+ row["sysModifiedBy"] = userId
+ db.recordModify(UiLanguageSet, code, row)
+
+ return {
+ "code": code,
+ "checked": len(entries),
+ "repaired": len(repaired),
+ "examples": repaired[:10],
+ }
+
+
+@router.post("/sets/{code}/repair-placeholders")
+async def repair_language_set_placeholders(
+ code: str,
+ adminUser: User = Depends(requireSysAdmin),
+):
+ """SysAdmin: persistently restore placeholder tokens in one language set.
+
+ Use this once after the AI translator turned ``{konten}`` into
+ ``{accounts}`` (or similar). Compares each entry's value against its
+ German source key; where the placeholder *names* differ but the *count*
+ matches, restores the source names positionally. Safe and idempotent.
+ """
+ c = code.strip().lower()
+ if c == "xx":
+ raise HTTPException(status_code=400, detail=routeApiMsg("Das xx-Set hat keine Übersetzungen."))
+ db = getMgmtInterface(adminUser, mandateId=None).db
+ result = _repairLanguageSetPlaceholders(db, c, str(adminUser.id))
+ await _reloadI18nCache()
+ return result
+
+
+@router.post("/sets/repair-placeholders-all")
+async def repair_all_language_sets_placeholders(
+ adminUser: User = Depends(requireSysAdmin),
+):
+ """SysAdmin: persistently restore placeholder tokens in ALL language sets."""
+ db = getMgmtInterface(adminUser, mandateId=None).db
+ rows = db.getRecordset(UiLanguageSet)
+ summary: List[dict] = []
+ totalRepaired = 0
+ for row in rows:
+ code = row.get("id", "")
+ if not code or code == "xx":
+ continue
+ try:
+ res = _repairLanguageSetPlaceholders(db, code, str(adminUser.id))
+ summary.append(res)
+ totalRepaired += res["repaired"]
+ except HTTPException:
+ continue
+ await _reloadI18nCache()
+ return {"languages": len(summary), "totalRepaired": totalRepaired, "details": summary}
+
+
@router.get("/sets/{code}/sync-diff")
async def get_language_sync_diff(
code: str,
diff --git a/modules/shared/i18nRegistry.py b/modules/shared/i18nRegistry.py
index f681b19f..4b942bd2 100644
--- a/modules/shared/i18nRegistry.py
+++ b/modules/shared/i18nRegistry.py
@@ -12,15 +12,59 @@ At runtime, t() returns the cached translation for the current request language.
from __future__ import annotations
import logging
+import re
from contextvars import ContextVar
from dataclasses import dataclass, field as dataclass_field
-from typing import Any, Dict, List, Optional, Type
+from typing import Any, Dict, List, Optional, Tuple, Type
from pydantic import BaseModel
logger = logging.getLogger(__name__)
+# Matches {placeholderName} tokens used by t(...) param substitution in the
+# frontend (LanguageContext._applyParams) and the gateway. Allows ASCII
+# identifiers and digits, no spaces.
+_PLACEHOLDER_PATTERN = re.compile(r"\{[A-Za-z_][A-Za-z0-9_]*\}")
+
+
+def _enforceSourcePlaceholders(sourceKey: str, translatedValue: str) -> Tuple[str, bool]:
+ """Repair a translated value so its placeholder tokens match the source key.
+
+ Background: AI translators occasionally translate the *names* of
+ placeholders even when instructed not to (e.g. ``{konten}`` -> ``{accounts}``).
+ The frontend then cannot substitute params and the user sees raw
+ ``{accounts}`` in the UI.
+
+ Strategy (positional, conservative):
+ - if the source has no placeholders -> nothing to do
+ - if source and translation have the same set of tokens -> nothing to do
+ - if both have the *same number* of tokens but different names -> swap
+ each translation token with the source token at the same position
+ - if counts differ -> leave the translation untouched (too risky to
+ guess; surfaced as a logger.warning by the caller if desired)
+
+ Returns ``(repairedValue, wasChanged)``.
+ """
+ if not sourceKey or not translatedValue:
+ return translatedValue, False
+ sourceTokens = _PLACEHOLDER_PATTERN.findall(sourceKey)
+ if not sourceTokens:
+ return translatedValue, False
+ valueTokens = _PLACEHOLDER_PATTERN.findall(translatedValue)
+ if not valueTokens:
+ return translatedValue, False
+ if sourceTokens == valueTokens:
+ return translatedValue, False
+ if len(sourceTokens) != len(valueTokens):
+ return translatedValue, False
+ parts = _PLACEHOLDER_PATTERN.split(translatedValue)
+ rebuilt = parts[0]
+ for idx, srcTok in enumerate(sourceTokens):
+ rebuilt += srcTok + parts[idx + 1]
+ return rebuilt, True
+
+
def _extractRegistrySourceText(obj: Any) -> str:
"""Resolve a str or multilingual dict to one canonical registry key string."""
if isinstance(obj, str):
@@ -492,6 +536,48 @@ def _registerNodeLabels():
logger.info("i18n node labels: %d new keys (node.*/port.* context)", added)
+def _registerAccountingConnectorLabels():
+ """Register all accounting connector configField labels (label) at boot time.
+
+ Connector ``getRequiredConfigFields()`` is normally invoked lazily at first
+ request, which is too late for the boot-sync. We discover the connectors
+ here so their ``t()`` calls register the keys before they are written to the
+ ``xx`` set and AI-translated for every active language set.
+ """
+ added = 0
+ try:
+ from modules.features.trustee.accounting.accountingRegistry import _getAccountingRegistry
+ except ImportError:
+ logger.debug("i18n accounting connectors: registry not importable")
+ return
+
+ try:
+ registry = _getAccountingRegistry()
+ except Exception as e:
+ logger.warning("i18n accounting connectors: registry init failed: %s", e)
+ return
+
+ for connectorType, connector in (registry._connectors or {}).items():
+ try:
+ for field in connector.getRequiredConfigFields():
+ key = getattr(field, "label", "") or ""
+ if not isinstance(key, str) or not key:
+ continue
+ if key not in _REGISTRY:
+ _REGISTRY[key] = _I18nRegistryEntry(
+ context=f"connector.accounting.{connectorType}",
+ value="",
+ )
+ added += 1
+ except Exception as e:
+ logger.warning(
+ "i18n accounting connector %s: failed to read fields: %s",
+ connectorType, e,
+ )
+
+ logger.info("i18n accounting connector labels: %d new keys", added)
+
+
def _registerDatamodelOptionLabels():
"""Register all frontend_options labels from Pydantic datamodels and subscription plans."""
added = 0
@@ -568,6 +654,7 @@ async def _syncRegistryToDb():
_registerServiceCenterLabels()
_registerNodeLabels()
_registerDatamodelOptionLabels()
+ _registerAccountingConnectorLabels()
if not _REGISTRY:
logger.info("i18n registry: no keys to sync (empty registry)")
@@ -668,6 +755,13 @@ async def _syncRegistryToDb():
async def _loadCache():
"""Boot hook: load all UiLanguageSets into the in-memory cache.
+ Also persistently repairs placeholder mismatches in the DB:
+ if an entry's value has placeholder *names* that differ from the
+ source key (typical AI translation mishap, e.g. ``{konten}`` ->
+ ``{accounts}``), the source names are restored positionally and the
+ row is written back to the DB. Idempotent and safe -- only mutates
+ when the placeholder count matches and the names actually differ.
+
After this, t() lookups are O(1) dict access with no DB calls.
"""
from modules.datamodels.datamodelUiLanguage import UiLanguageSet
@@ -686,6 +780,8 @@ async def _loadCache():
rows = db.getRecordset(UiLanguageSet)
_CACHE.clear()
+ repairedTotal = 0
+ persistedLanguages = 0
for row in rows:
code = row.get("id", "")
if code == "xx":
@@ -694,13 +790,49 @@ async def _loadCache():
if not isinstance(entries, list):
continue
langDict: Dict[str, str] = {}
- for e in entries:
- key = e.get("key", "")
- val = e.get("value", "")
- if key and val:
- langDict[key] = val
+ repairedInLang = 0
+ # Walk a mutable copy so we can write the corrected entries back to
+ # the row without re-reading from the DB.
+ for entry in entries:
+ key = entry.get("key", "")
+ val = entry.get("value", "")
+ if not key or not val:
+ continue
+ fixed, changed = _enforceSourcePlaceholders(key, val)
+ if changed:
+ entry["value"] = fixed
+ repairedInLang += 1
+ langDict[key] = fixed
if langDict:
_CACHE[code] = langDict
+ if repairedInLang:
+ repairedTotal += repairedInLang
+ try:
+ rowToSave = dict(row)
+ rowToSave["entries"] = entries
+ if "keys" in rowToSave:
+ del rowToSave["keys"]
+ db.recordModify(UiLanguageSet, code, rowToSave)
+ persistedLanguages += 1
+ logger.info(
+ "i18n boot repair: fixed and persisted %d placeholder mismatches in language '%s'",
+ repairedInLang, code,
+ )
+ except Exception as ex:
+ # Persistence is best-effort -- the in-memory cache is
+ # already correct (langDict above contains the fixed
+ # values), so the UI works either way. Log and move on.
+ logger.warning(
+ "i18n boot repair: in-memory fixed %d entries in '%s' but DB persist failed: %s",
+ repairedInLang, code, ex,
+ )
- logger.info("i18n cache loaded: %d languages, %d total keys",
- len(_CACHE), sum(len(v) for v in _CACHE.values()))
+ logger.info(
+ "i18n cache loaded: %d languages, %d total keys%s",
+ len(_CACHE), sum(len(v) for v in _CACHE.values()),
+ (
+ f" (boot-repaired {repairedTotal} placeholders, "
+ f"persisted to {persistedLanguages} language sets)"
+ if repairedTotal else ""
+ ),
+ )