platform-core/modules/demoConfigs/pwgDemo2026.py
ValueOn AG ebc4b2a080
Some checks failed
Deploy Plattform-Core (Int) / test (push) Failing after 12s
Deploy Plattform-Core (Int) / deploy (push) Has been skipped
cp adapted to 2026 poweron 2
2026-06-09 09:58:05 +02:00

740 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (c) 2026 PowerOn AG
# All rights reserved.
"""PWG Pilot Demo (April 2026)
Bootstraps a complete PWG-Pilot demo environment in an empty dev/demo install:
- 1 mandate "Stiftung PWG"
- 1 SysAdmin demo user "pwg.demo"
- 3 features: workspace, trustee (BUHA PWG), neutralization (Datenschutz)
- Trustee seed-data (5 fictitious tenants with monthly rent journal lines for
the current year, loaded from ``demoData/pwg/_seedTrusteeData.json``)
- Pilot workflow imported from
``demoData/workflows/pwg-mietzinsbestaetigung-pilot.workflow.json``
(active=false — user activates manually after triggering once).
Idempotent: ``load()`` skips anything that already exists; ``remove()`` deletes
the mandate, its memberships, seed data and workflow-automation records
cleanly, but deliberately keeps the demo user record itself.
Pattern: subclass of :class:`BaseDemoConfig`, auto-discovered by
``demoConfigs/__init__.py``. See ``investorDemo2026.py`` for the reference
implementation we mirror here.
"""
import json
import logging
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from modules.demoConfigs.baseDemoConfig import BaseDemoConfig
logger = logging.getLogger(__name__)

# Identifier prefix of this demo (not referenced by the code visible in this file).
_DEMO_PREFIX = "demo-pwg2026"

# Mandate created by the demo: technical name and display label.
_MANDATE_PWG = dict(name="stiftung-pwg", label="Stiftung PWG")

# Demo user definition. The password is a fixed, source-visible demo credential.
_USER = dict(
    username="pwg.demo",
    email="pwg.demo@poweron.swiss",
    fullName="PWG Demo Sachbearbeiter",
    password="pwg.demo.2026",
    language="de",
)

# Feature instances to create in the mandate, as (feature code, instance label).
_FEATURES_PWG = [
    dict(code=featureCode, label=featureLabel)
    for featureCode, featureLabel in (
        ("workspace", "Dokumentenablage PWG"),
        ("trustee", "Buchhaltung PWG"),
        ("neutralization", "Datenschutz"),
    )
]

