From 9d82d3d353070a06b66825a4decf8b0716c24a86 Mon Sep 17 00:00:00 2001
From: Ida
Date: Tue, 21 Apr 2026 11:27:44 +0200
Subject: [PATCH] P0: ingestion facade

---
 .../commcoach/serviceCommcoachIndexer.py      |  28 +-
 modules/routes/routeDataFiles.py              |  26 +-
 .../serviceAgent/coreTools/_documentTools.py  |  18 +-
 .../serviceAgent/coreTools/_workspaceTools.py |  18 +-
 .../serviceKnowledge/mainServiceKnowledge.py  | 303 +++++++++++++++++-
 tests/unit/serviceKnowledge/__init__.py       |   0
 .../serviceKnowledge/test_requestIngestion.py | 172 ++++++++++
 7 files changed, 536 insertions(+), 29 deletions(-)
 create mode 100644 tests/unit/serviceKnowledge/__init__.py
 create mode 100644 tests/unit/serviceKnowledge/test_requestIngestion.py

diff --git a/modules/features/commcoach/serviceCommcoachIndexer.py b/modules/features/commcoach/serviceCommcoachIndexer.py
index b43764a1..2f042795 100644
--- a/modules/features/commcoach/serviceCommcoachIndexer.py
+++ b/modules/features/commcoach/serviceCommcoachIndexer.py
@@ -174,14 +174,26 @@ async def indexSessionData(
             for c in chunks
         ]
 
-        await knowledgeService.indexFile(
-            fileId=syntheticFileId,
-            fileName=f"coaching-session-{sessionId[:8]}",
-            mimeType="application/x-coaching-session",
-            userId=userId,
-            featureInstanceId=featureInstanceId,
-            mandateId=mandateId,
-            contentObjects=contentObjects,
+        from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob
+
+        await knowledgeService.requestIngestion(
+            IngestionJob(
+                sourceKind="coaching_session",
+                sourceId=syntheticFileId,
+                fileName=f"coaching-session-{sessionId[:8]}",
+                mimeType="application/x-coaching-session",
+                userId=userId,
+                featureInstanceId=featureInstanceId,
+                mandateId=mandateId,
+                contentObjects=contentObjects,
+                provenance={
+                    "lane": "feature",
+                    "feature": "commcoach",
+                    "sessionId": sessionId,
+                    "contextId": contextId,
+                    "messageCount": len(messages or []),
+                },
+            )
         )
         logger.info(f"Successfully indexed coaching session {sessionId} ({len(chunks)} chunks)")
     except Exception as e:
diff --git a/modules/routes/routeDataFiles.py b/modules/routes/routeDataFiles.py
index 90431ba2..f281d15e 100644
--- a/modules/routes/routeDataFiles.py
+++ b/modules/routes/routeDataFiles.py
@@ -77,7 +77,7 @@ async def _autoIndexFile(fileId: str, fileName: str, mimeType: str, user):
     """Background task: pre-scan + extraction + knowledge indexing.
 
Step 1: Structure Pre-Scan (AI-free) -> FileContentIndex (persisted) Step 2: Content extraction via runExtraction -> ContentParts - Step 3: KnowledgeService.indexFile -> chunking + embedding -> Knowledge Store""" + Step 3: KnowledgeService.requestIngestion -> idempotent chunking + embedding -> Knowledge Store""" userId = user.id if hasattr(user, "id") else str(user) try: mgmtInterface = interfaceDbManagement.getInterface(user) @@ -181,15 +181,21 @@ async def _autoIndexFile(fileId: str, fileName: str, mimeType: str, user): ) knowledgeService = getService("knowledge", ctx) - await knowledgeService.indexFile( - fileId=fileId, - fileName=fileName, - mimeType=mimeType, - userId=userId, - featureInstanceId=str(feature_instance_id) if feature_instance_id else "", - mandateId=str(mandate_id) if mandate_id else "", - contentObjects=contentObjects, - structure=contentIndex.structure, + from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob + + await knowledgeService.requestIngestion( + IngestionJob( + sourceKind="file", + sourceId=fileId, + fileName=fileName, + mimeType=mimeType, + userId=userId, + featureInstanceId=str(feature_instance_id) if feature_instance_id else "", + mandateId=str(mandate_id) if mandate_id else "", + contentObjects=contentObjects, + structure=contentIndex.structure, + provenance={"lane": "upload", "route": "routeDataFiles._autoIndexFile"}, + ) ) # Re-acquire interface after await to avoid stale user context from the singleton diff --git a/modules/serviceCenter/services/serviceAgent/coreTools/_documentTools.py b/modules/serviceCenter/services/serviceAgent/coreTools/_documentTools.py index a48e53b3..b9b00755 100644 --- a/modules/serviceCenter/services/serviceAgent/coreTools/_documentTools.py +++ b/modules/serviceCenter/services/serviceAgent/coreTools/_documentTools.py @@ -434,11 +434,19 @@ def _registerDocumentTools(registry: ToolRegistry, services): if contentObjects: _diFiId, _diMId = _resolveFileScope(fileId, context) - await knowledgeService.indexFile( - fileId=fileId, fileName=fileName, mimeType=fileMime, - userId=context.get("userId", ""), contentObjects=contentObjects, - featureInstanceId=_diFiId, - mandateId=_diMId, + from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob + await knowledgeService.requestIngestion( + IngestionJob( + sourceKind="agent_tool", + sourceId=fileId, + fileName=fileName, + mimeType=fileMime, + userId=context.get("userId", ""), + contentObjects=contentObjects, + featureInstanceId=_diFiId, + mandateId=_diMId, + provenance={"lane": "agent", "tool": "describeImage"}, + ) ) chunks = knowledgeService._knowledgeDb.getContentChunks(fileId) diff --git a/modules/serviceCenter/services/serviceAgent/coreTools/_workspaceTools.py b/modules/serviceCenter/services/serviceAgent/coreTools/_workspaceTools.py index 9a6af658..bb548081 100644 --- a/modules/serviceCenter/services/serviceAgent/coreTools/_workspaceTools.py +++ b/modules/serviceCenter/services/serviceAgent/coreTools/_workspaceTools.py @@ -132,11 +132,19 @@ def _registerWorkspaceTools(registry: ToolRegistry, services): try: userId = context.get("userId", "") _fiId, _mId = _resolveFileScope(fileId, context) - await knowledgeService.indexFile( - fileId=fileId, fileName=fileName, mimeType=mimeType, - userId=userId, contentObjects=contentObjects, - featureInstanceId=_fiId, - mandateId=_mId, + from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob + await knowledgeService.requestIngestion( + 
IngestionJob( + sourceKind="agent_tool", + sourceId=fileId, + fileName=fileName, + mimeType=mimeType, + userId=userId, + contentObjects=contentObjects, + featureInstanceId=_fiId, + mandateId=_mId, + provenance={"lane": "agent", "tool": "readFile"}, + ) ) except Exception as e: logger.warning(f"readFile: knowledge indexing failed for {fileId}: {e}") diff --git a/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py b/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py index dab8cc25..716ade31 100644 --- a/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py +++ b/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py @@ -2,9 +2,13 @@ # All rights reserved. """Knowledge service: 3-tier RAG with indexing, semantic search, and context building.""" +import hashlib +import json import logging import re -from typing import Any, Callable, Dict, List, Optional +import time +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, List, Optional, Union from modules.datamodels.datamodelKnowledge import ( FileContentIndex, ContentChunk, WorkflowMemory, @@ -20,6 +24,65 @@ DEFAULT_CHUNK_TOKENS = 400 DEFAULT_CONTEXT_BUDGET = 12000 +# ============================================================================= +# Ingestion façade (P0 of unified-knowledge-indexing concept) +# ============================================================================= + +@dataclass +class IngestionJob: + """One request to add or refresh content in the unified knowledge store. + + Callers from any lane (routes, feature hooks, agent tools, connector sync) + describe the work they want done via this object; idempotency, scope + resolution, and embedding are handled by KnowledgeService.requestIngestion. + """ + sourceKind: str + sourceId: str + fileName: str + mimeType: str + userId: str + contentObjects: List[Dict[str, Any]] = field(default_factory=list) + featureInstanceId: str = "" + mandateId: str = "" + structure: Optional[Dict[str, Any]] = None + containerPath: Optional[str] = None + contentVersion: Optional[str] = None + provenance: Optional[Dict[str, Any]] = None + + +@dataclass +class IngestionHandle: + """Result of requestIngestion. Stable across in-process and future queue impls.""" + jobId: str + status: str + contentHash: str + fileId: str + index: Optional[FileContentIndex] = None + error: Optional[str] = None + + +def _computeIngestionHash(contentObjects: List[Dict[str, Any]]) -> str: + """Deterministic SHA256 over (contentObjectId, contentType, data) tuples. + + Sorted by contentObjectId so re-ordering of extractor output does not + invalidate the cache; text whitespace is preserved intentionally because + chunk boundaries depend on it. 
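+
+    For example, these two inputs produce the same digest:
+
+        [{"contentObjectId": "c2", "data": "beta"},
+         {"contentObjectId": "c1", "data": "alpha"}]
+        [{"contentObjectId": "c1", "data": "alpha"},
+         {"contentObjectId": "c2", "data": "beta"}]
+
+    while editing any object's `data` changes the digest; both properties
+    are asserted in tests/unit/serviceKnowledge/test_requestIngestion.py.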
+ """ + normalized = sorted( + ( + ( + str(o.get("contentObjectId", "") or ""), + str(o.get("contentType", "text") or "text"), + o.get("data", "") or "", + ) + for o in (contentObjects or []) + ), + key=lambda t: t[0], + ) + payload = json.dumps(normalized, ensure_ascii=False, separators=(",", ":")) + return hashlib.sha256(payload.encode("utf-8")).hexdigest() + + class KnowledgeService: """Service for Knowledge Store operations: indexing, retrieval, and context building.""" @@ -46,6 +109,196 @@ class KnowledgeService: results = await self._embed([text]) return results[0] if results else [] + # ========================================================================= + # Ingestion façade (single entry point for all lanes) + # ========================================================================= + + async def requestIngestion(self, job: IngestionJob) -> IngestionHandle: + """Unified entry point for filling the knowledge corpus. + + Applies idempotency based on a content hash (or caller-supplied + `contentVersion`) persisted in `FileContentIndex.structure._ingestion`. + Re-runs indexing only when the hash differs or the previous run did + not reach `indexed` state. Runs embedding synchronously for now + (callers already schedule background tasks where needed). + """ + jobId = f"{job.sourceKind}:{job.sourceId}" + startMs = time.time() + contentHash = job.contentVersion or _computeIngestionHash(job.contentObjects) + + # 1. Check for duplicate via existing FileContentIndex row. + existing = None + try: + existing = self._knowledgeDb.getFileContentIndex(job.sourceId) + except Exception: + existing = None + + if existing: + existingStructure = ( + existing.get("structure") if isinstance(existing, dict) + else getattr(existing, "structure", {}) + ) or {} + existingMeta = existingStructure.get("_ingestion", {}) or {} + existingStatus = ( + existing.get("status") if isinstance(existing, dict) + else getattr(existing, "status", "") + ) or "" + if existingMeta.get("hash") == contentHash and existingStatus == "indexed": + logger.info( + "ingestion.skipped.duplicate sourceKind=%s sourceId=%s hash=%s", + job.sourceKind, job.sourceId, contentHash[:12], + extra={ + "event": "ingestion.skipped.duplicate", + "jobId": jobId, + "sourceKind": job.sourceKind, + "sourceId": job.sourceId, + "hash": contentHash, + "durationMs": int((time.time() - startMs) * 1000), + }, + ) + return IngestionHandle( + jobId=jobId, + status="duplicate", + contentHash=contentHash, + fileId=job.sourceId, + index=None, + ) + + # 2. Prepare ingestion metadata; stays in structure._ingestion so + # later connector revoke/purge can filter chunks by sourceKind / + # provenance.connectionId without a schema migration. + ingestionMeta = { + "hash": contentHash, + "sourceKind": job.sourceKind, + "sourceId": job.sourceId, + "contentVersion": job.contentVersion, + "indexedAt": getUtcTimestamp(), + "provenance": dict(job.provenance or {}), + } + structure = dict(job.structure or {}) + structure["_ingestion"] = ingestionMeta + + logger.info( + "ingestion.queued sourceKind=%s sourceId=%s objects=%d hash=%s", + job.sourceKind, job.sourceId, len(job.contentObjects or []), contentHash[:12], + extra={ + "event": "ingestion.queued", + "jobId": jobId, + "sourceKind": job.sourceKind, + "sourceId": job.sourceId, + "hash": contentHash, + "objectCount": len(job.contentObjects or []), + }, + ) + + # 3. Run real indexing. 
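+        #    P0 runs chunking and embedding synchronously in-process; a
+        #    queue-backed variant could return a "queued" handle at this
+        #    point instead, which is why callers receive an IngestionHandle
+        #    rather than the FileContentIndex directly.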
+ try: + index = await self._indexFileInternal( + fileId=job.sourceId, + fileName=job.fileName, + mimeType=job.mimeType, + userId=job.userId, + featureInstanceId=job.featureInstanceId, + mandateId=job.mandateId, + contentObjects=job.contentObjects or [], + structure=structure, + containerPath=job.containerPath, + ) + except Exception as exc: + logger.error( + "ingestion.failed sourceKind=%s sourceId=%s error=%s", + job.sourceKind, job.sourceId, exc, + exc_info=True, + extra={ + "event": "ingestion.failed", + "jobId": jobId, + "sourceKind": job.sourceKind, + "sourceId": job.sourceId, + "hash": contentHash, + "error": str(exc), + "durationMs": int((time.time() - startMs) * 1000), + }, + ) + try: + self._knowledgeDb.updateFileStatus(job.sourceId, "failed") + except Exception: + pass + return IngestionHandle( + jobId=jobId, + status="failed", + contentHash=contentHash, + fileId=job.sourceId, + index=None, + error=str(exc), + ) + + logger.info( + "ingestion.indexed sourceKind=%s sourceId=%s objects=%d durationMs=%d", + job.sourceKind, job.sourceId, len(job.contentObjects or []), + int((time.time() - startMs) * 1000), + extra={ + "event": "ingestion.indexed", + "jobId": jobId, + "sourceKind": job.sourceKind, + "sourceId": job.sourceId, + "hash": contentHash, + "objectCount": len(job.contentObjects or []), + "durationMs": int((time.time() - startMs) * 1000), + }, + ) + return IngestionHandle( + jobId=jobId, + status="indexed", + contentHash=contentHash, + fileId=job.sourceId, + index=index, + ) + + def getIngestionStatus( + self, handleOrJobId: Union[IngestionHandle, str] + ) -> Dict[str, Any]: + """Map a handle or `sourceKind:sourceId` jobId to a status snapshot.""" + if isinstance(handleOrJobId, IngestionHandle): + sourceId = handleOrJobId.fileId + jobId = handleOrJobId.jobId + elif isinstance(handleOrJobId, str) and ":" in handleOrJobId: + jobId = handleOrJobId + sourceId = handleOrJobId.split(":", 1)[1] + else: + jobId = str(handleOrJobId) + sourceId = str(handleOrJobId) + + row = None + try: + row = self._knowledgeDb.getFileContentIndex(sourceId) + except Exception: + row = None + if not row: + return { + "jobId": jobId, + "sourceId": sourceId, + "status": "unknown", + "contentHash": None, + } + + structure = ( + row.get("structure") if isinstance(row, dict) + else getattr(row, "structure", {}) + ) or {} + meta = structure.get("_ingestion", {}) or {} + status = ( + row.get("status") if isinstance(row, dict) + else getattr(row, "status", "") + ) or "unknown" + return { + "jobId": jobId, + "sourceId": sourceId, + "status": status, + "contentHash": meta.get("hash"), + "sourceKind": meta.get("sourceKind"), + "indexedAt": meta.get("indexedAt"), + } + # ========================================================================= # File Indexing (called after extraction, before embedding) # ========================================================================= @@ -61,6 +314,54 @@ class KnowledgeService: contentObjects: List[Dict[str, Any]] = None, structure: Dict[str, Any] = None, containerPath: str = None, + ) -> Optional[FileContentIndex]: + """Backward-compatible wrapper delegating to requestIngestion. + + Existing callers that still invoke `indexFile` directly automatically + participate in the idempotency/metrics layer. New callers should + prefer `requestIngestion` so they can pass `sourceKind` and + `provenance` for connector revoke/purge later. 
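+
+        A direct call from a future connector-sync lane might look like
+        this (the connector-side names below are illustrative, not an
+        existing API):
+
+            await knowledgeService.requestIngestion(IngestionJob(
+                sourceKind="connector",
+                sourceId=externalDocId,
+                fileName=doc.title,
+                mimeType=doc.mimeType,
+                userId=ownerId,
+                contentObjects=parts,
+                contentVersion=doc.etag,
+                provenance={"lane": "connector", "connectionId": connId},
+            ))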
+ """ + job = IngestionJob( + sourceKind="file", + sourceId=fileId, + fileName=fileName, + mimeType=mimeType, + userId=userId, + featureInstanceId=featureInstanceId, + mandateId=mandateId, + contentObjects=list(contentObjects or []), + structure=structure, + containerPath=containerPath, + ) + handle = await self.requestIngestion(job) + if handle.index is not None: + return handle.index + if handle.status == "duplicate": + row = None + try: + row = self._knowledgeDb.getFileContentIndex(fileId) + except Exception: + row = None + if isinstance(row, dict): + try: + return FileContentIndex(**row) + except Exception: + return None + return row + return None + + async def _indexFileInternal( + self, + fileId: str, + fileName: str, + mimeType: str, + userId: str, + featureInstanceId: str = "", + mandateId: str = "", + contentObjects: List[Dict[str, Any]] = None, + structure: Dict[str, Any] = None, + containerPath: str = None, ) -> FileContentIndex: """Index a file's content objects and create embeddings for text chunks. diff --git a/tests/unit/serviceKnowledge/__init__.py b/tests/unit/serviceKnowledge/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/serviceKnowledge/test_requestIngestion.py b/tests/unit/serviceKnowledge/test_requestIngestion.py new file mode 100644 index 00000000..595faeff --- /dev/null +++ b/tests/unit/serviceKnowledge/test_requestIngestion.py @@ -0,0 +1,172 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Unit tests for the P0 ingestion facade on KnowledgeService. + +Covers acceptance criteria AC4 (idempotent ingestion for unchanged content) +and hash stability. The knowledge DB interface and AI embedding service are +stubbed so the test runs without any external dependency. +""" + +from unittest.mock import MagicMock, patch + +import pytest + +from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import ( + IngestionJob, + KnowledgeService, + _computeIngestionHash, +) + + +class _StubKnowledgeDb: + """Minimal in-memory stand-in for interfaceDbKnowledge.""" + + def __init__(self): + self.index = None + self.upsertIndexCalls = 0 + self.upsertChunkCalls = 0 + + def upsertFileContentIndex(self, index): + self.index = index.model_dump() if hasattr(index, "model_dump") else dict(index) + self.upsertIndexCalls += 1 + + def upsertContentChunk(self, chunk): + self.upsertChunkCalls += 1 + + def updateFileStatus(self, fileId, status): + if self.index is not None: + self.index["status"] = status + + def getFileContentIndex(self, fileId): + return self.index + + +def _makeService(): + """Create a KnowledgeService with stubbed db and ai dependencies.""" + stubDb = _StubKnowledgeDb() + + aiService = MagicMock() + + async def _callEmbedding(texts): + return MagicMock( + errorCount=0, + content="", + metadata={"embeddings": [[0.0] * 4 for _ in texts]}, + ) + + aiService.callEmbedding = _callEmbedding + + def getService(name): + if name == "ai": + return aiService + raise KeyError(name) + + context = MagicMock() + context.user = MagicMock() + # Return a non-empty but empty-dict record so the FileItem lookup branch + # in _indexFileInternal resolves without touching a real DB. 
+ context.interfaceDbComponent = MagicMock() + context.interfaceDbComponent.getRecordset = MagicMock(return_value=[{}]) + + with patch( + "modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge.getKnowledgeInterface", + return_value=stubDb, + ): + service = KnowledgeService(context, getService) + + return service, stubDb + + +@pytest.mark.asyncio +async def test_duplicate_skipped(): + service, db = _makeService() + job = IngestionJob( + sourceKind="file", + sourceId="file-123", + fileName="a.txt", + mimeType="text/plain", + userId="u1", + contentObjects=[ + {"contentObjectId": "c1", "contentType": "text", "data": "hello world"} + ], + ) + + first = await service.requestIngestion(job) + assert first.status == "indexed" + chunksAfterFirst = db.upsertChunkCalls + assert chunksAfterFirst >= 1 + + second = await service.requestIngestion(job) + assert second.status == "duplicate" + assert second.contentHash == first.contentHash + # No additional embedding work. + assert db.upsertChunkCalls == chunksAfterFirst + + +@pytest.mark.asyncio +async def test_reindex_on_content_change(): + service, db = _makeService() + base = IngestionJob( + sourceKind="file", + sourceId="file-123", + fileName="a.txt", + mimeType="text/plain", + userId="u1", + contentObjects=[ + {"contentObjectId": "c1", "contentType": "text", "data": "hello world"} + ], + ) + first = await service.requestIngestion(base) + assert first.status == "indexed" + chunksAfterFirst = db.upsertChunkCalls + + changed = IngestionJob( + sourceKind="file", + sourceId="file-123", + fileName="a.txt", + mimeType="text/plain", + userId="u1", + contentObjects=[ + {"contentObjectId": "c1", "contentType": "text", "data": "hello universe"} + ], + ) + second = await service.requestIngestion(changed) + assert second.status == "indexed" + assert second.contentHash != first.contentHash + assert db.upsertChunkCalls > chunksAfterFirst + + +def test_hash_stable_under_reordering(): + a = [ + {"contentObjectId": "c1", "contentType": "text", "data": "alpha"}, + {"contentObjectId": "c2", "contentType": "text", "data": "beta"}, + ] + b = list(reversed(a)) + assert _computeIngestionHash(a) == _computeIngestionHash(b) + + +def test_hash_changes_on_data_edit(): + a = [{"contentObjectId": "c1", "contentType": "text", "data": "alpha"}] + b = [{"contentObjectId": "c1", "contentType": "text", "data": "alpha!"}] + assert _computeIngestionHash(a) != _computeIngestionHash(b) + + +@pytest.mark.asyncio +async def test_get_ingestion_status_after_index(): + service, _db = _makeService() + job = IngestionJob( + sourceKind="coaching_session", + sourceId="coaching-session:abc", + fileName="session", + mimeType="application/x-coaching-session", + userId="u1", + contentObjects=[ + {"contentObjectId": "m0", "contentType": "text", "data": "User: hi"} + ], + provenance={"lane": "feature", "feature": "commcoach"}, + ) + handle = await service.requestIngestion(job) + status = service.getIngestionStatus(handle) + assert status["status"] == "indexed" + assert status["sourceKind"] == "coaching_session" + assert status["contentHash"] == handle.contentHash
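+
+
+# Illustrative sketch (not executed by CI): a status snapshot can also be
+# fetched later by jobId ("sourceKind:sourceId"), e.g. from a future
+# ingestion-status endpoint, and a "failed" run can be retried by calling
+# requestIngestion again with the same IngestionJob, since the hash gate
+# only skips runs that previously reached "indexed":
+#
+#     snapshot = service.getIngestionStatus("file:file-123")
+#     if snapshot["status"] == "failed":
+#         await service.requestIngestion(job)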