""" Investor Demo April 2026 Creates a complete demo environment with two mandates, one user, and all feature instances needed for the investor live demo. Mandates: - HappyLife AG (happylife) — Dokumentenablage, Buchhaltung, Automationen, Chatbot, Datenschutz - Alpina Treuhand AG (alpina) — Dokumentenablage, 3x Treuhand-Kunden, Automationen, Datenschutz User: - Patrick Helvetia (p.motsch@poweron.swiss) — SysAdmin, member of both mandates """ import json import logging import uuid from typing import Dict, Any, Optional, List from modules.demoConfigs._baseDemoConfig import _BaseDemoConfig logger = logging.getLogger(__name__) _DEMO_PREFIX = "demo-inv2026" _MANDATE_HAPPYLIFE = { "name": "happylife", "label": "HappyLife AG", } _MANDATE_ALPINA = { "name": "alpina-treuhand", "label": "Alpina Treuhand AG", } _USER = { "username": "patrick.helvetia", "email": "p.motsch@poweron.swiss", "fullName": "Patrick Helvetia", "password": "patrick.helvetia.demo", "language": "en", } _FEATURES_HAPPYLIFE = [ {"code": "workspace", "label": "Dokumentenablage"}, {"code": "trustee", "label": "Buchhaltung"}, {"code": "graphicalEditor", "label": "Automationen"}, {"code": "chatbot", "label": "Chatbot"}, {"code": "neutralization", "label": "Datenschutz"}, ] _FEATURES_ALPINA = [ {"code": "workspace", "label": "Dokumentenablage"}, {"code": "trustee", "label": "BUHA Müller Immobilien GmbH"}, {"code": "trustee", "label": "BUHA Schneider Gastro AG"}, {"code": "trustee", "label": "BUHA Weber Consulting"}, {"code": "graphicalEditor", "label": "Automationen"}, {"code": "neutralization", "label": "Datenschutz"}, ] class InvestorDemo2026(_BaseDemoConfig): code = "investor-demo-2026" label = "Investor Demo April 2026" description = ( "Two mandates (HappyLife AG + Alpina Treuhand AG), one SysAdmin user, " "trustee with RMA, workspace, graph editor, chatbot, and neutralization." ) # ------------------------------------------------------------------ # load # ------------------------------------------------------------------ def load(self, db) -> Dict[str, Any]: summary: Dict[str, Any] = {"created": [], "skipped": [], "errors": []} try: mandateIdHappy = self._ensureMandate(db, _MANDATE_HAPPYLIFE, summary) mandateIdAlpina = self._ensureMandate(db, _MANDATE_ALPINA, summary) userId = self._ensureUser(db, summary) self._ensurePlatformAdminFlag(db, userId, summary) if mandateIdHappy: self._ensureMembership(db, userId, mandateIdHappy, _MANDATE_HAPPYLIFE["label"], summary) self._ensureFeatures(db, mandateIdHappy, _MANDATE_HAPPYLIFE["label"], _FEATURES_HAPPYLIFE, summary) self._ensureFeatureAccess(db, userId, mandateIdHappy, _MANDATE_HAPPYLIFE["label"], summary) if mandateIdAlpina: self._ensureMembership(db, userId, mandateIdAlpina, _MANDATE_ALPINA["label"], summary) self._ensureFeatures(db, mandateIdAlpina, _MANDATE_ALPINA["label"], _FEATURES_ALPINA, summary) self._ensureFeatureAccess(db, userId, mandateIdAlpina, _MANDATE_ALPINA["label"], summary) self._ensureTrusteeRmaConfig(db, mandateIdHappy, _MANDATE_HAPPYLIFE["label"], summary) self._ensureTrusteeRmaConfig(db, mandateIdAlpina, _MANDATE_ALPINA["label"], summary) self._ensureNeutralizationConfig(db, mandateIdHappy, userId, summary) self._ensureNeutralizationConfig(db, mandateIdAlpina, userId, summary) self._ensureBilling(db, mandateIdHappy, _MANDATE_HAPPYLIFE["label"], summary) self._ensureBilling(db, mandateIdAlpina, _MANDATE_ALPINA["label"], summary) except Exception as e: logger.error(f"Demo load failed: {e}", exc_info=True) summary["errors"].append(str(e)) return summary # ------------------------------------------------------------------ # remove # ------------------------------------------------------------------ def remove(self, db) -> Dict[str, Any]: summary: Dict[str, Any] = {"removed": [], "errors": []} from modules.datamodels.datamodelUam import Mandate, UserInDB from modules.datamodels.datamodelMembership import UserMandate for mandateDef in [_MANDATE_HAPPYLIFE, _MANDATE_ALPINA]: try: existing = db.getRecordset(Mandate, recordFilter={"name": mandateDef["name"]}) for m in existing: mid = m.get("id") self._removeMandateData(db, mid, mandateDef["label"], summary) db.recordDelete(Mandate, mid) summary["removed"].append(f"Mandate {mandateDef['label']} ({mid})") logger.info(f"Removed mandate {mandateDef['label']} ({mid})") except Exception as e: summary["errors"].append(f"Remove mandate {mandateDef['label']}: {e}") try: existing = db.getRecordset(UserInDB, recordFilter={"username": _USER["username"]}) for u in existing: uid = u.get("id") memberships = db.getRecordset(UserMandate, recordFilter={"userId": uid}) for mem in memberships: try: db.recordDelete(UserMandate, mem.get("id")) except Exception: pass db.recordDelete(UserInDB, uid) summary["removed"].append(f"User {_USER['username']} ({uid})") logger.info(f"Removed user {_USER['username']} ({uid})") except Exception as e: summary["errors"].append(f"Remove user: {e}") self._removeLanguageSet(db, "es", summary) return summary # ------------------------------------------------------------------ # helpers # ------------------------------------------------------------------ def _ensureMandate(self, db, mandateDef: Dict, summary: Dict) -> Optional[str]: from modules.datamodels.datamodelUam import Mandate from modules.interfaces.interfaceBootstrap import copySystemRolesToMandate existing = db.getRecordset(Mandate, recordFilter={"name": mandateDef["name"]}) if existing: mid = existing[0].get("id") summary["skipped"].append(f"Mandate {mandateDef['label']} exists ({mid})") return mid mandate = Mandate(name=mandateDef["name"], label=mandateDef["label"], enabled=True) created = db.recordCreate(Mandate, mandate) mid = created.get("id") logger.info(f"Created mandate {mandateDef['label']} ({mid})") summary["created"].append(f"Mandate {mandateDef['label']}") copySystemRolesToMandate(db, mid) return mid def _ensureUser(self, db, summary: Dict) -> Optional[str]: from modules.datamodels.datamodelUam import UserInDB, AuthAuthority from passlib.context import CryptContext existing = db.getRecordset(UserInDB, recordFilter={"username": _USER["username"]}) if existing: uid = existing[0].get("id") summary["skipped"].append(f"User {_USER['username']} exists ({uid})") return uid pwdContext = CryptContext(schemes=["argon2"], deprecated="auto") user = UserInDB( username=_USER["username"], email=_USER["email"], fullName=_USER["fullName"], enabled=True, language=_USER["language"], isSysAdmin=True, authenticationAuthority=AuthAuthority.LOCAL, hashedPassword=pwdContext.hash(_USER["password"]), ) created = db.recordCreate(UserInDB, user) uid = created.get("id") logger.info(f"Created user {_USER['username']} ({uid})") summary["created"].append(f"User {_USER['fullName']}") return uid def _ensurePlatformAdminFlag(self, db, userId: str, summary: Dict): """Ensure the demo user has isPlatformAdmin=True for cross-mandate governance. Without this, the admin UI menus would be hidden.""" from modules.datamodels.datamodelUam import UserInDB existing = db.getRecord(UserInDB, userId) if not existing: summary["errors"].append(f"Demo user {userId} not found — cannot set isPlatformAdmin") return currentFlag = bool(existing.get("isPlatformAdmin", False)) if isinstance(existing, dict) else bool(getattr(existing, "isPlatformAdmin", False)) if currentFlag: summary["skipped"].append("isPlatformAdmin already set") return db.recordModify(UserInDB, userId, {"isPlatformAdmin": True}) summary["created"].append("isPlatformAdmin flag") logger.info(f"Set isPlatformAdmin=True for {_USER['username']}") def _ensureMembership(self, db, userId: str, mandateId: str, mandateLabel: str, summary: Dict): from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole from modules.datamodels.datamodelRbac import Role existing = db.getRecordset(UserMandate, recordFilter={"userId": userId, "mandateId": mandateId}) if existing: userMandateId = existing[0].get("id") summary["skipped"].append(f"Membership {_USER['username']} -> {mandateLabel} exists") else: um = UserMandate(userId=userId, mandateId=mandateId, enabled=True) created = db.recordCreate(UserMandate, um) userMandateId = created.get("id") summary["created"].append(f"Membership {_USER['username']} -> {mandateLabel}") logger.info(f"Created membership {_USER['username']} -> {mandateLabel}") adminRoles = db.getRecordset(Role, recordFilter={"mandateId": mandateId, "roleLabel": "admin"}) if adminRoles: adminRoleId = adminRoles[0].get("id") existingRole = db.getRecordset(UserMandateRole, recordFilter={"userMandateId": userMandateId, "roleId": adminRoleId}) if not existingRole: umr = UserMandateRole(userMandateId=userMandateId, roleId=adminRoleId) db.recordCreate(UserMandateRole, umr) logger.info(f"Assigned admin role in {mandateLabel}") def _ensureFeatures(self, db, mandateId: str, mandateLabel: str, featureDefs: List[Dict], summary: Dict): from modules.interfaces.interfaceFeatures import getFeatureInterface fi = getFeatureInterface(db) existingInstances = fi.getFeatureInstancesForMandate(mandateId) existingLabels = { (inst.label if hasattr(inst, "label") else inst.get("label", "")) for inst in existingInstances } for featureDef in featureDefs: code = featureDef["code"] instanceLabel = featureDef["label"] if instanceLabel in existingLabels: summary["skipped"].append(f"Feature '{instanceLabel}' in {mandateLabel} exists") continue try: fi.createFeatureInstance( featureCode=code, mandateId=mandateId, label=instanceLabel, enabled=True, copyTemplateRoles=True, ) summary["created"].append(f"Feature '{instanceLabel}' in {mandateLabel}") logger.info(f"Created feature instance '{instanceLabel}' ({code}) in {mandateLabel}") except Exception as e: summary["errors"].append(f"Feature '{instanceLabel}' in {mandateLabel}: {e}") logger.error(f"Failed to create feature '{instanceLabel}' ({code}) in {mandateLabel}: {e}") def _ensureFeatureAccess(self, db, userId: str, mandateId: str, mandateLabel: str, summary: Dict): """Grant the demo user admin access to every feature instance in the mandate.""" from modules.datamodels.datamodelFeatures import FeatureInstance from modules.datamodels.datamodelMembership import FeatureAccess, FeatureAccessRole from modules.datamodels.datamodelRbac import Role instances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId}) or [] for inst in instances: instId = inst.get("id") featureCode = inst.get("featureCode", "") if not instId: continue existing = db.getRecordset(FeatureAccess, recordFilter={"userId": userId, "featureInstanceId": instId}) if existing: featureAccessId = existing[0].get("id") summary["skipped"].append(f"FeatureAccess {featureCode} in {mandateLabel} exists") else: fa = FeatureAccess(userId=userId, featureInstanceId=instId, enabled=True) created = db.recordCreate(FeatureAccess, fa) featureAccessId = created.get("id") summary["created"].append(f"FeatureAccess {featureCode} in {mandateLabel}") logger.info(f"Created feature access for {featureCode} in {mandateLabel}") adminRoleLabel = f"{featureCode}-admin" adminRoles = db.getRecordset(Role, recordFilter={ "featureInstanceId": instId, "roleLabel": adminRoleLabel, }) if adminRoles: adminRoleId = adminRoles[0].get("id") existingRole = db.getRecordset(FeatureAccessRole, recordFilter={ "featureAccessId": featureAccessId, "roleId": adminRoleId, }) if not existingRole: far = FeatureAccessRole(featureAccessId=featureAccessId, roleId=adminRoleId) db.recordCreate(FeatureAccessRole, far) logger.info(f"Assigned {adminRoleLabel} role in {mandateLabel}") def _ensureTrusteeRmaConfig(self, db, mandateId: Optional[str], mandateLabel: str, summary: Dict): if not mandateId: return from modules.datamodels.datamodelFeatures import FeatureInstance from modules.features.trustee.datamodelFeatureTrustee import TrusteeAccountingConfig from modules.shared.configuration import APP_CONFIG, encryptValue instances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId, "featureCode": "trustee"}) if not instances: summary["skipped"].append(f"No trustee instance in {mandateLabel} for RMA config") return instanceId = instances[0].get("id") existing = db.getRecordset(TrusteeAccountingConfig, recordFilter={"featureInstanceId": instanceId}) if existing: summary["skipped"].append(f"RMA config for {mandateLabel} exists") return apiBaseUrl = APP_CONFIG.get("Demo_RMA_ApiBaseUrl", "") clientName = APP_CONFIG.get("Demo_RMA_ClientName", "") apiKey = APP_CONFIG.get("Demo_RMA_ApiKey", "") if not apiBaseUrl or not apiKey: summary["errors"].append( f"RMA credentials missing in config.ini (Demo_RMA_ApiBaseUrl, Demo_RMA_ClientName, Demo_RMA_ApiKey) for {mandateLabel}" ) return plainConfig = { "apiBaseUrl": apiBaseUrl, "clientName": clientName, "apiKey": apiKey, } configRecord = { "id": str(uuid.uuid4()), "featureInstanceId": instanceId, "connectorType": "rma", "displayLabel": "Run My Accounts", "encryptedConfig": encryptValue(json.dumps(plainConfig), keyName="accountingConfig"), "isActive": True, "mandateId": mandateId, } db.recordCreate(TrusteeAccountingConfig, configRecord) summary["created"].append(f"RMA accounting config for {mandateLabel}") logger.info(f"Created RMA accounting config for {mandateLabel}") def _ensureNeutralizationConfig(self, db, mandateId: Optional[str], userId: Optional[str], summary: Dict): if not mandateId or not userId: return from modules.datamodels.datamodelFeatures import FeatureInstance instances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId, "featureCode": "neutralization"}) if not instances: return instanceId = instances[0].get("id") try: from modules.features.neutralization.datamodelFeatureNeutralizer import DataNeutraliserConfig existing = db.getRecordset(DataNeutraliserConfig, recordFilter={"featureInstanceId": instanceId}) if existing: summary["skipped"].append(f"Neutralization config for mandate {mandateId} exists") return config = DataNeutraliserConfig( featureInstanceId=instanceId, mandateId=mandateId, userId=userId, enabled=True, scope="featureInstance", ) db.recordCreate(DataNeutraliserConfig, config) summary["created"].append(f"Neutralization config for mandate {mandateId}") logger.info(f"Created neutralization config for mandate {mandateId}") except Exception as e: summary["errors"].append(f"Neutralization config: {e}") def _ensureBilling(self, db, mandateId: Optional[str], mandateLabel: str, summary: Dict): if not mandateId: return try: from modules.interfaces.interfaceDbBilling import _getRootInterface from modules.datamodels.datamodelBilling import BillingSettings billingInterface = _getRootInterface() existingSettings = billingInterface.getSettings(mandateId) if existingSettings: summary["skipped"].append(f"Billing for {mandateLabel} exists") return settings = BillingSettings( mandateId=mandateId, warningThresholdPercent=10.0, notifyOnWarning=True, ) billingInterface.db.recordCreate(BillingSettings, settings) summary["created"].append(f"Billing settings for {mandateLabel}") logger.info(f"Created billing settings for {mandateLabel}") except Exception as e: summary["errors"].append(f"Billing for {mandateLabel}: {e}") def _removeMandateData(self, db, mandateId: str, mandateLabel: str, summary: Dict): """Remove all data scoped to a mandate before deleting the mandate itself.""" from modules.datamodels.datamodelFeatures import FeatureInstance from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole, FeatureAccess, FeatureAccessRole from modules.datamodels.datamodelRbac import Role, AccessRule from modules.datamodels.datamodelChat import ChatWorkflow, ChatMessage, ChatLog from modules.datamodels.datamodelBilling import BillingSettings instances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId}) or [] for inst in instances: instId = inst.get("id") featureCode = inst.get("featureCode", "") if not instId: continue if featureCode == "graphicalEditor": self._removeGraphicalEditorData(instId, mandateId, mandateLabel, summary) if featureCode == "trustee": self._removeTrusteeData(db, instId, mandateLabel, summary) if featureCode == "neutralization": self._removeNeutralizationData(db, instId, mandateLabel, summary) chatWorkflows = db.getRecordset(ChatWorkflow, recordFilter={"featureInstanceId": instId}) or [] for wf in chatWorkflows: wfId = wf.get("id") if not wfId: continue for msg in db.getRecordset(ChatMessage, recordFilter={"workflowId": wfId}) or []: db.recordDelete(ChatMessage, msg.get("id")) for log in db.getRecordset(ChatLog, recordFilter={"workflowId": wfId}) or []: db.recordDelete(ChatLog, log.get("id")) db.recordDelete(ChatWorkflow, wfId) if chatWorkflows: summary["removed"].append(f"{len(chatWorkflows)} ChatWorkflows in {mandateLabel}") accesses = db.getRecordset(FeatureAccess, recordFilter={"featureInstanceId": instId}) or [] for access in accesses: for role in db.getRecordset(FeatureAccessRole, recordFilter={"featureAccessId": access.get("id")}) or []: db.recordDelete(FeatureAccessRole, role.get("id")) db.recordDelete(FeatureAccess, access.get("id")) db.recordDelete(FeatureInstance, instId) summary["removed"].append(f"FeatureInstance {featureCode} in {mandateLabel}") logger.info(f"Removed feature instance {featureCode} ({instId}) in {mandateLabel}") memberships = db.getRecordset(UserMandate, recordFilter={"mandateId": mandateId}) or [] for um in memberships: for umr in db.getRecordset(UserMandateRole, recordFilter={"userMandateId": um.get("id")}) or []: db.recordDelete(UserMandateRole, umr.get("id")) db.recordDelete(UserMandate, um.get("id")) if memberships: summary["removed"].append(f"{len(memberships)} memberships in {mandateLabel}") roles = db.getRecordset(Role, recordFilter={"mandateId": mandateId}) or [] for role in roles: for rule in db.getRecordset(AccessRule, recordFilter={"roleId": role.get("id")}) or []: db.recordDelete(AccessRule, rule.get("id")) db.recordDelete(Role, role.get("id")) if roles: summary["removed"].append(f"{len(roles)} roles in {mandateLabel}") try: from modules.interfaces.interfaceDbBilling import _getRootInterface billingDb = _getRootInterface().db billingSettings = billingDb.getRecordset(BillingSettings, recordFilter={"mandateId": mandateId}) or [] for bs in billingSettings: billingDb.recordDelete(BillingSettings, bs.get("id")) if billingSettings: summary["removed"].append(f"BillingSettings in {mandateLabel}") except Exception as e: summary["errors"].append(f"Billing cleanup for {mandateLabel}: {e}") def _removeGraphicalEditorData(self, featureInstanceId: str, mandateId: str, mandateLabel: str, summary: Dict): """Remove all AutoWorkflow data (workflows, runs, versions, logs, tasks) from the Greenfield DB.""" try: from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import ( AutoWorkflow, AutoVersion, AutoRun, AutoStepLog, AutoTask, ) from modules.connectors.connectorDbPostgre import DatabaseConnector from modules.shared.configuration import APP_CONFIG geDb = DatabaseConnector( dbHost=APP_CONFIG.get("DB_HOST", "localhost"), dbDatabase="poweron_graphicaleditor", dbUser=APP_CONFIG.get("DB_USER"), dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"), dbPort=int(APP_CONFIG.get("DB_PORT", 5432)), userId=None, ) workflows = geDb.getRecordset(AutoWorkflow, recordFilter={ "mandateId": mandateId, "featureInstanceId": featureInstanceId, }) or [] for wf in workflows: wfId = wf.get("id") if not wfId: continue for version in geDb.getRecordset(AutoVersion, recordFilter={"workflowId": wfId}) or []: geDb.recordDelete(AutoVersion, version.get("id")) runs = geDb.getRecordset(AutoRun, recordFilter={"workflowId": wfId}) or [] for run in runs: runId = run.get("id") for stepLog in geDb.getRecordset(AutoStepLog, recordFilter={"runId": runId}) or []: geDb.recordDelete(AutoStepLog, stepLog.get("id")) geDb.recordDelete(AutoRun, runId) for task in geDb.getRecordset(AutoTask, recordFilter={"workflowId": wfId}) or []: geDb.recordDelete(AutoTask, task.get("id")) geDb.recordDelete(AutoWorkflow, wfId) if workflows: summary["removed"].append(f"{len(workflows)} AutoWorkflows in {mandateLabel}") logger.info(f"Removed {len(workflows)} graphical editor workflows for {mandateLabel}") except Exception as e: summary["errors"].append(f"GraphicalEditor cleanup for {mandateLabel}: {e}") logger.error(f"Failed to clean up graphical editor data for {mandateLabel}: {e}") def _removeTrusteeData(self, db, featureInstanceId: str, mandateLabel: str, summary: Dict): """Remove TrusteeAccountingConfig for a feature instance.""" try: from modules.features.trustee.datamodelFeatureTrustee import TrusteeAccountingConfig configs = db.getRecordset(TrusteeAccountingConfig, recordFilter={"featureInstanceId": featureInstanceId}) or [] for cfg in configs: db.recordDelete(TrusteeAccountingConfig, cfg.get("id")) if configs: summary["removed"].append(f"TrusteeAccountingConfig in {mandateLabel}") except Exception as e: summary["errors"].append(f"Trustee cleanup for {mandateLabel}: {e}") def _removeNeutralizationData(self, db, featureInstanceId: str, mandateLabel: str, summary: Dict): """Remove DataNeutraliserConfig for a feature instance.""" try: from modules.features.neutralization.datamodelFeatureNeutralizer import DataNeutraliserConfig configs = db.getRecordset(DataNeutraliserConfig, recordFilter={"featureInstanceId": featureInstanceId}) or [] for cfg in configs: db.recordDelete(DataNeutraliserConfig, cfg.get("id")) if configs: summary["removed"].append(f"DataNeutraliserConfig in {mandateLabel}") except Exception as e: summary["errors"].append(f"Neutralization cleanup for {mandateLabel}: {e}") def _removeLanguageSet(self, db, code: str, summary: Dict): """Remove a language set if it was created during demo (e.g. 'es' from UC4).""" try: from modules.datamodels.datamodelUiLanguage import UiLanguageSet existing = db.getRecordset(UiLanguageSet, recordFilter={"id": code}) if existing: db.recordDelete(UiLanguageSet, code) summary["removed"].append(f"Language set '{code}'") logger.info(f"Removed language set '{code}'") except Exception as e: logger.debug(f"Could not remove language set '{code}': {e}")