gateway/modules/services/__init__.py
2025-12-15 21:55:26 +01:00

99 lines
3.9 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
from typing import Any
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelChat import ChatWorkflow
class PublicService:
    """Lightweight proxy exposing only the public attributes of a target.

    - Hides names starting with '_'
    - Optionally restricts to callables only (``functionsOnly``)
    - Optional ``nameFilter`` predicate for allow-list patterns

    Args:
        target: Object whose attributes are forwarded.
        functionsOnly: When true (the default), non-callable attributes of
            the target are rejected with ``AttributeError``.
        nameFilter: Optional callable taking an attribute name and returning
            a truthy value for names that may be exposed.
    """

    def __init__(self, target: Any, functionsOnly: bool = True, nameFilter=None):
        self._target = target
        self._functionsOnly = functionsOnly
        self._nameFilter = nameFilter

    def __getattr__(self, name: str):
        """Forward a public attribute lookup to the wrapped target.

        Only invoked when normal lookup on the proxy itself fails.

        Raises:
            AttributeError: if ``name`` starts with '_', is rejected by
                ``nameFilter``, is missing on the target, or is not callable
                while ``functionsOnly`` is set.
        """
        if name.startswith('_'):
            # Read _target from the instance dict instead of via attribute
            # access: on an instance whose __init__ has not run (e.g. one
            # created through __new__), `self._target` would re-enter
            # __getattr__ and recurse until RecursionError, which hasattr()
            # does not swallow.
            state = self.__dict__
            owner = type(state['_target']) if '_target' in state else type(self)
            raise AttributeError(f"'{owner.__name__}' attribute '{name}' is private")
        if self._nameFilter and not self._nameFilter(name):
            raise AttributeError(f"'{name}' not exposed by policy")
        attr = getattr(self._target, name)
        if self._functionsOnly and not callable(attr):
            raise AttributeError(f"'{name}' is not a function")
        return attr

    def __dir__(self):
        """Return the sorted names that ``__getattr__`` would expose."""
        names = [
            n for n in dir(self._target)
            if not n.startswith('_')
            and (not self._functionsOnly or callable(getattr(self._target, n, None)))
            and (self._nameFilter(n) if self._nameFilter else True)
        ]
        return sorted(names)
class Services:
    """Per-user container bundling the database interfaces and service packages.

    The constructor stores the user and (optional) workflow, obtains the
    chat/app/component database interfaces for that user, and instantiates
    every service package. Each service is handed this ``Services`` instance
    and is exposed through a ``PublicService`` proxy.
    """

    def __init__(self, user: User, workflow: ChatWorkflow = None):
        """Build all interfaces and services for ``user``.

        Args:
            user: User the database interfaces are requested for.
            workflow: Workflow stored on the instance; defaults to ``None``.
        """
        self.user: User = user
        self.workflow: ChatWorkflow = workflow
        self.currentUserPrompt: str = ""  # Cleaned/normalized user intent for the current round
        self.rawUserPrompt: str = ""  # Original raw user message for the current round
        # Initialize interfaces
        # NOTE(review): imports are done inside __init__ rather than at module
        # top — presumably to avoid circular imports; confirm before hoisting.
        from modules.interfaces.interfaceDbChatObjects import getInterface as getChatInterface
        self.interfaceDbChat = getChatInterface(user)
        from modules.interfaces.interfaceDbAppObjects import getInterface as getAppInterface
        self.interfaceDbApp = getAppInterface(user)
        from modules.interfaces.interfaceDbComponentObjects import getInterface as getComponentInterface
        self.interfaceDbComponent = getComponentInterface(user)
        # Initialize service packages
        # Every service constructor receives this partially built instance, so
        # only the attributes assigned above it are guaranteed to exist at that
        # point; keep the order unless the constructors are known not to care.
        from .serviceExtraction.mainServiceExtraction import ExtractionService
        self.extraction = PublicService(ExtractionService(self))
        from .serviceGeneration.mainServiceGeneration import GenerationService
        self.generation = PublicService(GenerationService(self))
        from .serviceNeutralization.mainServiceNeutralization import NeutralizationService
        self.neutralization = PublicService(NeutralizationService(self))
        from .serviceSharepoint.mainServiceSharepoint import SharepointService
        self.sharepoint = PublicService(SharepointService(self))
        from .serviceAi.mainServiceAi import AiService
        # Unlike the other services, the AI proxy also exposes public
        # non-callable attributes (functionsOnly=False).
        self.ai = PublicService(AiService(self), functionsOnly=False)
        from .serviceTicket.mainServiceTicket import TicketService
        self.ticket = PublicService(TicketService(self))
        from .serviceChat.mainServiceChat import ChatService
        self.chat = PublicService(ChatService(self))
        from .serviceUtils.mainServiceUtils import UtilsService
        self.utils = PublicService(UtilsService(self))
        from .serviceWeb.mainServiceWeb import WebService
        self.web = PublicService(WebService(self))
        from .serviceSecurity.mainServiceSecurity import SecurityService
        self.security = PublicService(SecurityService(self))
        from .serviceMessaging.mainServiceMessaging import MessagingService
        self.messaging = PublicService(MessagingService(self))
def getInterface(user: User, workflow: ChatWorkflow = None) -> Services:
    """Create a ``Services`` container for ``user``.

    Args:
        user: User passed through to ``Services``.
        workflow: Workflow passed through to ``Services``. Defaults to
            ``None``, mirroring the default of ``Services.__init__`` so the
            factory and the constructor accept the same call shapes.

    Returns:
        A newly constructed ``Services`` instance.
    """
    return Services(user, workflow)