gateway/modules/features/chatbot/mainChatbot.py

499 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Chatbot Feature Container - Main Module.
Handles feature initialization and RBAC catalog registration.
"""
import logging
from typing import Dict, List, Any, Optional
logger = logging.getLogger(__name__)
# Feature metadata: identifier, localized display label and icon name.
FEATURE_CODE = "chatbot"
FEATURE_LABEL = {
    "en": "Chatbot",
    "de": "Chatbot",
    "fr": "Chatbot",
}
FEATURE_ICON = "mdi-robot"
# UI objects this feature contributes to the RBAC catalog.
# NOTE(review): TEMPLATE_ROLES in this file also references
# "ui.feature.chatbot.threads", which is not listed here - confirm whether
# that UI object is registered elsewhere or is missing from this list.
UI_OBJECTS = [
    {
        "objectKey": "ui.feature.chatbot.conversations",
        "label": {
            "en": "Conversations",
            "de": "Konversationen",
            "fr": "Conversations",
        },
        "meta": {"area": "conversations"},
    },
]
# Resource (endpoint) objects this feature contributes to the RBAC catalog.
# Each entry carries the endpoint template and HTTP method in "meta".
RESOURCE_OBJECTS = [
    {
        "objectKey": "resource.feature.chatbot.startStream",
        "label": {
            "en": "Start Chat (Stream)",
            "de": "Chat starten (Stream)",
            "fr": "Démarrer chat (Stream)",
        },
        "meta": {
            "endpoint": "/api/chatbot/{instanceId}/start/stream",
            "method": "POST",
        },
    },
    {
        "objectKey": "resource.feature.chatbot.stop",
        "label": {
            "en": "Stop Chat",
            "de": "Chat stoppen",
            "fr": "Arrêter chat",
        },
        "meta": {
            "endpoint": "/api/chatbot/{instanceId}/stop/{workflowId}",
            "method": "POST",
        },
    },
    {
        "objectKey": "resource.feature.chatbot.threads",
        "label": {
            "en": "Get Threads",
            "de": "Threads abrufen",
            "fr": "Récupérer threads",
        },
        "meta": {
            "endpoint": "/api/chatbot/{instanceId}/threads",
            "method": "GET",
        },
    },
    {
        "objectKey": "resource.feature.chatbot.delete",
        "label": {
            "en": "Delete Chat",
            "de": "Chat löschen",
            "fr": "Supprimer chat",
        },
        "meta": {
            "endpoint": "/api/chatbot/{instanceId}/{workflowId}",
            "method": "DELETE",
        },
    },
]
# Services this feature needs from the service center.
# Each entry is {serviceKey, meta}; the keys are what getRequiredServiceKeys()
# returns and what the service-center based hub builder iterates over.
REQUIRED_SERVICES = [
    {
        "serviceKey": "chat",
        "meta": {"usage": "File info, document handling"},
    },
    {
        "serviceKey": "ai",
        "meta": {"usage": "AI calls, conversation name generation"},
    },
    {
        "serviceKey": "billing",
        "meta": {"usage": "Usage tracking, balance checks"},
    },
    {
        "serviceKey": "streaming",
        "meta": {"usage": "Event manager, ChatStreamingHelper"},
    },
]
# Template roles shipped with this feature.
# Role labels MUST follow the convention {featureCode}-{roleName}.
# DATA rules use single-letter levels; judging by the per-role comments below,
# presumably "m" = own records, "n" = none, "a" = all - confirm against the
# RBAC data model.
TEMPLATE_ROLES = [
    {
        "roleLabel": "chatbot-viewer",
        "description": {
            "en": "Chatbot Viewer - View chat threads (read-only)",
            "de": "Chatbot Betrachter - Chat-Threads ansehen (nur lesen)",
            "fr": "Visualiseur Chatbot - Consulter les threads (lecture seule)",
        },
        "accessRules": [
            # UI: threads view only, no active chat
            {"context": "UI", "item": "ui.feature.chatbot.threads", "view": True},
            # RESOURCE: listing threads only
            {"context": "RESOURCE", "item": "resource.feature.chatbot.threads", "view": True},
            # DATA: own records, read-only
            {
                "context": "DATA",
                "item": None,
                "view": True,
                "read": "m",
                "create": "n",
                "update": "n",
                "delete": "n",
            },
        ],
    },
    {
        "roleLabel": "chatbot-user",
        "description": {
            "en": "Chatbot User - Use the chatbot and manage own threads",
            "de": "Chatbot Benutzer - Chatbot nutzen und eigene Threads verwalten",
            "fr": "Utilisateur Chatbot - Utiliser le chatbot et gérer ses threads",
        },
        "accessRules": [
            # UI: every chatbot view
            {"context": "UI", "item": "ui.feature.chatbot.conversations", "view": True},
            {"context": "UI", "item": "ui.feature.chatbot.threads", "view": True},
            # RESOURCE: start/stop chats, list threads, delete own chats
            {"context": "RESOURCE", "item": "resource.feature.chatbot.startStream", "view": True},
            {"context": "RESOURCE", "item": "resource.feature.chatbot.stop", "view": True},
            {"context": "RESOURCE", "item": "resource.feature.chatbot.threads", "view": True},
            {"context": "RESOURCE", "item": "resource.feature.chatbot.delete", "view": True},
            # DATA: own records
            {
                "context": "DATA",
                "item": None,
                "view": True,
                "read": "m",
                "create": "m",
                "update": "m",
                "delete": "m",
            },
        ],
    },
    {
        "roleLabel": "chatbot-admin",
        "description": {
            "en": "Chatbot Admin - Full access to all chatbot features",
            "de": "Chatbot Admin - Vollzugriff auf alle Chatbot-Funktionen",
            "fr": "Administrateur Chatbot - Accès complet à toutes les fonctions chatbot",
        },
        "accessRules": [
            # UI: everything (item=None)
            {"context": "UI", "item": None, "view": True},
            # RESOURCE: everything (item=None)
            {"context": "RESOURCE", "item": None, "view": True},
            # DATA: full access
            {
                "context": "DATA",
                "item": None,
                "view": True,
                "read": "a",
                "create": "a",
                "update": "a",
                "delete": "a",
            },
        ],
    },
]
def getFeatureDefinition() -> Dict[str, Any]:
    """Build the registration payload for this feature.

    Returns:
        A dict with the keys "code", "label" and "icon", filled from the
        module-level FEATURE_CODE, FEATURE_LABEL and FEATURE_ICON constants.
    """
    definition: Dict[str, Any] = dict(
        code=FEATURE_CODE,
        label=FEATURE_LABEL,
        icon=FEATURE_ICON,
    )
    return definition
def getRequiredServiceKeys() -> List[str]:
    """Collect the "serviceKey" of every entry in REQUIRED_SERVICES, in order."""
    serviceKeys = [requirement["serviceKey"] for requirement in REQUIRED_SERVICES]
    return serviceKeys
def getChatbotServices(
    user,
    mandateId: Optional[str] = None,
    featureInstanceId: Optional[str] = None,
    workflow=None,
) -> Any:
    """
    Build a chatbot service hub through the service center.

    Every service listed in REQUIRED_SERVICES is resolved via getService() and
    attached to the hub under its service key; a service that fails to resolve
    is logged as a warning and attached as None instead of aborting.

    NOTE(review): a second function named getChatbotServices is defined later
    in this file and rebinds the name at import time, so this version appears
    to be unreachable by name - confirm which one is intended and drop the other.

    Args:
        user: User the services are resolved for.
        mandateId: Optional mandate scope.
        featureInstanceId: Optional feature-instance scope.
        workflow: Optional workflow object; when None a placeholder object
            exposing only featureCode is handed to the service context.

    Returns:
        A _ChatbotServiceHub carrying user, mandateId, featureInstanceId,
        workflow, interfaceDbComponent and one attribute per required service.
    """
    from modules.serviceCenter import getService
    from modules.serviceCenter.context import ServiceCenterContext
    from modules.interfaces.interfaceDbManagement import getInterface as getComponentInterface

    # The context always gets an object with featureCode (billing etc. read it);
    # the hub itself keeps the caller's original workflow value, even if None.
    if workflow is None:
        contextWorkflow = type("_Placeholder", (), {"featureCode": FEATURE_CODE})()
    else:
        contextWorkflow = workflow

    ctx = ServiceCenterContext(
        user=user,
        mandate_id=mandateId,
        feature_instance_id=featureInstanceId,
        workflow=contextWorkflow,
    )

    hub = _ChatbotServiceHub()
    hub.user = user
    hub.mandateId = mandateId
    hub.featureInstanceId = featureInstanceId
    hub.workflow = workflow
    hub.interfaceDbComponent = getComponentInterface(
        user,
        mandateId=mandateId,
        featureInstanceId=featureInstanceId,
    )

    for requirement in REQUIRED_SERVICES:
        serviceKey = requirement["serviceKey"]
        try:
            resolved = getService(serviceKey, ctx, legacy_hub=None)
            setattr(hub, serviceKey, resolved)
        except Exception as e:
            # Best effort: a missing service must not prevent hub creation.
            logger.warning(f"Could not resolve service '{serviceKey}' for chatbot: {e}")
            setattr(hub, serviceKey, None)

    return hub
def getChatStreamingHelper():
    """
    Fetch the ChatStreamingHelper via the service center's streaming service.

    A placeholder user and an otherwise empty context are used, since the
    context is only needed to resolve the service.

    Returns:
        Whatever streaming.getChatStreamingHelper() returns, or None when the
        streaming service resolves to a falsy value.
    """
    from modules.serviceCenter import getService
    from modules.serviceCenter.context import ServiceCenterContext

    placeholderCtx = ServiceCenterContext(
        user=__get_placeholder_user(),
        mandate_id=None,
        feature_instance_id=None,
    )
    streamingService = getService("streaming", placeholderCtx, legacy_hub=None)
    if not streamingService:
        return None
    return streamingService.getChatStreamingHelper()
def __get_placeholder_user():
    """Create the "system" stand-in User for contexts that only resolve services."""
    from modules.datamodels.datamodelUam import User

    placeholderFields = dict(
        id="system",
        username="system",
        email=None,
        fullName="System Placeholder",
    )
    return User(**placeholderFields)
def getEventManager(user, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
    """
    Return the event manager exposed by the streaming service (SSE streaming,
    used by the chatbot routes).

    Args:
        user: User the service context is built for.
        mandateId: Optional mandate scope.
        featureInstanceId: Optional feature-instance scope.

    Returns:
        The result of streaming.getEventManager(). No None check is made on
        the resolved streaming service.
    """
    from modules.serviceCenter import getService
    from modules.serviceCenter.context import ServiceCenterContext

    contextArgs = dict(
        user=user,
        mandate_id=mandateId,
        feature_instance_id=featureInstanceId,
    )
    streamingService = getService("streaming", ServiceCenterContext(**contextArgs), legacy_hub=None)
    eventManager = streamingService.getEventManager()
    return eventManager
class _ChatbotServiceHub:
"""Lightweight hub exposing only services required by the chatbot feature."""
user = None
mandateId = None
featureInstanceId = None
workflow = None
interfaceDbComponent = None
chat = None
ai = None
billing = None
streaming = None
featureCode = "chatbot"
allowedProviders = None
def getUiObjects() -> List[Dict[str, Any]]:
    """Expose the module-level UI_OBJECTS list (same object, not a copy) for RBAC catalog registration."""
    uiObjects = UI_OBJECTS
    return uiObjects
def getResourceObjects() -> List[Dict[str, Any]]:
    """Expose the module-level RESOURCE_OBJECTS list (same object, not a copy) for RBAC catalog registration."""
    resourceObjects = RESOURCE_OBJECTS
    return resourceObjects
def getTemplateRoles() -> List[Dict[str, Any]]:
    """Expose the module-level TEMPLATE_ROLES list (same object, not a copy)."""
    templateRoles = TEMPLATE_ROLES
    return templateRoles
def registerFeature(catalogService) -> bool:
    """
    Register this feature's UI and resource objects in the RBAC catalog, then
    sync the template roles to the database.

    Args:
        catalogService: RBAC catalog service; must provide registerUiObject()
            and registerResourceObject() accepting featureCode, objectKey,
            label and meta keyword arguments.

    Returns:
        True when every step ran without raising, False when an exception was
        caught (it is logged as an error, not re-raised). The role sync helper
        handles its own errors, so a failed role sync does not turn the result
        into False.
    """
    try:
        # UI objects first, then resource objects - each entry's optional
        # "meta" is forwarded as None when absent.
        for uiEntry in UI_OBJECTS:
            uiArgs = dict(
                featureCode=FEATURE_CODE,
                objectKey=uiEntry["objectKey"],
                label=uiEntry["label"],
                meta=uiEntry.get("meta"),
            )
            catalogService.registerUiObject(**uiArgs)

        for resourceEntry in RESOURCE_OBJECTS:
            resourceArgs = dict(
                featureCode=FEATURE_CODE,
                objectKey=resourceEntry["objectKey"],
                label=resourceEntry["label"],
                meta=resourceEntry.get("meta"),
            )
            catalogService.registerResourceObject(**resourceArgs)

        # Persist template roles and their access rules.
        _syncTemplateRolesToDb()

        logger.info(f"Feature '{FEATURE_CODE}' registered {len(UI_OBJECTS)} UI objects and {len(RESOURCE_OBJECTS)} resource objects")
        return True
    except Exception as e:
        logger.error(f"Failed to register feature '{FEATURE_CODE}': {e}")
        return False
def getChatbotServices(
    user,
    mandateId: Optional[str] = None,
    featureInstanceId: Optional[str] = None,
    workflow=None,
) -> "_ChatbotServiceHub":
    """
    Assemble the lightweight chatbot service hub (chat, ai, streaming, billing)
    without going through the full legacy Services hub.

    Only the services the chatbot needs are instantiated; per the original
    author's note this avoids roughly 90 ms of feature interface/service
    loading. The chatbot feature interface (ChatObjects) is used as
    interfaceDbChat so that chatProcess can reuse hub.interfaceDbChat instead
    of initialising the DB interface a second time.

    Args:
        user: User the hub is created for.
        mandateId: Optional mandate scope.
        featureInstanceId: Optional feature-instance scope.
        workflow: Optional workflow; a falsy value is replaced by a placeholder
            exposing only featureCode when resolving the billing service.

    Returns:
        A populated _ChatbotServiceHub. hub.billing is None when the billing
        service could not be resolved (a warning is logged).
    """
    from modules.services import PublicService
    from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
    from modules.features.chatbot.interfaceFeatureChatbot import getInterface as getChatbotInterface
    from modules.services.serviceChat.mainServiceChat import ChatService
    from modules.services.serviceAi.mainServiceAi import AiService
    from modules.services.serviceStreaming.mainServiceStreaming import StreamingService

    hub = _ChatbotServiceHub()

    # Request scope and feature identity
    hub.user = user
    hub.mandateId = mandateId
    hub.featureInstanceId = featureInstanceId
    hub.workflow = workflow
    hub.featureCode = "chatbot"
    hub.allowedProviders = None

    # Database interfaces
    hub.interfaceDbApp = getAppInterface(user, mandateId=mandateId)
    # interfaceDbComponent is created lazily on first access
    # (reportedly saves ~100-300 ms when there are no file uploads).
    hub._interfaceDbComponent_val = None
    # ChatObjects interface - the same one chatProcess uses.
    hub.interfaceDbChat = getChatbotInterface(
        user,
        mandateId=mandateId,
        featureInstanceId=featureInstanceId,
    )

    # Services are built against the hub in this order: chat, ai, streaming.
    chatService = ChatService(hub)
    hub.chat = PublicService(chatService)
    aiService = AiService(hub)
    hub.ai = PublicService(aiService, functionsOnly=False)
    streamingService = StreamingService(hub)
    hub.streaming = PublicService(streamingService)

    # Billing comes from the service center (needed for the preflight billing
    # check and the billing callback); failure to resolve it is non-fatal.
    try:
        from modules.serviceCenter import getService
        from modules.serviceCenter.context import ServiceCenterContext

        if workflow:
            contextWorkflow = workflow
        else:
            contextWorkflow = type("_Placeholder", (), {"featureCode": FEATURE_CODE})()
        billingCtx = ServiceCenterContext(
            user=user,
            mandate_id=mandateId,
            feature_instance_id=featureInstanceId,
            workflow=contextWorkflow,
        )
        hub.billing = getService("billing", billingCtx, legacy_hub=None)
    except Exception as e:
        logger.warning(f"Could not resolve billing service for chatbot: {e}")
        hub.billing = None

    return hub
class _ChatbotServiceHub:
"""Lightweight hub with chat, ai, streaming for chatbot; avoids full Services init."""
user = None
mandateId = None
featureInstanceId = None
workflow = None
interfaceDbApp = None
_interfaceDbComponent_val = None
interfaceDbChat = None
@property
def interfaceDbComponent(self):
"""Lazy-load interfaceDbComponent on first access (saves ~100300 ms when no files)."""
if self._interfaceDbComponent_val is None:
from modules.interfaces.interfaceDbManagement import getInterface as getComponentInterface
self._interfaceDbComponent_val = getComponentInterface(
self.user, mandateId=self.mandateId, featureInstanceId=self.featureInstanceId
)
return self._interfaceDbComponent_val
chat = None
ai = None
billing = None
streaming = None
featureCode = "chatbot"
allowedProviders = None
def _syncTemplateRolesToDb() -> int:
    """
    Sync TEMPLATE_ROLES and their AccessRules to the database.

    For every template role: if a global template role (mandateId is None)
    with the same label already exists for this feature, only its missing
    AccessRules are added; otherwise the role is created as a global template
    and its AccessRules are created.

    Returns:
        Number of roles newly created (existing roles that only received
        additional rules are not counted). Returns 0 when any exception
        occurs; the error is logged and not re-raised.
    """
    try:
        from modules.interfaces.interfaceDbApp import getRootInterface
        from modules.datamodels.datamodelRbac import Role, AccessRule, AccessRuleContext

        rootInterface = getRootInterface()

        # Map roleLabel -> role id for the existing global template roles
        # (mandateId is None) of this feature.
        roleIdByLabel = {}
        for knownRole in rootInterface.getRolesByFeatureCode(FEATURE_CODE):
            if knownRole.mandateId is None:
                roleIdByLabel[knownRole.roleLabel] = str(knownRole.id)

        createdCount = 0
        for roleTemplate in TEMPLATE_ROLES:
            roleLabel = roleTemplate["roleLabel"]
            ruleTemplates = roleTemplate.get("accessRules", [])

            if roleLabel in roleIdByLabel:
                # Role already present: only top up its AccessRules.
                _ensureAccessRulesForRole(rootInterface, roleIdByLabel[roleLabel], ruleTemplates)
                continue

            # Role missing: create it as a global template (no mandate, no instance).
            roleModel = Role(
                roleLabel=roleLabel,
                description=roleTemplate.get("description", {}),
                featureCode=FEATURE_CODE,
                mandateId=None,
                featureInstanceId=None,
                isSystemRole=False,
            )
            storedRole = rootInterface.db.recordCreate(Role, roleModel.model_dump())
            roleId = storedRole.get("id")
            _ensureAccessRulesForRole(rootInterface, roleId, ruleTemplates)
            logger.info(f"Created template role '{roleLabel}' with ID {roleId}")
            createdCount += 1

        if createdCount > 0:
            logger.info(f"Feature '{FEATURE_CODE}': Created {createdCount} template roles")
        return createdCount
    except Exception as e:
        logger.error(f"Error syncing template roles for feature '{FEATURE_CODE}': {e}")
        return 0
def _ensureAccessRulesForRole(rootInterface, roleId: str, ruleTemplates: List[Dict[str, Any]]) -> int:
    """
    Ensure AccessRules exist for a role based on templates.

    A rule is identified by its (context, item) signature. Templates whose
    signature already exists for the role are skipped; all others are created.
    Signatures created during this call are remembered as well, so a template
    list that repeats the same (context, item) pair yields a single rule.

    Args:
        rootInterface: Root interface instance; must provide
            getAccessRulesByRole() and db.recordCreate().
        roleId: Role ID the rules belong to.
        ruleTemplates: List of rule templates (dicts with "context", "item",
            "view" and optionally "read"/"create"/"update"/"delete").

    Returns:
        Number of rules created.
    """
    from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext

    # Template context string -> enum member. Unknown strings are passed
    # through unchanged and left for AccessRule to accept or reject.
    contextByName = {
        "UI": AccessRuleContext.UI,
        "DATA": AccessRuleContext.DATA,
        "RESOURCE": AccessRuleContext.RESOURCE,
    }

    # Signatures of the rules already stored for this role.
    # IMPORTANT: use .value for the enum comparison, not str(), which gives
    # "AccessRuleContext.DATA" in Python 3.11+.
    # NOTE(review): assumes the enum values equal the template strings
    # ("UI", "DATA", "RESOURCE") - confirm against AccessRuleContext.
    existingSignatures = {
        (rule.context.value if rule.context else None, rule.item)
        for rule in rootInterface.getAccessRulesByRole(roleId)
    }

    createdCount = 0
    for template in ruleTemplates:
        context = template.get("context", "UI")
        item = template.get("item")
        sig = (context, item)
        if sig in existingSignatures:
            continue

        newRule = AccessRule(
            roleId=roleId,
            context=contextByName.get(context, context),
            item=item,
            view=template.get("view", False),
            read=template.get("read"),
            create=template.get("create"),
            update=template.get("update"),
            delete=template.get("delete"),
        )
        rootInterface.db.recordCreate(AccessRule, newRule.model_dump())
        # Remember the new signature so a repeated template entry in the same
        # list does not create a duplicate rule.
        existingSignatures.add(sig)
        createdCount += 1

    if createdCount > 0:
        logger.debug(f"Created {createdCount} AccessRules for role {roleId}")
    return createdCount