"""
|
|
LangGraph-based pipeline for extracting structured content from BZO PDFs.
|
|
"""
|
|
|
|
import logging
|
|
import re
|
|
from typing import TypedDict, List, Dict, Any, Optional
|
|
from dataclasses import dataclass
|
|
from langgraph.graph import StateGraph, START, END
|
|
|
|
from modules.features.realEstate.bzoPdfExtractor import BZOPdfExtractor, TextBlock
|
|
from modules.features.realEstate.bzoRuleTaxonomy import RULE_TAXONOMY
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ===== BZO Params Extraction State (LangGraph with LLM) =====
|
|
|
|
class BZOParamsExtractionState(TypedDict):
    """State for BZO params extraction via LLM.

    Fields above the "Output" marker are supplied by the caller before the
    graph runs; the output fields are filled in by the graph's nodes.
    """
    # --- Inputs ---
    extracted_content: Dict[str, Any]            # structured content from the PDF extraction pipeline
    bauzone: str                                 # target building-zone code, e.g. "W2/30"
    total_area_m2: Optional[float]               # parcel area in m2, if known
    relevant_rules: List[Dict[str, Any]]         # parsed rule dicts relevant to the bauzone
    relevant_articles: List[Dict[str, Any]]      # article dicts relevant to the bauzone
    zone_parameter_tables: List[Dict[str, Any]]  # structured zone/parameter table dicts
    ai_service: Any                              # presumably the LLM client used downstream — TODO confirm
    gemeinde: str                                # municipality name
    # Output
    bauzone_params_list: List[str]
    fakten: List[Dict[str, str]]                 # parameter facts for display
    zusatzinformationen: List[Dict[str, Any]]    # supplementary article excerpts
    errors: List[str]                            # accumulated error messages
|
|
|
|
|
|
# ===== State Definition =====
|
|
|
|
@dataclass
class ClassifiedBlock:
    """Classified text block.

    NOTE: classify_and_assemble() actually passes plain dicts of this shape
    through the LangGraph state (for serialization); this dataclass mirrors
    that shape.
    """
    block: TextBlock                     # underlying extracted PDF block
    block_type: str                      # "article", "heading", "table", "other"
    article_label: Optional[str] = None  # e.g. "Art. 12" when block_type == "article"
    article_title: Optional[str] = None  # title text following the article number
|
|
|
|
|
|
@dataclass
class Article:
    """Assembled article: one "Art. N" block plus its continuation blocks.

    NOTE: classify_and_assemble() emits plain dicts of this shape.
    """
    article_label: str                     # e.g. "Art. 12"
    article_title: Optional[str]           # title after the number, if detected
    text: str                              # full concatenated article text
    page_start: int                        # page of the first block
    page_end: int                          # page of the last appended block
    section_level_1: Optional[str] = None  # "A."-style section heading context
    section_level_2: Optional[str] = None  # "IV."-style subsection
    section_level_3: Optional[str] = None  # "3."-style sub-subsection
    zone_raw: Optional[str] = None         # associated zone code (left None at assembly)
|
|
|
|
|
|
@dataclass
class ZoneInfo:
    """Zone information parsed from a zone code such as "W2/30G"."""
    zone_code: str                               # full code, upper-cased (e.g. "W2/30G")
    zone_name: str                               # display name, e.g. "Zone W2/30G"
    zone_category: Optional[str] = None          # e.g. "Wohnzonen", "Zentrumszonen", "Arbeitsplatzzonen"
    zone_subcategory: Optional[str] = None       # not populated by this module
    empfindlichkeitsstufe: Optional[str] = None  # sensitivity level; not populated by this module
    geschosszahl: Optional[int] = None           # storey count parsed from the code (W2 -> 2)
    gewerbeerleichterung: bool = False           # True when the code carries a trailing "G"
|
|
|
|
|
|
@dataclass
class RuleCandidate:
    """Rule candidate from pattern matching.

    NOTE: extract_rules() emits plain dicts of this shape.
    """
    rule_type: str                # key into RULE_TAXONOMY
    matched_text: str             # exact text matched by the rule pattern
    article_text: str             # full text of the source article
    page: int                     # page of the source article's first block
    is_table_rule: bool = False   # True when the candidate came from a table
    # Annotation corrected to Optional: the default is None, not a list
    # (a mutable [] default would be shared across instances).
    table_zones: Optional[List[str]] = None
    condition_text: Optional[str] = None  # spatial/conditional qualifier, if any
|
|
|
|
|
|
@dataclass
class ParsedRule:
    """Parsed rule with structured values.

    NOTE: extract_rules() emits plain dicts of this shape into the state.
    """
    rule_type: str                  # key into RULE_TAXONOMY
    value_numeric: Optional[float]  # numeric value if one was parsed (int for "integer" rules)
    value_text: str                 # raw matched text
    unit: Optional[str]             # normalized unit: "m" or "%" (or None)
    condition_text: Optional[str]   # spatial/conditional qualifier, if any
    is_table_rule: bool             # True when the rule came from a table
    table_zones: List[str]          # zone codes mentioned in the source article
    page: int                       # page of the source article
    text_snippet: str               # snippet used for display / zone matching
    zone_raw: Optional[str] = None  # first associated zone code, if any
    rule_scope: str = "general"     # "zone" when tied to a zone, else "general"
    confidence: float = 0.5         # 0.5 base, 0.8 with value, 0.9 with value+unit
|
|
|
|
|
|
class BZOExtractionState(TypedDict):
    """State for BZO extraction pipeline.

    Intermediate results are kept as plain dicts (not dataclasses) so the
    LangGraph state stays serializable; nodes append failure messages to
    ``errors`` instead of raising.
    """
    # Input metadata
    dokument_id: Optional[str]
    pdf_id: str

    # Extracted text blocks (stored as dicts for serialization)
    text_blocks: List[Dict[str, Any]]        # keys: page, text, block_id, optional bbox

    # Classified blocks (stored as dicts for serialization)
    classified_blocks: List[Dict[str, Any]]  # keys: block, block_type, article_label, article_title

    # Assembled articles (stored as dicts for serialization)
    articles: List[Dict[str, Any]]

    # Zone tracking
    current_zones: Dict[str, Dict[str, Any]]  # zone_code -> zone info (see extract_zones_and_tables)
    zones: List[Dict[str, Any]]               # every zone occurrence found, with source article/page

    # Rule extraction (stored as dicts for serialization)
    rule_candidates: List[Dict[str, Any]]
    parsed_rules: List[Dict[str, Any]]

    # Zone-parameter tables (structured table data mapping zones to parameters)
    zone_parameter_tables: List[Dict[str, Any]]

    # Processing metadata
    errors: List[str]    # node-level error messages (nodes append instead of raising)
    warnings: List[str]
|
|
|
|
|
|
# ===== Node Implementations (Simplified 4-node pipeline) =====
|
|
|
|
def classify_and_assemble(state: BZOExtractionState) -> BZOExtractionState:
    """Classify text blocks and assemble into articles (merged node).

    Pass 1 tags every non-empty block as "article", "heading", "table" or
    "other" and stores the result in state["classified_blocks"]. Pass 2 walks
    the classified blocks in order, tracking section-heading context
    (A. / IV. / 1.), starts a new article at each article block and appends
    every following non-article block's text to the open article, writing the
    result to state["articles"]. On any exception the error is appended to
    state["errors"] and the state is returned as-is.
    """
    try:
        classified = []
        for block_dict in state["text_blocks"]:
            text = block_dict["text"].strip()
            if not text:
                continue
            block_type = "other"
            article_label = None
            article_title = None
            # NOTE(review): re.search finds "Art. N" *anywhere* in the block,
            # so a block that merely cites another article is classified as an
            # article start — confirm this is intended (re.match would anchor).
            article_match = re.search(r'Art\.?\s*(\d+[a-z]?)', text, re.IGNORECASE)
            if article_match:
                block_type = "article"
                article_label = f"Art. {article_match.group(1)}"
                # Title = text after the number, up to the first ".", EOL or newline.
                title_match = re.search(r'Art\.?\s*\d+[a-z]?\s+(.+?)(?:\.|$|\n)', text, re.IGNORECASE)
                if title_match:
                    article_title = title_match.group(1).strip()
            elif re.match(r'^[A-Z]\.\s+[A-Z]', text) or re.match(r'^[IVX]+\.\s+[A-Z]', text) or re.match(r'^\d+\.\s+[A-Z]', text):
                # Section headings: "A. ...", "IV. ...", "3. ..."
                block_type = "heading"
            elif '\t' in text or (len(text.split()) > 5 and text.count(' ') > 2):
                # NOTE(review): any text with >5 words necessarily contains >2
                # single spaces, so this marks every longer free-text block as
                # "table". Possibly meant to count double spaces ("  ") — confirm.
                block_type = "table"
            classified.append({
                "block": {"page": block_dict["page"], "text": block_dict["text"], "block_id": block_dict["block_id"], "bbox": block_dict.get("bbox")},
                "block_type": block_type, "article_label": article_label, "article_title": article_title
            })
        state["classified_blocks"] = classified

        articles = []
        current_article = None
        # Heading context carried onto each article started afterwards.
        current_section_1 = current_section_2 = current_section_3 = None
        for classified_dict in classified:
            block_dict = classified_dict["block"]
            text = block_dict["text"].strip()
            block_type = classified_dict["block_type"]
            article_label = classified_dict.get("article_label")
            article_title = classified_dict.get("article_title")
            if block_type == "heading":
                # A letter heading resets the two lower levels; a roman heading
                # resets only the numeric level.
                if re.match(r'^[A-Z]\.\s+', text):
                    current_section_1 = text.split('.', 1)[0] + '.'
                    current_section_2 = current_section_3 = None
                elif re.match(r'^[IVX]+\.\s+', text):
                    current_section_2 = text.split('.', 1)[0] + '.'
                    current_section_3 = None
                elif re.match(r'^\d+\.\s+', text):
                    current_section_3 = text.split('.', 1)[0] + '.'
            if article_label:
                # New article starts: flush the previous one first.
                if current_article:
                    articles.append(current_article)
                current_article = {
                    "article_label": article_label, "article_title": article_title, "text": text,
                    "page_start": block_dict["page"], "page_end": block_dict["page"],
                    "section_level_1": current_section_1, "section_level_2": current_section_2,
                    "section_level_3": current_section_3, "zone_raw": None
                }
            elif current_article:
                # Non-article blocks (tables, plain text) continue the open article.
                current_article["text"] += "\n" + text
                current_article["page_end"] = block_dict["page"]
        if current_article:
            articles.append(current_article)
        state["articles"] = articles
        return state
    except Exception as e:
        logger.error(f"Error in classify_and_assemble: {e}", exc_info=True)
        state["errors"] = state.get("errors", []) + [f"Classify/assemble error: {str(e)}"]
        return state
