gateway/modules/routes/routeGdpr.py
2026-01-19 09:18:37 +01:00

517 lines
19 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
GDPR compliance routes for the backend API.
Implements data subject rights according to GDPR regulations.
GDPR Articles implemented:
- Article 15: Right of access (data export)
- Article 16: Right to rectification (via existing update endpoints)
- Article 17: Right to erasure (account deletion)
- Article 20: Right to data portability (machine-readable export)
"""
from fastapi import APIRouter, HTTPException, Depends, Request
from fastapi.responses import JSONResponse
from typing import List, Dict, Any, Optional
from fastapi import status
import logging
import json
from pydantic import BaseModel, Field
from modules.auth import limiter, getCurrentUser
from modules.datamodels.datamodelUam import User
from modules.interfaces.interfaceDbAppObjects import getRootInterface
from modules.shared.timeUtils import getUtcTimestamp
from modules.shared.auditLogger import audit_logger
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/api/user/me",
tags=["GDPR"],
responses={404: {"description": "Not found"}}
)
# =============================================================================
# Response Models
# =============================================================================
class DataExportResponse(BaseModel):
"""Response model for GDPR data export"""
exportedAt: float
userId: str
userData: Dict[str, Any]
mandates: List[Dict[str, Any]]
featureAccesses: List[Dict[str, Any]]
invitationsCreated: List[Dict[str, Any]]
invitationsUsed: List[Dict[str, Any]]
class DataPortabilityResponse(BaseModel):
"""Machine-readable data portability response (JSON-LD format)"""
context: str = Field(alias="@context")
type: str = Field(alias="@type")
identifier: str
exportDate: str
data: Dict[str, Any]
class DeletionResult(BaseModel):
"""Result of account deletion"""
success: bool
userId: str
deletedAt: float
deletedData: List[str]
message: str
# =============================================================================
# Article 15: Right of Access
# =============================================================================
@router.get("/data-export", response_model=DataExportResponse)
@limiter.limit("5/minute")
async def exportUserData(
request: Request,
currentUser: User = Depends(getCurrentUser)
) -> DataExportResponse:
"""
Export all personal data (GDPR Article 15).
Returns all data associated with the authenticated user including:
- User profile data
- Mandate memberships
- Feature access records
- Invitations created and used
Note: This exports Gateway-level data only. Feature-specific data
(e.g., chat workflows, trustee contracts) should be exported via
feature-specific endpoints.
"""
try:
rootInterface = getRootInterface()
# User data (exclude sensitive fields)
userData = {
"id": str(currentUser.id),
"username": currentUser.username,
"email": currentUser.email,
"firstname": currentUser.firstname,
"lastname": currentUser.lastname,
"enabled": currentUser.enabled,
"isSysAdmin": getattr(currentUser, "isSysAdmin", False),
"createdAt": getattr(currentUser, "createdAt", None),
"updatedAt": getattr(currentUser, "updatedAt", None),
"lastLogin": getattr(currentUser, "lastLogin", None),
"language": getattr(currentUser, "language", None),
"authenticationAuthority": str(getattr(currentUser, "authenticationAuthority", ""))
}
# Mandate memberships
from modules.datamodels.datamodelMembership import UserMandate
userMandates = rootInterface.db.getRecordset(
UserMandate,
recordFilter={"userId": str(currentUser.id)}
)
mandates = []
for um in userMandates:
mandateId = um.get("mandateId")
# Get mandate details
from modules.datamodels.datamodelUam import Mandate
mandateRecords = rootInterface.db.getRecordset(
Mandate,
recordFilter={"id": mandateId}
)
mandateName = mandateRecords[0].get("name") if mandateRecords else "Unknown"
# Get roles for this membership
roleIds = rootInterface.getRoleIdsForUserMandate(um.get("id"))
mandates.append({
"userMandateId": um.get("id"),
"mandateId": mandateId,
"mandateName": mandateName,
"enabled": um.get("enabled", True),
"roleIds": roleIds,
"joinedAt": um.get("createdAt")
})
# Feature access records
from modules.datamodels.datamodelMembership import FeatureAccess
featureAccesses = rootInterface.db.getRecordset(
FeatureAccess,
recordFilter={"userId": str(currentUser.id)}
)
featureAccessList = []
for fa in featureAccesses:
instanceId = fa.get("featureInstanceId")
# Get instance details
from modules.datamodels.datamodelFeatures import FeatureInstance
instanceRecords = rootInterface.db.getRecordset(
FeatureInstance,
recordFilter={"id": instanceId}
)
instanceInfo = instanceRecords[0] if instanceRecords else {}
roleIds = rootInterface.getRoleIdsForFeatureAccess(fa.get("id"))
featureAccessList.append({
"featureAccessId": fa.get("id"),
"featureInstanceId": instanceId,
"featureCode": instanceInfo.get("featureCode"),
"instanceLabel": instanceInfo.get("label"),
"enabled": fa.get("enabled", True),
"roleIds": roleIds
})
# Invitations created by user
from modules.datamodels.datamodelInvitation import Invitation
invitationsCreated = rootInterface.db.getRecordset(
Invitation,
recordFilter={"createdBy": str(currentUser.id)}
)
invitationsCreatedList = [
{
"id": inv.get("id"),
"mandateId": inv.get("mandateId"),
"createdAt": inv.get("createdAt"),
"expiresAt": inv.get("expiresAt"),
"maxUses": inv.get("maxUses"),
"currentUses": inv.get("currentUses")
}
for inv in invitationsCreated
]
# Invitations used by user
invitationsUsed = rootInterface.db.getRecordset(
Invitation,
recordFilter={"usedBy": str(currentUser.id)}
)
invitationsUsedList = [
{
"id": inv.get("id"),
"mandateId": inv.get("mandateId"),
"usedAt": inv.get("usedAt")
}
for inv in invitationsUsed
]
# Audit log - GDPR Article 15 data export
audit_logger.logGdprEvent(
userId=str(currentUser.id),
mandateId="system",
action="gdpr_data_export",
details="User requested data export (GDPR Article 15 - Right of Access)",
ipAddress=request.client.host if request.client else None
)
logger.info(f"User {currentUser.id} exported personal data (GDPR Art. 15)")
return DataExportResponse(
exportedAt=getUtcTimestamp(),
userId=str(currentUser.id),
userData=userData,
mandates=mandates,
featureAccesses=featureAccessList,
invitationsCreated=invitationsCreatedList,
invitationsUsed=invitationsUsedList
)
except Exception as e:
logger.error(f"Error exporting user data: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to export data: {str(e)}"
)
# =============================================================================
# Article 20: Right to Data Portability
# =============================================================================
@router.get("/data-portability")
@limiter.limit("5/minute")
async def exportPortableData(
request: Request,
currentUser: User = Depends(getCurrentUser)
) -> JSONResponse:
"""
Export data in portable, machine-readable format (GDPR Article 20).
Returns data in JSON-LD format suitable for transfer to another service.
This is a structured format that can be easily parsed by machines.
"""
try:
# Get full export data first
rootInterface = getRootInterface()
# Build portable data structure
portableData = {
"@context": "https://schema.org",
"@type": "Person",
"identifier": str(currentUser.id),
"name": f"{currentUser.firstname or ''} {currentUser.lastname or ''}".strip() or currentUser.username,
"email": currentUser.email,
"additionalProperty": []
}
# Add profile properties
if currentUser.firstname:
portableData["givenName"] = currentUser.firstname
if currentUser.lastname:
portableData["familyName"] = currentUser.lastname
# Add mandate memberships as organization affiliations
from modules.datamodels.datamodelMembership import UserMandate
userMandates = rootInterface.db.getRecordset(
UserMandate,
recordFilter={"userId": str(currentUser.id)}
)
affiliations = []
for um in userMandates:
from modules.datamodels.datamodelUam import Mandate
mandateRecords = rootInterface.db.getRecordset(
Mandate,
recordFilter={"id": um.get("mandateId")}
)
if mandateRecords:
mandate = mandateRecords[0]
affiliations.append({
"@type": "Organization",
"identifier": um.get("mandateId"),
"name": mandate.get("name"),
"membershipActive": um.get("enabled", True)
})
if affiliations:
portableData["affiliation"] = affiliations
# Wrap in export envelope
exportEnvelope = {
"@context": "https://schema.org",
"@type": "DataDownload",
"identifier": f"export-{currentUser.id}-{int(getUtcTimestamp())}",
"dateCreated": _timestampToIso(getUtcTimestamp()),
"encodingFormat": "application/ld+json",
"about": portableData
}
# Audit log - GDPR Article 20 data portability
audit_logger.logGdprEvent(
userId=str(currentUser.id),
mandateId="system",
action="gdpr_data_portability",
details="User requested portable data export (GDPR Article 20 - Right to Data Portability)",
ipAddress=request.client.host if request.client else None
)
logger.info(f"User {currentUser.id} exported portable data (GDPR Art. 20)")
return JSONResponse(
content=exportEnvelope,
media_type="application/ld+json"
)
except Exception as e:
logger.error(f"Error exporting portable data: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to export data: {str(e)}"
)
# =============================================================================
# Article 17: Right to Erasure
# =============================================================================
@router.delete("/", response_model=DeletionResult)
@limiter.limit("1/hour")
async def deleteAccount(
request: Request,
confirmDeletion: bool = False,
currentUser: User = Depends(getCurrentUser)
) -> DeletionResult:
"""
Delete own account and all associated data (GDPR Article 17).
IMPORTANT: This action is irreversible!
- All user data will be permanently deleted
- All mandate memberships will be removed
- All feature accesses will be removed
- All created invitations will be revoked
Args:
confirmDeletion: Must be True to confirm deletion
"""
if not confirmDeletion:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Deletion not confirmed. Set confirmDeletion=true to proceed."
)
# Prevent SysAdmin self-deletion (safety measure)
if getattr(currentUser, "isSysAdmin", False):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="SysAdmin accounts cannot be self-deleted. Contact another SysAdmin."
)
try:
rootInterface = getRootInterface()
deletedData = []
# 1. Revoke all invitations created by user
from modules.datamodels.datamodelInvitation import Invitation
userInvitations = rootInterface.db.getRecordset(
Invitation,
recordFilter={"createdBy": str(currentUser.id)}
)
for inv in userInvitations:
rootInterface.db.recordUpdate(
Invitation,
inv.get("id"),
{"revokedAt": getUtcTimestamp()}
)
deletedData.append(f"Invitations revoked: {len(userInvitations)}")
# 2. Delete feature accesses (CASCADE will delete FeatureAccessRoles)
from modules.datamodels.datamodelMembership import FeatureAccess
featureAccesses = rootInterface.db.getRecordset(
FeatureAccess,
recordFilter={"userId": str(currentUser.id)}
)
for fa in featureAccesses:
rootInterface.db.recordDelete(FeatureAccess, fa.get("id"))
deletedData.append(f"Feature accesses deleted: {len(featureAccesses)}")
# 3. Delete mandate memberships (CASCADE will delete UserMandateRoles)
from modules.datamodels.datamodelMembership import UserMandate
userMandates = rootInterface.db.getRecordset(
UserMandate,
recordFilter={"userId": str(currentUser.id)}
)
for um in userMandates:
rootInterface.db.recordDelete(UserMandate, um.get("id"))
deletedData.append(f"Mandate memberships deleted: {len(userMandates)}")
# 4. Delete active tokens
from modules.datamodels.datamodelSecurity import Token
userTokens = rootInterface.db.getRecordset(
Token,
recordFilter={"userId": str(currentUser.id)}
)
for token in userTokens:
rootInterface.db.recordDelete(Token, token.get("id"))
deletedData.append(f"Tokens deleted: {len(userTokens)}")
# 5. Delete user connections (OAuth)
from modules.datamodels.datamodelUam import UserConnection
userConnections = rootInterface.db.getRecordset(
UserConnection,
recordFilter={"userId": str(currentUser.id)}
)
for conn in userConnections:
rootInterface.db.recordDelete(UserConnection, conn.get("id"))
deletedData.append(f"Connections deleted: {len(userConnections)}")
# 6. Finally, delete the user
deletedAt = getUtcTimestamp()
rootInterface.db.recordDelete(User, str(currentUser.id))
deletedData.append("User account deleted")
# Audit log (before user is deleted) - GDPR Article 17 account deletion
audit_logger.logGdprEvent(
userId=str(currentUser.id),
mandateId="system",
action="gdpr_account_deletion",
details=f"User deleted own account (GDPR Article 17 - Right to Erasure). Data: {', '.join(deletedData)}",
ipAddress=request.client.host if request.client else None
)
logger.info(f"User {currentUser.id} deleted own account (GDPR Art. 17)")
return DeletionResult(
success=True,
userId=str(currentUser.id),
deletedAt=deletedAt,
deletedData=deletedData,
message="Account and all associated data have been permanently deleted."
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deleting account: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to delete account: {str(e)}"
)
# =============================================================================
# Consent Information Endpoint
# =============================================================================
@router.get("/consent-info", response_model=Dict[str, Any])
@limiter.limit("30/minute")
async def getConsentInfo(
request: Request,
currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
"""
Get information about data processing and user rights (GDPR transparency).
Returns information about:
- What data is collected
- How data is processed
- User rights under GDPR
- Contact information for data protection inquiries
"""
return {
"dataCollected": {
"profile": "Name, email, username, language preferences",
"authentication": "Login timestamps, authentication provider",
"memberships": "Mandate and feature access records",
"activity": "Audit logs for security-relevant actions"
},
"dataProcessing": {
"purpose": "Providing multi-tenant platform services",
"legalBasis": "Contract fulfillment and legitimate interest",
"retention": "Data retained while account is active, deleted upon account deletion"
},
"userRights": {
"access": "GET /api/user/me/data-export (Article 15)",
"portability": "GET /api/user/me/data-portability (Article 20)",
"erasure": "DELETE /api/user/me (Article 17)",
"rectification": "PUT /api/local/me (Article 16)"
},
"contact": {
"email": "privacy@example.com",
"note": "For data protection inquiries, please contact us with your user ID"
}
}
# =============================================================================
# Helper Functions
# =============================================================================
def _timestampToIso(timestamp: float) -> str:
"""Convert Unix timestamp to ISO 8601 format"""
from datetime import datetime, timezone
dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
return dt.isoformat()