# gateway/modules/features/realEstate/bzoExtractionLangGraph.py
"""
LangGraph-based pipeline for extracting structured content from BZO PDFs.
"""
import logging
import re
from typing import TypedDict, List, Dict, Any, Optional
from dataclasses import dataclass
from langgraph.graph import StateGraph, END
from modules.features.realEstate.bzoPdfExtractor import BZOPdfExtractor, TextBlock
from modules.features.realEstate.bzoRuleTaxonomy import RULE_TAXONOMY
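# RULE_TAXONOMY is consumed below only via .get("patterns"), .get("units")
# and .get("value_type"). For orientation, an entry is assumed to look
# roughly like this (illustrative shape inferred from usage, not the
# actual taxonomy):
#
#     "ausnuetzungsziffer": {
#         "patterns": [r"Ausn(?:ü|ue)tzungsziffer"],
#         "units": ["%"],
#         "value_type": "numeric",
#     }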
logger = logging.getLogger(__name__)
# ===== State Definition =====
@dataclass
class ClassifiedBlock:
"""Classified text block."""
block: TextBlock
block_type: str # "article", "heading", "table", "other"
article_label: Optional[str] = None
article_title: Optional[str] = None
@dataclass
class Article:
"""Assembled article."""
article_label: str
article_title: Optional[str]
text: str
page_start: int
page_end: int
section_level_1: Optional[str] = None
section_level_2: Optional[str] = None
section_level_3: Optional[str] = None
zone_raw: Optional[str] = None
@dataclass
class ZoneInfo:
"""Zone information."""
zone_code: str
zone_name: str
zone_category: Optional[str] = None
zone_subcategory: Optional[str] = None
empfindlichkeitsstufe: Optional[str] = None
geschosszahl: Optional[int] = None
gewerbeerleichterung: bool = False
@dataclass
class RuleCandidate:
"""Rule candidate from pattern matching."""
rule_type: str
matched_text: str
article_text: str
page: int
is_table_rule: bool = False
    table_zones: Optional[List[str]] = None
condition_text: Optional[str] = None
@dataclass
class ParsedRule:
"""Parsed rule with structured values."""
rule_type: str
value_numeric: Optional[float]
value_text: str
unit: Optional[str]
condition_text: Optional[str]
is_table_rule: bool
table_zones: List[str]
page: int
text_snippet: str
zone_raw: Optional[str] = None
rule_scope: str = "general"
confidence: float = 0.5
class BZOExtractionState(TypedDict):
"""State for BZO extraction pipeline."""
# Input metadata
dokument_id: Optional[str]
pdf_id: str
# Extracted text blocks (stored as dicts for serialization)
text_blocks: List[Dict[str, Any]]
# Classified blocks (stored as dicts for serialization)
classified_blocks: List[Dict[str, Any]]
# Assembled articles (stored as dicts for serialization)
articles: List[Dict[str, Any]]
# Zone tracking
current_zones: Dict[str, Dict[str, Any]]
zones: List[Dict[str, Any]]
# Rule extraction (stored as dicts for serialization)
rule_candidates: List[Dict[str, Any]]
parsed_rules: List[Dict[str, Any]]
# Processing metadata
errors: List[str]
warnings: List[str]
# ===== Node Implementations =====
def extract_pdf_text(state: BZOExtractionState) -> BZOExtractionState:
"""Extract text blocks from PDF."""
try:
# PDF bytes should be passed in state context
# This is handled in run_extraction function
# State already has text_blocks populated
return state
except Exception as e:
logger.error(f"Error extracting PDF text: {e}", exc_info=True)
state["errors"] = state.get("errors", []) + [f"PDF extraction error: {str(e)}"]
return state
def classify_text_block(state: BZOExtractionState) -> BZOExtractionState:
"""Classify text blocks into articles, headings, tables, etc."""
try:
classified = []
for block_dict in state["text_blocks"]:
text = block_dict["text"].strip()
if not text:
continue
block_type = "other"
article_label = None
article_title = None
            # Check for article patterns, anchored at the start of the block
            # so that cross-references like "gemäss Art. 13" inside running
            # text do not open a new article.
            article_match = re.match(r'Art\.?\s*(\d+[a-z]?)\b', text, re.IGNORECASE)
if article_match:
block_type = "article"
article_label = f"Art. {article_match.group(1)}"
                # Try to extract title (text after the article label, before
                # the first period or newline)
                title_match = re.match(r'Art\.?\s*\d+[a-z]?\s+(.+?)(?:\.|$|\n)', text, re.IGNORECASE)
if title_match:
article_title = title_match.group(1).strip()
# Check for heading patterns (Roman numerals, letters, numbers)
elif re.match(r'^[A-Z]\.\s+[A-Z]', text) or re.match(r'^[IVX]+\.\s+[A-Z]', text) or re.match(r'^\d+\.\s+[A-Z]', text):
block_type = "heading"
            # Check for table patterns: literal tabs, or several runs of two
            # or more spaces that suggest aligned columns (counting single
            # spaces would flag every ordinary sentence as a table)
            elif '\t' in text or len(re.findall(r' {2,}', text)) > 2:
block_type = "table"
classified.append({
"block": {
"page": block_dict["page"],
"text": block_dict["text"],
"block_id": block_dict["block_id"],
"bbox": block_dict.get("bbox")
},
"block_type": block_type,
"article_label": article_label,
"article_title": article_title
})
# Update state with new classified blocks
existing_blocks = state.get("classified_blocks", [])
state["classified_blocks"] = existing_blocks + classified
return state
except Exception as e:
logger.error(f"Error classifying text blocks: {e}", exc_info=True)
state["errors"] = state.get("errors", []) + [f"Classification error: {str(e)}"]
return state
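# Rough sketch of what classify_text_block yields for typical blocks
# (illustrative inputs, not taken from a real BZO):
#   "Art. 13 Wohnzonen"   -> block_type "article", label "Art. 13",
#                            title "Wohnzonen"
#   "B. Bauvorschriften"  -> block_type "heading"
#   "W2\t30 %\t7.5 m"     -> block_type "table"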
def assemble_articles(state: BZOExtractionState) -> BZOExtractionState:
"""Assemble classified blocks into articles with hierarchical structure."""
try:
articles = []
current_article = None
current_section_1 = None
current_section_2 = None
current_section_3 = None
for classified_dict in state["classified_blocks"]:
block_dict = classified_dict["block"]
block = TextBlock(
page=block_dict["page"],
text=block_dict["text"],
block_id=block_dict["block_id"],
bbox=block_dict.get("bbox")
)
text = block.text.strip()
block_type = classified_dict["block_type"]
article_label = classified_dict.get("article_label")
article_title = classified_dict.get("article_title")
            # Update section levels. Test Roman numerals before letters: a
            # single-character numeral such as "I." would otherwise match
            # the level-1 letter pattern as well.
            if block_type == "heading":
                # Level 2: I., II., III.
                if re.match(r'^[IVX]+\.\s+', text):
                    current_section_2 = text.split('.', 1)[0] + '.'
                    current_section_3 = None
                # Level 1: A., B., C.
                elif re.match(r'^[A-Z]\.\s+', text):
                    current_section_1 = text.split('.', 1)[0] + '.'
                    current_section_2 = None
                    current_section_3 = None
                # Level 3: 1., 2., 3.
                elif re.match(r'^\d+\.\s+', text):
                    current_section_3 = text.split('.', 1)[0] + '.'
# Start new article
if article_label:
# Save previous article if exists
if current_article:
articles.append(current_article)
# Start new article
current_article = {
"article_label": article_label,
"article_title": article_title,
"text": text,
"page_start": block.page,
"page_end": block.page,
"section_level_1": current_section_1,
"section_level_2": current_section_2,
"section_level_3": current_section_3,
"zone_raw": None
}
# Continue current article
elif current_article:
current_article["text"] += "\n" + text
current_article["page_end"] = block.page
# Add last article
if current_article:
articles.append(current_article)
# Update state with new articles
existing_articles = state.get("articles", [])
state["articles"] = existing_articles + articles
return state
except Exception as e:
logger.error(f"Error assembling articles: {e}", exc_info=True)
state["errors"] = state.get("errors", []) + [f"Article assembly error: {str(e)}"]
return state
def detect_zone_changes(state: BZOExtractionState) -> BZOExtractionState:
"""Detect zone declarations and maintain zone scope."""
try:
zones = []
current_zones = state.get("current_zones", {})
for article_dict in state["articles"]:
text = article_dict.get("text", "")
article_label = article_dict.get("article_label", "")
page_start = article_dict.get("page_start", 0)
# Pattern: "Wohnzone W2", "Zone W3", "Gewerbezone G1"
zone_patterns = [
r'(?:Wohnzone|Zone|Gewerbezone|Industriezone|Zentrumszone|Ortsbildschutzzone|Erholungszone)\s+([A-Z0-9/]+)',
r'([A-Z]\d+(?:/\d+)?(?:G)?)', # W2/30, W2/30G, Z3, K3/4
]
for pattern in zone_patterns:
matches = re.finditer(pattern, text, re.IGNORECASE)
for match in matches:
zone_code = match.group(1).upper()
# Parse zone code
gewerbeerleichterung = zone_code.endswith('G')
if gewerbeerleichterung:
zone_code_base = zone_code[:-1]
else:
zone_code_base = zone_code
# Extract geschosszahl from code (e.g., W2 -> 2, W3/50 -> 3)
geschosszahl = None
if '/' in zone_code_base:
parts = zone_code_base.split('/')
geschosszahl_match = re.search(r'(\d+)', parts[0])
if geschosszahl_match:
geschosszahl = int(geschosszahl_match.group(1))
else:
geschosszahl_match = re.search(r'(\d+)', zone_code_base)
if geschosszahl_match:
geschosszahl = int(geschosszahl_match.group(1))
# Determine zone category from context
zone_category = None
if 'Wohnzone' in text or zone_code.startswith('W'):
zone_category = "Wohnzonen"
elif 'Zentrumszone' in text or zone_code.startswith('Z'):
zone_category = "Zentrumszonen"
elif 'Gewerbezone' in text or zone_code.startswith('G'):
zone_category = "Arbeitsplatzzonen"
elif 'Industriezone' in text or zone_code.startswith('I'):
zone_category = "Arbeitsplatzzonen"
zone_info = ZoneInfo(
zone_code=zone_code,
zone_name=f"Zone {zone_code}",
zone_category=zone_category,
geschosszahl=geschosszahl,
gewerbeerleichterung=gewerbeerleichterung
)
current_zones[zone_code] = zone_info
zones.append({
"zone_code": zone_code,
"zone_name": zone_info.zone_name,
"zone_category": zone_category,
"geschosszahl": geschosszahl,
"gewerbeerleichterung": gewerbeerleichterung,
"source_article": article_label,
"page": page_start
})
# Update state with zones
state["current_zones"] = current_zones
existing_zones = state.get("zones", [])
state["zones"] = existing_zones + zones
return state
except Exception as e:
logger.error(f"Error detecting zones: {e}", exc_info=True)
state["errors"] = state.get("errors", []) + [f"Zone detection error: {str(e)}"]
return state
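# Worked example (illustrative): "Wohnzone W2/30G" yields zone_code
# "W2/30G", gewerbeerleichterung True (trailing "G"), base code "W2/30",
# geschosszahl 2 (digits of the part before the "/"), and zone_category
# "Wohnzonen".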
def detect_rule_candidates(state: BZOExtractionState) -> BZOExtractionState:
"""Detect rule candidates using pattern matching."""
try:
candidates = []
for article_dict in state["articles"]:
text = article_dict.get("text", "")
article_label = article_dict.get("article_label", "")
page_start = article_dict.get("page_start", 0)
# Check each rule type in taxonomy
for rule_type, rule_config in RULE_TAXONOMY.items():
patterns = rule_config.get("patterns", [])
for pattern in patterns:
# Create regex pattern (case-insensitive)
regex_pattern = re.compile(pattern, re.IGNORECASE)
matches = regex_pattern.finditer(text)
for match in matches:
# Extract context around match
start = max(0, match.start() - 100)
end = min(len(text), match.end() + 100)
context = text[start:end]
# Check for conditions (geographic, temporal, etc.)
condition_text = None
condition_patterns = [
r'(?:nördlich|südlich|östlich|westlich|oberhalb|unterhalb)\s+[^,\.]+',
r'(?:für|bei|in)\s+[^,\.]+',
]
for cond_pattern in condition_patterns:
cond_match = re.search(cond_pattern, context, re.IGNORECASE)
if cond_match:
condition_text = cond_match.group(0)
break
candidate = {
"rule_type": rule_type,
"matched_text": match.group(0),
"article_text": text,
"page": page_start,
"condition_text": condition_text,
"is_table_rule": False,
"table_zones": []
}
candidates.append(candidate)
# Update state with rule candidates
existing_candidates = state.get("rule_candidates", [])
state["rule_candidates"] = existing_candidates + candidates
return state
except Exception as e:
logger.error(f"Error detecting rule candidates: {e}", exc_info=True)
state["errors"] = state.get("errors", []) + [f"Rule candidate detection error: {str(e)}"]
return state
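# Example (illustrative): assuming the taxonomy maps "gebaeudehoehe" to the
# pattern r"Gebäudehöhe", an article containing "Die Gebäudehöhe beträgt
# nördlich der Bahnlinie max. 7.5 m" yields a candidate with rule_type
# "gebaeudehoehe", matched_text "Gebäudehöhe" and condition_text
# "nördlich der Bahnlinie max" (the condition pattern stops at the first
# comma or period).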
def parse_rule_values(state: BZOExtractionState) -> BZOExtractionState:
"""Parse rule values using regex (LLM fallback can be added later)."""
try:
parsed_rules = []
for candidate_dict in state["rule_candidates"]:
rule_type = candidate_dict["rule_type"]
rule_config = RULE_TAXONOMY.get(rule_type, {})
units = rule_config.get("units", [])
value_type = rule_config.get("value_type", "numeric")
            # Build a context window: from the matched text through the next
            # 200 characters of the article. Guard against find() returning
            # -1 when the snippet is not found verbatim.
            matched_text = candidate_dict["matched_text"]
            article_text = candidate_dict["article_text"]
            match_idx = article_text.find(matched_text)
            if match_idx == -1:
                match_idx = 0
            text = article_text[match_idx:match_idx + len(matched_text) + 200]
value_numeric = None
value_text = matched_text
unit = None
# Try to extract numeric value
if value_type in ["numeric", "integer"]:
# Pattern: "max. 4", "30 %", "min. 3.5 m"
value_patterns = [
r'(?:max|maximal|min|mindestens|höchstens)\s*\.?\s*(\d+(?:\.\d+)?)',
r'(\d+(?:\.\d+)?)\s*(%|m|meter|metern|prozent)',
r'(\d+(?:\.\d+)?)',
]
for pattern in value_patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
try:
value_numeric = float(match.group(1))
if value_type == "integer":
value_numeric = int(value_numeric)
                            # Check for unit: take the first number+unit pair
                            # in the context window (not necessarily the pair
                            # the value was read from)
                            unit_match = re.search(r'(\d+(?:\.\d+)?)\s*(%|m|meter|metern|prozent)', text, re.IGNORECASE)
if unit_match:
unit = unit_match.group(2).lower()
if unit in ["meter", "metern"]:
unit = "m"
elif unit == "prozent":
unit = "%"
break
except ValueError:
continue
# Calculate confidence
confidence = 0.5
if value_numeric is not None:
confidence = 0.8
if unit:
confidence = 0.9
            # Determine zone and scope. As a coarse default, attribute the
            # rule to the first zone seen in the document; per-article zone
            # tracking would be more precise.
            zone_raw = None
            rule_scope = "general"
            if state.get("current_zones"):
                zone_raw = next(iter(state["current_zones"]))
                rule_scope = "zone"
parsed_rule = {
"rule_type": rule_type,
"value_numeric": value_numeric,
"value_text": value_text,
"unit": unit,
"condition_text": candidate_dict.get("condition_text"),
"is_table_rule": candidate_dict.get("is_table_rule", False),
"table_zones": candidate_dict.get("table_zones", []),
"page": candidate_dict["page"],
"text_snippet": value_text,
"zone_raw": zone_raw,
"rule_scope": rule_scope,
"confidence": confidence
}
parsed_rules.append(parsed_rule)
# Update state with parsed rules
existing_rules = state.get("parsed_rules", [])
state["parsed_rules"] = existing_rules + parsed_rules
return state
except Exception as e:
logger.error(f"Error parsing rule values: {e}", exc_info=True)
state["errors"] = state.get("errors", []) + [f"Rule parsing error: {str(e)}"]
return state
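# Worked example (illustrative): for a candidate whose context window reads
# "... beträgt max. 30 % ...", the first value pattern captures 30.0, the
# unit search finds "30 %" and normalises the unit to "%", and confidence
# ends at 0.9 (0.5 base, 0.8 with a numeric value, 0.9 with a unit).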
def assign_zone_and_scope(state: BZOExtractionState) -> BZOExtractionState:
"""Assign zone and scope to parsed rules."""
try:
# Rules already have zone and scope assigned in parse_rule_values
# This node can refine assignments if needed
return state
except Exception as e:
logger.error(f"Error assigning zone and scope: {e}", exc_info=True)
state["errors"] = state.get("errors", []) + [f"Zone/scope assignment error: {str(e)}"]
return state
def confidence_scoring(state: BZOExtractionState) -> BZOExtractionState:
"""Calculate confidence scores for extracted data."""
try:
# Confidence already calculated in parse_rule_values
# This node can refine scores if needed
return state
except Exception as e:
logger.error(f"Error calculating confidence: {e}", exc_info=True)
state["errors"] = state.get("errors", []) + [f"Confidence scoring error: {str(e)}"]
return state
# ===== Graph Construction =====
def create_bzo_extraction_graph():
"""Create and compile the BZO extraction graph."""
workflow = StateGraph(BZOExtractionState)
# Add nodes
workflow.add_node("extract_pdf_text", extract_pdf_text)
workflow.add_node("classify_text_block", classify_text_block)
workflow.add_node("assemble_articles", assemble_articles)
workflow.add_node("detect_zone_changes", detect_zone_changes)
workflow.add_node("detect_rule_candidates", detect_rule_candidates)
workflow.add_node("parse_rule_values", parse_rule_values)
workflow.add_node("assign_zone_and_scope", assign_zone_and_scope)
workflow.add_node("confidence_scoring", confidence_scoring)
# Define edges
workflow.set_entry_point("extract_pdf_text")
workflow.add_edge("extract_pdf_text", "classify_text_block")
workflow.add_edge("classify_text_block", "assemble_articles")
workflow.add_edge("assemble_articles", "detect_zone_changes")
workflow.add_edge("detect_zone_changes", "detect_rule_candidates")
workflow.add_edge("detect_rule_candidates", "parse_rule_values")
workflow.add_edge("parse_rule_values", "assign_zone_and_scope")
workflow.add_edge("assign_zone_and_scope", "confidence_scoring")
workflow.add_edge("confidence_scoring", END)
return workflow.compile()
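# The compiled graph is a straight pipeline (extract_pdf_text -> ... ->
# confidence_scoring). For debugging, recent langgraph versions can render
# it as a Mermaid diagram (API may vary by version):
#
#     print(create_bzo_extraction_graph().get_graph().draw_mermaid())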
def run_extraction(pdf_bytes: bytes, pdf_id: Optional[str] = None, dokument_id: Optional[str] = None) -> Dict[str, Any]:
"""
Run the extraction pipeline on a PDF and return structured, sorted results.
Args:
pdf_bytes: PDF file content as bytes
pdf_id: Optional identifier for the PDF (defaults to generated ID)
dokument_id: Optional dokument ID for reference
Returns:
Dictionary with extracted and sorted content:
{
"articles": [...], # Sorted by page_start, then article_label
"zones": [...], # Sorted by zone_code
"rules": [...], # Sorted by rule_type, then page
"errors": [...],
"warnings": [...]
}
"""
import uuid
if not pdf_id:
pdf_id = f"pdf_{uuid.uuid4().hex[:8]}"
# Initialize state
state: BZOExtractionState = {
"dokument_id": dokument_id,
"pdf_id": pdf_id,
"text_blocks": [],
"classified_blocks": [],
"articles": [],
"current_zones": {},
"zones": [],
"rule_candidates": [],
"parsed_rules": [],
"errors": [],
"warnings": []
}
# Extract PDF text first
pdf_extractor = BZOPdfExtractor()
text_blocks_objects = pdf_extractor.extract_text_blocks(pdf_bytes, state["pdf_id"])
# Convert TextBlock objects to dicts for state
state["text_blocks"] = [
{
"page": tb.page,
"text": tb.text,
"block_id": tb.block_id,
"bbox": tb.bbox
}
for tb in text_blocks_objects
]
# Create and run graph
graph = create_bzo_extraction_graph()
final_state = graph.invoke(state)
# Sort and structure results
articles = sorted(
final_state.get("articles", []),
key=lambda x: (x.get("page_start", 0), x.get("article_label", ""))
)
zones = sorted(
final_state.get("zones", []),
key=lambda x: x.get("zone_code", "")
)
rules = sorted(
final_state.get("parsed_rules", []),
key=lambda x: (x.get("rule_type", ""), x.get("page", 0))
)
return {
"articles": articles,
"zones": zones,
"rules": rules,
"errors": final_state.get("errors", []),
"warnings": final_state.get("warnings", [])
}
def extract_from_documents(
document_retriever,
dokument_ids: List[str]
) -> Dict[str, Any]:
"""
Extract BZO content from one or more documents.
Args:
document_retriever: BZODocumentRetriever instance
dokument_ids: List of dokument IDs to process
Returns:
Dictionary with results per document:
{
"results": [
{
"dokument_id": "...",
"articles": [...],
"zones": [...],
"rules": [...],
"errors": [...],
"warnings": [...]
},
...
],
"summary": {
"total_documents": N,
"successful": M,
"failed": K,
"total_articles": X,
"total_zones": Y,
"total_rules": Z
}
}
"""
results = []
total_articles = 0
total_zones = 0
total_rules = 0
successful = 0
failed = 0
# Retrieve documents
dokumente = document_retriever.get_documents_by_ids(dokument_ids)
for dokument in dokumente:
try:
# Retrieve PDF content
pdf_bytes = document_retriever.retrieve_pdf_content(dokument)
if not pdf_bytes:
logger.warning(f"Could not retrieve PDF for dokument {dokument.id}")
results.append({
"dokument_id": dokument.id,
"articles": [],
"zones": [],
"rules": [],
"errors": [f"Could not retrieve PDF content"],
"warnings": []
})
failed += 1
continue
# Run extraction
extraction_result = run_extraction(
pdf_bytes=pdf_bytes,
pdf_id=dokument.dokumentReferenz or f"dok_{dokument.id}",
dokument_id=dokument.id
)
# Add dokument_id to result
extraction_result["dokument_id"] = dokument.id
results.append(extraction_result)
# Update counters
total_articles += len(extraction_result.get("articles", []))
total_zones += len(extraction_result.get("zones", []))
total_rules += len(extraction_result.get("rules", []))
if extraction_result.get("errors"):
failed += 1
else:
successful += 1
except Exception as e:
logger.error(f"Error processing dokument {dokument.id}: {str(e)}", exc_info=True)
results.append({
"dokument_id": dokument.id,
"articles": [],
"zones": [],
"rules": [],
"errors": [f"Processing error: {str(e)}"],
"warnings": []
})
failed += 1
return {
"results": results,
"summary": {
"total_documents": len(dokument_ids),
"successful": successful,
"failed": failed,
"total_articles": total_articles,
"total_zones": total_zones,
"total_rules": total_rules
}
}
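

if __name__ == "__main__":
    # Minimal usage sketch for manual testing; the default path is a
    # placeholder, so point it at any local BZO PDF.
    import sys
    logging.basicConfig(level=logging.INFO)
    pdf_path = sys.argv[1] if len(sys.argv) > 1 else "bzo.pdf"
    with open(pdf_path, "rb") as f:
        result = run_extraction(f.read(), pdf_id="local_test")
    print(
        f"{len(result['articles'])} articles, "
        f"{len(result['zones'])} zones, "
        f"{len(result['rules'])} rules, "
        f"{len(result['errors'])} errors"
    )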