diff --git a/modules/routes/routeDataFiles.py b/modules/routes/routeDataFiles.py
index 26614ff0..3abccdc4 100644
--- a/modules/routes/routeDataFiles.py
+++ b/modules/routes/routeDataFiles.py
@@ -122,9 +122,30 @@ async def _autoIndexFile(fileId: str, fileName: str, mimeType: str, user):
         f"{contentIndex.totalObjects} objects"
     )

-    # Persist FileContentIndex immediately
+    # Persist FileContentIndex immediately.
+    # IMPORTANT: preserve `_ingestion` metadata and `status="indexed"` from any
+    # prior successful run — otherwise this upsert wipes the idempotency cache
+    # and requestIngestion cannot detect duplicates (AC4 breaks).
     from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface
     knowledgeDb = getKnowledgeInterface()
+    try:
+        _existing = knowledgeDb.getFileContentIndex(fileId)
+    except Exception:
+        _existing = None
+    if _existing:
+        _existingStruct = (
+            _existing.get("structure") if isinstance(_existing, dict)
+            else getattr(_existing, "structure", {})
+        ) or {}
+        _existingStatus = (
+            _existing.get("status") if isinstance(_existing, dict)
+            else getattr(_existing, "status", "")
+        ) or ""
+        if "_ingestion" in _existingStruct:
+            contentIndex.structure = dict(contentIndex.structure or {})
+            contentIndex.structure["_ingestion"] = _existingStruct["_ingestion"]
+        if _existingStatus == "indexed":
+            contentIndex.status = "indexed"
     knowledgeDb.upsertFileContentIndex(contentIndex)

     # Step 2: Content extraction (AI-free, produces ContentParts)
diff --git a/modules/serviceCenter/services/serviceAgent/coreTools/_documentTools.py b/modules/serviceCenter/services/serviceAgent/coreTools/_documentTools.py
index 64b3a147..62413103 100644
--- a/modules/serviceCenter/services/serviceAgent/coreTools/_documentTools.py
+++ b/modules/serviceCenter/services/serviceAgent/coreTools/_documentTools.py
@@ -11,8 +11,6 @@ from modules.serviceCenter.services.serviceAgent.toolRegistry import ToolRegistr

 from modules.serviceCenter.services.serviceAgent.coreTools._helpers import (
     _getOrCreateTempFolder,
-    _looksLikeBinary,
-    _resolveFileScope,
     _MAX_TOOL_RESULT_CHARS,
 )
@@ -392,74 +390,7 @@ def _registerDocumentTools(registry: ToolRegistry, services):
                 if chunkMime:
                     mimeType = chunkMime

-        # 2) File not yet indexed -> trigger extraction via ExtractionService, then retry
-        if not imageData and knowledgeService and not knowledgeService.isFileIndexed(fileId):
-            try:
-                chatService = services.chat
-                fileInfo = chatService.getFileInfo(fileId)
-                fileContent = chatService.getFileContent(fileId)
-                if fileContent and fileInfo:
-                    rawData = fileContent.get("data", "")
-                    if isinstance(rawData, str) and len(rawData) > 100:
-                        rawBytes = _b64.b64decode(rawData)
-                    elif isinstance(rawData, bytes):
-                        rawBytes = rawData
-                    else:
-                        rawBytes = None
-
-                    if rawBytes:
-                        from modules.serviceCenter.services.serviceExtraction.subRegistry import ExtractorRegistry
-                        from modules.serviceCenter.services.serviceExtraction.subPipeline import runExtraction
-                        from modules.datamodels.datamodelExtraction import ExtractionOptions
-
-                        fileMime = fileInfo.get("mimeType", "application/octet-stream")
-                        fileName = fileInfo.get("fileName", fileId)
-                        extracted = runExtraction(
-                            ExtractorRegistry(), None,
-                            rawBytes, fileName, fileMime,
-                            ExtractionOptions(mergeStrategy=None),
-                        )
-
-                        contentObjects = []
-                        for part in extracted.parts:
-                            tg = (part.typeGroup or "").lower()
-                            ct = "image" if tg == "image" else "text"
-                            if not part.data or not part.data.strip():
-                                continue
-                            contentObjects.append({
-                                "contentObjectId": part.id,
-                                "contentType": ct,
-                                "data": part.data,
-                                "contextRef": {"containerPath": fileName, "location": part.label, **(part.metadata or {})},
-                            })
-
-                        if contentObjects:
-                            _diFiId, _diMId = _resolveFileScope(fileId, context)
-                            from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob
-                            await knowledgeService.requestIngestion(
-                                IngestionJob(
-                                    sourceKind="agent_tool",
-                                    sourceId=fileId,
-                                    fileName=fileName,
-                                    mimeType=fileMime,
-                                    userId=context.get("userId", ""),
-                                    contentObjects=contentObjects,
-                                    featureInstanceId=_diFiId,
-                                    mandateId=_diMId,
-                                    provenance={"lane": "agent", "tool": "describeImage"},
-                                )
-                            )
-
-                            chunks = knowledgeService._knowledgeDb.getContentChunks(fileId)
-                            imageChunks = [c for c in (chunks or []) if c.get("contentType") == "image"]
-                            if pageIndex is not None:
-                                imageChunks = [c for c in imageChunks if c.get("contextRef", {}).get("pageIndex") == pageIndex]
-                            if imageChunks:
-                                imageData = imageChunks[0].get("data", "")
-            except Exception as extractErr:
-                logger.warning(f"describeImage: on-demand extraction failed: {extractErr}")
-
-        # 3) Direct image file (not a container) - use raw file data
+        # 2) Direct image file (not a container) - use raw file data
         if not imageData:
             chatService = services.chat
             fileContent = chatService.getFileContent(fileId)
@@ -469,7 +400,7 @@ def _registerDocumentTools(registry: ToolRegistry, services):
                 imageData = fileContent.get("data", "")
                 mimeType = fileMimeType

-        # 4) PDF page rendering: render the requested page as an image via PyMuPDF
+        # 3) PDF page rendering: render the requested page as an image via PyMuPDF
         if not imageData:
             chatService = services.chat
             fileInfo = chatService.getFileInfo(fileId) if hasattr(chatService, "getFileInfo") else None
diff --git a/modules/serviceCenter/services/serviceAgent/coreTools/_workspaceTools.py b/modules/serviceCenter/services/serviceAgent/coreTools/_workspaceTools.py
index aa337472..e413c3f0 100644
--- a/modules/serviceCenter/services/serviceAgent/coreTools/_workspaceTools.py
+++ b/modules/serviceCenter/services/serviceAgent/coreTools/_workspaceTools.py
@@ -14,7 +14,6 @@ from modules.serviceCenter.services.serviceAgent.coreTools._helpers import (
     _getOrCreateInstanceFolder,
     _getOrCreateTempFolder,
     _looksLikeBinary,
-    _resolveFileScope,
     _MAX_TOOL_RESULT_CHARS,
 )
@@ -50,6 +49,7 @@ def _registerWorkspaceTools(registry: ToolRegistry, services):
            return ToolResult(toolCallId="", toolName="readFile", success=False, error="fileId is required")
        try:
            knowledgeService = services.getService("knowledge") if hasattr(services, "getService") else None
+           fileStatus = None

            # 1) Knowledge Store: return already-extracted text chunks
            if knowledgeService:
@@ -77,7 +77,8 @@ def _registerWorkspaceTools(registry: ToolRegistry, services):
                        data=f"[File {fileId} is currently being processed (status: {fileStatus}). Try again shortly.]",
                    )

-           # 2) Not indexed yet: try on-demand extraction
+           # 2) Not indexed yet: inspect file type to decide how to serve the agent
+           #    (binary -> instruct agent to wait / re-upload; text -> decode raw bytes inline)
            chatService = services.chat
            fileInfo = chatService.getFileInfo(fileId)
            if not fileInfo:
@@ -100,92 +101,14 @@ def _registerWorkspaceTools(registry: ToolRegistry, services):
            isBinary = _looksLikeBinary(rawBytes)

            if isBinary:
-               try:
-                   from modules.serviceCenter.services.serviceExtraction.subRegistry import ExtractorRegistry, ChunkerRegistry
-                   from modules.serviceCenter.services.serviceExtraction.subPipeline import runExtraction
-                   from modules.datamodels.datamodelExtraction import ExtractionOptions
-
-                   extracted = runExtraction(
-                       ExtractorRegistry(), ChunkerRegistry(),
-                       rawBytes, fileName, mimeType,
-                       ExtractionOptions(mergeStrategy=None),
-                   )
-
-                   contentObjects = []
-                   for part in extracted.parts:
-                       tg = (part.typeGroup or "").lower()
-                       ct = "image" if tg == "image" else "text"
-                       if not part.data or not part.data.strip():
-                           continue
-                       contentObjects.append({
-                           "contentObjectId": part.id,
-                           "contentType": ct,
-                           "data": part.data,
-                           "contextRef": {
-                               "containerPath": fileName,
-                               "location": part.label or "file",
-                               **(part.metadata or {}),
-                           },
-                       })
-
-                   if contentObjects:
-                       if knowledgeService:
-                           try:
-                               userId = context.get("userId", "")
-                               _fiId, _mId = _resolveFileScope(fileId, context)
-                               from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob
-                               await knowledgeService.requestIngestion(
-                                   IngestionJob(
-                                       sourceKind="agent_tool",
-                                       sourceId=fileId,
-                                       fileName=fileName,
-                                       mimeType=mimeType,
-                                       userId=userId,
-                                       contentObjects=contentObjects,
-                                       featureInstanceId=_fiId,
-                                       mandateId=_mId,
-                                       provenance={"lane": "agent", "tool": "readFile"},
-                                   )
-                               )
-                           except Exception as e:
-                               logger.warning(f"readFile: knowledge indexing failed for {fileId}: {e}")
-
-                       joined = ""
-                       if knowledgeService:
-                           _chunks = knowledgeService._knowledgeDb.getContentChunks(fileId)
-                           _textChunks = [
-                               c for c in (_chunks or [])
-                               if c.get("contentType") != "image" and c.get("data")
-                           ]
-                           if _textChunks:
-                               joined = "\n\n".join(c["data"] for c in _textChunks)
-                       if not joined:
-                           textParts = [o["data"] for o in contentObjects if o["contentType"] != "image"]
-                           joined = "\n\n".join(textParts) if textParts else ""
-                       if joined:
-                           chunked = _applyOffsetLimit(joined, offset, limit)
-                           if chunked is not None:
-                               return ToolResult(toolCallId="", toolName="readFile", success=True, data=chunked)
-                           if len(joined) > _MAX_TOOL_RESULT_CHARS:
-                               joined = joined[:_MAX_TOOL_RESULT_CHARS] + f"\n\n[Truncated – showing first {_MAX_TOOL_RESULT_CHARS} chars of {len(joined)}. Use offset/limit to read specific sections.]"
-                           return ToolResult(
-                               toolCallId="", toolName="readFile", success=True,
-                               data=joined,
-                           )
-                       imgCount = sum(1 for o in contentObjects if o["contentType"] == "image")
-                       return ToolResult(
-                           toolCallId="", toolName="readFile", success=True,
-                           data=f"[Extracted {len(contentObjects)} content objects from '{fileName}' "
-                                f"({imgCount} images, no readable text). "
-                                f"Use describeImage(fileId='{fileId}') to analyze visual content.]",
-                       )
-               except Exception as extractErr:
-                   logger.warning(f"readFile extraction failed for {fileId} ({fileName}): {extractErr}")
-
                return ToolResult(
                    toolCallId="", toolName="readFile", success=True,
-                   data=f"[Binary file: '{fileName}', type={mimeType}, size={len(rawBytes)} bytes. "
-                        f"Text extraction not available. Use describeImage for images.]",
+                   data=(
+                       f"[File '{fileName}' ({mimeType}) is not yet indexed "
+                       f"(status: {fileStatus or 'unknown'}). Indexing runs automatically "
+                       f"on upload. Please wait a few seconds and retry, or re-upload the file. "
+                       f"For visual content use describeImage(fileId='{fileId}').]"
+                   ),
                )

            # 3) Text file: decode raw bytes
diff --git a/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py b/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py
index 716ade31..57490eba 100644
--- a/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py
+++ b/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py
@@ -62,23 +62,23 @@ class IngestionHandle:


 def _computeIngestionHash(contentObjects: List[Dict[str, Any]]) -> str:
-    """Deterministic SHA256 over (contentObjectId, contentType, data) tuples.
+    """Deterministic SHA256 over (contentType, data) tuples in extractor order.

-    Sorted by contentObjectId so re-ordering of extractor output does not
-    invalidate the cache; text whitespace is preserved intentionally because
-    chunk boundaries depend on it.
+    `contentObjectId` is intentionally excluded because extractors generate
+    fresh UUIDs per run (`uuid.uuid4()`), which would make the hash unstable
+    across re-extractions of the same source — defeating idempotency.
+    Order is preserved (no sort) because two different documents can share the
+    same multiset of parts but differ in arrangement (e.g. swapped pages).
+    Text whitespace is preserved intentionally because chunk boundaries
+    depend on it.
    """
-    normalized = sorted(
-        (
-            (
-                str(o.get("contentObjectId", "") or ""),
-                str(o.get("contentType", "text") or "text"),
-                o.get("data", "") or "",
-            )
-            for o in (contentObjects or [])
-        ),
-        key=lambda t: t[0],
-    )
+    normalized = [
+        (
+            str(o.get("contentType", "text") or "text"),
+            o.get("data", "") or "",
+        )
+        for o in (contentObjects or [])
+    ]
    payload = json.dumps(normalized, ensure_ascii=False, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

diff --git a/tests/unit/services/test_ingestion_hash_stability.py b/tests/unit/services/test_ingestion_hash_stability.py
new file mode 100644
index 00000000..df25a4f0
--- /dev/null
+++ b/tests/unit/services/test_ingestion_hash_stability.py
@@ -0,0 +1,81 @@
+#!/usr/bin/env python3
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""Test that _computeIngestionHash is stable across re-extractions of the same source.
+
+Extractors generate fresh contentObjectIds (uuid.uuid4()) per run. The ingestion
+hash MUST therefore be derived from content (contentType + data + order) only —
+otherwise idempotency (AC4) silently fails: every re-extraction looks "new" and
+triggers full re-embedding.
+"""
+
+import os
+import sys
+import uuid
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../.."))
+
+from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import (
+    _computeIngestionHash,
+)
+
+
+def _makeObjects(seed: str = "alpha"):
+    """Build a synthetic contentObjects list as routeDataFiles._autoIndexFile would."""
+    return [
+        {
+            "contentObjectId": str(uuid.uuid4()),
+            "contentType": "text",
+            "data": f"Page 1 of {seed}",
+        },
+        {
+            "contentObjectId": str(uuid.uuid4()),
+            "contentType": "text",
+            "data": f"Page 2 of {seed}",
+        },
+        {
+            "contentObjectId": str(uuid.uuid4()),
+            "contentType": "binary",
+            "data": "",
+        },
+    ]
+
+
+def test_hash_stable_across_uuid_regeneration():
+    """Same content + different contentObjectIds → same hash."""
+    a = _makeObjects("alpha")
+    b = _makeObjects("alpha")  # identical data, fresh UUIDs
+    assert [o["contentObjectId"] for o in a] != [o["contentObjectId"] for o in b]
+    assert _computeIngestionHash(a) == _computeIngestionHash(b)
+
+
+def test_hash_changes_when_data_changes():
+    a = _makeObjects("alpha")
+    b = _makeObjects("beta")
+    assert _computeIngestionHash(a) != _computeIngestionHash(b)
+
+
+def test_hash_is_order_sensitive():
+    """Reordered pages produce a different hash (different document)."""
+    a = _makeObjects("alpha")
+    b = list(reversed(a))
+    assert _computeIngestionHash(a) != _computeIngestionHash(b)
+
+
+def test_hash_distinguishes_text_vs_binary_with_same_payload():
+    a = [{"contentObjectId": "x", "contentType": "text", "data": "hello"}]
+    b = [{"contentObjectId": "x", "contentType": "binary", "data": "hello"}]
+    assert _computeIngestionHash(a) != _computeIngestionHash(b)
+
+
+def test_hash_handles_empty_input():
+    assert len(_computeIngestionHash([])) == 64  # must not raise; yields a well-formed SHA256 hex digest
+
+
+if __name__ == "__main__":
+    test_hash_stable_across_uuid_regeneration()
+    test_hash_changes_when_data_changes()
+    test_hash_is_order_sensitive()
+    test_hash_distinguishes_text_vs_binary_with_same_payload()
+    test_hash_handles_empty_input()
+    print("OK — all 5 ingestion-hash stability tests passed")
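
Taken together, the patch makes idempotency rest on two pieces: a content-derived hash (mainServiceKnowledge.py) and the `_ingestion` structure metadata that `_autoIndexFile` now carries forward across upserts (routeDataFiles.py). Below is a minimal, self-contained sketch of how those pieces could interact. Only `computeIngestionHash` mirrors code shown in the diff; the `contentHash` key inside `_ingestion` and the `isDuplicateIngestion` helper are hypothetical stand-ins for logic that lives inside `requestIngestion`, not the actual implementation.

    # Sketch only. Assumption: a prior successful run stored its content hash
    # under structure["_ingestion"]["contentHash"]; that key name is hypothetical.
    import hashlib
    import json
    import uuid


    def computeIngestionHash(contentObjects):
        # Mirrors _computeIngestionHash above: (contentType, data) in extractor order.
        normalized = [
            (str(o.get("contentType", "text") or "text"), o.get("data", "") or "")
            for o in (contentObjects or [])
        ]
        payload = json.dumps(normalized, ensure_ascii=False, separators=(",", ":"))
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()


    def isDuplicateIngestion(existingStructure, contentObjects):
        # Hypothetical duplicate check: compare the incoming content hash with the
        # one preserved across upserts by routeDataFiles._autoIndexFile.
        prior = (existingStructure or {}).get("_ingestion") or {}
        return prior.get("contentHash") == computeIngestionHash(contentObjects)


    # Two extraction runs of the same upload: identical content, fresh UUIDs per part.
    runA = [{"contentObjectId": str(uuid.uuid4()), "contentType": "text", "data": "Page 1"}]
    runB = [{"contentObjectId": str(uuid.uuid4()), "contentType": "text", "data": "Page 1"}]

    structure = {"_ingestion": {"contentHash": computeIngestionHash(runA)}}
    assert isDuplicateIngestion(structure, runB)  # re-extraction detected as duplicate
    assert not isDuplicateIngestion(structure, [{**runB[0], "data": "Page 1, edited"}])  # changed content re-ingests

This also shows why the `_autoIndexFile` change matters: if the upsert replaced `structure` wholesale, the preserved `_ingestion` entry would vanish and every subsequent run would look like new content, which is exactly the AC4 regression the comment in routeDataFiles.py warns about.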