gateway/modules/shared/attributeUtils.py
2026-04-12 10:12:03 +02:00

358 lines
15 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Shared utilities for model attributes and labels.
"""
from pydantic import BaseModel, Field, ConfigDict
from pydantic_core import PydanticUndefined
from typing import Dict, Any, List, Type, Optional, Union
import inspect
import importlib
import os
import logging
logger = logging.getLogger(__name__)
# Define the AttributeDefinition class here instead of importing it
class AttributeDefinition(BaseModel):
"""Definition of a model attribute with its metadata."""
name: str
type: str
label: str
description: Optional[str] = None
required: bool = False
default: Any = None
options: Optional[Union[str, List[Any]]] = None
validation: Optional[Dict[str, Any]] = None
ui: Optional[Dict[str, Any]] = None
readonly: bool = False
editable: bool = True
visible: bool = True
order: int = 0
placeholder: Optional[str] = None
fkSource: Optional[str] = None
fkDisplayField: Optional[str] = None
def _getModelLabelEntry(modelName: str) -> Dict[str, Any]:
"""Resolve label data produced by @i18nModel (see modules.shared.i18nRegistry.MODEL_LABELS)."""
try:
from modules.shared.i18nRegistry import MODEL_LABELS as i18nModelLabels
except ImportError:
return {}
return i18nModelLabels.get(modelName) or {}
def getModelLabels(modelName: str) -> Dict[str, str]:
"""Get labels for a model's attributes in the specified language.
Reads @i18nModel registration (German base strings); resolves via resolveText().
"""
modelData = _getModelLabelEntry(modelName)
attributeLabels = modelData.get("attributes", {})
from modules.shared.i18nRegistry import resolveText
result: Dict[str, str] = {}
for attr, translations in attributeLabels.items():
resolved = resolveText(translations)
result[attr] = resolved if resolved else f"[{attr}]"
return result
def _resolveOptionLabels(options):
"""Resolve frontend_options label values via resolveText().
CRITICAL: deep-copy so the shared json_schema_extra dicts are never mutated.
Without the copy, each request would re-translate the already-translated
label, wrapping it in another layer of ``[…]`` brackets.
"""
if not isinstance(options, list):
return options
import copy
from modules.shared.i18nRegistry import resolveText
resolved = copy.deepcopy(options)
for opt in resolved:
if not isinstance(opt, dict) or "label" not in opt:
continue
opt["label"] = resolveText(opt["label"])
return resolved
def _mergedAttributeLabels(modelClass: Type[BaseModel]) -> Dict[str, str]:
"""Merge attribute labels from model MRO (base classes first, subclass overrides)."""
try:
baseIdx = modelClass.__mro__.index(BaseModel)
except ValueError:
return getModelLabels(modelClass.__name__)
merged: Dict[str, str] = {}
for cls in reversed(modelClass.__mro__[:baseIdx]):
merged.update(getModelLabels(cls.__name__))
return merged
def getModelLabel(modelName: str) -> str:
"""Get the label for a model via resolveText()."""
modelData = _getModelLabelEntry(modelName)
modelLabel = modelData.get("model", {})
from modules.shared.i18nRegistry import resolveText
resolved = resolveText(modelLabel)
return resolved if resolved else f"[{modelName}]"
def getModelAttributeDefinitions(modelClass: Type[BaseModel] = None, userLanguage: str = "en") -> Dict[str, Any]:
"""
Get attribute definitions for a model class.
Args:
modelClass: The model class to get attributes for
userLanguage: Language code for translations (default: "en")
Returns:
Dictionary containing model label and attribute definitions
"""
if not modelClass:
return {}
attributes = []
model_name = modelClass.__name__
labels = _mergedAttributeLabels(modelClass)
model_label = getModelLabel(model_name)
# Pydantic v2 only
fields = modelClass.model_fields
for name, field in fields.items():
# Extract frontend metadata from field info
# In Pydantic v2, the field from model_fields.items() IS the FieldInfo object
# FieldInfo objects have the 'extra' dict directly on them
field_info = field
# Check both direct attributes and extra field for frontend metadata
frontend_type = None
frontend_readonly = False
frontend_required = field.is_required()
frontend_options = None
frontend_visible = True # Default visible
frontend_fk_source = None # FK dropdown source (e.g., "/api/users/")
frontend_fk_display_field = None # Which field of the FK target to display (e.g., "username", "name")
if field_info:
# Try direct attributes first (though these won't exist for custom kwargs)
frontend_type = getattr(field_info, "frontend_type", None)
frontend_readonly = getattr(field_info, "frontend_readonly", False)
frontend_required = getattr(
field_info, "frontend_required", frontend_required
)
frontend_options = getattr(field_info, "frontend_options", None)
# If not found, check json_schema_extra (Pydantic v2 stores custom kwargs here)
if frontend_type is None and hasattr(field_info, "json_schema_extra"):
json_extra = field_info.json_schema_extra
if isinstance(json_extra, dict):
frontend_type = json_extra.get("frontend_type")
elif callable(json_extra):
# If it's a callable, we can't easily extract from it, skip
pass
# Check extra field (Pydantic v2 stores custom kwargs here)
# This is the main location where frontend_type is stored
if hasattr(field_info, "extra"):
extra_dict = field_info.extra
if isinstance(extra_dict, dict):
if frontend_type is None:
frontend_type = extra_dict.get("frontend_type")
# Also extract other fields from extra if it exists
if hasattr(field_info, "extra") and isinstance(field_info.extra, dict):
extra_dict = field_info.extra
if not frontend_readonly:
frontend_readonly = extra_dict.get("frontend_readonly", False)
if frontend_required == field.is_required():
frontend_required = extra_dict.get("frontend_required", frontend_required)
if frontend_options is None:
frontend_options = extra_dict.get("frontend_options")
# Also check json_schema_extra for other fields
if hasattr(field_info, "json_schema_extra"):
json_extra = field_info.json_schema_extra
if isinstance(json_extra, dict):
if not frontend_readonly and "frontend_readonly" in json_extra:
frontend_readonly = json_extra.get("frontend_readonly", False)
if frontend_required == field.is_required() and "frontend_required" in json_extra:
frontend_required = json_extra.get("frontend_required", frontend_required)
if frontend_options is None and "frontend_options" in json_extra:
frontend_options = json_extra.get("frontend_options")
# Extract frontend_visible (default True, can be set to False to hide field)
if "frontend_visible" in json_extra:
frontend_visible = json_extra.get("frontend_visible", True)
# Extract frontend_fk_source for FK dropdown references
if "frontend_fk_source" in json_extra:
frontend_fk_source = json_extra.get("frontend_fk_source")
# Extract frontend_fk_display_field - which field of FK target to display
if "frontend_fk_display_field" in json_extra:
frontend_fk_display_field = json_extra.get("frontend_fk_display_field")
# Use frontend type if available, otherwise detect from Python type
if frontend_type:
field_type = frontend_type
else:
# Check if it's TextMultilingual type
annotation_str = str(field.annotation)
# Check both the module path and class name for TextMultilingual
if ('TextMultilingual' in annotation_str or
(hasattr(field.annotation, '__name__') and field.annotation.__name__ == 'TextMultilingual') or
'datamodelUtils.TextMultilingual' in annotation_str or
'datamodels.datamodelUtils.TextMultilingual' in annotation_str):
field_type = 'multilingual'
elif hasattr(field.annotation, "__name__"):
annotation_name = field.annotation.__name__
# Check if it's a Dict type (for JSON/object fields)
if annotation_name == 'Dict' or annotation_str.startswith('typing.Dict') or annotation_str.startswith('Dict['):
field_type = 'object' # Will be rendered as textarea for JSON editing
else:
field_type = annotation_name
else:
field_type = str(field.annotation)
# Extract default value from field
# In Pydantic v2, FieldInfo has a 'default' attribute
field_default = None
if hasattr(field_info, 'default'):
default_value = field_info.default
# Handle default_factory (callable that generates default)
if hasattr(field_info, 'default_factory') and callable(field_info.default_factory):
# Don't call it here - it's meant to be called per-instance
# Instead, store a marker that indicates it exists
field_default = None # Frontend should use first option or specific logic
elif default_value is not ... and default_value is not PydanticUndefined:
# Ellipsis or PydanticUndefined means no default
# Convert enum to its value if it's an enum
if hasattr(default_value, 'value'):
field_default = default_value.value
else:
field_default = default_value
# Safely get description
description = ""
try:
if hasattr(field_info, "description") and field_info.description:
description = str(field_info.description)
except Exception:
pass
# Hide "id" fields by default unless explicitly set to visible
# Also hide fields ending with "Id" that are FK references (unless they have fkSource)
if name == "id":
frontend_visible = False # Never show primary key in forms/tables
attr_def = {
"name": name,
"type": field_type,
"required": frontend_required,
"description": description,
"label": labels.get(name, name),
"placeholder": f"Please enter {labels.get(name, name)}",
"editable": not frontend_readonly,
"visible": frontend_visible,
"order": len(attributes),
"readonly": frontend_readonly,
"options": _resolveOptionLabels(frontend_options),
"default": field_default,
}
# Add FK source for dropdown rendering if specified
if frontend_fk_source:
attr_def["fkSource"] = frontend_fk_source
# Also add display field if specified (which field of FK target to show)
if frontend_fk_display_field:
attr_def["fkDisplayField"] = frontend_fk_display_field
attributes.append(attr_def)
return {"model": model_label, "attributes": attributes}
def getModelClasses() -> Dict[str, Type[BaseModel]]:
"""
Dynamically get all model classes from all model modules.
Returns:
Dict[str, Type[BaseModel]]: Dictionary of model class names to their classes
"""
modelClasses = {}
# Get the interfaces directory path
interfaces_dir = os.path.join(
os.path.dirname(os.path.dirname(__file__)), "interfaces"
)
# Find all model files in interfaces directory
for fileName in os.listdir(interfaces_dir):
if fileName.endswith("Model.py"):
# Convert fileName to module name (e.g., gatewayModel.py -> gatewayModel)
module_name = fileName[:-3]
# Import the module dynamically
module = importlib.import_module(f"modules.interfaces.{module_name}")
# Get all classes from the module
for name, obj in inspect.getmembers(module):
if (
inspect.isclass(obj)
and issubclass(obj, BaseModel)
and obj != BaseModel
):
modelClasses[name] = obj
# Also get models from datamodels directory
datamodels_dir = os.path.join(
os.path.dirname(os.path.dirname(__file__)), "datamodels"
)
# Find all model files in datamodels directory
for fileName in os.listdir(datamodels_dir):
if fileName.startswith("datamodel") and fileName.endswith(".py"):
# Convert fileName to module name (e.g., datamodelUtils.py -> datamodelUtils)
module_name = fileName[:-3]
try:
# Import the module dynamically
module = importlib.import_module(f"modules.datamodels.{module_name}")
# Get all classes from the module
for name, obj in inspect.getmembers(module):
if (
inspect.isclass(obj)
and issubclass(obj, BaseModel)
and obj != BaseModel
):
modelClasses[name] = obj
except Exception as e:
logger.warning(f"Error importing module {module_name}: {str(e)}", exc_info=True)
# Continue with other modules even if one fails
return modelClasses
class AttributeResponse(BaseModel):
"""Response model for entity attributes"""
attributes: List[AttributeDefinition]
model_config = ConfigDict(
json_schema_extra={
"example": {
"attributes": [
{
"name": "username",
"label": "Username",
"type": "string",
"required": True,
"placeholder": "Please enter username",
"editable": True,
"visible": True,
"order": 0,
}
]
}
}
)