# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
PDF in-place neutralization using PyMuPDF.

Removes original text completely and inserts full UUID placeholders.
Placeholders are written with PyMuPDF's insert_textbox, which wraps long
placeholders to preserve layout.

NOTE: PyMuPDF search_for() matches substrings (e.g. "CH" matches inside "Zurich",
"CHE-115...", ".ch"). We skip short/ambiguous keys to avoid false redactions.
"""
|
|
|
|
import io
|
|
import logging
|
|
from typing import Dict, Optional
|
|
|
|
# Module-level logger; handlers and levels are configured by the application.
logger = logging.getLogger(__name__)

# Minimum length for PDF search - shorter keys cause substring false positives
# because PyMuPDF's search_for() matches anywhere inside a word.
_MIN_SEARCH_LENGTH = 5

# Keys we never search for in PDF (substrings of many common words).
# Every entry is currently shorter than _MIN_SEARCH_LENGTH, so this set mainly
# catches whitespace-padded variants and stays correct if the minimum length
# is ever lowered.
_PDF_SEARCH_BLOCKLIST = frozenset({
    # Country codes - match in Zurich, CHF, Deutschland, etc.
    "CH", "DE", "FR", "IT",
    # Abbreviations - match in Pol-Nr., Policy No., etc.
    "Nr", "Nr.", "Nr:", "No", "No.", "No:",
    # Domain parts - match in URLs
    "www", ".ch", ".com", ".org", ".net",
})
|
|
|
|
|
|
# Placeholder prefixes whose replacement text is drawn by this module after the
# redaction has been applied, with a font size fitted to the redacted
# rectangle.  Any other placeholder is handed to PyMuPDF as the redaction
# annotation's own replacement text.
_FITTED_PLACEHOLDER_PREFIXES = (
    "[name.", "[address.", "[phone.", "[policy.", "[date.", "[ssn.",
)

# Labels that may precede a policy number.  Redacting "label + number" gives a
# wider rectangle for the (long) placeholder.
_POLICY_LABELS = (
    "Police Nr. ", "Police Nr.: ", "Polizzenr. ", "Policy no. ", "Policy No. ",
)

# Labels that may precede a UID / company ID such as CHE-115.665.634.
_UID_LABELS = ("UID-Nr. ", "UID-Nr.: ", "UID No. ", "UID: ", "UID-Nummer: ")

# Only SSN-type values starting with one of these are treated as UIDs.
_UID_VALUE_PREFIXES = ("CHE-", "DE-", "FR-", "IT-")

_REDACT_FILL = (1, 1, 1)        # RGB white: colour of the redaction box
_PLACEHOLDER_COLOR = (0, 0, 0)  # RGB black: colour of the placeholder text
_PLACEHOLDER_FONT = "helv"
_PLACEHOLDER_FONT_SIZE = 8      # upper bound; shrunk to fit narrow rectangles


def _searchable_items(mapping):
    """Return the mapping entries worth searching for, longest key first.

    Each result is ``(original_text, placeholder, needles, fitted)`` where
    ``needles`` lists the strings to search for (labelled variants first, the
    bare text last) and ``fitted`` tells whether the placeholder is drawn
    manually with a fitted font size.

    Longest keys come first so that a key which is a substring of another key
    is processed after the longer one.
    """
    items = []
    for original_text, placeholder in sorted(
        mapping.items(), key=lambda kv: -len(kv[0])
    ):
        if not original_text or not placeholder:
            continue

        # Judge the key by its stripped form for BOTH checks; surrounding
        # whitespace must not let a short key slip past the length filter.
        key = original_text.strip()
        if len(key) < _MIN_SEARCH_LENGTH:
            logger.debug(
                "Skipping PDF search for short key %r (would match substrings)",
                original_text[:20],
            )
            continue
        if key in _PDF_SEARCH_BLOCKLIST:
            logger.debug("Skipping PDF search for blocklisted key %r", original_text)
            continue

        if placeholder.startswith("[policy."):
            labels = _POLICY_LABELS
        elif placeholder.startswith("[ssn.") and original_text.startswith(
            _UID_VALUE_PREFIXES
        ):
            labels = _UID_LABELS
        else:
            labels = ()

        needles = tuple(label + original_text for label in labels) + (original_text,)
        fitted = placeholder.startswith(_FITTED_PLACEHOLDER_PREFIXES)
        items.append((original_text, placeholder, needles, fitted))
    return items


def _add_redaction(page, rect, original_text, placeholder, fitted):
    """Add one redaction annotation; return True on success, False on failure."""
    try:
        if fitted:
            # Plain white box; the placeholder is drawn later with a fitted size.
            page.add_redact_annot(rect, fill=_REDACT_FILL)
        else:
            page.add_redact_annot(
                rect,
                text=placeholder,
                fill=_REDACT_FILL,
                text_color=_PLACEHOLDER_COLOR,
                fontname=_PLACEHOLDER_FONT,
                fontsize=_PLACEHOLDER_FONT_SIZE,
            )
    except Exception as e:
        logger.warning("Redact failed for %r: %s", original_text[:40], e)
        return False
    return True


def _insert_placeholder(page, font, rect, text):
    """Draw ``text`` into ``rect`` with a font size fitted to the rect width."""
    try:
        # Width of the text at font size 1; scaling is linear in the font size.
        unit_width = font.text_length(text, fontsize=1)
        if unit_width > 0:
            size = max(3, min(_PLACEHOLDER_FONT_SIZE, rect.width / unit_width))
        else:
            size = 4
        rc = page.insert_textbox(
            rect, text, fontname=_PLACEHOLDER_FONT, fontsize=size,
            align=0, color=_PLACEHOLDER_COLOR,
        )
        if rc < 0:
            # Negative return: the text did not fit and nothing was written.
            # Retry once with a very small font.
            page.insert_textbox(
                rect, text, fontname=_PLACEHOLDER_FONT, fontsize=2,
                align=0, color=_PLACEHOLDER_COLOR,
            )
    except Exception as e:
        logger.warning("Insert placeholder failed: %s", e)


def _neutralize_page(page, page_number, items, font):
    """Redact every searchable item on ``page`` and draw its placeholder.

    Raises whatever ``apply_redactions()`` raises: if the redactions cannot be
    applied, the sensitive text is still in the page and the caller must not
    hand the document out as neutralized.
    """
    claimed = []  # rectangles already scheduled for redaction on this page
    pending = []  # (rect, placeholder) pairs to draw after redaction

    for original_text, placeholder, needles, fitted in items:
        # Search ALL needles: labelled variants first (wider rectangle), then
        # the bare text, so occurrences without a label are redacted as well.
        for needle in needles:
            try:
                found = page.search_for(needle, quads=False)
            except Exception as e:
                logger.debug("PDF search failed for %r: %s", needle[:40], e)
                continue

            for rect in found:
                # search_for() matches substrings, so a hit may lie inside an
                # area that a longer key or labelled variant already claimed.
                # That area is removed anyway; a second placeholder would only
                # overlap the first one.
                if any(taken.contains(rect) for taken in claimed):
                    continue
                if not _add_redaction(page, rect, original_text, placeholder, fitted):
                    continue
                claimed.append(rect)
                if fitted:
                    pending.append((rect, placeholder))

    if claimed:
        try:
            page.apply_redactions()
        except Exception:
            logger.error(
                "apply_redactions failed on page %d; aborting neutralization",
                page_number,
            )
            raise

    for rect, placeholder in pending:
        _insert_placeholder(page, font, rect, placeholder)


def neutralize_pdf_in_place(
    pdf_bytes: bytes,
    mapping: Dict[str, str],
) -> Optional[bytes]:
    """
    Remove sensitive text and replace with UUID placeholders in-place.

    Content is fully removed (not just covered) so it cannot be copied.
    Keys shorter than ``_MIN_SEARCH_LENGTH`` or listed in
    ``_PDF_SEARCH_BLOCKLIST`` are skipped because PyMuPDF's ``search_for()``
    matches substrings.

    Args:
        pdf_bytes: Original PDF file content
        mapping: Dict of original_text -> placeholder (e.g. [address.uuid])

    Returns:
        Modified PDF bytes; ``pdf_bytes`` unchanged when ``mapping`` is empty;
        None on failure (PyMuPDF missing, unreadable PDF, or redactions that
        could not be applied).
    """
    if not mapping:
        return pdf_bytes

    try:
        import fitz  # PyMuPDF
    except ImportError:
        logger.warning("PyMuPDF (fitz) not available for PDF in-place neutralization")
        return None

    try:
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    except Exception as e:
        logger.error("Failed to open PDF: %s", e)
        return None

    try:
        # Filtering and classification do not depend on the page: do it once.
        items = _searchable_items(mapping)
        font = fitz.Font(_PLACEHOLDER_FONT)

        for page_number, page in enumerate(doc, start=1):
            _neutralize_page(page, page_number, items, font)

        buf = io.BytesIO()
        doc.save(buf, garbage=4, deflate=True)
        return buf.getvalue()
    except Exception as e:
        logger.error("PDF in-place neutralization failed: %s", e, exc_info=True)
        return None
    finally:
        # Release the document on every path; a failing close must not mask
        # the result or the original error.
        try:
            doc.close()
        except Exception:
            pass
|