|
|
|
|
|
|
def extract_zones_and_tables(state: "BZOExtractionState") -> "BZOExtractionState":
    """Detect zone declarations and extract zone-parameter tables (merged node).

    Part 1 scans every assembled article for zone codes ("Wohnzone W2",
    bare "W2/30G", ...), derives storey count / category / Gewerbeerleichterung
    from the code and records them in state["zones"] (one entry per match)
    and state["current_zones"] (deduplicated by code). Part 2 delegates to
    _extract_zone_parameter_tables_impl. Errors are appended to
    state["errors"] instead of raised.
    """
    try:
        # Part 1: Detect zone declarations
        zones = []
        current_zones = {}
        for article_dict in state["articles"]:
            text = article_dict.get("text", "")
            article_label = article_dict.get("article_label", "")
            page_start = article_dict.get("page_start", 0)

            # Pattern 1 requires an explicit zone word; pattern 2 is a bare
            # code like W2/30, W2/30G, Z3, K3/4. Pattern 2 may re-match codes
            # already found by pattern 1: `current_zones` dedups by code while
            # the `zones` list keeps every occurrence.
            zone_patterns = [
                r'(?:Wohnzone|Zone|Gewerbezone|Industriezone|Zentrumszone|Ortsbildschutzzone|Erholungszone)\s+([A-Z0-9/]+)',
                r'([A-Z]\d+(?:/\d+)?(?:G)?)',  # W2/30, W2/30G, Z3, K3/4
            ]

            for pattern in zone_patterns:
                for match in re.finditer(pattern, text, re.IGNORECASE):
                    zone_code = match.group(1).upper()

                    # Trailing "G" marks Gewerbeerleichterung (commercial relief).
                    gewerbeerleichterung = zone_code.endswith('G')
                    zone_code_base = zone_code[:-1] if gewerbeerleichterung else zone_code

                    # Storey count from the code, e.g. W2 -> 2, W3/50 -> 3.
                    # (First digits before any "/", which is what the original
                    # two-branch version computed in both branches.)
                    geschosszahl = None
                    geschosszahl_match = re.search(r'(\d+)', zone_code_base.split('/')[0])
                    if geschosszahl_match:
                        geschosszahl = int(geschosszahl_match.group(1))

                    # Determine zone category from surrounding text or code prefix.
                    zone_category = None
                    if 'Wohnzone' in text or zone_code.startswith('W'):
                        zone_category = "Wohnzonen"
                    elif 'Zentrumszone' in text or zone_code.startswith('Z'):
                        zone_category = "Zentrumszonen"
                    elif 'Gewerbezone' in text or zone_code.startswith('G'):
                        zone_category = "Arbeitsplatzzonen"
                    elif 'Industriezone' in text or zone_code.startswith('I'):
                        zone_category = "Arbeitsplatzzonen"

                    # BUG FIX: the original stored ZoneInfo dataclass instances
                    # in current_zones, violating the declared state type
                    # Dict[str, Dict[str, Any]] ("stored as dicts for
                    # serialization"). Store plain dicts with the same fields.
                    zone_entry = {
                        "zone_code": zone_code,
                        "zone_name": f"Zone {zone_code}",
                        "zone_category": zone_category,
                        "zone_subcategory": None,
                        "empfindlichkeitsstufe": None,
                        "geschosszahl": geschosszahl,
                        "gewerbeerleichterung": gewerbeerleichterung,
                    }
                    current_zones[zone_code] = zone_entry
                    zones.append({
                        "zone_code": zone_code,
                        "zone_name": zone_entry["zone_name"],
                        "zone_category": zone_category,
                        "geschosszahl": geschosszahl,
                        "gewerbeerleichterung": gewerbeerleichterung,
                        "source_article": article_label,
                        "page": page_start
                    })

        state["current_zones"] = current_zones
        state["zones"] = zones

        # Part 2: Extract zone-parameter tables
        _extract_zone_parameter_tables_impl(state)
        return state
    except Exception as e:
        logger.error(f"Error in extract_zones_and_tables: {e}", exc_info=True)
        state["errors"] = state.get("errors", []) + [f"Zones/tables error: {str(e)}"]
        return state
