platform-core/modules/features/realEstate/bzoExtraction.py
ValueOn AG bc7c6fe27c
Some checks failed
Deploy Plattform-Core (Int) / test (push) Failing after 13s
Deploy Plattform-Core (Int) / deploy (push) Has been skipped
elimination of technical issues (imports)
2026-06-06 00:32:45 +02:00

1483 lines
60 KiB
Python

"""
Pipeline for extracting structured content from BZO PDFs.
The extraction runs as a plain sequential pipeline of step functions; each step
takes the shared state dict, mutates/returns it, and the steps are chained
directly (no external workflow-orchestration framework).
"""
import logging
import re
import uuid
from typing import TypedDict, List, Dict, Any, Optional
from dataclasses import dataclass
from modules.features.realEstate.bzoPdfExtractor import BZOPdfExtractor, TextBlock
from modules.features.realEstate.bzoRuleTaxonomy import RULE_TAXONOMY
logger = logging.getLogger(__name__)
# ===== BZO Params Extraction State (LLM step) =====
class BZOParamsExtractionState(TypedDict):
    """State for BZO params extraction via LLM.

    The first group of keys is the input handed to the step; the keys under
    "Output" are the result slots of the step.
    """
    # Input
    extracted_content: Dict[str, Any]
    bauzone: str
    total_area_m2: Optional[float]
    relevant_rules: List[Dict[str, Any]]
    relevant_articles: List[Dict[str, Any]]
    zone_parameter_tables: List[Dict[str, Any]]
    ai_service: Any  # opaque service handle; its interface is not visible in this section
    gemeinde: str
    # Output
    bauzone_params_list: List[str]
    fakten: List[Dict[str, str]]
    zusatzinformationen: List[Dict[str, Any]]
    errors: List[str]
# ===== State Definition =====
@dataclass
class ClassifiedBlock:
    """A text block together with the type assigned to it by classification.

    ``article_label`` and ``article_title`` default to None.
    """
    block: TextBlock
    block_type: str  # "article", "heading", "table", "other"
    article_label: Optional[str] = None
    article_title: Optional[str] = None
@dataclass
class Article:
    """An article assembled from consecutive text blocks.

    Holds the article label/title, the concatenated text, the page range and
    the section headings (up to three levels) that were active when the
    article started.
    """
    article_label: str
    article_title: Optional[str]
    text: str
    page_start: int
    page_end: int
    section_level_1: Optional[str] = None
    section_level_2: Optional[str] = None
    section_level_3: Optional[str] = None
    zone_raw: Optional[str] = None
@dataclass
class ZoneInfo:
    """Information about one building zone detected in the document."""
    zone_code: str
    zone_name: str
    zone_category: Optional[str] = None
    zone_subcategory: Optional[str] = None
    empfindlichkeitsstufe: Optional[str] = None
    # extract_zones_and_tables fills this with the first number found in the zone code
    geschosszahl: Optional[int] = None
    # extract_zones_and_tables sets this when the zone code ends with "G"
    gewerbeerleichterung: bool = False
@dataclass
class RuleCandidate:
    """Rule candidate from pattern matching."""
    rule_type: str
    matched_text: str
    article_text: str
    page: int
    is_table_rule: bool = False
    # Defaults to None (not an empty list), hence Optional.
    table_zones: Optional[List[str]] = None
    condition_text: Optional[str] = None
@dataclass
class ParsedRule:
    """Parsed rule with structured values.

    Mirrors the keys of the rule dicts produced by ``extract_rules``
    (which additionally carry ``article_label``).
    """
    rule_type: str
    value_numeric: Optional[float]
    value_text: str
    unit: Optional[str]
    condition_text: Optional[str]
    is_table_rule: bool
    table_zones: List[str]
    page: int
    text_snippet: str
    zone_raw: Optional[str] = None
    rule_scope: str = "general"  # extract_rules uses "zone" or "general"
    confidence: float = 0.5
class BZOExtractionState(TypedDict):
    """Shared state dict passed through the BZO extraction pipeline steps.

    Each step reads the keys produced by earlier steps and writes its own
    results back into the same dict.
    """
    # Input metadata
    dokument_id: Optional[str]
    pdf_id: str
    # Extracted text blocks (stored as dicts for serialization)
    text_blocks: List[Dict[str, Any]]
    # Classified blocks (stored as dicts for serialization)
    classified_blocks: List[Dict[str, Any]]
    # Assembled articles (stored as dicts for serialization)
    articles: List[Dict[str, Any]]
    # Zone tracking
    current_zones: Dict[str, Dict[str, Any]]
    zones: List[Dict[str, Any]]
    # Rule extraction (stored as dicts for serialization)
    rule_candidates: List[Dict[str, Any]]
    parsed_rules: List[Dict[str, Any]]
    # Zone-parameter tables (structured table data mapping zones to parameters)
    zone_parameter_tables: List[Dict[str, Any]]
    # Processing metadata
    errors: List[str]
    warnings: List[str]
# ===== Node Implementations (Simplified 4-node pipeline) =====
def classify_and_assemble(state: BZOExtractionState) -> BZOExtractionState:
    """Label every text block and stitch the blocks into articles.

    Pass 1 tags each non-empty block as "article", "heading", "table" or
    "other" and stores the result in ``state["classified_blocks"]``.
    Pass 2 walks the tagged blocks in order: a block carrying an article label
    opens a new article, any other block is appended to the article currently
    open. Heading blocks update the section markers recorded on articles that
    start afterwards. The result goes to ``state["articles"]``.

    Any exception is logged and recorded in ``state["errors"]``; the state is
    returned in either case.
    """
    heading_starts = (r'^[A-Z]\.\s+[A-Z]', r'^[IVX]+\.\s+[A-Z]', r'^\d+\.\s+[A-Z]')
    try:
        # ---- Pass 1: classification ----
        labelled = []
        for raw in state["text_blocks"]:
            stripped = raw["text"].strip()
            if not stripped:
                continue
            kind, label, title = "other", None, None
            art_hit = re.search(r'Art\.?\s*(\d+[a-z]?)', stripped, re.IGNORECASE)
            if art_hit:
                kind = "article"
                label = f"Art. {art_hit.group(1)}"
                title_hit = re.search(r'Art\.?\s*\d+[a-z]?\s+(.+?)(?:\.|$|\n)', stripped, re.IGNORECASE)
                if title_hit:
                    title = title_hit.group(1).strip()
            elif any(re.match(start, stripped) for start in heading_starts):
                kind = "heading"
            elif '\t' in stripped or (len(stripped.split()) > 5 and stripped.count(' ') > 2):
                kind = "table"
            labelled.append({
                "block": {
                    "page": raw["page"],
                    "text": raw["text"],
                    "block_id": raw["block_id"],
                    "bbox": raw.get("bbox"),
                },
                "block_type": kind,
                "article_label": label,
                "article_title": title,
            })
        state["classified_blocks"] = labelled

        # ---- Pass 2: article assembly ----
        assembled = []
        open_article = None
        sec1 = sec2 = sec3 = None
        for entry in labelled:
            source_block = entry["block"]
            body = source_block["text"].strip()
            label = entry.get("article_label")
            title = entry.get("article_title")
            if entry["block_type"] == "heading":
                marker = body.split('.', 1)[0] + '.'
                # The single-letter check runs first, so "I." / "V." / "X."
                # are recorded as level-1 markers.
                if re.match(r'^[A-Z]\.\s+', body):
                    sec1, sec2, sec3 = marker, None, None
                elif re.match(r'^[IVX]+\.\s+', body):
                    sec2, sec3 = marker, None
                elif re.match(r'^\d+\.\s+', body):
                    sec3 = marker
            if label:
                if open_article:
                    assembled.append(open_article)
                open_article = {
                    "article_label": label,
                    "article_title": title,
                    "text": body,
                    "page_start": source_block["page"],
                    "page_end": source_block["page"],
                    "section_level_1": sec1,
                    "section_level_2": sec2,
                    "section_level_3": sec3,
                    "zone_raw": None,
                }
            elif open_article:
                open_article["text"] += "\n" + body
                open_article["page_end"] = source_block["page"]
        if open_article:
            assembled.append(open_article)
        state["articles"] = assembled
        return state
    except Exception as e:
        logger.error(f"Error in classify_and_assemble: {e}", exc_info=True)
        state["errors"] = state.get("errors", []) + [f"Classify/assemble error: {str(e)}"]
        return state
