initial base for coding system path

This commit is contained in:
ValueOn AG 2026-01-03 02:38:16 +01:00
parent e4b633cac6
commit fa6479f40d
18 changed files with 692 additions and 179 deletions

View file

@ -288,36 +288,54 @@ class AiCallLooper:
# Use generic data-based merging for all use cases # Use generic data-based merging for all use cases
try: try:
# Parse all accumulated JSON strings and current response # Strategy: Merge strings first for incomplete JSON, then parse and merge parsed objects
# This ensures incomplete JSON from part 1 is preserved
allJsonStrings = accumulatedDirectJson + [result]
# Step 1: Merge all JSON strings using existing overlap detection
mergedJsonString = allJsonStrings[0] if allJsonStrings else ""
for jsonStr in allJsonStrings[1:]:
mergedJsonString = JsonResponseHandler.mergeJsonStringsWithOverlap(mergedJsonString, jsonStr)
# Step 2: Try to parse the merged string
extracted = extractJsonString(mergedJsonString)
parsed, parseErr, _ = tryParseJson(extracted)
if parseErr is None and parsed:
# Parsing succeeded - normalize and use
normalized = self._normalizeJsonStructure(parsed, useCaseId)
parsedJsonForUseCase = normalized
result = json.dumps(normalized, indent=2, ensure_ascii=False)
else:
# Parsing failed - try to extract partial data for section_content
if useCaseId == "section_content":
# Use existing mergeDeepStructures approach: parse what we can from each part
allParsed = [] allParsed = []
for jsonStr in accumulatedDirectJson + [result]: for jsonStr in allJsonStrings:
extracted = extractJsonString(jsonStr) extracted = extractJsonString(jsonStr)
parsed, parseErr, _ = tryParseJson(extracted) parsed, parseErr, _ = tryParseJson(extracted)
if parseErr is None and parsed: if parseErr is None and parsed:
# Normalize structure: ensure consistent format
normalized = self._normalizeJsonStructure(parsed, useCaseId) normalized = self._normalizeJsonStructure(parsed, useCaseId)
allParsed.append(normalized) allParsed.append(normalized)
if allParsed and len(allParsed) > 1: if allParsed:
# Generic recursive merge of parsed JSON objects # Use existing mergeDeepStructures for intelligent merging
mergedJsonObj = self._mergeJsonObjectsRecursively(allParsed) if len(allParsed) > 1:
mergedJsonObj = allParsed[0]
# Reconstruct merged JSON string for nextObj in allParsed[1:]:
mergedJsonString = json.dumps(mergedJsonObj, indent=2, ensure_ascii=False) mergedJsonObj = JsonResponseHandler.mergeDeepStructures(
parsedJsonForUseCase = mergedJsonObj mergedJsonObj, nextObj, iteration, f"section_content.merge"
result = mergedJsonString )
logger.info(f"Successfully merged {len(accumulatedDirectJson) + 1} JSON fragments using generic recursive merge")
elif allParsed:
# Only one parsed JSON, use it directly
parsedJsonForUseCase = allParsed[0]
result = json.dumps(parsedJsonForUseCase, indent=2, ensure_ascii=False)
else: else:
# Fallback to string merging if parsing fails mergedJsonObj = allParsed[0]
logger.warning("Failed to parse all JSON fragments for data-based merge, falling back to string merging")
mergedJsonString = accumulatedDirectJson[0] if accumulatedDirectJson else result parsedJsonForUseCase = mergedJsonObj
for prevJson in accumulatedDirectJson[1:]: result = json.dumps(mergedJsonObj, indent=2, ensure_ascii=False)
mergedJsonString = JsonResponseHandler.mergeJsonStringsWithOverlap(mergedJsonString, prevJson) else:
mergedJsonString = JsonResponseHandler.mergeJsonStringsWithOverlap(mergedJsonString, result) # All parsing failed - use string merge result
result = mergedJsonString
else:
# Not section_content - use string merge result
result = mergedJsonString result = mergedJsonString
except Exception as e: except Exception as e:
logger.warning(f"Failed data-based merge, falling back to string merging: {e}") logger.warning(f"Failed data-based merge, falling back to string merging: {e}")
@ -647,128 +665,6 @@ class AiCallLooper:
# For other use cases, return as-is (they have their own structures) # For other use cases, return as-is (they have their own structures)
return parsed return parsed
def _mergeJsonObjectsRecursively(self, jsonObjects: List[Any]) -> Any:
"""
GENERIC recursive merge function for JSON objects.
Works for ANY JSON structure - handles lists, dicts, and primitives intelligently.
Merge strategy:
- Lists/Arrays: Merge by removing duplicates based on content (works for rows, items, elements, etc.)
- Dicts/Objects: Merge properties, recursively merging nested structures
- Primitives: Use the latest value
Args:
jsonObjects: List of parsed JSON objects to merge
Returns:
Merged JSON object
"""
if not jsonObjects:
return None
if len(jsonObjects) == 1:
return jsonObjects[0]
# Start with first object and merge others into it
merged = jsonObjects[0]
for obj in jsonObjects[1:]:
merged = self._mergeTwoObjects(merged, obj)
return merged
def _mergeTwoObjects(self, obj1: Any, obj2: Any) -> Any:
"""
Merge two JSON objects recursively.
Args:
obj1: First object (base)
obj2: Second object (to merge into obj1)
Returns:
Merged object
"""
# Handle None values
if obj1 is None:
return obj2
if obj2 is None:
return obj1
# Handle different types
if isinstance(obj1, dict) and isinstance(obj2, dict):
# Merge dictionaries
merged = dict(obj1) # Start with copy of obj1
for key, value2 in obj2.items():
if key in merged:
# Key exists in both - recursively merge
merged[key] = self._mergeTwoObjects(merged[key], value2)
else:
# New key - add it
merged[key] = value2
return merged
elif isinstance(obj1, list) and isinstance(obj2, list):
# Merge lists by removing duplicates based on content
merged = list(obj1) # Start with copy of obj1
seenItems = set() # Track seen items to avoid duplicates
# Add all items from obj1 with their keys
for item in merged:
itemKey = self._createItemKey(item)
seenItems.add(itemKey)
# Add items from obj2 that aren't duplicates
for item in obj2:
itemKey = self._createItemKey(item)
if itemKey not in seenItems:
seenItems.add(itemKey)
merged.append(item)
return merged
else:
# Different types or primitives - use obj2 (latest value)
return obj2
def _createItemKey(self, item: Any) -> Any:
"""
Create a key for an item to detect duplicates.
Works generically for any JSON structure.
Args:
item: Item to create key for
Returns:
Key that can be used for duplicate detection
"""
if isinstance(item, dict):
# For dicts, create key from all values (or specific identifying fields)
# Try to find common identifying fields first
if "id" in item:
return ("id", item["id"])
elif "type" in item and "content" in item:
# For elements with type and content, use type + content hash
content = item.get("content", {})
if isinstance(content, dict):
# For tables/lists, use type + first few rows/items for key
if "rows" in content:
rows = content.get("rows", [])
return ("type", item["type"], "rows", tuple(rows[:3]) if rows else ())
elif "items" in content:
items = content.get("items", [])
return ("type", item["type"], "items", tuple(items[:3]) if items else ())
return ("type", item["type"], tuple(sorted(item.items())))
else:
# Generic: use sorted items tuple
return tuple(sorted(item.items()))
elif isinstance(item, (list, tuple)):
# For lists/tuples, use the tuple itself as key
return tuple(item) if isinstance(item, list) else item
else:
# For primitives, use the value itself
return item
async def _defineKpisFromPrompt( async def _defineKpisFromPrompt(
self, self,
userPrompt: str, userPrompt: str,

View file

@ -13,7 +13,7 @@ from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, Operati
# Type hint for renderer parameter # Type hint for renderer parameter
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
if TYPE_CHECKING: if TYPE_CHECKING:
from modules.services.serviceGeneration.renderers.rendererBaseTemplate import BaseRenderer from modules.services.serviceGeneration.renderers.documentRendererBaseTemplate import BaseRenderer
_RendererLike = BaseRenderer _RendererLike = BaseRenderer
else: else:
_RendererLike = Any _RendererLike = Any

View file

@ -15,6 +15,7 @@ from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelWorkflow import AiResponse, AiResponseMetadata, DocumentData from modules.datamodels.datamodelWorkflow import AiResponse, AiResponseMetadata, DocumentData
from modules.datamodels.datamodelExtraction import ContentPart from modules.datamodels.datamodelExtraction import ContentPart
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
from modules.shared.jsonUtils import extractJsonString
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -129,11 +130,25 @@ class CodeGenerationPath:
operationType=OperationTypeEnum.DATA_GENERATE.value operationType=OperationTypeEnum.DATA_GENERATE.value
) )
# Create summary JSON for content field
summaryContent = {
"type": "code_generation",
"metadata": codeStructure.get("metadata", {}),
"files": [
{
"filename": doc.documentName,
"mimeType": doc.mimeType
}
for doc in documents
],
"fileCount": len(documents)
}
self.services.chat.progressLogFinish(codeOperationId, True) self.services.chat.progressLogFinish(codeOperationId, True)
return AiResponse( return AiResponse(
documents=documents, documents=documents,
content=None, content=json.dumps(summaryContent, ensure_ascii=False),
metadata=metadata metadata=metadata
) )
@ -191,20 +206,22 @@ Request: {userPrompt}
Language: {language} Language: {language}
IMPORTANT: If the request mentions multiple files (e.g., "3 files", "config.json and customers.json", etc.), you MUST include ALL requested files in the files array. Set projectType to "multi_file" when multiple files are requested.
Create a JSON structure with: Create a JSON structure with:
1. metadata: {{"language": "{language}", "projectType": "single_file|multi_file", "projectName": "..."}} 1. metadata: {{"language": "{language}", "projectType": "single_file|multi_file", "projectName": "..."}}
2. files: Array of file structures, each with: 2. files: Array of file structures, each with:
- id: Unique identifier - id: Unique identifier (e.g., "file_1", "file_2")
- filename: File name (e.g., "main.py", "utils.py") - filename: File name (e.g., "config.json", "customers.json", "main.py")
- fileType: File extension (e.g., "py", "js") - fileType: File extension (e.g., "json", "py", "js", "csv", "xml")
- dependencies: List of file IDs this file depends on (for multi-file projects) - dependencies: List of file IDs this file depends on (for multi-file projects)
- imports: List of import statements (for dependency extraction) - imports: List of import statements (for dependency extraction)
- functions: Array of function signatures {{"name": "...", "signature": "..."}} - functions: Array of function signatures {{"name": "...", "signature": "..."}}
- classes: Array of class definitions {{"name": "...", "signature": "..."}} - classes: Array of class definitions {{"name": "...", "signature": "..."}}
For single-file projects, return one file. For multi-file projects, break down into logical modules. For single-file projects, return one file. For multi-file projects, include ALL requested files in the files array.
Return ONLY valid JSON in this format: Example for single file:
{{ {{
"metadata": {{ "metadata": {{
"language": "{language}", "language": "{language}",
@ -214,8 +231,8 @@ Return ONLY valid JSON in this format:
"files": [ "files": [
{{ {{
"id": "file_1", "id": "file_1",
"filename": "main.py", "filename": "config.json",
"fileType": "py", "fileType": "json",
"dependencies": [], "dependencies": [],
"imports": [], "imports": [],
"functions": [], "functions": [],
@ -223,6 +240,46 @@ Return ONLY valid JSON in this format:
}} }}
] ]
}} }}
Example for multiple files:
{{
"metadata": {{
"language": "{language}",
"projectType": "multi_file",
"projectName": "generated-project"
}},
"files": [
{{
"id": "file_1",
"filename": "config.json",
"fileType": "json",
"dependencies": [],
"imports": [],
"functions": [],
"classes": []
}},
{{
"id": "file_2",
"filename": "customers.json",
"fileType": "json",
"dependencies": [],
"imports": [],
"functions": [],
"classes": []
}},
{{
"id": "file_3",
"filename": "settings.json",
"fileType": "json",
"dependencies": [],
"imports": [],
"functions": [],
"classes": []
}}
]
}}
Return ONLY valid JSON matching the request above.
""" """
# Use generic looping system with code_structure use case # Use generic looping system with code_structure use case
@ -239,7 +296,9 @@ Return ONLY valid JSON in this format:
contentParts=contentParts contentParts=contentParts
) )
parsed = json.loads(structureJson) # Extract JSON from markdown fences if present
extractedJson = extractJsonString(structureJson)
parsed = json.loads(extractedJson)
return parsed return parsed
async def _generateCodeContent( async def _generateCodeContent(
@ -561,7 +620,9 @@ Return ONLY valid JSON in this format:
debugPrefix=f"code_content_{fileStructure.get('id', 'file')}", debugPrefix=f"code_content_{fileStructure.get('id', 'file')}",
) )
parsed = json.loads(contentJson) # Extract JSON from markdown fences if present
extractedJson = extractJsonString(contentJson)
parsed = json.loads(extractedJson)
# Extract file content and metadata # Extract file content and metadata
files = parsed.get("files", []) files = parsed.get("files", [])

View file

@ -5,7 +5,7 @@ Base renderer class for code format renderers.
""" """
from abc import abstractmethod from abc import abstractmethod
from .rendererBaseTemplate import BaseRenderer from .documentRendererBaseTemplate import BaseRenderer
from modules.datamodels.datamodelDocument import RenderedDocument from modules.datamodels.datamodelDocument import RenderedDocument
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
import logging import logging

View file

@ -7,7 +7,7 @@ Renderer registry for automatic discovery and registration of renderers.
import logging import logging
import importlib import importlib
from typing import Dict, Type, List, Optional from typing import Dict, Type, List, Optional
from .rendererBaseTemplate import BaseRenderer from .documentRendererBaseTemplate import BaseRenderer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -38,7 +38,7 @@ class RendererRegistry:
# Scan all Python files in the renderers directory # Scan all Python files in the renderers directory
for filePath in renderersDir.glob("*.py"): for filePath in renderersDir.glob("*.py"):
if filePath.name in ['registry.py', 'rendererBaseTemplate.py', '__init__.py']: if filePath.name in ['registry.py', 'documentRendererBaseTemplate.py', '__init__.py']:
continue continue
# Extract module name from filename # Extract module name from filename

View file

@ -4,7 +4,7 @@
CSV renderer for report generation. CSV renderer for report generation.
""" """
from .rendererBaseTemplate import BaseRenderer from .documentRendererBaseTemplate import BaseRenderer
from modules.datamodels.datamodelDocument import RenderedDocument from modules.datamodels.datamodelDocument import RenderedDocument
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional

View file

@ -4,7 +4,7 @@
DOCX renderer for report generation using python-docx. DOCX renderer for report generation using python-docx.
""" """
from .rendererBaseTemplate import BaseRenderer from .documentRendererBaseTemplate import BaseRenderer
from modules.datamodels.datamodelDocument import RenderedDocument from modules.datamodels.datamodelDocument import RenderedDocument
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
import io import io

View file

@ -4,7 +4,7 @@
HTML renderer for report generation. HTML renderer for report generation.
""" """
from .rendererBaseTemplate import BaseRenderer from .documentRendererBaseTemplate import BaseRenderer
from modules.datamodels.datamodelDocument import RenderedDocument from modules.datamodels.datamodelDocument import RenderedDocument
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional

View file

@ -4,7 +4,7 @@
Image renderer for report generation using AI image generation. Image renderer for report generation using AI image generation.
""" """
from .rendererBaseTemplate import BaseRenderer from .documentRendererBaseTemplate import BaseRenderer
from modules.datamodels.datamodelDocument import RenderedDocument from modules.datamodels.datamodelDocument import RenderedDocument
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
import logging import logging

View file

@ -4,7 +4,7 @@
JSON renderer for report generation. JSON renderer for report generation.
""" """
from .rendererBaseTemplate import BaseRenderer from .documentRendererBaseTemplate import BaseRenderer
from modules.datamodels.datamodelDocument import RenderedDocument from modules.datamodels.datamodelDocument import RenderedDocument
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
import json import json

View file

@ -4,7 +4,7 @@
Markdown renderer for report generation. Markdown renderer for report generation.
""" """
from .rendererBaseTemplate import BaseRenderer from .documentRendererBaseTemplate import BaseRenderer
from modules.datamodels.datamodelDocument import RenderedDocument from modules.datamodels.datamodelDocument import RenderedDocument
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional

View file

@ -4,7 +4,7 @@
PDF renderer for report generation using reportlab. PDF renderer for report generation using reportlab.
""" """
from .rendererBaseTemplate import BaseRenderer from .documentRendererBaseTemplate import BaseRenderer
from modules.datamodels.datamodelDocument import RenderedDocument from modules.datamodels.datamodelDocument import RenderedDocument
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
import io import io

View file

@ -7,7 +7,7 @@ import json
import re import re
from datetime import datetime, UTC from datetime import datetime, UTC
from typing import Dict, Any, Optional, List from typing import Dict, Any, Optional, List
from .rendererBaseTemplate import BaseRenderer from .documentRendererBaseTemplate import BaseRenderer
from modules.datamodels.datamodelDocument import RenderedDocument from modules.datamodels.datamodelDocument import RenderedDocument
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View file

@ -4,7 +4,7 @@
Text renderer for report generation. Text renderer for report generation.
""" """
from .rendererBaseTemplate import BaseRenderer from .documentRendererBaseTemplate import BaseRenderer
from modules.datamodels.datamodelDocument import RenderedDocument from modules.datamodels.datamodelDocument import RenderedDocument
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional

View file

@ -4,7 +4,7 @@
Excel renderer for report generation using openpyxl. Excel renderer for report generation using openpyxl.
""" """
from .rendererBaseTemplate import BaseRenderer from .documentRendererBaseTemplate import BaseRenderer
from modules.datamodels.datamodelDocument import RenderedDocument from modules.datamodels.datamodelDocument import RenderedDocument
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
import io import io

View file

@ -282,7 +282,7 @@ class MethodAi(MethodBase):
), ),
"generateCode": WorkflowActionDefinition( "generateCode": WorkflowActionDefinition(
actionId="ai.generateCode", actionId="ai.generateCode",
description="Generate code files - explicitly sets intent to 'code'. If the prompt specifies file formats to deliver, include them in the prompt", description="Generate one or multiple code files in a single action - explicitly sets intent to 'code'. This action can generate multiple files (e.g., config.json, customers.json, settings.json) when the prompt requests multiple files. If the prompt specifies file formats to deliver, include them in the prompt. IMPORTANT: When the user requests multiple files (e.g., 'generate 3 JSON files'), use a SINGLE ai.generateCode action with a prompt that describes ALL requested files, rather than splitting into multiple actions.",
dynamicMode=True, dynamicMode=True,
parameters={ parameters={
"prompt": WorkflowActionParameter( "prompt": WorkflowActionParameter(
@ -290,7 +290,7 @@ class MethodAi(MethodBase):
type="str", type="str",
frontendType=FrontendType.TEXTAREA, frontendType=FrontendType.TEXTAREA,
required=True, required=True,
description="Description of code to generate" description="Description of code to generate. If multiple files are requested, describe ALL files in this single prompt (e.g., 'Generate 3 JSON files: 1) config.json with..., 2) customers.json with..., 3) settings.json with...')."
), ),
"documentList": WorkflowActionParameter( "documentList": WorkflowActionParameter(
name="documentList", name="documentList",
@ -303,9 +303,9 @@ class MethodAi(MethodBase):
name="resultType", name="resultType",
type="str", type="str",
frontendType=FrontendType.SELECT, frontendType=FrontendType.SELECT,
frontendOptions=["py", "js", "ts", "html", "java", "cpp", "txt"], frontendOptions=["py", "js", "ts", "html", "java", "cpp", "txt", "json", "csv", "xml"],
required=False, required=False,
description="Output format (html, js, py, etc.). Optional: if omitted, formats are determined from prompt by AI. With per-document format determination, AI can determine different formats for different documents based on prompt." description="Output format (html, js, py, json, csv, xml, etc.). Optional: if omitted, formats are determined from prompt by AI. This action can return MULTIPLE files in a single call when the prompt requests multiple files. With per-document format determination, AI can determine different formats for different files based on prompt. When multiple files are requested, the action will return multiple documents (one per file)."
) )
}, },
execute=generateCode.__get__(self, self.__class__) execute=generateCode.__get__(self, self.__class__)

View file

@ -0,0 +1,556 @@
#!/usr/bin/env python3
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Code Generation Formats Test 11 - Tests code generation in JSON, CSV, and XML formats
Tests code generation with structured data formats including validation and formatting.
"""
import asyncio
import json
import sys
import os
import time
import csv
import io
import xml.etree.ElementTree as ET
from typing import Dict, Any, List, Optional
# Add the gateway to path (go up 2 levels from tests/functional/)
_gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
if _gateway_path not in sys.path:
sys.path.insert(0, _gateway_path)
# Import the service initialization
from modules.services import getInterface as getServices
from modules.datamodels.datamodelChat import UserInputRequest, WorkflowModeEnum
from modules.datamodels.datamodelUam import User
from modules.features.workflow import chatStart
import modules.interfaces.interfaceDbChatObjects as interfaceDbChatObjects
class CodeGenerationFormatsTester11:
def __init__(self):
# Use root user for testing (has full access to everything)
from modules.interfaces.interfaceDbAppObjects import getRootInterface
rootInterface = getRootInterface()
self.testUser = rootInterface.currentUser
# Initialize services using the existing system
self.services = getServices(self.testUser, None) # Test user, no workflow
self.workflow = None
self.testResults = {}
self.generatedDocuments = {}
async def initialize(self):
"""Initialize the test environment."""
# Enable debug file logging for tests
from modules.shared.configuration import APP_CONFIG
APP_CONFIG.set("APP_DEBUG_CHAT_WORKFLOW_ENABLED", True)
# Set logging level to INFO to see workflow progress
import logging
logging.getLogger().setLevel(logging.INFO)
print(f"Initialized test with user: {self.testUser.id}")
print(f"Mandate ID: {self.testUser.mandateId}")
print(f"Debug logging enabled: {APP_CONFIG.get('APP_DEBUG_CHAT_WORKFLOW_ENABLED', False)}")
def createTestPrompt(self, format: str) -> str:
"""Create a test prompt for code generation in the specified format.
The prompt requests 3 files for each format:
- Structured data generation appropriate for the format
- Proper formatting and validation
"""
formatPrompts = {
"json": (
"Generate 3 JSON code files for a customer management system:\n"
"1) Create a config.json file with:\n"
" - Application name: 'Customer Manager'\n"
" - Version: '1.0.0'\n"
" - Database settings: host, port, name\n"
" - API settings: baseUrl, timeout\n"
"2) Create a customers.json file with an array of customer objects:\n"
" - Each customer should have: id, name, email, phone, address\n"
" - Include at least 3 sample customers\n"
"3) Create a settings.json file with:\n"
" - Theme settings: darkMode, fontSize, language\n"
" - Notification settings: email, sms, push\n"
" - Feature flags: enableAnalytics, enableReports\n\n"
"Format all files as valid JSON with proper indentation."
),
"csv": (
"Generate 3 CSV code files for expense tracking:\n"
"1) Create an expenses.csv file with:\n"
" - Header row: Documentname, Datum, Händler, Kreditkartennummer, Gesamtbetrag, Währung, MWST-Satz\n"
" - Data rows with at least 5 expense entries\n"
" - Use consistent date format (DD.MM.YYYY)\n"
" - Use CHF as currency\n"
" - Use 7.7% as VAT rate\n"
"2) Create a categories.csv file with:\n"
" - Header row: CategoryID, CategoryName, Description, ParentCategory\n"
" - Data rows with at least 8 categories\n"
"3) Create a vendors.csv file with:\n"
" - Header row: VendorID, VendorName, ContactPerson, Email, Phone, Address\n"
" - Data rows with at least 6 vendors\n\n"
"Format all files as valid CSV with proper header row and consistent column count."
),
"xml": (
"Generate 3 XML code files for a product catalog:\n"
"1) Create a products.xml file with:\n"
" - Root element: <catalog>\n"
" - Each product as <product> element with:\n"
" - <id>, <name>, <description>, <price>, <category>\n"
" - Include at least 4 products\n"
"2) Create a categories.xml file with:\n"
" - Root element: <categories>\n"
" - Each category as <category> element with:\n"
" - <id>, <name>, <description>, <parentId>\n"
" - Include at least 5 categories\n"
"3) Create a suppliers.xml file with:\n"
" - Root element: <suppliers>\n"
" - Each supplier as <supplier> element with:\n"
" - <id>, <name>, <contact>, <address>\n"
" - Include at least 3 suppliers\n\n"
"Format all files as valid XML with proper indentation and structure."
)
}
return formatPrompts.get(format.lower(), formatPrompts["json"])
async def generateCodeInFormat(self, format: str) -> Dict[str, Any]:
"""Generate code in the specified format using workflow."""
print("\n" + "="*80)
print(f"GENERATING CODE IN {format.upper()} FORMAT")
print("="*80)
prompt = self.createTestPrompt(format)
print(f"Prompt: {prompt[:200]}...")
# Create user input request
userInput = UserInputRequest(
prompt=prompt,
listFileId=[],
userLanguage="en"
)
# Start workflow
print(f"\nStarting workflow for {format.upper()} code generation...")
workflow = await chatStart(
currentUser=self.testUser,
userInput=userInput,
workflowMode=WorkflowModeEnum.WORKFLOW_DYNAMIC,
workflowId=None
)
if not workflow:
return {
"success": False,
"error": "Failed to start workflow"
}
self.workflow = workflow
print(f"Workflow started: {workflow.id}")
# Wait for workflow completion (no timeout - wait indefinitely)
print(f"Waiting for workflow completion...")
completed = await self.waitForWorkflowCompletion(timeout=None)
if not completed:
return {
"success": False,
"error": "Workflow did not complete",
"workflowId": workflow.id,
"status": workflow.status if workflow else "unknown"
}
# Analyze results
results = self.analyzeWorkflowResults()
# Extract documents for this format
documents = results.get("documents", [])
formatDocuments = [d for d in documents if d.get("fileName", "").endswith(f".{format.lower()}")]
return {
"success": True,
"format": format,
"workflowId": workflow.id,
"status": results.get("status"),
"documentCount": len(formatDocuments),
"documents": formatDocuments,
"results": results
}
async def waitForWorkflowCompletion(self, timeout: Optional[int] = None, checkInterval: int = 2) -> bool:
"""Wait for workflow to complete."""
if not self.workflow:
return False
startTime = time.time()
lastStatus = None
interfaceDbChat = interfaceDbChatObjects.getInterface(self.testUser)
if timeout is None:
print("Waiting indefinitely (no timeout)")
while True:
# Check timeout only if specified
if timeout is not None and time.time() - startTime > timeout:
print(f"\n⏱️ Timeout after {timeout} seconds")
return False
# Get current workflow status
try:
currentWorkflow = interfaceDbChat.getWorkflow(self.workflow.id)
if not currentWorkflow:
print("\n❌ Workflow not found")
return False
currentStatus = currentWorkflow.status
elapsed = int(time.time() - startTime)
# Print status if it changed
if currentStatus != lastStatus:
print(f"Workflow status: {currentStatus} (elapsed: {elapsed}s)")
lastStatus = currentStatus
# Check if workflow is complete
if currentStatus in ["completed", "stopped", "failed"]:
self.workflow = currentWorkflow
statusIcon = "" if currentStatus == "completed" else ""
print(f"\n{statusIcon} Workflow finished with status: {currentStatus} (elapsed: {elapsed}s)")
return currentStatus == "completed"
# Wait before next check
await asyncio.sleep(checkInterval)
except Exception as e:
print(f"\n⚠️ Error checking workflow status: {str(e)}")
await asyncio.sleep(checkInterval)
def analyzeWorkflowResults(self) -> Dict[str, Any]:
"""Analyze workflow results and extract information."""
if not self.workflow:
return {"error": "No workflow to analyze"}
interfaceDbChat = interfaceDbChatObjects.getInterface(self.testUser)
workflow = interfaceDbChat.getWorkflow(self.workflow.id)
if not workflow:
return {"error": "Workflow not found"}
# Get unified chat data
chatData = interfaceDbChat.getUnifiedChatData(workflow.id, None)
# Count messages
messages = chatData.get("messages", [])
userMessages = [m for m in messages if m.get("role") == "user"]
assistantMessages = [m for m in messages if m.get("role") == "assistant"]
# Count documents
documents = chatData.get("documents", [])
# Get logs
logs = chatData.get("logs", [])
results = {
"workflowId": workflow.id,
"status": workflow.status,
"workflowMode": str(workflow.workflowMode) if hasattr(workflow, 'workflowMode') else None,
"currentRound": workflow.currentRound,
"totalTasks": workflow.totalTasks,
"totalActions": workflow.totalActions,
"messageCount": len(messages),
"userMessageCount": len(userMessages),
"assistantMessageCount": len(assistantMessages),
"documentCount": len(documents),
"logCount": len(logs),
"documents": documents,
"logs": logs
}
print(f"\nWorkflow Results:")
print(f" Status: {results['status']}")
print(f" Tasks: {results['totalTasks']}")
print(f" Actions: {results['totalActions']}")
print(f" Messages: {results['messageCount']}")
print(f" Documents: {results['documentCount']}")
# Print document details
if documents:
print(f"\nGenerated Documents:")
for doc in documents:
fileName = doc.get("fileName", "unknown")
fileSize = doc.get("fileSize", 0)
mimeType = doc.get("mimeType", "unknown")
print(f" - {fileName} ({fileSize} bytes, {mimeType})")
return results
def verifyCodeFormat(self, document: Dict[str, Any], expectedFormat: str) -> Dict[str, Any]:
"""Verify that a code file matches the expected format and is valid."""
fileName = document.get("fileName", "")
mimeType = document.get("mimeType", "")
fileSize = document.get("fileSize", 0)
# Expected MIME types
expectedMimeTypes = {
"json": ["application/json"],
"csv": ["text/csv"],
"xml": ["application/xml", "text/xml"]
}
# Expected file extensions
expectedExtensions = {
"json": [".json"],
"csv": [".csv"],
"xml": [".xml"]
}
formatLower = expectedFormat.lower()
expectedMimes = expectedMimeTypes.get(formatLower, [])
expectedExts = expectedExtensions.get(formatLower, [])
# Check file extension
hasCorrectExtension = any(fileName.lower().endswith(ext) for ext in expectedExts)
# Check MIME type
hasCorrectMimeType = any(mimeType.lower() == mime.lower() for mime in expectedMimes)
# Check file size (should be > 0)
hasValidSize = fileSize > 0
# Try to read and validate content
isValidContent = False
validationError = None
try:
# Get file content from fileId
fileId = document.get("fileId")
if fileId and hasattr(self.services, 'interfaceDbComponent'):
fileData = self.services.interfaceDbComponent.getFileData(fileId)
if fileData:
content = fileData.decode('utf-8') if isinstance(fileData, bytes) else fileData
# Validate format-specific syntax
if formatLower == "json":
try:
json.loads(content)
isValidContent = True
except json.JSONDecodeError as e:
validationError = f"Invalid JSON: {str(e)}"
elif formatLower == "csv":
try:
reader = csv.reader(io.StringIO(content))
rows = list(reader)
if len(rows) > 0:
# Check header row exists
headerCount = len(rows[0])
# Check all rows have same column count
allRowsValid = all(len(row) == headerCount for row in rows)
isValidContent = allRowsValid
if not allRowsValid:
validationError = "CSV rows have inconsistent column counts"
else:
validationError = "CSV file is empty"
except Exception as e:
validationError = f"CSV parsing error: {str(e)}"
elif formatLower == "xml":
try:
ET.fromstring(content)
isValidContent = True
except ET.ParseError as e:
validationError = f"Invalid XML: {str(e)}"
else:
validationError = "Could not read file data"
else:
validationError = "No fileId available"
except Exception as e:
validationError = f"Error reading/validating file: {str(e)}"
verification = {
"format": expectedFormat,
"fileName": fileName,
"mimeType": mimeType,
"fileSize": fileSize,
"hasCorrectExtension": hasCorrectExtension,
"hasCorrectMimeType": hasCorrectMimeType,
"hasValidSize": hasValidSize,
"isValidContent": isValidContent,
"validationError": validationError,
"isValid": hasCorrectExtension and hasValidSize and hasCorrectMimeType,
"isComplete": hasCorrectExtension and hasValidSize and hasCorrectMimeType and isValidContent
}
return verification
async def testAllFormats(self) -> Dict[str, Any]:
"""Test code generation in JSON, CSV, and XML formats."""
print("\n" + "="*80)
print("TESTING CODE GENERATION IN ALL FORMATS")
print("="*80)
# Test all code formats
formats = ["json", "csv", "xml"]
results = {}
for format in formats:
try:
print(f"\n{'='*80}")
print(f"Testing {format.upper()} format...")
print(f"{'='*80}")
result = await self.generateCodeInFormat(format)
results[format] = result
if result.get("success"):
documents = result.get("documents", [])
if documents:
# Verify all documents (expecting 3 files per format)
verifications = []
for doc in documents:
verification = self.verifyCodeFormat(doc, format)
verifications.append(verification)
result["verifications"] = verifications
# Count valid documents
validCount = sum(1 for v in verifications if v.get("isValid"))
contentValidCount = sum(1 for v in verifications if v.get("isValidContent"))
print(f"\n{format.upper()} generation successful!")
print(f" Documents: {len(documents)} (expected: 3)")
print(f" Valid Format: {validCount}/{len(documents)}")
print(f" Valid Content: {contentValidCount}/{len(documents)}")
# Print details for each file
for i, verification in enumerate(verifications, 1):
statusIcon = "" if verification.get("isValid") else ""
contentIcon = "" if verification.get("isValidContent") else ""
print(f" File {i}: {statusIcon} Format, {contentIcon} Content - {verification.get('fileName', 'unknown')}")
if verification.get("validationError"):
print(f" Error: {verification['validationError']}")
else:
print(f"\n⚠️ {format.upper()} generation completed but no documents found")
else:
error = result.get("error", "Unknown error")
print(f"\n{format.upper()} generation failed: {error}")
# Small delay between tests
await asyncio.sleep(2)
except Exception as e:
import traceback
print(f"\n❌ Error testing {format.upper()}: {str(e)}")
print(traceback.format_exc())
results[format] = {
"success": False,
"error": str(e),
"traceback": traceback.format_exc()
}
return results
async def runTest(self):
"""Run the complete test."""
print("\n" + "="*80)
print("CODE GENERATION FORMATS TEST 11 - JSON, CSV, XML")
print("="*80)
try:
# Initialize
await self.initialize()
# Test all formats
formatResults = await self.testAllFormats()
# Summary
print("\n" + "="*80)
print("TEST SUMMARY")
print("="*80)
# Format tests summary
print("\nFormat Tests:")
successCount = 0
failCount = 0
completeCount = 0 # Files with valid content
for format, result in formatResults.items():
if result.get("success"):
successCount += 1
verifications = result.get("verifications", [])
docCount = result.get("documentCount", 0)
# Count valid files
validCount = sum(1 for v in verifications if v.get("isValid"))
contentValidCount = sum(1 for v in verifications if v.get("isValidContent"))
completeCount += contentValidCount
# Overall status (all files valid)
allValid = len(verifications) > 0 and all(v.get("isValid") for v in verifications)
allContentValid = len(verifications) > 0 and all(v.get("isValidContent") for v in verifications)
statusIcon = "" if allValid else "⚠️"
contentIcon = "" if allContentValid else ""
print(f"{statusIcon} {format.upper():6s}: {'PASS' if allValid else 'PARTIAL'} - {docCount} file(s) ({validCount} valid format, {contentValidCount} valid content)")
# Print errors if any
for v in verifications:
if v.get("validationError"):
print(f" {v.get('fileName', 'unknown')}: {v['validationError']}")
else:
failCount += 1
error = result.get("error", "Unknown error")
print(f"{format.upper():6s}: FAIL - {error}")
print(f"\nFormat Tests: {successCount} passed, {failCount} failed out of {len(formatResults)} formats")
print(f"Valid Content Files: {completeCount} total files with valid content")
self.testResults = {
"success": failCount == 0,
"formatTests": {
"successCount": successCount,
"failCount": failCount,
"completeCount": completeCount,
"totalFormats": len(formatResults),
"results": formatResults
},
"totalSuccess": successCount,
"totalFail": failCount
}
return self.testResults
except Exception as e:
import traceback
print(f"\n❌ Test failed with error: {type(e).__name__}: {str(e)}")
print(f"Traceback:\n{traceback.format_exc()}")
self.testResults = {
"success": False,
"error": str(e),
"traceback": traceback.format_exc()
}
return self.testResults
async def main():
    """Entry point: run code generation formats test 11 and dump results."""
    results = await CodeGenerationFormatsTester11().runTest()
    # Emit the final results as JSON so callers can parse them easily.
    banner = "=" * 80
    print("\n" + banner)
    print("FINAL RESULTS (JSON)")
    print(banner)
    print(json.dumps(results, indent=2, default=str))

if __name__ == "__main__":
    asyncio.run(main())