# Source file: gateway/modules/features/realEstate/bzoExtractionLangGraph.py
# (repository viewer metadata: 1497 lines, 60 KiB, Python)

"""
LangGraph-based pipeline for extracting structured content from BZO PDFs.
"""
import logging
import re
from typing import TypedDict, List, Dict, Any, Optional
from dataclasses import dataclass
from langgraph.graph import StateGraph, START, END
from modules.features.realEstate.bzoPdfExtractor import BZOPdfExtractor, TextBlock
from modules.features.realEstate.bzoRuleTaxonomy import RULE_TAXONOMY
logger = logging.getLogger(__name__)
# ===== BZO Params Extraction State (LangGraph with LLM) =====
class BZOParamsExtractionState(TypedDict):
    """State for BZO params extraction via LLM.

    The keys above the ``# Output`` marker are the inputs of the graph, the
    keys below it are the results it produces.
    """
    # Input
    extracted_content: Dict[str, Any]  # presumably the extraction result (articles, rules, tables) - TODO confirm against caller
    bauzone: str  # target zone code, e.g. "W2"
    total_area_m2: Optional[float]  # parcel area; may be None
    relevant_rules: List[Dict[str, Any]]
    relevant_articles: List[Dict[str, Any]]
    zone_parameter_tables: List[Dict[str, Any]]
    ai_service: Any  # NOTE(review): LLM client; its interface is not visible in this part of the file
    gemeinde: str  # municipality name - presumably used for the prompt; TODO confirm
    # Output
    bauzone_params_list: List[str]
    fakten: List[Dict[str, str]]
    zusatzinformationen: List[Dict[str, Any]]
    errors: List[str]
# ===== State Definition =====
@dataclass
class ClassifiedBlock:
    """Classified text block.

    Typed counterpart of the dicts that ``classify_and_assemble`` stores in
    ``state["classified_blocks"]`` (same four keys).
    """
    block: TextBlock  # the extracted text block that was classified
    block_type: str  # "article", "heading", "table", "other"
    article_label: Optional[str] = None  # e.g. "Art. 12"; only set for article blocks
    article_title: Optional[str] = None  # text following the article number, when one was found
@dataclass
class Article:
    """Assembled article.

    Field names match the keys of the article dicts built by
    ``classify_and_assemble``.
    """
    article_label: str  # e.g. "Art. 12"
    article_title: Optional[str]
    text: str  # text of the opening block plus all following blocks, joined by newlines
    page_start: int  # page of the block that opened the article
    page_end: int  # page of the last block appended to the article
    section_level_1: Optional[str] = None  # enclosing heading marker such as "A."
    section_level_2: Optional[str] = None  # enclosing roman-numeral heading marker such as "II."
    section_level_3: Optional[str] = None  # enclosing numeric heading marker such as "3."
    zone_raw: Optional[str] = None  # classify_and_assemble always leaves this as None
@dataclass
class ZoneInfo:
    """Zone information.

    Built by ``extract_zones_and_tables`` for every zone code it detects.
    """
    zone_code: str  # upper-cased code as matched in the text, e.g. "W2/30G"
    zone_name: str  # extract_zones_and_tables uses f"Zone {zone_code}"
    zone_category: Optional[str] = None  # "Wohnzonen", "Zentrumszonen", "Arbeitsplatzzonen" or None
    zone_subcategory: Optional[str] = None  # not populated by the code visible in this file section
    empfindlichkeitsstufe: Optional[str] = None  # not populated by the code visible in this file section
    geschosszahl: Optional[int] = None  # first number of the zone code (W2 -> 2, W3/50 -> 3)
    gewerbeerleichterung: bool = False  # True when the zone code ends with "G"
@dataclass
class RuleCandidate:
    """Rule candidate from pattern matching.

    ``extract_rules`` builds dicts with these keys (plus ``article_label``)
    rather than instances of this class.
    """
    rule_type: str  # key of the matching RULE_TAXONOMY entry
    matched_text: str  # exact text matched by the taxonomy pattern
    article_text: str  # full text of the article the match came from
    page: int
    is_table_rule: bool = False
    # Annotated as Optional because the default is None, not an empty list.
    table_zones: Optional[List[str]] = None
    condition_text: Optional[str] = None  # e.g. a "nördlich ..." / "für ..." phrase near the match
@dataclass
class ParsedRule:
    """Parsed rule with structured values.

    ``extract_rules`` stores dicts with these keys (plus ``article_label``)
    in ``state["parsed_rules"]``.
    """
    rule_type: str
    value_numeric: Optional[float]  # None when no number could be parsed
    value_text: str
    unit: Optional[str]  # extract_rules normalizes to "m" or "%"
    condition_text: Optional[str]
    is_table_rule: bool
    table_zones: List[str]
    page: int
    text_snippet: str
    zone_raw: Optional[str] = None
    rule_scope: str = "general"  # extract_rules uses "zone" when a zone is associated, else "general"
    confidence: float = 0.5  # extract_rules raises this to 0.8 with a number and 0.9 with number + unit
class BZOExtractionState(TypedDict):
    """State for BZO extraction pipeline.

    Shared between the pipeline nodes in this module; each node reads the
    keys produced by its predecessors and writes its own results back.
    """
    # Input metadata
    dokument_id: Optional[str]
    pdf_id: str
    # Extracted text blocks (stored as dicts for serialization)
    # Read by classify_and_assemble; each dict needs "page", "text", "block_id" and optionally "bbox".
    text_blocks: List[Dict[str, Any]]
    # Classified blocks (stored as dicts for serialization)
    classified_blocks: List[Dict[str, Any]]
    # Assembled articles (stored as dicts for serialization)
    articles: List[Dict[str, Any]]
    # Zone tracking
    current_zones: Dict[str, Dict[str, Any]]  # keyed by zone code
    zones: List[Dict[str, Any]]  # one entry per zone mention, including source article and page
    # Rule extraction (stored as dicts for serialization)
    rule_candidates: List[Dict[str, Any]]
    parsed_rules: List[Dict[str, Any]]
    # Zone-parameter tables (structured table data mapping zones to parameters)
    zone_parameter_tables: List[Dict[str, Any]]
    # Processing metadata
    errors: List[str]  # nodes append a message here instead of raising
    warnings: List[str]
# ===== Node Implementations (Simplified 4-node pipeline) =====
def classify_and_assemble(state: BZOExtractionState) -> BZOExtractionState:
    """Classify text blocks and assemble them into articles (merged node).

    Step 1 labels every non-empty entry of ``state["text_blocks"]`` as
    ``"article"``, ``"heading"``, ``"table"`` or ``"other"`` and stores the
    result in ``state["classified_blocks"]``.
    Step 2 walks the classified blocks in order: a block carrying an article
    label opens a new article, every other block is appended to the article
    that is currently open. Headings additionally update the section markers
    that are copied into articles opened afterwards. The result is stored in
    ``state["articles"]``.

    Args:
        state: Pipeline state; ``text_blocks`` entries must provide ``page``,
            ``text`` and ``block_id`` (``bbox`` is optional).

    Returns:
        The same state object. On any exception the error is logged, a
        message is appended to ``state["errors"]`` and the state is returned
        without raising.
    """
    # The leading \b keeps words that merely end in "art" ("Bauart 2",
    # "Start 3") from being read as an article reference.
    article_re = re.compile(r'\bArt\.?\s*(\d+[a-z]?)', re.IGNORECASE)
    title_re = re.compile(r'\bArt\.?\s*\d+[a-z]?\s+(.+?)(?:\.|$|\n)', re.IGNORECASE)
    heading_res = (
        re.compile(r'^[A-Z]\.\s+[A-Z]'),
        re.compile(r'^[IVX]+\.\s+[A-Z]'),
        re.compile(r'^\d+\.\s+[A-Z]'),
    )
    try:
        # --- Step 1: classification ---
        classified = []
        for raw_block in state["text_blocks"]:
            text = raw_block["text"].strip()
            if not text:
                continue
            block_type = "other"
            article_label = None
            article_title = None
            label_match = article_re.search(text)
            if label_match:
                block_type = "article"
                article_label = f"Art. {label_match.group(1)}"
                title_match = title_re.search(text)
                if title_match:
                    article_title = title_match.group(1).strip()
            elif any(rx.match(text) for rx in heading_res):
                block_type = "heading"
            # NOTE(review): for more than five words the space count is almost
            # always > 2 already; possibly a double-space test was intended.
            elif '\t' in text or (len(text.split()) > 5 and text.count(' ') > 2):
                block_type = "table"
            classified.append({
                "block": {
                    "page": raw_block["page"],
                    "text": raw_block["text"],
                    "block_id": raw_block["block_id"],
                    "bbox": raw_block.get("bbox"),
                },
                "block_type": block_type,
                "article_label": article_label,
                "article_title": article_title,
            })
        state["classified_blocks"] = classified

        # --- Step 2: article assembly ---
        articles = []
        current_article = None
        current_section_1 = current_section_2 = current_section_3 = None
        for entry in classified:
            block = entry["block"]
            text = block["text"].strip()
            article_label = entry.get("article_label")
            if entry["block_type"] == "heading":
                marker = text.split('.', 1)[0] + '.'
                # A higher-level heading resets all levels below it.
                if re.match(r'^[A-Z]\.\s+', text):
                    current_section_1 = marker
                    current_section_2 = current_section_3 = None
                elif re.match(r'^[IVX]+\.\s+', text):
                    current_section_2 = marker
                    current_section_3 = None
                elif re.match(r'^\d+\.\s+', text):
                    current_section_3 = marker
            if article_label:
                if current_article:
                    articles.append(current_article)
                current_article = {
                    "article_label": article_label,
                    "article_title": entry.get("article_title"),
                    "text": text,
                    "page_start": block["page"],
                    "page_end": block["page"],
                    "section_level_1": current_section_1,
                    "section_level_2": current_section_2,
                    "section_level_3": current_section_3,
                    "zone_raw": None,
                }
            elif current_article:
                # Continuation block (headings and tables included) of the open article.
                current_article["text"] += "\n" + text
                current_article["page_end"] = block["page"]
        if current_article:
            articles.append(current_article)
        state["articles"] = articles
        return state
    except Exception as e:
        logger.error(f"Error in classify_and_assemble: {e}", exc_info=True)
        state["errors"] = state.get("errors", []) + [f"Classify/assemble error: {str(e)}"]
        return state
