Harden trustee position ingestion and auto-clean corrupt records.

Normalize AI-derived date/time and numeric fields before writes, tighten extraction prompts for unix timestamps, and prevent /positions crashes by normalizing or deleting irreparably invalid legacy TrusteePosition records during reads.

Made-with: Cursor
This commit is contained in:
patrick-motsch 2026-02-27 23:30:37 +01:00
parent 80e8197d96
commit a527806436
2 changed files with 183 additions and 13 deletions

View file

@ -8,7 +8,9 @@ Manages trustee organisations, roles, access, contracts, documents, and position
import logging import logging
import math import math
import uuid import uuid
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional, Union from typing import Dict, Any, List, Optional, Union
from pydantic import ValidationError
from modules.connectors.connectorDbPostgre import DatabaseConnector from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG from modules.shared.configuration import APP_CONFIG
@ -32,6 +34,118 @@ logger = logging.getLogger(__name__)
_trusteeInterfaces = {} _trusteeInterfaces = {}
def _toSafeFloat(value: Any, defaultValue: float = 0.0) -> float:
    """Convert mixed numeric inputs (str/number) to a finite float safely.

    Besides plain numbers, numeric text in Swiss/European or Anglo notation
    is accepted, e.g. ``"1'234.50"``, ``"12,5"``, ``"1.234,56"`` and
    ``"1,234.56"``.

    Args:
        value: Number, numeric text, ``None``, or any object whose ``str()``
            form is numeric.
        defaultValue: Returned when ``value`` is empty, cannot be parsed, or
            is not finite (NaN / infinity).

    Returns:
        The parsed float, or ``defaultValue``.
    """
    if value is None or value == "":
        return defaultValue
    try:
        if isinstance(value, (int, float)):
            numericValue = float(value)
        else:
            textValue = str(value).strip()
            # Remove digit-grouping characters: straight/typographic
            # apostrophes and regular/non-breaking/narrow spaces.
            for groupChar in ("'", "\u2019", " ", "\u00a0", "\u202f"):
                textValue = textValue.replace(groupChar, "")
            lastComma = textValue.rfind(",")
            lastDot = textValue.rfind(".")
            if lastComma != -1 and lastDot != -1:
                # Both separators present: the right-most one is the decimal
                # mark, the other one is a thousands separator.
                if lastComma > lastDot:
                    textValue = textValue.replace(".", "").replace(",", ".")
                else:
                    textValue = textValue.replace(",", "")
            elif lastComma != -1:
                # Comma only: treat it as the decimal mark ("12,5").
                textValue = textValue.replace(",", ".")
            numericValue = float(textValue)
    except (TypeError, ValueError, OverflowError):
        return defaultValue
    # float() happily parses "nan"/"inf"; those are never valid amounts.
    return numericValue if math.isfinite(numericValue) else defaultValue
def _normaliseIsoDate(value: Any) -> Optional[str]:
    """Normalise date-like input to ISO date format YYYY-MM-DD.

    Args:
        value: A unix timestamp in seconds (int/float, interpreted in UTC),
            a date string in one of the explicit formats below, or an ISO
            datetime string (a trailing ``Z`` is accepted).

    Returns:
        The date as ``YYYY-MM-DD``, or ``None`` when the input is empty,
        a boolean, out of range, or not recognisable as a date.
    """
    if value is None or value == "":
        return None
    # bool is a subclass of int; True/False are flags, not epoch seconds.
    if isinstance(value, bool):
        return None
    if isinstance(value, (int, float)):
        try:
            return datetime.fromtimestamp(float(value), tz=timezone.utc).date().isoformat()
        except (ValueError, OverflowError, OSError):
            # NaN, infinity or a timestamp outside the platform's range.
            return None

    textValue = str(value).strip()
    if not textValue:
        return None

    # Try common explicit formats first (incl. Swiss/European notation).
    for formatValue in (
        "%Y-%m-%d",
        "%d.%m.%Y",
        "%d/%m/%Y",
        "%d-%m-%Y",
        "%Y/%m/%d",
        "%Y.%m.%d",
    ):
        try:
            return datetime.strptime(textValue, formatValue).date().isoformat()
        except ValueError:
            continue

    # Try ISO datetime variants; "Z" is rewritten because older Python
    # versions of fromisoformat() do not accept it.
    try:
        return datetime.fromisoformat(textValue.replace("Z", "+00:00")).date().isoformat()
    except ValueError:
        return None
