fixed sys attributes

This commit is contained in:
ValueOn AG 2026-03-28 21:46:55 +01:00
parent 20d2bf215f
commit 1883f8cd6a
3 changed files with 301 additions and 115 deletions

View file

@@ -5,7 +5,7 @@ import re
import psycopg2
import psycopg2.extras
import logging
from typing import List, Dict, Any, Optional, Union, get_origin, get_args, Type
from typing import List, Dict, Any, Optional, Union, get_origin, get_args, Type, Set, Tuple
import uuid
from pydantic import BaseModel, Field
import threading
@@ -158,6 +158,83 @@ def _parseRecordFields(record: Dict[str, Any], fields: Dict[str, str], context:
logger.warning(f"Could not parse JSONB field {fieldName}, keeping as string ({context})")
# Legacy system columns (underscore-prefixed internal names) -> PowerOn sys* columns.
_LEGACY_UNDERSCORE_TO_SYS: Tuple[Tuple[str, str], ...] = (
("_createdAt", "sysCreatedAt"),
("_createdBy", "sysCreatedBy"),
("_modifiedAt", "sysModifiedAt"),
("_modifiedBy", "sysModifiedBy"),
)
def _quotePgIdent(name: str) -> str:
return '"' + str(name).replace('"', '""') + '"'
def _resolveColumnCaseInsensitive(cols: Set[str], logicalName: str) -> Optional[str]:
"""Match information_schema column_name to logical CamelCase (PG folds unquoted legacy names to lowercase)."""
if not logicalName or not cols:
return None
for c in cols:
if c.lower() == logicalName.lower():
return c
return None
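# Illustration (hypothetical column sets): PostgreSQL folds unquoted identifiers
# to lowercase, so a legacy column created as _createdAt without quotes shows up
# in information_schema as "_createdat" and still resolves:
#   _resolveColumnCaseInsensitive({"id", "_createdat"}, "_createdAt")  # -> "_createdat"
#   _resolveColumnCaseInsensitive({"id"}, "sysCreatedAt")              # -> None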
def _pgColumnDataType(cursor, tablePg: str, colPg: str) -> Optional[str]:
    cursor.execute(
        """
        SELECT data_type FROM information_schema.columns
        WHERE table_schema = 'public' AND table_name = %s AND column_name = %s
        """,
        (tablePg, colPg),
    )
    row = cursor.fetchone()
    return row["data_type"] if row else None

def _legacySourceToSysSqlExpr(srcIdent: str, srcType: Optional[str], tgtType: Optional[str]) -> str:
    """Build RHS for UPDATE sys* = expr from legacy _* column (handles text/timestamp -> double precision)."""
    s = _quotePgIdent(srcIdent)
    sl = (srcType or "").lower()
    tl = (tgtType or "").lower()
    if "double" in tl or tl == "real" or tl == "numeric":
        if any(x in sl for x in ("double precision", "real", "numeric", "integer", "bigint", "smallint")):
            return f"{s}::double precision"
        if "timestamp" in sl or sl == "date":
            return f"EXTRACT(EPOCH FROM {s}::timestamptz)"
        if "text" in sl or "character" in sl or sl == "uuid":
            return (
                f"CASE WHEN trim({s}::text) ~ '^[+-]?[0-9]+(\\.[0-9]*)?([eE][+-]?[0-9]+)?$' "
                f"THEN trim({s}::text)::double precision "
                f"ELSE EXTRACT(EPOCH FROM trim({s}::text)::timestamptz) END"
            )
        return s
    return s
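# Illustration (hypothetical types): a text source feeding a double precision
# sys* target gets the CASE expression, a timestamp source a plain epoch cast:
#   _legacySourceToSysSqlExpr("_createdAt", "text", "double precision")
#   # -> CASE WHEN trim("_createdAt"::text) ~ ... THEN trim(...)::double precision
#   #    ELSE EXTRACT(EPOCH FROM trim("_createdAt"::text)::timestamptz) END
#   _legacySourceToSysSqlExpr("_createdAt", "timestamp with time zone", "double precision")
#   # -> EXTRACT(EPOCH FROM "_createdAt"::timestamptz)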
def _listPublicBaseTableNames(cursor) -> List[str]:
    cursor.execute(
        """
        SELECT table_name FROM information_schema.tables
        WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
        ORDER BY table_name
        """
    )
    return [row["table_name"] for row in cursor.fetchall()]

def _listTableColumnNames(cursor, tableName: str) -> Set[str]:
    cursor.execute(
        """
        SELECT column_name FROM information_schema.columns
        WHERE table_schema = 'public' AND table_name = %s
        """,
        (tableName,),
    )
    return {row["column_name"] for row in cursor.fetchall()}
# Cache connectors by (host, database, port) to avoid duplicate inits for same database.
# Thread safety: _connector_cache_lock protects cache access. userId is request-scoped via
# contextvars to avoid races when concurrent requests share the same connector.
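# A minimal sketch of that pattern (the cache internals here are assumptions;
# only the _get_cached_connector keyword signature is used elsewhere in this commit):
#   _connector_cache: Dict[Tuple[str, str, int], "DatabaseConnector"] = {}
#   _connector_cache_lock = threading.Lock()
#   _current_user_id = contextvars.ContextVar("userId", default=None)
#
#   def _get_cached_connector(dbHost, dbDatabase, dbUser, dbPassword, dbPort, userId):
#       key = (dbHost, dbDatabase, dbPort)
#       with _connector_cache_lock:
#           conn = _connector_cache.get(key)
#           if conn is None:
#               conn = DatabaseConnector(dbHost, dbDatabase, dbUser, dbPassword, dbPort)
#               _connector_cache[key] = conn
#       _current_user_id.set(userId)  # request-scoped; not stored on the shared connector
#       return conn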
@@ -374,6 +451,63 @@ class DatabaseConnector:
logger.warning(f"Connection lost, reconnecting: {e}")
self._connect()
def migrateLegacyUnderscoreSysColumns(self) -> int:
"""
Scan all public base tables on this connection's database. Where both a legacy
_createdAt / _createdBy / _modifiedAt / _modifiedBy column (any case) and the
matching sys* column exist, copy into sys* rows where sys* IS NULL and legacy IS NOT NULL.
Idempotent; safe to run on every bootstrap.
"""
self._ensure_connection()
total = 0
try:
with self.connection.cursor() as cursor:
tableNames = _listPublicBaseTableNames(cursor)
for table in tableNames:
with self.connection.cursor() as cursor:
cols = _listTableColumnNames(cursor, table)
for legacyLogical, sysLogical in _LEGACY_UNDERSCORE_TO_SYS:
src = _resolveColumnCaseInsensitive(cols, legacyLogical)
tgt = _resolveColumnCaseInsensitive(cols, sysLogical)
if not src or not tgt or src == tgt:
continue
try:
with self.connection.cursor() as cursor:
srcType = _pgColumnDataType(cursor, table, src)
tgtType = _pgColumnDataType(cursor, table, tgt)
expr = _legacySourceToSysSqlExpr(src, srcType, tgtType)
tq = _quotePgIdent(table)
tr = _quotePgIdent(tgt)
sr = _quotePgIdent(src)
sql = (
f"UPDATE {tq} SET {tr} = {expr} "
f"WHERE {tr} IS NULL AND {sr} IS NOT NULL"
)
cursor.execute(sql)
n = cursor.rowcount
self.connection.commit()
total += n
except Exception as e:
try:
self.connection.rollback()
except Exception:
pass
logger.debug(
f"migrateLegacyUnderscoreSysColumns skip {self.dbDatabase}.{table} "
f"{src}->{tgt}: {e}"
)
except Exception as e:
logger.error(f"migrateLegacyUnderscoreSysColumns failed on {self.dbDatabase}: {e}")
try:
self.connection.rollback()
except Exception:
pass
if total:
logger.info(
f"migrateLegacyUnderscoreSysColumns: {total} cell(s) in {self.dbDatabase}"
)
return total
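    # Illustration (hypothetical table "Invoice" with text "_createdAt" and
    # double precision "sysCreatedAt"); the loop above would issue roughly:
    #   UPDATE "Invoice" SET "sysCreatedAt" =
    #     CASE WHEN trim("_createdAt"::text) ~ '^[+-]?[0-9]+(\.[0-9]*)?([eE][+-]?[0-9]+)?$'
    #          THEN trim("_createdAt"::text)::double precision
    #          ELSE EXTRACT(EPOCH FROM trim("_createdAt"::text)::timestamptz) END
    #   WHERE "sysCreatedAt" IS NULL AND "_createdAt" IS NOT NULL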
    def _initializeSystemTable(self):
        """Initializes the system table if it doesn't exist yet."""
        try:
@@ -710,6 +844,10 @@ class DatabaseConnector:
logger.error(f"Error loading record {recordId} from table {table}: {e}")
return None
def getRecord(self, model_class: type, recordId: str) -> Optional[Dict[str, Any]]:
"""Load one row by primary key (routes / services; wraps _loadRecord)."""
return self._loadRecord(model_class, str(recordId))
def _saveRecord(
self, model_class: type, recordId: str, record: Dict[str, Any]
) -> bool:
@@ -730,8 +868,9 @@ class DatabaseConnector:
        effective_user_id = self.userId
        currentTime = getUtcTimestamp()
        # Set sysCreatedAt/sysCreatedBy on first persist; always refresh modified fields.
        # Use falsy check: model_dump() always includes sysCreatedAt key (often None).
        if not record.get("sysCreatedAt"):
        # Treat None and 0 as unset (legacy rows / bad defaults); model_dump often has sysCreatedAt=None.
        createdTs = record.get("sysCreatedAt")
        if createdTs is None or createdTs == 0:
            record["sysCreatedAt"] = currentTime
            if effective_user_id:
                record["sysCreatedBy"] = effective_user_id
@@ -1030,6 +1169,9 @@ class DatabaseConnector:
                continue
            colType = fields.get(key, "TEXT")
            logger.debug(f"_buildPaginationClauses: filter key='{key}' val={val!r} type(val)={type(val).__name__} colType={colType}")
            if val is None:
                where_parts.append(f'"{key}" IS NULL')
                continue
            if isinstance(val, dict):
                op = val.get("operator", "equals")
                v = val.get("value", "")

View file

@@ -11,9 +11,9 @@ Multi-Tenant Design:
"""
import logging
from typing import Optional, Dict, Set, Tuple
from typing import Optional, Dict, Tuple
from passlib.context import CryptContext
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.connectors.connectorDbPostgre import DatabaseConnector, _get_cached_connector
from modules.shared.configuration import APP_CONFIG
from modules.datamodels.datamodelUam import (
    Mandate,
@@ -38,119 +38,79 @@ pwdContext = CryptContext(schemes=["argon2"], deprecated="auto")
# Cache for role IDs (roleLabel -> roleId)
_roleIdCache: Dict[str, str] = {}
# Historical PostgreSQL column identifiers (pre-sys*). Used only in _migrateSystemFieldColumns SQL.
_LEGACY_SYS_PAIR_RENAMES: Tuple[Tuple[str, str], ...] = (
    ("_createdAt", "sysCreatedAt"),
    ("_createdBy", "sysCreatedBy"),
    ("_modifiedAt", "sysModifiedAt"),
    ("_modifiedBy", "sysModifiedBy"),
)
# PowerOn logical databases to scan (same set as gateway/scripts/script_db_export_migration.py).
_POWERON_DATABASE_NAMES: Tuple[str, ...] = (
    "poweron_app",
    "poweron_chat",
    "poweron_chatbot",
    "poweron_management",
    "poweron_realestate",
    "poweron_trustee",
    "poweron_automation",
)

def _getPublicTableColumns(db: DatabaseConnector, tableName: str) -> Set[str]:
    """Column names for a quoted PostgreSQL table (exact case in information_schema)."""
    try:
        with db.connection.cursor() as cursor:
            cursor.execute(
                """
                SELECT column_name FROM information_schema.columns
                WHERE table_schema = 'public' AND table_name = %s
                """,
                (tableName,),
            )
            return {row["column_name"] for row in cursor.fetchall()}
    except Exception as e:
        logger.warning(f"_getPublicTableColumns failed for {tableName}: {e}")
        return set()

def _configPrefixForPoweronDatabase(dbName: str) -> str:
    return {
        "poweron_app": "DB_APP",
        "poweron_chat": "DB_CHAT",
        "poweron_chatbot": "DB_CHATBOT",
        "poweron_management": "DB_MANAGEMENT",
        "poweron_realestate": "DB_REALESTATE",
        "poweron_trustee": "DB_TRUSTEE",
        # Same as initAutomationTemplates: default DB_* (not a separate DB_AUTOMATION_* prefix).
        "poweron_automation": "DB",
    }.get(dbName, "DB")

def _openConnectorForPoweronDatabase(dbName: str) -> Optional[DatabaseConnector]:
    """Connect to a named PowerOn database using DB_* / DB_APP_* style config (shared with export script)."""
    prefix = _configPrefixForPoweronDatabase(dbName)
    host = APP_CONFIG.get(f"{prefix}_HOST") or APP_CONFIG.get("DB_HOST", "localhost")
    user = APP_CONFIG.get(f"{prefix}_USER") or APP_CONFIG.get("DB_USER")
    password = APP_CONFIG.get(f"{prefix}_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD_SECRET")
    portRaw = APP_CONFIG.get(f"{prefix}_PORT") or APP_CONFIG.get("DB_PORT", 5432)
    try:
        port = int(portRaw)
    except (TypeError, ValueError):
        port = 5432
    if not user or not password:
        logger.debug(
            f"bootstrap: skip legacy _* -> sys* migration for {dbName} (missing credentials for {prefix})"
        )
        return None
    try:
        return _get_cached_connector(
            dbHost=host,
            dbDatabase=dbName,
            dbUser=user,
            dbPassword=password,
            dbPort=port,
            userId=None,
        )
    except Exception as e:
        logger.warning(f"bootstrap: cannot open {dbName} for legacy _* -> sys* migration: {e}")
        return None
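# Illustration (hypothetical APP_CONFIG): poweron_app would connect to
# dbhost:5433, while poweron_chat falls back to the shared DB_* values because
# no DB_CHAT_* overrides are set:
#   APP_CONFIG = {
#       "DB_HOST": "dbhost", "DB_USER": "poweron",
#       "DB_PASSWORD_SECRET": "***", "DB_PORT": 5432,
#       "DB_APP_PORT": 5433,
#   }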
def _migrateSystemFieldColumns(db: DatabaseConnector) -> None:
    """Backfill sys* from older physical columns and business duplicates where sys* IS NULL (idempotent)."""
    businessFieldMigrations: Dict[str, Dict[str, str]] = {
        "FileFolder": {"createdAt": "sysCreatedAt"},
        "FileItem": {"creationDate": "sysCreatedAt"},
        "Invitation": {"createdAt": "sysCreatedAt", "createdBy": "sysCreatedBy"},
        "FeatureDataSource": {"createdAt": "sysCreatedAt"},
        "DataSource": {"createdAt": "sysCreatedAt"},
        "UserNotification": {"createdAt": "sysCreatedAt"},
        "Token": {"createdAt": "sysCreatedAt"},
        "MessagingSubscription": {"createdBy": "sysCreatedBy", "modifiedBy": "sysModifiedBy"},
        "CoachingContext": {"createdAt": "sysCreatedAt"},
        "CoachingSession": {"createdAt": "sysCreatedAt", "updatedAt": "sysModifiedAt"},
        "CoachingMessage": {"createdAt": "sysCreatedAt"},
        "CoachingTask": {"createdAt": "sysCreatedAt", "updatedAt": "sysModifiedAt"},
        "CoachingScore": {"createdAt": "sysCreatedAt"},
        "CoachingUserProfile": {"createdAt": "sysCreatedAt", "updatedAt": "sysModifiedAt"},
        "CoachingPersona": {"createdAt": "sysCreatedAt", "updatedAt": "sysModifiedAt"},
        "CoachingBadge": {"createdAt": "sysCreatedAt"},
        "TeamsbotSession": {"creationDate": "sysCreatedAt", "lastModified": "sysModifiedAt"},
        "TeamsbotTranscript": {"creationDate": "sysCreatedAt"},
        "TeamsbotBotResponse": {"creationDate": "sysCreatedAt"},
        "TeamsbotSystemBot": {"creationDate": "sysCreatedAt", "lastModified": "sysModifiedAt"},
        "TeamsbotUserAccount": {"creationDate": "sysCreatedAt", "lastModified": "sysModifiedAt"},
        "TeamsbotUserSettings": {"creationDate": "sysCreatedAt", "lastModified": "sysModifiedAt"},
        "_system": {
            k: v
            for k, v in _LEGACY_SYS_PAIR_RENAMES
            if k in ("_createdAt", "_modifiedAt")
        },
    }
    try:
        db._ensure_connection()
        with db.connection.cursor() as cursor:
            cursor.execute(
                """
                SELECT table_name FROM information_schema.tables
                WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
                """
            )
            tableNames = [row["table_name"] for row in cursor.fetchall()]
        totalUpdates = 0
        for table in tableNames:
            cols = _getPublicTableColumns(db, table)
            if not cols:
                continue
            for old_col, new_col in _LEGACY_SYS_PAIR_RENAMES:
                if old_col in cols and new_col in cols:
                    try:
                        with db.connection.cursor() as cursor:
                            cursor.execute(
                                f'UPDATE "{table}" SET "{new_col}" = "{old_col}" '
                                f'WHERE "{new_col}" IS NULL AND "{old_col}" IS NOT NULL'
                            )
                            totalUpdates += cursor.rowcount
                        db.connection.commit()
                    except Exception as e:
                        db.connection.rollback()
                        logger.debug(f"Column migrate skip {table}.{old_col}->{new_col}: {e}")
            biz = businessFieldMigrations.get(table)
            if biz:
                for old_col, new_col in biz.items():
                    if old_col in cols and new_col in cols:
                        try:
                            with db.connection.cursor() as cursor:
                                cursor.execute(
                                    f'UPDATE "{table}" SET "{new_col}" = "{old_col}" '
                                    f'WHERE "{new_col}" IS NULL AND "{old_col}" IS NOT NULL'
                                )
                                totalUpdates += cursor.rowcount
                            db.connection.commit()
                        except Exception as e:
                            db.connection.rollback()
                            logger.debug(f"Business field migrate skip {table}.{old_col}->{new_col}: {e}")
        if totalUpdates:
            logger.info(f"_migrateSystemFieldColumns: backfilled {totalUpdates} cell(s) on {db.dbDatabase}")
    except Exception as e:
        logger.error(f"_migrateSystemFieldColumns failed: {e}")
        try:
            db.connection.rollback()
        except Exception:
            pass

def migrateLegacyUnderscoreSysColumnsAllPoweronDatabases() -> None:
    """
    Run DatabaseConnector.migrateLegacyUnderscoreSysColumns on every configured PowerOn database.
    Actual table scan and SQL live in the connector module.
    """
    grandTotal = 0
    for dbName in _POWERON_DATABASE_NAMES:
        conn = _openConnectorForPoweronDatabase(dbName)
        if not conn:
            continue
        try:
            grandTotal += conn.migrateLegacyUnderscoreSysColumns()
        except Exception as e:
            logger.warning(f"bootstrap: migrateLegacyUnderscoreSysColumns failed for {dbName}: {e}")
    if grandTotal:
        logger.info(
            f"bootstrap: legacy _* -> sys* migration total {grandTotal} cell(s) across PowerOn databases"
        )
def initBootstrap(db: DatabaseConnector) -> None:
@@ -165,8 +125,8 @@ def initBootstrap(db: DatabaseConnector) -> None:
    # Initialize root mandate
    mandateId = initRootMandate(db)
    # Backfill sys* columns from legacy _* / duplicate business fields (idempotent)
    _migrateSystemFieldColumns(db)
    # Copy legacy _createdAt/_createdBy/_modifiedAt/_modifiedBy into sys* on all PowerOn DBs (connector routine)
    migrateLegacyUnderscoreSysColumnsAllPoweronDatabases()
    # Migrate existing mandate records: description -> label
    _migrateMandateDescriptionToLabel(db)

View file

@@ -8,10 +8,81 @@ Handles folder ID resolution and folder name lookups.
import logging
import requests
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, Tuple
logger = logging.getLogger(__name__)
# Microsoft Graph well-known folder path segments (always English in the URL; works for any mailbox UI language).
# See https://learn.microsoft.com/en-us/graph/api/resources/mailfolder
_graphWellKnownSegments = frozenset(
    {
        "inbox",
        "drafts",
        "sentitems",
        "deleteditems",
        "junkemail",
        "outbox",
        "archive",
        "clutter",
        "conflicts",
        "conversationhistory",
        "msgfolderroot",
        "recoverableitemsdeletions",
        "scheduled",
        "searchfolders",
        "syncissues",
    }
)
# Map common user/tool labels (any language) -> Graph well-known segment
_wellKnownAliases: Tuple[Tuple[str, str], ...] = (
    ("inbox", "inbox"),
    ("posteingang", "inbox"),
    ("postfach", "inbox"),
    ("boîte de réception", "inbox"),
    ("boite de reception", "inbox"),
    ("drafts", "drafts"),
    ("draft", "drafts"),
    ("entwürfe", "drafts"),
    ("entwurfe", "drafts"),
    ("brouillons", "drafts"),
    ("brouillon", "drafts"),
    ("sent items", "sentitems"),
    ("sentitems", "sentitems"),
    ("gesendete elemente", "sentitems"),
    ("éléments envoyés", "sentitems"),
    ("elements envoyes", "sentitems"),
    ("deleted items", "deleteditems"),
    ("deleteditems", "deleteditems"),
    ("gelöschte elemente", "deleteditems"),
    ("geloschte elemente", "deleteditems"),
    ("éléments supprimés", "deleteditems"),
    ("junk email", "junkemail"),
    ("junkemail", "junkemail"),
    ("junk-e-mail", "junkemail"),
    ("junk e-mail", "junkemail"),
    ("courrier indésirable", "junkemail"),
    ("outbox", "outbox"),
    ("postausgang", "outbox"),
    ("out box", "outbox"),
    ("archive", "archive"),
    ("archiv", "archive"),
)
def _wellKnownSegmentForName(folderName: str) -> Optional[str]:
    """Return Graph mailFolder segment if folderName is a known default folder alias."""
    if not folderName or not str(folderName).strip():
        return None
    key = str(folderName).strip().lower()
    if key in _graphWellKnownSegments:
        return key
    for alias, segment in _wellKnownAliases:
        if key == alias:
            return segment
    return None
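# Illustration: matching is trim + lowercase over the segment set and aliases:
#   _wellKnownSegmentForName("Posteingang")          # -> "inbox"
#   _wellKnownSegmentForName(" Éléments envoyés ")   # -> "sentitems"
#   _wellKnownSegmentForName("Projects")             # -> None (custom folder)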
class FolderManagementHelper:
    """Helper for folder management operations"""
@@ -43,7 +114,20 @@
"Content-Type": "application/json"
}
# Get mail folders
# Resolve default folders by Graph well-known name (locale-independent; avoids missing "Inbox" on paginated /mailFolders lists)
wk = _wellKnownSegmentForName(folder_name)
if wk:
wk_url = f"{graph_url}/me/mailFolders/{wk}"
wk_resp = requests.get(wk_url, headers=headers)
if wk_resp.status_code == 200:
wid = wk_resp.json().get("id")
if wid:
return wid
logger.debug(
f"Well-known folder '{wk}' lookup failed ({wk_resp.status_code}); falling back to folder list"
)
# Get mail folders (first page only; subfolders / pagination may omit Inbox)
api_url = f"{graph_url}/me/mailFolders"
response = requests.get(api_url, headers=headers)
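        # A minimal sketch of also walking later pages via @odata.nextLink
        # (assumed extension; the commit itself only fetches the first page):
        #   url = f"{graph_url}/me/mailFolders?$top=100"
        #   while url:
        #       page = requests.get(url, headers=headers)
        #       if page.status_code != 200:
        #           break
        #       data = page.json()
        #       for folder in data.get("value", []):
        #           if folder.get("displayName", "").lower() == str(folder_name).lower():
        #               return folder.get("id")
        #       url = data.get("@odata.nextLink")  # absent on the last page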