From 1883f8cd6a633160617bf6736820d932f1822603 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Sat, 28 Mar 2026 21:46:55 +0100
Subject: [PATCH] fixed sys attributes
---
modules/connectors/connectorDbPostgre.py | 148 +++++++++++++-
modules/interfaces/interfaceBootstrap.py | 180 +++++++-----------
.../methodOutlook/helpers/folderManagement.py | 88 ++++++++-
3 files changed, 301 insertions(+), 115 deletions(-)
diff --git a/modules/connectors/connectorDbPostgre.py b/modules/connectors/connectorDbPostgre.py
index e168467b..bf8fce44 100644
--- a/modules/connectors/connectorDbPostgre.py
+++ b/modules/connectors/connectorDbPostgre.py
@@ -5,7 +5,7 @@ import re
import psycopg2
import psycopg2.extras
import logging
-from typing import List, Dict, Any, Optional, Union, get_origin, get_args, Type
+from typing import List, Dict, Any, Optional, Union, get_origin, get_args, Type, Set, Tuple
import uuid
from pydantic import BaseModel, Field
import threading
@@ -158,6 +158,83 @@ def _parseRecordFields(record: Dict[str, Any], fields: Dict[str, str], context:
logger.warning(f"Could not parse JSONB field {fieldName}, keeping as string ({context})")
+# Legacy system columns (underscore-prefixed internal names) -> PowerOn sys* columns.
+_LEGACY_UNDERSCORE_TO_SYS: Tuple[Tuple[str, str], ...] = (
+ ("_createdAt", "sysCreatedAt"),
+ ("_createdBy", "sysCreatedBy"),
+ ("_modifiedAt", "sysModifiedAt"),
+ ("_modifiedBy", "sysModifiedBy"),
+)
+
+
+def _quotePgIdent(name: str) -> str:
+ return '"' + str(name).replace('"', '""') + '"'
+
+
+def _resolveColumnCaseInsensitive(cols: Set[str], logicalName: str) -> Optional[str]:
+ """Match information_schema column_name to logical CamelCase (PG folds unquoted legacy names to lowercase)."""
+ if not logicalName or not cols:
+ return None
+ for c in cols:
+ if c.lower() == logicalName.lower():
+ return c
+ return None
+
+
+def _pgColumnDataType(cursor, tablePg: str, colPg: str) -> Optional[str]:
+ cursor.execute(
+ """
+ SELECT data_type FROM information_schema.columns
+ WHERE table_schema = 'public' AND table_name = %s AND column_name = %s
+ """,
+ (tablePg, colPg),
+ )
+ row = cursor.fetchone()
+ return row["data_type"] if row else None
+
+
+def _legacySourceToSysSqlExpr(srcIdent: str, srcType: Optional[str], tgtType: Optional[str]) -> str:
+ """Build RHS for UPDATE sys* = expr from legacy _* column (handles text/timestamp -> double precision)."""
+ s = _quotePgIdent(srcIdent)
+ sl = (srcType or "").lower()
+ tl = (tgtType or "").lower()
+ if "double" in tl or tl == "real" or tl == "numeric":
+ if any(x in sl for x in ("double precision", "real", "numeric", "integer", "bigint", "smallint")):
+ return f"{s}::double precision"
+ if "timestamp" in sl or sl == "date":
+ return f"EXTRACT(EPOCH FROM {s}::timestamptz)"
+ if "text" in sl or "character" in sl or sl == "uuid":
+ return (
+ f"CASE WHEN trim({s}::text) ~ '^[+-]?[0-9]+(\\.[0-9]*)?([eE][+-]?[0-9]+)?$' "
+ f"THEN trim({s}::text)::double precision "
+ f"ELSE EXTRACT(EPOCH FROM trim({s}::text)::timestamptz) END"
+ )
+ return s
+ return s
+
+
+def _listPublicBaseTableNames(cursor) -> List[str]:
+ cursor.execute(
+ """
+ SELECT table_name FROM information_schema.tables
+ WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
+ ORDER BY table_name
+ """
+ )
+ return [row["table_name"] for row in cursor.fetchall()]
+
+
+def _listTableColumnNames(cursor, tableName: str) -> Set[str]:
+ cursor.execute(
+ """
+ SELECT column_name FROM information_schema.columns
+ WHERE table_schema = 'public' AND table_name = %s
+ """,
+ (tableName,),
+ )
+ return {row["column_name"] for row in cursor.fetchall()}
+
+
# Cache connectors by (host, database, port) to avoid duplicate inits for same database.
# Thread safety: _connector_cache_lock protects cache access. userId is request-scoped via
# contextvars to avoid races when concurrent requests share the same connector.
@@ -374,6 +451,63 @@ class DatabaseConnector:
logger.warning(f"Connection lost, reconnecting: {e}")
self._connect()
+ def migrateLegacyUnderscoreSysColumns(self) -> int:
+ """
+ Scan all public base tables on this connection's database. Where both a legacy
+ _createdAt / _createdBy / _modifiedAt / _modifiedBy column (any case) and the
+ matching sys* column exist, copy into sys* rows where sys* IS NULL and legacy IS NOT NULL.
+ Idempotent; safe to run on every bootstrap.
+ """
+ self._ensure_connection()
+ total = 0
+ try:
+ with self.connection.cursor() as cursor:
+ tableNames = _listPublicBaseTableNames(cursor)
+ for table in tableNames:
+ with self.connection.cursor() as cursor:
+ cols = _listTableColumnNames(cursor, table)
+ for legacyLogical, sysLogical in _LEGACY_UNDERSCORE_TO_SYS:
+ src = _resolveColumnCaseInsensitive(cols, legacyLogical)
+ tgt = _resolveColumnCaseInsensitive(cols, sysLogical)
+ if not src or not tgt or src == tgt:
+ continue
+ try:
+ with self.connection.cursor() as cursor:
+ srcType = _pgColumnDataType(cursor, table, src)
+ tgtType = _pgColumnDataType(cursor, table, tgt)
+ expr = _legacySourceToSysSqlExpr(src, srcType, tgtType)
+ tq = _quotePgIdent(table)
+ tr = _quotePgIdent(tgt)
+ sr = _quotePgIdent(src)
+ sql = (
+ f"UPDATE {tq} SET {tr} = {expr} "
+ f"WHERE {tr} IS NULL AND {sr} IS NOT NULL"
+ )
+ cursor.execute(sql)
+ n = cursor.rowcount
+ self.connection.commit()
+ total += n
+ except Exception as e:
+ try:
+ self.connection.rollback()
+ except Exception:
+ pass
+ logger.debug(
+ f"migrateLegacyUnderscoreSysColumns skip {self.dbDatabase}.{table} "
+ f"{src}->{tgt}: {e}"
+ )
+ except Exception as e:
+ logger.error(f"migrateLegacyUnderscoreSysColumns failed on {self.dbDatabase}: {e}")
+ try:
+ self.connection.rollback()
+ except Exception:
+ pass
+ if total:
+ logger.info(
+ f"migrateLegacyUnderscoreSysColumns: {total} cell(s) in {self.dbDatabase}"
+ )
+ return total
+
def _initializeSystemTable(self):
"""Initializes the system table if it doesn't exist yet."""
try:
@@ -710,6 +844,10 @@ class DatabaseConnector:
logger.error(f"Error loading record {recordId} from table {table}: {e}")
return None
+ def getRecord(self, model_class: type, recordId: str) -> Optional[Dict[str, Any]]:
+ """Load one row by primary key (routes / services; wraps _loadRecord)."""
+ return self._loadRecord(model_class, str(recordId))
+
def _saveRecord(
self, model_class: type, recordId: str, record: Dict[str, Any]
) -> bool:
@@ -730,8 +868,9 @@ class DatabaseConnector:
effective_user_id = self.userId
currentTime = getUtcTimestamp()
# Set sysCreatedAt/sysCreatedBy on first persist; always refresh modified fields.
- # Use falsy check: model_dump() always includes sysCreatedAt key (often None).
- if not record.get("sysCreatedAt"):
+ # Treat None and 0 as unset (legacy rows / bad defaults); model_dump often has sysCreatedAt=None.
+ createdTs = record.get("sysCreatedAt")
+ if createdTs is None or createdTs == 0 or createdTs == 0.0:
record["sysCreatedAt"] = currentTime
if effective_user_id:
record["sysCreatedBy"] = effective_user_id
@@ -1030,6 +1169,9 @@ class DatabaseConnector:
continue
colType = fields.get(key, "TEXT")
logger.debug(f"_buildPaginationClauses: filter key='{key}' val={val!r} type(val)={type(val).__name__} colType={colType}")
+ if val is None:
+ where_parts.append(f'"{key}" IS NULL')
+ continue
if isinstance(val, dict):
op = val.get("operator", "equals")
v = val.get("value", "")
diff --git a/modules/interfaces/interfaceBootstrap.py b/modules/interfaces/interfaceBootstrap.py
index 0fb48ffe..98b70466 100644
--- a/modules/interfaces/interfaceBootstrap.py
+++ b/modules/interfaces/interfaceBootstrap.py
@@ -11,9 +11,9 @@ Multi-Tenant Design:
"""
import logging
-from typing import Optional, Dict, Set, Tuple
+from typing import Optional, Dict, Tuple
from passlib.context import CryptContext
-from modules.connectors.connectorDbPostgre import DatabaseConnector
+from modules.connectors.connectorDbPostgre import DatabaseConnector, _get_cached_connector
from modules.shared.configuration import APP_CONFIG
from modules.datamodels.datamodelUam import (
Mandate,
@@ -38,119 +38,79 @@ pwdContext = CryptContext(schemes=["argon2"], deprecated="auto")
# Cache für Role-IDs (roleLabel -> roleId)
_roleIdCache: Dict[str, str] = {}
-# Historical PostgreSQL column identifiers (pre-sys*). Used only in _migrateSystemFieldColumns SQL.
-_LEGACY_SYS_PAIR_RENAMES: Tuple[Tuple[str, str], ...] = (
- ("_createdAt", "sysCreatedAt"),
- ("_createdBy", "sysCreatedBy"),
- ("_modifiedAt", "sysModifiedAt"),
- ("_modifiedBy", "sysModifiedBy"),
+# PowerOn logical databases to scan (same set as gateway/scripts/script_db_export_migration.py).
+_POWERON_DATABASE_NAMES: Tuple[str, ...] = (
+ "poweron_app",
+ "poweron_chat",
+ "poweron_chatbot",
+ "poweron_management",
+ "poweron_realestate",
+ "poweron_trustee",
+ "poweron_automation",
)
-def _getPublicTableColumns(db: DatabaseConnector, tableName: str) -> Set[str]:
- """Column names for a quoted PostgreSQL table (exact case in information_schema)."""
+def _configPrefixForPoweronDatabase(dbName: str) -> str:
+ return {
+ "poweron_app": "DB_APP",
+ "poweron_chat": "DB_CHAT",
+ "poweron_chatbot": "DB_CHATBOT",
+ "poweron_management": "DB_MANAGEMENT",
+ "poweron_realestate": "DB_REALESTATE",
+ "poweron_trustee": "DB_TRUSTEE",
+ # Same as initAutomationTemplates: default DB_* (not a separate DB_AUTOMATION_* prefix).
+ "poweron_automation": "DB",
+ }.get(dbName, "DB")
+
+
+def _openConnectorForPoweronDatabase(dbName: str) -> Optional[DatabaseConnector]:
+ """Connect to a named PowerOn database using DB_* / DB_APP_* style config (shared with export script)."""
+ prefix = _configPrefixForPoweronDatabase(dbName)
+ host = APP_CONFIG.get(f"{prefix}_HOST") or APP_CONFIG.get("DB_HOST", "localhost")
+ user = APP_CONFIG.get(f"{prefix}_USER") or APP_CONFIG.get("DB_USER")
+ password = APP_CONFIG.get(f"{prefix}_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD_SECRET")
+ portRaw = APP_CONFIG.get(f"{prefix}_PORT") or APP_CONFIG.get("DB_PORT", 5432)
try:
- with db.connection.cursor() as cursor:
- cursor.execute(
- """
- SELECT column_name FROM information_schema.columns
- WHERE table_schema = 'public' AND table_name = %s
- """,
- (tableName,),
- )
- return {row["column_name"] for row in cursor.fetchall()}
- except Exception as e:
- logger.warning(f"_getPublicTableColumns failed for {tableName}: {e}")
- return set()
-
-
-def _migrateSystemFieldColumns(db: DatabaseConnector) -> None:
- """Backfill sys* from older physical columns and business duplicates where sys* IS NULL (idempotent)."""
- businessFieldMigrations: Dict[str, Dict[str, str]] = {
- "FileFolder": {"createdAt": "sysCreatedAt"},
- "FileItem": {"creationDate": "sysCreatedAt"},
- "Invitation": {"createdAt": "sysCreatedAt", "createdBy": "sysCreatedBy"},
- "FeatureDataSource": {"createdAt": "sysCreatedAt"},
- "DataSource": {"createdAt": "sysCreatedAt"},
- "UserNotification": {"createdAt": "sysCreatedAt"},
- "Token": {"createdAt": "sysCreatedAt"},
- "MessagingSubscription": {"createdBy": "sysCreatedBy", "modifiedBy": "sysModifiedBy"},
- "CoachingContext": {"createdAt": "sysCreatedAt"},
- "CoachingSession": {"createdAt": "sysCreatedAt", "updatedAt": "sysModifiedAt"},
- "CoachingMessage": {"createdAt": "sysCreatedAt"},
- "CoachingTask": {"createdAt": "sysCreatedAt", "updatedAt": "sysModifiedAt"},
- "CoachingScore": {"createdAt": "sysCreatedAt"},
- "CoachingUserProfile": {"createdAt": "sysCreatedAt", "updatedAt": "sysModifiedAt"},
- "CoachingPersona": {"createdAt": "sysCreatedAt", "updatedAt": "sysModifiedAt"},
- "CoachingBadge": {"createdAt": "sysCreatedAt"},
- "TeamsbotSession": {"creationDate": "sysCreatedAt", "lastModified": "sysModifiedAt"},
- "TeamsbotTranscript": {"creationDate": "sysCreatedAt"},
- "TeamsbotBotResponse": {"creationDate": "sysCreatedAt"},
- "TeamsbotSystemBot": {"creationDate": "sysCreatedAt", "lastModified": "sysModifiedAt"},
- "TeamsbotUserAccount": {"creationDate": "sysCreatedAt", "lastModified": "sysModifiedAt"},
- "TeamsbotUserSettings": {"creationDate": "sysCreatedAt", "lastModified": "sysModifiedAt"},
- "_system": {
- k: v
- for k, v in _LEGACY_SYS_PAIR_RENAMES
- if k in ("_createdAt", "_modifiedAt")
- },
- }
-
+ port = int(portRaw)
+ except (TypeError, ValueError):
+ port = 5432
+ if not user or not password:
+ logger.debug(
+ f"bootstrap: skip legacy _* -> sys* migration for {dbName} (missing credentials for {prefix})"
+ )
+ return None
try:
- db._ensure_connection()
- with db.connection.cursor() as cursor:
- cursor.execute(
- """
- SELECT table_name FROM information_schema.tables
- WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
- """
- )
- tableNames = [row["table_name"] for row in cursor.fetchall()]
-
- totalUpdates = 0
- for table in tableNames:
- cols = _getPublicTableColumns(db, table)
- if not cols:
- continue
-
- for old_col, new_col in _LEGACY_SYS_PAIR_RENAMES:
- if old_col in cols and new_col in cols:
- try:
- with db.connection.cursor() as cursor:
- cursor.execute(
- f'UPDATE "{table}" SET "{new_col}" = "{old_col}" '
- f'WHERE "{new_col}" IS NULL AND "{old_col}" IS NOT NULL'
- )
- totalUpdates += cursor.rowcount
- db.connection.commit()
- except Exception as e:
- db.connection.rollback()
- logger.debug(f"Column migrate skip {table}.{old_col}->{new_col}: {e}")
-
- biz = businessFieldMigrations.get(table)
- if biz:
- for old_col, new_col in biz.items():
- if old_col in cols and new_col in cols:
- try:
- with db.connection.cursor() as cursor:
- cursor.execute(
- f'UPDATE "{table}" SET "{new_col}" = "{old_col}" '
- f'WHERE "{new_col}" IS NULL AND "{old_col}" IS NOT NULL'
- )
- totalUpdates += cursor.rowcount
- db.connection.commit()
- except Exception as e:
- db.connection.rollback()
- logger.debug(f"Business field migrate skip {table}.{old_col}->{new_col}: {e}")
-
- if totalUpdates:
- logger.info(f"_migrateSystemFieldColumns: backfilled {totalUpdates} cell(s) on {db.dbDatabase}")
+ return _get_cached_connector(
+ dbHost=host,
+ dbDatabase=dbName,
+ dbUser=user,
+ dbPassword=password,
+ dbPort=port,
+ userId=None,
+ )
except Exception as e:
- logger.error(f"_migrateSystemFieldColumns failed: {e}")
+ logger.warning(f"bootstrap: cannot open {dbName} for legacy _* -> sys* migration: {e}")
+ return None
+
+
+def migrateLegacyUnderscoreSysColumnsAllPoweronDatabases() -> None:
+ """
+ Run DatabaseConnector.migrateLegacyUnderscoreSysColumns on every configured PowerOn database.
+ Actual table scan and SQL live in the connector module.
+ """
+ grandTotal = 0
+ for dbName in _POWERON_DATABASE_NAMES:
+ conn = _openConnectorForPoweronDatabase(dbName)
+ if not conn:
+ continue
try:
- db.connection.rollback()
- except Exception:
- pass
+ grandTotal += conn.migrateLegacyUnderscoreSysColumns()
+ except Exception as e:
+ logger.warning(f"bootstrap: migrateLegacyUnderscoreSysColumns failed for {dbName}: {e}")
+ if grandTotal:
+ logger.info(
+ f"bootstrap: legacy _* -> sys* migration total {grandTotal} cell(s) across PowerOn databases"
+ )
def initBootstrap(db: DatabaseConnector) -> None:
@@ -165,8 +125,8 @@ def initBootstrap(db: DatabaseConnector) -> None:
# Initialize root mandate
mandateId = initRootMandate(db)
- # Backfill sys* columns from legacy _* / duplicate business fields (idempotent)
- _migrateSystemFieldColumns(db)
+ # Copy legacy _createdAt/_createdBy/_modifiedAt/_modifiedBy into sys* on all PowerOn DBs (connector routine)
+ migrateLegacyUnderscoreSysColumnsAllPoweronDatabases()
# Migrate existing mandate records: description -> label
_migrateMandateDescriptionToLabel(db)
diff --git a/modules/workflows/methods/methodOutlook/helpers/folderManagement.py b/modules/workflows/methods/methodOutlook/helpers/folderManagement.py
index 47309a8b..2bbb8195 100644
--- a/modules/workflows/methods/methodOutlook/helpers/folderManagement.py
+++ b/modules/workflows/methods/methodOutlook/helpers/folderManagement.py
@@ -8,10 +8,81 @@ Handles folder ID resolution and folder name lookups.
import logging
import requests
-from typing import Dict, Any, Optional
+from typing import Dict, Any, Optional, Tuple
logger = logging.getLogger(__name__)
+# Microsoft Graph well-known folder path segments (always English in the URL; works for any mailbox UI language).
+# See https://learn.microsoft.com/en-us/graph/api/resources/mailfolder
+_graphWellKnownSegments = frozenset(
+ {
+ "inbox",
+ "drafts",
+ "sentitems",
+ "deleteditems",
+ "junkemail",
+ "outbox",
+ "archive",
+ "clutter",
+ "conflicts",
+ "conversationhistory",
+ "msgfolderroot",
+ "recoverableitemsdeletions",
+ "scheduled",
+ "searchfolders",
+ "syncissues",
+ }
+)
+
+# Map common user/tool labels (any language) -> Graph well-known segment
+_wellKnownAliases: Tuple[Tuple[str, str], ...] = (
+ ("inbox", "inbox"),
+ ("posteingang", "inbox"),
+ ("postfach", "inbox"),
+ ("boîte de réception", "inbox"),
+ ("boite de reception", "inbox"),
+ ("drafts", "drafts"),
+ ("draft", "drafts"),
+ ("entwürfe", "drafts"),
+ ("entwurfe", "drafts"),
+ ("brouillons", "drafts"),
+ ("brouillon", "drafts"),
+ ("sent items", "sentitems"),
+ ("sentitems", "sentitems"),
+ ("gesendete elemente", "sentitems"),
+ ("éléments envoyés", "sentitems"),
+ ("elements envoyes", "sentitems"),
+ ("deleted items", "deleteditems"),
+ ("deleteditems", "deleteditems"),
+ ("gelöschte elemente", "deleteditems"),
+ ("geloschte elemente", "deleteditems"),
+ ("éléments supprimés", "deleteditems"),
+ ("junk email", "junkemail"),
+ ("junkemail", "junkemail"),
+ ("junk-e-mail", "junkemail"),
+ ("junk e-mail", "junkemail"),
+ ("courrier indésirable", "junkemail"),
+ ("outbox", "outbox"),
+ ("postausgang", "outbox"),
+ ("out box", "outbox"),
+ ("archive", "archive"),
+ ("archiv", "archive"),
+)
+
+
+def _wellKnownSegmentForName(folderName: str) -> Optional[str]:
+ """Return Graph mailFolder segment if folderName is a known default folder alias."""
+ if not folderName or not str(folderName).strip():
+ return None
+ key = str(folderName).strip().lower()
+ if key in _graphWellKnownSegments:
+ return key
+ for alias, segment in _wellKnownAliases:
+ if key == alias:
+ return segment
+ return None
+
+
class FolderManagementHelper:
"""Helper for folder management operations"""
@@ -42,8 +113,21 @@ class FolderManagementHelper:
"Authorization": f"Bearer {connection['accessToken']}",
"Content-Type": "application/json"
}
+
+ # Resolve default folders by Graph well-known name (locale-independent; avoids missing "Inbox" on paginated /mailFolders lists)
+ wk = _wellKnownSegmentForName(folder_name)
+ if wk:
+ wk_url = f"{graph_url}/me/mailFolders/{wk}"
+ wk_resp = requests.get(wk_url, headers=headers)
+ if wk_resp.status_code == 200:
+ wid = wk_resp.json().get("id")
+ if wid:
+ return wid
+ logger.debug(
+ f"Well-known folder '{wk}' lookup failed ({wk_resp.status_code}); falling back to folder list"
+ )
- # Get mail folders
+ # Get mail folders (first page only; subfolders / pagination may omit Inbox)
api_url = f"{graph_url}/me/mailFolders"
response = requests.get(api_url, headers=headers)