# File name of the trustee seed data inside the demo data directory.
_SEED_TRUSTEE_FILE = "_seedTrusteeData.json"
class PwgDemo2026(BaseDemoConfig):
    """Demo configuration that sets up and tears down the PWG pilot demo.

    ``load()`` creates (or reuses) the mandate, the demo user, the feature
    instances listed in ``_FEATURES_PWG``, the user's access to them, a
    neutralization config, billing settings and the trustee seed data.
    ``remove()`` deletes the mandate together with its dependent records but
    keeps the demo user record.
    """

    code = "pwg-demo-2026"
    label = "PWG Pilot Demo (Mietzinsbestätigungen)"
    description = (
        "Stiftung PWG, ein Demo-Sachbearbeiter, Trustee mit fiktiven Mietern, "
        "Workflow-Automation (als File importiert, active=false). Idempotent."
    )
    # Initial login data; load() copies this list into its result summary.
    credentials = [
        {
            "role": "Demo-Sachbearbeiter",
            "username": _USER["username"],
            "email": _USER["email"],
            "password": _USER["password"],
        }
    ]

    # ------------------------------------------------------------------
    # load
    # ------------------------------------------------------------------
    def load(self, db) -> Dict[str, Any]:
        """Create every demo record that does not exist yet.

        Args:
            db: Database connector used for the application records.

        Returns:
            A summary dict with the lists ``created``, ``skipped`` and
            ``errors`` plus a ``credentials`` list (copy of
            ``self.credentials``). Exceptions are not propagated; they are
            logged and appended to ``errors``.
        """
        summary: Dict[str, Any] = {"created": [], "skipped": [], "errors": []}
        mandateLabel = _MANDATE_PWG["label"]
        try:
            mandateId = self._ensureMandate(db, _MANDATE_PWG, summary)
            userId = self._ensureUser(db, summary)
            self._ensurePlatformAdminFlag(db, userId, summary)
            if mandateId and userId:
                self._ensureMembership(db, userId, mandateId, mandateLabel, summary)
                self._ensureFeatures(db, mandateId, mandateLabel, _FEATURES_PWG, summary)
                self._ensureFeatureAccess(db, userId, mandateId, mandateLabel, summary)
                self._ensureNeutralizationConfig(db, mandateId, userId, summary)
                self._ensureBilling(db, mandateId, mandateLabel, summary)
                trusteeInstanceId = self._getFeatureInstanceId(db, mandateId, "trustee", "Buchhaltung PWG")
                if trusteeInstanceId:
                    self._ensureTrusteeSeed(mandateId, trusteeInstanceId, summary)
                else:
                    # Leave a trace instead of silently skipping the seed.
                    summary["skipped"].append(
                        "PWG seed skipped: no trustee feature instance found"
                    )
        except Exception as e:
            # logger.exception attaches the traceback (same as exc_info=True).
            logger.exception("PWG demo load failed: %s", e)
            summary["errors"].append(str(e))
        # Surface initial credentials so the SysAdmin doesn't have to grep the
        # source code -- consumed by AdminDemoConfigPage to render a copyable
        # login box in the result banner.
        summary["credentials"] = list(self.credentials)
        return summary

    # ------------------------------------------------------------------
    # remove
    # ------------------------------------------------------------------
    def remove(self, db) -> Dict[str, Any]:
        """Delete the demo mandate and its data; keep the demo user record.

        Args:
            db: Database connector used for the application records.

        Returns:
            A summary dict with the lists ``removed``, ``skipped`` and
            ``errors``.
        """
        # "skipped" must exist: the preserved-user note below is appended to it.
        summary: Dict[str, Any] = {"removed": [], "skipped": [], "errors": []}
        from modules.datamodels.datamodelMembership import UserMandate
        from modules.datamodels.datamodelUam import Mandate, UserInDB

        mandateLabel = _MANDATE_PWG["label"]
        removedMandateIds = set()
        try:
            existing = db.getRecordset(Mandate, recordFilter={"name": _MANDATE_PWG["name"]}) or []
            for m in existing:
                mid = m.get("id")
                self._removeMandateData(db, mid, mandateLabel, summary)
                db.recordDelete(Mandate, mid)
                removedMandateIds.add(mid)
                summary["removed"].append(f"Mandate {mandateLabel} ({mid})")
                logger.info(f"Removed mandate {mandateLabel} ({mid})")
        except Exception as e:
            summary["errors"].append(f"Remove mandate {mandateLabel}: {e}")

        # SAFETY: NEVER delete the user record. The user may have connections,
        # chats, workflows, files, and other data across multiple databases.
        # Only remove the mandate memberships that THIS demo created.
        try:
            users = db.getRecordset(UserInDB, recordFilter={"username": _USER["username"]}) or []
            for u in users:
                uid = u.get("id")
                memberships = db.getRecordset(UserMandate, recordFilter={"userId": uid}) or []
                for mem in memberships:
                    if mem.get("mandateId") not in removedMandateIds:
                        continue
                    try:
                        db.recordDelete(UserMandate, mem.get("id"))
                    except Exception as memErr:
                        # Best effort: keep going, but do not hide the failure.
                        logger.warning(
                            "Could not delete membership %s of user %s: %s",
                            mem.get("id"), uid, memErr,
                        )
                summary["skipped"].append(
                    f"User {_USER['username']} ({uid}) preserved (only demo mandate memberships removed)"
                )
                logger.info(f"Preserved user {_USER['username']} ({uid}) - removed demo mandate memberships only")
        except Exception as e:
            summary["errors"].append(f"Remove user memberships: {e}")
        return summary

    # ==================================================================
    # — load helpers (mostly mirrors of investorDemo2026.py)
    # ==================================================================
    def _ensureMandate(self, db, mandateDef: Dict, summary: Dict) -> Optional[str]:
        """Return the id of the mandate named ``mandateDef["name"]``.

        An existing mandate is reused and noted under ``skipped``. Otherwise
        an enabled mandate is created and ``copySystemRolesToMandate`` is
        called for it.
        """
        from modules.datamodels.datamodelUam import Mandate
        from modules.interfaces.interfaceRbac import copySystemRolesToMandate

        existing = db.getRecordset(Mandate, recordFilter={"name": mandateDef["name"]})
        if existing:
            mid = existing[0].get("id")
            summary["skipped"].append(f"Mandate {mandateDef['label']} exists ({mid})")
            return mid
        mandate = Mandate(name=mandateDef["name"], label=mandateDef["label"], enabled=True)
        created = db.recordCreate(Mandate, mandate)
        mid = created.get("id")
        logger.info(f"Created mandate {mandateDef['label']} ({mid})")
        summary["created"].append(f"Mandate {mandateDef['label']}")
        copySystemRolesToMandate(db, mid)
        return mid

    def _ensureUser(self, db, summary: Dict) -> Optional[str]:
        """Return the id of the demo user, creating the user if missing.

        A new user is created enabled, with ``isSysAdmin=True``, local
        authentication and the password from ``_USER`` hashed with argon2.
        An existing user is left untouched (its password is not reset).
        """
        from modules.datamodels.datamodelUam import AuthAuthority, UserInDB
        from passlib.context import CryptContext

        existing = db.getRecordset(UserInDB, recordFilter={"username": _USER["username"]})
        if existing:
            uid = existing[0].get("id")
            summary["skipped"].append(f"User {_USER['username']} exists ({uid})")
            return uid
        pwdContext = CryptContext(schemes=["argon2"], deprecated="auto")
        user = UserInDB(
            username=_USER["username"],
            email=_USER["email"],
            fullName=_USER["fullName"],
            enabled=True,
            language=_USER["language"],
            isSysAdmin=True,
            authenticationAuthority=AuthAuthority.LOCAL,
            hashedPassword=pwdContext.hash(_USER["password"]),
        )
        created = db.recordCreate(UserInDB, user)
        uid = created.get("id")
        logger.info(f"Created user {_USER['username']} ({uid})")
        summary["created"].append(f"User {_USER['fullName']}")
        return uid

    def _ensurePlatformAdminFlag(self, db, userId: Optional[str], summary: Dict):
        """Set ``isPlatformAdmin`` on the user unless it is already truthy.

        Does nothing when ``userId`` is falsy; records an error when the
        user record cannot be found.
        """
        from modules.datamodels.datamodelUam import UserInDB

        if not userId:
            return
        existing = db.getRecord(UserInDB, userId)
        if not existing:
            summary["errors"].append(f"User {userId} not found — cannot set isPlatformAdmin")
            return
        # The record may come back as a dict or as a model object.
        if isinstance(existing, dict):
            currentFlag = bool(existing.get("isPlatformAdmin", False))
        else:
            currentFlag = bool(getattr(existing, "isPlatformAdmin", False))
        if currentFlag:
            summary["skipped"].append("isPlatformAdmin already set")
            return
        db.recordModify(UserInDB, userId, {"isPlatformAdmin": True})
        summary["created"].append("isPlatformAdmin flag")

    def _ensureMembership(self, db, userId: str, mandateId: str, mandateLabel: str, summary: Dict):
        """Ensure the user is a member of the mandate with its ``admin`` role.

        A missing mandate ``admin`` role is reported under ``errors`` instead
        of being ignored, so a membership without a role is visible.
        """
        from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole
        from modules.datamodels.datamodelRbac import Role

        existing = db.getRecordset(UserMandate, recordFilter={"userId": userId, "mandateId": mandateId})
        if existing:
            userMandateId = existing[0].get("id")
            summary["skipped"].append(f"Membership {_USER['username']} -> {mandateLabel} exists")
        else:
            um = UserMandate(userId=userId, mandateId=mandateId, enabled=True)
            created = db.recordCreate(UserMandate, um)
            userMandateId = created.get("id")
            summary["created"].append(f"Membership {_USER['username']} -> {mandateLabel}")

        adminRoles = db.getRecordset(Role, recordFilter={"mandateId": mandateId, "roleLabel": "admin"})
        if not adminRoles:
            summary["errors"].append(
                f"Admin role not found for mandate {mandateLabel} -- "
                f"demo user has no mandate role."
            )
            return
        adminRoleId = adminRoles[0].get("id")
        existingRole = db.getRecordset(
            UserMandateRole,
            recordFilter={"userMandateId": userMandateId, "roleId": adminRoleId},
        )
        if not existingRole:
            umr = UserMandateRole(userMandateId=userMandateId, roleId=adminRoleId)
            db.recordCreate(UserMandateRole, umr)
            summary["created"].append(
                f"Mandate role 'admin' assigned to demo user in {mandateLabel}"
            )

    def _ensureFeatures(self, db, mandateId: str, mandateLabel: str, featureDefs: List[Dict], summary: Dict):
        """Create each feature instance of ``featureDefs`` not yet present.

        Presence is decided by instance label only. A failure for one
        feature is recorded under ``errors`` and the others are still tried.
        """
        from modules.interfaces.interfaceFeatures import getFeatureInterface

        fi = getFeatureInterface(db)
        existingInstances = fi.getFeatureInstancesForMandate(mandateId) or []
        # Instances may be model objects or dicts; read the label either way.
        existingLabels = {
            (inst.label if hasattr(inst, "label") else inst.get("label", ""))
            for inst in existingInstances
        }
        for featureDef in featureDefs:
            code = featureDef["code"]
            instanceLabel = featureDef["label"]
            if instanceLabel in existingLabels:
                summary["skipped"].append(f"Feature '{instanceLabel}' in {mandateLabel} exists")
                continue
            try:
                fi.createFeatureInstance(
                    featureCode=code,
                    mandateId=mandateId,
                    label=instanceLabel,
                    enabled=True,
                    copyTemplateRoles=True,
                )
                summary["created"].append(f"Feature '{instanceLabel}' in {mandateLabel}")
            except Exception as e:
                summary["errors"].append(f"Feature '{instanceLabel}' in {mandateLabel}: {e}")

    def _ensureFeatureAccess(self, db, userId: str, mandateId: str, mandateLabel: str, summary: Dict):
        """Grant the demo user admin access on EVERY feature instance of the
        mandate. Without an explicit ``FeatureAccess`` + ``{code}-admin`` role
        the user does not see any feature tile in the UI -- so this method
        ALSO heals a half-broken state by re-copying the per-feature template
        roles if they are missing (e.g. when the instance was created via an
        older code path that skipped ``copyTemplateRoles``).
        """
        from modules.datamodels.datamodelFeatures import FeatureInstance
        from modules.datamodels.datamodelMembership import FeatureAccess, FeatureAccessRole
        from modules.datamodels.datamodelRbac import Role
        from modules.interfaces.interfaceFeatures import getFeatureInterface

        instances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId}) or []
        for inst in instances:
            instId = inst.get("id")
            featureCode = inst.get("featureCode", "")
            if not instId:
                continue

            existing = db.getRecordset(FeatureAccess, recordFilter={"userId": userId, "featureInstanceId": instId})
            if existing:
                featureAccessId = existing[0].get("id")
                summary["skipped"].append(f"FeatureAccess {featureCode} in {mandateLabel} exists")
            else:
                fa = FeatureAccess(userId=userId, featureInstanceId=instId, enabled=True)
                created = db.recordCreate(FeatureAccess, fa)
                featureAccessId = created.get("id")
                summary["created"].append(f"FeatureAccess {featureCode} in {mandateLabel}")

            adminRoleLabel = f"{featureCode}-admin"
            adminRoleFilter = {
                "featureInstanceId": instId,
                "roleLabel": adminRoleLabel,
            }
            adminRoles = db.getRecordset(Role, recordFilter=adminRoleFilter)
            # Self-heal: if the per-feature admin role does not exist on this
            # instance the template roles were never copied -- copy them now.
            if not adminRoles:
                logger.warning(
                    "Feature instance %s (%s) is missing role '%s' -- "
                    "re-copying template roles", instId, featureCode, adminRoleLabel,
                )
                try:
                    fi = getFeatureInterface(db)
                    fi._copyTemplateRoles(featureCode, mandateId, instId)
                    summary["created"].append(
                        f"Repaired template roles for {featureCode} in {mandateLabel}"
                    )
                except Exception as repairErr:
                    summary["errors"].append(
                        f"Could not repair template roles for {featureCode} "
                        f"in {mandateLabel}: {repairErr}"
                    )
                # Look again after the repair attempt.
                adminRoles = db.getRecordset(Role, recordFilter=adminRoleFilter)
            if not adminRoles:
                # Hard fail surfaced to UI -- without the admin role the user
                # would silently not see the instance.
                summary["errors"].append(
                    f"Admin role '{adminRoleLabel}' not found for feature "
                    f"instance {featureCode} in {mandateLabel} -- demo user "
                    f"will not see this feature."
                )
                continue

            adminRoleId = adminRoles[0].get("id")
            existingRole = db.getRecordset(FeatureAccessRole, recordFilter={
                "featureAccessId": featureAccessId,
                "roleId": adminRoleId,
            })
            if not existingRole:
                far = FeatureAccessRole(featureAccessId=featureAccessId, roleId=adminRoleId)
                db.recordCreate(FeatureAccessRole, far)
                summary["created"].append(
                    f"Role '{adminRoleLabel}' assigned to demo user in {mandateLabel}"
                )

    def _ensureNeutralizationConfig(self, db, mandateId: Optional[str], userId: Optional[str], summary: Dict):
        """Create an enabled neutralization config for the mandate's first
        ``neutralization`` feature instance unless one already exists.

        Returns silently when ids are missing or no such instance exists;
        failures are recorded under ``errors``.
        """
        if not mandateId or not userId:
            return
        from modules.datamodels.datamodelFeatures import FeatureInstance

        instances = db.getRecordset(
            FeatureInstance,
            recordFilter={"mandateId": mandateId, "featureCode": "neutralization"},
        )
        if not instances:
            return
        instanceId = instances[0].get("id")
        try:
            from modules.features.neutralization.datamodelFeatureNeutralizer import DataNeutraliserConfig

            existing = db.getRecordset(DataNeutraliserConfig, recordFilter={"featureInstanceId": instanceId})
            if existing:
                summary["skipped"].append(f"Neutralization config for mandate {mandateId} exists")
                return
            config = DataNeutraliserConfig(
                featureInstanceId=instanceId,
                mandateId=mandateId,
                userId=userId,
                enabled=True,
                scope="featureInstance",
            )
            db.recordCreate(DataNeutraliserConfig, config)
            summary["created"].append(f"Neutralization config for mandate {mandateId}")
        except Exception as e:
            summary["errors"].append(f"Neutralization config: {e}")

    def _ensureBilling(self, db, mandateId: Optional[str], mandateLabel: str, summary: Dict):
        """Create billing settings for the mandate unless they already exist.

        Works through the billing root interface and its own ``db``; the
        ``db`` argument is not used here. Failures go to ``errors``.
        """
        if not mandateId:
            return
        try:
            from modules.datamodels.datamodelBilling import BillingSettings
            from modules.interfaces.interfaceDbBilling import getRootInterface

            billingInterface = getRootInterface()
            existingSettings = billingInterface.getSettings(mandateId)
            if existingSettings:
                summary["skipped"].append(f"Billing for {mandateLabel} exists")
                return
            settings = BillingSettings(
                mandateId=mandateId,
                warningThresholdPercent=10.0,
                notifyOnWarning=True,
            )
            billingInterface.db.recordCreate(BillingSettings, settings)
            summary["created"].append(f"Billing settings for {mandateLabel}")
        except Exception as e:
            summary["errors"].append(f"Billing for {mandateLabel}: {e}")

    def _getFeatureInstanceId(self, db, mandateId: str, featureCode: str, label: str) -> Optional[str]:
        """Return the id of a feature instance of ``featureCode`` in the mandate.

        An instance with the exact ``label`` is preferred; otherwise the
        first instance of that feature is used. Returns ``None`` when the
        mandate has no instance of the feature.
        """
        from modules.datamodels.datamodelFeatures import FeatureInstance

        instances = db.getRecordset(FeatureInstance, recordFilter={
            "mandateId": mandateId,
            "featureCode": featureCode,
            "label": label,
        }) or []
        if instances:
            return instances[0].get("id")
        # fallback: any instance of that feature in the mandate
        instances = db.getRecordset(FeatureInstance, recordFilter={
            "mandateId": mandateId,
            "featureCode": featureCode,
        }) or []
        return instances[0].get("id") if instances else None

    # ------------------------------------------------------------------
    # PWG-specific helpers — Trustee seed-data + pilot-workflow import
    # ------------------------------------------------------------------
    def _ensureTrusteeSeed(self, mandateId: str, featureInstanceId: str, summary: Dict):
        """Idempotently load the seed tenants and their 12 monthly rent
        journal entries into the trustee database for this feature instance.

        Skips any tenant whose contact (matched by name+address) already
        exists, so re-running ``load()`` is safe. The check is per contact
        only: journal entries of an existing contact are not re-created.
        The booking year comes from the seed file's ``year`` (current year
        when absent or empty). Problems with the seed file or the trustee
        connection are recorded under ``errors``; errors raised while
        writing records propagate to the caller.
        """
        # Imported once here; the module only imports the ``datetime`` class.
        from datetime import timezone

        seedPath = _demoDataDir() / "pwg" / _SEED_TRUSTEE_FILE
        if not seedPath.is_file():
            summary["errors"].append(f"PWG seed file missing: {seedPath}")
            return
        try:
            seed = json.loads(seedPath.read_text(encoding="utf-8"))
        except Exception as exc:
            summary["errors"].append(f"PWG seed file unreadable: {exc}")
            return
        if not isinstance(seed, dict):
            summary["errors"].append(
                f"PWG seed file unreadable: expected a JSON object, got {type(seed).__name__}"
            )
            return
        try:
            trusteeDb = _openTrusteeDb()
        except Exception as exc:
            summary["errors"].append(f"Trustee DB connection failed: {exc}")
            return
        from modules.features.trustee.datamodelFeatureTrustee import (
            TrusteeDataAccount,
            TrusteeDataContact,
            TrusteeDataJournalEntry,
            TrusteeDataJournalLine,
        )

        rentAccountNumber = str(seed.get("rentAccount", "6000"))
        # ``or`` also covers an explicit null in the seed file.
        year = int(seed.get("year") or datetime.now().year)
        # First-of-month UTC timestamps are the same for every tenant.
        monthStarts = [
            (month, datetime(year, month, 1, tzinfo=timezone.utc).timestamp())
            for month in range(1, 13)
        ]

        # 1) Ensure rent account exists once
        existingAccounts = trusteeDb.getRecordset(TrusteeDataAccount, recordFilter={
            "featureInstanceId": featureInstanceId,
            "accountNumber": rentAccountNumber,
        }) or []
        if not existingAccounts:
            trusteeDb.recordCreate(TrusteeDataAccount, TrusteeDataAccount(
                accountNumber=rentAccountNumber,
                label=str(seed.get("rentAccountLabel", "Mietzinsertrag")),
                accountType="revenue",
                accountGroup="rental_income",
                currency="CHF",
                isActive=True,
                mandateId=mandateId,
                featureInstanceId=featureInstanceId,
            ))
            summary["created"].append(f"Trustee account {rentAccountNumber}")

        # 2) Ensure contacts + monthly journal entries
        createdTenants = 0
        skippedTenants = 0
        tenantsWithoutRent = 0
        for tenant in seed.get("tenants") or []:
            name = tenant.get("name", "")
            address = tenant.get("address", "")
            if not name:
                continue
            existing = trusteeDb.getRecordset(TrusteeDataContact, recordFilter={
                "featureInstanceId": featureInstanceId,
                "name": name,
                "address": address,
            }) or []
            if existing:
                skippedTenants += 1
                continue
            contactNumber = tenant.get("contactNumber")
            contact = TrusteeDataContact(
                externalId=contactNumber,
                contactType="customer",
                contactNumber=contactNumber,
                name=name,
                address=address,
                zip=tenant.get("zip"),
                city=tenant.get("city"),
                country=tenant.get("country"),
                email=tenant.get("email"),
                mandateId=mandateId,
                featureInstanceId=featureInstanceId,
            )
            trusteeDb.recordCreate(TrusteeDataContact, contact)
            createdTenants += 1

            # 12 monthly rent bookings (credit on rent account)
            monthlyRent = float(tenant.get("monthlyRentChf") or 0.0)
            if monthlyRent <= 0:
                tenantsWithoutRent += 1
                continue
            for month, bookingTs in monthStarts:
                entryRef = f"PWG-{contactNumber}-{year}{month:02d}"
                entry = TrusteeDataJournalEntry(
                    externalId=entryRef,
                    bookingDate=bookingTs,
                    reference=entryRef,
                    description=f"Mietzins {month:02d}/{year} {name}",
                    currency="CHF",
                    totalAmount=monthlyRent,
                    mandateId=mandateId,
                    featureInstanceId=featureInstanceId,
                )
                createdEntry = trusteeDb.recordCreate(TrusteeDataJournalEntry, entry)
                line = TrusteeDataJournalLine(
                    journalEntryId=createdEntry.get("id"),
                    accountNumber=rentAccountNumber,
                    debitAmount=0.0,
                    creditAmount=monthlyRent,
                    currency="CHF",
                    description=f"Mietzins {month:02d}/{year} {name} ({contactNumber})",
                    mandateId=mandateId,
                    featureInstanceId=featureInstanceId,
                )
                trusteeDb.recordCreate(TrusteeDataJournalLine, line)

        if createdTenants:
            summary["created"].append(f"PWG seed: {createdTenants} tenants × 12 monthly journal lines")
        if tenantsWithoutRent:
            # These contacts were created but got no journal entries.
            summary["skipped"].append(
                f"PWG seed: {tenantsWithoutRent} tenants without monthlyRentChf -- no journal lines booked"
            )
        if skippedTenants:
            summary["skipped"].append(f"PWG seed: {skippedTenants} tenants already present")

    def _guessTrusteeInstanceId(self, mandateId: str) -> Optional[str]:
        """Return the first trustee feature-instance id of the given mandate.

        The demo only ever creates one trustee feature in this mandate, so a
        first-hit lookup is sufficient and avoids depending on the label.
        Opens its own connection to the ``poweron_app`` database; any failure
        is logged as a warning and ``None`` is returned. Not called from
        within this class.
        """
        try:
            from modules.connectors.connectorDbPostgre import DatabaseConnector
            from modules.datamodels.datamodelFeatures import FeatureInstance
            from modules.shared.configuration import APP_CONFIG

            appDb = DatabaseConnector(
                dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
                dbDatabase="poweron_app",
                dbUser=APP_CONFIG.get("DB_USER"),
                dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"),
                dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),
                userId=None,
            )
            instances = appDb.getRecordset(FeatureInstance, recordFilter={
                "mandateId": mandateId,
                "featureCode": "trustee",
            }) or []
            return instances[0].get("id") if instances else None
        except Exception as exc:
            logger.warning(f"Could not resolve trustee instance for mandate {mandateId}: {exc}")
            return None

    # ------------------------------------------------------------------
    # remove helpers
    # ------------------------------------------------------------------
    def _removeMandateData(self, db, mandateId: str, mandateLabel: str, summary: Dict):
        """Cascade-delete everything created by load() for this mandate.

        Per feature instance: feature-specific data, chat workflows with
        their messages and logs, feature accesses with their roles, then the
        instance itself. Afterwards: memberships with their roles, roles
        with their access rules, and billing settings. Children are always
        deleted before their parent record. The mandate record itself is
        deleted by the caller.
        """
        from modules.datamodels.datamodelBilling import BillingSettings
        from modules.datamodels.datamodelChat import ChatLog, ChatMessage, ChatWorkflow
        from modules.datamodels.datamodelFeatures import FeatureInstance
        from modules.datamodels.datamodelMembership import (
            FeatureAccess,
            FeatureAccessRole,
            UserMandate,
            UserMandateRole,
        )
        from modules.datamodels.datamodelRbac import AccessRule, Role

        instances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId}) or []
        for inst in instances:
            instId = inst.get("id")
            featureCode = inst.get("featureCode", "")
            if not instId:
                continue
            # Feature-specific data; these helpers record their own errors.
            if featureCode == "workflowAutomation":
                self._removeWorkflowAutomationData(instId, mandateId, mandateLabel, summary)
            if featureCode == "trustee":
                self._removeTrusteeSeed(instId, mandateLabel, summary)
            if featureCode == "neutralization":
                self._removeNeutralizationData(db, instId, mandateLabel, summary)

            chatWorkflows = db.getRecordset(ChatWorkflow, recordFilter={"featureInstanceId": instId}) or []
            for wf in chatWorkflows:
                wfId = wf.get("id")
                for msg in db.getRecordset(ChatMessage, recordFilter={"workflowId": wfId}) or []:
                    db.recordDelete(ChatMessage, msg.get("id"))
                for log in db.getRecordset(ChatLog, recordFilter={"workflowId": wfId}) or []:
                    db.recordDelete(ChatLog, log.get("id"))
                db.recordDelete(ChatWorkflow, wfId)

            accesses = db.getRecordset(FeatureAccess, recordFilter={"featureInstanceId": instId}) or []
            for access in accesses:
                for role in db.getRecordset(FeatureAccessRole, recordFilter={"featureAccessId": access.get("id")}) or []:
                    db.recordDelete(FeatureAccessRole, role.get("id"))
                db.recordDelete(FeatureAccess, access.get("id"))

            db.recordDelete(FeatureInstance, instId)
            summary["removed"].append(f"FeatureInstance {featureCode} in {mandateLabel}")

        memberships = db.getRecordset(UserMandate, recordFilter={"mandateId": mandateId}) or []
        for um in memberships:
            for umr in db.getRecordset(UserMandateRole, recordFilter={"userMandateId": um.get("id")}) or []:
                db.recordDelete(UserMandateRole, umr.get("id"))
            db.recordDelete(UserMandate, um.get("id"))

        roles = db.getRecordset(Role, recordFilter={"mandateId": mandateId}) or []
        for role in roles:
            for rule in db.getRecordset(AccessRule, recordFilter={"roleId": role.get("id")}) or []:
                db.recordDelete(AccessRule, rule.get("id"))
            db.recordDelete(Role, role.get("id"))

        # Billing settings are deleted through the billing interface's own db.
        try:
            from modules.interfaces.interfaceDbBilling import getRootInterface

            billingDb = getRootInterface().db
            billingSettings = billingDb.getRecordset(BillingSettings, recordFilter={"mandateId": mandateId}) or []
            for bs in billingSettings:
                billingDb.recordDelete(BillingSettings, bs.get("id"))
        except Exception as e:
            summary["errors"].append(f"Billing cleanup for {mandateLabel}: {e}")

    def _removeWorkflowAutomationData(self, featureInstanceId: str, mandateId: str, mandateLabel: str, summary: Dict):
        """Delete all workflows of the feature instance from the
        workflow-automation database, including their versions, runs (with
        step logs) and tasks. Failures are recorded under ``errors``.
        """
        try:
            from modules.datamodels.datamodelWorkflowAutomation import (
                AutoRun,
                AutoStepLog,
                AutoTask,
                AutoVersion,
                AutoWorkflow,
            )

            waDb = _openWorkflowAutomationDb()
            workflows = waDb.getRecordset(AutoWorkflow, recordFilter={
                "mandateId": mandateId,
                "featureInstanceId": featureInstanceId,
            }) or []
            for wf in workflows:
                wfId = wf.get("id")
                for version in waDb.getRecordset(AutoVersion, recordFilter={"workflowId": wfId}) or []:
                    waDb.recordDelete(AutoVersion, version.get("id"))
                for run in waDb.getRecordset(AutoRun, recordFilter={"workflowId": wfId}) or []:
                    runId = run.get("id")
                    for step in waDb.getRecordset(AutoStepLog, recordFilter={"runId": runId}) or []:
                        waDb.recordDelete(AutoStepLog, step.get("id"))
                    waDb.recordDelete(AutoRun, runId)
                for task in waDb.getRecordset(AutoTask, recordFilter={"workflowId": wfId}) or []:
                    waDb.recordDelete(AutoTask, task.get("id"))
                waDb.recordDelete(AutoWorkflow, wfId)
            if workflows:
                summary["removed"].append(f"{len(workflows)} AutoWorkflows in {mandateLabel}")
        except Exception as e:
            summary["errors"].append(f"WorkflowAutomation cleanup for {mandateLabel}: {e}")

    def _removeTrusteeSeed(self, featureInstanceId: str, mandateLabel: str, summary: Dict):
        """Delete all trustee records of the feature instance.

        Removes every row with this ``featureInstanceId`` from the listed
        trustee models (not only the seeded rows), journal lines first.
        Failures are recorded under ``errors``.
        """
        try:
            from modules.features.trustee.datamodelFeatureTrustee import (
                TrusteeAccountingConfig,
                TrusteeDataAccount,
                TrusteeDataContact,
                TrusteeDataJournalEntry,
                TrusteeDataJournalLine,
            )

            trusteeDb = _openTrusteeDb()
            for model in (
                TrusteeDataJournalLine,
                TrusteeDataJournalEntry,
                TrusteeDataContact,
                TrusteeDataAccount,
                TrusteeAccountingConfig,
            ):
                rows = trusteeDb.getRecordset(model, recordFilter={"featureInstanceId": featureInstanceId}) or []
                for row in rows:
                    trusteeDb.recordDelete(model, row.get("id"))
                if rows:
                    summary["removed"].append(f"{len(rows)} {model.__name__} in {mandateLabel}")
        except Exception as e:
            summary["errors"].append(f"Trustee cleanup for {mandateLabel}: {e}")

    def _removeNeutralizationData(self, db, featureInstanceId: str, mandateLabel: str, summary: Dict):
        """Delete the neutralization configs of the feature instance.

        Failures are recorded under ``errors``.
        """
        try:
            from modules.features.neutralization.datamodelFeatureNeutralizer import DataNeutraliserConfig

            configs = db.getRecordset(DataNeutraliserConfig, recordFilter={"featureInstanceId": featureInstanceId}) or []
            for cfg in configs:
                db.recordDelete(DataNeutraliserConfig, cfg.get("id"))
            if configs:
                summary["removed"].append(f"DataNeutraliserConfig in {mandateLabel}")
        except Exception as e:
            summary["errors"].append(f"Neutralization cleanup for {mandateLabel}: {e}")
