From a7c9e0221f59cae9b90775d03445a4c60c901273 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Fri, 31 Oct 2025 00:28:09 +0100
Subject: [PATCH] refactored names to camelCase, part 2 of 2
---
modules/connectors/connectorVoiceGoogle.py | 188 +++----
modules/features/syncDelta/mainSyncDelta.py | 222 ++++-----
modules/interfaces/interfaceVoiceObjects.py | 38 +-
modules/routes/routeDataConnections.py | 63 ++-
modules/services/serviceAi/mainServiceAi.py | 26 +-
.../serviceAi/subDocumentProcessing.py | 463 ------------------
.../mainServiceExtraction.py | 274 ++++++++++-
.../services/serviceUtils/mainServiceUtils.py | 22 +-
modules/shared/eventManagement.py | 54 +-
9 files changed, 581 insertions(+), 769 deletions(-)
delete mode 100644 modules/services/serviceAi/subDocumentProcessing.py
diff --git a/modules/connectors/connectorVoiceGoogle.py b/modules/connectors/connectorVoiceGoogle.py
index 5cb43f35..68d0cd31 100644
--- a/modules/connectors/connectorVoiceGoogle.py
+++ b/modules/connectors/connectorVoiceGoogle.py
@@ -55,13 +55,13 @@ class ConnectorGoogleSpeech:
logger.error(f"❌ Failed to initialize Google Cloud clients: {e}")
raise
- async def speech_to_text(self, audioContent: bytes, language: str = "de-DE",
+ async def speechToText(self, audioContent: bytes, language: str = "de-DE",
sampleRate: int = None, channels: int = None) -> Dict:
"""
Convert speech to text using Google Cloud Speech-to-Text API.
Args:
- audio_content: Raw audio data (various formats supported)
+ audioContent: Raw audio data (various formats supported)
language: Language code (e.g., 'de-DE', 'en-US')
sample_rate: Audio sample rate (auto-detected if None)
channels: Number of audio channels (auto-detected if None)
@@ -72,7 +72,7 @@ class ConnectorGoogleSpeech:
try:
# Auto-detect audio format if not provided
if sampleRate is None or channels is None:
- validation = self.validate_audio_format(audioContent)
+ validation = self.validateAudioFormat(audioContent)
if not validation["valid"]:
return {
"success": False,
@@ -348,8 +348,8 @@ class ConnectorGoogleSpeech:
"error": str(e)
}
- async def translate_text(self, text: str, target_language: str = "en",
- source_language: str = "de") -> Dict:
+ async def translateText(self, text: str, targetLanguage: str = "en",
+ sourceLanguage: str = "de") -> Dict:
"""
Translate text using Google Cloud Translation API.
@@ -370,28 +370,28 @@ class ConnectorGoogleSpeech:
"error": "Empty text provided"
}
- logger.info(f"🌐 Translating: '{text}' ({source_language} -> {target_language})")
+ logger.info(f"🌐 Translating: '{text}' ({sourceLanguage} -> {targetLanguage})")
# Perform translation
result = self.translate_client.translate(
text,
- source_language=source_language,
- target_language=target_language
+ source_language=sourceLanguage,
+ target_language=targetLanguage
)
- translated_text = result['translatedText']
- detected_language = result.get('detectedSourceLanguage', source_language)
+ translatedText = result['translatedText']
+ detectedLanguage = result.get('detectedSourceLanguage', sourceLanguage)
# Decode HTML entities in translated text
- translated_text = html.unescape(translated_text)
+ translatedText = html.unescape(translatedText)
- logger.info(f"✅ Translation successful: '{translated_text}'")
+ logger.info(f"✅ Translation successful: '{translatedText}'")
return {
"success": True,
- "translated_text": translated_text,
- "source_language": detected_language,
- "target_language": target_language,
+ "translated_text": translatedText,
+ "source_language": detectedLanguage,
+ "target_language": targetLanguage,
"original_text": text
}
@@ -403,14 +403,14 @@ class ConnectorGoogleSpeech:
"error": str(e)
}
- async def speech_to_translated_text(self, audio_content: bytes,
- from_language: str = "de-DE",
- to_language: str = "en") -> Dict:
+ async def speechToTranslatedText(self, audioContent: bytes,
+ fromLanguage: str = "de-DE",
+ toLanguage: str = "en") -> Dict:
"""
Complete pipeline: Speech-to-Text + Translation.
Args:
- audio_content: Raw audio data
+ audioContent: Raw audio data
from_language: Source language for speech recognition
to_language: Target language for translation
@@ -418,52 +418,52 @@ class ConnectorGoogleSpeech:
Dict containing original text, translated text, and metadata
"""
try:
- logger.info(f"🔄 Starting speech-to-translation pipeline: {from_language} -> {to_language}")
+ logger.info(f"🔄 Starting speech-to-translation pipeline: {fromLanguage} -> {toLanguage}")
# Step 1: Speech-to-Text
- speech_result = await self.speech_to_text(
- audio_content=audio_content,
- language=from_language
+ speechResult = await self.speechToText(
+ audioContent=audioContent,
+ language=fromLanguage
)
- if not speech_result["success"]:
+ if not speechResult["success"]:
return {
"success": False,
"original_text": "",
"translated_text": "",
- "error": f"Speech recognition failed: {speech_result.get('error', 'Unknown error')}"
+ "error": f"Speech recognition failed: {speechResult.get('error', 'Unknown error')}"
}
- original_text = speech_result["text"]
+ originalText = speechResult["text"]
# Step 2: Translation
- translation_result = await self.translate_text(
- text=original_text,
- source_language=from_language.split('-')[0], # Convert 'de-DE' to 'de'
- target_language=to_language.split('-')[0] # Convert 'en-US' to 'en'
+ translationResult = await self.translateText(
+ text=originalText,
+ sourceLanguage=fromLanguage.split('-')[0], # Convert 'de-DE' to 'de'
+ targetLanguage=toLanguage.split('-')[0] # Convert 'en-US' to 'en'
)
- if not translation_result["success"]:
+ if not translationResult["success"]:
return {
"success": False,
- "original_text": original_text,
+ "original_text": originalText,
"translated_text": "",
- "error": f"Translation failed: {translation_result.get('error', 'Unknown error')}"
+ "error": f"Translation failed: {translationResult.get('error', 'Unknown error')}"
}
- translated_text = translation_result["translated_text"]
+ translatedText = translationResult["translated_text"]
logger.info(f"✅ Complete pipeline successful:")
- logger.info(f" Original: '{original_text}'")
- logger.info(f" Translated: '{translated_text}'")
+ logger.info(f" Original: '{originalText}'")
+ logger.info(f" Translated: '{translatedText}'")
return {
"success": True,
- "original_text": original_text,
- "translated_text": translated_text,
- "confidence": speech_result["confidence"],
- "source_language": from_language,
- "target_language": to_language
+ "original_text": originalText,
+ "translated_text": translatedText,
+ "confidence": speechResult["confidence"],
+ "source_language": fromLanguage,
+ "target_language": toLanguage
}
except Exception as e:
@@ -475,19 +475,19 @@ class ConnectorGoogleSpeech:
"error": str(e)
}
- def validate_audio_format(self, audio_content: bytes) -> Dict:
+ def validateAudioFormat(self, audioContent: bytes) -> Dict:
"""
Validate audio format for Google Cloud Speech-to-Text.
Args:
- audio_content: Raw audio data
+ audioContent: Raw audio data
Returns:
Dict containing validation results
"""
try:
# Basic validation
- if len(audio_content) < 100:
+ if len(audioContent) < 100:
return {
"valid": False,
"error": "Audio too short (less than 100 bytes)"
@@ -499,11 +499,11 @@ class ConnectorGoogleSpeech:
channels = 1 # Default fallback
# Debug: Log first few bytes for format detection
- logger.debug(f"Audio header bytes: {audio_content[:20].hex()}")
- logger.debug(f"Audio content length: {len(audio_content)} bytes")
+ logger.debug(f"Audio header bytes: {audioContent[:20].hex()}")
+ logger.debug(f"Audio content length: {len(audioContent)} bytes")
# Check for WEBM/OPUS format (common from web recordings)
- if audio_content.startswith(b'\x1a\x45\xdf\xa3'):
+ if audioContent.startswith(b'\x1a\x45\xdf\xa3'):
audio_format = "webm_opus"
sample_rate = 48000 # WEBM OPUS typically uses 48kHz
channels = 1
@@ -511,10 +511,10 @@ class ConnectorGoogleSpeech:
# Check for specific header patterns seen in logs (43c381...)
# This appears to be a different audio format or corrupted WEBM
- elif audio_content.startswith(b'\x43\xc3\x81') and len(audio_content) > 1000:
+ elif audioContent.startswith(b'\x43\xc3\x81') and len(audioContent) > 1000:
# This might be a different format or corrupted audio
# Try to detect if it's actually WEBM by looking deeper
- if b'webm' in audio_content[:200] or b'opus' in audio_content[:200]:
+ if b'webm' in audioContent[:200] or b'opus' in audioContent[:200]:
audio_format = "webm_opus"
sample_rate = 48000
channels = 1
@@ -524,58 +524,58 @@ class ConnectorGoogleSpeech:
audio_format = "linear16"
sample_rate = 16000
channels = 1
- logger.warning(f"Unknown audio format with header {audio_content[:8].hex()}, trying LINEAR16")
+ logger.warning(f"Unknown audio format with header {audioContent[:8].hex()}, trying LINEAR16")
# Check for WEBM format (alternative detection)
- elif b'webm' in audio_content[:100].lower() or b'opus' in audio_content[:100].lower():
+ elif b'webm' in audioContent[:100].lower() or b'opus' in audioContent[:100].lower():
audio_format = "webm_opus"
sample_rate = 48000 # WEBM OPUS typically uses 48kHz
channels = 1
logger.info(f"Detected WEBM format: {sample_rate}Hz, {channels}ch")
# Check for MediaRecorder WEBM chunks (common in browser recordings)
- elif audio_content.startswith(b'\x1a\x45\xdf\xa3') and len(audio_content) > 1000:
+ elif audioContent.startswith(b'\x1a\x45\xdf\xa3') and len(audioContent) > 1000:
audio_format = "webm_opus"
sample_rate = 48000 # Browser MediaRecorder typically uses 48kHz
channels = 1
logger.info(f"Detected MediaRecorder WEBM: {sample_rate}Hz, {channels}ch")
# Check for OPUS format by looking for OPUS magic bytes
- elif audio_content.startswith(b'OpusHead') or b'OpusHead' in audio_content[:50]:
+ elif audioContent.startswith(b'OpusHead') or b'OpusHead' in audioContent[:50]:
audio_format = "webm_opus"
sample_rate = 48000 # OPUS typically uses 48kHz
channels = 1
logger.info(f"Detected OPUS format: {sample_rate}Hz, {channels}ch")
# Check for OGG format (often contains OPUS)
- elif audio_content.startswith(b'OggS'):
+ elif audioContent.startswith(b'OggS'):
audio_format = "webm_opus"
sample_rate = 48000 # OGG OPUS typically uses 48kHz
channels = 1
logger.info(f"Detected OGG format: {sample_rate}Hz, {channels}ch")
# Check for WAV format
- elif audio_content.startswith(b'RIFF') and b'WAVE' in audio_content[:12]:
+ elif audioContent.startswith(b'RIFF') and b'WAVE' in audioContent[:12]:
audio_format = "wav"
# Try to extract sample rate from WAV header
try:
# WAV header sample rate is at offset 24-27 (little endian)
- sample_rate = int.from_bytes(audio_content[24:28], 'little')
- channels = int.from_bytes(audio_content[22:24], 'little')
+ sample_rate = int.from_bytes(audioContent[24:28], 'little')
+ channels = int.from_bytes(audioContent[22:24], 'little')
logger.info(f"Detected WAV format: {sample_rate}Hz, {channels}ch")
except:
sample_rate = 16000 # Fallback
channels = 1
# Check for MP3 format
- elif audio_content.startswith(b'\xff\xfb') or audio_content.startswith(b'ID3'):
+ elif audioContent.startswith(b'\xff\xfb') or audioContent.startswith(b'ID3'):
audio_format = "mp3"
sample_rate = 44100 # MP3 typically uses 44.1kHz
channels = 2 # Usually stereo
logger.info(f"Detected MP3 format: {sample_rate}Hz, {channels}ch")
# Check for FLAC format
- elif audio_content.startswith(b'fLaC'):
+ elif audioContent.startswith(b'fLaC'):
audio_format = "flac"
sample_rate = 44100 # Common FLAC sample rate
channels = 2
@@ -594,31 +594,31 @@ class ConnectorGoogleSpeech:
estimated_duration = 3.0 # Assume 3 seconds for web recordings
else:
# Rough estimate for uncompressed audio
- estimated_duration = len(audio_content) / (sample_rate * channels * 2) # 16-bit = 2 bytes per sample
+ estimated_duration = len(audioContent) / (sample_rate * channels * 2) # 16-bit = 2 bytes per sample
# Check if audio is too short (less than 0.5 seconds)
if estimated_duration < 0.5:
logger.warning(f"Audio too short: {estimated_duration:.2f}s, may not be recognized")
# Log audio details for debugging
- logger.info(f"Audio analysis: {len(audio_content)} bytes, {estimated_duration:.2f}s, {sample_rate}Hz, {channels}ch, format={audio_format}")
+ logger.info(f"Audio analysis: {len(audioContent)} bytes, {estimated_duration:.2f}s, {sample_rate}Hz, {channels}ch, format={audio_format}")
# Check audio levels (simple check for silence)
if audio_format == "webm_opus":
# For WEBM, we can't easily check levels, but log the first few bytes
- logger.debug(f"Audio sample bytes: {audio_content[:20].hex()}")
+ logger.debug(f"Audio sample bytes: {audioContent[:20].hex()}")
# Check if audio has some variation (not all same bytes)
- if len(audio_content) > 100:
- sample_bytes = audio_content[100:200] # Skip header
+ if len(audioContent) > 100:
+ sample_bytes = audioContent[100:200] # Skip header
if len(set(sample_bytes)) < 5: # Less than 5 different byte values
logger.warning("Audio may be silent or very quiet (low byte variation)")
else:
logger.debug(f"Audio has good byte variation: {len(set(sample_bytes))} unique values")
else:
# For PCM audio, check for silence
- if len(audio_content) > 100:
+ if len(audioContent) > 100:
# Convert first 100 bytes to check for silence
- sample_bytes = audio_content[:100]
+ sample_bytes = audioContent[:100]
if all(b == 0 for b in sample_bytes):
logger.warning("Audio appears to be silent (all zeros)")
else:
@@ -632,7 +632,7 @@ class ConnectorGoogleSpeech:
"format": audio_format,
"sample_rate": sample_rate,
"channels": channels,
- "size": len(audio_content),
+ "size": len(audioContent),
"estimated_duration": estimated_duration
}
@@ -642,7 +642,7 @@ class ConnectorGoogleSpeech:
"error": f"Validation error: {e}"
}
- async def text_to_speech(self, text: str, language_code: str = "de-DE", voice_name: str = None) -> Dict[str, Any]:
+ async def textToSpeech(self, text: str, languageCode: str = "de-DE", voiceName: str = None) -> Dict[str, Any]:
"""
Convert text to speech using Google Cloud Text-to-Speech.
@@ -655,38 +655,38 @@ class ConnectorGoogleSpeech:
Dict with success status and audio data
"""
try:
- logger.info(f"Converting text to speech: '{text[:50]}...' in {language_code}")
+ logger.info(f"Converting text to speech: '{text[:50]}...' in {languageCode}")
# Set up the synthesis input
- synthesis_input = texttospeech.SynthesisInput(text=text)
+ synthesisInput = texttospeech.SynthesisInput(text=text)
# Build the voice request
- selected_voice = voice_name or self._get_default_voice(language_code)
+ selectedVoice = voiceName or self._getDefaultVoice(languageCode)
- if not selected_voice:
+ if not selectedVoice:
return {
"success": False,
- "error": f"No voice specified for language {language_code}. Please select a voice."
+ "error": f"No voice specified for language {languageCode}. Please select a voice."
}
- logger.info(f"Using TTS voice: {selected_voice} for language: {language_code}")
+ logger.info(f"Using TTS voice: {selectedVoice} for language: {languageCode}")
voice = texttospeech.VoiceSelectionParams(
- language_code=language_code,
- name=selected_voice,
+ language_code=languageCode,
+ name=selectedVoice,
ssml_gender=texttospeech.SsmlVoiceGender.NEUTRAL
)
# Select the type of audio file to return
- audio_config = texttospeech.AudioConfig(
+ audioConfig = texttospeech.AudioConfig(
audio_encoding=texttospeech.AudioEncoding.MP3
)
# Perform the text-to-speech request
response = self.tts_client.synthesize_speech(
- input=synthesis_input,
+ input=synthesisInput,
voice=voice,
- audio_config=audio_config
+ audio_config=audioConfig
)
# Return the audio content
@@ -694,7 +694,7 @@ class ConnectorGoogleSpeech:
"success": True,
"audio_content": response.audio_content,
"audio_format": "mp3",
- "language_code": language_code,
+ "language_code": languageCode,
"voice_name": voice.name
}
@@ -705,14 +705,14 @@ class ConnectorGoogleSpeech:
"error": f"Text-to-Speech failed: {str(e)}"
}
- def _get_default_voice(self, language_code: str) -> str:
+ def _getDefaultVoice(self, languageCode: str) -> str:
"""
Get default voice name for a language code.
Returns None - no defaults, let the frontend handle voice selection.
"""
return None
- async def get_available_languages(self) -> Dict[str, Any]:
+ async def getAvailableLanguages(self) -> Dict[str, Any]:
"""
Get available languages from Google Cloud Text-to-Speech.
@@ -749,7 +749,7 @@ class ConnectorGoogleSpeech:
"languages": []
}
- async def get_available_voices(self, language_code: Optional[str] = None) -> Dict[str, Any]:
+ async def getAvailableVoices(self, languageCode: Optional[str] = None) -> Dict[str, Any]:
"""
Get available voices from Google Cloud Text-to-Speech.
@@ -760,19 +760,19 @@ class ConnectorGoogleSpeech:
Dict containing success status and list of available voices
"""
try:
- logger.info(f"🎤 Getting available voices from Google Cloud TTS, language filter: {language_code}")
+ logger.info(f"🎤 Getting available voices from Google Cloud TTS, language filter: {languageCode}")
# List voices from Google Cloud TTS
voices = self.tts_client.list_voices()
- available_voices = []
+ availableVoices = []
for voice in voices:
# Extract language code from voice name (e.g., 'de-DE-Wavenet-A' -> 'de-DE')
- voice_language = voice.language_codes[0] if voice.language_codes else None
+ voiceLanguage = voice.language_codes[0] if voice.language_codes else None
# Filter by language if specified
- if language_code and voice_language != language_code:
+ if languageCode and voiceLanguage != languageCode:
continue
# Determine gender from voice name (A/C = male, B/D = female)
@@ -784,25 +784,25 @@ class ConnectorGoogleSpeech:
gender = "Female"
# Create voice info
- voice_info = {
+ voiceInfo = {
"name": voice.name,
- "language_code": voice_language,
+ "language_code": voiceLanguage,
"gender": gender,
"ssml_gender": voice.ssml_gender.name if voice.ssml_gender else "NEUTRAL",
"natural_sample_rate_hertz": voice.natural_sample_rate_hertz
}
- available_voices.append(voice_info)
+ availableVoices.append(voiceInfo)
# Sort by language code, then by gender, then by name
- available_voices.sort(key=lambda x: (x["language_code"], x["gender"], x["name"]))
+ availableVoices.sort(key=lambda x: (x["language_code"], x["gender"], x["name"]))
- logger.info(f"✅ Found {len(available_voices)} voices for language filter: {language_code}")
+ logger.info(f"✅ Found {len(availableVoices)} voices for language filter: {languageCode}")
return {
"success": True,
- "voices": available_voices,
- "total_count": len(available_voices)
+ "voices": availableVoices,
+ "total_count": len(availableVoices)
}
except Exception as e:
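
For reviewers checking call sites against the renamed connector API, here is a minimal usage sketch of the full pipeline (the no-argument construction and the sample file are assumptions; only the module path, method names, parameter names, and result keys come from this patch):

import asyncio

from modules.connectors.connectorVoiceGoogle import ConnectorGoogleSpeech

# Hypothetical smoke test for the renamed ConnectorGoogleSpeech methods.
async def smokeTestSpeechPipeline():
    connector = ConnectorGoogleSpeech()  # assumed: constructor arguments are not shown in this patch
    with open("sample.webm", "rb") as f:  # assumed sample recording
        audioContent = f.read()
    # validateAudioFormat is also called internally by speechToText when
    # sampleRate/channels are not supplied.
    validation = connector.validateAudioFormat(audioContent)
    if not validation["valid"]:
        return
    result = await connector.speechToTranslatedText(
        audioContent=audioContent,
        fromLanguage="de-DE",
        toLanguage="en",
    )
    if result["success"]:
        print(result["original_text"], "->", result["translated_text"])

if __name__ == "__main__":
    asyncio.run(smokeTestSpeechPipeline())
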
diff --git a/modules/features/syncDelta/mainSyncDelta.py b/modules/features/syncDelta/mainSyncDelta.py
index 3fc9e7af..2d6a2b1b 100644
--- a/modules/features/syncDelta/mainSyncDelta.py
+++ b/modules/features/syncDelta/mainSyncDelta.py
@@ -76,7 +76,7 @@ class ManagerSyncDelta:
try:
if not eventUser:
logger.error("Event user not found - SharePoint connection required")
- self._log_audit_event("SYNC_INIT", "FAILED", "Event user not found")
+ self._logAuditEvent("SYNC_INIT", "FAILED", "Event user not found")
else:
self.services = getServices(eventUser, None)
# Read config values using services
@@ -88,56 +88,56 @@ class ManagerSyncDelta:
logger.error(
f"No SharePoint connection found for user: {self.SHAREPOINT_USER_ID}"
)
- self._log_audit_event("SYNC_INIT", "FAILED", f"No SharePoint connection for user: {self.SHAREPOINT_USER_ID}")
+ self._logAuditEvent("SYNC_INIT", "FAILED", f"No SharePoint connection for user: {self.SHAREPOINT_USER_ID}")
else:
# Configure SharePoint service token and set connector reference
if not self.services.sharepoint.setAccessTokenFromConnection(
self.sharepointConnection
):
logger.error("Failed to set SharePoint token from UserConnection")
- self._log_audit_event("SYNC_INIT", "FAILED", "Failed to set SharePoint token")
+ self._logAuditEvent("SYNC_INIT", "FAILED", "Failed to set SharePoint token")
else:
logger.info(
f"SharePoint token configured for connection: {self.sharepointConnection.id}"
)
- self._log_audit_event("SYNC_INIT", "SUCCESS", f"SharePoint token configured for connection: {self.sharepointConnection.id}")
+ self._logAuditEvent("SYNC_INIT", "SUCCESS", f"SharePoint token configured for connection: {self.sharepointConnection.id}")
except Exception as e:
logger.error(f"Initialization error in ManagerSyncDelta.__init__: {e}")
- self._log_audit_event("SYNC_INIT", "ERROR", f"Initialization error: {str(e)}")
+ self._logAuditEvent("SYNC_INIT", "ERROR", f"Initialization error: {str(e)}")
- def _log_audit_event(self, action: str, status: str, details: str):
+ def _logAuditEvent(self, action: str, status: str, details: str):
"""Log audit events for sync operations to memory."""
try:
timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
- user_id = str(self.eventUser.id) if self.eventUser else "system"
- log_entry = f"{timestamp} | {user_id} | {action} | {status} | {details}"
- self.sync_audit_log.append(log_entry)
- logger.info(f"Sync Audit: {log_entry}")
+ userId = str(self.eventUser.id) if self.eventUser else "system"
+ logEntry = f"{timestamp} | {userId} | {action} | {status} | {details}"
+ self.sync_audit_log.append(logEntry)
+ logger.info(f"Sync Audit: {logEntry}")
except Exception as e:
logger.warning(f"Failed to log audit event: {str(e)}")
- def _log_sync_changes(self, merge_details: dict, sync_mode: str):
+ def _logSyncChanges(self, mergeDetails: dict, syncMode: str):
"""Log detailed field changes for sync operations."""
try:
# Log summary statistics
- summary = f"Sync {sync_mode} - Updated: {merge_details['updated']}, Added: {merge_details['added']}, Unchanged: {merge_details['unchanged']}"
- self._log_audit_event("SYNC_CHANGES_SUMMARY", "INFO", summary)
+ summary = f"Sync {syncMode} - Updated: {mergeDetails['updated']}, Added: {mergeDetails['added']}, Unchanged: {mergeDetails['unchanged']}"
+ self._logAuditEvent("SYNC_CHANGES_SUMMARY", "INFO", summary)
# Log individual field changes (limit to first 10 to avoid spam)
- for change in merge_details['changes'][:10]:
+ for change in mergeDetails['changes'][:10]:
# Truncate very long changes to avoid logging issues
if len(change) > 500:
change = change[:500] + "... [truncated]"
- self._log_audit_event("SYNC_FIELD_CHANGE", "INFO", f"{sync_mode}: {change}")
+ self._logAuditEvent("SYNC_FIELD_CHANGE", "INFO", f"{syncMode}: {change}")
# Log count if there were more changes
- if len(merge_details['changes']) > 10:
- self._log_audit_event("SYNC_FIELD_CHANGE", "INFO", f"{sync_mode}: ... and {len(merge_details['changes']) - 10} more changes")
+ if len(mergeDetails['changes']) > 10:
+ self._logAuditEvent("SYNC_FIELD_CHANGE", "INFO", f"{syncMode}: ... and {len(mergeDetails['changes']) - 10} more changes")
except Exception as e:
logger.warning(f"Failed to log sync changes: {str(e)}")
- async def _save_audit_log_to_sharepoint(self):
+ async def _saveAuditLogToSharepoint(self):
"""Save the sync audit log to SharePoint."""
try:
if not self.sync_audit_log or not self.targetSite:
@@ -160,12 +160,12 @@ class ManagerSyncDelta:
)
logger.info(f"Sync audit log saved to SharePoint: {log_filename}")
- self._log_audit_event("AUDIT_LOG_SAVE", "SUCCESS", f"Audit log saved to SharePoint: {log_filename}")
+ self._logAuditEvent("AUDIT_LOG_SAVE", "SUCCESS", f"Audit log saved to SharePoint: {log_filename}")
return True
except Exception as e:
logger.error(f"Failed to save audit log to SharePoint: {str(e)}")
- self._log_audit_event("AUDIT_LOG_SAVE", "FAILED", f"Failed to save audit log: {str(e)}")
+ self._logAuditEvent("AUDIT_LOG_SAVE", "FAILED", f"Failed to save audit log: {str(e)}")
return False
def getSyncFileName(self) -> str:
@@ -246,12 +246,12 @@ class ManagerSyncDelta:
"""Perform Tickets to SharePoint synchronization using list-based interface and local CSV/XLSX handling."""
try:
logger.info(f"Starting JIRA to SharePoint synchronization (Mode: {self.SYNC_MODE})")
- self._log_audit_event("SYNC_START", "INFO", f"Starting JIRA to SharePoint sync (Mode: {self.SYNC_MODE})")
+ self._logAuditEvent("SYNC_START", "INFO", f"Starting JIRA to SharePoint sync (Mode: {self.SYNC_MODE})")
# Initialize interface
if not await self.initializeInterface():
logger.error("Failed to initialize connectors")
- self._log_audit_event("SYNC_INTERFACE", "FAILED", "Failed to initialize connectors")
+ self._logAuditEvent("SYNC_INTERFACE", "FAILED", "Failed to initialize connectors")
return False
# Dump current Jira fields to text file for reference
@@ -287,7 +287,7 @@ class ManagerSyncDelta:
if self.SYNC_MODE == "xlsx":
# Export tickets to list
data_list = await sync_interface.exportTicketsAsList()
- self._log_audit_event("SYNC_EXPORT", "INFO", f"Exported {len(data_list)} tickets from JIRA")
+ self._logAuditEvent("SYNC_EXPORT", "INFO", f"Exported {len(data_list)} tickets from JIRA")
# Read existing Excel headers/content
existing_data = []
existing_headers = {"header1": "Header 1", "header2": "Header 2"}
@@ -303,7 +303,7 @@ class ManagerSyncDelta:
merged_data, merge_details = self.mergeJiraWithExistingDetailed(data_list, existing_data)
# Log detailed changes for Excel mode
- self._log_sync_changes(merge_details, "EXCEL")
+ self._logSyncChanges(merge_details, "EXCEL")
await self.backupSharepointFile(filename=sync_file_name)
excel_bytes = self.createExcelContent(merged_data, existing_headers)
@@ -319,15 +319,15 @@ class ManagerSyncDelta:
siteId=self.targetSite['id'], filePath=file_path
)
excel_rows, _ = self.parseExcelContent(excel_content)
- self._log_audit_event("SYNC_IMPORT", "INFO", f"Importing {len(excel_rows)} Excel rows back to tickets")
+ self._logAuditEvent("SYNC_IMPORT", "INFO", f"Importing {len(excel_rows)} Excel rows back to tickets")
except Exception as e:
excel_rows = []
- self._log_audit_event("SYNC_IMPORT", "WARNING", f"Failed to download Excel for import: {str(e)}")
+ self._logAuditEvent("SYNC_IMPORT", "WARNING", f"Failed to download Excel for import: {str(e)}")
await sync_interface.importListToTickets(excel_rows)
else: # CSV mode (default)
# Export tickets to list
data_list = await sync_interface.exportTicketsAsList()
- self._log_audit_event("SYNC_EXPORT", "INFO", f"Exported {len(data_list)} tickets from JIRA")
+ self._logAuditEvent("SYNC_EXPORT", "INFO", f"Exported {len(data_list)} tickets from JIRA")
# Prepare headers by reading existing CSV if present
existing_headers = {"header1": "Header 1", "header2": "Header 2"}
existing_data: list[dict] = []
@@ -361,26 +361,26 @@ class ManagerSyncDelta:
)
df = pd.read_csv(io.BytesIO(csv_content), skiprows=2, quoting=1, escapechar='\\', on_bad_lines='skip', engine='python')
csv_rows = df.to_dict('records')
- self._log_audit_event("SYNC_IMPORT", "INFO", f"Importing {len(csv_rows)} CSV rows back to tickets")
+ self._logAuditEvent("SYNC_IMPORT", "INFO", f"Importing {len(csv_rows)} CSV rows back to tickets")
except Exception as e:
csv_rows = []
- self._log_audit_event("SYNC_IMPORT", "WARNING", f"Failed to download CSV for import: {str(e)}")
+ self._logAuditEvent("SYNC_IMPORT", "WARNING", f"Failed to download CSV for import: {str(e)}")
await sync_interface.importListToTickets(csv_rows)
logger.info(f"JIRA to SharePoint synchronization completed successfully (Mode: {self.SYNC_MODE})")
- self._log_audit_event("SYNC_COMPLETE", "SUCCESS", f"JIRA to SharePoint sync completed successfully (Mode: {self.SYNC_MODE})")
+ self._logAuditEvent("SYNC_COMPLETE", "SUCCESS", f"JIRA to SharePoint sync completed successfully (Mode: {self.SYNC_MODE})")
# Save audit log to SharePoint
- await self._save_audit_log_to_sharepoint()
+ await self._saveAuditLogToSharepoint()
return True
except Exception as e:
logger.error(f"Error during JIRA to SharePoint synchronization: {str(e)}")
- self._log_audit_event("SYNC_ERROR", "FAILED", f"Error during sync: {str(e)}")
+ self._logAuditEvent("SYNC_ERROR", "FAILED", f"Error during sync: {str(e)}")
# Save audit log to SharePoint even on error
- await self._save_audit_log_to_sharepoint()
+ await self._saveAuditLogToSharepoint()
return False
@@ -395,14 +395,14 @@ class ManagerSyncDelta:
destFolder=self.SHAREPOINT_BACKUP_FOLDER,
destFile=backup_filename,
)
- self._log_audit_event("SYNC_BACKUP", "SUCCESS", f"Backed up file: {filename} -> {backup_filename}")
+ self._logAuditEvent("SYNC_BACKUP", "SUCCESS", f"Backed up file: {filename} -> {backup_filename}")
return True
except Exception as e:
if "itemNotFound" in str(e) or "404" in str(e):
- self._log_audit_event("SYNC_BACKUP", "SKIPPED", f"File not found for backup: {filename}")
+ self._logAuditEvent("SYNC_BACKUP", "SKIPPED", f"File not found for backup: {filename}")
return True
logger.warning(f"Backup failed: {e}")
- self._log_audit_event("SYNC_BACKUP", "FAILED", f"Backup failed for {filename}: {str(e)}")
+ self._logAuditEvent("SYNC_BACKUP", "FAILED", f"Backup failed for {filename}: {str(e)}")
return False
def mergeJiraWithExistingDetailed(self, jira_data: list[dict], existing_data: list[dict]) -> tuple[list[dict], dict]:
@@ -536,18 +536,18 @@ class ManagerSyncDelta:
if not isinstance(content, list):
return ""
- def extract_text_from_content(content_list, list_level=0):
+ def extractTextFromContent(contentList, listLevel=0):
"""Recursively extract text from ADF content with proper formatting."""
- text_parts = []
- list_counter = 1
+ textParts = []
+ listCounter = 1
- for item in content_list:
+ for item in contentList:
if not isinstance(item, dict):
continue
- item_type = item.get("type", "")
+ itemType = item.get("type", "")
- if item_type == "text":
+ if itemType == "text":
# Extract text content, preserving formatting
text = item.get("text", "")
marks = item.get("marks", [])
@@ -567,99 +567,99 @@ class ManagerSyncDelta:
if href:
text = f"[{text}]({href})"
- text_parts.append(text)
+ textParts.append(text)
- elif item_type == "hardBreak":
- text_parts.append("\n")
+ elif itemType == "hardBreak":
+ textParts.append("\n")
- elif item_type == "paragraph":
- paragraph_content = item.get("content", [])
- if paragraph_content:
- paragraph_text = extract_text_from_content(paragraph_content, list_level)
- if paragraph_text.strip():
- text_parts.append(paragraph_text)
+ elif itemType == "paragraph":
+ paragraphContent = item.get("content", [])
+ if paragraphContent:
+ paragraphText = extractTextFromContent(paragraphContent, listLevel)
+ if paragraphText.strip():
+ textParts.append(paragraphText)
- elif item_type == "bulletList":
- list_content = item.get("content", [])
- for list_item in list_content:
- if list_item.get("type") == "listItem":
- list_item_content = list_item.get("content", [])
- for list_paragraph in list_item_content:
- if list_paragraph.get("type") == "paragraph":
- list_paragraph_content = list_paragraph.get("content", [])
- if list_paragraph_content:
- indent = " " * list_level
- bullet_text = extract_text_from_content(list_paragraph_content, list_level + 1)
- if bullet_text.strip():
- text_parts.append(f"{indent}• {bullet_text}")
+ elif itemType == "bulletList":
+ listContent = item.get("content", [])
+ for listItem in listContent:
+ if listItem.get("type") == "listItem":
+ listItemContent = listItem.get("content", [])
+ for listParagraph in listItemContent:
+ if listParagraph.get("type") == "paragraph":
+ listParagraphContent = listParagraph.get("content", [])
+ if listParagraphContent:
+ indent = " " * listLevel
+ bulletText = extractTextFromContent(listParagraphContent, listLevel + 1)
+ if bulletText.strip():
+ textParts.append(f"{indent}• {bulletText}")
- elif item_type == "orderedList":
- list_content = item.get("content", [])
- for list_item in list_content:
- if list_item.get("type") == "listItem":
- list_item_content = list_item.get("content", [])
- for list_paragraph in list_item_content:
- if list_paragraph.get("type") == "paragraph":
- list_paragraph_content = list_paragraph.get("content", [])
- if list_paragraph_content:
- indent = " " * list_level
- ordered_text = extract_text_from_content(list_paragraph_content, list_level + 1)
- if ordered_text.strip():
- text_parts.append(f"{indent}{list_counter}. {ordered_text}")
- list_counter += 1
+ elif itemType == "orderedList":
+ listContent = item.get("content", [])
+ for listItem in listContent:
+ if listItem.get("type") == "listItem":
+ listItemContent = listItem.get("content", [])
+ for listParagraph in listItemContent:
+ if listParagraph.get("type") == "paragraph":
+ listParagraphContent = listParagraph.get("content", [])
+ if listParagraphContent:
+ indent = " " * listLevel
+ orderedText = extractTextFromContent(listParagraphContent, listLevel + 1)
+ if orderedText.strip():
+ textParts.append(f"{indent}{listCounter}. {orderedText}")
+ listCounter += 1
- elif item_type == "listItem":
+ elif itemType == "listItem":
# Handle nested list items
- list_item_content = item.get("content", [])
- if list_item_content:
- text_parts.append(extract_text_from_content(list_item_content, list_level))
+ listItemContent = item.get("content", [])
+ if listItemContent:
+ textParts.append(extractTextFromContent(listItemContent, listLevel))
- elif item_type == "embedCard":
+ elif itemType == "embedCard":
# Handle embedded content (videos, etc.)
attrs = item.get("attrs", {})
url = attrs.get("url", "")
if url:
- text_parts.append(f"[Embedded Content: {url}]")
+ textParts.append(f"[Embedded Content: {url}]")
- elif item_type == "codeBlock":
+ elif itemType == "codeBlock":
# Handle code blocks
- code_content = item.get("content", [])
- if code_content:
- code_text = extract_text_from_content(code_content, list_level)
- if code_text.strip():
- text_parts.append(f"```\n{code_text}\n```")
+ codeContent = item.get("content", [])
+ if codeContent:
+ codeText = extractTextFromContent(codeContent, listLevel)
+ if codeText.strip():
+ textParts.append(f"```\n{codeText}\n```")
- elif item_type == "blockquote":
+ elif itemType == "blockquote":
# Handle blockquotes
- quote_content = item.get("content", [])
- if quote_content:
- quote_text = extract_text_from_content(quote_content, list_level)
- if quote_text.strip():
- text_parts.append(f"> {quote_text}")
+ quoteContent = item.get("content", [])
+ if quoteContent:
+ quoteText = extractTextFromContent(quoteContent, listLevel)
+ if quoteText.strip():
+ textParts.append(f"> {quoteText}")
- elif item_type == "heading":
+ elif itemType == "heading":
# Handle headings
- heading_content = item.get("content", [])
- if heading_content:
- heading_text = extract_text_from_content(heading_content, list_level)
- if heading_text.strip():
+ headingContent = item.get("content", [])
+ if headingContent:
+ headingText = extractTextFromContent(headingContent, listLevel)
+ if headingText.strip():
level = item.get("attrs", {}).get("level", 1)
- text_parts.append(f"{'#' * level} {heading_text}")
+ textParts.append(f"{'#' * level} {headingText}")
- elif item_type == "rule":
+ elif itemType == "rule":
# Handle horizontal rules
- text_parts.append("---")
+ textParts.append("---")
else:
# Handle unknown types by trying to extract content
if "content" in item:
- content_text = extract_text_from_content(item.get("content", []), list_level)
- if content_text.strip():
- text_parts.append(content_text)
+ contentText = extractTextFromContent(item.get("content", []), listLevel)
+ if contentText.strip():
+ textParts.append(contentText)
- return "\n".join(text_parts)
+ return "\n".join(textParts)
- result = extract_text_from_content(content)
+ result = extractTextFromContent(content)
return result.strip()
# Utility: dump all ticket fields (name -> field id) to a text file (generic)
@@ -800,7 +800,7 @@ def startSyncManager(eventUser):
if _sync_manager.APP_ENV_TYPE == "prod":
_sync_manager.services.utils.eventRegisterCron(
job_id="syncDelta.syncTicket",
- func=scheduled_sync,
+ func=scheduledSync,
cron_kwargs={"minute": "0,20,40"},
replace_existing=True,
coalesce=True,
@@ -814,7 +814,7 @@ def startSyncManager(eventUser):
logger.error(f"Failed to register scheduler for DG sync: {str(e)}")
return _sync_manager
-async def scheduled_sync():
+async def scheduledSync():
"""Scheduled sync function that uses the global sync manager."""
try:
global _sync_manager
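
The renamed extractTextFromContent helper is a closure inside the ADF-to-text conversion method, so it cannot be called directly; the data-only sketch below illustrates the mapping its paragraph and bulletList branches are expected to produce, based on the code above:

# Illustrative ADF fragment and the plain text the conversion above should yield.
adfContent = [
    {"type": "paragraph", "content": [{"type": "text", "text": "Summary"}]},
    {
        "type": "bulletList",
        "content": [
            {
                "type": "listItem",
                "content": [
                    {"type": "paragraph", "content": [{"type": "text", "text": "first point"}]}
                ],
            }
        ],
    },
]
# Expected result (parts joined with newlines; nested bullets get two spaces of indent per listLevel):
#   Summary
#   • first point
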
diff --git a/modules/interfaces/interfaceVoiceObjects.py b/modules/interfaces/interfaceVoiceObjects.py
index 2bb1b729..e83ef77d 100644
--- a/modules/interfaces/interfaceVoiceObjects.py
+++ b/modules/interfaces/interfaceVoiceObjects.py
@@ -74,10 +74,10 @@ class VoiceObjects:
logger.info(f"🎤 Speech-to-text request: {len(audioContent)} bytes, language: {language}")
connector = self._getGoogleSpeechConnector()
- result = await connector.speech_to_text(
- audio_content=audioContent,
+ result = await connector.speechToText(
+ audioContent=audioContent,
language=language,
- sample_rate=sampleRate,
+ sampleRate=sampleRate,
channels=channels
)
@@ -123,10 +123,10 @@ class VoiceObjects:
}
connector = self._getGoogleSpeechConnector()
- result = await connector.translate_text(
+ result = await connector.translateText(
text=text,
- source_language=sourceLanguage,
- target_language=targetLanguage
+ sourceLanguage=sourceLanguage,
+ targetLanguage=targetLanguage
)
if result["success"]:
@@ -164,10 +164,10 @@ class VoiceObjects:
logger.info(f"🔄 Speech-to-translation pipeline: {fromLanguage} -> {toLanguage}")
connector = self._getGoogleSpeechConnector()
- result = await connector.speech_to_translated_text(
- audio_content=audioContent,
- from_language=fromLanguage,
- to_language=toLanguage
+ result = await connector.speechToTranslatedText(
+ audioContent=audioContent,
+ fromLanguage=fromLanguage,
+ toLanguage=toLanguage
)
if result["success"]:
@@ -213,10 +213,10 @@ class VoiceObjects:
}
connector = self._getGoogleSpeechConnector()
- result = await connector.text_to_speech(
+ result = await connector.textToSpeech(
text=text,
- language_code=languageCode,
- voice_name=voiceName
+ languageCode=languageCode,
+ voiceName=voiceName
)
if result["success"]:
@@ -356,7 +356,7 @@ class VoiceObjects:
logger.info("🌐 Getting available languages from Google Cloud TTS")
connector = self._getGoogleSpeechConnector()
- result = await connector.get_available_languages()
+ result = await connector.getAvailableLanguages()
if result["success"]:
logger.info(f"✅ Found {len(result['languages'])} available languages")
@@ -387,7 +387,7 @@ class VoiceObjects:
logger.info(f"🎤 Getting available voices, language filter: {languageCode}")
connector = self._getGoogleSpeechConnector()
- result = await connector.get_available_voices(language_code=languageCode)
+ result = await connector.getAvailableVoices(languageCode=languageCode)
if result["success"]:
logger.info(f"✅ Found {len(result['voices'])} voices for language filter: {languageCode}")
@@ -420,7 +420,7 @@ class VoiceObjects:
logger.debug(f"Validating audio format: {len(audioContent)} bytes")
connector = self._getGoogleSpeechConnector()
- result = connector.validate_audio_format(audioContent)
+ result = connector.validateAudioFormat(audioContent)
if result["valid"]:
logger.debug(f"✅ Audio validation successful: {result['format']}, {result['sample_rate']}Hz, {result['channels']}ch")
@@ -451,10 +451,10 @@ class VoiceObjects:
connector = self._getGoogleSpeechConnector()
# Test with a simple translation
- testResult = await connector.translate_text(
+ testResult = await connector.translateText(
text="Hello",
- source_language="en",
- target_language="de"
+ sourceLanguage="en",
+ targetLanguage="de"
)
if testResult["success"]:
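
One detail worth noting for callers: only method and parameter names moved to camelCase; the result dictionaries returned by the connector keep their snake_case keys, as the translate path above shows. A quick reference sketch of that contract:

# Shape of the dict returned by ConnectorGoogleSpeech.translateText on success,
# taken from the connector code in this patch (keys remain snake_case).
exampleTranslateResult = {
    "success": True,
    "translated_text": "Hello",
    "source_language": "de",
    "target_language": "en",
    "original_text": "Hallo",
}
assert exampleTranslateResult["success"] and exampleTranslateResult["translated_text"]
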
diff --git a/modules/routes/routeDataConnections.py b/modules/routes/routeDataConnections.py
index eec8d140..684b95d2 100644
--- a/modules/routes/routeDataConnections.py
+++ b/modules/routes/routeDataConnections.py
@@ -23,62 +23,62 @@ from modules.shared.timezoneUtils import getUtcTimestamp
# Configure logger
logger = logging.getLogger(__name__)
-def get_token_status_for_connection(interface, connection_id: str) -> tuple[str, Optional[float]]:
+def getTokenStatusForConnection(interface, connectionId: str) -> tuple[str, Optional[float]]:
"""
Get token status and expiration for a connection.
Args:
interface: The database interface
- connection_id: The connection ID to check
+ connectionId: The connection ID to check
Returns:
- tuple: (token_status, token_expires_at)
- - token_status: 'active', 'expired', or 'none'
- - token_expires_at: UTC timestamp or None
+ tuple: (tokenStatus, tokenExpiresAt)
+ - tokenStatus: 'active', 'expired', or 'none'
+ - tokenExpiresAt: UTC timestamp or None
"""
try:
# Query tokens table for the latest token for this connection
tokens = interface.db.getRecordset(
Token,
- recordFilter={"connectionId": connection_id}
+ recordFilter={"connectionId": connectionId}
)
if not tokens:
return "none", None
# Find the most recent token (highest createdAt timestamp)
- latest_token = None
- latest_created_at = 0
+ latestToken = None
+ latestCreatedAt = 0
- for token_data in tokens:
- created_at = token_data.get("createdAt", 0)
- if created_at > latest_created_at:
- latest_created_at = created_at
- latest_token = token_data
+ for tokenData in tokens:
+ createdAt = tokenData.get("createdAt", 0)
+ if createdAt > latestCreatedAt:
+ latestCreatedAt = createdAt
+ latestToken = tokenData
- if not latest_token:
+ if not latestToken:
return "none", None
# Check if token is expired
- expires_at = latest_token.get("expiresAt")
- if not expires_at:
+ expiresAt = latestToken.get("expiresAt")
+ if not expiresAt:
return "none", None
- current_time = getUtcTimestamp()
+ currentTime = getUtcTimestamp()
# Add 5 minute buffer for proactive refresh
- buffer_time = 5 * 60 # 5 minutes in seconds
- if expires_at <= current_time:
- return "expired", expires_at
- elif expires_at <= (current_time + buffer_time):
+ bufferTime = 5 * 60 # 5 minutes in seconds
+ if expiresAt <= currentTime:
+ return "expired", expiresAt
+ elif expiresAt <= (currentTime + bufferTime):
# Token expires soon - mark as active but log for proactive refresh
- logger.debug(f"Token for connection {connection_id} expires soon (in {expires_at - current_time} seconds)")
- return "active", expires_at
+ logger.debug(f"Token for connection {connectionId} expires soon (in {expiresAt - currentTime} seconds)")
+ return "active", expiresAt
else:
- return "active", expires_at
+ return "active", expiresAt
except Exception as e:
- logger.error(f"Error getting token status for connection {connection_id}: {str(e)}")
+ logger.error(f"Error getting token status for connection {connectionId}: {str(e)}")
return "none", None
router = APIRouter(
@@ -121,7 +121,7 @@ async def get_connections(
enhanced_connections = []
for connection in connections:
# Get token status for this connection
- token_status, token_expires_at = get_token_status_for_connection(interface, connection.id)
+ tokenStatus, tokenExpiresAt = getTokenStatusForConnection(interface, connection.id)
# Create enhanced connection with token status
enhanced_connection = UserConnection(
@@ -135,8 +135,8 @@ async def get_connections(
connectedAt=connection.connectedAt,
lastChecked=connection.lastChecked,
expiresAt=connection.expiresAt,
- tokenStatus=token_status,
- tokenExpiresAt=token_expires_at
+ tokenStatus=tokenStatus,
+ tokenExpiresAt=tokenExpiresAt
)
enhanced_connections.append(enhanced_connection)
@@ -252,9 +252,8 @@ async def update_connection(
# Update connection - models now handle timestamp serialization automatically
interface.db.recordModify(UserConnection, connectionId, connection.model_dump())
-
# Get token status for the updated connection
- token_status, token_expires_at = get_token_status_for_connection(interface, connectionId)
+ tokenStatus, tokenExpiresAt = getTokenStatusForConnection(interface, connectionId)
# Create enhanced connection with token status
enhanced_connection = UserConnection(
@@ -268,8 +267,8 @@ async def update_connection(
connectedAt=connection.connectedAt,
lastChecked=connection.lastChecked,
expiresAt=connection.expiresAt,
- tokenStatus=token_status,
- tokenExpiresAt=token_expires_at
+ tokenStatus=tokenStatus,
+ tokenExpiresAt=tokenExpiresAt
)
return enhanced_connection
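
To make the 5-minute expiry buffer in getTokenStatusForConnection easier to reason about, here is a standalone restatement of the same decision (the helper name is hypothetical; the buffer and the status strings come from the route above):

# Hypothetical standalone version of the status decision used by the route.
def classifyTokenExpiry(expiresAt, currentTime, bufferSeconds=5 * 60):
    if not expiresAt:
        return "none"
    if expiresAt <= currentTime:
        return "expired"
    if expiresAt <= currentTime + bufferSeconds:
        # Expires soon: still reported as active, but a proactive refresh is advisable.
        return "active"
    return "active"

assert classifyTokenExpiry(expiresAt=None, currentTime=1_000) == "none"
assert classifyTokenExpiry(expiresAt=900, currentTime=1_000) == "expired"
assert classifyTokenExpiry(expiresAt=1_100, currentTime=1_000) == "active"  # within the buffer
assert classifyTokenExpiry(expiresAt=10_000, currentTime=1_000) == "active"
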
diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py
index 87b56ceb..381b87fa 100644
--- a/modules/services/serviceAi/mainServiceAi.py
+++ b/modules/services/serviceAi/mainServiceAi.py
@@ -6,7 +6,6 @@ from modules.datamodels.datamodelChat import PromptPlaceholder, ChatDocument
from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum
from modules.interfaces.interfaceAiObjects import AiObjects
-from modules.services.serviceAi.subDocumentProcessing import SubDocumentProcessing
from modules.shared.jsonUtils import (
extractJsonString,
repairBrokenJson,
@@ -33,7 +32,6 @@ class AiService:
self.aiObjects = None # Will be initialized in create() or _ensureAiObjectsInitialized()
# Submodules initialized as None - will be set in _initializeSubmodules() after aiObjects is ready
self.extractionService = None
- self.documentProcessor = None
def _initializeSubmodules(self):
"""Initialize all submodules after aiObjects is ready."""
@@ -43,10 +41,6 @@ class AiService:
if self.extractionService is None:
logger.info("Initializing ExtractionService...")
self.extractionService = ExtractionService(self.services)
-
- if self.documentProcessor is None:
- logger.info("Initializing SubDocumentProcessing...")
- self.documentProcessor = SubDocumentProcessing(self.services, self.aiObjects)
async def _ensureAiObjectsInitialized(self):
"""Ensure aiObjects is initialized and submodules are ready."""
@@ -529,7 +523,7 @@ Respond with ONLY a JSON object in this exact format:
)
try:
- # Ensure AI connectors are initialized before delegating to documentProcessor/generator
+ # Ensure AI connectors are initialized before delegating to generator
if hasattr(self.services, 'ai') and hasattr(self.services.ai, '_ensureAiObjectsInitialized'):
await self.services.ai._ensureAiObjectsInitialized()
if options is None or (hasattr(options, 'operationType') and options.operationType is None):
@@ -562,7 +556,7 @@ Respond with ONLY a JSON object in this exact format:
# Use unified generation method for all document generation
if documents and len(documents) > 0:
self.services.workflow.progressLogUpdate(aiOperationId, 0.2, f"Extracting content from {len(documents)} documents")
- extracted_content = await self.documentProcessor.callAiText(prompt, documents, options, aiOperationId)
+ extracted_content = await self.callAiText(prompt, documents, options, aiOperationId)
else:
self.services.workflow.progressLogUpdate(aiOperationId, 0.2, "Preparing for direct generation")
extracted_content = None
@@ -649,7 +643,7 @@ Respond with ONLY a JSON object in this exact format:
self.services.workflow.progressLogUpdate(aiOperationId, 0.5, "Processing text call")
if documents:
# Use document processing for text calls with documents
- result = await self.documentProcessor.callAiText(prompt, documents, options, aiOperationId)
+ result = await self.callAiText(prompt, documents, options, aiOperationId)
else:
# Use shared core function for direct text calls
result = await self._callAiWithLooping(prompt, options, "text", None, None, aiOperationId)
@@ -758,3 +752,17 @@ Respond with ONLY a JSON object in this exact format:
logger.error(f"Error in AI image generation: {str(e)}")
return {"success": False, "error": str(e)}
+ async def callAiText(
+ self,
+ prompt: str,
+ documents: Optional[List[ChatDocument]],
+ options: AiCallOptions,
+ operationId: Optional[str] = None
+ ) -> str:
+ """
+ Handle text calls with document processing through ExtractionService.
+ UNIFIED PROCESSING: Always use per-chunk processing for consistency.
+ """
+ await self._ensureAiObjectsInitialized()
+ return await self.extractionService.processDocumentsPerChunk(documents, prompt, self.aiObjects, options, operationId)
+
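
With SubDocumentProcessing deleted, document-aware text calls now live on AiService.callAiText, which delegates to ExtractionService.processDocumentsPerChunk. A minimal caller sketch (the AiCallOptions keyword construction is an assumption; the method chain and the enum value come from this patch):

# Hypothetical call site: former documentProcessor.callAiText(...) usages
# now call AiService.callAiText(...) directly.
async def extractFromDocuments(aiService, documents, aiOperationId=None):
    options = AiCallOptions(operationType=OperationTypeEnum.DATA_EXTRACT)  # assumed keyword construction
    return await aiService.callAiText(
        prompt="Extract the key facts from the attached documents.",
        documents=documents,
        options=options,
        operationId=aiOperationId,
    )
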
diff --git a/modules/services/serviceAi/subDocumentProcessing.py b/modules/services/serviceAi/subDocumentProcessing.py
deleted file mode 100644
index 2903c066..00000000
--- a/modules/services/serviceAi/subDocumentProcessing.py
+++ /dev/null
@@ -1,463 +0,0 @@
-import json
-import logging
-import re
-import time
-from typing import Dict, Any, List, Optional
-from modules.datamodels.datamodelChat import ChatDocument
-from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum
-from modules.datamodels.datamodelExtraction import ContentExtracted, PartResult, ExtractionOptions, MergeStrategy
-# Resolve forward refs for ExtractionOptions (OperationTypeEnum) at runtime without using unsupported args
-try:
- # Import here to avoid circular import at module load time
- from modules.datamodels.datamodelAi import OperationTypeEnum
- # Provide parent namespace so Pydantic can resolve forward refs
- ExtractionOptions.__pydantic_parent_namespace__ = {"OperationTypeEnum": OperationTypeEnum}
- ExtractionOptions.model_rebuild()
-except Exception as _e:
- logging.getLogger(__name__).warning(f"ExtractionOptions forward-ref rebuild skipped: {_e}")
-from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
-
-logger = logging.getLogger(__name__)
-
-
-class SubDocumentProcessing:
- """Document processing operations including chunking, processing, and merging."""
-
- def __init__(self, services, aiObjects):
- """Initialize document processing service.
-
- Args:
- services: Service center instance for accessing other services
- aiObjects: Initialized AiObjects instance
- """
- self.services = services
- self.aiObjects = aiObjects
- self._extractionService = None
-
- @property
- def extractionService(self):
- """Lazy initialization of extraction service."""
- if self._extractionService is None:
- logger.info("Lazy initializing ExtractionService...")
- self._extractionService = ExtractionService(self.services)
- return self._extractionService
-
-
- async def processDocumentsPerChunk(
- self,
- documents: List[ChatDocument],
- prompt: str,
- options: Optional[AiCallOptions] = None,
- operationId: Optional[str] = None
- ) -> str:
- """
- Process documents with model-aware chunking and merge results.
- NEW: Uses model-aware chunking in AI call phase instead of extraction phase.
-
- Args:
- documents: List of ChatDocument objects to process
- prompt: AI prompt for processing
- options: AI call options
- operationId: Optional operation ID for progress tracking
-
- Returns:
- Merged AI results as string with preserved document structure
- """
- if not documents:
- return ""
-
- # Create operationId if not provided
- if not operationId:
- import time
- workflowId = self.services.currentWorkflow.id if self.services.currentWorkflow else f"no-workflow-{int(time.time())}"
- operationId = f"ai_text_extract_{workflowId}_{int(time.time())}"
- self.services.workflow.progressLogStart(
- operationId,
- "AI Text Extract",
- "Document Processing",
- f"Processing {len(documents)} documents"
- )
-
- try:
- # Build extraction options using Pydantic model
- mergeStrategy = MergeStrategy(
- useIntelligentMerging=True,
- prompt=prompt,
- groupBy="typeGroup",
- orderBy="id",
- mergeType="concatenate"
- )
-
- extractionOptions = ExtractionOptions(
- prompt=prompt,
- operationType=options.operationType if options else OperationTypeEnum.DATA_EXTRACT,
- processDocumentsIndividually=True,
- mergeStrategy=mergeStrategy
- )
-
- logger.debug(f"Per-chunk extraction options: prompt length={len(extractionOptions.prompt)} chars, operationType={extractionOptions.operationType}")
-
- # Extract content WITHOUT chunking
- if operationId:
- self.services.workflow.progressLogUpdate(operationId, 0.1, f"Extracting content from {len(documents)} documents")
- extractionResult = self.extractionService.extractContent(documents, extractionOptions)
-
- if not isinstance(extractionResult, list):
- if operationId:
- self.services.workflow.progressLogFinish(operationId, False)
- return "[Error: No extraction results]"
-
- # Process parts (not chunks) with model-aware AI calls
- if operationId:
- self.services.workflow.progressLogUpdate(operationId, 0.3, f"Processing {len(extractionResult)} extracted content parts")
- partResults = await self._processPartsWithMapping(extractionResult, prompt, options, operationId)
-
- # Merge results using existing merging system
- if operationId:
- self.services.workflow.progressLogUpdate(operationId, 0.9, f"Merging {len(partResults)} part results")
- mergedContent = self._mergePartResults(partResults, options)
-
- # Save merged extraction content to debug
- self.services.utils.writeDebugFile(mergedContent or '', "extraction_merged_text")
-
- if operationId:
- self.services.workflow.progressLogFinish(operationId, True)
-
- return mergedContent
- except Exception as e:
- logger.error(f"Error in processDocumentsPerChunk: {str(e)}")
- if operationId:
- self.services.workflow.progressLogFinish(operationId, False)
- raise
-
- async def callAiText(
- self,
- prompt: str,
- documents: Optional[List[ChatDocument]],
- options: AiCallOptions,
- operationId: Optional[str] = None
- ) -> str:
- """
- Handle text calls with document processing through ExtractionService.
- UNIFIED PROCESSING: Always use per-chunk processing for consistency.
- """
- return await self.processDocumentsPerChunk(documents, prompt, options, operationId)
-
- async def _processPartsWithMapping(
- self,
- extractionResult: List[ContentExtracted],
- prompt: str,
- options: Optional[AiCallOptions] = None,
- operationId: Optional[str] = None
- ) -> List['PartResult']:
- """Process content parts with model-aware chunking and proper mapping."""
- from modules.datamodels.datamodelExtraction import PartResult
- import asyncio
-
- # Collect all parts that need processing
- partsToProcess = []
- partIndex = 0
-
- for ec in extractionResult:
- for part in ec.parts:
- if part.typeGroup in ("text", "table", "structure", "image", "container", "binary"):
- # Skip empty container parts
- if part.typeGroup == "container" and (not part.data or len(part.data.strip()) == 0):
- logger.debug(f"Skipping empty container part: mimeType={part.mimeType}")
- continue
-
- partsToProcess.append({
- 'part': part,
- 'part_index': partIndex,
- 'document_id': ec.id
- })
- partIndex += 1
-
- logger.info(f"Processing {len(partsToProcess)} parts with model-aware chunking")
-
- totalParts = len(partsToProcess)
-
- # Process parts in parallel
- processedCount = [0] # Use list to allow modification in nested function
-
- async def processSinglePart(partInfo: Dict) -> PartResult:
- part = partInfo['part']
- part_index = partInfo['part_index']
- documentId = partInfo['document_id']
-
- start_time = time.time()
-
- try:
- # Create AI call request with content part
- from modules.datamodels.datamodelAi import AiCallRequest
- request = AiCallRequest(
- prompt=prompt,
- context="", # Context is in the content part
- options=options,
- contentParts=[part] # Pass as list for unified processing
- )
-
- # Update progress before AI call
- if operationId and totalParts > 0:
- processedCount[0] += 1
- progress = 0.3 + (processedCount[0] / totalParts * 0.6) # Progress from 0.3 to 0.9
- self.services.workflow.progressLogUpdate(operationId, progress, f"Processing part {processedCount[0]}/{totalParts}")
-
- # Call AI with model-aware chunking
- response = await self.aiObjects.call(request)
-
- processing_time = time.time() - start_time
-
- return PartResult(
- originalPart=part,
- aiResult=response.content,
- partIndex=part_index,
- documentId=documentId,
- processingTime=processing_time,
- metadata={
- "success": True,
- "partSize": len(part.data) if part.data else 0,
- "resultSize": len(response.content),
- "typeGroup": part.typeGroup,
- "modelName": response.modelName,
- "priceUsd": response.priceUsd
- }
- )
-
- except Exception as e:
- processing_time = time.time() - start_time
- logger.warning(f"Error processing part {part_index}: {str(e)}")
-
- return PartResult(
- originalPart=part,
- aiResult=f"[Error processing part: {str(e)}]",
- partIndex=part_index,
- documentId=documentId,
- processingTime=processing_time,
- metadata={
- "success": False,
- "error": str(e),
- "partSize": len(part.data) if part.data else 0,
- "typeGroup": part.typeGroup
- }
- )
-
- # Process parts with concurrency control
- maxConcurrent = 5
- if options and hasattr(options, 'maxConcurrentParts'):
- maxConcurrent = options.maxConcurrentParts
-
- semaphore = asyncio.Semaphore(maxConcurrent)
-
- async def processWithSemaphore(partInfo):
- async with semaphore:
- return await processSinglePart(partInfo)
-
- tasks = [processWithSemaphore(part_info) for part_info in partsToProcess]
- partResults = await asyncio.gather(*tasks, return_exceptions=True)
-
- # Handle exceptions
- processedResults = []
- for i, result in enumerate(partResults):
- if isinstance(result, Exception):
- part_info = partsToProcess[i]
- processedResults.append(PartResult(
- originalPart=part_info['part'],
- aiResult=f"[Error in parallel processing: {str(result)}]",
- partIndex=part_info['part_index'],
- documentId=part_info['document_id'],
- processingTime=0.0,
- metadata={"success": False, "error": str(result)}
- ))
- elif result is not None:
- processedResults.append(result)
-
- logger.info(f"Completed processing {len(processedResults)} parts")
- return processedResults
-
- def _mergePartResults(
- self,
- partResults: List['PartResult'],
- options: Optional[AiCallOptions] = None
- ) -> str:
- """Merge part results using existing sophisticated merging system."""
- if not partResults:
- return ""
-
- # Convert PartResults back to ContentParts for existing merger system
- from modules.datamodels.datamodelExtraction import ContentPart
- content_parts = []
- for part_result in partResults:
- # Create ContentPart from PartResult with proper typeGroup
- content_part = ContentPart(
- id=part_result.originalPart.id,
- parentId=part_result.originalPart.parentId,
- label=part_result.originalPart.label,
- typeGroup=part_result.originalPart.typeGroup, # Use original typeGroup
- mimeType=part_result.originalPart.mimeType,
- data=part_result.aiResult, # Use AI result as data
- metadata={
- **part_result.originalPart.metadata,
- "aiResult": True,
- "partIndex": part_result.partIndex,
- "documentId": part_result.documentId,
- "processingTime": part_result.processingTime,
- "success": part_result.metadata.get("success", False)
- }
- )
- content_parts.append(content_part)
-
- # Use existing merging strategy from options
- merge_strategy = MergeStrategy(
- useIntelligentMerging=True,
- groupBy="documentId", # Group by document
- orderBy="partIndex", # Order by part index
- mergeType="concatenate"
- )
-
-
- # Apply existing merging logic using the sophisticated merging system
- from modules.services.serviceExtraction.subPipeline import _applyMerging
- merged_parts = _applyMerging(content_parts, merge_strategy)
-
- # Convert merged parts back to final string
- final_content = "\n\n".join([part.data for part in merged_parts])
-
- logger.info(f"Merged {len(partResults)} parts using existing sophisticated merging system")
- return final_content.strip()
-
- def _convertPartResultsToJson(
- self,
- partResults: List['PartResult'],
- options: Optional[AiCallOptions] = None
- ) -> Dict[str, Any]:
- """Convert part results to JSON format using existing sophisticated merging system."""
- if not partResults:
- return {"metadata": {"title": "Empty Document"}, "sections": []}
-
- # Convert PartResults back to ContentParts for existing merger system
- from modules.datamodels.datamodelExtraction import ContentPart
- content_parts = []
- for part_result in partResults:
- # Create ContentPart from PartResult with proper typeGroup
- content_part = ContentPart(
- id=part_result.originalPart.id,
- parentId=part_result.originalPart.parentId,
- label=part_result.originalPart.label,
- typeGroup=part_result.originalPart.typeGroup, # Use original typeGroup
- mimeType=part_result.originalPart.mimeType,
- data=part_result.aiResult, # Use AI result as data
- metadata={
- **part_result.originalPart.metadata,
- "aiResult": True,
- "partIndex": part_result.partIndex,
- "documentId": part_result.documentId,
- "processingTime": part_result.processingTime,
- "success": part_result.metadata.get("success", False)
- }
- )
- content_parts.append(content_part)
-
- # Use existing merging strategy for JSON mode
- merge_strategy = MergeStrategy(
- useIntelligentMerging=True,
- groupBy="documentId", # Group by document
- orderBy="partIndex", # Order by part index
- mergeType="concatenate"
- )
-
-
- # Apply existing merging logic using the sophisticated merging system
- from modules.services.serviceExtraction.subPipeline import _applyMerging
- merged_parts = _applyMerging(content_parts, merge_strategy)
-
- # Convert merged parts to JSON format
- all_sections = []
- document_titles = []
-
- for part in merged_parts:
- if part.metadata.get("success", False):
- try:
- # Parse JSON from AI result
- part_json = json.loads(part.data)
-
- # Check if this is a multi-file response (has "documents" key)
- if isinstance(part_json, dict) and "documents" in part_json:
- # This is a multi-file response - merge all documents
- logger.debug(f"Processing multi-file response from part {part.id} with {len(part_json['documents'])} documents")
-
- # Return multi-file response directly
- return {
- "metadata": part_json.get("metadata", {"title": "Merged Document"}),
- "documents": part_json["documents"]
- }
-
- # Extract sections from single-file response
- elif isinstance(part_json, dict) and "sections" in part_json:
- for section in part_json["sections"]:
- # Add part context to section
- section["metadata"] = section.get("metadata", {})
- section["metadata"]["source_part"] = part.id
- section["metadata"]["source_document"] = part.metadata.get("documentId", "unknown")
- section["metadata"]["part_index"] = part.metadata.get("partIndex", 0)
- all_sections.append(section)
-
- # Extract document title
- if isinstance(part_json, dict) and "metadata" in part_json:
- title = part_json["metadata"].get("title", "")
- if title and title not in document_titles:
- document_titles.append(title)
-
- except json.JSONDecodeError as e:
- logger.warning(f"Failed to parse JSON from part {part.id}: {str(e)}")
- # Create a fallback section for invalid JSON
- fallback_section = {
- "id": f"error_section_{part.id}",
- "title": "Error Section",
- "content_type": "paragraph",
- "elements": [{
- "text": f"Error parsing part {part.id}: {str(e)}"
- }],
- "order": part.metadata.get("partIndex", 0),
- "metadata": {
- "source_document": part.metadata.get("documentId", "unknown"),
- "part_id": part.id,
- "error": str(e)
- }
- }
- all_sections.append(fallback_section)
- else:
- # Handle error parts
- error_section = {
- "id": f"error_section_{part.id}",
- "title": "Error Section",
- "content_type": "paragraph",
- "elements": [{
- "text": f"Error in part {part.id}: {part.metadata.get('error', 'Unknown error')}"
- }],
- "order": part.metadata.get("partIndex", 0),
- "metadata": {
- "source_document": part.metadata.get("documentId", "unknown"),
- "part_id": part.id,
- "error": part.metadata.get('error', 'Unknown error')
- }
- }
- all_sections.append(error_section)
-
- # Sort sections by order
- all_sections.sort(key=lambda x: x.get("order", 0))
-
- # Create merged document with sections
- merged_document = {
- "metadata": {
- "title": document_titles[0] if document_titles else "Merged Document",
- "extraction_method": "model_aware_chunking_with_merging",
- "version": "2.0"
- },
- "sections": all_sections,
- "summary": f"Merged document using sophisticated merging system",
- "tags": ["merged", "ai_generated", "model_aware", "sophisticated_merging"]
- }
-
- logger.info(f"Converted {len(partResults)} parts to JSON format using existing sophisticated merging system")
- return merged_document
diff --git a/modules/services/serviceExtraction/mainServiceExtraction.py b/modules/services/serviceExtraction/mainServiceExtraction.py
index 1e0c1d21..b61cd1cc 100644
--- a/modules/services/serviceExtraction/mainServiceExtraction.py
+++ b/modules/services/serviceExtraction/mainServiceExtraction.py
@@ -2,12 +2,13 @@ from typing import Any, Dict, List, Optional, Union
import uuid
import logging
import time
+import asyncio
from .subRegistry import ExtractorRegistry, ChunkerRegistry
-from .subPipeline import runExtraction
-from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart, MergeStrategy, ExtractionOptions
+from .subPipeline import runExtraction, _applyMerging
+from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart, MergeStrategy, ExtractionOptions, PartResult
from modules.datamodels.datamodelChat import ChatDocument
-from modules.datamodels.datamodelAi import AiCallResponse
+from modules.datamodels.datamodelAi import AiCallResponse, AiCallRequest, AiCallOptions, OperationTypeEnum
from modules.aicore.aicoreModelRegistry import modelRegistry
@@ -389,4 +390,271 @@ class ExtractionService:
# This could be enhanced with structure-aware merging
return self._mergeConcatenate(structureParts, aiResultParts, strategy)
+ async def processDocumentsPerChunk(
+ self,
+ documents: List[ChatDocument],
+ prompt: str,
+ aiObjects: Any,
+ options: Optional[AiCallOptions] = None,
+ operationId: Optional[str] = None
+ ) -> str:
+ """
+        Process documents with model-aware chunking and merge the results.
+        NEW: applies model-aware chunking during the AI-call phase instead of the extraction phase.
+
+ Args:
+ documents: List of ChatDocument objects to process
+ prompt: AI prompt for processing
+ aiObjects: AiObjects instance for making AI calls
+ options: AI call options
+ operationId: Optional operation ID for progress tracking
+
+ Returns:
+            Merged AI results as a single string with the document structure preserved
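+
+        A call site might look like this (instance and variable names are illustrative):
+            merged = await extractionService.processDocumentsPerChunk(
+                documents=chatDocuments, prompt=extractionPrompt,
+                aiObjects=aiObjects, options=None)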
+ """
+ if not documents:
+ return ""
+
+ # Create operationId if not provided
+ if not operationId:
+ workflowId = self.services.currentWorkflow.id if self.services.currentWorkflow else f"no-workflow-{int(time.time())}"
+ operationId = f"ai_text_extract_{workflowId}_{int(time.time())}"
+ self.services.workflow.progressLogStart(
+ operationId,
+ "AI Text Extract",
+ "Document Processing",
+ f"Processing {len(documents)} documents"
+ )
+
+ try:
+ # Build extraction options using Pydantic model
+ mergeStrategy = MergeStrategy(
+ useIntelligentMerging=True,
+ prompt=prompt,
+ groupBy="typeGroup",
+ orderBy="id",
+ mergeType="concatenate"
+ )
+
+ extractionOptions = ExtractionOptions(
+ prompt=prompt,
+ operationType=options.operationType if options else OperationTypeEnum.DATA_EXTRACT,
+ processDocumentsIndividually=True,
+ mergeStrategy=mergeStrategy
+ )
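+            # processDocumentsIndividually=True keeps each document's parts intact here;
+            # chunking is deferred to the AI-call phase (see _processPartsWithMapping below).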
+
+ logger.debug(f"Per-chunk extraction options: prompt length={len(extractionOptions.prompt)} chars, operationType={extractionOptions.operationType}")
+
+ # Extract content WITHOUT chunking
+ if operationId:
+ self.services.workflow.progressLogUpdate(operationId, 0.1, f"Extracting content from {len(documents)} documents")
+ extractionResult = self.extractContent(documents, extractionOptions)
+
+ if not isinstance(extractionResult, list):
+ if operationId:
+ self.services.workflow.progressLogFinish(operationId, False)
+ return "[Error: No extraction results]"
+
+ # Process parts (not chunks) with model-aware AI calls
+ if operationId:
+                self.services.workflow.progressLogUpdate(operationId, 0.3, f"Processing parts from {len(extractionResult)} extracted documents")
+ partResults = await self._processPartsWithMapping(extractionResult, prompt, aiObjects, options, operationId)
+
+ # Merge results using existing merging system
+ if operationId:
+ self.services.workflow.progressLogUpdate(operationId, 0.9, f"Merging {len(partResults)} part results")
+ mergedContent = self._mergePartResults(partResults, options)
+
+ # Save merged extraction content to debug
+ self.services.utils.writeDebugFile(mergedContent or '', "extraction_merged_text")
+
+ if operationId:
+ self.services.workflow.progressLogFinish(operationId, True)
+
+ return mergedContent
+ except Exception as e:
+ logger.error(f"Error in processDocumentsPerChunk: {str(e)}")
+ if operationId:
+ self.services.workflow.progressLogFinish(operationId, False)
+ raise
+
+ async def _processPartsWithMapping(
+ self,
+ extractionResult: List[ContentExtracted],
+ prompt: str,
+ aiObjects: Any,
+ options: Optional[AiCallOptions] = None,
+ operationId: Optional[str] = None
+ ) -> List[PartResult]:
+ """Process content parts with model-aware chunking and proper mapping."""
+
+ # Collect all parts that need processing
+ partsToProcess = []
+ partIndex = 0
+
+ for ec in extractionResult:
+ for part in ec.parts:
+ if part.typeGroup in ("text", "table", "structure", "image", "container", "binary"):
+ # Skip empty container parts
+ if part.typeGroup == "container" and (not part.data or len(part.data.strip()) == 0):
+ logger.debug(f"Skipping empty container part: mimeType={part.mimeType}")
+ continue
+
+ partsToProcess.append({
+ 'part': part,
+ 'part_index': partIndex,
+ 'document_id': ec.id
+ })
+ partIndex += 1
+
+ logger.info(f"Processing {len(partsToProcess)} parts with model-aware chunking")
+
+ totalParts = len(partsToProcess)
+
+ # Process parts in parallel
+ processedCount = [0] # Use list to allow modification in nested function
+
+ async def processSinglePart(partInfo: Dict) -> PartResult:
+ part = partInfo['part']
+ part_index = partInfo['part_index']
+ documentId = partInfo['document_id']
+
+ start_time = time.time()
+
+ try:
+ # Create AI call request with content part
+ request = AiCallRequest(
+ prompt=prompt,
+ context="", # Context is in the content part
+ options=options,
+ contentParts=[part] # Pass as list for unified processing
+ )
+
+ # Update progress before AI call
+ if operationId and totalParts > 0:
+ processedCount[0] += 1
+ progress = 0.3 + (processedCount[0] / totalParts * 0.6) # Progress from 0.3 to 0.9
+ self.services.workflow.progressLogUpdate(operationId, progress, f"Processing part {processedCount[0]}/{totalParts}")
+
+ # Call AI with model-aware chunking
+ response = await aiObjects.call(request)
+
+ processing_time = time.time() - start_time
+
+ return PartResult(
+ originalPart=part,
+ aiResult=response.content,
+ partIndex=part_index,
+ documentId=documentId,
+ processingTime=processing_time,
+ metadata={
+ "success": True,
+ "partSize": len(part.data) if part.data else 0,
+ "resultSize": len(response.content),
+ "typeGroup": part.typeGroup,
+ "modelName": response.modelName,
+ "priceUsd": response.priceUsd
+ }
+ )
+
+ except Exception as e:
+ processing_time = time.time() - start_time
+ logger.warning(f"Error processing part {part_index}: {str(e)}")
+
+ return PartResult(
+ originalPart=part,
+ aiResult=f"[Error processing part: {str(e)}]",
+ partIndex=part_index,
+ documentId=documentId,
+ processingTime=processing_time,
+ metadata={
+ "success": False,
+ "error": str(e),
+ "partSize": len(part.data) if part.data else 0,
+ "typeGroup": part.typeGroup
+ }
+ )
+
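+        # Note: per-part failures inside processSinglePart are returned as error PartResults
+        # rather than raised, so the part-to-document mapping survives partial failures.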
+ # Process parts with concurrency control
+ maxConcurrent = 5
+ if options and hasattr(options, 'maxConcurrentParts'):
+ maxConcurrent = options.maxConcurrentParts
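+        # Defaults to 5 concurrent AI calls; an AiCallOptions.maxConcurrentParts attribute,
+        # when present, overrides it (hence the hasattr check above).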
+
+ semaphore = asyncio.Semaphore(maxConcurrent)
+
+ async def processWithSemaphore(partInfo):
+ async with semaphore:
+ return await processSinglePart(partInfo)
+
+ tasks = [processWithSemaphore(part_info) for part_info in partsToProcess]
+ partResults = await asyncio.gather(*tasks, return_exceptions=True)
+
+ # Handle exceptions
+ processedResults = []
+ for i, result in enumerate(partResults):
+ if isinstance(result, Exception):
+ part_info = partsToProcess[i]
+ processedResults.append(PartResult(
+ originalPart=part_info['part'],
+ aiResult=f"[Error in parallel processing: {str(result)}]",
+ partIndex=part_info['part_index'],
+ documentId=part_info['document_id'],
+ processingTime=0.0,
+ metadata={"success": False, "error": str(result)}
+ ))
+ elif result is not None:
+ processedResults.append(result)
+
+ logger.info(f"Completed processing {len(processedResults)} parts")
+ return processedResults
+
+ def _mergePartResults(
+ self,
+ partResults: List[PartResult],
+ options: Optional[AiCallOptions] = None
+ ) -> str:
+ """Merge part results using existing sophisticated merging system."""
+ if not partResults:
+ return ""
+
+ # Convert PartResults back to ContentParts for existing merger system
+ content_parts = []
+ for part_result in partResults:
+ # Create ContentPart from PartResult with proper typeGroup
+ content_part = ContentPart(
+ id=part_result.originalPart.id,
+ parentId=part_result.originalPart.parentId,
+ label=part_result.originalPart.label,
+ typeGroup=part_result.originalPart.typeGroup, # Use original typeGroup
+ mimeType=part_result.originalPart.mimeType,
+ data=part_result.aiResult, # Use AI result as data
+ metadata={
+ **part_result.originalPart.metadata,
+ "aiResult": True,
+ "partIndex": part_result.partIndex,
+ "documentId": part_result.documentId,
+ "processingTime": part_result.processingTime,
+ "success": part_result.metadata.get("success", False)
+ }
+ )
+ content_parts.append(content_part)
+
+ # Use existing merging strategy from options
+ merge_strategy = MergeStrategy(
+ useIntelligentMerging=True,
+ groupBy="documentId", # Group by document
+ orderBy="partIndex", # Order by part index
+ mergeType="concatenate"
+ )
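+        # Grouping by documentId and ordering by partIndex restores each document's
+        # original part order before the results are concatenated.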
+
+ # Apply existing merging logic using the sophisticated merging system
+ merged_parts = _applyMerging(content_parts, merge_strategy)
+
+ # Convert merged parts back to final string
+ final_content = "\n\n".join([part.data for part in merged_parts])
+
+ logger.info(f"Merged {len(partResults)} parts using existing sophisticated merging system")
+ return final_content.strip()
+
diff --git a/modules/services/serviceUtils/mainServiceUtils.py b/modules/services/serviceUtils/mainServiceUtils.py
index c8a78fea..86a3cdd7 100644
--- a/modules/services/serviceUtils/mainServiceUtils.py
+++ b/modules/services/serviceUtils/mainServiceUtils.py
@@ -36,14 +36,14 @@ class UtilsService:
misfire_grace_time: Grace time for misfired jobs in seconds
"""
try:
- eventManager.register_cron(
- job_id=job_id,
+ eventManager.registerCron(
+ jobId=job_id,
func=func,
- cron_kwargs=cron_kwargs,
- replace_existing=replace_existing,
+ cronKwargs=cron_kwargs,
+ replaceExisting=replace_existing,
coalesce=coalesce,
- max_instances=max_instances,
- misfire_grace_time=misfire_grace_time
+ maxInstances=max_instances,
+ misfireGraceTime=misfire_grace_time
)
logger.info(f"Registered cron job '{job_id}' with schedule: {cron_kwargs}")
except Exception as e:
@@ -68,16 +68,16 @@ class UtilsService:
misfire_grace_time: Grace time for misfired jobs in seconds
"""
try:
- eventManager.register_interval(
- job_id=job_id,
+ eventManager.registerInterval(
+ jobId=job_id,
func=func,
seconds=seconds,
minutes=minutes,
hours=hours,
- replace_existing=replace_existing,
+ replaceExisting=replace_existing,
coalesce=coalesce,
- max_instances=max_instances,
- misfire_grace_time=misfire_grace_time
+ maxInstances=max_instances,
+ misfireGraceTime=misfire_grace_time
)
logger.info(f"Registered interval job '{job_id}' (h={hours}, m={minutes}, s={seconds})")
except Exception as e:
diff --git a/modules/shared/eventManagement.py b/modules/shared/eventManagement.py
index 0804c156..cf2a5b82 100644
--- a/modules/shared/eventManagement.py
+++ b/modules/shared/eventManagement.py
@@ -43,47 +43,47 @@ class EventManagement:
except Exception as exc:
logger.error(f"Error stopping scheduler: {exc}")
- def register_cron(
+ def registerCron(
self,
- job_id: str,
+ jobId: str,
func: Callable,
*,
- cron_kwargs: Optional[Dict[str, Any]] = None,
- replace_existing: bool = True,
+ cronKwargs: Optional[Dict[str, Any]] = None,
+ replaceExisting: bool = True,
coalesce: bool = True,
- max_instances: int = 1,
- misfire_grace_time: int = 1800,
+ maxInstances: int = 1,
+ misfireGraceTime: int = 1800,
**kwargs: Any,
) -> None:
"""
Register a job using CronTrigger. Provide cron fields as keyword args, e.g.:
- cron_kwargs={"minute": "0,20,40"}
+ cronKwargs={"minute": "0,20,40"}
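+        A full call might look like this (job id and callback are illustrative):
+            eventManager.registerCron("nightlySync", runNightlySync, cronKwargs={"hour": "2"})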
"""
- trigger = CronTrigger(timezone=self._timezone, **(cron_kwargs or {}))
+ trigger = CronTrigger(timezone=self._timezone, **(cronKwargs or {}))
self.scheduler.add_job(
func,
trigger,
- id=job_id,
- replace_existing=replace_existing,
+ id=jobId,
+ replace_existing=replaceExisting,
coalesce=coalesce,
- max_instances=max_instances,
- misfire_grace_time=misfire_grace_time,
+ max_instances=maxInstances,
+ misfire_grace_time=misfireGraceTime,
**kwargs,
)
- logger.info(f"Registered cron job '{job_id}' with args {cron_kwargs}")
+ logger.info(f"Registered cron job '{jobId}' with args {cronKwargs}")
- def register_interval(
+ def registerInterval(
self,
- job_id: str,
+ jobId: str,
func: Callable,
*,
seconds: Optional[int] = None,
minutes: Optional[int] = None,
hours: Optional[int] = None,
- replace_existing: bool = True,
+ replaceExisting: bool = True,
coalesce: bool = True,
- max_instances: int = 1,
- misfire_grace_time: int = 1800,
+ maxInstances: int = 1,
+ misfireGraceTime: int = 1800,
**kwargs: Any,
) -> None:
"""
@@ -95,23 +95,23 @@ class EventManagement:
self.scheduler.add_job(
func,
trigger,
- id=job_id,
- replace_existing=replace_existing,
+ id=jobId,
+ replace_existing=replaceExisting,
coalesce=coalesce,
- max_instances=max_instances,
- misfire_grace_time=misfire_grace_time,
+ max_instances=maxInstances,
+ misfire_grace_time=misfireGraceTime,
**kwargs,
)
logger.info(
- f"Registered interval job '{job_id}' (h={hours}, m={minutes}, s={seconds})"
+ f"Registered interval job '{jobId}' (h={hours}, m={minutes}, s={seconds})"
)
- def remove(self, job_id: str) -> None:
+ def remove(self, jobId: str) -> None:
try:
- self.scheduler.remove_job(job_id)
- logger.info(f"Removed job '{job_id}'")
+ self.scheduler.remove_job(jobId)
+ logger.info(f"Removed job '{jobId}'")
except Exception as exc:
- logger.warning(f"Could not remove job '{job_id}': {exc}")
+ logger.warning(f"Could not remove job '{jobId}': {exc}")
# Singleton instance for easy import and reuse