def _isoDateToUtcTimestamp(isoDate: Optional[str]) -> Optional[float]:
    """Return midnight-UTC unix seconds for a YYYY-MM-DD string, else None."""
    if not isoDate:
        return None
    try:
        parsedDate = datetime.strptime(isoDate, "%Y-%m-%d").replace(tzinfo=timezone.utc)
        return float(parsedDate.timestamp())
    except (TypeError, ValueError, OverflowError):
        return None


def _normaliseTimestamp(value: Any, fallbackIsoDate: Optional[str] = None) -> Optional[float]:
    """Normalise timestamp input to unix seconds (float).

    Args:
        value: Unix seconds as number or numeric text, or a date-like string
            which is normalised to midnight UTC of that date.
        fallbackIsoDate: Optional ``YYYY-MM-DD`` date used (at midnight UTC)
            whenever ``value`` is empty, a boolean, non-finite, or cannot be
            parsed.

    Returns:
        Unix seconds as float, or ``None`` when neither ``value`` nor the
        fallback yields a usable timestamp.
    """
    parsedTimestamp: Optional[float] = None

    if value is None or value == "" or isinstance(value, bool):
        # Nothing usable supplied; bools are flags, not epoch seconds.
        parsedTimestamp = None
    elif isinstance(value, (int, float)):
        try:
            numericValue = float(value)
        except OverflowError:
            numericValue = float("nan")
        # NaN / infinity must never be stored as a timestamp.
        if math.isfinite(numericValue):
            parsedTimestamp = numericValue
    else:
        textValue = str(value).strip()
        if textValue:
            numericTimestamp = _toSafeFloat(textValue, defaultValue=float("nan"))
            if math.isfinite(numericTimestamp):
                parsedTimestamp = float(numericTimestamp)
            else:
                # Accept date-only input and normalise to midnight UTC timestamp.
                parsedTimestamp = _isoDateToUtcTimestamp(_normaliseIsoDate(textValue))

    if parsedTimestamp is None:
        # Apply the fallback for every unusable input, not only None / "".
        parsedTimestamp = _isoDateToUtcTimestamp(fallbackIsoDate)
    return parsedTimestamp
def _sanitisePositionPayload(data: Dict[str, Any]) -> Dict[str, Any]:
    """Failsafe normalisation for TrusteePosition payloads before DB writes.

    Works on a shallow copy; the caller's dict is not modified. The keys
    ``valuta``, ``transactionDateTime``, ``bookingAmount``, ``originalAmount``,
    ``vatPercentage``, ``vatAmount``, ``bookingCurrency`` and
    ``originalCurrency`` are always present in the result, all other keys are
    passed through untouched.

    Args:
        data: Raw position payload (may be ``None`` or empty).

    Returns:
        A new dict with normalised date, timestamp, amount and currency fields.
    """
    safeData = dict(data or {})

    # valuta becomes YYYY-MM-DD (or None); it also serves as the fallback
    # date for a missing transactionDateTime.
    isoValuta = _normaliseIsoDate(safeData.get("valuta"))
    safeData["valuta"] = isoValuta
    safeData["transactionDateTime"] = _normaliseTimestamp(
        safeData.get("transactionDateTime"),
        fallbackIsoDate=isoValuta,
    )

    bookingAmount = _toSafeFloat(safeData.get("bookingAmount"), defaultValue=0.0)
    safeData["bookingAmount"] = bookingAmount
    # Without an explicit original amount, the booking amount is reused.
    safeData["originalAmount"] = _toSafeFloat(
        safeData.get("originalAmount"),
        defaultValue=bookingAmount,
    )
    safeData["vatPercentage"] = _toSafeFloat(safeData.get("vatPercentage"), defaultValue=0.0)
    safeData["vatAmount"] = _toSafeFloat(safeData.get("vatAmount"), defaultValue=0.0)

    # Trim before upper-casing so " chf" becomes "CHF" and whitespace-only
    # input falls back to the default instead of being stored verbatim.
    bookingCurrency = str(safeData.get("bookingCurrency") or "").strip().upper() or "CHF"
    originalCurrency = str(safeData.get("originalCurrency") or "").strip().upper() or bookingCurrency
    safeData["bookingCurrency"] = bookingCurrency
    safeData["originalCurrency"] = originalCurrency

    return safeData