|
|
|
|
|
|
def _extract_zone_parameter_tables_impl(state: BZOExtractionState) -> None:
|
|
"""Extract zone-parameter tables from classified blocks. Mutates state in place."""
|
|
tables = []
|
|
table_blocks = [b for b in state.get("classified_blocks", []) if b.get("block_type") == "table"]
|
|
zone_pattern = r'\b([WLIZK]\d+(?:/\d+)?(?:G\*?)?)\b'
|
|
parameter_keywords = [
|
|
r'Ausnützungsziffer', r'Überbauungsziffer', r'Vollgeschosse', r'Dachgeschosse', r'Attikageschoss', r'Untergeschoss',
|
|
r'Gebäudelänge', r'Grenzabstand', r'Fassadenhöhen', r'Grundabstand', r'Mehrlängen', r'Höchstmass'
|
|
]
|
|
parameter_row_patterns = [
|
|
r'^[a-g]\)\s+(.+?)(?:\s+max\.|min\.|:)?',
|
|
r'^(Ausnützungsziffer|Überbauungsziffer|Vollgeschosse|Dachgeschosse|Attikageschoss|Untergeschoss|Gebäudelänge|Grenzabstand|Fassadenhöhen|Grundabstand|Mehrlängen|Höchstmass|Höchstmaß)',
|
|
]
|
|
subparameter_patterns = [
|
|
r'^(Grundabstand|Mehrlängen|Höchstmass|Höchstmaß|Fassadenhöhen)\s*(min\.|max\.)?',
|
|
r'^(anrechenbare\s+Dachgeschosse|anrechenbares\s+Attikageschoss|anrechenbares\s+Untergeschoss)',
|
|
]
|
|
numeric_pattern = r'(\d+(?:\.\d+)?)\s*(%|m|Geschoss|Geschosse|Geschosse\s+max\.?|Geschoss\s+max\.?)?'
|
|
for table_block in table_blocks:
|
|
block_dict = table_block.get("block", {})
|
|
text = block_dict.get("text", "")
|
|
page = block_dict.get("page", 0)
|
|
if not text or len(text.strip()) < 20:
|
|
continue
|
|
lines = text.split('\n')
|
|
header_row_idx, zone_columns = None, []
|
|
for idx, line in enumerate(lines):
|
|
zone_matches = re.findall(zone_pattern, line, re.IGNORECASE)
|
|
if len(zone_matches) >= 3:
|
|
header_row_idx, zone_columns = idx, zone_matches
|
|
break
|
|
if not zone_columns:
|
|
has_parameters = any(re.search(kw, text, re.IGNORECASE) for kw in parameter_keywords)
|
|
has_zones = len(re.findall(zone_pattern, text, re.IGNORECASE)) >= 3
|
|
if has_parameters and has_zones:
|
|
zone_columns = list(dict.fromkeys(re.findall(zone_pattern, text, re.IGNORECASE)))
|
|
header_row_idx = 0
|
|
if not zone_columns:
|
|
continue
|
|
article_context = None
|
|
for block in state.get("classified_blocks", []):
|
|
if block.get("block", {}).get("page") == page and block.get("article_label"):
|
|
article_context = block.get("article_label")
|
|
break
|
|
table_data = {"page": page, "zones": zone_columns, "parameters": [], "source_text": text[:500], "article": article_context}
|
|
start_idx = (header_row_idx + 1) if header_row_idx is not None else 0
|
|
current_parameter = current_subparameter = None
|
|
parameter_values = subparameter_values = {}
|
|
for line_idx in range(start_idx, len(lines)):
|
|
line = lines[line_idx].strip()
|
|
if not line:
|
|
continue
|
|
is_parameter_row, parameter_name = False, None
|
|
for pat in parameter_row_patterns:
|
|
m = re.match(pat, line, re.IGNORECASE)
|
|
if m:
|
|
is_parameter_row, parameter_name = True, re.sub(r'\s+max\.?\s*$', '', re.sub(r'\s+min\.?\s*$', '', m.group(1).strip(), flags=re.I), flags=re.I)
|
|
break
|
|
is_subparameter, subparameter_name = False, None
|
|
if not is_parameter_row:
|
|
for pat in subparameter_patterns:
|
|
m = re.search(pat, line, re.IGNORECASE)
|
|
if m:
|
|
is_subparameter, subparameter_name = True, m.group(1).strip() + (f" {m.group(2).strip()}" if m.lastindex and m.lastindex >= 2 and m.group(2) else "")
|
|
break
|
|
target_values = subparameter_values if current_subparameter else parameter_values
|
|
if is_parameter_row and parameter_name:
|
|
if current_parameter and parameter_values:
|
|
table_data["parameters"].append({"parameter": current_parameter, "values_by_zone": parameter_values.copy(), "article": article_context})
|
|
current_parameter, current_subparameter, parameter_values, subparameter_values = parameter_name, None, {}, {}
|
|
continue
|
|
if is_subparameter and subparameter_name:
|
|
if current_subparameter and subparameter_values and current_parameter:
|
|
table_data["parameters"].append({"parameter": f"{current_parameter} - {current_subparameter}", "values_by_zone": subparameter_values.copy(), "article": article_context})
|
|
current_subparameter, subparameter_values = subparameter_name, {}
|
|
continue
|
|
if current_parameter or current_subparameter:
|
|
line_parts = re.split(r'\s{2,}|\t', line)
|
|
line_parts = [p.strip() for p in line_parts if p.strip()]
|
|
n = len(zone_columns)
|
|
value_parts = []
|
|
# Column-based: extract trailing numeric/fraction parts that align with zone count
|
|
for p in reversed(line_parts):
|
|
if re.match(r'^\d+(?:\.\d+)?\s*(%|m)?$', p, re.I) or re.match(r'^\d+/\d+$', p):
|
|
val = re.sub(r'\s*(%|m)$', '', p, flags=re.I).strip()
|
|
unit = None
|
|
um = re.search(r'\s*(%|m)$', p, re.I)
|
|
if um:
|
|
unit = 'm' if um.group(1).lower() == 'm' else '%'
|
|
value_parts.insert(0, (val, unit))
|
|
else:
|
|
break
|
|
if len(value_parts) == n:
|
|
for zi, zone in enumerate(zone_columns):
|
|
if zone not in target_values:
|
|
target_values[zone] = []
|
|
val, unit = value_parts[zi]
|
|
target_values[zone].append({"value": val, "unit": unit, "raw_text": line[:200], "line_number": line_idx})
|
|
else:
|
|
# Fallback: regex match by character position
|
|
all_matches = [(m.start(), m.group(0), m.group(1), m.group(2) if m.lastindex and m.lastindex > 1 else None) for m in re.finditer(numeric_pattern, line, re.I)]
|
|
all_matches += [(m.start(), m.group(0), m.group(0), None) for m in re.finditer(r'(\d+/\d+)', line, re.I)]
|
|
all_matches.sort(key=lambda x: x[0])
|
|
if len(all_matches) == n:
|
|
for zi, zone in enumerate(zone_columns):
|
|
if zone not in target_values:
|
|
target_values[zone] = []
|
|
_, _, val, unit = all_matches[zi]
|
|
target_values[zone].append({"value": val, "unit": unit.strip() if unit else None, "raw_text": line[:200], "line_number": line_idx})
|
|
if current_subparameter and subparameter_values and current_parameter:
|
|
table_data["parameters"].append({"parameter": f"{current_parameter} - {current_subparameter}", "values_by_zone": subparameter_values.copy(), "article": article_context})
|
|
if current_parameter and parameter_values:
|
|
table_data["parameters"].append({"parameter": current_parameter, "values_by_zone": parameter_values.copy(), "article": article_context})
|
|
if table_data["parameters"]:
|
|
tables.append(table_data)
|
|
state["zone_parameter_tables"] = state.get("zone_parameter_tables", []) + tables
|
|
if tables:
|
|
logger.info(f"Extracted {len(tables)} zone-parameter tables")
|
|
|
|
|
|
# Zone code pattern: W5, W2/30, Z3, K3/4, W5G, W 5 (optional space)
|
|
_ZONE_CODE_PATTERN = re.compile(r'\b([WZIK]\s*\d+(?:\s*/\s*\d+)?(?:G)?)\b', re.IGNORECASE)
|
|
|
|
|
|
def _zones_in_text(text: str) -> List[str]:
|
|
"""Extract zone codes (W5, W2/30, Z3, etc.) from text. Returns unique list, normalized (e.g. W5)."""
|
|
matches = _ZONE_CODE_PATTERN.findall(text)
|
|
seen = set()
|
|
result = []
|
|
for m in matches:
|
|
# Normalize: remove spaces -> W5, W2/30
|
|
n = re.sub(r'\s+', '', m).upper()
|
|
if n and n not in seen:
|
|
seen.add(n)
|
|
result.append(n)
|
|
return result
|
|
|
|
|
|
def extract_rules(state: BZOExtractionState) -> BZOExtractionState:
    """Detect rule candidates and parse values. Associates each rule with zones from its source article.

    Phase 1 runs every RULE_TAXONOMY pattern over each article's text and
    collects candidate dicts (including a +/-100-char context window used to
    pick up a spatial/conditional qualifier). Phase 2 parses a numeric value
    and unit out of each candidate, assigns a heuristic confidence
    (0.5 base / 0.8 with value / 0.9 with value+unit) and writes the results
    to state["parsed_rules"]. Errors are appended to state["errors"].

    NOTE(review): the intermediate `candidates` list is never persisted to
    state["rule_candidates"], although the state declares that field.
    """
    try:
        candidates = []
        for article_dict in state["articles"]:
            text = article_dict.get("text", "")
            page_start = article_dict.get("page_start", 0)
            # Zones mentioned in THIS article - rules from this article apply to these zones
            article_zones = _zones_in_text(text)
            for rule_type, rule_config in RULE_TAXONOMY.items():
                for pattern in rule_config.get("patterns", []):
                    for match in re.finditer(pattern, text, re.IGNORECASE):
                        # +/-100 chars of context around the match, clamped to the text.
                        start, end = max(0, match.start() - 100), min(len(text), match.end() + 100)
                        context = text[start:end]
                        # First matching qualifier wins: directional ("nördlich ...")
                        # before generic ("für/bei/in ...").
                        condition_text = None
                        for cond_pat in [r'(?:nördlich|südlich|östlich|westlich|oberhalb|unterhalb)\s+[^,\.]+', r'(?:für|bei|in)\s+[^,\.]+']:
                            cm = re.search(cond_pat, context, re.IGNORECASE)
                            if cm:
                                condition_text = cm.group(0)
                                break
                        candidates.append({
                            "rule_type": rule_type, "matched_text": match.group(0), "article_text": text,
                            "page": page_start, "article_label": article_dict.get("article_label"),
                            "condition_text": condition_text, "is_table_rule": False,
                            "table_zones": article_zones.copy(),
                        })
        parsed_rules = []
        for candidate_dict in candidates:
            rule_type = candidate_dict["rule_type"]
            rule_config = RULE_TAXONOMY.get(rule_type, {})
            units = rule_config.get("units", [])  # (unused here)
            value_type = rule_config.get("value_type", "numeric")

            # Extract value using regex
            matched_text = candidate_dict["matched_text"]
            article_text = candidate_dict["article_text"]
            # NOTE(review): the slice below starts AT the match position, so
            # matched_text appears twice in the search window; harmless for the
            # value regexes below but worth confirming.
            text = matched_text + " " + article_text[article_text.find(matched_text):article_text.find(matched_text) + 200]

            value_numeric = None
            value_text = matched_text
            unit = None

            # Try to extract numeric value
            if value_type in ["numeric", "integer"]:
                # Pattern: "max. 4", "30 %", "min. 3.5 m"
                # NOTE(review): the bare-digits fallback pattern can pick up
                # unrelated numbers (e.g. article numbers) — confirm acceptable.
                value_patterns = [
                    r'(?:max|maximal|min|mindestens|höchstens)\s*\.?\s*(\d+(?:\.\d+)?)',
                    r'(\d+(?:\.\d+)?)\s*(%|m|meter|metern|prozent)',
                    r'(\d+(?:\.\d+)?)',
                ]

                for pattern in value_patterns:
                    match = re.search(pattern, text, re.IGNORECASE)
                    if match:
                        try:
                            value_numeric = float(match.group(1))
                            if value_type == "integer":
                                value_numeric = int(value_numeric)

                            # Check for unit (normalized to "m" / "%")
                            unit_match = re.search(r'(\d+(?:\.\d+)?)\s*(%|m|meter|metern|prozent)', text, re.IGNORECASE)
                            if unit_match:
                                unit = unit_match.group(2).lower()
                                if unit in ["meter", "metern"]:
                                    unit = "m"
                                elif unit == "prozent":
                                    unit = "%"

                            break
                        except ValueError:
                            continue

            # Calculate confidence
            confidence = 0.5
            if value_numeric is not None:
                confidence = 0.8
                if unit:
                    confidence = 0.9

            # Zone association from source article (zones mentioned in that article)
            article_zones = candidate_dict.get("table_zones", [])
            zone_raw = article_zones[0] if article_zones else None
            rule_scope = "zone" if zone_raw else "general"

            parsed_rule = {
                "rule_type": rule_type,
                "value_numeric": value_numeric,
                "value_text": value_text,
                "unit": unit,
                "condition_text": candidate_dict.get("condition_text"),
                "is_table_rule": candidate_dict.get("is_table_rule", False),
                "table_zones": article_zones,
                "page": candidate_dict["page"],
                "article_label": candidate_dict.get("article_label"),
                "text_snippet": value_text,
                "zone_raw": zone_raw,
                "rule_scope": rule_scope,
                "confidence": confidence
            }
            parsed_rules.append(parsed_rule)
        state["parsed_rules"] = parsed_rules
        return state
    except Exception as e:
        logger.error(f"Error in extract_rules: {e}", exc_info=True)
        state["errors"] = state.get("errors", []) + [f"Extract rules error: {str(e)}"]
        return state
