# Source listing: gateway/modules/demoConfigs/investorDemo2026.py (Python, 580 lines, 27 KiB)

"""
Investor Demo April 2026
Creates a complete demo environment with two mandates, one user,
and all feature instances needed for the investor live demo.
Mandates:
- HappyLife AG (happylife) — Dokumentenablage, Buchhaltung, Automationen, Chatbot, Datenschutz
- Alpina Treuhand AG (alpina) — Dokumentenablage, 3x Treuhand-Kunden, Automationen, Datenschutz
User:
- Patrick Helvetia (p.motsch@poweron.swiss) — SysAdmin, member of both mandates
"""
import json
import logging
import uuid
from typing import Dict, Any, Optional, List
from modules.demoConfigs._baseDemoConfig import _BaseDemoConfig
# Module-level logger shared by every method of the demo config below.
logger = logging.getLogger(__name__)

# Identifier prefix for this demo; not referenced by the code visible in this file.
_DEMO_PREFIX = "demo-inv2026"

# Mandate definitions: "name" is the lookup key used in record filters,
# "label" is the display text used in summaries and log messages.
_MANDATE_HAPPYLIFE = dict(name="happylife", label="HappyLife AG")
_MANDATE_ALPINA = dict(name="alpina-treuhand", label="Alpina Treuhand AG")

# The single demo user. The password is hashed before it is stored
# (see InvestorDemo2026._ensureUser); this is a demo-only credential.
_USER = dict(
    username="patrick.helvetia",
    email="p.motsch@poweron.swiss",
    fullName="Patrick Helvetia",
    password="patrick.helvetia.demo",
    language="en",
)

