From ab5618a9a5c3c429e724d27364dbf7610e53a64b Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Mon, 20 Oct 2025 12:22:01 +0200
Subject: [PATCH] Centralized debug handling
---
modules/features/syncDelta/mainSyncDelta.py | 10 +-
modules/interfaces/interfaceDbChatObjects.py | 147 +----
modules/services/serviceAi/mainServiceAi.py | 12 -
modules/services/serviceAi/subCoreAi.py | 84 +--
.../serviceAi/subDocumentGeneration.py | 25 +-
.../serviceAi/subDocumentProcessing.py | 63 +-
modules/services/serviceAi/subUtilities.py | 319 ----------
.../services/serviceExtraction/subPipeline.py | 1 -
.../renderers/rendererBaseTemplate.py | 30 +-
.../renderers/rendererImage.py | 12 +-
.../serviceGeneration/subPromptBuilder.py | 21 +-
.../mainServiceNormalization.py | 39 +-
.../services/serviceUtils/mainServiceUtils.py | 131 +++--
.../serviceWorkflow/mainServiceWorkflow.py | 15 +
modules/shared/debugLogger.py | 204 ++++++-
modules/shared/jsonUtils.py | 137 +++++
modules/shared/progressLogger_example.py | 183 ------
modules/workflows/methods/methodOutlook.py | 19 +-
modules/workflows/methods/methodSharepoint.py | 8 +-
.../processing/core/actionExecutor.py | 4 +-
.../processing/core/messageCreator.py | 12 +-
.../processing/modes/modeActionplan.py | 8 +-
.../workflows/processing/modes/modeReact.py | 10 +-
.../workflows/processing/workflowProcessor.py | 4 +-
modules/workflows/workflowManager.py | 26 +-
test_document_processing.py | 555 ------------------
26 files changed, 529 insertions(+), 1550 deletions(-)
delete mode 100644 modules/services/serviceAi/subUtilities.py
create mode 100644 modules/shared/jsonUtils.py
delete mode 100644 modules/shared/progressLogger_example.py
delete mode 100644 test_document_processing.py
diff --git a/modules/features/syncDelta/mainSyncDelta.py b/modules/features/syncDelta/mainSyncDelta.py
index 0cfaa173..e3d363a3 100644
--- a/modules/features/syncDelta/mainSyncDelta.py
+++ b/modules/features/syncDelta/mainSyncDelta.py
@@ -110,7 +110,7 @@ class ManagerSyncDelta:
def _log_audit_event(self, action: str, status: str, details: str):
"""Log audit events for sync operations to memory."""
try:
- timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
+ timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
user_id = str(self.eventUser.id) if self.eventUser else "system"
log_entry = f"{timestamp} | {user_id} | {action} | {status} | {details}"
self.sync_audit_log.append(log_entry)
@@ -146,7 +146,7 @@ class ManagerSyncDelta:
return False
# Generate log filename with current timestamp
- timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y%m%d_%H%M%S")
+ timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y%m%d_%H%M%S")
log_filename = f"log_{timestamp}.log"
# Create log content
@@ -388,7 +388,7 @@ class ManagerSyncDelta:
async def backupSharepointFile(self, *, filename: str) -> bool:
try:
- timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y%m%d_%H%M%S")
+ timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y%m%d_%H%M%S")
+            backup_filename = f"backup_{timestamp}_{filename}"
await self.services.sharepoint.copy_file_async(
site_id=self.targetSite['id'],
@@ -452,7 +452,7 @@ class ManagerSyncDelta:
return merged_data, details
def createCsvContent(self, data: list[dict], existing_headers: dict | None = None) -> bytes:
- timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
+ timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
if existing_headers is None:
existing_headers = {"header1": "Header 1", "header2": "Header 2"}
if not data:
@@ -476,7 +476,7 @@ class ManagerSyncDelta:
return out.getvalue().encode('utf-8')
def createExcelContent(self, data: list[dict], existing_headers: dict | None = None) -> bytes:
- timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
+ timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
if existing_headers is None:
existing_headers = {"header1": "Header 1", "header2": "Header 2"}
if not data:
diff --git a/modules/interfaces/interfaceDbChatObjects.py b/modules/interfaces/interfaceDbChatObjects.py
index d29c0f14..2c952058 100644
--- a/modules/interfaces/interfaceDbChatObjects.py
+++ b/modules/interfaces/interfaceDbChatObjects.py
@@ -548,9 +548,8 @@ class ChatObjects:
)
# Debug: Store message and documents for debugging - only if debug enabled
- debug_enabled = APP_CONFIG.get("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
- if debug_enabled:
- self._storeDebugMessageAndDocuments(chat_message)
+ from modules.shared.debugLogger import storeDebugMessageAndDocuments
+ storeDebugMessageAndDocuments(chat_message, self.currentUser)
return chat_message
@@ -975,148 +974,6 @@ class ChatObjects:
return {"items": items}
- def _storeDebugMessageAndDocuments(self, message: ChatMessage) -> None:
- """
- Store message and documents (metadata and file bytes) for debugging purposes.
- Structure: {log_dir}/debug/messages/m_round_task_action_timestamp/documentlist_label/
- - message.json, message_text.txt
- - document_###_metadata.json
- - document_###_ (actual file bytes)
-
- Args:
- message: ChatMessage object to store
- """
- try:
- import os
- import json
- from datetime import datetime, UTC
-
- # Create base debug directory
- # Use configured log directory instead of hardcoded test-chat
- from modules.shared.configuration import APP_CONFIG
- logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./")
- if not os.path.isabs(logDir):
- gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- logDir = os.path.join(gatewayDir, logDir)
- debug_root = os.path.join(logDir, 'debug', 'messages')
- os.makedirs(debug_root, exist_ok=True)
-
- # Generate timestamp
- timestamp = datetime.now(UTC).strftime('%Y%m%d-%H%M%S-%f')[:-3]
-
- # Create message folder name: m_round_task_action_timestamp
- # Use actual values from message, not defaults
- round_str = str(message.roundNumber) if message.roundNumber is not None else "0"
- task_str = str(message.taskNumber) if message.taskNumber is not None else "0"
- action_str = str(message.actionNumber) if message.actionNumber is not None else "0"
- message_folder = f"{timestamp}_m_{round_str}_{task_str}_{action_str}"
-
- message_path = os.path.join(debug_root, message_folder)
- os.makedirs(message_path, exist_ok=True)
-
- # Store message data - use dict() instead of model_dump() for compatibility
- message_file = os.path.join(message_path, "message.json")
- with open(message_file, "w", encoding="utf-8") as f:
- # Convert message to dict manually to avoid model_dump() issues
- message_dict = {
- "id": message.id,
- "workflowId": message.workflowId,
- "parentMessageId": message.parentMessageId,
- "message": message.message,
- "role": message.role,
- "status": message.status,
- "sequenceNr": message.sequenceNr,
- "publishedAt": message.publishedAt,
- "roundNumber": message.roundNumber,
- "taskNumber": message.taskNumber,
- "actionNumber": message.actionNumber,
- "documentsLabel": message.documentsLabel,
- "actionId": message.actionId,
- "actionMethod": message.actionMethod,
- "actionName": message.actionName,
- "success": message.success,
- "documents": []
- }
- json.dump(message_dict, f, indent=2, ensure_ascii=False, default=str)
-
- # Store message content as text
- if message.message:
- message_text_file = os.path.join(message_path, "message_text.txt")
- with open(message_text_file, "w", encoding="utf-8") as f:
- f.write(str(message.message))
-
- # Store documents if provided
- if message.documents and len(message.documents) > 0:
- logger.info(f"Debug: Processing {len(message.documents)} documents")
-
- # Group documents by documentsLabel
- documents_by_label = {}
- for doc in message.documents:
- label = message.documentsLabel or 'default'
- if label not in documents_by_label:
- documents_by_label[label] = []
- documents_by_label[label].append(doc)
-
- # Create subfolder for each document label
- for label, docs in documents_by_label.items():
- # Sanitize label for filesystem
- safe_label = "".join(c for c in str(label) if c.isalnum() or c in (' ', '-', '_')).rstrip()
- safe_label = safe_label.replace(' ', '_')
- if not safe_label:
- safe_label = "default"
-
- label_folder = os.path.join(message_path, safe_label)
- os.makedirs(label_folder, exist_ok=True)
- logger.info(f"Debug: Created document folder: {label_folder}")
-
- # Store each document
- for i, doc in enumerate(docs):
- # Create document metadata file
- doc_meta = {
- "id": doc.id,
- "messageId": doc.messageId,
- "fileId": doc.fileId,
- "fileName": doc.fileName,
- "fileSize": doc.fileSize,
- "mimeType": doc.mimeType,
- "roundNumber": doc.roundNumber,
- "taskNumber": doc.taskNumber,
- "actionNumber": doc.actionNumber,
- "actionId": doc.actionId
- }
-
- doc_meta_file = os.path.join(label_folder, f"document_{i+1:03d}_metadata.json")
- with open(doc_meta_file, "w", encoding="utf-8") as f:
- json.dump(doc_meta, f, indent=2, ensure_ascii=False, default=str)
-
- logger.info(f"Debug: Stored document metadata for {doc.fileName}")
-
- # Also store the actual file bytes next to metadata for debugging
- try:
- # Lazy import to avoid circular deps at module load
- from modules.interfaces import interfaceDbComponentObjects as comp
- componentInterface = comp.getInterface(self.currentUser)
- file_bytes = componentInterface.getFileData(doc.fileId)
- if file_bytes:
- # Build a safe filename preserving original name
- safe_name = doc.fileName or f"document_{i+1:03d}"
- # Avoid path traversal
- safe_name = os.path.basename(safe_name)
- doc_file_path = os.path.join(label_folder, f"document_{i+1:03d}_" + safe_name)
- with open(doc_file_path, "wb") as df:
- df.write(file_bytes)
- logger.info(f"Debug: Stored document file bytes: {doc_file_path} ({len(file_bytes)} bytes)")
- else:
- logger.warning(f"Debug: No file bytes returned for fileId {doc.fileId}")
- except Exception as e:
- logger.error(f"Debug: Failed to store document file for {doc.fileName} (fileId {doc.fileId}): {e}")
-
- logger.info(f"Debug: Stored message and documents in {message_path}")
-
- except Exception as e:
- logger.error(f"Debug: Failed to store message and documents: {e}")
- import traceback
- logger.error(f"Debug: Traceback: {traceback.format_exc()}")
def getInterface(currentUser: Optional[User] = None) -> 'ChatObjects':
diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py
index c036db32..511fb311 100644
--- a/modules/services/serviceAi/mainServiceAi.py
+++ b/modules/services/serviceAi/mainServiceAi.py
@@ -15,12 +15,10 @@ from modules.datamodels.datamodelWeb import (
WebSearchResultItem,
)
from modules.interfaces.interfaceAiObjects import AiObjects
-from modules.shared.configuration import APP_CONFIG
from modules.services.serviceAi.subCoreAi import SubCoreAi
from modules.services.serviceAi.subDocumentProcessing import SubDocumentProcessing
from modules.services.serviceAi.subWebResearch import SubWebResearch
from modules.services.serviceAi.subDocumentGeneration import SubDocumentGeneration
-from modules.services.serviceAi.subUtilities import SubUtilities
logger = logging.getLogger(__name__)
@@ -33,7 +31,6 @@ class AiService:
- SubDocumentProcessing: Document chunking, processing, and merging logic
- SubWebResearch: Web research and crawling functionality
- SubDocumentGeneration: Single-file and multi-file document generation
- - SubUtilities: Helper functions, text processing, and debugging utilities
The main service acts as a coordinator:
1. Manages lazy initialization of sub-modules
@@ -55,7 +52,6 @@ class AiService:
self._documentProcessor = None # Lazy initialization
self._webResearch = None # Lazy initialization
self._documentGenerator = None # Lazy initialization
- self._utilities = None # Lazy initialization
@property
def extractionService(self):
@@ -99,14 +95,6 @@ class AiService:
self._documentGenerator = SubDocumentGeneration(self.services, self.aiObjects, self.documentProcessor)
return self._documentGenerator
- @property
- def utilities(self):
- """Lazy initialization of utilities service."""
- if self._utilities is None:
- logger.info("Lazy initializing SubUtilities...")
- self._utilities = SubUtilities(self.services)
- return self._utilities
-
async def _ensureAiObjectsInitialized(self):
"""Ensure aiObjects is initialized."""
if self.aiObjects is None:
diff --git a/modules/services/serviceAi/subCoreAi.py b/modules/services/serviceAi/subCoreAi.py
index 082527d2..d581214b 100644
--- a/modules/services/serviceAi/subCoreAi.py
+++ b/modules/services/serviceAi/subCoreAi.py
@@ -1,8 +1,8 @@
+import json
import logging
from typing import Dict, Any, List, Optional, Tuple, Union
from modules.datamodels.datamodelChat import PromptPlaceholder, ChatDocument
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, ModelCapabilities, OperationType, Priority
-from modules.shared.debugLogger import writeDebugFile
logger = logging.getLogger(__name__)
@@ -95,13 +95,13 @@ class SubCoreAi:
)
# Write the ACTUAL prompt sent to AI (including continuation context)
- writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}", None)
+ self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}")
response = await self.aiObjects.call(request)
result = response.content
# Write raw AI response to debug file
- writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}", None)
+ self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}")
# Emit stats for this iteration
self.services.workflow.storeWorkflowStat(
@@ -117,9 +117,10 @@ class SubCoreAi:
# Check if this is a continuation response (only for supported formats)
if loopInstructionFormat in LoopInstructionTexts:
try:
- import json
+ # Extract JSON substring if wrapped (e.g., ```json ... ```)
+ extracted = self.services.utils.jsonExtractString(result)
# Try to parse as JSON to check for continuation attribute
- parsed_result = json.loads(result)
+ parsed_result = json.loads(extracted)
if isinstance(parsed_result, dict) and parsed_result.get("continuation") is not None:
# This is a continuation response
accumulatedContent.append(result)
@@ -134,6 +135,7 @@ class SubCoreAi:
# Not JSON, treat as final response
accumulatedContent.append(result)
logger.warning(f"Iteration {iteration}: Non-JSON response received")
+ self.services.utils.writeDebugFile(result, f"{debugPrefix}_error_non_json_response_iteration_{iteration}")
break
else:
# This is the final response
@@ -152,7 +154,7 @@ class SubCoreAi:
final_result = self._mergeJsonContent(accumulatedContent) if accumulatedContent else ""
# Write final result to debug file
- writeDebugFile(final_result, f"{debugPrefix}_final_result", None)
+ self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result")
logger.info(f"AI call completed: {len(accumulatedContent)} parts from {iteration} iterations")
return final_result
@@ -169,7 +171,6 @@ class SubCoreAi:
continuation_description = ""
if accumulatedContent:
try:
- import json
last_response = accumulatedContent[-1]
parsed_response = json.loads(last_response)
if isinstance(parsed_response, dict) and parsed_response.get("continuation"):
@@ -207,13 +208,13 @@ Continue generating content now."""
return accumulatedContent[0]
try:
- import json
# Parse all JSON responses
parsed_responses = []
for content in accumulatedContent:
try:
- parsed = json.loads(content)
+ extracted = self.services.utils.jsonExtractString(content)
+ parsed = json.loads(extracted)
parsed_responses.append(parsed)
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse JSON content: {str(e)}")
@@ -244,6 +245,7 @@ Continue generating content now."""
logger.error(f"Error merging JSON content: {str(e)}")
return accumulatedContent[0] # Return first response on error
+
def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str:
"""
Build full prompt by replacing placeholders with their content.
@@ -361,10 +363,10 @@ Continue generating content now."""
generation_prompt = await self._buildGenerationPrompt(prompt, extracted_content, outputFormat, title)
generated_json = await self._callAiWithLooping(generation_prompt, options, "document_generation", loopInstructionFormat=loopInstructionFormat)
- # Parse the generated JSON
+ # Parse the generated JSON (extract fenced/embedded JSON first)
try:
- import json
- generated_data = json.loads(generated_json)
+ extracted_json = self.services.utils.jsonExtractString(generated_json)
+ generated_data = json.loads(extracted_json)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse generated JSON: {str(e)}")
logger.error(f"JSON content length: {len(generated_json)}")
@@ -372,7 +374,7 @@ Continue generating content now."""
logger.error(f"JSON content around error position: {generated_json[max(0, e.pos-50):e.pos+50]}")
# Write the problematic JSON to debug file
- writeDebugFile(generated_json, "failed_json_parsing", None)
+ self.services.utils.writeDebugFile(generated_json, "failed_json_parsing")
return {"success": False, "error": f"Generated content is not valid JSON: {str(e)}"}
@@ -403,11 +405,7 @@ Continue generating content now."""
}
# Log AI response for debugging
- try:
- writeDebugFile(str(result), "documentGenerationResponse", documents)
- except Exception:
- pass
-
+ self.services.utils.writeDebugFile(str(result), "documentGenerationResponse", documents)
return result
except Exception as e:
@@ -617,26 +615,6 @@ Continue generating content now."""
"imageChunkSize": image_chunk_size
}
- def _getModelsForOperation(self, operation_type: str, options: AiCallOptions) -> List[ModelCapabilities]:
- """
- Get models capable of handling the specific operation with capability filtering.
- """
- # Use the actual AI objects model selection instead of hardcoded default
- if hasattr(self, 'aiObjects') and self.aiObjects:
- # Let AiObjects handle the model selection
- return []
- else:
- # Fallback to default model if AiObjects not available
- default_model = ModelCapabilities(
- name="default",
- maxTokens=4000,
- capabilities=["text", "reasoning"] if operation_type == "planning" else ["text"],
- costPerToken=0.001,
- processingTime=1.0,
- isAvailable=True
- )
- return [default_model]
-
def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str:
"""
Build full prompt by replacing placeholders with their content.
@@ -687,36 +665,6 @@ Continue generating content now."""
return self._buildPromptWithPlaceholders(full_prompt, reduced_placeholders)
- def _reduceTextPrompt(
- self,
- prompt: str,
- context: str,
- model: ModelCapabilities,
- options: AiCallOptions
- ) -> str:
- """
- Reduce text prompt size using typeGroup-aware chunking and merging.
- """
- max_size = int(model.maxTokens * (1 - options.safetyMargin))
-
- if options.compressPrompt:
- # Reduce both prompt and context
- target_size = max_size
- current_size = len(prompt) + len(context)
- reduction_factor = (target_size * 0.7) / current_size
-
- if reduction_factor < 1.0:
- prompt = self._reduceText(prompt, reduction_factor)
- context = self._reduceText(context, reduction_factor)
- else:
- # Only reduce context, preserve prompt integrity
- max_context_size = max_size - len(prompt)
- if len(context) > max_context_size:
- reduction_factor = max_context_size / len(context)
- context = self._reduceText(context, reduction_factor)
-
- return prompt + "\n\n" + context if context else prompt
-
def _extractTextFromContentParts(self, extracted_content) -> str:
"""
Extract text content from ExtractionService ContentPart objects.
diff --git a/modules/services/serviceAi/subDocumentGeneration.py b/modules/services/serviceAi/subDocumentGeneration.py
index d11a1122..9af20b1e 100644
--- a/modules/services/serviceAi/subDocumentGeneration.py
+++ b/modules/services/serviceAi/subDocumentGeneration.py
@@ -1,4 +1,8 @@
+import re
+import json
import logging
+import time
+from datetime import datetime, UTC
from typing import Dict, Any, List, Optional, Tuple, Union
from modules.datamodels.datamodelChat import ChatDocument
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationType
@@ -82,8 +86,7 @@ class SubDocumentGeneration:
Unified document processing that handles both single and multi-file cases.
Always processes as multi-file structure internally.
"""
- import time
-
+
# Create progress logger
workflow = self.services.currentWorkflow
progressLogger = self.services.workflow.createProgressLogger(workflow)
@@ -102,8 +105,7 @@ class SubDocumentGeneration:
progressLogger.updateProgress(operationId, 0.1, "Generating prompt")
# Write prompt to debug file
- from modules.shared.debugLogger import writeDebugFile
- writeDebugFile(extractionPrompt, "extraction_prompt", documents)
+ self.services.utils.writeDebugFile(extractionPrompt, "extraction_prompt", documents)
# Process with unified JSON pipeline using continuation logic
aiResponse = await self.documentProcessor.processDocumentsWithContinuation(
@@ -116,10 +118,8 @@ class SubDocumentGeneration:
# Write AI response to debug file
- from modules.shared.debugLogger import writeDebugFile
- import json
response_json = json.dumps(aiResponse, indent=2, ensure_ascii=False) if isinstance(aiResponse, dict) else str(aiResponse)
- writeDebugFile(response_json, "ai_response", documents)
+ self.services.utils.writeDebugFile(response_json, "ai_response", documents)
# Validate response structure
if not self._validateUnifiedResponseStructure(aiResponse):
@@ -341,7 +341,6 @@ class SubDocumentGeneration:
requestOptions.operationType = OperationType.GENERAL
# Create context with the extracted JSON content
- import json
context = f"Extracted JSON content:\n{json.dumps(docData, indent=2)}"
request = AiCallRequest(
@@ -356,7 +355,6 @@ class SubDocumentGeneration:
if response and response.content:
# Parse the AI response as JSON
try:
- import re
result = response.content.strip()
# Extract JSON from markdown if present
@@ -488,9 +486,6 @@ Return only the JSON response.
response = await ai_service.aiObjects.call(request)
if response and response.content:
- import json
- import re
-
# Extract JSON from response
result = response.content.strip()
json_match = re.search(r'\{.*\}', result, re.DOTALL)
@@ -516,10 +511,8 @@ Return only the JSON response.
workflow = services.currentWorkflow
# Serialize payload
- import json as _json
- from datetime import datetime, UTC
ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
- content_text = _json.dumps(payload, ensure_ascii=False, indent=2)
+ content_text = json.dumps(payload, ensure_ascii=False, indent=2)
content_bytes = content_text.encode('utf-8')
# Store as file via component storage
@@ -548,7 +541,7 @@ Return only the JSON response.
"message": "Raw extraction data saved",
"status": "data",
"sequenceNr": len(getattr(workflow, 'messages', []) or []) + 1,
- "publishedAt": services.utils.getUtcTimestamp(),
+ "publishedAt": services.utils.timestampGetUtc(),
"documentsLabel": label,
"documents": []
}
diff --git a/modules/services/serviceAi/subDocumentProcessing.py b/modules/services/serviceAi/subDocumentProcessing.py
index 8fe3714a..81d355e4 100644
--- a/modules/services/serviceAi/subDocumentProcessing.py
+++ b/modules/services/serviceAi/subDocumentProcessing.py
@@ -1,4 +1,7 @@
+import json
import logging
+import re
+import time
from typing import Dict, Any, List, Optional, Tuple, Union
from modules.datamodels.datamodelChat import ChatDocument
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, ModelCapabilities, OperationType, Priority
@@ -105,11 +108,7 @@ class SubDocumentProcessing:
mergedContent = self._mergeChunkResults(chunkResults, options)
# Save merged extraction content to debug
- try:
- from modules.shared.debugLogger import writeDebugFile
- writeDebugFile(mergedContent or '', "extractionMergedText")
- except Exception:
- pass
+ self.services.utils.writeDebugFile(mergedContent or '', "extractionMergedText")
return mergedContent
@@ -198,13 +197,8 @@ class SubDocumentProcessing:
# Continue with original merged JSON instead of re-raising
# Save merged JSON extraction content to debug
- try:
- from modules.shared.debugLogger import writeDebugFile
- import json as _json
- jsonStr = _json.dumps(mergedJsonDocument, ensure_ascii=False, indent=2)
- writeDebugFile(jsonStr, "extractionMergedJson")
- except Exception:
- pass
+ jsonStr = json.dumps(mergedJsonDocument, ensure_ascii=False, indent=2)
+ self.services.utils.writeDebugFile(jsonStr, "extractionMergedJson")
return mergedJsonDocument
@@ -523,7 +517,6 @@ CONTINUATION INSTRUCTIONS:
"""Process chunks with proper mapping to preserve relationships."""
from modules.datamodels.datamodelExtraction import ChunkResult
import asyncio
- import time
# Collect all chunks that need processing with proper indexing
chunks_to_process = []
@@ -598,25 +591,8 @@ CONTINUATION INSTRUCTIONS:
)
self.services.utils.debugLogToFile(f"Image analysis result for chunk {chunk_index}: length={len(ai_result) if ai_result else 0}, preview={ai_result[:200] if ai_result else 'None'}...", "AI_SERVICE")
- # Save image extraction response to debug file - only if debug enabled
- debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
- if debug_enabled:
- try:
- import os
- from datetime import datetime, UTC
- ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
- # Use configured log directory instead of hardcoded test-chat
- from modules.shared.configuration import APP_CONFIG
- logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./")
- if not os.path.isabs(logDir):
- gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- logDir = os.path.join(gatewayDir, logDir)
- debug_root = os.path.join(logDir, 'debug')
- os.makedirs(debug_root, exist_ok=True)
- with open(os.path.join(debug_root, f"{ts}_extraction_image_chunk_{chunk_index}.txt"), "w", encoding="utf-8") as f:
- f.write(f"EXTRACTION IMAGE RESPONSE:\n{ai_result if ai_result else 'No response'}\n")
- except Exception:
- pass
+ # Save image extraction response to debug file
+ self.services.utils.writeDebugFile(ai_result or 'No response', f"extraction_image_chunk_{chunk_index}")
# Check if result is empty or None
if not ai_result or not ai_result.strip():
@@ -630,8 +606,6 @@ CONTINUATION INSTRUCTIONS:
# If generating JSON, clean image analysis result
if generate_json:
try:
- import json
- import re
# Clean the response - remove markdown code blocks if present
cleaned_result = ai_result.strip()
@@ -720,18 +694,12 @@ CONTINUATION INSTRUCTIONS:
self.services.utils.debugLogToFile(f"EXTRACTION RESPONSE LENGTH: {len(ai_result) if ai_result else 0} characters", "AI_SERVICE")
# Save extraction prompt and response to debug
- try:
- from modules.shared.debugLogger import writeDebugFile
- writeDebugFile(augmented_prompt, f"extraction-Chunk{chunk_index}-Prompt")
- writeDebugFile(ai_result or '', f"extraction-Chunk{chunk_index}-Response")
- except Exception:
- pass
+ self.services.utils.writeDebugFile(augmented_prompt, f"extraction-Chunk{chunk_index}-Prompt")
+ self.services.utils.writeDebugFile(ai_result or '', f"extraction-Chunk{chunk_index}-Response")
# If generating JSON, validate the response
if generate_json:
try:
- import json
- import re
# Clean the response - remove markdown code blocks if present
cleaned_result = ai_result.strip()
@@ -821,18 +789,12 @@ CONTINUATION INSTRUCTIONS:
self.services.utils.debugLogToFile(f"EXTRACTION RESPONSE LENGTH: {len(ai_result) if ai_result else 0} characters", "AI_SERVICE")
# Save extraction prompt and response to debug
- try:
- from modules.shared.debugLogger import writeDebugFile
- writeDebugFile(augmented_prompt_text, f"extractionChunk{chunk_index}-Prompt")
- writeDebugFile(ai_result or '', f"extractionChunk{chunk_index}-Response")
- except Exception:
- pass
+ self.services.utils.writeDebugFile(augmented_prompt_text, f"extractionChunk{chunk_index}-Prompt")
+ self.services.utils.writeDebugFile(ai_result or '', f"extractionChunk{chunk_index}-Response")
# If generating JSON, validate the response
if generate_json:
try:
- import json
- import re
# Clean the response - remove markdown code blocks and extra formatting
cleaned_result = ai_result.strip()
@@ -1103,7 +1065,6 @@ CONTINUATION INSTRUCTIONS:
options: Optional[AiCallOptions] = None
) -> Dict[str, Any]:
"""Merge chunk results in JSON mode - returns structured JSON document."""
- import json
if not chunkResults:
return {"metadata": {"title": "Empty Document"}, "sections": []}
diff --git a/modules/services/serviceAi/subUtilities.py b/modules/services/serviceAi/subUtilities.py
deleted file mode 100644
index 64508d71..00000000
--- a/modules/services/serviceAi/subUtilities.py
+++ /dev/null
@@ -1,319 +0,0 @@
-import logging
-from typing import Dict, Any, List, Optional, Tuple, Union
-from modules.datamodels.datamodelAi import ModelCapabilities, AiCallOptions
-
-logger = logging.getLogger(__name__)
-
-
-class SubUtilities:
- """Utility functions for text processing, debugging, and helper operations."""
-
- def __init__(self, services):
- """Initialize utilities service.
-
- Args:
- services: Service center instance for accessing other services
- """
- self.services = services
-
- def _writeTraceLog(self, contextText: str, data: Any) -> None:
- """Write raw data to the central trace log file without truncation."""
- try:
- import os
- import json
- from datetime import datetime, UTC
- # Only write if logger is in debug mode
- if logger.level > logging.DEBUG:
- return
- # Get log directory from configuration via service center if possible
- logDir = None
- try:
- logDir = self.services.utils.configGet("APP_LOGGING_LOG_DIR", "./")
- except Exception:
- pass
- if not logDir:
- logDir = "./"
- if not os.path.isabs(logDir):
- # Make it relative to gateway directory
- gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
- logDir = os.path.join(gatewayDir, logDir)
- os.makedirs(logDir, exist_ok=True)
- traceFile = os.path.join(logDir, "log_trace.log")
- timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
- traceEntry = f"[{timestamp}] {contextText}\n" + ("=" * 80) + "\n"
- if data is None:
- traceEntry += "No data provided\n"
- else:
- # Prefer exact text; if dict/list, pretty print JSON
- try:
- if isinstance(data, (dict, list)):
- traceEntry += f"JSON Data:\n{json.dumps(data, indent=2, ensure_ascii=False)}\n"
- else:
- text = str(data)
- traceEntry += f"Text Data:\n{text}\n"
- except Exception:
- traceEntry += f"Data (fallback): {str(data)}\n"
- traceEntry += ("=" * 80) + "\n\n"
- with open(traceFile, "a", encoding="utf-8") as f:
- f.write(traceEntry)
- except Exception:
- # Swallow to avoid recursive logging issues
- pass
-
- def _writeAiResponseDebug(self, label: str, content: str, partIndex: int = 1, modelName: str = None, continuation: bool = None) -> None:
- """Persist raw AI response parts for debugging under configured log directory - only if debug enabled."""
- try:
- # Check if debug logging is enabled
- debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
- if not debug_enabled:
- return
-
- import os
- from datetime import datetime, UTC
- # Use configured log directory instead of hardcoded test-chat
- from modules.shared.configuration import APP_CONFIG
- logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./")
- if not os.path.isabs(logDir):
- gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
- logDir = os.path.join(gatewayDir, logDir)
- outDir = os.path.join(logDir, 'debug')
- os.makedirs(outDir, exist_ok=True)
- ts = datetime.now(UTC).strftime('%Y%m%d-%H%M%S-%f')[:-3]
- suffix = []
- if partIndex is not None:
- suffix.append(f"part{partIndex}")
- if continuation is not None:
- suffix.append(f"cont_{str(continuation).lower()}")
- if modelName:
- safeModel = ''.join(c if c.isalnum() or c in ('-', '_') else '-' for c in modelName)
- suffix.append(safeModel)
- suffixStr = ('_' + '_'.join(suffix)) if suffix else ''
- fname = f"{ts}_{label}{suffixStr}.txt"
- fpath = os.path.join(outDir, fname)
- with open(fpath, 'w', encoding='utf-8') as f:
- f.write(content or '')
- except Exception:
- # Do not raise; best-effort debug write
- pass
-
- def _exceedsTokenLimit(self, text: str, model: ModelCapabilities, safety_margin: float) -> bool:
- """
- Check if text exceeds model token limit with safety margin.
- """
- # Simple character-based estimation (4 chars per token)
- estimated_tokens = len(text) // 4
- max_tokens = int(model.maxTokens * (1 - safety_margin))
- return estimated_tokens > max_tokens
-
- def _reduceText(self, text: str, reduction_factor: float) -> str:
- """
- Reduce text size by the specified factor.
- """
- if reduction_factor >= 1.0:
- return text
-
- target_length = int(len(text) * reduction_factor)
- return text[:target_length] + "... [reduced]"
-
- def _extractTextFromContentParts(self, extracted_content) -> str:
- """
- Extract text content from ExtractionService ContentPart objects.
- """
- if not extracted_content or not hasattr(extracted_content, 'parts'):
- return ""
-
- text_parts = []
- for part in extracted_content.parts:
- if hasattr(part, 'typeGroup') and part.typeGroup in ['text', 'table', 'structure']:
- if hasattr(part, 'data') and part.data:
- text_parts.append(part.data)
-
- return "\n\n".join(text_parts)
-
- def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str:
- """
- Build full prompt by replacing placeholders with their content.
- Uses the new {{KEY:placeholder}} format.
- """
- if not placeholders:
- return prompt
-
- full_prompt = prompt
- for placeholder, content in placeholders.items():
- # Replace both old format {{placeholder}} and new format {{KEY:placeholder}}
- full_prompt = full_prompt.replace(f"{{{{{placeholder}}}}}", content)
- full_prompt = full_prompt.replace(f"{{{{KEY:{placeholder}}}}}", content)
-
- return full_prompt
-
- def _reducePlanningPrompt(
- self,
- full_prompt: str,
- placeholders: Optional[Dict[str, str]],
- model: ModelCapabilities,
- options: AiCallOptions
- ) -> str:
- """
- Reduce planning prompt size by summarizing placeholders while preserving prompt structure.
- """
- if not placeholders:
- return self._reduceText(full_prompt, 0.7)
-
- # Reduce placeholders while preserving prompt
- reduced_placeholders = {}
- for placeholder, content in placeholders.items():
- if len(content) > 1000: # Only reduce long content
- reduction_factor = 0.7
- reduced_content = self._reduceText(content, reduction_factor)
- reduced_placeholders[placeholder] = reduced_content
- else:
- reduced_placeholders[placeholder] = content
-
- return self._buildPromptWithPlaceholders(full_prompt, reduced_placeholders)
-
- def _reduceTextPrompt(
- self,
- prompt: str,
- context: str,
- model: ModelCapabilities,
- options: AiCallOptions
- ) -> str:
- """
- Reduce text prompt size using typeGroup-aware chunking and merging.
- """
- max_size = int(model.maxTokens * (1 - options.safetyMargin))
-
- if options.compressPrompt:
- # Reduce both prompt and context
- target_size = max_size
- current_size = len(prompt) + len(context)
- reduction_factor = (target_size * 0.7) / current_size
-
- if reduction_factor < 1.0:
- prompt = self._reduceText(prompt, reduction_factor)
- context = self._reduceText(context, reduction_factor)
- else:
- # Only reduce context, preserve prompt integrity
- max_context_size = max_size - len(prompt)
- if len(context) > max_context_size:
- reduction_factor = max_context_size / len(context)
- context = self._reduceText(context, reduction_factor)
-
- return prompt + "\n\n" + context if context else prompt
-
- async def _compressContent(self, content: str, targetSize: int, contentType: str) -> str:
- """Compress content to target size."""
- if len(content.encode("utf-8")) <= targetSize:
- return content
-
- try:
- compressionPrompt = f"""
- Komprimiere den folgenden {contentType} auf maximal {targetSize} Zeichen,
- behalte aber alle wichtigen Informationen bei:
-
- {content}
-
- Gib nur den komprimierten Inhalt zurück, ohne zusätzliche Erklärungen.
- """
-
- # Service must not call connectors directly; use simple truncation fallback here
- data = content.encode("utf-8")
- return data[:targetSize].decode("utf-8", errors="ignore") + "... [truncated]"
- except Exception as e:
- logger.warning(f"AI compression failed, using truncation: {str(e)}")
- return content[:targetSize] + "... [truncated]"
-
- def _getModelCapabilitiesForContent(self, prompt: str, documents: Optional[List], options: AiCallOptions) -> Dict[str, int]:
- """
- Get model capabilities for content processing, including appropriate size limits for chunking.
- """
- # Estimate total content size
- prompt_size = len(prompt.encode('utf-8'))
- document_size = 0
- if documents:
- # Rough estimate of document content size
- for doc in documents:
- document_size += getattr(doc, 'fileSize', 0) or 0
-
- total_size = prompt_size + document_size
-
- # Use AiObjects to select the best model for this content size
- # We'll simulate the model selection by checking available models
- from modules.interfaces.interfaceAiObjects import aiModels
-
- # Find the best model for this content size and operation
- best_model = None
- best_context_length = 0
-
- for model_name, model_info in aiModels.items():
- context_length = model_info.get("contextLength", 0)
-
- # Skip models with no context length or too small for content
- if context_length == 0:
- continue
-
- # Check if model supports the operation type
- capabilities = model_info.get("capabilities", [])
- from modules.datamodels.datamodelAi import OperationType
- if options.operationType == OperationType.IMAGE_ANALYSIS and "image_analysis" not in capabilities:
- continue
- elif options.operationType == OperationType.IMAGE_GENERATION and "image_generation" not in capabilities:
- continue
- elif options.operationType == OperationType.WEB_RESEARCH and "web_search" not in capabilities:
- continue
- elif "text_generation" not in capabilities:
- continue
-
- # Prefer models that can handle the content without chunking, but allow chunking if needed
- if context_length >= total_size * 0.8: # 80% of content size
- if context_length > best_context_length:
- best_model = model_info
- best_context_length = context_length
- elif best_model is None: # Fallback to largest available model
- if context_length > best_context_length:
- best_model = model_info
- best_context_length = context_length
-
- # Fallback to a reasonable default if no model found
- if best_model is None:
- best_model = {
- "contextLength": 128000, # GPT-4o default
- "llmName": "gpt-4o"
- }
-
- # Calculate appropriate sizes
- # Convert tokens to bytes (rough estimate: 1 token ≈ 4 characters)
- context_length_bytes = int(best_model["contextLength"] * 4)
- max_context_bytes = int(context_length_bytes * 0.9) # 90% of context length
- text_chunk_size = int(max_context_bytes * 0.7) # 70% of max context for text chunks
- image_chunk_size = int(max_context_bytes * 0.8) # 80% of max context for image chunks
-
- logger.debug(f"Selected model: {best_model.get('llmName', 'unknown')} with context length: {best_model['contextLength']}")
- logger.debug(f"Content size: {total_size} bytes, Max context: {max_context_bytes} bytes")
- logger.debug(f"Text chunk size: {text_chunk_size} bytes, Image chunk size: {image_chunk_size} bytes")
-
- return {
- "maxContextBytes": max_context_bytes,
- "textChunkSize": text_chunk_size,
- "imageChunkSize": image_chunk_size
- }
-
- def _getModelsForOperation(self, operation_type: str, options: AiCallOptions) -> List[ModelCapabilities]:
- """
- Get models capable of handling the specific operation with capability filtering.
- """
- # Use the actual AI objects model selection instead of hardcoded default
- if hasattr(self, 'aiObjects') and self.aiObjects:
- # Let AiObjects handle the model selection
- return []
- else:
- # Fallback to default model if AiObjects not available
- default_model = ModelCapabilities(
- name="default",
- maxTokens=4000,
- capabilities=["text", "reasoning"] if operation_type == "planning" else ["text"],
- costPerToken=0.001,
- processingTime=1.0,
- isAvailable=True
- )
- return [default_model]
diff --git a/modules/services/serviceExtraction/subPipeline.py b/modules/services/serviceExtraction/subPipeline.py
index bb654b5e..645b9bdf 100644
--- a/modules/services/serviceExtraction/subPipeline.py
+++ b/modules/services/serviceExtraction/subPipeline.py
@@ -3,7 +3,6 @@ import logging
import os
from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart
-from modules.shared.configuration import APP_CONFIG
from .subUtils import makeId
from .subRegistry import ExtractorRegistry, ChunkerRegistry
from .merging.mergerText import TextMerger
diff --git a/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py b/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py
index bee1b82f..d5d28cf8 100644
--- a/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py
+++ b/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py
@@ -4,8 +4,14 @@ Base renderer class for all format renderers.
from abc import ABC, abstractmethod
from typing import Dict, Any, Tuple, List
-import logging
import json
+import logging
+import re
+from datetime import datetime, UTC
+import base64
+import io
+from PIL import Image
+from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationType
logger = logging.getLogger(__name__)
@@ -188,7 +194,6 @@ class BaseRenderer(ABC):
# Basic base64 validation
try:
- import base64
base64.b64decode(base64_data, validate=True)
return True
except Exception as e:
@@ -201,10 +206,6 @@ class BaseRenderer(ABC):
This is a helper method that format-specific renderers can use.
"""
try:
- import base64
- from PIL import Image
- import io
-
# Decode base64 data
image_data = base64.b64decode(base64_data)
image = Image.open(io.BytesIO(image_data))
@@ -221,10 +222,6 @@ class BaseRenderer(ABC):
Returns the resized image as base64 string.
"""
try:
- import base64
- from PIL import Image
- import io
-
# Decode base64 data
image_data = base64.b64decode(base64_data)
image = Image.open(io.BytesIO(image_data))
@@ -305,7 +302,6 @@ class BaseRenderer(ABC):
"""Format timestamp for display."""
if timestamp:
return timestamp
- from datetime import datetime, UTC
return datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
# ===== GENERIC AI STYLING HELPERS =====
@@ -328,7 +324,6 @@ class BaseRenderer(ABC):
return default_styles
try:
- from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationType
request_options = AiCallOptions()
request_options.operationType = OperationType.GENERAL
@@ -342,15 +337,8 @@ class BaseRenderer(ABC):
response = await ai_service.aiObjects.call(request)
# Save styling prompt and response to debug
- try:
- from modules.shared.debugLogger import writeDebugFile
- writeDebugFile(style_template, "rendererStylingPrompt")
- writeDebugFile(response.content or '', "rendererStylingResponse")
- except Exception:
- pass
-
- import json
- import re
+ self.services.utils.writeDebugFile(style_template, "rendererStylingPrompt")
+ self.services.utils.writeDebugFile(response.content or '', "rendererStylingResponse")
# Clean and parse JSON
result = response.content.strip() if response and response.content else ""
diff --git a/modules/services/serviceGeneration/renderers/rendererImage.py b/modules/services/serviceGeneration/renderers/rendererImage.py
index dab0496c..9da52466 100644
--- a/modules/services/serviceGeneration/renderers/rendererImage.py
+++ b/modules/services/serviceGeneration/renderers/rendererImage.py
@@ -60,11 +60,7 @@ class RendererImage(BaseRenderer):
image_prompt = await self._create_image_generation_prompt(extracted_content, document_title, user_prompt, ai_service)
# Save image generation prompt to debug
- try:
- from modules.shared.debugLogger import writeDebugFile
- writeDebugFile(image_prompt, "rendererImageGenerationPrompt")
- except Exception:
- pass
+ ai_service.services.utils.writeDebugFile(image_prompt, "rendererImageGenerationPrompt")
# Generate image using AI
image_result = await ai_service.aiObjects.generateImage(
@@ -75,11 +71,7 @@ class RendererImage(BaseRenderer):
)
# Save image generation response to debug
- try:
- from modules.shared.debugLogger import writeDebugFile
- writeDebugFile(str(image_result), "rendererImageGenerationResponse")
- except Exception:
- pass
+ ai_service.services.utils.writeDebugFile(str(image_result), "rendererImageGenerationResponse")
# Extract base64 image data from result
if image_result and image_result.get("success", False):
diff --git a/modules/services/serviceGeneration/subPromptBuilder.py b/modules/services/serviceGeneration/subPromptBuilder.py
index 716ebac0..c59909e2 100644
--- a/modules/services/serviceGeneration/subPromptBuilder.py
+++ b/modules/services/serviceGeneration/subPromptBuilder.py
@@ -391,25 +391,8 @@ DO NOT return a schema description - return actual extracted content in the JSON
# Combine all parts
finalPrompt = f"{genericIntro}\n\n{formatGuidelines}".strip()
- # Save extraction prompt to debug file - only if debug enabled
- try:
- debug_enabled = services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
- if debug_enabled:
- import os
- from datetime import datetime, UTC
- ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
- # Use configured log directory instead of hardcoded test-chat
- from modules.shared.configuration import APP_CONFIG
- logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./")
- if not os.path.isabs(logDir):
- gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- logDir = os.path.join(gatewayDir, logDir)
- debug_root = os.path.join(logDir, 'debug')
- os.makedirs(debug_root, exist_ok=True)
- with open(os.path.join(debug_root, f"{ts}_extraction_prompt.txt"), "w", encoding="utf-8") as f:
- f.write(finalPrompt)
- except Exception:
- pass
+ # Save extraction prompt to debug file
+ services.utils.writeDebugFile(finalPrompt, "extraction_prompt")
return finalPrompt
diff --git a/modules/services/serviceNormalization/mainServiceNormalization.py b/modules/services/serviceNormalization/mainServiceNormalization.py
index 2a932723..6748be72 100644
--- a/modules/services/serviceNormalization/mainServiceNormalization.py
+++ b/modules/services/serviceNormalization/mainServiceNormalization.py
@@ -116,8 +116,8 @@ class NormalizationService:
if "canonicalHeaders" not in mapping:
mapping["canonicalHeaders"] = canonicalSpec.get("canonicalHeaders", [])
- # debug artifact
- self._writeDebugArtifact("mapping.json", mapping)
+        # debug artifact (written via UtilsService.writeDebugArtifact)
+ self.services.utils.writeDebugArtifact("mapping.json", mapping)
return mapping
def applyMapping(self, mergedJson: Dict[str, Any], mappingSpec: Dict[str, Any]) -> Dict[str, Any]:
@@ -198,8 +198,8 @@ class NormalizationService:
]
}
- # debug artifact
- self._writeDebugArtifact("canonical_merged.json", canonical)
+        # debug artifact (written via UtilsService.writeDebugArtifact)
+ self.services.utils.writeDebugArtifact("canonical_merged.json", canonical)
return canonical
def validateCanonical(self, canonicalJson: Dict[str, Any]) -> Dict[str, Any]:
@@ -218,7 +218,7 @@ class NormalizationService:
"rowCount": len(rows),
"success": len(rows) > 0
}
- self._writeDebugArtifact("normalization_report.json", report)
+ self.services.utils.writeDebugArtifact("normalization_report.json", report)
return report
# Internal helpers
@@ -239,32 +239,3 @@ class NormalizationService:
text = text.upper()
return text
- def _writeDebugArtifact(self, fileName: str, obj: Any) -> None:
- try:
- debugEnabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
- if not debugEnabled:
- return
- # Use configured log directory instead of hardcoded test-chat
- from modules.shared.configuration import APP_CONFIG
- logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./")
- if not os.path.isabs(logDir):
- gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- logDir = os.path.join(gatewayDir, logDir)
- root = os.path.join(logDir, 'debug')
- os.makedirs(root, exist_ok=True)
- # Prefix timestamp for files that are frequently overwritten
- ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
- if fileName in ("mapping.json", "canonical_merged.json"):
- outName = f"{ts}_{fileName}"
- else:
- outName = fileName
- path = os.path.join(root, outName)
- with open(path, "w", encoding="utf-8") as f:
- if isinstance(obj, (dict, list)):
- f.write(json.dumps(obj, ensure_ascii=False, indent=2))
- else:
- f.write(str(obj))
- except Exception:
- pass
-
-
diff --git a/modules/services/serviceUtils/mainServiceUtils.py b/modules/services/serviceUtils/mainServiceUtils.py
index caa07528..6d166a05 100644
--- a/modules/services/serviceUtils/mainServiceUtils.py
+++ b/modules/services/serviceUtils/mainServiceUtils.py
@@ -5,11 +5,11 @@ Provides centralized access to configuration, events, and other utilities.
import logging
import os
-from typing import Any, Optional, Dict, Callable
+from typing import Any, Optional, Dict, Callable, List
from modules.shared.configuration import APP_CONFIG
from modules.shared.eventManagement import eventManager
from modules.shared.timezoneUtils import get_utc_timestamp
-from modules.security.tokenManager import TokenManager
+from modules.shared import jsonUtils
logger = logging.getLogger(__name__)
@@ -19,6 +19,8 @@ class UtilsService:
def __init__(self, services):
self.services = services
+ # ===== Event handling =====
+
def eventRegisterCron(self, job_id: str, func: Callable, cron_kwargs: Dict[str, Any],
replace_existing: bool = True, coalesce: bool = True,
max_instances: int = 1, misfire_grace_time: int = 1800):
@@ -113,7 +115,7 @@ class UtilsService:
logger.error(f"Error getting config '{key}': {str(e)}")
return default
- def getUtcTimestamp(self) -> float:
+ def timestampGetUtc(self) -> float:
"""
Get current UTC timestamp.
@@ -126,62 +128,83 @@ class UtilsService:
logger.error(f"Error getting UTC timestamp: {str(e)}")
return 0.0
- def getFreshConnectionToken(self, connectionId: str):
+ # ===== Debug Tools =====
+
+ def writeDebugFile(self, content: str, fileType: str, documents: Optional[List] = None) -> None:
"""
- Get a fresh token for a specific connection.
-
- Args:
- connectionId: ID of the connection to get token for
-
- Returns:
- Token object or None if not found/expired
+ Wrapper to write debug files via shared debugLogger.
+ Mirrors writeDebugFile() in modules.shared.debugLogger and keeps a single call site.
"""
try:
- return TokenManager().getFreshToken(connectionId)
- except Exception as e:
- logger.error(f"Error getting fresh token for connection {connectionId}: {str(e)}")
- return None
+ from modules.shared.debugLogger import writeDebugFile as _writeDebugFile
+ _writeDebugFile(content, fileType, documents)
+ except Exception:
+ # Silent fail to never break main flow
+ pass
def debugLogToFile(self, message: str, context: str = "DEBUG"):
"""
- Log debug message to file if debug logging is enabled.
-
- Args:
- message: Debug message to log
- context: Context identifier for the debug message
+ Wrapper to log debug messages via shared debugLogger.
+ Mirrors debugLogToFile() in modules.shared.debugLogger.
"""
try:
- # Check if debug logging is enabled
- debug_enabled = self.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
- if not debug_enabled:
- return
-
- # Get debug directory
- # Use configured log directory instead of hardcoded test-chat
- logDir = self.configGet("APP_LOGGING_LOG_DIR", "./")
- if not os.path.isabs(logDir):
- gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- logDir = os.path.join(gatewayDir, logDir)
- debug_dir = os.path.join(logDir, 'debug')
- if not os.path.isabs(debug_dir):
- # If relative path, make it relative to the gateway directory
- gateway_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
- debug_dir = os.path.join(gateway_dir, debug_dir)
-
- # Ensure debug directory exists
- os.makedirs(debug_dir, exist_ok=True)
-
- # Create debug file path
- debug_file = os.path.join(debug_dir, "debug_workflow.log")
-
- # Format the debug entry
- timestamp = self.getUtcTimestamp()
- debug_entry = f"[{timestamp}] [{context}] {message}\n"
-
- # Write to debug file
- with open(debug_file, "a", encoding="utf-8") as f:
- f.write(debug_entry)
-
- except Exception as e:
- # Don't log debug errors to avoid recursion
- pass
\ No newline at end of file
+ from modules.shared.debugLogger import debugLogToFile as _debugLogToFile
+ _debugLogToFile(message, context)
+ except Exception:
+ # Silent fail to never break main flow
+ pass
+
+ def storeDebugMessageAndDocuments(self, message, currentUser):
+ """
+ Wrapper to store debug messages and documents via shared debugLogger.
+ Mirrors storeDebugMessageAndDocuments() in modules.shared.debugLogger.
+ """
+ try:
+ from modules.shared.debugLogger import storeDebugMessageAndDocuments as _storeDebugMessageAndDocuments
+ _storeDebugMessageAndDocuments(message, currentUser)
+ except Exception:
+ # Silent fail to never break main flow
+ pass
+
+ def writeDebugArtifact(self, fileName: str, obj: Any):
+ """
+ Backward-compatible wrapper that now writes via writeDebugFile.
+ Accepts an object (dict/list/str), serializes if needed, and writes to debug folder.
+ """
+ try:
+ # Serialize objects to JSON when applicable
+ if isinstance(obj, (dict, list)):
+ import json
+ content = json.dumps(obj, ensure_ascii=False, indent=2)
+ else:
+ content = str(obj)
+
+ # Delegate to shared writeDebugFile; preserve provided file extension in fileName
+ from modules.shared.debugLogger import writeDebugFile as _writeDebugFile
+ _writeDebugFile(content, fileName)
+ except Exception:
+ # Silent fail to never break main flow
+ pass
+
+ # ===== JSON utility wrappers =====
+
+ def jsonStripCodeFences(self, text: str) -> str:
+ return jsonUtils.stripCodeFences(text)
+
+ def jsonExtractFirstBalanced(self, text: str) -> str:
+ return jsonUtils.extractFirstBalancedJson(text)
+
+ def jsonNormalizeText(self, text: str) -> str:
+ return jsonUtils.normalizeJsonText(text)
+
+ def jsonExtractString(self, text: str) -> str:
+ return jsonUtils.extractJsonString(text)
+
+ def jsonTryParse(self, text) -> tuple:
+ return jsonUtils.tryParseJson(text)
+
+ def jsonParseOrRaise(self, text):
+ return jsonUtils.parseJsonOrRaise(text)
+
+ def jsonMergeRootLists(self, parts):
+ return jsonUtils.mergeRootLists(parts)
\ No newline at end of file
diff --git a/modules/services/serviceWorkflow/mainServiceWorkflow.py b/modules/services/serviceWorkflow/mainServiceWorkflow.py
index 1e7b9ae1..d9aaaa9f 100644
--- a/modules/services/serviceWorkflow/mainServiceWorkflow.py
+++ b/modules/services/serviceWorkflow/mainServiceWorkflow.py
@@ -293,6 +293,21 @@ Please provide a comprehensive summary of this conversation."""
logger.error(f"Error parsing connection reference: {str(e)}")
return None
+ def getFreshConnectionToken(self, connectionId: str):
+ """Get a fresh token for a specific connection (moved from UtilsService).
+
+ Args:
+ connectionId: ID of the connection to get token for
+
+ Returns:
+ Token object or None if not found/expired
+ """
+ try:
+ return TokenManager().getFreshToken(connectionId)
+ except Exception as e:
+ logger.error(f"Error getting fresh token for connection {connectionId}: {str(e)}")
+ return None
+
def getFileInfo(self, fileId: str) -> Dict[str, Any]:
"""Get file information"""
file_item = self.interfaceDbComponent.getFile(fileId)
diff --git a/modules/shared/debugLogger.py b/modules/shared/debugLogger.py
index 947bf816..a4fd0032 100644
--- a/modules/shared/debugLogger.py
+++ b/modules/shared/debugLogger.py
@@ -4,24 +4,36 @@ Writes files chronologically to the configured log directory with sequential num
"""
import os
from datetime import datetime, UTC
-from typing import List, Optional
+from typing import List, Optional, Any
from modules.shared.configuration import APP_CONFIG
-def _getDebugDir() -> str:
- """Get the debug directory path from configuration."""
- # Get log directory from config (same as used by main logging system)
+def _resolveLogDir() -> str:
+ """Resolve the absolute log directory from configuration."""
logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./")
if not os.path.isabs(logDir):
# If relative path, make it relative to the gateway directory
gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
logDir = os.path.join(gatewayDir, logDir)
+ return logDir
+
+def _ensureDir(path: str) -> None:
+ """Create directory if it does not exist."""
+ os.makedirs(path, exist_ok=True)
+
+def _isDebugEnabled() -> bool:
+ """Check if debug workflow logging is enabled."""
+ return APP_CONFIG.get("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
+
+def _getDebugDir() -> str:
+ """Get the debug directory path from configuration."""
+ # Get log directory from config (same as used by main logging system)
+ logDir = _resolveLogDir()
# Create debug subdirectory within the log directory
- debugDir = os.path.join(logDir, 'debug')
+ debugDir = os.path.join(logDir, 'debug/prompts')
return debugDir
-
def _getNextSequenceNumber() -> int:
"""Get the next sequence number by counting existing files."""
debugDir = _getDebugDir()
@@ -38,6 +50,7 @@ def writeDebugFile(content: str, fileType: str, documents: Optional[List] = None
Write debug content to a file with sequential numbering.
Writes the content as-is since it's already the final integrated prompt.
Includes document list labels for tracing enhancement.
+ Only writes if debug logging is enabled via APP_DEBUG_CHAT_WORKFLOW_ENABLED config.
Args:
content: The main content to write (already integrated)
@@ -45,15 +58,23 @@ def writeDebugFile(content: str, fileType: str, documents: Optional[List] = None
documents: Optional list of documents for tracing
"""
try:
+ # Check if debug logging is enabled
+ if not _isDebugEnabled():
+ return
+
debugDir = _getDebugDir()
- os.makedirs(debugDir, exist_ok=True)
+ _ensureDir(debugDir)
seqNum = _getNextSequenceNumber()
ts = datetime.now(UTC).strftime('%Y%m%d-%H%M%S')
# Add 3-digit sequence number for uniqueness
tsWithSeq = f"{ts}-{seqNum:03d}"
- filename = f"{tsWithSeq}-{fileType}.txt"
+ # Allow callers to pass an extension; if none, default to .txt
+ if "." in (fileType or ""):
+ filename = f"{tsWithSeq}-{fileType}"
+ else:
+ filename = f"{tsWithSeq}-{fileType}.txt"
filepath = os.path.join(debugDir, filename)
# Build content with document tracing
@@ -73,7 +94,170 @@ def writeDebugFile(content: str, fileType: str, documents: Optional[List] = None
# Write the content with document tracing
with open(filepath, 'w', encoding='utf-8') as f:
f.write(debug_content)
-
except Exception as e:
- # Silent fail - don't break the main flow
+ # Don't log debug errors to avoid recursion
pass
+
def debugLogToFile(message: str, context: str = "DEBUG") -> None:
    """
    Append a timestamped debug message to the workflow debug log file.

    No-op unless debug logging is enabled (APP_DEBUG_CHAT_WORKFLOW_ENABLED).
    Entries are appended to {log_dir}/debug/debug_workflow.log as
    "[timestamp] [context] message" lines.

    Args:
        message: Debug message to log
        context: Context identifier for the debug message
    """
    try:
        # Bail out early when debug logging is switched off.
        if not _isDebugEnabled():
            return

        # Resolve and create the debug directory under the main log dir.
        debug_dir = os.path.join(_resolveLogDir(), 'debug')
        _ensureDir(debug_dir)
        debug_file = os.path.join(debug_dir, "debug_workflow.log")

        # Lazy import to avoid import cycles at module load time.
        from modules.shared.timezoneUtils import get_utc_timestamp
        entry = f"[{get_utc_timestamp()}] [{context}] {message}\n"

        with open(debug_file, "a", encoding="utf-8") as f:
            f.write(entry)

    except Exception:
        # Swallow all errors: debug logging must never break (or recursively
        # log into) the main flow.
        pass
+
def storeDebugMessageAndDocuments(message, currentUser) -> None:
    """
    Persist a chat message and its documents (metadata plus raw bytes) for
    debugging.

    Layout: {log_dir}/debug/messages/{timestamp}_m_{round}_{task}_{action}/
        message.json            - field-by-field serialization of the message
        message_text.txt        - the plain message text, when present
        {label}/document_###_metadata.json
        {label}/document_###_{original_name}   (raw file bytes)

    All failures are swallowed so debug storage never breaks the main flow.

    Args:
        message: ChatMessage object to store
        currentUser: Current user for component interface access
    """
    try:
        import json
        from datetime import datetime, UTC

        # Base folder for stored messages.
        debug_root = os.path.join(_resolveLogDir(), 'debug', 'messages')
        _ensureDir(debug_root)

        # Millisecond-precision timestamp keeps folder names unique.
        timestamp = datetime.now(UTC).strftime('%Y%m%d-%H%M%S-%f')[:-3]

        # Folder name encodes the message's workflow position; "0" is used
        # only when a counter is genuinely absent.
        round_str = str(message.roundNumber) if message.roundNumber is not None else "0"
        task_str = str(message.taskNumber) if message.taskNumber is not None else "0"
        action_str = str(message.actionNumber) if message.actionNumber is not None else "0"
        message_path = os.path.join(
            debug_root, f"{timestamp}_m_{round_str}_{task_str}_{action_str}"
        )
        _ensureDir(message_path)

        # Serialize field-by-field instead of model_dump() to stay
        # compatible across pydantic versions.
        message_dict = {
            "id": message.id,
            "workflowId": message.workflowId,
            "parentMessageId": message.parentMessageId,
            "message": message.message,
            "role": message.role,
            "status": message.status,
            "sequenceNr": message.sequenceNr,
            "publishedAt": message.publishedAt,
            "roundNumber": message.roundNumber,
            "taskNumber": message.taskNumber,
            "actionNumber": message.actionNumber,
            "documentsLabel": message.documentsLabel,
            "actionId": message.actionId,
            "actionMethod": message.actionMethod,
            "actionName": message.actionName,
            "success": message.success,
            "documents": []
        }
        with open(os.path.join(message_path, "message.json"), "w", encoding="utf-8") as f:
            json.dump(message_dict, f, indent=2, ensure_ascii=False, default=str)

        # Store the message body as plain text for quick inspection.
        if message.message:
            with open(os.path.join(message_path, "message_text.txt"), "w", encoding="utf-8") as f:
                f.write(str(message.message))

        if message.documents:
            # NOTE(review): the label comes from the *message*, not from each
            # document, so every document lands in the same subfolder. If
            # per-document labels were intended, doc-level data is needed here
            # - confirm against ChatMessage/ActionDocument.
            label = message.documentsLabel or 'default'
            # Sanitize the label for use as a directory name.
            safe_label = "".join(
                c for c in str(label) if c.isalnum() or c in (' ', '-', '_')
            ).rstrip()
            safe_label = safe_label.replace(' ', '_') or "default"

            label_folder = os.path.join(message_path, safe_label)
            _ensureDir(label_folder)

            for i, doc in enumerate(message.documents):
                doc_meta = {
                    "id": doc.id,
                    "messageId": doc.messageId,
                    "fileId": doc.fileId,
                    "fileName": doc.fileName,
                    "fileSize": doc.fileSize,
                    "mimeType": doc.mimeType,
                    "roundNumber": doc.roundNumber,
                    "taskNumber": doc.taskNumber,
                    "actionNumber": doc.actionNumber,
                    "actionId": doc.actionId
                }
                meta_path = os.path.join(label_folder, f"document_{i+1:03d}_metadata.json")
                with open(meta_path, "w", encoding="utf-8") as f:
                    json.dump(doc_meta, f, indent=2, ensure_ascii=False, default=str)

                # Store the raw file bytes next to the metadata.
                try:
                    # Lazy import to avoid circular deps at module load.
                    from modules.interfaces import interfaceDbComponentObjects as comp
                    componentInterface = comp.getInterface(currentUser)
                    file_bytes = componentInterface.getFileData(doc.fileId)
                    if file_bytes:
                        # basename() guards against path traversal via fileName.
                        safe_name = os.path.basename(doc.fileName or f"document_{i+1:03d}")
                        doc_file_path = os.path.join(
                            label_folder, f"document_{i+1:03d}_" + safe_name
                        )
                        with open(doc_file_path, "wb") as df:
                            df.write(file_bytes)
                except Exception:
                    # Missing bytes are tolerable for a debug dump.
                    pass

    except Exception:
        # Silent fail - don't break main flow
        pass
+
diff --git a/modules/shared/jsonUtils.py b/modules/shared/jsonUtils.py
new file mode 100644
index 00000000..8a236182
--- /dev/null
+++ b/modules/shared/jsonUtils.py
@@ -0,0 +1,137 @@
+import json
+import logging
+from typing import Any, Dict, List, Optional, Tuple, Union
+
+logger = logging.getLogger(__name__)
+
+
def stripCodeFences(text: str) -> str:
    """Remove a surrounding ``` / ```json code fence, if present.

    Handles the common LLM output shape::

        ```json
        { ... }
        ```

    The first line inside the fence is discarded only when it is empty or a
    bare language tag (e.g. 'json'); the previous implementation discarded it
    unconditionally and could not unfence content that started on the opening
    line. Returns the stripped input when no fence wraps it.
    """
    if not text:
        return text
    s = text.strip()
    if not (s.startswith("```") and s.endswith("```")):
        return s
    inner = s[3:-3]
    nl = inner.find("\n")
    if nl != -1:
        tag = inner[:nl].strip()
        # Only drop the first line when it carries no content.
        if tag == "" or tag.isalnum():
            inner = inner[nl + 1:]
    return inner.strip()
+
+
def extractFirstBalancedJson(text: str) -> str:
    """Return the first balanced JSON object/array substring; otherwise the trimmed input.

    Tracks JSON string literals (including backslash escapes) so that braces
    and brackets inside strings do not affect balancing - the previous
    implementation truncated inputs such as '{"a": "}"}'.
    """
    if not text:
        return text
    s = text.strip()
    # Locate the earliest '{' or '[' as the candidate start.
    candidates = [i for i in (s.find('{'), s.find('[')) if i != -1]
    if not candidates:
        return s
    start = min(candidates)
    depth = 0
    in_string = False
    escaped = False
    for i in range(start, len(s)):
        ch = s[i]
        if in_string:
            # Inside a string: only an unescaped '"' ends it.
            if escaped:
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch in '{[':
            depth += 1
        elif ch in '}]':
            if depth:
                depth -= 1
                if depth == 0:
                    return s[start:i + 1].strip()
    # Unbalanced input: fall back to the trimmed original.
    return s
+
+
def normalizeJsonText(text: str) -> str:
    """Light normalization: drop a leading BOM and straighten smart quotes."""
    if not text:
        return text
    s = text.lstrip('\ufeff') if text.startswith('\ufeff') else text
    # Map curly quotes to their plain ASCII equivalents in one pass.
    quote_table = str.maketrans({'\u201c': '"', '\u201d': '"', '\u2019': "'", '\u2018': "'"})
    return s.translate(quote_table)
+
+
def extractJsonString(text: str) -> str:
    """Normalize, unfence, then isolate the first balanced JSON substring."""
    return extractFirstBalancedJson(stripCodeFences(normalizeJsonText(text))).strip()
+
+
def tryParseJson(text: Union[str, bytes]) -> Tuple[Optional[Union[Dict, List]], Optional[Exception], str]:
    """Best-effort JSON parse; returns (parsed_object, error, cleaned_text)."""
    # Accept bytes by decoding defensively first.
    if isinstance(text, bytes):
        try:
            text = text.decode('utf-8', errors='replace')
        except Exception:
            text = str(text)
    cleaned = extractJsonString(text or "")
    try:
        parsed = json.loads(cleaned)
    except Exception as exc:
        return None, exc, cleaned
    return parsed, None, cleaned
+
+
def parseJsonOrRaise(text: Union[str, bytes]) -> Union[Dict, List]:
    """Parse *text* as JSON (after cleanup) or re-raise the parse error.

    Args:
        text: Raw model output, possibly fenced/noisy, as str or bytes.

    Returns:
        The parsed dict or list.

    Raises:
        Exception: the original json.loads error when parsing fails.
    """
    obj, err, cleaned = tryParseJson(text)
    if err is not None:
        # Lazy %-style args avoid building the message unless emitted; the
        # previous message referenced a non-existent 'parse_json_or_raise'.
        logger.error("parseJsonOrRaise failed: %s. Cleaned preview: %.200s...", err, cleaned)
        raise err
    return obj
+
+
def mergeRootLists(json_parts: List[Union[str, Dict, List]]) -> Dict[str, Any]:
    """
    Merge dict parts by concatenating their root-level lists.

    The first usable dict becomes the base; for each later dict:
      - a list value whose key is also a list in the base is appended
      - a brand-new key is added as-is
      - existing non-list keys keep the base's value
    'continuation' is reset to None when present in the result. String parts
    are parsed as JSON first; unparsable or non-dict parts are skipped.

    Unlike the previous version, base lists are copied before extending, so
    the caller's input dicts are never mutated.

    Args:
        json_parts: Mix of dicts/lists (used directly) and JSON strings.

    Returns:
        The merged dict, or {} when no dict part is usable.
    """
    parsed: List[Dict[str, Any]] = []
    for part in json_parts:
        if isinstance(part, (dict, list)):
            obj = part
        else:
            obj, err, _ = tryParseJson(part)
            if err is not None or not isinstance(obj, (dict, list)):
                continue
        if isinstance(obj, dict):
            parsed.append(obj)
    if not parsed:
        return {}
    base: Dict[str, Any] = dict(parsed[0])
    for obj in parsed[1:]:
        for key, value in obj.items():
            if isinstance(value, list) and isinstance(base.get(key), list):
                # Copy-on-write: never extend the caller's original list.
                base[key] = list(base[key]) + value
            elif key not in base:
                base[key] = value
    if 'continuation' in base:
        base['continuation'] = None
    return base
+
+
diff --git a/modules/shared/progressLogger_example.py b/modules/shared/progressLogger_example.py
deleted file mode 100644
index 13146846..00000000
--- a/modules/shared/progressLogger_example.py
+++ /dev/null
@@ -1,183 +0,0 @@
-"""
-Example usage of ProgressLogger for workflow progress tracking.
-
-This file demonstrates how to use the ProgressLogger utility to track
-progress of long-running operations in workflows.
-"""
-
-import asyncio
-import time
-from modules.shared.progressLogger import ProgressLogger
-
-
-async def exampleDocumentProcessing(workflowService, workflow):
- """Example of document processing with progress tracking."""
-
- # Create progress logger
- progressLogger = workflowService.createProgressLogger(workflow)
- operationId = f"docProcess_{workflow.id}_{int(time.time())}"
-
- try:
- # Start operation
- progressLogger.startOperation(
- operationId,
- "Extract",
- "Document Processing",
- "Processing 5 documents"
- )
-
- # Simulate processing steps
- documents = ["doc1.pdf", "doc2.docx", "doc3.txt", "doc4.xlsx", "doc5.pdf"]
-
- for i, doc in enumerate(documents):
- # Update progress for each document
- progress = (i + 1) / len(documents)
- progressLogger.updateProgress(
- operationId,
- progress,
- f"Processing {doc} ({i+1}/{len(documents)})"
- )
-
- # Simulate processing time
- await asyncio.sleep(0.5)
-
- # Complete operation
- progressLogger.completeOperation(operationId, True)
-
- except Exception as e:
- # Complete with failure
- progressLogger.completeOperation(operationId, False)
- raise
-
-
-async def exampleAiProcessing(workflowService, workflow):
- """Example of AI processing with chunk progress tracking."""
-
- progressLogger = workflowService.createProgressLogger(workflow)
- operationId = f"aiProcess_{workflow.id}_{int(time.time())}"
-
- try:
- # Start operation
- progressLogger.startOperation(
- operationId,
- "AI",
- "Content Analysis",
- "Processing 10 chunks"
- )
-
- # Simulate AI processing with chunks
- chunks = list(range(1, 11))
-
- for i, chunk in enumerate(chunks):
- progress = (i + 1) / len(chunks)
- progressLogger.updateProgress(
- operationId,
- progress,
- f"Processing chunk {chunk} of {len(chunks)}"
- )
-
- # Simulate AI processing time
- await asyncio.sleep(0.3)
-
- # Complete operation
- progressLogger.completeOperation(operationId, True)
-
- except Exception as e:
- progressLogger.completeOperation(operationId, False)
- raise
-
-
-async def exampleWorkflowTask(workflowService, workflow):
- """Example of workflow task execution with progress tracking."""
-
- progressLogger = workflowService.createProgressLogger(workflow)
- operationId = f"workflowTask_{workflow.id}_{int(time.time())}"
-
- try:
- # Start operation
- progressLogger.startOperation(
- operationId,
- "Workflow",
- "Task Execution",
- "Executing data analysis task"
- )
-
- # Simulate task steps
- steps = [
- "Initializing analysis",
- "Loading data",
- "Processing data",
- "Generating results",
- "Saving output"
- ]
-
- for i, step in enumerate(steps):
- progress = (i + 1) / len(steps)
- progressLogger.updateProgress(
- operationId,
- progress,
- step
- )
-
- # Simulate step processing time
- await asyncio.sleep(0.4)
-
- # Complete operation
- progressLogger.completeOperation(operationId, True)
-
- except Exception as e:
- progressLogger.completeOperation(operationId, False)
- raise
-
-
-# Example of how to integrate into existing services
-class ExampleService:
- """Example service showing integration with ProgressLogger."""
-
- def __init__(self, workflowService):
- self.workflowService = workflowService
-
- async def processWithProgress(self, workflow, data):
- """Process data with progress tracking."""
-
- progressLogger = self.workflowService.createProgressLogger(workflow)
- operationId = f"example_{workflow.id}_{int(time.time())}"
-
- try:
- # Start operation
- progressLogger.startOperation(
- operationId,
- "Example",
- "Data Processing",
- f"Processing {len(data)} items"
- )
-
- # Process data in chunks
- chunkSize = 10
- totalChunks = (len(data) + chunkSize - 1) // chunkSize
-
- for i in range(0, len(data), chunkSize):
- chunk = data[i:i + chunkSize]
- chunkNum = i // chunkSize + 1
-
- progress = chunkNum / totalChunks
- progressLogger.updateProgress(
- operationId,
- progress,
- f"Processing chunk {chunkNum}/{totalChunks}"
- )
-
- # Process chunk
- await self._processChunk(chunk)
-
- # Complete operation
- progressLogger.completeOperation(operationId, True)
-
- except Exception as e:
- progressLogger.completeOperation(operationId, False)
- raise
-
- async def _processChunk(self, chunk):
- """Process a chunk of data."""
- # Simulate processing
- await asyncio.sleep(0.1)
diff --git a/modules/workflows/methods/methodOutlook.py b/modules/workflows/methods/methodOutlook.py
index 367b04e8..961bc562 100644
--- a/modules/workflows/methods/methodOutlook.py
+++ b/modules/workflows/methods/methodOutlook.py
@@ -8,14 +8,11 @@ import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, UTC
import json
-import uuid
import requests
from modules.workflows.methods.methodBase import MethodBase, action
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
-from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority, AiCallRequest
-from modules.datamodels.datamodelChat import ChatDocument
-from modules.datamodels.datamodelUam import ConnectionStatus
+from modules.datamodels.datamodelAi import AiCallOptions
logger = logging.getLogger(__name__)
@@ -48,7 +45,7 @@ class MethodOutlook(MethodBase):
logger.debug(f"Found connection: {userConnection.id}, status: {userConnection.status.value}, authority: {userConnection.authority.value}")
# Get a fresh token for this connection
- token = self.services.utils.getFreshConnectionToken(userConnection.id)
+ token = self.services.workflow.getFreshConnectionToken(userConnection.id)
if not token:
logger.error(f"Fresh token not found for connection: {userConnection.id}")
logger.debug(f"Connection details: {userConnection}")
@@ -466,7 +463,7 @@ class MethodOutlook(MethodBase):
"authority": "microsoft",
"reference": connectionReference
},
- "timestamp": self.services.utils.getUtcTimestamp()
+ "timestamp": self.services.utils.timestampGetUtc()
}
return ActionResult.isSuccess(
@@ -696,7 +693,7 @@ class MethodOutlook(MethodBase):
"authority": "microsoft",
"reference": connectionReference
},
- "timestamp": self.services.utils.getUtcTimestamp()
+ "timestamp": self.services.utils.timestampGetUtc()
}
return ActionResult(
@@ -819,7 +816,7 @@ class MethodOutlook(MethodBase):
"authority": "microsoft",
"reference": connectionReference
},
- "timestamp": self.services.utils.getUtcTimestamp()
+ "timestamp": self.services.utils.timestampGetUtc()
}
return ActionResult(
@@ -929,7 +926,7 @@ class MethodOutlook(MethodBase):
"authority": "microsoft",
"reference": connectionReference
},
- "timestamp": self.services.utils.getUtcTimestamp()
+ "timestamp": self.services.utils.timestampGetUtc()
}
return ActionResult(
@@ -1070,7 +1067,7 @@ class MethodOutlook(MethodBase):
"authority": "microsoft",
"reference": connectionReference
},
- "timestamp": self.services.utils.getUtcTimestamp()
+ "timestamp": self.services.utils.timestampGetUtc()
}
return ActionResult(
@@ -1369,7 +1366,7 @@ LOOP_INSTRUCTION"""
"aiGenerated": True,
"context": context,
"emailStyle": emailStyle,
- "timestamp": self.services.utils.getUtcTimestamp()
+ "timestamp": self.services.utils.timestampGetUtc()
}
return ActionResult(
diff --git a/modules/workflows/methods/methodSharepoint.py b/modules/workflows/methods/methodSharepoint.py
index 6dd75bac..f0d020c2 100644
--- a/modules/workflows/methods/methodSharepoint.py
+++ b/modules/workflows/methods/methodSharepoint.py
@@ -787,7 +787,7 @@ class MethodSharepoint(MethodBase):
"totalResults": len(found_documents),
"maxResults": maxResults,
"foundDocuments": found_documents,
- "timestamp": self.services.utils.getUtcTimestamp()
+ "timestamp": self.services.utils.timestampGetUtc()
}
except Exception as e:
@@ -1069,7 +1069,7 @@ class MethodSharepoint(MethodBase):
"authority": "microsoft",
"reference": connectionReference
},
- "timestamp": self.services.utils.getUtcTimestamp()
+ "timestamp": self.services.utils.timestampGetUtc()
}
# Use default JSON format for output
@@ -1423,7 +1423,7 @@ class MethodSharepoint(MethodBase):
"authority": "microsoft",
"reference": connectionReference
},
- "timestamp": self.services.utils.getUtcTimestamp()
+ "timestamp": self.services.utils.timestampGetUtc()
}
# Use default JSON format for output
@@ -1820,7 +1820,7 @@ class MethodSharepoint(MethodBase):
"includeSubfolders": includeSubfolders,
"sitesSearched": len(sites),
"listResults": list_results,
- "timestamp": self.services.utils.getUtcTimestamp()
+ "timestamp": self.services.utils.timestampGetUtc()
}
# Use default JSON format for output
diff --git a/modules/workflows/processing/core/actionExecutor.py b/modules/workflows/processing/core/actionExecutor.py
index fd649337..19d08164 100644
--- a/modules/workflows/processing/core/actionExecutor.py
+++ b/modules/workflows/processing/core/actionExecutor.py
@@ -278,13 +278,13 @@ class ActionExecutor:
return "react_misc.jsonl"
# Daily suffix for React files
- dateSuffix = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y%m%d")
+ dateSuffix = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y%m%d")
baseReactFile = _reactFileForContext(contextText)
name, ext = os.path.splitext(baseReactFile)
reactFile = os.path.join(logDir, f"{name}_{dateSuffix}{ext}")
# Format the trace entry with better structure
- timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
+ timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
# Create a structured trace entry
traceEntry = f"[{timestamp}] {contextText}\n"
diff --git a/modules/workflows/processing/core/messageCreator.py b/modules/workflows/processing/core/messageCreator.py
index fe543bb3..6a195c16 100644
--- a/modules/workflows/processing/core/messageCreator.py
+++ b/modules/workflows/processing/core/messageCreator.py
@@ -56,7 +56,7 @@ class MessageCreator:
"message": taskSummary,
"status": "step",
"sequenceNr": len(workflow.messages) + 1,
- "publishedAt": self.services.utils.getUtcTimestamp(),
+ "publishedAt": self.services.utils.timestampGetUtc(),
"documentsLabel": "task_plan",
"documents": [],
# Add workflow context fields - use current workflow round instead of hardcoded 1
@@ -86,7 +86,7 @@ class MessageCreator:
"message": f"🚀 **Task {taskProgress}**",
"status": "step",
"sequenceNr": len(workflow.messages) + 1,
- "publishedAt": self.services.utils.getUtcTimestamp(),
+ "publishedAt": self.services.utils.timestampGetUtc(),
"documentsLabel": f"task_{taskIndex}_start",
"documents": [],
# Add workflow context fields
@@ -163,7 +163,7 @@ class MessageCreator:
"message": messageText,
"status": "step",
"sequenceNr": len(workflow.messages) + 1,
- "publishedAt": self.services.utils.getUtcTimestamp(),
+ "publishedAt": self.services.utils.timestampGetUtc(),
"actionId": action.id,
"actionMethod": action.execMethod,
"actionName": action.execAction,
@@ -214,7 +214,7 @@ class MessageCreator:
"message": completionMessage,
"status": "step",
"sequenceNr": len(workflow.messages) + 1,
- "publishedAt": self.services.utils.getUtcTimestamp(),
+ "publishedAt": self.services.utils.timestampGetUtc(),
"documentsLabel": f"task_{taskIndex}_completion",
"documents": [],
# Add workflow context fields
@@ -243,7 +243,7 @@ class MessageCreator:
"message": f"🔄 **Task {taskIndex}** needs retry: {reviewResult.improvements}",
"status": "step",
"sequenceNr": len(workflow.messages) + 1,
- "publishedAt": self.services.utils.getUtcTimestamp(),
+ "publishedAt": self.services.utils.timestampGetUtc(),
"documentsLabel": f"task_{taskIndex}_retry",
"documents": [],
"roundNumber": workflow.currentRound,
@@ -277,7 +277,7 @@ class MessageCreator:
"message": errorMessage,
"status": "step",
"sequenceNr": len(workflow.messages) + 1,
- "publishedAt": self.services.utils.getUtcTimestamp(),
+ "publishedAt": self.services.utils.timestampGetUtc(),
"actionId": None,
"actionMethod": "task",
"actionName": "task_error",
diff --git a/modules/workflows/processing/modes/modeActionplan.py b/modules/workflows/processing/modes/modeActionplan.py
index e3dc977c..51d488a5 100644
--- a/modules/workflows/processing/modes/modeActionplan.py
+++ b/modules/workflows/processing/modes/modeActionplan.py
@@ -288,7 +288,7 @@ class ActionplanMode(BaseMode):
"message": f"⚡ **Action {actionNumber}/{totalActions}** (Method {action.execMethod}.{action.execAction})",
"status": "step",
"sequenceNr": len(workflow.messages) + 1,
- "publishedAt": self.services.utils.getUtcTimestamp(),
+ "publishedAt": self.services.utils.timestampGetUtc(),
"documentsLabel": f"action_{actionNumber}_start",
"documents": [],
"actionProgress": "running",
@@ -601,7 +601,7 @@ class ActionplanMode(BaseMode):
retryCount=createdAction.get("retryCount", 0),
retryMax=createdAction.get("retryMax", 3),
processingTime=createdAction.get("processingTime"),
- timestamp=float(createdAction.get("timestamp", self.services.utils.getUtcTimestamp())),
+ timestamp=float(createdAction.get("timestamp", self.services.utils.timestampGetUtc())),
result=createdAction.get("result"),
resultDocuments=createdAction.get("resultDocuments", []),
userMessage=createdAction.get("userMessage")
@@ -745,7 +745,7 @@ class ActionplanMode(BaseMode):
retryCount=createdAction.get("retryCount", 0),
retryMax=createdAction.get("retryMax", 3),
processingTime=createdAction.get("processingTime"),
- timestamp=float(createdAction.get("timestamp", self.services.utils.getUtcTimestamp())),
+ timestamp=float(createdAction.get("timestamp", self.services.utils.timestampGetUtc())),
result=createdAction.get("result"),
resultDocuments=createdAction.get("resultDocuments", []),
userMessage=createdAction.get("userMessage")
@@ -780,7 +780,7 @@ class ActionplanMode(BaseMode):
traceFile = os.path.join(logDir, "log_trace.log")
# Format the trace entry with better structure
- timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
+ timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
# Create a structured trace entry
traceEntry = f"[{timestamp}] {contextText}\n"
diff --git a/modules/workflows/processing/modes/modeReact.py b/modules/workflows/processing/modes/modeReact.py
index 84d38a45..2918ebba 100644
--- a/modules/workflows/processing/modes/modeReact.py
+++ b/modules/workflows/processing/modes/modeReact.py
@@ -693,7 +693,7 @@ class ReactMode(BaseMode):
"message": messageContent,
"status": status,
"sequenceNr": len(workflow.messages) + 1,
- "publishedAt": self.services.utils.getUtcTimestamp(),
+ "publishedAt": self.services.utils.timestampGetUtc(),
"documentsLabel": documentsLabel,
"documents": [],
"roundNumber": workflow.currentRound,
@@ -821,7 +821,7 @@ Return only the user-friendly message, no technical details."""
retryCount=createdAction.get("retryCount", 0),
retryMax=createdAction.get("retryMax", 3),
processingTime=createdAction.get("processingTime"),
- timestamp=float(createdAction.get("timestamp", self.services.utils.getUtcTimestamp())),
+ timestamp=float(createdAction.get("timestamp", self.services.utils.timestampGetUtc())),
result=createdAction.get("result"),
resultDocuments=createdAction.get("resultDocuments", []),
userMessage=createdAction.get("userMessage")
@@ -912,7 +912,7 @@ Return only the user-friendly message, no technical details."""
retryCount=createdAction.get("retryCount", 0),
retryMax=createdAction.get("retryMax", 3),
processingTime=createdAction.get("processingTime"),
- timestamp=float(createdAction.get("timestamp", self.services.utils.getUtcTimestamp())),
+ timestamp=float(createdAction.get("timestamp", self.services.utils.timestampGetUtc())),
result=createdAction.get("result"),
resultDocuments=createdAction.get("resultDocuments", []),
userMessage=createdAction.get("userMessage")
@@ -996,13 +996,13 @@ Return only the user-friendly message, no technical details."""
return "react_misc.jsonl"
# Daily suffix for React files
- dateSuffix = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y%m%d")
+ dateSuffix = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y%m%d")
baseReactFile = _reactFileForContext(contextText)
name, ext = os.path.splitext(baseReactFile)
reactFile = os.path.join(logDir, f"{name}_{dateSuffix}{ext}")
# Format the trace entry with better structure
- timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
+ timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
# Create a structured trace entry
traceEntry = f"[{timestamp}] {contextText}\n"
diff --git a/modules/workflows/processing/workflowProcessor.py b/modules/workflows/processing/workflowProcessor.py
index 787edbe1..13da0519 100644
--- a/modules/workflows/processing/workflowProcessor.py
+++ b/modules/workflows/processing/workflowProcessor.py
@@ -301,7 +301,7 @@ class WorkflowProcessor:
traceFile = os.path.join(logDir, "log_trace.log")
# Format the trace entry
- timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
+ timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
traceEntry = f"[{timestamp}] {contextText}\n"
# Add data if provided - show full content without truncation
@@ -378,7 +378,7 @@ class WorkflowProcessor:
'actions': [action.to_dict() for action in taskActions] if taskActions else [],
'review_result': reviewResult,
'workflow_id': workflow.id,
- 'handover_time': self.services.utils.getUtcTimestamp()
+ 'handover_time': self.services.utils.timestampGetUtc()
}
logger.info(f"Prepared handover for task {taskStep.id} in workflow {workflow.id}")
return handoverData
diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py
index f5f65060..743f840e 100644
--- a/modules/workflows/workflowManager.py
+++ b/modules/workflows/workflowManager.py
@@ -32,7 +32,7 @@ class WorkflowManager:
try:
# Debug log to check workflowMode parameter
logger.info(f"WorkflowManager received workflowMode: {workflowMode}")
- currentTime = self.services.utils.getUtcTimestamp()
+ currentTime = self.services.utils.timestampGetUtc()
if workflowId:
workflow = self.services.workflow.getWorkflow(workflowId)
@@ -117,7 +117,7 @@ class WorkflowManager:
raise ValueError(f"Workflow {workflowId} not found")
workflow.status = "stopped"
- workflow.lastActivity = self.services.utils.getUtcTimestamp()
+ workflow.lastActivity = self.services.utils.timestampGetUtc()
self.services.workflow.updateWorkflow(workflowId, {
"status": "stopped",
"lastActivity": workflow.lastActivity
@@ -173,7 +173,7 @@ class WorkflowManager:
"message": userInput.prompt,
"status": "first",
"sequenceNr": 1,
- "publishedAt": self.services.utils.getUtcTimestamp(),
+ "publishedAt": self.services.utils.timestampGetUtc(),
"documentsLabel": context_label,
"documents": [],
# Add workflow context fields
@@ -387,7 +387,7 @@ class WorkflowManager:
"message": "🛑 Workflow stopped by user",
"status": "last",
"sequenceNr": len(workflow.messages) + 1,
- "publishedAt": self.services.utils.getUtcTimestamp(),
+ "publishedAt": self.services.utils.timestampGetUtc(),
"documentsLabel": "workflow_stopped",
"documents": [],
# Add workflow context fields
@@ -402,7 +402,7 @@ class WorkflowManager:
# Update workflow status to stopped
workflow.status = "stopped"
- workflow.lastActivity = self.services.utils.getUtcTimestamp()
+ workflow.lastActivity = self.services.utils.timestampGetUtc()
self.services.workflow.updateWorkflow(workflow.id, {
"status": "stopped",
"lastActivity": workflow.lastActivity
@@ -417,7 +417,7 @@ class WorkflowManager:
"message": "🛑 Workflow stopped by user",
"status": "last",
"sequenceNr": len(workflow.messages) + 1,
- "publishedAt": self.services.utils.getUtcTimestamp(),
+ "publishedAt": self.services.utils.timestampGetUtc(),
"documentsLabel": "workflow_stopped",
"documents": [],
# Add workflow context fields
@@ -432,7 +432,7 @@ class WorkflowManager:
# Update workflow status to stopped
workflow.status = "stopped"
- workflow.lastActivity = self.services.utils.getUtcTimestamp()
+ workflow.lastActivity = self.services.utils.timestampGetUtc()
self.services.workflow.updateWorkflow(workflow.id, {
"status": "stopped",
"lastActivity": workflow.lastActivity,
@@ -456,7 +456,7 @@ class WorkflowManager:
"message": f"Workflow failed: {'Unknown error'}",
"status": "last",
"sequenceNr": len(workflow.messages) + 1,
- "publishedAt": self.services.utils.getUtcTimestamp(),
+ "publishedAt": self.services.utils.timestampGetUtc(),
"documentsLabel": "workflow_failure",
"documents": [],
# Add workflow context fields
@@ -471,7 +471,7 @@ class WorkflowManager:
# Update workflow status to failed
workflow.status = "failed"
- workflow.lastActivity = self.services.utils.getUtcTimestamp()
+ workflow.lastActivity = self.services.utils.timestampGetUtc()
self.services.workflow.updateWorkflow(workflow.id, {
"status": "failed",
"lastActivity": workflow.lastActivity,
@@ -500,7 +500,7 @@ class WorkflowManager:
"message": f"Error processing workflow results: {str(e)}",
"status": "last",
"sequenceNr": len(workflow.messages) + 1,
- "publishedAt": self.services.utils.getUtcTimestamp(),
+ "publishedAt": self.services.utils.timestampGetUtc(),
"documentsLabel": "workflow_error",
"documents": [],
# Add workflow context fields
@@ -515,7 +515,7 @@ class WorkflowManager:
# Update workflow status to failed
workflow.status = "failed"
- workflow.lastActivity = self.services.utils.getUtcTimestamp()
+ workflow.lastActivity = self.services.utils.timestampGetUtc()
self.services.workflow.updateWorkflow(workflow.id, {
"status": "failed",
"lastActivity": workflow.lastActivity,
@@ -541,7 +541,7 @@ class WorkflowManager:
"message": feedback,
"status": "last",
"sequenceNr": len(workflow.messages) + 1,
- "publishedAt": self.services.utils.getUtcTimestamp(),
+ "publishedAt": self.services.utils.timestampGetUtc(),
"documentsLabel": "workflow_feedback",
"documents": [],
# Add workflow context fields
@@ -558,7 +558,7 @@ class WorkflowManager:
# Update workflow status to completed
workflow.status = "completed"
- workflow.lastActivity = self.services.utils.getUtcTimestamp()
+ workflow.lastActivity = self.services.utils.timestampGetUtc()
# Update workflow in database
self.services.workflow.updateWorkflow(workflow.id, {
diff --git a/test_document_processing.py b/test_document_processing.py
deleted file mode 100644
index 7d6bb64f..00000000
--- a/test_document_processing.py
+++ /dev/null
@@ -1,555 +0,0 @@
-"""
-Test script for document processing and DOCX generation.
-Calls the main AI service directly to process PDF documents and generate DOCX summaries.
-"""
-
-import asyncio
-import sys
-import os
-import logging
-import base64
-from datetime import datetime
-from pathlib import Path
-
-# Add the gateway module to the path
-sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'modules'))
-
-from modules.datamodels.datamodelChat import ChatDocument
-from modules.datamodels.datamodelAi import EnhancedAiCallOptions
-from modules.services.serviceAi.mainServiceAi import AiService
-from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
-
-# Set up logging
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-logger = logging.getLogger(__name__)
-
-
-async def process_documents_and_generate_summary():
- """Process documents using the main AI service with intelligent chunk integration."""
- logger.info("🚀 Starting intelligent chunk integration test...")
-
- # Find testdata directory
- testdata_path = Path("../wiki/poweron/testdata")
- if not testdata_path.exists():
- # Try relative to current directory
- testdata_path = Path("wiki/poweron/testdata")
- if not testdata_path.exists():
- # Try relative to parent directory
- testdata_path = Path("../wiki/poweron/testdata")
- if not testdata_path.exists():
- logger.error(f"❌ Testdata path not found. Tried:")
- logger.error(f" - ../wiki/poweron/testdata")
- logger.error(f" - wiki/poweron/testdata")
- logger.error(f" - ../wiki/poweron/testdata")
- logger.info("Please ensure the testdata folder exists with PDF documents")
- return False
-
- # Find all supported document files
- supported_extensions = [
- # Document formats
- "*.pdf", "*.docx", "*.xlsx", "*.pptx", "*.ppt",
- # Image formats
- "*.jpg", "*.jpeg", "*.png", "*.gif", "*.webp", "*.bmp", "*.tiff",
- # Text and code files
- "*.txt", "*.md", "*.log", "*.rtf", "*.tex", "*.rst", "*.adoc", "*.org", "*.pod",
- "*.java", "*.js", "*.jsx", "*.ts", "*.tsx", "*.py", "*.rb", "*.go", "*.rs", "*.cpp", "*.c", "*.h", "*.hpp", "*.cc", "*.cxx",
- "*.cs", "*.php", "*.swift", "*.kt", "*.scala", "*.clj", "*.hs", "*.ml", "*.fs", "*.vb", "*.dart", "*.r", "*.m", "*.pl", "*.sh",
- "*.html", "*.htm", "*.css", "*.scss", "*.sass", "*.less", "*.vue", "*.svelte",
- "*.config", "*.ini", "*.cfg", "*.conf", "*.properties", "*.yaml", "*.yml", "*.toml", "*.json", "*.xml",
- "*.bat", "*.ps1", "*.psm1", "*.psd1", "*.vbs", "*.wsf", "*.cmd", "*.com",
- "*.csv", "*.tsv", "*.tab", "*.dat", "*.data",
- "*.man", "*.1", "*.2", "*.3", "*.4", "*.5", "*.6", "*.7", "*.8", "*.9", "*.n", "*.l", "*.m", "*.r", "*.t", "*.x", "*.y", "*.z",
- "*.diff", "*.patch", "*.gitignore", "*.dockerignore", "*.editorconfig", "*.gitattributes",
- "*.env", "*.env.local", "*.env.development", "*.env.production", "*.env.test",
- "*.lock", "*.lockb", "*.lockfile", "*.pkg-lock", "*.yarn-lock"
- ]
- document_files = []
- for ext in supported_extensions:
- document_files.extend(list(testdata_path.glob(ext)))
-
- logger.info(f"Found {len(document_files)} document files in testdata:")
- for doc_file in document_files:
- logger.info(f" - {doc_file.name}")
-
- if not document_files:
- logger.error("❌ No supported document files found in testdata folder")
- return False
-
- try:
- # Mock the database interface to provide our file data BEFORE creating AI service
- class TestDbInterface:
- def __init__(self, file_data_map):
- self.file_data_map = file_data_map
-
- def getFileData(self, file_id):
- logger.info(f"TestDbInterface.getFileData called with file_id: {file_id}")
- data = self.file_data_map.get(file_id)
- if data:
- logger.info(f"✅ Found file data for {file_id}: {len(data)} bytes")
- else:
- logger.warning(f"❌ No file data found for {file_id}")
- return data
-
- # Create file data mapping
- file_data_map = {}
- for i, doc_file in enumerate(document_files):
- with open(doc_file, 'rb') as f:
- file_data_map[f"test_doc_{i+1}"] = f.read()
- logger.info(f"📁 Loaded {doc_file.name} as test_doc_{i+1}: {len(file_data_map[f'test_doc_{i+1}'])} bytes")
-
- # Mock the database interface BEFORE creating AI service
- import modules.interfaces.interfaceDbComponentObjects as db_interface_module
- original_get_interface = db_interface_module.getInterface
- db_interface_module.getInterface = lambda: TestDbInterface(file_data_map)
- logger.info("🔧 Database interface mocked successfully")
-
- # Create a mock service center with utils
- class MockServiceCenter:
- def __init__(self):
- self.utils = MockUtils()
-
- class MockUtils:
- def debugLogToFile(self, message, label):
- logger.debug(f"[{label}] {message}")
- print(f"DEBUG [{label}]: {message}") # Also print to console for visibility
-
- # Only write to debug file if debug logging is enabled (matching real implementation)
- debug_enabled = self.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
- if debug_enabled:
- try:
- import os
- from datetime import datetime, UTC
- debug_dir = self.configGet("APP_DEBUG_CHAT_WORKFLOW_DIR", "./test-chat")
- if not os.path.isabs(debug_dir):
- # If relative path, make it relative to the gateway directory
- gateway_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
- debug_dir = os.path.join(gateway_dir, debug_dir)
-
- os.makedirs(debug_dir, exist_ok=True)
- debug_file = os.path.join(debug_dir, "debug_workflow.log")
- timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
- debug_entry = f"[{timestamp}] [{label}] {message}\n"
- with open(debug_file, "a", encoding="utf-8") as f:
- f.write(debug_entry)
- except Exception:
- pass # Don't fail on debug logging errors
-
- def configGet(self, key, default):
- # Return debug settings
- if key == "APP_DEBUG_CHAT_WORKFLOW_ENABLED":
- return True
- elif key == "APP_DEBUG_CHAT_WORKFLOW_DIR":
- return "./test-chat"
- return default
-
- mock_service_center = MockServiceCenter()
-
- # Initialize the main AI service - let it handle everything
- logger.info("🔧 Initializing main AI service...")
- ai_service = await AiService.create(mock_service_center)
-
- # Create test documents - the AI service will handle file access internally
- documents = []
- logger.info(f"📁 Found {len(document_files)} document files")
- for i, doc_file in enumerate(document_files):
- logger.info(f"📄 Processing file {i+1}/{len(document_files)}: {doc_file.name}")
- # Determine MIME type based on file extension
- mime_type = "application/octet-stream" # default
- if doc_file.suffix.lower() == '.pdf':
- mime_type = "application/pdf"
- elif doc_file.suffix.lower() in ['.jpg', '.jpeg']:
- mime_type = "image/jpeg"
- elif doc_file.suffix.lower() == '.png':
- mime_type = "image/png"
- elif doc_file.suffix.lower() == '.gif':
- mime_type = "image/gif"
- elif doc_file.suffix.lower() == '.docx':
- mime_type = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
- elif doc_file.suffix.lower() == '.xlsx':
- mime_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
- elif doc_file.suffix.lower() == '.pptx':
- mime_type = "application/vnd.openxmlformats-officedocument.presentationml.presentation"
- elif doc_file.suffix.lower() == '.ppt':
- mime_type = "application/vnd.ms-powerpoint"
- elif doc_file.suffix.lower() == '.html':
- mime_type = "text/html"
- elif doc_file.suffix.lower() == '.csv':
- mime_type = "text/csv"
- elif doc_file.suffix.lower() == '.json':
- mime_type = "application/json"
- elif doc_file.suffix.lower() in ['.txt', '.md']:
- mime_type = "text/plain"
-
- chat_doc = ChatDocument(
- fileId=f"test_doc_{i+1}",
- messageId=f"test_message_{i+1}",
- fileName=doc_file.name,
- mimeType=mime_type,
- fileSize=doc_file.stat().st_size,
- roundNumber=1,
- taskNumber=1,
- actionNumber=1,
- actionId=f"test_action_{i+1}"
- )
- documents.append(chat_doc)
- logger.info(f"✅ Created ChatDocument: {chat_doc.fileName} ({chat_doc.mimeType}) - {chat_doc.fileSize} bytes")
-
- logger.info(f"📄 Created {len(documents)} document objects")
-
- # Create enhanced AI call options for intelligent chunked processing
- ai_options = EnhancedAiCallOptions(
- operationType="general",
- enableParallelProcessing=True,
- maxConcurrentChunks=5, # Increased for better testing
- preserveChunkMetadata=True,
- chunkSeparator="\n\n---\n\n"
- )
-
- # Call the main AI service directly - let it handle everything including DOCX generation
- logger.info("🤖 Calling main AI service with intelligent merging...")
-
-
- # Run a single end-to-end test to avoid the loop issue
- logger.info("🧪 Running single end-to-end test...")
-
- userPrompt = "Analyze the document containing mails for customer use cases. Can you create one file for each email in plain text format?"
-
- # userPrompt = "Can you create one file for each section in the document"
-
- # userPrompt = "Analyze these documents and create a fitting image for the content"
-
- # userPrompt = "Extract the table from file and produce 2 lists in excel. one list with all entries, one list only with entries that are yellow highlighted."
-
- # userPrompt = "Create a docx file containing a summary and the COMPLETE list from the pdf file, having one additional column with a 'x' marker for all items, which are yellow highlighted."
-
- # userPrompt = "Create a docx file containing the combined documents in french language."
-
- try:
- # Single AI call with DOCX generation
- ai_response = await ai_service.callAi(
- prompt=userPrompt,
- documents=documents,
- options=ai_options,
- outputFormat="txt",
- title="Kunden und Use Cases"
- )
-
- logger.info(f"✅ End-to-end test completed successfully")
- logger.info(f"📊 Response type: {type(ai_response)}")
- logger.info(f"📊 Response length: {len(str(ai_response))} characters")
-
- # Single test result
- test_results = [{
- "test_name": "End-to-End DOCX Generation",
- "success": True,
- "response_type": type(ai_response).__name__,
- "response_length": len(str(ai_response)),
- "response": ai_response
- }]
-
- except Exception as e:
- logger.error(f"❌ End-to-end test failed: {str(e)}")
- test_results = [{
- "test_name": "End-to-End DOCX Generation",
- "success": False,
- "error": str(e),
- "response": None
- }]
-
- logger.info(f"🎯 Completed 1 end-to-end test")
-
- # Process all test results and save outputs
- logger.info("📊 Processing test results...")
-
- successful_tests = [r for r in test_results if r['success']]
- failed_tests = [r for r in test_results if not r['success']]
-
- logger.info(f"✅ Successful tests: {len(successful_tests)}")
- logger.info(f"❌ Failed tests: {len(failed_tests)}")
-
- # Display test results summary
- logger.info("=" * 80)
- logger.info("END-TO-END TEST RESULTS SUMMARY")
- logger.info("=" * 80)
- for i, result in enumerate(test_results, 1):
- status = "✅ PASS" if result['success'] else "❌ FAIL"
- logger.info(f"Test {i}: {result['test_name']} - {status}")
- if result['success']:
- logger.info(f" Response Type: {result['response_type']}")
- logger.info(f" Response Length: {result['response_length']} characters")
- else:
- logger.info(f" Error: {result['error']}")
- logger.info("=" * 80)
-
- # Create output directory if it doesn't exist
- output_dir = Path("test-chat/unittestoutput")
- output_dir.mkdir(parents=True, exist_ok=True)
-
- # Save all test results and generated files
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
-
- logger.info("💾 Saving test results and generated files...")
-
- try:
- for i, result in enumerate(successful_tests, 1):
- test_name = result['test_name'].replace(' ', '_').lower()
- response = result['response']
-
- logger.info(f"💾 Saving Test {i}: {result['test_name']}")
-
- # Handle different response types
- if isinstance(response, dict):
- # Document generation response
- if 'documents' in response and response['documents']:
- logger.info(f"📄 Found {len(response['documents'])} documents in response")
-
- for j, doc in enumerate(response['documents']):
- doc_name = doc.get('documentName', f'{test_name}_document_{j+1}')
- doc_data = doc.get('documentData', '')
- doc_mime = doc.get('mimeType', 'application/octet-stream')
-
- logger.info(f"📄 Document {j+1}: {doc_name}")
- logger.info(f"📄 MIME Type: {doc_mime}")
- logger.info(f"📄 Data length: {len(doc_data)} characters")
-
- # Determine file extension with better MIME type detection
- file_ext = '.bin' # Default fallback
-
- if doc_mime:
- if 'docx' in doc_mime.lower() or 'wordprocessingml' in doc_mime.lower():
- file_ext = '.docx'
- elif 'pdf' in doc_mime.lower():
- file_ext = '.pdf'
- elif 'txt' in doc_mime.lower() or 'plain' in doc_mime.lower():
- file_ext = '.txt'
- elif 'html' in doc_mime.lower():
- file_ext = '.html'
- elif 'json' in doc_mime.lower():
- file_ext = '.json'
- elif 'csv' in doc_mime.lower():
- file_ext = '.csv'
- elif 'xlsx' in doc_mime.lower() or 'spreadsheetml' in doc_mime.lower():
- file_ext = '.xlsx'
- elif 'pptx' in doc_mime.lower() or 'presentationml' in doc_mime.lower():
- file_ext = '.pptx'
- elif 'markdown' in doc_mime.lower() or 'md' in doc_mime.lower():
- file_ext = '.md'
- elif 'png' in doc_mime.lower() or 'image' in doc_mime.lower():
- file_ext = '.png'
- elif 'jpg' in doc_mime.lower() or 'jpeg' in doc_mime.lower():
- file_ext = '.jpg'
- else:
- logger.warning(f"⚠️ Unknown MIME type: {doc_mime}, using .bin")
-
- # Also check filename for hints
- if doc_name and '.' in doc_name:
- name_ext = '.' + doc_name.split('.')[-1].lower()
- if name_ext in ['.docx', '.pdf', '.txt', '.html', '.json', '.csv', '.xlsx', '.pptx', '.md', '.png', '.jpg', '.jpeg']:
- file_ext = name_ext
- logger.info(f"📄 Using extension from filename: {file_ext}")
-
- logger.info(f"📄 Final file extension: {file_ext}")
-
- # Save document
- output_path = output_dir / f"{test_name}_{timestamp}{file_ext}"
-
- # Handle different content types
- if file_ext in ['.md', '.txt', '.html', '.json', '.csv']:
- # Text-based formats - save directly as text
- with open(output_path, 'w', encoding='utf-8') as f:
- f.write(doc_data)
- logger.info(f"✅ Document saved as text: {output_path} ({len(doc_data)} characters)")
- elif file_ext in ['.png', '.jpg', '.jpeg']:
- # Image formats - decode from base64
- try:
- doc_bytes = base64.b64decode(doc_data)
- with open(output_path, 'wb') as f:
- f.write(doc_bytes)
- logger.info(f"✅ Image saved: {output_path} ({len(doc_bytes)} bytes)")
- except Exception as e:
- logger.warning(f"⚠️ Failed to decode image as base64: {e}")
- # Save as text if base64 decoding fails
- with open(output_path, 'w', encoding='utf-8') as f:
- f.write(doc_data)
- logger.info(f"✅ Image saved as text (fallback): {output_path}")
- else:
- # Other binary formats - decode from base64
- try:
- doc_bytes = base64.b64decode(doc_data)
- with open(output_path, 'wb') as f:
- f.write(doc_bytes)
- logger.info(f"✅ Document saved as binary: {output_path} ({len(doc_bytes)} bytes)")
- except Exception as e:
- logger.warning(f"⚠️ Failed to decode document as base64: {e}")
- # Save as text if base64 decoding fails
- with open(output_path, 'w', encoding='utf-8') as f:
- f.write(doc_data)
- logger.info(f"✅ Document saved as text (fallback): {output_path}")
-
- # Also save raw content as text
- content = response.get('content', '')
- if content:
- text_path = output_dir / f"{test_name}_content_{timestamp}.txt"
- with open(text_path, 'w', encoding='utf-8') as f:
- # Handle both string and dictionary content
- if isinstance(content, dict):
- import json
- f.write(json.dumps(content, indent=2, ensure_ascii=False))
- else:
- f.write(str(content))
- logger.info(f"✅ Content saved: {text_path}")
-
- elif isinstance(response, str):
- # Text response
- text_path = output_dir / f"{test_name}_response_{timestamp}.txt"
- with open(text_path, 'w', encoding='utf-8') as f:
- f.write(response)
- logger.info(f"✅ Text response saved: {text_path}")
-
- else:
- logger.warning(f"⚠️ Unknown response type for {result['test_name']}: {type(response)}")
-
- # Save failed test details
- if failed_tests:
- error_path = output_dir / f"failed_tests_{timestamp}.txt"
- with open(error_path, 'w', encoding='utf-8') as f:
- f.write("# Failed Test Details\n\n")
- for i, result in enumerate(failed_tests, 1):
- f.write(f"## Test {i}: {result['test_name']}\n")
- f.write(f"**Error:** {result['error']}\n\n")
- logger.info(f"✅ Failed test details saved: {error_path}")
-
- except Exception as e:
- logger.error(f"❌ Error saving test results: {str(e)}")
- return False
-
- # Save comprehensive test report
- report_path = output_dir / f"end_to_end_test_report_{timestamp}.txt"
- with open(report_path, 'w', encoding='utf-8') as f:
- f.write(f"# End-to-End AI Service Test Report\n")
- f.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
-
- f.write(f"## Test Configuration\n")
- f.write(f"- Documents processed: {len(documents)}\n")
- f.write(f"- Processing method: Intelligent Token-Aware Merging\n")
- f.write(f"- Parallel processing: {ai_options.enableParallelProcessing}\n")
- f.write(f"- Max concurrent chunks: {ai_options.maxConcurrentChunks}\n")
- f.write(f"- Chunk metadata preserved: {ai_options.preserveChunkMetadata}\n")
- f.write(f"- Chunk separator: '{ai_options.chunkSeparator}'\n\n")
-
- f.write(f"## Document Inventory\n")
- for i, doc in enumerate(documents, 1):
- f.write(f"{i}. **{doc.fileName}**\n")
- f.write(f" - MIME Type: {doc.mimeType}\n")
- f.write(f" - File Size: {doc.fileSize:,} bytes\n")
- f.write(f" - File ID: {doc.fileId}\n\n")
-
- f.write(f"## Test Results Summary\n")
- f.write(f"- Total Tests: {len(test_results)}\n")
- f.write(f"- Successful: {len(successful_tests)}\n")
- f.write(f"- Failed: {len(failed_tests)}\n")
- f.write(f"- Success Rate: {len(successful_tests)/len(test_results)*100:.1f}%\n\n")
-
- f.write(f"## Detailed Test Results\n")
- for i, result in enumerate(test_results, 1):
- f.write(f"### Test {i}: {result['test_name']}\n")
- f.write(f"**Status:** {'✅ PASS' if result['success'] else '❌ FAIL'}\n")
-
- if result['success']:
- f.write(f"**Response Type:** {result['response_type']}\n")
- f.write(f"**Response Length:** {result['response_length']} characters\n")
-
- # Show response preview
- response_preview = str(result['response'])[:500]
- f.write(f"**Response Preview:**\n```\n{response_preview}...\n```\n\n")
- else:
- f.write(f"**Error:** {result['error']}\n\n")
-
- f.write(f"## Technical Implementation Details\n")
- f.write(f"This test validates the complete AI service pipeline:\n\n")
- f.write(f"### Tested Components:\n")
- f.write(f"- **Document Extraction**: PDF, DOCX, images, etc.\n")
- f.write(f"- **Intelligent Chunking**: Token-aware merging\n")
- f.write(f"- **Model Selection**: Automatic AI model choice\n")
- f.write(f"- **Parallel Processing**: Concurrent chunk processing\n")
- f.write(f"- **Document Generation**: DOCX, PDF, text output\n")
- f.write(f"- **Error Handling**: Graceful failure management\n\n")
-
- f.write(f"### Performance Metrics:\n")
- f.write(f"- **Chunk Optimization**: Intelligent merging reduces AI calls\n")
- f.write(f"- **Processing Speed**: Parallel execution\n")
- f.write(f"- **Memory Efficiency**: Token-aware chunking\n")
- f.write(f"- **Output Quality**: Multiple format support\n\n")
-
- f.write(f"## Generated Files\n")
- for i, result in enumerate(successful_tests, 1):
- test_name = result['test_name'].replace(' ', '_').lower()
- f.write(f"- **Test {i}**: {result['test_name']} → `{test_name}_*_{timestamp}.*`\n")
-
- if failed_tests:
- f.write(f"- **Failed Tests**: `failed_tests_{timestamp}.txt`\n")
-
- f.write(f"- **This Report**: `end_to_end_test_report_{timestamp}.txt`\n\n")
-
- f.write(f"The end-to-end test successfully validates the complete AI service\n")
- f.write(f"pipeline from document input to formatted output generation.\n")
-
- logger.info(f"✅ Comprehensive test report saved: {report_path}")
-
- # Show debug file locations
- debug_files = []
- try:
- debug_dir = Path("test-chat")
- if debug_dir.exists():
- debug_files.extend(list(debug_dir.glob("*.log")))
- debug_files.extend(list(debug_dir.glob("ai/*.txt")))
-
- if debug_files:
- logger.info("📁 Debug files created:")
- for debug_file in debug_files:
- logger.info(f" - {debug_file}")
- else:
- logger.info("📁 No debug files found in test-chat directory")
- except Exception as e:
- logger.warning(f"Could not list debug files: {e}")
-
- # Restore original database interface
- db_interface_module.getInterface = original_get_interface
-
- return True
-
- except Exception as e:
- logger.error(f"❌ Error during document processing: {str(e)}")
- import traceback
- logger.error(f"Traceback: {traceback.format_exc()}")
-
- # Restore original database interface in case of error
- try:
- db_interface_module.getInterface = original_get_interface
- except:
- pass
-
- return False
-
-async def main():
- """Main function to run the intelligent chunk integration test."""
- logger.info("🎯 Starting Intelligent Chunk Integration Test")
- logger.info("=" * 60)
-
- success = await process_documents_and_generate_summary()
-
- if success:
- logger.info("🎉 Intelligent chunk integration test completed successfully!")
- logger.info("✅ Main AI service handled all processing internally")
- logger.info("✅ Intelligent token-aware merging activated")
- logger.info("✅ DOCX document generated directly by AI service")
- logger.info("✅ Detailed chunk integration analysis saved")
- logger.info("✅ Performance optimization achieved")
- else:
- logger.error("❌ Test failed!")
- logger.error("Please check the error messages above for details")
-
- logger.info("=" * 60)
-
-if __name__ == "__main__":
- asyncio.run(main())
\ No newline at end of file