def extract_zones_and_tables(state: BZOExtractionState) -> BZOExtractionState:
    """Detect zones and extract zone-parameter tables (merged node).

    Part 1 scans every article text for zone codes. Each match is recorded
    twice: in ``state["zones"]`` (one entry per mention, with source article
    and page) and in ``state["current_zones"]`` (keyed by zone code, a later
    mention overwrites an earlier one).
    Part 2 delegates to ``_extract_zone_parameter_tables_impl``.

    ``current_zones`` values are plain dicts with the ``ZoneInfo`` field
    names, as declared by ``BZOExtractionState`` (``Dict[str, Dict[str, Any]]``,
    "stored as dicts for serialization"); dataclass instances are no longer
    placed in the state.

    Args:
        state: Pipeline state; reads ``articles`` (and, indirectly,
            ``classified_blocks``).

    Returns:
        The same state object. On any exception the error is logged, a
        message is appended to ``state["errors"]`` and the state is returned
        without raising.
    """
    # Pattern: "Wohnzone W2", "Zone W3", "Gewerbezone G1"
    # NOTE(review): both patterns are applied with re.IGNORECASE, so the
    # second one matches any letter followed by digits (e.g. "m2") - confirm
    # that this breadth is intended.
    zone_patterns = [
        r'(?:Wohnzone|Zone|Gewerbezone|Industriezone|Zentrumszone|Ortsbildschutzzone|Erholungszone)\s+([A-Z0-9/]+)',
        r'([A-Z]\d+(?:/\d+)?(?:G)?)',  # W2/30, W2/30G, Z3, K3/4
    ]
    try:
        # Part 1: Detect zone declarations
        zones = []
        current_zones = {}
        for article_dict in state["articles"]:
            text = article_dict.get("text", "")
            article_label = article_dict.get("article_label", "")
            page_start = article_dict.get("page_start", 0)
            for pattern in zone_patterns:
                for match in re.finditer(pattern, text, re.IGNORECASE):
                    zone_code = match.group(1).upper()
                    # A trailing "G" marks Gewerbeerleichterung and is not part of the base code.
                    gewerbeerleichterung = zone_code.endswith('G')
                    zone_code_base = zone_code[:-1] if gewerbeerleichterung else zone_code
                    # Extract geschosszahl from code (e.g., W2 -> 2, W3/50 -> 3):
                    # first number in the part before any "/".
                    geschosszahl = None
                    digits = re.search(r'(\d+)', zone_code_base.split('/')[0])
                    if digits:
                        geschosszahl = int(digits.group(1))
                    # Determine zone category from context (keyword anywhere in
                    # the article text, or the first letter of the code).
                    zone_category = None
                    if 'Wohnzone' in text or zone_code.startswith('W'):
                        zone_category = "Wohnzonen"
                    elif 'Zentrumszone' in text or zone_code.startswith('Z'):
                        zone_category = "Zentrumszonen"
                    elif 'Gewerbezone' in text or zone_code.startswith('G'):
                        zone_category = "Arbeitsplatzzonen"
                    elif 'Industriezone' in text or zone_code.startswith('I'):
                        zone_category = "Arbeitsplatzzonen"
                    zone_info = ZoneInfo(
                        zone_code=zone_code,
                        zone_name=f"Zone {zone_code}",
                        zone_category=zone_category,
                        geschosszahl=geschosszahl,
                        gewerbeerleichterung=gewerbeerleichterung,
                    )
                    # Store a plain dict (all ZoneInfo fields) so the state
                    # matches its declared type and stays serializable.
                    current_zones[zone_code] = dict(vars(zone_info))
                    zones.append({
                        "zone_code": zone_code,
                        "zone_name": zone_info.zone_name,
                        "zone_category": zone_category,
                        "geschosszahl": geschosszahl,
                        "gewerbeerleichterung": gewerbeerleichterung,
                        "source_article": article_label,
                        "page": page_start,
                    })
        state["current_zones"] = current_zones
        state["zones"] = zones
        # Part 2: Extract zone-parameter tables
        _extract_zone_parameter_tables_impl(state)
        return state
    except Exception as e:
        logger.error(f"Error in extract_zones_and_tables: {e}", exc_info=True)
        state["errors"] = state.get("errors", []) + [f"Zones/tables error: {str(e)}"]
        return state
