wiki/z-archive/2026-04-unified-document-model.md
2026-04-16 23:12:56 +02:00

39 KiB
Raw Blame History

Unified Document Model (UDM) — Dokumenten-Extraktion, Workflow ForEach & High-Volume

Abgeschlossen am 2026-04-16. Technische Referenz: b-reference/gateway/workflow.md (UDM, Nodes, Engine), b-reference/gateway/ai-agent.md (UDM-Tools). Konzept: c-work/0-ideas/unified-document-model.md.

Abschlussbericht

  • Umgesetzt: UDM-Datenmodell und Bridge, Extraktions-Pipeline (inkl. Registry-Singleton, PDF-Memory), context.extractContent, Port-Typen und Executor-Routing (context.*ActionNodeExecutor), data.consolidate / ai.consolidate, Loop (level, concurrency), flow.merge mit dynamischer Input-Anzahl, data.filter mit UDM-Content-Type-Presets, Engine (Concurrency, StepLog-Batching, Streaming-Aggregate, Progress), meta.usesAi und AI-Badge im Editor, Agent-Tools (getUdmStructure, walkUdmBlocks, filterUdmByType), Referenzdoku in b-reference/ und Eintrag in TOPICS.md.
  • Tests im Repo (Auszug): gateway/tests/unit/datamodels/test_udm_models.py, test_udm_bridge.py, gateway/tests/integration/extraction/test_extract_udm_pipeline.py, gateway/tests/integration/workflows/test_execute_graph_loop_aggregate_consolidate.py, gateway/tests/unit/nodeDefinitions/test_usesai_flag.py, gateway/tests/unit/serviceAgent/test_udm_agent_tools.py.
  • Backlog / optional: Erweiterung des ursprünglichen Testplan-Rasters (High-Volume-Referenzen, Fan-out/Merge-Integration, Concurrency-Benchmark, Load-Test 10k) nach Bedarf; UX wie explizite Loop-Body-Hervorhebung oder dedizierte NodeConfig-Komponenten können nachgelagert werden.

Grundlage: UDM-Konzept

Dieses Feature basiert auf dem Unified Document Model (UDM) — einer generischen, formatunabhängigen 3-Ebenen-Baumstruktur für Dokumenten-Extraktion. Das vollständige Konzept mit Datenmodell, Format-Mappings (PDF, DOCX, PPTX, XLSX, HTML), Workflow-Integration, Archive-Handling und JSON-Beispielen ist hier definiert:

c-work/0-ideas/unified-document-model.md

Kernprinzipien des UDM:

  • 3-Ebenen-Garantie: Jedes Dokument hat DocumentStructuralNode[]ContentBlock[]
  • Einheitliche Blattknoten: Alle atomaren Inhalte sind ContentBlock-Objekte mit identischer Struktur
  • Generische Traversierung: Workflow-Nodes arbeiten formatunabhängig über dieselbe Baumstruktur
  • Keine formatspezifischen Zwischenschichten: Konzepte wie "Paragraph", "Row", "Cell" werden in ContentBlock.attributes absorbiert
Ebene Typ Beschreibung
Level 1 Document Wurzelknoten pro Quelldatei
Level 2 StructuralNode Seite, Abschnitt, Slide oder Sheet
Level 3 ContentBlock Atomarer Inhalt: Text, Bild, Tabelle, Code, Media, Link, Formel

Dieser Plan beschreibt die Umsetzung des UDM-Konzepts im Gateway, die nötigen Workflow-Nodes, und die High-Volume-Skalierbarkeit.


Beschreibung und Kontext

Das bestehende Dokumenten-Extraktionssystem arbeitet mit ContentPart / ContentExtracted als flacher Liste. Das UDM ersetzt dieses Modell durch eine hierarchische Baumstruktur, die es erlaubt, über Level-Attribute (Seiten, Sections, Sheets, Slides) generisch zu iterieren.

Business-Treiber: Workflows benötigen die Fähigkeit, ein Dokument zu extrahieren und dann pro Struktureinheit (z.B. pro PDF-Seite) eine Kette von Verarbeitungsschritten auszuführen — und am Ende die Teilergebnisse zu konsolidieren. Zusätzlich muss das System mit grossen Datenmengen umgehen können (z.B. ZIP mit 10.000 PDFs).

Heutige Lücken:

  1. Es gibt keinen Extract-Node im Graphical Editor — context.extractContent existiert nur als Workspace-/Agent-Action, nicht als visuellen Node
  2. Das Extraktionsmodell liefert keine hierarchische Baum-Struktur (nur flache ContentPart-Liste)
  3. Der flow.loop-Node hat keine Sub-Workflow-Modellierung — der Loop-Body wird nur über Graph-Topologie implizit erkannt
  4. Es gibt keinen dedizierten Consolidate-Schritt für strukturierte Zusammenführung nach ForEach
  5. Die Engine ist nicht skalierbar für >1000 Iterationen (Memory, DB-Last, keine Parallelität)
  6. Nodes im Editor haben keine visuelle Kennzeichnung, ob sie AI nutzen oder deterministisch arbeiten

Abhängigkeiten: AI-Agent-Tools (_documentTools.py), Workflow-Engine (executionEngine.py), Graphical Editor (Frontend), Extraktion (serviceExtraction).

Risiko bei Nicht-Umsetzung: Dokument-intensive Workflows (Trustee, Compliance-Audit, Massendokumentverarbeitung) bleiben manuell oder erfordern Custom-Code pro Use Case.


