# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Feature-data RAG bootstrap: indexes FeatureDataSource rows into the knowledge store.

Analogous to connection.bootstrap for external connections (Google, Microsoft),
this handler reads FeatureDataSource records with ragIndexEnabled=True, queries
the underlying feature tables via FeatureDataProvider, serialises each row into
text, and feeds it through KnowledgeService.requestIngestion so the data appears
in ContentChunk embeddings for semantic RAG search.

Job type: ``feature.bootstrap``
Payload: ``{"featureInstanceId": "...", "featureDataSourceIds": [...] (optional)}``
"""

from __future__ import annotations

import json
import logging
from typing import Any, Callable, Dict, List, Optional

logger = logging.getLogger(__name__)

FEATURE_BOOTSTRAP_JOB_TYPE = "feature.bootstrap"

# Number of rows requested per FeatureDataProvider.browseTable call while
# paging through a feature table.
_BROWSE_BATCH_SIZE = 200


def _readFdsField(fds: Any, name: str) -> Any:
    """Read *name* from an FDS record that is either a dict or an attribute-style object."""
    if isinstance(fds, dict):
        return fds.get(name)
    return getattr(fds, name, None)


def _loadRagEnabledFds(
    featureInstanceId: str,
    featureDataSourceIds: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """Load FeatureDataSource rows whose effective ragIndexEnabled is True.

    Returns dicts with resolved flags so downstream code can read them directly:
    ``ragIndexEnabled`` is forced to True and ``_effectiveNeutralize`` holds the
    value resolved by ``getEffectiveFlagFds(..., "neutralize", ..., mode="aggregate")``.

    Args:
        featureInstanceId: Only FDS rows of this feature instance are considered.
        featureDataSourceIds: Optional allow-list of FDS ids; an empty/None value
            means "all RAG-enabled FDS of the instance".

    Returns:
        List of FDS rows as plain dicts, in the order returned by the database.
        Rows without a concrete ``tableName`` (empty or ``"*"``) or without a
        ``featureCode`` are excluded.
    """
    from modules.interfaces.interfaceDbApp import getRootInterface
    from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource
    from modules.serviceCenter.services.serviceKnowledge._inheritFlags import getEffectiveFlagFds

    rootIf = getRootInterface()
    allFds = rootIf.db.getRecordset(
        FeatureDataSource, recordFilter={"featureInstanceId": featureInstanceId}
    )
    # Apply the id allow-list before resolving inherited flags so that work is
    # only done for rows that can end up in the result. The full ``allFds`` list
    # is still handed to getEffectiveFlagFds for flag aggregation.
    idSet = set(featureDataSourceIds) if featureDataSourceIds else None

    resolved: List[Dict[str, Any]] = []
    for fds in allFds:
        tblName = _readFdsField(fds, "tableName") or ""
        fCode = _readFdsField(fds, "featureCode") or ""
        # "*" is not a concrete table name (presumably a wildcard/container FDS),
        # so there is nothing to browse for it here.
        if tblName == "*" or not tblName or not fCode:
            continue
        row = dict(fds) if isinstance(fds, dict) else {**fds.__dict__}
        if idSet is not None and row.get("id") not in idSet:
            continue
        if getEffectiveFlagFds(fds, "ragIndexEnabled", allFds, mode="aggregate") is not True:
            continue
        row["_effectiveNeutralize"] = getEffectiveFlagFds(fds, "neutralize", allFds, mode="aggregate")
        row["ragIndexEnabled"] = True
        resolved.append(row)
    return resolved


def _serializeRowToText(row: Dict[str, Any], neutralizeFields: Optional[List[str]] = None) -> str:
    """Convert a feature-table row into readable text for embedding.

    Skips the ``id`` column, internal fields (names starting with ``_`` or
    ``sys``) and empty values (``None``, ``""``, ``[]``, ``{}``), and produces
    ``key: value`` lines that embed well semantically.

    Args:
        row: Column name -> value mapping of one feature-table row.
        neutralizeFields: Column names whose (non-empty) values are replaced
            with ``[REDACTED]``.

    Returns:
        Newline-joined ``key: value`` lines; ``""`` when nothing remains.
        Dict/list values are JSON-encoded (non-ASCII kept, unknown objects
        stringified); everything else goes through ``str()``.
    """
    neutralizeSet = set(neutralizeFields or [])
    lines: List[str] = []
    for key, value in row.items():
        if key == "id" or key.startswith(("_", "sys")):
            continue
        # Empty containers carry no semantic content; an empty dict used to be
        # emitted as "key: {}" while empty strings/lists were dropped.
        if value is None or (isinstance(value, (str, list, dict)) and not value):
            continue
        if key in neutralizeSet:
            text = "[REDACTED]"
        elif isinstance(value, (dict, list)):
            text = json.dumps(value, ensure_ascii=False, default=str)
        else:
            text = str(value)
        lines.append(f"{key}: {text}")
    return "\n".join(lines)


def _getFeatureDbConnector(featureCode: str):
    """Create a lightweight DB connector to the feature database.

    The database name is ``poweron_<featureCode lower-cased>``; host, user,
    password and port come from APP_CONFIG (host defaults to ``localhost``,
    port to ``5432``).
    """
    # NOTE(review): callers never close this connector — confirm whether
    # DatabaseConnector holds a connection that needs explicit release.
    from modules.connectors.connectorDbPostgre import DatabaseConnector
    from modules.shared.configuration import APP_CONFIG

    dbName = f"poweron_{featureCode.lower()}"
    return DatabaseConnector(
        dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
        dbDatabase=dbName,
        dbUser=APP_CONFIG.get("DB_USER"),
        dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET"),
        dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),
        userId="system.feature_bootstrap",
    )


async def _featureBootstrapHandler(
    job: Dict[str, Any],
    progressCb: Callable[..., Any],
) -> Dict[str, Any]:
    """Walk RAG-enabled FeatureDataSources and index their rows.

    Args:
        job: Job record; only ``job["payload"]`` is read. The payload must contain
            ``featureInstanceId`` and may contain ``featureDataSourceIds`` to
            restrict the run to specific FDS ids.
        progressCb: Called as ``progressCb(pct, messageKey=..., messageParams=...)``
            with a percentage in 0..100.

    Returns:
        If no RAG-enabled FDS exists:
        ``{"featureInstanceId", "skipped": True, "reason": "no_rag_enabled_fds"}``.
        Otherwise totals ``indexed``, ``skippedDuplicate`` (ingestion reported a
        duplicate), ``skippedEmpty`` (row serialised to empty text) and
        ``failed``, plus a per-FDS ``dataSources`` list. A per-FDS entry carries
        an ``error`` key when that FDS could not be (fully) processed; its
        counts then reflect the rows handled before the error.

    Raises:
        ValueError: if the payload has no ``featureInstanceId``.
    """
    payload = job.get("payload") or {}
    featureInstanceId = payload.get("featureInstanceId")
    featureDataSourceIds = payload.get("featureDataSourceIds")
    if not featureInstanceId:
        raise ValueError("feature.bootstrap requires payload.featureInstanceId")

    progressCb(5, messageKey="Feature-Datenquellen werden geladen...")
    fdsList = _loadRagEnabledFds(featureInstanceId, featureDataSourceIds)
    if not fdsList:
        logger.info(
            "feature.bootstrap.skipped — no rag-enabled FDS for feature %s",
            featureInstanceId,
        )
        return {"featureInstanceId": featureInstanceId, "skipped": True, "reason": "no_rag_enabled_fds"}

    from modules.serviceCenter.services.serviceAgent.featureDataProvider import FeatureDataProvider
    from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob
    from modules.serviceCenter.context import ServiceCenterContext
    from modules.serviceCenter import getService
    from modules.security.rootAccess import getRootUser

    totalIndexed = 0
    totalSkipped = 0
    totalEmpty = 0
    totalFailed = 0
    fdsResults: List[Dict[str, Any]] = []
    fdsCount = len(fdsList)

    for fdsIdx, fds in enumerate(fdsList):
        fdsId = fds.get("id", "")
        featureCode = fds.get("featureCode", "")
        tableName = fds.get("tableName", "")
        fdsFeatureInstanceId = fds.get("featureInstanceId", "")
        mandateId = fds.get("mandateId", "")
        neutralizeFields = fds.get("neutralizeFields") or []
        recordFilter = fds.get("recordFilter") or {}
        effectiveNeutralize = bool(fds.get("_effectiveNeutralize", False))

        # Progress spans 5..95 across the FDS list; 100 is reported at the end.
        progressCb(
            5 + int(90 * fdsIdx / fdsCount),
            messageKey="Indexiere {table} ({n}/{total})...",
            messageParams={"table": tableName, "n": fdsIdx + 1, "total": fdsCount},
        )

        if not featureCode or not tableName or not fdsFeatureInstanceId:
            logger.warning("feature.bootstrap: skipping FDS %s — missing featureCode/tableName/fiId", fdsId)
            # Report the skip instead of silently dropping the FDS from the summary.
            fdsResults.append({
                "featureDataSourceId": fdsId,
                "tableName": tableName,
                "featureCode": featureCode,
                "error": "missing featureCode/tableName/featureInstanceId",
            })
            continue

        # Counters live outside the try so rows processed before a mid-FDS
        # failure are still reflected in the totals and the per-FDS entry.
        fdsIndexed = 0
        fdsSkipped = 0
        fdsEmpty = 0
        fdsFailed = 0
        fdsError: Optional[str] = None

        try:
            dbConnector = _getFeatureDbConnector(featureCode)
            provider = FeatureDataProvider(dbConnector)
            ctx = ServiceCenterContext(
                user=getRootUser(),
                mandate_id=mandateId,
                feature_instance_id=fdsFeatureInstanceId,
            )
            knowledgeService = getService("knowledge", ctx)
            extraFilters = (
                [{"field": k, "op": "=", "value": v} for k, v in recordFilter.items()]
                if recordFilter
                else None
            )
            rowNeutralizeFields = neutralizeFields if effectiveNeutralize else None

            offset = 0
            while True:
                result = provider.browseTable(
                    tableName=tableName,
                    featureInstanceId=fdsFeatureInstanceId,
                    mandateId=mandateId,
                    limit=_BROWSE_BATCH_SIZE,
                    offset=offset,
                    extraFilters=extraFilters,
                )
                rows = result.get("rows", [])
                if not rows:
                    break

                for row in rows:
                    rowId = row.get("id", "")
                    if not rowId:
                        continue
                    textContent = _serializeRowToText(row, rowNeutralizeFields)
                    if not textContent.strip():
                        fdsEmpty += 1
                        continue
                    contentVersion = str(row.get("sysUpdatedAt") or row.get("sysCreatedAt") or "")
                    ingestionJob = IngestionJob(
                        sourceKind="feature_record",
                        sourceId=f"{fdsFeatureInstanceId}:{tableName}:{rowId}",
                        fileName=f"{tableName}-{rowId}",
                        mimeType="application/vnd.poweron.feature-record+json",
                        userId="system",
                        featureInstanceId=fdsFeatureInstanceId,
                        mandateId=mandateId,
                        contentObjects=[{
                            "contentType": "text",
                            "data": textContent,
                            "contextRef": {
                                "table": tableName,
                                "featureCode": featureCode,
                                "featureInstanceId": fdsFeatureInstanceId,
                                "rowId": rowId,
                            },
                            "contentObjectId": f"{tableName}:{rowId}",
                        }],
                        structure={"sourceTable": tableName, "featureCode": featureCode},
                        contentVersion=contentVersion,
                        provenance={
                            "featureDataSourceId": fdsId,
                            "tableName": tableName,
                            "featureCode": featureCode,
                            "featureInstanceId": fdsFeatureInstanceId,
                        },
                        neutralize=effectiveNeutralize,
                    )
                    # A single row failure must not abort the rest of the table.
                    try:
                        handle = await knowledgeService.requestIngestion(ingestionJob)
                        if handle.status == "failed":
                            fdsFailed += 1
                            logger.warning(
                                "feature.bootstrap: ingestion failed fds=%s table=%s row=%s error=%s",
                                fdsId, tableName, rowId, handle.error,
                            )
                        elif handle.status == "duplicate":
                            fdsSkipped += 1
                        else:
                            fdsIndexed += 1
                    except Exception as ingErr:
                        fdsFailed += 1
                        logger.error(
                            "feature.bootstrap: ingestion error fds=%s row=%s: %s",
                            fdsId, rowId, ingErr,
                        )

                # Advance by what was actually returned so pages never overlap,
                # and stop after a short (final) page.
                offset += len(rows)
                if len(rows) < _BROWSE_BATCH_SIZE:
                    break
        except Exception as fdsErr:
            logger.error(
                "feature.bootstrap: error processing FDS %s (%s.%s): %s",
                fdsId, featureCode, tableName, fdsErr,
                exc_info=True,
            )
            fdsError = str(fdsErr)

        totalIndexed += fdsIndexed
        totalSkipped += fdsSkipped
        totalEmpty += fdsEmpty
        totalFailed += fdsFailed
        fdsEntry: Dict[str, Any] = {
            "featureDataSourceId": fdsId,
            "tableName": tableName,
            "featureCode": featureCode,
            "indexed": fdsIndexed,
            "skippedDuplicate": fdsSkipped,
            "skippedEmpty": fdsEmpty,
            "failed": fdsFailed,
        }
        if fdsError is not None:
            fdsEntry["error"] = fdsError
        fdsResults.append(fdsEntry)

    progressCb(100, messageKey="Feature-Daten-Sync abgeschlossen.")

    return {
        "featureInstanceId": featureInstanceId,
        "indexed": totalIndexed,
        "skippedDuplicate": totalSkipped,
        "skippedEmpty": totalEmpty,
        "failed": totalFailed,
        "dataSources": fdsResults,
    }