def _extract_zone_parameter_tables_impl(state: BZOExtractionState) -> None:
"""Extract zone-parameter tables from classified blocks. Mutates state in place."""
tables = []
table_blocks = [b for b in state.get("classified_blocks", []) if b.get("block_type") == "table"]
zone_pattern = r'\b([WLIZK]\d+(?:/\d+)?(?:G\*?)?)\b'
parameter_keywords = [
r'Ausnützungsziffer', r'Überbauungsziffer', r'Vollgeschosse', r'Dachgeschosse', r'Attikageschoss', r'Untergeschoss',
r'Gebäudelänge', r'Grenzabstand', r'Fassadenhöhen', r'Grundabstand', r'Mehrlängen', r'Höchstmass'
]
parameter_row_patterns = [
r'^[a-g]\)\s+(.+?)(?:\s+max\.|min\.|:)?',
r'^(Ausnützungsziffer|Überbauungsziffer|Vollgeschosse|Dachgeschosse|Attikageschoss|Untergeschoss|Gebäudelänge|Grenzabstand|Fassadenhöhen|Grundabstand|Mehrlängen|Höchstmass|Höchstmaß)',
]
subparameter_patterns = [
r'^(Grundabstand|Mehrlängen|Höchstmass|Höchstmaß|Fassadenhöhen)\s*(min\.|max\.)?',
r'^(anrechenbare\s+Dachgeschosse|anrechenbares\s+Attikageschoss|anrechenbares\s+Untergeschoss)',
]
numeric_pattern = r'(\d+(?:\.\d+)?)\s*(%|m|Geschoss|Geschosse|Geschosse\s+max\.?|Geschoss\s+max\.?)?'
for table_block in table_blocks:
block_dict = table_block.get("block", {})
text = block_dict.get("text", "")
page = block_dict.get("page", 0)
if not text or len(text.strip()) < 20:
continue
lines = text.split('\n')
header_row_idx, zone_columns = None, []
for idx, line in enumerate(lines):
zone_matches = re.findall(zone_pattern, line, re.IGNORECASE)
if len(zone_matches) >= 3:
header_row_idx, zone_columns = idx, zone_matches
break
if not zone_columns:
has_parameters = any(re.search(kw, text, re.IGNORECASE) for kw in parameter_keywords)
has_zones = len(re.findall(zone_pattern, text, re.IGNORECASE)) >= 3
if has_parameters and has_zones:
zone_columns = list(dict.fromkeys(re.findall(zone_pattern, text, re.IGNORECASE)))
header_row_idx = 0
if not zone_columns:
continue
article_context = None
for block in state.get("classified_blocks", []):
if block.get("block", {}).get("page") == page and block.get("article_label"):
article_context = block.get("article_label")
break
table_data = {"page": page, "zones": zone_columns, "parameters": [], "source_text": text[:500], "article": article_context}
start_idx = (header_row_idx + 1) if header_row_idx is not None else 0
current_parameter = current_subparameter = None
parameter_values = subparameter_values = {}
for line_idx in range(start_idx, len(lines)):
line = lines[line_idx].strip()
if not line:
continue
is_parameter_row, parameter_name = False, None
for pat in parameter_row_patterns:
m = re.match(pat, line, re.IGNORECASE)
if m:
is_parameter_row, parameter_name = True, re.sub(r'\s+max\.?\s*$', '', re.sub(r'\s+min\.?\s*$', '', m.group(1).strip(), flags=re.I), flags=re.I)
break
is_subparameter, subparameter_name = False, None
if not is_parameter_row:
for pat in subparameter_patterns:
m = re.search(pat, line, re.IGNORECASE)
if m:
is_subparameter, subparameter_name = True, m.group(1).strip() + (f" {m.group(2).strip()}" if m.lastindex and m.lastindex >= 2 and m.group(2) else "")
break
target_values = subparameter_values if current_subparameter else parameter_values
if is_parameter_row and parameter_name:
if current_parameter and parameter_values:
table_data["parameters"].append({"parameter": current_parameter, "values_by_zone": parameter_values.copy(), "article": article_context})
current_parameter, current_subparameter, parameter_values, subparameter_values = parameter_name, None, {}, {}
continue
if is_subparameter and subparameter_name:
if current_subparameter and subparameter_values and current_parameter:
table_data["parameters"].append({"parameter": f"{current_parameter} - {current_subparameter}", "values_by_zone": subparameter_values.copy(), "article": article_context})
current_subparameter, subparameter_values = subparameter_name, {}
continue
if current_parameter or current_subparameter:
line_parts = re.split(r'\s{2,}|\t', line)
line_parts = [p.strip() for p in line_parts if p.strip()]
n = len(zone_columns)
value_parts = []
# Column-based: extract trailing numeric/fraction parts that align with zone count
for p in reversed(line_parts):
if re.match(r'^\d+(?:\.\d+)?\s*(%|m)?$', p, re.I) or re.match(r'^\d+/\d+$', p):
val = re.sub(r'\s*(%|m)$', '', p, flags=re.I).strip()
unit = None
um = re.search(r'\s*(%|m)$', p, re.I)
if um:
unit = 'm' if um.group(1).lower() == 'm' else '%'
value_parts.insert(0, (val, unit))
else:
break
if len(value_parts) == n:
for zi, zone in enumerate(zone_columns):
if zone not in target_values:
target_values[zone] = []
val, unit = value_parts[zi]
target_values[zone].append({"value": val, "unit": unit, "raw_text": line[:200], "line_number": line_idx})
else:
# Fallback: regex match by character position
all_matches = [(m.start(), m.group(0), m.group(1), m.group(2) if m.lastindex and m.lastindex > 1 else None) for m in re.finditer(numeric_pattern, line, re.I)]
all_matches += [(m.start(), m.group(0), m.group(0), None) for m in re.finditer(r'(\d+/\d+)', line, re.I)]
all_matches.sort(key=lambda x: x[0])
if len(all_matches) == n:
for zi, zone in enumerate(zone_columns):
if zone not in target_values:
target_values[zone] = []
_, _, val, unit = all_matches[zi]
target_values[zone].append({"value": val, "unit": unit.strip() if unit else None, "raw_text": line[:200], "line_number": line_idx})
if current_subparameter and subparameter_values and current_parameter:
table_data["parameters"].append({"parameter": f"{current_parameter} - {current_subparameter}", "values_by_zone": subparameter_values.copy(), "article": article_context})
if current_parameter and parameter_values:
table_data["parameters"].append({"parameter": current_parameter, "values_by_zone": parameter_values.copy(), "article": article_context})
if table_data["parameters"]:
tables.append(table_data)
state["zone_parameter_tables"] = state.get("zone_parameter_tables", []) + tables
if tables:
logger.info(f"Extracted {len(tables)} zone-parameter tables")
# Zone code pattern: W5, W2/30, Z3, K3/4, W5G, W 5 (optional space)
_ZONE_CODE_PATTERN = re.compile(r'\b([WZIK]\s*\d+(?:\s*/\s*\d+)?(?:G)?)\b', re.IGNORECASE)
def _zones_in_text(text: str) -> List[str]:
"""Extract zone codes (W5, W2/30, Z3, etc.) from text. Returns unique list, normalized (e.g. W5)."""
matches = _ZONE_CODE_PATTERN.findall(text)
seen = set()
result = []
for m in matches:
# Normalize: remove spaces -> W5, W2/30
n = re.sub(r'\s+', '', m).upper()
if n and n not in seen:
seen.add(n)
result.append(n)
return result
def extract_rules(state: BZOExtractionState) -> BZOExtractionState:
    """Detect rule candidates and parse values. Associates each rule with zones from its source article.

    Phase 1 runs every ``RULE_TAXONOMY`` pattern over every article and
    records one candidate per match, together with the zone codes mentioned
    anywhere in that article and an optional condition phrase found within
    100 characters around the match.
    Phase 2 parses a numeric value and unit from the text that starts at the
    match position (200 characters) and stores the resulting rule dicts in
    ``state["parsed_rules"]``.

    Args:
        state: Pipeline state; reads ``articles``.

    Returns:
        The same state object. On any exception the error is logged, a
        message is appended to ``state["errors"]`` and the state is returned
        without raising.
    """
    # \b prevents hits inside other words ("ein Gebäude" is not "in Gebäude").
    condition_patterns = [
        r'\b(?:nördlich|südlich|östlich|westlich|oberhalb|unterhalb)\s+[^,\.]+',
        r'\b(?:für|bei|in)\s+[^,\.]+',
    ]
    # The unit word must end at a word boundary; otherwise the "m" of
    # "4 max." / "2 mal" is taken for metres and "meter"/"metern" can never
    # match because the bare "m" alternative wins first.
    number_with_unit = r'(\d+(?:\.\d+)?)\s*(%|(?:metern|meter|m|prozent)\b)'
    # Pattern: "max. 4", "30 %", "min. 3.5 m"
    value_patterns = [
        r'(?:max|maximal|min|mindestens|höchstens)\s*\.?\s*(\d+(?:\.\d+)?)',
        number_with_unit,
        r'(\d+(?:\.\d+)?)',
    ]
    try:
        # --- Phase 1: candidates ---
        candidates = []
        for article_dict in state["articles"]:
            text = article_dict.get("text", "")
            page_start = article_dict.get("page_start", 0)
            # Zones mentioned in THIS article - rules from this article apply to these zones
            article_zones = _zones_in_text(text)
            for rule_type, rule_config in RULE_TAXONOMY.items():
                for pattern in rule_config.get("patterns", []):
                    for match in re.finditer(pattern, text, re.IGNORECASE):
                        start, end = max(0, match.start() - 100), min(len(text), match.end() + 100)
                        context = text[start:end]
                        condition_text = None
                        for cond_pat in condition_patterns:
                            cm = re.search(cond_pat, context, re.IGNORECASE)
                            if cm:
                                condition_text = cm.group(0)
                                break
                        candidates.append({
                            "rule_type": rule_type, "matched_text": match.group(0), "article_text": text,
                            "page": page_start, "article_label": article_dict.get("article_label"),
                            "condition_text": condition_text, "is_table_rule": False,
                            "table_zones": article_zones.copy(),
                            # Remember where the match was found so that the
                            # value is parsed from this occurrence, not from
                            # the first occurrence of the same text.
                            "match_start": match.start(),
                        })

        # --- Phase 2: value parsing ---
        parsed_rules = []
        for candidate_dict in candidates:
            rule_type = candidate_dict["rule_type"]
            rule_config = RULE_TAXONOMY.get(rule_type, {})
            value_type = rule_config.get("value_type", "numeric")
            matched_text = candidate_dict["matched_text"]
            article_text = candidate_dict["article_text"]
            match_start = candidate_dict["match_start"]
            text = matched_text + " " + article_text[match_start:match_start + 200]
            value_numeric = None
            value_text = matched_text
            unit = None
            # Try to extract numeric value
            if value_type in ["numeric", "integer"]:
                for pattern in value_patterns:
                    match = re.search(pattern, text, re.IGNORECASE)
                    if match:
                        try:
                            value_numeric = float(match.group(1))
                            if value_type == "integer":
                                value_numeric = int(value_numeric)
                            # Check for unit
                            unit_match = re.search(number_with_unit, text, re.IGNORECASE)
                            if unit_match:
                                unit = unit_match.group(2).lower()
                                if unit in ["meter", "metern"]:
                                    unit = "m"
                                elif unit == "prozent":
                                    unit = "%"
                            break
                        except ValueError:
                            continue
            # Calculate confidence: 0.5 without a number, 0.8 with a number, 0.9 with number and unit
            confidence = 0.5
            if value_numeric is not None:
                confidence = 0.8
                if unit:
                    confidence = 0.9
            # Zone association from source article (zones mentioned in that article)
            article_zones = candidate_dict.get("table_zones", [])
            zone_raw = article_zones[0] if article_zones else None
            rule_scope = "zone" if zone_raw else "general"
            parsed_rules.append({
                "rule_type": rule_type,
                "value_numeric": value_numeric,
                "value_text": value_text,
                "unit": unit,
                "condition_text": candidate_dict.get("condition_text"),
                "is_table_rule": candidate_dict.get("is_table_rule", False),
                "table_zones": article_zones,
                "page": candidate_dict["page"],
                "article_label": candidate_dict.get("article_label"),
                "text_snippet": value_text,
                "zone_raw": zone_raw,
                "rule_scope": rule_scope,
                "confidence": confidence,
            })
        state["parsed_rules"] = parsed_rules
        return state
    except Exception as e:
        logger.error(f"Error in extract_rules: {e}", exc_info=True)
        state["errors"] = state.get("errors", []) + [f"Extract rules error: {str(e)}"]
        return state