# Feature instances per mandate as (feature code, instance label) pairs.
# Labels are what _ensureFeatures compares against to detect existing instances,
# so they must stay unique within a mandate.
_FEATURES_HAPPYLIFE = [
    dict(code=featureCode, label=featureLabel)
    for featureCode, featureLabel in (
        ("workspace", "Dokumentenablage"),
        ("trustee", "Buchhaltung"),
        ("graphicalEditor", "Automationen"),
        ("chatbot", "Chatbot"),
        ("neutralization", "Datenschutz"),
    )
]
_FEATURES_ALPINA = [
    dict(code=featureCode, label=featureLabel)
    for featureCode, featureLabel in (
        ("workspace", "Dokumentenablage"),
        ("trustee", "BUHA Müller Immobilien GmbH"),
        ("trustee", "BUHA Schneider Gastro AG"),
        ("trustee", "BUHA Weber Consulting"),
        ("graphicalEditor", "Automationen"),
        ("neutralization", "Datenschutz"),
    )
]
class InvestorDemo2026(_BaseDemoConfig):
    """Demo configuration that provisions and tears down the investor demo data.

    ``load`` creates two mandates, one demo user, memberships, feature
    instances, feature access, accounting/neutralization configs and billing
    settings. Each ``_ensure*`` helper first looks for an existing record and
    only creates what is missing, so ``load`` can be run repeatedly.
    ``remove`` deletes that data again.

    Both entry points return a summary dict of human-readable strings;
    failures are collected under ``summary["errors"]``.
    """

    code = "investor-demo-2026"
    label = "Investor Demo April 2026"
    description = (
        "Two mandates (HappyLife AG + Alpina Treuhand AG), one SysAdmin user, "
        "trustee with RMA, workspace, graph editor, chatbot, and neutralization."
    )

    # ------------------------------------------------------------------
    # load
    # ------------------------------------------------------------------
    def load(self, db) -> Dict[str, Any]:
        """Create every demo record that does not exist yet.

        Args:
            db: Database interface; the code relies on ``getRecordset``,
                ``getRecord``, ``recordCreate`` and ``recordModify``.

        Returns:
            Dict with the list-valued keys ``created``, ``skipped`` and
            ``errors``. Any exception escaping a helper aborts the remaining
            steps and is appended to ``errors`` instead of being raised.
        """
        summary: Dict[str, Any] = {"created": [], "skipped": [], "errors": []}
        try:
            mandateIdHappy = self._ensureMandate(db, _MANDATE_HAPPYLIFE, summary)
            mandateIdAlpina = self._ensureMandate(db, _MANDATE_ALPINA, summary)
            userId = self._ensureUser(db, summary)
            self._ensurePlatformAdminFlag(db, userId, summary)

            if mandateIdHappy:
                self._ensureMembership(db, userId, mandateIdHappy, _MANDATE_HAPPYLIFE["label"], summary)
                self._ensureFeatures(db, mandateIdHappy, _MANDATE_HAPPYLIFE["label"], _FEATURES_HAPPYLIFE, summary)
                self._ensureFeatureAccess(db, userId, mandateIdHappy, _MANDATE_HAPPYLIFE["label"], summary)
            if mandateIdAlpina:
                self._ensureMembership(db, userId, mandateIdAlpina, _MANDATE_ALPINA["label"], summary)
                self._ensureFeatures(db, mandateIdAlpina, _MANDATE_ALPINA["label"], _FEATURES_ALPINA, summary)
                self._ensureFeatureAccess(db, userId, mandateIdAlpina, _MANDATE_ALPINA["label"], summary)

            # The helpers below accept a missing mandate id and return early,
            # so they need no guard here.
            self._ensureTrusteeRmaConfig(db, mandateIdHappy, _MANDATE_HAPPYLIFE["label"], summary)
            self._ensureTrusteeRmaConfig(db, mandateIdAlpina, _MANDATE_ALPINA["label"], summary)
            self._ensureNeutralizationConfig(db, mandateIdHappy, userId, summary)
            self._ensureNeutralizationConfig(db, mandateIdAlpina, userId, summary)
            self._ensureBilling(db, mandateIdHappy, _MANDATE_HAPPYLIFE["label"], summary)
            self._ensureBilling(db, mandateIdAlpina, _MANDATE_ALPINA["label"], summary)
        except Exception as e:
            logger.error(f"Demo load failed: {e}", exc_info=True)
            summary["errors"].append(str(e))
        return summary

    # ------------------------------------------------------------------
    # remove
    # ------------------------------------------------------------------
    def remove(self, db) -> Dict[str, Any]:
        """Delete the demo mandates, the demo user and the 'es' language set.

        Mandates are removed first (including their scoped data), then the
        user. A failure for one mandate or for the user is recorded and does
        not stop the remaining steps.

        Args:
            db: Database interface; the code relies on ``getRecordset`` and
                ``recordDelete``.

        Returns:
            Dict with the list-valued keys ``removed`` and ``errors``.
        """
        summary: Dict[str, Any] = {"removed": [], "errors": []}

        from modules.datamodels.datamodelUam import Mandate, UserInDB
        from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole

        for mandateDef in [_MANDATE_HAPPYLIFE, _MANDATE_ALPINA]:
            try:
                existing = db.getRecordset(Mandate, recordFilter={"name": mandateDef["name"]}) or []
                for m in existing:
                    mid = m.get("id")
                    # Dependent records go first, the mandate itself last.
                    self._removeMandateData(db, mid, mandateDef["label"], summary)
                    db.recordDelete(Mandate, mid)
                    summary["removed"].append(f"Mandate {mandateDef['label']} ({mid})")
                    logger.info(f"Removed mandate {mandateDef['label']} ({mid})")
            except Exception as e:
                summary["errors"].append(f"Remove mandate {mandateDef['label']}: {e}")
                logger.error(f"Failed to remove mandate {mandateDef['label']}: {e}", exc_info=True)

        try:
            existing = db.getRecordset(UserInDB, recordFilter={"username": _USER["username"]}) or []
            for u in existing:
                uid = u.get("id")
                # Memberships still present here were not covered by the
                # mandate cleanup above (other mandate, or that cleanup failed).
                memberships = db.getRecordset(UserMandate, recordFilter={"userId": uid}) or []
                for mem in memberships:
                    memId = mem.get("id")
                    try:
                        # Remove role links before the membership they point to,
                        # mirroring _removeMandateData, so none are left orphaned.
                        roleLinks = db.getRecordset(UserMandateRole, recordFilter={"userMandateId": memId}) or []
                        for umr in roleLinks:
                            db.recordDelete(UserMandateRole, umr.get("id"))
                        db.recordDelete(UserMandate, memId)
                    except Exception as e:
                        # Best effort: keep going so the user is still removed,
                        # but make the leftover membership visible.
                        summary["errors"].append(
                            f"Remove membership {memId} of user {_USER['username']}: {e}"
                        )
                        logger.warning(
                            f"Could not remove membership {memId} of user {_USER['username']}: {e}"
                        )
                db.recordDelete(UserInDB, uid)
                summary["removed"].append(f"User {_USER['username']} ({uid})")
                logger.info(f"Removed user {_USER['username']} ({uid})")
        except Exception as e:
            summary["errors"].append(f"Remove user: {e}")
            logger.error(f"Failed to remove user {_USER['username']}: {e}", exc_info=True)

        self._removeLanguageSet(db, "es", summary)
        return summary

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------
    def _ensureMandate(self, db, mandateDef: Dict, summary: Dict) -> Optional[str]:
        """Return the id of the mandate named ``mandateDef["name"]``, creating it if needed.

        A newly created mandate is enabled and receives a copy of the system
        roles via ``copySystemRolesToMandate``. An existing mandate is left
        untouched and reported under ``summary["skipped"]``.
        """
        from modules.datamodels.datamodelUam import Mandate
        from modules.interfaces.interfaceBootstrap import copySystemRolesToMandate

        existing = db.getRecordset(Mandate, recordFilter={"name": mandateDef["name"]})
        if existing:
            mid = existing[0].get("id")
            summary["skipped"].append(f"Mandate {mandateDef['label']} exists ({mid})")
            return mid

        mandate = Mandate(name=mandateDef["name"], label=mandateDef["label"], enabled=True)
        created = db.recordCreate(Mandate, mandate)
        mid = created.get("id")
        logger.info(f"Created mandate {mandateDef['label']} ({mid})")
        summary["created"].append(f"Mandate {mandateDef['label']}")
        copySystemRolesToMandate(db, mid)
        return mid

    def _ensureUser(self, db, summary: Dict) -> Optional[str]:
        """Return the id of the demo user, creating the account if needed.

        A new account is created enabled, with ``isSysAdmin=True``, local
        authentication and an argon2 hash of the configured demo password.
        """
        from modules.datamodels.datamodelUam import UserInDB, AuthAuthority
        from passlib.context import CryptContext

        existing = db.getRecordset(UserInDB, recordFilter={"username": _USER["username"]})
        if existing:
            uid = existing[0].get("id")
            summary["skipped"].append(f"User {_USER['username']} exists ({uid})")
            return uid

        pwdContext = CryptContext(schemes=["argon2"], deprecated="auto")
        user = UserInDB(
            username=_USER["username"],
            email=_USER["email"],
            fullName=_USER["fullName"],
            enabled=True,
            language=_USER["language"],
            isSysAdmin=True,
            authenticationAuthority=AuthAuthority.LOCAL,
            hashedPassword=pwdContext.hash(_USER["password"]),
        )
        created = db.recordCreate(UserInDB, user)
        uid = created.get("id")
        logger.info(f"Created user {_USER['username']} ({uid})")
        summary["created"].append(f"User {_USER['fullName']}")
        return uid

    def _ensurePlatformAdminFlag(self, db, userId: str, summary: Dict):
        """Ensure the demo user has isPlatformAdmin=True for cross-mandate governance.

        Without this, the admin UI menus would be hidden. A missing user is
        reported under ``summary["errors"]`` rather than raised.
        """
        from modules.datamodels.datamodelUam import UserInDB

        existing = db.getRecord(UserInDB, userId)
        if not existing:
            summary["errors"].append(f"Demo user {userId} not found — cannot set isPlatformAdmin")
            return

        # getRecord may hand back either a dict or a model object; read the flag from both.
        if isinstance(existing, dict):
            currentFlag = bool(existing.get("isPlatformAdmin", False))
        else:
            currentFlag = bool(getattr(existing, "isPlatformAdmin", False))
        if currentFlag:
            summary["skipped"].append("isPlatformAdmin already set")
            return

        db.recordModify(UserInDB, userId, {"isPlatformAdmin": True})
        summary["created"].append("isPlatformAdmin flag")
        logger.info(f"Set isPlatformAdmin=True for {_USER['username']}")

    def _ensureMembership(self, db, userId: str, mandateId: str, mandateLabel: str, summary: Dict):
        """Make the user a member of the mandate and give the membership the 'admin' role.

        The role assignment is skipped silently when the mandate has no role
        labelled ``admin`` or when the assignment already exists.
        """
        from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole
        from modules.datamodels.datamodelRbac import Role

        existing = db.getRecordset(UserMandate, recordFilter={"userId": userId, "mandateId": mandateId})
        if existing:
            userMandateId = existing[0].get("id")
            summary["skipped"].append(f"Membership {_USER['username']} -> {mandateLabel} exists")
        else:
            um = UserMandate(userId=userId, mandateId=mandateId, enabled=True)
            created = db.recordCreate(UserMandate, um)
            userMandateId = created.get("id")
            summary["created"].append(f"Membership {_USER['username']} -> {mandateLabel}")
            logger.info(f"Created membership {_USER['username']} -> {mandateLabel}")

        adminRoles = db.getRecordset(Role, recordFilter={"mandateId": mandateId, "roleLabel": "admin"})
        if adminRoles:
            adminRoleId = adminRoles[0].get("id")
            existingRole = db.getRecordset(
                UserMandateRole,
                recordFilter={"userMandateId": userMandateId, "roleId": adminRoleId},
            )
            if not existingRole:
                umr = UserMandateRole(userMandateId=userMandateId, roleId=adminRoleId)
                db.recordCreate(UserMandateRole, umr)
                logger.info(f"Assigned admin role in {mandateLabel}")

    def _ensureFeatures(self, db, mandateId: str, mandateLabel: str, featureDefs: List[Dict], summary: Dict):
        """Create the feature instances listed in ``featureDefs`` for the mandate.

        An instance counts as existing when one with the same label is already
        present in the mandate. A failure to create one instance is recorded
        and does not stop the remaining ones.
        """
        from modules.interfaces.interfaceFeatures import getFeatureInterface

        fi = getFeatureInterface(db)
        existingInstances = fi.getFeatureInstancesForMandate(mandateId) or []
        # Instances may be model objects or dicts; collect their labels either way.
        existingLabels = {
            (inst.label if hasattr(inst, "label") else inst.get("label", ""))
            for inst in existingInstances
        }

        for featureDef in featureDefs:
            code = featureDef["code"]
            instanceLabel = featureDef["label"]
            if instanceLabel in existingLabels:
                summary["skipped"].append(f"Feature '{instanceLabel}' in {mandateLabel} exists")
                continue
            try:
                fi.createFeatureInstance(
                    featureCode=code,
                    mandateId=mandateId,
                    label=instanceLabel,
                    enabled=True,
                    copyTemplateRoles=True,
                )
                summary["created"].append(f"Feature '{instanceLabel}' in {mandateLabel}")
                logger.info(f"Created feature instance '{instanceLabel}' ({code}) in {mandateLabel}")
            except Exception as e:
                summary["errors"].append(f"Feature '{instanceLabel}' in {mandateLabel}: {e}")
                logger.error(f"Failed to create feature '{instanceLabel}' ({code}) in {mandateLabel}: {e}")

    def _ensureFeatureAccess(self, db, userId: str, mandateId: str, mandateLabel: str, summary: Dict):
        """Grant the demo user admin access to every feature instance in the mandate.

        For each instance a ``FeatureAccess`` record is ensured, then the role
        labelled ``<featureCode>-admin`` of that instance is attached if such a
        role exists and is not attached yet.
        """
        from modules.datamodels.datamodelFeatures import FeatureInstance
        from modules.datamodels.datamodelMembership import FeatureAccess, FeatureAccessRole
        from modules.datamodels.datamodelRbac import Role

        instances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId}) or []
        for inst in instances:
            instId = inst.get("id")
            featureCode = inst.get("featureCode", "")
            if not instId:
                continue

            existing = db.getRecordset(FeatureAccess, recordFilter={"userId": userId, "featureInstanceId": instId})
            if existing:
                featureAccessId = existing[0].get("id")
                summary["skipped"].append(f"FeatureAccess {featureCode} in {mandateLabel} exists")
            else:
                fa = FeatureAccess(userId=userId, featureInstanceId=instId, enabled=True)
                created = db.recordCreate(FeatureAccess, fa)
                featureAccessId = created.get("id")
                summary["created"].append(f"FeatureAccess {featureCode} in {mandateLabel}")
                logger.info(f"Created feature access for {featureCode} in {mandateLabel}")

            adminRoleLabel = f"{featureCode}-admin"
            adminRoles = db.getRecordset(Role, recordFilter={
                "featureInstanceId": instId,
                "roleLabel": adminRoleLabel,
            })
            if adminRoles:
                adminRoleId = adminRoles[0].get("id")
                existingRole = db.getRecordset(FeatureAccessRole, recordFilter={
                    "featureAccessId": featureAccessId,
                    "roleId": adminRoleId,
                })
                if not existingRole:
                    far = FeatureAccessRole(featureAccessId=featureAccessId, roleId=adminRoleId)
                    db.recordCreate(FeatureAccessRole, far)
                    logger.info(f"Assigned {adminRoleLabel} role in {mandateLabel}")

    def _ensureTrusteeRmaConfig(self, db, mandateId: Optional[str], mandateLabel: str, summary: Dict):
        """Create the RMA ("Run My Accounts") accounting config for the mandate's trustee feature.

        Credentials are read from ``APP_CONFIG`` (``Demo_RMA_ApiBaseUrl``,
        ``Demo_RMA_ClientName``, ``Demo_RMA_ApiKey``) and stored encrypted.
        Does nothing when ``mandateId`` is empty; missing base URL or API key
        is reported under ``summary["errors"]``.
        """
        if not mandateId:
            return

        from modules.datamodels.datamodelFeatures import FeatureInstance
        from modules.features.trustee.datamodelFeatureTrustee import TrusteeAccountingConfig
        from modules.shared.configuration import APP_CONFIG, encryptValue

        instances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId, "featureCode": "trustee"})
        if not instances:
            summary["skipped"].append(f"No trustee instance in {mandateLabel} for RMA config")
            return

        # NOTE(review): only the first trustee instance is configured, while the
        # Alpina mandate defines three — confirm this is intended.
        instanceId = instances[0].get("id")
        existing = db.getRecordset(TrusteeAccountingConfig, recordFilter={"featureInstanceId": instanceId})
        if existing:
            summary["skipped"].append(f"RMA config for {mandateLabel} exists")
            return

        apiBaseUrl = APP_CONFIG.get("Demo_RMA_ApiBaseUrl", "")
        clientName = APP_CONFIG.get("Demo_RMA_ClientName", "")
        apiKey = APP_CONFIG.get("Demo_RMA_ApiKey", "")
        # Only the base URL and the API key are mandatory; an empty client name is accepted.
        if not apiBaseUrl or not apiKey:
            summary["errors"].append(
                f"RMA credentials missing in config.ini (Demo_RMA_ApiBaseUrl, Demo_RMA_ClientName, Demo_RMA_ApiKey) for {mandateLabel}"
            )
            return

        plainConfig = {
            "apiBaseUrl": apiBaseUrl,
            "clientName": clientName,
            "apiKey": apiKey,
        }
        configRecord = {
            "id": str(uuid.uuid4()),
            "featureInstanceId": instanceId,
            "connectorType": "rma",
            "displayLabel": "Run My Accounts",
            "encryptedConfig": encryptValue(json.dumps(plainConfig), keyName="accountingConfig"),
            "isActive": True,
            "mandateId": mandateId,
        }
        db.recordCreate(TrusteeAccountingConfig, configRecord)
        summary["created"].append(f"RMA accounting config for {mandateLabel}")
        logger.info(f"Created RMA accounting config for {mandateLabel}")

    def _ensureNeutralizationConfig(self, db, mandateId: Optional[str], userId: Optional[str], summary: Dict):
        """Create an enabled neutralization config for the mandate's first neutralization instance.

        Returns without action when ``mandateId`` or ``userId`` is empty or
        when the mandate has no neutralization feature instance. Failures are
        recorded under ``summary["errors"]``.
        """
        if not mandateId or not userId:
            return

        from modules.datamodels.datamodelFeatures import FeatureInstance

        instances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId, "featureCode": "neutralization"})
        if not instances:
            return

        instanceId = instances[0].get("id")
        try:
            from modules.features.neutralization.datamodelFeatureNeutralizer import DataNeutraliserConfig

            existing = db.getRecordset(DataNeutraliserConfig, recordFilter={"featureInstanceId": instanceId})
            if existing:
                summary["skipped"].append(f"Neutralization config for mandate {mandateId} exists")
                return

            config = DataNeutraliserConfig(
                featureInstanceId=instanceId,
                mandateId=mandateId,
                userId=userId,
                enabled=True,
                scope="featureInstance",
            )
            db.recordCreate(DataNeutraliserConfig, config)
            summary["created"].append(f"Neutralization config for mandate {mandateId}")
            logger.info(f"Created neutralization config for mandate {mandateId}")
        except Exception as e:
            summary["errors"].append(f"Neutralization config: {e}")

    def _ensureBilling(self, db, mandateId: Optional[str], mandateLabel: str, summary: Dict):
        """Create billing settings for the mandate unless settings already exist.

        The record is written through the billing root interface's own ``db``
        handle, not through the ``db`` argument. Failures are recorded under
        ``summary["errors"]``.
        """
        if not mandateId:
            return
        try:
            from modules.interfaces.interfaceDbBilling import _getRootInterface
            from modules.datamodels.datamodelBilling import BillingSettings

            billingInterface = _getRootInterface()
            existingSettings = billingInterface.getSettings(mandateId)
            if existingSettings:
                summary["skipped"].append(f"Billing for {mandateLabel} exists")
                return

            settings = BillingSettings(
                mandateId=mandateId,
                warningThresholdPercent=10.0,
                notifyOnWarning=True,
            )
            billingInterface.db.recordCreate(BillingSettings, settings)
            summary["created"].append(f"Billing settings for {mandateLabel}")
            logger.info(f"Created billing settings for {mandateLabel}")
        except Exception as e:
            summary["errors"].append(f"Billing for {mandateLabel}: {e}")

    def _removeMandateData(self, db, mandateId: str, mandateLabel: str, summary: Dict):
        """Remove all data scoped to a mandate before deleting the mandate itself.

        Order: per feature instance (feature-specific data, chat workflows with
        their messages and logs, feature access with its roles, the instance),
        then memberships with their roles, then roles with their access rules,
        and finally billing settings. Only the billing step catches its own
        exceptions; any other failure propagates to the caller.
        """
        from modules.datamodels.datamodelFeatures import FeatureInstance
        from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole, FeatureAccess, FeatureAccessRole
        from modules.datamodels.datamodelRbac import Role, AccessRule
        from modules.datamodels.datamodelChat import ChatWorkflow, ChatMessage, ChatLog
        from modules.datamodels.datamodelBilling import BillingSettings

        instances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId}) or []
        for inst in instances:
            instId = inst.get("id")
            featureCode = inst.get("featureCode", "")
            if not instId:
                continue

            # Feature-specific cleanup; each of these helpers records its own errors.
            if featureCode == "graphicalEditor":
                self._removeGraphicalEditorData(instId, mandateId, mandateLabel, summary)
            if featureCode == "trustee":
                self._removeTrusteeData(db, instId, mandateLabel, summary)
            if featureCode == "neutralization":
                self._removeNeutralizationData(db, instId, mandateLabel, summary)

            chatWorkflows = db.getRecordset(ChatWorkflow, recordFilter={"featureInstanceId": instId}) or []
            for wf in chatWorkflows:
                wfId = wf.get("id")
                if not wfId:
                    continue
                for msg in db.getRecordset(ChatMessage, recordFilter={"workflowId": wfId}) or []:
                    db.recordDelete(ChatMessage, msg.get("id"))
                for log in db.getRecordset(ChatLog, recordFilter={"workflowId": wfId}) or []:
                    db.recordDelete(ChatLog, log.get("id"))
                db.recordDelete(ChatWorkflow, wfId)
            if chatWorkflows:
                summary["removed"].append(f"{len(chatWorkflows)} ChatWorkflows in {mandateLabel}")

            accesses = db.getRecordset(FeatureAccess, recordFilter={"featureInstanceId": instId}) or []
            for access in accesses:
                for role in db.getRecordset(FeatureAccessRole, recordFilter={"featureAccessId": access.get("id")}) or []:
                    db.recordDelete(FeatureAccessRole, role.get("id"))
                db.recordDelete(FeatureAccess, access.get("id"))

            db.recordDelete(FeatureInstance, instId)
            summary["removed"].append(f"FeatureInstance {featureCode} in {mandateLabel}")
            logger.info(f"Removed feature instance {featureCode} ({instId}) in {mandateLabel}")

        memberships = db.getRecordset(UserMandate, recordFilter={"mandateId": mandateId}) or []
        for um in memberships:
            for umr in db.getRecordset(UserMandateRole, recordFilter={"userMandateId": um.get("id")}) or []:
                db.recordDelete(UserMandateRole, umr.get("id"))
            db.recordDelete(UserMandate, um.get("id"))
        if memberships:
            summary["removed"].append(f"{len(memberships)} memberships in {mandateLabel}")

        roles = db.getRecordset(Role, recordFilter={"mandateId": mandateId}) or []
        for role in roles:
            for rule in db.getRecordset(AccessRule, recordFilter={"roleId": role.get("id")}) or []:
                db.recordDelete(AccessRule, rule.get("id"))
            db.recordDelete(Role, role.get("id"))
        if roles:
            summary["removed"].append(f"{len(roles)} roles in {mandateLabel}")

        try:
            from modules.interfaces.interfaceDbBilling import _getRootInterface

            # Billing settings are read and deleted through the billing interface's own db handle.
            billingDb = _getRootInterface().db
            billingSettings = billingDb.getRecordset(BillingSettings, recordFilter={"mandateId": mandateId}) or []
            for bs in billingSettings:
                billingDb.recordDelete(BillingSettings, bs.get("id"))
            if billingSettings:
                summary["removed"].append(f"BillingSettings in {mandateLabel}")
        except Exception as e:
            summary["errors"].append(f"Billing cleanup for {mandateLabel}: {e}")

    def _removeGraphicalEditorData(self, featureInstanceId: str, mandateId: str, mandateLabel: str, summary: Dict):
        """Remove all AutoWorkflow data (workflows, runs, versions, logs, tasks) from the Greenfield DB.

        Opens its own connection to the ``poweron_graphicaleditor`` database
        using the DB_* settings from ``APP_CONFIG``. Failures are recorded
        under ``summary["errors"]`` and logged, never raised.
        """
        try:
            from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import (
                AutoWorkflow, AutoVersion, AutoRun, AutoStepLog, AutoTask,
            )
            from modules.connectors.connectorDbPostgre import DatabaseConnector
            from modules.shared.configuration import APP_CONFIG

            # NOTE(review): the connector is never closed explicitly here —
            # confirm whether DatabaseConnector needs an explicit close.
            geDb = DatabaseConnector(
                dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
                dbDatabase="poweron_graphicaleditor",
                dbUser=APP_CONFIG.get("DB_USER"),
                dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"),
                dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),
                userId=None,
            )

            workflows = geDb.getRecordset(AutoWorkflow, recordFilter={
                "mandateId": mandateId,
                "featureInstanceId": featureInstanceId,
            }) or []
            for wf in workflows:
                wfId = wf.get("id")
                if not wfId:
                    continue
                for version in geDb.getRecordset(AutoVersion, recordFilter={"workflowId": wfId}) or []:
                    geDb.recordDelete(AutoVersion, version.get("id"))
                runs = geDb.getRecordset(AutoRun, recordFilter={"workflowId": wfId}) or []
                for run in runs:
                    runId = run.get("id")
                    # Step logs hang off the run, so they go before the run itself.
                    for stepLog in geDb.getRecordset(AutoStepLog, recordFilter={"runId": runId}) or []:
                        geDb.recordDelete(AutoStepLog, stepLog.get("id"))
                    geDb.recordDelete(AutoRun, runId)
                for task in geDb.getRecordset(AutoTask, recordFilter={"workflowId": wfId}) or []:
                    geDb.recordDelete(AutoTask, task.get("id"))
                geDb.recordDelete(AutoWorkflow, wfId)

            if workflows:
                summary["removed"].append(f"{len(workflows)} AutoWorkflows in {mandateLabel}")
                logger.info(f"Removed {len(workflows)} graphical editor workflows for {mandateLabel}")
        except Exception as e:
            summary["errors"].append(f"GraphicalEditor cleanup for {mandateLabel}: {e}")
            logger.error(f"Failed to clean up graphical editor data for {mandateLabel}: {e}")

    def _removeTrusteeData(self, db, featureInstanceId: str, mandateLabel: str, summary: Dict):
        """Remove TrusteeAccountingConfig for a feature instance.

        Failures are recorded under ``summary["errors"]``, never raised.
        """
        try:
            from modules.features.trustee.datamodelFeatureTrustee import TrusteeAccountingConfig

            configs = db.getRecordset(TrusteeAccountingConfig, recordFilter={"featureInstanceId": featureInstanceId}) or []
            for cfg in configs:
                db.recordDelete(TrusteeAccountingConfig, cfg.get("id"))
            if configs:
                summary["removed"].append(f"TrusteeAccountingConfig in {mandateLabel}")
        except Exception as e:
            summary["errors"].append(f"Trustee cleanup for {mandateLabel}: {e}")

    def _removeNeutralizationData(self, db, featureInstanceId: str, mandateLabel: str, summary: Dict):
        """Remove DataNeutraliserConfig for a feature instance.

        Failures are recorded under ``summary["errors"]``, never raised.
        """
        try:
            from modules.features.neutralization.datamodelFeatureNeutralizer import DataNeutraliserConfig

            configs = db.getRecordset(DataNeutraliserConfig, recordFilter={"featureInstanceId": featureInstanceId}) or []
            for cfg in configs:
                db.recordDelete(DataNeutraliserConfig, cfg.get("id"))
            if configs:
                summary["removed"].append(f"DataNeutraliserConfig in {mandateLabel}")
        except Exception as e:
            summary["errors"].append(f"Neutralization cleanup for {mandateLabel}: {e}")

    def _removeLanguageSet(self, db, code: str, summary: Dict):
        """Remove a language set if it was created during demo (e.g. 'es' from UC4).

        Deliberately best effort: a failure is logged at debug level only and
        is not added to ``summary["errors"]``.
        """
        try:
            from modules.datamodels.datamodelUiLanguage import UiLanguageSet

            existing = db.getRecordset(UiLanguageSet, recordFilter={"id": code})
            if existing:
                db.recordDelete(UiLanguageSet, code)
                summary["removed"].append(f"Language set '{code}'")
                logger.info(f"Removed language set '{code}'")
        except Exception as e:
            logger.debug(f"Could not remove language set '{code}': {e}")