#!/usr/bin/env python3
"""
Test JSON Extraction from Incomplete/Broken JSON
Tests the extraction of lastItemObject and cutItemObject from incomplete JSON responses
"""

import asyncio
import json
import sys
import os
import shutil
from typing import Dict, Any, List

# Add the gateway to path
_gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
if _gateway_path not in sys.path:
    sys.path.insert(0, _gateway_path)

from modules.shared.jsonUtils import buildContinuationContext, extractSectionsFromDocument
from modules.shared.debugLogger import _getBaseDebugDir

# Unfinished table row appended to the truncated table fixtures. It ends in the
# middle of a string value, so the resulting text never ends with '}' or ']'.
_CUT_TABLE_ROW = ',\n ["16943", "16963", "16979", "16981", "16987", "16'


def _truncateAfter(text: str, marker: str, tail: str) -> str:
    """Cut ``text`` right after the last occurrence of ``marker`` and append ``tail``.

    Used to turn a complete JSON fixture into a truncated one: everything that
    follows the last complete item (including all closing brackets) is dropped
    and an unfinished fragment is appended in its place.

    Args:
        text: The complete JSON text.
        marker: Text that ends the last complete item to keep.
        tail: Unfinished fragment to append after the cut.

    Returns:
        The truncated text ending with ``tail``.

    Raises:
        ValueError: If ``marker`` does not occur in ``text``.
    """
    markerPos = text.rfind(marker)
    if markerPos == -1:
        raise ValueError(f"Marker {marker!r} not found in fixture JSON")
    return text[:markerPos + len(marker)] + tail


