diff --git a/modules/features/chatbotV2/__init__.py b/modules/features/chatbotV2/__init__.py deleted file mode 100644 index 70c9f29f..00000000 --- a/modules/features/chatbotV2/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# Copyright (c) 2025 Patrick Motsch -# All rights reserved. -"""Chatbot V2 feature - context-aware chat with file upload and extraction.""" diff --git a/modules/features/chatbotV2/bridges/__init__.py b/modules/features/chatbotV2/bridges/__init__.py deleted file mode 100644 index 5d6343cb..00000000 --- a/modules/features/chatbotV2/bridges/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -# Copyright (c) 2025 Patrick Motsch -# All rights reserved. -"""Chatbot V2 bridges - AI and memory.""" - -from modules.features.chatbot.bridges.ai import AICenterChatModel, clear_workflow_allowed_providers -from .memory import ChatbotV2Checkpointer - -__all__ = ["AICenterChatModel", "clear_workflow_allowed_providers", "ChatbotV2Checkpointer"] diff --git a/modules/features/chatbotV2/bridges/memory.py b/modules/features/chatbotV2/bridges/memory.py deleted file mode 100644 index ce6c80ed..00000000 --- a/modules/features/chatbotV2/bridges/memory.py +++ /dev/null @@ -1,187 +0,0 @@ -# Copyright (c) 2025 Patrick Motsch -# All rights reserved. -""" -Chatbot V2 checkpointer - maps LangGraph state to ChatbotV2 message storage. -""" - -import logging -import uuid -from typing import Any, Dict, List, Optional, Tuple, NamedTuple - -from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage -from langgraph.checkpoint.base import BaseCheckpointSaver, Checkpoint, CheckpointMetadata - - -class CheckpointTuple(NamedTuple): - """Tuple containing config, checkpoint, metadata, parent_config, and pending_writes.""" - config: Dict[str, Any] - checkpoint: Checkpoint - metadata: CheckpointMetadata - parent_config: Optional[Dict[str, Any]] = None - pending_writes: Optional[List[Tuple[str, Any]]] = None - -from modules.features.chatbotV2.interfaceFeatureChatbotV2 import getInterface as getChatbotV2Interface -from modules.features.chatbotV2.datamodelFeatureChatbotV2 import ChatbotV2Message -from modules.datamodels.datamodelUam import User -from modules.shared.timeUtils import getUtcTimestamp - -logger = logging.getLogger(__name__) - - -def _sanitize_llm_response(text: str) -> str: - """Strip chat template tokens and trailing junk.""" - if not text or not isinstance(text, str): - return text or "" - for sentinel in ("<|im_start|>", "<|im_end|>", "<|endoftext|>", "<|user|>", "<|assistant|>"): - if sentinel in text: - text = text.split(sentinel)[0] - return text.strip() - - -class ChatbotV2Checkpointer(BaseCheckpointSaver): - """Checkpointer that stores messages via ChatbotV2 interface.""" - - def __init__( - self, - user: User, - workflow_id: str, - mandateId: Optional[str] = None, - featureInstanceId: Optional[str] = None - ): - self.user = user - self.workflow_id = workflow_id - self.interface = getChatbotV2Interface( - user, - mandateId=mandateId, - featureInstanceId=featureInstanceId - ) - - def _to_db_message( - self, - msg: BaseMessage, - sequence_nr: int, - round_number: int - ) -> Dict[str, Any]: - role = "user" - content = "" - if isinstance(msg, HumanMessage): - role = "user" - content = msg.content if isinstance(msg.content, str) else str(msg.content or "") - elif isinstance(msg, AIMessage): - role = "assistant" - content = msg.content if isinstance(msg.content, str) else str(msg.content or "") - content = _sanitize_llm_response(content) - elif isinstance(msg, SystemMessage): - role = "system" - content = msg.content if isinstance(msg.content, str) else str(msg.content or "") - return { - "id": str(uuid.uuid4()), - "conversationId": self.workflow_id, - "message": content, - "role": role, - "status": "step" if sequence_nr > 1 else "first", - "sequenceNr": sequence_nr, - "publishedAt": getUtcTimestamp(), - "roundNumber": round_number - } - - def _to_langchain(self, messages: List[ChatbotV2Message]) -> List[BaseMessage]: - result = [] - for m in messages: - if m.role == "user": - result.append(HumanMessage(content=m.message or "")) - elif m.role == "assistant": - result.append(AIMessage(content=m.message or "")) - elif m.role == "system": - result.append(SystemMessage(content=m.message or "")) - return result - - def put( - self, - config: Dict[str, Any], - checkpoint: Checkpoint, - metadata: CheckpointMetadata, - new_versions: Dict[str, int], - ) -> None: - thread_id = config.get("configurable", {}).get("thread_id", self.workflow_id) - conv = self.interface.getConversation(thread_id) - if not conv: - logger.warning(f"Conversation {thread_id} not found") - return - round_number = conv.currentRound or 1 - state = checkpoint.get("channel_values", {}) - messages = state.get("messages", []) - if not messages: - return - existing = self.interface.getMessages(thread_id) - existing_set = {(m.role, m.message) for m in existing} - existing_count = len(existing) - for i, msg in enumerate(messages): - if not isinstance(msg, (HumanMessage, AIMessage)): - continue - role = "user" if isinstance(msg, HumanMessage) else "assistant" - content = msg.content if isinstance(msg.content, str) else str(msg.content or "") - if isinstance(msg, AIMessage): - content = _sanitize_llm_response(content) - if not content or not content.strip(): - continue - if (role, content) in existing_set: - continue - existing_set.add((role, content)) - existing_count += 1 - db_msg = self._to_db_message(msg, existing_count, round_number) - self.interface.createMessage(db_msg) - self.interface.updateConversation(thread_id, {"lastActivity": getUtcTimestamp()}) - - def get(self, config: Dict[str, Any]) -> Optional[Checkpoint]: - thread_id = config.get("configurable", {}).get("thread_id", self.workflow_id) - conv = self.interface.getConversation(thread_id) - if not conv: - return None - messages = self.interface.getMessages(thread_id) - lc_messages = self._to_langchain(messages) - return { - "id": str(uuid.uuid4()), - "v": 1, - "ts": getUtcTimestamp(), - "channel_values": {"messages": lc_messages}, - "channel_versions": {}, - "versions_seen": {} - } - - # Async methods required for LangGraph ainvoke/astream - async def aget_tuple( - self, - config: Dict[str, Any], - ) -> Optional[CheckpointTuple]: - """Async version of get that returns tuple of (config, checkpoint, metadata).""" - checkpoint = self.get(config) - if checkpoint: - metadata: CheckpointMetadata = {"step": 0} - return CheckpointTuple( - config=config, - checkpoint=checkpoint, - metadata=metadata, - parent_config=None, - pending_writes=None, - ) - return None - - async def aput( - self, - config: Dict[str, Any], - checkpoint: Checkpoint, - metadata: CheckpointMetadata, - new_versions: Dict[str, int], - ) -> None: - """Async version of put.""" - self.put(config, checkpoint, metadata, new_versions) - - async def aput_writes( - self, - config: Dict[str, Any], - writes: List[Tuple[str, Any]], - task_id: str, - ) -> None: - """Async version of put_writes. No-op - writes are handled through aput().""" - pass diff --git a/modules/features/chatbotV2/chatbotV2.py b/modules/features/chatbotV2/chatbotV2.py deleted file mode 100644 index 75ce6aa9..00000000 --- a/modules/features/chatbotV2/chatbotV2.py +++ /dev/null @@ -1,210 +0,0 @@ -# Copyright (c) 2025 Patrick Motsch -# All rights reserved. -""" -Chatbot V2 domain logic - simple chat LangGraph with context injection. -Uses chunk-based retrieval with retry: when AI says "nicht enthalten", -tries the next chunk batch until content is found or all chunks searched. -""" - -import asyncio -import logging -import re -from typing import Annotated, Optional, TYPE_CHECKING, List, Dict, Any - -from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage -from langgraph.graph.message import add_messages -from langgraph.graph import StateGraph, START, END -from langgraph.graph.state import CompiledStateGraph -from pydantic import BaseModel - -from modules.features.chatbotV2.bridges import AICenterChatModel, ChatbotV2Checkpointer -from modules.features.chatbotV2.contextChunkRetrieval import ( - chunk_sections, - chunk_text_blocks, - get_ordered_chunks_for_question, - get_chunk_batch, - response_indicates_not_found, - DEFAULT_CHUNK_SIZE, - DEFAULT_CHUNK_OVERLAP, -) - -if TYPE_CHECKING: - from modules.features.chatbotV2.config import ChatbotV2Config - -logger = logging.getLogger(__name__) - - -class ChatState(BaseModel): - """State for Chatbot V2 chat session.""" - messages: Annotated[list[BaseMessage], add_messages] - # Optional context for chunk retrieval (passed at invoke, not persisted) - chatbotv2_context: Optional[Dict[str, Any]] = None - - -# Default max context chars (~20k tokens) - fits GPT 25k limit with room for prompt + response -DEFAULT_MAX_CONTEXT_CHARS = 60_000 - - -def _build_system_prompt_from_chunks( - base_prompt: str, - chunks: List[Dict[str, Any]] -) -> str: - """Build system prompt from a list of chunks.""" - if not chunks: - return base_prompt - header = "\n\n--- DOCUMENT CONTEXT (use this to answer user questions) ---\n" - parts = [base_prompt, header] - current_file = None - for chunk in chunks: - fn = chunk.get("fileName", "document") - if fn != current_file: - parts.append(f"\n### {fn}\n") - current_file = fn - parts.append(chunk.get("text", "")) - return "\n".join(parts) - - -def build_context_system_prompt( - base_prompt: str, - extracted_context: dict, - user_question: str, - max_context_chars: Optional[int] = None, - chunk_size: Optional[int] = None, - chunk_overlap: Optional[int] = None, -) -> str: - """ - Build system prompt with first chunk batch (for single-call use). - For retry loop, use get_ordered_chunks_for_question + get_chunk_batch + _build_system_prompt_from_chunks. - """ - chunks = _get_all_chunks(extracted_context, chunk_size, chunk_overlap) - if not chunks: - return base_prompt - ordered = get_ordered_chunks_for_question(chunks, user_question or "") - selected = get_chunk_batch(ordered, 0, max_context_chars or DEFAULT_MAX_CONTEXT_CHARS) - return _build_system_prompt_from_chunks(base_prompt, selected) - - -def _get_all_chunks( - extracted_context: dict, - chunk_size: Optional[int], - chunk_overlap: Optional[int], -) -> List[Dict[str, Any]]: - """Get all chunks from extracted context.""" - sections = extracted_context.get("sections", []) - text_blocks = extracted_context.get("textBlocks", []) - if not sections and not text_blocks: - return [] - cs = chunk_size if chunk_size and chunk_size > 0 else DEFAULT_CHUNK_SIZE - co = chunk_overlap if chunk_overlap is not None and chunk_overlap >= 0 else DEFAULT_CHUNK_OVERLAP - if sections: - return chunk_sections(sections, chunk_size=cs, chunk_overlap=co) - return chunk_text_blocks(text_blocks, chunk_size=cs, chunk_overlap=co) - - -def create_chat_graph( - model: AICenterChatModel, - memory: ChatbotV2Checkpointer, -) -> CompiledStateGraph: - """ - Create chat graph with retry loop: when AI says content not found, - tries next chunk batch until found or exhausted. - Context params passed via state.chatbotv2_context at invoke time. - """ - - async def chat_node(state: ChatState) -> dict: - # State can be dict (LangGraph) or Pydantic model - state_dict = state if isinstance(state, dict) else (state.model_dump() if hasattr(state, "model_dump") else {}) - msgs = state_dict.get("messages", []) - if not msgs: - return {} - - ctx = state_dict.get("chatbotv2_context") or {} - ctx_dict = ctx.get("ctx_dict", {}) - user_question = ctx.get("user_question", "") - base_prompt = ctx.get("base_prompt", "Answer based on the provided context.") - max_chars = ctx.get("max_context_chars") or DEFAULT_MAX_CONTEXT_CHARS - chunk_size = ctx.get("chunk_size") or DEFAULT_CHUNK_SIZE - chunk_overlap = ctx.get("chunk_overlap") - - if max_chars <= 0: - max_chars = DEFAULT_MAX_CONTEXT_CHARS - if chunk_overlap is None or chunk_overlap < 0: - chunk_overlap = DEFAULT_CHUNK_OVERLAP - - user_msgs = [m for m in msgs if not isinstance(m, SystemMessage)] - if not user_msgs: - return {} - - # Get chunks - use DOCUMENT ORDER for retry (batch 0 = start, batch 1 = next part, etc.) - chunks = _get_all_chunks(ctx_dict, chunk_size, chunk_overlap) - if not chunks: - logger.warning("No chunks from ctx_dict - sections=%s, textBlocks=%s", - len(ctx_dict.get("sections", [])), len(ctx_dict.get("textBlocks", []))) - # Always use document order (chunkIndex) for systematic search through entire document - ordered = sorted(chunks, key=lambda c: c.get("chunkIndex", 0)) - batch_index = 0 - last_response = None - - logger.info("Chunk retrieval: %d chunks total, max_chars=%d, will try batches until found or exhausted", - len(ordered), max_chars) - - while True: - batch = get_chunk_batch(ordered, batch_index, max_chars) if ordered else [] - if not batch: - # No more chunks - return last response or final message - if last_response: - return {"messages": [last_response]} - return {"messages": [AIMessage( - content="Ich habe das gesamte Dokument durchsucht, konnte aber keine " - "passende Information zu Ihrer Frage finden. Bitte formulieren Sie die Frage " - "ggf. anders oder prüfen Sie, ob das Dokument die gewünschten Angaben enthält." - )]} - - system_prompt = _build_system_prompt_from_chunks(base_prompt, batch) - window = [SystemMessage(content=system_prompt)] + user_msgs - - response = None - for attempt in range(3): # Max 3 attempts (initial + 2 retries on rate limit) - try: - response = await model.ainvoke(window) - break - except Exception as exc: - err_str = str(exc).lower() - if ("429" in err_str or "rate limit" in err_str) and attempt < 2: - wait_secs = 6 - match = re.search(r"try again in ([\d.]+)s", err_str, re.IGNORECASE) - if match: - wait_secs = max(6, int(float(match.group(1))) + 1) - logger.warning("Rate limit hit on chunk batch %d, waiting %ds before retry (attempt %d/3)", - batch_index, wait_secs, attempt + 1) - await asyncio.sleep(wait_secs) - else: - if "No suitable model found" in str(exc): - return {"messages": [AIMessage( - content="Es tut mir leid, derzeit steht kein passendes KI-Modell zur Verfügung. " - "Bitte versuchen Sie es später erneut." - )]} - raise - - if response is None: - return {"messages": [AIMessage( - content="Ein Fehler ist aufgetreten. Bitte versuchen Sie es später erneut." - )]} - - content = response.content if hasattr(response, "content") else str(response) - if response_indicates_not_found(content): - logger.info("Chunk batch %d: AI said not found (%.0f chars), trying next batch", - batch_index, len(content)) - batch_index += 1 - last_response = response - await asyncio.sleep(5) # Pause before next batch to avoid rate limits - continue - - logger.info("Chunk batch %d: Found answer (%.0f chars)", batch_index, len(content)) - return {"messages": [response]} - - workflow = StateGraph(ChatState) - workflow.add_node("chat", chat_node) - workflow.add_edge(START, "chat") - workflow.add_edge("chat", END) - return workflow.compile(checkpointer=memory) diff --git a/modules/features/chatbotV2/config.py b/modules/features/chatbotV2/config.py deleted file mode 100644 index e63ba0ae..00000000 --- a/modules/features/chatbotV2/config.py +++ /dev/null @@ -1,98 +0,0 @@ -# Copyright (c) 2025 Patrick Motsch -# All rights reserved. -""" -Configuration system for Chatbot V2 instances. -Loads configuration from FeatureInstance.config JSONB field. -""" - -import logging -from dataclasses import dataclass -from typing import Optional, Dict, Any, List, TYPE_CHECKING - -if TYPE_CHECKING: - from modules.datamodels.datamodelFeatures import FeatureInstance - -logger = logging.getLogger(__name__) - -_config_cache: Dict[str, 'ChatbotV2Config'] = {} - -DEFAULT_SYSTEM_PROMPT = ( - "You are a helpful assistant. Answer questions based on the provided context documents. " - "When the user asks about the documents, use the extracted content to provide accurate answers. " - "If the context does not contain relevant information, say so." -) - - -@dataclass -class ModelConfig: - """Model configuration for Chatbot V2.""" - operationType: str = "DATA_ANALYSE" - processingMode: str = "BASIC" - allowedProviders: List[str] = None - - def __post_init__(self): - if self.allowedProviders is None: - self.allowedProviders = [] - - -def _parse_int(val: Optional[Any], default: Optional[int] = None) -> Optional[int]: - """Parse int from config value.""" - if val is None: - return default - if isinstance(val, int): - return val - try: - return int(val) - except (TypeError, ValueError): - return default - - -@dataclass -class ChatbotV2Config: - """Configuration for a Chatbot V2 instance.""" - - id: str - name: str - systemPrompt: str - model: ModelConfig - maxContextChars: Optional[int] = None # Max document chars in system prompt (~60k ≈ 20k tokens). None = default 60k. - chunkSize: Optional[int] = None # Chunk size in chars (~15k). None = default. - chunkOverlap: Optional[int] = None # Overlap between chunks in chars (~500). None = default. - - @classmethod - def from_dict(cls, data: Dict[str, Any], config_id: str = "default") -> 'ChatbotV2Config': - """Create ChatbotV2Config from dictionary.""" - system_prompt = data.get("systemPrompt") or DEFAULT_SYSTEM_PROMPT - - model_data = data.get("model", {}) - allowed_providers = model_data.get("allowedProviders") or data.get("allowedProviders", []) - model_config = ModelConfig( - operationType=model_data.get("operationType", "DATA_ANALYSE"), - processingMode=model_data.get("processingMode", "BASIC"), - allowedProviders=allowed_providers if isinstance(allowed_providers, list) else [] - ) - - return cls( - id=data.get("id", config_id), - name=data.get("name", "Chatbot V2"), - systemPrompt=system_prompt, - model=model_config, - maxContextChars=_parse_int(data.get("maxContextChars")), - chunkSize=_parse_int(data.get("chunkSize")), - chunkOverlap=_parse_int(data.get("chunkOverlap")) - ) - - -def load_chatbotv2_config_from_instance(instance: 'FeatureInstance') -> ChatbotV2Config: - """Load Chatbot V2 configuration from a FeatureInstance's config field.""" - instance_id = instance.id - - cache_key = f"instance_{instance_id}" - if cache_key in _config_cache: - return _config_cache[cache_key] - - config_data = instance.config or {} - config = ChatbotV2Config.from_dict(config_data, config_id=instance_id) - _config_cache[cache_key] = config - logger.info(f"Loaded chatbotv2 config from instance {instance_id}") - return config diff --git a/modules/features/chatbotV2/contextChunkRetrieval.py b/modules/features/chatbotV2/contextChunkRetrieval.py deleted file mode 100644 index 7e77c0f7..00000000 --- a/modules/features/chatbotV2/contextChunkRetrieval.py +++ /dev/null @@ -1,272 +0,0 @@ -# Copyright (c) 2025 Patrick Motsch -# All rights reserved. -""" -Chunk-based context retrieval for Chatbot V2. -Splits documents into chunks and selects relevant chunks per user question. -If no relevant chunk is found, falls back to next chunks in document order - no context is lost. -""" - -import re -import logging -from typing import List, Dict, Any, Optional - -logger = logging.getLogger(__name__) - -# Default chunk size (~5k tokens each), overlap for context continuity -DEFAULT_CHUNK_SIZE = 15_000 -DEFAULT_CHUNK_OVERLAP = 500 - -# Stopwords for relevance scoring (DE/EN/FR - minimal set) -STOPWORDS = { - "der", "die", "das", "den", "dem", "des", "ein", "eine", "einer", "eines", - "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for", - "of", "with", "by", "from", "is", "are", "was", "were", "be", "been", - "le", "la", "les", "un", "une", "des", "du", "de", "et", "ou", - "was", "wer", "wie", "wo", "wann", "warum", "welche", "welcher", -} - - -def _extract_keywords(text: str) -> set: - """Extract significant words for relevance scoring.""" - text_lower = text.lower() - words = re.findall(r"\b[a-zàâäéèêëïîôùûüÿæœçß]{2,}\b", text_lower) - return {w for w in words if w not in STOPWORDS and len(w) > 1} - - -def chunk_sections( - sections: List[Dict[str, Any]], - chunk_size: int = DEFAULT_CHUNK_SIZE, - chunk_overlap: int = DEFAULT_CHUNK_OVERLAP -) -> List[Dict[str, Any]]: - """ - Split sections into overlapping chunks. - Each chunk has: chunkIndex, text, fileName, fileId, sectionIndex. - """ - chunks = [] - chunk_index = 0 - - for sec_idx, section in enumerate(sections): - text = section.get("text", "") - file_name = section.get("fileName", "document") - file_id = section.get("fileId", "") - - if not text: - continue - - start = 0 - while start < len(text): - end = min(start + chunk_size, len(text)) - chunk_text = text[start:end] - chunks.append({ - "chunkIndex": chunk_index, - "text": chunk_text, - "fileName": file_name, - "fileId": file_id, - "sectionIndex": sec_idx, - "startChar": start, - "endChar": end, - }) - chunk_index += 1 - start = end - chunk_overlap if end < len(text) else len(text) - - return chunks - - -def chunk_text_blocks( - text_blocks: List[Dict[str, Any]], - chunk_size: int = DEFAULT_CHUNK_SIZE, - chunk_overlap: int = DEFAULT_CHUNK_OVERLAP -) -> List[Dict[str, Any]]: - """Split textBlocks (blocks per file) into chunks.""" - chunks = [] - chunk_index = 0 - - for doc in text_blocks: - blocks = doc.get("blocks", []) - file_name = doc.get("fileName", "document") - file_id = doc.get("fileId", "") - - text_parts = [] - for b in blocks: - text_parts.append(b.get("text", "")) - - full_text = "\n".join(text_parts) - if not full_text: - continue - - start = 0 - while start < len(full_text): - end = min(start + chunk_size, len(full_text)) - chunk_text = full_text[start:end] - chunks.append({ - "chunkIndex": chunk_index, - "text": chunk_text, - "fileName": file_name, - "fileId": file_id, - "startChar": start, - "endChar": end, - }) - chunk_index += 1 - start = end - chunk_overlap if end < len(full_text) else len(full_text) - - return chunks - - -def score_chunk_relevance(chunk_text: str, question: str) -> float: - """ - Score how relevant a chunk is to the user question. - Uses keyword overlap with simple IDF-like weighting. - Returns 0 if no overlap. - """ - if not question or not chunk_text: - return 0.0 - - q_words = _extract_keywords(question) - if not q_words: - return 0.0 - - chunk_lower = chunk_text.lower() - score = 0.0 - for w in q_words: - count = chunk_lower.count(w) - if count > 0: - score += 1.0 + 0.5 * min(count - 1, 3) - - return score - - -def get_ordered_chunks_for_question( - chunks: List[Dict[str, Any]], - question: str -) -> List[Dict[str, Any]]: - """ - Return chunks ordered by relevance (or doc order if no match). - Does NOT limit by max_context_chars - used for iterative batch retrieval. - """ - if not chunks: - return [] - - scored = [] - for c in chunks: - score = score_chunk_relevance(c.get("text", ""), question) - scored.append((score, c)) - - max_score = max(s for s, _ in scored) - if max_score > 0: - scored.sort(key=lambda x: (-x[0], x[1].get("chunkIndex", 0))) - else: - scored.sort(key=lambda x: x[1].get("chunkIndex", 0)) - logger.debug("No chunk matched question keywords - using chunks in document order") - - return [c for _, c in scored] - - -def get_chunk_batch( - ordered_chunks: List[Dict[str, Any]], - batch_index: int, - max_context_chars: int -) -> List[Dict[str, Any]]: - """ - Get the Nth batch of chunks that fits in max_context_chars. - Batch 0 = first chunk(s) that fit, batch 1 = next chunk(s), etc. - Returns [] when batch_index is beyond available chunks. - """ - if not ordered_chunks or batch_index < 0: - return [] - - used_chars = 0 - chunks_in_batch = 0 - start_idx = 0 - - # Find start index for this batch (skip chunks from previous batches) - for _ in range(batch_index): - batch_chars = 0 - i = start_idx - while i < len(ordered_chunks): - text = ordered_chunks[i].get("text", "") - if batch_chars + len(text) <= max_context_chars: - batch_chars += len(text) - i += 1 - else: - if batch_chars < max_context_chars and len(text) > 0: - batch_chars += min(len(text), max_context_chars - batch_chars) - i += 1 - break - start_idx = i - if start_idx >= len(ordered_chunks): - return [] - - # Collect chunks for this batch - selected = [] - for i in range(start_idx, len(ordered_chunks)): - chunk = ordered_chunks[i] - text = chunk.get("text", "") - if not text: - continue - if used_chars + len(text) <= max_context_chars: - selected.append(chunk) - used_chars += len(text) - else: - if used_chars < max_context_chars: - remaining = max_context_chars - used_chars - selected.append({**chunk, "text": text[:remaining]}) - break - - return selected - - -def select_chunks_for_question( - chunks: List[Dict[str, Any]], - question: str, - max_context_chars: int -) -> List[Dict[str, Any]]: - """ - Select first batch of chunks (for backward compatibility). - For iterative retry, use get_ordered_chunks_for_question + get_chunk_batch. - """ - ordered = get_ordered_chunks_for_question(chunks, question) - return get_chunk_batch(ordered, 0, max_context_chars) - - -# Phrases that indicate the AI found no relevant content (DE/EN/FR) -NOT_FOUND_PHRASES = [ - "nicht enthalten", - "nicht im kontext", - "nicht im dokument", - "nicht im bereitgestellten", - "nicht auffindbar", - "keine information", - "keine angaben", - "nicht gefunden", - "nicht verfügbar", - "kein hinweis", - "enthalten nicht", - "artikel nicht enthalten", - "nicht im vorliegenden", - "bereitgestellten kontext enthält nicht", - "im kontext nicht", - "not contained", - "not found", - "no information", - "not in the context", - "not in the document", - "pas dans le contexte", - "non trouvé", -] - - -def response_indicates_not_found(response_content: str) -> bool: - """ - Check if the AI response indicates that the requested content was not found. - When True, we should try the next chunk batch. - """ - if not response_content or not isinstance(response_content, str): - return False - text_lower = response_content.lower().strip() - # Only treat short/medium responses as "not found" - long answers may mention it incidentally - if len(text_lower) > 600: - return False - for phrase in NOT_FOUND_PHRASES: - if phrase in text_lower: - return True - return False diff --git a/modules/features/chatbotV2/contextExtractionLangGraph.py b/modules/features/chatbotV2/contextExtractionLangGraph.py deleted file mode 100644 index c34bf487..00000000 --- a/modules/features/chatbotV2/contextExtractionLangGraph.py +++ /dev/null @@ -1,160 +0,0 @@ -# Copyright (c) 2025 Patrick Motsch -# All rights reserved. -""" -LangGraph-based pipeline for extracting context from uploaded documents. -Creates chat context from PDF and text files (no domain-specific goals). -""" - -import logging -from typing import TypedDict, List, Dict, Any, Optional -from langgraph.graph import StateGraph, START, END - -logger = logging.getLogger(__name__) - - -class ContextExtractionState(TypedDict): - """State for context extraction pipeline.""" - # Input: list of {fileId, bytes, mimeType, fileName} - files: List[Dict[str, Any]] - # Extracted text blocks per file: [{fileId, fileName, blocks: [{page, text, block_id}]}] - textBlocks: List[Dict[str, Any]] - # Structured sections for chat context (simplified articles/sections) - sections: List[Dict[str, Any]] - # Optional summaries (empty for now - no LLM in extraction) - summaries: List[Dict[str, Any]] - errors: List[str] - - -def extract_text_node(state: ContextExtractionState) -> ContextExtractionState: - """Extract text from each file. PDF via BZOPdfExtractor, TXT as plain text.""" - text_blocks = [] - errors = list(state.get("errors", [])) - - for idx, file_info in enumerate(state.get("files", [])): - file_id = file_info.get("fileId", f"file_{idx}") - file_bytes = file_info.get("bytes") - mime_type = (file_info.get("mimeType") or "").lower() - file_name = file_info.get("fileName", f"document_{idx}") - - if not file_bytes: - errors.append(f"No content for file {file_name} ({file_id})") - continue - - blocks = [] - try: - if "pdf" in mime_type or file_name.lower().endswith(".pdf"): - from modules.features.realEstate.bzoPdfExtractor import BZOPdfExtractor - extractor = BZOPdfExtractor() - tb_list = extractor.extract_text_blocks(file_bytes, file_id) - for tb in tb_list: - blocks.append({ - "page": tb.page, - "text": tb.text, - "block_id": tb.block_id, - "bbox": tb.bbox - }) - logger.info(f"Extracted {len(blocks)} blocks from PDF {file_name}") - elif "text" in mime_type or file_name.lower().endswith(".txt"): - text = file_bytes.decode("utf-8", errors="replace") - lines = text.split("\n") - for i, line in enumerate(lines): - if line.strip(): - blocks.append({ - "page": 1, - "text": line.strip(), - "block_id": f"{file_id}_line_{i}", - "bbox": None - }) - logger.info(f"Extracted {len(blocks)} lines from text file {file_name}") - else: - errors.append(f"Unsupported format for {file_name}: {mime_type}") - except Exception as e: - logger.error(f"Error extracting {file_name}: {e}", exc_info=True) - errors.append(f"Extraction failed for {file_name}: {str(e)}") - - if blocks: - text_blocks.append({ - "fileId": file_id, - "fileName": file_name, - "blocks": blocks - }) - - return { - **state, - "textBlocks": text_blocks, - "errors": errors - } - - -def structure_content_node(state: ContextExtractionState) -> ContextExtractionState: - """Assemble text blocks into sections for chat context.""" - sections = [] - for doc in state.get("textBlocks", []): - file_name = doc.get("fileName", "document") - blocks = doc.get("blocks", []) - if not blocks: - continue - # Build section: combine blocks with page awareness - text_parts = [] - current_page = 0 - for b in blocks: - page = b.get("page", 1) - if page != current_page and text_parts: - text_parts.append("\n\n") - text_parts.append(b.get("text", "")) - current_page = page - full_text = "".join(text_parts).strip() - if full_text: - sections.append({ - "fileId": doc.get("fileId"), - "fileName": file_name, - "text": full_text, - "blockCount": len(blocks) - }) - return { - **state, - "sections": sections - } - - -def create_context_extraction_graph(): - """Create and compile the context extraction LangGraph.""" - workflow = StateGraph(ContextExtractionState) - workflow.add_node("extract_text", extract_text_node) - workflow.add_node("structure_content", structure_content_node) - workflow.add_edge(START, "extract_text") - workflow.add_edge("extract_text", "structure_content") - workflow.add_edge("structure_content", END) - return workflow.compile() - - -def run_extraction(files: List[Dict[str, Any]]) -> Dict[str, Any]: - """ - Run the context extraction pipeline on uploaded files. - - Args: - files: List of {fileId, bytes, mimeType, fileName} - - Returns: - { - "textBlocks": [...], - "sections": [...], - "summaries": [], - "errors": [...] - } - """ - state: ContextExtractionState = { - "files": files, - "textBlocks": [], - "sections": [], - "summaries": [], - "errors": [] - } - graph = create_context_extraction_graph() - final_state = graph.invoke(state) - return { - "textBlocks": final_state.get("textBlocks", []), - "sections": final_state.get("sections", []), - "summaries": final_state.get("summaries", []), - "errors": final_state.get("errors", []) - } diff --git a/modules/features/chatbotV2/datamodelFeatureChatbotV2.py b/modules/features/chatbotV2/datamodelFeatureChatbotV2.py deleted file mode 100644 index 2fbd463e..00000000 --- a/modules/features/chatbotV2/datamodelFeatureChatbotV2.py +++ /dev/null @@ -1,85 +0,0 @@ -# Copyright (c) 2025 Patrick Motsch -# All rights reserved. -""" -Pydantic models for Chatbot V2 feature. -Stores context per chat: uploaded files, extracted content, and conversation history. -""" - -import uuid -from typing import Optional, List, Dict, Any -from pydantic import BaseModel, Field - -from modules.shared.timeUtils import getUtcTimestamp - - -class ChatbotV2ContextFile(BaseModel): - """Uploaded file metadata for context extraction.""" - id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key") - conversationId: str = Field(description="Foreign key to conversation") - fileId: str = Field(description="Foreign key to file in central Files table") - fileName: str = Field(description="Original file name") - mimeType: str = Field(default="application/octet-stream", description="MIME type") - fileSize: int = Field(default=0, description="File size in bytes") - uploadOrder: int = Field(default=0, description="Order of upload (0-based)") - - -class ChatbotV2ExtractedContext(BaseModel): - """Extracted content per conversation - text blocks and summaries for chat context.""" - id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key") - conversationId: str = Field(description="Foreign key to conversation") - textBlocks: List[Dict[str, Any]] = Field(default_factory=list, description="Extracted text blocks per file (page, text, block_id)") - summaries: List[Dict[str, Any]] = Field(default_factory=list, description="Optional per-document or per-section summaries") - extractionStatus: str = Field(default="pending", description="pending|running|completed|failed") - errors: List[str] = Field(default_factory=list, description="Extraction errors if any") - createdAt: float = Field(default_factory=getUtcTimestamp, description="When extraction completed") - - -class ChatbotV2Document(BaseModel): - """Documents attached to chatbot V2 messages.""" - id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key") - messageId: str = Field(description="Foreign key to message") - fileId: str = Field(description="Foreign key to file") - fileName: str = Field(description="Name of the file") - fileSize: int = Field(default=0, description="Size of the file") - mimeType: str = Field(default="application/octet-stream", description="MIME type") - - -class ChatbotV2Message(BaseModel): - """Messages in Chatbot V2 conversations.""" - id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key") - conversationId: str = Field(description="Foreign key to conversation") - parentMessageId: Optional[str] = Field(None, description="Parent message ID for threading") - message: Optional[str] = Field(None, description="Message content") - role: str = Field(description="Role: user or assistant") - status: str = Field(default="step", description="Status: first, step, last") - sequenceNr: int = Field(default=0, description="Sequence number of the message") - publishedAt: Optional[float] = Field(default=None, description="When the message was published (UTC timestamp)") - roundNumber: int = Field(default=1, description="Round number in conversation") - - -class ChatbotV2Log(BaseModel): - """Log entries for Chatbot V2 conversations.""" - id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key") - conversationId: str = Field(description="Foreign key to conversation") - message: str = Field(description="Log message") - type: str = Field(default="info", description="Log type: info, warning, error") - timestamp: float = Field(default_factory=getUtcTimestamp, description="When the log entry was created") - status: Optional[str] = Field(None, description="Status of the log entry") - progress: Optional[float] = Field(None, description="Progress indicator (0.0 to 1.0)") - - -class ChatbotV2Conversation(BaseModel): - """Chatbot V2 conversation - stores context information per chat.""" - id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key") - featureInstanceId: str = Field(description="Feature instance ID for per-instance isolation") - mandateId: Optional[str] = Field(None, description="Mandate ID for RBAC") - name: Optional[str] = Field(None, description="Name of the conversation") - status: str = Field(default="extracting", description="extracting|ready|running|stopped") - currentRound: int = Field(default=0, description="Current round number") - lastActivity: float = Field(default_factory=getUtcTimestamp, description="Timestamp of last activity") - startedAt: float = Field(default_factory=getUtcTimestamp, description="When the conversation started") - extractedContextId: Optional[str] = Field(None, description="FK to ChatbotV2ExtractedContext when ready") - maxSteps: int = Field(default=10, description="Maximum number of chat rounds") - # Hydrated from child tables (not stored in DB as columns) - contextFiles: List[ChatbotV2ContextFile] = Field(default_factory=list, description="Uploaded context files") - messages: List[ChatbotV2Message] = Field(default_factory=list, description="Conversation messages") diff --git a/modules/features/chatbotV2/interfaceFeatureChatbotV2.py b/modules/features/chatbotV2/interfaceFeatureChatbotV2.py deleted file mode 100644 index 9dbc12a9..00000000 --- a/modules/features/chatbotV2/interfaceFeatureChatbotV2.py +++ /dev/null @@ -1,440 +0,0 @@ -# Copyright (c) 2025 Patrick Motsch -# All rights reserved. -""" -Interface to Chatbot V2 database. -Manages context-aware conversations with file upload and extraction. -""" - -import logging -import math -import uuid -from typing import Dict, Any, List, Optional, Union - -from modules.connectors.connectorDbPostgre import DatabaseConnector -from modules.shared.configuration import APP_CONFIG -from modules.interfaces.interfaceRbac import getRecordsetWithRBAC -from modules.security.rbac import RbacClass -from modules.datamodels.datamodelRbac import AccessRuleContext -from modules.datamodels.datamodelUam import User, AccessLevel -from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResult -from modules.shared.timeUtils import getUtcTimestamp, parseTimestamp - -from .datamodelFeatureChatbotV2 import ( - ChatbotV2Conversation, - ChatbotV2ContextFile, - ChatbotV2ExtractedContext, - ChatbotV2Message, - ChatbotV2Document, - ChatbotV2Log, -) - -logger = logging.getLogger(__name__) - -_chatbotV2Interfaces: Dict[str, "ChatbotV2Objects"] = {} -FEATURE_CODE = "chatbotv2" - - -def getInterface( - currentUser: User, - mandateId: Optional[str] = None, - featureInstanceId: Optional[str] = None -) -> "ChatbotV2Objects": - """Get or create a ChatbotV2Objects instance for the given user context.""" - if not currentUser or not currentUser.id: - raise ValueError("Valid user context required") - - key = f"{currentUser.id}_{mandateId or ''}_{featureInstanceId or ''}" - if key not in _chatbotV2Interfaces: - _chatbotV2Interfaces[key] = ChatbotV2Objects( - currentUser, - mandateId=mandateId, - featureInstanceId=featureInstanceId - ) - else: - _chatbotV2Interfaces[key].setUserContext( - currentUser, - mandateId=mandateId, - featureInstanceId=featureInstanceId - ) - return _chatbotV2Interfaces[key] - - -class ChatbotV2Objects: - """Interface to Chatbot V2 database.""" - - def __init__( - self, - currentUser: Optional[User] = None, - mandateId: Optional[str] = None, - featureInstanceId: Optional[str] = None - ): - self.currentUser = currentUser - self.userId = currentUser.id if currentUser else None - self.mandateId = mandateId - self.featureInstanceId = featureInstanceId - self.featureCode = FEATURE_CODE - self.rbac = None - self._initializeDatabase() - if currentUser: - self.setUserContext(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId) - - def setUserContext( - self, - currentUser: User, - mandateId: Optional[str] = None, - featureInstanceId: Optional[str] = None - ): - """Set user context for the interface.""" - self.currentUser = currentUser - self.userId = currentUser.id - self.mandateId = mandateId - self.featureInstanceId = featureInstanceId - if not self.userId: - raise ValueError("Invalid user context: id is required") - from modules.security.rootAccess import getRootDbAppConnector - dbApp = getRootDbAppConnector() - self.rbac = RbacClass(self.db, dbApp=dbApp) - self.db.updateContext(self.userId) - - def __del__(self): - if hasattr(self, "db") and self.db is not None: - try: - self.db.close() - except Exception as e: - logger.error(f"Error closing database connection: {e}") - - def _initializeDatabase(self): - try: - dbHost = APP_CONFIG.get("DB_HOST", "_no_config_default_data") - dbDatabase = "poweron_chatbotv2" - dbUser = APP_CONFIG.get("DB_USER") - dbPassword = APP_CONFIG.get("DB_PASSWORD_SECRET") - dbPort = int(APP_CONFIG.get("DB_PORT", 5432)) - self.db = DatabaseConnector( - dbHost=dbHost, - dbDatabase=dbDatabase, - dbUser=dbUser, - dbPassword=dbPassword, - dbPort=dbPort, - userId=self.userId - ) - logger.info("ChatbotV2 database initialized successfully") - except Exception as e: - logger.error(f"Failed to initialize ChatbotV2 database: {e}") - raise - - def checkRbacPermission( - self, - modelClass: type, - operation: str, - recordId: Optional[str] = None - ) -> bool: - """Check RBAC permission for an operation on a table.""" - if not self.rbac or not self.currentUser: - return False - from modules.interfaces.interfaceRbac import buildDataObjectKey - objectKey = buildDataObjectKey(modelClass.__name__, featureCode=self.featureCode) - permissions = self.rbac.getUserPermissions( - self.currentUser, - AccessRuleContext.DATA, - objectKey, - mandateId=self.mandateId, - featureInstanceId=self.featureInstanceId - ) - if operation == "create": - return permissions.create != AccessLevel.NONE - if operation == "update": - return permissions.update != AccessLevel.NONE - if operation == "delete": - return permissions.delete != AccessLevel.NONE - if operation == "read": - return permissions.read != AccessLevel.NONE - return False - - def _separateObjectFields(self, model_class, data: Dict[str, Any]) -> tuple: - """Separate simple fields from object fields (contextFiles, messages).""" - simple_fields = {} - object_fields = {} - model_fields = model_class.model_fields - for field_name, value in data.items(): - if field_name in model_fields: - if field_name in ("contextFiles", "messages"): - object_fields[field_name] = value - continue - if field_name.startswith("_"): - simple_fields[field_name] = value - elif isinstance(value, (str, int, float, bool, type(None))): - simple_fields[field_name] = value - elif field_name in model_fields: - field_info = model_fields[field_name] - if hasattr(field_info, "annotation"): - from typing import get_origin, get_args - origin = get_origin(field_info.annotation) - if origin in (dict, list): - simple_fields[field_name] = value - else: - object_fields[field_name] = value - else: - object_fields[field_name] = value - return simple_fields, object_fields - - # ===== Conversation CRUD ===== - - def getConversations( - self, - pagination: Optional[PaginationParams] = None - ) -> Union[List[Dict[str, Any]], PaginatedResult]: - """Get conversations for current feature instance.""" - record_filter = {} - if self.featureInstanceId: - record_filter["featureInstanceId"] = self.featureInstanceId - records = getRecordsetWithRBAC( - self.db, - ChatbotV2Conversation, - self.currentUser, - recordFilter=record_filter if record_filter else None, - orderBy="lastActivity", - mandateId=self.mandateId, - featureInstanceId=self.featureInstanceId, - featureCode=self.featureCode - ) - if pagination is None: - return records - total = len(records) - page = pagination.page or 1 - page_size = pagination.pageSize or 20 - start = (page - 1) * page_size - items = records[start : start + page_size] - total_pages = math.ceil(total / page_size) if total > 0 else 0 - return PaginatedResult(items=items, totalItems=total, totalPages=total_pages) - - def getConversation(self, conversationId: str) -> Optional[ChatbotV2Conversation]: - """Get a conversation by ID with hydrated context files and messages.""" - records = getRecordsetWithRBAC( - self.db, - ChatbotV2Conversation, - self.currentUser, - recordFilter={"id": conversationId}, - mandateId=self.mandateId, - featureInstanceId=self.featureInstanceId, - featureCode=self.featureCode - ) - if not records: - return None - r = records[0] - context_files = self.getContextFiles(conversationId) - messages = self.getMessages(conversationId) - max_steps = r.get("maxSteps") - if max_steps is None: - max_steps = 10 - return ChatbotV2Conversation( - id=r["id"], - featureInstanceId=r.get("featureInstanceId", "") or self.featureInstanceId or "", - mandateId=r.get("mandateId"), - name=r.get("name"), - status=r.get("status", "extracting"), - currentRound=r.get("currentRound", 0), - lastActivity=r.get("lastActivity", getUtcTimestamp()), - startedAt=r.get("startedAt", getUtcTimestamp()), - extractedContextId=r.get("extractedContextId"), - maxSteps=max_steps, - contextFiles=context_files, - messages=messages - ) - - def createConversation(self, data: Dict[str, Any]) -> ChatbotV2Conversation: - """Create a new conversation.""" - if not self.checkRbacPermission(ChatbotV2Conversation, "create"): - raise PermissionError("No permission to create conversations") - data["featureInstanceId"] = data.get("featureInstanceId") or self.featureInstanceId or "" - data["mandateId"] = data.get("mandateId") or self.mandateId - simple, obj = self._separateObjectFields(ChatbotV2Conversation, data) - if "maxSteps" not in simple or simple["maxSteps"] is None: - simple["maxSteps"] = 10 - created = self.db.recordCreate(ChatbotV2Conversation, simple) - max_steps = created.get("maxSteps") - if max_steps is None: - max_steps = 10 - return ChatbotV2Conversation( - id=created["id"], - featureInstanceId=created.get("featureInstanceId", ""), - mandateId=created.get("mandateId"), - name=created.get("name"), - status=created.get("status", "extracting"), - currentRound=created.get("currentRound", 0), - lastActivity=created.get("lastActivity", getUtcTimestamp()), - startedAt=created.get("startedAt", getUtcTimestamp()), - extractedContextId=created.get("extractedContextId"), - maxSteps=max_steps, - contextFiles=[], - messages=[] - ) - - def updateConversation(self, conversationId: str, data: Dict[str, Any]) -> Optional[ChatbotV2Conversation]: - """Update a conversation.""" - conv = self.getConversation(conversationId) - if not conv: - return None - if not self.checkRbacPermission(ChatbotV2Conversation, "update"): - raise PermissionError("No permission to update conversation") - simple, _ = self._separateObjectFields(ChatbotV2Conversation, data) - simple["lastActivity"] = getUtcTimestamp() - updated = self.db.recordModify(ChatbotV2Conversation, conversationId, simple) - if not updated: - return None - return self.getConversation(conversationId) - - def deleteConversation(self, conversationId: str) -> bool: - """Delete a conversation and all related data.""" - conv = self.getConversation(conversationId) - if not conv: - return False - if not self.checkRbacPermission(ChatbotV2Conversation, "delete"): - raise PermissionError("No permission to delete conversation") - for cf in self.getContextFiles(conversationId): - self.db.recordDelete(ChatbotV2ContextFile, cf.id) - ctx = self.getExtractedContextByConversation(conversationId) - if ctx: - self.db.recordDelete(ChatbotV2ExtractedContext, ctx.id) - for msg in self.getMessages(conversationId): - for doc in self.getDocuments(msg.id): - self.db.recordDelete(ChatbotV2Document, doc.id) - self.db.recordDelete(ChatbotV2Message, msg.id) - for log in self.getLogs(conversationId): - self.db.recordDelete(ChatbotV2Log, log.id) - return self.db.recordDelete(ChatbotV2Conversation, conversationId) - - # ===== Context File CRUD ===== - - def getContextFiles(self, conversationId: str) -> List[ChatbotV2ContextFile]: - """Get context files for a conversation.""" - records = getRecordsetWithRBAC( - self.db, - ChatbotV2ContextFile, - self.currentUser, - recordFilter={"conversationId": conversationId}, - orderBy="uploadOrder", - mandateId=self.mandateId, - featureInstanceId=self.featureInstanceId, - featureCode=self.featureCode - ) - return [ChatbotV2ContextFile(**r) for r in records] - - def createContextFile(self, data: Dict[str, Any]) -> ChatbotV2ContextFile: - """Create a context file record.""" - return ChatbotV2ContextFile(**self.db.recordCreate(ChatbotV2ContextFile, data)) - - # ===== Extracted Context CRUD ===== - - def getExtractedContext(self, extractedContextId: str) -> Optional[ChatbotV2ExtractedContext]: - """Get extracted context by ID.""" - records = self.db.getRecordset( - ChatbotV2ExtractedContext, - recordFilter={"id": extractedContextId} - ) - if not records: - return None - return ChatbotV2ExtractedContext(**records[0]) - - def getExtractedContextByConversation(self, conversationId: str) -> Optional[ChatbotV2ExtractedContext]: - """Get extracted context for a conversation.""" - records = self.db.getRecordset( - ChatbotV2ExtractedContext, - recordFilter={"conversationId": conversationId} - ) - if not records: - return None - return ChatbotV2ExtractedContext(**records[0]) - - def createExtractedContext(self, data: Dict[str, Any]) -> ChatbotV2ExtractedContext: - """Create extracted context record.""" - created = self.db.recordCreate(ChatbotV2ExtractedContext, data) - return ChatbotV2ExtractedContext(**created) - - def updateExtractedContext(self, extractedContextId: str, data: Dict[str, Any]) -> Optional[ChatbotV2ExtractedContext]: - """Update extracted context.""" - updated = self.db.recordModify(ChatbotV2ExtractedContext, extractedContextId, data) - return ChatbotV2ExtractedContext(**updated) if updated else None - - # ===== Message CRUD ===== - - def getMessages(self, conversationId: str) -> List[ChatbotV2Message]: - """Get messages for a conversation.""" - records = getRecordsetWithRBAC( - self.db, - ChatbotV2Message, - self.currentUser, - recordFilter={"conversationId": conversationId}, - orderBy="sequenceNr", - mandateId=self.mandateId, - featureInstanceId=self.featureInstanceId, - featureCode=self.featureCode - ) - return [ChatbotV2Message(**r) for r in records] - - def createMessage(self, data: Dict[str, Any]) -> ChatbotV2Message: - """Create a message.""" - if "id" not in data or not data["id"]: - data["id"] = str(uuid.uuid4()) - created = self.db.recordCreate(ChatbotV2Message, data) - return ChatbotV2Message(**created) - - # ===== Document CRUD ===== - - def getDocuments(self, messageId: str) -> List[ChatbotV2Document]: - """Get documents for a message.""" - records = self.db.getRecordset( - ChatbotV2Document, - recordFilter={"messageId": messageId} - ) - return [ChatbotV2Document(**r) for r in records] - - def createDocument(self, data: Dict[str, Any]) -> ChatbotV2Document: - """Create a document record.""" - created = self.db.recordCreate(ChatbotV2Document, data) - return ChatbotV2Document(**created) - - # ===== Log CRUD ===== - - def getLogs(self, conversationId: str) -> List[ChatbotV2Log]: - """Get logs for a conversation.""" - records = self.db.getRecordset( - ChatbotV2Log, - recordFilter={"conversationId": conversationId} - ) - # Sort by timestamp (connector doesn't support orderBy) - logs = [ChatbotV2Log(**r) for r in records] - logs.sort(key=lambda log: parseTimestamp(log.timestamp, default=0)) - return logs - - def createLog(self, data: Dict[str, Any]) -> ChatbotV2Log: - """Create a log entry.""" - if "timestamp" not in data: - data["timestamp"] = getUtcTimestamp() - created = self.db.recordCreate(ChatbotV2Log, data) - return ChatbotV2Log(**created) - - # ===== Unified Chat Data (for streaming/polling) ===== - - def getUnifiedChatData( - self, - conversationId: str, - afterTimestamp: Optional[float] = None - ) -> Dict[str, Any]: - """Get unified chat data (messages, logs) in chronological order.""" - conv = self.getConversation(conversationId) - if not conv: - return {"items": []} - items = [] - for msg in conv.messages: - ts = parseTimestamp(msg.publishedAt, default=getUtcTimestamp()) - if afterTimestamp is not None and ts <= afterTimestamp: - continue - items.append({"type": "message", "createdAt": ts, "item": msg}) - for log in self.getLogs(conversationId): - ts = parseTimestamp(log.timestamp, default=getUtcTimestamp()) - if afterTimestamp is not None and ts <= afterTimestamp: - continue - items.append({"type": "log", "createdAt": ts, "item": log}) - items.sort(key=lambda x: x.get("createdAt", 0)) - return {"items": items} diff --git a/modules/features/chatbotV2/mainChatbotV2.py b/modules/features/chatbotV2/mainChatbotV2.py deleted file mode 100644 index 11f6336e..00000000 --- a/modules/features/chatbotV2/mainChatbotV2.py +++ /dev/null @@ -1,250 +0,0 @@ -# Copyright (c) 2025 Patrick Motsch -# All rights reserved. -""" -Chatbot V2 Feature Container - Main Module. -Handles feature initialization and RBAC catalog registration. -Context-aware chat with file upload and extraction. -""" - -import logging -from typing import Dict, List, Any - -logger = logging.getLogger(__name__) - -# Feature metadata -FEATURE_CODE = "chatbotv2" -FEATURE_LABEL = {"en": "Chatbot V2", "de": "Chatbot V2", "fr": "Chatbot V2"} -FEATURE_ICON = "mdi-robot-outline" - -# UI Objects for RBAC catalog - single "conversations" view (upload + chat in one page) -UI_OBJECTS = [ - { - "objectKey": "ui.feature.chatbotv2.conversations", - "label": {"en": "Conversations", "de": "Konversationen", "fr": "Conversations"}, - "meta": {"area": "conversations"} - }, -] - -# Resource Objects for RBAC catalog -RESOURCE_OBJECTS = [ - { - "objectKey": "resource.feature.chatbotv2.upload", - "label": {"en": "Upload Files", "de": "Dateien hochladen", "fr": "Télécharger fichiers"}, - "meta": {"endpoint": "/api/chatbotv2/{instanceId}/upload", "method": "POST"} - }, - { - "objectKey": "resource.feature.chatbotv2.startStream", - "label": {"en": "Start Chat (Stream)", "de": "Chat starten (Stream)", "fr": "Démarrer chat (Stream)"}, - "meta": {"endpoint": "/api/chatbotv2/{instanceId}/start/stream", "method": "POST"} - }, - { - "objectKey": "resource.feature.chatbotv2.stop", - "label": {"en": "Stop Chat", "de": "Chat stoppen", "fr": "Arrêter chat"}, - "meta": {"endpoint": "/api/chatbotv2/{instanceId}/stop/{workflowId}", "method": "POST"} - }, - { - "objectKey": "resource.feature.chatbotv2.threads", - "label": {"en": "Get Threads", "de": "Threads abrufen", "fr": "Récupérer threads"}, - "meta": {"endpoint": "/api/chatbotv2/{instanceId}/threads", "method": "GET"} - }, - { - "objectKey": "resource.feature.chatbotv2.delete", - "label": {"en": "Delete Chat", "de": "Chat löschen", "fr": "Supprimer chat"}, - "meta": {"endpoint": "/api/chatbotv2/{instanceId}/conversations/{workflowId}", "method": "DELETE"} - }, -] - -# Template roles for this feature -TEMPLATE_ROLES = [ - { - "roleLabel": "chatbotv2-viewer", - "description": { - "en": "Chatbot V2 Viewer - View threads (read-only)", - "de": "Chatbot V2 Betrachter - Threads ansehen (nur lesen)", - "fr": "Visualiseur Chatbot V2 - Consulter les threads (lecture seule)" - }, - "accessRules": [ - {"context": "UI", "item": "ui.feature.chatbotv2.conversations", "view": True}, - {"context": "RESOURCE", "item": "resource.feature.chatbotv2.threads", "view": True}, - {"context": "DATA", "item": None, "view": True, "read": "m", "create": "n", "update": "n", "delete": "n"}, - ] - }, - { - "roleLabel": "chatbotv2-user", - "description": { - "en": "Chatbot V2 User - Upload, extract, and chat with own threads", - "de": "Chatbot V2 Benutzer - Hochladen, extrahieren und chatten mit eigenen Threads", - "fr": "Utilisateur Chatbot V2 - Upload, extraction et chat avec ses threads" - }, - "accessRules": [ - {"context": "UI", "item": "ui.feature.chatbotv2.conversations", "view": True}, - {"context": "RESOURCE", "item": "resource.feature.chatbotv2.upload", "view": True}, - {"context": "RESOURCE", "item": "resource.feature.chatbotv2.startStream", "view": True}, - {"context": "RESOURCE", "item": "resource.feature.chatbotv2.stop", "view": True}, - {"context": "RESOURCE", "item": "resource.feature.chatbotv2.threads", "view": True}, - {"context": "RESOURCE", "item": "resource.feature.chatbotv2.delete", "view": True}, - {"context": "DATA", "item": None, "view": True, "read": "m", "create": "m", "update": "m", "delete": "m"}, - ] - }, - { - "roleLabel": "chatbotv2-admin", - "description": { - "en": "Chatbot V2 Admin - Full access to all features", - "de": "Chatbot V2 Admin - Vollzugriff auf alle Funktionen", - "fr": "Administrateur Chatbot V2 - Accès complet" - }, - "accessRules": [ - {"context": "UI", "item": None, "view": True}, - {"context": "RESOURCE", "item": None, "view": True}, - {"context": "DATA", "item": None, "view": True, "read": "a", "create": "a", "update": "a", "delete": "a"}, - ] - }, -] - - -def getFeatureDefinition() -> Dict[str, Any]: - """Return the feature definition for registration.""" - return { - "code": FEATURE_CODE, - "label": FEATURE_LABEL, - "icon": FEATURE_ICON, - } - - -def getUiObjects() -> List[Dict[str, Any]]: - """Return UI objects for RBAC catalog registration.""" - return UI_OBJECTS - - -def getResourceObjects() -> List[Dict[str, Any]]: - """Return resource objects for RBAC catalog registration.""" - return RESOURCE_OBJECTS - - -def getTemplateRoles() -> List[Dict[str, Any]]: - """Return template roles for this feature.""" - return TEMPLATE_ROLES - - -def registerFeature(catalogService) -> bool: - """ - Register this feature's RBAC objects in the catalog. - """ - try: - for uiObj in UI_OBJECTS: - catalogService.registerUiObject( - featureCode=FEATURE_CODE, - objectKey=uiObj["objectKey"], - label=uiObj["label"], - meta=uiObj.get("meta") - ) - - for resObj in RESOURCE_OBJECTS: - catalogService.registerResourceObject( - featureCode=FEATURE_CODE, - objectKey=resObj["objectKey"], - label=resObj["label"], - meta=resObj.get("meta") - ) - - _syncTemplateRolesToDb() - - logger.info(f"Feature '{FEATURE_CODE}' registered {len(UI_OBJECTS)} UI objects and {len(RESOURCE_OBJECTS)} resource objects") - return True - - except Exception as e: - logger.error(f"Failed to register feature '{FEATURE_CODE}': {e}") - return False - - -def _syncTemplateRolesToDb() -> int: - """Sync template roles and their AccessRules to the database.""" - try: - from modules.interfaces.interfaceDbApp import getRootInterface - from modules.datamodels.datamodelRbac import Role, AccessRule, AccessRuleContext - - rootInterface = getRootInterface() - - existingRoles = rootInterface.getRolesByFeatureCode(FEATURE_CODE) - templateRoles = [r for r in existingRoles if r.mandateId is None] - existingRoleLabels = {r.roleLabel: str(r.id) for r in templateRoles} - - createdCount = 0 - for roleTemplate in TEMPLATE_ROLES: - roleLabel = roleTemplate["roleLabel"] - - if roleLabel in existingRoleLabels: - roleId = existingRoleLabels[roleLabel] - _ensureAccessRulesForRole(rootInterface, roleId, roleTemplate.get("accessRules", [])) - else: - newRole = Role( - roleLabel=roleLabel, - description=roleTemplate.get("description", {}), - featureCode=FEATURE_CODE, - mandateId=None, - featureInstanceId=None, - isSystemRole=False - ) - createdRole = rootInterface.db.recordCreate(Role, newRole.model_dump()) - roleId = createdRole.get("id") - - _ensureAccessRulesForRole(rootInterface, roleId, roleTemplate.get("accessRules", [])) - - logger.info(f"Created template role '{roleLabel}' with ID {roleId}") - createdCount += 1 - - if createdCount > 0: - logger.info(f"Feature '{FEATURE_CODE}': Created {createdCount} template roles") - - return createdCount - - except Exception as e: - logger.error(f"Error syncing template roles for feature '{FEATURE_CODE}': {e}") - return 0 - - -def _ensureAccessRulesForRole(rootInterface, roleId: str, ruleTemplates: List[Dict[str, Any]]) -> int: - """Ensure AccessRules exist for a role based on templates.""" - from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext - - existingRules = rootInterface.getAccessRulesByRole(roleId) - existingSignatures = set() - for rule in existingRules: - sig = (rule.context.value if rule.context else None, rule.item) - existingSignatures.add(sig) - - createdCount = 0 - for template in ruleTemplates: - context = template.get("context", "UI") - item = template.get("item") - sig = (context, item) - - if sig in existingSignatures: - continue - - if context == "UI": - contextEnum = AccessRuleContext.UI - elif context == "DATA": - contextEnum = AccessRuleContext.DATA - elif context == "RESOURCE": - contextEnum = AccessRuleContext.RESOURCE - else: - contextEnum = context - - newRule = AccessRule( - roleId=roleId, - context=contextEnum, - item=item, - view=template.get("view", False), - read=template.get("read"), - create=template.get("create"), - update=template.get("update"), - delete=template.get("delete"), - ) - rootInterface.db.recordCreate(AccessRule, newRule.model_dump()) - createdCount += 1 - - if createdCount > 0: - logger.debug(f"Created {createdCount} AccessRules for role {roleId}") - - return createdCount diff --git a/modules/features/chatbotV2/routeFeatureChatbotV2.py b/modules/features/chatbotV2/routeFeatureChatbotV2.py deleted file mode 100644 index dd460029..00000000 --- a/modules/features/chatbotV2/routeFeatureChatbotV2.py +++ /dev/null @@ -1,350 +0,0 @@ -# Copyright (c) 2025 Patrick Motsch -# All rights reserved. -""" -Chatbot V2 routes - context-aware chat with file upload and extraction. -""" - -import asyncio -import json -import math -import logging -import uuid -from typing import Optional, Any, Dict, Union -from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Request, status, UploadFile, File -from fastapi.responses import StreamingResponse -from pydantic import BaseModel, Field - -from modules.auth import limiter, getRequestContext, RequestContext -from modules.interfaces.interfaceDbApp import getRootInterface -from modules.interfaces.interfaceFeatures import getFeatureInterface -from modules.datamodels.datamodelChat import UserInputRequest -from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata -from modules.shared.timeUtils import getUtcTimestamp - -from . import interfaceFeatureChatbotV2 as interfaceDbChat -from .interfaceFeatureChatbotV2 import getInterface as getChatbotV2Interface -from .datamodelFeatureChatbotV2 import ChatbotV2Conversation -from .serviceChatbotV2 import uploadAndExtract, chatProcessV2 -from modules.features.chatbot.streaming.events import get_event_manager - -logger = logging.getLogger(__name__) - -router = APIRouter( - prefix="/api/chatbotv2", - tags=["Chatbot V2"], - responses={404: {"description": "Not found"}} -) - - -class UploadRequest(BaseModel): - """Request body for file upload - files must be uploaded to central storage first.""" - listFileId: list[str] = Field(default_factory=list, description="List of file IDs from central storage") - - -def _getServiceChat(context: RequestContext, instanceId: Optional[str] = None): - """Get ChatbotV2 interface with instance context.""" - mandateId = str(context.mandateId) if context.mandateId else None - return getChatbotV2Interface( - context.user, - mandateId=mandateId, - featureInstanceId=instanceId - ) - - -def _validateInstanceAccess(instanceId: str, context: RequestContext) -> str: - """Validate that the user has access to the feature instance.""" - rootInterface = getRootInterface() - featureInterface = getFeatureInterface(rootInterface.db) - instance = featureInterface.getFeatureInstance(instanceId) - if not instance: - raise HTTPException( - status_code=404, - detail=f"Feature instance '{instanceId}' not found" - ) - if instance.featureCode != "chatbotv2": - raise HTTPException( - status_code=400, - detail=f"Instance '{instanceId}' is not a chatbotv2 instance" - ) - if not context.hasSysAdminRole: - featureAccesses = rootInterface.getFeatureAccessesForUser(str(context.user.id)) - hasAccess = any( - str(fa.featureInstanceId) == instanceId and fa.enabled - for fa in featureAccesses - ) - if not hasAccess: - raise HTTPException( - status_code=403, - detail=f"Access denied to feature instance '{instanceId}'" - ) - return str(instance.mandateId) - - -# ============================================================================= -# Upload - start extraction -# ============================================================================= -@router.post("/{instanceId}/upload") -@limiter.limit("60/minute") -async def upload_files( - request: Request, - instanceId: str = Path(..., description="Feature Instance ID"), - body: UploadRequest = Body(...), - context: RequestContext = Depends(getRequestContext) -) -> Dict[str, Any]: - """ - Upload files as context and start extraction. - Files must be uploaded to central storage first; pass their file IDs in listFileId. - Returns conversationId. Extraction runs in background; poll threads or use SSE for status. - """ - mandateId = _validateInstanceAccess(instanceId, context) - if not body.listFileId: - raise HTTPException(status_code=400, detail="listFileId is required and must not be empty") - try: - conversation = await uploadAndExtract( - context.user, - mandateId=mandateId, - instanceId=instanceId, - listFileId=body.listFileId - ) - return { - "conversationId": conversation.id, - "status": conversation.status, - "message": "Extraction started. Poll GET /threads?workflowId={} for status.".format(conversation.id) - } - except Exception as e: - logger.error(f"Error in upload_files: {e}", exc_info=True) - raise HTTPException(status_code=500, detail=str(e)) - - -# ============================================================================= -# List threads - MUST be first to avoid /{instanceId}/{workflowId} matching -# ============================================================================= -@router.get("/{instanceId}/threads") -@limiter.limit("120/minute") -def get_threads( - request: Request, - instanceId: str = Path(..., description="Feature Instance ID"), - workflowId: Optional[str] = Query(None, description="Optional workflow/conversation ID for details"), - pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams"), - context: RequestContext = Depends(getRequestContext) -) -> Union[PaginatedResponse, Dict[str, Any]]: - """List conversations or get details for a specific one.""" - _validateInstanceAccess(instanceId, context) - interfaceDbChat = _getServiceChat(context, instanceId) - - if workflowId: - conv = interfaceDbChat.getConversation(workflowId) - if not conv: - raise HTTPException(status_code=404, detail=f"Conversation {workflowId} not found") - workflow_dict = conv.model_dump() - chatData = interfaceDbChat.getUnifiedChatData(workflowId, None) - return {"workflow": workflow_dict, "chatData": chatData} - - paginationParams = None - if pagination: - try: - paginationDict = json.loads(pagination) - paginationParams = PaginationParams(**paginationDict) if paginationDict else None - except (json.JSONDecodeError, ValueError) as e: - raise HTTPException(status_code=400, detail=f"Invalid pagination parameter: {str(e)}") - - all_convs = interfaceDbChat.getConversations(pagination=None) - # all_convs from getConversations can be list of dicts (from getRecordsetWithRBAC) - items = [c if isinstance(c, dict) else c.model_dump() for c in all_convs] - - if paginationParams: - totalItems = len(items) - totalPages = math.ceil(totalItems / paginationParams.pageSize) if totalItems > 0 else 0 - startIdx = (paginationParams.page - 1) * paginationParams.pageSize - endIdx = startIdx + paginationParams.pageSize - workflows = items[startIdx:endIdx] - else: - workflows = items - totalItems = len(items) - totalPages = 1 - - metadata = PaginationMetadata( - currentPage=paginationParams.page if paginationParams else 1, - pageSize=paginationParams.pageSize if paginationParams else len(workflows), - totalItems=totalItems, - totalPages=totalPages, - sort=paginationParams.sort if paginationParams else [], - filters=paginationParams.filters if paginationParams else None - ) - return PaginatedResponse(items=workflows, pagination=metadata) - - -# ============================================================================= -# Start/continue chat (SSE stream) -# ============================================================================= -@router.post("/{instanceId}/start/stream") -@limiter.limit("120/minute") -async def stream_chat_start( - request: Request, - instanceId: str = Path(..., description="Feature Instance ID"), - workflowId: Optional[str] = Query(None, description="Optional conversation ID to continue"), - userInput: UserInputRequest = Body(...), - context: RequestContext = Depends(getRequestContext) -) -> StreamingResponse: - """Start or continue a chat with SSE streaming.""" - mandateId = _validateInstanceAccess(instanceId, context) - event_manager = get_event_manager() - final_workflow_id = workflowId or userInput.workflowId - - try: - workflow = await chatProcessV2( - context.user, - mandateId=mandateId, - userInput=userInput, - conversationId=final_workflow_id, - instanceId=instanceId - ) - if not workflow: - raise HTTPException(status_code=500, detail="Failed to create or load workflow") - - queue = event_manager.get_queue(workflow.id) - if not queue: - queue = event_manager.create_queue(workflow.id) - - async def event_stream(): - try: - interfaceDbChat = _getServiceChat(context, instanceId) - chatData = interfaceDbChat.getUnifiedChatData(workflow.id, None) - if chatData.get("items"): - for item in chatData["items"]: - ser = { - "type": item.get("type"), - "createdAt": item.get("createdAt"), - "item": item.get("item").model_dump() if hasattr(item.get("item"), "model_dump") else item.get("item") - } - yield f"data: {json.dumps(ser)}\n\n" - - keepalive_interval = 30.0 - last_keepalive = asyncio.get_event_loop().time() - status_check_interval = 5.0 - last_status_check = asyncio.get_event_loop().time() - timeout = 300.0 - start_time = asyncio.get_event_loop().time() - - while True: - elapsed = asyncio.get_event_loop().time() - start_time - if elapsed > timeout: - break - if await request.is_disconnected(): - break - current_time = asyncio.get_event_loop().time() - - if current_time - last_status_check >= status_check_interval: - try: - cw = interfaceDbChat.getConversation(workflow.id) - if cw and cw.status == "stopped": - break - except Exception: - pass - last_status_check = current_time - - try: - event = await asyncio.wait_for(queue.get(), timeout=1.0) - event_type = event.get("type") - event_data = event.get("data", {}) - - if event_type == "chatdata" and event_data: - if event_data.get("type") == "status": - yield f"data: {json.dumps({'type': 'status', 'label': event_data.get('label', '')})}\n\n" - else: - item = event_data - if isinstance(item, dict) and "item" in item: - obj = item.get("item") - if hasattr(obj, "model_dump"): - item = {**item, "item": obj.model_dump()} - yield f"data: {json.dumps(item)}\n\n" - elif event_type in ("complete", "stopped"): - break - elif event_type == "error" and event.get("step") == "error": - break - last_keepalive = current_time - except asyncio.TimeoutError: - if current_time - last_keepalive >= keepalive_interval: - yield ": keepalive\n\n" - last_keepalive = current_time - except Exception as e: - logger.error(f"Error in event stream: {e}") - break - except Exception as e: - logger.error(f"Error in event stream generator: {e}", exc_info=True) - - return StreamingResponse( - event_stream(), - media_type="text/event-stream", - headers={ - "Cache-Control": "no-cache", - "Connection": "keep-alive", - "X-Accel-Buffering": "no" - } - ) - except HTTPException: - raise - except Exception as e: - logger.error(f"Error in stream_chat_start: {str(e)}", exc_info=True) - raise HTTPException(status_code=500, detail=str(e)) - - -# ============================================================================= -# Stop chat -# ============================================================================= -@router.post("/{instanceId}/stop/{workflowId}") -@limiter.limit("120/minute") -async def stop_chat( - request: Request, - instanceId: str = Path(..., description="Feature Instance ID"), - workflowId: str = Path(..., description="Conversation ID to stop"), - context: RequestContext = Depends(getRequestContext) -) -> ChatbotV2Conversation: - """Stop a running chat.""" - _validateInstanceAccess(instanceId, context) - interfaceDbChat = _getServiceChat(context, instanceId) - conv = interfaceDbChat.getConversation(workflowId) - if not conv: - raise HTTPException(status_code=404, detail=f"Conversation {workflowId} not found") - interfaceDbChat.updateConversation(workflowId, {"status": "stopped", "lastActivity": getUtcTimestamp()}) - interfaceDbChat.createLog({ - "conversationId": workflowId, - "message": "Workflow stopped by user", - "type": "warning", - "status": "stopped", - "timestamp": getUtcTimestamp() - }) - event_manager = get_event_manager() - await event_manager.emit_event( - context_id=workflowId, - event_type="stopped", - data={"workflowId": workflowId}, - event_category="workflow", - message="Workflow stopped by user", - step="stopped" - ) - return interfaceDbChat.getConversation(workflowId) - - -# ============================================================================= -# Delete conversation - use /conversations/{workflowId} to avoid -# /{instanceId}/{workflowId} matching GET /threads (workflowId="threads") -# ============================================================================= -@router.delete("/{instanceId}/conversations/{workflowId}") -@limiter.limit("120/minute") -def delete_conversation( - request: Request, - instanceId: str = Path(..., description="Feature Instance ID"), - workflowId: str = Path(..., description="Conversation ID to delete"), - context: RequestContext = Depends(getRequestContext) -) -> Dict[str, Any]: - """Delete a conversation and its data.""" - _validateInstanceAccess(instanceId, context) - interfaceDbChat = _getServiceChat(context, instanceId) - conv = interfaceDbChat.getConversation(workflowId) - if not conv: - raise HTTPException(status_code=404, detail=f"Conversation {workflowId} not found") - success = interfaceDbChat.deleteConversation(workflowId) - if not success: - raise HTTPException(status_code=500, detail="Failed to delete conversation") - return {"id": workflowId, "message": "Conversation deleted successfully"} diff --git a/modules/features/chatbotV2/serviceChatbotV2.py b/modules/features/chatbotV2/serviceChatbotV2.py deleted file mode 100644 index e71f92b2..00000000 --- a/modules/features/chatbotV2/serviceChatbotV2.py +++ /dev/null @@ -1,258 +0,0 @@ -# Copyright (c) 2025 Patrick Motsch -# All rights reserved. -""" -Chatbot V2 service - orchestration for upload/extraction and chat. -""" - -import logging -import uuid -from typing import Optional, List - -from modules.datamodels.datamodelUam import User -from modules.datamodels.datamodelChat import UserInputRequest -from modules.datamodels.datamodelAi import OperationTypeEnum, ProcessingModeEnum -from modules.shared.timeUtils import getUtcTimestamp - -from modules.services import getInterface as getServices -from .interfaceFeatureChatbotV2 import getInterface as getChatbotV2Interface -from .datamodelFeatureChatbotV2 import ChatbotV2Conversation -from .contextExtractionLangGraph import run_extraction -from .chatbotV2 import create_chat_graph -from .config import load_chatbotv2_config_from_instance -from .bridges import AICenterChatModel, clear_workflow_allowed_providers, ChatbotV2Checkpointer -from modules.features.chatbot.streaming.events import get_event_manager - -logger = logging.getLogger(__name__) - - -async def _load_config(instance_id: Optional[str]): - """Load ChatbotV2 config from feature instance.""" - if not instance_id: - return None - from modules.interfaces.interfaceDbApp import getRootInterface - from modules.interfaces.interfaceFeatures import getFeatureInterface - root = getRootInterface() - feat = getFeatureInterface(root.db) - instance = feat.getFeatureInstance(instance_id) - if not instance: - return None - return load_chatbotv2_config_from_instance(instance) - - -async def uploadAndExtract( - currentUser: User, - mandateId: Optional[str], - instanceId: str, - listFileId: List[str] -) -> ChatbotV2Conversation: - """ - Create conversation, store context files, run extraction, save result. - """ - interface = getChatbotV2Interface(currentUser, mandateId=mandateId, featureInstanceId=instanceId) - services = getServices(currentUser, mandateId=mandateId, featureInstanceId=instanceId) - - conversation_id = str(uuid.uuid4()) - conv_data = { - "id": conversation_id, - "featureInstanceId": instanceId, - "mandateId": mandateId, - "status": "extracting", - "name": "Context Chat", - "currentRound": 0, - "maxSteps": 10, - "startedAt": getUtcTimestamp(), - "lastActivity": getUtcTimestamp() - } - conv = interface.createConversation(conv_data) - - files_for_extraction = [] - for idx, file_id in enumerate(listFileId): - try: - file_info = services.chat.getFileInfo(file_id) - if not file_info: - logger.warning(f"File {file_id} not found") - continue - file_bytes = services.chat.getFileData(file_id) - if not file_bytes: - logger.warning(f"No data for file {file_id}") - continue - interface.createContextFile({ - "conversationId": conversation_id, - "fileId": file_id, - "fileName": file_info.get("fileName", "document"), - "mimeType": file_info.get("mimeType", "application/octet-stream"), - "fileSize": file_info.get("size", 0), - "uploadOrder": idx - }) - files_for_extraction.append({ - "fileId": file_id, - "bytes": file_bytes, - "mimeType": file_info.get("mimeType", "application/octet-stream"), - "fileName": file_info.get("fileName", "document") - }) - except Exception as e: - logger.error(f"Error loading file {file_id}: {e}", exc_info=True) - - if not files_for_extraction: - interface.updateConversation(conversation_id, {"status": "ready"}) - interface.createExtractedContext({ - "conversationId": conversation_id, - "textBlocks": [], - "summaries": [], - "extractionStatus": "failed", - "errors": ["No files could be loaded"] - }) - return interface.getConversation(conversation_id) - - result = run_extraction(files_for_extraction) - # Store sections in summaries for chat context (build_context_system_prompt uses sections) - summaries = result.get("summaries", []) or result.get("sections", []) - extracted = interface.createExtractedContext({ - "conversationId": conversation_id, - "textBlocks": result.get("textBlocks", []), - "summaries": summaries, - "extractionStatus": "completed", - "errors": result.get("errors", []), - "createdAt": getUtcTimestamp() - }) - interface.updateConversation(conversation_id, { - "status": "ready", - "extractedContextId": extracted.id, - "lastActivity": getUtcTimestamp() - }) - return interface.getConversation(conversation_id) - - -async def chatProcessV2( - currentUser: User, - mandateId: Optional[str], - userInput: UserInputRequest, - conversationId: Optional[str], - instanceId: str -) -> Optional[ChatbotV2Conversation]: - """ - Run chat with extracted context. Creates or resumes conversation. - """ - interface = getChatbotV2Interface(currentUser, mandateId=mandateId, featureInstanceId=instanceId) - event_manager = get_event_manager() - - config = await _load_config(instanceId) - base_prompt = config.systemPrompt if config else "You are a helpful assistant. Answer based on the provided context." - - if conversationId: - conv = interface.getConversation(conversationId) - if not conv: - raise ValueError(f"Conversation {conversationId} not found") - if conv.status == "extracting": - raise ValueError("Conversation not ready for chat (status: extracting). Wait for extraction to complete.") - # Reset stale "running" from previous failed/interrupted request - if conv.status == "running": - logger.info("Resetting stale conversation status from 'running' to 'ready'") - interface.updateConversation(conversationId, {"status": "ready"}) - new_round = conv.currentRound + 1 - interface.updateConversation(conversationId, { - "status": "running", - "currentRound": new_round, - "lastActivity": getUtcTimestamp() - }) - conv = interface.getConversation(conversationId) - if not event_manager.has_queue(conversationId): - event_manager.create_queue(conversationId) - else: - raise ValueError("conversationId is required for Chatbot V2 chat") - - extracted_ctx = interface.getExtractedContextByConversation(conversationId) - ctx_dict = {"textBlocks": [], "sections": []} - if extracted_ctx: - tb = extracted_ctx.textBlocks or [] - ctx_dict["textBlocks"] = tb - # summaries hold sections from extraction (fileName, text, blockCount) - ctx_dict["sections"] = extracted_ctx.summaries if isinstance(extracted_ctx.summaries, list) else [] - if not ctx_dict["sections"] and tb: - # Build sections from textBlocks if summaries empty - for doc in tb: - blocks = doc.get("blocks", []) - text_parts = [b.get("text", "") for b in blocks] - ctx_dict["sections"].append({ - "fileId": doc.get("fileId"), - "fileName": doc.get("fileName", "document"), - "text": "\n".join(text_parts), - "blockCount": len(blocks) - }) - - user_msg = userInput.prompt or "" - if not user_msg.strip(): - raise ValueError("Prompt is required") - - max_context_chars = config.maxContextChars if config else None - chunk_size = config.chunkSize if config else None - chunk_overlap = config.chunkOverlap if config else None - # Resolve to concrete values (chat node reads from configurable) - max_ctx = max_context_chars if max_context_chars and max_context_chars > 0 else 60_000 - cs = chunk_size if chunk_size and chunk_size > 0 else 15_000 - co = chunk_overlap if chunk_overlap is not None and chunk_overlap >= 0 else 500 - - allowed_providers = config.model.allowedProviders if config else [] - services = getServices(currentUser, mandateId=mandateId, featureInstanceId=instanceId) - if allowed_providers: - services.allowedProviders = allowed_providers # type: ignore[attr-defined] - - model = AICenterChatModel( - user=currentUser, - operation_type=OperationTypeEnum.DATA_ANALYSE, - processing_mode=ProcessingModeEnum.BASIC, - workflow_id=conversationId, - allowed_providers=allowed_providers if allowed_providers else None - ) - memory = ChatbotV2Checkpointer( - user=currentUser, - workflow_id=conversationId, - mandateId=mandateId, - featureInstanceId=instanceId - ) - app = create_chat_graph(model, memory) - - chatbotv2_context = { - "ctx_dict": ctx_dict, - "user_question": user_msg, - "base_prompt": base_prompt, - "max_context_chars": max_ctx, - "chunk_size": cs, - "chunk_overlap": co, - } - - interface.createMessage({ - "id": str(uuid.uuid4()), - "conversationId": conversationId, - "message": user_msg, - "role": "user", - "status": "first", - "sequenceNr": len(interface.getMessages(conversationId)) + 1, - "publishedAt": getUtcTimestamp(), - "roundNumber": conv.currentRound - }) - - try: - result = await app.ainvoke( - { - "messages": [{"role": "user", "content": user_msg}], - "chatbotv2_context": chatbotv2_context, - }, - config={"configurable": {"thread_id": conversationId}} - ) - # Assistant message is stored by ChatbotV2Checkpointer.put() - do NOT create again here. - # The stream sends chatData at start (from DB), so no need to emit chatdata event. - await event_manager.emit_event( - context_id=conversationId, - event_type="complete", - data={}, - event_category="workflow" - ) - finally: - clear_workflow_allowed_providers(conversationId) - - interface.updateConversation(conversationId, { - "status": "ready", - "lastActivity": getUtcTimestamp() - }) - return interface.getConversation(conversationId) diff --git a/modules/routes/routeSystem.py b/modules/routes/routeSystem.py index 12867604..1039d8e9 100644 --- a/modules/routes/routeSystem.py +++ b/modules/routes/routeSystem.py @@ -119,9 +119,6 @@ def _getFeatureUiObjects(featureCode: str) -> List[Dict[str, Any]]: elif featureCode == "chatbot": from modules.features.chatbot.mainChatbot import UI_OBJECTS return UI_OBJECTS - elif featureCode == "chatbotv2": - from modules.features.chatbotV2.mainChatbotV2 import UI_OBJECTS - return UI_OBJECTS else: logger.warning(f"Unknown feature code: {featureCode}") return [] diff --git a/modules/system/registry.py b/modules/system/registry.py index 3f037a51..3f3717a9 100644 --- a/modules/system/registry.py +++ b/modules/system/registry.py @@ -38,11 +38,10 @@ def discoverFeatureContainers() -> List[str]: def _router_load_order_key(filepath: str) -> tuple: """ - Sort key for router loading. Longer/ Lexicographically later prefixes first - so that /api/chatbotv2 is registered before /api/chatbot (avoids prefix collision). + Sort key for router loading. Longer/lexicographically later prefixes first + so that more specific routes (e.g. /api/chatplayground) register before generic ones. """ featureDir = os.path.basename(os.path.dirname(filepath)) - # chatbotV2 before chatbot - use negative length then name so longer names sort first return (-len(featureDir), featureDir) diff --git a/scripts/script_db_init_chatbotv2.py b/scripts/script_db_init_chatbotv2.py deleted file mode 100644 index b8cda85f..00000000 --- a/scripts/script_db_init_chatbotv2.py +++ /dev/null @@ -1,102 +0,0 @@ -#!/usr/bin/env python3 -""" -Initialize poweron_chatbotv2 database for the Chatbot V2 feature. - -Creates the poweron_chatbotv2 database if it does not exist. -Uses DB_CHATBOTV2_* config (falls back to DB_*). -Tables (ChatbotV2Conversation, ChatbotV2ContextFile, ChatbotV2ExtractedContext, -ChatbotV2Message, ChatbotV2Document, ChatbotV2Log) are auto-created by the -connector on first use. - -Usage: - python script_db_init_chatbotv2.py [--dry-run] -""" - -import os -import sys -import argparse -import logging -from pathlib import Path - -scriptPath = Path(__file__).resolve() -gatewayPath = scriptPath.parent.parent -sys.path.insert(0, str(gatewayPath)) -os.chdir(str(gatewayPath)) - -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(levelname)s - %(message)s", -) -logger = logging.getLogger(__name__) - -import psycopg2 -from modules.shared.configuration import APP_CONFIG - -DB_NAME = "poweron_chatbotv2" -CONFIG_PREFIX = "DB_CHATBOTV2" - - -def _get_config(): - """Get DB config: DB_CHATBOTV2_* with fallback to DB_*.""" - host = APP_CONFIG.get(f"{CONFIG_PREFIX}_HOST") or APP_CONFIG.get("DB_HOST", "localhost") - port = int(APP_CONFIG.get(f"{CONFIG_PREFIX}_PORT") or APP_CONFIG.get("DB_PORT", "5432")) - user = APP_CONFIG.get(f"{CONFIG_PREFIX}_USER") or APP_CONFIG.get("DB_USER") - password = ( - APP_CONFIG.get(f"{CONFIG_PREFIX}_PASSWORD_SECRET") - or APP_CONFIG.get(f"{CONFIG_PREFIX}_PASSWORD") - or APP_CONFIG.get("DB_PASSWORD_SECRET") - or APP_CONFIG.get("DB_PASSWORD") - ) - return {"host": host, "port": port, "user": user, "password": password} - - -def init_chatbotv2_db(dry_run: bool = False) -> bool: - """Create poweron_chatbotv2 database if it does not exist.""" - config = _get_config() - if not config["user"] or not config["password"]: - logger.error("DB_USER and DB_PASSWORD (or DB_CHATBOTV2_*) required") - return False - - try: - conn = psycopg2.connect( - host=config["host"], - port=config["port"], - database="postgres", - user=config["user"], - password=config["password"], - ) - conn.autocommit = True - - with conn.cursor() as cur: - cur.execute( - "SELECT 1 FROM pg_database WHERE datname = %s", - (DB_NAME,), - ) - exists = cur.fetchone() is not None - - if exists: - logger.info(f"Database {DB_NAME} already exists") - else: - if dry_run: - logger.info(f"[DRY-RUN] Would create database {DB_NAME}") - else: - cur.execute(f'CREATE DATABASE "{DB_NAME}"') - logger.info(f"Created database {DB_NAME}") - - conn.close() - return True - except Exception as e: - logger.error(f"Failed to init {DB_NAME}: {e}") - return False - - -def main(): - parser = argparse.ArgumentParser(description="Initialize poweron_chatbotv2 database") - parser.add_argument("--dry-run", action="store_true", help="Do not create, only report") - args = parser.parse_args() - ok = init_chatbotv2_db(dry_run=args.dry_run) - sys.exit(0 if ok else 1) - - -if __name__ == "__main__": - main()