|
|
|
|
|
|
# ===== Wohnzone Parameter Extraction =====
|
|
|
|
# Canonical order for BZO parameters (Fakten). Matched by case-insensitive
# substring in _bzo_param_sort_key(); earlier entries sort first, unmatched
# names sort last (key 99).
BZO_PARAM_ORDER = [
    "vollgeschosse", "vollgeschoss",
    "anrechenbares untergeschoss", "untergeschoss",
    "anrechenbares dachgeschoss", "dachgeschoss", "attikageschoss",
    "ausnützungsziffer", "ausnutzungsziffer", "az",
    "überbauungsziffer",
    "gebäudehöhe", "fassadenhöhen",
    "grundabstand", "grenzabstand",
    "gebäudelänge",
    "mehrlängen", "höchstmass",
    "baumassenziffer", "grünflächenziffer", "wohnflächenanteil", "gebäudebreite",
]

# Display names for rule types (used by _bzo_get_params_from_rules).
RULE_TYPE_TO_PARAM: Dict[str, str] = {
    "max_building_height": "Gebäudehöhe max.",
    "max_floors": "Vollgeschosse max.",
    "max_attachable_attics": "anrechenbares Dachgeschoss max.",
    "max_attachable_basement": "anrechenbares Untergeschoss max.",
    "density": "Ausnützungsziffer",
    "building_coverage": "Überbauungsziffer",
    "building_mass_index": "Baumassenziffer (BMZ)",
    "green_space_index": "Grünflächenziffer (GFZ)",
    "boundary_distance": "Grundabstand min.",
    "boundary_distance_length_surcharge": "Mehrlängen-zuschlag (MLZ)",
    "boundary_distance_max": "Höchstmass Grenzabstand max.",
    "building_length": "Gebäudelänge max.",
    "building_width": "Gebäudebreite max.",
    "residential_area_share": "Wohnflächenanteil",
}

# Fallback unit per rule type when the rule itself carries none
# ("Stk." = count, "" = unitless ratio/factor).
RULE_TYPE_TO_DEFAULT_UNIT: Dict[str, str] = {
    "max_building_height": "m",
    "max_floors": "Stk.",
    "max_attachable_attics": "Stk.",
    "max_attachable_basement": "Stk.",
    "density": "%",
    "building_coverage": "%",
    "building_mass_index": "",
    "green_space_index": "%",
    "boundary_distance": "m",
    "boundary_distance_length_surcharge": "",
    "boundary_distance_max": "m",
    "building_length": "m",
    "building_width": "m",
    "residential_area_share": "%",
}

# Regex fragments (applied case-insensitively) that mark an article as
# potentially relevant for "Weiterführende Bestimmungen".
_ARTIKEL_KEYWORDS = [
    r"herabsetzung", r"grenzabstand", r"nutzweise", r"wohnanteil",
    r"besondere\s+gebäude", r"überbauungsziffer", r"sonderregel",
    r"ausnahmen", r"abweichungen", r"erleichterungen",
    r"mischung", r"gewerbe", r"dienstleistung",
    r"kantonale", r"abstandsvorschriften",
    r"vollgeschoss", r"reduziert", r"mindestmass",
    r"störend", r"nicht\s+störend", r"mässig\s+störend",
]

# Artikel that are parameter tables - EXCLUDE from Weiterführende Bestimmungen
_ZUSATZ_EXCLUDE_TITLES = ("zonen", "grundmasse", "mehrlängenzuschlag", "mehrlaengenzuschlag")

# Artikel that are substantive provisions - INCLUDE in Weiterführende Bestimmungen
_ZUSATZ_INCLUDE_TITLES = (
    "herabsetzung", "nutzweise", "besondere", "besonderes",
    "ausnahmen", "abweichungen", "erleichterungen", "sonderregel",
    "wohnanteil", "nutzungsart", "abstandsvorschriften",
    "mischung", "gewerbe", "dienstleistung",
)
|
|
|
|
|
|
def _format_article_text_readable(text: str, article_label: str = "", article_title: str = "") -> str:
|
|
"""Format raw PDF-extracted text for readable display."""
|
|
if not text or not text.strip():
|
|
return ""
|
|
# Strip redundant article header at start (e.g. "Art. 16 Nutzweise" when already in summary)
|
|
if article_label or article_title:
|
|
prefix = f"{article_label} {article_title}".strip()
|
|
if prefix:
|
|
pat = re.escape(prefix)
|
|
text = re.sub(rf"^{pat}\s*", "", text.strip(), flags=re.I).lstrip()
|
|
lines = []
|
|
for line in text.split("\n"):
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
lines.append(line)
|
|
if not lines:
|
|
return ""
|
|
# Join hyphenated word breaks (e.g. "Gewerbe-\nund" -> "Gewerbe und")
|
|
merged = []
|
|
i = 0
|
|
while i < len(lines):
|
|
line = lines[i]
|
|
while line.rstrip().endswith("-") and i + 1 < len(lines):
|
|
line = line.rstrip()[:-1] + lines[i + 1].strip()
|
|
i += 1
|
|
if re.match(r"^\d{1,2}\s*$", line) and i + 1 < len(lines):
|
|
next_line = lines[i + 1]
|
|
if not re.match(r"^Art\.\s", next_line) and len(next_line) > 3:
|
|
line = line + " " + next_line.strip()
|
|
i += 1
|
|
elif re.match(r"^\d{1,2}\s*$", line) and i + 1 < len(lines) and re.match(r"^Art\.\s", lines[i + 1]):
|
|
i += 1
|
|
continue
|
|
merged.append(line)
|
|
i += 1
|
|
combined = " ".join(merged)
|
|
# Fix run-together paragraph numbers: "1In" -> "1. In", "2Ist" -> "2. Ist"
|
|
combined = re.sub(r"(\d)([A-ZÄÖÜ])", r"\1. \2", combined)
|
|
# Also fix "a)Something" -> "a) Something" for subparagraphs
|
|
combined = re.sub(r"([a-z]\))([A-ZÄÖÜ])", r"\1 \2", combined)
|
|
# Split into paragraphs: numbered (1. ..., 2. ...) or lettered (a) ..., b) ...)
|
|
parts = re.split(r"(?=\d+\.\s+[A-ZÄÖÜa-zäöü])|(?=[a-z]\)\s+[A-ZÄÖÜa-zäöü])", combined)
|
|
paragraphs = []
|
|
for p in parts:
|
|
p = p.strip()
|
|
if not p or len(p) < 3:
|
|
continue
|
|
paragraphs.append(p)
|
|
return "\n\n".join(paragraphs)
|
|
|
|
|
|
def _is_zusatzinfo_article(title: str) -> bool:
    """True if the article belongs in "Weiterführende Bestimmungen".

    Parameter-table articles (titles containing a _ZUSATZ_EXCLUDE_TITLES
    word) are rejected first; otherwise the title must contain one of the
    substantive-provision words in _ZUSATZ_INCLUDE_TITLES.
    """
    normalized = (title or "").lower().strip()
    if any(word in normalized for word in _ZUSATZ_EXCLUDE_TITLES):
        return False
    return any(word in normalized for word in _ZUSATZ_INCLUDE_TITLES)
