276 lines
No EOL
9.6 KiB
Python
276 lines
No EOL
9.6 KiB
Python
"""
|
|
Shared utilities for model attributes and labels.
|
|
"""
|
|
|
|
from pydantic import BaseModel, Field
|
|
from typing import Dict, Any, List, Type, Optional, Union
|
|
import inspect
|
|
import importlib
|
|
import os
|
|
from datetime import datetime
|
|
|
|
class ModelMixin:
|
|
"""Mixin class that provides serialization methods for Pydantic models."""
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
"""
|
|
Convert a Pydantic model to a dictionary.
|
|
Handles both Pydantic v1 and v2.
|
|
Properly serializes datetime fields to ISO format strings.
|
|
|
|
Returns:
|
|
Dict[str, Any]: Dictionary representation of the model
|
|
"""
|
|
# Get the raw dictionary
|
|
if hasattr(self, 'model_dump'):
|
|
data: Dict[str, Any] = self.model_dump() # Pydantic v2
|
|
else:
|
|
data: Dict[str, Any] = self.dict() # Pydantic v1
|
|
|
|
# Convert datetime fields to ISO format strings
|
|
for key, value in data.items():
|
|
if isinstance(value, datetime):
|
|
data[key] = value.isoformat()
|
|
elif isinstance(value, (int, float)) and self._is_timestamp_field(key):
|
|
# Handle timestamp fields based on field metadata
|
|
try:
|
|
data[key] = datetime.fromtimestamp(value).isoformat()
|
|
except (ValueError, TypeError):
|
|
# If conversion fails, keep the original value
|
|
pass
|
|
|
|
return data
|
|
|
|
def _is_timestamp_field(self, field_name: str) -> bool:
|
|
"""
|
|
Check if a field is a timestamp field based on field metadata.
|
|
Looks for 'UTC timestamp' in the field description.
|
|
"""
|
|
try:
|
|
# Get field info from Pydantic model
|
|
if hasattr(self, 'model_fields'):
|
|
# Pydantic v2
|
|
field_info = self.model_fields.get(field_name)
|
|
if field_info and field_info.description:
|
|
return 'UTC timestamp' in field_info.description
|
|
elif hasattr(self, '__fields__'):
|
|
# Pydantic v1
|
|
field_info = self.__fields__.get(field_name)
|
|
if field_info and field_info.field_info and field_info.field_info.description:
|
|
return 'UTC timestamp' in field_info.field_info.description
|
|
except Exception:
|
|
pass
|
|
|
|
# Fallback: return False for safety
|
|
return False
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: Dict[str, Any]) -> 'ModelMixin':
|
|
"""
|
|
Create a Pydantic model instance from a dictionary.
|
|
|
|
Args:
|
|
data: Dictionary containing the model data
|
|
|
|
Returns:
|
|
ModelMixin: New instance of the model class
|
|
"""
|
|
return cls(**data)
|
|
|
|
# Define the AttributeDefinition class here instead of importing it
|
|
class AttributeDefinition(BaseModel, ModelMixin):
|
|
"""Definition of a model attribute with its metadata."""
|
|
name: str
|
|
type: str
|
|
label: str
|
|
description: Optional[str] = None
|
|
required: bool = False
|
|
default: Any = None
|
|
options: Optional[List[Any]] = None
|
|
validation: Optional[Dict[str, Any]] = None
|
|
ui: Optional[Dict[str, Any]] = None
|
|
|
|
# Global registry for model labels
|
|
MODEL_LABELS: Dict[str, Dict[str, Dict[str, str]]] = {}
|
|
|
|
def to_dict(model: BaseModel) -> Dict[str, Any]:
|
|
"""
|
|
Convert a Pydantic model to a dictionary.
|
|
Handles both Pydantic v1 and v2.
|
|
|
|
Args:
|
|
model: The Pydantic model instance to convert
|
|
|
|
Returns:
|
|
Dict[str, Any]: Dictionary representation of the model
|
|
"""
|
|
if hasattr(model, 'model_dump'):
|
|
return model.model_dump() # Pydantic v2
|
|
return model.dict() # Pydantic v1
|
|
|
|
def from_dict(model_class: Type[BaseModel], data: Dict[str, Any]) -> BaseModel:
|
|
"""
|
|
Create a Pydantic model instance from a dictionary.
|
|
|
|
Args:
|
|
model_class: The Pydantic model class to instantiate
|
|
data: Dictionary containing the model data
|
|
|
|
Returns:
|
|
BaseModel: New instance of the model class
|
|
"""
|
|
return model_class(**data)
|
|
|
|
def register_model_labels(model_name: str, model_label: Dict[str, str], labels: Dict[str, Dict[str, str]]):
|
|
"""
|
|
Register labels for a model's attributes and the model itself.
|
|
|
|
Args:
|
|
model_name: Name of the model class
|
|
model_label: Dictionary mapping language codes to model labels
|
|
e.g. {"en": "Prompt", "fr": "Invite"}
|
|
labels: Dictionary mapping attribute names to their translations
|
|
e.g. {"name": {"en": "Name", "fr": "Nom"}}
|
|
"""
|
|
MODEL_LABELS[model_name] = {
|
|
"model": model_label,
|
|
"attributes": labels
|
|
}
|
|
|
|
def get_model_labels(model_name: str, language: str = "en") -> Dict[str, str]:
|
|
"""
|
|
Get labels for a model's attributes in the specified language.
|
|
|
|
Args:
|
|
model_name: Name of the model class
|
|
language: Language code (default: "en")
|
|
|
|
Returns:
|
|
Dictionary mapping attribute names to their labels in the specified language
|
|
"""
|
|
model_data = MODEL_LABELS.get(model_name, {})
|
|
attribute_labels = model_data.get("attributes", {})
|
|
|
|
return {
|
|
attr: translations.get(language, translations.get("en", attr))
|
|
for attr, translations in attribute_labels.items()
|
|
}
|
|
|
|
def get_model_label(model_name: str, language: str = "en") -> str:
|
|
"""
|
|
Get the label for a model in the specified language.
|
|
|
|
Args:
|
|
model_name: Name of the model class
|
|
language: Language code (default: "en")
|
|
|
|
Returns:
|
|
Model label in the specified language, or model name if no label exists
|
|
"""
|
|
model_data = MODEL_LABELS.get(model_name, {})
|
|
model_label = model_data.get("model", {})
|
|
return model_label.get(language, model_label.get("en", model_name))
|
|
|
|
def getModelAttributeDefinitions(modelClass: Type[BaseModel] = None, userLanguage: str = "en") -> Dict[str, Any]:
|
|
"""
|
|
Get attribute definitions for a model class.
|
|
|
|
Args:
|
|
modelClass: The model class to get attributes for
|
|
userLanguage: Language code for translations (default: "en")
|
|
|
|
Returns:
|
|
Dictionary containing model label and attribute definitions
|
|
"""
|
|
if not modelClass:
|
|
return {}
|
|
|
|
attributes = []
|
|
model_name = modelClass.__name__
|
|
labels = get_model_labels(model_name, userLanguage)
|
|
model_label = get_model_label(model_name, userLanguage)
|
|
|
|
# Handle both Pydantic v1 and v2
|
|
if hasattr(modelClass, 'model_fields'): # Pydantic v2
|
|
fields = modelClass.model_fields
|
|
for name, field in fields.items():
|
|
attributes.append({
|
|
"name": name,
|
|
"type": field.annotation.__name__ if hasattr(field.annotation, "__name__") else str(field.annotation),
|
|
"required": field.is_required() if hasattr(field, "is_required") else True,
|
|
"description": field.description if hasattr(field, "description") else "",
|
|
"label": labels.get(name, name),
|
|
"placeholder": f"Please enter {labels.get(name, name)}",
|
|
"editable": True,
|
|
"visible": True,
|
|
"order": len(attributes)
|
|
})
|
|
else: # Pydantic v1
|
|
fields = modelClass.__fields__
|
|
for name, field in fields.items():
|
|
attributes.append({
|
|
"name": name,
|
|
"type": field.type_.__name__ if hasattr(field.type_, "__name__") else str(field.type_),
|
|
"required": field.required,
|
|
"description": field.field_info.description if hasattr(field.field_info, "description") else "",
|
|
"label": labels.get(name, name),
|
|
"placeholder": f"Please enter {labels.get(name, name)}",
|
|
"editable": True,
|
|
"visible": True,
|
|
"order": len(attributes)
|
|
})
|
|
|
|
return {
|
|
"model": model_label,
|
|
"attributes": attributes
|
|
}
|
|
|
|
def getModelClasses() -> Dict[str, Type[BaseModel]]:
|
|
"""
|
|
Dynamically get all model classes from all model modules.
|
|
|
|
Returns:
|
|
Dict[str, Type[BaseModel]]: Dictionary of model class names to their classes
|
|
"""
|
|
modelClasses = {}
|
|
|
|
# Get the interfaces directory path
|
|
interfaces_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'interfaces')
|
|
|
|
# Find all model files
|
|
for filename in os.listdir(interfaces_dir):
|
|
if filename.endswith('Model.py'):
|
|
# Convert filename to module name (e.g., gatewayModel.py -> gatewayModel)
|
|
module_name = filename[:-3]
|
|
|
|
# Import the module dynamically
|
|
module = importlib.import_module(f'modules.interfaces.{module_name}')
|
|
|
|
# Get all classes from the module
|
|
for name, obj in inspect.getmembers(module):
|
|
if inspect.isclass(obj) and issubclass(obj, BaseModel) and obj != BaseModel:
|
|
modelClasses[name] = obj
|
|
|
|
return modelClasses
|
|
|
|
class AttributeResponse(BaseModel):
|
|
"""Response model for entity attributes"""
|
|
attributes: List[AttributeDefinition]
|
|
|
|
class Config:
|
|
schema_extra = {
|
|
"example": {
|
|
"attributes": [
|
|
{
|
|
"name": "username",
|
|
"label": "Username",
|
|
"type": "string",
|
|
"required": True,
|
|
"placeholder": "Please enter username",
|
|
"editable": True,
|
|
"visible": True,
|
|
"order": 0
|
|
}
|
|
]
|
|
}
|
|
} |