gateway/modules/features/trustee/routeFeatureTrustee.py
2026-04-26 22:53:44 +02:00

2669 lines
107 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Routes for Trustee feature data management.
Implements CRUD operations for organisations, roles, access, contracts, documents, and positions.
URL Structure: /api/trustee/{instanceId}/{entity}
- instanceId is the FeatureInstance ID (required for all operations)
- This ensures proper multi-tenant isolation at the URL level
"""
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Query, Response, UploadFile, File, Form
from fastapi.responses import StreamingResponse
from typing import List, Dict, Any, Optional
from fastapi import status
from pydantic import BaseModel, Field
import logging
import json
import io
import base64
from modules.auth import limiter, getRequestContext, RequestContext
from .interfaceFeatureTrustee import getInterface
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.interfaces.interfaceFeatures import getFeatureInterface
from .datamodelFeatureTrustee import (
TrusteeOrganisation,
TrusteeRole,
TrusteeAccess,
TrusteeContract,
TrusteeDocument,
TrusteePosition,
TrusteePositionView,
TrusteeDataAccount,
TrusteeDataJournalEntry,
TrusteeDataJournalLine,
TrusteeDataContact,
TrusteeDataAccountBalance,
TrusteeAccountingConfig,
TrusteeAccountingSync,
)
from modules.datamodels.datamodelPagination import (
PaginationParams,
PaginatedResponse,
PaginationMetadata,
normalize_pagination_dict,
)
from modules.datamodels.datamodelRbac import Role, AccessRule, AccessRuleContext
from modules.shared.i18nRegistry import apiRouteContext
# Message helper bound to this route module's name; the handlers below wrap
# their error texts with it. NOTE(review): presumably resolves/registers
# localized messages via the i18n registry — confirm against apiRouteContext.
routeApiMsg = apiRouteContext("routeFeatureTrustee")
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Router that carries every endpoint defined in this file; all route paths
# declared below are relative to the "/api/trustee" prefix.
router = APIRouter(
    prefix="/api/trustee",
    tags=["Trustee"],
    responses={404: {"description": "Not found"}}
)
# ===== Helper Functions =====
def _parsePagination(pagination: Optional[str]) -> Optional[PaginationParams]:
    """
    Parse the JSON-encoded ``pagination`` query parameter.

    Args:
        pagination: Raw query-string value; expected to contain a JSON object.
    Returns:
        PaginationParams built from the normalized object, or None when the
        parameter is missing/empty or decodes to an empty (falsy) JSON value.
    Raises:
        HTTPException 400 if the value is not valid JSON, is a non-empty JSON
        value that is not an object, or is rejected while building
        PaginationParams.
    """
    if not pagination:
        return None
    try:
        paginationDict = json.loads(pagination)
    except (json.JSONDecodeError, ValueError) as e:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid pagination parameter: {str(e)}"
        ) from e
    # Empty object / null / other falsy JSON means "no pagination requested".
    if not paginationDict:
        return None
    # A truthy list, number or string cannot be expanded with ** below and
    # would otherwise surface as an unhandled TypeError (HTTP 500).
    if not isinstance(paginationDict, dict):
        raise HTTPException(
            status_code=400,
            detail="Invalid pagination parameter: expected a JSON object"
        )
    try:
        return PaginationParams(**normalize_pagination_dict(paginationDict))
    except ValueError as e:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid pagination parameter: {str(e)}"
        ) from e
def _validateInstanceAccess(instanceId: str, context: RequestContext) -> str:
    """
    Check that the caller may use the given trustee feature instance.

    Args:
        instanceId: FeatureInstance ID taken from the URL.
        context: Request context carrying the user and the platform-admin flag.
    Returns:
        The instance's mandateId, as a string.
    Raises:
        HTTPException 404 if no instance with that ID exists.
        HTTPException 400 if the instance's featureCode is not "trustee".
        HTTPException 403 if the caller is not a platform admin and has no
            enabled FeatureAccess for this instance.
    """
    rootDb = getRootInterface()
    instance = getFeatureInterface(rootDb.db).getFeatureInstance(instanceId)
    if not instance:
        raise HTTPException(
            status_code=404,
            detail=f"Feature instance '{instanceId}' not found"
        )

    # The URL prefix is trustee-specific, so reject instances of other features.
    if instance.featureCode != "trustee":
        raise HTTPException(
            status_code=400,
            detail=f"Instance '{instanceId}' is not a trustee instance"
        )

    # Platform admins bypass the per-user FeatureAccess lookup.
    if context.isPlatformAdmin:
        return str(instance.mandateId)

    # Everyone else needs an enabled FeatureAccess row for exactly this instance.
    for grant in rootDb.getFeatureAccessesForUser(str(context.user.id)):
        if str(grant.featureInstanceId) == instanceId and grant.enabled:
            return str(instance.mandateId)

    raise HTTPException(
        status_code=403,
        detail=f"Access denied to feature instance '{instanceId}'"
    )
# ============================================================================
# QUICK ACTIONS ENDPOINT
# ============================================================================
@router.get("/{instanceId}/quick-actions")
@limiter.limit("60/minute")
def getQuickActions(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    language: str = Query(default="de", description="Language code for labels"),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """
    Return the quick actions the caller may see on the Trustee dashboard.

    An action is included when the caller is a platform admin or when at least
    one of the action's ``requiredRoles`` matches a role label the caller holds
    through an enabled FeatureAccess on this instance. Labels, descriptions and
    the optional ``uploadHint`` are resolved for the requested language.

    Returns:
        {"actions": [...sorted by sortOrder...], "categories": [...]}
    """
    _validateInstanceAccess(instanceId, context)
    from .mainTrustee import QUICK_ACTIONS, QUICK_ACTION_CATEGORIES
    from modules.shared.i18nRegistry import resolveText

    isAdmin = context.isPlatformAdmin
    db = getRootInterface()

    # Collect the role labels the caller holds on this instance.
    heldLabels: set = set()
    if isAdmin:
        heldLabels.add("trustee-admin")
    for grant in db.getFeatureAccessesForUser(str(context.user.id)):
        if str(grant.featureInstanceId) != instanceId or not grant.enabled:
            continue
        # FeatureAccess itself has no role list; the join lives in
        # FeatureAccessRole and is looked up through the interface helper.
        for roleId in db.getRoleIdsForFeatureAccess(str(grant.id)):
            roleObj = db.getRole(str(roleId))
            if roleObj and roleObj.roleLabel:
                heldLabels.add(roleObj.roleLabel)

    lang = (language or "de").strip() or "de"

    visible = []
    for spec in QUICK_ACTIONS:
        needed = set(spec.get("requiredRoles", []))
        # Non-admins need at least one matching role (an empty label set never matches).
        if not isAdmin and not needed.intersection(heldLabels):
            continue
        entry = {
            "id": spec["id"],
            "label": resolveText(spec.get("label", {}), lang=lang),
            "description": resolveText(spec.get("description", {}), lang=lang),
            "icon": spec.get("icon", ""),
            "color": spec.get("color", ""),
            "category": spec.get("category", ""),
            "actionType": spec.get("actionType", ""),
            "config": spec.get("config", {}),
            "sortOrder": spec.get("sortOrder", 99),
        }
        if entry["actionType"] == "agentPrompt":
            # Work on a copy so the shared action definition is not mutated.
            promptCfg = dict(entry["config"])
            if "uploadHint" in promptCfg:
                promptCfg["uploadHint"] = resolveText(promptCfg["uploadHint"], lang=lang)
            entry["config"] = promptCfg
        visible.append(entry)
    visible = sorted(visible, key=lambda item: item["sortOrder"])

    categories = []
    for cat in QUICK_ACTION_CATEGORIES:
        categories.append({
            "id": cat["id"],
            "label": resolveText(cat.get("label", {}), lang=lang),
            "sortOrder": cat.get("sortOrder", 99),
        })
    return {"actions": visible, "categories": categories}
# ============================================================================
# ATTRIBUTES ENDPOINT (for FormGeneratorTable)
# ============================================================================
# Mapping of entity names to Pydantic model classes
# Lookup table used by the attributes endpoint: the ``entityType`` path segment
# must be one of these keys, and the value is the model class whose attribute
# definitions are returned.
_TRUSTEE_ENTITY_MODELS = {
    "TrusteeOrganisation": TrusteeOrganisation,
    "TrusteeRole": TrusteeRole,
    "TrusteeAccess": TrusteeAccess,
    "TrusteeContract": TrusteeContract,
    "TrusteeDocument": TrusteeDocument,
    "TrusteePosition": TrusteePosition,
    "TrusteePositionView": TrusteePositionView,
    # Read-only sync tables (TrusteeData*) and accounting bookkeeping
    "TrusteeDataAccount": TrusteeDataAccount,
    "TrusteeDataJournalEntry": TrusteeDataJournalEntry,
    "TrusteeDataJournalLine": TrusteeDataJournalLine,
    "TrusteeDataContact": TrusteeDataContact,
    "TrusteeDataAccountBalance": TrusteeDataAccountBalance,
    "TrusteeAccountingConfig": TrusteeAccountingConfig,
    "TrusteeAccountingSync": TrusteeAccountingSync,
}
@router.get("/{instanceId}/attributes/{entityType}")
@limiter.limit("30/minute")
def get_entity_attributes(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    entityType: str = Path(..., description="Entity type (e.g., TrusteeDocument)"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Get attribute definitions for a Trustee entity.

    Used by FormGeneratorTable for dynamic column generation.

    Returns:
        {"attributes": [...]} containing only the dict-shaped definitions whose
        "visible" flag is not false (a missing flag counts as visible).
    Raises:
        HTTPException 404 if entityType is not a key of _TRUSTEE_ENTITY_MODELS.
        HTTPException 500 if building the attribute definitions fails.
        HTTPException 400/403/404 from instance validation.
    """
    _validateInstanceAccess(instanceId, context)

    modelClass = _TRUSTEE_ENTITY_MODELS.get(entityType)
    if modelClass is None:
        raise HTTPException(
            status_code=404,
            detail=f"Unknown entity type: {entityType}. Valid types: {list(_TRUSTEE_ENTITY_MODELS.keys())}"
        )

    from modules.shared.attributeUtils import getModelAttributeDefinitions
    try:
        attrDefs = getModelAttributeDefinitions(modelClass)
        # Filter to only visible attributes
        visibleAttrs = [
            attr for attr in attrDefs.get("attributes", [])
            if isinstance(attr, dict) and attr.get("visible", True)
        ]
        return {"attributes": visibleAttrs}
    except Exception as e:
        # logger.exception keeps the traceback, which a plain error() call drops.
        logger.exception(f"Error getting attributes for {entityType}: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Error getting attributes for {entityType}: {str(e)}"
        ) from e