Fokus und kritische Details

  • Migration des Extraktionsmodells: ContentPart → UDM muss rückwärtskompatibel sein. Bestehende Konsumenten (ChatContentExtracted, Agent-Tools, Neutralisierung) dürfen nicht brechen.
  • Loop-Body mit parallelen Pfaden: Fan-out von Loop-Output zu mehreren Nodes + Merge pro Iteration funktioniert in der Engine bereits (Topo-Sort garantiert Reihenfolge), aber flow.merge hat heute fix 2 Inputs.
  • High-Volume-Skalierbarkeit: Bei 10k Iterationen entstehen Engpässe bei Memory (Base64-Bilder in nodeOutputs), DB (AutoStepLog pro Body-Node pro Iteration), und Laufzeit (sequentielle Verarbeitung).
  • Extractor-Performance: extractorContainer.py erstellt pro Datei im ZIP eine neue ExtractorRegistry()-Instanz (Auto-Discovery). Bei 10k Dateien ist das ein massiver Overhead.
  • PDF-Memory: extractorPdf.py lädt via buf.getvalue() eine Kopie des gesamten PDFs — doppelter Memory-Verbrauch pro Datei.

Ziel und Nicht-Ziele

Ziele

  1. context.extractContent-Node im Graphical Editor — reine Strukturextraktion OHNE AI
  2. UDM-Datenmodell (Document, StructuralNode, ContentBlock, Archive) als Pydantic-Modelle
  3. Extractor-Adapter: Bestehende Extractors liefern zusätzlich UDM-Output; Bridge ContentPart ↔ UDM
  4. ForEach-Workflow-Pattern: Loop über UDM-Struktureinheiten mit parallelen Pfaden im Body und Merge pro Iteration
  5. Consolidate-Nodes: data.consolidate (deterministisch) + ai.consolidate (AI-gestützt) für strukturierte Zusammenführung
  6. High-Volume-Fähigkeit: ZIP mit 10.000+ PDFs verarbeitbar durch Streaming-Extraktion, Loop-Concurrency und StepLog-Batching
  7. Neue Port-Typen: UdmDocument, UdmNodeList, ConsolidateResult
  8. Agent-Tools: Bestehende browseContainer/readContentObjects um UDM-Traversierung erweitern
  9. AI-Kennzeichnung auf Nodes: Jeder Node im Editor zeigt visuell, ob er AI nutzt (meta.usesAi) — Kostentransparenz und schnelle Übersicht

Explizit NICHT

  • Neue Dateiformate (Markdown, LaTeX, etc.) — kommt separat
  • Visuelles Sub-Graph-Grouping (Compound-Nodes) im Editor — wird mit impliziter Body-Erkennung gelöst
  • Breaking Change an ContentPart-API — Bridge-Layer garantiert Kompatibilität

Betroffene Module

Gateway

Modul Änderung
datamodels/datamodelUdm.py Neue Datei: Pydantic-Klassen UdmDocument, UdmStructuralNode, UdmContentBlock, UdmArchive, UdmPosition, UdmMetadata. Bridge-Funktionen. UdmContentBlock.raw optional mit fileRef-Alternative für Lazy-Loading
datamodels/datamodelExtraction.py ExtractionOptions.outputFormat Feld ("parts" / "udm" / "both")
serviceExtraction/subRegistry.py Extractor-Interface erweitern: extractToUdm(). ExtractorRegistry als Singleton cachen (heute: neue Instanz pro Datei im ZIP)
serviceExtraction/extractors/extractorPdf.py UDM-Output pro Seite. Fix buf.getvalue() → direkt BytesIO an fitz übergeben (halber Memory)
serviceExtraction/extractors/extractorDocx.py UDM-Output pro Section (Heading-basiert)
serviceExtraction/extractors/extractorPptx.py UDM-Output pro Slide
serviceExtraction/extractors/extractorXlsx.py UDM-Output pro Sheet
serviceExtraction/extractors/extractorHtml.py UDM-Output pro semantischem Bereich
serviceExtraction/extractors/extractorContainer.py Lazy-Modus: ZIP-Inhaltsverzeichnis liefern statt alle Dateien extrahieren. Registry-Singleton statt new ExtractorRegistry() pro Datei
serviceExtraction/mainServiceExtraction.py extractContent() mit UDM-Option. Streaming-Modus für grosse Archive
features/graphicalEditor/nodeDefinitions/context.py Neue Datei: Node context.extractContent
features/graphicalEditor/nodeDefinitions/data.py Neuer Node: data.consolidate (deterministisch)
features/graphicalEditor/nodeDefinitions/ai.py Neuer Node: ai.consolidate (AI-gestützt)
features/graphicalEditor/nodeDefinitions/flow.py flow.loop: Parameter level (UDM-Ebene) + concurrency (parallele Iterationen). flow.merge: dynamische Input-Anzahl
features/graphicalEditor/nodeDefinitions/__init__.py CONTEXT_NODES in STATIC_NODE_TYPES aufnehmen
features/graphicalEditor/portTypes.py Neue Port-Typen: UdmDocument, UdmNodeList, ConsolidateResult. Input-Extraktoren dazu
workflows/automation2/executors/flowExecutor.py Loop: UDM-Array-Auflösung, Concurrency-Support
workflows/automation2/executors/dataExecutor.py data.consolidate-Logik (deterministisch). Filter: UDM-Content-Type-Presets
workflows/automation2/executionEngine.py (_getExecutor) Branch context.* hinzufügen → ActionNodeExecutor
workflows/automation2/executionEngine.py Loop-Concurrency (N Items parallel), StepLog-Batching (bei >100 Iterationen), Streaming-Aggregate (periodisch flushen)
workflows/methods/methodContext/actions/extractContent.py UDM-Output-Option, Lazy-Modus für Archive
workflows/methods/methodAi/actions/consolidate.py Neue Datei: Action consolidate für ai.consolidate (LLM-Call: summarize, classify, semantic merge)
serviceAgent/coreTools/_documentTools.py UDM-Tools: walkUdmBlocks, filterUdmByType, getUdmStructure

