""" LangGraph-based pipeline for extracting structured content from BZO PDFs. """ import logging import re from typing import TypedDict, List, Dict, Any, Optional from dataclasses import dataclass from langgraph.graph import StateGraph, START, END from modules.features.realEstate.bzoPdfExtractor import BZOPdfExtractor, TextBlock from modules.features.realEstate.bzoRuleTaxonomy import RULE_TAXONOMY logger = logging.getLogger(__name__) # ===== BZO Params Extraction State (LangGraph with LLM) ===== class BZOParamsExtractionState(TypedDict): """State for BZO params extraction via LLM.""" extracted_content: Dict[str, Any] bauzone: str total_area_m2: Optional[float] relevant_rules: List[Dict[str, Any]] relevant_articles: List[Dict[str, Any]] zone_parameter_tables: List[Dict[str, Any]] ai_service: Any gemeinde: str # Output bauzone_params_list: List[str] fakten: List[Dict[str, str]] zusatzinformationen: List[Dict[str, Any]] errors: List[str] # ===== State Definition ===== @dataclass class ClassifiedBlock: """Classified text block.""" block: TextBlock block_type: str # "article", "heading", "table", "other" article_label: Optional[str] = None article_title: Optional[str] = None @dataclass class Article: """Assembled article.""" article_label: str article_title: Optional[str] text: str page_start: int page_end: int section_level_1: Optional[str] = None section_level_2: Optional[str] = None section_level_3: Optional[str] = None zone_raw: Optional[str] = None @dataclass class ZoneInfo: """Zone information.""" zone_code: str zone_name: str zone_category: Optional[str] = None zone_subcategory: Optional[str] = None empfindlichkeitsstufe: Optional[str] = None geschosszahl: Optional[int] = None gewerbeerleichterung: bool = False @dataclass class RuleCandidate: """Rule candidate from pattern matching.""" rule_type: str matched_text: str article_text: str page: int is_table_rule: bool = False table_zones: List[str] = None condition_text: Optional[str] = None @dataclass class ParsedRule: """Parsed rule with structured values.""" rule_type: str value_numeric: Optional[float] value_text: str unit: Optional[str] condition_text: Optional[str] is_table_rule: bool table_zones: List[str] page: int text_snippet: str zone_raw: Optional[str] = None rule_scope: str = "general" confidence: float = 0.5 class BZOExtractionState(TypedDict): """State for BZO extraction pipeline.""" # Input metadata dokument_id: Optional[str] pdf_id: str # Extracted text blocks (stored as dicts for serialization) text_blocks: List[Dict[str, Any]] # Classified blocks (stored as dicts for serialization) classified_blocks: List[Dict[str, Any]] # Assembled articles (stored as dicts for serialization) articles: List[Dict[str, Any]] # Zone tracking current_zones: Dict[str, Dict[str, Any]] zones: List[Dict[str, Any]] # Rule extraction (stored as dicts for serialization) rule_candidates: List[Dict[str, Any]] parsed_rules: List[Dict[str, Any]] # Zone-parameter tables (structured table data mapping zones to parameters) zone_parameter_tables: List[Dict[str, Any]] # Processing metadata errors: List[str] warnings: List[str] # ===== Node Implementations (Simplified 4-node pipeline) ===== def classify_and_assemble(state: BZOExtractionState) -> BZOExtractionState: """Classify text blocks and assemble into articles (merged node).""" try: classified = [] for block_dict in state["text_blocks"]: text = block_dict["text"].strip() if not text: continue block_type = "other" article_label = None article_title = None article_match = re.search(r'Art\.?\s*(\d+[a-z]?)', text, re.IGNORECASE) if article_match: block_type = "article" article_label = f"Art. {article_match.group(1)}" title_match = re.search(r'Art\.?\s*\d+[a-z]?\s+(.+?)(?:\.|$|\n)', text, re.IGNORECASE) if title_match: article_title = title_match.group(1).strip() elif re.match(r'^[A-Z]\.\s+[A-Z]', text) or re.match(r'^[IVX]+\.\s+[A-Z]', text) or re.match(r'^\d+\.\s+[A-Z]', text): block_type = "heading" elif '\t' in text or (len(text.split()) > 5 and text.count(' ') > 2): block_type = "table" classified.append({ "block": {"page": block_dict["page"], "text": block_dict["text"], "block_id": block_dict["block_id"], "bbox": block_dict.get("bbox")}, "block_type": block_type, "article_label": article_label, "article_title": article_title }) state["classified_blocks"] = classified articles = [] current_article = None current_section_1 = current_section_2 = current_section_3 = None for classified_dict in classified: block_dict = classified_dict["block"] text = block_dict["text"].strip() block_type = classified_dict["block_type"] article_label = classified_dict.get("article_label") article_title = classified_dict.get("article_title") if block_type == "heading": if re.match(r'^[A-Z]\.\s+', text): current_section_1 = text.split('.', 1)[0] + '.' current_section_2 = current_section_3 = None elif re.match(r'^[IVX]+\.\s+', text): current_section_2 = text.split('.', 1)[0] + '.' current_section_3 = None elif re.match(r'^\d+\.\s+', text): current_section_3 = text.split('.', 1)[0] + '.' if article_label: if current_article: articles.append(current_article) current_article = { "article_label": article_label, "article_title": article_title, "text": text, "page_start": block_dict["page"], "page_end": block_dict["page"], "section_level_1": current_section_1, "section_level_2": current_section_2, "section_level_3": current_section_3, "zone_raw": None } elif current_article: current_article["text"] += "\n" + text current_article["page_end"] = block_dict["page"] if current_article: articles.append(current_article) state["articles"] = articles return state except Exception as e: logger.error(f"Error in classify_and_assemble: {e}", exc_info=True) state["errors"] = state.get("errors", []) + [f"Classify/assemble error: {str(e)}"] return state def extract_zones_and_tables(state: BZOExtractionState) -> BZOExtractionState: """Detect zones and extract zone-parameter tables (merged node).""" try: # Part 1: Detect zone declarations zones = [] current_zones = {} for article_dict in state["articles"]: text = article_dict.get("text", "") article_label = article_dict.get("article_label", "") page_start = article_dict.get("page_start", 0) # Pattern: "Wohnzone W2", "Zone W3", "Gewerbezone G1" zone_patterns = [ r'(?:Wohnzone|Zone|Gewerbezone|Industriezone|Zentrumszone|Ortsbildschutzzone|Erholungszone)\s+([A-Z0-9/]+)', r'([A-Z]\d+(?:/\d+)?(?:G)?)', # W2/30, W2/30G, Z3, K3/4 ] for pattern in zone_patterns: matches = re.finditer(pattern, text, re.IGNORECASE) for match in matches: zone_code = match.group(1).upper() # Parse zone code gewerbeerleichterung = zone_code.endswith('G') if gewerbeerleichterung: zone_code_base = zone_code[:-1] else: zone_code_base = zone_code # Extract geschosszahl from code (e.g., W2 -> 2, W3/50 -> 3) geschosszahl = None if '/' in zone_code_base: parts = zone_code_base.split('/') geschosszahl_match = re.search(r'(\d+)', parts[0]) if geschosszahl_match: geschosszahl = int(geschosszahl_match.group(1)) else: geschosszahl_match = re.search(r'(\d+)', zone_code_base) if geschosszahl_match: geschosszahl = int(geschosszahl_match.group(1)) # Determine zone category from context zone_category = None if 'Wohnzone' in text or zone_code.startswith('W'): zone_category = "Wohnzonen" elif 'Zentrumszone' in text or zone_code.startswith('Z'): zone_category = "Zentrumszonen" elif 'Gewerbezone' in text or zone_code.startswith('G'): zone_category = "Arbeitsplatzzonen" elif 'Industriezone' in text or zone_code.startswith('I'): zone_category = "Arbeitsplatzzonen" zone_info = ZoneInfo( zone_code=zone_code, zone_name=f"Zone {zone_code}", zone_category=zone_category, geschosszahl=geschosszahl, gewerbeerleichterung=gewerbeerleichterung ) current_zones[zone_code] = zone_info zones.append({ "zone_code": zone_code, "zone_name": zone_info.zone_name, "zone_category": zone_category, "geschosszahl": geschosszahl, "gewerbeerleichterung": gewerbeerleichterung, "source_article": article_label, "page": page_start }) state["current_zones"] = current_zones state["zones"] = zones # Part 2: Extract zone-parameter tables _extract_zone_parameter_tables_impl(state) return state except Exception as e: logger.error(f"Error in extract_zones_and_tables: {e}", exc_info=True) state["errors"] = state.get("errors", []) + [f"Zones/tables error: {str(e)}"] return state def _extract_zone_parameter_tables_impl(state: BZOExtractionState) -> None: """Extract zone-parameter tables from classified blocks. Mutates state in place.""" tables = [] table_blocks = [b for b in state.get("classified_blocks", []) if b.get("block_type") == "table"] zone_pattern = r'\b([WLIZK]\d+(?:/\d+)?(?:G\*?)?)\b' parameter_keywords = [ r'Ausnützungsziffer', r'Überbauungsziffer', r'Vollgeschosse', r'Dachgeschosse', r'Attikageschoss', r'Untergeschoss', r'Gebäudelänge', r'Grenzabstand', r'Fassadenhöhen', r'Grundabstand', r'Mehrlängen', r'Höchstmass' ] parameter_row_patterns = [ r'^[a-g]\)\s+(.+?)(?:\s+max\.|min\.|:)?', r'^(Ausnützungsziffer|Überbauungsziffer|Vollgeschosse|Dachgeschosse|Attikageschoss|Untergeschoss|Gebäudelänge|Grenzabstand|Fassadenhöhen|Grundabstand|Mehrlängen|Höchstmass|Höchstmaß)', ] subparameter_patterns = [ r'^(Grundabstand|Mehrlängen|Höchstmass|Höchstmaß|Fassadenhöhen)\s*(min\.|max\.)?', r'^(anrechenbare\s+Dachgeschosse|anrechenbares\s+Attikageschoss|anrechenbares\s+Untergeschoss)', ] numeric_pattern = r'(\d+(?:\.\d+)?)\s*(%|m|Geschoss|Geschosse|Geschosse\s+max\.?|Geschoss\s+max\.?)?' for table_block in table_blocks: block_dict = table_block.get("block", {}) text = block_dict.get("text", "") page = block_dict.get("page", 0) if not text or len(text.strip()) < 20: continue lines = text.split('\n') header_row_idx, zone_columns = None, [] for idx, line in enumerate(lines): zone_matches = re.findall(zone_pattern, line, re.IGNORECASE) if len(zone_matches) >= 3: header_row_idx, zone_columns = idx, zone_matches break if not zone_columns: has_parameters = any(re.search(kw, text, re.IGNORECASE) for kw in parameter_keywords) has_zones = len(re.findall(zone_pattern, text, re.IGNORECASE)) >= 3 if has_parameters and has_zones: zone_columns = list(dict.fromkeys(re.findall(zone_pattern, text, re.IGNORECASE))) header_row_idx = 0 if not zone_columns: continue article_context = None for block in state.get("classified_blocks", []): if block.get("block", {}).get("page") == page and block.get("article_label"): article_context = block.get("article_label") break table_data = {"page": page, "zones": zone_columns, "parameters": [], "source_text": text[:500], "article": article_context} start_idx = (header_row_idx + 1) if header_row_idx is not None else 0 current_parameter = current_subparameter = None parameter_values = subparameter_values = {} for line_idx in range(start_idx, len(lines)): line = lines[line_idx].strip() if not line: continue is_parameter_row, parameter_name = False, None for pat in parameter_row_patterns: m = re.match(pat, line, re.IGNORECASE) if m: is_parameter_row, parameter_name = True, re.sub(r'\s+max\.?\s*$', '', re.sub(r'\s+min\.?\s*$', '', m.group(1).strip(), flags=re.I), flags=re.I) break is_subparameter, subparameter_name = False, None if not is_parameter_row: for pat in subparameter_patterns: m = re.search(pat, line, re.IGNORECASE) if m: is_subparameter, subparameter_name = True, m.group(1).strip() + (f" {m.group(2).strip()}" if m.lastindex and m.lastindex >= 2 and m.group(2) else "") break target_values = subparameter_values if current_subparameter else parameter_values if is_parameter_row and parameter_name: if current_parameter and parameter_values: table_data["parameters"].append({"parameter": current_parameter, "values_by_zone": parameter_values.copy(), "article": article_context}) current_parameter, current_subparameter, parameter_values, subparameter_values = parameter_name, None, {}, {} continue if is_subparameter and subparameter_name: if current_subparameter and subparameter_values and current_parameter: table_data["parameters"].append({"parameter": f"{current_parameter} - {current_subparameter}", "values_by_zone": subparameter_values.copy(), "article": article_context}) current_subparameter, subparameter_values = subparameter_name, {} continue if current_parameter or current_subparameter: line_parts = re.split(r'\s{2,}|\t', line) line_parts = [p.strip() for p in line_parts if p.strip()] n = len(zone_columns) value_parts = [] # Column-based: extract trailing numeric/fraction parts that align with zone count for p in reversed(line_parts): if re.match(r'^\d+(?:\.\d+)?\s*(%|m)?$', p, re.I) or re.match(r'^\d+/\d+$', p): val = re.sub(r'\s*(%|m)$', '', p, flags=re.I).strip() unit = None um = re.search(r'\s*(%|m)$', p, re.I) if um: unit = 'm' if um.group(1).lower() == 'm' else '%' value_parts.insert(0, (val, unit)) else: break if len(value_parts) == n: for zi, zone in enumerate(zone_columns): if zone not in target_values: target_values[zone] = [] val, unit = value_parts[zi] target_values[zone].append({"value": val, "unit": unit, "raw_text": line[:200], "line_number": line_idx}) else: # Fallback: regex match by character position all_matches = [(m.start(), m.group(0), m.group(1), m.group(2) if m.lastindex and m.lastindex > 1 else None) for m in re.finditer(numeric_pattern, line, re.I)] all_matches += [(m.start(), m.group(0), m.group(0), None) for m in re.finditer(r'(\d+/\d+)', line, re.I)] all_matches.sort(key=lambda x: x[0]) if len(all_matches) == n: for zi, zone in enumerate(zone_columns): if zone not in target_values: target_values[zone] = [] _, _, val, unit = all_matches[zi] target_values[zone].append({"value": val, "unit": unit.strip() if unit else None, "raw_text": line[:200], "line_number": line_idx}) if current_subparameter and subparameter_values and current_parameter: table_data["parameters"].append({"parameter": f"{current_parameter} - {current_subparameter}", "values_by_zone": subparameter_values.copy(), "article": article_context}) if current_parameter and parameter_values: table_data["parameters"].append({"parameter": current_parameter, "values_by_zone": parameter_values.copy(), "article": article_context}) if table_data["parameters"]: tables.append(table_data) state["zone_parameter_tables"] = state.get("zone_parameter_tables", []) + tables if tables: logger.info(f"Extracted {len(tables)} zone-parameter tables") # Zone code pattern: W5, W2/30, Z3, K3/4, W5G, W 5 (optional space) _ZONE_CODE_PATTERN = re.compile(r'\b([WZIK]\s*\d+(?:\s*/\s*\d+)?(?:G)?)\b', re.IGNORECASE) def _zones_in_text(text: str) -> List[str]: """Extract zone codes (W5, W2/30, Z3, etc.) from text. Returns unique list, normalized (e.g. W5).""" matches = _ZONE_CODE_PATTERN.findall(text) seen = set() result = [] for m in matches: # Normalize: remove spaces -> W5, W2/30 n = re.sub(r'\s+', '', m).upper() if n and n not in seen: seen.add(n) result.append(n) return result def extract_rules(state: BZOExtractionState) -> BZOExtractionState: """Detect rule candidates and parse values. Associates each rule with zones from its source article.""" try: candidates = [] for article_dict in state["articles"]: text = article_dict.get("text", "") page_start = article_dict.get("page_start", 0) # Zones mentioned in THIS article - rules from this article apply to these zones article_zones = _zones_in_text(text) for rule_type, rule_config in RULE_TAXONOMY.items(): for pattern in rule_config.get("patterns", []): for match in re.finditer(pattern, text, re.IGNORECASE): start, end = max(0, match.start() - 100), min(len(text), match.end() + 100) context = text[start:end] condition_text = None for cond_pat in [r'(?:nördlich|südlich|östlich|westlich|oberhalb|unterhalb)\s+[^,\.]+', r'(?:für|bei|in)\s+[^,\.]+']: cm = re.search(cond_pat, context, re.IGNORECASE) if cm: condition_text = cm.group(0) break candidates.append({ "rule_type": rule_type, "matched_text": match.group(0), "article_text": text, "page": page_start, "article_label": article_dict.get("article_label"), "condition_text": condition_text, "is_table_rule": False, "table_zones": article_zones.copy(), }) parsed_rules = [] for candidate_dict in candidates: rule_type = candidate_dict["rule_type"] rule_config = RULE_TAXONOMY.get(rule_type, {}) units = rule_config.get("units", []) value_type = rule_config.get("value_type", "numeric") # Extract value using regex matched_text = candidate_dict["matched_text"] article_text = candidate_dict["article_text"] text = matched_text + " " + article_text[article_text.find(matched_text):article_text.find(matched_text) + 200] value_numeric = None value_text = matched_text unit = None # Try to extract numeric value if value_type in ["numeric", "integer"]: # Pattern: "max. 4", "30 %", "min. 3.5 m" value_patterns = [ r'(?:max|maximal|min|mindestens|höchstens)\s*\.?\s*(\d+(?:\.\d+)?)', r'(\d+(?:\.\d+)?)\s*(%|m|meter|metern|prozent)', r'(\d+(?:\.\d+)?)', ] for pattern in value_patterns: match = re.search(pattern, text, re.IGNORECASE) if match: try: value_numeric = float(match.group(1)) if value_type == "integer": value_numeric = int(value_numeric) # Check for unit unit_match = re.search(r'(\d+(?:\.\d+)?)\s*(%|m|meter|metern|prozent)', text, re.IGNORECASE) if unit_match: unit = unit_match.group(2).lower() if unit in ["meter", "metern"]: unit = "m" elif unit == "prozent": unit = "%" break except ValueError: continue # Calculate confidence confidence = 0.5 if value_numeric is not None: confidence = 0.8 if unit: confidence = 0.9 # Zone association from source article (zones mentioned in that article) article_zones = candidate_dict.get("table_zones", []) zone_raw = article_zones[0] if article_zones else None rule_scope = "zone" if zone_raw else "general" parsed_rule = { "rule_type": rule_type, "value_numeric": value_numeric, "value_text": value_text, "unit": unit, "condition_text": candidate_dict.get("condition_text"), "is_table_rule": candidate_dict.get("is_table_rule", False), "table_zones": article_zones, "page": candidate_dict["page"], "article_label": candidate_dict.get("article_label"), "text_snippet": value_text, "zone_raw": zone_raw, "rule_scope": rule_scope, "confidence": confidence } parsed_rules.append(parsed_rule) state["parsed_rules"] = parsed_rules return state except Exception as e: logger.error(f"Error in extract_rules: {e}", exc_info=True) state["errors"] = state.get("errors", []) + [f"Extract rules error: {str(e)}"] return state # ===== Wohnzone Parameter Extraction ===== # Canonical order for BZO parameters (Fakten) BZO_PARAM_ORDER = [ "vollgeschosse", "vollgeschoss", "anrechenbares untergeschoss", "untergeschoss", "anrechenbares dachgeschoss", "dachgeschoss", "attikageschoss", "ausnützungsziffer", "ausnutzungsziffer", "az", "überbauungsziffer", "gebäudehöhe", "fassadenhöhen", "grundabstand", "grenzabstand", "gebäudelänge", "mehrlängen", "höchstmass", "baumassenziffer", "grünflächenziffer", "wohnflächenanteil", "gebäudebreite", ] RULE_TYPE_TO_PARAM: Dict[str, str] = { "max_building_height": "Gebäudehöhe max.", "max_floors": "Vollgeschosse max.", "max_attachable_attics": "anrechenbares Dachgeschoss max.", "max_attachable_basement": "anrechenbares Untergeschoss max.", "density": "Ausnützungsziffer", "building_coverage": "Überbauungsziffer", "building_mass_index": "Baumassenziffer (BMZ)", "green_space_index": "Grünflächenziffer (GFZ)", "boundary_distance": "Grundabstand min.", "boundary_distance_length_surcharge": "Mehrlängen-zuschlag (MLZ)", "boundary_distance_max": "Höchstmass Grenzabstand max.", "building_length": "Gebäudelänge max.", "building_width": "Gebäudebreite max.", "residential_area_share": "Wohnflächenanteil", } RULE_TYPE_TO_DEFAULT_UNIT: Dict[str, str] = { "max_building_height": "m", "max_floors": "Stk.", "max_attachable_attics": "Stk.", "max_attachable_basement": "Stk.", "density": "%", "building_coverage": "%", "building_mass_index": "", "green_space_index": "%", "boundary_distance": "m", "boundary_distance_length_surcharge": "", "boundary_distance_max": "m", "building_length": "m", "building_width": "m", "residential_area_share": "%", } _ARTIKEL_KEYWORDS = [ r"herabsetzung", r"grenzabstand", r"nutzweise", r"wohnanteil", r"besondere\s+gebäude", r"überbauungsziffer", r"sonderregel", r"ausnahmen", r"abweichungen", r"erleichterungen", r"mischung", r"gewerbe", r"dienstleistung", r"kantonale", r"abstandsvorschriften", r"vollgeschoss", r"reduziert", r"mindestmass", r"störend", r"nicht\s+störend", r"mässig\s+störend", ] # Artikel that are parameter tables - EXCLUDE from Weiterführende Bestimmungen _ZUSATZ_EXCLUDE_TITLES = ("zonen", "grundmasse", "mehrlängenzuschlag", "mehrlaengenzuschlag") # Artikel that are substantive provisions - INCLUDE in Weiterführende Bestimmungen _ZUSATZ_INCLUDE_TITLES = ( "herabsetzung", "nutzweise", "besondere", "besonderes", "ausnahmen", "abweichungen", "erleichterungen", "sonderregel", "wohnanteil", "nutzungsart", "abstandsvorschriften", "mischung", "gewerbe", "dienstleistung", ) def _format_article_text_readable(text: str, article_label: str = "", article_title: str = "") -> str: """Format raw PDF-extracted text for readable display.""" if not text or not text.strip(): return "" # Strip redundant article header at start (e.g. "Art. 16 Nutzweise" when already in summary) if article_label or article_title: prefix = f"{article_label} {article_title}".strip() if prefix: pat = re.escape(prefix) text = re.sub(rf"^{pat}\s*", "", text.strip(), flags=re.I).lstrip() lines = [] for line in text.split("\n"): line = line.strip() if not line: continue lines.append(line) if not lines: return "" # Join hyphenated word breaks (e.g. "Gewerbe-\nund" -> "Gewerbe und") merged = [] i = 0 while i < len(lines): line = lines[i] while line.rstrip().endswith("-") and i + 1 < len(lines): line = line.rstrip()[:-1] + lines[i + 1].strip() i += 1 if re.match(r"^\d{1,2}\s*$", line) and i + 1 < len(lines): next_line = lines[i + 1] if not re.match(r"^Art\.\s", next_line) and len(next_line) > 3: line = line + " " + next_line.strip() i += 1 elif re.match(r"^\d{1,2}\s*$", line) and i + 1 < len(lines) and re.match(r"^Art\.\s", lines[i + 1]): i += 1 continue merged.append(line) i += 1 combined = " ".join(merged) # Fix run-together paragraph numbers: "1In" -> "1. In", "2Ist" -> "2. Ist" combined = re.sub(r"(\d)([A-ZÄÖÜ])", r"\1. \2", combined) # Also fix "a)Something" -> "a) Something" for subparagraphs combined = re.sub(r"([a-z]\))([A-ZÄÖÜ])", r"\1 \2", combined) # Split into paragraphs: numbered (1. ..., 2. ...) or lettered (a) ..., b) ...) parts = re.split(r"(?=\d+\.\s+[A-ZÄÖÜa-zäöü])|(?=[a-z]\)\s+[A-ZÄÖÜa-zäöü])", combined) paragraphs = [] for p in parts: p = p.strip() if not p or len(p) < 3: continue paragraphs.append(p) return "\n\n".join(paragraphs) def _is_zusatzinfo_article(title: str) -> bool: """True if article should appear in Weiterführende Bestimmungen (provisions, not param tables).""" t = (title or "").lower().strip() for exc in _ZUSATZ_EXCLUDE_TITLES: if exc in t: return False for inc in _ZUSATZ_INCLUDE_TITLES: if inc in t: return True return False def _bzo_build_source(page: Optional[int], article: Optional[str]) -> str: """Build source string: Art. X, S. Y""" parts = [] if article: parts.append(str(article)) if page is not None and page > 0: parts.append(f"S. {page}") return ", ".join(parts) if parts else "" def _bzo_zone_matches_table(bauzone: str, zone_col: str) -> bool: """Check if table column zone matches target bauzone.""" b = (bauzone or "").upper().strip() z = (zone_col or "").upper().strip() if not b or not z: return False return b in z or (len(z) >= 2 and z in b) def _bzo_article_mentions_bauzone(article_text: str, bauzone: str) -> bool: """Check if article text mentions the bauzone or applies to it.""" if not bauzone or not article_text: return False b = bauzone.upper().strip() t = article_text.upper() if b in t: return True if len(b) >= 2 and b[0] in "WZIK" and re.search(rf"\b{b[0]}\s*\d+", t): base = re.sub(r"\s+", "", b.split("/")[0].rstrip("G")) if base in t or re.search(rf"\b{base}\b", t): return True return False def _bzo_get_params_from_tables( zone_parameter_tables: List[Dict[str, Any]], bauzone: str ) -> List[Dict[str, Any]]: """Extract parameter values for a Bauzone from zone-parameter tables.""" result = [] seen = set() for table in zone_parameter_tables: zones = table.get("zones", []) if not any(_bzo_zone_matches_table(bauzone, str(z)) for z in zones): continue page = table.get("page") art = table.get("article") for param in table.get("parameters", []): values_by_zone = param.get("values_by_zone", {}) for zone, values in values_by_zone.items(): if not _bzo_zone_matches_table(bauzone, str(zone)): continue if not isinstance(values, list) or len(values) == 0: continue val_entry = values[0] value = val_entry.get("value", "") unit = val_entry.get("unit") or "" param_name = param.get("parameter", "") key = f"{param_name}|{value}|{unit}" if key not in seen: seen.add(key) source = _bzo_build_source(page, param.get("article") or art) result.append({ "parameter": param_name, "value": str(value), "unit": str(unit).strip() if unit else "", "source": source or "Tabelle im Dokument", "rule_type": None, }) return result def _bzo_filter_rules_by_bauzone(rules: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]: """Filter rules by Bauzone code.""" bauzone_upper = (bauzone or "").upper() out = [] for r in rules: if bauzone_upper in (r.get("zone_raw") or "").upper(): out.append(r) continue for tz in (r.get("table_zones") or []): if bauzone_upper in str(tz).upper(): out.append(r) break else: if bauzone_upper in (r.get("text_snippet") or "").upper(): out.append(r) return out def _bzo_get_params_from_rules(rules: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Convert parsed rules to {parameter, value, unit, source, rule_type} format.""" result = [] seen = set() for r in rules: rule_type = r.get("rule_type", "") param_name = RULE_TYPE_TO_PARAM.get(rule_type) or rule_type.replace("_", " ").title() value_numeric = r.get("value_numeric") value_text = r.get("value_text", "") unit = r.get("unit") or "" if value_numeric is not None: val_str = str(int(value_numeric)) if isinstance(value_numeric, float) and value_numeric == int(value_numeric) else str(value_numeric) else: val_str = str(value_text).strip() if value_text else "" if not val_str: continue val_lower = val_str.lower() if val_lower in ("gebäudelänge", "gebäudebreite", "mehrlängenzuschlag", "mehrlängen", "grenzabstand", "fassadenhöhe"): continue unit_str = str(unit).strip() if unit else (RULE_TYPE_TO_DEFAULT_UNIT.get(rule_type, "")) page = r.get("page") article = r.get("article_label") source = _bzo_build_source(page, article) or "Artikeltxt" key = f"{param_name}|{val_str}|{unit_str}" if key not in seen: seen.add(key) result.append({ "parameter": param_name, "value": val_str, "unit": unit_str, "source": source, "rule_type": rule_type, }) return result def _bzo_param_to_rule_type(param_name: str) -> Optional[str]: """Map parameter display name to rule_type.""" p = (param_name or "").lower() if "vollgeschoss" in p: return "max_floors" if "dachgeschoss" in p or "attika" in p: return "max_attachable_attics" if "untergeschoss" in p: return "max_attachable_basement" if "ausnützungsziffer" in p or "ausnutzungsziffer" in p or " az " in p: return "density" if "überbauungsziffer" in p or " uz " in p: return "building_coverage" if "baumassenziffer" in p or "bmz" in p: return "building_mass_index" if "grünflächen" in p or "gfz" in p: return "green_space_index" if "grenzabstand" in p or "grundabstand" in p: return "boundary_distance" if "mehrlängen" in p or "mlz" in p: return "boundary_distance_length_surcharge" if "höchstmass" in p: return "boundary_distance_max" if "gebäudelänge" in p: return "building_length" if "gebäudebreite" in p: return "building_width" if "fassadenhöhe" in p or "gebäudehöhe" in p: return "max_building_height" if "wohnflächenanteil" in p or "wohnanteil" in p: return "residential_area_share" return None def _bzo_merge_rules( from_tables: List[Dict[str, Any]], from_rules: List[Dict[str, Any]], ) -> List[Dict[str, Any]]: """Merge table params and rule params. Tables take precedence.""" by_param_lower: Dict[str, Dict[str, Any]] = {} for r in from_tables: p = (r.get("parameter") or "").lower() if p and p not in by_param_lower: rr = r.copy() if not rr.get("rule_type"): rr["rule_type"] = _bzo_param_to_rule_type(rr.get("parameter", "")) by_param_lower[p] = rr for r in from_rules: p = (r.get("parameter") or "").lower() if p and p not in by_param_lower: by_param_lower[p] = r.copy() return list(by_param_lower.values()) def _bzo_param_sort_key(param_name: str) -> int: """Order parameters by BZO_PARAM_ORDER.""" p = (param_name or "").lower() for i, kw in enumerate(BZO_PARAM_ORDER): if kw in p: return i return 99 def _bzo_extract_zusatzinformationen( articles: List[Dict[str, Any]], bauzone: str = "", zone_parameter_tables: Optional[List[Dict[str, Any]]] = None, ) -> List[Dict[str, Any]]: """Extract article excerpts relevant to the bauzone.""" result = [] seen = set() patterns = [re.compile(kw, re.IGNORECASE) for kw in _ARTIKEL_KEYWORDS] table_articles = set() if zone_parameter_tables and bauzone: for t in zone_parameter_tables: if not any(_bzo_zone_matches_table(bauzone, str(z)) for z in t.get("zones", [])): continue table_articles.add(t.get("article") or "") for art in articles: label = art.get("article_label") or "" title = (art.get("article_title") or "").strip() text = (art.get("text") or "").strip() page = art.get("page_start") or art.get("page_end") or 0 if not label or not text: continue key = f"{label}|{page}" if key in seen: continue combined = f"{title} {text}" if not any(p.search(combined) for p in patterns): continue if bauzone: if not _bzo_article_mentions_bauzone(combined, bauzone) and label not in table_articles: continue seen.add(key) source = _bzo_build_source(page, label) result.append({ "article_label": label, "article_title": title, "text": text[:3500].strip(), "page": page, "source": source or "BZO-Dokument", }) return sorted(result, key=lambda x: (x.get("page", 0), x.get("article_label", ""))) def extract_wohnzone_params( extracted_content: Dict[str, Any], bauzone: str, relevant_rules: Optional[List[Dict[str, Any]]] = None, total_area_m2: Optional[float] = None, ) -> Dict[str, Any]: """ Extract BZO parameters for a Wohnzone from extracted content. Returns ordered list of fakten (with sources) and zusatzinformationen. """ articles = extracted_content.get("articles", []) zone_parameter_tables = extracted_content.get("zone_parameter_tables", []) all_rules = extracted_content.get("rules", []) rules_to_use = relevant_rules if relevant_rules is not None else _bzo_filter_rules_by_bauzone(all_rules, bauzone) from_tables = _bzo_get_params_from_tables(zone_parameter_tables, bauzone) from_rules = _bzo_get_params_from_rules(rules_to_use) bauzone_rules = _bzo_merge_rules(from_tables, from_rules) fakten = [] if bauzone: fakten.append({"item": "Auswertung für Bauzone", "value": bauzone, "source": ""}) if total_area_m2 is not None and total_area_m2 > 0: fakten.append({ "item": "Grundstücksfläche", "value": f"{total_area_m2:,.0f} m²".replace(",", "'"), "source": "Parzellendaten", }) for r in sorted(bauzone_rules, key=lambda x: _bzo_param_sort_key(x.get("parameter", ""))): param = r.get("parameter", "").strip() val = r.get("value", "") unit = (r.get("unit") or "").strip() rule_type = r.get("rule_type") or _bzo_param_to_rule_type(param) if not unit and rule_type: unit = RULE_TYPE_TO_DEFAULT_UNIT.get(rule_type, "") value_str = f"{val}{(' ' + unit) if unit else ''}".strip() if param and value_str: fakten.append({ "item": param, "value": value_str, "source": r.get("source") or "BZO-Dokument", }) zusatzinformationen = _bzo_extract_zusatzinformationen( articles, bauzone, zone_parameter_tables ) return { "bauzone": bauzone, "fakten": fakten, "zusatzinformationen": zusatzinformationen, } # ===== LangGraph: LLM-based BZO Params Extraction ===== def _build_bauzone_context_for_llm(state: BZOParamsExtractionState) -> str: """Build context string for LLM from extracted BZO content.""" bauzone = (state.get("bauzone") or "").upper() zone_parameter_tables = state.get("zone_parameter_tables", []) relevant_articles = state.get("relevant_articles", []) relevant_rules = state.get("relevant_rules", []) total_area_m2 = state.get("total_area_m2") parts = [] if total_area_m2 is not None and total_area_m2 > 0: parts.append(f"Grundstücksfläche der Parzelle: {total_area_m2:,.0f} m²".replace(",", "'")) parts.append("") # Full article texts - LLM can parse tables like Art. 14 (zones in rows, values in columns) parts.append("=== ARTIKEL MIT VOLLEM TEXT (Tabellen genau lesen, richtige Spalte/Zeile für Bauzone wählen) ===") for art in relevant_articles: label = art.get("article_label", "") title = (art.get("article_title") or "").strip() text = art.get("text", "") page = art.get("page_start") or art.get("page_end", 0) parts.append(f"\n{label}: {title}") parts.append(f"Seite: {page}") parts.append(f"Inhalt:\n{text}") parts.append("") # Zone-parameter tables (pre-parsed) if zone_parameter_tables: parts.append("=== VORSTRUKTURIERTE TABELLENWERTE FÜR BAUZONE ===") for table in zone_parameter_tables: page = table.get("page", 0) art = table.get("article", "") parts.append(f"\n{art} (S. {page}):") for param in table.get("parameters", []): pname = param.get("parameter", "") for zone, values in (param.get("values_by_zone") or {}).items(): if bauzone in (zone or "").upper(): if isinstance(values, list) and values: v = values[0].get("value", "") u = values[0].get("unit") or "" parts.append(f" {pname} [{zone}]: {v} {u}".strip()) parts.append("") # Rules from text if relevant_rules: parts.append("=== REGELN AUS ARTIKELTEXT ===") for r in relevant_rules[:20]: rt = r.get("rule_type", "") vn = r.get("value_numeric") vt = r.get("value_text", "") u = r.get("unit", "") page = r.get("page", 0) art = r.get("article_label", "") val = str(int(vn)) if vn is not None and isinstance(vn, float) and vn == int(vn) else (str(vn) if vn is not None else vt) parts.append(f" {rt}: {val} {u} ({art}, S. {page})".strip()) return "\n".join(parts) def _parse_llm_bullet_list(text: str) -> List[Dict[str, str]]: """Parse LLM response into fakten list. Expects lines like '- Param: value (Art. X, S. Y)'.""" fakten = [] for line in (text or "").strip().split("\n"): line = line.strip() if not line or not line.startswith("-"): continue line = line.lstrip("- ").strip() # Match "Param: value (source)" or "Param: value" match = re.match(r"^(.+?):\s*(.+?)(?:\s*\(([^)]+)\))?\s*$", line) if match: item = match.group(1).strip() value = match.group(2).strip() source = (match.group(3) or "").strip() if item and value: fakten.append({"item": item, "value": value, "source": source}) elif ":" in line: idx = line.find(":") fakten.append({ "item": line[:idx].strip(), "value": line[idx + 1 :].strip(), "source": "", }) return fakten async def _llm_filter_relevant_provisions( ai_service: Any, bauzone: str, fakten: List[Dict[str, str]], provision_articles: List[Dict[str, Any]], ) -> Optional[set]: """ Use LLM to determine which provision articles are relevant for a parcel in this bauzone. Returns set of article labels (e.g. {"Art. 15", "Art. 16"}) or None to include all on error. """ if not provision_articles: return set() fakten_str = "\n".join( f"- {f.get('item', '')}: {f.get('value', '')}" for f in fakten if f.get("item") and "Auswertung" not in (f.get("item") or "") ) articles_str = "\n".join( f"- {a.get('article_label', '')}: {a.get('article_title', '')}" for a in provision_articles if a.get("article_label") ) prompt = f"""Du bist Experte für Schweizer Bau- und Zonenordnungen (BZO). Eine Parzelle liegt in der Bauzone {bauzone}. Folgende BZO-Parameter gelten für diese Zone: {fakten_str} Folgende Bestimmungen (Weiterführende Artikel) könnten zutreffen: {articles_str} AUFGABE: Welche dieser Artikel sind für eine Parzelle in Bauzone {bauzone} mit diesen Parametern TATSÄCHLICH RELEVANT? - Nur Artikel angeben, die auf diese Zone/Parameter Bezug nehmen oder Bedingungen nennen, die hier greifen - z.B. Art. 15 Herabsetzung: relevant wenn Vollgeschosse und Grenzabstand vorhanden (Reduktion bei weggelassenen Geschossen) - z.B. Art. 16 Nutzweise: relevant für Wohnzonen mit Wohnanteil - z.B. Art. 40 Wohnanteil: nur wenn dieser Artikel die Zone {bauzone} erwähnt oder für Wohnzonen gilt - Artikel die andere Zonen betreffen (z.B. nur Z5, I) und {bauzone} ausschliessen: NICHT aufnehmen Antwort NUR mit den relevanten Artikelnummern, eine pro Zeile (z.B. "Art. 15", "Art. 16"). Keine anderen Zeichen.""" try: response = await ai_service.callAiPlanning( prompt=prompt, debugType="bzo_relevant_provisions", ) labels = set() for line in (response or "").strip().split("\n"): m = re.search(r"(Art\.\s*\d+[a-z]?)", line.strip(), re.I) if m: lbl = re.sub(r"\s+", " ", m.group(1).strip()) labels.add(lbl) return labels if labels else None # None = include all (fallback on error or empty) except Exception as e: logger.warning(f"LLM provision filter failed: {e}") return None async def llm_extract_bauzone_params_node(state: BZOParamsExtractionState) -> BZOParamsExtractionState: """LangGraph node: use LLM to extract BZO parameters for Bauzone as bullet list.""" bauzone = state.get("bauzone", "") gemeinde = state.get("gemeinde", "") ai_service = state.get("ai_service") errors = list(state.get("errors", [])) if not ai_service: errors.append("AI service not provided") return {**state, "fakten": [], "bauzone_params_list": [], "errors": errors} context = _build_bauzone_context_for_llm(state) prompt = f"""Du bist Experte für Schweizer Bau- und Zonenordnungen (BZO). Extrahiere alle relevanten BZO-Parameter für die Bauzone {bauzone} in {gemeinde}. BZO-INHALT: {context} AUFGABE: Erstelle eine geordnete Bullet-Liste ALLER zutreffenden Parameter für Bauzone {bauzone}. Priorität: Vollgeschosse, anrechenbares Untergeschoss, anrechenbares Dachgeschoss, Ausnützungsziffer, Überbauungsziffer, Gebäudehöhe, Grundabstand/Grenzabstand, Gebäudelänge, Mehrlängenzuschlag, Höchstmass, sowie alle anderen Bestimmungen die für diese Zone gelten. WICHTIG: - Bei Tabellen: die richtige Spalte/Zeile für {bauzone} verwenden (z.B. Art. 14 Mehrlängenzuschlag: W5 = 13 m) - Jede Zeile: "- Parametername: Wert (Art. X, S. Y)" - Nur tatsächlich im Dokument vorhandene Werte angeben - Einheit (m, %, Stk.) bei Zahlen mit angeben - Keine leeren Zeilen oder Kommentare - nur die Liste Antwort NUR mit der Bullet-Liste, sonst nichts:""" try: ai_response = await ai_service.callAiPlanning( prompt=prompt, debugType="bzo_params_extraction", ) response_text = (ai_response or "").strip() # Parse into fakten fakten = _parse_llm_bullet_list(response_text) # Build bauzone_params_list (raw "- ..." strings) bauzone_params_list = [f"- {f['item']}: {f['value']}" + (f" ({f['source']})" if f.get("source") else "") for f in fakten] # Add header items if missing if bauzone and not any("Auswertung" in (f.get("item") or "") for f in fakten): fakten.insert(0, {"item": "Auswertung für Bauzone", "value": bauzone, "source": ""}) total_area_m2 = state.get("total_area_m2") if total_area_m2 is not None and total_area_m2 > 0 and not any("Grundstücksfläche" in (f.get("item") or "") for f in fakten): fakten.insert(1, { "item": "Grundstücksfläche", "value": f"{total_area_m2:,.0f} m²".replace(",", "'"), "source": "Parzellendaten", }) # Zusatzinformationen: only provisions RELEVANT for this parcel in this bauzone all_articles = state.get("extracted_content", {}).get("articles", []) or state.get("relevant_articles", []) provision_articles = [a for a in all_articles if _is_zusatzinfo_article((a.get("article_title") or "").strip())] relevant_labels = await _llm_filter_relevant_provisions( ai_service=ai_service, bauzone=bauzone, fakten=fakten, provision_articles=provision_articles, ) def _norm_label(s: str) -> str: return re.sub(r"\s+", " ", (s or "").strip()) zusatzinformationen = [] for art in provision_articles: label = art.get("article_label", "") title = (art.get("article_title") or "").strip() norm = _norm_label(label) if relevant_labels is not None and norm and norm not in relevant_labels: continue raw_text = (art.get("text") or "")[:4000] text = _format_article_text_readable( raw_text, article_label=label, article_title=title, ) if not text: continue page = art.get("page_start") or art.get("page_end", 0) source = f"{label}, S. {page}" if label else f"S. {page}" zusatzinformationen.append({ "article_label": label, "article_title": title, "text": text, "source": source, }) return { **state, "fakten": fakten, "bauzone_params_list": bauzone_params_list, "zusatzinformationen": zusatzinformationen, "errors": errors, } except Exception as e: logger.error(f"LLM BZO params extraction failed: {e}", exc_info=True) errors.append(str(e)) return { **state, "fakten": [], "bauzone_params_list": [], "zusatzinformationen": [], "errors": errors, } def create_bzo_params_extraction_graph(): """Create LangGraph for LLM-based BZO params extraction.""" workflow = StateGraph(BZOParamsExtractionState) workflow.add_node("llm_extract", llm_extract_bauzone_params_node) workflow.set_entry_point("llm_extract") workflow.add_edge("llm_extract", END) return workflow.compile() def _filter_articles_by_bauzone(articles: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]: """Filter articles that mention the Bauzone.""" bauzone_upper = (bauzone or "").upper() return [ a for a in articles if bauzone_upper in (a.get("text") or "").upper() or bauzone_upper in (a.get("zone_raw") or "").upper() ] def _filter_tables_by_bauzone(tables: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]: """Filter zone-parameter tables to those containing the Bauzone.""" bauzone_upper = (bauzone or "").upper() relevant = [] for table in tables: zones = table.get("zones", []) matching = [z for z in zones if bauzone_upper in str(z).upper()] if matching: filtered = { "page": table.get("page"), "article": table.get("article"), "zones": matching, "parameters": [ {"parameter": p.get("parameter"), "values_by_zone": { z: v for z, v in (p.get("values_by_zone") or {}).items() if bauzone_upper in str(z).upper() }} for p in table.get("parameters", []) if any(bauzone_upper in str(z).upper() for z in (p.get("values_by_zone") or {})) ], } filtered["parameters"] = [x for x in filtered["parameters"] if x["values_by_zone"]] if filtered["parameters"]: relevant.append(filtered) return relevant async def run_bzo_params_extraction( extracted_content: Dict[str, Any], bauzone: str, ai_service: Any, gemeinde: str, relevant_rules: Optional[List[Dict[str, Any]]] = None, relevant_articles: Optional[List[Dict[str, Any]]] = None, total_area_m2: Optional[float] = None, ) -> Dict[str, Any]: """ Run LangGraph workflow to extract BZO parameters for a Bauzone via LLM. Returns fakten (item/value/source), bauzone_params_list (bullet strings), zusatzinformationen. """ rules = relevant_rules if relevant_rules is not None else _bzo_filter_rules_by_bauzone( extracted_content.get("rules", []), bauzone ) articles = relevant_articles if relevant_articles is not None else _filter_articles_by_bauzone( extracted_content.get("articles", []), bauzone ) tables = _filter_tables_by_bauzone( extracted_content.get("zone_parameter_tables", []), bauzone ) state: BZOParamsExtractionState = { "extracted_content": extracted_content, "bauzone": bauzone, "total_area_m2": total_area_m2, "relevant_rules": rules, "relevant_articles": articles, "zone_parameter_tables": tables, "ai_service": ai_service, "gemeinde": gemeinde, "bauzone_params_list": [], "fakten": [], "zusatzinformationen": [], "errors": [], } graph = create_bzo_params_extraction_graph() final_state = await graph.ainvoke(state) return { "bauzone": bauzone, "fakten": final_state.get("fakten", []), "bauzone_params_list": final_state.get("bauzone_params_list", []), "zusatzinformationen": final_state.get("zusatzinformationen", []), "errors": final_state.get("errors", []), } # ===== Graph Construction ===== def create_bzo_extraction_graph(): """Create and compile the BZO extraction graph (simplified 4-node pipeline).""" workflow = StateGraph(BZOExtractionState) workflow.add_node("classify_and_assemble", classify_and_assemble) workflow.add_node("extract_zones_and_tables", extract_zones_and_tables) workflow.add_node("extract_rules", extract_rules) workflow.set_entry_point("classify_and_assemble") workflow.add_edge("classify_and_assemble", "extract_zones_and_tables") workflow.add_edge("extract_zones_and_tables", "extract_rules") workflow.add_edge("extract_rules", END) return workflow.compile() def run_extraction(pdf_bytes: bytes, pdf_id: str = None, dokument_id: str = None) -> Dict[str, Any]: """ Run the extraction pipeline on a PDF and return structured, sorted results. Args: pdf_bytes: PDF file content as bytes pdf_id: Optional identifier for the PDF (defaults to generated ID) dokument_id: Optional dokument ID for reference Returns: Dictionary with extracted and sorted content: { "articles": [...], # Sorted by page_start, then article_label "zones": [...], # Sorted by zone_code "rules": [...], # Sorted by rule_type, then page "errors": [...], "warnings": [...] } """ import uuid if not pdf_id: pdf_id = f"pdf_{uuid.uuid4().hex[:8]}" # Initialize state state: BZOExtractionState = { "dokument_id": dokument_id, "pdf_id": pdf_id, "text_blocks": [], "classified_blocks": [], "articles": [], "current_zones": {}, "zones": [], "rule_candidates": [], "parsed_rules": [], "zone_parameter_tables": [], "errors": [], "warnings": [] } # Extract PDF text first pdf_extractor = BZOPdfExtractor() text_blocks_objects = pdf_extractor.extract_text_blocks(pdf_bytes, state["pdf_id"]) # Convert TextBlock objects to dicts for state state["text_blocks"] = [ { "page": tb.page, "text": tb.text, "block_id": tb.block_id, "bbox": tb.bbox } for tb in text_blocks_objects ] # Create and run graph graph = create_bzo_extraction_graph() final_state = graph.invoke(state) # Sort and structure results articles = sorted( final_state.get("articles", []), key=lambda x: (x.get("page_start", 0), x.get("article_label", "")) ) zones = sorted( final_state.get("zones", []), key=lambda x: x.get("zone_code", "") ) rules = sorted( final_state.get("parsed_rules", []), key=lambda x: (x.get("rule_type", ""), x.get("page", 0)) ) zone_parameter_tables = final_state.get("zone_parameter_tables", []) return { "articles": articles, "zones": zones, "rules": rules, "zone_parameter_tables": zone_parameter_tables, "errors": final_state.get("errors", []), "warnings": final_state.get("warnings", []) } def extract_from_documents( document_retriever, dokument_ids: List[str] ) -> Dict[str, Any]: """ Extract BZO content from one or more documents. Args: document_retriever: BZODocumentRetriever instance dokument_ids: List of dokument IDs to process Returns: Dictionary with results per document: { "results": [ { "dokument_id": "...", "articles": [...], "zones": [...], "rules": [...], "errors": [...], "warnings": [...] }, ... ], "summary": { "total_documents": N, "successful": M, "failed": K, "total_articles": X, "total_zones": Y, "total_rules": Z } } """ results = [] total_articles = 0 total_zones = 0 total_rules = 0 successful = 0 failed = 0 # Retrieve documents dokumente = document_retriever.get_documents_by_ids(dokument_ids) for dokument in dokumente: try: # Retrieve PDF content pdf_bytes = document_retriever.retrieve_pdf_content(dokument) if not pdf_bytes: logger.warning(f"Could not retrieve PDF for dokument {dokument.id}") results.append({ "dokument_id": dokument.id, "articles": [], "zones": [], "rules": [], "errors": [f"Could not retrieve PDF content"], "warnings": [] }) failed += 1 continue # Run extraction extraction_result = run_extraction( pdf_bytes=pdf_bytes, pdf_id=dokument.dokumentReferenz or f"dok_{dokument.id}", dokument_id=dokument.id ) # Add dokument_id to result extraction_result["dokument_id"] = dokument.id results.append(extraction_result) # Update counters total_articles += len(extraction_result.get("articles", [])) total_zones += len(extraction_result.get("zones", [])) total_rules += len(extraction_result.get("rules", [])) if extraction_result.get("errors"): failed += 1 else: successful += 1 except Exception as e: logger.error(f"Error processing dokument {dokument.id}: {str(e)}", exc_info=True) results.append({ "dokument_id": dokument.id, "articles": [], "zones": [], "rules": [], "errors": [f"Processing error: {str(e)}"], "warnings": [] }) failed += 1 return { "results": results, "summary": { "total_documents": len(dokument_ids), "successful": successful, "failed": failed, "total_articles": total_articles, "total_zones": total_zones, "total_rules": total_rules } }