|
|
|
|
|
|
def _bzo_build_source(page: Optional[int], article: Optional[str]) -> str:
|
|
"""Build source string: Art. X, S. Y"""
|
|
parts = []
|
|
if article:
|
|
parts.append(str(article))
|
|
if page is not None and page > 0:
|
|
parts.append(f"S. {page}")
|
|
return ", ".join(parts) if parts else ""
|
|
|
|
|
|
def _bzo_zone_matches_table(bauzone: str, zone_col: str) -> bool:
|
|
"""Check if table column zone matches target bauzone."""
|
|
b = (bauzone or "").upper().strip()
|
|
z = (zone_col or "").upper().strip()
|
|
if not b or not z:
|
|
return False
|
|
return b in z or (len(z) >= 2 and z in b)
|
|
|
|
|
|
def _bzo_article_mentions_bauzone(article_text: str, bauzone: str) -> bool:
|
|
"""Check if article text mentions the bauzone or applies to it."""
|
|
if not bauzone or not article_text:
|
|
return False
|
|
b = bauzone.upper().strip()
|
|
t = article_text.upper()
|
|
if b in t:
|
|
return True
|
|
if len(b) >= 2 and b[0] in "WZIK" and re.search(rf"\b{b[0]}\s*\d+", t):
|
|
base = re.sub(r"\s+", "", b.split("/")[0].rstrip("G"))
|
|
if base in t or re.search(rf"\b{base}\b", t):
|
|
return True
|
|
return False
|
|
|
|
|
|
def _bzo_get_params_from_tables(
    zone_parameter_tables: List[Dict[str, Any]],
    bauzone: str
) -> List[Dict[str, Any]]:
    """Collect parameter values for a Bauzone from zone-parameter tables.

    Skips tables whose zone columns don't match the bauzone, takes only the
    first value entry per matching zone column, deduplicates by
    (parameter, value, unit), and tags every result with rule_type=None.
    """
    collected: List[Dict[str, Any]] = []
    dedup = set()
    for table in zone_parameter_tables:
        if not any(_bzo_zone_matches_table(bauzone, str(z)) for z in table.get("zones", [])):
            continue
        table_page = table.get("page")
        table_article = table.get("article")
        for param in table.get("parameters", []):
            name = param.get("parameter", "")
            for zone, values in param.get("values_by_zone", {}).items():
                if not _bzo_zone_matches_table(bauzone, str(zone)):
                    continue
                if not isinstance(values, list) or not values:
                    continue
                first_entry = values[0]
                value = first_entry.get("value", "")
                unit = first_entry.get("unit") or ""
                dedup_key = f"{name}|{value}|{unit}"
                if dedup_key in dedup:
                    continue
                dedup.add(dedup_key)
                # Prefer the parameter's own article attribution over the table's.
                source = _bzo_build_source(table_page, param.get("article") or table_article)
                collected.append({
                    "parameter": name,
                    "value": str(value),
                    "unit": str(unit).strip() if unit else "",
                    "source": source or "Tabelle im Dokument",
                    "rule_type": None,
                })
    return collected