Frontend

Modul Änderung
FlowEditor/nodes/context/ExtractContentNodeConfig.tsx Neue Datei: Config-UI für context.extractContent
FlowEditor/nodes/loop/LoopNodeConfig.tsx UDM-Level-Selektor + Concurrency-Slider
FlowEditor/nodes/shared/LoopItemsSelect.tsx UDM-Structural-Level als Quelle
FlowEditor/nodes/shared/types.ts Neue Node-Config-Types
FlowEditor/FlowCanvas.tsx Visuelle Loop-Body-Markierung (farbiger Hintergrund). AI-Badge auf Nodes die AI nutzen
FlowEditor/NodeSidebar.tsx Neue Nodes in Palette (context.extractContent, data.consolidate, ai.consolidate). AI-Badge in Palette
FlowEditor/nodes/shared/AiBadge.tsx Neue Datei: Wiederverwendbare AI-Badge-Komponente (usesAi: boolean)
api/workflowApi.ts NodeType-Erweiterung

DB-Migration

Nein — UDM-Daten fliessen als JSON durch Workflow-Context/nodeOutputs, keine neue DB-Tabelle nötig.


Architektur-Design

A) UDM-Datenmodell (Gateway)

class UdmMetadata(BaseModel):
    title: Optional[str] = None
    author: Optional[str] = None
    createdAt: Optional[str] = None
    modifiedAt: Optional[str] = None
    sourcePath: str = ""
    tags: List[str] = Field(default_factory=list)
    custom: Dict[str, Any] = Field(default_factory=dict)

class UdmBoundingBox(BaseModel):
    x: float; y: float; width: float; height: float
    unit: Literal["px", "pt", "mm"] = "pt"

class UdmPosition(BaseModel):
    index: int
    page: Optional[int] = None
    row: Optional[int] = None
    col: Optional[int] = None
    bbox: Optional[UdmBoundingBox] = None

class UdmContentBlock(BaseModel):
    id: str
    contentType: Literal["text", "image", "table", "code", "media", "link", "formula"]
    raw: str = ""
    fileRef: Optional[str] = None      # Lazy-Loading: Referenz statt inline Base64
    mimeType: Optional[str] = None
    language: Optional[str] = None
    attributes: Dict[str, Any] = Field(default_factory=dict)
    position: UdmPosition = Field(default_factory=lambda: UdmPosition(index=0))
    metadata: UdmMetadata = Field(default_factory=UdmMetadata)

class UdmStructuralNode(BaseModel):
    id: str
    role: Literal["page", "section", "slide", "sheet"]
    index: int
    label: Optional[str] = None
    metadata: UdmMetadata = Field(default_factory=UdmMetadata)
    children: List[UdmContentBlock] = Field(default_factory=list)

class UdmDocument(BaseModel):
    id: str
    role: Literal["document"] = "document"
    sourceType: Literal["pdf", "docx", "pptx", "xlsx", "html"]
    sourcePath: str = ""
    metadata: UdmMetadata = Field(default_factory=UdmMetadata)
    children: List[UdmStructuralNode] = Field(default_factory=list)

class UdmArchive(BaseModel):
    id: str
    role: Literal["archive"] = "archive"
    sourceType: Literal["zip", "tar", "gz"]
    sourcePath: str = ""
    metadata: UdmMetadata = Field(default_factory=UdmMetadata)
    children: List[Union[UdmArchive, UdmDocument]] = Field(default_factory=list)

Schlüssel-Erweiterung für High-Volume: UdmContentBlock.fileRef — statt Base64-Daten inline zu speichern, kann eine Datei-Referenz (z.B. fileId oder temp-Pfad) hinterlegt werden. Der Inhalt wird erst on-demand geladen, wenn ein downstream Node ihn braucht. Bei 10k PDFs spart das mehrere GB RAM.

B) Bridge: ContentPart ↔ UDM

def _contentPartsToUdm(extracted: ContentExtracted, sourceType: str, sourcePath: str) -> UdmDocument:
    """Konvertiert flache ContentPart-Liste in UDM-Baum.
    Groupiert nach parentId oder typeGroup zu StructuralNodes."""

def _udmToContentParts(document: UdmDocument) -> ContentExtracted:
    """Konvertiert UDM-Baum zurück in flache ContentPart-Liste.
    Für Rückwärtskompatibilität."""

C) Neuer Node: context.extractContent (Strukturextraktion OHNE AI)

Problem heute: context.extractContent existiert nur als Method-Action für Workspace/Agent. Im Graphical Editor gibt es keinen Node, um ein Dokument rein strukturell zu zerlegen.

Lösung: Ein neuer Node context.extractContent in der Kategorie context:

  1. Dokument(e) entgegennehmen (Input: DocumentList)
  2. Reine Extraktion (kein AI-Call, nur Parser)
  3. UDM-Struktur als Output
  4. Downstream gezielt nutzbar: Loop → Filter → AI nur für bestimmte Blöcke

Node-Definition

