From 4d3ca3342a513e8da6a2a4cfb829044ddde48583 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Sat, 1 Nov 2025 23:00:55 +0100
Subject: [PATCH] table rendering with proper api controled rendering
---
modules/datamodels/datamodelPagination.py | 72 +++++
modules/interfaces/interfaceDbAppObjects.py | 146 ++++++++-
modules/interfaces/interfaceDbChatObjects.py | 295 ++++++++++++++++--
.../interfaces/interfaceDbComponentObjects.py | 203 +++++++++---
modules/routes/routeDataFiles.py | 58 +++-
modules/routes/routeDataMandates.py | 59 +++-
modules/routes/routeDataPrompts.py | 58 +++-
modules/routes/routeDataUsers.py | 62 +++-
modules/routes/routeVoiceGoogle.py | 10 +-
modules/routes/routeWorkflows.py | 188 +++++++++--
modules/shared/attributeUtils.py | 58 +++-
11 files changed, 1065 insertions(+), 144 deletions(-)
create mode 100644 modules/datamodels/datamodelPagination.py
diff --git a/modules/datamodels/datamodelPagination.py b/modules/datamodels/datamodelPagination.py
new file mode 100644
index 00000000..3222e0e7
--- /dev/null
+++ b/modules/datamodels/datamodelPagination.py
@@ -0,0 +1,72 @@
+"""
+Pagination models for server-side pagination, sorting, and filtering.
+
+All models use camelStyle naming convention for consistency with frontend.
+"""
+
+from typing import List, Dict, Any, Optional, Generic, TypeVar
+from pydantic import BaseModel, Field
+import math
+
+T = TypeVar('T')
+
+
+class SortField(BaseModel):
+ """
+ Single sort field configuration.
+ """
+ field: str = Field(..., description="Field name to sort by")
+ direction: str = Field(..., description="Sort direction: 'asc' or 'desc'")
+
+
+class PaginationParams(BaseModel):
+ """
+ Complete pagination state including page, sorting, and filters.
+ """
+ page: int = Field(ge=1, description="Current page number (1-based)")
+ pageSize: int = Field(ge=1, le=1000, description="Number of items per page")
+ sort: List[SortField] = Field(default_factory=list, description="List of sort fields in priority order")
+ filters: Optional[Dict[str, Any]] = Field(default=None, description="Filter criteria (structure TBD for future implementation)")
+
+
+class PaginationRequest(BaseModel):
+ """
+ Pagination request parameters sent from frontend to backend.
+ All fields are optional. If pagination=None, no pagination is applied.
+ """
+ pagination: Optional[PaginationParams] = None
+
+
+class PaginatedResult(BaseModel):
+ """
+ Internal result structure from interface layer.
+ Used when pagination is requested.
+ """
+ items: List[Any]
+ totalItems: int
+ totalPages: int # Calculated as: math.ceil(totalItems / pageSize)
+
+
+class PaginationMetadata(BaseModel):
+ """
+ Pagination metadata returned to frontend for rendering controls.
+ Contains all information needed to render pagination UI and handle user interactions.
+ """
+ currentPage: int = Field(..., description="Current page number (1-based)")
+ pageSize: int = Field(..., description="Number of items per page")
+ totalItems: int = Field(..., description="Total number of items across all pages (after filters)")
+ totalPages: int = Field(..., description="Total number of pages (calculated from totalItems / pageSize)")
+ sort: List[SortField] = Field(..., description="Current sort configuration applied")
+ filters: Optional[Dict[str, Any]] = Field(default=None, description="Current filters applied (for future use)")
+
+
+class PaginatedResponse(BaseModel, Generic[T]):
+ """
+ Response containing paginated data and metadata.
+ """
+ items: List[T] = Field(..., description="Array of items for current page")
+ pagination: Optional[PaginationMetadata] = Field(..., description="Pagination metadata (None if pagination not applied)")
+
+ class Config:
+ arbitrary_types_allowed = True
+
diff --git a/modules/interfaces/interfaceDbAppObjects.py b/modules/interfaces/interfaceDbAppObjects.py
index 8da37568..938fe756 100644
--- a/modules/interfaces/interfaceDbAppObjects.py
+++ b/modules/interfaces/interfaceDbAppObjects.py
@@ -4,7 +4,8 @@ Manages users and mandates for authentication.
"""
import logging
-from typing import Dict, Any, List, Optional
+import math
+from typing import Dict, Any, List, Optional, Union
from passlib.context import CryptContext
import uuid
@@ -26,6 +27,7 @@ from modules.datamodels.datamodelNeutralizer import (
DataNeutraliserConfig,
DataNeutralizerAttributes,
)
+from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResult
logger = logging.getLogger(__name__)
@@ -233,6 +235,57 @@ class AppObjects:
"""
return self.access.canModify(model_class, recordId)
+ def _applyFilters(self, records: List[Dict[str, Any]], filters: Dict[str, Any]) -> List[Dict[str, Any]]:
+ """Apply filter criteria to records (implementation for future filtering)."""
+ # TODO: Implement filtering logic when needed
+ return records
+
+ def _applySorting(self, records: List[Dict[str, Any]], sortFields: List[Any]) -> List[Dict[str, Any]]:
+ """Apply multi-level sorting to records using stable sort (sorts from least to most significant field)."""
+ if not sortFields:
+ return records
+
+ # Start with a copy to avoid modifying original
+ sortedRecords = list(records)
+
+ # Sort from least significant to most significant field (reverse order)
+ # Python's sort is stable, so this creates proper multi-level sorting
+ for sortField in reversed(sortFields):
+ # Handle both dict and object formats
+ if isinstance(sortField, dict):
+ fieldName = sortField.get("field")
+ direction = sortField.get("direction", "asc")
+ else:
+ fieldName = getattr(sortField, "field", None)
+ direction = getattr(sortField, "direction", "asc")
+
+ if not fieldName:
+ continue
+
+ isDesc = (direction == "desc")
+
+ def sortKey(record):
+ value = record.get(fieldName)
+ # Handle None values - place them at the end for both directions
+ if value is None:
+ # Use a special value that sorts last
+ return (1, "") # (is_none_flag, empty_value) - sorts after (0, ...)
+ else:
+ # Return tuple with type indicator for proper comparison
+ if isinstance(value, (int, float)):
+ return (0, value)
+ elif isinstance(value, str):
+ return (0, value)
+ elif isinstance(value, bool):
+ return (0, value)
+ else:
+ return (0, str(value))
+
+ # Sort with reverse parameter for descending
+ sortedRecords.sort(key=sortKey, reverse=isDesc)
+
+ return sortedRecords
+
def getInitialId(self, model_class: type) -> Optional[str]:
"""Returns the initial ID for a table."""
return self.db.getInitialId(model_class)
@@ -247,14 +300,52 @@ class AppObjects:
# User methods
- def getUsersByMandate(self, mandateId: str) -> List[User]:
- """Returns users for a specific mandate if user has access."""
+ def getUsersByMandate(self, mandateId: str, pagination: Optional[PaginationParams] = None) -> Union[List[User], PaginatedResult]:
+ """
+ Returns users for a specific mandate if user has access.
+ Supports optional pagination, sorting, and filtering.
+
+ Args:
+ mandateId: The mandate ID to get users for
+ pagination: Optional pagination parameters. If None, returns all items.
+
+ Returns:
+ If pagination is None: List[User]
+ If pagination is provided: PaginatedResult with items and metadata
+ """
# Get users for this mandate
users = self.db.getRecordset(UserInDB, recordFilter={"mandateId": mandateId})
filteredUsers = self._uam(UserInDB, users)
- # Convert to User models
- return [User(**user) for user in filteredUsers]
+ # If no pagination requested, return all items
+ if pagination is None:
+ return [User(**user) for user in filteredUsers]
+
+ # Apply filtering (if filters provided)
+ if pagination.filters:
+ filteredUsers = self._applyFilters(filteredUsers, pagination.filters)
+
+ # Apply sorting (in order of sortFields)
+ if pagination.sort:
+ filteredUsers = self._applySorting(filteredUsers, pagination.sort)
+
+ # Count total items after filters
+ totalItems = len(filteredUsers)
+ totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0
+
+ # Apply pagination (skip/limit)
+ startIdx = (pagination.page - 1) * pagination.pageSize
+ endIdx = startIdx + pagination.pageSize
+ pagedUsers = filteredUsers[startIdx:endIdx]
+
+ # Convert to model objects
+ items = [User(**user) for user in pagedUsers]
+
+ return PaginatedResult(
+ items=items,
+ totalItems=totalItems,
+ totalPages=totalPages
+ )
def getUserByUsername(self, username: str) -> Optional[User]:
"""Returns a user by username."""
@@ -638,11 +729,50 @@ class AppObjects:
# Mandate methods
- def getAllMandates(self) -> List[Mandate]:
- """Returns all mandates based on user access level."""
+ def getAllMandates(self, pagination: Optional[PaginationParams] = None) -> Union[List[Mandate], PaginatedResult]:
+ """
+ Returns all mandates based on user access level.
+ Supports optional pagination, sorting, and filtering.
+
+ Args:
+ pagination: Optional pagination parameters. If None, returns all items.
+
+ Returns:
+ If pagination is None: List[Mandate]
+ If pagination is provided: PaginatedResult with items and metadata
+ """
allMandates = self.db.getRecordset(Mandate)
filteredMandates = self._uam(Mandate, allMandates)
- return [Mandate(**mandate) for mandate in filteredMandates]
+
+ # If no pagination requested, return all items
+ if pagination is None:
+ return [Mandate(**mandate) for mandate in filteredMandates]
+
+ # Apply filtering (if filters provided)
+ if pagination.filters:
+ filteredMandates = self._applyFilters(filteredMandates, pagination.filters)
+
+ # Apply sorting (in order of sortFields)
+ if pagination.sort:
+ filteredMandates = self._applySorting(filteredMandates, pagination.sort)
+
+ # Count total items after filters
+ totalItems = len(filteredMandates)
+ totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0
+
+ # Apply pagination (skip/limit)
+ startIdx = (pagination.page - 1) * pagination.pageSize
+ endIdx = startIdx + pagination.pageSize
+ pagedMandates = filteredMandates[startIdx:endIdx]
+
+ # Convert to model objects
+ items = [Mandate(**mandate) for mandate in pagedMandates]
+
+ return PaginatedResult(
+ items=items,
+ totalItems=totalItems,
+ totalPages=totalPages
+ )
def getMandate(self, mandateId: str) -> Optional[Mandate]:
"""Returns a mandate by ID if user has access."""
diff --git a/modules/interfaces/interfaceDbChatObjects.py b/modules/interfaces/interfaceDbChatObjects.py
index deea239a..78f8a2c5 100644
--- a/modules/interfaces/interfaceDbChatObjects.py
+++ b/modules/interfaces/interfaceDbChatObjects.py
@@ -3,24 +3,16 @@ Interface to LucyDOM database and AI Connectors.
Uses the JSON connector for data access with added language support.
"""
-import os
import logging
import uuid
-from datetime import datetime, UTC, timezone
-from typing import Dict, Any, List, Optional, Union, get_origin, get_args
+import math
+from typing import Dict, Any, List, Optional, Union
import asyncio
from modules.interfaces.interfaceDbChatAccess import ChatAccess
+
from modules.datamodels.datamodelChat import (
- ActionItem,
- TaskResult,
- TaskItem,
- TaskStatus,
- ActionResult
-)
-from modules.datamodels.datamodelChat import (
- UserInputRequest,
ChatDocument,
ChatStat,
ChatLog,
@@ -32,6 +24,7 @@ from modules.datamodels.datamodelUam import User
# DYNAMIC PART: Connectors to the Interface
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.timezoneUtils import getUtcTimestamp
+from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResult
# Basic Configurations
from modules.shared.configuration import APP_CONFIG
@@ -197,6 +190,56 @@ class ChatObjects:
"""Delegate to access control module."""
return self.access.canModify(model_class, recordId)
+ def _applyFilters(self, records: List[Dict[str, Any]], filters: Dict[str, Any]) -> List[Dict[str, Any]]:
+ """Apply filter criteria to records (implementation for future filtering)."""
+ # TODO: Implement filtering logic when needed
+ return records
+
+ def _applySorting(self, records: List[Dict[str, Any]], sortFields: List[Any]) -> List[Dict[str, Any]]:
+ """Apply multi-level sorting to records using stable sort (sorts from least to most significant field)."""
+ if not sortFields:
+ return records
+
+ # Start with a copy to avoid modifying original
+ sortedRecords = list(records)
+
+ # Sort from least significant to most significant field (reverse order)
+ # Python's sort is stable, so this creates proper multi-level sorting
+ for sortField in reversed(sortFields):
+ # Handle both dict and object formats
+ if isinstance(sortField, dict):
+ fieldName = sortField.get("field")
+ direction = sortField.get("direction", "asc")
+ else:
+ fieldName = getattr(sortField, "field", None)
+ direction = getattr(sortField, "direction", "asc")
+
+ if not fieldName:
+ continue
+
+ isDesc = (direction == "desc")
+
+ def sortKey(record):
+ value = record.get(fieldName)
+ # Handle None values - place them at the end for both directions
+ if value is None:
+ # Use a special value that sorts last
+ return (1, "") # (is_none_flag, empty_value) - sorts after (0, ...)
+ else:
+ # Return tuple with type indicator for proper comparison
+ if isinstance(value, (int, float)):
+ return (0, value)
+ elif isinstance(value, str):
+ return (0, value)
+ elif isinstance(value, bool):
+ return (0, value)
+ else:
+ return (0, str(value))
+
+ # Sort with reverse parameter for descending
+ sortedRecords.sort(key=sortKey, reverse=isDesc)
+
+ return sortedRecords
# Utilities
@@ -208,10 +251,47 @@ class ChatObjects:
# Workflow methods
- def getWorkflows(self) -> List[Dict[str, Any]]:
- """Returns workflows based on user access level."""
+ def getWorkflows(self, pagination: Optional[PaginationParams] = None) -> Union[List[Dict[str, Any]], PaginatedResult]:
+ """
+ Returns workflows based on user access level.
+ Supports optional pagination, sorting, and filtering.
+
+ Args:
+ pagination: Optional pagination parameters. If None, returns all items.
+
+ Returns:
+ If pagination is None: List[Dict[str, Any]]
+ If pagination is provided: PaginatedResult with items and metadata
+ """
allWorkflows = self.db.getRecordset(ChatWorkflow)
- return self._uam(ChatWorkflow, allWorkflows)
+ filteredWorkflows = self._uam(ChatWorkflow, allWorkflows)
+
+ # If no pagination requested, return all items
+ if pagination is None:
+ return filteredWorkflows
+
+ # Apply filtering (if filters provided)
+ if pagination.filters:
+ filteredWorkflows = self._applyFilters(filteredWorkflows, pagination.filters)
+
+ # Apply sorting (in order of sortFields)
+ if pagination.sort:
+ filteredWorkflows = self._applySorting(filteredWorkflows, pagination.sort)
+
+ # Count total items after filters
+ totalItems = len(filteredWorkflows)
+ totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0
+
+ # Apply pagination (skip/limit)
+ startIdx = (pagination.page - 1) * pagination.pageSize
+ endIdx = startIdx + pagination.pageSize
+ pagedWorkflows = filteredWorkflows[startIdx:endIdx]
+
+ return PaginatedResult(
+ items=pagedWorkflows,
+ totalItems=totalItems,
+ totalPages=totalPages
+ )
def getWorkflow(self, workflowId: str) -> Optional[ChatWorkflow]:
"""Returns a workflow by ID if user has access."""
@@ -389,26 +469,118 @@ class ChatObjects:
# Message methods
- def getMessages(self, workflowId: str) -> List[ChatMessage]:
- """Returns messages for a workflow if user has access to the workflow."""
+ def getMessages(self, workflowId: str, pagination: Optional[PaginationParams] = None) -> Union[List[ChatMessage], PaginatedResult]:
+ """
+ Returns messages for a workflow if user has access to the workflow.
+ Supports optional pagination, sorting, and filtering.
+
+ Args:
+ workflowId: The workflow ID to get messages for
+ pagination: Optional pagination parameters. If None, returns all items.
+
+ Returns:
+ If pagination is None: List[ChatMessage]
+ If pagination is provided: PaginatedResult with items and metadata
+ """
# Check workflow access first (without calling getWorkflow to avoid circular reference)
workflows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
if not workflows:
- return []
+ if pagination is None:
+ return []
+ return PaginatedResult(items=[], totalItems=0, totalPages=0)
filteredWorkflows = self._uam(ChatWorkflow, workflows)
if not filteredWorkflows:
- return []
+ if pagination is None:
+ return []
+ return PaginatedResult(items=[], totalItems=0, totalPages=0)
# Get messages for this workflow from normalized table
messages = self.db.getRecordset(ChatMessage, recordFilter={"workflowId": workflowId})
- # Sort messages by publishedAt timestamp to ensure chronological order
- messages.sort(key=lambda x: x.get("publishedAt", x.get("timestamp", "0")))
+ # Convert raw messages to dict format for sorting/filtering
+ messageDicts = []
+ for msg in messages:
+ messageDicts.append({
+ "id": msg.get("id"),
+ "workflowId": msg.get("workflowId"),
+ "parentMessageId": msg.get("parentMessageId"),
+ "documentsLabel": msg.get("documentsLabel"),
+ "message": msg.get("message"),
+ "role": msg.get("role", "assistant"),
+ "status": msg.get("status", "step"),
+ "sequenceNr": msg.get("sequenceNr", 0),
+ "publishedAt": msg.get("publishedAt", msg.get("timestamp", getUtcTimestamp())),
+ "success": msg.get("success"),
+ "actionId": msg.get("actionId"),
+ "actionMethod": msg.get("actionMethod"),
+ "actionName": msg.get("actionName"),
+ "roundNumber": msg.get("roundNumber"),
+ "taskNumber": msg.get("taskNumber"),
+ "actionNumber": msg.get("actionNumber"),
+ "taskProgress": msg.get("taskProgress"),
+ "actionProgress": msg.get("actionProgress")
+ })
+
+ # Apply default sorting by publishedAt if no sort specified
+ if pagination is None or not pagination.sort:
+ messageDicts.sort(key=lambda x: x.get("publishedAt", getUtcTimestamp()))
+
+ # Apply filtering (if filters provided)
+ if pagination and pagination.filters:
+ messageDicts = self._applyFilters(messageDicts, pagination.filters)
+
+ # Apply sorting (in order of sortFields)
+ if pagination and pagination.sort:
+ messageDicts = self._applySorting(messageDicts, pagination.sort)
+
+ # If no pagination requested, return all items
+ if pagination is None:
+ # Convert messages to ChatMessage objects and load documents
+ chat_messages = []
+ for msg in messageDicts:
+ # Load documents from normalized documents table
+ documents = self.getDocuments(msg["id"])
+
+ # Create ChatMessage object with loaded documents
+ chat_message = ChatMessage(
+ id=msg["id"],
+ workflowId=msg["workflowId"],
+ parentMessageId=msg.get("parentMessageId"),
+ documents=documents,
+ documentsLabel=msg.get("documentsLabel"),
+ message=msg.get("message"),
+ role=msg.get("role", "assistant"),
+ status=msg.get("status", "step"),
+ sequenceNr=msg.get("sequenceNr", 0),
+ publishedAt=msg.get("publishedAt", getUtcTimestamp()),
+ success=msg.get("success"),
+ actionId=msg.get("actionId"),
+ actionMethod=msg.get("actionMethod"),
+ actionName=msg.get("actionName"),
+ roundNumber=msg.get("roundNumber"),
+ taskNumber=msg.get("taskNumber"),
+ actionNumber=msg.get("actionNumber"),
+ taskProgress=msg.get("taskProgress"),
+ actionProgress=msg.get("actionProgress")
+ )
+
+ chat_messages.append(chat_message)
+
+ return chat_messages
+
+ # Count total items after filters
+ totalItems = len(messageDicts)
+ totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0
+
+ # Apply pagination (skip/limit)
+ startIdx = (pagination.page - 1) * pagination.pageSize
+ endIdx = startIdx + pagination.pageSize
+ pagedMessageDicts = messageDicts[startIdx:endIdx]
# Convert messages to ChatMessage objects and load documents
chat_messages = []
- for msg in messages:
+ for msg in pagedMessageDicts:
# Load documents from normalized documents table
documents = self.getDocuments(msg["id"])
@@ -437,8 +609,11 @@ class ChatObjects:
chat_messages.append(chat_message)
-
- return chat_messages
+ return PaginatedResult(
+ items=chat_messages,
+ totalItems=totalItems,
+ totalPages=totalPages
+ )
def createMessage(self, messageData: Dict[str, Any]) -> ChatMessage:
"""Creates a message for a workflow if user has access."""
@@ -765,24 +940,84 @@ class ChatObjects:
# Log methods
- def getLogs(self, workflowId: str) -> List[ChatLog]:
- """Returns logs for a workflow if user has access to the workflow."""
+ def getLogs(self, workflowId: str, pagination: Optional[PaginationParams] = None) -> Union[List[ChatLog], PaginatedResult]:
+ """
+ Returns logs for a workflow if user has access to the workflow.
+ Supports optional pagination, sorting, and filtering.
+
+ Args:
+ workflowId: The workflow ID to get logs for
+ pagination: Optional pagination parameters. If None, returns all items.
+
+ Returns:
+ If pagination is None: List[ChatLog]
+ If pagination is provided: PaginatedResult with items and metadata
+ """
# Check workflow access first (without calling getWorkflow to avoid circular reference)
workflows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
if not workflows:
- return []
+ if pagination is None:
+ return []
+ return PaginatedResult(items=[], totalItems=0, totalPages=0)
filteredWorkflows = self._uam(ChatWorkflow, workflows)
if not filteredWorkflows:
- return []
+ if pagination is None:
+ return []
+ return PaginatedResult(items=[], totalItems=0, totalPages=0)
# Get logs for this workflow from normalized table
logs = self.db.getRecordset(ChatLog, recordFilter={"workflowId": workflowId})
- # Sort logs by timestamp (Unix timestamps)
- logs.sort(key=lambda x: float(x.get("timestamp", 0)))
+ # Convert raw logs to dict format for sorting/filtering
+ logDicts = []
+ for log in logs:
+ logDicts.append({
+ "id": log.get("id"),
+ "workflowId": log.get("workflowId"),
+ "message": log.get("message"),
+ "type": log.get("type"),
+ "timestamp": log.get("timestamp", getUtcTimestamp()),
+ "agentName": log.get("agentName"),
+ "status": log.get("status"),
+ "progress": log.get("progress"),
+ "mandateId": log.get("mandateId"),
+ "userId": log.get("userId")
+ })
- return [ChatLog(**log) for log in logs]
+ # Apply default sorting by timestamp if no sort specified
+ if pagination is None or not pagination.sort:
+ logDicts.sort(key=lambda x: float(x.get("timestamp", 0)))
+
+ # Apply filtering (if filters provided)
+ if pagination and pagination.filters:
+ logDicts = self._applyFilters(logDicts, pagination.filters)
+
+ # Apply sorting (in order of sortFields)
+ if pagination and pagination.sort:
+ logDicts = self._applySorting(logDicts, pagination.sort)
+
+ # If no pagination requested, return all items
+ if pagination is None:
+ return [ChatLog(**log) for log in logDicts]
+
+ # Count total items after filters
+ totalItems = len(logDicts)
+ totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0
+
+ # Apply pagination (skip/limit)
+ startIdx = (pagination.page - 1) * pagination.pageSize
+ endIdx = startIdx + pagination.pageSize
+ pagedLogDicts = logDicts[startIdx:endIdx]
+
+ # Convert to model objects
+ items = [ChatLog(**log) for log in pagedLogDicts]
+
+ return PaginatedResult(
+ items=items,
+ totalItems=totalItems,
+ totalPages=totalPages
+ )
def createLog(self, logData: Dict[str, Any]) -> ChatLog:
"""Creates a log entry for a workflow if user has access."""
diff --git a/modules/interfaces/interfaceDbComponentObjects.py b/modules/interfaces/interfaceDbComponentObjects.py
index 20e7aae1..fba237da 100644
--- a/modules/interfaces/interfaceDbComponentObjects.py
+++ b/modules/interfaces/interfaceDbComponentObjects.py
@@ -7,7 +7,8 @@ import os
import logging
import base64
import hashlib
-from typing import Dict, Any, List, Optional
+import math
+from typing import Dict, Any, List, Optional, Union
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.interfaces.interfaceDbComponentAccess import ComponentAccess
@@ -17,6 +18,7 @@ from modules.datamodels.datamodelVoice import VoiceSettings
from modules.datamodels.datamodelUam import User, Mandate
from modules.shared.configuration import APP_CONFIG
from modules.shared.timezoneUtils import getUtcTimestamp
+from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResult
logger = logging.getLogger(__name__)
@@ -244,6 +246,56 @@ class ComponentObjects:
"""Delegate to access control module."""
return self.access.canModify(model_class, recordId)
+ def _applyFilters(self, records: List[Dict[str, Any]], filters: Dict[str, Any]) -> List[Dict[str, Any]]:
+ """Apply filter criteria to records (implementation for future filtering)."""
+ # TODO: Implement filtering logic when needed
+ return records
+
+ def _applySorting(self, records: List[Dict[str, Any]], sortFields: List[Any]) -> List[Dict[str, Any]]:
+ """Apply multi-level sorting to records using stable sort (sorts from least to most significant field)."""
+ if not sortFields:
+ return records
+
+ # Start with a copy to avoid modifying original
+ sortedRecords = list(records)
+
+ # Sort from least significant to most significant field (reverse order)
+ # Python's sort is stable, so this creates proper multi-level sorting
+ for sortField in reversed(sortFields):
+ # Handle both dict and object formats
+ if isinstance(sortField, dict):
+ fieldName = sortField.get("field")
+ direction = sortField.get("direction", "asc")
+ else:
+ fieldName = getattr(sortField, "field", None)
+ direction = getattr(sortField, "direction", "asc")
+
+ if not fieldName:
+ continue
+
+ isDesc = (direction == "desc")
+
+ def sortKey(record):
+ value = record.get(fieldName)
+ # Handle None values - place them at the end for both directions
+ if value is None:
+ # Use a special value that sorts last
+ return (1, "") # (is_none_flag, empty_value) - sorts after (0, ...)
+ else:
+ # Return tuple with type indicator for proper comparison
+ if isinstance(value, (int, float)):
+ return (0, value)
+ elif isinstance(value, str):
+ return (0, value)
+ elif isinstance(value, bool):
+ return (0, value)
+ else:
+ return (0, str(value))
+
+ # Sort with reverse parameter for descending
+ sortedRecords.sort(key=sortKey, reverse=isDesc)
+
+ return sortedRecords
# Utilities
@@ -255,18 +307,57 @@ class ComponentObjects:
# Prompt methods
- def getAllPrompts(self) -> List[Prompt]:
- """Returns prompts based on user access level."""
+ def getAllPrompts(self, pagination: Optional[PaginationParams] = None) -> Union[List[Prompt], PaginatedResult]:
+ """
+ Returns prompts based on user access level.
+ Supports optional pagination, sorting, and filtering.
+
+ Args:
+ pagination: Optional pagination parameters. If None, returns all items.
+
+ Returns:
+ If pagination is None: List[Prompt]
+ If pagination is provided: PaginatedResult with items and metadata
+ """
try:
allPrompts = self.db.getRecordset(Prompt)
filteredPrompts = self._uam(Prompt, allPrompts)
- # Convert to Prompt objects
- return [Prompt(**prompt) for prompt in filteredPrompts]
+ # If no pagination requested, return all items
+ if pagination is None:
+ return [Prompt(**prompt) for prompt in filteredPrompts]
+
+ # Apply filtering (if filters provided)
+ if pagination.filters:
+ filteredPrompts = self._applyFilters(filteredPrompts, pagination.filters)
+
+ # Apply sorting (in order of sortFields)
+ if pagination.sort:
+ filteredPrompts = self._applySorting(filteredPrompts, pagination.sort)
+
+ # Count total items after filters
+ totalItems = len(filteredPrompts)
+ totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0
+
+ # Apply pagination (skip/limit)
+ startIdx = (pagination.page - 1) * pagination.pageSize
+ endIdx = startIdx + pagination.pageSize
+ pagedPrompts = filteredPrompts[startIdx:endIdx]
+
+ # Convert to model objects
+ items = [Prompt(**prompt) for prompt in pagedPrompts]
+
+ return PaginatedResult(
+ items=items,
+ totalItems=totalItems,
+ totalPages=totalPages
+ )
except Exception as e:
logger.error(f"Error getting prompts: {str(e)}")
- return []
+ if pagination is None:
+ return []
+ return PaginatedResult(items=[], totalItems=0, totalPages=0)
def getPrompt(self, promptId: str) -> Optional[Prompt]:
"""Returns a prompt by ID if user has access."""
@@ -454,39 +545,79 @@ class ComponentObjects:
# File methods - metadata-based operations
- def getAllFiles(self) -> List[FileItem]:
- """Returns files based on user access level."""
+ def getAllFiles(self, pagination: Optional[PaginationParams] = None) -> Union[List[FileItem], PaginatedResult]:
+ """
+ Returns files based on user access level.
+ Supports optional pagination, sorting, and filtering.
+
+ Args:
+ pagination: Optional pagination parameters. If None, returns all items.
+
+ Returns:
+ If pagination is None: List[FileItem]
+ If pagination is provided: PaginatedResult with items and metadata
+ """
allFiles = self.db.getRecordset(FileItem)
filteredFiles = self._uam(FileItem, allFiles)
- # Convert database records to FileItem instances
- fileItems = []
- for file in filteredFiles:
- try:
- # Ensure proper values, use defaults for invalid data
- creationDate = file.get("creationDate")
- if creationDate is None or not isinstance(creationDate, (int, float)) or creationDate <= 0:
- creationDate = getUtcTimestamp()
-
- fileName = file.get("fileName")
- if not fileName or fileName == "None":
- continue # Skip records with invalid fileName
-
- fileItem = FileItem(
- id=file.get("id"),
- mandateId=file.get("mandateId"),
- fileName=fileName,
- mimeType=file.get("mimeType"),
- fileHash=file.get("fileHash"),
- fileSize=file.get("fileSize"),
- creationDate=creationDate
- )
- fileItems.append(fileItem)
- except Exception as e:
- logger.warning(f"Skipping invalid file record: {str(e)}")
- continue
-
- return fileItems
+ # Convert database records to FileItem instances (for both paginated and non-paginated)
+ def convertFileItems(files):
+ fileItems = []
+ for file in files:
+ try:
+ # Ensure proper values, use defaults for invalid data
+ creationDate = file.get("creationDate")
+ if creationDate is None or not isinstance(creationDate, (int, float)) or creationDate <= 0:
+ creationDate = getUtcTimestamp()
+
+ fileName = file.get("fileName")
+ if not fileName or fileName == "None":
+ continue # Skip records with invalid fileName
+
+ fileItem = FileItem(
+ id=file.get("id"),
+ mandateId=file.get("mandateId"),
+ fileName=fileName,
+ mimeType=file.get("mimeType"),
+ fileHash=file.get("fileHash"),
+ fileSize=file.get("fileSize"),
+ creationDate=creationDate
+ )
+ fileItems.append(fileItem)
+ except Exception as e:
+ logger.warning(f"Skipping invalid file record: {str(e)}")
+ continue
+ return fileItems
+
+ # If no pagination requested, return all items
+ if pagination is None:
+ return convertFileItems(filteredFiles)
+
+ # Apply filtering (if filters provided)
+ if pagination.filters:
+ filteredFiles = self._applyFilters(filteredFiles, pagination.filters)
+
+ # Apply sorting (in order of sortFields)
+ if pagination.sort:
+ filteredFiles = self._applySorting(filteredFiles, pagination.sort)
+
+ # Count total items after filters
+ totalItems = len(filteredFiles)
+ totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0
+
+ # Apply pagination (skip/limit)
+ startIdx = (pagination.page - 1) * pagination.pageSize
+ endIdx = startIdx + pagination.pageSize
+ pagedFiles = filteredFiles[startIdx:endIdx]
+
+ # Convert to model objects
+ items = convertFileItems(pagedFiles)
+
+ return PaginatedResult(
+ items=items,
+ totalItems=totalItems,
+ totalPages=totalPages
+ )
def getFile(self, fileId: str) -> Optional[FileItem]:
"""Returns a file by ID if user has access."""
diff --git a/modules/routes/routeDataFiles.py b/modules/routes/routeDataFiles.py
index 0facb8f6..7c0f60c0 100644
--- a/modules/routes/routeDataFiles.py
+++ b/modules/routes/routeDataFiles.py
@@ -2,6 +2,7 @@ from fastapi import APIRouter, HTTPException, Depends, File, UploadFile, Form, P
from fastapi.responses import JSONResponse
from typing import List, Dict, Any, Optional
import logging
+import json
# Import auth module
from modules.security.auth import limiter, getCurrentUser
@@ -11,6 +12,7 @@ import modules.interfaces.interfaceDbComponentObjects as interfaceDbComponentObj
from modules.datamodels.datamodelFiles import FileItem, FilePreview
from modules.shared.attributeUtils import getModelAttributeDefinitions
from modules.datamodels.datamodelUam import User
+from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata
# Configure logger
logger = logging.getLogger(__name__)
@@ -31,21 +33,61 @@ router = APIRouter(
}
)
-@router.get("/list", response_model=List[FileItem])
+@router.get("/list", response_model=PaginatedResponse[FileItem])
@limiter.limit("30/minute")
async def get_files(
request: Request,
+ pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
currentUser: User = Depends(getCurrentUser)
-) -> List[FileItem]:
- """Get all files"""
+) -> PaginatedResponse[FileItem]:
+ """
+ Get files with optional pagination, sorting, and filtering.
+
+ Query Parameters:
+ - pagination: JSON-encoded PaginationParams object, or None for no pagination
+
+ Examples:
+ - GET /api/files/list (no pagination - returns all items)
+ - GET /api/files/list?pagination={"page":1,"pageSize":10,"sort":[]}
+ - GET /api/files/list?pagination={"page":2,"pageSize":20,"sort":[{"field":"fileName","direction":"asc"}]}
+ """
try:
+ # Parse pagination parameter
+ paginationParams = None
+ if pagination:
+ try:
+ paginationDict = json.loads(pagination)
+ paginationParams = PaginationParams(**paginationDict) if paginationDict else None
+ except (json.JSONDecodeError, ValueError) as e:
+ raise HTTPException(
+ status_code=400,
+ detail=f"Invalid pagination parameter: {str(e)}"
+ )
+
managementInterface = interfaceDbComponentObjects.getInterface(currentUser)
+ result = managementInterface.getAllFiles(pagination=paginationParams)
- # Get all files generically - only metadata, no binary data
- files = managementInterface.getAllFiles()
-
- # Return files directly since they are already FileItem objects
- return files
+ # If pagination was requested, result is PaginatedResult
+ # If no pagination, result is List[FileItem]
+ if paginationParams:
+ return PaginatedResponse(
+ items=result.items,
+ pagination=PaginationMetadata(
+ currentPage=paginationParams.page,
+ pageSize=paginationParams.pageSize,
+ totalItems=result.totalItems,
+ totalPages=result.totalPages,
+ sort=paginationParams.sort,
+ filters=paginationParams.filters
+ )
+ )
+ else:
+ return PaginatedResponse(
+ items=result,
+ pagination=None
+ )
+ except HTTPException:
+ raise
except Exception as e:
logger.error(f"Error getting files: {str(e)}")
raise HTTPException(
diff --git a/modules/routes/routeDataMandates.py b/modules/routes/routeDataMandates.py
index 97c4477e..abdcc4e6 100644
--- a/modules/routes/routeDataMandates.py
+++ b/modules/routes/routeDataMandates.py
@@ -3,10 +3,11 @@ Mandate routes for the backend API.
Implements the endpoints for mandate management.
"""
-from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response
-from typing import List, Dict, Any
+from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response, Query
+from typing import List, Dict, Any, Optional
from fastapi import status
import logging
+import json
# Import auth module
from modules.security.auth import limiter, getCurrentUser
@@ -17,6 +18,7 @@ from modules.shared.attributeUtils import getModelAttributeDefinitions
# Import the model classes
from modules.datamodels.datamodelUam import Mandate, User
+from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata
# Configure logger
logger = logging.getLogger(__name__)
@@ -31,17 +33,60 @@ router = APIRouter(
responses={404: {"description": "Not found"}}
)
-@router.get("/", response_model=List[Mandate])
+@router.get("/", response_model=PaginatedResponse[Mandate])
@limiter.limit("30/minute")
async def get_mandates(
request: Request,
+ pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
currentUser: User = Depends(getCurrentUser)
-) -> List[Mandate]:
- """Get all mandates"""
+) -> PaginatedResponse[Mandate]:
+ """
+ Get mandates with optional pagination, sorting, and filtering.
+
+ Query Parameters:
+ - pagination: JSON-encoded PaginationParams object, or None for no pagination
+
+ Examples:
+ - GET /api/mandates/ (no pagination - returns all items)
+ - GET /api/mandates/?pagination={"page":1,"pageSize":10,"sort":[]}
+ """
try:
+ # Parse pagination parameter
+ paginationParams = None
+ if pagination:
+ try:
+ paginationDict = json.loads(pagination)
+ paginationParams = PaginationParams(**paginationDict) if paginationDict else None
+ except (json.JSONDecodeError, ValueError) as e:
+ raise HTTPException(
+ status_code=400,
+ detail=f"Invalid pagination parameter: {str(e)}"
+ )
+
appInterface = interfaceDbAppObjects.getInterface(currentUser)
- mandates = appInterface.getAllMandates()
- return mandates
+ result = appInterface.getAllMandates(pagination=paginationParams)
+
+ # If pagination was requested, result is PaginatedResult
+ # If no pagination, result is List[Mandate]
+ if paginationParams:
+ return PaginatedResponse(
+ items=result.items,
+ pagination=PaginationMetadata(
+ currentPage=paginationParams.page,
+ pageSize=paginationParams.pageSize,
+ totalItems=result.totalItems,
+ totalPages=result.totalPages,
+ sort=paginationParams.sort,
+ filters=paginationParams.filters
+ )
+ )
+ else:
+ return PaginatedResponse(
+ items=result,
+ pagination=None
+ )
+ except HTTPException:
+ raise
except Exception as e:
logger.error(f"Error getting mandates: {str(e)}")
raise HTTPException(
diff --git a/modules/routes/routeDataPrompts.py b/modules/routes/routeDataPrompts.py
index 19b28798..48e194cf 100644
--- a/modules/routes/routeDataPrompts.py
+++ b/modules/routes/routeDataPrompts.py
@@ -1,7 +1,8 @@
-from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request
-from typing import List, Dict, Any
+from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Query
+from typing import List, Dict, Any, Optional
from fastapi import status
import logging
+import json
# Import auth module
from modules.security.auth import limiter, getCurrentUser
@@ -10,6 +11,7 @@ from modules.security.auth import limiter, getCurrentUser
import modules.interfaces.interfaceDbComponentObjects as interfaceDbComponentObjects
from modules.datamodels.datamodelUtils import Prompt
from modules.datamodels.datamodelUam import User
+from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata
# Configure logger
logger = logging.getLogger(__name__)
@@ -21,16 +23,58 @@ router = APIRouter(
responses={404: {"description": "Not found"}}
)
-@router.get("", response_model=List[Prompt])
+@router.get("", response_model=PaginatedResponse[Prompt])
@limiter.limit("30/minute")
async def get_prompts(
request: Request,
+ pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
currentUser: User = Depends(getCurrentUser)
-) -> List[Prompt]:
- """Get all prompts"""
+) -> PaginatedResponse[Prompt]:
+ """
+ Get prompts with optional pagination, sorting, and filtering.
+
+ Query Parameters:
+ - pagination: JSON-encoded PaginationParams object, or None for no pagination
+
+ Examples:
+ - GET /api/prompts (no pagination - returns all items)
+ - GET /api/prompts?pagination={"page":1,"pageSize":10,"sort":[]}
+ - GET /api/prompts?pagination={"page":2,"pageSize":20,"sort":[{"field":"name","direction":"asc"}]}
+ """
+ # Parse pagination parameter
+ paginationParams = None
+ if pagination:
+ try:
+ paginationDict = json.loads(pagination)
+ paginationParams = PaginationParams(**paginationDict) if paginationDict else None
+ except (json.JSONDecodeError, ValueError) as e:
+ raise HTTPException(
+ status_code=400,
+ detail=f"Invalid pagination parameter: {str(e)}"
+ )
+
managementInterface = interfaceDbComponentObjects.getInterface(currentUser)
- prompts = managementInterface.getAllPrompts()
- return prompts
+ result = managementInterface.getAllPrompts(pagination=paginationParams)
+
+ # If pagination was requested, result is PaginatedResult
+ # If no pagination, result is List[Prompt]
+ if paginationParams:
+ return PaginatedResponse(
+ items=result.items,
+ pagination=PaginationMetadata(
+ currentPage=paginationParams.page,
+ pageSize=paginationParams.pageSize,
+ totalItems=result.totalItems,
+ totalPages=result.totalPages,
+ sort=paginationParams.sort,
+ filters=paginationParams.filters
+ )
+ )
+ else:
+ return PaginatedResponse(
+ items=result,
+ pagination=None
+ )
@router.post("", response_model=Prompt)
@limiter.limit("10/minute")
diff --git a/modules/routes/routeDataUsers.py b/modules/routes/routeDataUsers.py
index 7b7e627f..b97ed721 100644
--- a/modules/routes/routeDataUsers.py
+++ b/modules/routes/routeDataUsers.py
@@ -3,10 +3,11 @@ User routes for the backend API.
Implements the endpoints for user management.
"""
-from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response
+from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response, Query
from typing import List, Dict, Any, Optional
from fastapi import status
import logging
+import json
# Import interfaces and models
import modules.interfaces.interfaceDbAppObjects as interfaceDbAppObjects
@@ -14,6 +15,7 @@ from modules.security.auth import getCurrentUser, limiter, getCurrentUser
# Import the attribute definition and helper functions
from modules.datamodels.datamodelUam import User, UserPrivilege
+from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata
# Configure logger
logger = logging.getLogger(__name__)
@@ -24,21 +26,65 @@ router = APIRouter(
responses={404: {"description": "Not found"}}
)
-@router.get("/", response_model=List[User])
+@router.get("/", response_model=PaginatedResponse[User])
@limiter.limit("30/minute")
async def get_users(
request: Request,
- mandateId: Optional[str] = None,
+ mandateId: Optional[str] = Query(None, description="Mandate ID to filter users"),
+ pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
currentUser: User = Depends(getCurrentUser)
-) -> List[User]:
- """Get all users in the current mandate"""
+) -> PaginatedResponse[User]:
+ """
+ Get users with optional pagination, sorting, and filtering.
+
+ Query Parameters:
+ - mandateId: Optional mandate ID to filter users
+ - pagination: JSON-encoded PaginationParams object, or None for no pagination
+
+ Examples:
+ - GET /api/users/ (no pagination - returns all users)
+ - GET /api/users/?pagination={"page":1,"pageSize":10,"sort":[]}
+ """
try:
+ # Parse pagination parameter
+ paginationParams = None
+ if pagination:
+ try:
+ paginationDict = json.loads(pagination)
+ paginationParams = PaginationParams(**paginationDict) if paginationDict else None
+ except (json.JSONDecodeError, ValueError) as e:
+ raise HTTPException(
+ status_code=400,
+ detail=f"Invalid pagination parameter: {str(e)}"
+ )
+
appInterface = interfaceDbAppObjects.getInterface(currentUser)
# If mandateId is provided, use it, otherwise use the current user's mandate
targetMandateId = mandateId or currentUser.mandateId
- # Get all users without filtering by enabled status
- users = appInterface.getUsersByMandate(targetMandateId)
- return users
+ # Get users with optional pagination
+ result = appInterface.getUsersByMandate(targetMandateId, pagination=paginationParams)
+
+ # If pagination was requested, result is PaginatedResult
+ # If no pagination, result is List[User]
+ if paginationParams:
+ return PaginatedResponse(
+ items=result.items,
+ pagination=PaginationMetadata(
+ currentPage=paginationParams.page,
+ pageSize=paginationParams.pageSize,
+ totalItems=result.totalItems,
+ totalPages=result.totalPages,
+ sort=paginationParams.sort,
+ filters=paginationParams.filters
+ )
+ )
+ else:
+ return PaginatedResponse(
+ items=result,
+ pagination=None
+ )
+ except HTTPException:
+ raise
except Exception as e:
logger.error(f"Error getting users: {str(e)}")
raise HTTPException(
diff --git a/modules/routes/routeVoiceGoogle.py b/modules/routes/routeVoiceGoogle.py
index 0e1b009f..7f33b19c 100644
--- a/modules/routes/routeVoiceGoogle.py
+++ b/modules/routes/routeVoiceGoogle.py
@@ -325,9 +325,17 @@ async def get_available_languages(currentUser: User = Depends(getCurrentUser)):
@router.get("/voices")
async def get_available_voices(
languageCode: Optional[str] = None,
+ language_code: Optional[str] = None, # Accept both camelCase and snake_case
currentUser: User = Depends(getCurrentUser)
):
- """Get available voices from Google Cloud Text-to-Speech."""
+ """
+ Get available voices from Google Cloud Text-to-Speech.
+ Accepts languageCode (camelCase) or language_code (snake_case) query parameter.
+ """
+ # Use language_code if provided (frontend sends this), otherwise use languageCode
+ if language_code:
+ languageCode = language_code
+
try:
logger.info(f"🎤 Getting available voices, language filter: {languageCode}")
diff --git a/modules/routes/routeWorkflows.py b/modules/routes/routeWorkflows.py
index b3ff1a87..ea52a067 100644
--- a/modules/routes/routeWorkflows.py
+++ b/modules/routes/routeWorkflows.py
@@ -4,6 +4,7 @@ Implements the endpoints for workflow management according to the state machine.
"""
import logging
+import json
from typing import List, Dict, Any, Optional
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Response, status, Request
@@ -24,6 +25,7 @@ from modules.datamodels.datamodelChat import (
)
from modules.shared.attributeUtils import getModelAttributeDefinitions, AttributeResponse
from modules.datamodels.datamodelUam import User
+from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata
# Configure logger
@@ -43,18 +45,51 @@ def getServiceChat(currentUser: User):
return interfaceDbChatObjects.getInterface(currentUser)
# Consolidated endpoint for getting all workflows
-@router.get("/", response_model=List[ChatWorkflow])
+@router.get("/", response_model=PaginatedResponse[ChatWorkflow])
@limiter.limit("120/minute")
async def get_workflows(
request: Request,
+ pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
currentUser: User = Depends(getCurrentUser)
-) -> List[ChatWorkflow]:
- """Get all workflows for the current user."""
+) -> PaginatedResponse[ChatWorkflow]:
+ """
+ Get workflows with optional pagination, sorting, and filtering.
+
+ Query Parameters:
+ - pagination: JSON-encoded PaginationParams object, or None for no pagination
+
+ Examples:
+ - GET /api/workflows/ (no pagination - returns all workflows)
+ - GET /api/workflows/?pagination={"page":1,"pageSize":10,"sort":[]}
+ """
try:
+ # Parse pagination parameter
+ paginationParams = None
+ if pagination:
+ try:
+ paginationDict = json.loads(pagination)
+ paginationParams = PaginationParams(**paginationDict) if paginationDict else None
+ except (json.JSONDecodeError, ValueError) as e:
+ raise HTTPException(
+ status_code=400,
+ detail=f"Invalid pagination parameter: {str(e)}"
+ )
+
appInterface = getInterface(currentUser)
- workflows_data = appInterface.getWorkflows()
+ result = appInterface.getWorkflows(pagination=paginationParams)
# Convert raw dictionaries to ChatWorkflow objects by loading each workflow properly
+ # If pagination was requested, result is PaginatedResult with items as dicts
+ # If no pagination, result is List[Dict]
+ if paginationParams:
+ workflows_data = result.items
+ totalItems = result.totalItems
+ totalPages = result.totalPages
+ else:
+ workflows_data = result
+ totalItems = len(result)
+ totalPages = 1
+
workflows = []
for workflow_data in workflows_data:
try:
@@ -67,7 +102,25 @@ async def get_workflows(
# Skip invalid workflows instead of failing the entire request
continue
- return workflows
+ if paginationParams:
+ return PaginatedResponse(
+ items=workflows,
+ pagination=PaginationMetadata(
+ currentPage=paginationParams.page,
+ pageSize=paginationParams.pageSize,
+ totalItems=totalItems,
+ totalPages=totalPages,
+ sort=paginationParams.sort,
+ filters=paginationParams.filters
+ )
+ )
+ else:
+ return PaginatedResponse(
+ items=workflows,
+ pagination=None
+ )
+ except HTTPException:
+ raise
except Exception as e:
logger.error(f"Error getting workflows: {str(e)}")
raise HTTPException(
@@ -185,16 +238,36 @@ async def get_workflow_status(
)
# API Endpoint for workflow logs with selective data transfer
-@router.get("/{workflowId}/logs", response_model=List[ChatLog])
+@router.get("/{workflowId}/logs", response_model=PaginatedResponse[ChatLog])
@limiter.limit("120/minute")
async def get_workflow_logs(
request: Request,
workflowId: str = Path(..., description="ID of the workflow"),
- logId: Optional[str] = Query(None, description="Optional log ID to get only newer logs"),
+ logId: Optional[str] = Query(None, description="Optional log ID to get only newer logs (legacy selective data transfer)"),
+ pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
currentUser: User = Depends(getCurrentUser)
-) -> List[ChatLog]:
- """Get logs for a workflow with support for selective data transfer."""
+) -> PaginatedResponse[ChatLog]:
+ """
+ Get logs for a workflow with optional pagination, sorting, and filtering.
+ Also supports legacy selective data transfer via logId parameter.
+
+ Query Parameters:
+ - logId: Optional log ID for selective data transfer (returns only logs after this ID)
+ - pagination: JSON-encoded PaginationParams object, or None for no pagination
+ """
try:
+ # Parse pagination parameter
+ paginationParams = None
+ if pagination:
+ try:
+ paginationDict = json.loads(pagination)
+ paginationParams = PaginationParams(**paginationDict) if paginationDict else None
+ except (json.JSONDecodeError, ValueError) as e:
+ raise HTTPException(
+ status_code=400,
+ detail=f"Invalid pagination parameter: {str(e)}"
+ )
+
# Get service center
interfaceDbChat = getServiceChat(currentUser)
@@ -206,18 +279,43 @@ async def get_workflow_logs(
detail=f"Workflow with ID {workflowId} not found"
)
- # Get all logs
- allLogs = interfaceDbChat.getLogs(workflowId)
+ # Get logs with optional pagination
+ result = interfaceDbChat.getLogs(workflowId, pagination=paginationParams)
- # Apply selective data transfer if logId is provided
+ # Handle legacy selective data transfer if logId is provided (takes precedence over pagination)
if logId:
+ # If pagination was requested, result is PaginatedResult, otherwise List[ChatLog]
+ allLogs = result.items if paginationParams else result
+
# Find the index of the log with the given ID
logIndex = next((i for i, log in enumerate(allLogs) if log.id == logId), -1)
if logIndex >= 0:
# Return only logs after the specified log
- return allLogs[logIndex + 1:]
+ filteredLogs = allLogs[logIndex + 1:]
+ return PaginatedResponse(
+ items=filteredLogs,
+ pagination=None
+ )
- return allLogs
+ # If pagination was requested, result is PaginatedResult
+ # If no pagination, result is List[ChatLog]
+ if paginationParams:
+ return PaginatedResponse(
+ items=result.items,
+ pagination=PaginationMetadata(
+ currentPage=paginationParams.page,
+ pageSize=paginationParams.pageSize,
+ totalItems=result.totalItems,
+ totalPages=result.totalPages,
+ sort=paginationParams.sort,
+ filters=paginationParams.filters
+ )
+ )
+ else:
+ return PaginatedResponse(
+ items=result,
+ pagination=None
+ )
except HTTPException:
raise
except Exception as e:
@@ -228,16 +326,36 @@ async def get_workflow_logs(
)
# API Endpoint for workflow messages with selective data transfer
-@router.get("/{workflowId}/messages", response_model=List[ChatMessage])
+@router.get("/{workflowId}/messages", response_model=PaginatedResponse[ChatMessage])
@limiter.limit("120/minute")
async def get_workflow_messages(
request: Request,
workflowId: str = Path(..., description="ID of the workflow"),
- messageId: Optional[str] = Query(None, description="Optional message ID to get only newer messages"),
+ messageId: Optional[str] = Query(None, description="Optional message ID to get only newer messages (legacy selective data transfer)"),
+ pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
currentUser: User = Depends(getCurrentUser)
-) -> List[ChatMessage]:
- """Get messages for a workflow with support for selective data transfer."""
+) -> PaginatedResponse[ChatMessage]:
+ """
+ Get messages for a workflow with optional pagination, sorting, and filtering.
+ Also supports legacy selective data transfer via messageId parameter.
+
+ Query Parameters:
+ - messageId: Optional message ID for selective data transfer (returns only messages after this ID)
+ - pagination: JSON-encoded PaginationParams object, or None for no pagination
+ """
try:
+ # Parse pagination parameter
+ paginationParams = None
+ if pagination:
+ try:
+ paginationDict = json.loads(pagination)
+ paginationParams = PaginationParams(**paginationDict) if paginationDict else None
+ except (json.JSONDecodeError, ValueError) as e:
+ raise HTTPException(
+ status_code=400,
+ detail=f"Invalid pagination parameter: {str(e)}"
+ )
+
# Get service center
interfaceDbChat = getServiceChat(currentUser)
@@ -249,19 +367,43 @@ async def get_workflow_messages(
detail=f"Workflow with ID {workflowId} not found"
)
- # Get all messages
- allMessages = interfaceDbChat.getMessages(workflowId)
+ # Get messages with optional pagination
+ result = interfaceDbChat.getMessages(workflowId, pagination=paginationParams)
- # Apply selective data transfer if messageId is provided
+ # Handle legacy selective data transfer if messageId is provided (takes precedence over pagination)
if messageId:
+ # If pagination was requested, result is PaginatedResult, otherwise List[ChatMessage]
+ allMessages = result.items if paginationParams else result
+
# Find the index of the message with the given ID
messageIndex = next((i for i, msg in enumerate(allMessages) if msg.id == messageId), -1)
if messageIndex >= 0:
# Return only messages after the specified message
filteredMessages = allMessages[messageIndex + 1:]
- return filteredMessages
+ return PaginatedResponse(
+ items=filteredMessages,
+ pagination=None
+ )
- return allMessages
+ # If pagination was requested, result is PaginatedResult
+ # If no pagination, result is List[ChatMessage]
+ if paginationParams:
+ return PaginatedResponse(
+ items=result.items,
+ pagination=PaginationMetadata(
+ currentPage=paginationParams.page,
+ pageSize=paginationParams.pageSize,
+ totalItems=result.totalItems,
+ totalPages=result.totalPages,
+ sort=paginationParams.sort,
+ filters=paginationParams.filters
+ )
+ )
+ else:
+ return PaginatedResponse(
+ items=result,
+ pagination=None
+ )
except HTTPException:
raise
except Exception as e:
diff --git a/modules/shared/attributeUtils.py b/modules/shared/attributeUtils.py
index 90f3a766..1dc58779 100644
--- a/modules/shared/attributeUtils.py
+++ b/modules/shared/attributeUtils.py
@@ -7,6 +7,9 @@ from typing import Dict, Any, List, Type, Optional
import inspect
import importlib
import os
+import logging
+
+logger = logging.getLogger(__name__)
# Define the AttributeDefinition class here instead of importing it
@@ -107,7 +110,9 @@ def getModelAttributeDefinitions(modelClass: Type[BaseModel] = None, userLanguag
fields = modelClass.model_fields
for name, field in fields.items():
# Extract frontend metadata from field info
- field_info = field.field_info if hasattr(field, "field_info") else None
+ # In Pydantic v2, the field from model_fields.items() IS the FieldInfo object
+ # FieldInfo objects have the 'extra' dict directly on them
+ field_info = field
# Check both direct attributes and extra field for frontend metadata
frontend_type = None
frontend_readonly = False
@@ -115,7 +120,7 @@ def getModelAttributeDefinitions(modelClass: Type[BaseModel] = None, userLanguag
frontend_options = None
if field_info:
- # Try direct attributes first
+ # Try direct attributes first (though these won't exist for custom kwargs)
frontend_type = getattr(field_info, "frontend_type", None)
frontend_readonly = getattr(field_info, "frontend_readonly", False)
frontend_required = getattr(
@@ -123,22 +128,43 @@ def getModelAttributeDefinitions(modelClass: Type[BaseModel] = None, userLanguag
)
frontend_options = getattr(field_info, "frontend_options", None)
- # If not found, check extra field
- if hasattr(field_info, "extra") and field_info.extra:
- if frontend_type is None:
- frontend_type = field_info.extra.get("frontend_type")
+ # If not found, check json_schema_extra (Pydantic v2 stores custom kwargs here)
+ if frontend_type is None and hasattr(field_info, "json_schema_extra"):
+ json_extra = field_info.json_schema_extra
+ if isinstance(json_extra, dict):
+ frontend_type = json_extra.get("frontend_type")
+ elif callable(json_extra):
+ # If it's a callable, we can't easily extract from it, skip
+ pass
+
+ # Check extra field (Pydantic v2 stores custom kwargs here)
+ # This is the main location where frontend_type is stored
+ if hasattr(field_info, "extra"):
+ extra_dict = field_info.extra
+ if isinstance(extra_dict, dict):
+ if frontend_type is None:
+ frontend_type = extra_dict.get("frontend_type")
+
+ # Also extract other fields from extra if it exists
+ if hasattr(field_info, "extra") and isinstance(field_info.extra, dict):
+ extra_dict = field_info.extra
if not frontend_readonly:
- frontend_readonly = field_info.extra.get(
- "frontend_readonly", False
- )
- if (
- frontend_required == field.is_required()
- ): # Only override if we didn't get it from direct attribute
- frontend_required = field_info.extra.get(
- "frontend_required", frontend_required
- )
+ frontend_readonly = extra_dict.get("frontend_readonly", False)
+ if frontend_required == field.is_required():
+ frontend_required = extra_dict.get("frontend_required", frontend_required)
if frontend_options is None:
- frontend_options = field_info.extra.get("frontend_options")
+ frontend_options = extra_dict.get("frontend_options")
+
+ # Also check json_schema_extra for other fields
+ if hasattr(field_info, "json_schema_extra"):
+ json_extra = field_info.json_schema_extra
+ if isinstance(json_extra, dict):
+ if not frontend_readonly and "frontend_readonly" in json_extra:
+ frontend_readonly = json_extra.get("frontend_readonly", False)
+ if frontend_required == field.is_required() and "frontend_required" in json_extra:
+ frontend_required = json_extra.get("frontend_required", frontend_required)
+ if frontend_options is None and "frontend_options" in json_extra:
+ frontend_options = json_extra.get("frontend_options")
# Use frontend type if available, otherwise fall back to Python type
field_type = (