# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Chunk-based context retrieval for Chatbot V2.

Splits documents into chunks and selects relevant chunks per user question.
If no relevant chunk is found, falls back to next chunks in document order -
no context is lost.
"""

import re
import logging
from typing import List, Dict, Any, Optional, Iterator, Set, Tuple

logger = logging.getLogger(__name__)

# Default chunk size (~5k tokens each), overlap for context continuity
DEFAULT_CHUNK_SIZE = 15_000
DEFAULT_CHUNK_OVERLAP = 500

# Stopwords for relevance scoring (DE/EN/FR - minimal set).
# "des" (DE/FR) and "was" (DE/EN) exist in more than one language but are
# listed only once.
STOPWORDS = {
    # German articles
    "der", "die", "das", "den", "dem", "des",
    "ein", "eine", "einer", "eines",
    # English
    "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
    "of", "with", "by", "from", "is", "are", "was", "were", "be", "been",
    # French
    "le", "la", "les", "un", "une", "du", "de", "et", "ou",
    # German question words
    "wer", "wie", "wo", "wann", "warum", "welche", "welcher",
}

# Words of two or more letters. ``[^\W\d_]`` matches any Unicode letter, so
# words containing e.g. "ö" or "ñ" are kept instead of being dropped because a
# hand-written character class happened to omit the letter.
_WORD_PATTERN = re.compile(r"\b[^\W\d_]{2,}\b")

# Responses longer than this are never classified as "not found": a long
# answer may mention one of the phrases incidentally.
_NOT_FOUND_MAX_RESPONSE_CHARS = 600


def _extract_keywords(text: str) -> Set[str]:
    """Return the lower-cased, non-stopword words (2+ letters) found in *text*."""
    return {
        word
        for word in _WORD_PATTERN.findall(text.lower())
        if word not in STOPWORDS
    }


def _iter_chunk_spans(
    text_length: int,
    chunk_size: int,
    chunk_overlap: int,
) -> Iterator[Tuple[int, int]]:
    """
    Yield ``(start, end)`` character spans that cover ``range(text_length)``.

    Consecutive spans overlap by ``chunk_overlap`` characters. The overlap is
    clamped to ``[0, chunk_size - 1]`` so that every span starts after the
    previous one; without the clamp an overlap >= chunk_size would never
    advance and loop forever.

    Raises:
        ValueError: if ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    overlap = min(max(chunk_overlap, 0), chunk_size - 1)

    start = 0
    while start < text_length:
        end = min(start + chunk_size, text_length)
        yield start, end
        if end >= text_length:
            break
        start = end - overlap


def chunk_sections(
    sections: List[Dict[str, Any]],
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP
) -> List[Dict[str, Any]]:
    """
    Split sections into overlapping chunks.

    Each chunk has: chunkIndex, text, fileName, fileId, sectionIndex,
    startChar, endChar. ``chunkIndex`` counts across all sections;
    ``startChar``/``endChar`` are offsets into the section's own text.
    Sections without text are skipped (but still count for ``sectionIndex``).

    Args:
        sections: Dicts read via the keys "text", "fileName" (default
            "document") and "fileId" (default "").
        chunk_size: Maximum number of characters per chunk; must be > 0.
        chunk_overlap: Characters shared by consecutive chunks; clamped to
            ``[0, chunk_size - 1]``.

    Returns:
        The chunks in document order.

    Raises:
        ValueError: if ``chunk_size`` is not positive and there is text to split.
    """
    chunks: List[Dict[str, Any]] = []
    for sec_idx, section in enumerate(sections):
        text = section.get("text") or ""
        if not text:
            continue
        file_name = section.get("fileName", "document")
        file_id = section.get("fileId", "")
        for start, end in _iter_chunk_spans(len(text), chunk_size, chunk_overlap):
            chunks.append({
                "chunkIndex": len(chunks),
                "text": text[start:end],
                "fileName": file_name,
                "fileId": file_id,
                "sectionIndex": sec_idx,
                "startChar": start,
                "endChar": end,
            })
    return chunks


def chunk_text_blocks(
    text_blocks: List[Dict[str, Any]],
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP
) -> List[Dict[str, Any]]:
    """
    Split textBlocks (blocks per file) into chunks.

    The "text" of all blocks of one file is joined with newlines and the
    result is split like in ``chunk_sections``. Each chunk has: chunkIndex,
    text, fileName, fileId, startChar, endChar (offsets into the joined text).

    Args:
        text_blocks: Dicts read via the keys "blocks" (list of dicts with
            "text"), "fileName" (default "document") and "fileId" (default "").
        chunk_size: Maximum number of characters per chunk; must be > 0.
        chunk_overlap: Characters shared by consecutive chunks; clamped to
            ``[0, chunk_size - 1]``.

    Returns:
        The chunks in document order.

    Raises:
        ValueError: if ``chunk_size`` is not positive and there is text to split.
    """
    chunks: List[Dict[str, Any]] = []
    for doc in text_blocks:
        # Empty blocks are kept in the join so offsets stay stable.
        full_text = "\n".join(
            block.get("text") or "" for block in doc.get("blocks", [])
        )
        if not full_text:
            continue
        file_name = doc.get("fileName", "document")
        file_id = doc.get("fileId", "")
        for start, end in _iter_chunk_spans(len(full_text), chunk_size, chunk_overlap):
            chunks.append({
                "chunkIndex": len(chunks),
                "text": full_text[start:end],
                "fileName": file_name,
                "fileId": file_id,
                "startChar": start,
                "endChar": end,
            })
    return chunks


