108 KiB
Multi-Tenant Gateway - Implementierungskonzept
Version: 3.3
Datum: 16. Januar 2026
Status: Entwurf (Review-Feedback eingearbeitet)
Basis: mandate_concept.md
⚠️ GREENFIELD-Implementierung: Keine Datenmigration, keine Backwards Compatibility, keine Fallback-Logik. Bestehende User-Mandate-Struktur wird vollständig ersetzt.
Scope dieses Konzepts
Dieses Konzept definiert die Code-Architektur und Datenmodelle für Multi-Mandantenfähigkeit.
Enthalten:
- Datenmodelle und Beziehungen
- RBAC-System und Berechtigungslogik
- API-Endpoints und Kontext-Handling
- Sicherheitsaspekte auf Code-Ebene
Nicht enthalten (out of scope):
- Testing: Testkonzepte und Teststrategie werden separat dokumentiert
- Datenbank-Performance: Wird über DB-Skalierung (Read Replicas, Connection Pooling, etc.) auf Infrastruktur-Ebene gelöst, nicht im Code. Keine Limits für Rollen pro User, keine Archivierungsstrategie für Junction Tables - dies sind DB-Infrastruktur-Themen.
- Organisatorische DSGVO-Themen: Daten-Anonymisierung bei Export ist Verantwortung des Users/Mandanten, kein Code-Problem. Der generische Gateway stellt nur die technischen Endpoints bereit - Feature-spezifische Export-Logik (z.B. Trustee-Contracts mit Kundendaten) wird in den jeweiligen Features implementiert.
- Consent-Tracking (DSGVO Art. 7): Einwilligungs-Management ist nicht Scope des Gateway - wird separat in den Features behandelt, die personenbezogene Daten verarbeiten.
Breaking Changes (gewollt):
User.mandateIdundUser.roleLabelswerden entfernt - dies ist bewusste Design-Entscheidung für das neue Multi-Mandant-Modell- Alle bestehenden Code-Referenzen auf diese Felder müssen migriert werden
1. Architektur-Übersicht
1.1 Kernprinzip
User (global)
│
├── UserMandate ──► Mandate
│ └── roleIds: List[str] → Role.id[]
│
└── FeatureAccess ──► FeatureInstance ──► Mandate
└── roleIds: List[str] → Role.id[]
Kein User gehört zu einem Mandanten. Zugehörigkeit wird über UserMandate und FeatureAccess gesteuert.
1.2 Drei Ebenen von Funktionen
| Ebene | Beschreibung | Beispiele | Berechtigung |
|---|---|---|---|
| System | Grundfunktionen ohne Mandant | Login, Logout, Profil, Settings | Jeder eingeloggte User |
| Business | Features in Mandanten | Trustee, Chatbot, Workflow | Via FeatureAccess |
| Admin | Plattform-Verwaltung | User-, Mandate-, RBAC-Management | isSysAdmin=true am User |
1.3 Bestehende Security-Infrastruktur (Gateway)
| Feature | Status | Implementierung |
|---|---|---|
| Password Hashing | ✅ Vorhanden | Argon2 via passlib (CryptContext(schemes=["argon2"])) |
| JWT Token | ✅ Vorhanden | HS256/HS384, httpOnly Cookies, SameSite=strict |
| Token Revocation | ✅ Vorhanden | DB-backed mit sessionId, Status active/revoked |
| Rate Limiting | ✅ Vorhanden | slowapi.Limiter auf allen Auth-Endpoints |
| CSRF Protection | ✅ Vorhanden | CSRFMiddleware in app.py |
| Audit Logging | ✅ Vorhanden | auditLogger.py mit daily rotation |
2. Datenmodell
2.1 Core Models
class User(BaseModel):
id: str
username: str
email: Optional[str]
isSysAdmin: bool = False # Globales Admin-Flag
enabled: bool = True
# KEIN mandateId, KEINE roleLabels direkt am User!
class Mandate(BaseModel):
id: str
name: str
enabled: bool = True
# Kein rbacCacheVersion - Stateless Design, kein Cache
2.2 Role Model (Kontextgebunden)
Kernkonzept: Eine Rolle existiert immer in einem spezifischen Kontext. Der Kontext ist IMMUTABLE nach Erstellung.
class Role(BaseModel):
"""Rolle mit unveränderlichem Kontext"""
id: str
roleLabel: str # z.B. "admin", "user", "viewer"
description: Dict[str, str] # I18n
# KONTEXT - IMMUTABLE nach Create (nur Create/Delete, kein Update!)
mandateId: Optional[str] # FK → Mandate (CASCADE DELETE)
featureInstanceId: Optional[str] # FK → FeatureInstance (CASCADE DELETE)
featureCode: Optional[str] # z.B. "trustee" - für Template-Rollen
# Kontext-Typ (abgeleitet, für Klarheit)
# - mandateId=None, featureInstanceId=None → GLOBAL (Template)
# - mandateId=X, featureInstanceId=None → MANDATE
# - mandateId=X, featureInstanceId=Y → INSTANCE
isSystemRole: bool = False # System-Rollen können nicht gelöscht werden
RBAC-Enforcement für Role:
# Role.context-Felder (mandateId, featureInstanceId) haben NUR:
# - CREATE: Ja (beim Erstellen setzen)
# - READ: Ja
# - UPDATE: NEIN! → Wird vom RBAC-System blockiert
# - DELETE: Ja (ganze Role löschen)
2.3 Membership Models (Junction Tables für CASCADE DELETE)
Architektur: Rollen werden über Junction Tables verknüpft, nicht als Array-Felder. Dies ermöglicht saubere CASCADE DELETE auf Datenbankebene.
class UserMandate(BaseModel):
"""User-Mitgliedschaft in Mandant"""
id: str
userId: str # FK → User (CASCADE DELETE)
mandateId: str # FK → Mandate (CASCADE DELETE)
enabled: bool = True
# Rollen via Junction Table UserMandateRole
class FeatureAccess(BaseModel):
"""User-Zugriff auf Feature-Instanz"""
id: str
userId: str # FK → User (CASCADE DELETE)
featureInstanceId: str # FK → FeatureInstance (CASCADE DELETE)
enabled: bool = True
# Rollen via Junction Table FeatureAccessRole
class UserMandateRole(BaseModel):
"""Junction: UserMandate zu Role"""
id: str
userMandateId: str # FK → UserMandate (CASCADE DELETE)
roleId: str # FK → Role (CASCADE DELETE)
class FeatureAccessRole(BaseModel):
"""Junction: FeatureAccess zu Role"""
id: str
featureAccessId: str # FK → FeatureAccess (CASCADE DELETE)
roleId: str # FK → Role (CASCADE DELETE)
Begründung für Junction Tables (statt Array-Felder):
- PostgreSQL CASCADE DELETE funktioniert auf Foreign Keys, nicht auf Array-Elemente
- Saubere referentielle Integrität
- Performante Queries mit JOINs
- Standard-Pattern für m:n Beziehungen
2.4 Feature Models
class Feature(BaseModel):
"""Feature-Definition (global, z.B. 'trustee')"""
code: str # PK, z.B. "trustee"
label: Dict[str, str] # I18n
icon: str
class FeatureInstance(BaseModel):
"""Instanz eines Features in einem Mandanten"""
id: str
featureCode: str # FK → Feature
mandateId: str # FK → Mandate (CASCADE DELETE)
label: str # z.B. "Buchhaltung 2025"
enabled: bool = True
# Kein rbacCacheVersion - Stateless Design, kein Cache
2.5 AccessRule Model
class AccessRule(BaseModel):
"""Berechtigungsregel für eine Rolle"""
id: str
roleId: str # FK → Role.id (CASCADE DELETE!)
context: str # "DATA" | "UI" | "RESOURCE" - IMMUTABLE!
item: Optional[str] # Dot-Notation (siehe 3.1)
view: bool = False
read: Optional[str] # "n" | "m" | "g" | "a"
create: Optional[str]
update: Optional[str]
delete: Optional[str]
RBAC-Enforcement für AccessRule:
# AccessRule.context und roleId sind IMMUTABLE:
# - CREATE: Ja
# - READ: Ja
# - UPDATE auf context/roleId: NEIN! → Blockiert
# - UPDATE auf permissions (view, read, etc.): Ja
# - DELETE: Ja
2.6 Kontext-Vererbung
Role definiert den Scope:
├── Global Role (mandateId=None) → AccessRules gelten überall
├── Mandate Role (mandateId=X) → AccessRules gelten nur in Mandant X
└── Instance Role (mandateId=X, instanceId=Y) → AccessRules gelten nur in Instanz Y
AccessRule erbt Scope von Role - kein separates mandateId/featureInstanceId auf AccessRule nötig!
3. RBAC-System
3.1 Item-Notation (Dot-Separated)
Alle berechtigungspflichtigen Elemente werden mit Dot-Notation definiert:
| Context | Format | Beispiele |
|---|---|---|
DATA |
table oder table.field |
ChatWorkflow, User.email, TrusteeContract.amount |
UI |
component.group.name... |
nav.trustee, dialog.settings.voice, button.export |
RESOURCE |
resource.category.name... |
ai.model.anthropic, action.email.send, connector.sharepoint |
Hierarchie: Längster Match gewinnt. trustee.contract überschreibt trustee.
3.2 IMMUTABLE Felder (Enforcement)
Kritisch: Bestimmte Felder dürfen nach Erstellung NICHT mehr geändert werden:
| Model | Feld | Create | Read | Update | Delete |
|---|---|---|---|---|---|
Role |
mandateId |
✅ | ✅ | ❌ | ✅ (ganzes Objekt) |
Role |
featureInstanceId |
✅ | ✅ | ❌ | ✅ (ganzes Objekt) |
AccessRule |
context |
✅ | ✅ | ❌ | ✅ (ganzes Objekt) |
AccessRule |
roleId |
✅ | ✅ | ❌ | ✅ (ganzes Objekt) |
Implementation auf zwei Ebenen:
1. Application Level (erste Verteidigung):
# Im RBAC-System: Update auf immutable Felder blockieren
IMMUTABLE_FIELDS = {
"Role": ["mandateId", "featureInstanceId", "featureCode"],
"AccessRule": ["context", "roleId"]
}
def validateUpdate(model: str, updateData: dict):
forbidden = IMMUTABLE_FIELDS.get(model, [])
for field in forbidden:
if field in updateData:
raise PermissionError(f"Field '{field}' is immutable on {model}")
2. Database Level (zweite Verteidigung - verhindert Bypass):
-- Trigger: Verhindert Änderung von immutable Feldern auf Role
CREATE OR REPLACE FUNCTION prevent_role_context_update()
RETURNS TRIGGER AS $$
BEGIN
IF OLD."mandateId" IS DISTINCT FROM NEW."mandateId" THEN
RAISE EXCEPTION 'mandateId is immutable on Role';
END IF;
IF OLD."featureInstanceId" IS DISTINCT FROM NEW."featureInstanceId" THEN
RAISE EXCEPTION 'featureInstanceId is immutable on Role';
END IF;
IF OLD."featureCode" IS DISTINCT FROM NEW."featureCode" THEN
RAISE EXCEPTION 'featureCode is immutable on Role';
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER tr_role_immutable
BEFORE UPDATE ON "Role"
FOR EACH ROW EXECUTE FUNCTION prevent_role_context_update();
-- Trigger: Verhindert Änderung von immutable Feldern auf AccessRule
CREATE OR REPLACE FUNCTION prevent_accessrule_context_update()
RETURNS TRIGGER AS $$
BEGIN
IF OLD."context" IS DISTINCT FROM NEW."context" THEN
RAISE EXCEPTION 'context is immutable on AccessRule';
END IF;
IF OLD."roleId" IS DISTINCT FROM NEW."roleId" THEN
RAISE EXCEPTION 'roleId is immutable on AccessRule';
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER tr_accessrule_immutable
BEFORE UPDATE ON "AccessRule"
FOR EACH ROW EXECUTE FUNCTION prevent_accessrule_context_update();
3.3 Regel-Ebenen und Priorisierung
Priorisierung (höher überschreibt niedriger):
1. Instance Role (Role.featureInstanceId gesetzt) → höchste Priorität
2. Mandate Role (Role.mandateId gesetzt, featureInstanceId=None)
3. Global Role (Role.mandateId=None) → niedrigste Priorität
3.4 Standard-Rollen pro Feature
Beim Erstellen einer FeatureInstance werden Rollen und deren AccessRules kopiert:
def createFeatureInstance(featureCode: str, mandateId: str, label: str):
"""
Erstellt eine neue Feature-Instanz und kopiert Template-Rollen.
WICHTIG: Templates werden NUR bei Erstellung kopiert.
Spätere Template-Änderungen werden NICHT automatisch propagiert.
Für manuelle Nachsynchronisation siehe syncRolesFromTemplate().
"""
instance = FeatureInstance(featureCode=featureCode, mandateId=mandateId, label=label)
db.create(instance)
# Globale Template-Rollen für dieses Feature finden
globalRoles = db.getRoles(featureCode=featureCode, mandateId=None)
templateRoleIds = [r.id for r in globalRoles]
if not templateRoleIds:
return instance
# BULK: Alle Template-AccessRules in einem Query laden
allTemplateRules = db.execute(
'SELECT * FROM "AccessRule" WHERE "roleId" = ANY(:roleIds)',
{"roleIds": templateRoleIds}
)
# Index für schnellen Lookup: roleId -> rules
rulesByRoleId = {}
for rule in allTemplateRules:
roleId = rule["roleId"]
if roleId not in rulesByRoleId:
rulesByRoleId[roleId] = []
rulesByRoleId[roleId].append(rule)
# Neue Rollen und AccessRules sammeln für Bulk-Insert
newRoles = []
newAccessRules = []
for templateRole in globalRoles:
newRoleId = str(uuid.uuid4())
newRoles.append({
"id": newRoleId,
"roleLabel": templateRole.roleLabel,
"description": templateRole.description,
"featureCode": templateRole.featureCode,
"mandateId": mandateId,
"featureInstanceId": instance.id,
"isSystemRole": False
})
# AccessRules für diese Rolle vorbereiten
templateRulesForRole = rulesByRoleId.get(templateRole.id, [])
for rule in templateRulesForRole:
newAccessRules.append({
"id": str(uuid.uuid4()),
"roleId": newRoleId,
"context": rule["context"],
"item": rule["item"],
"view": rule["view"],
"read": rule["read"],
"create": rule["create"],
"update": rule["update"],
"delete": rule["delete"]
})
# BULK INSERT: Alle neuen Rollen und AccessRules
if newRoles:
db.bulkInsert("Role", newRoles)
if newAccessRules:
db.bulkInsert("AccessRule", newAccessRules)
return instance
3.5 Template-Rollen Synchronisation
Problem: Wenn globale Template-Rollen aktualisiert werden, sind bestehende Feature-Instanzen nicht automatisch aktuell.
Lösung: Explizite Sync-Funktion für Mandant-Admins oder bei Template-Updates.
async def syncRolesFromTemplate(featureInstanceId: str, addOnly: bool = True):
"""
Synchronisiert Rollen einer Feature-Instanz mit den aktuellen Templates.
WICHTIG: Templates werden NUR bei Erstellung einer neuen FeatureInstance kopiert.
Diese Sync-Funktion ist für manuelle Nachsynchronisation gedacht, nicht für
automatische Propagation von Template-Änderungen.
Args:
featureInstanceId: ID der zu synchronisierenden Instanz
addOnly: Wenn True, werden nur fehlende Rollen hinzugefügt.
Wenn False, werden auch überzählige Rollen entfernt.
Returns:
SyncResult mit added/removed/unchanged Counts
"""
instance = db.getFeatureInstance(featureInstanceId)
if not instance:
raise ValueError(f"FeatureInstance {featureInstanceId} not found")
# Aktuelle Template-Rollen für dieses Feature
templateRoles = db.getRoles(featureCode=instance.featureCode, mandateId=None)
templateLabels = {r.roleLabel for r in templateRoles}
templateRoleIds = [r.id for r in templateRoles]
# Aktuelle Instanz-Rollen
instanceRoles = db.getRoles(featureInstanceId=featureInstanceId)
instanceLabels = {r.roleLabel for r in instanceRoles}
result = {"added": 0, "removed": 0, "unchanged": 0}
async with db.transaction() as tx:
# BULK: Alle Template-AccessRules in einem Query laden
allTemplateRules = []
if templateRoleIds:
allTemplateRules = await tx.execute(
'SELECT * FROM "AccessRule" WHERE "roleId" = ANY(:roleIds)',
{"roleIds": templateRoleIds}
)
# Index für schnellen Lookup: roleId -> rules
rulesByRoleId = {}
for rule in allTemplateRules:
roleId = rule["roleId"]
if roleId not in rulesByRoleId:
rulesByRoleId[roleId] = []
rulesByRoleId[roleId].append(rule)
# Neue Rollen und AccessRules sammeln für Bulk-Insert
newRoles = []
newAccessRules = []
roleIdMapping = {} # templateRoleId -> newRoleId
for templateRole in templateRoles:
if templateRole.roleLabel not in instanceLabels:
# Neue Rolle vorbereiten
newRoleId = str(uuid.uuid4())
roleIdMapping[templateRole.id] = newRoleId
newRoles.append({
"id": newRoleId,
"roleLabel": templateRole.roleLabel,
"description": templateRole.description,
"featureCode": templateRole.featureCode,
"mandateId": instance.mandateId,
"featureInstanceId": featureInstanceId,
"isSystemRole": False
})
# AccessRules für diese Rolle vorbereiten
templateRulesForRole = rulesByRoleId.get(templateRole.id, [])
for rule in templateRulesForRole:
newAccessRules.append({
"id": str(uuid.uuid4()),
"roleId": newRoleId,
"context": rule["context"],
"item": rule["item"],
"view": rule["view"],
"read": rule["read"],
"create": rule["create"],
"update": rule["update"],
"delete": rule["delete"]
})
result["added"] += 1
else:
result["unchanged"] += 1
# BULK INSERT: Alle neuen Rollen
if newRoles:
await tx.bulkInsert("Role", newRoles)
# BULK INSERT: Alle neuen AccessRules
if newAccessRules:
await tx.bulkInsert("AccessRule", newAccessRules)
# Überzählige Rollen entfernen (optional)
if not addOnly:
for instanceRole in instanceRoles:
if instanceRole.roleLabel not in templateLabels:
# Prüfen ob Rolle noch verwendet wird
usages = await tx.getRecordset(
FeatureAccessRole,
recordFilter={"roleId": instanceRole.id}
)
if not usages:
await tx.delete(Role, instanceRole.id)
result["removed"] += 1
await tx.commit()
logger.info(f"Synced roles for instance {featureInstanceId}: {result}")
return result
# Endpoint für Mandate-Admin
@router.post("/api/features/instances/{instanceId}/sync-roles")
@limiter.limit("10/minute")
async def syncInstanceRoles(
instanceId: str,
addOnly: bool = Query(True, description="Only add missing roles, don't remove extras"),
currentUser: User = Depends(getCurrentUser),
ctx: RequestContext = Depends(getRequestContext)
):
"""Synchronisiert Rollen einer Feature-Instanz mit Templates."""
# Nur Mandate-Admin oder Feature-Admin darf synchronisieren
if not hasAdminRole(ctx, instanceId):
raise HTTPException(403, "Admin role required")
result = await syncRolesFromTemplate(instanceId, addOnly)
return result
3.6 RBAC - Stateless Design (Kein Cache)
Prinzip: Gateway ist vollständig stateless - wie JWT Session-Handling.
Warum kein Cache:
- Gateway läuft mit Load Balancing (mehrere Cloud-Instanzen)
- Horizontale Skalierung erfolgt über Cloud-Infrastruktur, nicht im Code
- Keine zusätzliche Komponente (Redis) nötig
- PostgreSQL ist die einzige Source of Truth
Performance durch optimierte Queries:
- Bulk-Query mit JOIN (siehe 3.7) statt N+1 Queries
- Richtige Indizes auf Foreign Keys
- Query-Zeit typisch <10ms
- Datenbank-Skalierung ist Infrastruktur-Aufgabe, nicht Code-Aufgabe (Read Replicas, Connection Pooling, etc.)
# KEIN globaler Cache!
# Jeder Request lädt RBAC-Regeln frisch aus der Datenbank.
# Das ist by design: Stateless = Cloud-Ready
# Optional: Request-Scoped Cache (nur innerhalb EINES Requests)
class RequestContext:
"""Kontext für einen einzelnen Request - wird nicht persistiert."""
def __init__(self):
self.user: Optional[User] = None
self.mandateId: Optional[str] = None
self.featureInstanceId: Optional[str] = None
self.roleIds: List[str] = []
# Request-Scoped: Regeln nur einmal pro Request laden
self._cachedRules: Optional[List[AccessRule]] = None
def getRules(self) -> List[AccessRule]:
"""Lädt Regeln einmal pro Request (nicht über Requests hinweg)."""
if self._cachedRules is None:
self._cachedRules = getRulesForUserBulk(
self.user.id,
self.mandateId,
self.featureInstanceId
)
return self._cachedRules
Vorteile des Stateless-Designs:
- ✅ Keine Cache-Invalidierung nötig
- ✅ Keine Inkonsistenzen zwischen Instanzen
- ✅ Keine zusätzliche Infrastruktur (Redis)
- ✅ Einfaches Deployment (nur Gateway + PostgreSQL)
- ✅ RBAC-Änderungen sind sofort wirksam
Read Replica Handling: Bei Verwendung von Read Replicas kann es zu Replication Lag kommen. Kritische RBAC-Reads (nach Berechtigungsänderungen) müssen vom Primary erfolgen:
# RBAC-Queries immer vom Primary lesen
# Verhindert stale reads nach Berechtigungsänderungen
def getRulesForUserBulk(userId: str, mandateId: str,
featureInstanceId: Optional[str] = None,
usePrimary: bool = True) -> List[Tuple[int, AccessRule]]:
"""
Lädt alle relevanten Regeln für einen User.
Args:
usePrimary: Wenn True, wird Primary-DB verwendet (verhindert Replication Lag).
Default True für sicherheitskritische RBAC-Queries.
"""
db = getPrimaryConnection() if usePrimary else getReplicaConnection()
# ... rest of implementation
Wichtig: DB-Indizes für Performance
-- ============================================
-- CORE RBAC INDIZES
-- ============================================
-- UserMandate: Lookup by User (alle Mandanten eines Users)
CREATE INDEX idx_usermandate_user ON "UserMandate"("userId");
-- UserMandate: Lookup by User+Mandate (Mitgliedschaft prüfen)
CREATE INDEX idx_usermandate_user_mandate ON "UserMandate"("userId", "mandateId");
-- UserMandate: Lookup by Mandate (alle User eines Mandanten)
CREATE INDEX idx_usermandate_mandate ON "UserMandate"("mandateId");
-- UserMandate: Partial Index für aktive Memberships (häufigster Query-Fall)
CREATE INDEX idx_usermandate_user_enabled ON "UserMandate"("userId") WHERE "enabled" = true;
-- UserMandateRole: Lookup by UserMandate (Rollen laden)
CREATE INDEX idx_usermandaterole_usermandate ON "UserMandateRole"("userMandateId");
-- UserMandateRole: Lookup by Role (wer hat diese Rolle?)
CREATE INDEX idx_usermandaterole_role ON "UserMandateRole"("roleId");
-- FeatureAccess: Lookup by User+Instance (Zugang prüfen)
CREATE INDEX idx_featureaccess_user_instance ON "FeatureAccess"("userId", "featureInstanceId");
-- FeatureAccess: Lookup by User (alle Instanzen eines Users)
CREATE INDEX idx_featureaccess_user ON "FeatureAccess"("userId");
-- FeatureAccess: Lookup by Instance (alle User einer Instanz)
CREATE INDEX idx_featureaccess_instance ON "FeatureAccess"("featureInstanceId");
-- FeatureAccessRole: Lookup by FeatureAccess (Rollen laden)
CREATE INDEX idx_featureaccessrole_featureaccess ON "FeatureAccessRole"("featureAccessId");
-- FeatureAccessRole: Lookup by Role (wer hat diese Rolle?)
CREATE INDEX idx_featureaccessrole_role ON "FeatureAccessRole"("roleId");
-- ============================================
-- ACCESSRULE INDIZES
-- ============================================
-- AccessRule: Bulk-Load by Role (Hauptquery)
CREATE INDEX idx_accessrule_roleid ON "AccessRule"("roleId");
-- AccessRule: Lookup by Context+Role (gefilterte Queries)
CREATE INDEX idx_accessrule_context_roleid ON "AccessRule"("context", "roleId");
-- ============================================
-- ROLE INDIZES
-- ============================================
-- Role: Lookup by Mandate+Instance (Instanz-Rollen)
CREATE INDEX idx_role_mandate_instance ON "Role"("mandateId", "featureInstanceId");
-- Role: Lookup by FeatureCode (Template-Rollen finden)
CREATE INDEX idx_role_featurecode ON "Role"("featureCode") WHERE "mandateId" IS NULL;
-- Role: Lookup by Label (Rolle nach Name finden)
CREATE INDEX idx_role_label ON "Role"("roleLabel");
-- ============================================
-- FEATUREINSTANCE INDIZES
-- ============================================
-- FeatureInstance: Lookup by Mandate (alle Instanzen eines Mandanten)
CREATE INDEX idx_featureinstance_mandate ON "FeatureInstance"("mandateId");
-- FeatureInstance: Lookup by Mandate+FeatureCode (gefiltert)
CREATE INDEX idx_featureinstance_mandate_code ON "FeatureInstance"("mandateId", "featureCode");
def onRbacChange(mandateId: str, featureInstanceId: Optional[str] = None): """ Wird aufgerufen wenn RBAC-Regeln geändert werden.
Bei Stateless-Design: Keine Aktion nötig!
Änderungen sind sofort wirksam, da jeder Request frisch aus DB lädt.
Diese Funktion existiert als Hook für:
- Audit-Logging von RBAC-Änderungen
- Zukünftige Erweiterungen (z.B. Notifications)
"""
from modules.shared.auditLogger import audit_logger
audit_logger.logSecurityEvent(
userId="system",
mandateId=mandateId,
action="rbac_change",
details=f"featureInstanceId={featureInstanceId}" if featureInstanceId else "mandate-level"
)
### 3.7 Optimierte Regel-Auflösung (Bulk Query)
**Problem:** N+1 Queries bei vielen Rollen.
**Lösung:** Bulk-Query mit JOIN.
```python
def getRulesForUserBulk(userId: str, mandateId: str,
featureInstanceId: Optional[str] = None) -> List[Tuple[int, AccessRule]]:
"""
Lädt alle relevanten Regeln für einen User in EINEM Query.
Stateless: Kein Cache, direkt aus DB.
Returns:
Liste von (priority, AccessRule) Tupeln
"""
# 1. Alle roleIds für User sammeln via Junction Tables
roleIds = set()
# Mandant-Rollen via UserMandate → UserMandateRole
query = """
SELECT umr."roleId"
FROM "UserMandate" um
JOIN "UserMandateRole" umr ON umr."userMandateId" = um.id
WHERE um."userId" = :userId AND um."mandateId" = :mandateId AND um."enabled" = true
"""
mandateRoles = db.execute(query, {"userId": userId, "mandateId": mandateId})
roleIds.update(r["roleId"] for r in mandateRoles)
# Instanz-Rollen via FeatureAccess → FeatureAccessRole
if featureInstanceId:
query = """
SELECT far."roleId"
FROM "FeatureAccess" fa
JOIN "FeatureAccessRole" far ON far."featureAccessId" = fa.id
WHERE fa."userId" = :userId AND fa."featureInstanceId" = :instanceId AND fa."enabled" = true
"""
instanceRoles = db.execute(query, {"userId": userId, "instanceId": featureInstanceId})
roleIds.update(r["roleId"] for r in instanceRoles)
if not roleIds:
return []
# 2. BULK Query: Alle Regeln für alle Rollen + zugehörige Role-Daten
# SINGLE Query mit JOIN statt N+1
query = """
SELECT ar.*, r."mandateId" as "roleMandateId",
r."featureInstanceId" as "roleInstanceId"
FROM "AccessRule" ar
JOIN "Role" r ON ar."roleId" = r.id
WHERE ar."roleId" = ANY(:roleIds)
"""
allRulesWithContext = db.execute(query, {"roleIds": list(roleIds)})
# 3. Priorität zuweisen basierend auf Role-Scope
rulesWithPriority = []
for rule in allRulesWithContext:
if rule["roleInstanceId"]:
priority = 3 # Instance-Rolle = höchste Priorität
elif rule["roleMandateId"]:
priority = 2 # Mandate-Rolle
else:
priority = 1 # Global-Rolle = niedrigste Priorität
rulesWithPriority.append((priority, AccessRule(**rule)))
return rulesWithPriority
4. System-Funktionen (Ohne Mandant)
4.1 Immer verfügbare Endpoints
Diese Funktionen benötigen keinen Mandanten:
# Keine RBAC-Prüfung auf Mandant/Feature-Ebene
@router.get("/auth/me") # Eigener User
@router.put("/auth/profile") # Profil bearbeiten
@router.put("/auth/settings") # Einstellungen ändern
@router.post("/auth/logout") # Abmelden
@router.get("/mandates/my") # Meine Mandanten-Liste
@router.get("/features/my") # Meine Feature-Instanzen
4.2 Implementation
def getCurrentUser(token) -> User:
"""Validiert Token, lädt User. Kein Mandant-Kontext."""
userId = validateToken(token)
return db.getUser(userId)
# System-Endpoints prüfen nur: User eingeloggt
@router.get("/auth/me")
async def getMe(currentUser: User = Depends(getCurrentUser)):
return currentUser
5. Admin-Funktionen (SysAdmin)
5.1 Design-Entscheidung: Strikte Trennung System vs. Daten
Einfaches Prinzip: isSysAdmin=true bedeutet System-Zugriff, KEIN Daten-Zugriff.
class User(BaseModel):
isSysAdmin: bool = False # Globales Admin-Flag
# isSysAdmin=true → Kann System verwalten, aber KEINE Mandant-Daten sehen
Warum diese Trennung?
- 🔒 Einfach & sicher: Kein komplexes 4-Augen-System nötig
- 📊 Klare Verantwortung: SysAdmin = Infrastruktur, User = Daten
- 🏦 Bank-konform: Datenzugriff nur über explizite Rollen, nie über Admin-Privileg
- 🔍 Auditierbar: Wenn SysAdmin Daten braucht → normaler User mit Audit-Trail
5.2 Was SysAdmin KANN (System-Ebene)
def requireSysAdmin(currentUser: User = Depends(getCurrentUser)):
"""SysAdmin-Check für System-Level Operationen."""
if not currentUser.isSysAdmin:
raise HTTPException(403, "SysAdmin required")
# Audit für alle SysAdmin-Aktionen
audit_logger.logSecurityEvent(
userId=str(currentUser.id),
mandateId="system",
action="sysadmin_action",
details="System-level operation"
)
return currentUser
# ✅ SysAdmin KANN:
@router.get("/admin/mandates")
async def listAllMandates(admin: User = Depends(requireSysAdmin)):
"""Liste aller Mandanten (nur Metadaten: id, name, enabled)."""
return db.getAllMandates(fields=["id", "name", "enabled", "createdAt"])
@router.post("/admin/mandates")
async def createMandate(data: MandateCreate, admin: User = Depends(requireSysAdmin)):
"""Neuen Mandanten erstellen."""
return db.createMandate(data)
@router.delete("/admin/mandates/{mandateId}")
async def deleteMandate(mandateId: str, admin: User = Depends(requireSysAdmin)):
"""Mandant löschen (Struktur, nicht Daten-Einblick)."""
# Prüft nur ob Mandant existiert, sieht keine Inhalte
return db.deleteMandate(mandateId)
@router.get("/admin/users")
async def listAllUsers(admin: User = Depends(requireSysAdmin)):
"""Liste aller User (nur Metadaten: id, username, email, enabled)."""
return db.getAllUsers(fields=["id", "username", "email", "enabled", "isSysAdmin"])
@router.post("/admin/users/{userId}/enable")
@router.post("/admin/users/{userId}/disable")
async def toggleUserEnabled(userId: str, admin: User = Depends(requireSysAdmin)):
"""User aktivieren/deaktivieren."""
@router.get("/admin/rbac/global/export")
async def exportGlobalRbac(admin: User = Depends(requireSysAdmin)):
"""Exportiert globale (Template) RBAC-Regeln."""
return exportRbac(scope="global")
@router.post("/admin/rbac/global/import")
async def importGlobalRbac(data: RbacImport, admin: User = Depends(requireSysAdmin)):
"""Importiert globale RBAC-Regeln."""
5.3 Was SysAdmin NICHT KANN (Daten-Ebene)
# ❌ SysAdmin kann NICHT:
# - Mandant-Daten lesen (TrusteeContracts, ChatWorkflows, etc.)
# - User-Rollen innerhalb eines Mandanten sehen
# - Feature-Instanz-Inhalte einsehen
# - RBAC-Regeln eines spezifischen Mandanten exportieren
# Enforcement in RBAC-Check:
def checkPermission(ctx: RequestContext, context: str, item: str, action: str) -> AccessLevel:
"""RBAC-Prüfung mit SysAdmin-Blockade für Mandant-Daten."""
# KRITISCH: SysAdmin hat KEINEN Zugriff auf Mandant-Daten!
if ctx.user.isSysAdmin and ctx.mandateId:
# SysAdmin versucht auf Mandant-Daten zuzugreifen
# Prüfe ob er auch reguläre Rollen im Mandanten hat
if not ctx.roleIds:
# Keine Rollen im Mandanten → kein Zugriff
logger.warning(
f"SysAdmin {ctx.user.id} blocked from mandate {ctx.mandateId} data"
)
return AccessLevel.NONE
# Normale RBAC-Auflösung für alle (inkl. SysAdmin mit Rollen)
rules = getRulesForUserBulk(ctx.user.id, ctx.mandateId, ctx.featureInstanceId)
# ... Rest der Permission-Logik
5.4 SysAdmin braucht Datenzugriff? → Muss Mandate-Admin werden
Design-Entscheidung: SysAdmin kann sich NICHT selbst zu einem Mandanten hinzufügen.
Wenn ein SysAdmin auf Mandant-Daten zugreifen muss:
- Ein Mandate-Admin muss den SysAdmin als regulären User zum Mandanten hinzufügen
- Der SysAdmin unterliegt dann der normalen RBAC-Prüfung
- Alle Aktionen werden vollständig auditiert
Begründung:
- 🔒 Keine Self-Service-Eskalation: SysAdmin kann sich keine Rechte selbst geben
- 📊 4-Augen-Prinzip: Mandate-Admin muss Zugriff explizit gewähren
- 🔍 Klarer Audit-Trail: Wer hat wem wann Zugriff gegeben
5.5 User zu Mandant hinzufügen (createUserMandate Endpoint)
class UserMandateCreate(BaseModel):
"""Request-Model für User-Mandant-Verknüpfung"""
targetUserId: str
roleIds: List[str] # Rollen die dem User im Mandant zugewiesen werden
@router.post("/api/mandates/{mandateId}/users")
@limiter.limit("30/minute")
async def addUserToMandate(
mandateId: str,
data: UserMandateCreate,
currentUser: User = Depends(getCurrentUser),
ctx: RequestContext = Depends(getRequestContext)
):
"""
Fügt einen User zu einem Mandanten hinzu.
Nur Mandate-Admin darf User hinzufügen.
SysAdmin kann sich NICHT selbst hinzufügen (Self-Eskalation Prevention).
"""
# 1. SysAdmin Self-Eskalation Prevention
if currentUser.isSysAdmin and data.targetUserId == currentUser.id:
raise HTTPException(
403,
"SysAdmin cannot add themselves to a mandate. "
"A Mandate-Admin must grant access."
)
# 2. Mandate-Admin Berechtigung prüfen
if not hasMandateAdminRole(ctx, mandateId):
raise HTTPException(403, "Mandate-Admin role required to add users")
# 3. Prüfen ob User existiert
targetUser = db.getUser(data.targetUserId)
if not targetUser:
raise HTTPException(404, f"User {data.targetUserId} not found")
# 4. Prüfen ob User bereits Mitglied ist
existingMembership = db.getUserMandate(data.targetUserId, mandateId)
if existingMembership:
raise HTTPException(409, f"User {data.targetUserId} is already member of mandate")
# 5. Rollen validieren (müssen im Mandant existieren)
for roleId in data.roleIds:
role = db.getRole(roleId)
if not role:
raise HTTPException(404, f"Role {roleId} not found")
if role.mandateId and role.mandateId != mandateId:
raise HTTPException(400, f"Role {roleId} belongs to different mandate")
# 6. UserMandate erstellen
async with db.transaction() as tx:
userMandate = UserMandate(
userId=data.targetUserId,
mandateId=mandateId,
enabled=True
)
await tx.create(userMandate)
# 7. Rollen zuweisen via Junction Table
for roleId in data.roleIds:
userMandateRole = UserMandateRole(
userMandateId=userMandate.id,
roleId=roleId
)
await tx.create(userMandateRole)
await tx.commit()
# 8. Audit
audit_logger.logSecurityEvent(
userId=str(currentUser.id),
mandateId=mandateId,
action="user_added_to_mandate",
details=f"targetUser={data.targetUserId}, roles={data.roleIds}"
)
return {"message": "User added to mandate", "userMandateId": userMandate.id}
@router.delete("/api/mandates/{mandateId}/users/{targetUserId}")
@limiter.limit("30/minute")
async def removeUserFromMandate(
mandateId: str,
targetUserId: str,
currentUser: User = Depends(getCurrentUser),
ctx: RequestContext = Depends(getRequestContext)
):
"""Entfernt einen User aus einem Mandanten."""
# Mandate-Admin Berechtigung prüfen
if not hasMandateAdminRole(ctx, mandateId):
raise HTTPException(403, "Mandate-Admin role required")
# User darf sich nicht selbst entfernen wenn er letzter Admin ist
# (sonst ist Mandant ohne Admin)
membership = db.getUserMandate(targetUserId, mandateId)
if not membership:
raise HTTPException(404, "User is not member of mandate")
# UserMandate löschen (CASCADE löscht UserMandateRoles)
db.delete(UserMandate, membership.id)
audit_logger.logSecurityEvent(
userId=str(currentUser.id),
mandateId=mandateId,
action="user_removed_from_mandate",
details=f"targetUser={targetUserId}"
)
return {"message": "User removed from mandate"}
5.6 Übersicht: SysAdmin vs. Mandate-Admin
| Aktion | SysAdmin (isSysAdmin=true) |
Mandate-Admin (Rolle) |
|---|---|---|
| Mandanten erstellen | ✅ | ❌ |
| Mandanten löschen | ✅ | ❌ |
| User global verwalten | ✅ (enable/disable) | ❌ |
| Globale RBAC-Templates | ✅ | ❌ |
| Mandant-Daten lesen | ❌ | ✅ |
| User zu Mandant hinzufügen | ❌ | ✅ |
| RBAC im Mandant ändern | ❌ | ✅ |
| Feature-Instanzen verwalten | ❌ | ✅ |
6. Rate Limiting (Bestehende Implementierung erweitern)
6.1 Bestehende Rate Limits (routeSecurityLocal.py)
# Bereits implementiert im Gateway:
@router.post("/login")
@limiter.limit("30/minute") # ✅
@router.post("/register")
@limiter.limit("10/minute") # ✅
@router.post("/password-reset-request")
@limiter.limit("5/minute") # ✅
@router.post("/password-reset")
@limiter.limit("10/minute") # ✅
@router.get("/me")
@limiter.limit("30/minute") # ✅
@router.post("/refresh")
@limiter.limit("60/minute") # ✅
@router.post("/logout")
@limiter.limit("30/minute") # ✅
6.2 Zusätzliche Rate Limits für Multi-Tenant
# Neue Limits für Admin-Endpoints
@router.get("/admin/mandates")
@limiter.limit("60/minute")
@router.post("/admin/access-request")
@limiter.limit("5/minute") # Streng für kritische Aktionen
@router.post("/admin/access-request/{requestId}/approve")
@limiter.limit("10/minute")
# RBAC-Endpoints
@router.get("/rbac/rules")
@limiter.limit("120/minute") # Häufiger für UI
@router.post("/rbac/rules")
@limiter.limit("30/minute")
7. Audit Logging (Bestehende Implementierung erweitern)
7.1 Bestehende AuditLogger-Nutzung
# Bereits implementiert in routeSecurityLocal.py:
# Login erfolgreich
audit_logger.logUserAccess(
userId=str(user.id),
mandateId=str(user.mandateId),
action="login",
successInfo="local_auth_success"
)
# Login fehlgeschlagen
audit_logger.logUserAccess(
userId="unknown",
mandateId="unknown",
action="login",
successInfo=f"failed: {error_msg}"
)
# Logout
audit_logger.logUserAccess(
userId=str(currentUser.id),
mandateId=str(currentUser.mandateId),
action="logout",
successInfo=f"revoked_tokens: {revoked}"
)
# Password Reset
audit_logger.logSecurityEvent(
userId="unknown",
mandateId="unknown",
action="password_reset_via_token",
details="Password reset completed via magic link"
)
7.2 Erweiterte Audit-Events für Multi-Tenant
# Neue Audit-Events hinzufügen:
# Mandant-Wechsel
audit_logger.logUserAccess(
userId=userId,
mandateId=newMandateId,
action="mandate_switch",
successInfo=f"from:{oldMandateId}"
)
# Feature-Zugriff
audit_logger.logUserAccess(
userId=userId,
mandateId=mandateId,
action="feature_access",
successInfo=f"instance:{featureInstanceId}"
)
# RBAC-Änderungen
audit_logger.logSecurityEvent(
userId=userId,
mandateId=mandateId,
action="rbac_rule_created",
details=f"role:{roleId}, item:{item}"
)
# SysAdmin-Aktionen
audit_logger.logSecurityEvent(
userId=sysAdminId,
mandateId=targetMandateId,
action="sysadmin_action",
details=f"Action: {action_type}"
)
7.3 Audit-Log Retention Policy
Aufbewahrungsfristen für Audit-Logs:
| Log-Typ | Aufbewahrung | Begründung |
|---|---|---|
| Security Events | 24 Monate | Login/Logout, RBAC-Änderungen, Admin-Aktionen |
| Data Access | 12 Monate | DSGVO-Auskunft, Datenexporte |
| User Activity | 6 Monate | Feature-Zugriffe, Mandant-Wechsel |
| System Events | 3 Monate | Technische Logs, Performance |
Implementation:
# Konfiguration in APP_CONFIG
AUDIT_RETENTION_SECURITY_MONTHS = 24
AUDIT_RETENTION_DATA_ACCESS_MONTHS = 12
AUDIT_RETENTION_USER_ACTIVITY_MONTHS = 6
AUDIT_RETENTION_SYSTEM_MONTHS = 3
# Cleanup-Job wird über den bestehenden EventManager (APScheduler) registriert
# Siehe: modules/shared/eventManagement.py
async def cleanupExpiredAuditLogs(batchSize: int = 1000):
"""
Löscht Audit-Logs nach Ablauf der Aufbewahrungsfrist.
Löscht in Batches um Lock-Contention zu vermeiden.
"""
now = getUtcTimestamp()
retentionPolicies = {
"security": AUDIT_RETENTION_SECURITY_MONTHS * 30 * 24 * 3600,
"data_access": AUDIT_RETENTION_DATA_ACCESS_MONTHS * 30 * 24 * 3600,
"user_activity": AUDIT_RETENTION_USER_ACTIVITY_MONTHS * 30 * 24 * 3600,
"system": AUDIT_RETENTION_SYSTEM_MONTHS * 30 * 24 * 3600,
}
for logType, retentionSeconds in retentionPolicies.items():
cutoffTime = now - retentionSeconds
totalDeleted = 0
while True:
async with db.transaction() as tx:
# Batch-Delete um Lock-Contention zu vermeiden
deleted = await tx.execute(
'''DELETE FROM "AuditLog" WHERE id IN (
SELECT id FROM "AuditLog"
WHERE "logType" = :logType AND "timestamp" < :cutoff
LIMIT :batch
)''',
{"logType": logType, "cutoff": cutoffTime, "batch": batchSize}
)
await tx.commit()
if deleted == 0:
break
totalDeleted += deleted
if totalDeleted > 0:
logger.info(f"Deleted {totalDeleted} expired {logType} audit logs")
# Registrierung im featuresLifecycle.py (analog zu bestehenden scheduled Jobs):
# eventManager.registerCron(
# jobId="audit_cleanup",
# func=cleanupExpiredAuditLogs,
# cronKwargs={"hour": "3", "minute": "0"} # Täglich um 03:00
# )
8. RBAC Export/Import (Mandantenspezifisch)
8.1 Übersicht: Wer kann was exportieren?
| Scope | Wer kann exportieren? | Wer kann importieren? |
|---|---|---|
| Global (Templates) | SysAdmin | SysAdmin |
| Mandant | Mandate-Admin | Mandate-Admin |
| Feature-Instanz | Feature-Admin | Feature-Admin |
8.2 Export-Format (JSON)
{
"version": "2.0",
"exportedAt": "2026-01-16T10:00:00Z",
"exportedBy": "admin@example.com",
"scope": {
"type": "mandate",
"mandateId": "uuid-456",
"mandateName": "Althaus Consulting",
"featureInstanceId": null
},
"roles": [
{
"roleLabel": "trustee-admin",
"description": {"en": "Trustee Administrator", "de": "Treuhand-Administrator"},
"featureCode": "trustee"
}
],
"accessRules": [
{
"roleLabel": "trustee-admin",
"context": "DATA",
"item": "TrusteeContract",
"view": true,
"read": "g",
"create": "g",
"update": "g",
"delete": "m"
},
{
"roleLabel": "trustee-admin",
"context": "UI",
"item": "nav.trustee.contracts",
"view": true
}
]
}
8.3 Export-Endpoints
# ============================================
# GLOBAL EXPORT (SysAdmin only)
# ============================================
@router.get("/admin/rbac/global/export")
@limiter.limit("10/minute")
async def exportGlobalRbac(
featureCode: Optional[str] = Query(None, description="Filter by feature"),
admin: User = Depends(requireSysAdmin)
) -> RbacExport:
"""
Exportiert globale (Template) RBAC-Regeln.
Nur SysAdmin.
"""
# Globale Rollen (mandateId = None)
roles = db.getRoles(mandateId=None, featureCode=featureCode)
roleIds = [r.id for r in roles]
# AccessRules für diese Rollen
rules = db.getAccessRulesByRoleIds(roleIds)
return RbacExport(
scope={"type": "global", "mandateId": None, "featureCode": featureCode},
roles=[r.toExportFormat() for r in roles],
accessRules=[r.toExportFormat() for r in rules]
)
# ============================================
# MANDATE EXPORT (Mandate-Admin)
# ============================================
@router.get("/api/mandates/{mandateId}/rbac/export")
@limiter.limit("10/minute")
async def exportMandateRbac(
mandateId: str,
featureCode: Optional[str] = Query(None),
currentUser: User = Depends(getCurrentUser),
ctx: RequestContext = Depends(getRequestContext)
) -> RbacExport:
"""
Exportiert RBAC-Regeln eines Mandanten.
Nur Mandate-Admin.
"""
if not hasMandateAdminRole(ctx, mandateId):
raise HTTPException(403, "Mandate-Admin role required")
mandate = db.getMandate(mandateId)
if not mandate:
raise HTTPException(404, "Mandate not found")
# Mandant-spezifische Rollen
roles = db.getRoles(mandateId=mandateId, featureCode=featureCode)
roleIds = [r.id for r in roles]
rules = db.getAccessRulesByRoleIds(roleIds)
audit_logger.logDataAccess(
userId=str(currentUser.id),
mandateId=mandateId,
action="rbac_export",
details=f"featureCode={featureCode or 'all'}"
)
return RbacExport(
scope={
"type": "mandate",
"mandateId": mandateId,
"mandateName": mandate.name,
"featureCode": featureCode
},
roles=[r.toExportFormat() for r in roles],
accessRules=[r.toExportFormat() for r in rules]
)
# ============================================
# FEATURE INSTANCE EXPORT (Feature-Admin)
# ============================================
@router.get("/api/features/instances/{instanceId}/rbac/export")
@limiter.limit("10/minute")
async def exportInstanceRbac(
instanceId: str,
currentUser: User = Depends(getCurrentUser),
ctx: RequestContext = Depends(getRequestContext)
) -> RbacExport:
"""
Exportiert RBAC-Regeln einer Feature-Instanz.
Nur Feature-Admin.
"""
if not hasAdminRoleForInstance(ctx, instanceId):
raise HTTPException(403, "Feature-Admin role required")
instance = db.getFeatureInstance(instanceId)
if not instance:
raise HTTPException(404, "Feature instance not found")
# Instanz-spezifische Rollen
roles = db.getRoles(featureInstanceId=instanceId)
roleIds = [r.id for r in roles]
rules = db.getAccessRulesByRoleIds(roleIds)
return RbacExport(
scope={
"type": "instance",
"featureInstanceId": instanceId,
"mandateId": instance.mandateId,
"featureCode": instance.featureCode,
"instanceLabel": instance.label
},
roles=[r.toExportFormat() for r in roles],
accessRules=[r.toExportFormat() for r in rules]
)
8.4 Import-Endpoints
class RbacImportMode(str, Enum):
MERGE = "merge" # Upsert: Bestehende aktualisieren, fehlende hinzufügen
REPLACE = "replace" # Alle bestehenden im Scope löschen, dann importieren
ADD_ONLY = "add" # Nur fehlende hinzufügen, bestehende nicht ändern
@router.post("/api/mandates/{mandateId}/rbac/import")
@limiter.limit("5/minute")
async def importMandateRbac(
mandateId: str,
data: RbacExport,
mode: RbacImportMode = Query(RbacImportMode.MERGE),
currentUser: User = Depends(getCurrentUser),
ctx: RequestContext = Depends(getRequestContext)
) -> Dict:
"""
Importiert RBAC-Regeln in einen Mandanten.
Nur Mandate-Admin.
Modi:
- merge: Bestehende aktualisieren, fehlende hinzufügen
- replace: Alle bestehenden Regeln löschen, dann importieren
- add: Nur fehlende hinzufügen
"""
if not hasMandateAdminRole(ctx, mandateId):
raise HTTPException(403, "Mandate-Admin role required")
result = {"rolesCreated": 0, "rolesUpdated": 0, "rulesCreated": 0, "rulesUpdated": 0}
async with db.transaction() as tx:
# Bei REPLACE: Bestehende löschen
if mode == RbacImportMode.REPLACE:
existingRoles = await tx.getRoles(mandateId=mandateId)
for role in existingRoles:
if not role.isSystemRole: # System-Rollen nicht löschen
await tx.delete(Role, role.id)
# Rollen importieren
roleIdMapping = {} # exportedLabel -> newRoleId
for roleData in data.roles:
# Prüfen ob Rolle existiert
existing = await tx.getRole(
mandateId=mandateId,
roleLabel=roleData["roleLabel"],
featureCode=roleData.get("featureCode")
)
if existing:
if mode == RbacImportMode.ADD_ONLY:
roleIdMapping[roleData["roleLabel"]] = existing.id
continue
# Update description (context-Felder sind IMMUTABLE!)
await tx.update(Role, existing.id, {
"description": roleData.get("description", existing.description)
})
roleIdMapping[roleData["roleLabel"]] = existing.id
result["rolesUpdated"] += 1
else:
# Neue Rolle erstellen
newRole = Role(
roleLabel=roleData["roleLabel"],
description=roleData.get("description", {}),
featureCode=roleData.get("featureCode"),
mandateId=mandateId, # IMMUTABLE - wird beim Create gesetzt
featureInstanceId=None,
isSystemRole=False
)
await tx.create(newRole)
roleIdMapping[roleData["roleLabel"]] = newRole.id
result["rolesCreated"] += 1
# AccessRules importieren
for ruleData in data.accessRules:
roleId = roleIdMapping.get(ruleData["roleLabel"])
if not roleId:
continue # Rolle nicht gefunden/importiert
# Prüfen ob Regel existiert (roleId + context + item)
existing = await tx.getAccessRule(
roleId=roleId,
context=ruleData["context"],
item=ruleData.get("item")
)
if existing:
if mode == RbacImportMode.ADD_ONLY:
continue
# Update permissions (context und roleId sind IMMUTABLE!)
await tx.update(AccessRule, existing.id, {
"view": ruleData.get("view", False),
"read": ruleData.get("read"),
"create": ruleData.get("create"),
"update": ruleData.get("update"),
"delete": ruleData.get("delete")
})
result["rulesUpdated"] += 1
else:
# Neue Regel erstellen
newRule = AccessRule(
roleId=roleId, # IMMUTABLE
context=ruleData["context"], # IMMUTABLE
item=ruleData.get("item"),
view=ruleData.get("view", False),
read=ruleData.get("read"),
create=ruleData.get("create"),
update=ruleData.get("update"),
delete=ruleData.get("delete")
)
await tx.create(newRule)
result["rulesCreated"] += 1
await tx.commit()
# Audit
audit_logger.logSecurityEvent(
userId=str(currentUser.id),
mandateId=mandateId,
action="rbac_import",
details=f"mode={mode.value}, result={result}"
)
return {"status": "success", "mode": mode.value, **result}
8.5 Export/Import für Mandantenwechsel
Use Case: Mandant "Althaus" möchte gleiche RBAC-Struktur wie Mandant "Soha Treuhand".
@router.post("/admin/rbac/copy")
@limiter.limit("5/minute")
async def copyRbacBetweenMandates(
sourceId: str = Query(..., description="Source mandate ID"),
targetId: str = Query(..., description="Target mandate ID"),
featureCode: Optional[str] = Query(None),
admin: User = Depends(requireSysAdmin)
) -> Dict:
"""
Kopiert RBAC-Struktur von einem Mandanten zu einem anderen.
Nur SysAdmin (da mandantenübergreifend).
"""
# 1. Export von Source
exportData = await exportMandateRbacInternal(sourceId, featureCode)
# 2. Import zu Target (mit MERGE-Modus)
result = await importMandateRbacInternal(targetId, exportData, RbacImportMode.MERGE)
audit_logger.logSecurityEvent(
userId=str(admin.id),
mandateId=targetId,
action="rbac_copy",
details=f"from={sourceId}, featureCode={featureCode or 'all'}"
)
return {"status": "success", "source": sourceId, "target": targetId, **result}
9. Invitation-Flow (Self-Service Onboarding)
9.1 Invitation Model
class Invitation(BaseModel):
"""Einladungs-Token für neue User"""
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
token: str = Field(default_factory=lambda: secrets.token_urlsafe(32))
# Ziel der Einladung
mandateId: str # FK → Mandate
featureInstanceId: Optional[str] # Optional: Direkt zu Feature-Instanz
roleIds: List[str] # Rollen die zugewiesen werden
# Einladungs-Details
email: Optional[str] # Für wen (optional, für Tracking)
createdBy: str # User-ID des Einladenden
createdAt: float # Timestamp
expiresAt: float # Ablaufzeit
# Status
usedBy: Optional[str] # User-ID der Person, die eingeladen wurde
usedAt: Optional[float] # Wann eingelöst
revokedAt: Optional[float] # Wann widerrufen (falls)
# Einschränkungen
maxUses: int = 1 # Wie oft kann Token verwendet werden
currentUses: int = 0
class InvitationCreate(BaseModel):
"""Request-Model für Einladungserstellung"""
email: Optional[str] = None
featureInstanceId: Optional[str] = None
roleIds: List[str]
expiresInHours: int = Field(default=24, ge=1, le=168) # 1h - 7 Tage
maxUses: int = Field(default=1, ge=1, le=100)
9.2 Einladung erstellen (Mandate-Admin)
@router.post("/api/mandates/{mandateId}/invitations")
@limiter.limit("30/minute")
async def createInvitation(
mandateId: str,
data: InvitationCreate,
currentUser: User = Depends(getCurrentUser),
ctx: RequestContext = Depends(getRequestContext)
) -> InvitationResponse:
"""
Erstellt einen Einladungslink für einen Mandanten.
Nur Mandate-Admin.
"""
if not hasMandateAdminRole(ctx, mandateId):
raise HTTPException(403, "Mandate-Admin role required")
# Rollen validieren
for roleId in data.roleIds:
role = db.getRole(roleId)
if not role:
raise HTTPException(404, f"Role {roleId} not found")
if role.mandateId and role.mandateId != mandateId:
raise HTTPException(400, f"Role {roleId} belongs to different mandate")
# Feature-Instanz validieren (falls angegeben)
if data.featureInstanceId:
instance = db.getFeatureInstance(data.featureInstanceId)
if not instance or instance.mandateId != mandateId:
raise HTTPException(400, "Feature instance not in this mandate")
# Einladung erstellen
invitation = Invitation(
mandateId=mandateId,
featureInstanceId=data.featureInstanceId,
roleIds=data.roleIds,
email=data.email,
createdBy=currentUser.id,
createdAt=getUtcTimestamp(),
expiresAt=getUtcTimestamp() + (data.expiresInHours * 3600),
maxUses=data.maxUses
)
db.create(invitation)
# Einladungs-URL generieren
mandate = db.getMandate(mandateId)
inviteUrl = f"{APP_CONFIG.BASE_URL}/invite/{invitation.token}"
audit_logger.logSecurityEvent(
userId=str(currentUser.id),
mandateId=mandateId,
action="invitation_created",
details=f"email={data.email or 'any'}, expires={invitation.expiresAt}"
)
return InvitationResponse(
id=invitation.id,
token=invitation.token,
inviteUrl=inviteUrl,
mandateName=mandate.name,
expiresAt=invitation.expiresAt,
maxUses=invitation.maxUses
)
9.3 Einladung einlösen
@router.get("/api/invitations/{token}")
async def getInvitationDetails(token: str) -> InvitationPublicInfo:
"""
Öffentlicher Endpoint: Zeigt Einladungs-Details (ohne sensible Daten).
Für Landing-Page vor Registration/Login.
"""
invitation = db.getInvitationByToken(token)
if not invitation:
raise HTTPException(404, "Invitation not found or expired")
if invitation.expiresAt < getUtcTimestamp():
raise HTTPException(410, "Invitation has expired")
if invitation.revokedAt:
raise HTTPException(410, "Invitation has been revoked")
if invitation.currentUses >= invitation.maxUses:
raise HTTPException(410, "Invitation has been fully used")
mandate = db.getMandate(invitation.mandateId)
roles = db.getRolesByIds(invitation.roleIds)
return InvitationPublicInfo(
mandateName=mandate.name if mandate else "Unknown",
roles=[r.roleLabel for r in roles],
expiresAt=invitation.expiresAt,
requiresNewAccount=True # Frontend zeigt Register oder Login
)
@router.post("/api/invitations/{token}/accept")
@limiter.limit("10/minute")
async def acceptInvitation(
token: str,
currentUser: User = Depends(getCurrentUser)
) -> Dict:
"""
Einladung annehmen (eingeloggter User).
Fügt User zum Mandanten/Feature hinzu.
"""
invitation = db.getInvitationByToken(token)
# Validierungen
if not invitation:
raise HTTPException(404, "Invitation not found")
if invitation.expiresAt < getUtcTimestamp():
raise HTTPException(410, "Invitation has expired")
if invitation.revokedAt:
raise HTTPException(410, "Invitation has been revoked")
if invitation.currentUses >= invitation.maxUses:
raise HTTPException(410, "Invitation has been fully used")
# Prüfen ob User bereits Mitglied ist
existingMembership = db.getUserMandate(currentUser.id, invitation.mandateId)
async with db.transaction() as tx:
if not existingMembership:
# UserMandate erstellen
userMandate = UserMandate(
userId=currentUser.id,
mandateId=invitation.mandateId,
enabled=True
)
await tx.create(userMandate)
# Rollen zuweisen
for roleId in invitation.roleIds:
umRole = UserMandateRole(
userMandateId=userMandate.id,
roleId=roleId
)
await tx.create(umRole)
else:
# Nur neue Rollen hinzufügen
existingRoleIds = set(db.getRoleIdsForUserMandate(existingMembership.id))
for roleId in invitation.roleIds:
if roleId not in existingRoleIds:
umRole = UserMandateRole(
userMandateId=existingMembership.id,
roleId=roleId
)
await tx.create(umRole)
# Feature-Zugang (falls spezifiziert)
if invitation.featureInstanceId:
existingAccess = db.getFeatureAccess(currentUser.id, invitation.featureInstanceId)
if not existingAccess:
featureAccess = FeatureAccess(
userId=currentUser.id,
featureInstanceId=invitation.featureInstanceId,
enabled=True
)
await tx.create(featureAccess)
# Feature-Rollen zuweisen
for roleId in invitation.roleIds:
role = db.getRole(roleId)
if role and role.featureInstanceId == invitation.featureInstanceId:
faRole = FeatureAccessRole(
featureAccessId=featureAccess.id,
roleId=roleId
)
await tx.create(faRole)
# Invitation als verwendet markieren
await tx.update(Invitation, invitation.id, {
"currentUses": invitation.currentUses + 1,
"usedBy": currentUser.id,
"usedAt": getUtcTimestamp()
})
await tx.commit()
mandate = db.getMandate(invitation.mandateId)
audit_logger.logSecurityEvent(
userId=str(currentUser.id),
mandateId=invitation.mandateId,
action="invitation_accepted",
details=f"invitationId={invitation.id}"
)
return {
"status": "success",
"message": f"Successfully joined {mandate.name}",
"mandateId": invitation.mandateId
}
9.4 Einladungen verwalten (Mandate-Admin)
@router.get("/api/mandates/{mandateId}/invitations")
async def listInvitations(
mandateId: str,
status: Optional[str] = Query(None, regex="^(active|expired|used|revoked)$"),
currentUser: User = Depends(getCurrentUser),
ctx: RequestContext = Depends(getRequestContext)
) -> List[InvitationListItem]:
"""Listet alle Einladungen eines Mandanten."""
if not hasMandateAdminRole(ctx, mandateId):
raise HTTPException(403, "Mandate-Admin role required")
invitations = db.getInvitations(mandateId=mandateId)
# Filter by status
now = getUtcTimestamp()
if status:
filtered = []
for inv in invitations:
if status == "active" and inv.expiresAt > now and not inv.revokedAt and inv.currentUses < inv.maxUses:
filtered.append(inv)
elif status == "expired" and inv.expiresAt <= now:
filtered.append(inv)
elif status == "used" and inv.currentUses >= inv.maxUses:
filtered.append(inv)
elif status == "revoked" and inv.revokedAt:
filtered.append(inv)
invitations = filtered
return [inv.toListItem() for inv in invitations]
@router.delete("/api/mandates/{mandateId}/invitations/{invitationId}")
async def revokeInvitation(
mandateId: str,
invitationId: str,
currentUser: User = Depends(getCurrentUser),
ctx: RequestContext = Depends(getRequestContext)
) -> Dict:
"""Widerruft eine Einladung."""
if not hasMandateAdminRole(ctx, mandateId):
raise HTTPException(403, "Mandate-Admin role required")
invitation = db.getInvitation(invitationId)
if not invitation or invitation.mandateId != mandateId:
raise HTTPException(404, "Invitation not found")
db.update(Invitation, invitationId, {
"revokedAt": getUtcTimestamp()
})
audit_logger.logSecurityEvent(
userId=str(currentUser.id),
mandateId=mandateId,
action="invitation_revoked",
details=f"invitationId={invitationId}"
)
return {"status": "success", "message": "Invitation revoked"}
9.5 Registration mit Einladung (Neuer User)
@router.post("/api/invitations/{token}/register")
@limiter.limit("5/minute")
async def registerWithInvitation(
token: str,
data: UserRegistration
) -> AuthResponse:
"""
Registriert neuen User UND nimmt Einladung an.
Kombinierter Flow für neue User.
"""
invitation = db.getInvitationByToken(token)
# Validierungen (wie bei acceptInvitation)
if not invitation or invitation.expiresAt < getUtcTimestamp():
raise HTTPException(410, "Invitation expired or invalid")
if invitation.revokedAt or invitation.currentUses >= invitation.maxUses:
raise HTTPException(410, "Invitation no longer valid")
# Email-Check (falls Einladung für bestimmte Email)
if invitation.email and data.email != invitation.email:
raise HTTPException(400, f"This invitation is for {invitation.email}")
async with db.transaction() as tx:
# 1. User erstellen
newUser = User(
username=data.username,
email=data.email,
fullName=data.fullName,
language=data.language or "en",
enabled=True,
isSysAdmin=False
)
newUser.passwordHash = hashPassword(data.password)
await tx.create(newUser)
# 2. Zum Mandanten hinzufügen
userMandate = UserMandate(
userId=newUser.id,
mandateId=invitation.mandateId,
enabled=True
)
await tx.create(userMandate)
# Rollen zuweisen
for roleId in invitation.roleIds:
umRole = UserMandateRole(
userMandateId=userMandate.id,
roleId=roleId
)
await tx.create(umRole)
# 3. Feature-Zugang (falls spezifiziert)
if invitation.featureInstanceId:
featureAccess = FeatureAccess(
userId=newUser.id,
featureInstanceId=invitation.featureInstanceId,
enabled=True
)
await tx.create(featureAccess)
for roleId in invitation.roleIds:
role = await tx.getRole(roleId)
if role and role.featureInstanceId == invitation.featureInstanceId:
faRole = FeatureAccessRole(
featureAccessId=featureAccess.id,
roleId=roleId
)
await tx.create(faRole)
# 4. Invitation aktualisieren
await tx.update(Invitation, invitation.id, {
"currentUses": invitation.currentUses + 1,
"usedBy": newUser.id,
"usedAt": getUtcTimestamp()
})
await tx.commit()
# 5. Token generieren und einloggen
tokens = createTokenPair(newUser)
audit_logger.logUserAccess(
userId=str(newUser.id),
mandateId=invitation.mandateId,
action="register_via_invitation",
successInfo=f"invitationId={invitation.id}"
)
return AuthResponse(
user=newUser.toPublic(),
accessToken=tokens.accessToken,
refreshToken=tokens.refreshToken
)
10. Cascade Delete (Orphan Prevention)
8.1 PostgreSQL Foreign Keys
-- ============================================
-- MANDATE CASCADE
-- ============================================
-- UserMandate: Löschen wenn Mandate oder User gelöscht
ALTER TABLE "UserMandate"
ADD CONSTRAINT fk_usermandate_mandate
FOREIGN KEY ("mandateId") REFERENCES "Mandate"("id") ON DELETE CASCADE;
ALTER TABLE "UserMandate"
ADD CONSTRAINT fk_usermandate_user
FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE;
-- FeatureInstance: Löschen wenn Mandate gelöscht
ALTER TABLE "FeatureInstance"
ADD CONSTRAINT fk_featureinstance_mandate
FOREIGN KEY ("mandateId") REFERENCES "Mandate"("id") ON DELETE CASCADE;
-- Role: Löschen wenn Mandate gelöscht
ALTER TABLE "Role"
ADD CONSTRAINT fk_role_mandate
FOREIGN KEY ("mandateId") REFERENCES "Mandate"("id") ON DELETE CASCADE;
-- ============================================
-- FEATURE INSTANCE CASCADE
-- ============================================
-- FeatureAccess: Löschen wenn FeatureInstance gelöscht
ALTER TABLE "FeatureAccess"
ADD CONSTRAINT fk_featureaccess_instance
FOREIGN KEY ("featureInstanceId") REFERENCES "FeatureInstance"("id") ON DELETE CASCADE;
-- Role: Löschen wenn FeatureInstance gelöscht
ALTER TABLE "Role"
ADD CONSTRAINT fk_role_instance
FOREIGN KEY ("featureInstanceId") REFERENCES "FeatureInstance"("id") ON DELETE CASCADE;
-- ============================================
-- USER CASCADE
-- ============================================
-- FeatureAccess: Löschen wenn User gelöscht
ALTER TABLE "FeatureAccess"
ADD CONSTRAINT fk_featureaccess_user
FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE;
-- ============================================
-- ROLE CASCADE (WICHTIG!)
-- ============================================
-- AccessRule: Löschen wenn Role gelöscht
ALTER TABLE "AccessRule"
ADD CONSTRAINT fk_accessrule_role
FOREIGN KEY ("roleId") REFERENCES "Role"("id") ON DELETE CASCADE;
8.2 Junction Tables (SQL-Definition)
Models: Siehe Kapitel 2.3 für Pydantic-Models.
-- ============================================
-- JUNCTION TABLES
-- ============================================
-- UserMandate zu Role Verknüpfung
CREATE TABLE "UserMandateRole" (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
"userMandateId" UUID NOT NULL REFERENCES "UserMandate"("id") ON DELETE CASCADE,
"roleId" UUID NOT NULL REFERENCES "Role"("id") ON DELETE CASCADE,
UNIQUE("userMandateId", "roleId")
);
-- FeatureAccess zu Role Verknüpfung
CREATE TABLE "FeatureAccessRole" (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
"featureAccessId" UUID NOT NULL REFERENCES "FeatureAccess"("id") ON DELETE CASCADE,
"roleId" UUID NOT NULL REFERENCES "Role"("id") ON DELETE CASCADE,
UNIQUE("featureAccessId", "roleId")
);
8.3 Cleanup bei leerer Mitgliedschaft (Application-Level)
Design-Entscheidung: Cleanup wird auf Application-Level durchgeführt, nicht via DB-Trigger.
Begründung:
- Bessere Kontrolle über Transaktionsgrenzen
- Vermeidung von Race Conditions bei parallelen Löschungen
- Explizites Verhalten statt impliziter Trigger-Magie
- Einfacheres Debugging und Testing
async def removeRoleFromUserMandate(userMandateId: str, roleId: str):
"""
Entfernt eine Rolle von einem UserMandate.
Löscht das UserMandate, wenn keine Rollen mehr vorhanden sind.
Verwendet SELECT FOR UPDATE um Race Conditions zu vermeiden.
"""
async with db.transaction() as tx:
# 0. Lock auf UserMandate um Race Conditions zu vermeiden
await tx.execute(
'SELECT * FROM "UserMandate" WHERE id = :id FOR UPDATE',
{"id": userMandateId}
)
# 1. Rolle entfernen
await tx.delete(UserMandateRole, {"userMandateId": userMandateId, "roleId": roleId})
# 2. Prüfen ob noch Rollen vorhanden
remainingRoles = await tx.getRecordset(
UserMandateRole,
recordFilter={"userMandateId": userMandateId}
)
# 3. Wenn keine Rollen mehr: UserMandate löschen
if not remainingRoles:
await tx.delete(UserMandate, userMandateId)
logger.info(f"Deleted empty UserMandate {userMandateId}")
await tx.commit()
async def removeRoleFromFeatureAccess(featureAccessId: str, roleId: str):
"""
Entfernt eine Rolle von einem FeatureAccess.
Löscht das FeatureAccess, wenn keine Rollen mehr vorhanden sind.
Verwendet SELECT FOR UPDATE um Race Conditions zu vermeiden.
"""
async with db.transaction() as tx:
# 0. Lock auf FeatureAccess um Race Conditions zu vermeiden
await tx.execute(
'SELECT * FROM "FeatureAccess" WHERE id = :id FOR UPDATE',
{"id": featureAccessId}
)
# 1. Rolle entfernen
await tx.delete(FeatureAccessRole, {"featureAccessId": featureAccessId, "roleId": roleId})
# 2. Prüfen ob noch Rollen vorhanden
remainingRoles = await tx.getRecordset(
FeatureAccessRole,
recordFilter={"featureAccessId": featureAccessId}
)
# 3. Wenn keine Rollen mehr: FeatureAccess löschen
if not remainingRoles:
await tx.delete(FeatureAccess, featureAccessId)
logger.info(f"Deleted empty FeatureAccess {featureAccessId}")
await tx.commit()
Wichtig: Alle Rollen-Löschungen müssen über diese Funktionen erfolgen, nicht direkt auf der Junction Table.
8.4 Lösch-Kaskade Übersicht
Mandate löschen:
├── UserMandate → gelöscht (FK CASCADE)
│ └── UserMandateRole → gelöscht (FK CASCADE)
├── FeatureInstance → gelöscht (FK CASCADE)
│ ├── FeatureAccess → gelöscht (FK CASCADE)
│ │ └── FeatureAccessRole → gelöscht (FK CASCADE)
│ └── Role (instanz-spezifisch) → gelöscht (FK CASCADE)
│ └── AccessRule → gelöscht (FK CASCADE)
├── Role (mandant-spezifisch) → gelöscht (FK CASCADE)
│ └── AccessRule → gelöscht (FK CASCADE)
└── Feature-Daten (ChatWorkflow, etc.) → gelöscht (FK CASCADE)
Role löschen:
├── AccessRule (alle für diese Rolle) → gelöscht (FK CASCADE)
├── UserMandateRole (Verknüpfungen) → gelöscht (FK CASCADE)
│ └── Application: Prüft ob UserMandate leer → löscht wenn ja (siehe 8.3)
└── FeatureAccessRole (Verknüpfungen) → gelöscht (FK CASCADE)
└── Application: Prüft ob FeatureAccess leer → löscht wenn ja (siehe 8.3)
User löschen:
├── UserMandate → gelöscht (FK CASCADE)
│ └── UserMandateRole → gelöscht (FK CASCADE)
└── FeatureAccess → gelöscht (FK CASCADE)
└── FeatureAccessRole → gelöscht (FK CASCADE)
11. Transaktionen (Konsistenz-Garantie)
9.1 Multi-Step Operations in Transaktionen
async def createFeatureInstanceWithRoles(
featureCode: str,
mandateId: str,
label: str
) -> FeatureInstance:
"""
Erstellt FeatureInstance mit kopierten Rollen und Rules.
Alles in einer Transaktion für Konsistenz.
"""
async with db.transaction() as tx:
try:
# 1. FeatureInstance erstellen
instance = FeatureInstance(
featureCode=featureCode,
mandateId=mandateId,
label=label
)
await tx.create(instance)
# 2. Globale Template-Rollen finden
globalRoles = await tx.getRoles(
featureCode=featureCode,
mandateId=None
)
for templateRole in globalRoles:
# 3. Rolle für diese Instanz kopieren
newRole = Role(
roleLabel=templateRole.roleLabel,
description=templateRole.description,
featureCode=templateRole.featureCode,
mandateId=mandateId,
featureInstanceId=instance.id
)
await tx.create(newRole)
# 4. AccessRules der Template-Rolle kopieren
templateRules = await tx.getAccessRules(roleId=templateRole.id)
for rule in templateRules:
newRule = AccessRule(
roleId=newRole.id,
context=rule.context,
item=rule.item,
view=rule.view,
read=rule.read,
create=rule.create,
update=rule.update,
delete=rule.delete
)
await tx.create(newRule)
# Commit nur wenn alles erfolgreich
await tx.commit()
# Cache-Version inkrementieren
await onRbacChange(mandateId, instance.id)
return instance
except Exception as e:
await tx.rollback()
logger.error(f"Failed to create feature instance: {e}")
raise
12. API-Kontext
10.1 Multi-Mandant Token-Design
Design-Entscheidung: Ein JWT-Token ist nicht an einen spezifischen Mandanten gebunden.
Warum:
- User arbeitet im UI parallel in mehreren Mandanten (z.B. mehrere Browser-Tabs)
- Mandant-Wechsel erfordert keinen neuen Login
- Token enthält nur User-Identität, nicht Mandant-Kontext
Sicherheit:
- Der Request-Header (
X-Mandate-Id) bestimmt den aktiven Kontext - Bei jedem Request wird geprüft, ob der User Mitglied des Mandanten ist
- User kann nur auf Mandanten zugreifen, in denen er explizit Mitglied ist
- Kein Sicherheitsrisiko: Token-Kompromittierung erlaubt nur Zugriff auf Mandanten, in denen der User bereits berechtigt ist
JWT Token enthält:
├── userId: "abc-123"
├── sessionId: "sess-456"
├── authority: "local"
└── exp: 1737123456
Request Headers bestimmen Kontext:
├── X-Mandate-Id: "mandate-789"
└── X-Instance-Id: "instance-012" (optional)
10.2 Request-Kontext ermitteln
def getRequestContext(
mandateId: Optional[str] = Header(None, alias="X-Mandate-Id"),
featureInstanceId: Optional[str] = Header(None, alias="X-Instance-Id"),
currentUser: User = Depends(getCurrentUser)
) -> RequestContext:
"""
Ermittelt Kontext aus Headers.
Prüft Berechtigung und lädt Role-IDs.
WICHTIG: Auch SysAdmin braucht explizite Membership für Mandant-Kontext!
SysAdmin-Flag gibt keinen impliziten Zugriff auf Mandant-Daten.
"""
ctx = RequestContext(user=currentUser)
if mandateId:
# Prüfe Mandant-Mitgliedschaft - AUCH für SysAdmin!
# SysAdmin muss explizit zum Mandanten hinzugefügt werden
membership = db.getUserMandate(currentUser.id, mandateId)
if not membership:
# Kein impliziter Zugriff für SysAdmin - Fail-Fast!
raise HTTPException(403, "Not member of mandate")
if not membership.enabled:
raise HTTPException(403, "Mandate membership is disabled")
ctx.mandateId = mandateId
# Rollen via Junction Table laden
ctx.roleIds = db.getRoleIdsForUserMandate(membership.id)
if featureInstanceId:
# Prüfe Feature-Zugang - AUCH für SysAdmin!
access = db.getFeatureAccess(currentUser.id, featureInstanceId)
if not access:
raise HTTPException(403, "No access to feature instance")
if not access.enabled:
raise HTTPException(403, "Feature access is disabled")
ctx.featureInstanceId = featureInstanceId
# Instanz-Rollen hinzufügen
ctx.roleIds.extend(db.getRoleIdsForFeatureAccess(access.id))
return ctx
10.3 RBAC-Prüfung (Stateless + SysAdmin-Blockade)
def checkPermission(
ctx: RequestContext,
context: str, # "DATA", "UI", "RESOURCE"
item: str, # z.B. "TrusteeContract"
action: str # "view", "read", "create", "update", "delete"
) -> AccessLevel:
"""Prüft Berechtigung basierend auf Kontext und Role-IDs."""
# System-Level (kein Mandant): SysAdmin hat vollen Zugriff
if ctx.user.isSysAdmin and not ctx.mandateId:
return AccessLevel.ALL
# KRITISCH: SysAdmin-Flag gibt KEINEN Zugriff auf Mandant-Daten!
# SysAdmin muss reguläre Rollen im Mandanten haben
if ctx.mandateId and not ctx.roleIds:
# User (auch SysAdmin) hat keine Rollen im Mandanten → kein Zugriff
return AccessLevel.NONE
# Regeln für alle Role-IDs des Users laden (mit Caching)
rules = getRulesForUserBulk(
ctx.user.id,
ctx.mandateId,
ctx.featureInstanceId
)
# Beste Permission über alle Regeln
bestLevel = AccessLevel.NONE
for priority, rule in rules:
if not matchesItem(rule.item, item):
continue
if rule.context != context:
continue
level = getattr(rule, action, None)
if level and isMorePermissive(level, bestLevel):
bestLevel = level
return bestLevel
10.4 IMMUTABLE-Enforcement bei Updates
def validateUpdateRequest(model: str, recordId: str, updateData: dict):
"""
Blockiert Updates auf immutable Felder.
Wirft PermissionError wenn versucht wird, context-Felder zu ändern.
"""
IMMUTABLE = {
"Role": ["mandateId", "featureInstanceId", "featureCode"],
"AccessRule": ["context", "roleId"]
}
forbidden = IMMUTABLE.get(model, [])
violations = [f for f in forbidden if f in updateData]
if violations:
raise PermissionError(
f"Cannot update immutable fields on {model}: {violations}. "
f"Delete and recreate instead."
)
10.5 AccessRule Scope-Validierung bei Erstellung
Sicherheitsregel: Ein User darf nur AccessRules für Rollen erstellen, die in seinem Scope liegen.
def validateAccessRuleCreation(ctx: RequestContext, rule: AccessRule):
"""
Validiert, dass der User berechtigt ist, eine AccessRule für diese Rolle zu erstellen.
Regeln:
- SysAdmin kann Regeln für globale Rollen (mandateId=None) erstellen
- Mandate-Admin kann Regeln für Rollen seines Mandanten erstellen
- Niemand kann Regeln für Rollen anderer Mandanten erstellen
"""
role = db.getRole(rule.roleId)
if not role:
raise ValueError(f"Role {rule.roleId} not found")
# Globale Rollen: nur SysAdmin
if role.mandateId is None:
if not ctx.user.isSysAdmin:
raise PermissionError(
"Only SysAdmin can create AccessRules for global (template) roles"
)
return # OK
# Mandant-Rollen: nur wenn User im gleichen Mandant
if role.mandateId != ctx.mandateId:
raise PermissionError(
f"Cannot create AccessRules for roles of other mandates. "
f"Role belongs to mandate {role.mandateId}, your context is {ctx.mandateId}"
)
# Instanz-Rollen: zusätzlich prüfen ob User Admin der Instanz ist
if role.featureInstanceId:
if not hasAdminRoleForInstance(ctx, role.featureInstanceId):
raise PermissionError(
f"Admin role required to create AccessRules for instance-specific roles"
)
# Mandate-Rollen ohne Instanz: Mandate-Admin Rolle erforderlich
else:
if not hasMandateAdminRole(ctx):
raise PermissionError(
"Mandate-Admin role required to create AccessRules for mandate-level roles"
)
# Anwendung im Endpoint:
@router.post("/api/rbac/rules")
async def createAccessRule(
rule: AccessRule,
currentUser: User = Depends(getCurrentUser),
ctx: RequestContext = Depends(getRequestContext)
):
# 1. FK-Validierung: Role muss existieren
role = db.getRole(rule.roleId)
if not role:
raise HTTPException(404, f"Role {rule.roleId} not found")
# 2. Scope-Validierung (Role-Kontext prüfen)
validateAccessRuleCreation(ctx, rule)
# IMMUTABLE-Validierung ist hier nicht nötig (nur bei UPDATE)
# 3. Erstellen
createdRule = db.create(AccessRule, rule)
audit_logger.logSecurityEvent(
userId=str(currentUser.id),
mandateId=ctx.mandateId or "global",
action="accessrule_created",
details=f"roleId={rule.roleId}, context={rule.context}, item={rule.item}"
)
return createdRule
13. DSGVO-Compliance Endpoints
Hinweis: Dieses Konzept behandelt nur die technische Code-Implementierung der DSGVO-Endpoints. Organisatorische Aspekte (z.B. Daten-Anonymisierung bei Export von Daten die andere User betreffen) sind Verantwortung des Users/Mandanten und werden hier nicht behandelt.
11.1 Recht auf Auskunft (Art. 15 DSGVO)
Anforderung: Betroffene Person kann Auskunft über alle gespeicherten Daten verlangen.
@router.get("/api/user/me/data-export")
@limiter.limit("5/hour") # Streng limitiert (aufwändige Operation)
async def exportUserData(
format: str = Query("json", regex="^(json|csv)$"),
currentUser: User = Depends(getCurrentUser)
) -> UserDataExport:
"""
Exportiert ALLE Daten des aktuellen Users (DSGVO Art. 15).
Enthält:
- Persönliche Daten (User-Objekt)
- Alle Mandant-Mitgliedschaften
- Alle Feature-Zugänge
- Alle vom User erstellten Daten (pro Mandant)
- Audit-Log Einträge (eigene Aktionen)
Returns:
UserDataExport mit allen Daten in strukturiertem Format
"""
export = UserDataExport(
exportedAt=getUtcTimestamp(),
userId=currentUser.id,
format=format
)
# 1. Persönliche Daten
export.personalData = {
"id": currentUser.id,
"username": currentUser.username,
"email": currentUser.email,
"fullName": currentUser.fullName,
"language": currentUser.language,
"createdAt": currentUser.createdAt,
"lastLogin": currentUser.lastLogin
}
# 2. Mandant-Mitgliedschaften
memberships = db.getUserMandates(currentUser.id)
export.mandateMemberships = []
for m in memberships:
mandate = db.getMandate(m.mandateId)
roles = db.getRolesForUserMandate(m.id)
export.mandateMemberships.append({
"mandateId": m.mandateId,
"mandateName": mandate.name if mandate else "Unknown",
"roles": [r.roleLabel for r in roles],
"joinedAt": m.createdAt,
"enabled": m.enabled
})
# 3. Feature-Zugänge
accesses = db.getFeatureAccessesForUser(currentUser.id)
export.featureAccesses = []
for a in accesses:
instance = db.getFeatureInstance(a.featureInstanceId)
roles = db.getRolesForFeatureAccess(a.id)
export.featureAccesses.append({
"featureInstanceId": a.featureInstanceId,
"featureCode": instance.featureCode if instance else "Unknown",
"instanceLabel": instance.label if instance else "Unknown",
"roles": [r.roleLabel for r in roles],
"grantedAt": a.createdAt
})
# 4. Vom User erstellte Daten (pro Mandant, _createdBy = userId)
export.createdData = {}
for m in memberships:
mandateData = {}
# Alle Tabellen mit _createdBy durchsuchen
for table in DATA_TABLES_WITH_CREATOR:
records = db.getRecordset(
table,
recordFilter={"_createdBy": currentUser.id, "mandateId": m.mandateId}
)
if records:
# Nur Metadaten, keine sensiblen Inhalte anderer User
mandateData[table.__name__] = [
{"id": r["id"], "createdAt": r.get("_createdAt"), "type": table.__name__}
for r in records
]
if mandateData:
export.createdData[m.mandateId] = mandateData
# 5. Audit-Log (eigene Aktionen der letzten 12 Monate)
export.auditLog = db.getAuditEntriesForUser(
currentUser.id,
since=getUtcTimestamp() - (365 * 24 * 3600)
)
# Audit: Export wurde angefordert
audit_logger.logDataAccess(
userId=str(currentUser.id),
mandateId="all",
action="gdpr_data_export",
details=f"Format: {format}"
)
return export
class UserDataExport(BaseModel):
"""DSGVO Art. 15 - Vollständiger Datenexport"""
exportedAt: float
userId: str
format: str
personalData: Dict[str, Any]
mandateMemberships: List[Dict[str, Any]]
featureAccesses: List[Dict[str, Any]]
createdData: Dict[str, Dict[str, List[Dict]]] # mandateId → table → records
auditLog: List[Dict[str, Any]]
11.2 Recht auf Datenübertragbarkeit (Art. 20 DSGVO)
Anforderung: Daten in strukturiertem, maschinenlesbarem Format bereitstellen.
@router.get("/api/user/me/data-portability")
@limiter.limit("3/hour")
async def exportPortableData(
mandateId: Optional[str] = Query(None, description="Optional: nur für einen Mandanten"),
currentUser: User = Depends(getCurrentUser)
) -> Response:
"""
Exportiert User-Daten in portablem Format (DSGVO Art. 20).
Format: JSON mit standardisierten Feldnamen
Kann direkt in andere Systeme importiert werden.
Returns:
JSON-Datei zum Download
"""
portableData = {
"schema_version": "1.0",
"exported_at": datetime.utcnow().isoformat(),
"source_system": "PowerOn Platform",
"user": {
"identifier": currentUser.username,
"email": currentUser.email,
"display_name": currentUser.fullName,
"preferred_language": currentUser.language
},
"data": {}
}
# Mandanten filtern
memberships = db.getUserMandates(currentUser.id)
if mandateId:
memberships = [m for m in memberships if m.mandateId == mandateId]
for m in memberships:
mandate = db.getMandate(m.mandateId)
mandateExport = {
"organization_name": mandate.name if mandate else "Unknown",
"membership_since": m.createdAt,
"content": {}
}
# Feature-spezifische Daten exportieren
accesses = db.getFeatureAccessesForUser(currentUser.id)
for access in accesses:
instance = db.getFeatureInstance(access.featureInstanceId)
if not instance or instance.mandateId != m.mandateId:
continue
featureData = _exportFeatureData(
featureCode=instance.featureCode,
instanceId=instance.id,
userId=currentUser.id
)
if featureData:
mandateExport["content"][instance.featureCode] = featureData
portableData["data"][m.mandateId] = mandateExport
# Audit
audit_logger.logDataAccess(
userId=str(currentUser.id),
mandateId=mandateId or "all",
action="gdpr_data_portability",
details=f"Exported portable data"
)
# Als Download-Datei zurückgeben
filename = f"poweron_export_{currentUser.username}_{datetime.utcnow().strftime('%Y%m%d')}.json"
return Response(
content=json.dumps(portableData, indent=2, ensure_ascii=False),
media_type="application/json",
headers={
"Content-Disposition": f'attachment; filename="{filename}"'
}
)
def _exportFeatureData(featureCode: str, instanceId: str, userId: str) -> Dict:
"""
Exportiert Feature-spezifische Daten im portablen Format.
Jedes Feature definiert sein Export-Schema.
"""
exporters = {
"trustee": _exportTrusteeData,
"chatbot": _exportChatbotData,
"workflow": _exportWorkflowData,
# Weitere Features...
}
exporter = exporters.get(featureCode)
if exporter:
return exporter(instanceId, userId)
return {}
def _exportTrusteeData(instanceId: str, userId: str) -> Dict:
"""Exportiert Trustee-Daten im portablen Format."""
contracts = db.getRecordset(
TrusteeContract,
recordFilter={"featureInstanceId": instanceId, "_createdBy": userId}
)
return {
"contracts": [
{
"reference": c.get("reference"),
"client_name": c.get("clientName"),
"created_at": c.get("_createdAt"),
"status": c.get("status")
# Keine internen IDs, nur portable Daten
}
for c in contracts
]
}
11.3 Recht auf Löschung (Art. 17 DSGVO)
Bereits implementiert via CASCADE DELETE, aber expliziter Endpoint für User-initiated Löschung:
@router.delete("/api/user/me")
@limiter.limit("1/day")
async def deleteOwnAccount(
confirmation: str = Body(..., description="Must be 'DELETE MY ACCOUNT'"),
currentUser: User = Depends(getCurrentUser)
) -> Dict:
"""
User löscht eigenen Account (DSGVO Art. 17).
ACHTUNG: Unwiderruflich! Löscht:
- User-Datensatz
- Alle Mandant-Mitgliedschaften (CASCADE)
- Alle Feature-Zugänge (CASCADE)
- Alle vom User erstellten Daten werden anonymisiert (_createdBy → 'deleted')
"""
if confirmation != "DELETE MY ACCOUNT":
raise HTTPException(400, "Confirmation text must be exactly 'DELETE MY ACCOUNT'")
# SysAdmin kann sich nicht selbst löschen
if currentUser.isSysAdmin:
raise HTTPException(400,
"SysAdmin accounts cannot be self-deleted. Contact another SysAdmin.")
# Vor dem Löschen: Datenexport anbieten
# (Frontend sollte dies vorher anzeigen)
# 1. Anonymisiere erstellte Daten (statt löschen - für Audit-Trail)
for table in DATA_TABLES_WITH_CREATOR:
db.execute(
f'UPDATE "{table.__name__}" SET "_createdBy" = \'deleted-user\' '
f'WHERE "_createdBy" = :userId',
{"userId": currentUser.id}
)
# 2. Audit BEVOR User gelöscht wird
audit_logger.logSecurityEvent(
userId=str(currentUser.id),
mandateId="all",
action="gdpr_account_deletion",
details=f"User {currentUser.username} deleted own account"
)
# 3. User löschen (CASCADE löscht Memberships, Accesses, Tokens, etc.)
db.delete(User, currentUser.id)
return {"message": "Account successfully deleted", "deletedAt": getUtcTimestamp()}
11.4 Übersicht DSGVO-Endpoints
| Endpoint | DSGVO | Beschreibung | Rate Limit |
|---|---|---|---|
GET /api/user/me/data-export |
Art. 15 | Vollständiger Datenexport (JSON/CSV) | 5/hour |
GET /api/user/me/data-portability |
Art. 20 | Portables Format für Übertragung | 3/hour |
DELETE /api/user/me |
Art. 17 | Account-Löschung | 1/day |
PUT /api/user/me |
Art. 16 | Datenberichtigung (bereits vorhanden) | 30/min |
14. Zusammenfassung
14.1 Key Design Decisions
| Entscheidung | Lösung |
|---|---|
| User-Mandant-Beziehung | m:n via UserMandate + Junction Table UserMandateRole |
| Feature-Zugriff | m:n via FeatureAccess + Junction Table FeatureAccessRole |
| Rollen-Verknüpfung | Via Junction Tables (keine Array-Felder) für CASCADE DELETE |
| Role-Kontext | IMMUTABLE (mandateId, featureInstanceId, featureCode) |
| AccessRule-Kontext | IMMUTABLE (context, roleId) + Scope-Validierung |
| Admin-Funktionen | isSysAdmin Flag = System-Zugriff, KEIN Daten-Zugriff, KEINE Self-Service-Eskalation |
| System-Funktionen | Endpoints ohne Mandant-Kontext |
| RBAC-Scope | Via Role: Global → Mandant → Instanz |
| Item-Notation | Dot-separated hierarchisch |
| Orphan Prevention | PostgreSQL CASCADE DELETE + Application-Level Cleanup (kein Trigger) |
| RBAC-Design | Stateless - kein Cache, direkt aus DB (Cloud-Ready) |
| Token-Design | User-gebunden, nicht Mandant-gebunden (parallele Mandant-Arbeit) |
| Template-Sync | Explizite Synchronisation von Rollen via Admin-Funktion (nur für neue Instanzen) |
| Initial-Rollen | System-Rollen werden im Bootstrap-Modul (interfaceBootstrap.py) erstellt |
| Deployment | Greenfield - keine Migration, keine Backwards Compatibility |
| DB-Skalierung | Infrastruktur-Aufgabe (Read Replicas, Pooling), nicht Code-Aufgabe |
| Read Replica | Kritische RBAC-Reads immer vom Primary (Replication Lag) |
| RBAC Export/Import | Global (SysAdmin), Mandant (Mandate-Admin), Instanz (Feature-Admin) |
| Invitation-Flow | Token-basiert, via Mandate-Admin, Self-Service Registration |
14.2 Bestehende Security-Features (Gateway)
| Feature | Datei | Status |
|---|---|---|
| Password Hashing (Argon2) | interfaceDbAppObjects.py |
✅ Vorhanden |
| JWT Token mit Refresh | jwtService.py |
✅ Vorhanden |
| Token Revocation | interfaceDbAppObjects.py |
✅ Vorhanden |
| httpOnly Cookies | jwtService.py |
✅ Vorhanden |
| Rate Limiting | routeSecurityLocal.py |
✅ Vorhanden |
| CSRF Protection | app.py |
✅ Vorhanden |
| Audit Logging | auditLogger.py |
✅ Vorhanden |
14.3 Vollständige Dateiliste (Code-Scan)
Legende: 🔴 Breaking Change | 🟡 Anpassung | 🟢 NEU
14.3.1 Datamodels (10 Dateien)
| Datei | Typ | Änderung |
|---|---|---|
datamodelUam.py |
🔴 | Entferne User.mandateId, User.roleLabels; Add User.isSysAdmin |
datamodelRbac.py |
🔴 | Role erweitern: mandateId, featureInstanceId, featureCode |
datamodelChat.py |
🟡 | mandateId bleibt, aber Zugriff via Context |
datamodelTrustee.py |
🟡 | mandateId bleibt, aber Zugriff via Context |
datamodelRealEstate.py |
🟡 | mandateId bleibt, aber Zugriff via Context |
datamodelVoice.py |
🟡 | mandateId bleibt, aber Zugriff via Context |
datamodelFiles.py |
🟡 | mandateId bleibt, aber Zugriff via Context |
datamodelMessaging.py |
🟡 | mandateId bleibt, aber Zugriff via Context |
datamodelNeutralizer.py |
🟡 | mandateId bleibt, aber Zugriff via Context |
datamodelSecurity.py |
🟡 | Token ohne mandateId |
datamodelFeatures.py |
🟢 | NEU: Feature, FeatureInstance, FeatureAccess |
datamodelMembership.py |
🟢 | NEU: UserMandate, UserMandateRole, FeatureAccessRole |
datamodelInvitation.py |
🟢 | NEU: Invitation |
14.3.2 Interfaces (8 Dateien)
| Datei | Typ | Änderung |
|---|---|---|
interfaceDbAppObjects.py |
🔴 | setUserContext() ohne mandateId, neue Membership-Methoden |
interfaceDbTrusteeObjects.py |
🔴 | currentUser.mandateId → Request-Context |
interfaceDbChatObjects.py |
🔴 | currentUser.mandateId → Request-Context |
interfaceDbRealEstateObjects.py |
🔴 | currentUser.mandateId → Request-Context |
interfaceDbComponentObjects.py |
🔴 | currentUser.mandateId → Request-Context |
interfaceVoiceObjects.py |
🔴 | currentUser.mandateId → Request-Context |
interfaceRbac.py |
🔴 | buildRbacWhereClause() mit explizitem mandateId Parameter |
interfaceBootstrap.py |
🔴 | Initiale Rollen ohne roleLabels am User |
interfaceFeatures.py |
🟢 | NEU: Template-Sync, Application-Level Cleanup |
14.3.3 Routes (18 Dateien)
| Datei | Typ | Änderung |
|---|---|---|
routeSecurityLocal.py |
🔴 | Login ohne mandateId im Token, roleLabels aus Memberships |
routeSecurityMsft.py |
🔴 | OAuth ohne mandateId im Token |
routeSecurityGoogle.py |
🔴 | OAuth ohne mandateId im Token |
routeSecurityAdmin.py |
🔴 | SysAdmin-Check via isSysAdmin, roleLabels → roleIds |
routeDataUsers.py |
🔴 | Filter via Membership statt mandateId |
routeDataMandates.py |
🟡 | Mandate-Admin Berechtigungen via Membership |
routeFeatureTrustee.py |
🔴 | Context aus Header (← routeDataTrustee.py) |
routeRbac.py |
🔴 | Scope-Validierung, roleLabels → roleIds |
routeAdminRbacRoles.py |
🔴 | Rollen mit Kontext-Validierung |
routeFeatureAutomation.py |
🟡 | Context-Handling (← routeAdminAutomationEvents.py) |
routeFeatureChatbot.py |
🔴 | Context aus Header (← routeChatbot.py) |
routeFeatureChatDynamic.py |
🔴 | Context aus Header (← routeChatPlayground.py) |
routeFeatureRealEstate.py |
🔴 | Context aus Header (← routeRealEstate.py) |
routeFeatureNeutralization.py |
🔴 | Context aus Header (← routeDataNeutralization.py) |
routeFeatures.py |
🟢 | NEU: Feature-Instanz-Management |
routeGdpr.py |
🟢 | NEU: DSGVO-Endpoints |
routeRbacExport.py |
🟢 | NEU: RBAC Export/Import |
routeInvitations.py |
🟢 | NEU: Invitation-Flow |
routeAdmin.py |
🟢 | NEU: SysAdmin-Endpoints |
14.3.4 Security & Auth (3 Dateien)
| Datei | Typ | Änderung |
|---|---|---|
security/rbac.py |
🔴 | Bulk Query, IMMUTABLE-Enforcement, SysAdmin-Blockade |
auth/authentication.py |
🔴 | Token ohne mandateId, Context aus Headers |
auth/tokenRefreshService.py |
🟡 | Refresh ohne mandateId |
14.3.5 Features (4 Dateien)
| Datei | Typ | Änderung |
|---|---|---|
features/chatbot/mainChatbot.py |
🔴 | currentUser.mandateId → Context |
features/realEstate/mainRealEstate.py |
🔴 | currentUser.mandateId → Context |
features/dynamicOptions/mainDynamicOptions.py |
🔴 | currentUser.mandateId → Context |
features/neutralizePlayground/mainNeutralizePlayground.py |
🔴 | currentUser.mandateId → Context |
14.3.6 Workflows & Services (3 Dateien)
| Datei | Typ | Änderung |
|---|---|---|
workflows/workflowManager.py |
🟡 | mandateId aus Context statt User |
workflows/methods/methodBase.py |
🔴 | roleLabels → roleIds |
services/serviceNeutralization/mainServiceNeutralization.py |
🟡 | Context-Handling |
14.3.7 Shared & Connectors (3 Dateien)
| Datei | Typ | Änderung |
|---|---|---|
shared/auditLogger.py |
🟡 | mandateId aus Context |
shared/configuration.py |
🟡 | Evtl. neue Config-Keys |
connectors/connectorDbPostgre.py |
🔴 | Junction Tables, IMMUTABLE Triggers |
Zusammenfassung:
- 🔴 Breaking Changes: 28 Dateien
- 🟡 Anpassungen: 12 Dateien
- 🟢 Neue Dateien: 9 Dateien
- Total: ~49 Dateien betroffen
14.4 Phasenweise Implementierung (AI-optimiert)
Prinzip: Jede Phase ist in sich abgeschlossen testbar und kann separat implementiert werden.
PHASE 1: Foundation (Datenmodelle & DB)
Geschätzte Komplexität: Mittel | Dateien: ~8
Ziel: Neue Datenstrukturen ohne Breaking Changes am bestehenden Code.
| Schritt | Datei | Aktion |
|---|---|---|
| 1.1 | datamodelFeatures.py |
🟢 NEU: Feature, FeatureInstance |
| 1.2 | datamodelMembership.py |
🟢 NEU: UserMandate, UserMandateRole, FeatureAccessRole |
| 1.3 | datamodelInvitation.py |
🟢 NEU: Invitation |
| 1.4 | datamodelRbac.py |
🟡 ADD: Role.mandateId, Role.featureInstanceId, Role.featureCode |
| 1.5 | datamodelUam.py |
🟡 ADD: User.isSysAdmin (alte Felder NOCH NICHT entfernen!) |
| 1.6 | connectorDbPostgre.py |
🟡 ADD: Neue Tabellen, Foreign Keys, IMMUTABLE Triggers |
| 1.7 | interfaceBootstrap.py |
🟡 ADD: Initiale Template-Rollen erstellen |
Test: Neue Tabellen existieren, bestehender Code funktioniert noch.
PHASE 2: RBAC Core (Kern-Logik)
Geschätzte Komplexität: Hoch | Dateien: ~5
Ziel: Neues RBAC-System parallel zum alten lauffähig.
| Schritt | Datei | Aktion |
|---|---|---|
| 2.1 | security/rbac.py |
🔴 Neue getRulesForUserBulk() mit Junction Tables |
| 2.2 | interfaceRbac.py |
🔴 buildRbacWhereClause() mit explizitem mandateId |
| 2.3 | interfaceDbAppObjects.py |
🟡 ADD: Neue Membership-Methoden (parallel zu alten) |
| 2.4 | interfaceFeatures.py |
🟢 NEU: Feature-Instanz-Management |
Test: Neue RBAC-Methoden funktionieren, alte noch vorhanden.
PHASE 3: Auth & Context (Authentifizierung)
Geschätzte Komplexität: Hoch | Dateien: ~6
Ziel: Request-Context-System einführen.
| Schritt | Datei | Aktion |
|---|---|---|
| 3.1 | auth/authentication.py |
🔴 getRequestContext() mit Headers |
| 3.2 | routeSecurityLocal.py |
🔴 Login mit neuem Token (ohne mandateId) |
| 3.3 | routeSecurityMsft.py |
🔴 OAuth anpassen |
| 3.4 | routeSecurityGoogle.py |
🔴 OAuth anpassen |
| 3.5 | datamodelSecurity.py |
🟡 Token ohne mandateId |
| 3.6 | auth/tokenRefreshService.py |
🟡 Refresh anpassen |
Test: Login funktioniert mit neuem Token-Format.
PHASE 4: Routes Migration (Schrittweise)
Geschätzte Komplexität: Mittel-Hoch | Dateien: ~18
Ziel: Alle Routes auf neues Context-System migrieren.
4.A - Admin Routes:
| Schritt | Datei |
|---|---|
| 4.A.1 | routeSecurityAdmin.py |
| 4.A.2 | routeAdminRbacRoles.py |
| 4.A.3 | routeDataUsers.py |
| 4.A.4 | routeDataMandates.py |
| 4.A.5 | routeRbac.py |
4.B - Feature Routes:
| Schritt | Datei |
|---|---|
| 4.B.1 | routeFeatureTrustee.py (← routeDataTrustee.py) |
| 4.B.2 | routeFeatureChatbot.py (← routeChatbot.py) |
| 4.B.3 | routeFeatureRealEstate.py (← routeRealEstate.py) |
| 4.B.4 | routeFeatureNeutralization.py (← routeDataNeutralization.py) |
| 4.B.5 | routeFeatureAutomation.py (← routeAdminAutomationEvents.py) |
| 4.B.6 | routeFeatureChatDynamic.py (← routeChatPlayground.py) |
Test: Jede Route einzeln testbar.
PHASE 5: Interfaces Migration
Geschätzte Komplexität: Mittel | Dateien: ~6
Ziel: Alle Interfaces auf Context-System migrieren.
| Schritt | Datei |
|---|---|
| 5.1 | interfaceDbTrusteeObjects.py |
| 5.2 | interfaceDbChatObjects.py |
| 5.3 | interfaceDbRealEstateObjects.py |
| 5.4 | interfaceDbComponentObjects.py |
| 5.5 | interfaceVoiceObjects.py |
Test: Interfaces funktionieren mit neuem Context.
PHASE 6: Features Migration
Geschätzte Komplexität: Mittel | Dateien: ~7
Ziel: Feature-Module auf Context-System migrieren.
| Schritt | Datei |
|---|---|
| 6.1 | features/chatbot/mainChatbot.py |
| 6.2 | features/realEstate/mainRealEstate.py |
| 6.3 | features/dynamicOptions/mainDynamicOptions.py |
| 6.4 | features/neutralizePlayground/mainNeutralizePlayground.py |
| 6.5 | workflows/workflowManager.py |
| 6.6 | workflows/methods/methodBase.py |
| 6.7 | services/serviceNeutralization/mainServiceNeutralization.py |
Test: Features funktionieren mit neuem Context.
PHASE 7: Cleanup & Breaking Changes
Geschätzte Komplexität: Niedrig | Dateien: ~3
Ziel: Alte Felder entfernen (erst wenn alles migriert ist!)
| Schritt | Datei | Aktion |
|---|---|---|
| 7.1 | datamodelUam.py |
🔴 ENTFERNE User.mandateId, User.roleLabels |
| 7.2 | interfaceDbAppObjects.py |
🔴 Alte Methoden entfernen |
| 7.3 | interfaceBootstrap.py |
🔴 Alte roleLabels-Logik entfernen |
Test: Vollständiger Integrationstest.
PHASE 8: Neue Features (Optional, parallel möglich)
Geschätzte Komplexität: Mittel | Dateien: ~6
Ziel: Neue Funktionalität hinzufügen.
| Schritt | Datei |
|---|---|
| 8.1 | routeFeatures.py (NEU) |
| 8.2 | routeInvitations.py (NEU) |
| 8.3 | routeRbacExport.py (NEU) |
| 8.4 | routeGdpr.py (NEU) |
| 8.5 | routeAdmin.py (NEU) |
Test: Neue Endpoints funktionieren.
14.5 AI-Implementierungs-Empfehlung
| Phase | Dateien | Empfehlung |
|---|---|---|
| 1 | 8 | ✅ Einzeln umsetzbar - Keine Breaking Changes |
| 2 | 5 | ✅ Einzeln umsetzbar - Parallele Implementierung |
| 3 | 6 | ⚠️ Zusammen umsetzen - Auth ist kritisch |
| 4 | 18 | ✅ In Gruppen - A und B separat |
| 5 | 6 | ✅ Einzeln umsetzbar |
| 6 | 7 | ✅ Einzeln umsetzbar |
| 7 | 3 | ⚠️ Erst nach Phase 6 - Breaking! |
| 8 | 6 | ✅ Parallel zu Phase 4-6 |
Gesamtstrategie:
- Phase 1-2 zuerst (Foundation)
- Phase 3 als Block (Auth-Critical)
- Phase 4-6 parallel/sequentiell
- Phase 7 erst ganz am Ende
- Phase 8 jederzeit parallel
14.6 DSGVO-Compliance Checkliste
| Anforderung | Implementierung | Status |
|---|---|---|
| Audit-Trail | auditLogger.py für alle Security-Events |
✅ |
| Audit-Retention | Aufbewahrungsfristen je Log-Typ (siehe 7.3) | ✅ Definiert |
| Löschrecht (Art. 17) | DELETE /api/user/me + CASCADE DELETE |
✅ Definiert |
| Zugriffsrecht (Art. 15) | GET /api/user/me/data-export |
✅ Definiert |
| Datenübertragbarkeit (Art. 20) | GET /api/user/me/data-portability |
✅ Definiert |
| Berichtigungsrecht (Art. 16) | PUT /api/user/me (bereits vorhanden) |
✅ |
| Verschlüsselung at rest | DB-Level (managed by cloud provider) | ✅ |
| Verschlüsselung in transit | HTTPS enforced | ✅ |
Out of Scope: Daten-Anonymisierung bei Export (organisatorische Verantwortung des Mandanten)