499 lines
18 KiB
Python
499 lines
18 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
||
# All rights reserved.
|
||
"""
|
||
Chatbot Feature Container - Main Module.
|
||
Handles feature initialization and RBAC catalog registration.
|
||
"""
|
||
|
||
import logging
|
||
from typing import Dict, List, Any, Optional
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Feature metadata: identifier, localized display label, and icon name.
FEATURE_CODE = "chatbot"
FEATURE_LABEL = dict(en="Chatbot", de="Chatbot", fr="Chatbot")
FEATURE_ICON = "mdi-robot"
|
||
|
||
# UI objects this feature contributes to the RBAC catalog.
# Each entry carries an objectKey, a localized label, and free-form meta.
UI_OBJECTS = [
    {
        "objectKey": "ui.feature.chatbot.conversations",
        "label": {
            "en": "Conversations",
            "de": "Konversationen",
            "fr": "Conversations",
        },
        "meta": {"area": "conversations"},
    },
]
|
||
|
||
# Resource objects this feature contributes to the RBAC catalog.
# "meta" records the HTTP endpoint template and method of each resource.
RESOURCE_OBJECTS = [
    {
        "objectKey": "resource.feature.chatbot.startStream",
        "label": {
            "en": "Start Chat (Stream)",
            "de": "Chat starten (Stream)",
            "fr": "Démarrer chat (Stream)",
        },
        "meta": {
            "endpoint": "/api/chatbot/{instanceId}/start/stream",
            "method": "POST",
        },
    },
    {
        "objectKey": "resource.feature.chatbot.stop",
        "label": {
            "en": "Stop Chat",
            "de": "Chat stoppen",
            "fr": "Arrêter chat",
        },
        "meta": {
            "endpoint": "/api/chatbot/{instanceId}/stop/{workflowId}",
            "method": "POST",
        },
    },
    {
        "objectKey": "resource.feature.chatbot.threads",
        "label": {
            "en": "Get Threads",
            "de": "Threads abrufen",
            "fr": "Récupérer threads",
        },
        "meta": {
            "endpoint": "/api/chatbot/{instanceId}/threads",
            "method": "GET",
        },
    },
    {
        "objectKey": "resource.feature.chatbot.delete",
        "label": {
            "en": "Delete Chat",
            "de": "Chat löschen",
            "fr": "Supprimer chat",
        },
        "meta": {
            "endpoint": "/api/chatbot/{instanceId}/{workflowId}",
            "method": "DELETE",
        },
    },
]
|
||
|
||
# Services this feature needs from the service center.
# Format: [{serviceKey, meta}]. getRequiredServiceKeys() and the service-center
# based getChatbotServices() iterate this list.
REQUIRED_SERVICES = [
    {
        "serviceKey": "chat",
        "meta": {"usage": "File info, document handling"},
    },
    {
        "serviceKey": "ai",
        "meta": {"usage": "AI calls, conversation name generation"},
    },
    {
        "serviceKey": "billing",
        "meta": {"usage": "Usage tracking, balance checks"},
    },
    {
        "serviceKey": "streaming",
        "meta": {"usage": "Event manager, ChatStreamingHelper"},
    },
]
|
||
|
||
# Template roles for this feature.
# Role names MUST follow the convention: {featureCode}-{roleName}
#
# NOTE(review): the viewer and user roles grant "ui.feature.chatbot.threads",
# but UI_OBJECTS in this file only declares "ui.feature.chatbot.conversations".
# Confirm whether the threads UI object is registered elsewhere or is missing.
TEMPLATE_ROLES = [
    {
        "roleLabel": "chatbot-viewer",
        "description": {
            "en": "Chatbot Viewer - View chat threads (read-only)",
            "de": "Chatbot Betrachter - Chat-Threads ansehen (nur lesen)",
            "fr": "Visualiseur Chatbot - Consulter les threads (lecture seule)",
        },
        "accessRules": [
            # UI: threads view only, no active chat
            {
                "context": "UI",
                "item": "ui.feature.chatbot.threads",
                "view": True,
            },
            # RESOURCE: may list threads only
            {
                "context": "RESOURCE",
                "item": "resource.feature.chatbot.threads",
                "view": True,
            },
            # DATA access (own records, read-only)
            {
                "context": "DATA",
                "item": None,
                "view": True,
                "read": "m",
                "create": "n",
                "update": "n",
                "delete": "n",
            },
        ],
    },
    {
        "roleLabel": "chatbot-user",
        "description": {
            "en": "Chatbot User - Use the chatbot and manage own threads",
            "de": "Chatbot Benutzer - Chatbot nutzen und eigene Threads verwalten",
            "fr": "Utilisateur Chatbot - Utiliser le chatbot et gérer ses threads",
        },
        "accessRules": [
            # UI: access to all views
            {
                "context": "UI",
                "item": "ui.feature.chatbot.conversations",
                "view": True,
            },
            {
                "context": "UI",
                "item": "ui.feature.chatbot.threads",
                "view": True,
            },
            # RESOURCE: start/stop chats, list threads, delete own chats
            {
                "context": "RESOURCE",
                "item": "resource.feature.chatbot.startStream",
                "view": True,
            },
            {
                "context": "RESOURCE",
                "item": "resource.feature.chatbot.stop",
                "view": True,
            },
            {
                "context": "RESOURCE",
                "item": "resource.feature.chatbot.threads",
                "view": True,
            },
            {
                "context": "RESOURCE",
                "item": "resource.feature.chatbot.delete",
                "view": True,
            },
            # DATA access (own records)
            {
                "context": "DATA",
                "item": None,
                "view": True,
                "read": "m",
                "create": "m",
                "update": "m",
                "delete": "m",
            },
        ],
    },
    {
        "roleLabel": "chatbot-admin",
        "description": {
            "en": "Chatbot Admin - Full access to all chatbot features",
            "de": "Chatbot Admin - Vollzugriff auf alle Chatbot-Funktionen",
            "fr": "Administrateur Chatbot - Accès complet à toutes les fonctions chatbot",
        },
        "accessRules": [
            # Full UI access (item=None)
            {"context": "UI", "item": None, "view": True},
            # Full resource access (item=None)
            {"context": "RESOURCE", "item": None, "view": True},
            # Full DATA access
            {
                "context": "DATA",
                "item": None,
                "view": True,
                "read": "a",
                "create": "a",
                "update": "a",
                "delete": "a",
            },
        ],
    },
]
|
||
|
||
|
||
def getFeatureDefinition() -> Dict[str, Any]:
    """
    Build the descriptor used to register this feature.

    Returns:
        Dict with the keys "code", "label" and "icon", taken from the
        module-level FEATURE_CODE, FEATURE_LABEL and FEATURE_ICON constants.
    """
    definition: Dict[str, Any] = dict(
        code=FEATURE_CODE,
        label=FEATURE_LABEL,
        icon=FEATURE_ICON,
    )
    return definition