{
    "id": "context.extractContent",
    "category": "context",
    "label": t("Inhalt extrahieren"),
    "description": t("Dokumentstruktur extrahieren ohne KI (Seiten, Abschnitte, Bilder, Tabellen)"),
    "parameters": [
        {"name": "outputDetail", "type": "string", "required": False, "frontendType": "select",
         "frontendOptions": {"options": ["full", "structure", "references"]},
         "description": t("Detailgrad"), "default": "full"},
        {"name": "includeImages", "type": "boolean", "required": False, "frontendType": "checkbox",
         "description": t("Bilder extrahieren"), "default": True},
        {"name": "includeTables", "type": "boolean", "required": False, "frontendType": "checkbox",
         "description": t("Tabellen extrahieren"), "default": True},
    ],
    "inputs": 1,
    "outputs": 1,
    "inputPorts": {0: {"accepts": ["DocumentList", "Transit"]}},
    "outputPorts": {0: {"schema": "UdmDocument"}},
    "meta": {"icon": "mdi-file-tree-outline", "color": "#00897B"},
    "_method": "context",
    "_action": "extractContent",
}

Parameter outputDetail

Wert Beschreibung Use Case
full Volle UDM-Struktur inkl. Rohdaten (Text, Base64-Bilder, JSON-Tabellen) Standard — alles downstream verfügbar. Bis ~100 Dokumente.
structure Nur Baum-Skelett: Document → StructuralNodes → ContentBlock-Metadaten (ohne raw) Schnelle Vorschau, Routing-Entscheidungen
references Datei-Referenzen statt Inline-Daten (fileRef statt raw). Inhalt wird on-demand im Loop geladen High-Volume: 1000+ Dokumente. Spart RAM.

D) ForEach/Consolidate Workflow-Pattern

Einfacher Fall: 1 PDF, pro Seite verarbeiten

[Upload] → [Extract] → [Loop (pro Seite)] → [AI] → [Aggregate] → [Consolidate]

Parallele Pfade im Loop-Body: Bild + Text pro Seite

[Upload] → [Extract] → [Loop (pro Seite)]
                              ├→ [Filter: Bilder] → [AI Vision]  ──┐
                              └→ [Filter: Text]   → [AI Text]   ──┤
                                                                   ↓
                                                            [Merge (pro Seite)]
                                                                   ↓
                                                            [Aggregate]
                          [Consolidate (alle Seiten → Tabelle)] ←──┘

Warum das heute schon funktioniert:

  • Fan-out: Ein Output-Port kann mehrere Connections haben (Target-Inputs sind limitiert auf 1, Source-Outputs nicht)
  • getLoopBodyNodeIds() (BFS) findet alle Nodes in beiden Pfaden
  • topoSort ordnet: Filter + AI vor Merge (weil Merge von beiden abhängt)
  • Body-Nodes werden sequentiell in Topo-Sort-Reihenfolge abgearbeitet → Merge hat beide Inputs

Umgesetzt (Plan):

  • flow.merge: dynamische Input-Anzahl (25 via Parameter inputCount)
  • data.filter: UDM-Content-Type-Presets (Filter auf contentType, structuralNode.index, attributes.*)

High-Volume-Fall: ZIP mit 10.000 PDFs

[Upload ZIP] → [Extract (references)] → [Loop (pro Dokument, concurrency: 10)]
                                               ↓
                                         [Lazy-Extract (1 PDF, Seite 1)]
                                               ↓
                                         [Filter: Bild Seite 1]
                                               ↓
                                         [AI Vision → CSV-Zeile]
                                               ↓
                                         [Aggregate (CSV-Zeilen)]
                     [Consolidate (mode: table → grosses CSV)] ←──┘

Schlüssel: outputDetail: "references" — der erste Extract liefert nur eine leichtgewichtige Liste von 10k Datei-Referenzen (~10 KB statt ~4 GB). Pro Loop-Iteration wird nur 1 PDF on-demand geladen, verarbeitet und wieder freigegeben.

High-Volume: ZIP mit Fan-out + Merge pro Dokument

Dasselbe Muster wie bei einem einzelnen PDF (zwei parallele Body-Pfade + Merge pro Iteration), angewendet auf pro Archiv-Eintrag statt pro Seite:

[Upload ZIP] → [Extract (references)] → [Loop (pro Dokument, concurrency: N)]
                                              ├→ [Lazy-Extract / Seite 1] → [AI Vision] ──┐
                                              └→ [Filter: Text] → [AI Text] ────────────┤
                                                                                      [Merge]
                                                                                        ↓
                                                                                  [Aggregate]
[Consolidate] ←──────────────────────────────────────────────────────────────────────────┘

Pro Iteration gilt: Fan-out und Topo-Sort im Loop-Body wie oben; der erste Extract liefert nur Referenzen, sodass nicht alle PDFs gleichzeitig im RAM liegen.

E) Flow.loop Erweiterungen

# Neuer Parameter: UDM-Level
{
    "name": "level",
    "type": "string",
    "required": False,
    "frontendType": "select",
    "frontendOptions": {"options": ["auto", "documents", "structuralNodes", "contentBlocks"]},
    "description": t("UDM-Iterationsebene"),
    "default": "auto",
}

