# Copyright (c) 2026 PowerOn AG
# All rights reserved.
"""Generic data provider for querying feature-instance tables.

Uses the RBAC catalog's DATA_OBJECTS metadata (table name, fields) and the DB
connector to execute scoped, read-only queries against any registered feature
table. Every query is automatically filtered by the table's feature-instance
column and, when a mandateId is supplied, by mandateId, so rows belonging to
other instances/mandates are not returned.
"""

import asyncio
import hashlib
import logging
import json
import os
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Set

logger = logging.getLogger(__name__)

# NOTE(review): hard-coded developer-local Windows path. On non-Windows hosts
# this is interpreted as a *relative* path (a "D:" directory under the CWD).
# Consider making it configurable.
_DEBUG_DIR = Path("D:/Athi/Local/Web/poweron/local/debug")


def _isDebugEnabled() -> bool:
    """Return True when ``APP_LOGGING_FILE_ENABLED`` is ``True`` or ``"true"``.

    Any failure to read the configuration (e.g. the config module cannot be
    imported) disables debug logging.
    """
    try:
        from modules.shared.configuration import APP_CONFIG

        val = APP_CONFIG.get("APP_LOGGING_FILE_ENABLED", False)
        return val is True or str(val).lower() == "true"
    except Exception:
        return False


def _debugQueryLog(method: str, tableName: str, params: dict, result: dict, elapsed: float):
    """Append a query and its result rows to ``_DEBUG_DIR/debug_queryTable.log``.

    Best-effort: does nothing unless :func:`_isDebugEnabled` returns True, and
    any error while writing is swallowed so debug logging never breaks a query.

    NOTE(review): the rows written here are the RAW rows returned by the query
    methods, i.e. BEFORE ``FeatureDataProvider.finalizeRowsAsync`` applies
    neutralization -- sensitive values reach the log file in clear text
    whenever file logging is enabled.

    Args:
        method: Name of the calling query method (used in the header line).
        tableName: The queried table.
        params: Query parameters to record (dumped with ``default=str``).
        result: Result dict; its ``rows``, ``total`` and ``error`` keys are read.
        elapsed: Elapsed time in milliseconds.
    """
    if not _isDebugEnabled():
        return
    debugDir = _DEBUG_DIR
    try:
        debugDir.mkdir(parents=True, exist_ok=True)
        logPath = debugDir / "debug_queryTable.log"
        ts = time.strftime("%Y-%m-%d %H:%M:%S")
        rows = result.get("rows", [])
        total = result.get("total", len(rows))
        err = result.get("error")
        header = f"[{ts}] {method}({tableName}) — {len(rows)} rows returned, total={total}, elapsed={elapsed:.0f}ms"
        if err:
            header += f", ERROR={err}"
        lines = [header]
        lines.append(f" params: {json.dumps(params, ensure_ascii=False, default=str)}")
        for i, row in enumerate(rows):
            lines.append(f" [{i}] {json.dumps(row, ensure_ascii=False, default=str)}")
        lines.append("")
        with open(logPath, "a", encoding="utf-8") as f:
            f.write("\n".join(lines) + "\n")
    except Exception:
        # Deliberate best-effort: debug output must never affect the query result.
        pass


_ALLOWED_OPERATORS = {"=", "!=", ">", "<", ">=", "<=", "LIKE", "ILIKE", "IS NULL", "IS NOT NULL"}
_ALLOWED_AGGREGATES = {"SUM", "COUNT", "AVG", "MIN", "MAX"}


