table rendering with proper api controled rendering

This commit is contained in:
ValueOn AG 2025-11-01 23:00:55 +01:00
parent 37aad732f5
commit 4d3ca3342a
11 changed files with 1065 additions and 144 deletions

View file

@ -0,0 +1,72 @@
"""
Pagination models for server-side pagination, sorting, and filtering.
All models use camelStyle naming convention for consistency with frontend.
"""
from typing import List, Dict, Any, Optional, Generic, TypeVar
from pydantic import BaseModel, Field
import math
T = TypeVar('T')
class SortField(BaseModel):
"""
Single sort field configuration.
"""
field: str = Field(..., description="Field name to sort by")
direction: str = Field(..., description="Sort direction: 'asc' or 'desc'")
class PaginationParams(BaseModel):
"""
Complete pagination state including page, sorting, and filters.
"""
page: int = Field(ge=1, description="Current page number (1-based)")
pageSize: int = Field(ge=1, le=1000, description="Number of items per page")
sort: List[SortField] = Field(default_factory=list, description="List of sort fields in priority order")
filters: Optional[Dict[str, Any]] = Field(default=None, description="Filter criteria (structure TBD for future implementation)")
class PaginationRequest(BaseModel):
"""
Pagination request parameters sent from frontend to backend.
All fields are optional. If pagination=None, no pagination is applied.
"""
pagination: Optional[PaginationParams] = None
class PaginatedResult(BaseModel):
"""
Internal result structure from interface layer.
Used when pagination is requested.
"""
items: List[Any]
totalItems: int
totalPages: int # Calculated as: math.ceil(totalItems / pageSize)
class PaginationMetadata(BaseModel):
"""
Pagination metadata returned to frontend for rendering controls.
Contains all information needed to render pagination UI and handle user interactions.
"""
currentPage: int = Field(..., description="Current page number (1-based)")
pageSize: int = Field(..., description="Number of items per page")
totalItems: int = Field(..., description="Total number of items across all pages (after filters)")
totalPages: int = Field(..., description="Total number of pages (calculated from totalItems / pageSize)")
sort: List[SortField] = Field(..., description="Current sort configuration applied")
filters: Optional[Dict[str, Any]] = Field(default=None, description="Current filters applied (for future use)")
class PaginatedResponse(BaseModel, Generic[T]):
"""
Response containing paginated data and metadata.
"""
items: List[T] = Field(..., description="Array of items for current page")
pagination: Optional[PaginationMetadata] = Field(..., description="Pagination metadata (None if pagination not applied)")
class Config:
arbitrary_types_allowed = True

View file

