gateway/modules/serviceCenter/services/serviceKnowledge/subPolicyResolver.py
ValueOn AG 48c0f900af rag
2026-05-12 15:19:01 +02:00

78 lines
2.3 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Resolve effective policies (neutralize, ragIndexEnabled) for DataSource tree hierarchies.
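Tree-inheritance rule: a DataSource's own explicit value wins; otherwise the
nearest ancestor DataSource on the same connection with an explicit value wins.
If neither the DataSource nor any ancestor has a value, the default (False) is used.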
"""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
def resolveEffectiveNeutralize(
    ds: Dict[str, Any],
    allDataSources: List[Dict[str, Any]],
) -> bool:
    """Return the effective ``neutralize`` flag for a DataSource.

    The DataSource's own ``neutralize`` entry decides when it is present:
    a literal ``False`` yields ``False`` and every other non-``None`` value
    yields ``True``. When the entry is missing or ``None`` the value is
    inherited from the ancestors found by ``_findAncestorPolicy``.

    Example: a DataSource at /sites/HR/Documents without its own setting
    takes the value of /sites/HR when that ancestor sets ``neutralize``.

    Args:
        ds: The DataSource record to resolve.
        allDataSources: Records searched for ancestors of ``ds``.

    Returns:
        The resolved flag.
    """
    explicit = ds.get("neutralize")
    if explicit is None:
        # Nothing set on this node: defer to the path hierarchy.
        return _findAncestorPolicy(ds, allDataSources, "neutralize")
    # Anything except a literal False counts as "neutralize on".
    return explicit is not False
def resolveEffectiveRagIndexEnabled(
    ds: Dict[str, Any],
    allDataSources: List[Dict[str, Any]],
) -> bool:
    """Return the effective ``ragIndexEnabled`` flag for a DataSource.

    A literal ``True`` or ``False`` stored on the DataSource itself is
    returned as-is. Any other value (missing, ``None``, non-boolean) is
    treated as "not set" and the result is inherited from the ancestors
    found by ``_findAncestorPolicy``.

    Args:
        ds: The DataSource record to resolve.
        allDataSources: Records searched for ancestors of ``ds``.

    Returns:
        The resolved flag.
    """
    explicit = ds.get("ragIndexEnabled")
    if explicit is True or explicit is False:
        return explicit
    # No explicit boolean on this node: defer to the path hierarchy.
    return _findAncestorPolicy(ds, allDataSources, "ragIndexEnabled")
def _findAncestorPolicy(
    ds: Dict[str, Any],
    allDataSources: List[Dict[str, Any]],
    field: str,
) -> bool:
    """Return the policy value inherited from the nearest ancestor DataSource.

    An ancestor is another DataSource (different ``id``) on the same
    ``connectionId`` whose ``path`` is a proper parent of ``ds``'s path.
    Matching is done on whole path segments, so ``/sites/HR`` is an ancestor
    of ``/sites/HR/Documents`` but not of ``/sites/HR2/Documents``.
    Paths are assumed to be '/'-separated strings.

    Args:
        ds: The DataSource record whose inherited value is wanted. Reads the
            ``id``, ``connectionId`` and ``path`` keys.
        allDataSources: Candidate records to search for ancestors.
        field: Name of the policy key to look up on each ancestor.

    Returns:
        The value of the deepest ancestor on which ``field`` is exactly
        ``True`` or ``False``; ``False`` when ``ds`` has no path or no
        ancestor carries an explicit value.
    """
    dsPath = ds.get("path", "")
    if not dsPath:
        return False
    dsId = ds.get("id")
    connectionId = ds.get("connectionId", "")

    ancestors = [
        candidate
        for candidate in allDataSources
        if candidate.get("id") != dsId
        and candidate.get("connectionId") == connectionId
        and _isPathAncestor(candidate.get("path", ""), dsPath)
    ]
    # Deepest (longest) path first so the nearest ancestor decides.
    ancestors.sort(key=lambda a: len(a.get("path", "")), reverse=True)

    for ancestor in ancestors:
        val = ancestor.get(field)
        # Only literal booleans count as explicit; anything else (missing,
        # None, ...) defers to the next ancestor further up.
        if val is True or val is False:
            return val
    return False


def _isPathAncestor(ancestorPath: str, childPath: str) -> bool:
    """Tell whether ``ancestorPath`` is a proper parent of ``childPath``.

    The prefix must end on a '/' boundary; a bare string prefix would make
    ``/sites/HR`` look like a parent of ``/sites/HR2``.
    """
    if not ancestorPath:
        return False
    prefix = ancestorPath if ancestorPath.endswith("/") else ancestorPath + "/"
    return len(ancestorPath) < len(childPath) and childPath.startswith(prefix)