297 lines
No EOL
12 KiB
Python
297 lines
No EOL
12 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Utility service for common operations across the gateway.
|
|
Provides centralized access to configuration, events, and other utilities.
|
|
"""
|
|
|
|
import logging
|
|
from typing import Any, Optional, Dict, Callable, List
|
|
from modules.shared.configuration import APP_CONFIG
|
|
from modules.shared.eventManagement import eventManager
|
|
from modules.shared.timeUtils import getUtcTimestamp
|
|
from modules.shared import jsonUtils
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class UtilsService:
    """Utility service providing common operations.

    Facade over the shared event manager, application configuration,
    timestamp helper, debug logger and JSON utilities, plus prompt
    sanitization and enum mapping helpers.
    """

    # Code points removed by sanitizePromptContent(): NUL and the C0 control
    # characters except tab (0x09), LF (0x0A) and CR (0x0D), plus DEL (0x7F).
    _CONTROL_CHAR_TABLE = dict.fromkeys(
        [*range(0x00, 0x09), 0x0B, 0x0C, *range(0x0E, 0x20), 0x7F]
    )

    # Per-character escapes. Each table is applied in a single pass with
    # str.translate(), so backslashes produced by an escape are never
    # escaped a second time.
    _JSON_ESCAPES = {
        "\\": "\\\\",
        '"': '\\"',
        "\n": "\\n",
        "\r": "\\r",
        "\t": "\\t",
    }
    # "text" and "document" content share the JSON escapes plus single quotes.
    _TEXT_ESCAPES = {**_JSON_ESCAPES, "'": "\\'"}
    # User input: the backslash must be escaped as well, otherwise an input
    # such as \' would become \\' and terminate the single-quoted wrapper.
    _USERINPUT_ESCAPES = {
        "\\": "\\\\",
        "{": "{{",
        "}": "}}",
        '"': '\\"',
        "'": "\\'",
    }

    _JSON_ESCAPE_TABLE = str.maketrans(_JSON_ESCAPES)
    _TEXT_ESCAPE_TABLE = str.maketrans(_TEXT_ESCAPES)
    _USERINPUT_ESCAPE_TABLE = str.maketrans(_USERINPUT_ESCAPES)

    def __init__(self, services):
        """Store the service container on ``self.services``."""
        self.services = services

    # ===== Event handling =====

    def eventRegisterCron(self, job_id: str, func: Callable, cron_kwargs: Dict[str, Any],
                          replace_existing: bool = True, coalesce: bool = True,
                          max_instances: int = 1, misfire_grace_time: int = 1800):
        """
        Register a cron job with the event manager.

        Any exception raised by the event manager is logged and not re-raised.

        Args:
            job_id: Unique identifier for the job
            func: Function to execute
            cron_kwargs: Cron schedule parameters
            replace_existing: Whether to replace existing job with same ID
            coalesce: Whether to coalesce multiple pending executions
            max_instances: Maximum number of concurrent instances
            misfire_grace_time: Grace time for misfired jobs in seconds
        """
        try:
            eventManager.registerCron(
                jobId=job_id,
                func=func,
                cronKwargs=cron_kwargs,
                replaceExisting=replace_existing,
                coalesce=coalesce,
                maxInstances=max_instances,
                misfireGraceTime=misfire_grace_time
            )
            logger.info(f"Registered cron job '{job_id}' with schedule: {cron_kwargs}")
        except Exception as e:
            logger.error(f"Error registering cron job '{job_id}': {str(e)}")

    def eventRegisterInterval(self, job_id: str, func: Callable, seconds: Optional[int] = None,
                              minutes: Optional[int] = None, hours: Optional[int] = None,
                              replace_existing: bool = True, coalesce: bool = True,
                              max_instances: int = 1, misfire_grace_time: int = 1800):
        """
        Register an interval job with the event manager.

        Any exception raised by the event manager is logged and not re-raised.

        Args:
            job_id: Unique identifier for the job
            func: Function to execute
            seconds: Interval in seconds
            minutes: Interval in minutes
            hours: Interval in hours
            replace_existing: Whether to replace existing job with same ID
            coalesce: Whether to coalesce multiple pending executions
            max_instances: Maximum number of concurrent instances
            misfire_grace_time: Grace time for misfired jobs in seconds
        """
        try:
            eventManager.registerInterval(
                jobId=job_id,
                func=func,
                seconds=seconds,
                minutes=minutes,
                hours=hours,
                replaceExisting=replace_existing,
                coalesce=coalesce,
                maxInstances=max_instances,
                misfireGraceTime=misfire_grace_time
            )
            logger.info(f"Registered interval job '{job_id}' (h={hours}, m={minutes}, s={seconds})")
        except Exception as e:
            logger.error(f"Error registering interval job '{job_id}': {str(e)}")

    def eventRemove(self, job_id: str):
        """
        Remove a scheduled job from the event manager.

        Any exception raised by the event manager is logged and not re-raised.

        Args:
            job_id: ID of the job to remove
        """
        try:
            eventManager.remove(job_id)
            logger.info(f"Removed job '{job_id}'")
        except Exception as e:
            logger.error(f"Error removing job '{job_id}': {str(e)}")

    def configGet(self, key: str, default: Any = None, user_id: str = "system") -> Any:
        """
        Get a configuration value with optional default.

        Args:
            key: Configuration key to retrieve
            default: Default value if key not found
            user_id: User ID for audit logging (default: "system")

        Returns:
            Configuration value or default; ``default`` is also returned when
            the lookup raises (the error is logged).
        """
        try:
            return APP_CONFIG.get(key, default, user_id)
        except Exception as e:
            logger.error(f"Error getting config '{key}': {str(e)}")
            return default

    def timestampGetUtc(self) -> float:
        """
        Get current UTC timestamp.

        Returns:
            float: Current UTC timestamp in seconds, or 0.0 when the
            underlying helper raises (the error is logged).
        """
        try:
            return getUtcTimestamp()
        except Exception as e:
            logger.error(f"Error getting UTC timestamp: {str(e)}")
            return 0.0

    # ===== Debug Tools =====

    def writeDebugFile(self, content: str, fileType: str, documents: Optional[List] = None) -> None:
        """
        Wrapper to write debug files via shared debugLogger.

        Mirrors writeDebugFile() in modules.shared.debugLogger and keeps a single call site.
        Best-effort: failures are logged at debug level and never raised.
        """
        try:
            from modules.shared.debugLogger import writeDebugFile as _writeDebugFile
            _writeDebugFile(content, fileType, documents)
        except Exception:
            # Never break the main flow, but leave a trace for diagnosis
            logger.debug("writeDebugFile failed", exc_info=True)

    def debugLogToFile(self, message: str, context: str = "DEBUG"):
        """
        Wrapper to log debug messages via shared debugLogger.

        Mirrors debugLogToFile() in modules.shared.debugLogger.
        Best-effort: failures are logged at debug level and never raised.
        """
        try:
            from modules.shared.debugLogger import debugLogToFile as _debugLogToFile
            _debugLogToFile(message, context)
        except Exception:
            # Never break the main flow, but leave a trace for diagnosis
            logger.debug("debugLogToFile failed", exc_info=True)

    def storeDebugMessageAndDocuments(self, message, currentUser):
        """
        Wrapper to store debug messages and documents via interfaceDbChat.

        Mirrors storeDebugMessageAndDocuments() in modules.interfaces.interfaceDbChat.
        Best-effort: failures are logged at debug level and never raised.
        """
        try:
            from modules.interfaces.interfaceDbChat import storeDebugMessageAndDocuments as _storeDebugMessageAndDocuments
            _storeDebugMessageAndDocuments(message, currentUser)
        except Exception:
            # Never break the main flow, but leave a trace for diagnosis
            logger.debug("storeDebugMessageAndDocuments failed", exc_info=True)

    def writeDebugArtifact(self, fileName: str, obj: Any):
        """
        Backward-compatible wrapper that now writes via writeDebugFile.

        Accepts an object (dict/list/str), serializes if needed, and writes to debug folder.
        Values inside a dict/list that are not JSON-native are written using
        their str() form so the artifact is still produced.
        Best-effort: failures are logged at debug level and never raised.
        """
        try:
            # Serialize objects to JSON when applicable
            if isinstance(obj, (dict, list)):
                import json
                # default=str: a debug dump should not be lost because of one
                # datetime or custom object nested in the payload
                content = json.dumps(obj, ensure_ascii=False, indent=2, default=str)
            else:
                content = str(obj)

            # Delegate to shared writeDebugFile; preserve provided file extension in fileName
            from modules.shared.debugLogger import writeDebugFile as _writeDebugFile
            _writeDebugFile(content, fileName)
        except Exception:
            # Never break the main flow, but leave a trace for diagnosis
            logger.debug("writeDebugArtifact failed", exc_info=True)

    # ===== Prompt sanitization =====

    def sanitizePromptContent(self, content: str, contentType: str = "text") -> str:
        """
        Centralized prompt content sanitization to prevent injection attacks and ensure safe presentation.

        Control characters other than tab, newline and carriage return are
        removed first; the remaining text is escaped according to contentType:

        - "userinput": backslashes and quotes are backslash-escaped, curly
          braces are doubled, and the result is wrapped in single quotes.
          Newlines and tabs are left as-is.
        - "json": backslash, double quote, newline, carriage return and tab
          are backslash-escaped.
        - "document", "text" and any other value: as "json", plus single
          quotes are backslash-escaped.

        Args:
            content: The content to sanitize (converted with str() if needed)
            contentType: Type of content ("text", "userinput", "json", "document")

        Returns:
            Sanitized content; "" for falsy content; a fixed error marker
            string if sanitization itself raises.
        """
        if not content:
            return ""

        try:
            # Convert to string if not already, then drop control characters
            cleaned = str(content).translate(self._CONTROL_CHAR_TABLE)

            if contentType == "userinput":
                # Extra security for user-controlled content: escape, then quote
                return "'" + cleaned.translate(self._USERINPUT_ESCAPE_TABLE) + "'"

            if contentType == "json":
                return cleaned.translate(self._JSON_ESCAPE_TABLE)

            # "document", "text" and unknown types share the same escaping
            return cleaned.translate(self._TEXT_ESCAPE_TABLE)

        except Exception as e:
            logger.error(f"Error sanitizing prompt content: {str(e)}")
            # Return a safe fallback
            return "[ERROR: Content could not be safely sanitized]"

    # ===== JSON utility wrappers =====

    def jsonStripCodeFences(self, text: str) -> str:
        """Delegate to jsonUtils.stripCodeFences() and return its result."""
        return jsonUtils.stripCodeFences(text)

    def jsonExtractFirstBalanced(self, text: str) -> str:
        """Delegate to jsonUtils.extractFirstBalancedJson() and return its result."""
        return jsonUtils.extractFirstBalancedJson(text)

    def jsonNormalizeText(self, text: str) -> str:
        """Delegate to jsonUtils.normalizeJsonText() and return its result."""
        return jsonUtils.normalizeJsonText(text)

    def jsonExtractString(self, text: str) -> str:
        """Delegate to jsonUtils.extractJsonString() and return its result."""
        return jsonUtils.extractJsonString(text)

    def jsonTryParse(self, text) -> tuple:
        """Delegate to jsonUtils.tryParseJson() and return its result."""
        return jsonUtils.tryParseJson(text)

    # ===== Enum utility functions =====

    def mapToEnum(self, enum_class, value_str, default_value):
        """
        Generic function to map a value to an enum member, using the enum value as key.

        Matching is case-insensitive on the string form of each member's
        value, so enums with non-string values (e.g. integers) are supported.
        A value that already is a member of enum_class is returned unchanged.

        Args:
            enum_class: The enum class to map to
            value_str: Value to map (normally a string)
            default_value: Default enum value if no match found

        Returns:
            Matching enum value or default value
        """
        if not value_str:
            return default_value

        # Already a member of the target enum: nothing to map
        if isinstance(enum_class, type) and isinstance(value_str, enum_class):
            return value_str

        # Strings are lowered directly so str-based inputs behave as before;
        # anything else is compared through its str() form
        if isinstance(value_str, str):
            wanted = value_str.lower()
        else:
            wanted = str(value_str).lower()

        # Try to find enum by its value (case-insensitive)
        for enum_item in enum_class:
            if str(enum_item.value).lower() == wanted:
                return enum_item

        # Fallback to default
        return default_value