# Konzept: AI Agent Architecture & Unified Workspace > Poweron Platform -- Zentrales AI Handling und Editor-UI > Status: Konzept / Version 2.0 / März 2026 --- ## 1. Ausgangslage Die Plattform hat vier isolierte Systeme für AI-Interaktionen, jedes mit eigenen Schwächen: | System | Stärke | Schwäche | |--------|--------|----------| | **serviceAi** (Pipeline) | Modell-Fallback, Billing, Dokumentenextraktion | Rigide Pipeline (extract→structure→fill), N×M AI-Call-Explosion, kein Caching | | **Codeeditor** | ReAct-Loop mit Tool-Use, Agent-Modus | Nur read-only Tools, eigenes UI, nur Codeeditor-Scope | | **Chatbot** | LangGraph, SQL/Tavily-Integration | Kein File-Handling, keine Dokumentbearbeitung, eigenes UI | | **Chatplayground** | Workflow-Management | Kein Streaming, kein Voice, eigenes UI | Zusätzliche Defizite: - **Kein Wissenstransfer:** Jeder Workflow startet bei null, selbst wenn dasselbe Dokument schon extrahiert wurde - **File-System:** Flach ohne Ordner, Tags oder FeatureInstance-Zuordnung - **Keine externen Datenquellen** im Chat referenzierbar (SharePoint-Ordner, Drive, etc.) - **Grosse Dokumente:** 500-Seiten-PDFs werden blind vollständig extrahiert (500+ AI-Calls) - **Kein Container-Handling:** ZIP-Files, Ordner und verschachtelte Dokumente werden nicht unterstützt - **Keine generische Provider-Abstraktion:** SharePoint/Outlook je eigene Implementierung, obwohl beides MSFT-Services über eine Connection. Kein Google Drive, FTP etc. - **36 Workflow-Actions isoliert:** Bestehende Actions (SharePoint, Outlook, Jira, AI) nicht als Agent-Tools nutzbar --- ## 2. Zielarchitektur ### 2.1 Kernidee: Agent statt Pipeline **Bisher:** Code entscheidet starr, was passiert: ``` extractContent() → generateStructure() → fillSections() → render() ``` **Neu:** AI entscheidet dynamisch, welche Tools sie braucht: ``` User Prompt + Files → Agent Loop → Tool Calls → Result ``` ### 2.2 Gesamtübersicht ```mermaid graph TB subgraph ui["Unified Workspace UI"] ChatPanel["Chat / Streaming"] FilePanel["File Browser"] DataPanel["Data Source Picker"] KnowledgePanel["Knowledge Browser"] end subgraph agentSvc["serviceAgent"] Loop["Agent Loop\nReAct + native function calling"] TR["Tool Registry"] CTX["Context Manager\nRAG Retrieval"] end subgraph knowledge["Knowledge Store"] SharedL["Shared Layer\nmandateId"] InstanceL["Instance Layer\nuserId + featureInstanceId"] WorkflowL["Workflow Layer\nworkflowId"] end subgraph tools["Tools"] FileT["File Tools"] DocT["Document Tools"] ConnT["Connection Tools"] ExtT["External Tools"] WfT["Workflow Tools"] end subgraph infra["Bestehende Infrastruktur"] AiSvc["serviceAi\nModell-Fallback + Billing"] ExtrSvc["serviceExtraction"] Stream["serviceStreaming"] Voice["serviceVoice"] UCon["UserConnections"] end ui -->|"SSE Stream"| agentSvc CTX -->|"RAG"| knowledge DocT -->|"Cache"| knowledge Loop --> TR TR --> tools Loop --> CTX Loop --> AiSvc Loop --> Stream DocT --> ExtrSvc ConnT --> UCon ``` --- ## 3. Agent Core (`serviceAgent`) ### 3.1 Aufbau Neuer Service unter `modules/serviceCenter/services/serviceAgent/`: | Datei | Funktion | |-------|----------| | `mainServiceAgent.py` | Entry Point: `runAgent(prompt, fileIds, toolSet, config)` | | `agentLoop.py` | ReAct-Loop mit native function calling | | `toolRegistry.py` | Tool-Registrierung und Dispatch | | `conversationManager.py` | Context-Window-Management | | `datamodelAgent.py` | AgentState, ToolDefinition, ToolResult, AgentConfig | **ServiceCenter-Registry:** ```python # In serviceCenter/registry.py IMPORTABLE_SERVICES = { ... "agent": { "module": "modules.serviceCenter.services.serviceAgent.mainServiceAgent", "class": "ServiceAgent", "dependencies": ["ai", "chat", "extraction", "billing", "streaming"], "objectKey": "service.agent" } } ``` `serviceAgent` hat die meisten transitiven Dependencies (über `ai` → `chat`, `utils`, `extraction`, `billing`). Die bestehende DI-Kette im ServiceCenter löst dies automatisch auf. **AgentConfig:** ```python class AgentConfig(BaseModel): maxRounds: int = 25 maxCostCHF: Optional[float] = None # Workflow-Budget-Cap entityCacheEnabled: bool = False # Opt-in Entity Extraction (Extra-AI-Call pro Round) toolSet: str = "core" # Welches Tool-Set aktiv operationType: str = "AGENT" # Für Modell-Auswahl ``` ### 3.2 Agent Loop ```python async def runAgent(prompt, fileIds, toolSet, config, progressCallback): state = AgentState(maxRounds=config.maxRounds) tools = toolRegistry.getTools(toolSet) messages = [buildSystemPrompt(tools), {"role": "user", "content": prompt}] while state.status == "running" and state.currentRound < state.maxRounds: state.currentRound += 1 # Kosten-Budget prüfen if config.maxCostCHF and await billingService.getWorkflowCost(workflowId) > config.maxCostCHF: state.status = "budgetExceeded" yield AgentEvent(type="final", content=_summarizeProgress(state)) break response = await aiService.callAi(AiCallRequest( messages=messages, options=AiCallOptions(operationType=OperationTypeEnum.AGENT), tools=tools )) toolCalls = response.toolCalls if not toolCalls: state.status = "completed" yield AgentEvent(type="final", content=response.content) break # Tool-Ausführung: sequential default, parallel nur für readOnly-Tools results = await _executeToolCalls(toolCalls, context) messages.extend(formatToolResults(toolCalls, results)) yield AgentEvent(type="toolResults", data=results) # Graceful Degradation bei maxRounds if state.currentRound >= state.maxRounds and state.status == "running": state.status = "maxRoundsReached" yield AgentEvent(type="final", content=_summarizeProgress(state)) async def _executeToolCalls(toolCalls, context): """Sequential als Default. Parallel nur für readOnly-Tools (keine Race Conditions).""" readOnlyCalls = [tc for tc in toolCalls if toolRegistry.isReadOnly(tc.name)] writeCalls = [tc for tc in toolCalls if not toolRegistry.isReadOnly(tc.name)] results = {} if readOnlyCalls: readResults = await asyncio.gather(*[ toolRegistry.dispatch(tc.name, tc.args, context) for tc in readOnlyCalls ]) for tc, result in zip(readOnlyCalls, readResults): results[tc.id] = result for tc in writeCalls: results[tc.id] = await toolRegistry.dispatch(tc.name, tc.args, context) return [results[tc.id] for tc in toolCalls] ``` ### 3.3 Design-Entscheidungen - **Native function calling** als Standard (OpenAI tool_choice Format) -- zuverlässiger als text-basiertes Parsing - **Text-basierter Fallback** für Modelle ohne function calling Support - Nutzt bestehende `callAi`-Infrastruktur (Modell-Fallback, Billing, Provider-Auswahl) -- damit greifen Provider-RBAC, Model-RBAC und Billing-Checks automatisch - `AiCallRequest` wird erweitert um `messages` und `tools` (abwärtskompatibel: bestehende `prompt`/`context`/`contentParts` bleiben) - **Tool-Ausführung sequential** als Default -- parallel nur für explizit als `readOnly=True` markierte Tools (verhindert Race Conditions bei State-modifizierenden Tools) - **Kosten-Budget** (`maxCostCHF`) pro Workflow -- Agent stoppt mit Fortschritts-Summary bei Überschreitung - **Graceful Degradation** bei `maxRounds` -- Agent liefert Summary statt stiller Abbruch ### 3.4 Fehlerbehandlung | Fehlertyp | Strategie | |-----------|-----------| | **Tool-Fehler** (Exception in Tool) | Fehler als ToolResult an Agent zurückgeben -- Agent entscheidet über Retry oder alternativen Ansatz | | **Halluzinierte Tool-Namen** | Validierung gegen `toolRegistry.getTools()` -- ungültige Tools werden als Fehler-ToolResult zurückgegeben | | **Externe API-Fehler** (Timeout, 5xx) | Retry mit Exponential Backoff (max 3 Versuche), dann Fehler-ToolResult | | **AI-Call-Fehler** | Bestehende Fallback-Logik in `serviceAi` (Modell-Fallback, Provider-Switch) | | **maxRounds erreicht** | Graceful Degradation: Agent liefert Summary des bisherigen Fortschritts | | **Budget überschritten** | Agent stoppt, liefert Zusammenfassung, User wird informiert | | **Knowledge Store nicht verfügbar** | Agent arbeitet ohne RAG-Kontext weiter (degraded mode) | **Input-Validierung:** Tool-Argumente vom LLM sind unvalidiert. Jedes Tool validiert seine Args vor Weitergabe an darunterliegende Interfaces: - Path-Traversal-Schutz (`readFile`, `writeFile`): Pfade auf erlaubten Scope beschränken - SQL-Injection-Schutz (`sqlQuery`): Parametrisierte Queries - Container-Limits: `maxTotalExtractedSize`, `maxFileCount`, Symlink-Blockierung (ZIP-Bomb-Schutz) ### 3.5 RBAC und Sicherheit **Design-Entscheidung: Keine Tool-Level-RBAC nötig.** Die Berechtigungen greifen auf Daten- und Service-Ebene unterhalb der Tools: | Tool | Absicherung unterhalb | |------|----------------------| | File-Tools (`readFile`, `writeFile`, `listFiles`) | `interfaceDbApp` → `getRecordsetWithRBAC()` mit WHERE nach `MY/GROUP/ALL` | | External-Tools (`externalBrowse`, `externalDownload`) | UserConnection OAuth-Token -- User sieht nur, was sein Account erlaubt | | AI-Calls (Agent-Loop) | `serviceAi` → `_checkBillingBeforeAiCall()` → Provider-RBAC + Model-RBAC | | Service-Zugriff | `can_access_service()` prüft RESOURCE-ObjectKey | **serviceAgent als IMPORTABLE_SERVICE:** `serviceAgent` wird in der ServiceCenter-Registry registriert mit `objectKey = "service.agent"`. Damit greift `can_access_service()` und der Zugang zum Agent ist per Rolle steuerbar. **Knowledge Store Shared Layer:** Dokumente mit `isShared=True` sind mandateweit sichtbar. Neue Tabellen (`FileContentIndex`, `ContentChunk`) brauchen Einträge in `TABLE_NAMESPACE` mit DATA-ObjectKeys, damit `getRecordsetWithRBAC()` greift. Promote in Shared Layer (`isShared=True`) erfordert Access-Level `ALL`. ### 3.6 Tool Registry ```python CORE_TOOLS = ["readFile", "writeFile", "listFiles", "searchFiles", "extractContent", "summarizeContent", "webSearch"] ``` Jedes Tool wird mit `readOnly`-Flag registriert (steuert parallele vs. sequentielle Ausführung): ```python toolRegistry.register("readFile", readFileTool, readOnly=True) toolRegistry.register("writeFile", writeFileTool, readOnly=False) toolRegistry.register("sqlQuery", sqlQueryTool, featureType="chatbot", readOnly=True) toolRegistry.register("sharePointUpload", spTool, featureType="trustee", readOnly=False) ``` --- ## 4. Knowledge Store (RAG) ### 4.1 Kernprinzip Persistenter Wissensspeicher mit drei Ebenen. Ein File wird **einmal** extrahiert und steht danach allen Workflows zur Verfügung. Automatische Indexierung bei Upload. ### 4.2 Drei-Ebenen-Architektur ```mermaid graph TB subgraph scope["Knowledge Store"] direction TB subgraph shared["Shared Layer -- mandateId"] GD["Globale Firmen-Dokumente\nTemplates, Shared Knowledge"] end subgraph instance["Instance Layer -- userId + featureInstanceId"] FI["User-Dokumente\nFile-Embeddings\nDocument Indexes"] end subgraph wf["Workflow Layer -- workflowId"] WF["Conversation Summary\nEntity Cache\nTool Results"] end shared --> instance instance --> wf end ``` | Ebene | Scope | Inhalt | Lebensdauer | |-------|-------|--------|-------------| | **Shared** | mandateId | Globale Dokumente, Templates | Permanent | | **Instance** | userId + featureInstanceId | Extrahierte Dokumente, Embeddings | Permanent pro Workspace | | **Workflow** | workflowId | Conversation Summaries, Entities, Tool-Ergebnisse | Lebt mit dem Workflow | ### 4.3 Automatische Indexierung bei Upload Jedes hochgeladene File durchläuft eine Background-Pipeline -- **komplett ohne AI**: ``` Upload → Container auflösen (rekursiv) → Typ-Extractor pro File → Content-Objekte → Chunking → Embedding → Knowledge Store ``` ```python async def _onFileUploaded(fileId, userId, featureInstanceId): fileData = await getFileData(fileId) mimeType = _detectMimeType(fileData) # 1. Container auflösen (ZIP/Folder → Files), dann pro File extrahieren files = await _resolveToFiles(fileId, fileData, mimeType) for file in files: # 2. Typ-Extractor: Content-Objekte extrahieren (kein AI) contentIndex = await _extractFileContents(file.id, file.data, file.mimeType) # 3. Text-Content-Objekte chunken und embedden textObjects = [o for o in contentIndex.objects if o.contentType == "text"] chunks = _chunkForEmbedding(textObjects, chunkSize=512) embeddings = await embeddingService.embed([c.data for c in chunks]) # 4. Im Knowledge Store persistieren (userId + featureInstanceId scoped) for chunk, embedding in zip(chunks, embeddings): await knowledgeStore.upsertContentChunk(ContentChunk( contentObjectId=chunk.id, fileId=file.id, userId=userId, featureInstanceId=featureInstanceId, data=chunk.data, contextRef=chunk.contextRef, embedding=embedding )) await knowledgeStore.updateFileStatus(fileId, "indexed") ``` Ergebnis: Sekunden bis Minuten nach Upload ist das File für alle Workflows RAG-ready. Nachfolgende Aktivitäten holen den Inhalt direkt aus dem Knowledge Store. ### 4.4 Datenmodelle (PostgreSQL + pgvector) ```python class FileContentIndex(BaseModel): """Index der Content-Strukturen pro File. Lebt im Instance Layer. Wird bei Extraktion erstellt (kein AI).""" id: str # = fileId userId: str featureInstanceId: str mandateId: str isShared: bool = False # Shared Layer Sichtbarkeit fileName: str mimeType: str containerPath: Optional[str] # Pfad im Container (z.B. "archiv.zip/folder/report.pdf") totalObjects: int # Anzahl Content-Objekte totalSize: int structure: Dict[str, Any] # Strukturübersicht (Pages, Sections, etc.) objectSummary: List[Dict] # Kompakte Übersicht pro Content-Objekt extractedAt: float status: str # pending, extracted, embedding, indexed, failed class ContentChunk(BaseModel): """Persistierter Content-Chunk. Wiederverwendbar über Workflows. Skalares Content-Objekt (oder Chunk davon) mit Embedding.""" id: str contentObjectId: str # FK zum Content-Objekt fileId: str # FK zum File userId: str featureInstanceId: str contentType: str # text, image, videostream, audiostream, other data: str # Inhalt (Text, Base64, URL) contextRef: Dict[str, Any] # Kontext-Referenz (Seite, Position, Label) summary: Optional[str] # AI-generierte Zusammenfassung (bei Bedarf) metadata: Dict[str, Any] = {} embedding: List[float] # pgvector (NOT NULL für text-Chunks) class WorkflowMemory(BaseModel): """Workflow-spezifischer Cache.""" id: str workflowId: str userId: str featureInstanceId: str key: str # z.B. "entity:companyName" value: str source: str # extraction, tool, conversation, summary createdAt: float embedding: Optional[List[float]] ``` ### 4.5 RAG-Retrieval: 3-Ebenen-Suche Vor jedem Agent-Round sucht der Context Manager über alle drei Ebenen: ```python async def _buildAgentContext(self, currentPrompt, workflowId, userId, featureInstanceId, mandateId, contextBudget): # 1. Instance Layer: User-Dokumente (höchste Priorität) instanceChunks = await knowledgeStore.semanticSearch( query=currentPrompt, userId=userId, featureInstanceId=featureInstanceId, limit=15, minScore=0.65 ) # 2. Shared Layer: Globale Firmen-Dokumente sharedChunks = await knowledgeStore.semanticSearch( query=currentPrompt, mandateId=mandateId, isShared=True, limit=10, minScore=0.7 ) # 3. Workflow Layer: Aktueller Kontext entities = await knowledgeStore.getEntities(workflowId) summary = await knowledgeStore.getConversationSummary(workflowId) toolResults = await knowledgeStore.getRecentToolResults(workflowId, limit=5) context = ContextBuilder(budget=contextBudget) context.add(priority=1, content=currentPrompt) context.add(priority=2, content=instanceChunks) context.add(priority=3, content=entities) context.add(priority=4, content=sharedChunks) context.add(priority=5, content=summary) context.add(priority=6, content=toolResults) return context.build() ``` ### 4.6 Cross-Workflow Wissenstransfer ``` Workflow A (Montag): User: "Analysiere Q4-Report.pdf" → Extraktion + Embedding im Instance Layer (200 Chunks) → Agent findet: Revenue = 12.5M CHF Workflow B (Dienstag): User: "Vergleiche Q4-Report mit Budget.xlsx" → Q4-Report: Bereits im Knowledge Store → 0 Extraktion → Budget.xlsx: Automatisch indexiert nach Upload → Semantische Suche "Q4 Revenue" → sofort relevanter Chunk ``` ### 4.7 Progressive Summarization Statt alte Nachrichten abzuschneiden (`trim_messages`): ``` Round 1-3: Volle Nachrichten Round 4: Nachrichten 1-2 → Summary Round 7: Summary(1-4) + volle 5-7 Round 15: Meta-Summary(1-10) + Summary(11-13) + volle 14-15 ``` Der Agent verliert nie den Faden. Summaries werden im Workflow Layer persistiert. **Kosten:** Jede Summarization ist ein AI-Call (günstigeres Modell, z.B. `DATA_ANALYSE`). Bei einem 15-Round-Workflow: ~3 Summary-Calls. Diese Kosten fliessen ins Workflow-Budget (`maxCostCHF`) ein und erscheinen als eigener Typ in der `BillingTransaction`. ### 4.8 Entity Cache Extraktion wichtiger Fakten nach Tool-Ergebnissen. **Opt-in per Config** (jeder Aufruf ist ein Extra-AI-Call): ```python if config.entityCacheEnabled: entities = await _extractEntities(response) # {"company": "Acme AG", "revenueQ4": "12.5M CHF"} for key, value in entities.items(): await knowledgeStore.upsertEntity(workflowId, key, value) ``` **Kosten-Kontrolle:** Entity-Extraktion ist standardmässig deaktiviert. Für häufig genutzte Entities (Firmennamen, Beträge, Daten) kann alternativ regelbasierte Extraktion (Regex/NER) ohne AI-Call eingesetzt werden. ### 4.9 Embedding-Strategie | Aspekt | Entscheidung | |--------|-------------| | Embedding-Modell | OpenAI `text-embedding-3-small` (1536 dim); lokal: sentence-transformers | | Vector Store | pgvector auf bestehendem PostgreSQL | | Chunk-Grösse | 512 Tokens | | Index-Typ | IVFFlat (< 100k Chunks), HNSW (> 100k) | | Suche | Cosine Similarity, gefiltert nach userId + featureInstanceId | --- ## 5. Smart Document Handling ### 5.1 Problem Ein 200MB PDF mit 500 Seiten: Aktuell werden alle 500 Seiten blind extrahiert, jede einzeln an AI gesendet → 500+ AI-Calls. ### 5.2 Lösung: 3-Stufen-Modell ```mermaid graph TD Upload["Dokument Upload"] subgraph stage1["Stufe 1: Structure Pre-Scan"] TOC["TOC Extraction"] Headings["Heading Detection"] PageMap["Page Map"] end subgraph stage2["Stufe 2: Selective Extraction"] OnDemand["Nur angeforderte Seiten"] Cache["Knowledge Store Cache"] end subgraph stage3["Stufe 3: Deep Analysis"] Vision["Vision AI"] TableEx["Tabellen-Extraktion"] OCR["OCR"] end Upload --> stage1 stage1 -->|"Agent entscheidet"| stage2 stage2 -->|"Bei Bedarf"| stage3 ``` **Stufe 1 -- Structure Pre-Scan (Sekunden, kein AI):** Rein programmatisch mit PyMuPDF. Extrahiert TOC, Headings (Font-Size-Heuristik), Seitenstruktur, Bildpositionen. Ergebnis ist der `FileContentIndex` (siehe 5.4). Der Agent sieht sofort die Struktur eines 500-Seiten-PDFs. ```python async def preScanDocument(fileId: str) -> FileContentIndex: doc = fitz.open(stream=fileData, filetype="pdf") toc = doc.get_toc() pageMap = [] for i, page in enumerate(doc): blocks = page.get_text("dict")["blocks"] pageMap.append({ "pageIndex": i, "headings": [h for h in blocks if _isHeading(h)], "hasImages": any(b["type"] == 1 for b in blocks), "textLength": len(page.get_text()), "hasTable": _detectTableHeuristic(page) }) sections = _buildSectionsFromTocOrHeadings(toc, pageMap) return FileContentIndex(id=fileId, structure={"toc": toc, "sections": sections, "pageMap": pageMap}) ``` **Stufe 2 -- On-Demand Extraction:** Agent liest nur die Seiten, die er braucht. Ergebnisse werden im Knowledge Store gecacht. ```python async def readSection(fileId, sectionId): cached = await knowledgeStore.getCachedSection(fileId, sectionId) if cached: return cached section = _findSection(docIndex, sectionId) parts = _extractPages(fileData, section["startPage"], section["endPage"]) await knowledgeStore.cacheExtraction(fileId, sectionId, parts) return _formatSectionResult(parts) ``` **Stufe 3 -- Deep Analysis (nur bei explizitem Bedarf):** Vision AI für Bilder, Tabellen-Extraktion, OCR -- nur für einzelne angeforderte Elemente. ### 5.3 Kostenvergleich | Szenario | Alt (Pipeline) | Neu (Smart) | |----------|----------------|-------------| | "Fasse Kapitel 3 zusammen" (8/500 Seiten) | 500 Seiten, 500+ AI-Calls | Pre-Scan + 8 Seiten + 1 AI-Call | | "Was steht auf Seite 47?" | 500 Seiten, 500+ AI-Calls | Pre-Scan + 1 Seite + 1 AI-Call | | Zweite Frage zum selben Dokument | Erneut 500 Seiten | Aus Knowledge Store, 0 Extraktion | | Gesamtanalyse | 500+ AI-Calls | ~21 AI-Calls (20 Section-Summaries + 1 Meta-Summary) | ### 5.4 Container- und Content-Modell Das System unterscheidet klar zwischen **physischen Strukturen** (Container, Files) und **logischen Content-Objekten** (Text, Bild, Audio, Video). Die gesamte Extraktion bis zu den Content-Objekten erfolgt **ohne AI**. #### Physische Schicht: Container-Hierarchie Container sind verschachtelt (n,m : 0..i): ```mermaid graph TD ZIP["ZIP (Container)"] FA["folder-a/ (Container)"] FB["folder-b/ (Container)"] PDF["report.pdf (File → Typ-Extractor)"] IMG["photo.png (File → Typ-Extractor)"] DOCX["vertrag.docx (File → Typ-Extractor)"] CSV["data.csv (File → Typ-Extractor)"] ZIP --> FA ZIP --> FB FA --> PDF FA --> IMG FB --> DOCX FB --> CSV ``` | Ebene | Regel | |-------|-------| | **ZIP / TAR / GZ** | Container → enthält n Folders + m Files | | **Folder** | Container → enthält n Folders + m Files | | **File** | Hat einen bestimmten Typ → wird vom passenden Extractor verarbeitet | **File-Typen und ihre Container-Eigenschaft:** | File-Typ | Container? | Struktur | |----------|-----------|----------| | PDF | Ja | Seiten als Kontext-Referenz, jede Seite enthält Content-Objekte | | DOCX | Ja | Abschnitte/Paragraphen als Kontext-Referenz (Seiten nur als Referenz, nicht als Container) | | PPTX | Ja | Slides als Kontext-Referenz, jeder Slide enthält Content-Objekte | | XLSX | Ja | Sheets als Kontext-Referenz, jedes Sheet enthält Content-Objekte | | PNG/JPG/GIF | Nein | File selbst ist ein einzelnes Content-Objekt (image) | | MP4/WebM | Nein | File selbst ist ein einzelnes Content-Objekt (videostream) | | MP3/WAV | Nein | File selbst ist ein einzelnes Content-Objekt (audiostream) | | TXT/CSV/JSON/XML | Nein | File selbst ist ein einzelnes Content-Objekt (text) | #### Logische Schicht: Content-Objekte Jeder Extractor liefert **skalare Content-Objekte** mit Kontext-Referenz zum Ursprung im File. Es gibt genau fünf Content-Typen: | ContentType | Beschreibung | Beispiel | |-------------|-------------|----------| | `text` | Textinhalt (Fliesstext, Tabelle als Markdown, Code) | Paragraph auf Seite 3, Tabelle in Sheet "Q4" | | `image` | Einzelbild (als Base64 oder Referenz) | Bild auf Seite 5 unten links, Caption "Übersicht" | | `videostream` | Video-Inhalt | Eingebettetes Video in PPTX Slide 7 | | `audiostream` | Audio-Inhalt | Voice-Memo Attachment | | `other` | Nicht klassifizierbar (Binary, Formeln, etc.) | Eingebettetes OLE-Objekt | **Content-Objekt Datenmodell:** ```python class ContentObject(BaseModel): id: str fileId: str # FK zum physischen File contentType: str # text, image, videostream, audiostream, other data: str # Inhalt (Text, Base64, URL) contextRef: ContentContextRef # Kontext-Referenz zum Ursprung metadata: Dict[str, Any] = {} # Extractor-spezifische Metadaten sequence: int = 0 # Reihenfolge innerhalb des Kontexts class ContentContextRef(BaseModel): """Referenz zum Kontext im Container/File.""" containerPath: str # z.B. "archiv.zip/folder-a/report.pdf" location: str # z.B. "page:5/region:bottomLeft" label: Optional[str] = None # z.B. "Abbildung 3: Übersicht" pageIndex: Optional[int] = None # Seiten-Nummer (wenn relevant) sectionId: Optional[str] = None # Abschnitt/Heading (wenn relevant) sheetName: Optional[str] = None # Sheet-Name (XLSX) slideIndex: Optional[int] = None # Slide-Nummer (PPTX) ``` #### Extraktions-Pipeline (komplett ohne AI) ```mermaid graph LR Input["ZIP / Folder / File"] subgraph phase1 ["Phase 1: Container auflösen"] Unpack["Entpacken\nrekursiv"] FileList["File-Liste\nmit Typen"] end subgraph phase2 ["Phase 2: Content extrahieren"] Ext["Typ-Extractor\npro File"] CO["Content-Objekte\nmit ContextRef"] end subgraph phase3 ["Phase 3: Indexierung"] Idx["ContentIndex\npro File"] KS["Knowledge Store"] end Input --> phase1 phase1 --> phase2 phase2 --> phase3 ``` ```python async def _extractFileContents(fileId: str, fileData: bytes, mimeType: str) -> FileContentIndex: """Extrahiert alle Content-Objekte eines Files. Kein AI-Call.""" extractor = extractorRegistry.resolve(mimeType) contentObjects = await extractor.extract(fileData) contentIndex = FileContentIndex( fileId=fileId, totalObjects=len(contentObjects), structure=_buildStructureMap(contentObjects), objects=contentObjects ) await knowledgeStore.upsertContentIndex(contentIndex) return contentIndex ``` **Rekursive Container-Auflösung:** ```python MAX_TOTAL_EXTRACTED_SIZE = 500 * 1024 * 1024 # 500 MB MAX_FILE_COUNT = 10000 async def _resolveContainerRecursive(containerData, containerPath="", depth=0, maxDepth=5, _state=None): """Löst Container rekursiv auf bis zu den Files. Kein AI-Call. Schutz vor ZIP-Bombs: maxDepth, maxTotalExtractedSize, maxFileCount, Symlink-Blockierung.""" if _state is None: _state = {"totalSize": 0, "fileCount": 0} allFiles = [] entries = _listEntries(containerData, blockSymlinks=True) for entry in entries: entryPath = f"{containerPath}/{entry.name}" if containerPath else entry.name if entry.isContainer: # Folder oder verschachteltes ZIP childData = _extractEntry(containerData, entry) childFiles = await _resolveContainerRecursive( childData, entryPath, depth + 1, maxDepth ) allFiles.extend(childFiles) else: _state["totalSize"] += entry.size _state["fileCount"] += 1 if _state["totalSize"] > MAX_TOTAL_EXTRACTED_SIZE: raise ContainerLimitError(f"Total extracted size exceeds {MAX_TOTAL_EXTRACTED_SIZE}") if _state["fileCount"] > MAX_FILE_COUNT: raise ContainerLimitError(f"File count exceeds {MAX_FILE_COUNT}") allFiles.append(FileEntry( path=entryPath, data=_extractEntry(containerData, entry), mimeType=_detectMimeType(entry.name), size=entry.size )) return allFiles ``` #### File Content Index (Metadaten pro File) Jedes File erhält einen `FileContentIndex` als Metadaten (kanonische Definition siehe Abschnitt 4.4). Zusätzlich enthält jeder Index `ContentObjectSummary`-Einträge: ```python class ContentObjectSummary(BaseModel): """Kompakte Beschreibung eines Content-Objekts im Index.""" id: str contentType: str # text, image, videostream, audiostream, other contextRef: ContentContextRef charCount: Optional[int] = None # Nur für text dimensions: Optional[str] = None # Nur für image/video (z.B. "1920x1080") duration: Optional[float] = None # Nur für audio/video (Sekunden) ``` Beispiel für einen PDF-ContentIndex: ``` report.pdf → FileContentIndex: totalObjects: 47 structure: {pages: 12, images: 8, tables: 5, textBlocks: 34} objectSummary: - {id: "co-1", type: "text", contextRef: {page: 1, location: "header"}, charCount: 245} - {id: "co-2", type: "text", contextRef: {page: 1, location: "body"}, charCount: 1830} - {id: "co-3", type: "image", contextRef: {page: 1, location: "bottom", label: "Abb. 1"}, dimensions: "800x600"} - ... ``` #### AI arbeitet auf skalaren Content-Objekten Nach der Extraktion hat der Agent nur noch skalare Content-Objekte vor sich. AI-Calls werden **gezielt** auf einzelne Objekte angewendet: ``` Agent: "Analysiere Kapitel 3 des Reports" 1. browseContainer("report.pdf") → ContentIndex (47 Objekte, Struktur) 2. Agent sieht: Kapitel 3 = Seiten 5-8, 12 Content-Objekte 3. readContentObjects(fileId, filter={pageIndex: [5,6,7,8]}) → 12 skalare Objekte 4. AI-Call mit den 12 Objekten als Kontext → Analyse ``` #### Container-Tools für den Agent | Tool | Funktion | |------|----------| | `browseContainer` | Zeigt ContentIndex / Baumstruktur eines Containers (ZIP, Folder, PDF, etc.) | | `readContentObjects` | Liest spezifische Content-Objekte nach Filter (Seite, Typ, Section) | | `extractContainerItem` | Extrahiert on demand ein Element, das noch nicht im Knowledge Store ist | **Prompt-Referenzen:** Im Unified UI können Container direkt referenziert werden: - `@archiv.zip` -- Agent sieht Baumstruktur, extrahiert gezielt - `@projektordner/` -- Agent browst Ordnerinhalt - `@report.pdf` -- Agent sieht ContentIndex mit allen Content-Objekten --- ## 6. Document Tools Tools für den Agent -- arbeiten auf der physischen (Container) und logischen (Content-Objekte) Schicht: **Container-Tools (physische Schicht):** | Tool | Funktion | Intern | |------|----------|--------| | `browseContainer` | Baumstruktur + ContentIndex eines Containers anzeigen | Container-Auflösung + FileContentIndex | | `extractContainerItem` | Spezifisches File aus Container on demand extrahieren | Rekursive Pipeline → Content-Objekte → Knowledge Store | **Content-Tools (logische Schicht, skalare Objekte):** | Tool | Funktion | Intern | |------|----------|--------| | `readContentObjects` | Content-Objekte nach Filter lesen (Seite, Typ, Section) | Knowledge Store / on-demand Extraktion | | `summarizeContent` | AI-basierte Zusammenfassung von Content-Objekten | serviceAi.callAi() mit Content-Objekten als Input | | `analyzeContent` | AI-basierte Analyse mit Prompt über Content-Objekte | serviceAi.callAi() | | `generateDocument` | DOCX/PDF/CSV erzeugen | Bestehende Renderer | | `editDocument` | Bestehendes Dokument ändern | Neue Logik | --- ## 7. File Management ### 7.1 FileItem erweitern ```python class FileItem(BaseModel): # bestehend: id, mandateId, featureInstanceId, fileName, mimeType, fileHash, fileSize, creationDate # NEU: tags: List[str] = [] folderId: Optional[str] = None description: Optional[str] = "" status: str = "active" # active, archived, processing ``` ### 7.2 FileFolder (neu) ```python class FileFolder(BaseModel): id: str parentId: Optional[str] = None name: str featureInstanceId: str mandateId: str createdBy: str autoRule: Optional[str] = None # z.B. "tag:financial" ``` ### 7.3 File Tools | Tool | Funktion | |------|----------| | `listFiles` | Auflisten mit Filter (tags, folder, mimeType, pattern) | | `readFile` | Inhalt lesen | | `writeFile` | Erstellen oder überschreiben | | `moveFile` | In Ordner verschieben | | `tagFile` | Tags hinzufügen/entfernen | | `searchFiles` | Volltextsuche über Inhalte | ### 7.4 File-Referenzen im Prompt - `@filename.pdf` -- Autocomplete-Referenz - Drag-and-Drop Upload -- automatisch als File gespeichert + indexiert - Agent erhält `fileIds` und arbeitet via Tools --- ## 8. Data Containers (externe Datenquellen) ### 8.1 Konzept User kann externe Datenquellen aus UserConnections (OAuth: Microsoft, Google) im Chat referenzieren. Konfiguration pro FeatureInstance. ### 8.2 DataSource (neu) ```python class DataSource(BaseModel): id: str connectionId: str # FK zu UserConnection sourceType: str # sharepointFolder, googleDriveFolder, outlookFolder, ftpFolder path: str # z.B. "/sites/MySite/Documents/Reports" label: str featureInstanceId: Optional[str] autoSync: bool = False lastSynced: Optional[float] ``` ### 8.3 Provider-Connector-Architektur (1:n) **Kernprinzip:** Ein Connector ist pro **Provider** (Lieferant), nicht pro Service. Eine MSFT-Connection bedient SharePoint, Outlook, Teams etc. gleichzeitig. Die Beziehung ist 1 Provider : n Services. **IST-Zustand:** SharePoint und Outlook haben je eigene Services, Connection-Helper und Method-Actions, obwohl beide über dieselbe MSFT-Connection laufen. Es gibt keine generische Abstraktion. Google Drive, FTP etc. sind nicht implementiert. **Lösung: ProviderConnector + ServiceAdapter** ```mermaid graph TB AgentLoop["Agent Loop"] GenTool["Generische Connection-Tools"] subgraph providers ["ProviderConnector (1 pro Lieferant)"] MSFT["MsftConnector\n1 Connection → n Services"] Google["GoogleConnector\n1 Connection → n Services"] FtpProv["FtpConnector\n1 Connection → 1 Service"] end subgraph msftServices ["MSFT Services (über 1 Token)"] SP["SharePoint\nFiles, Sites"] OL["Outlook\nMail, Calendar"] Teams["Teams\nMessages, Channels"] OneDrive["OneDrive\nFiles"] end subgraph googleServices ["Google Services (über 1 Token)"] GDrive["Google Drive\nFiles"] GMail["Gmail\nMail"] end AgentLoop --> GenTool GenTool --> providers MSFT --> msftServices Google --> googleServices ``` ```python class ProviderConnector(ABC): """Ein Connector pro Provider. Verwaltet eine UserConnection + Token. Bietet Zugriff auf n Services des Providers.""" def __init__(self, connection: UserConnection, accessToken: str): self.connection = connection self.accessToken = accessToken @abstractmethod def getAvailableServices(self) -> List[str]: """Welche Services bietet dieser Provider?""" @abstractmethod def getServiceAdapter(self, service: str) -> "ServiceAdapter": """Gibt den ServiceAdapter für einen bestimmten Service zurück.""" class ServiceAdapter(ABC): """Standardisierte Operationen pro Service eines Providers.""" @abstractmethod async def browse(self, path: str, filter: str = None) -> List[ExternalEntry]: ... @abstractmethod async def download(self, path: str) -> bytes: ... @abstractmethod async def upload(self, path: str, data: bytes, fileName: str) -> ExternalEntry: ... @abstractmethod async def search(self, query: str, path: str = None) -> List[ExternalEntry]: ... ``` **Konkrete Provider und ihre Services:** | ProviderConnector | Authority | Services | Status | |-------------------|-----------|----------|--------| | `MsftConnector` | MSFT | `sharepoint` (Files, Sites), `outlook` (Mail, Calendar), `teams` (Messages), `onedrive` (Files) | Migration bestehender Services | | `GoogleConnector` | GOOGLE | `drive` (Files), `gmail` (Mail) | Neu | | `FtpConnector` | LOCAL | `files` (FTP/SFTP) | Neu | | `JiraConnector` | LOCAL | `tickets` (Issues, Boards) | Migration bestehender Jira-Actions | **ConnectorResolver:** ```python class ConnectorResolver: _providerRegistry = { "msft": MsftConnector, "google": GoogleConnector, "local:ftp": FtpConnector, "local:jira": JiraConnector, } async def resolve(self, connectionId: str) -> ProviderConnector: """Löst connectionId → Provider-Connector mit frischem Token auf.""" connection = await _getUserConnection(connectionId) token = await _getFreshToken(connectionId) providerClass = self._providerRegistry[connection.authority] return providerClass(connection, token.tokenAccess) async def resolveService(self, connectionId: str, service: str) -> ServiceAdapter: """Löst connectionId + service → konkreten ServiceAdapter auf.""" provider = await self.resolve(connectionId) return provider.getServiceAdapter(service) ``` **Beispiel: MsftConnector (1 Connection → n Services):** ```python class MsftConnector(ProviderConnector): def getAvailableServices(self) -> List[str]: return ["sharepoint", "outlook", "teams", "onedrive"] def getServiceAdapter(self, service: str) -> ServiceAdapter: adapters = { "sharepoint": SharepointAdapter(self.accessToken), "outlook": OutlookAdapter(self.accessToken), "teams": TeamsAdapter(self.accessToken), "onedrive": OneDriveAdapter(self.accessToken), } return adapters[service] ``` ### 8.4 Connection Tools (generisch) Ein **generisches Tool-Set** -- der Agent gibt `connectionId` + `service` an, der Rest läuft über den Provider: | Tool | Funktion | Intern | |------|----------|--------| | `listConnections` | Verfügbare Connections + ihre Services anzeigen | UserConnections + `getAvailableServices()` | | `externalBrowse` | Ordner/Files einer externen Quelle listen | `resolveService(connId, service).browse(path)` | | `externalDownload` | Datei herunterladen → lokales File + Auto-Index | `adapter.download(path)` → `saveUploadedFile` → Knowledge Store | | `externalUpload` | Lokales File in externe Quelle hochladen | `adapter.upload(path, data)` | | `externalSearch` | In externer Quelle suchen | `adapter.search(query)` | | `sendMail` | E-Mail senden (Mail-spezifische Parameter) | `resolveService(connId, "outlook").send()` | Die bestehenden `MethodSharepoint`- und `MethodOutlook`-Actions werden schrittweise hinter `MsftConnector` → `SharepointAdapter` / `OutlookAdapter` migriert. Während der Migration fungiert der `ActionToolAdapter` (siehe Abschnitt 10.3) als Brücke. ### 8.5 Ablauf ``` User: "Analysiere die letzten 3 Reports aus SharePoint/Reports" Agent → externalBrowse(connectionId="msft-123", service="sharepoint", path="/Reports", filter="*.pdf") Agent → externalDownload(connectionId="msft-123", service="sharepoint", path="/Reports/Q4-Report.pdf") → File wird lokal gespeichert + automatisch im Knowledge Store indexiert Agent → readContentObjects(fileId="abc123", filter={contentType: "text"}) → Bereits indexiert, Content-Objekte aus Knowledge Store Agent → Antwortet mit Analyse ``` Gleicher Provider, anderer Service: ``` User: "Sende die Analyse per Mail an das Team" Agent → sendMail(connectionId="msft-123", to=["team@firma.ch"], subject="Q4 Analyse", body="...") → Selbe MSFT-Connection, Service "outlook" ``` Anderer Provider, identisches Tool-Interface: ``` User: "Lade das Budget aus meinem Google Drive" Agent → externalBrowse(connectionId="google-456", service="drive", path="/Finanzen", filter="Budget*") Agent → externalDownload(connectionId="google-456", service="drive", path="/Finanzen/Budget-2026.xlsx") → Identischer Flow: lokales File + Knowledge Store ``` --- ## 9. Unified Workspace UI ### 9.1 Kernidee Der Codeeditor hat die beste UI-Architektur (SSE Streaming, Agent-Progress, File-Management). Dieses UI wird zur Grundlage für alle AI-Interaktionen. Chatbot und Playground gehen darin auf. ### 9.2 Layout ```mermaid graph LR subgraph unified["Unified AI Workspace"] subgraph left["Linke Sidebar"] ConvList["Conversation List"] FileTree["File Browser\nFolders + Tags"] DataSrc["Data Sources\nSharePoint, Drive"] end subgraph center["Hauptbereich"] ChatStream["Chat / Streaming\nMarkdown + Code + Edits\nTabellen + Charts"] end subgraph right["Rechte Sidebar"] FilePreview["File Preview / Editor"] ToolActivity["Tool Activity Log"] end subgraph bottom["Input-Bereich"] PromptInput["Text + Voice"] AttachBar["@file + Upload + DataSources"] ActionBar["Send / Stop / Voice"] end end ``` ### 9.3 UI-Komponenten **Linke Sidebar:** - **Conversation List:** Alle Workflows/Chats, sortiert nach Datum - **File Browser:** Lokale Files mit Ordnerstruktur und Tags, Upload per Drag-and-Drop - **Data Sources:** Konfigurierte externe Quellen (SharePoint, Drive) pro FeatureInstance **Hauptbereich:** - **Chat Stream:** SSE-basiertes Streaming mit Token-Chunks, Markdown-Rendering, Code-Blöcke, File-Edit-Proposals, Tabellen, Charts **Rechte Sidebar (kontextabhängig):** - **File Preview / Editor:** Vorschau oder Bearbeitung ausgewählter Files - **Tool Activity Log:** Zeigt in Echtzeit, welche Tools der Agent nutzt **Input-Bereich:** - Text-Eingabe mit `@file`-Autocomplete - Voice-Toggle (Mikrofon-Button) - Attachment-Bar mit aktiven Files und DataSources - Send / Stop / Voice-Buttons ### 9.4 SSE Event-Typen Bestehend (vom Codeeditor übernommen): | Event | Funktion | |-------|----------| | `message` | Text-Nachricht (user/assistant) | | `status` | Progress-Label | | `fileEditProposal` | Datei-Änderungsvorschlag | | `fileVersion` | Akzeptierte Änderung | | `agentProgress` | Round/Tool-Call Fortschritt | | `agentSummary` | Abschluss-Statistik | | `complete` / `stopped` / `error` | Workflow-Ende | Neu: | Event | Funktion | |-------|----------| | `chunk` | Token-Streaming für Echtzeit-Textausgabe | | `toolCall` | Agent ruft Tool auf (für Activity Log) | | `toolResult` | Tool-Ergebnis (für Activity Log) | | `fileCreated` | Neues File erstellt | | `dataSourceAccess` | Zugriff auf externe Quelle | | `voiceResponse` | TTS Audio-Daten | ### 9.5 Voice Integration - **STT:** Browser Web Speech API oder Whisper API. Mikrofon-Button im Input-Bereich. Transkribierter Text wird als Prompt gesendet. - **TTS:** Agent-Antworten optional vorlesen. Nutzt bestehende `interfaceVoiceObjects` oder Browser TTS. - **Voice-Toggle:** Input via Mikrofon, Output via TTS. ``` POST /api/workspace/{instanceId}/voice/transcribe → { "text": "..." } POST /api/workspace/{instanceId}/voice/synthesize → audio blob ``` ### 9.6 Prompt Input Request ```python class WorkspaceInputRequest(BaseModel): prompt: str fileIds: List[str] = [] uploadedFiles: List[UploadFile] = [] dataSourceIds: List[str] = [] voiceMode: bool = False workflowId: Optional[str] = None userLanguage: str = "en" ``` --- ## 10. Feature Integration ### 10.1 Unified Workspace ersetzt drei Features Chatbot, Codeeditor und Playground migrieren auf `serviceAgent` + Unified UI. Die Unterscheidung erfolgt über die Tool-Konfiguration pro FeatureInstance: | Feature (bisher) | Tool-Set | |-------------------|----------| | Chatbot (SQL) | `core` + `sqlQuery` + `webSearch` | | Chatbot (Tavily) | `core` + `webSearch` | | Codeeditor | `core` + `readFile` + `writeFile` + `applyFileEdit` + `searchFiles` | | Playground | `core` + `webSearch` + `sendMail` | | Trustee | `core` + `connectionTools` + `documentAnalysis` | `core` = readFile, writeFile, listFiles, extractContent, summarizeContent, generateDocument ### 10.2 serviceAi als Low-Level-Layer `serviceAi` bleibt bestehen für: - Modell-Auswahl und Fallback - Billing pro Call - Provider-Routing (OpenAI, Anthropic, PrivateLLM) - Native function calling Support Die Orchestrierungs-Logik (Structure Filling, Looping, JSON Repair) wandert in den Agent. ### 10.3 Method/Action-Integration (bestehende Workflow-Actions als Tools) **IST-Zustand:** Das Workflow-System hat **7 Methods mit 36 Actions**, registriert via `methodDiscovery.py`. Jede Action hat `actionId`, `description`, `parameters` (Schema) und `execute` (async). 22 davon haben `dynamicMode=True` und werden bereits dynamisch von AI ausgewählt. Die Ausführung läuft über `actionExecutor.executeAction(method, action, params)`. | Kategorie | Methods | Actions | dynamicMode | |-----------|---------|---------|-------------| | **AI** | ai | process, webResearch, summarizeDocument, translateDocument, convertDocument, generateDocument, generateCode | Ja | | **External** | sharepoint, outlook | 9 SharePoint + 4 Outlook Actions | Ja | | **Domain** | context, trustee | getDocumentIndex, extractContent, neutralizeData, extractFromFiles, processDocuments, syncToAccounting | Teilweise | | **Integration** | jira, chatbot | 8 Jira + 1 Chatbot Actions | Nein/Teilweise | **Lösung: 3-Stufen-Migration** ```mermaid graph LR subgraph stufe1 ["Stufe 1: Automatisches Wrapping"] Actions["Bestehende Actions\n36 Actions"] Adapter["ActionToolAdapter"] ToolReg["Tool Registry"] end subgraph stufe2 ["Stufe 2: Refactoring"] NativeTools["Native Agent Tools"] ConnTools["ProviderConnector Tools"] end subgraph stufe3 ["Stufe 3: Abloesung"] AgentNative["Agent-native Logik\nersetzt AI-Actions"] end Actions --> Adapter Adapter --> ToolReg ToolReg --> NativeTools NativeTools --> ConnTools ConnTools --> AgentNative ``` **Stufe 1 -- Automatisches Wrapping (sofort, kein Rewrite):** Der `ActionToolAdapter` wandelt jede Action mit `dynamicMode=True` automatisch in ein Agent-Tool um. Kein Code-Rewrite nötig: ```python class ActionToolAdapter: """Wrapped bestehende Workflow-Actions als Agent-Tools.""" def _buildToolsFromActions(self) -> List[ToolDefinition]: tools = [] for methodName, method in methods.items(): for actionName, action in method["actions"].items(): if not action.get("dynamicMode"): continue tools.append(ToolDefinition( name=f"{methodName}.{actionName}", description=action["description"], parameters=_convertParameterSchema(action["parameters"]) )) return tools async def dispatch(self, toolName: str, args: Dict) -> ToolResult: methodName, actionName = toolName.split(".", 1) result = await actionExecutor.executeAction(methodName, actionName, args) return ToolResult( success=result.success, data=_formatActionResult(result), error=result.error ) ``` Vorteil: Alle 22 dynamischen Actions sofort als Agent-Tools verfügbar. **Stufe 2 -- Schrittweises Refactoring:** - **External-Actions** (SharePoint, Outlook, Jira) → hinter `ProviderConnector` + `ServiceAdapter` migrieren → generische `externalBrowse/Download/Upload/Search` Tools (siehe Abschnitt 8.3) - **Domain-Actions** (Trustee, Context) → bleiben als spezialisierte Tools, werden direkt in der `toolRegistry` registriert statt via Adapter **Stufe 3 -- AI-Actions ablösen:** - `ai.process`, `ai.generateDocument`, `ai.generateCode` etc. werden NICHT als Tools exponiert - Diese Fähigkeiten sind im Agent selbst (der Agent IST die AI) -- er nutzt direkt `extractContent`, `generateDocument`, `readSection` etc. - `subStructureFilling`, `subStructureGeneration`, `subAiCallLooping` entfallen **Kein kompletter Rewrite nötig:** Die bestehenden Methods/Actions bleiben initial funktionsfähig (Chatplayground, Automation nutzen sie weiter). Der Agent nutzt sie via Adapter. Schrittweise Migration ohne Breaking Changes. --- ## 11. Background Workflows ### 11.1 Persistente Workflow-Queue Background-Workflows dürfen nicht auf `asyncio.create_task()` basieren (Server-Restart = Tasks verloren). Stattdessen eine **DB-basierte Queue** auf bestehendem PostgreSQL: ```python class BackgroundJob(BaseModel): """Persistierter Background-Job in PostgreSQL.""" id: str workflowId: str userId: str featureInstanceId: str status: str # queued, running, completed, failed, cancelled description: str subtasks: List[Dict[str, Any]] maxCostCHF: Optional[float] # Billing-Cap maxConcurrency: int = 1 retryCount: int = 0 maxRetries: int = 3 createdAt: float startedAt: Optional[float] completedAt: Optional[float] error: Optional[str] progress: float = 0.0 resultSummary: Optional[str] ``` **Worker-Prozess:** Ein Background-Worker pollt die Queue und führt Jobs aus: ```python async def _processBackgroundQueue(): while True: job = await backgroundQueue.dequeueNext() if not job: await asyncio.sleep(2) continue try: await backgroundQueue.updateStatus(job.id, "running") async for event in runAgent(job.subtasks, job.workflowId, ...): await backgroundQueue.updateProgress(job.id, event) # Billing-Cap prüfen if job.maxCostCHF and await billingService.getWorkflowCost(job.workflowId) > job.maxCostCHF: await backgroundQueue.updateStatus(job.id, "cancelled", error="budgetExceeded") await _notifyUser(job.userId, f"Background-Job gestoppt: Budget überschritten") break await backgroundQueue.updateStatus(job.id, "completed") except Exception as e: if job.retryCount < job.maxRetries: await backgroundQueue.requeueWithRetry(job.id) else: await backgroundQueue.updateStatus(job.id, "failed", error=str(e)) await _notifyUser(job.userId, f"Background-Job fehlgeschlagen: {e}") ``` ### 11.2 Tools ```python # Agent-Tool: Background-Job starten async def startBackground(description, subtasks, maxCostCHF=None): job = await backgroundQueue.enqueue(BackgroundJob( workflowId=workflowId, description=description, subtasks=subtasks, maxCostCHF=maxCostCHF, ... )) return f"Background-Job {job.id} gestartet." # Agent-Tool: Job-Status abfragen async def checkBackgroundStatus(jobId): job = await backgroundQueue.getJob(jobId) return {"status": job.status, "progress": job.progress, "result": job.resultSummary} ``` ### 11.3 Komplexe Ausgaben Eine Agent-Antwort kann mehrere Ausgabe-Typen kombinieren: - Text (Markdown) mit Token-Streaming - Generierte/bearbeitete Files als Attachments - Gestartete Hintergrund-Jobs mit Progress-Tracking - Tabellen, Charts - Voice-Ausgabe - Externe Aktionen (Mail, SharePoint-Upload) --- ## 12. Observability und Monitoring ### 12.1 Agent-Tracing Jeder Agent-Workflow wird vollständig protokolliert. Ein `AgentTrace` aggregiert alle Rounds, Tool-Calls und Kosten: ```python class AgentTrace(BaseModel): """Vollständiges Protokoll eines Agent-Workflows.""" workflowId: str userId: str featureInstanceId: str startedAt: float completedAt: Optional[float] status: str # running, completed, maxRoundsReached, budgetExceeded, error totalRounds: int totalToolCalls: int totalCostCHF: float abortReason: Optional[str] rounds: List[AgentRoundLog] class AgentRoundLog(BaseModel): """Log eines einzelnen Agent-Rounds.""" roundNumber: int aiModel: str # Welches Modell wurde genutzt inputTokens: int outputTokens: int costCHF: float toolCalls: List[ToolCallLog] # Welche Tools mit welchen Args/Results durationMs: int class ToolCallLog(BaseModel): """Log eines einzelnen Tool-Calls.""" toolName: str args: Dict[str, Any] success: bool durationMs: int error: Optional[str] ``` ### 12.2 Kosten-Transparenz Die bestehende `BillingTransaction` erfasst einzelne AI-Calls. Zusätzlich wird pro Workflow eine **Kosten-Aggregation** geführt: | Posten | Quelle | Tracking | |--------|--------|----------| | Agent-Rounds (AI-Calls) | `serviceAi.callAi()` → `billingCallback` | Automatisch via bestehende Billing-Integration | | Entity Cache (Opt-in) | `_extractEntities()` → `serviceAi.callAi()` | Eigener `description`-Typ in BillingTransaction | | Progressive Summaries | Summary-Calls → `serviceAi.callAi()` | Eigener `description`-Typ in BillingTransaction | | Embeddings | `embeddingService.embed()` | Neuer Billing-Posten | ### 12.3 Monitoring Metriken pro FeatureInstance / Mandate: - Agent-Workflows pro Zeitraum (Anzahl, Durchschnittsdauer, Kosten) - Tool-Nutzung (welche Tools, Erfolgsrate, Durchschnittsdauer) - Budget-Auslastung (Kosten vs. maxCostCHF) - Knowledge Store (Anzahl Files, Chunks, Embedding-Grösse) - Abbruchgründe (maxRounds, Budget, Fehler) --- ## 13. Umsetzungsplan ```mermaid graph LR P1["Phase 1\nAgent Core\n+ ActionToolAdapter"] P2["Phase 2\nKnowledge Store"] P3["Phase 3\nSmart Documents\n+ Container Handling"] P4["Phase 4\nDocument Tools"] P5["Phase 5\nFile Management"] P6["Phase 6\nData Containers\n+ ProviderConnectors"] P7["Phase 7\nUnified UI"] P8["Phase 8\nFeature Migration\n+ Action Refactoring"] P9["Phase 9\nBackground Workflows"] P1 --> P2 P1 --> P5 P2 --> P3 P3 --> P4 P5 --> P6 P4 --> P8 P6 --> P8 P7 --> P8 P8 --> P9 ``` | Phase | Inhalt | Abhängigkeit | |-------|--------|-------------| | **1** | Agent Core: Loop, ToolRegistry, ConversationManager, AiCallRequest erweitern. **ActionToolAdapter**: bestehende 22 dynamicMode-Actions automatisch als Agent-Tools wrappen. serviceAgent als IMPORTABLE_SERVICE registrieren | Grundlage | | **2** | Knowledge Store: pgvector, 3-Ebenen, Auto-Index bei Upload, RAG-Retrieval | Phase 1 | | **3** | Smart Documents: Pre-Scan, On-Demand Extraction, Progressive Summarization. **Container Handling**: ContainerExtractor (ZIP/TAR), FolderExtractor, rekursive Extraktion, Lazy Loading | Phase 2 | | **4** | Document Tools: extractContent, readSection, browseContainer, extractContainerItem mit Knowledge-Store-Integration | Phase 3 | | **5** | File Management: Tags, Folders, File Tools. Container-Referenzen im Prompt (@archiv.zip, @ordner/) | Phase 1 (parallel zu 2-4) | | **6** | Data Containers: DataSource, **ProviderConnector-Architektur** (1 Provider : n Services), ConnectorResolver, generische externalBrowse/Download/Upload/Search Tools. MsftConnector, GoogleConnector, FtpConnector | Phase 5 | | **7** | Unified UI: Codeeditor-Basis, Streaming, Voice, Panels | Parallel zu 2-6 | | **8** | Feature Integration: Chatbot + Codeeditor + Playground migrieren. **Action Refactoring**: External-Actions → ProviderConnector + ServiceAdapter, Domain-Actions → native Tools, AI-Actions entfallen (Agent ist die AI) | Phase 4 + 6 + 7 | | **9** | Background Workflows: DB-basierte Queue (BackgroundJob), Worker-Prozess, Retry, Billing-Cap, Observability (AgentTrace, Monitoring) | Phase 8 | --- ## 14. Was bleibt, was geht | Komponente | Status | |------------|--------| | `serviceAi.callAi()` + Modell-Fallback + Billing | Bleibt (Low-Level) | | `serviceExtraction.extractContent()` | Bleibt + erweitert (Pre-Scan, On-Demand, Container Handling) | | `aicoreModelSelector` + `aicoreModelRegistry` | Bleibt | | `serviceStreaming` / EventManager | Bleibt (zentral für UI) | | UserConnections + OAuth | Bleibt + erweitert (DataSource, ProviderConnector 1:n) | | `interfaceVoiceObjects` (STT/TTS) | Bleibt (Unified UI) | | SharePoint Graph API Service | Bleibt → wird zu `SharepointAdapter` hinter `MsftConnector` | | PostgreSQL | Bleibt + erweitert (pgvector, FileContentIndex, ContentChunk, WorkflowMemory) | | Workflow Methods/Actions (36 Actions) | Phase 1: ActionToolAdapter-Wrapping → Phase 8: schrittweise Migration | | `MethodSharepoint` (9 Actions) | Migriert → `MsftConnector` → `SharepointAdapter` + generische Connection Tools | | `MethodOutlook` (4 Actions) | Migriert → `MsftConnector` → `OutlookAdapter` + generische Connection Tools | | `MethodAi` (7 Actions: process, generate, etc.) | Abgelöst (Agent ist die AI, nutzt Document Tools direkt) | | `MethodJira` (8 Actions) | Migriert → `JiraConnector` + `JiraAdapter` | | `MethodTrustee`, `MethodContext` | Bleiben als spezialisierte Agent-Tools | | `actionExecutor` + `methodDiscovery` | Bleibt während Migration (Adapter), später optional | | `subStructureFilling` (Pipeline) | Abgelöst (Agent + Document Tools + Knowledge Store) | | `subStructureGeneration` | Abgelöst | | `subAiCallLooping` (JSON Repair) | Abgelöst (Agent iteriert natürlich) | | `trim_messages(strategy="last")` | Ersetzt durch Progressive Summarization | | Codeeditor `codeEditorProcessor` | Migriert zu serviceAgent | | Codeeditor UI (SSE, Events) | Basis für Unified UI | | Chatbot LangGraph + DatabaseCheckpointer | Abgelöst (serviceAgent + Knowledge Store) | | Chatplayground | Abgelöst (Unified UI) | | `datamodelFiles.py` FileItem | Erweitert (tags, folders, description) | | `BinaryExtractor` (ZIP Fallback) | Ersetzt durch `ContainerExtractor` (rekursive Extraktion) |