def score_chunk_relevance(chunk_text: str, question: str) -> float:
    """
    Score how relevant a chunk is to the user question.

    Every question keyword that occurs in the chunk contributes 1.0 plus 0.5
    for each additional occurrence, capped at three extra occurrences (so at
    most 2.5 per keyword). Occurrences are counted as case-insensitive
    substrings, so a keyword also matches inside longer (compound) words.

    Returns:
        The summed score, or 0.0 if either input is empty, the question has
        no keywords, or no keyword occurs in the chunk.
    """
    if not question or not chunk_text:
        return 0.0
    q_words = _extract_keywords(question)
    if not q_words:
        return 0.0

    chunk_lower = chunk_text.lower()
    score = 0.0
    for word in q_words:
        count = chunk_lower.count(word)
        if count > 0:
            # Saturating bonus: repeated hits help, but cannot dominate.
            score += 1.0 + 0.5 * min(count - 1, 3)
    return score


def get_ordered_chunks_for_question(
    chunks: List[Dict[str, Any]],
    question: str
) -> List[Dict[str, Any]]:
    """
    Return chunks ordered by relevance (or doc order if no match).

    Chunks are sorted by descending ``score_chunk_relevance`` with
    ``chunkIndex`` as tie-breaker. If no chunk scores above zero, the chunks
    are returned sorted by ``chunkIndex`` only.

    Does NOT limit by max_context_chars - used for iterative batch retrieval.
    """
    if not chunks:
        return []

    scored = [
        (score_chunk_relevance(chunk.get("text", ""), question), chunk)
        for chunk in chunks
    ]
    if max(score for score, _ in scored) > 0:
        scored.sort(key=lambda item: (-item[0], item[1].get("chunkIndex", 0)))
    else:
        scored.sort(key=lambda item: item[1].get("chunkIndex", 0))
        logger.debug("No chunk matched question keywords - using chunks in document order")
    return [chunk for _, chunk in scored]


def _iter_chunk_batches(
    ordered_chunks: List[Dict[str, Any]],
    max_context_chars: int,
) -> Iterator[List[Dict[str, Any]]]:
    """
    Yield consecutive batches whose texts total at most *max_context_chars*.

    A chunk that does not fit completely is split: the part that fits closes
    the current batch and the remainder opens the next one, so no text is
    dropped. Chunks with empty text are skipped.
    """
    if max_context_chars <= 0:
        return

    batch: List[Dict[str, Any]] = []
    used_chars = 0
    for chunk in ordered_chunks:
        text = chunk.get("text") or ""
        offset = 0
        while offset < len(text):
            room = max_context_chars - used_chars
            if room == 0:
                # Batch is full: emit it and continue with the same chunk.
                yield batch
                batch, used_chars = [], 0
                continue
            piece_len = min(room, len(text) - offset)
            if offset == 0 and piece_len == len(text):
                batch.append(chunk)
            else:
                batch.append({**chunk, "text": text[offset:offset + piece_len]})
            used_chars += piece_len
            offset += piece_len
    if batch:
        yield batch


def get_chunk_batch(
    ordered_chunks: List[Dict[str, Any]],
    batch_index: int,
    max_context_chars: int
) -> List[Dict[str, Any]]:
    """
    Get the Nth batch of chunks that fits in max_context_chars.

    Batch 0 = first chunk(s) that fit, batch 1 = next chunk(s), etc.
    When a chunk does not fit completely, the batch ends with a copy of that
    chunk whose "text" is cut to the remaining space, and the following batch
    starts with a copy holding the rest of the text. Only "text" is replaced
    in such copies; all other keys are copied unchanged.

    Args:
        ordered_chunks: Chunks in the order they should be consumed.
        batch_index: Zero-based index of the requested batch.
        max_context_chars: Maximum total text length of one batch.

    Returns:
        The chunks of the requested batch, or [] when batch_index is negative
        or beyond the available chunks, or when max_context_chars <= 0.
    """
    if not ordered_chunks or batch_index < 0:
        return []
    for index, batch in enumerate(_iter_chunk_batches(ordered_chunks, max_context_chars)):
        if index == batch_index:
            return batch
    return []


def select_chunks_for_question(
    chunks: List[Dict[str, Any]],
    question: str,
    max_context_chars: int
) -> List[Dict[str, Any]]:
    """
    Select first batch of chunks (for backward compatibility).

    For iterative retry, use get_ordered_chunks_for_question + get_chunk_batch.
    """
    ordered = get_ordered_chunks_for_question(chunks, question)
    return get_chunk_batch(ordered, 0, max_context_chars)


# Phrases that indicate the AI found no relevant content (DE/EN/FR)
NOT_FOUND_PHRASES = [
    "nicht enthalten",
    "nicht im kontext",
    "nicht im dokument",
    "nicht im bereitgestellten",
    "nicht auffindbar",
    "keine information",
    "keine angaben",
    "nicht gefunden",
    "nicht verfügbar",
    "kein hinweis",
    "enthalten nicht",
    "artikel nicht enthalten",
    "nicht im vorliegenden",
    "bereitgestellten kontext enthält nicht",
    "im kontext nicht",
    "not contained",
    "not found",
    "no information",
    "not in the context",
    "not in the document",
    "pas dans le contexte",
    "non trouvé",
]


def response_indicates_not_found(response_content: str) -> bool:
    """
    Check if the AI response indicates that the requested content was not found.

    When True, we should try the next chunk batch.

    Returns:
        True if the lower-cased, stripped response is at most 600 characters
        long and contains one of NOT_FOUND_PHRASES; False otherwise (including
        for empty or non-string input).
    """
    if not response_content or not isinstance(response_content, str):
        return False
    text_lower = response_content.lower().strip()
    # Only treat short/medium responses as "not found" - long answers may
    # mention it incidentally
    if len(text_lower) > _NOT_FOUND_MAX_RESPONSE_CHARS:
        return False
    return any(phrase in text_lower for phrase in NOT_FOUND_PHRASES)