From 65a495dc36afb92f2dd865dfbd8e08d53804e777 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Sat, 4 Apr 2026 16:45:46 +0200
Subject: [PATCH] core class for system attributes sysCreated / sysModified
---
modules/connectors/connectorDbPostgre.py | 159 +-----------------
.../interfaceFeatureAutomation2.py | 5 +-
modules/interfaces/interfaceBootstrap.py | 91 +---------
scripts/script_db_export_migration.py | 2 +-
4 files changed, 9 insertions(+), 248 deletions(-)
diff --git a/modules/connectors/connectorDbPostgre.py b/modules/connectors/connectorDbPostgre.py
index 6bd661b4..2c7eeab3 100644
--- a/modules/connectors/connectorDbPostgre.py
+++ b/modules/connectors/connectorDbPostgre.py
@@ -5,7 +5,7 @@ import re
import psycopg2
import psycopg2.extras
import logging
-from typing import List, Dict, Any, Optional, Union, get_origin, get_args, Type, Set, Tuple
+from typing import List, Dict, Any, Optional, Union, get_origin, get_args, Type
import uuid
from pydantic import BaseModel, Field
import threading
@@ -158,88 +158,10 @@ def _parseRecordFields(record: Dict[str, Any], fields: Dict[str, str], context:
logger.warning(f"Could not parse JSONB field {fieldName}, keeping as string ({context})")
-# Legacy column names (historical _* internal names and old camelCase audit fields) -> PowerOn sys* columns.
-# Order matters: more specific / underscore names first; first successful copy wins per cell via IS NULL on target.
-_LEGACY_FIELD_TO_SYS: Tuple[Tuple[str, str], ...] = (
- ("_createdAt", "sysCreatedAt"),
- ("_createdBy", "sysCreatedBy"),
- ("_modifiedAt", "sysModifiedAt"),
- ("_modifiedBy", "sysModifiedBy"),
- ("createdAt", "sysCreatedAt"),
- ("creationDate", "sysCreatedAt"),
- ("updatedAt", "sysModifiedAt"),
- ("lastModified", "sysModifiedAt"),
-)
-
-
def _quotePgIdent(name: str) -> str:
return '"' + str(name).replace('"', '""') + '"'
-def _resolveColumnCaseInsensitive(cols: Set[str], logicalName: str) -> Optional[str]:
- """Match information_schema column_name to logical CamelCase (PG folds unquoted legacy names to lowercase)."""
- if not logicalName or not cols:
- return None
- for c in cols:
- if c.lower() == logicalName.lower():
- return c
- return None
-
-
-def _pgColumnDataType(cursor, tablePg: str, colPg: str) -> Optional[str]:
- cursor.execute(
- """
- SELECT data_type FROM information_schema.columns
- WHERE table_schema = 'public' AND table_name = %s AND column_name = %s
- """,
- (tablePg, colPg),
- )
- row = cursor.fetchone()
- return row["data_type"] if row else None
-
-
-def _legacySourceToSysSqlExpr(srcIdent: str, srcType: Optional[str], tgtType: Optional[str]) -> str:
- """Build RHS for UPDATE sys* = expr from legacy _* column (handles text/timestamp -> double precision)."""
- s = _quotePgIdent(srcIdent)
- sl = (srcType or "").lower()
- tl = (tgtType or "").lower()
- if "double" in tl or tl == "real" or tl == "numeric":
- if any(x in sl for x in ("double precision", "real", "numeric", "integer", "bigint", "smallint")):
- return f"{s}::double precision"
- if "timestamp" in sl or sl == "date":
- return f"EXTRACT(EPOCH FROM {s}::timestamptz)"
- if "text" in sl or "character" in sl or sl == "uuid":
- return (
- f"CASE WHEN trim({s}::text) ~ '^[+-]?[0-9]+(\\.[0-9]*)?([eE][+-]?[0-9]+)?$' "
- f"THEN trim({s}::text)::double precision "
- f"ELSE EXTRACT(EPOCH FROM trim({s}::text)::timestamptz) END"
- )
- return s
- return s
-
-
-def _listPublicBaseTableNames(cursor) -> List[str]:
- cursor.execute(
- """
- SELECT table_name FROM information_schema.tables
- WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
- ORDER BY table_name
- """
- )
- return [row["table_name"] for row in cursor.fetchall()]
-
-
-def _listTableColumnNames(cursor, tableName: str) -> Set[str]:
- cursor.execute(
- """
- SELECT column_name FROM information_schema.columns
- WHERE table_schema = 'public' AND table_name = %s
- """,
- (tableName,),
- )
- return {row["column_name"] for row in cursor.fetchall()}
-
-
# Cache connectors by (host, database, port) to avoid duplicate inits for same database.
# Thread safety: _connector_cache_lock protects cache access. userId is request-scoped via
# contextvars to avoid races when concurrent requests share the same connector.
@@ -456,63 +378,6 @@ class DatabaseConnector:
logger.warning(f"Connection lost, reconnecting: {e}")
self._connect()
- def migrateLegacyUnderscoreSysColumns(self) -> int:
- """
- Scan all public base tables on this connection's database. Where both a legacy
- source column (any case: _createdAt, createdAt, creationDate, …) and the matching
- sys* column exist, UPDATE sys* from legacy where sys* IS NULL AND legacy IS NOT NULL.
- Idempotent; run after schema adds sys* columns (see _ensureTableExists).
- """
- self._ensure_connection()
- total = 0
- try:
- with self.connection.cursor() as cursor:
- tableNames = _listPublicBaseTableNames(cursor)
- for table in tableNames:
- with self.connection.cursor() as cursor:
- cols = _listTableColumnNames(cursor, table)
- for legacyLogical, sysLogical in _LEGACY_FIELD_TO_SYS:
- src = _resolveColumnCaseInsensitive(cols, legacyLogical)
- tgt = _resolveColumnCaseInsensitive(cols, sysLogical)
- if not src or not tgt or src == tgt:
- continue
- try:
- with self.connection.cursor() as cursor:
- srcType = _pgColumnDataType(cursor, table, src)
- tgtType = _pgColumnDataType(cursor, table, tgt)
- expr = _legacySourceToSysSqlExpr(src, srcType, tgtType)
- tq = _quotePgIdent(table)
- tr = _quotePgIdent(tgt)
- sr = _quotePgIdent(src)
- sql = (
- f"UPDATE {tq} SET {tr} = {expr} "
- f"WHERE {tr} IS NULL AND {sr} IS NOT NULL"
- )
- cursor.execute(sql)
- n = cursor.rowcount
- self.connection.commit()
- total += n
- except Exception as e:
- try:
- self.connection.rollback()
- except Exception:
- pass
- logger.debug(
- f"migrateLegacyUnderscoreSysColumns skip {self.dbDatabase}.{table} "
- f"{src}->{tgt}: {e}"
- )
- except Exception as e:
- logger.error(f"migrateLegacyUnderscoreSysColumns failed on {self.dbDatabase}: {e}")
- try:
- self.connection.rollback()
- except Exception:
- pass
- if total:
- logger.info(
- f"migrateLegacyUnderscoreSysColumns: {total} cell(s) in {self.dbDatabase}"
- )
- return total
-
def _initializeSystemTable(self):
"""Initializes the system table if it doesn't exist yet."""
try:
@@ -634,7 +499,6 @@ class DatabaseConnector:
try:
self._ensure_connection()
- schemaTouched = False
with self.connection.cursor() as cursor:
# Check if table exists by querying information_schema with case-insensitive search
@@ -653,7 +517,6 @@ class DatabaseConnector:
logger.info(
f"Created table '{table}' with columns from Pydantic model"
)
- schemaTouched = True
else:
# Table exists: ensure all columns from model are present (simple additive migration)
try:
@@ -687,7 +550,6 @@ class DatabaseConnector:
logger.info(
f"Added missing column '{col}' ({sql_type}) to '{table}'"
)
- schemaTouched = True
except Exception as add_err:
logger.warning(
f"Could not add column '{col}' to '{table}': {add_err}"
@@ -698,23 +560,6 @@ class DatabaseConnector:
)
self.connection.commit()
- if schemaTouched:
- try:
- n = self.migrateLegacyUnderscoreSysColumns()
- if n:
- logger.info(
- "After schema change on %s.%s: legacy -> sys* migration wrote %s cell(s)",
- self.dbDatabase,
- table,
- n,
- )
- except Exception as mig_err:
- logger.error(
- "migrateLegacyUnderscoreSysColumns failed after schema change %s.%s: %s",
- self.dbDatabase,
- table,
- mig_err,
- )
return True
except Exception as e:
logger.error(f"Error ensuring table {table} exists: {e}")
@@ -893,7 +738,7 @@ class DatabaseConnector:
effective_user_id = self.userId
currentTime = getUtcTimestamp()
# Set sysCreatedAt/sysCreatedBy on first persist; always refresh modified fields.
- # Treat None and 0 as unset (legacy rows / bad defaults); model_dump often has sysCreatedAt=None.
+ # Treat None and 0 as unset (empty / bad defaults); model_dump often has sysCreatedAt=None.
createdTs = record.get("sysCreatedAt")
if createdTs is None or createdTs == 0 or createdTs == 0.0:
record["sysCreatedAt"] = currentTime
diff --git a/modules/features/automation2/interfaceFeatureAutomation2.py b/modules/features/automation2/interfaceFeatureAutomation2.py
index b38b21db..cec51181 100644
--- a/modules/features/automation2/interfaceFeatureAutomation2.py
+++ b/modules/features/automation2/interfaceFeatureAutomation2.py
@@ -340,7 +340,10 @@ class Automation2Objects:
for r in runs:
wf = wf_by_id.get(r.get("workflowId"), {})
r["workflowLabel"] = wf.get("label") or r.get("workflowId", "")
- runs.sort(key=lambda x: (x.get("_modifiedAt") or x.get("_createdAt") or 0), reverse=True)
+ runs.sort(
+ key=lambda x: (x.get("sysModifiedAt") or x.get("sysCreatedAt") or 0),
+ reverse=True,
+ )
return runs[:limit]
def getRunsWaitingForEmail(self) -> List[Dict[str, Any]]:
diff --git a/modules/interfaces/interfaceBootstrap.py b/modules/interfaces/interfaceBootstrap.py
index 93b17d6a..b3b8bcd0 100644
--- a/modules/interfaces/interfaceBootstrap.py
+++ b/modules/interfaces/interfaceBootstrap.py
@@ -11,9 +11,9 @@ Multi-Tenant Design:
"""
import logging
-from typing import Optional, Dict, Tuple
+from typing import Optional, Dict
from passlib.context import CryptContext
-from modules.connectors.connectorDbPostgre import DatabaseConnector, _get_cached_connector
+from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
from modules.datamodels.datamodelUam import (
Mandate,
@@ -38,90 +38,6 @@ pwdContext = CryptContext(schemes=["argon2"], deprecated="auto")
# Cache für Role-IDs (roleLabel -> roleId)
_roleIdCache: Dict[str, str] = {}
-# PowerOn logical databases to scan (same set as gateway/scripts/script_db_export_migration.py ALL_DATABASES).
-_POWERON_DATABASE_NAMES: Tuple[str, ...] = (
- "poweron_app",
- "poweron_automation",
- "poweron_automation2",
- "poweron_billing",
- "poweron_chat",
- "poweron_chatbot",
- "poweron_commcoach",
- "poweron_knowledge",
- "poweron_management",
- "poweron_neutralization",
- "poweron_realestate",
- "poweron_teamsbot",
- "poweron_test",
- "poweron_trustee",
- "poweron_workspace",
-)
-
-
-def _configPrefixForPoweronDatabase(dbName: str) -> str:
- return {
- "poweron_app": "DB_APP",
- "poweron_chat": "DB_CHAT",
- "poweron_chatbot": "DB_CHATBOT",
- "poweron_management": "DB_MANAGEMENT",
- "poweron_realestate": "DB_REALESTATE",
- "poweron_trustee": "DB_TRUSTEE",
- # Same as initAutomationTemplates: default DB_* (not a separate DB_AUTOMATION_* prefix).
- "poweron_automation": "DB",
- "poweron_billing": "DB",
- }.get(dbName, "DB")
-
-
-def _openConnectorForPoweronDatabase(dbName: str) -> Optional[DatabaseConnector]:
- """Connect to a named PowerOn database using DB_* / DB_APP_* style config (shared with export script)."""
- prefix = _configPrefixForPoweronDatabase(dbName)
- host = APP_CONFIG.get(f"{prefix}_HOST") or APP_CONFIG.get("DB_HOST", "localhost")
- user = APP_CONFIG.get(f"{prefix}_USER") or APP_CONFIG.get("DB_USER")
- password = APP_CONFIG.get(f"{prefix}_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD_SECRET")
- portRaw = APP_CONFIG.get(f"{prefix}_PORT") or APP_CONFIG.get("DB_PORT", 5432)
- try:
- port = int(portRaw)
- except (TypeError, ValueError):
- port = 5432
- if not user or not password:
- logger.debug(
- f"bootstrap: skip legacy _* -> sys* migration for {dbName} (missing credentials for {prefix})"
- )
- return None
- try:
- return _get_cached_connector(
- dbHost=host,
- dbDatabase=dbName,
- dbUser=user,
- dbPassword=password,
- dbPort=port,
- userId=None,
- )
- except Exception as e:
- logger.warning(f"bootstrap: cannot open {dbName} for legacy _* -> sys* migration: {e}")
- return None
-
-
-def migrateLegacyUnderscoreSysColumnsAllPoweronDatabases() -> None:
- """
- Run DatabaseConnector.migrateLegacyUnderscoreSysColumns on every configured PowerOn database.
- Actual table scan and SQL live in the connector module.
- """
- grandTotal = 0
- for dbName in _POWERON_DATABASE_NAMES:
- conn = _openConnectorForPoweronDatabase(dbName)
- if not conn:
- continue
- try:
- grandTotal += conn.migrateLegacyUnderscoreSysColumns()
- except Exception as e:
- logger.warning(f"bootstrap: migrateLegacyUnderscoreSysColumns failed for {dbName}: {e}")
- if grandTotal:
- logger.info(
- f"bootstrap: legacy _* -> sys* migration total {grandTotal} cell(s) across PowerOn databases"
- )
-
-
def initBootstrap(db: DatabaseConnector) -> None:
"""
Main bootstrap entry point - initializes all system components.
@@ -134,9 +50,6 @@ def initBootstrap(db: DatabaseConnector) -> None:
# Initialize root mandate
mandateId = initRootMandate(db)
- # Copy legacy _createdAt/_createdBy/_modifiedAt/_modifiedBy into sys* on all PowerOn DBs (connector routine)
- migrateLegacyUnderscoreSysColumnsAllPoweronDatabases()
-
# Migrate existing mandate records: description -> label
_migrateMandateDescriptionToLabel(db)
diff --git a/scripts/script_db_export_migration.py b/scripts/script_db_export_migration.py
index b85dcf54..73c13b25 100644
--- a/scripts/script_db_export_migration.py
+++ b/scripts/script_db_export_migration.py
@@ -99,7 +99,7 @@ try:
except Exception as e:
logger.warning(f"Could not refresh APP_CONFIG: {e}")
-# Alle PowerOn Datenbanken (keep in sync with interfaceBootstrap._POWERON_DATABASE_NAMES)
+# Alle PowerOn Datenbanken (für Export / Migration-Skripte)
ALL_DATABASES = [
"poweron_app",
"poweron_automation",