# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Chatbot Feature Container - Main Module. Handles feature initialization and RBAC catalog registration. """ import logging from typing import Dict, List, Any, Optional logger = logging.getLogger(__name__) # Feature metadata FEATURE_CODE = "chatbot" FEATURE_LABEL = {"en": "Chatbot", "de": "Chatbot", "fr": "Chatbot"} FEATURE_ICON = "mdi-robot" # UI Objects for RBAC catalog UI_OBJECTS = [ { "objectKey": "ui.feature.chatbot.conversations", "label": {"en": "Conversations", "de": "Konversationen", "fr": "Conversations"}, "meta": {"area": "conversations"} } ] # Resource Objects for RBAC catalog RESOURCE_OBJECTS = [ { "objectKey": "resource.feature.chatbot.startStream", "label": {"en": "Start Chat (Stream)", "de": "Chat starten (Stream)", "fr": "Démarrer chat (Stream)"}, "meta": {"endpoint": "/api/chatbot/{instanceId}/start/stream", "method": "POST"} }, { "objectKey": "resource.feature.chatbot.stop", "label": {"en": "Stop Chat", "de": "Chat stoppen", "fr": "Arrêter chat"}, "meta": {"endpoint": "/api/chatbot/{instanceId}/stop/{workflowId}", "method": "POST"} }, { "objectKey": "resource.feature.chatbot.threads", "label": {"en": "Get Threads", "de": "Threads abrufen", "fr": "Récupérer threads"}, "meta": {"endpoint": "/api/chatbot/{instanceId}/threads", "method": "GET"} }, { "objectKey": "resource.feature.chatbot.delete", "label": {"en": "Delete Chat", "de": "Chat löschen", "fr": "Supprimer chat"}, "meta": {"endpoint": "/api/chatbot/{instanceId}/{workflowId}", "method": "DELETE"} }, ] # Service requirements - services this feature needs from the service center # Format: [{serviceKey, meta}]. Used by getChatbotServices() to resolve only needed services. REQUIRED_SERVICES = [ { "serviceKey": "chat", "meta": {"usage": "File info, document handling"} }, { "serviceKey": "ai", "meta": {"usage": "AI calls, conversation name generation"} }, { "serviceKey": "billing", "meta": {"usage": "Usage tracking, balance checks"} }, { "serviceKey": "streaming", "meta": {"usage": "Event manager, ChatStreamingHelper"} }, ] # Template roles for this feature # Role names MUST follow convention: {featureCode}-{roleName} TEMPLATE_ROLES = [ { "roleLabel": "chatbot-viewer", "description": { "en": "Chatbot Viewer - View chat threads (read-only)", "de": "Chatbot Betrachter - Chat-Threads ansehen (nur lesen)", "fr": "Visualiseur Chatbot - Consulter les threads (lecture seule)" }, "accessRules": [ # UI: only threads view, NO active chat {"context": "UI", "item": "ui.feature.chatbot.threads", "view": True}, # RESOURCE: can list threads only {"context": "RESOURCE", "item": "resource.feature.chatbot.threads", "view": True}, # DATA access (own records, read-only) {"context": "DATA", "item": None, "view": True, "read": "m", "create": "n", "update": "n", "delete": "n"}, ] }, { "roleLabel": "chatbot-user", "description": { "en": "Chatbot User - Use the chatbot and manage own threads", "de": "Chatbot Benutzer - Chatbot nutzen und eigene Threads verwalten", "fr": "Utilisateur Chatbot - Utiliser le chatbot et gérer ses threads" }, "accessRules": [ # UI: full access to all views {"context": "UI", "item": "ui.feature.chatbot.conversations", "view": True}, {"context": "UI", "item": "ui.feature.chatbot.threads", "view": True}, # Resource access: can start/stop chats, view threads, delete own {"context": "RESOURCE", "item": "resource.feature.chatbot.startStream", "view": True}, {"context": "RESOURCE", "item": "resource.feature.chatbot.stop", "view": True}, {"context": "RESOURCE", "item": "resource.feature.chatbot.threads", "view": True}, {"context": "RESOURCE", "item": "resource.feature.chatbot.delete", "view": True}, # DATA access (own records) {"context": "DATA", "item": None, "view": True, "read": "m", "create": "m", "update": "m", "delete": "m"}, ] }, { "roleLabel": "chatbot-admin", "description": { "en": "Chatbot Admin - Full access to all chatbot features", "de": "Chatbot Admin - Vollzugriff auf alle Chatbot-Funktionen", "fr": "Administrateur Chatbot - Accès complet à toutes les fonctions chatbot" }, "accessRules": [ # Full UI access {"context": "UI", "item": None, "view": True}, # Full resource access {"context": "RESOURCE", "item": None, "view": True}, # Full DATA access {"context": "DATA", "item": None, "view": True, "read": "a", "create": "a", "update": "a", "delete": "a"}, ] }, ] def getFeatureDefinition() -> Dict[str, Any]: """Return the feature definition for registration.""" return { "code": FEATURE_CODE, "label": FEATURE_LABEL, "icon": FEATURE_ICON, } def getRequiredServiceKeys() -> List[str]: """Return list of service keys this feature requires.""" return [s["serviceKey"] for s in REQUIRED_SERVICES] def getChatbotServices( user, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None, workflow=None, ) -> Any: """ Get a service hub for the chatbot feature using the service center. Resolves only the services declared in REQUIRED_SERVICES. Returns a hub-like object with: chat, ai, billing, streaming, plus interfaceDbComponent, user, mandateId, featureInstanceId. """ from modules.serviceCenter import getService from modules.serviceCenter.context import ServiceCenterContext from modules.interfaces.interfaceDbManagement import getInterface as getComponentInterface # Provide workflow or placeholder so billing/etc get featureCode _workflow = workflow if _workflow is None: _workflow = type("_Placeholder", (), {"featureCode": FEATURE_CODE})() ctx = ServiceCenterContext( user=user, mandate_id=mandateId, feature_instance_id=featureInstanceId, workflow=_workflow, ) hub = _ChatbotServiceHub() hub.user = user hub.mandateId = mandateId hub.featureInstanceId = featureInstanceId hub.workflow = workflow hub.interfaceDbComponent = getComponentInterface(user, mandateId=mandateId, featureInstanceId=featureInstanceId) for spec in REQUIRED_SERVICES: key = spec["serviceKey"] try: svc = getService(key, ctx, legacy_hub=None) setattr(hub, key, svc) except Exception as e: logger.warning(f"Could not resolve service '{key}' for chatbot: {e}") setattr(hub, key, None) return hub def getChatStreamingHelper(): """ Get ChatStreamingHelper utility class (used by chatbot for message normalization). Resolves via service center streaming service. """ from modules.serviceCenter import getService from modules.serviceCenter.context import ServiceCenterContext # Minimal context - streaming service only needs it for resolver ctx = ServiceCenterContext(user=__get_placeholder_user(), mandate_id=None, feature_instance_id=None) streaming = getService("streaming", ctx, legacy_hub=None) return streaming.getChatStreamingHelper() if streaming else None def __get_placeholder_user(): """Placeholder user for contexts that only need service resolution (e.g. ChatStreamingHelper).""" from modules.datamodels.datamodelUam import User return User(id="system", username="system", email=None, fullName="System Placeholder") def getEventManager(user, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None): """ Get the global event manager for SSE streaming (used by chatbot routes). """ from modules.serviceCenter import getService from modules.serviceCenter.context import ServiceCenterContext ctx = ServiceCenterContext( user=user, mandate_id=mandateId, feature_instance_id=featureInstanceId, ) streaming = getService("streaming", ctx, legacy_hub=None) return streaming.getEventManager() class _ChatbotServiceHub: """Lightweight hub exposing only services required by the chatbot feature.""" user = None mandateId = None featureInstanceId = None workflow = None interfaceDbComponent = None chat = None ai = None billing = None streaming = None featureCode = "chatbot" allowedProviders = None def getUiObjects() -> List[Dict[str, Any]]: """Return UI objects for RBAC catalog registration.""" return UI_OBJECTS def getResourceObjects() -> List[Dict[str, Any]]: """Return resource objects for RBAC catalog registration.""" return RESOURCE_OBJECTS def getTemplateRoles() -> List[Dict[str, Any]]: """Return template roles for this feature.""" return TEMPLATE_ROLES def registerFeature(catalogService) -> bool: """ Register this feature's RBAC objects in the catalog. Args: catalogService: The RBAC catalog service instance Returns: True if registration was successful """ try: # Register UI objects for uiObj in UI_OBJECTS: catalogService.registerUiObject( featureCode=FEATURE_CODE, objectKey=uiObj["objectKey"], label=uiObj["label"], meta=uiObj.get("meta") ) # Register Resource objects for resObj in RESOURCE_OBJECTS: catalogService.registerResourceObject( featureCode=FEATURE_CODE, objectKey=resObj["objectKey"], label=resObj["label"], meta=resObj.get("meta") ) # Sync template roles to database _syncTemplateRolesToDb() logger.info(f"Feature '{FEATURE_CODE}' registered {len(UI_OBJECTS)} UI objects and {len(RESOURCE_OBJECTS)} resource objects") return True except Exception as e: logger.error(f"Failed to register feature '{FEATURE_CODE}': {e}") return False def getChatbotServices( user, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None, workflow=None, ) -> "_ChatbotServiceHub": """ Get lightweight service hub for chatbot (chat, ai, streaming) without loading the full legacy Services hub. Avoids ~90 ms from _loadFeatureInterfaces + _loadFeatureServices; only instantiates required services. Uses interfaceFeatureChatbot (ChatObjects) for interfaceDbChat to avoid duplicate DB init - chatProcess reuses hub.interfaceDbChat. """ from modules.services import PublicService from modules.interfaces.interfaceDbApp import getInterface as getAppInterface from modules.features.chatbot.interfaceFeatureChatbot import getInterface as getChatbotInterface from modules.services.serviceChat.mainServiceChat import ChatService from modules.services.serviceAi.mainServiceAi import AiService from modules.services.serviceStreaming.mainServiceStreaming import StreamingService hub = _ChatbotServiceHub() hub.user = user hub.mandateId = mandateId hub.featureInstanceId = featureInstanceId hub.workflow = workflow hub.featureCode = "chatbot" hub.allowedProviders = None hub.interfaceDbApp = getAppInterface(user, mandateId=mandateId) # interfaceDbComponent: lazy-loaded on first access (saves ~100–300 ms when no file uploads) hub._interfaceDbComponent_val = None # Use ChatObjects (interfaceFeatureChatbot) - same as chatProcess, avoids extra interfaceDbChat init hub.interfaceDbChat = getChatbotInterface( user, mandateId=mandateId, featureInstanceId=featureInstanceId ) hub.chat = PublicService(ChatService(hub)) hub.ai = PublicService(AiService(hub), functionsOnly=False) hub.streaming = PublicService(StreamingService(hub)) # Resolve billing from service center (required for _preflight_billing_check and billing callback) try: from modules.serviceCenter import getService from modules.serviceCenter.context import ServiceCenterContext _workflow = workflow or type("_Placeholder", (), {"featureCode": FEATURE_CODE})() ctx = ServiceCenterContext( user=user, mandate_id=mandateId, feature_instance_id=featureInstanceId, workflow=_workflow, ) hub.billing = getService("billing", ctx, legacy_hub=None) except Exception as e: logger.warning(f"Could not resolve billing service for chatbot: {e}") hub.billing = None return hub class _ChatbotServiceHub: """Lightweight hub with chat, ai, streaming for chatbot; avoids full Services init.""" user = None mandateId = None featureInstanceId = None workflow = None interfaceDbApp = None _interfaceDbComponent_val = None interfaceDbChat = None @property def interfaceDbComponent(self): """Lazy-load interfaceDbComponent on first access (saves ~100–300 ms when no files).""" if self._interfaceDbComponent_val is None: from modules.interfaces.interfaceDbManagement import getInterface as getComponentInterface self._interfaceDbComponent_val = getComponentInterface( self.user, mandateId=self.mandateId, featureInstanceId=self.featureInstanceId ) return self._interfaceDbComponent_val chat = None ai = None billing = None streaming = None featureCode = "chatbot" allowedProviders = None def _syncTemplateRolesToDb() -> int: """ Sync template roles and their AccessRules to the database. Creates global template roles (mandateId=None) if they don't exist. Returns: Number of roles created/updated """ try: from modules.interfaces.interfaceDbApp import getRootInterface from modules.datamodels.datamodelRbac import Role, AccessRule, AccessRuleContext rootInterface = getRootInterface() # Get existing template roles for this feature (Pydantic models) existingRoles = rootInterface.getRolesByFeatureCode(FEATURE_CODE) # Filter to template roles (mandateId is None) templateRoles = [r for r in existingRoles if r.mandateId is None] existingRoleLabels = {r.roleLabel: str(r.id) for r in templateRoles} createdCount = 0 for roleTemplate in TEMPLATE_ROLES: roleLabel = roleTemplate["roleLabel"] if roleLabel in existingRoleLabels: roleId = existingRoleLabels[roleLabel] # Ensure AccessRules exist for this role _ensureAccessRulesForRole(rootInterface, roleId, roleTemplate.get("accessRules", [])) else: # Create new template role newRole = Role( roleLabel=roleLabel, description=roleTemplate.get("description", {}), featureCode=FEATURE_CODE, mandateId=None, # Global template featureInstanceId=None, isSystemRole=False ) createdRole = rootInterface.db.recordCreate(Role, newRole.model_dump()) roleId = createdRole.get("id") # Create AccessRules for this role _ensureAccessRulesForRole(rootInterface, roleId, roleTemplate.get("accessRules", [])) logger.info(f"Created template role '{roleLabel}' with ID {roleId}") createdCount += 1 if createdCount > 0: logger.info(f"Feature '{FEATURE_CODE}': Created {createdCount} template roles") return createdCount except Exception as e: logger.error(f"Error syncing template roles for feature '{FEATURE_CODE}': {e}") return 0 def _ensureAccessRulesForRole(rootInterface, roleId: str, ruleTemplates: List[Dict[str, Any]]) -> int: """ Ensure AccessRules exist for a role based on templates. Args: rootInterface: Root interface instance roleId: Role ID ruleTemplates: List of rule templates Returns: Number of rules created """ from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext # Get existing rules for this role (Pydantic models) existingRules = rootInterface.getAccessRulesByRole(roleId) # Create a set of existing rule signatures to avoid duplicates # IMPORTANT: Use .value for enum comparison, not str() which gives "AccessRuleContext.DATA" in Python 3.11+ existingSignatures = set() for rule in existingRules: sig = (rule.context.value if rule.context else None, rule.item) existingSignatures.add(sig) createdCount = 0 for template in ruleTemplates: context = template.get("context", "UI") item = template.get("item") sig = (context, item) if sig in existingSignatures: continue # Map context string to enum if context == "UI": contextEnum = AccessRuleContext.UI elif context == "DATA": contextEnum = AccessRuleContext.DATA elif context == "RESOURCE": contextEnum = AccessRuleContext.RESOURCE else: contextEnum = context newRule = AccessRule( roleId=roleId, context=contextEnum, item=item, view=template.get("view", False), read=template.get("read"), create=template.get("create"), update=template.get("update"), delete=template.get("delete"), ) rootInterface.db.recordCreate(AccessRule, newRule.model_dump()) createdCount += 1 if createdCount > 0: logger.debug(f"Created {createdCount} AccessRules for role {roleId}") return createdCount