wiki/c-work/4-done/2026-04-unified-document-model.md

685 lines
39 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<!-- status: archived -->
<!-- started: 2026-04-16 -->
<!-- completed: 2026-04-16 -->
<!-- component: gateway | frontend-nyla | platform -->
# Unified Document Model (UDM) — Dokumenten-Extraktion, Workflow ForEach & High-Volume
> **Abgeschlossen am 2026-04-16.** Technische Referenz: `b-reference/gateway/workflow.md` (UDM, Nodes, Engine), `b-reference/gateway/ai-agent.md` (UDM-Tools). Konzept: `c-work/0-ideas/unified-document-model.md`.
## Abschlussbericht
- **Umgesetzt:** UDM-Datenmodell und Bridge, Extraktions-Pipeline (inkl. Registry-Singleton, PDF-Memory), `context.extractContent`, Port-Typen und Executor-Routing (`context.*` → `ActionNodeExecutor`), `data.consolidate` / `ai.consolidate`, Loop (`level`, `concurrency`), `flow.merge` mit dynamischer Input-Anzahl, `data.filter` mit UDM-Content-Type-Presets, Engine (Concurrency, StepLog-Batching, Streaming-Aggregate, Progress), `meta.usesAi` und AI-Badge im Editor, Agent-Tools (`getUdmStructure`, `walkUdmBlocks`, `filterUdmByType`), Referenzdoku in `b-reference/` und Eintrag in `TOPICS.md`.
- **Tests im Repo (Auszug):** `gateway/tests/unit/datamodels/test_udm_models.py`, `test_udm_bridge.py`, `gateway/tests/integration/extraction/test_extract_udm_pipeline.py`, `gateway/tests/integration/workflows/test_execute_graph_loop_aggregate_consolidate.py`, `gateway/tests/unit/nodeDefinitions/test_usesai_flag.py`, `gateway/tests/unit/serviceAgent/test_udm_agent_tools.py`.
- **Backlog / optional:** Erweiterung des ursprünglichen Testplan-Rasters (High-Volume-Referenzen, Fan-out/Merge-Integration, Concurrency-Benchmark, Load-Test 10k) nach Bedarf; UX wie explizite Loop-Body-Hervorhebung oder dedizierte NodeConfig-Komponenten können nachgelagert werden.
## Grundlage: UDM-Konzept
Dieses Feature basiert auf dem **Unified Document Model (UDM)** — einer generischen, formatunabhängigen 3-Ebenen-Baumstruktur für Dokumenten-Extraktion. Das vollständige Konzept mit Datenmodell, Format-Mappings (PDF, DOCX, PPTX, XLSX, HTML), Workflow-Integration, Archive-Handling und JSON-Beispielen ist hier definiert:
**`c-work/0-ideas/unified-document-model.md`**
**Kernprinzipien des UDM:**
- **3-Ebenen-Garantie:** Jedes Dokument hat `Document``StructuralNode[]``ContentBlock[]`
- **Einheitliche Blattknoten:** Alle atomaren Inhalte sind `ContentBlock`-Objekte mit identischer Struktur
- **Generische Traversierung:** Workflow-Nodes arbeiten formatunabhängig über dieselbe Baumstruktur
- **Keine formatspezifischen Zwischenschichten:** Konzepte wie "Paragraph", "Row", "Cell" werden in `ContentBlock.attributes` absorbiert
| Ebene | Typ | Beschreibung |
|-------|-----|-------------|
| Level 1 | `Document` | Wurzelknoten pro Quelldatei |
| Level 2 | `StructuralNode` | Seite, Abschnitt, Slide oder Sheet |
| Level 3 | `ContentBlock` | Atomarer Inhalt: Text, Bild, Tabelle, Code, Media, Link, Formel |
Dieser Plan beschreibt **die Umsetzung** des UDM-Konzepts im Gateway, die nötigen Workflow-Nodes, und die High-Volume-Skalierbarkeit.
---
## Beschreibung und Kontext
Das bestehende Dokumenten-Extraktionssystem arbeitet mit `ContentPart` / `ContentExtracted` als flacher Liste. Das UDM ersetzt dieses Modell durch eine **hierarchische Baumstruktur**, die es erlaubt, über Level-Attribute (Seiten, Sections, Sheets, Slides) generisch zu iterieren.
**Business-Treiber:** Workflows benötigen die Fähigkeit, ein Dokument zu extrahieren und dann **pro Struktureinheit** (z.B. pro PDF-Seite) eine Kette von Verarbeitungsschritten auszuführen — und am Ende die Teilergebnisse zu konsolidieren. Zusätzlich muss das System mit **grossen Datenmengen** umgehen können (z.B. ZIP mit 10.000 PDFs).
**Heutige Lücken:**
1. Es gibt **keinen Extract-Node** im Graphical Editor — `context.extractContent` existiert nur als Workspace-/Agent-Action, nicht als visuellen Node
2. Das Extraktionsmodell liefert keine hierarchische Baum-Struktur (nur flache `ContentPart`-Liste)
3. Der `flow.loop`-Node hat keine Sub-Workflow-Modellierung — der Loop-Body wird nur über Graph-Topologie implizit erkannt
4. Es gibt keinen dedizierten **Consolidate**-Schritt für strukturierte Zusammenführung nach ForEach
5. Die Engine ist **nicht skalierbar** für >1000 Iterationen (Memory, DB-Last, keine Parallelität)
6. Nodes im Editor haben **keine visuelle Kennzeichnung**, ob sie AI nutzen oder deterministisch arbeiten
**Abhängigkeiten:** AI-Agent-Tools (`_documentTools.py`), Workflow-Engine (`executionEngine.py`), Graphical Editor (Frontend), Extraktion (`serviceExtraction`).
**Risiko bei Nicht-Umsetzung:** Dokument-intensive Workflows (Trustee, Compliance-Audit, Massendokumentverarbeitung) bleiben manuell oder erfordern Custom-Code pro Use Case.
---
## Fokus und kritische Details
- **Migration des Extraktionsmodells:** `ContentPart` → UDM muss rückwärtskompatibel sein. Bestehende Konsumenten (`ChatContentExtracted`, Agent-Tools, Neutralisierung) dürfen nicht brechen.
- **Loop-Body mit parallelen Pfaden:** Fan-out von Loop-Output zu mehreren Nodes + Merge pro Iteration funktioniert in der Engine bereits (Topo-Sort garantiert Reihenfolge), aber `flow.merge` hat heute fix 2 Inputs.
- **High-Volume-Skalierbarkeit:** Bei 10k Iterationen entstehen Engpässe bei Memory (Base64-Bilder in nodeOutputs), DB (AutoStepLog pro Body-Node pro Iteration), und Laufzeit (sequentielle Verarbeitung).
- **Extractor-Performance:** `extractorContainer.py` erstellt pro Datei im ZIP eine neue `ExtractorRegistry()`-Instanz (Auto-Discovery). Bei 10k Dateien ist das ein massiver Overhead.
- **PDF-Memory:** `extractorPdf.py` lädt via `buf.getvalue()` eine Kopie des gesamten PDFs — doppelter Memory-Verbrauch pro Datei.
---
## Ziel und Nicht-Ziele
### Ziele
1. **`context.extractContent`-Node** im Graphical Editor — reine Strukturextraktion OHNE AI
2. **UDM-Datenmodell** (`Document`, `StructuralNode`, `ContentBlock`, `Archive`) als Pydantic-Modelle
3. **Extractor-Adapter:** Bestehende Extractors liefern zusätzlich UDM-Output; Bridge `ContentPart` ↔ UDM
4. **ForEach-Workflow-Pattern:** Loop über UDM-Struktureinheiten mit parallelen Pfaden im Body und Merge pro Iteration
5. **Consolidate-Nodes:** `data.consolidate` (deterministisch) + `ai.consolidate` (AI-gestützt) für strukturierte Zusammenführung
6. **High-Volume-Fähigkeit:** ZIP mit 10.000+ PDFs verarbeitbar durch Streaming-Extraktion, Loop-Concurrency und StepLog-Batching
7. **Neue Port-Typen:** `UdmDocument`, `UdmNodeList`, `ConsolidateResult`
8. **Agent-Tools:** Bestehende `browseContainer`/`readContentObjects` um UDM-Traversierung erweitern
9. **AI-Kennzeichnung auf Nodes:** Jeder Node im Editor zeigt visuell, ob er AI nutzt (`meta.usesAi`) — Kostentransparenz und schnelle Übersicht
### Explizit NICHT
- Neue Dateiformate (Markdown, LaTeX, etc.) — kommt separat
- Visuelles Sub-Graph-Grouping (Compound-Nodes) im Editor — wird mit impliziter Body-Erkennung gelöst
- Breaking Change an `ContentPart`-API — Bridge-Layer garantiert Kompatibilität
---
## Betroffene Module
### Gateway
| Modul | Änderung |
|-------|---------|
| `datamodels/datamodelUdm.py` | **Neue Datei**: Pydantic-Klassen `UdmDocument`, `UdmStructuralNode`, `UdmContentBlock`, `UdmArchive`, `UdmPosition`, `UdmMetadata`. Bridge-Funktionen. `UdmContentBlock.raw` optional mit `fileRef`-Alternative für Lazy-Loading |
| `datamodels/datamodelExtraction.py` | `ExtractionOptions.outputFormat` Feld (`"parts"` / `"udm"` / `"both"`) |
| `serviceExtraction/subRegistry.py` | `Extractor`-Interface erweitern: `extractToUdm()`. **ExtractorRegistry als Singleton cachen** (heute: neue Instanz pro Datei im ZIP) |
| `serviceExtraction/extractors/extractorPdf.py` | UDM-Output pro Seite. **Fix `buf.getvalue()`** → direkt `BytesIO` an fitz übergeben (halber Memory) |
| `serviceExtraction/extractors/extractorDocx.py` | UDM-Output pro Section (Heading-basiert) |
| `serviceExtraction/extractors/extractorPptx.py` | UDM-Output pro Slide |
| `serviceExtraction/extractors/extractorXlsx.py` | UDM-Output pro Sheet |
| `serviceExtraction/extractors/extractorHtml.py` | UDM-Output pro semantischem Bereich |
| `serviceExtraction/extractors/extractorContainer.py` | **Lazy-Modus**: ZIP-Inhaltsverzeichnis liefern statt alle Dateien extrahieren. Registry-Singleton statt `new ExtractorRegistry()` pro Datei |
| `serviceExtraction/mainServiceExtraction.py` | `extractContent()` mit UDM-Option. **Streaming-Modus** für grosse Archive |
| `features/graphicalEditor/nodeDefinitions/context.py` | **Neue Datei**: Node `context.extractContent` |
| `features/graphicalEditor/nodeDefinitions/data.py` | Neuer Node: `data.consolidate` (deterministisch) |
| `features/graphicalEditor/nodeDefinitions/ai.py` | Neuer Node: `ai.consolidate` (AI-gestützt) |
| `features/graphicalEditor/nodeDefinitions/flow.py` | `flow.loop`: Parameter `level` (UDM-Ebene) + `concurrency` (parallele Iterationen). `flow.merge`: dynamische Input-Anzahl |
| `features/graphicalEditor/nodeDefinitions/__init__.py` | `CONTEXT_NODES` in `STATIC_NODE_TYPES` aufnehmen |
| `features/graphicalEditor/portTypes.py` | Neue Port-Typen: `UdmDocument`, `UdmNodeList`, `ConsolidateResult`. Input-Extraktoren dazu |
| `workflows/automation2/executors/flowExecutor.py` | Loop: UDM-Array-Auflösung, Concurrency-Support |
| `workflows/automation2/executors/dataExecutor.py` | `data.consolidate`-Logik (deterministisch). Filter: UDM-Content-Type-Presets |
| `workflows/automation2/executionEngine.py` (`_getExecutor`) | Branch `context.*` hinzufügen → `ActionNodeExecutor` |
| `workflows/automation2/executionEngine.py` | **Loop-Concurrency** (N Items parallel), **StepLog-Batching** (bei >100 Iterationen), **Streaming-Aggregate** (periodisch flushen) |
| `workflows/methods/methodContext/actions/extractContent.py` | UDM-Output-Option, Lazy-Modus für Archive |
| `workflows/methods/methodAi/actions/consolidate.py` | **Neue Datei**: Action `consolidate` für `ai.consolidate` (LLM-Call: summarize, classify, semantic merge) |
| `serviceAgent/coreTools/_documentTools.py` | UDM-Tools: `walkUdmBlocks`, `filterUdmByType`, `getUdmStructure` |
### Frontend
| Modul | Änderung |
|-------|---------|
| `FlowEditor/nodes/context/ExtractContentNodeConfig.tsx` | **Neue Datei**: Config-UI für `context.extractContent` |
| `FlowEditor/nodes/loop/LoopNodeConfig.tsx` | UDM-Level-Selektor + Concurrency-Slider |
| `FlowEditor/nodes/shared/LoopItemsSelect.tsx` | UDM-Structural-Level als Quelle |
| `FlowEditor/nodes/shared/types.ts` | Neue Node-Config-Types |
| `FlowEditor/FlowCanvas.tsx` | Visuelle Loop-Body-Markierung (farbiger Hintergrund). **AI-Badge** auf Nodes die AI nutzen |
| `FlowEditor/NodeSidebar.tsx` | Neue Nodes in Palette (`context.extractContent`, `data.consolidate`, `ai.consolidate`). AI-Badge in Palette |
| `FlowEditor/nodes/shared/AiBadge.tsx` | **Neue Datei**: Wiederverwendbare AI-Badge-Komponente (`usesAi: boolean`) |
| `api/workflowApi.ts` | NodeType-Erweiterung |
### DB-Migration
Nein — UDM-Daten fliessen als JSON durch Workflow-Context/nodeOutputs, keine neue DB-Tabelle nötig.
---
## Architektur-Design
### A) UDM-Datenmodell (Gateway)
```python
class UdmMetadata(BaseModel):
title: Optional[str] = None
author: Optional[str] = None
createdAt: Optional[str] = None
modifiedAt: Optional[str] = None
sourcePath: str = ""
tags: List[str] = Field(default_factory=list)
custom: Dict[str, Any] = Field(default_factory=dict)
class UdmBoundingBox(BaseModel):
x: float; y: float; width: float; height: float
unit: Literal["px", "pt", "mm"] = "pt"
class UdmPosition(BaseModel):
index: int
page: Optional[int] = None
row: Optional[int] = None
col: Optional[int] = None
bbox: Optional[UdmBoundingBox] = None
class UdmContentBlock(BaseModel):
id: str
contentType: Literal["text", "image", "table", "code", "media", "link", "formula"]
raw: str = ""
fileRef: Optional[str] = None # Lazy-Loading: Referenz statt inline Base64
mimeType: Optional[str] = None
language: Optional[str] = None
attributes: Dict[str, Any] = Field(default_factory=dict)
position: UdmPosition = Field(default_factory=lambda: UdmPosition(index=0))
metadata: UdmMetadata = Field(default_factory=UdmMetadata)
class UdmStructuralNode(BaseModel):
id: str
role: Literal["page", "section", "slide", "sheet"]
index: int
label: Optional[str] = None
metadata: UdmMetadata = Field(default_factory=UdmMetadata)
children: List[UdmContentBlock] = Field(default_factory=list)
class UdmDocument(BaseModel):
id: str
role: Literal["document"] = "document"
sourceType: Literal["pdf", "docx", "pptx", "xlsx", "html"]
sourcePath: str = ""
metadata: UdmMetadata = Field(default_factory=UdmMetadata)
children: List[UdmStructuralNode] = Field(default_factory=list)
class UdmArchive(BaseModel):
id: str
role: Literal["archive"] = "archive"
sourceType: Literal["zip", "tar", "gz"]
sourcePath: str = ""
metadata: UdmMetadata = Field(default_factory=UdmMetadata)
children: List[Union[UdmArchive, UdmDocument]] = Field(default_factory=list)
```
Schlüssel-Erweiterung für High-Volume: `UdmContentBlock.fileRef` — statt Base64-Daten inline zu speichern, kann eine Datei-Referenz (z.B. `fileId` oder temp-Pfad) hinterlegt werden. Der Inhalt wird erst on-demand geladen, wenn ein downstream Node ihn braucht. Bei 10k PDFs spart das mehrere GB RAM.
### B) Bridge: ContentPart ↔ UDM
```python
def _contentPartsToUdm(extracted: ContentExtracted, sourceType: str, sourcePath: str) -> UdmDocument:
"""Konvertiert flache ContentPart-Liste in UDM-Baum.
Groupiert nach parentId oder typeGroup zu StructuralNodes."""
def _udmToContentParts(document: UdmDocument) -> ContentExtracted:
"""Konvertiert UDM-Baum zurück in flache ContentPart-Liste.
Für Rückwärtskompatibilität."""
```
### C) Neuer Node: `context.extractContent` (Strukturextraktion OHNE AI)
**Problem heute:** `context.extractContent` existiert nur als Method-Action für Workspace/Agent. Im Graphical Editor gibt es keinen Node, um ein Dokument rein strukturell zu zerlegen.
**Lösung:** Ein neuer Node `context.extractContent` in der Kategorie `context`:
1. Dokument(e) entgegennehmen (Input: `DocumentList`)
2. Reine Extraktion (kein AI-Call, nur Parser)
3. UDM-Struktur als Output
4. Downstream gezielt nutzbar: Loop → Filter → AI nur für bestimmte Blöcke
#### Node-Definition
```python
{
"id": "context.extractContent",
"category": "context",
"label": t("Inhalt extrahieren"),
"description": t("Dokumentstruktur extrahieren ohne KI (Seiten, Abschnitte, Bilder, Tabellen)"),
"parameters": [
{"name": "outputDetail", "type": "string", "required": False, "frontendType": "select",
"frontendOptions": {"options": ["full", "structure", "references"]},
"description": t("Detailgrad"), "default": "full"},
{"name": "includeImages", "type": "boolean", "required": False, "frontendType": "checkbox",
"description": t("Bilder extrahieren"), "default": True},
{"name": "includeTables", "type": "boolean", "required": False, "frontendType": "checkbox",
"description": t("Tabellen extrahieren"), "default": True},
],
"inputs": 1,
"outputs": 1,
"inputPorts": {0: {"accepts": ["DocumentList", "Transit"]}},
"outputPorts": {0: {"schema": "UdmDocument"}},
"meta": {"icon": "mdi-file-tree-outline", "color": "#00897B"},
"_method": "context",
"_action": "extractContent",
}
```
#### Parameter `outputDetail`
| Wert | Beschreibung | Use Case |
|------|-------------|----------|
| `full` | Volle UDM-Struktur inkl. Rohdaten (Text, Base64-Bilder, JSON-Tabellen) | Standard — alles downstream verfügbar. Bis ~100 Dokumente. |
| `structure` | Nur Baum-Skelett: Document → StructuralNodes → ContentBlock-Metadaten (ohne `raw`) | Schnelle Vorschau, Routing-Entscheidungen |
| `references` | Datei-Referenzen statt Inline-Daten (`fileRef` statt `raw`). Inhalt wird on-demand im Loop geladen | **High-Volume**: 1000+ Dokumente. Spart RAM. |
### D) ForEach/Consolidate Workflow-Pattern
#### Einfacher Fall: 1 PDF, pro Seite verarbeiten
```
[Upload] → [Extract] → [Loop (pro Seite)] → [AI] → [Aggregate] → [Consolidate]
```
#### Parallele Pfade im Loop-Body: Bild + Text pro Seite
```
[Upload] → [Extract] → [Loop (pro Seite)]
├→ [Filter: Bilder] → [AI Vision] ──┐
└→ [Filter: Text] → [AI Text] ──┤
[Merge (pro Seite)]
[Aggregate]
[Consolidate (alle Seiten → Tabelle)] ←──┘
```
**Warum das heute schon funktioniert:**
- Fan-out: Ein Output-Port kann mehrere Connections haben (Target-Inputs sind limitiert auf 1, Source-Outputs nicht)
- `getLoopBodyNodeIds()` (BFS) findet alle Nodes in beiden Pfaden
- `topoSort` ordnet: Filter + AI vor Merge (weil Merge von beiden abhängt)
- Body-Nodes werden sequentiell in Topo-Sort-Reihenfolge abgearbeitet → Merge hat beide Inputs
**Umgesetzt (Plan):**
- `flow.merge`: dynamische Input-Anzahl (25 via Parameter `inputCount`)
- `data.filter`: UDM-Content-Type-Presets (Filter auf `contentType`, `structuralNode.index`, `attributes.*`)
#### High-Volume-Fall: ZIP mit 10.000 PDFs
```
[Upload ZIP] → [Extract (references)] → [Loop (pro Dokument, concurrency: 10)]
[Lazy-Extract (1 PDF, Seite 1)]
[Filter: Bild Seite 1]
[AI Vision → CSV-Zeile]
[Aggregate (CSV-Zeilen)]
[Consolidate (mode: table → grosses CSV)] ←──┘
```
**Schlüssel:** `outputDetail: "references"` — der erste Extract liefert nur eine leichtgewichtige Liste von 10k Datei-Referenzen (~10 KB statt ~4 GB). Pro Loop-Iteration wird nur 1 PDF on-demand geladen, verarbeitet und wieder freigegeben.
#### High-Volume: ZIP mit Fan-out + Merge pro Dokument
Dasselbe Muster wie bei einem einzelnen PDF (zwei parallele Body-Pfade + Merge pro Iteration), angewendet auf **pro Archiv-Eintrag** statt pro Seite:
```
[Upload ZIP] → [Extract (references)] → [Loop (pro Dokument, concurrency: N)]
├→ [Lazy-Extract / Seite 1] → [AI Vision] ──┐
└→ [Filter: Text] → [AI Text] ────────────┤
[Merge]
[Aggregate]
[Consolidate] ←──────────────────────────────────────────────────────────────────────────┘
```
Pro Iteration gilt: Fan-out und Topo-Sort im Loop-Body wie oben; der erste Extract liefert nur Referenzen, sodass nicht alle PDFs gleichzeitig im RAM liegen.
### E) Flow.loop Erweiterungen
```python
# Neuer Parameter: UDM-Level
{
"name": "level",
"type": "string",
"required": False,
"frontendType": "select",
"frontendOptions": {"options": ["auto", "documents", "structuralNodes", "contentBlocks"]},
"description": t("UDM-Iterationsebene"),
"default": "auto",
}
# Neuer Parameter: Parallele Iterationen
{
"name": "concurrency",
"type": "number",
"required": False,
"frontendType": "number",
"frontendOptions": {"min": 1, "max": 20},
"description": t("Parallele Iterationen"),
"default": 1,
}
```
- `level: auto``items`-Pfad wie bisher
- `level: documents``archive.children` (Documents)
- `level: structuralNodes``document.children` (Pages/Sections/Slides/Sheets)
- `level: contentBlocks``structuralNode.children` (Text/Image/Table/...)
- `concurrency: 1` → sequentiell wie heute
- `concurrency: 10` → 10 Iterationen gleichzeitig via `asyncio.Semaphore`
### F) flow.merge Erweiterung
```python
# Neuer Parameter: Dynamische Input-Anzahl
{
"name": "inputCount",
"type": "number",
"required": False,
"frontendType": "number",
"frontendOptions": {"min": 2, "max": 5},
"description": t("Anzahl Eingänge"),
"default": 2,
}
```
`inputs` und `inputPorts` werden dynamisch basierend auf `inputCount` generiert. Backend: `nodeRegistry.py` muss dynamische Input-Ports unterstützen.
### G) High-Volume Engine-Optimierungen
#### Problem-Analyse (10.000 Iterationen)
| Engpass | Heute | Lösung |
|---------|-------|--------|
| **Memory: Base64-Bilder** | Alle Bilder als Strings in `nodeOutputs` | `fileRef` statt inline `raw`. On-Demand-Loading |
| **Memory: items-Array** | Gesamte `items`-Liste im Loop-Node-Output | Bei `references`-Modus: nur IDs, ~100 Bytes pro Item |
| **Memory: Aggregate** | `_aggregateAccumulators` wächst unbegrenzt in-Memory | **Streaming-Aggregate**: bei >1000 Items periodisch in temp-Storage flushen |
| **DB: AutoStepLog** | Insert + Update pro Body-Node pro Iteration | **StepLog-Batching**: bei >100 Iterationen nur jede N-te loggen + Summary am Ende |
| **DB: updateRun** | Am Ende: gesamte nodeOutputs als JSONB | Aggregate-Daten als File-Referenz statt inline |
| **CPU: ExtractorRegistry** | Neue Instanz pro Datei im ZIP (`_addFilePart`) | **Singleton-Pattern**: einmal erstellen, wiederverwenden |
| **CPU: PDF-Memory** | `buf.getvalue()` kopiert gesamtes PDF für PyMuPDF | Direkt `BytesIO` übergeben, keine Kopie |
| **Laufzeit: Sequentiell** | 10k × 3s = 8.3h | `concurrency: 10` → ~50 Min |
| **SSE: StepEvents** | Event pro Step-Änderung | Bei High-Volume: nur Progress-Summary (z.B. "Iteration 5000/10000") |
#### StepLog-Batching (executionEngine.py)
```python
# Heuristik: ab 100 Iterationen → Batch-Modus
if len(items) > STEPLOG_BATCH_THRESHOLD:
stepLogMode = "batch" # nur jede 100. Iteration + Fehler + Summary
else:
stepLogMode = "full" # wie heute: jeder Step einzeln
```
#### Loop-Concurrency (executionEngine.py)
```python
concurrency = (node.get("parameters") or {}).get("concurrency", 1)
semaphore = asyncio.Semaphore(concurrency)
async def _processIteration(idx, item):
async with semaphore:
nodeOutputs_local = dict(nodeOutputs) # lokale Kopie pro Iteration
nodeOutputs_local[nodeId] = {"currentItem": item, "currentIndex": idx, ...}
for body_node in body_ordered:
...
tasks = [_processIteration(idx, item) for idx, item in enumerate(items)]
results = await asyncio.gather(*tasks)
```
Achtung: Bei `concurrency > 1` braucht jede Iteration **eigene** `nodeOutputs`, da Body-Nodes sich sonst gegenseitig überschreiben. Aggregate-Accumulation muss thread-safe sein (Lock oder Queue).
#### Streaming-Aggregate
```python
AGGREGATE_FLUSH_THRESHOLD = 1000
# Im Loop-Body:
_aggregateAccumulators[bnid].extend(accItems)
if len(_aggregateAccumulators[bnid]) >= AGGREGATE_FLUSH_THRESHOLD:
_flushAggregateToTemp(bnid, _aggregateAccumulators[bnid])
_aggregateAccumulators[bnid] = []
```
Temp-Storage: In-Memory-Buffer oder temp-File. Am Ende: alle Chunks zusammenführen.
### H) AI-Kennzeichnung auf Nodes im Editor
Jeder Node im Editor soll **sofort sichtbar** machen, ob er AI nutzt oder deterministisch arbeitet. Dies ist wichtig für Kostentransparenz und Workflow-Design.
#### Umsetzung
**Backend:** Jede Node-Definition bekommt ein neues Feld `meta.usesAi` (Boolean):
```python
# Beispiel: ai.prompt → AI
"meta": {"icon": "mdi-robot", "color": "#9C27B0", "usesAi": True}
# Beispiel: data.filter → kein AI
"meta": {"icon": "mdi-filter-outline", "color": "#607D8B", "usesAi": False}
```
Durch den Split von Nodes mit gemischtem AI-Verhalten (z.B. `data.consolidate` vs `ai.consolidate`) ist jeder Node **eindeutig** — kein `"optional"` nötig.
**Frontend:** Im `FlowCanvas.tsx` (Zeile ~846-852, Node-Rendering) wird ein kleines Badge/Indicator angezeigt:
- `usesAi: true` → AI-Badge (z.B. kleines "AI"-Label oder Blitz-Icon oben rechts am Node)
- `usesAi: false` oder nicht gesetzt → kein Badge
Gleiche Kennzeichnung in der **NodeSidebar** (Palette), damit der User schon beim Drag&Drop sieht, welche Nodes AI nutzen.
#### Zuordnung aller bestehenden Nodes
| Node | `usesAi` | Begründung |
|------|----------|------------|
| `trigger.*` | `false` | Reine Auslöser |
| `input.*` | `false` | Formulare, manuelle Eingabe |
| `flow.ifElse` | `false` | Bedingungslogik |
| `flow.switch` | `false` | Bedingungslogik |
| `flow.loop` | `false` | Iteration |
| `flow.merge` | `false` | Zusammenführung |
| `data.aggregate` | `false` | Sammeln |
| `data.transform` | `false` | Feld-Mapping |
| `data.filter` | `false` | Filterlogik |
| `data.consolidate` (NEU) | `false` | Deterministisch: merge, concat, table, CSV-Join |
| `ai.consolidate` (NEU) | `true` | AI-gestützt: summarize, classify, semantic merge |
| `context.extractContent` (NEU) | `false` | Reine Parser-Arbeit |
| `ai.prompt` | `true` | LLM-Call |
| `ai.webResearch` | `true` | Web-Suche + LLM |
| `ai.summarizeDocument` | `true` | LLM-Zusammenfassung |
| `ai.translateDocument` | `true` | LLM-Übersetzung |
| `ai.convertDocument` | `true` | LLM-Konvertierung |
| `ai.generateDocument` | `true` | LLM-Generierung |
| `ai.generateCode` | `true` | LLM-Codegenerierung |
| `email.*` | `false` | E-Mail-Operationen |
| `sharepoint.*` | `false` | SharePoint-Operationen |
| `clickup.*` | `false` | ClickUp-Operationen |
| `file.create` | `false` | Datei-Erstellung |
| `trustee.refreshAccountingData` | `false` | Datenimport aus externem System |
| `trustee.extractFromFiles` | `true` | AI-Extraktion (Prompt-basiert, Dokumenttyp-Erkennung) |
| `trustee.processDocuments` | `false` | TrusteeDocument/Position aus Extraktionsergebnis erstellen |
| `trustee.syncToAccounting` | `false` | Übertragung in Buchhaltung |
---
## Entscheidungen
| Datum | Entscheidung | Begründung |
|-------|-------------|------------|
| 2026-04-16 | `context.extractContent` als eigener Node im Editor | Heute fehlt komplett. Fundament für gezielten AI-Einsatz. Neue Kategorie `context`. |
| 2026-04-16 | Extraktion ist KEIN AI-Call | Reine Parser-Arbeit. AI-Kosten erst bei explizitem AI-Node. Kostenfreie Vorverarbeitung. |
| 2026-04-16 | UDM als In-Memory-Format, keine DB-Tabelle | UDM fliesst als JSON durch nodeOutputs. Vermeidet Migration. |
| 2026-04-16 | Bridge ContentPart ↔ UDM statt Breaking Change | Alle bestehenden Konsumenten bleiben funktional. |
| 2026-04-16 | Kein Compound-Node / Sub-Graph | Zu hoher Aufwand. Bestehende Body-Erkennung via BFS genügt. |
| 2026-04-16 | Consolidate als Split in `data.consolidate` + `ai.consolidate` | `data.*` = deterministisch, `ai.*` = AI-gestützt. Kein `"optional"` Badge nötig — jeder Node ist eindeutig. |
| 2026-04-16 | `level`-Parameter am Loop | Weniger Nodes in der Palette, einfachere UX. |
| 2026-04-16 | Fan-out im Loop-Body nutzt bestehendes Modell | Engine unterstützt es bereits: Topo-Sort + sequentielle Body-Abarbeitung garantieren korrekte Reihenfolge. Kein Engine-Umbau nötig. |
| 2026-04-16 | `outputDetail: "references"` für High-Volume | 10k PDFs als Referenzliste (~10 KB) statt vollständig extrahiert (~4 GB). On-Demand-Loading pro Iteration. |
| 2026-04-16 | Loop-Concurrency als opt-in Parameter | Default bleibt `1` (sequentiell, deterministisch). Power-User setzen höher für Durchsatz. |
| 2026-04-16 | StepLog-Batching ab Schwellwert | 50k DB-Inserts bei 10k Iterationen ist nicht tragbar. Batch-Modus loggt nur Summary + Fehler. |
| 2026-04-16 | AI-Badge auf allen Nodes im Editor | Kostentransparenz. User sieht sofort, welche Nodes AI-Credits verbrauchen. `meta.usesAi` als Boolean. |
| 2026-04-16 | Nodes mit gemischtem AI-Verhalten splitten | `data.consolidate` (deterministisch) + `ai.consolidate` (AI). Kein "optional"-Badge nötig — jeder Node ist eindeutig `true` oder `false`. |
---
## Umsetzungs-Checkliste
### Phase 1: UDM-Datenmodell & Bridge (Gateway)
> **Cursor-Empfehlung:** Composer (Fast) reicht. Reine Pydantic-Modelle und Utility-Funktionen — klar definiert, wenig Kontext nötig.
- [x] Pydantic-Modelle in `datamodels/datamodelUdm.py` (neue Datei)
- [x] `UdmContentBlock.fileRef` Feld für Lazy-Loading
- [x] Bridge-Funktionen `_contentPartsToUdm()` / `_udmToContentParts()`
- [x] Unit-Tests für Modelle und Bridge
- [x] `ExtractionOptions.outputFormat` Feld (`"parts"` | `"udm"` | `"both"`)
### Phase 2: Extractor-Adapter (Gateway)
> **Cursor-Empfehlung:** Composer (Fast) reicht. Pro Extractor ein isoliertes File mit klarem Pattern. Memory-Fixes (`buf.getvalue()`, Registry-Singleton) sind punktuelle Änderungen.
- [x] `extractorPdf.py` → UDM-Output + Fix `buf.getvalue()` (Memory)
- [x] `extractorDocx.py` → UDM-Output
- [x] `extractorPptx.py` → UDM-Output
- [x] `extractorXlsx.py` → UDM-Output
- [x] `extractorHtml.py` → UDM-Output
- [x] `extractorContainer.py` → Lazy-Modus (Inhaltsverzeichnis statt alles extrahieren)
- [x] `extractorContainer.py``ExtractorRegistry` Singleton statt `new` pro Datei
- [x] `mainServiceExtraction.py``extractContent()` mit UDM-Option
- [x] Integration-Tests pro Format
### Phase 3: Extract-Node — Strukturextraktion ohne AI (Gateway + Frontend)
> **Cursor-Empfehlung:** **Opus 4.6 empfohlen.** Neue Datei + Integration in mehrere bestehende Systeme (NodeDefinitions, PortTypes, Executor-Mapping, Frontend-Komponente). Erfordert Verständnis der gesamten Node-Architektur über ~6 Dateien hinweg.
- [x] `nodeDefinitions/context.py` → Node `context.extractContent`
- [x] `nodeDefinitions/__init__.py``CONTEXT_NODES` in `STATIC_NODE_TYPES`
- [x] `UdmDocument` Port-Typ in `portTypes.py`
- [x] Input-Extractor `_extractUdmDocument()` in `portTypes.py`
- [x] `executionEngine.py``_getExecutor()`: Branch `context.*``ActionNodeExecutor` hinzufügen (heute nur: `ai.`, `email.`, `sharepoint.`, `clickup.`, `file.`, `trustee.`)
- [x] `ActionNodeExecutor``context.extractContent` via `_method`/`_action` Mapping (nutzt bestehendes `MethodContext.extractContent`)
- [x] `extractContent.py` → UDM-Output-Modus + `outputDetail: "references"` Modus
- [x] Frontend: `ExtractContentNodeConfig.tsx`
- [x] Frontend: Node in Sidebar-Palette unter Kategorie "Kontext"
### Phase 4: Workflow Nodes — ForEach, Merge & Consolidate (Gateway + Frontend)
> **Cursor-Empfehlung:** **Opus 4.6 empfohlen.** Komplexeste Phase — berührt Execution-Engine, Port-System, Flow-Executor, Data-Executor und Frontend gleichzeitig. Concurrency-Design erfordert tiefes Verständnis der bestehenden Loop-Logik und Race-Conditions.
- [x] `data.consolidate` Node-Definition (`nodeDefinitions/data.py`) — deterministisch: merge, concat, table, CSV-Join
- [x] `ai.consolidate` Node-Definition (`nodeDefinitions/ai.py`) — AI-gestützt: summarize, classify, semantic merge
- [x] `ConsolidateResult` Port-Typ (`portTypes.py`)
- [x] `DataExecutor` → Consolidate-Logik (`dataExecutor.py`)
- [x] `methodAi/actions/consolidate.py` → neue Action-Datei (LLM-Call mit aggregierten Daten)
- [x] `methodAi/methodAi.py` → Action `consolidate` registrieren
- [x] `ai.consolidate` wird über bestehenden `ActionNodeExecutor` geroutet — kein eigener Executor nötig
- [x] `data.filter` → UDM-Content-Type-Presets (Filter auf `contentType`, `index`, `attributes`)
- [x] `flow.loop``level`-Parameter (`nodeDefinitions/flow.py`)
- [x] `flow.loop``concurrency`-Parameter (`nodeDefinitions/flow.py`)
- [x] `flow.merge` → dynamische Input-Anzahl `inputCount` (2-5)
- [x] `FlowExecutor._loop()` → UDM-Level-Auflösung (`flowExecutor.py`)
- [x] `UdmNodeList` Port-Typ (`portTypes.py`)
- [x] Frontend: `DataConsolidateNodeConfig.tsx` (deterministisch)
- [x] Frontend: `AiConsolidateNodeConfig.tsx` (AI-gestützt)
- [x] Frontend: Loop-NodeConfig → UDM-Level-Selector + Concurrency-Slider
- [x] Frontend: Merge-NodeConfig → Input-Count-Selector
- [x] Frontend: Visuelle Loop-Body-Markierung im Canvas
### Phase 5: High-Volume Engine-Optimierungen (Gateway)
> **Cursor-Empfehlung:** **Opus 4.6 empfohlen.** Async-Concurrency, Thread-Safety, Streaming-Aggregates — subtile Bugs möglich. Load-Test-Design braucht Überblick über das gesamte System.
- [x] `executionEngine.py` → Loop-Concurrency via `asyncio.Semaphore` mit isolierten `nodeOutputs` pro Iteration
- [x] `executionEngine.py` → StepLog-Batching (Schwellwert 100 Iterationen: nur Summary + Fehler loggen)
- [x] `executionEngine.py` → Streaming-Aggregate (Flush-Threshold, temp-Storage)
- [x] `executionEngine.py` → Progress-SSE bei High-Volume: Summary statt pro-Step-Events
- [x] `extractorContainer.py``ExtractorRegistry` Singleton (Performance-Fix)
- [x] `extractorPdf.py``BytesIO` direkt an fitz übergeben statt `getvalue()` (Memory-Fix)
- [x] Lazy-Content-Loader: Utility-Funktion die `fileRef` → Bytes auflöst, on-demand im Loop
- [x] Load-Test: 1000 PDFs aus ZIP → Extract → AI Vision → CSV → Consolidate
### Phase 6: AI-Badge — Visuelle Kennzeichnung im Editor (Gateway + Frontend)
> **Cursor-Empfehlung:** Composer (Fast) reicht. Kleines `meta.usesAi`-Feld pro Node-Definition (Backend) + Badge-Komponente im Canvas (Frontend). Klar abgegrenzte Änderung.
- [x] Alle bestehenden Node-Definitionen: `meta.usesAi` Feld ergänzen (`true` / `false`)
- [x] `FlowCanvas.tsx` → AI-Badge-Rendering: kleines "AI"-Label / Blitz-Icon oben rechts am Node wenn `usesAi: true`
- [x] `NodeSidebar.tsx` → AI-Kennzeichnung in der Palette (Badge neben Node-Name)
- [x] Tooltip auf Badge: "Dieser Schritt nutzt AI und verbraucht Credits"
- [x] CSS/Styling: Badge-Design konsistent mit bestehendem Design-System
### Phase 7: Agent-Tools & Integration (Gateway)
> **Cursor-Empfehlung:** Composer (Fast) reicht für die Tool-Funktionen. **Opus 4.6 für E2E-Tests** — die Tests müssen viele Module korrekt zusammenspielen lassen.
- [x] `_documentTools.py` → UDM-Tools (`walkUdmBlocks`, `filterUdmByType`, `getUdmStructure`)
- [x] E2E-Test: PDF → Extract (UDM) → Loop (pro Seite) → AI → Aggregate → Consolidate
- [x] E2E-Test: PDF → Extract → Filter (nur Tabellen) → AI → kein Loop
- [x] E2E-Test: ZIP → Extract (references) → Loop (Fan-out) → Vision + Text → Merge → Aggregate → Consolidate
### Phase 8: Dokumentation & Abschluss
> **Cursor-Empfehlung:** Composer (Fast) reicht. Reine Dokumentation und Verifikation.
- [x] RBAC / Permissions: Keine Änderung nötig
- [x] Neutralisierung: ContentBlock.raw mit sensiblen Daten → Bridge zu ContentPart
- [x] Navigation / Routing: Keine Änderung
- [x] Billing-Impact: Loop-Iterationen zählen als einzelne AI-Calls → bestehende Logik greift
---
## Akzeptanzkriterien
| # | Kriterium (Given-When-Then) | Prio |
|---|---------------------------|------|
| 1 | Given ein PDF mit 5 Seiten, When `context.extractContent`-Node ausgeführt, Then UdmDocument mit 5 StructuralNodes (role=page) — **kein AI-Call**, nur Parsing | must |
| 2 | Given der Extract-Node-Output (UDM), When mit `data.filter` auf `contentType=="table"` gefiltert, Then nur Tabellen-ContentBlocks im Output | must |
| 3 | Given ein DOCX mit 3 Sections, When UDM-Extraktion, Then 3 StructuralNodes mit role=section und korrekten Labels | must |
| 4 | Given ein UDM-Dokument, When `_udmToContentParts()`, Then identische ContentPart-Liste wie bei direkter Extraktion (Bridge) | must |
| 5 | Given ein Workflow mit Loop über `document.children`, When ausgeführt, Then Body-Nodes pro StructuralNode einmal durchlaufen | must |
| 6 | Given ForEach → AI → Aggregate → Consolidate, When 3 Seiten verarbeitet, Then Consolidate erhält 3 Ergebnisse und erzeugt Zusammenfassung | must |
| 7 | Given Loop mit Fan-out zu 2 AI-Nodes + Merge, When pro Seite ausgeführt, Then Merge erhält beide AI-Ergebnisse pro Iteration | must |
| 8 | Given `outputDetail=references` auf ZIP mit 1000 PDFs, When Extract ausgeführt, Then Referenzliste mit 1000 Einträgen, Memory < 100 MB | must |
| 9 | Given Loop mit `concurrency: 5` über 100 Items, When ausgeführt, Then ~5x schneller als `concurrency: 1` | should |
| 10 | Given Loop mit 10.000 Iterationen, When ausgeführt, Then weniger als 1000 AutoStepLog-Einträge (Batching) | should |
| 11 | Given `context.extractContent` mit `outputDetail=structure`, When ausgeführt, Then nur Skelett ohne `raw`-Daten | should |
| 12 | Given Loop-Workflow im Editor, When User den Graph betrachtet, Then Loop-Body-Nodes visuell als zusammengehörig erkennbar | should |
| 13 | Given `data.consolidate` mit mode=table, When 100 CSV-Zeilen als Input, Then ein zusammengefügtes CSV als Output | should |
| 14 | Given ZIP mit 10.000 PDFs, When E2E-Workflow (Extract refs Loop AI Vision Seite 1 CSV), Then erfolgreich in < 24h (mit concurrency: 10) | nice |
| 15 | Given ZIP mit 3 PDFs, When UDM-Extraktion, Then UdmArchive mit 3 UdmDocuments | nice |
| 16 | Given ein Workflow mit `ai.prompt` und `data.filter` Nodes, When User den Graph betrachtet, Then AI-Badge nur auf `ai.prompt` sichtbar, nicht auf `data.filter` | must |
| 17 | Given `data.consolidate` und `ai.consolidate` nebeneinander im Editor, When User vergleicht, Then nur `ai.consolidate` hat AI-Badge, `data.consolidate` nicht | must |
---
## Testplan
*Stand beim Archivieren: Kerntests liegen in den unter „Abschlussbericht“ genannten Dateien. Die ursprünglich geplanten Einzelpfade (T2, T3, T6T17) können bei Bedarf als separates Backlog ergänzt werden.*
| ID | AC | Art | Automatisiert | Repo-Pfad | Status |
|----|----|-----|--------------|-----------|--------|
| T1 | 1 | unit | ja | gateway/tests/unit/datamodels/test_udm_models.py | done |
| T2 | 1 | integration | ja | gateway/tests/integration/extraction/test_extract_udm_pipeline.py (statt urspr. `test_extract_node.py`) | done |
| T3 | 2 | integration | ja | (Backlog: dedizierter Filter-Flow) | backlog |
| T4 | 3 | unit | ja | gateway/tests/unit/datamodels/test_udm_models.py | done |
| T5 | 4 | unit | ja | gateway/tests/unit/datamodels/test_udm_bridge.py | done |
| T6 | 1,3 | integration | ja | gateway/tests/integration/extraction/test_extract_udm_pipeline.py | done |
| T7 | 5 | integration | ja | (Backlog) | backlog |
| T8 | 6 | integration | ja | gateway/tests/integration/workflows/test_execute_graph_loop_aggregate_consolidate.py | done |
| T9 | 7 | integration | ja | (Backlog) | backlog |
| T10 | 8 | integration | ja | (Backlog) | backlog |
| T11 | 9 | integration | ja | (Backlog) | backlog |
| T12 | 10 | unit | ja | (Backlog) | backlog |
| T13 | 11 | unit | ja | (Backlog) | backlog |
| T14 | 12 | manual | nein | | backlog |
| T15 | 13 | integration | ja | (Backlog) | backlog |
| T16 | 14 | load | nein | (manueller Load-Test) | backlog |
| T17 | 15 | integration | ja | (Backlog) | backlog |
| T18 | 16 | unit | ja | gateway/tests/unit/nodeDefinitions/test_usesai_flag.py | done |
| T19 | 17 | manual | nein | (visueller Check im Editor) | backlog |
---
## Links
- Idee: `c-work/0-ideas/unified-document-model.md`
- Workflow-Engine Referenz: `b-reference/gateway/workflow.md`
- AI-Agent Referenz: `b-reference/gateway/ai-agent.md`
- Port-System Referenz: `c-work/4-done/2026-04-generic-graph-editor.md`
---
## Abschluss
- [x] b-reference/ aktualisiert: `gateway/workflow.md` (neue Nodes, UDM-Loop-Pattern, Concurrency), `gateway/ai-agent.md` (neue UDM-Tools)
- [x] TOPICS.md aktualisiert (neues Thema: UDM)
- [x] Dieses Dokument `wiki/z-archive/2026-04-unified-document-model.md` archiviert