85 lines
3 KiB
Python
85 lines
3 KiB
Python
"""
|
|
Interface for Messaging Services
|
|
Provides a unified interface for sending messages across different channels (Email, SMS, etc.)
|
|
"""
|
|
|
|
import logging
|
|
from typing import Optional
|
|
from modules.connectors.connectorMessagingEmail import ConnectorMessagingEmail
|
|
from modules.connectors.connectorMessagingSms import ConnectorMessagingSms
|
|
from modules.datamodels.datamodelMessaging import MessagingChannel
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Singleton factory for Messaging instances
|
|
_instancesMessaging = {}
|
|
|
|
|
|
class MessagingInterface:
|
|
"""
|
|
Interface for Messaging Services.
|
|
Provides a unified interface for sending messages across different channels.
|
|
"""
|
|
|
|
def __init__(self):
|
|
"""Initialize the Messaging Interface."""
|
|
self._emailConnector: Optional[ConnectorMessagingEmail] = None
|
|
self._smsConnector: Optional[ConnectorMessagingSms] = None
|
|
|
|
def send(self, channel: MessagingChannel, recipient: str, subject: str, message: str) -> bool:
|
|
"""
|
|
Send a message via the specified channel.
|
|
|
|
Args:
|
|
channel: MessagingChannel Enum (EMAIL, SMS, etc.)
|
|
recipient: Recipient address (email address, phone number, etc.)
|
|
subject: Message subject (for email, ignored for SMS)
|
|
message: Message content
|
|
|
|
Returns:
|
|
bool: True if successful, False otherwise
|
|
"""
|
|
try:
|
|
if channel == MessagingChannel.EMAIL:
|
|
return self._sendEmail(recipient, subject, message)
|
|
elif channel == MessagingChannel.SMS:
|
|
return self._sendSms(recipient, message)
|
|
else:
|
|
logger.error(f"Unknown channel: {channel}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error sending message via {channel}: {e}")
|
|
return False
|
|
|
|
def _sendEmail(self, recipient: str, subject: str, message: str) -> bool:
|
|
"""Send email via Email connector."""
|
|
if self._emailConnector is None:
|
|
try:
|
|
self._emailConnector = ConnectorMessagingEmail()
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize Email connector: {e}")
|
|
return False
|
|
|
|
return self._emailConnector.send(recipient, subject, message)
|
|
|
|
def _sendSms(self, recipient: str, message: str) -> bool:
|
|
"""Send SMS via SMS connector."""
|
|
if self._smsConnector is None:
|
|
try:
|
|
self._smsConnector = ConnectorMessagingSms()
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize SMS connector: {e}")
|
|
return False
|
|
|
|
return self._smsConnector.send(recipient, "", message) # SMS has no subject
|
|
|
|
|
|
def getInterface() -> MessagingInterface:
|
|
"""
|
|
Returns a MessagingInterface instance (singleton pattern).
|
|
"""
|
|
if "default" not in _instancesMessaging:
|
|
_instancesMessaging["default"] = MessagingInterface()
|
|
|
|
return _instancesMessaging["default"]
|
|
|