# ===== Wohnzone Parameter Extraction =====
# Canonical order for BZO parameters (Fakten)
# _bzo_param_sort_key tests these keywords as substrings of the lower-cased
# parameter name and returns the index of the first hit, so a more specific
# keyword must come before a more generic one that it contains.
BZO_PARAM_ORDER = [
    "vollgeschosse", "vollgeschoss",
    "anrechenbares untergeschoss", "untergeschoss",
    "anrechenbares dachgeschoss", "dachgeschoss", "attikageschoss",
    "ausnützungsziffer", "ausnutzungsziffer", "az",
    "überbauungsziffer",
    "gebäudehöhe", "fassadenhöhen",
    "grundabstand", "grenzabstand",
    "gebäudelänge",
    "mehrlängen", "höchstmass",
    "baumassenziffer", "grünflächenziffer", "wohnflächenanteil", "gebäudebreite",
]
# Display name for each rule type; _bzo_get_params_from_rules falls back to a
# title-cased version of the rule type when a type is missing here.
RULE_TYPE_TO_PARAM: Dict[str, str] = {
    "max_building_height": "Gebäudehöhe max.",
    "max_floors": "Vollgeschosse max.",
    "max_attachable_attics": "anrechenbares Dachgeschoss max.",
    "max_attachable_basement": "anrechenbares Untergeschoss max.",
    "density": "Ausnützungsziffer",
    "building_coverage": "Überbauungsziffer",
    "building_mass_index": "Baumassenziffer (BMZ)",
    "green_space_index": "Grünflächenziffer (GFZ)",
    "boundary_distance": "Grundabstand min.",
    "boundary_distance_length_surcharge": "Mehrlängen-zuschlag (MLZ)",
    "boundary_distance_max": "Höchstmass Grenzabstand max.",
    "building_length": "Gebäudelänge max.",
    "building_width": "Gebäudebreite max.",
    "residential_area_share": "Wohnflächenanteil",
}
# Unit used when a rule or table value carries no unit of its own; an empty
# string means that no unit is appended.
RULE_TYPE_TO_DEFAULT_UNIT: Dict[str, str] = {
    "max_building_height": "m",
    "max_floors": "Stk.",
    "max_attachable_attics": "Stk.",
    "max_attachable_basement": "Stk.",
    "density": "%",
    "building_coverage": "%",
    "building_mass_index": "",
    "green_space_index": "%",
    "boundary_distance": "m",
    "boundary_distance_length_surcharge": "",
    "boundary_distance_max": "m",
    "building_length": "m",
    "building_width": "m",
    "residential_area_share": "%",
}
# Regex fragments; _bzo_extract_zusatzinformationen compiles them
# case-insensitively and keeps an article only if at least one of them is
# found in its title or text.
_ARTIKEL_KEYWORDS = [
    r"herabsetzung", r"grenzabstand", r"nutzweise", r"wohnanteil",
    r"besondere\s+gebäude", r"überbauungsziffer", r"sonderregel",
    r"ausnahmen", r"abweichungen", r"erleichterungen",
    r"mischung", r"gewerbe", r"dienstleistung",
    r"kantonale", r"abstandsvorschriften",
    r"vollgeschoss", r"reduziert", r"mindestmass",
    r"störend", r"nicht\s+störend", r"mässig\s+störend",
]
# Artikel that are parameter tables - EXCLUDE from Weiterführende Bestimmungen
# (substring test on the lower-cased title; exclusion wins over inclusion)
_ZUSATZ_EXCLUDE_TITLES = ("zonen", "grundmasse", "mehrlängenzuschlag", "mehrlaengenzuschlag")
# Artikel that are substantive provisions - INCLUDE in Weiterführende Bestimmungen
_ZUSATZ_INCLUDE_TITLES = (
    "herabsetzung", "nutzweise", "besondere", "besonderes",
    "ausnahmen", "abweichungen", "erleichterungen", "sonderregel",
    "wohnanteil", "nutzungsart", "abstandsvorschriften",
    "mischung", "gewerbe", "dienstleistung",
)
def _format_article_text_readable(text: str, article_label: str = "", article_title: str = "") -> str:
"""Format raw PDF-extracted text for readable display."""
if not text or not text.strip():
return ""
# Strip redundant article header at start (e.g. "Art. 16 Nutzweise" when already in summary)
if article_label or article_title:
prefix = f"{article_label} {article_title}".strip()
if prefix:
pat = re.escape(prefix)
text = re.sub(rf"^{pat}\s*", "", text.strip(), flags=re.I).lstrip()
lines = []
for line in text.split("\n"):
line = line.strip()
if not line:
continue
lines.append(line)
if not lines:
return ""
# Join hyphenated word breaks (e.g. "Gewerbe-\nund" -> "Gewerbe und")
merged = []
i = 0
while i < len(lines):
line = lines[i]
while line.rstrip().endswith("-") and i + 1 < len(lines):
line = line.rstrip()[:-1] + lines[i + 1].strip()
i += 1
if re.match(r"^\d{1,2}\s*$", line) and i + 1 < len(lines):
next_line = lines[i + 1]
if not re.match(r"^Art\.\s", next_line) and len(next_line) > 3:
line = line + " " + next_line.strip()
i += 1
elif re.match(r"^\d{1,2}\s*$", line) and i + 1 < len(lines) and re.match(r"^Art\.\s", lines[i + 1]):
i += 1
continue
merged.append(line)
i += 1
combined = " ".join(merged)
# Fix run-together paragraph numbers: "1In" -> "1. In", "2Ist" -> "2. Ist"
combined = re.sub(r"(\d)([A-ZÄÖÜ])", r"\1. \2", combined)
# Also fix "a)Something" -> "a) Something" for subparagraphs
combined = re.sub(r"([a-z]\))([A-ZÄÖÜ])", r"\1 \2", combined)
# Split into paragraphs: numbered (1. ..., 2. ...) or lettered (a) ..., b) ...)
parts = re.split(r"(?=\d+\.\s+[A-ZÄÖÜa-zäöü])|(?=[a-z]\)\s+[A-ZÄÖÜa-zäöü])", combined)
paragraphs = []
for p in parts:
p = p.strip()
if not p or len(p) < 3:
continue
paragraphs.append(p)
return "\n\n".join(paragraphs)
def _is_zusatzinfo_article(title: str) -> bool:
    """Decide whether an article belongs under Weiterführende Bestimmungen.

    The lower-cased title is checked for substrings: any exclude marker
    (parameter-table articles) rejects the article, otherwise any include
    marker (substantive provisions) accepts it. Titles matching neither list
    are rejected.
    """
    normalized = (title or "").lower().strip()
    if any(marker in normalized for marker in _ZUSATZ_EXCLUDE_TITLES):
        return False
    return any(marker in normalized for marker in _ZUSATZ_INCLUDE_TITLES)
def _bzo_build_source(page: Optional[int], article: Optional[str]) -> str:
"""Build source string: Art. X, S. Y"""
parts = []
if article:
parts.append(str(article))
if page is not None and page > 0:
parts.append(f"S. {page}")
return ", ".join(parts) if parts else ""
def _bzo_zone_matches_table(bauzone: str, zone_col: str) -> bool:
"""Check if table column zone matches target bauzone."""
b = (bauzone or "").upper().strip()
z = (zone_col or "").upper().strip()
if not b or not z:
return False
return b in z or (len(z) >= 2 and z in b)
def _bzo_article_mentions_bauzone(article_text: str, bauzone: str) -> bool:
"""Check if article text mentions the bauzone or applies to it."""
if not bauzone or not article_text:
return False
b = bauzone.upper().strip()
t = article_text.upper()
if b in t:
return True
if len(b) >= 2 and b[0] in "WZIK" and re.search(rf"\b{b[0]}\s*\d+", t):
base = re.sub(r"\s+", "", b.split("/")[0].rstrip("G"))
if base in t or re.search(rf"\b{base}\b", t):
return True
return False
def _bzo_get_params_from_tables(
    zone_parameter_tables: List[Dict[str, Any]],
    bauzone: str
) -> List[Dict[str, Any]]:
    """Collect the table values that apply to *bauzone*.

    Only tables with at least one matching zone column are considered. For
    each matching zone of each parameter the first recorded value is taken.
    Entries that repeat the same parameter/value/unit combination are
    emitted once. ``rule_type`` is always None for table entries.
    """
    collected = []
    emitted_keys = set()
    for table in zone_parameter_tables:
        table_zones = table.get("zones", [])
        if not any(_bzo_zone_matches_table(bauzone, str(z)) for z in table_zones):
            continue
        table_page = table.get("page")
        table_article = table.get("article")
        for param in table.get("parameters", []):
            for zone, values in param.get("values_by_zone", {}).items():
                if not _bzo_zone_matches_table(bauzone, str(zone)):
                    continue
                if not isinstance(values, list) or len(values) == 0:
                    continue
                first_entry = values[0]
                value = first_entry.get("value", "")
                unit = first_entry.get("unit") or ""
                name = param.get("parameter", "")
                dedup_key = f"{name}|{value}|{unit}"
                if dedup_key in emitted_keys:
                    continue
                emitted_keys.add(dedup_key)
                # The parameter's own article reference wins over the table's.
                source = _bzo_build_source(table_page, param.get("article") or table_article)
                collected.append({
                    "parameter": name,
                    "value": str(value),
                    "unit": str(unit).strip() if unit else "",
                    "source": source or "Tabelle im Dokument",
                    "rule_type": None,
                })
    return collected
