gateway/test_concurrency_fixes.py
2025-09-05 23:35:01 +02:00

237 lines
8.4 KiB
Python

#!/usr/bin/env python3
"""
Test script to verify concurrency improvements in DatabaseConnector.
This script simulates multiple users accessing the database simultaneously.
"""
import os
import sys
import time
import threading
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import tempfile
import shutil
# Add the gateway directory to the path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from modules.connectors.connectorDbJson import DatabaseConnector
from modules.connectors.connectorPool import get_connector, return_connector
# Set up process-wide logging (INFO and above, timestamped), then take the
# logger named after this module for all messages emitted by the tests.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
def test_concurrent_record_operations():
    """Test concurrent record creation and modification.

    Starts 20 worker threads ("users"). Each one obtains its own connector
    via ``get_connector``, creates 5 records in ``test_table`` and immediately
    modifies each of them. Afterwards a separate "verifier" connector reads
    the table back and the result is checked for:

    * missing ids (an expected record is absent),
    * extra ids (a record nobody created),
    * duplicate ids (more rows than distinct ids),
    * corrupted data (a record whose ``data`` field is not exactly the text
      its owner wrote during the modify step).

    Worker exceptions are logged; they surface in the verdict through the
    missing / unmodified records they leave behind.

    Returns:
        bool: True when none of the problems above were found.
    """
    # Create temporary database directory
    temp_dir = tempfile.mkdtemp()
    db_host = temp_dir
    db_database = "test_db"
    try:
        logger.info("Starting concurrency test...")

        def user_operation(user_id: int, operation_count: int = 10):
            """Simulate a user performing database operations.

            Returns a list of human-readable result strings, or a single
            ``"Error: ..."`` entry when any operation raised.
            """
            db = None
            try:
                # Get a dedicated connector for this user
                db = get_connector(
                    dbHost=db_host,
                    dbDatabase=db_database,
                    userId=f"user_{user_id}",
                )
                results = []
                for i in range(operation_count):
                    record = {
                        "id": f"user_{user_id}_record_{i}",
                        "data": f"User {user_id} data {i}",
                        "timestamp": time.time(),
                    }
                    # Create record
                    created = db.recordCreate("test_table", record)
                    results.append(f"Created: {created['id']}")
                    # Modify record
                    record["data"] = f"Modified by user {user_id} - {i}"
                    modified = db.recordModify("test_table", record["id"], record)
                    results.append(f"Modified: {modified['id']}")
                    # Small delay to increase chance of race conditions
                    time.sleep(0.001)
                return results
            except Exception as e:
                logger.error(f"User {user_id} error: {e}")
                return [f"Error: {e}"]
            finally:
                # Always hand the connector back, even when an operation
                # failed, so a failing worker does not leak it.
                if db is not None:
                    return_connector(db)

        # Test with multiple concurrent users
        num_users = 20
        operations_per_user = 5
        logger.info(f"Testing with {num_users} users, {operations_per_user} operations each")

        # perf_counter is monotonic, so the elapsed time cannot be skewed by
        # wall-clock adjustments while the test runs.
        start_time = time.perf_counter()
        all_results = []
        failed_futures = 0
        with ThreadPoolExecutor(max_workers=num_users) as executor:
            # Submit all user operations
            futures = [
                executor.submit(user_operation, user_id, operations_per_user)
                for user_id in range(num_users)
            ]
            # Collect results
            for future in as_completed(futures):
                try:
                    all_results.extend(future.result())
                except Exception as e:
                    failed_futures += 1
                    logger.error(f"Future error: {e}")
        elapsed = time.perf_counter() - start_time

        error_entries = sum(1 for entry in all_results if entry.startswith("Error: "))
        if error_entries or failed_futures:
            logger.error(f"Workers that reported errors: {error_entries + failed_futures}")

        # Exact text every expected record must hold after its modify step.
        # Comparing against the full string (not a substring) keeps
        # "user 1" from matching data written by "user 10".."user 19".
        expected_data = {
            f"user_{user_id}_record_{i}": f"Modified by user {user_id} - {i}"
            for user_id in range(num_users)
            for i in range(operations_per_user)
        }
        expected_ids = set(expected_data)
        expected_count = num_users * operations_per_user

        # Verify data integrity
        db = get_connector(dbHost=db_host, dbDatabase=db_database, userId="verifier")
        try:
            # Check that all records exist and are consistent
            all_records = db.getRecordset("test_table")
            logger.info(f"Expected records: {expected_count}")
            logger.info(f"Actual records: {len(all_records)}")
            logger.info(f"Test completed in {elapsed:.2f} seconds")

            # Check for data consistency
            record_ids = {record["id"] for record in all_records}
            missing_ids = expected_ids - record_ids
            extra_ids = record_ids - expected_ids
            # More rows than distinct ids means some id was stored twice.
            duplicate_count = len(all_records) - len(record_ids)

            # Check for data corruption (records with wrong user data).
            # Unexpected ids are already reported as extras, so only the
            # expected ones are inspected here; this also avoids parsing
            # ids that do not follow the "user_<n>_record_<i>" pattern.
            corrupted_records = []
            for record in all_records:
                record_id = record["id"]
                if record_id not in expected_data:
                    continue
                if record.get("data") != expected_data[record_id]:
                    corrupted_records.append(record_id)
        finally:
            return_connector(db)

        if missing_ids:
            logger.error(f"Missing records: {missing_ids}")
        if extra_ids:
            logger.error(f"Extra records: {extra_ids}")
        if duplicate_count:
            logger.error(f"Duplicate records: {duplicate_count}")
        if corrupted_records:
            logger.error(f"Corrupted records: {corrupted_records}")

        success = not (missing_ids or extra_ids or duplicate_count or corrupted_records)
        if success:
            logger.info("✅ Concurrency test PASSED - No data corruption detected")
        else:
            logger.error("❌ Concurrency test FAILED - Data corruption detected")
        return success
    finally:
        # Cleanup
        try:
            shutil.rmtree(temp_dir)
            logger.info("Cleaned up temporary directory")
        except Exception as e:
            logger.error(f"Error cleaning up: {e}")