# Neuer Parameter: Parallele Iterationen
{
    "name": "concurrency",
    "type": "number",
    "required": False,
    "frontendType": "number",
    "frontendOptions": {"min": 1, "max": 20},
    "description": t("Parallele Iterationen"),
    "default": 1,
}
  • level: autoitems-Pfad wie bisher
  • level: documentsarchive.children (Documents)
  • level: structuralNodesdocument.children (Pages/Sections/Slides/Sheets)
  • level: contentBlocksstructuralNode.children (Text/Image/Table/...)
  • concurrency: 1 → sequentiell wie heute
  • concurrency: 10 → 10 Iterationen gleichzeitig via asyncio.Semaphore

F) flow.merge Erweiterung

# Neuer Parameter: Dynamische Input-Anzahl
{
    "name": "inputCount",
    "type": "number",
    "required": False,
    "frontendType": "number",
    "frontendOptions": {"min": 2, "max": 5},
    "description": t("Anzahl Eingänge"),
    "default": 2,
}

inputs und inputPorts werden dynamisch basierend auf inputCount generiert. Backend: nodeRegistry.py muss dynamische Input-Ports unterstützen.

G) High-Volume Engine-Optimierungen

Problem-Analyse (10.000 Iterationen)

Engpass Heute Lösung
Memory: Base64-Bilder Alle Bilder als Strings in nodeOutputs fileRef statt inline raw. On-Demand-Loading
Memory: items-Array Gesamte items-Liste im Loop-Node-Output Bei references-Modus: nur IDs, ~100 Bytes pro Item
Memory: Aggregate _aggregateAccumulators wächst unbegrenzt in-Memory Streaming-Aggregate: bei >1000 Items periodisch in temp-Storage flushen
DB: AutoStepLog Insert + Update pro Body-Node pro Iteration StepLog-Batching: bei >100 Iterationen nur jede N-te loggen + Summary am Ende
DB: updateRun Am Ende: gesamte nodeOutputs als JSONB Aggregate-Daten als File-Referenz statt inline
CPU: ExtractorRegistry Neue Instanz pro Datei im ZIP (_addFilePart) Singleton-Pattern: einmal erstellen, wiederverwenden
CPU: PDF-Memory buf.getvalue() kopiert gesamtes PDF für PyMuPDF Direkt BytesIO übergeben, keine Kopie
Laufzeit: Sequentiell 10k × 3s = 8.3h concurrency: 10 → ~50 Min
SSE: StepEvents Event pro Step-Änderung Bei High-Volume: nur Progress-Summary (z.B. "Iteration 5000/10000")

StepLog-Batching (executionEngine.py)

# Heuristik: ab 100 Iterationen → Batch-Modus
if len(items) > STEPLOG_BATCH_THRESHOLD:
    stepLogMode = "batch"  # nur jede 100. Iteration + Fehler + Summary
else:
    stepLogMode = "full"   # wie heute: jeder Step einzeln

Loop-Concurrency (executionEngine.py)

concurrency = (node.get("parameters") or {}).get("concurrency", 1)
semaphore = asyncio.Semaphore(concurrency)

async def _processIteration(idx, item):
    async with semaphore:
        nodeOutputs_local = dict(nodeOutputs)  # lokale Kopie pro Iteration
        nodeOutputs_local[nodeId] = {"currentItem": item, "currentIndex": idx, ...}
        for body_node in body_ordered:
            ...

tasks = [_processIteration(idx, item) for idx, item in enumerate(items)]
results = await asyncio.gather(*tasks)

Achtung: Bei concurrency > 1 braucht jede Iteration eigene nodeOutputs, da Body-Nodes sich sonst gegenseitig überschreiben. Aggregate-Accumulation muss thread-safe sein (Lock oder Queue).

Streaming-Aggregate

AGGREGATE_FLUSH_THRESHOLD = 1000

# Im Loop-Body:
_aggregateAccumulators[bnid].extend(accItems)
if len(_aggregateAccumulators[bnid]) >= AGGREGATE_FLUSH_THRESHOLD:
    _flushAggregateToTemp(bnid, _aggregateAccumulators[bnid])
    _aggregateAccumulators[bnid] = []

Temp-Storage: In-Memory-Buffer oder temp-File. Am Ende: alle Chunks zusammenführen.

H) AI-Kennzeichnung auf Nodes im Editor

Jeder Node im Editor soll sofort sichtbar machen, ob er AI nutzt oder deterministisch arbeitet. Dies ist wichtig für Kostentransparenz und Workflow-Design.

Umsetzung

Backend: Jede Node-Definition bekommt ein neues Feld meta.usesAi (Boolean):

# Beispiel: ai.prompt → AI
"meta": {"icon": "mdi-robot", "color": "#9C27B0", "usesAi": True}

# Beispiel: data.filter → kein AI
"meta": {"icon": "mdi-filter-outline", "color": "#607D8B", "usesAi": False}

Durch den Split von Nodes mit gemischtem AI-Verhalten (z.B. data.consolidate vs ai.consolidate) ist jeder Node eindeutig — kein "optional" nötig.

Frontend: Im FlowCanvas.tsx (Zeile ~846-852, Node-Rendering) wird ein kleines Badge/Indicator angezeigt:

  • usesAi: true → AI-Badge (z.B. kleines "AI"-Label oder Blitz-Icon oben rechts am Node)
  • usesAi: false oder nicht gesetzt → kein Badge

Gleiche Kennzeichnung in der NodeSidebar (Palette), damit der User schon beim Drag&Drop sieht, welche Nodes AI nutzen.

Zuordnung aller bestehenden Nodes