def _bzo_filter_rules_by_bauzone(rules: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]:
"""Filter rules by Bauzone code."""
bauzone_upper = (bauzone or "").upper()
out = []
for r in rules:
if bauzone_upper in (r.get("zone_raw") or "").upper():
out.append(r)
continue
for tz in (r.get("table_zones") or []):
if bauzone_upper in str(tz).upper():
out.append(r)
break
else:
if bauzone_upper in (r.get("text_snippet") or "").upper():
out.append(r)
return out
def _bzo_get_params_from_rules(rules: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Turn parsed rules into ``{parameter, value, unit, source, rule_type}`` entries.

    The numeric value is preferred over the text value; whole-number floats
    are rendered without decimals. Rules without a value, and rules whose
    value is just a bare parameter keyword, are skipped. Entries repeating
    the same parameter/value/unit combination are emitted once.
    """
    # Values that merely repeat a parameter name carry no information.
    bare_keywords = (
        "gebäudelänge", "gebäudebreite", "mehrlängenzuschlag",
        "mehrlängen", "grenzabstand", "fassadenhöhe",
    )
    entries = []
    emitted_keys = set()
    for rule in rules:
        rule_type = rule.get("rule_type", "")
        label = RULE_TYPE_TO_PARAM.get(rule_type) or rule_type.replace("_", " ").title()
        number = rule.get("value_numeric")
        raw_text = rule.get("value_text", "")
        unit = rule.get("unit") or ""
        if number is None:
            rendered = str(raw_text).strip() if raw_text else ""
        elif isinstance(number, float) and number == int(number):
            rendered = str(int(number))
        else:
            rendered = str(number)
        if not rendered:
            continue
        if rendered.lower() in bare_keywords:
            continue
        if unit:
            unit_str = str(unit).strip()
        else:
            unit_str = RULE_TYPE_TO_DEFAULT_UNIT.get(rule_type, "")
        source = _bzo_build_source(rule.get("page"), rule.get("article_label")) or "Artikeltxt"
        dedup_key = f"{label}|{rendered}|{unit_str}"
        if dedup_key in emitted_keys:
            continue
        emitted_keys.add(dedup_key)
        entries.append({
            "parameter": label,
            "value": rendered,
            "unit": unit_str,
            "source": source,
            "rule_type": rule_type,
        })
    return entries
def _bzo_param_to_rule_type(param_name: str) -> Optional[str]:
"""Map parameter display name to rule_type."""
p = (param_name or "").lower()
if "vollgeschoss" in p:
return "max_floors"
if "dachgeschoss" in p or "attika" in p:
return "max_attachable_attics"
if "untergeschoss" in p:
return "max_attachable_basement"
if "ausnützungsziffer" in p or "ausnutzungsziffer" in p or " az " in p:
return "density"
if "überbauungsziffer" in p or " uz " in p:
return "building_coverage"
if "baumassenziffer" in p or "bmz" in p:
return "building_mass_index"
if "grünflächen" in p or "gfz" in p:
return "green_space_index"
if "grenzabstand" in p or "grundabstand" in p:
return "boundary_distance"
if "mehrlängen" in p or "mlz" in p:
return "boundary_distance_length_surcharge"
if "höchstmass" in p:
return "boundary_distance_max"
if "gebäudelänge" in p:
return "building_length"
if "gebäudebreite" in p:
return "building_width"
if "fassadenhöhe" in p or "gebäudehöhe" in p:
return "max_building_height"
if "wohnflächenanteil" in p or "wohnanteil" in p:
return "residential_area_share"
return None
def _bzo_merge_rules(
from_tables: List[Dict[str, Any]],
from_rules: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Merge table params and rule params. Tables take precedence."""
by_param_lower: Dict[str, Dict[str, Any]] = {}
for r in from_tables:
p = (r.get("parameter") or "").lower()
if p and p not in by_param_lower:
rr = r.copy()
if not rr.get("rule_type"):
rr["rule_type"] = _bzo_param_to_rule_type(rr.get("parameter", ""))
by_param_lower[p] = rr
for r in from_rules:
p = (r.get("parameter") or "").lower()
if p and p not in by_param_lower:
by_param_lower[p] = r.copy()
return list(by_param_lower.values())
def _bzo_param_sort_key(param_name: str) -> int:
    """Return the display rank of a parameter.

    The rank is the index of the first ``BZO_PARAM_ORDER`` keyword contained
    in the lower-cased name; names matching no keyword sort last (99).
    """
    lowered = (param_name or "").lower()
    return next(
        (rank for rank, keyword in enumerate(BZO_PARAM_ORDER) if keyword in lowered),
        99,
    )
def _bzo_extract_zusatzinformationen(
    articles: List[Dict[str, Any]],
    bauzone: str = "",
    zone_parameter_tables: Optional[List[Dict[str, Any]]] = None,
) -> List[Dict[str, Any]]:
    """Select article excerpts that are relevant for the bauzone.

    An article is kept when
    - it has a label and text,
    - its title or text contains one of the ``_ARTIKEL_KEYWORDS``, and
    - (only if a bauzone is given) it mentions the bauzone or is the article
      of a zone-parameter table that has a column for the bauzone.

    Each label/page combination is reported once, the text is cut to 3500
    characters, and the result is ordered by page and article label.
    """
    keyword_res = [re.compile(kw, re.IGNORECASE) for kw in _ARTIKEL_KEYWORDS]

    # Articles that own a table with a column for the bauzone.
    table_articles = set()
    if zone_parameter_tables and bauzone:
        table_articles = {
            table.get("article") or ""
            for table in zone_parameter_tables
            if any(_bzo_zone_matches_table(bauzone, str(z)) for z in table.get("zones", []))
        }

    excerpts = []
    seen_keys = set()
    for article in articles:
        label = article.get("article_label") or ""
        title = (article.get("article_title") or "").strip()
        body = (article.get("text") or "").strip()
        page = article.get("page_start") or article.get("page_end") or 0
        if not (label and body):
            continue
        dedup_key = f"{label}|{page}"
        if dedup_key in seen_keys:
            continue
        searchable = f"{title} {body}"
        if not any(rx.search(searchable) for rx in keyword_res):
            continue
        if bauzone and label not in table_articles and not _bzo_article_mentions_bauzone(searchable, bauzone):
            continue
        seen_keys.add(dedup_key)
        excerpts.append({
            "article_label": label,
            "article_title": title,
            "text": body[:3500].strip(),
            "page": page,
            "source": _bzo_build_source(page, label) or "BZO-Dokument",
        })
    excerpts.sort(key=lambda item: (item.get("page", 0), item.get("article_label", "")))
    return excerpts
def extract_wohnzone_params(
    extracted_content: Dict[str, Any],
    bauzone: str,
    relevant_rules: Optional[List[Dict[str, Any]]] = None,
    total_area_m2: Optional[float] = None,
) -> Dict[str, Any]:
    """
    Build the fact list and the supplementary articles for one Bauzone.

    Table values and text rules are merged (tables win), sorted by the
    canonical parameter order and rendered as ``fakten`` entries with a
    source. When ``relevant_rules`` is None the rules in
    ``extracted_content["rules"]`` are filtered by bauzone instead.

    Returns a dict with the keys ``bauzone``, ``fakten`` and
    ``zusatzinformationen``.
    """
    articles = extracted_content.get("articles", [])
    zone_parameter_tables = extracted_content.get("zone_parameter_tables", [])
    all_rules = extracted_content.get("rules", [])

    if relevant_rules is not None:
        rules_to_use = relevant_rules
    else:
        rules_to_use = _bzo_filter_rules_by_bauzone(all_rules, bauzone)
    table_params = _bzo_get_params_from_tables(zone_parameter_tables, bauzone)
    rule_params = _bzo_get_params_from_rules(rules_to_use)
    merged_params = _bzo_merge_rules(table_params, rule_params)

    fakten = []
    if bauzone:
        fakten.append({"item": "Auswertung für Bauzone", "value": bauzone, "source": ""})
    if total_area_m2 is not None and total_area_m2 > 0:
        # Thousands separator is an apostrophe, e.g. 1'250.
        area_text = f"{total_area_m2:,.0f}".replace(",", "'")
        fakten.append({
            "item": "Grundstücksfläche",
            "value": area_text,
            "source": "Parzellendaten",
        })

    ordered_params = sorted(merged_params, key=lambda entry: _bzo_param_sort_key(entry.get("parameter", "")))
    for entry in ordered_params:
        param = entry.get("parameter", "").strip()
        val = entry.get("value", "")
        unit = (entry.get("unit") or "").strip()
        rule_type = entry.get("rule_type") or _bzo_param_to_rule_type(param)
        if not unit and rule_type:
            unit = RULE_TYPE_TO_DEFAULT_UNIT.get(rule_type, "")
        if unit:
            value_str = f"{val} {unit}".strip()
        else:
            value_str = f"{val}".strip()
        if not (param and value_str):
            continue
        fakten.append({
            "item": param,
            "value": value_str,
            "source": entry.get("source") or "BZO-Dokument",
        })

    zusatzinformationen = _bzo_extract_zusatzinformationen(
        articles, bauzone, zone_parameter_tables
    )
    return {
        "bauzone": bauzone,
        "fakten": fakten,
        "zusatzinformationen": zusatzinformationen,
    }
# ===== LangGraph: LLM-based BZO Params Extraction =====
def _build_bauzone_context_for_llm(state: BZOParamsExtractionState) -> str:
    """
    Build the plain-text context block that is embedded into the LLM prompt.

    Sections, in order: the parcel area (only when positive), the full text of
    the relevant articles, pre-parsed table values whose zone key contains the
    Bauzone (case-insensitive substring match), and at most the first 20
    relevant rules. Missing (None) labels, texts, units and values are rendered
    as empty strings instead of the literal "None".

    Args:
        state: Extraction state; reads "bauzone", "total_area_m2",
            "relevant_articles", "zone_parameter_tables" and "relevant_rules".

    Returns:
        All context lines joined with newlines.
    """
    bauzone = (state.get("bauzone") or "").upper()
    # `or []` also covers keys that are present but explicitly set to None.
    zone_parameter_tables = state.get("zone_parameter_tables") or []
    relevant_articles = state.get("relevant_articles") or []
    relevant_rules = state.get("relevant_rules") or []
    total_area_m2 = state.get("total_area_m2")

    parts: List[str] = []
    if total_area_m2 is not None and total_area_m2 > 0:
        # Swiss-style thousands separator: 1'234 instead of 1,234.
        parts.append(f"Grundstücksfläche der Parzelle: {total_area_m2:,.0f}".replace(",", "'"))
        parts.append("")

    # Full article texts - LLM can parse tables like Art. 14 (zones in rows, values in columns)
    parts.append("=== ARTIKEL MIT VOLLEM TEXT (Tabellen genau lesen, richtige Spalte/Zeile für Bauzone wählen) ===")
    for art in relevant_articles:
        label = art.get("article_label") or ""
        title = (art.get("article_title") or "").strip()
        text = art.get("text") or ""
        page = art.get("page_start") or art.get("page_end", 0)
        parts.append(f"\n{label}: {title}")
        parts.append(f"Seite: {page}")
        parts.append(f"Inhalt:\n{text}")
    parts.append("")

    # Zone-parameter tables (pre-parsed)
    if zone_parameter_tables:
        parts.append("=== VORSTRUKTURIERTE TABELLENWERTE FÜR BAUZONE ===")
        for table in zone_parameter_tables:
            page = table.get("page", 0)
            art = table.get("article", "")
            parts.append(f"\n{art} (S. {page}):")
            for param in table.get("parameters") or []:
                pname = param.get("parameter", "")
                for zone, values in (param.get("values_by_zone") or {}).items():
                    # NOTE: substring test, so an empty Bauzone matches every zone key.
                    if bauzone not in (zone or "").upper():
                        continue
                    if not (isinstance(values, list) and values):
                        continue
                    # Only the first value entry of a zone is reported.
                    v = values[0].get("value")
                    v = "" if v is None else v  # explicit None check keeps a numeric 0
                    u = values[0].get("unit") or ""
                    parts.append(f" {pname} [{zone}]: {v} {u}".strip())
        parts.append("")

    # Rules from text
    if relevant_rules:
        parts.append("=== REGELN AUS ARTIKELTEXT ===")
        for r in relevant_rules[:20]:
            rt = r.get("rule_type") or ""
            vn = r.get("value_numeric")
            vt = r.get("value_text") or ""
            u = r.get("unit") or ""
            page = r.get("page", 0)
            art = r.get("article_label") or ""
            if isinstance(vn, float) and vn.is_integer():
                # 3.0 -> "3"; is_integer() is False for NaN/inf, where int() would raise.
                val = str(int(vn))
            elif vn is not None:
                val = str(vn)
            else:
                val = vt
            parts.append(f" {rt}: {val} {u} ({art}, S. {page})".strip())
    return "\n".join(parts)
def _parse_llm_bullet_list(text: str) -> List[Dict[str, str]]:
    """
    Parse an LLM bullet-list reply into a list of fact dicts.

    Only lines starting with "-" are considered. Each is expected to look like
    "- Param: value (Art. X, S. Y)"; the trailing parenthesised source is
    optional. Lines with an empty parameter name or an empty value are skipped.

    Args:
        text: Raw LLM reply; None or an empty string yields an empty list.

    Returns:
        List of {"item", "value", "source"} dicts in reply order; "source" is
        "" when the line carries no trailing parenthesis.
    """
    # "Param: value (source)" or "Param: value". The name ends at the first
    # colon that has text on both sides; the source is the last "(...)" group.
    pattern = re.compile(r"^(.+?):\s*(.+?)(?:\s*\(([^)]+)\))?\s*$")
    fakten: List[Dict[str, str]] = []
    for raw_line in (text or "").strip().split("\n"):
        line = raw_line.strip()
        if not line.startswith("-"):
            continue
        match = pattern.match(line.lstrip("- ").strip())
        if not match:
            # The pattern fails only when nothing precedes the colon or nothing
            # follows it, i.e. the name or the value is empty. Such lines were
            # previously emitted as facts with an empty field; drop them.
            continue
        item = match.group(1).strip()
        value = match.group(2).strip()
        source = (match.group(3) or "").strip()
        if item and value:
            fakten.append({"item": item, "value": value, "source": source})
    return fakten
async def _llm_filter_relevant_provisions(
    ai_service: Any,
    bauzone: str,
    fakten: List[Dict[str, str]],
    provision_articles: List[Dict[str, Any]],
) -> Optional[set]:
    """
    Use LLM to determine which provision articles are relevant for a parcel in this bauzone.

    The reply is scanned line by line for "Art. <number>[letter]" mentions.
    A mention that matches a provision article regardless of letter case or
    whitespace (e.g. "art.15", "ART. 15") is returned in that article's own
    spelling with whitespace runs collapsed to one space, so an exact-match
    lookup against the article labels succeeds. Mentions of unknown articles
    are returned as written, with whitespace collapsed.

    Args:
        ai_service: Object exposing an awaitable
            callAiPlanning(prompt=..., debugType=...).
        bauzone: Zone code of the parcel.
        fakten: Extracted parameters; entries whose item contains "Auswertung"
            are left out of the prompt.
        provision_articles: Candidate articles ("article_label", "article_title").

    Returns:
        Set of article labels (e.g. {"Art. 15", "Art. 16"}); an empty set when
        there are no candidates; None (meaning "include all") when the call
        fails or the reply names no article.
    """
    if not provision_articles:
        return set()

    def _label_key(label: str) -> str:
        # Comparison key that ignores letter case and all whitespace.
        return re.sub(r"\s+", "", label).casefold()

    # Comparison key -> candidate labels, whitespace-collapsed.
    known_labels: Dict[str, set] = {}
    for a in provision_articles:
        normalized = re.sub(r"\s+", " ", (a.get("article_label") or "").strip())
        if normalized:
            known_labels.setdefault(_label_key(normalized), set()).add(normalized)

    fakten_str = "\n".join(
        f"- {f.get('item', '')}: {f.get('value', '')}" for f in fakten
        if f.get("item") and "Auswertung" not in (f.get("item") or "")
    )
    articles_str = "\n".join(
        # `or ''` keeps a missing title from showing up as "None" in the prompt.
        f"- {a.get('article_label', '')}: {a.get('article_title') or ''}"
        for a in provision_articles
        if a.get("article_label")
    )
    prompt = f"""Du bist Experte für Schweizer Bau- und Zonenordnungen (BZO).
Eine Parzelle liegt in der Bauzone {bauzone}. Folgende BZO-Parameter gelten für diese Zone:
{fakten_str}
Folgende Bestimmungen (Weiterführende Artikel) könnten zutreffen:
{articles_str}
AUFGABE: Welche dieser Artikel sind für eine Parzelle in Bauzone {bauzone} mit diesen Parametern TATSÄCHLICH RELEVANT?
- Nur Artikel angeben, die auf diese Zone/Parameter Bezug nehmen oder Bedingungen nennen, die hier greifen
- z.B. Art. 15 Herabsetzung: relevant wenn Vollgeschosse und Grenzabstand vorhanden (Reduktion bei weggelassenen Geschossen)
- z.B. Art. 16 Nutzweise: relevant für Wohnzonen mit Wohnanteil
- z.B. Art. 40 Wohnanteil: nur wenn dieser Artikel die Zone {bauzone} erwähnt oder für Wohnzonen gilt
- Artikel die andere Zonen betreffen (z.B. nur Z5, I) und {bauzone} ausschliessen: NICHT aufnehmen
Antwort NUR mit den relevanten Artikelnummern, eine pro Zeile (z.B. "Art. 15", "Art. 16"). Keine anderen Zeichen."""
    try:
        response = await ai_service.callAiPlanning(
            prompt=prompt,
            debugType="bzo_relevant_provisions",
        )
        labels = set()
        for line in (response or "").strip().split("\n"):
            m = re.search(r"(Art\.\s*\d+[a-z]?)", line.strip(), re.I)
            if not m:
                continue
            mentioned = re.sub(r"\s+", " ", m.group(1).strip())
            # Prefer the candidate's own spelling; keep unknown mentions as written.
            labels.update(known_labels.get(_label_key(mentioned)) or {mentioned})
        return labels if labels else None  # None = include all (fallback on error or empty)
    except Exception as e:
        # Best effort: a failed filter must not break the extraction itself.
        logger.warning(f"LLM provision filter failed: {e}")
        return None
async def llm_extract_bauzone_params_node(state: BZOParamsExtractionState) -> BZOParamsExtractionState:
    """
    LangGraph node that asks the LLM for the Bauzone parameters as a bullet list.

    The reply is parsed into "fakten"; "bauzone_params_list" holds the parsed
    facts re-rendered as "- item: value (source)" strings. Header facts for the
    evaluated zone and the parcel area are inserted into "fakten" when missing.
    "zusatzinformationen" is built from the provision articles that a second
    LLM call reports as relevant (all of them when that call returns None).

    Args:
        state: Current graph state; "ai_service" must be set.

    Returns:
        Copy of the state with "fakten", "bauzone_params_list",
        "zusatzinformationen" and "errors" updated. On any exception the three
        result lists are empty and the exception text is appended to "errors".
    """
    bauzone = state.get("bauzone", "")
    gemeinde = state.get("gemeinde", "")
    ai_service = state.get("ai_service")
    errors = [*state.get("errors", [])]

    if not ai_service:
        errors.append("AI service not provided")
        return {**state, "fakten": [], "bauzone_params_list": [], "errors": errors}

    context = _build_bauzone_context_for_llm(state)
    prompt = f"""Du bist Experte für Schweizer Bau- und Zonenordnungen (BZO). Extrahiere alle relevanten BZO-Parameter für die Bauzone {bauzone} in {gemeinde}.
BZO-INHALT:
{context}
AUFGABE: Erstelle eine geordnete Bullet-Liste ALLER zutreffenden Parameter für Bauzone {bauzone}.
Priorität: Vollgeschosse, anrechenbares Untergeschoss, anrechenbares Dachgeschoss, Ausnützungsziffer, Überbauungsziffer, Gebäudehöhe, Grundabstand/Grenzabstand, Gebäudelänge, Mehrlängenzuschlag, Höchstmass, sowie alle anderen Bestimmungen die für diese Zone gelten.
WICHTIG:
- Bei Tabellen: die richtige Spalte/Zeile für {bauzone} verwenden (z.B. Art. 14 Mehrlängenzuschlag: W5 = 13 m)
- Jede Zeile: "- Parametername: Wert (Art. X, S. Y)"
- Nur tatsächlich im Dokument vorhandene Werte angeben
- Einheit (m, %, Stk.) bei Zahlen mit angeben
- Keine leeren Zeilen oder Kommentare - nur die Liste
Antwort NUR mit der Bullet-Liste, sonst nichts:"""

    try:
        raw_reply = await ai_service.callAiPlanning(
            prompt=prompt,
            debugType="bzo_params_extraction",
        )
        reply_text = (raw_reply or "").strip()
        fakten = _parse_llm_bullet_list(reply_text)

        # Bullet strings are rendered before the header facts are inserted,
        # so they contain only what the LLM returned.
        bauzone_params_list = []
        for fact in fakten:
            bullet = f"- {fact['item']}: {fact['value']}"
            if fact.get("source"):
                bullet += f" ({fact['source']})"
            bauzone_params_list.append(bullet)

        def _has_item(keyword: str) -> bool:
            return any(keyword in (fact.get("item") or "") for fact in fakten)

        # Header facts: evaluated zone first, parcel area second.
        if bauzone and not _has_item("Auswertung"):
            fakten.insert(0, {"item": "Auswertung für Bauzone", "value": bauzone, "source": ""})
        total_area_m2 = state.get("total_area_m2")
        if total_area_m2 is not None and total_area_m2 > 0 and not _has_item("Grundstücksfläche"):
            area_text = f"{total_area_m2:,.0f}".replace(",", "'")
            fakten.insert(1, {
                "item": "Grundstücksfläche",
                "value": area_text,
                "source": "Parzellendaten",
            })

        # Zusatzinformationen: only provisions relevant for this parcel in this Bauzone.
        candidate_articles = (
            state.get("extracted_content", {}).get("articles", [])
            or state.get("relevant_articles", [])
        )
        provision_articles = []
        for candidate in candidate_articles:
            candidate_title = (candidate.get("article_title") or "").strip()
            if _is_zusatzinfo_article(candidate_title):
                provision_articles.append(candidate)

        relevant_labels = await _llm_filter_relevant_provisions(
            ai_service=ai_service,
            bauzone=bauzone,
            fakten=fakten,
            provision_articles=provision_articles,
        )

        zusatzinformationen = []
        for provision in provision_articles:
            label = provision.get("article_label", "")
            title = (provision.get("article_title") or "").strip()
            normalized_label = re.sub(r"\s+", " ", (label or "").strip())
            # relevant_labels None means "no filter"; unlabeled articles always pass.
            if (
                relevant_labels is not None
                and normalized_label
                and normalized_label not in relevant_labels
            ):
                continue
            readable = _format_article_text_readable(
                (provision.get("text") or "")[:4000],
                article_label=label,
                article_title=title,
            )
            if not readable:
                continue
            page = provision.get("page_start") or provision.get("page_end", 0)
            if label:
                source = f"{label}, S. {page}"
            else:
                source = f"S. {page}"
            zusatzinformationen.append({
                "article_label": label,
                "article_title": title,
                "text": readable,
                "source": source,
            })

        return {
            **state,
            "fakten": fakten,
            "bauzone_params_list": bauzone_params_list,
            "zusatzinformationen": zusatzinformationen,
            "errors": errors,
        }
    except Exception as e:
        logger.error(f"LLM BZO params extraction failed: {e}", exc_info=True)
        errors.append(str(e))
        return {
            **state,
            "fakten": [],
            "bauzone_params_list": [],
            "zusatzinformationen": [],
            "errors": errors,
        }
def create_bzo_params_extraction_graph():
    """
    Build and compile the single-node graph for LLM-based params extraction.

    Returns:
        The compiled graph: START -> "llm_extract" -> END.
    """
    node_name = "llm_extract"
    builder = StateGraph(BZOParamsExtractionState)
    builder.add_node(node_name, llm_extract_bauzone_params_node)
    # An edge from START marks the entry point of the graph.
    builder.add_edge(START, node_name)
    builder.add_edge(node_name, END)
    return builder.compile()
def _filter_articles_by_bauzone(articles: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]:
    """
    Keep the articles whose "text" or "zone_raw" contains the Bauzone.

    The test is a case-insensitive substring match, so an empty or None
    Bauzone keeps every article.
    """
    needle = (bauzone or "").upper()
    selected = []
    for article in articles:
        if needle in (article.get("text") or "").upper():
            selected.append(article)
            continue
        if needle in (article.get("zone_raw") or "").upper():
            selected.append(article)
    return selected
def _filter_tables_by_bauzone(tables: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]:
    """
    Reduce zone-parameter tables to the entries that concern the Bauzone.

    A table is kept only if one of its "zones" contains the Bauzone
    (case-insensitive substring match) and at least one parameter has a value
    for a matching zone key. Kept tables are rebuilt as new dicts with "page",
    "article", the matching "zones" and the reduced "parameters".
    """
    needle = (bauzone or "").upper()

    def _mentions_bauzone(zone_key) -> bool:
        return needle in str(zone_key).upper()

    selected = []
    for table in tables:
        zone_hits = [zone for zone in table.get("zones", []) if _mentions_bauzone(zone)]
        if not zone_hits:
            continue

        kept_parameters = []
        for param in table.get("parameters", []):
            per_zone = {
                zone: values
                for zone, values in (param.get("values_by_zone") or {}).items()
                if _mentions_bauzone(zone)
            }
            # Parameters without a value for this Bauzone are dropped.
            if per_zone:
                kept_parameters.append({
                    "parameter": param.get("parameter"),
                    "values_by_zone": per_zone,
                })
        if not kept_parameters:
            continue

        selected.append({
            "page": table.get("page"),
            "article": table.get("article"),
            "zones": zone_hits,
            "parameters": kept_parameters,
        })
    return selected
async def run_bzo_params_extraction(
    extracted_content: Dict[str, Any],
    bauzone: str,
    ai_service: Any,
    gemeinde: str,
    relevant_rules: Optional[List[Dict[str, Any]]] = None,
    relevant_articles: Optional[List[Dict[str, Any]]] = None,
    total_area_m2: Optional[float] = None,
) -> Dict[str, Any]:
    """
    Run the LLM params-extraction graph for one Bauzone.

    Args:
        extracted_content: Extraction result; "rules", "articles" and
            "zone_parameter_tables" are read.
        bauzone: Zone code to evaluate.
        ai_service: AI service object handed to the graph node via the state.
        gemeinde: Municipality name used in the prompt.
        relevant_rules: Pre-filtered rules; filtered by Bauzone when None.
        relevant_articles: Pre-filtered articles; filtered by Bauzone when None.
        total_area_m2: Parcel area, passed through to the state.

    Returns:
        Dict with "bauzone", "fakten" (item/value/source), "bauzone_params_list"
        (bullet strings), "zusatzinformationen" and "errors".
    """
    if relevant_rules is None:
        rules = _bzo_filter_rules_by_bauzone(extracted_content.get("rules", []), bauzone)
    else:
        rules = relevant_rules

    if relevant_articles is None:
        articles = _filter_articles_by_bauzone(extracted_content.get("articles", []), bauzone)
    else:
        articles = relevant_articles

    # Tables are always reduced to the Bauzone; there is no override parameter.
    tables = _filter_tables_by_bauzone(
        extracted_content.get("zone_parameter_tables", []),
        bauzone,
    )

    initial_state: BZOParamsExtractionState = {
        "extracted_content": extracted_content,
        "bauzone": bauzone,
        "total_area_m2": total_area_m2,
        "relevant_rules": rules,
        "relevant_articles": articles,
        "zone_parameter_tables": tables,
        "ai_service": ai_service,
        "gemeinde": gemeinde,
        "bauzone_params_list": [],
        "fakten": [],
        "zusatzinformationen": [],
        "errors": [],
    }

    compiled_graph = create_bzo_params_extraction_graph()
    outcome = await compiled_graph.ainvoke(initial_state)

    report: Dict[str, Any] = {"bauzone": bauzone}
    for key in ("fakten", "bauzone_params_list", "zusatzinformationen", "errors"):
        report[key] = outcome.get(key, [])
    return report
# ===== Graph Construction =====
def create_bzo_extraction_graph():
    """
    Build and compile the linear BZO extraction graph.

    The graph has three nodes that run in sequence:
    classify_and_assemble -> extract_zones_and_tables -> extract_rules.

    Returns:
        The compiled graph.
    """
    stages = (
        ("classify_and_assemble", classify_and_assemble),
        ("extract_zones_and_tables", extract_zones_and_tables),
        ("extract_rules", extract_rules),
    )
    builder = StateGraph(BZOExtractionState)
    for stage_name, stage_fn in stages:
        builder.add_node(stage_name, stage_fn)

    # Chain START -> stage 1 -> stage 2 -> stage 3 -> END.
    path = [START, *(stage_name for stage_name, _ in stages), END]
    for source, target in zip(path, path[1:]):
        builder.add_edge(source, target)
    return builder.compile()
def run_extraction(pdf_bytes: bytes, pdf_id: str = None, dokument_id: str = None) -> Dict[str, Any]:
    """
    Extract the text blocks of a PDF, run the extraction graph, and sort the results.

    Args:
        pdf_bytes: PDF file content as bytes.
        pdf_id: Identifier for the PDF; when empty, "pdf_" plus eight hex
            characters of a random UUID is used.
        dokument_id: Optional dokument ID, stored in the graph state.

    Returns:
        Dictionary with:
            "articles": sorted by page_start, then article_label
            "zones": sorted by zone_code
            "rules": sorted by rule_type, then page
            "zone_parameter_tables": as produced by the graph (not sorted)
            "errors", "warnings": as produced by the graph
    """
    import uuid

    effective_pdf_id = pdf_id or f"pdf_{uuid.uuid4().hex[:8]}"

    # Initial graph state; every collection starts empty.
    state: BZOExtractionState = {
        "dokument_id": dokument_id,
        "pdf_id": effective_pdf_id,
        "text_blocks": [],
        "classified_blocks": [],
        "articles": [],
        "current_zones": {},
        "zones": [],
        "rule_candidates": [],
        "parsed_rules": [],
        "zone_parameter_tables": [],
        "errors": [],
        "warnings": []
    }

    # Extract the text first and store the TextBlock objects as plain dicts.
    extractor = BZOPdfExtractor()
    block_dicts = []
    for text_block in extractor.extract_text_blocks(pdf_bytes, state["pdf_id"]):
        block_dicts.append({
            "page": text_block.page,
            "text": text_block.text,
            "block_id": text_block.block_id,
            "bbox": text_block.bbox
        })
    state["text_blocks"] = block_dicts

    final_state = create_bzo_extraction_graph().invoke(state)

    def _article_order(article):
        return (article.get("page_start", 0), article.get("article_label", ""))

    def _zone_order(zone):
        return zone.get("zone_code", "")

    def _rule_order(rule):
        return (rule.get("rule_type", ""), rule.get("page", 0))

    return {
        "articles": sorted(final_state.get("articles", []), key=_article_order),
        "zones": sorted(final_state.get("zones", []), key=_zone_order),
        "rules": sorted(final_state.get("parsed_rules", []), key=_rule_order),
        "zone_parameter_tables": final_state.get("zone_parameter_tables", []),
        "errors": final_state.get("errors", []),
        "warnings": final_state.get("warnings", [])
    }
def extract_from_documents(
    document_retriever,
    dokument_ids: List[str]
) -> Dict[str, Any]:
    """
    Run the extraction pipeline for each requested document.

    Args:
        document_retriever: Object providing get_documents_by_ids(ids) and
            retrieve_pdf_content(dokument) (a BZODocumentRetriever instance).
        dokument_ids: List of dokument IDs to process.

    Returns:
        Dictionary with:
            "results": one entry per retrieved document with "dokument_id",
                "articles", "zones", "rules", "errors", "warnings" (successful
                extractions also carry "zone_parameter_tables").
            "summary": "total_documents" (number of requested IDs),
                "successful", "failed", "total_articles", "total_zones",
                "total_rules".
        A document counts as failed when its PDF cannot be retrieved, when
        processing raises, or when the extraction reports any error.
    """
    def _failure_entry(dokument_id, message):
        # Result record for a document that yielded no extraction output.
        return {
            "dokument_id": dokument_id,
            "articles": [],
            "zones": [],
            "rules": [],
            "errors": [message],
            "warnings": []
        }

    results = []
    totals = {"articles": 0, "zones": 0, "rules": 0}
    successful = 0
    failed = 0

    for dokument in document_retriever.get_documents_by_ids(dokument_ids):
        try:
            pdf_bytes = document_retriever.retrieve_pdf_content(dokument)
            if not pdf_bytes:
                logger.warning(f"Could not retrieve PDF for dokument {dokument.id}")
                results.append(_failure_entry(dokument.id, "Could not retrieve PDF content"))
                failed += 1
                continue

            extraction_result = run_extraction(
                pdf_bytes=pdf_bytes,
                pdf_id=dokument.dokumentReferenz or f"dok_{dokument.id}",
                dokument_id=dokument.id
            )
            extraction_result["dokument_id"] = dokument.id
            results.append(extraction_result)

            # Totals include output from extractions that also reported errors.
            for key in totals:
                totals[key] += len(extraction_result.get(key, []))

            if extraction_result.get("errors"):
                failed += 1
            else:
                successful += 1
        except Exception as e:
            # One broken document must not abort the whole batch.
            logger.error(f"Error processing dokument {dokument.id}: {str(e)}", exc_info=True)
            results.append(_failure_entry(dokument.id, f"Processing error: {str(e)}"))
            failed += 1

    return {
        "results": results,
        "summary": {
            "total_documents": len(dokument_ids),
            "successful": successful,
            "failed": failed,
            "total_articles": totals["articles"],
            "total_zones": totals["zones"],
            "total_rules": totals["rules"]
        }
    }