def extract_zones_and_tables(state: BZOExtractionState) -> BZOExtractionState:
    """Find zone codes in the articles, then extract zone-parameter tables.

    Every regex hit in every article yields one entry in ``state["zones"]``
    (repeated mentions are not de-duplicated) and overwrites the entry for
    that code in ``state["current_zones"]``. Afterwards the table extraction
    helper is run on the same state.

    Any exception is logged and recorded in ``state["errors"]``; the state is
    returned in either case.
    """
    # Pattern: "Wohnzone W2", "Zone W3", "Gewerbezone G1"
    zone_patterns = [
        r'(?:Wohnzone|Zone|Gewerbezone|Industriezone|Zentrumszone|Ortsbildschutzzone|Erholungszone)\s+([A-Z0-9/]+)',
        r'([A-Z]\d+(?:/\d+)?(?:G)?)',  # W2/30, W2/30G, Z3, K3/4
    ]
    # (keyword looked up in the article text, zone-code prefix, category name);
    # the first row that applies wins.
    category_rules = (
        ('Wohnzone', 'W', "Wohnzonen"),
        ('Zentrumszone', 'Z', "Zentrumszonen"),
        ('Gewerbezone', 'G', "Arbeitsplatzzonen"),
        ('Industriezone', 'I', "Arbeitsplatzzonen"),
    )
    try:
        # Part 1: detect zone declarations
        detected = []
        by_code = {}
        for article in state["articles"]:
            body = article.get("text", "")
            label = article.get("article_label", "")
            first_page = article.get("page_start", 0)
            for zone_regex in zone_patterns:
                for hit in re.finditer(zone_regex, body, re.IGNORECASE):
                    code = hit.group(1).upper()
                    # A trailing "G" marks Gewerbeerleichterung and is not
                    # part of the base code.
                    has_g_suffix = code.endswith('G')
                    base_code = code[:-1] if has_g_suffix else code
                    # Storey count: first number of the code (W2 -> 2); for
                    # slash codes only the part before the slash is used
                    # (W3/50 -> 3).
                    number_source = base_code.split('/')[0] if '/' in base_code else base_code
                    number_hit = re.search(r'(\d+)', number_source)
                    floors = int(number_hit.group(1)) if number_hit else None
                    category = None
                    for keyword, prefix, name in category_rules:
                        if keyword in body or code.startswith(prefix):
                            category = name
                            break
                    info = ZoneInfo(
                        zone_code=code,
                        zone_name=f"Zone {code}",
                        zone_category=category,
                        geschosszahl=floors,
                        gewerbeerleichterung=has_g_suffix,
                    )
                    # NOTE(review): this stores a ZoneInfo object although the
                    # state type declares dict values - confirm consumers.
                    by_code[code] = info
                    detected.append({
                        "zone_code": code,
                        "zone_name": info.zone_name,
                        "zone_category": category,
                        "geschosszahl": floors,
                        "gewerbeerleichterung": has_g_suffix,
                        "source_article": label,
                        "page": first_page,
                    })
        state["current_zones"] = by_code
        state["zones"] = detected
        # Part 2: extract zone-parameter tables
        _extract_zone_parameter_tables_impl(state)
        return state
    except Exception as e:
        logger.error(f"Error in extract_zones_and_tables: {e}", exc_info=True)
        state["errors"] = state.get("errors", []) + [f"Zones/tables error: {str(e)}"]
        return state
def _extract_zone_parameter_tables_impl(state: BZOExtractionState) -> None:
    """Extract zone-parameter tables from classified blocks. Mutates state in place.

    For every block classified as "table" the function
      1. finds the zone columns (a line with at least three zone codes, or,
         failing that, all distinct zone codes of a block that also mentions
         a known parameter keyword),
      2. walks the remaining lines, tracking the current parameter and
         sub-parameter row, and
      3. assigns the numeric cells of each value line to the zone columns
         when the number of cells equals the number of columns.

    Extracted tables are appended to ``state["zone_parameter_tables"]``.
    Lines that introduce a parameter or sub-parameter are treated as pure
    label rows: values on the same line are not parsed.
    """
    tables = []
    classified_blocks = state.get("classified_blocks", [])
    table_blocks = [b for b in classified_blocks if b.get("block_type") == "table"]
    zone_pattern = r'\b([WLIZK]\d+(?:/\d+)?(?:G\*?)?)\b'
    parameter_keywords = [
        r'Ausnützungsziffer', r'Überbauungsziffer', r'Vollgeschosse', r'Dachgeschosse', r'Attikageschoss', r'Untergeschoss',
        r'Gebäudelänge', r'Grenzabstand', r'Fassadenhöhen', r'Grundabstand', r'Mehrlängen', r'Höchstmass'
    ]
    parameter_row_patterns = [
        # Lettered row ("a) Ausnützungsziffer max."). The name ends at
        # " max." / " min.", a colon, a column gap, the first number, or the
        # end of the line. The lookahead is required: a lazy group followed
        # only by an optional group captures a single character.
        r'^[a-g]\)\s+(.+?)(?=\s+(?:max\.|min\.)|\s*:|\t|\s{2,}|\s+\d|\s*$)',
        r'^(Ausnützungsziffer|Überbauungsziffer|Vollgeschosse|Dachgeschosse|Attikageschoss|Untergeschoss|Gebäudelänge|Grenzabstand|Fassadenhöhen|Grundabstand|Mehrlängen|Höchstmass|Höchstmaß)',
    ]
    subparameter_patterns = [
        r'^(Grundabstand|Mehrlängen|Höchstmass|Höchstmaß|Fassadenhöhen)\s*(min\.|max\.)?',
        r'^(anrechenbare\s+Dachgeschosse|anrechenbares\s+Attikageschoss|anrechenbares\s+Untergeschoss)',
    ]
    numeric_pattern = r'(\d+(?:\.\d+)?)\s*(%|m|Geschoss|Geschosse|Geschosse\s+max\.?|Geschoss\s+max\.?)?'
    for table_block in table_blocks:
        block_dict = table_block.get("block", {})
        text = block_dict.get("text", "")
        page = block_dict.get("page", 0)
        if not text or len(text.strip()) < 20:
            continue
        lines = text.split('\n')

        # --- Locate the zone columns ---
        header_row_idx, zone_columns = None, []
        for idx, line in enumerate(lines):
            zone_matches = re.findall(zone_pattern, line, re.IGNORECASE)
            if len(zone_matches) >= 3:
                header_row_idx, zone_columns = idx, zone_matches
                break
        if not zone_columns:
            has_parameters = any(re.search(kw, text, re.IGNORECASE) for kw in parameter_keywords)
            has_zones = len(re.findall(zone_pattern, text, re.IGNORECASE)) >= 3
            if has_parameters and has_zones:
                zone_columns = list(dict.fromkeys(re.findall(zone_pattern, text, re.IGNORECASE)))
                header_row_idx = 0
        if not zone_columns:
            continue

        # First labelled article block on the same page serves as the source.
        article_context = None
        for block in classified_blocks:
            if block.get("block", {}).get("page") == page and block.get("article_label"):
                article_context = block.get("article_label")
                break

        table_data = {"page": page, "zones": zone_columns, "parameters": [], "source_text": text[:500], "article": article_context}
        start_idx = (header_row_idx + 1) if header_row_idx is not None else 0
        current_parameter = current_subparameter = None
        # Two independent dicts; chained assignment would alias one object.
        parameter_values = {}
        subparameter_values = {}
        n = len(zone_columns)

        for line_idx in range(start_idx, len(lines)):
            line = lines[line_idx].strip()
            if not line:
                continue

            is_parameter_row, parameter_name = False, None
            for pat in parameter_row_patterns:
                m = re.match(pat, line, re.IGNORECASE)
                if m:
                    is_parameter_row, parameter_name = True, re.sub(r'\s+max\.?\s*$', '', re.sub(r'\s+min\.?\s*$', '', m.group(1).strip(), flags=re.I), flags=re.I)
                    break

            is_subparameter, subparameter_name = False, None
            if not is_parameter_row:
                for pat in subparameter_patterns:
                    m = re.search(pat, line, re.IGNORECASE)
                    if m:
                        is_subparameter, subparameter_name = True, m.group(1).strip() + (f" {m.group(2).strip()}" if m.lastindex and m.lastindex >= 2 and m.group(2) else "")
                        break

            if is_parameter_row and parameter_name:
                # A new parameter closes both the pending sub-parameter and
                # the pending parameter; without the first flush the last
                # sub-parameter of every parameter would be lost.
                if current_subparameter and subparameter_values and current_parameter:
                    table_data["parameters"].append({"parameter": f"{current_parameter} - {current_subparameter}", "values_by_zone": subparameter_values.copy(), "article": article_context})
                if current_parameter and parameter_values:
                    table_data["parameters"].append({"parameter": current_parameter, "values_by_zone": parameter_values.copy(), "article": article_context})
                current_parameter, current_subparameter = parameter_name, None
                parameter_values, subparameter_values = {}, {}
                continue

            if is_subparameter and subparameter_name:
                if current_subparameter and subparameter_values and current_parameter:
                    table_data["parameters"].append({"parameter": f"{current_parameter} - {current_subparameter}", "values_by_zone": subparameter_values.copy(), "article": article_context})
                current_subparameter, subparameter_values = subparameter_name, {}
                continue

            if not (current_parameter or current_subparameter):
                continue

            # Value line: cells go to the open sub-parameter if there is one,
            # otherwise to the open parameter.
            target_values = subparameter_values if current_subparameter else parameter_values
            line_parts = re.split(r'\s{2,}|\t', line)
            line_parts = [p.strip() for p in line_parts if p.strip()]
            value_parts = []
            # Column-based: extract trailing numeric/fraction parts that align with zone count
            for p in reversed(line_parts):
                if re.match(r'^\d+(?:\.\d+)?\s*(%|m)?$', p, re.I) or re.match(r'^\d+/\d+$', p):
                    val = re.sub(r'\s*(%|m)$', '', p, flags=re.I).strip()
                    unit = None
                    um = re.search(r'\s*(%|m)$', p, re.I)
                    if um:
                        unit = 'm' if um.group(1).lower() == 'm' else '%'
                    value_parts.insert(0, (val, unit))
                else:
                    break
            if len(value_parts) == n:
                for zi, zone in enumerate(zone_columns):
                    val, unit = value_parts[zi]
                    target_values.setdefault(zone, []).append({"value": val, "unit": unit, "raw_text": line[:200], "line_number": line_idx})
            else:
                # Fallback: regex match by character position
                all_matches = [(m.start(), m.group(0), m.group(1), m.group(2) if m.lastindex and m.lastindex > 1 else None) for m in re.finditer(numeric_pattern, line, re.I)]
                all_matches += [(m.start(), m.group(0), m.group(0), None) for m in re.finditer(r'(\d+/\d+)', line, re.I)]
                all_matches.sort(key=lambda x: x[0])
                if len(all_matches) == n:
                    for zi, zone in enumerate(zone_columns):
                        _, _, val, unit = all_matches[zi]
                        target_values.setdefault(zone, []).append({"value": val, "unit": unit.strip() if unit else None, "raw_text": line[:200], "line_number": line_idx})

        # Flush whatever is still open at the end of the block.
        if current_subparameter and subparameter_values and current_parameter:
            table_data["parameters"].append({"parameter": f"{current_parameter} - {current_subparameter}", "values_by_zone": subparameter_values.copy(), "article": article_context})
        if current_parameter and parameter_values:
            table_data["parameters"].append({"parameter": current_parameter, "values_by_zone": parameter_values.copy(), "article": article_context})
        if table_data["parameters"]:
            tables.append(table_data)

    state["zone_parameter_tables"] = state.get("zone_parameter_tables", []) + tables
    if tables:
        logger.info(f"Extracted {len(tables)} zone-parameter tables")