|
||
|
||
|
||
def getRequiredServiceKeys() -> List[str]:
    """
    Collect the service keys declared in REQUIRED_SERVICES.

    Returns:
        A new list holding the "serviceKey" of every REQUIRED_SERVICES entry,
        in declaration order.
    """
    serviceKeys: List[str] = []
    for serviceSpec in REQUIRED_SERVICES:
        serviceKeys.append(serviceSpec["serviceKey"])
    return serviceKeys
|
||
|
||
|
||
def getChatbotServices(
    user,
    mandateId: Optional[str] = None,
    featureInstanceId: Optional[str] = None,
    workflow=None,
) -> Any:
    """
    Build a chatbot service hub by resolving services through the service center.

    Only the services listed in REQUIRED_SERVICES are resolved; each one is
    attached to the hub under its service key. A service that fails to resolve
    is logged as a warning and set to None on the hub.

    NOTE(review): this file defines ``getChatbotServices`` a second time further
    down. The later definition rebinds the name at import time, so this
    implementation is not the one callers receive. Also, the later
    ``_ChatbotServiceHub`` class exposes ``interfaceDbComponent`` as a property
    without a setter, so the assignment below would raise AttributeError
    against that class. Decide which implementation is intended and remove
    the other.

    Args:
        user: User the services are resolved for.
        mandateId: Optional mandate identifier passed to the context.
        featureInstanceId: Optional feature instance identifier.
        workflow: Optional workflow object; when None, a placeholder object
            carrying only ``featureCode`` is handed to the service context.

    Returns:
        Hub object with user, mandateId, featureInstanceId, workflow,
        interfaceDbComponent and one attribute per required service.
    """
    from modules.serviceCenter import getService
    from modules.serviceCenter.context import ServiceCenterContext
    from modules.interfaces.interfaceDbManagement import getInterface as getComponentInterface

    # The context always gets an object exposing featureCode; the hub keeps
    # the caller's original workflow value (possibly None).
    contextWorkflow = (
        workflow
        if workflow is not None
        else type("_Placeholder", (), {"featureCode": FEATURE_CODE})()
    )
    serviceContext = ServiceCenterContext(
        user=user,
        mandate_id=mandateId,
        feature_instance_id=featureInstanceId,
        workflow=contextWorkflow,
    )

    serviceHub = _ChatbotServiceHub()
    serviceHub.user = user
    serviceHub.mandateId = mandateId
    serviceHub.featureInstanceId = featureInstanceId
    serviceHub.workflow = workflow
    serviceHub.interfaceDbComponent = getComponentInterface(
        user,
        mandateId=mandateId,
        featureInstanceId=featureInstanceId,
    )

    for serviceSpec in REQUIRED_SERVICES:
        key = serviceSpec["serviceKey"]
        try:
            setattr(serviceHub, key, getService(key, serviceContext, legacy_hub=None))
        except Exception as e:
            # Best effort: a missing service leaves a None slot on the hub.
            logger.warning(f"Could not resolve service '{key}' for chatbot: {e}")
            setattr(serviceHub, key, None)

    return serviceHub
