diff --git a/modules/connectors/connectorVoiceGoogle.py b/modules/connectors/connectorVoiceGoogle.py index 5cb43f35..68d0cd31 100644 --- a/modules/connectors/connectorVoiceGoogle.py +++ b/modules/connectors/connectorVoiceGoogle.py @@ -55,13 +55,13 @@ class ConnectorGoogleSpeech: logger.error(f"❌ Failed to initialize Google Cloud clients: {e}") raise - async def speech_to_text(self, audioContent: bytes, language: str = "de-DE", + async def speechToText(self, audioContent: bytes, language: str = "de-DE", sampleRate: int = None, channels: int = None) -> Dict: """ Convert speech to text using Google Cloud Speech-to-Text API. Args: - audio_content: Raw audio data (various formats supported) + audioContent: Raw audio data (various formats supported) language: Language code (e.g., 'de-DE', 'en-US') sample_rate: Audio sample rate (auto-detected if None) channels: Number of audio channels (auto-detected if None) @@ -72,7 +72,7 @@ class ConnectorGoogleSpeech: try: # Auto-detect audio format if not provided if sampleRate is None or channels is None: - validation = self.validate_audio_format(audioContent) + validation = self.validateAudioFormat(audioContent) if not validation["valid"]: return { "success": False, @@ -348,8 +348,8 @@ class ConnectorGoogleSpeech: "error": str(e) } - async def translate_text(self, text: str, target_language: str = "en", - source_language: str = "de") -> Dict: + async def translateText(self, text: str, targetLanguage: str = "en", + sourceLanguage: str = "de") -> Dict: """ Translate text using Google Cloud Translation API. @@ -370,28 +370,28 @@ class ConnectorGoogleSpeech: "error": "Empty text provided" } - logger.info(f"🌐 Translating: '{text}' ({source_language} -> {target_language})") + logger.info(f"🌐 Translating: '{text}' ({sourceLanguage} -> {targetLanguage})") # Perform translation result = self.translate_client.translate( text, - source_language=source_language, - target_language=target_language + source_language=sourceLanguage, + target_language=targetLanguage ) - translated_text = result['translatedText'] - detected_language = result.get('detectedSourceLanguage', source_language) + translatedText = result['translatedText'] + detectedLanguage = result.get('detectedSourceLanguage', sourceLanguage) # Decode HTML entities in translated text - translated_text = html.unescape(translated_text) + translatedText = html.unescape(translatedText) - logger.info(f"✅ Translation successful: '{translated_text}'") + logger.info(f"✅ Translation successful: '{translatedText}'") return { "success": True, - "translated_text": translated_text, - "source_language": detected_language, - "target_language": target_language, + "translated_text": translatedText, + "source_language": detectedLanguage, + "target_language": targetLanguage, "original_text": text } @@ -403,14 +403,14 @@ class ConnectorGoogleSpeech: "error": str(e) } - async def speech_to_translated_text(self, audio_content: bytes, - from_language: str = "de-DE", - to_language: str = "en") -> Dict: + async def speechToTranslatedText(self, audioContent: bytes, + fromLanguage: str = "de-DE", + toLanguage: str = "en") -> Dict: """ Complete pipeline: Speech-to-Text + Translation. 
Args: - audio_content: Raw audio data + audioContent: Raw audio data from_language: Source language for speech recognition to_language: Target language for translation @@ -418,52 +418,52 @@ class ConnectorGoogleSpeech: Dict containing original text, translated text, and metadata """ try: - logger.info(f"🔄 Starting speech-to-translation pipeline: {from_language} -> {to_language}") + logger.info(f"🔄 Starting speech-to-translation pipeline: {fromLanguage} -> {toLanguage}") # Step 1: Speech-to-Text - speech_result = await self.speech_to_text( - audio_content=audio_content, - language=from_language + speechResult = await self.speechToText( + audioContent=audioContent, + language=fromLanguage ) - if not speech_result["success"]: + if not speechResult["success"]: return { "success": False, "original_text": "", "translated_text": "", - "error": f"Speech recognition failed: {speech_result.get('error', 'Unknown error')}" + "error": f"Speech recognition failed: {speechResult.get('error', 'Unknown error')}" } - original_text = speech_result["text"] + originalText = speechResult["text"] # Step 2: Translation - translation_result = await self.translate_text( - text=original_text, - source_language=from_language.split('-')[0], # Convert 'de-DE' to 'de' - target_language=to_language.split('-')[0] # Convert 'en-US' to 'en' + translationResult = await self.translateText( + text=originalText, + sourceLanguage=fromLanguage.split('-')[0], # Convert 'de-DE' to 'de' + targetLanguage=toLanguage.split('-')[0] # Convert 'en-US' to 'en' ) - if not translation_result["success"]: + if not translationResult["success"]: return { "success": False, - "original_text": original_text, + "original_text": originalText, "translated_text": "", - "error": f"Translation failed: {translation_result.get('error', 'Unknown error')}" + "error": f"Translation failed: {translationResult.get('error', 'Unknown error')}" } - translated_text = translation_result["translated_text"] + translatedText = translationResult["translated_text"] logger.info(f"✅ Complete pipeline successful:") - logger.info(f" Original: '{original_text}'") - logger.info(f" Translated: '{translated_text}'") + logger.info(f" Original: '{originalText}'") + logger.info(f" Translated: '{translatedText}'") return { "success": True, - "original_text": original_text, - "translated_text": translated_text, - "confidence": speech_result["confidence"], - "source_language": from_language, - "target_language": to_language + "original_text": originalText, + "translated_text": translatedText, + "confidence": speechResult["confidence"], + "source_language": fromLanguage, + "target_language": toLanguage } except Exception as e: @@ -475,19 +475,19 @@ class ConnectorGoogleSpeech: "error": str(e) } - def validate_audio_format(self, audio_content: bytes) -> Dict: + def validateAudioFormat(self, audioContent: bytes) -> Dict: """ Validate audio format for Google Cloud Speech-to-Text. 
Args: - audio_content: Raw audio data + audioContent: Raw audio data Returns: Dict containing validation results """ try: # Basic validation - if len(audio_content) < 100: + if len(audioContent) < 100: return { "valid": False, "error": "Audio too short (less than 100 bytes)" @@ -499,11 +499,11 @@ class ConnectorGoogleSpeech: channels = 1 # Default fallback # Debug: Log first few bytes for format detection - logger.debug(f"Audio header bytes: {audio_content[:20].hex()}") - logger.debug(f"Audio content length: {len(audio_content)} bytes") + logger.debug(f"Audio header bytes: {audioContent[:20].hex()}") + logger.debug(f"Audio content length: {len(audioContent)} bytes") # Check for WEBM/OPUS format (common from web recordings) - if audio_content.startswith(b'\x1a\x45\xdf\xa3'): + if audioContent.startswith(b'\x1a\x45\xdf\xa3'): audio_format = "webm_opus" sample_rate = 48000 # WEBM OPUS typically uses 48kHz channels = 1 @@ -511,10 +511,10 @@ class ConnectorGoogleSpeech: # Check for specific header patterns seen in logs (43c381...) # This appears to be a different audio format or corrupted WEBM - elif audio_content.startswith(b'\x43\xc3\x81') and len(audio_content) > 1000: + elif audioContent.startswith(b'\x43\xc3\x81') and len(audioContent) > 1000: # This might be a different format or corrupted audio # Try to detect if it's actually WEBM by looking deeper - if b'webm' in audio_content[:200] or b'opus' in audio_content[:200]: + if b'webm' in audioContent[:200] or b'opus' in audioContent[:200]: audio_format = "webm_opus" sample_rate = 48000 channels = 1 @@ -524,58 +524,58 @@ class ConnectorGoogleSpeech: audio_format = "linear16" sample_rate = 16000 channels = 1 - logger.warning(f"Unknown audio format with header {audio_content[:8].hex()}, trying LINEAR16") + logger.warning(f"Unknown audio format with header {audioContent[:8].hex()}, trying LINEAR16") # Check for WEBM format (alternative detection) - elif b'webm' in audio_content[:100].lower() or b'opus' in audio_content[:100].lower(): + elif b'webm' in audioContent[:100].lower() or b'opus' in audioContent[:100].lower(): audio_format = "webm_opus" sample_rate = 48000 # WEBM OPUS typically uses 48kHz channels = 1 logger.info(f"Detected WEBM format: {sample_rate}Hz, {channels}ch") # Check for MediaRecorder WEBM chunks (common in browser recordings) - elif audio_content.startswith(b'\x1a\x45\xdf\xa3') and len(audio_content) > 1000: + elif audioContent.startswith(b'\x1a\x45\xdf\xa3') and len(audioContent) > 1000: audio_format = "webm_opus" sample_rate = 48000 # Browser MediaRecorder typically uses 48kHz channels = 1 logger.info(f"Detected MediaRecorder WEBM: {sample_rate}Hz, {channels}ch") # Check for OPUS format by looking for OPUS magic bytes - elif audio_content.startswith(b'OpusHead') or b'OpusHead' in audio_content[:50]: + elif audioContent.startswith(b'OpusHead') or b'OpusHead' in audioContent[:50]: audio_format = "webm_opus" sample_rate = 48000 # OPUS typically uses 48kHz channels = 1 logger.info(f"Detected OPUS format: {sample_rate}Hz, {channels}ch") # Check for OGG format (often contains OPUS) - elif audio_content.startswith(b'OggS'): + elif audioContent.startswith(b'OggS'): audio_format = "webm_opus" sample_rate = 48000 # OGG OPUS typically uses 48kHz channels = 1 logger.info(f"Detected OGG format: {sample_rate}Hz, {channels}ch") # Check for WAV format - elif audio_content.startswith(b'RIFF') and b'WAVE' in audio_content[:12]: + elif audioContent.startswith(b'RIFF') and b'WAVE' in audioContent[:12]: audio_format = "wav" # Try to extract 
sample rate from WAV header try: # WAV header sample rate is at offset 24-27 (little endian) - sample_rate = int.from_bytes(audio_content[24:28], 'little') - channels = int.from_bytes(audio_content[22:24], 'little') + sample_rate = int.from_bytes(audioContent[24:28], 'little') + channels = int.from_bytes(audioContent[22:24], 'little') logger.info(f"Detected WAV format: {sample_rate}Hz, {channels}ch") except: sample_rate = 16000 # Fallback channels = 1 # Check for MP3 format - elif audio_content.startswith(b'\xff\xfb') or audio_content.startswith(b'ID3'): + elif audioContent.startswith(b'\xff\xfb') or audioContent.startswith(b'ID3'): audio_format = "mp3" sample_rate = 44100 # MP3 typically uses 44.1kHz channels = 2 # Usually stereo logger.info(f"Detected MP3 format: {sample_rate}Hz, {channels}ch") # Check for FLAC format - elif audio_content.startswith(b'fLaC'): + elif audioContent.startswith(b'fLaC'): audio_format = "flac" sample_rate = 44100 # Common FLAC sample rate channels = 2 @@ -594,31 +594,31 @@ class ConnectorGoogleSpeech: estimated_duration = 3.0 # Assume 3 seconds for web recordings else: # Rough estimate for uncompressed audio - estimated_duration = len(audio_content) / (sample_rate * channels * 2) # 16-bit = 2 bytes per sample + estimated_duration = len(audioContent) / (sample_rate * channels * 2) # 16-bit = 2 bytes per sample # Check if audio is too short (less than 0.5 seconds) if estimated_duration < 0.5: logger.warning(f"Audio too short: {estimated_duration:.2f}s, may not be recognized") # Log audio details for debugging - logger.info(f"Audio analysis: {len(audio_content)} bytes, {estimated_duration:.2f}s, {sample_rate}Hz, {channels}ch, format={audio_format}") + logger.info(f"Audio analysis: {len(audioContent)} bytes, {estimated_duration:.2f}s, {sample_rate}Hz, {channels}ch, format={audio_format}") # Check audio levels (simple check for silence) if audio_format == "webm_opus": # For WEBM, we can't easily check levels, but log the first few bytes - logger.debug(f"Audio sample bytes: {audio_content[:20].hex()}") + logger.debug(f"Audio sample bytes: {audioContent[:20].hex()}") # Check if audio has some variation (not all same bytes) - if len(audio_content) > 100: - sample_bytes = audio_content[100:200] # Skip header + if len(audioContent) > 100: + sample_bytes = audioContent[100:200] # Skip header if len(set(sample_bytes)) < 5: # Less than 5 different byte values logger.warning("Audio may be silent or very quiet (low byte variation)") else: logger.debug(f"Audio has good byte variation: {len(set(sample_bytes))} unique values") else: # For PCM audio, check for silence - if len(audio_content) > 100: + if len(audioContent) > 100: # Convert first 100 bytes to check for silence - sample_bytes = audio_content[:100] + sample_bytes = audioContent[:100] if all(b == 0 for b in sample_bytes): logger.warning("Audio appears to be silent (all zeros)") else: @@ -632,7 +632,7 @@ class ConnectorGoogleSpeech: "format": audio_format, "sample_rate": sample_rate, "channels": channels, - "size": len(audio_content), + "size": len(audioContent), "estimated_duration": estimated_duration } @@ -642,7 +642,7 @@ class ConnectorGoogleSpeech: "error": f"Validation error: {e}" } - async def text_to_speech(self, text: str, language_code: str = "de-DE", voice_name: str = None) -> Dict[str, Any]: + async def textToSpeech(self, text: str, languageCode: str = "de-DE", voiceName: str = None) -> Dict[str, Any]: """ Convert text to speech using Google Cloud Text-to-Speech. 
@@ -655,38 +655,38 @@ class ConnectorGoogleSpeech: Dict with success status and audio data """ try: - logger.info(f"Converting text to speech: '{text[:50]}...' in {language_code}") + logger.info(f"Converting text to speech: '{text[:50]}...' in {languageCode}") # Set up the synthesis input - synthesis_input = texttospeech.SynthesisInput(text=text) + synthesisInput = texttospeech.SynthesisInput(text=text) # Build the voice request - selected_voice = voice_name or self._get_default_voice(language_code) + selectedVoice = voiceName or self._getDefaultVoice(languageCode) - if not selected_voice: + if not selectedVoice: return { "success": False, - "error": f"No voice specified for language {language_code}. Please select a voice." + "error": f"No voice specified for language {languageCode}. Please select a voice." } - logger.info(f"Using TTS voice: {selected_voice} for language: {language_code}") + logger.info(f"Using TTS voice: {selectedVoice} for language: {languageCode}") voice = texttospeech.VoiceSelectionParams( - language_code=language_code, - name=selected_voice, + language_code=languageCode, + name=selectedVoice, ssml_gender=texttospeech.SsmlVoiceGender.NEUTRAL ) # Select the type of audio file to return - audio_config = texttospeech.AudioConfig( + audioConfig = texttospeech.AudioConfig( audio_encoding=texttospeech.AudioEncoding.MP3 ) # Perform the text-to-speech request response = self.tts_client.synthesize_speech( - input=synthesis_input, + input=synthesisInput, voice=voice, - audio_config=audio_config + audio_config=audioConfig ) # Return the audio content @@ -694,7 +694,7 @@ class ConnectorGoogleSpeech: "success": True, "audio_content": response.audio_content, "audio_format": "mp3", - "language_code": language_code, + "language_code": languageCode, "voice_name": voice.name } @@ -705,14 +705,14 @@ class ConnectorGoogleSpeech: "error": f"Text-to-Speech failed: {str(e)}" } - def _get_default_voice(self, language_code: str) -> str: + def _getDefaultVoice(self, languageCode: str) -> str: """ Get default voice name for a language code. Returns None - no defaults, let the frontend handle voice selection. """ return None - async def get_available_languages(self) -> Dict[str, Any]: + async def getAvailableLanguages(self) -> Dict[str, Any]: """ Get available languages from Google Cloud Text-to-Speech. @@ -749,7 +749,7 @@ class ConnectorGoogleSpeech: "languages": [] } - async def get_available_voices(self, language_code: Optional[str] = None) -> Dict[str, Any]: + async def getAvailableVoices(self, languageCode: Optional[str] = None) -> Dict[str, Any]: """ Get available voices from Google Cloud Text-to-Speech. 
@@ -760,19 +760,19 @@ class ConnectorGoogleSpeech: Dict containing success status and list of available voices """ try: - logger.info(f"🎤 Getting available voices from Google Cloud TTS, language filter: {language_code}") + logger.info(f"🎤 Getting available voices from Google Cloud TTS, language filter: {languageCode}") # List voices from Google Cloud TTS voices = self.tts_client.list_voices() - available_voices = [] + availableVoices = [] for voice in voices: # Extract language code from voice name (e.g., 'de-DE-Wavenet-A' -> 'de-DE') - voice_language = voice.language_codes[0] if voice.language_codes else None + voiceLanguage = voice.language_codes[0] if voice.language_codes else None # Filter by language if specified - if language_code and voice_language != language_code: + if languageCode and voiceLanguage != languageCode: continue # Determine gender from voice name (A/C = male, B/D = female) @@ -784,25 +784,25 @@ class ConnectorGoogleSpeech: gender = "Female" # Create voice info - voice_info = { + voiceInfo = { "name": voice.name, - "language_code": voice_language, + "language_code": voiceLanguage, "gender": gender, "ssml_gender": voice.ssml_gender.name if voice.ssml_gender else "NEUTRAL", "natural_sample_rate_hertz": voice.natural_sample_rate_hertz } - available_voices.append(voice_info) + availableVoices.append(voiceInfo) # Sort by language code, then by gender, then by name - available_voices.sort(key=lambda x: (x["language_code"], x["gender"], x["name"])) + availableVoices.sort(key=lambda x: (x["language_code"], x["gender"], x["name"])) - logger.info(f"✅ Found {len(available_voices)} voices for language filter: {language_code}") + logger.info(f"✅ Found {len(availableVoices)} voices for language filter: {languageCode}") return { "success": True, - "voices": available_voices, - "total_count": len(available_voices) + "voices": availableVoices, + "total_count": len(availableVoices) } except Exception as e: diff --git a/modules/features/syncDelta/mainSyncDelta.py b/modules/features/syncDelta/mainSyncDelta.py index 3fc9e7af..2d6a2b1b 100644 --- a/modules/features/syncDelta/mainSyncDelta.py +++ b/modules/features/syncDelta/mainSyncDelta.py @@ -76,7 +76,7 @@ class ManagerSyncDelta: try: if not eventUser: logger.error("Event user not found - SharePoint connection required") - self._log_audit_event("SYNC_INIT", "FAILED", "Event user not found") + self._logAuditEvent("SYNC_INIT", "FAILED", "Event user not found") else: self.services = getServices(eventUser, None) # Read config values using services @@ -88,56 +88,56 @@ class ManagerSyncDelta: logger.error( f"No SharePoint connection found for user: {self.SHAREPOINT_USER_ID}" ) - self._log_audit_event("SYNC_INIT", "FAILED", f"No SharePoint connection for user: {self.SHAREPOINT_USER_ID}") + self._logAuditEvent("SYNC_INIT", "FAILED", f"No SharePoint connection for user: {self.SHAREPOINT_USER_ID}") else: # Configure SharePoint service token and set connector reference if not self.services.sharepoint.setAccessTokenFromConnection( self.sharepointConnection ): logger.error("Failed to set SharePoint token from UserConnection") - self._log_audit_event("SYNC_INIT", "FAILED", "Failed to set SharePoint token") + self._logAuditEvent("SYNC_INIT", "FAILED", "Failed to set SharePoint token") else: logger.info( f"SharePoint token configured for connection: {self.sharepointConnection.id}" ) - self._log_audit_event("SYNC_INIT", "SUCCESS", f"SharePoint token configured for connection: {self.sharepointConnection.id}") + self._logAuditEvent("SYNC_INIT", "SUCCESS", 
f"SharePoint token configured for connection: {self.sharepointConnection.id}") except Exception as e: logger.error(f"Initialization error in ManagerSyncDelta.__init__: {e}") - self._log_audit_event("SYNC_INIT", "ERROR", f"Initialization error: {str(e)}") + self._logAuditEvent("SYNC_INIT", "ERROR", f"Initialization error: {str(e)}") - def _log_audit_event(self, action: str, status: str, details: str): + def _logAuditEvent(self, action: str, status: str, details: str): """Log audit events for sync operations to memory.""" try: timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S UTC") - user_id = str(self.eventUser.id) if self.eventUser else "system" - log_entry = f"{timestamp} | {user_id} | {action} | {status} | {details}" - self.sync_audit_log.append(log_entry) - logger.info(f"Sync Audit: {log_entry}") + userId = str(self.eventUser.id) if self.eventUser else "system" + logEntry = f"{timestamp} | {userId} | {action} | {status} | {details}" + self.sync_audit_log.append(logEntry) + logger.info(f"Sync Audit: {logEntry}") except Exception as e: logger.warning(f"Failed to log audit event: {str(e)}") - def _log_sync_changes(self, merge_details: dict, sync_mode: str): + def _logSyncChanges(self, mergeDetails: dict, syncMode: str): """Log detailed field changes for sync operations.""" try: # Log summary statistics - summary = f"Sync {sync_mode} - Updated: {merge_details['updated']}, Added: {merge_details['added']}, Unchanged: {merge_details['unchanged']}" - self._log_audit_event("SYNC_CHANGES_SUMMARY", "INFO", summary) + summary = f"Sync {syncMode} - Updated: {mergeDetails['updated']}, Added: {mergeDetails['added']}, Unchanged: {mergeDetails['unchanged']}" + self._logAuditEvent("SYNC_CHANGES_SUMMARY", "INFO", summary) # Log individual field changes (limit to first 10 to avoid spam) - for change in merge_details['changes'][:10]: + for change in mergeDetails['changes'][:10]: # Truncate very long changes to avoid logging issues if len(change) > 500: change = change[:500] + "... [truncated]" - self._log_audit_event("SYNC_FIELD_CHANGE", "INFO", f"{sync_mode}: {change}") + self._logAuditEvent("SYNC_FIELD_CHANGE", "INFO", f"{syncMode}: {change}") # Log count if there were more changes - if len(merge_details['changes']) > 10: - self._log_audit_event("SYNC_FIELD_CHANGE", "INFO", f"{sync_mode}: ... and {len(merge_details['changes']) - 10} more changes") + if len(mergeDetails['changes']) > 10: + self._logAuditEvent("SYNC_FIELD_CHANGE", "INFO", f"{syncMode}: ... 
and {len(mergeDetails['changes']) - 10} more changes") except Exception as e: logger.warning(f"Failed to log sync changes: {str(e)}") - async def _save_audit_log_to_sharepoint(self): + async def _saveAuditLogToSharepoint(self): """Save the sync audit log to SharePoint.""" try: if not self.sync_audit_log or not self.targetSite: @@ -160,12 +160,12 @@ class ManagerSyncDelta: ) logger.info(f"Sync audit log saved to SharePoint: {log_filename}") - self._log_audit_event("AUDIT_LOG_SAVE", "SUCCESS", f"Audit log saved to SharePoint: {log_filename}") + self._logAuditEvent("AUDIT_LOG_SAVE", "SUCCESS", f"Audit log saved to SharePoint: {log_filename}") return True except Exception as e: logger.error(f"Failed to save audit log to SharePoint: {str(e)}") - self._log_audit_event("AUDIT_LOG_SAVE", "FAILED", f"Failed to save audit log: {str(e)}") + self._logAuditEvent("AUDIT_LOG_SAVE", "FAILED", f"Failed to save audit log: {str(e)}") return False def getSyncFileName(self) -> str: @@ -246,12 +246,12 @@ class ManagerSyncDelta: """Perform Tickets to SharePoint synchronization using list-based interface and local CSV/XLSX handling.""" try: logger.info(f"Starting JIRA to SharePoint synchronization (Mode: {self.SYNC_MODE})") - self._log_audit_event("SYNC_START", "INFO", f"Starting JIRA to SharePoint sync (Mode: {self.SYNC_MODE})") + self._logAuditEvent("SYNC_START", "INFO", f"Starting JIRA to SharePoint sync (Mode: {self.SYNC_MODE})") # Initialize interface if not await self.initializeInterface(): logger.error("Failed to initialize connectors") - self._log_audit_event("SYNC_INTERFACE", "FAILED", "Failed to initialize connectors") + self._logAuditEvent("SYNC_INTERFACE", "FAILED", "Failed to initialize connectors") return False # Dump current Jira fields to text file for reference @@ -287,7 +287,7 @@ class ManagerSyncDelta: if self.SYNC_MODE == "xlsx": # Export tickets to list data_list = await sync_interface.exportTicketsAsList() - self._log_audit_event("SYNC_EXPORT", "INFO", f"Exported {len(data_list)} tickets from JIRA") + self._logAuditEvent("SYNC_EXPORT", "INFO", f"Exported {len(data_list)} tickets from JIRA") # Read existing Excel headers/content existing_data = [] existing_headers = {"header1": "Header 1", "header2": "Header 2"} @@ -303,7 +303,7 @@ class ManagerSyncDelta: merged_data, merge_details = self.mergeJiraWithExistingDetailed(data_list, existing_data) # Log detailed changes for Excel mode - self._log_sync_changes(merge_details, "EXCEL") + self._logSyncChanges(merge_details, "EXCEL") await self.backupSharepointFile(filename=sync_file_name) excel_bytes = self.createExcelContent(merged_data, existing_headers) @@ -319,15 +319,15 @@ class ManagerSyncDelta: siteId=self.targetSite['id'], filePath=file_path ) excel_rows, _ = self.parseExcelContent(excel_content) - self._log_audit_event("SYNC_IMPORT", "INFO", f"Importing {len(excel_rows)} Excel rows back to tickets") + self._logAuditEvent("SYNC_IMPORT", "INFO", f"Importing {len(excel_rows)} Excel rows back to tickets") except Exception as e: excel_rows = [] - self._log_audit_event("SYNC_IMPORT", "WARNING", f"Failed to download Excel for import: {str(e)}") + self._logAuditEvent("SYNC_IMPORT", "WARNING", f"Failed to download Excel for import: {str(e)}") await sync_interface.importListToTickets(excel_rows) else: # CSV mode (default) # Export tickets to list data_list = await sync_interface.exportTicketsAsList() - self._log_audit_event("SYNC_EXPORT", "INFO", f"Exported {len(data_list)} tickets from JIRA") + self._logAuditEvent("SYNC_EXPORT", "INFO", f"Exported 
{len(data_list)} tickets from JIRA") # Prepare headers by reading existing CSV if present existing_headers = {"header1": "Header 1", "header2": "Header 2"} existing_data: list[dict] = [] @@ -361,26 +361,26 @@ class ManagerSyncDelta: ) df = pd.read_csv(io.BytesIO(csv_content), skiprows=2, quoting=1, escapechar='\\', on_bad_lines='skip', engine='python') csv_rows = df.to_dict('records') - self._log_audit_event("SYNC_IMPORT", "INFO", f"Importing {len(csv_rows)} CSV rows back to tickets") + self._logAuditEvent("SYNC_IMPORT", "INFO", f"Importing {len(csv_rows)} CSV rows back to tickets") except Exception as e: csv_rows = [] - self._log_audit_event("SYNC_IMPORT", "WARNING", f"Failed to download CSV for import: {str(e)}") + self._logAuditEvent("SYNC_IMPORT", "WARNING", f"Failed to download CSV for import: {str(e)}") await sync_interface.importListToTickets(csv_rows) logger.info(f"JIRA to SharePoint synchronization completed successfully (Mode: {self.SYNC_MODE})") - self._log_audit_event("SYNC_COMPLETE", "SUCCESS", f"JIRA to SharePoint sync completed successfully (Mode: {self.SYNC_MODE})") + self._logAuditEvent("SYNC_COMPLETE", "SUCCESS", f"JIRA to SharePoint sync completed successfully (Mode: {self.SYNC_MODE})") # Save audit log to SharePoint - await self._save_audit_log_to_sharepoint() + await self._saveAuditLogToSharepoint() return True except Exception as e: logger.error(f"Error during JIRA to SharePoint synchronization: {str(e)}") - self._log_audit_event("SYNC_ERROR", "FAILED", f"Error during sync: {str(e)}") + self._logAuditEvent("SYNC_ERROR", "FAILED", f"Error during sync: {str(e)}") # Save audit log to SharePoint even on error - await self._save_audit_log_to_sharepoint() + await self._saveAuditLogToSharepoint() return False @@ -395,14 +395,14 @@ class ManagerSyncDelta: destFolder=self.SHAREPOINT_BACKUP_FOLDER, destFile=backup_filename, ) - self._log_audit_event("SYNC_BACKUP", "SUCCESS", f"Backed up file: {filename} -> {backup_filename}") + self._logAuditEvent("SYNC_BACKUP", "SUCCESS", f"Backed up file: {filename} -> {backup_filename}") return True except Exception as e: if "itemNotFound" in str(e) or "404" in str(e): - self._log_audit_event("SYNC_BACKUP", "SKIPPED", f"File not found for backup: {filename}") + self._logAuditEvent("SYNC_BACKUP", "SKIPPED", f"File not found for backup: {filename}") return True logger.warning(f"Backup failed: {e}") - self._log_audit_event("SYNC_BACKUP", "FAILED", f"Backup failed for {filename}: {str(e)}") + self._logAuditEvent("SYNC_BACKUP", "FAILED", f"Backup failed for {filename}: {str(e)}") return False def mergeJiraWithExistingDetailed(self, jira_data: list[dict], existing_data: list[dict]) -> tuple[list[dict], dict]: @@ -536,18 +536,18 @@ class ManagerSyncDelta: if not isinstance(content, list): return "" - def extract_text_from_content(content_list, list_level=0): + def extractTextFromContent(contentList, listLevel=0): """Recursively extract text from ADF content with proper formatting.""" - text_parts = [] - list_counter = 1 + textParts = [] + listCounter = 1 - for item in content_list: + for item in contentList: if not isinstance(item, dict): continue - item_type = item.get("type", "") + itemType = item.get("type", "") - if item_type == "text": + if itemType == "text": # Extract text content, preserving formatting text = item.get("text", "") marks = item.get("marks", []) @@ -567,99 +567,99 @@ class ManagerSyncDelta: if href: text = f"[{text}]({href})" - text_parts.append(text) + textParts.append(text) - elif item_type == "hardBreak": - 
text_parts.append("\n") + elif itemType == "hardBreak": + textParts.append("\n") - elif item_type == "paragraph": - paragraph_content = item.get("content", []) - if paragraph_content: - paragraph_text = extract_text_from_content(paragraph_content, list_level) - if paragraph_text.strip(): - text_parts.append(paragraph_text) + elif itemType == "paragraph": + paragraphContent = item.get("content", []) + if paragraphContent: + paragraphText = extractTextFromContent(paragraphContent, listLevel) + if paragraphText.strip(): + textParts.append(paragraphText) - elif item_type == "bulletList": - list_content = item.get("content", []) - for list_item in list_content: - if list_item.get("type") == "listItem": - list_item_content = list_item.get("content", []) - for list_paragraph in list_item_content: - if list_paragraph.get("type") == "paragraph": - list_paragraph_content = list_paragraph.get("content", []) - if list_paragraph_content: - indent = " " * list_level - bullet_text = extract_text_from_content(list_paragraph_content, list_level + 1) - if bullet_text.strip(): - text_parts.append(f"{indent}• {bullet_text}") + elif itemType == "bulletList": + listContent = item.get("content", []) + for listItem in listContent: + if listItem.get("type") == "listItem": + listItemContent = listItem.get("content", []) + for listParagraph in listItemContent: + if listParagraph.get("type") == "paragraph": + listParagraphContent = listParagraph.get("content", []) + if listParagraphContent: + indent = " " * listLevel + bulletText = extractTextFromContent(listParagraphContent, listLevel + 1) + if bulletText.strip(): + textParts.append(f"{indent}• {bulletText}") - elif item_type == "orderedList": - list_content = item.get("content", []) - for list_item in list_content: - if list_item.get("type") == "listItem": - list_item_content = list_item.get("content", []) - for list_paragraph in list_item_content: - if list_paragraph.get("type") == "paragraph": - list_paragraph_content = list_paragraph.get("content", []) - if list_paragraph_content: - indent = " " * list_level - ordered_text = extract_text_from_content(list_paragraph_content, list_level + 1) - if ordered_text.strip(): - text_parts.append(f"{indent}{list_counter}. {ordered_text}") - list_counter += 1 + elif itemType == "orderedList": + listContent = item.get("content", []) + for listItem in listContent: + if listItem.get("type") == "listItem": + listItemContent = listItem.get("content", []) + for listParagraph in listItemContent: + if listParagraph.get("type") == "paragraph": + listParagraphContent = listParagraph.get("content", []) + if listParagraphContent: + indent = " " * listLevel + orderedText = extractTextFromContent(listParagraphContent, listLevel + 1) + if orderedText.strip(): + textParts.append(f"{indent}{listCounter}. {orderedText}") + listCounter += 1 - elif item_type == "listItem": + elif itemType == "listItem": # Handle nested list items - list_item_content = item.get("content", []) - if list_item_content: - text_parts.append(extract_text_from_content(list_item_content, list_level)) + listItemContent = item.get("content", []) + if listItemContent: + textParts.append(extractTextFromContent(listItemContent, listLevel)) - elif item_type == "embedCard": + elif itemType == "embedCard": # Handle embedded content (videos, etc.) 
attrs = item.get("attrs", {}) url = attrs.get("url", "") if url: - text_parts.append(f"[Embedded Content: {url}]") + textParts.append(f"[Embedded Content: {url}]") - elif item_type == "codeBlock": + elif itemType == "codeBlock": # Handle code blocks - code_content = item.get("content", []) - if code_content: - code_text = extract_text_from_content(code_content, list_level) - if code_text.strip(): - text_parts.append(f"```\n{code_text}\n```") + codeContent = item.get("content", []) + if codeContent: + codeText = extractTextFromContent(codeContent, listLevel) + if codeText.strip(): + textParts.append(f"```\n{codeText}\n```") - elif item_type == "blockquote": + elif itemType == "blockquote": # Handle blockquotes - quote_content = item.get("content", []) - if quote_content: - quote_text = extract_text_from_content(quote_content, list_level) - if quote_text.strip(): - text_parts.append(f"> {quote_text}") + quoteContent = item.get("content", []) + if quoteContent: + quoteText = extractTextFromContent(quoteContent, listLevel) + if quoteText.strip(): + textParts.append(f"> {quoteText}") - elif item_type == "heading": + elif itemType == "heading": # Handle headings - heading_content = item.get("content", []) - if heading_content: - heading_text = extract_text_from_content(heading_content, list_level) - if heading_text.strip(): + headingContent = item.get("content", []) + if headingContent: + headingText = extractTextFromContent(headingContent, listLevel) + if headingText.strip(): level = item.get("attrs", {}).get("level", 1) - text_parts.append(f"{'#' * level} {heading_text}") + textParts.append(f"{'#' * level} {headingText}") - elif item_type == "rule": + elif itemType == "rule": # Handle horizontal rules - text_parts.append("---") + textParts.append("---") else: # Handle unknown types by trying to extract content if "content" in item: - content_text = extract_text_from_content(item.get("content", []), list_level) - if content_text.strip(): - text_parts.append(content_text) + contentText = extractTextFromContent(item.get("content", []), listLevel) + if contentText.strip(): + textParts.append(contentText) - return "\n".join(text_parts) + return "\n".join(textParts) - result = extract_text_from_content(content) + result = extractTextFromContent(content) return result.strip() # Utility: dump all ticket fields (name -> field id) to a text file (generic) @@ -800,7 +800,7 @@ def startSyncManager(eventUser): if _sync_manager.APP_ENV_TYPE == "prod": _sync_manager.services.utils.eventRegisterCron( job_id="syncDelta.syncTicket", - func=scheduled_sync, + func=scheduledSync, cron_kwargs={"minute": "0,20,40"}, replace_existing=True, coalesce=True, @@ -814,7 +814,7 @@ def startSyncManager(eventUser): logger.error(f"Failed to register scheduler for DG sync: {str(e)}") return _sync_manager -async def scheduled_sync(): +async def scheduledSync(): """Scheduled sync function that uses the global sync manager.""" try: global _sync_manager diff --git a/modules/interfaces/interfaceVoiceObjects.py b/modules/interfaces/interfaceVoiceObjects.py index 2bb1b729..e83ef77d 100644 --- a/modules/interfaces/interfaceVoiceObjects.py +++ b/modules/interfaces/interfaceVoiceObjects.py @@ -74,10 +74,10 @@ class VoiceObjects: logger.info(f"🎤 Speech-to-text request: {len(audioContent)} bytes, language: {language}") connector = self._getGoogleSpeechConnector() - result = await connector.speech_to_text( - audio_content=audioContent, + result = await connector.speechToText( + audioContent=audioContent, language=language, - 
sample_rate=sampleRate, + sampleRate=sampleRate, channels=channels ) @@ -123,10 +123,10 @@ class VoiceObjects: } connector = self._getGoogleSpeechConnector() - result = await connector.translate_text( + result = await connector.translateText( text=text, - source_language=sourceLanguage, - target_language=targetLanguage + sourceLanguage=sourceLanguage, + targetLanguage=targetLanguage ) if result["success"]: @@ -164,10 +164,10 @@ class VoiceObjects: logger.info(f"🔄 Speech-to-translation pipeline: {fromLanguage} -> {toLanguage}") connector = self._getGoogleSpeechConnector() - result = await connector.speech_to_translated_text( - audio_content=audioContent, - from_language=fromLanguage, - to_language=toLanguage + result = await connector.speechToTranslatedText( + audioContent=audioContent, + fromLanguage=fromLanguage, + toLanguage=toLanguage ) if result["success"]: @@ -213,10 +213,10 @@ class VoiceObjects: } connector = self._getGoogleSpeechConnector() - result = await connector.text_to_speech( + result = await connector.textToSpeech( text=text, - language_code=languageCode, - voice_name=voiceName + languageCode=languageCode, + voiceName=voiceName ) if result["success"]: @@ -356,7 +356,7 @@ class VoiceObjects: logger.info("🌐 Getting available languages from Google Cloud TTS") connector = self._getGoogleSpeechConnector() - result = await connector.get_available_languages() + result = await connector.getAvailableLanguages() if result["success"]: logger.info(f"✅ Found {len(result['languages'])} available languages") @@ -387,7 +387,7 @@ class VoiceObjects: logger.info(f"🎤 Getting available voices, language filter: {languageCode}") connector = self._getGoogleSpeechConnector() - result = await connector.get_available_voices(language_code=languageCode) + result = await connector.getAvailableVoices(languageCode=languageCode) if result["success"]: logger.info(f"✅ Found {len(result['voices'])} voices for language filter: {languageCode}") @@ -420,7 +420,7 @@ class VoiceObjects: logger.debug(f"Validating audio format: {len(audioContent)} bytes") connector = self._getGoogleSpeechConnector() - result = connector.validate_audio_format(audioContent) + result = connector.validateAudioFormat(audioContent) if result["valid"]: logger.debug(f"✅ Audio validation successful: {result['format']}, {result['sample_rate']}Hz, {result['channels']}ch") @@ -451,10 +451,10 @@ class VoiceObjects: connector = self._getGoogleSpeechConnector() # Test with a simple translation - testResult = await connector.translate_text( + testResult = await connector.translateText( text="Hello", - source_language="en", - target_language="de" + sourceLanguage="en", + targetLanguage="de" ) if testResult["success"]: diff --git a/modules/routes/routeDataConnections.py b/modules/routes/routeDataConnections.py index eec8d140..684b95d2 100644 --- a/modules/routes/routeDataConnections.py +++ b/modules/routes/routeDataConnections.py @@ -23,62 +23,62 @@ from modules.shared.timezoneUtils import getUtcTimestamp # Configure logger logger = logging.getLogger(__name__) -def get_token_status_for_connection(interface, connection_id: str) -> tuple[str, Optional[float]]: +def getTokenStatusForConnection(interface, connectionId: str) -> tuple[str, Optional[float]]: """ Get token status and expiration for a connection. 
Args: interface: The database interface - connection_id: The connection ID to check + connectionId: The connection ID to check Returns: - tuple: (token_status, token_expires_at) - - token_status: 'active', 'expired', or 'none' - - token_expires_at: UTC timestamp or None + tuple: (tokenStatus, tokenExpiresAt) + - tokenStatus: 'active', 'expired', or 'none' + - tokenExpiresAt: UTC timestamp or None """ try: # Query tokens table for the latest token for this connection tokens = interface.db.getRecordset( Token, - recordFilter={"connectionId": connection_id} + recordFilter={"connectionId": connectionId} ) if not tokens: return "none", None # Find the most recent token (highest createdAt timestamp) - latest_token = None - latest_created_at = 0 + latestToken = None + latestCreatedAt = 0 - for token_data in tokens: - created_at = token_data.get("createdAt", 0) - if created_at > latest_created_at: - latest_created_at = created_at - latest_token = token_data + for tokenData in tokens: + createdAt = tokenData.get("createdAt", 0) + if createdAt > latestCreatedAt: + latestCreatedAt = createdAt + latestToken = tokenData - if not latest_token: + if not latestToken: return "none", None # Check if token is expired - expires_at = latest_token.get("expiresAt") - if not expires_at: + expiresAt = latestToken.get("expiresAt") + if not expiresAt: return "none", None - current_time = getUtcTimestamp() + currentTime = getUtcTimestamp() # Add 5 minute buffer for proactive refresh - buffer_time = 5 * 60 # 5 minutes in seconds - if expires_at <= current_time: - return "expired", expires_at - elif expires_at <= (current_time + buffer_time): + bufferTime = 5 * 60 # 5 minutes in seconds + if expiresAt <= currentTime: + return "expired", expiresAt + elif expiresAt <= (currentTime + bufferTime): # Token expires soon - mark as active but log for proactive refresh - logger.debug(f"Token for connection {connection_id} expires soon (in {expires_at - current_time} seconds)") - return "active", expires_at + logger.debug(f"Token for connection {connectionId} expires soon (in {expiresAt - currentTime} seconds)") + return "active", expiresAt else: - return "active", expires_at + return "active", expiresAt except Exception as e: - logger.error(f"Error getting token status for connection {connection_id}: {str(e)}") + logger.error(f"Error getting token status for connection {connectionId}: {str(e)}") return "none", None router = APIRouter( @@ -121,7 +121,7 @@ async def get_connections( enhanced_connections = [] for connection in connections: # Get token status for this connection - token_status, token_expires_at = get_token_status_for_connection(interface, connection.id) + tokenStatus, tokenExpiresAt = getTokenStatusForConnection(interface, connection.id) # Create enhanced connection with token status enhanced_connection = UserConnection( @@ -135,8 +135,8 @@ async def get_connections( connectedAt=connection.connectedAt, lastChecked=connection.lastChecked, expiresAt=connection.expiresAt, - tokenStatus=token_status, - tokenExpiresAt=token_expires_at + tokenStatus=tokenStatus, + tokenExpiresAt=tokenExpiresAt ) enhanced_connections.append(enhanced_connection) @@ -252,9 +252,8 @@ async def update_connection( # Update connection - models now handle timestamp serialization automatically interface.db.recordModify(UserConnection, connectionId, connection.model_dump()) - # Get token status for the updated connection - token_status, token_expires_at = get_token_status_for_connection(interface, connectionId) + tokenStatus, tokenExpiresAt = 
getTokenStatusForConnection(interface, connectionId) # Create enhanced connection with token status enhanced_connection = UserConnection( @@ -268,8 +267,8 @@ async def update_connection( connectedAt=connection.connectedAt, lastChecked=connection.lastChecked, expiresAt=connection.expiresAt, - tokenStatus=token_status, - tokenExpiresAt=token_expires_at + tokenStatus=tokenStatus, + tokenExpiresAt=tokenExpiresAt ) return enhanced_connection diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py index 87b56ceb..381b87fa 100644 --- a/modules/services/serviceAi/mainServiceAi.py +++ b/modules/services/serviceAi/mainServiceAi.py @@ -6,7 +6,6 @@ from modules.datamodels.datamodelChat import PromptPlaceholder, ChatDocument from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum from modules.interfaces.interfaceAiObjects import AiObjects -from modules.services.serviceAi.subDocumentProcessing import SubDocumentProcessing from modules.shared.jsonUtils import ( extractJsonString, repairBrokenJson, @@ -33,7 +32,6 @@ class AiService: self.aiObjects = None # Will be initialized in create() or _ensureAiObjectsInitialized() # Submodules initialized as None - will be set in _initializeSubmodules() after aiObjects is ready self.extractionService = None - self.documentProcessor = None def _initializeSubmodules(self): """Initialize all submodules after aiObjects is ready.""" @@ -43,10 +41,6 @@ class AiService: if self.extractionService is None: logger.info("Initializing ExtractionService...") self.extractionService = ExtractionService(self.services) - - if self.documentProcessor is None: - logger.info("Initializing SubDocumentProcessing...") - self.documentProcessor = SubDocumentProcessing(self.services, self.aiObjects) async def _ensureAiObjectsInitialized(self): """Ensure aiObjects is initialized and submodules are ready.""" @@ -529,7 +523,7 @@ Respond with ONLY a JSON object in this exact format: ) try: - # Ensure AI connectors are initialized before delegating to documentProcessor/generator + # Ensure AI connectors are initialized before delegating to generator if hasattr(self.services, 'ai') and hasattr(self.services.ai, '_ensureAiObjectsInitialized'): await self.services.ai._ensureAiObjectsInitialized() if options is None or (hasattr(options, 'operationType') and options.operationType is None): @@ -562,7 +556,7 @@ Respond with ONLY a JSON object in this exact format: # Use unified generation method for all document generation if documents and len(documents) > 0: self.services.workflow.progressLogUpdate(aiOperationId, 0.2, f"Extracting content from {len(documents)} documents") - extracted_content = await self.documentProcessor.callAiText(prompt, documents, options, aiOperationId) + extracted_content = await self.callAiText(prompt, documents, options, aiOperationId) else: self.services.workflow.progressLogUpdate(aiOperationId, 0.2, "Preparing for direct generation") extracted_content = None @@ -649,7 +643,7 @@ Respond with ONLY a JSON object in this exact format: self.services.workflow.progressLogUpdate(aiOperationId, 0.5, "Processing text call") if documents: # Use document processing for text calls with documents - result = await self.documentProcessor.callAiText(prompt, documents, options, aiOperationId) + result = await self.callAiText(prompt, documents, options, aiOperationId) else: # Use shared core 
function for direct text calls result = await self._callAiWithLooping(prompt, options, "text", None, None, aiOperationId) @@ -758,3 +752,17 @@ Respond with ONLY a JSON object in this exact format: logger.error(f"Error in AI image generation: {str(e)}") return {"success": False, "error": str(e)} + async def callAiText( + self, + prompt: str, + documents: Optional[List[ChatDocument]], + options: AiCallOptions, + operationId: Optional[str] = None + ) -> str: + """ + Handle text calls with document processing through ExtractionService. + UNIFIED PROCESSING: Always use per-chunk processing for consistency. + """ + await self._ensureAiObjectsInitialized() + return await self.extractionService.processDocumentsPerChunk(documents, prompt, self.aiObjects, options, operationId) + diff --git a/modules/services/serviceAi/subDocumentProcessing.py b/modules/services/serviceAi/subDocumentProcessing.py deleted file mode 100644 index 2903c066..00000000 --- a/modules/services/serviceAi/subDocumentProcessing.py +++ /dev/null @@ -1,463 +0,0 @@ -import json -import logging -import re -import time -from typing import Dict, Any, List, Optional -from modules.datamodels.datamodelChat import ChatDocument -from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum -from modules.datamodels.datamodelExtraction import ContentExtracted, PartResult, ExtractionOptions, MergeStrategy -# Resolve forward refs for ExtractionOptions (OperationTypeEnum) at runtime without using unsupported args -try: - # Import here to avoid circular import at module load time - from modules.datamodels.datamodelAi import OperationTypeEnum - # Provide parent namespace so Pydantic can resolve forward refs - ExtractionOptions.__pydantic_parent_namespace__ = {"OperationTypeEnum": OperationTypeEnum} - ExtractionOptions.model_rebuild() -except Exception as _e: - logging.getLogger(__name__).warning(f"ExtractionOptions forward-ref rebuild skipped: {_e}") -from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService - -logger = logging.getLogger(__name__) - - -class SubDocumentProcessing: - """Document processing operations including chunking, processing, and merging.""" - - def __init__(self, services, aiObjects): - """Initialize document processing service. - - Args: - services: Service center instance for accessing other services - aiObjects: Initialized AiObjects instance - """ - self.services = services - self.aiObjects = aiObjects - self._extractionService = None - - @property - def extractionService(self): - """Lazy initialization of extraction service.""" - if self._extractionService is None: - logger.info("Lazy initializing ExtractionService...") - self._extractionService = ExtractionService(self.services) - return self._extractionService - - - async def processDocumentsPerChunk( - self, - documents: List[ChatDocument], - prompt: str, - options: Optional[AiCallOptions] = None, - operationId: Optional[str] = None - ) -> str: - """ - Process documents with model-aware chunking and merge results. - NEW: Uses model-aware chunking in AI call phase instead of extraction phase. 
- - Args: - documents: List of ChatDocument objects to process - prompt: AI prompt for processing - options: AI call options - operationId: Optional operation ID for progress tracking - - Returns: - Merged AI results as string with preserved document structure - """ - if not documents: - return "" - - # Create operationId if not provided - if not operationId: - import time - workflowId = self.services.currentWorkflow.id if self.services.currentWorkflow else f"no-workflow-{int(time.time())}" - operationId = f"ai_text_extract_{workflowId}_{int(time.time())}" - self.services.workflow.progressLogStart( - operationId, - "AI Text Extract", - "Document Processing", - f"Processing {len(documents)} documents" - ) - - try: - # Build extraction options using Pydantic model - mergeStrategy = MergeStrategy( - useIntelligentMerging=True, - prompt=prompt, - groupBy="typeGroup", - orderBy="id", - mergeType="concatenate" - ) - - extractionOptions = ExtractionOptions( - prompt=prompt, - operationType=options.operationType if options else OperationTypeEnum.DATA_EXTRACT, - processDocumentsIndividually=True, - mergeStrategy=mergeStrategy - ) - - logger.debug(f"Per-chunk extraction options: prompt length={len(extractionOptions.prompt)} chars, operationType={extractionOptions.operationType}") - - # Extract content WITHOUT chunking - if operationId: - self.services.workflow.progressLogUpdate(operationId, 0.1, f"Extracting content from {len(documents)} documents") - extractionResult = self.extractionService.extractContent(documents, extractionOptions) - - if not isinstance(extractionResult, list): - if operationId: - self.services.workflow.progressLogFinish(operationId, False) - return "[Error: No extraction results]" - - # Process parts (not chunks) with model-aware AI calls - if operationId: - self.services.workflow.progressLogUpdate(operationId, 0.3, f"Processing {len(extractionResult)} extracted content parts") - partResults = await self._processPartsWithMapping(extractionResult, prompt, options, operationId) - - # Merge results using existing merging system - if operationId: - self.services.workflow.progressLogUpdate(operationId, 0.9, f"Merging {len(partResults)} part results") - mergedContent = self._mergePartResults(partResults, options) - - # Save merged extraction content to debug - self.services.utils.writeDebugFile(mergedContent or '', "extraction_merged_text") - - if operationId: - self.services.workflow.progressLogFinish(operationId, True) - - return mergedContent - except Exception as e: - logger.error(f"Error in processDocumentsPerChunk: {str(e)}") - if operationId: - self.services.workflow.progressLogFinish(operationId, False) - raise - - async def callAiText( - self, - prompt: str, - documents: Optional[List[ChatDocument]], - options: AiCallOptions, - operationId: Optional[str] = None - ) -> str: - """ - Handle text calls with document processing through ExtractionService. - UNIFIED PROCESSING: Always use per-chunk processing for consistency. 
- """ - return await self.processDocumentsPerChunk(documents, prompt, options, operationId) - - async def _processPartsWithMapping( - self, - extractionResult: List[ContentExtracted], - prompt: str, - options: Optional[AiCallOptions] = None, - operationId: Optional[str] = None - ) -> List['PartResult']: - """Process content parts with model-aware chunking and proper mapping.""" - from modules.datamodels.datamodelExtraction import PartResult - import asyncio - - # Collect all parts that need processing - partsToProcess = [] - partIndex = 0 - - for ec in extractionResult: - for part in ec.parts: - if part.typeGroup in ("text", "table", "structure", "image", "container", "binary"): - # Skip empty container parts - if part.typeGroup == "container" and (not part.data or len(part.data.strip()) == 0): - logger.debug(f"Skipping empty container part: mimeType={part.mimeType}") - continue - - partsToProcess.append({ - 'part': part, - 'part_index': partIndex, - 'document_id': ec.id - }) - partIndex += 1 - - logger.info(f"Processing {len(partsToProcess)} parts with model-aware chunking") - - totalParts = len(partsToProcess) - - # Process parts in parallel - processedCount = [0] # Use list to allow modification in nested function - - async def processSinglePart(partInfo: Dict) -> PartResult: - part = partInfo['part'] - part_index = partInfo['part_index'] - documentId = partInfo['document_id'] - - start_time = time.time() - - try: - # Create AI call request with content part - from modules.datamodels.datamodelAi import AiCallRequest - request = AiCallRequest( - prompt=prompt, - context="", # Context is in the content part - options=options, - contentParts=[part] # Pass as list for unified processing - ) - - # Update progress before AI call - if operationId and totalParts > 0: - processedCount[0] += 1 - progress = 0.3 + (processedCount[0] / totalParts * 0.6) # Progress from 0.3 to 0.9 - self.services.workflow.progressLogUpdate(operationId, progress, f"Processing part {processedCount[0]}/{totalParts}") - - # Call AI with model-aware chunking - response = await self.aiObjects.call(request) - - processing_time = time.time() - start_time - - return PartResult( - originalPart=part, - aiResult=response.content, - partIndex=part_index, - documentId=documentId, - processingTime=processing_time, - metadata={ - "success": True, - "partSize": len(part.data) if part.data else 0, - "resultSize": len(response.content), - "typeGroup": part.typeGroup, - "modelName": response.modelName, - "priceUsd": response.priceUsd - } - ) - - except Exception as e: - processing_time = time.time() - start_time - logger.warning(f"Error processing part {part_index}: {str(e)}") - - return PartResult( - originalPart=part, - aiResult=f"[Error processing part: {str(e)}]", - partIndex=part_index, - documentId=documentId, - processingTime=processing_time, - metadata={ - "success": False, - "error": str(e), - "partSize": len(part.data) if part.data else 0, - "typeGroup": part.typeGroup - } - ) - - # Process parts with concurrency control - maxConcurrent = 5 - if options and hasattr(options, 'maxConcurrentParts'): - maxConcurrent = options.maxConcurrentParts - - semaphore = asyncio.Semaphore(maxConcurrent) - - async def processWithSemaphore(partInfo): - async with semaphore: - return await processSinglePart(partInfo) - - tasks = [processWithSemaphore(part_info) for part_info in partsToProcess] - partResults = await asyncio.gather(*tasks, return_exceptions=True) - - # Handle exceptions - processedResults = [] - for i, result in 
enumerate(partResults): - if isinstance(result, Exception): - part_info = partsToProcess[i] - processedResults.append(PartResult( - originalPart=part_info['part'], - aiResult=f"[Error in parallel processing: {str(result)}]", - partIndex=part_info['part_index'], - documentId=part_info['document_id'], - processingTime=0.0, - metadata={"success": False, "error": str(result)} - )) - elif result is not None: - processedResults.append(result) - - logger.info(f"Completed processing {len(processedResults)} parts") - return processedResults - - def _mergePartResults( - self, - partResults: List['PartResult'], - options: Optional[AiCallOptions] = None - ) -> str: - """Merge part results using existing sophisticated merging system.""" - if not partResults: - return "" - - # Convert PartResults back to ContentParts for existing merger system - from modules.datamodels.datamodelExtraction import ContentPart - content_parts = [] - for part_result in partResults: - # Create ContentPart from PartResult with proper typeGroup - content_part = ContentPart( - id=part_result.originalPart.id, - parentId=part_result.originalPart.parentId, - label=part_result.originalPart.label, - typeGroup=part_result.originalPart.typeGroup, # Use original typeGroup - mimeType=part_result.originalPart.mimeType, - data=part_result.aiResult, # Use AI result as data - metadata={ - **part_result.originalPart.metadata, - "aiResult": True, - "partIndex": part_result.partIndex, - "documentId": part_result.documentId, - "processingTime": part_result.processingTime, - "success": part_result.metadata.get("success", False) - } - ) - content_parts.append(content_part) - - # Use existing merging strategy from options - merge_strategy = MergeStrategy( - useIntelligentMerging=True, - groupBy="documentId", # Group by document - orderBy="partIndex", # Order by part index - mergeType="concatenate" - ) - - - # Apply existing merging logic using the sophisticated merging system - from modules.services.serviceExtraction.subPipeline import _applyMerging - merged_parts = _applyMerging(content_parts, merge_strategy) - - # Convert merged parts back to final string - final_content = "\n\n".join([part.data for part in merged_parts]) - - logger.info(f"Merged {len(partResults)} parts using existing sophisticated merging system") - return final_content.strip() - - def _convertPartResultsToJson( - self, - partResults: List['PartResult'], - options: Optional[AiCallOptions] = None - ) -> Dict[str, Any]: - """Convert part results to JSON format using existing sophisticated merging system.""" - if not partResults: - return {"metadata": {"title": "Empty Document"}, "sections": []} - - # Convert PartResults back to ContentParts for existing merger system - from modules.datamodels.datamodelExtraction import ContentPart - content_parts = [] - for part_result in partResults: - # Create ContentPart from PartResult with proper typeGroup - content_part = ContentPart( - id=part_result.originalPart.id, - parentId=part_result.originalPart.parentId, - label=part_result.originalPart.label, - typeGroup=part_result.originalPart.typeGroup, # Use original typeGroup - mimeType=part_result.originalPart.mimeType, - data=part_result.aiResult, # Use AI result as data - metadata={ - **part_result.originalPart.metadata, - "aiResult": True, - "partIndex": part_result.partIndex, - "documentId": part_result.documentId, - "processingTime": part_result.processingTime, - "success": part_result.metadata.get("success", False) - } - ) - content_parts.append(content_part) - - # Use existing 
merging strategy for JSON mode - merge_strategy = MergeStrategy( - useIntelligentMerging=True, - groupBy="documentId", # Group by document - orderBy="partIndex", # Order by part index - mergeType="concatenate" - ) - - - # Apply existing merging logic using the sophisticated merging system - from modules.services.serviceExtraction.subPipeline import _applyMerging - merged_parts = _applyMerging(content_parts, merge_strategy) - - # Convert merged parts to JSON format - all_sections = [] - document_titles = [] - - for part in merged_parts: - if part.metadata.get("success", False): - try: - # Parse JSON from AI result - part_json = json.loads(part.data) - - # Check if this is a multi-file response (has "documents" key) - if isinstance(part_json, dict) and "documents" in part_json: - # This is a multi-file response - merge all documents - logger.debug(f"Processing multi-file response from part {part.id} with {len(part_json['documents'])} documents") - - # Return multi-file response directly - return { - "metadata": part_json.get("metadata", {"title": "Merged Document"}), - "documents": part_json["documents"] - } - - # Extract sections from single-file response - elif isinstance(part_json, dict) and "sections" in part_json: - for section in part_json["sections"]: - # Add part context to section - section["metadata"] = section.get("metadata", {}) - section["metadata"]["source_part"] = part.id - section["metadata"]["source_document"] = part.metadata.get("documentId", "unknown") - section["metadata"]["part_index"] = part.metadata.get("partIndex", 0) - all_sections.append(section) - - # Extract document title - if isinstance(part_json, dict) and "metadata" in part_json: - title = part_json["metadata"].get("title", "") - if title and title not in document_titles: - document_titles.append(title) - - except json.JSONDecodeError as e: - logger.warning(f"Failed to parse JSON from part {part.id}: {str(e)}") - # Create a fallback section for invalid JSON - fallback_section = { - "id": f"error_section_{part.id}", - "title": "Error Section", - "content_type": "paragraph", - "elements": [{ - "text": f"Error parsing part {part.id}: {str(e)}" - }], - "order": part.metadata.get("partIndex", 0), - "metadata": { - "source_document": part.metadata.get("documentId", "unknown"), - "part_id": part.id, - "error": str(e) - } - } - all_sections.append(fallback_section) - else: - # Handle error parts - error_section = { - "id": f"error_section_{part.id}", - "title": "Error Section", - "content_type": "paragraph", - "elements": [{ - "text": f"Error in part {part.id}: {part.metadata.get('error', 'Unknown error')}" - }], - "order": part.metadata.get("partIndex", 0), - "metadata": { - "source_document": part.metadata.get("documentId", "unknown"), - "part_id": part.id, - "error": part.metadata.get('error', 'Unknown error') - } - } - all_sections.append(error_section) - - # Sort sections by order - all_sections.sort(key=lambda x: x.get("order", 0)) - - # Create merged document with sections - merged_document = { - "metadata": { - "title": document_titles[0] if document_titles else "Merged Document", - "extraction_method": "model_aware_chunking_with_merging", - "version": "2.0" - }, - "sections": all_sections, - "summary": f"Merged document using sophisticated merging system", - "tags": ["merged", "ai_generated", "model_aware", "sophisticated_merging"] - } - - logger.info(f"Converted {len(partResults)} parts to JSON format using existing sophisticated merging system") - return merged_document diff --git 
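Note: both the removed implementation above and its new home in mainServiceExtraction.py below fan the parts out with the same semaphore-bounded asyncio.gather pattern. A minimal, self-contained sketch of just that pattern (all names below are illustrative placeholders, not identifiers from this codebase):

import asyncio
import random

async def processOne(item: int) -> str:
    # Stand-in for a single per-part AI call; sleeps to simulate latency.
    await asyncio.sleep(random.random() * 0.1)
    return f"result-{item}"

async def processAll(items: list, maxConcurrent: int = 5) -> list:
    semaphore = asyncio.Semaphore(maxConcurrent)

    async def withSemaphore(item):
        async with semaphore:  # at most maxConcurrent calls are in flight at once
            return await processOne(item)

    # return_exceptions=True keeps one failing part from cancelling the rest,
    # mirroring the per-part error handling in _processPartsWithMapping.
    return await asyncio.gather(*(withSemaphore(i) for i in items), return_exceptions=True)

if __name__ == "__main__":
    print(asyncio.run(processAll(list(range(12)))))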
diff --git a/modules/services/serviceExtraction/mainServiceExtraction.py b/modules/services/serviceExtraction/mainServiceExtraction.py
index 1e0c1d21..b61cd1cc 100644
--- a/modules/services/serviceExtraction/mainServiceExtraction.py
+++ b/modules/services/serviceExtraction/mainServiceExtraction.py
@@ -2,12 +2,13 @@
 from typing import Any, Dict, List, Optional, Union
 import uuid
 import logging
 import time
+import asyncio
 from .subRegistry import ExtractorRegistry, ChunkerRegistry
-from .subPipeline import runExtraction
-from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart, MergeStrategy, ExtractionOptions
+from .subPipeline import runExtraction, _applyMerging
+from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart, MergeStrategy, ExtractionOptions, PartResult
 from modules.datamodels.datamodelChat import ChatDocument
-from modules.datamodels.datamodelAi import AiCallResponse
+from modules.datamodels.datamodelAi import AiCallResponse, AiCallRequest, AiCallOptions, OperationTypeEnum
 from modules.aicore.aicoreModelRegistry import modelRegistry
 
 
@@ -389,4 +390,271 @@ class ExtractionService:
         # This could be enhanced with structure-aware merging
         return self._mergeConcatenate(structureParts, aiResultParts, strategy)
 
+    async def processDocumentsPerChunk(
+        self,
+        documents: List[ChatDocument],
+        prompt: str,
+        aiObjects: Any,
+        options: Optional[AiCallOptions] = None,
+        operationId: Optional[str] = None
+    ) -> str:
+        """
+        Process documents with model-aware chunking and merge results.
+        NEW: Uses model-aware chunking in AI call phase instead of extraction phase.
+
+        Args:
+            documents: List of ChatDocument objects to process
+            prompt: AI prompt for processing
+            aiObjects: AiObjects instance for making AI calls
+            options: AI call options
+            operationId: Optional operation ID for progress tracking
+
+        Returns:
+            Merged AI results as string with preserved document structure
+        """
+        if not documents:
+            return ""
+
+        # Create operationId if not provided
+        if not operationId:
+            workflowId = self.services.currentWorkflow.id if self.services.currentWorkflow else f"no-workflow-{int(time.time())}"
+            operationId = f"ai_text_extract_{workflowId}_{int(time.time())}"
+            self.services.workflow.progressLogStart(
+                operationId,
+                "AI Text Extract",
+                "Document Processing",
+                f"Processing {len(documents)} documents"
+            )
+
+        try:
+            # Build extraction options using Pydantic model
+            mergeStrategy = MergeStrategy(
+                useIntelligentMerging=True,
+                prompt=prompt,
+                groupBy="typeGroup",
+                orderBy="id",
+                mergeType="concatenate"
+            )
+
+            extractionOptions = ExtractionOptions(
+                prompt=prompt,
+                operationType=options.operationType if options else OperationTypeEnum.DATA_EXTRACT,
+                processDocumentsIndividually=True,
+                mergeStrategy=mergeStrategy
+            )
+
+            logger.debug(f"Per-chunk extraction options: prompt length={len(extractionOptions.prompt)} chars, operationType={extractionOptions.operationType}")
+
+            # Extract content WITHOUT chunking
+            if operationId:
+                self.services.workflow.progressLogUpdate(operationId, 0.1, f"Extracting content from {len(documents)} documents")
+            extractionResult = self.extractContent(documents, extractionOptions)
+
+            if not isinstance(extractionResult, list):
+                if operationId:
+                    self.services.workflow.progressLogFinish(operationId, False)
+                return "[Error: No extraction results]"
+
+            # Process parts (not chunks) with model-aware AI calls
+            if operationId:
+                self.services.workflow.progressLogUpdate(operationId, 0.3, f"Processing {len(extractionResult)} extracted content parts")
+            partResults = await self._processPartsWithMapping(extractionResult, prompt, aiObjects, options, operationId)
+
+            # Merge results using existing merging system
+            if operationId:
+                self.services.workflow.progressLogUpdate(operationId, 0.9, f"Merging {len(partResults)} part results")
+            mergedContent = self._mergePartResults(partResults, options)
+
+            # Save merged extraction content to debug
+            self.services.utils.writeDebugFile(mergedContent or '', "extraction_merged_text")
+
+            if operationId:
+                self.services.workflow.progressLogFinish(operationId, True)
+
+            return mergedContent
+        except Exception as e:
+            logger.error(f"Error in processDocumentsPerChunk: {str(e)}")
+            if operationId:
+                self.services.workflow.progressLogFinish(operationId, False)
+            raise
+
+    async def _processPartsWithMapping(
+        self,
+        extractionResult: List[ContentExtracted],
+        prompt: str,
+        aiObjects: Any,
+        options: Optional[AiCallOptions] = None,
+        operationId: Optional[str] = None
+    ) -> List[PartResult]:
+        """Process content parts with model-aware chunking and proper mapping."""
+
+        # Collect all parts that need processing
+        partsToProcess = []
+        partIndex = 0
+
+        for ec in extractionResult:
+            for part in ec.parts:
+                if part.typeGroup in ("text", "table", "structure", "image", "container", "binary"):
+                    # Skip empty container parts
+                    if part.typeGroup == "container" and (not part.data or len(part.data.strip()) == 0):
+                        logger.debug(f"Skipping empty container part: mimeType={part.mimeType}")
+                        continue
+
+                    partsToProcess.append({
+                        'part': part,
+                        'part_index': partIndex,
+                        'document_id': ec.id
+                    })
+                    partIndex += 1
+
+        logger.info(f"Processing {len(partsToProcess)} parts with model-aware chunking")
+
+        totalParts = len(partsToProcess)
+
+        # Process parts in parallel
+        processedCount = [0]  # Use list to allow modification in nested function
+
+        async def processSinglePart(partInfo: Dict) -> PartResult:
+            part = partInfo['part']
+            part_index = partInfo['part_index']
+            documentId = partInfo['document_id']
+
+            start_time = time.time()
+
+            try:
+                # Create AI call request with content part
+                request = AiCallRequest(
+                    prompt=prompt,
+                    context="",  # Context is in the content part
+                    options=options,
+                    contentParts=[part]  # Pass as list for unified processing
+                )
+
+                # Update progress before AI call
+                if operationId and totalParts > 0:
+                    processedCount[0] += 1
+                    progress = 0.3 + (processedCount[0] / totalParts * 0.6)  # Progress from 0.3 to 0.9
+                    self.services.workflow.progressLogUpdate(operationId, progress, f"Processing part {processedCount[0]}/{totalParts}")
+
+                # Call AI with model-aware chunking
+                response = await aiObjects.call(request)
+
+                processing_time = time.time() - start_time
+
+                return PartResult(
+                    originalPart=part,
+                    aiResult=response.content,
+                    partIndex=part_index,
+                    documentId=documentId,
+                    processingTime=processing_time,
+                    metadata={
+                        "success": True,
+                        "partSize": len(part.data) if part.data else 0,
+                        "resultSize": len(response.content),
+                        "typeGroup": part.typeGroup,
+                        "modelName": response.modelName,
+                        "priceUsd": response.priceUsd
+                    }
+                )
+
+            except Exception as e:
+                processing_time = time.time() - start_time
+                logger.warning(f"Error processing part {part_index}: {str(e)}")
+
+                return PartResult(
+                    originalPart=part,
+                    aiResult=f"[Error processing part: {str(e)}]",
+                    partIndex=part_index,
+                    documentId=documentId,
+                    processingTime=processing_time,
+                    metadata={
+                        "success": False,
+                        "error": str(e),
+                        "partSize": len(part.data) if part.data else 0,
+                        "typeGroup": part.typeGroup
+                    }
+                )
+
+        # Process parts with concurrency control
+        maxConcurrent = 5
+        if options and hasattr(options, 'maxConcurrentParts'):
+            maxConcurrent = options.maxConcurrentParts
+
+        semaphore = asyncio.Semaphore(maxConcurrent)
+
+        async def processWithSemaphore(partInfo):
+            async with semaphore:
+                return await processSinglePart(partInfo)
+
+        tasks = [processWithSemaphore(part_info) for part_info in partsToProcess]
+        partResults = await asyncio.gather(*tasks, return_exceptions=True)
+
+        # Handle exceptions
+        processedResults = []
+        for i, result in enumerate(partResults):
+            if isinstance(result, Exception):
+                part_info = partsToProcess[i]
+                processedResults.append(PartResult(
+                    originalPart=part_info['part'],
+                    aiResult=f"[Error in parallel processing: {str(result)}]",
+                    partIndex=part_info['part_index'],
+                    documentId=part_info['document_id'],
+                    processingTime=0.0,
+                    metadata={"success": False, "error": str(result)}
+                ))
+            elif result is not None:
+                processedResults.append(result)
+
+        logger.info(f"Completed processing {len(processedResults)} parts")
+        return processedResults
+
+    def _mergePartResults(
+        self,
+        partResults: List[PartResult],
+        options: Optional[AiCallOptions] = None
+    ) -> str:
+        """Merge part results using existing sophisticated merging system."""
+        if not partResults:
+            return ""
+
+        # Convert PartResults back to ContentParts for existing merger system
+        content_parts = []
+        for part_result in partResults:
+            # Create ContentPart from PartResult with proper typeGroup
+            content_part = ContentPart(
+                id=part_result.originalPart.id,
+                parentId=part_result.originalPart.parentId,
+                label=part_result.originalPart.label,
+                typeGroup=part_result.originalPart.typeGroup,  # Use original typeGroup
+                mimeType=part_result.originalPart.mimeType,
+                data=part_result.aiResult,  # Use AI result as data
+                metadata={
+                    **part_result.originalPart.metadata,
+                    "aiResult": True,
+                    "partIndex": part_result.partIndex,
+                    "documentId": part_result.documentId,
+                    "processingTime": part_result.processingTime,
+                    "success": part_result.metadata.get("success", False)
+                }
+            )
+            content_parts.append(content_part)
+
+        # Use existing merging strategy from options
+        merge_strategy = MergeStrategy(
+            useIntelligentMerging=True,
+            groupBy="documentId",  # Group by document
+            orderBy="partIndex",  # Order by part index
+            mergeType="concatenate"
+        )
+
+
+        # Apply existing merging logic using the sophisticated merging system
+        merged_parts = _applyMerging(content_parts, merge_strategy)
+
+        # Convert merged parts back to final string
+        final_content = "\n\n".join([part.data for part in merged_parts])
+
+        logger.info(f"Merged {len(partResults)} parts using existing sophisticated merging system")
+        return final_content.strip()
+
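Note: a minimal call-site sketch for the new entry point. The service handles (`services.extraction`, `services.aiObjects`) and the AiCallOptions constructor arguments are assumptions for illustration and are not confirmed by this diff; only the import path and the operationType field appear in the patch above.

from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum

async def runPerChunkExtraction(services, chatDocuments) -> str:
    # `services.extraction` is assumed to be the ExtractionService instance and
    # `services.aiObjects` the AiObjects instance that performs the model calls.
    options = AiCallOptions(operationType=OperationTypeEnum.DATA_EXTRACT)
    return await services.extraction.processDocumentsPerChunk(
        documents=chatDocuments,  # List[ChatDocument]
        prompt="Extract the key facts from each document as plain text.",
        aiObjects=services.aiObjects,
        options=options,
    )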
{cron_kwargs}") except Exception as e: @@ -68,16 +68,16 @@ class UtilsService: misfire_grace_time: Grace time for misfired jobs in seconds """ try: - eventManager.register_interval( - job_id=job_id, + eventManager.registerInterval( + jobId=job_id, func=func, seconds=seconds, minutes=minutes, hours=hours, - replace_existing=replace_existing, + replaceExisting=replace_existing, coalesce=coalesce, - max_instances=max_instances, - misfire_grace_time=misfire_grace_time + maxInstances=max_instances, + misfireGraceTime=misfire_grace_time ) logger.info(f"Registered interval job '{job_id}' (h={hours}, m={minutes}, s={seconds})") except Exception as e: diff --git a/modules/shared/eventManagement.py b/modules/shared/eventManagement.py index 0804c156..cf2a5b82 100644 --- a/modules/shared/eventManagement.py +++ b/modules/shared/eventManagement.py @@ -43,47 +43,47 @@ class EventManagement: except Exception as exc: logger.error(f"Error stopping scheduler: {exc}") - def register_cron( + def registerCron( self, - job_id: str, + jobId: str, func: Callable, *, - cron_kwargs: Optional[Dict[str, Any]] = None, - replace_existing: bool = True, + cronKwargs: Optional[Dict[str, Any]] = None, + replaceExisting: bool = True, coalesce: bool = True, - max_instances: int = 1, - misfire_grace_time: int = 1800, + maxInstances: int = 1, + misfireGraceTime: int = 1800, **kwargs: Any, ) -> None: """ Register a job using CronTrigger. Provide cron fields as keyword args, e.g.: - cron_kwargs={"minute": "0,20,40"} + cronKwargs={"minute": "0,20,40"} """ - trigger = CronTrigger(timezone=self._timezone, **(cron_kwargs or {})) + trigger = CronTrigger(timezone=self._timezone, **(cronKwargs or {})) self.scheduler.add_job( func, trigger, - id=job_id, - replace_existing=replace_existing, + id=jobId, + replace_existing=replaceExisting, coalesce=coalesce, - max_instances=max_instances, - misfire_grace_time=misfire_grace_time, + max_instances=maxInstances, + misfire_grace_time=misfireGraceTime, **kwargs, ) - logger.info(f"Registered cron job '{job_id}' with args {cron_kwargs}") + logger.info(f"Registered cron job '{jobId}' with args {cronKwargs}") - def register_interval( + def registerInterval( self, - job_id: str, + jobId: str, func: Callable, *, seconds: Optional[int] = None, minutes: Optional[int] = None, hours: Optional[int] = None, - replace_existing: bool = True, + replaceExisting: bool = True, coalesce: bool = True, - max_instances: int = 1, - misfire_grace_time: int = 1800, + maxInstances: int = 1, + misfireGraceTime: int = 1800, **kwargs: Any, ) -> None: """ @@ -95,23 +95,23 @@ class EventManagement: self.scheduler.add_job( func, trigger, - id=job_id, - replace_existing=replace_existing, + id=jobId, + replace_existing=replaceExisting, coalesce=coalesce, - max_instances=max_instances, - misfire_grace_time=misfire_grace_time, + max_instances=maxInstances, + misfire_grace_time=misfireGraceTime, **kwargs, ) logger.info( - f"Registered interval job '{job_id}' (h={hours}, m={minutes}, s={seconds})" + f"Registered interval job '{jobId}' (h={hours}, m={minutes}, s={seconds})" ) - def remove(self, job_id: str) -> None: + def remove(self, jobId: str) -> None: try: - self.scheduler.remove_job(job_id) - logger.info(f"Removed job '{job_id}'") + self.scheduler.remove_job(jobId) + logger.info(f"Removed job '{jobId}'") except Exception as exc: - logger.warning(f"Could not remove job '{job_id}': {exc}") + logger.warning(f"Could not remove job '{jobId}': {exc}") # Singleton instance for easy import and reuse