78 lines
2.3 KiB
Python
78 lines
2.3 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""Resolve effective policies (neutralize, ragIndexEnabled) for DataSource tree hierarchies.
|
|
|
|
Tree-inheritance rule: nearest ancestor DataSource with an explicit value wins.
|
|
If no ancestor has a value, the default (False) is used.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def resolveEffectiveNeutralize(
    ds: Dict[str, Any],
    allDataSources: List[Dict[str, Any]],
) -> bool:
    """Return the effective ``neutralize`` policy for a DataSource.

    The DataSource's own ``neutralize`` entry takes precedence: an explicit
    ``False`` disables it, and any other value that is not ``None`` enables
    it. Only when the entry is missing or ``None`` is the value inherited
    from the ancestors found by ``_findAncestorPolicy``.

    Example: a DataSource at /sites/HR/Documents with no value of its own
    inherits from /sites/HR when that ancestor has neutralize=True.

    Args:
        ds: The DataSource record to resolve.
        allDataSources: All DataSource records searched for ancestors.

    Returns:
        True if neutralize is effectively enabled, otherwise False.
    """
    explicit = ds.get("neutralize")

    # No own setting: defer to the closest ancestor that has one.
    if explicit is None:
        return _findAncestorPolicy(ds, allDataSources, "neutralize")

    # Own setting present: only a literal False turns the policy off.
    return explicit is not False
|
|
|
|
|
|
def resolveEffectiveRagIndexEnabled(
    ds: Dict[str, Any],
    allDataSources: List[Dict[str, Any]],
) -> bool:
    """Return the effective ``ragIndexEnabled`` policy for a DataSource.

    An own ``ragIndexEnabled`` entry that is exactly ``True`` or ``False``
    is returned as-is. Any other value (including a missing entry or
    ``None``) is treated as "not set" and the result is inherited from the
    ancestors found by ``_findAncestorPolicy``.

    Args:
        ds: The DataSource record to resolve.
        allDataSources: All DataSource records searched for ancestors.

    Returns:
        True if RAG indexing is effectively enabled, otherwise False.
    """
    explicit = ds.get("ragIndexEnabled")

    # bool cannot be subclassed, so this matches exactly True and False.
    if isinstance(explicit, bool):
        return explicit

    return _findAncestorPolicy(ds, allDataSources, "ragIndexEnabled")
|
|
|
|
|
|
def _isStrictPathAncestor(ancestorPath: str, descendantPath: str) -> bool:
    """Return True if ancestorPath is a proper ancestor of descendantPath.

    Matching is done on whole "/"-separated segments, so "/sites/HR" is an
    ancestor of "/sites/HR/Documents" but not of "/sites/HR2".
    """
    if not descendantPath.startswith(ancestorPath):
        return False
    if len(ancestorPath) >= len(descendantPath):
        return False
    # The shared prefix must stop at a segment boundary: either the ancestor
    # already ends with "/" (e.g. the root "/"), or the character following
    # the prefix in the descendant is "/".
    return ancestorPath.endswith("/") or descendantPath[len(ancestorPath)] == "/"


def _findAncestorPolicy(
    ds: Dict[str, Any],
    allDataSources: List[Dict[str, Any]],
    field: str,
) -> bool:
    """Walk ancestors (longest path first) to find an inherited policy value.

    A candidate is an ancestor of ``ds`` when it has a different ``id``, the
    same ``connectionId``, and a non-empty ``path`` that is a proper prefix
    of ``ds["path"]`` ending on a "/" segment boundary. Ancestors are checked
    from the longest path to the shortest; the first one whose ``field`` is
    exactly ``True`` or ``False`` decides the result. Ancestors with any
    other value for ``field`` are skipped.

    Args:
        ds: The DataSource record whose ancestors are searched. Its own
            value for ``field`` is not consulted here.
        allDataSources: All DataSource records to search.
        field: Name of the policy key to read from each ancestor.

    Returns:
        The nearest explicit ancestor value, or False when ``ds`` has no
        path or no ancestor carries an explicit value.
    """
    dsPath = ds.get("path", "")
    if not dsPath:
        return False

    # NOTE: a ds without "connectionId" defaults to "", while a candidate
    # without it yields None, so two records that both lack the key never
    # match. Kept as-is; confirm whether that is intended.
    connectionId = ds.get("connectionId", "")
    # NOTE: when ds has no "id", candidates without an "id" are skipped too
    # (None == None).
    dsId = ds.get("id")

    ancestors = []
    for candidate in allDataSources:
        if candidate.get("id") == dsId:
            continue
        if candidate.get("connectionId") != connectionId:
            continue
        candidatePath = candidate.get("path", "")
        if not candidatePath:
            continue
        # Segment-aware check: a plain startswith() would wrongly treat
        # "/sites/HR" as an ancestor of "/sites/HR2".
        if _isStrictPathAncestor(candidatePath, dsPath):
            ancestors.append(candidate)

    # Nearest ancestor first, i.e. longest matching path first.
    ancestors.sort(key=lambda a: len(a.get("path", "")), reverse=True)

    for ancestor in ancestors:
        val = ancestor.get(field)
        if val is True or val is False:
            return val
    return False
|