603 lines
28 KiB
Python
603 lines
28 KiB
Python
"""
|
|
Investor Demo April 2026
|
|
|
|
Creates a complete demo environment with two mandates, one user,
|
|
and all feature instances needed for the investor live demo.
|
|
|
|
Mandates:
|
|
- HappyLife AG (happylife) — Dokumentenablage, Buchhaltung, Automationen, Chatbot, Datenschutz
|
|
- Alpina Treuhand AG (alpina) — Dokumentenablage, 3x Treuhand-Kunden, Automationen, Datenschutz
|
|
|
|
User:
|
|
- Patrick Helvetia (p.motsch@poweron.swiss) — SysAdmin, member of both mandates
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import uuid
|
|
from typing import Dict, Any, Optional, List
|
|
|
|
from modules.demoConfigs._baseDemoConfig import _BaseDemoConfig
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_DEMO_PREFIX = "demo-inv2026"
|
|
|
|
_MANDATE_HAPPYLIFE = {
|
|
"name": "happylife",
|
|
"label": "HappyLife AG",
|
|
}
|
|
|
|
_MANDATE_ALPINA = {
|
|
"name": "alpina-treuhand",
|
|
"label": "Alpina Treuhand AG",
|
|
}
|
|
|
|
_USER = {
|
|
"username": "patrick.helvetia",
|
|
"email": "p.motsch@poweron.swiss",
|
|
"fullName": "Patrick Helvetia",
|
|
"password": "patrick.helvetia.demo",
|
|
"language": "en",
|
|
}
|
|
|
|
_FEATURES_HAPPYLIFE = [
|
|
{"code": "workspace", "label": "Dokumentenablage"},
|
|
{"code": "trustee", "label": "Buchhaltung"},
|
|
{"code": "graphicalEditor", "label": "Automationen"},
|
|
{"code": "chatbot", "label": "Chatbot"},
|
|
{"code": "neutralization", "label": "Datenschutz"},
|
|
]
|
|
_FEATURES_ALPINA = [
|
|
{"code": "workspace", "label": "Dokumentenablage"},
|
|
{"code": "trustee", "label": "BUHA Müller Immobilien GmbH"},
|
|
{"code": "trustee", "label": "BUHA Schneider Gastro AG"},
|
|
{"code": "trustee", "label": "BUHA Weber Consulting"},
|
|
{"code": "graphicalEditor", "label": "Automationen"},
|
|
{"code": "neutralization", "label": "Datenschutz"},
|
|
]
|
|
|
|
|
|
class InvestorDemo2026(_BaseDemoConfig):
|
|
code = "investor-demo-2026"
|
|
label = "Investor Demo April 2026"
|
|
description = (
|
|
"Two mandates (HappyLife AG + Alpina Treuhand AG), one SysAdmin user, "
|
|
"trustee with RMA, workspace, graph editor, chatbot, and neutralization."
|
|
)
|
|
|
|
# ------------------------------------------------------------------
|
|
# load
|
|
# ------------------------------------------------------------------
|
|
def load(self, db) -> Dict[str, Any]:
|
|
summary: Dict[str, Any] = {"created": [], "skipped": [], "errors": []}
|
|
|
|
try:
|
|
mandateIdHappy = self._ensureMandate(db, _MANDATE_HAPPYLIFE, summary)
|
|
mandateIdAlpina = self._ensureMandate(db, _MANDATE_ALPINA, summary)
|
|
|
|
userId = self._ensureUser(db, summary)
|
|
self._ensureRootMandateSysAdminRole(db, userId, summary)
|
|
|
|
if mandateIdHappy:
|
|
self._ensureMembership(db, userId, mandateIdHappy, _MANDATE_HAPPYLIFE["label"], summary)
|
|
self._ensureFeatures(db, mandateIdHappy, _MANDATE_HAPPYLIFE["label"], _FEATURES_HAPPYLIFE, summary)
|
|
self._ensureFeatureAccess(db, userId, mandateIdHappy, _MANDATE_HAPPYLIFE["label"], summary)
|
|
|
|
if mandateIdAlpina:
|
|
self._ensureMembership(db, userId, mandateIdAlpina, _MANDATE_ALPINA["label"], summary)
|
|
self._ensureFeatures(db, mandateIdAlpina, _MANDATE_ALPINA["label"], _FEATURES_ALPINA, summary)
|
|
self._ensureFeatureAccess(db, userId, mandateIdAlpina, _MANDATE_ALPINA["label"], summary)
|
|
|
|
self._ensureTrusteeRmaConfig(db, mandateIdHappy, _MANDATE_HAPPYLIFE["label"], summary)
|
|
self._ensureTrusteeRmaConfig(db, mandateIdAlpina, _MANDATE_ALPINA["label"], summary)
|
|
|
|
self._ensureNeutralizationConfig(db, mandateIdHappy, userId, summary)
|
|
self._ensureNeutralizationConfig(db, mandateIdAlpina, userId, summary)
|
|
|
|
self._ensureBilling(db, mandateIdHappy, _MANDATE_HAPPYLIFE["label"], summary)
|
|
self._ensureBilling(db, mandateIdAlpina, _MANDATE_ALPINA["label"], summary)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Demo load failed: {e}", exc_info=True)
|
|
summary["errors"].append(str(e))
|
|
|
|
return summary
|
|
|
|
# ------------------------------------------------------------------
|
|
# remove
|
|
# ------------------------------------------------------------------
|
|
def remove(self, db) -> Dict[str, Any]:
|
|
summary: Dict[str, Any] = {"removed": [], "errors": []}
|
|
|
|
from modules.datamodels.datamodelUam import Mandate, UserInDB
|
|
from modules.datamodels.datamodelMembership import UserMandate
|
|
|
|
for mandateDef in [_MANDATE_HAPPYLIFE, _MANDATE_ALPINA]:
|
|
try:
|
|
existing = db.getRecordset(Mandate, recordFilter={"name": mandateDef["name"]})
|
|
for m in existing:
|
|
mid = m.get("id")
|
|
self._removeMandateData(db, mid, mandateDef["label"], summary)
|
|
db.recordDelete(Mandate, mid)
|
|
summary["removed"].append(f"Mandate {mandateDef['label']} ({mid})")
|
|
logger.info(f"Removed mandate {mandateDef['label']} ({mid})")
|
|
except Exception as e:
|
|
summary["errors"].append(f"Remove mandate {mandateDef['label']}: {e}")
|
|
|
|
try:
|
|
existing = db.getRecordset(UserInDB, recordFilter={"username": _USER["username"]})
|
|
for u in existing:
|
|
uid = u.get("id")
|
|
memberships = db.getRecordset(UserMandate, recordFilter={"userId": uid})
|
|
for mem in memberships:
|
|
try:
|
|
db.recordDelete(UserMandate, mem.get("id"))
|
|
except Exception:
|
|
pass
|
|
db.recordDelete(UserInDB, uid)
|
|
summary["removed"].append(f"User {_USER['username']} ({uid})")
|
|
logger.info(f"Removed user {_USER['username']} ({uid})")
|
|
except Exception as e:
|
|
summary["errors"].append(f"Remove user: {e}")
|
|
|
|
self._removeLanguageSet(db, "es", summary)
|
|
|
|
return summary
|
|
|
|
# ------------------------------------------------------------------
|
|
# helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _ensureMandate(self, db, mandateDef: Dict, summary: Dict) -> Optional[str]:
|
|
from modules.datamodels.datamodelUam import Mandate
|
|
from modules.interfaces.interfaceBootstrap import copySystemRolesToMandate
|
|
|
|
existing = db.getRecordset(Mandate, recordFilter={"name": mandateDef["name"]})
|
|
if existing:
|
|
mid = existing[0].get("id")
|
|
summary["skipped"].append(f"Mandate {mandateDef['label']} exists ({mid})")
|
|
return mid
|
|
|
|
mandate = Mandate(name=mandateDef["name"], label=mandateDef["label"], enabled=True)
|
|
created = db.recordCreate(Mandate, mandate)
|
|
mid = created.get("id")
|
|
logger.info(f"Created mandate {mandateDef['label']} ({mid})")
|
|
summary["created"].append(f"Mandate {mandateDef['label']}")
|
|
|
|
copySystemRolesToMandate(db, mid)
|
|
return mid
|
|
|
|
def _ensureUser(self, db, summary: Dict) -> Optional[str]:
|
|
from modules.datamodels.datamodelUam import UserInDB, AuthAuthority
|
|
from passlib.context import CryptContext
|
|
|
|
existing = db.getRecordset(UserInDB, recordFilter={"username": _USER["username"]})
|
|
if existing:
|
|
uid = existing[0].get("id")
|
|
summary["skipped"].append(f"User {_USER['username']} exists ({uid})")
|
|
return uid
|
|
|
|
pwdContext = CryptContext(schemes=["argon2"], deprecated="auto")
|
|
user = UserInDB(
|
|
username=_USER["username"],
|
|
email=_USER["email"],
|
|
fullName=_USER["fullName"],
|
|
enabled=True,
|
|
language=_USER["language"],
|
|
isSysAdmin=True,
|
|
authenticationAuthority=AuthAuthority.LOCAL,
|
|
hashedPassword=pwdContext.hash(_USER["password"]),
|
|
)
|
|
created = db.recordCreate(UserInDB, user)
|
|
uid = created.get("id")
|
|
logger.info(f"Created user {_USER['username']} ({uid})")
|
|
summary["created"].append(f"User {_USER['fullName']}")
|
|
return uid
|
|
|
|
def _ensureRootMandateSysAdminRole(self, db, userId: str, summary: Dict):
|
|
"""Ensure the demo user is member of the root mandate with the sysadmin role.
|
|
Without this, hasSysAdminRole returns False and admin menus are hidden."""
|
|
from modules.datamodels.datamodelUam import Mandate
|
|
from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole
|
|
from modules.datamodels.datamodelRbac import Role
|
|
|
|
rootMandates = db.getRecordset(Mandate, recordFilter={"name": "root", "isSystem": True})
|
|
if not rootMandates:
|
|
summary["errors"].append("Root mandate not found — cannot assign sysadmin role")
|
|
return
|
|
|
|
rootMandateId = rootMandates[0].get("id")
|
|
|
|
existing = db.getRecordset(UserMandate, recordFilter={"userId": userId, "mandateId": rootMandateId})
|
|
if existing:
|
|
userMandateId = existing[0].get("id")
|
|
else:
|
|
um = UserMandate(userId=userId, mandateId=rootMandateId, enabled=True)
|
|
created = db.recordCreate(UserMandate, um)
|
|
userMandateId = created.get("id")
|
|
summary["created"].append("Membership -> root mandate")
|
|
logger.info(f"Created root mandate membership for {_USER['username']}")
|
|
|
|
sysadminRoles = db.getRecordset(Role, recordFilter={"mandateId": rootMandateId, "roleLabel": "sysadmin"})
|
|
if not sysadminRoles:
|
|
summary["errors"].append("sysadmin role not found in root mandate")
|
|
return
|
|
|
|
sysadminRoleId = sysadminRoles[0].get("id")
|
|
existingRole = db.getRecordset(UserMandateRole, recordFilter={
|
|
"userMandateId": userMandateId,
|
|
"roleId": sysadminRoleId,
|
|
})
|
|
if not existingRole:
|
|
umr = UserMandateRole(userMandateId=userMandateId, roleId=sysadminRoleId)
|
|
db.recordCreate(UserMandateRole, umr)
|
|
summary["created"].append("SysAdmin role in root mandate")
|
|
logger.info(f"Assigned sysadmin role in root mandate for {_USER['username']}")
|
|
else:
|
|
summary["skipped"].append("SysAdmin role in root mandate exists")
|
|
|
|
def _ensureMembership(self, db, userId: str, mandateId: str, mandateLabel: str, summary: Dict):
|
|
from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole
|
|
from modules.datamodels.datamodelRbac import Role
|
|
|
|
existing = db.getRecordset(UserMandate, recordFilter={"userId": userId, "mandateId": mandateId})
|
|
if existing:
|
|
userMandateId = existing[0].get("id")
|
|
summary["skipped"].append(f"Membership {_USER['username']} -> {mandateLabel} exists")
|
|
else:
|
|
um = UserMandate(userId=userId, mandateId=mandateId, enabled=True)
|
|
created = db.recordCreate(UserMandate, um)
|
|
userMandateId = created.get("id")
|
|
summary["created"].append(f"Membership {_USER['username']} -> {mandateLabel}")
|
|
logger.info(f"Created membership {_USER['username']} -> {mandateLabel}")
|
|
|
|
adminRoles = db.getRecordset(Role, recordFilter={"mandateId": mandateId, "roleLabel": "admin"})
|
|
if adminRoles:
|
|
adminRoleId = adminRoles[0].get("id")
|
|
existingRole = db.getRecordset(UserMandateRole, recordFilter={"userMandateId": userMandateId, "roleId": adminRoleId})
|
|
if not existingRole:
|
|
umr = UserMandateRole(userMandateId=userMandateId, roleId=adminRoleId)
|
|
db.recordCreate(UserMandateRole, umr)
|
|
logger.info(f"Assigned admin role in {mandateLabel}")
|
|
|
|
def _ensureFeatures(self, db, mandateId: str, mandateLabel: str, featureDefs: List[Dict], summary: Dict):
|
|
from modules.interfaces.interfaceFeatures import getFeatureInterface
|
|
|
|
fi = getFeatureInterface(db)
|
|
existingInstances = fi.getFeatureInstancesForMandate(mandateId)
|
|
existingLabels = {
|
|
(inst.label if hasattr(inst, "label") else inst.get("label", ""))
|
|
for inst in existingInstances
|
|
}
|
|
|
|
for featureDef in featureDefs:
|
|
code = featureDef["code"]
|
|
instanceLabel = featureDef["label"]
|
|
if instanceLabel in existingLabels:
|
|
summary["skipped"].append(f"Feature '{instanceLabel}' in {mandateLabel} exists")
|
|
continue
|
|
try:
|
|
fi.createFeatureInstance(
|
|
featureCode=code,
|
|
mandateId=mandateId,
|
|
label=instanceLabel,
|
|
enabled=True,
|
|
copyTemplateRoles=True,
|
|
)
|
|
summary["created"].append(f"Feature '{instanceLabel}' in {mandateLabel}")
|
|
logger.info(f"Created feature instance '{instanceLabel}' ({code}) in {mandateLabel}")
|
|
except Exception as e:
|
|
summary["errors"].append(f"Feature '{instanceLabel}' in {mandateLabel}: {e}")
|
|
logger.error(f"Failed to create feature '{instanceLabel}' ({code}) in {mandateLabel}: {e}")
|
|
|
|
def _ensureFeatureAccess(self, db, userId: str, mandateId: str, mandateLabel: str, summary: Dict):
|
|
"""Grant the demo user admin access to every feature instance in the mandate."""
|
|
from modules.datamodels.datamodelFeatures import FeatureInstance
|
|
from modules.datamodels.datamodelMembership import FeatureAccess, FeatureAccessRole
|
|
from modules.datamodels.datamodelRbac import Role
|
|
|
|
instances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId}) or []
|
|
|
|
for inst in instances:
|
|
instId = inst.get("id")
|
|
featureCode = inst.get("featureCode", "")
|
|
if not instId:
|
|
continue
|
|
|
|
existing = db.getRecordset(FeatureAccess, recordFilter={"userId": userId, "featureInstanceId": instId})
|
|
if existing:
|
|
featureAccessId = existing[0].get("id")
|
|
summary["skipped"].append(f"FeatureAccess {featureCode} in {mandateLabel} exists")
|
|
else:
|
|
fa = FeatureAccess(userId=userId, featureInstanceId=instId, enabled=True)
|
|
created = db.recordCreate(FeatureAccess, fa)
|
|
featureAccessId = created.get("id")
|
|
summary["created"].append(f"FeatureAccess {featureCode} in {mandateLabel}")
|
|
logger.info(f"Created feature access for {featureCode} in {mandateLabel}")
|
|
|
|
adminRoleLabel = f"{featureCode}-admin"
|
|
adminRoles = db.getRecordset(Role, recordFilter={
|
|
"featureInstanceId": instId,
|
|
"roleLabel": adminRoleLabel,
|
|
})
|
|
if adminRoles:
|
|
adminRoleId = adminRoles[0].get("id")
|
|
existingRole = db.getRecordset(FeatureAccessRole, recordFilter={
|
|
"featureAccessId": featureAccessId,
|
|
"roleId": adminRoleId,
|
|
})
|
|
if not existingRole:
|
|
far = FeatureAccessRole(featureAccessId=featureAccessId, roleId=adminRoleId)
|
|
db.recordCreate(FeatureAccessRole, far)
|
|
logger.info(f"Assigned {adminRoleLabel} role in {mandateLabel}")
|
|
|
|
def _ensureTrusteeRmaConfig(self, db, mandateId: Optional[str], mandateLabel: str, summary: Dict):
|
|
if not mandateId:
|
|
return
|
|
|
|
from modules.datamodels.datamodelFeatures import FeatureInstance
|
|
from modules.features.trustee.datamodelFeatureTrustee import TrusteeAccountingConfig
|
|
from modules.shared.configuration import APP_CONFIG, encryptValue
|
|
|
|
instances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId, "featureCode": "trustee"})
|
|
if not instances:
|
|
summary["skipped"].append(f"No trustee instance in {mandateLabel} for RMA config")
|
|
return
|
|
|
|
instanceId = instances[0].get("id")
|
|
|
|
existing = db.getRecordset(TrusteeAccountingConfig, recordFilter={"featureInstanceId": instanceId})
|
|
if existing:
|
|
summary["skipped"].append(f"RMA config for {mandateLabel} exists")
|
|
return
|
|
|
|
apiBaseUrl = APP_CONFIG.get("Demo_RMA_ApiBaseUrl", "")
|
|
clientName = APP_CONFIG.get("Demo_RMA_ClientName", "")
|
|
apiKey = APP_CONFIG.get("Demo_RMA_ApiKey", "")
|
|
|
|
if not apiBaseUrl or not apiKey:
|
|
summary["errors"].append(
|
|
f"RMA credentials missing in config.ini (Demo_RMA_ApiBaseUrl, Demo_RMA_ClientName, Demo_RMA_ApiKey) for {mandateLabel}"
|
|
)
|
|
return
|
|
|
|
plainConfig = {
|
|
"apiBaseUrl": apiBaseUrl,
|
|
"clientName": clientName,
|
|
"apiKey": apiKey,
|
|
}
|
|
|
|
configRecord = {
|
|
"id": str(uuid.uuid4()),
|
|
"featureInstanceId": instanceId,
|
|
"connectorType": "rma",
|
|
"displayLabel": "Run My Accounts",
|
|
"encryptedConfig": encryptValue(json.dumps(plainConfig), keyName="accountingConfig"),
|
|
"isActive": True,
|
|
"mandateId": mandateId,
|
|
}
|
|
db.recordCreate(TrusteeAccountingConfig, configRecord)
|
|
summary["created"].append(f"RMA accounting config for {mandateLabel}")
|
|
logger.info(f"Created RMA accounting config for {mandateLabel}")
|
|
|
|
def _ensureNeutralizationConfig(self, db, mandateId: Optional[str], userId: Optional[str], summary: Dict):
|
|
if not mandateId or not userId:
|
|
return
|
|
|
|
from modules.datamodels.datamodelFeatures import FeatureInstance
|
|
|
|
instances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId, "featureCode": "neutralization"})
|
|
if not instances:
|
|
return
|
|
|
|
instanceId = instances[0].get("id")
|
|
|
|
try:
|
|
from modules.features.neutralization.datamodelFeatureNeutralizer import DataNeutraliserConfig
|
|
|
|
existing = db.getRecordset(DataNeutraliserConfig, recordFilter={"featureInstanceId": instanceId})
|
|
if existing:
|
|
summary["skipped"].append(f"Neutralization config for mandate {mandateId} exists")
|
|
return
|
|
|
|
config = DataNeutraliserConfig(
|
|
featureInstanceId=instanceId,
|
|
mandateId=mandateId,
|
|
userId=userId,
|
|
enabled=True,
|
|
scope="featureInstance",
|
|
)
|
|
db.recordCreate(DataNeutraliserConfig, config)
|
|
summary["created"].append(f"Neutralization config for mandate {mandateId}")
|
|
logger.info(f"Created neutralization config for mandate {mandateId}")
|
|
except Exception as e:
|
|
summary["errors"].append(f"Neutralization config: {e}")
|
|
|
|
def _ensureBilling(self, db, mandateId: Optional[str], mandateLabel: str, summary: Dict):
|
|
if not mandateId:
|
|
return
|
|
try:
|
|
from modules.interfaces.interfaceDbBilling import _getRootInterface
|
|
from modules.datamodels.datamodelBilling import BillingSettings
|
|
|
|
billingInterface = _getRootInterface()
|
|
existingSettings = billingInterface.getSettings(mandateId)
|
|
if existingSettings:
|
|
summary["skipped"].append(f"Billing for {mandateLabel} exists")
|
|
return
|
|
|
|
settings = BillingSettings(
|
|
mandateId=mandateId,
|
|
warningThresholdPercent=10.0,
|
|
notifyOnWarning=True,
|
|
)
|
|
billingInterface.db.recordCreate(BillingSettings, settings)
|
|
summary["created"].append(f"Billing settings for {mandateLabel}")
|
|
logger.info(f"Created billing settings for {mandateLabel}")
|
|
except Exception as e:
|
|
summary["errors"].append(f"Billing for {mandateLabel}: {e}")
|
|
|
|
def _removeMandateData(self, db, mandateId: str, mandateLabel: str, summary: Dict):
|
|
"""Remove all data scoped to a mandate before deleting the mandate itself."""
|
|
from modules.datamodels.datamodelFeatures import FeatureInstance
|
|
from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole, FeatureAccess, FeatureAccessRole
|
|
from modules.datamodels.datamodelRbac import Role, AccessRule
|
|
from modules.datamodels.datamodelChat import ChatWorkflow, ChatMessage, ChatLog
|
|
from modules.datamodels.datamodelBilling import BillingSettings
|
|
|
|
instances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId}) or []
|
|
|
|
for inst in instances:
|
|
instId = inst.get("id")
|
|
featureCode = inst.get("featureCode", "")
|
|
if not instId:
|
|
continue
|
|
|
|
if featureCode == "graphicalEditor":
|
|
self._removeGraphicalEditorData(instId, mandateId, mandateLabel, summary)
|
|
|
|
if featureCode == "trustee":
|
|
self._removeTrusteeData(db, instId, mandateLabel, summary)
|
|
|
|
if featureCode == "neutralization":
|
|
self._removeNeutralizationData(db, instId, mandateLabel, summary)
|
|
|
|
chatWorkflows = db.getRecordset(ChatWorkflow, recordFilter={"featureInstanceId": instId}) or []
|
|
for wf in chatWorkflows:
|
|
wfId = wf.get("id")
|
|
if not wfId:
|
|
continue
|
|
for msg in db.getRecordset(ChatMessage, recordFilter={"workflowId": wfId}) or []:
|
|
db.recordDelete(ChatMessage, msg.get("id"))
|
|
for log in db.getRecordset(ChatLog, recordFilter={"workflowId": wfId}) or []:
|
|
db.recordDelete(ChatLog, log.get("id"))
|
|
db.recordDelete(ChatWorkflow, wfId)
|
|
if chatWorkflows:
|
|
summary["removed"].append(f"{len(chatWorkflows)} ChatWorkflows in {mandateLabel}")
|
|
|
|
accesses = db.getRecordset(FeatureAccess, recordFilter={"featureInstanceId": instId}) or []
|
|
for access in accesses:
|
|
for role in db.getRecordset(FeatureAccessRole, recordFilter={"featureAccessId": access.get("id")}) or []:
|
|
db.recordDelete(FeatureAccessRole, role.get("id"))
|
|
db.recordDelete(FeatureAccess, access.get("id"))
|
|
|
|
db.recordDelete(FeatureInstance, instId)
|
|
summary["removed"].append(f"FeatureInstance {featureCode} in {mandateLabel}")
|
|
logger.info(f"Removed feature instance {featureCode} ({instId}) in {mandateLabel}")
|
|
|
|
memberships = db.getRecordset(UserMandate, recordFilter={"mandateId": mandateId}) or []
|
|
for um in memberships:
|
|
for umr in db.getRecordset(UserMandateRole, recordFilter={"userMandateId": um.get("id")}) or []:
|
|
db.recordDelete(UserMandateRole, umr.get("id"))
|
|
db.recordDelete(UserMandate, um.get("id"))
|
|
if memberships:
|
|
summary["removed"].append(f"{len(memberships)} memberships in {mandateLabel}")
|
|
|
|
roles = db.getRecordset(Role, recordFilter={"mandateId": mandateId}) or []
|
|
for role in roles:
|
|
for rule in db.getRecordset(AccessRule, recordFilter={"roleId": role.get("id")}) or []:
|
|
db.recordDelete(AccessRule, rule.get("id"))
|
|
db.recordDelete(Role, role.get("id"))
|
|
if roles:
|
|
summary["removed"].append(f"{len(roles)} roles in {mandateLabel}")
|
|
|
|
try:
|
|
from modules.interfaces.interfaceDbBilling import _getRootInterface
|
|
billingDb = _getRootInterface().db
|
|
billingSettings = billingDb.getRecordset(BillingSettings, recordFilter={"mandateId": mandateId}) or []
|
|
for bs in billingSettings:
|
|
billingDb.recordDelete(BillingSettings, bs.get("id"))
|
|
if billingSettings:
|
|
summary["removed"].append(f"BillingSettings in {mandateLabel}")
|
|
except Exception as e:
|
|
summary["errors"].append(f"Billing cleanup for {mandateLabel}: {e}")
|
|
|
|
def _removeGraphicalEditorData(self, featureInstanceId: str, mandateId: str, mandateLabel: str, summary: Dict):
|
|
"""Remove all AutoWorkflow data (workflows, runs, versions, logs, tasks) from the Greenfield DB."""
|
|
try:
|
|
from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import (
|
|
AutoWorkflow, AutoVersion, AutoRun, AutoStepLog, AutoTask,
|
|
)
|
|
from modules.connectors.connectorDbPostgre import DatabaseConnector
|
|
from modules.shared.configuration import APP_CONFIG
|
|
|
|
geDb = DatabaseConnector(
|
|
dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
|
|
dbDatabase="poweron_graphicaleditor",
|
|
dbUser=APP_CONFIG.get("DB_USER"),
|
|
dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"),
|
|
dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),
|
|
userId=None,
|
|
)
|
|
|
|
workflows = geDb.getRecordset(AutoWorkflow, recordFilter={
|
|
"mandateId": mandateId,
|
|
"featureInstanceId": featureInstanceId,
|
|
}) or []
|
|
|
|
for wf in workflows:
|
|
wfId = wf.get("id")
|
|
if not wfId:
|
|
continue
|
|
|
|
for version in geDb.getRecordset(AutoVersion, recordFilter={"workflowId": wfId}) or []:
|
|
geDb.recordDelete(AutoVersion, version.get("id"))
|
|
|
|
runs = geDb.getRecordset(AutoRun, recordFilter={"workflowId": wfId}) or []
|
|
for run in runs:
|
|
runId = run.get("id")
|
|
for stepLog in geDb.getRecordset(AutoStepLog, recordFilter={"runId": runId}) or []:
|
|
geDb.recordDelete(AutoStepLog, stepLog.get("id"))
|
|
geDb.recordDelete(AutoRun, runId)
|
|
|
|
for task in geDb.getRecordset(AutoTask, recordFilter={"workflowId": wfId}) or []:
|
|
geDb.recordDelete(AutoTask, task.get("id"))
|
|
|
|
geDb.recordDelete(AutoWorkflow, wfId)
|
|
|
|
if workflows:
|
|
summary["removed"].append(f"{len(workflows)} AutoWorkflows in {mandateLabel}")
|
|
logger.info(f"Removed {len(workflows)} graphical editor workflows for {mandateLabel}")
|
|
except Exception as e:
|
|
summary["errors"].append(f"GraphicalEditor cleanup for {mandateLabel}: {e}")
|
|
logger.error(f"Failed to clean up graphical editor data for {mandateLabel}: {e}")
|
|
|
|
def _removeTrusteeData(self, db, featureInstanceId: str, mandateLabel: str, summary: Dict):
|
|
"""Remove TrusteeAccountingConfig for a feature instance."""
|
|
try:
|
|
from modules.features.trustee.datamodelFeatureTrustee import TrusteeAccountingConfig
|
|
|
|
configs = db.getRecordset(TrusteeAccountingConfig, recordFilter={"featureInstanceId": featureInstanceId}) or []
|
|
for cfg in configs:
|
|
db.recordDelete(TrusteeAccountingConfig, cfg.get("id"))
|
|
if configs:
|
|
summary["removed"].append(f"TrusteeAccountingConfig in {mandateLabel}")
|
|
except Exception as e:
|
|
summary["errors"].append(f"Trustee cleanup for {mandateLabel}: {e}")
|
|
|
|
def _removeNeutralizationData(self, db, featureInstanceId: str, mandateLabel: str, summary: Dict):
|
|
"""Remove DataNeutraliserConfig for a feature instance."""
|
|
try:
|
|
from modules.features.neutralization.datamodelFeatureNeutralizer import DataNeutraliserConfig
|
|
|
|
configs = db.getRecordset(DataNeutraliserConfig, recordFilter={"featureInstanceId": featureInstanceId}) or []
|
|
for cfg in configs:
|
|
db.recordDelete(DataNeutraliserConfig, cfg.get("id"))
|
|
if configs:
|
|
summary["removed"].append(f"DataNeutraliserConfig in {mandateLabel}")
|
|
except Exception as e:
|
|
summary["errors"].append(f"Neutralization cleanup for {mandateLabel}: {e}")
|
|
|
|
def _removeLanguageSet(self, db, code: str, summary: Dict):
|
|
"""Remove a language set if it was created during demo (e.g. 'es' from UC4)."""
|
|
try:
|
|
from modules.datamodels.datamodelUiLanguage import UiLanguageSet
|
|
|
|
existing = db.getRecordset(UiLanguageSet, recordFilter={"id": code})
|
|
if existing:
|
|
db.recordDelete(UiLanguageSet, code)
|
|
summary["removed"].append(f"Language set '{code}'")
|
|
logger.info(f"Removed language set '{code}'")
|
|
except Exception as e:
|
|
logger.debug(f"Could not remove language set '{code}': {e}")
|