gateway/modules/migration/migrateVoiceAndDocuments.py
2026-03-24 16:39:25 +01:00

316 lines
12 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Migration: Voice settings consolidation and CoachingDocument scope-tagging.
Moves VoiceSettings (workspace DB) and CoachingUserProfile voice fields (commcoach DB)
into the unified UserVoicePreferences model, and tags CoachingDocument files with
featureInstance scope before deleting the legacy records.
Called once from bootstrap, sets a DB flag to prevent re-execution.
"""
import logging
import uuid
from typing import Dict, List, Optional
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
from modules.datamodels.datamodelUam import UserVoicePreferences
logger = logging.getLogger(__name__)
_MIGRATION_FLAG_KEY = "migration_voice_documents_completed"
def _isMigrationCompleted(db) -> bool:
"""Check if migration has already been executed."""
try:
from modules.datamodels.datamodelUam import Mandate
records = db.getRecordset(Mandate, recordFilter={"name": _MIGRATION_FLAG_KEY})
return len(records) > 0
except Exception:
return False
def _setMigrationCompleted(db) -> None:
    """Persist the completion flag so the migration is not executed again.

    The flag is stored as a disabled system ``Mandate`` record named
    ``_MIGRATION_FLAG_KEY``. Nothing is written when the flag is already
    present. Failures are logged and not re-raised.

    Args:
        db: Connector exposing ``recordCreate(model, record)``.
    """
    if not _isMigrationCompleted(db):
        try:
            from modules.datamodels.datamodelUam import Mandate

            flagFields = dict(
                name=_MIGRATION_FLAG_KEY,
                label="Migration completed",
                enabled=False,
                isSystem=True,
            )
            db.recordCreate(Mandate, Mandate(**flagFields))
            logger.info("Migration flag set: voice & documents migration completed")
        except Exception as exc:
            logger.error(f"Failed to set migration flag: {exc}")
def _getRawRows(connector: DatabaseConnector, tableName: str, columns: List[str]) -> List[Dict]:
    """Fetch the requested columns of every row in a table via raw SQL.

    The table's existence is checked first with a case-insensitive lookup in
    ``information_schema.tables`` (schema ``public``); the data query itself
    uses the quoted, case-sensitive ``tableName``.

    Args:
        connector: Connector providing ``_ensure_connection()`` and a
            ``connection`` attribute with ``cursor()`` / ``rollback()``.
        tableName: Name of the table to read.
        columns: Column names to select; each is double-quoted in the query.

    Returns:
        One dict per row. An empty list if the table does not exist or if
        anything fails (the failure is logged and a rollback is attempted).
    """
    try:
        connector._ensure_connection()
        quotedColumns = ", ".join('"{}"'.format(name) for name in columns)
        with connector.connection.cursor() as cursor:
            cursor.execute(
                "SELECT COUNT(*) FROM information_schema.tables "
                "WHERE LOWER(table_name) = LOWER(%s) AND table_schema = 'public'",
                (tableName,),
            )
            # NOTE(review): indexing by "count" assumes the cursor yields
            # mapping-style rows - confirm against the connector.
            matchingTables = cursor.fetchone()["count"]
            if matchingTables == 0:
                logger.info(f"Table '{tableName}' does not exist, skipping")
                return []
            cursor.execute('SELECT {} FROM "{}"'.format(quotedColumns, tableName))
            fetched = cursor.fetchall()
            return [dict(record) for record in fetched]
    except Exception as exc:
        logger.warning(f"Raw query on '{tableName}' failed: {exc}")
        # Best-effort rollback; a failure here is deliberately ignored.
        try:
            connector.connection.rollback()
        except Exception:
            pass
        return []
def _deleteRawRow(connector: DatabaseConnector, tableName: str, rowId: str) -> bool:
    """Remove one row, identified by its ``id`` column, via raw SQL.

    Args:
        connector: Connector providing ``_ensure_connection()`` and a
            ``connection`` attribute with ``cursor()`` / ``commit()`` /
            ``rollback()``.
        tableName: Name of the table to delete from (double-quoted in SQL).
        rowId: Value of the ``id`` column of the row to delete.

    Returns:
        True once the delete has been committed; False if any step failed
        (the failure is logged and a rollback is attempted).
    """
    try:
        connector._ensure_connection()
        with connector.connection.cursor() as cursor:
            deleteSql = 'DELETE FROM "{}" WHERE "id" = %s'.format(tableName)
            cursor.execute(deleteSql, (rowId,))
        connector.connection.commit()
    except Exception as exc:
        logger.warning(f"Failed to delete row {rowId} from '{tableName}': {exc}")
        # Best-effort rollback; a failure here is deliberately ignored.
        try:
            connector.connection.rollback()
        except Exception:
            pass
        return False
    return True
def _createDbConnector(dbName: str) -> Optional[DatabaseConnector]:
    """Build a ``DatabaseConnector`` for the given database name.

    Host, user, password and port are read from ``APP_CONFIG`` (keys
    ``DB_HOST``, ``DB_USER``, ``DB_PASSWORD_SECRET``, ``DB_PORT``); the port
    falls back to 5432 when the key is absent.

    Args:
        dbName: Name of the database to connect to.

    Returns:
        The connector, or None if reading the configuration or constructing
        the connector raised (the failure is logged as a warning).
    """
    try:
        connectorArgs = {
            "dbHost": APP_CONFIG.get("DB_HOST"),
            "dbDatabase": dbName,
            "dbUser": APP_CONFIG.get("DB_USER"),
            "dbPassword": APP_CONFIG.get("DB_PASSWORD_SECRET"),
            "dbPort": int(APP_CONFIG.get("DB_PORT", 5432)),
        }
        return DatabaseConnector(**connectorArgs)
    except Exception as exc:
        logger.warning(f"Could not connect to database '{dbName}': {exc}")
        return None
# ─── Part A ───────────────────────────────────────────────────────────────────
def _migrateVoiceSettings(db, wsDb: DatabaseConnector, dryRun: bool, stats: Dict) -> None:
    """Migrate VoiceSettings records from poweron_workspace into UserVoicePreferences.

    For every legacy row that carries a ``userId``:

    * if the user already has a ``UserVoicePreferences`` record, the row is
      counted as skipped and (outside dry-run) the legacy row is deleted;
    * otherwise a new ``UserVoicePreferences`` record is created from the
      row and the legacy row is deleted afterwards.

    Args:
        db: Target connector; used via ``getRecordset`` and ``recordCreate``.
        wsDb: Connector for the database holding the ``VoiceSettings`` table.
        dryRun: If True, only log and count what would be created; nothing
            is written or deleted.
        stats: Mutable counters; this function increments
            ``voiceSettingsCreated``, ``voiceSettingsSkipped`` and ``errors``.
    """
    # Function-scope import, done once instead of once per migrated row.
    import json

    rows = _getRawRows(wsDb, "VoiceSettings", [
        "id", "userId", "mandateId", "ttsVoiceMap", "sttLanguage", "ttsLanguage", "ttsVoice",
    ])
    if not rows:
        logger.info("Part A: No VoiceSettings records found, skipping")
        return

    for row in rows:
        userId = row.get("userId")
        if not userId:
            continue

        existing = db.getRecordset(UserVoicePreferences, recordFilter={"userId": userId})
        if existing:
            # Preferences already exist: keep them and only drop the legacy row.
            stats["voiceSettingsSkipped"] += 1
            if not dryRun:
                _deleteRawRow(wsDb, "VoiceSettings", row["id"])
            continue

        if dryRun:
            logger.info(f"[DRY RUN] Would create UserVoicePreferences for user {userId} from VoiceSettings")
            stats["voiceSettingsCreated"] += 1
            continue

        try:
            ttsVoiceMap = row.get("ttsVoiceMap")
            if isinstance(ttsVoiceMap, str):
                # The map may arrive as a JSON string; undecodable text is dropped.
                try:
                    ttsVoiceMap = json.loads(ttsVoiceMap)
                except (json.JSONDecodeError, TypeError):
                    ttsVoiceMap = None

            prefs = UserVoicePreferences(
                userId=userId,
                mandateId=row.get("mandateId"),
                ttsVoiceMap=ttsVoiceMap,
                # The SELECT always yields these keys, so a NULL column shows up
                # as None and a dict.get() default would never apply. Use `or`
                # so NULL/empty values fall back to the default language.
                sttLanguage=row.get("sttLanguage") or "de-DE",
                ttsLanguage=row.get("ttsLanguage") or "de-DE",
                ttsVoice=row.get("ttsVoice"),
            )
            db.recordCreate(UserVoicePreferences, prefs)
            stats["voiceSettingsCreated"] += 1
            # Delete the legacy row only after the new record was created.
            _deleteRawRow(wsDb, "VoiceSettings", row["id"])
        except Exception as e:
            logger.error(f"Part A: Failed to migrate VoiceSettings {row['id']}: {e}")
            stats["errors"] += 1
# ─── Part B ───────────────────────────────────────────────────────────────────
def _migrateCoachingProfileVoice(db, ccDb: DatabaseConnector, dryRun: bool, stats: Dict) -> None:
    """Copy voice fields of CoachingUserProfile rows into UserVoicePreferences.

    Rows without a ``userId`` or without any of ``preferredLanguage`` /
    ``preferredVoice`` are ignored. Users that already own a
    ``UserVoicePreferences`` record are counted as skipped. The legacy rows
    themselves are not modified or deleted here.

    Args:
        db: Target connector; used via ``getRecordset`` and ``recordCreate``.
        ccDb: Connector for the database holding ``CoachingUserProfile``.
        dryRun: If True, only log and count what would be created.
        stats: Mutable counters; this function increments
            ``coachingProfileCreated``, ``coachingProfileSkipped`` and
            ``errors``.
    """
    profileColumns = ["id", "userId", "mandateId", "preferredLanguage", "preferredVoice"]
    profiles = _getRawRows(ccDb, "CoachingUserProfile", profileColumns)
    if not profiles:
        logger.info("Part B: No CoachingUserProfile records with voice data found, skipping")
        return

    for profile in profiles:
        userId = profile.get("userId")
        language = profile.get("preferredLanguage")
        voice = profile.get("preferredVoice")

        # Only rows with a user and at least one voice field are relevant.
        if not (userId and (language or voice)):
            continue

        if db.getRecordset(UserVoicePreferences, recordFilter={"userId": userId}):
            stats["coachingProfileSkipped"] += 1
            continue

        if dryRun:
            logger.info(f"[DRY RUN] Would create UserVoicePreferences for user {userId} from CoachingUserProfile")
            stats["coachingProfileCreated"] += 1
            continue

        try:
            # One preferred language feeds both the STT and the TTS setting.
            effectiveLanguage = language or "de-DE"
            newPrefs = UserVoicePreferences(
                userId=userId,
                mandateId=profile.get("mandateId"),
                sttLanguage=effectiveLanguage,
                ttsLanguage=effectiveLanguage,
                ttsVoice=voice,
            )
            db.recordCreate(UserVoicePreferences, newPrefs)
            stats["coachingProfileCreated"] += 1
        except Exception as exc:
            logger.error(f"Part B: Failed to migrate CoachingUserProfile {profile['id']}: {exc}")
            stats["errors"] += 1
# ─── Part C ───────────────────────────────────────────────────────────────────
def _migrateCoachingDocuments(ccDb: DatabaseConnector, dryRun: bool, stats: Dict) -> None:
    """Tag FileItem/FileContentIndex with featureInstance scope for each CoachingDocument.

    For every ``CoachingDocument`` row the referenced ``FileItem``
    (poweron_management) and, if reachable, the ``FileContentIndex`` record
    with the same id (poweron_knowledge) get ``scope="featureInstance"`` and,
    when the row has one, ``featureInstanceId``. The legacy row is deleted
    afterwards. Rows without ``fileRef`` are deleted without tagging, and a
    row whose ``FileItem`` is missing is still deleted after a warning.

    Args:
        ccDb: Connector for the database holding ``CoachingDocument``.
        dryRun: If True, only log and count what would be tagged; nothing is
            modified or deleted.
        stats: Mutable counters; this function increments
            ``documentsTagged`` and ``errors``.
    """
    from modules.datamodels.datamodelFiles import FileItem
    from modules.datamodels.datamodelKnowledge import FileContentIndex

    rows = _getRawRows(ccDb, "CoachingDocument", [
        "id", "fileRef", "instanceId",
    ])
    if not rows:
        logger.info("Part C: No CoachingDocument records found, skipping")
        return

    mgmtDb = _createDbConnector("poweron_management")
    knowledgeDb = _createDbConnector("poweron_knowledge")
    if not mgmtDb:
        logger.error("Part C: Cannot connect to poweron_management, aborting document migration")
        # Make the abort visible in the returned statistics.
        stats["errors"] += 1
        return
    if not knowledgeDb:
        # Previously skipped silently; the legacy rows are still removed below,
        # so record that the knowledge index was left untagged.
        logger.warning("Part C: poweron_knowledge DB unavailable, FileContentIndex records will not be tagged")

    for row in rows:
        fileRef = row.get("fileRef")
        instanceId = row.get("instanceId")
        docId = row.get("id")

        if not fileRef:
            # Nothing to tag; just drop the legacy row.
            if not dryRun:
                _deleteRawRow(ccDb, "CoachingDocument", docId)
            continue

        if dryRun:
            logger.info(f"[DRY RUN] Would tag FileItem {fileRef} with featureInstanceId={instanceId}")
            stats["documentsTagged"] += 1
            continue

        try:
            # Same update payload for both stores; each call gets its own copy.
            scopeUpdate = {"scope": "featureInstance"}
            if instanceId:
                scopeUpdate["featureInstanceId"] = instanceId

            fileRecords = mgmtDb.getRecordset(FileItem, recordFilter={"id": fileRef})
            if fileRecords:
                mgmtDb.recordModify(FileItem, fileRef, dict(scopeUpdate))
                stats["documentsTagged"] += 1
            else:
                logger.warning(f"Part C: FileItem {fileRef} not found in management DB")

            if knowledgeDb:
                fciRecords = knowledgeDb.getRecordset(FileContentIndex, recordFilter={"id": fileRef})
                if fciRecords:
                    knowledgeDb.recordModify(FileContentIndex, fileRef, dict(scopeUpdate))

            # Delete the legacy row only after tagging raised no exception.
            _deleteRawRow(ccDb, "CoachingDocument", docId)
        except Exception as e:
            logger.error(f"Part C: Failed to migrate CoachingDocument {docId}: {e}")
            stats["errors"] += 1
# ─── Main entry ───────────────────────────────────────────────────────────────
def migrateVoiceAndDocuments(db, dryRun: bool = False) -> dict:
    """
    Migrate VoiceSettings + CoachingUserProfile voice fields into UserVoicePreferences,
    and tag CoachingDocument files with featureInstance scope.

    The completion flag is only persisted after a clean, complete run: if a
    source database was unavailable (part skipped) or any error was counted,
    the flag is left unset so the migration is attempted again on the next
    call instead of silently leaving legacy data behind.

    Args:
        db: Root database connector (poweron_app)
        dryRun: If True, log actions without making changes

    Returns:
        ``{"status": "already_completed"}`` when the flag is already set;
        otherwise a summary dict with ``status`` = ``"completed"``, the
        migration counters, ``dryRun`` and ``skippedParts`` (labels of parts
        that could not run because their database was unavailable).
    """
    if _isMigrationCompleted(db):
        logger.info("Voice & documents migration already completed, skipping")
        return {"status": "already_completed"}

    stats = {
        "voiceSettingsCreated": 0,
        "voiceSettingsSkipped": 0,
        "coachingProfileCreated": 0,
        "coachingProfileSkipped": 0,
        "documentsTagged": 0,
        "errors": 0,
        "dryRun": dryRun,
    }
    # Parts that could not run; kept outside `stats` so helpers see the same dict as before.
    skippedParts = []

    wsDb = _createDbConnector("poweron_workspace")
    ccDb = _createDbConnector("poweron_commcoach")

    # Part A
    if wsDb:
        _migrateVoiceSettings(db, wsDb, dryRun, stats)
    else:
        logger.warning("Skipping Part A: poweron_workspace DB unavailable")
        skippedParts.append("A")

    # Part B
    if ccDb:
        _migrateCoachingProfileVoice(db, ccDb, dryRun, stats)
    else:
        logger.warning("Skipping Part B: poweron_commcoach DB unavailable")
        skippedParts.append("B")

    # Part C
    if ccDb:
        _migrateCoachingDocuments(ccDb, dryRun, stats)
    else:
        logger.warning("Skipping Part C: poweron_commcoach DB unavailable")
        skippedParts.append("C")

    if not dryRun:
        if skippedParts or stats["errors"]:
            # Marking the migration as done here would prevent any retry of
            # the data that was not migrated in this run.
            logger.warning(
                f"Voice & documents migration incomplete (skipped parts: {skippedParts}, "
                f"errors: {stats['errors']}); completion flag not set, will retry on next run"
            )
        else:
            _setMigrationCompleted(db)

    logger.info(f"Voice & documents migration completed: {stats}")
    return {"status": "completed", **stats, "skippedParts": skippedParts}