185 lines
7.6 KiB
Python
185 lines
7.6 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Utility service for common operations across the gateway.
|
|
Provides centralized access to configuration, events, and other utilities.
|
|
Core service - not requested by features directly.
|
|
"""
|
|
|
|
import logging
|
|
from typing import Any, Optional, Dict, Callable, List
|
|
from modules.shared.configuration import APP_CONFIG
|
|
from modules.shared.eventManagement import eventManager
|
|
from modules.shared.timeUtils import getUtcTimestamp
|
|
from modules.shared import jsonUtils
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class UtilsService:
|
|
"""Utility service providing common operations."""
|
|
|
|
def __init__(self, context, get_service: Callable[[str], Any]):
|
|
"""Initialize with service center context and resolver."""
|
|
self._context = context
|
|
self._get_service = get_service
|
|
|
|
# ===== Event handling =====
|
|
|
|
def eventRegisterCron(self, job_id: str, func: Callable, cron_kwargs: Dict[str, Any],
|
|
replace_existing: bool = True, coalesce: bool = True,
|
|
max_instances: int = 1, misfire_grace_time: int = 1800):
|
|
"""Register a cron job with the event manager."""
|
|
try:
|
|
eventManager.registerCron(
|
|
jobId=job_id,
|
|
func=func,
|
|
cronKwargs=cron_kwargs,
|
|
replaceExisting=replace_existing,
|
|
coalesce=coalesce,
|
|
maxInstances=max_instances,
|
|
misfireGraceTime=misfire_grace_time
|
|
)
|
|
logger.info(f"Registered cron job '{job_id}' with schedule: {cron_kwargs}")
|
|
except Exception as e:
|
|
logger.error(f"Error registering cron job '{job_id}': {str(e)}")
|
|
|
|
def eventRegisterInterval(self, job_id: str, func: Callable, seconds: Optional[int] = None,
|
|
minutes: Optional[int] = None, hours: Optional[int] = None,
|
|
replace_existing: bool = True, coalesce: bool = True,
|
|
max_instances: int = 1, misfire_grace_time: int = 1800):
|
|
"""Register an interval job with the event manager."""
|
|
try:
|
|
eventManager.registerInterval(
|
|
jobId=job_id,
|
|
func=func,
|
|
seconds=seconds,
|
|
minutes=minutes,
|
|
hours=hours,
|
|
replaceExisting=replace_existing,
|
|
coalesce=coalesce,
|
|
maxInstances=max_instances,
|
|
misfireGraceTime=misfire_grace_time
|
|
)
|
|
logger.info(f"Registered interval job '{job_id}' (h={hours}, m={minutes}, s={seconds})")
|
|
except Exception as e:
|
|
logger.error(f"Error registering interval job '{job_id}': {str(e)}")
|
|
|
|
def eventRemove(self, job_id: str):
|
|
"""Remove a scheduled job from the event manager."""
|
|
try:
|
|
eventManager.remove(job_id)
|
|
logger.info(f"Removed job '{job_id}'")
|
|
except Exception as e:
|
|
logger.error(f"Error removing job '{job_id}': {str(e)}")
|
|
|
|
def configGet(self, key: str, default: Any = None, user_id: str = "system") -> Any:
|
|
"""Get a configuration value with optional default."""
|
|
try:
|
|
return APP_CONFIG.get(key, default, user_id)
|
|
except Exception as e:
|
|
logger.error(f"Error getting config '{key}': {str(e)}")
|
|
return default
|
|
|
|
def timestampGetUtc(self) -> float:
|
|
"""Get current UTC timestamp."""
|
|
try:
|
|
return getUtcTimestamp()
|
|
except Exception as e:
|
|
logger.error(f"Error getting UTC timestamp: {str(e)}")
|
|
return 0.0
|
|
|
|
# ===== Debug Tools =====
|
|
|
|
def writeDebugFile(self, content: str, fileType: str, documents: Optional[List] = None) -> None:
|
|
"""Wrapper to write debug files via shared debugLogger."""
|
|
try:
|
|
from modules.shared.debugLogger import writeDebugFile as _writeDebugFile
|
|
_writeDebugFile(content, fileType, documents)
|
|
except Exception:
|
|
pass
|
|
|
|
def debugLogToFile(self, message: str, context: str = "DEBUG"):
|
|
"""Wrapper to log debug messages via shared debugLogger."""
|
|
try:
|
|
from modules.shared.debugLogger import debugLogToFile as _debugLogToFile
|
|
_debugLogToFile(message, context)
|
|
except Exception:
|
|
pass
|
|
|
|
def storeDebugMessageAndDocuments(self, message, currentUser, mandateId=None, featureInstanceId=None):
|
|
"""Wrapper to store debug messages and documents via interfaceDbChat."""
|
|
try:
|
|
from modules.interfaces.interfaceDbChat import storeDebugMessageAndDocuments as _storeDebugMessageAndDocuments
|
|
_storeDebugMessageAndDocuments(message, currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId)
|
|
except Exception:
|
|
pass
|
|
|
|
def writeDebugArtifact(self, fileName: str, obj: Any):
|
|
"""Backward-compatible wrapper that now writes via writeDebugFile."""
|
|
try:
|
|
import json
|
|
if isinstance(obj, (dict, list)):
|
|
content = json.dumps(obj, ensure_ascii=False, indent=2)
|
|
else:
|
|
content = str(obj)
|
|
from modules.shared.debugLogger import writeDebugFile as _writeDebugFile
|
|
_writeDebugFile(content, fileName)
|
|
except Exception:
|
|
pass
|
|
|
|
# ===== Prompt sanitization =====
|
|
|
|
def sanitizePromptContent(self, content: str, contentType: str = "text") -> str:
|
|
"""Centralized prompt content sanitization."""
|
|
if not content:
|
|
return ""
|
|
try:
|
|
import re
|
|
content_str = str(content)
|
|
sanitized = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', content_str)
|
|
if contentType == "userinput":
|
|
sanitized = sanitized.replace('{', '{{').replace('}', '}}')
|
|
sanitized = sanitized.replace('"', '\\"').replace("'", "\\'")
|
|
return f"'{sanitized}'"
|
|
elif contentType == "json":
|
|
sanitized = sanitized.replace('\\', '\\\\').replace('"', '\\"')
|
|
sanitized = sanitized.replace('\n', '\\n').replace('\r', '\\r').replace('\t', '\\t')
|
|
elif contentType == "document":
|
|
sanitized = sanitized.replace('\\', '\\\\').replace('"', '\\"').replace("'", "\\'")
|
|
sanitized = sanitized.replace('\n', '\\n').replace('\r', '\\r').replace('\t', '\\t')
|
|
else:
|
|
sanitized = sanitized.replace('\\', '\\\\').replace('"', '\\"').replace("'", "\\'")
|
|
sanitized = sanitized.replace('\n', '\\n').replace('\r', '\\r').replace('\t', '\\t')
|
|
return sanitized
|
|
except Exception as e:
|
|
logger.error(f"Error sanitizing prompt content: {str(e)}")
|
|
return "[ERROR: Content could not be safely sanitized]"
|
|
|
|
# ===== JSON utility wrappers =====
|
|
|
|
def jsonStripCodeFences(self, text: str) -> str:
|
|
return jsonUtils.stripCodeFences(text)
|
|
|
|
def jsonExtractFirstBalanced(self, text: str) -> str:
|
|
return jsonUtils.extractFirstBalancedJson(text)
|
|
|
|
def jsonNormalizeText(self, text: str) -> str:
|
|
return jsonUtils.normalizeJsonText(text)
|
|
|
|
def jsonExtractString(self, text: str) -> str:
|
|
return jsonUtils.extractJsonString(text)
|
|
|
|
def jsonTryParse(self, text) -> tuple:
|
|
return jsonUtils.tryParseJson(text)
|
|
|
|
# ===== Enum utility functions =====
|
|
|
|
def mapToEnum(self, enum_class, value_str, default_value):
|
|
"""Map string value to enum."""
|
|
if not value_str:
|
|
return default_value
|
|
for enum_item in enum_class:
|
|
if enum_item.value.lower() == value_str.lower():
|
|
return enum_item
|
|
return default_value
|