refactored ui
This commit is contained in:
parent
fc662fbe59
commit
33c9ec1830
14 changed files with 1494 additions and 92 deletions
|
|
@ -156,6 +156,11 @@ class DatabaseConnector:
|
||||||
# Ensure recordId is a string
|
# Ensure recordId is a string
|
||||||
recordId = str(recordId)
|
recordId = str(recordId)
|
||||||
|
|
||||||
|
# CRITICAL: Ensure record ID matches the file name
|
||||||
|
if "id" in record and str(record["id"]) != recordId:
|
||||||
|
logger.error(f"Record ID mismatch: file name ID ({recordId}) does not match record ID ({record['id']})")
|
||||||
|
raise ValueError(f"Record ID mismatch: file name ID ({recordId}) does not match record ID ({record['id']})")
|
||||||
|
|
||||||
# Add metadata
|
# Add metadata
|
||||||
currentTime = datetime.now().isoformat()
|
currentTime = datetime.now().isoformat()
|
||||||
if "_createdAt" not in record:
|
if "_createdAt" not in record:
|
||||||
|
|
@ -454,6 +459,11 @@ class DatabaseConnector:
|
||||||
if isinstance(record, BaseModel):
|
if isinstance(record, BaseModel):
|
||||||
record = to_dict(record)
|
record = to_dict(record)
|
||||||
|
|
||||||
|
# CRITICAL: Ensure we never modify the ID
|
||||||
|
if "id" in record and str(record["id"]) != recordId:
|
||||||
|
logger.error(f"Attempted to modify record ID from {recordId} to {record['id']}")
|
||||||
|
raise ValueError("Cannot modify record ID - it must match the file name")
|
||||||
|
|
||||||
# Update existing record with new data
|
# Update existing record with new data
|
||||||
existingRecord.update(record)
|
existingRecord.update(record)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,10 +2,14 @@
|
||||||
Access control for the Application.
|
Access control for the Application.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
from typing import Dict, Any, List, Optional
|
from typing import Dict, Any, List, Optional
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from modules.interfaces.serviceAppModel import UserPrivilege, Session, User
|
from modules.interfaces.serviceAppModel import UserPrivilege, Session, User
|
||||||
|
|
||||||
|
# Configure logger
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
class AppAccess:
|
class AppAccess:
|
||||||
"""
|
"""
|
||||||
Access control class for Application interface.
|
Access control class for Application interface.
|
||||||
|
|
@ -38,14 +42,31 @@ class AppAccess:
|
||||||
"""
|
"""
|
||||||
filtered_records = []
|
filtered_records = []
|
||||||
|
|
||||||
# Apply filtering based on privilege
|
# Only SYSADMIN can see mandates
|
||||||
if self.privilege == UserPrivilege.SYSADMIN:
|
if table == "mandates":
|
||||||
filtered_records = recordset # System admins see all records
|
if self.privilege == UserPrivilege.SYSADMIN:
|
||||||
|
filtered_records = recordset
|
||||||
|
else:
|
||||||
|
filtered_records = []
|
||||||
|
# Special handling for users table
|
||||||
|
elif table == "users":
|
||||||
|
if self.privilege == UserPrivilege.SYSADMIN:
|
||||||
|
# SysAdmin sees all users
|
||||||
|
filtered_records = recordset
|
||||||
|
elif self.privilege == UserPrivilege.ADMIN:
|
||||||
|
# Admin sees all users in their mandate
|
||||||
|
filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId]
|
||||||
|
else:
|
||||||
|
# Regular users only see themselves
|
||||||
|
filtered_records = [r for r in recordset if r.get("id") == self.userId]
|
||||||
|
# System admins see all other records
|
||||||
|
elif self.privilege == UserPrivilege.SYSADMIN:
|
||||||
|
filtered_records = recordset
|
||||||
|
# For other records, admins see records in their mandate
|
||||||
elif self.privilege == UserPrivilege.ADMIN:
|
elif self.privilege == UserPrivilege.ADMIN:
|
||||||
# Admins see records in their mandate
|
|
||||||
filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId]
|
filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId]
|
||||||
else: # Regular users
|
# Regular users only see records they own within their mandate
|
||||||
# Users only see records they own within their mandate
|
else:
|
||||||
filtered_records = [r for r in recordset
|
filtered_records = [r for r in recordset
|
||||||
if r.get("mandateId","-") == self.mandateId and r.get("createdBy") == self.userId]
|
if r.get("mandateId","-") == self.mandateId and r.get("createdBy") == self.userId]
|
||||||
|
|
||||||
|
|
@ -55,13 +76,23 @@ class AppAccess:
|
||||||
|
|
||||||
# Set access control flags based on user permissions
|
# Set access control flags based on user permissions
|
||||||
if table == "mandates":
|
if table == "mandates":
|
||||||
record["_hideView"] = False # Everyone can view
|
record["_hideView"] = False # SYSADMIN can view
|
||||||
record["_hideEdit"] = not self.canModify("mandates", record_id)
|
record["_hideEdit"] = not self.canModify("mandates", record_id)
|
||||||
record["_hideDelete"] = not self.canModify("mandates", record_id)
|
record["_hideDelete"] = not self.canModify("mandates", record_id)
|
||||||
elif table == "users":
|
elif table == "users":
|
||||||
record["_hideView"] = False # Everyone can view
|
record["_hideView"] = False # Everyone can view users they have access to
|
||||||
record["_hideEdit"] = not self.canModify("users", record_id)
|
# SysAdmin can edit/delete any user
|
||||||
record["_hideDelete"] = not self.canModify("users", record_id)
|
if self.privilege == UserPrivilege.SYSADMIN:
|
||||||
|
record["_hideEdit"] = False
|
||||||
|
record["_hideDelete"] = False
|
||||||
|
# Admin can edit/delete users in their mandate
|
||||||
|
elif self.privilege == UserPrivilege.ADMIN:
|
||||||
|
record["_hideEdit"] = record.get("mandateId","-") != self.mandateId
|
||||||
|
record["_hideDelete"] = record.get("mandateId","-") != self.mandateId
|
||||||
|
# Regular users can only edit themselves
|
||||||
|
else:
|
||||||
|
record["_hideEdit"] = record.get("id") != self.userId
|
||||||
|
record["_hideDelete"] = True # Regular users cannot delete users
|
||||||
elif table == "sessions":
|
elif table == "sessions":
|
||||||
# Only show sessions for the current user or if admin
|
# Only show sessions for the current user or if admin
|
||||||
if self.privilege in [UserPrivilege.SYSADMIN, UserPrivilege.ADMIN]:
|
if self.privilege in [UserPrivilege.SYSADMIN, UserPrivilege.ADMIN]:
|
||||||
|
|
@ -97,7 +128,11 @@ class AppAccess:
|
||||||
Returns:
|
Returns:
|
||||||
Boolean indicating permission
|
Boolean indicating permission
|
||||||
"""
|
"""
|
||||||
# System admins can modify anything
|
# For mandates, only SYSADMIN can modify
|
||||||
|
if table == "mandates":
|
||||||
|
return self.privilege == UserPrivilege.SYSADMIN
|
||||||
|
|
||||||
|
# System admins can modify anything else
|
||||||
if self.privilege == UserPrivilege.SYSADMIN:
|
if self.privilege == UserPrivilege.SYSADMIN:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
@ -112,9 +147,6 @@ class AppAccess:
|
||||||
|
|
||||||
# Admins can modify anything in their mandate
|
# Admins can modify anything in their mandate
|
||||||
if self.privilege == UserPrivilege.ADMIN and record.get("mandateId","-") == self.mandateId:
|
if self.privilege == UserPrivilege.ADMIN and record.get("mandateId","-") == self.mandateId:
|
||||||
# Exception: Can't modify Root mandate unless you are a sysadmin
|
|
||||||
if table == "mandates" and record.get("initialid") and self.privilege != UserPrivilege.SYSADMIN:
|
|
||||||
return False
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# Users can only modify their own records
|
# Users can only modify their own records
|
||||||
|
|
@ -130,8 +162,6 @@ class AppAccess:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# Regular users can create most entities
|
# Regular users can create most entities
|
||||||
if table == "mandates":
|
|
||||||
return False # Regular users can't create mandates
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def validateSession(self, sessionId: str) -> bool:
|
def validateSession(self, sessionId: str) -> bool:
|
||||||
|
|
|
||||||
|
|
@ -118,7 +118,8 @@ class GatewayInterface:
|
||||||
logger.info("Creating Root mandate")
|
logger.info("Creating Root mandate")
|
||||||
rootMandate = Mandate(
|
rootMandate = Mandate(
|
||||||
name="Root",
|
name="Root",
|
||||||
language="en"
|
language="en",
|
||||||
|
enabled=True
|
||||||
)
|
)
|
||||||
createdMandate = self.db.recordCreate("mandates", rootMandate.to_dict())
|
createdMandate = self.db.recordCreate("mandates", rootMandate.to_dict())
|
||||||
logger.info(f"Root mandate created with ID {createdMandate['id']}")
|
logger.info(f"Root mandate created with ID {createdMandate['id']}")
|
||||||
|
|
@ -140,10 +141,10 @@ class GatewayInterface:
|
||||||
username="admin",
|
username="admin",
|
||||||
email="admin@example.com",
|
email="admin@example.com",
|
||||||
fullName="Administrator",
|
fullName="Administrator",
|
||||||
disabled=False,
|
enabled=True,
|
||||||
language="en",
|
language="en",
|
||||||
privilege=UserPrivilege.SYSADMIN,
|
privilege=UserPrivilege.SYSADMIN,
|
||||||
authenticationAuthority=AuthAuthority.LOCAL,
|
authenticationAuthority="local", # Using lowercase value directly
|
||||||
hashedPassword=self._getPasswordHash("The 1st Poweron Admin"), # Use a secure password in production!
|
hashedPassword=self._getPasswordHash("The 1st Poweron Admin"), # Use a secure password in production!
|
||||||
connections=[]
|
connections=[]
|
||||||
)
|
)
|
||||||
|
|
@ -323,8 +324,8 @@ class GatewayInterface:
|
||||||
if not user:
|
if not user:
|
||||||
raise ValueError("User not found")
|
raise ValueError("User not found")
|
||||||
|
|
||||||
# Check if the user is disabled
|
# Check if the user is enabled
|
||||||
if user.disabled:
|
if not user.enabled:
|
||||||
raise ValueError("User is disabled")
|
raise ValueError("User is disabled")
|
||||||
|
|
||||||
# Verify that the user has local authentication enabled
|
# Verify that the user has local authentication enabled
|
||||||
|
|
@ -342,7 +343,7 @@ class GatewayInterface:
|
||||||
return user
|
return user
|
||||||
|
|
||||||
def createUser(self, username: str, password: str = None, email: str = None,
|
def createUser(self, username: str, password: str = None, email: str = None,
|
||||||
fullName: str = None, language: str = "en", disabled: bool = False,
|
fullName: str = None, language: str = "en", enabled: bool = True,
|
||||||
privilege: UserPrivilege = UserPrivilege.USER,
|
privilege: UserPrivilege = UserPrivilege.USER,
|
||||||
authenticationAuthority: AuthAuthority = AuthAuthority.LOCAL,
|
authenticationAuthority: AuthAuthority = AuthAuthority.LOCAL,
|
||||||
externalId: str = None, externalUsername: str = None,
|
externalId: str = None, externalUsername: str = None,
|
||||||
|
|
@ -368,7 +369,7 @@ class GatewayInterface:
|
||||||
fullName=fullName,
|
fullName=fullName,
|
||||||
language=language,
|
language=language,
|
||||||
mandateId=self.mandateId,
|
mandateId=self.mandateId,
|
||||||
disabled=disabled,
|
enabled=enabled,
|
||||||
privilege=privilege,
|
privilege=privilege,
|
||||||
authenticationAuthority=authenticationAuthority,
|
authenticationAuthority=authenticationAuthority,
|
||||||
hashedPassword=self._getPasswordHash(password) if password else None,
|
hashedPassword=self._getPasswordHash(password) if password else None,
|
||||||
|
|
@ -439,11 +440,11 @@ class GatewayInterface:
|
||||||
|
|
||||||
def disableUser(self, userId: str) -> User:
|
def disableUser(self, userId: str) -> User:
|
||||||
"""Disables a user if current user has permission."""
|
"""Disables a user if current user has permission."""
|
||||||
return self.updateUser(userId, {"disabled": True})
|
return self.updateUser(userId, {"enabled": False})
|
||||||
|
|
||||||
def enableUser(self, userId: str) -> User:
|
def enableUser(self, userId: str) -> User:
|
||||||
"""Enables a user if current user has permission."""
|
"""Enables a user if current user has permission."""
|
||||||
return self.updateUser(userId, {"disabled": False})
|
return self.updateUser(userId, {"enabled": True})
|
||||||
|
|
||||||
def _deleteUserReferencedData(self, userId: str) -> None:
|
def _deleteUserReferencedData(self, userId: str) -> None:
|
||||||
"""Deletes all data associated with a user."""
|
"""Deletes all data associated with a user."""
|
||||||
|
|
@ -479,6 +480,38 @@ class GatewayInterface:
|
||||||
logger.error(f"Error deleting referenced data for user {userId}: {str(e)}")
|
logger.error(f"Error deleting referenced data for user {userId}: {str(e)}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
def deleteUser(self, userId: str) -> bool:
|
||||||
|
"""Deletes a user if current user has permission."""
|
||||||
|
try:
|
||||||
|
# Get user
|
||||||
|
user = self.getUser(userId)
|
||||||
|
if not user:
|
||||||
|
raise ValueError(f"User {userId} not found")
|
||||||
|
|
||||||
|
if not self._canModify("users", userId):
|
||||||
|
raise PermissionError(f"No permission to delete user {userId}")
|
||||||
|
|
||||||
|
# Delete all referenced data first
|
||||||
|
self._deleteUserReferencedData(userId)
|
||||||
|
|
||||||
|
# Delete user record
|
||||||
|
success = self.db.recordDelete("users", userId)
|
||||||
|
if not success:
|
||||||
|
raise ValueError(f"Failed to delete user {userId}")
|
||||||
|
|
||||||
|
# Clear both table and metadata caches
|
||||||
|
if hasattr(self.db, '_tablesCache') and "users" in self.db._tablesCache:
|
||||||
|
del self.db._tablesCache["users"]
|
||||||
|
if hasattr(self.db, '_tableMetadataCache') and "users" in self.db._tableMetadataCache:
|
||||||
|
del self.db._tableMetadataCache["users"]
|
||||||
|
|
||||||
|
logger.info(f"User {userId} successfully deleted")
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error deleting user: {str(e)}")
|
||||||
|
raise ValueError(f"Failed to delete user: {str(e)}")
|
||||||
|
|
||||||
# Mandate methods
|
# Mandate methods
|
||||||
|
|
||||||
def getAllMandates(self) -> List[Mandate]:
|
def getAllMandates(self) -> List[Mandate]:
|
||||||
|
|
@ -520,14 +553,22 @@ class GatewayInterface:
|
||||||
def updateMandate(self, mandateId: str, updateData: Dict[str, Any]) -> Mandate:
|
def updateMandate(self, mandateId: str, updateData: Dict[str, Any]) -> Mandate:
|
||||||
"""Updates a mandate if user has access."""
|
"""Updates a mandate if user has access."""
|
||||||
try:
|
try:
|
||||||
# Get mandate
|
logger.debug(f"Updating mandate {mandateId} with data: {updateData}")
|
||||||
|
|
||||||
|
# First check if user has permission to modify mandates
|
||||||
|
if not self._canModify("mandates", mandateId):
|
||||||
|
raise PermissionError(f"No permission to update mandate {mandateId}")
|
||||||
|
|
||||||
|
# Get mandate with access control
|
||||||
mandate = self.getMandate(mandateId)
|
mandate = self.getMandate(mandateId)
|
||||||
|
logger.debug(f"Retrieved mandate: {mandate}")
|
||||||
if not mandate:
|
if not mandate:
|
||||||
raise ValueError(f"Mandate {mandateId} not found")
|
raise ValueError(f"Mandate {mandateId} not found")
|
||||||
|
|
||||||
# Update mandate data using model
|
# Update mandate data using model
|
||||||
updatedData = mandate.to_dict()
|
updatedData = mandate.to_dict()
|
||||||
updatedData.update(updateData)
|
updatedData.update(updateData)
|
||||||
|
logger.debug(f"Updated data: {updatedData}")
|
||||||
updatedMandate = Mandate.from_dict(updatedData)
|
updatedMandate = Mandate.from_dict(updatedData)
|
||||||
|
|
||||||
# Update mandate record
|
# Update mandate record
|
||||||
|
|
|
||||||
|
|
@ -11,9 +11,9 @@ from modules.shared.attributeUtils import register_model_labels, AttributeDefini
|
||||||
|
|
||||||
class AuthAuthority(str, Enum):
|
class AuthAuthority(str, Enum):
|
||||||
"""Authentication authority enum"""
|
"""Authentication authority enum"""
|
||||||
LOCAL = "Local"
|
LOCAL = "local"
|
||||||
GOOGLE = "Google"
|
GOOGLE = "google"
|
||||||
MSFT = "Msft"
|
MSFT = "msft"
|
||||||
|
|
||||||
class UserPrivilege(str, Enum):
|
class UserPrivilege(str, Enum):
|
||||||
"""User privilege levels"""
|
"""User privilege levels"""
|
||||||
|
|
@ -33,6 +33,7 @@ class Mandate(BaseModel, ModelMixin):
|
||||||
id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Unique ID of the mandate")
|
id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Unique ID of the mandate")
|
||||||
name: str = Field(description="Name of the mandate")
|
name: str = Field(description="Name of the mandate")
|
||||||
language: str = Field(default="en", description="Default language of the mandate")
|
language: str = Field(default="en", description="Default language of the mandate")
|
||||||
|
enabled: bool = Field(default=True, description="Indicates whether the mandate is enabled")
|
||||||
|
|
||||||
# Register labels for Mandate
|
# Register labels for Mandate
|
||||||
register_model_labels(
|
register_model_labels(
|
||||||
|
|
@ -41,7 +42,8 @@ register_model_labels(
|
||||||
{
|
{
|
||||||
"id": {"en": "ID", "fr": "ID"},
|
"id": {"en": "ID", "fr": "ID"},
|
||||||
"name": {"en": "Name", "fr": "Nom"},
|
"name": {"en": "Name", "fr": "Nom"},
|
||||||
"language": {"en": "Language", "fr": "Langue"}
|
"language": {"en": "Language", "fr": "Langue"},
|
||||||
|
"enabled": {"en": "Enabled", "fr": "Activé"}
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -131,7 +133,7 @@ class User(BaseModel, ModelMixin):
|
||||||
email: Optional[EmailStr] = Field(None, description="Email address of the user")
|
email: Optional[EmailStr] = Field(None, description="Email address of the user")
|
||||||
fullName: Optional[str] = Field(None, description="Full name of the user")
|
fullName: Optional[str] = Field(None, description="Full name of the user")
|
||||||
language: str = Field(default="en", description="Preferred language of the user")
|
language: str = Field(default="en", description="Preferred language of the user")
|
||||||
disabled: bool = Field(default=False, description="Indicates whether the user is disabled")
|
enabled: bool = Field(default=True, description="Indicates whether the user is enabled")
|
||||||
privilege: UserPrivilege = Field(default=UserPrivilege.USER, description="Permission level")
|
privilege: UserPrivilege = Field(default=UserPrivilege.USER, description="Permission level")
|
||||||
authenticationAuthority: AuthAuthority = Field(default=AuthAuthority.LOCAL, description="Primary authentication authority")
|
authenticationAuthority: AuthAuthority = Field(default=AuthAuthority.LOCAL, description="Primary authentication authority")
|
||||||
mandateId: Optional[str] = Field(None, description="ID of the mandate this user belongs to")
|
mandateId: Optional[str] = Field(None, description="ID of the mandate this user belongs to")
|
||||||
|
|
@ -147,7 +149,7 @@ register_model_labels(
|
||||||
"email": {"en": "Email", "fr": "Email"},
|
"email": {"en": "Email", "fr": "Email"},
|
||||||
"fullName": {"en": "Full Name", "fr": "Nom complet"},
|
"fullName": {"en": "Full Name", "fr": "Nom complet"},
|
||||||
"language": {"en": "Language", "fr": "Langue"},
|
"language": {"en": "Language", "fr": "Langue"},
|
||||||
"disabled": {"en": "Disabled", "fr": "Désactivé"},
|
"enabled": {"en": "Enabled", "fr": "Activé"},
|
||||||
"privilege": {"en": "Privilege", "fr": "Privilège"},
|
"privilege": {"en": "Privilege", "fr": "Privilège"},
|
||||||
"authenticationAuthority": {"en": "Auth Authority", "fr": "Autorité d'authentification"},
|
"authenticationAuthority": {"en": "Auth Authority", "fr": "Autorité d'authentification"},
|
||||||
"mandateId": {"en": "Mandate ID", "fr": "ID de mandat"},
|
"mandateId": {"en": "Mandate ID", "fr": "ID de mandat"},
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@ from pydantic import BaseModel
|
||||||
from modules.security.auth import limiter, getCurrentUser
|
from modules.security.auth import limiter, getCurrentUser
|
||||||
|
|
||||||
# Import interfaces
|
# Import interfaces
|
||||||
import modules.interfaces.serviceManagementClass as serviceManagementClass
|
import modules.interfaces.serviceAppClass as serviceAppClass
|
||||||
from modules.shared.attributeUtils import getModelAttributeDefinitions, AttributeResponse, AttributeDefinition
|
from modules.shared.attributeUtils import getModelAttributeDefinitions, AttributeResponse, AttributeDefinition
|
||||||
|
|
||||||
# Import the model classes
|
# Import the model classes
|
||||||
|
|
@ -44,9 +44,9 @@ async def get_mandates(
|
||||||
) -> List[Mandate]:
|
) -> List[Mandate]:
|
||||||
"""Get all mandates"""
|
"""Get all mandates"""
|
||||||
try:
|
try:
|
||||||
managementInterface = serviceManagementClass.getInterface(currentUser)
|
appInterface = serviceAppClass.getInterface(currentUser)
|
||||||
mandates = managementInterface.getMandates()
|
mandates = appInterface.getAllMandates()
|
||||||
return [Mandate.from_dict(mandate) for mandate in mandates]
|
return mandates
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error getting mandates: {str(e)}")
|
logger.error(f"Error getting mandates: {str(e)}")
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
|
|
@ -63,8 +63,8 @@ async def get_mandate(
|
||||||
) -> Mandate:
|
) -> Mandate:
|
||||||
"""Get a specific mandate by ID"""
|
"""Get a specific mandate by ID"""
|
||||||
try:
|
try:
|
||||||
managementInterface = serviceManagementClass.getInterface(currentUser)
|
appInterface = serviceAppClass.getInterface(currentUser)
|
||||||
mandate = managementInterface.getMandate(mandateId)
|
mandate = appInterface.getMandate(mandateId)
|
||||||
|
|
||||||
if not mandate:
|
if not mandate:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
|
|
@ -72,7 +72,7 @@ async def get_mandate(
|
||||||
detail=f"Mandate with ID {mandateId} not found"
|
detail=f"Mandate with ID {mandateId} not found"
|
||||||
)
|
)
|
||||||
|
|
||||||
return Mandate.from_dict(mandate)
|
return mandate
|
||||||
except HTTPException:
|
except HTTPException:
|
||||||
raise
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
@ -91,13 +91,13 @@ async def create_mandate(
|
||||||
) -> Mandate:
|
) -> Mandate:
|
||||||
"""Create a new mandate"""
|
"""Create a new mandate"""
|
||||||
try:
|
try:
|
||||||
managementInterface = serviceManagementClass.getInterface(currentUser)
|
appInterface = serviceAppClass.getInterface(currentUser)
|
||||||
|
|
||||||
# Convert Mandate to dict for interface
|
|
||||||
mandate_data = mandateData.to_dict()
|
|
||||||
|
|
||||||
# Create mandate
|
# Create mandate
|
||||||
newMandate = managementInterface.createMandate(mandate_data)
|
newMandate = appInterface.createMandate(
|
||||||
|
name=mandateData.name,
|
||||||
|
language=mandateData.language
|
||||||
|
)
|
||||||
|
|
||||||
if not newMandate:
|
if not newMandate:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
|
|
@ -105,7 +105,7 @@ async def create_mandate(
|
||||||
detail="Failed to create mandate"
|
detail="Failed to create mandate"
|
||||||
)
|
)
|
||||||
|
|
||||||
return Mandate.from_dict(newMandate)
|
return newMandate
|
||||||
except HTTPException:
|
except HTTPException:
|
||||||
raise
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
@ -125,21 +125,18 @@ async def update_mandate(
|
||||||
) -> Mandate:
|
) -> Mandate:
|
||||||
"""Update an existing mandate"""
|
"""Update an existing mandate"""
|
||||||
try:
|
try:
|
||||||
managementInterface = serviceManagementClass.getInterface(currentUser)
|
appInterface = serviceAppClass.getInterface(currentUser)
|
||||||
|
|
||||||
# Check if mandate exists
|
# Check if mandate exists
|
||||||
existingMandate = managementInterface.getMandate(mandateId)
|
existingMandate = appInterface.getMandate(mandateId)
|
||||||
if not existingMandate:
|
if not existingMandate:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_404_NOT_FOUND,
|
status_code=status.HTTP_404_NOT_FOUND,
|
||||||
detail=f"Mandate with ID {mandateId} not found"
|
detail=f"Mandate with ID {mandateId} not found"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Convert Mandate to dict for interface
|
|
||||||
update_data = mandateData.to_dict()
|
|
||||||
|
|
||||||
# Update mandate
|
# Update mandate
|
||||||
updatedMandate = managementInterface.updateMandate(mandateId, update_data)
|
updatedMandate = appInterface.updateMandate(mandateId, mandateData.to_dict())
|
||||||
|
|
||||||
if not updatedMandate:
|
if not updatedMandate:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
|
|
@ -147,7 +144,7 @@ async def update_mandate(
|
||||||
detail="Failed to update mandate"
|
detail="Failed to update mandate"
|
||||||
)
|
)
|
||||||
|
|
||||||
return Mandate.from_dict(updatedMandate)
|
return updatedMandate
|
||||||
except HTTPException:
|
except HTTPException:
|
||||||
raise
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
@ -166,10 +163,10 @@ async def delete_mandate(
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""Delete a mandate"""
|
"""Delete a mandate"""
|
||||||
try:
|
try:
|
||||||
appInterface = serviceManagementClass.getInterface(currentUser)
|
appInterface = serviceAppClass.getInterface(currentUser)
|
||||||
|
|
||||||
# Check if mandate exists
|
# Check if mandate exists
|
||||||
existingMandate = appInterface.getMandateById(mandateId)
|
existingMandate = appInterface.getMandate(mandateId)
|
||||||
if not existingMandate:
|
if not existingMandate:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_404_NOT_FOUND,
|
status_code=status.HTTP_404_NOT_FOUND,
|
||||||
|
|
|
||||||
|
|
@ -54,11 +54,6 @@ async def create_prompt(
|
||||||
# Create prompt
|
# Create prompt
|
||||||
newPrompt = managementInterface.createPrompt(prompt_data)
|
newPrompt = managementInterface.createPrompt(prompt_data)
|
||||||
|
|
||||||
# Set current time for createdAt if it exists in the model
|
|
||||||
promptAttributes = getModelAttributeDefinitions(Prompt)
|
|
||||||
if "createdAt" in promptAttributes["attributes"] and hasattr(newPrompt, "createdAt"):
|
|
||||||
newPrompt["createdAt"] = datetime.now().isoformat()
|
|
||||||
|
|
||||||
return Prompt.from_dict(newPrompt)
|
return Prompt.from_dict(newPrompt)
|
||||||
|
|
||||||
@router.get("/{promptId}", response_model=Prompt)
|
@router.get("/{promptId}", response_model=Prompt)
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@ import os
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
# Import interfaces and models
|
# Import interfaces and models
|
||||||
import modules.interfaces.serviceManagementClass as serviceManagementClass
|
import modules.interfaces.serviceAppClass as serviceAppClass
|
||||||
from modules.security.auth import getCurrentUser, limiter, getCurrentUser
|
from modules.security.auth import getCurrentUser, limiter, getCurrentUser
|
||||||
|
|
||||||
# Import the attribute definition and helper functions
|
# Import the attribute definition and helper functions
|
||||||
|
|
@ -34,13 +34,17 @@ router = APIRouter(
|
||||||
@limiter.limit("30/minute")
|
@limiter.limit("30/minute")
|
||||||
async def get_users(
|
async def get_users(
|
||||||
request: Request,
|
request: Request,
|
||||||
|
mandateId: Optional[str] = None,
|
||||||
currentUser: User = Depends(getCurrentUser)
|
currentUser: User = Depends(getCurrentUser)
|
||||||
) -> List[User]:
|
) -> List[User]:
|
||||||
"""Get all users in the current mandate"""
|
"""Get all users in the current mandate"""
|
||||||
try:
|
try:
|
||||||
managementInterface = serviceManagementClass.getInterface(currentUser)
|
appInterface = serviceAppClass.getInterface(currentUser)
|
||||||
users = managementInterface.getUsers()
|
# If mandateId is provided, use it, otherwise use the current user's mandate
|
||||||
return [User.from_dict(user) for user in users]
|
targetMandateId = mandateId or currentUser.mandateId
|
||||||
|
# Get all users without filtering by enabled status
|
||||||
|
users = appInterface.getUsersByMandate(targetMandateId)
|
||||||
|
return users
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error getting users: {str(e)}")
|
logger.error(f"Error getting users: {str(e)}")
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
|
|
@ -57,8 +61,9 @@ async def get_user(
|
||||||
) -> User:
|
) -> User:
|
||||||
"""Get a specific user by ID"""
|
"""Get a specific user by ID"""
|
||||||
try:
|
try:
|
||||||
managementInterface = serviceManagementClass.getInterface(currentUser)
|
appInterface = serviceAppClass.getInterface(currentUser)
|
||||||
user = managementInterface.getUser(userId)
|
# Get user without filtering by enabled status
|
||||||
|
user = appInterface.getUser(userId)
|
||||||
|
|
||||||
if not user:
|
if not user:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
|
|
@ -66,7 +71,7 @@ async def get_user(
|
||||||
detail=f"User with ID {userId} not found"
|
detail=f"User with ID {userId} not found"
|
||||||
)
|
)
|
||||||
|
|
||||||
return User.from_dict(user)
|
return user
|
||||||
except HTTPException:
|
except HTTPException:
|
||||||
raise
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
@ -80,24 +85,19 @@ async def get_user(
|
||||||
@limiter.limit("10/minute")
|
@limiter.limit("10/minute")
|
||||||
async def create_user(
|
async def create_user(
|
||||||
request: Request,
|
request: Request,
|
||||||
user: User,
|
user_data: User = Body(...),
|
||||||
currentUser: User = Depends(getCurrentUser)
|
currentUser: User = Depends(getCurrentUser)
|
||||||
) -> User:
|
) -> User:
|
||||||
"""Create a new user"""
|
"""Create a new user"""
|
||||||
managementInterface = serviceManagementClass.getInterface(currentUser)
|
appInterface = serviceAppClass.getInterface(currentUser)
|
||||||
|
|
||||||
# Convert User to dict for interface
|
# Convert User to dict for interface
|
||||||
user_data = user.to_dict()
|
user_dict = user_data.dict()
|
||||||
|
|
||||||
# Create user
|
# Create user
|
||||||
newUser = managementInterface.createUser(user_data)
|
newUser = appInterface.createUser(user_dict)
|
||||||
|
|
||||||
# Set current time for createdAt if it exists in the model
|
return newUser
|
||||||
userAttributes = getModelAttributeDefinitions(User)
|
|
||||||
if "createdAt" in userAttributes["attributes"] and hasattr(newUser, "createdAt"):
|
|
||||||
newUser["createdAt"] = datetime.now().isoformat()
|
|
||||||
|
|
||||||
return User.from_dict(newUser)
|
|
||||||
|
|
||||||
@router.put("/{userId}", response_model=User)
|
@router.put("/{userId}", response_model=User)
|
||||||
@limiter.limit("10/minute")
|
@limiter.limit("10/minute")
|
||||||
|
|
@ -108,10 +108,10 @@ async def update_user(
|
||||||
currentUser: User = Depends(getCurrentUser)
|
currentUser: User = Depends(getCurrentUser)
|
||||||
) -> User:
|
) -> User:
|
||||||
"""Update an existing user"""
|
"""Update an existing user"""
|
||||||
managementInterface = serviceManagementClass.getInterface(currentUser)
|
appInterface = serviceAppClass.getInterface(currentUser)
|
||||||
|
|
||||||
# Check if the user exists
|
# Check if the user exists
|
||||||
existingUser = managementInterface.getUser(userId)
|
existingUser = appInterface.getUser(userId)
|
||||||
if not existingUser:
|
if not existingUser:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_404_NOT_FOUND,
|
status_code=status.HTTP_404_NOT_FOUND,
|
||||||
|
|
@ -119,10 +119,10 @@ async def update_user(
|
||||||
)
|
)
|
||||||
|
|
||||||
# Convert User to dict for interface
|
# Convert User to dict for interface
|
||||||
update_data = userData.to_dict()
|
update_data = userData.dict()
|
||||||
|
|
||||||
# Update user
|
# Update user
|
||||||
updatedUser = managementInterface.updateUser(userId, update_data)
|
updatedUser = appInterface.updateUser(userId, update_data)
|
||||||
|
|
||||||
if not updatedUser:
|
if not updatedUser:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
|
|
@ -130,7 +130,7 @@ async def update_user(
|
||||||
detail="Error updating the user"
|
detail="Error updating the user"
|
||||||
)
|
)
|
||||||
|
|
||||||
return User.from_dict(updatedUser)
|
return updatedUser
|
||||||
|
|
||||||
@router.delete("/{userId}", response_model=Dict[str, Any])
|
@router.delete("/{userId}", response_model=Dict[str, Any])
|
||||||
@limiter.limit("10/minute")
|
@limiter.limit("10/minute")
|
||||||
|
|
@ -140,7 +140,7 @@ async def delete_user(
|
||||||
currentUser: User = Depends(getCurrentUser)
|
currentUser: User = Depends(getCurrentUser)
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""Delete a user"""
|
"""Delete a user"""
|
||||||
appInterface = serviceManagementClass.getInterface(currentUser)
|
appInterface = serviceAppClass.getInterface(currentUser)
|
||||||
|
|
||||||
# Check if the user exists
|
# Check if the user exists
|
||||||
existingUser = appInterface.getUser(userId)
|
existingUser = appInterface.getUser(userId)
|
||||||
|
|
|
||||||
|
|
@ -156,25 +156,25 @@ async def register_user(
|
||||||
# Set the mandate ID on the interface
|
# Set the mandate ID on the interface
|
||||||
appInterface.mandateId = defaultMandateId
|
appInterface.mandateId = defaultMandateId
|
||||||
|
|
||||||
# Create user with individual parameters
|
# Create user with local authentication
|
||||||
newUser = appInterface.createUser(
|
user = appInterface.createUser(
|
||||||
username=userData.username,
|
username=userData.username,
|
||||||
password=password, # Pass the plain text password - createUser will hash it
|
password=password,
|
||||||
email=userData.email,
|
email=userData.email,
|
||||||
fullName=userData.fullName,
|
fullName=userData.fullName,
|
||||||
language=userData.language,
|
language=userData.language,
|
||||||
disabled=userData.disabled,
|
enabled=userData.enabled,
|
||||||
privilege=userData.privilege,
|
privilege=userData.privilege,
|
||||||
authenticationAuthority=userData.authenticationAuthority
|
authenticationAuthority=AuthAuthority.LOCAL
|
||||||
)
|
)
|
||||||
|
|
||||||
if not newUser:
|
if not user:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_400_BAD_REQUEST,
|
status_code=status.HTTP_400_BAD_REQUEST,
|
||||||
detail="Failed to register user"
|
detail="Failed to register user"
|
||||||
)
|
)
|
||||||
|
|
||||||
return newUser
|
return user
|
||||||
|
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
|
|
|
||||||
|
|
@ -122,7 +122,8 @@ def _getUserBase(token: str = Depends(oauth2Scheme)) -> User:
|
||||||
logger.warning(f"User {username} not found")
|
logger.warning(f"User {username} not found")
|
||||||
raise credentialsException
|
raise credentialsException
|
||||||
|
|
||||||
if user.disabled:
|
# Check if user is enabled
|
||||||
|
if not user.enabled:
|
||||||
logger.warning(f"User {username} is disabled")
|
logger.warning(f"User {username} is disabled")
|
||||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User is disabled")
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User is disabled")
|
||||||
|
|
||||||
|
|
@ -139,7 +140,8 @@ def _getUserBase(token: str = Depends(oauth2Scheme)) -> User:
|
||||||
|
|
||||||
def getCurrentUser(currentUser: User = Depends(_getUserBase)) -> User:
|
def getCurrentUser(currentUser: User = Depends(_getUserBase)) -> User:
|
||||||
"""Get current active user with additional validation."""
|
"""Get current active user with additional validation."""
|
||||||
if currentUser.disabled:
|
# Check if current user is enabled
|
||||||
|
if not currentUser.enabled:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_403_FORBIDDEN,
|
status_code=status.HTTP_403_FORBIDDEN,
|
||||||
detail="User is disabled"
|
detail="User is disabled"
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,53 @@
|
||||||
....................... TASKS
|
....................... TASKS
|
||||||
|
|
||||||
|
FIXES:
|
||||||
|
- can you add generic sorting by field and pagination to the table, also to have possibility to move table borders
|
||||||
|
|
||||||
|
now to adapt files.js with following specifics:
|
||||||
|
- ADD function: File upload instead of form
|
||||||
|
- MandateId not editable, to use user's mandateId
|
||||||
|
- Actions: Edit, Delete
|
||||||
|
|
||||||
|
|
||||||
|
now to adapt files.js with following specifics:
|
||||||
|
- ADD function: File upload instead of form
|
||||||
|
- MandateId not editable, to use user's mandateId
|
||||||
|
- Actions: Edit, Delete
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
To remove unused functions in files.js and to consolidate it to match new formGeneric.
|
||||||
|
|
||||||
|
FormGeneric
|
||||||
|
- ADD prompt: no mandateid for new prompt in the form
|
||||||
|
- ALLE: After add, modify, delete to reload data new from api
|
||||||
|
- ADD User: Kein Passwortfeld, connections noch drin
|
||||||
|
- DELETE User: Fehler
|
||||||
|
- UPDATE Mandate: enabled als Text statt boolean
|
||||||
|
- ADD Mandate: error sending
|
||||||
|
- Connections: Missing
|
||||||
|
|
||||||
|
UAC:
|
||||||
|
- Users disabled nicht sichtbar!
|
||||||
|
|
||||||
|
Workflow:
|
||||||
|
- Liste der Prompts nicht aktualisiert nach Änderungen in FormGeneric
|
||||||
|
-
|
||||||
|
|
||||||
|
- diese seite gesucht: "GET /.well-known/appspecific/com.chrome.devtools.json HTTP/1.1" 404 ?
|
||||||
|
|
||||||
|
|
||||||
|
Test paths:
|
||||||
|
- Admin
|
||||||
|
- User
|
||||||
|
- MSFT-Google
|
||||||
|
- Alle Management items
|
||||||
|
- Workflow
|
||||||
|
- Connections on/off
|
||||||
|
- Mail 2 Connectors
|
||||||
|
|
||||||
|
|
||||||
Agents and Manager:
|
Agents and Manager:
|
||||||
- To adapt prompts to match document handling, done by agents
|
- To adapt prompts to match document handling, done by agents
|
||||||
- agents to use service object and to work stepwise:
|
- agents to use service object and to work stepwise:
|
||||||
|
|
|
||||||
Binary file not shown.
|
Before Width: | Height: | Size: 3.8 KiB |
|
|
@ -1 +0,0 @@
|
||||||
OK 1.0
|
|
||||||
926
tool_test01ui.py
Normal file
926
tool_test01ui.py
Normal file
|
|
@ -0,0 +1,926 @@
|
||||||
|
"""
|
||||||
|
UI Test Procedure for PowerOn Frontend
|
||||||
|
Tests CRUD operations, user registration, authentication, and access control
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Dict, List, Any
|
||||||
|
import requests
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from enum import Enum
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
# Configure logging
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||||
|
handlers=[
|
||||||
|
logging.FileHandler('ui_test_report.log'),
|
||||||
|
logging.StreamHandler(sys.stdout)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Test configuration
|
||||||
|
BASE_URL = "http://localhost:8080" # Adjust based on your frontend URL
|
||||||
|
API_URL = "http://localhost:8000" # Adjust based on your backend URL
|
||||||
|
|
||||||
|
class UserRole(Enum):
|
||||||
|
SYSADMIN = "sysadmin"
|
||||||
|
ADMIN = "admin"
|
||||||
|
USER = "user"
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TestResult:
|
||||||
|
test_name: str
|
||||||
|
success: bool
|
||||||
|
message: str
|
||||||
|
details: Dict[str, Any] = None
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TestReport:
|
||||||
|
timestamp: str
|
||||||
|
total_tests: int
|
||||||
|
passed_tests: int
|
||||||
|
failed_tests: int
|
||||||
|
results: List[TestResult]
|
||||||
|
bugs_found: List[Dict[str, str]]
|
||||||
|
required_adaptations: List[Dict[str, str]]
|
||||||
|
|
||||||
|
class UITestSuite:
|
||||||
|
def __init__(self):
|
||||||
|
self.session = requests.Session()
|
||||||
|
self.test_results = []
|
||||||
|
self.bugs_found = []
|
||||||
|
self.required_adaptations = []
|
||||||
|
self.current_user = None
|
||||||
|
self.current_role = None
|
||||||
|
|
||||||
|
def run_all_tests(self):
|
||||||
|
"""Run all test categories"""
|
||||||
|
logger.info("Starting UI Test Suite")
|
||||||
|
|
||||||
|
# Test user registration and authentication
|
||||||
|
self.test_user_registration()
|
||||||
|
self.test_user_authentication()
|
||||||
|
|
||||||
|
# Test CRUD operations for each module
|
||||||
|
self.test_files_module()
|
||||||
|
self.test_mandates_module()
|
||||||
|
self.test_prompts_module()
|
||||||
|
self.test_users_module()
|
||||||
|
|
||||||
|
# Generate and save report
|
||||||
|
self.generate_report()
|
||||||
|
|
||||||
|
def test_user_registration(self):
|
||||||
|
"""Test user registration for different roles"""
|
||||||
|
logger.info("Testing User Registration")
|
||||||
|
|
||||||
|
test_users = [
|
||||||
|
{"username": "test_sysadmin", "password": "Test123!", "role": UserRole.SYSADMIN},
|
||||||
|
{"username": "test_admin", "password": "Test123!", "role": UserRole.ADMIN},
|
||||||
|
{"username": "test_user", "password": "Test123!", "role": UserRole.USER}
|
||||||
|
]
|
||||||
|
|
||||||
|
for user in test_users:
|
||||||
|
try:
|
||||||
|
# Create userData object matching User model
|
||||||
|
user_data = {
|
||||||
|
"username": user["username"],
|
||||||
|
"email": f"{user['username']}@test.com",
|
||||||
|
"fullName": f"Test {user['role'].value}",
|
||||||
|
"language": "en",
|
||||||
|
"enabled": True,
|
||||||
|
"privilege": user["role"].value,
|
||||||
|
"authenticationAuthority": "local",
|
||||||
|
"mandateId": None, # Will be set by the backend
|
||||||
|
"connections": []
|
||||||
|
}
|
||||||
|
|
||||||
|
response = self.session.post(
|
||||||
|
f"{API_URL}/api/local/register",
|
||||||
|
json={
|
||||||
|
"userData": user_data,
|
||||||
|
"password": user["password"]
|
||||||
|
},
|
||||||
|
headers={
|
||||||
|
"X-CSRF-Token": "test-csrf-token",
|
||||||
|
"Content-Type": "application/json"
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
logger.info(f"Successfully registered {user['role'].value} user")
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Register {user['role'].value}",
|
||||||
|
True,
|
||||||
|
f"Successfully registered {user['role'].value} user"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
error_msg = f"Failed to register {user['role'].value} user: {response.status_code}"
|
||||||
|
if response.text:
|
||||||
|
error_msg += f" - {response.text}"
|
||||||
|
logger.error(error_msg)
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Register {user['role'].value}",
|
||||||
|
False,
|
||||||
|
error_msg,
|
||||||
|
{"status_code": response.status_code, "response": response.text}
|
||||||
|
))
|
||||||
|
except Exception as e:
|
||||||
|
error_msg = f"Exception during registration: {str(e)}"
|
||||||
|
logger.error(error_msg)
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Register {user['role'].value}",
|
||||||
|
False,
|
||||||
|
error_msg
|
||||||
|
))
|
||||||
|
|
||||||
|
def test_user_authentication(self):
|
||||||
|
"""Test login and logout for different roles"""
|
||||||
|
logger.info("Testing User Authentication")
|
||||||
|
|
||||||
|
test_users = [
|
||||||
|
{"username": "test_sysadmin", "password": "Test123!", "role": UserRole.SYSADMIN},
|
||||||
|
{"username": "test_admin", "password": "Test123!", "role": UserRole.ADMIN},
|
||||||
|
{"username": "test_user", "password": "Test123!", "role": UserRole.USER}
|
||||||
|
]
|
||||||
|
|
||||||
|
for user in test_users:
|
||||||
|
# Test login
|
||||||
|
try:
|
||||||
|
response = self.session.post(
|
||||||
|
f"{API_URL}/api/local/login",
|
||||||
|
data={
|
||||||
|
"username": user["username"],
|
||||||
|
"password": user["password"]
|
||||||
|
},
|
||||||
|
headers={
|
||||||
|
"X-CSRF-Token": "test-csrf-token" # Add CSRF token
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Login {user['role'].value}",
|
||||||
|
True,
|
||||||
|
f"Successfully logged in as {user['role'].value}"
|
||||||
|
))
|
||||||
|
|
||||||
|
# Test logout
|
||||||
|
logout_response = self.session.post(f"{API_URL}/api/security/local/logout")
|
||||||
|
if logout_response.status_code == 200:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Logout {user['role'].value}",
|
||||||
|
True,
|
||||||
|
f"Successfully logged out as {user['role'].value}"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Logout {user['role'].value}",
|
||||||
|
False,
|
||||||
|
f"Failed to logout as {user['role'].value}"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Login {user['role'].value}",
|
||||||
|
False,
|
||||||
|
f"Failed to login as {user['role'].value}"
|
||||||
|
))
|
||||||
|
except Exception as e:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Login {user['role'].value}",
|
||||||
|
False,
|
||||||
|
f"Exception during login: {str(e)}"
|
||||||
|
))
|
||||||
|
|
||||||
|
def test_files_module(self):
|
||||||
|
"""Test CRUD operations for files module"""
|
||||||
|
logger.info("Testing Files Module")
|
||||||
|
|
||||||
|
# Test for each role
|
||||||
|
for role in UserRole:
|
||||||
|
self.current_role = role
|
||||||
|
self._login_as_role(role)
|
||||||
|
|
||||||
|
# Test view files
|
||||||
|
self._test_view_files()
|
||||||
|
|
||||||
|
# Test create file
|
||||||
|
self._test_create_file()
|
||||||
|
|
||||||
|
# Test modify file
|
||||||
|
self._test_modify_file()
|
||||||
|
|
||||||
|
# Test delete file
|
||||||
|
self._test_delete_file()
|
||||||
|
|
||||||
|
self._logout()
|
||||||
|
|
||||||
|
def test_mandates_module(self):
|
||||||
|
"""Test CRUD operations for mandates module"""
|
||||||
|
logger.info("Testing Mandates Module")
|
||||||
|
|
||||||
|
for role in UserRole:
|
||||||
|
self.current_role = role
|
||||||
|
self._login_as_role(role)
|
||||||
|
|
||||||
|
# Test view mandates
|
||||||
|
self._test_view_mandates()
|
||||||
|
|
||||||
|
# Test create mandate
|
||||||
|
self._test_create_mandate()
|
||||||
|
|
||||||
|
# Test modify mandate
|
||||||
|
self._test_modify_mandate()
|
||||||
|
|
||||||
|
# Test delete mandate
|
||||||
|
self._test_delete_mandate()
|
||||||
|
|
||||||
|
self._logout()
|
||||||
|
|
||||||
|
def test_prompts_module(self):
|
||||||
|
"""Test CRUD operations for prompts module"""
|
||||||
|
logger.info("Testing Prompts Module")
|
||||||
|
|
||||||
|
for role in UserRole:
|
||||||
|
self.current_role = role
|
||||||
|
self._login_as_role(role)
|
||||||
|
|
||||||
|
# Test view prompts
|
||||||
|
self._test_view_prompts()
|
||||||
|
|
||||||
|
# Test create prompt
|
||||||
|
self._test_create_prompt()
|
||||||
|
|
||||||
|
# Test modify prompt
|
||||||
|
self._test_modify_prompt()
|
||||||
|
|
||||||
|
# Test delete prompt
|
||||||
|
self._test_delete_prompt()
|
||||||
|
|
||||||
|
self._logout()
|
||||||
|
|
||||||
|
def test_users_module(self):
|
||||||
|
"""Test CRUD operations for users module"""
|
||||||
|
logger.info("Testing Users Module")
|
||||||
|
|
||||||
|
for role in UserRole:
|
||||||
|
self.current_role = role
|
||||||
|
self._login_as_role(role)
|
||||||
|
|
||||||
|
# Test view users
|
||||||
|
self._test_view_users()
|
||||||
|
|
||||||
|
# Test create user
|
||||||
|
self._test_create_user()
|
||||||
|
|
||||||
|
# Test modify user
|
||||||
|
self._test_modify_user()
|
||||||
|
|
||||||
|
# Test delete user
|
||||||
|
self._test_delete_user()
|
||||||
|
|
||||||
|
self._logout()
|
||||||
|
|
||||||
|
def _login_as_role(self, role: UserRole):
|
||||||
|
"""Helper method to login as a specific role"""
|
||||||
|
username = f"test_{role.value}"
|
||||||
|
max_retries = 3
|
||||||
|
retry_delay = 12 # 12 seconds delay between retries (to stay under 5 per minute)
|
||||||
|
|
||||||
|
for attempt in range(max_retries):
|
||||||
|
try:
|
||||||
|
# Always wait before attempting login
|
||||||
|
if attempt == 0:
|
||||||
|
logger.info(f"Waiting {retry_delay}s before first login attempt for {role.value}")
|
||||||
|
else:
|
||||||
|
logger.info(f"Rate limit reached, waiting {retry_delay}s before retry {attempt + 1} for {role.value} login")
|
||||||
|
time.sleep(retry_delay)
|
||||||
|
|
||||||
|
response = self.session.post(
|
||||||
|
f"{API_URL}/api/local/login",
|
||||||
|
data={
|
||||||
|
"username": username,
|
||||||
|
"password": "Test123!"
|
||||||
|
},
|
||||||
|
headers={
|
||||||
|
"X-CSRF-Token": "test-csrf-token"
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
self.current_user = response.json()
|
||||||
|
logger.info(f"Successfully logged in as {role.value}")
|
||||||
|
return
|
||||||
|
elif response.status_code == 429:
|
||||||
|
logger.info(f"Rate limit reached for {role.value} login: {response.text}")
|
||||||
|
if attempt < max_retries - 1:
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
logger.error(f"Max retries reached for {role.value} login after rate limits")
|
||||||
|
raise Exception(f"Max retries reached for {role.value} login after rate limits")
|
||||||
|
else:
|
||||||
|
error_msg = f"Failed to login as {role.value}: {response.status_code}"
|
||||||
|
if response.text:
|
||||||
|
error_msg += f" - {response.text}"
|
||||||
|
logger.error(error_msg)
|
||||||
|
raise Exception(error_msg)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
if attempt == max_retries - 1:
|
||||||
|
logger.error(f"Login error for {role.value} after {max_retries} attempts: {str(e)}")
|
||||||
|
raise
|
||||||
|
continue
|
||||||
|
|
||||||
|
def _logout(self):
|
||||||
|
"""Helper method to logout"""
|
||||||
|
self.session.post(f"{API_URL}/api/security/local/logout")
|
||||||
|
self.current_user = None
|
||||||
|
|
||||||
|
def _test_view_files(self):
|
||||||
|
"""Test viewing files"""
|
||||||
|
try:
|
||||||
|
response = self.session.get(f"{API_URL}/api/files")
|
||||||
|
if response.status_code == 200:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"View Files as {self.current_role.value}",
|
||||||
|
True,
|
||||||
|
"Successfully viewed files"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"View Files as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Failed to view files: {response.status_code}"
|
||||||
|
))
|
||||||
|
except Exception as e:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"View Files as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Exception viewing files: {str(e)}"
|
||||||
|
))
|
||||||
|
|
||||||
|
def _test_create_file(self):
|
||||||
|
"""Test creating a file"""
|
||||||
|
try:
|
||||||
|
# Create a test file
|
||||||
|
test_file = {
|
||||||
|
"name": f"test_file_{self.current_role.value}",
|
||||||
|
"content": "Test content",
|
||||||
|
"type": "text/plain"
|
||||||
|
}
|
||||||
|
|
||||||
|
response = self.session.post(
|
||||||
|
f"{API_URL}/api/files",
|
||||||
|
json=test_file
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Create File as {self.current_role.value}",
|
||||||
|
True,
|
||||||
|
"Successfully created file"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Create File as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Failed to create file: {response.status_code}"
|
||||||
|
))
|
||||||
|
except Exception as e:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Create File as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Exception creating file: {str(e)}"
|
||||||
|
))
|
||||||
|
|
||||||
|
def _test_modify_file(self):
|
||||||
|
"""Test modifying a file"""
|
||||||
|
try:
|
||||||
|
# First get a file to modify
|
||||||
|
files_response = self.session.get(f"{API_URL}/api/files")
|
||||||
|
if files_response.status_code == 200 and files_response.json():
|
||||||
|
file_id = files_response.json()[0]["id"]
|
||||||
|
|
||||||
|
# Modify the file
|
||||||
|
update_data = {
|
||||||
|
"name": f"modified_file_{self.current_role.value}",
|
||||||
|
"content": "Modified content"
|
||||||
|
}
|
||||||
|
|
||||||
|
response = self.session.put(
|
||||||
|
f"{API_URL}/api/files/{file_id}",
|
||||||
|
json=update_data
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Modify File as {self.current_role.value}",
|
||||||
|
True,
|
||||||
|
"Successfully modified file"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Modify File as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Failed to modify file: {response.status_code}"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Modify File as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
"No files available to modify"
|
||||||
|
))
|
||||||
|
except Exception as e:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Modify File as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Exception modifying file: {str(e)}"
|
||||||
|
))
|
||||||
|
|
||||||
|
def _test_delete_file(self):
|
||||||
|
"""Test deleting a file"""
|
||||||
|
try:
|
||||||
|
# First get a file to delete
|
||||||
|
files_response = self.session.get(f"{API_URL}/api/files")
|
||||||
|
if files_response.status_code == 200 and files_response.json():
|
||||||
|
file_id = files_response.json()[0]["id"]
|
||||||
|
|
||||||
|
response = self.session.delete(f"{API_URL}/api/files/{file_id}")
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Delete File as {self.current_role.value}",
|
||||||
|
True,
|
||||||
|
"Successfully deleted file"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Delete File as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Failed to delete file: {response.status_code}"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Delete File as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
"No files available to delete"
|
||||||
|
))
|
||||||
|
except Exception as e:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Delete File as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Exception deleting file: {str(e)}"
|
||||||
|
))
|
||||||
|
|
||||||
|
def _test_view_mandates(self):
|
||||||
|
"""Test viewing mandates"""
|
||||||
|
try:
|
||||||
|
response = self.session.get(f"{API_URL}/api/mandates")
|
||||||
|
if response.status_code == 200:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"View Mandates as {self.current_role.value}",
|
||||||
|
True,
|
||||||
|
"Successfully viewed mandates"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"View Mandates as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Failed to view mandates: {response.status_code}"
|
||||||
|
))
|
||||||
|
except Exception as e:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"View Mandates as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Exception viewing mandates: {str(e)}"
|
||||||
|
))
|
||||||
|
|
||||||
|
def _test_create_mandate(self):
|
||||||
|
"""Test creating a mandate"""
|
||||||
|
try:
|
||||||
|
test_mandate = {
|
||||||
|
"name": f"test_mandate_{self.current_role.value}",
|
||||||
|
"description": "Test mandate",
|
||||||
|
"enabled": True
|
||||||
|
}
|
||||||
|
|
||||||
|
response = self.session.post(
|
||||||
|
f"{API_URL}/api/mandates",
|
||||||
|
json=test_mandate
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Create Mandate as {self.current_role.value}",
|
||||||
|
True,
|
||||||
|
"Successfully created mandate"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Create Mandate as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Failed to create mandate: {response.status_code}"
|
||||||
|
))
|
||||||
|
except Exception as e:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Create Mandate as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Exception creating mandate: {str(e)}"
|
||||||
|
))
|
||||||
|
|
||||||
|
def _test_modify_mandate(self):
|
||||||
|
"""Test modifying a mandate"""
|
||||||
|
try:
|
||||||
|
# First get a mandate to modify
|
||||||
|
mandates_response = self.session.get(f"{API_URL}/api/mandates")
|
||||||
|
if mandates_response.status_code == 200 and mandates_response.json():
|
||||||
|
mandate_id = mandates_response.json()[0]["id"]
|
||||||
|
|
||||||
|
update_data = {
|
||||||
|
"name": f"modified_mandate_{self.current_role.value}",
|
||||||
|
"description": "Modified mandate"
|
||||||
|
}
|
||||||
|
|
||||||
|
response = self.session.put(
|
||||||
|
f"{API_URL}/api/mandates/{mandate_id}",
|
||||||
|
json=update_data
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Modify Mandate as {self.current_role.value}",
|
||||||
|
True,
|
||||||
|
"Successfully modified mandate"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Modify Mandate as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Failed to modify mandate: {response.status_code}"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Modify Mandate as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
"No mandates available to modify"
|
||||||
|
))
|
||||||
|
except Exception as e:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Modify Mandate as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Exception modifying mandate: {str(e)}"
|
||||||
|
))
|
||||||
|
|
||||||
|
def _test_delete_mandate(self):
|
||||||
|
"""Test deleting a mandate"""
|
||||||
|
try:
|
||||||
|
# First get a mandate to delete
|
||||||
|
mandates_response = self.session.get(f"{API_URL}/api/mandates")
|
||||||
|
if mandates_response.status_code == 200 and mandates_response.json():
|
||||||
|
mandate_id = mandates_response.json()[0]["id"]
|
||||||
|
|
||||||
|
response = self.session.delete(f"{API_URL}/api/mandates/{mandate_id}")
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Delete Mandate as {self.current_role.value}",
|
||||||
|
True,
|
||||||
|
"Successfully deleted mandate"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Delete Mandate as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Failed to delete mandate: {response.status_code}"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Delete Mandate as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
"No mandates available to delete"
|
||||||
|
))
|
||||||
|
except Exception as e:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Delete Mandate as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Exception deleting mandate: {str(e)}"
|
||||||
|
))
|
||||||
|
|
||||||
|
def _test_view_prompts(self):
|
||||||
|
"""Test viewing prompts"""
|
||||||
|
try:
|
||||||
|
response = self.session.get(f"{API_URL}/api/prompts")
|
||||||
|
if response.status_code == 200:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"View Prompts as {self.current_role.value}",
|
||||||
|
True,
|
||||||
|
"Successfully viewed prompts"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"View Prompts as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Failed to view prompts: {response.status_code}"
|
||||||
|
))
|
||||||
|
except Exception as e:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"View Prompts as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Exception viewing prompts: {str(e)}"
|
||||||
|
))
|
||||||
|
|
||||||
|
def _test_create_prompt(self):
|
||||||
|
"""Test creating a prompt"""
|
||||||
|
try:
|
||||||
|
test_prompt = {
|
||||||
|
"name": f"test_prompt_{self.current_role.value}",
|
||||||
|
"content": "Test prompt content",
|
||||||
|
"category": "test"
|
||||||
|
}
|
||||||
|
|
||||||
|
response = self.session.post(
|
||||||
|
f"{API_URL}/api/prompts",
|
||||||
|
json=test_prompt
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Create Prompt as {self.current_role.value}",
|
||||||
|
True,
|
||||||
|
"Successfully created prompt"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Create Prompt as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Failed to create prompt: {response.status_code}"
|
||||||
|
))
|
||||||
|
except Exception as e:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Create Prompt as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Exception creating prompt: {str(e)}"
|
||||||
|
))
|
||||||
|
|
||||||
|
def _test_modify_prompt(self):
|
||||||
|
"""Test modifying a prompt"""
|
||||||
|
try:
|
||||||
|
# First get a prompt to modify
|
||||||
|
prompts_response = self.session.get(f"{API_URL}/api/prompts")
|
||||||
|
if prompts_response.status_code == 200 and prompts_response.json():
|
||||||
|
prompt_id = prompts_response.json()[0]["id"]
|
||||||
|
|
||||||
|
update_data = {
|
||||||
|
"name": f"modified_prompt_{self.current_role.value}",
|
||||||
|
"content": "Modified prompt content"
|
||||||
|
}
|
||||||
|
|
||||||
|
response = self.session.put(
|
||||||
|
f"{API_URL}/api/prompts/{prompt_id}",
|
||||||
|
json=update_data
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Modify Prompt as {self.current_role.value}",
|
||||||
|
True,
|
||||||
|
"Successfully modified prompt"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Modify Prompt as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Failed to modify prompt: {response.status_code}"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Modify Prompt as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
"No prompts available to modify"
|
||||||
|
))
|
||||||
|
except Exception as e:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Modify Prompt as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Exception modifying prompt: {str(e)}"
|
||||||
|
))
|
||||||
|
|
||||||
|
def _test_delete_prompt(self):
|
||||||
|
"""Test deleting a prompt"""
|
||||||
|
try:
|
||||||
|
# First get a prompt to delete
|
||||||
|
prompts_response = self.session.get(f"{API_URL}/api/prompts")
|
||||||
|
if prompts_response.status_code == 200 and prompts_response.json():
|
||||||
|
prompt_id = prompts_response.json()[0]["id"]
|
||||||
|
|
||||||
|
response = self.session.delete(f"{API_URL}/api/prompts/{prompt_id}")
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Delete Prompt as {self.current_role.value}",
|
||||||
|
True,
|
||||||
|
"Successfully deleted prompt"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Delete Prompt as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Failed to delete prompt: {response.status_code}"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Delete Prompt as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
"No prompts available to delete"
|
||||||
|
))
|
||||||
|
except Exception as e:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Delete Prompt as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Exception deleting prompt: {str(e)}"
|
||||||
|
))
|
||||||
|
|
||||||
|
def _test_view_users(self):
|
||||||
|
"""Test viewing users"""
|
||||||
|
try:
|
||||||
|
response = self.session.get(f"{API_URL}/api/users")
|
||||||
|
if response.status_code == 200:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"View Users as {self.current_role.value}",
|
||||||
|
True,
|
||||||
|
"Successfully viewed users"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"View Users as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Failed to view users: {response.status_code}"
|
||||||
|
))
|
||||||
|
except Exception as e:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"View Users as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Exception viewing users: {str(e)}"
|
||||||
|
))
|
||||||
|
|
||||||
|
def _test_create_user(self):
|
||||||
|
"""Test creating a user"""
|
||||||
|
try:
|
||||||
|
test_user = {
|
||||||
|
"username": f"new_user_{self.current_role.value}",
|
||||||
|
"password": "Test123!",
|
||||||
|
"email": f"new_user_{self.current_role.value}@test.com",
|
||||||
|
"privilege": "user"
|
||||||
|
}
|
||||||
|
|
||||||
|
response = self.session.post(
|
||||||
|
f"{API_URL}/api/users",
|
||||||
|
json=test_user
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Create User as {self.current_role.value}",
|
||||||
|
True,
|
||||||
|
"Successfully created user"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Create User as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Failed to create user: {response.status_code}"
|
||||||
|
))
|
||||||
|
except Exception as e:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Create User as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Exception creating user: {str(e)}"
|
||||||
|
))
|
||||||
|
|
||||||
|
def _test_modify_user(self):
|
||||||
|
"""Test modifying a user"""
|
||||||
|
try:
|
||||||
|
# First get a user to modify
|
||||||
|
users_response = self.session.get(f"{API_URL}/api/users")
|
||||||
|
if users_response.status_code == 200 and users_response.json():
|
||||||
|
user_id = users_response.json()[0]["id"]
|
||||||
|
|
||||||
|
update_data = {
|
||||||
|
"username": f"modified_user_{self.current_role.value}",
|
||||||
|
"email": f"modified_user_{self.current_role.value}@test.com"
|
||||||
|
}
|
||||||
|
|
||||||
|
response = self.session.put(
|
||||||
|
f"{API_URL}/api/users/{user_id}",
|
||||||
|
json=update_data
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Modify User as {self.current_role.value}",
|
||||||
|
True,
|
||||||
|
"Successfully modified user"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Modify User as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Failed to modify user: {response.status_code}"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Modify User as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
"No users available to modify"
|
||||||
|
))
|
||||||
|
except Exception as e:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Modify User as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Exception modifying user: {str(e)}"
|
||||||
|
))
|
||||||
|
|
||||||
|
def _test_delete_user(self):
|
||||||
|
"""Test deleting a user"""
|
||||||
|
try:
|
||||||
|
# First get a user to delete
|
||||||
|
users_response = self.session.get(f"{API_URL}/api/users")
|
||||||
|
if users_response.status_code == 200 and users_response.json():
|
||||||
|
user_id = users_response.json()[0]["id"]
|
||||||
|
|
||||||
|
response = self.session.delete(f"{API_URL}/api/users/{user_id}")
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Delete User as {self.current_role.value}",
|
||||||
|
True,
|
||||||
|
"Successfully deleted user"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Delete User as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Failed to delete user: {response.status_code}"
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Delete User as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
"No users available to delete"
|
||||||
|
))
|
||||||
|
except Exception as e:
|
||||||
|
self.test_results.append(TestResult(
|
||||||
|
f"Delete User as {self.current_role.value}",
|
||||||
|
False,
|
||||||
|
f"Exception deleting user: {str(e)}"
|
||||||
|
))
|
||||||
|
|
||||||
|
def generate_report(self):
|
||||||
|
"""Generate test report"""
|
||||||
|
# Convert TestResult objects to dictionaries
|
||||||
|
serialized_results = [
|
||||||
|
{
|
||||||
|
"test_name": r.test_name,
|
||||||
|
"success": r.success,
|
||||||
|
"message": r.message,
|
||||||
|
"details": r.details
|
||||||
|
}
|
||||||
|
for r in self.test_results
|
||||||
|
]
|
||||||
|
|
||||||
|
report = {
|
||||||
|
"timestamp": datetime.now().isoformat(),
|
||||||
|
"total_tests": len(self.test_results),
|
||||||
|
"passed_tests": sum(1 for r in self.test_results if r.success),
|
||||||
|
"failed_tests": sum(1 for r in self.test_results if not r.success),
|
||||||
|
"results": serialized_results,
|
||||||
|
"bugs_found": self.bugs_found,
|
||||||
|
"required_adaptations": self.required_adaptations
|
||||||
|
}
|
||||||
|
|
||||||
|
# Save report to file
|
||||||
|
with open('ui_test_report.json', 'w') as f:
|
||||||
|
json.dump(report, f, indent=2)
|
||||||
|
|
||||||
|
# Print summary
|
||||||
|
logger.info(f"""
|
||||||
|
Test Report Summary:
|
||||||
|
-------------------
|
||||||
|
Total Tests: {report['total_tests']}
|
||||||
|
Passed: {report['passed_tests']}
|
||||||
|
Failed: {report['failed_tests']}
|
||||||
|
Bugs Found: {len(report['bugs_found'])}
|
||||||
|
Required Adaptations: {len(report['required_adaptations'])}
|
||||||
|
""")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
test_suite = UITestSuite()
|
||||||
|
test_suite.run_all_tests()
|
||||||
352
ui_test_report.json
Normal file
352
ui_test_report.json
Normal file
|
|
@ -0,0 +1,352 @@
|
||||||
|
{
|
||||||
|
"timestamp": "2025-06-02T20:18:49.451773",
|
||||||
|
"total_tests": 57,
|
||||||
|
"passed_tests": 6,
|
||||||
|
"failed_tests": 51,
|
||||||
|
"results": [
|
||||||
|
{
|
||||||
|
"test_name": "Register sysadmin",
|
||||||
|
"success": true,
|
||||||
|
"message": "Successfully registered sysadmin user",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Register admin",
|
||||||
|
"success": true,
|
||||||
|
"message": "Successfully registered admin user",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Register user",
|
||||||
|
"success": true,
|
||||||
|
"message": "Successfully registered user user",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Login sysadmin",
|
||||||
|
"success": true,
|
||||||
|
"message": "Successfully logged in as sysadmin",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Logout sysadmin",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to logout as sysadmin",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Login admin",
|
||||||
|
"success": true,
|
||||||
|
"message": "Successfully logged in as admin",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Logout admin",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to logout as admin",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Login user",
|
||||||
|
"success": true,
|
||||||
|
"message": "Successfully logged in as user",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Logout user",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to logout as user",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "View Files as sysadmin",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to view files: 405",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Create File as sysadmin",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to create file: 405",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Modify File as sysadmin",
|
||||||
|
"success": false,
|
||||||
|
"message": "No files available to modify",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Delete File as sysadmin",
|
||||||
|
"success": false,
|
||||||
|
"message": "No files available to delete",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "View Files as admin",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to view files: 405",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Create File as admin",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to create file: 405",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Modify File as admin",
|
||||||
|
"success": false,
|
||||||
|
"message": "No files available to modify",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Delete File as admin",
|
||||||
|
"success": false,
|
||||||
|
"message": "No files available to delete",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "View Files as user",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to view files: 405",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Create File as user",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to create file: 405",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Modify File as user",
|
||||||
|
"success": false,
|
||||||
|
"message": "No files available to modify",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Delete File as user",
|
||||||
|
"success": false,
|
||||||
|
"message": "No files available to delete",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "View Mandates as sysadmin",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to view mandates: 405",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Create Mandate as sysadmin",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to create mandate: 405",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Modify Mandate as sysadmin",
|
||||||
|
"success": false,
|
||||||
|
"message": "No mandates available to modify",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Delete Mandate as sysadmin",
|
||||||
|
"success": false,
|
||||||
|
"message": "No mandates available to delete",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "View Mandates as admin",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to view mandates: 405",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Create Mandate as admin",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to create mandate: 405",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Modify Mandate as admin",
|
||||||
|
"success": false,
|
||||||
|
"message": "No mandates available to modify",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Delete Mandate as admin",
|
||||||
|
"success": false,
|
||||||
|
"message": "No mandates available to delete",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "View Mandates as user",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to view mandates: 405",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Create Mandate as user",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to create mandate: 405",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Modify Mandate as user",
|
||||||
|
"success": false,
|
||||||
|
"message": "No mandates available to modify",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Delete Mandate as user",
|
||||||
|
"success": false,
|
||||||
|
"message": "No mandates available to delete",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "View Prompts as sysadmin",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to view prompts: 401",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Create Prompt as sysadmin",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to create prompt: 401",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Modify Prompt as sysadmin",
|
||||||
|
"success": false,
|
||||||
|
"message": "No prompts available to modify",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Delete Prompt as sysadmin",
|
||||||
|
"success": false,
|
||||||
|
"message": "No prompts available to delete",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "View Prompts as admin",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to view prompts: 401",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Create Prompt as admin",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to create prompt: 401",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Modify Prompt as admin",
|
||||||
|
"success": false,
|
||||||
|
"message": "No prompts available to modify",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Delete Prompt as admin",
|
||||||
|
"success": false,
|
||||||
|
"message": "No prompts available to delete",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "View Prompts as user",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to view prompts: 401",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Create Prompt as user",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to create prompt: 401",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Modify Prompt as user",
|
||||||
|
"success": false,
|
||||||
|
"message": "No prompts available to modify",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Delete Prompt as user",
|
||||||
|
"success": false,
|
||||||
|
"message": "No prompts available to delete",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "View Users as sysadmin",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to view users: 405",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Create User as sysadmin",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to create user: 401",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Modify User as sysadmin",
|
||||||
|
"success": false,
|
||||||
|
"message": "No users available to modify",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Delete User as sysadmin",
|
||||||
|
"success": false,
|
||||||
|
"message": "No users available to delete",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "View Users as admin",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to view users: 405",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Create User as admin",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to create user: 401",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Modify User as admin",
|
||||||
|
"success": false,
|
||||||
|
"message": "No users available to modify",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Delete User as admin",
|
||||||
|
"success": false,
|
||||||
|
"message": "No users available to delete",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "View Users as user",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to view users: 405",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Create User as user",
|
||||||
|
"success": false,
|
||||||
|
"message": "Failed to create user: 401",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Modify User as user",
|
||||||
|
"success": false,
|
||||||
|
"message": "No users available to modify",
|
||||||
|
"details": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test_name": "Delete User as user",
|
||||||
|
"success": false,
|
||||||
|
"message": "No users available to delete",
|
||||||
|
"details": null
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"bugs_found": [],
|
||||||
|
"required_adaptations": []
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue