diff --git a/modules/connectors/connectorDbPostgre.py b/modules/connectors/connectorDbPostgre.py index e168467b..bf8fce44 100644 --- a/modules/connectors/connectorDbPostgre.py +++ b/modules/connectors/connectorDbPostgre.py @@ -5,7 +5,7 @@ import re import psycopg2 import psycopg2.extras import logging -from typing import List, Dict, Any, Optional, Union, get_origin, get_args, Type +from typing import List, Dict, Any, Optional, Union, get_origin, get_args, Type, Set, Tuple import uuid from pydantic import BaseModel, Field import threading @@ -158,6 +158,83 @@ def _parseRecordFields(record: Dict[str, Any], fields: Dict[str, str], context: logger.warning(f"Could not parse JSONB field {fieldName}, keeping as string ({context})") +# Legacy system columns (underscore-prefixed internal names) -> PowerOn sys* columns. +_LEGACY_UNDERSCORE_TO_SYS: Tuple[Tuple[str, str], ...] = ( + ("_createdAt", "sysCreatedAt"), + ("_createdBy", "sysCreatedBy"), + ("_modifiedAt", "sysModifiedAt"), + ("_modifiedBy", "sysModifiedBy"), +) + + +def _quotePgIdent(name: str) -> str: + return '"' + str(name).replace('"', '""') + '"' + + +def _resolveColumnCaseInsensitive(cols: Set[str], logicalName: str) -> Optional[str]: + """Match information_schema column_name to logical CamelCase (PG folds unquoted legacy names to lowercase).""" + if not logicalName or not cols: + return None + for c in cols: + if c.lower() == logicalName.lower(): + return c + return None + + +def _pgColumnDataType(cursor, tablePg: str, colPg: str) -> Optional[str]: + cursor.execute( + """ + SELECT data_type FROM information_schema.columns + WHERE table_schema = 'public' AND table_name = %s AND column_name = %s + """, + (tablePg, colPg), + ) + row = cursor.fetchone() + return row["data_type"] if row else None + + +def _legacySourceToSysSqlExpr(srcIdent: str, srcType: Optional[str], tgtType: Optional[str]) -> str: + """Build RHS for UPDATE sys* = expr from legacy _* column (handles text/timestamp -> double precision).""" + s = _quotePgIdent(srcIdent) + sl = (srcType or "").lower() + tl = (tgtType or "").lower() + if "double" in tl or tl == "real" or tl == "numeric": + if any(x in sl for x in ("double precision", "real", "numeric", "integer", "bigint", "smallint")): + return f"{s}::double precision" + if "timestamp" in sl or sl == "date": + return f"EXTRACT(EPOCH FROM {s}::timestamptz)" + if "text" in sl or "character" in sl or sl == "uuid": + return ( + f"CASE WHEN trim({s}::text) ~ '^[+-]?[0-9]+(\\.[0-9]*)?([eE][+-]?[0-9]+)?$' " + f"THEN trim({s}::text)::double precision " + f"ELSE EXTRACT(EPOCH FROM trim({s}::text)::timestamptz) END" + ) + return s + return s + + +def _listPublicBaseTableNames(cursor) -> List[str]: + cursor.execute( + """ + SELECT table_name FROM information_schema.tables + WHERE table_schema = 'public' AND table_type = 'BASE TABLE' + ORDER BY table_name + """ + ) + return [row["table_name"] for row in cursor.fetchall()] + + +def _listTableColumnNames(cursor, tableName: str) -> Set[str]: + cursor.execute( + """ + SELECT column_name FROM information_schema.columns + WHERE table_schema = 'public' AND table_name = %s + """, + (tableName,), + ) + return {row["column_name"] for row in cursor.fetchall()} + + # Cache connectors by (host, database, port) to avoid duplicate inits for same database. # Thread safety: _connector_cache_lock protects cache access. userId is request-scoped via # contextvars to avoid races when concurrent requests share the same connector. @@ -374,6 +451,63 @@ class DatabaseConnector: logger.warning(f"Connection lost, reconnecting: {e}") self._connect() + def migrateLegacyUnderscoreSysColumns(self) -> int: + """ + Scan all public base tables on this connection's database. Where both a legacy + _createdAt / _createdBy / _modifiedAt / _modifiedBy column (any case) and the + matching sys* column exist, copy into sys* rows where sys* IS NULL and legacy IS NOT NULL. + Idempotent; safe to run on every bootstrap. + """ + self._ensure_connection() + total = 0 + try: + with self.connection.cursor() as cursor: + tableNames = _listPublicBaseTableNames(cursor) + for table in tableNames: + with self.connection.cursor() as cursor: + cols = _listTableColumnNames(cursor, table) + for legacyLogical, sysLogical in _LEGACY_UNDERSCORE_TO_SYS: + src = _resolveColumnCaseInsensitive(cols, legacyLogical) + tgt = _resolveColumnCaseInsensitive(cols, sysLogical) + if not src or not tgt or src == tgt: + continue + try: + with self.connection.cursor() as cursor: + srcType = _pgColumnDataType(cursor, table, src) + tgtType = _pgColumnDataType(cursor, table, tgt) + expr = _legacySourceToSysSqlExpr(src, srcType, tgtType) + tq = _quotePgIdent(table) + tr = _quotePgIdent(tgt) + sr = _quotePgIdent(src) + sql = ( + f"UPDATE {tq} SET {tr} = {expr} " + f"WHERE {tr} IS NULL AND {sr} IS NOT NULL" + ) + cursor.execute(sql) + n = cursor.rowcount + self.connection.commit() + total += n + except Exception as e: + try: + self.connection.rollback() + except Exception: + pass + logger.debug( + f"migrateLegacyUnderscoreSysColumns skip {self.dbDatabase}.{table} " + f"{src}->{tgt}: {e}" + ) + except Exception as e: + logger.error(f"migrateLegacyUnderscoreSysColumns failed on {self.dbDatabase}: {e}") + try: + self.connection.rollback() + except Exception: + pass + if total: + logger.info( + f"migrateLegacyUnderscoreSysColumns: {total} cell(s) in {self.dbDatabase}" + ) + return total + def _initializeSystemTable(self): """Initializes the system table if it doesn't exist yet.""" try: @@ -710,6 +844,10 @@ class DatabaseConnector: logger.error(f"Error loading record {recordId} from table {table}: {e}") return None + def getRecord(self, model_class: type, recordId: str) -> Optional[Dict[str, Any]]: + """Load one row by primary key (routes / services; wraps _loadRecord).""" + return self._loadRecord(model_class, str(recordId)) + def _saveRecord( self, model_class: type, recordId: str, record: Dict[str, Any] ) -> bool: @@ -730,8 +868,9 @@ class DatabaseConnector: effective_user_id = self.userId currentTime = getUtcTimestamp() # Set sysCreatedAt/sysCreatedBy on first persist; always refresh modified fields. - # Use falsy check: model_dump() always includes sysCreatedAt key (often None). - if not record.get("sysCreatedAt"): + # Treat None and 0 as unset (legacy rows / bad defaults); model_dump often has sysCreatedAt=None. + createdTs = record.get("sysCreatedAt") + if createdTs is None or createdTs == 0 or createdTs == 0.0: record["sysCreatedAt"] = currentTime if effective_user_id: record["sysCreatedBy"] = effective_user_id @@ -1030,6 +1169,9 @@ class DatabaseConnector: continue colType = fields.get(key, "TEXT") logger.debug(f"_buildPaginationClauses: filter key='{key}' val={val!r} type(val)={type(val).__name__} colType={colType}") + if val is None: + where_parts.append(f'"{key}" IS NULL') + continue if isinstance(val, dict): op = val.get("operator", "equals") v = val.get("value", "") diff --git a/modules/interfaces/interfaceBootstrap.py b/modules/interfaces/interfaceBootstrap.py index 0fb48ffe..98b70466 100644 --- a/modules/interfaces/interfaceBootstrap.py +++ b/modules/interfaces/interfaceBootstrap.py @@ -11,9 +11,9 @@ Multi-Tenant Design: """ import logging -from typing import Optional, Dict, Set, Tuple +from typing import Optional, Dict, Tuple from passlib.context import CryptContext -from modules.connectors.connectorDbPostgre import DatabaseConnector +from modules.connectors.connectorDbPostgre import DatabaseConnector, _get_cached_connector from modules.shared.configuration import APP_CONFIG from modules.datamodels.datamodelUam import ( Mandate, @@ -38,119 +38,79 @@ pwdContext = CryptContext(schemes=["argon2"], deprecated="auto") # Cache für Role-IDs (roleLabel -> roleId) _roleIdCache: Dict[str, str] = {} -# Historical PostgreSQL column identifiers (pre-sys*). Used only in _migrateSystemFieldColumns SQL. -_LEGACY_SYS_PAIR_RENAMES: Tuple[Tuple[str, str], ...] = ( - ("_createdAt", "sysCreatedAt"), - ("_createdBy", "sysCreatedBy"), - ("_modifiedAt", "sysModifiedAt"), - ("_modifiedBy", "sysModifiedBy"), +# PowerOn logical databases to scan (same set as gateway/scripts/script_db_export_migration.py). +_POWERON_DATABASE_NAMES: Tuple[str, ...] = ( + "poweron_app", + "poweron_chat", + "poweron_chatbot", + "poweron_management", + "poweron_realestate", + "poweron_trustee", + "poweron_automation", ) -def _getPublicTableColumns(db: DatabaseConnector, tableName: str) -> Set[str]: - """Column names for a quoted PostgreSQL table (exact case in information_schema).""" +def _configPrefixForPoweronDatabase(dbName: str) -> str: + return { + "poweron_app": "DB_APP", + "poweron_chat": "DB_CHAT", + "poweron_chatbot": "DB_CHATBOT", + "poweron_management": "DB_MANAGEMENT", + "poweron_realestate": "DB_REALESTATE", + "poweron_trustee": "DB_TRUSTEE", + # Same as initAutomationTemplates: default DB_* (not a separate DB_AUTOMATION_* prefix). + "poweron_automation": "DB", + }.get(dbName, "DB") + + +def _openConnectorForPoweronDatabase(dbName: str) -> Optional[DatabaseConnector]: + """Connect to a named PowerOn database using DB_* / DB_APP_* style config (shared with export script).""" + prefix = _configPrefixForPoweronDatabase(dbName) + host = APP_CONFIG.get(f"{prefix}_HOST") or APP_CONFIG.get("DB_HOST", "localhost") + user = APP_CONFIG.get(f"{prefix}_USER") or APP_CONFIG.get("DB_USER") + password = APP_CONFIG.get(f"{prefix}_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD_SECRET") + portRaw = APP_CONFIG.get(f"{prefix}_PORT") or APP_CONFIG.get("DB_PORT", 5432) try: - with db.connection.cursor() as cursor: - cursor.execute( - """ - SELECT column_name FROM information_schema.columns - WHERE table_schema = 'public' AND table_name = %s - """, - (tableName,), - ) - return {row["column_name"] for row in cursor.fetchall()} - except Exception as e: - logger.warning(f"_getPublicTableColumns failed for {tableName}: {e}") - return set() - - -def _migrateSystemFieldColumns(db: DatabaseConnector) -> None: - """Backfill sys* from older physical columns and business duplicates where sys* IS NULL (idempotent).""" - businessFieldMigrations: Dict[str, Dict[str, str]] = { - "FileFolder": {"createdAt": "sysCreatedAt"}, - "FileItem": {"creationDate": "sysCreatedAt"}, - "Invitation": {"createdAt": "sysCreatedAt", "createdBy": "sysCreatedBy"}, - "FeatureDataSource": {"createdAt": "sysCreatedAt"}, - "DataSource": {"createdAt": "sysCreatedAt"}, - "UserNotification": {"createdAt": "sysCreatedAt"}, - "Token": {"createdAt": "sysCreatedAt"}, - "MessagingSubscription": {"createdBy": "sysCreatedBy", "modifiedBy": "sysModifiedBy"}, - "CoachingContext": {"createdAt": "sysCreatedAt"}, - "CoachingSession": {"createdAt": "sysCreatedAt", "updatedAt": "sysModifiedAt"}, - "CoachingMessage": {"createdAt": "sysCreatedAt"}, - "CoachingTask": {"createdAt": "sysCreatedAt", "updatedAt": "sysModifiedAt"}, - "CoachingScore": {"createdAt": "sysCreatedAt"}, - "CoachingUserProfile": {"createdAt": "sysCreatedAt", "updatedAt": "sysModifiedAt"}, - "CoachingPersona": {"createdAt": "sysCreatedAt", "updatedAt": "sysModifiedAt"}, - "CoachingBadge": {"createdAt": "sysCreatedAt"}, - "TeamsbotSession": {"creationDate": "sysCreatedAt", "lastModified": "sysModifiedAt"}, - "TeamsbotTranscript": {"creationDate": "sysCreatedAt"}, - "TeamsbotBotResponse": {"creationDate": "sysCreatedAt"}, - "TeamsbotSystemBot": {"creationDate": "sysCreatedAt", "lastModified": "sysModifiedAt"}, - "TeamsbotUserAccount": {"creationDate": "sysCreatedAt", "lastModified": "sysModifiedAt"}, - "TeamsbotUserSettings": {"creationDate": "sysCreatedAt", "lastModified": "sysModifiedAt"}, - "_system": { - k: v - for k, v in _LEGACY_SYS_PAIR_RENAMES - if k in ("_createdAt", "_modifiedAt") - }, - } - + port = int(portRaw) + except (TypeError, ValueError): + port = 5432 + if not user or not password: + logger.debug( + f"bootstrap: skip legacy _* -> sys* migration for {dbName} (missing credentials for {prefix})" + ) + return None try: - db._ensure_connection() - with db.connection.cursor() as cursor: - cursor.execute( - """ - SELECT table_name FROM information_schema.tables - WHERE table_schema = 'public' AND table_type = 'BASE TABLE' - """ - ) - tableNames = [row["table_name"] for row in cursor.fetchall()] - - totalUpdates = 0 - for table in tableNames: - cols = _getPublicTableColumns(db, table) - if not cols: - continue - - for old_col, new_col in _LEGACY_SYS_PAIR_RENAMES: - if old_col in cols and new_col in cols: - try: - with db.connection.cursor() as cursor: - cursor.execute( - f'UPDATE "{table}" SET "{new_col}" = "{old_col}" ' - f'WHERE "{new_col}" IS NULL AND "{old_col}" IS NOT NULL' - ) - totalUpdates += cursor.rowcount - db.connection.commit() - except Exception as e: - db.connection.rollback() - logger.debug(f"Column migrate skip {table}.{old_col}->{new_col}: {e}") - - biz = businessFieldMigrations.get(table) - if biz: - for old_col, new_col in biz.items(): - if old_col in cols and new_col in cols: - try: - with db.connection.cursor() as cursor: - cursor.execute( - f'UPDATE "{table}" SET "{new_col}" = "{old_col}" ' - f'WHERE "{new_col}" IS NULL AND "{old_col}" IS NOT NULL' - ) - totalUpdates += cursor.rowcount - db.connection.commit() - except Exception as e: - db.connection.rollback() - logger.debug(f"Business field migrate skip {table}.{old_col}->{new_col}: {e}") - - if totalUpdates: - logger.info(f"_migrateSystemFieldColumns: backfilled {totalUpdates} cell(s) on {db.dbDatabase}") + return _get_cached_connector( + dbHost=host, + dbDatabase=dbName, + dbUser=user, + dbPassword=password, + dbPort=port, + userId=None, + ) except Exception as e: - logger.error(f"_migrateSystemFieldColumns failed: {e}") + logger.warning(f"bootstrap: cannot open {dbName} for legacy _* -> sys* migration: {e}") + return None + + +def migrateLegacyUnderscoreSysColumnsAllPoweronDatabases() -> None: + """ + Run DatabaseConnector.migrateLegacyUnderscoreSysColumns on every configured PowerOn database. + Actual table scan and SQL live in the connector module. + """ + grandTotal = 0 + for dbName in _POWERON_DATABASE_NAMES: + conn = _openConnectorForPoweronDatabase(dbName) + if not conn: + continue try: - db.connection.rollback() - except Exception: - pass + grandTotal += conn.migrateLegacyUnderscoreSysColumns() + except Exception as e: + logger.warning(f"bootstrap: migrateLegacyUnderscoreSysColumns failed for {dbName}: {e}") + if grandTotal: + logger.info( + f"bootstrap: legacy _* -> sys* migration total {grandTotal} cell(s) across PowerOn databases" + ) def initBootstrap(db: DatabaseConnector) -> None: @@ -165,8 +125,8 @@ def initBootstrap(db: DatabaseConnector) -> None: # Initialize root mandate mandateId = initRootMandate(db) - # Backfill sys* columns from legacy _* / duplicate business fields (idempotent) - _migrateSystemFieldColumns(db) + # Copy legacy _createdAt/_createdBy/_modifiedAt/_modifiedBy into sys* on all PowerOn DBs (connector routine) + migrateLegacyUnderscoreSysColumnsAllPoweronDatabases() # Migrate existing mandate records: description -> label _migrateMandateDescriptionToLabel(db) diff --git a/modules/workflows/methods/methodOutlook/helpers/folderManagement.py b/modules/workflows/methods/methodOutlook/helpers/folderManagement.py index 47309a8b..2bbb8195 100644 --- a/modules/workflows/methods/methodOutlook/helpers/folderManagement.py +++ b/modules/workflows/methods/methodOutlook/helpers/folderManagement.py @@ -8,10 +8,81 @@ Handles folder ID resolution and folder name lookups. import logging import requests -from typing import Dict, Any, Optional +from typing import Dict, Any, Optional, Tuple logger = logging.getLogger(__name__) +# Microsoft Graph well-known folder path segments (always English in the URL; works for any mailbox UI language). +# See https://learn.microsoft.com/en-us/graph/api/resources/mailfolder +_graphWellKnownSegments = frozenset( + { + "inbox", + "drafts", + "sentitems", + "deleteditems", + "junkemail", + "outbox", + "archive", + "clutter", + "conflicts", + "conversationhistory", + "msgfolderroot", + "recoverableitemsdeletions", + "scheduled", + "searchfolders", + "syncissues", + } +) + +# Map common user/tool labels (any language) -> Graph well-known segment +_wellKnownAliases: Tuple[Tuple[str, str], ...] = ( + ("inbox", "inbox"), + ("posteingang", "inbox"), + ("postfach", "inbox"), + ("boîte de réception", "inbox"), + ("boite de reception", "inbox"), + ("drafts", "drafts"), + ("draft", "drafts"), + ("entwürfe", "drafts"), + ("entwurfe", "drafts"), + ("brouillons", "drafts"), + ("brouillon", "drafts"), + ("sent items", "sentitems"), + ("sentitems", "sentitems"), + ("gesendete elemente", "sentitems"), + ("éléments envoyés", "sentitems"), + ("elements envoyes", "sentitems"), + ("deleted items", "deleteditems"), + ("deleteditems", "deleteditems"), + ("gelöschte elemente", "deleteditems"), + ("geloschte elemente", "deleteditems"), + ("éléments supprimés", "deleteditems"), + ("junk email", "junkemail"), + ("junkemail", "junkemail"), + ("junk-e-mail", "junkemail"), + ("junk e-mail", "junkemail"), + ("courrier indésirable", "junkemail"), + ("outbox", "outbox"), + ("postausgang", "outbox"), + ("out box", "outbox"), + ("archive", "archive"), + ("archiv", "archive"), +) + + +def _wellKnownSegmentForName(folderName: str) -> Optional[str]: + """Return Graph mailFolder segment if folderName is a known default folder alias.""" + if not folderName or not str(folderName).strip(): + return None + key = str(folderName).strip().lower() + if key in _graphWellKnownSegments: + return key + for alias, segment in _wellKnownAliases: + if key == alias: + return segment + return None + + class FolderManagementHelper: """Helper for folder management operations""" @@ -42,8 +113,21 @@ class FolderManagementHelper: "Authorization": f"Bearer {connection['accessToken']}", "Content-Type": "application/json" } + + # Resolve default folders by Graph well-known name (locale-independent; avoids missing "Inbox" on paginated /mailFolders lists) + wk = _wellKnownSegmentForName(folder_name) + if wk: + wk_url = f"{graph_url}/me/mailFolders/{wk}" + wk_resp = requests.get(wk_url, headers=headers) + if wk_resp.status_code == 200: + wid = wk_resp.json().get("id") + if wid: + return wid + logger.debug( + f"Well-known folder '{wk}' lookup failed ({wk_resp.status_code}); falling back to folder list" + ) - # Get mail folders + # Get mail folders (first page only; subfolders / pagination may omit Inbox) api_url = f"{graph_url}/me/mailFolders" response = requests.get(api_url, headers=headers)