def getInterface(currentUser: User, mandateId: Optional[Union[str, uuid.UUID]] = None, featureInstanceId: Optional[str] = None) -> "TrusteeObjects": def getInterface(currentUser: User, mandateId: Optional[Union[str, uuid.UUID]] = None, featureInstanceId: Optional[str] = None) -> "TrusteeObjects":
"""Get or create a TrusteeObjects instance for the given user context. """Get or create a TrusteeObjects instance for the given user context.
@ -1133,6 +1247,37 @@ class TrusteeObjects:
# ===== Position CRUD ===== # ===== Position CRUD =====
def _toTrusteePositionOrDelete(self, rawRecord: Dict[str, Any], deleteCorrupt: bool = True) -> Optional[TrusteePosition]:
    """Turn a raw DB record into a TrusteePosition, repairing or removing bad data.

    The record is stripped of underscore-prefixed keys (except ``_createdAt``),
    run through ``_sanitisePositionPayload`` and validated. If sanitising
    changed anything, the normalised payload is written back (best effort).
    If validation fails, the record is logged and, when ``deleteCorrupt`` is
    set and an id is known, deleted from the database.

    Args:
        rawRecord: Record dict as returned by the database layer.
        deleteCorrupt: Delete the stored record when it cannot be validated.

    Returns:
        The validated model, or ``None`` for empty or invalid records.
    """
    cleanRecord: Dict[str, Any] = {}
    for fieldName, fieldValue in (rawRecord or {}).items():
        if not fieldName.startswith("_") or fieldName == "_createdAt":
            cleanRecord[fieldName] = fieldValue
    if not cleanRecord:
        return None

    normalisedRecord = _sanitisePositionPayload(cleanRecord)
    recordId = normalisedRecord.get("id") or cleanRecord.get("id")

    try:
        model = TrusteePosition(**normalisedRecord)
    except ValidationError as err:
        logger.error(f"Corrupt TrusteePosition record detected (id={recordId}): {err}")
        if not (deleteCorrupt and recordId):
            return None
        try:
            if self.db.recordDelete(TrusteePosition, recordId):
                logger.warning(f"Deleted corrupt TrusteePosition record: {recordId}")
            else:
                logger.warning(f"Failed to delete corrupt TrusteePosition record: {recordId}")
        except Exception as deleteErr:
            logger.error(f"Error deleting corrupt TrusteePosition record {recordId}: {deleteErr}")
        return None

    # Persist the repaired payload so the legacy record is fixed only once;
    # a failed write must not prevent returning the valid model.
    needsWriteBack = bool(recordId) and normalisedRecord != cleanRecord
    if needsWriteBack:
        try:
            self.db.recordModify(TrusteePosition, recordId, normalisedRecord)
            logger.info(f"Normalised legacy TrusteePosition record: {recordId}")
        except Exception as writeErr:
            logger.warning(f"Could not persist normalised TrusteePosition {recordId}: {writeErr}")
    return model
