platform-core/modules/serviceCenter/services/serviceAgent/ontologyToPromptCompiler.py
2026-05-16 22:55:43 +02:00

140 lines
4.8 KiB
Python

# Copyright (c) 2026 Patrick Motsch
# All rights reserved.
"""Deterministic compiler: OntologyDescriptor -> sub-agent prompt block.
Phase 2 replaces a feature's hand-written ``_AGENT_DOMAIN_HINTS`` text
with a structured :class:`OntologyDescriptor`. This compiler renders the
descriptor into a stable, terse Markdown-ish block that the sub-agent
appends to its system prompt -- the same source of truth the
:class:`QueryValidator` consults.
The output is intentionally:
* short (every token costs every call)
* deterministic (no f-string ordering bugs, no Python dict iteration)
* free of internal jargon ('canonicalQueryPattern' is rendered as
'CANONICAL PATTERN' for the LLM)
"""
from __future__ import annotations
from typing import Iterable, List
from modules.serviceCenter.services.serviceAgent.datamodelOntology import (
CanonicalQueryPattern,
Constraint,
ConstraintRule,
Entity,
OntologyDescriptor,
Relation,
)
def compileOntologyToPrompt(ontology: OntologyDescriptor) -> str:
"""Render *ontology* into a sub-agent prompt block.
The output starts with a stable marker line (``DOMAIN ONTOLOGY (...)``)
so downstream tooling can find/replace it deterministically.
"""
lines: List[str] = []
lines.append(f"DOMAIN ONTOLOGY ({ontology.featureCode}):")
lines.append("")
lines.extend(_renderEntities(ontology.entities))
relationLines = _renderRelations(ontology.relations)
if relationLines:
lines.append("")
lines.extend(relationLines)
constraintLines = _renderConstraints(ontology.constraints)
if constraintLines:
lines.append("")
lines.extend(constraintLines)
patternLines = _renderPatterns(ontology.canonicalPatterns)
if patternLines:
lines.append("")
lines.extend(patternLines)
return "\n".join(lines).rstrip() + "\n"
def _renderEntities(entities: Iterable[Entity]) -> List[str]:
out: List[str] = ["ENTITIES:"]
for e in entities:
head = f"- {e.name}"
if e.parentEntity:
head += f" (specializes {e.parentEntity})"
if e.pythonClass:
head += f" [table: {e.pythonClass}]"
out.append(head)
if e.description:
out.append(f" {e.description}")
for inv in e.invariants:
out.append(f" * {inv.description}")
return out
def _renderRelations(relations: Iterable[Relation]) -> List[str]:
rels = list(relations)
if not rels:
return []
out: List[str] = ["RELATIONS:"]
for r in rels:
line = f"- {r.fromEntity} -> {r.toEntity} ({r.cardinality.value}"
if r.via:
line += f" via {r.via}"
line += ")"
out.append(line)
return out
def _renderConstraints(constraints: Iterable[Constraint]) -> List[str]:
cons = list(constraints)
if not cons:
return []
out: List[str] = ["CONSTRAINTS (validator-enforced):"]
for c in cons:
rule = _ruleLabel(c.rule)
line = f"- {rule} on {c.appliesTo}: {c.message}"
params = c.params or {}
required = params.get("requiredFields")
if isinstance(required, list) and required:
line += f" (required filters: {', '.join(required)})"
intents = params.get("intents")
if isinstance(intents, list) and intents:
line += f" (intents: {', '.join(intents)})"
out.append(line)
return out
def _ruleLabel(rule: ConstraintRule) -> str:
return rule.value.replace("_", " ").lower()
def _renderPatterns(patterns: Iterable[CanonicalQueryPattern]) -> List[str]:
pats = list(patterns)
if not pats:
return []
out: List[str] = ["CANONICAL QUERY PATTERNS (mimic these tool calls):"]
for i, p in enumerate(pats, start=1):
out.append(f"{i}) intent={p.intent}: {p.description}")
out.append(f" call: {_renderPatternCall(p.pattern)}")
extra = p.pattern.get("_postProcessing") if isinstance(p.pattern, dict) else None
if isinstance(extra, str):
out.append(f" note: {extra}")
return out
def _renderPatternCall(pattern: dict) -> str:
"""Render the pattern as a compact one-line tool call signature."""
tool = pattern.get("tool", "?")
parts: List[str] = []
for key in ("tableName", "aggregate", "field", "groupBy", "orderBy"):
if key in pattern and pattern[key] is not None and not str(key).startswith("_"):
parts.append(f"{key}={pattern[key]!r}")
if "fields" in pattern and pattern["fields"]:
parts.append(f"fields={pattern['fields']}")
if "filters" in pattern and pattern["filters"]:
compact = ", ".join(
f"{f.get('field')}{f.get('op','=')}{f.get('value')!r}"
for f in pattern["filters"]
if isinstance(f, dict)
)
parts.append(f"filters=[{compact}]")
return f"{tool}({', '.join(parts)})"