|
||
|
||
|
||
def getChatStreamingHelper():
    """
    Fetch the ChatStreamingHelper via the service center's streaming service.

    A placeholder user and an otherwise empty context are used to resolve
    the "streaming" service.

    Returns:
        The result of ``streaming.getChatStreamingHelper()``, or None when the
        resolved streaming service is falsy.
    """
    from modules.serviceCenter import getService
    from modules.serviceCenter.context import ServiceCenterContext

    placeholderContext = ServiceCenterContext(
        user=__get_placeholder_user(),
        mandate_id=None,
        feature_instance_id=None,
    )
    streamingService = getService("streaming", placeholderContext, legacy_hub=None)
    if not streamingService:
        return None
    return streamingService.getChatStreamingHelper()
|
||
|
||
|
||
def __get_placeholder_user():
    """
    Create a stand-in "system" user for contexts that only resolve services.

    Returns:
        A User with id and username "system", no email, and the full name
        "System Placeholder".
    """
    from modules.datamodels.datamodelUam import User

    placeholderFields = {
        "id": "system",
        "username": "system",
        "email": None,
        "fullName": "System Placeholder",
    }
    return User(**placeholderFields)
|
||
|
||
|
||
def getEventManager(user, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
    """
    Get the event manager for SSE streaming (used by chatbot routes).

    Resolves the "streaming" service through the service center and returns
    its event manager.

    Args:
        user: User the service context is built for.
        mandateId: Optional mandate identifier passed to the context.
        featureInstanceId: Optional feature instance identifier.

    Returns:
        Whatever ``streaming.getEventManager()`` returns.
    """
    from modules.serviceCenter import getService
    from modules.serviceCenter.context import ServiceCenterContext

    contextArgs = {
        "user": user,
        "mandate_id": mandateId,
        "feature_instance_id": featureInstanceId,
    }
    streamingService = getService(
        "streaming",
        ServiceCenterContext(**contextArgs),
        legacy_hub=None,
    )
    return streamingService.getEventManager()
|
||
|
||
|
||
class _ChatbotServiceHub:
|
||
"""Lightweight hub exposing only services required by the chatbot feature."""
|
||
user = None
|
||
mandateId = None
|
||
featureInstanceId = None
|
||
workflow = None
|
||
interfaceDbComponent = None
|
||
chat = None
|
||
ai = None
|
||
billing = None
|
||
streaming = None
|
||
featureCode = "chatbot"
|
||
allowedProviders = None
|
||
|
||
|
||
def getUiObjects() -> List[Dict[str, Any]]:
    """
    Expose the UI objects used for RBAC catalog registration.

    Returns:
        The module-level UI_OBJECTS list itself (not a copy).
    """
    uiObjects: List[Dict[str, Any]] = UI_OBJECTS
    return uiObjects
|
||
|
||
|
||
def getResourceObjects() -> List[Dict[str, Any]]:
    """
    Expose the resource objects used for RBAC catalog registration.

    Returns:
        The module-level RESOURCE_OBJECTS list itself (not a copy).
    """
    resourceObjects: List[Dict[str, Any]] = RESOURCE_OBJECTS
    return resourceObjects
|
||
|
||
|
||
def getTemplateRoles() -> List[Dict[str, Any]]:
    """
    Expose the template role definitions of this feature.

    Returns:
        The module-level TEMPLATE_ROLES list itself (not a copy).
    """
    templateRoles: List[Dict[str, Any]] = TEMPLATE_ROLES
    return templateRoles
|
||
|
||
|
||
def registerFeature(catalogService) -> bool:
    """
    Register this feature's RBAC objects in the catalog.

    Registers every entry of UI_OBJECTS, then every entry of RESOURCE_OBJECTS,
    then syncs the template roles to the database. Any exception raised along
    the way is logged and reported as a failed registration.

    Args:
        catalogService: The RBAC catalog service instance; must provide
            registerUiObject and registerResourceObject.

    Returns:
        True if registration was successful, False if an exception occurred.
    """
    # (catalog method name, objects to register), processed in this order.
    registrationPlan = (
        ("registerUiObject", UI_OBJECTS),
        ("registerResourceObject", RESOURCE_OBJECTS),
    )

    try:
        for methodName, catalogObjects in registrationPlan:
            for entry in catalogObjects:
                # Method is looked up per entry, so nothing is resolved on
                # the catalog service when a list is empty.
                getattr(catalogService, methodName)(
                    featureCode=FEATURE_CODE,
                    objectKey=entry["objectKey"],
                    label=entry["label"],
                    meta=entry.get("meta"),
                )

        # Sync template roles to database
        _syncTemplateRolesToDb()

        logger.info(f"Feature '{FEATURE_CODE}' registered {len(UI_OBJECTS)} UI objects and {len(RESOURCE_OBJECTS)} resource objects")
    except Exception as e:
        logger.error(f"Failed to register feature '{FEATURE_CODE}': {e}")
        return False

    return True
|
||
|
||
|
||
def getChatbotServices(
    user,
    mandateId: Optional[str] = None,
    featureInstanceId: Optional[str] = None,
    workflow=None,
) -> "_ChatbotServiceHub":
    """
    Build a lightweight service hub for the chatbot (chat, ai, streaming, billing).

    Instantiates ChatService, AiService and StreamingService directly on the
    hub instead of loading the full legacy Services hub. The chatbot feature
    interface (interfaceFeatureChatbot) is used as ``interfaceDbChat``.
    Billing is resolved through the service center; if that fails, a warning
    is logged and ``hub.billing`` is None.

    NOTE(review): an earlier definition of ``getChatbotServices`` exists in
    this file; this later one is the binding that remains after import.

    Args:
        user: User the interfaces and services are created for.
        mandateId: Optional mandate identifier.
        featureInstanceId: Optional feature instance identifier.
        workflow: Optional workflow object; when falsy, a placeholder carrying
            only ``featureCode`` is used for the billing service context.

    Returns:
        The populated _ChatbotServiceHub instance.
    """
    from modules.services import PublicService
    from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
    from modules.features.chatbot.interfaceFeatureChatbot import getInterface as getChatbotInterface
    from modules.services.serviceChat.mainServiceChat import ChatService
    from modules.services.serviceAi.mainServiceAi import AiService
    from modules.services.serviceStreaming.mainServiceStreaming import StreamingService

    serviceHub = _ChatbotServiceHub()

    # Request context and feature identification
    serviceHub.user = user
    serviceHub.mandateId = mandateId
    serviceHub.featureInstanceId = featureInstanceId
    serviceHub.workflow = workflow
    serviceHub.featureCode = "chatbot"
    serviceHub.allowedProviders = None

    # Database interfaces
    serviceHub.interfaceDbApp = getAppInterface(user, mandateId=mandateId)
    # interfaceDbComponent is not created here; only its backing slot is reset.
    serviceHub._interfaceDbComponent_val = None
    # ChatObjects (interfaceFeatureChatbot) serve as interfaceDbChat
    serviceHub.interfaceDbChat = getChatbotInterface(
        user,
        mandateId=mandateId,
        featureInstanceId=featureInstanceId,
    )

    # Services are constructed with the hub itself, in this order.
    serviceHub.chat = PublicService(ChatService(serviceHub))
    serviceHub.ai = PublicService(AiService(serviceHub), functionsOnly=False)
    serviceHub.streaming = PublicService(StreamingService(serviceHub))

    # Billing comes from the service center (needed for the billing
    # preflight check and billing callback per the original comment).
    try:
        from modules.serviceCenter import getService
        from modules.serviceCenter.context import ServiceCenterContext

        billingWorkflow = workflow or type("_Placeholder", (), {"featureCode": FEATURE_CODE})()
        billingContext = ServiceCenterContext(
            user=user,
            mandate_id=mandateId,
            feature_instance_id=featureInstanceId,
            workflow=billingWorkflow,
        )
        serviceHub.billing = getService("billing", billingContext, legacy_hub=None)
    except Exception as e:
        logger.warning(f"Could not resolve billing service for chatbot: {e}")
        serviceHub.billing = None

    return serviceHub
|
||
|
||
|
||
class _ChatbotServiceHub:
|
||
"""Lightweight hub with chat, ai, streaming for chatbot; avoids full Services init."""
|
||
|
||
user = None
|
||
mandateId = None
|
||
featureInstanceId = None
|
||
workflow = None
|
||
interfaceDbApp = None
|
||
_interfaceDbComponent_val = None
|
||
interfaceDbChat = None
|
||
|
||
@property
|
||
def interfaceDbComponent(self):
|
||
"""Lazy-load interfaceDbComponent on first access (saves ~100–300 ms when no files)."""
|
||
if self._interfaceDbComponent_val is None:
|
||
from modules.interfaces.interfaceDbManagement import getInterface as getComponentInterface
|
||
self._interfaceDbComponent_val = getComponentInterface(
|
||
self.user, mandateId=self.mandateId, featureInstanceId=self.featureInstanceId
|
||
)
|
||
return self._interfaceDbComponent_val
|
||
chat = None
|
||
ai = None
|
||
billing = None
|
||
streaming = None
|
||
featureCode = "chatbot"
|
||
allowedProviders = None
|
||
|
||
|
||
def _syncTemplateRolesToDb() -> int:
    """
    Sync template roles and their AccessRules to the database.

    For every entry in TEMPLATE_ROLES:
      * if a global template role (mandateId is None) with the same roleLabel
        already exists for this feature, only its missing AccessRules are added;
      * otherwise the role is created as a global template and its AccessRules
        are created.

    Any exception is logged and swallowed so that feature registration is not
    aborted by a role-sync failure.

    Returns:
        Number of template roles newly created. Existing roles that only had
        AccessRules ensured are not counted. 0 if an error occurred.
    """
    try:
        from modules.interfaces.interfaceDbApp import getRootInterface
        from modules.datamodels.datamodelRbac import Role

        rootInterface = getRootInterface()

        # Existing roles for this feature (Pydantic models); keep only the
        # global templates (mandateId is None), keyed by label.
        existingRoles = rootInterface.getRolesByFeatureCode(FEATURE_CODE) or []
        existingRoleIds = {
            role.roleLabel: str(role.id)
            for role in existingRoles
            if role.mandateId is None
        }

        createdCount = 0
        for roleTemplate in TEMPLATE_ROLES:
            roleLabel = roleTemplate["roleLabel"]
            ruleTemplates = roleTemplate.get("accessRules", [])

            if roleLabel in existingRoleIds:
                # Role already present: only make sure its AccessRules exist.
                _ensureAccessRulesForRole(rootInterface, existingRoleIds[roleLabel], ruleTemplates)
                continue

            # Create new template role
            newRole = Role(
                roleLabel=roleLabel,
                description=roleTemplate.get("description", {}),
                featureCode=FEATURE_CODE,
                mandateId=None,  # Global template
                featureInstanceId=None,
                isSystemRole=False,
            )
            createdRole = rootInterface.db.recordCreate(Role, newRole.model_dump())
            roleId = createdRole.get("id")
            if roleId is None:
                # Without an ID the AccessRules would be attached to no role.
                logger.error(f"Template role '{roleLabel}' was created without an ID; skipping its AccessRules")
                continue

            # Remember the new role so a label repeated in TEMPLATE_ROLES is
            # not created a second time within this run.
            existingRoleIds[roleLabel] = str(roleId)

            # Create AccessRules for this role
            _ensureAccessRulesForRole(rootInterface, roleId, ruleTemplates)

            logger.info(f"Created template role '{roleLabel}' with ID {roleId}")
            createdCount += 1

        if createdCount > 0:
            logger.info(f"Feature '{FEATURE_CODE}': Created {createdCount} template roles")

        return createdCount

    except Exception as e:
        logger.error(f"Error syncing template roles for feature '{FEATURE_CODE}': {e}")
        return 0
|
||
|
||
|
||
def _ensureAccessRulesForRole(rootInterface, roleId: str, ruleTemplates: List[Dict[str, Any]]) -> int:
    """
    Ensure AccessRules exist for a role based on templates.

    A rule is identified by its (context, item) signature. Templates whose
    signature already exists for the role are skipped, including templates
    that repeat a signature created earlier in the same call.

    Args:
        rootInterface: Root interface instance; provides getAccessRulesByRole
            and db.recordCreate.
        roleId: Role ID the rules belong to.
        ruleTemplates: List of rule templates (dicts with "context", "item",
            "view" and optionally "read"/"create"/"update"/"delete").

    Returns:
        Number of rules created.
    """
    from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext

    # Get existing rules for this role (Pydantic models)
    existingRules = rootInterface.getAccessRulesByRole(roleId) or []

    # Collect existing signatures to avoid duplicates.
    # IMPORTANT: compare by enum .value, not str(), which gives
    # "AccessRuleContext.DATA" in Python 3.11+. A context stored as a plain
    # string is used as-is instead of failing on the missing .value.
    existingSignatures = set()
    for rule in existingRules:
        contextValue = getattr(rule.context, "value", rule.context) if rule.context else None
        existingSignatures.add((contextValue, rule.item))

    # Map context string to enum; unknown strings are passed through unchanged.
    contextByName = {
        "UI": AccessRuleContext.UI,
        "DATA": AccessRuleContext.DATA,
        "RESOURCE": AccessRuleContext.RESOURCE,
    }

    createdCount = 0
    for template in ruleTemplates:
        context = template.get("context", "UI")
        item = template.get("item")
        signature = (context, item)

        if signature in existingSignatures:
            continue

        newRule = AccessRule(
            roleId=roleId,
            context=contextByName.get(context, context),
            item=item,
            view=template.get("view", False),
            read=template.get("read"),
            create=template.get("create"),
            update=template.get("update"),
            delete=template.get("delete"),
        )
        rootInterface.db.recordCreate(AccessRule, newRule.model_dump())
        # Record the signature so a duplicate template later in the list is
        # not inserted a second time.
        existingSignatures.add(signature)
        createdCount += 1

    if createdCount > 0:
        logger.debug(f"Created {createdCount} AccessRules for role {roleId}")

    return createdCount
|