241 lines
10 KiB
Python
241 lines
10 KiB
Python
import json
|
|
import os
|
|
from typing import Any, Dict, List, Set
|
|
from datetime import datetime, UTC
|
|
|
|
|
|
class NormalizationService:
|
|
"""
|
|
Produces a single canonical table in merged JSON using an AI-provided header mapping
|
|
and deterministic, in-code value normalization. No language heuristics in code.
|
|
"""
|
|
|
|
def __init__(self, services):
|
|
self.services = services
|
|
|
|
# Public API
|
|
def discoverStructures(self, mergedJson: Dict[str, Any]) -> Dict[str, Any]:
|
|
headers: Set[str] = set()
|
|
samples: Dict[str, List[str]] = {}
|
|
|
|
sections = mergedJson.get("sections", []) if isinstance(mergedJson, dict) else []
|
|
for section in sections:
|
|
if not isinstance(section, dict):
|
|
continue
|
|
|
|
# Use only the fundamental agreed JSON structure: content_type/elements
|
|
if section.get("content_type") != "table":
|
|
continue
|
|
|
|
# Extract table data from elements array
|
|
hdrs = []
|
|
rows = []
|
|
for element in section.get("elements", []):
|
|
if isinstance(element, dict) and "headers" in element and "rows" in element:
|
|
hdrs = element.get("headers") or []
|
|
rows = element.get("rows") or []
|
|
break
|
|
|
|
if not hdrs or not rows:
|
|
continue
|
|
|
|
for h in hdrs:
|
|
if not isinstance(h, str):
|
|
continue
|
|
headers.add(h)
|
|
# collect small value samples by column index
|
|
for row in rows[:5]:
|
|
if not isinstance(row, list):
|
|
continue
|
|
for i, value in enumerate(row):
|
|
headerName = hdrs[i] if i < len(hdrs) else f"col_{i}"
|
|
if headerName not in samples:
|
|
samples[headerName] = []
|
|
if len(samples[headerName]) < 5:
|
|
samples[headerName].append(str(value))
|
|
|
|
return {
|
|
"tableHeaders": sorted(list(headers)),
|
|
"headerSamples": samples,
|
|
}
|
|
|
|
async def requestHeaderMapping(self, inventory: Dict[str, Any], cacheKey: str, canonicalSpec: Dict[str, Any] | None = None, mergePrompt: str | None = None) -> Dict[str, Any]:
|
|
|
|
# Allow caller to specify any canonical schema. If none provided, default to discovered headers.
|
|
if canonicalSpec is None:
|
|
canonicalSpec = {
|
|
"canonicalHeaders": inventory.get("tableHeaders", []),
|
|
"constraints": {}
|
|
}
|
|
|
|
# Protect merge prompt context by wrapping in single quotes and escaping internal quotes
|
|
protectedMerge = None
|
|
if mergePrompt:
|
|
try:
|
|
protectedMerge = str(mergePrompt).replace("'", "\\'")
|
|
except Exception:
|
|
protectedMerge = str(mergePrompt)
|
|
|
|
prompt = (
|
|
"You are a mapping generator. Return ONLY JSON.\n\n"
|
|
"Given discovered headers and sample values, map them to the canonical headers.\n"
|
|
"Do not invent fields. Use null if no mapping. Provide normalization policy.\n\n"
|
|
f"CANONICAL_SPEC:\n{json.dumps(canonicalSpec, ensure_ascii=False, indent=2)}\n\n"
|
|
f"HEADERS_DISCOVERED:\n{json.dumps(inventory, ensure_ascii=False, indent=2)}\n\n"
|
|
+ (f"MERGE_PROMPT_CONTEXT (protected):\n'{protectedMerge}'\n\n" if protectedMerge is not None else "") +
|
|
"REPLY JSON SHAPE:\n(Example)\n"
|
|
"{\n \"mappings\": {\"<sourceHeader>\": \"<Canonical>|null\"},\n"
|
|
" \"normalizationPolicy\": {\n \"TotalAmount\": {\"decimalSeparator\": \",\"|\".\"},\n"
|
|
" \"Currency\": {\"stripSymbols\": true},\n"
|
|
" \"Date\": {\"formats\": [\"DD.MM.YYYY\",\"YYYY-MM-DD\"]}\n }\n}\n"
|
|
)
|
|
|
|
response = await self.services.ai.callAiPlanning(prompt=prompt, placeholders=None)
|
|
if not response:
|
|
return {"mapping": {}, "normalizationPolicy": {}}
|
|
|
|
# Extract JSON from response more safely
|
|
start_idx = response.find('{')
|
|
end_idx = response.rfind('}')
|
|
if start_idx == -1 or end_idx == -1 or start_idx >= end_idx:
|
|
return {"mapping": {}, "normalizationPolicy": {}}
|
|
|
|
js = response[start_idx:end_idx + 1]
|
|
try:
|
|
mapping = json.loads(js)
|
|
except json.JSONDecodeError:
|
|
return {"mapping": {}, "normalizationPolicy": {}}
|
|
# Normalize key naming from AI: prefer single key "mapping"
|
|
if "mapping" not in mapping and "mappings" in mapping and isinstance(mapping["mappings"], dict):
|
|
mapping["mapping"] = mapping["mappings"]
|
|
try:
|
|
del mapping["mappings"]
|
|
except Exception:
|
|
pass
|
|
# Ensure canonicalHeaders present in mapping for downstream use
|
|
if "canonicalHeaders" not in mapping:
|
|
mapping["canonicalHeaders"] = canonicalSpec.get("canonicalHeaders", [])
|
|
|
|
# debug artifact (now routed via writeDebugFile)
|
|
self.services.utils.writeDebugArtifact("mapping.json", mapping)
|
|
return mapping
|
|
|
|
def applyMapping(self, mergedJson: Dict[str, Any], mappingSpec: Dict[str, Any]) -> Dict[str, Any]:
|
|
mappings = (mappingSpec or {}).get("mapping", {})
|
|
policy = (mappingSpec or {}).get("normalizationPolicy", {})
|
|
|
|
# Prefer headers provided by mapping (generic across domains)
|
|
canonicalHeaders = (mappingSpec or {}).get("canonicalHeaders") or []
|
|
if not canonicalHeaders:
|
|
# Fallback to union of mapped targets
|
|
canonicalHeaders = sorted(list({t for t in mappings.values() if t}))
|
|
|
|
rows: List[List[str]] = []
|
|
sections = mergedJson.get("sections", []) if isinstance(mergedJson, dict) else []
|
|
for section in sections:
|
|
# Use only the fundamental agreed JSON structure: content_type/elements
|
|
if section.get("content_type") != "table":
|
|
continue
|
|
|
|
# Extract table data from elements array
|
|
sourceHeaders = []
|
|
sourceRows = []
|
|
for element in section.get("elements", []):
|
|
if isinstance(element, dict) and "headers" in element and "rows" in element:
|
|
sourceHeaders = element.get("headers") or []
|
|
sourceRows = element.get("rows") or []
|
|
break
|
|
|
|
if not sourceHeaders or not sourceRows:
|
|
continue
|
|
|
|
# Build index map: canonical -> source index or None
|
|
indexMap: Dict[str, int] = {}
|
|
for ci, ch in enumerate(canonicalHeaders):
|
|
srcIndex = None
|
|
for si, sh in enumerate(sourceHeaders):
|
|
# Prefer explicit mapping target; fallback to identity when names match
|
|
target = mappings.get(sh)
|
|
if target is None and sh == ch:
|
|
target = ch
|
|
if target == ch:
|
|
srcIndex = si
|
|
break
|
|
indexMap[ch] = srcIndex
|
|
|
|
# Transform rows
|
|
for r in sourceRows:
|
|
canonicalRow: List[str] = []
|
|
for ch in canonicalHeaders:
|
|
idx = indexMap.get(ch)
|
|
try:
|
|
value = r[idx] if (idx is not None and idx < len(r)) else ""
|
|
except (IndexError, KeyError) as e:
|
|
# Handle corrupted data gracefully
|
|
value = ""
|
|
canonicalRow.append(self._normalizeValue(ch, value, policy))
|
|
# consider as row if at least one non-empty meaningful field
|
|
if any(v.strip() for v in canonicalRow):
|
|
rows.append(canonicalRow)
|
|
|
|
canonical = {
|
|
"metadata": {
|
|
"title": mergedJson.get("metadata", {}).get("title", "Merged Document"),
|
|
"source_documents": mergedJson.get("metadata", {}).get("source_documents", [])
|
|
},
|
|
"sections": [
|
|
{
|
|
"id": "canonical_table_1",
|
|
"content_type": "table",
|
|
"elements": [
|
|
{
|
|
"headers": canonicalHeaders,
|
|
"rows": rows
|
|
}
|
|
],
|
|
"order": 1
|
|
}
|
|
]
|
|
}
|
|
|
|
# debug artifact (now routed via writeDebugFile)
|
|
self.services.utils.writeDebugArtifact("canonical_merged.json", canonical)
|
|
return canonical
|
|
|
|
def validateCanonical(self, canonicalJson: Dict[str, Any]) -> Dict[str, Any]:
|
|
rows = []
|
|
try:
|
|
sections = canonicalJson.get("sections", [])
|
|
for s in sections:
|
|
if s.get("content_type") == "table":
|
|
# Extract rows from elements array
|
|
for element in s.get("elements", []):
|
|
if isinstance(element, dict) and "rows" in element:
|
|
rows.extend(element.get("rows", []))
|
|
except Exception:
|
|
rows = []
|
|
report = {
|
|
"rowCount": len(rows),
|
|
"success": len(rows) > 0
|
|
}
|
|
self.services.utils.writeDebugArtifact("normalization_report.json", report)
|
|
return report
|
|
|
|
# Internal helpers
|
|
def _normalizeValue(self, canonicalHeader: str, value: Any, policy: Dict[str, Any]) -> str:
|
|
if value is None:
|
|
return ""
|
|
text = str(value).strip()
|
|
# Generic normalization guided by policy; avoid domain specifics
|
|
if canonicalHeader in (policy.get("numericFields", []) or []):
|
|
dec = ((policy.get(canonicalHeader) or {}).get("decimalSeparator")
|
|
or (policy.get("numeric") or {}).get("decimalSeparator")
|
|
or ".")
|
|
if dec == ",":
|
|
text = text.replace(".", "").replace(",", ".") if "," in text else text
|
|
text = ''.join(ch for ch in text if ch.isdigit() or ch in ['.', '-', '+'])
|
|
elif (policy.get("text") or {}).get("stripSymbols") and canonicalHeader in (policy.get("text", {}).get("applyTo", []) or []):
|
|
text = ''.join(ch for ch in text if ch.isalpha())
|
|
text = text.upper()
|
|
return text
|
|
|