19 KiB
Analyse: Typed Node Handover System für den Graphical Editor
1 Problemstellung
Der Graphical Editor besitzt heute keine formale Typisierung der Daten, die zwischen Nodes fließen. Jede Node produziert ein freiformiges Dict; die Folge-Node versucht heuristisch, daraus das Passende zu extrahieren. Das führt zu:
| Symptom | Wo sichtbar |
|---|---|
Spezialcode pro Paar (z.B. _extractEmailContentFromUpstream, _getContextFromUpstream, _gatherAttachmentDocumentsFromUpstream) |
actionNodeExecutor.py (~860 Zeilen, >50 % handover Heuristik) |
DataPicker zeigt hartcodierte Beispiel-Schemas (outputPreviewRegistry.ts) — stimmt oft nicht mit echten Laufzeit-Outputs überein |
Frontend outputPreviewRegistry.ts |
Kein data.transform, data.filter, data.aggregate implementiert — nur in automation.md als Placeholder aufgelistet |
nodeDefinitions/ hat keine Datei dafür |
Kein „Aggregate" (Gegenstück zu flow.loop's For-Each) |
Engine sammelt Loop-Body-Resultate nicht auf |
Frontend-Parameter haben eigene Typdefinitionen (node-level type: "string") statt der in MethodBase etablierten WorkflowActionParameter mit FrontendType-Enum + Validierung |
ai.py, email.py etc. vs. methodBase.py |
Kern: Die Method/Action-Schicht hat bereits eine saubere Parameter-Typisierung (WorkflowActionParameter, _validateType, _validateParameters, FrontendType). Aber der Graph-Editor nutzt sie weder für Input-Konfiguration noch für die Handover-Logik.
2 Ist-Zustand (Layer für Layer)
2.1 Node-Definitionen (nodeDefinitions/*.py)
Jede Node ist ein Dict mit:
{
"id": "email.draftEmail",
"parameters": [
{"name": "subject", "type": "string", "required": True, ...},
],
"inputs": 1, "outputs": 1,
"_method": "outlook", "_action": "composeAndDraftEmailWithContext",
"_paramMap": {"connectionId": "connectionReference", ...},
}
Was fehlt:
- Kein
outputSchema— was liefert die Node zurück? - Kein
inputSchema— was erwartet die Node am Eingang? type: "string"ist Node-Editor-intern, nicht identisch mitWorkflowActionParameter.type(str,int,List[str], …).- Kein
frontendType(d.h. Frontend baut eigene Config-Components per Node-Typ statt generisch).
2.2 Execution Engine (executionEngine.py)
- Topologische Sortierung → iteriert Nodes → ruft Executor auf → speichert Output in
nodeOutputs[nodeId]. - Kein Output-Vertrag: der Executor liefert
Any, das Dict wird 1:1 innodeOutputsgelegt. - Folge-Nodes holen via
inputSources[nodeId][0]den Vorgänger-Output und hoffen, dass er das richtige Format hat.
2.3 ActionNodeExecutor (actionNodeExecutor.py)
- ~860 Zeilen, davon ~60 % dedizierte Merge-Logik für Paare:
email→AI,AI→email.draftEmail,file.createcontent-gathering, ClickUp merge, etc. - Aufruft am Ende
ActionExecutor.executeAction(method, action, params)— dessenMethodBase._validateParametersprüft die Action-Parameter, nicht den Graph-Port-Kontrakt. - Baut ein Output-Dict mit teilweise inkonsistenten Keys (
documentsvs.documentListvs.datavs.context).
2.4 DataPicker + outputPreviewRegistry (Frontend)
outputPreviewRegistry.tsregistriert statische Beispiele pro Node-Typ.- DataPicker zeigt daraus einen Baum und erzeugt
DataRef { type: "ref", nodeId, path }. resolveParameterReferences(Backend) löstDataRefgegen echtenodeOutputs.- Problem: Die Preview-Struktur ist manuell gepflegt und divergiert vom echten Output. Der User wählt Pfade, die zur Laufzeit nicht existieren oder anders heißen.
2.5 Method/Action Parameter-System (bereits existent)
WorkflowActionParameter + MethodBase._validateParameters:
class WorkflowActionParameter(BaseModel):
name: str
type: str # 'str', 'int', 'List[str]', 'Dict[str, Any]'
frontendType: FrontendType # TEXT, TEXTAREA, SELECT, USER_CONNECTION, …
frontendOptions: Optional[...]
required: bool
default: Optional[Any]
validation: Optional[Dict]
_validateType konvertiert + prüft: str→str, int→int, List[str]→[str,…], usw.
Dieses System funktioniert bereits für Workspace-Actions. Es wird aber nicht für Node-Ein-/Ausgänge genutzt.
3 Ziel-Architektur: Typed Port System
3.1 Kern-Konzept: Port-Schema
Jede Node deklariert typisierte Ports für Ein- und Ausgänge — analog zu WorkflowActionParameter, aber für den Graphen.
┌──────────────────────────────────────────────────────────────────────┐
│ Node Definition (Beispiel: email.draftEmail) │
│ │
│ inputPorts: │
│ [0]: { schema: EmailDraft } ← structured type │
│ EmailDraft = { subject: str, body: str, to: List[str] } │
│ │
│ outputPorts: │
│ [0]: { schema: ActionResult } │
│ ActionResult = { success: bool, error: str?, │
│ documents: List[Document] } │
│ │
│ parameters: (config, unabhängig von Ports) │
│ connectionId: { type: str, frontendType: USER_CONNECTION } │
└──────────────────────────────────────────────────────────────────────┘
3.2 Port-Schema Definition
class PortField(BaseModel):
name: str
type: str # str, int, bool, List[str], List[Document], Dict[str,Any], …
description: Dict[str, str] # {en, de, fr}
required: bool = True
class PortSchema(BaseModel):
fields: List[PortField]
class NodePortDefinition(BaseModel):
inputPorts: Dict[int, PortSchema] # port-index → schema
outputPorts: Dict[int, PortSchema] # port-index → schema
3.3 Einheitliche Output-Typen (Port-Typen-Katalog)
Statt freier Dicts gibt es benannte Schemas, die Node-übergreifend wiederverwendbar sind:
| Port-Typ | Felder | Produziert von | Konsumiert von |
|---|---|---|---|
DocumentList |
documents: List[{name, data, mimeType, metadata}] |
sharepoint.readFile, sharepoint.downloadFile, ai.*, file.create, input.upload | ai.*, file.create, email.draftEmail, sharepoint.uploadFile |
FileList |
files: List[{url, name, path, size}] |
sharepoint.listFiles, sharepoint.findFile | sharepoint.readFile, sharepoint.downloadFile, flow.loop |
EmailDraft |
subject: str, body: str, to: List[str], cc?: List[str], attachments?: DocumentList |
ai.prompt (mode=email) | email.draftEmail, email.sendEmail |
EmailList |
emails: List[{subject, from, to, body, date, attachments}] |
email.checkEmail, email.searchEmail | ai.prompt, flow.loop, flow.ifElse |
TaskList |
tasks: List[{id, name, status, url, …}] |
clickup.searchTasks, clickup.listTasks | flow.loop, clickup.updateTask |
TaskResult |
success: bool, taskId: str, task: {id, name, status} |
clickup.createTask, clickup.updateTask | flow.ifElse |
FormPayload |
payload: Dict[str, Any] (dynamisch, Keys = Feldnamen) |
trigger.form, input.form | Alle (via Referenz auf Einzelfelder) |
AiResult |
prompt: str, response: str, context: str, documents: List[Document] |
ai.* | email.draftEmail, file.create, sharepoint.uploadFile |
BoolResult |
result: bool, reason?: str |
input.approval, input.confirmation | flow.ifElse |
TextResult |
text: str |
input.comment | ai.*, file.create |
LoopItem |
currentItem: Any, currentIndex: int, items: List[Any], count: int |
flow.loop (pro Iteration) | Loop-Body Nodes |
AggregateResult |
items: List[Any], count: int |
data.aggregate (NEU) | Alle |
3.4 Schema auf Node-Definitions
Erweiterung der bestehenden Node-Definitions um outputPorts und inputPorts:
{
"id": "ai.prompt",
"category": "ai",
"parameters": [...], # Config — bleibt wie bisher
"inputs": 1,
"outputs": 1,
# NEU ─────────────────────────────────────────
"inputPorts": {
0: {"accepts": ["DocumentList", "TextResult", "FormPayload", "EmailList", "AiResult", "Any"]},
},
"outputPorts": {
0: {"schema": "AiResult"},
},
}
{
"id": "email.draftEmail",
"inputPorts": {
0: {"accepts": ["EmailDraft", "AiResult", "TextResult"]},
},
"outputPorts": {
0: {"schema": "ActionResult"},
},
}
3.5 Konsequenzen für bestehende Schichten
A) Node-Definitionen — einmalige Migration
Jede Node bekommt inputPorts + outputPorts. Bestehende inputs/outputs (Zählwerte) bleiben für Rückwärtskompatibilität, aber die Schemas werden die Quelle der Wahrheit.
B) Executors — Output normalisieren
Jeder Executor erhält eine _normalizeOutput(rawResult, portSchema) → Dict Funktion. Sie sorgt dafür, dass die Keys/Typen dem deklarierten Schema entsprechen. Für AiResult z.B.:
def _normalizeAiResult(raw: Any) -> Dict:
return {
"prompt": raw.get("prompt", ""),
"response": raw.get("response", raw.get("context", "")),
"context": raw.get("context", ""),
"documents": _ensureDocumentList(raw.get("documents", [])),
}
Der Spezialcode in actionNodeExecutor.py (~400 Zeilen Paar-Heuristik) fällt dann weg und wird durch generische Normalizer pro Port-Typ ersetzt.
C) ExecutionEngine — Handover-Validierung
Nach jedem Node-Execute: nodeOutputs[nodeId] wird gegen outputPorts[0].schema geprüft (Warnung oder Fehler bei Mismatch). Vor jedem Node-Execute: Input-Port-Kompatibilitäts-Check (accepts enthält den Schema-Typ des Vorgänger-Outputs).
D) Frontend — DataPicker aus Schema generieren
outputPreviewRegistry.ts wird ersetzt durch eine generische Funktion, die aus dem PortSchema den Preview-Baum baut. Kein manuelles Pflegen mehr:
function buildPreviewFromSchema(schema: PortSchema): Record<string, unknown> {
// Iteriert schema.fields → Beispielwert pro Typ
}
E) Frontend — Generisches Parameter-Rendering
Statt pro Node-Typ eine eigene Config-Component zu schreiben (aktuell 12+ Dateien), können die meisten Parameter-Felder generisch aus dem WorkflowActionParameter-Schema gerendert werden. Spezial-UIs (FormBuilder, ClickUp Browse, etc.) bleiben als Override.
F) Verbindungs-Validierung im Editor
Beim Ziehen einer Kante: prüfe sourceNode.outputPorts[outputIdx].schema ∈ targetNode.inputPorts[inputIdx].accepts. Falls inkompatibel: Kante rot markieren / verhindern.
4 Fehlende Nodes / Konzepte
4.1 data.aggregate (Gegenstück zu flow.loop)
Loop verteilt Items → Body → aber am Ende gibt es keinen Collector. Heute liegt nur das letzte Body-Ergebnis in nodeOutputs.
Lösung: data.aggregate-Node
{
"id": "data.aggregate",
"category": "data",
"label": {"en": "Aggregate", "de": "Sammeln", "fr": "Agréger"},
"description": {"en": "Collect results from loop body into a list", ...},
"parameters": [
{"name": "mode", "type": "string", "options": ["collect", "concat", "sum", "count"], "default": "collect"},
],
"inputs": 1, "outputs": 1,
"inputPorts": {0: {"accepts": ["Any"]}},
"outputPorts": {0: {"schema": "AggregateResult"}},
"executor": "data",
}
Engine-Änderung: Im Loop-Body erkennt die Engine eine data.aggregate-Node und sammelt pro Iteration das Ergebnis in einer Liste. Nach Loop-Ende: nodeOutputs[aggregateNodeId] = { items: [...], count: N }.
4.2 data.transform
Reiner Mapping-Node: Felder umbenennen, extrahieren, umstrukturieren. Konfiguration per Key-Value-Mapping oder einfachem Expression-Sprache.
{
"id": "data.transform",
"category": "data",
"parameters": [
{"name": "mapping", "type": "json", "description": "Key-Value Mapping: {outputField: inputRef}"},
],
"inputPorts": {0: {"accepts": ["Any"]}},
"outputPorts": {0: {"schema": "Dict"}},
}
4.3 data.filter
Filtert Items einer Liste nach Bedingung (wie WHERE in SQL). Eingabe: List, Ausgabe: gefilterter List.
{
"id": "data.filter",
"category": "data",
"parameters": [
{"name": "condition", "type": "string", "description": "Filter expression (z.B. item.status == 'open')"},
],
"inputPorts": {0: {"accepts": ["AggregateResult", "FileList", "TaskList", "EmailList"]}},
"outputPorts": {0: {"schema": "AggregateResult"}},
}
4.4 flow.merge
Heute in automation.md gelistet, aber nicht implementiert. Wartet auf N Eingänge und kombiniert sie.
5 Migrations-Strategie
Phase 1: Port-Schema-Deklaration (Backend, kein Breaking Change)
PortSchemaals Pydantic-Modell definieren.- Katalog der Port-Typen (
DocumentList,AiResult,EmailDraft, …) anlegen. - Jede Node-Definition in
nodeDefinitions/*.pyuminputPorts/outputPortserweitern. - API
GET /node-typesliefert die Schemas mit.
Phase 2: Output-Normalizer (Backend)
- Pro Port-Typ einen Normalizer schreiben (10–15 Funktionen).
- In
executionEngine.py: nach jedem Execute_normalizeOutput(result, schema)aufrufen. - Spezialcode in
actionNodeExecutor.pyschrittweise durch Normalizer ersetzen. - Step-Logs mit normalisiertem Output (Debugging/Tracing sofort besser).
Phase 3: Frontend — DataPicker aus Schema (Frontend)
outputPreviewRegistry.tsdurch Schema-basierte Preview-Generierung ersetzen.- DataPicker zeigt korrekte Felder + Typen an.
- Verbindungs-Validierung (rote Kanten bei Typ-Mismatch).
Phase 4: Generisches Parameter-Rendering (Frontend)
NodeConfigPanelrendert Parameter generisch ausWorkflowActionParameter-Schema.- Spezial-Components (
FormNodeConfig,ClickUpNodeConfig, …) bleiben als Override für komplexe UIs. - Neue Nodes brauchen keine eigene Config-Component mehr.
Phase 5: Neue Data-Nodes
data.aggregateimplementieren (Engine + Node-Definition + Frontend).data.transformimplementieren.data.filterimplementieren.flow.mergeimplementieren.
6 Konkretes Beispiel: AI-Mail-Entwurf → Mail-Versand
Heute (heuristisch)
[email.checkEmail] ──→ [ai.prompt] ──→ [email.draftEmail]
↓ ↓ ↓
freeform Dict freeform Dict ~100 Zeilen Spezialcode
(data.emails…) (context, docs) in actionNodeExecutor:
_extractEmailContentFromUpstream
_getIncomingEmailFromUpstream
_unpackIncomingEmail
_gatherAttachmentDocumentsFromUpstream
Ziel (typisiert)
[email.checkEmail] [ai.prompt] [email.draftEmail]
outputPort[0]: outputPort[0]: inputPort[0]:
schema: EmailList schema: AiResult accepts: [EmailDraft, AiResult]
──────────────────→ ──────────────────→ Normalizer kennt AiResult und
Input: EmailList Input: EmailList mapped response → body,
Engine prüft Kompatibilität Engine prüft kein Spezialcode nötig.
Entscheidend: Der AI-Node kann so konfiguriert werden, dass sein Output-Typ EmailDraft statt AiResult ist (z.B. via Parameter outputMode: "emailDraft" oder via Prompt-Instruktion + structured output). Dann ist der Downstream-Merge trivial.
7 Offene Fragen / Entscheidungen
| # | Frage | Optionen |
|---|---|---|
| 1 | Soll Typ-Mismatch eine Kante verhindern (hard) oder nur warnen (soft)? | Empfehlung: soft (Warnung + gelbe Kante). Hard-Block wäre zu restriktiv bei generischen Nodes. |
| 2 | Port-Typ Any erlauben? |
Ja, als Fallback. Nodes wie flow.ifElse leiten den Input transparent durch. |
| 3 | Dynamische Schemas (z.B. input.form Payload hängt von Felddefinition ab)? |
Ja. outputPorts kann eine Funktion referenzieren, die das Schema aus den parameters ableitet (wie heute outputPreviewRegistry). |
| 4 | Sollen bestehende Workflows beim Upgrade automatisch migriert werden? | Nein. Alte Workflows laufen weiter (Normalizer fängt fehlende Keys ab). Neue Workflows profitieren von Validierung. |
| 5 | Separater data.aggregate-Node oder implizit in flow.loop? |
Empfehlung: separater Node. Explizit ist klarer; der Loop-Node bleibt rein für Iteration. |
| 6 | Braucht es einen data.join-Node (merge zwei Listen)? |
Später. Erst aggregate + filter + transform etablieren. |
8 Aufwand-Schätzung
| Phase | Geschätzter Aufwand | Abhängigkeiten |
|---|---|---|
| 1 — Port-Schema-Deklaration | 2–3 Tage | Keine |
| 2 — Output-Normalizer | 3–5 Tage | Phase 1 |
| 3 — Frontend DataPicker aus Schema | 2–3 Tage | Phase 1 |
| 4 — Generisches Parameter-Rendering | 3–4 Tage | Phase 1 |
| 5 — Data Nodes (aggregate, transform, filter) | 3–5 Tage | Phase 2 |
| Gesamt | ~13–20 Tage | Phasen 1–3 sind der kritische Pfad |
9 Schlüssel-Dateien
| Bereich | Pfade |
|---|---|
| Node-Definitionen | gateway/modules/features/graphicalEditor/nodeDefinitions/*.py |
| Execution Engine | gateway/modules/workflows/automation2/executionEngine.py |
| ActionNodeExecutor (Handover-Heuristik) | gateway/modules/workflows/automation2/executors/actionNodeExecutor.py |
| Graph-Utilities | gateway/modules/workflows/automation2/graphUtils.py |
| Method/Action Typsystem | gateway/modules/workflows/methods/methodBase.py, gateway/modules/datamodels/datamodelWorkflowActions.py |
| FrontendType Enum | gateway/modules/shared/frontendTypes.py |
| Output-Preview (Frontend) | frontend_nyla/src/components/FlowEditor/nodes/shared/outputPreviewRegistry.ts |
| DataPicker (Frontend) | frontend_nyla/src/components/FlowEditor/nodes/shared/DataPicker.tsx |
| DataRef/DynamicValue (Frontend) | frontend_nyla/src/components/FlowEditor/nodes/shared/dataRef.ts |
| Node Config Registry (Frontend) | frontend_nyla/src/components/FlowEditor/nodes/configs/index.ts |
| DataFlow Context (Frontend) | frontend_nyla/src/components/FlowEditor/context/Automation2DataFlowContext.tsx |