Node usesAi Begründung
trigger.* false Reine Auslöser
input.* false Formulare, manuelle Eingabe
flow.ifElse false Bedingungslogik
flow.switch false Bedingungslogik
flow.loop false Iteration
flow.merge false Zusammenführung
data.aggregate false Sammeln
data.transform false Feld-Mapping
data.filter false Filterlogik
data.consolidate (NEU) false Deterministisch: merge, concat, table, CSV-Join
ai.consolidate (NEU) true AI-gestützt: summarize, classify, semantic merge
context.extractContent (NEU) false Reine Parser-Arbeit
ai.prompt true LLM-Call
ai.webResearch true Web-Suche + LLM
ai.summarizeDocument true LLM-Zusammenfassung
ai.translateDocument true LLM-Übersetzung
ai.convertDocument true LLM-Konvertierung
ai.generateDocument true LLM-Generierung
ai.generateCode true LLM-Codegenerierung
email.* false E-Mail-Operationen
sharepoint.* false SharePoint-Operationen
clickup.* false ClickUp-Operationen
file.create false Datei-Erstellung
trustee.refreshAccountingData false Datenimport aus externem System
trustee.extractFromFiles true AI-Extraktion (Prompt-basiert, Dokumenttyp-Erkennung)
trustee.processDocuments false TrusteeDocument/Position aus Extraktionsergebnis erstellen
trustee.syncToAccounting false Übertragung in Buchhaltung

Entscheidungen

Datum Entscheidung Begründung
2026-04-16 context.extractContent als eigener Node im Editor Heute fehlt komplett. Fundament für gezielten AI-Einsatz. Neue Kategorie context.
2026-04-16 Extraktion ist KEIN AI-Call Reine Parser-Arbeit. AI-Kosten erst bei explizitem AI-Node. Kostenfreie Vorverarbeitung.
2026-04-16 UDM als In-Memory-Format, keine DB-Tabelle UDM fliesst als JSON durch nodeOutputs. Vermeidet Migration.
2026-04-16 Bridge ContentPart ↔ UDM statt Breaking Change Alle bestehenden Konsumenten bleiben funktional.
2026-04-16 Kein Compound-Node / Sub-Graph Zu hoher Aufwand. Bestehende Body-Erkennung via BFS genügt.
2026-04-16 Consolidate als Split in data.consolidate + ai.consolidate data.* = deterministisch, ai.* = AI-gestützt. Kein "optional" Badge nötig — jeder Node ist eindeutig.
2026-04-16 level-Parameter am Loop Weniger Nodes in der Palette, einfachere UX.
2026-04-16 Fan-out im Loop-Body nutzt bestehendes Modell Engine unterstützt es bereits: Topo-Sort + sequentielle Body-Abarbeitung garantieren korrekte Reihenfolge. Kein Engine-Umbau nötig.
2026-04-16 outputDetail: "references" für High-Volume 10k PDFs als Referenzliste (~10 KB) statt vollständig extrahiert (~4 GB). On-Demand-Loading pro Iteration.
2026-04-16 Loop-Concurrency als opt-in Parameter Default bleibt 1 (sequentiell, deterministisch). Power-User setzen höher für Durchsatz.
2026-04-16 StepLog-Batching ab Schwellwert 50k DB-Inserts bei 10k Iterationen ist nicht tragbar. Batch-Modus loggt nur Summary + Fehler.
2026-04-16 AI-Badge auf allen Nodes im Editor Kostentransparenz. User sieht sofort, welche Nodes AI-Credits verbrauchen. meta.usesAi als Boolean.
2026-04-16 Nodes mit gemischtem AI-Verhalten splitten data.consolidate (deterministisch) + ai.consolidate (AI). Kein "optional"-Badge nötig — jeder Node ist eindeutig true oder false.

Umsetzungs-Checkliste

Phase 1: UDM-Datenmodell & Bridge (Gateway)

Cursor-Empfehlung: Composer (Fast) reicht. Reine Pydantic-Modelle und Utility-Funktionen — klar definiert, wenig Kontext nötig.

  • Pydantic-Modelle in datamodels/datamodelUdm.py (neue Datei)
  • UdmContentBlock.fileRef Feld für Lazy-Loading
  • Bridge-Funktionen _contentPartsToUdm() / _udmToContentParts()
  • Unit-Tests für Modelle und Bridge
  • ExtractionOptions.outputFormat Feld ("parts" | "udm" | "both")

Phase 2: Extractor-Adapter (Gateway)

Cursor-Empfehlung: Composer (Fast) reicht. Pro Extractor ein isoliertes File mit klarem Pattern. Memory-Fixes (buf.getvalue(), Registry-Singleton) sind punktuelle Änderungen.

  • extractorPdf.py → UDM-Output + Fix buf.getvalue() (Memory)
  • extractorDocx.py → UDM-Output
  • extractorPptx.py → UDM-Output
  • extractorXlsx.py → UDM-Output
  • extractorHtml.py → UDM-Output
  • extractorContainer.py → Lazy-Modus (Inhaltsverzeichnis statt alles extrahieren)
  • extractorContainer.pyExtractorRegistry Singleton statt new pro Datei
  • mainServiceExtraction.pyextractContent() mit UDM-Option
  • Integration-Tests pro Format

Phase 3: Extract-Node — Strukturextraktion ohne AI (Gateway + Frontend)

