gateway/modules/features/neutralization/serviceNeutralization/subProcessPdfInPlace.py

192 lines
8 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
PDF in-place neutralization using PyMuPDF.
Removes original text completely and inserts full UUID placeholders.
PyMuPDF uses insert_textbox which wraps long placeholders to preserve layout.
NOTE: PyMuPDF search_for() matches substrings (e.g. "CH" matches inside "Zurich",
"CHE-115...", ".ch"). We skip short/ambiguous keys to avoid false redactions.
"""
import io
import logging
from typing import Dict, Optional
logger = logging.getLogger(__name__)

# PyMuPDF's search_for() matches substrings, so keys shorter than this many
# characters are not searched in a PDF: they would hit inside longer words.
_MIN_SEARCH_LENGTH = 5

# Keys that are never searched for in a PDF because they are substrings of
# many common words and strings.
_PDF_SEARCH_BLOCKLIST = frozenset(
    [
        # Country codes - match in Zurich, CHF, Deutschland, etc.
        "CH",
        "DE",
        "FR",
        "IT",
        # Abbreviations - match in Pol-Nr., Policy No., etc.
        "Nr",
        "Nr.",
        "Nr:",
        "No",
        "No.",
        "No:",
        # Domain parts - match in URLs
        "www",
        ".ch",
        ".com",
        ".org",
        ".net",
    ]
)
# Appearance of the redaction boxes and of the placeholder text drawn into them.
_FILL_COLOR = (1, 1, 1)
_TEXT_COLOR = (0, 0, 0)
_PLACEHOLDER_FONT = "helv"
_PLACEHOLDER_FONT_SIZE = 8

# Placeholder kinds whose text is drawn with insert_textbox() at a font size
# fitted to the redacted rectangle. The tuple order is the order in which the
# placeholders are drawn on each page.
_TEXTBOX_PLACEHOLDER_PREFIXES = (
    "[name.",
    "[address.",
    "[phone.",
    "[policy.",
    "[date.",
    "[ssn.",
)

# Labels tried in front of a policy number: matching label + value yields a
# wider rectangle for the placeholder.
_POLICY_LABELS = ("Police Nr. ", "Police Nr.: ", "Polizzenr. ", "Policy no. ", "Policy No. ")

# Labels tried in front of a UID / company ID (e.g. "UID-Nr. CHE-115.665.634").
_UID_LABELS = ("UID-Nr. ", "UID-Nr.: ", "UID No. ", "UID: ", "UID-Nummer: ")

# An [ssn.*] value starting with one of these is treated as a UID / company ID.
_UID_VALUE_PREFIXES = ("CHE-", "DE-", "FR-", "IT-")

# A hit whose area lies at least this much inside an already collected hit is
# considered a duplicate of it.
_COVERED_THRESHOLD = 0.5


def _select_search_items(mapping: Dict[str, str]) -> list:
    """Return the (original_text, placeholder) pairs to search for, longest first.

    For each placeholder only the longest original text is kept, to avoid
    overlaying the same placeholder several times (e.g. "Ida Dittrich", "Ida"
    and "Dittrich" all mapping to one [name.x] placeholder). Keys that are too
    short or blocklisted are dropped because search_for() matches substrings.
    """
    longest_by_placeholder: Dict[str, str] = {}
    for original_text, placeholder in mapping.items():
        if not original_text or not placeholder:
            continue
        current = longest_by_placeholder.get(placeholder)
        if current is None or len(original_text) > len(current):
            longest_by_placeholder[placeholder] = original_text

    items = []
    for placeholder, original_text in longest_by_placeholder.items():
        if len(original_text) < _MIN_SEARCH_LENGTH:
            logger.debug(
                "Skipping PDF search for short key %r (would match substrings)",
                original_text[:20],
            )
            continue
        if original_text.strip() in _PDF_SEARCH_BLOCKLIST:
            logger.debug("Skipping PDF search for blocklisted key %r", original_text)
            continue
        items.append((original_text, placeholder))

    # Stable sort: longest keys are redacted first.
    items.sort(key=lambda item: -len(item[0]))
    return items


def _label_prefixes(original_text: str, kind: Optional[str]) -> tuple:
    """Return the label prefixes to try in front of *original_text*, if any."""
    if kind == "[policy.":
        return _POLICY_LABELS
    if kind == "[ssn." and original_text.startswith(_UID_VALUE_PREFIXES):
        return _UID_LABELS
    return ()


def _search_page(page, needle: str) -> list:
    """Search *page* for *needle*; a failing search is logged and yields no hits."""
    try:
        return page.search_for(needle, quads=False)
    except Exception as e:
        logger.warning("PDF search failed for %r: %s", needle[:40], e)
        return []


def _covered_fraction(rect, cover) -> float:
    """Return the fraction of *rect*'s area that lies inside *cover*."""
    overlap_width = min(rect.x1, cover.x1) - max(rect.x0, cover.x0)
    overlap_height = min(rect.y1, cover.y1) - max(rect.y0, cover.y0)
    area = rect.width * rect.height
    if overlap_width <= 0 or overlap_height <= 0 or area <= 0:
        return 0.0
    return (overlap_width * overlap_height) / area