# Zone code pattern: W5, W2/30, Z3, K3/4, W5G, W 5 (optional space)
_ZONE_CODE_PATTERN = re.compile(r'\b([WZIK]\s*\d+(?:\s*/\s*\d+)?(?:G)?)\b', re.IGNORECASE)
def _zones_in_text(text: str) -> List[str]:
"""Extract zone codes (W5, W2/30, Z3, etc.) from text. Returns unique list, normalized (e.g. W5)."""
matches = _ZONE_CODE_PATTERN.findall(text)
seen = set()
result = []
for m in matches:
# Normalize: remove spaces -> W5, W2/30
n = re.sub(r'\s+', '', m).upper()
if n and n not in seen:
seen.add(n)
result.append(n)
return result
def extract_rules(state: BZOExtractionState) -> BZOExtractionState:
    """Detect rule candidates in every article and parse a value for each.

    Step 1 runs every pattern of ``RULE_TAXONOMY`` over each article text and
    records one candidate per match, together with an optional condition
    phrase found within 100 characters around the match and the zone codes
    mentioned anywhere in that article.
    Step 2 parses a numeric value and unit for each candidate from the 200
    characters starting at the match and stores the resulting rule dicts in
    ``state["parsed_rules"]``.

    Any exception is logged and recorded in ``state["errors"]``; the state is
    returned in either case.
    """
    condition_patterns = [
        r'(?:nördlich|südlich|östlich|westlich|oberhalb|unterhalb)\s+[^,\.]+',
        r'(?:für|bei|in)\s+[^,\.]+',
    ]
    # Pattern: "max. 4", "30 %", "min. 3.5 m" - tried in this order, the
    # first pattern that matches supplies the value.
    value_patterns = [
        r'(?:max|maximal|min|mindestens|höchstens)\s*\.?\s*(\d+(?:\.\d+)?)',
        r'(\d+(?:\.\d+)?)\s*(%|m|meter|metern|prozent)',
        r'(\d+(?:\.\d+)?)',
    ]
    try:
        candidates = []
        for article_dict in state["articles"]:
            text = article_dict.get("text", "")
            page_start = article_dict.get("page_start", 0)
            # Zones mentioned in THIS article - rules from this article apply to these zones
            article_zones = _zones_in_text(text)
            for rule_type, rule_config in RULE_TAXONOMY.items():
                for pattern in rule_config.get("patterns", []):
                    for match in re.finditer(pattern, text, re.IGNORECASE):
                        start, end = max(0, match.start() - 100), min(len(text), match.end() + 100)
                        context = text[start:end]
                        condition_text = None
                        for cond_pat in condition_patterns:
                            cm = re.search(cond_pat, context, re.IGNORECASE)
                            if cm:
                                condition_text = cm.group(0)
                                break
                        candidates.append({
                            "rule_type": rule_type, "matched_text": match.group(0), "article_text": text,
                            # Remember where this match sits so that the value
                            # is parsed from this occurrence, not the first
                            # occurrence of the same text in the article.
                            "match_start": match.start(),
                            "page": page_start, "article_label": article_dict.get("article_label"),
                            "condition_text": condition_text, "is_table_rule": False,
                            "table_zones": article_zones.copy(),
                        })

        parsed_rules = []
        for candidate_dict in candidates:
            rule_type = candidate_dict["rule_type"]
            rule_config = RULE_TAXONOMY.get(rule_type, {})
            value_type = rule_config.get("value_type", "numeric")
            # Extract value using regex
            matched_text = candidate_dict["matched_text"]
            article_text = candidate_dict["article_text"]
            match_start = candidate_dict["match_start"]
            text = matched_text + " " + article_text[match_start:match_start + 200]
            value_numeric = None
            value_text = matched_text
            unit = None
            # Try to extract numeric value
            if value_type in ["numeric", "integer"]:
                for pattern in value_patterns:
                    match = re.search(pattern, text, re.IGNORECASE)
                    if not match:
                        continue
                    try:
                        value_numeric = float(match.group(1))
                        if value_type == "integer":
                            value_numeric = int(value_numeric)
                    except ValueError:
                        continue
                    # Check for unit. NOTE(review): this takes the first
                    # "<number> <unit>" pair of the window, which is not
                    # necessarily the unit of the value parsed above.
                    unit_match = re.search(r'(\d+(?:\.\d+)?)\s*(%|m|meter|metern|prozent)', text, re.IGNORECASE)
                    if unit_match:
                        unit = unit_match.group(2).lower()
                        if unit in ["meter", "metern"]:
                            unit = "m"
                        elif unit == "prozent":
                            unit = "%"
                    break
            # Calculate confidence
            confidence = 0.5
            if value_numeric is not None:
                confidence = 0.8
                if unit:
                    confidence = 0.9
            # Zone association from source article (zones mentioned in that article)
            article_zones = candidate_dict.get("table_zones", [])
            zone_raw = article_zones[0] if article_zones else None
            rule_scope = "zone" if zone_raw else "general"
            parsed_rules.append({
                "rule_type": rule_type,
                "value_numeric": value_numeric,
                "value_text": value_text,
                "unit": unit,
                "condition_text": candidate_dict.get("condition_text"),
                "is_table_rule": candidate_dict.get("is_table_rule", False),
                "table_zones": article_zones,
                "page": candidate_dict["page"],
                "article_label": candidate_dict.get("article_label"),
                "text_snippet": value_text,
                "zone_raw": zone_raw,
                "rule_scope": rule_scope,
                "confidence": confidence,
            })
        state["parsed_rules"] = parsed_rules
        return state
    except Exception as e:
        logger.error(f"Error in extract_rules: {e}", exc_info=True)
        state["errors"] = state.get("errors", []) + [f"Extract rules error: {str(e)}"]
        return state