Cursor-Empfehlung: Opus 4.6 empfohlen. Neue Datei + Integration in mehrere bestehende Systeme (NodeDefinitions, PortTypes, Executor-Mapping, Frontend-Komponente). Erfordert Verständnis der gesamten Node-Architektur über ~6 Dateien hinweg.

  • nodeDefinitions/context.py → Node context.extractContent
  • nodeDefinitions/__init__.pyCONTEXT_NODES in STATIC_NODE_TYPES
  • UdmDocument Port-Typ in portTypes.py
  • Input-Extractor _extractUdmDocument() in portTypes.py
  • executionEngine.py_getExecutor(): Branch context.*ActionNodeExecutor hinzufügen (heute nur: ai., email., sharepoint., clickup., file., trustee.)
  • ActionNodeExecutorcontext.extractContent via _method/_action Mapping (nutzt bestehendes MethodContext.extractContent)
  • extractContent.py → UDM-Output-Modus + outputDetail: "references" Modus
  • Frontend: ExtractContentNodeConfig.tsx
  • Frontend: Node in Sidebar-Palette unter Kategorie "Kontext"

Phase 4: Workflow Nodes — ForEach, Merge & Consolidate (Gateway + Frontend)

Cursor-Empfehlung: Opus 4.6 empfohlen. Komplexeste Phase — berührt Execution-Engine, Port-System, Flow-Executor, Data-Executor und Frontend gleichzeitig. Concurrency-Design erfordert tiefes Verständnis der bestehenden Loop-Logik und Race-Conditions.

  • data.consolidate Node-Definition (nodeDefinitions/data.py) — deterministisch: merge, concat, table, CSV-Join
  • ai.consolidate Node-Definition (nodeDefinitions/ai.py) — AI-gestützt: summarize, classify, semantic merge
  • ConsolidateResult Port-Typ (portTypes.py)
  • DataExecutor → Consolidate-Logik (dataExecutor.py)
  • methodAi/actions/consolidate.py → neue Action-Datei (LLM-Call mit aggregierten Daten)
  • methodAi/methodAi.py → Action consolidate registrieren
  • ai.consolidate wird über bestehenden ActionNodeExecutor geroutet — kein eigener Executor nötig
  • data.filter → UDM-Content-Type-Presets (Filter auf contentType, index, attributes)
  • flow.looplevel-Parameter (nodeDefinitions/flow.py)
  • flow.loopconcurrency-Parameter (nodeDefinitions/flow.py)
  • flow.merge → dynamische Input-Anzahl inputCount (2-5)
  • FlowExecutor._loop() → UDM-Level-Auflösung (flowExecutor.py)
  • UdmNodeList Port-Typ (portTypes.py)
  • Frontend: DataConsolidateNodeConfig.tsx (deterministisch)
  • Frontend: AiConsolidateNodeConfig.tsx (AI-gestützt)
  • Frontend: Loop-NodeConfig → UDM-Level-Selector + Concurrency-Slider
  • Frontend: Merge-NodeConfig → Input-Count-Selector
  • Frontend: Visuelle Loop-Body-Markierung im Canvas

Phase 5: High-Volume Engine-Optimierungen (Gateway)

Cursor-Empfehlung: Opus 4.6 empfohlen. Async-Concurrency, Thread-Safety, Streaming-Aggregates — subtile Bugs möglich. Load-Test-Design braucht Überblick über das gesamte System.

  • executionEngine.py → Loop-Concurrency via asyncio.Semaphore mit isolierten nodeOutputs pro Iteration
  • executionEngine.py → StepLog-Batching (Schwellwert 100 Iterationen: nur Summary + Fehler loggen)
  • executionEngine.py → Streaming-Aggregate (Flush-Threshold, temp-Storage)
  • executionEngine.py → Progress-SSE bei High-Volume: Summary statt pro-Step-Events
  • extractorContainer.pyExtractorRegistry Singleton (Performance-Fix)
  • extractorPdf.pyBytesIO direkt an fitz übergeben statt getvalue() (Memory-Fix)
  • Lazy-Content-Loader: Utility-Funktion die fileRef → Bytes auflöst, on-demand im Loop
  • Load-Test: 1000 PDFs aus ZIP → Extract → AI Vision → CSV → Consolidate

Phase 6: AI-Badge — Visuelle Kennzeichnung im Editor (Gateway + Frontend)

Cursor-Empfehlung: Composer (Fast) reicht. Kleines meta.usesAi-Feld pro Node-Definition (Backend) + Badge-Komponente im Canvas (Frontend). Klar abgegrenzte Änderung.

  • Alle bestehenden Node-Definitionen: meta.usesAi Feld ergänzen (true / false)
  • FlowCanvas.tsx → AI-Badge-Rendering: kleines "AI"-Label / Blitz-Icon oben rechts am Node wenn usesAi: true
  • NodeSidebar.tsx → AI-Kennzeichnung in der Palette (Badge neben Node-Name)
  • Tooltip auf Badge: "Dieser Schritt nutzt AI und verbraucht Credits"
  • CSS/Styling: Badge-Design konsistent mit bestehendem Design-System

Phase 7: Agent-Tools & Integration (Gateway)

Cursor-Empfehlung: Composer (Fast) reicht für die Tool-Funktionen. Opus 4.6 für E2E-Tests — die Tests müssen viele Module korrekt zusammenspielen lassen.

  • _documentTools.py → UDM-Tools (walkUdmBlocks, filterUdmByType, getUdmStructure)
  • E2E-Test: PDF → Extract (UDM) → Loop (pro Seite) → AI → Aggregate → Consolidate
  • E2E-Test: PDF → Extract → Filter (nur Tabellen) → AI → kein Loop
  • E2E-Test: ZIP → Extract (references) → Loop (Fan-out) → Vision + Text → Merge → Aggregate → Consolidate

Phase 8: Dokumentation & Abschluss