def _find_redaction_rects(page, original_text: str, label_prefixes: tuple) -> list:
    """Return every rectangle on *page* that must be redacted for *original_text*.

    Labelled forms (prefix + value) are searched first so they contribute the
    wider rectangle. The bare value is always searched as well, so occurrences
    without a label are not left in the document. A hit that is mostly covered
    by a hit of an earlier candidate is dropped; hits of one and the same
    search are never compared with each other.
    """
    candidates = [prefix + original_text for prefix in label_prefixes]
    candidates.append(original_text)

    rects: list = []
    for candidate in candidates:
        fresh = [
            hit
            for hit in _search_page(page, candidate)
            if not any(
                _covered_fraction(hit, kept) >= _COVERED_THRESHOLD for kept in rects
            )
        ]
        rects.extend(fresh)
    return rects


def _insert_placeholder(page, rect, text: str, font) -> None:
    """Draw *text* into *rect* with a font size fitted to the rectangle width."""
    try:
        unit_width = font.text_length(text, fontsize=1)
        if unit_width > 0:
            # Largest size (capped at the default) at which the text fits on
            # one line, but never below 3pt.
            fitted_size = max(3, min(_PLACEHOLDER_FONT_SIZE, rect.width / unit_width))
        else:
            fitted_size = 4
        rc = page.insert_textbox(
            rect, text, fontname=_PLACEHOLDER_FONT, fontsize=fitted_size,
            align=0, color=_TEXT_COLOR
        )
        if rc < 0:
            # A negative result means the text did not fit; retry very small.
            page.insert_textbox(
                rect, text, fontname=_PLACEHOLDER_FONT, fontsize=2,
                align=0, color=_TEXT_COLOR
            )
    except Exception as e:
        logger.warning("Insert placeholder failed: %s", e)


def _neutralize_page(page, page_number: int, search_items: list, font) -> None:
    """Redact all search items on one page and draw their placeholders.

    Raises:
        RuntimeError: if the redactions cannot be applied. The original text
            would then still be in the page, so the caller must not return
            the document as neutralized.
    """
    textbox_inserts: Dict[str, list] = {
        prefix: [] for prefix in _TEXTBOX_PLACEHOLDER_PREFIXES
    }

    for original_text, placeholder in search_items:
        kind = next(
            (p for p in _TEXTBOX_PLACEHOLDER_PREFIXES if placeholder.startswith(p)),
            None,
        )
        labels = _label_prefixes(original_text, kind)
        for rect in _find_redaction_rects(page, original_text, labels):
            try:
                if kind is not None:
                    # Only blank the area now; the placeholder is drawn after
                    # the redactions have been applied.
                    page.add_redact_annot(rect, fill=_FILL_COLOR)
                    textbox_inserts[kind].append((rect, placeholder))
                else:
                    page.add_redact_annot(
                        rect,
                        text=placeholder,
                        fill=_FILL_COLOR,
                        text_color=_TEXT_COLOR,
                        fontname=_PLACEHOLDER_FONT,
                        fontsize=_PLACEHOLDER_FONT_SIZE,
                    )
            except Exception as e:
                logger.warning("Redact failed for %r: %s", original_text[:40], e)

    try:
        page.apply_redactions()
    except Exception as e:
        raise RuntimeError(f"apply_redactions failed on page {page_number}") from e

    for inserts in textbox_inserts.values():
        for rect, text in inserts:
            _insert_placeholder(page, rect, text, font)


def neutralize_pdf_in_place(
    pdf_bytes: bytes,
    mapping: Dict[str, str],
) -> Optional[bytes]:
    """
    Remove sensitive text and replace with UUID placeholders in-place.
    Content is fully removed (not just covered) so it cannot be copied.

    For each placeholder only its longest original text is searched, and keys
    shorter than _MIN_SEARCH_LENGTH or listed in _PDF_SEARCH_BLOCKLIST are
    skipped, because PyMuPDF's search matches substrings.

    Args:
        pdf_bytes: Original PDF file content
        mapping: Dict of original_text -> placeholder (e.g. [address.uuid])
    Returns:
        Modified PDF bytes; the unchanged input when mapping is empty; None on
        failure (PyMuPDF missing, PDF cannot be opened, redactions cannot be
        applied, or any other error while processing).
    """
    if not mapping:
        return pdf_bytes

    try:
        import fitz  # PyMuPDF
    except ImportError:
        logger.warning("PyMuPDF (fitz) not available for PDF in-place neutralization")
        return None

    try:
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    except Exception as e:
        logger.error("Failed to open PDF: %s", e)
        return None

    try:
        # The selection does not depend on the page, so it is computed once.
        search_items = _select_search_items(mapping)
        font = fitz.Font(_PLACEHOLDER_FONT)
        for page_number, page in enumerate(doc, start=1):
            _neutralize_page(page, page_number, search_items, font)

        buf = io.BytesIO()
        doc.save(buf, garbage=4, deflate=True)
        return buf.getvalue()
    except Exception as e:
        logger.error("PDF in-place neutralization failed: %s", e, exc_info=True)
        return None
    finally:
        # Always release the document, on success and on failure.
        try:
            doc.close()
        except Exception as e:
            logger.debug("Closing PDF document failed: %s", e)