# ----------------------------------------------------------------------
# Module-level helpers (private)
# ----------------------------------------------------------------------
def _demoDataDir() -> Path:
    """Locate the ``demoData`` directory independently of the working directory.

    The path is derived from this module's own resolved location: the
    directory two levels above the one containing this file, with
    ``demoData`` appended.
    """
    modulePath = Path(__file__).resolve()
    # parents[0] is this file's directory; parents[2] is two levels above it.
    baseDir = modulePath.parents[2]
    return baseDir.joinpath("demoData")
def _openTrusteeDb():
    """Return a new connector to the ``poweron_trustee`` database.

    Connection settings are read from ``APP_CONFIG``; the connector is
    created with ``userId=None``. Shared by the seed and remove paths so
    both work against the same database.
    """
    from modules.connectors.connectorDbPostgre import DatabaseConnector
    from modules.shared.configuration import APP_CONFIG

    host = APP_CONFIG.get("DB_HOST", "localhost")
    user = APP_CONFIG.get("DB_USER")
    # The secret-backed password wins; the plain setting is the fallback.
    password = APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD")
    port = int(APP_CONFIG.get("DB_PORT", 5432))
    return DatabaseConnector(
        dbHost=host,
        dbDatabase="poweron_trustee",
        dbUser=user,
        dbPassword=password,
        dbPort=port,
        userId=None,
    )
def _openWorkflowAutomationDb():
    """Return a new connector to the workflow-automation database.

    The database name comes from ``WORKFLOW_AUTOMATION_DATABASE``; the other
    connection settings are read from ``APP_CONFIG``. The connector is
    created with ``userId=None``.
    """
    from modules.connectors.connectorDbPostgre import DatabaseConnector
    from modules.datamodels.datamodelWorkflowAutomation import WORKFLOW_AUTOMATION_DATABASE
    from modules.shared.configuration import APP_CONFIG

    host = APP_CONFIG.get("DB_HOST", "localhost")
    user = APP_CONFIG.get("DB_USER")
    # The secret-backed password wins; the plain setting is the fallback.
    password = APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD")
    port = int(APP_CONFIG.get("DB_PORT", 5432))
    return DatabaseConnector(
        dbHost=host,
        dbDatabase=WORKFLOW_AUTOMATION_DATABASE,
        dbUser=user,
        dbPassword=password,
        dbPort=port,
        userId=None,
    )