@ -4,7 +4,8 @@ Manages users and mandates for authentication.
"""
import logging
from typing import Dict, Any, List, Optional
import math
from typing import Dict, Any, List, Optional, Union
from passlib.context import CryptContext
import uuid
@ -26,6 +27,7 @@ from modules.datamodels.datamodelNeutralizer import (
DataNeutraliserConfig,
DataNeutralizerAttributes,
)
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResult
logger = logging.getLogger(__name__)
@ -233,6 +235,57 @@ class AppObjects:
"""
return self.access.canModify(model_class, recordId)
def _applyFilters(self, records: List[Dict[str, Any]], filters: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Apply filter criteria to records (implementation for future filtering)."""
# TODO: Implement filtering logic when needed
return records
def _applySorting(self, records: List[Dict[str, Any]], sortFields: List[Any]) -> List[Dict[str, Any]]:
"""Apply multi-level sorting to records using stable sort (sorts from least to most significant field)."""
if not sortFields:
return records
# Start with a copy to avoid modifying original
sortedRecords = list(records)
# Sort from least significant to most significant field (reverse order)
# Python's sort is stable, so this creates proper multi-level sorting
for sortField in reversed(sortFields):
# Handle both dict and object formats
if isinstance(sortField, dict):
fieldName = sortField.get("field")
direction = sortField.get("direction", "asc")
else:
fieldName = getattr(sortField, "field", None)
direction = getattr(sortField, "direction", "asc")
if not fieldName:
continue
isDesc = (direction == "desc")
def sortKey(record):
value = record.get(fieldName)
# Handle None values - place them at the end for both directions
if value is None:
# Use a special value that sorts last
return (1, "") # (is_none_flag, empty_value) - sorts after (0, ...)
else:
# Return tuple with type indicator for proper comparison
if isinstance(value, (int, float)):
return (0, value)
elif isinstance(value, str):
return (0, value)
elif isinstance(value, bool):
return (0, value)
else:
return (0, str(value))
# Sort with reverse parameter for descending
sortedRecords.sort(key=sortKey, reverse=isDesc)
return sortedRecords
def getInitialId(self, model_class: type) -> Optional[str]:
"""Returns the initial ID for a table."""
return self.db.getInitialId(model_class)
@ -247,14 +300,52 @@ class AppObjects:
# User methods
def getUsersByMandate(self, mandateId: str) -> List[User]:
"""Returns users for a specific mandate if user has access."""
def getUsersByMandate(self, mandateId: str, pagination: Optional[PaginationParams] = None) -> Union[List[User], PaginatedResult]:
"""
Returns users for a specific mandate if user has access.
Supports optional pagination, sorting, and filtering.
Args:
mandateId: The mandate ID to get users for
pagination: Optional pagination parameters. If None, returns all items.
Returns:
If pagination is None: List[User]
If pagination is provided: PaginatedResult with items and metadata
"""
# Get users for this mandate
users = self.db.getRecordset(UserInDB, recordFilter={"mandateId": mandateId})
filteredUsers = self._uam(UserInDB, users)
# Convert to User models
return [User(**user) for user in filteredUsers]
# If no pagination requested, return all items
if pagination is None:
return [User(**user) for user in filteredUsers]
# Apply filtering (if filters provided)
if pagination.filters:
filteredUsers = self._applyFilters(filteredUsers, pagination.filters)
# Apply sorting (in order of sortFields)
if pagination.sort:
filteredUsers = self._applySorting(filteredUsers, pagination.sort)
# Count total items after filters
totalItems = len(filteredUsers)
totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0
# Apply pagination (skip/limit)
startIdx = (pagination.page - 1) * pagination.pageSize
endIdx = startIdx + pagination.pageSize
pagedUsers = filteredUsers[startIdx:endIdx]
# Convert to model objects
items = [User(**user) for user in pagedUsers]
return PaginatedResult(
items=items,
totalItems=totalItems,
totalPages=totalPages
)
def getUserByUsername(self, username: str) -> Optional[User]:
"""Returns a user by username."""
@ -638,11 +729,50 @@ class AppObjects:
# Mandate methods
def getAllMandates(self) -> List[Mandate]:
"""Returns all mandates based on user access level."""
def getAllMandates(self, pagination: Optional[PaginationParams] = None) -> Union[List[Mandate], PaginatedResult]:
"""
Returns all mandates based on user access level.
Supports optional pagination, sorting, and filtering.
Args:
pagination: Optional pagination parameters. If None, returns all items.
Returns:
If pagination is None: List[Mandate]
If pagination is provided: PaginatedResult with items and metadata
"""
allMandates = self.db.getRecordset(Mandate)
filteredMandates = self._uam(Mandate, allMandates)
return [Mandate(**mandate) for mandate in filteredMandates]
# If no pagination requested, return all items
if pagination is None:
return [Mandate(**mandate) for mandate in filteredMandates]
# Apply filtering (if filters provided)
if pagination.filters:
filteredMandates = self._applyFilters(filteredMandates, pagination.filters)
# Apply sorting (in order of sortFields)
if pagination.sort:
filteredMandates = self._applySorting(filteredMandates, pagination.sort)
# Count total items after filters
totalItems = len(filteredMandates)
totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0
# Apply pagination (skip/limit)
startIdx = (pagination.page - 1) * pagination.pageSize
endIdx = startIdx + pagination.pageSize
pagedMandates = filteredMandates[startIdx:endIdx]
# Convert to model objects
items = [Mandate(**mandate) for mandate in pagedMandates]
return PaginatedResult(
items=items,
totalItems=totalItems,
totalPages=totalPages
)
def getMandate(self, mandateId: str) -> Optional[Mandate]:
"""Returns a mandate by ID if user has access."""

View file

@ -3,24 +3,16 @@ Interface to LucyDOM database and AI Connectors.
Uses the JSON connector for data access with added language support.
"""
import os
import logging
import uuid
from datetime import datetime, UTC, timezone
from typing import Dict, Any, List, Optional, Union, get_origin, get_args
import math
from typing import Dict, Any, List, Optional, Union
import asyncio
from modules.interfaces.interfaceDbChatAccess import ChatAccess
from modules.datamodels.datamodelChat import (
ActionItem,
TaskResult,
TaskItem,
TaskStatus,
ActionResult
)
from modules.datamodels.datamodelChat import (
UserInputRequest,
ChatDocument,
ChatStat,
ChatLog,
@ -32,6 +24,7 @@ from modules.datamodels.datamodelUam import User
# DYNAMIC PART: Connectors to the Interface
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.timezoneUtils import getUtcTimestamp
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResult
# Basic Configurations
from modules.shared.configuration import APP_CONFIG
@ -197,6 +190,56 @@ class ChatObjects:
"""Delegate to access control module."""
return self.access.canModify(model_class, recordId)
def _applyFilters(self, records: List[Dict[str, Any]], filters: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Apply filter criteria to records (implementation for future filtering)."""
# TODO: Implement filtering logic when needed
return records
def _applySorting(self, records: List[Dict[str, Any]], sortFields: List[Any]) -> List[Dict[str, Any]]:
"""Apply multi-level sorting to records using stable sort (sorts from least to most significant field)."""
if not sortFields:
return records
# Start with a copy to avoid modifying original
sortedRecords = list(records)
# Sort from least significant to most significant field (reverse order)
# Python's sort is stable, so this creates proper multi-level sorting
for sortField in reversed(sortFields):
# Handle both dict and object formats
if isinstance(sortField, dict):
fieldName = sortField.get("field")
direction = sortField.get("direction", "asc")
else:
fieldName = getattr(sortField, "field", None)
direction = getattr(sortField, "direction", "asc")
if not fieldName:
continue
isDesc = (direction == "desc")
def sortKey(record):
value = record.get(fieldName)
# Handle None values - place them at the end for both directions
if value is None:
# Use a special value that sorts last
return (1, "") # (is_none_flag, empty_value) - sorts after (0, ...)
else:
# Return tuple with type indicator for proper comparison
if isinstance(value, (int, float)):
return (0, value)
elif isinstance(value, str):
return (0, value)
elif isinstance(value, bool):
return (0, value)
else:
return (0, str(value))
# Sort with reverse parameter for descending
sortedRecords.sort(key=sortKey, reverse=isDesc)
return sortedRecords
# Utilities
@ -208,10 +251,47 @@ class ChatObjects:
# Workflow methods
def getWorkflows(self) -> List[Dict[str, Any]]:
"""Returns workflows based on user access level."""
def getWorkflows(self, pagination: Optional[PaginationParams] = None) -> Union[List[Dict[str, Any]], PaginatedResult]:
"""
Returns workflows based on user access level.
Supports optional pagination, sorting, and filtering.
Args:
pagination: Optional pagination parameters. If None, returns all items.
Returns:
If pagination is None: List[Dict[str, Any]]
If pagination is provided: PaginatedResult with items and metadata
"""
allWorkflows = self.db.getRecordset(ChatWorkflow)
return self._uam(ChatWorkflow, allWorkflows)
filteredWorkflows = self._uam(ChatWorkflow, allWorkflows)
# If no pagination requested, return all items
if pagination is None:
return filteredWorkflows
# Apply filtering (if filters provided)
if pagination.filters:
filteredWorkflows = self._applyFilters(filteredWorkflows, pagination.filters)
# Apply sorting (in order of sortFields)
if pagination.sort:
filteredWorkflows = self._applySorting(filteredWorkflows, pagination.sort)
# Count total items after filters
totalItems = len(filteredWorkflows)
totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0
# Apply pagination (skip/limit)
startIdx = (pagination.page - 1) * pagination.pageSize
endIdx = startIdx + pagination.pageSize
pagedWorkflows = filteredWorkflows[startIdx:endIdx]
return PaginatedResult(
items=pagedWorkflows,
totalItems=totalItems,
totalPages=totalPages
)
def getWorkflow(self, workflowId: str) -> Optional[ChatWorkflow]:
"""Returns a workflow by ID if user has access."""
@ -389,26 +469,118 @@ class ChatObjects:
# Message methods
def getMessages(self, workflowId: str) -> List[ChatMessage]:
"""Returns messages for a workflow if user has access to the workflow."""
def getMessages(self, workflowId: str, pagination: Optional[PaginationParams] = None) -> Union[List[ChatMessage], PaginatedResult]:
"""
Returns messages for a workflow if user has access to the workflow.
Supports optional pagination, sorting, and filtering.
Args:
workflowId: The workflow ID to get messages for
pagination: Optional pagination parameters. If None, returns all items.
Returns:
If pagination is None: List[ChatMessage]
If pagination is provided: PaginatedResult with items and metadata
"""
# Check workflow access first (without calling getWorkflow to avoid circular reference)
workflows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
if not workflows:
return []
if pagination is None:
return []
return PaginatedResult(items=[], totalItems=0, totalPages=0)
filteredWorkflows = self._uam(ChatWorkflow, workflows)
if not filteredWorkflows:
return []
if pagination is None:
return []
return PaginatedResult(items=[], totalItems=0, totalPages=0)
# Get messages for this workflow from normalized table
messages = self.db.getRecordset(ChatMessage, recordFilter={"workflowId": workflowId})
# Sort messages by publishedAt timestamp to ensure chronological order
messages.sort(key=lambda x: x.get("publishedAt", x.get("timestamp", "0")))
# Convert raw messages to dict format for sorting/filtering
messageDicts = []
for msg in messages:
messageDicts.append({
"id": msg.get("id"),
"workflowId": msg.get("workflowId"),
"parentMessageId": msg.get("parentMessageId"),
"documentsLabel": msg.get("documentsLabel"),
"message": msg.get("message"),
"role": msg.get("role", "assistant"),
"status": msg.get("status", "step"),
"sequenceNr": msg.get("sequenceNr", 0),
"publishedAt": msg.get("publishedAt", msg.get("timestamp", getUtcTimestamp())),
"success": msg.get("success"),
"actionId": msg.get("actionId"),
"actionMethod": msg.get("actionMethod"),
"actionName": msg.get("actionName"),
"roundNumber": msg.get("roundNumber"),
"taskNumber": msg.get("taskNumber"),
"actionNumber": msg.get("actionNumber"),
"taskProgress": msg.get("taskProgress"),
"actionProgress": msg.get("actionProgress")
})
# Apply default sorting by publishedAt if no sort specified
if pagination is None or not pagination.sort:
messageDicts.sort(key=lambda x: x.get("publishedAt", getUtcTimestamp()))
# Apply filtering (if filters provided)
if pagination and pagination.filters:
messageDicts = self._applyFilters(messageDicts, pagination.filters)
# Apply sorting (in order of sortFields)
if pagination and pagination.sort:
messageDicts = self._applySorting(messageDicts, pagination.sort)
# If no pagination requested, return all items
if pagination is None:
# Convert messages to ChatMessage objects and load documents
chat_messages = []
for msg in messageDicts:
# Load documents from normalized documents table
documents = self.getDocuments(msg["id"])
# Create ChatMessage object with loaded documents
chat_message = ChatMessage(
id=msg["id"],
workflowId=msg["workflowId"],
parentMessageId=msg.get("parentMessageId"),
documents=documents,
documentsLabel=msg.get("documentsLabel"),
message=msg.get("message"),
role=msg.get("role", "assistant"),
status=msg.get("status", "step"),
sequenceNr=msg.get("sequenceNr", 0),
publishedAt=msg.get("publishedAt", getUtcTimestamp()),
success=msg.get("success"),
actionId=msg.get("actionId"),
actionMethod=msg.get("actionMethod"),
actionName=msg.get("actionName"),
roundNumber=msg.get("roundNumber"),
taskNumber=msg.get("taskNumber"),
actionNumber=msg.get("actionNumber"),
taskProgress=msg.get("taskProgress"),
actionProgress=msg.get("actionProgress")
)
chat_messages.append(chat_message)
return chat_messages
# Count total items after filters
totalItems = len(messageDicts)
totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0
# Apply pagination (skip/limit)
startIdx = (pagination.page - 1) * pagination.pageSize
endIdx = startIdx + pagination.pageSize
pagedMessageDicts = messageDicts[startIdx:endIdx]
# Convert messages to ChatMessage objects and load documents
chat_messages = []
for msg in messages:
for msg in pagedMessageDicts:
# Load documents from normalized documents table
documents = self.getDocuments(msg["id"])
@ -437,8 +609,11 @@ class ChatObjects:
chat_messages.append(chat_message)
return chat_messages
return PaginatedResult(
items=chat_messages,
totalItems=totalItems,
totalPages=totalPages
)
def createMessage(self, messageData: Dict[str, Any]) -> ChatMessage:
"""Creates a message for a workflow if user has access."""
@ -765,24 +940,84 @@ class ChatObjects:
# Log methods
def getLogs(self, workflowId: str) -> List[ChatLog]:
"""Returns logs for a workflow if user has access to the workflow."""
def getLogs(self, workflowId: str, pagination: Optional[PaginationParams] = None) -> Union[List[ChatLog], PaginatedResult]:
"""
Returns logs for a workflow if user has access to the workflow.
Supports optional pagination, sorting, and filtering.
Args:
workflowId: The workflow ID to get logs for
pagination: Optional pagination parameters. If None, returns all items.
Returns:
If pagination is None: List[ChatLog]
If pagination is provided: PaginatedResult with items and metadata
"""
# Check workflow access first (without calling getWorkflow to avoid circular reference)
workflows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
if not workflows:
return []
if pagination is None:
return []
return PaginatedResult(items=[], totalItems=0, totalPages=0)
filteredWorkflows = self._uam(ChatWorkflow, workflows)
if not filteredWorkflows:
return []
if pagination is None:
return []
return PaginatedResult(items=[], totalItems=0, totalPages=0)
# Get logs for this workflow from normalized table
logs = self.db.getRecordset(ChatLog, recordFilter={"workflowId": workflowId})
# Sort logs by timestamp (Unix timestamps)
logs.sort(key=lambda x: float(x.get("timestamp", 0)))
# Convert raw logs to dict format for sorting/filtering
logDicts = []
for log in logs:
logDicts.append({
"id": log.get("id"),
"workflowId": log.get("workflowId"),
"message": log.get("message"),
"type": log.get("type"),
"timestamp": log.get("timestamp", getUtcTimestamp()),
"agentName": log.get("agentName"),
"status": log.get("status"),
"progress": log.get("progress"),
"mandateId": log.get("mandateId"),
"userId": log.get("userId")
})
return [ChatLog(**log) for log in logs]
# Apply default sorting by timestamp if no sort specified
if pagination is None or not pagination.sort:
logDicts.sort(key=lambda x: float(x.get("timestamp", 0)))
# Apply filtering (if filters provided)
if pagination and pagination.filters:
logDicts = self._applyFilters(logDicts, pagination.filters)
# Apply sorting (in order of sortFields)
if pagination and pagination.sort:
logDicts = self._applySorting(logDicts, pagination.sort)
# If no pagination requested, return all items
if pagination is None:
return [ChatLog(**log) for log in logDicts]
# Count total items after filters
totalItems = len(logDicts)
totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0
# Apply pagination (skip/limit)
startIdx = (pagination.page - 1) * pagination.pageSize
endIdx = startIdx + pagination.pageSize
pagedLogDicts = logDicts[startIdx:endIdx]
# Convert to model objects
items = [ChatLog(**log) for log in pagedLogDicts]
return PaginatedResult(
items=items,
totalItems=totalItems,
totalPages=totalPages
)
def createLog(self, logData: Dict[str, Any]) -> ChatLog:
"""Creates a log entry for a workflow if user has access."""

View file

@ -7,7 +7,8 @@ import os
import logging
import base64
import hashlib
from typing import Dict, Any, List, Optional
import math
from typing import Dict, Any, List, Optional, Union
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.interfaces.interfaceDbComponentAccess import ComponentAccess
@ -17,6 +18,7 @@ from modules.datamodels.datamodelVoice import VoiceSettings
from modules.datamodels.datamodelUam import User, Mandate
from modules.shared.configuration import APP_CONFIG
from modules.shared.timezoneUtils import getUtcTimestamp
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResult
logger = logging.getLogger(__name__)
@ -244,6 +246,56 @@ class ComponentObjects:
"""Delegate to access control module."""
return self.access.canModify(model_class, recordId)
def _applyFilters(self, records: List[Dict[str, Any]], filters: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Apply filter criteria to records (implementation for future filtering)."""
# TODO: Implement filtering logic when needed
return records
def _applySorting(self, records: List[Dict[str, Any]], sortFields: List[Any]) -> List[Dict[str, Any]]:
"""Apply multi-level sorting to records using stable sort (sorts from least to most significant field)."""
if not sortFields:
return records
# Start with a copy to avoid modifying original
sortedRecords = list(records)
# Sort from least significant to most significant field (reverse order)
# Python's sort is stable, so this creates proper multi-level sorting
for sortField in reversed(sortFields):
# Handle both dict and object formats
if isinstance(sortField, dict):
fieldName = sortField.get("field")
direction = sortField.get("direction", "asc")
else:
fieldName = getattr(sortField, "field", None)
direction = getattr(sortField, "direction", "asc")
if not fieldName:
continue
isDesc = (direction == "desc")
def sortKey(record):
value = record.get(fieldName)
# Handle None values - place them at the end for both directions
if value is None:
# Use a special value that sorts last
return (1, "") # (is_none_flag, empty_value) - sorts after (0, ...)
else:
# Return tuple with type indicator for proper comparison
if isinstance(value, (int, float)):
return (0, value)
elif isinstance(value, str):
return (0, value)
elif isinstance(value, bool):
return (0, value)
else:
return (0, str(value))
# Sort with reverse parameter for descending
sortedRecords.sort(key=sortKey, reverse=isDesc)
return sortedRecords
# Utilities
@ -255,18 +307,57 @@ class ComponentObjects:
# Prompt methods
def getAllPrompts(self) -> List[Prompt]:
"""Returns prompts based on user access level."""
def getAllPrompts(self, pagination: Optional[PaginationParams] = None) -> Union[List[Prompt], PaginatedResult]:
"""
Returns prompts based on user access level.
Supports optional pagination, sorting, and filtering.
Args:
pagination: Optional pagination parameters. If None, returns all items.
Returns:
If pagination is None: List[Prompt]
If pagination is provided: PaginatedResult with items and metadata
"""
try:
allPrompts = self.db.getRecordset(Prompt)
filteredPrompts = self._uam(Prompt, allPrompts)
# Convert to Prompt objects
return [Prompt(**prompt) for prompt in filteredPrompts]
# If no pagination requested, return all items
if pagination is None:
return [Prompt(**prompt) for prompt in filteredPrompts]
# Apply filtering (if filters provided)
if pagination.filters:
filteredPrompts = self._applyFilters(filteredPrompts, pagination.filters)
# Apply sorting (in order of sortFields)
if pagination.sort:
filteredPrompts = self._applySorting(filteredPrompts, pagination.sort)
# Count total items after filters
totalItems = len(filteredPrompts)
totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0
# Apply pagination (skip/limit)
startIdx = (pagination.page - 1) * pagination.pageSize
endIdx = startIdx + pagination.pageSize
pagedPrompts = filteredPrompts[startIdx:endIdx]
# Convert to model objects
items = [Prompt(**prompt) for prompt in pagedPrompts]
return PaginatedResult(
items=items,
totalItems=totalItems,
totalPages=totalPages
)
except Exception as e:
logger.error(f"Error getting prompts: {str(e)}")
return []
if pagination is None:
return []
return PaginatedResult(items=[], totalItems=0, totalPages=0)
def getPrompt(self, promptId: str) -> Optional[Prompt]:
"""Returns a prompt by ID if user has access."""
@ -454,39 +545,79 @@ class ComponentObjects:
# File methods - metadata-based operations
def getAllFiles(self) -> List[FileItem]:
"""Returns files based on user access level."""
def getAllFiles(self, pagination: Optional[PaginationParams] = None) -> Union[List[FileItem], PaginatedResult]:
"""
Returns files based on user access level.
Supports optional pagination, sorting, and filtering.
Args:
pagination: Optional pagination parameters. If None, returns all items.
Returns:
If pagination is None: List[FileItem]
If pagination is provided: PaginatedResult with items and metadata
"""
allFiles = self.db.getRecordset(FileItem)
filteredFiles = self._uam(FileItem, allFiles)
# Convert database records to FileItem instances
fileItems = []
for file in filteredFiles:
try:
# Ensure proper values, use defaults for invalid data
creationDate = file.get("creationDate")
if creationDate is None or not isinstance(creationDate, (int, float)) or creationDate <= 0:
creationDate = getUtcTimestamp()
fileName = file.get("fileName")
if not fileName or fileName == "None":
continue # Skip records with invalid fileName
fileItem = FileItem(
id=file.get("id"),
mandateId=file.get("mandateId"),
fileName=fileName,
mimeType=file.get("mimeType"),
fileHash=file.get("fileHash"),
fileSize=file.get("fileSize"),
creationDate=creationDate
)
fileItems.append(fileItem)
except Exception as e:
logger.warning(f"Skipping invalid file record: {str(e)}")
continue
return fileItems
# Convert database records to FileItem instances (for both paginated and non-paginated)
def convertFileItems(files):
fileItems = []
for file in files:
try:
# Ensure proper values, use defaults for invalid data
creationDate = file.get("creationDate")
if creationDate is None or not isinstance(creationDate, (int, float)) or creationDate <= 0:
creationDate = getUtcTimestamp()
fileName = file.get("fileName")
if not fileName or fileName == "None":
continue # Skip records with invalid fileName
fileItem = FileItem(
id=file.get("id"),
mandateId=file.get("mandateId"),
fileName=fileName,
mimeType=file.get("mimeType"),
fileHash=file.get("fileHash"),
fileSize=file.get("fileSize"),
creationDate=creationDate
)
fileItems.append(fileItem)
except Exception as e:
logger.warning(f"Skipping invalid file record: {str(e)}")
continue
return fileItems
# If no pagination requested, return all items
if pagination is None:
return convertFileItems(filteredFiles)
# Apply filtering (if filters provided)
if pagination.filters:
filteredFiles = self._applyFilters(filteredFiles, pagination.filters)
# Apply sorting (in order of sortFields)
if pagination.sort:
filteredFiles = self._applySorting(filteredFiles, pagination.sort)
# Count total items after filters
totalItems = len(filteredFiles)
totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0
# Apply pagination (skip/limit)
startIdx = (pagination.page - 1) * pagination.pageSize
endIdx = startIdx + pagination.pageSize
pagedFiles = filteredFiles[startIdx:endIdx]
# Convert to model objects
items = convertFileItems(pagedFiles)
return PaginatedResult(
items=items,
totalItems=totalItems,
totalPages=totalPages
)
def getFile(self, fileId: str) -> Optional[FileItem]:
"""Returns a file by ID if user has access."""

View file

@ -2,6 +2,7 @@ from fastapi import APIRouter, HTTPException, Depends, File, UploadFile, Form, P
from fastapi.responses import JSONResponse
from typing import List, Dict, Any, Optional
import logging
import json
# Import auth module
from modules.security.auth import limiter, getCurrentUser
@ -11,6 +12,7 @@ import modules.interfaces.interfaceDbComponentObjects as interfaceDbComponentObj
from modules.datamodels.datamodelFiles import FileItem, FilePreview
from modules.shared.attributeUtils import getModelAttributeDefinitions
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata
# Configure logger
logger = logging.getLogger(__name__)
@ -31,21 +33,61 @@ router = APIRouter(
}
)
@router.get("/list", response_model=List[FileItem])
@router.get("/list", response_model=PaginatedResponse[FileItem])
@limiter.limit("30/minute")
async def get_files(
request: Request,
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
currentUser: User = Depends(getCurrentUser)
) -> List[FileItem]:
"""Get all files"""
) -> PaginatedResponse[FileItem]:
"""
Get files with optional pagination, sorting, and filtering.
Query Parameters:
- pagination: JSON-encoded PaginationParams object, or None for no pagination
Examples:
- GET /api/files/list (no pagination - returns all items)
- GET /api/files/list?pagination={"page":1,"pageSize":10,"sort":[]}
- GET /api/files/list?pagination={"page":2,"pageSize":20,"sort":[{"field":"fileName","direction":"asc"}]}
"""
try:
# Parse pagination parameter
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
paginationParams = PaginationParams(**paginationDict) if paginationDict else None
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=400,
detail=f"Invalid pagination parameter: {str(e)}"
)
managementInterface = interfaceDbComponentObjects.getInterface(currentUser)
result = managementInterface.getAllFiles(pagination=paginationParams)
# Get all files generically - only metadata, no binary data
files = managementInterface.getAllFiles()
# Return files directly since they are already FileItem objects
return files
# If pagination was requested, result is PaginatedResult
# If no pagination, result is List[FileItem]
if paginationParams:
return PaginatedResponse(
items=result.items,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=result.totalItems,
totalPages=result.totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
else:
return PaginatedResponse(
items=result,
pagination=None
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting files: {str(e)}")
raise HTTPException(

View file

@ -3,10 +3,11 @@ Mandate routes for the backend API.
Implements the endpoints for mandate management.
"""
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response
from typing import List, Dict, Any
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response, Query
from typing import List, Dict, Any, Optional
from fastapi import status
import logging
import json
# Import auth module
from modules.security.auth import limiter, getCurrentUser
@ -17,6 +18,7 @@ from modules.shared.attributeUtils import getModelAttributeDefinitions
# Import the model classes
from modules.datamodels.datamodelUam import Mandate, User
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata
# Configure logger
logger = logging.getLogger(__name__)
@ -31,17 +33,60 @@ router = APIRouter(
responses={404: {"description": "Not found"}}
)
@router.get("/", response_model=List[Mandate])
@router.get("/", response_model=PaginatedResponse[Mandate])
@limiter.limit("30/minute")
async def get_mandates(
request: Request,
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
currentUser: User = Depends(getCurrentUser)
) -> List[Mandate]:
"""Get all mandates"""
) -> PaginatedResponse[Mandate]:
"""
Get mandates with optional pagination, sorting, and filtering.
Query Parameters:
- pagination: JSON-encoded PaginationParams object, or None for no pagination
Examples:
- GET /api/mandates/ (no pagination - returns all items)
- GET /api/mandates/?pagination={"page":1,"pageSize":10,"sort":[]}
"""
try:
# Parse pagination parameter
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
paginationParams = PaginationParams(**paginationDict) if paginationDict else None
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=400,
detail=f"Invalid pagination parameter: {str(e)}"
)
appInterface = interfaceDbAppObjects.getInterface(currentUser)
mandates = appInterface.getAllMandates()
return mandates
result = appInterface.getAllMandates(pagination=paginationParams)
# If pagination was requested, result is PaginatedResult
# If no pagination, result is List[Mandate]
if paginationParams:
return PaginatedResponse(
items=result.items,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=result.totalItems,
totalPages=result.totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
else:
return PaginatedResponse(
items=result,
pagination=None
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting mandates: {str(e)}")
raise HTTPException(

View file

@ -1,7 +1,8 @@
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request
from typing import List, Dict, Any
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Query
from typing import List, Dict, Any, Optional
from fastapi import status
import logging
import json
# Import auth module
from modules.security.auth import limiter, getCurrentUser
@ -10,6 +11,7 @@ from modules.security.auth import limiter, getCurrentUser
import modules.interfaces.interfaceDbComponentObjects as interfaceDbComponentObjects
from modules.datamodels.datamodelUtils import Prompt
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata
# Configure logger
logger = logging.getLogger(__name__)
@ -21,16 +23,58 @@ router = APIRouter(
responses={404: {"description": "Not found"}}
)
@router.get("", response_model=List[Prompt])
@router.get("", response_model=PaginatedResponse[Prompt])
@limiter.limit("30/minute")
async def get_prompts(
request: Request,
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
currentUser: User = Depends(getCurrentUser)
) -> List[Prompt]:
"""Get all prompts"""
) -> PaginatedResponse[Prompt]:
"""
Get prompts with optional pagination, sorting, and filtering.
Query Parameters:
- pagination: JSON-encoded PaginationParams object, or None for no pagination
Examples:
- GET /api/prompts (no pagination - returns all items)
- GET /api/prompts?pagination={"page":1,"pageSize":10,"sort":[]}
- GET /api/prompts?pagination={"page":2,"pageSize":20,"sort":[{"field":"name","direction":"asc"}]}
"""
# Parse pagination parameter
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
paginationParams = PaginationParams(**paginationDict) if paginationDict else None
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=400,
detail=f"Invalid pagination parameter: {str(e)}"
)
managementInterface = interfaceDbComponentObjects.getInterface(currentUser)
prompts = managementInterface.getAllPrompts()
return prompts
result = managementInterface.getAllPrompts(pagination=paginationParams)
# If pagination was requested, result is PaginatedResult
# If no pagination, result is List[Prompt]
if paginationParams:
return PaginatedResponse(
items=result.items,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=result.totalItems,
totalPages=result.totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
else:
return PaginatedResponse(
items=result,
pagination=None
)
@router.post("", response_model=Prompt)
@limiter.limit("10/minute")

View file

@ -3,10 +3,11 @@ User routes for the backend API.
Implements the endpoints for user management.
"""
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response, Query
from typing import List, Dict, Any, Optional
from fastapi import status
import logging
import json
# Import interfaces and models
import modules.interfaces.interfaceDbAppObjects as interfaceDbAppObjects
@ -14,6 +15,7 @@ from modules.security.auth import getCurrentUser, limiter, getCurrentUser
# Import the attribute definition and helper functions
from modules.datamodels.datamodelUam import User, UserPrivilege
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata
# Configure logger
logger = logging.getLogger(__name__)
@ -24,21 +26,65 @@ router = APIRouter(
responses={404: {"description": "Not found"}}
)
@router.get("/", response_model=List[User])
@router.get("/", response_model=PaginatedResponse[User])
@limiter.limit("30/minute")
async def get_users(
request: Request,
mandateId: Optional[str] = None,
mandateId: Optional[str] = Query(None, description="Mandate ID to filter users"),
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
currentUser: User = Depends(getCurrentUser)
) -> List[User]:
"""Get all users in the current mandate"""
) -> PaginatedResponse[User]:
"""
Get users with optional pagination, sorting, and filtering.
Query Parameters:
- mandateId: Optional mandate ID to filter users
- pagination: JSON-encoded PaginationParams object, or None for no pagination
Examples:
- GET /api/users/ (no pagination - returns all users)
- GET /api/users/?pagination={"page":1,"pageSize":10,"sort":[]}
"""
try:
# Parse pagination parameter
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
paginationParams = PaginationParams(**paginationDict) if paginationDict else None
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=400,
detail=f"Invalid pagination parameter: {str(e)}"
)
appInterface = interfaceDbAppObjects.getInterface(currentUser)
# If mandateId is provided, use it, otherwise use the current user's mandate
targetMandateId = mandateId or currentUser.mandateId
# Get all users without filtering by enabled status
users = appInterface.getUsersByMandate(targetMandateId)
return users
# Get users with optional pagination
result = appInterface.getUsersByMandate(targetMandateId, pagination=paginationParams)
# If pagination was requested, result is PaginatedResult
# If no pagination, result is List[User]
if paginationParams:
return PaginatedResponse(
items=result.items,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=result.totalItems,
totalPages=result.totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
else:
return PaginatedResponse(
items=result,
pagination=None
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting users: {str(e)}")
raise HTTPException(

View file

@ -325,9 +325,17 @@ async def get_available_languages(currentUser: User = Depends(getCurrentUser)):
@router.get("/voices")
async def get_available_voices(
languageCode: Optional[str] = None,
language_code: Optional[str] = None, # Accept both camelCase and snake_case
currentUser: User = Depends(getCurrentUser)
):
"""Get available voices from Google Cloud Text-to-Speech."""
"""
Get available voices from Google Cloud Text-to-Speech.
Accepts languageCode (camelCase) or language_code (snake_case) query parameter.
"""
# Use language_code if provided (frontend sends this), otherwise use languageCode
if language_code:
languageCode = language_code
try:
logger.info(f"🎤 Getting available voices, language filter: {languageCode}")

View file

@ -4,6 +4,7 @@ Implements the endpoints for workflow management according to the state machine.
"""
import logging
import json
from typing import List, Dict, Any, Optional
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Response, status, Request
@ -24,6 +25,7 @@ from modules.datamodels.datamodelChat import (
)
from modules.shared.attributeUtils import getModelAttributeDefinitions, AttributeResponse
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata
# Configure logger
@ -43,18 +45,51 @@ def getServiceChat(currentUser: User):
return interfaceDbChatObjects.getInterface(currentUser)
# Consolidated endpoint for getting all workflows
@router.get("/", response_model=List[ChatWorkflow])
@router.get("/", response_model=PaginatedResponse[ChatWorkflow])
@limiter.limit("120/minute")
async def get_workflows(
request: Request,
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
currentUser: User = Depends(getCurrentUser)
) -> List[ChatWorkflow]:
"""Get all workflows for the current user."""
) -> PaginatedResponse[ChatWorkflow]:
"""
Get workflows with optional pagination, sorting, and filtering.
Query Parameters:
- pagination: JSON-encoded PaginationParams object, or None for no pagination
Examples:
- GET /api/workflows/ (no pagination - returns all workflows)
- GET /api/workflows/?pagination={"page":1,"pageSize":10,"sort":[]}
"""
try:
# Parse pagination parameter
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
paginationParams = PaginationParams(**paginationDict) if paginationDict else None
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=400,
detail=f"Invalid pagination parameter: {str(e)}"
)
appInterface = getInterface(currentUser)
workflows_data = appInterface.getWorkflows()
result = appInterface.getWorkflows(pagination=paginationParams)
# Convert raw dictionaries to ChatWorkflow objects by loading each workflow properly
# If pagination was requested, result is PaginatedResult with items as dicts
# If no pagination, result is List[Dict]
if paginationParams:
workflows_data = result.items
totalItems = result.totalItems
totalPages = result.totalPages
else:
workflows_data = result
totalItems = len(result)
totalPages = 1
workflows = []
for workflow_data in workflows_data:
try:
@ -67,7 +102,25 @@ async def get_workflows(
# Skip invalid workflows instead of failing the entire request
continue
return workflows
if paginationParams:
return PaginatedResponse(
items=workflows,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=totalItems,
totalPages=totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
else:
return PaginatedResponse(
items=workflows,
pagination=None
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting workflows: {str(e)}")
raise HTTPException(
@ -185,16 +238,36 @@ async def get_workflow_status(
)
# API Endpoint for workflow logs with selective data transfer
@router.get("/{workflowId}/logs", response_model=List[ChatLog])
@router.get("/{workflowId}/logs", response_model=PaginatedResponse[ChatLog])
@limiter.limit("120/minute")
async def get_workflow_logs(
request: Request,
workflowId: str = Path(..., description="ID of the workflow"),
logId: Optional[str] = Query(None, description="Optional log ID to get only newer logs"),
logId: Optional[str] = Query(None, description="Optional log ID to get only newer logs (legacy selective data transfer)"),
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
currentUser: User = Depends(getCurrentUser)
) -> List[ChatLog]:
"""Get logs for a workflow with support for selective data transfer."""
) -> PaginatedResponse[ChatLog]:
"""
Get logs for a workflow with optional pagination, sorting, and filtering.
Also supports legacy selective data transfer via logId parameter.
Query Parameters:
- logId: Optional log ID for selective data transfer (returns only logs after this ID)
- pagination: JSON-encoded PaginationParams object, or None for no pagination
"""
try:
# Parse pagination parameter
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
paginationParams = PaginationParams(**paginationDict) if paginationDict else None
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=400,
detail=f"Invalid pagination parameter: {str(e)}"
)
# Get service center
interfaceDbChat = getServiceChat(currentUser)
@ -206,18 +279,43 @@ async def get_workflow_logs(
detail=f"Workflow with ID {workflowId} not found"
)
# Get all logs
allLogs = interfaceDbChat.getLogs(workflowId)
# Get logs with optional pagination
result = interfaceDbChat.getLogs(workflowId, pagination=paginationParams)
# Apply selective data transfer if logId is provided
# Handle legacy selective data transfer if logId is provided (takes precedence over pagination)
if logId:
# If pagination was requested, result is PaginatedResult, otherwise List[ChatLog]
allLogs = result.items if paginationParams else result
# Find the index of the log with the given ID
logIndex = next((i for i, log in enumerate(allLogs) if log.id == logId), -1)
if logIndex >= 0:
# Return only logs after the specified log
return allLogs[logIndex + 1:]
filteredLogs = allLogs[logIndex + 1:]
return PaginatedResponse(
items=filteredLogs,
pagination=None
)
return allLogs
# If pagination was requested, result is PaginatedResult
# If no pagination, result is List[ChatLog]
if paginationParams:
return PaginatedResponse(
items=result.items,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=result.totalItems,
totalPages=result.totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
else:
return PaginatedResponse(
items=result,
pagination=None
)
except HTTPException:
raise
except Exception as e:
@ -228,16 +326,36 @@ async def get_workflow_logs(
)
# API Endpoint for workflow messages with selective data transfer
@router.get("/{workflowId}/messages", response_model=List[ChatMessage])
@router.get("/{workflowId}/messages", response_model=PaginatedResponse[ChatMessage])
@limiter.limit("120/minute")
async def get_workflow_messages(
request: Request,
workflowId: str = Path(..., description="ID of the workflow"),
messageId: Optional[str] = Query(None, description="Optional message ID to get only newer messages"),
messageId: Optional[str] = Query(None, description="Optional message ID to get only newer messages (legacy selective data transfer)"),
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
currentUser: User = Depends(getCurrentUser)
) -> List[ChatMessage]:
"""Get messages for a workflow with support for selective data transfer."""
) -> PaginatedResponse[ChatMessage]:
"""
Get messages for a workflow with optional pagination, sorting, and filtering.
Also supports legacy selective data transfer via messageId parameter.
Query Parameters:
- messageId: Optional message ID for selective data transfer (returns only messages after this ID)
- pagination: JSON-encoded PaginationParams object, or None for no pagination
"""
try:
# Parse pagination parameter
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
paginationParams = PaginationParams(**paginationDict) if paginationDict else None
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=400,
detail=f"Invalid pagination parameter: {str(e)}"
)
# Get service center
interfaceDbChat = getServiceChat(currentUser)
@ -249,19 +367,43 @@ async def get_workflow_messages(
detail=f"Workflow with ID {workflowId} not found"
)
# Get all messages
allMessages = interfaceDbChat.getMessages(workflowId)
# Get messages with optional pagination
result = interfaceDbChat.getMessages(workflowId, pagination=paginationParams)
# Apply selective data transfer if messageId is provided
# Handle legacy selective data transfer if messageId is provided (takes precedence over pagination)
if messageId:
# If pagination was requested, result is PaginatedResult, otherwise List[ChatMessage]
allMessages = result.items if paginationParams else result
# Find the index of the message with the given ID
messageIndex = next((i for i, msg in enumerate(allMessages) if msg.id == messageId), -1)
if messageIndex >= 0:
# Return only messages after the specified message
filteredMessages = allMessages[messageIndex + 1:]
return filteredMessages
return PaginatedResponse(
items=filteredMessages,
pagination=None
)
return allMessages
# If pagination was requested, result is PaginatedResult
# If no pagination, result is List[ChatMessage]
if paginationParams:
return PaginatedResponse(
items=result.items,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=result.totalItems,
totalPages=result.totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
else:
return PaginatedResponse(
items=result,
pagination=None
)
except HTTPException:
raise
except Exception as e:

View file

@ -7,6 +7,9 @@ from typing import Dict, Any, List, Type, Optional
import inspect
import importlib
import os
import logging
logger = logging.getLogger(__name__)
# Define the AttributeDefinition class here instead of importing it
@ -107,7 +110,9 @@ def getModelAttributeDefinitions(modelClass: Type[BaseModel] = None, userLanguag
fields = modelClass.model_fields
for name, field in fields.items():
# Extract frontend metadata from field info
field_info = field.field_info if hasattr(field, "field_info") else None
# In Pydantic v2, the field from model_fields.items() IS the FieldInfo object
# FieldInfo objects have the 'extra' dict directly on them
field_info = field
# Check both direct attributes and extra field for frontend metadata
frontend_type = None
frontend_readonly = False
@ -115,7 +120,7 @@ def getModelAttributeDefinitions(modelClass: Type[BaseModel] = None, userLanguag
frontend_options = None
if field_info:
# Try direct attributes first
# Try direct attributes first (though these won't exist for custom kwargs)
frontend_type = getattr(field_info, "frontend_type", None)
frontend_readonly = getattr(field_info, "frontend_readonly", False)
frontend_required = getattr(
@ -123,22 +128,43 @@ def getModelAttributeDefinitions(modelClass: Type[BaseModel] = None, userLanguag
)
frontend_options = getattr(field_info, "frontend_options", None)
# If not found, check extra field
if hasattr(field_info, "extra") and field_info.extra:
if frontend_type is None:
frontend_type = field_info.extra.get("frontend_type")
# If not found, check json_schema_extra (Pydantic v2 stores custom kwargs here)
if frontend_type is None and hasattr(field_info, "json_schema_extra"):
json_extra = field_info.json_schema_extra
if isinstance(json_extra, dict):
frontend_type = json_extra.get("frontend_type")
elif callable(json_extra):
# If it's a callable, we can't easily extract from it, skip
pass
# Check extra field (Pydantic v2 stores custom kwargs here)
# This is the main location where frontend_type is stored
if hasattr(field_info, "extra"):
extra_dict = field_info.extra
if isinstance(extra_dict, dict):
if frontend_type is None:
frontend_type = extra_dict.get("frontend_type")
# Also extract other fields from extra if it exists
if hasattr(field_info, "extra") and isinstance(field_info.extra, dict):
extra_dict = field_info.extra
if not frontend_readonly:
frontend_readonly = field_info.extra.get(
"frontend_readonly", False
)
if (
frontend_required == field.is_required()
): # Only override if we didn't get it from direct attribute
frontend_required = field_info.extra.get(
"frontend_required", frontend_required
)
frontend_readonly = extra_dict.get("frontend_readonly", False)
if frontend_required == field.is_required():
frontend_required = extra_dict.get("frontend_required", frontend_required)
if frontend_options is None:
frontend_options = field_info.extra.get("frontend_options")
frontend_options = extra_dict.get("frontend_options")
# Also check json_schema_extra for other fields
if hasattr(field_info, "json_schema_extra"):
json_extra = field_info.json_schema_extra
if isinstance(json_extra, dict):
if not frontend_readonly and "frontend_readonly" in json_extra:
frontend_readonly = json_extra.get("frontend_readonly", False)
if frontend_required == field.is_required() and "frontend_required" in json_extra:
frontend_required = json_extra.get("frontend_required", frontend_required)
if frontend_options is None and "frontend_options" in json_extra:
frontend_options = json_extra.get("frontend_options")
# Use frontend type if available, otherwise fall back to Python type
field_type = (