373 lines
15 KiB
Python
373 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
JSON Continuation Context Test 14 - Tests getContexts() with a specific cut JSON from debug prompts.
Reads a real AI response that was cut and analyzes the continuation contexts.
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import sys
|
|
import os
|
|
from typing import Dict, Any, Optional
|
|
|
|
# Add the gateway to path (go up 2 levels from tests/functional/)
|
|
_gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
|
|
if _gateway_path not in sys.path:
|
|
sys.path.insert(0, _gateway_path)
|
|
|
|
# Import jsonContinuation
|
|
from modules.shared.jsonContinuation import getContexts
|
|
|
|
|
|
class JsonContinuationContextTester14:
    """Diagnostic harness that runs getContexts() on cut AI JSON responses.

    Response files are read from local/debug/prompts/ (three directory levels
    above this file). Each one is stripped of markdown code fences, passed to
    getContexts(), and every returned context is logged. Messages are printed
    immediately and also buffered so the whole run can be written to a
    results file afterwards.
    """

    # Horizontal rule used around every section title in the log.
    _RULE = "=" * 80

    def __init__(self):
        self.testResults = {}  # overall summary, filled in by runTest()
        self.logBuffer = []    # every message passed to _log(), in order
        self.logFile = None    # path of the results file once _writeLogFile() ran

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _debugDir() -> str:
        """Return the path of local/debug, three levels above this file."""
        return os.path.join(os.path.dirname(__file__), "..", "..", "..", "local", "debug")

    @staticmethod
    def _preview(text: str, limit: int = 500) -> str:
        """Return text unchanged, or its first `limit` characters plus '...' if longer."""
        return text[:limit] + "..." if len(text) > limit else text

    def _log(self, message: str):
        """Append message to the log buffer and echo it to stdout."""
        self.logBuffer.append(message)
        print(message)

    def _logBanner(self, title: str, leadingBlank: bool = True):
        """Log a section title framed by two rule lines, optionally preceded by a blank line."""
        if leadingBlank:
            self._log("")
        self._log(self._RULE)
        self._log(title)
        self._log(self._RULE)

    def _logLines(self, lines: list, edgeCount: int, withHeaders: bool = False,
                  blankBeforeHeaders: bool = False):
        """Log lines, keeping only the first and last edgeCount when there are more than 2 * edgeCount.

        Args:
            lines: The text lines to log.
            edgeCount: Number of lines kept at the start and at the end when truncating.
            withHeaders: When truncating, label the two parts "First N lines:" / "Last N lines:".
            blankBeforeHeaders: When truncating, log an empty line before the first part.
        """
        if len(lines) <= 2 * edgeCount:
            for line in lines:
                self._log(f" {line}")
            return

        if blankBeforeHeaders:
            self._log("")
        if withHeaders:
            self._log(f"First {edgeCount} lines:")
        for line in lines[:edgeCount]:
            self._log(f" {line}")
        self._log(f" ... ({len(lines) - 2 * edgeCount} lines omitted) ...")
        if withHeaders:
            self._log(f"Last {edgeCount} lines:")
        for line in lines[-edgeCount:]:
            self._log(f" {line}")

    def _logFirstDifference(self, inputJson: str, hierarchy: str):
        """Log where hierarchy first deviates from inputJson.

        If no character differs within the common length, one text is a prefix
        of the other (or empty); that case is reported explicitly instead of
        being passed over silently.
        """
        for i, (inputChar, hierarchyChar) in enumerate(zip(inputJson, hierarchy)):
            if inputChar != hierarchyChar:
                self._log(f" First difference at position {i}")
                self._log(f" Input: ...{repr(inputJson[max(0, i - 20):i + 20])}...")
                self._log(f" Hierarchy: ...{repr(hierarchy[max(0, i - 20):i + 20])}...")
                return

        commonLength = min(len(inputJson), len(hierarchy))
        self._log(
            f" No differing character in the first {commonLength} characters"
            " - one text is a prefix of the other"
        )

    # ------------------------------------------------------------------
    # Input handling
    # ------------------------------------------------------------------

    def _readDebugFile(self, fileName: str) -> Optional[str]:
        """Read a debug prompt file from local/debug/prompts/.

        Returns:
            The file content decoded as UTF-8, or None if the file could not
            be opened or decoded (the error is logged).
        """
        filePath = os.path.join(self._debugDir(), "prompts", fileName)
        try:
            with open(filePath, 'r', encoding='utf-8') as f:
                return f.read()
        except (OSError, ValueError) as e:
            # ValueError also covers UnicodeDecodeError for non-UTF-8 content.
            self._log(f"Error reading file {fileName}: {e}")
            return None

    def _extractJsonFromResponse(self, content: str) -> str:
        """Extract JSON from response content (remove markdown code fences if present)."""
        jsonContent = content.strip()

        # Remove the opening fence, with or without the "json" language tag.
        if jsonContent.startswith('```json'):
            jsonContent = jsonContent[7:]
        elif jsonContent.startswith('```'):
            jsonContent = jsonContent[3:]
        jsonContent = jsonContent.strip()

        # Remove the closing fence, if any.
        if jsonContent.endswith('```'):
            jsonContent = jsonContent[:-3]

        return jsonContent.strip()

    # ------------------------------------------------------------------
    # Test steps
    # ------------------------------------------------------------------

    async def testSpecificCutJson(self, fileName: str) -> Dict[str, Any]:
        """Test getContexts() with a specific cut JSON file.

        Args:
            fileName: Name of a file inside local/debug/prompts/.

        Returns:
            On failure to read the file or to call getContexts():
            {"success": False, "error": ...}. Otherwise a dict with
            "success": True, the lengths/flags of each context, and a
            "contexts" entry holding the overlap context plus previews
            (first 500 characters) of the other three contexts.
        """
        self._logBanner(f"TESTING CUT JSON FROM: {fileName}")

        content = self._readDebugFile(fileName)
        if content is None:
            return {"success": False, "error": f"Could not read file: {fileName}"}

        jsonContent = self._extractJsonFromResponse(content)

        self._logBanner("INPUT JSON (CUT)")
        self._log(f"Total length: {len(jsonContent)} characters")
        self._log("")
        self._logLines(jsonContent.split('\n'), 20, withHeaders=True)

        self._logBanner("CALLING getContexts()")
        try:
            contexts = getContexts(jsonContent)
        except Exception as e:
            # Harness boundary: any failure inside getContexts() is reported, not raised.
            self._log(f"ERROR calling getContexts(): {e}")
            import traceback
            self._log(traceback.format_exc())
            return {"success": False, "error": str(e)}

        hierarchyMatchesInput = contexts.hierarchyContext == jsonContent
        overlapIsEmpty = contexts.overlapContext == ""

        self._logBanner("RESULTS FROM getContexts()")

        # jsonParsingSuccess
        self._log("")
        self._log(f"jsonParsingSuccess: {contexts.jsonParsingSuccess}")

        # overlapContext
        self._logBanner("overlapContext:")
        self._log(f"Length: {len(contexts.overlapContext)} characters")
        if overlapIsEmpty:
            self._log(" (empty - JSON is complete, no cut point)")
        else:
            self._logLines(contexts.overlapContext.split('\n'), 10)

        # hierarchyContext
        self._logBanner("hierarchyContext (for merging - should be exact input JSON):")
        self._log(f"Length: {len(contexts.hierarchyContext)} characters")
        if hierarchyMatchesInput:
            self._log(" ✅ hierarchyContext == input JSON (CORRECT)")
        else:
            self._log(" ❌ hierarchyContext != input JSON (BUG!)")
            self._log(f" Input length: {len(jsonContent)}, hierarchyContext length: {len(contexts.hierarchyContext)}")
            self._logFirstDifference(jsonContent, contexts.hierarchyContext)

        # hierarchyContextForPrompt
        self._logBanner("hierarchyContextForPrompt (for AI prompt with budget/placeholders):")
        self._log(f"Length: {len(contexts.hierarchyContextForPrompt)} characters")
        self._logLines(contexts.hierarchyContextForPrompt.split('\n'), 20)

        # completePart
        self._logBanner("completePart (closed JSON for parsing):")
        self._log(f"Length: {len(contexts.completePart)} characters")
        try:
            parsed = json.loads(contexts.completePart)
        except json.JSONDecodeError as e:
            self._log(f" ❌ completePart is NOT valid JSON: {e}")
        else:
            self._log(" ✅ completePart is valid JSON")
            self._log(f" Parsed type: {type(parsed).__name__}")
            if isinstance(parsed, dict):
                self._log(f" Keys: {list(parsed.keys())}")
            elif isinstance(parsed, list):
                self._log(f" List length: {len(parsed)}")
        self._logLines(contexts.completePart.split('\n'), 20,
                       withHeaders=True, blankBeforeHeaders=True)

        # Summary
        self._logBanner("SUMMARY")
        self._log(f" Input JSON length: {len(jsonContent)} chars")
        self._log(f" jsonParsingSuccess: {contexts.jsonParsingSuccess}")
        self._log(f" overlapContext length: {len(contexts.overlapContext)} chars")
        self._log(f" overlapContext empty: {overlapIsEmpty}")
        self._log(f" hierarchyContext length: {len(contexts.hierarchyContext)} chars")
        self._log(f" hierarchyContext == input: {hierarchyMatchesInput}")
        self._log(f" hierarchyContextForPrompt length: {len(contexts.hierarchyContextForPrompt)} chars")
        self._log(f" completePart length: {len(contexts.completePart)} chars")

        return {
            "success": True,
            "fileName": fileName,
            "inputLength": len(jsonContent),
            "jsonParsingSuccess": contexts.jsonParsingSuccess,
            "overlapContextLength": len(contexts.overlapContext),
            "overlapContextEmpty": overlapIsEmpty,
            "hierarchyContextLength": len(contexts.hierarchyContext),
            "hierarchyContextEqualsInput": hierarchyMatchesInput,
            "hierarchyContextForPromptLength": len(contexts.hierarchyContextForPrompt),
            "completePartLength": len(contexts.completePart),
            "contexts": {
                "overlapContext": contexts.overlapContext,
                "hierarchyContext": self._preview(contexts.hierarchyContext),
                "hierarchyContextForPrompt": self._preview(contexts.hierarchyContextForPrompt),
                "completePart": self._preview(contexts.completePart),
            }
        }

    def _writeLogFile(self):
        """Write log buffer to local/debug/test14_json_continuation_context_results.txt."""
        logDir = self._debugDir()
        os.makedirs(logDir, exist_ok=True)
        logFilePath = os.path.join(logDir, "test14_json_continuation_context_results.txt")

        with open(logFilePath, 'w', encoding='utf-8') as f:
            f.write('\n'.join(self.logBuffer))

        self.logFile = logFilePath
        print(f"\n📝 Detailed log written to: {logFilePath}")

    async def runTest(self):
        """Run the complete test.

        Analyzes one fixed response file plus any "*section_2_response*.txt"
        files found in local/debug/prompts/ (at most three files in total),
        writes the log file, prints a summary and returns the summary dict
        (also stored in self.testResults).
        """
        self._logBanner("JSON CONTINUATION CONTEXT TEST 14", leadingBlank=False)
        self._log("Testing getContexts() with specific cut JSON from debug prompts")

        results = {}

        # Test files to analyze
        testFiles = [
            # The first AI response (iteration 1) - this is the cut JSON
            "20260106-173342-020-chapter_1_section_section_2_response.txt",
        ]

        # Also pick up matching response files that are present on disk.
        debugDir = os.path.join(self._debugDir(), "prompts")
        if os.path.isdir(debugDir):
            # os.listdir() order is arbitrary; sort so the 3-file limit below
            # selects the same files on every run.
            for fileName in sorted(os.listdir(debugDir)):
                isCandidate = "section_2_response" in fileName and fileName.endswith(".txt")
                if isCandidate and fileName not in testFiles:
                    testFiles.append(fileName)

        # Limit to first 3 files
        testFiles = testFiles[:3]

        for fileName in testFiles:
            try:
                results[fileName] = await self.testSpecificCutJson(fileName)
            except Exception as e:
                # Keep going with the remaining files; record the failure.
                import traceback
                trace = traceback.format_exc()
                self._log(f"\n❌ Error testing {fileName}: {str(e)}")
                self._log(trace)
                results[fileName] = {
                    "success": False,
                    "error": str(e),
                    "traceback": trace
                }

        # Write log file
        self._writeLogFile()

        # Summary (printed only; the log file has already been written)
        print("\n" + self._RULE)
        print("TEST SUMMARY")
        print(self._RULE)

        successCount = 0
        for fileName, result in results.items():
            if not result.get("success"):
                print(f"❌ {fileName}: {result.get('error', 'Unknown error')}")
                continue

            successCount += 1
            hierarchyMatch = result.get("hierarchyContextEqualsInput", False)
            overlapEmpty = result.get("overlapContextEmpty", False)
            jsonSuccess = result.get("jsonParsingSuccess", False)

            status = "✅" if hierarchyMatch else "⚠️"
            print(f"{status} {fileName}")
            print(f" hierarchyContext == input: {hierarchyMatch}")
            print(f" overlapContext empty: {overlapEmpty}")
            print(f" jsonParsingSuccess: {jsonSuccess}")

        print(f"\nResults: {successCount}/{len(results)} successful")

        self.testResults = {
            "success": successCount == len(results),
            "totalFiles": len(results),
            "successCount": successCount,
            "results": results
        }

        return self.testResults
|
|
|
|
|
|
async def main():
    """Run JSON continuation context test 14 and print a compact JSON digest."""
    outcome = await JsonContinuationContextTester14().runTest()

    # Print final results as JSON for easy parsing
    rule = "=" * 80
    print("\n" + rule)
    print("FINAL RESULTS (JSON)")
    print(rule)

    # Only these scalar fields are echoed per file (contexts are too large).
    perFileFields = (
        "success",
        "inputLength",
        "jsonParsingSuccess",
        "overlapContextLength",
        "overlapContextEmpty",
        "hierarchyContextEqualsInput",
        "completePartLength",
    )
    digest = {
        "success": outcome.get("success"),
        "totalFiles": outcome.get("totalFiles"),
        "successCount": outcome.get("successCount"),
        "files": {
            name: {field: entry.get(field) for field in perFileFields}
            for name, entry in outcome.get("results", {}).items()
        },
    }

    print(json.dumps(digest, indent=2, default=str))
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the async main() to completion.
    asyncio.run(main())
|