platform-core/modules/serviceCenter/services/serviceKnowledge/subFeatureBootstrap.py
ValueOn AG d61e29bcac
Some checks failed
Deploy Plattform-Core (Int) / test (push) Failing after 24s
Deploy Plattform-Core (Int) / deploy (push) Has been skipped
fixes private model and udb scoping sources
2026-06-03 09:37:03 +02:00

306 lines
12 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Feature-data RAG bootstrap: indexes FeatureDataSource rows into the knowledge store.
Analogous to connection.bootstrap for external connections (Google, Microsoft),
this handler reads FeatureDataSource records with ragIndexEnabled=True, queries
the underlying feature tables via FeatureDataProvider, serialises each row into
text, and feeds it through KnowledgeService.requestIngestion so the data
appears in ContentChunk embeddings for semantic RAG search.
Job type: ``feature.bootstrap``
Payload: ``{"featureInstanceId": "...", "featureDataSourceIds": [...] (optional)}``
"""
from __future__ import annotations
import json
import logging
from typing import Any, Dict, List, Optional
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Job-type identifier of the feature-data bootstrap job (see the module docstring).
FEATURE_BOOTSTRAP_JOB_TYPE = "feature.bootstrap"
def _loadRagEnabledFds(featureInstanceId: str, featureDataSourceIds: Optional[List[str]] = None):
    """Collect the RAG-enabled FeatureDataSource records of one feature instance.

    Every FeatureDataSource record of ``featureInstanceId`` is fetched through
    the root DB interface. A record is kept when it names a concrete table
    (neither empty nor the ``"*"`` wildcard), has a feature code, and its
    effective ``ragIndexEnabled`` flag, as resolved by ``getEffectiveFlagFds``
    in ``aggregate`` mode against all fetched records, is exactly ``True``.

    Args:
        featureInstanceId: Feature instance whose data sources are inspected.
        featureDataSourceIds: Optional allow-list of record ids. When it is
            non-empty, only records whose ``id`` is in the list are returned.

    Returns:
        A list of plain dict copies of the matching records. Each copy has
        ``ragIndexEnabled`` set to ``True`` and carries the resolved neutralize
        flag under ``_effectiveNeutralize`` so callers can read both directly.
    """
    from modules.interfaces.interfaceDbApp import getRootInterface
    from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource
    from modules.serviceCenter.services.serviceKnowledge._inheritFlags import getEffectiveFlagFds

    def _readField(record, fieldName):
        # Records are handled both as dicts and as attribute-style objects;
        # any falsy value is normalised to "".
        if isinstance(record, dict):
            rawValue = record.get(fieldName)
        else:
            rawValue = getattr(record, fieldName, "")
        return rawValue or ""

    candidates = getRootInterface().db.getRecordset(
        FeatureDataSource, recordFilter={"featureInstanceId": featureInstanceId}
    )

    enabledRows = []
    for candidate in candidates:
        tableName = _readField(candidate, "tableName")
        featureCode = _readField(candidate, "featureCode")
        # Wildcard entries and entries without table or feature code are not indexable.
        if not (tableName and featureCode) or tableName == "*":
            continue

        ragFlag = getEffectiveFlagFds(candidate, "ragIndexEnabled", candidates, mode="aggregate")
        if ragFlag is not True:
            continue

        sourceFields = candidate if isinstance(candidate, dict) else candidate.__dict__
        enabledRows.append({
            **sourceFields,
            "_effectiveNeutralize": getEffectiveFlagFds(
                candidate, "neutralize", candidates, mode="aggregate"
            ),
            "ragIndexEnabled": True,
        })

    if not featureDataSourceIds:
        return enabledRows

    # The id filter runs last because flag resolution above needs the full record set.
    wantedIds = set(featureDataSourceIds)
    return [entry for entry in enabledRows if entry.get("id") in wantedIds]
def _serializeRowToText(row: Dict[str, Any], neutralizeFields: Optional[List[str]] = None) -> str:
    """Render a feature-table row as newline-separated ``key: value`` text.

    Fields are emitted in the row's own order. A field is left out when its
    key is ``id``, when its key starts with ``_`` or ``sys``, or when its
    value is ``None``, an empty string or an empty list.

    Args:
        row: Mapping of column name to value.
        neutralizeFields: Optional column names whose value is replaced by
            the literal ``[REDACTED]``.

    Returns:
        The joined text; an empty string when no field qualifies.
    """
    redactedKeys = set(neutralizeFields or [])

    def _isEmbeddable(fieldName, fieldValue) -> bool:
        # Primary key and internal/system columns carry no semantic content.
        if fieldName.startswith(("_", "sys")) or fieldName == "id":
            return False
        return not (fieldValue is None or fieldValue == "" or fieldValue == [])

    def _render(fieldName, fieldValue) -> str:
        if fieldName in redactedKeys:
            return "[REDACTED]"
        if isinstance(fieldValue, (dict, list)):
            # Nested structures are emitted as JSON; non-JSON types fall back to str().
            return json.dumps(fieldValue, ensure_ascii=False, default=str)
        return str(fieldValue)

    return "\n".join(
        f"{fieldName}: {_render(fieldName, fieldValue)}"
        for fieldName, fieldValue in row.items()
        if _isEmbeddable(fieldName, fieldValue)
    )
def _getFeatureDbConnector(featureCode: str):
    """Build a DatabaseConnector for the database of one feature.

    The database name is ``poweron_`` followed by the lower-cased feature
    code. Host, port, user and password are read from ``APP_CONFIG``; the
    host falls back to ``localhost`` and the port to ``5432`` when unset.

    Args:
        featureCode: Feature code used to derive the database name.

    Returns:
        A ``DatabaseConnector`` created with the user id
        ``system.feature_bootstrap``.
    """
    from modules.connectors.connectorDbPostgre import DatabaseConnector
    from modules.shared.configuration import APP_CONFIG

    featureDatabase = "poweron_" + featureCode.lower()
    connectionSettings = {
        "dbHost": APP_CONFIG.get("DB_HOST", "localhost"),
        "dbDatabase": featureDatabase,
        "dbUser": APP_CONFIG.get("DB_USER"),
        "dbPassword": APP_CONFIG.get("DB_PASSWORD_SECRET"),
        "dbPort": int(APP_CONFIG.get("DB_PORT", 5432)),
        "userId": "system.feature_bootstrap",
    }
    return DatabaseConnector(**connectionSettings)
async def _featureBootstrapHandler(
    job: Dict[str, Any],
    progressCb,
) -> Dict[str, Any]:
    """Walk RAG-enabled FeatureDataSources and index their rows.

    For every data source returned by ``_loadRagEnabledFds`` the underlying
    table is paged through ``FeatureDataProvider.browseTable``, each page is
    finalised with ``finalizeRowsAsync`` (field neutralization + JSON
    serialisation), every row is rendered with ``_serializeRowToText`` and
    submitted to the knowledge service as one ``IngestionJob``.

    A failure inside one data source is logged and recorded in the result; it
    does not abort the remaining data sources. Rows processed before such a
    failure are still counted in the totals.

    Args:
        job: Job record; ``job["payload"]`` must contain ``featureInstanceId``
            and may contain ``featureDataSourceIds`` to restrict the run.
        progressCb: Callable invoked as
            ``progressCb(percent, messageKey=..., messageParams=...)``.

    Returns:
        Either ``{"featureInstanceId", "skipped": True, "reason"}`` when no
        RAG-enabled data source exists, or a summary with the keys
        ``featureInstanceId``, ``indexed``, ``skippedDuplicate``, ``failed``
        and ``dataSources`` (one entry per processed data source; entries of
        failed data sources additionally carry ``error``).

    Raises:
        ValueError: If the payload has no ``featureInstanceId``.
    """
    payload = job.get("payload") or {}
    featureInstanceId = payload.get("featureInstanceId")
    featureDataSourceIds = payload.get("featureDataSourceIds")
    if not featureInstanceId:
        raise ValueError("feature.bootstrap requires payload.featureInstanceId")

    progressCb(5, messageKey="Feature-Datenquellen werden geladen...")
    fdsList = _loadRagEnabledFds(featureInstanceId, featureDataSourceIds)
    if not fdsList:
        logger.info(
            "feature.bootstrap.skipped — no rag-enabled FDS for feature %s",
            featureInstanceId,
        )
        return {"featureInstanceId": featureInstanceId, "skipped": True, "reason": "no_rag_enabled_fds"}

    from modules.serviceCenter.services.serviceAgent.featureDataProvider import FeatureDataProvider
    from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob
    from modules.serviceCenter.context import ServiceCenterContext
    from modules.serviceCenter import getService
    from modules.security.rootAccess import getRootUser

    batchSize = 200  # page size used for browseTable
    fdsCount = len(fdsList)
    totalIndexed = 0
    totalSkipped = 0
    totalFailed = 0
    fdsResults = []

    for fdsIdx, fds in enumerate(fdsList):
        fdsId = fds.get("id", "")
        featureCode = fds.get("featureCode", "")
        tableName = fds.get("tableName", "")
        fdsFeatureInstanceId = fds.get("featureInstanceId", "")
        mandateId = fds.get("mandateId", "")
        neutralizeFields = fds.get("neutralizeFields") or []
        recordFilter = fds.get("recordFilter") or {}
        effectiveNeutralize = bool(fds.get("_effectiveNeutralize", False))

        # 5 % is spent on loading, 90 % is spread evenly across the data sources.
        progressPct = 5 + int(90 * fdsIdx / fdsCount)
        progressCb(
            progressPct,
            messageKey="Indexiere {table} ({n}/{total})...",
            messageParams={"table": tableName, "n": fdsIdx + 1, "total": fdsCount},
        )

        if not featureCode or not tableName or not fdsFeatureInstanceId:
            logger.warning("feature.bootstrap: skipping FDS %s — missing featureCode/tableName/fiId", fdsId)
            continue

        # Counters are created before the try block so that rows handled
        # before a mid-run failure still reach the job totals.
        fdsIndexed = 0
        fdsSkipped = 0
        fdsFailed = 0

        try:
            dbConnector = _getFeatureDbConnector(featureCode)
            rootUser = getRootUser()
            ctx = ServiceCenterContext(
                user=rootUser,
                mandate_id=mandateId,
                feature_instance_id=fdsFeatureInstanceId,
            )
            knowledgeService = getService("knowledge", ctx)

            # A2: index the SAME neutralized text the query path returns (parity).
            neutralizationService = getService("neutralization", ctx)
            neutralizePolicy = None
            if effectiveNeutralize or neutralizeFields:
                neutralizePolicy = {
                    tableName: {
                        "tableActive": bool(effectiveNeutralize),
                        "explicitFields": set(neutralizeFields),
                    }
                }
            provider = FeatureDataProvider(
                dbConnector,
                neutralizePolicy=neutralizePolicy,
                neutralizationService=neutralizationService,
            )

            # Each recordFilter entry becomes an equality filter on that column.
            extraFilters = [
                {"field": k, "op": "=", "value": v}
                for k, v in recordFilter.items()
            ] if recordFilter else None

            offset = 0
            while True:
                result = provider.browseTable(
                    tableName=tableName,
                    featureInstanceId=fdsFeatureInstanceId,
                    mandateId=mandateId,
                    limit=batchSize,
                    offset=offset,
                    extraFilters=extraFilters,
                )
                rows = result.get("rows", [])
                if not rows:
                    break

                # Remember the raw page size: the end-of-table check below must
                # not depend on how many rows survive finalisation.
                fetchedCount = len(rows)

                # Apply the A2 field-neutralization policy + JSON-serialize (same as
                # the sub-agent query path) before building the embedding text.
                rows = await provider.finalizeRowsAsync(tableName, rows)

                for row in rows:
                    rowId = row.get("id", "")
                    if not rowId:
                        # Without an id no stable sourceId can be built; the row is ignored.
                        continue

                    textContent = _serializeRowToText(row)
                    if not textContent.strip():
                        fdsSkipped += 1
                        continue

                    contentVersion = str(row.get("sysUpdatedAt") or row.get("sysCreatedAt") or "")
                    ingestionJob = IngestionJob(
                        sourceKind="feature_record",
                        sourceId=f"{fdsFeatureInstanceId}:{tableName}:{rowId}",
                        fileName=f"{tableName}-{rowId}",
                        mimeType="application/vnd.poweron.feature-record+json",
                        userId="system",
                        featureInstanceId=fdsFeatureInstanceId,
                        mandateId=mandateId,
                        contentObjects=[{
                            "contentType": "text",
                            "data": textContent,
                            "contextRef": {
                                "table": tableName,
                                "featureCode": featureCode,
                                "featureInstanceId": fdsFeatureInstanceId,
                                "rowId": rowId,
                            },
                            "contentObjectId": f"{tableName}:{rowId}",
                        }],
                        structure={"sourceTable": tableName, "featureCode": featureCode},
                        contentVersion=contentVersion,
                        provenance={
                            "featureDataSourceId": fdsId,
                            "tableName": tableName,
                            "featureCode": featureCode,
                            "featureInstanceId": fdsFeatureInstanceId,
                        },
                        neutralize=effectiveNeutralize,
                    )

                    # A single failing row must not abort the rest of the table.
                    try:
                        handle = await knowledgeService.requestIngestion(ingestionJob)
                        if handle.status == "failed":
                            fdsFailed += 1
                            logger.warning(
                                "feature.bootstrap: ingestion failed fds=%s table=%s row=%s error=%s",
                                fdsId, tableName, rowId, handle.error,
                            )
                        elif handle.status == "duplicate":
                            fdsSkipped += 1
                        else:
                            fdsIndexed += 1
                    except Exception as ingErr:
                        fdsFailed += 1
                        logger.error(
                            "feature.bootstrap: ingestion error fds=%s row=%s: %s",
                            fdsId, rowId, ingErr,
                        )

                offset += batchSize
                if fetchedCount < batchSize:
                    # Short page: the table is exhausted.
                    break

            fdsResults.append({
                "featureDataSourceId": fdsId,
                "tableName": tableName,
                "featureCode": featureCode,
                "indexed": fdsIndexed,
                "skippedDuplicate": fdsSkipped,
                "failed": fdsFailed,
            })
        except Exception as fdsErr:
            logger.error(
                "feature.bootstrap: error processing FDS %s (%s.%s): %s",
                fdsId, featureCode, tableName, fdsErr, exc_info=True,
            )
            # Report the failure together with whatever was processed before it.
            fdsResults.append({
                "featureDataSourceId": fdsId,
                "tableName": tableName,
                "featureCode": featureCode,
                "error": str(fdsErr),
                "indexed": fdsIndexed,
                "skippedDuplicate": fdsSkipped,
                "failed": fdsFailed,
            })

        totalIndexed += fdsIndexed
        totalSkipped += fdsSkipped
        totalFailed += fdsFailed

    progressCb(100, messageKey="Feature-Daten-Sync abgeschlossen.")
    return {
        "featureInstanceId": featureInstanceId,
        "indexed": totalIndexed,
        "skippedDuplicate": totalSkipped,
        "failed": totalFailed,
        "dataSources": fdsResults,
    }