class JsonExtractionTester:
    """Runs the continuation-context extraction checks and records pass/fail per test."""

    def __init__(self):
        # Maps a test key ('table', 'code_block', 'list', 'real_world_table')
        # to its boolean result; filled in by the test* methods.
        self.testResults = {}

    def cleanupDebugFiles(self) -> None:
        """Delete debug folder and current log file before test run.

        Cleanup is best-effort: any error is reported as a warning and the
        test run continues.
        """
        try:
            # Get debug directory path
            debug_dir = _getBaseDebugDir()

            # Delete debug folder if it exists
            if os.path.exists(debug_dir):
                print(f"Cleaning up debug folder: {debug_dir}")
                shutil.rmtree(debug_dir)
                print(" [OK] Debug folder deleted")

            # Also check for log file in the log directory
            from modules.shared.debugLogger import _resolveLogDir
            log_dir = _resolveLogDir()
            log_file = os.path.join(log_dir, "debug_workflow.log")
            if os.path.exists(log_file):
                print(f"Cleaning up log file: {log_file}")
                os.remove(log_file)
                print(" [OK] Log file deleted")
        except Exception as e:
            # Deliberately broad: a failed cleanup must not abort the tests.
            print(f" [WARN] Error during cleanup: {e}")

    # ------------------------------------------------------------------
    # Fixture builders
    # ------------------------------------------------------------------

    def createIncompleteTableJson(self) -> tuple[str, str]:
        """Create incomplete JSON with table that ends mid-row.

        Returns:
            ``(complete_json, incomplete_json)``. The incomplete text is the
            complete text cut right after the last complete row of ``rows``,
            followed by an unfinished row.
        """
        complete_json = """{
  "metadata": {
    "split_strategy": "single_document",
    "source_documents": [],
    "extraction_method": "ai_generation"
  },
  "documents": [
    {
      "id": "doc_1",
      "title": "First 4000 Prime Numbers",
      "filename": "prime_numbers_4000.csv",
      "sections": [
        {
          "id": "section_primes_csv",
          "content_type": "table",
          "elements": [
            {
              "headers": [],
              "rows": [
                ["2", "3", "5", "7", "11", "13", "17", "19", "23", "29"],
                ["31", "37", "41", "43", "47", "53", "59", "61", "67", "71"],
                ["73", "79", "83", "89", "97", "101", "103", "107", "109", "113"],
                ["16871", "16879", "16883", "16889", "16901", "16903", "16921", "16927", "16931", "16937"]
              ],
              "caption": ""
            }
          ],
          "order": 0
        }
      ]
    }
  ]
}"""

        # Incomplete JSON - cuts off mid-row (CRITICAL: must not end with } or ])
        # Cut inside the "rows" array, directly after its last complete row.
        # Stripping trailing brackets with rstrip() cannot do this: it stops at
        # the whitespace between brackets and would leave the fragment outside
        # the array.
        incomplete_json = _truncateAfter(complete_json, '"16937"]', _CUT_TABLE_ROW)

        return complete_json, incomplete_json

    def createIncompleteCodeBlockJson(self) -> tuple[str, str]:
        """Create incomplete JSON with code_block that ends mid-line.

        Returns:
            ``(complete_json, incomplete_json)``. The incomplete text is cut
            inside the ``code`` string value (before its closing quote) and
            continues with the start of one more CSV line.
        """
        complete_json = """{
  "metadata": {
    "split_strategy": "single_document",
    "source_documents": [],
    "extraction_method": "ai_generation"
  },
  "documents": [
    {
      "id": "doc_1",
      "title": "Prime Numbers CSV",
      "filename": "prime_numbers.csv",
      "sections": [
        {
          "id": "section_primes_csv",
          "content_type": "code_block",
          "elements": [
            {
              "code": "2,3,5,7,11,13,17,19,23,29\\n31,37,41,43,47,53,59,61,67,71\\n73,79,83,89,97,101,103,107,109,113\\n127,131,137,139,149,151,157,163,167,173\\n23773,23789,23801,23813,23819,23827,23831,23833,23857,23869",
              "language": "csv"
            }
          ],
          "order": 0
        }
      ]
    }
  ]
}"""

        # Incomplete JSON - cuts off mid-line (CRITICAL: must not end with } or ])
        # Cut inside the "code" string, right after its last complete CSV line,
        # and append a JSON-escaped newline plus the beginning of the next line.
        incomplete_json = _truncateAfter(complete_json, '23857,23869', '\\n23873')

        return complete_json, incomplete_json

    def createIncompleteListJson(self) -> tuple[str, str]:
        """Create incomplete JSON with list that ends mid-item.

        Returns:
            ``(complete_json, incomplete_json)``. The incomplete text is cut
            inside the ``items`` array after its last complete item and
            continues with one more item, leaving the array unclosed.
        """
        complete_json = """{
  "metadata": {
    "split_strategy": "single_document",
    "source_documents": [],
    "extraction_method": "ai_generation"
  },
  "documents": [
    {
      "id": "doc_1",
      "title": "Prime Numbers List",
      "filename": "prime_numbers.txt",
      "sections": [
        {
          "id": "section_primes_list",
          "content_type": "bullet_list",
          "elements": [
            {
              "items": ["2", "3", "5", "7", "11", "13", "17", "19", "23", "29"]
            }
          ],
          "order": 0
        }
      ]
    }
  ]
}"""

        # Incomplete JSON - cuts off mid-item (CRITICAL: must not end with } or ])
        # Cut inside the "items" array, right after its last complete item.
        incomplete_json = _truncateAfter(complete_json, '"29"', ',\n "31"')

        return complete_json, incomplete_json

    def createRealWorldTableJson(self) -> tuple[str, str]:
        """Create real-world incomplete JSON based on actual prompt pattern - table with many rows.

        Returns:
            ``(complete_json, incomplete_json)``. The incomplete text is cut
            right after the last complete row of ``rows`` and continues with
            an unfinished row.
        """
        # Last complete row (exactly as in real scenario)
        last_complete_row = ["16871", "16879", "16883", "16889", "16901", "16903", "16921", "16927", "16931", "16937"]
        last_row_text = json.dumps(last_complete_row)

        complete_json = f"""{{
  "metadata": {{
    "split_strategy": "single_document",
    "source_documents": [],
    "extraction_method": "ai_generation"
  }},
  "documents": [
    {{
      "id": "doc_1",
      "title": "First 4000 Prime Numbers",
      "filename": "prime_numbers_4000.csv",
      "sections": [
        {{
          "id": "section_primes_csv",
          "content_type": "table",
          "elements": [
            {{
              "headers": [],
              "rows": [
                ["2", "3", "5", "7", "11", "13", "17", "19", "23", "29"],
                ["31", "37", "41", "43", "47", "53", "59", "61", "67", "71"],
                {last_row_text}
              ],
              "caption": ""
            }}
          ],
          "order": 0
        }}
      ]
    }}
  ]
}}"""

        # Incomplete JSON - cuts off mid-row (exactly like real scenario)
        # CRITICAL: Must not end with } or ] to be detected as incomplete
        # Cut directly after the last complete row. Searching for the last ']'
        # in the document would find the end of "documents", not of "rows".
        incomplete_json = _truncateAfter(complete_json, last_row_text, _CUT_TABLE_ROW)

        return complete_json, incomplete_json

    # ------------------------------------------------------------------
    # Shared reporting / validation helpers
    # ------------------------------------------------------------------

    def _printBanner(self, *titleLines: str) -> None:
        """Print the given title lines framed by two 80-character rules."""
        print("\n" + "="*80)
        for titleLine in titleLines:
            print(titleLine)
        print("="*80)

    def _printSectionSummary(self, allSections: List[Dict[str, Any]]) -> None:
        """Print the section count and the content type of the last section."""
        print(f"Complete JSON sections: {len(allSections)}")
        print(f"Last section content_type: {allSections[-1].get('content_type') if allSections else 'None'}")

    def _printBalancedDebugInfo(self, incomplete_json: str) -> None:
        """Print how much of the incomplete text extractFirstBalancedJson returns.

        The "cut part" shown is whatever follows the returned prefix length in
        the fence-stripped input.
        """
        # Debug: Check what extractFirstBalancedJson returns
        from modules.shared.jsonUtils import extractFirstBalancedJson, stripCodeFences
        raw_json = stripCodeFences(incomplete_json.strip())
        balanced_json = extractFirstBalancedJson(raw_json)
        balanced_length = len(balanced_json)
        cut_part = raw_json[balanced_length:].strip()

        print("\nDebug Info:")
        print(f" raw_json length: {len(raw_json)}")
        print(f" balanced_json length: {balanced_length}")
        print(f" cut_part length: {len(cut_part)}")
        print(f" cut_part content: {repr(cut_part[:200]) if cut_part else '(empty)'}")

    def _buildAndReportContext(self, allSections: List[Dict[str, Any]], incomplete_json: str) -> Dict[str, Any]:
        """Call buildContinuationContext, print the inspected fields, and return the result."""
        # Build continuation context
        continuationContext = buildContinuationContext(allSections, incomplete_json)

        print("\nExtraction Results:")
        print(f" content_type_for_items: {continuationContext.get('content_type_for_items')}")
        print(f" last_item_object: {continuationContext.get('last_item_object')}")
        print(f" cut_item_object: {continuationContext.get('cut_item_object')}")
        print(f" total_items_count: {continuationContext.get('total_items_count')}")
        return continuationContext

    def _validateContext(self, continuationContext: Dict[str, Any], expectedTypes: tuple) -> bool:
        """Check content type and presence of last/cut item; print one line per failure.

        Args:
            continuationContext: Result of buildContinuationContext.
            expectedTypes: Accepted values for 'content_type_for_items'.

        Returns:
            True when the content type is accepted and both 'last_item_object'
            and 'cut_item_object' are truthy.
        """
        contentType = continuationContext.get('content_type_for_items')

        success = True
        if contentType not in expectedTypes:
            expectedLabel = " or ".join(f"'{name}'" for name in expectedTypes)
            print(f" [FAIL] Expected content_type {expectedLabel}, got '{contentType}'")
            success = False

        if not continuationContext.get('last_item_object'):
            print(" [FAIL] last_item_object is empty")
            success = False

        if not continuationContext.get('cut_item_object'):
            print(" [FAIL] cut_item_object is empty")
            success = False

        if success:
            print(" [PASS] All extractions successful")
        return success

    def _runStandardCheck(self, fixtures: tuple[str, str], expectedTypes: tuple) -> bool:
        """Run the common summary -> debug -> context -> validation sequence for one fixture pair."""
        complete_json, incomplete_json = fixtures

        # Parse complete JSON to get allSections
        allSections = extractSectionsFromDocument(json.loads(complete_json))
        self._printSectionSummary(allSections)

        self._printBalancedDebugInfo(incomplete_json)
        continuationContext = self._buildAndReportContext(allSections, incomplete_json)
        return self._validateContext(continuationContext, expectedTypes)

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def testTableExtraction(self) -> bool:
        """Test extraction from incomplete table JSON."""
        self._printBanner("TEST 1: Table Extraction (incomplete row)")
        success = self._runStandardCheck(self.createIncompleteTableJson(), ("table",))
        self.testResults['table'] = success
        return success

    def testCodeBlockExtraction(self) -> bool:
        """Test extraction from incomplete code_block JSON."""
        self._printBanner("TEST 2: Code Block Extraction (incomplete line)")
        success = self._runStandardCheck(self.createIncompleteCodeBlockJson(), ("code_block",))
        self.testResults['code_block'] = success
        return success

    def testListExtraction(self) -> bool:
        """Test extraction from incomplete list JSON."""
        self._printBanner("TEST 3: List Extraction (incomplete item)")
        success = self._runStandardCheck(
            self.createIncompleteListJson(), ("bullet_list", "numbered_list")
        )
        self.testResults['list'] = success
        return success

    def testRealWorldTableExtraction(self) -> bool:
        """Test extraction from real-world incomplete table JSON (like from actual prompt).

        In addition to the standard checks this also prints what
        _extractSectionsRegex and repairBrokenJson make of the incomplete
        text; those two results are reported only and do not affect the
        pass/fail outcome.
        """
        self._printBanner("TEST 4: Real-World Table Extraction (400 rows scenario, incomplete row)")

        complete_json, incomplete_json = self.createRealWorldTableJson()

        # Parse complete JSON to get allSections
        allSections = extractSectionsFromDocument(json.loads(complete_json))

        print(f"Complete JSON sections: {len(allSections)}")
        if allSections:
            lastSection = allSections[-1]
            print(f"Last section content_type: {lastSection.get('content_type')}")
            elements = lastSection.get('elements', [])
            if elements and isinstance(elements[0], dict) and 'rows' in elements[0]:
                rows = elements[0].get('rows', [])
                print(f"Total rows in complete JSON: {len(rows)}")
                if rows:
                    print(f"Last complete row: {rows[-1]}")

        # Test _extractSectionsRegex with incomplete JSON
        from modules.shared.jsonUtils import _extractSectionsRegex, repairBrokenJson
        print("\nTesting _extractSectionsRegex with incomplete JSON...")
        extracted_sections = _extractSectionsRegex(incomplete_json)
        print(f"Extracted sections: {len(extracted_sections)}")
        if extracted_sections:
            print(f"Extracted section content_type: {extracted_sections[0].get('content_type')}")

        # Test repairBrokenJson
        print("\nTesting repairBrokenJson...")
        repaired_json = repairBrokenJson(incomplete_json)
        if repaired_json:
            print("Repaired JSON successful")
            repaired_sections = extractSectionsFromDocument(repaired_json)
            print(f"Repaired sections: {len(repaired_sections)}")
        else:
            print("Repair failed")

        self._printBalancedDebugInfo(incomplete_json)
        continuationContext = self._buildAndReportContext(allSections, incomplete_json)

        success = self._validateContext(continuationContext, ("table",))
        if success:
            print(f" Last complete row: {continuationContext.get('last_item_object')}")
            print(f" Cut row: {continuationContext.get('cut_item_object')}")

        self.testResults['real_world_table'] = success
        return success

    def runAllTests(self) -> bool:
        """Run all extraction tests.

        Returns:
            True only if every test passed.
        """
        self._printBanner(
            "JSON EXTRACTION TESTS",
            "Testing extraction of lastItemObject and cutItemObject from incomplete JSON",
        )

        # Clean up debug folder and log file before starting tests
        print("\nCleaning up debug files...")
        self.cleanupDebugFiles()
        print("")

        results = [
            self.testTableExtraction(),
            self.testCodeBlockExtraction(),
            self.testListExtraction(),
            self.testRealWorldTableExtraction(),
        ]

        # Summary
        self._printBanner("TEST SUMMARY")
        summaryRows = (
            ("Table extraction", 'table'),
            ("Code block extraction", 'code_block'),
            ("List extraction", 'list'),
            ("Real-world table extraction", 'real_world_table'),
        )
        for label, resultKey in summaryRows:
            print(f"{label}: {'[PASS]' if self.testResults.get(resultKey) else '[FAIL]'}")

        allPassed = all(results)
        print(f"\nOverall: {'[PASS] ALL TESTS PASSED' if allPassed else '[FAIL] SOME TESTS FAILED'}")

        return allPassed


async def main():
    """Main test execution.

    Returns:
        Process exit code: 0 when all tests passed, 1 otherwise.
    """
    tester = JsonExtractionTester()
    success = tester.runAllTests()
    return 0 if success else 1


if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)