|
|
|
|
|
|
def _bzo_filter_rules_by_bauzone(rules: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]:
|
|
"""Filter rules by Bauzone code."""
|
|
bauzone_upper = (bauzone or "").upper()
|
|
out = []
|
|
for r in rules:
|
|
if bauzone_upper in (r.get("zone_raw") or "").upper():
|
|
out.append(r)
|
|
continue
|
|
for tz in (r.get("table_zones") or []):
|
|
if bauzone_upper in str(tz).upper():
|
|
out.append(r)
|
|
break
|
|
else:
|
|
if bauzone_upper in (r.get("text_snippet") or "").upper():
|
|
out.append(r)
|
|
return out
|
|
|
|
|
|
def _bzo_get_params_from_rules(rules: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Convert parsed rules to {parameter, value, unit, source, rule_type} dicts.

    - Numeric values are rendered without a trailing ".0" (3.0 -> "3").
    - Rules whose "value" is merely a parameter keyword (regex echo such as
      "Gebäudelänge") are dropped.
    - Missing units fall back to RULE_TYPE_TO_DEFAULT_UNIT.
    - Results are deduplicated by (parameter, value, unit).
    """
    result = []
    seen = set()
    for r in rules:
        rule_type = r.get("rule_type", "")
        param_name = RULE_TYPE_TO_PARAM.get(rule_type) or rule_type.replace("_", " ").title()
        value_numeric = r.get("value_numeric")
        value_text = r.get("value_text", "")
        unit = r.get("unit") or ""
        if value_numeric is not None:
            # Render whole floats without the decimal part.
            if isinstance(value_numeric, float) and value_numeric == int(value_numeric):
                val_str = str(int(value_numeric))
            else:
                val_str = str(value_numeric)
        else:
            val_str = str(value_text).strip() if value_text else ""
        if not val_str:
            continue
        # Drop matches where the "value" is just the parameter word itself.
        val_lower = val_str.lower()
        if val_lower in ("gebäudelänge", "gebäudebreite", "mehrlängenzuschlag", "mehrlängen", "grenzabstand", "fassadenhöhe"):
            continue
        unit_str = str(unit).strip() if unit else (RULE_TYPE_TO_DEFAULT_UNIT.get(rule_type, ""))
        page = r.get("page")
        article = r.get("article_label")
        # BUG FIX: the fallback source label was misspelled "Artikeltxt".
        source = _bzo_build_source(page, article) or "Artikeltext"
        key = f"{param_name}|{val_str}|{unit_str}"
        if key not in seen:
            seen.add(key)
            result.append({
                "parameter": param_name,
                "value": val_str,
                "unit": unit_str,
                "source": source,
                "rule_type": rule_type,
            })
    return result
|
|
|
|
|
|
def _bzo_param_to_rule_type(param_name: str) -> Optional[str]:
|
|
"""Map parameter display name to rule_type."""
|
|
p = (param_name or "").lower()
|
|
if "vollgeschoss" in p:
|
|
return "max_floors"
|
|
if "dachgeschoss" in p or "attika" in p:
|
|
return "max_attachable_attics"
|
|
if "untergeschoss" in p:
|
|
return "max_attachable_basement"
|
|
if "ausnützungsziffer" in p or "ausnutzungsziffer" in p or " az " in p:
|
|
return "density"
|
|
if "überbauungsziffer" in p or " uz " in p:
|
|
return "building_coverage"
|
|
if "baumassenziffer" in p or "bmz" in p:
|
|
return "building_mass_index"
|
|
if "grünflächen" in p or "gfz" in p:
|
|
return "green_space_index"
|
|
if "grenzabstand" in p or "grundabstand" in p:
|
|
return "boundary_distance"
|
|
if "mehrlängen" in p or "mlz" in p:
|
|
return "boundary_distance_length_surcharge"
|
|
if "höchstmass" in p:
|
|
return "boundary_distance_max"
|
|
if "gebäudelänge" in p:
|
|
return "building_length"
|
|
if "gebäudebreite" in p:
|
|
return "building_width"
|
|
if "fassadenhöhe" in p or "gebäudehöhe" in p:
|
|
return "max_building_height"
|
|
if "wohnflächenanteil" in p or "wohnanteil" in p:
|
|
return "residential_area_share"
|
|
return None
|
|
|
|
|
|
def _bzo_merge_rules(
|
|
from_tables: List[Dict[str, Any]],
|
|
from_rules: List[Dict[str, Any]],
|
|
) -> List[Dict[str, Any]]:
|
|
"""Merge table params and rule params. Tables take precedence."""
|
|
by_param_lower: Dict[str, Dict[str, Any]] = {}
|
|
for r in from_tables:
|
|
p = (r.get("parameter") or "").lower()
|
|
if p and p not in by_param_lower:
|
|
rr = r.copy()
|
|
if not rr.get("rule_type"):
|
|
rr["rule_type"] = _bzo_param_to_rule_type(rr.get("parameter", ""))
|
|
by_param_lower[p] = rr
|
|
for r in from_rules:
|
|
p = (r.get("parameter") or "").lower()
|
|
if p and p not in by_param_lower:
|
|
by_param_lower[p] = r.copy()
|
|
return list(by_param_lower.values())
|
|
|
|
|
|
def _bzo_param_sort_key(param_name: str) -> int:
    """Return the display rank of a parameter per BZO_PARAM_ORDER (99 = unknown).

    The rank is the index of the first BZO_PARAM_ORDER keyword contained in the
    lower-cased parameter name.
    """
    needle = (param_name or "").lower()
    return next(
        (rank for rank, keyword in enumerate(BZO_PARAM_ORDER) if keyword in needle),
        99,
    )
|
|
|
|
|
|
def _bzo_extract_zusatzinformationen(
    articles: List[Dict[str, Any]],
    bauzone: str = "",
    zone_parameter_tables: Optional[List[Dict[str, Any]]] = None,
) -> List[Dict[str, Any]]:
    """Collect article excerpts that are relevant to the given bauzone.

    An article qualifies when it matches at least one _ARTIKEL_KEYWORDS pattern
    and (if a bauzone is given) either mentions the zone or backs a matching
    zone-parameter table. Results are de-duplicated by (label, page) and sorted
    by page, then label; excerpt text is capped at 3500 characters.
    """
    keyword_patterns = [re.compile(kw, re.IGNORECASE) for kw in _ARTIKEL_KEYWORDS]

    # Articles that back a zone-parameter table for this zone are always kept.
    table_articles = set()
    if zone_parameter_tables and bauzone:
        for table in zone_parameter_tables:
            zones = table.get("zones", [])
            if any(_bzo_zone_matches_table(bauzone, str(z)) for z in zones):
                table_articles.add(table.get("article") or "")

    excerpts: List[Dict[str, Any]] = []
    seen_keys = set()
    for article in articles:
        label = article.get("article_label") or ""
        title = (article.get("article_title") or "").strip()
        body = (article.get("text") or "").strip()
        page = article.get("page_start") or article.get("page_end") or 0
        if not (label and body):
            continue
        dedup_key = f"{label}|{page}"
        if dedup_key in seen_keys:
            continue
        haystack = f"{title} {body}"
        if not any(pat.search(haystack) for pat in keyword_patterns):
            continue
        # Zone filter: keep when the article mentions the zone, or when it is
        # referenced by a matching parameter table.
        if bauzone and label not in table_articles and not _bzo_article_mentions_bauzone(haystack, bauzone):
            continue
        seen_keys.add(dedup_key)
        source = _bzo_build_source(page, label)
        excerpts.append({
            "article_label": label,
            "article_title": title,
            "text": body[:3500].strip(),
            "page": page,
            "source": source or "BZO-Dokument",
        })
    return sorted(excerpts, key=lambda x: (x.get("page", 0), x.get("article_label", "")))
|
|
|
|
|
|
def extract_wohnzone_params(
    extracted_content: Dict[str, Any],
    bauzone: str,
    relevant_rules: Optional[List[Dict[str, Any]]] = None,
    total_area_m2: Optional[float] = None,
) -> Dict[str, Any]:
    """Extract BZO parameters for a Wohnzone from extracted content (rule-based path).

    Args:
        extracted_content: Output of run_extraction (articles, rules, tables).
        bauzone: Zone code the parcel lies in (e.g. "W2").
        relevant_rules: Pre-filtered rules; when None, rules are filtered here.
        total_area_m2: Parcel area; added as a fact when positive.

    Returns:
        Dict with the bauzone, an ordered fakten list (item/value/source) and
        zusatzinformationen (relevant article excerpts).
    """
    articles = extracted_content.get("articles", [])
    tables = extracted_content.get("zone_parameter_tables", [])

    if relevant_rules is None:
        relevant_rules = _bzo_filter_rules_by_bauzone(extracted_content.get("rules", []), bauzone)

    # Table values take precedence over values mined from article text.
    merged_rules = _bzo_merge_rules(
        _bzo_get_params_from_tables(tables, bauzone),
        _bzo_get_params_from_rules(relevant_rules),
    )

    fakten: List[Dict[str, str]] = []
    if bauzone:
        fakten.append({"item": "Auswertung für Bauzone", "value": bauzone, "source": ""})
    if total_area_m2 is not None and total_area_m2 > 0:
        fakten.append({
            "item": "Grundstücksfläche",
            # Swiss thousands separator, e.g. 1'234 m²
            "value": f"{total_area_m2:,.0f} m²".replace(",", "'"),
            "source": "Parzellendaten",
        })

    for rule in sorted(merged_rules, key=lambda x: _bzo_param_sort_key(x.get("parameter", ""))):
        name = rule.get("parameter", "").strip()
        unit = (rule.get("unit") or "").strip()
        if not unit:
            # Fall back to the default unit for the rule type when none was parsed.
            rule_type = rule.get("rule_type") or _bzo_param_to_rule_type(name)
            if rule_type:
                unit = RULE_TYPE_TO_DEFAULT_UNIT.get(rule_type, "")
        rendered = f"{rule.get('value', '')}{(' ' + unit) if unit else ''}".strip()
        if name and rendered:
            fakten.append({
                "item": name,
                "value": rendered,
                "source": rule.get("source") or "BZO-Dokument",
            })

    return {
        "bauzone": bauzone,
        "fakten": fakten,
        "zusatzinformationen": _bzo_extract_zusatzinformationen(articles, bauzone, tables),
    }
|
|
|
|
|
|
# ===== LangGraph: LLM-based BZO Params Extraction =====
|
|
|
|
def _build_bauzone_context_for_llm(state: BZOParamsExtractionState) -> str:
    """Assemble the prompt context from the pre-filtered BZO content in *state*.

    Sections, in order: parcel area (if known), full article texts, pre-parsed
    zone-parameter table values reduced to the requested zone, and up to 20
    rules mined from article text.
    """
    zone = (state.get("bauzone") or "").upper()
    tables = state.get("zone_parameter_tables", [])
    articles = state.get("relevant_articles", [])
    rules = state.get("relevant_rules", [])
    area = state.get("total_area_m2")

    lines = []

    if area is not None and area > 0:
        # Swiss thousands separator, e.g. 1'234 m²
        lines.append(f"Grundstücksfläche der Parzelle: {area:,.0f} m²".replace(",", "'"))
        lines.append("")

    # Full article texts - LLM can parse tables like Art. 14 (zones in rows, values in columns)
    lines.append("=== ARTIKEL MIT VOLLEM TEXT (Tabellen genau lesen, richtige Spalte/Zeile für Bauzone wählen) ===")
    for article in articles:
        label = article.get("article_label", "")
        title = (article.get("article_title") or "").strip()
        body = article.get("text", "")
        page = article.get("page_start") or article.get("page_end", 0)
        lines.extend([f"\n{label}: {title}", f"Seite: {page}", f"Inhalt:\n{body}", ""])

    # Pre-parsed zone-parameter tables, reduced to the requested zone.
    if tables:
        lines.append("=== VORSTRUKTURIERTE TABELLENWERTE FÜR BAUZONE ===")
        for table in tables:
            lines.append(f"\n{table.get('article', '')} (S. {table.get('page', 0)}):")
            for param in table.get("parameters", []):
                pname = param.get("parameter", "")
                for zone_code, values in (param.get("values_by_zone") or {}).items():
                    if zone not in (zone_code or "").upper():
                        continue
                    if isinstance(values, list) and values:
                        first = values[0]
                        v = first.get("value", "")
                        u = first.get("unit") or ""
                        lines.append(f"  {pname} [{zone_code}]: {v} {u}".strip())
            lines.append("")

    # Rules mined from article text; capped at 20 to bound prompt size.
    if rules:
        lines.append("=== REGELN AUS ARTIKELTEXT ===")
        for rule in rules[:20]:
            vn = rule.get("value_numeric")
            # Render whole-number floats without the trailing ".0".
            if vn is not None and isinstance(vn, float) and vn == int(vn):
                value = str(int(vn))
            elif vn is not None:
                value = str(vn)
            else:
                value = rule.get("value_text", "")
            u = rule.get("unit", "")
            art = rule.get("article_label", "")
            page = rule.get("page", 0)
            lines.append(f"  {rule.get('rule_type', '')}: {value} {u} ({art}, S. {page})".strip())

    return "\n".join(lines)
|
|
|
|
|
|
def _parse_llm_bullet_list(text: str) -> List[Dict[str, str]]:
|
|
"""Parse LLM response into fakten list. Expects lines like '- Param: value (Art. X, S. Y)'."""
|
|
fakten = []
|
|
for line in (text or "").strip().split("\n"):
|
|
line = line.strip()
|
|
if not line or not line.startswith("-"):
|
|
continue
|
|
line = line.lstrip("- ").strip()
|
|
# Match "Param: value (source)" or "Param: value"
|
|
match = re.match(r"^(.+?):\s*(.+?)(?:\s*\(([^)]+)\))?\s*$", line)
|
|
if match:
|
|
item = match.group(1).strip()
|
|
value = match.group(2).strip()
|
|
source = (match.group(3) or "").strip()
|
|
if item and value:
|
|
fakten.append({"item": item, "value": value, "source": source})
|
|
elif ":" in line:
|
|
idx = line.find(":")
|
|
fakten.append({
|
|
"item": line[:idx].strip(),
|
|
"value": line[idx + 1 :].strip(),
|
|
"source": "",
|
|
})
|
|
return fakten
|
|
|
|
|
|
async def _llm_filter_relevant_provisions(
    ai_service: Any,
    bauzone: str,
    fakten: List[Dict[str, str]],
    provision_articles: List[Dict[str, Any]],
) -> Optional[set]:
    """
    Use the LLM to decide which provision articles apply to a parcel in this bauzone.

    Args:
        ai_service: Service exposing ``callAiPlanning(prompt=..., debugType=...)``.
        bauzone: Zone code the parcel lies in (e.g. "W2").
        fakten: Already-extracted parameter facts, shown to the LLM as context.
        provision_articles: Candidate articles (label/title) to be filtered.

    Returns:
        Set of normalized article labels (e.g. {"Art. 15", "Art. 16"}); an empty
        set when there are no candidates; or None, which tells the caller to
        include ALL articles (LLM call failed or no labels could be parsed).
    """
    if not provision_articles:
        return set()
    # Bullet list of the zone parameters; the "Auswertung für Bauzone" header
    # entry is presentation noise and is skipped.
    fakten_str = "\n".join(
        f"- {f.get('item', '')}: {f.get('value', '')}" for f in fakten
        if f.get("item") and "Auswertung" not in (f.get("item") or "")
    )
    # One candidate per line: "- <label>: <title>".
    articles_str = "\n".join(
        f"- {a.get('article_label', '')}: {a.get('article_title', '')}"
        for a in provision_articles
        if a.get("article_label")
    )
    prompt = f"""Du bist Experte für Schweizer Bau- und Zonenordnungen (BZO).

Eine Parzelle liegt in der Bauzone {bauzone}. Folgende BZO-Parameter gelten für diese Zone:
{fakten_str}

Folgende Bestimmungen (Weiterführende Artikel) könnten zutreffen:
{articles_str}

AUFGABE: Welche dieser Artikel sind für eine Parzelle in Bauzone {bauzone} mit diesen Parametern TATSÄCHLICH RELEVANT?
- Nur Artikel angeben, die auf diese Zone/Parameter Bezug nehmen oder Bedingungen nennen, die hier greifen
- z.B. Art. 15 Herabsetzung: relevant wenn Vollgeschosse und Grenzabstand vorhanden (Reduktion bei weggelassenen Geschossen)
- z.B. Art. 16 Nutzweise: relevant für Wohnzonen mit Wohnanteil
- z.B. Art. 40 Wohnanteil: nur wenn dieser Artikel die Zone {bauzone} erwähnt oder für Wohnzonen gilt
- Artikel die andere Zonen betreffen (z.B. nur Z5, I) und {bauzone} ausschliessen: NICHT aufnehmen

Antwort NUR mit den relevanten Artikelnummern, eine pro Zeile (z.B. "Art. 15", "Art. 16"). Keine anderen Zeichen."""

    try:
        response = await ai_service.callAiPlanning(
            prompt=prompt,
            debugType="bzo_relevant_provisions",
        )
        labels = set()
        # Accept any response line containing an article number ("Art. 15",
        # "Art. 7a"); collapse internal whitespace so labels compare reliably.
        for line in (response or "").strip().split("\n"):
            m = re.search(r"(Art\.\s*\d+[a-z]?)", line.strip(), re.I)
            if m:
                lbl = re.sub(r"\s+", " ", m.group(1).strip())
                labels.add(lbl)
        return labels if labels else None  # None = include all (fallback on error or empty)
    except Exception as e:
        logger.warning(f"LLM provision filter failed: {e}")
        return None
|
|
|
|
|
|
async def llm_extract_bauzone_params_node(state: BZOParamsExtractionState) -> BZOParamsExtractionState:
    """
    LangGraph node: use the LLM to extract BZO parameters for the Bauzone as a bullet list.

    Reads bauzone/gemeinde/ai_service plus the pre-filtered content from *state*,
    prompts the LLM for lines of the form "- Parametername: Wert (Art. X, S. Y)",
    parses them into ``fakten`` and ``bauzone_params_list``, then assembles
    ``zusatzinformationen`` from provision articles that a second LLM call
    (_llm_filter_relevant_provisions) deems relevant. Errors never propagate:
    on failure the node returns empty outputs with the message in ``errors``.
    """
    bauzone = state.get("bauzone", "")
    gemeinde = state.get("gemeinde", "")
    ai_service = state.get("ai_service")
    errors = list(state.get("errors", []))  # copy so the input state's list is not mutated

    if not ai_service:
        errors.append("AI service not provided")
        return {**state, "fakten": [], "bauzone_params_list": [], "errors": errors}

    context = _build_bauzone_context_for_llm(state)

    prompt = f"""Du bist Experte für Schweizer Bau- und Zonenordnungen (BZO). Extrahiere alle relevanten BZO-Parameter für die Bauzone {bauzone} in {gemeinde}.

BZO-INHALT:
{context}

AUFGABE: Erstelle eine geordnete Bullet-Liste ALLER zutreffenden Parameter für Bauzone {bauzone}.
Priorität: Vollgeschosse, anrechenbares Untergeschoss, anrechenbares Dachgeschoss, Ausnützungsziffer, Überbauungsziffer, Gebäudehöhe, Grundabstand/Grenzabstand, Gebäudelänge, Mehrlängenzuschlag, Höchstmass, sowie alle anderen Bestimmungen die für diese Zone gelten.

WICHTIG:
- Bei Tabellen: die richtige Spalte/Zeile für {bauzone} verwenden (z.B. Art. 14 Mehrlängenzuschlag: W5 = 13 m)
- Jede Zeile: "- Parametername: Wert (Art. X, S. Y)"
- Nur tatsächlich im Dokument vorhandene Werte angeben
- Einheit (m, %, Stk.) bei Zahlen mit angeben
- Keine leeren Zeilen oder Kommentare - nur die Liste

Antwort NUR mit der Bullet-Liste, sonst nichts:"""

    try:
        ai_response = await ai_service.callAiPlanning(
            prompt=prompt,
            debugType="bzo_params_extraction",
        )
        response_text = (ai_response or "").strip()
        # Parse "- Param: value (source)" lines into fakten dicts.
        fakten = _parse_llm_bullet_list(response_text)
        # Rebuild the raw "- ..." bullet strings for display/debugging.
        bauzone_params_list = [f"- {f['item']}: {f['value']}" + (f" ({f['source']})" if f.get("source") else "") for f in fakten]
        # Prepend header items (zone, parcel area) if the LLM did not emit them.
        if bauzone and not any("Auswertung" in (f.get("item") or "") for f in fakten):
            fakten.insert(0, {"item": "Auswertung für Bauzone", "value": bauzone, "source": ""})
        total_area_m2 = state.get("total_area_m2")
        if total_area_m2 is not None and total_area_m2 > 0 and not any("Grundstücksfläche" in (f.get("item") or "") for f in fakten):
            fakten.insert(1, {
                "item": "Grundstücksfläche",
                # Swiss thousands separator, e.g. 1'234 m²
                "value": f"{total_area_m2:,.0f} m²".replace(",", "'"),
                "source": "Parzellendaten",
            })
        # Zusatzinformationen: only provisions RELEVANT for this parcel in this bauzone
        all_articles = state.get("extracted_content", {}).get("articles", []) or state.get("relevant_articles", [])
        provision_articles = [a for a in all_articles if _is_zusatzinfo_article((a.get("article_title") or "").strip())]
        relevant_labels = await _llm_filter_relevant_provisions(
            ai_service=ai_service,
            bauzone=bauzone,
            fakten=fakten,
            provision_articles=provision_articles,
        )
        def _norm_label(s: str) -> str:
            # Collapse whitespace so "Art.  15" and "Art. 15" compare equal.
            return re.sub(r"\s+", " ", (s or "").strip())

        zusatzinformationen = []
        for art in provision_articles:
            label = art.get("article_label", "")
            title = (art.get("article_title") or "").strip()
            norm = _norm_label(label)
            # relevant_labels is None when the filter failed -> keep everything.
            if relevant_labels is not None and norm and norm not in relevant_labels:
                continue
            raw_text = (art.get("text") or "")[:4000]  # cap excerpt length
            text = _format_article_text_readable(
                raw_text,
                article_label=label,
                article_title=title,
            )
            if not text:
                continue
            page = art.get("page_start") or art.get("page_end", 0)
            source = f"{label}, S. {page}" if label else f"S. {page}"
            zusatzinformationen.append({
                "article_label": label,
                "article_title": title,
                "text": text,
                "source": source,
            })
        return {
            **state,
            "fakten": fakten,
            "bauzone_params_list": bauzone_params_list,
            "zusatzinformationen": zusatzinformationen,
            "errors": errors,
        }
    except Exception as e:
        logger.error(f"LLM BZO params extraction failed: {e}", exc_info=True)
        errors.append(str(e))
        return {
            **state,
            "fakten": [],
            "bauzone_params_list": [],
            "zusatzinformationen": [],
            "errors": errors,
        }
|
|
|
|
|
|
def create_bzo_params_extraction_graph():
    """Compile the single-node LangGraph that runs the LLM-based params extraction."""
    builder = StateGraph(BZOParamsExtractionState)
    builder.add_node("llm_extract", llm_extract_bauzone_params_node)
    builder.set_entry_point("llm_extract")
    builder.add_edge("llm_extract", END)
    return builder.compile()
|
|
|
|
|
|
def _filter_articles_by_bauzone(articles: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]:
|
|
"""Filter articles that mention the Bauzone."""
|
|
bauzone_upper = (bauzone or "").upper()
|
|
return [
|
|
a for a in articles
|
|
if bauzone_upper in (a.get("text") or "").upper() or bauzone_upper in (a.get("zone_raw") or "").upper()
|
|
]
|
|
|
|
|
|
def _filter_tables_by_bauzone(tables: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]:
|
|
"""Filter zone-parameter tables to those containing the Bauzone."""
|
|
bauzone_upper = (bauzone or "").upper()
|
|
relevant = []
|
|
for table in tables:
|
|
zones = table.get("zones", [])
|
|
matching = [z for z in zones if bauzone_upper in str(z).upper()]
|
|
if matching:
|
|
filtered = {
|
|
"page": table.get("page"),
|
|
"article": table.get("article"),
|
|
"zones": matching,
|
|
"parameters": [
|
|
{"parameter": p.get("parameter"), "values_by_zone": {
|
|
z: v for z, v in (p.get("values_by_zone") or {}).items()
|
|
if bauzone_upper in str(z).upper()
|
|
}}
|
|
for p in table.get("parameters", [])
|
|
if any(bauzone_upper in str(z).upper() for z in (p.get("values_by_zone") or {}))
|
|
],
|
|
}
|
|
filtered["parameters"] = [x for x in filtered["parameters"] if x["values_by_zone"]]
|
|
if filtered["parameters"]:
|
|
relevant.append(filtered)
|
|
return relevant
|
|
|
|
|
|
async def run_bzo_params_extraction(
    extracted_content: Dict[str, Any],
    bauzone: str,
    ai_service: Any,
    gemeinde: str,
    relevant_rules: Optional[List[Dict[str, Any]]] = None,
    relevant_articles: Optional[List[Dict[str, Any]]] = None,
    total_area_m2: Optional[float] = None,
) -> Dict[str, Any]:
    """Run the LangGraph workflow that extracts BZO parameters for a Bauzone via LLM.

    Args:
        extracted_content: Output of run_extraction (articles, rules, tables).
        bauzone: Zone code the parcel lies in.
        ai_service: LLM service used by the graph node.
        gemeinde: Municipality name, used in the LLM prompt.
        relevant_rules / relevant_articles: Pre-filtered inputs; when None they
            are derived from extracted_content here.
        total_area_m2: Parcel area forwarded into the graph state.

    Returns:
        Dict with fakten (item/value/source), bauzone_params_list (raw bullet
        strings), zusatzinformationen and errors.
    """
    if relevant_rules is None:
        relevant_rules = _bzo_filter_rules_by_bauzone(extracted_content.get("rules", []), bauzone)
    if relevant_articles is None:
        relevant_articles = _filter_articles_by_bauzone(extracted_content.get("articles", []), bauzone)
    tables = _filter_tables_by_bauzone(extracted_content.get("zone_parameter_tables", []), bauzone)

    initial_state: BZOParamsExtractionState = {
        "extracted_content": extracted_content,
        "bauzone": bauzone,
        "total_area_m2": total_area_m2,
        "relevant_rules": relevant_rules,
        "relevant_articles": relevant_articles,
        "zone_parameter_tables": tables,
        "ai_service": ai_service,
        "gemeinde": gemeinde,
        "bauzone_params_list": [],
        "fakten": [],
        "zusatzinformationen": [],
        "errors": [],
    }

    final_state = await create_bzo_params_extraction_graph().ainvoke(initial_state)

    return {
        "bauzone": bauzone,
        "fakten": final_state.get("fakten", []),
        "bauzone_params_list": final_state.get("bauzone_params_list", []),
        "zusatzinformationen": final_state.get("zusatzinformationen", []),
        "errors": final_state.get("errors", []),
    }
|
|
|
|
|
|
# ===== Graph Construction =====
|
|
|
|
def create_bzo_extraction_graph():
    """Compile the linear three-node BZO extraction pipeline.

    Order: classify_and_assemble -> extract_zones_and_tables -> extract_rules.
    """
    builder = StateGraph(BZOExtractionState)
    builder.add_node("classify_and_assemble", classify_and_assemble)
    builder.add_node("extract_zones_and_tables", extract_zones_and_tables)
    builder.add_node("extract_rules", extract_rules)
    builder.set_entry_point("classify_and_assemble")
    builder.add_edge("classify_and_assemble", "extract_zones_and_tables")
    builder.add_edge("extract_zones_and_tables", "extract_rules")
    builder.add_edge("extract_rules", END)
    return builder.compile()
|
|
|
|
|
|
def run_extraction(pdf_bytes: bytes, pdf_id: str = None, dokument_id: str = None) -> Dict[str, Any]:
    """Run the extraction pipeline on a PDF and return structured, sorted results.

    Args:
        pdf_bytes: PDF file content as bytes.
        pdf_id: Optional identifier for the PDF (a random one is generated when falsy).
        dokument_id: Optional dokument ID carried through for reference.

    Returns:
        Dict with "articles" (sorted by page_start, then article_label),
        "zones" (sorted by zone_code), "rules" (sorted by rule_type, then page),
        "zone_parameter_tables", "errors" and "warnings".
    """
    import uuid

    pdf_id = pdf_id or f"pdf_{uuid.uuid4().hex[:8]}"

    initial_state: BZOExtractionState = {
        "dokument_id": dokument_id,
        "pdf_id": pdf_id,
        "text_blocks": [],
        "classified_blocks": [],
        "articles": [],
        "current_zones": {},
        "zones": [],
        "rule_candidates": [],
        "parsed_rules": [],
        "zone_parameter_tables": [],
        "errors": [],
        "warnings": []
    }

    # PDF text extraction happens outside the graph; nodes only see plain dicts,
    # so TextBlock objects are converted up front.
    extracted_blocks = BZOPdfExtractor().extract_text_blocks(pdf_bytes, initial_state["pdf_id"])
    initial_state["text_blocks"] = [
        {"page": b.page, "text": b.text, "block_id": b.block_id, "bbox": b.bbox}
        for b in extracted_blocks
    ]

    final_state = create_bzo_extraction_graph().invoke(initial_state)

    return {
        "articles": sorted(
            final_state.get("articles", []),
            key=lambda a: (a.get("page_start", 0), a.get("article_label", "")),
        ),
        "zones": sorted(
            final_state.get("zones", []),
            key=lambda z: z.get("zone_code", ""),
        ),
        "rules": sorted(
            final_state.get("parsed_rules", []),
            key=lambda r: (r.get("rule_type", ""), r.get("page", 0)),
        ),
        "zone_parameter_tables": final_state.get("zone_parameter_tables", []),
        "errors": final_state.get("errors", []),
        "warnings": final_state.get("warnings", []),
    }
|
|
|
|
|
|
def extract_from_documents(
    document_retriever,
    dokument_ids: List[str]
) -> Dict[str, Any]:
    """Extract BZO content from one or more documents.

    Args:
        document_retriever: BZODocumentRetriever instance used to look up the
            dokument records and fetch their PDF bytes.
        dokument_ids: List of dokument IDs to process.

    Returns:
        Dict with "results" (one extraction result per retrieved dokument, each
        tagged with its dokument_id) and "summary" (total_documents,
        successful, failed, total_articles, total_zones, total_rules).
        A dokument counts as failed when its PDF cannot be retrieved, when
        processing raises, or when the extraction result contains errors.
    """
    results: List[Dict[str, Any]] = []
    counts = {"articles": 0, "zones": 0, "rules": 0}
    successful = 0
    failed = 0

    def _failure_record(dok_id, message):
        # Uniform shape for documents that produced no usable extraction.
        return {
            "dokument_id": dok_id,
            "articles": [],
            "zones": [],
            "rules": [],
            "errors": [message],
            "warnings": []
        }

    for dokument in document_retriever.get_documents_by_ids(dokument_ids):
        try:
            pdf_bytes = document_retriever.retrieve_pdf_content(dokument)
            if not pdf_bytes:
                logger.warning(f"Could not retrieve PDF for dokument {dokument.id}")
                results.append(_failure_record(dokument.id, "Could not retrieve PDF content"))
                failed += 1
                continue

            extraction_result = run_extraction(
                pdf_bytes=pdf_bytes,
                pdf_id=dokument.dokumentReferenz or f"dok_{dokument.id}",
                dokument_id=dokument.id
            )
            extraction_result["dokument_id"] = dokument.id
            results.append(extraction_result)

            counts["articles"] += len(extraction_result.get("articles", []))
            counts["zones"] += len(extraction_result.get("zones", []))
            counts["rules"] += len(extraction_result.get("rules", []))

            if extraction_result.get("errors"):
                failed += 1
            else:
                successful += 1

        except Exception as e:
            logger.error(f"Error processing dokument {dokument.id}: {str(e)}", exc_info=True)
            results.append(_failure_record(dokument.id, f"Processing error: {str(e)}"))
            failed += 1

    return {
        "results": results,
        "summary": {
            "total_documents": len(dokument_ids),
            "successful": successful,
            "failed": failed,
            "total_articles": counts["articles"],
            "total_zones": counts["zones"],
            "total_rules": counts["rules"]
        }
    }
|