306 lines
12 KiB
Python
306 lines
12 KiB
Python
# Copyright (c) 2026 PowerOn AG
|
|
# All rights reserved.
|
|
"""Feature-data RAG bootstrap: indexes FeatureDataSource rows into the knowledge store.
|
|
|
|
Analogous to connection.bootstrap for external connections (Google, Microsoft),
this handler reads FeatureDataSource records with ragIndexEnabled=True, queries
the underlying feature tables via FeatureDataProvider, serialises each row into
text, and feeds it through KnowledgeService.requestIngestion so the data
appears in ContentChunk embeddings for semantic RAG search.

Job type: ``feature.bootstrap``
Payload: ``{"featureInstanceId": "...", "featureDataSourceIds": [...] (optional)}``
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Job-type identifier of the feature-data RAG bootstrap job (see module docstring).
FEATURE_BOOTSTRAP_JOB_TYPE = "feature.bootstrap"
|
|
|
|
|
|
def _loadRagEnabledFds(featureInstanceId: str, featureDataSourceIds: Optional[List[str]] = None):
    """Return the RAG-enabled FeatureDataSource rows of one feature instance.

    All FeatureDataSource records of ``featureInstanceId`` are fetched through
    the root interface. A record is kept only when it has a concrete table
    name (not empty, not the ``"*"`` wildcard), a feature code, and an
    effective ``ragIndexEnabled`` flag that resolves to exactly ``True``.

    Each kept record is copied into a plain dict that additionally carries
    ``_effectiveNeutralize`` (the resolved ``neutralize`` flag) and has
    ``ragIndexEnabled`` forced to ``True`` so callers can read the resolved
    values directly.

    Args:
        featureInstanceId: Feature instance whose data sources are loaded.
        featureDataSourceIds: Optional allow-list of record ids; when given
            (and non-empty) only matching records are returned.

    Returns:
        A list of dicts, one per RAG-enabled data source.
    """
    from modules.interfaces.interfaceDbApp import getRootInterface
    from modules.datamodels.datamodelFeatures import FeatureDataSource
    from modules.serviceCenter.core.flagResolution import getEffectiveFlagFds

    def _readField(record, fieldName):
        # Records may arrive as dicts or as model objects; normalise falsy to "".
        if isinstance(record, dict):
            raw = record.get(fieldName)
        else:
            raw = getattr(record, fieldName, "")
        return raw or ""

    candidates = getRootInterface().db.getRecordset(
        FeatureDataSource, recordFilter={"featureInstanceId": featureInstanceId}
    )

    enabled = []
    for candidate in candidates:
        table = _readField(candidate, "tableName")
        code = _readField(candidate, "featureCode")
        if table == "*" or not table or not code:
            continue

        # Flags are resolved against the full record set ("aggregate" mode).
        ragFlag = getEffectiveFlagFds(candidate, "ragIndexEnabled", candidates, mode="aggregate")
        if ragFlag is not True:
            continue

        if isinstance(candidate, dict):
            entry = dict(candidate)
        else:
            entry = {**candidate.__dict__}
        entry["_effectiveNeutralize"] = getEffectiveFlagFds(
            candidate, "neutralize", candidates, mode="aggregate"
        )
        entry["ragIndexEnabled"] = True
        enabled.append(entry)

    if not featureDataSourceIds:
        return enabled

    wantedIds = set(featureDataSourceIds)
    return [entry for entry in enabled if entry.get("id") in wantedIds]
|
|
|
|
|
|
def _serializeRowToText(row: Dict[str, Any], neutralizeFields: Optional[List[str]] = None) -> str:
    """Render a feature-table row as newline-separated ``key: value`` text.

    Keys starting with ``_`` or ``sys`` and the ``id`` key are left out, as
    are values that are ``None``, an empty string or an empty list. Fields
    named in ``neutralizeFields`` are emitted as ``[REDACTED]``; dicts and
    lists are JSON-encoded (non-ASCII kept, unknown types stringified);
    every other value goes through ``str()``.

    Args:
        row: Column name to value mapping of one record.
        neutralizeFields: Optional names of fields whose value is redacted.

    Returns:
        The joined ``key: value`` lines, or ``""`` when nothing remains.
    """
    redacted = set(neutralizeFields or [])

    def _render(fieldName, fieldValue):
        # Redaction wins over any type-specific formatting.
        if fieldName in redacted:
            return "[REDACTED]"
        if isinstance(fieldValue, (dict, list)):
            return json.dumps(fieldValue, ensure_ascii=False, default=str)
        return str(fieldValue)

    return "\n".join(
        f"{fieldName}: {_render(fieldName, fieldValue)}"
        for fieldName, fieldValue in row.items()
        if not (fieldName.startswith(("_", "sys")) or fieldName == "id")
        and not (fieldValue is None or fieldValue == "" or fieldValue == [])
    )
|
|
|
|
|
|
def _getFeatureDbConnector(featureCode: str):
    """Build a DatabaseConnector for the database of the given feature.

    The database name is ``poweron_`` followed by the lower-cased feature
    code. Host, user, password and port are read from ``APP_CONFIG``
    (host falls back to ``localhost``, port to ``5432``), and the connector
    is created under the ``system.feature_bootstrap`` user id.

    Args:
        featureCode: Feature code that selects the target database.

    Returns:
        A new ``DatabaseConnector`` instance.
    """
    from modules.connectors.connectorDbPostgre import DatabaseConnector
    from modules.shared.configuration import APP_CONFIG

    databaseName = f"poweron_{featureCode.lower()}"
    connectionArgs = {
        "dbHost": APP_CONFIG.get("DB_HOST", "localhost"),
        "dbDatabase": databaseName,
        "dbUser": APP_CONFIG.get("DB_USER"),
        "dbPassword": APP_CONFIG.get("DB_PASSWORD_SECRET"),
        "dbPort": int(APP_CONFIG.get("DB_PORT", 5432)),
        "userId": "system.feature_bootstrap",
    }
    return DatabaseConnector(**connectionArgs)
|
|
|
|
|
|
async def _featureBootstrapHandler(
    job: Dict[str, Any],
    progressCb,
) -> Dict[str, Any]:
    """Walk RAG-enabled FeatureDataSources and index their rows.

    For every RAG-enabled data source of the requested feature instance the
    underlying table is paged through in batches of 200 rows, each row is
    serialised to text and submitted to the knowledge service as one
    ``feature_record`` ingestion job.

    Args:
        job: Job record; ``job["payload"]`` must contain ``featureInstanceId``
            and may contain ``featureDataSourceIds`` to restrict the run.
        progressCb: Callable invoked as
            ``progressCb(percent, messageKey=..., messageParams=...)``.

    Returns:
        A summary dict with ``featureInstanceId``, the totals ``indexed``,
        ``skippedDuplicate`` and ``failed``, and ``dataSources`` (one entry
        per processed data source with its own counts, plus ``error`` when
        processing of that source aborted). When no RAG-enabled data source
        exists, ``{"featureInstanceId", "skipped": True, "reason"}`` is
        returned instead.

    Raises:
        ValueError: If the payload has no ``featureInstanceId``.
    """
    payload = job.get("payload") or {}
    featureInstanceId = payload.get("featureInstanceId")
    featureDataSourceIds = payload.get("featureDataSourceIds")
    if not featureInstanceId:
        raise ValueError("feature.bootstrap requires payload.featureInstanceId")

    progressCb(5, messageKey="Feature-Datenquellen werden geladen...")

    fdsList = _loadRagEnabledFds(featureInstanceId, featureDataSourceIds)
    if not fdsList:
        logger.info(
            "feature.bootstrap.skipped — no rag-enabled FDS for feature %s",
            featureInstanceId,
        )
        return {"featureInstanceId": featureInstanceId, "skipped": True, "reason": "no_rag_enabled_fds"}

    from modules.serviceCenter.core.types import createFeatureDataProvider
    from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob
    from modules.serviceCenter.context import ServiceCenterContext
    from modules.serviceCenter import getService
    from modules.security.rootAccess import getRootUser

    batchSize = 200
    totalIndexed = 0
    totalSkipped = 0
    totalFailed = 0
    fdsResults = []

    for fdsIdx, fds in enumerate(fdsList):
        fdsId = fds.get("id", "")
        featureCode = fds.get("featureCode", "")
        tableName = fds.get("tableName", "")
        fdsFeatureInstanceId = fds.get("featureInstanceId", "")
        mandateId = fds.get("mandateId", "")
        neutralizeFields = fds.get("neutralizeFields") or []
        recordFilter = fds.get("recordFilter") or {}
        effectiveNeutralize = bool(fds.get("_effectiveNeutralize", False))

        # Progress runs from 5% to 95% across the data sources.
        progressPct = 5 + int(90 * fdsIdx / len(fdsList))
        progressCb(
            progressPct,
            messageKey="Indexiere {table} ({n}/{total})...",
            messageParams={"table": tableName, "n": fdsIdx + 1, "total": len(fdsList)},
        )

        if not featureCode or not tableName or not fdsFeatureInstanceId:
            logger.warning("feature.bootstrap: skipping FDS %s — missing featureCode/tableName/fiId", fdsId)
            continue

        # Counters live outside the try block so that rows already ingested
        # before a mid-table failure are still reflected in the summary.
        fdsIndexed = 0
        fdsSkipped = 0
        fdsFailed = 0
        fdsError = None

        try:
            dbConnector = _getFeatureDbConnector(featureCode)

            rootUser = getRootUser()
            ctx = ServiceCenterContext(
                user=rootUser,
                mandateId=mandateId,
                featureInstanceId=fdsFeatureInstanceId,
            )
            knowledgeService = getService("knowledge", ctx)

            # A2: index the SAME neutralized text the query path returns (parity).
            neutralizationService = getService("neutralization", ctx)
            neutralizePolicy = None
            if effectiveNeutralize or neutralizeFields:
                neutralizePolicy = {
                    tableName: {
                        "tableActive": bool(effectiveNeutralize),
                        "explicitFields": set(neutralizeFields),
                    }
                }
            provider = createFeatureDataProvider(
                dbConnector,
                neutralizePolicy=neutralizePolicy,
                neutralizationService=neutralizationService,
            )

            # The data source's recordFilter becomes a list of equality filters.
            extraFilters = [
                {"field": k, "op": "=", "value": v}
                for k, v in recordFilter.items()
            ] if recordFilter else None

            offset = 0
            while True:
                result = provider.browseTable(
                    tableName=tableName,
                    featureInstanceId=fdsFeatureInstanceId,
                    mandateId=mandateId,
                    limit=batchSize,
                    offset=offset,
                    extraFilters=extraFilters,
                )
                fetchedRows = result.get("rows", [])
                if not fetchedRows:
                    break
                # Remember the raw page size: end-of-table detection must not
                # depend on how many rows the post-processing step returns.
                fetchedCount = len(fetchedRows)

                # Apply the A2 field-neutralization policy + JSON-serialize (same as
                # the sub-agent query path) before building the embedding text.
                rows = await provider.finalizeRowsAsync(tableName, fetchedRows)

                for row in rows:
                    rowId = row.get("id", "")
                    if not rowId:
                        # Without an id no stable sourceId can be built; the row is not counted.
                        continue

                    textContent = _serializeRowToText(row)
                    if not textContent.strip():
                        fdsSkipped += 1
                        continue

                    contentVersion = str(row.get("sysUpdatedAt") or row.get("sysCreatedAt") or "")

                    ingestionJob = IngestionJob(
                        sourceKind="feature_record",
                        sourceId=f"{fdsFeatureInstanceId}:{tableName}:{rowId}",
                        fileName=f"{tableName}-{rowId}",
                        mimeType="application/vnd.poweron.feature-record+json",
                        userId="system",
                        featureInstanceId=fdsFeatureInstanceId,
                        mandateId=mandateId,
                        contentObjects=[{
                            "contentType": "text",
                            "data": textContent,
                            "contextRef": {
                                "table": tableName,
                                "featureCode": featureCode,
                                "featureInstanceId": fdsFeatureInstanceId,
                                "rowId": rowId,
                            },
                            "contentObjectId": f"{tableName}:{rowId}",
                        }],
                        structure={"sourceTable": tableName, "featureCode": featureCode},
                        contentVersion=contentVersion,
                        provenance={
                            "featureDataSourceId": fdsId,
                            "tableName": tableName,
                            "featureCode": featureCode,
                            "featureInstanceId": fdsFeatureInstanceId,
                        },
                        neutralize=effectiveNeutralize,
                    )

                    # A single failing row must not abort the rest of the table.
                    try:
                        handle = await knowledgeService.requestIngestion(ingestionJob)
                        if handle.status == "failed":
                            fdsFailed += 1
                            logger.warning(
                                "feature.bootstrap: ingestion failed fds=%s table=%s row=%s error=%s",
                                fdsId, tableName, rowId, handle.error,
                            )
                        elif handle.status == "duplicate":
                            fdsSkipped += 1
                        else:
                            fdsIndexed += 1
                    except Exception as ingErr:
                        fdsFailed += 1
                        logger.error(
                            "feature.bootstrap: ingestion error fds=%s row=%s: %s",
                            fdsId, rowId, ingErr,
                        )

                offset += batchSize
                if fetchedCount < batchSize:
                    break

        except Exception as fdsErr:
            # Best effort: record the failure and continue with the next data source.
            logger.error(
                "feature.bootstrap: error processing FDS %s (%s.%s): %s",
                fdsId, featureCode, tableName, fdsErr, exc_info=True,
            )
            fdsError = str(fdsErr)

        # Accumulate on success AND on failure so totals match the work done.
        totalIndexed += fdsIndexed
        totalSkipped += fdsSkipped
        totalFailed += fdsFailed

        fdsEntry = {
            "featureDataSourceId": fdsId,
            "tableName": tableName,
            "featureCode": featureCode,
            "indexed": fdsIndexed,
            "skippedDuplicate": fdsSkipped,
            "failed": fdsFailed,
        }
        if fdsError is not None:
            fdsEntry["error"] = fdsError
        fdsResults.append(fdsEntry)

    progressCb(100, messageKey="Feature-Daten-Sync abgeschlossen.")

    return {
        "featureInstanceId": featureInstanceId,
        "indexed": totalIndexed,
        "skippedDuplicate": totalSkipped,
        "failed": totalFailed,
        "dataSources": fdsResults,
    }
|