# ============================================================================
# OPTIONS ENDPOINTS (for dropdowns)
# ============================================================================
@router.get("/mime-types/options", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def get_mime_type_options(
    request: Request,
    context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
    """
    List the MIME types reported by the extractor registry as dropdown options.

    Returns:
        [{"value": "<mime/type>", "label": "<Readable> (<mime/type>)"}, ...]
        sorted by MIME type; a value that is not of the form "main/sub" uses
        the raw MIME type as its label.
    """
    from modules.serviceCenter.services.serviceExtraction.subRegistry import ExtractorRegistry

    supported = ExtractorRegistry().getSupportedFormats()
    # Merge the per-format MIME lists into one de-duplicated set.
    uniqueTypes = set().union(*supported.get("mime_types", {}).values())

    def _describe(mime: str) -> str:
        pieces = mime.split("/")
        if len(pieces) != 2:
            return mime
        # Strip vendor / experimental prefixes and title-case the subtype.
        pretty = pieces[1].replace("vnd.", "").replace("x-", "").replace("-", " ").title()
        return f"{pretty} ({mime})"

    return [{"value": mime, "label": _describe(mime)} for mime in sorted(uniqueTypes)]
@router.get("/{instanceId}/organisations/options", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def get_organisation_options(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
    """
    Get organisation options for select dropdowns.

    Returns:
        [{"value": <id>, "label": <label, or the id when no label is set>}, ...]
    Raises:
        HTTPException 400/403/404 from instance validation.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
    result = interface.getAllOrganisations(None)
    items = result.items if hasattr(result, 'items') else result
    options = []
    for item in items:
        # The list endpoint accepts either model instances or plain dicts from
        # this same call; normalize here too so models do not fail on [].
        org = item.model_dump() if hasattr(item, "model_dump") else item
        options.append({"value": org["id"], "label": org.get("label") or org["id"]})
    return options
@router.get("/{instanceId}/roles/options", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def get_role_options(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
    """
    Get role options for select dropdowns.

    Returns:
        [{"value": <id>, "label": <desc, or the id when desc is empty>}, ...]
    Raises:
        HTTPException 400/403/404 from instance validation.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
    result = interface.getAllRoles(None)
    items = result.items if hasattr(result, 'items') else result
    options = []
    for item in items:
        # The list endpoint accepts either model instances or plain dicts from
        # this same call; normalize here too so models do not fail on [].
        role = item.model_dump() if hasattr(item, "model_dump") else item
        options.append({"value": role["id"], "label": role.get("desc") or role["id"]})
    return options
@router.get("/{instanceId}/contracts/options", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def get_contract_options(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    organisationId: Optional[str] = Query(None, description="Optional: Filter by organisation ID"),
    context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
    """
    Build contract entries for select dropdowns.

    When ``organisationId`` is supplied only that organisation's contracts are
    listed, which supports dependent dropdowns in forms; otherwise all
    contracts are listed.

    Returns:
        [{"value": <id>, "label": <label, else name, else id>}, ...]
    """
    tenantId = _validateInstanceAccess(instanceId, context)
    trustee = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)

    if not organisationId:
        # No filter: take every contract (unwrap a paginated result if returned).
        everything = trustee.getAllContracts(None)
        contracts = everything.items if hasattr(everything, 'items') else everything
    else:
        # Filtered by organisation.
        contracts = trustee.getContractsByOrganisation(organisationId)

    options = []
    for contract in contracts:
        caption = contract.label or contract.name or contract.id
        options.append({"value": contract.id, "label": caption})
    return options
@router.get("/{instanceId}/documents/options", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def get_document_options(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
    """
    Build document entries for select dropdowns.

    Returns:
        [{"id": <id>, "value": <id>, "label": <documentName, else id>}, ...]
        ("id" is included for FK resolution in tables.)
    """
    tenantId = _validateInstanceAccess(instanceId, context)
    trustee = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    listing = trustee.getAllDocuments(None)
    documents = listing.items if hasattr(listing, 'items') else listing

    options = []
    for doc in documents:
        options.append({"id": doc.id, "value": doc.id, "label": doc.documentName or doc.id})
    return options
@router.get("/{instanceId}/positions/options", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def get_position_options(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
    """
    Build position entries for select dropdowns.

    The label joins, with " - ", whichever of these are set: the valuta
    timestamp rendered as a UTC date (YYYY-MM-DD), the first 30 characters of
    the company, and the first 30 characters of the description. When none is
    set the position id is used as the label.

    Returns:
        [{"id": <id>, "value": <id>, "label": <label>}, ...]
        ("id" is included for FK resolution in tables.)
    """
    from datetime import datetime, timezone

    tenantId = _validateInstanceAccess(instanceId, context)
    trustee = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    listing = trustee.getAllPositions(None)
    positions = listing.items if hasattr(listing, 'items') else listing

    def _labelFor(pos: TrusteePosition) -> str:
        segments = []
        if pos.valuta:
            valutaDay = datetime.fromtimestamp(pos.valuta, tz=timezone.utc)
            segments.append(valutaDay.strftime("%Y-%m-%d"))
        # Company and description are truncated to keep the dropdown compact.
        segments.extend(text[:30] for text in (pos.company, pos.desc) if text)
        if not segments:
            return pos.id
        return " - ".join(segments)

    options = []
    for pos in positions:
        options.append({"id": pos.id, "value": pos.id, "label": _labelFor(pos)})
    return options
# ============================================================================
# CRUD ENDPOINTS
# ============================================================================
# ===== Organisation Routes =====
@router.get("/{instanceId}/organisations")
@limiter.limit("30/minute")
def get_organisations(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams"),
    context: RequestContext = Depends(getRequestContext)
):
    """
    List the organisations of a feature instance, optionally paginated.

    Returns:
        {"items": [...FK-label-enriched row dicts...], "pagination": <metadata or None>}
        Pagination metadata is only present when pagination was requested and
        the interface returned a paginated result.
    """
    from modules.routes.routeHelpers import enrichRowsWithFkLabels

    tenantId = _validateInstanceAccess(instanceId, context)
    pageSpec = _parsePagination(pagination)
    trustee = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    fetched = trustee.getAllOrganisations(pageSpec)

    def _asRows(records):
        # Model instances are dumped to dicts; plain dicts pass through unchanged.
        rows = []
        for rec in records:
            rows.append(rec.model_dump() if hasattr(rec, "model_dump") else rec)
        return rows

    if not (pageSpec and hasattr(fetched, 'items')):
        plain = fetched if isinstance(fetched, list) else fetched.items
        return {
            "items": enrichRowsWithFkLabels(_asRows(plain), TrusteeOrganisation),
            "pagination": None,
        }

    pageRows = enrichRowsWithFkLabels(_asRows(fetched.items), TrusteeOrganisation)
    meta = PaginationMetadata(
        currentPage=pageSpec.page or 1,
        pageSize=pageSpec.pageSize or 20,
        totalItems=fetched.totalItems,
        totalPages=fetched.totalPages,
        sort=pageSpec.sort,
        filters=pageSpec.filters
    )
    return {"items": pageRows, "pagination": meta.model_dump()}
@router.get("/{instanceId}/organisations/{orgId}", response_model=TrusteeOrganisation)
@limiter.limit("30/minute")
def get_organisation(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    orgId: str = Path(..., description="Organisation ID"),
    context: RequestContext = Depends(getRequestContext)
) -> TrusteeOrganisation:
    """
    Fetch one organisation by its ID.

    Raises:
        HTTPException 404 if the interface returns no organisation for orgId.
    """
    tenantId = _validateInstanceAccess(instanceId, context)
    trustee = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    found = trustee.getOrganisation(orgId)
    if found:
        return found
    raise HTTPException(status_code=404, detail=f"Organisation {orgId} not found")
@router.post("/{instanceId}/organisations", response_model=TrusteeOrganisation, status_code=201)
@limiter.limit("10/minute")
def create_organisation(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    data: TrusteeOrganisation = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> TrusteeOrganisation:
    """
    Create an organisation from the request body.

    Raises:
        HTTPException 400 if the interface reports no created record.
    """
    tenantId = _validateInstanceAccess(instanceId, context)
    trustee = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    payload = data.model_dump()
    created = trustee.createOrganisation(payload)
    if created:
        return created
    raise HTTPException(status_code=400, detail=routeApiMsg("Failed to create organisation"))
@router.put("/{instanceId}/organisations/{orgId}", response_model=TrusteeOrganisation)
@limiter.limit("10/minute")
def update_organisation(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    orgId: str = Path(..., description="Organisation ID"),
    data: TrusteeOrganisation = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> TrusteeOrganisation:
    """
    Replace an organisation's fields with the request body (the body's id is ignored).

    Raises:
        HTTPException 404 if the organisation does not exist.
        HTTPException 400 if the interface reports no updated record.
    """
    tenantId = _validateInstanceAccess(instanceId, context)
    trustee = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    if not trustee.getOrganisation(orgId):
        raise HTTPException(status_code=404, detail=f"Organisation {orgId} not found")

    # The ID comes from the URL, so it is stripped from the payload.
    changes = data.model_dump(exclude={"id"})
    updated = trustee.updateOrganisation(orgId, changes)
    if updated:
        return updated
    raise HTTPException(status_code=400, detail=routeApiMsg("Failed to update organisation"))
@router.delete("/{instanceId}/organisations/{orgId}")
@limiter.limit("10/minute")
def delete_organisation(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    orgId: str = Path(..., description="Organisation ID"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Remove an organisation.

    Returns:
        {"message": "Organisation <orgId> deleted"}
    Raises:
        HTTPException 404 if the organisation does not exist.
        HTTPException 400 if the interface reports the deletion as unsuccessful.
    """
    tenantId = _validateInstanceAccess(instanceId, context)
    trustee = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    if not trustee.getOrganisation(orgId):
        raise HTTPException(status_code=404, detail=f"Organisation {orgId} not found")

    removed = trustee.deleteOrganisation(orgId)
    if removed:
        return {"message": f"Organisation {orgId} deleted"}
    raise HTTPException(status_code=400, detail=routeApiMsg("Failed to delete organisation"))
# ===== Role Routes =====
@router.get("/{instanceId}/roles")
@limiter.limit("30/minute")
def get_roles(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    pagination: Optional[str] = Query(None),
    context: RequestContext = Depends(getRequestContext)
):
    """
    List the roles of a feature instance, optionally paginated.

    Returns:
        {"items": [...FK-label-enriched row dicts...], "pagination": <metadata or None>}
    """
    from modules.routes.routeHelpers import enrichRowsWithFkLabels

    tenantId = _validateInstanceAccess(instanceId, context)
    pageSpec = _parsePagination(pagination)
    store = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    fetched = store.getAllRoles(pageSpec)

    def _dump(record):
        # Model instances become dicts; anything else passes through as-is.
        return record.model_dump() if hasattr(record, "model_dump") else record

    wantsPage = pageSpec and hasattr(fetched, 'items')
    if wantsPage:
        pageRows = enrichRowsWithFkLabels([_dump(r) for r in fetched.items], TrusteeRole)
        meta = PaginationMetadata(
            currentPage=pageSpec.page or 1,
            pageSize=pageSpec.pageSize or 20,
            totalItems=fetched.totalItems,
            totalPages=fetched.totalPages,
            sort=pageSpec.sort,
            filters=pageSpec.filters
        )
        return {"items": pageRows, "pagination": meta.model_dump()}

    source = fetched if isinstance(fetched, list) else fetched.items
    allRows = enrichRowsWithFkLabels([_dump(r) for r in source], TrusteeRole)
    return {"items": allRows, "pagination": None}
@router.get("/{instanceId}/roles/{roleId}", response_model=TrusteeRole)
@limiter.limit("30/minute")
def get_role(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    roleId: str = Path(..., description="Role ID"),
    context: RequestContext = Depends(getRequestContext)
) -> TrusteeRole:
    """
    Fetch one role by its ID.

    Raises:
        HTTPException 404 if the interface returns no role for roleId.
    """
    tenantId = _validateInstanceAccess(instanceId, context)
    store = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    found = store.getRole(roleId)
    if found:
        return found
    raise HTTPException(status_code=404, detail=f"Role {roleId} not found")
@router.post("/{instanceId}/roles", response_model=TrusteeRole, status_code=201)
@limiter.limit("10/minute")
def create_role(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    data: TrusteeRole = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> TrusteeRole:
    """
    Create a role from the request body (intended for sysadmins only).

    NOTE(review): no sysadmin check is performed in this handler; presumably
    the interface layer enforces it — confirm.

    Raises:
        HTTPException 400 if the interface reports no created record.
    """
    tenantId = _validateInstanceAccess(instanceId, context)
    store = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    payload = data.model_dump()
    created = store.createRole(payload)
    if created:
        return created
    raise HTTPException(status_code=400, detail=routeApiMsg("Failed to create role"))
@router.put("/{instanceId}/roles/{roleId}", response_model=TrusteeRole)
@limiter.limit("10/minute")
def update_role(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    roleId: str = Path(...),
    data: TrusteeRole = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> TrusteeRole:
    """
    Replace a role's fields with the request body (intended for sysadmins only).

    NOTE(review): no sysadmin check is performed in this handler; presumably
    the interface layer enforces it — confirm.

    Raises:
        HTTPException 404 if the role does not exist.
        HTTPException 400 if the interface reports no updated record.
    """
    tenantId = _validateInstanceAccess(instanceId, context)
    store = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    if not store.getRole(roleId):
        raise HTTPException(status_code=404, detail=f"Role {roleId} not found")

    # The ID comes from the URL, so it is stripped from the payload.
    changes = data.model_dump(exclude={"id"})
    updated = store.updateRole(roleId, changes)
    if updated:
        return updated
    raise HTTPException(status_code=400, detail=routeApiMsg("Failed to update role"))
@router.delete("/{instanceId}/roles/{roleId}")
@limiter.limit("10/minute")
def delete_role(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    roleId: str = Path(...),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Remove a role (intended for sysadmins only).

    NOTE(review): no sysadmin check is performed in this handler; presumably
    the interface layer enforces it, and rejects roles still in use — confirm.

    Returns:
        {"message": "Role <roleId> deleted"}
    Raises:
        HTTPException 404 if the role does not exist.
        HTTPException 400 if the interface reports the deletion as unsuccessful.
    """
    tenantId = _validateInstanceAccess(instanceId, context)
    store = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    if not store.getRole(roleId):
        raise HTTPException(status_code=404, detail=f"Role {roleId} not found")

    removed = store.deleteRole(roleId)
    if removed:
        return {"message": f"Role {roleId} deleted"}
    raise HTTPException(status_code=400, detail=routeApiMsg("Failed to delete role (may be in use)"))
# ===== Access Routes =====
@router.get("/{instanceId}/access")
@limiter.limit("30/minute")
def get_all_access(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    pagination: Optional[str] = Query(None),
    context: RequestContext = Depends(getRequestContext)
):
    """
    List the access records of a feature instance, optionally paginated.

    Returns:
        {"items": [...FK-label-enriched row dicts...], "pagination": <metadata or None>}
    """
    from modules.routes.routeHelpers import enrichRowsWithFkLabels

    tenantId = _validateInstanceAccess(instanceId, context)
    pageSpec = _parsePagination(pagination)
    svc = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    fetched = svc.getAllAccess(pageSpec)

    def _asRows(records):
        # Model instances are dumped to dicts; plain dicts pass through unchanged.
        rows = []
        for rec in records:
            rows.append(rec.model_dump() if hasattr(rec, "model_dump") else rec)
        return rows

    if not (pageSpec and hasattr(fetched, 'items')):
        plain = fetched if isinstance(fetched, list) else fetched.items
        return {
            "items": enrichRowsWithFkLabels(_asRows(plain), TrusteeAccess),
            "pagination": None,
        }

    pageRows = enrichRowsWithFkLabels(_asRows(fetched.items), TrusteeAccess)
    meta = PaginationMetadata(
        currentPage=pageSpec.page or 1,
        pageSize=pageSpec.pageSize or 20,
        totalItems=fetched.totalItems,
        totalPages=fetched.totalPages,
        sort=pageSpec.sort,
        filters=pageSpec.filters
    )
    return {"items": pageRows, "pagination": meta.model_dump()}
@router.get("/{instanceId}/access/{accessId}", response_model=TrusteeAccess)
@limiter.limit("30/minute")
def get_access(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    accessId: str = Path(...),
    context: RequestContext = Depends(getRequestContext)
) -> TrusteeAccess:
    """
    Fetch one access record by its ID.

    Raises:
        HTTPException 404 if the interface returns no record for accessId.
    """
    tenantId = _validateInstanceAccess(instanceId, context)
    svc = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    found = svc.getAccess(accessId)
    if found:
        return found
    raise HTTPException(status_code=404, detail=f"Access {accessId} not found")
@router.get("/{instanceId}/access/organisation/{orgId}", response_model=List[TrusteeAccess])
@limiter.limit("30/minute")
def get_access_by_organisation(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    orgId: str = Path(...),
    context: RequestContext = Depends(getRequestContext)
) -> List[TrusteeAccess]:
    """Return every access record the interface reports for the given organisation."""
    tenantId = _validateInstanceAccess(instanceId, context)
    svc = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    records = svc.getAccessByOrganisation(orgId)
    return records
@router.get("/{instanceId}/access/user/{userId}", response_model=List[TrusteeAccess])
@limiter.limit("30/minute")
def get_access_by_user(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    userId: str = Path(...),
    context: RequestContext = Depends(getRequestContext)
) -> List[TrusteeAccess]:
    """Return every access record the interface reports for the given user."""
    tenantId = _validateInstanceAccess(instanceId, context)
    svc = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    records = svc.getAccessByUser(userId)
    return records
@router.post("/{instanceId}/access", response_model=TrusteeAccess, status_code=201)
@limiter.limit("10/minute")
def create_access(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    data: TrusteeAccess = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> TrusteeAccess:
    """
    Create an access record from the request body.

    Raises:
        HTTPException 400 if the interface reports no created record.
    """
    tenantId = _validateInstanceAccess(instanceId, context)
    svc = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    payload = data.model_dump()
    created = svc.createAccess(payload)
    if created:
        return created
    raise HTTPException(status_code=400, detail=routeApiMsg("Failed to create access"))
@router.put("/{instanceId}/access/{accessId}", response_model=TrusteeAccess)
@limiter.limit("10/minute")
def update_access(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    accessId: str = Path(...),
    data: TrusteeAccess = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> TrusteeAccess:
    """
    Replace an access record's fields with the request body (the body's id is ignored).

    Raises:
        HTTPException 404 if the access record does not exist.
        HTTPException 400 if the interface reports no updated record.
    """
    tenantId = _validateInstanceAccess(instanceId, context)
    svc = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    if not svc.getAccess(accessId):
        raise HTTPException(status_code=404, detail=f"Access {accessId} not found")

    # The ID comes from the URL, so it is stripped from the payload.
    changes = data.model_dump(exclude={"id"})
    updated = svc.updateAccess(accessId, changes)
    if updated:
        return updated
    raise HTTPException(status_code=400, detail=routeApiMsg("Failed to update access"))
@router.delete("/{instanceId}/access/{accessId}")
@limiter.limit("10/minute")
def delete_access(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    accessId: str = Path(...),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Remove an access record.

    Returns:
        {"message": "Access <accessId> deleted"}
    Raises:
        HTTPException 404 if the access record does not exist.
        HTTPException 400 if the interface reports the deletion as unsuccessful.
    """
    tenantId = _validateInstanceAccess(instanceId, context)
    svc = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    if not svc.getAccess(accessId):
        raise HTTPException(status_code=404, detail=f"Access {accessId} not found")

    removed = svc.deleteAccess(accessId)
    if removed:
        return {"message": f"Access {accessId} deleted"}
    raise HTTPException(status_code=400, detail=routeApiMsg("Failed to delete access"))
# ===== Contract Routes =====
@router.get("/{instanceId}/contracts")
@limiter.limit("30/minute")
def get_contracts(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    pagination: Optional[str] = Query(None),
    context: RequestContext = Depends(getRequestContext)
):
    """
    List the contracts of a feature instance, optionally paginated.

    Returns:
        {"items": [...FK-label-enriched row dicts...], "pagination": <metadata or None>}
    """
    from modules.routes.routeHelpers import enrichRowsWithFkLabels

    tenantId = _validateInstanceAccess(instanceId, context)
    pageSpec = _parsePagination(pagination)
    repo = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    fetched = repo.getAllContracts(pageSpec)

    def _dump(record):
        # Model instances become dicts; anything else passes through as-is.
        return record.model_dump() if hasattr(record, "model_dump") else record

    wantsPage = pageSpec and hasattr(fetched, 'items')
    if wantsPage:
        pageRows = enrichRowsWithFkLabels([_dump(r) for r in fetched.items], TrusteeContract)
        meta = PaginationMetadata(
            currentPage=pageSpec.page or 1,
            pageSize=pageSpec.pageSize or 20,
            totalItems=fetched.totalItems,
            totalPages=fetched.totalPages,
            sort=pageSpec.sort,
            filters=pageSpec.filters
        )
        return {"items": pageRows, "pagination": meta.model_dump()}

    source = fetched if isinstance(fetched, list) else fetched.items
    allRows = enrichRowsWithFkLabels([_dump(r) for r in source], TrusteeContract)
    return {"items": allRows, "pagination": None}
@router.get("/{instanceId}/contracts/{contractId}", response_model=TrusteeContract)
@limiter.limit("30/minute")
def get_contract(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    contractId: str = Path(...),
    context: RequestContext = Depends(getRequestContext)
) -> TrusteeContract:
    """
    Fetch one contract by its ID.

    Raises:
        HTTPException 404 if the interface returns no contract for contractId.
    """
    tenantId = _validateInstanceAccess(instanceId, context)
    repo = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    found = repo.getContract(contractId)
    if found:
        return found
    raise HTTPException(status_code=404, detail=f"Contract {contractId} not found")
@router.get("/{instanceId}/contracts/organisation/{orgId}", response_model=List[TrusteeContract])
@limiter.limit("30/minute")
def get_contracts_by_organisation(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    orgId: str = Path(...),
    context: RequestContext = Depends(getRequestContext)
) -> List[TrusteeContract]:
    """Return every contract the interface reports for the given organisation."""
    tenantId = _validateInstanceAccess(instanceId, context)
    repo = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    contracts = repo.getContractsByOrganisation(orgId)
    return contracts
@router.post("/{instanceId}/contracts", response_model=TrusteeContract, status_code=201)
@limiter.limit("10/minute")
def create_contract(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    data: TrusteeContract = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> TrusteeContract:
    """
    Create a contract from the request body.

    Raises:
        HTTPException 400 if the interface reports no created record.
    """
    tenantId = _validateInstanceAccess(instanceId, context)
    repo = getInterface(context.user, mandateId=tenantId, featureInstanceId=instanceId)
    payload = data.model_dump()
    created = repo.createContract(payload)
    if created:
        return created
    raise HTTPException(status_code=400, detail=routeApiMsg("Failed to create contract"))
@router.put("/{instanceId}/contracts/{contractId}", response_model=TrusteeContract)
@limiter.limit("10/minute")
def update_contract(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    contractId: str = Path(...),
    data: TrusteeContract = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> TrusteeContract:
    """Apply the request body to an existing contract.

    The ``id`` field of the body is never forwarded to the interface. Per the
    error message below, a change of ``organisationId`` is rejected by the
    interface layer.

    Raises:
        HTTPException: 404 when the contract does not exist, 400 when the
            interface reports no result for the update.
    """
    resolvedMandateId = _validateInstanceAccess(instanceId, context)
    trusteeApi = getInterface(context.user, mandateId=resolvedMandateId, featureInstanceId=instanceId)
    if not trusteeApi.getContract(contractId):
        raise HTTPException(status_code=404, detail=f"Contract {contractId} not found")

    changes = data.model_dump(exclude={"id"})
    updated = trusteeApi.updateContract(contractId, changes)
    if updated:
        return updated
    raise HTTPException(status_code=400, detail=routeApiMsg("Failed to update contract (organisationId cannot be changed)"))
@router.delete("/{instanceId}/contracts/{contractId}")
@limiter.limit("10/minute")
def delete_contract(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    contractId: str = Path(...),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """Remove a contract and confirm the deletion with a message.

    Raises:
        HTTPException: 404 when the contract does not exist, 400 when the
            interface reports that the deletion did not succeed.
    """
    resolvedMandateId = _validateInstanceAccess(instanceId, context)
    trusteeApi = getInterface(context.user, mandateId=resolvedMandateId, featureInstanceId=instanceId)
    if not trusteeApi.getContract(contractId):
        raise HTTPException(status_code=404, detail=f"Contract {contractId} not found")

    if trusteeApi.deleteContract(contractId):
        return {"message": f"Contract {contractId} deleted"}
    raise HTTPException(status_code=400, detail=routeApiMsg("Failed to delete contract"))
# ===== Document Routes =====
@router.get("/{instanceId}/documents")
@limiter.limit("30/minute")
def get_documents(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    pagination: Optional[str] = Query(None),
    mode: Optional[str] = Query(None, description="'filterValues' for distinct column values, 'ids' for all filtered IDs"),
    column: Optional[str] = Query(None, description="Column key (required when mode=filterValues)"),
    context: RequestContext = Depends(getRequestContext)
):
    """List document metadata, paginated when pagination parameters are given.

    ``mode=filterValues`` and ``mode=ids`` are delegated to
    ``_handleDocumentMode``. Otherwise the response is
    ``{"items": [...], "pagination": {...} | None}``.
    """
    resolvedMandateId = _validateInstanceAccess(instanceId, context)
    if mode in ("filterValues", "ids"):
        return _handleDocumentMode(instanceId, resolvedMandateId, mode, column, pagination, context)

    pageRequest = _parsePagination(pagination)
    trusteeApi = getInterface(context.user, mandateId=resolvedMandateId, featureInstanceId=instanceId)
    listing = trusteeApi.getAllDocuments(pageRequest)

    def _asPlainDict(entry):
        # Model instances are dumped; anything else is passed through as is.
        return entry.model_dump() if hasattr(entry, 'model_dump') else entry

    if not (pageRequest and hasattr(listing, 'items')):
        # Unpaginated: the interface returned either a bare list or an object with ``items``.
        source = listing if isinstance(listing, list) else listing.items
        return {"items": [_asPlainDict(entry) for entry in source], "pagination": None}

    pageRows = [_asPlainDict(entry) for entry in listing.items]
    pageInfo = PaginationMetadata(
        currentPage=pageRequest.page or 1,
        pageSize=pageRequest.pageSize or 20,
        totalItems=listing.totalItems,
        totalPages=listing.totalPages,
        sort=pageRequest.sort,
        filters=pageRequest.filters,
    )
    return {"items": pageRows, "pagination": pageInfo.model_dump()}
def _handleDocumentMode(instanceId, mandateId, mode, column, pagination, context):
    """Serve the ``filterValues`` and ``ids`` modes of the document listing.

    Both modes load all documents without pagination and hand the rows to the
    in-memory helpers from ``routeHelpers``. Any other ``mode`` yields ``None``.

    Raises:
        HTTPException: 400 when ``mode`` is ``filterValues`` and ``column`` is missing.
    """
    from modules.routes.routeHelpers import handleIdsInMemory, handleFilterValuesInMemory, enrichRowsWithFkLabels

    trusteeApi = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)

    def _loadAllRows():
        # ``None`` requests the unpaginated listing; accept a bare list or an object with ``items``.
        listing = trusteeApi.getAllDocuments(None)
        source = listing.items if hasattr(listing, 'items') else listing
        return [entry.model_dump() if hasattr(entry, 'model_dump') else entry for entry in source]

    if mode == "filterValues":
        if not column:
            raise HTTPException(status_code=400, detail="column parameter required for mode=filterValues")
        rows = _loadAllRows()
        enrichRowsWithFkLabels(rows, TrusteeDocument)
        return handleFilterValuesInMemory(rows, column, pagination)

    if mode == "ids":
        return handleIdsInMemory(_loadAllRows(), pagination)

    return None
@router.get("/{instanceId}/documents/{documentId}", response_model=TrusteeDocument)
@limiter.limit("30/minute")
def get_document(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    documentId: str = Path(...),
    context: RequestContext = Depends(getRequestContext)
) -> TrusteeDocument:
    """Fetch the metadata record of one document.

    Raises:
        HTTPException: 404 when the lookup yields nothing for ``documentId``.
    """
    resolvedMandateId = _validateInstanceAccess(instanceId, context)
    trusteeApi = getInterface(context.user, mandateId=resolvedMandateId, featureInstanceId=instanceId)
    found = trusteeApi.getDocument(documentId)
    if found:
        return found
    raise HTTPException(status_code=404, detail=f"Document {documentId} not found")
def _contentDispositionAttachment(fileName: str) -> str:
    """Build a ``Content-Disposition: attachment`` value that is safe for any file name.

    The value carries a quoted ASCII-only ``filename`` fallback and a
    percent-encoded ``filename*`` parameter (RFC 6266 / RFC 5987), so control
    characters, quotes and non-ASCII characters can no longer break the header.
    """
    from urllib.parse import quote

    # Fallback: keep printable ASCII only; quote and backslash would terminate
    # or escape the quoted-string, so they are replaced as well.
    asciiName = "".join(
        ch if (" " <= ch <= "~" and ch not in '"\\') else "_"
        for ch in fileName
    )
    encodedName = quote(fileName, safe="")
    return f"attachment; filename=\"{asciiName}\"; filename*=UTF-8''{encodedName}"


@router.get("/{instanceId}/documents/{documentId}/data")
@limiter.limit("10/minute")
def get_document_data(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    documentId: str = Path(...),
    context: RequestContext = Depends(getRequestContext)
):
    """Download the binary content of a document as an attachment.

    The response uses the stored MIME type (``application/octet-stream`` when
    none is set) and the stored document name (``document`` when none is set).

    Raises:
        HTTPException: 404 when the document or its data is missing.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
    doc = interface.getDocument(documentId)
    if not doc:
        raise HTTPException(status_code=404, detail=f"Document {documentId} not found")
    data = interface.getDocumentData(documentId)
    if not data:
        raise HTTPException(status_code=404, detail=routeApiMsg("Document data not found"))
    # The name is user-controlled data; never interpolate it raw into a header.
    disposition = _contentDispositionAttachment(str(doc.documentName or "document"))
    return StreamingResponse(
        io.BytesIO(data),
        media_type=doc.documentMimeType or "application/octet-stream",
        headers={"Content-Disposition": disposition}
    )
@router.get("/{instanceId}/documents/contract/{contractId}", response_model=List[TrusteeDocument])
@limiter.limit("30/minute")
def get_documents_by_contract(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    contractId: str = Path(...),
    context: RequestContext = Depends(getRequestContext)
) -> List[TrusteeDocument]:
    """List the documents attached to contract ``contractId``.

    The result of ``getDocumentsByContract`` is returned unchanged.
    """
    resolvedMandateId = _validateInstanceAccess(instanceId, context)
    trusteeApi = getInterface(context.user, mandateId=resolvedMandateId, featureInstanceId=instanceId)
    contractDocuments = trusteeApi.getDocumentsByContract(contractId)
    return contractDocuments
@router.post("/{instanceId}/documents", response_model=TrusteeDocument, status_code=201)
@limiter.limit("10/minute")
async def create_document(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    context: RequestContext = Depends(getRequestContext)
) -> TrusteeDocument:
    """Create a new document from a JSON body.

    The body must be a JSON object. A truthy ``documentData`` string is treated
    as base64 and decoded to bytes; if decoding fails, or the value has any
    other non-bytes type, ``documentData`` is set to ``None`` and the document
    is still created (best effort, logged for the decode failure).

    Raises:
        HTTPException: 400 when the body is not valid JSON, is not a JSON
            object, or the interface returns a falsy result.
    """
    mandateId = _validateInstanceAccess(instanceId, context)

    # Malformed JSON used to surface as an unhandled error (HTTP 500).
    try:
        body = await request.json()
    except ValueError as parseErr:  # json.JSONDecodeError and UnicodeDecodeError are ValueErrors
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from parseErr
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    dataValue = body.get("documentData")
    if dataValue:
        if isinstance(dataValue, str):
            # Base64-encoded data from frontend
            try:
                body["documentData"] = base64.b64decode(dataValue)
            except ValueError as e:  # binascii.Error is a ValueError subclass
                logger.warning(f"Failed to decode base64 documentData: {e}")
                body["documentData"] = None
        elif not isinstance(dataValue, bytes):
            # Unknown format (e.g., File object serialized wrong)
            body["documentData"] = None

    interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
    result = interface.createDocument(body)
    if not result:
        raise HTTPException(status_code=400, detail=routeApiMsg("Failed to create document"))
    return result
@router.post("/{instanceId}/documents/upload", response_model=TrusteeDocument, status_code=201)
@limiter.limit("10/minute")
async def upload_document(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    file: UploadFile = File(..., description="Document file"),
    documentName: str = Form(..., description="Document name"),
    documentMimeType: str = Form(default="application/octet-stream", description="MIME type"),
    context: RequestContext = Depends(getRequestContext)
) -> TrusteeDocument:
    """Upload a document with multipart/form-data.

    MIME type resolution: an explicit, specific ``documentMimeType`` wins;
    otherwise the content type of the uploaded file part is used, and
    ``application/octet-stream`` is the last resort.

    Raises:
        HTTPException: 400 when the interface returns a falsy result.
    """
    mandateId = _validateInstanceAccess(instanceId, context)

    fileContent = await file.read()

    # The form default is the generic type, so a plain ``a or b`` chain could
    # never reach ``file.content_type``. Treat the generic value as
    # "not specified" to make that fallback effective.
    genericMimeType = "application/octet-stream"
    if documentMimeType and documentMimeType != genericMimeType:
        resolvedMimeType = documentMimeType
    else:
        resolvedMimeType = file.content_type or genericMimeType

    docData = {
        "documentName": documentName,
        "documentMimeType": resolvedMimeType,
        "documentData": fileContent
    }

    interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
    result = interface.createDocument(docData)
    if not result:
        raise HTTPException(status_code=400, detail=routeApiMsg("Failed to create document"))
    return result
@router.put("/{instanceId}/documents/{documentId}", response_model=TrusteeDocument)
@limiter.limit("10/minute")
def update_document(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    documentId: str = Path(...),
    data: TrusteeDocument = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> TrusteeDocument:
    """Apply the request body to an existing document record.

    The ``id`` field of the body is never forwarded to the interface.

    Raises:
        HTTPException: 404 when the document does not exist, 400 when the
            interface reports no result for the update.
    """
    resolvedMandateId = _validateInstanceAccess(instanceId, context)
    trusteeApi = getInterface(context.user, mandateId=resolvedMandateId, featureInstanceId=instanceId)
    if not trusteeApi.getDocument(documentId):
        raise HTTPException(status_code=404, detail=f"Document {documentId} not found")

    changes = data.model_dump(exclude={"id"})
    updated = trusteeApi.updateDocument(documentId, changes)
    if updated:
        return updated
    raise HTTPException(status_code=400, detail=routeApiMsg("Failed to update document"))
@router.delete("/{instanceId}/documents/{documentId}")
@limiter.limit("10/minute")
def delete_document(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    documentId: str = Path(...),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """Remove a document and confirm the deletion with a message.

    Raises:
        HTTPException: 404 when the document does not exist, 400 when the
            interface reports that the deletion did not succeed.
    """
    resolvedMandateId = _validateInstanceAccess(instanceId, context)
    trusteeApi = getInterface(context.user, mandateId=resolvedMandateId, featureInstanceId=instanceId)
    if not trusteeApi.getDocument(documentId):
        raise HTTPException(status_code=404, detail=f"Document {documentId} not found")

    if trusteeApi.deleteDocument(documentId):
        return {"message": f"Document {documentId} deleted"}
    raise HTTPException(status_code=400, detail=routeApiMsg("Failed to delete document"))
# ===== Position Routes =====
def _buildSyncStatusByPosition(interface, instanceId: str) -> Dict[str, Dict[str, Optional[str]]]:
    """Collapse the accounting sync records of an instance into one entry per position.

    Returns a map ``positionId -> {"syncStatus", "syncErrorMessage"}``. Records
    without a ``positionId`` are ignored. When several records exist for the
    same position:

    * a ``synced`` record always replaces the stored entry,
    * an ``error`` record replaces the stored entry unless that entry is ``synced``,
    * any other status is only recorded when the position has no entry yet.
    """
    from .datamodelFeatureTrustee import TrusteeAccountingSync

    records = interface.db.getRecordset(
        TrusteeAccountingSync, recordFilter={"featureInstanceId": instanceId}
    ) or []

    byPosition: Dict[str, Dict[str, Optional[str]]] = {}
    for record in records:
        posKey = record.get("positionId")
        if not posKey:
            continue

        incomingStatus = record.get("syncStatus")
        incomingMessage = record.get("errorMessage")
        known = byPosition.get(posKey)

        if known is not None and incomingStatus != "synced":
            # An existing entry survives unless an error supersedes a non-synced one.
            if known.get("syncStatus") == "synced" or incomingStatus != "error":
                continue

        byPosition[posKey] = {
            "syncStatus": incomingStatus,
            "syncErrorMessage": incomingMessage,
        }
    return byPosition
def _enrichPositionsWithSyncStatus(items: List[Dict[str, Any]], interface, instanceId: str) -> List[Dict[str, Any]]:
    """Add ``syncStatus`` and ``syncErrorMessage`` to every position dict.

    The dicts are modified in place and the same list is returned. Positions
    without a sync entry get ``None`` for both keys.
    """
    statusLookup = _buildSyncStatusByPosition(interface, instanceId)
    for position in items:
        entry = statusLookup.get(position.get("id")) or {}
        position["syncStatus"] = entry.get("syncStatus")
        position["syncErrorMessage"] = entry.get("syncErrorMessage")
    return items
@router.get("/{instanceId}/positions")
@limiter.limit("30/minute")
def get_positions(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    pagination: Optional[str] = Query(None),
    mode: Optional[str] = Query(None, description="'filterValues' for distinct column values, 'ids' for all filtered IDs"),
    column: Optional[str] = Query(None, description="Column key (required when mode=filterValues)"),
    context: RequestContext = Depends(getRequestContext)
):
    """List positions, paginated when pagination parameters are given.

    Every returned row is enriched with ``syncStatus`` / ``syncErrorMessage``.
    ``mode=filterValues`` and ``mode=ids`` are delegated to
    ``_handlePositionMode``. Otherwise the response is
    ``{"items": [...], "pagination": {...} | None}``.
    """
    resolvedMandateId = _validateInstanceAccess(instanceId, context)
    if mode in ("filterValues", "ids"):
        return _handlePositionMode(instanceId, resolvedMandateId, mode, column, pagination, context)

    pageRequest = _parsePagination(pagination)
    trusteeApi = getInterface(context.user, mandateId=resolvedMandateId, featureInstanceId=instanceId)
    listing = trusteeApi.getAllPositions(pageRequest)

    def _asPlainDict(entry):
        # Model instances are dumped; anything else is passed through as is.
        return entry.model_dump() if hasattr(entry, 'model_dump') else entry

    if not (pageRequest and hasattr(listing, 'items')):
        # Unpaginated: the interface returned either a bare list or an object with ``items``.
        source = listing if isinstance(listing, list) else listing.items
        rows = [_asPlainDict(entry) for entry in source]
        _enrichPositionsWithSyncStatus(rows, trusteeApi, instanceId)
        return {"items": rows, "pagination": None}

    rows = [_asPlainDict(entry) for entry in listing.items]
    _enrichPositionsWithSyncStatus(rows, trusteeApi, instanceId)
    pageInfo = PaginationMetadata(
        currentPage=pageRequest.page or 1,
        pageSize=pageRequest.pageSize or 20,
        totalItems=listing.totalItems,
        totalPages=listing.totalPages,
        sort=pageRequest.sort,
        filters=pageRequest.filters,
    )
    return {"items": rows, "pagination": pageInfo.model_dump()}
def _handlePositionMode(instanceId, mandateId, mode, column, pagination, context):
    """Serve the ``filterValues`` and ``ids`` modes of the position listing.

    Both modes load all positions without pagination, enrich them with sync
    status and hand the rows to the in-memory helpers from ``routeHelpers``.
    Any other ``mode`` yields ``None``.

    Raises:
        HTTPException: 400 when ``mode`` is ``filterValues`` and ``column`` is missing.
    """
    from modules.routes.routeHelpers import handleIdsInMemory, handleFilterValuesInMemory, enrichRowsWithFkLabels
    from .datamodelFeatureTrustee import TrusteePositionView

    trusteeApi = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)

    def _loadEnrichedRows():
        # ``None`` requests the unpaginated listing; accept a bare list or an object with ``items``.
        listing = trusteeApi.getAllPositions(None)
        source = listing.items if hasattr(listing, 'items') else listing
        rows = [entry.model_dump() if hasattr(entry, 'model_dump') else entry for entry in source]
        _enrichPositionsWithSyncStatus(rows, trusteeApi, instanceId)
        return rows

    if mode == "filterValues":
        if not column:
            raise HTTPException(status_code=400, detail="column parameter required for mode=filterValues")
        rows = _loadEnrichedRows()
        # The view model is used so FK labels for the synthetic columns resolve as well.
        enrichRowsWithFkLabels(rows, TrusteePositionView)
        return handleFilterValuesInMemory(rows, column, pagination)

    if mode == "ids":
        return handleIdsInMemory(_loadEnrichedRows(), pagination)

    return None
@router.get("/{instanceId}/positions/{positionId}", response_model=TrusteePosition)
@limiter.limit("30/minute")
def get_position(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    positionId: str = Path(...),
    context: RequestContext = Depends(getRequestContext)
) -> TrusteePosition:
    """Fetch one position of the given feature instance.

    Raises:
        HTTPException: 404 when the lookup yields nothing for ``positionId``.
    """
    resolvedMandateId = _validateInstanceAccess(instanceId, context)
    trusteeApi = getInterface(context.user, mandateId=resolvedMandateId, featureInstanceId=instanceId)
    found = trusteeApi.getPosition(positionId)
    if found:
        return found
    raise HTTPException(status_code=404, detail=f"Position {positionId} not found")
@router.get("/{instanceId}/positions/contract/{contractId}", response_model=List[TrusteePosition])
@limiter.limit("30/minute")
def get_positions_by_contract(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    contractId: str = Path(...),
    context: RequestContext = Depends(getRequestContext)
) -> List[TrusteePosition]:
    """List the positions that belong to contract ``contractId``.

    The result of ``getPositionsByContract`` is returned unchanged.
    """
    resolvedMandateId = _validateInstanceAccess(instanceId, context)
    trusteeApi = getInterface(context.user, mandateId=resolvedMandateId, featureInstanceId=instanceId)
    contractPositions = trusteeApi.getPositionsByContract(contractId)
    return contractPositions
@router.get("/{instanceId}/positions/organisation/{orgId}", response_model=List[TrusteePosition])
@limiter.limit("30/minute")
def get_positions_by_organisation(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    orgId: str = Path(...),
    context: RequestContext = Depends(getRequestContext)
) -> List[TrusteePosition]:
    """List the positions that belong to organisation ``orgId``.

    The result of ``getPositionsByOrganisation`` is returned unchanged.
    """
    resolvedMandateId = _validateInstanceAccess(instanceId, context)
    trusteeApi = getInterface(context.user, mandateId=resolvedMandateId, featureInstanceId=instanceId)
    orgPositions = trusteeApi.getPositionsByOrganisation(orgId)
    return orgPositions
@router.post("/{instanceId}/positions", response_model=TrusteePosition, status_code=201)
@limiter.limit("10/minute")
def create_position(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    data: TrusteePosition = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> TrusteePosition:
    """Persist a new position built from the request body.

    Raises:
        HTTPException: 400 when the interface returns a falsy result.
    """
    resolvedMandateId = _validateInstanceAccess(instanceId, context)
    trusteeApi = getInterface(context.user, mandateId=resolvedMandateId, featureInstanceId=instanceId)
    payload = data.model_dump()
    created = trusteeApi.createPosition(payload)
    if created:
        return created
    raise HTTPException(status_code=400, detail=routeApiMsg("Failed to create position"))
@router.put("/{instanceId}/positions/{positionId}", response_model=TrusteePosition)
@limiter.limit("10/minute")
def update_position(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    positionId: str = Path(...),
    data: TrusteePosition = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> TrusteePosition:
    """Apply the request body to an existing position.

    The ``id`` field of the body is never forwarded to the interface.

    Raises:
        HTTPException: 404 when the position does not exist, 400 when the
            interface reports no result for the update.
    """
    resolvedMandateId = _validateInstanceAccess(instanceId, context)
    trusteeApi = getInterface(context.user, mandateId=resolvedMandateId, featureInstanceId=instanceId)
    if not trusteeApi.getPosition(positionId):
        raise HTTPException(status_code=404, detail=f"Position {positionId} not found")

    changes = data.model_dump(exclude={"id"})
    updated = trusteeApi.updatePosition(positionId, changes)
    if updated:
        return updated
    raise HTTPException(status_code=400, detail=routeApiMsg("Failed to update position"))
@router.delete("/{instanceId}/positions/{positionId}")
@limiter.limit("10/minute")
def delete_position(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    positionId: str = Path(...),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """Remove a position and confirm the deletion with a message.

    Raises:
        HTTPException: 404 when the position does not exist, 400 when the
            interface reports that the deletion did not succeed.
    """
    resolvedMandateId = _validateInstanceAccess(instanceId, context)
    trusteeApi = getInterface(context.user, mandateId=resolvedMandateId, featureInstanceId=instanceId)
    if not trusteeApi.getPosition(positionId):
        raise HTTPException(status_code=404, detail=f"Position {positionId} not found")

    if trusteeApi.deletePosition(positionId):
        return {"message": f"Position {positionId} deleted"}
    raise HTTPException(status_code=400, detail=routeApiMsg("Failed to delete position"))
# ===== Accounting Integration Endpoints =====
@router.get("/{instanceId}/accounting/connectors")
@limiter.limit("30/minute")
def get_available_accounting_connectors(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
    """Return what the accounting registry reports as available connectors.

    Instance access is validated first; the resolved mandate is not needed here.
    """
    _validateInstanceAccess(instanceId, context)
    from .accounting.accountingRegistry import getAccountingRegistry

    registry = getAccountingRegistry()
    return registry.getAvailableConnectors()
# Placeholder returned for secret config fields so the frontend can prefill the form without
# ever receiving real secrets. When the save endpoint receives this exact value back for a key,
# it keeps the stored value for that key instead of overwriting it.
_CONFIG_PLACEHOLDER = "***"
def _getConfigMasked(connectorType: str, plainConfig: Dict[str, Any]) -> Dict[str, str]:
    """Build the config dict for a GET response with secret values replaced by the placeholder.

    Args:
        connectorType: Connector key used to look up which config fields are secret.
        plainConfig: Decrypted config; ``None`` or empty yields an empty dict.

    Returns:
        A ``str -> str`` mapping. Secret fields carry ``_CONFIG_PLACEHOLDER``;
        other values are converted to text, with ``None`` rendered as ``""``.
        When the connector is unknown, every value is masked, because it is
        impossible to tell which fields are secret (fail closed instead of
        returning credentials in plain text).
    """
    from .accounting.accountingRegistry import getAccountingRegistry

    def _asText(value: Any) -> str:
        if value is None:
            return ""
        return value if isinstance(value, str) else str(value)

    entries = plainConfig or {}
    connector = getAccountingRegistry().getConnector(connectorType)
    if not connector:
        # Unknown connector: no field metadata available, so treat everything as secret.
        return {key: _CONFIG_PLACEHOLDER for key in entries}

    secretKeys = {f.key for f in connector.getRequiredConfigFields() if f.secret}
    return {
        key: _CONFIG_PLACEHOLDER if key in secretKeys else _asText(value)
        for key, value in entries.items()
    }
@router.get("/{instanceId}/accounting/config")
@limiter.limit("30/minute")
def get_accounting_config(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """Get the active accounting config for this instance.

    Returns ``{"configured": False}`` when no active config record exists.
    Otherwise returns the first active record without keys that start with
    ``_`` and without ``encryptedConfig``, plus ``configured=True`` and
    ``configMasked`` (secret fields replaced by ``***`` for form prefill).
    ``configMasked`` is empty when there is no stored config or when it cannot
    be decrypted or parsed.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
    from .datamodelFeatureTrustee import TrusteeAccountingConfig
    from modules.shared.configuration import decryptValue

    records = interface.db.getRecordset(TrusteeAccountingConfig, recordFilter={"featureInstanceId": instanceId, "isActive": True})
    if not records:
        return {"configured": False}

    record = {k: v for k, v in records[0].items() if not k.startswith("_")}
    # The encrypted blob itself is never sent to the client.
    encryptedConfig = record.pop("encryptedConfig", None)
    record["configured"] = True
    record["configMasked"] = {}
    if encryptedConfig:
        try:
            plain = json.loads(decryptValue(encryptedConfig, keyName="accountingConfig"))
            record["configMasked"] = _getConfigMasked(record.get("connectorType", ""), plain)
        except Exception as maskErr:
            # Best effort: the form is still usable without prefill. Log only the
            # error type so no config content can end up in the log.
            logger.warning(
                "Accounting config for instance %s could not be decrypted/masked: %s",
                instanceId, type(maskErr).__name__
            )
            record["configMasked"] = {}
    return record
class SaveAccountingConfigBody(BaseModel):
    """Request body for saving accounting config. Ensures 'config' is present and used."""
    # Connector key; defaults to an empty string when omitted.
    connectorType: str = ""
    # Label stored alongside the config; defaults to an empty string when omitted.
    displayLabel: str = ""
    # Plain credentials as sent by the client; the save endpoint stores them encrypted.
    config: Dict[str, Any] = Field(default_factory=dict, description="Connector credentials (e.g. clientName, apiKey)")
@router.post("/{instanceId}/accounting/config", status_code=201)
@limiter.limit("5/minute")
async def save_accounting_config(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    body: SaveAccountingConfigBody = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """Save or update the accounting config for this instance.

    Body: { connectorType, displayLabel, config: { clientName, apiKey, ... } }
    The 'config' object is stored encrypted; without it credentials would be empty in DB.

    Update (a config record already exists for the instance): connector type and
    label are overwritten and the record is activated. A non-empty ``config`` is
    merged into the stored one; values that are ``None``, blank or equal to the
    ``***`` placeholder keep the stored value. An empty ``config`` leaves the
    stored credentials untouched.

    Create (no record yet): ``config`` is required.

    In both cases the chart-of-accounts cache is refreshed best-effort afterwards.

    Raises:
        HTTPException: 400 when a new integration is created without ``config``.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
    from .datamodelFeatureTrustee import TrusteeAccountingConfig
    from modules.shared.configuration import decryptValue, encryptValue
    import uuid as _uuid

    plainConfig = body.config if isinstance(body.config, dict) else {}
    # When updating, empty config is normal (frontend never receives credentials from GET).
    # Do not overwrite encryptedConfig with an empty value - keep existing credentials.
    if not plainConfig and body.connectorType:
        logger.warning("Accounting config save: config is empty (credentials will not be stored or updated)")
    else:
        logger.info(
            "Accounting config save: instanceId=%s connectorType=%s configKeys=%s",
            instanceId, body.connectorType, list(plainConfig.keys())
        )

    existing = interface.db.getRecordset(TrusteeAccountingConfig, recordFilter={"featureInstanceId": instanceId})
    if existing:
        configId = existing[0].get("id")
        updatePayload = {
            "connectorType": body.connectorType or "",
            "displayLabel": body.displayLabel or "",
            "isActive": True,
        }
        if plainConfig:
            # Merge with existing: placeholder or empty = keep existing value
            # (so form prefill does not overwrite secrets).
            existingEnc = existing[0].get("encryptedConfig") or ""
            merged = {}
            if existingEnc:
                try:
                    merged = json.loads(decryptValue(existingEnc, keyName="accountingConfig"))
                except Exception as decryptErr:
                    # Still best effort, but no longer silent: values the client sent as
                    # placeholder cannot be carried over when the stored config is unreadable.
                    logger.warning(
                        "Accounting config save: stored config for instance %s could not be "
                        "decrypted, merging without it: %s",
                        instanceId, type(decryptErr).__name__
                    )
            for k, v in plainConfig.items():
                if v is not None and str(v).strip() and str(v).strip() != _CONFIG_PLACEHOLDER:
                    merged[k] = v
            updatePayload["encryptedConfig"] = encryptValue(json.dumps(merged), keyName="accountingConfig")
        interface.db.recordModify(TrusteeAccountingConfig, configId, updatePayload)
        await _refreshChartSilently(interface, instanceId)
        return {"message": "Accounting config updated", "id": configId}

    if not plainConfig:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=routeApiMsg("config is required for new integration (e.g. clientName, apiKey).")
        )

    encryptedConfig = encryptValue(json.dumps(plainConfig), keyName="accountingConfig")
    configRecord = {
        "id": str(_uuid.uuid4()),
        "featureInstanceId": instanceId,
        "connectorType": body.connectorType or "",
        "displayLabel": body.displayLabel or "",
        "encryptedConfig": encryptedConfig,
        "isActive": True,
        "mandateId": mandateId,
    }
    interface.db.recordCreate(TrusteeAccountingConfig, configRecord)
    await _refreshChartSilently(interface, instanceId)
    return {"message": "Accounting config created", "id": configRecord["id"]}
@router.post("/{instanceId}/accounting/test-connection")
@limiter.limit("5/minute")
async def test_accounting_connection(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """Run a connection test against the configured accounting system.

    A successful test also triggers a best-effort refresh of the local
    chart-of-accounts cache. The test result is returned as a dict.
    """
    resolvedMandateId = _validateInstanceAccess(instanceId, context)
    trusteeApi = getInterface(context.user, mandateId=resolvedMandateId, featureInstanceId=instanceId)
    from .accounting.accountingBridge import AccountingBridge

    outcome = await AccountingBridge(trusteeApi).testConnection(instanceId)
    if outcome.success:
        await _refreshChartSilently(trusteeApi, instanceId)
    return outcome.model_dump()
@router.delete("/{instanceId}/accounting/config")
@limiter.limit("5/minute")
def delete_accounting_config(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """Remove the accounting integration for this instance.

    Deletes every accounting config record of the instance (active or not).
    The call also succeeds when there is nothing to delete.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
    from .datamodelFeatureTrustee import TrusteeAccountingConfig

    # ``or []`` mirrors the other getRecordset call sites that guard against a falsy result.
    records = interface.db.getRecordset(TrusteeAccountingConfig, recordFilter={"featureInstanceId": instanceId}) or []
    for r in records:
        interface.db.recordDelete(TrusteeAccountingConfig, r.get("id"))
    return {"message": "Accounting config removed"}
@router.get("/{instanceId}/accounting/chart-of-accounts")
@limiter.limit("10/minute")
async def get_chart_of_accounts(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    accountType: Optional[str] = Query(None, description="Filter by type: expense, asset, liability, revenue"),
    context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
    """Return the chart of accounts obtained through the accounting bridge.

    ``accountType`` is passed on to the bridge as an optional filter. Each
    account is returned as a dict.
    """
    resolvedMandateId = _validateInstanceAccess(instanceId, context)
    trusteeApi = getInterface(context.user, mandateId=resolvedMandateId, featureInstanceId=instanceId)
    from .accounting.accountingBridge import AccountingBridge

    accounts = await AccountingBridge(trusteeApi).getChartOfAccounts(instanceId, accountType=accountType)
    return [account.model_dump() for account in accounts]
async def _refreshChartSilently(interface, instanceId: str) -> None:
    """Refresh the chart-of-accounts cache without ever failing the caller.

    Any exception raised during the refresh is logged as a warning and
    suppressed; on success the number of refreshed entries is logged.
    """
    try:
        from .accounting.accountingBridge import AccountingBridge

        refreshed = await AccountingBridge(interface).refreshChartOfAccounts(instanceId)
        logger.info(f"Chart cache refreshed: {len(refreshed)} entries for instance {instanceId}")
    except Exception as refreshErr:
        logger.warning(f"Chart cache refresh failed (non-critical): {refreshErr}")
@router.post("/{instanceId}/accounting/refresh-chart")
@limiter.limit("5/minute")
async def refresh_chart_of_accounts(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """Refresh the cached chart of accounts on demand and report the entry count.

    Unlike ``_refreshChartSilently``, errors raised by the bridge are not
    suppressed here.
    """
    resolvedMandateId = _validateInstanceAccess(instanceId, context)
    trusteeApi = getInterface(context.user, mandateId=resolvedMandateId, featureInstanceId=instanceId)
    from .accounting.accountingBridge import AccountingBridge

    refreshed = await AccountingBridge(trusteeApi).refreshChartOfAccounts(instanceId)
    entryCount = len(refreshed)
    return {"message": f"Chart of accounts refreshed: {entryCount} entries", "count": entryCount}
# Job type key under which the accounting push handler is registered and under
# which the sync endpoint submits its background jobs.
TRUSTEE_ACCOUNTING_PUSH_JOB_TYPE = "trusteeAccountingPush"
async def _trusteeAccountingPushJobHandler(job: Dict[str, Any], progressCb) -> Dict[str, Any]:
    """BackgroundJob handler: pushes a batch of positions to the external
    accounting system. Runs in the worker without blocking the original HTTP
    request, so the user can continue navigating while the sync runs.

    Reads inputs from `job["payload"]` (`positionIds`) and reports incremental
    progress via `progressCb(percent, message)`. The job result has the same
    shape that the legacy synchronous endpoint used to return.

    Args:
        job: Job record; ``featureInstanceId`` and ``mandateId`` are required,
            ``payload.positionIds`` lists the positions to push.
        progressCb: Callable invoked synchronously as ``progressCb(percent, message)``.

    Returns:
        ``{"total", "success", "skipped", "errors", "results"}`` where
        ``skipped`` counts failures whose message contains ``"already synced"``
        and ``errors`` counts all other failures.

    Raises:
        Exception: whatever resolving the connector/config raises (re-raised
            after logging). Failures of individual positions do not raise;
            they are recorded as failed results.
    """
    from modules.security.rootAccess import getRootUser
    from .accounting.accountingBridge import AccountingBridge, SyncResult

    instanceId = job["featureInstanceId"]
    mandateId = job["mandateId"]
    payload = job.get("payload") or {}
    positionIds: List[str] = list(payload.get("positionIds") or [])
    if not positionIds:
        return {"total": 0, "success": 0, "skipped": 0, "errors": 0, "results": []}

    rootUser = getRootUser()
    interface = getInterface(rootUser, mandateId=mandateId, featureInstanceId=instanceId)
    bridge = AccountingBridge(interface)

    total = len(positionIds)
    progressCb(2, f"Sync wird vorbereitet ({total} Position(en))...")

    # Resolve connector + plain config once to avoid decryption rate-limits
    # (mirrors the optimisation in pushBatchToAccounting). We push positions
    # one-by-one inside the job so we can emit incremental progress and so
    # one bad row never aborts the rest.
    try:
        connector, plainConfig, configRecord = await bridge._resolveConnectorAndConfig(instanceId)
    except Exception:
        logger.exception("Accounting push: failed to resolve connector/config")
        progressCb(100, "Verbindungsaufbau fehlgeschlagen.")
        raise  # bare raise keeps the original traceback

    if not connector or not plainConfig:
        results = [SyncResult(success=False, errorMessage="No active accounting configuration found") for _ in positionIds]
        progressCb(100, "Keine aktive Buchhaltungs-Konfiguration gefunden.")
        return {
            "total": len(results),
            "success": 0,
            "skipped": 0,
            "errors": len(results),
            "results": [r.model_dump() for r in results],
        }

    results = []
    for index, positionId in enumerate(positionIds, start=1):
        try:
            result = await bridge.pushPositionToAccounting(
                instanceId,
                positionId,
                _resolvedConnector=connector,
                _resolvedPlainConfig=plainConfig,
                _resolvedConfigRecord=configRecord,
            )
        except Exception as pushErr:
            # Isolate the failure so the remaining positions are still processed.
            logger.exception("Accounting push failed for position %s", positionId)
            result = SyncResult(success=False, errorMessage=str(pushErr))
        results.append(result)
        # Reserve 5..95% for the push loop, keep the tail for summary.
        pct = 5 + int(90 * index / total)
        progressCb(pct, f"Position {index}/{total} verarbeitet")

    # Classify every result exactly once.
    succeeded = 0
    skipped = []
    failed = []
    for r in results:
        if r.success:
            succeeded += 1
        elif r.errorMessage and "already synced" in r.errorMessage:
            skipped.append(r)
        else:
            failed.append(r)

    if skipped:
        logger.info("Accounting sync: %s position(s) already synced, skipped", len(skipped))
    if failed:
        logger.warning(
            "Accounting sync had %s failure(s): %s",
            len(failed),
            "; ".join(r.errorMessage or "unknown" for r in failed[:3]),
        )

    progressCb(100, "Sync abgeschlossen.")
    return {
        "total": len(results),
        "success": succeeded,
        "skipped": len(skipped),
        "errors": len(failed),
        "results": [r.model_dump() for r in results],
    }
# Register the push handler at import time. A failure here (e.g. the background
# job service cannot be imported) is logged and must not prevent this module
# from loading.
try:
    from modules.serviceCenter.services.serviceBackgroundJobs import registerJobHandler as _registerPushJobHandler
    _registerPushJobHandler(TRUSTEE_ACCOUNTING_PUSH_JOB_TYPE, _trusteeAccountingPushJobHandler)
except Exception as _pushRegErr:
    logger.warning("Failed to register trusteeAccountingPush job handler: %s", _pushRegErr)
@router.post("/{instanceId}/accounting/sync", status_code=status.HTTP_202_ACCEPTED)
@limiter.limit("5/minute")
async def sync_positions_to_accounting(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    data: Dict[str, Any] = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """Submit a background job that pushes positions to the accounting system.

    Body: ``{ positionIds: [...] }`` -- must be a non-empty JSON array.

    Returns ``{ jobId, status: "pending" }`` immediately so the caller is not
    blocked while the external accounting calls run. Clients poll
    ``GET /api/jobs/{jobId}`` until the status is ``SUCCESS`` / ``ERROR`` and
    then read the ``{ total, success, skipped, errors, results }`` payload
    from ``job.result``.

    Raises:
        HTTPException(400): ``positionIds`` is missing, empty, or not a list.
    """
    from modules.serviceCenter.services.serviceBackgroundJobs import startJob

    mandateId = _validateInstanceAccess(instanceId, context)

    positionIds = data.get("positionIds")
    # Strings and dicts are iterable too: list("abc") would silently turn one
    # id into three bogus ids, so anything but a real JSON array is rejected.
    if not isinstance(positionIds, list) or not positionIds:
        raise HTTPException(status_code=400, detail=routeApiMsg("positionIds required"))

    jobId = await startJob(
        TRUSTEE_ACCOUNTING_PUSH_JOB_TYPE,
        # Copy so the job payload does not alias the request body.
        {"positionIds": list(positionIds)},
        mandateId=mandateId,
        featureInstanceId=instanceId,
        triggeredBy=context.user.id if context.user else None,
    )
    return {"jobId": jobId, "status": "pending"}
@router.post("/{instanceId}/accounting/sync/{positionId}")
@limiter.limit("10/minute")
async def sync_single_position_to_accounting(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    positionId: str = Path(...),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """Push one position to the accounting system within the request.

    Unlike the batch endpoint this call awaits the push and returns the dumped
    result object. A failed push is logged as a warning but is still returned
    to the caller as a normal response body.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    trusteeInterface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
    from .accounting.accountingBridge import AccountingBridge

    accountingBridge = AccountingBridge(trusteeInterface)
    pushResult = await accountingBridge.pushPositionToAccounting(instanceId, positionId)
    if pushResult.success:
        return pushResult.model_dump()

    failureReason = pushResult.errorMessage or "unknown"
    logger.warning(
        "Accounting sync failed for positionId=%s: %s",
        positionId,
        failureReason,
    )
    return pushResult.model_dump()
@router.get("/{instanceId}/accounting/sync-status")
@limiter.limit("30/minute")
def get_sync_status(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """Return the accounting sync records of all positions for this instance.

    Returns:
        ``{"items": [...]}`` where each item is a sync record with every key
        that starts with an underscore removed.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
    from .datamodelFeatureTrustee import TrusteeAccountingSync

    # Guard with ``or []`` like the other record readers in this module so an
    # empty / None result yields an empty item list instead of a TypeError.
    records = interface.db.getRecordset(
        TrusteeAccountingSync,
        recordFilter={"featureInstanceId": instanceId},
    ) or []
    items = [{k: v for k, v in r.items() if not k.startswith("_")} for r in records]
    return {"items": items}
@router.get("/{instanceId}/accounting/sync-status/{positionId}")
@limiter.limit("30/minute")
def get_position_sync_status(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    positionId: str = Path(...),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """Return the accounting sync records of one position in this instance.

    The lookup filters on both ``positionId`` and ``featureInstanceId``.

    Returns:
        ``{"items": [...]}`` where each item is a sync record with every key
        that starts with an underscore removed.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
    from .datamodelFeatureTrustee import TrusteeAccountingSync

    # Guard with ``or []`` like the other record readers in this module so an
    # empty / None result yields an empty item list instead of a TypeError.
    records = interface.db.getRecordset(
        TrusteeAccountingSync,
        recordFilter={"positionId": positionId, "featureInstanceId": instanceId},
    ) or []
    items = [{k: v for k, v in r.items() if not k.startswith("_")} for r in records]
    return {"items": items}
# ===== Accounting Data Import =====
# Background-job type name for the accounting data import: the import handler
# is registered under it and the import-data route starts jobs with it.
TRUSTEE_ACCOUNTING_SYNC_JOB_TYPE = "trusteeAccountingSync"
async def _trusteeAccountingSyncJobHandler(job: Dict[str, Any], progressCb) -> Dict[str, Any]:
    """BackgroundJob handler that imports accounting data from the external system.

    Takes ``featureInstanceId`` and ``mandateId`` from the job record and the
    optional ``dateFrom`` / ``dateTo`` from ``job["payload"]``. The import runs
    under the root user and forwards ``progressCb`` to
    ``AccountingDataSync.importData`` so it can report progress itself.

    Returns:
        Whatever ``importData`` returns.
    """
    from modules.security.rootAccess import getRootUser
    from .accounting.accountingDataSync import AccountingDataSync

    featureInstanceId = job["featureInstanceId"]
    jobMandateId = job["mandateId"]
    jobPayload = job.get("payload") or {}
    systemUser = getRootUser()

    progressCb(5, "Initialisiere Import...")
    dataSync = AccountingDataSync(
        getInterface(systemUser, mandateId=jobMandateId, featureInstanceId=featureInstanceId)
    )

    progressCb(10, "Verbinde mit Buchhaltungssystem...")
    importResult = await dataSync.importData(
        featureInstanceId=featureInstanceId,
        mandateId=jobMandateId,
        dateFrom=jobPayload.get("dateFrom"),
        dateTo=jobPayload.get("dateTo"),
        progressCb=progressCb,
    )

    progressCb(100, "Import abgeschlossen.")
    return importResult
# Register the accounting import handler with the background-job service while
# this module is being imported. Any failure (including a failing import of the
# service module) is only logged as a warning, so the routes still load.
try:
from modules.serviceCenter.services.serviceBackgroundJobs import registerJobHandler
registerJobHandler(TRUSTEE_ACCOUNTING_SYNC_JOB_TYPE, _trusteeAccountingSyncJobHandler)
except Exception as _regErr:
logger.warning("Failed to register trusteeAccountingSync job handler: %s", _regErr)
@router.post("/{instanceId}/accounting/import-data", status_code=status.HTTP_202_ACCEPTED)
@limiter.limit("3/minute")
async def import_accounting_data(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    data: Dict[str, Any] = Body(default={}),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """Queue a background job that imports accounting data for this instance.

    Only ``dateFrom`` and ``dateTo`` are taken from the request body; both are
    optional and forwarded as the job payload. The response is returned right
    away as ``{ jobId, status: "pending" }``; clients poll
    ``GET /api/jobs/{jobId}`` until the status is SUCCESS / ERROR.
    """
    from modules.serviceCenter.services.serviceBackgroundJobs import startJob

    mandateId = _validateInstanceAccess(instanceId, context)

    # Whitelist the two supported keys; missing ones become None.
    jobPayload = {key: data.get(key) for key in ("dateFrom", "dateTo")}
    triggeringUserId = context.user.id if context.user else None

    jobId = await startJob(
        TRUSTEE_ACCOUNTING_SYNC_JOB_TYPE,
        jobPayload,
        mandateId=mandateId,
        featureInstanceId=instanceId,
        triggeredBy=triggeringUserId,
    )
    return {"jobId": jobId, "status": "pending"}
@router.get("/{instanceId}/accounting/import-status")
@limiter.limit("30/minute")
def get_import_status(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """Report how many imported ``TrusteeData*`` rows exist for this instance.

    The response holds one count per table (``accounts``, ``journalEntries``,
    ``journalLines``, ``contacts``, ``accountBalances``). When an active
    accounting config exists, its ``lastSync*`` fields are added as well.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
    from .datamodelFeatureTrustee import (
        TrusteeDataAccount, TrusteeDataJournalEntry, TrusteeDataJournalLine,
        TrusteeDataContact, TrusteeDataAccountBalance, TrusteeAccountingConfig,
    )

    instanceFilter = {"featureInstanceId": instanceId}
    countedTables = (
        ("accounts", TrusteeDataAccount),
        ("journalEntries", TrusteeDataJournalEntry),
        ("journalLines", TrusteeDataJournalLine),
        ("contacts", TrusteeDataContact),
        ("accountBalances", TrusteeDataAccountBalance),
    )
    # Counting is done by loading each recordset and taking its length.
    counts = {
        responseKey: len(interface.db.getRecordset(model, recordFilter=instanceFilter) or [])
        for responseKey, model in countedTables
    }

    activeConfigs = interface.db.getRecordset(
        TrusteeAccountingConfig,
        recordFilter={"featureInstanceId": instanceId, "isActive": True},
    )
    if not activeConfigs:
        return counts

    activeConfig = activeConfigs[0]
    for syncField in (
        "lastSyncAt",
        "lastSyncStatus",
        "lastSyncErrorMessage",
        "lastSyncDateFrom",
        "lastSyncDateTo",
        "lastSyncCounts",
    ):
        counts[syncField] = activeConfig.get(syncField)
    return counts
# ===== AI Data Cache =====
@router.post("/{instanceId}/accounting/clear-cache")
@limiter.limit("10/minute")
def clear_ai_data_cache(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """Clear the AI feature-data query cache for this instance.

    This endpoint only calls ``clearFeatureQueryCache``; it performs no
    database writes, so the synchronised ``TrusteeData*`` rows are left as
    imported. To delete those rows use POST .../wipe-imported-data.

    Returns:
        ``{"cleared": <value returned by clearFeatureQueryCache>,
        "featureInstanceId": instanceId}``.
    """
    _validateInstanceAccess(instanceId, context)
    from modules.serviceCenter.services.serviceAgent.coreTools._featureSubAgentTools import clearFeatureQueryCache

    clearedEntries = clearFeatureQueryCache(instanceId)
    return {"cleared": clearedEntries, "featureInstanceId": instanceId}
@router.post("/{instanceId}/accounting/wipe-imported-data")
@limiter.limit("3/minute")
def wipe_imported_accounting_data(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """Delete all imported ``TrusteeData*`` rows of this feature instance.

    Steps, in order:
      1. Delete the rows of each of the five data tables. A failing table is
         logged and reported with a count of 0; the remaining tables are
         still processed.
      2. Set the ``lastSync*`` fields of the active accounting config to
         ``None`` (best effort, failures are only logged). No other config
         field is written.
      3. Clear the AI feature-data query cache for the instance.

    Returns:
        ``removed`` (per-table counts), ``totalRemoved``, ``cacheCleared`` and
        ``featureInstanceId``.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
    from .datamodelFeatureTrustee import (
        TrusteeDataAccount, TrusteeDataJournalEntry, TrusteeDataJournalLine,
        TrusteeDataContact, TrusteeDataAccountBalance, TrusteeAccountingConfig,
    )
    from modules.serviceCenter.services.serviceAgent.coreTools._featureSubAgentTools import clearFeatureQueryCache

    wipeTargets = (
        ("accounts", TrusteeDataAccount),
        ("journalEntries", TrusteeDataJournalEntry),
        ("journalLines", TrusteeDataJournalLine),
        ("contacts", TrusteeDataContact),
        ("accountBalances", TrusteeDataAccountBalance),
    )
    removed: Dict[str, int] = {}
    for responseKey, dataModel in wipeTargets:
        try:
            deletedCount = interface.db.recordDeleteWhere(dataModel, {"featureInstanceId": instanceId})
            removed[responseKey] = int(deletedCount or 0)
        except Exception as ex:
            logger.warning("wipeImportedData: failed for %s: %s", responseKey, ex)
            removed[responseKey] = 0

    activeConfigs = interface.db.getRecordset(
        TrusteeAccountingConfig,
        recordFilter={"featureInstanceId": instanceId, "isActive": True},
    )
    activeConfigId = activeConfigs[0].get("id") if activeConfigs else None
    if activeConfigId:
        # Blank every lastSync* marker so the UI stops showing a stale import.
        clearedMarkers = dict.fromkeys(
            (
                "lastSyncAt",
                "lastSyncStatus",
                "lastSyncErrorMessage",
                "lastSyncDateFrom",
                "lastSyncDateTo",
                "lastSyncCounts",
            ),
            None,
        )
        try:
            interface.db.recordModify(TrusteeAccountingConfig, activeConfigId, clearedMarkers)
        except Exception as ex:
            logger.warning("wipeImportedData: failed to reset lastSync* on cfg %s: %s", activeConfigId, ex)

    cacheCleared = clearFeatureQueryCache(instanceId)
    logger.info(
        "wipeImportedData instance=%s removed=%s cacheCleared=%s",
        instanceId, removed, cacheCleared,
    )
    return {
        "removed": removed,
        "totalRemoved": sum(removed.values()),
        "cacheCleared": cacheCleared,
        "featureInstanceId": instanceId,
    }
# ===== Data Export =====
@router.get("/{instanceId}/accounting/export-data")
@limiter.limit("3/minute")
def export_accounting_data(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    context: RequestContext = Depends(getRequestContext),
) -> Response:
    """Export all ``TrusteeData*`` tables of this instance as a JSON download.

    The payload contains the export timestamp, instance and mandate ids, a
    short ``syncInfo`` summary taken from the active accounting config (empty
    when there is none) and the rows of the five data tables keyed by model
    name. Values that are not JSON-native are stringified (``default=str``).

    NOTE(review): this endpoint was documented as "admin only", but the code
    only calls ``_validateInstanceAccess`` -- confirm whether an admin check
    is enforced elsewhere or should be added here.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    from .datamodelFeatureTrustee import (
        TrusteeDataAccount,
        TrusteeDataJournalEntry,
        TrusteeDataJournalLine,
        TrusteeDataContact,
        TrusteeDataAccountBalance,
        TrusteeAccountingConfig,
    )
    import time as _time

    interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
    instanceFilter = {"featureInstanceId": instanceId}

    exportedModels = (
        ("TrusteeDataAccount", TrusteeDataAccount),
        ("TrusteeDataJournalEntry", TrusteeDataJournalEntry),
        ("TrusteeDataJournalLine", TrusteeDataJournalLine),
        ("TrusteeDataContact", TrusteeDataContact),
        ("TrusteeDataAccountBalance", TrusteeDataAccountBalance),
    )
    tables: Dict[str, Any] = {
        modelName: interface.db.getRecordset(model, recordFilter=instanceFilter) or []
        for modelName, model in exportedModels
    }

    activeConfigs = interface.db.getRecordset(
        TrusteeAccountingConfig,
        recordFilter={"featureInstanceId": instanceId, "isActive": True},
    )
    if activeConfigs:
        activeConfig = activeConfigs[0]
        syncInfo = {
            "connectorType": activeConfig.get("connectorType", ""),
            "lastSyncAt": activeConfig.get("lastSyncAt"),
            "lastSyncStatus": activeConfig.get("lastSyncStatus", ""),
        }
    else:
        syncInfo = {}

    exportDocument = {
        "exportedAt": _time.time(),
        "featureInstanceId": instanceId,
        "mandateId": mandateId,
        "syncInfo": syncInfo,
        "tables": tables,
    }
    serialized = json.dumps(exportDocument, ensure_ascii=False, default=str)
    return Response(
        content=serialized.encode("utf-8"),
        media_type="application/json",
        # Only the first 8 characters of the instance id go into the file name.
        headers={"Content-Disposition": f'attachment; filename="trustee_data_{instanceId[:8]}.json"'},
    )
# ===== Position-Document Query =====
@router.get("/{instanceId}/positions/document/{documentId}", response_model=List[TrusteePosition])
@limiter.limit("30/minute")
def get_positions_by_document(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    documentId: str = Path(...),
    context: RequestContext = Depends(getRequestContext)
) -> List[TrusteePosition]:
    """Return the positions linked to ``documentId`` in this instance.

    Instance access is validated first; the lookup itself is delegated to the
    Trustee interface's ``getPositionsByDocument``.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    trusteeInterface = getInterface(
        context.user,
        mandateId=mandateId,
        featureInstanceId=instanceId,
    )
    positions = trusteeInterface.getPositionsByDocument(documentId)
    return positions
# ===== Instance Roles Management =====
# These endpoints allow feature admins to manage instance-specific roles and their AccessRules
def _validateInstanceAdmin(instanceId: str, context: RequestContext) -> str:
    """Ensure the caller may manage roles of the feature instance.

    Regular instance access is validated first. Platform admins are then let
    through directly. Everyone else needs at least one of their roles to carry
    a RESOURCE AccessRule for ``resource.trustee.instance-roles.manage``.

    Returns:
        The mandateId resolved by ``_validateInstanceAccess``.

    Raises:
        HTTPException(403): no role of the caller carries the permission.
    """
    mandateId = _validateInstanceAccess(instanceId, context)

    if context.isPlatformAdmin:
        return mandateId

    rootInterface = getRootInterface()

    def _roleGrantsRoleManagement(candidateRoleId) -> bool:
        # Any matching rule record is enough; its flags are not inspected.
        matchingRules = rootInterface.db.getRecordset(
            AccessRule,
            {"roleId": candidateRoleId, "context": AccessRuleContext.RESOURCE.value, "item": "resource.trustee.instance-roles.manage"}
        )
        return bool(matchingRules)

    # any() stops at the first role that grants the permission.
    if any(_roleGrantsRoleManagement(candidateRoleId) for candidateRoleId in context.roleIds):
        return mandateId

    raise HTTPException(
        status_code=403,
        detail=routeApiMsg("Keine Berechtigung zur Rollenverwaltung")
    )
def _serializeRoleForApi(role) -> Dict[str, Any]:
    """Dump a Role and replace ``description`` with its resolved text.

    ``Role.description`` is a multilingual value (``{xx, de, en, ...}``) while
    the frontend expects a plain string; rendering the raw object crashes
    React with "Objects are not valid as a React child". The value is
    therefore passed through ``resolveText`` before the dict is returned.
    """
    from modules.shared.i18nRegistry import resolveText

    serialized = role.model_dump()
    rawDescription = serialized.get("description")
    serialized["description"] = resolveText(rawDescription)
    return serialized
@router.get("/{instanceId}/instance-roles")
@limiter.limit("30/minute")
def get_instance_roles(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    context: RequestContext = Depends(getRequestContext)
):
    """List every role of this feature instance (feature admins only).

    Roles are looked up for feature code ``trustee`` and this instance, then
    serialised with ``_serializeRoleForApi``. ``pagination`` in the response
    is always ``None``.
    """
    # Called for its authorisation check; the returned mandateId is not needed.
    _validateInstanceAdmin(instanceId, context)

    instanceRoles = getRootInterface().getRolesByFeatureCode("trustee", featureInstanceId=instanceId)
    serializedRoles = [_serializeRoleForApi(instanceRole) for instanceRole in instanceRoles]
    return {"items": serializedRoles, "pagination": None}
@router.get("/{instanceId}/instance-roles/{roleId}", response_model=Dict[str, Any])
@limiter.limit("30/minute")
def get_instance_role(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    roleId: str = Path(..., description="Role ID"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """Return one role of this feature instance (feature admins only).

    Raises:
        HTTPException(404): the role does not exist, or it belongs to a
            different feature instance.
    """
    # Called for its authorisation check; the returned mandateId is not needed.
    _validateInstanceAdmin(instanceId, context)

    role = getRootInterface().getRole(roleId)
    if not role:
        raise HTTPException(status_code=404, detail=f"Role {roleId} not found")

    # A role of another instance is reported as not found.
    ownerInstanceId = str(role.featureInstanceId)
    if ownerInstanceId != instanceId:
        raise HTTPException(status_code=404, detail=f"Role {roleId} not found in this instance")

    return _serializeRoleForApi(role)
@router.get("/{instanceId}/instance-roles/{roleId}/rules")
@limiter.limit("30/minute")
def get_instance_role_rules(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    roleId: str = Path(..., description="Role ID"),
    context: RequestContext = Depends(getRequestContext)
):
    """List the AccessRules of one instance role (feature admins only).

    Returns ``{"items": [...], "pagination": None}`` with each rule dumped to
    a dict.

    Raises:
        HTTPException(404): the role does not exist or belongs to another
            feature instance.
    """
    # Called for its authorisation check; the returned mandateId is not needed.
    _validateInstanceAdmin(instanceId, context)
    rootInterface = getRootInterface()

    role = rootInterface.getRole(roleId)
    roleBelongsHere = role and str(role.featureInstanceId) == instanceId
    if not roleBelongsHere:
        raise HTTPException(status_code=404, detail=f"Role {roleId} not found in this instance")

    dumpedRules = [rule.model_dump() for rule in rootInterface.getAccessRulesByRole(roleId)]
    return {"items": dumpedRules, "pagination": None}
@router.post("/{instanceId}/instance-roles/{roleId}/rules", response_model=Dict[str, Any], status_code=201)
@limiter.limit("10/minute")
def create_instance_role_rule(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    roleId: str = Path(..., description="Role ID"),
    ruleData: Dict[str, Any] = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """Create an AccessRule for an instance role (feature admins only).

    Fields read from the body: ``context`` (default ``"UI"``; strings are
    upper-cased before conversion to ``AccessRuleContext``), ``item``,
    ``view`` (default ``False``), ``read``, ``create``, ``update``, ``delete``.

    Raises:
        HTTPException(404): the role does not exist or belongs to another
            feature instance.
        HTTPException(400): building or storing the rule failed for any
            reason (e.g. unknown context value).
    """
    # Called for its authorisation check; the returned mandateId is not needed.
    _validateInstanceAdmin(instanceId, context)
    rootInterface = getRootInterface()

    role = rootInterface.getRole(roleId)
    roleBelongsHere = role and str(role.featureInstanceId) == instanceId
    if not roleBelongsHere:
        raise HTTPException(status_code=404, detail=f"Role {roleId} not found in this instance")

    try:
        rawContext = ruleData.get("context", "UI")
        # Non-string values are handed to the model unchanged.
        ruleContext = AccessRuleContext(rawContext.upper()) if isinstance(rawContext, str) else rawContext

        crudFlags = {flag: ruleData.get(flag) for flag in ("read", "create", "update", "delete")}
        newRule = AccessRule(
            roleId=roleId,
            context=ruleContext,
            item=ruleData.get("item"),
            view=ruleData.get("view", False),
            **crudFlags,
        )
        return rootInterface.db.recordCreate(AccessRule, newRule.model_dump())
    except Exception as e:
        logger.error(f"Error creating AccessRule: {e}")
        raise HTTPException(status_code=400, detail=f"Failed to create rule: {str(e)}")
@router.put("/{instanceId}/instance-roles/{roleId}/rules/{ruleId}", response_model=Dict[str, Any])
@limiter.limit("10/minute")
def update_instance_role_rule(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    roleId: str = Path(..., description="Role ID"),
    ruleId: str = Path(..., description="Rule ID"),
    ruleData: Dict[str, Any] = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """Update the permission flags of an AccessRule (feature admins only).

    Only ``view``, ``read``, ``create``, ``update`` and ``delete`` are taken
    from the body; every other key is ignored. When none of them is present
    the existing rule is returned unchanged and nothing is written.

    Raises:
        HTTPException(404): the role is not part of this instance, or the
            rule does not belong to the role.
        HTTPException(400): the database update failed.
    """
    # Called for its authorisation check; the returned mandateId is not needed.
    _validateInstanceAdmin(instanceId, context)
    rootInterface = getRootInterface()

    role = rootInterface.getRole(roleId)
    roleBelongsHere = role and str(role.featureInstanceId) == instanceId
    if not roleBelongsHere:
        raise HTTPException(status_code=404, detail=f"Role {roleId} not found in this instance")

    existingRule = rootInterface.getAccessRule(ruleId)
    ruleBelongsToRole = existingRule and str(existingRule.roleId) == roleId
    if not ruleBelongsToRole:
        raise HTTPException(status_code=404, detail=f"Rule {ruleId} not found for this role")

    editableFlags = ("view", "read", "create", "update", "delete")
    updateData = {flag: ruleData[flag] for flag in editableFlags if flag in ruleData}
    if not updateData:
        return existingRule.model_dump()

    try:
        return rootInterface.db.recordModify(AccessRule, ruleId, updateData)
    except Exception as e:
        logger.error(f"Error updating AccessRule: {e}")
        raise HTTPException(status_code=400, detail=f"Failed to update rule: {str(e)}")
@router.delete("/{instanceId}/instance-roles/{roleId}/rules/{ruleId}")
@limiter.limit("10/minute")
def delete_instance_role_rule(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    roleId: str = Path(..., description="Role ID"),
    ruleId: str = Path(..., description="Rule ID"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """Delete an AccessRule of an instance role (feature admins only).

    Returns:
        ``{"message": "Rule <ruleId> deleted"}`` on success.

    Raises:
        HTTPException(404): the role is not part of this instance, or the
            rule does not belong to the role.
        HTTPException(400): the database delete failed.
    """
    # Called for its authorisation check; the returned mandateId is not needed.
    _validateInstanceAdmin(instanceId, context)
    rootInterface = getRootInterface()

    role = rootInterface.getRole(roleId)
    roleBelongsHere = role and str(role.featureInstanceId) == instanceId
    if not roleBelongsHere:
        raise HTTPException(status_code=404, detail=f"Role {roleId} not found in this instance")

    existingRule = rootInterface.getAccessRule(ruleId)
    ruleBelongsToRole = existingRule and str(existingRule.roleId) == roleId
    if not ruleBelongsToRole:
        raise HTTPException(status_code=404, detail=f"Rule {ruleId} not found for this role")

    try:
        rootInterface.db.recordDelete(AccessRule, ruleId)
    except Exception as e:
        logger.error(f"Error deleting AccessRule: {e}")
        raise HTTPException(status_code=400, detail=f"Failed to delete rule: {str(e)}")
    return {"message": f"Rule {ruleId} deleted"}
# =============================================================================
# Generic Read-Only Data Tables (consolidated TrusteeDataTablesView)
# =============================================================================
#
# These endpoints expose the seven additional Trustee tables that previously
# only had aggregate or specialised views. They are read-only:
# - TrusteeData* tables are populated by the accounting sync; manual edits
# would be overwritten on the next sync.
# - TrusteeAccountingConfig / TrusteeAccountingSync are operational records
# maintained by the connector layer.
#
# All seven endpoints share one helper (`_paginatedReadEndpoint`) that
# replicates the established pattern from `get_documents` / `get_positions`
# (Unified Filter API: mode=filterValues / mode=ids).
def _buildFeatureInternalResolvers(modelClass, db) -> Dict[str, Any]:
    """Build ``extraResolvers`` for FK fields that point to other Trustee models.

    Every field of ``modelClass`` whose ``json_schema_extra`` carries an
    ``fk_target`` dict with a ``table`` listed in ``_TRUSTEE_ENTITY_MODELS``
    gets one resolver. A resolver takes a list of ids and returns
    ``{id: label or None}``; ids without a matching record keep ``None``.

    Label rule: the first two non-empty values among ``externalId``,
    ``reference``, ``bookingDate``, ``label``, ``name``, ``accountNumber``
    joined with ``" | "``. A numeric ``bookingDate`` is rendered as a UTC
    ``YYYY-MM-DD`` date. If none of the columns has a value, the first eight
    characters of the id are used.

    Args:
        modelClass: Pydantic model whose ``model_fields`` are inspected.
        db: Connector offering ``getRecordset(model, recordFilter=...)``.

    Returns:
        Mapping of field name to resolver callable (empty if no field matches).
    """
    from datetime import datetime, timezone

    labelColumns = ("externalId", "reference", "bookingDate", "label", "name", "accountNumber")

    def _formatLabelPart(col: str, val: Any) -> str:
        # Numeric booking dates are treated as epoch seconds; anything that
        # cannot be converted falls back to its plain string form.
        if col == "bookingDate" and isinstance(val, (int, float)):
            try:
                return datetime.fromtimestamp(val, tz=timezone.utc).strftime("%Y-%m-%d")
            except Exception:
                return str(val)
        return str(val)

    def _recordToDict(rec: Any) -> Dict[str, Any]:
        if isinstance(rec, dict):
            return rec
        if hasattr(rec, "model_dump"):
            return rec.model_dump()
        return {}

    def _makeResolver(model):
        def _resolve(ids: List[str]) -> Dict[str, Optional[str]]:
            result: Dict[str, Optional[str]] = {i: None for i in ids}
            if not ids:
                # Nothing to look up: skip the query instead of sending an
                # empty id filter to the database.
                return result
            try:
                recs = db.getRecordset(model, recordFilter={"id": list(set(ids))}) or []
            except Exception as ex:
                # Labels are cosmetic, so a failed lookup must not break the
                # listing -- but it should be visible in the logs.
                logger.warning(
                    "FK label lookup failed for %s: %s",
                    getattr(model, "__name__", model), ex,
                )
                return result
            for rec in recs:
                row = _recordToDict(rec)
                rid = row.get("id")
                if not rid:
                    # A row without an id cannot be mapped back to a request id.
                    continue
                parts: List[str] = []
                for col in labelColumns:
                    val = row.get(col)
                    if val is None or val == "":
                        continue
                    parts.append(_formatLabelPart(col, val))
                    if len(parts) >= 2:
                        break
                result[rid] = " | ".join(parts) if parts else str(rid)[:8]
            return result
        return _resolve

    resolvers: Dict[str, Any] = {}
    for name, fieldInfo in modelClass.model_fields.items():
        extra = fieldInfo.json_schema_extra
        if not extra or not isinstance(extra, dict):
            continue
        tgt = extra.get("fk_target")
        if not isinstance(tgt, dict):
            continue
        tableName = tgt.get("table", "")
        if tableName not in _TRUSTEE_ENTITY_MODELS:
            continue
        resolvers[name] = _makeResolver(_TRUSTEE_ENTITY_MODELS[tableName])
    return resolvers
def _paginatedReadEndpoint(
    *,
    instanceId: str,
    context: RequestContext,
    modelClass,
    pagination: Optional[str],
    mode: Optional[str],
    column: Optional[str],
):
    """Shared RBAC-aware GET handler for the read-only Trustee data tables.

    Behaviour by ``mode``:
      - ``filterValues``: loads all permitted rows, enriches them with FK
        labels and returns the distinct values of ``column`` (400 when
        ``column`` is missing).
      - ``ids``: loads all permitted rows and returns their ids.
      - anything else: returns ``{"items", "pagination"}``; ``pagination`` is
        metadata when pagination params were parsed and the result exposes
        ``items``, otherwise ``None``.
    """
    from modules.interfaces.interfaceRbac import (
        getRecordsetPaginatedWithRBAC,
    )
    from modules.routes.routeHelpers import (
        handleIdsInMemory,
        handleFilterValuesInMemory,
        enrichRowsWithFkLabels,
    )

    mandateId = _validateInstanceAccess(instanceId, context)
    interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)

    def _query(pageParams):
        # Single place for the RBAC query; only the pagination argument varies.
        return getRecordsetPaginatedWithRBAC(
            connector=interface.db,
            modelClass=modelClass,
            currentUser=interface.currentUser,
            pagination=pageParams,
            recordFilter=None,
            mandateId=interface.mandateId,
            featureInstanceId=interface.featureInstanceId,
            featureCode=interface.FEATURE_CODE,
        )

    def _rowsAsDicts(queryResult):
        # The query may hand back an object with ``items`` or a plain sequence.
        rawRows = queryResult.items if hasattr(queryResult, "items") else queryResult
        return [row.model_dump() if hasattr(row, "model_dump") else row for row in rawRows]

    def _extraResolvers():
        return _buildFeatureInternalResolvers(modelClass, interface.db) or None

    if mode == "filterValues":
        if not column:
            raise HTTPException(status_code=400, detail="column parameter required for mode=filterValues")
        rows = _rowsAsDicts(_query(None))
        # Return value ignored here: the same row list is passed on below.
        enrichRowsWithFkLabels(rows, modelClass, extraResolvers=_extraResolvers())
        return handleFilterValuesInMemory(rows, column, pagination)

    if mode == "ids":
        return handleIdsInMemory(_rowsAsDicts(_query(None)), pagination)

    paginationParams = _parsePagination(pagination)
    result = _query(paginationParams)
    extraResolvers = _extraResolvers()

    isPaginated = paginationParams and hasattr(result, "items")
    if not isPaginated:
        unpagedRows = enrichRowsWithFkLabels(
            _rowsAsDicts(result), modelClass,
            extraResolvers=extraResolvers,
        )
        return {"items": unpagedRows, "pagination": None}

    pageRows = enrichRowsWithFkLabels(
        _rowsAsDicts(result), modelClass,
        extraResolvers=extraResolvers,
    )
    pageMetadata = PaginationMetadata(
        currentPage=paginationParams.page or 1,
        pageSize=paginationParams.pageSize or 20,
        totalItems=result.totalItems,
        totalPages=result.totalPages,
        sort=paginationParams.sort,
        filters=paginationParams.filters,
    )
    return {"items": pageRows, "pagination": pageMetadata.model_dump()}
@router.get("/{instanceId}/data/accounts")
@limiter.limit("30/minute")
def get_data_accounts(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    pagination: Optional[str] = Query(None),
    mode: Optional[str] = Query(None, description="'filterValues' or 'ids'"),
    column: Optional[str] = Query(None, description="Column key (required when mode=filterValues)"),
    context: RequestContext = Depends(getRequestContext),
):
    """List ``TrusteeDataAccount`` rows (synced chart of accounts), read-only.

    All query handling (pagination, ``mode=filterValues``, ``mode=ids``) is
    delegated to ``_paginatedReadEndpoint``.
    """
    return _paginatedReadEndpoint(
        modelClass=TrusteeDataAccount,
        instanceId=instanceId,
        context=context,
        mode=mode,
        column=column,
        pagination=pagination,
    )
@router.get("/{instanceId}/data/journal-entries")
@limiter.limit("30/minute")
def get_data_journal_entries(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    pagination: Optional[str] = Query(None),
    mode: Optional[str] = Query(None, description="'filterValues' or 'ids'"),
    column: Optional[str] = Query(None, description="Column key (required when mode=filterValues)"),
    context: RequestContext = Depends(getRequestContext),
):
    """List ``TrusteeDataJournalEntry`` rows (synced journal entries), read-only.

    All query handling (pagination, ``mode=filterValues``, ``mode=ids``) is
    delegated to ``_paginatedReadEndpoint``.
    """
    return _paginatedReadEndpoint(
        modelClass=TrusteeDataJournalEntry,
        instanceId=instanceId,
        context=context,
        mode=mode,
        column=column,
        pagination=pagination,
    )
@router.get("/{instanceId}/data/journal-lines")
@limiter.limit("30/minute")
def get_data_journal_lines(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    pagination: Optional[str] = Query(None),
    mode: Optional[str] = Query(None, description="'filterValues' or 'ids'"),
    column: Optional[str] = Query(None, description="Column key (required when mode=filterValues)"),
    context: RequestContext = Depends(getRequestContext),
):
    """List ``TrusteeDataJournalLine`` rows (synced journal lines), read-only.

    All query handling (pagination, ``mode=filterValues``, ``mode=ids``) is
    delegated to ``_paginatedReadEndpoint``.
    """
    return _paginatedReadEndpoint(
        modelClass=TrusteeDataJournalLine,
        instanceId=instanceId,
        context=context,
        mode=mode,
        column=column,
        pagination=pagination,
    )
@router.get("/{instanceId}/data/contacts")
@limiter.limit("30/minute")
def get_data_contacts(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    pagination: Optional[str] = Query(None),
    mode: Optional[str] = Query(None, description="'filterValues' or 'ids'"),
    column: Optional[str] = Query(None, description="Column key (required when mode=filterValues)"),
    context: RequestContext = Depends(getRequestContext),
):
    """List ``TrusteeDataContact`` rows (synced contacts), read-only.

    All query handling (pagination, ``mode=filterValues``, ``mode=ids``) is
    delegated to ``_paginatedReadEndpoint``.
    """
    return _paginatedReadEndpoint(
        modelClass=TrusteeDataContact,
        instanceId=instanceId,
        context=context,
        mode=mode,
        column=column,
        pagination=pagination,
    )
@router.get("/{instanceId}/data/account-balances")
@limiter.limit("30/minute")
def get_data_account_balances(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    pagination: Optional[str] = Query(None),
    mode: Optional[str] = Query(None, description="'filterValues' or 'ids'"),
    column: Optional[str] = Query(None, description="Column key (required when mode=filterValues)"),
    context: RequestContext = Depends(getRequestContext),
):
    """List ``TrusteeDataAccountBalance`` rows (synced balances), read-only.

    All query handling (pagination, ``mode=filterValues``, ``mode=ids``) is
    delegated to ``_paginatedReadEndpoint``.
    """
    return _paginatedReadEndpoint(
        modelClass=TrusteeDataAccountBalance,
        instanceId=instanceId,
        context=context,
        mode=mode,
        column=column,
        pagination=pagination,
    )
@router.get("/{instanceId}/accounting/configs")
@limiter.limit("30/minute")
def get_accounting_configs(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    pagination: Optional[str] = Query(None),
    mode: Optional[str] = Query(None, description="'filterValues' or 'ids'"),
    column: Optional[str] = Query(None, description="Column key (required when mode=filterValues)"),
    context: RequestContext = Depends(getRequestContext),
):
    """List ``TrusteeAccountingConfig`` rows (connector configurations), read-only.

    All query handling (pagination, ``mode=filterValues``, ``mode=ids``) is
    delegated to ``_paginatedReadEndpoint``; this endpoint applies no field
    filtering of its own.

    NOTE: secret config fields are expected to be stored masked in the
    underlying record (not verified in this function). UI consumers should use
    the dedicated `/accounting/config` endpoint for secret-aware editing.
    """
    return _paginatedReadEndpoint(
        modelClass=TrusteeAccountingConfig,
        instanceId=instanceId,
        context=context,
        mode=mode,
        column=column,
        pagination=pagination,
    )
@router.get("/{instanceId}/accounting/syncs")
@limiter.limit("30/minute")
def get_accounting_syncs(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    pagination: Optional[str] = Query(None),
    mode: Optional[str] = Query(None, description="'filterValues' or 'ids'"),
    column: Optional[str] = Query(None, description="Column key (required when mode=filterValues)"),
    context: RequestContext = Depends(getRequestContext),
):
    """List ``TrusteeAccountingSync`` rows (accounting sync records), read-only.

    All query handling (pagination, ``mode=filterValues``, ``mode=ids``) is
    delegated to ``_paginatedReadEndpoint``.
    """
    return _paginatedReadEndpoint(
        modelClass=TrusteeAccountingSync,
        instanceId=instanceId,
        context=context,
        mode=mode,
        column=column,
        pagination=pagination,
    )