# ===== Wohnzone Parameter Extraction =====
# Canonical order for BZO parameters (Fakten).
# _bzo_param_sort_key compares these lower-case keywords as substrings of the
# lower-cased parameter name and returns the index of the first keyword that
# matches, so the position in this list is the sort rank.
BZO_PARAM_ORDER = [
    "vollgeschosse", "vollgeschoss",
    "anrechenbares untergeschoss", "untergeschoss",
    "anrechenbares dachgeschoss", "dachgeschoss", "attikageschoss",
    "ausnützungsziffer", "ausnutzungsziffer", "az",
    "überbauungsziffer",
    "gebäudehöhe", "fassadenhöhen",
    "grundabstand", "grenzabstand",
    "gebäudelänge",
    "mehrlängen", "höchstmass",
    "baumassenziffer", "grünflächenziffer", "wohnflächenanteil", "gebäudebreite",
]
# Display name (German) for each rule_type; used by _bzo_get_params_from_rules,
# which falls back to a title-cased rule_type for keys missing here.
RULE_TYPE_TO_PARAM: Dict[str, str] = {
    "max_building_height": "Gebäudehöhe max.",
    "max_floors": "Vollgeschosse max.",
    "max_attachable_attics": "anrechenbares Dachgeschoss max.",
    "max_attachable_basement": "anrechenbares Untergeschoss max.",
    "density": "Ausnützungsziffer",
    "building_coverage": "Überbauungsziffer",
    "building_mass_index": "Baumassenziffer (BMZ)",
    "green_space_index": "Grünflächenziffer (GFZ)",
    "boundary_distance": "Grundabstand min.",
    "boundary_distance_length_surcharge": "Mehrlängen-zuschlag (MLZ)",
    "boundary_distance_max": "Höchstmass Grenzabstand max.",
    "building_length": "Gebäudelänge max.",
    "building_width": "Gebäudebreite max.",
    "residential_area_share": "Wohnflächenanteil",
}
# Unit applied when a value was extracted without one; an empty string means
# that no unit is appended for that rule_type.
RULE_TYPE_TO_DEFAULT_UNIT: Dict[str, str] = {
    "max_building_height": "m",
    "max_floors": "Stk.",
    "max_attachable_attics": "Stk.",
    "max_attachable_basement": "Stk.",
    "density": "%",
    "building_coverage": "%",
    "building_mass_index": "",
    "green_space_index": "%",
    "boundary_distance": "m",
    "boundary_distance_length_surcharge": "",
    "boundary_distance_max": "m",
    "building_length": "m",
    "building_width": "m",
    "residential_area_share": "%",
}
# Regex fragments (compiled case-insensitively by
# _bzo_extract_zusatzinformationen); an article is only considered for the
# additional-information list if its title or text matches at least one.
_ARTIKEL_KEYWORDS = [
    r"herabsetzung", r"grenzabstand", r"nutzweise", r"wohnanteil",
    r"besondere\s+gebäude", r"überbauungsziffer", r"sonderregel",
    r"ausnahmen", r"abweichungen", r"erleichterungen",
    r"mischung", r"gewerbe", r"dienstleistung",
    r"kantonale", r"abstandsvorschriften",
    r"vollgeschoss", r"reduziert", r"mindestmass",
    r"störend", r"nicht\s+störend", r"mässig\s+störend",
]
# Articles that are parameter tables - EXCLUDE from "Weiterführende Bestimmungen".
# Matched as lower-case substrings of the article title; exclusion is checked
# before inclusion (see _is_zusatzinfo_article).
_ZUSATZ_EXCLUDE_TITLES = ("zonen", "grundmasse", "mehrlängenzuschlag", "mehrlaengenzuschlag")
# Articles that are substantive provisions - INCLUDE in "Weiterführende Bestimmungen".
_ZUSATZ_INCLUDE_TITLES = (
    "herabsetzung", "nutzweise", "besondere", "besonderes",
    "ausnahmen", "abweichungen", "erleichterungen", "sonderregel",
    "wohnanteil", "nutzungsart", "abstandsvorschriften",
    "mischung", "gewerbe", "dienstleistung",
)
def _format_article_text_readable(text: str, article_label: str = "", article_title: str = "") -> str:
"""Format raw PDF-extracted text for readable display."""
if not text or not text.strip():
return ""
# Strip redundant article header at start (e.g. "Art. 16 Nutzweise" when already in summary)
if article_label or article_title:
prefix = f"{article_label} {article_title}".strip()
if prefix:
pat = re.escape(prefix)
text = re.sub(rf"^{pat}\s*", "", text.strip(), flags=re.I).lstrip()
lines = []
for line in text.split("\n"):
line = line.strip()
if not line:
continue
lines.append(line)
if not lines:
return ""
# Join hyphenated word breaks (e.g. "Gewerbe-\nund" -> "Gewerbe und")
merged = []
i = 0
while i < len(lines):
line = lines[i]
while line.rstrip().endswith("-") and i + 1 < len(lines):
line = line.rstrip()[:-1] + lines[i + 1].strip()
i += 1
if re.match(r"^\d{1,2}\s*$", line) and i + 1 < len(lines):
next_line = lines[i + 1]
if not re.match(r"^Art\.\s", next_line) and len(next_line) > 3:
line = line + " " + next_line.strip()
i += 1
elif re.match(r"^\d{1,2}\s*$", line) and i + 1 < len(lines) and re.match(r"^Art\.\s", lines[i + 1]):
i += 1
continue
merged.append(line)
i += 1
combined = " ".join(merged)
# Fix run-together paragraph numbers: "1In" -> "1. In", "2Ist" -> "2. Ist"
combined = re.sub(r"(\d)([A-ZÄÖÜ])", r"\1. \2", combined)
# Also fix "a)Something" -> "a) Something" for subparagraphs
combined = re.sub(r"([a-z]\))([A-ZÄÖÜ])", r"\1 \2", combined)
# Split into paragraphs: numbered (1. ..., 2. ...) or lettered (a) ..., b) ...)
parts = re.split(r"(?=\d+\.\s+[A-ZÄÖÜa-zäöü])|(?=[a-z]\)\s+[A-ZÄÖÜa-zäöü])", combined)
paragraphs = []
for p in parts:
p = p.strip()
if not p or len(p) < 3:
continue
paragraphs.append(p)
return "\n\n".join(paragraphs)
def _is_zusatzinfo_article(title: str) -> bool:
    """Decide whether an article belongs in "Weiterführende Bestimmungen".

    The lower-cased title must not contain any exclude keyword (parameter
    tables) and must contain at least one include keyword (substantive
    provisions). Exclusion wins over inclusion.
    """
    normalised = (title or "").lower().strip()
    if any(excluded in normalised for excluded in _ZUSATZ_EXCLUDE_TITLES):
        return False
    return any(included in normalised for included in _ZUSATZ_INCLUDE_TITLES)
def _bzo_build_source(page: Optional[int], article: Optional[str]) -> str:
"""Build source string: Art. X, S. Y"""
parts = []
if article:
parts.append(str(article))
if page is not None and page > 0:
parts.append(f"S. {page}")
return ", ".join(parts) if parts else ""
def _bzo_zone_matches_table(bauzone: str, zone_col: str) -> bool:
"""Check if table column zone matches target bauzone."""
b = (bauzone or "").upper().strip()
z = (zone_col or "").upper().strip()
if not b or not z:
return False
return b in z or (len(z) >= 2 and z in b)
def _bzo_article_mentions_bauzone(article_text: str, bauzone: str) -> bool:
"""Check if article text mentions the bauzone or applies to it."""
if not bauzone or not article_text:
return False
b = bauzone.upper().strip()
t = article_text.upper()
if b in t:
return True
if len(b) >= 2 and b[0] in "WZIK" and re.search(rf"\b{b[0]}\s*\d+", t):
base = re.sub(r"\s+", "", b.split("/")[0].rstrip("G"))
if base in t or re.search(rf"\b{base}\b", t):
return True
return False
def _bzo_get_params_from_tables(
    zone_parameter_tables: List[Dict[str, Any]],
    bauzone: str
) -> List[Dict[str, Any]]:
    """Collect the table values that apply to *bauzone*.

    Only tables with at least one matching zone column are inspected. For
    each parameter the first value entry of every matching zone is taken;
    identical (parameter, value, unit) combinations are emitted once.

    Returns a list of dicts with the keys "parameter", "value", "unit",
    "source" and "rule_type" (always None here).
    """
    collected = []
    emitted_keys = set()
    for table in zone_parameter_tables:
        columns = table.get("zones", [])
        if not any(_bzo_zone_matches_table(bauzone, str(column)) for column in columns):
            continue
        table_page = table.get("page")
        table_article = table.get("article")
        for param in table.get("parameters", []):
            for zone, values in param.get("values_by_zone", {}).items():
                if not _bzo_zone_matches_table(bauzone, str(zone)):
                    continue
                if not isinstance(values, list) or len(values) == 0:
                    continue
                first_entry = values[0]
                value = first_entry.get("value", "")
                unit = first_entry.get("unit") or ""
                name = param.get("parameter", "")
                dedup_key = f"{name}|{value}|{unit}"
                if dedup_key in emitted_keys:
                    continue
                emitted_keys.add(dedup_key)
                # The parameter's own article reference takes precedence over the table's.
                source = _bzo_build_source(table_page, param.get("article") or table_article)
                collected.append({
                    "parameter": name,
                    "value": str(value),
                    "unit": str(unit).strip() if unit else "",
                    "source": source or "Tabelle im Dokument",
                    "rule_type": None,
                })
    return collected
