wiki/concepts/AI-Agent-Architecture-Konzept.md
2026-03-15 18:14:21 +01:00

1464 lines
57 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Konzept: AI Agent Architecture & Unified Workspace
> Poweron Platform -- Zentrales AI Handling und Editor-UI
> Status: Konzept / Version 2.0 / März 2026
---
## 1. Ausgangslage
Die Plattform hat vier isolierte Systeme für AI-Interaktionen, jedes mit eigenen Schwächen:
| System | Stärke | Schwäche |
|--------|--------|----------|
| **serviceAi** (Pipeline) | Modell-Fallback, Billing, Dokumentenextraktion | Rigide Pipeline (extract→structure→fill), N×M AI-Call-Explosion, kein Caching |
| **Codeeditor** | ReAct-Loop mit Tool-Use, Agent-Modus | Nur read-only Tools, eigenes UI, nur Codeeditor-Scope |
| **Chatbot** | LangGraph, SQL/Tavily-Integration | Kein File-Handling, keine Dokumentbearbeitung, eigenes UI |
| **Chatplayground** | Workflow-Management | Kein Streaming, kein Voice, eigenes UI |
Zusätzliche Defizite:
- **Kein Wissenstransfer:** Jeder Workflow startet bei null, selbst wenn dasselbe Dokument schon extrahiert wurde
- **File-System:** Flach ohne Ordner, Tags oder FeatureInstance-Zuordnung
- **Keine externen Datenquellen** im Chat referenzierbar (SharePoint-Ordner, Drive, etc.)
- **Grosse Dokumente:** 500-Seiten-PDFs werden blind vollständig extrahiert (500+ AI-Calls)
- **Kein Container-Handling:** ZIP-Files, Ordner und verschachtelte Dokumente werden nicht unterstützt
- **Keine generische Provider-Abstraktion:** SharePoint/Outlook je eigene Implementierung, obwohl beides MSFT-Services über eine Connection. Kein Google Drive, FTP etc.
- **36 Workflow-Actions isoliert:** Bestehende Actions (SharePoint, Outlook, Jira, AI) nicht als Agent-Tools nutzbar
---
## 2. Zielarchitektur
### 2.1 Kernidee: Agent statt Pipeline
**Bisher:** Code entscheidet starr, was passiert:
```
extractContent() → generateStructure() → fillSections() → render()
```
**Neu:** AI entscheidet dynamisch, welche Tools sie braucht:
```
User Prompt + Files → Agent Loop → Tool Calls → Result
```
### 2.2 Gesamtübersicht
```mermaid
graph TB
subgraph ui["Unified Workspace UI"]
ChatPanel["Chat / Streaming"]
FilePanel["File Browser"]
DataPanel["Data Source Picker"]
KnowledgePanel["Knowledge Browser"]
end
subgraph agentSvc["serviceAgent"]
Loop["Agent Loop\nReAct + native function calling"]
TR["Tool Registry"]
CTX["Context Manager\nRAG Retrieval"]
end
subgraph knowledge["Knowledge Store"]
SharedL["Shared Layer\nmandateId"]
InstanceL["Instance Layer\nuserId + featureInstanceId"]
WorkflowL["Workflow Layer\nworkflowId"]
end
subgraph tools["Tools"]
FileT["File Tools"]
DocT["Document Tools"]
ConnT["Connection Tools"]
ExtT["External Tools"]
WfT["Workflow Tools"]
end
subgraph infra["Bestehende Infrastruktur"]
AiSvc["serviceAi\nModell-Fallback + Billing"]
ExtrSvc["serviceExtraction"]
Stream["serviceStreaming"]
Voice["serviceVoice"]
UCon["UserConnections"]
end
ui -->|"SSE Stream"| agentSvc
CTX -->|"RAG"| knowledge
DocT -->|"Cache"| knowledge
Loop --> TR
TR --> tools
Loop --> CTX
Loop --> AiSvc
Loop --> Stream
DocT --> ExtrSvc
ConnT --> UCon
```
---
## 3. Agent Core (`serviceAgent`)
### 3.1 Aufbau
Neuer Service unter `modules/serviceCenter/services/serviceAgent/`:
| Datei | Funktion |
|-------|----------|
| `mainServiceAgent.py` | Entry Point: `runAgent(prompt, fileIds, toolSet, config)` |
| `agentLoop.py` | ReAct-Loop mit native function calling |
| `toolRegistry.py` | Tool-Registrierung und Dispatch |
| `conversationManager.py` | Context-Window-Management |
| `datamodelAgent.py` | AgentState, ToolDefinition, ToolResult, AgentConfig |
**ServiceCenter-Registry:**
```python
# In serviceCenter/registry.py
IMPORTABLE_SERVICES = {
...
"agent": {
"module": "modules.serviceCenter.services.serviceAgent.mainServiceAgent",
"class": "ServiceAgent",
"dependencies": ["ai", "chat", "extraction", "billing", "streaming"],
"objectKey": "service.agent"
}
}
```
`serviceAgent` hat die meisten transitiven Dependencies (über `ai``chat`, `utils`, `extraction`, `billing`). Die bestehende DI-Kette im ServiceCenter löst dies automatisch auf.
**AgentConfig:**
```python
class AgentConfig(BaseModel):
maxRounds: int = 25
maxCostCHF: Optional[float] = None # Workflow-Budget-Cap
entityCacheEnabled: bool = False # Opt-in Entity Extraction (Extra-AI-Call pro Round)
toolSet: str = "core" # Welches Tool-Set aktiv
operationType: str = "AGENT" # Für Modell-Auswahl
```
### 3.2 Agent Loop
```python
async def runAgent(prompt, fileIds, toolSet, config, progressCallback):
state = AgentState(maxRounds=config.maxRounds)
tools = toolRegistry.getTools(toolSet)
messages = [buildSystemPrompt(tools), {"role": "user", "content": prompt}]
while state.status == "running" and state.currentRound < state.maxRounds:
state.currentRound += 1
# Kosten-Budget prüfen
if config.maxCostCHF and await billingService.getWorkflowCost(workflowId) > config.maxCostCHF:
state.status = "budgetExceeded"
yield AgentEvent(type="final", content=_summarizeProgress(state))
break
response = await aiService.callAi(AiCallRequest(
messages=messages,
options=AiCallOptions(operationType=OperationTypeEnum.AGENT),
tools=tools
))
toolCalls = response.toolCalls
if not toolCalls:
state.status = "completed"
yield AgentEvent(type="final", content=response.content)
break
# Tool-Ausführung: sequential default, parallel nur für readOnly-Tools
results = await _executeToolCalls(toolCalls, context)
messages.extend(formatToolResults(toolCalls, results))
yield AgentEvent(type="toolResults", data=results)
# Graceful Degradation bei maxRounds
if state.currentRound >= state.maxRounds and state.status == "running":
state.status = "maxRoundsReached"
yield AgentEvent(type="final", content=_summarizeProgress(state))
async def _executeToolCalls(toolCalls, context):
"""Sequential als Default. Parallel nur für readOnly-Tools (keine Race Conditions)."""
readOnlyCalls = [tc for tc in toolCalls if toolRegistry.isReadOnly(tc.name)]
writeCalls = [tc for tc in toolCalls if not toolRegistry.isReadOnly(tc.name)]
results = {}
if readOnlyCalls:
readResults = await asyncio.gather(*[
toolRegistry.dispatch(tc.name, tc.args, context) for tc in readOnlyCalls
])
for tc, result in zip(readOnlyCalls, readResults):
results[tc.id] = result
for tc in writeCalls:
results[tc.id] = await toolRegistry.dispatch(tc.name, tc.args, context)
return [results[tc.id] for tc in toolCalls]
```
### 3.3 Design-Entscheidungen
- **Native function calling** als Standard (OpenAI tool_choice Format) -- zuverlässiger als text-basiertes Parsing
- **Text-basierter Fallback** für Modelle ohne function calling Support
- Nutzt bestehende `callAi`-Infrastruktur (Modell-Fallback, Billing, Provider-Auswahl) -- damit greifen Provider-RBAC, Model-RBAC und Billing-Checks automatisch
- `AiCallRequest` wird erweitert um `messages` und `tools` (abwärtskompatibel: bestehende `prompt`/`context`/`contentParts` bleiben)
- **Tool-Ausführung sequential** als Default -- parallel nur für explizit als `readOnly=True` markierte Tools (verhindert Race Conditions bei State-modifizierenden Tools)
- **Kosten-Budget** (`maxCostCHF`) pro Workflow -- Agent stoppt mit Fortschritts-Summary bei Überschreitung
- **Graceful Degradation** bei `maxRounds` -- Agent liefert Summary statt stiller Abbruch
### 3.4 Fehlerbehandlung
| Fehlertyp | Strategie |
|-----------|-----------|
| **Tool-Fehler** (Exception in Tool) | Fehler als ToolResult an Agent zurückgeben -- Agent entscheidet über Retry oder alternativen Ansatz |
| **Halluzinierte Tool-Namen** | Validierung gegen `toolRegistry.getTools()` -- ungültige Tools werden als Fehler-ToolResult zurückgegeben |
| **Externe API-Fehler** (Timeout, 5xx) | Retry mit Exponential Backoff (max 3 Versuche), dann Fehler-ToolResult |
| **AI-Call-Fehler** | Bestehende Fallback-Logik in `serviceAi` (Modell-Fallback, Provider-Switch) |
| **maxRounds erreicht** | Graceful Degradation: Agent liefert Summary des bisherigen Fortschritts |
| **Budget überschritten** | Agent stoppt, liefert Zusammenfassung, User wird informiert |
| **Knowledge Store nicht verfügbar** | Agent arbeitet ohne RAG-Kontext weiter (degraded mode) |
**Input-Validierung:** Tool-Argumente vom LLM sind unvalidiert. Jedes Tool validiert seine Args vor Weitergabe an darunterliegende Interfaces:
- Path-Traversal-Schutz (`readFile`, `writeFile`): Pfade auf erlaubten Scope beschränken
- SQL-Injection-Schutz (`sqlQuery`): Parametrisierte Queries
- Container-Limits: `maxTotalExtractedSize`, `maxFileCount`, Symlink-Blockierung (ZIP-Bomb-Schutz)
### 3.5 RBAC und Sicherheit
**Design-Entscheidung: Keine Tool-Level-RBAC nötig.** Die Berechtigungen greifen auf Daten- und Service-Ebene unterhalb der Tools:
| Tool | Absicherung unterhalb |
|------|----------------------|
| File-Tools (`readFile`, `writeFile`, `listFiles`) | `interfaceDbApp``getRecordsetWithRBAC()` mit WHERE nach `MY/GROUP/ALL` |
| External-Tools (`externalBrowse`, `externalDownload`) | UserConnection OAuth-Token -- User sieht nur, was sein Account erlaubt |
| AI-Calls (Agent-Loop) | `serviceAi``_checkBillingBeforeAiCall()` → Provider-RBAC + Model-RBAC |
| Service-Zugriff | `can_access_service()` prüft RESOURCE-ObjectKey |
**serviceAgent als IMPORTABLE_SERVICE:**
`serviceAgent` wird in der ServiceCenter-Registry registriert mit `objectKey = "service.agent"`. Damit greift `can_access_service()` und der Zugang zum Agent ist per Rolle steuerbar.
**Knowledge Store Shared Layer:**
Dokumente mit `isShared=True` sind mandateweit sichtbar. Neue Tabellen (`FileContentIndex`, `ContentChunk`) brauchen Einträge in `TABLE_NAMESPACE` mit DATA-ObjectKeys, damit `getRecordsetWithRBAC()` greift. Promote in Shared Layer (`isShared=True`) erfordert Access-Level `ALL`.
### 3.6 Tool Registry
```python
CORE_TOOLS = ["readFile", "writeFile", "listFiles", "searchFiles",
"extractContent", "summarizeContent", "webSearch"]
```
Jedes Tool wird mit `readOnly`-Flag registriert (steuert parallele vs. sequentielle Ausführung):
```python
toolRegistry.register("readFile", readFileTool, readOnly=True)
toolRegistry.register("writeFile", writeFileTool, readOnly=False)
toolRegistry.register("sqlQuery", sqlQueryTool, featureType="chatbot", readOnly=True)
toolRegistry.register("sharePointUpload", spTool, featureType="trustee", readOnly=False)
```
---
## 4. Knowledge Store (RAG)
### 4.1 Kernprinzip
Persistenter Wissensspeicher mit drei Ebenen. Ein File wird **einmal** extrahiert und steht danach allen Workflows zur Verfügung. Automatische Indexierung bei Upload.
### 4.2 Drei-Ebenen-Architektur
```mermaid
graph TB
subgraph scope["Knowledge Store"]
direction TB
subgraph shared["Shared Layer -- mandateId"]
GD["Globale Firmen-Dokumente\nTemplates, Shared Knowledge"]
end
subgraph instance["Instance Layer -- userId + featureInstanceId"]
FI["User-Dokumente\nFile-Embeddings\nDocument Indexes"]
end
subgraph wf["Workflow Layer -- workflowId"]
WF["Conversation Summary\nEntity Cache\nTool Results"]
end
shared --> instance
instance --> wf
end
```
| Ebene | Scope | Inhalt | Lebensdauer |
|-------|-------|--------|-------------|
| **Shared** | mandateId | Globale Dokumente, Templates | Permanent |
| **Instance** | userId + featureInstanceId | Extrahierte Dokumente, Embeddings | Permanent pro Workspace |
| **Workflow** | workflowId | Conversation Summaries, Entities, Tool-Ergebnisse | Lebt mit dem Workflow |
### 4.3 Automatische Indexierung bei Upload
Jedes hochgeladene File durchläuft eine Background-Pipeline -- **komplett ohne AI**:
```
Upload → Container auflösen (rekursiv) → Typ-Extractor pro File → Content-Objekte → Chunking → Embedding → Knowledge Store
```
```python
async def _onFileUploaded(fileId, userId, featureInstanceId):
fileData = await getFileData(fileId)
mimeType = _detectMimeType(fileData)
# 1. Container auflösen (ZIP/Folder → Files), dann pro File extrahieren
files = await _resolveToFiles(fileId, fileData, mimeType)
for file in files:
# 2. Typ-Extractor: Content-Objekte extrahieren (kein AI)
contentIndex = await _extractFileContents(file.id, file.data, file.mimeType)
# 3. Text-Content-Objekte chunken und embedden
textObjects = [o for o in contentIndex.objects if o.contentType == "text"]
chunks = _chunkForEmbedding(textObjects, chunkSize=512)
embeddings = await embeddingService.embed([c.data for c in chunks])
# 4. Im Knowledge Store persistieren (userId + featureInstanceId scoped)
for chunk, embedding in zip(chunks, embeddings):
await knowledgeStore.upsertContentChunk(ContentChunk(
contentObjectId=chunk.id, fileId=file.id,
userId=userId, featureInstanceId=featureInstanceId,
data=chunk.data, contextRef=chunk.contextRef,
embedding=embedding
))
await knowledgeStore.updateFileStatus(fileId, "indexed")
```
Ergebnis: Sekunden bis Minuten nach Upload ist das File für alle Workflows RAG-ready. Nachfolgende Aktivitäten holen den Inhalt direkt aus dem Knowledge Store.
### 4.4 Datenmodelle (PostgreSQL + pgvector)
```python
class FileContentIndex(BaseModel):
"""Index der Content-Strukturen pro File. Lebt im Instance Layer.
Wird bei Extraktion erstellt (kein AI)."""
id: str # = fileId
userId: str
featureInstanceId: str
mandateId: str
isShared: bool = False # Shared Layer Sichtbarkeit
fileName: str
mimeType: str
containerPath: Optional[str] # Pfad im Container (z.B. "archiv.zip/folder/report.pdf")
totalObjects: int # Anzahl Content-Objekte
totalSize: int
structure: Dict[str, Any] # Strukturübersicht (Pages, Sections, etc.)
objectSummary: List[Dict] # Kompakte Übersicht pro Content-Objekt
extractedAt: float
status: str # pending, extracted, embedding, indexed, failed
class ContentChunk(BaseModel):
"""Persistierter Content-Chunk. Wiederverwendbar über Workflows.
Skalares Content-Objekt (oder Chunk davon) mit Embedding."""
id: str
contentObjectId: str # FK zum Content-Objekt
fileId: str # FK zum File
userId: str
featureInstanceId: str
contentType: str # text, image, videostream, audiostream, other
data: str # Inhalt (Text, Base64, URL)
contextRef: Dict[str, Any] # Kontext-Referenz (Seite, Position, Label)
summary: Optional[str] # AI-generierte Zusammenfassung (bei Bedarf)
metadata: Dict[str, Any] = {}
embedding: List[float] # pgvector (NOT NULL für text-Chunks)
class WorkflowMemory(BaseModel):
"""Workflow-spezifischer Cache."""
id: str
workflowId: str
userId: str
featureInstanceId: str
key: str # z.B. "entity:companyName"
value: str
source: str # extraction, tool, conversation, summary
createdAt: float
embedding: Optional[List[float]]
```
### 4.5 RAG-Retrieval: 3-Ebenen-Suche
Vor jedem Agent-Round sucht der Context Manager über alle drei Ebenen:
```python
async def _buildAgentContext(self, currentPrompt, workflowId, userId,
featureInstanceId, mandateId, contextBudget):
# 1. Instance Layer: User-Dokumente (höchste Priorität)
instanceChunks = await knowledgeStore.semanticSearch(
query=currentPrompt, userId=userId,
featureInstanceId=featureInstanceId, limit=15, minScore=0.65
)
# 2. Shared Layer: Globale Firmen-Dokumente
sharedChunks = await knowledgeStore.semanticSearch(
query=currentPrompt, mandateId=mandateId,
isShared=True, limit=10, minScore=0.7
)
# 3. Workflow Layer: Aktueller Kontext
entities = await knowledgeStore.getEntities(workflowId)
summary = await knowledgeStore.getConversationSummary(workflowId)
toolResults = await knowledgeStore.getRecentToolResults(workflowId, limit=5)
context = ContextBuilder(budget=contextBudget)
context.add(priority=1, content=currentPrompt)
context.add(priority=2, content=instanceChunks)
context.add(priority=3, content=entities)
context.add(priority=4, content=sharedChunks)
context.add(priority=5, content=summary)
context.add(priority=6, content=toolResults)
return context.build()
```
### 4.6 Cross-Workflow Wissenstransfer
```
Workflow A (Montag):
User: "Analysiere Q4-Report.pdf"
→ Extraktion + Embedding im Instance Layer (200 Chunks)
→ Agent findet: Revenue = 12.5M CHF
Workflow B (Dienstag):
User: "Vergleiche Q4-Report mit Budget.xlsx"
→ Q4-Report: Bereits im Knowledge Store → 0 Extraktion
→ Budget.xlsx: Automatisch indexiert nach Upload
→ Semantische Suche "Q4 Revenue" → sofort relevanter Chunk
```
### 4.7 Progressive Summarization
Statt alte Nachrichten abzuschneiden (`trim_messages`):
```
Round 1-3: Volle Nachrichten
Round 4: Nachrichten 1-2 → Summary
Round 7: Summary(1-4) + volle 5-7
Round 15: Meta-Summary(1-10) + Summary(11-13) + volle 14-15
```
Der Agent verliert nie den Faden. Summaries werden im Workflow Layer persistiert. **Kosten:** Jede Summarization ist ein AI-Call (günstigeres Modell, z.B. `DATA_ANALYSE`). Bei einem 15-Round-Workflow: ~3 Summary-Calls. Diese Kosten fliessen ins Workflow-Budget (`maxCostCHF`) ein und erscheinen als eigener Typ in der `BillingTransaction`.
### 4.8 Entity Cache
Extraktion wichtiger Fakten nach Tool-Ergebnissen. **Opt-in per Config** (jeder Aufruf ist ein Extra-AI-Call):
```python
if config.entityCacheEnabled:
entities = await _extractEntities(response)
# {"company": "Acme AG", "revenueQ4": "12.5M CHF"}
for key, value in entities.items():
await knowledgeStore.upsertEntity(workflowId, key, value)
```
**Kosten-Kontrolle:** Entity-Extraktion ist standardmässig deaktiviert. Für häufig genutzte Entities (Firmennamen, Beträge, Daten) kann alternativ regelbasierte Extraktion (Regex/NER) ohne AI-Call eingesetzt werden.
### 4.9 Embedding-Strategie
| Aspekt | Entscheidung |
|--------|-------------|
| Embedding-Modell | OpenAI `text-embedding-3-small` (1536 dim); lokal: sentence-transformers |
| Vector Store | pgvector auf bestehendem PostgreSQL |
| Chunk-Grösse | 512 Tokens |
| Index-Typ | IVFFlat (< 100k Chunks), HNSW (> 100k) |
| Suche | Cosine Similarity, gefiltert nach userId + featureInstanceId |
---
## 5. Smart Document Handling
### 5.1 Problem
Ein 200MB PDF mit 500 Seiten: Aktuell werden alle 500 Seiten blind extrahiert, jede einzeln an AI gesendet → 500+ AI-Calls.
### 5.2 Lösung: 3-Stufen-Modell
```mermaid
graph TD
Upload["Dokument Upload"]
subgraph stage1["Stufe 1: Structure Pre-Scan"]
TOC["TOC Extraction"]
Headings["Heading Detection"]
PageMap["Page Map"]
end
subgraph stage2["Stufe 2: Selective Extraction"]
OnDemand["Nur angeforderte Seiten"]
Cache["Knowledge Store Cache"]
end
subgraph stage3["Stufe 3: Deep Analysis"]
Vision["Vision AI"]
TableEx["Tabellen-Extraktion"]
OCR["OCR"]
end
Upload --> stage1
stage1 -->|"Agent entscheidet"| stage2
stage2 -->|"Bei Bedarf"| stage3
```
**Stufe 1 -- Structure Pre-Scan (Sekunden, kein AI):**
Rein programmatisch mit PyMuPDF. Extrahiert TOC, Headings (Font-Size-Heuristik), Seitenstruktur, Bildpositionen. Ergebnis ist der `FileContentIndex` (siehe 5.4). Der Agent sieht sofort die Struktur eines 500-Seiten-PDFs.
```python
async def preScanDocument(fileId: str) -> FileContentIndex:
doc = fitz.open(stream=fileData, filetype="pdf")
toc = doc.get_toc()
pageMap = []
for i, page in enumerate(doc):
blocks = page.get_text("dict")["blocks"]
pageMap.append({
"pageIndex": i,
"headings": [h for h in blocks if _isHeading(h)],
"hasImages": any(b["type"] == 1 for b in blocks),
"textLength": len(page.get_text()),
"hasTable": _detectTableHeuristic(page)
})
sections = _buildSectionsFromTocOrHeadings(toc, pageMap)
return FileContentIndex(id=fileId, structure={"toc": toc, "sections": sections, "pageMap": pageMap})
```
**Stufe 2 -- On-Demand Extraction:**
Agent liest nur die Seiten, die er braucht. Ergebnisse werden im Knowledge Store gecacht.
```python
async def readSection(fileId, sectionId):
cached = await knowledgeStore.getCachedSection(fileId, sectionId)
if cached:
return cached
section = _findSection(docIndex, sectionId)
parts = _extractPages(fileData, section["startPage"], section["endPage"])
await knowledgeStore.cacheExtraction(fileId, sectionId, parts)
return _formatSectionResult(parts)
```
**Stufe 3 -- Deep Analysis (nur bei explizitem Bedarf):**
Vision AI für Bilder, Tabellen-Extraktion, OCR -- nur für einzelne angeforderte Elemente.
### 5.3 Kostenvergleich
| Szenario | Alt (Pipeline) | Neu (Smart) |
|----------|----------------|-------------|
| "Fasse Kapitel 3 zusammen" (8/500 Seiten) | 500 Seiten, 500+ AI-Calls | Pre-Scan + 8 Seiten + 1 AI-Call |
| "Was steht auf Seite 47?" | 500 Seiten, 500+ AI-Calls | Pre-Scan + 1 Seite + 1 AI-Call |
| Zweite Frage zum selben Dokument | Erneut 500 Seiten | Aus Knowledge Store, 0 Extraktion |
| Gesamtanalyse | 500+ AI-Calls | ~21 AI-Calls (20 Section-Summaries + 1 Meta-Summary) |
### 5.4 Container- und Content-Modell
Das System unterscheidet klar zwischen **physischen Strukturen** (Container, Files) und **logischen Content-Objekten** (Text, Bild, Audio, Video). Die gesamte Extraktion bis zu den Content-Objekten erfolgt **ohne AI**.
#### Physische Schicht: Container-Hierarchie
Container sind verschachtelt (n,m : 0..i):
```mermaid
graph TD
ZIP["ZIP (Container)"]
FA["folder-a/ (Container)"]
FB["folder-b/ (Container)"]
PDF["report.pdf (File → Typ-Extractor)"]
IMG["photo.png (File → Typ-Extractor)"]
DOCX["vertrag.docx (File → Typ-Extractor)"]
CSV["data.csv (File → Typ-Extractor)"]
ZIP --> FA
ZIP --> FB
FA --> PDF
FA --> IMG
FB --> DOCX
FB --> CSV
```
| Ebene | Regel |
|-------|-------|
| **ZIP / TAR / GZ** | Container → enthält n Folders + m Files |
| **Folder** | Container → enthält n Folders + m Files |
| **File** | Hat einen bestimmten Typ → wird vom passenden Extractor verarbeitet |
**File-Typen und ihre Container-Eigenschaft:**
| File-Typ | Container? | Struktur |
|----------|-----------|----------|
| PDF | Ja | Seiten als Kontext-Referenz, jede Seite enthält Content-Objekte |
| DOCX | Ja | Abschnitte/Paragraphen als Kontext-Referenz (Seiten nur als Referenz, nicht als Container) |
| PPTX | Ja | Slides als Kontext-Referenz, jeder Slide enthält Content-Objekte |
| XLSX | Ja | Sheets als Kontext-Referenz, jedes Sheet enthält Content-Objekte |
| PNG/JPG/GIF | Nein | File selbst ist ein einzelnes Content-Objekt (image) |
| MP4/WebM | Nein | File selbst ist ein einzelnes Content-Objekt (videostream) |
| MP3/WAV | Nein | File selbst ist ein einzelnes Content-Objekt (audiostream) |
| TXT/CSV/JSON/XML | Nein | File selbst ist ein einzelnes Content-Objekt (text) |
#### Logische Schicht: Content-Objekte
Jeder Extractor liefert **skalare Content-Objekte** mit Kontext-Referenz zum Ursprung im File. Es gibt genau fünf Content-Typen:
| ContentType | Beschreibung | Beispiel |
|-------------|-------------|----------|
| `text` | Textinhalt (Fliesstext, Tabelle als Markdown, Code) | Paragraph auf Seite 3, Tabelle in Sheet "Q4" |
| `image` | Einzelbild (als Base64 oder Referenz) | Bild auf Seite 5 unten links, Caption "Übersicht" |
| `videostream` | Video-Inhalt | Eingebettetes Video in PPTX Slide 7 |
| `audiostream` | Audio-Inhalt | Voice-Memo Attachment |
| `other` | Nicht klassifizierbar (Binary, Formeln, etc.) | Eingebettetes OLE-Objekt |
**Content-Objekt Datenmodell:**
```python
class ContentObject(BaseModel):
id: str
fileId: str # FK zum physischen File
contentType: str # text, image, videostream, audiostream, other
data: str # Inhalt (Text, Base64, URL)
contextRef: ContentContextRef # Kontext-Referenz zum Ursprung
metadata: Dict[str, Any] = {} # Extractor-spezifische Metadaten
sequence: int = 0 # Reihenfolge innerhalb des Kontexts
class ContentContextRef(BaseModel):
"""Referenz zum Kontext im Container/File."""
containerPath: str # z.B. "archiv.zip/folder-a/report.pdf"
location: str # z.B. "page:5/region:bottomLeft"
label: Optional[str] = None # z.B. "Abbildung 3: Übersicht"
pageIndex: Optional[int] = None # Seiten-Nummer (wenn relevant)
sectionId: Optional[str] = None # Abschnitt/Heading (wenn relevant)
sheetName: Optional[str] = None # Sheet-Name (XLSX)
slideIndex: Optional[int] = None # Slide-Nummer (PPTX)
```
#### Extraktions-Pipeline (komplett ohne AI)
```mermaid
graph LR
Input["ZIP / Folder / File"]
subgraph phase1 ["Phase 1: Container auflösen"]
Unpack["Entpacken\nrekursiv"]
FileList["File-Liste\nmit Typen"]
end
subgraph phase2 ["Phase 2: Content extrahieren"]
Ext["Typ-Extractor\npro File"]
CO["Content-Objekte\nmit ContextRef"]
end
subgraph phase3 ["Phase 3: Indexierung"]
Idx["ContentIndex\npro File"]
KS["Knowledge Store"]
end
Input --> phase1
phase1 --> phase2
phase2 --> phase3
```
```python
async def _extractFileContents(fileId: str, fileData: bytes, mimeType: str) -> FileContentIndex:
"""Extrahiert alle Content-Objekte eines Files. Kein AI-Call."""
extractor = extractorRegistry.resolve(mimeType)
contentObjects = await extractor.extract(fileData)
contentIndex = FileContentIndex(
fileId=fileId,
totalObjects=len(contentObjects),
structure=_buildStructureMap(contentObjects),
objects=contentObjects
)
await knowledgeStore.upsertContentIndex(contentIndex)
return contentIndex
```
**Rekursive Container-Auflösung:**
```python
MAX_TOTAL_EXTRACTED_SIZE = 500 * 1024 * 1024 # 500 MB
MAX_FILE_COUNT = 10000
async def _resolveContainerRecursive(containerData, containerPath="", depth=0, maxDepth=5,
_state=None):
"""Löst Container rekursiv auf bis zu den Files. Kein AI-Call.
Schutz vor ZIP-Bombs: maxDepth, maxTotalExtractedSize, maxFileCount, Symlink-Blockierung."""
if _state is None:
_state = {"totalSize": 0, "fileCount": 0}
allFiles = []
entries = _listEntries(containerData, blockSymlinks=True)
for entry in entries:
entryPath = f"{containerPath}/{entry.name}" if containerPath else entry.name
if entry.isContainer: # Folder oder verschachteltes ZIP
childData = _extractEntry(containerData, entry)
childFiles = await _resolveContainerRecursive(
childData, entryPath, depth + 1, maxDepth
)
allFiles.extend(childFiles)
else:
_state["totalSize"] += entry.size
_state["fileCount"] += 1
if _state["totalSize"] > MAX_TOTAL_EXTRACTED_SIZE:
raise ContainerLimitError(f"Total extracted size exceeds {MAX_TOTAL_EXTRACTED_SIZE}")
if _state["fileCount"] > MAX_FILE_COUNT:
raise ContainerLimitError(f"File count exceeds {MAX_FILE_COUNT}")
allFiles.append(FileEntry(
path=entryPath,
data=_extractEntry(containerData, entry),
mimeType=_detectMimeType(entry.name),
size=entry.size
))
return allFiles
```
#### File Content Index (Metadaten pro File)
Jedes File erhält einen `FileContentIndex` als Metadaten (kanonische Definition siehe Abschnitt 4.4). Zusätzlich enthält jeder Index `ContentObjectSummary`-Einträge:
```python
class ContentObjectSummary(BaseModel):
"""Kompakte Beschreibung eines Content-Objekts im Index."""
id: str
contentType: str # text, image, videostream, audiostream, other
contextRef: ContentContextRef
charCount: Optional[int] = None # Nur für text
dimensions: Optional[str] = None # Nur für image/video (z.B. "1920x1080")
duration: Optional[float] = None # Nur für audio/video (Sekunden)
```
Beispiel für einen PDF-ContentIndex:
```
report.pdf → FileContentIndex:
totalObjects: 47
structure: {pages: 12, images: 8, tables: 5, textBlocks: 34}
objectSummary:
- {id: "co-1", type: "text", contextRef: {page: 1, location: "header"}, charCount: 245}
- {id: "co-2", type: "text", contextRef: {page: 1, location: "body"}, charCount: 1830}
- {id: "co-3", type: "image", contextRef: {page: 1, location: "bottom", label: "Abb. 1"}, dimensions: "800x600"}
- ...
```
#### AI arbeitet auf skalaren Content-Objekten
Nach der Extraktion hat der Agent nur noch skalare Content-Objekte vor sich. AI-Calls werden **gezielt** auf einzelne Objekte angewendet:
```
Agent: "Analysiere Kapitel 3 des Reports"
1. browseContainer("report.pdf") → ContentIndex (47 Objekte, Struktur)
2. Agent sieht: Kapitel 3 = Seiten 5-8, 12 Content-Objekte
3. readContentObjects(fileId, filter={pageIndex: [5,6,7,8]}) → 12 skalare Objekte
4. AI-Call mit den 12 Objekten als Kontext → Analyse
```
#### Container-Tools für den Agent
| Tool | Funktion |
|------|----------|
| `browseContainer` | Zeigt ContentIndex / Baumstruktur eines Containers (ZIP, Folder, PDF, etc.) |
| `readContentObjects` | Liest spezifische Content-Objekte nach Filter (Seite, Typ, Section) |
| `extractContainerItem` | Extrahiert on demand ein Element, das noch nicht im Knowledge Store ist |
**Prompt-Referenzen:** Im Unified UI können Container direkt referenziert werden:
- `@archiv.zip` -- Agent sieht Baumstruktur, extrahiert gezielt
- `@projektordner/` -- Agent browst Ordnerinhalt
- `@report.pdf` -- Agent sieht ContentIndex mit allen Content-Objekten
---
## 6. Document Tools
Tools für den Agent -- arbeiten auf der physischen (Container) und logischen (Content-Objekte) Schicht:
**Container-Tools (physische Schicht):**
| Tool | Funktion | Intern |
|------|----------|--------|
| `browseContainer` | Baumstruktur + ContentIndex eines Containers anzeigen | Container-Auflösung + FileContentIndex |
| `extractContainerItem` | Spezifisches File aus Container on demand extrahieren | Rekursive Pipeline → Content-Objekte → Knowledge Store |
**Content-Tools (logische Schicht, skalare Objekte):**
| Tool | Funktion | Intern |
|------|----------|--------|
| `readContentObjects` | Content-Objekte nach Filter lesen (Seite, Typ, Section) | Knowledge Store / on-demand Extraktion |
| `summarizeContent` | AI-basierte Zusammenfassung von Content-Objekten | serviceAi.callAi() mit Content-Objekten als Input |
| `analyzeContent` | AI-basierte Analyse mit Prompt über Content-Objekte | serviceAi.callAi() |
| `generateDocument` | DOCX/PDF/CSV erzeugen | Bestehende Renderer |
| `editDocument` | Bestehendes Dokument ändern | Neue Logik |
---
## 7. File Management
### 7.1 FileItem erweitern
```python
class FileItem(BaseModel):
# bestehend:
id, mandateId, featureInstanceId, fileName, mimeType, fileHash, fileSize, creationDate
# NEU:
tags: List[str] = []
folderId: Optional[str] = None
description: Optional[str] = ""
status: str = "active" # active, archived, processing
```
### 7.2 FileFolder (neu)
```python
class FileFolder(BaseModel):
id: str
parentId: Optional[str] = None
name: str
featureInstanceId: str
mandateId: str
createdBy: str
autoRule: Optional[str] = None # z.B. "tag:financial"
```
### 7.3 File Tools
| Tool | Funktion |
|------|----------|
| `listFiles` | Auflisten mit Filter (tags, folder, mimeType, pattern) |
| `readFile` | Inhalt lesen |
| `writeFile` | Erstellen oder überschreiben |
| `moveFile` | In Ordner verschieben |
| `tagFile` | Tags hinzufügen/entfernen |
| `searchFiles` | Volltextsuche über Inhalte |
### 7.4 File-Referenzen im Prompt
- `@filename.pdf` -- Autocomplete-Referenz
- Drag-and-Drop Upload -- automatisch als File gespeichert + indexiert
- Agent erhält `fileIds` und arbeitet via Tools
---
## 8. Data Containers (externe Datenquellen)
### 8.1 Konzept
User kann externe Datenquellen aus UserConnections (OAuth: Microsoft, Google) im Chat referenzieren. Konfiguration pro FeatureInstance.
### 8.2 DataSource (neu)
```python
class DataSource(BaseModel):
id: str
connectionId: str # FK zu UserConnection
sourceType: str # sharepointFolder, googleDriveFolder, outlookFolder, ftpFolder
path: str # z.B. "/sites/MySite/Documents/Reports"
label: str
featureInstanceId: Optional[str]
autoSync: bool = False
lastSynced: Optional[float]
```
### 8.3 Provider-Connector-Architektur (1:n)
**Kernprinzip:** Ein Connector ist pro **Provider** (Lieferant), nicht pro Service. Eine MSFT-Connection bedient SharePoint, Outlook, Teams etc. gleichzeitig. Die Beziehung ist 1 Provider : n Services.
**IST-Zustand:** SharePoint und Outlook haben je eigene Services, Connection-Helper und Method-Actions, obwohl beide über dieselbe MSFT-Connection laufen. Es gibt keine generische Abstraktion. Google Drive, FTP etc. sind nicht implementiert.
**Lösung: ProviderConnector + ServiceAdapter**
```mermaid
graph TB
AgentLoop["Agent Loop"]
GenTool["Generische Connection-Tools"]
subgraph providers ["ProviderConnector (1 pro Lieferant)"]
MSFT["MsftConnector\n1 Connection → n Services"]
Google["GoogleConnector\n1 Connection → n Services"]
FtpProv["FtpConnector\n1 Connection → 1 Service"]
end
subgraph msftServices ["MSFT Services (über 1 Token)"]
SP["SharePoint\nFiles, Sites"]
OL["Outlook\nMail, Calendar"]
Teams["Teams\nMessages, Channels"]
OneDrive["OneDrive\nFiles"]
end
subgraph googleServices ["Google Services (über 1 Token)"]
GDrive["Google Drive\nFiles"]
GMail["Gmail\nMail"]
end
AgentLoop --> GenTool
GenTool --> providers
MSFT --> msftServices
Google --> googleServices
```
```python
class ProviderConnector(ABC):
"""Ein Connector pro Provider. Verwaltet eine UserConnection + Token.
Bietet Zugriff auf n Services des Providers."""
def __init__(self, connection: UserConnection, accessToken: str):
self.connection = connection
self.accessToken = accessToken
@abstractmethod
def getAvailableServices(self) -> List[str]:
"""Welche Services bietet dieser Provider?"""
@abstractmethod
def getServiceAdapter(self, service: str) -> "ServiceAdapter":
"""Gibt den ServiceAdapter für einen bestimmten Service zurück."""
class ServiceAdapter(ABC):
"""Standardisierte Operationen pro Service eines Providers."""
@abstractmethod
async def browse(self, path: str, filter: str = None) -> List[ExternalEntry]: ...
@abstractmethod
async def download(self, path: str) -> bytes: ...
@abstractmethod
async def upload(self, path: str, data: bytes, fileName: str) -> ExternalEntry: ...
@abstractmethod
async def search(self, query: str, path: str = None) -> List[ExternalEntry]: ...
```
**Konkrete Provider und ihre Services:**
| ProviderConnector | Authority | Services | Status |
|-------------------|-----------|----------|--------|
| `MsftConnector` | MSFT | `sharepoint` (Files, Sites), `outlook` (Mail, Calendar), `teams` (Messages), `onedrive` (Files) | Migration bestehender Services |
| `GoogleConnector` | GOOGLE | `drive` (Files), `gmail` (Mail) | Neu |
| `FtpConnector` | LOCAL | `files` (FTP/SFTP) | Neu |
| `JiraConnector` | LOCAL | `tickets` (Issues, Boards) | Migration bestehender Jira-Actions |
**ConnectorResolver:**
```python
class ConnectorResolver:
_providerRegistry = {
"msft": MsftConnector,
"google": GoogleConnector,
"local:ftp": FtpConnector,
"local:jira": JiraConnector,
}
async def resolve(self, connectionId: str) -> ProviderConnector:
"""Löst connectionId → Provider-Connector mit frischem Token auf."""
connection = await _getUserConnection(connectionId)
token = await _getFreshToken(connectionId)
providerClass = self._providerRegistry[connection.authority]
return providerClass(connection, token.tokenAccess)
async def resolveService(self, connectionId: str, service: str) -> ServiceAdapter:
"""Löst connectionId + service → konkreten ServiceAdapter auf."""
provider = await self.resolve(connectionId)
return provider.getServiceAdapter(service)
```
**Beispiel: MsftConnector (1 Connection → n Services):**
```python
class MsftConnector(ProviderConnector):
def getAvailableServices(self) -> List[str]:
return ["sharepoint", "outlook", "teams", "onedrive"]
def getServiceAdapter(self, service: str) -> ServiceAdapter:
adapters = {
"sharepoint": SharepointAdapter(self.accessToken),
"outlook": OutlookAdapter(self.accessToken),
"teams": TeamsAdapter(self.accessToken),
"onedrive": OneDriveAdapter(self.accessToken),
}
return adapters[service]
```
### 8.4 Connection Tools (generisch)
Ein **generisches Tool-Set** -- der Agent gibt `connectionId` + `service` an, der Rest läuft über den Provider:
| Tool | Funktion | Intern |
|------|----------|--------|
| `listConnections` | Verfügbare Connections + ihre Services anzeigen | UserConnections + `getAvailableServices()` |
| `externalBrowse` | Ordner/Files einer externen Quelle listen | `resolveService(connId, service).browse(path)` |
| `externalDownload` | Datei herunterladen → lokales File + Auto-Index | `adapter.download(path)``saveUploadedFile` → Knowledge Store |
| `externalUpload` | Lokales File in externe Quelle hochladen | `adapter.upload(path, data)` |
| `externalSearch` | In externer Quelle suchen | `adapter.search(query)` |
| `sendMail` | E-Mail senden (Mail-spezifische Parameter) | `resolveService(connId, "outlook").send()` |
Die bestehenden `MethodSharepoint`- und `MethodOutlook`-Actions werden schrittweise hinter `MsftConnector``SharepointAdapter` / `OutlookAdapter` migriert. Während der Migration fungiert der `ActionToolAdapter` (siehe Abschnitt 10.3) als Brücke.
### 8.5 Ablauf
```
User: "Analysiere die letzten 3 Reports aus SharePoint/Reports"
Agent → externalBrowse(connectionId="msft-123", service="sharepoint", path="/Reports", filter="*.pdf")
Agent → externalDownload(connectionId="msft-123", service="sharepoint", path="/Reports/Q4-Report.pdf")
→ File wird lokal gespeichert + automatisch im Knowledge Store indexiert
Agent → readContentObjects(fileId="abc123", filter={contentType: "text"})
→ Bereits indexiert, Content-Objekte aus Knowledge Store
Agent → Antwortet mit Analyse
```
Gleicher Provider, anderer Service:
```
User: "Sende die Analyse per Mail an das Team"
Agent → sendMail(connectionId="msft-123", to=["team@firma.ch"], subject="Q4 Analyse", body="...")
→ Selbe MSFT-Connection, Service "outlook"
```
Anderer Provider, identisches Tool-Interface:
```
User: "Lade das Budget aus meinem Google Drive"
Agent → externalBrowse(connectionId="google-456", service="drive", path="/Finanzen", filter="Budget*")
Agent → externalDownload(connectionId="google-456", service="drive", path="/Finanzen/Budget-2026.xlsx")
→ Identischer Flow: lokales File + Knowledge Store
```
---
## 9. Unified Workspace UI
### 9.1 Kernidee
Der Codeeditor hat die beste UI-Architektur (SSE Streaming, Agent-Progress, File-Management). Dieses UI wird zur Grundlage für alle AI-Interaktionen. Chatbot und Playground gehen darin auf.
### 9.2 Layout
```mermaid
graph LR
subgraph unified["Unified AI Workspace"]
subgraph left["Linke Sidebar"]
ConvList["Conversation List"]
FileTree["File Browser\nFolders + Tags"]
DataSrc["Data Sources\nSharePoint, Drive"]
end
subgraph center["Hauptbereich"]
ChatStream["Chat / Streaming\nMarkdown + Code + Edits\nTabellen + Charts"]
end
subgraph right["Rechte Sidebar"]
FilePreview["File Preview / Editor"]
ToolActivity["Tool Activity Log"]
end
subgraph bottom["Input-Bereich"]
PromptInput["Text + Voice"]
AttachBar["@file + Upload + DataSources"]
ActionBar["Send / Stop / Voice"]
end
end
```
### 9.3 UI-Komponenten
**Linke Sidebar:**
- **Conversation List:** Alle Workflows/Chats, sortiert nach Datum
- **File Browser:** Lokale Files mit Ordnerstruktur und Tags, Upload per Drag-and-Drop
- **Data Sources:** Konfigurierte externe Quellen (SharePoint, Drive) pro FeatureInstance
**Hauptbereich:**
- **Chat Stream:** SSE-basiertes Streaming mit Token-Chunks, Markdown-Rendering, Code-Blöcke, File-Edit-Proposals, Tabellen, Charts
**Rechte Sidebar (kontextabhängig):**
- **File Preview / Editor:** Vorschau oder Bearbeitung ausgewählter Files
- **Tool Activity Log:** Zeigt in Echtzeit, welche Tools der Agent nutzt
**Input-Bereich:**
- Text-Eingabe mit `@file`-Autocomplete
- Voice-Toggle (Mikrofon-Button)
- Attachment-Bar mit aktiven Files und DataSources
- Send / Stop / Voice-Buttons
### 9.4 SSE Event-Typen
Bestehend (vom Codeeditor übernommen):
| Event | Funktion |
|-------|----------|
| `message` | Text-Nachricht (user/assistant) |
| `status` | Progress-Label |
| `fileEditProposal` | Datei-Änderungsvorschlag |
| `fileVersion` | Akzeptierte Änderung |
| `agentProgress` | Round/Tool-Call Fortschritt |
| `agentSummary` | Abschluss-Statistik |
| `complete` / `stopped` / `error` | Workflow-Ende |
Neu:
| Event | Funktion |
|-------|----------|
| `chunk` | Token-Streaming für Echtzeit-Textausgabe |
| `toolCall` | Agent ruft Tool auf (für Activity Log) |
| `toolResult` | Tool-Ergebnis (für Activity Log) |
| `fileCreated` | Neues File erstellt |
| `dataSourceAccess` | Zugriff auf externe Quelle |
| `voiceResponse` | TTS Audio-Daten |
### 9.5 Voice Integration
- **STT:** Browser Web Speech API oder Whisper API. Mikrofon-Button im Input-Bereich. Transkribierter Text wird als Prompt gesendet.
- **TTS:** Agent-Antworten optional vorlesen. Nutzt bestehende `interfaceVoiceObjects` oder Browser TTS.
- **Voice-Toggle:** Input via Mikrofon, Output via TTS.
```
POST /api/workspace/{instanceId}/voice/transcribe → { "text": "..." }
POST /api/workspace/{instanceId}/voice/synthesize → audio blob
```
### 9.6 Prompt Input Request
```python
class WorkspaceInputRequest(BaseModel):
prompt: str
fileIds: List[str] = []
uploadedFiles: List[UploadFile] = []
dataSourceIds: List[str] = []
voiceMode: bool = False
workflowId: Optional[str] = None
userLanguage: str = "en"
```
---
## 10. Feature Integration
### 10.1 Unified Workspace ersetzt drei Features
Chatbot, Codeeditor und Playground migrieren auf `serviceAgent` + Unified UI. Die Unterscheidung erfolgt über die Tool-Konfiguration pro FeatureInstance:
| Feature (bisher) | Tool-Set |
|-------------------|----------|
| Chatbot (SQL) | `core` + `sqlQuery` + `webSearch` |
| Chatbot (Tavily) | `core` + `webSearch` |
| Codeeditor | `core` + `readFile` + `writeFile` + `applyFileEdit` + `searchFiles` |
| Playground | `core` + `webSearch` + `sendMail` |
| Trustee | `core` + `connectionTools` + `documentAnalysis` |
`core` = readFile, writeFile, listFiles, extractContent, summarizeContent, generateDocument
### 10.2 serviceAi als Low-Level-Layer
`serviceAi` bleibt bestehen für:
- Modell-Auswahl und Fallback
- Billing pro Call
- Provider-Routing (OpenAI, Anthropic, PrivateLLM)
- Native function calling Support
Die Orchestrierungs-Logik (Structure Filling, Looping, JSON Repair) wandert in den Agent.
### 10.3 Method/Action-Integration (bestehende Workflow-Actions als Tools)
**IST-Zustand:** Das Workflow-System hat **7 Methods mit 36 Actions**, registriert via `methodDiscovery.py`. Jede Action hat `actionId`, `description`, `parameters` (Schema) und `execute` (async). 22 davon haben `dynamicMode=True` und werden bereits dynamisch von AI ausgewählt. Die Ausführung läuft über `actionExecutor.executeAction(method, action, params)`.
| Kategorie | Methods | Actions | dynamicMode |
|-----------|---------|---------|-------------|
| **AI** | ai | process, webResearch, summarizeDocument, translateDocument, convertDocument, generateDocument, generateCode | Ja |
| **External** | sharepoint, outlook | 9 SharePoint + 4 Outlook Actions | Ja |
| **Domain** | context, trustee | getDocumentIndex, extractContent, neutralizeData, extractFromFiles, processDocuments, syncToAccounting | Teilweise |
| **Integration** | jira, chatbot | 8 Jira + 1 Chatbot Actions | Nein/Teilweise |
**Lösung: 3-Stufen-Migration**
```mermaid
graph LR
subgraph stufe1 ["Stufe 1: Automatisches Wrapping"]
Actions["Bestehende Actions\n36 Actions"]
Adapter["ActionToolAdapter"]
ToolReg["Tool Registry"]
end
subgraph stufe2 ["Stufe 2: Refactoring"]
NativeTools["Native Agent Tools"]
ConnTools["ProviderConnector Tools"]
end
subgraph stufe3 ["Stufe 3: Abloesung"]
AgentNative["Agent-native Logik\nersetzt AI-Actions"]
end
Actions --> Adapter
Adapter --> ToolReg
ToolReg --> NativeTools
NativeTools --> ConnTools
ConnTools --> AgentNative
```
**Stufe 1 -- Automatisches Wrapping (sofort, kein Rewrite):**
Der `ActionToolAdapter` wandelt jede Action mit `dynamicMode=True` automatisch in ein Agent-Tool um. Kein Code-Rewrite nötig:
```python
class ActionToolAdapter:
"""Wrapped bestehende Workflow-Actions als Agent-Tools."""
def _buildToolsFromActions(self) -> List[ToolDefinition]:
tools = []
for methodName, method in methods.items():
for actionName, action in method["actions"].items():
if not action.get("dynamicMode"):
continue
tools.append(ToolDefinition(
name=f"{methodName}.{actionName}",
description=action["description"],
parameters=_convertParameterSchema(action["parameters"])
))
return tools
async def dispatch(self, toolName: str, args: Dict) -> ToolResult:
methodName, actionName = toolName.split(".", 1)
result = await actionExecutor.executeAction(methodName, actionName, args)
return ToolResult(
success=result.success,
data=_formatActionResult(result),
error=result.error
)
```
Vorteil: Alle 22 dynamischen Actions sofort als Agent-Tools verfügbar.
**Stufe 2 -- Schrittweises Refactoring:**
- **External-Actions** (SharePoint, Outlook, Jira) → hinter `ProviderConnector` + `ServiceAdapter` migrieren → generische `externalBrowse/Download/Upload/Search` Tools (siehe Abschnitt 8.3)
- **Domain-Actions** (Trustee, Context) → bleiben als spezialisierte Tools, werden direkt in der `toolRegistry` registriert statt via Adapter
**Stufe 3 -- AI-Actions ablösen:**
- `ai.process`, `ai.generateDocument`, `ai.generateCode` etc. werden NICHT als Tools exponiert
- Diese Fähigkeiten sind im Agent selbst (der Agent IST die AI) -- er nutzt direkt `extractContent`, `generateDocument`, `readSection` etc.
- `subStructureFilling`, `subStructureGeneration`, `subAiCallLooping` entfallen
**Kein kompletter Rewrite nötig:** Die bestehenden Methods/Actions bleiben initial funktionsfähig (Chatplayground, Automation nutzen sie weiter). Der Agent nutzt sie via Adapter. Schrittweise Migration ohne Breaking Changes.
---
## 11. Background Workflows
### 11.1 Persistente Workflow-Queue
Background-Workflows dürfen nicht auf `asyncio.create_task()` basieren (Server-Restart = Tasks verloren). Stattdessen eine **DB-basierte Queue** auf bestehendem PostgreSQL:
```python
class BackgroundJob(BaseModel):
"""Persistierter Background-Job in PostgreSQL."""
id: str
workflowId: str
userId: str
featureInstanceId: str
status: str # queued, running, completed, failed, cancelled
description: str
subtasks: List[Dict[str, Any]]
maxCostCHF: Optional[float] # Billing-Cap
maxConcurrency: int = 1
retryCount: int = 0
maxRetries: int = 3
createdAt: float
startedAt: Optional[float]
completedAt: Optional[float]
error: Optional[str]
progress: float = 0.0
resultSummary: Optional[str]
```
**Worker-Prozess:** Ein Background-Worker pollt die Queue und führt Jobs aus:
```python
async def _processBackgroundQueue():
while True:
job = await backgroundQueue.dequeueNext()
if not job:
await asyncio.sleep(2)
continue
try:
await backgroundQueue.updateStatus(job.id, "running")
async for event in runAgent(job.subtasks, job.workflowId, ...):
await backgroundQueue.updateProgress(job.id, event)
# Billing-Cap prüfen
if job.maxCostCHF and await billingService.getWorkflowCost(job.workflowId) > job.maxCostCHF:
await backgroundQueue.updateStatus(job.id, "cancelled", error="budgetExceeded")
await _notifyUser(job.userId, f"Background-Job gestoppt: Budget überschritten")
break
await backgroundQueue.updateStatus(job.id, "completed")
except Exception as e:
if job.retryCount < job.maxRetries:
await backgroundQueue.requeueWithRetry(job.id)
else:
await backgroundQueue.updateStatus(job.id, "failed", error=str(e))
await _notifyUser(job.userId, f"Background-Job fehlgeschlagen: {e}")
```
### 11.2 Tools
```python
# Agent-Tool: Background-Job starten
async def startBackground(description, subtasks, maxCostCHF=None):
job = await backgroundQueue.enqueue(BackgroundJob(
workflowId=workflowId, description=description,
subtasks=subtasks, maxCostCHF=maxCostCHF, ...
))
return f"Background-Job {job.id} gestartet."
# Agent-Tool: Job-Status abfragen
async def checkBackgroundStatus(jobId):
job = await backgroundQueue.getJob(jobId)
return {"status": job.status, "progress": job.progress, "result": job.resultSummary}
```
### 11.3 Komplexe Ausgaben
Eine Agent-Antwort kann mehrere Ausgabe-Typen kombinieren:
- Text (Markdown) mit Token-Streaming
- Generierte/bearbeitete Files als Attachments
- Gestartete Hintergrund-Jobs mit Progress-Tracking
- Tabellen, Charts
- Voice-Ausgabe
- Externe Aktionen (Mail, SharePoint-Upload)
---
## 12. Observability und Monitoring
### 12.1 Agent-Tracing
Jeder Agent-Workflow wird vollständig protokolliert. Ein `AgentTrace` aggregiert alle Rounds, Tool-Calls und Kosten:
```python
class AgentTrace(BaseModel):
"""Vollständiges Protokoll eines Agent-Workflows."""
workflowId: str
userId: str
featureInstanceId: str
startedAt: float
completedAt: Optional[float]
status: str # running, completed, maxRoundsReached, budgetExceeded, error
totalRounds: int
totalToolCalls: int
totalCostCHF: float
abortReason: Optional[str]
rounds: List[AgentRoundLog]
class AgentRoundLog(BaseModel):
"""Log eines einzelnen Agent-Rounds."""
roundNumber: int
aiModel: str # Welches Modell wurde genutzt
inputTokens: int
outputTokens: int
costCHF: float
toolCalls: List[ToolCallLog] # Welche Tools mit welchen Args/Results
durationMs: int
class ToolCallLog(BaseModel):
"""Log eines einzelnen Tool-Calls."""
toolName: str
args: Dict[str, Any]
success: bool
durationMs: int
error: Optional[str]
```
### 12.2 Kosten-Transparenz
Die bestehende `BillingTransaction` erfasst einzelne AI-Calls. Zusätzlich wird pro Workflow eine **Kosten-Aggregation** geführt:
| Posten | Quelle | Tracking |
|--------|--------|----------|
| Agent-Rounds (AI-Calls) | `serviceAi.callAi()``billingCallback` | Automatisch via bestehende Billing-Integration |
| Entity Cache (Opt-in) | `_extractEntities()``serviceAi.callAi()` | Eigener `description`-Typ in BillingTransaction |
| Progressive Summaries | Summary-Calls → `serviceAi.callAi()` | Eigener `description`-Typ in BillingTransaction |
| Embeddings | `embeddingService.embed()` | Neuer Billing-Posten |
### 12.3 Monitoring
Metriken pro FeatureInstance / Mandate:
- Agent-Workflows pro Zeitraum (Anzahl, Durchschnittsdauer, Kosten)
- Tool-Nutzung (welche Tools, Erfolgsrate, Durchschnittsdauer)
- Budget-Auslastung (Kosten vs. maxCostCHF)
- Knowledge Store (Anzahl Files, Chunks, Embedding-Grösse)
- Abbruchgründe (maxRounds, Budget, Fehler)
---
## 13. Umsetzungsplan
```mermaid
graph LR
P1["Phase 1\nAgent Core\n+ ActionToolAdapter"]
P2["Phase 2\nKnowledge Store"]
P3["Phase 3\nSmart Documents\n+ Container Handling"]
P4["Phase 4\nDocument Tools"]
P5["Phase 5\nFile Management"]
P6["Phase 6\nData Containers\n+ ProviderConnectors"]
P7["Phase 7\nUnified UI"]
P8["Phase 8\nFeature Migration\n+ Action Refactoring"]
P9["Phase 9\nBackground Workflows"]
P1 --> P2
P1 --> P5
P2 --> P3
P3 --> P4
P5 --> P6
P4 --> P8
P6 --> P8
P7 --> P8
P8 --> P9
```
| Phase | Inhalt | Abhängigkeit |
|-------|--------|-------------|
| **1** | Agent Core: Loop, ToolRegistry, ConversationManager, AiCallRequest erweitern. **ActionToolAdapter**: bestehende 22 dynamicMode-Actions automatisch als Agent-Tools wrappen. serviceAgent als IMPORTABLE_SERVICE registrieren | Grundlage |
| **2** | Knowledge Store: pgvector, 3-Ebenen, Auto-Index bei Upload, RAG-Retrieval | Phase 1 |
| **3** | Smart Documents: Pre-Scan, On-Demand Extraction, Progressive Summarization. **Container Handling**: ContainerExtractor (ZIP/TAR), FolderExtractor, rekursive Extraktion, Lazy Loading | Phase 2 |
| **4** | Document Tools: extractContent, readSection, browseContainer, extractContainerItem mit Knowledge-Store-Integration | Phase 3 |
| **5** | File Management: Tags, Folders, File Tools. Container-Referenzen im Prompt (@archiv.zip, @ordner/) | Phase 1 (parallel zu 2-4) |
| **6** | Data Containers: DataSource, **ProviderConnector-Architektur** (1 Provider : n Services), ConnectorResolver, generische externalBrowse/Download/Upload/Search Tools. MsftConnector, GoogleConnector, FtpConnector | Phase 5 |
| **7** | Unified UI: Codeeditor-Basis, Streaming, Voice, Panels | Parallel zu 2-6 |
| **8** | Feature Integration: Chatbot + Codeeditor + Playground migrieren. **Action Refactoring**: External-Actions → ProviderConnector + ServiceAdapter, Domain-Actions → native Tools, AI-Actions entfallen (Agent ist die AI) | Phase 4 + 6 + 7 |
| **9** | Background Workflows: DB-basierte Queue (BackgroundJob), Worker-Prozess, Retry, Billing-Cap, Observability (AgentTrace, Monitoring) | Phase 8 |
---
## 14. Was bleibt, was geht
| Komponente | Status |
|------------|--------|
| `serviceAi.callAi()` + Modell-Fallback + Billing | Bleibt (Low-Level) |
| `serviceExtraction.extractContent()` | Bleibt + erweitert (Pre-Scan, On-Demand, Container Handling) |
| `aicoreModelSelector` + `aicoreModelRegistry` | Bleibt |
| `serviceStreaming` / EventManager | Bleibt (zentral für UI) |
| UserConnections + OAuth | Bleibt + erweitert (DataSource, ProviderConnector 1:n) |
| `interfaceVoiceObjects` (STT/TTS) | Bleibt (Unified UI) |
| SharePoint Graph API Service | Bleibt → wird zu `SharepointAdapter` hinter `MsftConnector` |
| PostgreSQL | Bleibt + erweitert (pgvector, FileContentIndex, ContentChunk, WorkflowMemory) |
| Workflow Methods/Actions (36 Actions) | Phase 1: ActionToolAdapter-Wrapping → Phase 8: schrittweise Migration |
| `MethodSharepoint` (9 Actions) | Migriert → `MsftConnector``SharepointAdapter` + generische Connection Tools |
| `MethodOutlook` (4 Actions) | Migriert → `MsftConnector``OutlookAdapter` + generische Connection Tools |
| `MethodAi` (7 Actions: process, generate, etc.) | Abgelöst (Agent ist die AI, nutzt Document Tools direkt) |
| `MethodJira` (8 Actions) | Migriert → `JiraConnector` + `JiraAdapter` |
| `MethodTrustee`, `MethodContext` | Bleiben als spezialisierte Agent-Tools |
| `actionExecutor` + `methodDiscovery` | Bleibt während Migration (Adapter), später optional |
| `subStructureFilling` (Pipeline) | Abgelöst (Agent + Document Tools + Knowledge Store) |
| `subStructureGeneration` | Abgelöst |
| `subAiCallLooping` (JSON Repair) | Abgelöst (Agent iteriert natürlich) |
| `trim_messages(strategy="last")` | Ersetzt durch Progressive Summarization |
| Codeeditor `codeEditorProcessor` | Migriert zu serviceAgent |
| Codeeditor UI (SSE, Events) | Basis für Unified UI |
| Chatbot LangGraph + DatabaseCheckpointer | Abgelöst (serviceAgent + Knowledge Store) |
| Chatplayground | Abgelöst (Unified UI) |
| `datamodelFiles.py` FileItem | Erweitert (tags, folders, description) |
| `BinaryExtractor` (ZIP Fallback) | Ersetzt durch `ContainerExtractor` (rekursive Extraktion) |