def test_metadata_consistency():
    """Test that table metadata stays in step with the stored records.

    Ten worker threads each create ten records in ``metadata_test`` through
    their own connector. Afterwards a "verifier" connector reads the table
    and its metadata, and the test passes only when:

    * every worker finished without raising,
    * the number of stored records equals the number that was created, and
    * the metadata's ``recordIds`` list has one entry per stored record.

    Returns:
        bool: True when all three conditions hold.
    """
    temp_dir = tempfile.mkdtemp()
    db_host = temp_dir
    db_database = "test_metadata"
    num_users = 10
    records_per_user = 10
    try:
        logger.info("Testing metadata consistency...")

        def concurrent_metadata_operations(user_id: int):
            """Create this user's records; return True on success, False on error."""
            db = None
            try:
                # Acquired inside the try so a pool failure is reported as a
                # failed worker instead of escaping through future.result().
                db = get_connector(
                    dbHost=db_host,
                    dbDatabase=db_database,
                    userId=f"user_{user_id}",
                )
                # Create multiple records rapidly
                for i in range(records_per_user):
                    record = {
                        "id": f"user_{user_id}_meta_{i}",
                        "data": f"Metadata test {user_id}-{i}",
                    }
                    db.recordCreate("metadata_test", record)
                    time.sleep(0.001)  # Small delay
                return True
            except Exception as e:
                logger.error(f"Metadata test error for user {user_id}: {e}")
                return False
            finally:
                if db is not None:
                    return_connector(db)

        # Run concurrent metadata operations
        results = []
        with ThreadPoolExecutor(max_workers=num_users) as executor:
            futures = [
                executor.submit(concurrent_metadata_operations, i)
                for i in range(num_users)
            ]
            for future in as_completed(futures):
                try:
                    results.append(future.result())
                except Exception as e:
                    # e.g. returning the connector failed; count it as a
                    # failed worker rather than aborting the whole test.
                    logger.error(f"Metadata worker error: {e}")
                    results.append(False)

        workers_ok = all(results)
        if not workers_ok:
            logger.error(f"Failed metadata workers: {results.count(False)}")

        # Verify metadata consistency
        db = get_connector(dbHost=db_host, dbDatabase=db_database, userId="verifier")
        try:
            records = db.getRecordset("metadata_test")
            # Check that metadata is consistent
            metadata = db._loadTableMetadata("metadata_test")
            expected_count = len(records)
            actual_count = len(metadata["recordIds"])
        finally:
            return_connector(db)

        created_total = num_users * records_per_user
        logger.info(f"Expected record count: {expected_count}")
        logger.info(f"Metadata record count: {actual_count}")
        if expected_count != created_total:
            logger.error(f"Stored records: {expected_count}, created: {created_total}")

        # Matching counts alone are not enough: zero records with empty
        # metadata would also "match", so worker success and the total
        # number of created records are part of the verdict.
        success = (
            workers_ok
            and expected_count == created_total
            and expected_count == actual_count
        )
        if success:
            logger.info("✅ Metadata consistency test PASSED")
        else:
            logger.error("❌ Metadata consistency test FAILED")
        return success
    finally:
        try:
            shutil.rmtree(temp_dir)
        except Exception as e:
            logger.error(f"Error cleaning up: {e}")
if __name__ == "__main__":
    logger.info("Starting concurrency tests...")
    # Both tests are executed up front (no short-circuiting), in order:
    # first the concurrent record operations, then the metadata consistency.
    outcomes = [
        test_concurrent_record_operations(),
        test_metadata_consistency(),
    ]
    # Exit status mirrors the overall verdict: 0 when every test passed,
    # 1 when at least one failed.
    if not all(outcomes):
        logger.error("💥 Some concurrency tests FAILED!")
        sys.exit(1)
    logger.info("🎉 All concurrency tests PASSED!")
    sys.exit(0)