def _bzo_filter_rules_by_bauzone(rules: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]:
"""Filter rules by Bauzone code."""
bauzone_upper = (bauzone or "").upper()
out = []
for r in rules:
if bauzone_upper in (r.get("zone_raw") or "").upper():
out.append(r)
continue
for tz in (r.get("table_zones") or []):
if bauzone_upper in str(tz).upper():
out.append(r)
break
else:
if bauzone_upper in (r.get("text_snippet") or "").upper():
out.append(r)
return out
def _bzo_get_params_from_rules(rules: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Convert parsed rules to {parameter, value, unit, source, rule_type} format.

    The display name comes from ``RULE_TYPE_TO_PARAM`` (falling back to the
    title-cased rule type). The value is the numeric value when present -
    whole-number floats are rendered without a decimal part - otherwise the
    matched text. Rules without a value, and rules whose "value" is merely
    the parameter keyword itself, are skipped. Identical
    (parameter, value, unit) combinations are emitted once.
    """
    # Matched texts that are only the parameter keyword carry no value.
    keyword_only_values = ("gebäudelänge", "gebäudebreite", "mehrlängenzuschlag", "mehrlängen", "grenzabstand", "fassadenhöhe")
    result = []
    seen = set()
    for r in rules:
        rule_type = r.get("rule_type", "")
        param_name = RULE_TYPE_TO_PARAM.get(rule_type) or rule_type.replace("_", " ").title()
        value_numeric = r.get("value_numeric")
        value_text = r.get("value_text", "")
        unit = r.get("unit") or ""
        if value_numeric is not None:
            # is_integer() is False for inf/nan, where int() would raise.
            if isinstance(value_numeric, float) and value_numeric.is_integer():
                val_str = str(int(value_numeric))
            else:
                val_str = str(value_numeric)
        else:
            val_str = str(value_text).strip() if value_text else ""
        if not val_str:
            continue
        if val_str.lower() in keyword_only_values:
            continue
        unit_str = str(unit).strip() if unit else (RULE_TYPE_TO_DEFAULT_UNIT.get(rule_type, ""))
        page = r.get("page")
        article = r.get("article_label")
        # Fallback label shown when neither article nor page is known.
        source = _bzo_build_source(page, article) or "Artikeltext"
        key = f"{param_name}|{val_str}|{unit_str}"
        if key not in seen:
            seen.add(key)
            result.append({
                "parameter": param_name,
                "value": val_str,
                "unit": unit_str,
                "source": source,
                "rule_type": rule_type,
            })
    return result
def _bzo_param_to_rule_type(param_name: str) -> Optional[str]:
"""Map parameter display name to rule_type."""
p = (param_name or "").lower()
if "vollgeschoss" in p:
return "max_floors"
if "dachgeschoss" in p or "attika" in p:
return "max_attachable_attics"
if "untergeschoss" in p:
return "max_attachable_basement"
if "ausnützungsziffer" in p or "ausnutzungsziffer" in p or " az " in p:
return "density"
if "überbauungsziffer" in p or " uz " in p:
return "building_coverage"
if "baumassenziffer" in p or "bmz" in p:
return "building_mass_index"
if "grünflächen" in p or "gfz" in p:
return "green_space_index"
if "grenzabstand" in p or "grundabstand" in p:
return "boundary_distance"
if "mehrlängen" in p or "mlz" in p:
return "boundary_distance_length_surcharge"
if "höchstmass" in p:
return "boundary_distance_max"
if "gebäudelänge" in p:
return "building_length"
if "gebäudebreite" in p:
return "building_width"
if "fassadenhöhe" in p or "gebäudehöhe" in p:
return "max_building_height"
if "wohnflächenanteil" in p or "wohnanteil" in p:
return "residential_area_share"
return None
def _bzo_merge_rules(
from_tables: List[Dict[str, Any]],
from_rules: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Merge table params and rule params. Tables take precedence."""
by_param_lower: Dict[str, Dict[str, Any]] = {}
for r in from_tables:
p = (r.get("parameter") or "").lower()
if p and p not in by_param_lower:
rr = r.copy()
if not rr.get("rule_type"):
rr["rule_type"] = _bzo_param_to_rule_type(rr.get("parameter", ""))
by_param_lower[p] = rr
for r in from_rules:
p = (r.get("parameter") or "").lower()
if p and p not in by_param_lower:
by_param_lower[p] = r.copy()
return list(by_param_lower.values())
def _bzo_param_sort_key(param_name: str) -> int:
    """Return the sort rank of a parameter name.

    The rank is the index of the first BZO_PARAM_ORDER keyword contained in
    the lower-cased name; names matching no keyword get 99 and sort last.
    """
    lowered = (param_name or "").lower()
    return next(
        (rank for rank, keyword in enumerate(BZO_PARAM_ORDER) if keyword in lowered),
        99,
    )
def _bzo_extract_zusatzinformationen(
    articles: List[Dict[str, Any]],
    bauzone: str = "",
    zone_parameter_tables: Optional[List[Dict[str, Any]]] = None,
) -> List[Dict[str, Any]]:
    """Select article excerpts that are relevant to the bauzone.

    An article is selected when
      * it has a label and text,
      * its title or text matches at least one of ``_ARTIKEL_KEYWORDS``, and
      * (only if a bauzone is given) it mentions the bauzone or is the source
        article of a zone-parameter table that covers the bauzone.

    Each (label, page) pair is emitted once. The text is cut to 3500
    characters. The result is sorted by page, then by article label.
    """
    keyword_regexes = [re.compile(keyword, re.IGNORECASE) for keyword in _ARTIKEL_KEYWORDS]

    # Articles that host a parameter table covering the bauzone.
    table_articles = set()
    if zone_parameter_tables and bauzone:
        table_articles = {
            table.get("article") or ""
            for table in zone_parameter_tables
            if any(_bzo_zone_matches_table(bauzone, str(column)) for column in table.get("zones", []))
        }

    excerpts = []
    handled = set()
    for article in articles:
        label = article.get("article_label") or ""
        title = (article.get("article_title") or "").strip()
        body = (article.get("text") or "").strip()
        page = article.get("page_start") or article.get("page_end") or 0
        if not (label and body):
            continue
        dedup_key = f"{label}|{page}"
        if dedup_key in handled:
            continue
        searchable = f"{title} {body}"
        if not any(regex.search(searchable) for regex in keyword_regexes):
            continue
        if bauzone and label not in table_articles and not _bzo_article_mentions_bauzone(searchable, bauzone):
            continue
        handled.add(dedup_key)
        excerpts.append({
            "article_label": label,
            "article_title": title,
            "text": body[:3500].strip(),
            "page": page,
            "source": _bzo_build_source(page, label) or "BZO-Dokument",
        })
    excerpts.sort(key=lambda item: (item.get("page", 0), item.get("article_label", "")))
    return excerpts
def extract_wohnzone_params(
    extracted_content: Dict[str, Any],
    bauzone: str,
    relevant_rules: Optional[List[Dict[str, Any]]] = None,
    total_area_m2: Optional[float] = None,
) -> Dict[str, Any]:
    """
    Build the fact list and the additional provisions for one Bauzone.

    Parameters come from the pre-parsed zone-parameter tables and from the
    text rules (tables take precedence). When ``relevant_rules`` is None the
    rules in ``extracted_content["rules"]`` are filtered by the bauzone.

    Returns a dict with
      "bauzone": the bauzone as passed in,
      "fakten": ordered list of {"item", "value", "source"} entries, led by
          the bauzone and (if positive) the plot area,
      "zusatzinformationen": article excerpts relevant to the bauzone.
    """
    articles = extracted_content.get("articles", [])
    tables = extracted_content.get("zone_parameter_tables", [])
    if relevant_rules is not None:
        applicable_rules = relevant_rules
    else:
        applicable_rules = _bzo_filter_rules_by_bauzone(extracted_content.get("rules", []), bauzone)

    merged_params = _bzo_merge_rules(
        _bzo_get_params_from_tables(tables, bauzone),
        _bzo_get_params_from_rules(applicable_rules),
    )

    fakten = []
    if bauzone:
        fakten.append({"item": "Auswertung für Bauzone", "value": bauzone, "source": ""})
    if total_area_m2 is not None and total_area_m2 > 0:
        # Swiss thousands separator: 1'234 instead of 1,234.
        area_text = f"{total_area_m2:,.0f}".replace(",", "'")
        fakten.append({"item": "Grundstücksfläche", "value": area_text, "source": "Parzellendaten"})

    ordered_params = sorted(merged_params, key=lambda entry: _bzo_param_sort_key(entry.get("parameter", "")))
    for entry in ordered_params:
        name = entry.get("parameter", "").strip()
        raw_value = entry.get("value", "")
        unit = (entry.get("unit") or "").strip()
        kind = entry.get("rule_type") or _bzo_param_to_rule_type(name)
        if not unit and kind:
            unit = RULE_TYPE_TO_DEFAULT_UNIT.get(kind, "")
        display_value = (f"{raw_value} {unit}" if unit else f"{raw_value}").strip()
        if not (name and display_value):
            continue
        fakten.append({
            "item": name,
            "value": display_value,
            "source": entry.get("source") or "BZO-Dokument",
        })

    return {
        "bauzone": bauzone,
        "fakten": fakten,
        "zusatzinformationen": _bzo_extract_zusatzinformationen(articles, bauzone, tables),
    }
# ===== LLM-based BZO Params Extraction =====
def _build_bauzone_context_for_llm(state: BZOParamsExtractionState) -> str:
    """
    Build the plain-text context that is embedded into the LLM prompt.

    The context consists of up to four sections, in this order: the parcel
    area (only when greater than zero), the full text of the relevant
    articles, the pre-parsed table values whose zone key contains the Bauzone,
    and at most the first 20 rules extracted from article text.

    Missing or None fields (unit, article label, value text, ...) are rendered
    as empty text so that the literal "None" never leaks into the prompt.

    Args:
        state: Extraction state; the keys "bauzone", "zone_parameter_tables",
            "relevant_articles", "relevant_rules" and "total_area_m2" are read.

    Returns:
        The context sections joined with newlines.
    """
    bauzone = (state.get("bauzone") or "").upper()
    zone_parameter_tables = state.get("zone_parameter_tables") or []
    relevant_articles = state.get("relevant_articles") or []
    relevant_rules = state.get("relevant_rules") or []
    total_area_m2 = state.get("total_area_m2")

    parts: List[str] = []
    if total_area_m2 is not None and total_area_m2 > 0:
        parts.append(f"Grundstücksfläche der Parzelle: {total_area_m2:,.0f}".replace(",", "'"))
        parts.append("")

    # Full article texts - LLM can parse tables like Art. 14 (zones in rows, values in columns)
    parts.append("=== ARTIKEL MIT VOLLEM TEXT (Tabellen genau lesen, richtige Spalte/Zeile für Bauzone wählen) ===")
    for art in relevant_articles:
        label = art.get("article_label") or ""
        title = (art.get("article_title") or "").strip()
        text = art.get("text") or ""
        page = art.get("page_start") or art.get("page_end") or 0
        parts.append(f"\n{label}: {title}")
        parts.append(f"Seite: {page}")
        parts.append(f"Inhalt:\n{text}")
    parts.append("")

    # Zone-parameter tables (pre-parsed)
    if zone_parameter_tables:
        parts.append("=== VORSTRUKTURIERTE TABELLENWERTE FÜR BAUZONE ===")
        for table in zone_parameter_tables:
            page = table.get("page") or 0
            art = table.get("article") or ""
            parts.append(f"\n{art} (S. {page}):")
            for param in table.get("parameters") or []:
                pname = param.get("parameter") or ""
                for zone, values in (param.get("values_by_zone") or {}).items():
                    if bauzone not in (zone or "").upper():
                        continue
                    if not (isinstance(values, list) and values):
                        continue
                    # Only the first value of a zone cell is reported.
                    v = values[0].get("value")
                    v = "" if v is None else v
                    u = values[0].get("unit") or ""
                    parts.append(f"{pname} [{zone}]: {v} {u}".strip())
        parts.append("")

    # Rules from text
    if relevant_rules:
        parts.append("=== REGELN AUS ARTIKELTEXT ===")
        for r in relevant_rules[:20]:
            rt = r.get("rule_type") or ""
            vn = r.get("value_numeric")
            u = r.get("unit") or ""
            page = r.get("page") or 0
            art = r.get("article_label") or ""
            # is_integer() is False for inf/nan, which int() would reject with an exception.
            if isinstance(vn, float) and vn.is_integer():
                val = str(int(vn))
            elif vn is not None:
                val = str(vn)
            else:
                val = r.get("value_text") or ""
            value_part = f"{val} {u}".strip()
            source_part = f"({art}, S. {page})" if art else f"(S. {page})"
            parts.append(f"{rt}: {value_part} {source_part}".strip())

    return "\n".join(parts)
def _parse_llm_bullet_list(text: str) -> List[Dict[str, str]]:
    """
    Parse an LLM bullet list into fakten entries.

    Only lines starting with "-" are considered. Each is expected to look like
    "- Param: value (Art. X, S. Y)"; a trailing parenthesised group becomes the
    source and is optional. Lines without both a non-empty item and a
    non-empty value (e.g. "- Param:") are skipped, so no empty facts are
    produced.

    Args:
        text: Raw LLM response; None or an empty string yields an empty list.

    Returns:
        List of dicts with the keys "item", "value" and "source".
    """
    # "Param: value (source)" or "Param: value"
    bullet_pattern = re.compile(r"^(.+?):\s*(.+?)(?:\s*\(([^)]+)\))?\s*$")

    fakten: List[Dict[str, str]] = []
    for raw_line in (text or "").strip().split("\n"):
        line = raw_line.strip()
        if not line.startswith("-"):
            continue
        line = line.lstrip("- ").strip()
        match = bullet_pattern.match(line)
        if match is None:
            # The pattern only fails when there is no colon or when the text
            # before/after the first colon is empty - nothing usable either way.
            continue
        item = match.group(1).strip()
        value = match.group(2).strip()
        source = (match.group(3) or "").strip()
        if item and value:
            fakten.append({"item": item, "value": value, "source": source})
    return fakten
async def _llm_filter_relevant_provisions(
    ai_service: Any,
    bauzone: str,
    fakten: List[Dict[str, str]],
    provision_articles: List[Dict[str, Any]],
) -> Optional[set]:
    """
    Use LLM to determine which provision articles are relevant for a parcel in this bauzone.

    Labels found in the LLM reply are matched against the labels of
    provision_articles ignoring case and whitespace, and the matching article
    label is returned with its whitespace collapsed to single spaces. A reply
    such as "Art.15" or "art. 15" therefore still selects the article labelled
    "Art. 15". Labels that match no known article are returned as written
    (whitespace collapsed).

    Args:
        ai_service: Object providing the awaitable callAiPlanning(prompt=..., debugType=...).
        bauzone: Zone code the parcel lies in.
        fakten: Already extracted facts; entries whose item contains
            "Auswertung" are left out of the prompt.
        provision_articles: Candidate articles ("article_label", "article_title").

    Returns:
        Set of article labels (e.g. {"Art. 15", "Art. 16"}), an empty set when
        there are no candidates, or None to include all (LLM failure or no
        label found in the reply).
    """
    if not provision_articles:
        return set()

    def _match_key(label: str) -> str:
        # Comparison key that ignores case and any whitespace.
        return re.sub(r"\s+", "", label or "").lower()

    def _collapse(label: str) -> str:
        return re.sub(r"\s+", " ", (label or "").strip())

    # Lookup from comparison key to the label form the caller compares against.
    known_labels: Dict[str, str] = {}
    for article in provision_articles:
        article_label = article.get("article_label")
        if article_label:
            known_labels.setdefault(_match_key(article_label), _collapse(article_label))

    fakten_str = "\n".join(
        f"- {f.get('item', '')}: {f.get('value', '')}" for f in fakten
        if f.get("item") and "Auswertung" not in (f.get("item") or "")
    )
    articles_str = "\n".join(
        f"- {a.get('article_label', '')}: {a.get('article_title', '')}"
        for a in provision_articles
        if a.get("article_label")
    )
    prompt = f"""Du bist Experte für Schweizer Bau- und Zonenordnungen (BZO).
Eine Parzelle liegt in der Bauzone {bauzone}. Folgende BZO-Parameter gelten für diese Zone:
{fakten_str}
Folgende Bestimmungen (Weiterführende Artikel) könnten zutreffen:
{articles_str}
AUFGABE: Welche dieser Artikel sind für eine Parzelle in Bauzone {bauzone} mit diesen Parametern TATSÄCHLICH RELEVANT?
- Nur Artikel angeben, die auf diese Zone/Parameter Bezug nehmen oder Bedingungen nennen, die hier greifen
- z.B. Art. 15 Herabsetzung: relevant wenn Vollgeschosse und Grenzabstand vorhanden (Reduktion bei weggelassenen Geschossen)
- z.B. Art. 16 Nutzweise: relevant für Wohnzonen mit Wohnanteil
- z.B. Art. 40 Wohnanteil: nur wenn dieser Artikel die Zone {bauzone} erwähnt oder für Wohnzonen gilt
- Artikel die andere Zonen betreffen (z.B. nur Z5, I) und {bauzone} ausschliessen: NICHT aufnehmen
Antwort NUR mit den relevanten Artikelnummern, eine pro Zeile (z.B. "Art. 15", "Art. 16"). Keine anderen Zeichen."""
    try:
        response = await ai_service.callAiPlanning(
            prompt=prompt,
            debugType="bzo_relevant_provisions",
        )
        labels = set()
        for line in (response or "").strip().split("\n"):
            m = re.search(r"(Art\.\s*\d+[a-z]?)", line.strip(), re.I)
            if m:
                written = _collapse(m.group(1))
                labels.add(known_labels.get(_match_key(written), written))
        return labels if labels else None  # None = include all (fallback on error or empty)
    except Exception as e:
        # Best effort: a failing filter must not abort the extraction.
        logger.warning(f"LLM provision filter failed: {e}")
        return None
async def llm_extract_bauzone_params_node(state: BZOParamsExtractionState) -> BZOParamsExtractionState:
    """
    Ask the LLM for the Bauzone's BZO parameters and collect related provisions.

    The LLM answer is parsed into "fakten"; "bauzone_params_list" holds the
    same entries as "- item: value (source)" strings. Header facts for the
    Bauzone and the parcel area are added when the answer lacks them. A second
    LLM call selects which provision articles are listed as
    "zusatzinformationen".

    Args:
        state: Extraction state with the inputs of the LLM step.

    Returns:
        A copy of the state with "fakten", "bauzone_params_list",
        "zusatzinformationen" and "errors" filled. On failure the lists are
        empty and the error message is appended to "errors".
    """
    errors = list(state.get("errors", []))
    ai_service = state.get("ai_service")
    bauzone = state.get("bauzone", "")
    gemeinde = state.get("gemeinde", "")

    # Without an AI service nothing can be extracted; report it and leave early.
    if not ai_service:
        errors.append("AI service not provided")
        return {**state, "fakten": [], "bauzone_params_list": [], "errors": errors}

    context = _build_bauzone_context_for_llm(state)
    prompt = f"""Du bist Experte für Schweizer Bau- und Zonenordnungen (BZO). Extrahiere alle relevanten BZO-Parameter für die Bauzone {bauzone} in {gemeinde}.
BZO-INHALT:
{context}
AUFGABE: Erstelle eine geordnete Bullet-Liste ALLER zutreffenden Parameter für Bauzone {bauzone}.
Priorität: Vollgeschosse, anrechenbares Untergeschoss, anrechenbares Dachgeschoss, Ausnützungsziffer, Überbauungsziffer, Gebäudehöhe, Grundabstand/Grenzabstand, Gebäudelänge, Mehrlängenzuschlag, Höchstmass, sowie alle anderen Bestimmungen die für diese Zone gelten.
WICHTIG:
- Bei Tabellen: die richtige Spalte/Zeile für {bauzone} verwenden (z.B. Art. 14 Mehrlängenzuschlag: W5 = 13 m)
- Jede Zeile: "- Parametername: Wert (Art. X, S. Y)"
- Nur tatsächlich im Dokument vorhandene Werte angeben
- Einheit (m, %, Stk.) bei Zahlen mit angeben
- Keine leeren Zeilen oder Kommentare - nur die Liste
Antwort NUR mit der Bullet-Liste, sonst nichts:"""

    def _collapse_whitespace(label: str) -> str:
        return re.sub(r"\s+", " ", (label or "").strip())

    def _mentions(entries: List[Dict[str, str]], needle: str) -> bool:
        # True when any fact's item text contains the needle.
        return any(needle in (entry.get("item") or "") for entry in entries)

    try:
        answer = await ai_service.callAiPlanning(
            prompt=prompt,
            debugType="bzo_params_extraction",
        )
        fakten = _parse_llm_bullet_list((answer or "").strip())

        # The bullet strings are built before the header facts are inserted,
        # so they only reflect what the LLM itself returned.
        bauzone_params_list = []
        for fact in fakten:
            bullet = f"- {fact['item']}: {fact['value']}"
            if fact.get("source"):
                bullet = f"{bullet} ({fact['source']})"
            bauzone_params_list.append(bullet)

        # Add header items if missing
        if bauzone and not _mentions(fakten, "Auswertung"):
            fakten.insert(0, {"item": "Auswertung für Bauzone", "value": bauzone, "source": ""})
        area = state.get("total_area_m2")
        if area is not None and area > 0 and not _mentions(fakten, "Grundstücksfläche"):
            fakten.insert(1, {
                "item": "Grundstücksfläche",
                "value": f"{area:,.0f}".replace(",", "'"),
                "source": "Parzellendaten",
            })

        # Zusatzinformationen: only provisions RELEVANT for this parcel in this bauzone
        content = state.get("extracted_content", {})
        candidates = content.get("articles", []) or state.get("relevant_articles", [])
        provision_articles = []
        for article in candidates:
            if _is_zusatzinfo_article((article.get("article_title") or "").strip()):
                provision_articles.append(article)

        relevant_labels = await _llm_filter_relevant_provisions(
            ai_service=ai_service,
            bauzone=bauzone,
            fakten=fakten,
            provision_articles=provision_articles,
        )

        zusatzinformationen = []
        for article in provision_articles:
            label = article.get("article_label", "")
            title = (article.get("article_title") or "").strip()
            normalized = _collapse_whitespace(label)
            # None means "no filter"; articles without a label are never filtered out.
            if relevant_labels is not None and normalized and normalized not in relevant_labels:
                continue
            readable = _format_article_text_readable(
                (article.get("text") or "")[:4000],
                article_label=label,
                article_title=title,
            )
            if not readable:
                continue
            page = article.get("page_start") or article.get("page_end", 0)
            if label:
                source = f"{label}, S. {page}"
            else:
                source = f"S. {page}"
            zusatzinformationen.append({
                "article_label": label,
                "article_title": title,
                "text": readable,
                "source": source,
            })

        return {
            **state,
            "fakten": fakten,
            "bauzone_params_list": bauzone_params_list,
            "zusatzinformationen": zusatzinformationen,
            "errors": errors,
        }
    except Exception as e:
        logger.error(f"LLM BZO params extraction failed: {e}", exc_info=True)
        errors.append(str(e))
        return {
            **state,
            "fakten": [],
            "bauzone_params_list": [],
            "zusatzinformationen": [],
            "errors": errors,
        }
def _filter_articles_by_bauzone(articles: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]:
    """
    Keep the articles whose text or raw zone field contains the Bauzone.

    The comparison is a case-insensitive substring test; an empty Bauzone
    therefore keeps every article.
    """
    needle = (bauzone or "").upper()
    selected = []
    for article in articles:
        in_text = needle in (article.get("text") or "").upper()
        if in_text or needle in (article.get("zone_raw") or "").upper():
            selected.append(article)
    return selected
def _filter_tables_by_bauzone(tables: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]:
    """
    Reduce zone-parameter tables to the parts that concern the Bauzone.

    A table is kept when at least one of its zones and at least one parameter
    value key contain the Bauzone (case-insensitive substring test). Kept
    tables carry only the matching zones and, per parameter, only the
    matching zone values.
    """
    needle = (bauzone or "").upper()

    def _hits(zone: Any) -> bool:
        return needle in str(zone).upper()

    kept = []
    for table in tables:
        zone_hits = list(filter(_hits, table.get("zones", [])))
        if not zone_hits:
            continue

        params = []
        for entry in table.get("parameters", []):
            by_zone = {
                zone: values
                for zone, values in (entry.get("values_by_zone") or {}).items()
                if _hits(zone)
            }
            # Parameters without any value for the Bauzone are dropped.
            if by_zone:
                params.append({"parameter": entry.get("parameter"), "values_by_zone": by_zone})

        if not params:
            continue
        kept.append({
            "page": table.get("page"),
            "article": table.get("article"),
            "zones": zone_hits,
            "parameters": params,
        })
    return kept
async def run_bzo_params_extraction(
    extracted_content: Dict[str, Any],
    bauzone: str,
    ai_service: Any,
    gemeinde: str,
    relevant_rules: Optional[List[Dict[str, Any]]] = None,
    relevant_articles: Optional[List[Dict[str, Any]]] = None,
    total_area_m2: Optional[float] = None,
) -> Dict[str, Any]:
    """
    Extract the BZO parameters of a Bauzone with the help of the LLM.

    Args:
        extracted_content: Extraction result ("rules", "articles",
            "zone_parameter_tables" are read).
        bauzone: Zone code to evaluate.
        ai_service: AI service handed on to the LLM step.
        gemeinde: Municipality name used in the prompt.
        relevant_rules: Pre-filtered rules; filtered by bauzone when None.
        relevant_articles: Pre-filtered articles; filtered by bauzone when None.
        total_area_m2: Optional parcel area.

    Returns:
        Dictionary with "bauzone", "fakten" (item/value/source),
        "bauzone_params_list" (bullet strings), "zusatzinformationen" and "errors".
    """
    if relevant_rules is None:
        rules = _bzo_filter_rules_by_bauzone(extracted_content.get("rules", []), bauzone)
    else:
        rules = relevant_rules

    if relevant_articles is None:
        articles = _filter_articles_by_bauzone(extracted_content.get("articles", []), bauzone)
    else:
        articles = relevant_articles

    # Tables are always narrowed down to the Bauzone.
    tables = _filter_tables_by_bauzone(extracted_content.get("zone_parameter_tables", []), bauzone)

    initial_state: BZOParamsExtractionState = {
        "extracted_content": extracted_content,
        "bauzone": bauzone,
        "total_area_m2": total_area_m2,
        "relevant_rules": rules,
        "relevant_articles": articles,
        "zone_parameter_tables": tables,
        "ai_service": ai_service,
        "gemeinde": gemeinde,
        "bauzone_params_list": [],
        "fakten": [],
        "zusatzinformationen": [],
        "errors": [],
    }
    outcome = await llm_extract_bauzone_params_node(initial_state)

    result: Dict[str, Any] = {"bauzone": bauzone}
    for key in ("fakten", "bauzone_params_list", "zusatzinformationen", "errors"):
        result[key] = outcome.get(key, [])
    return result
# ===== Pipeline Execution =====
def _run_bzo_extraction_pipeline(state: BZOExtractionState) -> BZOExtractionState:
    """Feed the shared state through the extraction steps, one after another."""
    pipeline_steps = (
        classify_and_assemble,
        extract_zones_and_tables,
        extract_rules,
    )
    # Each step receives the state returned by the previous one.
    for step in pipeline_steps:
        state = step(state)
    return state
def run_extraction(pdf_bytes: bytes, pdf_id: str = None, dokument_id: str = None) -> Dict[str, Any]:
    """
    Extract the text blocks of a PDF, run the pipeline and return sorted results.

    Args:
        pdf_bytes: PDF file content as bytes
        pdf_id: Optional identifier for the PDF (a "pdf_" + 8 hex chars ID is
            generated when missing or empty)
        dokument_id: Optional dokument ID for reference

    Returns:
        Dictionary with extracted content:
        {
            "articles": [...],               # Sorted by page_start, then article_label
            "zones": [...],                  # Sorted by zone_code
            "rules": [...],                  # Sorted by rule_type, then page
            "zone_parameter_tables": [...],  # In pipeline order (not sorted)
            "errors": [...],
            "warnings": [...]
        }
    """
    pdf_id = pdf_id or f"pdf_{uuid.uuid4().hex[:8]}"

    # Fresh pipeline state; the steps fill the empty containers.
    state: BZOExtractionState = {
        "dokument_id": dokument_id,
        "pdf_id": pdf_id,
        "text_blocks": [],
        "classified_blocks": [],
        "articles": [],
        "current_zones": {},
        "zones": [],
        "rule_candidates": [],
        "parsed_rules": [],
        "zone_parameter_tables": [],
        "errors": [],
        "warnings": []
    }

    # The state carries plain dicts, so the TextBlock objects are converted.
    extractor = BZOPdfExtractor()
    for block in extractor.extract_text_blocks(pdf_bytes, pdf_id):
        state["text_blocks"].append({
            "page": block.page,
            "text": block.text,
            "block_id": block.block_id,
            "bbox": block.bbox
        })

    final_state = _run_bzo_extraction_pipeline(state)

    articles = list(final_state.get("articles", []))
    articles.sort(key=lambda entry: (entry.get("page_start", 0), entry.get("article_label", "")))

    zones = list(final_state.get("zones", []))
    zones.sort(key=lambda entry: entry.get("zone_code", ""))

    rules = list(final_state.get("parsed_rules", []))
    rules.sort(key=lambda entry: (entry.get("rule_type", ""), entry.get("page", 0)))

    return {
        "articles": articles,
        "zones": zones,
        "rules": rules,
        "zone_parameter_tables": final_state.get("zone_parameter_tables", []),
        "errors": final_state.get("errors", []),
        "warnings": final_state.get("warnings", [])
    }
def extract_from_documents(
    document_retriever,
    dokument_ids: List[str]
) -> Dict[str, Any]:
    """
    Run the extraction for every document the retriever returns for the IDs.

    A document counts as failed when its PDF content cannot be retrieved,
    when processing raises, or when its extraction result contains errors.

    Args:
        document_retriever: BZODocumentRetriever instance
        dokument_ids: List of dokument IDs to process

    Returns:
        Dictionary with results per document:
        {
            "results": [
                {
                    "dokument_id": "...",
                    "articles": [...],
                    "zones": [...],
                    "rules": [...],
                    "errors": [...],
                    "warnings": [...]
                },
                ...
            ],
            "summary": {
                "total_documents": N,   # number of requested IDs
                "successful": M,
                "failed": K,
                "total_articles": X,
                "total_zones": Y,
                "total_rules": Z
            }
        }
    """
    def _failure_entry(dokument_id: Any, message: str) -> Dict[str, Any]:
        # Result placeholder for a document that yielded no content.
        return {
            "dokument_id": dokument_id,
            "articles": [],
            "zones": [],
            "rules": [],
            "errors": [message],
            "warnings": []
        }

    results = []
    totals = {"total_articles": 0, "total_zones": 0, "total_rules": 0}
    counted_fields = (
        ("total_articles", "articles"),
        ("total_zones", "zones"),
        ("total_rules", "rules"),
    )
    successful = 0
    failed = 0

    for dokument in document_retriever.get_documents_by_ids(dokument_ids):
        try:
            pdf_bytes = document_retriever.retrieve_pdf_content(dokument)
            if not pdf_bytes:
                logger.warning(f"Could not retrieve PDF for dokument {dokument.id}")
                results.append(_failure_entry(dokument.id, "Could not retrieve PDF content"))
                failed += 1
                continue

            extraction_result = run_extraction(
                pdf_bytes=pdf_bytes,
                pdf_id=dokument.dokumentReferenz or f"dok_{dokument.id}",
                dokument_id=dokument.id
            )
            extraction_result["dokument_id"] = dokument.id
            results.append(extraction_result)

            for total_key, field in counted_fields:
                totals[total_key] += len(extraction_result.get(field, []))

            # Any reported extraction error marks the document as failed.
            if extraction_result.get("errors"):
                failed += 1
            else:
                successful += 1
        except Exception as e:
            logger.error(f"Error processing dokument {dokument.id}: {str(e)}", exc_info=True)
            results.append(_failure_entry(dokument.id, f"Processing error: {str(e)}"))
            failed += 1

    return {
        "results": results,
        "summary": {
            "total_documents": len(dokument_ids),
            "successful": successful,
            "failed": failed,
            "total_articles": totals["total_articles"],
            "total_zones": totals["total_zones"],
            "total_rules": totals["total_rules"]
        }
    }