# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Utility service for common operations across the gateway.
Provides centralized access to configuration, events, and other utilities.
"""

import json
import logging
import re
from typing import Any, Optional, Dict, Callable, List

from modules.shared.configuration import APP_CONFIG
from modules.shared.eventManagement import eventManager
from modules.shared.timeUtils import getUtcTimestamp
from modules.shared import jsonUtils

logger = logging.getLogger(__name__)

# Null bytes and control characters removed from prompt content.
# Tab (\x09), newline (\x0A) and carriage return (\x0D) are deliberately kept.
_CONTROL_CHARS_RE = re.compile(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]')

# Single-pass escape tables. Each source character is mapped independently,
# which is equivalent to "escape backslashes first, then everything else".
_JSON_ESCAPES = str.maketrans({
    '\\': '\\\\',
    '"': '\\"',
    '\n': '\\n',
    '\r': '\\r',
    '\t': '\\t',
})
_TEXT_ESCAPES = str.maketrans({
    '\\': '\\\\',
    '"': '\\"',
    "'": "\\'",
    '\n': '\\n',
    '\r': '\\r',
    '\t': '\\t',
})
_USER_INPUT_ESCAPES = str.maketrans({
    '{': '{{',
    '}': '}}',
    # Backslashes must be escaped as well; otherwise an input such as \' would
    # turn into \\' and the quote would terminate the wrapping quotes early.
    '\\': '\\\\',
    '"': '\\"',
    "'": "\\'",
})


class UtilsService:
    """Utility service providing common operations."""

    def __init__(self, services):
        self.services = services

    # ===== Event handling =====

    def eventRegisterCron(self, job_id: str, func: Callable, cron_kwargs: Dict[str, Any],
                          replace_existing: bool = True, coalesce: bool = True,
                          max_instances: int = 1, misfire_grace_time: int = 1800):
        """
        Register a cron job with the event manager.

        Failures are logged and swallowed; nothing is raised to the caller.

        Args:
            job_id: Unique identifier for the job
            func: Function to execute
            cron_kwargs: Cron schedule parameters
            replace_existing: Whether to replace existing job with same ID
            coalesce: Whether to coalesce multiple pending executions
            max_instances: Maximum number of concurrent instances
            misfire_grace_time: Grace time for misfired jobs in seconds
        """
        try:
            eventManager.registerCron(
                jobId=job_id,
                func=func,
                cronKwargs=cron_kwargs,
                replaceExisting=replace_existing,
                coalesce=coalesce,
                maxInstances=max_instances,
                misfireGraceTime=misfire_grace_time
            )
            logger.info(f"Registered cron job '{job_id}' with schedule: {cron_kwargs}")
        except Exception as e:
            logger.error(f"Error registering cron job '{job_id}': {str(e)}")

    def eventRegisterInterval(self, job_id: str, func: Callable,
                              seconds: Optional[int] = None,
                              minutes: Optional[int] = None,
                              hours: Optional[int] = None,
                              replace_existing: bool = True, coalesce: bool = True,
                              max_instances: int = 1, misfire_grace_time: int = 1800):
        """
        Register an interval job with the event manager.

        Failures are logged and swallowed; nothing is raised to the caller.

        Args:
            job_id: Unique identifier for the job
            func: Function to execute
            seconds: Interval in seconds
            minutes: Interval in minutes
            hours: Interval in hours
            replace_existing: Whether to replace existing job with same ID
            coalesce: Whether to coalesce multiple pending executions
            max_instances: Maximum number of concurrent instances
            misfire_grace_time: Grace time for misfired jobs in seconds
        """
        try:
            eventManager.registerInterval(
                jobId=job_id,
                func=func,
                seconds=seconds,
                minutes=minutes,
                hours=hours,
                replaceExisting=replace_existing,
                coalesce=coalesce,
                maxInstances=max_instances,
                misfireGraceTime=misfire_grace_time
            )
            logger.info(f"Registered interval job '{job_id}' (h={hours}, m={minutes}, s={seconds})")
        except Exception as e:
            logger.error(f"Error registering interval job '{job_id}': {str(e)}")

    def eventRemove(self, job_id: str):
        """
        Remove a scheduled job from the event manager.

        Failures are logged and swallowed; nothing is raised to the caller.

        Args:
            job_id: ID of the job to remove
        """
        try:
            eventManager.remove(job_id)
            logger.info(f"Removed job '{job_id}'")
        except Exception as e:
            logger.error(f"Error removing job '{job_id}': {str(e)}")

    def configGet(self, key: str, default: Any = None, user_id: str = "system") -> Any:
        """
        Get a configuration value with optional default.

        Args:
            key: Configuration key to retrieve
            default: Default value if key not found
            user_id: User ID forwarded to APP_CONFIG.get (default: "system")

        Returns:
            Configuration value, or ``default`` if the lookup raises
        """
        try:
            return APP_CONFIG.get(key, default, user_id)
        except Exception as e:
            logger.error(f"Error getting config '{key}': {str(e)}")
            return default

    def timestampGetUtc(self) -> float:
        """
        Get current UTC timestamp.

        Returns:
            float: Value of getUtcTimestamp(), or 0.0 if that call raises
        """
        try:
            return getUtcTimestamp()
        except Exception as e:
            logger.error(f"Error getting UTC timestamp: {str(e)}")
            return 0.0

    # ===== Debug Tools =====

    def writeDebugFile(self, content: str, fileType: str, documents: Optional[List] = None) -> None:
        """
        Wrapper to write debug files via shared debugLogger.
        Mirrors writeDebugFile() in modules.shared.debugLogger and keeps a single call site.

        Best-effort: any failure is logged at DEBUG level and never propagated.
        """
        try:
            # Imported lazily, as in the original implementation
            from modules.shared.debugLogger import writeDebugFile as _writeDebugFile
            _writeDebugFile(content, fileType, documents)
        except Exception:
            # Never break main flow, but leave a trace for troubleshooting
            logger.debug("writeDebugFile failed", exc_info=True)

    def debugLogToFile(self, message: str, context: str = "DEBUG"):
        """
        Wrapper to log debug messages via shared debugLogger.
        Mirrors debugLogToFile() in modules.shared.debugLogger.

        Best-effort: any failure is logged at DEBUG level and never propagated.
        """
        try:
            from modules.shared.debugLogger import debugLogToFile as _debugLogToFile
            _debugLogToFile(message, context)
        except Exception:
            # Never break main flow, but leave a trace for troubleshooting
            logger.debug("debugLogToFile failed", exc_info=True)

    def storeDebugMessageAndDocuments(self, message, currentUser):
        """
        Wrapper to store debug messages and documents via interfaceDbChatObjects.
        Mirrors storeDebugMessageAndDocuments() in modules.interfaces.interfaceDbChatObjects.

        Best-effort: any failure is logged at DEBUG level and never propagated.
        """
        try:
            from modules.interfaces.interfaceDbChatObjects import (
                storeDebugMessageAndDocuments as _storeDebugMessageAndDocuments,
            )
            _storeDebugMessageAndDocuments(message, currentUser)
        except Exception:
            # Never break main flow, but leave a trace for troubleshooting
            logger.debug("storeDebugMessageAndDocuments failed", exc_info=True)

    def writeDebugArtifact(self, fileName: str, obj: Any):
        """
        Backward-compatible wrapper that now writes via writeDebugFile.

        Accepts an object (dict/list/str). Dicts and lists are serialized to
        indented JSON; if they contain values JSON cannot encode, the plain
        ``str()`` form is written instead so the artifact is not lost.
        Everything else is written as ``str(obj)``.

        Best-effort: any failure is logged at DEBUG level and never propagated.
        """
        try:
            if isinstance(obj, (dict, list)):
                try:
                    content = json.dumps(obj, ensure_ascii=False, indent=2)
                except (TypeError, ValueError):
                    # Non-serializable values or circular references
                    content = str(obj)
            else:
                content = str(obj)

            # Delegate to shared writeDebugFile; fileName is passed through unchanged
            from modules.shared.debugLogger import writeDebugFile as _writeDebugFile
            _writeDebugFile(content, fileName)
        except Exception:
            # Never break main flow, but leave a trace for troubleshooting
            logger.debug("writeDebugArtifact failed", exc_info=True)

    # ===== Prompt sanitization =====

    def sanitizePromptContent(self, content: str, contentType: str = "text") -> str:
        """
        Centralized prompt content sanitization to prevent injection attacks
        and ensure safe presentation.

        Control characters (except tab, newline, carriage return) are removed
        first, then escaping depends on ``contentType``:

        - "userinput": curly braces are doubled, backslashes and both quote
          kinds are backslash-escaped, and the result is wrapped in single
          quotes. Newlines are left as-is.
        - "json": backslash, double quote, newline, carriage return and tab
          are backslash-escaped.
        - "document", "text" and any other value: same as "json" plus single
          quotes are escaped.

        Args:
            content: The content to sanitize (converted with str() if needed)
            contentType: Type of content ("text", "userinput", "json", "document")

        Returns:
            Sanitized content; "" for falsy input; a fixed error marker string
            if sanitization itself raises
        """
        if not content:
            return ""

        try:
            cleaned = _CONTROL_CHARS_RE.sub('', str(content))

            if contentType == "userinput":
                return f"'{cleaned.translate(_USER_INPUT_ESCAPES)}'"
            if contentType == "json":
                return cleaned.translate(_JSON_ESCAPES)
            # "document", "text" and unknown types share the same escaping
            return cleaned.translate(_TEXT_ESCAPES)
        except Exception as e:
            logger.error(f"Error sanitizing prompt content: {str(e)}")
            # Return a safe fallback
            return "[ERROR: Content could not be safely sanitized]"

    # ===== JSON utility wrappers =====

    def jsonStripCodeFences(self, text: str) -> str:
        """Delegate to jsonUtils.stripCodeFences."""
        return jsonUtils.stripCodeFences(text)

    def jsonExtractFirstBalanced(self, text: str) -> str:
        """Delegate to jsonUtils.extractFirstBalancedJson."""
        return jsonUtils.extractFirstBalancedJson(text)

    def jsonNormalizeText(self, text: str) -> str:
        """Delegate to jsonUtils.normalizeJsonText."""
        return jsonUtils.normalizeJsonText(text)

    def jsonExtractString(self, text: str) -> str:
        """Delegate to jsonUtils.extractJsonString."""
        return jsonUtils.extractJsonString(text)

    def jsonTryParse(self, text) -> tuple:
        """Delegate to jsonUtils.tryParseJson."""
        return jsonUtils.tryParseJson(text)

    # ===== Enum utility functions =====

    def mapToEnum(self, enum_class, value_str, default_value):
        """
        Generic function to map string value to enum, using the enum value as key.

        Matching is case-insensitive. Non-string enum values (and non-string
        inputs) are compared by their ``str()`` form instead of raising.

        Args:
            enum_class: The enum class to map to
            value_str: String value to map
            default_value: Default enum value if no match found

        Returns:
            Matching enum value or default value
        """
        if not value_str:
            return default_value

        wanted = value_str.lower() if isinstance(value_str, str) else str(value_str).lower()

        for member in enum_class:
            raw = member.value
            candidate = raw.lower() if isinstance(raw, str) else str(raw).lower()
            if candidate == wanted:
                return member

        # Fallback to default
        return default_value