Cursor-Empfehlung: Composer (Fast) reicht. Reine Dokumentation und Verifikation.

  • RBAC / Permissions: Keine Änderung nötig
  • Neutralisierung: ContentBlock.raw mit sensiblen Daten → Bridge zu ContentPart
  • Navigation / Routing: Keine Änderung
  • Billing-Impact: Loop-Iterationen zählen als einzelne AI-Calls → bestehende Logik greift

Akzeptanzkriterien

# Kriterium (Given-When-Then) Prio
1 Given ein PDF mit 5 Seiten, When context.extractContent-Node ausgeführt, Then UdmDocument mit 5 StructuralNodes (role=page) — kein AI-Call, nur Parsing must
2 Given der Extract-Node-Output (UDM), When mit data.filter auf contentType=="table" gefiltert, Then nur Tabellen-ContentBlocks im Output must
3 Given ein DOCX mit 3 Sections, When UDM-Extraktion, Then 3 StructuralNodes mit role=section und korrekten Labels must
4 Given ein UDM-Dokument, When _udmToContentParts(), Then identische ContentPart-Liste wie bei direkter Extraktion (Bridge) must
5 Given ein Workflow mit Loop über document.children, When ausgeführt, Then Body-Nodes pro StructuralNode einmal durchlaufen must
6 Given ForEach → AI → Aggregate → Consolidate, When 3 Seiten verarbeitet, Then Consolidate erhält 3 Ergebnisse und erzeugt Zusammenfassung must
7 Given Loop mit Fan-out zu 2 AI-Nodes + Merge, When pro Seite ausgeführt, Then Merge erhält beide AI-Ergebnisse pro Iteration must
8 Given outputDetail=references auf ZIP mit 1000 PDFs, When Extract ausgeführt, Then Referenzliste mit 1000 Einträgen, Memory < 100 MB must
9 Given Loop mit concurrency: 5 über 100 Items, When ausgeführt, Then ~5x schneller als concurrency: 1 should
10 Given Loop mit 10.000 Iterationen, When ausgeführt, Then weniger als 1000 AutoStepLog-Einträge (Batching) should
11 Given context.extractContent mit outputDetail=structure, When ausgeführt, Then nur Skelett ohne raw-Daten should
12 Given Loop-Workflow im Editor, When User den Graph betrachtet, Then Loop-Body-Nodes visuell als zusammengehörig erkennbar should
13 Given data.consolidate mit mode=table, When 100 CSV-Zeilen als Input, Then ein zusammengefügtes CSV als Output should
14 Given ZIP mit 10.000 PDFs, When E2E-Workflow (Extract refs → Loop → AI Vision Seite 1 → CSV), Then erfolgreich in < 24h (mit concurrency: 10) nice
15 Given ZIP mit 3 PDFs, When UDM-Extraktion, Then UdmArchive mit 3 UdmDocuments nice
16 Given ein Workflow mit ai.prompt und data.filter Nodes, When User den Graph betrachtet, Then AI-Badge nur auf ai.prompt sichtbar, nicht auf data.filter must
17 Given data.consolidate und ai.consolidate nebeneinander im Editor, When User vergleicht, Then nur ai.consolidate hat AI-Badge, data.consolidate nicht must

Testplan

Stand beim Archivieren: Kerntests liegen in den unter „Abschlussbericht“ genannten Dateien. Die ursprünglich geplanten Einzelpfade (T2, T3, T6T17) können bei Bedarf als separates Backlog ergänzt werden.

ID AC Art Automatisiert Repo-Pfad Status
T1 1 unit ja gateway/tests/unit/datamodels/test_udm_models.py done
T2 1 integration ja gateway/tests/integration/extraction/test_extract_udm_pipeline.py (statt urspr. test_extract_node.py) done
T3 2 integration ja — (Backlog: dedizierter Filter-Flow) backlog
T4 3 unit ja gateway/tests/unit/datamodels/test_udm_models.py done
T5 4 unit ja gateway/tests/unit/datamodels/test_udm_bridge.py done
T6 1,3 integration ja gateway/tests/integration/extraction/test_extract_udm_pipeline.py done
T7 5 integration ja — (Backlog) backlog
T8 6 integration ja gateway/tests/integration/workflows/test_execute_graph_loop_aggregate_consolidate.py done
T9 7 integration ja — (Backlog) backlog
T10 8 integration ja — (Backlog) backlog
T11 9 integration ja — (Backlog) backlog
T12 10 unit ja — (Backlog) backlog
T13 11 unit ja — (Backlog) backlog
T14 12 manual nein backlog
T15 13 integration ja — (Backlog) backlog
T16 14 load nein — (manueller Load-Test) backlog
T17 15 integration ja — (Backlog) backlog
T18 16 unit ja gateway/tests/unit/nodeDefinitions/test_usesai_flag.py done
T19 17 manual nein — (visueller Check im Editor) backlog

  • Idee: c-work/0-ideas/unified-document-model.md
  • Workflow-Engine Referenz: b-reference/gateway/workflow.md
  • AI-Agent Referenz: b-reference/gateway/ai-agent.md
  • Port-System Referenz: c-work/4-done/2026-04-generic-graph-editor.md

Abschluss

  • b-reference/ aktualisiert: gateway/workflow.md (neue Nodes, UDM-Loop-Pattern, Concurrency), gateway/ai-agent.md (neue UDM-Tools)
  • TOPICS.md aktualisiert (neues Thema: UDM)
  • Dieses Dokument → wiki/z-archive/2026-04-unified-document-model.md archiviert