# PowerOn Automation Unification — Datenmodell & Architektur **Version:** 2.0 **Datum:** 2026-04-05 **Referenz:** [Automation Business Spec](./automation-business-spec.md) --- ## 1. Plattform-Architektur: Schichten ``` ┌──────────────────────────────────────────────────────────────────┐ │ FRONTEND (React/TS) │ │ │ │ Shared Components: ChatBar, ChatStream, UnifiedDataBar, │ │ FlowEditor, FormGenerator, FolderTree │ │ │ │ Feature Pages: WorkspacePage, GraphicalEditorPage, │ │ CommcoachDossierView, ChatbotView, ... │ └──────────────────────────────┬────────────────────────────────────┘ │ HTTP/SSE/WebSocket ┌──────────────────────────────▼────────────────────────────────────┐ │ GATEWAY (FastAPI) │ │ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ Layer 1: FEATURES (Feature-Instanz-gebunden) │ │ │ │ │ │ │ │ features/workspace/ features/graphicalEditor/ │ │ │ │ features/commcoach/ features/chatbot/ │ │ │ │ features/trustee/ features/realEstate/ │ │ │ │ features/neutralization/ features/teamsbot/ │ │ │ └──────────────────────────┬──────────────────────────────────┘ │ │ │ nutzt │ │ ┌──────────────────────────▼──────────────────────────────────┐ │ │ │ Layer 2: SERVICES (Feature-unabhängig, pro Request) │ │ │ │ │ │ │ │ serviceAgent serviceAi serviceKnowledge serviceBilling │ │ │ │ serviceChat serviceExtraction serviceGeneration │ │ │ │ serviceMessaging serviceWeb serviceSubscription │ │ │ └──────────────────────────┬──────────────────────────────────┘ │ │ │ nutzt │ │ ┌──────────────────────────▼──────────────────────────────────┐ │ │ │ Layer 3: SHARED INFRASTRUCTURE │ │ │ │ │ │ │ │ workflows/methods/ (Unified Action Library / Toolboxes)│ │ │ │ workflows/processing/ (ActionExecutor, methodDiscovery) │ │ │ │ workflows/automation2/ (Graph Execution Engine) │ │ │ │ interfaces/ (DB-Abstraktion + RBAC) │ │ │ │ aicore/ (Provider Plugins + Model Selector) │ │ │ │ datamodels/ (Pydantic Models) │ │ │ │ security/ (RBAC Engine) │ │ │ │ shared/ (Utilities, eventManagement) │ │ │ └─────────────────────────────────────────────────────────────┘ │ └───────────────────────────────────────────────────────────────────┘ ``` --- ## 2. Feature-Container-Pattern (Ist-Zustand, bewährt) ### 2.1 Verzeichnisstruktur ``` features// ├── main.py ◄── Feature-Definition + Registrierung │ FEATURE_CODE: str Feature-Identifikator │ UI_OBJECTS: List[Dict] RBAC-fähige UI-Objekte │ RESOURCE_OBJECTS: List[Dict] RBAC-fähige Resource-Objekte │ TEMPLATE_ROLES: List[Dict] Rollen-Templates mit AccessRules │ REQUIRED_SERVICES: List[Dict] Service-Dependencies (optional) │ registerFeature(catalog) → bool Registriert Objekte, synct Rollen in DB │ getFeatureDefinition() → Dict Katalog-Metadaten (Label, Icon) │ getServices(user, ...) → Hub Service-Hub-Factory (optional) │ ├── routeFeature.py ◄── HTTP API │ router = APIRouter(prefix="/api/") │ [templateRouter] = APIRouter(...) Zusätzliche Router (optional) │ ├── interfaceFeature.py ◄── Datenbankzugriff │ Objects CRUD + RBAC-Filterung │ getInterface(user, mandateId, featureInstanceId) → Singleton │ ├── datamodelFeature.py ◄── Pydantic-Datenmodelle │ class (PowerOnModel): ... Feature-spezifische Entitäten │ ├── nodeDefinitions/ (optional) ◄── Graph-Node-Typen (für graphicalEditor) │ triggers.py, flow.py, ai.py, ... │ └── service/ (optional) ◄── Feature-interne Services └── mainService.py ``` ### 2.2 Automatische Discovery ```python # system/registry.py — Kein manuelles Wiring nötig discoverFeatureContainers() # Scannt features/*/routeFeature*.py → Liste von Feature-Dirs loadFeatureRouters(app) # Importiert routeFeature*.py, mountet router + *Router loadFeatureMainModules() # Importiert main*.py, cached registerAllFeaturesInCatalog(catalogService) # Für jedes main-Module: # getFeatureDefinition() → catalogService.registerFeatureDefinition() # registerFeature() → UI/Resource-Objekte + Template-Rollen sync ``` ### 2.3 Service-Hub-Wiring ```python # serviceHub/__init__.py — Dynamisches Service-Wiring class ServiceHub: def __init__(self, user, workflow, mandateId, featureInstanceId): # ServiceCenterContext erstellen self._serviceCenterContext = ServiceCenterContext(...) # Core Interfaces laden self.interfaceDbApp = getAppInterface(user, mandateId) self.interfaceDbChat = getChatInterface(user, mandateId, featureInstanceId) # Feature-Interfaces dynamisch laden (glob features/*/interfaceFeature*.py) self._loadFeatureInterfaces() # Feature-Services dynamisch laden (glob features/*/service*/mainService*.py) self._loadFeatureServices() def __getattr__(self, name): # Lazy: shared services via getService(name, context) bei Erstzugriff ``` --- ## 3. RBAC-Architektur ### 3.1 Zwei getrennte Template-Rollen-Systeme **WICHTIG:** Es gibt zwei getrennte Template-Systeme — diese sind **nicht** dieselben: ``` ┌────────────────────────────────────────────────────────────────────┐ │ TEMPLATE-ROLLEN (Global) │ │ │ │ ┌──────────────────────────────────────────────────────────┐ │ │ │ System Templates (für Mandanten) Priority 1 │ │ │ │ isSystemRole=true, mandateId=null, featureCode=null │ │ │ │ │ │ │ │ "admin" → Mandant-Administration │ │ │ │ "user" → Standard-User │ │ │ │ "viewer" → Nur-Lesen │ │ │ │ │ │ │ │ → kopiert bei Mandant-Erstellung (copySystemRolesToMandate)│ │ │ └──────────────────────────────────────────────────────────┘ │ │ │ │ ┌──────────────────────────────────────────────────────────┐ │ │ │ Feature Templates (für Feature-Instanzen) Priority 1 │ │ │ │ isSystemRole=false, mandateId=null, featureCode="workspace"│ │ │ │ │ │ │ │ "workspace-admin" → alle UI/RESOURCE/DATA Rechte │ │ │ │ "workspace-user" → eingeschränkte DATA Rechte │ │ │ │ "workspace-viewer" → nur View │ │ │ │ │ │ │ │ → kopiert bei Feature-Instanz-Erstellung (_copyTemplateRoles)│ │ │ └──────────────────────────────────────────────────────────┘ │ └────────────────────────────────────────────────────────────────────┘ ┌────────────────────────────────────────────────────────────────────┐ │ MANDATE-ROLLEN Priority 2 │ │ mandateId=X, featureInstanceId=null, featureCode=null │ │ │ │ "admin" (kopiert von System Template) │ │ "user" (kopiert von System Template) │ │ "viewer" (kopiert von System Template) │ │ → zugewiesen via UserMandate → UserMandateRole │ └────────────────────────────────────────────────────────────────────┘ ┌────────────────────────────────────────────────────────────────────┐ │ INSTANZ-ROLLEN Priority 3 │ │ mandateId=X, featureInstanceId=Y, featureCode="workspace" │ │ │ │ "workspace-admin" (kopiert von Feature Template) │ │ "workspace-user" (kopiert von Feature Template) │ │ "workspace-viewer" (kopiert von Feature Template) │ │ → zugewiesen via FeatureAccess → FeatureAccessRole │ └────────────────────────────────────────────────────────────────────┘ ``` ### 3.2 Datenmodell RBAC ```python class Role(PowerOnModel): # Scope bestimmt die Priorität: mandateId: Optional[str] # null = Global/Template featureInstanceId: Optional[str] # null = nicht Instanz-spezifisch featureCode: Optional[str] # z.B. "workspace", "graphicalEditor" roleLabel: str # z.B. "workspace-admin" isSystemRole: bool # System-Rollen nicht löschbar class AccessRule(PowerOnModel): roleId: str # FK → Role context: AccessRuleContext # DATA | UI | RESOURCE item: Optional[str] # null=generic, oder spezifisch view: bool read: Optional[AccessLevel] # "n" | "o" | "m" | "a" create: Optional[AccessLevel] update: Optional[AccessLevel] delete: Optional[AccessLevel] class AccessLevel(str, Enum): NONE = "n" # kein Zugriff OWN = "o" # nur eigene Records MANDATE = "m" # alle Records im Mandant ALL = "a" # alle Records (System-Admin) ``` ### 3.3 User-Zuweisung ``` User ──► UserMandate ──► UserMandateRole ──► Role (Mandate-Scope) User ──► FeatureAccess ──► FeatureAccessRole ──► Role (Instanz-Scope) ``` ```python class UserMandate(PowerOnModel): userId: str mandateId: str class UserMandateRole(PowerOnModel): userMandateId: str # FK → UserMandate roleId: str # FK → Role (mandateId=X, featureInstanceId=null) class FeatureAccess(PowerOnModel): userId: str featureInstanceId: str enabled: bool class FeatureAccessRole(PowerOnModel): featureAccessId: str # FK → FeatureAccess roleId: str # FK → Role (mandateId=X, featureInstanceId=Y) ``` ### 3.4 Resolution-Algorithmus ``` 1. Sammle roleIds: - Mandate-Rollen: UserMandate → UserMandateRole → roleId - Instanz-Rollen: FeatureAccess → FeatureAccessRole → roleId 2. Lade AccessRules für alle roleIds (context + item filter) 3. Bestimme Priorität pro Rolle: - Priority 3: role.featureInstanceId gesetzt (Instanz) - Priority 2: role.mandateId gesetzt (Mandant) - Priority 1: beides null (Global/Template) 4. DATA-Permissions: Nur Rules der HÖCHSTEN Priorität zählen View: OR über alle Rules der höchsten Priorität Item-Spezifität: exact > prefix > generic (innerhalb Priorität) 5. SQL WHERE Clause: buildRbacWhereClause(permissions, user, table) ``` --- ## 4. Unified Workflow Datenmodell ### 4.1 Entitäten ```python # Ziel: datamodels/datamodelWorkflowUnified.py oder # features/graphicalEditor/datamodelFeatureGraphicalEditor.py class WorkflowStatus(str, Enum): DRAFT = "draft" PUBLISHED = "published" ARCHIVED = "archived" class Workflow(PowerOnModel): """Workflow-Metadaten und Ownership.""" id: str mandateId: str featureInstanceId: str label: str description: Optional[str] tags: List[str] = [] isTemplate: bool = False templateSourceId: Optional[str] # geklont von diesem Template currentVersionId: Optional[str] # aktive/published Version active: bool = True # Scheduler-enabled eventId: Optional[str] # APScheduler Job-ID (v1-Pattern) class WorkflowVersion(PowerOnModel): """Immutable Snapshot eines Workflow-Graphen.""" id: str workflowId: str # FK → Workflow versionNumber: int # auto-increment status: WorkflowStatus # draft | published | archived graph: Dict[str, Any] # { nodes: [...], connections: [...] } invocations: List[Dict[str, Any]] # Entry-Points (manual, schedule, webhook, form) publishedAt: Optional[float] publishedBy: Optional[str] ``` ### 4.2 State Machine: WorkflowVersion.status ``` ┌───────┐ create ───►│ DRAFT │◄─── edit (graph mutation via API oder AI Tools) └───┬───┘ │ publish ▼ ┌───────────┐ unpublish─►│ PUBLISHED │ (→ DRAFT) └─────┬─────┘ │ archive ▼ ┌───────────┐ │ ARCHIVED │──► re-publish (→ PUBLISHED) └───────────┘ Invariante: Pro Workflow maximal 1 Version mit status=PUBLISHED. Scheduler nutzt immer die PUBLISHED Version. ``` ### 4.3 Run-Modell ```python class RunStatus(str, Enum): PENDING = "pending" RUNNING = "running" PAUSED = "paused" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" class WorkflowRun(PowerOnModel): """Einzelne Ausführung einer WorkflowVersion.""" id: str workflowId: str # FK → Workflow versionId: str # FK → WorkflowVersion status: RunStatus trigger: Dict[str, Any] # { type: "manual"|"schedule"|"webhook"|..., metadata } startedAt: float completedAt: Optional[float] nodeOutputs: Dict[str, Any] # Outputs pro Node-ID currentNodeId: Optional[str] # Paused bei diesem Node resumeContext: Dict[str, Any] # Kontext für Resume error: Optional[str] # Top-Level Fehler costTokens: Optional[int] # Aggregierte Token-Kosten costCredits: Optional[float] # Aggregierte Credit-Kosten ``` ### 4.4 State Machine: WorkflowRun.status ``` ┌─────────┐ executeGraph──►│ RUNNING │ └────┬────┘ │ ┌────────────────┼────────────────┐ │ │ │ ▼ ▼ ▼ ┌────────┐ ┌──────────┐ ┌────────┐ │ PAUSED │ │COMPLETED │ │ FAILED │ └───┬────┘ └──────────┘ └────────┘ │ │ resume(taskResult | emailReceived) ▼ ┌─────────┐ │ RUNNING │───► COMPLETED | FAILED | PAUSED | CANCELLED └─────────┘ Pause-Gründe: - input.* Node → HumanTask erstellt - email.checkEmail → EmailWait (Background Poller) Cancel: - Manuell durch User - Timeout bei HumanTask (expiresAt) ``` ### 4.5 RunStepLog und HumanTask ```python class StepStatus(str, Enum): RUNNING = "running" COMPLETED = "completed" FAILED = "failed" SKIPPED = "skipped" class RunStepLog(PowerOnModel): """Detaillierter Log pro Node-Execution.""" id: str runId: str # FK → WorkflowRun nodeId: str # Node-ID im Graph nodeType: str # z.B. "ai.prompt", "email.checkEmail" status: StepStatus inputSnapshot: Dict[str, Any] # Parameters + Upstream-Daten bei Execution output: Optional[Dict[str, Any]] # Node-Output error: Optional[str] startedAt: float completedAt: Optional[float] durationMs: Optional[int] tokensUsed: Optional[int] # AI-Calls in diesem Step retryCount: int = 0 class TaskStatus(str, Enum): PENDING = "pending" COMPLETED = "completed" CANCELLED = "cancelled" EXPIRED = "expired" class HumanTask(PowerOnModel): """Aufgabe für menschliche Eingabe bei Pause.""" id: str runId: str # FK → WorkflowRun workflowId: str # FK → Workflow (Convenience-FK) nodeId: str nodeType: str # input.approval, input.form, etc. config: Dict[str, Any] # Node-Parameter (Formular-Felder, Approval-Text) assigneeId: str status: TaskStatus result: Optional[Dict[str, Any]] expiresAt: Optional[float] # Timeout ``` ### 4.6 State Machine: HumanTask.status ``` ┌─────────┐ │ PENDING │ (erstellt bei Run-Pause) └────┬────┘ │ ├── complete(result) → COMPLETED (Run wird resumed mit result) ├── cancel() → CANCELLED (Run wird cancelled) └── expiresAt passed → EXPIRED (Run wird cancelled oder Default) ``` ### 4.7 ER-Diagramm ``` ┌──────────────────┐ ┌──────────────────────┐ │ Workflow │ 1───N │ WorkflowVersion │ ├──────────────────┤ ├──────────────────────┤ │ id │ │ id │ │ mandateId │ │ workflowId (FK) │ │ featureInstanceId│ │ versionNumber │ │ label │ │ status │ │ description │ │ graph {} │ │ tags [] │ │ invocations [] │ │ isTemplate │ │ publishedAt │ │ currentVersionId ├───1──►│ publishedBy │ │ active │ └──────────┬───────────┘ │ eventId │ │ 1:N └──────────────────┘ ┌──────────▼───────────┐ │ WorkflowRun │ ├──────────────────────┤ │ id │ │ workflowId (FK) │ │ versionId (FK) │ │ status │ │ trigger {} │ │ startedAt │ │ completedAt │ │ nodeOutputs {} │ │ currentNodeId │ │ resumeContext {} │ │ error │ │ costTokens │ │ costCredits │ └──────────┬───────────┘ 1:N │ │ 1:N ┌─────────────┘ └──────────┐ ▼ ▼ ┌──────────────────┐ ┌──────────────────┐ │ RunStepLog │ │ HumanTask │ ├──────────────────┤ ├──────────────────┤ │ id │ │ id │ │ runId (FK) │ │ runId (FK) │ │ nodeId │ │ workflowId (FK) │ │ nodeType │ │ nodeId │ │ status │ │ nodeType │ │ inputSnapshot {} │ │ config {} │ │ output {} │ │ assigneeId │ │ error │ │ status │ │ startedAt │ │ result {} │ │ completedAt │ │ expiresAt │ │ durationMs │ └──────────────────┘ │ tokensUsed │ │ retryCount │ └──────────────────┘ ``` --- ## 5. AI-Tool-Architektur: Toolbox-Datenmodell ### 5.1 Toolbox-Definition ```python # Ziel: serviceCenter/services/serviceAgent/toolboxRegistry.py class ToolboxDefinition(BaseModel): """Definition einer thematischen Tool-Gruppe.""" id: str # z.B. "core", "email", "sharepoint", "workflow" label: Dict[str, str] # Multilingual: {"en": "Email", "de": "E-Mail"} description: str featureCode: Optional[str] # null = Feature-unabhängig, "trustee" = Feature-spezifisch tools: List[str] # Tool-Namen in dieser Toolbox isDefault: bool = False # Immer aktiv (z.B. "core") requiresConnection: Optional[str] # "outlook", "sharepoint" → auto-aktiviert bei Connection class ToolboxRegistry: """Verwaltet Toolboxes und deren Zuordnung zu Tools.""" _toolboxes: Dict[str, ToolboxDefinition] _toolRegistry: ToolRegistry # Referenz auf die bestehende ToolRegistry def registerToolbox(self, toolbox: ToolboxDefinition): ... def getActiveToolboxes( self, featureCode: str, userConnections: List[str], # Aktive Connection-Typen des Users explicitToolboxes: List[str], # Explizit aktivierte Toolboxes ) -> List[str]: ... def getToolsForToolboxes( self, activeToolboxIds: List[str], ) -> List[ToolDefinition]: ... ``` ### 5.2 AgentConfig (erweitert mit Eskalation) ```python class AgentConfig(BaseModel): maxRounds: int = 25 maxCostCHF: Optional[float] = None initialToolboxes: List[str] = ["core"] # Tools bei Runde 1 availableToolboxes: List[str] = [] # Via requestToolbox anforderbar temperature: Optional[float] = None ``` ### 5.3 Dynamische Toolbox-Eskalation **Kern-Idee:** Der Agent startet mit einem kompakten Initial-Set (`initialToolboxes`). Er kennt den Katalog aller `availableToolboxes`. Wenn er Spezial-Tools braucht, ruft er `requestToolbox(id)` auf — die Tools werden in der nächsten Runde bereitgestellt. ``` Runde 1: Aktive Tools: core (readFile, webSearch, ...) + requestToolbox System-Prompt: "Verfügbare Toolboxes: email, sharepoint, ai, ..." User: "Lies meine letzten Emails und fasse sie zusammen" LLM: → requestToolbox("email") Runde 2: Aktive Tools: core + email (outlook_readEmails, ...) + requestToolbox LLM: → outlook_readEmails(connectionRef, folder="Inbox") Runde 3: LLM: → (Text-Antwort mit Zusammenfassung) → COMPLETED ``` **requestToolbox Meta-Tool:** ```python { "name": "requestToolbox", "description": "Request specialized tools for the current task.", "parameters": { "toolboxId": { "type": "string", "enum": [...] # Dynamisch: nur availableToolboxes }, "reason": {"type": "string"} } } ``` ### 5.4 Toolbox-Registrierung (Architektur) ``` App Startup │ ├── _registerCoreTools(registry) → 40 Core-Tools in ToolRegistry ├── ActionToolAdapter.registerAll(registry) → dynamicMode Actions als Tools │ └── ToolboxRegistry.registerAll() ├── Toolbox "core" → readFile, listFiles, webSearch, ... (isDefault=true) ├── Toolbox "ai" → summarizeContent, generateImage, ... ├── Toolbox "datasources" → browseDataSource, downloadFromDataSource, ... ├── Toolbox "email" → sendMail, outlook_readEmails, ... (requiresConnection="outlook") ├── Toolbox "sharepoint" → sharepoint_findDocumentPath, ... (requiresConnection="sharepoint") ├── Toolbox "clickup" → clickup_searchTasks, ... (requiresConnection="clickup") ├── Toolbox "jira" → jira_connectJira, ... (requiresConnection="jira") ├── Toolbox "workflow" → readWorkflowGraph, addNode, connectNodes, ... ├── Toolbox "trustee" → trustee_extractFromFiles, ... (featureCode="trustee") └── Toolbox "chatbot" → chatbot_queryDatabase, ... (featureCode="chatbot") Per Agent-Run: │ ├── Feature-Instanz Config → initialToolboxes + availableToolboxes ├── User Connections → filtert availableToolboxes (nur was User nutzen kann) │ ├── Runde 1: getToolsForToolboxes(initialToolboxes) + requestToolbox │ └── Nach requestToolbox("email"): Runde 2+: getToolsForToolboxes(initialToolboxes + ["email"]) + requestToolbox ``` ### 5.5 Sub-Agent-Architektur ```python # Bestehendes Pattern: featureDataAgent.py # Aufruf-Kette: # Main Agent → Tool "queryFeatureInstance" → _queryFeatureInstance() # → runFeatureDataAgent(question, featureInstanceId, ...) # → Eigene ToolRegistry (browseTable, queryTable) # → runAgentLoop(prompt, registry, config=AgentConfig(maxRounds=5)) # → Antwort zurück → ToolResult → Main Agent ``` **Sub-Agents sind komplementär zu Toolboxes:** - **Toolbox-Eskalation:** Agent bekommt Tools direkt und nutzt sie selbst (z.B. Email-Tools) - **Sub-Agent:** Agent delegiert an spezialisierten Mini-Agent mit eigenem Wissen (z.B. Trustee DB-Schema) Faustregel: Toolbox wenn die Tools generisch sind. Sub-Agent wenn Feature-spezifisches Kontext-/Schema-Wissen nötig ist. ### 5.6 Toolbox-Zuordnung pro Feature | Feature | `initialToolboxes` | `availableToolboxes` (anforderbar) | |---------|--------------------|------------------------------------| | **Workspace** | `["core"]` | `["ai", "datasources", "email", "sharepoint", "clickup", "jira"]` — gefiltert nach User-Connections | | **Graphical Editor Chat** | `["core", "workflow"]` | `["ai"]` | | **CommCoach** | `["core"]` | `[]` (keine Eskalation) | | **Chatbot** | (Eigenes Tool-System) | — | --- ## 6. Graph-Struktur und Node-Definitionen ### 6.1 Graph-Schema (innerhalb WorkflowVersion.graph) ```json { "nodes": [ { "id": "node-uuid", "type": "trigger.schedule", "position": { "x": 100, "y": 200 }, "parameters": { "cronExpression": "0 8 * * 1-5" } }, { "id": "node-uuid-2", "type": "ai.prompt", "position": { "x": 300, "y": 200 }, "parameters": { "prompt": "Analysiere die Emails und erstelle eine Zusammenfassung" } } ], "connections": [ { "source": "node-uuid", "target": "node-uuid-2", "sourceOutput": 0, "targetInput": 0 } ] } ``` ### 6.2 Node-Definition-Schema ```python # Jede Node-Definition ist ein Dict mit folgender Struktur: { "id": "ai.prompt", # Unique Node-Type-ID "category": "ai", # Kategorie (für Palette-Gruppierung) "label": {"en": "Prompt", "de": "Prompt"}, # Multilingual "description": {"en": "...", "de": "..."}, "parameters": [ { "name": "prompt", "type": "string", "required": True, "description": {"en": "AI prompt", "de": "KI-Prompt"} } ], "inputs": 1, # Anzahl Eingänge "outputs": 1, # Anzahl Ausgänge "meta": {"icon": "mdi-robot", "color": "#9C27B0"}, # Interne Felder (nicht an Frontend exponiert): "_method": "ai", # Method für ActionExecutor "_action": "process", # Action für ActionExecutor "_paramMap": {"prompt": "aiPrompt"}, # Parameter-Mapping Node → Action } ``` ### 6.3 Node-Type zu Method/Action Mapping | Node Type | `_method` | `_action` | `_paramMap` | |-----------|-----------|-----------|-------------| | `trigger.manual` | — | — | — (TriggerExecutor) | | `trigger.schedule` | — | — | — (TriggerExecutor) | | `trigger.form` | — | — | — (TriggerExecutor) | | `flow.ifElse` | — | — | — (FlowExecutor) | | `flow.switch` | — | — | — (FlowExecutor) | | `flow.loop` | — | — | — (FlowExecutor) | | `input.*` | — | — | — (InputExecutor → HumanTask) | | `ai.prompt` | `ai` | `process` | `prompt→aiPrompt` | | `ai.webResearch` | `ai` | `webResearch` | `query→prompt` | | `ai.summarizeDocument` | `ai` | `summarizeDocument` | — | | `ai.translateDocument` | `ai` | `translateDocument` | `targetLanguage→targetLanguage` | | `ai.generateDocument` | `ai` | `generateDocument` | `prompt→prompt` | | `email.checkEmail` | `outlook` | `readEmails` | `connectionId→connectionReference` | | `email.searchEmail` | `outlook` | `searchEmails` | `connectionId→connectionReference` | | `email.draftEmail` | `outlook` | `composeAndDraft...` | `connectionId→connectionReference` | | `sharepoint.*` | `sharepoint` | entsprechend | `connectionId→connectionReference` | | `clickup.*` | `clickup` | entsprechend | `connectionId→connectionReference` | | `file.create` | `file` | `create` | `template→template` | ### 6.4 Execution-Routing ``` executeGraph(graph, services, ...) │ ├── parseGraph() → nodes, connections, nodeIds ├── validateGraph() → Konsistenzprüfung ├── topoSort(nodes, connectionMap) → geordnete Node-Liste │ └── Pro Node: │ ├── nodeType.startsWith("trigger.") → TriggerExecutor │ → runEnvelope passthrough │ ├── nodeType.startsWith("flow.") → FlowExecutor │ → ifElse: evaluiert Bedingung, setzt active path │ → switch: evaluiert Match, setzt active path │ → loop: iteriert mit _loopState │ ├── nodeType.startsWith("input.") → InputExecutor │ → erstellt HumanTask │ → setzt Run auf PAUSED │ → raise PauseForHumanTaskError │ └── nodeType.startsWith("ai." | "email." | "sharepoint." | ...) → ActionNodeExecutor │ ├── _getNodeDefinition(nodeType) → _method, _action, _paramMap ├── Parameter-Mapping (Node-Params → Action-Params) ├── Upstream-Daten mergen (documents von vorherigen Nodes) └── ActionExecutor.executeAction(method, action, params) ``` --- ## 7. Scheduler-Architektur (konsolidiert) ### 7.1 Konsolidierter Scheduler (Ziel) ```python # Ziel: workflows/scheduler/mainScheduler.py # Übernimmt die besten Patterns aus v1 und v2 class WorkflowScheduler: """Konsolidierter Scheduler für Workflow-Automationen.""" def start(self, eventUser): """Startup: Initial Sync + Delayed Sync + Callback Registration.""" eventManager.start() self._syncScheduledWorkflows(eventUser) self._registerDelayedSync(eventUser, delaySeconds=5) callbackRegistry.register("workflow.changed", lambda _: self._syncScheduledWorkflows(eventUser)) def stop(self, eventUser): """Shutdown: Cleanup.""" pass # APScheduler handles job cleanup def _syncScheduledWorkflows(self, eventUser): """Inkrementeller Sync (v1-Pattern).""" workflows = self._getAllSchedulableWorkflows() for wf in workflows: jobId = f"workflow.{wf.id}" if wf.active and wf.currentVersionId: # Register/Replace Job version = self._getPublishedVersion(wf.currentVersionId) cronKwargs = self._extractSchedule(version) if cronKwargs: handler = self._createHandler(wf, version, eventUser) eventManager.registerCron( jobId=jobId, func=handler, cronKwargs=cronKwargs, replaceExisting=True # v1-Pattern: atomic replace ) # Persist eventId on Workflow (v1-Pattern: debugging) if wf.eventId != jobId: self._updateEventId(wf.id, jobId) else: # Deactivated: Remove Job + Clear eventId self._removeJob(jobId) if wf.eventId: self._updateEventId(wf.id, None) def _createHandler(self, wf, version, eventUser): """Handler: Reload + Active-Check + Execute (v1-Pattern).""" async def handler(): # Reload Workflow (v1-Pattern: Zustand könnte sich geändert haben) current = self._getWorkflow(wf.id) if not current or not current.active: return # Execute published version services = self._buildServices(current, eventUser) result = await executeGraph( graph=version.graph, services=services, workflowId=current.id, ... ) # Execution Log (v1-Pattern: capped audit trail) self._appendExecutionLog(current.id, result) # Thread-Bridge (v2-Pattern) return self._wrapAsync(handler) ``` ### 7.2 Scheduler State Machine ``` Workflow.active=false ──► UNREGISTERED (kein APScheduler Job) │ │ activate + published version mit schedule invocation ▼ Workflow.active=true ──► REGISTERED (eventId gesetzt, Job aktiv) │ ├── Cron fires → handler() → executeGraph → WorkflowRun │ ├── Workflow deactivated → remove job → UNREGISTERED (eventId=null) │ ├── Schedule geändert → replaceExisting=true → REGISTERED (neuer Trigger) │ └── Workflow gelöscht → remove job → (Workflow gelöscht) ``` --- ## 8. Backend Code-Struktur (Ziel) ``` gateway/modules/ │ ├── features/ │ ├── graphicalEditor/ ◄── NEUES FEATURE (ersetzt automation2) │ │ ├── mainGraphicalEditor.py │ │ │ FEATURE_CODE = "graphicalEditor" │ │ │ UI_OBJECTS = [ │ │ │ "ui.feature.graphicalEditor.editor", │ │ │ "ui.feature.graphicalEditor.workflows", │ │ │ "ui.feature.graphicalEditor.tasks", │ │ │ ] │ │ │ RESOURCE_OBJECTS = [ │ │ │ "resource.feature.graphicalEditor.execute", │ │ │ "resource.feature.graphicalEditor.schedule", │ │ │ ] │ │ │ TEMPLATE_ROLES = [ │ │ │ "graphicalEditor-admin", │ │ │ "graphicalEditor-user", │ │ │ "graphicalEditor-viewer", │ │ │ ] │ │ │ │ │ ├── routeFeatureGraphicalEditor.py │ │ │ router prefix: /api/workflows │ │ │ Endpoints: CRUD Workflows, Versions, Execute, Runs, Tasks, NodeTypes, Chat │ │ │ │ │ ├── interfaceFeatureGraphicalEditor.py │ │ │ DB: poweron_workflows (oder poweron_automation2 renamed) │ │ │ Tables: Workflow, WorkflowVersion, WorkflowRun, RunStepLog, HumanTask │ │ │ │ │ ├── datamodelFeatureGraphicalEditor.py │ │ │ Workflow, WorkflowVersion, WorkflowRun, RunStepLog, HumanTask │ │ │ WorkflowStatus, RunStatus, StepStatus, TaskStatus (State Machines) │ │ │ │ │ └── nodeDefinitions/ │ │ triggers.py, flow.py, input.py, ai.py, email.py, sharepoint.py, clickup.py, file.py │ │ │ ├── workspace/ ◄── BLEIBT (AI Chat mit UDB) │ ├── automation/ ◄── BLEIBT bis Ablösung durch graphicalEditor │ ├── commcoach/ ◄── BLEIBT │ ├── chatbot/ ◄── BLEIBT │ ├── trustee/ ◄── BLEIBT │ └── ... │ ├── serviceCenter/services/ │ ├── serviceAgent/ │ │ ├── mainServiceAgent.py ◄── Core Tools + Toolbox-Tagging │ │ ├── agentLoop.py ◄── Toolbox-Filtering statt toolSet │ │ ├── toolRegistry.py ◄── BLEIBT │ │ ├── toolboxRegistry.py ◄── NEU: Toolbox-Verwaltung │ │ ├── actionToolAdapter.py ◄── BLEIBT (+ Toolbox-Zuordnung) │ │ ├── featureDataAgent.py ◄── BLEIBT (Sub-Agent Pattern) │ │ └── datamodelAgent.py ◄── toolboxes: List[str] statt toolSet │ └── ... │ ├── workflows/ │ ├── methods/ ◄── BLEIBT (Unified Action Library) │ ├── processing/ ◄── BLEIBT (ActionExecutor, methodDiscovery) │ ├── automation2/ ◄── Graph Engine BLEIBT (execution, executors, graphUtils) │ ├── scheduler/ ◄── NEU (konsolidierter Scheduler) │ │ └── mainScheduler.py │ ├── automation/ ◄── BLEIBT bis Ablösung │ └── workflowManager.py ◄── BLEIBT (für Workspace Dynamic Mode) │ └── ... ``` --- ## 9. Frontend Code-Struktur (Ziel) ``` frontend_nyla/src/ │ ├── components/ ◄── SHARED COMPONENTS │ ├── ChatBar/ ◄── NEU (extrahiert aus WorkspaceInput) │ │ ├── ChatBar.tsx Props: onSend, onStop, onVoice, │ │ │ fileAttachments, showProviderSelector, │ │ │ showVoice, placeholder │ │ ├── ChatBar.module.css │ │ └── index.ts │ │ │ ├── ChatStream/ ◄── NEU (extrahiert aus workspace/ChatStream) │ │ ├── ChatStream.tsx SSE-driven message stream, wiederverwendbar │ │ └── index.ts │ │ │ ├── UnifiedDataBar/ ◄── BLEIBT │ │ │ ├── FlowEditor/ ◄── RENAMED von Automation2FlowEditor │ │ ├── editor/ │ │ │ ├── FlowEditor.tsx + AI Chat Panel (ChatBar + ChatStream) │ │ │ ├── FlowCanvas.tsx + Visual Run Tracing │ │ │ ├── EditorChatPanel.tsx ◄── NEU │ │ │ └── ... │ │ └── ... │ │ │ └── ... │ ├── pages/views/ │ ├── workspace/ │ │ ├── WorkspacePage.tsx Nutzt ChatBar, ChatStream, UDB │ │ └── ... │ │ │ ├── graphicalEditor/ ◄── RENAMED von automation2 │ │ ├── GraphicalEditorPage.tsx FlowEditor + UDB + ChatPanel │ │ ├── WorkflowsListPage.tsx │ │ └── TasksPage.tsx │ │ │ └── ... │ ├── api/ │ ├── workflowApi.ts ◄── Konsolidiert (ersetzt automationApi + automation2Api) │ └── ... │ └── ... ``` --- ## 10. Zusammenfassung: Architektur-Prinzipien | Prinzip | Umsetzung | |---------|-----------| | **Feature-Container-Pattern** | Jedes Feature folgt dem gleichen Verzeichnis-Pattern mit automatischer Discovery. Kein manuelles Wiring. | | **RBAC auf zwei Ebenen** | Mandate-Rollen (generell) + Instanz-Rollen (feature-spezifisch). Höchste Priorität gewinnt. | | **State Machines als Grundlage** | WorkflowVersion, WorkflowRun, HumanTask und Scheduler haben klar definierte Zustände und Übergänge. | | **Unified Action Library** | Eine Method/Action-Library für alle Consumers (Agent Tools, Graph Nodes, Workflow Processor). | | **Toolboxes statt flache Tool-Liste** | Thematisch gruppierte Tools, kontextabhängig aktiviert. Skaliert auf 100+ Tools. | | **Sub-Agents für Feature-Daten** | Main Agent bleibt schlank. Feature-spezifisches Wissen in dedizierten Sub-Agents. | | **Keine Migration** | Direkte Implementierung auf Automation2-Basis. v1 bleibt bis Ablösung. | | **Scheduler konsolidiert** | Beste Patterns aus v1 (inkrementell, eventId, reload+check) und v2 (interval, thread-bridge). |