class FeatureDataProvider:
    """Reads feature-instance data from the DB using DATA_OBJECTS metadata."""

    def __init__(
        self,
        dbConnector,
        neutralizeFields: Optional[Dict[str, List[str]]] = None,
        neutralizePolicy: Optional[Dict[str, Dict[str, Any]]] = None,
        neutralizationService: Optional[Any] = None,
    ):
        """
        Args:
            dbConnector: A connectorDbPostgre.DatabaseConnector with an open
                connection. Must provide ``borrowCursor()`` (used as a context
                manager yielding a cursor whose rows are accessed by column name).
            neutralizeFields: LEGACY per-table field names whose values are
                replaced with a whole-value placeholder
                ``[NEUT.<field>.<8-hex-hash>]``. Kept for backward
                compatibility; superseded by ``neutralizePolicy``.
            neutralizePolicy: Per-table type/inheritance-aware policy, e.g.
                ``{"TrusteePosition": {"tableActive": True, "explicitFields": {"iban"}}}``.

                * ``tableActive`` -- effective (own/inherited) table-level neutralize flag.
                * ``explicitFields`` -- fields whose neutralize flag is set EXPLICITLY.

                Applied via :meth:`finalizeRowsAsync` following the A2 rules:
                strings substring-neutralized when effective (explicit or
                inherited), binary dropped, other scalars only when explicit.
            neutralizationService: The mandate/instance-scoped
                NeutralizationService used for substring neutralization of
                string cells (reuses the standard neutralization engine; no
                external LLM is introduced here).
        """
        self._db = dbConnector
        self._neutralizeFields: Dict[str, Set[str]] = {
            tbl: set(fields) for tbl, fields in (neutralizeFields or {}).items()
        }
        self._neutralizePolicy: Dict[str, Dict[str, Any]] = neutralizePolicy or {}
        self._neutralizer = neutralizationService

    # ------------------------------------------------------------------
    # public API (called by FeatureDataAgent tools)
    # ------------------------------------------------------------------

    def getAvailableTables(self, featureCode: str) -> List[Dict[str, Any]]:
        """Return DATA_OBJECTS registered for *featureCode*."""
        from modules.security.rbacCatalog import getCatalogService

        catalog = getCatalogService()
        return catalog.getDataObjects(featureCode)

    def getTableSchema(self, featureCode: str, tableName: str) -> Optional[Dict[str, Any]]:
        """Return the DATA_OBJECT entry whose ``meta.table`` equals *tableName*, or None."""
        for obj in self.getAvailableTables(featureCode):
            if obj.get("meta", {}).get("table") == tableName:
                return obj
        return None

    def getActualColumns(self, tableName: str) -> List[str]:
        """Read real column names from PostgreSQL ``information_schema``.

        Columns whose name starts with ``_`` are omitted. Returns ``[]`` (and
        logs a warning) if the lookup fails.
        """
        try:
            with self._db.borrowCursor() as cur:
                cur.execute(
                    "SELECT column_name FROM information_schema.columns "
                    "WHERE table_schema = 'public' AND LOWER(table_name) = LOWER(%s) "
                    "ORDER BY ordinal_position",
                    [tableName],
                )
                cols = [row["column_name"] for row in cur.fetchall()]
                return [c for c in cols if not c.startswith("_")]
        except Exception as e:
            logger.warning(f"getActualColumns({tableName}) failed: {e}")
            return []

    async def finalizeRowsAsync(self, tableName: str, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Make raw DB rows AI-safe: apply the field-neutralization policy and serialize.

        The query methods (``browseTable``/``queryTable``/``aggregateTable``)
        return RAW rows so this step can see the real Python types (bytes vs
        str vs scalar).

        * When a rich ``neutralizePolicy`` exists for the table, the A2
          type/inheritance rules apply (see :func:`_neutralizeAndSerializeRows`).
        * Otherwise the legacy whole-value behavior is preserved (``neutralizeFields``).
        * With no neutralization at all, rows are only serialized.

        bytes and date/datetime values are always converted to strings.
        NOTE(review): other non-JSON types (e.g. Decimal, UUID) pass through
        unchanged; confirm callers dump with ``default=str`` or similar.
        """
        policy = self._neutralizePolicy.get(tableName)
        if policy:
            return await _neutralizeAndSerializeRows(rows, policy, self._neutralizer)

        serialized = [_serializeRow(dict(r)) for r in rows]
        legacyFields = self._neutralizeFields.get(tableName)
        if legacyFields:
            serialized = [_neutralizeRowFields(row, legacyFields) for row in serialized]
        return serialized

    def browseTable(
        self,
        tableName: str,
        featureInstanceId: str,
        mandateId: str,
        fields: Optional[List[str]] = None,
        limit: int = 50,
        offset: int = 0,
        extraFilters: Optional[List[Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        """List rows from a feature table with pagination (ordered by ``id``).

        ``extraFilters`` are mandatory record-level scoping filters; an invalid
        entry makes the query match no rows rather than being dropped.

        Returns ``{"rows": [...], "total": N, "limit": L, "offset": O}``, plus an
        ``"error"`` key on failure.

        Raises:
            ValueError: If *tableName* is not a valid identifier.
        """
        _validateTableName(tableName)
        if fields:
            invalid = [f for f in fields if not _isValidIdentifier(f)]
            if invalid:
                return {
                    "rows": [], "total": 0, "limit": limit, "offset": offset,
                    "error": f"Invalid field name(s): {', '.join(map(str, invalid))}. Use getActualColumns to discover valid column names.",
                }

        fullWhere, allParams = self._scopedWhere(
            tableName, featureInstanceId, mandateId, mandatoryFilters=extraFilters,
        )

        t0 = time.time()
        try:
            with self._db.borrowCursor() as cur:
                countSql = f'SELECT COUNT(*) FROM "{tableName}" WHERE {fullWhere}'
                cur.execute(countSql, allParams)
                total = cur.fetchone()["count"] if cur.rowcount else 0

                selectCols = ", ".join(f'"{f}"' for f in fields) if fields else "*"
                dataSql = (
                    f'SELECT {selectCols} FROM "{tableName}" '
                    f'WHERE {fullWhere} '
                    f'ORDER BY "id" LIMIT %s OFFSET %s'
                )
                cur.execute(dataSql, allParams + [limit, offset])
                # Return RAW rows; neutralization + JSON-serialization happen in
                # finalizeRowsAsync (needs the real Python types to apply A2 rules).
                rows = [dict(r) for r in cur.fetchall()]

            result = {"rows": rows, "total": total, "limit": limit, "offset": offset}
            _debugQueryLog("browseTable", tableName, {
                "fields": fields, "limit": limit, "offset": offset,
            }, result, (time.time() - t0) * 1000)
            return result
        except Exception as e:
            logger.error(f"browseTable({tableName}) failed: {e}")
            elapsed = (time.time() - t0) * 1000
            errResult = {"rows": [], "total": 0, "limit": limit, "offset": offset, "error": str(e)}
            _debugQueryLog("browseTable", tableName, {
                "fields": fields, "limit": limit, "offset": offset,
            }, errResult, elapsed)
            return errResult

    def aggregateTable(
        self,
        tableName: str,
        featureInstanceId: str,
        mandateId: str,
        aggregate: str,
        field: str,
        groupBy: Optional[str] = None,
        extraFilters: Optional[List[Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        """Run an aggregate query (SUM, COUNT, AVG, MIN, MAX) on a feature table.

        *aggregate* is matched case-insensitively. With *groupBy*, one row per
        group is returned, ordered by the aggregate result descending.
        ``extraFilters`` are mandatory scoping filters (invalid entries fail closed).

        Returns ``{"rows": [{"groupValue": ..., "result": ...}], "aggregate": ...,
        "field": ..., "groupBy": ...}``; validation failures return
        ``{"rows": [], "error": ...}``.

        Raises:
            ValueError: If *tableName* is not a valid identifier.
        """
        _validateTableName(tableName)
        # str(...) guards against None / non-string agent input.
        aggregate = str(aggregate or "").upper()
        if aggregate not in _ALLOWED_AGGREGATES:
            return {"rows": [], "error": f"Unsupported aggregate: {aggregate}. Allowed: {', '.join(sorted(_ALLOWED_AGGREGATES))}"}
        if not _isValidIdentifier(field):
            return {"rows": [], "error": f"Invalid field name: {field}"}
        if groupBy and not _isValidIdentifier(groupBy):
            return {"rows": [], "error": f"Invalid groupBy field: {groupBy}"}

        fullWhere, allParams = self._scopedWhere(
            tableName, featureInstanceId, mandateId, mandatoryFilters=extraFilters,
        )

        t0 = time.time()
        try:
            with self._db.borrowCursor() as cur:
                if groupBy:
                    sql = (
                        f'SELECT "{groupBy}" AS "groupValue", {aggregate}("{field}") AS "result" '
                        f'FROM "{tableName}" WHERE {fullWhere} '
                        f'GROUP BY "{groupBy}" ORDER BY "result" DESC'
                    )
                else:
                    sql = (
                        f'SELECT {aggregate}("{field}") AS "result" '
                        f'FROM "{tableName}" WHERE {fullWhere}'
                    )
                cur.execute(sql, allParams)
                rows = [dict(r) for r in cur.fetchall()]

            result = {
                "rows": rows, "aggregate": aggregate,
                "field": field, "groupBy": groupBy,
            }
            _debugQueryLog("aggregateTable", tableName, {
                "aggregate": aggregate, "field": field, "groupBy": groupBy,
            }, result, (time.time() - t0) * 1000)
            return result
        except Exception as e:
            logger.error(f"aggregateTable({tableName}, {aggregate}({field})) failed: {e}")
            elapsed = (time.time() - t0) * 1000
            errResult = {"rows": [], "error": str(e), "aggregate": aggregate, "field": field, "groupBy": groupBy}
            _debugQueryLog("aggregateTable", tableName, {
                "aggregate": aggregate, "field": field, "groupBy": groupBy,
            }, errResult, elapsed)
            return errResult

    def queryTable(
        self,
        tableName: str,
        featureInstanceId: str,
        mandateId: str,
        filters: Optional[List[Dict[str, Any]]] = None,
        fields: Optional[List[str]] = None,
        orderBy: Optional[str] = None,
        limit: int = 50,
        offset: int = 0,
        extraFilters: Optional[List[Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        """Query a feature table with optional filters.

        ``filters`` is a list of ``{"field": "x", "op": "=", "value": "y"}``
        supplied by the agent; invalid entries are skipped. ``extraFilters``
        are mandatory record-level scoping filters injected by the pipeline;
        an invalid entry makes the query match no rows (fail closed) instead
        of silently widening the result. An invalid or missing *orderBy*
        falls back to ordering by ``id``.

        Returns ``{"rows": [...], "total": N, "limit": L, "offset": O}``, plus an
        ``"error"`` key on failure.

        Raises:
            ValueError: If *tableName* is not a valid identifier.
        """
        _validateTableName(tableName)
        if fields:
            invalid = [f for f in fields if not _isValidIdentifier(f)]
            if invalid:
                return {
                    "rows": [], "total": 0, "limit": limit, "offset": offset,
                    "error": f"Invalid field name(s): {', '.join(map(str, invalid))}. Use getActualColumns to discover valid column names.",
                }

        fullWhere, allParams = self._scopedWhere(
            tableName, featureInstanceId, mandateId,
            agentFilters=filters, mandatoryFilters=extraFilters,
        )

        t0 = time.time()
        try:
            with self._db.borrowCursor() as cur:
                countSql = f'SELECT COUNT(*) FROM "{tableName}" WHERE {fullWhere}'
                cur.execute(countSql, allParams)
                total = cur.fetchone()["count"] if cur.rowcount else 0

                selectCols = ", ".join(f'"{f}"' for f in fields) if fields else "*"
                orderClause = f'ORDER BY "{orderBy}"' if orderBy and _isValidIdentifier(orderBy) else 'ORDER BY "id"'
                dataSql = (
                    f'SELECT {selectCols} FROM "{tableName}" '
                    f'WHERE {fullWhere} {orderClause} LIMIT %s OFFSET %s'
                )
                cur.execute(dataSql, allParams + [limit, offset])
                rows = [dict(r) for r in cur.fetchall()]

            result = {"rows": rows, "total": total, "limit": limit, "offset": offset}
            _debugQueryLog("queryTable", tableName, {
                "filters": filters, "fields": fields, "orderBy": orderBy,
                "limit": limit, "offset": offset,
            }, result, (time.time() - t0) * 1000)
            return result
        except Exception as e:
            logger.error(f"queryTable({tableName}) failed: {e}")
            elapsed = (time.time() - t0) * 1000
            errResult = {"rows": [], "total": 0, "limit": limit, "offset": offset, "error": str(e)}
            _debugQueryLog("queryTable", tableName, {
                "filters": filters, "fields": fields, "orderBy": orderBy,
                "limit": limit, "offset": offset,
            }, errResult, elapsed)
            return errResult

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _scopedWhere(
        self,
        tableName: str,
        featureInstanceId: str,
        mandateId: str,
        agentFilters: Optional[List[Dict[str, Any]]] = None,
        mandatoryFilters: Optional[List[Dict[str, Any]]] = None,
    ) -> tuple:
        """Build the complete WHERE clause and its bind parameters.

        Joins with ``AND``, in this order: the instance/mandate scope from
        :func:`_buildScopeFilter`, the optional agent filters (invalid entries
        skipped), and the mandatory filters (invalid entries become ``FALSE``).

        Returns:
            ``(whereSql, params)``.
        """
        scopeFilter = _buildScopeFilter(tableName, featureInstanceId, mandateId, db=self._db)
        clauses = [scopeFilter["where"]]
        params = list(scopeFilter["params"])
        for filterList, strict in ((agentFilters, False), (mandatoryFilters, True)):
            where, filterParams = _buildFilterClauses(filterList, strict=strict)
            if where:
                clauses.append(where)
                params.extend(filterParams)
        return " AND ".join(clauses), params


# ------------------------------------------------------------------
# helpers
# ------------------------------------------------------------------

# tableName -> detected instance FK column. Only successful detections are cached.
_instanceColCache: Dict[str, str] = {}


def _resolveInstanceColumn(tableName: str, db=None) -> str:
    """Detect whether the table uses ``instanceId`` or ``featureInstanceId``.

    ``featureInstanceId`` wins if both exist. Falls back to ``instanceId``
    (uncached) when no *db* is given, the lookup fails, or neither column is
    found; in the last case the subsequent query errors out rather than
    returning unscoped rows.
    """
    if tableName in _instanceColCache:
        return _instanceColCache[tableName]
    if db:
        try:
            with db.borrowCursor() as cur:
                cur.execute(
                    "SELECT column_name FROM information_schema.columns "
                    "WHERE table_schema = 'public' AND LOWER(table_name) = LOWER(%s) "
                    "AND column_name IN ('featureInstanceId', 'instanceId')",
                    [tableName],
                )
                cols = [row["column_name"] for row in cur.fetchall()]
            if "featureInstanceId" in cols:
                _instanceColCache[tableName] = "featureInstanceId"
                return "featureInstanceId"
            if "instanceId" in cols:
                _instanceColCache[tableName] = "instanceId"
                return "instanceId"
        except Exception as e:
            logger.warning("_resolveInstanceColumn(%s) failed, assuming 'instanceId': %s", tableName, e)
    return "instanceId"


def _validateTableName(tableName: str):
    """Raise ValueError unless *tableName* is a non-empty valid identifier."""
    if not tableName or not _isValidIdentifier(tableName):
        raise ValueError(f"Invalid table name: {tableName}")


def _isValidIdentifier(name: str) -> bool:
    """Return True if *name* is a string that is a valid Python identifier.

    Identifiers are interpolated into SQL inside double quotes;
    ``str.isidentifier`` rejects quotes, whitespace and punctuation, so a
    passing name cannot break out of the quoting. Note it does accept
    non-ASCII letters. Non-string input (None, numbers from agent JSON) is
    rejected instead of raising.
    """
    return isinstance(name, str) and name.isidentifier()


def _buildScopeFilter(tableName: str, featureInstanceId: str, mandateId: str, db=None, dbConnection=None) -> Dict[str, Any]:
    """Build the mandatory WHERE clause that scopes rows to the feature instance.

    Feature tables use either ``instanceId`` (commcoach, teamsbot) or
    ``featureInstanceId`` (trustee) as the FK. We detect the actual column
    from ``information_schema`` when a DB connector is provided. The
    ``"mandateId" = %s`` condition is only added when *mandateId* is truthy.

    Returns:
        ``{"where": <sql>, "params": [...]}``.
    """
    instanceCol = _resolveInstanceColumn(tableName, db or dbConnection)
    conditions = [f'"{instanceCol}" = %s']
    params = [featureInstanceId]
    if mandateId:
        conditions.append('"mandateId" = %s')
        params.append(mandateId)
    return {"where": " AND ".join(conditions), "params": params}


def _buildFilterClauses(filters: Optional[List[Dict[str, Any]]], strict: bool = False) -> tuple:
    """Convert filter dicts into a parameterized SQL fragment.

    Each filter is ``{"field": <column>, "op": <operator>, "value": <value>}``;
    ``op`` defaults to ``=`` and is upper-cased before being checked against
    ``_ALLOWED_OPERATORS``. Column names are validated with
    :func:`_isValidIdentifier` and double-quoted; values are always bind
    parameters (``IS NULL`` / ``IS NOT NULL`` take none).

    Args:
        filters: Filter dicts, or None/empty for no filtering.
        strict: How to treat an invalid entry (non-dict, bad field name,
            unsupported operator). ``False`` (default) skips it -- suitable
            for optional agent filters. ``True`` emits ``FALSE`` so the query
            matches nothing -- required for mandatory scoping filters, where
            dropping a condition would silently widen the result (fail open).

    Returns:
        ``(whereSql, params)``; ``("", [])`` when there is nothing to add.
    """
    if not filters:
        return "", []
    parts: List[str] = []
    params: List[Any] = []
    for entry in filters:
        spec = entry if isinstance(entry, dict) else {}
        field = spec.get("field", "")
        op = str(spec.get("op") or "=").upper()
        value = spec.get("value")
        if not field or not _isValidIdentifier(field) or op not in _ALLOWED_OPERATORS:
            if strict:
                # Values are deliberately not logged (they may be sensitive).
                logger.warning(
                    "Invalid mandatory filter (field=%r, op=%r); query will match no rows",
                    field, op,
                )
                parts.append("FALSE")
            continue
        if op in ("IS NULL", "IS NOT NULL"):
            parts.append(f'"{field}" {op}')
        else:
            parts.append(f'"{field}" {op} %s')
            params.append(value)
    return " AND ".join(parts), params


def _serializeRow(row: Dict[str, Any]) -> Dict[str, Any]:
    """Convert bytes and date-like values in *row* in place and return it.

    bytes/bytearray values become a marker string; values with an
    ``isoformat()`` method (date/datetime/time) become their ISO string.
    Other values are left unchanged.
    """
    for k, v in row.items():
        if isinstance(v, (bytes, bytearray)):
            # NOTE(review): this "marker" is an empty string; the intended marker
            # text appears to be missing -- confirm the desired value.
            row[k] = ""
        elif hasattr(v, "isoformat"):
            row[k] = v.isoformat()
    return row


_PLACEHOLDER_PREFIX = "NEUT"


def _neutralizeRowFields(row: Dict[str, Any], fieldsToNeutralize: Set[str]) -> Dict[str, Any]:
    """Replace values in sensitive fields with stable, deterministic placeholders.

    The placeholder format is ``[NEUT.<field>.<hash>]`` where ``<hash>`` is the
    first 8 hex chars of the SHA-256 of ``str(value)``. It is stable for the
    same value, so identical values in different rows produce the same token.
    This allows the AI to reason about equality without seeing the real data.
    None and empty-string values are left as-is. Mutates and returns *row*.
    """
    for field in fieldsToNeutralize:
        val = row.get(field)
        if val is None or val == "":
            continue
        shortHash = hashlib.sha256(str(val).encode()).hexdigest()[:8]
        row[field] = f"[{_PLACEHOLDER_PREFIX}.{field}.{shortHash}]"
    return row


# ------------------------------------------------------------------
# A2: type / inheritance-aware field neutralization for source data
# ------------------------------------------------------------------
#
# Rules (see wiki neutralization.md Failsafe 5/6):
#  1. STRING (incl. JSON/markdown/code -- anything textual): substring-neutralize
#     via the private NeutralizationService whenever neutralize is EFFECTIVE for the
#     field (explicit OR inherited). The placeholders stay embedded in the text so the
#     record remains usable; the field name is passed as a type hint.
#  2. BINARY (bytes): never neutralized -- the column is DROPPED when neutralization
#     applies to the table/field.
#  3. OTHER SCALARS (number/float/int/date/bool): neutralized (whole-value placeholder)
#     ONLY when the field flag is set EXPLICITLY -- never via inheritance.

# Maximum number of concurrent processTextAsync calls.
_NEUT_CONCURRENCY = 4


def _isStructuralField(key: str) -> bool:
    """Identifiers / system columns are references, not PII content -- never neutralized.

    Matches names starting with ``_`` or ``sys``, ``id``, names ending in ``Id``
    or ``_id``, and ``createdBy``/``updatedBy``. Excluding primary/foreign keys
    and audit columns means neutralization never corrupts record references
    and never wastes an LLM call on a UUID/enum value. These columns are
    skipped even when explicitly flagged.
    """
    if key.startswith("_") or key.startswith("sys"):
        return True
    if key == "id" or key.endswith("Id") or key.endswith("_id"):
        return True
    return key in ("mandateId", "featureInstanceId", "instanceId", "createdBy", "updatedBy")


def _isTextValue(value: Any) -> bool:
    """True for values that should be treated as neutralizable text (str/JSON-ish)."""
    return isinstance(value, str) or isinstance(value, (dict, list))


async def _neutralizeOneText(fieldName: str, text: str, neutralizer: Any) -> str:
    """Substring-neutralize a single text value, using the field name as a type hint.

    The hint (``"<fieldName>: "``) is prepended so the private LLM can infer
    the entity type for short PII columns, then stripped from the result. On
    an engine error, a missing/non-string ``neutralized_text``, or a prefix
    mismatch, the cell is fail-safe redacted to ``"[REDACTED]"`` (the raw
    value is never returned).
    """
    prefix = f"{fieldName}: "
    try:
        result = await neutralizer.processTextAsync(prefix + text)
    except Exception as e:  # noqa: BLE001 - neutralization must fail closed
        logger.warning("field neutralization failed for '%s': %s", fieldName, e)
        return "[REDACTED]"
    out = result.get("neutralized_text") if isinstance(result, dict) else None
    if not isinstance(out, str):
        return "[REDACTED]"
    if out.startswith(prefix):
        return out[len(prefix):]
    # Engine altered the hint prefix (rare) -- fail closed rather than leak.
    logger.warning("field neutralization prefix mismatch for '%s'; redacting", fieldName)
    return "[REDACTED]"


async def _neutralizeAndSerializeRows(
    rows: List[Dict[str, Any]],
    policy: Dict[str, Any],
    neutralizer: Any,
) -> List[Dict[str, Any]]:
    """Apply the A2 field-neutralization rules to raw rows and serialize them.

    Identical ``(field, text)`` pairs are neutralized once and the result is
    reused across the whole result set; at most ``_NEUT_CONCURRENCY`` engine
    calls run concurrently. dict/list values are JSON-encoded to strings.
    Without a usable *neutralizer*, every cell that needs neutralization is
    redacted to ``"[REDACTED]"``.
    """
    tableActive = bool(policy.get("tableActive"))
    explicitFields: Set[str] = set(policy.get("explicitFields") or [])

    outRows: List[Dict[str, Any]] = []
    # (fieldName, originalText) -> neutralizedText (dedup across the whole result set)
    pending: Dict[tuple, Optional[str]] = {}
    cellRefs: List[tuple] = []  # (rowIdx, key, fieldName, originalText)

    for row in rows:
        out: Dict[str, Any] = {}
        for key, value in row.items():
            fieldExplicit = key in explicitFields
            fieldEffective = fieldExplicit or tableActive

            if value is None:
                out[key] = None
                continue

            # Identifiers / system columns: serialize but never neutralize.
            if _isStructuralField(key):
                if hasattr(value, "isoformat"):
                    out[key] = value.isoformat()
                elif isinstance(value, (bytes, bytearray)):
                    # NOTE(review): empty-string marker, as in _serializeRow.
                    out[key] = ""
                else:
                    out[key] = value
                continue

            if isinstance(value, (bytes, bytearray)):
                # Rule 2: binary is dropped when neutralization applies; else legacy marker.
                if tableActive or fieldExplicit:
                    continue
                # NOTE(review): empty-string marker, as in _serializeRow.
                out[key] = ""
                continue

            if _isTextValue(value):
                textVal = value if isinstance(value, str) else json.dumps(value, ensure_ascii=False, default=str)
                if fieldEffective and textVal != "":
                    pending.setdefault((key, textVal), None)
                    cellRefs.append((len(outRows), key, key, textVal))
                out[key] = textVal
                continue

            # Rule 3: other scalars (number/float/int/date/bool) -- explicit only.
            if fieldExplicit:
                shortHash = hashlib.sha256(str(value).encode()).hexdigest()[:8]
                out[key] = f"[{_PLACEHOLDER_PREFIX}.{key}.{shortHash}]"
            else:
                out[key] = value.isoformat() if hasattr(value, "isoformat") else value
        outRows.append(out)

    if not cellRefs:
        return outRows

    if neutralizer is None or not hasattr(neutralizer, "processTextAsync"):
        # Fail-safe: neutralization required but no engine -> redact the affected cells.
        for rowIdx, key, _fieldName, _origText in cellRefs:
            outRows[rowIdx][key] = "[REDACTED]"
        return outRows

    sem = asyncio.Semaphore(_NEUT_CONCURRENCY)

    async def _resolvePair(fieldName: str, origText: str) -> None:
        async with sem:
            pending[(fieldName, origText)] = await _neutralizeOneText(fieldName, origText, neutralizer)

    await asyncio.gather(*[
        _resolvePair(fieldName, origText) for (fieldName, origText) in pending.keys()
    ])

    for rowIdx, key, fieldName, origText in cellRefs:
        neutralized = pending.get((fieldName, origText))
        # Fail closed: a cell that required neutralization never keeps its raw text.
        outRows[rowIdx][key] = neutralized if neutralized is not None else "[REDACTED]"
    return outRows