core class for system attributes sysCreated / sysModified

ValueOn AG 2026-04-04 16:45:46 +02:00
parent 50bf59879f
commit 65a495dc36
4 changed files with 9 additions and 248 deletions

View file

@@ -5,7 +5,7 @@ import re
import psycopg2
import psycopg2.extras
import logging
from typing import List, Dict, Any, Optional, Union, get_origin, get_args, Type, Set, Tuple
from typing import List, Dict, Any, Optional, Union, get_origin, get_args, Type
import uuid
from pydantic import BaseModel, Field
import threading
@@ -158,88 +158,10 @@ def _parseRecordFields(record: Dict[str, Any], fields: Dict[str, str], context:
logger.warning(f"Could not parse JSONB field {fieldName}, keeping as string ({context})")
# Legacy column names (historical _* internal names and old camelCase audit fields) -> PowerOn sys* columns.
# Order matters: more specific / underscore names first; first successful copy wins per cell via IS NULL on target.
_LEGACY_FIELD_TO_SYS: Tuple[Tuple[str, str], ...] = (
("_createdAt", "sysCreatedAt"),
("_createdBy", "sysCreatedBy"),
("_modifiedAt", "sysModifiedAt"),
("_modifiedBy", "sysModifiedBy"),
("createdAt", "sysCreatedAt"),
("creationDate", "sysCreatedAt"),
("updatedAt", "sysModifiedAt"),
("lastModified", "sysModifiedAt"),
)
def _quotePgIdent(name: str) -> str:
return '"' + str(name).replace('"', '""') + '"'
def _resolveColumnCaseInsensitive(cols: Set[str], logicalName: str) -> Optional[str]:
"""Match information_schema column_name to logical CamelCase (PG folds unquoted legacy names to lowercase)."""
if not logicalName or not cols:
return None
for c in cols:
if c.lower() == logicalName.lower():
return c
return None
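# Illustrative sketch only, not part of this commit: PostgreSQL folds unquoted
# identifiers to lowercase, which is why the helper above matches case-insensitively.
# The column set and name below are hypothetical examples:
#   _resolveColumnCaseInsensitive({"id", "_createdat"}, "_createdAt")  ->  "_createdat"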
def _pgColumnDataType(cursor, tablePg: str, colPg: str) -> Optional[str]:
cursor.execute(
"""
SELECT data_type FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = %s AND column_name = %s
""",
(tablePg, colPg),
)
row = cursor.fetchone()
return row["data_type"] if row else None
def _legacySourceToSysSqlExpr(srcIdent: str, srcType: Optional[str], tgtType: Optional[str]) -> str:
"""Build RHS for UPDATE sys* = expr from legacy _* column (handles text/timestamp -> double precision)."""
s = _quotePgIdent(srcIdent)
sl = (srcType or "").lower()
tl = (tgtType or "").lower()
if "double" in tl or tl == "real" or tl == "numeric":
if any(x in sl for x in ("double precision", "real", "numeric", "integer", "bigint", "smallint")):
return f"{s}::double precision"
if "timestamp" in sl or sl == "date":
return f"EXTRACT(EPOCH FROM {s}::timestamptz)"
if "text" in sl or "character" in sl or sl == "uuid":
return (
f"CASE WHEN trim({s}::text) ~ '^[+-]?[0-9]+(\\.[0-9]*)?([eE][+-]?[0-9]+)?$' "
f"THEN trim({s}::text)::double precision "
f"ELSE EXTRACT(EPOCH FROM trim({s}::text)::timestamptz) END"
)
return s
return s
def _listPublicBaseTableNames(cursor) -> List[str]:
cursor.execute(
"""
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
ORDER BY table_name
"""
)
return [row["table_name"] for row in cursor.fetchall()]
def _listTableColumnNames(cursor, tableName: str) -> Set[str]:
cursor.execute(
"""
SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = %s
""",
(tableName,),
)
return {row["column_name"] for row in cursor.fetchall()}
# Cache connectors by (host, database, port) to avoid duplicate inits for same database.
# Thread safety: _connector_cache_lock protects cache access. userId is request-scoped via
# contextvars to avoid races when concurrent requests share the same connector.
@@ -456,63 +378,6 @@ class DatabaseConnector:
logger.warning(f"Connection lost, reconnecting: {e}")
self._connect()
def migrateLegacyUnderscoreSysColumns(self) -> int:
"""
Scan all public base tables on this connection's database. Where both a legacy
source column (any case: _createdAt, createdAt, creationDate, ...) and the matching
sys* column exist, UPDATE sys* from legacy where sys* IS NULL AND legacy IS NOT NULL.
Idempotent; run after schema adds sys* columns (see _ensureTableExists).
"""
self._ensure_connection()
total = 0
try:
with self.connection.cursor() as cursor:
tableNames = _listPublicBaseTableNames(cursor)
for table in tableNames:
with self.connection.cursor() as cursor:
cols = _listTableColumnNames(cursor, table)
for legacyLogical, sysLogical in _LEGACY_FIELD_TO_SYS:
src = _resolveColumnCaseInsensitive(cols, legacyLogical)
tgt = _resolveColumnCaseInsensitive(cols, sysLogical)
if not src or not tgt or src == tgt:
continue
try:
with self.connection.cursor() as cursor:
srcType = _pgColumnDataType(cursor, table, src)
tgtType = _pgColumnDataType(cursor, table, tgt)
expr = _legacySourceToSysSqlExpr(src, srcType, tgtType)
tq = _quotePgIdent(table)
tr = _quotePgIdent(tgt)
sr = _quotePgIdent(src)
sql = (
f"UPDATE {tq} SET {tr} = {expr} "
f"WHERE {tr} IS NULL AND {sr} IS NOT NULL"
)
cursor.execute(sql)
n = cursor.rowcount
self.connection.commit()
total += n
except Exception as e:
try:
self.connection.rollback()
except Exception:
pass
logger.debug(
f"migrateLegacyUnderscoreSysColumns skip {self.dbDatabase}.{table} "
f"{src}->{tgt}: {e}"
)
except Exception as e:
logger.error(f"migrateLegacyUnderscoreSysColumns failed on {self.dbDatabase}: {e}")
try:
self.connection.rollback()
except Exception:
pass
if total:
logger.info(
f"migrateLegacyUnderscoreSysColumns: {total} cell(s) in {self.dbDatabase}"
)
return total
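# Illustrative sketch only, not part of this commit: the per-column UPDATE the
# routine above builds, shown for a hypothetical table "task" whose text-typed
# legacy "_createdAt" feeds a double precision "sysCreatedAt" column:
#   UPDATE "task" SET "sysCreatedAt" =
#     CASE WHEN trim("_createdAt"::text) ~ '^[+-]?[0-9]+(\.[0-9]*)?([eE][+-]?[0-9]+)?$'
#          THEN trim("_createdAt"::text)::double precision
#          ELSE EXTRACT(EPOCH FROM trim("_createdAt"::text)::timestamptz) END
#   WHERE "sysCreatedAt" IS NULL AND "_createdAt" IS NOT NULL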
def _initializeSystemTable(self):
"""Initializes the system table if it doesn't exist yet."""
try:
@@ -634,7 +499,6 @@ class DatabaseConnector:
try:
self._ensure_connection()
schemaTouched = False
with self.connection.cursor() as cursor:
# Check if table exists by querying information_schema with case-insensitive search
@@ -653,7 +517,6 @@ class DatabaseConnector:
logger.info(
f"Created table '{table}' with columns from Pydantic model"
)
schemaTouched = True
else:
# Table exists: ensure all columns from model are present (simple additive migration)
try:
@@ -687,7 +550,6 @@ class DatabaseConnector:
logger.info(
f"Added missing column '{col}' ({sql_type}) to '{table}'"
)
schemaTouched = True
except Exception as add_err:
logger.warning(
f"Could not add column '{col}' to '{table}': {add_err}"
@@ -698,23 +560,6 @@ class DatabaseConnector:
)
self.connection.commit()
if schemaTouched:
try:
n = self.migrateLegacyUnderscoreSysColumns()
if n:
logger.info(
"After schema change on %s.%s: legacy -> sys* migration wrote %s cell(s)",
self.dbDatabase,
table,
n,
)
except Exception as mig_err:
logger.error(
"migrateLegacyUnderscoreSysColumns failed after schema change %s.%s: %s",
self.dbDatabase,
table,
mig_err,
)
return True
except Exception as e:
logger.error(f"Error ensuring table {table} exists: {e}")
@@ -893,7 +738,7 @@ class DatabaseConnector:
effective_user_id = self.userId
currentTime = getUtcTimestamp()
# Set sysCreatedAt/sysCreatedBy on first persist; always refresh modified fields.
# Treat None and 0 as unset (legacy rows / bad defaults); model_dump often has sysCreatedAt=None.
# Treat None and 0 as unset (empty / bad defaults); model_dump often has sysCreatedAt=None.
createdTs = record.get("sysCreatedAt")
if createdTs is None or createdTs == 0 or createdTs == 0.0:
record["sysCreatedAt"] = currentTime

View file

@@ -340,7 +340,10 @@ class Automation2Objects:
for r in runs:
wf = wf_by_id.get(r.get("workflowId"), {})
r["workflowLabel"] = wf.get("label") or r.get("workflowId", "")
runs.sort(key=lambda x: (x.get("_modifiedAt") or x.get("_createdAt") or 0), reverse=True)
runs.sort(
key=lambda x: (x.get("sysModifiedAt") or x.get("sysCreatedAt") or 0),
reverse=True,
)
return runs[:limit]
def getRunsWaitingForEmail(self) -> List[Dict[str, Any]]:

View file

@@ -11,9 +11,9 @@ Multi-Tenant Design:
"""
import logging
from typing import Optional, Dict, Tuple
from typing import Optional, Dict
from passlib.context import CryptContext
from modules.connectors.connectorDbPostgre import DatabaseConnector, _get_cached_connector
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
from modules.datamodels.datamodelUam import (
Mandate,
@@ -38,90 +38,6 @@ pwdContext = CryptContext(schemes=["argon2"], deprecated="auto")
# Cache for role IDs (roleLabel -> roleId)
_roleIdCache: Dict[str, str] = {}
# PowerOn logical databases to scan (same set as gateway/scripts/script_db_export_migration.py ALL_DATABASES).
_POWERON_DATABASE_NAMES: Tuple[str, ...] = (
"poweron_app",
"poweron_automation",
"poweron_automation2",
"poweron_billing",
"poweron_chat",
"poweron_chatbot",
"poweron_commcoach",
"poweron_knowledge",
"poweron_management",
"poweron_neutralization",
"poweron_realestate",
"poweron_teamsbot",
"poweron_test",
"poweron_trustee",
"poweron_workspace",
)
def _configPrefixForPoweronDatabase(dbName: str) -> str:
return {
"poweron_app": "DB_APP",
"poweron_chat": "DB_CHAT",
"poweron_chatbot": "DB_CHATBOT",
"poweron_management": "DB_MANAGEMENT",
"poweron_realestate": "DB_REALESTATE",
"poweron_trustee": "DB_TRUSTEE",
# Same as initAutomationTemplates: default DB_* (not a separate DB_AUTOMATION_* prefix).
"poweron_automation": "DB",
"poweron_billing": "DB",
}.get(dbName, "DB")
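# Illustrative only, not part of this commit: for "poweron_app" the prefix is
# "DB_APP", so the helper below reads DB_APP_HOST / DB_APP_USER /
# DB_APP_PASSWORD_SECRET / DB_APP_PORT and falls back to the plain DB_* keys;
# databases not listed above use the default "DB" prefix.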
def _openConnectorForPoweronDatabase(dbName: str) -> Optional[DatabaseConnector]:
"""Connect to a named PowerOn database using DB_* / DB_APP_* style config (shared with export script)."""
prefix = _configPrefixForPoweronDatabase(dbName)
host = APP_CONFIG.get(f"{prefix}_HOST") or APP_CONFIG.get("DB_HOST", "localhost")
user = APP_CONFIG.get(f"{prefix}_USER") or APP_CONFIG.get("DB_USER")
password = APP_CONFIG.get(f"{prefix}_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD_SECRET")
portRaw = APP_CONFIG.get(f"{prefix}_PORT") or APP_CONFIG.get("DB_PORT", 5432)
try:
port = int(portRaw)
except (TypeError, ValueError):
port = 5432
if not user or not password:
logger.debug(
f"bootstrap: skip legacy _* -> sys* migration for {dbName} (missing credentials for {prefix})"
)
return None
try:
return _get_cached_connector(
dbHost=host,
dbDatabase=dbName,
dbUser=user,
dbPassword=password,
dbPort=port,
userId=None,
)
except Exception as e:
logger.warning(f"bootstrap: cannot open {dbName} for legacy _* -> sys* migration: {e}")
return None
def migrateLegacyUnderscoreSysColumnsAllPoweronDatabases() -> None:
"""
Run DatabaseConnector.migrateLegacyUnderscoreSysColumns on every configured PowerOn database.
Actual table scan and SQL live in the connector module.
"""
grandTotal = 0
for dbName in _POWERON_DATABASE_NAMES:
conn = _openConnectorForPoweronDatabase(dbName)
if not conn:
continue
try:
grandTotal += conn.migrateLegacyUnderscoreSysColumns()
except Exception as e:
logger.warning(f"bootstrap: migrateLegacyUnderscoreSysColumns failed for {dbName}: {e}")
if grandTotal:
logger.info(
f"bootstrap: legacy _* -> sys* migration total {grandTotal} cell(s) across PowerOn databases"
)
def initBootstrap(db: DatabaseConnector) -> None:
"""
Main bootstrap entry point - initializes all system components.
@@ -134,9 +50,6 @@ def initBootstrap(db: DatabaseConnector) -> None:
# Initialize root mandate
mandateId = initRootMandate(db)
# Copy legacy _createdAt/_createdBy/_modifiedAt/_modifiedBy into sys* on all PowerOn DBs (connector routine)
migrateLegacyUnderscoreSysColumnsAllPoweronDatabases()
# Migrate existing mandate records: description -> label
_migrateMandateDescriptionToLabel(db)

View file

@@ -99,7 +99,7 @@ try:
except Exception as e:
logger.warning(f"Could not refresh APP_CONFIG: {e}")
# All PowerOn databases (keep in sync with interfaceBootstrap._POWERON_DATABASE_NAMES)
# All PowerOn databases (for export / migration scripts)
ALL_DATABASES = [
"poweron_app",
"poweron_automation",