def createPosition(self, data: Dict[str, Any]) -> Optional[TrusteePosition]: def createPosition(self, data: Dict[str, Any]) -> Optional[TrusteePosition]:
"""Create a new position. """Create a new position.
@ -1144,6 +1289,9 @@ class TrusteeObjects:
logger.warning(f"User {self.userId} lacks permission to create position") logger.warning(f"User {self.userId} lacks permission to create position")
return None return None
# Failsafe normalisation to keep DB payload stable for AI/manual inputs.
data = _sanitisePositionPayload(data)
# Auto-set context fields # Auto-set context fields
data["mandateId"] = self.mandateId data["mandateId"] = self.mandateId
data["featureInstanceId"] = self.featureInstanceId data["featureInstanceId"] = self.featureInstanceId
@ -1160,7 +1308,7 @@ class TrusteeObjects:
createdRecord = self.db.recordCreate(TrusteePosition, data) createdRecord = self.db.recordCreate(TrusteePosition, data)
if createdRecord and createdRecord.get("id"): if createdRecord and createdRecord.get("id"):
return TrusteePosition(**{k: v for k, v in createdRecord.items() if not k.startswith("_")}) return self._toTrusteePositionOrDelete(createdRecord, deleteCorrupt=False)
return None return None
def getPosition(self, positionId: str) -> Optional[TrusteePosition]: def getPosition(self, positionId: str) -> Optional[TrusteePosition]:
@ -1168,7 +1316,7 @@ class TrusteeObjects:
records = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId}) records = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId})
if not records: if not records:
return None return None
return TrusteePosition(**{k: v for k, v in records[0].items() if not k.startswith("_")}) return self._toTrusteePositionOrDelete(records[0], deleteCorrupt=True)
def getAllPositions(self, params: Optional[PaginationParams] = None) -> PaginatedResult: def getAllPositions(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
"""Get all positions with RBAC filtering + feature-level access filtering.""" """Get all positions with RBAC filtering + feature-level access filtering."""
@ -1198,8 +1346,12 @@ class TrusteeObjects:
# Step 3: Apply sorting # Step 3: Apply sorting
sortedRecords = self._applySorting(filteredRecords, params) sortedRecords = self._applySorting(filteredRecords, params)
# Step 4: Convert to Pydantic objects # Step 4: Convert to Pydantic objects and cleanup corrupt legacy records.
pydanticItems = [TrusteePosition(**r) for r in sortedRecords] pydanticItems = []
for record in sortedRecords:
position = self._toTrusteePositionOrDelete(record, deleteCorrupt=True)
if position is not None:
pydanticItems.append(position)
# Step 5: Apply pagination # Step 5: Apply pagination
totalItems = len(pydanticItems) totalItems = len(pydanticItems)
@ -1235,7 +1387,12 @@ class TrusteeObjects:
featureInstanceId=self.featureInstanceId, featureInstanceId=self.featureInstanceId,
featureCode=self.FEATURE_CODE featureCode=self.FEATURE_CODE
) )
return [TrusteePosition(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in records] safeItems = []
for record in records:
position = self._toTrusteePositionOrDelete(record, deleteCorrupt=True)
if position is not None:
safeItems.append(position)
return safeItems
def getPositionsByOrganisation(self, organisationId: str) -> List[TrusteePosition]: def getPositionsByOrganisation(self, organisationId: str) -> List[TrusteePosition]:
"""Get all positions for a specific organisation.""" """Get all positions for a specific organisation."""
@ -1250,7 +1407,12 @@ class TrusteeObjects:
featureInstanceId=self.featureInstanceId, featureInstanceId=self.featureInstanceId,
featureCode=self.FEATURE_CODE featureCode=self.FEATURE_CODE
) )
return [TrusteePosition(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in records] safeItems = []
for record in records:
position = self._toTrusteePositionOrDelete(record, deleteCorrupt=True)
if position is not None:
safeItems.append(position)
return safeItems
def updatePosition(self, positionId: str, data: Dict[str, Any]) -> Optional[TrusteePosition]: def updatePosition(self, positionId: str, data: Dict[str, Any]) -> Optional[TrusteePosition]:
"""Update a position. """Update a position.
@ -1276,7 +1438,7 @@ class TrusteeObjects:
updatedRecord = self.db.recordModify(TrusteePosition, positionId, data) updatedRecord = self.db.recordModify(TrusteePosition, positionId, data)
if not updatedRecord: if not updatedRecord:
return None return None
return TrusteePosition(**{k: v for k, v in updatedRecord.items() if not k.startswith("_")}) return self._toTrusteePositionOrDelete(updatedRecord, deleteCorrupt=False)
def deletePosition(self, positionId: str) -> bool: def deletePosition(self, positionId: str) -> bool:
"""Delete a position.""" """Delete a position."""
@ -1309,7 +1471,12 @@ class TrusteeObjects:
featureInstanceId=self.featureInstanceId, featureInstanceId=self.featureInstanceId,
featureCode=self.FEATURE_CODE featureCode=self.FEATURE_CODE
) )
return [TrusteePosition(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in records] safeItems = []
for record in records:
position = self._toTrusteePositionOrDelete(record, deleteCorrupt=True)
if position is not None:
safeItems.append(position)
return safeItems
# ===== Trustee-specific Access Check ===== # ===== Trustee-specific Access Check =====

View file

@ -41,10 +41,11 @@ _CLASSIFY_PROMPT = (
# Phase 2: Type-specific structuring prompts (placeholders: {expenseList}, {bankList}) # Phase 2: Type-specific structuring prompts (placeholders: {expenseList}, {bankList})
_PROMPT_EXPENSE_RECEIPT = ( _PROMPT_EXPENSE_RECEIPT = (
"Extrahiere aus dem folgenden Dokument eine Buchung pro Ausgabeposition. " "Extrahiere aus dem folgenden Dokument eine Buchung pro Ausgabeposition. "
"Return JSON: {{\"records\": [{{...}}]}}. Jeder Record: valuta (YYYY-MM-DD), transactionDateTime (unix seconds), company, desc, " "Return JSON: {{\"records\": [{{...}}]}}. Jeder Record: valuta (YYYY-MM-DD), transactionDateTime (unix seconds, numeric), company, desc, "
"bookingCurrency, bookingAmount, originalCurrency, originalAmount, vatPercentage, vatAmount, " "bookingCurrency, bookingAmount, originalCurrency, originalAmount, vatPercentage, vatAmount, "
"debitAccountNumber (NUR die Kontonummer, z.B. \"6200\", aus: {expenseList}), " "debitAccountNumber (NUR die Kontonummer, z.B. \"6200\", aus: {expenseList}), "
"creditAccountNumber (NUR die Kontonummer, z.B. \"1020\", aus: {bankList}), tags, taxCode, costCenter, bookingReference." "creditAccountNumber (NUR die Kontonummer, z.B. \"1020\", aus: {bankList}), tags, taxCode, costCenter, bookingReference. "
"WICHTIG: transactionDateTime muss eine ZAHL sein (z.B. 1737417600), niemals '21.01.2026'."
) )
_PROMPT_BANK_DOCUMENT = ( _PROMPT_BANK_DOCUMENT = (
"Extrahiere aus dem folgenden Bankauszug eine Buchung pro Transaktionszeile. " "Extrahiere aus dem folgenden Bankauszug eine Buchung pro Transaktionszeile. "
@ -58,14 +59,16 @@ _PROMPT_INVOICE = (
"Return JSON: {{\"records\": [{{...}}]}}. Record: valuta (Rechnungsdatum), company (Kreditor), desc (Rechnungsdetails), " "Return JSON: {{\"records\": [{{...}}]}}. Record: valuta (Rechnungsdatum), company (Kreditor), desc (Rechnungsdetails), "
"bookingAmount, bookingCurrency, vatPercentage, vatAmount, " "bookingAmount, bookingCurrency, vatPercentage, vatAmount, "
"debitAccountNumber (NUR die Kontonummer aus: {expenseList}), creditAccountNumber (NUR die Kontonummer aus: {bankList}), " "debitAccountNumber (NUR die Kontonummer aus: {expenseList}), creditAccountNumber (NUR die Kontonummer aus: {bankList}), "
"bookingReference (Rechnungsnummer), transactionDateTime, taxCode, costCenter." "bookingReference (Rechnungsnummer), transactionDateTime, taxCode, costCenter. "
"Formatregeln: valuta nur YYYY-MM-DD; transactionDateTime nur unix seconds als Zahl."
) )
_PROMPT_FALLBACK = ( _PROMPT_FALLBACK = (
"Extrahiere aus dem folgenden Dokument Buchungsdaten. " "Extrahiere aus dem folgenden Dokument Buchungsdaten. "
"Return JSON: {{\"records\": [{{...}}]}}. Jeder Record: valuta (YYYY-MM-DD), transactionDateTime (unix seconds), company, desc, " "Return JSON: {{\"records\": [{{...}}]}}. Jeder Record: valuta (YYYY-MM-DD), transactionDateTime (unix seconds, numeric), company, desc, "
"bookingCurrency, bookingAmount, originalCurrency, originalAmount, vatPercentage, vatAmount, " "bookingCurrency, bookingAmount, originalCurrency, originalAmount, vatPercentage, vatAmount, "
"debitAccountNumber (NUR die Kontonummer, z.B. \"6200\", aus: {expenseList}), " "debitAccountNumber (NUR die Kontonummer, z.B. \"6200\", aus: {expenseList}), "
"creditAccountNumber (NUR die Kontonummer, z.B. \"1020\", aus: {bankList}), tags, taxCode, costCenter, bookingReference." "creditAccountNumber (NUR die Kontonummer, z.B. \"1020\", aus: {bankList}), tags, taxCode, costCenter, bookingReference. "
"WICHTIG: keine lokalen Datumsformate in transactionDateTime (kein DD.MM.YYYY)."
) )