diff --git a/modules/features/syncDelta/mainSyncDelta.py b/modules/features/syncDelta/mainSyncDelta.py index 0cfaa173..e3d363a3 100644 --- a/modules/features/syncDelta/mainSyncDelta.py +++ b/modules/features/syncDelta/mainSyncDelta.py @@ -110,7 +110,7 @@ class ManagerSyncDelta: def _log_audit_event(self, action: str, status: str, details: str): """Log audit events for sync operations to memory.""" try: - timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S UTC") + timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S UTC") user_id = str(self.eventUser.id) if self.eventUser else "system" log_entry = f"{timestamp} | {user_id} | {action} | {status} | {details}" self.sync_audit_log.append(log_entry) @@ -146,7 +146,7 @@ class ManagerSyncDelta: return False # Generate log filename with current timestamp - timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y%m%d_%H%M%S") + timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y%m%d_%H%M%S") log_filename = f"log_{timestamp}.log" # Create log content @@ -388,7 +388,7 @@ class ManagerSyncDelta: async def backupSharepointFile(self, *, filename: str) -> bool: try: - timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y%m%d_%H%M%S") + timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y%m%d_%H%M%S") backup_filename = f"backup_{timestamp}_{filename}" await self.services.sharepoint.copy_file_async( site_id=self.targetSite['id'], @@ -452,7 +452,7 @@ class ManagerSyncDelta: return merged_data, details def createCsvContent(self, data: list[dict], existing_headers: dict | None = None) -> bytes: - timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S UTC") + timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S UTC") if existing_headers is None: existing_headers = {"header1": "Header 1", "header2": "Header 2"} if not data: @@ -476,7 +476,7 @@ class ManagerSyncDelta: return out.getvalue().encode('utf-8') def createExcelContent(self, data: list[dict], existing_headers: dict | None = None) -> bytes: - timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S UTC") + timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S UTC") if existing_headers is None: existing_headers = {"header1": "Header 1", "header2": "Header 2"} if not data: diff --git a/modules/interfaces/interfaceDbChatObjects.py b/modules/interfaces/interfaceDbChatObjects.py index d29c0f14..2c952058 100644 --- a/modules/interfaces/interfaceDbChatObjects.py +++ b/modules/interfaces/interfaceDbChatObjects.py @@ -548,9 +548,8 @@ class ChatObjects: ) # Debug: Store message and documents for debugging - only if debug enabled - debug_enabled = APP_CONFIG.get("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) - if debug_enabled: - self._storeDebugMessageAndDocuments(chat_message) + from modules.shared.debugLogger import storeDebugMessageAndDocuments + storeDebugMessageAndDocuments(chat_message, self.currentUser) return chat_message @@ -975,148 +974,6 @@ class ChatObjects: return {"items": items} - def _storeDebugMessageAndDocuments(self, message: ChatMessage) -> None: - """ - Store message and documents (metadata and file bytes) for debugging purposes. - Structure: {log_dir}/debug/messages/m_round_task_action_timestamp/documentlist_label/ - - message.json, message_text.txt - - document_###_metadata.json - - document_###_ (actual file bytes) - - Args: - message: ChatMessage object to store - """ - try: - import os - import json - from datetime import datetime, UTC - - # Create base debug directory - # Use configured log directory instead of hardcoded test-chat - from modules.shared.configuration import APP_CONFIG - logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./") - if not os.path.isabs(logDir): - gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - logDir = os.path.join(gatewayDir, logDir) - debug_root = os.path.join(logDir, 'debug', 'messages') - os.makedirs(debug_root, exist_ok=True) - - # Generate timestamp - timestamp = datetime.now(UTC).strftime('%Y%m%d-%H%M%S-%f')[:-3] - - # Create message folder name: m_round_task_action_timestamp - # Use actual values from message, not defaults - round_str = str(message.roundNumber) if message.roundNumber is not None else "0" - task_str = str(message.taskNumber) if message.taskNumber is not None else "0" - action_str = str(message.actionNumber) if message.actionNumber is not None else "0" - message_folder = f"{timestamp}_m_{round_str}_{task_str}_{action_str}" - - message_path = os.path.join(debug_root, message_folder) - os.makedirs(message_path, exist_ok=True) - - # Store message data - use dict() instead of model_dump() for compatibility - message_file = os.path.join(message_path, "message.json") - with open(message_file, "w", encoding="utf-8") as f: - # Convert message to dict manually to avoid model_dump() issues - message_dict = { - "id": message.id, - "workflowId": message.workflowId, - "parentMessageId": message.parentMessageId, - "message": message.message, - "role": message.role, - "status": message.status, - "sequenceNr": message.sequenceNr, - "publishedAt": message.publishedAt, - "roundNumber": message.roundNumber, - "taskNumber": message.taskNumber, - "actionNumber": message.actionNumber, - "documentsLabel": message.documentsLabel, - "actionId": message.actionId, - "actionMethod": message.actionMethod, - "actionName": message.actionName, - "success": message.success, - "documents": [] - } - json.dump(message_dict, f, indent=2, ensure_ascii=False, default=str) - - # Store message content as text - if message.message: - message_text_file = os.path.join(message_path, "message_text.txt") - with open(message_text_file, "w", encoding="utf-8") as f: - f.write(str(message.message)) - - # Store documents if provided - if message.documents and len(message.documents) > 0: - logger.info(f"Debug: Processing {len(message.documents)} documents") - - # Group documents by documentsLabel - documents_by_label = {} - for doc in message.documents: - label = message.documentsLabel or 'default' - if label not in documents_by_label: - documents_by_label[label] = [] - documents_by_label[label].append(doc) - - # Create subfolder for each document label - for label, docs in documents_by_label.items(): - # Sanitize label for filesystem - safe_label = "".join(c for c in str(label) if c.isalnum() or c in (' ', '-', '_')).rstrip() - safe_label = safe_label.replace(' ', '_') - if not safe_label: - safe_label = "default" - - label_folder = os.path.join(message_path, safe_label) - os.makedirs(label_folder, exist_ok=True) - logger.info(f"Debug: Created document folder: {label_folder}") - - # Store each document - for i, doc in enumerate(docs): - # Create document metadata file - doc_meta = { - "id": doc.id, - "messageId": doc.messageId, - "fileId": doc.fileId, - "fileName": doc.fileName, - "fileSize": doc.fileSize, - "mimeType": doc.mimeType, - "roundNumber": doc.roundNumber, - "taskNumber": doc.taskNumber, - "actionNumber": doc.actionNumber, - "actionId": doc.actionId - } - - doc_meta_file = os.path.join(label_folder, f"document_{i+1:03d}_metadata.json") - with open(doc_meta_file, "w", encoding="utf-8") as f: - json.dump(doc_meta, f, indent=2, ensure_ascii=False, default=str) - - logger.info(f"Debug: Stored document metadata for {doc.fileName}") - - # Also store the actual file bytes next to metadata for debugging - try: - # Lazy import to avoid circular deps at module load - from modules.interfaces import interfaceDbComponentObjects as comp - componentInterface = comp.getInterface(self.currentUser) - file_bytes = componentInterface.getFileData(doc.fileId) - if file_bytes: - # Build a safe filename preserving original name - safe_name = doc.fileName or f"document_{i+1:03d}" - # Avoid path traversal - safe_name = os.path.basename(safe_name) - doc_file_path = os.path.join(label_folder, f"document_{i+1:03d}_" + safe_name) - with open(doc_file_path, "wb") as df: - df.write(file_bytes) - logger.info(f"Debug: Stored document file bytes: {doc_file_path} ({len(file_bytes)} bytes)") - else: - logger.warning(f"Debug: No file bytes returned for fileId {doc.fileId}") - except Exception as e: - logger.error(f"Debug: Failed to store document file for {doc.fileName} (fileId {doc.fileId}): {e}") - - logger.info(f"Debug: Stored message and documents in {message_path}") - - except Exception as e: - logger.error(f"Debug: Failed to store message and documents: {e}") - import traceback - logger.error(f"Debug: Traceback: {traceback.format_exc()}") def getInterface(currentUser: Optional[User] = None) -> 'ChatObjects': diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py index c036db32..511fb311 100644 --- a/modules/services/serviceAi/mainServiceAi.py +++ b/modules/services/serviceAi/mainServiceAi.py @@ -15,12 +15,10 @@ from modules.datamodels.datamodelWeb import ( WebSearchResultItem, ) from modules.interfaces.interfaceAiObjects import AiObjects -from modules.shared.configuration import APP_CONFIG from modules.services.serviceAi.subCoreAi import SubCoreAi from modules.services.serviceAi.subDocumentProcessing import SubDocumentProcessing from modules.services.serviceAi.subWebResearch import SubWebResearch from modules.services.serviceAi.subDocumentGeneration import SubDocumentGeneration -from modules.services.serviceAi.subUtilities import SubUtilities logger = logging.getLogger(__name__) @@ -33,7 +31,6 @@ class AiService: - SubDocumentProcessing: Document chunking, processing, and merging logic - SubWebResearch: Web research and crawling functionality - SubDocumentGeneration: Single-file and multi-file document generation - - SubUtilities: Helper functions, text processing, and debugging utilities The main service acts as a coordinator: 1. Manages lazy initialization of sub-modules @@ -55,7 +52,6 @@ class AiService: self._documentProcessor = None # Lazy initialization self._webResearch = None # Lazy initialization self._documentGenerator = None # Lazy initialization - self._utilities = None # Lazy initialization @property def extractionService(self): @@ -99,14 +95,6 @@ class AiService: self._documentGenerator = SubDocumentGeneration(self.services, self.aiObjects, self.documentProcessor) return self._documentGenerator - @property - def utilities(self): - """Lazy initialization of utilities service.""" - if self._utilities is None: - logger.info("Lazy initializing SubUtilities...") - self._utilities = SubUtilities(self.services) - return self._utilities - async def _ensureAiObjectsInitialized(self): """Ensure aiObjects is initialized.""" if self.aiObjects is None: diff --git a/modules/services/serviceAi/subCoreAi.py b/modules/services/serviceAi/subCoreAi.py index 082527d2..d581214b 100644 --- a/modules/services/serviceAi/subCoreAi.py +++ b/modules/services/serviceAi/subCoreAi.py @@ -1,8 +1,8 @@ +import json import logging from typing import Dict, Any, List, Optional, Tuple, Union from modules.datamodels.datamodelChat import PromptPlaceholder, ChatDocument from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, ModelCapabilities, OperationType, Priority -from modules.shared.debugLogger import writeDebugFile logger = logging.getLogger(__name__) @@ -95,13 +95,13 @@ class SubCoreAi: ) # Write the ACTUAL prompt sent to AI (including continuation context) - writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}", None) + self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}") response = await self.aiObjects.call(request) result = response.content # Write raw AI response to debug file - writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}", None) + self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}") # Emit stats for this iteration self.services.workflow.storeWorkflowStat( @@ -117,9 +117,10 @@ class SubCoreAi: # Check if this is a continuation response (only for supported formats) if loopInstructionFormat in LoopInstructionTexts: try: - import json + # Extract JSON substring if wrapped (e.g., ```json ... ```) + extracted = self.services.utils.jsonExtractString(result) # Try to parse as JSON to check for continuation attribute - parsed_result = json.loads(result) + parsed_result = json.loads(extracted) if isinstance(parsed_result, dict) and parsed_result.get("continuation") is not None: # This is a continuation response accumulatedContent.append(result) @@ -134,6 +135,7 @@ class SubCoreAi: # Not JSON, treat as final response accumulatedContent.append(result) logger.warning(f"Iteration {iteration}: Non-JSON response received") + self.services.utils.writeDebugFile(result, f"{debugPrefix}_error_non_json_response_iteration_{iteration}") break else: # This is the final response @@ -152,7 +154,7 @@ class SubCoreAi: final_result = self._mergeJsonContent(accumulatedContent) if accumulatedContent else "" # Write final result to debug file - writeDebugFile(final_result, f"{debugPrefix}_final_result", None) + self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result") logger.info(f"AI call completed: {len(accumulatedContent)} parts from {iteration} iterations") return final_result @@ -169,7 +171,6 @@ class SubCoreAi: continuation_description = "" if accumulatedContent: try: - import json last_response = accumulatedContent[-1] parsed_response = json.loads(last_response) if isinstance(parsed_response, dict) and parsed_response.get("continuation"): @@ -207,13 +208,13 @@ Continue generating content now.""" return accumulatedContent[0] try: - import json # Parse all JSON responses parsed_responses = [] for content in accumulatedContent: try: - parsed = json.loads(content) + extracted = self.services.utils.jsonExtractString(content) + parsed = json.loads(extracted) parsed_responses.append(parsed) except json.JSONDecodeError as e: logger.warning(f"Failed to parse JSON content: {str(e)}") @@ -244,6 +245,7 @@ Continue generating content now.""" logger.error(f"Error merging JSON content: {str(e)}") return accumulatedContent[0] # Return first response on error + def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str: """ Build full prompt by replacing placeholders with their content. @@ -361,10 +363,10 @@ Continue generating content now.""" generation_prompt = await self._buildGenerationPrompt(prompt, extracted_content, outputFormat, title) generated_json = await self._callAiWithLooping(generation_prompt, options, "document_generation", loopInstructionFormat=loopInstructionFormat) - # Parse the generated JSON + # Parse the generated JSON (extract fenced/embedded JSON first) try: - import json - generated_data = json.loads(generated_json) + extracted_json = self.services.utils.jsonExtractString(generated_json) + generated_data = json.loads(extracted_json) except json.JSONDecodeError as e: logger.error(f"Failed to parse generated JSON: {str(e)}") logger.error(f"JSON content length: {len(generated_json)}") @@ -372,7 +374,7 @@ Continue generating content now.""" logger.error(f"JSON content around error position: {generated_json[max(0, e.pos-50):e.pos+50]}") # Write the problematic JSON to debug file - writeDebugFile(generated_json, "failed_json_parsing", None) + self.services.utils.writeDebugFile(generated_json, "failed_json_parsing") return {"success": False, "error": f"Generated content is not valid JSON: {str(e)}"} @@ -403,11 +405,7 @@ Continue generating content now.""" } # Log AI response for debugging - try: - writeDebugFile(str(result), "documentGenerationResponse", documents) - except Exception: - pass - + self.services.utils.writeDebugFile(str(result), "documentGenerationResponse", documents) return result except Exception as e: @@ -617,26 +615,6 @@ Continue generating content now.""" "imageChunkSize": image_chunk_size } - def _getModelsForOperation(self, operation_type: str, options: AiCallOptions) -> List[ModelCapabilities]: - """ - Get models capable of handling the specific operation with capability filtering. - """ - # Use the actual AI objects model selection instead of hardcoded default - if hasattr(self, 'aiObjects') and self.aiObjects: - # Let AiObjects handle the model selection - return [] - else: - # Fallback to default model if AiObjects not available - default_model = ModelCapabilities( - name="default", - maxTokens=4000, - capabilities=["text", "reasoning"] if operation_type == "planning" else ["text"], - costPerToken=0.001, - processingTime=1.0, - isAvailable=True - ) - return [default_model] - def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str: """ Build full prompt by replacing placeholders with their content. @@ -687,36 +665,6 @@ Continue generating content now.""" return self._buildPromptWithPlaceholders(full_prompt, reduced_placeholders) - def _reduceTextPrompt( - self, - prompt: str, - context: str, - model: ModelCapabilities, - options: AiCallOptions - ) -> str: - """ - Reduce text prompt size using typeGroup-aware chunking and merging. - """ - max_size = int(model.maxTokens * (1 - options.safetyMargin)) - - if options.compressPrompt: - # Reduce both prompt and context - target_size = max_size - current_size = len(prompt) + len(context) - reduction_factor = (target_size * 0.7) / current_size - - if reduction_factor < 1.0: - prompt = self._reduceText(prompt, reduction_factor) - context = self._reduceText(context, reduction_factor) - else: - # Only reduce context, preserve prompt integrity - max_context_size = max_size - len(prompt) - if len(context) > max_context_size: - reduction_factor = max_context_size / len(context) - context = self._reduceText(context, reduction_factor) - - return prompt + "\n\n" + context if context else prompt - def _extractTextFromContentParts(self, extracted_content) -> str: """ Extract text content from ExtractionService ContentPart objects. diff --git a/modules/services/serviceAi/subDocumentGeneration.py b/modules/services/serviceAi/subDocumentGeneration.py index d11a1122..9af20b1e 100644 --- a/modules/services/serviceAi/subDocumentGeneration.py +++ b/modules/services/serviceAi/subDocumentGeneration.py @@ -1,4 +1,8 @@ +import re +import json import logging +import time +from datetime import datetime, UTC from typing import Dict, Any, List, Optional, Tuple, Union from modules.datamodels.datamodelChat import ChatDocument from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationType @@ -82,8 +86,7 @@ class SubDocumentGeneration: Unified document processing that handles both single and multi-file cases. Always processes as multi-file structure internally. """ - import time - + # Create progress logger workflow = self.services.currentWorkflow progressLogger = self.services.workflow.createProgressLogger(workflow) @@ -102,8 +105,7 @@ class SubDocumentGeneration: progressLogger.updateProgress(operationId, 0.1, "Generating prompt") # Write prompt to debug file - from modules.shared.debugLogger import writeDebugFile - writeDebugFile(extractionPrompt, "extraction_prompt", documents) + self.services.utils.writeDebugFile(extractionPrompt, "extraction_prompt", documents) # Process with unified JSON pipeline using continuation logic aiResponse = await self.documentProcessor.processDocumentsWithContinuation( @@ -116,10 +118,8 @@ class SubDocumentGeneration: # Write AI response to debug file - from modules.shared.debugLogger import writeDebugFile - import json response_json = json.dumps(aiResponse, indent=2, ensure_ascii=False) if isinstance(aiResponse, dict) else str(aiResponse) - writeDebugFile(response_json, "ai_response", documents) + self.services.utils.writeDebugFile(response_json, "ai_response", documents) # Validate response structure if not self._validateUnifiedResponseStructure(aiResponse): @@ -341,7 +341,6 @@ class SubDocumentGeneration: requestOptions.operationType = OperationType.GENERAL # Create context with the extracted JSON content - import json context = f"Extracted JSON content:\n{json.dumps(docData, indent=2)}" request = AiCallRequest( @@ -356,7 +355,6 @@ class SubDocumentGeneration: if response and response.content: # Parse the AI response as JSON try: - import re result = response.content.strip() # Extract JSON from markdown if present @@ -488,9 +486,6 @@ Return only the JSON response. response = await ai_service.aiObjects.call(request) if response and response.content: - import json - import re - # Extract JSON from response result = response.content.strip() json_match = re.search(r'\{.*\}', result, re.DOTALL) @@ -516,10 +511,8 @@ Return only the JSON response. workflow = services.currentWorkflow # Serialize payload - import json as _json - from datetime import datetime, UTC ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") - content_text = _json.dumps(payload, ensure_ascii=False, indent=2) + content_text = json.dumps(payload, ensure_ascii=False, indent=2) content_bytes = content_text.encode('utf-8') # Store as file via component storage @@ -548,7 +541,7 @@ Return only the JSON response. "message": "Raw extraction data saved", "status": "data", "sequenceNr": len(getattr(workflow, 'messages', []) or []) + 1, - "publishedAt": services.utils.getUtcTimestamp(), + "publishedAt": services.utils.timestampGetUtc(), "documentsLabel": label, "documents": [] } diff --git a/modules/services/serviceAi/subDocumentProcessing.py b/modules/services/serviceAi/subDocumentProcessing.py index 8fe3714a..81d355e4 100644 --- a/modules/services/serviceAi/subDocumentProcessing.py +++ b/modules/services/serviceAi/subDocumentProcessing.py @@ -1,4 +1,7 @@ +import json import logging +import re +import time from typing import Dict, Any, List, Optional, Tuple, Union from modules.datamodels.datamodelChat import ChatDocument from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, ModelCapabilities, OperationType, Priority @@ -105,11 +108,7 @@ class SubDocumentProcessing: mergedContent = self._mergeChunkResults(chunkResults, options) # Save merged extraction content to debug - try: - from modules.shared.debugLogger import writeDebugFile - writeDebugFile(mergedContent or '', "extractionMergedText") - except Exception: - pass + self.services.utils.writeDebugFile(mergedContent or '', "extractionMergedText") return mergedContent @@ -198,13 +197,8 @@ class SubDocumentProcessing: # Continue with original merged JSON instead of re-raising # Save merged JSON extraction content to debug - try: - from modules.shared.debugLogger import writeDebugFile - import json as _json - jsonStr = _json.dumps(mergedJsonDocument, ensure_ascii=False, indent=2) - writeDebugFile(jsonStr, "extractionMergedJson") - except Exception: - pass + jsonStr = json.dumps(mergedJsonDocument, ensure_ascii=False, indent=2) + self.services.utils.writeDebugFile(jsonStr, "extractionMergedJson") return mergedJsonDocument @@ -523,7 +517,6 @@ CONTINUATION INSTRUCTIONS: """Process chunks with proper mapping to preserve relationships.""" from modules.datamodels.datamodelExtraction import ChunkResult import asyncio - import time # Collect all chunks that need processing with proper indexing chunks_to_process = [] @@ -598,25 +591,8 @@ CONTINUATION INSTRUCTIONS: ) self.services.utils.debugLogToFile(f"Image analysis result for chunk {chunk_index}: length={len(ai_result) if ai_result else 0}, preview={ai_result[:200] if ai_result else 'None'}...", "AI_SERVICE") - # Save image extraction response to debug file - only if debug enabled - debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) - if debug_enabled: - try: - import os - from datetime import datetime, UTC - ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") - # Use configured log directory instead of hardcoded test-chat - from modules.shared.configuration import APP_CONFIG - logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./") - if not os.path.isabs(logDir): - gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - logDir = os.path.join(gatewayDir, logDir) - debug_root = os.path.join(logDir, 'debug') - os.makedirs(debug_root, exist_ok=True) - with open(os.path.join(debug_root, f"{ts}_extraction_image_chunk_{chunk_index}.txt"), "w", encoding="utf-8") as f: - f.write(f"EXTRACTION IMAGE RESPONSE:\n{ai_result if ai_result else 'No response'}\n") - except Exception: - pass + # Save image extraction response to debug file + self.services.utils.writeDebugFile(ai_result or 'No response', f"extraction_image_chunk_{chunk_index}") # Check if result is empty or None if not ai_result or not ai_result.strip(): @@ -630,8 +606,6 @@ CONTINUATION INSTRUCTIONS: # If generating JSON, clean image analysis result if generate_json: try: - import json - import re # Clean the response - remove markdown code blocks if present cleaned_result = ai_result.strip() @@ -720,18 +694,12 @@ CONTINUATION INSTRUCTIONS: self.services.utils.debugLogToFile(f"EXTRACTION RESPONSE LENGTH: {len(ai_result) if ai_result else 0} characters", "AI_SERVICE") # Save extraction prompt and response to debug - try: - from modules.shared.debugLogger import writeDebugFile - writeDebugFile(augmented_prompt, f"extraction-Chunk{chunk_index}-Prompt") - writeDebugFile(ai_result or '', f"extraction-Chunk{chunk_index}-Response") - except Exception: - pass + self.services.utils.writeDebugFile(augmented_prompt, f"extraction-Chunk{chunk_index}-Prompt") + self.services.utils.writeDebugFile(ai_result or '', f"extraction-Chunk{chunk_index}-Response") # If generating JSON, validate the response if generate_json: try: - import json - import re # Clean the response - remove markdown code blocks if present cleaned_result = ai_result.strip() @@ -821,18 +789,12 @@ CONTINUATION INSTRUCTIONS: self.services.utils.debugLogToFile(f"EXTRACTION RESPONSE LENGTH: {len(ai_result) if ai_result else 0} characters", "AI_SERVICE") # Save extraction prompt and response to debug - try: - from modules.shared.debugLogger import writeDebugFile - writeDebugFile(augmented_prompt_text, f"extractionChunk{chunk_index}-Prompt") - writeDebugFile(ai_result or '', f"extractionChunk{chunk_index}-Response") - except Exception: - pass + self.services.utils.writeDebugFile(augmented_prompt_text, f"extractionChunk{chunk_index}-Prompt") + self.services.utils.writeDebugFile(ai_result or '', f"extractionChunk{chunk_index}-Response") # If generating JSON, validate the response if generate_json: try: - import json - import re # Clean the response - remove markdown code blocks and extra formatting cleaned_result = ai_result.strip() @@ -1103,7 +1065,6 @@ CONTINUATION INSTRUCTIONS: options: Optional[AiCallOptions] = None ) -> Dict[str, Any]: """Merge chunk results in JSON mode - returns structured JSON document.""" - import json if not chunkResults: return {"metadata": {"title": "Empty Document"}, "sections": []} diff --git a/modules/services/serviceAi/subUtilities.py b/modules/services/serviceAi/subUtilities.py deleted file mode 100644 index 64508d71..00000000 --- a/modules/services/serviceAi/subUtilities.py +++ /dev/null @@ -1,319 +0,0 @@ -import logging -from typing import Dict, Any, List, Optional, Tuple, Union -from modules.datamodels.datamodelAi import ModelCapabilities, AiCallOptions - -logger = logging.getLogger(__name__) - - -class SubUtilities: - """Utility functions for text processing, debugging, and helper operations.""" - - def __init__(self, services): - """Initialize utilities service. - - Args: - services: Service center instance for accessing other services - """ - self.services = services - - def _writeTraceLog(self, contextText: str, data: Any) -> None: - """Write raw data to the central trace log file without truncation.""" - try: - import os - import json - from datetime import datetime, UTC - # Only write if logger is in debug mode - if logger.level > logging.DEBUG: - return - # Get log directory from configuration via service center if possible - logDir = None - try: - logDir = self.services.utils.configGet("APP_LOGGING_LOG_DIR", "./") - except Exception: - pass - if not logDir: - logDir = "./" - if not os.path.isabs(logDir): - # Make it relative to gateway directory - gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - logDir = os.path.join(gatewayDir, logDir) - os.makedirs(logDir, exist_ok=True) - traceFile = os.path.join(logDir, "log_trace.log") - timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] - traceEntry = f"[{timestamp}] {contextText}\n" + ("=" * 80) + "\n" - if data is None: - traceEntry += "No data provided\n" - else: - # Prefer exact text; if dict/list, pretty print JSON - try: - if isinstance(data, (dict, list)): - traceEntry += f"JSON Data:\n{json.dumps(data, indent=2, ensure_ascii=False)}\n" - else: - text = str(data) - traceEntry += f"Text Data:\n{text}\n" - except Exception: - traceEntry += f"Data (fallback): {str(data)}\n" - traceEntry += ("=" * 80) + "\n\n" - with open(traceFile, "a", encoding="utf-8") as f: - f.write(traceEntry) - except Exception: - # Swallow to avoid recursive logging issues - pass - - def _writeAiResponseDebug(self, label: str, content: str, partIndex: int = 1, modelName: str = None, continuation: bool = None) -> None: - """Persist raw AI response parts for debugging under configured log directory - only if debug enabled.""" - try: - # Check if debug logging is enabled - debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) - if not debug_enabled: - return - - import os - from datetime import datetime, UTC - # Use configured log directory instead of hardcoded test-chat - from modules.shared.configuration import APP_CONFIG - logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./") - if not os.path.isabs(logDir): - gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - logDir = os.path.join(gatewayDir, logDir) - outDir = os.path.join(logDir, 'debug') - os.makedirs(outDir, exist_ok=True) - ts = datetime.now(UTC).strftime('%Y%m%d-%H%M%S-%f')[:-3] - suffix = [] - if partIndex is not None: - suffix.append(f"part{partIndex}") - if continuation is not None: - suffix.append(f"cont_{str(continuation).lower()}") - if modelName: - safeModel = ''.join(c if c.isalnum() or c in ('-', '_') else '-' for c in modelName) - suffix.append(safeModel) - suffixStr = ('_' + '_'.join(suffix)) if suffix else '' - fname = f"{ts}_{label}{suffixStr}.txt" - fpath = os.path.join(outDir, fname) - with open(fpath, 'w', encoding='utf-8') as f: - f.write(content or '') - except Exception: - # Do not raise; best-effort debug write - pass - - def _exceedsTokenLimit(self, text: str, model: ModelCapabilities, safety_margin: float) -> bool: - """ - Check if text exceeds model token limit with safety margin. - """ - # Simple character-based estimation (4 chars per token) - estimated_tokens = len(text) // 4 - max_tokens = int(model.maxTokens * (1 - safety_margin)) - return estimated_tokens > max_tokens - - def _reduceText(self, text: str, reduction_factor: float) -> str: - """ - Reduce text size by the specified factor. - """ - if reduction_factor >= 1.0: - return text - - target_length = int(len(text) * reduction_factor) - return text[:target_length] + "... [reduced]" - - def _extractTextFromContentParts(self, extracted_content) -> str: - """ - Extract text content from ExtractionService ContentPart objects. - """ - if not extracted_content or not hasattr(extracted_content, 'parts'): - return "" - - text_parts = [] - for part in extracted_content.parts: - if hasattr(part, 'typeGroup') and part.typeGroup in ['text', 'table', 'structure']: - if hasattr(part, 'data') and part.data: - text_parts.append(part.data) - - return "\n\n".join(text_parts) - - def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str: - """ - Build full prompt by replacing placeholders with their content. - Uses the new {{KEY:placeholder}} format. - """ - if not placeholders: - return prompt - - full_prompt = prompt - for placeholder, content in placeholders.items(): - # Replace both old format {{placeholder}} and new format {{KEY:placeholder}} - full_prompt = full_prompt.replace(f"{{{{{placeholder}}}}}", content) - full_prompt = full_prompt.replace(f"{{{{KEY:{placeholder}}}}}", content) - - return full_prompt - - def _reducePlanningPrompt( - self, - full_prompt: str, - placeholders: Optional[Dict[str, str]], - model: ModelCapabilities, - options: AiCallOptions - ) -> str: - """ - Reduce planning prompt size by summarizing placeholders while preserving prompt structure. - """ - if not placeholders: - return self._reduceText(full_prompt, 0.7) - - # Reduce placeholders while preserving prompt - reduced_placeholders = {} - for placeholder, content in placeholders.items(): - if len(content) > 1000: # Only reduce long content - reduction_factor = 0.7 - reduced_content = self._reduceText(content, reduction_factor) - reduced_placeholders[placeholder] = reduced_content - else: - reduced_placeholders[placeholder] = content - - return self._buildPromptWithPlaceholders(full_prompt, reduced_placeholders) - - def _reduceTextPrompt( - self, - prompt: str, - context: str, - model: ModelCapabilities, - options: AiCallOptions - ) -> str: - """ - Reduce text prompt size using typeGroup-aware chunking and merging. - """ - max_size = int(model.maxTokens * (1 - options.safetyMargin)) - - if options.compressPrompt: - # Reduce both prompt and context - target_size = max_size - current_size = len(prompt) + len(context) - reduction_factor = (target_size * 0.7) / current_size - - if reduction_factor < 1.0: - prompt = self._reduceText(prompt, reduction_factor) - context = self._reduceText(context, reduction_factor) - else: - # Only reduce context, preserve prompt integrity - max_context_size = max_size - len(prompt) - if len(context) > max_context_size: - reduction_factor = max_context_size / len(context) - context = self._reduceText(context, reduction_factor) - - return prompt + "\n\n" + context if context else prompt - - async def _compressContent(self, content: str, targetSize: int, contentType: str) -> str: - """Compress content to target size.""" - if len(content.encode("utf-8")) <= targetSize: - return content - - try: - compressionPrompt = f""" - Komprimiere den folgenden {contentType} auf maximal {targetSize} Zeichen, - behalte aber alle wichtigen Informationen bei: - - {content} - - Gib nur den komprimierten Inhalt zurück, ohne zusätzliche Erklärungen. - """ - - # Service must not call connectors directly; use simple truncation fallback here - data = content.encode("utf-8") - return data[:targetSize].decode("utf-8", errors="ignore") + "... [truncated]" - except Exception as e: - logger.warning(f"AI compression failed, using truncation: {str(e)}") - return content[:targetSize] + "... [truncated]" - - def _getModelCapabilitiesForContent(self, prompt: str, documents: Optional[List], options: AiCallOptions) -> Dict[str, int]: - """ - Get model capabilities for content processing, including appropriate size limits for chunking. - """ - # Estimate total content size - prompt_size = len(prompt.encode('utf-8')) - document_size = 0 - if documents: - # Rough estimate of document content size - for doc in documents: - document_size += getattr(doc, 'fileSize', 0) or 0 - - total_size = prompt_size + document_size - - # Use AiObjects to select the best model for this content size - # We'll simulate the model selection by checking available models - from modules.interfaces.interfaceAiObjects import aiModels - - # Find the best model for this content size and operation - best_model = None - best_context_length = 0 - - for model_name, model_info in aiModels.items(): - context_length = model_info.get("contextLength", 0) - - # Skip models with no context length or too small for content - if context_length == 0: - continue - - # Check if model supports the operation type - capabilities = model_info.get("capabilities", []) - from modules.datamodels.datamodelAi import OperationType - if options.operationType == OperationType.IMAGE_ANALYSIS and "image_analysis" not in capabilities: - continue - elif options.operationType == OperationType.IMAGE_GENERATION and "image_generation" not in capabilities: - continue - elif options.operationType == OperationType.WEB_RESEARCH and "web_search" not in capabilities: - continue - elif "text_generation" not in capabilities: - continue - - # Prefer models that can handle the content without chunking, but allow chunking if needed - if context_length >= total_size * 0.8: # 80% of content size - if context_length > best_context_length: - best_model = model_info - best_context_length = context_length - elif best_model is None: # Fallback to largest available model - if context_length > best_context_length: - best_model = model_info - best_context_length = context_length - - # Fallback to a reasonable default if no model found - if best_model is None: - best_model = { - "contextLength": 128000, # GPT-4o default - "llmName": "gpt-4o" - } - - # Calculate appropriate sizes - # Convert tokens to bytes (rough estimate: 1 token ≈ 4 characters) - context_length_bytes = int(best_model["contextLength"] * 4) - max_context_bytes = int(context_length_bytes * 0.9) # 90% of context length - text_chunk_size = int(max_context_bytes * 0.7) # 70% of max context for text chunks - image_chunk_size = int(max_context_bytes * 0.8) # 80% of max context for image chunks - - logger.debug(f"Selected model: {best_model.get('llmName', 'unknown')} with context length: {best_model['contextLength']}") - logger.debug(f"Content size: {total_size} bytes, Max context: {max_context_bytes} bytes") - logger.debug(f"Text chunk size: {text_chunk_size} bytes, Image chunk size: {image_chunk_size} bytes") - - return { - "maxContextBytes": max_context_bytes, - "textChunkSize": text_chunk_size, - "imageChunkSize": image_chunk_size - } - - def _getModelsForOperation(self, operation_type: str, options: AiCallOptions) -> List[ModelCapabilities]: - """ - Get models capable of handling the specific operation with capability filtering. - """ - # Use the actual AI objects model selection instead of hardcoded default - if hasattr(self, 'aiObjects') and self.aiObjects: - # Let AiObjects handle the model selection - return [] - else: - # Fallback to default model if AiObjects not available - default_model = ModelCapabilities( - name="default", - maxTokens=4000, - capabilities=["text", "reasoning"] if operation_type == "planning" else ["text"], - costPerToken=0.001, - processingTime=1.0, - isAvailable=True - ) - return [default_model] diff --git a/modules/services/serviceExtraction/subPipeline.py b/modules/services/serviceExtraction/subPipeline.py index bb654b5e..645b9bdf 100644 --- a/modules/services/serviceExtraction/subPipeline.py +++ b/modules/services/serviceExtraction/subPipeline.py @@ -3,7 +3,6 @@ import logging import os from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart -from modules.shared.configuration import APP_CONFIG from .subUtils import makeId from .subRegistry import ExtractorRegistry, ChunkerRegistry from .merging.mergerText import TextMerger diff --git a/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py b/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py index bee1b82f..d5d28cf8 100644 --- a/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py +++ b/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py @@ -4,8 +4,14 @@ Base renderer class for all format renderers. from abc import ABC, abstractmethod from typing import Dict, Any, Tuple, List -import logging import json +import logging +import re +from datetime import datetime, UTC +import base64 +import io +from PIL import Image +from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationType logger = logging.getLogger(__name__) @@ -188,7 +194,6 @@ class BaseRenderer(ABC): # Basic base64 validation try: - import base64 base64.b64decode(base64_data, validate=True) return True except Exception as e: @@ -201,10 +206,6 @@ class BaseRenderer(ABC): This is a helper method that format-specific renderers can use. """ try: - import base64 - from PIL import Image - import io - # Decode base64 data image_data = base64.b64decode(base64_data) image = Image.open(io.BytesIO(image_data)) @@ -221,10 +222,6 @@ class BaseRenderer(ABC): Returns the resized image as base64 string. """ try: - import base64 - from PIL import Image - import io - # Decode base64 data image_data = base64.b64decode(base64_data) image = Image.open(io.BytesIO(image_data)) @@ -305,7 +302,6 @@ class BaseRenderer(ABC): """Format timestamp for display.""" if timestamp: return timestamp - from datetime import datetime, UTC return datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC") # ===== GENERIC AI STYLING HELPERS ===== @@ -328,7 +324,6 @@ class BaseRenderer(ABC): return default_styles try: - from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationType request_options = AiCallOptions() request_options.operationType = OperationType.GENERAL @@ -342,15 +337,8 @@ class BaseRenderer(ABC): response = await ai_service.aiObjects.call(request) # Save styling prompt and response to debug - try: - from modules.shared.debugLogger import writeDebugFile - writeDebugFile(style_template, "rendererStylingPrompt") - writeDebugFile(response.content or '', "rendererStylingResponse") - except Exception: - pass - - import json - import re + self.services.utils.writeDebugFile(style_template, "rendererStylingPrompt") + self.services.utils.writeDebugFile(response.content or '', "rendererStylingResponse") # Clean and parse JSON result = response.content.strip() if response and response.content else "" diff --git a/modules/services/serviceGeneration/renderers/rendererImage.py b/modules/services/serviceGeneration/renderers/rendererImage.py index dab0496c..9da52466 100644 --- a/modules/services/serviceGeneration/renderers/rendererImage.py +++ b/modules/services/serviceGeneration/renderers/rendererImage.py @@ -60,11 +60,7 @@ class RendererImage(BaseRenderer): image_prompt = await self._create_image_generation_prompt(extracted_content, document_title, user_prompt, ai_service) # Save image generation prompt to debug - try: - from modules.shared.debugLogger import writeDebugFile - writeDebugFile(image_prompt, "rendererImageGenerationPrompt") - except Exception: - pass + ai_service.services.utils.writeDebugFile(image_prompt, "rendererImageGenerationPrompt") # Generate image using AI image_result = await ai_service.aiObjects.generateImage( @@ -75,11 +71,7 @@ class RendererImage(BaseRenderer): ) # Save image generation response to debug - try: - from modules.shared.debugLogger import writeDebugFile - writeDebugFile(str(image_result), "rendererImageGenerationResponse") - except Exception: - pass + ai_service.services.utils.writeDebugFile(str(image_result), "rendererImageGenerationResponse") # Extract base64 image data from result if image_result and image_result.get("success", False): diff --git a/modules/services/serviceGeneration/subPromptBuilder.py b/modules/services/serviceGeneration/subPromptBuilder.py index 716ebac0..c59909e2 100644 --- a/modules/services/serviceGeneration/subPromptBuilder.py +++ b/modules/services/serviceGeneration/subPromptBuilder.py @@ -391,25 +391,8 @@ DO NOT return a schema description - return actual extracted content in the JSON # Combine all parts finalPrompt = f"{genericIntro}\n\n{formatGuidelines}".strip() - # Save extraction prompt to debug file - only if debug enabled - try: - debug_enabled = services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) - if debug_enabled: - import os - from datetime import datetime, UTC - ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") - # Use configured log directory instead of hardcoded test-chat - from modules.shared.configuration import APP_CONFIG - logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./") - if not os.path.isabs(logDir): - gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - logDir = os.path.join(gatewayDir, logDir) - debug_root = os.path.join(logDir, 'debug') - os.makedirs(debug_root, exist_ok=True) - with open(os.path.join(debug_root, f"{ts}_extraction_prompt.txt"), "w", encoding="utf-8") as f: - f.write(finalPrompt) - except Exception: - pass + # Save extraction prompt to debug file + services.utils.writeDebugFile(finalPrompt, "extraction_prompt") return finalPrompt diff --git a/modules/services/serviceNormalization/mainServiceNormalization.py b/modules/services/serviceNormalization/mainServiceNormalization.py index 2a932723..6748be72 100644 --- a/modules/services/serviceNormalization/mainServiceNormalization.py +++ b/modules/services/serviceNormalization/mainServiceNormalization.py @@ -116,8 +116,8 @@ class NormalizationService: if "canonicalHeaders" not in mapping: mapping["canonicalHeaders"] = canonicalSpec.get("canonicalHeaders", []) - # debug artifact - self._writeDebugArtifact("mapping.json", mapping) + # debug artifact (now routed via writeDebugFile) + self.services.utils.writeDebugArtifact("mapping.json", mapping) return mapping def applyMapping(self, mergedJson: Dict[str, Any], mappingSpec: Dict[str, Any]) -> Dict[str, Any]: @@ -198,8 +198,8 @@ class NormalizationService: ] } - # debug artifact - self._writeDebugArtifact("canonical_merged.json", canonical) + # debug artifact (now routed via writeDebugFile) + self.services.utils.writeDebugArtifact("canonical_merged.json", canonical) return canonical def validateCanonical(self, canonicalJson: Dict[str, Any]) -> Dict[str, Any]: @@ -218,7 +218,7 @@ class NormalizationService: "rowCount": len(rows), "success": len(rows) > 0 } - self._writeDebugArtifact("normalization_report.json", report) + self.services.utils.writeDebugArtifact("normalization_report.json", report) return report # Internal helpers @@ -239,32 +239,3 @@ class NormalizationService: text = text.upper() return text - def _writeDebugArtifact(self, fileName: str, obj: Any) -> None: - try: - debugEnabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) - if not debugEnabled: - return - # Use configured log directory instead of hardcoded test-chat - from modules.shared.configuration import APP_CONFIG - logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./") - if not os.path.isabs(logDir): - gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - logDir = os.path.join(gatewayDir, logDir) - root = os.path.join(logDir, 'debug') - os.makedirs(root, exist_ok=True) - # Prefix timestamp for files that are frequently overwritten - ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") - if fileName in ("mapping.json", "canonical_merged.json"): - outName = f"{ts}_{fileName}" - else: - outName = fileName - path = os.path.join(root, outName) - with open(path, "w", encoding="utf-8") as f: - if isinstance(obj, (dict, list)): - f.write(json.dumps(obj, ensure_ascii=False, indent=2)) - else: - f.write(str(obj)) - except Exception: - pass - - diff --git a/modules/services/serviceUtils/mainServiceUtils.py b/modules/services/serviceUtils/mainServiceUtils.py index caa07528..6d166a05 100644 --- a/modules/services/serviceUtils/mainServiceUtils.py +++ b/modules/services/serviceUtils/mainServiceUtils.py @@ -5,11 +5,11 @@ Provides centralized access to configuration, events, and other utilities. import logging import os -from typing import Any, Optional, Dict, Callable +from typing import Any, Optional, Dict, Callable, List from modules.shared.configuration import APP_CONFIG from modules.shared.eventManagement import eventManager from modules.shared.timezoneUtils import get_utc_timestamp -from modules.security.tokenManager import TokenManager +from modules.shared import jsonUtils logger = logging.getLogger(__name__) @@ -19,6 +19,8 @@ class UtilsService: def __init__(self, services): self.services = services + # ===== Event handling ===== + def eventRegisterCron(self, job_id: str, func: Callable, cron_kwargs: Dict[str, Any], replace_existing: bool = True, coalesce: bool = True, max_instances: int = 1, misfire_grace_time: int = 1800): @@ -113,7 +115,7 @@ class UtilsService: logger.error(f"Error getting config '{key}': {str(e)}") return default - def getUtcTimestamp(self) -> float: + def timestampGetUtc(self) -> float: """ Get current UTC timestamp. @@ -126,62 +128,83 @@ class UtilsService: logger.error(f"Error getting UTC timestamp: {str(e)}") return 0.0 - def getFreshConnectionToken(self, connectionId: str): + # ===== Debug Tools ===== + + def writeDebugFile(self, content: str, fileType: str, documents: Optional[List] = None) -> None: """ - Get a fresh token for a specific connection. - - Args: - connectionId: ID of the connection to get token for - - Returns: - Token object or None if not found/expired + Wrapper to write debug files via shared debugLogger. + Mirrors writeDebugFile() in modules.shared.debugLogger and keeps a single call site. """ try: - return TokenManager().getFreshToken(connectionId) - except Exception as e: - logger.error(f"Error getting fresh token for connection {connectionId}: {str(e)}") - return None + from modules.shared.debugLogger import writeDebugFile as _writeDebugFile + _writeDebugFile(content, fileType, documents) + except Exception: + # Silent fail to never break main flow + pass def debugLogToFile(self, message: str, context: str = "DEBUG"): """ - Log debug message to file if debug logging is enabled. - - Args: - message: Debug message to log - context: Context identifier for the debug message + Wrapper to log debug messages via shared debugLogger. + Mirrors debugLogToFile() in modules.shared.debugLogger. """ try: - # Check if debug logging is enabled - debug_enabled = self.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) - if not debug_enabled: - return - - # Get debug directory - # Use configured log directory instead of hardcoded test-chat - logDir = self.configGet("APP_LOGGING_LOG_DIR", "./") - if not os.path.isabs(logDir): - gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - logDir = os.path.join(gatewayDir, logDir) - debug_dir = os.path.join(logDir, 'debug') - if not os.path.isabs(debug_dir): - # If relative path, make it relative to the gateway directory - gateway_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - debug_dir = os.path.join(gateway_dir, debug_dir) - - # Ensure debug directory exists - os.makedirs(debug_dir, exist_ok=True) - - # Create debug file path - debug_file = os.path.join(debug_dir, "debug_workflow.log") - - # Format the debug entry - timestamp = self.getUtcTimestamp() - debug_entry = f"[{timestamp}] [{context}] {message}\n" - - # Write to debug file - with open(debug_file, "a", encoding="utf-8") as f: - f.write(debug_entry) - - except Exception as e: - # Don't log debug errors to avoid recursion - pass \ No newline at end of file + from modules.shared.debugLogger import debugLogToFile as _debugLogToFile + _debugLogToFile(message, context) + except Exception: + # Silent fail to never break main flow + pass + + def storeDebugMessageAndDocuments(self, message, currentUser): + """ + Wrapper to store debug messages and documents via shared debugLogger. + Mirrors storeDebugMessageAndDocuments() in modules.shared.debugLogger. + """ + try: + from modules.shared.debugLogger import storeDebugMessageAndDocuments as _storeDebugMessageAndDocuments + _storeDebugMessageAndDocuments(message, currentUser) + except Exception: + # Silent fail to never break main flow + pass + + def writeDebugArtifact(self, fileName: str, obj: Any): + """ + Backward-compatible wrapper that now writes via writeDebugFile. + Accepts an object (dict/list/str), serializes if needed, and writes to debug folder. + """ + try: + # Serialize objects to JSON when applicable + if isinstance(obj, (dict, list)): + import json + content = json.dumps(obj, ensure_ascii=False, indent=2) + else: + content = str(obj) + + # Delegate to shared writeDebugFile; preserve provided file extension in fileName + from modules.shared.debugLogger import writeDebugFile as _writeDebugFile + _writeDebugFile(content, fileName) + except Exception: + # Silent fail to never break main flow + pass + + # ===== JSON utility wrappers ===== + + def jsonStripCodeFences(self, text: str) -> str: + return jsonUtils.stripCodeFences(text) + + def jsonExtractFirstBalanced(self, text: str) -> str: + return jsonUtils.extractFirstBalancedJson(text) + + def jsonNormalizeText(self, text: str) -> str: + return jsonUtils.normalizeJsonText(text) + + def jsonExtractString(self, text: str) -> str: + return jsonUtils.extractJsonString(text) + + def jsonTryParse(self, text) -> tuple: + return jsonUtils.tryParseJson(text) + + def jsonParseOrRaise(self, text): + return jsonUtils.parseJsonOrRaise(text) + + def jsonMergeRootLists(self, parts): + return jsonUtils.mergeRootLists(parts) \ No newline at end of file diff --git a/modules/services/serviceWorkflow/mainServiceWorkflow.py b/modules/services/serviceWorkflow/mainServiceWorkflow.py index 1e7b9ae1..d9aaaa9f 100644 --- a/modules/services/serviceWorkflow/mainServiceWorkflow.py +++ b/modules/services/serviceWorkflow/mainServiceWorkflow.py @@ -293,6 +293,21 @@ Please provide a comprehensive summary of this conversation.""" logger.error(f"Error parsing connection reference: {str(e)}") return None + def getFreshConnectionToken(self, connectionId: str): + """Get a fresh token for a specific connection (moved from UtilsService). + + Args: + connectionId: ID of the connection to get token for + + Returns: + Token object or None if not found/expired + """ + try: + return TokenManager().getFreshToken(connectionId) + except Exception as e: + logger.error(f"Error getting fresh token for connection {connectionId}: {str(e)}") + return None + def getFileInfo(self, fileId: str) -> Dict[str, Any]: """Get file information""" file_item = self.interfaceDbComponent.getFile(fileId) diff --git a/modules/shared/debugLogger.py b/modules/shared/debugLogger.py index 947bf816..a4fd0032 100644 --- a/modules/shared/debugLogger.py +++ b/modules/shared/debugLogger.py @@ -4,24 +4,36 @@ Writes files chronologically to the configured log directory with sequential num """ import os from datetime import datetime, UTC -from typing import List, Optional +from typing import List, Optional, Any from modules.shared.configuration import APP_CONFIG -def _getDebugDir() -> str: - """Get the debug directory path from configuration.""" - # Get log directory from config (same as used by main logging system) +def _resolveLogDir() -> str: + """Resolve the absolute log directory from configuration.""" logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./") if not os.path.isabs(logDir): # If relative path, make it relative to the gateway directory gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) logDir = os.path.join(gatewayDir, logDir) + return logDir + +def _ensureDir(path: str) -> None: + """Create directory if it does not exist.""" + os.makedirs(path, exist_ok=True) + +def _isDebugEnabled() -> bool: + """Check if debug workflow logging is enabled.""" + return APP_CONFIG.get("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) + +def _getDebugDir() -> str: + """Get the debug directory path from configuration.""" + # Get log directory from config (same as used by main logging system) + logDir = _resolveLogDir() # Create debug subdirectory within the log directory - debugDir = os.path.join(logDir, 'debug') + debugDir = os.path.join(logDir, 'debug/prompts') return debugDir - def _getNextSequenceNumber() -> int: """Get the next sequence number by counting existing files.""" debugDir = _getDebugDir() @@ -38,6 +50,7 @@ def writeDebugFile(content: str, fileType: str, documents: Optional[List] = None Write debug content to a file with sequential numbering. Writes the content as-is since it's already the final integrated prompt. Includes document list labels for tracing enhancement. + Only writes if debug logging is enabled via APP_DEBUG_CHAT_WORKFLOW_ENABLED config. Args: content: The main content to write (already integrated) @@ -45,15 +58,23 @@ def writeDebugFile(content: str, fileType: str, documents: Optional[List] = None documents: Optional list of documents for tracing """ try: + # Check if debug logging is enabled + if not _isDebugEnabled(): + return + debugDir = _getDebugDir() - os.makedirs(debugDir, exist_ok=True) + _ensureDir(debugDir) seqNum = _getNextSequenceNumber() ts = datetime.now(UTC).strftime('%Y%m%d-%H%M%S') # Add 3-digit sequence number for uniqueness tsWithSeq = f"{ts}-{seqNum:03d}" - filename = f"{tsWithSeq}-{fileType}.txt" + # Allow callers to pass an extension; if none, default to .txt + if "." in (fileType or ""): + filename = f"{tsWithSeq}-{fileType}" + else: + filename = f"{tsWithSeq}-{fileType}.txt" filepath = os.path.join(debugDir, filename) # Build content with document tracing @@ -73,7 +94,170 @@ def writeDebugFile(content: str, fileType: str, documents: Optional[List] = None # Write the content with document tracing with open(filepath, 'w', encoding='utf-8') as f: f.write(debug_content) - except Exception as e: - # Silent fail - don't break the main flow + # Don't log debug errors to avoid recursion pass + +def debugLogToFile(message: str, context: str = "DEBUG") -> None: + """ + Log debug message to file if debug logging is enabled. + + Args: + message: Debug message to log + context: Context identifier for the debug message + """ + try: + # Check if debug logging is enabled + if not _isDebugEnabled(): + return + + # Get debug directory + logDir = _resolveLogDir() + debug_dir = os.path.join(logDir, 'debug') + _ensureDir(debug_dir) + + # Create debug file path + debug_file = os.path.join(debug_dir, "debug_workflow.log") + + # Format the debug entry + from modules.shared.timezoneUtils import get_utc_timestamp + timestamp = get_utc_timestamp() + debug_entry = f"[{timestamp}] [{context}] {message}\n" + + # Write to debug file + with open(debug_file, "a", encoding="utf-8") as f: + f.write(debug_entry) + + except Exception as e: + # Don't log debug errors to avoid recursion + pass + +def storeDebugMessageAndDocuments(message, currentUser) -> None: + """ + Store message and documents (metadata and file bytes) for debugging purposes. + Structure: {log_dir}/debug/messages/m_round_task_action_timestamp/documentlist_label/ + - message.json, message_text.txt + - document_###_metadata.json + - document_###_ (actual file bytes) + + Args: + message: ChatMessage object to store + currentUser: Current user for component interface access + """ + try: + import json + from datetime import datetime, UTC + + # Create base debug directory + logDir = _resolveLogDir() + debug_root = os.path.join(logDir, 'debug', 'messages') + _ensureDir(debug_root) + + # Generate timestamp + timestamp = datetime.now(UTC).strftime('%Y%m%d-%H%M%S-%f')[:-3] + + # Create message folder name: m_round_task_action_timestamp + # Use actual values from message, not defaults + round_str = str(message.roundNumber) if message.roundNumber is not None else "0" + task_str = str(message.taskNumber) if message.taskNumber is not None else "0" + action_str = str(message.actionNumber) if message.actionNumber is not None else "0" + message_folder = f"{timestamp}_m_{round_str}_{task_str}_{action_str}" + + message_path = os.path.join(debug_root, message_folder) + os.makedirs(message_path, exist_ok=True) + + # Store message data - use dict() instead of model_dump() for compatibility + message_file = os.path.join(message_path, "message.json") + with open(message_file, "w", encoding="utf-8") as f: + # Convert message to dict manually to avoid model_dump() issues + message_dict = { + "id": message.id, + "workflowId": message.workflowId, + "parentMessageId": message.parentMessageId, + "message": message.message, + "role": message.role, + "status": message.status, + "sequenceNr": message.sequenceNr, + "publishedAt": message.publishedAt, + "roundNumber": message.roundNumber, + "taskNumber": message.taskNumber, + "actionNumber": message.actionNumber, + "documentsLabel": message.documentsLabel, + "actionId": message.actionId, + "actionMethod": message.actionMethod, + "actionName": message.actionName, + "success": message.success, + "documents": [] + } + json.dump(message_dict, f, indent=2, ensure_ascii=False, default=str) + + # Store message content as text + if message.message: + message_text_file = os.path.join(message_path, "message_text.txt") + with open(message_text_file, "w", encoding="utf-8") as f: + f.write(str(message.message)) + + # Store documents if provided + if message.documents and len(message.documents) > 0: + # Group documents by documentsLabel + documents_by_label = {} + for doc in message.documents: + label = message.documentsLabel or 'default' + if label not in documents_by_label: + documents_by_label[label] = [] + documents_by_label[label].append(doc) + + # Create subfolder for each document label + for label, docs in documents_by_label.items(): + # Sanitize label for filesystem + safe_label = "".join(c for c in str(label) if c.isalnum() or c in (' ', '-', '_')).rstrip() + safe_label = safe_label.replace(' ', '_') + if not safe_label: + safe_label = "default" + + label_folder = os.path.join(message_path, safe_label) + _ensureDir(label_folder) + + # Store each document + for i, doc in enumerate(docs): + # Create document metadata file + doc_meta = { + "id": doc.id, + "messageId": doc.messageId, + "fileId": doc.fileId, + "fileName": doc.fileName, + "fileSize": doc.fileSize, + "mimeType": doc.mimeType, + "roundNumber": doc.roundNumber, + "taskNumber": doc.taskNumber, + "actionNumber": doc.actionNumber, + "actionId": doc.actionId + } + + doc_meta_file = os.path.join(label_folder, f"document_{i+1:03d}_metadata.json") + with open(doc_meta_file, "w", encoding="utf-8") as f: + json.dump(doc_meta, f, indent=2, ensure_ascii=False, default=str) + + # Also store the actual file bytes next to metadata for debugging + try: + # Lazy import to avoid circular deps at module load + from modules.interfaces import interfaceDbComponentObjects as comp + componentInterface = comp.getInterface(currentUser) + file_bytes = componentInterface.getFileData(doc.fileId) + if file_bytes: + # Build a safe filename preserving original name + safe_name = doc.fileName or f"document_{i+1:03d}" + # Avoid path traversal + safe_name = os.path.basename(safe_name) + doc_file_path = os.path.join(label_folder, f"document_{i+1:03d}_" + safe_name) + with open(doc_file_path, "wb") as df: + df.write(file_bytes) + else: + pass + except Exception as e: + pass + + except Exception as e: + # Silent fail - don't break main flow + pass + diff --git a/modules/shared/jsonUtils.py b/modules/shared/jsonUtils.py new file mode 100644 index 00000000..8a236182 --- /dev/null +++ b/modules/shared/jsonUtils.py @@ -0,0 +1,137 @@ +import json +import logging +from typing import Any, Dict, List, Optional, Tuple, Union + +logger = logging.getLogger(__name__) + + +def stripCodeFences(text: str) -> str: + """Remove ```json / ``` fences and surrounding whitespace if present.""" + if not text: + return text + s = text.strip() + if s.startswith("```") and s.endswith("```"): + # Remove first/last triple backticks + # Commonly starts with ```json\n + # Strip opening backticks + i = 3 + # Skip optional language tag like 'json' + while i < len(s) and s[i] != '\n': + i += 1 + if i < len(s) and s[i] == '\n': + s = s[i+1:] + # Strip trailing ``` + if s.endswith("```"): + s = s[:-3] + return s.strip() + return s + + +def extractFirstBalancedJson(text: str) -> str: + """Return the first balanced JSON object/array substring; otherwise return trimmed input.""" + if not text: + return text + s = text.strip() + # Find first '{' or '[' + brace = s.find('{') + bracket = s.find('[') + start = -1 + if brace != -1 and (bracket == -1 or brace < bracket): + start = brace + elif bracket != -1: + start = bracket + if start == -1: + return s + # Scan for matching close using a simple stack + stack: List[str] = [] + for i in range(start, len(s)): + ch = s[i] + if ch in '{[': + stack.append(ch) + elif ch in '}]': + if not stack: + continue + opener = stack.pop() + if (opener == '{' and ch != '}') or (opener == '[' and ch != ']'): + continue + if not stack: + return s[start:i+1].strip() + return s + + +def normalizeJsonText(text: str) -> str: + """Light normalization: remove BOM, normalize smart quotes.""" + if not text: + return text + s = text + # Remove UTF-8 BOM if present + if s.startswith('\ufeff'): + s = s.lstrip('\ufeff') + # Normalize smart quotes to straight quotes + s = s.replace('“', '"').replace('”', '"').replace('’', "'").replace('‘', "'") + return s + + +def extractJsonString(text: str) -> str: + """Strip code fences, normalize, then extract first balanced JSON substring.""" + s = normalizeJsonText(text) + s = stripCodeFences(s) + s = extractFirstBalancedJson(s) + return s.strip() + + +def tryParseJson(text: Union[str, bytes]) -> Tuple[Optional[Union[Dict, List]], Optional[Exception], str]: + """Extract and parse JSON; return (obj, error, cleaned_str).""" + if isinstance(text, bytes): + try: + text = text.decode('utf-8', errors='replace') + except Exception: + text = str(text) + cleaned = extractJsonString(text or "") + try: + return json.loads(cleaned), None, cleaned + except Exception as e: + return None, e, cleaned + + +def parseJsonOrRaise(text: Union[str, bytes]) -> Union[Dict, List]: + obj, err, cleaned = tryParseJson(text) + if err is not None: + logger.error(f"parse_json_or_raise failed: {err}. Cleaned preview: {cleaned[:200]}...") + raise err + return obj + + +def mergeRootLists(json_parts: List[Union[str, Dict, List]]) -> Dict[str, Any]: + """ + Generic merger for root-level lists: take first dict as base; for each subsequent part: + - if value is list and same key exists as list, extend it + - if key absent, add it + - for non-list keys, keep the original (from the first part) + Sets continuation=None if present in base. + """ + base: Optional[Dict[str, Any]] = None + parsed: List[Dict[str, Any]] = [] + for part in json_parts: + if isinstance(part, (dict, list)): + obj = part + else: + obj, err, _ = tryParseJson(part) + if err is not None or not isinstance(obj, (dict, list)): + continue + if isinstance(obj, dict): + parsed.append(obj) + if not parsed: + return {} + base = dict(parsed[0]) + for obj in parsed[1:]: + for k, v in obj.items(): + if isinstance(v, list) and isinstance(base.get(k), list): + base[k].extend(v) + elif k not in base: + base[k] = v + if 'continuation' in base: + base['continuation'] = None + return base + + diff --git a/modules/shared/progressLogger_example.py b/modules/shared/progressLogger_example.py deleted file mode 100644 index 13146846..00000000 --- a/modules/shared/progressLogger_example.py +++ /dev/null @@ -1,183 +0,0 @@ -""" -Example usage of ProgressLogger for workflow progress tracking. - -This file demonstrates how to use the ProgressLogger utility to track -progress of long-running operations in workflows. -""" - -import asyncio -import time -from modules.shared.progressLogger import ProgressLogger - - -async def exampleDocumentProcessing(workflowService, workflow): - """Example of document processing with progress tracking.""" - - # Create progress logger - progressLogger = workflowService.createProgressLogger(workflow) - operationId = f"docProcess_{workflow.id}_{int(time.time())}" - - try: - # Start operation - progressLogger.startOperation( - operationId, - "Extract", - "Document Processing", - "Processing 5 documents" - ) - - # Simulate processing steps - documents = ["doc1.pdf", "doc2.docx", "doc3.txt", "doc4.xlsx", "doc5.pdf"] - - for i, doc in enumerate(documents): - # Update progress for each document - progress = (i + 1) / len(documents) - progressLogger.updateProgress( - operationId, - progress, - f"Processing {doc} ({i+1}/{len(documents)})" - ) - - # Simulate processing time - await asyncio.sleep(0.5) - - # Complete operation - progressLogger.completeOperation(operationId, True) - - except Exception as e: - # Complete with failure - progressLogger.completeOperation(operationId, False) - raise - - -async def exampleAiProcessing(workflowService, workflow): - """Example of AI processing with chunk progress tracking.""" - - progressLogger = workflowService.createProgressLogger(workflow) - operationId = f"aiProcess_{workflow.id}_{int(time.time())}" - - try: - # Start operation - progressLogger.startOperation( - operationId, - "AI", - "Content Analysis", - "Processing 10 chunks" - ) - - # Simulate AI processing with chunks - chunks = list(range(1, 11)) - - for i, chunk in enumerate(chunks): - progress = (i + 1) / len(chunks) - progressLogger.updateProgress( - operationId, - progress, - f"Processing chunk {chunk} of {len(chunks)}" - ) - - # Simulate AI processing time - await asyncio.sleep(0.3) - - # Complete operation - progressLogger.completeOperation(operationId, True) - - except Exception as e: - progressLogger.completeOperation(operationId, False) - raise - - -async def exampleWorkflowTask(workflowService, workflow): - """Example of workflow task execution with progress tracking.""" - - progressLogger = workflowService.createProgressLogger(workflow) - operationId = f"workflowTask_{workflow.id}_{int(time.time())}" - - try: - # Start operation - progressLogger.startOperation( - operationId, - "Workflow", - "Task Execution", - "Executing data analysis task" - ) - - # Simulate task steps - steps = [ - "Initializing analysis", - "Loading data", - "Processing data", - "Generating results", - "Saving output" - ] - - for i, step in enumerate(steps): - progress = (i + 1) / len(steps) - progressLogger.updateProgress( - operationId, - progress, - step - ) - - # Simulate step processing time - await asyncio.sleep(0.4) - - # Complete operation - progressLogger.completeOperation(operationId, True) - - except Exception as e: - progressLogger.completeOperation(operationId, False) - raise - - -# Example of how to integrate into existing services -class ExampleService: - """Example service showing integration with ProgressLogger.""" - - def __init__(self, workflowService): - self.workflowService = workflowService - - async def processWithProgress(self, workflow, data): - """Process data with progress tracking.""" - - progressLogger = self.workflowService.createProgressLogger(workflow) - operationId = f"example_{workflow.id}_{int(time.time())}" - - try: - # Start operation - progressLogger.startOperation( - operationId, - "Example", - "Data Processing", - f"Processing {len(data)} items" - ) - - # Process data in chunks - chunkSize = 10 - totalChunks = (len(data) + chunkSize - 1) // chunkSize - - for i in range(0, len(data), chunkSize): - chunk = data[i:i + chunkSize] - chunkNum = i // chunkSize + 1 - - progress = chunkNum / totalChunks - progressLogger.updateProgress( - operationId, - progress, - f"Processing chunk {chunkNum}/{totalChunks}" - ) - - # Process chunk - await self._processChunk(chunk) - - # Complete operation - progressLogger.completeOperation(operationId, True) - - except Exception as e: - progressLogger.completeOperation(operationId, False) - raise - - async def _processChunk(self, chunk): - """Process a chunk of data.""" - # Simulate processing - await asyncio.sleep(0.1) diff --git a/modules/workflows/methods/methodOutlook.py b/modules/workflows/methods/methodOutlook.py index 367b04e8..961bc562 100644 --- a/modules/workflows/methods/methodOutlook.py +++ b/modules/workflows/methods/methodOutlook.py @@ -8,14 +8,11 @@ import logging from typing import Dict, Any, List, Optional from datetime import datetime, UTC import json -import uuid import requests from modules.workflows.methods.methodBase import MethodBase, action from modules.datamodels.datamodelChat import ActionResult, ActionDocument -from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority, AiCallRequest -from modules.datamodels.datamodelChat import ChatDocument -from modules.datamodels.datamodelUam import ConnectionStatus +from modules.datamodels.datamodelAi import AiCallOptions logger = logging.getLogger(__name__) @@ -48,7 +45,7 @@ class MethodOutlook(MethodBase): logger.debug(f"Found connection: {userConnection.id}, status: {userConnection.status.value}, authority: {userConnection.authority.value}") # Get a fresh token for this connection - token = self.services.utils.getFreshConnectionToken(userConnection.id) + token = self.services.workflow.getFreshConnectionToken(userConnection.id) if not token: logger.error(f"Fresh token not found for connection: {userConnection.id}") logger.debug(f"Connection details: {userConnection}") @@ -466,7 +463,7 @@ class MethodOutlook(MethodBase): "authority": "microsoft", "reference": connectionReference }, - "timestamp": self.services.utils.getUtcTimestamp() + "timestamp": self.services.utils.timestampGetUtc() } return ActionResult.isSuccess( @@ -696,7 +693,7 @@ class MethodOutlook(MethodBase): "authority": "microsoft", "reference": connectionReference }, - "timestamp": self.services.utils.getUtcTimestamp() + "timestamp": self.services.utils.timestampGetUtc() } return ActionResult( @@ -819,7 +816,7 @@ class MethodOutlook(MethodBase): "authority": "microsoft", "reference": connectionReference }, - "timestamp": self.services.utils.getUtcTimestamp() + "timestamp": self.services.utils.timestampGetUtc() } return ActionResult( @@ -929,7 +926,7 @@ class MethodOutlook(MethodBase): "authority": "microsoft", "reference": connectionReference }, - "timestamp": self.services.utils.getUtcTimestamp() + "timestamp": self.services.utils.timestampGetUtc() } return ActionResult( @@ -1070,7 +1067,7 @@ class MethodOutlook(MethodBase): "authority": "microsoft", "reference": connectionReference }, - "timestamp": self.services.utils.getUtcTimestamp() + "timestamp": self.services.utils.timestampGetUtc() } return ActionResult( @@ -1369,7 +1366,7 @@ LOOP_INSTRUCTION""" "aiGenerated": True, "context": context, "emailStyle": emailStyle, - "timestamp": self.services.utils.getUtcTimestamp() + "timestamp": self.services.utils.timestampGetUtc() } return ActionResult( diff --git a/modules/workflows/methods/methodSharepoint.py b/modules/workflows/methods/methodSharepoint.py index 6dd75bac..f0d020c2 100644 --- a/modules/workflows/methods/methodSharepoint.py +++ b/modules/workflows/methods/methodSharepoint.py @@ -787,7 +787,7 @@ class MethodSharepoint(MethodBase): "totalResults": len(found_documents), "maxResults": maxResults, "foundDocuments": found_documents, - "timestamp": self.services.utils.getUtcTimestamp() + "timestamp": self.services.utils.timestampGetUtc() } except Exception as e: @@ -1069,7 +1069,7 @@ class MethodSharepoint(MethodBase): "authority": "microsoft", "reference": connectionReference }, - "timestamp": self.services.utils.getUtcTimestamp() + "timestamp": self.services.utils.timestampGetUtc() } # Use default JSON format for output @@ -1423,7 +1423,7 @@ class MethodSharepoint(MethodBase): "authority": "microsoft", "reference": connectionReference }, - "timestamp": self.services.utils.getUtcTimestamp() + "timestamp": self.services.utils.timestampGetUtc() } # Use default JSON format for output @@ -1820,7 +1820,7 @@ class MethodSharepoint(MethodBase): "includeSubfolders": includeSubfolders, "sitesSearched": len(sites), "listResults": list_results, - "timestamp": self.services.utils.getUtcTimestamp() + "timestamp": self.services.utils.timestampGetUtc() } # Use default JSON format for output diff --git a/modules/workflows/processing/core/actionExecutor.py b/modules/workflows/processing/core/actionExecutor.py index fd649337..19d08164 100644 --- a/modules/workflows/processing/core/actionExecutor.py +++ b/modules/workflows/processing/core/actionExecutor.py @@ -278,13 +278,13 @@ class ActionExecutor: return "react_misc.jsonl" # Daily suffix for React files - dateSuffix = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y%m%d") + dateSuffix = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y%m%d") baseReactFile = _reactFileForContext(contextText) name, ext = os.path.splitext(baseReactFile) reactFile = os.path.join(logDir, f"{name}_{dateSuffix}{ext}") # Format the trace entry with better structure - timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # Create a structured trace entry traceEntry = f"[{timestamp}] {contextText}\n" diff --git a/modules/workflows/processing/core/messageCreator.py b/modules/workflows/processing/core/messageCreator.py index fe543bb3..6a195c16 100644 --- a/modules/workflows/processing/core/messageCreator.py +++ b/modules/workflows/processing/core/messageCreator.py @@ -56,7 +56,7 @@ class MessageCreator: "message": taskSummary, "status": "step", "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.getUtcTimestamp(), + "publishedAt": self.services.utils.timestampGetUtc(), "documentsLabel": "task_plan", "documents": [], # Add workflow context fields - use current workflow round instead of hardcoded 1 @@ -86,7 +86,7 @@ class MessageCreator: "message": f"🚀 **Task {taskProgress}**", "status": "step", "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.getUtcTimestamp(), + "publishedAt": self.services.utils.timestampGetUtc(), "documentsLabel": f"task_{taskIndex}_start", "documents": [], # Add workflow context fields @@ -163,7 +163,7 @@ class MessageCreator: "message": messageText, "status": "step", "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.getUtcTimestamp(), + "publishedAt": self.services.utils.timestampGetUtc(), "actionId": action.id, "actionMethod": action.execMethod, "actionName": action.execAction, @@ -214,7 +214,7 @@ class MessageCreator: "message": completionMessage, "status": "step", "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.getUtcTimestamp(), + "publishedAt": self.services.utils.timestampGetUtc(), "documentsLabel": f"task_{taskIndex}_completion", "documents": [], # Add workflow context fields @@ -243,7 +243,7 @@ class MessageCreator: "message": f"🔄 **Task {taskIndex}** needs retry: {reviewResult.improvements}", "status": "step", "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.getUtcTimestamp(), + "publishedAt": self.services.utils.timestampGetUtc(), "documentsLabel": f"task_{taskIndex}_retry", "documents": [], "roundNumber": workflow.currentRound, @@ -277,7 +277,7 @@ class MessageCreator: "message": errorMessage, "status": "step", "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.getUtcTimestamp(), + "publishedAt": self.services.utils.timestampGetUtc(), "actionId": None, "actionMethod": "task", "actionName": "task_error", diff --git a/modules/workflows/processing/modes/modeActionplan.py b/modules/workflows/processing/modes/modeActionplan.py index e3dc977c..51d488a5 100644 --- a/modules/workflows/processing/modes/modeActionplan.py +++ b/modules/workflows/processing/modes/modeActionplan.py @@ -288,7 +288,7 @@ class ActionplanMode(BaseMode): "message": f"⚡ **Action {actionNumber}/{totalActions}** (Method {action.execMethod}.{action.execAction})", "status": "step", "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.getUtcTimestamp(), + "publishedAt": self.services.utils.timestampGetUtc(), "documentsLabel": f"action_{actionNumber}_start", "documents": [], "actionProgress": "running", @@ -601,7 +601,7 @@ class ActionplanMode(BaseMode): retryCount=createdAction.get("retryCount", 0), retryMax=createdAction.get("retryMax", 3), processingTime=createdAction.get("processingTime"), - timestamp=float(createdAction.get("timestamp", self.services.utils.getUtcTimestamp())), + timestamp=float(createdAction.get("timestamp", self.services.utils.timestampGetUtc())), result=createdAction.get("result"), resultDocuments=createdAction.get("resultDocuments", []), userMessage=createdAction.get("userMessage") @@ -745,7 +745,7 @@ class ActionplanMode(BaseMode): retryCount=createdAction.get("retryCount", 0), retryMax=createdAction.get("retryMax", 3), processingTime=createdAction.get("processingTime"), - timestamp=float(createdAction.get("timestamp", self.services.utils.getUtcTimestamp())), + timestamp=float(createdAction.get("timestamp", self.services.utils.timestampGetUtc())), result=createdAction.get("result"), resultDocuments=createdAction.get("resultDocuments", []), userMessage=createdAction.get("userMessage") @@ -780,7 +780,7 @@ class ActionplanMode(BaseMode): traceFile = os.path.join(logDir, "log_trace.log") # Format the trace entry with better structure - timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # Create a structured trace entry traceEntry = f"[{timestamp}] {contextText}\n" diff --git a/modules/workflows/processing/modes/modeReact.py b/modules/workflows/processing/modes/modeReact.py index 84d38a45..2918ebba 100644 --- a/modules/workflows/processing/modes/modeReact.py +++ b/modules/workflows/processing/modes/modeReact.py @@ -693,7 +693,7 @@ class ReactMode(BaseMode): "message": messageContent, "status": status, "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.getUtcTimestamp(), + "publishedAt": self.services.utils.timestampGetUtc(), "documentsLabel": documentsLabel, "documents": [], "roundNumber": workflow.currentRound, @@ -821,7 +821,7 @@ Return only the user-friendly message, no technical details.""" retryCount=createdAction.get("retryCount", 0), retryMax=createdAction.get("retryMax", 3), processingTime=createdAction.get("processingTime"), - timestamp=float(createdAction.get("timestamp", self.services.utils.getUtcTimestamp())), + timestamp=float(createdAction.get("timestamp", self.services.utils.timestampGetUtc())), result=createdAction.get("result"), resultDocuments=createdAction.get("resultDocuments", []), userMessage=createdAction.get("userMessage") @@ -912,7 +912,7 @@ Return only the user-friendly message, no technical details.""" retryCount=createdAction.get("retryCount", 0), retryMax=createdAction.get("retryMax", 3), processingTime=createdAction.get("processingTime"), - timestamp=float(createdAction.get("timestamp", self.services.utils.getUtcTimestamp())), + timestamp=float(createdAction.get("timestamp", self.services.utils.timestampGetUtc())), result=createdAction.get("result"), resultDocuments=createdAction.get("resultDocuments", []), userMessage=createdAction.get("userMessage") @@ -996,13 +996,13 @@ Return only the user-friendly message, no technical details.""" return "react_misc.jsonl" # Daily suffix for React files - dateSuffix = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y%m%d") + dateSuffix = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y%m%d") baseReactFile = _reactFileForContext(contextText) name, ext = os.path.splitext(baseReactFile) reactFile = os.path.join(logDir, f"{name}_{dateSuffix}{ext}") # Format the trace entry with better structure - timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # Create a structured trace entry traceEntry = f"[{timestamp}] {contextText}\n" diff --git a/modules/workflows/processing/workflowProcessor.py b/modules/workflows/processing/workflowProcessor.py index 787edbe1..13da0519 100644 --- a/modules/workflows/processing/workflowProcessor.py +++ b/modules/workflows/processing/workflowProcessor.py @@ -301,7 +301,7 @@ class WorkflowProcessor: traceFile = os.path.join(logDir, "log_trace.log") # Format the trace entry - timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] traceEntry = f"[{timestamp}] {contextText}\n" # Add data if provided - show full content without truncation @@ -378,7 +378,7 @@ class WorkflowProcessor: 'actions': [action.to_dict() for action in taskActions] if taskActions else [], 'review_result': reviewResult, 'workflow_id': workflow.id, - 'handover_time': self.services.utils.getUtcTimestamp() + 'handover_time': self.services.utils.timestampGetUtc() } logger.info(f"Prepared handover for task {taskStep.id} in workflow {workflow.id}") return handoverData diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py index f5f65060..743f840e 100644 --- a/modules/workflows/workflowManager.py +++ b/modules/workflows/workflowManager.py @@ -32,7 +32,7 @@ class WorkflowManager: try: # Debug log to check workflowMode parameter logger.info(f"WorkflowManager received workflowMode: {workflowMode}") - currentTime = self.services.utils.getUtcTimestamp() + currentTime = self.services.utils.timestampGetUtc() if workflowId: workflow = self.services.workflow.getWorkflow(workflowId) @@ -117,7 +117,7 @@ class WorkflowManager: raise ValueError(f"Workflow {workflowId} not found") workflow.status = "stopped" - workflow.lastActivity = self.services.utils.getUtcTimestamp() + workflow.lastActivity = self.services.utils.timestampGetUtc() self.services.workflow.updateWorkflow(workflowId, { "status": "stopped", "lastActivity": workflow.lastActivity @@ -173,7 +173,7 @@ class WorkflowManager: "message": userInput.prompt, "status": "first", "sequenceNr": 1, - "publishedAt": self.services.utils.getUtcTimestamp(), + "publishedAt": self.services.utils.timestampGetUtc(), "documentsLabel": context_label, "documents": [], # Add workflow context fields @@ -387,7 +387,7 @@ class WorkflowManager: "message": "🛑 Workflow stopped by user", "status": "last", "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.getUtcTimestamp(), + "publishedAt": self.services.utils.timestampGetUtc(), "documentsLabel": "workflow_stopped", "documents": [], # Add workflow context fields @@ -402,7 +402,7 @@ class WorkflowManager: # Update workflow status to stopped workflow.status = "stopped" - workflow.lastActivity = self.services.utils.getUtcTimestamp() + workflow.lastActivity = self.services.utils.timestampGetUtc() self.services.workflow.updateWorkflow(workflow.id, { "status": "stopped", "lastActivity": workflow.lastActivity @@ -417,7 +417,7 @@ class WorkflowManager: "message": "🛑 Workflow stopped by user", "status": "last", "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.getUtcTimestamp(), + "publishedAt": self.services.utils.timestampGetUtc(), "documentsLabel": "workflow_stopped", "documents": [], # Add workflow context fields @@ -432,7 +432,7 @@ class WorkflowManager: # Update workflow status to stopped workflow.status = "stopped" - workflow.lastActivity = self.services.utils.getUtcTimestamp() + workflow.lastActivity = self.services.utils.timestampGetUtc() self.services.workflow.updateWorkflow(workflow.id, { "status": "stopped", "lastActivity": workflow.lastActivity, @@ -456,7 +456,7 @@ class WorkflowManager: "message": f"Workflow failed: {'Unknown error'}", "status": "last", "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.getUtcTimestamp(), + "publishedAt": self.services.utils.timestampGetUtc(), "documentsLabel": "workflow_failure", "documents": [], # Add workflow context fields @@ -471,7 +471,7 @@ class WorkflowManager: # Update workflow status to failed workflow.status = "failed" - workflow.lastActivity = self.services.utils.getUtcTimestamp() + workflow.lastActivity = self.services.utils.timestampGetUtc() self.services.workflow.updateWorkflow(workflow.id, { "status": "failed", "lastActivity": workflow.lastActivity, @@ -500,7 +500,7 @@ class WorkflowManager: "message": f"Error processing workflow results: {str(e)}", "status": "last", "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.getUtcTimestamp(), + "publishedAt": self.services.utils.timestampGetUtc(), "documentsLabel": "workflow_error", "documents": [], # Add workflow context fields @@ -515,7 +515,7 @@ class WorkflowManager: # Update workflow status to failed workflow.status = "failed" - workflow.lastActivity = self.services.utils.getUtcTimestamp() + workflow.lastActivity = self.services.utils.timestampGetUtc() self.services.workflow.updateWorkflow(workflow.id, { "status": "failed", "lastActivity": workflow.lastActivity, @@ -541,7 +541,7 @@ class WorkflowManager: "message": feedback, "status": "last", "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.getUtcTimestamp(), + "publishedAt": self.services.utils.timestampGetUtc(), "documentsLabel": "workflow_feedback", "documents": [], # Add workflow context fields @@ -558,7 +558,7 @@ class WorkflowManager: # Update workflow status to completed workflow.status = "completed" - workflow.lastActivity = self.services.utils.getUtcTimestamp() + workflow.lastActivity = self.services.utils.timestampGetUtc() # Update workflow in database self.services.workflow.updateWorkflow(workflow.id, { diff --git a/test_document_processing.py b/test_document_processing.py deleted file mode 100644 index 7d6bb64f..00000000 --- a/test_document_processing.py +++ /dev/null @@ -1,555 +0,0 @@ -""" -Test script for document processing and DOCX generation. -Calls the main AI service directly to process PDF documents and generate DOCX summaries. -""" - -import asyncio -import sys -import os -import logging -import base64 -from datetime import datetime -from pathlib import Path - -# Add the gateway module to the path -sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'modules')) - -from modules.datamodels.datamodelChat import ChatDocument -from modules.datamodels.datamodelAi import EnhancedAiCallOptions -from modules.services.serviceAi.mainServiceAi import AiService -from modules.services.serviceGeneration.mainServiceGeneration import GenerationService - -# Set up logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') -logger = logging.getLogger(__name__) - - -async def process_documents_and_generate_summary(): - """Process documents using the main AI service with intelligent chunk integration.""" - logger.info("🚀 Starting intelligent chunk integration test...") - - # Find testdata directory - testdata_path = Path("../wiki/poweron/testdata") - if not testdata_path.exists(): - # Try relative to current directory - testdata_path = Path("wiki/poweron/testdata") - if not testdata_path.exists(): - # Try relative to parent directory - testdata_path = Path("../wiki/poweron/testdata") - if not testdata_path.exists(): - logger.error(f"❌ Testdata path not found. Tried:") - logger.error(f" - ../wiki/poweron/testdata") - logger.error(f" - wiki/poweron/testdata") - logger.error(f" - ../wiki/poweron/testdata") - logger.info("Please ensure the testdata folder exists with PDF documents") - return False - - # Find all supported document files - supported_extensions = [ - # Document formats - "*.pdf", "*.docx", "*.xlsx", "*.pptx", "*.ppt", - # Image formats - "*.jpg", "*.jpeg", "*.png", "*.gif", "*.webp", "*.bmp", "*.tiff", - # Text and code files - "*.txt", "*.md", "*.log", "*.rtf", "*.tex", "*.rst", "*.adoc", "*.org", "*.pod", - "*.java", "*.js", "*.jsx", "*.ts", "*.tsx", "*.py", "*.rb", "*.go", "*.rs", "*.cpp", "*.c", "*.h", "*.hpp", "*.cc", "*.cxx", - "*.cs", "*.php", "*.swift", "*.kt", "*.scala", "*.clj", "*.hs", "*.ml", "*.fs", "*.vb", "*.dart", "*.r", "*.m", "*.pl", "*.sh", - "*.html", "*.htm", "*.css", "*.scss", "*.sass", "*.less", "*.vue", "*.svelte", - "*.config", "*.ini", "*.cfg", "*.conf", "*.properties", "*.yaml", "*.yml", "*.toml", "*.json", "*.xml", - "*.bat", "*.ps1", "*.psm1", "*.psd1", "*.vbs", "*.wsf", "*.cmd", "*.com", - "*.csv", "*.tsv", "*.tab", "*.dat", "*.data", - "*.man", "*.1", "*.2", "*.3", "*.4", "*.5", "*.6", "*.7", "*.8", "*.9", "*.n", "*.l", "*.m", "*.r", "*.t", "*.x", "*.y", "*.z", - "*.diff", "*.patch", "*.gitignore", "*.dockerignore", "*.editorconfig", "*.gitattributes", - "*.env", "*.env.local", "*.env.development", "*.env.production", "*.env.test", - "*.lock", "*.lockb", "*.lockfile", "*.pkg-lock", "*.yarn-lock" - ] - document_files = [] - for ext in supported_extensions: - document_files.extend(list(testdata_path.glob(ext))) - - logger.info(f"Found {len(document_files)} document files in testdata:") - for doc_file in document_files: - logger.info(f" - {doc_file.name}") - - if not document_files: - logger.error("❌ No supported document files found in testdata folder") - return False - - try: - # Mock the database interface to provide our file data BEFORE creating AI service - class TestDbInterface: - def __init__(self, file_data_map): - self.file_data_map = file_data_map - - def getFileData(self, file_id): - logger.info(f"TestDbInterface.getFileData called with file_id: {file_id}") - data = self.file_data_map.get(file_id) - if data: - logger.info(f"✅ Found file data for {file_id}: {len(data)} bytes") - else: - logger.warning(f"❌ No file data found for {file_id}") - return data - - # Create file data mapping - file_data_map = {} - for i, doc_file in enumerate(document_files): - with open(doc_file, 'rb') as f: - file_data_map[f"test_doc_{i+1}"] = f.read() - logger.info(f"📁 Loaded {doc_file.name} as test_doc_{i+1}: {len(file_data_map[f'test_doc_{i+1}'])} bytes") - - # Mock the database interface BEFORE creating AI service - import modules.interfaces.interfaceDbComponentObjects as db_interface_module - original_get_interface = db_interface_module.getInterface - db_interface_module.getInterface = lambda: TestDbInterface(file_data_map) - logger.info("🔧 Database interface mocked successfully") - - # Create a mock service center with utils - class MockServiceCenter: - def __init__(self): - self.utils = MockUtils() - - class MockUtils: - def debugLogToFile(self, message, label): - logger.debug(f"[{label}] {message}") - print(f"DEBUG [{label}]: {message}") # Also print to console for visibility - - # Only write to debug file if debug logging is enabled (matching real implementation) - debug_enabled = self.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) - if debug_enabled: - try: - import os - from datetime import datetime, UTC - debug_dir = self.configGet("APP_DEBUG_CHAT_WORKFLOW_DIR", "./test-chat") - if not os.path.isabs(debug_dir): - # If relative path, make it relative to the gateway directory - gateway_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - debug_dir = os.path.join(gateway_dir, debug_dir) - - os.makedirs(debug_dir, exist_ok=True) - debug_file = os.path.join(debug_dir, "debug_workflow.log") - timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] - debug_entry = f"[{timestamp}] [{label}] {message}\n" - with open(debug_file, "a", encoding="utf-8") as f: - f.write(debug_entry) - except Exception: - pass # Don't fail on debug logging errors - - def configGet(self, key, default): - # Return debug settings - if key == "APP_DEBUG_CHAT_WORKFLOW_ENABLED": - return True - elif key == "APP_DEBUG_CHAT_WORKFLOW_DIR": - return "./test-chat" - return default - - mock_service_center = MockServiceCenter() - - # Initialize the main AI service - let it handle everything - logger.info("🔧 Initializing main AI service...") - ai_service = await AiService.create(mock_service_center) - - # Create test documents - the AI service will handle file access internally - documents = [] - logger.info(f"📁 Found {len(document_files)} document files") - for i, doc_file in enumerate(document_files): - logger.info(f"📄 Processing file {i+1}/{len(document_files)}: {doc_file.name}") - # Determine MIME type based on file extension - mime_type = "application/octet-stream" # default - if doc_file.suffix.lower() == '.pdf': - mime_type = "application/pdf" - elif doc_file.suffix.lower() in ['.jpg', '.jpeg']: - mime_type = "image/jpeg" - elif doc_file.suffix.lower() == '.png': - mime_type = "image/png" - elif doc_file.suffix.lower() == '.gif': - mime_type = "image/gif" - elif doc_file.suffix.lower() == '.docx': - mime_type = "application/vnd.openxmlformats-officedocument.wordprocessingml.document" - elif doc_file.suffix.lower() == '.xlsx': - mime_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" - elif doc_file.suffix.lower() == '.pptx': - mime_type = "application/vnd.openxmlformats-officedocument.presentationml.presentation" - elif doc_file.suffix.lower() == '.ppt': - mime_type = "application/vnd.ms-powerpoint" - elif doc_file.suffix.lower() == '.html': - mime_type = "text/html" - elif doc_file.suffix.lower() == '.csv': - mime_type = "text/csv" - elif doc_file.suffix.lower() == '.json': - mime_type = "application/json" - elif doc_file.suffix.lower() in ['.txt', '.md']: - mime_type = "text/plain" - - chat_doc = ChatDocument( - fileId=f"test_doc_{i+1}", - messageId=f"test_message_{i+1}", - fileName=doc_file.name, - mimeType=mime_type, - fileSize=doc_file.stat().st_size, - roundNumber=1, - taskNumber=1, - actionNumber=1, - actionId=f"test_action_{i+1}" - ) - documents.append(chat_doc) - logger.info(f"✅ Created ChatDocument: {chat_doc.fileName} ({chat_doc.mimeType}) - {chat_doc.fileSize} bytes") - - logger.info(f"📄 Created {len(documents)} document objects") - - # Create enhanced AI call options for intelligent chunked processing - ai_options = EnhancedAiCallOptions( - operationType="general", - enableParallelProcessing=True, - maxConcurrentChunks=5, # Increased for better testing - preserveChunkMetadata=True, - chunkSeparator="\n\n---\n\n" - ) - - # Call the main AI service directly - let it handle everything including DOCX generation - logger.info("🤖 Calling main AI service with intelligent merging...") - - - # Run a single end-to-end test to avoid the loop issue - logger.info("🧪 Running single end-to-end test...") - - userPrompt = "Analyze the document containing mails for customer use cases. Can you create one file for each email in plain text format?" - - # userPrompt = "Can you create one file for each section in the document" - - # userPrompt = "Analyze these documents and create a fitting image for the content" - - # userPrompt = "Extract the table from file and produce 2 lists in excel. one list with all entries, one list only with entries that are yellow highlighted." - - # userPrompt = "Create a docx file containing a summary and the COMPLETE list from the pdf file, having one additional column with a 'x' marker for all items, which are yellow highlighted." - - # userPrompt = "Create a docx file containing the combined documents in french language." - - try: - # Single AI call with DOCX generation - ai_response = await ai_service.callAi( - prompt=userPrompt, - documents=documents, - options=ai_options, - outputFormat="txt", - title="Kunden und Use Cases" - ) - - logger.info(f"✅ End-to-end test completed successfully") - logger.info(f"📊 Response type: {type(ai_response)}") - logger.info(f"📊 Response length: {len(str(ai_response))} characters") - - # Single test result - test_results = [{ - "test_name": "End-to-End DOCX Generation", - "success": True, - "response_type": type(ai_response).__name__, - "response_length": len(str(ai_response)), - "response": ai_response - }] - - except Exception as e: - logger.error(f"❌ End-to-end test failed: {str(e)}") - test_results = [{ - "test_name": "End-to-End DOCX Generation", - "success": False, - "error": str(e), - "response": None - }] - - logger.info(f"🎯 Completed 1 end-to-end test") - - # Process all test results and save outputs - logger.info("📊 Processing test results...") - - successful_tests = [r for r in test_results if r['success']] - failed_tests = [r for r in test_results if not r['success']] - - logger.info(f"✅ Successful tests: {len(successful_tests)}") - logger.info(f"❌ Failed tests: {len(failed_tests)}") - - # Display test results summary - logger.info("=" * 80) - logger.info("END-TO-END TEST RESULTS SUMMARY") - logger.info("=" * 80) - for i, result in enumerate(test_results, 1): - status = "✅ PASS" if result['success'] else "❌ FAIL" - logger.info(f"Test {i}: {result['test_name']} - {status}") - if result['success']: - logger.info(f" Response Type: {result['response_type']}") - logger.info(f" Response Length: {result['response_length']} characters") - else: - logger.info(f" Error: {result['error']}") - logger.info("=" * 80) - - # Create output directory if it doesn't exist - output_dir = Path("test-chat/unittestoutput") - output_dir.mkdir(parents=True, exist_ok=True) - - # Save all test results and generated files - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - - logger.info("💾 Saving test results and generated files...") - - try: - for i, result in enumerate(successful_tests, 1): - test_name = result['test_name'].replace(' ', '_').lower() - response = result['response'] - - logger.info(f"💾 Saving Test {i}: {result['test_name']}") - - # Handle different response types - if isinstance(response, dict): - # Document generation response - if 'documents' in response and response['documents']: - logger.info(f"📄 Found {len(response['documents'])} documents in response") - - for j, doc in enumerate(response['documents']): - doc_name = doc.get('documentName', f'{test_name}_document_{j+1}') - doc_data = doc.get('documentData', '') - doc_mime = doc.get('mimeType', 'application/octet-stream') - - logger.info(f"📄 Document {j+1}: {doc_name}") - logger.info(f"📄 MIME Type: {doc_mime}") - logger.info(f"📄 Data length: {len(doc_data)} characters") - - # Determine file extension with better MIME type detection - file_ext = '.bin' # Default fallback - - if doc_mime: - if 'docx' in doc_mime.lower() or 'wordprocessingml' in doc_mime.lower(): - file_ext = '.docx' - elif 'pdf' in doc_mime.lower(): - file_ext = '.pdf' - elif 'txt' in doc_mime.lower() or 'plain' in doc_mime.lower(): - file_ext = '.txt' - elif 'html' in doc_mime.lower(): - file_ext = '.html' - elif 'json' in doc_mime.lower(): - file_ext = '.json' - elif 'csv' in doc_mime.lower(): - file_ext = '.csv' - elif 'xlsx' in doc_mime.lower() or 'spreadsheetml' in doc_mime.lower(): - file_ext = '.xlsx' - elif 'pptx' in doc_mime.lower() or 'presentationml' in doc_mime.lower(): - file_ext = '.pptx' - elif 'markdown' in doc_mime.lower() or 'md' in doc_mime.lower(): - file_ext = '.md' - elif 'png' in doc_mime.lower() or 'image' in doc_mime.lower(): - file_ext = '.png' - elif 'jpg' in doc_mime.lower() or 'jpeg' in doc_mime.lower(): - file_ext = '.jpg' - else: - logger.warning(f"⚠️ Unknown MIME type: {doc_mime}, using .bin") - - # Also check filename for hints - if doc_name and '.' in doc_name: - name_ext = '.' + doc_name.split('.')[-1].lower() - if name_ext in ['.docx', '.pdf', '.txt', '.html', '.json', '.csv', '.xlsx', '.pptx', '.md', '.png', '.jpg', '.jpeg']: - file_ext = name_ext - logger.info(f"📄 Using extension from filename: {file_ext}") - - logger.info(f"📄 Final file extension: {file_ext}") - - # Save document - output_path = output_dir / f"{test_name}_{timestamp}{file_ext}" - - # Handle different content types - if file_ext in ['.md', '.txt', '.html', '.json', '.csv']: - # Text-based formats - save directly as text - with open(output_path, 'w', encoding='utf-8') as f: - f.write(doc_data) - logger.info(f"✅ Document saved as text: {output_path} ({len(doc_data)} characters)") - elif file_ext in ['.png', '.jpg', '.jpeg']: - # Image formats - decode from base64 - try: - doc_bytes = base64.b64decode(doc_data) - with open(output_path, 'wb') as f: - f.write(doc_bytes) - logger.info(f"✅ Image saved: {output_path} ({len(doc_bytes)} bytes)") - except Exception as e: - logger.warning(f"⚠️ Failed to decode image as base64: {e}") - # Save as text if base64 decoding fails - with open(output_path, 'w', encoding='utf-8') as f: - f.write(doc_data) - logger.info(f"✅ Image saved as text (fallback): {output_path}") - else: - # Other binary formats - decode from base64 - try: - doc_bytes = base64.b64decode(doc_data) - with open(output_path, 'wb') as f: - f.write(doc_bytes) - logger.info(f"✅ Document saved as binary: {output_path} ({len(doc_bytes)} bytes)") - except Exception as e: - logger.warning(f"⚠️ Failed to decode document as base64: {e}") - # Save as text if base64 decoding fails - with open(output_path, 'w', encoding='utf-8') as f: - f.write(doc_data) - logger.info(f"✅ Document saved as text (fallback): {output_path}") - - # Also save raw content as text - content = response.get('content', '') - if content: - text_path = output_dir / f"{test_name}_content_{timestamp}.txt" - with open(text_path, 'w', encoding='utf-8') as f: - # Handle both string and dictionary content - if isinstance(content, dict): - import json - f.write(json.dumps(content, indent=2, ensure_ascii=False)) - else: - f.write(str(content)) - logger.info(f"✅ Content saved: {text_path}") - - elif isinstance(response, str): - # Text response - text_path = output_dir / f"{test_name}_response_{timestamp}.txt" - with open(text_path, 'w', encoding='utf-8') as f: - f.write(response) - logger.info(f"✅ Text response saved: {text_path}") - - else: - logger.warning(f"⚠️ Unknown response type for {result['test_name']}: {type(response)}") - - # Save failed test details - if failed_tests: - error_path = output_dir / f"failed_tests_{timestamp}.txt" - with open(error_path, 'w', encoding='utf-8') as f: - f.write("# Failed Test Details\n\n") - for i, result in enumerate(failed_tests, 1): - f.write(f"## Test {i}: {result['test_name']}\n") - f.write(f"**Error:** {result['error']}\n\n") - logger.info(f"✅ Failed test details saved: {error_path}") - - except Exception as e: - logger.error(f"❌ Error saving test results: {str(e)}") - return False - - # Save comprehensive test report - report_path = output_dir / f"end_to_end_test_report_{timestamp}.txt" - with open(report_path, 'w', encoding='utf-8') as f: - f.write(f"# End-to-End AI Service Test Report\n") - f.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") - - f.write(f"## Test Configuration\n") - f.write(f"- Documents processed: {len(documents)}\n") - f.write(f"- Processing method: Intelligent Token-Aware Merging\n") - f.write(f"- Parallel processing: {ai_options.enableParallelProcessing}\n") - f.write(f"- Max concurrent chunks: {ai_options.maxConcurrentChunks}\n") - f.write(f"- Chunk metadata preserved: {ai_options.preserveChunkMetadata}\n") - f.write(f"- Chunk separator: '{ai_options.chunkSeparator}'\n\n") - - f.write(f"## Document Inventory\n") - for i, doc in enumerate(documents, 1): - f.write(f"{i}. **{doc.fileName}**\n") - f.write(f" - MIME Type: {doc.mimeType}\n") - f.write(f" - File Size: {doc.fileSize:,} bytes\n") - f.write(f" - File ID: {doc.fileId}\n\n") - - f.write(f"## Test Results Summary\n") - f.write(f"- Total Tests: {len(test_results)}\n") - f.write(f"- Successful: {len(successful_tests)}\n") - f.write(f"- Failed: {len(failed_tests)}\n") - f.write(f"- Success Rate: {len(successful_tests)/len(test_results)*100:.1f}%\n\n") - - f.write(f"## Detailed Test Results\n") - for i, result in enumerate(test_results, 1): - f.write(f"### Test {i}: {result['test_name']}\n") - f.write(f"**Status:** {'✅ PASS' if result['success'] else '❌ FAIL'}\n") - - if result['success']: - f.write(f"**Response Type:** {result['response_type']}\n") - f.write(f"**Response Length:** {result['response_length']} characters\n") - - # Show response preview - response_preview = str(result['response'])[:500] - f.write(f"**Response Preview:**\n```\n{response_preview}...\n```\n\n") - else: - f.write(f"**Error:** {result['error']}\n\n") - - f.write(f"## Technical Implementation Details\n") - f.write(f"This test validates the complete AI service pipeline:\n\n") - f.write(f"### Tested Components:\n") - f.write(f"- **Document Extraction**: PDF, DOCX, images, etc.\n") - f.write(f"- **Intelligent Chunking**: Token-aware merging\n") - f.write(f"- **Model Selection**: Automatic AI model choice\n") - f.write(f"- **Parallel Processing**: Concurrent chunk processing\n") - f.write(f"- **Document Generation**: DOCX, PDF, text output\n") - f.write(f"- **Error Handling**: Graceful failure management\n\n") - - f.write(f"### Performance Metrics:\n") - f.write(f"- **Chunk Optimization**: Intelligent merging reduces AI calls\n") - f.write(f"- **Processing Speed**: Parallel execution\n") - f.write(f"- **Memory Efficiency**: Token-aware chunking\n") - f.write(f"- **Output Quality**: Multiple format support\n\n") - - f.write(f"## Generated Files\n") - for i, result in enumerate(successful_tests, 1): - test_name = result['test_name'].replace(' ', '_').lower() - f.write(f"- **Test {i}**: {result['test_name']} → `{test_name}_*_{timestamp}.*`\n") - - if failed_tests: - f.write(f"- **Failed Tests**: `failed_tests_{timestamp}.txt`\n") - - f.write(f"- **This Report**: `end_to_end_test_report_{timestamp}.txt`\n\n") - - f.write(f"The end-to-end test successfully validates the complete AI service\n") - f.write(f"pipeline from document input to formatted output generation.\n") - - logger.info(f"✅ Comprehensive test report saved: {report_path}") - - # Show debug file locations - debug_files = [] - try: - debug_dir = Path("test-chat") - if debug_dir.exists(): - debug_files.extend(list(debug_dir.glob("*.log"))) - debug_files.extend(list(debug_dir.glob("ai/*.txt"))) - - if debug_files: - logger.info("📁 Debug files created:") - for debug_file in debug_files: - logger.info(f" - {debug_file}") - else: - logger.info("📁 No debug files found in test-chat directory") - except Exception as e: - logger.warning(f"Could not list debug files: {e}") - - # Restore original database interface - db_interface_module.getInterface = original_get_interface - - return True - - except Exception as e: - logger.error(f"❌ Error during document processing: {str(e)}") - import traceback - logger.error(f"Traceback: {traceback.format_exc()}") - - # Restore original database interface in case of error - try: - db_interface_module.getInterface = original_get_interface - except: - pass - - return False - -async def main(): - """Main function to run the intelligent chunk integration test.""" - logger.info("🎯 Starting Intelligent Chunk Integration Test") - logger.info("=" * 60) - - success = await process_documents_and_generate_summary() - - if success: - logger.info("🎉 Intelligent chunk integration test completed successfully!") - logger.info("✅ Main AI service handled all processing internally") - logger.info("✅ Intelligent token-aware merging activated") - logger.info("✅ DOCX document generated directly by AI service") - logger.info("✅ Detailed chunk integration analysis saved") - logger.info("✅ Performance optimization achieved") - else: - logger.error("❌ Test failed!") - logger.error("Please check the error messages above for details") - - logger.info("=" * 60) - -if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file