gateway/tests/unit/services/test_json_extraction_merging.py
2026-01-22 17:00:29 +01:00

386 lines
14 KiB
Python

#!/usr/bin/env python3
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Test script for JSON extraction response detection and merging.
Run: python gateway/tests/unit/services/test_json_extraction_merging.py
"""
import json
import sys
import os
# Add gateway to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))
from modules.datamodels.datamodelExtraction import ContentPart
from modules.features.aichat.serviceExtraction.mainServiceExtraction import ExtractionService
def test_detects_json_with_code_fences():
    """Check that a markdown-fenced extraction payload is detected.

    The part's data is a json code fence wrapping an object that has an
    "extracted_content" key; the detector is expected to report a match.
    """
    print("Test 1: Detecting JSON with code fences...")
    extractor = ExtractionService(None)
    fenced_payload = '```json\n{"extracted_content": {"text": "Sample text", "tables": []}}\n```'
    candidate = ContentPart(
        id="test1",
        label="test1",
        typeGroup="text",
        mimeType="text/plain",
        data=fenced_payload,
    )
    # The detector is called with a one-element list of parts.
    detected = extractor._isJsonExtractionResponse([candidate])
    # Equality (not identity) comparison is kept so the check stays unchanged.
    assert detected == True, "Should detect JSON with code fences"
    print(" [PASS]")
def test_detects_json_without_code_fences():
    """Check that a bare (unfenced) extraction payload is detected.

    The part's data is a plain JSON object with an "extracted_content" key
    and no surrounding markdown fence.
    """
    print("Test 2: Detecting JSON without code fences...")
    extractor = ExtractionService(None)
    bare_payload = '{"extracted_content": {"text": "Sample text", "tables": []}}'
    candidate = ContentPart(
        id="test2",
        label="test2",
        typeGroup="text",
        mimeType="text/plain",
        data=bare_payload,
    )
    detected = extractor._isJsonExtractionResponse([candidate])
    # Equality (not identity) comparison is kept so the check stays unchanged.
    assert detected == True, "Should detect JSON without code fences"
    print(" [PASS]")
def test_rejects_non_extraction_json():
    """Check that valid JSON lacking "extracted_content" is not detected.

    The payload is well-formed JSON with a "documents" key only, so the
    detector is expected to report no match.
    """
    print("Test 3: Rejecting non-extraction JSON...")
    extractor = ExtractionService(None)
    unrelated_json = '{"documents": [{"sections": []}]}'
    candidate = ContentPart(
        id="test3",
        label="test3",
        typeGroup="text",
        mimeType="text/plain",
        data=unrelated_json,
    )
    detected = extractor._isJsonExtractionResponse([candidate])
    # Equality (not identity) comparison is kept so the check stays unchanged.
    assert detected == False, "Should reject non-extraction JSON"
    print(" [PASS]")
def test_rejects_non_json_content():
    """Check that plain prose (not JSON at all) is not detected."""
    print("Test 4: Rejecting non-JSON content...")
    extractor = ExtractionService(None)
    prose = "This is plain text, not JSON"
    candidate = ContentPart(
        id="test4",
        label="test4",
        typeGroup="text",
        mimeType="text/plain",
        data=prose,
    )
    detected = extractor._isJsonExtractionResponse([candidate])
    # Equality (not identity) comparison is kept so the check stays unchanged.
    assert detected == False, "Should reject non-JSON content"
    print(" [PASS]")
def test_merges_tables_with_same_headers():
    """Check that tables sharing the same headers collapse into one table.

    Two parts each carry a Name/Amount table; the "Alice" row occurs in both.
    The merged result is expected to hold a single table with three rows,
    i.e. the repeated row counted once.
    """
    print("Test 5: Merging tables with same headers...")
    extractor = ExtractionService(None)

    def table_part(name, payload):
        """Build a table-typed JSON part whose id and label are both *name*."""
        return ContentPart(
            id=name,
            label=name,
            typeGroup="table",
            mimeType="application/json",
            data=payload,
        )

    fixtures = (
        ("test1", '```json\n{"extracted_content": {"tables": [{"headers": ["Name", "Amount"], "rows": [["Alice", "100"], ["Bob", "200"]]}]}}\n```'),
        ("test2", '```json\n{"extracted_content": {"tables": [{"headers": ["Name", "Amount"], "rows": [["Charlie", "300"], ["Alice", "100"]]}]}}\n```'),
    )
    parts = [table_part(name, payload) for name, payload in fixtures]
    merged = extractor._mergeJsonExtractionResponses(parts)

    # Exactly one table group is expected after merging.
    tables = merged["extracted_content"]["tables"]
    assert len(tables) == 1, f"Should have one merged table, got {len(merged['extracted_content']['tables'])}"

    table = tables[0]
    assert table["headers"] == ["Name", "Amount"], f"Headers should match, got {table['headers']}"

    # Alice is supplied twice, so only three distinct rows should remain.
    rows = table["rows"]
    assert len(rows) == 3, f"Should have 3 unique rows, got {len(table['rows'])}"
    required_rows = (
        (["Alice", "100"], "Alice row should be present"),
        (["Bob", "200"], "Bob row should be present"),
        (["Charlie", "300"], "Charlie row should be present"),
    )
    for row, message in required_rows:
        assert row in rows, message
    print(" [PASS]")
def test_merges_multiple_json_blocks_separated_by_dash():
    """Check that several fenced JSON blocks inside ONE part are merged.

    A single part holds two fenced blocks divided by a "---" line; both
    blocks carry a Name/Amount table, and one merged table with both rows
    is expected.
    """
    print("Test 6: Merging multiple JSON blocks separated by ---...")
    extractor = ExtractionService(None)

    # Adjacent literals are concatenated at compile time into one payload:
    # first block, the "---" separator line, then the second block.
    combined_payload = (
        '```json\n{"extracted_content": {"tables": [{"headers": ["Name", "Amount"], "rows": [["Alice", "100"]]}]}}\n```'
        '\n---\n'
        '```json\n{"extracted_content": {"tables": [{"headers": ["Name", "Amount"], "rows": [["Bob", "200"]]}]}}\n```'
    )
    multi_block_part = ContentPart(
        id="test1",
        label="test1",
        typeGroup="table",
        mimeType="application/json",
        data=combined_payload,
    )
    merged = extractor._mergeJsonExtractionResponses([multi_block_part])

    # Rows from both blocks should land in a single table.
    tables = merged["extracted_content"]["tables"]
    assert len(tables) == 1, f"Should have one merged table, got {len(merged['extracted_content']['tables'])}"

    table = tables[0]
    assert table["headers"] == ["Name", "Amount"], f"Headers should match, got {table['headers']}"

    rows = table["rows"]
    assert len(rows) == 2, f"Should have 2 rows, got {len(table['rows'])}"
    required_rows = (
        (["Alice", "100"], "Alice row should be present"),
        (["Bob", "200"], "Bob row should be present"),
    )
    for row, message in required_rows:
        assert row in rows, message
    print(" [PASS]")
def test_merges_text_content():
    """Check that the "text" fields of several parts all survive merging.

    Only the presence of each paragraph in the merged text is asserted;
    the exact separator between them is not checked here.
    """
    print("Test 7: Merging text content...")
    extractor = ExtractionService(None)

    def text_part(name, payload):
        """Build a plain-text part whose id and label are both *name*."""
        return ContentPart(
            id=name,
            label=name,
            typeGroup="text",
            mimeType="text/plain",
            data=payload,
        )

    fixtures = (
        ("test1", '```json\n{"extracted_content": {"text": "First paragraph."}}\n```'),
        ("test2", '```json\n{"extracted_content": {"text": "Second paragraph."}}\n```'),
    )
    parts = [text_part(name, payload) for name, payload in fixtures]
    merged = extractor._mergeJsonExtractionResponses(parts)

    # Both source paragraphs must appear somewhere in the combined text.
    text = merged["extracted_content"]["text"]
    required_fragments = (
        ("First paragraph.", "First paragraph should be present"),
        ("Second paragraph.", "Second paragraph should be present"),
    )
    for fragment, message in required_fragments:
        assert fragment in text, message
    print(" [PASS]")
def test_merges_headings_and_lists():
    """Check that headings and lists from two parts are combined in order.

    Each part contributes one heading and one bullet list; the merged result
    is expected to contain two of each, in the order the parts were supplied.
    """
    print("Test 8: Merging headings and lists...")
    extractor = ExtractionService(None)

    def text_part(name, payload):
        """Build a plain-text part whose id and label are both *name*."""
        return ContentPart(
            id=name,
            label=name,
            typeGroup="text",
            mimeType="text/plain",
            data=payload,
        )

    fixtures = (
        ("test1", '```json\n{"extracted_content": {"headings": [{"level": 1, "text": "Title 1"}], "lists": [{"type": "bullet", "items": ["Item 1"]}]}}\n```'),
        ("test2", '```json\n{"extracted_content": {"headings": [{"level": 2, "text": "Subtitle 1"}], "lists": [{"type": "bullet", "items": ["Item 2"]}]}}\n```'),
    )
    parts = [text_part(name, payload) for name, payload in fixtures]
    merged = extractor._mergeJsonExtractionResponses(parts)

    # Headings: one per part, order preserved.
    headings = merged["extracted_content"]["headings"]
    assert len(headings) == 2, f"Should have 2 headings, got {len(merged['extracted_content']['headings'])}"
    assert headings[0]["text"] == "Title 1", "First heading should be Title 1"
    assert headings[1]["text"] == "Subtitle 1", "Second heading should be Subtitle 1"

    # Lists: one per part, order preserved.
    lists = merged["extracted_content"]["lists"]
    assert len(lists) == 2, f"Should have 2 lists, got {len(merged['extracted_content']['lists'])}"
    assert lists[0]["items"] == ["Item 1"], "First list should have Item 1"
    assert lists[1]["items"] == ["Item 2"], "Second list should have Item 2"
    print(" [PASS]")
def test_handles_empty_content_parts():
    """Check that a part with empty data does not disturb the merge.

    One part carries text, the other has an empty string as data; the merged
    text is expected to equal the text of the non-empty part.
    """
    print("Test 9: Handling empty content parts...")
    extractor = ExtractionService(None)

    def text_part(name, payload):
        """Build a plain-text part whose id and label are both *name*."""
        return ContentPart(
            id=name,
            label=name,
            typeGroup="text",
            mimeType="text/plain",
            data=payload,
        )

    populated = text_part("test1", '```json\n{"extracted_content": {"text": "Some text"}}\n```')
    blank = text_part("test2", "")  # deliberately empty data
    merged = extractor._mergeJsonExtractionResponses([populated, blank])

    # The populated part's text must come through untouched.
    merged_text = merged["extracted_content"]["text"]
    assert merged_text == "Some text", "Should have text from part1"
    print(" [PASS]")
def test_merges_tables_with_different_headers():
    """Check that tables with differing headers are NOT folded together.

    One part has a Date/Amount table and the other a Name/Email table; the
    merged result is expected to keep them as two tables, in input order.
    """
    print("Test 10: Keeping tables with different headers separate...")
    extractor = ExtractionService(None)

    def table_part(name, payload):
        """Build a table-typed JSON part whose id and label are both *name*."""
        return ContentPart(
            id=name,
            label=name,
            typeGroup="table",
            mimeType="application/json",
            data=payload,
        )

    fixtures = (
        ("test1", '```json\n{"extracted_content": {"tables": [{"headers": ["Date", "Amount"], "rows": [["2024-01-01", "100"]]}]}}\n```'),
        ("test2", '```json\n{"extracted_content": {"tables": [{"headers": ["Name", "Email"], "rows": [["Alice", "alice@example.com"]]}]}}\n```'),
    )
    parts = [table_part(name, payload) for name, payload in fixtures]
    merged = extractor._mergeJsonExtractionResponses(parts)

    # Different header sets => two distinct tables.
    tables = merged["extracted_content"]["tables"]
    assert len(tables) == 2, f"Should have 2 separate tables, got {len(merged['extracted_content']['tables'])}"

    # First table comes from the first part.
    table1 = tables[0]
    assert table1["headers"] == ["Date", "Amount"], "First table should have Date/Amount headers"
    assert len(table1["rows"]) == 1, "First table should have 1 row"

    # Second table comes from the second part.
    table2 = tables[1]
    assert table2["headers"] == ["Name", "Email"], "Second table should have Name/Email headers"
    assert len(table2["rows"]) == 1, "Second table should have 1 row"
    print(" [PASS]")
def test_real_world_scenario():
    """Exercise merging across three documents, one holding two JSON blocks.

    All tables share the Transaction ID/Date/Amount headers. TXN001 is
    supplied by two different documents, and the third document contains two
    fenced blocks split by a "---" line. A single merged table with five
    distinct transactions is expected.
    """
    print("Test 11: Real-world scenario (multiple documents, multiple JSON blocks)...")
    extractor = ExtractionService(None)

    def table_part(name, payload):
        """Build a table-typed JSON part whose id and label are both *name*."""
        return ContentPart(
            id=name,
            label=name,
            typeGroup="table",
            mimeType="application/json",
            data=payload,
        )

    fixtures = (
        ("doc1", '```json\n{"extracted_content": {"tables": [{"headers": ["Transaction ID", "Date", "Amount"], "rows": [["TXN001", "2024-01-01", "100.00"], ["TXN002", "2024-01-02", "200.00"]]}]}}\n```'),
        ("doc2", '```json\n{"extracted_content": {"tables": [{"headers": ["Transaction ID", "Date", "Amount"], "rows": [["TXN003", "2024-01-03", "300.00"], ["TXN001", "2024-01-01", "100.00"]]}]}}\n```'),
        # Third document: two fenced blocks joined by a "---" separator line
        # (adjacent literals concatenate into one string).
        (
            "doc3",
            '```json\n{"extracted_content": {"tables": [{"headers": ["Transaction ID", "Date", "Amount"], "rows": [["TXN004", "2024-01-04", "400.00"]]}]}}\n```'
            '\n---\n'
            '```json\n{"extracted_content": {"tables": [{"headers": ["Transaction ID", "Date", "Amount"], "rows": [["TXN005", "2024-01-05", "500.00"]]}]}}\n```',
        ),
    )
    parts = [table_part(name, payload) for name, payload in fixtures]
    merged = extractor._mergeJsonExtractionResponses(parts)

    # Identical headers everywhere => one merged table.
    tables = merged["extracted_content"]["tables"]
    assert len(tables) == 1, f"Should have one merged table, got {len(merged['extracted_content']['tables'])}"

    table = tables[0]
    assert table["headers"] == ["Transaction ID", "Date", "Amount"], "Headers should match"

    # Six rows were supplied in total, one of them (TXN001) twice.
    assert len(table["rows"]) == 5, f"Should have 5 unique rows, got {len(table['rows'])}"

    # The first column of every row is the transaction id.
    transaction_ids = []
    for row in table["rows"]:
        transaction_ids.append(row[0])

    required_ids = (
        ("TXN001", "TXN001 should be present"),
        ("TXN002", "TXN002 should be present"),
        ("TXN003", "TXN003 should be present"),
        ("TXN004", "TXN004 should be present"),
        ("TXN005", "TXN005 should be present"),
    )
    for txn_id, message in required_ids:
        assert txn_id in transaction_ids, message

    # The duplicated transaction must have been collapsed to one row.
    assert transaction_ids.count("TXN001") == 1, "TXN001 should appear only once (deduplicated)"
    print(" [PASS]")
def main():
    """Run every test in sequence and print a pass/fail summary.

    Assertion failures are reported as [FAIL]; any other exception is
    reported as [ERROR] together with its traceback. Neither stops the run.

    Returns:
        0 if all tests passed, otherwise 1 (intended as a process exit code).
    """
    rule = "=" * 60
    print(rule)
    print("Testing JSON Extraction Response Detection and Merging")
    print(rule)
    print()

    suite = (
        test_detects_json_with_code_fences,
        test_detects_json_without_code_fences,
        test_rejects_non_extraction_json,
        test_rejects_non_json_content,
        test_merges_tables_with_same_headers,
        test_merges_multiple_json_blocks_separated_by_dash,
        test_merges_text_content,
        test_merges_headings_and_lists,
        test_handles_empty_content_parts,
        test_merges_tables_with_different_headers,
        test_real_world_scenario,
    )

    passed = 0
    for case in suite:
        try:
            case()
        except AssertionError as failure:
            # An expectation inside the test did not hold.
            print(f" [FAIL] {failure}")
        except Exception as error:
            # Anything else is an unexpected crash; show where it came from.
            print(f" [ERROR] {error}")
            import traceback
            traceback.print_exc()
        else:
            passed += 1
    # Every test either passed or hit one of the two handlers above.
    failed = len(suite) - passed

    print()
    print(rule)
    print(f"Results: {passed} passed, {failed} failed")
    print(rule)

    if failed == 0:
        return 0
    return 1
# Script entry point: propagate main()'s result as the process exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)